Skip to content

Commit 15e34d0

Browse files
authored
Merge pull request #72 from ramindu90/master
Add functions for file manipulation
2 parents 72aa449 + f3d1e06 commit 15e34d0

30 files changed

+4422
-11
lines changed

component/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@
8585
<artifactId>testng</artifactId>
8686
<scope>test</scope>
8787
</dependency>
88+
<dependency>
89+
<groupId>org.apache.commons</groupId>
90+
<artifactId>commons-compress</artifactId>
91+
</dependency>
92+
<dependency>
93+
<groupId>io.siddhi.extension.execution.list</groupId>
94+
<artifactId>siddhi-execution-list</artifactId>
95+
</dependency>
8896
</dependencies>
8997
<profiles>
9098
<profile>
Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
/*
2+
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
3+
*
4+
* WSO2 Inc. licenses this file to you under the Apache License,
5+
* Version 2.0 (the "License"); you may not use this file except
6+
* in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing,
12+
* software distributed under the License is distributed on an
13+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
* KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations
16+
* under the License.
17+
*/
18+
package io.siddhi.extension.execution.file;
19+
20+
import io.siddhi.annotation.Example;
21+
import io.siddhi.annotation.Extension;
22+
import io.siddhi.annotation.Parameter;
23+
import io.siddhi.annotation.ParameterOverload;
24+
import io.siddhi.annotation.util.DataType;
25+
import io.siddhi.core.config.SiddhiQueryContext;
26+
import io.siddhi.core.exception.SiddhiAppRuntimeException;
27+
import io.siddhi.core.executor.ConstantExpressionExecutor;
28+
import io.siddhi.core.executor.ExpressionExecutor;
29+
import io.siddhi.core.query.processor.ProcessingMode;
30+
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
31+
import io.siddhi.core.util.config.ConfigReader;
32+
import io.siddhi.core.util.snapshot.state.StateFactory;
33+
import io.siddhi.query.api.definition.AbstractDefinition;
34+
import io.siddhi.query.api.definition.Attribute;
35+
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
36+
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
37+
import org.apache.commons.io.IOUtils;
38+
import org.apache.log4j.Logger;
39+
40+
import java.io.BufferedInputStream;
41+
import java.io.File;
42+
import java.io.FileInputStream;
43+
import java.io.FileOutputStream;
44+
import java.io.IOException;
45+
import java.util.ArrayList;
46+
import java.util.List;
47+
import java.util.regex.Pattern;
48+
import java.util.zip.ZipEntry;
49+
import java.util.zip.ZipOutputStream;
50+
51+
import static io.siddhi.extension.util.Constant.TAR_FILE_EXTENSION;
52+
import static io.siddhi.extension.util.Constant.ZIP_FILE_EXTENSION;
53+
54+
/**
55+
* This extension can be used to archive files.
56+
*/
57+
@Extension(
58+
name = "archive",
59+
namespace = "file",
60+
description = "Archives files and folders as a zip or in tar format that are available in the given file " +
61+
"uri.\n",
62+
parameters = {
63+
@Parameter(
64+
name = "uri",
65+
description = "Absolute path of the file or the directory",
66+
type = DataType.STRING
67+
),
68+
@Parameter(
69+
name = "destination.dir.uri",
70+
description = "Absolute directory path of the the archived file.",
71+
type = DataType.STRING
72+
),
73+
@Parameter(
74+
name = "archive.type",
75+
description = "Archive type can be zip or tar",
76+
type = DataType.STRING,
77+
optional = true,
78+
defaultValue = "zip"
79+
),
80+
@Parameter(
81+
name = "include.by.regexp",
82+
description = "Only the files matching the patterns will be archived.\n" +
83+
"Note: Add an empty string to match all files",
84+
type = DataType.STRING,
85+
optional = true,
86+
defaultValue = "<Empty_String>"
87+
),
88+
@Parameter(
89+
name = "exclude.subdirectories",
90+
description = "This flag is used to exclude the subdirectories and its files without " +
91+
"archiving.",
92+
type = DataType.BOOL,
93+
defaultValue = "false",
94+
optional = true
95+
),
96+
},
97+
parameterOverloads = {
98+
@ParameterOverload(
99+
parameterNames = {"uri", "destination.dir.uri"}
100+
),
101+
@ParameterOverload(
102+
parameterNames = {"uri", "destination.dir.uri", "archive.type"}
103+
),
104+
@ParameterOverload(
105+
parameterNames = {"uri", "destination.dir.uri", "archive.type", "include.by.regexp"}
106+
),
107+
@ParameterOverload(
108+
parameterNames = {"uri", "destination.dir.uri", "archive.type", "include.by.regexp",
109+
"exclude.subdirectories"}
110+
)
111+
},
112+
examples = {
113+
@Example(
114+
syntax = "InputStream#file:archive('/User/wso2/to_be_archived', " +
115+
"'/User/wso2/archive_destination/file.zip')",
116+
description = "Archives to_be_archived folder in zip format and stores archive_destination " +
117+
"folder as file.zip."
118+
),
119+
@Example(
120+
syntax = "InputStream#file:archive('/User/wso2/to_be_archived', " +
121+
"'/User/wso2/archive_destination/file', 'tar')",
122+
description = "Archives to_be_archived folder in tar format and stores in " +
123+
"archive_destination folder as file.tar."
124+
),
125+
@Example(
126+
syntax = "InputStream#file:archive('/User/wso2/to_be_archived', " +
127+
"'/User/wso2/archive_destination/file', 'tar', '.*test3.txt$')",
128+
description = "Archives files which adheres to '.*test3.txt$' regex in to_be_archived " +
129+
"folder in tar format and stores in archive_destination folder as file.tar."
130+
),
131+
@Example(
132+
syntax = "InputStream#file:archive('/User/wso2/to_be_archived', " +
133+
"'/User/wso2/archive_destination/file', '', '', 'false')",
134+
description = "Archives to_be_archived folder excluding the sub-folders in zip format and " +
135+
"stores in archive_destination folder as file.tar."
136+
)
137+
}
138+
)
139+
public class FileArchiveExtension extends StreamFunctionProcessor {
140+
private static final Logger log = Logger.getLogger(FileArchiveExtension.class);
141+
private Pattern pattern = null;
142+
private int inputExecutorLength;
143+
144+
@Override
145+
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
146+
ConfigReader configReader, boolean outputExpectsExpiredEvents,
147+
SiddhiQueryContext siddhiQueryContext) {
148+
inputExecutorLength = attributeExpressionExecutors.length;
149+
if (attributeExpressionExecutors.length >= 4 &&
150+
attributeExpressionExecutors[3] instanceof ConstantExpressionExecutor) {
151+
pattern = Pattern.compile(
152+
((ConstantExpressionExecutor) attributeExpressionExecutors[3]).getValue().toString());
153+
}
154+
return null;
155+
}
156+
157+
@Override
158+
public List<Attribute> getReturnAttributes() {
159+
return new ArrayList<>();
160+
}
161+
162+
@Override
163+
public ProcessingMode getProcessingMode() {
164+
return ProcessingMode.BATCH;
165+
}
166+
167+
@Override
168+
protected Object[] process(Object[] data) {
169+
String uri = (String) data[0];
170+
String destinationDirUri = (String) data[1];
171+
boolean excludeSubdirectories = false;
172+
String regex = "";
173+
String archiveType = ZIP_FILE_EXTENSION;
174+
if (inputExecutorLength >= 3) {
175+
archiveType = (String) data[2];
176+
}
177+
if (inputExecutorLength >= 4) {
178+
regex = (String) data[3];
179+
}
180+
if (inputExecutorLength == 5) {
181+
excludeSubdirectories = (Boolean) data[4];
182+
}
183+
if (pattern == null) {
184+
pattern = Pattern.compile(regex);
185+
}
186+
if (archiveType.compareToIgnoreCase(ZIP_FILE_EXTENSION) == 0) {
187+
File sourceFile = new File(uri);
188+
List<String> fileList = new ArrayList<>();
189+
generateFileList(uri, sourceFile, fileList, excludeSubdirectories);
190+
try {
191+
zip(uri, destinationDirUri, fileList);
192+
} catch (IOException e) {
193+
throw new SiddhiAppRuntimeException("IOException occurred when archiving " + uri, e);
194+
}
195+
} else {
196+
try {
197+
if (archiveType.compareToIgnoreCase(TAR_FILE_EXTENSION) == 0) {
198+
addToTarArchiveCompression(
199+
getTarArchiveOutputStream(destinationDirUri), new File(uri), uri);
200+
} else {
201+
throw new SiddhiAppRuntimeException("Unsupported archive type: " + archiveType);
202+
}
203+
} catch (IOException e) {
204+
throw new SiddhiAppRuntimeException("Exception occurred when archiving " + uri, e);
205+
}
206+
}
207+
return new Object[0];
208+
}
209+
210+
@Override
211+
protected Object[] process(Object data) {
212+
return new Object[0];
213+
}
214+
215+
@Override
216+
public void start() {
217+
218+
}
219+
220+
@Override
221+
public void stop() {
222+
223+
}
224+
225+
/**
226+
* Zip it
227+
*
228+
* @param zipFile output ZIP file location
229+
*/
230+
private void zip(String sourceFileUri, String zipFile, List<String> fileList) throws IOException {
231+
byte[] buffer = new byte[1024];
232+
FileInputStream in = null;
233+
ZipOutputStream zos = null;
234+
FileOutputStream fos = null;
235+
try {
236+
if (!zipFile.endsWith(TAR_FILE_EXTENSION)) {
237+
zipFile = zipFile.concat("." + ZIP_FILE_EXTENSION);
238+
}
239+
fos = new FileOutputStream(zipFile);
240+
zos = new ZipOutputStream(fos);
241+
if (log.isDebugEnabled()) {
242+
log.debug("Output to Zip : " + zipFile + " started for folder/ file: " + sourceFileUri);
243+
}
244+
for (String file : fileList) {
245+
if (log.isDebugEnabled()) {
246+
log.debug("File Adding : " + file + " to " + zipFile + ".");
247+
}
248+
ZipEntry ze = new ZipEntry(file);
249+
zos.putNextEntry(ze);
250+
in = new FileInputStream(sourceFileUri + File.separator + file);
251+
int len;
252+
while ((len = in.read(buffer)) > 0) {
253+
zos.write(buffer, 0, len);
254+
}
255+
in.close();
256+
}
257+
zos.closeEntry();
258+
if (log.isDebugEnabled()) {
259+
log.debug("Output to Zip : " + zipFile + " is complete for folder/ file: " + sourceFileUri);
260+
}
261+
} finally {
262+
if (in != null) {
263+
try {
264+
in.close();
265+
} catch (IOException e) {
266+
log.error("IO exception occurred when closing zip input stream for file path: " + sourceFileUri);
267+
}
268+
}
269+
if (zos != null) {
270+
try {
271+
zos.close();
272+
} catch (IOException e) {
273+
log.error("IO exception occurred when closing zip output stream for file path: " + sourceFileUri);
274+
}
275+
}
276+
if (fos != null) {
277+
try {
278+
fos.close();
279+
} catch (IOException e) {
280+
log.error("IO exception occurred when closing file output stream for file path: " + sourceFileUri);
281+
}
282+
}
283+
}
284+
}
285+
286+
private void addToTarArchiveCompression(TarArchiveOutputStream out, File file, String sourceFileUri)
287+
throws IOException {
288+
if (file.isFile()) {
289+
String entryName = generateZipEntry(file.getAbsoluteFile().toString(), sourceFileUri);
290+
TarArchiveEntry entry = new TarArchiveEntry(file, entryName);
291+
out.putArchiveEntry(entry);
292+
FileInputStream fin = new FileInputStream(file);
293+
BufferedInputStream bis = new BufferedInputStream(fin);
294+
IOUtils.copy(bis, out);
295+
out.closeArchiveEntry();
296+
} else if (file.isDirectory()) {
297+
File[] children = file.listFiles();
298+
if (children != null) {
299+
for (File child : children) {
300+
addToTarArchiveCompression(out, child, sourceFileUri);
301+
}
302+
}
303+
} else {
304+
log.error(file.getName() + " is not supported for archiving. Archiving process continues..");
305+
}
306+
}
307+
308+
private static TarArchiveOutputStream getTarArchiveOutputStream(String name) throws IOException {
309+
if (!name.endsWith(TAR_FILE_EXTENSION)) {
310+
name = name.concat("." + TAR_FILE_EXTENSION);
311+
}
312+
TarArchiveOutputStream taos = new TarArchiveOutputStream(new FileOutputStream(name));
313+
// TAR has an 8 gig file limit by default, this gets around that
314+
taos.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_STAR);
315+
// TAR originally didn't support long file names, so enable the support for it
316+
taos.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
317+
taos.setAddPaxHeadersForNonAsciiNames(true);
318+
return taos;
319+
}
320+
321+
/**
322+
* Traverse a directory and get all files,
323+
* and add the file into fileList
324+
*
325+
* @param node file or directory
326+
*/
327+
private void generateFileList(String sourceFileUri, File node, List<String> fileList,
328+
boolean excludeSubdirectories) {
329+
//add file only
330+
if (node.isFile() && pattern.matcher(node.getName()).lookingAt()) {
331+
fileList.add(generateZipEntry(node.getAbsoluteFile().toString(), sourceFileUri));
332+
}
333+
if (node.isDirectory() && !excludeSubdirectories) {
334+
String[] subNote = node.list();
335+
if (subNote != null) {
336+
for (String filename : subNote) {
337+
generateFileList(sourceFileUri, new File(node, filename), fileList, false);
338+
}
339+
}
340+
}
341+
}
342+
343+
/**
344+
* Format the file path for zip
345+
*
346+
* @param file file path
347+
* @return Formatted file path
348+
*/
349+
private String generateZipEntry(String file, String sourceFileUri) {
350+
return file.substring(sourceFileUri.length());
351+
}
352+
}

0 commit comments

Comments
 (0)