Skip to content

Commit

Permalink
Fix broker usage
Browse files Browse the repository at this point in the history
  • Loading branch information
Rubenicos committed Nov 11, 2024
1 parent fa2a01e commit 777c0f2
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ protected void onClose() {
this.bridge.unsubscribe();
} catch (Throwable ignored) { }
this.pool.destroy();
if (this.aliveTask != null) {
getExecutor().cancel(this.aliveTask);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected void onStart() {
} finally {
if (connection != null && this.source.isClosable()) {
try {
this.source.close();
connection.close();
} catch (SQLException e) {
getLogger().log(2, "Cannot close sql connection", e);
}
Expand Down Expand Up @@ -126,7 +126,7 @@ protected void onSend(@NotNull String channel, byte[] data) throws IOException {
} finally {
if (connection != null && this.source.isClosable()) {
try {
this.source.close();
connection.close();
} catch (SQLException e) {
getLogger().log(2, "Cannot close sql connection", e);
}
Expand Down Expand Up @@ -190,7 +190,7 @@ public void getMessages() {
} finally {
if (connection != null && this.source.isClosable()) {
try {
this.source.close();
connection.close();
} catch (SQLException e) {
getLogger().log(2, "Cannot close sql connection", e);
}
Expand Down Expand Up @@ -219,7 +219,7 @@ public void cleanMessages() {
} finally {
if (connection != null && this.source.isClosable()) {
try {
this.source.close();
connection.close();
} catch (SQLException e) {
getLogger().log(2, "Cannot close sql connection", e);
}
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/com/saicone/delivery4j/AbstractMessenger.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public abstract class AbstractMessenger {

private Executor executor = CompletableFuture.completedFuture(null).defaultExecutor();
private Broker<?> broker;
private Broker broker;
private final Map<String, MessageChannel> channels = new HashMap<>();

/**
Expand All @@ -43,7 +43,7 @@ public Executor getExecutor() {
* @return a delivery client or null.
*/
@Nullable
public Broker<?> getBroker() {
public Broker getBroker() {
return broker;
}

Expand All @@ -56,7 +56,7 @@ public void setExecutor(@NotNull Executor executor) {
this.executor = executor;
}

public void setBroker(@Nullable Broker<?> broker) {
public void setBroker(@Nullable Broker broker) {
this.broker = broker;
}

Expand All @@ -66,7 +66,7 @@ public void setBroker(@Nullable Broker<?> broker) {
* @return an usable delivery client.
*/
@NotNull
protected Broker<?> loadBroker() {
protected Broker loadBroker() {
if (getBroker() != null) {
return getBroker();
}
Expand All @@ -86,22 +86,22 @@ public void start() {
* @param broker the delivery client to use.
*/
@SuppressWarnings("unchecked")
public void start(@NotNull Broker<?> broker) {
public void start(@NotNull Broker broker) {
close();

if (this instanceof Executor) {
this.executor = (Executor) this;
}

broker.getSubscribedChannels().addAll(getChannels().keySet());
broker.consumer(this::accept);
broker.setConsumer(this::accept);
if (this instanceof ByteCodec) {
try {
broker.codec((ByteCodec<String>) this);
broker.setCodec((ByteCodec<String>) this);
} catch (Throwable ignored) { }
}
if (this instanceof DelayedExecutor) {
broker.executor((DelayedExecutor<?>) this);
broker.setExecutor((DelayedExecutor<?>) this);
}

this.broker = broker;
Expand Down
7 changes: 1 addition & 6 deletions src/test/java/com/saicone/delivery4j/broker/TestBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@

import java.io.IOException;

public class TestBroker extends Broker<TestBroker> {

@Override
protected @NotNull TestBroker get() {
return this;
}
public class TestBroker extends Broker {

@Override
protected void onStart() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public TestMessenger() {
}

@Override
protected @NotNull Broker<?> loadBroker() {
protected @NotNull Broker loadBroker() {
return new TestBroker();
}

Expand Down

0 comments on commit 777c0f2

Please sign in to comment.