-
Notifications
You must be signed in to change notification settings - Fork 5
Add python telemetry hopper to land telemetry data in Azure Monitor #249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2a3dc66
6e3f71c
fedd099
2cb962d
99daf97
cf9e7a3
ccddd49
1ca31fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
azure-monitor-opentelemetry-exporter==1.0.0b37 | ||
opentelemetry-api==1.33.1 | ||
opentelemetry-sdk==1.33.1 | ||
opentelemetry-proto==1.33.1 | ||
grpcio==1.72.1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,238 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
import os | ||
import logging | ||
import grpc | ||
import signal | ||
import argparse | ||
from concurrent import futures | ||
from typing import Dict, Any, List, Optional | ||
|
||
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter | ||
from opentelemetry import trace | ||
from opentelemetry.trace.status import Status, StatusCode | ||
from opentelemetry.sdk.resources import Resource | ||
from opentelemetry.sdk.trace import TracerProvider | ||
from opentelemetry.trace import SpanContext, TraceFlags, TraceState | ||
from opentelemetry.proto.collector.trace.v1 import ( | ||
trace_service_pb2, | ||
trace_service_pb2_grpc, | ||
) | ||
from opentelemetry.proto.trace.v1.trace_pb2 import Span as ProtoSpan | ||
from opentelemetry.proto.common.v1.common_pb2 import KeyValue | ||
|
||
# Default grace period, in seconds, passed to the gRPC server's stop() call
# (see TelemetryServer.stop).
SHUTDOWN_GRACE_PERIOD_SEC = 5

# TODO: Replace with PME resource string when available
# Connection string handed to AzureMonitorTraceExporter when the trace
# service handler is constructed.
AZURE_CONN_STR = "InstrumentationKey=c0b360fa-422d-40e5-b8a9-d642578f9fce;IngestionEndpoint=https://eastus-8.in.applicationinsights.azure.com/;ApplicationId=087d527c-b60e-4346-a679-f6abf367d0f0"

# Configure the root logger at INFO level at import time and create the
# logger shared by everything in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("image-customizer-telemetry")
|
||
|
||
class SpanData:
    """Span-shaped view of an OTLP protobuf span for Azure Monitor export.

    Instances are built from the protobuf spans received over gRPC and are
    passed, as a list, to ``AzureMonitorTraceExporter.export``.

    Attributes set by ``__init__``:
        name, start_time, end_time, kind: copied straight from the protobuf
            span (times are the ``*_time_unix_nano`` fields).
        attributes: resource attributes merged with the span's own
            attributes; span attributes win on key collisions.
        status: an OpenTelemetry ``Status`` built from the protobuf status.
        events, links: the protobuf repeated fields, passed through as-is.
        context: ``SpanContext`` for this span.
        parent: ``SpanContext`` of the parent span, or ``None`` for a root
            span (one whose ``parent_span_id`` is empty).
        resource: ``Resource`` created from the resource attributes.
        instrumentation_scope: the scope object supplied by the caller.
    """

    def __init__(
        self, proto_span: ProtoSpan, resource_attrs: Dict[str, Any], inst_scope: Any
    ) -> None:
        """Populate the span fields from ``proto_span``.

        Args:
            proto_span: The OTLP protobuf span to adapt.
            resource_attrs: Attributes of the resource that produced the span.
            inst_scope: Instrumentation scope the span was reported under.

        Raises:
            Exception: Any error raised while converting the span is logged
                and re-raised unchanged.
        """
        try:
            self.name = proto_span.name
            self.start_time = proto_span.start_time_unix_nano
            self.end_time = proto_span.end_time_unix_nano
            # NOTE(review): this is the raw OTLP enum integer. The OTLP
            # SpanKind numbering appears to be offset by one from
            # opentelemetry.trace.SpanKind -- confirm the exporter classifies
            # spans as intended.
            self.kind = proto_span.kind

            self.attributes = self._set_attributes(
                proto_span.attributes, resource_attrs
            )
            self.status = self._extract_status(proto_span)
            # NOTE(review): events and links stay protobuf messages rather
            # than SDK Event/Link objects -- confirm the exporter can consume
            # them when they are non-empty.
            self.events = proto_span.events
            self.links = proto_span.links
            self.context = self._create_span_context(
                proto_span.trace_id, proto_span.span_id
            )
            # A root span has an empty parent_span_id. Converting that would
            # give a SpanContext with span id 0 (not a real span), so report
            # "no parent" instead of a bogus parent context.
            if proto_span.parent_span_id:
                self.parent = self._create_span_context(
                    proto_span.trace_id, proto_span.parent_span_id
                )
            else:
                self.parent = None
            self.resource = Resource.create(resource_attrs)
            self.instrumentation_scope = inst_scope

        except Exception as e:
            logger.error("Failed to initialize SpanData: %s", e)
            raise

    def _set_attributes(
        self, proto_attributes: List[KeyValue], resource_attrs: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Return resource attributes overlaid with the span's attributes.

        ``resource_attrs`` is copied, not mutated; span attributes override
        resource attributes that share a key.
        """
        attributes = dict(resource_attrs)
        attributes.update(extract_attributes_from_proto(proto_attributes))
        return attributes

    def _extract_status(self, proto_span: ProtoSpan) -> Status:
        """Build a ``Status`` from the span, or ``UNSET`` if it has none.

        An empty status message is passed on as ``None``.
        """
        if proto_span.HasField("status"):
            return Status(
                status_code=StatusCode(proto_span.status.code),
                description=proto_span.status.message or None,
            )
        return Status(StatusCode.UNSET)

    def _create_span_context(self, trace_id, span_id) -> SpanContext:
        """Build a remote ``SpanContext`` from big-endian id byte strings.

        Trace flags are always 0 and the trace state is always empty; the
        corresponding protobuf fields are not consulted.
        """
        return SpanContext(
            trace_id=int.from_bytes(trace_id, "big"),
            span_id=int.from_bytes(span_id, "big"),
            is_remote=True,
            trace_flags=TraceFlags(0),
            trace_state=TraceState(),
        )
|
||
|
||
class TraceServiceHandler(trace_service_pb2_grpc.TraceServiceServicer):
    """OTLP trace service handler that forwards traces to Azure Monitor."""

    def __init__(self) -> None:
        """Initialize the trace service handler and its exporter."""
        self.exporter = self._initialize_telemetry()

    def _initialize_telemetry(self) -> AzureMonitorTraceExporter:
        """Initialize OpenTelemetry and the Azure Monitor exporter.

        Installs a ``TracerProvider`` with an empty resource as the global
        tracer provider, then returns an exporter configured with
        ``AZURE_CONN_STR``.
        """
        provider = TracerProvider(resource=Resource.create({}))
        trace.set_tracer_provider(provider)

        return AzureMonitorTraceExporter(connection_string=AZURE_CONN_STR)

    def Export(self, request, context) -> trace_service_pb2.ExportTraceServiceResponse:
        """Export the spans in an OTLP request to Azure Monitor.

        Args:
            request: The incoming ``ExportTraceServiceRequest``.
            context: The gRPC servicer context for this call.

        Returns:
            An empty ``ExportTraceServiceResponse`` in every case. If an
            unexpected exception occurs, the gRPC status is set to
            ``INTERNAL`` with the error text as details before returning.
        """
        # Imported here so this block does not depend on a change to the
        # file's import section.
        from opentelemetry.sdk.trace.export import SpanExportResult

        try:
            spans = self._process_trace_request(request)

            if spans:
                result = self.exporter.export(spans)
                # The exporter reports failure through its return value, not
                # an exception, so only claim success when it says so.
                if result is SpanExportResult.SUCCESS:
                    logger.info(
                        "Successfully exported %d spans to Azure Monitor (result: %s)",
                        len(spans),
                        result,
                    )
                else:
                    logger.warning(
                        "Export of %d spans to Azure Monitor did not succeed (result: %s)",
                        len(spans),
                        result,
                    )
            return trace_service_pb2.ExportTraceServiceResponse()

        except Exception as e:
            logger.error("Error processing spans: %s", e, exc_info=True)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"Failed to process spans: {str(e)}")
            return trace_service_pb2.ExportTraceServiceResponse()

    def _process_trace_request(self, request) -> List[SpanData]:
        """Convert every span in the request to a ``SpanData`` object.

        Spans that fail to convert are logged at warning level and skipped,
        so one bad span does not discard the rest of the batch.
        """
        spans = []

        for rs in request.resource_spans:
            resource_attrs = extract_attributes_from_proto(rs.resource.attributes)

            for ss in rs.scope_spans:
                for proto_span in ss.spans:
                    try:
                        spans.append(SpanData(proto_span, resource_attrs, ss.scope))
                    except Exception as e:
                        logger.warning(
                            "Failed to process span %s: %s", proto_span.name, e
                        )

        return spans
|
||
|
||
# Utility functions for protobuf attribute extraction | ||
def extract_attribute_value(value_proto: Any) -> Optional[Any]:
    """Extract a Python value from a protobuf AnyValue.

    Args:
        value_proto: An AnyValue-like message exposing ``WhichOneof("value")``
            and the matching ``*_value`` field.

    Returns:
        The string, int, float, bool or bytes payload for scalar values; a
        list of recursively extracted elements for ``array_value``; ``None``
        when no value is set or the kind is unsupported (``kvlist_value``).
    """
    value_case = value_proto.WhichOneof("value")

    # Read only the field that is actually populated instead of touching
    # every scalar field up front.
    if value_case in (
        "string_value",
        "int_value",
        "double_value",
        "bool_value",
        "bytes_value",
    ):
        return getattr(value_proto, value_case)

    if value_case == "array_value":
        return [
            extract_attribute_value(item) for item in value_proto.array_value.values
        ]

    return None
|
||
|
||
def extract_attributes_from_proto(proto_attributes: List[KeyValue]) -> Dict[str, Any]:
    """Convert protobuf KeyValue pairs into a plain ``{key: value}`` dict.

    Pairs whose value extracts to ``None`` are left out. When a key occurs
    more than once, the last occurrence wins.
    """
    extracted = (
        (pair.key, extract_attribute_value(pair.value)) for pair in proto_attributes
    )
    return {key: value for key, value in extracted if value is not None}
|
||
|
||
class TelemetryServer:
    """Telemetry hopper server that forwards OTLP traces to Azure Monitor."""

    def __init__(self, port: int):
        """Remember the listening port; the gRPC server is created on start.

        Args:
            port: Port number the gRPC server will listen on.
        """
        self.port = port
        # Stays None until _start() has created the server.
        self.server: Optional[grpc.Server] = None

    def _start(self) -> None:
        """Create the server, install signal handlers and start serving.

        Raises:
            Exception: Any startup error is logged and re-raised.
        """
        try:
            self.server = self._create_server()
            self._setup_signal_handlers()

            self.server.start()
            logger.info(
                "Telemetry server listening on port %s for OTLP traces", self.port
            )

        except Exception as e:
            logger.error("Failed to start server: %s", e)
            raise

    def stop(self, grace_period: int = SHUTDOWN_GRACE_PERIOD_SEC) -> None:
        """Stop the telemetry server gracefully.

        Does nothing (beyond a warning) if the server was never created, so
        an early call cannot fail with an AttributeError on ``None``.

        Args:
            grace_period: Seconds passed to the gRPC server's ``stop`` call.
        """
        if self.server is None:
            logger.warning("stop() called before the server was started")
            return
        self.server.stop(grace_period)
        logger.info("Server stopped")

    def wait_for_termination(self) -> None:
        """Block until the gRPC server has terminated.

        Raises:
            RuntimeError: If the server has not been started yet.
        """
        if self.server is None:
            raise RuntimeError("Telemetry server has not been started")
        self.server.wait_for_termination()

    def run(self) -> None:
        """Start the server and block until it terminates."""
        self._start()
        self.wait_for_termination()

    def _create_server(self) -> grpc.Server:
        """Build a gRPC server with the trace handler bound to ``self.port``.

        The server uses a default ``ThreadPoolExecutor`` and listens on an
        insecure port on ``[::]``.
        """
        server = grpc.server(futures.ThreadPoolExecutor())
        trace_service_pb2_grpc.add_TraceServiceServicer_to_server(
            TraceServiceHandler(), server
        )
        server.add_insecure_port(f"[::]:{self.port}")
        return server

    def _setup_signal_handlers(self) -> None:
        """Stop the server when SIGINT or SIGTERM is received."""

        def shutdown_handler(signum, frame):
            logger.info("Received signal %s, stopping server", signum)
            self.stop()

        signal.signal(signal.SIGINT, shutdown_handler)
        signal.signal(signal.SIGTERM, shutdown_handler)
|
||
|
||
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        argv: Argument list to parse. When ``None`` (the default), argparse
            reads ``sys.argv[1:]``, which matches the previous behavior.

    Returns:
        A namespace whose ``port`` attribute is the requested port as an int.

    Raises:
        SystemExit: Raised by argparse when ``--port`` is missing or is not
            an integer.
    """
    parser = argparse.ArgumentParser(
        description="Telemetry hopper server that forwards OTLP traces to Azure Monitor"
    )
    parser.add_argument(
        "--port",
        type=int,
        required=True,
        help="Port number for the gRPC server to listen on",
    )
    return parser.parse_args(argv)
|
||
|
||
if __name__ == "__main__":
    # Script entry point: read --port from the command line, then start the
    # server and block until it terminates.
    TelemetryServer(port=parse_args().port).run()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
#!/usr/bin/env bash

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# Container entrypoint: optionally starts the telemetry hopper in the
# background, then replaces this shell with the requested command.

set -e

ENABLE_TELEMETRY="${ENABLE_TELEMETRY:-true}"

# Check if --disable-telemetry flag is present in arguments
for arg in "$@"; do
    if [[ "$arg" == "--disable-telemetry" ]]; then
        ENABLE_TELEMETRY=false
        break
    fi
done

# Start telemetry service if enabled
if [[ "$ENABLE_TELEMETRY" == "true" ]]; then
    # Quote the port so it cannot word-split, and fall back to 4317 (the
    # OTEL_PORT value set in the Dockerfile) so --port always gets a value.
    # Output is discarded and failures are ignored: telemetry is best-effort
    # and must never block the main command.
    /opt/telemetry-venv/bin/python /usr/local/bin/telemetry_hopper.py --port "${OTEL_PORT:-4317}" > /dev/null 2>&1 || true &
    # Give the hopper a moment to start listening before the command runs.
    sleep 1
fi

exec "$@"
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,18 @@ FROM ${BASE_IMAGE} | |
RUN tdnf update -y && \ | ||
tdnf install -y qemu-img rpm coreutils util-linux systemd openssl \ | ||
sed createrepo_c squashfs-tools cdrkit parted e2fsprogs dosfstools \ | ||
xfsprogs zstd veritysetup grub2 grub2-pc systemd-ukify binutils lsof | ||
xfsprogs zstd veritysetup grub2 grub2-pc systemd-ukify binutils lsof \ | ||
python3 python3-pip && \ | ||
tdnf clean all | ||
|
||
COPY . / | ||
|
||
ENV OTEL_PORT=4317 | ||
ENV OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:${OTEL_PORT}" \ | ||
OTEL_EXPORTER_OTLP_PROTOCOL="grpc" | ||
|
||
# Create virtual environment and install Python dependencies for telemetry | ||
RUN python3 -m venv /opt/telemetry-venv && \ | ||
/opt/telemetry-venv/bin/pip install --no-cache-dir -r /usr/local/bin/requirements.txt | ||
|
||
ENTRYPOINT ["/usr/local/bin/entrypoint.sh"] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To be honest, it is kind of odd that the user needs to specify either There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I feel like specifying There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. But happy to expand the entrypoint logic if we want to unify how the container is run. Though, I think this would be better decided through a different PR. |
Uh oh!
There was an error while loading. Please reload this page.