Skip to content

Commit

Permalink
flattened records
Browse files Browse the repository at this point in the history
  • Loading branch information
jlloyd-widen committed Mar 7, 2024
1 parent 31d48ca commit 9435092
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 37 deletions.
83 changes: 83 additions & 0 deletions tap_clari/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,86 @@ def get_url_params(
# "currency": "USD",
"exportFormat": "JSON",
}

def post_process(
self,
row: dict,
context: dict | None = None, # noqa: ARG002
) -> dict | None:
"""Append or transform raw data to match expected structure.
Args:
row: Individual record in the stream.
context: Stream partition or context dictionary.
Returns:
The resulting record dict, or `None` if the record should be excluded.
"""
return flatten_record(row)


def get_list_item_values(source_list: list, target_keys: list[str], search_pair: dict) -> dict:
"""Return target items from a dict with a specific key value pair from an array of dicts."""
if len(search_pair) > 1:
raise ValueError("pair must be a dictionary with a single key value pair")
for search_key, search_value in search_pair.items():
target_dict = next(i for i in source_list if i[search_key] == search_value)
return {k: v for k, v in target_dict.items() if k in target_keys}


def flatten_record(row: dict) -> dict:
"""Flatten a nested dictionary."""
entries = row.get("entries", [])
fields = row.get("fields", [])
time_frames = row.get("timeFrames", [])
time_periods = row.get("timePeriods", [])
users = row.get("users", [])

new_entries = []
for entry in entries:
field = get_list_item_values(
fields,
["fieldName"],
{"fieldId": entry["fieldId"]}
)
time_frame = get_list_item_values(
time_frames,
["startDate", "endDate"],
{"timeFrameId": entry["timeFrameId"]}
)
time_period = get_list_item_values(
time_periods,
["type", "label", "year", "startDate", "endDate", "crmId"],
{"timePeriodId": entry["timePeriodId"]}
)
user = get_list_item_values(
users,
[
"name",
"email",
"scopeId",
"crmId",
"hierarchyId",
"hierarchyName",
"parentHierarchyId",
"parentHierarchyName",
],
{"userId": entry["userId"]}
)

# prevent key clashes
time_period["timePeriodStartDate"] = time_period.pop("startDate")
time_period["timePeriodEndDate"] = time_period.pop("endDate")
time_period["timePeriodCrmId"] = time_period.pop("crmId")
time_period["timePeriodType"] = time_period.pop("type")
time_period["timePeriodLabel"] = time_period.pop("label")
time_frame["timeFrameStartDate"] = time_frame.pop("startDate")
time_frame["timeFrameEndDate"] = time_frame.pop("endDate")
user["userCrmId"] = user.pop("crmId")
user["userName"] = user.pop("name")
user["userEmail"] = user.pop("email")

# merge dictionaries
new_entries.append({**entry, **field, **time_frame, **time_period, **user})

return {"entries": new_entries}
40 changes: 16 additions & 24 deletions tap_clari/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ def __init__(self, tap, forecast_id: str):
)
self.forecast_id = forecast_id

primary_keys: t.ClassVar[list[str]] = ["timeFrames", "timePeriods"]
primary_keys: t.ClassVar[list[str]] = [
"fieldId",
"timeFrameId",
"timePeriodId",
"userId",
]
replication_key = None
schema = th.PropertiesList(
th.Property("entries", th.ArrayType(th.ObjectType(
Expand All @@ -29,35 +34,22 @@ def __init__(self, tap, forecast_id: str):
th.Property("timeFrameId", th.StringType),
th.Property("timePeriodId", th.StringType),
th.Property("userId", th.StringType),
))),
th.Property("fields", th.ArrayType(th.ObjectType(
th.Property("fieldId", th.StringType),
th.Property("fieldName", th.StringType),
th.Property("fieldType", th.StringType),
))),
th.Property("timeFrames", th.ArrayType(th.ObjectType(
th.Property("endDate", th.DateType),
th.Property("startDate", th.DateType),
th.Property("timeFrameId", th.StringType),
))),
th.Property("timePeriods", th.ArrayType(th.ObjectType(
th.Property("crmId", th.StringType),
th.Property("endDate", th.DateType),
th.Property("label", th.StringType),
th.Property("startDate", th.DateType),
th.Property("timePeriodId", th.StringType),
th.Property("type", th.StringType),
th.Property("timeFrameEndDate", th.DateType),
th.Property("timeFrameStartDate", th.DateType),
th.Property("timePeriodCrmId", th.StringType),
th.Property("timePeriodEndDate", th.DateType),
th.Property("timePeriodLabel", th.StringType),
th.Property("timePeriodStartDate", th.DateType),
th.Property("timePeriodType", th.StringType),
th.Property("year", th.StringType),
))),
th.Property("users", th.ArrayType(th.ObjectType(
th.Property("crmId", th.StringType),
th.Property("email", th.EmailType),
th.Property("userCrmId", th.StringType),
th.Property("userEmail", th.EmailType),
th.Property("hierarchyId", th.StringType),
th.Property("hierarchyName", th.StringType),
th.Property("name", th.StringType),
th.Property("userName", th.StringType),
th.Property("parentHierarchyId", th.StringType),
th.Property("parentHierarchyName", th.StringType),
th.Property("scopeId", th.StringType),
th.Property("userId", th.StringType),
))),
).to_dict()
159 changes: 146 additions & 13 deletions tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,155 @@
"""Tests standard tap features using the built-in SDK tests library."""

import datetime
from tap_clari.client import get_list_item_values, flatten_record

from singer_sdk.testing import get_tap_test_class
# SAMPLE_CONFIG = {
# "start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d"),
# # TODO: Initialize minimal tap config
# }
#
#
# # Run standard built-in tap tests from the SDK:
# TestTapClari = get_tap_test_class(
# tap_class=TapClari,
# config=SAMPLE_CONFIG,
# )

from tap_clari.tap import TapClari

SAMPLE_CONFIG = {
"start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d"),
# TODO: Initialize minimal tap config
}
sl = [
{"id": 1, "name": "one", "other": "foo"},
{"id": 2, "name": "two", "other": "bar"},
{"id": 3, "name": "three", "other": "baz"},
]

def test_get_list_item_values_multiple():
"""Test the get_list_item_values function."""
res = get_list_item_values(sl, ["name", "other"], {"id": 1})
assert res == {"name": "one", "other": "foo"}

# Run standard built-in tap tests from the SDK:
TestTapClari = get_tap_test_class(
tap_class=TapClari,
config=SAMPLE_CONFIG,
)

def test_get_list_item_values_one():
"""Test the get_list_item_values function."""
res = get_list_item_values(sl, ["name"], {"id": 1})
assert res == {"name": "one"}

# TODO: Create additional tests as appropriate for your tap.

def test_get_list_item_values_raise():
"""Test that oversized search_pair raise an ValueError."""
try:
get_list_item_values(sl, ["name"], {"id": 1, "other": "foo"})
except ValueError:
pass
else:
assert False, "Expected ValueError"


def test_flatten_record():
"""Test the flatten_record function."""
record = {
"entries": [
{'fieldId': 'field_id_1',
'quotaValue': None,
'timeFrameId': 'TF:2024-03-01',
'timePeriodId': '2024_Q1',
'userId': 'user_id_1'},
{'fieldId': 'field_id_2',
'quotaValue': None,
'timeFrameId': 'TF:2024-03-01',
'timePeriodId': '2024_Q1',
'userId': 'user_id_1'}
],
'users': [{'crmId': 'crm_id_1',
'email': '[email protected]',
'hierarchyId': 'bar',
'hierarchyName': 'spam',
'name': 'name_1',
'parentHierarchyId': 'foo',
'parentHierarchyName': 'eggs',
'scopeId': '{"type":"blah"}',
'userId': 'user_id_1'},
{'crmId': 'crm_id_2',
'email': '[email protected]',
'hierarchyId': 'bar',
'hierarchyName': 'spam',
'name': 'name_2',
'parentHierarchyId': 'foo',
'parentHierarchyName': 'eggs',
'scopeId': '{"type":"blah"}',
'userId': 'user_id_2'},
],
'timePeriods': [{'crmId': 'crm_id_3',
'endDate': '2024-03-31',
'label': 'Q1',
'startDate': '2024-01-01',
'timePeriodId': '2024_Q1',
'type': 'quarter',
'year': '2024'}],
'fields': [
{'fieldId': 'field_id_2',
'fieldName': 'field_name_2',
'fieldType': 'quota'},
{'fieldId': 'field_id_3',
'fieldName': 'field_name_3',
'fieldType': 'bar'},
{'fieldId': 'field_id_1',
'fieldName': 'field_name_1',
'fieldType': 'bar'},
],
'timeFrames': [{'endDate': '2024-03-07',
'startDate': '2024-03-01',
'timeFrameId': 'TF:2024-03-01'}],

}
res = flatten_record(record)
assert res == {
"entries": [
{
'fieldId': 'field_id_1',
'fieldName': 'field_name_1',
'quotaValue': None,
'timeFrameId': 'TF:2024-03-01',
'timeFrameStartDate': '2024-03-01',
'timeFrameEndDate': '2024-03-07',
'timePeriodCrmId': 'crm_id_3',
'timePeriodType': 'quarter',
'timePeriodLabel': 'Q1',
'year': '2024',
'timePeriodStartDate': '2024-01-01',
'timePeriodEndDate': '2024-03-31',
'timePeriodId': '2024_Q1',
'userId': 'user_id_1',
'userName': 'name_1',
'userEmail': '[email protected]',
'scopeId': '{"type":"blah"}',
'userCrmId': 'crm_id_1',
'hierarchyId': 'bar',
'hierarchyName': 'spam',
'parentHierarchyId': 'foo',
'parentHierarchyName': 'eggs',
},
{
'fieldId': 'field_id_2',
'fieldName': 'field_name_2',
'quotaValue': None,
'timeFrameId': 'TF:2024-03-01',
'timeFrameStartDate': '2024-03-01',
'timeFrameEndDate': '2024-03-07',
'timePeriodCrmId': 'crm_id_3',
'timePeriodType': 'quarter',
'timePeriodLabel': 'Q1',
'year': '2024',
'timePeriodStartDate': '2024-01-01',
'timePeriodEndDate': '2024-03-31',
'timePeriodId': '2024_Q1',
'userId': 'user_id_1',
'userName': 'name_1',
'userEmail': '[email protected]',
'scopeId': '{"type":"blah"}',
'userCrmId': 'crm_id_1',
'hierarchyId': 'bar',
'hierarchyName': 'spam',
'parentHierarchyId': 'foo',
'parentHierarchyName': 'eggs',
},
]
}

0 comments on commit 9435092

Please sign in to comment.