Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .github/workflows/ci-hadoop3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java: [ { setup: '11', maven: '11' }, { setup: '17', maven: '17' } ]
java: [ { setup: '17', maven: '17' } ]
codes: [ 'uncompressed,brotli', 'gzip,snappy' ]
name: Build Parquet with JDK ${{ matrix.java.setup }} and ${{ matrix.codes }}

Expand All @@ -44,14 +44,10 @@ jobs:
bash dev/ci-before_install.sh
- name: install
run: |
EXTRA_JAVA_TEST_ARGS=$(./mvnw help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout)
export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS"
./mvnw install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true -Djava.version=${{ matrix.java.maven }}
- name: verify
env:
TEST_CODECS: ${{ matrix.codes }}
JAVA_VERSION: ${{ matrix.java.setup }}
run: |
EXTRA_JAVA_TEST_ARGS=$(./mvnw help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout)
export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS"
./mvnw verify --batch-mode javadoc:javadoc
4 changes: 0 additions & 4 deletions .github/workflows/vector-plugins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,10 @@ jobs:
bash dev/ci-before_install.sh
- name: install
run: |
EXTRA_JAVA_TEST_ARGS=$(./mvnw help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout)
export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS"
./mvnw install --batch-mode -Pvector-plugins -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true -Dmaven.buildNumber.skip=true -Djava.version=${{ matrix.java }} -pl parquet-plugins/parquet-encoding-vector,parquet-plugins/parquet-plugins-benchmarks -am
- name: verify
env:
TEST_CODECS: ${{ matrix.codes }}
JAVA_VERSION: ${{ matrix.java }}
run: |
EXTRA_JAVA_TEST_ARGS=$(./mvnw help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout)
export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS"
./mvnw verify --batch-mode -Pvector-plugins javadoc:javadoc -pl parquet-plugins/parquet-encoding-vector,parquet-plugins/parquet-plugins-benchmarks -am
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ mvn_install.log
.DS_Store
.memsearch/

.sdkmanrc

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ You can find additional details about the format and intended use cases in our [

## Building

Parquet-Java uses Maven to build and depends on the thrift compiler (protoc is now managed by maven plugin).
Parquet-Java requires Java 17 or higher and uses Maven to build. It also depends on the thrift compiler (protoc is now managed by maven plugin).

### Install Thrift

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ public EncodingStats convertEncodingStats(List<PageEncodingStats> stats) {
switch (stat.getPage_type()) {
case DATA_PAGE_V2:
builder.withV2Pages();
// falls through
// falls through
case DATA_PAGE:
builder.addDataEncoding(getEncoding(stat.getEncoding()), stat.getCount());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ private String cacheKey(CompressionCodecName codecName) {
level = conf.get("parquet.compression.codec.zstd.level");
break;
default:
// compression level is not supported; ignore it
// compression level is not supported; ignore it
}
String codecClass = codecName.getHadoopCompressionCodecClassName();
return level == null ? codecClass : codecClass + ":" + level;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ private void validateBoundaryOrder(
prevMaxValue::toString);
break;
case UNORDERED:
// No checks necessary.
// No checks necessary.
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName)
return new SnappyCompressor();
case ZSTD:
return new ZstdCompressor();
// todo: create class similar to the SnappyCompressor for zlib and exclude it as
// snappy is above since it also generates allocateDirect calls.
// todo: create class similar to the SnappyCompressor for zlib and exclude it as
// snappy is above since it also generates allocateDirect calls.
default:
return super.createCompressor(codecName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.ByteBufferReleaser;
import org.apache.parquet.bytes.BytesInput;
Expand Down Expand Up @@ -1361,12 +1362,42 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
totalSize += len;
}
LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
// Request a vectored read;
f.readVectored(ranges, options.getAllocator());
int k = 0;
for (ConsecutivePartList consecutivePart : allParts) {
ParquetFileRange currRange = ranges.get(k++);
consecutivePart.readFromVectoredRange(currRange, builder);
// Use a capturing allocator to track all buffers allocated by Hadoop during vectored reads.
// The buffer returned from the read future may differ from the one originally allocated
// (e.g., ChecksumFileSystem wraps/copies buffers), so we must track the actual allocations.
List<ByteBuffer> allocatedBuffers = new ArrayList<>();
ByteBufferAllocator capturingAllocator = new ByteBufferAllocator() {
@Override
public ByteBuffer allocate(int size) {
ByteBuffer buf = options.getAllocator().allocate(size);
allocatedBuffers.add(buf);
return buf;
}

@Override
public void release(ByteBuffer b) {
// Use identity comparison; ByteBuffer.equals() is content-based and could match wrong buffer
allocatedBuffers.removeIf(buf -> buf == b);
options.getAllocator().release(b);
}

@Override
public boolean isDirect() {
return options.getAllocator().isDirect();
}
};
try {
// Request a vectored read;
f.readVectored(ranges, capturingAllocator);
int k = 0;
for (ConsecutivePartList consecutivePart : allParts) {
ParquetFileRange currRange = ranges.get(k++);
consecutivePart.readFromVectoredRange(currRange, builder);
}
} finally {
// Register all buffers allocated during vectored reads for release.
// In a finally block so buffers are not leaked on read failures.
builder.addBuffersToRelease(allocatedBuffers);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,68 +26,37 @@
import org.slf4j.LoggerFactory;

/**
* A Helper class which use reflections to clean up DirectBuffer. It's implemented for
* better compatibility with both java8 and java9+, because the Cleaner class is moved to
* another place since java9+.
* <p>
* Strongly inspired by:
* https://github.com/apache/tomcat/blob/master/java/org/apache/tomcat/util/buf/ByteBufferUtils.java
* A helper class which uses {@code sun.misc.Unsafe.invokeCleaner} to explicitly free
* direct ByteBuffers.
*/
public class CleanUtil {
private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class);

private static final Object unsafe;
private static final Method cleanerMethod;
private static final Method cleanMethod;
private static final Method invokeCleanerMethod;

private static final int majorVersion =
Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]);

static {
final ByteBuffer tempBuffer = ByteBuffer.allocateDirect(0);
Method cleanerMethodLocal = null;
Method cleanMethodLocal = null;
Object unsafeLocal = null;
Method invokeCleanerMethodLocal = null;
if (majorVersion >= 9) {
try {
final Class<?> clazz = Class.forName("sun.misc.Unsafe");
final Field theUnsafe = clazz.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafeLocal = theUnsafe.get(null);
invokeCleanerMethodLocal = clazz.getMethod("invokeCleaner", ByteBuffer.class);
invokeCleanerMethodLocal.invoke(unsafeLocal, tempBuffer);
} catch (IllegalAccessException
| IllegalArgumentException
| InvocationTargetException
| NoSuchMethodException
| SecurityException
| ClassNotFoundException
| NoSuchFieldException e) {
logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
unsafeLocal = null;
invokeCleanerMethodLocal = null;
}
} else {
try {
cleanerMethodLocal = tempBuffer.getClass().getMethod("cleaner");
cleanerMethodLocal.setAccessible(true);
final Object cleanerObject = cleanerMethodLocal.invoke(tempBuffer);
cleanMethodLocal = cleanerObject.getClass().getMethod("clean");
cleanMethodLocal.invoke(cleanerObject);
} catch (NoSuchMethodException
| SecurityException
| IllegalAccessException
| IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
cleanerMethodLocal = null;
cleanMethodLocal = null;
}
try {
final Class<?> clazz = Class.forName("sun.misc.Unsafe");
final Field theUnsafe = clazz.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafeLocal = theUnsafe.get(null);
invokeCleanerMethodLocal = clazz.getMethod("invokeCleaner", ByteBuffer.class);
invokeCleanerMethodLocal.invoke(unsafeLocal, tempBuffer);
} catch (IllegalAccessException
| IllegalArgumentException
| InvocationTargetException
| NoSuchMethodException
| SecurityException
| ClassNotFoundException
| NoSuchFieldException e) {
logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e);
unsafeLocal = null;
invokeCleanerMethodLocal = null;
}
cleanerMethod = cleanerMethodLocal;
cleanMethod = cleanMethodLocal;
unsafe = unsafeLocal;
invokeCleanerMethod = invokeCleanerMethodLocal;
}
Expand All @@ -97,16 +66,7 @@ private CleanUtil() {
}

public static void cleanDirectBuffer(ByteBuffer buf) {
if (cleanMethod != null) {
try {
cleanMethod.invoke(cleanerMethod.invoke(buf));
} catch (IllegalAccessException
| IllegalArgumentException
| InvocationTargetException
| SecurityException e) {
logger.warn("Error while cleaning up the DirectBuffer", e);
}
} else if (invokeCleanerMethod != null) {
if (invokeCleanerMethod != null) {
try {
invokeCleanerMethod.invoke(unsafe, buf);
} catch (IllegalAccessException
Expand Down
5 changes: 0 additions & 5 deletions parquet-plugins/parquet-encoding-vector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@
<name>Apache Parquet Encodings Vector</name>
<url>https://parquet.apache.org</url>

<properties>
<!-- Those properties prevent Java 11 to try and compile this code -->
<maven.compiler.release>17</maven.compiler.release>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private boolean readOneValue(TProtocol in, byte type, List<Action> buffer, Thrif
writeShortAction(buffer, s);
break;
case TType.ENUM: // same as i32 => actually never seen in the protocol layer as enums are written as a i32
// field
// field
case TType.I32:
final int i = in.readI32();
checkEnum(expectedType, i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void readOneValue(TProtocol in, TProtocol out, byte type) throws TException {
out.writeI16(in.readI16());
break;
case TType.ENUM: // same as i32 => actually never seen in the protocol layer as enums are written as a i32
// field
// field
case TType.I32:
out.writeI32(in.readI32());
break;
Expand Down
45 changes: 20 additions & 25 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
</mailingLists>

<properties>
<maven.compiler.release>11</maven.compiler.release>
<maven.compiler.release>17</maven.compiler.release>
<github.global.server>github</github.global.server>
<jackson.groupId>com.fasterxml.jackson.core</jackson.groupId>
<jackson.datatype.groupId>com.fasterxml.jackson.datatype</jackson.datatype.groupId>
Expand All @@ -80,10 +80,10 @@
<jackson-annotations.version>2.21</jackson-annotations.version>
<japicmp.version>0.25.7</japicmp.version>
<javax.annotation.version>1.3.2</javax.annotation.version>
<spotless.version>2.46.1</spotless.version>
<spotless.version>3.5.1</spotless.version>
<shade.prefix>shaded.parquet</shade.prefix>
<!-- Guarantees no newer classes/methods/constants are used by parquet. -->
<hadoop.version>3.3.0</hadoop.version>
<hadoop.version>3.4.3</hadoop.version>
<parquet.format.version>2.12.0</parquet.format.version>
<previous.version>1.17.0</previous.version>
<thrift.executable>thrift</thrift.executable>
Expand Down Expand Up @@ -119,23 +119,6 @@

<!-- Resource intesive tests are enabled by default but disabled in the CI envrionment -->
<enableResourceIntensiveTests>true</enableResourceIntensiveTests>

<extraJavaTestArgs>
-XX:+IgnoreUnrecognizedVMOptions
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.net=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
</extraJavaTestArgs>
</properties>

<modules>
Expand Down Expand Up @@ -284,6 +267,21 @@
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<id>enforce-java-version</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireJavaVersion>
<version>[17,)</version>
<message>Java 17 or higher is required to build parquet-java.</message>
</requireJavaVersion>
</rules>
<fail>true</fail>
</configuration>
</execution>
<execution>
<id>enforce-banned-dependencies</id>
<goals>
Expand Down Expand Up @@ -432,9 +430,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<argLine>${extraJavaTestArgs}</argLine>
</configuration>
<executions>
<execution>
<goals>
Expand All @@ -449,7 +444,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${surefire.argLine} ${extraJavaTestArgs}</argLine>
<argLine>${surefire.argLine}</argLine>
<systemPropertyVariables>
<!-- Configure Parquet logging during tests
See http://www.slf4j.org/api/org/slf4j/impl/SimpleLogger.html
Expand Down Expand Up @@ -590,7 +585,7 @@
</excludeModules>
<excludes>
<exclude>${shade.prefix}</exclude>
<!-- JDK 11 adds interface methods/bridges on PrimitiveIterator.OfInt; ignore japicmp source incompatibility -->
<!-- JDK adds interface methods/bridges on PrimitiveIterator.OfInt; ignore japicmp source incompatibility -->
<exclude>org.apache.parquet.internal.column.columnindex.IndexIterator</exclude>
<!-- Removal of a protected method in a class that's not supposed to be subclassed by third-party code -->
<exclude>org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])</exclude>
Expand Down