Skip to content

Commit 54a87d6

Browse files
committed
Add VFS capability for file functions
1 parent c2d3273 commit 54a87d6

19 files changed

+1649
-1391
lines changed

component/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@
8989
<groupId>org.apache.commons</groupId>
9090
<artifactId>commons-compress</artifactId>
9191
</dependency>
92+
<dependency>
93+
<groupId>io.siddhi.extension.execution.list</groupId>
94+
<artifactId>siddhi-execution-list</artifactId>
95+
</dependency>
9296
</dependencies>
9397
<profiles>
9498
<profile>

component/src/main/java/io/siddhi/extension/execution/file/FileArchiveExtension.java

Lines changed: 86 additions & 144 deletions
Large diffs are not rendered by default.

component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java

Lines changed: 106 additions & 130 deletions
Large diffs are not rendered by default.

component/src/main/java/io/siddhi/extension/execution/file/FileCreateExtension.java

Lines changed: 31 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -20,43 +20,23 @@
2020
import io.siddhi.annotation.Example;
2121
import io.siddhi.annotation.Extension;
2222
import io.siddhi.annotation.Parameter;
23-
import io.siddhi.annotation.ReturnAttribute;
2423
import io.siddhi.annotation.util.DataType;
2524
import io.siddhi.core.config.SiddhiQueryContext;
26-
import io.siddhi.core.event.ComplexEventChunk;
27-
import io.siddhi.core.event.stream.MetaStreamEvent;
28-
import io.siddhi.core.event.stream.StreamEvent;
29-
import io.siddhi.core.event.stream.StreamEventCloner;
30-
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
31-
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
3225
import io.siddhi.core.exception.SiddhiAppRuntimeException;
3326
import io.siddhi.core.executor.ExpressionExecutor;
3427
import io.siddhi.core.query.processor.ProcessingMode;
35-
import io.siddhi.core.query.processor.Processor;
36-
import io.siddhi.core.query.processor.stream.StreamProcessor;
28+
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
3729
import io.siddhi.core.util.config.ConfigReader;
38-
import io.siddhi.core.util.snapshot.state.State;
3930
import io.siddhi.core.util.snapshot.state.StateFactory;
40-
import io.siddhi.extension.io.file.util.Constants;
41-
import io.siddhi.extension.io.file.util.VFSClientConnectorCallback;
31+
import io.siddhi.extension.util.Utils;
4232
import io.siddhi.query.api.definition.AbstractDefinition;
4333
import io.siddhi.query.api.definition.Attribute;
44-
import io.siddhi.query.api.exception.SiddhiAppValidationException;
34+
import org.apache.commons.vfs2.FileObject;
35+
import org.apache.commons.vfs2.FileSystemException;
4536
import org.apache.log4j.Logger;
46-
import org.wso2.carbon.messaging.BinaryCarbonMessage;
47-
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
48-
import org.wso2.transport.file.connector.sender.VFSClientConnector;
4937

50-
import java.nio.ByteBuffer;
51-
import java.nio.charset.StandardCharsets;
5238
import java.util.ArrayList;
53-
import java.util.HashMap;
5439
import java.util.List;
55-
import java.util.Map;
56-
57-
import static io.siddhi.extension.io.file.util.Constants.WAIT_TILL_DONE;
58-
import static io.siddhi.extension.util.Constant.CREATE;
59-
import static io.siddhi.extension.util.Constant.CREATE_FOLDER;
6040

6141
/**
6242
* This extension can be used to create a file or a folder.
@@ -77,13 +57,6 @@
7757
type = DataType.STRING
7858
)
7959
},
80-
returnAttributes = {
81-
@ReturnAttribute(
82-
name = "isSuccess",
83-
description = "Status of the file or the directory creation.",
84-
type = DataType.BOOL
85-
)
86-
},
8760
examples = {
8861
@Example(
8962
syntax = "from CreateFileStream#file:create('/User/wso2/source/test.txt', false)",
@@ -95,89 +68,48 @@
9568
)
9669
}
9770
)
98-
public class FileCreateExtension extends StreamProcessor<State> {
71+
public class FileCreateExtension extends StreamFunctionProcessor {
9972
private static final Logger log = Logger.getLogger(FileCreateExtension.class);
10073

10174
@Override
102-
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
103-
StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater,
104-
State state) {
105-
while (streamEventChunk.hasNext()) {
106-
StreamEvent streamEvent = streamEventChunk.next();
107-
String fileSourcePath = (String) attributeExpressionExecutors[0].execute(streamEvent);
108-
boolean isDirectory = (Boolean) attributeExpressionExecutors[1].execute(streamEvent);
109-
VFSClientConnector vfsClientConnector = new VFSClientConnector();
110-
VFSClientConnectorCallback vfsClientConnectorCallback = new VFSClientConnectorCallback();
111-
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(
112-
fileSourcePath.getBytes(StandardCharsets.UTF_8)), true);
113-
Map<String, String> properties = new HashMap<>();
114-
properties.put(Constants.URI, fileSourcePath);
115-
properties.put(Constants.ACTION, CREATE);
116-
if (isDirectory) {
117-
properties.put(CREATE_FOLDER, "true");
118-
} else {
119-
properties.put(CREATE_FOLDER, "false");
120-
}
121-
try {
122-
vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, properties);
123-
vfsClientConnectorCallback.waitTillDone(WAIT_TILL_DONE, fileSourcePath);
124-
} catch (ClientConnectorException e) {
125-
throw new SiddhiAppRuntimeException("Failure occurred in vfs-client while creating the file " +
126-
fileSourcePath, e);
127-
} catch (InterruptedException e) {
128-
throw new SiddhiAppRuntimeException("Failed to get callback from vfs-client for file " +
129-
fileSourcePath, e);
130-
}
131-
}
132-
133-
}
134-
135-
@Override
136-
protected StateFactory<State> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition,
137-
ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
138-
StreamEventClonerHolder streamEventClonerHolder,
139-
boolean outputExpectsExpiredEvents, boolean findToBeExecuted,
140-
SiddhiQueryContext siddhiQueryContext) {
141-
if (attributeExpressionExecutors.length == 2) {
142-
if (attributeExpressionExecutors[0] == null) {
143-
throw new SiddhiAppValidationException("Invalid input given to uri (first argument) " +
144-
"file:create() function. Argument cannot be null");
145-
}
146-
Attribute.Type firstAttributeType = attributeExpressionExecutors[0].getReturnType();
147-
if (!(firstAttributeType == Attribute.Type.STRING)) {
148-
throw new SiddhiAppValidationException("Invalid parameter type found for uri " +
149-
"(first argument) of file:create() function, required " + Attribute.Type.STRING +
150-
" but found " + firstAttributeType.toString());
151-
}
152-
if (attributeExpressionExecutors[1] == null) {
153-
throw new SiddhiAppValidationException("Invalid input given to is.directory (second argument) " +
154-
"file:create() function. Argument cannot be null");
155-
}
156-
Attribute.Type secondAttributeType = attributeExpressionExecutors[1].getReturnType();
157-
if (!(secondAttributeType == Attribute.Type.BOOL)) {
158-
throw new SiddhiAppValidationException("Invalid parameter type found for is.directory " +
159-
"(second argument) of file:create() function, required " + Attribute.Type.BOOL +
160-
" but found " + firstAttributeType.toString());
161-
}
162-
} else {
163-
throw new SiddhiAppValidationException("Invalid no of arguments passed to file:archive() function, "
164-
+ "required 2, but found " + attributeExpressionExecutors.length);
165-
}
75+
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
76+
ConfigReader configReader, boolean outputExpectsExpiredEvents,
77+
SiddhiQueryContext siddhiQueryContext) {
16678
return null;
16779
}
16880

16981
@Override
17082
public List<Attribute> getReturnAttributes() {
171-
List<Attribute> attributes = new ArrayList<>();
172-
attributes.add(new Attribute("isSuccess", Attribute.Type.BOOL));
173-
return attributes;
83+
return new ArrayList<>();
17484
}
17585

17686
@Override
17787
public ProcessingMode getProcessingMode() {
17888
return ProcessingMode.BATCH;
17989
}
18090

91+
@Override
92+
protected Object[] process(Object[] data) {
93+
String fileSourcePath = (String) data[0];
94+
boolean isDirectory = (Boolean) data[1];
95+
FileObject rootFileObject = Utils.getFileObject(fileSourcePath);
96+
try {
97+
if (isDirectory) {
98+
rootFileObject.createFolder();
99+
} else {
100+
rootFileObject.createFile();
101+
}
102+
} catch (FileSystemException e) {
103+
throw new SiddhiAppRuntimeException("Failure occurred when creating the file " + fileSourcePath, e);
104+
}
105+
return new Object[0];
106+
}
107+
108+
@Override
109+
protected Object[] process(Object data) {
110+
return new Object[0];
111+
}
112+
181113
@Override
182114
public void start() {
183115

component/src/main/java/io/siddhi/extension/execution/file/FileDeleteExtension.java

Lines changed: 27 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -20,42 +20,24 @@
2020
import io.siddhi.annotation.Example;
2121
import io.siddhi.annotation.Extension;
2222
import io.siddhi.annotation.Parameter;
23-
import io.siddhi.annotation.ReturnAttribute;
2423
import io.siddhi.annotation.util.DataType;
2524
import io.siddhi.core.config.SiddhiQueryContext;
26-
import io.siddhi.core.event.ComplexEventChunk;
27-
import io.siddhi.core.event.stream.MetaStreamEvent;
28-
import io.siddhi.core.event.stream.StreamEvent;
29-
import io.siddhi.core.event.stream.StreamEventCloner;
30-
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
31-
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
3225
import io.siddhi.core.exception.SiddhiAppRuntimeException;
3326
import io.siddhi.core.executor.ExpressionExecutor;
3427
import io.siddhi.core.query.processor.ProcessingMode;
35-
import io.siddhi.core.query.processor.Processor;
36-
import io.siddhi.core.query.processor.stream.StreamProcessor;
28+
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
3729
import io.siddhi.core.util.config.ConfigReader;
38-
import io.siddhi.core.util.snapshot.state.State;
3930
import io.siddhi.core.util.snapshot.state.StateFactory;
40-
import io.siddhi.extension.io.file.util.Constants;
41-
import io.siddhi.extension.io.file.util.VFSClientConnectorCallback;
31+
import io.siddhi.extension.util.Utils;
4232
import io.siddhi.query.api.definition.AbstractDefinition;
4333
import io.siddhi.query.api.definition.Attribute;
44-
import io.siddhi.query.api.exception.SiddhiAppValidationException;
34+
import org.apache.commons.vfs2.FileObject;
35+
import org.apache.commons.vfs2.FileSystemException;
36+
import org.apache.commons.vfs2.Selectors;
4537
import org.apache.log4j.Logger;
46-
import org.wso2.carbon.messaging.BinaryCarbonMessage;
47-
import org.wso2.carbon.messaging.exceptions.ClientConnectorException;
48-
import org.wso2.transport.file.connector.sender.VFSClientConnector;
4938

50-
import java.nio.ByteBuffer;
51-
import java.nio.charset.StandardCharsets;
5239
import java.util.ArrayList;
53-
import java.util.HashMap;
5440
import java.util.List;
55-
import java.util.Map;
56-
57-
import static io.siddhi.extension.io.file.util.Constants.WAIT_TILL_DONE;
58-
import static io.siddhi.query.api.definition.Attribute.Type.BOOL;
5941

6042
/**
6143
* This extension can be used to delete a file or a folder.
@@ -71,13 +53,6 @@
7153
type = DataType.STRING
7254
)
7355
},
74-
returnAttributes = {
75-
@ReturnAttribute(
76-
name = "isSuccess",
77-
description = "Success of the file deletion.",
78-
type = DataType.BOOL
79-
)
80-
},
8156
examples = {
8257
@Example(
8358
syntax = "from DeleteFileStream#file:delete('/User/wso2/source/test.txt')",
@@ -89,72 +64,43 @@
8964
)
9065
}
9166
)
92-
public class FileDeleteExtension extends StreamProcessor<State> {
67+
public class FileDeleteExtension extends StreamFunctionProcessor {
9368
private static final Logger log = Logger.getLogger(FileDeleteExtension.class);
9469

9570
@Override
96-
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
97-
StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater,
98-
State state) {
99-
while (streamEventChunk.hasNext()) {
100-
StreamEvent streamEvent = streamEventChunk.next();
101-
String fileDeletePathUri = (String) attributeExpressionExecutors[0].execute(streamEvent);
102-
VFSClientConnector vfsClientConnector = new VFSClientConnector();
103-
VFSClientConnectorCallback vfsClientConnectorCallback = new VFSClientConnectorCallback();
104-
BinaryCarbonMessage carbonMessage = new BinaryCarbonMessage(ByteBuffer.wrap(
105-
fileDeletePathUri.getBytes(StandardCharsets.UTF_8)), true);
106-
Map<String, String> properties = new HashMap<>();
107-
properties.put(Constants.URI, fileDeletePathUri);
108-
properties.put(Constants.ACTION, Constants.DELETE);
109-
try {
110-
vfsClientConnector.send(carbonMessage, vfsClientConnectorCallback, properties);
111-
vfsClientConnectorCallback.waitTillDone(WAIT_TILL_DONE, fileDeletePathUri);
112-
} catch (ClientConnectorException e) {
113-
throw new SiddhiAppRuntimeException("Failure occurred in vfs-client while deleting the file " +
114-
fileDeletePathUri, e);
115-
} catch (InterruptedException e) {
116-
throw new SiddhiAppRuntimeException("Failed to get callback from vfs-client for file " +
117-
fileDeletePathUri, e);
118-
}
119-
}
120-
}
121-
122-
@Override
123-
protected StateFactory<State> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition,
124-
ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader,
125-
StreamEventClonerHolder streamEventClonerHolder,
126-
boolean outputExpectsExpiredEvents, boolean findToBeExecuted,
127-
SiddhiQueryContext siddhiQueryContext) {
128-
if (attributeExpressionExecutors.length == 1) {
129-
if (attributeExpressionExecutors[0] == null) {
130-
throw new SiddhiAppValidationException("Invalid input given to uri (first argument) " +
131-
"file:delete() function. Argument cannot be null");
132-
}
133-
Attribute.Type firstAttributeType = attributeExpressionExecutors[0].getReturnType();
134-
if (!(firstAttributeType == Attribute.Type.STRING)) {
135-
throw new SiddhiAppValidationException("Invalid parameter type found for uri " +
136-
"(first argument) of file:delete() function, required " + Attribute.Type.STRING +
137-
" but found " + firstAttributeType.toString());
138-
}
139-
} else {
140-
throw new SiddhiAppValidationException("Invalid no of arguments passed to file:copy() function, "
141-
+ "required 1, but found " + attributeExpressionExecutors.length);
142-
}
71+
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
72+
ConfigReader configReader, boolean outputExpectsExpiredEvents,
73+
SiddhiQueryContext siddhiQueryContext) {
14374
return null;
14475
}
14576

14677
@Override
14778
public List<Attribute> getReturnAttributes() {
148-
List<Attribute> attributes = new ArrayList<>();
149-
attributes.add(new Attribute("isSuccess", BOOL));
150-
return attributes;
79+
return new ArrayList<>();
15180
}
15281

15382
@Override
15483
public ProcessingMode getProcessingMode() {
15584
return ProcessingMode.BATCH;
15685
}
15786

87+
@Override
88+
protected Object[] process(Object[] data) {
89+
return new Object[0];
90+
}
91+
92+
@Override
93+
protected Object[] process(Object data) {
94+
String fileDeletePathUri = (String) data;
95+
try {
96+
FileObject rootFileObject = Utils.getFileObject(fileDeletePathUri);
97+
rootFileObject.delete(Selectors.SELECT_ALL);
98+
} catch (FileSystemException e) {
99+
throw new SiddhiAppRuntimeException("Failure occurred when deleting the file " + fileDeletePathUri, e);
100+
}
101+
return new Object[0];
102+
}
103+
158104
@Override
159105
public void start() {
160106

0 commit comments

Comments
 (0)