From 2beb724243a9db91b583ec4479e0c3ae993363f7 Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Sat, 28 May 2022 22:41:51 -0400 Subject: [PATCH 01/10] DRILL-8239: Convert JSON UDF to EVF --- .../expr/fn/impl/conv/JsonConvertFrom.java | 105 ++++++++++++------ .../easy/json/loader/JsonLoaderImpl.java | 5 + 2 files changed, 74 insertions(+), 36 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index bbfc2829147..85ddb23515a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -18,8 +18,6 @@ package org.apache.drill.exec.expr.fn.impl.conv; -import io.netty.buffer.DrillBuf; - import javax.inject.Inject; import org.apache.drill.exec.expr.DrillSimpleFunc; @@ -32,12 +30,12 @@ import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; public class JsonConvertFrom { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonConvertFrom.class); - private JsonConvertFrom() { } @@ -45,24 +43,32 @@ private JsonConvertFrom() { public static class ConvertFromJson implements DrillSimpleFunc { @Param VarBinaryHolder in; - @Inject DrillBuf buffer; - @Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + @Inject + ResultSetLoader loader; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; + + @Inject + OptionManager options; @Output ComplexWriter writer; @Override public void setup() { - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .build(); + jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() + .resultSetLoader(loader) + .standardOptions(options); } @Override public void eval() { try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); + jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer); + org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); + loader.startBatch(); + jsonLoader.readBatch(); + loader.close(); + } catch (Exception e) { throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); } @@ -73,24 +79,33 @@ public void eval() { public static class ConvertFromJsonVarchar implements DrillSimpleFunc { @Param VarCharHolder in; - @Inject DrillBuf buffer; - @Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; + + @Inject + OptionManager options; + + @Inject + ResultSetLoader loader; @Output ComplexWriter writer; @Override public void setup() { - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .build(); + jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() + .resultSetLoader(loader) + .standardOptions(options); } @Override public void eval() { try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); + jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer); + org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); + loader.startBatch(); + jsonLoader.readBatch(); + loader.close(); + } catch (Exception e) { throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); } @@ -101,16 +116,23 @@ public void eval() { public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { @Param NullableVarBinaryHolder in; - @Inject DrillBuf buffer; - @Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; + + @Inject + OptionManager options; + + @Inject + ResultSetLoader loader; @Output ComplexWriter writer; @Override public void setup() { - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .build(); + jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() + .resultSetLoader(loader) + .standardOptions(options); } @Override @@ -124,9 +146,11 @@ public void eval() { } try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); + jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer); + org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); + loader.startBatch(); + jsonLoader.readBatch(); + loader.close(); } catch (Exception e) { throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); } @@ -137,16 +161,23 @@ public void eval() { public static class ConvertFromJsonVarcharNullableInput implements DrillSimpleFunc { @Param NullableVarCharHolder in; - @Inject DrillBuf buffer; - @Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; + + @Inject + OptionManager options; + + @Inject + ResultSetLoader loader; @Output ComplexWriter writer; @Override public void setup() { - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .build(); + jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() + .resultSetLoader(loader) + .standardOptions(options); } @Override @@ -160,9 +191,11 @@ public void eval() { } try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); + jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer); + org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); + loader.startBatch(); + jsonLoader.readBatch(); + loader.close(); } catch (Exception e) { throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java index e1851cc3a2c..cf6886474e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java @@ -197,6 +197,11 @@ public JsonLoaderBuilder fromStream(InputStream... stream) { return this; } + public JsonLoaderBuilder fromStream(int start, int end, DrillBuf buf) { + this.streams = Collections.singletonList(DrillBufInputStream.getStream(start, end, buf)); + return this; + } + public JsonLoaderBuilder fromStream(Iterable streams) { this.streams = streams; return this; From 4349e45607ca1e4c84d2a2e87cf5009e07f19f42 Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Tue, 31 May 2022 07:58:10 -0400 Subject: [PATCH 02/10] WIP --- .../expr/fn/impl/conv/JsonConvertFrom.java | 114 ++++------------ .../store/json/TestJsonConversionUDF.java | 122 ++++++++++++++++++ .../drill/exec/store/json/TestJsonNanInf.java | 100 +++----------- .../src/test/resources/jsoninput/allTypes.csv | 8 ++ .../src/test/resources/jsoninput/nan_test.csv | 1 + 5 files changed, 172 insertions(+), 173 deletions(-) create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java create mode 100644 exec/java-exec/src/test/resources/jsoninput/allTypes.csv create mode 100644 exec/java-exec/src/test/resources/jsoninput/nan_test.csv diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 85ddb23515a..7504db83917 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -23,99 +23,27 @@ import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; import org.apache.drill.exec.expr.annotations.Output; import org.apache.drill.exec.expr.annotations.Param; import org.apache.drill.exec.expr.annotations.Workspace; import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder; -import org.apache.drill.exec.expr.holders.NullableVarCharHolder; -import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; public class JsonConvertFrom { - private JsonConvertFrom() { - } - - @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true) - public static class ConvertFromJson implements DrillSimpleFunc { - - @Param VarBinaryHolder in; - @Inject - ResultSetLoader loader; - @Workspace - org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; - - @Inject - OptionManager options; - - @Output ComplexWriter writer; - - @Override - public void setup() { - jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() - .resultSetLoader(loader) - .standardOptions(options); - } - - @Override - public void eval() { - try { - jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer); - org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); - loader.startBatch(); - jsonLoader.readBatch(); - loader.close(); - - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } - } - } - - @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true) - public static class ConvertFromJsonVarchar implements DrillSimpleFunc { - - @Param VarCharHolder in; - @Workspace - org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; - - @Inject - OptionManager options; - - @Inject - ResultSetLoader loader; - - @Output ComplexWriter writer; - - @Override - public void setup() { - jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() - .resultSetLoader(loader) - .standardOptions(options); - } - - @Override - public void eval() { - try { - jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer); - org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); - loader.startBatch(); - jsonLoader.readBatch(); - loader.close(); - - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } - } - } + private JsonConvertFrom() {} - @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true) + @FunctionTemplate(names = {"convert_fromJSON", "convertFromJson", "convert_from_json"}, + scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { - @Param NullableVarBinaryHolder in; + @Param + NullableVarBinaryHolder in; @Workspace org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; @@ -126,7 +54,8 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { @Inject ResultSetLoader loader; - @Output ComplexWriter writer; + @Output + BaseWriter.ComplexWriter writer; @Override public void setup() { @@ -137,7 +66,7 @@ public void setup() { @Override public void eval() { - if (in.isSet == 0) { + if (in.end == 0) { // Return empty map org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); mapWriter.start(); @@ -157,10 +86,15 @@ public void eval() { } } - @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true) - public static class ConvertFromJsonVarcharNullableInput implements DrillSimpleFunc { + @FunctionTemplate(names = {"convert_fromJSON", "convertFromJson", "convert_from_json"}, + scope = FunctionScope.SIMPLE) + public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc { - @Param NullableVarCharHolder in; + @Param + VarCharHolder in; + + @Output + ComplexWriter writer; @Workspace org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; @@ -171,19 +105,19 @@ public static class ConvertFromJsonVarcharNullableInput implements DrillSimpleFu @Inject ResultSetLoader loader; - @Output ComplexWriter writer; - @Override public void setup() { - jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() + jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() .resultSetLoader(loader) .standardOptions(options); } @Override public void eval() { - if (in.isSet == 0) { - // Return empty map + String jsonString = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(in.start, in.end, in.buffer); + + // If the input is null or empty, return an empty map + if (jsonString.length() == 0) { org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); mapWriter.start(); mapWriter.end(); @@ -191,7 +125,7 @@ public void eval() { } try { - jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer); + jsonLoaderBuilder.fromString(jsonString); org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); loader.startBatch(); jsonLoader.readBatch(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java new file mode 100644 index 00000000000..58eef69cb03 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.json; + + +import ch.qos.logback.classic.Level; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; +import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.LogFixture; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +public class TestJsonConversionUDF extends ClusterTest { + + protected static LogFixture logFixture; + private final static Level CURRENT_LOG_LEVEL = Level.DEBUG; + @BeforeClass + public static void setup() throws Exception { + logFixture = LogFixture.builder() + .toConsole() + .logger(ProjectRecordBatch.class, CURRENT_LOG_LEVEL) + .logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL) + .logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL) + .build(); + + startCluster(ClusterFixture.builder(dirTestWatcher)); + } + + @Test + public void testConvertFromJsonFunctionWithBinaryInput() throws Exception { + client.alterSession(ExecConstants.JSON_READER_NAN_INF_NUMBERS, true); + String sql = "SELECT string_binary(convert_toJSON(convert_fromJSON(columns[1]))) as col FROM cp.`jsoninput/nan_test.csv`"; + RowSet results = client.queryBuilder().sql(sql).rowSet(); + assertEquals("Query result must contain 1 row", 1, results.rowCount()); + + results.print(); + } + + @Test + public void testConvertFromJSONWithStringInput() throws Exception { + // String sql = "SELECT *, convert_FromJSON('{\"foo\":\"bar\"}') FROM cp.`jsoninput/allTypes.csv`"; + String sql = "SELECT convert_FromJSON('{\"foo\":\"bar\"}') FROM (VALUES(1))"; + RowSet results = client.queryBuilder().sql(sql).rowSet(); + results.print(); + } + +/* + private void doTestConvertToJsonFunction() throws Exception { + String table = "nan_test.csv"; + File file = new File(dirTestWatcher.getRootDir(), table); + String csv = "col_0, {\"nan_col\":NaN}"; + String query = String.format("select string_binary(convert_toJSON(convert_fromJSON(columns[1]))) as col " + + "from dfs.`%s` where columns[0]='col_0'", table); + try { + FileUtils.writeStringToFile(file, csv, Charset.defaultCharset()); + List results = testSqlWithResults(query); + RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator()); + assertEquals("Query result must contain 1 row", 1, results.size()); + QueryDataBatch batch = results.get(0); + + batchLoader.load(batch.getHeader().getDef(), batch.getData()); + VectorWrapper vw = batchLoader.getValueAccessorById(VarCharVector.class, batchLoader.getValueVectorId(SchemaPath.getCompoundPath("col")).getFieldIds()); + // ensuring that `NaN` token ARE NOT enclosed with double quotes + String resultJson = vw.getValueVector().getAccessor().getObject(0).toString(); + int nanIndex = resultJson.indexOf("NaN"); + assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex - 1)); + assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex + "NaN".length())); + batch.release(); + batchLoader.clear(); + } finally { + FileUtils.deleteQuietly(file); + } + } + + @Test + public void testConvertFromJsonFunction() throws Exception { + //runBoth(this::doTestConvertFromJsonFunction); + } + + private void doTestConvertFromJsonFunction() throws Exception { + String table = "nan_test.csv"; + File file = new File(dirTestWatcher.getRootDir(), table); + String csv = "col_0, {\"nan_col\":NaN}"; + try { + FileUtils.writeStringToFile(file, csv); + testBuilder() + .sqlQuery(String.format("select convert_fromJSON(columns[1]) as col from dfs.`%s`", table)) + .unOrdered() + .baselineColumns("col") + .baselineValues(mapOf("nan_col", Double.NaN)) + .go(); + } finally { + FileUtils.deleteQuietly(file); + } + } + */ + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonNanInf.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonNanInf.java index e556ec16ea1..91f5e544684 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonNanInf.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonNanInf.java @@ -15,33 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.drill.exec.store.json; -import static org.apache.drill.test.TestBuilder.mapOf; import static org.hamcrest.CoreMatchers.containsString; -import static org.junit.Assert.assertEquals; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; import java.io.File; -import java.util.List; +import java.nio.charset.Charset; import org.apache.commons.io.FileUtils; import org.apache.drill.common.exceptions.UserRemoteException; -import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.physical.impl.join.JoinTestBase; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.rpc.user.QueryDataBatch; -import org.apache.drill.exec.vector.VarCharVector; import org.apache.drill.exec.store.json.TestJsonReader.TestWrapper; import org.apache.drill.test.BaseTestQuery; -import org.junit.Ignore; import org.junit.Test; -// TODO: Split or rename: this tests mor than NanInf public class TestJsonNanInf extends BaseTestQuery { public void runBoth(TestWrapper wrapper) throws Exception { @@ -66,7 +58,7 @@ private void doTestNanInfSelect() throws Exception { String json = "{\"nan_col\":NaN, \"inf_col\":Infinity}"; String query = String.format("select * from dfs.`%s`",table); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); testBuilder() .sqlQuery(query) .unOrdered() @@ -79,7 +71,6 @@ private void doTestNanInfSelect() throws Exception { } @Test - @Ignore // see DRILL-6018 public void testExcludePositiveInfinity() throws Exception { runBoth(this::doTestExcludePositiveInfinity); } @@ -91,7 +82,7 @@ private void doTestExcludePositiveInfinity() throws Exception { "{\"nan_col\":5.0, \"inf_col\":5.0}]"; String query = String.format("select inf_col from dfs.`%s` where inf_col <> cast('Infinity' as double)",table); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); testBuilder() .sqlQuery(query) .unOrdered() @@ -104,7 +95,6 @@ private void doTestExcludePositiveInfinity() throws Exception { } @Test - @Ignore // see DRILL-6018 public void testExcludeNegativeInfinity() throws Exception { runBoth(this::doTestExcludeNegativeInfinity); } @@ -116,7 +106,7 @@ private void doTestExcludeNegativeInfinity() throws Exception { "{\"nan_col\":5.0, \"inf_col\":5.0}]"; String query = String.format("select inf_col from dfs.`%s` where inf_col <> cast('-Infinity' as double)",table); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); testBuilder() .sqlQuery(query) .unOrdered() @@ -129,7 +119,6 @@ private void doTestExcludeNegativeInfinity() throws Exception { } @Test - @Ignore // see DRILL-6018 public void testIncludePositiveInfinity() throws Exception { runBoth(this::doTestIncludePositiveInfinity); } @@ -141,7 +130,7 @@ private void doTestIncludePositiveInfinity() throws Exception { "{\"nan_col\":5.0, \"inf_col\":5.0}]"; String query = String.format("select inf_col from dfs.`%s` where inf_col = cast('Infinity' as double)",table); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); testBuilder() .sqlQuery(query) .unOrdered() @@ -166,7 +155,7 @@ private void doTestExcludeNan() throws Exception { "{\"nan_col\":5.0, \"inf_col\":5.0}]"; String query = String.format("select nan_col from dfs.`%s` where cast(nan_col as varchar) <> 'NaN'",table); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); testBuilder() .sqlQuery(query) .unOrdered() @@ -190,7 +179,7 @@ private void doTestIncludeNan() throws Exception { "{\"nan_col\":5.0, \"inf_col\":5.0}]"; String query = String.format("select nan_col from dfs.`%s` where cast(nan_col as varchar) = 'NaN'",table); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); testBuilder() .sqlQuery(query) .unOrdered() @@ -213,7 +202,7 @@ private void doTestNanInfFailure() throws Exception { test("alter session set `%s` = false", ExecConstants.JSON_READER_NAN_INF_NUMBERS); String json = "{\"nan_col\":NaN, \"inf_col\":Infinity}"; try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); test("select * from dfs.`%s`;", table); fail(); } catch (UserRemoteException e) { @@ -235,13 +224,13 @@ private void doTestCreateTableNanInf() throws Exception { String json = "{\"nan_col\":NaN, \"inf_col\":Infinity}"; String newTable = "ctas_test"; try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); test("alter session set `store.format`='json'"); test("create table dfs.`%s` as select * from dfs.`%s`;", newTable, table); // ensuring that `NaN` and `Infinity` tokens ARE NOT enclosed with double quotes File resultFile = new File(new File(file.getParent(), newTable),"0_0_0.json"); - String resultJson = FileUtils.readFileToString(resultFile); + String resultJson = FileUtils.readFileToString(resultFile, Charset.defaultCharset()); int nanIndex = resultJson.indexOf("NaN"); assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex - 1)); assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex + "NaN".length())); @@ -254,28 +243,6 @@ private void doTestCreateTableNanInf() throws Exception { } } - @Test - public void testConvertFromJsonFunction() throws Exception { - runBoth(this::doTestConvertFromJsonFunction); - } - - private void doTestConvertFromJsonFunction() throws Exception { - String table = "nan_test.csv"; - File file = new File(dirTestWatcher.getRootDir(), table); - String csv = "col_0, {\"nan_col\":NaN}"; - try { - FileUtils.writeStringToFile(file, csv); - testBuilder() - .sqlQuery(String.format("select convert_fromJSON(columns[1]) as col from dfs.`%s`", table)) - .unOrdered() - .baselineColumns("col") - .baselineValues(mapOf("nan_col", Double.NaN)) - .go(); - } finally { - FileUtils.deleteQuietly(file); - } - } - @Test public void testLargeStringBinary() throws Exception { runBoth(() -> doTestLargeStringBinary()); @@ -292,39 +259,6 @@ private void doTestLargeStringBinary() throws Exception { } @Test - public void testConvertToJsonFunction() throws Exception { - runBoth(() -> doTestConvertToJsonFunction()); - } - - private void doTestConvertToJsonFunction() throws Exception { - String table = "nan_test.csv"; - File file = new File(dirTestWatcher.getRootDir(), table); - String csv = "col_0, {\"nan_col\":NaN}"; - String query = String.format("select string_binary(convert_toJSON(convert_fromJSON(columns[1]))) as col " + - "from dfs.`%s` where columns[0]='col_0'", table); - try { - FileUtils.writeStringToFile(file, csv); - List results = testSqlWithResults(query); - RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator()); - assertEquals("Query result must contain 1 row", 1, results.size()); - QueryDataBatch batch = results.get(0); - - batchLoader.load(batch.getHeader().getDef(), batch.getData()); - VectorWrapper vw = batchLoader.getValueAccessorById(VarCharVector.class, batchLoader.getValueVectorId(SchemaPath.getCompoundPath("col")).getFieldIds()); - // ensuring that `NaN` token ARE NOT enclosed with double quotes - String resultJson = vw.getValueVector().getAccessor().getObject(0).toString(); - int nanIndex = resultJson.indexOf("NaN"); - assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex - 1)); - assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex + "NaN".length())); - batch.release(); - batchLoader.clear(); - } finally { - FileUtils.deleteQuietly(file); - } - } - - @Test - @Ignore("DRILL-6018") public void testNanInfLiterals() throws Exception { testBuilder() .sqlQuery(" select sin(cast('NaN' as double)) as sin_col, " + @@ -350,7 +284,7 @@ private void doTestOrderByWithNaN() throws Exception { File file = new File(dirTestWatcher.getRootDir(), table_name); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); test("alter session set `%s` = true", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); testBuilder() .sqlQuery(query) @@ -391,7 +325,7 @@ private void doTestNestedLoopJoinWithNaN() throws Exception { File file = new File(dirTestWatcher.getRootDir(), table_name); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); test("alter session set `%s` = true", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); testBuilder() .sqlQuery(query) @@ -426,7 +360,7 @@ private void doTestHashJoinWithNaN() throws Exception { File file = new File(dirTestWatcher.getRootDir(), table_name); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); test("alter session set `%s` = true", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); testBuilder() .sqlQuery(query) @@ -459,7 +393,7 @@ private void doTestMergeJoinWithNaN() throws Exception { File file = new File(dirTestWatcher.getRootDir(), table_name); try { - FileUtils.writeStringToFile(file, json); + FileUtils.writeStringToFile(file, json, Charset.defaultCharset()); test("alter session set `%s` = true", ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); testBuilder() .sqlQuery(query) @@ -475,11 +409,11 @@ private void doTestMergeJoinWithNaN() throws Exception { } } - private void enableV2Reader(boolean enable) throws Exception { + private void enableV2Reader(boolean enable) { alterSession(ExecConstants.ENABLE_V2_JSON_READER_KEY, enable); } - private void resetV2Reader() throws Exception { + private void resetV2Reader() { resetSessionOption(ExecConstants.ENABLE_V2_JSON_READER_KEY); } } diff --git a/exec/java-exec/src/test/resources/jsoninput/allTypes.csv b/exec/java-exec/src/test/resources/jsoninput/allTypes.csv new file mode 100644 index 00000000000..7f44072bdd6 --- /dev/null +++ b/exec/java-exec/src/test/resources/jsoninput/allTypes.csv @@ -0,0 +1,8 @@ +col1,"{ + bi: 123, + fl: 123.4, + st: ""foo"", + mp: { a: 10, b: ""bar"" }, + ar: [ 10, 20 ], + nu: null +}" diff --git a/exec/java-exec/src/test/resources/jsoninput/nan_test.csv b/exec/java-exec/src/test/resources/jsoninput/nan_test.csv new file mode 100644 index 00000000000..aae95c09099 --- /dev/null +++ b/exec/java-exec/src/test/resources/jsoninput/nan_test.csv @@ -0,0 +1 @@ +col_0, {"nan_col":NaN} From 6e1ba4e09609175b03ee2c43e65c2e5336cfd4ef Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Tue, 7 Jun 2022 10:35:55 -0400 Subject: [PATCH 03/10] Addressed review comments --- .../exec/expr/fn/impl/conv/JsonConvertFrom.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 7504db83917..97224f6155e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -54,7 +54,7 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { @Inject ResultSetLoader loader; - @Output + @Output // TODO Remove in future work BaseWriter.ComplexWriter writer; @Override @@ -68,9 +68,6 @@ public void setup() { public void eval() { if (in.end == 0) { // Return empty map - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); return; } @@ -79,7 +76,6 @@ public void eval() { org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); loader.startBatch(); jsonLoader.readBatch(); - loader.close(); } catch (Exception e) { throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); } @@ -93,7 +89,7 @@ public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc { @Param VarCharHolder in; - @Output + @Output // TODO Remove in future work ComplexWriter writer; @Workspace @@ -118,9 +114,6 @@ public void eval() { // If the input is null or empty, return an empty map if (jsonString.length() == 0) { - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); return; } @@ -129,7 +122,6 @@ public void eval() { org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); loader.startBatch(); jsonLoader.readBatch(); - loader.close(); } catch (Exception e) { throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); } From 71cf83e27570f889296109459ef71df10beec4e7 Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Tue, 7 Jun 2022 12:03:50 -0400 Subject: [PATCH 04/10] Re-added fromString --- .../drill/exec/store/easy/json/loader/JsonLoaderImpl.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java index cf6886474e0..94b720fb63c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java @@ -202,6 +202,11 @@ public JsonLoaderBuilder fromStream(int start, int end, DrillBuf buf) { return this; } + public JsonLoaderBuilder fromString(String jsonString) { + this.streams = Collections.singletonList(IOUtils.toInputStream(jsonString, Charset.defaultCharset())); + return this; + } + public JsonLoaderBuilder fromStream(Iterable streams) { this.streams = streams; return this; From b298e3236433e2b9fc5306d7b2834f2c525da34c Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Wed, 15 Jun 2022 10:29:28 -0400 Subject: [PATCH 05/10] Updated to reflect DRILL-8248 --- .../expr/fn/impl/conv/JsonConvertFrom.java | 77 ++++++++++++------- .../expr/fn/impl/conv/JsonConverterUtils.java | 69 +++++++++++++++++ .../easy/json/loader/JsonLoaderImpl.java | 10 --- 3 files changed, 120 insertions(+), 36 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 97224f6155e..01164329d3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -28,7 +28,7 @@ import org.apache.drill.exec.expr.annotations.Param; import org.apache.drill.exec.expr.annotations.Workspace; import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder; -import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.complex.writer.BaseWriter; @@ -45,23 +45,24 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { @Param NullableVarBinaryHolder in; - @Workspace - org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; + @Output // TODO Remove in future work + BaseWriter.ComplexWriter writer; @Inject OptionManager options; @Inject - ResultSetLoader loader; + ResultSetLoader rsLoader; - @Output // TODO Remove in future work - BaseWriter.ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.SingleElementIterator stream; + + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() - .resultSetLoader(loader) - .standardOptions(options); + rsLoader.startBatch(); } @Override @@ -71,41 +72,57 @@ public void eval() { return; } + java.io.InputStream inputStream = org.apache.drill.exec.vector.complex.fn.DrillBufInputStream.getStream(in.start, in.end, in.buffer); + try { - jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer); - org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); - loader.startBatch(); - jsonLoader.readBatch(); + stream.setValue(inputStream); + + if (jsonLoader == null) { + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, stream); + } + + org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer(); + rowWriter.start(); + if (jsonLoader.parser().next()) { + rowWriter.save(); + } + inputStream.close(); + } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); + throw org.apache.drill.common.exceptions.UserException.dataReadError(e) + .message("Error while reading JSON. ") + .addContext(e.getMessage()) + .build(); } } } @FunctionTemplate(names = {"convert_fromJSON", "convertFromJson", "convert_from_json"}, - scope = FunctionScope.SIMPLE) + scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc { @Param - VarCharHolder in; + NullableVarCharHolder in; @Output // TODO Remove in future work ComplexWriter writer; @Workspace - org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder; + org.apache.drill.exec.store.easy.json.loader.SingleElementIterator stream; @Inject OptionManager options; @Inject - ResultSetLoader loader; + ResultSetLoader rsLoader; + + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; + @Override public void setup() { - jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder() - .resultSetLoader(loader) - .standardOptions(options); + rsLoader.startBatch(); } @Override @@ -118,12 +135,20 @@ public void eval() { } try { - jsonLoaderBuilder.fromString(jsonString); - org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build(); - loader.startBatch(); - jsonLoader.readBatch(); + stream.setValue(org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertStringToInputStream(jsonString)); + if (jsonLoader == null) { + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, stream); + } + org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer(); + rowWriter.start(); + if (jsonLoader.parser().next()) { + rowWriter.save(); + } } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); + throw org.apache.drill.common.exceptions.UserException.dataReadError(e) + .message("Error while reading JSON. ") + .addContext(e.getMessage()) + .build(); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java new file mode 100644 index 00000000000..3a6a59b5171 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.expr.fn.impl.conv; + + +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; +import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; + +public class JsonConverterUtils { + + private static final Logger logger = LoggerFactory.getLogger(JsonConverterUtils.class); + + public static InputStream convertStringToInputStream(String input) { + try (InputStream stream = IOUtils.toInputStream(input, Charset.defaultCharset())) { + return stream; + } catch (IOException e) { + throw UserException.dataReadError(e) + .message("Unable to read JSON string") + .build(logger); + } + } + + /** + * Creates a {@link JsonLoaderImpl} for use in JSON conversion UDFs. + * @param rsLoader The {@link ResultSetLoader} used in the UDF + * @param options The {@link OptionManager} used in the UDF. This is used to extract the global JSON options + * @param stream An input stream containing the input JSON data + * @return A {@link JsonLoaderImpl} for use in the UDF. + */ + public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader, + OptionManager options, + SingleElementIterator stream) { + // Add JSON configuration from Storage plugin, if present. + JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder() + .resultSetLoader(rsLoader) + .standardOptions(options) + .fromStream(() -> stream); + + return (JsonLoaderImpl) jsonLoaderBuilder.build(); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java index 94b720fb63c..e1851cc3a2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java @@ -197,16 +197,6 @@ public JsonLoaderBuilder fromStream(InputStream... stream) { return this; } - public JsonLoaderBuilder fromStream(int start, int end, DrillBuf buf) { - this.streams = Collections.singletonList(DrillBufInputStream.getStream(start, end, buf)); - return this; - } - - public JsonLoaderBuilder fromString(String jsonString) { - this.streams = Collections.singletonList(IOUtils.toInputStream(jsonString, Charset.defaultCharset())); - return this; - } - public JsonLoaderBuilder fromStream(Iterable streams) { this.streams = streams; return this; From a05d61c30f4021c004855fd585f3181651ff00e5 Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Wed, 15 Jun 2022 19:51:36 -0400 Subject: [PATCH 06/10] Fixed unit test --- .../exec/physical/impl/project/TestProjectEmitOutcome.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java index 8580a37d86d..21f32701de0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java @@ -188,7 +188,7 @@ public void testProjectWithComplexWritersAndEmitOutcome_NonEmptyFirstBatch() thr fail(); } catch (UserException e) { // exception is expected because of complex writers case - assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType()); + assertEquals(ErrorType.FUNCTION, e.getErrorType()); } } @@ -225,7 +225,7 @@ public void testProjectWithComplexWritersAndEmitOutcome_EmptyFirstBatch() throws fail(); } catch (UserException e) { // exception is expected because of complex writers case - assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType()); + assertEquals(ErrorType.FUNCTION, e.getErrorType()); } } } From a4af9f630975e2b7db4bf0aa6c51e86e64ef6bae Mon Sep 17 00:00:00 2001 From: Charles Givre Date: Sat, 18 Jun 2022 22:29:57 -0400 Subject: [PATCH 07/10] Removed extra function names --- .../apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 01164329d3b..95260503897 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -38,7 +38,7 @@ public class JsonConvertFrom { private JsonConvertFrom() {} - @FunctionTemplate(names = {"convert_fromJSON", "convertFromJson", "convert_from_json"}, + @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { @@ -97,7 +97,7 @@ public void eval() { } } - @FunctionTemplate(names = {"convert_fromJSON", "convertFromJson", "convert_from_json"}, + @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc { From c49a872c808b7c6eede50a8b485d00846436ece8 Mon Sep 17 00:00:00 2001 From: James Turton Date: Tue, 5 Jul 2022 15:27:32 +0200 Subject: [PATCH 08/10] Update expected error type in TestProjectEmitOutcome. --- .../exec/physical/impl/project/TestProjectEmitOutcome.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java index 21f32701de0..8580a37d86d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java @@ -188,7 +188,7 @@ public void testProjectWithComplexWritersAndEmitOutcome_NonEmptyFirstBatch() thr fail(); } catch (UserException e) { // exception is expected because of complex writers case - assertEquals(ErrorType.FUNCTION, e.getErrorType()); + assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType()); } } @@ -225,7 +225,7 @@ public void testProjectWithComplexWritersAndEmitOutcome_EmptyFirstBatch() throws fail(); } catch (UserException e) { // exception is expected because of complex writers case - assertEquals(ErrorType.FUNCTION, e.getErrorType()); + assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType()); } } } From 25c906927878ad2d019c50b6b2a4357d2a624cb8 Mon Sep 17 00:00:00 2001 From: James Turton Date: Wed, 6 Jul 2022 11:10:09 +0200 Subject: [PATCH 09/10] WIP. --- .../expr/fn/impl/conv/JsonConvertFrom.java | 29 ++++++++++--------- .../expr/fn/impl/conv/JsonConverterUtils.java | 6 ++-- .../store/json/TestJsonConversionUDF.java | 9 +++--- 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 95260503897..425fec56a4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -33,7 +33,7 @@ import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.complex.writer.BaseWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; - +@SuppressWarnings("unused") public class JsonConvertFrom { private JsonConvertFrom() {} @@ -55,30 +55,31 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.store.easy.json.loader.SingleElementIterator stream; + org.apache.drill.exec.store.easy.json.loader.SingleElementIterator streamIter; @Workspace org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { + streamIter = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>(); rsLoader.startBatch(); } @Override public void eval() { - if (in.end == 0) { - // Return empty map + // If the input is null or empty, return an empty map + if (in.isSet == 0 || in.start == in.end) { return; } java.io.InputStream inputStream = org.apache.drill.exec.vector.complex.fn.DrillBufInputStream.getStream(in.start, in.end, in.buffer); try { - stream.setValue(inputStream); + streamIter.setValue(inputStream); if (jsonLoader == null) { - jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, stream); + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, streamIter); } org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer(); @@ -86,7 +87,7 @@ public void eval() { if (jsonLoader.parser().next()) { rowWriter.save(); } - inputStream.close(); + //inputStream.close(); } catch (Exception e) { throw org.apache.drill.common.exceptions.UserException.dataReadError(e) @@ -108,7 +109,7 @@ public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc { ComplexWriter writer; @Workspace - org.apache.drill.exec.store.easy.json.loader.SingleElementIterator stream; + org.apache.drill.exec.store.easy.json.loader.SingleElementIterator streamIter; @Inject OptionManager options; @@ -119,25 +120,25 @@ public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc { @Workspace org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; - @Override public void setup() { + streamIter = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>(); rsLoader.startBatch(); } @Override public void eval() { - String jsonString = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(in.start, in.end, in.buffer); - // If the input is null or empty, return an empty map - if (jsonString.length() == 0) { + if (in.isSet == 0 || in.start == in.end) { return; } + java.io.InputStream inputStream = org.apache.drill.exec.vector.complex.fn.DrillBufInputStream.getStream(in.start, in.end, in.buffer); + try { - stream.setValue(org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertStringToInputStream(jsonString)); + streamIter.setValue(inputStream); if (jsonLoader == null) { - jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, stream); + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.createJsonLoader(rsLoader, options, streamIter); } org.apache.drill.exec.physical.resultSet.RowSetLoader rowWriter = rsLoader.writer(); rowWriter.start(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java index 3a6a59b5171..9c9c55f8992 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java @@ -19,8 +19,6 @@ package org.apache.drill.exec.expr.fn.impl.conv; -import org.apache.commons.io.IOUtils; -import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; @@ -29,14 +27,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.InputStream; -import java.nio.charset.Charset; public class JsonConverterUtils { private static final Logger logger = LoggerFactory.getLogger(JsonConverterUtils.class); + /* public static InputStream convertStringToInputStream(String input) { try (InputStream stream = IOUtils.toInputStream(input, Charset.defaultCharset())) { return stream; @@ -46,6 +43,7 @@ public static InputStream convertStringToInputStream(String input) { .build(logger); } } + */ /** * Creates a {@link JsonLoaderImpl} for use in JSON conversion UDFs. diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java index 58eef69cb03..7ece2c9888d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java @@ -51,21 +51,20 @@ public static void setup() throws Exception { } @Test - public void testConvertFromJsonFunctionWithBinaryInput() throws Exception { + public void testConvertFromJsonVarBinary() throws Exception { client.alterSession(ExecConstants.JSON_READER_NAN_INF_NUMBERS, true); String sql = "SELECT string_binary(convert_toJSON(convert_fromJSON(columns[1]))) as col FROM cp.`jsoninput/nan_test.csv`"; RowSet results = client.queryBuilder().sql(sql).rowSet(); assertEquals("Query result must contain 1 row", 1, results.rowCount()); - - results.print(); + results.clear(); } @Test - public void testConvertFromJSONWithStringInput() throws Exception { + public void testConvertFromJsonVarChar() throws Exception { // String sql = "SELECT *, convert_FromJSON('{\"foo\":\"bar\"}') FROM cp.`jsoninput/allTypes.csv`"; String sql = "SELECT convert_FromJSON('{\"foo\":\"bar\"}') FROM (VALUES(1))"; RowSet results = client.queryBuilder().sql(sql).rowSet(); - results.print(); + results.clear(); } /* From 6b65419c9b0ccebc6ef2252759b521dbfeb2c34d Mon Sep 17 00:00:00 2001 From: cgivre Date: Sun, 7 Jan 2024 15:47:38 -0500 Subject: [PATCH 10/10] WIP --- .../expr/fn/impl/conv/JsonConvertFrom.java | 12 ++-- .../expr/fn/impl/conv/JsonConverterUtils.java | 22 +----- .../store/json/TestJsonConversionUDF.java | 69 ++++++------------- .../test/resources/jsoninput/multirow.csvh | 3 + 4 files changed, 31 insertions(+), 75 deletions(-) create mode 100644 exec/java-exec/src/test/resources/jsoninput/multirow.csvh diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 425fec56a4f..91c06530d4d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -18,8 +18,6 @@ package org.apache.drill.exec.expr.fn.impl.conv; -import javax.inject.Inject; - import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; @@ -33,6 +31,8 @@ import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.complex.writer.BaseWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; + +import javax.inject.Inject; @SuppressWarnings("unused") public class JsonConvertFrom { @@ -55,14 +55,14 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.store.easy.json.loader.SingleElementIterator streamIter; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; @Workspace org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - streamIter = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); rsLoader.startBatch(); } @@ -109,7 +109,7 @@ public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc { ComplexWriter writer; @Workspace - org.apache.drill.exec.store.easy.json.loader.SingleElementIterator streamIter; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; @Inject OptionManager options; @@ -122,7 +122,7 @@ public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc { @Override public void setup() { - streamIter = new org.apache.drill.exec.store.easy.json.loader.SingleElementIterator<>(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); rsLoader.startBatch(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java index 9c9c55f8992..be14dc1efa3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java @@ -21,30 +21,13 @@ import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; -import org.apache.drill.exec.store.easy.json.loader.SingleElementIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.InputStream; public class JsonConverterUtils { - private static final Logger logger = LoggerFactory.getLogger(JsonConverterUtils.class); - - /* - public static InputStream convertStringToInputStream(String input) { - try (InputStream stream = IOUtils.toInputStream(input, Charset.defaultCharset())) { - return stream; - } catch (IOException e) { - throw UserException.dataReadError(e) - .message("Unable to read JSON string") - .build(logger); - } - } - */ - /** * Creates a {@link JsonLoaderImpl} for use in JSON conversion UDFs. * @param rsLoader The {@link ResultSetLoader} used in the UDF @@ -54,7 +37,7 @@ public static InputStream convertStringToInputStream(String input) { */ public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader, OptionManager options, - SingleElementIterator stream) { + ClosingStreamIterator stream) { // Add JSON configuration from Storage plugin, if present. JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder() .resultSetLoader(rsLoader) @@ -63,5 +46,4 @@ public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader, return (JsonLoaderImpl) jsonLoaderBuilder.build(); } - } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java index 7ece2c9888d..3b3620da1b3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonConversionUDF.java @@ -20,14 +20,19 @@ import ch.qos.logback.classic.Level; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator; import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSetBuilder; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterTest; import org.apache.drill.test.LogFixture; +import org.apache.drill.test.rowSet.RowSetComparison; import org.junit.BeforeClass; import org.junit.Test; @@ -61,61 +66,27 @@ public void testConvertFromJsonVarBinary() throws Exception { @Test public void testConvertFromJsonVarChar() throws Exception { - // String sql = "SELECT *, convert_FromJSON('{\"foo\":\"bar\"}') FROM cp.`jsoninput/allTypes.csv`"; - String sql = "SELECT convert_FromJSON('{\"foo\":\"bar\"}') FROM (VALUES(1))"; + String sql = "SELECT json_data['foo'] AS foo, json_data['num'] AS num FROM " + + "(SELECT convert_FromJSON('{\"foo\":\"bar\", \"num\":10}') as json_data FROM (VALUES(1)))"; RowSet results = client.queryBuilder().sql(sql).rowSet(); - results.clear(); - } -/* - private void doTestConvertToJsonFunction() throws Exception { - String table = "nan_test.csv"; - File file = new File(dirTestWatcher.getRootDir(), table); - String csv = "col_0, {\"nan_col\":NaN}"; - String query = String.format("select string_binary(convert_toJSON(convert_fromJSON(columns[1]))) as col " + - "from dfs.`%s` where columns[0]='col_0'", table); - try { - FileUtils.writeStringToFile(file, csv, Charset.defaultCharset()); - List results = testSqlWithResults(query); - RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator()); - assertEquals("Query result must contain 1 row", 1, results.size()); - QueryDataBatch batch = results.get(0); + TupleMetadata expectedSchema = new SchemaBuilder() + .addNullable("foo", MinorType.VARCHAR) + .addNullable("num", MinorType.BIGINT) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("bar", 10L) + .build(); - batchLoader.load(batch.getHeader().getDef(), batch.getData()); - VectorWrapper vw = batchLoader.getValueAccessorById(VarCharVector.class, batchLoader.getValueVectorId(SchemaPath.getCompoundPath("col")).getFieldIds()); - // ensuring that `NaN` token ARE NOT enclosed with double quotes - String resultJson = vw.getValueVector().getAccessor().getObject(0).toString(); - int nanIndex = resultJson.indexOf("NaN"); - assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex - 1)); - assertNotEquals("`NaN` must not be enclosed with \"\" ", '"', resultJson.charAt(nanIndex + "NaN".length())); - batch.release(); - batchLoader.clear(); - } finally { - FileUtils.deleteQuietly(file); - } + new RowSetComparison(expected).verifyAndClearAll(results); } @Test - public void testConvertFromJsonFunction() throws Exception { - //runBoth(this::doTestConvertFromJsonFunction); - } + public void testMultipleRows() throws Exception { + String sql = "SELECT string_binary(convert_toJSON(`name`)) FROM cp.`jsoninput/multirow.csvh`"; - private void doTestConvertFromJsonFunction() throws Exception { - String table = "nan_test.csv"; - File file = new File(dirTestWatcher.getRootDir(), table); - String csv = "col_0, {\"nan_col\":NaN}"; - try { - FileUtils.writeStringToFile(file, csv); - testBuilder() - .sqlQuery(String.format("select convert_fromJSON(columns[1]) as col from dfs.`%s`", table)) - .unOrdered() - .baselineColumns("col") - .baselineValues(mapOf("nan_col", Double.NaN)) - .go(); - } finally { - FileUtils.deleteQuietly(file); - } + RowSet results = client.queryBuilder().sql(sql).rowSet(); + results.print(); } - */ - } diff --git a/exec/java-exec/src/test/resources/jsoninput/multirow.csvh b/exec/java-exec/src/test/resources/jsoninput/multirow.csvh new file mode 100644 index 00000000000..b207f1462e4 --- /dev/null +++ b/exec/java-exec/src/test/resources/jsoninput/multirow.csvh @@ -0,0 +1,3 @@ +num, name +1, "bob" +4,"steve"