Emulating request scoped objects with Kotlin Coroutines

Background

A common pattern in web applications is to have data that is request scoped, that is, available only during the lifetime of the current request. A typical example of a request scoped object is the current database session. However, not all implementations of request scoped objects work correctly when used from Kotlin coroutines.

The reason is that they often rely on ThreadLocal variables. A ThreadLocal is a container object whose stored value is unique to the thread that accesses it. The ThreadLocal is rarely visible directly to the user of a library or framework but is used behind the scenes, for example in the default CurrentSessionContext used by Hibernate's SessionFactory.getCurrentSession() and in the backing adapters of the MDC object in SLF4J.
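To make the per-thread behavior concrete, here is a minimal sketch using only the standard library. The names currentRequestId and readOnOtherThread are made up for this illustration and do not come from any framework.

```kotlin
import kotlin.concurrent.thread

// Hypothetical request scoped value, stored the way many frameworks do it.
val currentRequestId = ThreadLocal<String?>()

// Reads the ThreadLocal from a freshly started thread and returns what that thread sees.
fun readOnOtherThread(): String? {
    var seen: String? = "unset"
    thread { seen = currentRequestId.get() }.join()
    return seen
}

fun main() {
    currentRequestId.set("my-request-id")
    println(currentRequestId.get()) // prints "my-request-id"
    println(readOnOtherThread())    // prints "null"
    currentRequestId.remove()
}
```

The second thread never sees the value set by the first one, which is exactly what happens to a coroutine that resumes on a different thread.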

A typical web framework will set the values of these objects at the start of a request and then clear them at the end of the same request. This works fine as long as all users of these objects access them on the same thread the request started on. A typical Kotlin coroutine, however, may suspend and then resume on an arbitrary thread, possibly in a different thread pool, during the life cycle of a single request.

We therefore need a different way to implement request scoped objects, preferably one that doesn't require us to pass the objects as arguments through a long chain of function calls.

Enter coroutine contexts

The preferred way to store values that need to be available for the duration of a Kotlin coroutine is to extend the current CoroutineContext with additional elements. Each element is then accessible anywhere inside a suspending function by key lookup.

An example coroutine context

Here is an example context that simply stores a single requestId value.

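import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

class RequestContext(
        val requestId: String
) : AbstractCoroutineContextElement(RequestContext) {
    // The companion object doubles as the key used to look this element up.
    companion object Key : CoroutineContext.Key<RequestContext>
}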

Notice the RequestContext.Key companion object. It is what enables us to look up the RequestContext object later from inside our coroutine.

Using the new context

In order to use our context we need to pass it as an argument to the coroutine launcher. Then, inside the coroutine, we use the global coroutineContext property to look up our context element.

fun test() {
    val context = RequestContext("my-request-id")
    runBlocking(context) {
        val requestId = coroutineContext[RequestContext]?.requestId
        println(requestId) // prints "my-request-id"
    }
}
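The same lookup works from any suspending function further down the call chain, without passing the element as an argument. The sketch below assumes kotlinx.coroutines is on the classpath; handleRequest, loadGreeting and currentRequestId are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

class RequestContext(val requestId: String) : AbstractCoroutineContextElement(RequestContext) {
    companion object Key : CoroutineContext.Key<RequestContext>
}

// Reads the element from the context of whichever coroutine is calling.
suspend fun currentRequestId(): String? = coroutineContext[RequestContext]?.requestId

// Hypothetical layers of an application; none of them takes the request id as a parameter.
suspend fun handleRequest(): String = loadGreeting()

suspend fun loadGreeting(): String = "handled request ${currentRequestId()}"

fun main() {
    val result = runBlocking(RequestContext("my-request-id")) { handleRequest() }
    println(result) // prints "handled request my-request-id"
}
```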

Easy enough, but what happens if we launch a new coroutine from within the current one? Perhaps we use async to wrap a blocking call to an external service or a database. Will our context element be available within this new async call? The answer, unfortunately, is no.

fun test() {
    val context = RequestContext("my-request-id")
    runBlocking(context) {
        nestedCoroutine()
    }
}

suspend fun nestedCoroutine() {
    return async {
        // imagine a blocking call here
        val requestId = coroutineContext[RequestContext]?.requestId
        println(requestId) // prints "null"
    }.await()
}

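The reason our element is no longer available is that async, in the early kotlinx.coroutines API used here (before structured concurrency was introduced), does not inherit anything from the calling coroutine. By default it launches with a completely separate context: DefaultDispatcher (backed by CommonPool). Later releases made async an extension on CoroutineScope that inherits the scope's context, so this pitfall is specific to the older API.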

In order to preserve our current context when launching a new coroutine we will need to explicitly pass it to the launcher, like we did with the original coroutine.

suspend fun nestedCoroutine() {
    return async(coroutineContext) {
        // imagine a blocking call here
        val requestId = coroutineContext[RequestContext]?.requestId
        println(requestId) // prints "my-request-id"
    }.await()
}

Combining coroutine contexts

But what if we are already using a separate context when launching our coroutine? Maybe we have an IOPool context that executes the coroutine on a thread pool meant for IO workloads. We don't want to lose that behavior when adding our new RequestContext.

The solution is to compose the two contexts, which is easily done using the plus operator defined for contexts:

suspend fun nestedCoroutine() {
    val combinedContext = coroutineContext + IOPool
    return async(combinedContext) {
        // imagine a blocking call here
        val requestId = coroutineContext[RequestContext]?.requestId
        println(requestId) // prints "my-request-id"
    }.await()
}

The plus operator favors its right-hand operand: any element of the right-hand context replaces the element with the same key in the left-hand context, while elements with other keys are kept. In this case we get a context that includes both the dispatching behavior of the IOPool and the RequestContext element from the current coroutineContext.
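The override rule can be checked on plain context values without launching a coroutine. In this sketch, PoolName is a hypothetical second element that merely stands in for something like IOPool, and describe is a made-up helper.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

class RequestContext(val requestId: String) : AbstractCoroutineContextElement(RequestContext) {
    companion object Key : CoroutineContext.Key<RequestContext>
}

// Hypothetical second element with its own key.
class PoolName(val name: String) : AbstractCoroutineContextElement(PoolName) {
    companion object Key : CoroutineContext.Key<PoolName>
}

// Renders both elements of a context so the effect of plus is visible.
fun describe(context: CoroutineContext): String =
    "${context[RequestContext]?.requestId}/${context[PoolName]?.name}"

fun main() {
    val base = RequestContext("my-request-id") + PoolName("default")
    println(describe(base))                  // prints "my-request-id/default"
    // The right-hand PoolName replaces the left-hand one; RequestContext is kept.
    println(describe(base + PoolName("io"))) // prints "my-request-id/io"
}
```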

The need to remember to reuse the current context when launching new coroutines is a bit annoying but can be alleviated by defining custom coroutine launchers in our applications.

suspend fun <T> asyncIO(block: suspend () -> T): Deferred<T> {
    val combinedContext = coroutineContext + IOPool
    return async(combinedContext) {
        block()
    }
}
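For readers on kotlinx.coroutines 1.x, where async is an extension on CoroutineScope and a new coroutine inherits the scope's context, a comparable helper might look like the sketch below. It is an adaptation rather than the code above: Dispatchers.IO is assumed as a stand-in for IOPool, and currentRequestId is a hypothetical helper.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

class RequestContext(val requestId: String) : AbstractCoroutineContextElement(RequestContext) {
    companion object Key : CoroutineContext.Key<RequestContext>
}

// Reads the element from the context of whichever coroutine is calling.
suspend fun currentRequestId(): String? = coroutineContext[RequestContext]?.requestId

// async adds Dispatchers.IO on top of the scope's context, so RequestContext is kept.
fun <T> CoroutineScope.asyncIO(block: suspend () -> T): Deferred<T> =
    async(Dispatchers.IO) { block() }

fun main() = runBlocking(RequestContext("my-request-id")) {
    val requestId = asyncIO { currentRequestId() }.await()
    println(requestId) // prints "my-request-id"
}
```

The lookup goes through a suspending helper on purpose: inside a lambda nested in runBlocking, a bare coroutineContext would resolve to the outer scope's property rather than to the context of the coroutine actually running the block.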


Android Broadcasts made easy with RxAndroid

Background

A common task in an Android application is to listen to system broadcasts and perform some action in response. For example, you may want to listen to network connectivity changes and pause or resume network communication depending on the network state.

Normally this involves implementing a BroadcastReceiver then registering it with Context.registerReceiver and unregistering it with Context.unregisterReceiver in the appropriate life-cycle methods. While it is not a lot of code by itself, it can quickly become tiring to write and maintain if receivers are used many times throughout an application.

We can simplify this procedure significantly by combining Kotlin extension functions, RxAndroid, and the Android Lifecycle-Aware Components library.

First steps

Our first step is to define an observeBroadcasts extension function on Context. This allows us to use it anywhere we have a Context object available, be it in an Activity or in a Service.


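// Package names below assume RxJava 2 and RxAndroid 2.
import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import io.reactivex.Observable
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.Disposables

fun Context.observeBroadcasts(action: String): Observable<Intent> {
    return observeBroadcasts(IntentFilter(action))
}

fun Context.observeBroadcasts(intentFilter: IntentFilter): Observable<Intent> {
    val observable = Observable.create<Intent> { observer ->
        val receiver = object : BroadcastReceiver() {
            override fun onReceive(context: Context, intent: Intent) {
                observer.onNext(intent)
            }
        }
        // Unregister the receiver when the subscription is disposed.
        observer.setDisposable(Disposables.fromRunnable {
            unregisterReceiver(receiver)
        })
        registerReceiver(receiver, intentFilter)
    }
    // Register the receiver and deliver intents on the main thread.
    return observable
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
}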

Example usage in an Activity:


private var connectivitySubscription: Disposable? = null

override fun onStart() {
    super.onStart()
    connectivitySubscription = observeBroadcasts(ConnectivityManager.CONNECTIVITY_ACTION)
            .subscribe(this::onConnectivityChange)
}

override fun onStop() {
    super.onStop()
    connectivitySubscription?.dispose()
}

private fun onConnectivityChange(intent: Intent) {
    val connectivityManager = getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
    val isConnected = connectivityManager.activeNetworkInfo?.isConnected ?: false
    Log.i("MainActivity", "isConnected = $isConnected")
}

Adding lifecycle awareness

The previous example improved our code by removing the need to implement a BroadcastReceiver and by hiding the registration flow behind a standard Rx Observable. We can, however, go one step further by using the recently added LifecycleObserver interface and related methods in the Android Architecture Components.

The new subscribeToBroadcastsOnLifecycle function only needs to be called once, from Activity.onCreate, with no further interaction necessary.


fun AppCompatActivity.subscribeToBroadcastsOnLifecycle(action: String, fn: (Intent) -> Unit) {
    observeBroadcasts(action).subscribeOnLifecycle(lifecycle, fn)
}

fun <T> Observable<T>.subscribeOnLifecycle(lifecycle: Lifecycle, fn: (T) -> Unit) {
    val lifecycleObserver: LifecycleObserver = object : LifecycleObserver {
        private var subscription: Disposable? = null

        @OnLifecycleEvent(Lifecycle.Event.ON_START)
        fun onStart() {
            subscription = subscribe(fn)
        }

        @OnLifecycleEvent(Lifecycle.Event.ON_STOP)
        fun onStop() {
            subscription?.dispose()
        }
    }
    lifecycle.addObserver(lifecycleObserver)
}

Example usage in an Activity:


override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
    subscribeToBroadcastsOnLifecycle(ConnectivityManager.CONNECTIVITY_ACTION, this::onConnectivityChange)
}

private fun onConnectivityChange(intent: Intent) {
    val connectivityManager = getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
    val isConnected = connectivityManager.activeNetworkInfo?.isConnected ?: false
    Log.i("MainActivity", "isConnected = $isConnected")
}

End notes

  • Typical blog-code disclaimer: The code snippets above are only intended as inspiration and do not include all recommended error handling code.
  • subscribeToBroadcastsOnLifecycle currently only uses onStart and onStop, but it should be easy to extend to other appropriate pairs of lifecycle methods.
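As a rough illustration of that last point, one possible generalization takes the start and stop events as parameters. This is only a sketch under assumptions: it uses LifecycleEventObserver from the androidx.lifecycle artifacts rather than the annotation-based observer above, it keeps the RxJava 2 package names, and subscribeBetween is a hypothetical name.

```kotlin
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleEventObserver
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

// Hypothetical variant: subscribes on startEvent and disposes on stopEvent.
fun <T> Observable<T>.subscribeBetween(
        lifecycle: Lifecycle,
        startEvent: Lifecycle.Event,
        stopEvent: Lifecycle.Event,
        fn: (T) -> Unit
) {
    var subscription: Disposable? = null
    lifecycle.addObserver(LifecycleEventObserver { _, event ->
        if (event == startEvent) {
            subscription = subscribe(fn)
        } else if (event == stopEvent) {
            subscription?.dispose()
            subscription = null
        }
    })
}
```

Calling it with Lifecycle.Event.ON_RESUME and Lifecycle.Event.ON_PAUSE, for example, would limit the subscription to the time the Activity is in the foreground.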