Skip to content

Add python telemetry hopper to land telemetry data in Azure Monitor #249

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions toolkit/scripts/telemetry_hopper/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
azure-monitor-opentelemetry-exporter==1.0.0b37
opentelemetry-api==1.33.1
opentelemetry-sdk==1.33.1
opentelemetry-proto==1.33.1
grpcio==1.72.1
238 changes: 238 additions & 0 deletions toolkit/scripts/telemetry_hopper/telemetry_hopper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import argparse
import logging
import os
import signal
from concurrent import futures
from typing import Any, Dict, List, Optional

import grpc
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
from opentelemetry import trace
from opentelemetry.proto.collector.trace.v1 import (
    trace_service_pb2,
    trace_service_pb2_grpc,
)
from opentelemetry.proto.common.v1.common_pb2 import KeyValue
from opentelemetry.proto.trace.v1.trace_pb2 import Span as ProtoSpan
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SpanExportResult
from opentelemetry.trace import SpanContext, SpanKind, TraceFlags, TraceState
from opentelemetry.trace.status import Status, StatusCode

# Default grace period, in seconds, that TelemetryServer.stop() passes to the
# gRPC server's stop() call.
SHUTDOWN_GRACE_PERIOD_SEC = 5

# Connection string passed to AzureMonitorTraceExporter by TraceServiceHandler.
# TODO: Replace with PME resource string when available
AZURE_CONN_STR = "InstrumentationKey=c0b360fa-422d-40e5-b8a9-d642578f9fce;IngestionEndpoint=https://eastus-8.in.applicationinsights.azure.com/;ApplicationId=087d527c-b60e-4346-a679-f6abf367d0f0"


# Configure logging at INFO level and create the logger shared by this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("image-customizer-telemetry")


class SpanData:
    """Span record built from an OTLP protobuf span for Azure Monitor export.

    Instances carry the fields assigned in ``__init__`` (``name``,
    ``start_time``, ``end_time``, ``kind``, ``attributes``, ``status``,
    ``events``, ``links``, ``context``, ``parent``, ``resource`` and
    ``instrumentation_scope``) and are handed to the Azure Monitor exporter
    by ``TraceServiceHandler.Export``.
    """

    def __init__(
        self, proto_span: ProtoSpan, resource_attrs: Dict[str, Any], inst_scope: Any
    ) -> None:
        """Populate the span fields from ``proto_span``.

        Args:
            proto_span: OTLP protobuf span to convert.
            resource_attrs: Resource attributes already extracted into a plain
                dict; they are merged into the span attributes and also used
                to build ``resource``.
            inst_scope: Instrumentation scope of the span, stored unchanged.

        Raises:
            Exception: Any error raised during conversion is logged and
                re-raised to the caller.
        """
        try:
            self.name = proto_span.name
            self.start_time = proto_span.start_time_unix_nano
            self.end_time = proto_span.end_time_unix_nano
            # The protobuf enum value must be translated; see _convert_kind.
            self.kind = self._convert_kind(proto_span.kind)

            self.attributes = self._set_attributes(
                proto_span.attributes, resource_attrs
            )
            self.status = self._extract_status(proto_span)
            # NOTE(review): events and links are forwarded as raw protobuf
            # messages, not SDK Event/Link objects -- confirm the exporter
            # can consume them when a span actually carries any.
            self.events = proto_span.events
            self.links = proto_span.links
            self.context = self._create_span_context(
                proto_span.trace_id, proto_span.span_id
            )
            # A root span has an empty parent_span_id; represent that as
            # "no parent" instead of a SpanContext whose span_id is 0.
            self.parent = (
                self._create_span_context(
                    proto_span.trace_id, proto_span.parent_span_id
                )
                if proto_span.parent_span_id
                else None
            )
            self.resource = Resource.create(resource_attrs)
            self.instrumentation_scope = inst_scope

        except Exception as e:
            logger.error(f"Failed to initialize SpanData: {e}")
            raise

    @staticmethod
    def _convert_kind(proto_kind: int) -> SpanKind:
        """Translate an OTLP protobuf span kind into the API ``SpanKind`` enum.

        The protobuf enum reserves 0 for UNSPECIFIED, so its numeric values
        are offset by one from ``SpanKind`` and cannot be used directly.
        Unspecified or unknown values fall back to ``SpanKind.INTERNAL``.
        """
        kind_map = {
            ProtoSpan.SPAN_KIND_INTERNAL: SpanKind.INTERNAL,
            ProtoSpan.SPAN_KIND_SERVER: SpanKind.SERVER,
            ProtoSpan.SPAN_KIND_CLIENT: SpanKind.CLIENT,
            ProtoSpan.SPAN_KIND_PRODUCER: SpanKind.PRODUCER,
            ProtoSpan.SPAN_KIND_CONSUMER: SpanKind.CONSUMER,
        }
        return kind_map.get(proto_kind, SpanKind.INTERNAL)

    def _set_attributes(
        self, proto_attributes: List[KeyValue], resource_attrs: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Merge resource attributes with the span's own attributes.

        Starts from a copy of ``resource_attrs`` and overlays the attributes
        extracted from ``proto_attributes``, so span-level values win when a
        key appears in both.
        """
        attributes = dict(resource_attrs)

        span_attrs = extract_attributes_from_proto(proto_attributes)
        attributes.update(span_attrs)

        return attributes

    def _extract_status(self, proto_span: ProtoSpan) -> Status:
        """Build a ``Status`` from the protobuf span.

        Returns ``Status(StatusCode.UNSET)`` when the span has no status
        field; an empty status message is passed on as ``None``.
        """
        if proto_span.HasField("status"):
            return Status(
                status_code=StatusCode(proto_span.status.code),
                description=proto_span.status.message or None,
            )
        return Status(StatusCode.UNSET)

    def _create_span_context(self, trace_id, span_id) -> SpanContext:
        """Create a remote ``SpanContext`` from big-endian id byte strings."""
        return SpanContext(
            trace_id=int.from_bytes(trace_id, "big"),
            span_id=int.from_bytes(span_id, "big"),
            is_remote=True,
            trace_flags=TraceFlags(0),
            trace_state=TraceState(),
        )


class TraceServiceHandler(trace_service_pb2_grpc.TraceServiceServicer):
    """OTLP trace service handler that forwards traces to Azure Monitor."""

    def __init__(self) -> None:
        """Initialize the trace service handler and its exporter."""
        self.exporter = self._initialize_telemetry()

    def _initialize_telemetry(self) -> AzureMonitorTraceExporter:
        """Initialize OpenTelemetry and the Azure Monitor exporter.

        Registers a ``TracerProvider`` with an empty resource as the global
        tracer provider and returns an exporter configured with
        ``AZURE_CONN_STR``.
        """
        provider = TracerProvider(resource=Resource.create({}))
        trace.set_tracer_provider(provider)

        return AzureMonitorTraceExporter(connection_string=AZURE_CONN_STR)

    def Export(self, request, context) -> trace_service_pb2.ExportTraceServiceResponse:
        """Export the spans in ``request`` to Azure Monitor.

        Args:
            request: OTLP export request carrying ``resource_spans``.
            context: gRPC servicer context; on an unexpected error its status
                code is set to INTERNAL with a descriptive message.

        Returns:
            An empty ``ExportTraceServiceResponse`` in every case.
        """
        try:
            spans = self._process_trace_request(request)

            if spans:
                result = self.exporter.export(spans)
                # The exporter reports failure through its return value, not
                # by raising, so only claim success when it reports success.
                if result == SpanExportResult.SUCCESS:
                    logger.info(
                        "Successfully exported %d spans to Azure Monitor (result: %s)",
                        len(spans),
                        result,
                    )
                else:
                    logger.error(
                        "Failed to export %d spans to Azure Monitor (result: %s)",
                        len(spans),
                        result,
                    )
            return trace_service_pb2.ExportTraceServiceResponse()

        except Exception as e:
            logger.error("Error processing spans: %s", e, exc_info=True)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"Failed to process spans: {str(e)}")
            return trace_service_pb2.ExportTraceServiceResponse()

    def _process_trace_request(self, request) -> List[SpanData]:
        """Convert every span in the request into a ``SpanData`` object.

        Resource attributes are extracted once per resource and shared by all
        spans under it. A span that fails to convert is logged and skipped so
        the remaining spans are still returned.
        """
        spans = []

        for rs in request.resource_spans:
            resource_attrs = extract_attributes_from_proto(rs.resource.attributes)

            for ss in rs.scope_spans:
                for proto_span in ss.spans:
                    try:
                        span_data = SpanData(proto_span, resource_attrs, ss.scope)
                        spans.append(span_data)
                    except Exception as e:
                        logger.warning(f"Failed to process span {proto_span.name}: {e}")

        return spans


# Utility functions for protobuf attribute extraction
def extract_attribute_value(value_proto: Any) -> Optional[Any]:
    """Extract the Python value held by a protobuf AnyValue.

    Args:
        value_proto: Message exposing ``WhichOneof("value")`` and the
            matching value field.

    Returns:
        The scalar for string, int, double, bool and bytes values; a list of
        recursively extracted items for array values; ``None`` when no value
        is set or the value kind is not handled (for example key-value
        lists).
    """
    value_case = value_proto.WhichOneof("value")

    scalar_cases = (
        "string_value",
        "int_value",
        "double_value",
        "bool_value",
        "bytes_value",
    )
    if value_case in scalar_cases:
        # Read only the field that is actually set.
        return getattr(value_proto, value_case)

    if value_case == "array_value":
        return [
            extract_attribute_value(item) for item in value_proto.array_value.values
        ]

    return None


def extract_attributes_from_proto(proto_attributes: List[KeyValue]) -> Dict[str, Any]:
    """Convert protobuf KeyValue pairs into a plain ``{key: value}`` dict.

    Entries whose value extracts to ``None`` are left out of the result.
    """
    extracted_pairs = (
        (pair.key, extract_attribute_value(pair.value)) for pair in proto_attributes
    )
    return {key: value for key, value in extracted_pairs if value is not None}


class TelemetryServer:
    """Telemetry hopper server that forwards OTLP traces to Azure Monitor."""

    def __init__(self, port: int):
        """Remember the listening port; the gRPC server is created on start.

        Args:
            port: Port number the gRPC server will listen on.
        """
        self.port = port
        self.server: Optional[grpc.Server] = None

    def _start(self) -> None:
        """Create the gRPC server, install signal handlers and start serving.

        Raises:
            Exception: Any start-up error is logged and re-raised.
        """
        try:
            self.server = self._create_server()
            self._setup_signal_handlers()

            self.server.start()
            logger.info(
                f"Telemetry server listening on port {self.port} for OTLP traces"
            )

        except Exception as e:
            logger.error(f"Failed to start server: {e}")
            raise

    def stop(self, grace_period: int = SHUTDOWN_GRACE_PERIOD_SEC) -> None:
        """Stop the telemetry server gracefully.

        Does nothing when the server has not been created yet.

        Args:
            grace_period: Value passed to the gRPC server's ``stop`` call.
        """
        if self.server is None:
            # Nothing to shut down; avoid an AttributeError on None.
            logger.debug("stop() called before the server was started")
            return
        self.server.stop(grace_period)
        logger.info("Server stopped")

    def wait_for_termination(self) -> None:
        """Block until the gRPC server terminates.

        Raises:
            RuntimeError: If the server has not been started.
        """
        if self.server is None:
            raise RuntimeError("Telemetry server has not been started")
        self.server.wait_for_termination()

    def run(self) -> None:
        """Start the server and block until it terminates."""
        self._start()
        self.wait_for_termination()

    def _create_server(self) -> grpc.Server:
        """Build a gRPC server with the trace handler registered.

        The server listens on an insecure port bound to ``[::]`` at
        ``self.port``.
        """
        server = grpc.server(futures.ThreadPoolExecutor())
        trace_service_pb2_grpc.add_TraceServiceServicer_to_server(
            TraceServiceHandler(), server
        )
        server.add_insecure_port(f"[::]:{self.port}")
        return server

    def _setup_signal_handlers(self) -> None:
        """Install SIGINT and SIGTERM handlers that stop the server."""

        def shutdown_handler(signum, frame):
            logger.info(f"Received signal {signum}, stopping server")
            self.stop()

        signal.signal(signal.SIGINT, shutdown_handler)
        signal.signal(signal.SIGTERM, shutdown_handler)


def parse_args():
    """Parse the command line and return the resulting namespace.

    The only option is the mandatory integer ``--port``.
    """
    cli = argparse.ArgumentParser(
        description="Telemetry hopper server that forwards OTLP traces to Azure Monitor"
    )
    port_option = {
        "type": int,
        "required": True,
        "help": "Port number for the gRPC server to listen on",
    }
    cli.add_argument("--port", **port_option)
    return cli.parse_args()


if __name__ == "__main__":
    # Script entry point: read --port from the command line and serve until
    # the gRPC server terminates.
    TelemetryServer(port=parse_args().port).run()
7 changes: 7 additions & 0 deletions toolkit/tools/imagecustomizer/container/build-container.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ trap 'cleanUp' ERR
exeFile="$enlistmentRoot/toolkit/out/tools/imagecustomizer"
licensesDir="$enlistmentRoot/toolkit/out/LICENSES"

telemetryScript="$enlistmentRoot/toolkit/scripts/telemetry_hopper/telemetry_hopper.py"
telemetryRequirements="$enlistmentRoot/toolkit/scripts/telemetry_hopper/requirements.txt"
entrypointScript="$scriptDir/entrypoint.sh"

stagingBinDir="${containerStagingFolder}/usr/local/bin"
stagingLicensesDir="${containerStagingFolder}/usr/local/share/licenses"

Expand All @@ -84,6 +88,9 @@ mkdir -p "${stagingLicensesDir}"
cp "$exeFile" "${stagingBinDir}"
cp "$runScriptPath" "${stagingBinDir}"
cp -R "$licensesDir" "${stagingLicensesDir}"
cp "$telemetryScript" "${stagingBinDir}"
cp "$telemetryRequirements" "${stagingBinDir}"
cp "$entrypointScript" "${stagingBinDir}"

touch ${containerStagingFolder}/.mariner-toolkit-ignore-dockerenv

Expand Down
24 changes: 24 additions & 0 deletions toolkit/tools/imagecustomizer/container/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# Container entrypoint: optionally starts the telemetry hopper in the
# background, then replaces this shell with the command passed as arguments.

set -e

# Telemetry is on unless ENABLE_TELEMETRY is set to something other than "true".
ENABLE_TELEMETRY="${ENABLE_TELEMETRY:-true}"

# Check if --disable-telemetry flag is present in arguments
for arg in "$@"; do
    if [[ "$arg" == "--disable-telemetry" ]]; then
        ENABLE_TELEMETRY=false
        break
    fi
done

# Start telemetry service if enabled
if [[ "$ENABLE_TELEMETRY" == "true" ]]; then
    # Quote the port and fall back to 4317 (the value the image sets for
    # OTEL_PORT) so an unset variable cannot leave a dangling --port; that
    # failure would be invisible because output is discarded and errors are
    # ignored to keep telemetry best-effort.
    /opt/telemetry-venv/bin/python /usr/local/bin/telemetry_hopper.py --port "${OTEL_PORT:-4317}" > /dev/null 2>&1 || true &
    # Give the hopper a moment to start listening before the tool runs.
    sleep 1
fi

exec "$@"
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ FROM ${BASE_IMAGE}
RUN tdnf update -y && \
tdnf install -y qemu-img rpm coreutils util-linux systemd openssl \
sed createrepo_c squashfs-tools cdrkit parted e2fsprogs dosfstools \
xfsprogs zstd veritysetup grub2 grub2-pc systemd-ukify binutils lsof
xfsprogs zstd veritysetup grub2 grub2-pc systemd-ukify binutils lsof \
python3 python3-pip && \
tdnf clean all

COPY . /

ENV OTEL_PORT=4317
ENV OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:${OTEL_PORT}" \
OTEL_EXPORTER_OTLP_PROTOCOL="grpc"

# Create virtual environment and install Python dependencies for telemetry
RUN python3 -m venv /opt/telemetry-venv && \
/opt/telemetry-venv/bin/pip install --no-cache-dir -r /usr/local/bin/requirements.txt

ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, it is kind of odd that the user needs to specify either imagecustomizer or run.sh when calling the Image Customizer container. If we are going to specify ENTRYPOINT, I wonder if we should take the opportunity to unify things.

Copy link
Contributor Author

@aditjha-msft aditjha-msft Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like specifying imagecustomizer in the docker run call actually seems simpler than the alternative? The alternative in this case being an entrypoint script that would need all the same information (probably via environment variables passed in?), so it doesn't seem to help the docker run command all that much anyway?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But happy to expand the entrypoint logic if we want to unify how the container is run. Though, I think this would be better decided in a different PR.

2 changes: 1 addition & 1 deletion toolkit/tools/internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var shutdownFn func(ctx context.Context) error

func InitTelemetry(disableTelemetry bool) error {
if disableTelemetry || os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") == "" {
logger.Log.Debug("Disabled telemetry collection")
logger.Log.Info("Disabled telemetry collection")
return nil
}

Expand Down
Loading