
Using Redis with Mutiny - Composing asynchronous actions

I got an interesting question from a user about Redis and Mutiny. While the problem was not specifically about Redis and could be applied to many other APIs, I found the context amusing.

Enrico, the user, wanted to do something like this:

1. get all keys from Redis
2. for each key -> retrieve the associated object
3. add this object to a JsonArray
4. produce the JsonArray with all the objects

Enrico is using the Mutiny variant of the Vert.x Redis Client.

This client offers a few methods to help us with our problem:

  • the RedisClient.keys(pattern) method returning Uni<JsonArray>. This array contains the list of keys matching a pattern passed to the keys method. To simplify this post, let’s use: keys("*") returning all the keys.

  • the RedisClient.hgetall(key) method returning a Uni<JsonObject>. This method retrieves the object associated with the passed key.

Both methods are asynchronous (they return Uni), and we need to call the second one for each retrieved key. In other words, we need to iterate over the set of keys and invoke an asynchronous action for each one. Finally, we want to collect the results of these asynchronous actions into a JsonArray.

Let’s start at the beginning. We need a Redis Client instance:

RedisClient redis = RedisClient.create(vertx, new JsonObject()
     .put("port", 6379)
     .put("host", "localhost"));

Note that in Quarkus, you should use the Redis extension directly, which exposes a similar API. Enrico wanted to use the Vert.x Redis Client directly.

Now that we have our client, let’s retrieve the list of keys:

Uni<JsonArray> keys = redis.keys("*");

That produces the JsonArray, but we want a stream of keys. Again, it’s an asynchronous method. The returned Uni receives the array when it’s available. Once received (onItem), we can create a stream out of this array:

Multi<String> keys = redis.keys("*")
     .onItem().transformToMulti(array -> Multi.createFrom().iterable(array))
     .onItem().castTo(String.class);

This snippet:

  1. retrieves the JsonArray containing the keys

  2. creates a Multi streaming these keys; it is a Multi<Object> because JsonArray implements Iterable<Object>

  3. casts each item of this Multi to String

At this point, we have a stream of (String) keys. So, we are done with step 1.

Now, step 2: for each key, we want to retrieve the associated object.

So let’s use the hgetall method:

Multi<JsonObject> objects = keys
  .onItem().transformToUniAndMerge(key -> redis.hgetall(key));

This snippet requires a bit of an explanation.

For each item of the stream keys, we call hgetall, which produces a Uni<JsonObject>.

So, we want to transform our key into a Uni (transformToUni).

When you have a stream of items and need to invoke an asynchronous action for each item, you must choose how you will merge the results. Mutiny provides two strategies:

  • merge - as soon as the item produced by the Uni is received we send it downstream

  • concatenate - we preserve the order of the input stream to be sure that the items are sent downstream in the same order

Let’s illustrate this. Imagine the stream of keys is {1, 2, 3}. Also, let’s say that in our Redis database, key 1 is associated with A, 2 with B, and 3 with C.

If you use the merge strategy, the associated objects arrive in an undetermined order. We can end up with {A, C, B} or {B, A, C}. It depends on many factors, such as latency, scheduling, and load. However, it also means we can retrieve all the associated objects concurrently and produce the resulting stream without worrying about the order.

If you use the concatenate strategy, the order of the input stream is preserved, so it always produces {A, B, C}. While this may be desirable, it may reduce the ability to retrieve the objects concurrently, as Mutiny has to wait for the retrieval of all the previous objects. For example, if Mutiny receives C first, it needs to wait for A and B before sending C downstream.
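
To make the ordering difference concrete, here is a minimal sketch in plain Java. It does not use Mutiny or Redis: it swaps in CompletableFuture from the JDK, and the names MergeVsConcatenate, merge, concatenate and simulate are hypothetical. The three futures stand in for the lookups of keys 1, 2 and 3, and they are completed by hand in the order C, A, B to imitate varying latency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-JDK illustration of the two strategies; this is not Mutiny's implementation.
class MergeVsConcatenate {

    // Merge: forward each result as soon as its lookup completes.
    static List<String> merge(List<CompletableFuture<String>> lookups) {
        List<String> downstream = new ArrayList<>();
        for (CompletableFuture<String> lookup : lookups) {
            lookup.thenAccept(downstream::add);
        }
        return downstream;
    }

    // Concatenate: forward a result only once all earlier results were forwarded.
    static List<String> concatenate(List<CompletableFuture<String>> lookups) {
        List<String> downstream = new ArrayList<>();
        CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
        for (CompletableFuture<String> lookup : lookups) {
            previous = previous.thenCompose(ignored -> lookup.thenAccept(downstream::add));
        }
        return downstream;
    }

    // Completes the lookups in the order C, A, B; returns [merged, concatenated].
    static List<List<String>> simulate() {
        CompletableFuture<String> a = new CompletableFuture<>(); // lookup for key 1
        CompletableFuture<String> b = new CompletableFuture<>(); // lookup for key 2
        CompletableFuture<String> c = new CompletableFuture<>(); // lookup for key 3
        List<CompletableFuture<String>> lookups = List.of(a, b, c);

        List<String> merged = merge(lookups);
        List<String> concatenated = concatenate(lookups);

        c.complete("C");
        a.complete("A");
        b.complete("B");
        return List.of(merged, concatenated);
    }

    public static void main(String[] args) {
        List<List<String>> results = simulate();
        System.out.println("merge:       " + results.get(0)); // [C, A, B]
        System.out.println("concatenate: " + results.get(1)); // [A, B, C]
    }
}
```

Changing the completion order in simulate changes only the merge result; the concatenate result stays in input order.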

In our context, let’s not preserve the order and use the merge strategy. So we use transformToUniAndMerge.

If you run the code multiple times, you might see the order change in the resulting array.

Ok, step 2 done. Let’s focus on the final steps: accumulate the objects into a JsonArray, and produce a Uni<JsonArray>, containing all the objects. Mutiny provides methods to gather items from a stream into lists, maps, sets, but there is no built-in JsonArray support. Fortunately, Mutiny offers a method that you can use to collect items in any structure:

Uni<JsonArray> result = objects
   .collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));

collectItems().in allows accumulating the items in your own structure. It takes two parameters: a supplier of the structure, called only once, and a bi-consumer taking the structure and the item to add, called for each item.
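
The supplier and bi-consumer contract described above can be sketched in plain Java. This is an illustration only, not Mutiny code: collectIn and CollectInSketch are hypothetical names, the items come from an ordinary Iterable rather than a Multi, and an ArrayList stands in for the JsonArray.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Plain-Java stand-in for the "supplier + accumulator" contract; not Mutiny code.
class CollectInSketch {

    static <T, S> S collectIn(Iterable<T> items,
                              Supplier<S> supplier,
                              BiConsumer<S, T> accumulator) {
        S structure = supplier.get();            // the supplier is called exactly once
        for (T item : items) {
            accumulator.accept(structure, item); // the accumulator is called once per item
        }
        return structure;
    }

    public static void main(String[] args) {
        // Same shape as the article's call: () -> new JsonArray(), (arr, obj) -> arr.add(obj)
        List<String> result = collectIn(
                List.of("A", "B", "C"),
                () -> new ArrayList<String>(),
                (list, item) -> list.add(item));
        System.out.println(result); // [A, B, C]
    }
}
```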

Here we go, we have everything to solve Enrico’s question.

The complete code is the following:

Uni<JsonArray> result =
  // Step 1 - retrieve the keys
  redis.keys("*")
    .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
    .onItem().castTo(String.class)
  // Step 2 - retrieve the associated object for each key
    .onItem().transformToUniAndMerge(key -> redis.hgetall(key))
  // Step 3 and 4 - accumulate the retrieved object in a JsonArray
    .collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));

In this snippet, there are a few interesting patterns:

  • When you have a collection, and you want to iterate on it with Mutiny, transform it into a Multi

  • When you execute an asynchronous action for each item of a stream, think about merge vs. concatenate. Use the one that makes sense for you.

  • To accumulate items into a structure, use collectItems. It offers many methods to produce your structure of choice.

If you want to see this code in action, check this gist. You can even run it directly with JBang:

jbang https://gist.github.com/cescoffier/e8c8a18897f9e5ca15f1378876a1bd93

You can replace merge with concatenate to see the difference.

Enjoy!