Skip to content

Commit

Permalink
v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Dec 30, 2024
1 parent 66a9401 commit f1dd7e9
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -867,20 +867,23 @@ record -> {
.collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
}

private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecordsByPayloadForSecIndex(HoodieSeekingFileReader reader, Set<String> keySet, String partitionName) throws IOException {
private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecordsByPayloadForSecIndex(
HoodieSeekingFileReader<?> reader, Set<String> keySet, String partitionName) throws IOException {
if (reader == null) {
// No base file at all
return Collections.emptyMap();
}

ClosableIterator<HoodieRecord<?>> recordIterator = reader.getRecordIterator();

return toStream(recordIterator).map(record -> {
GenericRecord data = (GenericRecord) record.getData();
return composeRecord(data, partitionName);
}).filter(record -> {
return keySet.contains(SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey()));
}).collect(Collectors.toMap(HoodieRecord::getRecordKey, record -> record));
try (ClosableIterator<? extends HoodieRecord<?>> recordIterator = reader.getRecordIterator()) {
return toStream(recordIterator).map(record -> {
GenericRecord data = (GenericRecord) record.getData();
return composeRecord(data, partitionName);
}).filter(record -> keySet.contains(SecondaryIndexKeyUtils
.getRecordKeyFromSecondaryIndexKey(record.getRecordKey())))
.collect(Collectors.toMap(record ->
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey()),
record -> record));
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public static FSDataInputStream getFSDataInputStream(FileSystem fs,
StoragePath filePath,
int bufferSize,
boolean wrapStream) {
FSDataInputStream fsDataInputStream = null;
FSDataInputStream fsDataInputStream;
try {
fsDataInputStream = fs.open(convertToHadoopPath(filePath), bufferSize);
} catch (IOException e) {
Expand All @@ -230,16 +230,16 @@ public static FSDataInputStream getFSDataInputStream(FileSystem fs,

if (isGCSFileSystem(fs)) {
// in GCS FS, we might need to interceptor seek offsets as we might get EOF exception
return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, filePath, bufferSize), true);
return new SchemeAwareFSDataInputStream(new WrappedWithOriginalFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, filePath, bufferSize), fsDataInputStream), true);
}

if (isCHDFileSystem(fs)) {
return new BoundedFsDataInputStream(fs, convertToHadoopPath(filePath), fsDataInputStream);
}

if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(convertToHadoopPath(filePath), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
return new TimedFSDataInputStream(convertToHadoopPath(filePath), new WrappedWithOriginalFSDataInputStream(new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)), fsDataInputStream));
}

// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.hadoop.fs;

import org.apache.hadoop.fs.FSDataInputStream;

import java.io.IOException;
import java.io.InputStream;

/**
* A wrapper class for {@link FSDataInputStream} that ensures the original input stream
* is properly closed when this stream is closed.
*/
public class WrappedWithOriginalFSDataInputStream extends FSDataInputStream {

private final InputStream originalStream;

public WrappedWithOriginalFSDataInputStream(InputStream wrappedStream, InputStream originalStream) {
super(wrappedStream);
this.originalStream = originalStream;
}

@Override
public void close() throws IOException {
super.close();
originalStream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,10 @@ public void releaseBuffer(ByteBuffer buffer) {
public void unbuffer() {
outerStream.unbuffer();
}

@Override
public void close() throws IOException {
super.close();
outerStream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema)

reader.getRecordIterator(requiredAvroSchema).asScala
reader.getRecordIterator(requiredAvroSchema)
.map(record => {
avroToRowConverter.apply(record.getData.asInstanceOf[GenericRecord]).get
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ package org.apache.hudi

import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.storage.StoragePathInfo

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.{And, Filter, Or}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

import java.io.Closeable
import scala.collection.JavaConverters._

object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
Expand Down Expand Up @@ -66,6 +67,13 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {

file: PartitionedFile => {
val iter = readParquetFile(file)
iter match {
case closeable: Closeable =>
// register a callback to close logScanner which will be executed on task completion.
// when tasks finished, this method will be called, and release resources.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => closeable.close()))
case _ =>
}
iter.flatMap {
case r: InternalRow => Seq(r)
case b: ColumnarBatch => b.rowIterator().asScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache

import org.apache.hudi.common.util.collection.ClosableIterator
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}

import java.io.Closeable

package object hudi {

/**
Expand All @@ -35,4 +38,21 @@ package object hudi {
def avro: String => DataFrame = reader.format("org.apache.hudi").load
}

/**
* An implicit class that cast java [[ClosableIterator]] to a scala [[Iterator]].
*/
implicit class ScalaClosableIterator[T](closableIterator: ClosableIterator[T]) extends Iterator[T] with Closeable {

override def hasNext: Boolean = {
val _hasNext = closableIterator.hasNext
if (!_hasNext) {
close()
}
_hasNext
}

override def next(): T = closableIterator.next()

override def close(): Unit = closableIterator.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
options.foreach(kv => props.setProperty(kv._1, kv._2))
val reader = new HoodieFileGroupReader[InternalRow](
readerContext,
new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
metaClient.getStorage,
tableState.tablePath,
tableState.latestCommitTimestamp.get,
fileSlice,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ShowBootstrapMappingProcedure extends BaseProcedure with ProcedureBuilder
mappingList.addAll(indexReader.getSourceFileMappingForPartition(part))
}
}
indexReader.close()

val rows: java.util.List[Row] = mappingList.asScala
.map(mapping => Row(mapping.getPartitionPath, mapping.getFileId, mapping.getBootstrapBasePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ class ShowBootstrapPartitionsProcedure extends BaseProcedure with ProcedureBuild
val metaClient = createMetaClient(jsc, basePath)

val indexReader = createBootstrapIndexReader(metaClient)
val indexedPartitions = indexReader.getIndexedPartitionPaths

indexedPartitions.stream().toArray.map(r => Row(r)).toList
try {
val indexedPartitions = indexReader.getIndexedPartitionPaths
indexedPartitions.stream().toArray.map(r => Row(r)).toList
} finally {
indexReader.close()
}
}

private def createBootstrapIndexReader(metaClient: HoodieTableMetaClient) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,25 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}
import org.apache.spark.SparkConf
import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
import org.scalactic.source
import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.Waiters.{interval, timeout}
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Tag}
import org.slf4j.LoggerFactory

import java.io.File
import java.util.TimeZone
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import scala.util.matching.Regex

class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
private val LOG = LoggerFactory.getLogger(getClass)

Expand All @@ -63,6 +65,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
DateTimeZone.setDefault(DateTimeZone.UTC)
TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC"))
protected lazy val spark: SparkSession = SparkSession.builder()
.config("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
.config("spark.sql.session.timeZone", "UTC")
.config("hoodie.insert.shuffle.parallelism", "4")
Expand Down Expand Up @@ -117,6 +120,22 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
s"h${tableId.incrementAndGet()}"
}

protected override def beforeEach(): Unit = {
super.beforeEach()
DebugFilesystem.clearOpenStreams()
}

protected override def afterEach(): Unit = {
super.afterEach()
// Clear all persistent datasets after each test
spark.sharedState.cacheManager.clearCache()
// files can be closed from other threads, so wait a bit
// normally this doesn't take more than 1s
eventually(timeout(10.seconds), interval(2.seconds)) {
DebugFilesystem.assertNoOpenStreams()
}
}

override protected def afterAll(): Unit = {
Utils.deleteRecursively(sparkWareHouse)
spark.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,33 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
}
}

test("Test Delete MOR Table") {
withTable(generateTableName) { tableName =>
spark.sql(
s"""
|create table $tableName (id int, name string, ts bigint)
|using hudi
|tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|""".stripMargin)
spark.sql(
s"""
|insert into $tableName values (1, "v1", 1000), (2, "v2", 2000),
| (3, "v1", 3000), (4, "v2", 4000)
|""".stripMargin)
spark.sql(
s"""
|delete from $tableName where id = 1
|""".stripMargin)
checkAnswer(s"select id, name from $tableName where name = 'v1'")(
Seq(3, "v1")
)
}
}

test("Test Delete Table with op upsert") {
withTempDir { tmp =>
Seq("cow", "mor").foreach {tableType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi.dml

import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieReaderConfig, HoodieStorageConfig}
import org.apache.hudi.common.engine.HoodieLocalEngineContext
Expand Down Expand Up @@ -470,19 +470,23 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
metaClient.getStorage, new HoodieLogFile(logFilePathList.get(i)),
avroSchema, 1024 * 1024, false, false,
"id", null)
assertTrue(logReader.hasNext)
val logBlockHeader = logReader.next().getLogBlockHeader
assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
if (isPartial) {
assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
} else {
assertFalse(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
try {
assertTrue(logReader.hasNext)
val logBlockHeader = logReader.next().getLogBlockHeader
assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
if (isPartial) {
assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
} else {
assertFalse(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
}
val actualSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
val expectedSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
avroSchema, changedFields(i).asJava), false)
assertEquals(expectedSchema, actualSchema)
} finally {
logReader.close()
}
val actualSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
val expectedSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
avroSchema, changedFields(i).asJava), false)
assertEquals(expectedSchema, actualSchema)
}
}

Expand Down

0 comments on commit f1dd7e9

Please sign in to comment.