Skip to content

Commit 9f4e16d

Browse files
committed
Handle SQL AnalysisException failures
1 parent 1f5d818 commit 9f4e16d

File tree

2 files changed

+187
-0
lines changed

2 files changed

+187
-0
lines changed

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
114114
private final HashMap<Long, AgentSpan> stageSpans = new HashMap<>();
115115

116116
private final HashMap<Integer, Integer> stageToJob = new HashMap<>();
117+
private final HashMap<Integer, Long> jobToSqlExecution = new HashMap<>();
117118
private final HashMap<Long, Properties> stageProperties = new HashMap<>();
118119

119120
private final SparkAggregatedTaskMetrics applicationMetrics = new SparkAggregatedTaskMetrics();
@@ -139,6 +140,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
139140
private boolean lastJobFailed = false;
140141
private String lastJobFailedMessage;
141142
private String lastJobFailedStackTrace;
143+
private boolean lastSqlFailed = false;
144+
private String lastSqlFailedMessage;
145+
private String lastSqlFailedStack;
142146
private int jobCount = 0;
143147
private int currentExecutorCount = 0;
144148
private int maxExecutorCount = 0;
@@ -356,6 +360,11 @@ public synchronized void finishApplication(
356360
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed");
357361
applicationSpan.setTag(DDTags.ERROR_MSG, lastJobFailedMessage);
358362
applicationSpan.setTag(DDTags.ERROR_STACK, lastJobFailedStackTrace);
363+
} else if (lastSqlFailed) {
364+
applicationSpan.setError(true);
365+
applicationSpan.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed");
366+
applicationSpan.setTag(DDTags.ERROR_MSG, lastSqlFailedMessage);
367+
applicationSpan.setTag(DDTags.ERROR_STACK, lastSqlFailedStack);
359368
}
360369

361370
applicationMetrics.setSpanMetrics(applicationSpan);
@@ -513,6 +522,9 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
513522
for (int stageId : getSparkJobStageIds(jobStart)) {
514523
stageToJob.put(stageId, jobStart.jobId());
515524
}
525+
if (sqlExecutionId != null) {
526+
jobToSqlExecution.put(jobStart.jobId(), sqlExecutionId);
527+
}
516528
jobSpans.put(jobStart.jobId(), jobSpan);
517529
notifyOl(x -> openLineageSparkListener.onJobStart(x), jobStart);
518530
}
@@ -524,6 +536,8 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
524536
return;
525537
}
526538

539+
Long sqlExecutionId = jobToSqlExecution.remove(jobEnd.jobId());
540+
527541
if (jobEnd.jobResult() instanceof JobFailed) {
528542
JobFailed jobFailed = (JobFailed) jobEnd.jobResult();
529543
Exception exception = jobFailed.exception();
@@ -536,6 +550,17 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
536550
jobSpan.setTag(DDTags.ERROR_STACK, errorStackTrace);
537551
jobSpan.setTag(DDTags.ERROR_TYPE, "Spark Job Failed");
538552

553+
// Propagate the error to the parent SQL span if one exists
554+
if (sqlExecutionId != null) {
555+
AgentSpan sqlSpan = sqlSpans.get(sqlExecutionId);
556+
if (sqlSpan != null) {
557+
sqlSpan.setError(true);
558+
sqlSpan.setErrorMessage(errorMessage);
559+
sqlSpan.setTag(DDTags.ERROR_STACK, errorStackTrace);
560+
sqlSpan.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed");
561+
}
562+
}
563+
539564
// Only propagate the error to the application if it is not a cancellation
540565
if (errorMessage != null && !errorMessage.toLowerCase().contains("cancelled")) {
541566
lastJobFailed = true;
@@ -842,6 +867,9 @@ private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
842867
private static final MethodHandle adaptiveExecutionIdMethod;
843868
private static final MethodHandle adaptiveSparkPlanMethod;
844869

870+
// Spark 3.4+ added errorMessage() to SparkListenerSQLExecutionEnd (SPARK-41827)
871+
private static final MethodHandle sqlEndErrorMessageMethod;
872+
845873
@SuppressForbidden // Using reflection to avoid splitting the instrumentation once more
846874
private static Class<?> findAdaptiveExecutionUpdateClass() throws ClassNotFoundException {
847875
return Class.forName(
@@ -869,6 +897,18 @@ private static Class<?> findAdaptiveExecutionUpdateClass() throws ClassNotFoundE
869897
adaptiveExecutionUpdateClass = executionUpdateClass;
870898
adaptiveExecutionIdMethod = executionIdMethod;
871899
adaptiveSparkPlanMethod = sparkPlanMethod;
900+
901+
MethodHandle errorMessageMethod = null;
902+
try {
903+
MethodHandles.Lookup lookup = MethodHandles.lookup();
904+
errorMessageMethod =
905+
lookup.findVirtual(
906+
SparkListenerSQLExecutionEnd.class,
907+
"errorMessage",
908+
MethodType.methodType(scala.Option.class));
909+
} catch (NoSuchMethodException | IllegalAccessException ignored) {
910+
}
911+
sqlEndErrorMessageMethod = errorMessageMethod;
872912
}
873913

874914
private synchronized void updateAdaptiveSQLPlan(SparkListenerEvent event) {
@@ -891,6 +931,15 @@ private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sql
891931

892932
private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
893933
AgentSpan span = sqlSpans.remove(sqlEnd.executionId());
934+
935+
// If the span was never created (no job ran, e.g. AnalysisException), create it now
936+
if (span == null && sqlQueries.containsKey(sqlEnd.executionId())) {
937+
span = getOrCreateSqlSpan(sqlEnd.executionId(), null, null);
938+
if (span != null) {
939+
sqlSpans.remove(sqlEnd.executionId());
940+
}
941+
}
942+
894943
SparkAggregatedTaskMetrics metrics = sqlMetrics.remove(sqlEnd.executionId());
895944
sqlQueries.remove(sqlEnd.executionId());
896945
sqlPlans.remove(sqlEnd.executionId());
@@ -899,12 +948,44 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
899948
if (metrics != null) {
900949
metrics.setSpanMetrics(span);
901950
}
951+
952+
String errorMsg = getSqlEndErrorMessage(sqlEnd);
953+
if (errorMsg != null) {
954+
span.setError(true);
955+
span.setErrorMessage(getErrorMessageWithoutStackTrace(errorMsg));
956+
span.setTag(DDTags.ERROR_STACK, errorMsg);
957+
span.setTag(DDTags.ERROR_TYPE, "Spark SQL Failed");
958+
959+
String sqlErrorMessage = getErrorMessageWithoutStackTrace(errorMsg);
960+
if (sqlErrorMessage == null || !sqlErrorMessage.toLowerCase().contains("cancelled")) {
961+
lastSqlFailed = true;
962+
lastSqlFailedMessage = sqlErrorMessage;
963+
lastSqlFailedStack = errorMsg;
964+
}
965+
} else {
966+
lastSqlFailed = false;
967+
}
968+
902969
notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlEnd);
903970

904971
span.finish(sqlEnd.time() * 1000);
905972
}
906973
}
907974

975+
private static String getSqlEndErrorMessage(SparkListenerSQLExecutionEnd sqlEnd) {
976+
if (sqlEndErrorMessageMethod == null) {
977+
return null;
978+
}
979+
try {
980+
scala.Option<?> errorMessage = (scala.Option<?>) sqlEndErrorMessageMethod.invoke(sqlEnd);
981+
if (errorMessage.isDefined()) {
982+
return (String) errorMessage.get();
983+
}
984+
} catch (Throwable ignored) {
985+
}
986+
return null;
987+
}
988+
908989
private synchronized void onStreamingQueryStartedEvent(
909990
StreamingQueryListener.QueryStartedEvent event) {
910991
if (streamingQueries.size() > MAX_COLLECTION_SIZE) {

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import org.apache.spark.scheduler.StageInfo
2424
import org.apache.spark.scheduler.TaskInfo
2525
import org.apache.spark.scheduler.TaskLocality
2626
import org.apache.spark.scheduler.cluster.ExecutorInfo
27+
import org.apache.spark.sql.execution.SparkPlanInfo
28+
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
29+
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
2730
import org.apache.spark.storage.RDDInfo
2831
import scala.Option
2932
import scala.collection.immutable.HashMap
@@ -652,6 +655,109 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
652655
.getOption("spark.openlineage.circuitBreaker.timeoutInSeconds") == Option.apply("120")
653656
}
654657

658+
/**
 * Builds a SparkListenerSQLExecutionStart event carrying a minimal one-node plan,
 * for driving the listener in tests.
 *
 * NOTE(review): the 6-arg constructor matches the Spark version this fixture was
 * written against; newer Spark releases add parameters (e.g. rootExecutionId,
 * modifiedConfigs) — confirm against the pinned Spark dependency.
 */
protected sqlExecutionStartEvent(Long executionId, Long time, String description = "SELECT * FROM test") {
  def noChildren = JavaConverters.asScalaBuffer([]).toSeq()
  def noMetadata = new scala.collection.immutable.HashMap<String, String>()
  def planInfo = new SparkPlanInfo("TestPlan", "TestPlan", noChildren, noMetadata, noChildren)

  new SparkListenerSQLExecutionStart(
    executionId,
    description,
    "details",
    "physical plan",
    planInfo,
    time
  )
}
672+
673+
/**
 * Builds a SparkListenerSQLExecutionEnd event for the given execution id and timestamp.
 *
 * NOTE(review): Spark 3.4+ adds an errorMessage field to this event (SPARK-41827);
 * the two-arg constructor here relies on the Scala default value — confirm it still
 * resolves from Groovy on every Spark version under test.
 */
protected sqlExecutionEndEvent(Long executionId, Long time) {
  new SparkListenerSQLExecutionEnd(executionId, time)
}
676+
677+
def "test SQL span created when no job runs"() {
  setup:
  def testListener = getTestDatadogSparkListener()

  // A SQL execution that starts and ends without ever spawning a job
  // (e.g. the query fails before any job is scheduled)
  testListener.onApplicationStart(applicationStartEvent(1000L))
  testListener.onOtherEvent(sqlExecutionStartEvent(1L, 2000L, "SELECT * FROM missing_table"))
  testListener.onOtherEvent(sqlExecutionEndEvent(1L, 3000L))
  testListener.onApplicationEnd(new SparkListenerApplicationEnd(4000L))

  expect:
  // The SQL span must still be emitted, parented to the application span
  assertTraces(1) {
    trace(2) {
      span {
        operationName "spark.application"
        resourceName "spark.application"
        spanType "spark"
        parent()
      }
      span {
        operationName "spark.sql"
        resourceName "SELECT * FROM missing_table"
        spanType "spark"
        errored false
        childOf(span(0))
      }
    }
  }
}
704+
705+
def "test job failure propagates error to SQL span"() {
  setup:
  def testListener = getTestDatadogSparkListener()
  testListener.onApplicationStart(applicationStartEvent(1000L))

  // A SQL execution whose single job fails before the execution ends
  testListener.onOtherEvent(sqlExecutionStartEvent(1L, 2000L, "SELECT * FROM bad_table"))
  testListener.onJobStart(jobStartEventWithSql(1, 2100L, [1], 1L))
  testListener.onJobEnd(jobFailedEvent(1, 2500L, "Table not found"))
  testListener.onOtherEvent(sqlExecutionEndEvent(1L, 3000L))

  testListener.onApplicationEnd(new SparkListenerApplicationEnd(4000L))

  expect:
  // Application, SQL, and job spans are all marked errored;
  // the SQL span carries the dedicated "Spark SQL Failed" error type
  assertTraces(1) {
    trace(3) {
      span {
        operationName "spark.application"
        resourceName "spark.application"
        spanType "spark"
        errored true
        parent()
      }
      span {
        operationName "spark.sql"
        resourceName "SELECT * FROM bad_table"
        spanType "spark"
        errored true
        assert span.tags["error.type"] == "Spark SQL Failed"
        childOf(span(0))
      }
      span {
        operationName "spark.job"
        spanType "spark"
        errored true
        childOf(span(1))
      }
    }
  }
}
745+
746+
/**
 * Builds a SparkListenerJobStart whose properties carry the SQL execution id,
 * mirroring how Spark links a job to its parent SQL execution via the
 * "spark.sql.execution.id" property.
 */
protected jobStartEventWithSql(Integer jobId, Long time, ArrayList<Integer> stageIds, Long sqlExecutionId) {
  def jobProperties = new Properties()
  jobProperties.setProperty("spark.sql.execution.id", sqlExecutionId.toString())

  def stages = stageIds.collect { createStageInfo(it) }

  return new SparkListenerJobStart(
    jobId,
    time,
    JavaConverters.asScalaBuffer(stages).toSeq(),
    jobProperties
  )
}
760+
655761
protected validateRelativeError(double value, double expected, double relativeAccuracy) {
656762
double relativeError = Math.abs(value - expected) / expected
657763
assert relativeError < relativeAccuracy

0 commit comments

Comments
 (0)