diff --git a/.github/workflows/ci-hadoop3.yml b/.github/workflows/ci-hadoop3.yml index 7ce1820cd2..63cfbd2efd 100644 --- a/.github/workflows/ci-hadoop3.yml +++ b/.github/workflows/ci-hadoop3.yml @@ -26,7 +26,7 @@ jobs: strategy: fail-fast: false matrix: - java: [ { setup: '11', maven: '11' }, { setup: '17', maven: '17' } ] + java: [ { setup: '17', maven: '17' } ] codes: [ 'uncompressed,brotli', 'gzip,snappy' ] name: Build Parquet with JDK ${{ matrix.java.setup }} and ${{ matrix.codes }} @@ -44,14 +44,10 @@ jobs: bash dev/ci-before_install.sh - name: install run: | - EXTRA_JAVA_TEST_ARGS=$(./mvnw help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout) - export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS" ./mvnw install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true -Djava.version=${{ matrix.java.maven }} - name: verify env: TEST_CODECS: ${{ matrix.codes }} JAVA_VERSION: ${{ matrix.java.setup }} run: | - EXTRA_JAVA_TEST_ARGS=$(./mvnw help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout) - export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS" ./mvnw verify --batch-mode javadoc:javadoc diff --git a/.github/workflows/vector-plugins.yml b/.github/workflows/vector-plugins.yml index cc57e97ffd..911fb9f479 100644 --- a/.github/workflows/vector-plugins.yml +++ b/.github/workflows/vector-plugins.yml @@ -44,14 +44,10 @@ jobs: bash dev/ci-before_install.sh - name: install run: | - EXTRA_JAVA_TEST_ARGS=$(./mvnw help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout) - export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS" ./mvnw install --batch-mode -Pvector-plugins -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true -Dmaven.buildNumber.skip=true -Djava.version=${{ matrix.java }} -pl parquet-plugins/parquet-encoding-vector,parquet-plugins/parquet-plugins-benchmarks -am - name: verify env: TEST_CODECS: ${{ matrix.codes }} JAVA_VERSION: ${{ matrix.java }} run: | - EXTRA_JAVA_TEST_ARGS=$(./mvnw help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout) - export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS" ./mvnw verify --batch-mode -Pvector-plugins javadoc:javadoc -pl parquet-plugins/parquet-encoding-vector,parquet-plugins/parquet-plugins-benchmarks -am diff --git a/.gitignore b/.gitignore index ad049afc9e..d51372d111 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,5 @@ mvn_install.log .DS_Store .memsearch/ +.sdkmanrc + diff --git a/README.md b/README.md index 221970ee93..4b4f712adc 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ You can find additional details about the format and intended use cases in our [ ## Building -Parquet-Java uses Maven to build and depends on the thrift compiler (protoc is now managed by maven plugin). +Parquet-Java requires Java 17 or higher and uses Maven to build. It also depends on the thrift compiler (protoc is now managed by maven plugin). ### Install Thrift diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 3597898c30..6b0cfdb5e8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -763,7 +763,7 @@ public EncodingStats convertEncodingStats(List stats) { switch (stat.getPage_type()) { case DATA_PAGE_V2: builder.withV2Pages(); - // falls through + // falls through case DATA_PAGE: builder.addDataEncoding(getEncoding(stat.getEncoding()), stat.getCount()); break; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 98b49835a6..253cf2a6d1 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -325,7 +325,7 @@ private String cacheKey(CompressionCodecName codecName) { level = conf.get("parquet.compression.codec.zstd.level"); break; default: - // compression level is not supported; ignore it + // compression level is not supported; ignore it } String codecClass = codecName.getHadoopCompressionCodecClassName(); return level == null ? codecClass : codecClass + ":" + level; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java index 92f7db413c..5142506d2a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java @@ -546,7 +546,7 @@ private void validateBoundaryOrder( prevMaxValue::toString); break; case UNORDERED: - // No checks necessary. + // No checks necessary. } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index b2b5233eeb..5d02e84632 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -103,8 +103,8 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) return new SnappyCompressor(); case ZSTD: return new ZstdCompressor(); - // todo: create class similar to the SnappyCompressor for zlib and exclude it as - // snappy is above since it also generates allocateDirect calls. + // todo: create class similar to the SnappyCompressor for zlib and exclude it as + // snappy is above since it also generates allocateDirect calls. default: return super.createCompressor(codecName); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index e0b0d76e0e..e5d196abdc 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -65,6 +65,7 @@ import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; @@ -1361,12 +1362,42 @@ private void readVectored(List allParts, ChunkListBuilder b totalSize += len; } LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size()); - // Request a vectored read; - f.readVectored(ranges, options.getAllocator()); - int k = 0; - for (ConsecutivePartList consecutivePart : allParts) { - ParquetFileRange currRange = ranges.get(k++); - consecutivePart.readFromVectoredRange(currRange, builder); + // Use a capturing allocator to track all buffers allocated by Hadoop during vectored reads. + // The buffer returned from the read future may differ from the one originally allocated + // (e.g., ChecksumFileSystem wraps/copies buffers), so we must track the actual allocations. + List allocatedBuffers = new ArrayList<>(); + ByteBufferAllocator capturingAllocator = new ByteBufferAllocator() { + @Override + public ByteBuffer allocate(int size) { + ByteBuffer buf = options.getAllocator().allocate(size); + allocatedBuffers.add(buf); + return buf; + } + + @Override + public void release(ByteBuffer b) { + // Use identity comparison; ByteBuffer.equals() is content-based and could match wrong buffer + allocatedBuffers.removeIf(buf -> buf == b); + options.getAllocator().release(b); + } + + @Override + public boolean isDirect() { + return options.getAllocator().isDirect(); + } + }; + try { + // Request a vectored read; + f.readVectored(ranges, capturingAllocator); + int k = 0; + for (ConsecutivePartList consecutivePart : allParts) { + ParquetFileRange currRange = ranges.get(k++); + consecutivePart.readFromVectoredRange(currRange, builder); + } + } finally { + // Register all buffers allocated during vectored reads for release. + // In a finally block so buffers are not leaked on read failures. + builder.addBuffersToRelease(allocatedBuffers); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java index dd542f33a9..fe139d47e0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java @@ -26,68 +26,37 @@ import org.slf4j.LoggerFactory; /** - * A Helper class which use reflections to clean up DirectBuffer. It's implemented for - * better compatibility with both java8 and java9+, because the Cleaner class is moved to - * another place since java9+. - *

- * Strongly inspired by: - * https://github.com/apache/tomcat/blob/master/java/org/apache/tomcat/util/buf/ByteBufferUtils.java + * A helper class which uses {@code sun.misc.Unsafe.invokeCleaner} to explicitly free + * direct ByteBuffers. */ public class CleanUtil { private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class); private static final Object unsafe; - private static final Method cleanerMethod; - private static final Method cleanMethod; private static final Method invokeCleanerMethod; - private static final int majorVersion = - Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]); - static { final ByteBuffer tempBuffer = ByteBuffer.allocateDirect(0); - Method cleanerMethodLocal = null; - Method cleanMethodLocal = null; Object unsafeLocal = null; Method invokeCleanerMethodLocal = null; - if (majorVersion >= 9) { - try { - final Class clazz = Class.forName("sun.misc.Unsafe"); - final Field theUnsafe = clazz.getDeclaredField("theUnsafe"); - theUnsafe.setAccessible(true); - unsafeLocal = theUnsafe.get(null); - invokeCleanerMethodLocal = clazz.getMethod("invokeCleaner", ByteBuffer.class); - invokeCleanerMethodLocal.invoke(unsafeLocal, tempBuffer); - } catch (IllegalAccessException - | IllegalArgumentException - | InvocationTargetException - | NoSuchMethodException - | SecurityException - | ClassNotFoundException - | NoSuchFieldException e) { - logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e); - unsafeLocal = null; - invokeCleanerMethodLocal = null; - } - } else { - try { - cleanerMethodLocal = tempBuffer.getClass().getMethod("cleaner"); - cleanerMethodLocal.setAccessible(true); - final Object cleanerObject = cleanerMethodLocal.invoke(tempBuffer); - cleanMethodLocal = cleanerObject.getClass().getMethod("clean"); - cleanMethodLocal.invoke(cleanerObject); - } catch (NoSuchMethodException - | SecurityException - | IllegalAccessException - | IllegalArgumentException - | InvocationTargetException e) { - logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e); - cleanerMethodLocal = null; - cleanMethodLocal = null; - } + try { + final Class clazz = Class.forName("sun.misc.Unsafe"); + final Field theUnsafe = clazz.getDeclaredField("theUnsafe"); + theUnsafe.setAccessible(true); + unsafeLocal = theUnsafe.get(null); + invokeCleanerMethodLocal = clazz.getMethod("invokeCleaner", ByteBuffer.class); + invokeCleanerMethodLocal.invoke(unsafeLocal, tempBuffer); + } catch (IllegalAccessException + | IllegalArgumentException + | InvocationTargetException + | NoSuchMethodException + | SecurityException + | ClassNotFoundException + | NoSuchFieldException e) { + logger.warn("Cannot use direct ByteBuffer cleaner, memory leaking may occur", e); + unsafeLocal = null; + invokeCleanerMethodLocal = null; } - cleanerMethod = cleanerMethodLocal; - cleanMethod = cleanMethodLocal; unsafe = unsafeLocal; invokeCleanerMethod = invokeCleanerMethodLocal; } @@ -97,16 +66,7 @@ private CleanUtil() { } public static void cleanDirectBuffer(ByteBuffer buf) { - if (cleanMethod != null) { - try { - cleanMethod.invoke(cleanerMethod.invoke(buf)); - } catch (IllegalAccessException - | IllegalArgumentException - | InvocationTargetException - | SecurityException e) { - logger.warn("Error while cleaning up the DirectBuffer", e); - } - } else if (invokeCleanerMethod != null) { + if (invokeCleanerMethod != null) { try { invokeCleanerMethod.invoke(unsafe, buf); } catch (IllegalAccessException diff --git a/parquet-plugins/parquet-encoding-vector/pom.xml b/parquet-plugins/parquet-encoding-vector/pom.xml index 06c4084be1..4ceb0b43d4 100644 --- a/parquet-plugins/parquet-encoding-vector/pom.xml +++ b/parquet-plugins/parquet-encoding-vector/pom.xml @@ -34,11 +34,6 @@ Apache Parquet Encodings Vector https://parquet.apache.org - - - 17 - - org.apache.parquet diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java index 7e8a1633f8..72ada5ae38 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java @@ -226,7 +226,7 @@ private boolean readOneValue(TProtocol in, byte type, List buffer, Thrif writeShortAction(buffer, s); break; case TType.ENUM: // same as i32 => actually never seen in the protocol layer as enums are written as a i32 - // field + // field case TType.I32: final int i = in.readI32(); checkEnum(expectedType, i); diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ProtocolReadToWrite.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ProtocolReadToWrite.java index 94c68b0081..830a5c605b 100644 --- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ProtocolReadToWrite.java +++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ProtocolReadToWrite.java @@ -74,7 +74,7 @@ void readOneValue(TProtocol in, TProtocol out, byte type) throws TException { out.writeI16(in.readI16()); break; case TType.ENUM: // same as i32 => actually never seen in the protocol layer as enums are written as a i32 - // field + // field case TType.I32: out.writeI32(in.readI32()); break; diff --git a/pom.xml b/pom.xml index 1bd9893d87..bc530c01ca 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ - 11 + 17 github com.fasterxml.jackson.core com.fasterxml.jackson.datatype @@ -80,10 +80,10 @@ 2.21 0.25.7 1.3.2 - 2.46.1 + 3.5.1 shaded.parquet - 3.3.0 + 3.4.3 2.12.0 1.17.0 thrift @@ -119,23 +119,6 @@ true - - - -XX:+IgnoreUnrecognizedVMOptions - --add-opens=java.base/java.lang=ALL-UNNAMED - --add-opens=java.base/java.lang.invoke=ALL-UNNAMED - --add-opens=java.base/java.lang.reflect=ALL-UNNAMED - --add-opens=java.base/java.io=ALL-UNNAMED - --add-opens=java.base/java.net=ALL-UNNAMED - --add-opens=java.base/java.nio=ALL-UNNAMED - --add-opens=java.base/java.util=ALL-UNNAMED - --add-opens=java.base/java.util.concurrent=ALL-UNNAMED - --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED - --add-opens=java.base/sun.nio.ch=ALL-UNNAMED - --add-opens=java.base/sun.nio.cs=ALL-UNNAMED - --add-opens=java.base/sun.security.action=ALL-UNNAMED - --add-opens=java.base/sun.util.calendar=ALL-UNNAMED - @@ -284,6 +267,21 @@ maven-enforcer-plugin + + enforce-java-version + + enforce + + + + + [17,) + Java 17 or higher is required to build parquet-java. + + + true + + enforce-banned-dependencies @@ -432,9 +430,6 @@ org.apache.maven.plugins maven-failsafe-plugin - - ${extraJavaTestArgs} - @@ -449,7 +444,7 @@ org.apache.maven.plugins maven-surefire-plugin - ${surefire.argLine} ${extraJavaTestArgs} + ${surefire.argLine} + org.apache.parquet.internal.column.columnindex.IndexIterator org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReader#gatherElementDataFromStreams(byte[])