
Sending and Receiving Cloud Events with Kafka

Cloud Events is a specification for describing events. It aims to ease interoperability. With the rise of event-driven architecture, it’s not surprising to see Cloud Events gaining popularity.

This post explains how you can generate and consume Cloud Events using Quarkus, Kafka, and Reactive Messaging.

What is a Cloud Event?

First, let’s look at the why. Events are everywhere. Many modern systems use events one way or another. Events can be used to implement event sourcing, communicate facts, trigger out-of-band processing, or send notifications. Events have become an essential piece of any system.

Yet event publishers tend to describe events differently. I don’t mean the content is different, but the envelope and the event’s format are heterogeneous, even if these events transit on the same event mesh. Some applications choose JSON and encode everything in the event’s payload; other systems prefer binary formats, such as Avro or Protobuf, and use protocol capabilities, such as headers or properties, to transport metadata about the wrapped payload. While event-driven architecture claims to ease the integration with external systems, this disparity does the opposite. It’s not rare to need a specific event translator whose only purpose is to adapt events from one format to another.

So, what’s a Cloud Event? Cloud Events proposes a common way to describe events. The goal is, obviously, interoperability and easing the integration burden. Cloud Events 1.0 was released almost a year ago. Over the past year, many cloud providers, such as Azure and Oracle, have adopted this format. Several middleware projects have also added support for Cloud Events, such as Knative, Kogito, Debezium, and Quarkus.

Show me some examples!

Ok, so, what does it look like? The easiest way to understand Cloud Events is to look at one:

{
    "specversion" : "1.0",
    "id" : "O234-345-890",
    "source" : "https://reactive-coffee-shop.io/1234/order",
    "type" : "me.escoffier.coffee.Order",
    "subject" : "order",
    "time" : "2020-11-25T09:05:00Z",
    "datacontenttype" : "application/json",
    "data" : "{\"name\": \"clement\", \"order\":\"espresso\"}",
    "custom-attribute" : "some custom value"
}

This event is described in JSON, but that’s just one of the possibilities. Let’s look at the fields.

First, the specversion indicates which version of Cloud Event it is using (1.0). The id field provides an id for that specific event. The source attribute is a URI identifying the event source, i.e., the context in which an event happened or the application that emitted that specific event. Combining the id and the source provides a unique identifier. Such uniqueness is essential to implement idempotence and handle potential duplicates. The type is the last mandatory attribute. It indicates the type of the event. Here, I use the fully qualified class name, but you can imagine anything. It should refer to the kind of event you have defined in your system.
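
To make the idempotence point concrete, here is a minimal sketch of duplicate detection keyed on the source and id pair. The CloudEventDeduplicator class and its markIfNew method are hypothetical names made up for this illustration; they are not part of Quarkus or of the Cloud Events SDK, and the in-memory set is only there to keep the example small:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical helper: remembers which (source, id) pairs were already processed. */
final class CloudEventDeduplicator {

    /** Key built from the two attributes that, combined, identify an event. */
    private static final class EventKey {
        private final String source;
        private final String id;

        EventKey(String source, String id) {
            this.source = Objects.requireNonNull(source, "source");
            this.id = Objects.requireNonNull(id, "id");
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof EventKey)) {
                return false;
            }
            EventKey other = (EventKey) o;
            return source.equals(other.source) && id.equals(other.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(source, id);
        }
    }

    // In-memory only (assumption for the sketch): a real system would bound or persist this set.
    private final Set<EventKey> seen = new HashSet<>();

    /** Returns true the first time a (source, id) pair is seen, false for a duplicate. */
    boolean markIfNew(String source, String id) {
        return seen.add(new EventKey(source, id));
    }
}
```

A consumer would call markIfNew before handling an event and skip the processing when it returns false. Note that the same id coming from a different source counts as a different event.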

The other attributes are optional. datacontenttype defines the content-type of the data attribute. subject allows passing extra details about the event, such as an additional hint about the context or the type of event. time is a timestamp, generally indicating the creation time. There is another optional attribute not used in my example. The dataschema attribute lets you pass the schema of the event data.

The data attribute contains the wrapped business event. It is an essential part, and the other attributes are just providing details about that specific business event.

You can also define extensions. These extensions would be a set of custom attributes used when the proposed set of attributes is not enough for your use case.

That’s it! So, we can summarize Cloud Events as just enough metadata to understand an event - its source, an id, a type, and the business data.

Cloud Events on the wire - the bindings

But how would these events be encoded? The previous example using JSON is nice, but some protocols may want to leverage their own capabilities to transmit this metadata.

That’s why Cloud Events also proposes bindings. A binding is a set of recommendations specific to one protocol. It explains how each protocol should encode Cloud Events. For example, there is a binding for HTTP, one for Kafka, and another for AMQP.

Most of these bindings propose two approaches:

  • structured

  • binary

The structured approach keeps event metadata and data together in the payload of the message or request. It generally uses JSON to encode that data. If you pass the Cloud Event example from above in an HTTP request, it uses the structured mode. The same is true when you write that JSON snippet in a Kafka record’s value.

The structured approach allows simple forwarding across multiple protocols. However, it may not be efficient and may constrain the type of business data.

The other approach relies on protocol capabilities and enables efficient transfer and encoding. If we use the binary mode with Kafka, we will store the data attribute value in the Kafka record’s value and pass the other attributes using the record’s headers. Consequently, business data can be encoded using binary protocols such as Avro, leading to higher efficiency.
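
The split between record value and headers can be sketched in a few lines of plain Java. This is an illustration, not the connector's implementation: the KafkaBinaryModeSketch class is a hypothetical name, headers are modeled as strings (real Kafka headers carry bytes), and the naming rule is the one I understand the Kafka binding to use, that is a ce_ prefix for each attribute and a content-type header for datacontenttype:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of how binary mode spreads a Cloud Event over a Kafka record. */
final class KafkaBinaryModeSketch {

    private KafkaBinaryModeSketch() {
        // Static helper only.
    }

    /**
     * Maps Cloud Event attributes to Kafka header names.
     * The "data" attribute is skipped because it travels as the record's value.
     */
    static Map<String, String> toHeaders(Map<String, String> attributes) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> attribute : attributes.entrySet()) {
            String name = attribute.getKey();
            if ("data".equals(name)) {
                continue; // the business payload goes into the record's value
            }
            if ("datacontenttype".equals(name)) {
                // Assumption: the binding carries the content type in a plain content-type header.
                headers.put("content-type", attribute.getValue());
            } else {
                // Assumption: every other attribute gets the ce_ prefix.
                headers.put("ce_" + name, attribute.getValue());
            }
        }
        return headers;
    }
}
```

With the coffee order example from above, the record's value would hold only the order itself, and attributes such as id, source, and type would end up in headers named ce_id, ce_source, and ce_type.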

The rest of the post explains how you can send and receive Cloud Events using Quarkus, Kafka, and Reactive Messaging.

Sending Cloud Events on Kafka

The Kafka connector used by Quarkus has built-in support for Cloud Events. It can send and consume Cloud Events using the structured mode (encoding everything in a JSON payload) or the binary mode (using Kafka headers).

To write your outgoing messages as Cloud Events, you only need to specify the cloud-events-type and cloud-events-source attributes on your channel:

mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.generated-price.cloud-events-source=price-generators
mp.messaging.outgoing.generated-price.cloud-events-type=price
mp.messaging.outgoing.generated-price.cloud-events-subject=generated-prices

By default, the connector writes Cloud Events using the binary mode. The connector generates a random id for each message. You can also customize the other Cloud Event attributes using cloud-events-$attribute, such as cloud-events-subject.

The configuration shown above is applied to all the outgoing messages. Sometimes, you want to customize the value for each message individually. To achieve this, you can also attach io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata to your message to customize the id, source, type and subject for each message:

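import java.net.URI;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;

// A method taking a Message parameter also needs an @Incoming channel (omitted here).
@Outgoing("cloud-events")
public Message<String> toCloudEvents(Message<String> in) {
    return in.addMetadata(OutgoingCloudEventMetadata.builder()
      .withId("id-" + in.getPayload())
      .withType("greetings")
      .withSource(URI.create("http://example.com"))
      .withSubject("greeting-message")
      .build());
}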

The connector also supports the structured mode. You can write structured Cloud Events by setting the cloud-events-mode attribute to structured. It only supports JSON at the moment. The written record gets its content-type header set to application/cloudevents+json; charset=UTF-8, which allows the receiver to understand that it’s a structured Cloud Event.
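
For instance, reusing the generated-price channel from the earlier configuration, switching to the structured mode would be a one-line change. Take it as a sketch: if I recall the connector documentation correctly, the structured mode also expects a String value serializer, since the whole event is written as JSON, so double-check the serializer setting against the documentation of your connector version:

```properties
mp.messaging.outgoing.generated-price.cloud-events-mode=structured
```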

Consuming Cloud Events from Kafka

Naturally, the connector can also consume Cloud Events. The connector detects Cloud Events automatically by checking the record’s headers. It also determines the mode.

When the connector receives a Cloud Event, it attaches an IncomingKafkaCloudEventMetadata to the message metadata. So, you can retrieve the various attributes as well as the extensions:

public Message<Double> process(Message<Integer> priceInUsd) {
  IncomingCloudEventMetadata<Integer> cloudEventMetadata = priceInUsd.getMetadata(IncomingCloudEventMetadata.class)
    .orElseThrow(() -> new IllegalArgumentException("Expected a Cloud Event"));

  LOGGER.infof("Received Cloud Event (spec-version: %s): source: '%s', type: '%s', subject: '%s'",
    cloudEventMetadata.getSpecVersion(),
    cloudEventMetadata.getSource(),
    cloudEventMetadata.getType(),
    cloudEventMetadata.getSubject().orElse("no subject"));

  // CONVERSION_RATE is assumed to be a double constant defined elsewhere in the class.
  return priceInUsd.withPayload(priceInUsd.getPayload() * CONVERSION_RATE);
}

Summary

With the rise of event-driven architecture, Cloud Events are becoming highly popular. Since Quarkus 1.9, the Kafka Connector used in Quarkus has built-in support for Cloud Events. This post introduced Cloud Events and showed how you could write and read Cloud Events easily.

Many more options are available, and Kafka is not the only part of Quarkus with Cloud Events support. For example, Funqy (https://quarkus.io/guides/funqy#context-injection) also supports Cloud Events out of the box.