Skip to content

Commit 6c7b13c

Browse files
committed
capture SQL failures invisible to the listener bus
1 parent c0ce9c5 commit 6c7b13c

File tree

3 files changed

+93
-1
lines changed

3 files changed

+93
-1
lines changed

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,31 @@ private synchronized void onStreamingQueryProgressEvent(
10521052
}
10531053
}
10541054

1055+
public synchronized void onSqlFailure(String sqlText, Throwable throwable) {
1056+
if (isRunningOnDatabricks) return;
1057+
if (applicationEnded) return;
1058+
1059+
initApplicationSpanIfNotInitialized();
1060+
1061+
AgentSpan sqlSpan =
1062+
buildSparkSpan("spark.sql", null)
1063+
.withTag(DDTags.RESOURCE_NAME, sqlText)
1064+
.withTag("description", sqlText)
1065+
.asChildOf(applicationSpan.context())
1066+
.start();
1067+
1068+
sqlSpan.setError(true);
1069+
sqlSpan.setTag(DDTags.ERROR_TYPE, throwable.getClass().getName());
1070+
sqlSpan.setTag(DDTags.ERROR_MSG, throwable.getMessage());
1071+
1072+
java.io.StringWriter sw = new java.io.StringWriter();
1073+
throwable.printStackTrace(new java.io.PrintWriter(sw));
1074+
sqlSpan.setTag(DDTags.ERROR_STACK, sw.toString());
1075+
1076+
setDataJobsSamplingPriority(sqlSpan);
1077+
sqlSpan.finish();
1078+
}
1079+
10551080
private void setDataJobsSamplingPriority(AgentSpan span) {
10561081
span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
10571082
}

dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
66
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
77
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
8+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
89

910
import datadog.trace.agent.tooling.Instrumenter;
1011
import datadog.trace.agent.tooling.InstrumenterModule;
@@ -39,7 +40,9 @@ public String[] knownMatchingTypes() {
3940
"org.apache.spark.util.Utils",
4041
"org.apache.spark.util.SparkClassUtils",
4142
"org.apache.spark.scheduler.LiveListenerBus",
42-
"org.apache.spark.sql.execution.SparkPlanInfo$"
43+
"org.apache.spark.sql.execution.SparkPlanInfo$",
44+
"org.apache.spark.sql.SparkSession",
45+
"org.apache.spark.sql.DataFrameReader"
4346
};
4447
}
4548

@@ -74,6 +77,23 @@ public void methodAdvice(MethodTransformer transformer) {
7477
.and(takesArgument(0, named("org.apache.spark.scheduler.SparkListenerInterface")))
7578
.and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))),
7679
AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice");
80+
81+
// SparkSession.sql(String) and SparkSession.table(String) — catch AnalysisException failures
82+
// that fire before SparkListenerSQLExecutionStart and are invisible to the listener bus
83+
transformer.applyAdvice(
84+
isMethod()
85+
.and(named("sql").or(named("table")))
86+
.and(takesArguments(1))
87+
.and(takesArgument(0, String.class))
88+
.and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))),
89+
AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");
90+
transformer.applyAdvice(
91+
isMethod()
92+
.and(named("table"))
93+
.and(takesArguments(1))
94+
.and(takesArgument(0, String.class))
95+
.and(isDeclaredBy(named("org.apache.spark.sql.DataFrameReader"))),
96+
AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");
7797
}
7898

7999
public static class PrepareSubmitEnvAdvice {
@@ -122,6 +142,21 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
122142
}
123143
}
124144

145+
public static class SparkSqlFailureAdvice {
146+
@Advice.OnMethodEnter(suppress = Throwable.class)
147+
public static void enter(
148+
@Advice.Argument(0) String identifier, @Advice.Local("sqlIdentifier") String stored) {
149+
stored = identifier;
150+
}
151+
152+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
153+
public static void exit(
154+
@Advice.Thrown Throwable throwable, @Advice.Local("sqlIdentifier") String identifier) {
155+
if (throwable == null || AbstractDatadogSparkListener.listener == null) return;
156+
AbstractDatadogSparkListener.listener.onSqlFailure(identifier, throwable);
157+
}
158+
}
159+
125160
public static class LiveListenerBusAdvice {
126161
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
127162
// If OL is disabled in tracer config but user set it up manually don't interfere

dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1639,6 +1639,38 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
16391639
return plan
16401640
}
16411641

1642+
def "failed spark.sql call creates an error span"() {
  def sqlText = "SELECT * FROM non_existing_table"
  def sparkSession = SparkSession.builder()
    .config("spark.master", "local[2]")
    .getOrCreate()

  // Capture the expected analysis failure instead of silently swallowing it,
  // so the test fails loudly if the query ever succeeds.
  def failure = null
  try {
    sparkSession.sql(sqlText).show()
  } catch (Exception e) {
    failure = e
  }
  // Stop the session so the spark.application span is finished and reported.
  sparkSession.stop()

  expect:
  // Querying a non-existing table must fail before any SQLExecutionStart
  // event is posted to the listener bus.
  failure != null
  assertTraces(1) {
    trace(2) {
      span {
        operationName "spark.application"
        spanType "spark"
        errored false
      }
      span {
        operationName "spark.sql"
        spanType "spark"
        resourceName sqlText
        childOf(span(0))
        errored true
      }
    }
  }
}
1673+
16421674
private static Object normalizeColumnRefs(Object plan) {
16431675
if (plan instanceof String) {
16441676
return plan.replaceAll(/#\d+L?/, '#N').replaceAll(/plan_id=\d+/, 'plan_id=N')

0 commit comments

Comments
 (0)