1. Overview

Parallel operations on Kotlin collections let us process the elements of a collection concurrently, taking advantage of multiple processor cores to improve performance. This is especially useful for computationally intensive tasks such as filtering, mapping, and reduction.

In this article, we’ll discuss some approaches to perform parallel operations on Kotlin collections.

2. Parallel Operations on Collections

To explain how parallel operations work, we’ll use the following collection:

data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)

private val people = listOf(
  Person("Martin", 12),
  Person("Ahmad", 42),
  Person("Alina", 13),
  Person("Alice", 30),
  Person("Bob", 16),
  Person("Charlie", 40)
)

In our examples, we’ll set isAdult = true if age is greater than or equal to 18, and isAdult = false otherwise.

To make the parallel execution visible, we’ll also log each person after updating it. With our SLF4J logger setup, each log line includes the system time and the thread name:

private fun Person.setAdult() {
    this.isAdult = this.age >= 18
    logger.info(this.toString()) // the log pattern adds the time and thread name
}

We expect the result to contain only the people older than 15, sorted by age:

private fun List<Person>.assertOver15AndSortedByAge() {
    assertThat(this).containsExactly(
      Person("Bob", 16, false),
      Person("Alice", 30, true),
      Person("Charlie", 40, true),
      Person("Ahmad", 42, true)
    )
}

We’ll use this extension function List<Person>.assertOver15AndSortedByAge() to make sure each of our solutions behaves as expected.
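If we’d like to try the examples without SLF4J or AssertJ, we can use a small stand-in. The sketch below is our own assumption, not part of the original setup: it replaces logger.info() with println() plus the current thread’s name, replaces the AssertJ check with a plain equality check, and processes the list sequentially to give the baseline result that every parallel version should reproduce. The names samplePeople(), processSequentially(), and isOver15AndSortedByAge() are hypothetical:

```kotlin
data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)

// Stand-in for the SLF4J-based setAdult(): println() with the thread name
// replaces logger.info(), so no logging library is needed
fun Person.setAdult() {
    isAdult = age >= 18
    println("[${Thread.currentThread().name}] $this")
}

// Returns a fresh list each time, because setAdult() mutates the elements
fun samplePeople() = listOf(
    Person("Martin", 12),
    Person("Ahmad", 42),
    Person("Alina", 13),
    Person("Alice", 30),
    Person("Bob", 16),
    Person("Charlie", 40)
)

// Sequential baseline: same steps as the parallel versions, all on the calling thread
fun processSequentially(people: List<Person>): List<Person> =
    people
        .onEach { it.setAdult() }
        .filter { it.age > 15 }
        .sortedBy { it.age }

// Plain-Kotlin replacement for assertOver15AndSortedByAge();
// data class equality also compares isAdult
fun List<Person>.isOver15AndSortedByAge(): Boolean =
    this == listOf(
        Person("Bob", 16, false),
        Person("Alice", 30, true),
        Person("Charlie", 40, true),
        Person("Ahmad", 42, true)
    )
```

Any of the parallel pipelines in this article should produce a list for which isOver15AndSortedByAge() returns true.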

2.1. Using Coroutines

Coroutines are a good fit for this kind of work because they’re lightweight and non-blocking, and they let us run many tasks concurrently:

val filteredPeople = people
  .map { person ->
      async {
          person.setAdult()
          person
      }
  }
  .awaitAll()
  .filter { it.age > 15 }
  .sortedBy { it.age }

filteredPeople.assertOver15AndSortedByAge()

In people.map { person -> … }, we create a new coroutine for each person object using async { … }.

This allows the coroutines to execute concurrently with one another.

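In the log output, we can see each operation running in its own coroutine. Note that all of these coroutines share the main thread. Without an explicit dispatcher, async inherits the context of the enclosing coroutine scope, so this work is concurrent rather than parallel: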

13:03:44.484 [main @coroutine#1] INFO  - Using Coroutines
13:03:44.522 [main @coroutine#2] INFO  - Person(name=Martin, age=12, isAdult=false)
13:03:44.523 [main @coroutine#3] INFO  - Person(name=Ahmad, age=42, isAdult=true)
13:03:44.523 [main @coroutine#4] INFO  - Person(name=Alina, age=13, isAdult=false)
13:03:44.523 [main @coroutine#5] INFO  - Person(name=Alice, age=30, isAdult=true)
13:03:44.523 [main @coroutine#6] INFO  - Person(name=Bob, age=16, isAdult=false)
13:03:44.524 [main @coroutine#7] INFO  - Person(name=Charlie, age=40, isAdult=true)
13:03:44.529 [main @coroutine#1] INFO  - Total time taken: 40 ms

The awaitAll() call suspends until all the coroutines created in the map step have completed and returns their results, so the filteredPeople list contains the outcome of all the concurrent processing.
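Since async without a dispatcher runs in the caller’s context, we can pass a thread-pool-backed dispatcher such as Dispatchers.Default to spread the work across CPU cores. Here’s a minimal sketch, assuming kotlinx-coroutines-core is on the classpath; the processWithCoroutines() name and the println()-based setAdult() are our own stand-ins:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)

// Stand-in for the logging version: prints the thread that did the work
fun Person.setAdult() {
    isAdult = age >= 18
    println("[${Thread.currentThread().name}] $this")
}

fun processWithCoroutines(people: List<Person>): List<Person> = runBlocking {
    people
        .map { person ->
            // Dispatchers.Default moves each block off the runBlocking thread onto a
            // shared pool with as many threads as CPU cores (at least two)
            async(Dispatchers.Default) {
                person.setAdult()
                person
            }
        }
        .awaitAll() // suspends until every Deferred has completed
        .filter { it.age > 15 }
        .sortedBy { it.age }
}
```

With this change, the printed thread names come from the default dispatcher’s worker pool instead of main, while the result is the same as before.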

2.2. Using Kotlin Flow

Kotlin Flow, or simply Flow, is part of the kotlinx.coroutines library and is built on top of coroutines to handle streams of data asynchronously.

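We can use flatMapMerge() to process the elements of a Flow concurrently. It maps each element to an inner Flow and collects those inner Flows concurrently, merging their values into a single Flow: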

val filteredPeople = people.asFlow()
  .flatMapMerge { person ->
      flow {
          emit(
            async {
                person.setAdult()
                person
            }.await()
          )
      }
  }
  .filter { it.age > 15 }
  .toList()
  .sortedBy { it.age }

filteredPeople.assertOver15AndSortedByAge()

The code processes each person in the people list concurrently. Again, the log shows a separate coroutine for each person, all running on the main thread:

13:03:44.706 [main @coroutine#8] INFO  - Using Kotlin Flow
13:03:44.738 [main @coroutine#16] INFO  - Person(name=Martin, age=12, isAdult=false)
13:03:44.739 [main @coroutine#17] INFO  - Person(name=Ahmad, age=42, isAdult=true)
13:03:44.739 [main @coroutine#18] INFO  - Person(name=Alina, age=13, isAdult=false)
13:03:44.739 [main @coroutine#19] INFO  - Person(name=Alice, age=30, isAdult=true)
13:03:44.739 [main @coroutine#20] INFO  - Person(name=Bob, age=16, isAdult=false)
13:03:44.739 [main @coroutine#21] INFO  - Person(name=Charlie, age=40, isAdult=true)
13:03:44.748 [main @coroutine#8] INFO  - Total time taken: 41 ms

Note, however, that flatMapMerge() is marked as experimental in Kotlin Coroutines: its API isn’t stable yet and may change in future versions. To use it, we must opt in with an annotation:

@OptIn(ExperimentalCoroutinesApi::class)

We can add this annotation to the enclosing function or class, or opt in for a whole file with @file:OptIn(ExperimentalCoroutinesApi::class).
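The async { … }.await() call inside the inner flow isn’t needed for concurrency, because flatMapMerge() already collects each inner flow in its own coroutine. Like plain async, though, those coroutines run in the collector’s context unless we change the dispatcher. Here’s a minimal sketch, assuming kotlinx-coroutines-core is on the classpath, that emits each person directly, runs each inner flow on the default thread pool with flowOn(Dispatchers.Default), and sets the concurrency parameter of flatMapMerge() explicitly. The processWithFlow() name and the default value of 4 are our own choices:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)

// Stand-in for the logging version: prints the thread that did the work
fun Person.setAdult() {
    isAdult = age >= 18
    println("[${Thread.currentThread().name}] $this")
}

@OptIn(ExperimentalCoroutinesApi::class) // flatMapMerge() requires opting in
fun processWithFlow(people: List<Person>, concurrency: Int = 4): List<Person> = runBlocking {
    people.asFlow()
        // At most `concurrency` inner flows are collected at the same time
        .flatMapMerge(concurrency) { person ->
            flow {
                person.setAdult()
                emit(person)
            }.flowOn(Dispatchers.Default) // run the inner flow's body on the default pool
        }
        .filter { it.age > 15 }
        .toList() // values arrive in completion order, not input order
        .sortedBy { it.age }
}
```

Because the merged values arrive in whatever order the inner flows finish, the final sortedBy() is what makes the result deterministic.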

2.3. Using RxJava or RxKotlin

RxJava is the JVM implementation of ReactiveX (Reactive Extensions), a library for reactive programming. RxKotlin adds Kotlin extension functions on top of RxJava:

val filteredPeople = Observable.fromIterable(people)
  .flatMap(
      {
          Observable.just(it)
            .subscribeOn(Schedulers.computation())
            .doOnNext { person -> person.setAdult() }
      },
      people.size // maxConcurrency: at most this many inner Observables at once
  )
  .filter { it.age > 15 }
  .toList()
  .map { it.sortedBy { person -> person.age } }
  .blockingGet()

filteredPeople.assertOver15AndSortedByAge()

First, we convert the people list into an Observable:

Observable.fromIterable(people)

Alternatively, RxKotlin provides a more concise extension function:

people.toObservable()

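The flatMap() maps each person emitted by the Observable to a new inner Observable that emits the same person. Because each inner Observable is subscribed on Schedulers.computation(), its doOnNext() callback runs on a thread from the computation pool, so the elements are processed in parallel.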

To keep the parallelism under control, we should set the maxConcurrency parameter of flatMap() explicitly. It limits how many inner Observables are subscribed at the same time, which keeps resource utilization predictable.
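Here’s a minimal sketch of the same pipeline written with RxKotlin’s toObservable(), assuming RxJava 3 and RxKotlin 3 (package io.reactivex.rxjava3.kotlin) are on the classpath. It sets maxConcurrency to 2, an arbitrary value chosen for illustration, so at most two inner Observables are active at once. The processWithRx() name and the println()-based setAdult() are our own stand-ins:

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.kotlin.toObservable
import io.reactivex.rxjava3.schedulers.Schedulers

data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)

// Stand-in for the logging version: prints the thread that did the work
fun Person.setAdult() {
    isAdult = age >= 18
    println("[${Thread.currentThread().name}] $this")
}

fun processWithRx(people: List<Person>, maxConcurrency: Int = 2): List<Person> =
    people.toObservable()
        .flatMap(
            { person ->
                // Each inner Observable is subscribed on a computation thread
                Observable.just(person)
                    .subscribeOn(Schedulers.computation())
                    .doOnNext { it.setAdult() }
            },
            maxConcurrency // at most this many inner Observables are active at once
        )
        .filter { it.age > 15 }
        .toList()      // Single<List<Person>>
        .blockingGet() // blocks the calling thread until all elements are collected
        .sortedBy { it.age }
```

A lower maxConcurrency trades throughput for fewer simultaneously busy threads; passing people.size, as in the example above, lets every element start at once.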

In the log output, we can see each operation running on a different thread:

13:03:44.691 [main] INFO  - Using RxKotlin
13:03:44.695 [RxComputationThreadPool-7] INFO  - Person(name=Martin, age=12, isAdult=false)
13:03:44.695 [RxComputationThreadPool-8] INFO  - Person(name=Ahmad, age=42, isAdult=true)
13:03:44.695 [RxComputationThreadPool-1] INFO  - Person(name=Alina, age=13, isAdult=false)
13:03:44.695 [RxComputationThreadPool-3] INFO  - Person(name=Bob, age=16, isAdult=false)
13:03:44.695 [RxComputationThreadPool-2] INFO  - Person(name=Alice, age=30, isAdult=true)
13:03:44.695 [RxComputationThreadPool-4] INFO  - Person(name=Charlie, age=40, isAdult=true)
13:03:44.696 [main] INFO  - Total time taken: 4 ms

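The different thread names show that the operations run in parallel. Since flatMap() emits results as the inner Observables complete, their order isn’t guaranteed, so sorting afterwards also gives us a deterministic order.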

2.4. Using Java Stream API

Java 8 introduced the Stream API, a powerful mechanism for processing collections of data in a declarative, functional style.

Every Collection type (such as List or Set) provides parallelStream(), which creates a parallel Stream from the elements of the Collection:

val filteredPeople = people.parallelStream()
  .map { person ->
      person.setAdult()
      person
  }.filter { it.age > 15 }
  .sorted { p1, p2 -> p1.age.compareTo(p2.age) }
  .collect(Collectors.toList())

filteredPeople.assertOver15AndSortedByAge()

When we call parallelStream(), the elements of the Collection are divided into several sub-Stream instances.

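Each sub-Stream is then processed concurrently, either on a worker thread of ForkJoinPool.commonPool() or on the calling thread itself, which is why main also appears in the log: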

13:03:44.683 [main] INFO  - Using Stream API
13:03:44.688 [main] INFO  - Person(name=Alice, age=30, isAdult=true)
13:03:44.688 [ForkJoinPool.commonPool-worker-1] INFO  - Person(name=Ahmad, age=42, isAdult=true)
13:03:44.688 [ForkJoinPool.commonPool-worker-2] INFO  - Person(name=Charlie, age=40, isAdult=true)
13:03:44.688 [ForkJoinPool.commonPool-worker-4] INFO  - Person(name=Bob, age=16, isAdult=false)
13:03:44.688 [main] INFO  - Person(name=Alina, age=13, isAdult=false)
13:03:44.688 [ForkJoinPool.commonPool-worker-3] INFO  - Person(name=Martin, age=12, isAdult=false)
13:03:44.689 [main] INFO  - Total time taken: 5 ms

Finally, the results from each sub-Stream are combined to produce the terminal result of the Stream operations.
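Even though the log shows the elements being processed out of order, the combined result respects the encounter order of the source list: people is an ordered collection, and Collectors.toList() preserves that order. The sketch below uses only the JDK to check this and to record which threads took part. The adultsInParallel() name and the thread-name set are our own additions, and it sets isAdult directly instead of logging:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Collectors

data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)

// Processes people with a parallel stream and returns the results together
// with the names of the threads that handled them
fun adultsInParallel(people: List<Person>): Pair<List<Person>, Set<String>> {
    // Thread-safe set, because several threads add to it at the same time
    val threadNames: MutableSet<String> = ConcurrentHashMap.newKeySet<String>()
    val processed: List<Person> = people.parallelStream()
        .map { person ->
            threadNames.add(Thread.currentThread().name)
            person.isAdult = person.age >= 18
            person
        }
        // The list follows the source order, whatever order the elements were processed in
        .collect(Collectors.toList())
    return processed to threadNames
}
```

For a large enough input, the returned set typically contains several ForkJoinPool.commonPool worker names along with the calling thread.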

2.5. Using ExecutorService

Now we’ll use ExecutorService, an interface in Java that provides a way to execute tasks (Runnable or Callable) asynchronously.

First, we create a thread pool with one thread per element of people:

val executor = Executors.newFixedThreadPool(people.size)

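Then we call map{} to apply a lambda expression to each element (person) in people. We use the lambda expression to create a new Callable object and submit() it to the executor. Only after all the tasks are submitted do we call get() on the resulting Future objects: get() blocks until its task completes, so calling it right after each submit() would make every task wait for the previous one:

val futures = people
  .map { person ->
      // submit() returns a Future immediately, so all tasks can run concurrently
      executor.submit(Callable {
          person.setAdult()
          person
      })
  }

// get() blocks until the corresponding task has finished
val filteredPeople = futures
  .map { it.get() }
  .filter { it.age > 15 }
  .sortedBy { it.age }

filteredPeople.assertOver15AndSortedByAge()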

We can check the log again to see that several pool threads were used:

13:03:44.700 [main] INFO  - Using ExecutorService
13:03:44.701 [pool-2-thread-1] INFO  - Person(name=Martin, age=12, isAdult=false)
13:03:44.701 [pool-2-thread-2] INFO  - Person(name=Ahmad, age=42, isAdult=true)
13:03:44.702 [pool-2-thread-3] INFO  - Person(name=Alina, age=13, isAdult=false)
13:03:44.702 [pool-2-thread-4] INFO  - Person(name=Alice, age=30, isAdult=true)
13:03:44.702 [pool-2-thread-5] INFO  - Person(name=Bob, age=16, isAdult=false)
13:03:44.702 [pool-2-thread-6] INFO  - Person(name=Charlie, age=40, isAdult=true)
13:03:44.703 [main] INFO  - Total time taken: 2 ms

Finally, we’ll stop the thread pool by calling shutdown():

executor.shutdown()

This ensures the executor releases the resources it holds.
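As a variant, ExecutorService.invokeAll() submits a whole collection of Callable tasks and returns their Future objects once all of them have completed, in the same order as the tasks. The sketch below uses it, sizes the pool to the smaller of the list size and the number of available processors (our own choice, not a requirement), and calls shutdown() in a finally block so the threads are released even if a task fails. The processWithExecutor() name is hypothetical, and the tasks set isAdult directly instead of logging:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)

fun processWithExecutor(people: List<Person>): List<Person> {
    // One thread per element, but no more than the available processors;
    // newFixedThreadPool() rejects a size of 0, hence coerceAtLeast(1)
    val poolSize = minOf(people.size, Runtime.getRuntime().availableProcessors()).coerceAtLeast(1)
    val executor = Executors.newFixedThreadPool(poolSize)
    try {
        val tasks = people.map { person ->
            Callable {
                person.isAdult = person.age >= 18
                person
            }
        }
        // invokeAll() blocks until every task has finished;
        // the futures come back in the same order as the tasks
        return executor.invokeAll(tasks)
            .map { it.get() }
            .filter { it.age > 15 }
            .sortedBy { it.age }
    } finally {
        executor.shutdown()
    }
}
```

Since invokeAll() already waits for completion, every get() call here returns immediately with the task’s result (or rethrows its exception).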

3. Conclusion

In this tutorial, we discussed various approaches to perform parallel operations on Kotlin collections.

Coroutines and Kotlin Flow handle this well in an expressive, idiomatic Kotlin style. If we prefer a third-party library, RxJava and RxKotlin are mature and reliable alternatives. Finally, the JDK itself offers APIs for this, such as the Stream API and ExecutorService.

As always, the source code for the examples is available over on GitHub.
