|
9 | 9 | import org.slf4j.Logger;
|
10 | 10 | import org.slf4j.LoggerFactory;
|
11 | 11 |
|
12 |
| -import java.util.ArrayList; |
13 |
| -import java.util.List; |
14 |
| -import java.util.Map; |
| 12 | +import java.util.*; |
15 | 13 |
|
16 | 14 | import static com.altinity.clickhouse.sink.connector.model.SinkRecordColumns.*;
|
17 | 15 |
|
@@ -176,29 +174,34 @@ public void setAdditionalMetaData(Map<String, Object> convertedValue) {
|
176 | 174 | }
|
177 | 175 | Struct source = (Struct) convertedValue.get(SOURCE);
|
178 | 176 |
|
| 177 | + List<Field> fields = source.schema().fields(); |
| 178 | + HashSet<String> fieldNames = new HashSet<String>(); |
| 179 | + for(Field f: fields) { |
| 180 | + fieldNames.add(f.name()); |
| 181 | + } |
179 | 182 | try {
|
180 |
| - if (source.get(TS_MS) != null && source.get(TS_MS) instanceof Long) { |
| 183 | + if (fieldNames.contains(TS_MS) && source.get(TS_MS) != null && source.get(TS_MS) instanceof Long) { |
181 | 184 | this.setTs_ms((Long) source.get(TS_MS));
|
182 | 185 | }
|
183 |
| - if (source.get(SNAPSHOT) != null && source.get(SNAPSHOT) instanceof String) { |
| 186 | + if (fieldNames.contains(SNAPSHOT) && source.get(SNAPSHOT) != null && source.get(SNAPSHOT) instanceof String) { |
184 | 187 | this.setSnapshot(Boolean.parseBoolean((String) source.get(SNAPSHOT)));
|
185 | 188 | }
|
186 |
| - if (source.get(SERVER_ID) != null && source.get(SERVER_ID) instanceof Long) { |
| 189 | + if (fieldNames.contains(SERVER_ID) && source.get(SERVER_ID) != null && source.get(SERVER_ID) instanceof Long) { |
187 | 190 | this.setServerId((Long) source.get(SERVER_ID));
|
188 | 191 | }
|
189 |
| - if (source.get(BINLOG_FILE) != null && source.get(BINLOG_FILE) instanceof String) { |
| 192 | + if (fieldNames.contains(BINLOG_FILE) && source.get(BINLOG_FILE) != null && source.get(BINLOG_FILE) instanceof String) { |
190 | 193 | this.setFile((String) source.get(BINLOG_FILE));
|
191 | 194 | }
|
192 |
| - if (source.get(BINLOG_POS) != null && source.get(BINLOG_POS) instanceof Long) { |
| 195 | + if (fieldNames.contains(BINLOG_POS) && source.get(BINLOG_POS) != null && source.get(BINLOG_POS) instanceof Long) { |
193 | 196 | this.setPos((Long) source.get(BINLOG_POS));
|
194 | 197 | }
|
195 |
| - if (source.get(ROW) != null && source.get(ROW) instanceof Integer) { |
| 198 | + if (fieldNames.contains(ROW) && source.get(ROW) != null && source.get(ROW) instanceof Integer) { |
196 | 199 | this.setRow((Integer) source.get(ROW));
|
197 | 200 | }
|
198 |
| - if (source.get(SERVER_THREAD) != null && source.get(SERVER_THREAD) instanceof Integer) { |
| 201 | + if (fieldNames.contains(SERVER_THREAD) && source.get(SERVER_THREAD) != null && source.get(SERVER_THREAD) instanceof Integer) { |
199 | 202 | this.setThread((Integer) convertedValue.get(SERVER_THREAD));
|
200 | 203 | }
|
201 |
| - if(source.get(GTID) != null && source.get(GTID) instanceof String) { |
| 204 | + if(fieldNames.contains(GTID) && source.get(GTID) != null && source.get(GTID) instanceof String) { |
202 | 205 | String[] gtidArray = ((String) source.get(GTID)).split(":");
|
203 | 206 | if(gtidArray.length == 2) {
|
204 | 207 | this.setGtid(Integer.parseInt(gtidArray[1]));
|
|
0 commit comments