Skip to content

Commit 7fd95e3

Browse files
fix: Use FS-specific listdir in folder tap (#2785)
* fix: Use FS-specific `listdir` in folder tap * Use details * Fix tests * Use `DirFileSystem` wrapper * Make mypy happy * Update singer_sdk/testing/templates.py
1 parent 3ad4615 commit 7fd95e3

File tree

3 files changed

+59
-27
lines changed

3 files changed

+59
-27
lines changed

singer_sdk/contrib/filesystem/tap.py

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
import enum
66
import functools
77
import logging
8-
import os
98
import typing as t
109
from pathlib import Path
1110

1211
import fsspec
12+
import fsspec.implementations
13+
import fsspec.implementations.dirfs
1314

1415
import singer_sdk.typing as th
1516
from singer_sdk import Tap
@@ -138,6 +139,11 @@ def read_mode(self) -> ReadMode:
138139
"""Folder read mode."""
139140
return ReadMode(self.config["read_mode"])
140141

142+
@functools.cached_property
143+
def path(self) -> str:
144+
"""Return the path to the directory."""
145+
return self.config["path"] # type: ignore[no-any-return]
146+
141147
@functools.cached_property
142148
def fs(self) -> fsspec.AbstractFileSystem:
143149
"""Return the filesystem object.
@@ -147,13 +153,18 @@ def fs(self) -> fsspec.AbstractFileSystem:
147153
"""
148154
protocol = self.config["filesystem"]
149155
if protocol != "local" and protocol not in self.config: # pragma: no cover
150-
msg = "Filesytem configuration is missing"
156+
msg = "Filesystem configuration is missing"
151157
raise ConfigValidationError(
152158
msg,
153159
errors=[f"Missing configuration for filesystem {protocol}"],
154160
)
155-
logger.info("Instatiating filesystem inteface: '%s'", protocol)
156-
return fsspec.filesystem(protocol, **self.config.get(protocol, {}))
161+
logger.info("Instantiating filesystem interface: '%s'", protocol)
162+
163+
return fsspec.implementations.dirfs.DirFileSystem(
164+
path=self.path,
165+
target_protocol=protocol,
166+
target_options=self.config.get(protocol),
167+
)
157168

158169
def discover_streams(self) -> list:
159170
"""Return a list of discovered streams.
@@ -162,24 +173,23 @@ def discover_streams(self) -> list:
162173
ValueError: If the path does not exist or is not a directory.
163174
"""
164175
# A directory for now, but could be a glob pattern.
165-
path: str = self.config["path"]
166-
167-
if not self.fs.exists(path) or not self.fs.isdir(path): # pragma: no cover
176+
if not self.fs.exists(".") or not self.fs.isdir("."): # pragma: no cover
168177
# Raise a more specific error if the path is not a directory.
169-
msg = f"Path {path} does not exist or is not a directory"
178+
msg = f"Path {self.path} does not exist or is not a directory"
170179
raise ValueError(msg)
171180

172181
# One stream per file
173182
if self.read_mode == ReadMode.one_stream_per_file:
174183
return [
175184
self.default_stream_class(
176185
tap=self,
177-
name=file_path_to_stream_name(member),
178-
filepaths=[os.path.join(path, member)], # noqa: PTH118
186+
name=file_path_to_stream_name(member["name"]),
187+
filepaths=[member["name"]],
179188
filesystem=self.fs,
180189
)
181-
for member in os.listdir(path)
182-
if member.endswith(self.valid_extensions)
190+
for member in self.fs.listdir(".")
191+
if member["type"] == "file"
192+
and member["name"].endswith(self.valid_extensions)
183193
]
184194

185195
# Merge
@@ -188,9 +198,10 @@ def discover_streams(self) -> list:
188198
tap=self,
189199
name=self.config["stream_name"],
190200
filepaths=[
191-
os.path.join(path, member) # noqa: PTH118
192-
for member in os.listdir(path)
193-
if member.endswith(self.valid_extensions)
201+
member["name"]
202+
for member in self.fs.listdir(".")
203+
if member["type"] == "file"
204+
and member["name"].endswith(self.valid_extensions)
194205
],
195206
filesystem=self.fs,
196207
)

singer_sdk/testing/templates.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ def run(
103103
Raises:
104104
ValueError: if Test instance does not have `name` and `type` properties.
105105
"""
106-
if not self.name or not self.plugin_type:
107-
msg = "Test must have 'name' and 'type' properties."
106+
if not self.name or not self.plugin_type: # pragma: no cover
107+
msg = "Test must have 'name' and 'plugin_type' properties."
108108
raise ValueError(msg)
109109

110110
self.config = config

tests/samples/test_tap_csv.py

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
from __future__ import annotations
22

33
import datetime
4+
import typing as t
45

56
import pytest
67

78
from samples.sample_tap_csv.sample_tap_csv import SampleTapCSV
8-
from singer_sdk.testing import SuiteConfig, get_tap_test_class
9+
from singer_sdk.testing import SuiteConfig, TapTestRunner, get_tap_test_class
10+
11+
if t.TYPE_CHECKING:
12+
from samples.sample_tap_csv.client import CSVStream
913

1014
_TestCSVMerge = get_tap_test_class(
1115
tap_class=SampleTapCSV,
@@ -44,7 +48,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):
4448
"customers": {
4549
"partitions": [
4650
{
47-
"context": {"_sdc_path": "fixtures/csv/customers.csv"},
51+
"context": {"_sdc_path": "./customers.csv"},
4852
"replication_key": "_sdc_modified_at",
4953
"replication_key_value": FUTURE.isoformat(),
5054
}
@@ -53,7 +57,7 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):
5357
"employees": {
5458
"partitions": [
5559
{
56-
"context": {"_sdc_path": "fixtures/csv/employees.csv"},
60+
"context": {"_sdc_path": "./employees.csv"},
5761
"replication_key": "_sdc_modified_at",
5862
"replication_key_value": FUTURE.isoformat(),
5963
}
@@ -76,10 +80,27 @@ class TestCSVOneStreamPerFile(_TestCSVOneStreamPerFile):
7680

7781

7882
class TestCSVOneStreamPerFileIncremental(_TestCSVOneStreamPerFileIncremental):
79-
@pytest.mark.xfail(reason="No records are extracted", strict=True)
80-
def test_tap_stream_transformed_catalog_schema_matches_record(self, stream: str):
81-
super().test_tap_stream_transformed_catalog_schema_matches_record(stream)
82-
83-
@pytest.mark.xfail(reason="No records are extracted", strict=True)
84-
def test_tap_stream_returns_record(self, stream: str):
85-
super().test_tap_stream_returns_record(stream)
83+
def test_tap_stream_transformed_catalog_schema_matches_record(
84+
self,
85+
config: SuiteConfig,
86+
resource: t.Any,
87+
runner: TapTestRunner,
88+
stream: CSVStream,
89+
):
90+
with pytest.warns(UserWarning):
91+
super().test_tap_stream_transformed_catalog_schema_matches_record(
92+
config,
93+
resource,
94+
runner,
95+
stream,
96+
)
97+
98+
def test_tap_stream_returns_record(
99+
self,
100+
config: SuiteConfig,
101+
resource: t.Any,
102+
runner: TapTestRunner,
103+
stream: CSVStream,
104+
):
105+
with pytest.warns(UserWarning):
106+
super().test_tap_stream_returns_record(config, resource, runner, stream)

0 commit comments

Comments
 (0)