LiveData, Flow, Channel... why do we need all these pipes?

What you will learn:

  • Observer pattern
  • Cold and hot streams
  • Synchronous vs asynchronous code
  • LiveData
  • Channel
  • Flow

Well... it all began with the reactive programming paradigm. Reactive programming started on the .NET platform and became popular when Netflix ported it to Java, naming it RxJava. It introduced a completely new way of thinking about our programming model, famously phrased as:

everything is a stream, and it’s observable

But I like to call it: Don’t fetch the data, it will come to you.

The rest is history! Nowadays every app has reactive patterns, and in this modern age of streams there are many, many ways to produce and consume a stream of data. We will review the APIs and characteristics of three of them: LiveData, Flow, and Channel. But before a deep dive into the pipe system, we need a few base concepts: the observer pattern, cold and hot streams, and synchronous vs asynchronous code.

Observer Pattern

The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods.

import java.util.Scanner

// An observer is just a function that receives each event.
typealias Observer = (event: String) -> Unit

class EventSource {
    private val observers = mutableListOf<Observer>()

    private fun notifyObservers(event: String) {
        observers.forEach { it(event) }
    }

    fun addObserver(observer: Observer) {
        observers += observer
    }

    // Reads standard input line by line and pushes every line to the observers.
    fun scanSystemIn() {
        val scanner = Scanner(System.`in`)
        while (scanner.hasNextLine()) {
            notifyObservers(scanner.nextLine())
        }
    }
}

fun main() {
    println("Enter Text: ")
    val eventSource = EventSource()

    eventSource.addObserver { event ->
        println("Received response: $event")
    }

    eventSource.scanSystemIn()
}

Here EventSource acts as the Observable: it emits events to whoever subscribes (the observers). The main function adds one observer, which is notified whenever EventSource emits an event.

Why do I share this with you? Because I want you to be cool and know that all channels and pipes are just different implementations of this observer pattern.

🍨Cold or Hot 🔥

When you emit data you have two options:

  1. Emit data no matter if you have observers.
  2. Emit data only if you have active observers to listen.👂

The first option is called a *HOT* stream. It suits data that keeps coming whether or not anyone is watching, such as changes in your location while driving.

The second is *COLD*. It suits data that must not be missed, for example the single response of a network request.

Remember it this way: hot streams will melt any container, so they cannot be held, while cold ones are good to store. They are actually frozen, so you can use the data whenever you need it.
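
To make the two options concrete, here is a minimal sketch in plain Kotlin, without any stream library. The names coldNumbers and HotSource are made up for this illustration. The cold producer is a lazy Sequence that starts from scratch for every consumer, while the hot one pushes values to whoever happens to be listening at that moment.

```kotlin
// Counts how many times the cold producer body has been started.
var coldRuns = 0

// Cold: nothing runs until someone iterates, and every consumer gets its own full run.
fun coldNumbers(): Sequence<Int> = sequence {
    coldRuns++
    yield(1)
    yield(2)
    yield(3)
}

// Hot: values are pushed immediately, whether or not anyone listens.
class HotSource {
    private val listeners = mutableListOf<(Int) -> Unit>()

    fun listen(listener: (Int) -> Unit) {
        listeners += listener
    }

    fun emit(value: Int) = listeners.forEach { it(value) }
}

// Emits 1 with no listener attached, then attaches a listener and emits 2 and 3.
fun lateListenerSees(): List<Int> {
    val source = HotSource()
    val seen = mutableListOf<Int>()
    source.emit(1)                 // nobody is listening, so this value is lost
    source.listen { seen += it }
    source.emit(2)
    source.emit(3)
    return seen
}
```

Calling coldNumbers().toList() twice runs the producer body twice and yields 1, 2, 3 both times, while lateListenerSees() returns only 2 and 3 because the listener arrived after the first value was emitted.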

Synchronous vs Asynchronous Code

Sorry, another foundation you need to know 😑

  • Synchronous or sequential code means that the code is executed line by line and it is blocking. The computer blocks on each statement until the work is complete before moving on to the next one. If you have a long-running operation you need to wait until it ends, so you are blocked (and bored). This is called synchronous execution:
for (value in values) {
    println(value)
}
  • Asynchronous or concurrent code, on the other hand, is non-blocking: you start something and carry on with other things without waiting for it to complete. This is like starting another application from yours and expecting the result at some point in time, whenever it is ready, not whenever you need it 😠. The basis of asynchronous programming is concurrency and parallelism. Bonus: I won't explain the difference.
val one = async { doSomethingUsefulOne() } // not blocking!!
val two = async { doSomethingUsefulTwo() } // not blocking!!
/**
I am not blocked, so I can continue with my code. But instead I will sing.. Whenever I need the result I just need to call await().
**/

singASongHere()
// Take the result if ready, if not - well you are blocked!!
val sum = one.await() + two.await()

Any long-running operation should be asynchronous, because actively waiting for it to end can freeze your program.
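
If you want to try the async snippet yourself, here is a small self-contained sketch using kotlinx.coroutines. The two doSomethingUseful functions and their return values are made up for the example, and delay() only stands in for real work.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay

// Hypothetical slow operations; delay() stands in for real work.
suspend fun doSomethingUsefulOne(): Int {
    delay(100)
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(100)
    return 29
}

// Starts both operations concurrently and suspends only when the results are needed.
suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}
```

Calling runBlocking { concurrentSum() } gives 42. Because both operations run at the same time, the wait is roughly the length of one delay rather than two.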

You can use Collections or Sequences for synchronous streams. But you need a different solution for asynchronous streams. So here come LiveData, Flow, and Channel.

🍧 LiveData

LiveData is a reactive way to communicate between the ViewModel and the View in Android. The View subscribes to the data, and the ViewModel emits it whenever it is available. It has a few main characteristics:

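  • It stores the last received value.
  • It behaves like a cold stream toward the View: nothing is delivered until an active observer is listening.
  • It holds only one value at a time (several observers may watch it, and each gets the latest value).
  • It subscribes and unsubscribes according to the View state, so it prevents memory leaks.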

LiveData is cold. It won't emit values to the View if the View is not subscribed or not active (for example, while it is in the background).

If you send data to LiveData it will store it, but in the following manner: if your view is inactive and you emit a few values during this time, let's say 1, 2, 3, then when the view becomes active again it will receive only 3. So data is dropped along the way. This is very important and often misunderstood by Android developers. Remember: LiveData stores only the last received value, and it drops everything else that was sent while the view was inactive. ❗️
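
The dropping behaviour is easier to believe when you see it. Below is a toy model in plain Kotlin, NOT the real androidx LiveData. LatestValueHolder and setActive are names I made up, and setActive simply stands in for the view going to the background and coming back.

```kotlin
// Toy model only: it imitates how LiveData conflates values for an inactive observer.
// It is not the androidx implementation.
class LatestValueHolder<T : Any> {
    private var latest: T? = null
    private var pending = false
    private var active = false
    private var observer: ((T) -> Unit)? = null

    fun observe(observer: (T) -> Unit) {
        this.observer = observer
        deliverIfPossible()
    }

    // Stands in for the view moving between foreground and background.
    fun setActive(isActive: Boolean) {
        active = isActive
        deliverIfPossible()
    }

    fun setValue(value: T) {
        latest = value      // overwrites whatever was stored before
        pending = true
        deliverIfPossible()
    }

    private fun deliverIfPossible() {
        val current = latest ?: return
        val target = observer ?: return
        if (active && pending) {
            pending = false
            target(current)
        }
    }
}

fun valuesSeenAfterPause(): List<Int> {
    val holder = LatestValueHolder<Int>()
    val seen = mutableListOf<Int>()
    holder.observe { seen += it }
    holder.setActive(true)
    holder.setValue(0)          // delivered right away
    holder.setActive(false)     // view goes inactive
    holder.setValue(1)
    holder.setValue(2)
    holder.setValue(3)
    holder.setActive(true)      // view is active again
    return seen
}
```

valuesSeenAfterPause() returns 0 and 3: the values 1 and 2 were overwritten while the observer was inactive and never reach it.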

Before LiveData was released, we used RxJava's BehaviorSubject inside the ViewModel to emit view states. The moment you subscribe, you receive the latest value sent. BUT it wasn't lifecycle aware, so in order to stop emitting data while my view was paused, I had to manually subscribe in onResume and unsubscribe in onPause. What about onDestroy!? You name it…

LiveData is an enhanced solution. It also stores only the last value sent, BUT it observes the lifecycle of the activity / fragment and does the subscribe and unsubscribe calls for you: an observer counts as active only while its lifecycle is at least STARTED, and it is removed automatically when the lifecycle is destroyed. So your view won't receive values while it is stopped. (Recall those "Can not perform this action after onSaveInstanceState" exceptions? With LiveData you won't have them anymore.)

LiveData stores only ONE value at a time. Every active observer receives that latest value.

Another specific of LiveData is that setValue() must be called on the main thread, and observers are always notified on it. Be careful using LiveData everywhere: its events are dispatched on the main thread. ❗️ So using LiveData outside of a ViewModel is not a good idea.

👀 Let’s see the API:

  • Create
val currentName: MutableLiveData<String> by lazy {
    MutableLiveData<String>()
}
  • Observe
// Observe the LiveData.
// *this* is the activity, which acts as the LifecycleOwner.
model.currentName.observe(this) { newName ->
    // Update the UI, in this case a TextView.
    nameTextView.text = newName
}
  • Emit
currentName.setValue(anotherName)

Note: setValue can be called only from the main thread. If you wish to emit a value from another thread, use postValue.

currentName.postValue(anotherName)

🎊 Tricks

  • You can extend LiveData and put logic in its onActive() method. It is called when the LiveData object gets its first active observer.
  • You can transform one value into another
val userName: LiveData<String> = Transformations.map(userLiveData) { user ->
    "${user.name} ${user.lastName}"
}

Be careful: your transformation runs on the main thread, so don't do long-running tasks there ❗️

  • You can merge multiple LiveData sources
// liveData1 and liveData2 are assumed to be LiveData<Int>
val liveDataMerger = MediatorLiveData<Int>()

liveDataMerger.addSource(liveData1) { value -> liveDataMerger.value = value }
liveDataMerger.addSource(liveData2) { value -> liveDataMerger.value = value }
  • You can use coroutines with LiveData
val user: LiveData<User> = liveData(context = Dispatchers.IO) {
    val data = database.loadUser() // loadUser is a suspend function
    emit(data)
}
  • Inside the liveData builder you can call emit() more than once, or call emitSource() to forward every value from another LiveData.

Be careful with the context: see the Dispatchers.IO usage above. ❗️What would happen without it?

🔥 Channel

Channels were introduced in Kotlin as a way to communicate between multiple coroutines.

  • Channels are hot: they emit whether or not someone is listening.
  • Can have multiple subscribers.
  • Channels are very useful to control an endless stream of data asynchronously.
  • There are many types of channels, and they act in different ways: some keep values for later, some drop them, and some make the sender wait for a receiver.

Channel is a non-blocking primitive for communication between a sender and a receiver. Conceptually, a channel is similar to Java’s BlockingQueue, but it has suspending operations instead of blocking ones and can be closed.

Channels are hot. They will emit data even if there's no one observing them.

Note: One type of channel can feel like a cold one: in a rendezvous channel the sender waits until a receiver shows up.

Channels can have multiple subscribers. Values are served in first-in first-out order, and each value goes to exactly one subscriber. If you have 9 subscribers and 10 values, then roughly speaking subscriber 1 will receive 1, subscriber 2 will receive 2, and so on; subscriber 9 will receive 9 and then subscriber 1 will receive 10.
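
Here is a small sketch of several subscribers sharing one channel. The fanOut function and its parameters are my own names for this illustration. Which worker gets which value depends on scheduling, so the sketch only relies on the part that is guaranteed: every value is delivered to exactly one worker.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Sends 1..valueCount into one channel that workerCount coroutines read from.
// Returns the values each worker received, indexed by worker id.
suspend fun fanOut(workerCount: Int, valueCount: Int): List<List<Int>> {
    val channel = Channel<Int>()
    // One private list per worker, so workers never write to shared state.
    val received = List(workerCount) { mutableListOf<Int>() }
    coroutineScope {
        repeat(workerCount) { id ->
            launch {
                for (value in channel) received[id].add(value)
            }
        }
        for (value in 1..valueCount) channel.send(value)
        channel.close() // lets the workers' for-loops finish
    }
    return received
}
```

With runBlocking { fanOut(workerCount = 3, valueCount = 10) } the three lists together contain the numbers 1 to 10 exactly once; no value is duplicated and none is lost.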

Channel is the way to share information between coroutines. When needed, many coroutines can send information to the same channel, and many coroutines can receive information from it.

Still don't know what I am talking about? Recall my asynchronous code example. It used await() to receive the results. What if I have an endless stream of values? My program will wait forever. Here comes the power of channels: I can use a channel to control how and when I want my data served.

val channel = Channel<Int>()
launch {
    channel.send(doSomethingUsefulOne())
    channel.send(doSomethingUsefulTwo())
    channel.close() // No more data!
}
println("Answers are: ")
for (y in channel) println(y) // suspends until the next value arrives or the channel is closed
println("Done")

So it is really a pipe with sophisticated features. Channels are not blocking because their send and receive operations can suspend. This happens when the channel is full or empty: send suspends while the buffer is full and resumes once there is space, and receive suspends while the buffer is empty and resumes once a value arrives.

There are different types of channels:

  • Unlimited channel: unlimited capacity (limited only by available memory).
  • Buffered channel: you give the buffer size. If the buffer is full, send suspends until there is space again.
  • "Rendezvous" channel: this channel does not have any buffer at all. An element is transferred from the sender to the receiver only when send and receive invocations meet in time (rendezvous).
  • Conflated channel: this channel buffers at most ONE element. Only the last sent element is received, while previously sent elements are lost. So you can send unlimited values here, and a subscriber will always receive a value: the last value sent. Sounds familiar? It is like my BehaviorSubject. 👋

You see, different channels act in different ways: a rendezvous channel won't hand over a value unless there is a receiver, while a conflated channel stores the last received value and gives it to a subscriber later.
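
You can poke at the four types without even starting a coroutine. The sketch below uses trySend() and tryReceive(), the non-suspending operations from recent kotlinx.coroutines versions. The helper names acceptedSends and drain are mine.

```kotlin
import kotlinx.coroutines.channels.Channel

// Tries to push 1, 2, 3 without suspending and reports which sends were accepted.
fun acceptedSends(channel: Channel<Int>): List<Boolean> =
    listOf(1, 2, 3).map { channel.trySend(it).isSuccess }

// Takes out whatever is currently stored in the channel, again without suspending.
fun drain(channel: Channel<Int>): List<Int> {
    val values = mutableListOf<Int>()
    while (true) {
        val value = channel.tryReceive().getOrNull() ?: break
        values += value
    }
    return values
}
```

With no receiver waiting, a rendezvous channel (Channel<Int>()) rejects all three sends. A buffered channel of size 2 accepts the first two and keeps 1 and 2. A conflated channel accepts everything but keeps only 3. An unlimited channel accepts and keeps all three.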

What is the difference between Channel and LiveData? A channel passes on every value it receives and does not keep the last emitted value around (unless it is a conflated channel). Depending on the channel type, values sent while nobody is listening are buffered, dropped, or make the sender wait. You can have multiple subscribers, but each value is delivered to only one of them. You can close a channel. And a channel is not lifecycle aware.

When should I use a channel? Use it when you expect a stream of data. It gives you control over the data flow: you can use a buffer, receive an exact number of values, or receive only the latest value.

  • Pipelines: this is a pattern where a coroutine produces a possibly infinite stream of data. With a channel you can pick a buffer strategy and you can control when you receive values (when you subscribe). The benefit of a pipeline that uses channels is that it can actually use multiple CPU cores if you run it in the Dispatchers.Default context.
  • Filters: you can filter an infinite stream of data.
  • Distribution of work: multiple coroutines may receive from the same channel, distributing work between themselves.
  • Merging coroutine results: multiple coroutines may send to the same channel.
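
A pipeline is easiest to understand in code. This is a minimal sketch, assuming the produce builder from kotlinx.coroutines (marked experimental, hence the opt-in). The stage names numbers, squares, and firstSquares are made up. One stage produces an endless stream, the next one transforms it, and the consumer takes only as many values as it wants.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Stage 1: an infinite stream of integers starting from 1.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
    var x = 1
    while (true) send(x++)
}

// Stage 2: squares every number it receives from the previous stage.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(input: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in input) send(x * x)
}

// Consumer: takes the first `count` squares, then stops the whole pipeline.
fun firstSquares(count: Int): List<Int> = runBlocking {
    val squared = squares(numbers())
    val result = mutableListOf<Int>()
    repeat(count) { result += squared.receive() }
    coroutineContext.cancelChildren() // cancel the infinite producers
    result
}
```

firstSquares(5) returns 1, 4, 9, 16, 25. The producers never run ahead of the consumer here, because the default rendezvous channel makes every send wait for a matching receive.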

👀 Let’s see the API:

Channel is represented via three different interfaces: SendChannel, ReceiveChannel, and Channel

  • Create
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(Channel.CONFLATED)
val unlimitedChannel = Channel<String>(Channel.UNLIMITED)

By default, a rendezvous channel is created.

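  • Emit value
launch {
    channel.send(1)
}
// or, outside a coroutine:
channel.offer(1)

send() is a suspend function and must be called from a coroutine or another suspend function.

offer() is a synchronous variant of send: it returns false immediately in situations where send would suspend. In recent kotlinx.coroutines versions offer() is deprecated in favor of trySend().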

  • Receive value
launch {
    val x = channel.receive()
}
  • Close channel
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
}
  • Producing a sequence of elements
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

🎊 Tricks

  • Ticker channel

Ticker channel is a special rendezvous channel that produces Unit every time the given delay passes since the last consumption from this channel.

val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
val nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
  • Convert to flow
val usersChannel = ConflatedBroadcastChannel<List<User>>()
val users: Flow<List<User>> = usersChannel.asFlow()

🍧 Flow

Combine the power of suspension with Rx-style cold streams, and that is how Flow was created. Its idea is to be a simple Rx-like library: you don't have to manage Subscription, onError, and onCompleted.

  • Flow is a cold stream. The code won’t run until you call collect().
  • It is designed to work in the lower levels of your architecture. Use LiveData for the UI and Flow for the Repository.
  • It handles back-pressure automatically (a stream of endless data is ok).
  • Designed to prevent the subscription leak problem.
  • Be aware of which thread you call collect() on.

Let's see what problem Flow solves.

This is a suspend function that returns a list of values:

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

The code here can run asynchronously, BUT it has to return all values at once. What if we need a stream? With Flow we can emit the integers one by one, as soon as each is ready:

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

We observe emitted values with collect().
simple().collect { value -> println(value) }

This code is asynchronous. The main thread won't be blocked. You can continue singing songs or doing some real work while waiting for the Int values.
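
Since Flow is cold, the builder block runs only when somebody collects, and it runs again for every collector. Here is a tiny sketch that counts those runs. The function name is made up, and I use toList() as the terminal operator just to keep it short.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns how many times the flow body was started after collecting it twice.
fun builderRunsForTwoCollectors(): Int = runBlocking {
    var runs = 0
    val numbers: Flow<Int> = flow {
        runs++          // executed once per collector
        emit(1)
        emit(2)
    }
    check(runs == 0)    // nothing has run yet: nobody collected
    numbers.toList()    // first collector
    numbers.toList()    // second collector starts the body again
    runs
}
```

builderRunsForTwoCollectors() returns 2: creating the flow costs nothing, and each collection is a fresh execution.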

Code in the flow block is executed in the thread that calls collect. ❗️

Since simple().collect is called from the main thread, the body of simple's flow is also executed on the main thread. So for long-running operations use the flowOn() operator.

flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in a CPU-consuming way
        println("Emitting $i on ${Thread.currentThread().name}")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default)
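
To check for yourself where the code runs, you can emit the thread name itself. This is only a sketch with a made-up function name; the exact thread names depend on your environment, so it just compares the emitting threads with the collecting one.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns the emitting thread's name for each value, plus the collecting thread's name.
fun emitterAndCollectorThreads(): Pair<List<String>, String> = runBlocking {
    val emitterThreads = flow<String> {
        repeat(3) { emit(Thread.currentThread().name) }
    }
        .flowOn(Dispatchers.Default) // applies to the flow { } block above it
        .toList()                    // collected on the runBlocking thread
    emitterThreads to Thread.currentThread().name
}
```

When this is called from the main thread, none of the three emitter names matches the collector's thread: flowOn() moved only the upstream code, and the collector stayed where it was.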

👀 Let’s see the API:

  • Create

flow { }: the most basic builder

flowOf(1, 2, 3): a builder that defines a flow emitting a fixed set of values

(1..3).asFlow(): you can convert many things to a Flow (collections, sequences, ranges, suspend functions)

Note that a function that returns a Flow is not marked with suspend.

  • Emit value

emit(1)

  • Receive value

flow.collect { value -> println(value) }

  • Change the thread of execution

.flowOn(Dispatchers.Default)

  • Handle back-pressure

When values are emitted faster than you can process them, you can skip the intermediate ones with the conflate() method.

val flow = flow { for (i in 1..30) { delay(100); emit(i) } }
val result = flow.conflate().onEach { delay(1000) }.toList()
assertEquals(listOf(1, 10, 20, 30), result)

Applying the conflate() operator allows a collector that delays 1 second on each element to get only the integers 1, 10, 20, 30.

  • Close

Remember Subscription.unsubscribe()? Well, with Flows you don't have to call it anymore. Collection stops when the coroutine that collects the flow is cancelled.

withTimeoutOrNull(250) { // Timeout after 250ms
    simple().collect { value -> println(value) }
}
// Collection is cancelled after the timeout
  • Operations

map(), transform{}, take(2), reduce{}

  • Save the last received value

Use StateFlow, a Flow that represents a read-only state with a single updatable data value. StateFlow is simpler because it does not have to implement all the Channel APIs, which allows for a faster, garbage-free implementation, unlike the ConflatedBroadcastChannel implementation that allocates objects on each emitted value.

private val _counter = MutableStateFlow(0) // private mutable state flow
val counter: StateFlow<Int> get() = _counter // publicly exposed as read-only state flow
_counter.value++
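
Here is the same counter wrapped in a small class so you can run it. CounterModel and lateCollectorSees are hypothetical names for this sketch. It shows the part that matters: a collector that shows up late still gets the current value right away.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical holder class, used only to give the counter a home.
class CounterModel {
    private val _counter = MutableStateFlow(0)          // private mutable state
    val counter: StateFlow<Int> = _counter.asStateFlow() // read-only view for the outside

    fun increment() {
        _counter.value++
    }
}

// Increments three times, then asks for the first value a new collector would see.
fun lateCollectorSees(): Int = runBlocking {
    val model = CounterModel()
    repeat(3) { model.increment() }
    model.counter.first()
}
```

lateCollectorSees() returns 3, the latest value, even though the collector was not around for any of the increments.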

🎊 Tricks

  • Change thread of execution:

flowOn(Dispatchers.Default)

We can easily change the thread we work on using flowOn(), and handle backpressure by calling conflate() on the Flow chain, which skips values emitted by the Flow if the collector is slower than the emitter.

// DAO
@Query("SELECT * FROM users")
fun getAllUsersFlow(): Flow<List<User>>

// Repository
fun getAllUsersFlow(): Flow<List<User>> =
    dao.getAllUsersFlow()
        // do some mapping
        .flowOn(Dispatchers.Default)
        .conflate()
  • Transform to LiveData

We can transform Flow to LiveData in two ways:

val users: LiveData<List<User>> = repository.getAllUsersFlow().asLiveData()

or

val users: LiveData<List<User>> = liveData {
    // some additional work
    repository.getAllUsersFlow().collect { emit(it) } // forward every list to the LiveData
}

Flow will take care of threading and data sources operations, and propagating the results to LiveData.

Whenever there’s a new observer added to Flow, the Flow will start a new execution for it.

  • Execute code after Flow has completed:
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    // re-emit all values from the original flow
    collect { value -> emit(value) }
    // this code runs only after normal completion
    action()
}
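
If you don't want to write your own operator, kotlinx.coroutines also ships a built-in onCompletion operator. Unlike the hand-written version above, it is called on failure and cancellation too, and receives the cause (null when the flow completed normally). A small sketch, with a made-up function name:

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Records every value and then a final marker once the flow is done.
fun eventsWithCompletion(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { value -> events += "value $value" }
        .onCompletion { cause ->
            events += if (cause == null) "completed" else "failed: $cause"
        }
        .toList() // terminal operator, only here to start the collection
    events
}
```

eventsWithCompletion() returns "value 1", "value 2", "value 3", and finally "completed".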

Great article about Flow:

https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9

Ivana Tanova

Android enthusiast and keen tea drinker. I love to learn and to be funny, both for me are passions.
