Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

azeventhubreceiver copy 2024-06-11 #21

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions collector/components/azureeventhubreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think this would need to be added

125 changes: 125 additions & 0 deletions collector/components/azureeventhubreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Azure Event Hub Receiver

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [alpha]: metrics, logs |
| Distributions | [contrib], [observiq], [splunk], [sumo] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fazureeventhub%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fazureeventhub) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fazureeventhub%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fazureeventhub) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme), [@djaglowski](https://www.github.com/djaglowski) |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[observiq]: https://github.com/observIQ/observiq-otel-collector
[splunk]: https://github.com/signalfx/splunk-otel-collector
[sumo]: https://github.com/SumoLogic/sumologic-otel-collector
<!-- end autogenerated section -->

## Overview
Azure resources and services can be
[configured](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/diagnostic-settings)
to send their logs to an Azure Event Hub. The Azure Event Hub receiver pulls logs from an Azure
Event Hub, transforms them, and pushes them through the collector pipeline.

## Configuration

### connection (Required)
A string describing the connection to an Azure event hub.

### group (Optional)
The Consumer Group to read from. If empty will default to the default Consumer Group $Default

### partition (Optional)
The partition to watch. If empty, it will watch explicitly all partitions.

Default: ""

### offset (Optional)
The offset at which to start watching the event hub. If empty, it starts with the latest offset.

Default: ""

### format (Optional)
Determines how to transform the Event Hub messages into OpenTelemetry logs. See the "Format"
section below for details.

Default: "azure"

### Example Configuration

```yaml
receivers:
azureeventhub:
connection: Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
partition: foo
group: bar
offset: "1234-5566"
format: "azure"
```

This component can persist its state using the [storage extension].

## Format

### raw

The "raw" format maps the AMQP properties and data into the
attributes and body of an OpenTelemetry LogRecord, respectively.
The body is represented as a raw byte array.

This format is not supported for Metrics.

### azure

The "azure" format extracts the Azure log records from the AMQP
message data, parses them, and maps the fields to OpenTelemetry
attributes. The table below summarizes the mapping between the
[Azure common log format](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema)
and the OpenTelemetry attributes.


| Azure | OpenTelemetry |
|----------------------------------|----------------------------------------|
| callerIpAddress (optional) | net.sock.peer.addr (attribute) |
| correlationId (optional) | azure.correlation.id (attribute) |
| category (optional) | azure.category (attribute) |
| durationMs (optional) | azure.duration (attribute) |
| Level (optional) | severity_number, severity_text (field) |
| location (optional) | cloud.region (attribute) |
| — | cloud.provider (attribute) |
| operationName (required) | azure.operation.name (attribute) |
| operationVersion (optional) | azure.operation.version (attribute) |
| properties (optional) | azure.properties (attribute, nested) |
| resourceId (required) | azure.resource.id (resource attribute) |
| resultDescription (optional) | azure.result.description (attribute) |
| resultSignature (optional) | azure.result.signature (attribute) |
| resultType (optional) | azure.result.type (attribute) |
| tenantId (required, tenant logs) | azure.tenant.id (attribute) |
| time or timeStamp (required) | time_unix_nano (time takes precedence) |
| identity (optional) | azure.identity (attribute, nested) |

Notes:
* JSON does not distinguish between fixed and floating point numbers. All
JSON numbers are encoded as doubles.

For Metrics the Azure Metric Records are an array
of "records" with the following fields.

| Azure | Open Telemetry |
|------------|---------------------------------------------|
| time | time_unix_nano (field) |
| resourceId | azure.resource.id (resource attribute) |
| metricName | |
| timeGrain | start_time_unix_nano (field) |
| total | mapped to datapoint metricName + "_TOTAL" |
| count | mapped to datapoint metricName + "_COUNT" |
| minimum | mapped to datapoint metricName + "_MINIMUM" |
| maximum | mapped to datapoint metricName + "_MAXIMUM" |
| average | mapped to datapoint metricName + "_AVERAGE" |

From this data a Metric of type Gauge is created
with a Data Points that represents the values
for the Metric including: Total, Minimum, Maximum,
Average and Count.

[storage extension]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage
190 changes: 190 additions & 0 deletions collector/components/azureeventhubreceiver/azureeventprocessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package azureeventhubreceiver

// https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/processor.go
// https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/processor_partition_client.go

/*
>> https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_with_checkpoints_test.go
- get a processor
- dispatchPartitionClients
- processor.Run



>> https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_events_test.go
- ReceiveEvents(ctx, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)
- call cancel()
- panic if there's an error that isn't context.DeadlineExceeded
- process events
--> put them into the entity thingy
*/

// import (
// "context"
// "errors"
// "fmt"
// "time"

// "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
// "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
// "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
// )

// // Assuming there's a struct managing the processor setup
// // type EventHubProcessor struct {
// // Processor *azeventhubs.Processor
// // }

// // Updated initialization function using the new SDK components
// func NewEventHubProcessor(ehConn, ehName, storageConn, storageCnt string) (*EventHubProcessor, error) {
// checkpointingProcessor, err := newCheckpointingProcessor(ehConn, ehName, storageConn, storageCnt)
// if err != nil {
// return nil, fmt.Errorf("failed to create checkpointing processor: %w", err)
// }

// // Start processing events
// return &EventHubProcessor{
// Processor: checkpointingProcessor,
// }, nil
// }

// // Assume there's a function to start processing events
// func (e *EventHubProcessor) StartProcessing(ctx context.Context) error {
// // Start the processor
// if err := e.Processor.Run(ctx); err != nil {
// return fmt.Errorf("error running processor: %w", err)
// }
// return nil
// }

// // Assuming there's a struct managing the processor setup
// type EventHubProcessor struct {
// Processor *azeventhubs.Processor
// }

// // These are config values the processor factory can use to create processors:
// //
// // (a) EventHubConnectionString
// // (b) EventHubName
// // (c) StorageConnectionString
// // (d) StorageContainerName
// //
// // You always need the EventHub variable values.
// // And you need all 4 of these to checkpoint.
// //
// // I think the config values should be managed in the factory struct.
// /*
// func (pf *processorFactory) CreateProcessor() (*azeventhubs.Processor, error) {
// // Create the consumer client
// consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(pf.EventHubConnectionString, pf.EventHubName, azeventhubs.DefaultConsumerGroup, nil)
// if err != nil {
// return nil, err
// }

// // Create the blob container client for the checkpoint store
// blobContainerClient, err := container.NewClientFromConnectionString(pf.StorageConnectionString, pf.StorageContainerName, nil)
// if err != nil {
// return nil, err
// }

// // Create the checkpoint store using the blob container client
// checkpointStore, err := azeventhubs.NewBlobCheckpointStore(blobContainerClient, nil)
// // checkpointStore, err := azeventhubs.NewBlobCheckpointStore(blobContainerClient, nil)
// // if err != nil {
// // return nil, err
// // }

// // Create the processor with checkpointing
// processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
// if err != nil {
// return nil, err
// }

// return processor, nil
// }
// */

// // checkpointing processor should be auth aware

// func newCheckpointingProcessor(eventHubConnectionString, eventHubName, storageConnectionString, storageContainerName string) (*azeventhubs.Processor, error) {
// blobContainerClient, err := container.NewClientFromConnectionString(storageConnectionString, storageContainerName, nil)
// if err != nil {
// return nil, err
// }
// checkpointStore, err := checkpoints.NewBlobStore(blobContainerClient, nil)
// if err != nil {
// return nil, err
// }

// consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(eventHubConnectionString, eventHubName, azeventhubs.DefaultConsumerGroup, nil)
// if err != nil {
// return nil, err
// }

// return azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
// }
/*
func dispatchPartitionClients(processor *azeventhubs.Processor) {
for {
processorPartitionClient := processor.NextPartitionClient(context.TODO())
if processorPartitionClient == nil {
break
}

go func() {
if err := processEventsForPartition(processorPartitionClient); err != nil {
panic(err)
}
}()
}
}

func processEventsForPartition(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer shutdownPartitionResources(partitionClient)
if err := initializePartitionResources(partitionClient.PartitionID()); err != nil {
return err
}

for {
receiveCtx, cancelReceive := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
cancelReceive()

if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
if len(events) == 0 {
continue
}

if err := processEvents(events, partitionClient); err != nil {
return err
}

if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}

func shutdownPartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
if err := partitionClient.Close(context.TODO()); err != nil {
panic(err)
}
}

func initializePartitionResources(partitionID string) error {
fmt.Printf("Initializing resources for partition %s\n", partitionID)
return nil
}

// This is very much like the old processEvents function
func processEvents(events []*azeventhubs.ReceivedEventData, partitionClient *azeventhubs.ProcessorPartitionClient) error {
for _, event := range events {


// fmt.Printf("Processing event: %v\n", event.EventData())
}
return nil
}
*/
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure"
)

type AzureResourceLogsEventUnmarshaler struct {
unmarshaler *azure.ResourceLogsUnmarshaler
}

func newAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventLogsUnmarshaler {
return AzureResourceLogsEventUnmarshaler{
unmarshaler: &azure.ResourceLogsUnmarshaler{
Version: buildInfo.Version,
Logger: logger,
},
}
}

// UnmarshalLogs takes a byte array containing a JSON-encoded
// payload with Azure log records and transforms it into
// an OpenTelemetry plog.Logs object. The data in the Azure
// log record appears as fields and attributes in the
// OpenTelemetry representation; the bodies of the
// OpenTelemetry log records are empty.
func (r AzureResourceLogsEventUnmarshaler) UnmarshalLogs(event *azeventhubs.ReceivedEventData) (plog.Logs, error) {
return r.unmarshaler.UnmarshalLogs(event.Body)
}
Loading
Loading