Getting Started with AMQP in your Quarkus application
AMQP 1.0 is a standard for passing messages between applications or organizations. It connects systems, feeds business processes with the information they need, and reliably handles communication between systems. AMQP is a robust and mature protocol widely used in event-driven applications.
This post is the equivalent of the Kafka getting started post, but focuses on the usage of AMQP. You will learn how to get started with AMQP in your Quarkus application in less than ten steps. We will use SmallRye Reactive Messaging - a declarative approach to building event-driven microservices.
The complete code is available from GitHub.
Step 1 - Generate your project
Let’s start with the very beginning, getting a new project structure with the right dependencies. Go to https://code.quarkus.io, enter your group id and artifact id. Then in the extension list, select:
- SmallRye Reactive Messaging - AMQP Connector
- RESTEasy Jackson
You can disable the "Example Code" to avoid the generated project containing examples.
Then, click on Generate your application, download the project as a zip file, unzip it, and load it in your favorite IDE.
If you open the generated pom.xml, you will see that the quarkus-smallrye-reactive-messaging-amqp and quarkus-resteasy-jackson dependencies are declared, so we’re ready to write some code.
Step 2 - What are we going to exchange?
We need something to exchange.
Without much originality, let’s say we will send and receive Movie objects.
In your project, create the org.acme.Movie class with the following content:
package org.acme;

public class Movie {
    public String title;
    public int year;
}
With AMQP, we exchange messages whose body consists of one or more data sections, one or more AMQP sequence sections, or a single AMQP value section.
In our application, since we exchange Movie objects, the connector encodes each instance as JSON and transfers it in a single data section.
The content-type header is set to application/json.
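To make the encoding concrete, here is a minimal sketch of what the JSON body of a Movie might look like. The MovieJsonSketch class and its hand-written toJson method are hypothetical and exist only for illustration; in the application, the connector performs the encoding itself, and the field order it produces may differ.

```java
// Illustration only: a hand-rolled encoder showing the shape of the JSON body.
// In the real application the connector does this work, not application code.
final class MovieJsonSketch {

    // Mirrors the two public fields of org.acme.Movie.
    static String toJson(String title, int year) {
        return "{\"title\":\"" + escape(title) + "\",\"year\":" + year + "}";
    }

    // Escapes backslashes and double quotes; control characters are not
    // handled in this sketch.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // The message would carry this header next to the data section.
        System.out.println("content-type: application/json");
        System.out.println(toJson("The Godfather", 1972));
    }
}
```

The point of the sketch is only that each message carries one self-describing JSON document, which is what lets the consuming side rebuild a Movie later on.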
AMQP messages are sent to a destination. To keep things simple, let’s name it movies.
Step 3 - Configure the application
As said above, we will use Reactive Messaging.
When you use Reactive Messaging, you send messages to a channel and receive them from another channel.
These channels are mapped to the underlying messaging technology by configuration.
We must indicate that our reception and publication channels will use the movies address in our application.
In src/main/resources/application.properties, add the following content:
# The AMQP broker location and credentials
amqp-host=localhost
amqp-port=5672
amqp-username=quarkus
amqp-password=quarkus
# Configuring the incoming channel (reading from AMQP)
mp.messaging.incoming.movies-in.connector=smallrye-amqp
mp.messaging.incoming.movies-in.address=movies
# Configuring the outgoing channel (writing to AMQP)
mp.messaging.outgoing.movies-out.connector=smallrye-amqp
mp.messaging.outgoing.movies-out.address=movies
After configuring the broker location and credentials (the amqp-* properties), we configure our two channels: movies-in (receiving the messages) and movies-out (publishing the messages).
We use the mp.messaging.incoming.movies-in prefix to configure the inbound channel.
The connector attribute indicates which connector is responsible for this channel, here the AMQP connector.
We also need to specify the consumed destination using the address attribute.
To configure the outbound movies-out channel, we use the mp.messaging.outgoing.movies-out prefix.
In addition to indicating which connector is responsible for that channel, we also need to configure the address.
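The naming scheme of these keys can be read as mp.messaging.<direction>.<channel>.<attribute>. The following sketch, using only the JDK, groups the properties shown above by direction and channel to make that structure visible. The ChannelConfigSketch class is hypothetical and is not how the framework reads its configuration; it also assumes channel names without dots.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Illustration only: groups channel attributes by "<direction>:<channel>",
// following the mp.messaging.<direction>.<channel>.<attribute> naming scheme.
final class ChannelConfigSketch {

    static Map<String, Map<String, String>> channels(Properties props) {
        Map<String, Map<String, String>> result = new TreeMap<>();
        String prefix = "mp.messaging.";
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(prefix)) {
                continue; // e.g. amqp-host: a broker-level setting, not a channel one
            }
            // Split the remainder into direction, channel and attribute.
            String[] parts = key.substring(prefix.length()).split("\\.", 3);
            if (parts.length != 3) {
                continue;
            }
            result.computeIfAbsent(parts[0] + ":" + parts[1], k -> new TreeMap<>())
                  .put(parts[2], props.getProperty(key));
        }
        return result;
    }

    // A subset of the application.properties content shown above.
    static Properties sample() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(String.join("\n",
                    "amqp-host=localhost",
                    "mp.messaging.incoming.movies-in.connector=smallrye-amqp",
                    "mp.messaging.incoming.movies-in.address=movies",
                    "mp.messaging.outgoing.movies-out.connector=smallrye-amqp",
                    "mp.messaging.outgoing.movies-out.address=movies")));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        channels(sample()).forEach((channel, attrs) ->
                System.out.println(channel + " -> " + attrs));
    }
}
```

Both channels end up with the same two attributes, connector and address, and both point to the movies address, which is why the application receives what it sends.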
Step 4 - Publishing movies to AMQP
Now, it’s time to send messages.
Create the org.acme.MovieProducer class with the following content:
package org.acme;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class MovieProducer {

    @Inject
    @Channel("movies-out")
    Emitter<Movie> emitter;

    public void send(Movie movie) {
        emitter.send(movie);
    }
}
In this class, we inject an Emitter, i.e., an object responsible for sending messages to a channel.
This emitter is attached to the movies-out channel (and will send messages to AMQP).
The connector automatically encodes the content as JSON and sets the content-type header.
You need to make sure your payload can be encoded to JSON.
So, the rest of our application can use the send method to send a movie to our AMQP destination.
Step 5 - Consuming movies
Let’s now look at the other side and retrieve the movies from AMQP.
Create the org.acme.MovieConsumer class with the following content:
package org.acme;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MovieConsumer {

    private final Logger logger = Logger.getLogger(MovieConsumer.class);

    @Incoming("movies-in")
    public void receive(Movie movie) {
        logger.infof("Got a movie: %d - %s", movie.year, movie.title);
    }
}
Here, we use the @Incoming annotation to tell Quarkus to call the receive method for every received message.
Remember, the movie is encoded as JSON, so we need to help the connector produce a Movie from the received JSON.
Create the org.acme.JsonToObjectConverter class with the following content:
package org.acme;

import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata;
import io.vertx.core.json.JsonObject;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.lang.reflect.Type;

@ApplicationScoped
public class JsonToObjectConverter implements MessageConverter {

    @Override
    public boolean canConvert(Message<?> in, Type target) {
        // Convert only AMQP messages carrying JSON, and only to plain classes.
        // Calling equals on the constant avoids a NullPointerException if the
        // content-type is not set on the incoming message.
        return in.getMetadata(IncomingAmqpMetadata.class)
                .map(meta -> "application/json".equals(meta.getContentType()) && target instanceof Class)
                .orElse(false);
    }

    @Override
    public Message<?> convert(Message<?> in, Type target) {
        // Map the received JsonObject to the requested target class.
        return in.withPayload(((JsonObject) in.getPayload()).mapTo((Class<?>) target));
    }
}
This class is a converter.
It maps the content of a Message to another type.
In the canConvert method, we verify that the incoming message comes from AMQP (so it contains the IncomingAmqpMetadata metadata), that the content-type is set to application/json, and that the target type is a plain class.
The convert method maps the received JsonObject into the target type (Movie in our case).
With this converter, our receive method will receive the Movie objects.
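The decision taken in canConvert can be tried out in isolation. The sketch below reproduces only the guard, with the content type passed as a plain string instead of being read from the AMQP metadata. ConverterGuardSketch and its helper are hypothetical names, and the comparison is written in a null-safe form on the assumption that a message may arrive without a content-type.

```java
import java.lang.reflect.Type;
import java.util.ArrayList;

// Illustration only: the guard logic of canConvert, without the messaging API.
final class ConverterGuardSketch {

    // contentType may be null when the sender did not set it.
    static boolean canConvert(String contentType, Type target) {
        // Calling equals on the constant avoids a NullPointerException.
        return "application/json".equals(contentType) && target instanceof Class;
    }

    // Builds a parameterized type (ArrayList<String>), which is a Type but
    // not a Class, so the guard rejects it.
    static Type listOfStrings() {
        return new ArrayList<String>() { }.getClass().getGenericSuperclass();
    }

    public static void main(String[] args) {
        System.out.println(canConvert("application/json", String.class));    // true
        System.out.println(canConvert(null, String.class));                  // false
        System.out.println(canConvert("application/json", listOfStrings())); // false
    }
}
```

Rejecting parameterized targets keeps the later cast to Class<?> in convert safe; supporting generic payload types would need a different mapping call.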
Step 6 - Sending movies from a REST endpoint
It’s quite common to send messages to AMQP from a REST endpoint.
To do this, create the org.acme.MovieResource class with the following content:
package org.acme;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class MovieResource {

    @Inject
    MovieProducer producer;

    @POST
    public Response send(Movie movie) {
        producer.send(movie);
        // Return a 202 - Accepted response.
        return Response.accepted().build();
    }
}
This class uses the MovieProducer we implemented above to send the movies.
You could also use an Emitter directly.
Step 7 - Let’s get this running!
Well, first, we need an AMQP broker, for example Apache ActiveMQ Artemis.
You can follow the Getting Started with Artemis documentation, or use the following docker-compose.yaml file:
version: '2'
services:
  artemis:
    image: vromero/activemq-artemis:2-alpine-latest
    ports:
      - "5672:5672"
      - "8161:8161"
      - "61616:61616"
    environment:
      ARTEMIS_USERNAME: quarkus
      ARTEMIS_PASSWORD: quarkus
Copy the docker-compose.yaml file into your project and, from a terminal, start your broker with: docker-compose up -d
Then, run the application using:
./mvnw quarkus:dev
The application runs in dev mode, meaning that you can still update the code. It will reload it automatically.
In another terminal, emit a few HTTP POST requests such as:
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"The Shawshank Redemption"}' \
http://localhost:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1972, "title":"The Godfather"}' \
http://localhost:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2008, "title":"The Dark Knight"}' \
http://localhost:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":1994, "title":"Pulp Fiction"}' \
http://localhost:8080/
curl --header "Content-Type: application/json" \
--request POST \
--data '{"year":2010, "title":"Inception"}' \
http://localhost:8080/
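If you would rather send the same request from Java, the following sketch uses the JDK's java.net.http client (Java 11 or later). The MovieClientSketch class and its postMovie helper are hypothetical names; the URL and JSON payload are the ones used in the curl commands above, and running main assumes the application is listening on localhost:8080.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustration only: the equivalent of one of the curl commands above.
final class MovieClientSketch {

    // Builds the POST request without sending it.
    static HttpRequest postMovie(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = postMovie("http://localhost:8080/",
                "{\"year\":2010, \"title\":\"Inception\"}");
        // Sends the request; the response body is not needed, so it is discarded.
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        System.out.println("Status: " + response.statusCode());
    }
}
```

Since the resource returns Response.accepted(), the printed status should be 202 when the request reaches the endpoint.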
In the terminal running the application, you will see:
...
2021-01-27 09:29:41,087 INFO [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 1994 - Pulp Fiction
2021-01-27 09:29:41,114 INFO [org.acm.MovieConsumer] (vert.x-eventloop-thread-9) Got a movie: 2010 - Inception
...
It works!
Step 8 - Native packaging
If you have GraalVM installed and configured correctly, you can package this application as a native executable:
./mvnw package -Pnative
Then, execute your native executable with: ./target/getting-started-amqp-1.0.0-SNAPSHOT-runner, and you get a Quarkus application using AMQP that starts in a few milliseconds and consumes a ridiculously small amount of memory: only 33 MB after 100 ingested messages!
$ rss getting-started-amqp-1.0.0-SNAPSHOT-runner
PID 0M COMMAND
54986 33M ./target/getting-started-amqp-1.0.0-SNAPSHOT-runner
Summary
In less than 10 minutes, we have a new Quarkus application using AMQP. If you want to go further, check the AMQP guide.