Benchmarking Java Streams

In my earlier article, I took a closer look at the Java ExecutorService interface and its implementations, with particular focus on the Fork/Join framework and ThreadPerTaskExecutor. Today, I want to take a step further and check how well they behave when put under stress. In short, I am going to run benchmarks, lots of benchmarks.

All the code below, and more, is available in a dedicated GitHub repository.

Logic Under Benchmark

I want to start this text with a walk through the logic that will be the base for the benchmarks, as it is split into two main categories:

  1. Based on the classic stream
  2. Based on the Fork/Join approach

Classic Stream Logic

public static Map<Ip, Integer> groupByIncomingIp(Stream<String> requests,
        LocalDateTime upperTimeBound, LocalDateTime lowerTimeBound) {
    return requests
        .map(line -> line.split(","))
        .filter(words -> words.length == 3)
        .map(words -> new Request(words[1], LocalDateTime.parse(words[2])))
        .filter(request -> request.timestamp().isBefore(upperTimeBound) &&
                request.timestamp().isAfter(lowerTimeBound))
        .map(i -> new Ip(i.ip()))
        .collect(groupingBy(i -> i, summingInt(i -> 1)));
}

In principle, the purpose of this piece of code is to transform a list of strings, do some filtering and grouping, and return a map. The supplied strings are in the following format:

1,192.168.1.1,2023-10-29T17:33:33.647641574

It represents the event of an IP address attempting to access a particular server. The output maps an IP address to the number of access attempts in a particular period, expressed by lower and upper time boundaries.
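
The Request and Ip types referenced above are not shown in the article. A minimal sketch of how they could look, assuming plain Java records with ip() and timestamp() accessors (the exact definitions live in the linked repository):

record Request(String ip, LocalDateTime timestamp) {}
record Ip(String ip) {}

// Example call, assuming groupByIncomingIp from the snippet above is in scope:
Map<Ip, Integer> counts = groupByIncomingIp(
        Stream.of("1,192.168.1.1,2023-10-29T17:33:33.647641574",
                  "2,192.168.1.1,2023-10-29T17:35:01.000000000"),
        LocalDateTime.parse("2023-10-30T00:00:00"),   // upper time bound
        LocalDateTime.parse("2023-10-29T00:00:00"));  // lower time bound
// counts -> {Ip[ip=192.168.1.1]=2}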

Fork/Join Logic

@Override
public Map<Ip, Integer> compute() {
    if (data.size() >= THRESHOLD) {
        Map<Ip, Integer> output = new HashMap<>();
        ForkJoinTask
            .invokeAll(createSubTasks())
            .forEach(task -> task
                     .join()
                     .forEach((k, v) -> updateOutput(k, v, output))
                    );
        return output;
    }
    return process();
}

private void updateOutput(Ip k, Integer v, Map<Ip, Integer> output) {
    Integer currentValue = output.get(k);
    if (currentValue == null) {
        output.put(k, v);
    } else {
        output.replace(k, currentValue + v);
    }
}

private List<ForkJoinDefinition> createSubTasks() {
    int size = data.size();
    int middle = size / 2;
    return List.of(
        new ForkJoinDefinition(new ArrayList<>(data.subList(0, middle)), now),
        new ForkJoinDefinition(new ArrayList<>(data.subList(middle, size)), now)
    );
}

private Map<Ip, Integer> process() {
    return groupByIncomingIp(data.stream(), upperTimeBound, lowerTimeBound);
}

The only impactful difference here is that I split the dataset into smaller batches until a certain threshold is reached. By default, the threshold is set to 20. After this operation, I start to perform the computations. The computations are the same as in the classic stream logic described above: I am using the groupByIncomingIp method.
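
For context, these methods belong to a class extending RecursiveTask. The full class is in the repository; below is only a hedged sketch of its likely skeleton, where the constructor and the way the time bounds are derived from now are assumptions on my part:

public class ForkJoinDefinition extends RecursiveTask<Map<Ip, Integer>> {

    private static final int THRESHOLD = 20; // default split threshold mentioned above

    private final List<String> data;
    private final LocalDateTime now;
    private final LocalDateTime upperTimeBound;
    private final LocalDateTime lowerTimeBound;

    public ForkJoinDefinition(List<String> data, LocalDateTime now) {
        this.data = data;
        this.now = now;
        // assumption: the bounds are derived from 'now'; the real derivation is in the repository
        this.upperTimeBound = now.plusMinutes(1);
        this.lowerTimeBound = now.minusMinutes(1);
    }

    // compute(), createSubTasks(), process(), and updateOutput() as shown above
}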

JMH Setup

All the benchmarks are written using the Java Microbenchmark Harness (or JMH for short).

I have used JMH in version 1.37 to run the benchmarks. The benchmarks share the same setup: five warm-up iterations and twenty measurement iterations.

There are two different modes here: average time and throughput. In the case of average time, JMH measures the average execution time of the code under benchmark, and the output time is expressed in milliseconds.

For throughput, JMH measures the number of operations (complete executions of the code under benchmark) in a particular unit of time, milliseconds in this case. The result is expressed in ops per millisecond. For a single-threaded run the two modes are roughly each other's inverse: for example, a score of 2 ops/ms corresponds to an average time of about 0.5 ms/op.

In JMH syntax:

@Warmup(iterations = 5, time = 10, timeUnit = SECONDS)
@Measurement(iterations = 20, time = 10, timeUnit = SECONDS)
@BenchmarkMode({Mode.AverageTime, Mode.Throughput})
@OutputTimeUnit(MILLISECONDS)
@Fork(1)
@Threads(1)

Additionally, each benchmark has its own unique State with Benchmark scope, containing all the data and variables needed by that particular benchmark.

Benchmark State

Classic Stream

The base benchmark state for the classic stream can be viewed below.

@State(Scope.Benchmark)
public class BenchmarkState {

    @Param({"0"})
    public int size;
    public List<String> input;
    public ClassicDefinition definitions;
    public ForkJoinPool forkJoinPool_4;
    public ForkJoinPool forkJoinPool_8;
    public ForkJoinPool forkJoinPool_16;
    public ForkJoinPool forkJoinPool_32;
    private final LocalDateTime now = LocalDateTime.now();

    @Setup(Level.Trial)
    public void trialUp() {
        input = new TestDataGen(now).generate(size);
        definitions = new ClassicDefinition(now);
        System.out.println(input.size());
    }

    @Setup(Level.Iteration)
    public void up() {
        forkJoinPool_4 = new ForkJoinPool(4);
        forkJoinPool_8 = new ForkJoinPool(8);
        forkJoinPool_16 = new ForkJoinPool(16);
        forkJoinPool_32 = new ForkJoinPool(32);
    }

    @TearDown(Level.Iteration)
    public void down() {
        forkJoinPool_4.shutdown();
        forkJoinPool_8.shutdown();
        forkJoinPool_16.shutdown();
        forkJoinPool_32.shutdown();
    }
}

First, I set up all the variables needed to perform the benchmarks. Apart from the size parameter, which plays a special role in this part, the thread pools will be used only within the benchmarks themselves.

The size parameter, on the other hand, is quite an interesting mechanism of JMH. It allows the parametrization of a certain variable used during the benchmark. You will see how I took advantage of this later, when we move on to running the benchmarks.

For now, I am using this parameter to generate the input dataset, which remains unchanged throughout the whole benchmark, to achieve better repeatability of results.

The second part is the up method, which works similarly to @BeforeEach from the JUnit library.

It will be triggered before each of the 20 iterations of my benchmark and will reset all the variables used in the benchmark. Thanks to such a setting, I start with a clean state for every iteration.

The last part is the down method, which works similarly to @AfterEach from the JUnit library.

It will be triggered after each of the 20 iterations of my benchmark and will shut down all the thread pools used in the iteration, mostly to handle possible memory leaks.

Fork/Join

The state for the Fork/Join variant looks as follows.

@State(Scope.Benchmark)
public class ForkJoinState {

    @Param({"0"})
    public int size;
    public List<String> input;
    public ForkJoinPool forkJoinPool_4;
    public ForkJoinPool forkJoinPool_8;
    public ForkJoinPool forkJoinPool_16;
    public ForkJoinPool forkJoinPool_32;
    public final LocalDateTime now = LocalDateTime.now();

    @Setup(Level.Trial)
    public void trialUp() {
        input = new TestDataGen(now).generate(size);
        System.out.println(input.size());
    }

    @Setup(Level.Iteration)
    public void up() {
        forkJoinPool_4 = new ForkJoinPool(4);
        forkJoinPool_8 = new ForkJoinPool(8);
        forkJoinPool_16 = new ForkJoinPool(16);
        forkJoinPool_32 = new ForkJoinPool(32);
    }

    @TearDown(Level.Iteration)
    public void down() {
        forkJoinPool_4.shutdown();
        forkJoinPool_8.shutdown();
        forkJoinPool_16.shutdown();
        forkJoinPool_32.shutdown();
    }
}

There is no big difference between the setup for the classic stream and Fork/Join. The only difference comes from placing the definitions inside the benchmarks themselves, not in the state as in the case of the classic approach.

Such a change comes from how RecursiveTask works: once a task completes, its result is memoized and stored, so reusing the same task instance across iterations could impact the overall benchmark results.

Benchmark Input

The basic input for the benchmarks is a list of strings in the following format:

1,192.168.1.1,2023-10-29T17:33:33.647641574

Or in a more generalized description:

{ordering-number},{ip-like-string},{timestamp}

There are five different input sizes:

  1. 100
  2. 1000
  3. 10000
  4. 100000
  5. 1000000

There is some deeper meaning behind the sizes, as I believe that such a size range can illustrate how well the solution scales and potentially reveal some performance bottlenecks.

Additionally, the overall setup of the benchmark is very flexible, so adding a new size should not be difficult if somebody is interested in doing so. A sketch of how the size parameter can be overridden is shown below.
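
For example, assuming a small runner class (the class name here is hypothetical), the @Param default of "0" can be overridden through the JMH Runner API; the same can be done from the command line with -p size=100,1000,...:

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class BenchmarkRunner {
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .include(ClassicStreamBenchmark.class.getSimpleName())
                // overrides @Param({"0"}) with the input sizes to measure
                .param("size", "100", "1000", "10000", "100000", "1000000")
                .build();
        new Runner(options).run();
    }
}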

Benchmark Setup

Classic Stream

There is only a single class related to the classic stream benchmark. Different sizes are handled on the State level.

public class ClassicStreamBenchmark extends BaseBenchmarkConfig {

    @Benchmark
    public void bench_sequential(SingleStreamState state, Blackhole bh) {
        Map<Ip, Integer> map = state.definitions.sequentialStream(state.input);
        bh.consume(map);
    }

    @Benchmark
    public void bench_defaultParallelStream(SingleStreamState state, Blackhole bh) {
        Map<Ip, Integer> map = state.definitions.defaultParallelStream(state.input);
        bh.consume(map);
    }

    @Benchmark
    public void bench_parallelStreamWithCustomForkJoinPool_4(SingleStreamState state, Blackhole bh) {
        Map<Ip, Integer> map = state.definitions.parallelStreamWithCustomForkJoinPool(state.forkJoinPool_4, state.input);
        bh.consume(map);
    }

    @Benchmark
    public void bench_parallelStreamWithCustomForkJoinPool_8(SingleStreamState state, Blackhole bh) {
        Map<Ip, Integer> map = state.definitions.parallelStreamWithCustomForkJoinPool(state.forkJoinPool_8, state.input);
        bh.consume(map);
    }

    @Benchmark
    public void bench_parallelStreamWithCustomForkJoinPool_16(SingleStreamState state, Blackhole bh) {
        Map<Ip, Integer> map = state.definitions.parallelStreamWithCustomForkJoinPool(state.forkJoinPool_16, state.input);
        bh.consume(map);
    }

    @Benchmark
    public void bench_parallelStreamWithCustomForkJoinPool_32(SingleStreamState state, Blackhole bh) {
        Map<Ip, Integer> map = state.definitions.parallelStreamWithCustomForkJoinPool(state.forkJoinPool_32, state.input);
        bh.consume(map);
    }
}

There are six different benchmark setups of the same logic:

  1. bench_sequential: Simple benchmark with just a single sequential stream
  2. bench_defaultParallelStream: Benchmark with the default Java parallel stream via .parallelStream(), which in practice uses the commonPool from ForkJoinPool with a parallelism of 19 (at least on my machine); see the snippet after this list
  3. bench_parallelStreamWithCustomForkJoinPool_4: Custom ForkJoinPool with parallelism level equal to 4
  4. bench_parallelStreamWithCustomForkJoinPool_8: Custom ForkJoinPool with parallelism level equal to 8
  5. bench_parallelStreamWithCustomForkJoinPool_16: Custom ForkJoinPool with parallelism level equal to 16
  6. bench_parallelStreamWithCustomForkJoinPool_32: Custom ForkJoinPool with parallelism level equal to 32

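Two side notes on the list above. The parallelism of 19 comes from the common pool default of availableProcessors() - 1 (20 hardware threads on Machine 1). The article does not show the body of parallelStreamWithCustomForkJoinPool; a common way to run a parallel stream on a custom pool, and presumably what the repository code does, is to submit the terminal operation to that pool, roughly as in this self-contained sketch:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CustomPoolParallelStream {

    public static void main(String[] args) throws Exception {
        // The common pool backing .parallelStream() defaults to availableProcessors() - 1 threads,
        // which is where the parallelism of 19 on a 20-thread machine comes from.
        System.out.println(ForkJoinPool.commonPool().getParallelism());

        List<String> input = List.of("a", "b", "a", "c", "b", "a");

        // The custom-pool trick: submit the whole terminal operation to the pool,
        // so the parallel stream's tasks run on that pool instead of the common one.
        ForkJoinPool customPool = new ForkJoinPool(8);
        try {
            Map<String, Long> counts = customPool.submit(
                    () -> input.parallelStream()
                               .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
            ).get();
            System.out.println(counts);
        } finally {
            customPool.shutdown();
        }
    }
}
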
For the classic stream logic, I have 6 different setups and 5 different input sizes, resulting in a total of 30 unique benchmark combinations.

Fork/Join

public class ForkJoinBenchmark extends BaseBenchmarkConfig {

    @Benchmark
    public void bench(ForkJoinState state, Blackhole bh) {
        Map<Ip, Integer> map = new ForkJoinDefinition(state.input, state.now).compute();
        bh.consume(map);
    }

    @Benchmark
    public void bench_customForkJoinPool_4(ForkJoinState state, Blackhole bh) {
        ForkJoinDefinition forkJoinDefinition = new ForkJoinDefinition(state.input, state.now);
        Map<Ip, Integer> map = state.forkJoinPool_4.invoke(forkJoinDefinition);
        bh.consume(map);
    }

    @Benchmark
    public void bench_customForkJoinPool_8(ForkJoinState state, Blackhole bh) {
        ForkJoinDefinition forkJoinDefinition = new ForkJoinDefinition(state.input, state.now);
        Map<Ip, Integer> map = state.forkJoinPool_8.invoke(forkJoinDefinition);
        bh.consume(map);
    }

    @Benchmark
    public void bench_customForkJoinPool_16(ForkJoinState state, Blackhole bh) {
        ForkJoinDefinition forkJoinDefinition = new ForkJoinDefinition(state.input, state.now);
        Map<Ip, Integer> map = state.forkJoinPool_16.invoke(forkJoinDefinition);
        bh.consume(map);
    }

    @Benchmark
    public void bench_customForkJoinPool_32(ForkJoinState state, Blackhole bh) {
        ForkJoinDefinition forkJoinDefinition = new ForkJoinDefinition(state.input, state.now);
        Map<Ip, Integer> map = state.forkJoinPool_32.invoke(forkJoinDefinition);
        bh.consume(map);
    }
}

There are five different benchmark setups of the same logic:

  1. bench: Benchmark that calls compute() directly, without an explicitly configured pool
  2. bench_customForkJoinPool_4: Custom ForkJoinPool with parallelism level equal to 4
  3. bench_customForkJoinPool_8: Custom ForkJoinPool with parallelism level equal to 8
  4. bench_customForkJoinPool_16: Custom ForkJoinPool with parallelism level equal to 16
  5. bench_customForkJoinPool_32: Custom ForkJoinPool with parallelism level equal to 32

For the Fork/Join logic, I have 5 different setups and 5 different input sizes, resulting in a total of 25 unique benchmark combinations.

What is more, in both cases I am also using the Blackhole concept from JMH to defeat the compiler's dead-code elimination. There is more about Blackholes and their use cases here.
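
As a side note, a benchmark method can also defeat dead-code elimination by simply returning the computed value, which JMH consumes implicitly; the explicit Blackhole becomes handy when more than one value has to be kept alive. A minimal sketch, reusing the state class from above:

@Benchmark
public Map<Ip, Integer> bench_sequential_returning(SingleStreamState state) {
    // returning the result lets JMH consume it implicitly
    return state.definitions.sequentialStream(state.input);
}

@Benchmark
public void bench_sequential_blackhole(SingleStreamState state, Blackhole bh) {
    // explicit Blackhole, as used in the benchmarks above
    bh.consume(state.definitions.sequentialStream(state.input));
}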

Benchmark Environment

Machine 1

The tests were conducted on my Dell XPS with the following parameters:

  • OS: Ubuntu 20.04.6 LTS
  • CPU: i9-12900HK × 20
  • Memory: 64 GB

JVM

  • OpenJDK version "21" 2023-09-19
  • OpenJDK Runtime Environment (build 21+35-2513)
  • OpenJDK 64-Bit Server VM (build 21+35-2513, mixed mode, sharing)

Machine 2

The tests were conducted on my Lenovo Y700 with the following parameters:

  • OS: Ubuntu 20.04.6 LTS
  • CPU: i7-6700HQ × 8
  • Memory: 32 GB

JVM

  • OpenJDK version "21" 2023-09-19
  • OpenJDK Runtime Environment (build 21+35-2513)
  • OpenJDK 64-Bit Server VM (build 21+35-2513, mixed mode, sharing)

For both machines, all side/insignificant applications were closed. I tried to keep the runtime system as clean as possible so as not to generate any unwanted performance overhead. However, on a pure Ubuntu server or when run inside a container, the overall performance may differ.

Benchmark Report

The results of running the benchmarks are stored in .csv files in the GitHub repository, in the reports directory. Additionally, to ease the download of reports, there is a separate .zip file named reports.zip that contains all the .csv data files.

Report directories are structured on a per-size basis, with three special reports covering all input sizes:

  • report_classic: All input sizes for the classic stream
  • report_forkjoin: All input sizes for the Fork/Join approach
  • report_whole: All input sizes for both the classic stream and Fork/Join

Each report directory from the above contains 3 separate files:

  • averagetime.csv: Results for average time mode benchmarks
  • throughput.csv: Results for throughput mode benchmarks
  • whole.csv: Combined results for both modes

For the particular reports, I have two formats: averagetime.csv and throughput.csv share one format, and whole.csv has a separate one. Let's call them the modes and whole formats.

The modes report contains eight columns:

  1. Label: Name of the benchmark
  2. Input Size: Benchmark input size
  3. Threads: Number of threads used in the benchmark, from the set 1, 4, 7, 8, 16, 19, 32
  4. Mode: Benchmark mode, either average time or throughput
  5. Cnt: The number of benchmark iterations; should always be equal to 20
  6. Score: Actual result of the benchmark
  7. Score Mean Error: Benchmark measurement error
  8. Units: Units of the benchmark, either ms/op (for average time) or ops/ms (for throughput)

The whole report contains 10 columns:

  1. Label: Name of the benchmark
  2. Input Size: Benchmark input size
  3. Threads: Number of threads used in the benchmark, from the set 1, 4, 7, 8, 16, 19, 32
  4. Cnt: The number of benchmark iterations; should always be equal to 20
  5. AvgTimeScore: Actual result of the benchmark for average time mode
  6. AvgTimeMeanError: Benchmark measurement error for average time mode
  7. AvgUnits: Units of the benchmark for average time mode, in ms/op
  8. ThroughputScore: Actual result of the benchmark for throughput mode
  9. ThroughputMeanError: Benchmark measurement error for throughput mode
  10. ThroughputUnits: Units of the benchmark for throughput mode, in ops/ms

Results Analysis

Assumptions

Baseline

I will present general results and insights based on the size of 10000, so I will be using the .csv files from the report_10000 directory.

There are two main reasons behind choosing this particular data size:

  1. The execution time is high enough to show differences between the various setups.
  2. Data sizes 100 and 1000 are, in my opinion, too small to notice any performance bottlenecks.

Thus, I think that an in-depth analysis of this particular data size will be the most impactful.

Of course, other sizes will also get a results overview, but it will not be as thorough as this one unless I encounter some anomalies in comparison to the behavior for size 10000.

A Word on the Fork/Join Native Approach

With the current code under benchmark, there will be a performance overhead associated with the Fork/Join benchmarks.

Because the Fork/Join benchmark logic heavily relies on splitting the input dataset, there has to be a moment when all the partial results are combined into a single cohesive output. This is the fragment that is not included in the plain stream benchmarks, so correctly understanding its impact on overall performance is crucial; the sketch below illustrates the combining step.

Please keep this in mind.
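
For illustration only (this is not the repository code), the combining step boils down to folding each subtask's partial map into a single output; Map.merge expresses the same logic as updateOutput above:

// Two partial results, as returned by the joined subtasks (illustrative values):
Map<Ip, Integer> left  = Map.of(new Ip("192.168.1.1"), 3, new Ip("192.168.1.2"), 1);
Map<Ip, Integer> right = Map.of(new Ip("192.168.1.1"), 2, new Ip("192.168.1.3"), 4);

// The extra work the Fork/Join variant pays for: folding every partial map into one output.
Map<Ip, Integer> output = new HashMap<>();
for (Map<Ip, Integer> partial : List.of(left, right)) {
    partial.forEach((k, v) -> output.merge(k, v, Integer::sum));
}
// output -> {Ip[ip=192.168.1.1]=5, Ip[ip=192.168.1.2]=1, Ip[ip=192.168.1.3]=4} (order may vary)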

Analysis

Machine 1 (20 threads)

As you can see above, the best overall result for an input size of 10 thousand on Machine 1 belongs to the variants with defaultParallelStream.

For ClassicStream-based benchmarks, bench_defaultParallelStream returns by far the best result. Even when we consider a possible error in the measurements, it still comes out on top.

The setups for ForkJoinPool with parallelism 32 and 16 return worse results. On one hand, it is surprising: for parallelism 32, I would expect a better score than for the default pool (parallelism 19). However, parallelism 16 has worse results than both parallelism 19 and 32.

With 20 CPU threads on Machine 1, parallelism 32 is not enough to picture the performance degradation caused by an overabundance of threads. However, you will be able to notice such behavior for Machine 2. I would assume that to show such behavior on Machine 1, the parallelism would need to be set to 64 or more.

What is curious here is that the pattern of bench_defaultParallelStream coming out on top seems not to hold for the higher input sizes of 100 thousand and 1 million. There, the best performance belongs to bench_parallelStreamWithCustomForkJoinPool_16, which may indicate that in the long run, a moderately smaller parallelism may be a good idea.

The Fork/Join-based implementation is noticeably slower than the default parallel stream implementation, with around 10% worse performance. This pattern also occurs for other sizes. It confirms my assumption from above that joining the smaller parts of a split data set has a noticeable impact.

Of course, the worst score belongs to the single-threaded approach and is around 5 times slower than the best result. Such a situation is expected, as the single-threaded benchmark is a kind of baseline for me. I want to check how far we can push its execution time, and a 5-times-better average execution time in the best-case scenario seems like a decent result.

As for the value of the score mean error, it is very, very small. In the worst case (the highest error), it is within 1.5% of its respective score (the result for ClassicStreamBenchmark.bench_parallelStreamWithCustomForkJoinPool_4).

In other cases, it varies from 0.1% to 0.7% of the overall score.

There seems to be no difference in result positions for sizes above 10 thousand.

Machine 2 (8 threads)

As in the case of Machine 1, the first place also belongs to bench_defaultParallelStream. Again, even when we take a possible measurement error into account, it still comes out on top: nothing particularly interesting.

What is interesting, however, is that the pattern of the first 3 positions for Machine 2 changes quite a bit for higher input sizes. For inputs 100 to 10000, we have somewhat similar behavior, with bench_defaultParallelStream occupying 1st place and bench_parallelStreamWithCustomForkJoinPool_8 following shortly after.

On the other hand, for inputs 100000 and 1000000, the first place belongs to bench_parallelStreamWithCustomForkJoinPool_8, followed by bench_parallelStreamWithCustomForkJoinPool_32, while bench_defaultParallelStream is moved to the 4th and 3rd positions, respectively.

Another curious thing about Machine 2 is that for smaller input sizes, parallelism 32 is quite far away from the top. Such performance degradation may be caused by the overabundance of threads compared to the 8 CPU threads available in total on the machine.

However, for inputs 100000 and 1000000, the ForkJoinPool with parallelism 32 is in second place, which may indicate that over longer time spans, such an overabundance of threads is not a problem.

Other points that are similar to the behavior of Machine 1 are skipped here and are mentioned below.

Common Points

There are a few observations valid for both machines:

  1. My ForkJoinNative ("naive")-based benchmarks yield results that are noticeably worse, around 10% on both machines, than those delivered by the default variants of a parallel stream, or even the ones with a custom ForkJoinPool. Of course, one of the reasons is that they are not optimized in any way; there are probably some low-hanging performance fruits here. Thus, I strongly recommend getting familiar with the Fork/Join framework before moving its implementations to production.
  2. The time difference between positions one to three is very, very small, less than a millisecond. Thus, it may be hard to achieve any kind of repeatability for these benchmarks. With such a small difference, it is easy for the results distribution to change between benchmark runs.
  3. The mean error of the scores is also very, very small, up to 2% of the overall score in the worse cases and mostly below 1%. Such a low error may indicate two things. First, the benchmarks are reliable because the results are concentrated around some point; if there had been anomalies along the way, the error would be higher. Second, JMH is good at making measurements.
  4. There is no breaking difference in results between the throughput and average time modes. If one of the benchmarks performed well in average time mode, it would also perform well in throughput mode.

Above, you can see all the differences and similarities I found inside the report files. If you notice anything else that seems interesting, do not hesitate to mention it in the comment section below.

Summary

Before we finally part ways for today, I want to mention one more crucial thing:

JAVA IS NOT SLOW

Processing the list with one million elements, with all the potential JMH overhead and a single thread, takes 560 milliseconds (Machine 1) and 1142 milliseconds (Machine 2). There are no special optimizations or magic included, just a plain default JVM.

The overall best time for processing one million elements on Machine 1 was 88 milliseconds, for ClassicStreamBenchmark.bench_parallelStreamWithCustomForkJoinPool_16. In the case of Machine 2, it was 321 milliseconds, for ClassicStreamBenchmark.bench_parallelStreamWithCustomForkJoinPool_8.

Although both results are not as good as C/C++-based solutions, the relative simplicity and descriptiveness of the approach make it very interesting, in my opinion.

Overall, it is quite a nice addition to Java's One Billion Row Challenge.

I would just like to mention that all the reports and benchmark code are in the GitHub repository (linked in the introduction of this article). You can easily verify my results and compare them to the benchmark behavior on your own machine.

Additionally, to ease the download of reports, there is a separate .zip file named reports.zip that contains all the .csv data files.

Also, remember: Java is not slow.

Thank you for your time.

Review by: Krzysztof Ciesielski, Łukasz Rola
