From e2abf097299bc702326af9d05f03be3f850018ba Mon Sep 17 00:00:00 2001 From: Jan Kalkan Date: Fri, 16 Mar 2018 11:41:07 +0100 Subject: [PATCH 1/3] Some classes are not supported anymore in Apache Flink v1.4 and were exchanged: import org.apache.flink.hadoop.shaded.com.google.common.base.Strings; import org.apache.flink.hadoop.shaded.com.google.common.collect.Lists; --- .../apache/flink/api/java/io/neo4j/Neo4jFormatBase.java | 5 ++--- .../apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jFormatBase.java b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jFormatBase.java index be03dac..561052f 100644 --- a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jFormatBase.java +++ b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jFormatBase.java @@ -20,7 +20,6 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; -import org.apache.flink.hadoop.shaded.com.google.common.base.Strings; import java.io.Serializable; @@ -199,10 +198,10 @@ public T setReadTimeout(int readTimeout) { * Validates mandatory arguments. */ protected void validate() { - if (Strings.isNullOrEmpty(restURI)) { + if (restURI.isEmpty() || restURI == null) { throw new IllegalArgumentException("No Rest URI was supplied."); } - if (Strings.isNullOrEmpty(query)) { + if (query.isEmpty() || query == null) { throw new IllegalArgumentException("No Cypher statement was supplied."); } } diff --git a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java index cb7e367..8c283c2 100644 --- a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java +++ b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java @@ -23,14 +23,13 @@ import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoop.shaded.com.google.common.base.Strings; -import org.apache.flink.hadoop.shaded.com.google.common.collect.Lists; import org.codehaus.jackson.node.JsonNodeFactory; import org.codehaus.jackson.node.ObjectNode; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -242,7 +241,7 @@ public static Builder buildNeo4jOutputFormat() { */ public static class Builder extends Neo4jFormatBase.Builder { - private List elementKeys = Lists.newArrayList(); + private List elementKeys = new ArrayList<>(); private int batchSize; @@ -276,7 +275,7 @@ public Builder addParameterKey(String key) { * @return builder */ public Builder addParameterKey(int position, String key) { - checkArgument(!Strings.isNullOrEmpty(key), "Key must not be null or empty."); + checkArgument(!(key.isEmpty() || key == null), "Key must not be null or empty."); elementKeys.add(position, key); return getThis(); } From 1679d9497d3c93809963c2daa9c1d1028b0d0126 Mon Sep 17 00:00:00 2001 From: Jan Kalkan Date: Fri, 20 Apr 2018 15:40:10 +0200 Subject: [PATCH 2/3] - fixes null checks - POM updated to Flink v.1.4.2 --- pom.xml | 2 +- .../apache/flink/api/java/io/neo4j/Neo4jFormatBase.java | 4 ++-- .../apache/flink/api/java/io/neo4j/Neo4jInputFormat.java | 7 ++++++- .../apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index cddb434..d10f5cf 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ 3.0 2.19 - 1.1.3 + 1.4.2 1.19 4.12 2.3.7 diff --git a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jFormatBase.java b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jFormatBase.java index 561052f..12ffbda 100644 --- a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jFormatBase.java +++ b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jFormatBase.java @@ -198,10 +198,10 @@ public T setReadTimeout(int readTimeout) { * Validates mandatory arguments. */ protected void validate() { - if (restURI.isEmpty() || restURI == null) { + if (restURI == null || restURI.isEmpty()) { throw new IllegalArgumentException("No Rest URI was supplied."); } - if (query.isEmpty() || query == null) { + if (query == null || query.isEmpty()) { throw new IllegalArgumentException("No Cypher statement was supplied."); } } diff --git a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jInputFormat.java b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jInputFormat.java index 400ea7e..ebb904a 100644 --- a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jInputFormat.java +++ b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jInputFormat.java @@ -115,7 +115,12 @@ private void readFields(OUT reuse, JsonNode fieldValues) throws IOException { for (int i = 0; i < fieldValues.size(); i++) { JsonNode fieldValue = fieldValues.get(i); if (fieldValue.isNull()) { - reuse.setField(NullValue.getInstance(), i); + if (reuse.getField(i).getClass().equals(String.class)) { + reuse.setField(fieldValue.getTextValue(), i); + } else { + reuse.setField(NullValue.getInstance(), i); + } + reuse.setField(NullValue.getInstance(), i); } else if (fieldValue.isBoolean()) { reuse.setField(fieldValue.getBooleanValue(), i); } else if (fieldValue.isInt()) { diff --git a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java index 8c283c2..ea906fb 100644 --- a/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java +++ b/src/main/java/org/apache/flink/api/java/io/neo4j/Neo4jOutputFormat.java @@ -275,7 +275,7 @@ public Builder addParameterKey(String key) { * @return builder */ public Builder addParameterKey(int position, String key) { - checkArgument(!(key.isEmpty() || key == null), "Key must not be null or empty."); + checkArgument(!(key == null || key.isEmpty()), "Key must not be null or empty."); elementKeys.add(position, key); return getThis(); } From cb5a53193a9ad664879a7d59f2a9f0770596a061 Mon Sep 17 00:00:00 2001 From: Jan Kalkan Date: Fri, 20 Apr 2018 15:50:30 +0200 Subject: [PATCH 3/3] - java version added in POM --- pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pom.xml b/pom.xml index d10f5cf..f96ae9d 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,10 @@ 1.19 4.12 2.3.7 + 1.8 + + ${java.version} + ${java.version} log4j-test.properties