diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 8400dd94b05..d1c30766ffa 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -17,15 +17,6 @@
 
 package org.apache.gobblin.hive.writer;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,20 +28,36 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
-import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import javax.annotation.Nullable;
 
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificData;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
 import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
@@ -67,10 +74,6 @@
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.ClustersNames;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-
 
 /**
  * This writer is used to register the hiveSpec into hive metaStore
@@ -307,9 +310,15 @@ protected static boolean updateLatestSchemaMapWithExistingSchema(String dbName,
       return false;
     }
 
-    HiveTable existingTable = hiveRegister.getTable(dbName, tableName).get();
-    latestSchemaMap.put(tableKey,
-        existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+    HiveTable table = hiveRegister.getTable(dbName, tableName).get();
+    String latestSchema = table.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+    if (latestSchema == null) {
+      throw new IllegalStateException(String.format("The %s in the table %s.%s is null. This implies the DB is "
+          + "misconfigured and was not correctly created through Gobblin, since all Gobblin managed tables should "
+          + "have %s", HiveAvroSerDeManager.SCHEMA_LITERAL, dbName, tableName, HiveAvroSerDeManager.SCHEMA_LITERAL));
+    }
+
+    latestSchemaMap.put(tableKey, latestSchema);
     return true;
   }
 
@@ -439,10 +448,14 @@ private void schemaUpdateHelper(GobblinMetadataChangeEvent gmce, HiveSpec spec,
       return;
     }
     //Force to set the schema even there is no schema literal defined in the spec
-    if (latestSchemaMap.containsKey(tableKey)) {
-      spec.getTable().getSerDeProps()
-          .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
-      HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+    if (latestSchemaMap.get(tableKey) != null) {
+      String latestSchema = latestSchemaMap.get(tableKey);
+      String tableSchema = spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+      if (tableSchema == null || !tableSchema.equals(latestSchema)) {
+        spec.getTable().getSerDeProps()
+            .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
+        HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+      }
     }
   }
 
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index e643b1dbb3a..898ec6e6789 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -46,9 +46,6 @@
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
-
-import org.apache.gobblin.hive.writer.MetadataWriterKeys;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -107,6 +104,7 @@
 import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.hive.writer.MetadataWriterKeys;
 import org.apache.gobblin.iceberg.Utils.IcebergUtils;
 import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
 import org.apache.gobblin.metadata.OperationType;
@@ -117,6 +115,7 @@
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.time.TimeIterator;
 import org.apache.gobblin.util.AvroUtils;
@@ -124,6 +123,7 @@
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.WriterUtils;
+
 import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
 
 /**
@@ -488,13 +488,20 @@ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdenti
    * @return table with updated schema and partition spec
    */
  private Table addPartitionToIcebergTable(Table table, String fieldName, String type) {
+    boolean isTableUpdated = false;
    if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
      table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
+      isTableUpdated = true;
    }
    if(!table.spec().fields().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
      table.updateSpec().addField(fieldName).commit();
+      isTableUpdated = true;
+    }
+
+    if (isTableUpdated) {
+      table.refresh();
    }
-    table.refresh();
+
    return table;
  }
 
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 392c57f9cc8..dd07b5b0587 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -22,9 +22,9 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-
 import java.util.Map;
 import java.util.function.Function;
+
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
@@ -33,7 +33,6 @@
 import org.apache.avro.io.DatumWriter;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +44,7 @@
 import org.apache.iceberg.hive.HiveMetastoreTest;
 import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.thrift.TException;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -63,6 +63,7 @@
 import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.spec.SimpleHiveSpec;
@@ -83,7 +84,6 @@
 import org.apache.gobblin.util.ClustersNames;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.function.CheckedExceptionFunction;
-import org.mockito.Mockito;
 
 import static org.mockito.Matchers.eq;
 
@@ -389,6 +389,12 @@ public void testUpdateLatestSchemaWithExistingSchema() throws IOException {
     ));
     Assert.assertTrue(updateLatestSchema.apply(tableNameAllowed));
     Mockito.verify(hiveRegister, Mockito.times(2)).getTable(eq(dbName), eq(tableNameAllowed));
+
+    HiveTable tableThatHasNoSchemaLiteral = Mockito.mock(HiveTable.class);
+    String nameOfTableThatHasNoSchemaLiteral = "improperlyConfiguredTable";
+    Mockito.when(hiveRegister.getTable(eq(dbName), eq(nameOfTableThatHasNoSchemaLiteral))).thenReturn(Optional.of(tableThatHasNoSchemaLiteral));
+    Mockito.when(tableThatHasNoSchemaLiteral.getSerDeProps()).thenReturn(new State());
+    Assert.assertThrows(IllegalStateException.class, () -> updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral));
   }
 
   private String writeRecord(File file) throws IOException {