readOutput(TimePartitionedFileSet fileSet, Schema schema) throws IOException {
diff --git a/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java b/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java
index b291d34e8..cc1cacec5 100644
--- a/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java
+++ b/format-common/src/main/java/io/cdap/plugin/format/FileFormat.java
@@ -38,7 +38,8 @@ public enum FileFormat {
ORC(false, true),
PARQUET(true, true),
TEXT(true, false),
- TSV(true, true);
+ TSV(true, true),
+ XLS(true, false);
private final boolean canRead;
private final boolean canWrite;
diff --git a/format-xls/pom.xml b/format-xls/pom.xml
new file mode 100644
index 000000000..77358cfdb
--- /dev/null
+++ b/format-xls/pom.xml
@@ -0,0 +1,99 @@
+
+
+
+ 4.0.0
+
+ io.cdap.plugin
+ hydrator-plugins
+ 2.13.0-SNAPSHOT
+
+ format-xls
+ XLS format plugins
+ jar
+
+ 5.2.4
+ 2.20.0
+
+
+
+
+ org.apache.poi
+ poi
+ ${poi.version}
+
+
+ org.apache.poi
+ poi-ooxml
+ ${poi.version}
+
+
+ org.apache.logging.log4j
+ log4j-core
+ compile
+ ${log4j-core.version}
+
+
+ io.cdap.cdap
+ cdap-etl-api
+
+
+ io.cdap.cdap
+ cdap-formats
+
+
+ io.cdap.cdap
+ hydrator-test
+
+
+ io.cdap.plugin
+ format-common
+ ${project.version}
+
+
+
+
+ junit
+ junit
+
+
+
+
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+
+
+ <_exportcontents>
+ io.cdap.plugin.format.xls.*
+
+ *;inline=false;scope=compile
+ true
+ lib
+
+
+
+
+ io.cdap
+ cdap-maven-plugin
+
+
+
+
+
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java
new file mode 100644
index 000000000..0b79787f6
--- /dev/null
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormat.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.format.input.PathTrackingInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.poi.ss.usermodel.FormulaEvaluator;
+import org.apache.poi.ss.usermodel.Row;
+import org.apache.poi.ss.usermodel.Sheet;
+import org.apache.poi.ss.usermodel.Workbook;
+import org.apache.poi.ss.usermodel.WorkbookFactory;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+
+/**
+ * Input format that reads Excel workbooks and emits one {@link StructuredRecord.Builder} per sheet row.
+ * It fills the role for Excel files that {@link TextInputFormat} fills for plain-text files.
+ *
+ * The {@link XlsInputFormat.XlsRecordReader} reads a single configured sheet, and within that sheet reads
+ * every row from the first data row up to the last row number reported by the sheet.
+ */
+public class XlsInputFormat extends PathTrackingInputFormat {
+
+  // Hadoop configuration keys shared between the provider (writer) and this input format (reader).
+  public static final String SHEET_NUM = "Sheet Number";
+  public static final String SHEET_VALUE = "sheetValue";
+  public static final String NAME_SKIP_HEADER = "skipHeader";
+  public static final String TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow";
+
+  /**
+   * Creates the record reader for one Excel file, configured from the job configuration.
+   *
+   * @param split the file split to read
+   * @param context the task context whose configuration carries the format properties
+   * @param pathField unused by this format
+   * @param schema when non-null, the output schema is parsed from the "schema" configuration entry
+   * @return a reader producing one record builder per row
+   */
+  @Override
+  protected RecordReader<NullWritable, StructuredRecord.Builder> createRecordReader(
+    FileSplit split, TaskAttemptContext context, @Nullable String pathField,
+    @Nullable Schema schema) throws IOException {
+    Configuration jobConf = context.getConfiguration();
+    boolean skipFirstRow = jobConf.getBoolean(NAME_SKIP_HEADER, false);
+    boolean terminateIfEmptyRow = jobConf.getBoolean(TERMINATE_IF_EMPTY_ROW, false);
+    Schema outputSchema = schema != null ? Schema.parseJson(jobConf.get("schema")) : null;
+    // Default to selecting the sheet by number, matching XlsInputFormatConfig#getSheet().
+    String sheet = jobConf.get(SHEET_NUM, XlsInputFormatConfig.SHEET_NUMBER);
+    String sheetValue = jobConf.get(SHEET_VALUE, "0");
+    return new XlsRecordReader(sheet, sheetValue, outputSchema, terminateIfEmptyRow, skipFirstRow);
+  }
+
+  /**
+   * Reads Excel sheet, where each row is a {@link StructuredRecord} and each cell is a field in the record.
+   */
+  public static class XlsRecordReader extends RecordReader<NullWritable, StructuredRecord.Builder> {
+    // Converter for converting xls row to structured record
+    XlsRowConverter rowConverter;
+    FormulaEvaluator formulaEvaluator;
+    // Builder for building structured record
+    private StructuredRecord.Builder valueBuilder;
+    private Sheet workSheet;
+    // InputStream handler for Excel files.
+    private FSDataInputStream fileIn;
+    // Index of the next row to read.
+    private int rowIndex;
+    // Last row number of the sheet, as reported by Sheet#getLastRowNum().
+    private int lastRowNum;
+    // Set once a missing/empty row has been seen; never reset afterwards.
+    private boolean isRowNull;
+    private final String sheet;
+    private final String sheetValue;
+    private final Schema outputSchema;
+    private final boolean terminateIfEmptyRow;
+    private final boolean skipFirstRow;
+
+    /**
+     * Constructor for XlsRecordReader.
+     *
+     * @param sheet how the sheet is selected; {@link XlsInputFormatConfig#SHEET_NUMBER} selects by index,
+     *              any other value selects by name
+     * @param sheetValue the sheet index (as a string) or the sheet name
+     * @param outputSchema schema of the records to build
+     * @param terminateIfEmptyRow whether reading stops at the first empty row
+     * @param skipFirstRow whether row 0 is skipped
+     */
+    public XlsRecordReader(String sheet, String sheetValue, Schema outputSchema, boolean terminateIfEmptyRow,
+                           boolean skipFirstRow) {
+      this.sheet = sheet;
+      this.sheetValue = sheetValue;
+      this.outputSchema = outputSchema;
+      this.terminateIfEmptyRow = terminateIfEmptyRow;
+      this.skipFirstRow = skipFirstRow;
+    }
+
+    /**
+     * Opens the file of the split, loads the workbook and selects the configured sheet.
+     *
+     * @throws IllegalStateException if the split is not a {@link FileSplit}
+     * @throws IOException if the workbook cannot be read or the requested sheet does not exist
+     */
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
+
+      if (!(split instanceof FileSplit)) {
+        // should never happen
+        throw new IllegalStateException("Input split is not a FileSplit.");
+      }
+      FileSplit fileSplit = (FileSplit) split;
+      Configuration jobConf = context.getConfiguration();
+      // Path of input file.
+      Path file = fileSplit.getPath();
+      FileSystem fs = file.getFileSystem(jobConf);
+      fileIn = fs.open(file);
+
+      // NOTE(review): the sheet and the evaluator are used after try-with-resources has closed the workbook;
+      // this presumably relies on POI keeping the parsed sheet in memory - confirm.
+      try (Workbook workbook = WorkbookFactory.create(fileIn)) {
+        formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
+        formulaEvaluator.setIgnoreMissingWorkbooks(true);
+        // Check if user wants to access with name or number (constant first, so a null 'sheet' cannot NPE)
+        if (XlsInputFormatConfig.SHEET_NUMBER.equals(sheet)) {
+          workSheet = workbook.getSheetAt(Integer.parseInt(sheetValue));
+        } else {
+          workSheet = workbook.getSheet(sheetValue);
+        }
+        rowConverter = new XlsRowConverter(formulaEvaluator);
+      } catch (Exception e) {
+        IOException failure = new IOException("Exception while reading excel sheet. " + e.getMessage(), e);
+        // Do not leak the stream when the reader cannot be initialized.
+        closeAfterFailure(failure);
+        throw failure;
+      }
+
+      if (workSheet == null) {
+        // Fail with a clear message instead of a NullPointerException below.
+        IOException failure = new IOException(
+          String.format("Sheet '%s' does not exist in file '%s'.", sheetValue, file));
+        closeAfterFailure(failure);
+        throw failure;
+      }
+
+      lastRowNum = workSheet.getLastRowNum();
+      isRowNull = false;
+      rowIndex = skipFirstRow ? 1 : 0;
+      valueBuilder = StructuredRecord.builder(outputSchema);
+    }
+
+    /**
+     * Closes the input stream after a failed initialization, attaching any close error to the failure.
+     */
+    private void closeAfterFailure(IOException failure) {
+      try {
+        fileIn.close();
+      } catch (IOException closeFailure) {
+        failure.addSuppressed(closeFailure);
+      }
+    }
+
+    /**
+     * Advances to the next row.
+     *
+     * @return false once all rows are consumed, or when an empty row is hit and terminateIfEmptyRow is set
+     */
+    @Override
+    public boolean nextKeyValue() {
+      // If any is true, then we stop processing.
+      if (rowIndex > lastRowNum || lastRowNum == -1 || (isRowNull && terminateIfEmptyRow)) {
+        return false;
+      }
+      // Get the next row.
+      Row row = workSheet.getRow(rowIndex);
+      valueBuilder = rowConverter.convert(row, outputSchema);
+      if (row == null || valueBuilder == null) {
+        isRowNull = true;
+        // Emit a fresh builder with no fields set for the empty row.
+        valueBuilder = StructuredRecord.builder(outputSchema);
+      }
+      rowIndex++;
+
+      // Stop processing if the row is null and terminateIfEmptyRow is true.
+      return !isRowNull || !terminateIfEmptyRow;
+    }
+
+    /**
+     * Returns the fraction of rows consumed so far, always within [0, 1].
+     */
+    @Override
+    public float getProgress() {
+      if (lastRowNum < 0) {
+        // No rows to read: nothing is left to do.
+        return 1.0f;
+      }
+      // Row numbers are 0-based, so the sheet spans (lastRowNum + 1) rows; this also avoids dividing by zero.
+      return Math.min(1.0f, (float) rowIndex / (lastRowNum + 1));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (fileIn != null) {
+        fileIn.close();
+      }
+    }
+
+    @Override
+    public NullWritable getCurrentKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public StructuredRecord.Builder getCurrentValue() {
+      return valueBuilder;
+    }
+  }
+}
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java
new file mode 100644
index 000000000..27ec7c343
--- /dev/null
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatConfig.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.lib.KeyValue;
+import io.cdap.cdap.api.plugin.PluginPropertyField;
+import io.cdap.plugin.common.KeyValueListParser;
+import io.cdap.plugin.format.input.PathTrackingConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Common config for Excel related formats.
+ *
+ * Adds the sheet selection, header skipping and empty-row termination properties on top of
+ * {@link PathTrackingConfig}.
+ */
+public class XlsInputFormatConfig extends PathTrackingConfig {
+  // Value of the 'sheet' property that selects a sheet by its index rather than by its name.
+  public static final String SHEET_NUMBER = "Sheet Number";
+  private static final String NAME_OVERRIDE = "override";
+  private static final String NAME_SHEET = "sheet";
+  public static final String NAME_SHEET_VALUE = "sheetValue";
+  private static final String NAME_SKIP_HEADER = "skipHeader";
+  private static final String NAME_TERMINATE_IF_EMPTY_ROW = "terminateIfEmptyRow";
+
+  // properties
+  public static final String NAME_SAMPLE_SIZE = "sampleSize";
+
+  public static final String DESC_SKIP_HEADER =
+    "Whether to skip the first line of each sheet. The default value is false.";
+  public static final String DESC_SHEET = "Select the sheet by name or number. Default is 'Sheet Number'.";
+  public static final String DESC_SHEET_VALUE = "Specifies the value corresponding to 'sheet' input. " +
+    "Can be either sheet name or sheet no; for example: 'Sheet1' or '0' in case user selects 'Sheet Name' or " +
+    "'Sheet Number' as 'sheet' input respectively. Sheet number starts with 0. Default is 'Sheet Number' 0.";
+  public static final String DESC_TERMINATE_ROW = "Specify whether to stop reading after " +
+    "encountering the first empty row. Defaults to false.";
+  // Property fields of the parent config plus the Excel specific ones; unmodifiable.
+  public static final Map<String, PluginPropertyField> XLS_FIELDS;
+
+  static {
+    Map<String, PluginPropertyField> fields = new HashMap<>(FIELDS);
+    fields.put(NAME_SKIP_HEADER,
+               new PluginPropertyField(NAME_SKIP_HEADER, DESC_SKIP_HEADER, "boolean", false, true));
+    // Add fields specific for excel format handling.
+    fields.put(NAME_SHEET, new PluginPropertyField(NAME_SHEET, DESC_SHEET, "string", false, true));
+    fields.put(NAME_SHEET_VALUE,
+               new PluginPropertyField(NAME_SHEET_VALUE, DESC_SHEET_VALUE, "string", false, true));
+    fields.put(NAME_TERMINATE_IF_EMPTY_ROW, new PluginPropertyField(
+      NAME_TERMINATE_IF_EMPTY_ROW, DESC_TERMINATE_ROW, "boolean", false, true));
+    XLS_FIELDS = Collections.unmodifiableMap(fields);
+  }
+
+  @Macro
+  @Nullable
+  @Name(NAME_SHEET)
+  @Description(DESC_SHEET)
+  private String sheet;
+
+  @Macro
+  @Nullable
+  @Name(NAME_SHEET_VALUE)
+  @Description(DESC_SHEET_VALUE)
+  private String sheetValue;
+
+  @Macro
+  @Nullable
+  @Name(NAME_SKIP_HEADER)
+  @Description(DESC_SKIP_HEADER)
+  private Boolean skipHeader;
+
+  @Macro
+  @Nullable
+  @Name(NAME_TERMINATE_IF_EMPTY_ROW)
+  @Description(DESC_TERMINATE_ROW)
+  private Boolean terminateIfEmptyRow;
+
+  public XlsInputFormatConfig() {
+    super();
+  }
+
+  @VisibleForTesting
+  public XlsInputFormatConfig(@Nullable String schema, @Nullable String sheet, @Nullable String sheetValue,
+                              @Nullable Boolean skipHeader, @Nullable Boolean terminateIfEmptyRow) {
+    super();
+    this.schema = schema;
+    this.sheet = sheet;
+    this.sheetValue = sheetValue;
+    this.skipHeader = skipHeader;
+    this.terminateIfEmptyRow = terminateIfEmptyRow;
+  }
+
+  /**
+   * Returns the number of rows to sample, read from the 'sampleSize' property (default 1000).
+   *
+   * @throws IllegalArgumentException if the property is not a valid integer
+   */
+  public int getSampleSize() {
+    String sampleSize = getProperties().getProperties().getOrDefault(NAME_SAMPLE_SIZE, "1000");
+    try {
+      return Integer.parseInt(sampleSize);
+    } catch (NumberFormatException e) {
+      // Keep the parse failure as the cause so the root problem stays visible in stack traces.
+      throw new IllegalArgumentException(String.format("Invalid sample size '%s'.", sampleSize), e);
+    }
+  }
+
+  /**
+   * Returns how the sheet is selected; defaults to {@link #SHEET_NUMBER} when not configured.
+   */
+  public String getSheet() {
+    return sheet == null ? SHEET_NUMBER : sheet;
+  }
+
+  /**
+   * Returns the configured sheet name or number, or null when not configured.
+   */
+  @Nullable
+  public String getSheetValue() {
+    return sheetValue;
+  }
+
+  /**
+   * Returns whether the first row is skipped; false when not configured.
+   */
+  public boolean getSkipHeader() {
+    return skipHeader != null ? skipHeader : false;
+  }
+
+  /**
+   * Returns whether reading stops at the first empty row; false when not configured.
+   */
+  public boolean getTerminateIfEmptyRow() {
+    return terminateIfEmptyRow != null ? terminateIfEmptyRow : false;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for building a {@link XlsInputFormatConfig}.
+   */
+  public static class Builder {
+    private String schema;
+    private String sheet;
+    private String sheetValue;
+    private Boolean skipHeader;
+    private Boolean terminateIfEmptyRow;
+
+    public Builder setSchema(String schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    public Builder setSheet(String sheet) {
+      this.sheet = sheet;
+      return this;
+    }
+
+    public Builder setSheetValue(String sheetValue) {
+      this.sheetValue = sheetValue;
+      return this;
+    }
+
+    public Builder setSkipHeader(Boolean skipHeader) {
+      this.skipHeader = skipHeader;
+      return this;
+    }
+
+    public Builder setTerminateIfEmptyRow(Boolean terminateIfEmptyRow) {
+      this.terminateIfEmptyRow = terminateIfEmptyRow;
+      return this;
+    }
+
+    public XlsInputFormatConfig build() {
+      return new XlsInputFormatConfig(schema, sheet, sheetValue, skipHeader, terminateIfEmptyRow);
+    }
+  }
+
+}
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java
new file mode 100644
index 000000000..bedcb48a3
--- /dev/null
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatProvider.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.plugin.PluginClass;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.validation.FormatContext;
+import io.cdap.cdap.etl.api.validation.InputFile;
+import io.cdap.cdap.etl.api.validation.InputFiles;
+import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
+import io.cdap.plugin.format.input.PathTrackingConfig;
+import io.cdap.plugin.format.input.PathTrackingInputFormatProvider;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.DataFormatter;
+import org.apache.poi.ss.usermodel.FormulaEvaluator;
+import org.apache.poi.ss.usermodel.Row;
+import org.apache.poi.ss.usermodel.Sheet;
+import org.apache.poi.ss.usermodel.Workbook;
+import org.apache.poi.ss.usermodel.WorkbookFactory;
+import org.apache.poi.ss.util.CellReference;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+
+/**
+ * Reads XLS(X) into StructuredRecords.
+ */
+@Plugin(type = ValidatingInputFormat.PLUGIN_TYPE)
+@Name(XlsInputFormatProvider.NAME)
+@Description(XlsInputFormatProvider.DESC)
+public class XlsInputFormatProvider extends PathTrackingInputFormatProvider<XlsInputFormatConfig> {
+  static final String NAME = "xls";
+  static final String DESC = "Plugin for reading files in xls(x) format.";
+  public static final PluginClass PLUGIN_CLASS = PluginClass.builder()
+    .setType(ValidatingInputFormat.PLUGIN_TYPE)
+    .setName(NAME)
+    .setDescription(DESC)
+    .setClassName(XlsInputFormatProvider.class.getName())
+    .setConfigFieldName("conf")
+    .setProperties(XlsInputFormatConfig.XLS_FIELDS)
+    .build();
+  private final XlsInputFormatConfig conf;
+
+  public XlsInputFormatProvider(XlsInputFormatConfig conf) {
+    super(conf);
+    this.conf = conf;
+  }
+
+  @Override
+  public String getInputFormatClassName() {
+    return XlsInputFormat.class.getName();
+  }
+
+  /**
+   * Validates the sheet number (when the sheet is selected by number) and that a schema is available.
+   * Problems are reported through the failure collector of the context.
+   */
+  @Override
+  public void validate(FormatContext context) {
+    Schema schema = super.getSchema(context);
+    FailureCollector collector = context.getFailureCollector();
+    // When the sheet is specified by number, the sheet value must be a number
+    if (!conf.containsMacro(XlsInputFormatConfig.NAME_SHEET_VALUE) &&
+      conf.getSheet().equals(XlsInputFormatConfig.SHEET_NUMBER) &&
+      !Strings.isNullOrEmpty(conf.getSheetValue())) {
+      getSheetAsNumber(collector);
+    }
+    if (!conf.containsMacro(PathTrackingConfig.NAME_SCHEMA) && schema == null && context.getInputSchema() == null) {
+      collector.addFailure("XLS format cannot be used without specifying a schema.", "Schema must be specified.")
+        .withConfigProperty(PathTrackingConfig.NAME_SCHEMA);
+    }
+  }
+
+  /**
+   * Copies the Excel specific settings into the properties handed to {@link XlsInputFormat}.
+   */
+  @Override
+  protected void addFormatProperties(Map<String, String> properties) {
+    properties.put(XlsInputFormat.SHEET_NUM, conf.getSheet());
+    if (!Strings.isNullOrEmpty(conf.getSheetValue())) {
+      properties.put(XlsInputFormat.SHEET_VALUE, conf.getSheetValue());
+    }
+    properties.put(XlsInputFormat.NAME_SKIP_HEADER, String.valueOf(conf.getSkipHeader()));
+    properties.put(XlsInputFormat.TERMINATE_IF_EMPTY_ROW, String.valueOf(conf.getTerminateIfEmptyRow()));
+    // A minimum split size of Long.MAX_VALUE; presumably this keeps each workbook in a single split - confirm.
+    properties.put(FileInputFormat.SPLIT_MINSIZE, Long.toString(Long.MAX_VALUE));
+  }
+
+  /**
+   * Detects the schema from the first input file by sampling rows of the configured sheet.
+   *
+   * Column names come from row 0 when the header is skipped, otherwise from the spreadsheet column letters.
+   * Column types are inferred by {@link XlsInputFormatSchemaDetector}.
+   *
+   * @return the detected schema, or null when there is no input file or the sheet selection is invalid
+   *         (in which case a failure is added to the collector where the cause is known)
+   */
+  @Override
+  @Nullable
+  public Schema detectSchema(FormatContext context, InputFiles inputFiles) throws IOException {
+    String blankHeader = "BLANK";
+    FailureCollector failureCollector = context.getFailureCollector();
+    for (InputFile inputFile : inputFiles) {
+      DataFormatter formatter = new DataFormatter();
+      // Manage the stream explicitly so it is closed even if the workbook cannot be created.
+      try (java.io.InputStream inputStream = inputFile.open();
+           Workbook workbook = WorkbookFactory.create(inputStream)) {
+        FormulaEvaluator formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator();
+        formulaEvaluator.setIgnoreMissingWorkbooks(true);
+        Sheet workSheet;
+        // Check if user wants to access with name or number
+        if (XlsInputFormatConfig.SHEET_NUMBER.equals(conf.getSheet())) {
+          Integer sheetValue = getSheetAsNumber(failureCollector);
+          if (sheetValue == null) {
+            return null;
+          }
+          // getSheetAt throws for an out-of-range index; treat that as a missing sheet instead.
+          workSheet = sheetValue < workbook.getNumberOfSheets() ? workbook.getSheetAt(sheetValue) : null;
+        } else {
+          if (Strings.isNullOrEmpty(conf.getSheetValue())) {
+            failureCollector.addFailure("Sheet name must be specified.", null)
+              .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE);
+            return null;
+          }
+          workSheet = workbook.getSheet(conf.getSheetValue());
+        }
+
+        // If provided sheet does not exist, report a failure
+        if (workSheet == null) {
+          failureCollector.addFailure("Sheet " + conf.getSheetValue() + " does not exist in the workbook.",
+                                      "Specify a valid sheet.");
+          return null;
+        }
+
+        int sampleSize = conf.getSampleSize();
+        // Row numbers are 0 based in POI
+        int rowStart = Math.min(0, workSheet.getFirstRowNum());
+        int rowEnd = Math.min(sampleSize, workSheet.getLastRowNum());
+
+        int lastCellNumMax = 0;
+        List<String> columnNames = new ArrayList<>();
+        XlsInputFormatSchemaDetector schemaDetector = new XlsInputFormatSchemaDetector();
+        for (int rowIndex = rowStart; rowIndex <= rowEnd; rowIndex++) {
+          Row row = workSheet.getRow(rowIndex);
+          if (row == null) {
+            continue;
+          }
+          lastCellNumMax = Math.max(lastCellNumMax, row.getLastCellNum());
+
+          // Use the first row to get the column names
+          if (rowIndex == 0 && conf.getSkipHeader()) {
+            for (int cellIndex = 0; cellIndex < lastCellNumMax; cellIndex++) {
+              Cell cell = row.getCell(cellIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
+              columnNames.add(cell == null ? blankHeader : formatter.formatCellValue(cell, formulaEvaluator));
+            }
+            // Skip Header
+            continue;
+          }
+
+          for (int cellIndex = 0; cellIndex < lastCellNumMax; cellIndex++) {
+            Cell cell = row.getCell(cellIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
+            boolean isFirstRow = rowIndex == (conf.getSkipHeader() ? 1 : 0);
+            schemaDetector.reduceSchema(cellIndex, cell, isFirstRow);
+          }
+
+        }
+
+        // If some rows have more cells than the first row, add blank headers for the extra cells
+        if (lastCellNumMax > columnNames.size() && conf.getSkipHeader()) {
+          for (int i = columnNames.size(); i < lastCellNumMax; i++) {
+            columnNames.add(blankHeader);
+          }
+        }
+
+        // Set column names if header is not skipped
+        if (!conf.getSkipHeader()) {
+          for (int i = 0; i < lastCellNumMax; i++) {
+            columnNames.add(CellReference.convertNumToColString(i));
+          }
+        }
+
+        Schema schema = Schema.recordOf("xls", schemaDetector.getFields(
+          XlsInputFormatUtils.getSafeColumnNames(columnNames)));
+        // Only the first input file is used for detection.
+        return PathTrackingInputFormatProvider.addPathField(context.getFailureCollector(), schema, conf.getPathField());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Parses the configured sheet value as a 0-based sheet index.
+   *
+   * @return the index, or null when the value is empty, not a number, or negative; the latter two cases
+   *         also add a failure to the collector
+   */
+  @Nullable
+  private Integer getSheetAsNumber(FailureCollector failureCollector) {
+    if (!Strings.isNullOrEmpty(conf.getSheetValue())) {
+      try {
+        int sheetValue = Integer.parseInt(conf.getSheetValue());
+        if (sheetValue >= 0) {
+          return sheetValue;
+        }
+        // 0 is a valid sheet number, so the requirement is "non-negative" rather than "positive".
+        failureCollector.addFailure("Sheet number must be a non-negative number.", null)
+          .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE);
+      } catch (NumberFormatException e) {
+        failureCollector.addFailure("Sheet number must be a number.", null)
+          .withConfigProperty(XlsInputFormatConfig.NAME_SHEET_VALUE);
+      }
+    }
+    return null;
+  }
+}
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatSchemaDetector.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatSchemaDetector.java
new file mode 100644
index 000000000..3b7b4605f
--- /dev/null
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatSchemaDetector.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.CellType;
+import org.apache.poi.ss.usermodel.DateUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Detects the schema of an Excel file.
+ *
+ * Cells are fed one at a time through {@link #reduceSchema(int, Cell, boolean)}; the detector keeps, per
+ * column index, the narrowest type that fits every non-empty cell seen and whether the column is nullable.
+ */
+public class XlsInputFormatSchemaDetector {
+
+  // Column index -> type inferred so far from the non-empty cells of that column.
+  private final Map<Integer, Schema.Type> columnSchemaReducerMap = new HashMap<>();
+  // Column index -> whether an empty (or missing) cell has been seen for that column.
+  private final Map<Integer, Boolean> columnNullableMap = new HashMap<>();
+
+  /**
+   * Reduces the schema of the Excel file.
+   *
+   * @param columnIndex the column index of the cell
+   * @param cell the cell to reduce the schema from
+   * @param isFirstRow whether the cell is in the first row
+   */
+  public void reduceSchema(int columnIndex, Cell cell, boolean isFirstRow) {
+    boolean isCellEmpty = isCellEmpty(cell);
+
+    if (!columnNullableMap.containsKey(columnIndex)) {
+      // When we see the index for the first time and this is not the first row,
+      // we can assume that the column is nullable as the previous rows did not have a value for this column.
+      columnNullableMap.put(columnIndex, !isFirstRow);
+    }
+    // Pin the nullability of the column to true if the cell is empty
+    columnNullableMap.put(columnIndex, isCellEmpty || columnNullableMap.get(columnIndex));
+    if (isCellEmpty) {
+      return;
+    }
+    Schema.Type detected = columnSchemaReducerMap.get(columnIndex);
+    // First non-empty cell fixes the initial type; later cells can only widen it.
+    columnSchemaReducerMap.put(columnIndex,
+                               detected == null ? getSchemaType(cell) : reduceSchemaType(detected, cell));
+  }
+
+  /**
+   * Fills in defaults (nullable STRING) for columns for which no cell was ever reduced.
+   */
+  private void normalizeColumn(int numColumns) {
+    for (int i = 0; i < numColumns; i++) {
+      // set all nullability to true if not present
+      columnNullableMap.putIfAbsent(i, true);
+      // set all schema types to string if not present
+      columnSchemaReducerMap.putIfAbsent(i, Schema.Type.STRING);
+    }
+  }
+
+  /**
+   * Returns the schema of the Excel file.
+   *
+   * @param columnNames the column names of the Excel file
+   * @return the schema of the Excel file
+   */
+  public List<Schema.Field> getFields(List<String> columnNames) {
+    normalizeColumn(columnNames.size());
+    List<Schema.Field> fields = new ArrayList<>();
+    for (int i = 0; i < columnNames.size(); i++) {
+      String columnName = columnNames.get(i);
+      boolean isNullable = columnNullableMap.get(i);
+      Schema.Type schemaType = columnSchemaReducerMap.get(i);
+      Schema schema = isNullable ? Schema.nullableOf(Schema.of(schemaType)) : Schema.of(schemaType);
+      fields.add(Schema.Field.of(columnName, schema));
+    }
+    return fields;
+  }
+
+  /**
+   * A cell is empty when it is missing, blank, or a formula whose cached result is blank.
+   */
+  private static boolean isCellEmpty(Cell cell) {
+    if (cell != null && cell.getCellType() == CellType.FORMULA) {
+      return cell.getCachedFormulaResultType() == CellType.BLANK;
+    }
+    return cell == null || cell.getCellType() == CellType.BLANK;
+  }
+
+  /**
+   * Maps a single non-empty cell to a schema type, using the cached result type for formula cells.
+   */
+  private static Schema.Type getSchemaType(Cell cell) {
+    CellType cellType = cell.getCellType() == CellType.FORMULA ?
+      cell.getCachedFormulaResultType() : cell.getCellType();
+    // Force Dates As String
+    if (cellType == CellType.NUMERIC && DateUtil.isCellDateFormatted(cell)) {
+      return Schema.Type.STRING;
+    }
+    // Mapping for XLS Cell Types to CDAP Schema Types
+    switch (cellType) {
+      case BOOLEAN:
+        return Schema.Type.BOOLEAN;
+      case NUMERIC:
+        return Schema.Type.DOUBLE;
+      default:
+        return Schema.Type.STRING;
+    }
+  }
+
+  /**
+   * Combines the type detected so far with the type of one more cell.
+   *
+   * STRING absorbs everything; BOOLEAN mixed with DOUBLE becomes DOUBLE. The cell is classified with
+   * {@link #getSchemaType(Cell)} so that date-formatted numeric cells count as STRING here too, which
+   * keeps the result independent of the order in which the rows are seen.
+   */
+  private static Schema.Type reduceSchemaType(Schema.Type detectedSchemaType, Cell cell) {
+    if (detectedSchemaType == Schema.Type.STRING) {
+      return Schema.Type.STRING;
+    }
+    Schema.Type cellSchemaType = getSchemaType(cell);
+    if (cellSchemaType == Schema.Type.STRING) {
+      return Schema.Type.STRING;
+    }
+    if (cellSchemaType == detectedSchemaType) {
+      return detectedSchemaType;
+    }
+    // One side is BOOLEAN and the other DOUBLE.
+    return Schema.Type.DOUBLE;
+  }
+}
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatUtils.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatUtils.java
new file mode 100644
index 000000000..bb3abc318
--- /dev/null
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsInputFormatUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Utilities around XLS input format.
+ */
+public class XlsInputFormatUtils {
+  private static final Pattern NOT_VALID_PATTERN = Pattern.compile("[^A-Za-z0-9_]+");
+
+  /**
+   * Cleans a list of column names to make sure they comply with avro field naming standard.
+   * It also makes sure each name is unique in the list.
+   * Field names can start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_].
+   *
+   * Steps:
+   * 1) Trim surrounding spaces
+   * 2) If its empty replace it with BLANK
+   * 3) If it starts with a number, prepend "col_"
+   * 4) Replace invalid characters with "_" (multiple invalid characters gets replaced with one symbol)
+   * 5) Check if the name has been found before (without considering case)
+   * if so add _# where # is the number of times seen before + 1
+   *
+   * @param columnNames the raw column names; elements must not be null
+   * @return a new list with one cleaned, unique name per input name, in the same order
+   */
+  public static List<String> getSafeColumnNames(List<String> columnNames) {
+    return cleanSchemaColumnNames(columnNames);
+  }
+
+  private static List<String> cleanSchemaColumnNames(List<String> columnNames) {
+    final String replacementChar = "_";
+    final List<String> cleanColumnNames = new ArrayList<>();
+    // Lower-cased clean name -> suffix number to use the next time that name shows up.
+    final Map<String, Integer> seenColumnNames = new HashMap<>();
+    for (String columnName : columnNames) {
+      StringBuilder cleanColumnNameBuilder = new StringBuilder();
+
+      // Remove any surrounding spaces
+      columnName = columnName.trim();
+
+      // If it's an empty string replace it with BLANK
+      if (columnName.isEmpty()) {
+        cleanColumnNameBuilder.append("BLANK");
+      } else if ((columnName.charAt(0) >= '0') && (columnName.charAt(0) <= '9')) {
+        // Prepend a col_ if the first character is a number
+        cleanColumnNameBuilder.append("col_");
+      }
+
+      // Replace all invalid characters with the replacement char
+      cleanColumnNameBuilder.append(NOT_VALID_PATTERN.matcher(columnName).replaceAll(replacementChar));
+
+      // Check if the field exist if so append and index at the end
+      // We use lowercase to match columns "A" and "a" to avoid issues with wrangler.
+      // Locale.ROOT keeps the folding independent of the JVM default locale (e.g. Turkish dotless i).
+      String cleanColumnName = cleanColumnNameBuilder.toString();
+      String lowerCaseCleanColumnName = cleanColumnName.toLowerCase(java.util.Locale.ROOT);
+      while (seenColumnNames.containsKey(lowerCaseCleanColumnName)) {
+        int nextSuffix = seenColumnNames.get(lowerCaseCleanColumnName);
+        cleanColumnNameBuilder.append(replacementChar).append(nextSuffix);
+        seenColumnNames.put(lowerCaseCleanColumnName, nextSuffix + 1);
+        cleanColumnName = cleanColumnNameBuilder.toString();
+        lowerCaseCleanColumnName = cleanColumnName.toLowerCase(java.util.Locale.ROOT);
+      }
+      // The first duplicate of this (possibly suffixed) name gets the suffix _2.
+      seenColumnNames.put(lowerCaseCleanColumnName, 2);
+
+      cleanColumnNames.add(cleanColumnName);
+    }
+    return cleanColumnNames;
+  }
+}
diff --git a/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java
new file mode 100644
index 000000000..251d6b36f
--- /dev/null
+++ b/format-xls/src/main/java/io/cdap/plugin/format/xls/input/XlsRowConverter.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.CellType;
+import org.apache.poi.ss.usermodel.DataFormatter;
+import org.apache.poi.ss.usermodel.DateUtil;
+import org.apache.poi.ss.usermodel.FormulaEvaluator;
+import org.apache.poi.ss.usermodel.Row;
+
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Converts a row of XLS cells to a StructuredRecord.
+ */
+public class XlsRowConverter {
+ private final FormulaEvaluator evaluator;
+ private static final DataFormatter dataFormatter = new DataFormatter();
+
+ XlsRowConverter(FormulaEvaluator evaluator) {
+ this.evaluator = evaluator;
+ }
+
+ /**
+ * Converts a row of XLS cells to a StructuredRecord.
+ * Returns null if the row is null or empty.
+ */
+ @Nullable
+ public StructuredRecord.Builder convert(Row row, Schema outputSchema) {
+ if (row == null) {
+ return null;
+ }
+ boolean isRowEmpty = true;
+ StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
+ List fields = outputSchema.getFields();
+ for (int cellIndex = 0; cellIndex < row.getLastCellNum() && cellIndex < fields.size(); cellIndex++) {
+ Cell cell = row.getCell(cellIndex, Row.MissingCellPolicy.RETURN_BLANK_AS_NULL);
+ if (cell == null) {
+ // Blank cells are skipped, builder will set null for the field, no processing needed.
+ continue;
+ }
+ Schema.Field field = fields.get(cellIndex);
+ Schema.Type type = field.getSchema().isNullable() ?
+ field.getSchema().getNonNullable().getType() : field.getSchema().getType();
+ Object cellValue;
+ switch (type) {
+ case STRING:
+ cellValue = getCellAsString(cell);
+ break;
+ case DOUBLE:
+ cellValue = getCellAsDouble(cell);
+ break;
+ case BOOLEAN:
+ cellValue = getCellAsBoolean(cell);
+ break;
+ default:
+ // As we only support string, double and boolean, this should never happen.
+ throw new IllegalStateException(
+ String.format("Field '%s' is of unsupported type '%s'. Supported types are: %s",
+ field.getName(), type, "string, double, boolean"));
+ }
+ if (cellValue == null) {
+ continue;
+ }
+ builder.set(field.getName(), cellValue);
+ isRowEmpty = false;
+ }
+ if (isRowEmpty) {
+ return null;
+ }
+ return builder;
+ }
+
+ private CellType getCellType(Cell cell) {
+ CellType cellType = cell.getCellType();
+ if (cellType == CellType.FORMULA) {
+ try {
+ cellType = cell.getCachedFormulaResultType();
+ } catch (Exception e) {
+ cellType = evaluator.evaluateFormulaCell(cell);
+ }
+ }
+ return cellType;
+ }
+
+ private String getCellAsString(Cell cell) {
+ CellType cellType = getCellType(cell);
+
+ switch (cellType) {
+ case NUMERIC:
+ if (DateUtil.isCellDateFormatted(cell)) {
+ return dataFormatter.formatCellValue(cell);
+ }
+ return Double.toString(cell.getNumericCellValue());
+ case STRING:
+ return cell.getRichStringCellValue().getString();
+ case BOOLEAN:
+ return cell.getBooleanCellValue() ? "TRUE" : "FALSE";
+ case BLANK:
+ case ERROR:
+ return null;
+ default:
+ throw new IllegalStateException(
+ String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType));
+ }
+ }
+
+ private boolean getCellAsBoolean(Cell cell) {
+ CellType cellType = getCellType(cell);
+
+ switch (cellType) {
+ case NUMERIC:
+ // Non-zero values are true
+ return cell.getNumericCellValue() != 0;
+ case STRING:
+ return cell.getRichStringCellValue().getString().equalsIgnoreCase("true");
+ case BOOLEAN:
+ return cell.getBooleanCellValue();
+ case BLANK:
+ case ERROR:
+ return false;
+ default:
+ throw new IllegalStateException(
+ String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType));
+ }
+ }
+
+ private Double getCellAsDouble(Cell cell) {
+ CellType cellType = getCellType(cell);
+
+ switch (cellType) {
+ case NUMERIC:
+ return cell.getNumericCellValue();
+ case STRING:
+ return null;
+ case BOOLEAN:
+ return cell.getBooleanCellValue() ? 1.0 : 0.0;
+ case BLANK:
+ case ERROR:
+ return 0.0;
+ default:
+ throw new IllegalStateException(
+ String.format("Failed to format (%s) due to unsupported cell type (%s)", cell, cellType));
+ }
+ }
+
+}
diff --git a/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatProviderTest.java b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatProviderTest.java
new file mode 100644
index 000000000..990bfee9c
--- /dev/null
+++ b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatProviderTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.validation.FormatContext;
+import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link XlsInputFormatProvider}
+ */
+public class XlsInputFormatProviderTest {
+  XlsInputFormatProvider xlsInputFormatProvider;
+  MockFailureCollector failureCollector;
+  FormatContext formatContext;
+  String validSchemaString;
+  XlsInputFormatConfig.Builder xlsInputFormatConfigBuilder;
+
+  /**
+   * Creates a fresh config builder, schema string, failure collector and format context for each test.
+   */
+  @Before
+  public void setup() {
+    Schema.Field onlyField = Schema.Field.of("test", Schema.of(Schema.Type.STRING));
+    validSchemaString = Schema.recordOf("test", onlyField).toString();
+    xlsInputFormatConfigBuilder = XlsInputFormatConfig.builder();
+    failureCollector = new MockFailureCollector();
+    formatContext = new FormatContext(failureCollector, null);
+  }
+
+  /**
+   * Asserts that validation recorded exactly one failure carrying the given message.
+   */
+  private void assertSingleFailure(String expectedMessage) {
+    Assert.assertEquals(1, failureCollector.getValidationFailures().size());
+    Assert.assertEquals(expectedMessage, failureCollector.getValidationFailures().get(0).getMessage());
+  }
+
+  @Test
+  public void testValidateInvalidSheetNumber() {
+    // "A" is not a number, so selecting the sheet by number must be rejected.
+    xlsInputFormatProvider = new XlsInputFormatProvider(
+      xlsInputFormatConfigBuilder
+        .setSheet(XlsInputFormatConfig.SHEET_NUMBER)
+        .setSheetValue("A")
+        .setSchema(validSchemaString)
+        .build());
+    xlsInputFormatProvider.validate(formatContext);
+    assertSingleFailure("Sheet number must be a number.");
+  }
+
+  @Test
+  public void testValidateValidSheetNumber() {
+    xlsInputFormatProvider = new XlsInputFormatProvider(
+      xlsInputFormatConfigBuilder
+        .setSheet(XlsInputFormatConfig.SHEET_NUMBER)
+        .setSheetValue("0")
+        .setSchema(validSchemaString)
+        .build());
+    xlsInputFormatProvider.validate(formatContext);
+    Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+  }
+
+  @Test
+  public void testValidateWithNoSchema() {
+    // No schema is set on the builder here.
+    xlsInputFormatProvider = new XlsInputFormatProvider(
+      xlsInputFormatConfigBuilder
+        .setSheet(XlsInputFormatConfig.SHEET_NUMBER)
+        .setSheetValue("0")
+        .build());
+    xlsInputFormatProvider.validate(formatContext);
+    assertSingleFailure("XLS format cannot be used without specifying a schema.");
+  }
+}
+
diff --git a/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatSchemaDetectorTest.java b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatSchemaDetectorTest.java
new file mode 100644
index 000000000..c121f1b9c
--- /dev/null
+++ b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatSchemaDetectorTest.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.FormulaError;
+import org.apache.poi.ss.usermodel.FormulaEvaluator;
+import org.apache.poi.ss.usermodel.Row;
+import org.apache.poi.ss.usermodel.Sheet;
+import org.apache.poi.ss.usermodel.Workbook;
+import org.apache.poi.ss.usermodel.WorkbookFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Unit tests for {@link XlsInputFormatSchemaDetector}
+ */
+public class XlsInputFormatSchemaDetectorTest {
+  Method isCellEmptyMethod;
+  Method getSchemaTypeMethod;
+  Method reduceSchemaTypeMethod;
+  Workbook workbook;
+  Sheet sheet;
+  XlsInputFormatSchemaDetector xlsInputFormatSchemaDetector;
+
+  /**
+   * Looks up the detector methods under test reflectively and creates an empty in-memory workbook.
+   */
+  @Before
+  public void setUp() throws Exception {
+    xlsInputFormatSchemaDetector = new XlsInputFormatSchemaDetector();
+    isCellEmptyMethod = xlsInputFormatSchemaDetector.getClass().getDeclaredMethod("isCellEmpty", Cell.class);
+    isCellEmptyMethod.setAccessible(true);
+    getSchemaTypeMethod = xlsInputFormatSchemaDetector.getClass().getDeclaredMethod("getSchemaType", Cell.class);
+    getSchemaTypeMethod.setAccessible(true);
+    reduceSchemaTypeMethod = xlsInputFormatSchemaDetector.getClass().getDeclaredMethod("reduceSchemaType",
+                                                                                       Schema.Type.class, Cell.class);
+    reduceSchemaTypeMethod.setAccessible(true);
+    // Mock XLS File
+    boolean newXssfFile = true;
+    workbook = WorkbookFactory.create(newXssfFile);
+    sheet = workbook.createSheet("sheet");
+  }
+
+  /**
+   * Closes the workbook after every test, including tests that fail part-way through.
+   */
+  @org.junit.After
+  public void tearDown() throws IOException {
+    if (workbook != null) {
+      workbook.close();
+    }
+  }
+
+  @Test
+  public void testIsCellEmptyMethod() throws IOException, InvocationTargetException, IllegalAccessException {
+    Row row = sheet.createRow(0);
+    int testColumn = 1;
+    // Only the blank cell is expected to be reported as empty.
+    Cell blankCell = row.createCell(++testColumn);
+    blankCell.setBlank();
+    Assert.assertEquals(true, isCellEmptyMethod.invoke(xlsInputFormatSchemaDetector, blankCell));
+
+    Cell stringCell = row.createCell(++testColumn);
+    stringCell.setCellValue("string");
+    Assert.assertEquals(false, isCellEmptyMethod.invoke(xlsInputFormatSchemaDetector, stringCell));
+
+    Cell numericCell = row.createCell(++testColumn);
+    numericCell.setCellValue(1.0);
+    Assert.assertEquals(false, isCellEmptyMethod.invoke(xlsInputFormatSchemaDetector, numericCell));
+
+    Cell booleanCell = row.createCell(++testColumn);
+    booleanCell.setCellValue(true);
+    Assert.assertEquals(false, isCellEmptyMethod.invoke(xlsInputFormatSchemaDetector, booleanCell));
+
+    Cell formulaCell = row.createCell(++testColumn);
+    formulaCell.setCellFormula("SUM(A1:B1)");
+    Assert.assertEquals(false, isCellEmptyMethod.invoke(xlsInputFormatSchemaDetector, formulaCell));
+
+    Cell errorCell = row.createCell(++testColumn);
+    errorCell.setCellErrorValue(FormulaError.DIV0.getCode());
+    Assert.assertEquals(false, isCellEmptyMethod.invoke(xlsInputFormatSchemaDetector, errorCell));
+  }
+
+  @Test
+  public void testGetSchemaTypeMethod() throws IOException, InvocationTargetException, IllegalAccessException {
+    Row row = sheet.createRow(0);
+    int testColumn = 1;
+    Cell stringCell = row.createCell(++testColumn);
+    stringCell.setCellValue("string");
+    Assert.assertEquals(Schema.Type.STRING, getSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector, stringCell));
+
+    Cell numericCell = row.createCell(++testColumn);
+    numericCell.setCellValue(1.0);
+    Assert.assertEquals(Schema.Type.DOUBLE, getSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector, numericCell));
+
+    Cell booleanCell = row.createCell(++testColumn);
+    booleanCell.setCellValue(true);
+    Assert.assertEquals(Schema.Type.BOOLEAN, getSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector, booleanCell));
+
+    // Error cells are expected to map to STRING.
+    Cell errorCell = row.createCell(++testColumn);
+    errorCell.setCellErrorValue(FormulaError.DIV0.getCode());
+    Assert.assertEquals(Schema.Type.STRING, getSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector, errorCell));
+  }
+
+  @Test
+  public void testGetSchemaTypeMethodWithFormula() throws IOException, InvocationTargetException,
+    IllegalAccessException {
+    Row row = sheet.createRow(0);
+    double numericValue1 = 1.0;
+    double numericValue2 = 2.0;
+    Cell a1Numeric = row.createCell(0);
+    a1Numeric.setCellValue(numericValue1);
+    Cell b1Numeric = row.createCell(1);
+    b1Numeric.setCellValue(numericValue2);
+
+    Row row2 = sheet.createRow(1);
+    String stringValue1 = "hello";
+    String stringValue2 = "world";
+    Cell a2String = row2.createCell(0);
+    a2String.setCellValue(stringValue1);
+    Cell b2String = row2.createCell(1);
+    b2String.setCellValue(stringValue2);
+
+    Row row3 = sheet.createRow(2);
+    Cell formulaCell = row3.createCell(0);
+    formulaCell.setCellFormula("SUM(A1:B1)");
+    Cell formulaCell2 = row3.createCell(1);
+    formulaCell2.setCellFormula("CONCAT(A2:B2)");
+
+    // Evaluate every formula so the formula cells carry cached results.
+    FormulaEvaluator evaluator = workbook.getCreationHelper().createFormulaEvaluator();
+    evaluator.evaluateAll();
+
+    Assert.assertEquals(numericValue1 + numericValue2, formulaCell.getNumericCellValue(), 0.0);
+    Assert.assertEquals(Schema.Type.DOUBLE, getSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector, formulaCell));
+    Assert.assertEquals(stringValue1 + stringValue2, formulaCell2.getStringCellValue());
+    Assert.assertEquals(Schema.Type.STRING, getSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector, formulaCell2));
+  }
+
+  @Test
+  public void testReduceSchemaType() throws InvocationTargetException, IllegalAccessException {
+    Row row = sheet.createRow(0);
+    int testColumn = 1;
+
+    // CDAP $TYPE + XLS_CELL $TYPE = CDAP $TYPE
+    // STRING + ANY = STRING
+    Cell stringCell = row.createCell(++testColumn);
+    stringCell.setCellValue("string");
+    Assert.assertEquals(Schema.Type.STRING, reduceSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector,
+                                                                          Schema.Type.STRING, stringCell));
+    Cell numericCell = row.createCell(++testColumn);
+    numericCell.setCellValue(1.0);
+    Assert.assertEquals(Schema.Type.STRING, reduceSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector,
+                                                                          Schema.Type.STRING, numericCell));
+    Cell booleanCell = row.createCell(++testColumn);
+    booleanCell.setCellValue(true);
+    Assert.assertEquals(Schema.Type.STRING, reduceSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector,
+                                                                          Schema.Type.STRING, booleanCell));
+    Cell errorCell = row.createCell(++testColumn);
+    errorCell.setCellErrorValue(FormulaError.DIV0.getCode());
+    Assert.assertEquals(Schema.Type.STRING, reduceSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector,
+                                                                          Schema.Type.STRING, errorCell));
+    Cell formulaCell = row.createCell(++testColumn);
+    formulaCell.setCellFormula("SUM(A1:B1)");
+    Assert.assertEquals(Schema.Type.STRING, reduceSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector,
+                                                                          Schema.Type.STRING, formulaCell));
+
+    // BOOLEAN + BOOLEAN = BOOLEAN
+    // BOOLEAN + NUMERIC = DOUBLE
+    Cell booleanCell2 = row.createCell(++testColumn);
+    booleanCell2.setCellValue(true);
+    Assert.assertEquals(Schema.Type.BOOLEAN, reduceSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector,
+                                                                           Schema.Type.BOOLEAN, booleanCell2));
+    Cell numericCell2 = row.createCell(++testColumn);
+    numericCell2.setCellValue(1.0);
+    Assert.assertEquals(Schema.Type.DOUBLE, reduceSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector,
+                                                                          Schema.Type.BOOLEAN, numericCell2));
+
+    // DOUBLE + NUMERIC = DOUBLE
+    Cell numericCell3 = row.createCell(++testColumn);
+    numericCell3.setCellValue(1.0);
+    Assert.assertEquals(Schema.Type.DOUBLE, reduceSchemaTypeMethod.invoke(xlsInputFormatSchemaDetector,
+                                                                          Schema.Type.DOUBLE, numericCell3));
+  }
+}
diff --git a/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatUtilsTest.java b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatUtilsTest.java
new file mode 100644
index 000000000..7557a4b5b
--- /dev/null
+++ b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsInputFormatUtilsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Unit tests for {@link XlsInputFormatUtils}
+ */
+public class XlsInputFormatUtilsTest {
+
+  /**
+   * Checks cleaning and de-duplication of column names: duplicates, quoted names, names starting with a
+   * digit, invalid characters, case-insensitive collisions and blank names.
+   */
+  @Test
+  public void testGetSafeColumnNames() {
+    List<String> columnNames = ImmutableList.of(
+      "column_A", "column_B", "column_C",
+      "column_A", "column_B", "column_C",
+      "\"column_A\"", "\"column_B\"", "\"column_C\"",
+      "1st column", "2nd column", "3rd column",
+      "column-1", "1column", "1234", "column#a",
+      "column", "column_1", "_column", "Column", "_COLUMN_1_2_",
+      "column_1", "column_1", "column_1_2", "s p a c e s", "1!)@#*$%&!@",
+      "1234", "\"", ",", " ", "_", " column#a"
+    );
+    List<String> expectedColumnNames = ImmutableList.of(
+      "column_A", "column_B", "column_C",
+      "column_A_2", "column_B_2", "column_C_2",
+      "_column_A_", "_column_B_", "_column_C_",
+      "col_1st_column", "col_2nd_column", "col_3rd_column",
+      "column_1", "col_1column", "col_1234", "column_a_3",
+      "column", "column_1_2", "_column", "Column_2",
+      "_COLUMN_1_2_", "column_1_3", "column_1_4", "column_1_2_2",
+      "s_p_a_c_e_s", "col_1_", "col_1234_2", "_", "__2",
+      "BLANK", "__3", "column_a_4"
+    );
+    List<String> actualColumnNames = XlsInputFormatUtils.getSafeColumnNames(columnNames);
+    Assert.assertEquals(expectedColumnNames.size(), actualColumnNames.size());
+    // Compare element by element so a failure reports the offending position.
+    for (int i = 0; i < expectedColumnNames.size(); i++) {
+      Assert.assertEquals("Unexpected column name at index " + i,
+                          expectedColumnNames.get(i), actualColumnNames.get(i));
+    }
+  }
+}
diff --git a/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsRowConverterTest.java b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsRowConverterTest.java
new file mode 100644
index 000000000..0e44260e5
--- /dev/null
+++ b/format-xls/src/test/java/io/cdap/plugin/format/xls/input/XlsRowConverterTest.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.format.xls.input;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.FormulaError;
+import org.apache.poi.ss.usermodel.FormulaEvaluator;
+import org.apache.poi.ss.usermodel.Row;
+import org.apache.poi.ss.usermodel.Sheet;
+import org.apache.poi.ss.usermodel.Workbook;
+import org.apache.poi.ss.usermodel.WorkbookFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Unit tests for {@link XlsRowConverter}
+ */
+public class XlsRowConverterTest {
+  Workbook workbook;
+  Sheet sheet;
+
+  /**
+   * Creates an empty in-memory workbook with a single sheet for each test.
+   */
+  @Before
+  public void setUp() throws IOException {
+    // Mock XLS File
+    boolean newXssfFile = true;
+    workbook = WorkbookFactory.create(newXssfFile);
+    sheet = workbook.createSheet("sheet");
+  }
+
+  /**
+   * Closes the workbook after every test, including tests that fail part-way through.
+   */
+  @org.junit.After
+  public void tearDown() throws IOException {
+    if (workbook != null) {
+      workbook.close();
+    }
+  }
+
+  @Test
+  public void testFormatCellValue() {
+    Row row = sheet.createRow(0);
+    row.createCell(0).setCellValue("test");
+    int testColumn = 0;
+
+    Cell blankCell = row.createCell(++testColumn);
+    blankCell.setBlank();
+
+    Cell booleanCell = row.createCell(++testColumn);
+    booleanCell.setCellValue(true);
+
+    Cell numericCell = row.createCell(++testColumn);
+    numericCell.setCellValue(1.0);
+
+    Cell stringCell = row.createCell(++testColumn);
+    stringCell.setCellValue("test");
+
+    Cell errorCell = row.createCell(++testColumn);
+    errorCell.setCellErrorValue(FormulaError.DIV0.getCode());
+
+    // Fields are listed in the same order as the cells created above.
+    Schema outputSchema = Schema.recordOf(
+      "record",
+      Schema.Field.of("string", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("blank", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+      Schema.Field.of("boolean", Schema.of(Schema.Type.BOOLEAN)),
+      Schema.Field.of("numeric", Schema.of(Schema.Type.DOUBLE)),
+      Schema.Field.of("string2", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("error", Schema.nullableOf(Schema.of(Schema.Type.STRING)))
+    );
+    XlsRowConverter rowConverter = new XlsRowConverter(workbook.getCreationHelper().createFormulaEvaluator());
+    StructuredRecord record = rowConverter.convert(row, outputSchema).build();
+    Assert.assertEquals("test", record.get("string"));
+    Assert.assertNull(record.get("blank"));
+    Assert.assertEquals(true, record.get("boolean"));
+    Assert.assertEquals(1.0, record.get("numeric"), 0.0001);
+    Assert.assertEquals("test", record.get("string2"));
+    Assert.assertNull(record.get("error"));
+  }
+
+  @Test
+  public void testFormatCellValueWithCachedFormulaResult() {
+    Row row = sheet.createRow(0);
+    double numericValue1 = 1.0;
+    double numericValue2 = 2.0;
+    Cell a1Numeric = row.createCell(0);
+    a1Numeric.setCellValue(numericValue1);
+    Cell b1Numeric = row.createCell(1);
+    b1Numeric.setCellValue(numericValue2);
+
+    Row row2 = sheet.createRow(1);
+    String stringValue1 = "hello";
+    String stringValue2 = "world";
+    Cell a2String = row2.createCell(0);
+    a2String.setCellValue(stringValue1);
+    Cell b2String = row2.createCell(1);
+    b2String.setCellValue(stringValue2);
+
+    Row row3 = sheet.createRow(2);
+    Cell formulaCell = row3.createCell(0);
+    formulaCell.setCellFormula("SUM(A1:B1)");
+    Cell formulaCell2 = row3.createCell(1);
+    formulaCell2.setCellFormula("CONCAT(A2:B2)");
+
+    FormulaEvaluator evaluator = workbook.getCreationHelper().createFormulaEvaluator();
+    // Cache the formula results
+    evaluator.evaluateAll();
+
+    XlsRowConverter xlsRowConverter = new XlsRowConverter(evaluator);
+    Schema outputSchema = Schema.recordOf(
+      "record",
+      Schema.Field.of("numeric", Schema.of(Schema.Type.DOUBLE)),
+      Schema.Field.of("string", Schema.of(Schema.Type.STRING))
+    );
+    StructuredRecord record = xlsRowConverter.convert(row3, outputSchema).build();
+    Assert.assertEquals(3.0, record.get("numeric"), 0.0001);
+    Assert.assertEquals("helloworld", record.get("string"));
+  }
+}
diff --git a/pom.xml b/pom.xml
index 63c00f4b7..4f6356f43 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,7 @@
format-orc
format-parquet
format-text
+ format-xls
hbase-plugins
http-plugins
mongodb-plugins