Skip to content

Commit b3e5809

Browse files
committed
Allow different rdf-syntax in flowfile
1 parent 65ac30a commit b3e5809

File tree

1 file changed

+19
-3
lines changed

1 file changed

+19
-3
lines changed

nifi-sparql-integrate-processors/src/main/java/org/aksw/sparql_integrate/processors/sparql_integrate/SparqlIntegrateProcessor.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.jena.rdfconnection.RDFConnectionFactory;
5454
import org.apache.jena.riot.Lang;
5555
import org.apache.jena.riot.RDFDataMgr;
56+
import org.apache.jena.riot.RDFLanguages;
5657
import org.apache.jena.riot.out.SinkQuadOutput;
5758
import org.apache.jena.riot.out.SinkTripleOutput;
5859
import org.apache.jena.shared.PrefixMapping;
@@ -100,6 +101,10 @@ public interface FLOW_FILE_CONTENTS {
100101
public static final AllowableValue NON_RDF_DATA = new AllowableValue(FLOW_FILE_CONTENTS.NON_RDF_DATA);
101102
public static final AllowableValue EMPTY = new AllowableValue(FLOW_FILE_CONTENTS.EMPTY);
102103

104+
public static final AllowableValue TURTLE = new AllowableValue(Lang.TURTLE.getLabel());
105+
public static final AllowableValue NT = new AllowableValue(Lang.NT.getLabel());
106+
public static final AllowableValue JSONLD = new AllowableValue(Lang.JSONLD.getLabel());
107+
103108
public static final PropertyDescriptor BASE_URI = new PropertyDescriptor.Builder()
104109
.name("BASE_URI")
105110
.displayName("Base URI")
@@ -116,9 +121,18 @@ public interface FLOW_FILE_CONTENTS {
116121
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
117122
.build();
118123

124+
public static final PropertyDescriptor RDF_DATA_INPUT_SYNTAX = new PropertyDescriptor.Builder()
125+
.name("RDF_DATA_INPUT_SYNTAX")
126+
.displayName("RDF Data Input Syntax")
127+
.description("RDF-Syntax of the FlowFile content, only used when Content of flow file is set to rdf-data")
128+
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
129+
.allowableValues(TURTLE, NT, JSONLD)
130+
.defaultValue(TURTLE.getValue())
131+
.build();
132+
119133
public static final PropertyDescriptor CONTENT_FLOW_FILE = new PropertyDescriptor.Builder()
120-
.name("Content of FlowFile")
121-
.displayName("Content of the processors input FlowFile")
134+
.name("CONTENT_FLOW_FILE")
135+
.displayName("Content of FlowFile")
122136
.description("Content of the processors input FlowFile")
123137
.required(true)
124138
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@@ -138,6 +152,7 @@ protected void init(final ProcessorInitializationContext context) {
138152
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
139153
descriptors.add(BASE_URI);
140154
descriptors.add(CONTENT_FLOW_FILE);
155+
descriptors.add(RDF_DATA_INPUT_SYNTAX);
141156
descriptors.add(SPARQL_QUERY_PROPERTY);
142157
this.descriptors = Collections.unmodifiableList(descriptors);
143158

@@ -165,6 +180,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
165180

166181
final String contentFlowFile = context.getProperty(CONTENT_FLOW_FILE).getValue();
167182
final String baseUri = context.getProperty(BASE_URI).getValue();
183+
final String rdfDataInputSyntax = context.getProperty(RDF_DATA_INPUT_SYNTAX).getValue();
168184
final AtomicReference<Stream<SparqlStmt>> stmts = new AtomicReference<>();
169185

170186
FlowFile flowFile = session.get();
@@ -193,7 +209,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
193209
session.read(flowFile, new InputStreamCallback() {
194210
@Override
195211
public void process(InputStream in) throws IOException {
196-
RDFDataMgr.read(dataset, in, baseUri, Lang.TURTLE);
212+
RDFDataMgr.read(dataset, in, baseUri, RDFLanguages.nameToLang(rdfDataInputSyntax));
197213
}
198214
});
199215
break;

0 commit comments

Comments
 (0)