Skip to content

Commit 4ac9552

Browse files
cmm-airbytecristina.mariscal
andauthored
Salesforce refactor: add CheckpointMixin for state management (#39517)
Co-authored-by: cristina.mariscal <[email protected]>
1 parent 94234e5 commit 4ac9552

File tree

4 files changed

+15
-5
lines changed

4 files changed

+15
-5
lines changed

airbyte-integrations/connectors/source-salesforce/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: b117307c-14b6-41aa-9422-947e34922962
13-
dockerImageTag: 2.5.14
13+
dockerImageTag: 2.5.15
1414
dockerRepository: airbyte/source-salesforce
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
1616
githubIssueLabel: source-salesforce

airbyte-integrations/connectors/source-salesforce/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "2.5.14"
6+
version = "2.5.15"
77
name = "source-salesforce"
88
description = "Source implementation for Salesforce."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
2222
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
2323
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import IsoMillisConcurrentStreamStateConverter
24-
from airbyte_cdk.sources.streams.core import Stream, StreamData
24+
from airbyte_cdk.sources.streams.core import CheckpointMixin, Stream, StreamData
2525
from airbyte_cdk.sources.streams.http import HttpClient, HttpStream, HttpSubStream
2626
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
2727
from airbyte_cdk.utils import AirbyteTracedException
@@ -695,7 +695,7 @@ def transform_empty_string_to_none(instance: Any, schema: Any):
695695
return instance
696696

697697

698-
class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
698+
class IncrementalRestSalesforceStream(RestSalesforceStream, CheckpointMixin, ABC):
699699
state_checkpoint_interval = 500
700700
_slice = None
701701

@@ -704,6 +704,7 @@ def __init__(self, replication_key: str, stream_slice_step: str = "P30D", **kwar
704704
self.replication_key = replication_key
705705
self._stream_slice_step = stream_slice_step
706706
self._stream_slicer_cursor = None
707+
self._state = {}
707708

708709
def set_cursor(self, cursor: Cursor) -> None:
709710
self._stream_slicer_cursor = cursor
@@ -764,7 +765,15 @@ def request_params(
764765
def cursor_field(self) -> str:
765766
return self.replication_key
766767

767-
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
768+
@property
769+
def state(self):
770+
return self._state
771+
772+
@state.setter
773+
def state(self, value):
774+
self._state = value
775+
776+
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
768777
"""
769778
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state
770779
object and returning an updated state object. Check if latest record is IN stream slice interval => ignore if not

docs/integrations/sources/salesforce.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ Now that you have set up the Salesforce source connector, check out the followin
195195

196196
| Version | Date | Pull Request | Subject |
197197
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
198+
| 2.5.15 | 2024-06-16 | [39517](https://github.com/airbytehq/airbyte/pull/39517) | Salesforce refactor: add CheckpointMixin for state management |
198199
| 2.5.14 | 2024-06-06 | [39269](https://github.com/airbytehq/airbyte/pull/39269) | [autopull] Upgrade base image to v1.2.2 |
199200
| 2.5.13 | 2024-05-23 | [38563](https://github.com/airbytehq/airbyte/pull/38563) | Use HttpClient to perform HTTP requests for bulk, authentication and schema discovery |
200201
| 2.5.12 | 2024-05-16 | [38255](https://github.com/airbytehq/airbyte/pull/38255) | Replace AirbyteLogger with logging.Logger |

0 commit comments

Comments
 (0)