Skip to content

Commit

Permalink
v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Dec 29, 2024
1 parent 66a9401 commit d59cdaa
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -867,20 +867,23 @@ record -> {
.collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
}

private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecordsByPayloadForSecIndex(HoodieSeekingFileReader reader, Set<String> keySet, String partitionName) throws IOException {
private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecordsByPayloadForSecIndex(
HoodieSeekingFileReader<?> reader, Set<String> keySet, String partitionName) throws IOException {
if (reader == null) {
// No base file at all
return Collections.emptyMap();
}

ClosableIterator<HoodieRecord<?>> recordIterator = reader.getRecordIterator();

return toStream(recordIterator).map(record -> {
GenericRecord data = (GenericRecord) record.getData();
return composeRecord(data, partitionName);
}).filter(record -> {
return keySet.contains(SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey()));
}).collect(Collectors.toMap(HoodieRecord::getRecordKey, record -> record));
try (ClosableIterator<? extends HoodieRecord<?>> recordIterator = reader.getRecordIterator()) {
return toStream(recordIterator).map(record -> {
GenericRecord data = (GenericRecord) record.getData();
return composeRecord(data, partitionName);
}).filter(record -> keySet.contains(SecondaryIndexKeyUtils
.getRecordKeyFromSecondaryIndexKey(record.getRecordKey())))
.collect(Collectors.toMap(record ->
SecondaryIndexKeyUtils.getRecordKeyFromSecondaryIndexKey(record.getRecordKey()),
record -> record));
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public static FSDataInputStream getFSDataInputStream(FileSystem fs,
StoragePath filePath,
int bufferSize,
boolean wrapStream) {
FSDataInputStream fsDataInputStream = null;
FSDataInputStream fsDataInputStream;
try {
fsDataInputStream = fs.open(convertToHadoopPath(filePath), bufferSize);
} catch (IOException e) {
Expand All @@ -230,16 +230,16 @@ public static FSDataInputStream getFSDataInputStream(FileSystem fs,

if (isGCSFileSystem(fs)) {
// in GCS FS, we might need to interceptor seek offsets as we might get EOF exception
return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, filePath, bufferSize), true);
return new SchemeAwareFSDataInputStream(new WrappedWithOriginalFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, filePath, bufferSize), fsDataInputStream), true);
}

if (isCHDFileSystem(fs)) {
return new BoundedFsDataInputStream(fs, convertToHadoopPath(filePath), fsDataInputStream);
}

if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(convertToHadoopPath(filePath), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
return new TimedFSDataInputStream(convertToHadoopPath(filePath), new WrappedWithOriginalFSDataInputStream(new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)), fsDataInputStream));
}

// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.hadoop.fs;

import org.apache.hadoop.fs.FSDataInputStream;

import java.io.IOException;
import java.io.InputStream;

/**
* A wrapper class for {@link FSDataInputStream} that ensures the original input stream
* is properly closed when this stream is closed.
*/
public class WrappedWithOriginalFSDataInputStream extends FSDataInputStream {

private final InputStream originalStream;

public WrappedWithOriginalFSDataInputStream(InputStream wrappedStream, InputStream originalStream) {
super(wrappedStream);
this.originalStream = originalStream;
}

@Override
public void close() throws IOException {
super.close();
originalStream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,10 @@ public void releaseBuffer(ByteBuffer buffer) {
public void unbuffer() {
outerStream.unbuffer();
}

@Override
public void close() throws IOException {
super.close();
outerStream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema)

reader.getRecordIterator(requiredAvroSchema).asScala
reader.getRecordIterator(requiredAvroSchema)
.map(record => {
avroToRowConverter.apply(record.getData.asInstanceOf[GenericRecord]).get
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
options.foreach(kv => props.setProperty(kv._1, kv._2))
val reader = new HoodieFileGroupReader[InternalRow](
readerContext,
new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
metaClient.getStorage,
tableState.tablePath,
tableState.latestCommitTimestamp.get,
fileSlice,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,25 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
import org.apache.hudi.testutils.HoodieClientTestUtils.{createMetaClient, getSparkConfForTest}
import org.apache.spark.SparkConf
import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.checkMessageContains
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
import org.scalactic.source
import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.Waiters.{interval, timeout}
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Tag}
import org.slf4j.LoggerFactory

import java.io.File
import java.util.TimeZone
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import scala.util.matching.Regex

class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
private val LOG = LoggerFactory.getLogger(getClass)

Expand All @@ -63,6 +65,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
DateTimeZone.setDefault(DateTimeZone.UTC)
TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC"))
protected lazy val spark: SparkSession = SparkSession.builder()
.config("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath)
.config("spark.sql.session.timeZone", "UTC")
.config("hoodie.insert.shuffle.parallelism", "4")
Expand Down Expand Up @@ -117,6 +120,22 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
s"h${tableId.incrementAndGet()}"
}

protected override def beforeEach(): Unit = {
super.beforeEach()
DebugFilesystem.clearOpenStreams()
}

protected override def afterEach(): Unit = {
super.afterEach()
// Clear all persistent datasets after each test
spark.sharedState.cacheManager.clearCache()
// files can be closed from other threads, so wait a bit
// normally this doesn't take more than 1s
eventually(timeout(10.seconds), interval(2.seconds)) {
DebugFilesystem.assertNoOpenStreams()
}
}

override protected def afterAll(): Unit = {
Utils.deleteRecursively(sparkWareHouse)
spark.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,33 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
}
}

test("Test Delete MOR Table") {
withTable(generateTableName) { tableName =>
spark.sql(
s"""
|create table $tableName (id int, name string, ts bigint)
|using hudi
|tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
|)
|""".stripMargin)
spark.sql(
s"""
|insert into $tableName values (1, "v1", 1000), (2, "v2", 2000),
| (3, "v1", 3000), (4, "v2", 4000)
|""".stripMargin)
spark.sql(
s"""
|delete from $tableName where id = 1
|""".stripMargin)
checkAnswer(s"select id, name from $tableName where name = 'v1'")(
Seq(3, "v1")
)
}
}

test("Test Delete Table with op upsert") {
withTempDir { tmp =>
Seq("cow", "mor").foreach {tableType =>
Expand Down

0 comments on commit d59cdaa

Please sign in to comment.