diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java index 15df988e1..78e47f37d 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java @@ -165,11 +165,12 @@ public void createOrUpdateTable(TableId table, Set records) { synchronized (lock(tableCreateLocks, table)) { if (bigQuery.getTable(table) == null) { logger.debug("{} doesn't exist; creating instead of updating", table(table)); - createTable(table, records); - return; + if (createTable(table, records)) { + return; + } } } - + // Table already existed; attempt to update instead logger.debug("{} already exists; updating instead of creating", table(table)); updateSchema(table, records); @@ -179,13 +180,14 @@ public void createOrUpdateTable(TableId table, Set records) { * Create a new table in BigQuery. * @param table The BigQuery table to create. * @param records The sink records used to determine the schema. + * @return whether the table had to be created; if the table already existed, will return false */ - public void createTable(TableId table, Set records) { + public boolean createTable(TableId table, Set records) { synchronized (lock(tableCreateLocks, table)) { if (schemaCache.containsKey(table)) { // Table already exists; noop logger.debug("Skipping create of {} as it should already exist or appear very soon", table(table)); - return; + return false; } TableInfo tableInfo = getTableInfo(table, records); logger.info("Attempting to create {} with schema {}", @@ -194,11 +196,14 @@ public void createTable(TableId table, Set records) { bigQuery.create(tableInfo); logger.debug("Successfully created {}", table(table)); schemaCache.put(table, tableInfo.getDefinition().getSchema()); + return true; } catch (BigQueryException e) { if (e.getCode() == 409) { logger.debug("Failed to create {} as it already exists (possibly created by another task)", table(table)); schemaCache.put(table, readTableSchema(table)); + return false; } + throw e; } } }