1. Overview

Java’s Streams API is a powerful and versatile tool for processing data. By definition, a streaming operation is a single iteration through a set of data.

However, sometimes we want to process parts of the stream differently and get more than one set of results.

In this tutorial, we’ll learn how to split a stream into multiple groups and process them independently.

2. Using Collectors

A stream should be operated on only once and can have only one terminal operation. It can have multiple intermediate operations, but the data can be collected only once before the stream closes.

This means that the Streams API specification rules out forking a stream and applying different intermediate operations to each fork, since that would require multiple terminal operations on the same stream. However, we can split the stream inside the terminal operation itself, which produces a result divided into two or more groups.
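To make the single-use rule concrete, here is a minimal sketch of what happens when the same stream instance is given a second terminal operation. The class and method names are hypothetical, and the behaviour shown is that of the OpenJDK implementation, where the reuse is detected and rejected with an IllegalStateException:

```java
import java.util.List;
import java.util.stream.Stream;

class StreamReuseDemo {

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTerminalOperationFails() {
        Stream<String> names = List.of("Baeldung", "The Code").stream();
        names.forEach(name -> { }); // first terminal operation consumes the stream
        try {
            names.forEach(name -> { }); // second terminal operation on the same instance
            return false;
        } catch (IllegalStateException e) {
            return true; // the stream has already been operated upon or closed
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOperationFails());
    }
}
```

This is why the approaches in the rest of the tutorial do the splitting inside a single collect call rather than reusing the stream.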

2.1. Binary Split with partitioningBy

If we want to split a stream in two, we can use partitioningBy from the Collectors class. It takes a Predicate and returns a Map that groups elements that satisfied the predicate under the Boolean true key and the rest under false.

Let’s say we have a list of articles, each of which records the target site it should be posted on and whether it should be featured:

List<Article> articles = Lists.newArrayList(
  new Article("Baeldung", true),
  new Article("Baeldung", false),
  new Article("Programming Daily", false),
  new Article("The Code", false));
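The Article class itself isn't shown here. The snippets read its target and featured fields directly and compare instances by value in the assertions, so a minimal sketch could look like the following. This is an assumption about its shape, not the original class, and the wrapping ArticleDemo class is hypothetical:

```java
import java.util.Objects;

class ArticleDemo {

    // Hypothetical model class; field names follow the snippets in this tutorial.
    static class Article {
        final String target;
        final boolean featured;

        Article(String target, boolean featured) {
            this.target = target;
            this.featured = featured;
        }

        // Value-based equality, so two articles with the same data are equal.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Article)) return false;
            Article other = (Article) o;
            return featured == other.featured && Objects.equals(target, other.target);
        }

        @Override
        public int hashCode() {
            return Objects.hash(target, featured);
        }
    }

    public static void main(String[] args) {
        Article first = new Article("Baeldung", true);
        Article second = new Article("Baeldung", true);
        System.out.println(first.equals(second));
    }
}
```

Without equals and hashCode, assertions such as containsExactly would compare object identity and fail for freshly constructed articles. With such a class in place, the list above is ready to be split.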

We’ll divide it into two groups, one containing only Baeldung articles and the second one containing the rest:

Map<Boolean, List<Article>> groupedArticles = articles.stream()
  .collect(Collectors.partitioningBy(a -> a.target.equals("Baeldung")));

Let’s see which articles are filed under the true and false keys in the map:

assertThat(groupedArticles.get(true)).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get(false)).containsExactly(
  new Article("Programming Daily", false),
  new Article("The Code", false));
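partitioningBy also has an overload that accepts a downstream collector, which lets us process each half instead of just listing it. The sketch below counts the articles on each side of the split. The PartitionCountDemo class, its nested Article stand-in, and the countByBaeldung helper are hypothetical names used only for illustration:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionCountDemo {

    // Hypothetical stand-in for the tutorial's Article class.
    static class Article {
        final String target;
        final boolean featured;

        Article(String target, boolean featured) {
            this.target = target;
            this.featured = featured;
        }
    }

    // Counts Baeldung articles (true key) and all other articles (false key).
    static Map<Boolean, Long> countByBaeldung(List<Article> articles) {
        return articles.stream()
          .collect(Collectors.partitioningBy(
            a -> a.target.equals("Baeldung"),
            Collectors.counting()));
    }

    static List<Article> sampleArticles() {
        return List.of(
          new Article("Baeldung", true),
          new Article("Baeldung", false),
          new Article("Programming Daily", false),
          new Article("The Code", false));
    }

    public static void main(String[] args) {
        Map<Boolean, Long> counts = countByBaeldung(sampleArticles());
        System.out.println("Baeldung: " + counts.get(true));
        System.out.println("Other: " + counts.get(false));
    }
}
```

The map returned by partitioningBy always contains both the true and the false key, even when one side is empty, so both lookups are safe.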

2.2. Splitting with groupingBy

If we want to have more categories, then we need to use the groupingBy method. It takes a function that classifies each element into a group. Then it returns a Map that links each group classifier to a collection of its elements.

Let’s say we want to group articles by target site. The returned Map will have keys containing names of the sites and values containing collections of the articles associated with the given site:

Map<String, List<Article>> groupedArticles = articles.stream()
  .collect(Collectors.groupingBy(a -> a.target));
assertThat(groupedArticles.get("Baeldung")).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get("Programming Daily")).containsExactly(new Article("Programming Daily", false));
assertThat(groupedArticles.get("The Code")).containsExactly(new Article("The Code", false));
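As with partitioningBy, groupingBy accepts a downstream collector, so each group can be reduced rather than collected into a list. The following sketch counts articles per target site. The GroupCountDemo class, its nested Article stand-in, and the countByTarget helper are hypothetical:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupCountDemo {

    // Hypothetical stand-in for the tutorial's Article class.
    static class Article {
        final String target;
        final boolean featured;

        Article(String target, boolean featured) {
            this.target = target;
            this.featured = featured;
        }
    }

    // Maps each target site to the number of articles aimed at it.
    static Map<String, Long> countByTarget(List<Article> articles) {
        return articles.stream()
          .collect(Collectors.groupingBy(a -> a.target, Collectors.counting()));
    }

    static List<Article> sampleArticles() {
        return List.of(
          new Article("Baeldung", true),
          new Article("Baeldung", false),
          new Article("Programming Daily", false),
          new Article("The Code", false));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countByTarget(sampleArticles());
        System.out.println("Baeldung: " + counts.get("Baeldung"));
        System.out.println("Sites: " + counts.size());
    }
}
```

Unlike partitioningBy, the map only contains keys for groups that actually occur in the stream, so looking up a site with no articles returns null.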

3. Using teeing

Since Java 12, we have another option for the binary split: the teeing collector. teeing combines two collectors into one composite. Every element is processed by both of them, and their two results are then merged into a single return value using the provided merger function.

3.1. teeing with a Predicate

The teeing collector pairs nicely with another collector from the Collectors class called filtering. It takes a predicate and uses it to filter processed elements and then passes them to yet another collector.

Let’s divide articles into groups of Baeldung and non-Baeldung ones and count them. We’ll use the List.of factory method as the merger function:

List<Long> countedArticles = articles.stream().collect(Collectors.teeing(
  Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.counting()),
  Collectors.filtering(article -> !article.target.equals("Baeldung"), Collectors.counting()),
  List::of));
assertThat(countedArticles.get(0)).isEqualTo(2);
assertThat(countedArticles.get(1)).isEqualTo(2);

3.2. teeing with Overlapping Results

There is one important difference between this solution and the previous ones. The groups we created earlier had no overlap: each element from the source stream belonged to exactly one group. With teeing, we are no longer bound by this limitation because each collector potentially processes the whole stream. Let’s look at how we can take advantage of it.

We may want to process articles into two groups, one with featured articles only and the second one with Baeldung articles only. The resulting sets of articles may overlap as an article can be at the same time featured and targeted at Baeldung.

This time instead of counting, we’ll collect them into lists:

List<List<Article>> groupedArticles = articles.stream().collect(Collectors.teeing(
  Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.toList()),
  Collectors.filtering(article -> article.featured, Collectors.toList()),
  List::of));

assertThat(groupedArticles.get(0)).hasSize(2);
assertThat(groupedArticles.get(1)).hasSize(1);

assertThat(groupedArticles.get(0)).containsExactly(
  new Article("Baeldung", true),
  new Article("Baeldung", false));
assertThat(groupedArticles.get(1)).containsExactly(new Article("Baeldung", true));
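The merger function doesn't have to be List::of. Any function of the two collector results works, so we can return a small result type with named fields instead of relying on list positions. The sketch below assumes a hypothetical Split holder class and a hypothetical Article stand-in, and needs Java 12 or later for teeing:

```java
import java.util.List;
import java.util.stream.Collectors;

class TeeingSplitDemo {

    // Hypothetical stand-in for the tutorial's Article class.
    static class Article {
        final String target;
        final boolean featured;

        Article(String target, boolean featured) {
            this.target = target;
            this.featured = featured;
        }
    }

    // Hypothetical holder that names the two (possibly overlapping) groups.
    static class Split {
        final List<Article> baeldung;
        final List<Article> featured;

        Split(List<Article> baeldung, List<Article> featured) {
            this.baeldung = baeldung;
            this.featured = featured;
        }
    }

    // Both filtering collectors see every element; Split::new merges their results.
    static Split split(List<Article> articles) {
        return articles.stream().collect(Collectors.teeing(
          Collectors.filtering(a -> a.target.equals("Baeldung"), Collectors.toList()),
          Collectors.filtering(a -> a.featured, Collectors.toList()),
          Split::new));
    }

    static List<Article> sampleArticles() {
        return List.of(
          new Article("Baeldung", true),
          new Article("Baeldung", false),
          new Article("Programming Daily", false),
          new Article("The Code", false));
    }

    public static void main(String[] args) {
        Split result = split(sampleArticles());
        System.out.println("Baeldung: " + result.baeldung.size());
        System.out.println("Featured: " + result.featured.size());
    }
}
```

Named fields make the call site easier to read than get(0) and get(1), at the cost of one extra class.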

4. Using RxJava

While Java’s Streams API is a useful tool, sometimes it’s not enough. Other solutions, like reactive streams provided by RxJava, may be able to help us. Let’s look at a short example of how we can use an Observable and multiple Subscribers to achieve the same results as our Stream examples.

4.1. Creating an Observable

First, we need to create an Observable instance from our list of articles. We can use the Observable class’s from factory method. This is the RxJava 1.x API; in RxJava 2 and later, the equivalent method is named fromIterable:

Observable<Article> observableArticles = Observable.from(articles);

4.2. Filtering Observables

Next, we need to create Observables that will filter articles. To do that, we’ll use the filter method from the Observable class:

Observable<Article> baeldungObservable = observableArticles.filter(
  article -> article.target.equals("Baeldung"));
Observable<Article> featuredObservable = observableArticles.filter(
  article -> article.featured);

4.3. Creating Multiple Subscribers

Finally, we need to subscribe to the Observables and provide an Action that describes what we want to do with the articles. A real-world example would be saving them to a database or sending them to a client, but we’ll settle for adding them to a list:

List<Article> baeldungArticles = new ArrayList<>();
List<Article> featuredArticles = new ArrayList<>();
baeldungObservable.subscribe(baeldungArticles::add);
featuredObservable.subscribe(featuredArticles::add);

5. Conclusion

In this tutorial, we learned how to split streams into groups and process them separately. First, we looked at the older Streams API collectors: groupingBy and partitioningBy. Next, we used a newer approach based on the teeing collector introduced in Java 12. Finally, we looked at how we can use RxJava to achieve similar results with greater flexibility.

As always, the source code is available over on GitHub.
