Java library facade to send messages using multiple data delivery concepts

saicone/delivery4j

Delivery4j

Java library facade for multiple data delivery concepts.

There are multiple ways to transfer data between Java applications; this library offers an easy way to connect to them through a common set of methods.

The following brokers are currently supported:

  • ActiveMQ using topic producers and consumers.
  • Kafka using producer records with an empty key.
  • NATS using subject subscription.
  • PostgreSQL using LISTEN and NOTIFY statements.
  • RabbitMQ using queue and consumer via exchange.
  • Redis using publish and subscribe (also compatible with KeyDB).
  • SQL polling (not a real broker, but can be used as one).
  • Valkey using a copy of the Redis broker (since the Valkey Java client is a Jedis fork).

PostgreSQL and SQL are also compatible with Hikari.

Dependency

Delivery4j contains the following artifacts:

  • delivery4j - The main project.
  • broker-activemq - ActiveMQ broker.
  • broker-kafka - Kafka broker.
  • broker-nats - NATS broker.
  • broker-postgresql - PostgreSQL broker using plain Java connections.
  • broker-postgresql-hikari - PostgreSQL broker using Hikari library.
  • broker-rabbitmq - RabbitMQ broker.
  • broker-redis - Redis broker.
  • broker-sql - SQL broker using plain Java connections.
  • broker-sql-hikari - SQL broker using Hikari library.
  • broker-valkey - Valkey broker.
  • extension-caffeine - Extension to detect and use Caffeine cache on MessageChannel.
  • extension-guava - Extension to detect and use Guava cache on MessageChannel.
  • extension-log4j - Extension to detect and use log4j logger on Broker instance.
  • extension-slf4j - Extension to detect and use slf4j logger on Broker instance.

build.gradle

repositories {
    maven { url 'https://jitpack.io' }
}

dependencies {
    implementation 'com.saicone.delivery4j:delivery4j:VERSION'
}

build.gradle.kts

repositories {
    maven("https://jitpack.io")
}

dependencies {
    implementation("com.saicone.delivery4j:delivery4j:VERSION")
}

pom.xml

<repositories>
    <repository>
        <id>Jitpack</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>com.saicone.delivery4j</groupId>
        <artifactId>delivery4j</artifactId>
        <version>VERSION</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

Usage

How to use the Delivery4j library.

Broker

Using a broker is simple: create a broker instance (depending on the implementation) and set a consumer.

Broker broker = // Create instance from any implementation

// Subscribe to channels
broker.subscribe("hello:world", "myChannel1");

broker.setConsumer((channel, data) -> {
    // do something
});

// Start connection
broker.start();

// Send data
byte[] data = ...;

broker.send("myChannel1", data);

Some brokers need to convert bytes to String and vice versa; Base64 is used by default.

Broker broker = // Create instance from any implementation

broker.setCodec(new ByteCodec<>() {
    @Override
    public @NotNull String encode(byte[] src) {
        // convert bytes to String
    }

    @Override
    public byte[] decode(@NotNull String src) {
        // convert String to bytes
    }
});
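
As an illustration of what the two codec methods might contain, here is a minimal sketch that uses hexadecimal text instead of Base64. The class name `HexCodecSketch` is hypothetical, and the class deliberately does not implement the library's `ByteCodec` so that it runs with the JDK alone; the method bodies could be moved into the anonymous class above.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical standalone sketch: it mirrors the encode/decode pair shown above
// but does not implement the library's ByteCodec interface.
final class HexCodecSketch {

    // Convert bytes to a lowercase hexadecimal String (two characters per byte).
    static String encode(byte[] src) {
        StringBuilder out = new StringBuilder(src.length * 2);
        for (byte b : src) {
            out.append(Character.forDigit((b >> 4) & 0xF, 16));
            out.append(Character.forDigit(b & 0xF, 16));
        }
        return out.toString();
    }

    // Convert a hexadecimal String back to the original bytes.
    static byte[] decode(String src) {
        if (src.length() % 2 != 0) {
            throw new IllegalArgumentException("Hex string must have an even length");
        }
        byte[] out = new byte[src.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int high = Character.digit(src.charAt(i * 2), 16);
            int low = Character.digit(src.charAt(i * 2 + 1), 16);
            if (high < 0 || low < 0) {
                throw new IllegalArgumentException("Invalid hex character near index " + (i * 2));
            }
            out[i] = (byte) ((high << 4) | low);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = "Hello".getBytes(StandardCharsets.UTF_8);
        String text = encode(data);
        System.out.println(text); // prints 48656c6c6f
        System.out.println(new String(decode(text), StandardCharsets.UTF_8)); // prints Hello
    }
}
```

Hex output is twice the size of the input, so Base64 (the default) is usually the more compact choice; a custom codec mainly makes sense when the receiving side expects a specific text format.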

Some brokers have blocking operations or repetitive tasks, so it is suggested to implement your own executor.

Broker broker = // Create instance from any implementation

broker.setExecutor(new DelayedExecutor<MyTaskObject>() {
    @Override
    public @NotNull MyTaskObject execute(@NotNull Runnable command) {
        // run task and return itself
    }

    @Override
    public @NotNull MyTaskObject execute(@NotNull Runnable command, long delay, @NotNull TimeUnit unit) {
        // run delayed task and return itself
    }

    @Override
    public @NotNull MyTaskObject execute(@NotNull Runnable command, long delay, long period, @NotNull TimeUnit unit) {
        // run repetitive task and return itself
    }

    @Override
    public void cancel(@NotNull MyTaskObject unused) {
        // cancel task
    }
});
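
One way to fill in those four methods is to delegate to a JDK `ScheduledExecutorService` and use `Future<?>` as the task object. The sketch below is an assumption about how such an executor could look, not the library's own implementation: `ScheduledExecutorSketch`, `shutdown()` and `demo()` are hypothetical names, and the class does not implement `DelayedExecutor` so that it compiles without the library.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone sketch: same four operations as the anonymous class above,
// with Future<?> as the task object. It does not implement the library's DelayedExecutor.
final class ScheduledExecutorSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Run the task as soon as the worker thread is free.
    Future<?> execute(Runnable command) {
        return scheduler.submit(command);
    }

    // Run the task once after the given delay.
    Future<?> execute(Runnable command, long delay, TimeUnit unit) {
        return scheduler.schedule(command, delay, unit);
    }

    // Run the task repeatedly: first after `delay`, then every `period`.
    Future<?> execute(Runnable command, long delay, long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(command, delay, period, unit);
    }

    // Cancel a task without interrupting it if it is already running.
    void cancel(Future<?> task) {
        task.cancel(false);
    }

    // Stop the worker thread once the executor is no longer needed.
    void shutdown() {
        scheduler.shutdownNow();
    }

    // Small demonstration: returns how many one-shot tasks completed, or -1 on failure.
    static int demo() {
        ScheduledExecutorSketch executor = new ScheduledExecutorSketch();
        AtomicInteger counter = new AtomicInteger();
        try {
            executor.execute(counter::incrementAndGet).get();
            executor.execute(counter::incrementAndGet, 10, TimeUnit.MILLISECONDS).get();
            Future<?> repeating = executor.execute(() -> { }, 1, 1, TimeUnit.HOURS);
            executor.cancel(repeating);
            return repeating.isCancelled() ? counter.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            return -1;
        } finally {
            executor.shutdown();
        }
    }
}
```

A single scheduler thread keeps the example small; an application with several brokers or long blocking tasks would probably want a larger pool.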

You can also set a logging instance to log information about connections and exceptions; by default, the best available implementation is used.

Logging levels are expressed as numbers:

  1. Error
  2. Warning
  3. Information
  4. Debug

Broker broker = // Create instance from any implementation

broker.setLogger(new Broker.Logger() {
    @Override
    public void log(int level, @NotNull String msg) {
        // log raw message
    }

    @Override
    public void log(int level, @NotNull String msg, @NotNull Throwable throwable) {
        // log raw message with throwable
    }
});
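
The numeric levels listed above could, for example, be translated to `java.util.logging` levels. This is only a sketch under that assumption: `JulLoggerSketch`, `toLevel` and the logger name are hypothetical, the class does not implement `Broker.Logger`, and treating any value other than 1 to 3 as debug is a choice made for this example.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical standalone sketch: maps the numeric levels listed above to java.util.logging.
// It does not implement the library's Broker.Logger.
final class JulLoggerSketch {

    private final Logger logger = Logger.getLogger("delivery4j-example");

    // 1 = error, 2 = warning, 3 = information, anything else is treated as debug.
    static Level toLevel(int level) {
        switch (level) {
            case 1:
                return Level.SEVERE;
            case 2:
                return Level.WARNING;
            case 3:
                return Level.INFO;
            default:
                return Level.FINE;
        }
    }

    // Log a raw message at the translated level.
    void log(int level, String msg) {
        logger.log(toLevel(level), msg);
    }

    // Log a raw message together with its throwable.
    void log(int level, String msg, Throwable throwable) {
        logger.log(toLevel(level), msg, throwable);
    }
}
```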

Messenger

Probably the reason why you are here: a simple way to use brokers to send and receive multi-line String messages.

First you need to extend AbstractMessenger and provide a broker.

public class Messenger extends AbstractMessenger {
    @Override
    protected Broker loadBroker() {
        // Create instance from any implementation
    }
}

And then use the Messenger.

Messenger messenger = new Messenger();

// Start connection
messenger.start();

// Send multi-line message to channel
messenger.send("myChannel1", "Hello", "World");

// Subscribe to channel
messenger.subscribe("myChannel1").consume((channel, lines) -> {
    // do something
});

Subscribed message channels can have a cache instance to avoid receiving outbound messages; by default, the best available implementation is used.

Messenger messenger = new Messenger();

// Subscribe to channel
MessageChannel channel = messenger.subscribe("myChannel1").consume((ch, lines) -> {
    // do something
});

// Cache message IDs
channel.cache(true);

// Cache with provided expiration
channel.cache(20, TimeUnit.SECONDS);

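To illustrate the general idea behind caching message IDs with an expiration, the sketch below remembers the IDs of messages sent by this instance and reports whether an incoming ID is one of them. How Delivery4j implements its cache is not described in this README, so everything here (`SentIdCacheSketch`, `markSent`, `isOwn`, the injectable clock) is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical standalone sketch of an expiring ID cache; not the library's implementation.
final class SentIdCacheSketch {

    private final Map<String, Long> expirations = new ConcurrentHashMap<>();
    private final long ttlNanos;
    private final LongSupplier clock; // injectable so the behaviour can be checked without sleeping

    SentIdCacheSketch(long duration, TimeUnit unit, LongSupplier clock) {
        this.ttlNanos = unit.toNanos(duration);
        this.clock = clock;
    }

    SentIdCacheSketch(long duration, TimeUnit unit) {
        this(duration, unit, System::nanoTime);
    }

    // Remember an ID right before the message is sent.
    void markSent(String id) {
        expirations.put(id, clock.getAsLong() + ttlNanos);
    }

    // True if the incoming ID was sent by this instance and has not expired yet.
    boolean isOwn(String id) {
        Long expiresAt = expirations.get(id);
        if (expiresAt == null) {
            return false;
        }
        if (clock.getAsLong() - expiresAt >= 0) {
            expirations.remove(id); // drop stale entries lazily
            return false;
        }
        return true;
    }
}
```

A consumer would call `isOwn` on each incoming message ID and skip the message when it returns true. Entries are only removed when they are looked up, which is enough for a sketch but would need periodic cleanup in a long-running process.
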
Channels can also use end-to-end encryption.

Messenger messenger = new Messenger();

// Subscribe to channel
MessageChannel channel = messenger.subscribe("myChannel1").consume((channel, lines) -> {
    // do something
});

// Your key
SecretKey key = ...;

channel.encryptor(Encryptor.of(key));
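
For end-to-end encryption to work, every application on the channel needs the same key. The sketch below shows one way to create and share such a key with the standard Java cryptography API. It assumes AES, which this README does not confirm as the algorithm `Encryptor` expects, and `SharedKeySketch`, `generate` and `load` are hypothetical names.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper; AES is an assumption, check which algorithms Encryptor accepts.
final class SharedKeySketch {

    // Generate a random 128-bit AES key and return it as Base64 text, e.g. for a config file.
    static String generate() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            return Base64.getEncoder().encodeToString(generator.generateKey().getEncoded());
        } catch (NoSuchAlgorithmException e) {
            // AES key generation is a required algorithm on Java platforms, so this is unexpected.
            throw new IllegalStateException(e);
        }
    }

    // Rebuild the same SecretKey from the Base64 text on each application.
    static SecretKey load(String base64) {
        return new SecretKeySpec(Base64.getDecoder().decode(base64), "AES");
    }
}
```

The key would be generated once, stored somewhere private, and loaded with `load` in each application before being passed to the channel's encryptor.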