1. Overview
Java introduced the Stream API in Java 8, and it has since become a staple in Java development. It’s easy to use, understand, and maintain, and provides sequential and parallel processing options.
However, the API offered only a fixed set of intermediate operations, with no way to define our own. To overcome this limitation, Java 24 has introduced the Gatherer interface, which provides greater flexibility for intermediate operations.
In this tutorial, we’ll learn what Gatherers are and how to work with them.
2. Stream Gatherer
The goal of Stream Gatherers is to allow custom intermediate operations to make the pipelines more flexible and expressive. They support asynchronous and incremental processing while allowing for custom data grouping or accumulation.
They can transform elements in a one-to-one, one-to-many, many-to-one, or many-to-many fashion, keep track of previously seen elements to decide how to transform later ones, enable parallel execution, and turn infinite streams into finite ones.
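As a rough illustration of the last point, the sketch below turns an infinite stream into a finite one. The class name, the untilSumExceeds() helper, and the int[] state holder are assumptions made up for this example. It relies only on Gatherer.ofSequential() and on the integrator returning false to stop consuming elements:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class RunningSumLimitExample {

    // Hypothetical gatherer: forwards elements until their running sum exceeds `limit`.
    static Gatherer<Integer, int[], Integer> untilSumExceeds(int limit) {
        return Gatherer.ofSequential(
            () -> new int[1],                      // state: one-element array holding the running sum
            (sum, element, downstream) -> {
                sum[0] += element;
                if (sum[0] > limit) {
                    return false;                  // short-circuit: stop consuming upstream elements
                }
                return downstream.push(element);   // forward the element; false if downstream is done
            });
    }

    static List<Integer> firstUntilSumExceeds(int limit) {
        // Stream.iterate() without a stop condition is infinite; the gatherer ends the pipeline.
        return Stream.iterate(1, n -> n + 1)
            .gather(untilSumExceeds(limit))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstUntilSumExceeds(10)); // prints [1, 2, 3, 4]
    }
}
```

The state remembers what has been seen so far, and the boolean returned by the integrator decides whether the stream keeps going.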
Next, we’ll take a look at the different functions that make up a Gatherer.
2.1. Gatherer Functions
The Gatherer has four functions that define how the elements are collected and transformed:
- initializer(): An optional function that creates the private state object used while processing a stream. It provides the initial state for the transformations.
- integrator(): Integrates new stream elements, optionally in the context of the processing state, and optionally emitting elements downstream. Also capable of terminating the processing early, based on a conditional match. It controls the core behaviour of the transformations.
- combiner(): An optional function that enables parallel processing for a Gatherer by merging two partial states into one. Without it, or if the input stream isn’t marked parallel, the Gatherer processes elements sequentially.
- finisher(): An optional function invoked when no elements are left to consume in the stream. It’s useful for stateful operations like buffering or a sliding window.
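To see how the four functions fit together, here is a minimal sketch that passes all of them to the Gatherer.of() factory. The Longest holder class and the longestWord() method are hypothetical names chosen for this example, not part of the API:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class LongestWordExample {

    // Hypothetical mutable state holder; not part of the Gatherer API.
    static final class Longest {
        String value = "";
    }

    static Gatherer<String, Longest, String> longestWord() {
        return Gatherer.of(
            // initializer: creates a fresh state object
            Longest::new,
            // integrator: updates the state, emits nothing yet, keeps consuming
            (state, word, downstream) -> {
                if (word.length() > state.value.length()) {
                    state.value = word;
                }
                return true;
            },
            // combiner: merges two partial states when the stream runs in parallel
            (left, right) -> left.value.length() >= right.value.length() ? left : right,
            // finisher: emits the single result once the input is exhausted
            (state, downstream) -> {
                if (!state.value.isEmpty()) {
                    downstream.push(state.value);
                }
            });
    }

    static List<String> longestOf(String... words) {
        return Stream.of(words).gather(longestWord()).toList();
    }
}
```

Because nothing is pushed until the finisher runs, this gatherer behaves as a many-to-one operation. For an empty input, it emits nothing.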
Next, let’s take a look at some built-in Gatherers.
2.2. fold()
fold() combines many elements to produce a single final result in an ordered fashion. An advantage over the reduce() function is that the result stays in the stream, so we can keep chaining operations after it.
Let’s take a look at a code example:
@Test
void givenNumbers_whenFolded_thenSumIsEmitted() {
    Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
    // typed as Stream<Integer> rather than a raw Stream to avoid an unchecked conversion
    Stream<Integer> folded = numbers.gather(Gatherers.fold(() -> 0, Integer::sum));
    List<Integer> resultList = folded.toList();
    assertEquals(1, resultList.size());
    assertEquals(Integer.valueOf(15), resultList.getFirst());
}
We’ve initialized the fold() method with an initial value of 0 and want to sum all the numbers from the input stream. Since gatherers are intermediate operations, we collect the results in a list and verify the expected outcome.
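As a further sketch, the folded value doesn’t have to share the element type, and it can flow into later intermediate operations. The class and method names below are assumptions made for illustration:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class FoldExample {

    static List<Integer> lengthOfJoinedDigits() {
        return Stream.of(1, 2, 3, 4, 5)
            // fold into a String: the accumulated type may differ from the element type
            .gather(Gatherers.fold(() -> "", (joined, number) -> joined + number))
            // the single folded value is still a stream element, so the pipeline continues
            .map(String::length)
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(lengthOfJoinedDigits()); // prints [5]
    }
}
```

With reduce(), we’d have to leave the stream and start a new one to keep working with the result.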
2.3. mapConcurrent()
As the name suggests, mapConcurrent() applies the function to the elements concurrently, up to the supplied concurrency limit, while preserving the order of the stream. It helps us avoid managing thread pools or working with Callable or Future.
We’ll analyze a code sample below:
@Test
void givenWords_whenMappedConcurrently_thenUppercasedWordsAreEmitted() {
    Stream<String> words = Stream.of("a", "b", "c", "d");
    List<String> resultList = words.gather(Gatherers.mapConcurrent(2, String::toUpperCase)).toList();
    assertEquals(4, resultList.size());
    assertEquals(List.of("A", "B", "C", "D"), resultList);
}
We set the maxConcurrency to 2, which is the maximum desired concurrency for the function toUpperCase(), and we verified the expected output.
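The benefit is easier to see with a mapper that blocks. In the sketch below, slowUpperCase() is a hypothetical stand-in for a slow call such as a remote lookup. The artificial delays make the tasks finish out of order, yet the results are emitted in the order of the input:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class MapConcurrentExample {

    // Hypothetical stand-in for a slow, blocking call.
    static String slowUpperCase(String word) {
        try {
            // shorter words sleep longer, so the tasks complete out of order
            Thread.sleep(50L * (4 - word.length()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return word.toUpperCase();
    }

    static List<String> upperCaseAll() {
        return Stream.of("a", "bb", "ccc")
            .gather(Gatherers.mapConcurrent(3, MapConcurrentExample::slowUpperCase))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(upperCaseAll()); // prints [A, BB, CCC]
    }
}
```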
2.4. scan()
scan() performs an incremental accumulation: starting from the initial state, it applies the scanner function to the current state and the current element, emits the result downstream, and uses that result as the state for the next element.
In the code example below, we’ve verified the same:
@Test
void givenNumbers_whenScanned_thenRunningTotalsAreEmitted() {
    Stream<Integer> numbers = Stream.of(1, 2, 3, 4);
    List<Integer> resultList = numbers.gather(Gatherers.scan(() -> 0, Integer::sum)).toList();
    assertEquals(4, resultList.size());
    assertEquals(List.of(1, 3, 6, 10), resultList);
}
We find the running total for the input stream using scan(). We provided an initial value of 0, and subsequently, the running totals for all the input values are calculated.
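The same pattern works with any accumulating function. As a small sketch (the class and method names are made up for this example), swapping Integer::sum for Integer::max yields a running maximum:

```java
import java.util.List;
import java.util.stream.Gatherers;

class ScanExample {

    static List<Integer> runningMax(List<Integer> values) {
        return values.stream()
            // start below every possible element, then keep the larger of state and element
            .gather(Gatherers.scan(() -> Integer.MIN_VALUE, Integer::max))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(runningMax(List.of(3, 1, 4, 1, 5))); // prints [3, 3, 4, 4, 5]
    }
}
```

As with the running total, one value is emitted per input element, and the initial value itself isn’t part of the output.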
2.5. windowSliding()
As the name suggests, windowSliding() implements the sliding window technique: it gathers the input elements into overlapping windows of a configured size, each one shifted by a single element. If the window size is greater than the number of stream elements, there will be only one window with all the stream elements in it.
Let’s take a look at an example below:
@Test
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
    List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
    Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
    List<List<Integer>> resultList = numbers.gather(Gatherers.windowSliding(3))
        .toList();
    assertEquals(3, resultList.size());
    assertEquals(expectedOutput, resultList);
}
As expected, we get a many-to-many mapping: five input elements become three overlapping lists of the configured size.
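The short-input case described above can be checked with a small sketch. The windows() helper is a hypothetical convenience method, not part of the API:

```java
import java.util.List;
import java.util.stream.Gatherers;

class WindowSlidingExample {

    // Hypothetical helper that applies windowSliding() to a list.
    static <T> List<List<T>> windows(List<T> input, int size) {
        return input.stream()
            .gather(Gatherers.windowSliding(size))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(windows(List.of(1, 2, 3), 2)); // prints [[1, 2], [2, 3]]
        System.out.println(windows(List.of(1, 2, 3), 5)); // prints [[1, 2, 3]]
    }
}
```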
3. Use Cases
So far, we’ve seen the built-in support for intermediate operations.
Now, let’s explore how we can build custom Gatherers for different input-output relations.
3.1. One-to-One
The only mandatory function of a Gatherer is the integrator(). Let’s convert an input stream of String elements to their lengths (one-to-one mapping), while only defining the integrator():
@Test
void givenStrings_whenUsingCustomGatherer_thenLengthsAreCalculated() {
    List<Integer> expectedOutput = List.of(5, 6, 3);
    Stream<String> inputStrings = Stream.of("apple", "banana", "cat");
    List<Object> resultList = inputStrings.gather(Gatherer.of((state, element, downstream) -> {
            downstream.push(element.length());
            return true;
        }))
        .toList();
    assertEquals(3, resultList.size());
    assertEquals(expectedOutput, resultList);
}
We’ve defined the Gatherer integrator() as a lambda expression that pushes the length of the String downstream. We could’ve also defined a custom Gatherer class by implementing the Gatherer interface.
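A class-based variant might look like the sketch below. StringLengthGatherer and lengths() are hypothetical names. Using Void as the state type signals that the gatherer keeps no state, and declaring the output type lets the result be a List<Integer> instead of a List<Object>:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical class; Void as the state type means "no state".
class StringLengthGatherer implements Gatherer<String, Void, Integer> {

    @Override
    public Integrator<Void, String, Integer> integrator() {
        // pushes one length per element and stops only if downstream rejects further input
        return (unused, element, downstream) -> downstream.push(element.length());
    }

    static List<Integer> lengths(String... words) {
        return Stream.of(words)
            .gather(new StringLengthGatherer())
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(lengths("apple", "banana", "cat")); // prints [5, 6, 3]
    }
}
```

Returning the result of downstream.push() rather than a constant true lets the pipeline stop early when a later operation, such as limit(), needs no more elements.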
3.2. One-to-Many
We’ll take a Stream of String sentences as input and emit the individual words obtained by splitting each sentence.
Let’s define a custom Gatherer to explore the different functions:
public class SentenceSplitterGatherer implements Gatherer<String, List<String>, String> {
    @Override
    public Supplier<List<String>> initializer() {
        return ArrayList::new;
    }

    @Override
    public BinaryOperator<List<String>> combiner() {
        return (left, right) -> {
            left.addAll(right);
            return left;
        };
    }

    @Override
    public Integrator<List<String>, String, String> integrator() {
        return (state, element, downstream) -> {
            var words = element.split("\\s+");
            for (var word : words) {
                state.add(word);
                downstream.push(word);
            }
            return true;
        };
    }
}
In SentenceSplitterGatherer, we’ve defined the initializer(), which returns an empty ArrayList as the initial state. Next, we have the combiner() needed for parallel processing capabilities. Finally, we have the integrator() logic, where we split the string and update the state and downstream for further processing.
Let’s verify, using some simple sentences, that our custom Gatherer works as expected:
@Test
void givenSentences_whenUsingCustomOneToManyGatherer_thenWordsAreExtracted() {
    List<String> expectedOutput = List.of("hello", "world", "java", "streams");
    Stream<String> sentences = Stream.of("hello world", "java streams");
    List<String> words = sentences.gather(new SentenceSplitterGatherer())
        .toList();
    assertEquals(expectedOutput, words);
}
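Splitting a sentence doesn’t depend on earlier elements, so a stateless alternative is also possible. The sketch below is an assumption-laden variation, not the approach used above: it relies on the single-argument Gatherer.of() factory, which produces a stateless, parallelizable gatherer, and the WordSplitExample class and its methods are hypothetical names:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class WordSplitExample {

    // Hypothetical stateless one-to-many gatherer.
    static Gatherer<String, Void, String> words() {
        return Gatherer.of((unused, sentence, downstream) -> {
            for (String word : sentence.trim().split("\\s+")) {
                if (!downstream.push(word)) {
                    return false; // downstream needs no more elements
                }
            }
            return true;
        });
    }

    static List<String> split(Stream<String> sentences) {
        return sentences.gather(words()).toList();
    }

    public static void main(String[] args) {
        // prints [hello, world, java, streams]
        System.out.println(split(Stream.of("hello world", "java streams").parallel()));
    }
}
```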
3.3. Many-to-One
Let’s define a custom Gatherer, where we initialize an empty ArrayList, define the summing logic for a stream of Integer values, and finally, the finisher() logic, which is executed when there are no more elements from the upstream:
public class NumericSumGatherer implements Gatherer<Integer, ArrayList<Integer>, Integer> {
    @Override
    public Supplier<ArrayList<Integer>> initializer() {
        return ArrayList::new;
    }

    @Override
    public Integrator<ArrayList<Integer>, Integer, Integer> integrator() {
        return new Integrator<>() {
            @Override
            public boolean integrate(ArrayList<Integer> state, Integer element, Downstream<? super Integer> downstream) {
                if (state.isEmpty()) {
                    state.add(element);
                } else {
                    // overwrite the running sum in place instead of prepending a new entry on every element
                    state.set(0, state.getFirst() + element);
                }
                return true;
            }
        };
    }

    @Override
    public BiConsumer<ArrayList<Integer>, Downstream<? super Integer>> finisher() {
        return (state, downstream) -> {
            // emit the total only if downstream still accepts elements and at least one element was seen
            if (!downstream.isRejecting() && !state.isEmpty()) {
                downstream.push(state.getFirst());
                state.clear();
            }
        };
    }
}
The idea here is to sum all the incoming elements in a Stream. Let’s verify the same via a simple test case:
@Test
void givenNumbers_whenUsingCustomManyToOneGatherer_thenSumIsCalculated() {
    Stream<Integer> inputValues = Stream.of(1, 2, 3, 4, 5, 6);
    List<Integer> result = inputValues.gather(new NumericSumGatherer())
        .toList();
    Assertions.assertEquals(Integer.valueOf(21), result.getFirst());
}
3.4. Many-to-Many
Previously, we saw how the built-in windowSliding() Gatherer works.
Let’s implement the same functionality using custom logic and verify that the expected output is the same as the one with the built-in Gatherer:
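public class SlidingWindowGatherer implements Gatherer<Integer, ArrayList<Integer>, List<Integer>> {
    private static final int WINDOW_SIZE = 3;

    @Override
    public Supplier<ArrayList<Integer>> initializer() {
        return ArrayList::new;
    }

    @Override
    public Integrator<ArrayList<Integer>, Integer, List<Integer>> integrator() {
        return new Integrator<>() {
            @Override
            public boolean integrate(ArrayList<Integer> state, Integer element, Downstream<? super List<Integer>> downstream) {
                // a full window has already been emitted, so slide it forward by one element
                if (state.size() == WINDOW_SIZE) {
                    state.removeFirst();
                }
                state.add(element);
                if (state.size() == WINDOW_SIZE) {
                    return downstream.push(new ArrayList<>(state));
                }
                return true;
            }
        };
    }

    @Override
    public BiConsumer<ArrayList<Integer>, Downstream<? super List<Integer>>> finisher() {
        return (state, downstream) -> {
            // the stream ended before the first window filled: emit what we have as a single window
            if (!state.isEmpty() && state.size() < WINDOW_SIZE) {
                downstream.push(new ArrayList<>(state));
            }
        };
    }
}
We start with an empty window and use a fixed size of 3. The integrator evicts the oldest element once the window is full, adds the new element, and pushes a copy whenever the window holds exactly three elements. The finisher emits only when the stream ends before the first window fills up, which matches the behaviour of the built-in windowSliding() for short inputs.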
Let’s verify our implementation with the same inputs as the built-in Gatherer:
@Test
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
    List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
    Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
    List<List<Integer>> resultList = numbers.gather(new SlidingWindowGatherer())
        .toList();
    Assertions.assertEquals(3, resultList.size());
    Assertions.assertEquals(expectedOutput, resultList);
}
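The window size in SlidingWindowGatherer is fixed at 3. As a possible next step, the sketch below makes the size configurable and works for any element type. The sliding() factory and the ArrayDeque-based state are assumptions for illustration, and null elements aren’t supported because ArrayDeque and List.copyOf() reject them:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

class ConfigurableWindowExample {

    // Hypothetical factory method; `size` replaces the hard-coded window size.
    static <T> Gatherer<T, ?, List<T>> sliding(int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be positive");
        }
        return Gatherer.ofSequential(
            () -> new ArrayDeque<T>(size),
            (window, element, downstream) -> {
                if (window.size() == size) {
                    window.removeFirst();          // slide: drop the oldest element
                }
                window.addLast(element);
                // emit a snapshot only when the window is full
                return window.size() < size || downstream.push(List.copyOf(window));
            },
            (window, downstream) -> {
                // input shorter than the window: emit the partial window once
                if (!window.isEmpty() && window.size() < size) {
                    downstream.push(List.copyOf(window));
                }
            });
    }

    // Compares the sketch against the built-in gatherer for the same input.
    static boolean matchesBuiltIn(List<Integer> input, int size) {
        List<List<Integer>> custom = input.stream()
            .gather(ConfigurableWindowExample.<Integer>sliding(size))
            .toList();
        List<List<Integer>> builtIn = input.stream()
            .gather(Gatherers.windowSliding(size))
            .toList();
        return custom.equals(builtIn);
    }
}
```

Comparing against Gatherers.windowSliding() for a few inputs, as matchesBuiltIn() does, is a cheap way to check that a hand-written gatherer behaves like the built-in one.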
4. Conclusion
In this article, we first explored what the Gatherer API offers and the problem it solves: it gives intermediate Stream operations the kind of extensibility that collect() provides for terminal operations.
Next, we briefly touched on the different functions of the API and some of the built-in Gatherers available.
Finally, we implemented a few custom Gatherer implementations for different input-output relationships while looking at the different function implementations in more detail.
As always, the code is available over on GitHub. The post Stream Gatherers in Java first appeared on Baeldung.