Quarkus Messaging Extensions
Event-driven messaging systems have become the backbone of most modern applications, enabling the building of message-driven microservices or complex data streaming pipelines.
Quarkus offers a comprehensive suite of messaging extensions designed to integrate with leading messaging technologies. This lets developers concentrate on the core application logic instead of the details of individual APIs and messaging infrastructures.
This page focuses on common features and the development model for all messaging extensions.
Some of these extensions are maintained in the core Quarkus repository:
- Messaging: The core extension defines the basic concepts and APIs to develop messaging applications
- Messaging - MQTT Connector
Some extensions are contributed and maintained by the community:
Other connectors, such as the JMS Connector or the Google PubSub Connector, do not benefit from the same level of integration and require more manual configuration to set up.
On the other hand, some messaging-related extensions propose low-level provider-specific integrations. The level of support covered on this page DOES NOT involve these low-level extensions. The following is a non-exhaustive list of such extensions:
Quarkus Messaging Development Model
Quarkus simplifies message-driven application development by establishing a uniform model for publishing, consuming, and processing messages, regardless of whether the underlying broker technology uses message queuing or event streaming. Built upon the MicroProfile Reactive Messaging specification, Quarkus Messaging extensions ensure seamless integration with these technologies. Importantly, proficiency in reactive programming is NOT a prerequisite for leveraging these capabilities.
The Reactive Messaging specification defines a CDI-based programming model for implementing event-driven and message-driven applications. Using a small set of annotations, CDI beans become building blocks for implementing interactions with message brokers. These interactions happen through channels where application components read and write messages.
Channels are identified by a unique name and declared using a set of annotations.
@Incoming and @Outgoing annotations
The @Incoming and @Outgoing method annotations define channels that let you consume messages from, and produce messages to, the message broker:
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageProcessingBean {

    @Incoming("source")
    @Outgoing("sink")
    public String process(String consumedPayload) {
        // Process the incoming message payload and return an updated payload
        return consumedPayload.toUpperCase();
    }
}
@Outgoing can be used by itself on a method to generate messages:
import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MessageGeneratorBean {

    @Outgoing("sink")
    public Multi<String> generate() {
        // Emit three payloads on the "sink" channel, then complete the stream
        return Multi.createFrom().items("a", "b", "c");
    }
}
@Incoming can be used by itself to consume messages:
import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class MessageProcessingBean {

    @Incoming("source")
    public void process(String consumedPayload) {
        // process the payload
        consumedPayload.toUpperCase();
    }
}
Note that you should not call methods annotated with @Incoming and/or @Outgoing directly from your code. They are invoked by the framework.
You can read more on supported method signatures in the SmallRye Reactive Messaging – Supported signatures.
Emitters and @Channel annotation
An application often needs to combine messaging with other parts of the application, for example to produce messages from HTTP endpoints or to stream consumed messages as a response.
To send messages from imperative code to a specific channel, you need to inject an Emitter object identified by the @Channel annotation:
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
@Path("/")
public class MyImperativeBean {

    @Channel("prices")
    Emitter<Double> emitter;

    @GET
    @Path("/send")
    public CompletionStage<Void> send(double d) {
        // The returned stage completes when the message is acknowledged
        return emitter.send(d);
    }
}
The @Channel annotation lets you indicate to which channel you will send your payloads or messages.
The Emitter allows buffering messages sent to the channel.
For more control, using Mutiny APIs, you can use the MutinyEmitter emitter interface:
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import io.smallrye.reactive.messaging.MutinyEmitter;
import org.eclipse.microprofile.reactive.messaging.Channel;

@ApplicationScoped
@Path("/")
public class MyImperativeBean {

    @Channel("prices")
    MutinyEmitter<Double> emitter;

    @GET
    @Path("/send")
    public void send(double d) {
        // Blocks the caller until the message is acknowledged
        emitter.sendAndAwait(d);
    }
}
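The @Channel annotation can also be used to inject the stream of messages from an incoming channel:
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

@ApplicationScoped
@Path("/")
public class SseResource {

    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}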
When consuming messages with @Channel, the application code is responsible for subscribing to the stream.
In the example above, the Quarkus REST (formerly RESTEasy Reactive) endpoint handles that for you.
You can read more on the emitters and channels in the SmallRye Reactive Messaging – Emitter and Channels documentation.
Messages and Metadata
A Message is an envelope around a payload.
In the examples above only payloads were used, but every payload is wrapped in a Message internally in Quarkus Messaging.
The Message<T> interface associates a payload of type <T> with Metadata, a set of arbitrary objects, and with asynchronous actions for acknowledgement (ack) and negative acknowledgement (nack).
import org.eclipse.microprofile.reactive.messaging.Message;

@Incoming("source")
@Outgoing("sink")
public Message<String> process(Message<String> consumed) {
    // Access the metadata
    MyMetadata my = consumed.getMetadata(MyMetadata.class).get();
    // Process the incoming message and return an updated message
    return consumed.withPayload(consumed.getPayload().toUpperCase());
}
A message is acknowledged back to the broker when its processing or reception has been successful.
Acknowledgements between messages are chained, meaning that when processing a message, the acknowledgement of an outgoing message triggers the acknowledgement of incoming message(s).
In most cases, acks and nacks are managed for you and connectors allow you to configure different strategies per channel.
So, you usually don’t need to interact with the Message interface directly.
Only advanced use cases require dealing with the Message directly.
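To make the idea of chained acknowledgements more concrete, here is a minimal plain-Java sketch. It does not use the Quarkus or MicroProfile APIs: the Envelope class and AckChainSketch helper are hypothetical stand-ins invented for this illustration, and the sketch only assumes that a derived message keeps the acknowledgement action of the message it was created from.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

/** Hypothetical stand-in for a message: a payload plus an asynchronous ack action. */
final class Envelope<T> {
    private final T payload;
    private final Supplier<CompletionStage<Void>> ack;

    Envelope(T payload, Supplier<CompletionStage<Void>> ack) {
        this.payload = payload;
        this.ack = ack;
    }

    T payload() {
        return payload;
    }

    /** Runs the ack action and returns its completion. */
    CompletionStage<Void> ack() {
        return ack.get();
    }

    /** Derives an outgoing envelope that shares the ack action of this one. */
    <R> Envelope<R> withPayload(R newPayload) {
        return new Envelope<>(newPayload, ack);
    }
}

final class AckChainSketch {
    /** Returns true if acknowledging the outgoing envelope also acknowledged the incoming one. */
    static boolean outgoingAckReachesIncoming() {
        boolean[] incomingAcked = { false };
        Envelope<String> incoming = new Envelope<>("hello", () -> {
            incomingAcked[0] = true; // in a real connector, this is where the broker would be notified
            return CompletableFuture.completedFuture(null);
        });

        // The processing step derives the outgoing envelope from the incoming one
        Envelope<String> outgoing = incoming.withPayload(incoming.payload().toUpperCase());

        // Acknowledging downstream runs the ack action inherited from the incoming envelope
        outgoing.ack().toCompletableFuture().join();

        return incomingAcked[0] && outgoing.payload().equals("HELLO");
    }
}
```

In this sketch the chain has a single link. With several processing steps, each derived envelope would pass the same action along, so the acknowledgement still reaches the original message.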
Accessing the Metadata, on the other hand, can be practical in many cases.
Connectors add specific metadata objects to the message to give access to the message headers, properties, and other connector-specific information.
You do not need to interact with the Message interface to access connector-specific metadata.
You can simply inject the metadata object as a method parameter after the payload parameter:
import java.util.Map;

@Incoming("source")
@Outgoing("sink")
public String process(String payload, MyMetadata my) {
    // Access the metadata
    Map<String, Object> props = my.getProperties();
    // Process the payload and return an updated payload
    return payload.toUpperCase();
}
Depending on the connector, payload types available to consume in processing methods differ.
You can implement a custom MessageConverter to transform the payload to a type that is accepted by your application.
Channel configuration
Channel attributes can be configured using the mp.messaging.incoming.<channel-name> and mp.messaging.outgoing.<channel-name> configuration properties.
For example, to configure the Kafka connector to consume messages from the my-topic topic with a custom deserializer:
mp.messaging.incoming.source.connector=smallrye-kafka
mp.messaging.incoming.source.topic=my-topic
mp.messaging.incoming.source.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.source.auto.offset.reset=earliest
The connector attribute specifies the connector that manages the channel.
It is required, unless you have a single connector on your classpath, in which case you can omit it and Quarkus will automatically select that connector.
Global channel attributes can be configured using the connector name:
mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
Connector-specific attributes are listed in connector documentation.
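As a sketch of how the two levels might combine, the fragment below sets the bootstrap servers once for the Kafka connector and then sets a different value on a single channel. The channel name other-source, the topic other-topic, and the host other-host are made up for illustration. The fragment also assumes that a channel-level attribute takes precedence over the connector-level one, so check the connector documentation before relying on it.

```properties
# Default for every channel managed by the smallrye-kafka connector
mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092

# Hypothetical channel that talks to a different cluster
mp.messaging.incoming.other-source.connector=smallrye-kafka
mp.messaging.incoming.other-source.topic=other-topic
mp.messaging.incoming.other-source.bootstrap.servers=other-host:9092
```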
Channel wiring and Messaging patterns
At startup time, Quarkus analyzes declared channels to wire them together and verify that all channels are connected. Concretely, each channel creates a reactive stream of messages connected to another channel’s reactive stream of messages. Adhering to the reactive stream protocol, the back-pressure mechanism is enforced between channels, which keeps application resource usage under control and avoids over-committing or overloading parts of the system.
On the flip side it is NOT possible to create new channels programmatically at runtime. There are, however, many patterns that let you implement most, if not all, messaging and integration use cases:
Some messaging technologies allow consumers to subscribe to a set of topics or queues, and producers to send messages to a specific topic on a per-message basis. If you are sure you need to configure and create clients dynamically at runtime, you should consider using the low-level clients directly.
Internal Channels
In some use cases, it is convenient to use messaging patterns to transfer messages inside the same application. When you don’t connect a channel to a messaging backend, i.e. a connector, everything happens internally to the application, and the streams are created by chaining methods together. Each chain is still a reactive stream and enforces the back-pressure protocol.
The framework verifies that the producer/consumer chain is complete, meaning that if the application writes messages into an internal channel (using a method with only @Outgoing, or an Emitter), it must also consume the messages from within the application (using a method with only @Incoming or using an unmanaged stream).
Enable/Disable channels
All defined channels are enabled by default, but it is possible to disable a channel with the configuration:
mp.messaging.incoming.my-channel.enabled=false
This can be used alongside Quarkus build profiles to enable/disable channels based on some build-time condition, such as the target environment. You need to make sure of two things when disabling a channel:
- the disabled channel usage is located in a bean that can be filtered out at build time,
- without the channel, the remaining channels still work correctly.
@ApplicationScoped
@IfBuildProfile("my-profile")
public class MyProfileBean {

    @Outgoing("my-channel")
    public Multi<String> generate() {
        return Multi.createFrom().items("a", "b", "c");
    }
}
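A matching configuration for the bean above could look like the following sketch. It assumes the my-channel outgoing channel should exist only when the application is built with the my-profile profile, and it relies on the Quarkus %profile. property prefix. Connector attributes for the channel are left out.

```properties
# Disabled by default, matching the bean being filtered out of the build
mp.messaging.outgoing.my-channel.enabled=false

# Enabled only when the my-profile profile is active
%my-profile.mp.messaging.outgoing.my-channel.enabled=true
```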
Multiple Outgoings and @Broadcast
By default, messages transmitted in a channel are only dispatched to a single consumer. Having multiple consumers is considered an error and is reported at deployment time.
The @Broadcast annotation changes this behavior and indicates that messages transiting in the channel are dispatched to all the consumers.
@Broadcast must be used with the @Outgoing annotation:
import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("in")
@Outgoing("out")
@Broadcast
public int increment(int i) {
    return i + 1;
}

@Incoming("out")
public void consume1(int i) {
    //...
}

@Incoming("out")
public void consume2(int i) {
    //...
}
Similarly to @Broadcast, you can use the @Outgoing annotation multiple times on the same method to indicate that the method produces messages to multiple channels:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
public String process(String s) {
    // send messages from channel in to both channels out1 and out2
    return s.toUpperCase();
}
Using Multiple Outgoings can be useful for implementing fan-out patterns, in which a single message is processed by multiple target channels.
You can selectively dispatch messages to multiple outgoings by returning Targeted from the processing method:
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
    // always send a message to out1 and out2
    Targeted targeted = Targeted.of("out1", "Price: " + price, "out2", "Quote: " + price);
    if (price > 90.0) {
        // additionally send the raw price to out3
        return targeted.with("out3", price);
    }
    return targeted;
}
Multiple Incomings and @Merge
By default, a single producer can transmit messages in a channel.
Having multiple producers is considered erroneous and is reported at deployment time.
The @Merge annotation changes this behavior and indicates that a channel can have multiple producers.
@Merge must be used with the @Incoming annotation:
@Incoming("in1")
@Outgoing("out")
public int increment(int i) {
    return i + 1;
}

@Incoming("in2")
@Outgoing("out")
public int multiply(int i) {
    return i * 2;
}

@Incoming("out")
@Merge
public void getAll(int i) {
    //...
}
Similarly to @Merge, you can use the @Incoming annotation multiple times on the same method to indicate that the method consumes messages from multiple channels:
@Incoming("in1")
@Incoming("in2")
public void process(String s) {
    // receives messages from both channels in1 and in2
    s.toUpperCase();
}
Stream Processing
In some advanced scenarios, you can directly manipulate the stream of messages instead of each individual message.
Using Mutiny APIs in incoming and outgoing signatures allows you to process the stream of messages:
import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class StreamProcessor {

    @Incoming("source")
    @Outgoing("sink")
    public Multi<String> process(Multi<String> in) {
        // Called once to assemble the stream; the mapping then applies to every payload
        return in.map(String::toUpperCase);
    }
}
Execution Model
Quarkus Messaging sits on top of the reactive engine of Quarkus and leverages Eclipse Vert.x to dispatch messages for processing. It supports three execution modes:
- Event-loop, where messages are dispatched on the Vert.x I/O thread. Remember that you should not perform blocking operations on the event loop.
- Worker-threads, where messages are dispatched on a worker thread pool.
- Virtual-threads, where messages are dispatched on a virtual thread (requires Java 21+). As virtual threads are not pooled, a new virtual thread is created for each message. Please refer to the dedicated Quarkus Virtual Thread support guide for more information.
Quarkus chooses the default execution mode based on the method signature. If the method signature is synchronous, messages are dispatched on worker threads; otherwise, it defaults to event-loop:
Method signature | Default execution mode |
---|---|
@Incoming("source") void process(String payload) | Worker-threads |
@Incoming("source") Uni<Void> process(String payload) | Event-loop |
@Incoming("source") CompletionStage<Void> process(Message<String> message) | Event-loop |
@Incoming("source") @Outgoing("sink") Multi<R> process(Multi<T> in) | Stream-processing methods are executed at startup, then each message is dispatched on event loop. |
Fine-grained control over the execution model is possible using annotations:
- @Blocking will force the method to be executed on a worker thread pool. The default pool of worker threads is shared between all channels. Using @Blocking("my-custom-pool") you can configure channels with a custom thread pool. The configuration property smallrye.messaging.worker.my-custom-pool.max-concurrency specifies the maximum number of threads in the pool. You can read more on the blocking processing in SmallRye Reactive Messaging documentation.
- @NonBlocking will force the method to be executed on the event-loop thread.
- @RunOnVirtualThread will force the method to be executed on a virtual thread. To leverage the lightweight nature of virtual threads, the default maximum concurrency for methods annotated with @RunOnVirtualThread is 1024. This can be changed by setting the smallrye.messaging.worker.<virtual-thread>.max-concurrency configuration property or by using it together with the @Blocking("my-custom-pool") annotation.
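For instance, sizing the custom pool named in @Blocking("my-custom-pool") could look like the fragment below. The value 3 is an arbitrary number chosen for illustration.

```properties
# At most 3 threads for methods annotated with @Blocking("my-custom-pool")
smallrye.messaging.worker.my-custom-pool.max-concurrency=3
```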
The presence of the @Transactional annotation implies blocking execution.
In messaging applications, produced and consumed messages constitute an ordered stream of events, either enforced by the broker (inside a topic or a queue) or by the order of reception and emission in the application.
To preserve this order, Quarkus Messaging dispatches messages sequentially by default.
You can override this behavior by using the @Blocking(ordered = false) or @RunOnVirtualThread annotation.
Incoming Channel Concurrency
Some connectors support configuring the concurrency level of incoming channels.
mp.messaging.incoming.my-channel.concurrency=4
This creates four copies of the incoming channel under the hood, wiring them to the same processing method. Depending on the broker technology, this can be useful to increase the application’s throughput by processing multiple messages concurrently while still preserving the partial order of messages received in different copies. This is the case, for example, for Kafka, where multiple consumers can consume different topic partitions.
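The partial-ordering guarantee can be pictured with a small plain-Java model. This is a conceptual sketch only, not how any connector is implemented: ChannelCopiesSketch and its dispatch method are hypothetical names. The sketch assumes records shaped like "key:value", and it assumes each channel copy behaves like a single-threaded lane that receives every record for the keys assigned to it, much as a Kafka consumer receives every record of its partitions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ChannelCopiesSketch {

    /**
     * Dispatches "key:value" records to {@code copies} single-threaded lanes,
     * choosing the lane from the key, and returns what each lane processed.
     */
    static List<List<String>> dispatch(List<String> records, int copies) {
        List<ExecutorService> lanes = new ArrayList<>();
        List<List<String>> processed = new ArrayList<>();
        for (int i = 0; i < copies; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
            processed.add(Collections.synchronizedList(new ArrayList<>()));
        }
        for (String record : records) {
            // Assumes every record contains a ':' separating key and value
            String key = record.substring(0, record.indexOf(':'));
            int lane = Math.floorMod(key.hashCode(), copies);
            // Lanes run concurrently, but each lane handles its records one at a time
            lanes.get(lane).execute(() -> processed.get(lane).add(record));
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.shutdown();
                if (!lane.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("lane did not finish in time");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }
}
```

Calling ChannelCopiesSketch.dispatch(List.of("a:1", "b:1", "a:2", "b:2", "a:3"), 4) keeps the records of key a in their original relative order within one lane, and likewise for key b, while nothing is guaranteed about the interleaving between lanes.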
Health Checks
Together with the SmallRye Health extension, Quarkus Messaging extensions provide health check support per channel. The implementation of startup, readiness and liveness checks depends on the connector. Some connectors allow configuring the health check behavior or disabling them completely or per channel.
Channel health checks can be disabled using quarkus.messaging.health.<channel-name>.enabled or per health check type, for example quarkus.messaging.health.<channel-name>.liveness.enabled.
Setting the quarkus.messaging.health.enabled configuration property to false completely disables the messaging health checks.
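Put together, these properties could be used as in the fragment below. The channel names my-channel and other-channel are placeholders for illustration.

```properties
# Disable every health check of one channel
quarkus.messaging.health.my-channel.enabled=false

# Disable only the liveness check of another channel
quarkus.messaging.health.other-channel.liveness.enabled=false
```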
Observability
Micrometer Metrics
Quarkus Messaging extensions provide simple but useful metrics to monitor the health of the messaging system. The Micrometer extension exposes these metrics.
The following metrics can be gathered per channel, identified with the channel tag:
- quarkus.messaging.message.count: The number of messages produced or received
- quarkus.messaging.message.acks: The number of messages processed successfully
- quarkus.messaging.message.failures: The number of messages processed with failures
- quarkus.messaging.message.duration: The duration of the message processing
For backwards compatibility reasons, channel metrics are not enabled by default and can be enabled with: smallrye.messaging.observation.enabled=true.
OpenTelemetry Tracing
Some Quarkus Messaging connectors integrate out-of-the-box with OpenTelemetry Tracing. When the OpenTelemetry extension is present, outgoing messages propagate the current tracing span. On incoming channels, if a received message contains tracing information, the message processing inherits the message span as parent.
You can disable tracing for a specific channel using the following configuration:
mp.messaging.incoming.data.tracing-enabled=false
TLS Configuration
Some messaging extensions integrate with the Quarkus TLS Registry to configure the underlying client.
To configure TLS on a channel, you need to provide the named TLS configuration to the tls-configuration-name property:
quarkus.tls.my-tls-config.trust-store=truststore.jks
quarkus.tls.my-tls-config.trust-store-password=secret
mp.messaging.incoming.my-channel.tls-configuration-name=my-tls-config
Or you can configure it globally on all channels of a connector:
mp.messaging.connector.smallrye-pulsar.tls-configuration-name=my-tls-config
Currently, the following messaging extensions support configuration through the Quarkus TLS Registry:
- Kafka: Provides the ssl.engine.factory.class property for the Kafka client.
- Pulsar: Only mTLS authentication is supported.
- RabbitMQ
- AMQP 1.0
- MQTT
Testing
Testing with Dev Services
Most Quarkus Messaging extensions provide a Dev Service to simplify the development and testing of applications. The Dev Service creates a broker instance configured to work out-of-the-box with the Quarkus Messaging extension.
During testing, Quarkus creates a separate broker instance and runs the tests against it.
You can read more about Dev Services in the Dev Services guide, including a list of Dev Services provided by platform extensions.
Testing with InMemoryConnector
It can be useful to test the application without starting a broker. To achieve this, you can switch the channels managed by a connector to in-memory.
This approach only works for JVM tests. It cannot be used for native tests (because they do not support injection).
Let’s say we want to test the following sample application:
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
public class MyMessagingApplication {

    @Inject
    @Channel("words-out")
    Emitter<String> emitter;

    public void sendMessage(String out) {
        emitter.send(out);
    }

    @Incoming("words-in")
    @Outgoing("uppercase")
    public Message<String> toUpperCase(Message<String> message) {
        return message.withPayload(message.getPayload().toUpperCase());
    }
}
First, add the following test dependency to your application:
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>
testImplementation("io.smallrye.reactive:smallrye-reactive-messaging-in-memory")
Then, create a Quarkus Test Resource as follows:
import java.util.HashMap;
import java.util.Map;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;

public class InMemoryConnectorLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("words-in"); (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("uppercase"); (2)
        Map<String, String> props3 = InMemoryConnector.switchOutgoingChannelsToInMemory("words-out"); (3)
        env.putAll(props1);
        env.putAll(props2);
        env.putAll(props3);
        return env; (4)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear(); (5)
    }
}
1 | Switch the incoming channel words-in (consumed messages) to in-memory. |
2 | Switch the outgoing channel uppercase (processed messages) to in-memory. |
3 | Switch the outgoing channel words-out (produced messages) to in-memory. |
4 | Build and return a Map containing all the properties required to configure the application to use in-memory channels. |
5 | When the test stops, clear the InMemoryConnector (discard all the received and sent messages). |
Create a @QuarkusTest using the test resource created above:
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.reactive.messaging.memory.InMemoryConnector;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;

import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.junit.jupiter.api.Test;

import jakarta.inject.Inject;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.awaitility.Awaitility.await;

@QuarkusTest
@QuarkusTestResource(InMemoryConnectorLifecycleManager.class)
class MyMessagingApplicationTest {

    @Inject
    @Connector("smallrye-in-memory")
    InMemoryConnector connector; (1)

    @Inject
    MyMessagingApplication app;

    @Test
    void test() {
        InMemorySink<String> wordsOut = connector.sink("words-out"); (2)
        InMemorySource<String> wordsIn = connector.source("words-in"); (3)
        InMemorySink<String> uppercaseOut = connector.sink("uppercase"); (4)

        app.sendMessage("Hello"); (5)
        assertEquals("Hello", wordsOut.received().get(0).getPayload()); (6)

        wordsIn.send("Bonjour"); (7)
        await().untilAsserted(() -> assertEquals("BONJOUR", uppercaseOut.received().get(0).getPayload())); (8)
    }
}
1 | Inject the in-memory connector in your test class, using the @Connector or @Any qualifier. |
2 | Retrieve the outgoing channel (words-out) - the channel must have been switched to in-memory in the test resource. |
3 | Retrieve the incoming channel (words-in). |
4 | Retrieve the outgoing channel (uppercase). |
5 | Use the injected application bean to call the sendMessage method to send a message using the emitter with the channel words-out. |
6 | Use the received method on the words-out in-memory channel to check the message produced by the application. |
7 | Use the send method on the words-in in-memory channel to send a message. The application will process this message and send a message to the uppercase channel. |
8 | Use the received method on the uppercase channel to check the messages produced by the application. |
The in-memory connector is solely intended for testing purposes. There are some caveats to consider when using the in-memory connector:
If your tests are dependent on context propagation, you can configure the in-memory connector channels with
Going further
This guide shows the general principles of Quarkus Messaging extensions.
If you want to go further, you can check the SmallRye Reactive Messaging documentation, which has in-depth documentation for each of these concepts and more.