Event Bus Request Response Messaging

Event Bus Request Response Messaging

In this tutorial, we are going to discuss about the Event Bus Request Response Messaging. The first possibility on how to communicate over the event bus is request response messaging.

In Vert.x, event bus request response messaging on the Event Bus allows a sender to send a message and wait for a reply from the recipient. This is useful when you need to perform asynchronous communication between verticles or services but still expect a response from the message handler.

In Vert.x, this is achieved using the send() method along with a reply mechanism. The sender sends a message to a specific address and the receiver, upon processing the message, can reply back to the sender using the reply() method.

Event Bus Request Response Messaging

Now we are going to create a new class called RequestResponseExample with the Java main class where we are creating a vertx object and in there we are deploying two verticles. The first verticle has the name RequestVerticle and the second one ResponseVerticle. Both must extend from abstract vertical.

To keep everything visible in one place. I’m adding these two verticles as a static class in the same file. Of course it is possible to keep them in separate files and this would be also the preferred way. So I will put all of them in one class.

package com.ashok.vertx.vertx_starter.eventbus;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

/**
 *
 * @author ashok.mariyala
 *
 */
public class RequestResponseExample {

  public static void main(String[] args) {
    var vertx = Vertx.vertx();
    vertx.deployVerticle(new RequestVerticle());
    vertx.deployVerticle(new ResponseVerticle());
  }

  public static class RequestVerticle extends AbstractVerticle {

    private static final Logger LOG = LoggerFactory.getLogger(RequestVerticle.class);
    static final String ADDRESS = "my.request.address";

    @Override
    public void start(final Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      var eventBus = vertx.eventBus();
      final String message = "Hello World!";
      LOG.debug("Sending: {}", message);
      eventBus.<String>request(ADDRESS, message, reply -> {
        LOG.debug("Response: {}", reply.result());
      });
    }
  }

  public static class ResponseVerticle extends AbstractVerticle {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseVerticle.class);

    @Override
    public void start(final Promise<Void> startPromise) throws Exception {
      startPromise.complete();
    }
  }
}

In the main method we deployed both RequestVerticle and ResponseVerticle using vertx.deployVerticle() method. Next we override the start method of each vertical and call start promised complete to signal that the start up was completed. Now we did our first interaction with the Eventbus.

There is only one event bus available per vertx instance and we can obtain it by calling vertx.eventBus(). We used the event bus to send a request by calling eventBus.request() and the first parameter is an event bus address. Vertx is not defining an event bus address schema, so it is completely up to you how to define an event bus address so you can use any string you like. Personally, I established a best practice to always use the class name of the sender. With that, you will get the unique event bus address and it is easier to follow where the message is coming from to showcase that all addresses are allowed. For this example, we used a custom address with the name my dot request dot address.

The second parameter is the message we want to send over the event bus. We are going to send a simple string with the content Hello world!. It is possible to send strings, vertx objects or also custom objects over the event bus. We will see how this works in later tutorials. For now, we focus on the different ways how to interact with the event bus.

The third parameter is a callback that will be executed when the response arrives. To see when the response arrives, we used a logger to print the message. With that, we can start our application.

7:37:18 am: Executing ':RequestResponseExample.main()'...

> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes

> Task :RequestResponseExample.main()
[vert.x-eventloop-thread-0] DEBUG com.ashok.vertx.vertx_starter.eventbus.RequestResponseExample$RequestVerticle - Sending: Hello World!
[vert.x-eventloop-thread-0] DEBUG com.ashok.vertx.vertx_starter.eventbus.RequestResponseExample$RequestVerticle - Response: null

Notice that our response verticle is not consuming this message. So we see a response of null currently as no one is listening to this address. We need another verticle that is consuming the message and returns a response that is not null. In our example, the ResponseVerticle will do this. We can call vertx.eventBus().consumer() method to register a consumer.

package com.ashok.vertx.vertx_starter.eventbus;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

/**
 *
 * @author ashok.mariyala
 *
 */
public class RequestResponseExample {

  public static void main(String[] args) {
    var vertx = Vertx.vertx();
    vertx.deployVerticle(new RequestVerticle());
    vertx.deployVerticle(new ResponseVerticle());
  }

  public static class RequestVerticle extends AbstractVerticle {

    private static final Logger LOG = LoggerFactory.getLogger(RequestVerticle.class);
    static final String ADDRESS = "my.request.address";

    @Override
    public void start(final Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      var eventBus = vertx.eventBus();
      final String message = "Hello World!";
      LOG.debug("Sending: {}", message);
      eventBus.<String>request(ADDRESS, message, reply -> {
        LOG.debug("Response: {}", reply.result().body());
      });
    }
  }

  public static class ResponseVerticle extends AbstractVerticle {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseVerticle.class);

    @Override
    public void start(final Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      vertx.eventBus().<String>consumer(RequestVerticle.ADDRESS, message -> {
        LOG.debug("Received Message: {}", message.body());
        message.reply("Received your message. Thanks!");
      });
    }
  }
}

In the vertx.eventBus().consumer() method, The first parameter is the address and the second parameter we have a handler which receives the message. Additionally, we called message.reply() method to act on the message and send it back on the same address. Again, we returned a simple string or also a vertex object or a custom object. In this example, we are returning a string with the content Received your message. Thanks! as a second parameter. It would be possible to define some delivery options.

Okay, let’s run our application and see what happens.

7:48:35 am: Executing ':RequestResponseExample.main()'...

> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes

> Task :RequestResponseExample.main()
[vert.x-eventloop-thread-0] DEBUG com.ashok.vertx.vertx_starter.eventbus.RequestResponseExample$RequestVerticle - Sending: Hello World!
[vert.x-eventloop-thread-1] DEBUG com.ashok.vertx.vertx_starter.eventbus.RequestResponseExample$ResponseVerticle - Received Message: Hello World!
[vert.x-eventloop-thread-0] DEBUG com.ashok.vertx.vertx_starter.eventbus.RequestResponseExample$RequestVerticle - Response: Received your message. Thanks!

We should see three messages. The first message is from the RequestVertical Sending: Hello World!. The second one is received message in the ResponseVerticle. And the third message is response in the RequestVertical again.

Key Concepts in Event Bus Request Response Messaging

1. Sender

  • The sender sends a message to a specific address and waits for a reply.
  • The send() method is used to send the message and attach a handler that will process the reply.

2. Receiver

  • The receiver listens for messages on a specific address. When it receives a message, it processes the message and sends a reply back to the sender using reply().
How Event Bus Request Response Messaging
  • The sender verticle sends a message to the service-address.
  • The receiver verticle has a consumer listening on service-address and processes the message.
  • After processing, the receiver sends a reply to the sender using message.reply().
  • The sender‘s send() method includes a reply handler that is triggered once the reply is received.
Summary
  • Event Bus Request Response Messaging in the Vert.x Event Bus allows sending messages and waiting for a reply asynchronously.
  • You can use the send() method to initiate a request and reply() to respond to the message.
  • It supports handling simple types, JSON objects, or even complex custom objects.
  • You can manage failures and timeouts to make the communication more robust.
  • It works both locally and in clustered environments for distributed messaging.
  • In a clustered environment, this pattern allows for distributed communication between multiple Vert.x instances.

That’s all about the Event Bus Request Response Messaging. If you have any queries or feedback, please write us email at contact@waytoeasylearn.com. Enjoy learning, Enjoy Vert.x tutorials..!!

Event Bus Request Response Messaging
Scroll to top