diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 090bae730..eefefc3bd 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -76,28 +76,3 @@ publish_to_pypi: else echo "Skipped. (Running in fork.)" fi - - | - echo "Waiting for PyPi availability..." - if [[ "$CI_PROJECT_NAMESPACE" == "meltano" ]] - then - pwd - ls -la - export VER=$(poetry version --short) - export PIPERR=$(pip install tapdance==$VER 2>&1) - echo "Checking for PyPi availability of version $VER" - if [[ $PIPERR == *"$VER"* ]]; then { echo "Yes"; } else { echo "Not yet found..."; sleep 30; } fi; - export PIPERR=$(pip install tapdance==$VER 2>&1) - if [[ $PIPERR == *"$VER"* ]]; then { echo "Yes"; } else { echo "Not yet found..."; sleep 30; } fi; - export PIPERR=$(pip install tapdance==$VER 2>&1) - if [[ $PIPERR == *"$VER"* ]]; then { echo "Yes"; } else { echo "Not yet found..."; sleep 30; } fi; - export PIPERR=$(pip install tapdance==$VER 2>&1) - if [[ $PIPERR == *"$VER"* ]]; then { echo "Yes"; } else { echo "Not yet found..."; sleep 30; } fi; - export PIPERR=$(pip install tapdance==$VER 2>&1) - if [[ $PIPERR == *"$VER"* ]]; then { echo "Yes"; } else { echo "Not yet found..."; sleep 30; } fi; - export PIPERR=$(pip install tapdance==$VER 2>&1) - if [[ $PIPERR == *"$VER"* ]]; then { echo "Yes"; } else { echo "Not yet found..."; sleep 30; } fi; - export PIPERR=$(pip install tapdance==$VER 2>&1) - if [[ $PIPERR == *"$VER"* ]]; then { echo "Yes"; } else { echo "Not found. Giving up. Last message from PyPi was $PIPERR"; exit 1; } fi; - else - echo "Skipped. (Running in fork.)" - fi diff --git a/LICENSE b/LICENSE index a07811ae5..564273905 100644 --- a/LICENSE +++ b/LICENSE @@ -186,7 +186,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2020 AJ + Copyright 2021 Meltano Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/README.md b/README.md index 5ac3d5ca2..3e5f78cb9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,4 @@ -# `singer-sdk` - an open framework for building Singer-compliant taps - -- _Note: This framework is still in early development and planning phases_ +# `singer-sdk` - a framework for building Singer taps ## Strategies for Optimized Tap Development diff --git a/cookiecutter/tap-template/README.md b/cookiecutter/tap-template/README.md index 80bcd921b..e769380a5 100644 --- a/cookiecutter/tap-template/README.md +++ b/cookiecutter/tap-template/README.md @@ -1,10 +1,3 @@ -_**NOTE:** This framework is still in early exploration and development phases. 
For more -information and to be updated on this project, please feel free to subscribe to our -[original Meltano thread](https://gitlab.com/meltano/meltano/-/issues/2401) and the -[initial PR for the underlying framework](https://gitlab.com/meltano/tap-base/-/merge_requests/1)._ - --------------------------------- - # Singer SDK Tap Template To use this cookie cutter template: diff --git a/cookiecutter/tap-template/cookiecutter.json b/cookiecutter/tap-template/cookiecutter.json index ba6c48fdf..8c7926092 100644 --- a/cookiecutter/tap-template/cookiecutter.json +++ b/cookiecutter/tap-template/cookiecutter.json @@ -3,7 +3,6 @@ "tap_id": "tap-{{ cookiecutter.source_name.lower() }}", "library_name": "{{ cookiecutter.tap_id.replace('-', '_') }}", "stream_type": [ - "Database", "REST", "GraphQL", "Other" diff --git a/cookiecutter/tap-template/cookiecutter.tests.yml b/cookiecutter/tap-template/cookiecutter.tests.yml index 7c6495b73..4554fe144 100644 --- a/cookiecutter/tap-template/cookiecutter.tests.yml +++ b/cookiecutter/tap-template/cookiecutter.tests.yml @@ -1,35 +1,30 @@ tests: - - source_name: DatabaseTemplateTest - tap_id: test-tap-database-type - stream_type: Database - auth_method": Custom or N/A - - source_name: RESTSimpleAuthTemplateTest tap_id: test-tap-rest-simpleauth-type stream_type: REST - auth_method": Simple + auth_method: Simple - source_name: RESTOAuthTemplateTest tap_id: test-tap-rest-oath-type stream_type: REST - auth_method": OAuth + auth_method: OAuth - source_name: RESTJWTTemplateTest tap_id: test-tap-rest-simpleauth-type stream_type: REST - auth_method": JWT + auth_method: JWT - source_name: RESTJWTTemplateTest tap_id: test-tap-rest-simpleauth-type stream_type: REST - auth_method": Custom or N/A + auth_method: Custom or N/A - source_name: GraphQLJWTTemplateTest tap_id: test-tap-graphql-jwt-type stream_type: GraphQL - auth_method": JWT + auth_method: JWT - source_name: OtherCustomTemplateTest tap_id: test-tap-other-custom-type stream_type: Other - auth_method": Custom or N/A + auth_method: Custom or N/A diff --git a/cookiecutter/tap-template/{{cookiecutter.library_name}}/README.md b/cookiecutter/tap-template/{{cookiecutter.library_name}}/README.md index 38323ca25..eb0b9c225 100644 --- a/cookiecutter/tap-template/{{cookiecutter.library_name}}/README.md +++ b/cookiecutter/tap-template/{{cookiecutter.library_name}}/README.md @@ -1,13 +1,6 @@ -_**NOTE:** The Singer SDK framework is still in early exploration and development phases. For more -information and to be updated on this project, please feel free to subscribe to our -[original Meltano thread](https://gitlab.com/meltano/meltano/-/issues/2401) and the -[initial PR for the underlying framework](https://gitlab.com/meltano/singer-sdk/-/merge_requests/1)._ +# {{cookiecutter.tap_id}} --------------------------------- - -# Welcome to the {{cookiecutter.tap_id}} Singer Tap! - -This Singer-compliant tap was created using the [Singer SDK](https://gitlab.com/meltano/singer-sdk). +This Singer tap was created using the [Singer SDK](https://gitlab.com/meltano/singer-sdk). 
## Getting Started

diff --git a/cookiecutter/tap-template/{{cookiecutter.library_name}}/pyproject.toml b/cookiecutter/tap-template/{{cookiecutter.library_name}}/pyproject.toml
index adda905f8..50d2dc85d 100644
--- a/cookiecutter/tap-template/{{cookiecutter.library_name}}/pyproject.toml
+++ b/cookiecutter/tap-template/{{cookiecutter.library_name}}/pyproject.toml
@@ -1,17 +1,14 @@
 [tool.poetry]
 name = "{{cookiecutter.tap_id}}"
 version = "0.0.1"
-description = "`{{cookiecutter.tap_id}}` is Singer-compliant {{cookiecutter.source_name}} tap built with Singer SDK."
+description = "`{{cookiecutter.tap_id}}` is a Singer tap for {{cookiecutter.source_name}}, built with the Singer SDK."
 authors = ["TODO: Your Name "]
 license = "Apache v2"
 
 [tool.poetry.dependencies]
 python = "^3.6"
 requests = "^2.25.1"
-# Note: Until we clear the first non-prerelease, `singer-sdk` need to be pinned to a specific version.
-# For a list of released versions: https://pypi.org/project/singer-sdk/#history
-# To safely update the version number: `poetry add singer-sdk==0.0.2-dev.1068770959`
-singer-sdk = "0.0.2-dev.1068770959"
+singer-sdk = "^0.1.0"
 
 [tool.poetry.dev-dependencies]
 pytest = "^6.1.2"
diff --git a/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/streams.py b/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/streams.py
index 57b069699..a2d5b093b 100644
--- a/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/streams.py
+++ b/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/streams.py
@@ -12,7 +12,7 @@
 from singer_sdk.streams import {{ cookiecutter.stream_type }}Stream
 {% endif %}
 
-{% if cookiecutter.stream_type in ["GraphQL", "REST"] %}
+{% if cookiecutter.stream_type in ("GraphQL", "REST") %}
 from singer_sdk.authenticators import (
     APIAuthenticatorBase,
     SimpleAuthenticator,
@@ -20,7 +20,7 @@
     OAuthJWTAuthenticator
 )
 {% endif %}
-from singer_sdk.helpers.typing import (
+from singer_sdk.typing import (
     ArrayType,
     BooleanType,
     DateTimeType,
@@ -39,7 +39,7 @@ class {{ cookiecutter.source_name }}Stream(Stream):
     """Stream class for {{ cookiecutter.source_name }} streams."""
 
-    def get_records(self, partition: Optional[dict]) -> Iterable[dict]:
+    def get_records(self, partition: Optional[dict] = None) -> Iterable[dict]:
         """Return a generator of row-type dictionary objects."""
         # TODO: Write logic to extract data from the upstream source.
         # rows = mysource.getall()
@@ -47,12 +47,17 @@ def get_records(self, partition: Optional[dict]) -> Iterable[dict]:
         #     yield row.to_dict()
         raise NotImplementedError("The method is not yet implemented (TODO)")
 
-{% endif %}
-{% if cookiecutter.stream_type in ["GraphQL", "REST"] %}
+{% elif cookiecutter.stream_type in ("GraphQL", "REST") %}
 class {{ cookiecutter.source_name }}Stream({{ cookiecutter.stream_type }}Stream):
     """{{ cookiecutter.source_name }} stream class."""
 
-    url_base = "https://api.mysample.com"
+    @property
+    def url_base(self) -> str:
+        """Return the API URL root, configurable via tap settings."""
+        return self.config["api_url"]
+
+    # Alternatively, use a static string for url_base:
+    # url_base = "https://api.mysample.com"
 
 {% if cookiecutter.stream_type == "REST" %}
     def get_url_params(
@@ -66,9 +71,9 @@ def get_url_params(
         logic.
""" params = {} - starting_datetime = self.get_starting_datetime(partition) + starting_datetime = self.get_starting_timestamp(partition) if starting_datetime: - params.update({"updated": starting_datetime}) + params["updated"] = starting_datetime return params {% endif %} @@ -100,10 +105,12 @@ def authenticator(self) -> APIAuthenticatorBase: {% if cookiecutter.stream_type == "GraphQL" %} -# TODO: - Override `StreamA` and `StreamB` with your own stream definition. +# TODO: - Override `UsersStream` and `GroupsStream` with your own stream definition. # - Copy-paste as many times as needed to create multiple stream types. -class StreamA({{ cookiecutter.source_name }}Stream): +class UsersStream({{ cookiecutter.source_name }}Stream): name = "users" + # Optionally, you may also use `schema_filepath` in place of `schema`: + # schema_filepath = SCHEMAS_DIR / "users.json" schema = PropertiesList( Property("name", StringType), Property("id", StringType), @@ -137,7 +144,7 @@ class StreamA({{ cookiecutter.source_name }}Stream): """ -class StreamB({{ cookiecutter.source_name }}Stream): +class GroupsStream({{ cookiecutter.source_name }}Stream): name = "groups" schema = PropertiesList( Property("name", StringType), @@ -155,16 +162,18 @@ class StreamB({{ cookiecutter.source_name }}Stream): """ -{% elif cookiecutter.stream_type in ["Other", "REST"] %} -# TODO: - Override `StreamA` and `StreamB` with your own stream definition. +{% elif cookiecutter.stream_type in ("Other", "REST") %} +# TODO: - Override `UsersStream` and `GroupsStream` with your own stream definition. # - Copy-paste as many times as needed to create multiple stream types. -class StreamA({{ cookiecutter.source_name }}Stream): +class UsersStream({{ cookiecutter.source_name }}Stream): name = "users" -{% if cookiecutter.stream_type in ["REST"] %} +{% if cookiecutter.stream_type == "REST" %} path = "/users" {% endif %} primary_keys = ["id"] replication_key = None + # Optionally, you may also use `schema_filepath` in place of `schema`: + # schema_filepath = SCHEMAS_DIR / "users.json" schema = PropertiesList( Property("name", StringType), Property("id", StringType), @@ -177,9 +186,9 @@ class StreamA({{ cookiecutter.source_name }}Stream): ).to_dict() -class StreamB({{ cookiecutter.source_name }}Stream): +class GroupsStream({{ cookiecutter.source_name }}Stream): name = "groups" -{% if cookiecutter.stream_type in ["REST"] %} +{% if cookiecutter.stream_type == "REST" %} path = "/groups" {% endif %} primary_keys = ["id"] @@ -190,25 +199,3 @@ class StreamB({{ cookiecutter.source_name }}Stream): Property("modified", DateTimeType), ).to_dict() {% endif %} - -{% if cookiecutter.stream_type == "Database" %} -class {{ cookiecutter.source_name }}Stream(DatabaseStream): - """Stream class for {{ cookiecutter.source_name }} database streams.""" - - @classmethod - def execute_query(cls, sql: Union[str, List[str]], config) -> Iterable[dict]: - """Run a query in snowflake.""" - connection = cls.open_connection(config=config) - """Connect to database.""" - # TODO: Define the process of executing a query against your database - # and returning a list or other iterable containing the resulting - # rows. - raise NotImplementedError("The method is not yet implemented (TODO)") - - @classmethod - def open_connection(cls, config) -> Any: - """Connect to database.""" - # TODO: Define the process of connecting to your database and returning - # a connection object. 
- raise NotImplementedError("The method is not yet implemented (TODO)") -{% endif %} diff --git a/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tap.py b/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tap.py index 62e974070..2477053f2 100644 --- a/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tap.py +++ b/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tap.py @@ -4,7 +4,7 @@ from typing import List import click from singer_sdk import Tap, Stream -from singer_sdk.helpers.typing import ( +from singer_sdk.typing import ( ArrayType, BooleanType, DateTimeType, @@ -19,20 +19,18 @@ # TODO: Import your custom stream types here: from {{ cookiecutter.library_name }}.streams import ( {{ cookiecutter.source_name }}Stream, -{% if cookiecutter.stream_type in ["GraphQL", "REST", "Other"] %} - StreamA, - StreamB, +{% if cookiecutter.stream_type in ("GraphQL", "REST", "Other") %} + UsersStream, + GroupsStream, {% endif %} ) -PLUGIN_NAME = "{{ cookiecutter.tap_id }}" - -{% if cookiecutter.stream_type in ["GraphQL", "REST", "Other"] %} +{% if cookiecutter.stream_type in ("GraphQL", "REST", "Other") %} # TODO: Compile a list of custom stream types here # OR rewrite discover_streams() below with your custom logic. STREAM_TYPES = [ - StreamA, - StreamB, + UsersStream, + GroupsStream, ] {% endif %} @@ -46,10 +44,10 @@ class Tap{{ cookiecutter.source_name }}(Tap): Property("auth_token", StringType, required=True), Property("project_ids", ArrayType(StringType), required=True), Property("start_date", DateTimeType), - Property("api_url", StringType), + Property("api_url", StringType, default="https://api.mysample.com"), ).to_dict() -{% if cookiecutter.stream_type in ["GraphQL", "REST", "Other"] %} +{% if cookiecutter.stream_type in ("GraphQL", "REST", "Other") %} def discover_streams(self) -> List[Stream]: """Return a list of discovered streams.""" return [stream_class(tap=self) for stream_class in STREAM_TYPES] diff --git a/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tests/__init__.py b/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tests/__init__.py index 3a28a3af8..a16590961 100644 --- a/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tests/__init__.py +++ b/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tests/__init__.py @@ -1 +1 @@ -"""Test suite for {{ cookiecutter.library_name }}.""" +"""Test suite for {{ cookiecutter.tap_id }}.""" diff --git a/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tests/{{ 'test' }}_core.py b/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tests/{{ 'test' }}_core.py index 22bdf6b9a..766010785 100644 --- a/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tests/{{ 'test' }}_core.py +++ b/cookiecutter/tap-template/{{cookiecutter.library_name}}/{{cookiecutter.library_name}}/tests/{{ 'test' }}_core.py @@ -1,12 +1,13 @@ """Tests init and discovery features for {{ cookiecutter.tap_id }}.""" -from singer_sdk.helpers.util import utc_now +import datetime + from singer_sdk.helpers.testing import get_basic_tap_test from {{ cookiecutter.library_name }}.tap import Tap{{ cookiecutter.source_name }} SAMPLE_CONFIG = { - "start_date": utc_now() + "start_date": 
datetime.datetime.now(datetime.timezone.utc) # TODO: Initialize minimal tap config and/or register env vars in test harness } diff --git a/docs/dev_guide.md b/docs/dev_guide.md index 5019e7aa4..cc270bab4 100644 --- a/docs/dev_guide.md +++ b/docs/dev_guide.md @@ -12,8 +12,6 @@ _Create with `singer-sdk` requires overriding just two classes:_ - `Stream` - _The **generic** base class for streams._ - `RESTStream` - _The base class for **REST**-type streams._ - `GraphQLStream` - _The base class for **GraphQL**-type streams._ - - `DatabaseStream` - _The base class for **database**-type streams - specifically those - which support the SQL language._ **Detailed Instructions:** @@ -23,7 +21,6 @@ _Create with `singer-sdk` requires overriding just two classes:_ 1. ['Generic' stream classes](#generic-stream-classes) 2. ['API' stream classes](#api-stream-classes) 3. ['GraphQL' stream classes](#graphql-stream-classes) - 4. ['Database' stream classes](#database-stream-classes) ## Step 1: Initialize a new tap repo @@ -126,34 +123,6 @@ GraphQL streams are very similar to REST API-based streams, but instead of speci - [Countries API Tap](/singer_sdk/samples/sample_tap_countries/countries_tap.py) - [Countries API Streams](/singer_sdk/samples/sample_tap_countries/countries_streams.py) -### 'Database' stream classes - -`NOTE: The Database stream class is not fully tested and is less mature than other stream types. For more info: https://gitlab.com/meltano/singer-sdk/-/issues/45` - -_Database streams inherit from the class `DatabaseStream`. To create a database -stream class, you will first override the `execute_query()` method. Depending upon how closely your -source complies with standard `information_schema` conventions, you may also override between -one and four class properties, in order to override specific metadata queries._ - -**All database stream classes override:** - -2. **`execute_query()` method** - This method should run a give SQL statement and incrementally return a dictionary - object for each resulting row. - -_Depending upon your implementation, you may also want to override one or more of the following properties:_ - -1. `open_connection()` method - (Optional.) Open a connection to the database and return a connection object. -2. `table_scan_sql` - A SQL string which should query for all tables, returning three columns: `database_name`, `schema_name`, and `table_name`. -3. `view_scan_sql` - A SQL string which should query for all views, returning three columns: `database_name`, `schema_name`, and `view_name`. -4. `column_scan_sql` - A SQL string which should query for all columns, returning five columns: `database_name`, `schema_name`, `table_or_view_name`, `column_name`, and `data_type`. -5. `primary_key_scan_sql` - Optional. A SQL string which should query for the list of primary keys, returning five columns: `database_name`, `schema_name`, `table_name`, `pk_column_name`. 
-
-**More info:**
-
-- For more info, see the [Snowflake](/singer_sdk/samples/sample_tap_snowflake) sample:
-  - [Snowflake tap](/singer_sdk/samples/sample_tap_snowflake/snowflake_tap.py)
-  - [Snowflake streams](/singer_sdk/samples/sample_tap_snowflake/snowflake_tap_stream.py)
-
 ## Singer SDK Implementation Details
 
 For more detailed information about the Singer SDK implementation, please see the
diff --git a/docs/implementation/cli.md b/docs/implementation/cli.md
index cbc3b770e..1e5a1ceca 100644
--- a/docs/implementation/cli.md
+++ b/docs/implementation/cli.md
@@ -19,6 +19,6 @@
 When `--config=ENV` is specified, the SDK will automatically capture and pass all
 values from environment variables which match the exact name of a setting, along
 with a prefix determined by the plugin name.
 
-> For example: For a sample plugin named `tap-my-example`, the SDK will automatically scrape
-> the settings from environment variables `TAP_MY_EXAMPLE_username` and
-> `TAP_MY_EXAMPLE_access_key`, if they exist.
+> For example: For a sample plugin named `tap-my-example` and settings named "username" and "access_key", the SDK will automatically scrape
+> the settings from environment variables `TAP_MY_EXAMPLE_USERNAME` and
+> `TAP_MY_EXAMPLE_ACCESS_KEY`, if they exist.
diff --git a/docs/tests.md b/docs/tests.md
index 293b76e0a..07987919b 100644
--- a/docs/tests.md
+++ b/docs/tests.md
@@ -11,7 +11,7 @@
 poetry install && \
 poetry run tap-parquet --help
 ```
 
-**Run 'sync' with auto-discovery...**
+### Run in sync mode with auto-discovery
 
 ```bash
 poetry install && \
diff --git a/poetry.lock b/poetry.lock
index 2d1f1f452..e9ff5e342 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -229,7 +229,7 @@
 python-versions = "*"
 
 name = "jinja2"
 version = "2.11.3"
 description = "A very fast and expressive template engine."
-category = "main"
+category = "dev"
 optional = false
 python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
@@ -273,7 +273,7 @@ format_nongpl = ["idna", "jsonpointer (>1.13)", "webcolors", "rfc3986-validator
 name = "markupsafe"
 version = "1.1.1"
 description = "Safely add untrusted strings to HTML/XML markup."
-category = "main"
+category = "dev"
 optional = false
 python-versions = ">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*"
@@ -313,7 +313,7 @@
 python-versions = "*"
 
 name = "numpy"
 version = "1.19.5"
 description = "NumPy is the fundamental package for array computing with Python."
-category = "main" +category = "dev" optional = false python-versions = ">=3.6" @@ -402,7 +402,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" name = "pyarrow" version = "2.0.0" description = "Python library for Apache Arrow" -category = "main" +category = "dev" optional = false python-versions = ">=3.5" @@ -497,11 +497,11 @@ testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xm [[package]] name = "python-dateutil" -version = "2.8.0" +version = "2.8.1" description = "Extensions to the standard Python datetime module" category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" [package.dependencies] six = ">=1.5" @@ -665,7 +665,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=1.2.3)", "pytest-flake8", "pyt [metadata] lock-version = "1.1" python-versions = "<3.9,>=3.6" -content-hash = "bfdafb0831598a34092779bf0ca1516179a3c580cf85a71a1458712e800706fb" +content-hash = "8744fff7f705e8b2c3a65dc69334ef3972e38192062cd25d21628942c44eb8ce" [metadata.files] appdirs = [ @@ -1018,8 +1018,8 @@ pytest = [ {file = "pytest-6.2.2.tar.gz", hash = "sha256:9d1edf9e7d0b84d72ea3dbcdfd22b35fb543a5e8f2a60092dd578936bf63d7f9"}, ] python-dateutil = [ - {file = "python-dateutil-2.8.0.tar.gz", hash = "sha256:c89805f6f4d64db21ed966fda138f8a5ed7a4fdbc1a8ee329ce1b74e3c74da9e"}, - {file = "python_dateutil-2.8.0-py2.py3-none-any.whl", hash = "sha256:7e6584c74aeed623791615e26efd690f29817a27c73085b78e4bad02493df2fb"}, + {file = "python-dateutil-2.8.1.tar.gz", hash = "sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c"}, + {file = "python_dateutil-2.8.1-py2.py3-none-any.whl", hash = "sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"}, ] python-slugify = [ {file = "python-slugify-4.0.1.tar.gz", hash = "sha256:69a517766e00c1268e5bbfc0d010a0a8508de0b18d30ad5a1ff357f8ae724270"}, diff --git a/pyproject.toml b/pyproject.toml index 66635354d..e4b274707 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "singer-sdk" version = "0.0.2" -description = "An open framework for building singer-compliant taps" +description = "A framework for building Singer taps" authors = ["Meltano Team and Contributors"] license = "Apache v2" @@ -10,10 +10,7 @@ python = "<3.9,>=3.6" pipelinewise-singer-python = "1.2.0" backoff = "1.8.0" pendulum = "1.2.0" -python-dateutil = "<2.8.1,>=2.1" -pyarrow = "^2.0.0" click = "^7.1.2" -Jinja2 = "^2.11.2" PyJWT = "1.7.1" requests = "^2.25.1" cryptography = "^3.4.6" diff --git a/singer_sdk/__init__.py b/singer_sdk/__init__.py index f737e1ad8..bcd62119d 100644 --- a/singer_sdk/__init__.py +++ b/singer_sdk/__init__.py @@ -5,7 +5,6 @@ from singer_sdk import streams from singer_sdk.streams import ( Stream, - DatabaseStream, RESTStream, GraphQLStream, ) @@ -15,7 +14,6 @@ "Tap", "streams", "Stream", - "DatabaseStream", "RESTStream", "GraphQLStream", ] diff --git a/singer_sdk/authenticators.py b/singer_sdk/authenticators.py index 60d59a9f5..5430cc0f3 100644 --- a/singer_sdk/authenticators.py +++ b/singer_sdk/authenticators.py @@ -13,7 +13,7 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend -from singer_sdk.helpers.util import utc_now +from singer_sdk.helpers._util import utc_now from singer_sdk.streams import Stream as RESTStreamBase from singer import utils @@ -75,6 +75,15 @@ def __init__( self.last_refreshed: Optional[datetime] = None 
self.expires_in: Optional[int] = None + @property + def http_headers(self) -> dict: + """Return a dictionary of HTTP headers, including any authentication tokens.""" + if not self.is_token_valid(): + self.update_access_token() + result = super().http_headers + result["Authorization"] = f"Bearer {self.access_token}" + return result + @property def auth_endpoint(self) -> str: """Return the authorization endpoint.""" @@ -87,39 +96,6 @@ def oauth_scopes(self) -> Optional[str]: """Return a string with the OAuth scopes, or None if not set.""" return self._oauth_scopes - @property - def client_id(self) -> Optional[str]: - """Return client ID string to be used in authentication or None if not set.""" - if self.config: - return self.config.get("client_id", self.config.get("client_email")) - return None - - @property - def client_secret(self) -> Optional[str]: - """Return client secret to be used in authentication or None if not set.""" - if self.config: - return self.config.get("client_secret") - return None - - @property - def http_headers(self) -> dict: - """Return a dictionary of HTTP headers, including any authentication tokens.""" - if not self.is_token_valid(): - self.update_access_token() - result = super().http_headers - result["Authorization"] = f"Bearer {self.access_token}" - return result - - def is_token_valid(self) -> bool: - """Return true if token is valid.""" - if self.last_refreshed is None: - return False - if not self.expires_in: - return True - if self.expires_in > (utils.now() - self.last_refreshed).total_seconds(): - return True - return False - @property def oauth_request_payload(self) -> dict: """Return the request body directly (OAuth) or encrypted (JWT).""" @@ -142,11 +118,36 @@ def oauth_request_body(self) -> dict: 'username': self.config.get("username", self.config["client_id"]), 'password': self.config["password"], } + ``` """ raise NotImplementedError( "The `oauth_request_body` property was not defined in the subclass." 
        )
 
+    @property
+    def client_id(self) -> Optional[str]:
+        """Return client ID string to be used in authentication or None if not set."""
+        if self.config:
+            return self.config.get("client_id")
+        return None
+
+    @property
+    def client_secret(self) -> Optional[str]:
+        """Return client secret to be used in authentication or None if not set."""
+        if self.config:
+            return self.config.get("client_secret")
+        return None
+
+    def is_token_valid(self) -> bool:
+        """Return true if token is valid."""
+        if self.last_refreshed is None:
+            return False
+        if not self.expires_in:
+            return True
+        if self.expires_in > (utils.now() - self.last_refreshed).total_seconds():
+            return True
+        return False
+
     # Authentication and refresh
     def update_access_token(self):
         """Update `access_token` along with: `last_refreshed` and `expires_in`."""
diff --git a/singer_sdk/helpers/classproperty.py b/singer_sdk/helpers/_classproperty.py
similarity index 100%
rename from singer_sdk/helpers/classproperty.py
rename to singer_sdk/helpers/_classproperty.py
diff --git a/singer_sdk/helpers/_compat.py b/singer_sdk/helpers/_compat.py
new file mode 100644
index 000000000..2c39f71fb
--- /dev/null
+++ b/singer_sdk/helpers/_compat.py
@@ -0,0 +1,15 @@
+"""Compatibility helpers."""
+
+try:
+    from typing import final
+except ImportError:
+    # Final not available until Python3.8
+    final = lambda f: f  # noqa: E731
+
+try:
+    from importlib import metadata
+except ImportError:
+    # Running on pre-3.8 Python; use importlib-metadata package
+    import importlib_metadata as metadata  # type: ignore
+
+__all__ = ["metadata", "final"]
diff --git a/singer_sdk/helpers/secrets.py b/singer_sdk/helpers/_secrets.py
similarity index 94%
rename from singer_sdk/helpers/secrets.py
rename to singer_sdk/helpers/_secrets.py
index 7c0cee930..fdf8336a1 100644
--- a/singer_sdk/helpers/secrets.py
+++ b/singer_sdk/helpers/_secrets.py
@@ -17,14 +17,12 @@
 def is_common_secret_key(key_name: str) -> bool:
     """Return true if the key_name value matches a known secret name or pattern."""
     if key_name in COMMON_SECRET_KEYS:
         return True
-    if any(
+    return any(
         [
             key_name.lower().endswith(key_suffix)
             for key_suffix in COMMON_SECRET_KEY_SUFFIXES
         ]
-    ):
-        return True
-    return False
+    )
 
 
 class SecretString(str):
diff --git a/singer_sdk/helpers/_state.py b/singer_sdk/helpers/_state.py
new file mode 100644
index 000000000..7227f5686
--- /dev/null
+++ b/singer_sdk/helpers/_state.py
@@ -0,0 +1,176 @@
+"""Helper functions for state and bookmark management."""
+
+
+from typing import Any, List, Optional
+
+
+def get_state_if_exists(
+    state: dict,
+    tap_stream_id: str,
+    partition: Optional[dict] = None,
+    key: Optional[str] = None,
+) -> Optional[Any]:
+    """Return the stream or partition state if it exists, or None if it does not.
+
+    Parameters
+    ----------
+    state : dict
+        the existing state dict which contains all streams.
+    tap_stream_id : str
+        the id of the stream
+    partition : Optional[dict], optional
+        keys which identify the partition context, by default None (not partitioned)
+    key : Optional[str], optional
+        name of the key searched for, by default None (return entire state if found)
+
+    Returns
+    -------
+    Optional[Any]
+        Returns the state if exists, otherwise None
+
+    Raises
+    ------
+    ValueError
+        Raised if state is invalid or cannot be parsed.
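+
+    Example (an illustrative sketch; the stream name and bookmark key shown are
+    hypothetical, not part of this function's contract):
+
+        >>> state = {"bookmarks": {"users": {"replication_key_value": "2021-01-01"}}}
+        >>> get_state_if_exists(state, "users", key="replication_key_value")
+        '2021-01-01'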
+ + """ + if "bookmarks" not in state: + return None + if tap_stream_id not in state["bookmarks"]: + return None + + stream_state = state["bookmarks"][tap_stream_id] + if not partition: + if key: + return stream_state.get(key, None) + return stream_state + if "partitions" not in stream_state: + return None + + stream_state_partitions = stream_state["partitions"] + found = [ + partition_state + for partition_state in stream_state_partitions + if partition_state["context"] == partition + ] + if not found: + return None # Partition definition not present + if len(found) > 1: + raise ValueError( + f"State file contains duplicate entries for partition: {partition}" + ) + + matched_partition: dict = found[0] + if key: + return matched_partition.get(key, None) + return matched_partition + + +def get_state_partitions_list(state: dict, tap_stream_id: str) -> Optional[List[dict]]: + """Return a list of partitions defined in the state, or None if not defined.""" + return (get_state_if_exists(state, tap_stream_id) or {}).get("partitions", None) + + +def get_writeable_state_dict( + state: dict, tap_stream_id: str, partition: Optional[dict] = None +) -> dict: + """Return the stream or partition state, creating a new one if it does not exist. + + Parameters + ---------- + state : dict + the existing state dict which contains all streams. + tap_stream_id : str + the id of the stream + partition : Optional[dict], optional + keys which identify the partition context, by default None (not partitioned) + + Returns + ------- + dict + Returns a writeable dict at the stream or partition level. + + Raises + ------ + ValueError + Raise an error if duplicate entries are found. + + """ + if "bookmarks" not in state: + state["bookmarks"] = {} + if tap_stream_id not in state["bookmarks"]: + state["bookmarks"][tap_stream_id] = {} + stream_state = state["bookmarks"][tap_stream_id] + if not partition: + return stream_state + if "partitions" not in stream_state: + stream_state["partitions"] = [] + stream_state_partitions = stream_state["partitions"] + found = [ + partition_state + for partition_state in stream_state_partitions + if partition_state["context"] == partition + ] + if len(found) > 1: + raise ValueError( + f"State file contains duplicate entries for partition: {partition}" + ) + if found: + return found[0] + # Existing partition not found. Creating new state entry in partitions list... + new_partition_state = {"context": partition} + stream_state_partitions.append(new_partition_state) + return new_partition_state + + +def read_stream_state( + state, + tap_stream_id: str, + key=None, + default: Any = None, + *, + partition: Optional[dict] = None, +) -> Any: + """Read stream state.""" + state_dict = get_writeable_state_dict(state, tap_stream_id, partition=partition) + if key: + return state_dict.get(key, default) + return state_dict or default + + +def write_stream_state( + state, tap_stream_id: str, key, val, *, partition: Optional[dict] = None +) -> None: + """Write stream state.""" + state_dict = get_writeable_state_dict(state, tap_stream_id, partition=partition) + state_dict[key] = val + + +def wipe_stream_state_keys( + state: dict, + tap_stream_id: str, + wipe_keys: List[str] = None, + *, + except_keys: List[str] = None, + partition: Optional[dict] = None, +) -> None: + """Wipe bookmarks. + + You may specify a list to wipe or a list to keep, but not both. 
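+
+    Example (an illustrative sketch; the stream name and bookmark key shown are
+    hypothetical):
+
+        # Keep only the replication key value; wipe all other bookmark keys:
+        wipe_stream_state_keys(state, "users", except_keys=["replication_key_value"])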
+ """ + state_dict = get_writeable_state_dict(state, tap_stream_id, partition=partition) + + if except_keys and wipe_keys: + raise ValueError( + "Incorrect number of arguments. " + "Expected `except_keys` or `wipe_keys` but not both." + ) + if except_keys: + wipe_keys = [ + found_key for found_key in state_dict.keys() if found_key not in except_keys + ] + wipe_keys = wipe_keys or [] + for wipe_key in wipe_keys: + if wipe_key in state: + state_dict.pop(wipe_key) + return diff --git a/singer_sdk/helpers/_typing.py b/singer_sdk/helpers/_typing.py new file mode 100644 index 000000000..28c2ceed4 --- /dev/null +++ b/singer_sdk/helpers/_typing.py @@ -0,0 +1,135 @@ +"""General helper functions for json typing.""" + +import copy +import datetime +import logging +from functools import lru_cache +from typing import Optional, Dict, Any + + +def append_type(type_dict: dict, new_type: str) -> dict: + """Return a combined type definition using the 'anyOf' JSON Schema construct.""" + result = copy.deepcopy(type_dict) + if "anyOf" in result: + if isinstance(result["anyOf"], list) and new_type not in result["anyOf"]: + result["anyOf"].append(new_type) + elif new_type != result["anyOf"]: + result["anyOf"] = [result["anyOf"], new_type] + elif "type" in result: + if isinstance(result["type"], list) and new_type not in result["type"]: + result["type"].append(new_type) + elif new_type != result["type"]: + result["type"] = [result["type"], new_type] + else: + raise ValueError("Could not append type because type was not detected.") + return result + + +def is_datetime_type(type_dict: dict) -> bool: + """Return True if JSON Schema type definition is a 'date-time' type. + + Also returns True if 'date-time' is nested within an 'anyOf' type Array. + """ + if not type_dict: + raise ValueError("Could not detect type from empty type_dict param.") + if "anyOf" in type_dict: + for type_dict in type_dict["anyOf"]: + if is_datetime_type(type_dict): + return True + return False + elif "type" in type_dict: + return type_dict.get("format") == "date-time" + raise ValueError( + f"Could not detect type of replication key using schema '{type_dict}'" + ) + + +def is_string_array_type(type_dict: dict) -> bool: + """Return True if JSON Schema type definition is a string array.""" + if not type_dict: + raise ValueError("Could not detect type from empty type_dict param.") + + if "anyOf" in type_dict: + return any([is_string_array_type(t) for t in type_dict["anyOf"]]) + + if "type" not in type_dict: + raise ValueError(f"Could not detect type from schema '{type_dict}'") + + return type_dict["type"] == "array" and is_string_type(type_dict["items"]) + + +def is_boolean_type(property_schema: dict) -> Optional[bool]: + """Return true if the JSON Schema type is a boolean or None if detection fails.""" + if "anyOf" not in property_schema and "type" not in property_schema: + return None # Could not detect data type + for property_type in property_schema.get("anyOf", [property_schema.get("type")]): + if "boolean" in property_type or property_type == "boolean": + return True + return False + + +def is_string_type(property_schema: dict) -> Optional[bool]: + """Return true if the JSON Schema type is a boolean or None if detection fails.""" + if "anyOf" not in property_schema and "type" not in property_schema: + return None # Could not detect data type + for property_type in property_schema.get("anyOf", [property_schema.get("type")]): + if "string" in property_type or property_type == "string": + return True + return False + + +@lru_cache() +def 
_warn_unmapped_property( + stream_name: str, property_name: str, logger: logging.Logger +): + logger.warning( + f"Property '{property_name}' was present in the '{stream_name}' stream but " + "not found in catalog schema. Ignoring." + ) + + +def conform_record_data_types( # noqa: C901 + stream_name: str, row: Dict[str, Any], schema: dict, logger: logging.Logger +) -> Dict[str, Any]: + """Translate values in record dictionary to singer-compatible data types. + + Any property names not found in the schema catalog will be removed, and a + warning will be logged exactly once per unmapped property name. + """ + rec: Dict[str, Any] = {} + for property_name, elem in row.items(): + if property_name not in schema["properties"]: + _warn_unmapped_property(stream_name, property_name, logger) + continue + + property_schema = schema["properties"][property_name] + if isinstance(elem, datetime.datetime): + rec[property_name] = elem.isoformat() + "+00:00" + elif isinstance(elem, datetime.date): + rec[property_name] = elem.isoformat() + "T00:00:00+00:00" + elif isinstance(elem, datetime.timedelta): + epoch = datetime.datetime.utcfromtimestamp(0) + timedelta_from_epoch = epoch + elem + rec[property_name] = timedelta_from_epoch.isoformat() + "+00:00" + elif isinstance(elem, datetime.time): + rec[property_name] = str(elem) + elif isinstance(elem, bytes): + # for BIT value, treat 0 as False and anything else as True + bit_representation: bool + if is_boolean_type(property_schema): + bit_representation = elem != b"\x00" + rec[property_name] = bit_representation + else: + rec[property_name] = elem.hex() + elif is_boolean_type(property_schema): + boolean_representation: Optional[bool] + if elem is None: + boolean_representation = None + elif elem == 0: + boolean_representation = False + else: + boolean_representation = True + rec[property_name] = boolean_representation + else: + rec[property_name] = elem + return rec diff --git a/singer_sdk/helpers/_util.py b/singer_sdk/helpers/_util.py new file mode 100644 index 000000000..4c186ae01 --- /dev/null +++ b/singer_sdk/helpers/_util.py @@ -0,0 +1,27 @@ +"""General helper functions, helper classes, and decorators.""" + +import json +from pathlib import Path, PurePath +from typing import Any, Dict, Union + +import pendulum + + +def read_json_file(path: Union[PurePath, str]) -> Dict[str, Any]: + """Read json file, thowing an error if missing.""" + if not path: + raise RuntimeError("Could not open file. Filepath not provided.") + + if not Path(path).exists(): + msg = f"File at '{path}' was not found." + for template in [f"{path}.template"]: + if Path(template).exists(): + msg += f"\nFor more info, please see the sample template at: {template}" + raise FileExistsError(msg) + + return json.loads(Path(path).read_text()) + + +def utc_now(): + """Return current time in UTC.""" + return pendulum.utcnow() diff --git a/singer_sdk/helpers/state.py b/singer_sdk/helpers/state.py deleted file mode 100644 index 12cec677b..000000000 --- a/singer_sdk/helpers/state.py +++ /dev/null @@ -1,109 +0,0 @@ -"""Helper functions for state and bookmark management.""" - - -from typing import Any, List, Optional - - -def get_stream_state_dict( - state: dict, tap_stream_id: str, partition: Optional[dict] = None -) -> dict: - """Return the stream or partition state, creating a new one if it does not exist. - - Parameters - ---------- - state : dict - the existing state dict which contains all streams. 
- tap_stream_id : str - the id of the stream - partition : Optional[dict], optional - keys which identify the partition context, by default None (not partitioned) - - Returns - ------- - dict - Returns a writeable dict at the stream or partition level. - - Raises - ------ - ValueError - Raise an error if duplicate entries are found. - - """ - if "bookmarks" not in state: - state["bookmarks"] = {} - if tap_stream_id not in state["bookmarks"]: - state["bookmarks"][tap_stream_id] = {} - if not partition: - return state["bookmarks"][tap_stream_id] - if "partitions" not in state["bookmarks"][tap_stream_id]: - state["bookmarks"][tap_stream_id]["partitions"] = [] - else: - found = [ - partition_state - for partition_state in state["bookmarks"][tap_stream_id]["partitions"] - if partition_state.get("context") == partition - ] - if len(found) > 1: - raise ValueError( - "State file contains duplicate entries for partition definition: " - f"{partition}" - ) - if found: - return found[0] - # Existing partition not found. Creating new state entry in partitions list... - new_dict = {"context": partition} - state["bookmarks"][tap_stream_id]["partitions"].append(new_dict) - return new_dict - - -def read_stream_state( - state, - tap_stream_id: str, - key=None, - default: Any = None, - *, - partition: Optional[dict] = None, -) -> Any: - """Read stream state.""" - state_dict = get_stream_state_dict(state, tap_stream_id, partition=partition) - if key: - return state_dict.get(key, default) - return state_dict or default - - -def write_stream_state( - state, tap_stream_id: str, key, val, *, partition: Optional[dict] = None -) -> None: - """Write stream state.""" - state_dict = get_stream_state_dict(state, tap_stream_id, partition=partition) - state_dict[key] = val - - -def wipe_stream_state_keys( - state: dict, - tap_stream_id: str, - wipe_keys: List[str] = None, - *, - except_keys: List[str] = None, - partition: Optional[dict] = None, -) -> None: - """Wipe bookmarks. - - You may specify a list to wipe or a list to keep, but not both. - """ - state_dict = get_stream_state_dict(state, tap_stream_id, partition=partition) - - if except_keys and wipe_keys: - raise ValueError( - "Incorrect number of arguments. " - "Expected `except_keys` or `wipe_keys` but not both." - ) - if except_keys: - wipe_keys = [ - found_key for found_key in state_dict.keys() if found_key not in except_keys - ] - wipe_keys = wipe_keys or [] - for wipe_key in wipe_keys: - if wipe_key in state: - state_dict.pop(wipe_key) - return diff --git a/singer_sdk/helpers/util.py b/singer_sdk/helpers/util.py deleted file mode 100644 index 0a9e9859f..000000000 --- a/singer_sdk/helpers/util.py +++ /dev/null @@ -1,89 +0,0 @@ -"""General helper functions, helper classes, and decorators.""" - -from decimal import Decimal -import json -from pathlib import Path, PurePath - -import pendulum -from typing import Any, Dict, List, Optional, Union, cast - - -def read_json_file(path: Union[PurePath, str]) -> Dict[str, Any]: - """Read json file, thowing an error if missing.""" - if not path: - raise RuntimeError("Could not open file. Filepath not provided.") - if Path(path).exists(): - return json.loads(Path(path).read_text()) - else: - msg = f"File at '{path}' was not found." 
- for template in [f"{path}.template"]: - if Path(template).exists(): - msg += f"\nFor more info, please see the sample template at: {template}" - raise FileExistsError(msg) - - -def utc_now(): - """Return current time in UTC.""" - return pendulum.utcnow() - - -def get_catalog_entries(catalog_dict: dict) -> List[dict]: - """Parse the catalog dict and return a list of catalog entries.""" - if "streams" not in catalog_dict: - raise ValueError("Catalog does not contain expected 'streams' collection.") - if not catalog_dict.get("streams"): - raise ValueError("Catalog does not contain any streams.") - return cast(List[dict], catalog_dict.get("streams")) - - -def get_catalog_entry_name(catalog_entry: dict) -> str: - """Return the name of the provided catalog entry dict.""" - result = catalog_entry.get("stream", catalog_entry.get("tap_stream_id", None)) - if not result: - raise ValueError( - "Stream name could not be identified due to missing or blank" - "'stream' and 'tap_stream_id' values." - ) - return result - - -def get_catalog_entry_schema(catalog_entry: dict) -> dict: - """Return the JSON Schema dict for the specified catalog entry dict.""" - result = catalog_entry.get("schema", None) - if not result: - raise ValueError( - "Stream does not have a valid schema. Please check that the catalog file " - "is properly formatted." - ) - return result - - -def get_property_schema(schema: dict, property: str) -> Optional[dict]: - """Given the provided JSON Schema, return the property by name specified. - - If property name does not exist in schema, return None. - """ - if property not in schema["properties"]: - return None - return schema["properties"][property] - - -def is_boolean_type(property_schema: dict) -> Optional[bool]: - """Return true if the JSON Schema type is a boolean or None if detection fails.""" - if "anyOf" not in property_schema and "type" not in property_schema: - return None # Could not detect data type - for property_type in property_schema.get("anyOf", [property_schema.get("type")]): - if "boolean" in property_type or property_type == "boolean": - return True - return False - - -def _float_to_decimal(value): - """Walk the given data structure and turn all instances of float into double.""" - if isinstance(value, float): - return Decimal(str(value)) - if isinstance(value, list): - return [_float_to_decimal(child) for child in value] - if isinstance(value, dict): - return {k: _float_to_decimal(v) for k, v in value.items()} - return value diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index fed272ce9..96780a065 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -10,30 +10,25 @@ from jsonschema import ValidationError, SchemaError, Draft4Validator from pathlib import PurePath -from singer_sdk.helpers.classproperty import classproperty -from singer_sdk.helpers.util import read_json_file -from singer_sdk.helpers.secrets import is_common_secret_key, SecretString -from singer_sdk.helpers.typing import extend_with_default +from singer_sdk.helpers._classproperty import classproperty +from singer_sdk.helpers._compat import metadata +from singer_sdk.helpers._util import read_json_file +from singer_sdk.helpers._secrets import is_common_secret_key, SecretString +from singer_sdk.helpers._typing import is_string_array_type +from singer_sdk.typing import extend_validator_with_defaults import click SDK_PACKAGE_NAME = "singer_sdk" -try: - from importlib import metadata -except ImportError: - # Running on pre-3.8 Python; use importlib-metadata package - 
import importlib_metadata as metadata  # type: ignore
-
-
-JSONSchemaValidator = extend_with_default(Draft4Validator)
+JSONSchemaValidator = extend_validator_with_defaults(Draft4Validator)
 
 
 class PluginBase(metaclass=abc.ABCMeta):
     """Abstract base class for taps."""
 
-    name: str = "sample-plugin-name"
+    name: str = None
 
     config_jsonschema: Optional[dict] = None
     _config: dict
@@ -90,9 +85,8 @@ def capabilities(self) -> List[str]:
     def _env_var_config(cls) -> Dict[str, Any]:
         """Return any config specified in environment variables.
 
-        Variables must match the convention "PLUGIN_NAME_setting_name",
-        with dashes converted to underscores, the plugin name converted to all
-        caps, and the setting name in same-case as specified in settings config.
+        Variables must match the convention "<PLUGIN_NAME>_<SETTING_NAME>",
+        all uppercase with dashes converted to underscores.
         """
         result: Dict[str, Any] = {}
         plugin_env_prefix = f"{cls.name.upper().replace('-', '_')}_"
@@ -103,10 +97,16 @@
             cls.logger.info(
                 f"Parsing '{config_key}' config from env variable '{env_var_name}'."
             )
-            if env_var_value[0] == "[" and env_var_value[-1] == "]":
-                result[config_key] = (
-                    env_var_value.lstrip("[").rstrip("]").split(",")
-                )
+            if is_string_array_type(
+                cls.config_jsonschema["properties"][config_key]
+            ):
+                if env_var_value[0] == "[" and env_var_value[-1] == "]":
+                    raise ValueError(
+                        "A bracketed list was detected in the environment variable "
+                        f"'{env_var_name}'. This syntax is no longer supported. "
+                        "Please remove the brackets and try again."
+                    )
+                result[config_key] = env_var_value.split(",")
             else:
                 result[config_key] = env_var_value
         return result
@@ -138,11 +138,6 @@
     def state(self) -> dict:
         """Return the state dict for the plugin."""
         raise NotImplementedError()
 
-    @property
-    def input_catalog(self) -> Optional[dict]:
-        """Return the catalog dictionary input, or None if not provided."""
-        raise NotImplementedError()
-
     # Core plugin config:
 
     @property
diff --git a/singer_sdk/samples/sample_tap_countries/countries_streams.py b/singer_sdk/samples/sample_tap_countries/countries_streams.py
index e8617cc7c..64967511d 100644
--- a/singer_sdk/samples/sample_tap_countries/countries_streams.py
+++ b/singer_sdk/samples/sample_tap_countries/countries_streams.py
@@ -28,7 +28,7 @@
 class CountriesStream(CountriesAPIStream):
     name = "countries"
     primary_keys = ["code"]
-    schema_filepath = "./singer_sdk/samples/sample_tap_countries/schemas/countries.json"
+    schema_filepath = SCHEMAS_DIR / "countries.json"
     query = """
         countries {
             code
@@ -54,6 +54,7 @@
 class ContinentsStream(CountriesAPIStream):
     """Continents stream from the Countries API."""
 
     name = "continents"
+    primary_keys = ["code"]
     schema_filepath = SCHEMAS_DIR / "continents.json"
     query = """
         continents {
diff --git a/singer_sdk/samples/sample_tap_countries/countries_tap.py b/singer_sdk/samples/sample_tap_countries/countries_tap.py
index 90b204c36..1aea17d7f 100644
--- a/singer_sdk/samples/sample_tap_countries/countries_tap.py
+++ b/singer_sdk/samples/sample_tap_countries/countries_tap.py
@@ -13,15 +13,13 @@
     CountriesStream,
     ContinentsStream,
 )
-from singer_sdk.helpers.typing import PropertiesList
-
-PLUGIN_NAME = "sample-tap-countries"
+from singer_sdk.typing import PropertiesList
 
 
 class SampleTapCountries(Tap):
     """Sample tap for Countries GraphQL API."""
 
-    name: str = PLUGIN_NAME
+    name: str = "sample-tap-countries"
     config_jsonschema = PropertiesList().to_dict()
 
     def discover_streams(self) -> List[Stream]:
diff --git 
a/singer_sdk/samples/sample_tap_gitlab/gitlab_globals.py b/singer_sdk/samples/sample_tap_gitlab/gitlab_globals.py deleted file mode 100644 index 23305e520..000000000 --- a/singer_sdk/samples/sample_tap_gitlab/gitlab_globals.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Global settings and helper functions.""" - -PLUGIN_NAME = "sample-tap-gitlab" diff --git a/singer_sdk/samples/sample_tap_gitlab/gitlab_rest_streams.py b/singer_sdk/samples/sample_tap_gitlab/gitlab_rest_streams.py index 244029e98..06afb38ee 100644 --- a/singer_sdk/samples/sample_tap_gitlab/gitlab_rest_streams.py +++ b/singer_sdk/samples/sample_tap_gitlab/gitlab_rest_streams.py @@ -1,12 +1,11 @@ """Sample tap stream test for tap-gitlab.""" -import copy import requests from pathlib import Path from typing import Any, Dict, List, cast, Optional -from singer_sdk.helpers.typing import ( +from singer_sdk.typing import ( ArrayType, DateTimeType, IntegerType, @@ -14,7 +13,7 @@ PropertiesList, StringType, ) -from singer_sdk.helpers.state import get_stream_state_dict +from singer_sdk.helpers._state import get_writeable_state_dict from singer_sdk.authenticators import SimpleAuthenticator from singer_sdk.streams.rest import RESTStream @@ -43,11 +42,10 @@ def get_url_params( self, partition: Optional[dict], next_page_token: Optional[Any] = None ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" - state = self.get_stream_or_partition_state(partition) - result = copy.deepcopy(state) - result.update({"start_date": self.config.get("start_date")}) - result["page"] = next_page_token or 1 - return result + return { + "start_date": self.get_starting_timestamp(partition), + "page": next_page_token or 1, + } def get_next_page_token( self, response: requests.Response, previous_token: Optional[Any] = None @@ -56,11 +54,6 @@ def get_next_page_token( next_page_token = response.headers.get("X-Next-Page", None) if next_page_token: self.logger.info(f"Next page token retrieved: {next_page_token}") - if next_page_token and next_page_token == previous_token: - raise RuntimeError( - f"Loop detected in pagination. " - f"Pagination token {next_page_token} is identical to previous run." 
-        )
         return next_page_token
 
 
@@ -162,10 +155,10 @@ class EpicsStream(ProjectBasedStream):
 
     # schema_filepath = SCHEMAS_DIR / "epics.json"
 
-    def post_process(self, row: dict, stream_or_partition_state: dict) -> dict:
+    def post_process(self, row: dict, partition: Optional[dict] = None) -> dict:
         """Perform post processing, including queuing up any child stream types."""
         # Ensure child state record(s) are created
-        _ = get_stream_state_dict(
+        _ = get_writeable_state_dict(
             self.tap_state,
             "epic_issues",
             partition={
@@ -173,7 +166,7 @@ def post_process(self, row: dict, stream_or_partition_state: dict) -> dict:
                 "epic_id": row["id"],
             },
         )
-        return super().post_process(row, stream_or_partition_state)
+        return super().post_process(row, partition)
 
 
 class EpicIssuesStream(GitlabStream):
diff --git a/singer_sdk/samples/sample_tap_gitlab/gitlab_tap.py b/singer_sdk/samples/sample_tap_gitlab/gitlab_tap.py
index 04087e3e6..248402220 100644
--- a/singer_sdk/samples/sample_tap_gitlab/gitlab_tap.py
+++ b/singer_sdk/samples/sample_tap_gitlab/gitlab_tap.py
@@ -1,6 +1,6 @@
 """Sample tap test for tap-gitlab."""
 
-from singer_sdk.helpers.typing import (
+from singer_sdk.typing import (
     ArrayType,
     DateTimeType,
     Property,
@@ -17,7 +17,6 @@
     EpicsStream,
     # EpicIssuesStream,  # Temporarily skipped due to access denied error
 )
-from singer_sdk.samples.sample_tap_gitlab.gitlab_globals import PLUGIN_NAME
 
 
 STREAM_TYPES = [
@@ -33,13 +32,12 @@ class SampleTapGitlab(Tap):
     """Sample tap for Gitlab."""
 
-    name: str = PLUGIN_NAME
+    name: str = "sample-tap-gitlab"
     config_jsonschema = PropertiesList(
         Property("auth_token", StringType, required=True),
         Property("project_ids", ArrayType(StringType), required=True),
         Property("group_ids", ArrayType(StringType), required=True),
         Property("start_date", DateTimeType, required=True),
-        Property("api_url", StringType),
     ).to_dict()
 
     def discover_streams(self) -> List[Stream]:
diff --git a/singer_sdk/samples/sample_tap_google_analytics/ga_tap.py b/singer_sdk/samples/sample_tap_google_analytics/ga_tap.py
index 870c20394..d3d6612bb 100644
--- a/singer_sdk/samples/sample_tap_google_analytics/ga_tap.py
+++ b/singer_sdk/samples/sample_tap_google_analytics/ga_tap.py
@@ -5,7 +5,7 @@
 from typing import List
 
 from singer_sdk.tap_base import Tap
-from singer_sdk.helpers.typing import (
+from singer_sdk.typing import (
     PropertiesList,
     Property,
     StringType,
@@ -16,7 +16,6 @@
     SampleGoogleAnalyticsStream,
 )
 
-PLUGIN_NAME = "sample-tap-google-analytics"
 REPORT_DEFS_FILE = (
     "singer_sdk/samples/sample_tap_google_analytics/resources/"
     "default_report_definitions.json"
@@ -27,7 +26,7 @@ class SampleTapGoogleAnalytics(Tap):
     """Sample tap for GoogleAnalytics."""
 
-    name: str = PLUGIN_NAME
+    name: str = "sample-tap-google-analytics"
     config_jsonschema = PropertiesList(
         Property("view_id", StringType(), required=True),
         Property("client_email", StringType(), required=True),
diff --git a/singer_sdk/samples/sample_tap_google_analytics/ga_tap_stream.py b/singer_sdk/samples/sample_tap_google_analytics/ga_tap_stream.py
index 6e768633d..5dbbdbd09 100644
--- a/singer_sdk/samples/sample_tap_google_analytics/ga_tap_stream.py
+++ b/singer_sdk/samples/sample_tap_google_analytics/ga_tap_stream.py
@@ -14,10 +14,12 @@
 
 
 class GoogleJWTAuthenticator(OAuthJWTAuthenticator):
-    """Class responsible for Google Auth via JWT and OAuth.
+    """Class responsible for Google Auth via JWT and OAuth."""
 
-    (Currently this class simply inherits from the base class.)
-    """
+    @property
+    def client_id(self) -> str:
+        """Override since Google auth uses email, not numeric client ID."""
+        return self.config["client_email"]
 
 
 class SampleGoogleAnalyticsStream(RESTStream):
diff --git a/singer_sdk/samples/sample_tap_parquet/parquet_globals.py b/singer_sdk/samples/sample_tap_parquet/parquet_globals.py
deleted file mode 100644
index 0c3bddf8f..000000000
--- a/singer_sdk/samples/sample_tap_parquet/parquet_globals.py
+++ /dev/null
@@ -1,3 +0,0 @@
-"""Global tap config file."""
-
-PLUGIN_NAME = "sample-tap-parquet"
diff --git a/singer_sdk/samples/sample_tap_parquet/parquet_tap.py b/singer_sdk/samples/sample_tap_parquet/parquet_tap.py
index 95676a810..e26a76d1f 100644
--- a/singer_sdk/samples/sample_tap_parquet/parquet_tap.py
+++ b/singer_sdk/samples/sample_tap_parquet/parquet_tap.py
@@ -6,8 +6,7 @@
 from singer_sdk.samples.sample_tap_parquet.parquet_tap_stream import (
     SampleTapParquetStream,
 )
-from singer_sdk.samples.sample_tap_parquet.parquet_globals import PLUGIN_NAME
-from singer_sdk.helpers.typing import (
+from singer_sdk.typing import (
     PropertiesList,
     Property,
     StringType,
@@ -17,7 +16,7 @@
 class SampleTapParquet(Tap):
     """Sample tap for Parquet."""
 
-    name: str = PLUGIN_NAME
+    name: str = "sample-tap-parquet"
     config_jsonschema = PropertiesList(Property("filepath", StringType)).to_dict()
 
     def discover_streams(self) -> List[Stream]:
diff --git a/singer_sdk/samples/sample_tap_parquet/parquet_tap_stream.py b/singer_sdk/samples/sample_tap_parquet/parquet_tap_stream.py
index b09d184f8..807e6e65e 100644
--- a/singer_sdk/samples/sample_tap_parquet/parquet_tap_stream.py
+++ b/singer_sdk/samples/sample_tap_parquet/parquet_tap_stream.py
@@ -6,8 +6,6 @@
 
 from singer_sdk.streams.core import Stream
 
-PLUGIN_NAME = "sample-tap-parquet"
-
 
 class SampleTapParquetStream(Stream):
     """Sample tap test for parquet."""
diff --git a/singer_sdk/samples/sample_tap_snowflake/__init__.py b/singer_sdk/samples/sample_tap_snowflake/__init__.py
deleted file mode 100644
index e93bf6a59..000000000
--- a/singer_sdk/samples/sample_tap_snowflake/__init__.py
+++ /dev/null
@@ -1,12 +0,0 @@
-"""Module test for tap-snowflake functionality."""
-
-from singer_sdk.samples.sample_tap_snowflake.snowflake_tap import SampleTapSnowflake
-from singer_sdk.samples.sample_tap_snowflake.snowflake_tap_stream import (
-    SampleTapSnowflakeStream,
-)
-
-__all__ = [
-    "SampleTapSnowflake",
-    "SampleTapSnowflakeStream",
-    "SampleSnowflakeConnection",
-]
diff --git a/singer_sdk/samples/sample_tap_snowflake/snowflake-catalog.sample.json b/singer_sdk/samples/sample_tap_snowflake/snowflake-catalog.sample.json
deleted file mode 100644
index f232dfc68..000000000
--- a/singer_sdk/samples/sample_tap_snowflake/snowflake-catalog.sample.json
+++ /dev/null
@@ -1,77 +0,0 @@
-{
-  "streams": [
-    {
-      "tap_stream_id": "ASampleTable",
-      "replication_key": "f0",
-      "replication_method": "INCREMENTAL",
-      "key_properties": [
-        "f0"
-      ],
-      "schema": {
-        "properties": {
-          "f0": {
-            "type": [
-              "string",
-              "None"
-            ]
-          },
-          "f1": {
-            "type": [
-              "string",
-              "None"
-            ]
-          },
-          "f2": {
-            "type": [
-              "string",
-              "None"
-            ]
-          }
-        }
-      },
-      "stream": "ASampleTable",
-      "metadata": [
-        {
-          "breadcrumb": [],
-          "metadata": {
-            "table-key-properties": [
-              "f0"
-            ],
-            "forced-replication-method": "INCREMENTAL",
-            "valid-replication-keys": [
-              "f0"
-            ],
-            "inclusion": "available"
-          }
-        },
-        {
-          "breadcrumb": [
-            "properties",
-            "f0"
-          ],
-          "metadata": {
-            "inclusion": "automatic"
-          }
-        },
-        {
-          "breadcrumb": [
-            "properties",
-            "f1"
-          ],
-          "metadata": {
-            "inclusion": "available"
-          }
-        },
-        {
-          "breadcrumb": [
-            "properties",
-            "f2"
-          ],
-          "metadata": {
-            "inclusion": "available"
-          }
-        }
-      ]
-    }
-  ]
-}
\ No newline at end of file
diff --git a/singer_sdk/samples/sample_tap_snowflake/snowflake-config.sample.json b/singer_sdk/samples/sample_tap_snowflake/snowflake-config.sample.json
deleted file mode 100644
index 9e26dfeeb..000000000
--- a/singer_sdk/samples/sample_tap_snowflake/snowflake-config.sample.json
+++ /dev/null
@@ -1 +0,0 @@
-{}
\ No newline at end of file
diff --git a/singer_sdk/samples/sample_tap_snowflake/snowflake_tap.py b/singer_sdk/samples/sample_tap_snowflake/snowflake_tap.py
deleted file mode 100644
index 781833658..000000000
--- a/singer_sdk/samples/sample_tap_snowflake/snowflake_tap.py
+++ /dev/null
@@ -1,48 +0,0 @@
-"""Sample tap test for tap-snowflake."""
-
-from typing import List
-
-from singer_sdk import Tap, Stream
-from singer_sdk.helpers.typing import (
-    ArrayType,
-    PropertiesList,
-    Property,
-    StringType,
-)
-from singer_sdk.samples.sample_tap_snowflake.snowflake_tap_stream import (
-    SampleTapSnowflakeStream,
-)
-
-PLUGIN_NAME = "sample-tap-snowflake"
-
-
-class SampleTapSnowflake(Tap):
-    """Sample tap for Snowflake."""
-
-    name = PLUGIN_NAME
-    config_jsonschema = PropertiesList(
-        Property("account", StringType, required=True),
-        Property("dbname", StringType, required=True),
-        Property("warehouse", StringType, required=True),
-        Property("user", StringType, required=True),
-        Property("password", StringType, required=True),
-        Property("tables", ArrayType(StringType)),
-    ).to_dict()
-
-    def discover_streams(self) -> List[Stream]:
-        """Return a list of discovered streams."""
-        return SampleTapSnowflakeStream.from_discovery(tap=self)
-
-    def load_streams(self) -> List[Stream]:
-        """Load streams, skipping discovery if `input_catalog` is provided."""
-        if not self.input_catalog:
-            return sorted(
-                self.discover_streams(),
-                key=lambda x: x.name,
-            )
-        return SampleTapSnowflakeStream.from_input_catalog(tap=self)
-
-
-# CLI Execution:
-
-cli = SampleTapSnowflake.cli
diff --git a/singer_sdk/samples/sample_tap_snowflake/snowflake_tap_stream.py b/singer_sdk/samples/sample_tap_snowflake/snowflake_tap_stream.py
deleted file mode 100644
index e8e41401c..000000000
--- a/singer_sdk/samples/sample_tap_snowflake/snowflake_tap_stream.py
+++ /dev/null
@@ -1,65 +0,0 @@
-"""Sample tap stream test for tap-snowflake."""
-
-from typing import Iterable, List, Optional, Union
-from snowflake import connector
-
-from singer_sdk.helpers.classproperty import classproperty
-from singer_sdk.streams import DatabaseStream
-
-
-DEFAULT_BATCH_SIZE = 10000
-
-
-class SampleTapSnowflakeStream(DatabaseStream):
-    """Sample tap test for snowflake."""
-
-    @classproperty
-    # @classmethod
-    def primary_key_scan_sql(cls) -> Optional[str]:
-        """Snowflake does not support primary keys. Return empty result."""
-        return None
-
-    @classmethod
-    def execute_query(cls, sql: Union[str, List[str]], config) -> Iterable[dict]:
-        """Run a query in snowflake."""
-        connection = cls.open_connection(config=config)
-        with connection.cursor(connector.DictCursor) as cur:
-            queries = []
-            if isinstance(sql, list):
-                # Run every query in one transaction if query is a list of SQL
-                queries.append("START TRANSACTION")
-                queries.extend(sql)
-            else:
-                queries = [sql]
-            for sql in queries:
-                cls.logger.info("Executing synchronous Snowflake query: %s", sql)
-                cur.execute(sql)
-                result_batch = cur.fetchmany(DEFAULT_BATCH_SIZE)
-                while len(result_batch) > 0:
-                    for result in result_batch:
-                        yield result
-                    result_batch = cur.fetchmany(DEFAULT_BATCH_SIZE)
-        cur.close()
-
-    @classmethod
-    def open_connection(cls, config) -> connector.SnowflakeConnection:
-        """Connect to snowflake database."""
-        acct = config["account"]
-        db = config["dbname"]
-        wh = config["warehouse"]
-        usr = config["user"]
-        cls.logger.info(
-            f"Attempting to connect to Snowflake '{db}' database on "
-            f"account '{acct}' instance with warehouse '{wh}' and user '{usr}'."
-        )
-        conn = connector.connect(
-            account=acct,
-            database=db,
-            warehouse=wh,
-            user=usr,
-            password=config["password"],
-            insecure_mode=config.get("insecure_mode", False)
-            # Use insecure mode to avoid "Failed to get OCSP response" warnings
-            # insecure_mode=True
-        )
-        return conn
diff --git a/singer_sdk/streams/__init__.py b/singer_sdk/streams/__init__.py
index fc45f38af..c203cef1c 100644
--- a/singer_sdk/streams/__init__.py
+++ b/singer_sdk/streams/__init__.py
@@ -2,13 +2,11 @@
 
 from singer_sdk.streams.core import Stream
 from singer_sdk.streams.rest import RESTStream
-from singer_sdk.streams.database import DatabaseStream
 from singer_sdk.streams.graphql import GraphQLStream
 
 __all__ = [
     "Stream",
     "RESTStream",
-    "DatabaseStream",
     "GraphQLStream",
 ]
diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py
index 3efee3a60..1cd2e5840 100644
--- a/singer_sdk/streams/core.py
+++ b/singer_sdk/streams/core.py
@@ -4,21 +4,9 @@
 import datetime
 import json
 import logging
-from types import MappingProxyType
-
-import pendulum
-from singer import metadata
-
-from singer_sdk.plugin_base import PluginBase as TapBaseClass
-from singer_sdk.helpers.util import get_property_schema, is_boolean_type
-from singer_sdk.helpers.state import (
-    get_stream_state_dict,
-    read_stream_state,
-    wipe_stream_state_keys,
-)
-from functools import lru_cache
 from os import PathLike
 from pathlib import Path
+from types import MappingProxyType
 from typing import (
     Dict,
     Any,
@@ -30,17 +18,23 @@
     Union,
 )
 
-try:
-    from typing import final
-except ImportError:
-    # Final not available until Python3.8
-    final = lambda f: f  # noqa: E731
+import pendulum
+from singer import metadata
+
+from singer_sdk.helpers._typing import conform_record_data_types
+from singer_sdk.helpers._state import (
+    get_writeable_state_dict,
+    wipe_stream_state_keys,
+    get_state_partitions_list,
+)
+from singer_sdk.plugin_base import PluginBase as TapBaseClass
+from singer_sdk.helpers._compat import final
 
 import singer
 from singer import RecordMessage, SchemaMessage
 from singer.catalog import Catalog
 from singer.schema import Schema
-from singer_sdk.helpers.typing import is_datetime_type
+from singer_sdk.helpers._typing import is_datetime_type
 
 
 # Replication methods
@@ -55,7 +49,6 @@ class Stream(metaclass=abc.ABCMeta):
     """Abstract base class for tap streams."""
 
     STATE_MSG_FREQUENCY = 10000  # Number of records between state messages
-    MAX_CONNECT_RETRIES = 0
     MAX_RECORDS_LIMIT: Optional[int] = None
 
     parent_stream_types: List[Any] = []  # May be used in sync sequencing
@@ -87,7 +80,7 @@ def __init__(
             self._schema = schema
         elif isinstance(schema, Schema):
             self._schema = schema.to_dict()
-        elif schema:
+        else:
             raise ValueError(
                 f"Unexpected type {type(schema).__name__} for arg 'schema'."
             )
@@ -104,19 +97,20 @@ def is_timestamp_replication_key(self) -> bool:
         type_dict = self.schema.get("properties", {}).get(self.replication_key)
         return is_datetime_type(type_dict)
 
-    def get_starting_datetime(
+    def get_starting_timestamp(
         self, partition: Optional[dict]
     ) -> Optional[datetime.datetime]:
         """Return `start_date` config, or state if using timestamp replication."""
-        result: Optional[datetime.datetime] = None
         if self.is_timestamp_replication_key:
             state = self.get_stream_or_partition_state(partition)
             replication_key = state.get("replication_key")
             if replication_key and replication_key in state:
-                result = pendulum.parse(state[replication_key])
-        if result is None and "start_date" in self.config:
-            result = pendulum.parse(self.config.get("start_date"))
-        return result
+                return pendulum.parse(state[replication_key])
+
+        if "start_date" in self.config:
+            return pendulum.parse(self.config["start_date"])
+
+        return None
 
     @property
     def schema_filepath(self) -> Optional[Path]:
@@ -242,11 +236,11 @@ def stream_state(self) -> dict:
 
         A blank state entry will be created if one doesn't already exist.
         """
-        return get_stream_state_dict(self.tap_state, self.name)
+        return get_writeable_state_dict(self.tap_state, self.name)
 
     def get_partition_state(self, partition: dict) -> dict:
         """Return a writable state dict for the given partition."""
-        return get_stream_state_dict(self.tap_state, self.name, partition=partition)
+        return get_writeable_state_dict(self.tap_state, self.name, partition=partition)
 
     # Partitions
 
@@ -254,15 +248,16 @@ def get_partition_state(self, partition: dict) -> dict:
     def partitions(self) -> Optional[List[dict]]:
         """Return a list of partition key dicts (if applicable), otherwise None.
 
+        By default, this method returns a list of any partitions which are already
+        defined in state, otherwise None.
         Developers may override this property to provide a default partitions list.
         """
-        state = read_stream_state(self.tap_state, self.name)
-        if state is None or "partitions" not in state:
-            return None
         result: List[dict] = []
-        for partition_state in state["partitions"]:
-            result.append(partition_state.get("context"))
-        return result
+        for partition_state in (
+            get_state_partitions_list(self.tap_state, self.name) or []
+        ):
+            result.append(partition_state["context"])
+        return result or None
 
     # Private bookmarking methods
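[Editor's note] A minimal sketch (not part of this changeset) of the reworked Stream API above: `partitions` may be overridden to supply a default partition list, and each row passes through the renamed `post_process(row, partition)` hook before being emitted. The stream class, its `project_ids` config key, and the placeholder rows below are hypothetical.

from typing import Any, Dict, Iterable, List, Optional

from singer_sdk.streams.core import Stream


class ProjectPartitionedStream(Stream):
    """Hypothetical stream that is synced once per configured project."""

    name = "projects"

    @property
    def partitions(self) -> Optional[List[dict]]:
        # One partition context dict per configured project ID.
        return [{"project_id": pid} for pid in self.config["project_ids"]]

    def get_records(self, partition: Optional[dict] = None) -> Iterable[Dict[str, Any]]:
        # Placeholder row; a real stream would query its source per partition.
        yield {"project_id": (partition or {}).get("project_id")}

    def post_process(self, row: dict, partition: Optional[dict] = None) -> dict:
        # Append or transform properties before the row is conformed and emitted.
        row["source"] = "projects-api"
        return row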
@@ -352,15 +347,17 @@ def _sync_records(self, partition: Optional[dict] = None) -> None:
                         "Stream prematurely aborted due to the stream's max record "
                         f"limit ({self.MAX_RECORDS_LIMIT}) being reached."
                     )
-                    if rows_sent:
-                        # Flush state messages if applicable
-                        self._write_state_message()
                     # Abort stream sync for this partition or stream
                     break
                 if rows_sent and ((rows_sent - 1) % self.STATE_MSG_FREQUENCY == 0):
                     self._write_state_message()
-                record = self._conform_record_data_types(row_dict)
+                record = conform_record_data_types(
+                    stream_name=self.name,
+                    row=row_dict,
+                    schema=self.schema,
+                    logger=self.logger,
+                )
                 record_message = RecordMessage(
                     stream=self.name,
                     record=record,
@@ -379,60 +376,6 @@ def _sync_records(self, partition: Optional[dict] = None) -> None:
         )
         self._write_state_message()
 
-    # Private validation and cleansing methods:
-
-    @lru_cache()
-    def _warn_unmapped_property(self, property_name: str):
-        self.logger.warning(
-            f"Property '{property_name}' was present in the result stream but "
-            "not found in catalog schema. Ignoring."
-        )
-
-    def _conform_record_data_types(  # noqa: C901
-        self, row: Dict[str, Any]
-    ) -> RecordMessage:
-        """Translate values in record dictionary to singer-compatible data types.
-
-        Any property names not found in the schema catalog will be removed, and a
-        warning will be logged exactly once per unmapped property name.
-        """
-        rec: Dict[str, Any] = {}
-        for property_name, elem in row.items():
-            property_schema = get_property_schema(self.schema or {}, property_name)
-            if not property_schema:
-                self._warn_unmapped_property(property_name)
-                continue
-            if isinstance(elem, datetime.datetime):
-                rec[property_name] = elem.isoformat() + "+00:00"
-            elif isinstance(elem, datetime.date):
-                rec[property_name] = elem.isoformat() + "T00:00:00+00:00"
-            elif isinstance(elem, datetime.timedelta):
-                epoch = datetime.datetime.utcfromtimestamp(0)
-                timedelta_from_epoch = epoch + elem
-                rec[property_name] = timedelta_from_epoch.isoformat() + "+00:00"
-            elif isinstance(elem, datetime.time):
-                rec[property_name] = str(elem)
-            elif isinstance(elem, bytes):
-                # for BIT value, treat 0 as False and anything else as True
-                bit_representation: bool
-                if is_boolean_type(property_schema):
-                    bit_representation = elem != b"\x00"
-                    rec[property_name] = bit_representation
-                else:
-                    rec[property_name] = elem.hex()
-            elif is_boolean_type(property_schema):
-                boolean_representation: Optional[bool]
-                if elem is None:
-                    boolean_representation = None
-                elif elem == 0:
-                    boolean_representation = False
-                else:
-                    boolean_representation = True
-                rec[property_name] = boolean_representation
-            else:
-                rec[property_name] = elem
-        return rec
-
     # Public methods ("final", not recommended to be overridden)
 
     @final
@@ -462,30 +405,24 @@ def records(self) -> Iterable[dict]:
         """Return a generator of row-type dictionary objects."""
         if self.partitions:
             for partition in self.partitions:
-                partition_state = self.get_partition_state(partition)
-                for row in self.get_records(partition_state):
-                    row = self.post_process(row, partition_state)
+                for row in self.get_records(partition):
+                    row = self.post_process(row, partition)
                     yield row
         else:
-            for row in self.get_records(self.stream_state):
-                row = self.post_process(row, self.stream_state)
+            for row in self.get_records():
+                row = self.post_process(row)
                 yield row
 
     # Abstract Methods
 
     @abc.abstractmethod
-    def get_records(self, partition: Optional[dict]) -> Iterable[Dict[str, Any]]:
+    def get_records(self, partition: Optional[dict] = None) -> Iterable[Dict[str, Any]]:
         """Abstract row generator function. Must be overridden by the child class.
 
         Each row emitted should be a dictionary of property names to their values.
         """
        pass
 
-    def post_process(self, row: dict, stream_or_partition_state: dict) -> dict:
-        """Transform raw data from HTTP GET into the expected property values."""
+    def post_process(self, row: dict, partition: Optional[dict] = None) -> dict:
+        """As needed, append or transform raw data to match expected structure."""
         return row
-
-    @property
-    def http_headers(self) -> dict:
-        """Return headers to be used by HTTP requests."""
-        return NotImplemented()
diff --git a/singer_sdk/streams/database.py b/singer_sdk/streams/database.py
deleted file mode 100644
index e6f7251f2..000000000
--- a/singer_sdk/streams/database.py
+++ /dev/null
@@ -1,334 +0,0 @@
-"""Base class for database-type streams."""
-
-import abc
-from pathlib import Path
-import backoff
-
-import singer
-from singer.schema import Schema
-from singer_sdk.helpers.util import (
-    get_catalog_entries,
-    get_catalog_entry_name,
-    get_catalog_entry_schema,
-)
-from singer_sdk.helpers.classproperty import classproperty
-from singer_sdk.exceptions import TapStreamConnectionFailure
-from typing import Any, Dict, Iterable, List, Optional, Tuple, TypeVar, Union, cast
-
-from singer_sdk.plugin_base import PluginBase as TapBaseClass
-from singer_sdk.streams.core import Stream
-
-FactoryType = TypeVar("FactoryType", bound="DatabaseStream")
-
-
-SINGER_STRING_TYPE = singer.Schema(type=["string", "null"])
-SINGER_FLOAT_TYPE = singer.Schema(type=["double", "null"])
-SINGER_INT_TYPE = singer.Schema(type=["int", "null"])
-SINGER_DECIMAL_TYPE = singer.Schema(type=["decimal", "null"])
-SINGER_DATETIME_TYPE = singer.Schema(type=["string", "null"], format="date-time")
-SINGER_BOOLEAN_TYPE = singer.Schema(type=["boolean", "null"])
-SINGER_OBJECT_TYPE = singer.Schema(type=["string", "object", "null"])
-SINGER_TYPE_LOOKUP = {
-    # NOTE: This is an ordered mapping, with earlier mappings taking precedence.
-    #       If the SQL-provided type contains the type name on the left, the mapping
-    #       will return the respective singer type.
-    "timestamp": SINGER_DATETIME_TYPE,
-    "datetime": SINGER_DATETIME_TYPE,
-    "date": SINGER_DATETIME_TYPE,
-    "int": SINGER_INT_TYPE,
-    "number": SINGER_DECIMAL_TYPE,
-    "decimal": SINGER_DECIMAL_TYPE,
-    "double": SINGER_FLOAT_TYPE,
-    "float": SINGER_FLOAT_TYPE,
-    "string": SINGER_STRING_TYPE,
-    "text": SINGER_STRING_TYPE,
-    "char": SINGER_STRING_TYPE,
-    "bool": SINGER_BOOLEAN_TYPE,
-    "variant": SINGER_STRING_TYPE,  # TODO: Support nested objects.
-}
-
-
-class DatabaseStream(Stream, metaclass=abc.ABCMeta):
-    """Abstract base class for database-type streams.
-
-    This class currently supports databases with 3-part names only. For databases which
-    use two-part names, further modification to certain methods may be necessary.
-    """
-
-    MAX_CONNECT_RETRIES = 5
-    THREE_PART_NAMES = True  # For backwards compatibility reasons
-
-    DEFAULT_QUOTE_CHAR = '"'
-    OTHER_QUOTE_CHARS = ['"', "[", "]", "`"]
-
-    def __init__(
-        self,
-        tap: TapBaseClass,
-        schema: Optional[Union[str, Path, Dict[str, Any], Schema]],
-        name: str,
-    ):
-        """Initialize the database stream.
-
-        Parameters
-        ----------
-        tap : TapBaseClass
-            reference to the parent tap
-        schema : Optional[Union[str, Path, Dict[str, Any], Schema]]
-            A schema dict or the path to a valid schema file in json.
-        name : str
-            Required. Name of the stream (generally the same as the table name).
-
-        """
-        super().__init__(tap=tap, schema=schema, name=name)
-        self.is_view: Optional[bool] = None
-        self.row_count: Optional[int] = None
-
-    def get_records(self, partition: Optional[dict]) -> Iterable[Dict[str, Any]]:
-        """Return a generator of row-type dictionary objects.
-
-        Each row emitted should be a dictionary of property names to their values.
-        """
-        if partition:
-            raise NotImplementedError(
-                f"Stream '{self.name}' does not support partitioning."
-            )
-        for row in self.execute_query(
-            sql=f"SELECT * FROM {self.fully_qualified_name}", config=self.config
-        ):
-            yield row
-
-    @property
-    def fully_qualified_name(self):
-        """Return the fully qualified name of the table name."""
-        return self.tap_stream_id
-
-    @classproperty
-    # @classmethod
-    def table_scan_sql(cls) -> str:
-        """Return a SQL statement for syncable tables.
-
-        Result fields should be in this order:
-        - db_name
-        - schema_name
-        - table_name
-        """
-        return """
-            SELECT table_catalog, table_schema, table_name
-            from information_schema.tables
-            WHERE UPPER(table_type) not like '%VIEW%'
-            """
-
-    @classproperty
-    # @classmethod
-    def view_scan_sql(cls) -> str:
-        """Return a SQL statement for syncable views.
-
-        Result fields should be in this order:
-        - db_name
-        - schema_name
-        - view_name
-        """
-        return """
-            SELECT table_catalog, table_schema, table_name
-            FROM information_schema.views
-            where upper(table_schema) <> 'INFORMATION_SCHEMA'
-            """
-
-    @classproperty
-    # @classmethod
-    def column_scan_sql(cls) -> str:
-        """Return a SQL statement that provides the column names and types.
-
-        Result fields should be in this order:
-        - db_name
-        - schema_name
-        - table_name
-        - column_name
-        - column_type
-
-        Optionally, results can be sorted to preserve cardinal ordinaling.
-        """
-        return """
-            SELECT table_catalog, table_schema, table_name, column_name, data_type
-            FROM information_schema.columns
-            ORDER BY table_catalog, table_schema, table_name, ordinal_position
-            """
-
-    @classproperty
-    # @classmethod
-    def primary_key_scan_sql(cls) -> Optional[str]:
-        """Return a SQL statement that provides the list of primary key columns.
-
-        Result fields should be in this order:
-        - db_name
-        - schema_name
-        - table_name
-        - column_name
-        """
-        return """
-            SELECT cols.table_catalog,
-                   cols.table_schema,
-                   cols.table_name,
-                   cols.column_name as key_column
-            FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS constraint
-            JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE cols
-                 on cols.constraint_name = constraint.constraint_name
-                and cols.constraint_schema = constraint.constraint_schema
-                and cols.constraint_name = constraint.constraint_name
-            WHERE constraint.constraint_type = 'PRIMARY KEY'
-            ORDER BY cols.table_schema,
-                     cols.table_name,
-                     cols.ordinal_position;
-            """
-
-    @staticmethod
-    def _create_singer_schema(columns: Dict[str, str]) -> singer.Schema:
-        """Return a singer 'Schema' object with the specified columns and data types."""
-        props: Dict[str, singer.Schema] = {}
-        for column, sql_type in columns.items():
-            props[column] = DatabaseStream.get_singer_type(sql_type)
-        return singer.Schema(type="object", properties=props)
-
-    @staticmethod
-    def _get_singer_type(sql_type: str) -> singer.Schema:
-        """Return a singer type class based on the provided sql-base data type."""
-        for matchable in SINGER_TYPE_LOOKUP.keys():
-            if matchable.lower() in sql_type.lower():
-                return SINGER_TYPE_LOOKUP[matchable]
-        raise RuntimeError(
-            f"Could not infer a Singer data type from type '{sql_type}'."
-        )
-
-    @classmethod
-    def scan_and_collate_columns(
-        cls, config
-    ) -> Dict[Tuple[str, str, str], Dict[str, str]]:
-        """Return a mapping of columns and datatypes for each table and view."""
-        columns_scan_result = cls.execute_query(config=config, sql=cls.column_scan_sql)
-        result: Dict[Tuple[str, str, str], Dict[str, str]] = {}
-        for row_dict in columns_scan_result:
-            row_dict = cast(dict, row_dict)
-            catalog, schema_name, table, column, data_type = row_dict.values()
-            if (catalog, schema_name, table) not in result:
-                result[(catalog, schema_name, table)] = {}
-            result[(catalog, schema_name, table)][column] = data_type
-        return result
-
-    @classmethod
-    def scan_primary_keys(cls, config) -> Dict[Tuple[str, str, str], List[str]]:
-        """Return a listing of primary keys for each table and view."""
-        result: Dict[Tuple[str, str, str], List[str]] = {}
-        if not cls.primary_key_scan_sql:
-            return result
-        pk_scan_result = cls.execute_query(config=config, sql=cls.primary_key_scan_sql)
-        for row_dict in pk_scan_result:
-            row_dict = cast(dict, row_dict)
-            catalog, schema_name, table, pk_column = row_dict.values()
-            if (catalog, schema_name, table) not in result:
-                result[(catalog, schema_name, table)] = []
-            result[(catalog, schema_name, table)].append(pk_column)
-        return result
-
-    @classmethod
-    def from_discovery(cls, tap: TapBaseClass) -> List[FactoryType]:
-        """Return a list of all streams (tables)."""
-        result: List[FactoryType] = []
-        config = tap.config
-        table_scan_result = cls.execute_query(config=config, sql=cls.table_scan_sql)
-        view_scan_result = cls.execute_query(config=config, sql=cls.view_scan_sql)
-        all_results = [
-            (database, schema_name, table, False)
-            for database, schema_name, table in table_scan_result.values()
-        ] + [
-            (database, schema_name, table, True)
-            for database, schema_name, table in view_scan_result.values()
-        ]
-        collated_columns = cls.scan_and_collate_columns(config=config)
-        primary_keys_lookup = cls.scan_primary_keys(config=config)
-        for database, schema_name, table, is_view in all_results:
-            name_tuple = (database, schema_name, table)
-            full_name = ".".join(name_tuple)
-            columns = collated_columns.get(name_tuple, None)
-            if not columns:
-                raise RuntimeError(f"Did not find any columns for table '{full_name}'")
-            singer_schema: singer.Schema = cls._create_singer_schema(columns)
-            primary_keys = primary_keys_lookup.get(name_tuple, None)
-            new_stream = cast(
-                FactoryType,
-                cls(tap=tap, schema=singer_schema.to_dict(), name=full_name),
-            )
-            new_stream.primary_keys = primary_keys
-            new_stream.is_view = is_view
-            result.append(new_stream)
-        return result
-
-    @classmethod
-    def from_input_catalog(cls, tap: TapBaseClass) -> List[FactoryType]:
-        """Initialize streams from an existing catalog, returning a list of streams."""
-        result: List[FactoryType] = []
-        catalog = tap.input_catalog
-        if not catalog:
-            raise ValueError(
-                "Could not initialize stream from blank or missing catalog."
-            )
-        for catalog_entry in get_catalog_entries(catalog):
-            full_name = get_catalog_entry_name(catalog_entry)
-            new_stream = cast(
-                FactoryType,
-                cls(
-                    tap=tap,
-                    name=full_name,
-                    schema=get_catalog_entry_schema(catalog_entry),
-                ),
-            )
-            result.append(new_stream)
-        return result
-
-    # @abc.abstractclassmethod
-    @classmethod
-    def execute_query(cls, sql: Union[str, List[str]], config) -> Iterable[dict]:
-        """Run a SQL query and generate a dict for each returned row."""
-        pass
-
-    @classmethod
-    def enquote(cls, identifier: str):
-        """Escape identifier to be SQL safe."""
-        for quotechar in [cls.DEFAULT_QUOTE_CHAR] + cls.OTHER_QUOTE_CHARS:
-            if quotechar in identifier:
-                raise Exception(
-                    f"Can't escape identifier `{identifier}` because it contains a "
-                    f"quote character ({quotechar})."
-                )
-        return f"{cls.DEFAULT_QUOTE_CHAR}{identifier.upper()}{cls.DEFAULT_QUOTE_CHAR}"
-
-    @classmethod
-    def dequote(cls, identifier: str):
-        """Dequote identifier from quoted version."""
-        for quotechar in [cls.DEFAULT_QUOTE_CHAR] + cls.OTHER_QUOTE_CHARS:
-            if identifier.startswith(quotechar):
-                return identifier.lstrip(quotechar).rstrip(quotechar)
-
-    @classmethod
-    def log_backoff_attempt(cls, details):
-        """Log backoff attempts used by stream retry_pattern()."""
-        cls.logger.info(
-            "Error communicating with source, "
-            f"triggering backoff: {details.get('tries')} try"
-        )
-
-    @classmethod
-    def connect_with_retries(cls) -> Any:
-        """Run open_stream_connection(), retry automatically a few times if failed."""
-        return backoff.on_exception(
-            backoff.expo,
-            exception=TapStreamConnectionFailure,
-            max_tries=cls.MAX_CONNECT_RETRIES,
-            on_backoff=cls.log_backoff_attempt,
-            factor=2,
-        )(cls.open_connection)()
-
-    # @abc.abstractclassmethod
-    @classmethod
-    def open_connection(cls, config) -> Any:
-        """Connect to the database source."""
-        pass
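[Editor's note] A minimal sketch (not part of this changeset) of the renamed `get_starting_timestamp` helper from the core.py hunk above, used to resume an incremental sync from state or from the `start_date` config. The stream class and its `updated_at` replication key are hypothetical.

from typing import Any, Dict, Iterable, Optional

from singer_sdk.streams.core import Stream


class IncrementalStream(Stream):
    """Hypothetical incremental stream keyed on an `updated_at` timestamp."""

    name = "records"
    replication_key = "updated_at"

    def get_records(self, partition: Optional[dict] = None) -> Iterable[Dict[str, Any]]:
        # Resolves to the state bookmark if one exists, else `start_date` config.
        starting_at = self.get_starting_timestamp(partition)
        # A real stream would filter its source query by `starting_at` here.
        yield {"updated_at": starting_at.isoformat() if starting_at else None}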
diff --git a/singer_sdk/streams/rest.py b/singer_sdk/streams/rest.py
index fb1dade9d..a35d4a563 100644
--- a/singer_sdk/streams/rest.py
+++ b/singer_sdk/streams/rest.py
@@ -149,9 +149,15 @@ def request_records(self, partition: Optional[dict]) -> Iterable[dict]:
             resp = self._request_with_backoff(prepared_request)
             for row in self.parse_response(resp):
                 yield row
+            previous_token = copy.deepcopy(next_page_token)
             next_page_token = self.get_next_page_token(
-                response=resp, previous_token=copy.deepcopy(next_page_token)
+                response=resp, previous_token=previous_token
             )
+            if next_page_token and next_page_token == previous_token:
+                raise RuntimeError(
+                    f"Loop detected in pagination. "
+                    f"Pagination token {next_page_token} is identical to prior token."
+                )
             # Cycle until get_next_page_token() no longer returns a value
             finished = not next_page_token
@@ -190,9 +196,8 @@ def get_records(self, partition: Optional[dict]) -> Iterable[Dict[str, Any]]:
 
         Each row emitted should be a dictionary of property names to their values.
         """
-        state = self.get_stream_or_partition_state(partition)
        for row in self.request_records(partition):
-            row = self.post_process(row, state)
+            row = self.post_process(row, partition)
             yield row
 
     def parse_response(self, response: requests.Response) -> Iterable[dict]:
diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py
index 873993a1c..cf774d765 100644
--- a/singer_sdk/tap_base.py
+++ b/singer_sdk/tap_base.py
@@ -8,8 +8,8 @@
 import click
 from singer.catalog import Catalog
 
-from singer_sdk.helpers.classproperty import classproperty
-from singer_sdk.helpers.util import read_json_file
+from singer_sdk.helpers._classproperty import classproperty
+from singer_sdk.helpers._util import read_json_file
 from singer_sdk.plugin_base import PluginBase
 from singer_sdk.streams.core import Stream
 
@@ -186,10 +186,13 @@ def cli(
             if version:
                 cls.print_version()
                 return
+
             if about:
                 cls.print_about(format)
                 return
+            cls.print_version(print_fn=cls.logger.info)
+
             parse_env_config = False
             config_files: List[PurePath] = []
             for config_path in config or []:
@@ -197,13 +200,16 @@ def cli(
                     # Allow parse from env vars:
                     parse_env_config = True
                     continue
+
                 # Validate config file paths before adding to list
                 if not Path(config_path).is_file():
                     raise FileNotFoundError(
                         f"Could not locate config file at '{config_path}'."
                         "Please check that the file exists."
                     )
+
                 config_files.append(Path(config_path))
+
             tap = cls(
                 config=config_files or None,
                 state=state,
diff --git a/singer_sdk/tests/core/test_contries_sync.py b/singer_sdk/tests/core/test_countries_sync.py
similarity index 100%
rename from singer_sdk/tests/core/test_contries_sync.py
rename to singer_sdk/tests/core/test_countries_sync.py
diff --git a/singer_sdk/tests/core/test_jsonschema_helpers.py b/singer_sdk/tests/core/test_jsonschema_helpers.py
index 302191f2e..5abb3d877 100644
--- a/singer_sdk/tests/core/test_jsonschema_helpers.py
+++ b/singer_sdk/tests/core/test_jsonschema_helpers.py
@@ -4,7 +4,7 @@
 
 from singer_sdk.tap_base import Tap
 from singer_sdk.streams.core import Stream
-from singer_sdk.helpers.typing import (
+from singer_sdk.typing import (
     ArrayType,
     ObjectType,
     StringType,
diff --git a/singer_sdk/tests/core/test_plugin_base.py b/singer_sdk/tests/core/test_plugin_base.py
index c1bb332a4..df4a5c834 100644
--- a/singer_sdk/tests/core/test_plugin_base.py
+++ b/singer_sdk/tests/core/test_plugin_base.py
@@ -1,7 +1,7 @@
 import os
 from unittest import mock
 
-from singer_sdk.helpers.typing import IntegerType, PropertiesList, Property, StringType
+from singer_sdk.typing import IntegerType, PropertiesList, Property, StringType
 from singer_sdk.plugin_base import PluginBase
diff --git a/singer_sdk/tests/core/test_streams.py b/singer_sdk/tests/core/test_streams.py
index 3750a5185..20631f700 100644
--- a/singer_sdk/tests/core/test_streams.py
+++ b/singer_sdk/tests/core/test_streams.py
@@ -2,7 +2,7 @@
 
 import pytest
 
-from singer_sdk.helpers.typing import IntegerType, PropertiesList, Property, StringType
+from singer_sdk.typing import IntegerType, PropertiesList, Property, StringType
 from singer_sdk.streams.core import (
     REPLICATION_FULL_TABLE,
     REPLICATION_INCREMENTAL,
diff --git a/singer_sdk/tests/external/test_generic_tests_external.py b/singer_sdk/tests/external/test_generic_tests_external.py
index e4f21cb53..9aa596c53 100644
--- a/singer_sdk/tests/external/test_generic_tests_external.py
+++ b/singer_sdk/tests/external/test_generic_tests_external.py
@@ -1,4 +1,4 @@
-"""Test the generic tests from `singer_sdk.helpers.testing`."""
+"""Run the generic tests from `singer_sdk.testing`."""
 
 from pathlib import Path
diff --git a/singer_sdk/helpers/typing.py b/singer_sdk/typing.py
similarity index 76%
rename from singer_sdk/helpers/typing.py
rename to singer_sdk/typing.py
index 73108d436..7979f2b91 100644
--- a/singer_sdk/helpers/typing.py
+++ b/singer_sdk/typing.py
@@ -1,4 +1,4 @@
-"""Helpers for JSONSchema typing.
+"""Classes and functions to streamline JSONSchema typing.
 
 Usage example:
 ----------
@@ -39,53 +39,15 @@
 
 """
 
-import copy
 from jsonschema import validators
 from typing import List, Tuple
 
-from singer_sdk.helpers.classproperty import classproperty
+from singer_sdk.helpers._classproperty import classproperty
+from singer_sdk.helpers._typing import append_type
 
 
-def _append_type(type_dict: dict, new_type: str) -> dict:
-    result = copy.deepcopy(type_dict)
-    if "anyOf" in result:
-        if isinstance(result["anyOf"], list) and new_type not in result["anyOf"]:
-            result["anyOf"].append(new_type)
-        elif new_type != result["anyOf"]:
-            result["anyOf"] = [result["anyOf"], new_type]
-    elif "type" in result:
-        if isinstance(result["type"], list) and new_type not in result["type"]:
-            result["type"].append(new_type)
-        elif new_type != result["type"]:
-            result["type"] = [result["type"], new_type]
-    else:
-        raise ValueError("Could not append type because type was not detected.")
-    return result
-
-
-def is_datetime_type(type_dict: dict) -> bool:
-    """Return True if JSON Schema type definition is a 'date-time' type.
-
-    Also returns True if 'date-time' is nested within an 'anyOf' type Array.
-    """
-    if not type_dict:
-        raise ValueError("Could not detect type from empty type_dict param.")
-    if "anyOf" in type_dict:
-        for type_dict in type_dict["anyOf"]:
-            if is_datetime_type(type_dict):
-                return True
-        return False
-    elif "type" in type_dict:
-        if type_dict.get("format") == "date-time":
-            return True
-        return False
-    raise ValueError(
-        f"Could not detect type of replication key using schema '{type_dict}'"
-    )
-
-
-def extend_with_default(validator_class):
-    """Fill in defaults, before validating.
+def extend_validator_with_defaults(validator_class):
+    """Fill in defaults, before validating with the provided JSON Schema Validator.
 
     See https://python-jsonschema.readthedocs.io/en/latest/faq/#why-doesn-t-my-schema-s-default-property-set-the-default-on-my-instance  # noqa
     for details.
@@ -204,7 +166,7 @@ def to_dict(self) -> dict:
         """Return a dict mapping the property name to its definition."""
         type_dict = self.type_dict
         if self.optional:
-            type_dict = _append_type(type_dict, "null")
+            type_dict = append_type(type_dict, "null")
         if self.default:
             type_dict.update({"default": self.default})
         return {self.name: type_dict}
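[Editor's note] Two minimal sketches, neither part of this changeset. First, the relocated `singer_sdk.typing` module in use: per `Property.to_dict` above, a property not marked `required=True` is treated as optional, so `append_type` adds "null" to its JSON Schema type.

from singer_sdk.typing import PropertiesList, Property, StringType

schema = PropertiesList(
    Property("id", StringType, required=True),
    Property("notes", StringType),  # Optional: "null" is appended to its type.
).to_dict()

# Hand-derived expectation: schema["properties"]["notes"]["type"] includes
# both "string" and "null", while "id" stays string-only.

Second, the pagination loop guard added to `singer_sdk/streams/rest.py` above means `get_next_page_token` must eventually return None (or a changing token), or `request_records` now raises. The stream class, endpoint, and `X-Next-Page` header below are hypothetical.

from typing import Any, Optional

import requests

from singer_sdk.streams.rest import RESTStream


class HeaderPagedStream(RESTStream):
    """Hypothetical REST stream paging via an opaque response header."""

    name = "records"
    path = "/records"

    @property
    def url_base(self) -> str:
        return "https://api.example.com"

    def get_next_page_token(
        self, response: requests.Response, previous_token: Optional[Any]
    ) -> Optional[Any]:
        # Returning None ends pagination; returning an unchanged token would
        # now trip the RuntimeError loop guard added in this changeset.
        return response.headers.get("X-Next-Page") or None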