Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Support filesystem interfaces
Browse files Browse the repository at this point in the history
edgarrmondragon committed Aug 24, 2024
1 parent 2675cec commit 003a557
Showing 6 changed files with 583 additions and 390 deletions.
56 changes: 56 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
@@ -18,7 +18,62 @@ plugins:
keys:
- col1
add_metadata_columns: false
settings_group_validation:
- [ftp.host]
- [github.org, github.repo]
settings:
- name: filesystem
kind: options
options:
- label: Local Filesystem
value: local
- label: FTP
value: ftp
- label: GitHub
value: github

# FTP settings
- name: ftp.host
label: FTP Host
description: Hostname of the FTP server
kind: string
- name: ftp.port
label: FTP Port
description: Port of the FTP server
kind: integer
- name: ftp.username
label: FTP Username
description: Username for the FTP server
kind: string
- name: ftp.password
label: FTP Password
description: Password for the FTP server
kind: string
sensitive: true
- name: ftp.encoding
label: FTP Encoding
description: Encoding for the FTP server
kind: string

# GitHub settings
- name: github.org
label: GitHub Organization
description: Organization name on GitHub
kind: string
- name: github.repo
label: GitHub Repository
description: Repository name on GitHub
kind: string
- name: github.username
label: GitHub Username
description: Username for GitHub
kind: string
- name: github.token
label: GitHub Token
description: Token for GitHub
kind: string
sensitive: true

- name: files
description: Array of objects containing keys - `entity`, `path`, `keys`, `encoding` (Optional), `delimiter` (Optional), `doublequote` (Optional), `escapechar` (Optional), `quotechar` (Optional), `skipinitialspace` (Optional), `strict` (Optional)
kind: array
@@ -30,6 +85,7 @@ plugins:
- name: add_metadata_columns
description: When True, add the metadata columns (`_sdc_source_file`, `_sdc_source_file_mtime`, `_sdc_source_lineno`) to output.
kind: boolean

loaders:
- name: target-jsonl
variant: andyh1203
840 changes: 460 additions & 380 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -11,7 +11,9 @@ license = "Apache-2.0"

[tool.poetry.dependencies]
python = ">=3.8"
fsspec = "~=2024.6.1"
singer-sdk = "~=0.39.0"
universal-pathlib = "~=0.2.3"

[tool.poetry.group.dev.dependencies]
coverage = ">=7.2"
28 changes: 19 additions & 9 deletions tap_csv/client.py
Original file line number Diff line number Diff line change
@@ -5,10 +5,11 @@
import csv
import os
import typing as t
from datetime import datetime, timezone

import fsspec
from singer_sdk import typing as th
from singer_sdk.streams import Stream
from upath import UPath

if t.TYPE_CHECKING:
from singer_sdk.helpers.types import Context
@@ -24,10 +25,11 @@ class CSVStream(Stream):
file_paths: list[str] = [] # noqa: RUF012
header: list[str] = [] # noqa: RUF012

def __init__(self, *args, **kwargs):
def __init__(self, filesystem: str, *args, options: dict[str, t.Any], **kwargs):
"""Init CSVStream."""
# cache file_config so we don't need to iterate over the config list again later
self.file_config = kwargs.pop("file_config")
self.fs = fsspec.filesystem(filesystem, **options)
super().__init__(*args, **kwargs)

def get_records(self, context: Context | None) -> t.Iterable[dict]:
@@ -38,9 +40,15 @@ def get_records(self, context: Context | None) -> t.Iterable[dict]:
require partitioning and should ignore the `context` argument.
"""
for file_path in self.get_file_paths():
file_last_modified = datetime.fromtimestamp(
os.path.getmtime(file_path), timezone.utc
)
self.logger.info("Reading file at %s", file_path)
try:
file_last_modified = self.fs.modified(file_path)
except NotImplementedError:
self.logger.warning(
"Filesystem implementation for %s does not support modified time, skipping",
self.fs.protocol,
)
file_last_modified = None

file_lineno = -1

@@ -58,8 +66,9 @@ def get_records(self, context: Context | None) -> t.Iterable[dict]:
def _get_recursive_file_paths(self, file_path: str) -> list:
file_paths = []

for dirpath, _, filenames in os.walk(file_path):
for dirpath, _, filenames in self.fs.walk(file_path):
for filename in filenames:
file_path = UPath(dirpath) / filename
file_path = os.path.join(dirpath, filename)
if self.is_valid_filename(file_path):
file_paths.append(file_path)
@@ -77,13 +86,14 @@ def get_file_paths(self) -> list:
return self.file_paths

file_path = self.file_config["path"]
if not os.path.exists(file_path):
if not self.fs.exists(file_path):
raise Exception(f"File path does not exist {file_path}")

file_paths = []
if os.path.isdir(file_path):
if self.fs.isdir(file_path):
clean_file_path = os.path.normpath(file_path) + os.sep
file_paths = self._get_recursive_file_paths(clean_file_path)

elif self.is_valid_filename(file_path):
file_paths.append(file_path)

@@ -118,7 +128,7 @@ def get_rows(self, file_path: str) -> t.Iterable[list]:
skipinitialspace=self.file_config.get("skipinitialspace", False),
strict=self.file_config.get("strict", False),
)
with open(file_path, encoding=encoding) as f:
with self.fs.open(file_path, mode="r", encoding=encoding) as f:
yield from csv.reader(f, dialect="tap_dialect")

@property
45 changes: 44 additions & 1 deletion tap_csv/tap.py
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@

from singer_sdk import Stream, Tap
from singer_sdk import typing as th # JSON schema typing helpers
from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers.capabilities import TapCapabilities

@@ -20,6 +21,37 @@ class TapCSV(Tap):
name = "tap-csv"

config_jsonschema = th.PropertiesList(
th.Property(
"filesystem",
th.StringType,
required=False,
default="local",
description="The filesystem to use for reading files.",
allowed_values=[
"local",
"ftp",
"github",
],
),
th.Property(
"ftp",
th.ObjectType(
th.Property("host", th.StringType, required=True),
th.Property("port", th.IntegerType, default=21),
th.Property("username", th.StringType),
th.Property("password", th.StringType, secret=True),
th.Property("encoding", th.StringType, default="utf-8"),
),
),
th.Property(
"github",
th.ObjectType(
th.Property("org", th.StringType, required=True),
th.Property("repo", th.StringType, required=True),
th.Property("username", th.StringType, required=False),
th.Property("token", th.StringType, required=False, secret=True),
),
),
th.Property(
"files",
th.ArrayType(
@@ -59,7 +91,7 @@ class TapCSV(Tap):

@classproperty
def capabilities(self) -> list[TapCapabilities]:
"""Get tap capabilites."""
"""Get tap capabilities."""
return [
TapCapabilities.CATALOG,
TapCapabilities.DISCOVER,
@@ -87,11 +119,22 @@ def get_file_configs(self) -> list[dict]:

def discover_streams(self) -> list[Stream]:
    """Return a list of discovered streams.

    One ``CSVStream`` is built for each entry returned by
    ``get_file_configs()``. Every stream is given the filesystem named by
    the ``filesystem`` setting together with the options stored in the
    config under that filesystem's own name.

    Returns:
        One ``CSVStream`` per file config, in the order the configs are
        returned.

    Raises:
        ConfigValidationError: If a filesystem other than ``local`` is
            selected but the config has no entry under that filesystem's
            name.
    """
    # Fall back to the schema's declared default ("local") so that a config
    # lacking the key does not fail with a bare KeyError.
    filesystem = self.config.get("filesystem", "local")

    if filesystem != "local" and filesystem not in self.config:
        error_message = f"Missing filesystem options for {filesystem}"
        raise ConfigValidationError(
            "Misconfigured filesystem",
            errors=[error_message],
        )

    # Options are looked up under the filesystem's own name (e.g. ``ftp``);
    # an empty dict is used when the config has no such entry. The lookup
    # does not depend on the file config, so do it once.
    options = self.config.get(filesystem, {})

    return [
        CSVStream(
            tap=self,
            name=file_config.get("entity"),
            file_config=file_config,
            filesystem=filesystem,
            options=options,
        )
        for file_config in self.get_file_configs()
    ]
2 changes: 2 additions & 0 deletions tap_csv/tests/test_client.py
Original file line number Diff line number Diff line change
@@ -25,6 +25,8 @@ def test_get_file_paths_recursively():
tap=TapCSV(config=SAMPLE_CONFIG, catalog={}, state={}),
name="test_recursive",
file_config=SAMPLE_CONFIG.get("files")[0],
filesystem="local",
options={},
)
assert stream.get_file_paths() == [
f"{test_data_dir}/data/subfolder1/alphabet.csv",

0 comments on commit 003a557

Please sign in to comment.