diff --git a/README.md b/README.md index c9d7eb2..2fc83ba 100644 --- a/README.md +++ b/README.md @@ -171,3 +171,26 @@ You can use timing to record delta time file_name = 'myfile.txt' otel_hook.timing("my.last_duration", last_duration, tags={"file_name": file_name}) ``` + +#### External Links + +##### Traces + +This provide can provide external links in the trace instance view. +You may need to define the following env. variable to do so. + +``` +AIRFLOW_OTEL_TRACE_LINK='https://ui.honeycomb.io/demo/environments/otel-workshop/trace?trace_id={trace_id}&span={span_id}&trace_start_ts={start_date_ts}' +``` + +When the environment is defined, you will be able to see the following button named `OTEL Trace Link` shown under `Extra LInks` section. +![external_link](./images/external_link.png) + +The above example is when you are using [Honeycomb](https://honeycomb.io) as the OTEL backend to ingest the traces. + +The following attributes are supported in the template string: +- trace_id : denotes the current trace_id of the dag run +- span_id : denotes the current span_id of the task instance +- start_date_ts : start date (minus 10 sec) in UNIX EPOCH seconds +- end_date_ts : end date (plus 10 sec) in UNIX EPOCH seconds + diff --git a/airflow_provider_opentelemetry/airflow_provider_opentelemetry/CHANGELOG.rst b/airflow_provider_opentelemetry/airflow_provider_opentelemetry/CHANGELOG.rst index 23caa23..3cb51b8 100644 --- a/airflow_provider_opentelemetry/airflow_provider_opentelemetry/CHANGELOG.rst +++ b/airflow_provider_opentelemetry/airflow_provider_opentelemetry/CHANGELOG.rst @@ -27,6 +27,17 @@ Changelog --------- +1.0.3 +..... + +Added external link where a button will appear in task run view that may redirect to the external OTEL backend +using provided URL. + +1.0.2 +..... + +Bug fixes + 1.0.0 ..... diff --git a/airflow_provider_opentelemetry/airflow_provider_opentelemetry/plugins/otel.py b/airflow_provider_opentelemetry/airflow_provider_opentelemetry/plugins/otel.py index fc1cae3..7e55c8c 100644 --- a/airflow_provider_opentelemetry/airflow_provider_opentelemetry/plugins/otel.py +++ b/airflow_provider_opentelemetry/airflow_provider_opentelemetry/plugins/otel.py @@ -19,7 +19,7 @@ from airflow.plugins_manager import AirflowPlugin from airflow_provider_opentelemetry.hooks.otel import is_listener_enabled, is_otel_traces_enabled from airflow_provider_opentelemetry.plugins.otel_listener import get_opentelemetry_listener - +from airflow_provider_opentelemetry.plugins.otel_extra_link import get_opentelemetry_links class OtelPlugin(AirflowPlugin): """provide listener for OpenTelemetry.""" @@ -27,3 +27,5 @@ class OtelPlugin(AirflowPlugin): name = "OtelPlugin" if is_listener_enabled() and not is_otel_traces_enabled(): listeners = [get_opentelemetry_listener()] + + global_operator_extra_links = get_opentelemetry_links() \ No newline at end of file diff --git a/airflow_provider_opentelemetry/airflow_provider_opentelemetry/plugins/otel_extra_link.py b/airflow_provider_opentelemetry/airflow_provider_opentelemetry/plugins/otel_extra_link.py new file mode 100644 index 0000000..970d98e --- /dev/null +++ b/airflow_provider_opentelemetry/airflow_provider_opentelemetry/plugins/otel_extra_link.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from airflow.models.baseoperator import BaseOperator +from airflow.models.baseoperatorlink import BaseOperatorLink +from airflow.models.taskinstancekey import TaskInstanceKey +from airflow.models.dagrun import DagRun +from airflow_provider_opentelemetry.util import gen_trace_id +from airflow_provider_opentelemetry.util import gen_span_id_from_ti_key + +OTEL_TRACE_LINK = 'AIRFLOW_OTEL_TRACE_LINK' + +def get_opentelemetry_links(): + links = [] + link_template = os.getenv(OTEL_TRACE_LINK, None) + if link_template is not None: + links.append(OtelTraceLink()) + return links + +class OtelTraceLink(BaseOperatorLink): + name = "OTEL Trace Link" + + def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey): + dag_run = DagRun.find(dag_id=ti_key.dag_id, run_id=ti_key.run_id)[0] + if dag_run is not None: + trace_id = gen_trace_id(dag_run) + span_id = gen_span_id_from_ti_key(ti_key) + start_date_ts = int(dag_run.start_date.timestamp()) - 10 + end_date_ts = int(dag_run.end_date.timestamp()) + 10 + template = os.getenv(OTEL_TRACE_LINK) + return template.format(trace_id=trace_id, span_id=span_id, start_date_ts=start_date_ts, end_date_ts=end_date_ts) \ No newline at end of file diff --git a/airflow_provider_opentelemetry/airflow_provider_opentelemetry/util.py b/airflow_provider_opentelemetry/airflow_provider_opentelemetry/util.py index f8eb30b..f117ad7 100644 --- a/airflow_provider_opentelemetry/airflow_provider_opentelemetry/util.py +++ b/airflow_provider_opentelemetry/airflow_provider_opentelemetry/util.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING from airflow.utils.hashlib_wrapper import md5 from airflow.utils.state import TaskInstanceState +from airflow.models.taskinstancekey import TaskInstanceKey from airflow import __version__ as airflow_version if TYPE_CHECKING: @@ -52,6 +53,29 @@ def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int: as_int, ) +def gen_trace_id_from_ti_key(ti_key: TaskInstanceKey, start_date, as_int: bool = False) -> str | int: + if start_date is None: + return NO_TRACE_ID + + return _gen_id( + [ti_key.dag_id, str(ti_key.run_id), str(start_date.timestamp())], + as_int, + ) + +def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int: + from packaging.version import parse + """Generate span id from the task instance.""" + # fix: issue #1 + if parse(parse(airflow_version).base_version) == parse("2.10.0"): + try_number = ti_key.try_number - 1 + else: + try_number = ti_key.try_number + return _gen_id( + [ti_key.dag_id, ti_key.run_id, ti_key.task_id, str(try_number)], + as_int, + SPAN_ID, + ) + def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int: """Generate dag's root span id using dag_run.""" if dag_run.start_date is None: diff --git a/airflow_provider_opentelemetry/setup.py b/airflow_provider_opentelemetry/setup.py index 274b915..a5d1057 100644 --- a/airflow_provider_opentelemetry/setup.py +++ b/airflow_provider_opentelemetry/setup.py @@ -2,7 +2,7 @@ setup( name='airflow-provider-opentelemetry', - version='1.0.2', + version='1.0.3', description='Opentelemetry provider for Airflow', long_description='Opentelemetry provider to produce Spans, Metrics within the DAG code', long_description_content_type='text/markdown', diff --git a/images/external_link.png b/images/external_link.png new file mode 100644 index 0000000..787b6fb Binary files /dev/null and b/images/external_link.png differ