Skip to content

Commit

Permalink
GH-264: Address code review
Browse files Browse the repository at this point in the history
  • Loading branch information
C0urante committed Sep 10, 2020
1 parent 7171d06 commit c37e8c7
Showing 1 changed file with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,12 @@ public void createOrUpdateTable(TableId table, Set<SinkRecord> records) {
synchronized (lock(tableCreateLocks, table)) {
if (bigQuery.getTable(table) == null) {
logger.debug("{} doesn't exist; creating instead of updating", table(table));
createTable(table, records);
return;
if (createTable(table, records)) {
return;
}
}
}

// Table already existed; attempt to update instead
logger.debug("{} already exists; updating instead of creating", table(table));
updateSchema(table, records);
Expand All @@ -179,13 +180,14 @@ public void createOrUpdateTable(TableId table, Set<SinkRecord> records) {
* Create a new table in BigQuery.
* @param table The BigQuery table to create.
* @param records The sink records used to determine the schema.
* @return whether the table had to be created; if the table already existed, will return false
*/
public void createTable(TableId table, Set<SinkRecord> records) {
public boolean createTable(TableId table, Set<SinkRecord> records) {
synchronized (lock(tableCreateLocks, table)) {
if (schemaCache.containsKey(table)) {
// Table already exists; noop
logger.debug("Skipping create of {} as it should already exist or appear very soon", table(table));
return;
return false;
}
TableInfo tableInfo = getTableInfo(table, records);
logger.info("Attempting to create {} with schema {}",
Expand All @@ -194,11 +196,14 @@ public void createTable(TableId table, Set<SinkRecord> records) {
bigQuery.create(tableInfo);
logger.debug("Successfully created {}", table(table));
schemaCache.put(table, tableInfo.getDefinition().getSchema());
return true;
} catch (BigQueryException e) {
if (e.getCode() == 409) {
logger.debug("Failed to create {} as it already exists (possibly created by another task)", table(table));
schemaCache.put(table, readTableSchema(table));
return false;
}
throw e;
}
}
}
Expand Down

0 comments on commit c37e8c7

Please sign in to comment.