Donovan LaDuke - Developer

Reactive Programming in Kotlin - Flow


Featured in Android Weekly Issue 596

After covering the basics of reactive programming, there is no better place to start in code than with the Flow type. This article does not aim to make the reader an expert in Flows; it aims to leave the reader proficient enough to understand why and when to use Flows and to read code written with them. The code sample below will be referenced, reiterated, and remixed throughout the article, in the hope that the reader fully understands it by the end.

flow {
  emit(1)
  emit(2)
}.map { value ->
  "Emitted Value $value"
}.collect { value -> println(value) }

Definitions #

A Flow represents a stream of sequential, asynchronously generated values. This contrasts with suspending functions, which return a single asynchronous value, and with collections, which hold a sequential set of values that are not generated asynchronously. The values of a Flow are accessed by "collecting" them, usually through a terminal operator (more on those later). A Flow "emits" values; that is, it makes them available to be collected by adding them to the stream. After the Flow has been created but before it has been terminated, a "chain" of "operators" can be applied to it to react to and manipulate its emissions. A Flow is "cold", which means that no values are emitted until a terminal operator is applied. As a result, a Flow object can be passed around with less concern about wasting resources.

flow {
  emit(1) // <-- Emitting Values
  emit(2)
}.map { value -> // <-- map Operator
  "Emitted Value $value"
}.collect { value -> println(value) } // <-- Collecting Values
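
To make the "cold" behavior concrete, here is a minimal sketch that records the order in which things happen. It assumes kotlinx.coroutines 1.6 or later is on the classpath; the helper name coldFlowEvents is hypothetical and exists only for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records events in order to show that a Flow is cold.
fun coldFlowEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    val values: Flow<String> = flow {
        events += "builder started" // runs only once a terminal operator is applied
        emit(1)
        emit(2)
    }.map { value -> "Emitted Value $value" }

    events += "flow created" // at this point nothing has been emitted yet
    values.collect { value -> events += value } // terminal operator starts the Flow
    events
}

fun main() {
    coldFlowEvents().forEach(::println)
}
```

In this sketch "flow created" is recorded before "builder started", because the builder closure does not run until collect is called.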

Flow Builders #

A Flow is started with a builder function, which defines what values will be emitted and how. The two simplest are flowOf(...), which takes a vararg parameter and emits each argument in the resulting Flow, and .asFlow(), which emits all the values from its receiver (e.g. a collection or range). The other common and more powerful option is the flow {} function. It takes a closure as its parameter, which allows you to write whatever code you want inside while using the emit() function to add elements to the resulting Flow. Note that because a Flow is cold, the closure is not run until a terminal operator is applied. While these builders are very useful, in practice it is often more common to consume a Flow created by a library such as Room, which can expose your SQL query results as a Flow.

flow { // <-- Builder
  emit(1)
  emit(2)
}.map { value -> 
  "Emitted Value $value"
}.collect { value -> println(value) } 

(0..10).asFlow()

flowOf("Hello", "Flow", "World", "!")
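
As a rough sketch of the three builders side by side, the helpers below collect each Flow into a List with toList so the emitted values can be inspected. The function names are hypothetical, and kotlinx.coroutines is assumed to be available.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helpers, one per builder; each collects its Flow into a List.

fun fromFlowBuilder(): List<Int> = runBlocking {
    flow {
        // Arbitrary code can run here; emit() adds a value to the stream.
        for (i in 1..3) emit(i * 10)
    }.toList()
}

fun fromAsFlow(): List<Int> = runBlocking {
    (0..3).asFlow().toList() // emits every value of the range
}

fun fromFlowOf(): List<String> = runBlocking {
    flowOf("Hello", "Flow").toList() // emits each argument in order
}

fun main() {
    println(fromFlowBuilder()) // [10, 20, 30]
    println(fromAsFlow())      // [0, 1, 2, 3]
    println(fromFlowOf())      // [Hello, Flow]
}
```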

Intermediate Operators #

Now that a Flow has been created, its values can be manipulated and reacted to using operators. The special class of operators known as terminal operators will be handled next; for now, the focus is on intermediate operators. These operators can limit the values, transform the values, emit additional values, and perform other important operations on the Flow. Each intermediate operator returns a new Flow, which allows an underlying Flow to be reused: different chains of operators produce different resulting emissions from the same source.

Examples #

flow {
  emit(1)
  emit(2)
}.map { value -> // <-- Intermediate Operator
  "Emitted Value $value"
}.collect { value -> println(value) } 

(0..10).asFlow()
  .onStart { emit(11) }
  .filter { it % 2 != 0 }
  .onEach { println("Odd Number $it") }
  .collect { ... }
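
The sketch below runs a chain like the one above and collects the result into a List, then shows the same source Flow reused with two different operator chains. The helper names are hypothetical, and toList stands in for the elided collect body.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: onStart emits 11 before the range, filter keeps odd values.
fun oddNumbers(): List<Int> = runBlocking {
    (0..10).asFlow()
        .onStart { emit(11) }
        .filter { it % 2 != 0 }
        .toList()
}

// Hypothetical helper: one source Flow, two independent operator chains.
fun reusedSource(): Pair<List<Int>, List<String>> = runBlocking {
    val source = (1..5).asFlow()
    val doubledEvens = source.filter { it % 2 == 0 }.map { it * 2 }
    val firstLabels = source.take(2).map { "Item $it" }
    doubledEvens.toList() to firstLabels.toList()
}

fun main() {
    println(oddNumbers())   // [11, 1, 3, 5, 7, 9]
    println(reusedSource()) // ([4, 8], [Item 1, Item 2])
}
```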

Terminal Operators #

The final operators are terminal operators, which take a Flow and make its values available for use by activating the cold Flow. The first type is canceling operators, which retrieve a value or set of values and then cancel the underlying Flow, ending its emissions. The most common operator of this type is first, which receives the first element emitted, then cancels the Flow and receives no more values. The other type is non-canceling operators, which continue to retrieve values from the Flow until it completes or is canceled by some other mechanism. The most common operator of this type is collect, which runs its closure for each value received.

flow {
  emit(1)
  emit(2)
}.map { value -> 
  "Emitted Value $value"
}.collect { value -> println(value) } // Terminal Operator

// x equals 0
val x = (0..10).asFlow()
  .first() 
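
To illustrate the difference between the two types, this sketch logs what the builder does when first is applied and compares it with collect. The helper names are hypothetical; the behavior shown assumes the standard kotlinx.coroutines implementations of first and collect.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: shows that first() cancels the Flow after one value.
fun firstCancelsFlow(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        for (i in 0..10) {
            events += "emitting $i"
            emit(i)
        }
        events += "builder finished" // not reached when first() cancels the Flow
    }
    val x = numbers.first()
    events += "first() returned $x"
    events
}

// Hypothetical helper: collect keeps receiving values until the Flow completes.
fun collectReceivesAll(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    (0..10).asFlow().collect { received += it }
    received
}

fun main() {
    println(firstCancelsFlow())   // [emitting 0, first() returned 0]
    println(collectReceivesAll()) // [0, 1, 2, ..., 10]
}
```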

Dispatchers #

Operator closures and terminal operators are suspending functions, which is how they handle the asynchronous nature of Flows. The dispatcher in use where the terminal operator is called is used throughout the Flow chain unless another dispatcher is selected. To change the dispatcher, apply the flowOn operator. Importantly, flowOn applies to the operators before it in the chain, up to the previous flowOn call; it does not apply to operators further down the chain.

flow { ... }
  .map { ... } // <-- On IO Dispatcher
  .flowOn(Dispatchers.IO)
  .filter { ... } // <-- On Main Dispatcher
  .flowOn(Dispatchers.Main)
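
The direction in which flowOn applies can be checked by recording thread names. The sketch below is an assumption-laden variant of the sample above: it runs on the JVM under runBlocking rather than on Android, so Dispatchers.Main is replaced by whatever thread calls the function, and ThreadReport and threadReport are hypothetical names.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical holder for the thread names seen at each stage.
data class ThreadReport(val upstream: String, val downstream: String, val collector: String)

fun threadReport(): ThreadReport = runBlocking {
    var upstream = ""
    var downstream = ""
    flow {
        upstream = Thread.currentThread().name // before flowOn: runs on Dispatchers.IO
        emit(1)
    }
        .flowOn(Dispatchers.IO)
        .map {
            downstream = Thread.currentThread().name // after flowOn: runs in the collector's context
            it
        }
        .collect { }
    ThreadReport(upstream, downstream, Thread.currentThread().name)
}

fun main() {
    println(threadReport())
}
```

Exact thread names vary by environment, but the builder should report a worker thread while the map closure reports the same thread as the collector.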

Handling Exceptions #

An exception in a Flow chain will cancel the Flow and expose the exception to the block where the terminal operator resides. This allows that block to wrap the Flow in a try/catch and proceed as desired in the case of an exception. If the Flow itself should handle the exception, this can be done with the catch operator or with a try/catch inside another operator. The catch operator only handles exceptions from the operators above it in the chain. If the catch operator is used, the Flow is still terminated, but additional emissions can be performed in the closure. If a try/catch is used inside another operator and the exception is handled there, the Flow will continue as normal instead of canceling.

flow {
  emit(1)
  throw Exception("Something Went Wrong")
  emit(2) // <-- Never reached
}.map { value -> 
  "Emitted Value $value"
}.catch { emit("An Exception Occurred") }
.collect { value -> println(value) } 

(0..10).asFlow()
  .map {
    try {
      10 / it
    } catch(e: ArithmeticException) {
      0 // Return 0 to avoid Divide by 0
    }
  }.collect { ... }
// 10 / 0 is replaced with 0 and the Flow keeps emitting
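
The three ways of handling an exception described above can be sketched as follows. The helper names are hypothetical, IllegalStateException is an arbitrary choice for the illustration, and toList is used so the emissions can be inspected.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// catch operator: the Flow still ends, but one more value is emitted first.
fun withCatchOperator(): List<String> = runBlocking {
    flow {
        emit(1)
        throw IllegalStateException("Something Went Wrong")
    }
        .map { value -> "Emitted Value $value" }
        .catch { emit("An Exception Occurred") }
        .toList()
}

// try/catch inside an operator: the exception is handled and the Flow continues.
fun withTryCatchInOperator(): List<Int> = runBlocking {
    (0..2).asFlow()
        .map { divisor ->
            try {
                10 / divisor
            } catch (e: ArithmeticException) {
                0 // fall back to 0 when dividing by zero
            }
        }
        .toList()
}

// try/catch around the terminal operator: the exception reaches the caller.
fun withTryCatchAroundCollect(): String = runBlocking {
    try {
        flow<Int> { throw IllegalStateException("boom") }.collect { }
        "completed"
    } catch (e: IllegalStateException) {
        "caught: ${e.message}"
    }
}

fun main() {
    println(withCatchOperator())         // [Emitted Value 1, An Exception Occurred]
    println(withTryCatchInOperator())    // [0, 10, 5]
    println(withTryCatchAroundCollect()) // caught: boom
}
```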

Combining and Flattening Flows #

Everything covered so far has shown the power of creating, transforming, and reacting to Flows, but how can Flows be brought together? The first way is by combining Flows using joining operators like combine. This brings the emissions of two different Flows together: whenever either Flow emits, a function maps the most recent value from each into a single result. The other way is by creating one Flow from the emissions of another using operators like flatMapLatest. Here each value from the first Flow is mapped to a new Flow; the result is a single Flow made up of the values of those inner Flows, and when a new value arrives, the previous inner Flow is canceled.

val flow1 = flow {
  emit(1)
  emit(2)
}

val flow2 = (0..10).asFlow()

// Combine
combine(flow1, flow2) { val1, val2 -> val1 + val2 }

// Flat Map
flow1.flatMapLatest { value -> flow2.onStart { emit(value) } }
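
Because the output of these operators depends on timing, the sketch below adds delays so the interleaving is predictable. The delay values and helper names are assumptions made for this illustration only. Note that flatMapLatest is marked with ExperimentalCoroutinesApi in kotlinx.coroutines, hence the opt-in.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// combine: emits whenever either Flow emits, using the latest value of each.
fun combineLatestValues(): List<String> = runBlocking {
    val numbers = flow {
        emit(1)
        delay(100)
        emit(2)
    }
    val letters = flow {
        delay(50)
        emit("A")
        delay(100)
        emit("B")
    }
    combine(numbers, letters) { number, letter -> "$number$letter" }.toList()
}

// flatMapLatest: a new upstream value cancels the inner Flow of the previous one.
@OptIn(ExperimentalCoroutinesApi::class)
fun flatMapLatestValues(): List<String> = runBlocking {
    flow {
        emit(1)
        delay(100)
        emit(2)
    }.flatMapLatest { value ->
        flow {
            emit("$value start")
            delay(200) // longer than the upstream gap, so the first inner Flow is cut short
            emit("$value done")
        }
    }.toList()
}

fun main() {
    println(combineLatestValues())
    println(flatMapLatestValues())
}
```

With these delays, the combined result should be 1A, 2A, 2B, and the flatMapLatest result should be 1 start, 2 start, 2 done: "1 done" is never emitted because the first inner Flow is canceled when 2 arrives.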

Conclusion #

This only scratches the surface of all the powerful things that can be accomplished with Flows. A great resource for the different Flow operations is Flow Marbles, which is useful both for seeing the available options and for exploring how they impact the Flow when they are applied. The next topic covered will be a child of Flow, SharedFlow. Until next time, thanks!
