# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Telemetry hopper.

Runs a local OTLP/gRPC trace endpoint and forwards every received span to
Azure Monitor through ``AzureMonitorTraceExporter``.
"""

import argparse
import logging
import os
import signal
from concurrent import futures
from typing import Any, Dict, List, Optional

import grpc
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
from opentelemetry import trace
from opentelemetry.proto.collector.trace.v1 import (
    trace_service_pb2,
    trace_service_pb2_grpc,
)
from opentelemetry.proto.common.v1.common_pb2 import KeyValue
from opentelemetry.proto.trace.v1.trace_pb2 import Span as ProtoSpan
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import Event, TracerProvider
from opentelemetry.sdk.trace.export import SpanExportResult
from opentelemetry.trace import Link, SpanContext, SpanKind, TraceFlags, TraceState
from opentelemetry.trace.status import Status, StatusCode

SHUTDOWN_GRACE_PERIOD_SEC = 5

# TODO: Replace with PME resource string when available
AZURE_CONN_STR = "InstrumentationKey=c0b360fa-422d-40e5-b8a9-d642578f9fce;IngestionEndpoint=https://eastus-8.in.applicationinsights.azure.com/;ApplicationId=087d527c-b60e-4346-a679-f6abf367d0f0"

# The OTLP wire enum reserves 0 for SPAN_KIND_UNSPECIFIED, so its integer
# values are shifted by one relative to the SDK's ``SpanKind`` enum. Map
# explicitly instead of passing the raw integer through.
_SPAN_KIND_BY_PROTO = {
    ProtoSpan.SPAN_KIND_INTERNAL: SpanKind.INTERNAL,
    ProtoSpan.SPAN_KIND_SERVER: SpanKind.SERVER,
    ProtoSpan.SPAN_KIND_CLIENT: SpanKind.CLIENT,
    ProtoSpan.SPAN_KIND_PRODUCER: SpanKind.PRODUCER,
    ProtoSpan.SPAN_KIND_CONSUMER: SpanKind.CONSUMER,
}

# AnyValue oneof members whose Python value can be used as an attribute as-is.
_SCALAR_VALUE_FIELDS = frozenset(
    {"string_value", "int_value", "double_value", "bool_value"}
)


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("image-customizer-telemetry")


class SpanData:
    """Span-shaped view of an OTLP protobuf span for Azure Monitor export.

    Exposes the attributes a span exporter reads (``name``, ``context``,
    ``parent``, ``kind``, ``attributes``, ``events``, ``links``, ...) using
    OpenTelemetry API/SDK types rather than raw protobuf messages.
    """

    def __init__(
        self, proto_span: ProtoSpan, resource_attrs: Dict[str, Any], inst_scope: Any
    ) -> None:
        """Build the span view.

        Args:
            proto_span: The OTLP span message to convert.
            resource_attrs: Attributes of the resource that emitted the span;
                they are merged into the span attributes and also used to
                build ``resource``.
            inst_scope: Instrumentation scope message the span belongs to.

        Raises:
            Exception: Any conversion error is logged and re-raised.
        """
        try:
            self.name = proto_span.name
            self.start_time = proto_span.start_time_unix_nano
            self.end_time = proto_span.end_time_unix_nano
            # Unspecified/unknown kinds are treated as INTERNAL.
            self.kind = _SPAN_KIND_BY_PROTO.get(proto_span.kind, SpanKind.INTERNAL)

            self.attributes = self._set_attributes(
                proto_span.attributes, resource_attrs
            )
            self.status = self._extract_status(proto_span)
            # Protobuf events/links do not have the ``timestamp`` / ``context``
            # attributes of their SDK counterparts, so convert them.
            self.events = [
                Event(
                    name=proto_event.name,
                    attributes=extract_attributes_from_proto(proto_event.attributes),
                    timestamp=proto_event.time_unix_nano,
                )
                for proto_event in proto_span.events
            ]
            self.links = [
                Link(
                    context=self._create_span_context(
                        proto_link.trace_id, proto_link.span_id
                    ),
                    attributes=extract_attributes_from_proto(proto_link.attributes),
                )
                for proto_link in proto_span.links
            ]
            self.context = self._create_span_context(
                proto_span.trace_id, proto_span.span_id
            )
            # An empty parent_span_id marks a root span. A SpanContext is a
            # tuple and therefore always truthy, so a zero-id context would be
            # mistaken for a real parent; use None instead.
            self.parent = (
                self._create_span_context(
                    proto_span.trace_id, proto_span.parent_span_id
                )
                if proto_span.parent_span_id
                else None
            )
            self.resource = Resource.create(resource_attrs)
            self.instrumentation_scope = inst_scope

        except Exception as e:
            logger.error("Failed to initialize SpanData: %s", e)
            raise

    def _set_attributes(
        self, proto_attributes: List[KeyValue], resource_attrs: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Return resource attributes overlaid with the span's own attributes."""
        attributes = dict(resource_attrs)
        # Span-level values win over resource-level values on key collision.
        attributes.update(extract_attributes_from_proto(proto_attributes))
        return attributes

    def _extract_status(self, proto_span: ProtoSpan) -> Status:
        """Convert the protobuf status, or return UNSET when none was sent."""
        if proto_span.HasField("status"):
            return Status(
                status_code=StatusCode(proto_span.status.code),
                description=proto_span.status.message or None,
            )
        return Status(StatusCode.UNSET)

    def _create_span_context(self, trace_id, span_id) -> SpanContext:
        """Build a remote SpanContext from big-endian trace/span id bytes."""
        return SpanContext(
            trace_id=int.from_bytes(trace_id, "big"),
            span_id=int.from_bytes(span_id, "big"),
            is_remote=True,
            trace_flags=TraceFlags(0),
            trace_state=TraceState(),
        )


class TraceServiceHandler(trace_service_pb2_grpc.TraceServiceServicer):
    """OTLP trace service handler that forwards traces to Azure Monitor."""

    def __init__(self) -> None:
        """Initialize the trace service handler."""
        self.exporter = self._initialize_telemetry()

    def _initialize_telemetry(self) -> AzureMonitorTraceExporter:
        """Initialize OpenTelemetry and Azure Monitor exporter."""
        provider = TracerProvider(resource=Resource.create({}))
        trace.set_tracer_provider(provider)

        return AzureMonitorTraceExporter(connection_string=AZURE_CONN_STR)

    def Export(self, request, context) -> trace_service_pb2.ExportTraceServiceResponse:
        """Export traces to Azure Monitor.

        Args:
            request: The OTLP ``ExportTraceServiceRequest``.
            context: The gRPC servicer context; set to INTERNAL when processing
                raises.

        Returns:
            An empty ``ExportTraceServiceResponse`` in every case.
        """
        try:
            spans = self._process_trace_request(request)

            if spans:
                result = self.exporter.export(spans)
                # The exporter reports failure through its return value rather
                # than by raising, so only claim success when it says so. The
                # gRPC response is left unchanged so clients do not re-send.
                if result == SpanExportResult.SUCCESS:
                    logger.info(
                        "Successfully exported %d spans to Azure Monitor (result: %s)",
                        len(spans),
                        result,
                    )
                else:
                    logger.error(
                        "Failed to export %d spans to Azure Monitor (result: %s)",
                        len(spans),
                        result,
                    )
            return trace_service_pb2.ExportTraceServiceResponse()

        except Exception as e:
            logger.error("Error processing spans: %s", e, exc_info=True)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"Failed to process spans: {str(e)}")
            return trace_service_pb2.ExportTraceServiceResponse()

    def _process_trace_request(self, request) -> List[SpanData]:
        """Process trace request and convert to SpanData objects.

        Spans that fail to convert are logged and skipped so that one bad span
        does not discard the rest of the request.
        """
        spans = []

        for rs in request.resource_spans:
            resource_attrs = extract_attributes_from_proto(rs.resource.attributes)

            for ss in rs.scope_spans:
                for proto_span in ss.spans:
                    try:
                        spans.append(SpanData(proto_span, resource_attrs, ss.scope))
                    except Exception as e:
                        logger.warning(
                            "Failed to process span %s: %s", proto_span.name, e
                        )

        return spans


# Utility functions for protobuf attribute extraction
def extract_attribute_value(value_proto: Any) -> Optional[Any]:
    """Extract value from protobuf AnyValue.

    Args:
        value_proto: An OTLP ``AnyValue`` message.

    Returns:
        The scalar held by the message, a list of extracted element values for
        ``array_value`` (elements that cannot be extracted are dropped), or
        ``None`` when no value is set or the value type is not supported.
    """
    value_case = value_proto.WhichOneof("value")
    if value_case in _SCALAR_VALUE_FIELDS:
        # Read only the field that is actually set.
        return getattr(value_proto, value_case)
    if value_case == "array_value":
        elements = (
            extract_attribute_value(element)
            for element in value_proto.array_value.values
        )
        return [element for element in elements if element is not None]
    return None


def extract_attributes_from_proto(proto_attributes: List[KeyValue]) -> Dict[str, Any]:
    """Extract attributes from protobuf KeyValue pairs.

    Entries whose value cannot be extracted are omitted from the result.
    """
    attributes = {}
    for kv in proto_attributes:
        value = extract_attribute_value(kv.value)
        if value is not None:
            attributes[kv.key] = value
    return attributes


class TelemetryServer:
    """Telemetry hopper server that forwards OTLP traces to Azure Monitor."""

    def __init__(self, port: int):
        """Remember the listening port; the gRPC server is created on start."""
        self.port = port
        self.server: Optional[grpc.Server] = None

    def _start(self) -> None:
        """Start the telemetry forwarding server."""
        try:
            self.server = self._create_server()
            self._setup_signal_handlers()

            self.server.start()
            logger.info(
                "Telemetry server listening on port %s for OTLP traces", self.port
            )

        except Exception as e:
            logger.error("Failed to start server: %s", e)
            raise

    def stop(self, grace_period: int = SHUTDOWN_GRACE_PERIOD_SEC) -> None:
        """Stop the telemetry server gracefully.

        Args:
            grace_period: Seconds in-flight RPCs are given to finish.
        """
        if self.server is None:
            # stop() may be reached before the server was ever created.
            logger.warning("Server is not running; nothing to stop")
            return
        self.server.stop(grace_period)
        logger.info("Server stopped")

    def wait_for_termination(self) -> None:
        """Block until the server has terminated.

        Raises:
            RuntimeError: If the server has not been started.
        """
        if self.server is None:
            raise RuntimeError("Telemetry server has not been started")
        self.server.wait_for_termination()

    def run(self) -> None:
        """Start the server and block until it terminates."""
        self._start()
        self.wait_for_termination()

    def _create_server(self) -> grpc.Server:
        """Create the gRPC server with the trace service bound to ``self.port``.

        Raises:
            RuntimeError: If the port could not be bound.
        """
        server = grpc.server(futures.ThreadPoolExecutor())
        trace_service_pb2_grpc.add_TraceServiceServicer_to_server(
            TraceServiceHandler(), server
        )
        bound_port = server.add_insecure_port(f"[::]:{self.port}")
        # Some grpcio versions signal a bind failure by returning 0.
        if bound_port == 0:
            raise RuntimeError(f"Could not bind gRPC server to port {self.port}")
        return server

    def _setup_signal_handlers(self) -> None:
        """Stop the server when SIGINT or SIGTERM is received."""

        def shutdown_handler(signum, frame):
            logger.info("Received signal %s, stopping server", signum)
            self.stop()

        signal.signal(signal.SIGINT, shutdown_handler)
        signal.signal(signal.SIGTERM, shutdown_handler)


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Telemetry hopper server that forwards OTLP traces to Azure Monitor"
    )
    parser.add_argument(
        "--port",
        type=int,
        required=True,
        help="Port number for the gRPC server to listen on",
    )
    return parser.parse_args()


def main() -> None:
    """Run the telemetry hopper on the port given on the command line."""
    args = parse_args()
    server = TelemetryServer(port=args.port)
    server.run()


if __name__ == "__main__":
    main()
#!/usr/bin/env bash

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# Container entrypoint.
#
# When ENABLE_TELEMETRY is "true" and --disable-telemetry is not among the
# arguments, starts the telemetry hopper in the background and points the
# OTLP exporter environment variables at it. Then replaces this shell with
# the command given as arguments.

set -e

ENABLE_TELEMETRY="${ENABLE_TELEMETRY:-false}"

# Check if --disable-telemetry flag is present in arguments
for arg in "$@"; do
    if [[ "$arg" == "--disable-telemetry" ]]; then
        ENABLE_TELEMETRY=false
        break
    fi
done

# Start telemetry service if enabled
if [[ "$ENABLE_TELEMETRY" == "true" ]]; then

    OTEL_PORT=4317

    # A background job never trips `set -e`, so no `|| true` is needed.
    /opt/telemetry-venv/bin/python /usr/local/bin/telemetry_hopper.py --port "$OTEL_PORT" > /dev/null 2>&1 &
    hopperPid=$!
    sleep 1

    # Only advertise the endpoint if the hopper is still running; otherwise
    # the command would be told to export to a port nobody listens on.
    if kill -0 "$hopperPid" 2> /dev/null; then
        export OTEL_PORT
        export OTEL_EXPORTER_OTLP_PROTOCOL="grpc"
        export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:${OTEL_PORT}"
    else
        echo "warning: telemetry hopper failed to start; telemetry is disabled" >&2
    fi
fi

exec "$@"
/ + +# Create virtual environment and install Python dependencies for telemetry +RUN python3 -m venv /opt/telemetry-venv && \ + /opt/telemetry-venv/bin/pip install --no-cache-dir -r /usr/local/bin/requirements.txt + +ENTRYPOINT ["/usr/local/bin/entrypoint.sh"] diff --git a/toolkit/tools/internal/telemetry/telemetry.go b/toolkit/tools/internal/telemetry/telemetry.go index 576a52db0..74f8e9b64 100644 --- a/toolkit/tools/internal/telemetry/telemetry.go +++ b/toolkit/tools/internal/telemetry/telemetry.go @@ -19,8 +19,11 @@ import ( var shutdownFn func(ctx context.Context) error func InitTelemetry(disableTelemetry bool) error { - if disableTelemetry || os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") == "" { - logger.Log.Debug("Disabled telemetry collection") + if disableTelemetry { + logger.Log.Info("Disabled telemetry collection") + return nil + } else if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") == "" { + logger.Log.Debug("No OTLP endpoint set, telemetry will not be collected") return nil }