Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update bookmark logic for transactions and order_refunds for older tap version #198

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-shopify/bin/activate
pylint tap_shopify -d missing-docstring,too-many-branches,consider-using-f-string,consider-using-generator,consider-using-dict-items,unnecessary-dunder-call,duplicate-code
pylint tap_shopify -d missing-docstring,too-many-branches,consider-using-f-string,consider-using-generator,consider-using-dict-items,unnecessary-dunder-call,duplicate-code,global-statement
json_validator:
executor: docker-executor
steps:
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 1.12.0
* Update bookmark logic for transactions and order_refunds stream [#198](https://github.com/singer-io/tap-shopify/pull/198)

## 1.11.0
* Deprecate the streams for the older version. [#196](https://github.com/singer-io/tap-shopify/pull/196)
* Deprecated streams - products, inventory_items, metafields (product)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name="tap-shopify",
version="1.11.0",
version="1.12.0",
description="Singer.io tap for extracting Shopify data",
author="Stitch",
url="http://github.com/singer-io/tap-shopify",
Expand Down
10 changes: 6 additions & 4 deletions tap_shopify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ def raise_warning():

if SELECTED_DEPRECATED_STREAMS and TODAY_UTC > CUTOFF_DATE:
raise ShopifyDeprecationError(
f"The {SELECTED_DEPRECATED_STREAMS} stream(s) are no longer supported after 31st March 2025. "
"Please upgrade to the latest version of tap-shopify, which supports GraphQL endpoints for these streams."
f"The {SELECTED_DEPRECATED_STREAMS} stream(s) are no longer supported after 31st March"
" 2025. Please upgrade to the latest version of tap-shopify, which supports GraphQL"
" endpoints for these streams."
)

@shopify_error_handling
Expand Down Expand Up @@ -182,8 +183,9 @@ def sync():
SELECTED_DEPRECATED_STREAMS.append(stream_id)
if TODAY_UTC > CUTOFF_DATE:
LOGGER.critical(
f"The {stream_id} stream is no longer supported. "
"Please upgrade to the latest version of tap-shopify, which supports GraphQL endpoints for this stream."
"The %s stream is no longer supported. Please upgrade to the latest "
"version of tap-shopify, which supports GraphQL endpoints for this stream.",
stream_id
)
continue
if stream_id == 'metafields':
Expand Down
2 changes: 1 addition & 1 deletion tap_shopify/streams/metafields.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import json
from datetime import datetime, timezone
import shopify
import singer
from singer import utils
from datetime import datetime, timezone

from tap_shopify.context import Context
from tap_shopify.streams.base import (Stream,
Expand Down
14 changes: 7 additions & 7 deletions tap_shopify/streams/order_refunds.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import shopify
from singer.utils import strftime, strptime_to_utc
import singer
from singer.utils import strptime_to_utc
from tap_shopify.context import Context
from tap_shopify.streams.base import (Stream,
shopify_error_handling,
Expand All @@ -10,6 +11,7 @@ class OrderRefunds(Stream):
name = 'order_refunds'
replication_object = shopify.Refund
replication_key = 'created_at'
parent_stream = None

@shopify_error_handling
def get_refunds(self, parent_object, since_id):
Expand All @@ -24,6 +26,7 @@ def get_refunds(self, parent_object, since_id):
def get_objects(self):
selected_parent = Context.stream_objects['orders']()
selected_parent.name = "refund_orders"
self.parent_stream = selected_parent

# Page through all `orders`, bookmarking at `refund_orders`
for parent_object in selected_parent.get_objects():
Expand All @@ -44,7 +47,6 @@ def get_objects(self):

def sync(self):
bookmark = self.get_bookmark()
max_bookmark = bookmark
for refund in self.get_objects():
refund_dict = refund.to_dict()
replication_value = strptime_to_utc(refund_dict[self.replication_key])
Expand All @@ -54,10 +56,8 @@ def sync(self):
canonicalize(transaction_dict, field_name)
yield refund_dict

if replication_value > max_bookmark:
max_bookmark = replication_value

self.update_bookmark(strftime(max_bookmark))

max_bookmark = singer.get_bookmark(
Context.state, self.parent_stream.name, self.parent_stream.replication_key)
self.update_bookmark(max_bookmark)

Context.stream_objects['order_refunds'] = OrderRefunds
12 changes: 6 additions & 6 deletions tap_shopify/streams/transactions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import shopify
import singer
from singer.utils import strftime, strptime_to_utc
from singer.utils import strptime_to_utc
from tap_shopify.context import Context
from tap_shopify.streams.base import (Stream,
shopify_error_handling,
Expand All @@ -17,6 +17,7 @@ class Transactions(Stream):
name = 'transactions'
replication_key = 'created_at'
replication_object = shopify.Transaction
parent_stream = None
# Added decorator over functions of shopify SDK
replication_object.find = shopify_error_handling(replication_object.find)
# Transactions have no updated_at property. Therefore we have
Expand Down Expand Up @@ -59,6 +60,7 @@ def get_objects(self):
# Get transactions, bookmarking at `transaction_orders`
selected_parent = Context.stream_objects['orders']()
selected_parent.name = "transaction_orders"
self.parent_stream = selected_parent

# Page through all `orders`, bookmarking at `transaction_orders`
for parent_object in selected_parent.get_objects():
Expand All @@ -68,7 +70,6 @@ def get_objects(self):

def sync(self):
bookmark = self.get_bookmark()
max_bookmark = bookmark
for transaction in self.get_objects():
transaction_dict = transaction.to_dict()
replication_value = strptime_to_utc(transaction_dict[self.replication_key])
Expand All @@ -77,9 +78,8 @@ def sync(self):
canonicalize(transaction_dict, field_name)
yield transaction_dict

if replication_value > max_bookmark:
max_bookmark = replication_value

self.update_bookmark(strftime(max_bookmark))
max_bookmark = singer.get_bookmark(
Context.state, self.parent_stream.name, self.parent_stream.replication_key)
self.update_bookmark(max_bookmark)

Context.stream_objects['transactions'] = Transactions
4 changes: 2 additions & 2 deletions tests/test_bookmarks_updated.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def bookmarks_test(self, conn_id, testable_streams):
#simulated_states = self.calculated_states_by_stream(first_sync_bookmark)

# We are hardcoding the updated state to ensure that we get at least 1 record in the second sync. These values have been provided after reviewing the max bookmark value for each of the streams
simulated_states = {'products': {'updated_at': '2021-12-20T05:10:05.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transactions': {'created_at': '2021-12-20T00:08:52-05:00'}, 'metafields': {'updated_at': '2021-09-07T21:18:05.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2021-12-20T17:41:18.000000Z'}}
simulated_states = {'products': {'updated_at': '2024-09-14T03:01:11.000000Z'}, 'collects': {'updated_at': '2021-09-01T09:08:28.000000Z'}, 'abandoned_checkouts': {'updated_at': '2022-02-02T16:00:00.000000Z'}, 'inventory_levels': {'updated_at': '2021-12-20T05:09:34.000000Z'}, 'locations': {'updated_at': '2021-07-20T09:00:22.000000Z'}, 'events': {'created_at': '2021-12-20T05:09:01.000000Z'}, 'inventory_items': {'updated_at': '2021-09-15T19:44:11.000000Z'}, 'transactions': {'created_at': '2021-12-20T00:08:52-05:00'}, 'metafields': {'updated_at': '2025-01-21T13:28:24.000000Z'}, 'order_refunds': {'created_at': '2021-05-01T17:41:18.000000Z'}, 'customers': {'updated_at': '2021-12-20T05:08:17.000000Z'}, 'orders': {'updated_at': '2021-12-20T05:09:01.000000Z'}, 'custom_collections': {'updated_at': '2024-12-13T08:42:56.000000Z'}}

for stream, updated_state in simulated_states.items():
new_state['bookmarks'][stream] = updated_state
Expand Down Expand Up @@ -148,7 +148,7 @@ def bookmarks_test(self, conn_id, testable_streams):
for record in second_sync_messages:
replication_key_value = record.get(replication_key)
# verify the 2nd sync replication key value is greater or equal to the 1st sync bookmarks
self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark")
self.assertGreaterEqual(self.convert_state_to_utc(replication_key_value), simulated_bookmark_value, msg="Second sync records do not respect the previous bookmark")
# verify the 2nd sync bookmark value is the max replication key value for a given stream
self.assertLessEqual(replication_key_value, second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater replication key value was synced")

Expand Down
30 changes: 9 additions & 21 deletions tests/test_interrupted_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,13 @@ def test_run(self):
completed_streams = stream_groups.get('completed')
yet_to_be_synced_streams = stream_groups.get('yet_to_be_synced')

base_state = {'bookmarks':
{'currently_sync_stream': currently_syncing_stream,
'customers': {'updated_at': '2023-03-28T18:53:28.000000Z'},
'events': {'created_at': '2023-01-22T05:05:53.000000Z'},
'metafields': {'updated_at': '2023-01-07T21:18:05.000000Z'},
'orders': {'updated_at': '2023-01-22T05:07:44.000000Z'},
'products': {'updated_at': '2023-01-22T05:05:56.000000Z'},
'transactions': {'created_at': '2022-06-26T00:06:38-04:00'}
}}
base_state = {"bookmarks": {"currently_sync_stream": currently_syncing_stream,
"customers": first_sync_state.get("bookmarks").get("customers"),
"events": first_sync_state.get("bookmarks").get("events"),
"metafields": first_sync_state.get("bookmarks").get("metafields"),
"orders": first_sync_state.get("bookmarks").get("orders"),
"products": first_sync_state.get("bookmarks").get("products"),
"transactions": first_sync_state.get("bookmarks").get("transactions"),}}

# remove yet to be synced streams from base state and then set new state
new_state = {
Expand Down Expand Up @@ -196,18 +194,8 @@ def test_run(self):
self.assertIsNotNone(resuming_bookmark_value)
self.assertTrue(self.is_expected_date_format(resuming_bookmark_value))

# verify the resuming bookmark is greater than 1st sync bookmark
# This is the expected behaviour for shopify as they are using date windowing
# TDL-17096 : Resuming bookmark value is getting assigned from execution time
# rather than the actual bookmark time for some streams.
# TODO transactions stream has equal bookmarks, orders stream has shown both equal
# and greater than bookmark behavior, confirm if this is correct
if stream == 'transactions':
self.assertEqual(resuming_bookmark_value, first_bookmark_value)
elif stream == 'orders':
self.assertGreaterEqual(resuming_bookmark_value, first_bookmark_value)
else:
self.assertGreater(resuming_bookmark_value, first_bookmark_value)
# verify the resuming bookmark is greater than or equal to the 1st sync bookmark
self.assertGreaterEqual(resuming_bookmark_value, first_bookmark_value)

# verify oldest record from resuming sync respects bookmark from previous sync
if stream in new_state['bookmarks'].keys() and resuming_sync_messages:
Expand Down