add: classes for s3, timescaledb, timezone_shift added
jakobgabriel committed Oct 28, 2024
1 parent b824d45 commit 2fadb69
Showing 7 changed files with 440 additions and 51 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,6 +1,9 @@
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python

# Inputs #
inputs/

### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
72 changes: 71 additions & 1 deletion src/timeseries_shaper/loader/metadata/metadata_api_loader.py
@@ -1 +1,71 @@
# TODO: Metadata API Loader
import requests
import pandas as pd
import json
from typing import List, Dict


class DatapointAPI:
"""
Class for accessing datapoints via an API.
"""

def __init__(self, device_name: str, base_url: str, api_token: str, output_path: str = "data_points.json"):
self.device_name = device_name
self.base_url = base_url
self.api_token = api_token
self.output_path = output_path
self.uuids: List[str] = []
self.metadata: pd.DataFrame = pd.DataFrame([])
self._api_access()

def _api_access(self) -> None:
"""Connect to the API and retrieve metadata for the specified device."""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_token}",
}

        metadata = []
        devices_found = []

        # Walk the datatron/device hierarchy and stop as soon as the requested
        # device is found; a 30 s timeout keeps a hung endpoint from blocking the loader.
        for datatron in requests.get(self.base_url, headers=headers, timeout=30).json():
            for device in requests.get(f"{self.base_url}/{datatron['id']}/devices", headers=headers, timeout=30).json():
                if device["name"] == self.device_name:
                    datapoints = requests.get(
                        f"{self.base_url}/{datatron['id']}/devices/{device['id']}/data_points",
                        headers=headers,
                        timeout=30,
                    ).json()
                    metadata += datapoints
                    devices_found.append(device["name"])
                if devices_found:
                    break
            if devices_found:
                break

self.metadata = pd.DataFrame(metadata)
        if not self.metadata.empty:
            # Keep only enabled data points and the columns needed downstream.
            self.metadata = self.metadata[self.metadata["enabled"]][["uuid", "label", "config"]]
data_points = self.metadata.to_dict(orient="records")
self._export_json(data_points)
self.uuids = [data["uuid"] for data in data_points]

def _export_json(self, data_points: List[Dict[str, str]]) -> None:
"""Export data points to a JSON file."""
with open(self.output_path, 'w') as f:
json.dump(data_points, f, indent=2)

def get_uuids(self) -> List[str]:
"""Return the list of UUIDs."""
return self.uuids

def get_full_config(self) -> List[Dict[str, str]]:
"""Return the full configuration (uuid, label, config) as a list of dictionaries."""
return self.metadata.to_dict(orient="records")

def get_uuid_label_pairs(self) -> List[Dict[str, str]]:
"""Return a list of uuid and label pairs."""
return self.metadata[['uuid', 'label']].to_dict(orient='records')

def display_dataframe(self) -> None:
"""Print the metadata DataFrame to visually inspect data points."""
print(self.metadata)
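
A minimal usage sketch for the new DatapointAPI class, assuming a reachable metadata API; the device name, base URL, and token below are hypothetical placeholders, not values from this commit:

    api = DatapointAPI(
        device_name="press-01",                        # hypothetical device
        base_url="https://api.example.com/datatrons",  # hypothetical endpoint
        api_token="<token>",                           # injected secret
    )
    uuids = api.get_uuids()             # UUIDs of all enabled data points
    pairs = api.get_uuid_label_pairs()  # [{"uuid": ..., "label": ...}, ...]

Note that the constructor calls _api_access() itself, so instantiating the class already fetches the metadata and writes data_points.json as a side effect.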
66 changes: 66 additions & 0 deletions src/timeseries_shaper/loader/metadata/metadata_db_loader.py
@@ -0,0 +1,66 @@
import pandas as pd
import psycopg2
import json
from typing import List, Dict


class DatapointDB:
"""
Class for accessing datapoints via a database.
"""

def __init__(self, device_name: str, db_user: str, db_pass: str, db_host: str, output_path: str = "data_points.json"):
self.device_name = device_name
self.db_user = db_user
self.db_pass = db_pass
self.db_host = db_host
self.output_path = output_path
self.uuids: List[str] = []
self.metadata: pd.DataFrame = pd.DataFrame([])
self._db_access()

def _db_access(self) -> None:
"""Connect to the database and retrieve metadata for the specified device."""
conn = psycopg2.connect(
dbname="config_repository",
user=self.db_user,
password=self.db_pass,
host=self.db_host,
port=5432
)
cursor = conn.cursor()

        # Parameterised query: the device name is bound via %s rather than
        # interpolated, so no f-string is needed here.
        cursor.execute("""
            SELECT dp.uuid, dp.label, dp.config
            FROM data_points dp
            INNER JOIN devices dev ON dev.id = dp.device_id
            WHERE dp.enabled = true AND dp.archived = false AND dev.name = %s
        """, (self.device_name,))

data_points = [{"uuid": r[0], "label": r[1], "config": r[2]} for r in cursor.fetchall()]
conn.close()

self.metadata = pd.DataFrame(data_points)
self._export_json(data_points)
self.uuids = [data["uuid"] for data in data_points]

def _export_json(self, data_points: List[Dict[str, str]]) -> None:
"""Export data points to a JSON file."""
with open(self.output_path, 'w') as f:
json.dump(data_points, f, indent=2)

def get_uuids(self) -> List[str]:
"""Return the list of UUIDs."""
return self.uuids

def get_full_config(self) -> List[Dict[str, str]]:
"""Return the full configuration (uuid, label, config) as a list of dictionaries."""
return self.metadata.to_dict(orient="records")

def get_uuid_label_pairs(self) -> List[Dict[str, str]]:
"""Return a list of uuid and label pairs."""
return self.metadata[['uuid', 'label']].to_dict(orient='records')

def display_dataframe(self) -> None:
"""Print the metadata DataFrame to visually inspect data points."""
print(self.metadata)
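
DatapointDB exposes the same accessor surface, backed by Postgres; a sketch under the assumption that the hard-coded config_repository database is reachable (credentials and host below are placeholders):

    db = DatapointDB(
        device_name="press-01",  # hypothetical device
        db_user="readonly",      # placeholder credentials
        db_pass="<password>",
        db_host="db.internal",   # hypothetical host
    )
    uuids = db.get_uuids()
    config = db.get_full_config()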
49 changes: 0 additions & 49 deletions src/timeseries_shaper/loader/metadata/metadata_json_loader.py

This file was deleted.

151 changes: 150 additions & 1 deletion src/timeseries_shaper/loader/timeseries/s3proxy_parquet_loader.py
@@ -1 +1,150 @@
# TODO: s3proxy for timeseries loader
from pathlib import Path
import pandas as pd
import s3fs
from sqlalchemy import create_engine, text
from typing import Dict, Iterator, List

class S3ProxyDataAccess:
"""
A class to access timeseries data via an S3 proxy. This class retrieves
data for specified UUIDs within a defined time range, with the option to
output data as Parquet files or as a single combined DataFrame.
"""

def __init__(self, start_timestamp: str, end_timestamp: str, uuids: List[str], s3_config: Dict[str, str]):
"""
Initialize the S3ProxyDataAccess object.
        :param start_timestamp: Start timestamp in "YYYY-MM-DD HH:MM:SS" format.
        :param end_timestamp: End timestamp in "YYYY-MM-DD HH:MM:SS" format.
:param uuids: List of UUIDs to retrieve data for.
:param s3_config: Configuration dictionary for S3 connection.
"""
self.start_timestamp = start_timestamp
self.end_timestamp = end_timestamp
self.uuids = uuids
self.s3_config = s3_config

# Establish connection to S3 using provided configuration
self.s3 = s3fs.S3FileSystem(
endpoint_url=s3_config["endpoint_url"],
key=s3_config["key"],
secret=s3_config["secret"],
use_ssl=s3_config["use_ssl"],
version_aware=s3_config["version_aware"]
)
self.s3_path_base = s3_config["s3_path_base"]

def _generate_timeslot_paths(self):
"""
Generates a sequence of time-based directory paths for each hour in the range
between start_timestamp and end_timestamp.
:return: A generator yielding paths in the format year/month/day/hour.
"""
for timeslot in pd.date_range(start=self.start_timestamp, end=self.end_timestamp, freq="h"):
timeslot_dir = Path(str(timeslot.year), str(timeslot.month).zfill(2), str(timeslot.day).zfill(2), str(timeslot.hour).zfill(2))
yield timeslot_dir

def _fetch_parquet(self, uuid: str, timeslot_dir: Path):
"""
Fetches a Parquet file from S3 for a specific UUID and time slot.
:param uuid: The UUID for which data is being retrieved.
:param timeslot_dir: Directory path for the specific time slot.
:return: DataFrame if the file is found, else None.
"""
s3_path = f"{self.s3_path_base}{timeslot_dir}/{uuid}.parquet"
try:
with self.s3.open(s3_path, "rb") as remote_file:
return pd.read_parquet(remote_file)
except FileNotFoundError:
print(f"Data for UUID {uuid} at {timeslot_dir} not found.")
return None

def fetch_data_as_parquet(self, output_dir: str):
"""
Retrieves timeseries data from S3 and saves it as Parquet files.
Each file is saved in a directory structure of UUID/year/month/day/hour.
:param output_dir: Base directory to save the Parquet files.
"""
for timeslot_dir in self._generate_timeslot_paths():
for uuid in set(self.uuids):
df = self._fetch_parquet(uuid, timeslot_dir)
if df is not None:
output_path = Path(output_dir, f"{uuid}/{timeslot_dir}")
output_path.mkdir(parents=True, exist_ok=True)
df.to_parquet(output_path / f"{uuid}.parquet")

    def fetch_data_as_dataframe(self) -> pd.DataFrame:
        """
        Retrieves timeseries data from S3 and returns it as a single DataFrame.
        :return: A combined DataFrame with data for all specified UUIDs and time slots.
        """
        # Filter out missing files as they are fetched so pd.concat never
        # receives an empty list, which would raise a ValueError.
        data_frames = [
            df
            for timeslot_dir in self._generate_timeslot_paths()
            for uuid in set(self.uuids)
            if (df := self._fetch_parquet(uuid, timeslot_dir)) is not None
        ]
        return pd.concat(data_frames, ignore_index=True) if data_frames else pd.DataFrame()
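
# Illustrative usage (the endpoint, credentials, and bucket path are
# hypothetical; "s3_path_base" keeps its trailing slash because _fetch_parquet
# concatenates it directly with the timeslot path):
#
#   s3_config = {
#       "endpoint_url": "https://s3.example.com",
#       "key": "ACCESS_KEY",
#       "secret": "SECRET_KEY",
#       "use_ssl": True,
#       "version_aware": False,
#       "s3_path_base": "bucket/timeseries/",
#   }
#   loader = S3ProxyDataAccess("2024-10-01 00:00:00", "2024-10-01 06:00:00", ["<uuid>"], s3_config)
#   df = loader.fetch_data_as_dataframe()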


class TimescaleDBDataAccess:
"""
A class to access timeseries data from a TimescaleDB database. This class
retrieves data for specified UUIDs within a defined time range, with the
option to output data as Parquet files or as a single combined DataFrame.
"""

def __init__(self, start_timestamp: str, end_timestamp: str, uuids: List[str], db_config: Dict[str, str]):
"""
Initialize the TimescaleDBDataAccess object.
        :param start_timestamp: Start timestamp in "YYYY-MM-DD HH:MM:SS" format.
        :param end_timestamp: End timestamp in "YYYY-MM-DD HH:MM:SS" format.
:param uuids: List of UUIDs to retrieve data for.
:param db_config: Configuration dictionary for database connection.
"""
self.start_timestamp = start_timestamp
self.end_timestamp = end_timestamp
self.uuids = uuids
self.db_config = db_config

# Establish database connection engine using SQLAlchemy
self.engine = create_engine(
f'postgresql+psycopg2://{db_config["db_user"]}:{db_config["db_pass"]}@{db_config["db_host"]}/{db_config["db_name"]}'
)

    def _fetch_data(self, uuid: str) -> Iterator[pd.DataFrame]:
        """
        Executes a parameterised SQL query to fetch timeseries data for a specific UUID from TimescaleDB.
        :param uuid: The UUID for which data is being retrieved.
        :return: An iterator of DataFrame chunks (read_sql with chunksize yields DataFrames, not a single DataFrame).
        """
        query = text("""
            SELECT uuid::text, systime, value_integer, value_string, value_double, value_bool, is_delta
            FROM telemetry
            WHERE uuid = :uuid
              AND systime BETWEEN :start_ts AND :end_ts
            ORDER BY systime ASC
        """)
        # Bind values instead of interpolating them into the SQL string to avoid injection.
        return pd.read_sql(
            query,
            self.engine,
            params={"uuid": uuid, "start_ts": self.start_timestamp, "end_ts": self.end_timestamp},
            chunksize=10000,
        )

    def fetch_data_as_parquet(self, output_dir: str):
        """
        Retrieves timeseries data from TimescaleDB and saves it as Parquet files.
        Each file is saved in a directory structure of UUID/year/month/day/hour.
        :param output_dir: Base directory to save the Parquet files.
        """
        for uuid in self.uuids:
            for chunk in self._fetch_data(uuid):
                if chunk.empty:
                    continue
                chunk["systime"] = pd.to_datetime(chunk["systime"])
                # Write one file per hourly slot; grouping first avoids the
                # row-by-row loop that would overwrite the same hourly file repeatedly.
                for hour, group in chunk.groupby(chunk["systime"].dt.floor("h")):
                    timeslot_dir = Path(str(hour.year), str(hour.month).zfill(2), str(hour.day).zfill(2), str(hour.hour).zfill(2))
                    output_path = Path(output_dir, f"{uuid}/{timeslot_dir}")
                    output_path.mkdir(parents=True, exist_ok=True)
                    group.to_parquet(output_path / f"{uuid}.parquet", index=False)

def fetch_data_as_dataframe(self) -> pd.DataFrame:
"""
Retrieves timeseries data from TimescaleDB and returns it as a single DataFrame.
:return: A combined DataFrame with data for all specified UUIDs within the time range.
"""
df_list = [chunk for uuid in self.uuids for chunk in self._fetch_data(uuid)]
return pd.concat(df_list, ignore_index=True) if df_list else pd.DataFrame()
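
A matching sketch for TimescaleDBDataAccess, assuming the telemetry table queried in _fetch_data exists; the credentials, host, and database name below are hypothetical placeholders:

    db_config = {
        "db_user": "readonly",     # placeholder credentials
        "db_pass": "<password>",
        "db_host": "db.internal",  # hypothetical host
        "db_name": "telemetry",    # hypothetical database name
    }
    loader = TimescaleDBDataAccess(
        "2024-10-01 00:00:00",     # start of the queried range
        "2024-10-01 06:00:00",     # end of the queried range
        ["<uuid>"],                # e.g. the result of DatapointDB.get_uuids()
        db_config,
    )
    loader.fetch_data_as_parquet("outputs")  # writes outputs/<uuid>/<Y>/<m>/<d>/<H>/<uuid>.parquet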
Empty file.
