diff --git a/runtime/drivers/clickhouse/clickhouse.go b/runtime/drivers/clickhouse/clickhouse.go
index 64aa11ab0d1..c7f8076ac51 100644
--- a/runtime/drivers/clickhouse/clickhouse.go
+++ b/runtime/drivers/clickhouse/clickhouse.go
@@ -323,12 +323,18 @@ func (c *connection) AsObjectStore() (drivers.ObjectStore, bool) {
 
 // AsModelExecutor implements drivers.Handle.
 func (c *connection) AsModelExecutor(instanceID string, opts *drivers.ModelExecutorOptions) (drivers.ModelExecutor, bool) {
-	if opts.InputHandle == c && opts.OutputHandle == c {
+	if opts.OutputHandle != c {
+		return nil, false
+	}
+	if opts.InputHandle == c {
 		return &selfToSelfExecutor{c}, true
 	}
-	if opts.InputHandle.Driver() == "s3" && opts.OutputHandle == c {
+	if opts.InputHandle.Driver() == "s3" {
 		return &s3ToSelfExecutor{opts.InputHandle, c}, true
 	}
+	if opts.InputHandle.Driver() == "local_file" {
+		return &localFileToSelfExecutor{opts.InputHandle, c}, true
+	}
 	return nil, false
 }
 
diff --git a/runtime/drivers/clickhouse/model_executor_localfile_self.go b/runtime/drivers/clickhouse/model_executor_localfile_self.go
new file mode 100644
index 00000000000..71b3eee1996
--- /dev/null
+++ b/runtime/drivers/clickhouse/model_executor_localfile_self.go
@@ -0,0 +1,277 @@
+package clickhouse
+
+import (
+	"context"
+	"database/sql"
+	sqldriver "database/sql/driver"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/marcboeker/go-duckdb"
+	"github.com/mitchellh/mapstructure"
+	"github.com/rilldata/rill/runtime/drivers"
+	"github.com/rilldata/rill/runtime/pkg/fileutil"
+)
+
+// localFileToSelfExecutor ingests data from a local_file input handle into ClickHouse.
+type localFileToSelfExecutor struct {
+	fileStore drivers.Handle
+	c         *connection
+}
+
+// Compile-time interface check for the type defined in this file.
+// NOTE(review): was `&selfToSelfExecutor{}` (copy-paste); that left
+// localFileToSelfExecutor without a static interface assertion.
+var _ drivers.ModelExecutor = &localFileToSelfExecutor{}
+
+type localInputProps struct {
+	Format string `mapstructure:"format"`
+}
+
+func (p *localInputProps) Validate() error {
+	return nil
+}
+
+func (e *localFileToSelfExecutor) Concurrency(desired int) (int, bool) {
+	if desired > 1 {
+		return desired, true
+	}
+	return _defaultConcurrentInserts, true
+}
+
+func (e *localFileToSelfExecutor) Execute(ctx context.Context, opts *drivers.ModelExecuteOptions) (*drivers.ModelResult, error) {
+	from, ok := e.fileStore.AsFileStore()
+	if !ok {
+		return nil, fmt.Errorf("input handle %q does not implement filestore", opts.InputHandle.Driver())
+	}
+
+	// parse and validate input properties
+	inputProps := &localInputProps{}
+	if err := mapstructure.WeakDecode(opts.InputProperties, inputProps); err != nil {
+		return nil, fmt.Errorf("failed to parse input properties: %w", err)
+	}
+	if err := inputProps.Validate(); err != nil {
+		return nil, fmt.Errorf("invalid input properties: %w", err)
+	}
+
+	// parse and validate output properties
+	outputProps := &ModelOutputProperties{}
+	if err := mapstructure.WeakDecode(opts.OutputProperties, outputProps); err != nil {
+		return nil, fmt.Errorf("failed to parse output properties: %w", err)
+	}
+	if outputProps.Typ == "" && outputProps.Materialize == nil {
+		outputProps.Materialize = boolptr(true)
+	}
+	if err := outputProps.Validate(opts); err != nil {
+		return nil, fmt.Errorf("invalid output properties: %w", err)
+	}
+	if outputProps.Typ != "TABLE" {
+		return nil, fmt.Errorf("models with input_connector 'local_file' must be materialized as tables")
+	}
+
+	// get the local file path
+	localPaths, err := from.FilePaths(ctx, opts.InputProperties)
+	if err != nil {
+		return nil, err
+	}
+	if len(localPaths) == 0 {
+		return nil, fmt.Errorf("no files to ingest")
+	}
+
+	if inputProps.Format == "" {
+		inputProps.Format, err = fileExtToFormat(fileutil.FullExt(localPaths[0]))
+		if err != nil {
+			return nil, fmt.Errorf("failed to infer format: %w", err)
+		}
+	}
+
+	if outputProps.Columns == "" {
+		outputProps.Columns, err = e.inferColumns(ctx, opts, inputProps.Format, localPaths)
+		if err != nil {
+			return nil, fmt.Errorf("failed to infer columns: %w", err)
+		}
+	}
+
+	usedModelName := false
+	if outputProps.Table == "" {
+		outputProps.Table = opts.ModelName
+		usedModelName = true
+	}
+	tableName := outputProps.Table
+
+	// Prepare for ingesting into the staging view/table.
+	// NOTE: This intentionally drops the end table if not staging changes.
+	stagingTableName := tableName
+	if opts.Env.StageChanges {
+		stagingTableName = stagingTableNameFor(tableName)
+	}
+	if t, err := e.c.InformationSchema().Lookup(ctx, "", "", stagingTableName); err == nil {
+		_ = e.c.DropTable(ctx, stagingTableName, t.View)
+	}
+
+	// create the table
+	err = e.c.createTable(ctx, stagingTableName, "", outputProps)
+	if err != nil {
+		_ = e.c.DropTable(ctx, stagingTableName, false)
+		return nil, fmt.Errorf("failed to create model: %w", err)
+	}
+
+	// ingest the data
+	for _, path := range localPaths {
+		contents, err := os.ReadFile(path)
+		if err != nil {
+			return nil, fmt.Errorf("failed to read file %q: %w", path, err)
+		}
+
+		query := fmt.Sprintf("INSERT INTO %s FORMAT %s\n", safeSQLName(stagingTableName), inputProps.Format) + string(contents)
+		_, err = e.c.db.DB.ExecContext(ctx, query)
+		if err != nil {
+			return nil, fmt.Errorf("failed to insert data: %w", err)
+		}
+	}
+
+	// Rename the staging table to the final table name
+	if stagingTableName != tableName {
+		err = olapForceRenameTable(ctx, e.c, stagingTableName, false, tableName)
+		if err != nil {
+			return nil, fmt.Errorf("failed to rename staged model: %w", err)
+		}
+	}
+
+	// Build result props
+	resultPropsMap := map[string]interface{}{}
+	err = mapstructure.WeakDecode(&ModelResultProperties{
+		Table:         tableName,
+		View:          false,
+		UsedModelName: usedModelName,
+	}, &resultPropsMap)
+	if err != nil {
+		return nil, fmt.Errorf("failed to encode result properties: %w", err)
+	}
+
+	// Done
+	return &drivers.ModelResult{
+		Connector:  opts.OutputConnector,
+		Properties: resultPropsMap,
+		Table:      tableName,
+	}, nil
+}
+
+// inferColumns uses an ephemeral DuckDB instance to DESCRIBE the input files
+// and derive a ClickHouse column clause like "(a String, b BIGINT)".
+func (e *localFileToSelfExecutor) inferColumns(ctx context.Context, opts *drivers.ModelExecuteOptions, format string, localPaths []string) (string, error) {
+	tempDir, err := os.MkdirTemp(opts.TempDir, "duckdb")
+	if err != nil {
+		return "", fmt.Errorf("failed to create temp dir: %w", err)
+	}
+	defer os.RemoveAll(tempDir)
+	connector, err := duckdb.NewConnector(filepath.Join(tempDir, "temp.db?threads=1&max_memory=256MB"), func(execer sqldriver.ExecerContext) error {
+		_, err = execer.ExecContext(ctx, "SET autoinstall_known_extensions=1; SET autoload_known_extensions=1;", nil)
+		return err
+	})
+	if err != nil {
+		return "", fmt.Errorf("failed to create duckdb connector: %w", err)
+	}
+	defer connector.Close()
+
+	db := sql.OpenDB(connector)
+	defer db.Close()
+
+	src, err := sourceReader(localPaths, format)
+	if err != nil {
+		return "", fmt.Errorf("failed to create source reader: %w", err)
+	}
+
+	// identify the columns and types
+	rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT column_name, column_type FROM (DESCRIBE SELECT * FROM %s)", src))
+	if err != nil {
+		return "", fmt.Errorf("failed to describe table: %w", err)
+	}
+	defer rows.Close()
+
+	var columns strings.Builder
+	columns.WriteString("(")
+	var name, typ string
+	for rows.Next() {
+		if err := rows.Scan(&name, &typ); err != nil {
+			return "", fmt.Errorf("failed to scan row: %w", err)
+		}
+		if columns.Len() > 1 {
+			columns.WriteString(", ")
+		}
+		columns.WriteString(fmt.Sprintf("%s %s", safeSQLName(name), typeFromDuckDBType(typ)))
+	}
+	if rows.Err() != nil {
+		return "", fmt.Errorf("failed to iterate rows: %w", rows.Err())
+	}
+	columns.WriteString(")")
+	return columns.String(), nil
+}
+
+func sourceReader(paths []string, format string) (string, error) {
+	// Generate a "read" statement
+	if containsAny(format, []string{"CSV", "TabSeparated"}) {
+		// CSV reader
+		return fmt.Sprintf("read_csv_auto(%s)", convertToStatementParamsStr(paths)), nil
+	} else if strings.Contains(format, "Parquet") {
+		// Parquet reader
+		return fmt.Sprintf("read_parquet(%s)", convertToStatementParamsStr(paths)), nil
+	} else if containsAny(format, []string{"JSON", "JSONEachRow"}) {
+		// JSON reader
+		return fmt.Sprintf("read_json_auto(%s)", convertToStatementParamsStr(paths)), nil
+	}
+	return "", fmt.Errorf("file type not supported: %s", format)
+}
+
+func containsAny(s string, targets []string) bool {
+	for _, target := range targets {
+		if strings.Contains(s, target) {
+			return true
+		}
+	}
+	return false
+}
+
+func convertToStatementParamsStr(paths []string) string {
+	return fmt.Sprintf("['%s']", strings.Join(paths, "','"))
+}
+
+func typeFromDuckDBType(typ string) string {
+	switch strings.ToLower(typ) {
+	case "boolean":
+		return "Bool"
+	case "bigint":
+		return "BIGINT"
+	case "double":
+		return "Float64"
+	case "time":
+		return "String"
+	case "date":
+		return "Date"
+	case "varchar":
+		return "String"
+	case "timestamp":
+		return "DateTime"
+	default:
+		return "String"
+	}
+}
+
+func fileExtToFormat(ext string) (string, error) {
+	switch ext {
+	case ".csv":
+		return "CSV", nil
+	case ".tsv":
+		return "TabSeparated", nil
+	case ".txt":
+		return "CSV", nil
+	case ".parquet":
+		return "Parquet", nil
+	case ".json":
+		return "JSON", nil
+	case ".ndjson":
+		return "JSONEachRow", nil
+	default:
+		return "", fmt.Errorf("unsupported file extension: %s, must be one of ['.csv', '.tsv', '.txt', '.parquet', '.json', '.ndjson'] for models that ingest from 'local_file' into 'clickhouse'", ext)
+	}
+}
+
+func boolptr(b bool) *bool {
+	return &b
+}
diff --git a/runtime/drivers/clickhouse/olap.go b/runtime/drivers/clickhouse/olap.go
index 50324f8dd76..e0bfe1162a9 100644
--- a/runtime/drivers/clickhouse/olap.go
+++ b/runtime/drivers/clickhouse/olap.go
@@ -229,63 +229,11 @@ func (c *connection) CreateTableAsSelect(ctx context.Context, name string, view
 			Priority: 100,
 		})
 	}
-
-	var create strings.Builder
-	create.WriteString("CREATE OR REPLACE TABLE ")
-	if c.config.Cluster != "" {
-		// need to create a local table on the cluster first
-		fmt.Fprintf(&create, "%s %s", safelocalTableName(name), onClusterClause)
-	} else {
-		create.WriteString(safeSQLName(name))
-	}
-
-	if outputProps.Columns == "" {
-		// infer columns
-		v := tempName("view")
-		err := c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("CREATE OR REPLACE VIEW %s %s AS %s", v, onClusterClause, sql)})
-		if err != nil {
-			return err
-		}
-		defer func() {
-			ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
-			defer cancel()
-			_ = c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("DROP VIEW %s %s", v, onClusterClause)})
-		}()
-		// create table with same schema as view
-		fmt.Fprintf(&create, " AS %s ", v)
-	} else {
-		fmt.Fprintf(&create, " %s ", outputProps.Columns)
-	}
-	create.WriteString(outputProps.tblConfig())
-
-	// create table
 	// on replicated databases `create table t as select * from ...` is prohibited
 	// so we need to create a table first and then insert data into it
-	err := c.Exec(ctx, &drivers.Statement{Query: create.String(), Priority: 100})
-	if err != nil {
+	if err := c.createTable(ctx, name, sql, outputProps); err != nil {
 		return err
 	}
-
-	if c.config.Cluster != "" {
-		// create the distributed table
-		var distributed strings.Builder
-		fmt.Fprintf(&distributed, "CREATE OR REPLACE TABLE %s %s AS %s", safeSQLName(name), onClusterClause, safelocalTableName(name))
-		fmt.Fprintf(&distributed, " ENGINE = Distributed(%s, currentDatabase(), %s", safeSQLName(c.config.Cluster), safelocalTableName(name))
-		if outputProps.DistributedShardingKey != "" {
-			fmt.Fprintf(&distributed, ", %s", outputProps.DistributedShardingKey)
-		} else {
-			fmt.Fprintf(&distributed, ", rand()")
-		}
-		distributed.WriteString(")")
-		if outputProps.DistributedSettings != "" {
-			fmt.Fprintf(&distributed, " SETTINGS %s", outputProps.DistributedSettings)
-		}
-		err = c.Exec(ctx, &drivers.Statement{Query: distributed.String(), Priority: 100})
-		if err != nil {
-			return err
-		}
-	}
-	// insert into table
 	return c.Exec(ctx, &drivers.Statement{
 		Query:    fmt.Sprintf("INSERT INTO %s %s", safeSQLName(name), sql),
@@ -348,6 +296,10 @@ func (c *connection) DropTable(ctx context.Context, name string, _ bool) error {
 	}
 }
 
+func (c *connection) MayBeScaledToZero(ctx context.Context) bool {
+	return c.config.CanScaleToZero
+}
+
 // RenameTable implements drivers.OLAPStore.
 func (c *connection) RenameTable(ctx context.Context, oldName, newName string, view bool) error {
 	typ, onCluster, err := informationSchema{c: c}.entityType(ctx, "", oldName)
@@ -502,8 +454,65 @@ func (c *connection) renameTable(ctx context.Context, oldName, newName, onCluste
 	return c.DropTable(context.Background(), oldName, false)
 }
 
-func (c *connection) MayBeScaledToZero(ctx context.Context) bool {
-	return c.config.CanScaleToZero
+func (c *connection) createTable(ctx context.Context, name, sql string, outputProps *ModelOutputProperties) error {
+	var onClusterClause string
+	if c.config.Cluster != "" {
+		onClusterClause = "ON CLUSTER " + safeSQLName(c.config.Cluster)
+	}
+	var create strings.Builder
+	create.WriteString("CREATE OR REPLACE TABLE ")
+	if c.config.Cluster != "" {
+		// need to create a local table on the cluster first
+		fmt.Fprintf(&create, "%s %s", safelocalTableName(name), onClusterClause)
+	} else {
+		create.WriteString(safeSQLName(name))
+	}
+
+	if outputProps.Columns == "" {
+		if sql == "" {
+			return fmt.Errorf("clickhouse: no columns specified for table %q", name)
+		}
+		// infer columns
+		v := tempName("view")
+		err := c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("CREATE OR REPLACE VIEW %s %s AS %s", v, onClusterClause, sql)})
+		if err != nil {
+			return err
+		}
+		defer func() {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+			defer cancel()
+			_ = c.Exec(ctx, &drivers.Statement{Query: fmt.Sprintf("DROP VIEW %s %s", v, onClusterClause)})
+		}()
+		// create table with same schema as view
+		fmt.Fprintf(&create, " AS %s ", v)
+	} else {
+		fmt.Fprintf(&create, " %s ", outputProps.Columns)
+	}
+	create.WriteString(outputProps.tblConfig())
+
+	// create table
+	err := c.Exec(ctx, &drivers.Statement{Query: create.String(), Priority: 100})
+	if err != nil {
+		return err
+	}
+
+	if c.config.Cluster == "" {
+		return nil
+	}
+	// create the distributed table
+	var distributed strings.Builder
+	fmt.Fprintf(&distributed, "CREATE OR REPLACE TABLE %s %s AS %s", safeSQLName(name), onClusterClause, safelocalTableName(name))
+	fmt.Fprintf(&distributed, " ENGINE = Distributed(%s, currentDatabase(), %s", safeSQLName(c.config.Cluster), safelocalTableName(name))
+	if outputProps.DistributedShardingKey != "" {
+		fmt.Fprintf(&distributed, ", %s", outputProps.DistributedShardingKey)
+	} else {
+		fmt.Fprintf(&distributed, ", rand()")
+	}
+	distributed.WriteString(")")
+	if outputProps.DistributedSettings != "" {
+		fmt.Fprintf(&distributed, " SETTINGS %s", outputProps.DistributedSettings)
+	}
+	return c.Exec(ctx, &drivers.Statement{Query: distributed.String(), Priority: 100})
 }
 
 // acquireMetaConn gets a connection from the pool for "meta" queries like information schema (i.e. fast queries).
diff --git a/runtime/drivers/models.go b/runtime/drivers/models.go
index 6afb4051340..e9a73234627 100644
--- a/runtime/drivers/models.go
+++ b/runtime/drivers/models.go
@@ -72,6 +72,8 @@ type ModelExecuteOptions struct {
 	// PreviousResult is the result of a previous execution.
 	// For concurrent split execution, it may not be the most recent previous result.
 	PreviousResult *ModelResult
+	// TempDir is a temporary directory for storing intermediate data.
+	TempDir string
 }
 
 // ModelEnv contains contextual info about the model's instance.
diff --git a/runtime/metricsview/executor_export.go b/runtime/metricsview/executor_export.go
index b27d6aec34b..e462e641aac 100644
--- a/runtime/metricsview/executor_export.go
+++ b/runtime/metricsview/executor_export.go
@@ -74,6 +74,7 @@ func (e *Executor) executeExport(ctx context.Context, format drivers.FileFormat,
 		InputProperties:  inputProps,
 		OutputProperties: outputProps,
 		Priority:         e.priority,
+		TempDir:          e.rt.TempDir(e.instanceID),
 	})
 	if err != nil {
 		_ = os.Remove(path)
diff --git a/runtime/reconcilers/model.go b/runtime/reconcilers/model.go
index d7c4170b944..048679f852e 100644
--- a/runtime/reconcilers/model.go
+++ b/runtime/reconcilers/model.go
@@ -1107,6 +1107,7 @@ func (r *ModelReconciler) executeSingle(ctx context.Context, executor *wrappedMo
 		IncrementalRun: incrementalRun,
 		SplitRun:       split != nil,
 		PreviousResult: prevResult,
+		TempDir:        r.C.Runtime.TempDir(r.C.InstanceID),
 	})
 	if err != nil {
 		return nil, err
@@ -1136,6 +1137,7 @@ func (r *ModelReconciler) executeSingle(ctx context.Context, executor *wrappedMo
 		IncrementalRun: incrementalRun,
 		SplitRun:       split != nil,
 		PreviousResult: prevResult,
+		TempDir:        r.C.Runtime.TempDir(r.C.InstanceID),
 	})
 	if err != nil {
 		return nil, err