Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.23-bullseye AS build
FROM --platform=$BUILDPLATFORM golang:1.24-bullseye AS build

WORKDIR /src

Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new db loader: %w", err)
}

postgresSinker, err := sinker.New(sink, dbLoader, zlog, tracer)
postgresSinker, err := sinker.New(sink, dbLoader, zlog, tracer, manifestPath)
if err != nil {
return fmt.Errorf("unable to setup postgres sinker: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Loader struct {

database string
schema string
host string
entries *OrderedMap[string, *OrderedMap[string, *Operation]]
entriesCount uint64
tables map[string]*TableInfo
Expand Down Expand Up @@ -78,6 +79,7 @@ func NewLoader(
DB: db,
database: dsn.database,
schema: dsn.schema,
host: dsn.host,
entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](),
tables: map[string]*TableInfo{},
batchBlockFlushInterval: batchBlockFlushInterval,
Expand Down Expand Up @@ -297,6 +299,10 @@ func (l *Loader) GetSchema() string {
return l.schema
}

func (l *Loader) GetDatabaseHost() string {
return l.host
}

func (l *Loader) HasTable(tableName string) bool {
if _, found := l.tables[tableName]; found {
return true
Expand Down
13 changes: 8 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/streamingfast/substreams-sink-sql

go 1.23.4
go 1.24.2

toolchain go1.23.10
toolchain go1.24.7

require (
github.com/ClickHouse/clickhouse-go/v2 v2.25.0
Expand All @@ -15,7 +15,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/substreams v1.15.2
github.com/streamingfast/substreams v1.15.10
github.com/streamingfast/substreams-sink v0.5.0
github.com/streamingfast/substreams-sink-database-changes v1.1.3
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -48,6 +48,7 @@ require (
github.com/bobg/go-generics/v4 v4.1.2 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
github.com/charmbracelet/lipgloss v1.0.0 // indirect
github.com/charmbracelet/x/ansi v0.4.2 // indirect
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 // indirect
Expand Down Expand Up @@ -83,6 +84,7 @@ require (
github.com/paulmach/orb v0.11.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -97,6 +99,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect
github.com/streamingfast/derr v0.0.0-20250321151415-6b4fbbcb1bb5 // indirect
github.com/streamingfast/firehose-networks v0.2.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect
github.com/zeebo/errs v1.4.0 // indirect
Expand Down Expand Up @@ -152,7 +155,7 @@ require (
github.com/streamingfast/dgrpc v0.0.0-20250423172640-223250ed2391 // indirect
github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2
github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa
github.com/streamingfast/dstore v0.1.1-0.20250217165048-d508dcc6b33e
github.com/streamingfast/dstore v0.1.1-0.20250609173504-95368d3441ee
github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 // indirect
github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb // indirect
github.com/streamingfast/shutter v1.5.0
Expand All @@ -177,4 +180,4 @@ replace github.com/jimsmart/schema => github.com/streamingfast/schema v0.0.0-202

replace github.com/ClickHouse/clickhouse-go/v2 => github.com/YaroShkvorets/clickhouse-go/v2 v2.26.0-sink-sql

replace github.com/streamingfast/substreams-sink => github.com/pinax-network/substreams-sink-go v0.5.6-idle-timeout
replace github.com/streamingfast/substreams-sink => github.com/pinax-network/substreams-sink-go v0.5.7
18 changes: 12 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMU
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8=
github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -447,8 +449,10 @@ github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvI
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pinax-network/substreams-sink-go v0.5.6-idle-timeout h1:Pulo1fRNs5/SnpYb65R9iqkpI6JvXqFJZufNfoN4yBo=
github.com/pinax-network/substreams-sink-go v0.5.6-idle-timeout/go.mod h1:zw+OTQ2XJPhhbU+oJI99+BFeJ1ElhYv+4teWNkWEyMA=
github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0 h1:chRRgzgzmFzICbB/8ybY1IDqvxVgjV415M0AsIYmUHQ=
github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0/go.mod h1:G76L6ql7YCygVzN45BmtSBqA+qwcDuFWMM42tDnGJbE=
github.com/pinax-network/substreams-sink-go v0.5.7 h1:mcv/IFeil1CKQO85BcQK8zzBnQiSrMhGoHRAZ+dDdnI=
github.com/pinax-network/substreams-sink-go v0.5.7/go.mod h1:EbwZgN7FRZY6oNBmA7ufcaHZ215nDo3ejyyasuS3xj4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -518,8 +522,10 @@ github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2 h1:/mcLVdwy6
github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2/go.mod h1:MyG3U4ABuf7ANS8tix+e8UUevN7B9juhEnAbslS/X3M=
github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa h1:PJkLMu6Own6V5qYwJDQHgRBCTTW2CxV4xxADMXfw+0M=
github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20250217165048-d508dcc6b33e h1:U7m8WgriNEJcbQBicFkyrkrEfZbQMpYYNefps9KLc7E=
github.com/streamingfast/dstore v0.1.1-0.20250217165048-d508dcc6b33e/go.mod h1:FKF6+noY/Qgf+1EgeyQdQllQpmCvKQ6bghSbqRYJhlw=
github.com/streamingfast/dstore v0.1.1-0.20250609173504-95368d3441ee h1:f0fDCbCNpdMQ0yCB4RqzaOiBbSwe03ejljyxiiaMdHE=
github.com/streamingfast/dstore v0.1.1-0.20250609173504-95368d3441ee/go.mod h1:FKF6+noY/Qgf+1EgeyQdQllQpmCvKQ6bghSbqRYJhlw=
github.com/streamingfast/firehose-networks v0.2.0 h1:w5YeAmFXvQx+gGd0PnY6sT/pQ4koxGqf1T63+PeqewY=
github.com/streamingfast/firehose-networks v0.2.0/go.mod h1:/XyroRcAUOO9DdxdgHktuVI1U6uKgjj6+1r+zgLEb90=
github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=
github.com/streamingfast/logging v0.0.0-20220222131651-12c3943aac2e/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=
github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU=
Expand All @@ -533,8 +539,8 @@ github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd h1:P96NMUr1jD
github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd/go.mod h1:XuHkKh98QevgA9M3oWB5Y5Tm6w7iNJ5P3a3ao7UnnfI=
github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAtyaTOgs=
github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8=
github.com/streamingfast/substreams v1.15.2 h1:6olSI9PMqvX771CEaZB60zwoINcKuaZh+9R7JFXp5nE=
github.com/streamingfast/substreams v1.15.2/go.mod h1:tukQ8ncibm8wzh4dCiC5sXBFAjy3U/+jHnXTfMqDAC4=
github.com/streamingfast/substreams v1.15.10 h1:9WuM0WfJdCoYzKcY+zEIXjXELOIlLRIp4i1bOJF9kqs=
github.com/streamingfast/substreams v1.15.10/go.mod h1:FvBSSvljqxHj8UupLa1YfOm7A2VlaJ141FgnIJaamz8=
github.com/streamingfast/substreams-sink-database-changes v1.1.3 h1:rXeGb/V2mjC8FftumRkMQxG2jtdLfHdLx9UQVUtAqS8=
github.com/streamingfast/substreams-sink-database-changes v1.1.3/go.mod h1:bul4OLl22/M8LlYO9+sxA/5ghUrV7eYrG5NSlfm5m5k=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
13 changes: 13 additions & 0 deletions sinker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,16 @@ var FlushDuration = metrics.NewCounter("substreams_sink_postgres_store_flush_dur

var HeadBlockNumber = metrics.NewHeadBlockNumber("substreams_sink_postgres")
var HeadBlockTimeDrift = metrics.NewHeadTimeDrift("substreams_sink_postgres")

var SinkInfo = metrics.NewGaugeVec("substreams_sink_sql_info", []string{
"endpoint", // Substreams endpoint
"database", // Database name
"schema", // Schema name
"db_host", // Database host
"manifest", // Manifest path or URL
"output_module", // Output module name
"module_hash", // Output module hash
"block_start", // Start block
"block_end", // End block
"block_restarted_at", // Restarted at block
}, "Information about the SQL sink configuration")
23 changes: 22 additions & 1 deletion sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

Expand All @@ -29,9 +30,10 @@ type SQLSinker struct {

stats *Stats
lastAppliedBlockNum *uint64
manifestPath string // Store manifest path for metrics
}

func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer logging.Tracer) (*SQLSinker, error) {
func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer logging.Tracer, manifestPath string) (*SQLSinker, error) {
return &SQLSinker{
Shutter: shutter.New(),
Sinker: sink,
Expand All @@ -42,6 +44,7 @@ func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer loggin

stats: NewStats(logger),
lastAppliedBlockNum: nil,
manifestPath: manifestPath,
}, nil
}

Expand Down Expand Up @@ -90,6 +93,24 @@ func (s *SQLSinker) Run(ctx context.Context) {
zap.String("database", s.loader.GetDatabase()),
zap.String("schema", s.loader.GetSchema()),
)

endpoint, _, _ := s.EndpointConfig()
endBlockStr := "open"
if endBlock := s.BlockRange().EndBlock(); endBlock != nil {
endBlockStr = strconv.FormatUint(*endBlock, 10)
}

SinkInfo.SetInt64(1,
endpoint,
s.loader.GetDatabase(),
s.loader.GetSchema(),
s.loader.GetDatabaseHost(),
s.manifestPath,
s.OutputModuleName(),
s.OutputModuleHash(),
strconv.FormatUint(s.BlockRange().StartBlock(), 10),
endBlockStr,
strconv.FormatUint(cursor.Block().Num(), 10))
s.Sinker.Run(ctx, cursor, s)
}

Expand Down
2 changes: 1 addition & 1 deletion sinker/sinker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestInserts(t *testing.T) {
)
s, err := sink.New(sink.SubstreamsModeDevelopment, false, testPackage, testPackage.Modules.Modules[0], []byte("unused"), testClientConfig, logger, nil)
require.NoError(t, err)
sinker, _ := New(s, l, logger, nil)
sinker, _ := New(s, l, logger, nil, "")

for _, evt := range test.events {
if evt.undoSignal {
Expand Down
Loading