Skip to content

Commit

Permalink
GH-264: Minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
C0urante committed Sep 10, 2020
1 parent 5901eb0 commit fdbde31
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,7 @@ public void createTable(TableId table, Set<SinkRecord> records) {
} catch (BigQueryException e) {
if (e.getCode() == 409) {
logger.debug("Failed to create {} as it already exists (possibly created by another task)", table(table));
com.google.cloud.bigquery.Schema schema = bigQuery.getTable(table).getDefinition().getSchema();
schemaCache.put(table, schema);
schemaCache.put(table, readTableSchema(table));
}
}
}
Expand All @@ -210,12 +209,10 @@ public void createTable(TableId table, Set<SinkRecord> records) {
* @param records The sink records used to update the schema.
*/
public void updateSchema(TableId table, Set<SinkRecord> records) {
synchronized (tableUpdateLocks.computeIfAbsent(table, t -> new Object())) {
synchronized (lock(tableUpdateLocks, table)) {
TableInfo tableInfo = getTableInfo(table, records);

if (!schemaCache.containsKey(table)) {
logger.debug("Reading schema for {}", table(table));
schemaCache.put(table, bigQuery.getTable(table).getDefinition().getSchema());
schemaCache.put(table, readTableSchema(table));
}

if (!schemaCache.get(table).equals(tableInfo.getDefinition().getSchema())) {
Expand Down Expand Up @@ -453,6 +450,11 @@ private String table(TableId table) {
+ table;
}

private com.google.cloud.bigquery.Schema readTableSchema(TableId table) {
logger.trace("Reading schema for {}", table(table));
return bigQuery.getTable(table).getDefinition().getSchema();
}

private Object lock(ConcurrentMap<TableId, Object> locks, TableId table) {
return locks.computeIfAbsent(table, t -> new Object());
}
Expand Down

0 comments on commit fdbde31

Please sign in to comment.