From 14d3260e5075894ac9f693e6f4abd6b2c453c953 Mon Sep 17 00:00:00 2001 From: psainics Date: Tue, 20 Aug 2024 10:23:37 +0530 Subject: [PATCH] Add validation for logical type (array/repeated) --- .../gcp/bigquery/util/BigQueryUtil.java | 23 +++++++++++---- .../gcp/bigquery/util/BigQueryUtilTest.java | 29 +++++++++++++++++++ 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java index e8eb019e0..7f7c1242d 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java @@ -438,15 +438,28 @@ public static ValidationFailure validateFieldSchemaMatches(Field bqField, Schema if (bqField.getMode() == Field.Mode.REPEATED) { fieldSchema = fieldSchema.getComponentSchema(); type = fieldSchema.getType(); + logicalType = fieldSchema.getLogicalType(); } } + String[] incompatibleFieldErrorMessage = { + String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' in BigQuery table '%s.%s'.", + field.getName(), fieldSchema.getDisplayName(), bqField.getName(), + BQ_TYPE_MAP.get(bqField.getType()), dataset, table) , + String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType())) + }; + if (logicalType != null) { + if (LOGICAL_TYPE_MAP.get(logicalType) != null && !LOGICAL_TYPE_MAP.get(logicalType).contains(bqField.getType())) { + return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]); + } + + // Return once logical types are validated. This is because logical types are represented as primitive types + // internally. + return null; + } + if (TYPE_MAP.get(type) != null && !TYPE_MAP.get(type).contains(bqField.getType())) { - return collector.addFailure( - String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' " + - "in BigQuery table '%s.%s'.", field.getName(), fieldSchema.getDisplayName(), bqField.getName(), - BQ_TYPE_MAP.get(bqField.getType()), dataset, table), - String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType()))); + return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]); } return null; } diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java index a09048e2d..b199caabd 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java @@ -23,6 +23,7 @@ import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.validation.ValidationFailure; import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.BigNumeric; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric; import io.cdap.plugin.gcp.common.GCPUtils; @@ -478,4 +479,32 @@ public void testConvertFieldTypeJsonToString() { Schema result = BigQueryUtil.convertFieldType(field, null, null); Assert.assertEquals(expectedSchema, result); } + + @Test + public void testValidateFieldSchemaMatchesDate() { + MockFailureCollector collector = new MockFailureCollector(); + Field bigQueryField = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE) + .setMode(Field.Mode.REPEATED).build(); + Schema.Field schemaField = Schema.Field.of("testFieldRepeatedDate", + Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.LogicalType.DATE)))); + ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bigQueryField, schemaField, "dataset", + "table", BigQuerySourceConfig.SUPPORTED_TYPES, collector); + Assert.assertNull(failure); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testValidateFieldSchemaNotMatchesDate() { + MockFailureCollector collector = new MockFailureCollector(); + Field bigQueryField = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE) + .setMode(Field.Mode.REPEATED).build(); + Schema.Field schemaField = Schema.Field.of("testFieldRepeatedDate", + Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING)))); + ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bigQueryField, schemaField, "dataset", + "table", BigQuerySourceConfig.SUPPORTED_TYPES, collector); + Assert.assertNotNull(failure); + Assert.assertEquals(1, collector.getValidationFailures().size()); + Assert.assertEquals("Field 'testFieldRepeatedDate' of type 'string' is incompatible with" + + " column 'testFieldRepeatedDate' of type 'date' in BigQuery table 'dataset.table'.", failure.getMessage()); + } }