Skip to content

Commit

Permalink
增加输出数据的集合或者队列的多种名称规则;
Browse files Browse the repository at this point in the history
  • Loading branch information
sasou2008 committed Jul 25, 2018
1 parent fb4abed commit 4180387
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 6 deletions.
3 changes: 2 additions & 1 deletion bin/SysConfig.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ es.target_port=
#httpmq
ztbiao.target_type=httpmq
ztbiao.target_ip=10.5.5.15
ztbiao.target_port=1218
ztbiao.target_port=1218
ztbiao.target_deep=2
Binary file modified bin/syncClient.jar
Binary file not shown.
4 changes: 4 additions & 0 deletions src/src/com/sync/common/GetProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public GetProperties() {
if (!"".equals(tmp)) {
target_tmp.setPort(Integer.parseInt(tmp));
}
tmp = String.valueOf(p.get(canal.destination[i] + ".target_deep"));
if (!"".equals(tmp)) {
target_tmp.setDeep(Integer.parseInt(tmp));
}
target.put(canal.destination[i], target_tmp);
}
}
Expand Down
17 changes: 17 additions & 0 deletions src/src/com/sync/common/TargetData.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public final class TargetData {
public String type = "";
public String ip = "";
public int port = 0;
public int deep = 1;

/**
* @return the type
Expand Down Expand Up @@ -55,4 +56,20 @@ public int getPort() {
public void setPort(int port) {
this.port = port;
}

/**
* @return the deep
*/
public int getDeep() {
return deep;
}

/**
* @param deep
* the deep to set
*/
public void setDeep(int deep) {
this.deep = deep;
}

}
28 changes: 28 additions & 0 deletions src/src/com/sync/common/Tool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.sync.common;

public class Tool {

public static String makeTargetName(String canal_destination, String db, String table) {
int type = GetProperties.target.get(canal_destination).deep;
String ret = null;
switch(type) {
case 1:
ret = "sync_" + canal_destination + "_" + db + "_" + table;
break;
case 2:
ret = "sync_" + canal_destination + "_" + db;
break;
case 3:
ret = "sync_" + canal_destination;
break;
case 4:
ret = "sync_" + "_" + db + "_" + table;
break;
default:
ret = "sync_" + canal_destination + "_" + db + "_" + table;
break;
}
return ret;
}

}
3 changes: 2 additions & 1 deletion src/src/com/sync/process/ElasticSearch.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.sync.common.EsApi;
import com.sync.common.GetProperties;
import com.sync.common.Tool;
import com.sync.common.WriteLog;
import com.alibaba.fastjson.JSON;

Expand Down Expand Up @@ -117,7 +118,7 @@ private boolean syncEntry(List<Entry> entrys) {
head.put("type", eventType);
data.put("head", head);

topic = "sync_" + entry.getHeader().getSchemaName() + "_" + entry.getHeader().getTableName();
topic = Tool.makeTargetName(canal_destination, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
no = (int) entry.getHeader().getLogfileOffset();
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
Expand Down
5 changes: 3 additions & 2 deletions src/src/com/sync/process/Httpmq.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.sync.common.GetProperties;
import com.sync.common.HttpmqApi;
import com.sync.common.RedisApi;
import com.sync.common.Tool;
import com.sync.common.WriteLog;
import com.alibaba.fastjson.JSON;

Expand Down Expand Up @@ -110,7 +111,7 @@ private boolean syncEntry(List<Entry> entrys) {
head.put("table", entry.getHeader().getTableName());
head.put("type", eventType);
data.put("head", head);
topic = "sync_" + canal_destination + "_" + entry.getHeader().getSchemaName() + "_" + entry.getHeader().getTableName();
topic = Tool.makeTargetName(canal_destination, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
no = (int) entry.getHeader().getLogfileOffset();
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
Expand Down Expand Up @@ -139,7 +140,7 @@ private boolean syncEntry(List<Entry> entrys) {
}
return ret;
}

private Map<String, Object> makeColumn(List<Column> columns) {
Map<String, Object> one = new HashMap<String, Object>();
for (Column column : columns) {
Expand Down
3 changes: 2 additions & 1 deletion src/src/com/sync/process/Kafka.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.sync.common.GetProperties;
import com.sync.common.Tool;
import com.sync.common.WriteLog;
import com.alibaba.fastjson.JSON;

Expand Down Expand Up @@ -126,7 +127,7 @@ private boolean syncEntry(List<Entry> entrys) {
head.put("table", entry.getHeader().getTableName());
head.put("type", eventType);
data.put("head", head);
topic = "sync_" + entry.getHeader().getSchemaName() + "_" + entry.getHeader().getTableName();
topic = Tool.makeTargetName(canal_destination, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
no = (int) entry.getHeader().getLogfileOffset();
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
Expand Down
3 changes: 2 additions & 1 deletion src/src/com/sync/process/Redis.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.sync.common.GetProperties;
import com.sync.common.RedisApi;
import com.sync.common.Tool;
import com.sync.common.WriteLog;
import com.alibaba.fastjson.JSON;

Expand Down Expand Up @@ -109,7 +110,7 @@ private boolean syncEntry(List<Entry> entrys) {
head.put("table", entry.getHeader().getTableName());
head.put("type", eventType);
data.put("head", head);
topic = "sync_" + entry.getHeader().getSchemaName() + "_" + entry.getHeader().getTableName();
topic = Tool.makeTargetName(canal_destination, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
no = (int) entry.getHeader().getLogfileOffset();
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
Expand Down

0 comments on commit 4180387

Please sign in to comment.