Kotlin Coroutines

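Coroutines became a stable part of Kotlin in version 1.3. The language support (the suspend keyword and the low-level kotlin.coroutines API) ships with the standard library, while the builders used in this article (launch, async, runBlocking) come from the kotlinx.coroutines library. Coroutines help us write concurrent, non-blocking code and simplify asynchronous programming. They are often described as lightweight threads: many coroutines can share a small number of threads, because a suspended coroutine does not block the thread it was running on.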


coroutines = cooperation + functions

  • Sequential execution: single thread
  • Parallel execution: multiple threads
  • Concurrent execution: coroutines

📌Coroutine Scopes

A coroutine should run in a scope. The scope tracks the lifecycle of the coroutine and binds the coroutine(s) to an app-specific lifecycle.

Behind the scope is the concept of structured concurrency, which helps us, the developers, avoid leaking coroutines that would waste resources (disk, memory, CPU, network).

Structured concurrency is related to three main things:

  • Keep track of the running coroutine
  • Cancel work when it is no longer necessary to run it
  • Notify errors when something bad happens to a coroutine
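To make the first two points concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The function name structuredDemo and the delays are made up for illustration. It shows a parent job tracking its children and cancelling them with it; error propagation is not covered here.

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo: returns (number of tracked children, whether the parent ended up cancelled).
suspend fun structuredDemo(): Pair<Int, Boolean> = coroutineScope {
    val parent = launch {
        // Neither message is printed: both children are cancelled before their delay ends.
        launch { delay(1_000); println("child 1 done") }
        launch { delay(1_000); println("child 2 done") }
    }
    delay(100)                              // give the parent time to start its children
    val tracked = parent.children.count()   // the parent job keeps track of its children
    parent.cancelAndJoin()                  // cancelling the parent cancels both children
    tracked to parent.isCancelled
}

fun main() = runBlocking<Unit> {
    val (tracked, cancelled) = structuredDemo()
    println("tracked children: $tracked, parent cancelled: $cancelled")
}
```

Because the children are launched inside the parent, we never have to keep our own list of running work: the job hierarchy does it for us.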

Most coroutine builders, such as launch() and async(), are extension functions of CoroutineScope and inherit its coroutine context in order to propagate context elements and cancellation.

 

🔨Coroutine Builders

Coroutine builders create and start coroutines. launch() and async() are extension functions of CoroutineScope, while runBlocking() is a top-level function that bridges regular blocking code and suspending code.

run() 

Function definition

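  • run() is a scope function from the Kotlin standard library, not a coroutine builder; it serves here as a sequential baseline for the coroutine versions that follow
  • The extension version takes a lambda with a receiver
  • The return type is whatever the lambda returns
  • There is also a non-extension version of this function:
inline fun <R> run(block: () -> R): R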
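The original code sample is not available here, so the following is only a sketch of what a sequential baseline could look like. The names task1, task2 and runDemo are assumptions, and the exact thread formatting depends on the JDK version.

```kotlin
fun task1() {
    println("Start task1 | Thread ${Thread.currentThread()}")
    println("End task1 | Thread ${Thread.currentThread()}")
}

fun task2() {
    println("Start task2 | Thread ${Thread.currentThread()}")
    println("End task2 | Thread ${Thread.currentThread()}")
}

// run() executes the lambda on the calling thread and returns its last expression.
fun runDemo(): String = run {
    task1()
    task2()
    println("Called task1 and task2 from ${Thread.currentThread()}")
    "done"
}

fun main() {
    println("Start main")
    runDemo()
    println("End main")
}
```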

The result 👇

Start main
Start task1 | Thread Thread[main,5,main]
End task1 | Thread Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
Called task1 and task2 from Thread[main,5,main]
End main

 

runBlocking 

Function definition

  • Runs a new coroutine
  • Blocks the current thread interruptibly until the coroutine completes
  • A CoroutineName can be passed in the context to label the coroutine for debugging purposes, for example runBlocking(CoroutineName("pink-coroutine"))
fun <T> runBlocking(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): T
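A possible version of the sample, assuming kotlinx.coroutines is available; the helper names are hypothetical. The tasks still run one after the other, but now inside a coroutine that blocks the calling thread. Thread names only carry the coroutine name suffix when debug mode is on (for example with -Dkotlinx.coroutines.debug).

```kotlin
import kotlinx.coroutines.*

fun task1() {
    println("Start task1 | Thread ${Thread.currentThread()}")
    println("End task1 | Thread ${Thread.currentThread()}")
}

fun task2() {
    println("Start task2 | Thread ${Thread.currentThread()}")
    println("End task2 | Thread ${Thread.currentThread()}")
}

// Runs both tasks inside a named coroutine and returns that name.
fun runBlockingDemo(): String = runBlocking(CoroutineName("pink-coroutine")) {
    task1()
    task2()
    println("Called task1 and task2 from ${Thread.currentThread()}")
    coroutineContext[CoroutineName]?.name ?: "unnamed"
}

fun main() {
    println("Start main")
    println("Coroutine name: ${runBlockingDemo()}")
    println("End main")
}
```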

The result 👇

Start main
Start task1 | Thread Thread[main @coroutine#1,5,main]
End task1 | Thread Thread[main @coroutine#1,5,main]
Start task2 | Thread Thread[main @coroutine#1,5,main]
End task2 | Thread Thread[main @coroutine#1,5,main]
Called task1 and task2 from Thread[main @coroutine#1,5,main]
End main

 

launch()  

Function definition

  • Extension function of CoroutineScope
  • Launches a new coroutine
  • Does not block the current thread
  • Returns a reference to the coroutine as a Job
fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

The Job interface represents a cancellable task that has its own lifecycle and can also have children.

The states of a job's life cycle are New, Active, Completing, Cancelling, Cancelled and Completed.
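Here is a sketch of how launch() behaves inside runBlocking, assuming kotlinx.coroutines; launchDemo and the event list are made up for illustration. The launched coroutines are only queued, so the caller keeps going and the tasks start once it suspends.

```kotlin
import kotlinx.coroutines.*

// Records the order in which things happen; everything runs on the runBlocking thread.
fun launchDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job1: Job = launch {
        println("Start task1 | Thread ${Thread.currentThread()}")
        events += "task1"
        println("End task1 | Thread ${Thread.currentThread()}")
    }
    val job2: Job = launch {
        println("Start task2 | Thread ${Thread.currentThread()}")
        events += "task2"
        println("End task2 | Thread ${Thread.currentThread()}")
    }
    println("Called task1 and task2 from ${Thread.currentThread()}")
    events += "called"
    joinAll(job1, job2)   // suspend until both jobs are complete
    events
}

fun main() {
    println("Start main")
    println(launchDemo())
    println("End main")
}
```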

The result 👇

Start main
Called task1 and task2 from Thread[main,5,main]
Start task1 | Thread Thread[main,5,main]
End task1 | Thread Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
End main

 

suspend functions

Function definition

  • A suspend function can pause the current coroutine at a suspension point without blocking the thread, which lets another coroutine run in the meantime
  • It can be called only from a coroutine or from another suspend function
  • There are 2 library functions that help us achieve this behavior:
    • delay() => suspends the current coroutine for a specific time (milliseconds)
    • yield() => gives other coroutines on the same dispatcher a chance to run
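As an illustration, here is a small sketch using yield(), assuming kotlinx.coroutines; the task helper and the event list are hypothetical. Each task gives up the thread halfway through, so the two tasks interleave on a single thread.

```kotlin
import kotlinx.coroutines.*

suspend fun task(name: String, events: MutableList<String>) {
    println("Start $name | Thread ${Thread.currentThread()}")
    events += "Start $name"
    yield()   // suspension point: let the other coroutine run
    println("End $name | Thread ${Thread.currentThread()}")
    events += "End $name"
}

fun yieldDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()   // only touched from the runBlocking thread
    val jobs = listOf(
        launch { task("task1", events) },
        launch { task("task2", events) }
    )
    println("Called task1 and task2 from ${Thread.currentThread()}")
    jobs.joinAll()
    events
}

fun main() {
    println("Start main")
    yieldDemo()
    println("End main")
}
```

Replacing yield() with delay(100) gives the same interleaving here, with the difference that each coroutine stays suspended for at least the given time.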

The result 👇

Start main
Called task1 and task2 from Thread[main,5,main]
Start task1 | Thread Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task1 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
End main

 

async / await

Function definition

  • async() is an extension function of CoroutineScope
  • If we want to execute a task asynchronously and get its result, we should use async() instead of launch().
  • async() creates a new coroutine and returns its future result as a Deferred<T>, which is a Job with an additional await() function that delivers the result.
  • The call to await() suspends the caller until the coroutine started by async() completes, then returns its value (or rethrows its exception).
fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T): Deferred<T>
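A minimal sketch of async / await, assuming kotlinx.coroutines. The functions sumOf2 and sumOf3 and their arguments are invented for this example; they were chosen so that the total is 10.

```kotlin
import kotlinx.coroutines.*

suspend fun sumOf2(a: Int, b: Int): Int {
    println("Start sumOf2")
    delay(100)   // pretend this is slow work
    return a + b
}

suspend fun sumOf3(a: Int, b: Int, c: Int): Int {
    println("Start sumOf3")
    delay(100)
    return a + b + c
}

fun asyncDemo(): Int = runBlocking {
    val first: Deferred<Int> = async { sumOf2(1, 2) }
    val second: Deferred<Int> = async { sumOf3(1, 2, 4) }
    println("Awaiting the sums")
    first.await() + second.await()   // suspends until both results are ready
}

fun main() {
    println("Start main")
    println("Total is ${asyncDemo()}")
    println("End main")
}
```

Both computations are started before either is awaited, so their delays overlap instead of adding up.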

The result 👇

Start main
Awaiting the sums
Start sumOf2
Start sumOf3
Total is 10
End main

 

⛓️Coroutine Context and Threads

CoroutineContext defines the context in which the coroutine will run.

 

Specific Context

When we want to run our tasks on a dedicated thread pool, the solution is to use the proper dispatcher from the Dispatchers object.

  • Dispatchers.Default – Backed by a shared pool of threads on the JVM, sized by the number of CPU cores => CPU-bound work
  • Dispatchers.Main – Platform-specific main thread (if it exists) => UI/Non-blocking
  • Dispatchers.IO – Designed for offloading blocking IO tasks to a shared pool of threads => Network and disk
  • Dispatchers.Unconfined – Not confined to any thread: the coroutine starts in the caller's thread and, after a suspension, resumes in whatever thread the suspending function resumed it on. It avoids dispatch overhead but is rarely appropriate for general code.
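The sketch below, which assumes kotlinx.coroutines, starts one task on Dispatchers.Default and one without a dispatcher, so the second inherits the runBlocking thread. The name dispatcherDemo is hypothetical and worker thread numbers will vary from run to run.

```kotlin
import kotlinx.coroutines.*

// Returns the names of the threads that executed task1 and task2.
fun dispatcherDemo(): Pair<String, String> = runBlocking {
    val task1 = async(Dispatchers.Default) {
        println("Start task1 | Thread ${Thread.currentThread()}")
        Thread.currentThread().name
    }
    val task2 = async {   // no dispatcher given: runs on the runBlocking thread
        println("Start task2 | Thread ${Thread.currentThread()}")
        Thread.currentThread().name
    }
    println("Called task1 and task2 from ${Thread.currentThread()}")
    task1.await() to task2.await()
}

fun main() {
    println("Start main")
    println(dispatcherDemo())
    println("End main")
}
```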

The result 👇

Start main
Start task1 | Thread Thread[DefaultDispatcher-worker-2,5,main]
End task1 | Thread Thread[DefaultDispatcher-worker-2,5,main]
Called task1 and task2 from Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
End main

 

Custom pool of threads

When we want to run our tasks by using our own custom thread pool we apply the asCoroutineDispatcher function on Java’s ExecutorService. 

  • Executors.newSingleThreadExecutor() 
  • Executors.newFixedThreadPool()
  • Executors.newScheduledThreadPool()
  • Executors.newCachedThreadPool()

An important thing to mention here is that the executor must be shut down: its threads are not daemon threads by default, so the program may never exit if we forget. Once the coroutine(s) complete, the dispatcher should be closed. An easy and clean way to implement this is the use() function, because the dispatcher returned by asCoroutineDispatcher() is Closeable and closing it shuts down the executor.
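A sketch of the custom pool with use(), assuming kotlinx.coroutines; customPoolDemo is a made-up name and the pool number in the thread name can differ.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Returns the name of the pool thread that ran task1.
fun customPoolDemo(): String =
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { pool ->
        runBlocking {
            val task1 = async(pool) {
                println("Start task1 | Thread ${Thread.currentThread()}")
                Thread.currentThread().name
            }
            launch {
                println("Start task2 | Thread ${Thread.currentThread()}")
            }
            println("Called task1 and task2 from ${Thread.currentThread()}")
            task1.await()
        }
    }   // use() closes the dispatcher here, which shuts down the executor

fun main() {
    println("Start main")
    println("task1 ran on ${customPoolDemo()}")
    println("End main")
}
```

runBlocking only returns after all its children are done, so the pool is never closed while a coroutine still needs it.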

The result 👇

Start main
Called task1 and task2 from Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
Start task1 | Thread Thread[pool-1-thread-1,5,main]
End task1 | Thread Thread[pool-1-thread-1,5,main]
End main

 

Change the CoroutineContext

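If we want to run a coroutine in one context and then change the context midway, the solution is to use the withContext() function. It suspends the caller, runs the given block in the new context and returns the result of the block once it completes.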
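A small sketch of switching context midway, assuming kotlinx.coroutines; withContextDemo is a hypothetical name. It records the thread name before, inside and after the withContext() call.

```kotlin
import kotlinx.coroutines.*

fun withContextDemo(): List<String> = runBlocking {
    val before = Thread.currentThread().name
    println("Starting in Thread $before")
    val inside = withContext(Dispatchers.Default) {
        println("Start task1 | Thread ${Thread.currentThread()}")
        Thread.currentThread().name   // the block's result is returned to the caller
    }
    val after = Thread.currentThread().name
    println("Ending in Thread $after")
    listOf(before, inside, after)
}

fun main() {
    println("Start main")
    println(withContextDemo())
    println("End main")
}
```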

The result 👇

Start main
Starting in Thread Thread[main,5,main]
Start task1 | Thread Thread[DefaultDispatcher-worker-1,5,main]
End task1 | Thread Thread[DefaultDispatcher-worker-1,5,main]
Ending in Thread Thread[main,5,main]
End main
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]

 

❌Cancelling coroutines

Coroutines can be cancelled. Cancellation is cooperative: a coroutine reacts to it only at a suspension point of a cancellable suspending function (such as delay() or yield()) or when it checks isActive itself.

In order to cancel an asynchronous process running in a coroutine, we can apply one of the following solutions:

  • Use the Job reference returned by the launch() builder
  • Use the Deferred<T> reference returned by async()
  • Both Job and Deferred<T> have a cancel() and a cancelAndJoin() function
  • Use the withTimeout() function
  • Use the withTimeoutOrNull() function

 

Job is an interface; its current state can be inspected through the isActive, isCompleted and isCancelled properties.

SupervisorJob creates a supervisor job object in an active state. Children of a supervisor job can fail independently of each other.
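To illustrate, here is a sketch with a scope built on a SupervisorJob, assuming kotlinx.coroutines. The scope, the handler and the name supervisorDemo are assumptions made for this example. One child fails and its sibling still completes.

```kotlin
import kotlinx.coroutines.*

fun supervisorDemo(): String {
    // The handler receives the failure of the launched child.
    val handler = CoroutineExceptionHandler { _, e -> println("Child failed: ${e.message}") }
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + handler)

    val failing = scope.launch { throw IllegalStateException("boom") }
    val healthy = scope.async { delay(100); "sibling still completed" }

    val result = runBlocking {
        failing.join()    // join() does not rethrow the child's failure
        healthy.await()   // the sibling was not cancelled by the failure
    }
    scope.cancel()        // release the scope once we are done with it
    return result
}

fun main() {
    println(supervisorDemo())
}
```

With a regular Job() in place of SupervisorJob(), the failure of the first child would cancel the scope and the second child with it.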

 

cancel()

Function definition

  • Cancels the Job
abstract fun cancel(
    cause: CancellationException? = null
): Unit

 

join()

Function definition

  • Suspends the calling coroutine until this job is complete
abstract suspend fun join(): Unit

 

cancelAndJoin()

Function definition

  • Cancels the job and suspends the invoking coroutine until the cancelled job is complete.
suspend fun Job.cancelAndJoin(): Unit

 

withTimeout()

Function definition

  • Useful when a job might take too long
  • The function runs a suspending block of code inside a coroutine and throws a TimeoutCancellationException if the timeout is exceeded
suspend fun <T> withTimeout(
    timeMillis: Long,
    block: suspend CoroutineScope.() -> T
): T

withTimeoutOrNull()

Function definition

  • Runs a given suspending block of code inside a coroutine with a specified timeout and returns null if this timeout was exceeded.
suspend fun <T> withTimeoutOrNull(
    timeMillis: Long,
    block: suspend CoroutineScope.() -> T
): T?

 

Code sample cancel/join or cancelAndJoin
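The original sample is not available here; the following is a sketch of what it might have looked like, assuming kotlinx.coroutines. The name cancelDemo and the delay values are assumptions, picked so that the job prints six lines before it is cancelled.

```kotlin
import kotlinx.coroutines.*

// Returns how many times the job printed before it was cancelled.
fun cancelDemo(): Int = runBlocking {
    var ticks = 0
    val job = launch {
        repeat(1_000) { i ->
            println("Job is waiting $i...")
            ticks++
            delay(500)   // suspension point: cancellation is noticed here
        }
    }
    delay(2_750)          // let the job print for a while
    println("Stop waiting. Let's cancel it...")
    job.cancelAndJoin()   // same as job.cancel() followed by job.join()
    ticks
}

fun main() {
    println("Start main")
    cancelDemo()
    println("End main")
}
```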

The result 👇

Start main
Job is waiting 0...
Job is waiting 1...
Job is waiting 2...
Job is waiting 3...
Job is waiting 4...
Job is waiting 5...
Stop waiting. Let's cancel it...
End main

 

Code sample withTimeout / withTimeoutOrNull
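A minimal sketch of both functions, assuming kotlinx.coroutines; timeoutDemo and the timings are made up for illustration. The same slow block is run twice: once returning null on timeout and once throwing.

```kotlin
import kotlinx.coroutines.*

// Returns (result of withTimeoutOrNull, whether withTimeout threw).
fun timeoutDemo(): Pair<String?, Boolean> = runBlocking {
    val orNull: String? = withTimeoutOrNull(200) {
        delay(1_000)   // takes longer than the timeout
        "finished"
    }
    println("withTimeoutOrNull returned $orNull")

    val timedOut = try {
        withTimeout(200) { delay(1_000) }
        false
    } catch (e: TimeoutCancellationException) {
        println("withTimeout threw ${e::class.simpleName}")
        true
    }
    orNull to timedOut
}

fun main() {
    println("Start main")
    println(timeoutDemo())
    println("End main")
}
```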

✨The magic behind the scenes…

Under the hood, when we use coroutines we actually work with Continuation objects. A continuation captures the state of a partially executed suspend function, so that execution can be resumed later and the result delivered to the caller through the continuation's resumeWith() callback.

The compiler rewrites every suspend function to take a Continuation, and this is what makes it possible to suspend and resume coroutines, to switch contexts and threads and to restore state.
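To get a feel for this, here is a small sketch that uses only the low-level kotlin.coroutines API from the standard library. The name continuationDemo and the event list are hypothetical. It is a simplified illustration of suspend and resume, not how kotlinx.coroutines is implemented internally.

```kotlin
import kotlin.coroutines.*

fun continuationDemo(): List<String> {
    val events = mutableListOf<String>()
    var saved: Continuation<String>? = null

    val body: suspend () -> String = {
        events += "before suspension"
        // Suspend and hand the continuation to the outside world.
        val value = suspendCoroutine<String> { cont -> saved = cont }
        events += "resumed with $value"
        value
    }

    // Start the coroutine; the completion continuation receives the final result.
    body.startCoroutine(object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) {
            events += "completed: ${result.getOrNull()}"
        }
    })

    events += "caller continues while the coroutine is suspended"
    saved?.resume("hello")   // resume the coroutine from where it stopped
    return events
}

fun main() {
    continuationDemo().forEach { println(it) }
}
```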

 

