mix ecto.ch.schema fails for columns with type AggregateFunction #90

Open
Zarathustra2 opened this issue May 30, 2023 · 30 comments

Comments

@Zarathustra2
Contributor

Getting this error:

** (ArgumentError) failed to decode "AggregateFunction(argMin, Decimal(18, 4), DateTime)" as ClickHouse type (no function clause matching in Ch.Types.decode/3)
    lib/ch/types.ex:338: Ch.Types.decode([:type], "AggregateFunction(argMin, Decimal(18, 4), DateTime)", [])
    lib/ch/types.ex:328: Ch.Types.decode/1
    lib/mix/tasks/schema.ex:77: Mix.Tasks.Ecto.Ch.Schema.build_field/2
    lib/mix/tasks/schema.ex:46: anonymous fn/1 in Mix.Tasks.Ecto.Ch.Schema.run/1
    (elixir 1.14.4) lib/enum.ex:1658: Enum."-map/2-lists^map/1-0-"/2
    (elixir 1.14.4) lib/enum.ex:1658: Enum."-map/2-lists^map/1-0-"/2
    lib/mix/tasks/schema.ex:46: Mix.Tasks.Ecto.Ch.Schema.run/1
    (mix 1.14.4) lib/mix/task.ex:421: anonymous fn/3 in Mix.Task.run_task/4

when running mix ecto.ch.schema default.<TABLE>

The table has the columns:

`foo` AggregateFunction(argMin, Decimal(18, 4), DateTime),
`bar` AggregateFunction(argMax, Decimal(18, 4), DateTime),
@Zarathustra2
Contributor Author

How would I define a schema for those fields as well?

** (ArgumentError) failed to decode "AggregateFunction(argMin, Decimal(18, 4), DateTime)" as ClickHouse type (no function clause matching in Ch.Types.decode/3)
    (ch 0.1.11) lib/ch/types.ex:338: Ch.Types.decode([:type], "AggregateFunction(avgWeighted, Decimal(18, 4), Int64)", [])
    (ch 0.1.11) lib/ch/types.ex:328: Ch.Types.decode/1
    (ecto 3.10.1) lib/ecto/parameterized_type.ex:190: Ecto.ParameterizedType.init/2
    (ecto 3.10.1) lib/ecto/schema.ex:1916: Ecto.Schema.__field__/4
    lib/core/clickhouse/schemas/chain_aggregates.ex:34: (module)

and just using Decimal(18,4) won't work because then on insert we will get:

 Type of 'foo' must be AggregateFunction(argMin, Decimal(18, 4), DateTime), not Decimal(18, 4)

@ruslandoga
Contributor

ruslandoga commented May 31, 2023

I haven't used AggregateFunction types, so I'm not sure how to properly decode them (for some reason many adapters don't support them). However, we could add something like an :as option:

# uses `AggregateFunction(argMin, Decimal(18, 4), DateTime)` in the header
# uses `Decimal(18, 4)` for encoding and decoding
field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"

@Zarathustra2 what do you think?

It would also be able to replace a custom type in Plausible

field :is_bounce, Ch, type: "UInt8", as: :boolean

PR: #91

@ruslandoga
Contributor

ruslandoga commented May 31, 2023

or maybe it can be a :name, with the logic reversed

field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
field :is_bounce, Ch, type: :boolean, name: "UInt8"

@ukutaht

ukutaht commented May 31, 2023

It's funny, for me the intuitive naming convention would be the other way around:

field :is_bounce, Ch, type: :boolean, as: "UInt8"

I suppose it depends on which side you approach it from. Looking at it from the ClickHouse side, it's a UInt8 which is treated as a boolean in the app. But looking from the app side, it is a boolean which is serialized as a UInt8 to ClickHouse.

Since Ecto lives in the context of our application, it's more intuitive for me that the type would represent how we treat it in-app.

@Zarathustra2
Contributor Author

@ruslandoga can you also add a test for inserting the data? I am wondering if we will run into the same issue as ClickHouse/clickhouse-java#1232

I personally prefer type & name, and would maybe even go with ch_name instead of name.

@ruslandoga
Contributor

ruslandoga commented Jun 1, 2023

@Zarathustra2 what kind of test do you have in mind?

Something like this?

  test "insert AggregateFunction", %{conn: conn} do
    Ch.query!(conn, """
    CREATE TABLE test_insert_aggregate_function (
      uid Int16,
      updated SimpleAggregateFunction(max, DateTime),
      name AggregateFunction(argMax, String, DateTime)
    ) ENGINE AggregatingMergeTree ORDER BY uid
    """)

    rows = [
      [1, ~N[2020-01-02 00:00:00], "b"],
      [1, ~N[2020-01-01 00:00:00], "a"]
    ]

    assert %{num_rows: 2} =
             Ch.query!(
               conn,
               """
               INSERT INTO test_insert_aggregate_function
                 SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
                 FROM input('uid Int16, updated DateTime, name String')
                 FORMAT RowBinary\
               """,
               rows,
               types: ["Int16", "DateTime", "String"]
             )

    assert Ch.query!(conn, """
           SELECT uid, max(updated) AS updated, argMaxMerge(name)
           FROM test_insert_aggregate_function
           GROUP BY uid
           """).rows == [[1, ~N[2020-01-02 00:00:00], "b"]]
  end

But note that insert into ... select from input(...) format ... won't be supported automatically in an Ecto adapter and it doesn't test inserting AggregateFunction types.

@Zarathustra2
Contributor Author

Sorry bad phrasing, more like how would I even insert into this column with ecto:

defmodule SomeSchema do

  use Ecto.Schema

  @primary_key false
  schema "table" do
     field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
  end

end

How would I insert into this column, since I need to insert not a Decimal(18, 4) but an argMinState(Decimal(18, 4), DateTime)?

@ruslandoga
Contributor

ruslandoga commented Jun 1, 2023

How would you insert into that column with clickhouse-client?

@Zarathustra2
Contributor Author

for my example above?

@ruslandoga
Contributor

ruslandoga commented Jun 1, 2023

Yes, since I haven't used AggregateFunction myself I don't know how inserting would work, but I can help you translate a clickhouse-client query into ecto_ch :)

@Zarathustra2
Contributor Author

 create table table_test_agg (foo AggregateFunction(argMin, UInt8, DateTime)) Engine = Memory;
insert into table_test_agg (foo) Values (arrayReduce('argMinState', [1], [now()]));

on a different note: AggregateFunction & SimpleAggregateFunction are super cool, I use them a lot! :)

@ruslandoga
Contributor

ruslandoga commented Jun 1, 2023

This query won't work in "streaming" formats like native or rowbinary, since they don't do any evaluation of the rows.

You can use the same query with Repo.query or Ch.query though.

@Zarathustra2
Contributor Author

I am wondering whether the data can be encoded in a different way, because the Java issue states:

On the other hand, in order to support more AggregateFunction types, since there's no document about the data structures for read and write, we have to dig into ClickHouse code to figure it out one by one, which is going to take a while. Would be great if someone from the server team can document all the details, so that not only Java but all other clients will benefit from that.

So it seems serializing a state of an aggregate function is possible but it is not documented? Maybe we have to ping someone on the slack?

@ruslandoga
Contributor

We probably won't be supporting undocumented APIs in this driver. However, it would be possible to support them in a separate library since Ch has a very liberal API:

encoded = YourRowBinaryEncoder.encode_rows(...)
Ch.query(conn, "insert into ... format RowBinary", encoded, encode: false)
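
To illustrate what such a separate encoder might involve, here is a minimal sketch of a RowBinary encoder for a few plain types. The module name `RowBinarySketch` and its functions are hypothetical and not part of Ch. It relies only on the documented RowBinary layout (fixed-width little-endian integers, strings prefixed with a LEB128 varint length) and does not attempt AggregateFunction states, whose binary layout is undocumented as discussed above.

```elixir
defmodule RowBinarySketch do
  # Hypothetical helper module, not part of Ch or ecto_ch.
  import Bitwise

  # Encodes a list of rows into one RowBinary binary, given one
  # ClickHouse type name per column.
  def encode_rows(rows, types) do
    rows
    |> Enum.map(fn row -> encode_row(row, types) end)
    |> IO.iodata_to_binary()
  end

  defp encode_row(row, types) do
    row
    |> Enum.zip(types)
    |> Enum.map(fn {value, type} -> encode(type, value) end)
  end

  # Only three plain types are covered in this sketch.
  def encode("UInt8", n) when n in 0..255, do: <<n>>
  def encode("Int16", n) when is_integer(n), do: <<n::16-little-signed>>
  def encode("String", s) when is_binary(s), do: [varint(byte_size(s)), s]

  # LEB128: seven payload bits per byte, high bit set on all but the last byte.
  defp varint(n) when n < 128, do: <<n>>
  defp varint(n), do: [<<1::1, n::7>>, varint(n >>> 7)]
end
```

The resulting binary would then be passed as the `encoded` argument in the call above, with `encode: false` so that Ch sends it as-is.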

@ruslandoga
Contributor

It seems to me right now that we won't be needing the :as option in Ch, since it doesn't accomplish much.

field :is_bounce, Ch, type: :boolean, as: "UInt8"

can already be covered with a custom Ecto type, which is more typing but more conventional and possibly easier to understand.

And

field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"

doesn't seem to be possible, since we can't just take a decimal and insert it into an aggregate function type.

@Zarathustra2
Contributor Author

Zarathustra2 commented Jun 1, 2023

Yeah I don't think as is needed right now


Do you think creating insert queries such as

"""
 INSERT INTO test_insert_aggregate_function
                 SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
                 FROM input('uid Int16, updated DateTime, name String')
                 FORMAT RowBinary
"""

would be possible in ecto_ch with a macro or something based on a given schema?

EDIT: Actually this is kinda hard as we will never catch all edge cases (some may insert states with values from no fields of the table)

@ruslandoga
Contributor

ruslandoga commented Jun 1, 2023

I'll add tests that use the approaches shown in https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/. It seems like all of them can already be supported with the current functionality.

@Zarathustra2
Contributor Author

yeah all of them can be used via arrayReduce + input

@ruslandoga
Contributor

ruslandoga commented Jun 1, 2023

would be possible in ecto_ch with a macro or something based on a given schema?

That SQL statement can be constructed without macros. One sec.

@ruslandoga
Contributor

ruslandoga commented Jun 1, 2023

Something like this:

defmodule Schema do
  use Ecto.Schema
  
  @primary_key false
  schema "test_insert_aggregate_function" do
    field :uid, Ch, type: "Int16"
    field :updated, :naive_datetime
    field :name, :string
  end
end

table = Schema.__schema__(:source)
fields = Schema.__schema__(:fields)
types = Enum.map(fields, fn field -> Schema.__schema__(:type, field) |> Ecto.Type.type() end)
structure = Enum.zip(fields, types) |> Enum.map(fn {f, t} -> "#{f} #{t}" end) |> Enum.join(", ")

# `from`, `fragment` and `literal` come from Ecto.Query
import Ecto.Query

select = from i in fragment("input(?)", literal(^structure)),
  select: %{uid: i.uid, updated: i.updated, name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)}

{select, _no_params = []} = Repo.to_sql(:all, select)

Repo.query!(["insert into ", table, ?\s, select, " format RowBinary"], rows, types: types)
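
For reference, the statement this produces has the same shape as the hand-written `INSERT INTO ... SELECT ... FROM input(...) FORMAT RowBinary` query from earlier in the thread. The sketch below builds that string without Ecto, assuming the ClickHouse type names are written out by hand instead of being derived from the schema. `InputInsertSketch` and its functions are hypothetical names used only for illustration, and nothing here escapes its inputs, so it is only suitable for trusted table names and expressions.

```elixir
defmodule InputInsertSketch do
  # Hypothetical helper, not part of ecto_ch.

  # Builds the input() structure, e.g. "uid Int16, updated DateTime",
  # from a keyword list of column names and ClickHouse type names.
  def structure(columns) do
    Enum.map_join(columns, ", ", fn {name, type} -> "#{name} #{type}" end)
  end

  # Assembles the full statement as a binary. `select_exprs` is raw SQL
  # and is inserted verbatim (no escaping is performed).
  def insert_sql(table, columns, select_exprs) do
    IO.iodata_to_binary([
      "INSERT INTO ", table,
      " SELECT ", select_exprs,
      " FROM input('", structure(columns), "')",
      " FORMAT RowBinary"
    ])
  end
end
```

Calling `InputInsertSketch.insert_sql/3` with the table name, `[uid: "Int16", updated: "DateTime", name: "String"]` and the `arrayReduce('argMaxState', ...)` select list gives a statement of the same form as the test query above.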

@Zarathustra2
Contributor Author

Oh that is sick, thanks @ruslandoga <3


Should this maybe be added to the README of ecto_ch? I bet someone else will run into this as well :D

@ruslandoga
Contributor

ruslandoga commented Jun 1, 2023

I'll try to find a way to "automate" fragment("input(?)", ...) in the tests for https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/.


I'm thinking about doing something like

import Ecto.Adapters.ClickHouse.API, only: [input: 1]

input =
  from i in input(uid: "Int16", updated: "DateTime", name: "String"), # or input(Schema)
    select: %{
      uid: i.uid,
      updated: i.updated,
      name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)
    }

rows = [
  [uid: 1231, updated: ~N[2020-01-02 00:00:00], name: "Jane"],
  [uid: 1231, updated: ~N[2020-01-01 00:00:00], name: "John"]
]

TestRepo.insert_all("users", rows, input: input)

WIP: plausible/ecto_ch#79

@ruslandoga
Contributor

ruslandoga commented Jun 5, 2023

@Zarathustra2 I've merged plausible/ecto_ch#79 and I wonder if it solves your use-case.

def deps do
  [
-   {:ecto_ch, "~> 0.1.0"},
+   {:ecto_ch, github: "plausible/ecto_ch"}
  ]
end

I'm not releasing it yet since it might need some more work depending on your test-drive.

@Zarathustra2
Contributor Author

Oh that is sick!!! Let me test that today/tomorrow. I will ping you, but just from reading over the code it looks incredibly easy to work with! <3

@Zarathustra2
Contributor Author

likely not getting to it today but should be getting to it tomorrow, sorry about that

@Zarathustra2
Contributor Author

Currently getting ** (Ch.Error) Code: 477. DB::Exception: FORMAT must be specified for function input(). (INVALID_USAGE_OF_INPUT) (version 23.2.4.12 (official build)) let me see if I can debug this one :D (can't really share the schema due to being confidential :( )

@ruslandoga
Contributor

@Zarathustra2
Contributor Author

Works now. I had TestRepo.insert_all("users", input, input: rows) instead of TestRepo.insert_all("users", rows, input: input) lol

Super awesome! @ruslandoga <3

@ruslandoga
Contributor

ruslandoga commented Jun 8, 2023

Ah, yes. I still don't know what the API should be. I considered both of your approaches (and picked the current one since we are still inserting rows, but "via" an input). And I also thought about Repo.insert_input(table, input_query, rows, opts)

@Zarathustra2
Contributor Author

I mean, I made a silly mistake, so that is that :D

insert_input doesn't sound too bad IMO. We could then add dedicated docs for that function, which may make it easier for other users to spot on hexdocs when reading through the different functions?
