Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support Mariadb events, such as GTID and annotate rows event(sql) #45

Merged
merged 3 commits into from
Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ kick off from a specific filename or position, use `client.setBinlogFilename(fil
> `client.connect()` is blocking (meaning that client will listen for events in the current thread).
`client.connect(timeout)`, on the other hand, spawns a separate thread.

> Note difference between MariaDB and MySQL
```
BinaryLogClient client = new MariadbBinaryLogClient("hostname", 3306, "username", "password");
// ... as same as BinaryLogClient
```
> `client.setGtidSet(gtid)` meaning that client kick off from a specific gtid, MariaDB also support.
> `client.setUseSendAnnotateRowsEvent(true)` meaning that client will send annotate rows events(describe the query which caused the row event), and 'false' by default

#### Controlling event deserialization

> You might need it for several reasons:
Expand Down
92 changes: 62 additions & 30 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData;
import com.github.shyiko.mysql.binlog.event.MariadbGtidListEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
Expand All @@ -46,9 +49,6 @@
import com.github.shyiko.mysql.binlog.network.protocol.Packet;
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateNativePasswordCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSHA2Command;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateSecurityPasswordCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand;
Expand Down Expand Up @@ -135,8 +135,8 @@ public X509Certificate[] getAcceptedIssuers() {
private volatile long connectionId;
private SSLMode sslMode = SSLMode.DISABLED;

private GtidSet gtidSet;
private final Object gtidSetAccessLock = new Object();
protected GtidSet gtidSet;
protected final Object gtidSetAccessLock = new Object();
private boolean gtidSetFallbackToPurged;
private boolean useBinlogFilenamePositionInGtidMode;
private String gtid;
Expand All @@ -150,7 +150,7 @@ public X509Certificate[] getAcceptedIssuers() {
private SocketFactory socketFactory;
private SSLSocketFactory sslSocketFactory;

private volatile PacketChannel channel;
protected volatile PacketChannel channel;
private volatile boolean connected;
private volatile long masterServerId = -1;

Expand Down Expand Up @@ -335,10 +335,14 @@ public void setGtidSet(String gtidSet) {
this.binlogFilename = "";
}
synchronized (gtidSetAccessLock) {
this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null;
this.gtidSet = gtidSet != null ? buildGtidSet(gtidSet) : null;
}
}

protected GtidSet buildGtidSet(String gtidSet) {
return new GtidSet(gtidSet);
}

/**
* @see #setGtidSetFallbackToPurged(boolean)
* @return whether gtid_purged is used as a fallback
Expand Down Expand Up @@ -539,11 +543,7 @@ public void connect() throws IOException, IllegalStateException {

connectionId = greetingPacket.getThreadId();
if ("".equals(binlogFilename)) {
synchronized (gtidSetAccessLock) {
if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
gtidSet = new GtidSet(fetchGtidPurged());
}
}
setupGtidSet();
}
if (binlogFilename == null) {
fetchBinlogFilenameAndPosition();
Expand Down Expand Up @@ -592,8 +592,7 @@ public void connect() throws IOException, IllegalStateException {
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
ensureGtidEventDataDeserializer();
}
}
listenForEventPackets();
Expand Down Expand Up @@ -666,7 +665,7 @@ public Object call() throws Exception {
};
}

private void checkError(byte[] packet) throws IOException {
protected void checkError(byte[] packet) throws IOException {
if (packet[0] == (byte) 0xFF /* error */) {
byte[] bytes = Arrays.copyOfRange(packet, 1, packet.length);
ErrorPacket errorPacket = new ErrorPacket(bytes);
Expand Down Expand Up @@ -710,7 +709,6 @@ private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOExceptio
return false;
}


private void enableHeartbeat() throws IOException {
channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000));
byte[] statementResult = channel.read();
Expand All @@ -725,7 +723,7 @@ private void setMasterServerId() throws IOException {
}
}

private void requestBinaryLogStream() throws IOException {
protected void requestBinaryLogStream() throws IOException {
long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
Command dumpBinaryLogCommand;
synchronized (gtidSetAccessLock) {
Expand All @@ -741,7 +739,7 @@ private void requestBinaryLogStream() throws IOException {
channel.write(dumpBinaryLogCommand);
}

private void ensureEventDataDeserializer(EventType eventType,
protected void ensureEventDataDeserializer(EventType eventType,
Class<? extends EventDataDeserializer> eventDataDeserializerClass) {
EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType);
if (eventDataDeserializer.getClass() != eventDataDeserializerClass &&
Expand All @@ -758,6 +756,10 @@ private void ensureEventDataDeserializer(EventType eventType,
}
}

protected void ensureGtidEventDataDeserializer() {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
}

private void spawnKeepAliveThread() {
final ExecutorService threadExecutor =
Expand Down Expand Up @@ -902,6 +904,14 @@ private String fetchGtidPurged() throws IOException {
return "";
}

protected void setupGtidSet() throws IOException{
synchronized (gtidSetAccessLock) {
if (gtidSet != null && "".equals(gtidSet.toString()) && gtidSetFallbackToPurged) {
gtidSet = new GtidSet(fetchGtidPurged());
}
}
}

private void fetchBinlogFilenameAndPosition() throws IOException {
ResultSetRowPacket[] resultSet;
channel.write(new QueryCommand("show master status"));
Expand Down Expand Up @@ -1003,7 +1013,7 @@ private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int pac
return result;
}

private void updateClientBinlogFilenameAndPosition(Event event) {
protected void updateClientBinlogFilenameAndPosition(Event event) {
EventHeader eventHeader = event.getHeader();
EventType eventType = eventHeader.getEventType();
if (eventType == EventType.ROTATE) {
Expand All @@ -1022,7 +1032,7 @@ private void updateClientBinlogFilenameAndPosition(Event event) {
}
}

private void updateGtidSet(Event event) {
protected void updateGtidSet(Event event) {
synchronized (gtidSetAccessLock) {
if (gtidSet == null) {
return;
Expand All @@ -1034,6 +1044,15 @@ private void updateGtidSet(Event event) {
GtidEventData gtidEventData = (GtidEventData) EventDataWrapper.internal(event.getData());
gtid = gtidEventData.getGtid();
break;
case MARIADB_GTID:
MariadbGtidEventData mariadbGtidEventData = (MariadbGtidEventData) EventDataWrapper.internal(event.getData());
mariadbGtidEventData.setServerId(eventHeader.getServerId());
gtid = mariadbGtidEventData.toString();
break;
case MARIADB_GTID_LIST:
MariadbGtidListEventData mariadbGtidListEventData = (MariadbGtidListEventData) EventDataWrapper.internal(event.getData());
gtid = mariadbGtidListEventData.getMariaGTIDSet().toString();
break;
case XID:
commitGtid();
tx = false;
Expand All @@ -1044,21 +1063,34 @@ private void updateGtidSet(Event event) {
if (sql == null) {
break;
}
if ("BEGIN".equals(sql)) {
tx = true;
} else
if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) {
commitGtid();
tx = false;
} else
if (!tx) {
// auto-commit query, likely DDL
commitGtid();
commitGtid(sql);
break;
case ANNOTATE_ROWS:
AnnotateRowsEventData annotateRowsEventData = (AnnotateRowsEventData) EventDeserializer.EventDataWrapper.internal(event.getData());
sql = annotateRowsEventData.getRowsQuery();
if (sql == null) {
break;
}
commitGtid(sql);
break;
default:
}
}

protected void commitGtid(String sql) {
if ("BEGIN".equals(sql)) {
tx = true;
} else
if ("COMMIT".equals(sql) || "ROLLBACK".equals(sql)) {
commitGtid();
tx = false;
} else
if (!tx) {
// auto-commit query, likely DDL
commitGtid();
}
}

private void commitGtid() {
if (gtid != null) {
synchronized (gtidSetAccessLock) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.deserialization.AnnotateRowsEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidListEventDataDeserializer;
import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;

import java.io.IOException;

/**
* Mariadb replication stream client.
*
* @author <a href="mailto:[email protected]">Winger</a>
*/
public class MariadbBinaryLogClient extends BinaryLogClient {

private boolean useSendAnnotateRowsEvent;

public MariadbBinaryLogClient(String username, String password) {
super(username, password);
}

public MariadbBinaryLogClient(String schema, String username, String password) {
super(schema, username, password);
}

public MariadbBinaryLogClient(String hostname, int port, String username, String password) {
super(hostname, port, username, password);
}

public MariadbBinaryLogClient(String hostname, int port, String schema, String username, String password) {
super(hostname, port, schema, username, password);
}

@Override
protected GtidSet buildGtidSet(String gtidSet) {
return new MariadbGtidSet(gtidSet);
}

@Override
protected void setupGtidSet() throws IOException {
//Mariadb ignore
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does maria-db not support the gtid_purged variable?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

@Override
protected void requestBinaryLogStream() throws IOException {
long serverId = isBlocking() ? this.getServerId() : 0; // http://bugs.mysql.com/bug.php?id=71178
Command dumpBinaryLogCommand;
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
channel.write(new QueryCommand("SET @mariadb_slave_capability=4"));
checkError(channel.read());
channel.write(new QueryCommand("SET @slave_connect_state = '" + gtidSet.toString() + "'"));
checkError(channel.read());
channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0"));
checkError(channel.read());
channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0"));
checkError(channel.read());
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, "", 0L, isUseSendAnnotateRowsEvent());

} else {
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, getBinlogFilename(), getBinlogPosition());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
channel.write(dumpBinaryLogCommand);
}

@Override
protected void ensureGtidEventDataDeserializer() {
ensureEventDataDeserializer(EventType.ANNOTATE_ROWS, AnnotateRowsEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.MARIADB_GTID, MariadbGtidEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.MARIADB_GTID_LIST, MariadbGtidListEventDataDeserializer.class);
}

public boolean isUseSendAnnotateRowsEvent() {
return useSendAnnotateRowsEvent;
}

public void setUseSendAnnotateRowsEvent(boolean useSendAnnotateRowsEvent) {
this.useSendAnnotateRowsEvent = useSendAnnotateRowsEvent;
}
}
Loading