Skip to content

Commit

Permalink
[FLINK-31275] notify listener about query operation
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <[email protected]>
  • Loading branch information
pawel-big-lebowski committed Jan 29, 2025
1 parent 750d96c commit f15dd9e
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Flink provides a pluggable interface for users to register their custom logic fo
This enables users to implement their own flink lineage reporter to send lineage info to third party data lineage systems for example Datahub and Openlineage.

The job status changed listeners are triggered every time status change happened for the application. The data lineage info is included in the JobCreatedEvent.
QueryOperationEvent can be used together with JobCreatedEvent to provide column level lineage for Flink SQL jobs.

### Implement a plugin for your custom enricher

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,46 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;

import org.apache.flink.shaded.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;

/** Util class for {@link JobStatusChangedListener}. */
@Internal
public final class JobStatusChangedListenerUtils {
/**
* Create job status changed listeners from configuration for job.
*
* @param configuration The job configuration.
* @return the job status changed listeners.
*/
public static List<JobStatusChangedListener> createJobStatusChangedListeners(
Configuration configuration) {
// One thread to notify job status changes.
final ExecutorService ioExecutor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat("TableEnvironmentImpl JobStatusChange Listener")
.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
.build());

return JobStatusChangedListenerUtils.createJobStatusChangedListeners(
Thread.currentThread().getContextClassLoader(),
new Configuration(configuration),
ioExecutor);
}

/**
* Create job status changed listeners from configuration for job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -464,6 +466,10 @@ private TableEnvironmentInternal createStreamTableEnvironment(
catalogManager,
functionCatalog);

final List<JobStatusChangedListener> jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
settings.getConfiguration());

return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
Expand All @@ -473,7 +479,8 @@ private TableEnvironmentInternal createStreamTableEnvironment(
env,
planner,
executor,
settings.isStreamingMode());
settings.isStreamingMode(),
jobStatusChangedListeners);
}

private ResultFetcher executeOperationInStatementSetState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
Expand Down Expand Up @@ -84,7 +85,8 @@ public AbstractStreamTableEnvironmentImpl(
FunctionCatalog functionCatalog,
Planner planner,
boolean isStreamingMode,
StreamExecutionEnvironment executionEnvironment) {
StreamExecutionEnvironment executionEnvironment,
List<JobStatusChangedListener> jobStatusChangedListeners) {
super(
catalogManager,
moduleManager,
Expand All @@ -93,7 +95,8 @@ public AbstractStreamTableEnvironmentImpl(
executor,
functionCatalog,
planner,
isStreamingMode);
isStreamingMode,
jobStatusChangedListeners);
this.executionEnvironment = executionEnvironment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
Expand Down Expand Up @@ -62,6 +65,7 @@

import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/**
Expand All @@ -83,7 +87,8 @@ public StreamTableEnvironmentImpl(
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
boolean isStreamingMode) {
boolean isStreamingMode,
List<JobStatusChangedListener> jobStatusChangedListeners) {
super(
catalogManager,
moduleManager,
Expand All @@ -93,7 +98,8 @@ public StreamTableEnvironmentImpl(
functionCatalog,
planner,
isStreamingMode,
executionEnvironment);
executionEnvironment,
jobStatusChangedListeners);
}

public static StreamTableEnvironment create(
Expand Down Expand Up @@ -157,6 +163,10 @@ public static StreamTableEnvironment create(
catalogManager,
functionCatalog);

final List<JobStatusChangedListener> jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
(Configuration) tableConfig.getRootConfiguration());

return new StreamTableEnvironmentImpl(
catalogManager,
moduleManager,
Expand All @@ -166,7 +176,8 @@ public static StreamTableEnvironment create(
executionEnvironment,
planner,
executor,
settings.isStreamingMode());
settings.isStreamingMode(),
jobStatusChangedListeners);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.table.api.bridge.java.internal;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
Expand Down Expand Up @@ -84,6 +86,9 @@ private StreamTableEnvironmentImpl getStreamTableEnvironment(
new URL[0],
Thread.currentThread().getContextClassLoader(),
tableConfig.getConfiguration());
final List<JobStatusChangedListener> jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
tableConfig.getConfiguration());

return new StreamTableEnvironmentImpl(
catalogManager,
Expand All @@ -94,7 +99,8 @@ private StreamTableEnvironmentImpl getStreamTableEnvironment(
env,
new TestPlanner(elements.getTransformation()),
new ExecutorMock(),
true);
true,
jobStatusChangedListeners);
}

private static class TestPlanner extends PlannerMock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobStatusChangedListener;
import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
import org.apache.flink.core.execution.JobStatusHook;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.CompiledPlan;
Expand Down Expand Up @@ -109,6 +111,7 @@
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.runtime.execution.DefaultQueryOperationEvent;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
Expand Down Expand Up @@ -157,6 +160,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
protected final Planner planner;
private final boolean isStreamingMode;
private final ExecutableOperation.Context operationCtx;
private final List<JobStatusChangedListener> jobStatusChangedListeners;

private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
"Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
Expand All @@ -176,7 +180,8 @@ protected TableEnvironmentImpl(
Executor executor,
FunctionCatalog functionCatalog,
Planner planner,
boolean isStreamingMode) {
boolean isStreamingMode,
List<JobStatusChangedListener> jobStatusChangedListeners) {
this.catalogManager = catalogManager;
this.moduleManager = moduleManager;
this.resourceManager = resourceManager;
Expand Down Expand Up @@ -224,6 +229,7 @@ protected TableEnvironmentImpl(
resourceManager,
tableConfig,
isStreamingMode);
this.jobStatusChangedListeners = jobStatusChangedListeners;
}

public static TableEnvironmentImpl create(Configuration configuration) {
Expand Down Expand Up @@ -281,6 +287,10 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
.build())
.build();

final List<JobStatusChangedListener> jobStatusChangedListeners =
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
(Configuration) tableConfig.getRootConfiguration());

final FunctionCatalog functionCatalog =
new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);

Expand All @@ -301,7 +311,8 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
executor,
functionCatalog,
planner,
settings.isStreamingMode());
settings.isStreamingMode(),
jobStatusChangedListeners);
}

@Override
Expand Down Expand Up @@ -839,7 +850,7 @@ public TableResultInternal executePlan(InternalPlan plan) {
List<Transformation<?>> transformations = planner.translatePlan(plan);
List<String> sinkIdentifierNames =
deduplicateSinkIdentifierNames(plan.getSinkIdentifiers());
return executeInternal(transformations, sinkIdentifierNames);
return executeInternal(transformations, sinkIdentifierNames, Collections.emptyList());
}

private CompiledPlan compilePlanAndWrite(
Expand Down Expand Up @@ -929,7 +940,13 @@ public TableResultInternal executeInternal(List<ModifyOperation> operations) {

List<Transformation<?>> transformations = translate(mapOperations);
List<String> sinkIdentifierNames = extractSinkIdentifierNames(mapOperations);
return executeInternal(transformations, sinkIdentifierNames, jobStatusHookList);
List<QueryOperation> queryOperationsList = extractQueryOperations(mapOperations);
return executeInternal(
transformations, sinkIdentifierNames, queryOperationsList, jobStatusHookList);
}

private List<QueryOperation> extractQueryOperations(List<ModifyOperation> mapOperations) {
return mapOperations.stream().map(ModifyOperation::getChild).collect(Collectors.toList());
}

private ModifyOperation getModifyOperation(
Expand Down Expand Up @@ -1062,13 +1079,17 @@ private TableResultInternal executeInternal(
}

private TableResultInternal executeInternal(
List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
return executeInternal(transformations, sinkIdentifierNames, Collections.emptyList());
List<Transformation<?>> transformations,
List<String> sinkIdentifierNames,
List<QueryOperation> queryOperationsList) {
return executeInternal(
transformations, sinkIdentifierNames, queryOperationsList, Collections.emptyList());
}

private TableResultInternal executeInternal(
List<Transformation<?>> transformations,
List<String> sinkIdentifierNames,
List<QueryOperation> queryOperationsList,
List<JobStatusHook> jobStatusHookList) {
final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);

Expand All @@ -1083,6 +1104,17 @@ private TableResultInternal executeInternal(
jobStatusHookList);
try {
JobClient jobClient = execEnv.executeAsync(pipeline);
jobStatusChangedListeners.forEach(
listener -> {
try {
listener.onEvent(
new DefaultQueryOperationEvent(
jobClient.getJobID(), queryOperationsList));
} catch (Throwable e) {
// should not interrupt the flow
}
});

final List<Column> columns = new ArrayList<>();
Long[] affectedRowCounts = new Long[transformations.size()];
for (int i = 0; i < transformations.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.execution;

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.operations.QueryOperation;

import java.util.List;

/** Default implementation of {@link QueryOperationEvent}. */
public class DefaultQueryOperationEvent implements QueryOperationEvent {
private final JobID jobId;
private final List<QueryOperation> queryOperation;

public DefaultQueryOperationEvent(JobID jobId, List<QueryOperation> queryOperation) {
this.jobId = jobId;
this.queryOperation = queryOperation;
}

@Override
public List<QueryOperation> queryOperation() {
return queryOperation;
}

@Override
public JobID jobId() {
return jobId;
}

@Override
public String jobName() {
return null;
}
}
Loading

0 comments on commit f15dd9e

Please sign in to comment.