Skip to content

Commit

Permalink
Adding Derby for dev purposes plus formmating
Browse files Browse the repository at this point in the history
  • Loading branch information
enriquemolinari committed Dec 18, 2024
1 parent aaff654 commit 5f40713
Show file tree
Hide file tree
Showing 26 changed files with 1,039 additions and 984 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,18 @@ public class UserController {

## Requirements

JQueue currently supports PostgreSQL 9.5+ and MySQL 8.0+. To work properly, it uses the `select for update skip locked` which is a feature that some relational databases have incorporated few years ago. This feature eliminates any type of contention that might occure when queues are implemented using SQL.
JQueue currently supports PostgreSQL 9.5+ and MySQL 8.0+. To work properly, it uses the `select for update skip locked` which is a feature that some relational databases have incorporated few years ago. This feature eliminates any type of contention that might occur when queues are implemented using SQL.

JQueue requires the following table in your data store:

```sql
CREATE TABLE ar_cpfw_jqueue
(
id int NOT NULL auto_increment, --MySQL
-- id serial, --PostgreSQL
-- id serial, --PostgreSQL
-- id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), --Derby
channel varchar(100) NOT NULL,
data text NOT NULL,
data text NOT NULL, --Derby does not have text datatype, use CLOB
attempt int,
delay int,
pushed_at timestamp,
Expand Down
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.16.1.1</version>
</dependency>

<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbytools</artifactId>
<version>10.16.1.1</version>
</dependency>

<dependency>
<groupId>com.jcabi</groupId>
<artifactId>jcabi-jdbc</artifactId>
Expand Down
15 changes: 7 additions & 8 deletions src/main/java/ar/cpfw/jqueue/JQueueException.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package ar.cpfw.jqueue;

public class JQueueException extends RuntimeException {
private static final long serialVersionUID = 1L;

private static final long serialVersionUID = 1L;
public JQueueException(final String msg) {
super(msg);
}

public JQueueException(final String msg) {
super(msg);
}

public JQueueException(final Exception exception, final String msg) {
super(msg, exception);
}
public JQueueException(final Exception exception, final String msg) {
super(msg, exception);
}
}
30 changes: 15 additions & 15 deletions src/main/java/ar/cpfw/jqueue/push/JTxQueue.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
package ar.cpfw.jqueue.push;

import java.sql.Connection;
import javax.sql.DataSource;
import java.sql.Connection;

public interface JTxQueue {

void push(String data);
static JTxQueue queue(DataSource dataSource, String tableName) {
return new JdbcJQueue(dataSource, tableName);
}

JTxQueue channel(String channelName);
static JTxQueue queue(Connection conn, String tableName) {
return new JdbcJQueue(conn, tableName);
}

static JTxQueue queue(DataSource dataSource, String tableName) {
return new JdbcJQueue(dataSource, tableName);
}
static JTxQueue queue(DataSource dataSource) {
return new JdbcJQueue(dataSource, null);
}

static JTxQueue queue(Connection conn, String tableName) {
return new JdbcJQueue(conn, tableName);
}
static JTxQueue queue(Connection conn) {
return new JdbcJQueue(conn, null);
}

static JTxQueue queue(DataSource dataSource) {
return new JdbcJQueue(dataSource, null);
}
void push(String data);

static JTxQueue queue(Connection conn) {
return new JdbcJQueue(conn, null);
}
JTxQueue channel(String channelName);

}
103 changes: 52 additions & 51 deletions src/main/java/ar/cpfw/jqueue/push/JdbcJQueue.java
Original file line number Diff line number Diff line change
@@ -1,74 +1,75 @@
package ar.cpfw.jqueue.push;

import ar.cpfw.jqueue.JQueueException;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Objects;
import javax.sql.DataSource;
import ar.cpfw.jqueue.JQueueException;

class JdbcJQueue implements JTxQueue {
final class JdbcJQueue implements JTxQueue {

private static final int PUSHEDAT_COLUMN = 3;
private static final int DATA_COLUMN = 2;
private static final int CHANNEL_COLUMN = 1;
private static final String DS_IS_NECESARY =
"An instance of javax.sql.DataSource is necesary";
private Connection connection;
private String channel;
private static final String DEFAULT_CHANNEL = "default";
private static final String QUEUE_TABLE_NAME = "ar_cpfw_jqueue";
private final String databaseTableName;
private static final int PUSHEDAT_COLUMN = 3;
private static final int DATA_COLUMN = 2;
private static final int CHANNEL_COLUMN = 1;
private static final String DS_IS_NECESARY =
"An instance of javax.sql.DataSource is necesary";
private static final String DEFAULT_CHANNEL = "default";
private static final String QUEUE_TABLE_NAME = "ar_cpfw_jqueue";
private final Connection connection;
private final String databaseTableName;
private String channel;

public JdbcJQueue(final DataSource dataSource, final String tableName) {
Objects.requireNonNull(dataSource, DS_IS_NECESARY);
this.databaseTableName = tableName;
try {
this.connection = dataSource.getConnection();
} catch (SQLException e) {
throw new JQueueException(e,
"java.sql.Connection could not be obtained from the dataSource");
public JdbcJQueue(final DataSource dataSource, final String tableName) {
Objects.requireNonNull(dataSource, DS_IS_NECESARY);
this.databaseTableName = tableName;
try {
this.connection = dataSource.getConnection();
} catch (SQLException e) {
throw new JQueueException(e,
"java.sql.Connection could not be obtained from the dataSource");
}
}
}

public JdbcJQueue(final Connection conn, final String tableName) {
Objects.requireNonNull(conn, DS_IS_NECESARY);
public JdbcJQueue(final Connection conn, final String tableName) {
Objects.requireNonNull(conn, DS_IS_NECESARY);

this.connection = conn;
this.databaseTableName = tableName;
}
this.connection = conn;
this.databaseTableName = tableName;
}

@Override
public void push(final String data) {
Objects.requireNonNull(data, "data must not be null");
@Override
public void push(final String data) {
Objects.requireNonNull(data, "data must not be null");

final var channelName =
this.channel != null ? this.channel : DEFAULT_CHANNEL;
final var table = this.databaseTableName != null ? this.databaseTableName
: QUEUE_TABLE_NAME;
final var channelName =
this.channel != null ? this.channel : DEFAULT_CHANNEL;
final var table = this.databaseTableName != null ? this.databaseTableName
: QUEUE_TABLE_NAME;

try {
final PreparedStatement st =
this.connection.prepareStatement("insert into " + table
+ " (channel, data, attempt, delay, pushed_at) "
+ "values (?, ?, null, 0, ?)");
try {
final PreparedStatement st =
this.connection.prepareStatement("insert into " + table
+ " (channel, data, attempt, delay, pushed_at) "
+ "values (?, ?, null, 0, ?)");

st.setString(CHANNEL_COLUMN, channelName);
st.setString(DATA_COLUMN, data);
st.setTimestamp(PUSHEDAT_COLUMN, Timestamp.valueOf(LocalDateTime.now()));
st.executeUpdate();
st.setString(CHANNEL_COLUMN, channelName);
st.setString(DATA_COLUMN, data);
st.setTimestamp(PUSHEDAT_COLUMN, Timestamp.valueOf(LocalDateTime.now()));
st.executeUpdate();

} catch (SQLException e) {
throw new JQueueException(e, "push cannot be done");
} catch (SQLException e) {
throw new JQueueException(e, "push cannot be done");
}
}
}

@Override
public JTxQueue channel(final String channelName) {
this.channel = channelName;
return this;
}
@Override
public JTxQueue channel(final String channelName) {
this.channel = channelName;
return this;
}

}
25 changes: 25 additions & 0 deletions src/main/java/ar/cpfw/jqueue/runner/DerbyDbQueryBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package ar.cpfw.jqueue.runner;

class DerbyDbQueryBuilder extends StandardQueryBuilder {

public DerbyDbQueryBuilder(final String tableName) {
super(tableName);
}

@Override
protected String calculateDate() {
return "{fn TIMESTAMPADD(SQL_TSI_MINUTE, -delay, ?)}";
}

@Override
protected String limitOne() {
return "FETCH FIRST 1 ROWS ONLY";
}

@Override
protected String lock() {
//as of version 10.16.1.1
// Derby does not support for update with an order by
return "";
}
}
30 changes: 15 additions & 15 deletions src/main/java/ar/cpfw/jqueue/runner/HsqlDbQueryBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@

class HsqlDbQueryBuilder extends StandardQueryBuilder {

public HsqlDbQueryBuilder(final String tableName) {
super(tableName);
}
public HsqlDbQueryBuilder(final String tableName) {
super(tableName);
}

@Override
protected String calculateDate() {
return "(CAST (? as TIMESTAMP) - (INTERVAL '1' MINUTE * delay))";
}
@Override
protected String calculateDate() {
return "(CAST (? as TIMESTAMP) - (INTERVAL '1' MINUTE * delay))";
}

@Override
protected String limitOne() {
return "limit 1";
}
@Override
protected String limitOne() {
return "limit 1";
}

@Override
protected String lock() {
return "for update";
}
@Override
protected String lock() {
return "for update";
}
}
16 changes: 8 additions & 8 deletions src/main/java/ar/cpfw/jqueue/runner/JQueueRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
import javax.sql.DataSource;

public interface JQueueRunner {
void executeAll(Job job);
static JQueueRunner runner(DataSource dataSource, String tableName) {
return new JdbcJQueueRunner(dataSource, tableName);
}

JQueueRunner channel(String channelName);
static JQueueRunner runner(DataSource dataSource) {
return new JdbcJQueueRunner(dataSource, null);
}

static JQueueRunner runner(DataSource dataSource, String tableName) {
return new JdbcJQueueRunner(dataSource, tableName);
}
void executeAll(Job job);

static JQueueRunner runner(DataSource dataSource) {
return new JdbcJQueueRunner(dataSource, null);
}
JQueueRunner channel(String channelName);
}
Loading

0 comments on commit 5f40713

Please sign in to comment.