Skip to content

[Detailed Tutorial] Using QBit's Event Bus System (The Employee example)

fadihub edited this page Jun 8, 2015 · 3 revisions

##overview

QBit has an event bus. The advantage of Using the event bus with QBit services is that the events come into the same queue that handles the method calls so that the events method calls are thread safe. Everything comes in on the same thread, events and methods. The event bus is very fast, expect speeds up to 10M to 100M messages per second. Also the event bus is a great way to include additional services without disrupting existing services. With QBit's event bus you can send objects, strongly typed objects, JSON, Maps, etc.

You can wire QBit Services into many event bus systems. This makes it easy to wire a service to listen to events coming from Kafka, RabbitMQ or something else.

Using QBit's Event Bus System (The Employee example)

This wiki will walk you through a simple "employee example" to demonstrate to you how to use QBit's event bus system.

What you will build

You will build a simple "employee example" that includes four services; each service will handle the following situations: when a new employees is hired, add the employee to the payroll system, enroll the employee into the benefits system, and invite them to our community outreach program.

In this example the first service will not know about the other services. And we can add more services in the future which can listen to events and participate in the new employee being hired. This will be great example to demonstrate to you how to use QBit's event bus system. When you run this example you will get the following:

Hired employee Employee{firstName='Rick', employeeId=1}
Employee added to payroll  Rick 1 100
Employee enrolled into benefits system employee Rick 1
Employee will be invited to the community outreach program Rick 1

How to complete this guide

In order to complete this example successfully you will need the following installed on your machine:

Now that your machine is all ready let's get started:

https://github.com/fadihub/event-bus-system-qbit.git

Once this is done you can test the service, let's first explain the process:

As mentioned before this example has four services: EmployeeHiringService, BenefitsService, VolunteerService, PayrollService. These services are all inproc services. QBit supports WebSocket, HTTP and REST remote services as well, but for now, let's focus on inproc services. If you understand inproc then you will understand remote. All four services are doing some kind of work to an employee that looks like this:

  public Employee(String firstName, int employeeId) {
            this.firstName = firstName;
            this.employeeId = employeeId;
        }

Here are the getters for this employee object:

public String getFirstName() {
            return firstName;
        }

        public int getEmployeeId() {
            return employeeId;
        }

        @Override
        public String toString() {
            return "Employee{" +
                    "firstName='" + firstName + '\'' +
                    ", employeeId=" + employeeId +
                    '}';
        }

This example has two channels:

public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";

public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";

The first channel NEW_HIRE_CHANNEL is where we send new employee objects when they are hired. A whole slew of services could be listening to this channel, in this example we have the following two services listing on this channel BenefitsService, and VolunteerService.

The EmployeeHiringService actually fires off the events to the services:

public static class EmployeeHiringService {


        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee

            //Sends events
            final EventManager eventManager = serviceContext().eventManager();
            eventManager.send(NEW_HIRE_CHANNEL, employee);
            eventManager.sendArray(PAYROLL_ADJUSTMENT_CHANNEL, employee, salary);


        }

When working inside of a QBit Service, you can access the event manager using serviceContext().eventManager(). If you access it this way, the flushing is taken care of for you. Flushing messages to other services in batches helps with the performance. You have to flush after you use a client proxy. The eventManager() method returns a client proxy. When running inside of QBit, you do not have to flush, it is done for you at the time when/where you will get the most performance out of the system. This is what allows the event manager to send so many messages in such a short period of time. Not only send the messages but enqueue them on other service queues.

Notice that we call sendArray so we can send the employee and their salary. The listener for PAYROLL_ADJUSTMENT_CHANNEL (which is PayrollService) will have to handle both an employee and an int that represents the new employees salary. Here are the other three services that are listening:

public static class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {

            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }
public static class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {

            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }
public static class PayrollService {

        @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
        public void addEmployeeToPayroll(final Employee employee, int salary) {

            System.out.printf("Employee added to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), salary);

        }

    }

The OnEvent annotation is an alternative to @Listen. We also have @Consume, @Subscribe, @Hear. You do not have to use our annotation. So if you wanted to write a service that was not tied to QBit at all, i.e., no compile time dependencies, then you would just define your own annotation called OnEvent, or Listen, or Consume or Subscribe or Hear. We believe in no compile time dependencies for your services. And no class-loader discovery magic that would tie you to Boon or QBit. There is an API. Your implementation can be divorced from QBit as much as possible.

@Target({ ElementType.METHOD, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
public @interface OnEvent {


    /* The channel you want to listen to. */;
    String value() default "";

    /* The consume is the last object listening to this event.
       An event channel can have many subscribers but only one consume.
     */
    boolean consume() default false;


}

So now we have all the services, remember The employee is the employee object from the EmployeeHiringService. Therefore To start things off, we need to get a client proxy to the EmployeeHiringService using the employeeHiringServiceQueue. But first lets wire all the four services into the QBit queuing apparatus. Here the wiring process:

Create the POJOs:

EmployeeHiringService employeeHiring = new EmployeeHiringService();
PayrollService payroll = new PayrollService();
BenefitsService benefits = new BenefitsService();
VolunteerService volunteering = new VolunteerService();

Wire in EmployeeHiringService, BenefitsService, VolunteerService, PayrollService into QBit.

ServiceQueue employeeHiringServiceQueue = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build().startServiceQueue();


        ServiceQueue payrollServiceQueue = serviceBuilder()
                .setServiceObject(payroll)
                .setInvokeDynamic(false).build().startServiceQueue();


        ServiceQueue employeeBenefitsServiceQueue = serviceBuilder()
                .setServiceObject(benefits)
                .setInvokeDynamic(false).build().startServiceQueue();


        ServiceQueue volunteeringServiceQueue = serviceBuilder()
                .setServiceObject(volunteering)
                .setInvokeDynamic(false).build().startServiceQueue();

The objects employeeHiringServiceQueue, payrollServiceQueue, employeeBenefitsServiceQueue, and volunteeringServiceQueue are QBit services.

To invoke a method on a QBit service, you want to get a client proxy. A client proxy will send messages to a service. The service will get those messages as method calls.

Every call is sent over a high-speed internal inproc queue. You can also use a client proxy to talk to QBit over WebSockets.

To create a proxy you use the createProxy method of Service:

EmployeeHiringServiceClient employeeHiringServiceClientProxy = employeeHiringServiceQueue.createProxy(EmployeeHiringServiceClient.class);

Now that we have created the proxy, we can send messages to it.

employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

Every so often, we have to flush calls to the client proxy.

The client proxy will flush calls every time the queue batch size is met. So if the queue batch size was set to 5, then it would flush every five calls. But no matter, when you are done making calls, you should flush the calls as follows:

flushServiceProxy(employeeHiringServiceClientProxy);

If you were making calls to a service in a tight loop, you may want to flush every ten calls or every 100 calls. Or you may want to flush related calls.

If you set the batch size to 1, then every method calls is flushed, but this hampers performance.

If you use the event manager service, it will get auto flushed for you but in an extremely performant way. We may provide similar support for injected client proxies into a service.

Now we are all done. Note we can add services at any time to this example by following the procedure mentioned before: create your service, create a POJO, wire it into QBit, and make it listen to the new hire event channel or whatever you decide to do. that is it. very simple.

##Full code Listing

EmployeeEventExampleUsingSystemEventBus.java Listing

src/main/java/io.advantageous.qbit.example/EmployeeEventExampleUsingSystemEventBus

/*
 * Copyright (c) 2015. Rick Hightower, Geoff Chandler
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * QBit - The Microservice lib for Java : JSON, WebSocket, REST. Be The Web!
 */

package io.advantageous.qbit.example;

import io.advantageous.qbit.annotation.OnEvent;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.boon.core.Sys;

import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceContext.serviceContext;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

/**
 * Created by rhightower on 2/4/15.
 */
public class EmployeeEventExampleUsingSystemEventBus {

    public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";

    public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";

    public static void main(String... args) {


        EmployeeHiringService employeeHiring = new EmployeeHiringService();
        PayrollService payroll = new PayrollService();
        BenefitsService benefits = new BenefitsService();
        VolunteerService volunteering = new VolunteerService();


        ServiceQueue employeeHiringServiceQueue = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build().startServiceQueue();


        ServiceQueue payrollServiceQueue = serviceBuilder()
                .setServiceObject(payroll)
                .setInvokeDynamic(false).build().startServiceQueue();


        ServiceQueue employeeBenefitsServiceQueue = serviceBuilder()
                .setServiceObject(benefits)
                .setInvokeDynamic(false).build().startServiceQueue();


        ServiceQueue volunteeringServiceQueue = serviceBuilder()
                .setServiceObject(volunteering)
                .setInvokeDynamic(false).build().startServiceQueue();

        EmployeeHiringServiceClient employeeHiringServiceClientProxy = employeeHiringServiceQueue.createProxy(EmployeeHiringServiceClient.class);

        employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

        flushServiceProxy(employeeHiringServiceClientProxy);

        Sys.sleep(5_000);

    }

    interface EmployeeHiringServiceClient {
        void hireEmployee(final Employee employee);

    }

    public static class Employee {
        final String firstName;
        final int employeeId;

        public Employee(String firstName, int employeeId) {
            this.firstName = firstName;
            this.employeeId = employeeId;
        }

        public String getFirstName() {
            return firstName;
        }

        public int getEmployeeId() {
            return employeeId;
        }

        @Override
        public String toString() {
            return "Employee{" +
                    "firstName='" + firstName + '\'' +
                    ", employeeId=" + employeeId +
                    '}';
        }
    }

    public static class EmployeeHiringService {


        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee

            //Sends events
            final EventManager eventManager = serviceContext().eventManager();
            eventManager.send(NEW_HIRE_CHANNEL, employee);
            eventManager.sendArray(PAYROLL_ADJUSTMENT_CHANNEL, employee, salary);


        }

    }

    public static class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {

            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public static class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {

            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public static class PayrollService {

        @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
        public void addEmployeeToPayroll(final Employee employee, int salary) {

            System.out.printf("Employee added to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), salary);

        }

    }
}

Test The Service

With your terminal cd event-bus-system-qbit

then gradle clean build then gradle run and you should get the following:

Hired employee Employee{firstName='Rick', employeeId=1}
Employee added to payroll  Rick 1 100
Employee enrolled into benefits system employee Rick 1
Employee will be invited to the community outreach program Rick 1

Summary

QBit has Java objects that might not know anything about the other services running in the same JVM but still be able to communicate via the event bus. We can keep adding stuff that keep listening.

You can have more than one event bus btw. An event bus is just another QBit service. It takes three lines of code to create a high-speed event bus. So each module could have its own event bus, and then use the system event bus to send messages between modules. You have built and tested "The Employee example" to learn about QBit's event bus system, see you in the next tutorial!

Tutorials

__

Docs

Getting Started

Basics

Concepts

REST

Callbacks and Reactor

Event Bus

Advanced

Integration

QBit case studies

QBit 2 Roadmap

-- Related Projects

Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting

Clone this wiki locally