diff --git a/singer_sdk/contrib/filesystem/tap.py b/singer_sdk/contrib/filesystem/tap.py index 58ca673d8..ff0c98819 100644 --- a/singer_sdk/contrib/filesystem/tap.py +++ b/singer_sdk/contrib/filesystem/tap.py @@ -5,11 +5,12 @@ import enum import functools import logging -import os import typing as t from pathlib import Path import fsspec +import fsspec.implementations +import fsspec.implementations.dirfs import singer_sdk.typing as th from singer_sdk import Tap @@ -138,6 +139,11 @@ def read_mode(self) -> ReadMode: """Folder read mode.""" return ReadMode(self.config["read_mode"]) + @functools.cached_property + def path(self) -> str: + """Return the path to the directory.""" + return self.config["path"] # type: ignore[no-any-return] + @functools.cached_property def fs(self) -> fsspec.AbstractFileSystem: """Return the filesystem object. @@ -147,13 +153,18 @@ def fs(self) -> fsspec.AbstractFileSystem: """ protocol = self.config["filesystem"] if protocol != "local" and protocol not in self.config: # pragma: no cover - msg = "Filesytem configuration is missing" + msg = "Filesystem configuration is missing" raise ConfigValidationError( msg, errors=[f"Missing configuration for filesystem {protocol}"], ) - logger.info("Instatiating filesystem inteface: '%s'", protocol) - return fsspec.filesystem(protocol, **self.config.get(protocol, {})) + logger.info("Instantiating filesystem interface: '%s'", protocol) + + return fsspec.implementations.dirfs.DirFileSystem( + path=self.path, + target_protocol=protocol, + target_options=self.config.get(protocol), + ) def discover_streams(self) -> list: """Return a list of discovered streams. @@ -162,11 +173,9 @@ def discover_streams(self) -> list: ValueError: If the path does not exist or is not a directory. """ # A directory for now, but could be a glob pattern. - path: str = self.config["path"] - - if not self.fs.exists(path) or not self.fs.isdir(path): # pragma: no cover + if not self.fs.exists(".") or not self.fs.isdir("."): # pragma: no cover # Raise a more specific error if the path is not a directory. - msg = f"Path {path} does not exist or is not a directory" + msg = f"Path {self.path} does not exist or is not a directory" raise ValueError(msg) # One stream per file @@ -174,12 +183,13 @@ def discover_streams(self) -> list: return [ self.default_stream_class( tap=self, - name=file_path_to_stream_name(member), - filepaths=[os.path.join(path, member)], # noqa: PTH118 + name=file_path_to_stream_name(member["name"]), + filepaths=[member["name"]], filesystem=self.fs, ) - for member in os.listdir(path) - if member.endswith(self.valid_extensions) + for member in self.fs.listdir(".") + if member["type"] == "file" + and member["name"].endswith(self.valid_extensions) ] # Merge @@ -188,9 +198,10 @@ def discover_streams(self) -> list: tap=self, name=self.config["stream_name"], filepaths=[ - os.path.join(path, member) # noqa: PTH118 - for member in os.listdir(path) - if member.endswith(self.valid_extensions) + member["name"] + for member in self.fs.listdir(".") + if member["type"] == "file" + and member["name"].endswith(self.valid_extensions) ], filesystem=self.fs, ) diff --git a/singer_sdk/testing/templates.py b/singer_sdk/testing/templates.py index 8a21f639c..716caaf0a 100644 --- a/singer_sdk/testing/templates.py +++ b/singer_sdk/testing/templates.py @@ -103,8 +103,8 @@ def run( Raises: ValueError: if Test instance does not have `name` and `type` properties. """ - if not self.name or not self.plugin_type: - msg = "Test must have 'name' and 'type' properties." + if not self.name or not self.plugin_type: # pragma: no cover + msg = "Test must have 'name' and 'plugin_type' properties." raise ValueError(msg) self.config = config diff --git a/tests/samples/test_tap_csv.py b/tests/samples/test_tap_csv.py index cb16e0e0e..217022f2a 100644 --- a/tests/samples/test_tap_csv.py +++ b/tests/samples/test_tap_csv.py @@ -1,11 +1,15 @@ from __future__ import annotations import datetime +import typing as t import pytest from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV -from singer_sdk.testing import SuiteConfig, get_tap_test_class +from singer_sdk.testing import SuiteConfig, TapTestRunner, get_tap_test_class + +if t.TYPE_CHECKING: + from samples.sample_tap_csv.client import CSVStream _TestCSVMerge = get_tap_test_class( tap_class=SampleTapCSV, @@ -44,7 +48,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile): "customers": { "partitions": [ { - "context": {"_sdc_path": "fixtures/csv/customers.csv"}, + "context": {"_sdc_path": "./customers.csv"}, "replication_key": "_sdc_modified_at", "replication_key_value": FUTURE.isoformat(), } @@ -53,7 +57,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile): "employees": { "partitions": [ { - "context": {"_sdc_path": "fixtures/csv/employees.csv"}, + "context": {"_sdc_path": "./employees.csv"}, "replication_key": "_sdc_modified_at", "replication_key_value": FUTURE.isoformat(), } @@ -76,10 +80,27 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile): class TestCSVOneStreamPerFileIncremental(_TestCSVOneStreamPerFileIncremental): - @pytest.mark.xfail(reason="No records are extracted", strict=True) - def test_tap_stream_transformed_catalog_schema_matches_record(self, stream: str): - super().test_tap_stream_transformed_catalog_schema_matches_record(stream) - - @pytest.mark.xfail(reason="No records are extracted", strict=True) - def test_tap_stream_returns_record(self, stream: str): - super().test_tap_stream_returns_record(stream) + def test_tap_stream_transformed_catalog_schema_matches_record( + self, + config: SuiteConfig, + resource: t.Any, + runner: TapTestRunner, + stream: CSVStream, + ): + with pytest.warns(UserWarning): + super().test_tap_stream_transformed_catalog_schema_matches_record( + config, + resource, + runner, + stream, + ) + + def test_tap_stream_returns_record( + self, + config: SuiteConfig, + resource: t.Any, + runner: TapTestRunner, + stream: CSVStream, + ): + with pytest.warns(UserWarning): + super().test_tap_stream_returns_record(config, resource, runner, stream)