Added format xls
[s] Review Squashed
vikasrathee-cs authored and psainics committed Feb 8, 2024
1 parent 923e603 commit 5ae5ae1
Showing 15 changed files with 1,534 additions and 1 deletion.
6 changes: 6 additions & 0 deletions core-plugins/pom.xml
@@ -279,6 +279,12 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>io.cdap.plugin</groupId>
<artifactId>format-xls</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -74,6 +74,7 @@
import io.cdap.plugin.format.parquet.input.ParquetInputFormatProvider;
import io.cdap.plugin.format.parquet.output.ParquetOutputFormatProvider;
import io.cdap.plugin.format.text.input.TextInputFormatProvider;
import io.cdap.plugin.format.xls.input.XlsInputFormatProvider;
import io.cdap.plugin.transform.JavaScriptTransform;
import io.cdap.plugin.transform.ProjectionTransform;
import org.apache.avro.file.DataFileStream;
@@ -174,6 +175,8 @@ public static void setupTest() throws Exception {
Snappy.class);
addPluginArtifact(NamespaceId.DEFAULT.artifact("formats-text", "4.0.0"), DATAPIPELINE_ARTIFACT_ID,
ImmutableSet.of(TextInputFormatProvider.PLUGIN_CLASS), TextInputFormatProvider.class);
addPluginArtifact(NamespaceId.DEFAULT.artifact("formats-xls", "4.0.0"), DATAPIPELINE_ARTIFACT_ID,
ImmutableSet.of(XlsInputFormatProvider.PLUGIN_CLASS), XlsInputFormatProvider.class);
}

protected List<GenericRecord> readOutput(TimePartitionedFileSet fileSet, Schema schema) throws IOException {
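With the artifact registered, a test can exercise the new format through the File batch source. Below is a minimal sketch of such a stage, assuming the usual File-source plugin name and property keys (referenceName, path, format), none of which appear in this diff:

import com.google.common.collect.ImmutableMap;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.proto.v2.ETLPlugin;
import io.cdap.cdap.etl.proto.v2.ETLStage;

public class XlsSourceStageSketch {
  // Builds a source stage that reads Excel input via the newly registered format.
  static ETLStage xlsSourceStage(String inputPath) {
    return new ETLStage("source", new ETLPlugin(
        "File", BatchSource.PLUGIN_TYPE,
        ImmutableMap.of(
            "referenceName", "xlsInput", // hypothetical reference name
            "path", inputPath,
            "format", "xls")));
  }
}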
@@ -38,7 +38,8 @@ public enum FileFormat {
ORC(false, true),
PARQUET(true, true),
TEXT(true, false),
TSV(true, true);
TSV(true, true),
XLS(true, false);
private final boolean canRead;
private final boolean canWrite;

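The constructor and accessors backing canRead/canWrite sit outside this hunk; the following is a self-contained sketch of the presumed pattern, showing why XLS can be read but, unlike TSV, not written:

// Sketch only: the actual FileFormat constructor/accessors are not shown in this hunk.
public enum FileFormatSketch {
  TSV(true, true),
  XLS(true, false);

  private final boolean canRead;
  private final boolean canWrite;

  FileFormatSketch(boolean canRead, boolean canWrite) {
    this.canRead = canRead;
    this.canWrite = canWrite;
  }

  public boolean canRead() {
    return canRead;
  }

  public boolean canWrite() {
    return canWrite;
  }
}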
99 changes: 99 additions & 0 deletions format-xls/pom.xml
@@ -0,0 +1,99 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
~ Copyright © 2024 Cask Data, Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
-->

<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.cdap.plugin</groupId>
<artifactId>hydrator-plugins</artifactId>
<version>2.13.0-SNAPSHOT</version>
</parent>
<artifactId>format-xls</artifactId>
<name>XLS format plugins</name>
<packaging>jar</packaging>
<properties>
<poi.version>5.2.4</poi.version>
<log4j-core.version>2.20.0</log4j-core.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>compile</scope>
<version>${log4j-core.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-etl-api</artifactId>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-formats</artifactId>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>hydrator-test</artifactId>
</dependency>
<dependency>
<groupId>io.cdap.plugin</groupId>
<artifactId>format-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<configuration>
<instructions>
<_exportcontents>
io.cdap.plugin.format.xls.*
</_exportcontents>
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
<Embed-Directory>lib</Embed-Directory>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>io.cdap</groupId>
<artifactId>cdap-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
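The poi and poi-ooxml artifacts do the actual spreadsheet parsing that this module wraps and re-exports via the bundle plugin. A standalone sketch of the POI surface involved (the file name is hypothetical):

import java.io.File;
import java.io.IOException;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DataFormatter;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;

public class PoiReadSketch {
  public static void main(String[] args) throws IOException {
    // WorkbookFactory picks HSSF (.xls) or XSSF (.xlsx) from the file contents.
    try (Workbook workbook = WorkbookFactory.create(new File("sample.xlsx"))) {
      Sheet sheet = workbook.getSheetAt(0);
      DataFormatter formatter = new DataFormatter();
      for (Row row : sheet) {
        for (Cell cell : row) {
          // Renders each cell the way Excel would display it, regardless of cell type.
          System.out.print(formatter.formatCellValue(cell) + "\t");
        }
        System.out.println();
      }
    }
  }
}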
180 changes: 180 additions & 0 deletions format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java
@@ -0,0 +1,180 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.format.xls.input;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.format.input.PathTrackingInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.poi.ss.usermodel.FormulaEvaluator;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;

import java.io.IOException;
import javax.annotation.Nullable;


/**
* {@link XlsInputFormat} is a {@link TextInputFormat} implementation for reading Excel files.
* <p>
* The {@link XlsInputFormat.XlsRecordReader} reads a given sheet, and within a sheet reads
* all columns and all rows.
*/
public class XlsInputFormat extends PathTrackingInputFormat {

public static final String SHEET_NUM = "Sheet Number";
public static final String SHEET_VALUE = "sheetValue";
public static final String NAME_SKIP_HEADER = "skipHeader";
public static final String TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow";

@Override
protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
FileSplit split, TaskAttemptContext context, @Nullable String pathField,
@Nullable Schema schema) throws IOException {
Configuration jobConf = context.getConfiguration();
boolean skipFirstRow = jobConf.getBoolean(NAME_SKIP_HEADER, false);
boolean terminateIfEmptyRow = jobConf.getBoolean(TERMINATE_IF_EMPTY_ROW, false);
Schema outputSchema = schema != null ? Schema.parseJson(context.getConfiguration().get("schema")) : null;
String sheet = jobConf.get(SHEET_NUM);
String sheetValue = jobConf.get(SHEET_VALUE, "0");
return new XlsRecordReader(sheet, sheetValue, outputSchema, terminateIfEmptyRow, skipFirstRow);
}

/**
* Reads an Excel sheet, where each row is a {@link StructuredRecord} and each cell is a field in the record.
*/
public static class XlsRecordReader extends RecordReader<NullWritable, StructuredRecord.Builder> {
// Converter for converting xls row to structured record
XlsRowConverter rowConverter;
FormulaEvaluator formulaEvaluator;
// Builder for building structured record
private StructuredRecord.Builder valueBuilder;
private Sheet workSheet;
// InputStream handler for Excel files.
private FSDataInputStream fileIn;
// Specifies the row index.
private int rowIndex;
// Specifies last row num.
private int lastRowNum;
private boolean isRowNull;
private final String sheet;
private final String sheetValue;
private final Schema outputSchema;
private final boolean terminateIfEmptyRow;
private final boolean skipFirstRow;

/**
* Constructor for XlsRecordReader.
*/
public XlsRecordReader(String sheet, String sheetValue, Schema outputSchema, boolean terminateIfEmptyRow,
boolean skipFirstRow) {
this.sheet = sheet;
this.sheetValue = sheetValue;
this.outputSchema = outputSchema;
this.terminateIfEmptyRow = terminateIfEmptyRow;
this.skipFirstRow = skipFirstRow;
}

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {

if (!(split instanceof FileSplit)) {
// should never happen
throw new IllegalStateException("Input split is not a FileSplit.");
}
FileSplit fileSplit = (FileSplit) split;
Configuration jobConf = context.getConfiguration();
// Path of input file.
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(jobConf);
fileIn = fs.open(file);

try (Workbook workbook = WorkbookFactory.create(fileIn)) {
formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
formulaEvaluator.setIgnoreMissingWorkbooks(true);
// Check if user wants to access with name or number
if (sheet.equals(XlsInputFormatConfig.SHEET_NUMBER)) {
workSheet = workbook.getSheetAt(Integer.parseInt(sheetValue));
} else {
workSheet = workbook.getSheet(sheetValue);
}
rowConverter = new XlsRowConverter(formulaEvaluator);
} catch (Exception e) {
throw new IOException("Exception while reading excel sheet. " + e.getMessage(), e);
}

if (workSheet == null) {
throw new IOException("Sheet " + sheetValue + " does not exist in the workbook.");
}
lastRowNum = workSheet.getLastRowNum();
isRowNull = false;
rowIndex = skipFirstRow ? 1 : 0;
valueBuilder = StructuredRecord.builder(outputSchema);
}

@Override
public boolean nextKeyValue() {
// If any of these conditions holds, stop processing.
if (rowIndex > lastRowNum || lastRowNum == -1 || (isRowNull && terminateIfEmptyRow)) {
return false;
}
// Get the next row.
Row row = workSheet.getRow(rowIndex);
valueBuilder = rowConverter.convert(row, outputSchema);
if (row == null || valueBuilder == null) {
// A null result means the row is missing or every field is null, so treat it
// as an empty row and fall back to a builder with no fields set.
isRowNull = true;
valueBuilder = StructuredRecord.builder(outputSchema);
}
rowIndex++;

// Stop processing if the row is null and terminateIfEmptyRow is true.
return !isRowNull || !terminateIfEmptyRow;
}

@Override
public float getProgress() {
return (float) rowIndex / lastRowNum;
}

@Override
public void close() throws IOException {
if (fileIn != null) {
fileIn.close();
}
}

@Override
public NullWritable getCurrentKey() {
return NullWritable.get();
}

@Override
public StructuredRecord.Builder getCurrentValue() {
return valueBuilder;
}
}
}
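At runtime the MapReduce framework instantiates and drives this reader; its behavior is controlled by the four configuration keys declared at the top of the class. A sketch of wiring them onto a Hadoop job, assuming the value of XlsInputFormatConfig.SHEET_NUMBER (a class not included in this diff) is the literal string "Sheet Number":

import io.cdap.plugin.format.xls.input.XlsInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class XlsJobSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Select the sheet by index rather than by name; the value of
    // XlsInputFormatConfig.SHEET_NUMBER is assumed to be "Sheet Number".
    conf.set(XlsInputFormat.SHEET_NUM, "Sheet Number");
    conf.set(XlsInputFormat.SHEET_VALUE, "0");
    conf.setBoolean(XlsInputFormat.NAME_SKIP_HEADER, true);
    conf.setBoolean(XlsInputFormat.TERMINATE_IF_EMPTY_ROW, false);
    // createRecordReader() also reads the record schema from the "schema" key when one is set.
    Job job = Job.getInstance(conf, "xls-read-sketch");
    job.setInputFormatClass(XlsInputFormat.class);
  }
}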