Skip to content

Commit

Permalink
changes done for adding streaming support in XLSX module.
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasrathee-cs committed Oct 8, 2024
1 parent 2cc85a5 commit c2afeee
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 63 deletions.
26 changes: 22 additions & 4 deletions format-xls/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<name>XLS format plugins</name>
<packaging>jar</packaging>
<properties>
<poi.version>5.2.4</poi.version>
<poi.version>5.2.5</poi.version>
<log4j-core.version>2.20.0</log4j-core.version>
</properties>

Expand All @@ -45,7 +45,7 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>compile</scope>
<version>${log4j-core.version}</version>
<version>${log4j-core.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
Expand All @@ -64,12 +64,30 @@
<artifactId>format-common</artifactId>
<version>${project.version}</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.0</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>excel-streaming-reader</artifactId>
<version>4.2.1</version>
</dependency>
<dependency>
<groupId>com.github.pjfanning</groupId>
<artifactId>poi-shared-strings</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.cdap.plugin.format.xls.input;


import com.github.pjfanning.xlsx.StreamingReader;
import com.google.common.base.Preconditions;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.format.input.PathTrackingInputFormat;
Expand All @@ -25,17 +28,23 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.poi.EmptyFileException;
import org.apache.poi.poifs.filesystem.FileMagic;
import org.apache.poi.ss.usermodel.FormulaEvaluator;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;
import org.apache.poi.util.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import javax.annotation.Nullable;


Expand All @@ -51,11 +60,12 @@ public class XlsInputFormat extends PathTrackingInputFormat {
public static final String SHEET_VALUE = "sheetValue";
public static final String NAME_SKIP_HEADER = "skipHeader";
public static final String TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow";
protected static final int EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT = Integer.MAX_VALUE / 2;

@Override
protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
FileSplit split, TaskAttemptContext context, @Nullable String pathField,
@Nullable Schema schema) throws IOException {
FileSplit split, TaskAttemptContext context, @Nullable String pathField,
@Nullable Schema schema) throws IOException {
Configuration jobConf = context.getConfiguration();
boolean skipFirstRow = jobConf.getBoolean(NAME_SKIP_HEADER, false);
boolean terminateIfEmptyRow = jobConf.getBoolean(TERMINATE_IF_EMPTY_ROW, false);
Expand All @@ -65,6 +75,10 @@ protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReade
return new XlsRecordReader(sheet, sheetValue, outputSchema, terminateIfEmptyRow, skipFirstRow);
}

public boolean isSplitable(JobContext context, Path file) {
return false;
}

/**
* Reads Excel sheet, where each row is a {@link StructuredRecord} and each cell is a field in the record.
*/
Expand All @@ -74,11 +88,7 @@ public static class XlsRecordReader extends RecordReader<NullWritable, Structure
FormulaEvaluator formulaEvaluator;
// Builder for building structured record
private StructuredRecord.Builder valueBuilder;
private Sheet workSheet;
// InputStream handler for Excel files.
private FSDataInputStream fileIn;
// Specifies the row index.
private int rowIndex;
// Specifies last row num.
private int lastRowNum;
private boolean isRowNull;
Expand All @@ -87,6 +97,11 @@ public static class XlsRecordReader extends RecordReader<NullWritable, Structure
private final Schema outputSchema;
private final boolean terminateIfEmptyRow;
private final boolean skipFirstRow;
private int rowCount;
private Iterator<Row> rows;
// Specifies the row index.
private long rowIdx;


/**
* Constructor for XlsRecordReader.
Expand All @@ -113,10 +128,35 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(jobConf);
fileIn = fs.open(file);
Sheet workSheet;
Workbook workbook;
boolean isStreaming = false;
try {
// Use Magic Bytes to detect the file type
InputStream is = FileMagic.prepareToCheckMagic(fileIn);
byte[] emptyFileCheck = new byte[1];
is.mark(emptyFileCheck.length);
if (is.read(emptyFileCheck) < emptyFileCheck.length) {
throw new EmptyFileException();
}
is.reset();

final FileMagic fm = FileMagic.valueOf(is);
switch (fm) {
case OOXML:
workbook = StreamingReader.builder().rowCacheSize(10).open(is);
isStreaming = true;
break;
case OLE2:
IOUtils.setByteArrayMaxOverride(EXCEL_BYTE_ARRAY_MAX_OVERRIDE_DEFAULT);
workbook = WorkbookFactory.create(is);
formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
formulaEvaluator.setIgnoreMissingWorkbooks(true);
break;
default:
throw new IOException("Can't open workbook - unsupported file type: " + fm);
}

try (Workbook workbook = WorkbookFactory.create(fileIn)) {
formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
formulaEvaluator.setIgnoreMissingWorkbooks(true);
// Check if user wants to access with name or number
if (sheet.equals(XlsInputFormatConfig.SHEET_NUMBER)) {
workSheet = workbook.getSheetAt(Integer.parseInt(sheetValue));
Expand All @@ -127,37 +167,43 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
} catch (Exception e) {
throw new IOException("Exception while reading excel sheet. " + e.getMessage(), e);
}

// As we cannot get the number of rows in a sheet while streaming.
// -1 is used as rowCount to indicate that all rows should be read.
rowCount = isStreaming ? -1 : workSheet.getPhysicalNumberOfRows();
lastRowNum = workSheet.getLastRowNum();
rows = workSheet.iterator();
isRowNull = false;
rowIndex = skipFirstRow ? 1 : 0;
rowIdx = 0;
valueBuilder = StructuredRecord.builder(outputSchema);
if (skipFirstRow) {
Preconditions.checkArgument(rows.hasNext(), "No rows found on sheet %s", sheetValue);
rowIdx = 1;
rows.next();
}
}

@Override
public boolean nextKeyValue() {
// If any is true, then we stop processing.
if (rowIndex > lastRowNum || lastRowNum == -1 || (isRowNull && terminateIfEmptyRow)) {
if (!rows.hasNext() || rowCount == 0 || (isRowNull && terminateIfEmptyRow)) {
return false;
}
// Get the next row.
Row row = workSheet.getRow(rowIndex);
Row row = rows.next();
valueBuilder = rowConverter.convert(row, outputSchema);
if (row == null || valueBuilder == null) {
isRowNull = true;
// set valueBuilder to a new builder with all fields set to null
valueBuilder = StructuredRecord.builder(outputSchema);
}
// if all fields are null, then the row is null
rowIndex++;

rowIdx++;
// Stop processing if the row is null and terminateIfEmptyRow is true.
return !isRowNull || !terminateIfEmptyRow;
}

@Override
public float getProgress() {
return (float) rowIndex / lastRowNum;
return (float) rowIdx / lastRowNum;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
package io.cdap.plugin.format.xls.input;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.plugin.PluginPropertyField;
import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.format.input.PathTrackingConfig;

import java.util.Collections;
Expand All @@ -37,7 +33,6 @@
*/
public class XlsInputFormatConfig extends PathTrackingConfig {
public static final String SHEET_NUMBER = "Sheet Number";
private static final String NAME_OVERRIDE = "override";
private static final String NAME_SHEET = "sheet";
public static final String NAME_SHEET_VALUE = "sheetValue";
private static final String NAME_SKIP_HEADER = "skipHeader";
Expand All @@ -53,18 +48,18 @@ public class XlsInputFormatConfig extends PathTrackingConfig {
"Can be either sheet name or sheet no; for example: 'Sheet1' or '0' in case user selects 'Sheet Name' or " +
"'Sheet Number' as 'sheet' input respectively. Sheet number starts with 0. Default is 'Sheet Number' 0.";
public static final String DESC_TERMINATE_ROW = "Specify whether to stop reading after " +
"encountering the first empty row. Defaults to false.";
"encountering the first empty row. Defaults to false.";
public static final Map<String, PluginPropertyField> XLS_FIELDS;

static {
Map<String, PluginPropertyField> fields = new HashMap<>(FIELDS);
fields.put(NAME_SKIP_HEADER,
new PluginPropertyField(NAME_SKIP_HEADER, DESC_SKIP_HEADER, "boolean", false, true));
new PluginPropertyField(NAME_SKIP_HEADER, DESC_SKIP_HEADER, "boolean", false, true));
// Add fields specific for excel format handling.
fields.put(NAME_SHEET, new PluginPropertyField(NAME_SHEET, DESC_SHEET, "string", false, true));
fields.put(NAME_SHEET_VALUE, new PluginPropertyField(NAME_SHEET_VALUE, DESC_SHEET_VALUE, "string", false, true));
fields.put(NAME_TERMINATE_IF_EMPTY_ROW, new PluginPropertyField(
NAME_TERMINATE_IF_EMPTY_ROW, DESC_TERMINATE_ROW, "boolean", false, true));
NAME_TERMINATE_IF_EMPTY_ROW, DESC_TERMINATE_ROW, "boolean", false, true));
XLS_FIELDS = Collections.unmodifiableMap(fields);
}

Expand Down Expand Up @@ -102,7 +97,7 @@ public XlsInputFormatConfig(@Nullable String schema, @Nullable String sheet, @Nu
super();
this.schema = schema;
this.sheet = sheet;
this.sheetValue = sheetValue;
this.sheetValue = sheetValue;
this.skipHeader = skipHeader;
this.terminateIfEmptyRow = terminateIfEmptyRow;
}
Expand Down Expand Up @@ -173,7 +168,7 @@ public Builder setTerminateIfEmptyRow(Boolean terminateIfEmptyRow) {
}

public XlsInputFormatConfig build() {
return new XlsInputFormatConfig(schema, sheet, sheetValue, skipHeader, terminateIfEmptyRow);
return new XlsInputFormatConfig(schema, sheet, sheetValue, skipHeader, terminateIfEmptyRow);
}
}

Expand Down
Loading

0 comments on commit c2afeee

Please sign in to comment.