From a7b237305f881b5c2bcf61b6c2d9cdec595f3bf5 Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Wed, 17 Sep 2025 11:07:31 -0400 Subject: [PATCH 1/3] update upstream dependencies --- go.mod | 13 ++++++++----- go.sum | 18 ++++++++++++------ 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 3f3d0f81..fef60503 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/streamingfast/substreams-sink-sql -go 1.23.4 +go 1.24.2 -toolchain go1.23.10 +toolchain go1.24.7 require ( github.com/ClickHouse/clickhouse-go/v2 v2.25.0 @@ -15,7 +15,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 - github.com/streamingfast/substreams v1.15.2 + github.com/streamingfast/substreams v1.15.10 github.com/streamingfast/substreams-sink v0.5.0 github.com/streamingfast/substreams-sink-database-changes v1.1.3 github.com/stretchr/testify v1.10.0 @@ -48,6 +48,7 @@ require ( github.com/bobg/go-generics/v4 v4.1.2 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/cenkalti/backoff/v5 v5.0.2 // indirect github.com/charmbracelet/lipgloss v1.0.0 // indirect github.com/charmbracelet/x/ansi v0.4.2 // indirect github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 // indirect @@ -83,6 +84,7 @@ require ( github.com/paulmach/orb v0.11.1 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -97,6 +99,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect github.com/streamingfast/derr v0.0.0-20250321151415-6b4fbbcb1bb5 // indirect + github.com/streamingfast/firehose-networks v0.2.0 // indirect github.com/subosito/gotenv v1.4.2 // indirect github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect github.com/zeebo/errs v1.4.0 // indirect @@ -152,7 +155,7 @@ require ( github.com/streamingfast/dgrpc v0.0.0-20250423172640-223250ed2391 // indirect github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2 github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa - github.com/streamingfast/dstore v0.1.1-0.20250217165048-d508dcc6b33e + github.com/streamingfast/dstore v0.1.1-0.20250609173504-95368d3441ee github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 // indirect github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb // indirect github.com/streamingfast/shutter v1.5.0 @@ -177,4 +180,4 @@ replace github.com/jimsmart/schema => github.com/streamingfast/schema v0.0.0-202 replace github.com/ClickHouse/clickhouse-go/v2 => github.com/YaroShkvorets/clickhouse-go/v2 v2.26.0-sink-sql -replace github.com/streamingfast/substreams-sink => github.com/pinax-network/substreams-sink-go v0.5.6-idle-timeout +replace github.com/streamingfast/substreams-sink => github.com/pinax-network/substreams-sink-go v0.5.7 diff --git a/go.sum b/go.sum index 1d690fee..4aa967e7 100644 --- a/go.sum +++ b/go.sum @@ -129,6 +129,8 @@ github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMU github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= +github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -447,8 +449,10 @@ github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvI github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pinax-network/substreams-sink-go v0.5.6-idle-timeout h1:Pulo1fRNs5/SnpYb65R9iqkpI6JvXqFJZufNfoN4yBo= -github.com/pinax-network/substreams-sink-go v0.5.6-idle-timeout/go.mod h1:zw+OTQ2XJPhhbU+oJI99+BFeJ1ElhYv+4teWNkWEyMA= +github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0 h1:chRRgzgzmFzICbB/8ybY1IDqvxVgjV415M0AsIYmUHQ= +github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0/go.mod h1:G76L6ql7YCygVzN45BmtSBqA+qwcDuFWMM42tDnGJbE= +github.com/pinax-network/substreams-sink-go v0.5.7 h1:mcv/IFeil1CKQO85BcQK8zzBnQiSrMhGoHRAZ+dDdnI= +github.com/pinax-network/substreams-sink-go v0.5.7/go.mod h1:EbwZgN7FRZY6oNBmA7ufcaHZ215nDo3ejyyasuS3xj4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -518,8 +522,10 @@ github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2 h1:/mcLVdwy6 github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2/go.mod h1:MyG3U4ABuf7ANS8tix+e8UUevN7B9juhEnAbslS/X3M= github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa h1:PJkLMu6Own6V5qYwJDQHgRBCTTW2CxV4xxADMXfw+0M= github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw= -github.com/streamingfast/dstore v0.1.1-0.20250217165048-d508dcc6b33e h1:U7m8WgriNEJcbQBicFkyrkrEfZbQMpYYNefps9KLc7E= -github.com/streamingfast/dstore v0.1.1-0.20250217165048-d508dcc6b33e/go.mod h1:FKF6+noY/Qgf+1EgeyQdQllQpmCvKQ6bghSbqRYJhlw= +github.com/streamingfast/dstore v0.1.1-0.20250609173504-95368d3441ee h1:f0fDCbCNpdMQ0yCB4RqzaOiBbSwe03ejljyxiiaMdHE= +github.com/streamingfast/dstore v0.1.1-0.20250609173504-95368d3441ee/go.mod h1:FKF6+noY/Qgf+1EgeyQdQllQpmCvKQ6bghSbqRYJhlw= +github.com/streamingfast/firehose-networks v0.2.0 h1:w5YeAmFXvQx+gGd0PnY6sT/pQ4koxGqf1T63+PeqewY= +github.com/streamingfast/firehose-networks v0.2.0/go.mod h1:/XyroRcAUOO9DdxdgHktuVI1U6uKgjj6+1r+zgLEb90= github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20220222131651-12c3943aac2e/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo= github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU= @@ -533,8 +539,8 @@ github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd h1:P96NMUr1jD github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd/go.mod h1:XuHkKh98QevgA9M3oWB5Y5Tm6w7iNJ5P3a3ao7UnnfI= github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAtyaTOgs= github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= -github.com/streamingfast/substreams v1.15.2 h1:6olSI9PMqvX771CEaZB60zwoINcKuaZh+9R7JFXp5nE= -github.com/streamingfast/substreams v1.15.2/go.mod h1:tukQ8ncibm8wzh4dCiC5sXBFAjy3U/+jHnXTfMqDAC4= +github.com/streamingfast/substreams v1.15.10 h1:9WuM0WfJdCoYzKcY+zEIXjXELOIlLRIp4i1bOJF9kqs= +github.com/streamingfast/substreams v1.15.10/go.mod h1:FvBSSvljqxHj8UupLa1YfOm7A2VlaJ141FgnIJaamz8= github.com/streamingfast/substreams-sink-database-changes v1.1.3 h1:rXeGb/V2mjC8FftumRkMQxG2jtdLfHdLx9UQVUtAqS8= github.com/streamingfast/substreams-sink-database-changes v1.1.3/go.mod h1:bul4OLl22/M8LlYO9+sxA/5ghUrV7eYrG5NSlfm5m5k= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= From ccc8aa62311de02f804c2174af68f501551af77d Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Wed, 17 Sep 2025 11:07:42 -0400 Subject: [PATCH 2/3] add metric to track sink info --- cmd/substreams-sink-sql/run.go | 2 +- db/db.go | 6 ++++++ sinker/metrics.go | 13 +++++++++++++ sinker/sinker.go | 23 ++++++++++++++++++++++- sinker/sinker_test.go | 2 +- 5 files changed, 43 insertions(+), 3 deletions(-) diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index 8215d583..26bca5c2 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -110,7 +110,7 @@ func sinkRunE(cmd *cobra.Command, args []string) error { return fmt.Errorf("new db loader: %w", err) } - postgresSinker, err := sinker.New(sink, dbLoader, zlog, tracer) + postgresSinker, err := sinker.New(sink, dbLoader, zlog, tracer, manifestPath) if err != nil { return fmt.Errorf("unable to setup postgres sinker: %w", err) } diff --git a/db/db.go b/db/db.go index 16a4381f..5186ad76 100644 --- a/db/db.go +++ b/db/db.go @@ -35,6 +35,7 @@ type Loader struct { database string schema string + host string entries *OrderedMap[string, *OrderedMap[string, *Operation]] entriesCount uint64 tables map[string]*TableInfo @@ -78,6 +79,7 @@ func NewLoader( DB: db, database: dsn.database, schema: dsn.schema, + host: dsn.host, entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](), tables: map[string]*TableInfo{}, batchBlockFlushInterval: batchBlockFlushInterval, @@ -297,6 +299,10 @@ func (l *Loader) GetSchema() string { return l.schema } +func (l *Loader) GetDatabaseHost() string { + return l.host +} + func (l *Loader) HasTable(tableName string) bool { if _, found := l.tables[tableName]; found { return true diff --git a/sinker/metrics.go b/sinker/metrics.go index bb8b3221..db8aa1ac 100644 --- a/sinker/metrics.go +++ b/sinker/metrics.go @@ -16,3 +16,16 @@ var FlushDuration = metrics.NewCounter("substreams_sink_postgres_store_flush_dur var HeadBlockNumber = metrics.NewHeadBlockNumber("substreams_sink_postgres") var HeadBlockTimeDrift = metrics.NewHeadTimeDrift("substreams_sink_postgres") + +var SinkInfo = metrics.NewGaugeVec("substreams_sink_sql_info", []string{ + "endpoint", // Substreams endpoint + "database", // Database name + "schema", // Schema name + "db_host", // Database host + "manifest", // Manifest path or URL + "output_module", // Output module name + "module_hash", // Output module hash + "block_start", // Start block + "block_end", // End block + "block_restarted_at", // Restarted at block +}, "Information about the SQL sink configuration") diff --git a/sinker/sinker.go b/sinker/sinker.go index 599487ac..90ef4dbf 100644 --- a/sinker/sinker.go +++ b/sinker/sinker.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "time" @@ -29,9 +30,10 @@ type SQLSinker struct { stats *Stats lastAppliedBlockNum *uint64 + manifestPath string // Store manifest path for metrics } -func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer logging.Tracer) (*SQLSinker, error) { +func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer logging.Tracer, manifestPath string) (*SQLSinker, error) { return &SQLSinker{ Shutter: shutter.New(), Sinker: sink, @@ -42,6 +44,7 @@ func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer loggin stats: NewStats(logger), lastAppliedBlockNum: nil, + manifestPath: manifestPath, }, nil } @@ -90,6 +93,24 @@ func (s *SQLSinker) Run(ctx context.Context) { zap.String("database", s.loader.GetDatabase()), zap.String("schema", s.loader.GetSchema()), ) + + endpoint, _, _ := s.EndpointConfig() + endBlockStr := "open" + if endBlock := s.BlockRange().EndBlock(); endBlock != nil { + endBlockStr = strconv.FormatUint(*endBlock, 10) + } + + SinkInfo.SetInt64(1, + endpoint, + s.loader.GetDatabase(), + s.loader.GetSchema(), + s.loader.GetDatabaseHost(), + s.manifestPath, + s.OutputModuleName(), + s.OutputModuleHash(), + strconv.FormatUint(s.BlockRange().StartBlock(), 10), + endBlockStr, + strconv.FormatUint(cursor.Block().Num(), 10)) s.Sinker.Run(ctx, cursor, s) } diff --git a/sinker/sinker_test.go b/sinker/sinker_test.go index 05642062..be8b8a0e 100644 --- a/sinker/sinker_test.go +++ b/sinker/sinker_test.go @@ -216,7 +216,7 @@ func TestInserts(t *testing.T) { ) s, err := sink.New(sink.SubstreamsModeDevelopment, false, testPackage, testPackage.Modules.Modules[0], []byte("unused"), testClientConfig, logger, nil) require.NoError(t, err) - sinker, _ := New(s, l, logger, nil) + sinker, _ := New(s, l, logger, nil, "") for _, evt := range test.events { if evt.undoSignal { From 005aa2835004ab12e31ec591a8d0907869e94ee5 Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Wed, 17 Sep 2025 11:12:34 -0400 Subject: [PATCH 3/3] chore: update golang base image from 1.23 to 1.24 --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index b7d2677a..9c14a09b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$BUILDPLATFORM golang:1.23-bullseye AS build +FROM --platform=$BUILDPLATFORM golang:1.24-bullseye AS build WORKDIR /src