Vert.x Worker Threads

In this tutorial, we are going to discuss Vert.x worker threads. As mentioned in the previous tutorial, Vert.x provides a worker thread pool to execute blocking operations.

In Vert.x, worker threads are designed to handle blocking or long-running tasks that would otherwise block an event loop. Moving heavy or blocking tasks to a separate thread pool keeps the event loop responsive for events such as I/O operations, HTTP requests, or timers.

There are two ways to execute blocking operations: by calling the vertx.executeBlocking() method or by deploying a worker verticle. We will see how to work with both. Let's get started.

First, we create a new package called worker and, inside it, a new class called WorkerExample.

package com.ashok.vertx.vertx_starter.worker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

/**
 *
 * @author ashok.mariyala
 *
 */
public class WorkerExample extends AbstractVerticle {

  private static final Logger LOG = LoggerFactory.getLogger(WorkerExample.class);

  public static void main(String[] args) {
    var vertx = Vertx.vertx();
    vertx.deployVerticle(new WorkerExample());
  }

  @Override
  public void start(final Promise<Void> startPromise) throws Exception {
    startPromise.complete();
    vertx.executeBlocking(event -> {
      LOG.debug("Executing blocking code");
      try {
        Thread.sleep(5000);
        event.complete();
      } catch (InterruptedException e) {
        LOG.error("Failed: ", e);
        event.fail(e);
      }
    }, result -> {
      if (result.succeeded()) {
        LOG.debug("Blocking call done.");
      } else {
        LOG.error("Blocking call failed due to:", result.cause());
      }
    });
  }
}

This worker example has the same setup as before. The Java main method creates a new Vert.x instance and deploys the WorkerExample verticle, which extends AbstractVerticle. We override the start method and complete the start promise with startPromise.complete(), which signals that the verticle has started.

With this in place, we can add a blocking operation to our start method. A blocking operation is, in general, anything that blocks a thread: for example file I/O, heavy computations, waiting for the network, or a simple Thread.sleep(). For simplicity, Thread.sleep() stands in for real blocking work here; you would not normally call it in application code. The vertx.executeBlocking() method schedules the code on the worker thread pool, so it is fine to run blocking operations there.

The first parameter is a handler, which we name event, and its body contains the blocking code. This code is executed on a worker thread: it logs "Executing blocking code" with LOG.debug() and then calls Thread.sleep(5000) to block the thread.

Additionally, we define a second parameter, which is the result handler. When the blocking code finishes, execution should return to the event loop thread that made the call. The second handler achieves exactly that.

How does the blocking code return its result? The variable event in the first handler is a promise that can either be completed or failed. When an exception occurs, we call event.fail() with the exception as the cause; otherwise we call event.complete(). Once the promise has succeeded or failed, execution continues in the result handler, where we can check the outcome with result.succeeded() and read the failure cause with result.cause().
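The hand-off described above can be sketched without Vert.x, using only java.util.concurrent. This is an analogy under stated assumptions, not how Vert.x is implemented: the class OffloadSketch, its run method, and the thread names sketch-eventloop-0 and sketch-worker-0 are all made up for illustration. One single-threaded executor plays the event loop and another plays the worker pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy of the executeBlocking hand-off (hypothetical names throughout).
class OffloadSketch {

  // Returns {thread that ran the blocking step, thread that ran the result step, outcome}.
  static String[] run(boolean failInWorker) throws Exception {
    // One thread stands in for the event loop, one for the worker pool.
    ExecutorService eventLoop =
        Executors.newSingleThreadExecutor(task -> new Thread(task, "sketch-eventloop-0"));
    ExecutorService worker =
        Executors.newSingleThreadExecutor(task -> new Thread(task, "sketch-worker-0"));
    CompletableFuture<String[]> trace = new CompletableFuture<>();
    try {
      // The "verticle" runs on the event loop and offloads the blocking step.
      eventLoop.execute(() -> worker.execute(() -> {
        String blockingThread = Thread.currentThread().getName();
        String outcome;
        try {
          if (failInWorker) {
            throw new IllegalStateException("simulated failure");
          }
          Thread.sleep(50); // stand-in for real blocking work
          outcome = "succeeded"; // comparable to event.complete()
        } catch (Exception e) {
          outcome = "failed: " + e.getMessage(); // comparable to event.fail(e)
        }
        String reported = outcome;
        // Hop back to the event loop thread to run the result step.
        eventLoop.execute(() -> trace.complete(
            new String[] {blockingThread, Thread.currentThread().getName(), reported}));
      }));
      return trace.get(5, TimeUnit.SECONDS);
    } finally {
      eventLoop.shutdown();
      worker.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(String.join(" | ", run(false)));
    System.out.println(String.join(" | ", run(true)));
  }
}
```

In both the success and the failure case, the blocking step runs on the worker thread and the result step runs on the event loop thread, which is the same split the two handlers of vertx.executeBlocking() give us.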

The application is now starting up and we see our first log output.

4:21:59 pm: Executing ':WorkerExample.main()'...

> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes

> Task :WorkerExample.main()
[vert.x-worker-thread-0] DEBUG com.ashok.vertx.vertx_starter.worker.WorkerExample - Executing blocking code
[vert.x-eventloop-thread-0] DEBUG com.ashok.vertx.vertx_starter.worker.WorkerExample - Blocking call done.

The first log line has the message "Executing blocking code" and is written from vert.x-worker-thread-0. The second line, "Blocking call done.", is written from vert.x-eventloop-thread-0.

Let's go back to the code to see how this works. The first handler passed to executeBlocking() is executed on worker thread 0. The second handler, which receives the result, is executed on event loop thread 0. So if some code in your verticle blocks the event loop, use vertx.executeBlocking() to schedule that call on a worker thread. This keeps the event loop free to process other events.

The code running on a worker thread may fail and return an error, so make sure to handle these error cases. That was the first way to execute blocking code. The second way is to define a verticle as a worker verticle, so that all code inside the verticle is executed on a worker thread.

For this, we are going to create a new class called WorkerVerticle, which extends from AbstractVerticle.

package com.ashok.vertx.vertx_starter.worker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;

/**
 *
 * @author ashok.mariyala
 *
 */
public class WorkerVerticle extends AbstractVerticle {

  private static final Logger LOG = LoggerFactory.getLogger(WorkerVerticle.class);

  @Override
  public void start(final Promise<Void> startPromise) throws Exception {
    LOG.debug("Deployed as worker verticle.");
    startPromise.complete();
    Thread.sleep(5000);
    LOG.debug("Blocking operation done.");
  }
}

The code of the WorkerVerticle is not really different from that of a normal verticle. The only difference is how it is deployed. We now deploy this verticle from the WorkerExample verticle:

package com.ashok.vertx.vertx_starter.worker;

import io.vertx.core.ThreadingModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

/**
 *
 * @author ashok.mariyala
 *
 */
public class WorkerExample extends AbstractVerticle {

  private static final Logger LOG = LoggerFactory.getLogger(WorkerExample.class);

  public static void main(String[] args) {
    var vertx = Vertx.vertx();
    vertx.deployVerticle(new WorkerExample());
  }

  @Override
  public void start(final Promise<Void> startPromise) throws Exception {
    vertx.deployVerticle(new WorkerVerticle(),
      new DeploymentOptions()
        .setThreadingModel(ThreadingModel.WORKER)
        .setWorkerPoolSize(1)
        .setWorkerPoolName("my-worker-verticle")
    );
    startPromise.complete();
    executeBlockingCode();
  }

  private void executeBlockingCode() {
    vertx.executeBlocking(event -> {
      LOG.debug("Executing blocking code");
      try {
        Thread.sleep(5000);
        event.complete();
      } catch (InterruptedException e) {
        LOG.error("Failed: ", e);
        event.fail(e);
      }
    }, result -> {
      if (result.succeeded()) {
        LOG.debug("Blocking call done.");
      } else {
        LOG.error("Blocking call failed due to:", result.cause());
      }
    });
  }
}

The deployment options also let us set the worker pool size and give the worker pool a name, which makes our worker pools easy to identify in the logs. If no pool name is defined, Vert.x deploys the verticle on the standard worker pool.

4:54:01 pm: Executing ':WorkerExample.main()'...


> Task :compileJava

> Task :processResources UP-TO-DATE
> Task :classes

> Task :WorkerExample.main()
[my-worker-verticle-0] DEBUG com.ashok.vertx.vertx_starter.worker.WorkerVerticle - Deployed as worker verticle.
[vert.x-worker-thread-0] DEBUG com.ashok.vertx.vertx_starter.worker.WorkerExample - Executing blocking code
[my-worker-verticle-0] DEBUG com.ashok.vertx.vertx_starter.worker.WorkerVerticle - Blocking operation done.
[vert.x-eventloop-thread-0] DEBUG com.ashok.vertx.vertx_starter.worker.WorkerExample - Blocking call done.

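In the output, we can see that the WorkerVerticle is executed on the custom thread pool (my-worker-verticle-0), while the executeBlocking() code still runs on the standard worker pool (vert.x-worker-thread-0).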
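The thread name my-worker-verticle-0 in the log is the pool name followed by a thread index. A plain-JDK analogy of a named, fixed-size pool is sketched below. It does not use or reproduce Vert.x internals; the class NamedPoolSketch and the helpers namedPool and threadUsedBy are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy of a named worker pool (hypothetical names throughout).
class NamedPoolSketch {

  // Creates a fixed-size pool whose threads are named "<poolName>-<index>".
  static ExecutorService namedPool(String poolName, int size) {
    AtomicInteger index = new AtomicInteger();
    ThreadFactory factory =
        task -> new Thread(task, poolName + "-" + index.getAndIncrement());
    return Executors.newFixedThreadPool(size, factory);
  }

  // Runs one task on the pool and reports which thread executed it.
  static String threadUsedBy(ExecutorService pool) throws Exception {
    Callable<String> whoAmI = () -> Thread.currentThread().getName();
    return pool.submit(whoAmI).get(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = namedPool("my-worker-verticle", 1);
    try {
      // With a pool size of 1, both tasks are served by the same thread.
      System.out.println(threadUsedBy(pool));
      System.out.println(threadUsedBy(pool));
    } finally {
      pool.shutdown();
    }
  }
}
```

A dedicated, named pool makes it easy to tell from the logs which code ran where, and a small pool size bounds how many blocking tasks of that kind can run at the same time.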

If you see many blocked thread checker warnings in your application, remember to schedule the offending code on a worker thread so that it does not block the event loop.

[Figure: Key Differences Between Event Loop Threads and Vert.x Worker Threads]

That's all about Vert.x worker threads with an example. If you have any queries or feedback, please email us at contact@waytoeasylearn.com. Enjoy learning, enjoy Vert.x tutorials!
