[GOBBLIN-1853] Reduce # of Hive calls during schema related updates
homatthew committed Jul 18, 2023
1 parent 0a49cdb commit 2024fa0
Showing 3 changed files with 57 additions and 31 deletions.
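At its core, the change makes the Hive metadata writer remember the schema literal it last saw per table and skip the Hive metastore update when an incoming schema matches it. A minimal, self-contained sketch of that compare-before-write pattern (the class and method names below are illustrative, not Gobblin APIs):

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CompareBeforeWriteSketch {
  private final Map<String, String> latestSchemaMap = new HashMap<>();
  private int hiveCalls = 0;

  // Stand-in for an expensive metastore write, e.g. updating column info.
  private void writeSchemaToMetastore(String tableKey, String schema) {
    hiveCalls++;
    latestSchemaMap.put(tableKey, schema);
  }

  // Only touch the metastore when the cached literal and the incoming one differ.
  public void updateSchemaIfNeeded(String tableKey, String incomingSchema) {
    if (!Objects.equals(latestSchemaMap.get(tableKey), incomingSchema)) {
      writeSchemaToMetastore(tableKey, incomingSchema);
    }
  }

  public static void main(String[] args) {
    CompareBeforeWriteSketch sketch = new CompareBeforeWriteSketch();
    sketch.updateSchemaIfNeeded("db.table", "{\"type\":\"string\"}"); // one Hive call
    sketch.updateSchemaIfNeeded("db.table", "{\"type\":\"string\"}"); // skipped, unchanged
    System.out.println(sketch.hiveCalls); // prints 1
  }
}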
HiveMetadataWriter.java
@@ -17,15 +17,6 @@

package org.apache.gobblin.hive.writer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -37,20 +28,36 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ListenableFuture;

import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
@@ -67,10 +74,6 @@
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;


/**
* This writer is used to register the hiveSpec into hive metaStore
@@ -307,9 +310,15 @@ protected static boolean updateLatestSchemaMapWithExistingSchema(String dbName,
return false;
}

HiveTable existingTable = hiveRegister.getTable(dbName, tableName).get();
latestSchemaMap.put(tableKey,
existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
HiveTable table = hiveRegister.getTable(dbName, tableName).get();
String latestSchema = table.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
if (latestSchema == null) {
throw new IllegalStateException(String.format("The %s in the table %s.%s is null. This implies the DB is "
+ "misconfigured and was not correctly created through Gobblin, since all Gobblin managed tables should "
+ "have %s", HiveAvroSerDeManager.SCHEMA_LITERAL, dbName, tableName, HiveAvroSerDeManager.SCHEMA_LITERAL));
}

latestSchemaMap.put(tableKey, latestSchema);
return true;
}

@@ -439,10 +448,14 @@ private void schemaUpdateHelper(GobblinMetadataChangeEvent gmce, HiveSpec spec,
return;
}
// Force the schema to be set even if there is no schema literal defined in the spec
if (latestSchemaMap.containsKey(tableKey)) {
spec.getTable().getSerDeProps()
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
if (latestSchemaMap.get(tableKey) != null) {
String latestSchema = latestSchemaMap.get(tableKey);
String tableSchema = spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
if (tableSchema == null || !tableSchema.equals(latestSchema)) {
spec.getTable().getSerDeProps()
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
}
}
}

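The first hunk above also adds a fail-fast guard: a Gobblin-managed table whose SerDe properties lack avro.schema.literal is treated as misconfigured instead of being cached with a null schema. A hedged sketch of that contract in isolation, with java.util.Optional standing in for Guava's Optional and requireSchemaLiteral as an illustrative helper rather than a Gobblin API:

import java.util.Optional;

final class SchemaLiteralGuard {
  // Mirror of the IllegalStateException added in updateLatestSchemaMapWithExistingSchema:
  // absence of avro.schema.literal signals a table not created through Gobblin.
  static String requireSchemaLiteral(Optional<String> literal, String dbName, String tableName) {
    return literal.orElseThrow(() -> new IllegalStateException(String.format(
        "avro.schema.literal is missing on table %s.%s; Gobblin-managed tables must define it",
        dbName, tableName)));
  }
}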
IcebergMetadataWriter.java
@@ -46,9 +46,6 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;

import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -107,6 +104,7 @@
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
@@ -117,13 +115,15 @@
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.time.TimeIterator;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;

import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;

/**
@@ -488,13 +488,20 @@ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdenti
* @return table with updated schema and partition spec
*/
private Table addPartitionToIcebergTable(Table table, String fieldName, String type) {
boolean isTableUpdated = false;
if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
isTableUpdated = true;
}
if(!table.spec().fields().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
table.updateSpec().addField(fieldName).commit();
isTableUpdated = true;
}

if (isTableUpdated) {
table.refresh();
}
table.refresh();

return table;
}

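The Iceberg-side change applies the same call-avoidance idea: addPartitionToIcebergTable now tracks whether either commit actually happened and refreshes table metadata only then, sparing a catalog round trip when the partition column already exists. A sketch of the pattern against the Iceberg Table API, assuming an already-loaded org.apache.iceberg.Table:

import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

final class ConditionalRefreshSketch {
  // Commit a schema or partition-spec change only when the field is missing,
  // and refresh only if at least one commit actually happened.
  static Table addPartitionField(Table table, String fieldName, String type) {
    boolean updated = false;
    if (table.schema().columns().stream().noneMatch(c -> c.name().equalsIgnoreCase(fieldName))) {
      table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
      updated = true;
    }
    if (table.spec().fields().stream().noneMatch(f -> f.name().equalsIgnoreCase(fieldName))) {
      table.updateSpec().addField(fieldName).commit();
      updated = true;
    }
    if (updated) {
      table.refresh(); // reload metadata from the catalog only after a real change
    }
    return table;
  }
}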
HiveMetadataWriterTest.java
@@ -22,9 +22,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import java.util.Map;
import java.util.function.Function;

import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
@@ -33,7 +33,6 @@
import org.apache.avro.io.DatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,6 +44,7 @@
import org.apache.iceberg.hive.HiveMetastoreTest;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.thrift.TException;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -63,6 +63,7 @@
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.spec.SimpleHiveSpec;
@@ -83,7 +84,6 @@
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.function.CheckedExceptionFunction;
import org.mockito.Mockito;

import static org.mockito.Matchers.eq;

@@ -389,6 +389,12 @@ public void testUpdateLatestSchemaWithExistingSchema() throws IOException {
));
Assert.assertTrue(updateLatestSchema.apply(tableNameAllowed));
Mockito.verify(hiveRegister, Mockito.times(2)).getTable(eq(dbName), eq(tableNameAllowed));

HiveTable tableThatHasNoSchemaLiteral = Mockito.mock(HiveTable.class);
String nameOfTableThatHasNoSchemaLiteral = "improperlyConfiguredTable";
Mockito.when(hiveRegister.getTable(eq(dbName), eq(nameOfTableThatHasNoSchemaLiteral))).thenReturn(Optional.of(tableThatHasNoSchemaLiteral));
Mockito.when(tableThatHasNoSchemaLiteral.getSerDeProps()).thenReturn(new State());
Assert.assertThrows(IllegalStateException.class, () -> updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral));
}

private String writeRecord(File file) throws IOException {