From fdbde319c54f61709885b6a81a91a0711ba65250 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 17 Jun 2020 15:02:33 -0700 Subject: [PATCH] GH-264: Minor refactoring --- .../kafka/connect/bigquery/SchemaManager.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java index fb49450aa..15df988e1 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java @@ -197,8 +197,7 @@ public void createTable(TableId table, Set records) { } catch (BigQueryException e) { if (e.getCode() == 409) { logger.debug("Failed to create {} as it already exists (possibly created by another task)", table(table)); - com.google.cloud.bigquery.Schema schema = bigQuery.getTable(table).getDefinition().getSchema(); - schemaCache.put(table, schema); + schemaCache.put(table, readTableSchema(table)); } } } @@ -210,12 +209,10 @@ public void createTable(TableId table, Set records) { * @param records The sink records used to update the schema. */ public void updateSchema(TableId table, Set records) { - synchronized (tableUpdateLocks.computeIfAbsent(table, t -> new Object())) { + synchronized (lock(tableUpdateLocks, table)) { TableInfo tableInfo = getTableInfo(table, records); - if (!schemaCache.containsKey(table)) { - logger.debug("Reading schema for {}", table(table)); - schemaCache.put(table, bigQuery.getTable(table).getDefinition().getSchema()); + schemaCache.put(table, readTableSchema(table)); } if (!schemaCache.get(table).equals(tableInfo.getDefinition().getSchema())) { @@ -453,6 +450,11 @@ private String table(TableId table) { + table; } + private com.google.cloud.bigquery.Schema readTableSchema(TableId table) { + logger.trace("Reading schema for {}", table(table)); + return bigQuery.getTable(table).getDefinition().getSchema(); + } + private Object lock(ConcurrentMap locks, TableId table) { return locks.computeIfAbsent(table, t -> new Object()); }