Support object_name in all from_ formats #14

Merged · 12 commits · Jul 15, 2024
6 changes: 4 additions & 2 deletions examples/json-csv-reader.py
@@ -103,15 +103,17 @@ def main():
     print("========================================================================")
     print("static CSV with header schema test parsing 3.5K objects")
     print("========================================================================")
-    static_csv_ds = DataChain.from_csv(uri, spec=ChatFeature)
+    static_csv_ds = DataChain.from_csv(uri, output=ChatFeature, object_name="chat")
     static_csv_ds.print_schema()
     print(static_csv_ds.to_pandas())

     uri = "gs://datachain-demo/laion-aesthetics-csv"
     print()
     print("========================================================================")
     print("dynamic CSV with header schema test parsing 3M objects")
     print("========================================================================")
-    dynamic_csv_ds = DataChain.from_csv(uri, object_name="laion", show_schema=True)
+    dynamic_csv_ds = DataChain.from_csv(uri, object_name="laion")
+    dynamic_csv_ds.print_schema()
Contributor:
ds.print_schema() and the argument show_schema=True serve different purposes.

The former just prints the entire schema for the datachain.
The latter is a check that the CSV/JSON schema was auto-inferred correctly; if not, the user will need to specify a schema manually.

Contributor (author):
We can add show_schema to all these methods, but I'd like some consensus on whether it's desirable, since it seems a bit superfluous given the schema-related methods recently added by @dmpetrov. Previously, these methods always printed the schema of the table when it was inferred, but @dmpetrov commented that it looked like a strange side effect, so I dropped it.

Contributor:
There are important differences there.

If you are trying to read a new CSV/JSON file, you likely want to check the schema but also will probably copy-paste the output if you want to modify it.

ds.print_schema() does not serve this purpose.

I can talk to Dmitry if needed as we already had this convo.

Contributor (author):
You can get back the actual schema, though, which seems more useful than a print statement since you can parse it, validate it, and modify it. For example, you could do DataChain.from_csv(uri, object_name="csv").schema["csv"] to get the pydantic-like feature class (I think having something like schema.to_dict() might also be nice).

Contributor:
> DataChain.from_csv(uri, object_name="csv").schema["csv"]

How does it help to copy-paste the schema to correct the errors?

Also, I think you don't want to parse a 500 GB CSV file just to discover your auto-schema was wrong.
Maybe we should have show_csv_schema() as a companion to show_json_schema() to address this.

Contributor (author):
> How does it help to copy-paste the schema to correct the errors?

That's why I mentioned schema.to_dict() would be nice. Then you could edit the dict and pass it back.
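For illustration, a minimal sketch of the round trip a schema.to_dict() would enable. Both to_dict() and passing a plain dict back as output= are hypothetical here, not current API:

```python
# Hypothetical sketch: dump the inferred schema to a plain dict of
# column name -> type, correct a mis-inferred column, and hand the
# edited mapping back to the reader.
inferred = {"url": str, "similarity": str}  # suppose similarity was mis-inferred as str
fixed = dict(inferred, similarity=float)    # edit the dict
# DataChain.from_csv(uri, output=fixed)     # pass it back (hypothetical usage)
print(fixed)
```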

Also I think you don't want to parse a 500Gb CSV file just to discover your auto-schema was wrong. Maybe we should have show_csv_schema() as a companion to show_json_schema() to address this.

Only a single block (10k lines) will be parsed to infer the schema (this is how arrow does it). Like all other chain operations, the processing happens lazily.
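To make the block-based inference concrete, here is a stdlib-only sketch (not DataChain or arrow code) that guesses column types from only the first few rows of a stream:

```python
import csv
import io

def infer_types(f, sample_rows=3):
    """Guess column types from at most `sample_rows` rows of a CSV stream.

    The rest of the (potentially huge) file is never parsed, mirroring
    how arrow infers a schema from a single leading block.
    """
    reader = csv.DictReader(f)
    types: dict = {}
    for row, _ in zip(reader, range(sample_rows)):
        for col, val in row.items():
            guess = str
            try:
                float(val)
                guess = float
            except ValueError:
                pass
            # once a column has conflicting guesses, fall back to str
            types[col] = guess if types.get(col, guess) is guess else str
    return types

print(infer_types(io.StringIO("name,score\nalice,1.5\nbob,2\n")))
# {'name': <class 'str'>, 'score': <class 'float'>}
```

The trade-off this illustrates: the sample may not be representative, which is exactly why a manual schema override is still needed.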

Member:
> DataChain.from_csv(uri, object_name="csv").schema["csv"]

This seems to be the right approach for examining the schema, alongside print_schema(). Otherwise, we would need to introduce similar side effects into all parsing methods, which explodes the API and is not considered good practice.

We could also consider improving print_schema(), for example by limiting its output to selected objects, like print_schema("csv", "file").

Contributor:
> Only a single block (10k lines) will be parsed to infer the schema (this is how arrow does it). Like all other chain operations, the processing happens lazily.

This is not intuitive but might be just okay.

What's not okay is not offering a copy-pasteable Feature or Pydantic-to-feature schema output.
Note that neither print_schema() nor schema() gets us there (the latter should actually be renamed model()):

>>> chain.schema["mistral_response"]
<class '__main__.MistralModel'>
>>> chain.schema["mistral_response"].schema()
{'$defs': {'CompletionResponseChoice': {'properties': {'message': {'allOf': [{'$ref': '#/$defs/MyChatMessage'}], 'default': {'role': '', 'content': ''}}}, 'title': 'CompletionResponseChoice', 'type': 'object'}, 'MyChatMessage': {'properties': {'role': {'default': '', 'title': 'Role', 'type': 'string'}, 'content': {'default': '', 'title': 'Content', 'type': 'string'}}, 'title': 'MyChatMessage', 'type': 'object'}, 'Usage': {'properties': {'prompt_tokens': {'default': 0, 'title': 'Prompt Tokens', 'type': 'integer'}, 'completion_tokens': {'default': 0, 'title': 'Completion Tokens', 'type': 'integer'}}, 'title': 'Usage', 'type': 'object'}}, 'properties': {'id': {'default': '', 'title': 'Id', 'type': 'string'}, 'choices': {'items': {'$ref': '#/$defs/CompletionResponseChoice'}, 'title': 'Choices', 'type': 'array'}, 'usage': {'allOf': [{'$ref': '#/$defs/Usage'}], 'default': {'prompt_tokens': 0, 'completion_tokens': 0}}}, 'required': ['choices'], 'title': 'MistralModel', 'type': 'object'}

>>> chain.print_schema()
 source: str
 parent: str
 name: str
 version: str
 etag: str
 size: int
 vtype: str
 location: dict
 file: File
     source: str
     parent: str
     name: str
     size: int
     version: str
     etag: str
     is_latest: bool
     last_modified: datetime
     location: Union[dict, list[dict], NoneType]
     vtype: str
 mistral_response: MistralModel
     id: str
     choices: list[CompletionResponseChoice]
         * list of: CompletionResponseChoice
             message: MyChatMessage
                 role: str
                 content: str
     usage: Usage
         prompt_tokens: int
         completion_tokens: int

Member:
> a copy-paste Feature or Pydantic-to-feature

Generating code for the user to copy-paste is uncommon because it's error-prone (it depends on the environment) and not secure.

It might be better to handle this differently, possibly with a dedicated helper function like pydantic_to_feature().

@volkfox if you think it's an important use case, please share more context. However, it's unlikely it should be part of the data readers.
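A possible shape for such a helper, sketched with stdlib dataclasses standing in for pydantic/Feature classes, since pydantic_to_feature() itself is only a suggestion at this point:

```python
from dataclasses import fields, make_dataclass

def schema_to_class(name: str, schema: dict):
    """Build a feature-like class from a column -> type mapping.

    A real pydantic_to_feature() would produce a Feature subclass;
    a dataclass stands in for it in this sketch.
    """
    return make_dataclass(name, [(col, typ) for col, typ in schema.items()])

Laion = schema_to_class("Laion", {"url": str, "similarity": float})
print([f.name for f in fields(Laion)])
# ['url', 'similarity']
```

This avoids emitting source code for the user to paste: the class is constructed programmatically, which sidesteps the environment-dependence concern above.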

     print(dynamic_csv_ds.to_pandas())
225 changes: 114 additions & 111 deletions examples/multimodal/clip_fine_tuning.ipynb

Large diffs are not rendered by default.

9 changes: 3 additions & 6 deletions examples/wds.py
@@ -18,12 +18,9 @@
     .map(stem=lambda file: file.get_file_stem(), params=["emd.file"], output=str)
 )

-meta_pq = (
-    DataChain.from_storage("gs://dvcx-datacomp-small/metadata")
-    .filter(C.name.glob("0020f*.parquet"))
-    .parse_parquet()
-    .map(stem=lambda file: file.get_file_stem(), params=["source.file"], output=str)
-)
+meta_pq = DataChain.from_parquet(
+    "gs://dvcx-datacomp-small/metadata/0020f*.parquet"
+).map(stem=lambda file: file.get_file_stem(), params=["source.file"], output=str)

 meta = meta_emd.merge(
     meta_pq, on=["stem", "emd.index"], right_on=["stem", "source.index"]
33 changes: 25 additions & 8 deletions src/datachain/lib/arrow.py
@@ -1,13 +1,15 @@
 import re
 from collections.abc import Sequence
 from typing import TYPE_CHECKING, Optional

+import pyarrow as pa
+from pyarrow.dataset import dataset
+
 from datachain.lib.file import File, IndexedFile
 from datachain.lib.udf import Generator

 if TYPE_CHECKING:
-    import pyarrow as pa
     from datachain.lib.dc import DataChain
class ArrowGenerator(Generator):
@@ -35,12 +37,29 @@ def process(self, file: File):
             index += 1


-def schema_to_output(schema: "pa.Schema"):
+def infer_schema(chain: "DataChain", **kwargs) -> pa.Schema:
+    schemas = []
+    for file in chain.iterate_one("file"):
+        ds = dataset(file.get_path(), filesystem=file.get_fs(), **kwargs)  # type: ignore[union-attr]
+        schemas.append(ds.schema)
+    return pa.unify_schemas(schemas)
+
+
+def schema_to_output(schema: pa.Schema, col_names: Optional[Sequence[str]] = None):
     """Generate UDF output schema from pyarrow schema."""
+    if col_names and (len(schema) != len(col_names)):
+        raise ValueError(
+            "Error generating output from Arrow schema - "
+            f"Schema has {len(schema)} columns but got {len(col_names)} column names."
+        )
     default_column = 0
-    output = {"source": IndexedFile}
-    for field in schema:
-        column = field.name.lower()
+    output = {}
+    for i, field in enumerate(schema):
+        if col_names:
+            column = col_names[i]
+        else:
+            column = field.name
+        column = column.lower()
         column = re.sub("[^0-9a-z_]+", "", column)
         if not column:
             column = f"c{default_column}"
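The column-name sanitization in schema_to_output can be exercised on its own; this standalone sketch reproduces that logic (normalize() is a hypothetical name, not part of the module):

```python
import re

def normalize(name: str, position: int) -> str:
    """Lowercase, strip characters outside [0-9a-z_], and fall back to a
    positional c<N> name when nothing survives -- the same normalization
    schema_to_output applies to Arrow column names."""
    column = re.sub("[^0-9a-z_]+", "", name.lower())
    return column or f"c{position}"

print(normalize("User Name!", 0))  # username
print(normalize("###", 3))         # c3
```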
@@ -50,12 +69,10 @@ def schema_to_output(schema: "pa.Schema"):
     return output


-def _arrow_type_mapper(col_type: "pa.DataType") -> type:  # noqa: PLR0911
+def _arrow_type_mapper(col_type: pa.DataType) -> type:  # noqa: PLR0911
     """Convert pyarrow types to basic types."""
     from datetime import datetime

-    import pyarrow as pa
-
     if pa.types.is_timestamp(col_type):
         return datetime
     if pa.types.is_binary(col_type):