Three Paradigms of Asynchronous Programming in Vertx

Three Paradigms of Asynchronous Programming in VertxPavel BernshtamBlockedUnblockFollowFollowingApr 27I want to show three paradigms of asynchronous programming — callbacks, futures and coroutines by example of simple web application using Vertx framework and Kotlin programming language.

Let’s assume we are writing an application, which receives a string in HTTP request, searches by this string for a URL in DB, fetches this URL content and sends it back to the client.

Vertx was created as an asynchronuous framework for high load application, it uses Netty, new I/O, event bus.

As it is common in Vertx, one Verticle (something like Actor, if you know Akka) receives a request, sends received string to an event bus to another verticle — BusinessVerticle, which is responsible for the fetching itself.

object Main { @JvmStatic fun main(args: Array<String>) { val vertx = Vertx.

vertx() vertx.

deployVerticle(HttpVerticle()) vertx.

deployVerticle(BusinessVerticleCoroutines()) }}class HttpVerticle : AbstractVerticle() { @Throws(Exception::class) override fun start(startFuture: Future<Void>) { val router = createRouter()vertx.

createHttpServer() .

requestHandler(router) .

listen(8080) { result -> if (result.

succeeded()) { startFuture.

complete() } else { startFuture.

fail(result.

cause()) } } }private fun createRouter(): Router = Router.

router(vertx).

apply { get("/").

handler(handlerRoot) }private val handlerRoot = Handler<RoutingContext> { rc -> vertx.

eventBus().

send("my.

addr", rc.

request().

getParam("id") ?: "") { resp: AsyncResult<Message<String>> -> if (resp.

succeeded()) { rc.

response().

end(resp.

result().

body()) } else { rc.

fail(500) } } }}In the standard Vertx API you do all asynchronuous flow by callbacks, so the initial implementation of BusinessVerticle looks like this:class BusinessVerticle : AbstractVerticle() {private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClientoverride fun start() { vertx.

eventBus().

consumer<String>("my.

addr") { message -> handleMessage(message) } dbclient = JDBCClient.

createShared( vertx, JsonObject() .

put("url", "jdbc:postgresql://localhost:5432/payroll") .

put("driver_class", "org.

postgresql.

Driver") .

put("user", "vala") .

put("password", "vala") .

put("max_pool_size", 30) )val options = WebClientOptions() .

setUserAgent("My-App/1.

2.

3")options.

isKeepAlive = false webclient = WebClient.

create(vertx, options) }private fun handleMessage(message: Message<String>) { dbclient.

getConnection { res -> if (res.

succeeded()) {val connection = res.

result()connection.

query("SELECT url FROM payee_company where name='${message.

body()}'") { res2 -> if (res2.

succeeded()) { try { val url = res2.

result().

rows[0].

getString("url").

removePrefix("http://") webclient .

get(url,"/") .

send { ar -> if (ar.

succeeded()) { // Obtain response val response = ar.

result() message.

reply(response.

bodyAsString()) } else { message.

fail(500, ar.

cause().

message) } }} catch (e: Exception) { message.

fail(500, e.

message) } } else { message.

fail(500, res2.

cause().

message) } } } else { message.

fail(500, res.

cause().

message) } } }}And this looks bad.

Callbacks hell and error handling is done in several places.

Let’s improve the situation by extraction of each callback to a separate method:class BusinessVerticle : AbstractVerticle() {private lateinit var dbclient: JDBCClient private lateinit var webclient: WebClientoverride fun start() { vertx.

eventBus().

consumer<String>("my.

addr") { message -> handleMessage(message) } dbclient = JDBCClient.

createShared( vertx, JsonObject() .

put("url", "jdbc:postgresql://localhost:5432/payroll") .

put("driver_class", "org.

postgresql.

Driver") .

put("user", "vala") .

put("password", "vala") .

put("max_pool_size", 30) )val options = WebClientOptions() .

setUserAgent("My-App/1.

2.

3")options.

isKeepAlive = false webclient = WebClient.

create(vertx, options) }private fun handleMessage(message: Message<String>) { dbclient.

getConnection { res -> handleConnectionCallback(res, message) } }private fun handleConnectionCallback( res: AsyncResult<SQLConnection>, message: Message<String> ) { if (res.

succeeded()) {val connection = res.

result()connection.

query("SELECT url FROM payee_company where name='${message.

body()}'") { res2 -> handleQueryCallBack(res2, message) } } else { message.

fail(500, res.

cause().

message) } }private fun handleQueryCallBack( res2: AsyncResult<ResultSet>, message: Message<String> ) { if (res2.

succeeded()) { try { val url = res2.

result().

rows[0].

getString("url").

removePrefix("http://") webclient .

get(url, "/") .

send { ar -> handleHttpCallback(ar, message) }} catch (e: Exception) { message.

fail(500, e.

message) } } else { message.

fail(500, res2.

cause().

message) } }private fun handleHttpCallback( ar: AsyncResult<HttpResponse<Buffer>>, message: Message<String> ) { if (ar.

succeeded()) { // Obtain response val response = ar.

result() message.

reply(response.

bodyAsString()) } else { message.

fail(500, ar.

cause().

message) } }}Better, but still bad.

Not readable code.

Message object which we should pass to all methods, separate error handling in each callback.

Let’s rewrite it using Futures.

The great advantage of Futures is the ability to chain them, using Future.

compose()First, let’s translate standard Vertx methods, which receive callbacks to methods which returns Future.

We will use “extension methods” — Kotlin feature, which allows to add a method to existing class.

fun JDBCClient.

getConnectionF(): Future<SQLConnection> { val f = Future.

future<SQLConnection>() getConnection { res -> if (res.

succeeded()) { val connection = res.

result() f.

complete(connection) } else { f.

fail(res.

cause()) } } return f}fun SQLConnection.

queryF(query:String): Future<ResultSet> { val f = Future.

future<ResultSet>() query(query) { res -> if (res.

succeeded()) { val resultSet = res.

result() f.

complete(resultSet) } else { f.

fail(res.

cause()) } } return f}fun HttpRequest<Buffer>.

sendF(): Future<HttpResponse<Buffer>> { val f = Future.

future<HttpResponse<Buffer>>() send() { res -> if (res.

succeeded()) { val response = res.

result() f.

complete(response) } else { f.

fail(res.

cause()) } } return f}And then BusinessVerticle.

handleMessage method will be transformed to:private fun handleMessage(message: Message<String>) { val content = getContent(message)content.

setHandler{res-> if (res.

succeeded()) { // Obtain response val response = res.

result() message.

reply(response) } else { message.

fail(500, res.

cause().

message) } } }private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.

getConnectionF() val resultSet = connection.

compose { it.

queryF("SELECT url FROM payee_company where name='${message.

body()}'") } val url = resultSet.

map { it.

rows[0].

getString("url").

removePrefix("http://") } val httpResponse = url.

compose { webclient.

get(it, "/").

sendF() } val content = httpResponse.

map { it.

bodyAsString() } return content }Looks good.

Simple, readable code.

Error handling in one place.

If required we can add different error handling for different exceptions and/or extract it to a separate method.

But what if we need, under some condition, stop the chain of Future.

compose()?For example, if there is no records in the DB, we want to return a response “No records” with HTTP code 200 instead of an error and code 500?The only way to do it is to throw a special exception and make a special handling for such exception:class NoContentException(message:String):Exception(message)private fun getContent(message: Message<String>): Future<String> { val connection = dbclient.

getConnectionF() val resultSet = connection.

compose { it.

queryF("SELECT url FROM payee_company where name='${message.

body()}'") } val url = resultSet.

map { if (it.

numRows<1) throw NoContentException("No records") it.

rows[0].

getString("url").

removePrefix("http://") } val httpResponse = url.

compose { webclient.

get(it, "/").

sendF() } val content = httpResponse.

map { it.

bodyAsString() } return content }private fun handleMessage(message: Message<String>) { val content = getContent(message)content.

setHandler{res-> if (res.

succeeded()) { // Obtain response val response = res.

result() message.

reply(response) } else { if (res.

cause() is NoContentException) message.

reply(res.

cause().

message) else message.

fail(500, res.

cause().

message) } } }It is working well, but looks not so good — we are using exception for flow control and if there are a lot of “special cases” in flow, the code will be much less readable.

So, let’s try to do the same thing with Kotlin coroutines.

There are a lot of articles about Kotlin coroutines, so I’ll not explain here what they are and how they work .

In last versions of Vertx you can include automatically generated coroutine-friendly versions of all callback methods.

You should add libraries‘vertx-lang-kotlin-coroutines’‘vertx-lang-kotlin’and you get JDBCClient.

getConnectionAwait(), SQLConnection.

queryAwait() and others.

If we use those methods, our message handling method will become simpler:private suspend fun handleMessage(message: Message<String>) { try { val content = getContent(message) message.

reply(content) } catch(e:Exception){ message.

fail(500, e.

message) }}private suspend fun getContent(message: Message<String>): String { val connection = dbclient.

getConnectionAwait() val resultSet = connection.

queryAwait("SELECT url FROM payee_company where name='${message.

body()}'") val url = resultSet.

rows[0].

getString("url").

removePrefix("http://") val httpResponse = webclient.

get(url, "/").

sendAwait() val content = httpResponse.

bodyAsString() return content }Additionally you should change the event bus message subscribe code:vertx.

eventBus().

consumer<String>("my.

addr") { message -> GlobalScope.

launch (vertx.

dispatcher()) { handleMessage(message)} }What happens here?All those “await” methods call a code in an asynchronous way, then wait for a result and while they wait, a thread is switching to running another coroutine.

If we will look to implementation for those “await” methods, we will see something very similar to our homemade implementation for Futures:suspend fun SQLClient.

getConnectionAwait(): SQLConnection { return awaitResult { this.

getConnection(it) }}suspend fun <T> awaitResult(block: (h: Handler<AsyncResult<T>>) -> Unit): T { val asyncResult = awaitEvent(block) if (asyncResult.

succeeded()) return asyncResult.

result() else throw asyncResult.

cause()}suspend fun <T> awaitEvent(block: (h: Handler<T>) -> Unit): T { return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> try { block.

invoke(Handler { t -> cont.

resume(t) }) } catch (e: Exception) { cont.

resumeWithException(e) } }}But here we get a normal code — String as return type (and not Future<String>) and try/catch instead of the ugly callback with AsyncResult.

If we need to stop a flow in the middle, we can do it in a natural way, without exceptions:private suspend fun getContent(message: Message<String>): String { val connection = dbclient.

getConnectionAwait() val resultSet = connection.

queryAwait("SELECT url FROM payee_company where name='${message.

body()}'") if (resultSet.

numRows<1) return "No records" val url = resultSet.

rows[0].

getString("url").

removePrefix("http://") val httpResponse = webclient.

get(url, "/").

sendAwait() val content = httpResponse.

bodyAsString() return content }IMHO, it is just beautiful!.

. More details

Leave a Reply