Commit 4fc4e37

Merge branch 'apache:main' into main
2 parents: 8aba4be + 07393c1

File tree: 22 files changed, +213 −269 lines


.github/workflows/benchmark.yml renamed to .github/workflows/benchmark-tpcds.yml

Lines changed: 4 additions & 17 deletions
@@ -63,7 +63,8 @@ jobs:
           key: ${{ runner.os }}-java-maven-${{ hashFiles('**/pom.xml') }}
           restore-keys: |
             ${{ runner.os }}-java-maven-
-
+      - name: Build Comet
+        run: make release
       - name: Cache TPC-DS generated data
         id: cache-tpcds-sf-1
         uses: actions/cache@v4
@@ -76,17 +77,6 @@
         with:
           repository: databricks/tpcds-kit
           path: ./tpcds-kit
-      - name: Build Comet
-        run: make release
-      - name: Upload Comet native lib
-        uses: actions/upload-artifact@v4
-        with:
-          name: libcomet-${{ github.run_id }}
-          path: |
-            native/target/release/libcomet.so
-            native/target/release/libcomet.dylib
-          retention-days: 1 # remove the artifact after 1 day, only valid for this workflow
-          overwrite: true
       - name: Build tpcds-kit
         if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
         run: |
@@ -132,11 +122,8 @@
           path: ./tpcds-sf-1
           key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml') }}
           fail-on-cache-miss: true # it's always be cached as it should be generated by pre-step if not existed
-      - name: Download Comet native lib
-        uses: actions/download-artifact@v5
-        with:
-          name: libcomet-${{ github.run_id }}
-          path: native/target/release
+      - name: Build Comet
+        run: make release
       - name: Run TPC-DS queries (Sort merge join)
         if: matrix.join == 'sort_merge'
         run: |

.github/workflows/benchmark-tpch.yml

Lines changed: 2 additions & 14 deletions
@@ -71,15 +71,6 @@
           key: tpch-${{ hashFiles('.github/workflows/benchmark-tpch.yml') }}
       - name: Build Comet
         run: make release
-      - name: Upload Comet native lib
-        uses: actions/upload-artifact@v4
-        with:
-          name: libcomet-${{ github.run_id }}
-          path: |
-            native/target/release/libcomet.so
-            native/target/release/libcomet.dylib
-          retention-days: 1 # remove the artifact after 1 day, only valid for this workflow
-          overwrite: true
       - name: Generate TPC-H (SF=1) table data
         if: steps.cache-tpch-sf-1.outputs.cache-hit != 'true'
         run: |
@@ -115,11 +106,8 @@
           path: ./tpch
           key: tpch-${{ hashFiles('.github/workflows/benchmark-tpch.yml') }}
           fail-on-cache-miss: true # it's always be cached as it should be generated by pre-step if not existed
-      - name: Download Comet native lib
-        uses: actions/download-artifact@v5
-        with:
-          name: libcomet-${{ github.run_id }}
-          path: native/target/release
+      - name: Build Comet
+        run: make release
       - name: Run TPC-H queries
         run: |
           SPARK_HOME=`pwd` SPARK_TPCH_DATA=`pwd`/tpch/sf1_parquet ./mvnw -B -Prelease -Dsuites=org.apache.spark.sql.CometTPCHQuerySuite test

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 4 additions & 2 deletions
@@ -457,8 +457,10 @@ object CometConf extends ShimCometConf {
   val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.explain.verbose.enabled")
       .doc(
-        "When this setting is enabled, Comet will provide a verbose tree representation of " +
-          "the extended information.")
+        "When this setting is enabled, Comet's extended explain output will provide the full " +
+          "query plan annotated with fallback reasons as well as a summary of how much of " +
+          "the plan was accelerated by Comet. When this setting is disabled, a list of fallback " +
+          "reasons will be provided instead.")
       .booleanConf
       .createWithDefault(false)
 

dev/benchmarks/tpcbench.py

Lines changed: 1 addition & 0 deletions
@@ -59,6 +59,7 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
 
     for iteration in range(0, iterations):
         print(f"Starting iteration {iteration} of {iterations}")
+        iter_start_time = time.time()
 
         # Determine which queries to run
         if query_num is not None:

dev/diffs/iceberg/1.8.1.diff

Lines changed: 8 additions & 3 deletions
@@ -17,7 +17,7 @@ index 7327b38905d..7967109f039 100644
      exclude group: 'org.apache.avro', module: 'avro'
      // already shaded by Parquet
 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
-index 04ffa8f4edc..cc0099ccc93 100644
+index 04ffa8f4edc..a909cd552c1 100644
 --- a/gradle/libs.versions.toml
 +++ b/gradle/libs.versions.toml
 @@ -34,6 +34,7 @@ azuresdk-bom = "1.2.31"
@@ -300,10 +300,10 @@ index 00000000000..ddf6c7de5ae
 +}
 diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
 new file mode 100644
-index 00000000000..88b195b76a2
+index 00000000000..a3cba401827
 --- /dev/null
 +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
-@@ -0,0 +1,255 @@
+@@ -0,0 +1,260 @@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
@@ -446,6 +446,7 @@ index 00000000000..88b195b76a2
 +    private long valuesRead = 0;
 +    private T last = null;
 +    private final FileReader cometReader;
++    private ReadConf conf;
 +
 +    FileIterator(
 +        ReadConf conf,
@@ -470,6 +471,7 @@ index 00000000000..88b195b76a2
 +          length,
 +          fileEncryptionKey,
 +          fileAADPrefix);
++      this.conf = conf;
 +    }
 +
 +    private FileReader newCometReader(
@@ -556,6 +558,9 @@ index 00000000000..88b195b76a2
 +    public void close() throws IOException {
 +      model.close();
 +      cometReader.close();
++      if (conf != null && conf.reader() != null) {
++        conf.reader().close();
++      }
 +    }
 +  }
 +}

dev/diffs/iceberg/1.9.1.diff

Lines changed: 7 additions & 2 deletions
@@ -291,10 +291,10 @@ index 00000000000..ddf6c7de5ae
 +}
 diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
 new file mode 100644
-index 00000000000..88b195b76a2
+index 00000000000..a3cba401827
 --- /dev/null
 +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
-@@ -0,0 +1,255 @@
+@@ -0,0 +1,260 @@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements. See the NOTICE file
@@ -437,6 +437,7 @@ index 00000000000..88b195b76a2
 +    private long valuesRead = 0;
 +    private T last = null;
 +    private final FileReader cometReader;
++    private ReadConf conf;
 +
 +    FileIterator(
 +        ReadConf conf,
@@ -461,6 +462,7 @@ index 00000000000..88b195b76a2
 +          length,
 +          fileEncryptionKey,
 +          fileAADPrefix);
++      this.conf = conf;
 +    }
 +
 +    private FileReader newCometReader(
@@ -547,6 +549,9 @@ index 00000000000..88b195b76a2
 +    public void close() throws IOException {
 +      model.close();
 +      cometReader.close();
++      if (conf != null && conf.reader() != null) {
++        conf.reader().close();
++      }
 +    }
 +  }
 +}

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ Comet provides the following configuration settings.
 | spark.comet.exec.union.enabled | Whether to enable union by default. | true |
 | spark.comet.exec.window.enabled | Whether to enable window by default. | true |
 | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
-| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
+| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
 | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
 | spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.logFallbackReasons.enabled | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |

native/core/src/execution/jni_api.rs

Lines changed: 4 additions & 0 deletions
@@ -41,6 +41,8 @@ use datafusion::{
 };
 use datafusion_comet_proto::spark_operator::Operator;
 use datafusion_spark::function::bitwise::bit_get::SparkBitGet;
+use datafusion_spark::function::datetime::date_add::SparkDateAdd;
+use datafusion_spark::function::datetime::date_sub::SparkDateSub;
 use datafusion_spark::function::hash::sha2::SparkSha2;
 use datafusion_spark::function::math::expm1::SparkExpm1;
 use datafusion_spark::function::string::char::CharFunc;
@@ -303,6 +305,8 @@ fn prepare_datafusion_session_context(
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha2::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(CharFunc::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitGet::default()));
+    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateAdd::default()));
+    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default()));
 
     // Must be the last one to override existing functions with the same name
     datafusion_comet_spark_expr::register_all_comet_functions(&mut session_ctx)?;
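
The two hunks above register DataFusion's Spark-compatible `date_add`/`date_sub` during session setup, so those expressions can run natively instead of falling back to Spark. A minimal standalone sketch of the same registration pattern (not Comet's code; it assumes the `datafusion`/`datafusion-spark` crates shown in the diff and that these UDFs register under the SQL names `date_add`/`date_sub`):

```rust
use datafusion::execution::context::SessionContext;
use datafusion::logical_expr::ScalarUDF;
use datafusion_spark::function::datetime::date_add::SparkDateAdd;
use datafusion_spark::function::datetime::date_sub::SparkDateSub;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Same pattern as prepare_datafusion_session_context above: wrap the
    // ScalarUDFImpl in a ScalarUDF and register it on the session.
    ctx.register_udf(ScalarUDF::new_from_impl(SparkDateAdd::default()));
    ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default()));

    // The Spark-compatible implementations now back these SQL calls.
    let df = ctx
        .sql("SELECT date_add(DATE '2024-01-31', 1), date_sub(DATE '2024-01-31', 1)")
        .await?;
    df.show().await?;
    Ok(())
}
```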

native/core/src/execution/memory_pools/mod.rs

Lines changed: 2 additions & 1 deletion
@@ -42,7 +42,8 @@ pub(crate) fn create_memory_pool(
     match memory_pool_config.pool_type {
         MemoryPoolType::Unified => {
             // Set Comet memory pool for native
-            let memory_pool = CometUnifiedMemoryPool::new(comet_task_memory_manager);
+            let memory_pool =
+                CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
             Arc::new(TrackConsumersPool::new(
                 memory_pool,
                 NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
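
For context, the pool is wrapped in DataFusion's `TrackConsumersPool`, which annotates out-of-memory errors with the largest live reservations. A minimal sketch of that wrapping pattern (assuming DataFusion's public `memory_pool` API, with `GreedyMemoryPool` standing in for `CometUnifiedMemoryPool`):

```rust
use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion::execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, TrackConsumersPool,
};

fn main() {
    // Inner pool capped at 1 MiB; the wrapper reports the 3 largest
    // consumers when an allocation fails (Comet uses NUM_TRACKED_CONSUMERS).
    let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
        GreedyMemoryPool::new(1024 * 1024),
        NonZeroUsize::new(3).unwrap(),
    ));

    let mut reservation = MemoryConsumer::new("example-consumer").register(&pool);

    // Asking for 2 MiB exceeds the cap; the resulting error names the
    // top consumers tracked by the wrapper.
    let err = reservation.try_grow(2 * 1024 * 1024).unwrap_err();
    println!("{err}");
}
```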

native/core/src/execution/memory_pools/unified_pool.rs

Lines changed: 24 additions & 7 deletions
@@ -40,6 +40,7 @@ use log::warn;
 pub struct CometUnifiedMemoryPool {
     task_memory_manager_handle: Arc<GlobalRef>,
     used: AtomicUsize,
+    task_attempt_id: i64,
 }
 
 impl Debug for CometUnifiedMemoryPool {
@@ -51,9 +52,13 @@ impl Debug for CometUnifiedMemoryPool {
 }
 
 impl CometUnifiedMemoryPool {
-    pub fn new(task_memory_manager_handle: Arc<GlobalRef>) -> CometUnifiedMemoryPool {
+    pub fn new(
+        task_memory_manager_handle: Arc<GlobalRef>,
+        task_attempt_id: i64,
+    ) -> CometUnifiedMemoryPool {
         Self {
             task_memory_manager_handle,
+            task_attempt_id,
             used: AtomicUsize::new(0),
         }
     }
@@ -82,7 +87,10 @@ impl Drop for CometUnifiedMemoryPool {
     fn drop(&mut self) {
         let used = self.used.load(Relaxed);
         if used != 0 {
-            warn!("CometUnifiedMemoryPool dropped with {used} bytes still reserved");
+            warn!(
+                "Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
+                self.task_attempt_id
+            );
         }
     }
 }
@@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
     }
 
     fn shrink(&self, _: &MemoryReservation, size: usize) {
-        self.release_to_spark(size)
-            .unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
+        if let Err(e) = self.release_to_spark(size) {
+            panic!(
+                "Task {} failed to return {size} bytes to Spark: {e:?}",
+                self.task_attempt_id
+            );
+        }
         if let Err(prev) = self
             .used
             .fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
         {
-            panic!("overflow when releasing {size} of {prev} bytes");
+            panic!(
+                "Task {} overflow when releasing {size} of {prev} bytes",
+                self.task_attempt_id
+            );
         }
     }
 
@@ -116,7 +131,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
         self.release_to_spark(acquired as usize)?;
 
         return Err(resources_datafusion_err!(
-            "Failed to acquire {} bytes, only got {}. Reserved: {}",
+            "Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
+            self.task_attempt_id,
             additional,
             acquired,
             self.reserved()
@@ -127,7 +143,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
             .fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired as usize))
         {
             return Err(resources_datafusion_err!(
-                "Failed to acquire {} bytes due to overflow. Reserved: {}",
+                "Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
+                self.task_attempt_id,
                 additional,
                 prev
             ));
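
The `grow`/`shrink` paths above also show the accounting pattern this pool relies on: `fetch_update` with `checked_add`/`checked_sub` keeps the `used` counter from wrapping, and a failed update hands back the previous value for the panic or error message (now tagged with the task attempt id). A self-contained sketch of that pattern, standard library only, with illustrative names:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

struct Accounting {
    used: AtomicUsize,
}

impl Accounting {
    /// Add `size` bytes; fails with the previous value on overflow.
    fn grow(&self, size: usize) -> Result<(), usize> {
        self.used
            .fetch_update(Relaxed, Relaxed, |old| old.checked_add(size))
            .map(|_| ())
    }

    /// Release `size` bytes; fails with the previous value if more is
    /// released than was ever reserved (would underflow).
    fn shrink(&self, size: usize) -> Result<(), usize> {
        self.used
            .fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
            .map(|_| ())
    }
}

fn main() {
    let acct = Accounting { used: AtomicUsize::new(0) };
    acct.grow(100).unwrap();
    acct.shrink(40).unwrap();

    // Over-release is caught instead of wrapping around to a huge usize;
    // the real pool panics here, including the task attempt id.
    assert_eq!(acct.shrink(100), Err(60));
}
```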
