Kotlin Coroutines

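Coroutines have been a stable part of Kotlin since version 1.3. The language and the standard library provide the low-level support (such as the suspend modifier), while the builders used in this article come from the separate kotlinx.coroutines library. Coroutines are very helpful for writing concurrent, non-blocking code and they simplify asynchronous programming. They are often described as lightweight threads.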


Supporting the processing of multiple tasks is called internal concurrency. In Kotlin we achieve this with the coroutines library (kotlinx.coroutines). Coroutines are similar to threads, but much more lightweight. A coroutine is a piece of code that can be suspended and resumed.
We can compare a coroutine to an ordinary task handled by an employee: the employee works on one task at a time, but can put it aside and pick up another one. The number of coroutines scheduled for execution can be more than a million.
On a single thread, only one coroutine is processed at a time. That's why it's called internal concurrency.
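
To get a feel for how lightweight coroutines are, the following minimal sketch launches a large number of them on the single thread used by runBlocking. The helper name runManyCoroutines and the count of 100,000 are illustrative assumptions, not part of the library.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches `count` coroutines and returns how many finished.
fun runManyCoroutines(count: Int): Int = runBlocking {
    // A plain counter is enough here: every coroutine below runs on the
    // single thread that runBlocking occupies, one at a time.
    var completed = 0
    val jobs = List(count) {
        launch {
            delay(10L) // suspends this coroutine without blocking the thread
            completed++
        }
    }
    jobs.joinAll() // wait until every coroutine has finished
    completed
}

fun main() {
    println("Completed ${runManyCoroutines(100_000)} coroutines")
}
```

Starting the same number of threads would be far more expensive, which is the practical meaning of "lightweight" here.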

coroutines = cooperation + functions

  • Sequential execution: single thread
  • Parallel execution: multiple threads
  • Concurrent execution: coroutines

📌Coroutine Scopes

Every coroutine runs in a scope. The scope tracks the lifecycle of the coroutine and is also a way to bind coroutines to an app-specific lifecycle.

Behind the scope is the concept of structured concurrency, which helps us developers avoid leaking coroutines that waste resources (disk, memory, CPU, network).

Structured concurrency is related to three main things:

  • Keep track of the running coroutines
  • Cancel work when it is no longer necessary to run it
  • Report errors when something bad happens to a coroutine

Coroutine builders such as launch and async are extensions of CoroutineScope and inherit its coroutine context in order to propagate context elements and cancellation.
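
As a rough illustration of the tracking and cancellation points above, the sketch below cancels a parent coroutine and counts how many of its children were cancelled along with it. The helper name countCancelledChildren, the number of children, and the delays are assumptions made for the example.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns how many children were cancelled with the parent.
fun countCancelledChildren(): Int = runBlocking {
    var cancelled = 0
    val parent = launch {
        repeat(3) {
            launch { // child of `parent`, tracked through the scope
                try {
                    delay(10_000L) // pretend to do long-running work
                } finally {
                    cancelled++ // runs when the child is cancelled
                }
            }
        }
    }
    delay(100L)            // let the children start and suspend
    parent.cancelAndJoin() // cancels the parent, which cancels its children
    cancelled
}

fun main() {
    println("Cancelled children: ${countCancelledChildren()}")
}
```

Because the children were launched inside the parent's scope, nobody has to keep a list of them in order to stop them.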

🔨Coroutine Builders

Coroutine builders are functions used to build and manage the execution of a coroutine. Most of them, such as launch and async, are extension functions of CoroutineScope; runBlocking is a regular top-level function.

run() 

Function definition

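  • a scope function from the Kotlin standard library, not a coroutine builder; it is shown here as a baseline for ordinary sequential execution
  • the extension version takes a lambda with a receiver
  • return type is whatever the lambda returns
  • there is also a non-extension version of this function, which is the one used below
inline fun <R> run(block: () -> R): R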


fun task1() {
    println("Start task1 | Thread ${Thread.currentThread()}")
    println("End task1 | Thread ${Thread.currentThread()}")
}

fun task2() {
    println("Start task2 | Thread ${Thread.currentThread()}")
    println("End task2 | Thread ${Thread.currentThread()}")
}

fun main() {
    println("Start main")
    run {
        task1()
        task2()
        println("Called task1 and task2 from ${Thread.currentThread()}")
    }
    println("End main")
}


The result 👇

Start main
Start task1 | Thread Thread[main,5,main]
End task1 | Thread Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
Called task1 and task2 from Thread[main,5,main]
End main

 

runBlocking 

Function definition

  • Runs a new coroutine
  • Blocks the current thread interruptibly until its completion
  • Can take a name for the coroutine for debugging purposes, e.g. runBlocking(CoroutineName("pink-coroutine")) { ... }
fun <T> runBlocking(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): T


import kotlinx.coroutines.*

fun task1() {
    println("Start task1 | Thread ${Thread.currentThread()}")
    println("End task1 | Thread ${Thread.currentThread()}")
}

fun task2() {
    println("Start task2 | Thread ${Thread.currentThread()}")
    println("End task2 | Thread ${Thread.currentThread()}")
}

fun main() {
    println("Start main")
    runBlocking {
        task1()
        task2()
        println("Called task1 and task2 from ${Thread.currentThread()}")
    }
    println("End main")
}


The result 👇

Start main
Start task1 | Thread Thread[main @coroutine#1,5,main]
End task1 | Thread Thread[main @coroutine#1,5,main]
Start task2 | Thread Thread[main @coroutine#1,5,main]
End task2 | Thread Thread[main @coroutine#1,5,main]
Called task1 and task2 from Thread[main @coroutine#1,5,main]
End main

launch()  

Function definition

  • Extension function of CoroutineScope
  • Launches a new coroutine
  • Does not block the current thread
  • Returns a reference to the coroutine as a Job
fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

A Job represents a cancellable task that has its own lifecycle and it can also have children.

The states of a job's life cycle:

  • New
  • Active
  • Completing
  • Completed
  • Cancelling
  • Cancelled
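
The Job returned by launch exposes its state through properties such as isActive and isCompleted. The sketch below records them at three points in a job's life; the helper name observeJobStates and the lazy start are choices made only for this illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records (isActive, isCompleted) at three points in a job's life.
fun observeJobStates(): List<Pair<Boolean, Boolean>> = runBlocking {
    val states = mutableListOf<Pair<Boolean, Boolean>>()
    // LAZY start keeps the job in its initial state until start() or join() is called
    val job = launch(start = CoroutineStart.LAZY) { delay(50L) }
    states += job.isActive to job.isCompleted // not started yet
    job.start()
    states += job.isActive to job.isCompleted // started, still running
    job.join()
    states += job.isActive to job.isCompleted // finished
    states
}

fun main() {
    observeJobStates().forEach { (active, completed) ->
        println("isActive=$active, isCompleted=$completed")
    }
}
```

With the default start mode the job is active as soon as launch returns, so the first snapshot only makes sense with a lazily started job.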


import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun task1() {
    println("Start task1 | Thread ${Thread.currentThread()}")
    println("End task1 | Thread ${Thread.currentThread()}")
}

fun task2() {
    println("Start task2 | Thread ${Thread.currentThread()}")
    println("End task2 | Thread ${Thread.currentThread()}")
}

fun main() {
    println("Start main")
    runBlocking {
        // Each launch only schedules its task; the tasks run after this block's println
        launch { task1() }
        launch { task2() }
        println("Called task1 and task2 from ${Thread.currentThread()}")
    }
    println("End main")
}


The result 👇

Start main
Called task1 and task2 from Thread[main,5,main]
Start task1 | Thread Thread[main,5,main]
End task1 | Thread Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
End main

suspend functions

Function definition

  • A suspend function can suspend the execution of the current coroutine and let another coroutine execute
  • Can be called only from within a coroutine or from another suspend function
  • Two library functions help us achieve this behavior:
    • delay() => pauses the current coroutine for a specific time (milliseconds) without blocking the thread
    • yield() => gives other coroutines a chance to run
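
The effect of delay() can be sketched in the same way as yield(): while one coroutine is suspended, the thread is free to run another one. The helper name recordInterleaving and the delay values are assumptions for this example, and the ordering shown relies on both coroutines sharing the single runBlocking thread.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which two coroutines make progress.
fun recordInterleaving(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val a = launch {
        events += "A starts"
        delay(200L) // A is suspended; the thread is free to run B
        events += "A ends"
    }
    val b = launch {
        events += "B starts"
        delay(100L) // shorter pause, so B resumes before A
        events += "B ends"
    }
    joinAll(a, b) // wait for both coroutines
    events
}

fun main() {
    println(recordInterleaving())
}
```

B starts before A has finished and also ends first, even though only one thread is involved.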


import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

suspend fun suspendTask1() {
    println("Start task1 | Thread ${Thread.currentThread()}")
    yield() // give the other coroutine a chance to run
    println("End task1 | Thread ${Thread.currentThread()}")
}

suspend fun suspendTask2() {
    println("Start task2 | Thread ${Thread.currentThread()}")
    yield()
    println("End task2 | Thread ${Thread.currentThread()}")
}

fun main() {
    println("Start main")
    runBlocking {
        launch { suspendTask1() }
        launch { suspendTask2() }
        println("Called task1 and task2 from ${Thread.currentThread()}")
    }
    println("End main")
}


The result 👇

Start main
Called task1 and task2 from Thread[main,5,main]
Start task1 | Thread Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task1 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
End main

async / await

Function definition

  • async() is an extension function of CoroutineScope
  • If we want to execute a task asynchronously and get the result, then we should use async() instead of the launch() function.
  • async() creates a new coroutine and returns its future result as a Deferred<T>, which has an await() function for getting the result of the coroutine.
  • The call to await() suspends the caller until the coroutine started by async() completes, without blocking the thread.
fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T): Deferred<T>


import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

fun add(a: Int, b: Int): Int {
    return a + b
}

fun main() {
    runBlocking {
        println("Start main")
        val sumOf2 = async {
            println("Start sumOf2")
            add(2, 2)
        }
        val sumOf3 = async {
            println("Start sumOf3")
            add(3, 3)
        }
        println("Awaiting the sums")
        // await() suspends until each result is available
        val total = sumOf2.await() + sumOf3.await()
        println("Total is $total")
        println("End main")
    }
}


The result 👇

Start main
Awaiting the sums
Start sumOf2
Start sumOf3
Total is 10
End main

⛓️Coroutine Context and Threads

CoroutineContext defines the context in which the coroutine will run.

Specific Context

When we want to run our tasks on a dedicated thread pool the solution is to use the proper dispatcher from the Dispatchers object.

  • Dispatchers.Default – Backed by a shared pool of threads on the JVM, so the coroutine usually runs on a different thread => CPU-intensive work
  • Dispatchers.Main – Platform-specific main thread (if it exists) => UI / non-blocking work
  • Dispatchers.IO – Designed for offloading blocking IO tasks to a shared pool of threads => network and disk
  • Dispatchers.Unconfined – Not confined to any specific thread: the coroutine starts in the caller's thread and, after a suspension, resumes in whatever thread the suspending function used. It should not normally be used in general code.
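
The behavior of Dispatchers.Unconfined is easiest to see by comparing the thread before and after a suspension. This is a minimal sketch; the helper name unconfinedThreads is an assumption, and the thread that resumes the coroutine after delay() is an implementation detail of the library, so no particular name is promised for it.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the threads seen before and after a suspension point.
fun unconfinedThreads(): Pair<Thread, Thread> = runBlocking {
    var before = Thread.currentThread()
    var after = Thread.currentThread()
    val job = launch(Dispatchers.Unconfined) {
        before = Thread.currentThread() // still the caller's thread
        delay(50L)
        after = Thread.currentThread()  // whichever thread resumed the coroutine
    }
    job.join()
    before to after
}

fun main() {
    val (before, after) = unconfinedThreads()
    println("Before delay: ${before.name}")
    println("After delay:  ${after.name}")
}
```

The coroutine body starts immediately on the thread that called launch and only leaves it once it suspends.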


import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

suspend fun suspendTask1() {
    println("Start task1 | Thread ${Thread.currentThread()}")
    yield()
    println("End task1 | Thread ${Thread.currentThread()}")
}

suspend fun suspendTask2() {
    println("Start task2 | Thread ${Thread.currentThread()}")
    yield()
    println("End task2 | Thread ${Thread.currentThread()}")
}

fun main() {
    println("Start main")
    runBlocking {
        launch(Dispatchers.Default) { suspendTask1() }
        launch { suspendTask2() }
        println("Called task1 and task2 from ${Thread.currentThread()}")
    }
    println("End main")
}


The result 👇

Start main
Start task1 | Thread Thread[DefaultDispatcher-worker-2,5,main]
End task1 | Thread Thread[DefaultDispatcher-worker-2,5,main]
Called task1 and task2 from Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
End main

Custom pool of threads

When we want to run our tasks on our own custom thread pool we call the asCoroutineDispatcher() extension function on a Java ExecutorService, created for example with:

  • Executors.newSingleThreadExecutor()
  • Executors.newFixedThreadPool()
  • Executors.newScheduledThreadPool()
  • Executors.newCachedThreadPool()

An important thing to mention here is that the executor must be closed: otherwise its threads keep the JVM alive and our program may never exit. So once the coroutines complete, the executor must be closed. An easy and clean way to implement this behavior is the use() function.


import kotlinx.coroutines.*
import java.util.concurrent.Executors

suspend fun suspendTask1() {
    println("Start task1 | Thread ${Thread.currentThread()}")
    yield()
    println("End task1 | Thread ${Thread.currentThread()}")
}

suspend fun suspendTask2() {
    println("Start task2 | Thread ${Thread.currentThread()}")
    yield()
    println("End task2 | Thread ${Thread.currentThread()}")
}

fun main() {
    // use() closes the dispatcher, and with it the executor, when the block ends
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { context ->
        println("Start main")
        runBlocking {
            launch(context) { suspendTask1() }
            launch { suspendTask2() }
            println("Called task1 and task2 from ${Thread.currentThread()}")
        }
        println("End main")
    }
}


The result 👇

Start main
Called task1 and task2 from Thread[main,5,main]
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
Start task1 | Thread Thread[pool-1-thread-1,5,main]
End task1 | Thread Thread[pool-1-thread-1,5,main]
End main

 

Change the CoroutineContext

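If we want to run a coroutine in one context and then change the context midway, the solution is to use the withContext() function. withContext() suspends the caller until its block completes, which is why task1 below finishes before the lines that follow it run.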


import kotlinx.coroutines.*

suspend fun suspendTask1() {
    println("Start task1 | Thread ${Thread.currentThread()}")
    yield()
    println("End task1 | Thread ${Thread.currentThread()}")
}

suspend fun suspendTask2() {
    println("Start task2 | Thread ${Thread.currentThread()}")
    yield()
    println("End task2 | Thread ${Thread.currentThread()}")
}

fun main() {
    runBlocking {
        println("Start main")
        println("Starting in Thread ${Thread.currentThread()}")
        withContext(Dispatchers.Default) { suspendTask1() }
        launch { suspendTask2() }
        println("Ending in Thread ${Thread.currentThread()}")
        println("End main")
    }
}


The result 👇

Start main
Starting in Thread Thread[main,5,main]
Start task1 | Thread Thread[DefaultDispatcher-worker-1,5,main]
End task1 | Thread Thread[DefaultDispatcher-worker-1,5,main]
Ending in Thread Thread[main,5,main]
End main
Start task2 | Thread Thread[main,5,main]
End task2 | Thread Thread[main,5,main]
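
withContext() also returns the value of its block, which makes it handy for moving one computation to another dispatcher and getting the result back. A minimal sketch, assuming a hypothetical helper named sumUpTo:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: sums 1..n on Dispatchers.Default and hands the result back.
suspend fun sumUpTo(n: Int): Int = withContext(Dispatchers.Default) {
    (1..n).sum()
}

fun main() = runBlocking {
    val total = sumUpTo(100) // suspends here until the block has finished
    println("Total is $total, printed from ${Thread.currentThread().name}")
}
```

The caller does not need to know which dispatcher did the work; it simply receives the value once the block is done.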

❌Cancelling coroutines

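Coroutines can be cancelled. Cancellation is cooperative: a coroutine reacts to it only at a suspension point (the suspending functions in kotlinx.coroutines are cancellable) or when it explicitly checks its own state, for example with isActive.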
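
A loop that never reaches a suspension point has to check for cancellation itself. The following is a minimal sketch of that idea; the helper name cancelBusyLoop and the 100 ms pause are assumptions for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: cancels a busy loop and reports whether it ended as cancelled.
fun cancelBusyLoop(): Boolean = runBlocking {
    val job = launch(Dispatchers.Default) {
        var iterations = 0L
        // The loop never suspends, so it has to check isActive itself.
        while (isActive) {
            iterations++
        }
        println("Loop noticed the cancellation after $iterations iterations")
    }
    delay(100L)         // let the loop spin for a while
    job.cancelAndJoin() // request cancellation, then wait for the loop to exit
    job.isCancelled
}

fun main() {
    println("Job cancelled: ${cancelBusyLoop()}")
}
```

Replacing the condition with while (true) would keep the loop, and therefore the program, running after the cancel request.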

In order to cancel an asynchronous process running in a coroutine we could apply one of the following solutions:

  • Use the Job reference returned by the launch builder
  • Use the Deferred<T> reference returned by async()
    • Deferred<T> extends Job, so both offer a cancel() and a cancelAndJoin() function
  • Use the withTimeout function
  • Use the withTimeoutOrNull function

Job is an interface whose states are New, Active, Completing, Completed, Cancelling and Cancelled.

SupervisorJob creates a supervisor job object in an active state. Children of a supervisor job can fail independently of each other.
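
A small sketch of that independence: one child of a SupervisorJob fails while its sibling still completes normally. The helper name siblingSurvivesFailure, the "boom" exception, and the logging handler are assumptions made for this illustration.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: true if the healthy child completed despite its sibling failing.
fun siblingSurvivesFailure(): Boolean = runBlocking {
    val supervisor = SupervisorJob()
    // Children of a supervisor report their own failures, so we give them a handler
    val handler = CoroutineExceptionHandler { _, error ->
        println("Child failed: ${error.message}")
    }
    val scope = CoroutineScope(coroutineContext + supervisor + handler)

    val failing = scope.launch { throw IllegalStateException("boom") }
    val healthy = scope.launch { delay(100L) }
    joinAll(failing, healthy)

    supervisor.cancel() // the scope is no longer needed
    healthy.isCompleted && !healthy.isCancelled
}

fun main() {
    println("Sibling completed normally: ${siblingSurvivesFailure()}")
}
```

With a regular Job as the parent, the failure of the first child would cancel the parent and, through it, the second child.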

cancel()

Function definition

  • Cancels the Job
abstract fun cancel(
    cause: CancellationException? = null
): Unit

join()

Function definition

  • Suspends the calling coroutine until this job is complete
abstract suspend fun join(): Unit

cancelAndJoin()

Function definition

  • Cancels the job and suspends the invoking coroutine until the cancelled job is complete.
suspend fun Job.cancelAndJoin(): Unit

withTimeout()

Function definition

  • Helpful when a job might take too long
  • The function runs a suspending block of code inside a coroutine and throws a TimeoutCancellationException if the timeout is exceeded
suspend fun <T> withTimeout(
    timeMillis: Long,
    block: suspend CoroutineScope.() -> T
): T

withTimeoutOrNull 

Function definition

  • Runs a given suspending block of code inside a coroutine with a specified timeout and returns null if this timeout was exceeded.
suspend fun <T> withTimeoutOrNull(
    timeMillis: Long,
    block: suspend CoroutineScope.() -> T
): T?

Code sample cancel/join or cancelAndJoin


import kotlinx.coroutines.*

fun main() {
    runBlocking {
        println("Start main")
        val job = launch {
            repeat(300) { waitingTime ->
                println("Job is waiting $waitingTime...")
                delay(50L)
            }
        }
        delay(300L)
        println("Stop waiting. Let's cancel it...")
        job.cancel() // request cancellation
        job.join()   // wait until the job has actually finished
        // job.cancelAndJoin() combines the two calls above
        println("End main")
    }
}


The result 👇

Start main
Job is waiting 0...
Job is waiting 1...
Job is waiting 2...
Job is waiting 3...
Job is waiting 4...
Job is waiting 5...
Stop waiting. Let's cancel it...
End main

Code sample withTimeout / withTimeoutOrNull


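import kotlinx.coroutines.*

fun main() {
    runBlocking {
        // 50 iterations of 100 ms cannot fit into 1000 ms,
        // so withTimeout throws a TimeoutCancellationException
        withTimeout(1000L) {
            repeat(50) { waitingTime ->
                println("Job is waiting $waitingTime")
                delay(100L)
            }
        }
    }
}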
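
For comparison, withTimeoutOrNull returns null instead of throwing, so the timeout can be handled as an ordinary value. A minimal sketch, where the helper name answerWithin and its parameters are assumptions for the example:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: returns "done" if the work fits into the timeout, null otherwise.
suspend fun answerWithin(timeoutMillis: Long, workMillis: Long): String? =
    withTimeoutOrNull(timeoutMillis) {
        delay(workMillis) // stands in for slow work
        "done"
    }

fun main() = runBlocking {
    println(answerWithin(timeoutMillis = 1000L, workMillis = 100L)) // work fits into the budget
    println(answerWithin(timeoutMillis = 100L, workMillis = 1000L)) // budget exceeded, so null
}
```

This variant suits cases where running out of time is an expected outcome rather than an error.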


✨The magic behind the scenes…

Under the hood, when we use coroutines we actually work with Continuation objects. A continuation holds the state of the partially executed function, so that execution can be resumed later and the result can be sent to the caller through the Continuation callback.

The compiler uses continuations to implement the coroutine mechanism: to switch contexts and threads and to restore state.
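
One place where a Continuation becomes visible in everyday code is when wrapping a callback-style API into a suspend function. In the sketch below, loadGreeting is a hypothetical callback API invented for the example; suspendCoroutine and resume come from the Kotlin standard library.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking

// Hypothetical callback-based API, standing in for a real asynchronous call
fun loadGreeting(name: String, onResult: (String) -> Unit) {
    onResult("Hello, $name")
}

// Suspends the caller and resumes it through the continuation once the callback fires
suspend fun loadGreetingSuspending(name: String): String =
    suspendCoroutine { continuation ->
        loadGreeting(name) { greeting -> continuation.resume(greeting) }
    }

fun main() = runBlocking {
    println(loadGreetingSuspending("Kotlin"))
}
```

The value passed to resume() becomes the return value of the suspend function, which is the "callback" role of the continuation described above.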

📚 Learn more…
