0.1.11 Release (#30)
* Schema match checking (#27)

* initial look at introducing a schema match check

* update targeted spark version (excluding 3.4 currently)

* Spark 3.4 update (#29)

* implement required changes for the Spark 3.4.0 release

* Update GitHub Action

* Update spark.yml

move to non-deprecated versions

* update remaining action and remove deprecated setting

* add new test to cover more of the options setting

* add missing copyright header

* add unit test to cover exception case

* update README.md

* remove TOC

* update version to final
dazfuller authored May 29, 2023
1 parent 14cac16 commit c2511ab
Showing 21 changed files with 821 additions and 147 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/spark.yml
@@ -12,19 +12,19 @@ jobs:
  build:
    strategy:
      matrix:
-        sparkVersion: [ 3.0.1, 3.0.2, 3.0.3, 3.1.1, 3.1.2, 3.1.3, 3.2.0, 3.2.1, 3.2.2, 3.3.0 ]
+        sparkVersion: [ 3.0.1, 3.0.2, 3.0.3, 3.1.2, 3.2.1, 3.2.4, 3.3.0, 3.3.1, 3.3.2, 3.4.0 ]

    runs-on: ubuntu-latest

    steps:

      - name: Checkout with LFS
-        uses: actions/checkout@v2
+        uses: actions/checkout@v3.5.2
        with:
          lfs: true

      - name: Set up JDK 8
-        uses: actions/setup-java@v2
+        uses: actions/setup-java@v3.11.0
        with:
          java-version: '8'
          distribution: 'adopt'
@@ -33,14 +33,14 @@ jobs:
        run: sbt -DsparkVersion="${{matrix.sparkVersion}}" clean coverageOn test coverageReport

      - name: Upload coverage to CodeCov
-        uses: codecov/codecov-action@v1.5.2
+        uses: codecov/codecov-action@v3.1.4
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          files: ./target/spark-${{ matrix.sparkVersion }}/scala-2.12/coverage-report/cobertura.xml
          env_vars: ${{ matrix.sparkVersion }}
          fail_ci_if_error: true
          name: spark-excel
-          path_to_write_report: ./target/spark-${{ matrix.sparkVersion }}/scala-2.12/coverage-report/codecov_report.txt
+          # path_to_write_report: ./target/spark-${{ matrix.sparkVersion }}/scala-2.12/coverage-report/codecov_report.txt
          verbose: true

      - name: Create assembly
@@ -49,7 +49,7 @@

      - name: Upload the package
        if: ${{ github.event_name != 'pull_request' }}
-        uses: actions/upload-artifact@v2.2.4
+        uses: actions/upload-artifact@v3.1.2
        with:
          path: ./target/spark-${{ matrix.sparkVersion }}/scala-2.12/spark-excel*.jar
          if-no-files-found: warn
48 changes: 39 additions & 9 deletions README.md
@@ -33,28 +33,22 @@ version of Spark you're developing against.
- Cleans column names on read to remove invalid characters which can cause issues in Spark (spaces, periods, etc.)
- Handle merged cells (repeats data to all cells in the merged region)
- Formula evaluation (for those supported by Apache POI)
- Schema matching validation in output (set via an option)
- Works in Scala, PySpark, and Spark SQL

## Usage

Standard usage of the library works as follows:

### Scala

```scala
// Scala
val df = spark.read
  .format("com.elastacloud.spark.excel")
  .option("cellAddress", "A1")
  .load("/path/to/my_file.xlsx")
```

```python
# Python
df = spark.read
    .format("com.elastacloud.spark.excel")
    .option("cellAddress", "A1")
    .load("/path/to/my_file.xlsx")
```

A short name has been provided for convenience, as well as a convenience method (currently Scala only).

```scala
// @@ -68,6 +62,40 @@
val df = spark.read
  .excel("/path/to/my_file.xlsx")
```
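
For reference, a read using the short name would look something like this (a sketch, since the snippet above is partially collapsed; the name `excel` is taken from the SQL examples below):

```scala
// Sketch: the library's short format name, as also used by the
// SQL examples below ("USING excel")
val df = spark.read
  .format("excel")
  .load("/path/to/my_file.xlsx")
```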

### Python

```python
df = (spark.read
    .format("com.elastacloud.spark.excel")
    .option("cellAddress", "A1")
    .load("/path/to/my_file.xlsx")
)
```

### SQL

Accessing Excel files directly (N.B. this style of access requires backticks instead of single quotes):

```sql
SELECT
  *
FROM
  excel.`/path/to/my_file.xlsx`
```

Accessing with options by creating a view:

```sql
CREATE TEMPORARY VIEW sample_excel
USING excel
OPTIONS (
  path '/path/to/my_file.xlsx',
  schemaMatchColumnName 'is_value'
);

SELECT * FROM sample_excel;
```
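
Since the view above writes the schema-match flag to the `is_value` column, mismatched records can be inspected directly (an illustrative query, not part of the README):

```sql
-- Illustrative: return only the records that failed to match the schema
SELECT * FROM sample_excel WHERE is_value = false;
```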

All of these methods accept glob patterns and multiple path values.
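
For example (an illustrative sketch; the README's own snippet is collapsed in this diff and the paths are made up):

```scala
// Illustrative: a glob pattern plus an explicit second path
val df = spark.read
  .format("com.elastacloud.spark.excel")
  .load("/data/reports/*.xlsx", "/data/archive/summary.xlsx")
```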

@@ -96,6 +124,7 @@ The library supports the following options:
| maxRowCount | Int | 1000 | Number of records to read to infer the schema. If set to 0 (zero) then all available rows will be read |
| maxBytesForTempFiles | Int | 10000000 | Sets the number of bytes at which a workbook (OOXML format) is regarded as too large to hold in memory, and its data is put into temp files instead. Whilst the cluster may have large volumes of memory, the node processing the file will be limited. |
| thresholdBytesForTempFiles | Int | 10000000 | _Alias for maxBytesForTempFiles_ |
| schemaMatchColumnName | String | None | Defines the column name to write the flag indicating if the current record matches the provided or inferred schema. If the schema is provided then the column name must exist within that provided schema. |

```scala
val df = spark.read
// @@ -107,6 +136,7 @@
  .option("sheetNamePattern", """Sheet[13]""") // Read data from all sheets matching this pattern (e.g. Sheet1 and Sheet3)
  .option("maxRowCount", 10) // Read only the first 10 records to determine the schema of the data
  .option("thresholdBytesForTempFiles", 50000000) // Set size limit before temp files are used
  .option("schemaMatchColumnName", "_isValid") // Write a flag to the '_isValid' column indicating if the record matches the schema
  .load("/path/to/file.xlsx")
```
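
Downstream, the flag column configured above can be used to separate clean records from mismatches (an illustrative sketch; `_isValid` is just the column name chosen in the example):

```scala
import org.apache.spark.sql.functions.col

// Illustrative: split rows on the schema-match flag written by the reader
val validRows = df.filter(col("_isValid"))
val invalidRows = df.filter(!col("_isValid"))
```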

2 changes: 1 addition & 1 deletion build.ps1
@@ -14,7 +14,7 @@
https://www.elastacloud.com
#>

-$versions = @("3.0.1", "3.0.2", "3.0.3", "3.1.1", "3.1.2", "3.1.3", "3.2.0", "3.2.1")
+$versions = @("3.0.1", "3.0.2", "3.1.2", "3.2.1", "3.2.4", "3.3.0", "3.3.1", "3.3.2", "3.4.0")
$jarPath = "./target/jars"
$covPath = "./target/coverage"

14 changes: 9 additions & 5 deletions build.sbt
@@ -35,8 +35,10 @@ licenses += ("Apache License, Version 2.0", url("https://www.apache.org/licenses
Compile / unmanagedSourceDirectories ++= {
  if (sparkVersion.value < "3.3.0") {
    Seq(baseDirectory.value / "src/main/3.0/scala")
-  } else {
+  } else if (sparkVersion.value < "3.4.0") {
    Seq(baseDirectory.value / "src/main/3.3/scala")
+  } else {
+    Seq(baseDirectory.value / "src/main/3.4/scala")
  }
}

@@ -115,17 +117,19 @@ addArtifact(Compile / assembly / artifact, assembly)

// Define common settings for the library
val commonSettings = Seq(
-  sparkVersion := System.getProperty("sparkVersion", "3.3.0"),
-  sparkExcelVersion := "0.1.11-SNAPSHOT",
+  sparkVersion := System.getProperty("sparkVersion", "3.4.0"),
+  sparkExcelVersion := "0.1.11",
  version := s"${sparkVersion.value}_${sparkExcelVersion.value}",
  scalaVersion := {
    if (sparkVersion.value < "3.2.0") {
      "2.12.10"
-    } else {
+    } else if (sparkVersion.value < "3.4.0") {
      "2.12.14"
+    } else {
+      "2.12.15"
    }
  },
-  scalaTestVersion := "3.2.11",
+  scalaTestVersion := "3.2.16",
  poiVersion := "5.2.2",
  crossVersion := CrossVersion.disabled
)
@@ -17,7 +17,7 @@
package com.elastacloud.spark.excel

import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.FileSourceOptions.{IGNORE_CORRUPT_FILES, IGNORE_MISSING_FILES}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf

/**
 * Common options for the file-based data source. (A local copy, presumably
 * included so the same import resolves when building against Spark versions
 * that do not ship this class.)
 */
class FileSourceOptions(
    @transient private val parameters: CaseInsensitiveMap[String])
  extends Serializable {

  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

  val ignoreCorruptFiles: Boolean = parameters.get(IGNORE_CORRUPT_FILES).map(_.toBoolean)
    .getOrElse(SQLConf.get.ignoreCorruptFiles)

  val ignoreMissingFiles: Boolean = parameters.get(IGNORE_MISSING_FILES).map(_.toBoolean)
    .getOrElse(SQLConf.get.ignoreMissingFiles)
}

object FileSourceOptions {
  val IGNORE_CORRUPT_FILES = "ignoreCorruptFiles"
  val IGNORE_MISSING_FILES = "ignoreMissingFiles"
}
@@ -0,0 +1,40 @@
/*
* Copyright 2021 Elastacloud Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.elastacloud.spark.excel

import com.elastacloud.spark.excel.parser.ExcelParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{CodecStreams, PartitionedFile}
import org.apache.spark.sql.types.StructType

import java.net.URI

private[excel] class ExcelDataIterator(options: ExcelParserOptions) extends Logging {
  def readFile(conf: Configuration, file: PartitionedFile, schema: StructType): Iterator[InternalRow] = {
    val filePath = new Path(new URI(file.filePath))
    val inputStream = CodecStreams.createInputStreamWithCloseResource(conf, filePath)

    // Close the input stream once the task has completed
    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => inputStream.close()))

    val parser = new ExcelParser(inputStream, options, Some(schema), Some(schema))
    parser.getDataIterator.map(it => InternalRow.fromSeq(it))
  }
}
@@ -0,0 +1,53 @@
/*
* Copyright 2021 Elastacloud Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.elastacloud.spark.excel

import com.elastacloud.spark.excel.parser.ExcelParser
import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.datasources.{CodecStreams, PartitionedFile}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

import java.net.URI

private[excel] class ExcelPartitionReader(
    path: PartitionedFile,
    broadcastConf: Broadcast[SerializableConfiguration],
    excelOptions: ExcelParserOptions,
    schema: StructType,
    readSchema: StructType
) extends PartitionReader[InternalRow] {
  private val filePath = new Path(new URI(path.filePath))
  private val inputStream = CodecStreams.createInputStreamWithCloseResource(broadcastConf.value.value, filePath)
  private val excelParser = new ExcelParser(inputStream, excelOptions, Some(schema), Some(readSchema))
  private val dataIterator = excelParser.getDataIterator

  override def next(): Boolean = dataIterator.hasNext

  override def get(): InternalRow = {
    val rowData = dataIterator.next()
    InternalRow.fromSeq(rowData)
  }

  override def close(): Unit = {
    excelParser.close()
    inputStream.close()
  }
}
@@ -0,0 +1,40 @@
/*
* Copyright 2021 Elastacloud Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.elastacloud.spark.excel

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

private[excel] case class ExcelPartitionReaderFactory(
    sqlConf: SQLConf,
    broadcastConf: Broadcast[SerializableConfiguration],
    dataSchema: StructType,
    readDataSchema: StructType,
    partitionSchema: StructType,
    excelOptions: ExcelParserOptions
) extends FilePartitionReaderFactory {
  override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
    val reader = new ExcelPartitionReader(partitionedFile, broadcastConf, excelOptions, dataSchema, readDataSchema)
    new PartitionReaderWithPartitionValues(reader, readDataSchema, partitionSchema, partitionedFile.partitionValues)
  }
}