-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
azeventhubreceiver copy 2024-06-11 #21
Open
nslaughter
wants to merge
1
commit into
main
Choose a base branch
from
nslaughter/azureeventhubreceiver-20240611
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
# Azure Event Hub Receiver | ||
|
||
<!-- status autogenerated section --> | ||
| Status | | | ||
| ------------- |-----------| | ||
| Stability | [alpha]: metrics, logs | | ||
| Distributions | [contrib], [observiq], [splunk], [sumo] | | ||
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fazureeventhub%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fazureeventhub) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fazureeventhub%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fazureeventhub) | | ||
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme), [@djaglowski](https://www.github.com/djaglowski) | | ||
|
||
[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha | ||
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib | ||
[observiq]: https://github.com/observIQ/observiq-otel-collector | ||
[splunk]: https://github.com/signalfx/splunk-otel-collector | ||
[sumo]: https://github.com/SumoLogic/sumologic-otel-collector | ||
<!-- end autogenerated section --> | ||
|
||
## Overview | ||
Azure resources and services can be | ||
[configured](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/diagnostic-settings) | ||
to send their logs to an Azure Event Hub. The Azure Event Hub receiver pulls logs from an Azure | ||
Event Hub, transforms them, and pushes them through the collector pipeline. | ||
|
||
## Configuration | ||
|
||
### connection (Required) | ||
A string describing the connection to an Azure event hub. | ||
|
||
### group (Optional) | ||
The Consumer Group to read from. If empty will default to the default Consumer Group $Default | ||
|
||
### partition (Optional) | ||
The partition to watch. If empty, it will watch explicitly all partitions. | ||
|
||
Default: "" | ||
|
||
### offset (Optional) | ||
The offset at which to start watching the event hub. If empty, it starts with the latest offset. | ||
|
||
Default: "" | ||
|
||
### format (Optional) | ||
Determines how to transform the Event Hub messages into OpenTelemetry logs. See the "Format" | ||
section below for details. | ||
|
||
Default: "azure" | ||
|
||
### Example Configuration | ||
|
||
```yaml | ||
receivers: | ||
azureeventhub: | ||
connection: Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName | ||
partition: foo | ||
group: bar | ||
offset: "1234-5566" | ||
format: "azure" | ||
``` | ||
|
||
This component can persist its state using the [storage extension]. | ||
|
||
## Format | ||
|
||
### raw | ||
|
||
The "raw" format maps the AMQP properties and data into the | ||
attributes and body of an OpenTelemetry LogRecord, respectively. | ||
The body is represented as a raw byte array. | ||
|
||
This format is not supported for Metrics. | ||
|
||
### azure | ||
|
||
The "azure" format extracts the Azure log records from the AMQP | ||
message data, parses them, and maps the fields to OpenTelemetry | ||
attributes. The table below summarizes the mapping between the | ||
[Azure common log format](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema) | ||
and the OpenTelemetry attributes. | ||
|
||
|
||
| Azure | OpenTelemetry | | ||
|----------------------------------|----------------------------------------| | ||
| callerIpAddress (optional) | net.sock.peer.addr (attribute) | | ||
| correlationId (optional) | azure.correlation.id (attribute) | | ||
| category (optional) | azure.category (attribute) | | ||
| durationMs (optional) | azure.duration (attribute) | | ||
| Level (optional) | severity_number, severity_text (field) | | ||
| location (optional) | cloud.region (attribute) | | ||
| — | cloud.provider (attribute) | | ||
| operationName (required) | azure.operation.name (attribute) | | ||
| operationVersion (optional) | azure.operation.version (attribute) | | ||
| properties (optional) | azure.properties (attribute, nested) | | ||
| resourceId (required) | azure.resource.id (resource attribute) | | ||
| resultDescription (optional) | azure.result.description (attribute) | | ||
| resultSignature (optional) | azure.result.signature (attribute) | | ||
| resultType (optional) | azure.result.type (attribute) | | ||
| tenantId (required, tenant logs) | azure.tenant.id (attribute) | | ||
| time or timeStamp (required) | time_unix_nano (time takes precedence) | | ||
| identity (optional) | azure.identity (attribute, nested) | | ||
|
||
Notes: | ||
* JSON does not distinguish between fixed and floating point numbers. All | ||
JSON numbers are encoded as doubles. | ||
|
||
For Metrics the Azure Metric Records are an array | ||
of "records" with the following fields. | ||
|
||
| Azure | Open Telemetry | | ||
|------------|---------------------------------------------| | ||
| time | time_unix_nano (field) | | ||
| resourceId | azure.resource.id (resource attribute) | | ||
| metricName | | | ||
| timeGrain | start_time_unix_nano (field) | | ||
| total | mapped to datapoint metricName + "_TOTAL" | | ||
| count | mapped to datapoint metricName + "_COUNT" | | ||
| minimum | mapped to datapoint metricName + "_MINIMUM" | | ||
| maximum | mapped to datapoint metricName + "_MAXIMUM" | | ||
| average | mapped to datapoint metricName + "_AVERAGE" | | ||
|
||
From this data a Metric of type Gauge is created | ||
with a Data Points that represents the values | ||
for the Metric including: Total, Minimum, Maximum, | ||
Average and Count. | ||
|
||
[storage extension]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage |
190 changes: 190 additions & 0 deletions
190
collector/components/azureeventhubreceiver/azureeventprocessor.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
package azureeventhubreceiver | ||
|
||
// https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/processor.go | ||
// https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/processor_partition_client.go | ||
|
||
/* | ||
>> https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_with_checkpoints_test.go | ||
- get a processor | ||
- dispatchPartitionClients | ||
- processor.Run | ||
|
||
|
||
|
||
>> https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_events_test.go | ||
- ReceiveEvents(ctx, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error) | ||
- call cancel() | ||
- panic if there's an error that isn't context.DeadlineExceeded | ||
- process events | ||
--> put them into the entity thingy | ||
*/ | ||
|
||
// import ( | ||
// "context" | ||
// "errors" | ||
// "fmt" | ||
// "time" | ||
|
||
// "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" | ||
// "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints" | ||
// "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" | ||
// ) | ||
|
||
// // Assuming there's a struct managing the processor setup | ||
// // type EventHubProcessor struct { | ||
// // Processor *azeventhubs.Processor | ||
// // } | ||
|
||
// // Updated initialization function using the new SDK components | ||
// func NewEventHubProcessor(ehConn, ehName, storageConn, storageCnt string) (*EventHubProcessor, error) { | ||
// checkpointingProcessor, err := newCheckpointingProcessor(ehConn, ehName, storageConn, storageCnt) | ||
// if err != nil { | ||
// return nil, fmt.Errorf("failed to create checkpointing processor: %w", err) | ||
// } | ||
|
||
// // Start processing events | ||
// return &EventHubProcessor{ | ||
// Processor: checkpointingProcessor, | ||
// }, nil | ||
// } | ||
|
||
// // Assume there's a function to start processing events | ||
// func (e *EventHubProcessor) StartProcessing(ctx context.Context) error { | ||
// // Start the processor | ||
// if err := e.Processor.Run(ctx); err != nil { | ||
// return fmt.Errorf("error running processor: %w", err) | ||
// } | ||
// return nil | ||
// } | ||
|
||
// // Assuming there's a struct managing the processor setup | ||
// type EventHubProcessor struct { | ||
// Processor *azeventhubs.Processor | ||
// } | ||
|
||
// // These are config values the processor factory can use to create processors: | ||
// // | ||
// // (a) EventHubConnectionString | ||
// // (b) EventHubName | ||
// // (c) StorageConnectionString | ||
// // (d) StorageContainerName | ||
// // | ||
// // You always need the EventHub variable values. | ||
// // And you need all 4 of these to checkpoint. | ||
// // | ||
// // I think the config values should be managed in the factory struct. | ||
// /* | ||
// func (pf *processorFactory) CreateProcessor() (*azeventhubs.Processor, error) { | ||
// // Create the consumer client | ||
// consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(pf.EventHubConnectionString, pf.EventHubName, azeventhubs.DefaultConsumerGroup, nil) | ||
// if err != nil { | ||
// return nil, err | ||
// } | ||
|
||
// // Create the blob container client for the checkpoint store | ||
// blobContainerClient, err := container.NewClientFromConnectionString(pf.StorageConnectionString, pf.StorageContainerName, nil) | ||
// if err != nil { | ||
// return nil, err | ||
// } | ||
|
||
// // Create the checkpoint store using the blob container client | ||
// checkpointStore, err := azeventhubs.NewBlobCheckpointStore(blobContainerClient, nil) | ||
// // checkpointStore, err := azeventhubs.NewBlobCheckpointStore(blobContainerClient, nil) | ||
// // if err != nil { | ||
// // return nil, err | ||
// // } | ||
|
||
// // Create the processor with checkpointing | ||
// processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil) | ||
// if err != nil { | ||
// return nil, err | ||
// } | ||
|
||
// return processor, nil | ||
// } | ||
// */ | ||
|
||
// // checkpointing processor should be auth aware | ||
|
||
// func newCheckpointingProcessor(eventHubConnectionString, eventHubName, storageConnectionString, storageContainerName string) (*azeventhubs.Processor, error) { | ||
// blobContainerClient, err := container.NewClientFromConnectionString(storageConnectionString, storageContainerName, nil) | ||
// if err != nil { | ||
// return nil, err | ||
// } | ||
// checkpointStore, err := checkpoints.NewBlobStore(blobContainerClient, nil) | ||
// if err != nil { | ||
// return nil, err | ||
// } | ||
|
||
// consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(eventHubConnectionString, eventHubName, azeventhubs.DefaultConsumerGroup, nil) | ||
// if err != nil { | ||
// return nil, err | ||
// } | ||
|
||
// return azeventhubs.NewProcessor(consumerClient, checkpointStore, nil) | ||
// } | ||
/* | ||
func dispatchPartitionClients(processor *azeventhubs.Processor) { | ||
for { | ||
processorPartitionClient := processor.NextPartitionClient(context.TODO()) | ||
if processorPartitionClient == nil { | ||
break | ||
} | ||
|
||
go func() { | ||
if err := processEventsForPartition(processorPartitionClient); err != nil { | ||
panic(err) | ||
} | ||
}() | ||
} | ||
} | ||
|
||
func processEventsForPartition(partitionClient *azeventhubs.ProcessorPartitionClient) error { | ||
defer shutdownPartitionResources(partitionClient) | ||
if err := initializePartitionResources(partitionClient.PartitionID()); err != nil { | ||
return err | ||
} | ||
|
||
for { | ||
receiveCtx, cancelReceive := context.WithTimeout(context.TODO(), time.Minute) | ||
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil) | ||
cancelReceive() | ||
|
||
if err != nil && !errors.Is(err, context.DeadlineExceeded) { | ||
return err | ||
} | ||
if len(events) == 0 { | ||
continue | ||
} | ||
|
||
if err := processEvents(events, partitionClient); err != nil { | ||
return err | ||
} | ||
|
||
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
func shutdownPartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) { | ||
if err := partitionClient.Close(context.TODO()); err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
func initializePartitionResources(partitionID string) error { | ||
fmt.Printf("Initializing resources for partition %s\n", partitionID) | ||
return nil | ||
} | ||
|
||
// This is very much like the old processEvents function | ||
func processEvents(events []*azeventhubs.ReceivedEventData, partitionClient *azeventhubs.ProcessorPartitionClient) error { | ||
for _, event := range events { | ||
|
||
|
||
// fmt.Printf("Processing event: %v\n", event.EventData()) | ||
} | ||
return nil | ||
} | ||
*/ |
36 changes: 36 additions & 0 deletions
36
collector/components/azureeventhubreceiver/azureresourcelogs_unmarshaler.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver" | ||
|
||
import ( | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/pdata/plog" | ||
"go.uber.org/zap" | ||
|
||
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure" | ||
) | ||
|
||
type AzureResourceLogsEventUnmarshaler struct { | ||
unmarshaler *azure.ResourceLogsUnmarshaler | ||
} | ||
|
||
func newAzureResourceLogsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger) eventLogsUnmarshaler { | ||
return AzureResourceLogsEventUnmarshaler{ | ||
unmarshaler: &azure.ResourceLogsUnmarshaler{ | ||
Version: buildInfo.Version, | ||
Logger: logger, | ||
}, | ||
} | ||
} | ||
|
||
// UnmarshalLogs takes a byte array containing a JSON-encoded | ||
// payload with Azure log records and transforms it into | ||
// an OpenTelemetry plog.Logs object. The data in the Azure | ||
// log record appears as fields and attributes in the | ||
// OpenTelemetry representation; the bodies of the | ||
// OpenTelemetry log records are empty. | ||
func (r AzureResourceLogsEventUnmarshaler) UnmarshalLogs(event *azeventhubs.ReceivedEventData) (plog.Logs, error) { | ||
return r.unmarshaler.UnmarshalLogs(event.Body) | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think this would need to be added