Skip to content

Commit def45fa

Browse files
authored
minor: include taskAttemptId in log messages (apache#2467)
1 parent d867b0e commit def45fa

File tree

4 files changed

+57
-13
lines changed

4 files changed

+57
-13
lines changed

native/core/src/execution/memory_pools/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ pub(crate) fn create_memory_pool(
4242
match memory_pool_config.pool_type {
4343
MemoryPoolType::Unified => {
4444
// Set Comet memory pool for native
45-
let memory_pool = CometUnifiedMemoryPool::new(comet_task_memory_manager);
45+
let memory_pool =
46+
CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
4647
Arc::new(TrackConsumersPool::new(
4748
memory_pool,
4849
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),

native/core/src/execution/memory_pools/unified_pool.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use log::warn;
4040
pub struct CometUnifiedMemoryPool {
4141
task_memory_manager_handle: Arc<GlobalRef>,
4242
used: AtomicUsize,
43+
task_attempt_id: i64,
4344
}
4445

4546
impl Debug for CometUnifiedMemoryPool {
@@ -51,9 +52,13 @@ impl Debug for CometUnifiedMemoryPool {
5152
}
5253

5354
impl CometUnifiedMemoryPool {
54-
pub fn new(task_memory_manager_handle: Arc<GlobalRef>) -> CometUnifiedMemoryPool {
55+
pub fn new(
56+
task_memory_manager_handle: Arc<GlobalRef>,
57+
task_attempt_id: i64,
58+
) -> CometUnifiedMemoryPool {
5559
Self {
5660
task_memory_manager_handle,
61+
task_attempt_id,
5762
used: AtomicUsize::new(0),
5863
}
5964
}
@@ -82,7 +87,10 @@ impl Drop for CometUnifiedMemoryPool {
8287
fn drop(&mut self) {
8388
let used = self.used.load(Relaxed);
8489
if used != 0 {
85-
warn!("CometUnifiedMemoryPool dropped with {used} bytes still reserved");
90+
warn!(
91+
"Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
92+
self.task_attempt_id
93+
);
8694
}
8795
}
8896
}
@@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
96104
}
97105

98106
fn shrink(&self, _: &MemoryReservation, size: usize) {
99-
self.release_to_spark(size)
100-
.unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
107+
if let Err(e) = self.release_to_spark(size) {
108+
panic!(
109+
"Task {} failed to return {size} bytes to Spark: {e:?}",
110+
self.task_attempt_id
111+
);
112+
}
101113
if let Err(prev) = self
102114
.used
103115
.fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
104116
{
105-
panic!("overflow when releasing {size} of {prev} bytes");
117+
panic!(
118+
"Task {} overflow when releasing {size} of {prev} bytes",
119+
self.task_attempt_id
120+
);
106121
}
107122
}
108123

@@ -116,7 +131,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
116131
self.release_to_spark(acquired as usize)?;
117132

118133
return Err(resources_datafusion_err!(
119-
"Failed to acquire {} bytes, only got {}. Reserved: {}",
134+
"Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
135+
self.task_attempt_id,
120136
additional,
121137
acquired,
122138
self.reserved()
@@ -127,7 +143,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
127143
.fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired as usize))
128144
{
129145
return Err(resources_datafusion_err!(
130-
"Failed to acquire {} bytes due to overflow. Reserved: {}",
146+
"Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
147+
self.task_attempt_id,
131148
additional,
132149
prev
133150
));

spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,36 @@ public class CometTaskMemoryManager {
4040
/** The id uniquely identifies the native plan this memory manager is associated to */
4141
private final long id;
4242

43+
private final long taskAttemptId;
44+
4345
public final TaskMemoryManager internal;
4446
private final NativeMemoryConsumer nativeMemoryConsumer;
4547
private final AtomicLong used = new AtomicLong();
4648

47-
public CometTaskMemoryManager(long id) {
49+
public CometTaskMemoryManager(long id, long taskAttemptId) {
4850
this.id = id;
51+
this.taskAttemptId = taskAttemptId;
4952
this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
5053
this.nativeMemoryConsumer = new NativeMemoryConsumer();
5154
}
5255

5356
// Called by Comet native through JNI.
5457
// Returns the actual amount of memory (in bytes) granted.
5558
public long acquireMemory(long size) {
59+
if (logger.isTraceEnabled()) {
60+
logger.trace("Task {} requested {} bytes", taskAttemptId, size);
61+
}
5662
long acquired = internal.acquireExecutionMemory(size, nativeMemoryConsumer);
57-
used.addAndGet(acquired);
63+
long newUsed = used.addAndGet(acquired);
5864
if (acquired < size) {
65+
logger.warn(
66+
"Task {} requested {} bytes but only received {} bytes. Current allocation is {} and "
67+
+ "the total memory consumption is {} bytes.",
68+
taskAttemptId,
69+
size,
70+
acquired,
71+
newUsed,
72+
internal.getMemoryConsumptionForThisTask());
5973
// If memory manager is not able to acquire the requested size, log memory usage
6074
internal.showMemoryUsage();
6175
}
@@ -64,10 +78,16 @@ public long acquireMemory(long size) {
6478

6579
// Called by Comet native through JNI
6680
public void releaseMemory(long size) {
81+
if (logger.isTraceEnabled()) {
82+
logger.trace("Task {} released {} bytes", taskAttemptId, size);
83+
}
6784
long newUsed = used.addAndGet(-size);
6885
if (newUsed < 0) {
6986
logger.error(
70-
"Used memory is negative: " + newUsed + " after releasing memory chunk of: " + size);
87+
"Task {} used memory is negative ({}) after releasing {} bytes",
88+
taskAttemptId,
89+
newUsed,
90+
size);
7191
}
7292
internal.releaseExecutionMemory(size, nativeMemoryConsumer);
7393
}

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ class CometExecIterator(
6868
private val memoryMXBean = ManagementFactory.getMemoryMXBean
6969
private val nativeLib = new Native()
7070
private val nativeUtil = new NativeUtil()
71-
private val cometTaskMemoryManager = new CometTaskMemoryManager(id)
71+
private val taskAttemptId = TaskContext.get().taskAttemptId
72+
private val cometTaskMemoryManager = new CometTaskMemoryManager(id, taskAttemptId)
7273
private val cometBatchIterators = inputs.map { iterator =>
7374
new CometBatchIterator(iterator, nativeUtil)
7475
}.toArray
@@ -116,7 +117,7 @@ class CometExecIterator(
116117
memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
117118
memoryLimit,
118119
memoryLimitPerTask,
119-
taskAttemptId = TaskContext.get().taskAttemptId,
120+
taskAttemptId,
120121
debug = COMET_DEBUG_ENABLED.get(),
121122
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
122123
tracingEnabled)
@@ -175,6 +176,11 @@ class CometExecIterator(
175176
})
176177
} catch {
177178
case e: CometNativeException =>
179+
// it is generally considered bad practice to log and then rethrow an
180+
// exception, but it really helps debugging to be able to see which task
181+
// threw the exception, so we log the exception with taskAttemptId here
182+
logError(s"Native execution for task $taskAttemptId failed", e)
183+
178184
val fileNotFoundPattern: Regex =
179185
("""^External: Object at location (.+?) not found: No such file or directory """ +
180186
"""\(os error \d+\)$""").r

0 commit comments

Comments
 (0)