Guía de referencia Vert.x
Vert.x es un conjunto de herramientas para construir aplicaciones reactivas. Como se describe en la Arquitectura Reactiva de Quarkus, Quarkus utiliza Vert.x por debajo.
Esta guía es el complemento de la guía Uso de la API Vert.x de Eclipse desde una aplicación Quarkus. Proporciona detalles más avanzados sobre el uso y la configuración de la instancia Vert.x utilizada por Quarkus.
Acceso a la instancia Vert.x
Para acceder a la instancia gestionada de Vert.x, añada la extensión quarkus-vertx a su proyecto.
Es posible que esta dependencia ya esté disponible en su proyecto (como dependencia transitiva).
Con esta extensión, puede recuperar la instancia gestionada de Vert.x utilizando la inyección por campo o constructor:
@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;
// Constructor injection
MyBean(Vertx vertx) {
// ...
}
}
Puede inyectar:
-
la instancia
io.vertx.core.Vertxque expone la API Vert.x bare -
la instancia
io.vertx.mutiny.core.Vertxque expone la API Mutiny
Recomendamos utilizar la variante Mutiny, ya que se integra con las demás APIs reactivas proporcionadas por Quarkus.
|
Mutiny
Si no está familiarizado con Mutiny, consulte Mutiny - una biblioteca de programación reactiva intuitiva. |
La documentación sobre la variante Vert.x Mutiny está disponible en smallrye.io.
Configurar la instancia Vert.x
Puede configurar la instancia Vert.x desde el archivo application.properties.
La siguiente tabla enumera las propiedades admitidas:
Propiedad de configuración fijada en tiempo de compilación - Todas las demás propiedades de configuración son anulables en tiempo de ejecución
Configuration property |
Tipo |
Por defecto |
|---|---|---|
Enables or disables the Vert.x cache. Environment variable: Show more |
boolean |
|
Configure the file cache directory. When not set, the cache is stored in the system temporary directory (read from the Note that this property is ignored if the Environment variable: Show more |
string |
|
Enables or disabled the Vert.x classpath resource resolver. Environment variable: Show more |
boolean |
|
The number of event loops. By default, it matches the number of CPUs detected on the system. Environment variable: Show more |
int |
|
The maximum amount of time the event loop can be blocked. Environment variable: Show more |
|
|
The amount of time before a warning is displayed if the event loop is blocked. Environment variable: Show more |
|
|
The maximum amount of time the worker thread can be blocked. Environment variable: Show more |
|
|
The size of the internal thread pool (used for the file system). Environment variable: Show more |
int |
|
The queue size. For most applications this should be unbounded Environment variable: Show more |
int |
|
The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of Environment variable: Show more |
float |
|
The amount of time a thread will stay alive with no work. Environment variable: Show more |
|
|
Prefill thread pool when creating a new Executor. When Environment variable: Show more |
boolean |
|
Enables the async DNS resolver. Environment variable: Show more |
boolean |
|
PEM Key/cert config is disabled by default. Environment variable: Show more |
boolean |
|
Comma-separated list of the path to the key files (Pem format). Environment variable: Show more |
list of string |
|
Comma-separated list of the path to the certificate files (Pem format). Environment variable: Show more |
list of string |
|
JKS config is disabled by default. Environment variable: Show more |
boolean |
|
Path of the key file (JKS format). Environment variable: Show more |
string |
|
Password of the key file. Environment variable: Show more |
string |
|
PFX config is disabled by default. Environment variable: Show more |
boolean |
|
Path to the key file (PFX format). Environment variable: Show more |
string |
|
Password of the key. Environment variable: Show more |
string |
|
PEM Trust config is disabled by default. Environment variable: Show more |
boolean |
|
Comma-separated list of the trust certificate files (Pem format). Environment variable: Show more |
list of string |
|
JKS config is disabled by default. Environment variable: Show more |
boolean |
|
Path of the key file (JKS format). Environment variable: Show more |
string |
|
Password of the key file. Environment variable: Show more |
string |
|
PFX config is disabled by default. Environment variable: Show more |
boolean |
|
Path to the key file (PFX format). Environment variable: Show more |
string |
|
Password of the key. Environment variable: Show more |
string |
|
The accept backlog. Environment variable: Show more |
int |
|
The client authentication. Environment variable: Show more |
string |
|
The connect timeout. Environment variable: Show more |
|
|
The idle timeout in milliseconds. Environment variable: Show more |
||
The receive buffer size. Environment variable: Show more |
int |
|
The number of reconnection attempts. Environment variable: Show more |
int |
|
The reconnection interval in milliseconds. Environment variable: Show more |
|
|
Whether to reuse the address. Environment variable: Show more |
boolean |
|
Whether to reuse the port. Environment variable: Show more |
boolean |
|
The send buffer size. Environment variable: Show more |
int |
|
The so linger. Environment variable: Show more |
int |
|
Enables or Disabled SSL. Environment variable: Show more |
boolean |
|
Whether to keep the TCP connection opened (keep-alive). Environment variable: Show more |
boolean |
|
Configure the TCP no delay. Environment variable: Show more |
boolean |
|
Configure the traffic class. Environment variable: Show more |
int |
|
Enables or disables the trust all parameter. Environment variable: Show more |
boolean |
|
The host name. Environment variable: Show more |
string |
|
int |
||
The public host name. Environment variable: Show more |
string |
|
The public port. Environment variable: Show more |
int |
|
Enables or disables the clustering. Environment variable: Show more |
boolean |
|
The ping interval. Environment variable: Show more |
|
|
The ping reply interval. Environment variable: Show more |
|
|
The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever. Environment variable: Show more |
int |
|
The minimum amount of time in seconds that a successfully resolved address will be cached. Environment variable: Show more |
int |
|
The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached. Environment variable: Show more |
int |
|
The maximum number of queries to be sent during a resolution. Environment variable: Show more |
int |
|
The duration after which a DNS query is considered to be failed. Environment variable: Show more |
|
|
Set the path of an alternate hosts configuration file to use instead of the one provided by the os. The default value is Environment variable: Show more |
string |
|
Set the hosts configuration refresh period in millis, The resolver caches the hosts configuration (configured using Environment variable: Show more |
int |
|
Set the list of DNS server addresses, an address is the IP of the dns server, followed by an optional colon and a port, e.g Environment variable: Show more |
list of string |
|
Set to true to enable the automatic inclusion in DNS queries of an optional record that hints the remote DNS server about how much data the resolver can read per response. Environment variable: Show more |
boolean |
|
Set the DNS queries Recursion Desired flag value. Environment variable: Show more |
boolean |
|
Set the lists of DNS search domains. When the search domain list is null, the effective search domain list will be populated using the system DNS search domains. Environment variable: Show more |
list of string |
|
Set the ndots value used when resolving using search domains, the default value is Environment variable: Show more |
int |
|
Set to Environment variable: Show more |
boolean |
|
Set to Environment variable: Show more |
boolean |
|
Enable or disable native transport Environment variable: Show more |
boolean |
|
This property is deprecated: use The size of the worker pool size. Environment variable: Show more |
int |
|
|
About the Duration format
To write duration values, use the standard You can also use a simplified format, starting with a number:
In other cases, the simplified format is translated to the
|
Consulte Personalizar la configuración de Vert.x para configurar la instancia Vert.x mediante un enfoque programático.
Uso de clientes Vert.x
Además del núcleo Vert.x, puede utilizar la mayoría de las bibliotecas del ecosistema Vert.x. Algunas extensiones de Quarkus ya envuelven bibliotecas Vert.x.
APIs disponibles
La siguiente tabla enumera las bibliotecas más utilizadas del ecosistema Vert.x. Para acceder a estas APIs, añada la extensión o dependencia indicada a su proyecto. Consulte la documentación asociada para aprender a utilizarlas.
API |
Extensión o dependencia |
Documentación |
Cliente AMQP |
|
|
Interruptor automático |
|
|
Cliente Consul |
|
|
Cliente DB2 |
|
|
Cliente Kafka |
|
|
Cliente de correo |
|
|
Cliente MQTT |
|
Todavía no hay guía |
Cliente MS SQL |
|
|
Cliente MySQL |
|
|
Cliente Oracle |
|
|
Cliente PostgreSQL |
|
|
Cliente RabbitMQ |
|
|
Cliente Redis |
|
|
Cliente web |
|
Para saber más sobre el uso de la API de Vert.x Mutiny, consulte smallrye.io.
Uso del cliente web Vert.x
Esta sección ofrece un ejemplo de utilización del WebClient de Vert.x en el contexto de una aplicación REST de Quarkus (anteriormente RESTEasy Reactive).
Como se indica en la tabla anterior, añada la siguiente dependencia a su proyecto:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")
Ahora, en su código, puede crear una instancia de WebClient:
package org.acme.vertx;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
@Path("/fruit-data")
public class ResourceUsingWebClient {
private final WebClient client;
@Inject
VertxResource(Vertx vertx) {
this.client = WebClient.create(vertx);
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(String name) {
return client.getAbs("https://.../api/fruit/" + name)
.send()
.onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
Este recurso crea un WebClient y, al recibir una petición, utiliza este cliente para invocar una API HTTP remota.
Dependiendo del resultado, la respuesta se reenvía tal y como se recibió, o crea un objeto JSON que envuelve el error.
El WebClient es asíncrono (y no bloqueante), por lo que el endpoint devuelve un Uni.
La aplicación también puede ejecutarse como un ejecutable nativo.
Pero, primero, necesitamos instruir a Quarkus para que habilite ssl (si la API remota utiliza HTTPS).
Abra el src/main/resources/application.properties y añada:
quarkus.ssl.native=true
Luego, cree el ejecutable nativo con:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Uso de Vert.x JSON
Las APIs de Vert.x dependen a menudo de JSON.
Vert.x proporciona dos clases convenientes para manipular documentos JSON: io.vertx.core.json.JsonObject y io.vertx.core.json.JsonArray.
JsonObject se puede utilizar para mapear un objeto en su representación JSON y construir un objeto a partir de un documento JSON:
// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);
// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);
Tenga en cuenta que estas funciones utilizan el mapeador gestionado por la extensión quarkus-jackson.
Consulte configuración de Jackson para personalizar el mapeo.
Tanto el objeto JSON como la matriz JSON se admiten como peticiones de endpoint HTTP de Quarkus y cuerpos de respuesta (utilizando RESTEasy clásico y REST de Quarkus). Considere estos endpoints:
package org.acme.vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {
@GET
@Path("{name}/object")
public JsonObject jsonObject(String name) {
return new JsonObject().put("Hello", name);
}
@GET
@Path("{name}/array")
public JsonArray jsonArray(String name) {
return new JsonArray().add("Hello").add(name);
}
}
{"Hello":"Quarkus"}
["Hello","Quarkus"]
Esto funciona igualmente bien cuando el contenido JSON es un cuerpo de petición o está envuelto en un Uni, Multi, CompletionStage o Publisher.
Uso de vértices
Verticles es "un modelo de despliegue y concurrencia simple, escalable y similar a un actor" proporcionado por _Vert.x. Este modelo no pretende ser una implementación estricta del modelo actor, pero comparte similitudes, especialmente en lo que respecta a la concurrencia, el escalado y el despliegue. Para utilizar este modelo, usted escribe y despliega vértices, que se comunican mediante el envío de mensajes en el bus de eventos.
Puede desplegar vértices en Quarkus. Es compatible con:
-
vértice bare - clases Java que extienden
io.vertx.core.AbstractVerticle -
vértice Mutiny - clases Java que extienden
io.smallrye.mutiny.vertx.core.AbstractVerticle
Desplegar vértices
Para desplegar vértices, utilice el método deployVerticle:
@Inject Vertx vertx;
// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });
Si utiliza la variante Mutiny de Vert.x, tenga en cuenta que el método deployVerticle devuelve un Uni, y tendría que activar una suscripción para realizar el despliegue real.
| A continuación, un ejemplo que explica cómo desplegar vértices durante la inicialización de la aplicación. |
Uso de beans @ApplicationScoped como vértice
En general, los vértices Vert.x no son beans CDI. Por lo tanto, no pueden utilizar la inyección. Sin embargo, en Quarkus, puede desplegar vértices como beans. Tenga en cuenta que, en este caso, CDI (Arc en Quarkus) se encarga de crear la instancia.
El siguiente fragmento ofrece un ejemplo:
package io.quarkus.vertx.verticles;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {
@ConfigProperty(name = "address") String address;
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer(address)
.handler(m -> m.replyAndForget("hello"))
.completionHandler();
}
}
No tiene que inyectar la instancia vertx; en su lugar, aproveche el campo protegido de AbstractVerticle.
Luego, despliegue las instancias de vértice con:
package io.quarkus.vertx.verticles;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class VerticleDeployer {
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
Si quiere desplegar cada AbstractVerticle expuesto, puede utilizar:
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
for (AbstractVerticle verticle : verticles) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
Crear múltiples instancias de vértices
Cuando utilice @ApplicationScoped, obtendrá una única instancia para su vértice.
Tener varias instancias de vértices puede ser útil para repartir la carga entre ellas.
Cada una de ellas estará asociada a un hilo de E/S diferente (bucle de eventos Vert.x).
Para desplegar varias instancias de su vértice, utilice el ámbito @Dependent en lugar de @ApplicationScoped:
package org.acme.verticle;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
@Dependent
public class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer("address")
.handler(m -> m.reply("Hello from " + this))
.completionHandler();
}
}
A continuación, despliegue su vértice de la siguiente manera:
package org.acme.verticle;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyApp {
void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
vertx
.deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
.await().indefinitely();
}
}
El método init recibe un Instance<MyVerticle>.
A continuación, pasa un proveedor al método deployVerticle.
El proveedor no hace más que llamar al método get().
Gracias al ámbito @Dependent, devuelve una nueva instancia en cada llamada.
Por último, pasa el número deseado de instancias a DeploymentOptions, como dos en el ejemplo anterior.
Llamará al proveedor dos veces, lo que creará dos instancias de su vértice.
Uso del bus de eventos
Vert.x viene con un bus de eventos incorporado que puede utilizar desde su aplicación Quarkus. De este modo, los componentes de su aplicación (beans CDI, recursos…) pueden interactuar utilizando eventos asíncronos, promoviendo así el acoplamiento débil.
Con el bus de eventos, usted envía mensajes a direcciones virtuales. El bus de eventos ofrece tres tipos de mecanismos de entrega:
-
punto a punto - se envía el mensaje, un consumidor lo recibe. Si varios consumidores escuchan la dirección, se aplica un round-robin;
-
publicar/suscribir - publicar un mensaje; todos los consumidores que escuchan la dirección reciben el mensaje;
-
petición/respuesta - se envía el mensaje y se espera una respuesta. El receptor puede responder al mensaje de forma asíncrona.
Todos estos mecanismos de entrega son no bloqueantes y proporcionan uno de los elementos fundamentales para construir aplicaciones reactivas.
Consumir eventos
Aunque puede utilizar la API de Vert.x para registrar consumidores, Quarkus viene con soporte declarativo.
Para consumir eventos, utilice la anotación io.quarkus.vertx.ConsumeEvent:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
| 1 | Si no se establece, la dirección es el nombre completo del bean; por ejemplo, en este fragmento, es org.acme.vertx.GreetingService. |
| 2 | El parámetro del método es el cuerpo del mensaje. Si el método devuelve algo, es la respuesta del mensaje. |
Configurar la dirección
La anotación @ConsumeEvent se puede configurar para establecer la dirección:
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
| 1 | Recibir los mensajes enviados a la dirección greeting |
El valor de la dirección puede ser una expresión de propiedad.
En este caso, se utiliza el valor configurado en su lugar: @ConsumeEvent("${my.consumer.address}").
Además, la expresión de propiedad puede especificar un valor por defecto: @ConsumeEvent("${my.consumer.address:defaultAddress}").
@ConsumeEvent("${my.consumer.address}") (1)
public String consume(String name) {
return name.toLowerCase();
}
| 1 | Recibir los mensajes enviados a la dirección configurada con la clave my.consumer.address. |
| Si no existe ninguna propiedad de configuración con la clave especificada y no se establece ningún valor por defecto, el inicio de la aplicación falla. |
Procesar eventos de forma asíncrona
Los ejemplos anteriores utilizan el procesamiento síncrono.
El procesamiento asíncrono también es posible devolviendo un io.smallrye.mutiny.Uni o un java.util.concurrent.CompletionStage:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
|
Mutiny
El ejemplo anterior utiliza tipos reactivos Mutiny. Si no está familiarizado con Mutiny, consulte Mutiny - una biblioteca de programación reactiva intuitiva. |
Procesamiento bloqueante de eventos
Por defecto, el código que consume el evento debe ser no bloqueante, ya que se llama en un hilo de E/S.
Si su procesamiento es bloqueante, utilice la anotación @io.smallrye.common.annotation.Blocking:
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
Alternativamente, puede utilizar el atributo blocking de la anotación @ConsumeEvent:
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
Cuando se utiliza @Blocking, se ignora el valor del atributo blocking de @ConsumeEvent.
Responder a eventos
El valor de retorno de un método anotado con @ConsumeEvent se utiliza para responder al mensaje entrante.
Por ejemplo, en el siguiente fragmento, el String devuelto es la respuesta.
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
También puede devolver un Uni<T> o un CompletionStage<T> para manejar la respuesta asíncrona:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
|
Puede inyectar un
|
Implementar interacciones de disparar y olvidar
No es necesario responder a los mensajes recibidos.
Normalmente, para una interacción de disparar y olvidar, los mensajes se consumen y el remitente no necesita saberlo.
Para implementar este patrón, su método consumidor devuelve void.
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
Consumir mensajes (en lugar de eventos)
A diferencia del ejemplo anterior en el que se utilizaban directamente las cargas útiles, también se puede utilizar directamente Message:
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
Manejo de fallos
Si un método anotado con @ConsumeEvent lanza una excepción, entonces:
-
si se establece un manejador de respuesta, entonces el fallo se propaga de vuelta al remitente a través de un
io.vertx.core.eventbus.ReplyExceptioncon el códigoConsumeEvent#FAILURE_CODEy el mensaje de excepción, -
si no se establece un manejador de respuesta, la excepción se vuelve a lanzar (y se envuelve en un
RuntimeExceptionsi es necesario) y puede ser manejada por el manejador de excepciones por defecto, es decir,io.vertx.core.Vertx#exceptionHandler().
Enviar mensajes
El envío y la publicación de mensajes utilizan el bus de eventos Vert.x:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
| 1 | Inyectar el bus de eventos |
| 2 | Envía un mensaje a la dirección greeting. El contenido del mensaje es name |
El objeto EventBus proporciona métodos para:
-
sendun mensaje a una dirección específica: un solo consumidor recibe el mensaje. -
publishun mensaje a una dirección específica: todos los consumidores reciben los mensajes. -
requestun mensaje y esperar una respuesta
// Case 1
bus.send("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
Procesar eventos en hilos virtuales
Los métodos anotados con @ConsumeEvent también pueden anotarse con @RunOnVirtualThread.
En este caso, el método se invoca en un hilo virtual.
Cada evento se invoca en un hilo virtual diferente.
Para utilizar esta función, asegúrese de que:
-
El tiempo de ejecución de Java admite hilos virtuales.
-
Su método utiliza una firma bloqueante.
El segundo punto significa que solo los métodos que devuelven un objeto o void pueden utilizar @RunOnVirtualThread.
Los métodos que devuelven un Uni o un CompletionStage no pueden ejecutarse en hilos virtuales.
Lea la guía de hilos virtuales para más detalles.
Uso de códecs
El Bus de Eventos Vert.x utiliza códecs para serializar y deserializar objetos de mensaje.
Quarkus proporciona un códec por defecto para la entrega local.
Este códec se utiliza automáticamente para los tipos de retorno y los parámetros del cuerpo del mensaje de los consumidores locales, es decir, los métodos anotados con @ConsumeEvent donde ConsumeEvent#local() == true (que es el predeterminado).
Para que pueda intercambiar los objetos de mensaje de la siguiente manera:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
Si quiere usar un códec específico, tiene que configurarlo en ambos extremos explícitamente:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
| 1 | Establezca el nombre del códec que se utilizará para enviar el mensaje |
| 2 | Establezca el códec que se utilizará para recibir el mensaje |
Combinar HTTP y el bus de eventos
Volvamos a un endpoint HTTP de saludo y utilicemos el paso asíncrono de mensajes para delegar la llamada a un bean separado. Utiliza el mecanismo de envío de petición/respuesta. En lugar de implementar la lógica de negocio dentro del endpoint REST de Jakarta, estamos enviando un mensaje. Otro bean consume este mensaje, y la respuesta se envía utilizando el mecanismo de respuesta.
En su clase de endpoint HTTP, inyecte el bus de eventos y utilice el método request para enviar un mensaje al bus de eventos y esperar una respuesta:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/bus")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
| 1 | enviar el name a la dirección greeting y solicitar una respuesta |
| 2 | cuando obtenemos la respuesta, extraer el cuerpo y enviarlo al usuario |
el método HTTP devuelve un Uni.
Si está utilizando Quarkus REST, la compatibilidad con Uni está incorporada.
Si utiliza RESTEasy clásico, deberá añadir la extensión quarkus resteasy-mutiny a su proyecto.
|
Necesitamos un consumidor que escuche en la dirección greeting.
Este consumidor puede estar en la misma clase o en otro bean como:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
Este bean recibe el nombre y devuelve el mensaje de saludo.
Con esto en marcha, cada petición HTTP en /bus/quarkus envía un mensaje al bus de eventos, espera una respuesta, y cuando esta llega, escribe la respuesta HTTP:
Hello Quarkus
Para entenderlo mejor, vamos a detallar cómo se ha gestionado la petición/respuesta HTTP:
-
La petición es recibida por el método
greeting -
se envía un mensaje con el name al bus de eventos
-
Otro bean recibe este mensaje y calcula la respuesta
-
Esta respuesta se devuelve mediante el mecanismo de respuesta
-
Una vez que el remitente recibe la respuesta, el contenido se escribe en la respuesta HTTP
Comunicación bidireccional con los navegadores mediante SockJS
El puente SockJS proporcionado por Vert.x permite que las aplicaciones del navegador y las aplicaciones de Quarkus se comuniquen utilizando el bus de eventos. Conecta ambos lados. Así, ambos lados pueden enviar mensajes recibidos en el otro lado. Soporta los tres mecanismos de entrega.
SockJS negocia el canal de comunicación entre la aplicación Quarkus y el navegador. Si se admiten WebSockets, los utiliza; de lo contrario, se degrada a SSE, sondeo largo, etc.
Para utilizar SockJS, es necesario configurar el puente, especialmente las direcciones que se utilizarán para comunicarse:
package org.acme;
import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
public class SockJsExample {
@Inject
Vertx vertx;
public void init(@Observes Router router) {
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
router.route("/eventbus/*").subRouter(bridge);
AtomicInteger counter = new AtomicInteger();
vertx.setPeriodic(1000,
ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
}
}
Este código configura el puente SockJS para que envíe todos los mensajes dirigidos a la dirección ticks a los navegadores conectados.
Encontrará explicaciones más detalladas sobre la configuración en la documentación del puente SockJS de Vert.x.
El navegador debe utilizar la biblioteca JavaScript vertx-eventbus para consumir el mensaje:
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>SockJS example - Quarkus</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"
integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
<script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>
<h1>SockJS Examples</h1>
<p><strong>Last Tick:</strong> <span id="tick"></span></p>
</body>
<script>
var eb = new EventBus('/eventbus');
eb.onopen = function () {
eb.registerHandler('ticks', function (error, message) {
$("#tick").html(message.body);
});
}
</script>
</html>
Uso de transportes nativos
| Los transportes nativos no están soportados en los ejecutables nativos. |
Para utilizar io_uring, consulte la sección Uso de io_uring.
|
Vert.x es capaz de utilizar los transportes nativos de Netty, que ofrecen mejoras de rendimiento en plataformas específicas. Para habilitarlos, debe incluir la dependencia adecuada para su plataforma. Suele ser una buena idea tener ambas para mantener su aplicación agnóstica a la plataforma. Netty es lo suficientemente inteligente como para utilizar la correcta, es decir, ninguna en absoluto en plataformas no soportadas:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")
implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")
También tendrá que configurar explícitamente Vert.x para que utilice el transporte nativo.
En application.properties añada:
quarkus.vertx.prefer-native-transport=true
O en application.yml:
quarkus:
vertx:
prefer-native-transport: true
Si todo va bien, Quarkus registrará:
[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx tiene habilitado el transporte nativo: true
Uso de un programador Vert.x sensible al contexto
Algunos operadores de Mutiny necesitan programar el trabajo en un grupo de hilos ejecutores.
Un buen ejemplo es .onItem().delayIt().by(Duration.ofMillis(10)) ya que necesita un ejecutor de este tipo para retrasar las emisiones.
El ejecutor por defecto es devuelto por io.smallrye.mutiny.infrastructure.Infrastructure y ya está configurado y gestionado por Quarkus.
Dicho esto, hay casos en los que es necesario asegurarse de que una operación se ejecuta en un contexto Vert.x (duplicado) y no en un hilo cualquiera.
La interfaz io.smallrye.mutiny.vertx.core.ContextAwareScheduler ofrece una API para obtener programadores conscientes del contexto.
Un programador de este tipo se configura con:
-
un delegado
ScheduledExecutorServicede su elección (pista: puede reutilizarInfrastructure.getDefaultWorkerPool()), y -
una estrategia de obtención de contexto entre:
-
un
Contextexplícito, o -
llamar a
Vertx::getOrCreateContext(), ya sea en el hilo actual o más tarde, cuando se produzca la solicitud de programación, o -
llamar a
Vertx::currentContext(), que falla si el hilo actual no es un hilo Vert.x.
-
He aquí un ejemplo en el que se utiliza ContextAwareScheduler:
class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
vertx.getOrCreateContext().put("foo", "bar");
var delegate = Infrastructure.getDefaultWorkerPool();
var scheduler = ContextAwareScheduler.delegatingTo(delegate)
.withCurrentContext();
return Uni.createFrom().voidItem()
.onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
.onItem().invoke(() -> {
// Prints "bar"
var ctx = vertx.getOrCreateContext();
System.out.println(ctx.get("foo"));
});
}
}
En este ejemplo se crea un programador capturando el contexto del bucle de eventos Vert.x que llama a asyncStart().
El operador delayIt utiliza ese programador, y podemos comprobar que el contexto que obtenemos en invoke es un contexto duplicado Vert.x en el que se han propagado los datos de la clave "foo".
Uso de un socket de dominio Unix
Escuchar en un socket de dominio Unix nos permite prescindir de la sobrecarga de TCP si la conexión al servicio Quarkus se establece desde el mismo host. Esto puede ocurrir si el acceso al servicio se realiza a través de un proxy, lo que suele ser el caso si está configurando una malla de servicios con un proxy como Envoy.
| Esto solo funcionará en plataformas que admitan Uso de transportes nativos. |
Habilite el Uso de transportes nativos correspondiente y establezca la siguiente propiedad de entorno:
quarkus.http.domain-socket=/var/run/io.quarkus.app.socket quarkus.http.domain-socket-enabled=true quarkus.vertx.prefer-native-transport=true
Por sí mismo esto no desactivará el socket TCP que por defecto se abrirá en
0.0.0.0:8080. Puede desactivarse explícitamente:
quarkus.http.host-enabled=false
Estas propiedades pueden establecerse a través del parámetro de línea de comandos -D de Java o
en application.properties.
| No olvide añadir la dependencia de transporte nativo. Consulte Uso de transportes nativos para más detalles. |
| Asegúrese de que su aplicación tiene los permisos adecuados para escribir en el socket. |
Uso de io_uring
io_uring no está soportado en los ejecutables nativos.
|
El soporte de io_uring es experimental
|
io_uring es una interfaz del núcleo de Linux que permite enviar y recibir datos de forma asíncrona.
Proporciona una semántica unificada tanto para la E/S de archivos como de red.
Se diseñó originalmente para dirigirse a dispositivos de bloques y archivos, pero desde entonces ha adquirido la capacidad de trabajar con elementos como los sockets de red.
Tiene el potencial de proporcionar modestos beneficios de rendimiento a la E/S de red por sí sola y mayores beneficios para cargas de trabajo de aplicaciones mixtas de E/S de archivo y de red.
Para saber más sobre io_uring, le recomendamos los siguientes enlaces:
-
Por qué debería utilizar io_uring para la E/S de red: El principal beneficio de io_uring para la E/S de red es una API asíncrona moderna que es fácil de usar y proporciona una semántica unificada para la E/S de archivos y de red. Un beneficio potencial de rendimiento de io_uring para la E/S de red es la reducción del número de syscalls. Esto podría proporcionar el mayor beneficio para grandes volúmenes de pequeñas operaciones en las que la sobrecarga de las llamadas al sistema puede ser significativa.
-
La revolución del backend y por qué io_uring es tan importante: La API io_uring utiliza dos búferes de anillo para la comunicación entre la aplicación y el núcleo (de ahí el nombre de la API) y está diseñada de tal forma que permite el procesamiento natural por lotes de las peticiones y las respuestas. Además, proporciona una forma de enviar múltiples peticiones en una sola llamada al sistema, lo que puede reducir la sobrecarga.
-
¿Qué es exactamente io_uring?: io_uring es una interfaz del núcleo de Linux que permite enviar y recibir datos de forma asíncrona de manera eficiente. Se diseñó originalmente para dirigirse a dispositivos de bloques y archivos, pero desde entonces ha adquirido la capacidad de trabajar con elementos como los sockets de red.
Para utilizar io_uring, necesita añadir dos dependencias a su proyecto y habilitar el transporte nativo.
En primer lugar, añada las siguientes dependencias a su proyecto:
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")
A continuación, en el application.properties, añada:
quarkus.vertx.prefer-native-transport=true
|
¿Puedo utilizar io_uring en mi máquina Linux?
Para comprobar si puede utilizar
Si imprime algo como lo anterior, puede utilizar |
|
Solución de problemas
El soporte de |
| Los sockets de dominio aún no están soportados con io_uring. |
| La API del sistema de archivos asíncrono Vert.x aún no utiliza io_uring. |
Desplegar en entornos de solo lectura
En entornos con sistemas de archivos de solo lectura puede recibir errores de la forma:
java.lang.IllegalStateException: Failed to create cache dir
Suponiendo que /tmp/ sea escribible, esto puede solucionarse estableciendo la propiedad vertx.cacheDirBase para que apunte a un directorio en /tmp/, por ejemplo en Kubernetes creando una variable de entorno JAVA_OPTS con el valor -Dvertx.cacheDirBase=/tmp/vertx, o estableciendo la propiedad quarkus.vertx.cache-directory en application.properties:
quarkus.vertx.cache-directory=/tmp/vertx
Personalizar la configuración de Vert.x
La configuración de la instancia Vert.x gestionada puede proporcionarse mediante el archivo application.properties, pero también mediante beans especiales.
Los beans CDI que exponen la interfaz io.quarkus.vertx.VertxOptionsCustomizer pueden utilizarse para personalizar la configuración de Vert.x.
Por ejemplo, el siguiente personalizador cambia el directorio base tmp:
@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {
@Override
public void accept(VertxOptions options) {
options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
}
}
Los beans customizer reciben los VertxOptions (provenientes de la configuración de la aplicación) y pueden modificarlos.
Brotli4J y soporte multiplataforma
Brotli4J es una biblioteca nativa que proporciona soporte para el algoritmo de compresión Brotli. Por defecto, Quarkus incluye la biblioteca nativa Brotli correspondiente a la plataforma en la que se está ejecutando. Pero a veces, es necesario incluir la biblioteca nativa para una plataforma diferente.
En este caso, deberá añadir explícitamente una dependencia a su proyecto.
Por ejemplo, si necesita incluir la biblioteca nativa para linux-aarch64, puede añadir la siguiente dependencia:
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>native-linux-aarch64</artifactId>
</dependency>
Esto incluirá la biblioteca nativa para linux-aarch64 en su proyecto, además de la que corresponda a su máquina.
Aquí está la lista de artefactos brotli4j disponibles para las diferentes plataformas:
-
native-linux-x86_64 -
native-linux-s390x -
native-linux-ppc64le -
native-linux-aarch64 -
native-linux-armv7 -
native-linux-riscv64 -
native-windows-x86_64 -
native-windows-aarch64 -
native-macos-x86_64 -
native-macos-aarch64