diff --git a/README.md b/README.md index 365cdfd..28ba89d 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ search build Topic规则:数据库的每个表有单独的topic,如数据库admin的user表,对应的kafka主题名为:sync_admin_user   Topic数据字段: - 插入数据: + 插入数据同步格式: { "head": { "binlog_pos": 53036, @@ -82,7 +82,7 @@ Topic数据字段: ] } - 修改数据: + 修改数据同步格式: { "head": { "binlog_pos": 53036, @@ -115,7 +115,7 @@ Topic数据字段: ] } - 删除数据: + 删除数据同步格式: { "head": { "binlog_pos": 53036, @@ -158,4 +158,13 @@ List规则:数据库的每个表有单独的list,如数据库admin的user表 **Elasticsearch** -规则:同步index = sync,如数据库admin的user表,对应的Elasticsearch type名为:admin_user   +规则:数据库的每个表有单独的Elasticsearch index,如数据库admin的user表,对应的es index名为:sync_admin_user, index type 为default; + +Elasticsearch同步数据的head中有id字段; + +Mysql 同步到 Elasticsearch注意事项: + +1、表需要有一个唯一id主键; +2、表时间字段datetime会转为es的时间字段,其他字段对应es的文本类型; +3、主键、时间字段禁止修改,其他字段尽量提前规划好; + diff --git a/bin/SysConfig.properties b/bin/SysConfig.properties index e8c1d7b..4d4f562 100644 --- a/bin/SysConfig.properties +++ b/bin/SysConfig.properties @@ -2,24 +2,19 @@ system.debug=1 #canal -canal.ip=127.0.0.1 +canal.ip=10.5.3.70 canal.port=11111 -canal.destination=sdsw,epos,es +canal.destination=es,redis canal.username= canal.password= canal.filter= #redis -sdsw.target_type=redis -sdsw.target_ip=192.168.27.101 -sdsw.target_port= - -#kafka -epos.target_type=kafka -epos.target_ip=127.0.0.1 -epos.target_port= +redis.target_type=redis +redis.target_ip=10.5.3.67 +redis.target_port= #elasticsearch es.target_type=elasticsearch -es.target_ip=127.0.0.1 +es.target_ip=10.5.3.72 es.target_port= \ No newline at end of file diff --git a/bin/syncClient.jar b/bin/syncClient.jar index f3a2cc9..77d47fc 100644 Binary files a/bin/syncClient.jar and b/bin/syncClient.jar differ diff --git a/src/SysConfig.properties b/src/SysConfig.properties index cb7536e..4d4f562 100644 --- a/src/SysConfig.properties +++ b/src/SysConfig.properties @@ -2,24 +2,19 @@ system.debug=1 #canal -canal.ip=127.0.0.1 +canal.ip=10.5.3.70 canal.port=11111 -canal.destination=sdsw +canal.destination=es,redis canal.username= canal.password= canal.filter= #redis -test.target_type=redis -test.target_ip=127.0.0.1 -test.target_port= - -#kafka -epos.target_type=kafka -epos.target_ip=127.0.0.1 -epos.target_port= +redis.target_type=redis +redis.target_ip=10.5.3.67 +redis.target_port= #elasticsearch -sdsw.target_type=elasticsearch -sdsw.target_ip=10.5.3.66 -sdsw.target_port= \ No newline at end of file +es.target_type=elasticsearch +es.target_ip=10.5.3.72 +es.target_port= \ No newline at end of file diff --git a/src/src/com/sync/common/EsApi.java b/src/src/com/sync/common/EsApi.java index abab97f..256f8dd 100644 --- a/src/src/com/sync/common/EsApi.java +++ b/src/src/com/sync/common/EsApi.java @@ -44,8 +44,6 @@ public boolean sync(String index, String content) throws Exception { Map data = jsonToMap(content); Map head = jsonToMap((String) data.get("head").toString()); String type = (String) head.get("type").toString(); - String db = (String) head.get("db").toString(); - String table = (String) head.get("table").toString(); String id = (String) head.get("id").toString(); String text = ""; switch(type) { @@ -53,7 +51,7 @@ public boolean sync(String index, String content) throws Exception { text = (String) data.get("after").toString(); if (!"".equals(text)) { try { - return insert("sync" + "-" + db + "-"+ table, "default", id, text); + return insert(index, "default", id, text); } catch (Exception e) { throw new Exception("elasticsearch insert fail", e); } @@ -63,7 +61,7 @@ public boolean sync(String index, String content) throws Exception { text = (String) data.get("after").toString(); if (!"".equals(id)) { try { - return update("sync" + "-" + db + "-"+ table, "default", id, text); + return update(index, "default", id, text); } catch (Exception e) { throw new Exception("elasticsearch update fail", e); } @@ -72,7 +70,7 @@ public boolean sync(String index, String content) throws Exception { case "DELETE": if (!"".equals(id)) { try { - return delete("sync" + "-" + db + "-"+ table, "default", id); + return delete(index, "default", id); } catch (Exception e) { } @@ -101,9 +99,6 @@ public static Map jsonToMap(String jsonObj) { * @throws Exception */ public boolean insert(String index, String type, String id, String content) throws Exception { - if (!index("sync-sdsw-sys_log")) { - System.out.println(setMappings("sync-sdsw-sys_log")); - } Map params = Collections.emptyMap(); HttpEntity entity = new NStringEntity(content, ContentType.APPLICATION_JSON); Response response = rs.performRequest("PUT", "/" + index + "/" + type + "/" + id, params, entity); @@ -119,9 +114,6 @@ public boolean insert(String index, String type, String id, String content) thro * @throws Exception */ public boolean update(String index, String type, String id, String content) throws Exception { - if (!index("sync-sdsw-sys_log")) { - System.out.println(setMappings("sync-sdsw-sys_log")); - } Map params = Collections.emptyMap(); HttpEntity entity = new NStringEntity(content, ContentType.APPLICATION_JSON); Response response = rs.performRequest("PUT", "/" + index + "/" + type + "/" + id, params, entity); diff --git a/src/src/com/sync/common/GetProperties.java b/src/src/com/sync/common/GetProperties.java index 54aaf42..c8bf0d0 100644 --- a/src/src/com/sync/common/GetProperties.java +++ b/src/src/com/sync/common/GetProperties.java @@ -7,7 +7,7 @@ import com.sync.common.ReadProperties; /** - * config + * GetProperties * * @author sasou web:http://www.php-gene.com/ * @version 1.0.0 diff --git a/src/src/com/sync/common/RedisApi.java b/src/src/com/sync/common/RedisApi.java index 5d03c64..6053f5f 100644 --- a/src/src/com/sync/common/RedisApi.java +++ b/src/src/com/sync/common/RedisApi.java @@ -9,7 +9,7 @@ import redis.clients.jedis.exceptions.JedisConnectionException; /** - * config + * RedisApi * * @author sasou web:http://www.php-gene.com/ * @version 1.0.0 @@ -22,28 +22,18 @@ public class RedisApi { public RedisApi(String name) { canal_destination = name; JedisPoolConfig config = new JedisPoolConfig(); - // 控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取; - // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。 config.setMaxTotal(1000); - // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。 config.setMaxIdle(50); - // 表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException; config.setMaxWaitMillis(1000 * 10); - - // 在获取连接的时候检查有效性, 默认false config.setTestOnBorrow(false); - // 在空闲时检查有效性, 默认false config.setTestWhileIdle(false); - - // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true config.setBlockWhenExhausted(true); - config.setMinEvictableIdleTimeMillis(300000); pool = new JedisPool(config, GetProperties.target.get(canal_destination).ip, GetProperties.target.get(canal_destination).port, 1000 * 10); } /** - * 返还到连接池 + * return Resource to pool * * @param pool * @param redis @@ -55,7 +45,7 @@ public void returnResource(JedisPool pool, Jedis redis) { } /** - * 获取数据 + * get data * * @param key * @return @@ -74,7 +64,7 @@ public String get(String key) throws Exception { } /** - * 获取Set数据 + * zrange data * * @param key * @return @@ -94,7 +84,7 @@ public Set zrange(String key) throws Exception { } /** - * 获取数据 + * lrange data * * @param key * @return @@ -114,7 +104,7 @@ public List lrange(String key) throws Exception { } /** - * 写String数据 + * set string * * @param key * @return @@ -132,7 +122,7 @@ public void set(String key, String value) throws Exception { } /** - * 写set数据 + * set data * * @param key * @return @@ -150,7 +140,7 @@ public void zadd(String key, String member) throws Exception { } /** - * 写List数据 左添加 + * push list in left * * @param key * @return @@ -167,7 +157,7 @@ public void lpush(String key, String member) throws Exception { } /** - * 写List数据 右添加 + * push list in right * * @param key * @return @@ -184,7 +174,7 @@ public void rpush(String key, String member) throws Exception { } /** - * 写数据 + * exists * * @param key * @return @@ -204,7 +194,7 @@ public boolean exists(String key) throws Exception { } /** - * 写数据 + * del * * @param key * @return @@ -221,7 +211,7 @@ public void del(String key) throws Exception { } /** - * 删除List元素 + * lrem * * @param key * @return @@ -238,7 +228,7 @@ public void lrem(String key, String member) throws Exception { } /** - * 删除set元素 + * zrem * * @param key * @return @@ -255,11 +245,10 @@ public void zrem(String key, String member) throws Exception { } /** - * 设置过期时间 + * expire * * @param key * @param num - * 过期时间 分钟 */ public void expire(String key, int num) throws Exception { Jedis jedis = null; @@ -273,7 +262,7 @@ public void expire(String key, int num) throws Exception { } /** - * 执行+1操作 + * incr 1 * * @param key */ @@ -289,10 +278,9 @@ public void incr(String key) throws Exception { } /** - * 清空 + * clear * * @param num - * 过期时间 分钟 */ public void clear() throws Exception { Jedis jedis = null; diff --git a/src/src/com/sync/common/WriteLog.java b/src/src/com/sync/common/WriteLog.java index e05ae92..6eff916 100644 --- a/src/src/com/sync/common/WriteLog.java +++ b/src/src/com/sync/common/WriteLog.java @@ -1,7 +1,7 @@ package com.sync.common; /** - * TargetData + * WriteLog * * @author sasou web:http://www.php-gene.com/ * @version 1.0.0 @@ -11,10 +11,9 @@ import java.util.Calendar; /** - * 写日志 写logString字符串到./log目录下的文件中 + * write logString * * @param logString - * 日志字符串 * @author tower */ public class WriteLog { @@ -25,7 +24,7 @@ public static void write(String type, String logString) { base = System.getProperty("user.dir"); } - String current = base + "\\logs\\"; + String current = base + "/logs/"; try { String logFilePathName = null; Calendar cd = Calendar.getInstance(); @@ -46,7 +45,7 @@ public static void write(String type, String logString) { FileOutputStream fos = new FileOutputStream(logFilePathName, true); String time = "[" + year + "-" + month + "-" + day + " " + hour + ":" + min + ":" + sec + "] "; - String content = time + logString + "\n\n"; + String content = time + logString + "\r\n"; fos.write(content.getBytes()); fos.flush(); fos.close(); @@ -59,7 +58,7 @@ public static void write(String type, String logString) { } /** - * 整数i小于10则前面补0 + * add 0 * * @param i * @return diff --git a/src/src/com/sync/process/ElasticSearch.java b/src/src/com/sync/process/ElasticSearch.java index 98cdab3..b0e2108 100644 --- a/src/src/com/sync/process/ElasticSearch.java +++ b/src/src/com/sync/process/ElasticSearch.java @@ -135,10 +135,10 @@ private boolean syncEntry(List entrys) { try { ret = es.sync(topic, text); if (GetProperties.system_debug > 0) { - WriteLog.write(canal_destination, thread_name + "data(" + topic + "," + no + ", " + text + ")"); + WriteLog.write(canal_destination + ".access", thread_name + "data(" + topic + "," + no + ", " + text + ")"); } } catch (Exception e) { - WriteLog.write(canal_destination, thread_name + e.getMessage()); + WriteLog.write(canal_destination + ".error", thread_name + e.getMessage()); ret = false; } } diff --git a/src/src/com/sync/process/Kafka.java b/src/src/com/sync/process/Kafka.java index e362b90..ea09180 100644 --- a/src/src/com/sync/process/Kafka.java +++ b/src/src/com/sync/process/Kafka.java @@ -144,10 +144,10 @@ private boolean syncEntry(List entrys) { ret = false; } if (GetProperties.system_debug > 0) { - WriteLog.write(canal_destination, thread_name + "data(" + topic + "," + no + ", " + text + ")"); + WriteLog.write(canal_destination + ".access", thread_name + "data(" + topic + "," + no + ", " + text + ")"); } } catch (InterruptedException | ExecutionException e) { - WriteLog.write(canal_destination, thread_name + "kafka link failure!"); + WriteLog.write(canal_destination + ".error", thread_name + "kafka link failure!"); ret = false; } } diff --git a/src/src/com/sync/process/Redis.java b/src/src/com/sync/process/Redis.java index 0b65eed..e6915fa 100644 --- a/src/src/com/sync/process/Redis.java +++ b/src/src/com/sync/process/Redis.java @@ -124,10 +124,10 @@ private boolean syncEntry(List entrys) { try { RedisPool.rpush(topic, text); if (GetProperties.system_debug > 0) { - WriteLog.write(canal_destination, thread_name + "data(" + topic + "," + no + ", " + text + ")"); + WriteLog.write(canal_destination + ".access", thread_name + "data(" + topic + "," + no + ", " + text + ")"); } } catch (Exception e) { - WriteLog.write(canal_destination, thread_name + "redis link failure!"); + WriteLog.write(canal_destination + ".error", thread_name + "redis link failure!"); ret = false; } }