Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure Function Python v2 #762

Merged
merged 12 commits into from
Jan 6, 2025
136 changes: 136 additions & 0 deletions topic/serverless/azure-eventhub/Azure Function/cratedb_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import time
import logging
from crate import client
from crate.client.exceptions import ProgrammingError, IntegrityError


class CrateDBWriter:
    """
    Writes enriched readings and error records to CrateDB.

    ``tables`` maps the logical table roles ``"readings"`` and ``"errors"``
    to their fully qualified CrateDB table names.
    """

    # Size of the HTTP connection pool used by the crate client.
    CONNECTION_POOL_SIZE = 10

    def __init__(self, tables, host, db_user, db_pass):
        self._conn = None
        self._cursor = None
        self._tables = tables
        self._host = host
        self._db_user = db_user
        self._db_pass = db_pass
        # Items that failed to insert; drained by _move_reading_to_error().
        self._failed = []

    def insert_values(self, value_cache):
        """
        Bulk-insert the cached readings and errors.

        Readings that fail to insert are moved into ``value_cache.errors``
        first, so they are persisted to the error table in the second step.
        """
        self._connect()

        if value_cache.readings:
            self._insert_operation(value_cache.readings, self._tables["readings"])
            self._move_reading_to_error(value_cache)

        if value_cache.errors:
            self._insert_operation(value_cache.errors, self._tables["errors"])

    @staticmethod
    def _insert_ts():
        # Insert timestamp in epoch milliseconds.
        return int(time.time() * 1000)

    def _connect(self):
        # (Re)connect when either the connection or the cursor is missing or
        # closed. `_closed` is a private attribute of the crate client, so we
        # conservatively treat a missing attribute as "closed".
        if (
            self._cursor is None
            or getattr(self._cursor, "_closed", True)
            or self._conn is None
            or getattr(self._conn, "_closed", True)
        ):
            self._conn = client.connect(
                self._host,
                username=self._db_user,
                password=self._db_pass,
                pool_size=self.CONNECTION_POOL_SIZE,
            )
            self._cursor = self._conn.cursor()

    def _insert_operation(self, value_list, table_name):
        """
        Bulk-insert ``value_list`` into ``table_name``.

        Rows rejected by the bulk insert (rowcount == -2) are retried one by
        one so each failing row can be captured in ``self._failed`` without
        losing the good ones.
        """
        if self._cursor is None:
            return
        # Prepare outside the try block: it cannot raise the DB errors we
        # handle, and keeping the try body minimal is clearer.
        stmt, parameters = self._prepare_insert_stmt(
            value_list, table_name, (0, len(value_list))
        )
        try:
            result = self._cursor.executemany(stmt, parameters)
        except (ProgrammingError, IntegrityError) as e:
            # The whole batch failed (e.g. malformed statement): record every item.
            for item in value_list:
                self._add_item_to_failed(
                    str(e), stmt, parameters, type(e).__name__, table_name, item
                )
            return

        for i, row in enumerate(result):
            if row["rowcount"] == -2:  # -2 marks a failed row in a bulk response
                stmt, parameters = self._prepare_insert_stmt(
                    value_list, table_name, (i, i + 1)
                )
                try:
                    self._cursor.executemany(stmt, parameters)
                # IntegrityError is raised in case of PK violation (e.g. duplicated PK)
                except (ProgrammingError, IntegrityError) as e:
                    self._add_item_to_failed(
                        str(e),
                        stmt,
                        parameters,
                        type(e).__name__,
                        table_name,
                        value_list[i],
                    )

    def _add_item_to_failed(
        self, error, stmt, parameters, error_type, table_name, payload
    ):
        """Log a failed insert and remember it for error propagation."""
        # Lazy %-args avoid building the message unless the level is emitted.
        logging.warning(
            "error: %s -- stmt: %s -- parameters: %s", error, stmt, parameters
        )
        self._failed.append(
            {
                "type": table_name,
                "error": error,
                "error_type": error_type,
                "payload": payload,
            }
        )

    def _move_reading_to_error(self, value_cache):
        """Move failed readings into the value cache's error list."""
        for element in self._failed:
            value_cache.add_error(
                element["payload"], element["error"], element["error_type"]
            )
        # BUG FIX: clear the list so repeated insert_values() calls on the
        # same writer do not re-report previously failed items.
        self._failed = []

    def _prepare_insert_stmt(self, value_list, table_name, iteration_range):
        """
        Build an INSERT statement and its parameter tuples.

        Columns are taken from the first item of ``value_list`` (all items are
        assumed to share the same keys — TODO confirm with callers);
        ``iteration_range`` is the half-open [start, end) slice to include.
        """
        keys = list(value_list[0].keys())
        columns = ", ".join(["insert_ts"] + keys)
        placeholders = ", ".join(["?"] * (len(keys) + 1))
        stmt = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

        parameter_list = [
            tuple([self._insert_ts()] + self._add_entries(value_list, keys, i))
            for i in range(iteration_range[0], iteration_range[1])
        ]

        return stmt, parameter_list

    @staticmethod
    def _add_entries(values, keys, index):
        """Return the values of ``values[index]`` in ``keys`` order."""
        return [values[index][key] for key in keys]
69 changes: 69 additions & 0 deletions topic/serverless/azure-eventhub/Azure Function/enrichment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging

# Maps field names seen in raw events to the canonical column names used
# downstream. KEY_MAPPING and TARGET_MAP are two names for the SAME dict
# (aliases, not copies) — presumably TARGET_MAP is kept for external callers;
# TODO confirm it is still referenced anywhere.
KEY_MAPPING = TARGET_MAP = {
    "ts": "reading_ts",
    "time": "reading_ts",
    "current_ts": "reading_ts",
    "timestamp": "reading_ts",
    "id": "sensor_id",
    "loc": "location",
}


def transform(raw_payload, value_cache):
    """
    Transform a single event, checking for errors, and record the result.

    Valid events are appended to the cache as readings; events that fail
    enrichment are appended as errors (so they still reach the database).

    Args:
        raw_payload: event from an Event Hub
        value_cache: ValueCache object to transfer values to the database writer
    """
    if raw_payload is None:
        return

    try:
        event_t = transform_payload(raw_payload)
        location = event_t.get("location")
        sensor_id = event_t.get("sensor_id")
        timestamp = event_t.get("reading_ts")
        payload = {
            "temperature": event_t.get("temperature"),
            "humidity": event_t.get("humidity"),
            "light": event_t.get("light"),
        }

        value_cache.add_reading(payload, location, timestamp, sensor_id)

    except (ValueError, KeyError) as e:
        # BUG FIX: the original concatenated two f-strings with no separator,
        # producing "...{e}-- payload...". Lazy %-args also avoid formatting
        # when INFO is disabled.
        logging.info("enrichment error: %s -- payload: %s", e, raw_payload)
        value_cache.add_error(raw_payload, str(e), type(e).__name__)


def transform_payload(event):
    """Normalize a raw event: drop empty keys, map field names, validate.

    Raises KeyError (via check_fields) when a required field is missing.
    """
    cleaned = remove_empty_keys(event)
    renamed = rename_keys(cleaned)
    check_fields(renamed)
    return renamed


def remove_empty_keys(event):
    """Drop the entry whose key is the empty string, if present.

    Mutates *event* in place and returns it.
    """
    # pop with a default replaces the membership pre-check and removes the
    # unused local the original bound the popped value to.
    event.pop("", None)
    return event


def rename_keys(event):
    """Rename known source field names to canonical names via KEY_MAPPING.

    Mutates *event* in place and returns it. Iterates over a snapshot of the
    keys because entries are popped/inserted during the loop.
    """
    for key in list(event):
        # membership test on the dict itself; `.keys()` was redundant
        if key in KEY_MAPPING:
            event[KEY_MAPPING[key]] = event.pop(key)

    return event


def check_fields(event):
    """Raise KeyError unless all required canonical fields are present."""
    required = {"location", "sensor_id", "reading_ts"}
    if required - event.keys():
        raise KeyError("missing key in payload")
46 changes: 46 additions & 0 deletions topic/serverless/azure-eventhub/Azure Function/function_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os
import sys
import json
import logging

import azure.functions as func

from enrichment import transform
from cratedb_writer import CrateDBWriter
from value_cache import ValueCache


app = func.FunctionApp()


@app.event_hub_message_trigger(
    arg_name="event",
    event_hub_name="demo-event-ce",
    connection="EVENT_HUB_CONNECTION_STRING",
)
def enrich_events(event: func.EventHubEvent):
    """Event Hub trigger: enrich incoming events and persist them to CrateDB."""
    writer = CrateDBWriter(
        {
            "readings": os.getenv("READING_TABLE"),
            "errors": os.getenv("ERROR_TABLE"),
        },
        os.getenv("HOST"),
        os.getenv("DB_USER", None),
        os.getenv("DB_PASSWORD", None),
    )

    try:
        if event is None:
            return
        cache = ValueCache()
        raw_events = json.loads(event.get_body().decode("utf-8"))

        for raw_event in raw_events:
            transform(raw_event, cache)

        writer.insert_values(cache)
    except Exception as e:
        # when any exception occurred, the function must exit unsuccessfully for events to be retried
        logging.error(f"error: {e}")
        sys.exit(1)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues

azure-functions
azure-eventhub
crate==1.0.1
27 changes: 27 additions & 0 deletions topic/serverless/azure-eventhub/Azure Function/value_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
class ValueCache:
    """Collects transformed readings and error records for a later bulk insert."""

    def __init__(self):
        # One dict per failed event / per valid reading, in arrival order.
        self.errors = []
        self.readings = []

    def add_error(self, payload, message, type):
        """Record a payload that failed enrichment, with its error details."""
        entry = {
            "payload": payload,
            "error": {"message": message, "type": type},
            "type": type,
        }
        self.errors.append(entry)

    def add_reading(self, payload, location, timestamp, sensor_id):
        """Record one enriched sensor reading."""
        entry = {
            "location": location,
            "sensor_id": sensor_id,
            "reading_ts": timestamp,
            "reading": payload,
        }
        self.readings.append(entry)
Loading