Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
sasou committed Mar 6, 2018
2 parents 427af6c + bd9782c commit a39fc2c
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 80 deletions.
17 changes: 13 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ search build
Topic规则:数据库的每个表有单独的topic,如数据库admin的user表,对应的kafka主题名为:sync_admin_user  
Topic数据字段:

插入数据
插入数据同步格式
{
"head": {
"binlog_pos": 53036,
Expand All @@ -82,7 +82,7 @@ Topic数据字段:
]
}

修改数据
修改数据同步格式
{
"head": {
"binlog_pos": 53036,
Expand Down Expand Up @@ -115,7 +115,7 @@ Topic数据字段:
]
}

删除数据
删除数据同步格式
{
"head": {
"binlog_pos": 53036,
Expand Down Expand Up @@ -158,4 +158,13 @@ List规则:数据库的每个表有单独的list,如数据库admin的user表

**Elasticsearch**

规则:同步index = sync,如数据库admin的user表,对应的Elasticsearch type名为:admin_user  
规则:数据库的每个表有单独的Elasticsearch index,如数据库admin的user表,对应的es index名为:sync_admin_user, index type 为default;

Elasticsearch同步数据的head中有id字段;

Mysql 同步到 Elasticsearch注意事项:

1、表需要有一个唯一id主键;
2、表时间字段datetime会转为es的时间字段,其他字段对应es的文本类型;
3、主键、时间字段禁止修改,其他字段尽量提前规划好;

17 changes: 6 additions & 11 deletions bin/SysConfig.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@
system.debug=1

#canal
canal.ip=127.0.0.1
canal.ip=10.5.3.70
canal.port=11111
canal.destination=sdsw,epos,es
canal.destination=es,redis
canal.username=
canal.password=
canal.filter=

#redis
sdsw.target_type=redis
sdsw.target_ip=192.168.27.101
sdsw.target_port=

#kafka
epos.target_type=kafka
epos.target_ip=127.0.0.1
epos.target_port=
redis.target_type=redis
redis.target_ip=10.5.3.67
redis.target_port=

#elasticsearch
es.target_type=elasticsearch
es.target_ip=127.0.0.1
es.target_ip=10.5.3.72
es.target_port=
Binary file modified bin/syncClient.jar
Binary file not shown.
21 changes: 8 additions & 13 deletions src/SysConfig.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@
system.debug=1

#canal
canal.ip=127.0.0.1
canal.ip=10.5.3.70
canal.port=11111
canal.destination=sdsw
canal.destination=es,redis
canal.username=
canal.password=
canal.filter=

#redis
test.target_type=redis
test.target_ip=127.0.0.1
test.target_port=

#kafka
epos.target_type=kafka
epos.target_ip=127.0.0.1
epos.target_port=
redis.target_type=redis
redis.target_ip=10.5.3.67
redis.target_port=

#elasticsearch
sdsw.target_type=elasticsearch
sdsw.target_ip=10.5.3.66
sdsw.target_port=
es.target_type=elasticsearch
es.target_ip=10.5.3.72
es.target_port=
14 changes: 3 additions & 11 deletions src/src/com/sync/common/EsApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,14 @@ public boolean sync(String index, String content) throws Exception {
Map<String,Object> data = jsonToMap(content);
Map<String,Object> head = jsonToMap((String) data.get("head").toString());
String type = (String) head.get("type").toString();
String db = (String) head.get("db").toString();
String table = (String) head.get("table").toString();
String id = (String) head.get("id").toString();
String text = "";
switch(type) {
case "INSERT":
text = (String) data.get("after").toString();
if (!"".equals(text)) {
try {
return insert("sync" + "-" + db + "-"+ table, "default", id, text);
return insert(index, "default", id, text);
} catch (Exception e) {
throw new Exception("elasticsearch insert fail", e);
}
Expand All @@ -63,7 +61,7 @@ public boolean sync(String index, String content) throws Exception {
text = (String) data.get("after").toString();
if (!"".equals(id)) {
try {
return update("sync" + "-" + db + "-"+ table, "default", id, text);
return update(index, "default", id, text);
} catch (Exception e) {
throw new Exception("elasticsearch update fail", e);
}
Expand All @@ -72,7 +70,7 @@ public boolean sync(String index, String content) throws Exception {
case "DELETE":
if (!"".equals(id)) {
try {
return delete("sync" + "-" + db + "-"+ table, "default", id);
return delete(index, "default", id);
} catch (Exception e) {

}
Expand Down Expand Up @@ -101,9 +99,6 @@ public static Map<String,Object> jsonToMap(String jsonObj) {
* @throws Exception
*/
public boolean insert(String index, String type, String id, String content) throws Exception {
if (!index("sync-sdsw-sys_log")) {
System.out.println(setMappings("sync-sdsw-sys_log"));
}
Map<String, String> params = Collections.emptyMap();
HttpEntity entity = new NStringEntity(content, ContentType.APPLICATION_JSON);
Response response = rs.performRequest("PUT", "/" + index + "/" + type + "/" + id, params, entity);
Expand All @@ -119,9 +114,6 @@ public boolean insert(String index, String type, String id, String content) thro
* @throws Exception
*/
public boolean update(String index, String type, String id, String content) throws Exception {
if (!index("sync-sdsw-sys_log")) {
System.out.println(setMappings("sync-sdsw-sys_log"));
}
Map<String, String> params = Collections.emptyMap();
HttpEntity entity = new NStringEntity(content, ContentType.APPLICATION_JSON);
Response response = rs.performRequest("PUT", "/" + index + "/" + type + "/" + id, params, entity);
Expand Down
2 changes: 1 addition & 1 deletion src/src/com/sync/common/GetProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.sync.common.ReadProperties;

/**
* config
* GetProperties
*
* @author sasou <[email protected]> web:http://www.php-gene.com/
* @version 1.0.0
Expand Down
44 changes: 16 additions & 28 deletions src/src/com/sync/common/RedisApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import redis.clients.jedis.exceptions.JedisConnectionException;

/**
* config
* RedisApi
*
* @author sasou <[email protected]> web:http://www.php-gene.com/
* @version 1.0.0
Expand All @@ -22,28 +22,18 @@ public class RedisApi {
public RedisApi(String name) {
canal_destination = name;
JedisPoolConfig config = new JedisPoolConfig();
// 控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;
// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
config.setMaxTotal(1000);
// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
config.setMaxIdle(50);
// 表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
config.setMaxWaitMillis(1000 * 10);

// 在获取连接的时候检查有效性, 默认false
config.setTestOnBorrow(false);
// 在空闲时检查有效性, 默认false
config.setTestWhileIdle(false);

// 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
config.setBlockWhenExhausted(true);

config.setMinEvictableIdleTimeMillis(300000);
pool = new JedisPool(config, GetProperties.target.get(canal_destination).ip, GetProperties.target.get(canal_destination).port, 1000 * 10);
}

/**
* 返还到连接池
* return Resource to pool
*
* @param pool
* @param redis
Expand All @@ -55,7 +45,7 @@ public void returnResource(JedisPool pool, Jedis redis) {
}

/**
* 获取数据
* get data
*
* @param key
* @return
Expand All @@ -74,7 +64,7 @@ public String get(String key) throws Exception {
}

/**
* 获取Set数据
* zrange data
*
* @param key
* @return
Expand All @@ -94,7 +84,7 @@ public Set zrange(String key) throws Exception {
}

/**
* 获取数据
* lrange data
*
* @param key
* @return
Expand All @@ -114,7 +104,7 @@ public List lrange(String key) throws Exception {
}

/**
* 写String数据
* set string
*
* @param key
* @return
Expand All @@ -132,7 +122,7 @@ public void set(String key, String value) throws Exception {
}

/**
* 写set数据
* set data
*
* @param key
* @return
Expand All @@ -150,7 +140,7 @@ public void zadd(String key, String member) throws Exception {
}

/**
* 写List数据 左添加
* push list in left
*
* @param key
* @return
Expand All @@ -167,7 +157,7 @@ public void lpush(String key, String member) throws Exception {
}

/**
* 写List数据 右添加
* push list in right
*
* @param key
* @return
Expand All @@ -184,7 +174,7 @@ public void rpush(String key, String member) throws Exception {
}

/**
* 写数据
* exists
*
* @param key
* @return
Expand All @@ -204,7 +194,7 @@ public boolean exists(String key) throws Exception {
}

/**
* 写数据
* del
*
* @param key
* @return
Expand All @@ -221,7 +211,7 @@ public void del(String key) throws Exception {
}

/**
* 删除List元素
* lrem
*
* @param key
* @return
Expand All @@ -238,7 +228,7 @@ public void lrem(String key, String member) throws Exception {
}

/**
* 删除set元素
* zrem
*
* @param key
* @return
Expand All @@ -255,11 +245,10 @@ public void zrem(String key, String member) throws Exception {
}

/**
* 设置过期时间
* expire
*
* @param key
* @param num
* 过期时间 分钟
*/
public void expire(String key, int num) throws Exception {
Jedis jedis = null;
Expand All @@ -273,7 +262,7 @@ public void expire(String key, int num) throws Exception {
}

/**
* 执行+1操作
* incr 1
*
* @param key
*/
Expand All @@ -289,10 +278,9 @@ public void incr(String key) throws Exception {
}

/**
* 清空
* clear
*
* @param num
* 过期时间 分钟
*/
public void clear() throws Exception {
Jedis jedis = null;
Expand Down
11 changes: 5 additions & 6 deletions src/src/com/sync/common/WriteLog.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.sync.common;

/**
* TargetData
* WriteLog
*
* @author sasou <[email protected]> web:http://www.php-gene.com/
* @version 1.0.0
Expand All @@ -11,10 +11,9 @@
import java.util.Calendar;

/**
* 写日志 写logString字符串到./log目录下的文件中
* write logString
*
* @param logString
* 日志字符串
* @author tower
*/
public class WriteLog {
Expand All @@ -25,7 +24,7 @@ public static void write(String type, String logString) {
base = System.getProperty("user.dir");
}

String current = base + "\\logs\\";
String current = base + "/logs/";
try {
String logFilePathName = null;
Calendar cd = Calendar.getInstance();
Expand All @@ -46,7 +45,7 @@ public static void write(String type, String logString) {

FileOutputStream fos = new FileOutputStream(logFilePathName, true);
String time = "[" + year + "-" + month + "-" + day + " " + hour + ":" + min + ":" + sec + "] ";
String content = time + logString + "\n\n";
String content = time + logString + "\r\n";
fos.write(content.getBytes());
fos.flush();
fos.close();
Expand All @@ -59,7 +58,7 @@ public static void write(String type, String logString) {
}

/**
* 整数i小于10则前面补0
* add 0
*
* @param i
* @return
Expand Down
Loading

0 comments on commit a39fc2c

Please sign in to comment.