diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java index 02c0c5895c8..138ac3d9417 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java @@ -17,16 +17,6 @@ package org.apache.gobblin.hive.writer; -import com.codahale.metrics.Timer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Throwables; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Lists; -import com.google.common.io.Closer; -import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -38,20 +28,37 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import javax.annotation.Nullable; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificData; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; + +import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Lists; +import com.google.common.io.Closer; +import com.google.common.util.concurrent.ListenableFuture; + +import javax.annotation.Nullable; import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.specific.SpecificData; + import org.apache.gobblin.configuration.State; import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist; import org.apache.gobblin.hive.HiveRegister; import org.apache.gobblin.hive.HiveRegistrationUnit; import org.apache.gobblin.hive.HiveTable; +import org.apache.gobblin.hive.avro.HiveAvroSerDeManager; import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister; import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils; import org.apache.gobblin.hive.spec.HiveSpec; @@ -68,10 +75,6 @@ import org.apache.gobblin.util.AvroUtils; import org.apache.gobblin.util.ClustersNames; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; - /** * This writer is used to register the hiveSpec into hive metaStore @@ -313,9 +316,15 @@ protected static boolean updateLatestSchemaMapWithExistingSchema(String dbName, return false; } - HiveTable existingTable = hiveRegister.getTable(dbName, tableName).get(); - latestSchemaMap.put(tableKey, - existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName())); + HiveTable table = hiveRegister.getTable(dbName, tableName).get(); + String latestSchema = table.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()); + if (latestSchema == null) { + throw new IllegalStateException(String.format("The %s in the table %s.%s is null. This implies the DB is " + + "misconfigured and was not correctly created through Gobblin, since all Gobblin managed tables should " + + "have %s", HiveAvroSerDeManager.SCHEMA_LITERAL, dbName, tableName, HiveAvroSerDeManager.SCHEMA_LITERAL)); + } + + latestSchemaMap.put(tableKey, latestSchema); return true; } @@ -445,10 +454,14 @@ private void schemaUpdateHelper(GobblinMetadataChangeEvent gmce, HiveSpec spec, return; } //Force to set the schema even there is no schema literal defined in the spec - if (latestSchemaMap.containsKey(tableKey)) { - spec.getTable().getSerDeProps() - .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey)); - HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec); + String latestSchema = latestSchemaMap.get(tableKey); + if (latestSchema != null) { + String tableSchema = spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()); + if (tableSchema == null || !tableSchema.equals(latestSchema)) { + spec.getTable().getSerDeProps() + .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey)); + HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec); + } } } diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java index 57a28f77851..b6c9e15dc11 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java @@ -48,10 +48,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.specific.SpecificData; - import org.apache.commons.lang3.tuple.Pair; -import org.apache.gobblin.hive.writer.MetadataWriterKeys; -import org.apache.gobblin.source.extractor.extract.LongWatermark; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -112,6 +109,7 @@ import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils; import org.apache.gobblin.hive.spec.HiveSpec; import org.apache.gobblin.hive.writer.MetadataWriter; +import org.apache.gobblin.hive.writer.MetadataWriterKeys; import org.apache.gobblin.iceberg.Utils.IcebergUtils; import org.apache.gobblin.metadata.GobblinMetadataChangeEvent; import org.apache.gobblin.metadata.OperationType; @@ -122,6 +120,7 @@ import org.apache.gobblin.metrics.event.GobblinEventBuilder; import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry; import org.apache.gobblin.metrics.kafka.SchemaRegistryException; +import org.apache.gobblin.source.extractor.extract.LongWatermark; import org.apache.gobblin.stream.RecordEnvelope; import org.apache.gobblin.time.TimeIterator; import org.apache.gobblin.util.AvroUtils; @@ -129,6 +128,7 @@ import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.ParallelRunner; import org.apache.gobblin.util.WriterUtils; + import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*; /** @@ -493,13 +493,20 @@ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdenti * @return table with updated schema and partition spec */ private Table addPartitionToIcebergTable(Table table, String fieldName, String type) { + boolean isTableUpdated = false; if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) { table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit(); + isTableUpdated = true; } if(!table.spec().fields().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) { table.updateSpec().addField(fieldName).commit(); + isTableUpdated = true; + } + + if (isTableUpdated) { + table.refresh(); } - table.refresh(); + return table; } diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java index f50bc81aa9a..aa6c73d5295 100644 --- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java +++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java @@ -388,6 +388,12 @@ public void testUpdateLatestSchemaWithExistingSchema() throws IOException { )); Assert.assertTrue(updateLatestSchema.apply(tableNameAllowed)); Mockito.verify(hiveRegister, Mockito.times(2)).getTable(eq(dbName), eq(tableNameAllowed)); + + HiveTable tableThatHasNoSchemaLiteral = Mockito.mock(HiveTable.class); + String nameOfTableThatHasNoSchemaLiteral = "improperlyConfiguredTable"; + Mockito.when(hiveRegister.getTable(eq(dbName), eq(nameOfTableThatHasNoSchemaLiteral))).thenReturn(Optional.of(tableThatHasNoSchemaLiteral)); + Mockito.when(tableThatHasNoSchemaLiteral.getSerDeProps()).thenReturn(new State()); + Assert.assertThrows(IllegalStateException.class, () -> updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral)); } private String writeRecord(File file) throws IOException {