batchingCache

Cache implementation that collects cache requests into batches, which are queried at the same time.

This adapter is meant to be used as the first layer in a layer chain. By itself, it does no caching (every call to get invokes transform). To learn more about layer chaining, or about the type parameters, see Cache.
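Concretely, a chain could look like the following sketch. The second layer is the one that actually stores results; cachedInMemory is a hypothetical name used for illustration, not necessarily part of this API.

// 'ids' is the set of identifiers collected into one batch; see the full example below.
val batched = batchingCache(scope) { ids ->
    // …query all ids in a single call and emit the results…
}

// Hypothetical second layer: stores results in memory, so repeated
// calls to get no longer reach transform.
val cached = batched.cachedInMemory(scope)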

Implementation

This describes the current implementation. It is possible for this to change in future versions.

This adapter starts multiple workers that await requests. When a request arrives, the worker that receives it claims as many other pending requests as it can without suspending. Once it has collected these requests, it calls transform with the entire batch, then redistributes each result to the caller that requested it.
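As an illustration, a single worker's loop might resemble the sketch below. This is not the library's actual implementation; in particular, delivering requests through a Channel and answering them through CompletableDeferred are assumptions made for the sake of the example.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Sketch of one batching worker, assuming each request pairs an identifier
// with a CompletableDeferred that its caller awaits, and that 'transform'
// returns a value for every requested identifier.
fun <I, O> CoroutineScope.batchingWorker(
    requests: Channel<Pair<I, CompletableDeferred<O>>>,
    transform: suspend (Set<I>) -> Map<I, O>,
) = launch {
    while (isActive) {
        // Suspend until the first request arrives…
        val batch = mutableMapOf(requests.receive())

        // …then greedily claim every request already waiting, without suspending.
        while (true) {
            batch += requests.tryReceive().getOrNull() ?: break
        }

        // Query the whole batch at once, then redistribute the results.
        val results = transform(batch.keys)
        for ((id, deferred) in batch)
            deferred.complete(results.getValue(id))
    }
}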

While a worker is sending a request, the remaining workers (if any) wait for more requests. In practice, a low number of workers increases latency, since new requests cannot start until a worker becomes available, while a high number of workers increases the likelihood that two or more workers split what could have been a single batch, leading to smaller batches.

Example

val scope: CoroutineScope = …

// Let's imagine this is an I/O request
suspend fun foo(ids: Set<Int>): Map<Int, String> {
    println("Batch: $ids")
    delay(1000)

    return ids.associateWith { it.toString() }
}

val cachedFoo = batchingCache(scope) {
    // 'it' is the set of all identifiers collected into this batch
    val results = foo(it)
        .map { (id, value) -> id to value.success() }

    // Emit each result so it reaches the caller that requested it
    for ((id, result) in results) {
        emit(id to result)
    }
}

scope.launch { println(cachedFoo[5].now()) }
scope.launch { println(cachedFoo[6].now()) }
scope.launch { println(cachedFoo[7].now()) }

/* Output:
*
* Batch: [5, 6, 7]
* Success(value=5)
* Success(value=6)
* Success(value=7)
*/
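Because the three reads are launched concurrently, a single worker claims all of them as one batch: foo runs once, and each caller receives its own result.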

Parameters

scope

The coroutine scope in which the workers are started.

workers

The number of parallel workers to start (at least 1).
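For example, assuming workers is exposed as a named parameter alongside scope, a caller could start more of them to reduce latency, at the cost of potentially smaller batches:

// Assumption: 'workers' is passed as a named argument; check the actual
// signature. More workers pick up requests sooner, but may split batches.
val cachedFoo = batchingCache(scope, workers = 4) {
    for ((id, value) in foo(it))
        emit(id to value.success())
}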