diff --git a/frictionless/formats/__init__.py b/frictionless/formats/__init__.py index 75d4902773..64bcd7082d 100644 --- a/frictionless/formats/__init__.py +++ b/frictionless/formats/__init__.py @@ -10,6 +10,7 @@ from .ods import * from .pandas import * from .parquet import * +from .polars import * from .qsv import * from .spss import * from .sql import * diff --git a/frictionless/formats/pandas/plugin.py b/frictionless/formats/pandas/plugin.py index e933eb180e..267dd53f03 100644 --- a/frictionless/formats/pandas/plugin.py +++ b/frictionless/formats/pandas/plugin.py @@ -28,7 +28,7 @@ def create_parser(self, resource: Resource): def detect_resource(self, resource: Resource): if resource.data is not None: - if helpers.is_type(resource.data, "DataFrame"): + if helpers.is_type(resource.data, "pandas.core.frame.DataFrame"): resource.format = resource.format or "pandas" if resource.format == "pandas": if resource.data is None: diff --git a/frictionless/formats/polars/__init__.py b/frictionless/formats/polars/__init__.py new file mode 100644 index 0000000000..10a49142c7 --- /dev/null +++ b/frictionless/formats/polars/__init__.py @@ -0,0 +1,3 @@ +from .control import PolarsControl as PolarsControl +from .parser import PolarsParser as PolarsParser +from .plugin import PolarsPlugin as PolarsPlugin diff --git a/frictionless/formats/polars/__spec__/test_parser.py b/frictionless/formats/polars/__spec__/test_parser.py new file mode 100644 index 0000000000..8aef010438 --- /dev/null +++ b/frictionless/formats/polars/__spec__/test_parser.py @@ -0,0 +1,205 @@ +from datetime import date, datetime, time +from decimal import Decimal + +import isodate +import polars as pl +import pytest +import pytz +from dateutil.tz import tzutc + +from frictionless import Package +from frictionless.resources import TableResource + +# Read + + +def test_polars_parser(): + dataframe = pl.DataFrame(data={"id": [1, 2], "name": ["english", "中国人"]}) + with TableResource(data=dataframe) as resource: + assert resource.header == ["id", "name"] + assert resource.read_rows() == [ + {"id": 1, "name": "english"}, + {"id": 2, "name": "中国人"}, + ] + + +def test_polars_parser_from_dataframe_with_datetime(): + # Polars does not have the concept of an index! 
+ df = pl.read_csv("data/vix.csv", separator=";", try_parse_dates=True) # type: ignore + with TableResource(data=df) as resource: + # Assert meta + assert resource.schema.to_descriptor() == { + "fields": [ + {"name": "Date", "type": "datetime"}, + {"name": "VIXClose", "type": "number"}, + {"name": "VIXHigh", "type": "number"}, + {"name": "VIXLow", "type": "number"}, + {"name": "VIXOpen", "type": "number"}, + ] + } + rows = resource.read_rows() + # Assert rows + assert rows == [ + { + "Date": datetime(2004, 1, 5, tzinfo=pytz.utc), + "VIXClose": Decimal("17.49"), + "VIXHigh": Decimal("18.49"), + "VIXLow": Decimal("17.44"), + "VIXOpen": Decimal("18.45"), + }, + { + "Date": datetime(2004, 1, 6, tzinfo=pytz.utc), + "VIXClose": Decimal("16.73"), + "VIXHigh": Decimal("17.67"), + "VIXLow": Decimal("16.19"), + "VIXOpen": Decimal("17.66"), + }, + ] + + +# Write + + +def test_polars_parser_write(): + source = TableResource(path="data/table.csv") + target = source.write(format="polars") + assert target.data.to_dicts() == [ # type: ignore + {"id": 1, "name": "english"}, + {"id": 2, "name": "中国人"}, + ] + + +def test_polars_parser_nan_in_integer_resource_column(): + # see issue 1109 + res = TableResource( + data=[ + ["int", "number", "string"], + ["1", "2.3", "string"], + ["", "4.3", "string"], + ["3", "3.14", "string"], + ] + ) + df = res.to_polars() + assert df.dtypes == [pl.Int64, pl.Float64, pl.String] # type: ignore + + +def test_polars_parser_nan_in_integer_csv_column(): + res = TableResource(path="data/issue-1109.csv") + df = res.to_polars() + assert df.dtypes == [pl.Int64, pl.Float64, pl.String] # type: ignore + + +def test_polars_parser_write_types(): + source = Package("data/storage/types.json").get_table_resource("types") + target = source.write(format="polars") + with target: + # Assert schema + assert target.schema.to_descriptor() == { + "fields": [ + {"name": "any", "type": "string"}, # type fallback + {"name": "array", "type": "array"}, + {"name": "boolean", "type": "boolean"}, + {"name": "date", "type": "date"}, + {"name": "date_year", "type": "date"}, + {"name": "datetime", "type": "datetime"}, + {"name": "duration", "type": "duration"}, + {"name": "geojson", "type": "string"}, # type fallback + {"name": "geopoint", "type": "array"}, + {"name": "integer", "type": "integer"}, + {"name": "number", "type": "number"}, + {"name": "object", "type": "string"}, # type fallback + {"name": "string", "type": "string"}, + {"name": "time", "type": "time"}, + {"name": "year", "type": "integer"}, # type downgrade + {"name": "yearmonth", "type": "array"}, # type downgrade + ], + } + + # Assert rows + assert target.read_rows() == [ + { + "any": "中国人", + "array": ["Mike", "John"], + "boolean": True, + "date": date(2015, 1, 1), + "date_year": date(2015, 1, 1), + "datetime": datetime(2015, 1, 1, 3, 0), + "duration": isodate.parse_duration("P1Y1M"), + "geojson": "{'type': 'Point', 'coordinates': [33, 33.33]}", + "geopoint": [30.0, 70.0], + "integer": 1, + "number": 7.0, + "object": "{'chars': 560}", + "string": "english", + "time": time(3, 0), + "year": 2015, + "yearmonth": [2015, 1], + }, + ] + + +def test_polars_write_constraints(): + source = Package("data/storage/constraints.json").get_table_resource("constraints") + target = source.write(format="polars") + with target: + # Assert schema + assert target.schema.to_descriptor() == { + "fields": [ + {"name": "required", "type": "string"}, # constraint removal + {"name": "minLength", "type": "string"}, # constraint removal + {"name": "maxLength", "type": 
"string"}, # constraint removal + {"name": "pattern", "type": "string"}, # constraint removal + {"name": "enum", "type": "string"}, # constraint removal + {"name": "minimum", "type": "integer"}, # constraint removal + {"name": "maximum", "type": "integer"}, # constraint removal + ], + } + + # Assert rows + assert target.read_rows() == [ + { + "required": "passing", + "minLength": "passing", + "maxLength": "passing", + "pattern": "passing", + "enum": "passing", + "minimum": 5, + "maximum": 5, + }, + ] + + +# This Test fails because polars does not allow mixing tz aware time with naive time. +@pytest.mark.skip +def test_polars_parser_write_timezone(): + source = TableResource(path="data/timezone.csv") + target = source.write(format="polars") + with target: + # Assert schema + assert target.schema.to_descriptor() == { + "fields": [ + {"name": "datetime", "type": "datetime"}, + {"name": "time", "type": "time"}, + ], + } + # Polars disallows comparing naive tzs with explicit tzs + # https://github.com/pola-rs/polars/pull/12966#pullrequestreview-1785291945 + # Assert rows + assert target.read_rows() == [ + { + "datetime": datetime(2020, 1, 1, 15, 0, tzinfo=tzutc()), + "time": time(15), + }, + { + "datetime": datetime(2020, 1, 1, 15, 0, tzinfo=tzutc()), + "time": time(15), + }, + { + "datetime": datetime(2020, 1, 1, 12, 0, tzinfo=tzutc()), + "time": time(12), + }, + { + "datetime": datetime(2020, 1, 1, 18, 0, tzinfo=tzutc()), + "time": time(18), + }, + ] diff --git a/frictionless/formats/polars/control.py b/frictionless/formats/polars/control.py new file mode 100644 index 0000000000..51d6a761a7 --- /dev/null +++ b/frictionless/formats/polars/control.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +import attrs + +from ...dialect import Control + + +@attrs.define(kw_only=True, repr=False) +class PolarsControl(Control): + """Polars dialect representation""" + + type = "polars" diff --git a/frictionless/formats/polars/parser.py b/frictionless/formats/polars/parser.py new file mode 100644 index 0000000000..515bfc8da3 --- /dev/null +++ b/frictionless/formats/polars/parser.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +import datetime +import decimal +from typing import TYPE_CHECKING, Any, List, Optional, Tuple + +from ... 
import types +from ...platform import platform +from ...schema import Field, Schema +from ...system import Parser + +if TYPE_CHECKING: + from ...resources import TableResource + + +class PolarsParser(Parser): + """Polars parser implementation.""" + + supported_types = [ + "array", + "boolean", + "datetime", + "date", + "duration", + "integer", + "number", + "object", + "string", + "time", + ] + + # Read + + def read_cell_stream_create(self): + pl = platform.polars + assert isinstance(self.resource.data, pl.DataFrame) + dataframe = self.resource.data + + # Schema + schema = self.__read_convert_schema() + if not self.resource.schema: + self.resource.schema = schema + + # Lists + yield schema.field_names + for row in dataframe.iter_rows(): # type: ignore + cells: List[Any] = [v if v is not pl.Null else None for v in row] + yield cells + + def __read_convert_schema(self): + pl = platform.polars + dataframe = self.resource.data + schema = Schema() + + # Fields + for name, dtype in dataframe.schema.items(): # type: ignore + sample = dataframe.select(pl.first(name)).item() if len(dataframe) else None # type: ignore + type = self.__read_convert_type(dtype, sample=sample) # type: ignore + field = Field.from_descriptor({"name": name, "type": type}) + schema.add_field(field) + + # Return schema + return schema + + def __read_convert_type(self, _: Any, sample: Optional[types.ISample] = None): + pl = platform.polars + # Python types + if sample is not None: + if isinstance(sample, bool): # type: ignore + return "boolean" + elif isinstance(sample, int): # type: ignore + return "integer" + elif isinstance(sample, float): # type: ignore + return "number" + if isinstance(sample, (list, tuple, pl.Series)): # type: ignore + return "array" + elif isinstance(sample, datetime.datetime): + return "datetime" + elif isinstance(sample, datetime.date): + return "date" + elif isinstance(sample, platform.isodate.Duration): # type: ignore + return "duration" + elif isinstance(sample, dict): + return "object" + elif isinstance(sample, str): + return "string" + elif isinstance(sample, datetime.time): + return "time" + + # Default + return "string" + + # Write + + def write_row_stream(self, source: TableResource): + pl = platform.polars + data_rows: List[Tuple[Any]] = [] + fixed_types = {} + with source: + for row in source.row_stream: + data_values: List[Any] = [] + for field in source.schema.fields: + value = row[field.name] + if isinstance(value, dict): + value = str(value) # type: ignore + if isinstance(value, decimal.Decimal): + value = float(value) + if isinstance(value, datetime.datetime) and value.tzinfo: + # Polars will only allow one timezone per column + # https://docs.pola.rs/user-guide/transformations/time-series/timezones/ + value = value.astimezone(datetime.timezone.utc) + + if isinstance(value, datetime.time) and (os := value.utcoffset()): + # offset information is lost in polars we have to resolve time. 
+                        value = (
+                            datetime.datetime.combine(datetime.date.today(), value) - os
+                        ).time()
+                    if value is None and field.type in ("number", "integer"):
+                        fixed_types[field.name] = "number"
+                        value = None
+                    data_values.append(value)
+                data_rows.append(tuple(data_values))
+        # Create dtypes/columns
+        columns: List[str] = []
+        for field in source.schema.fields:
+            columns.append(field.name)
+
+        # Create/set dataframe
+        dataframe = pl.DataFrame(data_rows, orient="row", schema=columns)
+
+        for field in source.schema.fields:
+            if (
+                field.type == "integer"
+                and field.name in dataframe.columns
+                and str(dataframe.select(field.name).dtypes[0]) != "int"
+            ):
+                dataframe = dataframe.with_columns(pl.col(field.name).cast(int))
+
+        self.resource.data = dataframe
diff --git a/frictionless/formats/polars/plugin.py b/frictionless/formats/polars/plugin.py
new file mode 100644
index 0000000000..14885f4f31
--- /dev/null
+++ b/frictionless/formats/polars/plugin.py
@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Optional
+
+from ... import helpers
+from ...platform import platform
+from ...system import Plugin
+from .control import PolarsControl
+from .parser import PolarsParser
+
+if TYPE_CHECKING:
+    from ...resource import Resource
+
+
+# NOTE:
+# We need to ensure that the way we detect a polars dataframe is good enough.
+# We don't want to be importing polars and checking the type without a good reason
+
+
+class PolarsPlugin(Plugin):
+    """Plugin for Polars"""
+
+    # Hooks
+
+    def create_parser(self, resource: Resource):
+        if resource.format == "polars":
+            return PolarsParser(resource)
+
+    def detect_resource(self, resource: Resource):
+        if resource.data is not None:
+            if helpers.is_type(resource.data, "polars.dataframe.frame.DataFrame"):
+                resource.format = resource.format or "polars"
+        if resource.format == "polars":
+            if resource.data is None:
+                resource.data = platform.polars.DataFrame()
+            resource.datatype = resource.datatype or "table"
+            resource.mediatype = resource.mediatype or "application/polars"
+
+    def select_control_class(self, type: Optional[str] = None):
+        if type == "polars":
+            return PolarsControl
diff --git a/frictionless/helpers/general.py b/frictionless/helpers/general.py
index 105e992c31..0ec1fad307 100644
--- a/frictionless/helpers/general.py
+++ b/frictionless/helpers/general.py
@@ -266,7 +266,7 @@ def is_zip_descriptor(descriptor: Union[str, Dict[str, Any]]):
 
 
 def is_type(object: type, name: str):
-    return type(object).__name__ == name
+    return type(object).__module__ + "." + type(object).__name__ == name
 
 
 def parse_json_string(string: Optional[str]):
diff --git a/frictionless/platform.py b/frictionless/platform.py
index b88584d99d..c5fa31930d 100644
--- a/frictionless/platform.py
+++ b/frictionless/platform.py
@@ -325,6 +325,13 @@ def pandas(self):
 
         return pandas
 
+    @cached_property
+    @extras(name="polars")
+    def polars(self):
+        import polars  # type: ignore
+
+        return polars
+
     @cached_property
     @extras(name="pandas")
     def pandas_core_dtypes_api(self):
diff --git a/frictionless/resources/table.py b/frictionless/resources/table.py
index 5891ace0b9..c3073e5c59 100644
--- a/frictionless/resources/table.py
+++ b/frictionless/resources/table.py
@@ -655,6 +655,12 @@ def to_pandas(self, *, dialect: Optional[Dialect] = None):
         target = self.write(Resource(format="pandas", dialect=dialect))  # type: ignore
         return target.data
 
+    def to_polars(self, *, dialect: Optional[Dialect] = None):
+        """Helper to export resource as a Polars dataframe"""
+        dialect = dialect or Dialect()
+        target = self.write(Resource(format="polars", dialect=dialect))  # type: ignore
+        return target.data
+
     def to_snap(self, *, json: bool = False):
         """Create a snapshot from the resource
diff --git a/pyproject.toml b/pyproject.toml
index 82b131cbcd..c8546f25ff 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -90,6 +90,7 @@ html = ["pyquery>=1.4"]
 mysql = ["sqlalchemy>=1.4", "pymysql>=1.0"]
 ods = ["ezodf>=0.3", "lxml>=4.0"]
 pandas = ["pyarrow>=14.0", "pandas>=1.0"]
+polars = ["polars>=0.20"]
 parquet = ["fastparquet>=0.8"]
 postgresql = ["sqlalchemy>=1.4", "psycopg>=3.0", "psycopg2>=2.9"]
 spss = ["savReaderWriter>=3.0"]
@@ -107,7 +108,7 @@ path = "frictionless/settings.py"
 
 [tool.hatch.envs.default]
 python = "3.10"
 dependencies = [
-    "frictionless[aws,bigquery,ckan,csv,dev,duckdb,excel,json,github,gsheets,html,mysql,ods,pandas,parquet,postgresql,spss,sql,visidata,wkt,zenodo]",
+    "frictionless[aws,bigquery,ckan,csv,dev,duckdb,excel,json,github,gsheets,html,mysql,ods,pandas,polars,parquet,postgresql,spss,sql,visidata,wkt,zenodo]",
 ]
 
 [tool.hatch.envs.default.scripts]
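Not part of the patch, but for reviewers: a minimal round-trip sketch of the new format, mirroring the tests above (the `data/table.csv` path is one of the repo's test fixtures):

```python
import polars as pl

from frictionless.resources import TableResource

# A polars dataframe is detected by its fully qualified type name,
# so no explicit format hint is needed
resource = TableResource(data=pl.DataFrame({"id": [1, 2], "name": ["english", "中国人"]}))
print(resource.read_rows())  # [{'id': 1, 'name': 'english'}, {'id': 2, 'name': '中国人'}]

# Any table resource can now be exported with the new helper
df = TableResource(path="data/table.csv").to_polars()
print(df.dtypes)  # e.g. [Int64, String]
```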
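The `helpers.is_type` change is what lets the pandas and polars plugins coexist: both libraries name their class `DataFrame`, so comparing only `__name__` could not tell them apart. A sketch of the qualified check (assumes polars is installed; nothing here imports pandas):

```python
import polars as pl

from frictionless import helpers

df = pl.DataFrame({"id": [1]})

# type(df).__module__ + "." + type(df).__name__ -> "polars.dataframe.frame.DataFrame"
assert helpers.is_type(df, "polars.dataframe.frame.DataFrame")
assert not helpers.is_type(df, "pandas.core.frame.DataFrame")
```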
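The time-offset branch in `write_row_stream` is terse; here is the same arithmetic stated standalone (stdlib only), showing why a `15:00+03:00` value becomes the naive UTC wall time `12:00`:

```python
import datetime

# A tz-aware time such as the CSV parser can produce
value = datetime.time(15, 0, tzinfo=datetime.timezone(datetime.timedelta(hours=3)))

# Same resolution as the parser: attach an arbitrary date, subtract the
# offset, and keep the naive wall-clock result (the UTC equivalent)
offset = value.utcoffset()
resolved = (datetime.datetime.combine(datetime.date.today(), value) - offset).time()
assert resolved == datetime.time(12, 0)
```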