Event Bus Publish Subscribe Messaging

Event Bus Publish Subscribe Messaging

In this tutorial, we are going to discuss about the Event Bus Publish Subscribe Messaging. The third possibility of communicating over the event bus is Publish Subscribe communication.

Event Bus Publish Subscribe Messaging in the Vert.x allows messages to be broadcast to multiple consumers (verticles) that are subscribed to a specific address. Unlike point-to-point messaging, where a message is processed by only one consumer, in publish/subscribe messaging, the message is delivered to all consumers listening on the given address.

Event Bus Publish Subscribe Messaging

The publish() method is used for broadcasting a message to all handlers (or consumers) registered on a particular address.

One verticle is publishing a message that can be received by multiple consumers at the same time. You could see it as a broadcast message.

For this example, we are creating a new class with the name PublishSubscribeExample, and we add the Java main method to create the Vertx instance. After that, we will create one Publish verticle and multiple Subscribe verticles.

package com.ashok.vertx.vertx_starter.eventbus;

import java.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

/**
 *
 * @author ashok.mariyala
 *
 */
public class PublishSubscribeExample {

  public static void main(String[] args) {
    var vertx = Vertx.vertx();
    vertx.deployVerticle(new Publish());
    vertx.deployVerticle(new Subscriber1());
    vertx.deployVerticle(Subscriber2.class.getName(), new DeploymentOptions().setInstances(2));
  }

  public static class Publish extends AbstractVerticle {
    @Override
    public void start(final Promise<Void> startPromise) throws Exception {
      startPromise.complete();
      vertx.setPeriodic(Duration.ofSeconds(10).toMillis(), id ->
        vertx.eventBus().publish(Publish.class.getName(), "A message for everyone!")
      );
    }
  }

  public static class Subscriber1 extends AbstractVerticle {

    private static final Logger LOG = LoggerFactory.getLogger(Subscriber1.class);

    @Override
    public void start(final Promise<Void> startPromise) throws Exception {
      vertx.eventBus().<String>consumer(Publish.class.getName(), message -> {
        LOG.debug("Received: {}", message.body());
      });
      startPromise.complete();
    }
  }

  public static class Subscriber2 extends AbstractVerticle {

    private static final Logger LOG = LoggerFactory.getLogger(Subscriber2.class);

    @Override
    public void start(final Promise<Void> startPromise) throws Exception {
      vertx.eventBus().<String>consumer(Publish.class.getName(), message -> {
        LOG.debug("Received: {}", message.body());
      });
      startPromise.complete();
    }
  }
}

As usual we have access to the Eventbus instance to publish messages. But here we used vertx.eventBus().publish() method to publish a message to all subscribers of this address. Here also the first parameter is the address where we are publishing. The second parameter is the content of the message with the value a message for everyone. We now have a publisher, but we don’t have subscribers to see if it works. So we deployed multiple subscriber verticles to subscribe the messages. To subscribe the messages in subscribers verticles we used same vertx.eventBus().consumer() method.

Now lets run our code and verify the result

3:25:33 pm: Executing ':PublishSubscribeExample.main()'...

> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes

> Task :PublishSubscribeExample.main()
[vert.x-eventloop-thread-3] DEBUG com.ashok.vertx.vertx_starter.eventbus.PublishSubscribeExample$Subscriber2 - Received: A message for everyone!
[vert.x-eventloop-thread-1] DEBUG com.ashok.vertx.vertx_starter.eventbus.PublishSubscribeExample$Subscriber1 - Received: A message for everyone!
[vert.x-eventloop-thread-4] DEBUG com.ashok.vertx.vertx_starter.eventbus.PublishSubscribeExample$Subscriber2 - Received: A message for everyone!
[vert.x-eventloop-thread-4] DEBUG com.ashok.vertx.vertx_starter.eventbus.PublishSubscribeExample$Subscriber2 - Received: A message for everyone!
[vert.x-eventloop-thread-1] DEBUG com.ashok.vertx.vertx_starter.eventbus.PublishSubscribeExample$Subscriber1 - Received: A message for everyone!
[vert.x-eventloop-thread-3] DEBUG com.ashok.vertx.vertx_starter.eventbus.PublishSubscribeExample$Subscriber2 - Received: A message for everyone!

If you see the above output for every 10 seconds all the the subscriber verticles received the messages. So it is possible to distribute messages in a thread safe way to multiple verticles at the same time. You don’t have to worry about concurrency issues as Vertx will take care of it.

Key Concepts of Event Bus Publish Subscribe Messaging

1. Broadcasting Messages

  • Messages sent with publish() are delivered to all consumers registered on the given address.

2. Multiple Consumers

  • There can be any number of consumers subscribed to an address, and all of them will receive the published message.

3. Asynchronous Communication

  • Like other forms of Vert.x messaging, communication is non-blocking and asynchronous, which allows the system to handle a high volume of messages.

4. Fire-and-Forget

  • When using publish/subscribe, the sender does not expect a reply from the consumers, as the message is broadcast to all of them.
Key Differences Between Publish/Subscribe and Point-to-Point
  • Point-to-Point (send()): Only one consumer receives each message, and Vert.x uses a round-robin mechanism if multiple consumers are subscribed to the same address.
  • Publish/Subscribe (publish()): All consumers receive the same message.
Use Case Scenarios for Publish/Subscribe
  • Notification Systems: When you want to broadcast notifications (e.g., news updates, status alerts) to multiple clients.
  • Logging: Sending log data or metrics to multiple listeners.
  • Monitoring: When various components of the system need to be aware of state changes or events that occur.
Clustered Event Bus Publish Subscribe Messaging

In a clustered Vert.x environment, the publish/subscribe model works across multiple nodes. This means that messages published from one instance of Vert.x can be consumed by other instances running on different machines or JVMs.

Enabling Clustering in Vert.x

To enable clustering, you can configure the Vert.x instance with clustering options, such as using Hazelcast or Zookeeper as the clustering backend.

VertxOptions options = new VertxOptions().setClustered(true);

Vertx.clusteredVertx(options, res -> {
    if (res.succeeded()) {
        Vertx vertx = res.result();
        // Now use the event bus in cluster mode
        vertx.eventBus().publish("clustered-news-feed", "News from the cluster");
    }
});

When the event bus is in cluster mode, messages published on the event bus are automatically shared across the entire cluster, allowing for distributed messaging in large-scale systems.

Example of Clustering

  1. You have two Vert.x instances running on different machines.
  2. A publisher verticle on one machine publishes a message to the event bus.
  3. Consumers on both machines will receive that message, as they are listening on the same address in the cluster.
Error Handling in Event Bus Publish Subscribe Messaging

Since publish() is a fire-and-forget method, there’s no direct reply or error mechanism. However, if you need acknowledgment or error handling, you may consider using the request/response pattern in combination with multiple consumers.

Summary
  • Event Bus Publish Subscribe messaging in Vert.x allows messages to be broadcast to multiple consumers listening on the same address.
  • The publish() method broadcasts messages, while send() is used for point-to-point communication.
  • Consumers receive the same message, making this model ideal for notifications, updates, or events that need to be distributed to multiple parts of the system.
  • Vert.x supports publish/subscribe across clusters, making it suitable for distributed systems.

That’s all about the Event Bus Publish Subscribe Messaging. If you have any queries or feedback, please write us email at contact@waytoeasylearn.com. Enjoy learning, Enjoy Vert.x tutorials..!!

Event Bus Publish Subscribe Messaging
Scroll to top