Skip to content

Commit 502473f

Browse files
committed
Better DSN support for clickhouse
Database, dialect and inserter are now created based on the DSN driver type
1 parent c1e7af7 commit 502473f

File tree

4 files changed

+73
-43
lines changed

4 files changed

+73
-43
lines changed

cmd/substreams-sink-sql/from_proto.go

Lines changed: 57 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/streamingfast/substreams-sink-sql/db_proto"
1515
"github.com/streamingfast/substreams-sink-sql/db_proto/proto"
1616
protosql "github.com/streamingfast/substreams-sink-sql/db_proto/sql"
17+
clickhouse "github.com/streamingfast/substreams-sink-sql/db_proto/sql/click_house"
1718
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/postgres"
1819
schema2 "github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema"
1920
stats2 "github.com/streamingfast/substreams-sink-sql/db_proto/stats"
@@ -38,8 +39,8 @@ var fromProtoCmd = Command(fromProtoE,
3839
//flags.Bool("no-transactions", false, "Do not use transactions when inserting data. This is useful to speed up the initial import of a large dataset.")
3940
//flags.Bool("parallel", false, "Run the sinker in parallel mode. This is useful to speed up the initial import of a large dataset. This is will process blocks of a batch in parallel")
4041
flags.Int("block-batch-size", 25, "number of blocks to process at a time")
41-
//flags.String("clickhouse-sink-info-folder", "", "folder where to store the clickhouse sink info")
42-
//flags.String("clickhouse-cursor-file-path", "cursor.txt", "file name where to store the clickhouse cursor")
42+
flags.String("clickhouse-sink-info-folder", "", "folder where to store the clickhouse sink info")
43+
flags.String("clickhouse-cursor-file-path", "cursor.txt", "file name where to store the clickhouse cursor")
4344
}),
4445
)
4546

@@ -52,7 +53,6 @@ var fromProtoCmd = Command(fromProtoE,
5253
//todo: post generate index
5354
//todo: external process
5455
//todo: handle network
55-
//todo: fix DSN for clickhouse
5656

5757
func fromProtoE(cmd *cobra.Command, args []string) error {
5858
//app := NewApplication(cmd.Context())
@@ -161,8 +161,6 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
161161
}
162162
}
163163

164-
//todo: fix me
165-
//schemaName := "test"
166164
schemaName := dsn.Schema()
167165

168166
schema, err := schema2.NewSchema(schemaName, rootMessageDescriptor, useProtoOption, zlog)
@@ -185,38 +183,52 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
185183
}
186184

187185
connectionString := dsn.ConnString()
188-
//todo: fix me
189-
//connectionString = "http://localhost:8123?secure=false"
190186

191-
fmt.Println("connection string", connectionString)
187+
zlog.Info("connecting to db", zap.String("dsn", connectionString))
192188
sqlDB, err := sql.Open(dsn.Driver(), connectionString)
193189
if err != nil {
194190
return fmt.Errorf("open db connection: %w", err)
195191
}
196192

197-
dialect, err := postgres.NewDialectPostgres(schema.Name, schema.TableRegistry, zlog)
198-
//dialect, err := clickhouse.NewDialectClickHouse(schema.Name, schema.TableRegistry, zlog)
199-
if err != nil {
200-
return fmt.Errorf("creating dialect: %w", err)
201-
}
202-
193+
var dialect protosql.Dialect
203194
var database protosql.Database
204-
//implDatabase, err := clickhouse.NewDatabase(
205-
// schemaName,
206-
// dialect,
207-
// sqlDB,
208-
// outputModuleName,
209-
// rootMessageDescriptor,
210-
// sflags.MustGetString(cmd, "clickhouse-sink-info-folder"),
211-
// sflags.MustGetString(cmd, "clickhouse-cursor-file-path"),
212-
// true,
213-
// zlog,
214-
//)
215-
implDatabase, err := postgres.NewDatabase(schemaName, dialect, sqlDB, outputModuleName, rootMessageDescriptor, useProtoOption, zlog)
216-
if err != nil {
217-
return fmt.Errorf("creating database: %w", err)
195+
196+
switch dsn.Driver() {
197+
case "postgres":
198+
d, err := postgres.NewDialectPostgres(schema.Name, schema.TableRegistry, zlog)
199+
if err != nil {
200+
return fmt.Errorf("creating postgres dialect: %w", err)
201+
}
202+
dialect = d
203+
database, err = postgres.NewDatabase(schemaName, d, sqlDB, outputModuleName, rootMessageDescriptor, useProtoOption, zlog)
204+
if err != nil {
205+
return fmt.Errorf("creating postgres database: %w", err)
206+
}
207+
208+
case "clickhouse":
209+
d, err := clickhouse.NewDialectClickHouse(schema.Name, schema.TableRegistry, zlog)
210+
if err != nil {
211+
return fmt.Errorf("creating clickhouse dialect: %w", err)
212+
}
213+
dialect = d
214+
database, err = clickhouse.NewDatabase(
215+
schemaName,
216+
d,
217+
sqlDB,
218+
outputModuleName,
219+
rootMessageDescriptor,
220+
sflags.MustGetString(cmd, "clickhouse-sink-info-folder"),
221+
sflags.MustGetString(cmd, "clickhouse-cursor-file-path"),
222+
true,
223+
zlog,
224+
)
225+
if err != nil {
226+
return fmt.Errorf("creating clickhouse database: %w", err)
227+
}
228+
default:
229+
panic(fmt.Sprintf("unsupported driver: %s", dsn.Driver()))
230+
218231
}
219-
database = implDatabase
220232

221233
sinkInfo, err := database.FetchSinkInfo(schema.Name)
222234
if err != nil {
@@ -291,19 +303,24 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
291303
}
292304
}
293305

294-
//inserter, err := clickhouse.NewAccumulatorInserter(implDatabase, zlog)
295-
296306
var inserter protosql.Inserter
297-
if useConstraints {
298-
inserter, err = postgres.NewRowInserter(implDatabase, zlog)
299-
if err != nil {
300-
return fmt.Errorf("creating row inserter: %w", err)
301-
}
302-
} else {
303-
inserter, err = postgres.NewAccumulatorInserter(implDatabase, zlog)
304-
if err != nil {
305-
return fmt.Errorf("creating accumulator inserter: %w", err)
307+
switch dsn.Driver() {
308+
case "postgres":
309+
if useConstraints {
310+
inserter, err = postgres.NewRowInserter(database.(*postgres.Database), zlog)
311+
if err != nil {
312+
return fmt.Errorf("creating row inserter: %w", err)
313+
}
314+
} else {
315+
inserter, err = postgres.NewAccumulatorInserter(database.(*postgres.Database), zlog)
316+
if err != nil {
317+
return fmt.Errorf("creating accumulator inserter: %w", err)
318+
}
306319
}
320+
case "clickhouse":
321+
inserter, err = clickhouse.NewAccumulatorInserter(database.(*clickhouse.Database), zlog)
322+
default:
323+
panic(fmt.Sprintf("unsupported driver: %s", dsn.Driver()))
307324
}
308325

309326
database.SetInserter(inserter)

db_changes/db/dsn.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,16 @@ func (c *DSN) Driver() string {
105105

106106
func (c *DSN) ConnString() string {
107107
if c.driver == "clickhouse" {
108+
for _, option := range c.options {
109+
if c.host == "localhost" {
110+
c.host = "127.0.0.1"
111+
scheme := "http"
112+
if option == "secure=true" {
113+
scheme = "https"
114+
}
115+
return strings.Replace(c.original, "clickhouse://", scheme+"://", 1)
116+
}
117+
}
108118
return c.original
109119
}
110120
out := fmt.Sprintf("host=%s port=%d dbname=%s %s", c.host, c.port, c.database, strings.Join(c.options, " "))

db_proto/sql/click_house/accumulator_inserter.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
type accumulator struct {
1515
ordinal int
16+
tableName string
1617
query string
1718
rowValues [][]string
1819
}
@@ -34,8 +35,9 @@ func NewAccumulatorInserter(database *Database, logger *zap.Logger) (*Accumulato
3435
return nil, fmt.Errorf("creating insert from descriptor for table %q: %w", table.Name, err)
3536
}
3637
accumulators[table.Name] = &accumulator{
37-
ordinal: table.Ordinal,
38-
query: query,
38+
tableName: table.Name,
39+
ordinal: table.Ordinal,
40+
query: query,
3941
}
4042
}
4143
accumulators["_blocks_"] = &accumulator{
@@ -115,6 +117,7 @@ func (i *AccumulatorInserter) Flush(tx *sql.Tx) error {
115117
})
116118

117119
for _, acc := range accumulators {
120+
i.logger.Debug("flushing table", zap.String("table", acc.tableName), zap.Int("ordinal", acc.ordinal), zap.Int("row_count", len(acc.rowValues)))
118121
if len(acc.rowValues) == 0 {
119122
continue
120123
}

db_proto/sql/click_house/dialect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const staticSqlCreateBlock = `
2323
hash text,
2424
timestamp timestamp
2525
)
26-
ENGINE = MergeTree()
26+
ENGINE = ReplacingMergeTree()
2727
PRIMARY KEY (number)
2828
`
2929

0 commit comments

Comments
 (0)