Skip to content

Commit 7de8f8a

Browse files
SinkInfo metrics (#16)
* update upstream dependencies * add metric to track sink info * chore: update golang base image from 1.23 to 1.24
1 parent 90d9d2b commit 7de8f8a

File tree

8 files changed

+64
-15
lines changed

8 files changed

+64
-15
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM --platform=$BUILDPLATFORM golang:1.23-bullseye AS build
1+
FROM --platform=$BUILDPLATFORM golang:1.24-bullseye AS build
22

33
WORKDIR /src
44

cmd/substreams-sink-sql/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
110110
return fmt.Errorf("new db loader: %w", err)
111111
}
112112

113-
postgresSinker, err := sinker.New(sink, dbLoader, zlog, tracer)
113+
postgresSinker, err := sinker.New(sink, dbLoader, zlog, tracer, manifestPath)
114114
if err != nil {
115115
return fmt.Errorf("unable to setup postgres sinker: %w", err)
116116
}

db/db.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Loader struct {
3535

3636
database string
3737
schema string
38+
host string
3839
entries *OrderedMap[string, *OrderedMap[string, *Operation]]
3940
entriesCount uint64
4041
tables map[string]*TableInfo
@@ -78,6 +79,7 @@ func NewLoader(
7879
DB: db,
7980
database: dsn.database,
8081
schema: dsn.schema,
82+
host: dsn.host,
8183
entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](),
8284
tables: map[string]*TableInfo{},
8385
batchBlockFlushInterval: batchBlockFlushInterval,
@@ -297,6 +299,10 @@ func (l *Loader) GetSchema() string {
297299
return l.schema
298300
}
299301

302+
func (l *Loader) GetDatabaseHost() string {
303+
return l.host
304+
}
305+
300306
func (l *Loader) HasTable(tableName string) bool {
301307
if _, found := l.tables[tableName]; found {
302308
return true

go.mod

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
module github.com/streamingfast/substreams-sink-sql
22

3-
go 1.23.4
3+
go 1.24.2
44

5-
toolchain go1.23.10
5+
toolchain go1.24.7
66

77
require (
88
github.com/ClickHouse/clickhouse-go/v2 v2.25.0
@@ -15,7 +15,7 @@ require (
1515
github.com/spf13/pflag v1.0.5
1616
github.com/spf13/viper v1.15.0
1717
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
18-
github.com/streamingfast/substreams v1.15.2
18+
github.com/streamingfast/substreams v1.15.10
1919
github.com/streamingfast/substreams-sink v0.5.0
2020
github.com/streamingfast/substreams-sink-database-changes v1.1.3
2121
github.com/stretchr/testify v1.10.0
@@ -48,6 +48,7 @@ require (
4848
github.com/bobg/go-generics/v4 v4.1.2 // indirect
4949
github.com/buger/jsonparser v1.1.1 // indirect
5050
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
51+
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
5152
github.com/charmbracelet/lipgloss v1.0.0 // indirect
5253
github.com/charmbracelet/x/ansi v0.4.2 // indirect
5354
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 // indirect
@@ -83,6 +84,7 @@ require (
8384
github.com/paulmach/orb v0.11.1 // indirect
8485
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
8586
github.com/pierrec/lz4/v4 v4.1.22 // indirect
87+
github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0 // indirect
8688
github.com/pkg/errors v0.9.1 // indirect
8789
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
8890
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -97,6 +99,7 @@ require (
9799
github.com/spf13/jwalterweatherman v1.1.0 // indirect
98100
github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect
99101
github.com/streamingfast/derr v0.0.0-20250321151415-6b4fbbcb1bb5 // indirect
102+
github.com/streamingfast/firehose-networks v0.2.0 // indirect
100103
github.com/subosito/gotenv v1.4.2 // indirect
101104
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf // indirect
102105
github.com/zeebo/errs v1.4.0 // indirect
@@ -152,7 +155,7 @@ require (
152155
github.com/streamingfast/dgrpc v0.0.0-20250423172640-223250ed2391 // indirect
153156
github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2
154157
github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa
155-
github.com/streamingfast/dstore v0.1.1-0.20250217165048-d508dcc6b33e
158+
github.com/streamingfast/dstore v0.1.1-0.20250609173504-95368d3441ee
156159
github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 // indirect
157160
github.com/streamingfast/pbgo v0.0.6-0.20240823134334-812f6a16c5cb // indirect
158161
github.com/streamingfast/shutter v1.5.0
@@ -177,4 +180,4 @@ replace github.com/jimsmart/schema => github.com/streamingfast/schema v0.0.0-202
177180

178181
replace github.com/ClickHouse/clickhouse-go/v2 => github.com/YaroShkvorets/clickhouse-go/v2 v2.26.0-sink-sql
179182

180-
replace github.com/streamingfast/substreams-sink => github.com/pinax-network/substreams-sink-go v0.5.6-idle-timeout
183+
replace github.com/streamingfast/substreams-sink => github.com/pinax-network/substreams-sink-go v0.5.7

go.sum

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMU
129129
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
130130
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
131131
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
132+
github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8=
133+
github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
132134
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
133135
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
134136
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -447,8 +449,10 @@ github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvI
447449
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
448450
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
449451
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
450-
github.com/pinax-network/substreams-sink-go v0.5.6-idle-timeout h1:Pulo1fRNs5/SnpYb65R9iqkpI6JvXqFJZufNfoN4yBo=
451-
github.com/pinax-network/substreams-sink-go v0.5.6-idle-timeout/go.mod h1:zw+OTQ2XJPhhbU+oJI99+BFeJ1ElhYv+4teWNkWEyMA=
452+
github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0 h1:chRRgzgzmFzICbB/8ybY1IDqvxVgjV415M0AsIYmUHQ=
453+
github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0/go.mod h1:G76L6ql7YCygVzN45BmtSBqA+qwcDuFWMM42tDnGJbE=
454+
github.com/pinax-network/substreams-sink-go v0.5.7 h1:mcv/IFeil1CKQO85BcQK8zzBnQiSrMhGoHRAZ+dDdnI=
455+
github.com/pinax-network/substreams-sink-go v0.5.7/go.mod h1:EbwZgN7FRZY6oNBmA7ufcaHZ215nDo3ejyyasuS3xj4=
452456
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
453457
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
454458
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -518,8 +522,10 @@ github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2 h1:/mcLVdwy6
518522
github.com/streamingfast/dhammer v0.0.0-20220506192416-3797a7906da2/go.mod h1:MyG3U4ABuf7ANS8tix+e8UUevN7B9juhEnAbslS/X3M=
519523
github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa h1:PJkLMu6Own6V5qYwJDQHgRBCTTW2CxV4xxADMXfw+0M=
520524
github.com/streamingfast/dmetrics v0.0.0-20240214191810-524a5c58fbaa/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
521-
github.com/streamingfast/dstore v0.1.1-0.20250217165048-d508dcc6b33e h1:U7m8WgriNEJcbQBicFkyrkrEfZbQMpYYNefps9KLc7E=
522-
github.com/streamingfast/dstore v0.1.1-0.20250217165048-d508dcc6b33e/go.mod h1:FKF6+noY/Qgf+1EgeyQdQllQpmCvKQ6bghSbqRYJhlw=
525+
github.com/streamingfast/dstore v0.1.1-0.20250609173504-95368d3441ee h1:f0fDCbCNpdMQ0yCB4RqzaOiBbSwe03ejljyxiiaMdHE=
526+
github.com/streamingfast/dstore v0.1.1-0.20250609173504-95368d3441ee/go.mod h1:FKF6+noY/Qgf+1EgeyQdQllQpmCvKQ6bghSbqRYJhlw=
527+
github.com/streamingfast/firehose-networks v0.2.0 h1:w5YeAmFXvQx+gGd0PnY6sT/pQ4koxGqf1T63+PeqewY=
528+
github.com/streamingfast/firehose-networks v0.2.0/go.mod h1:/XyroRcAUOO9DdxdgHktuVI1U6uKgjj6+1r+zgLEb90=
523529
github.com/streamingfast/logging v0.0.0-20210811175431-f3b44b61606a/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=
524530
github.com/streamingfast/logging v0.0.0-20220222131651-12c3943aac2e/go.mod h1:4GdqELhZOXj4xwc4IaBmzofzdErGynnaSzuzxy0ZIBo=
525531
github.com/streamingfast/logging v0.0.0-20220304214715-bc750a74b424/go.mod h1:VlduQ80JcGJSargkRU4Sg9Xo63wZD/l8A5NC/Uo1/uU=
@@ -533,8 +539,8 @@ github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd h1:P96NMUr1jD
533539
github.com/streamingfast/schema v0.0.0-20240621180609-1de2e05fe3bd/go.mod h1:XuHkKh98QevgA9M3oWB5Y5Tm6w7iNJ5P3a3ao7UnnfI=
534540
github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAtyaTOgs=
535541
github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8=
536-
github.com/streamingfast/substreams v1.15.2 h1:6olSI9PMqvX771CEaZB60zwoINcKuaZh+9R7JFXp5nE=
537-
github.com/streamingfast/substreams v1.15.2/go.mod h1:tukQ8ncibm8wzh4dCiC5sXBFAjy3U/+jHnXTfMqDAC4=
542+
github.com/streamingfast/substreams v1.15.10 h1:9WuM0WfJdCoYzKcY+zEIXjXELOIlLRIp4i1bOJF9kqs=
543+
github.com/streamingfast/substreams v1.15.10/go.mod h1:FvBSSvljqxHj8UupLa1YfOm7A2VlaJ141FgnIJaamz8=
538544
github.com/streamingfast/substreams-sink-database-changes v1.1.3 h1:rXeGb/V2mjC8FftumRkMQxG2jtdLfHdLx9UQVUtAqS8=
539545
github.com/streamingfast/substreams-sink-database-changes v1.1.3/go.mod h1:bul4OLl22/M8LlYO9+sxA/5ghUrV7eYrG5NSlfm5m5k=
540546
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

sinker/metrics.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,16 @@ var FlushDuration = metrics.NewCounter("substreams_sink_postgres_store_flush_dur
1616

1717
var HeadBlockNumber = metrics.NewHeadBlockNumber("substreams_sink_postgres")
1818
var HeadBlockTimeDrift = metrics.NewHeadTimeDrift("substreams_sink_postgres")
19+
20+
var SinkInfo = metrics.NewGaugeVec("substreams_sink_sql_info", []string{
21+
"endpoint", // Substreams endpoint
22+
"database", // Database name
23+
"schema", // Schema name
24+
"db_host", // Database host
25+
"manifest", // Manifest path or URL
26+
"output_module", // Output module name
27+
"module_hash", // Output module hash
28+
"block_start", // Start block
29+
"block_end", // End block
30+
"block_restarted_at", // Restarted at block
31+
}, "Information about the SQL sink configuration")

sinker/sinker.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strconv"
78
"strings"
89
"time"
910

@@ -29,9 +30,10 @@ type SQLSinker struct {
2930

3031
stats *Stats
3132
lastAppliedBlockNum *uint64
33+
manifestPath string // Store manifest path for metrics
3234
}
3335

34-
func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer logging.Tracer) (*SQLSinker, error) {
36+
func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer logging.Tracer, manifestPath string) (*SQLSinker, error) {
3537
return &SQLSinker{
3638
Shutter: shutter.New(),
3739
Sinker: sink,
@@ -42,6 +44,7 @@ func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer loggin
4244

4345
stats: NewStats(logger),
4446
lastAppliedBlockNum: nil,
47+
manifestPath: manifestPath,
4548
}, nil
4649
}
4750

@@ -90,6 +93,24 @@ func (s *SQLSinker) Run(ctx context.Context) {
9093
zap.String("database", s.loader.GetDatabase()),
9194
zap.String("schema", s.loader.GetSchema()),
9295
)
96+
97+
endpoint, _, _ := s.EndpointConfig()
98+
endBlockStr := "open"
99+
if endBlock := s.BlockRange().EndBlock(); endBlock != nil {
100+
endBlockStr = strconv.FormatUint(*endBlock, 10)
101+
}
102+
103+
SinkInfo.SetInt64(1,
104+
endpoint,
105+
s.loader.GetDatabase(),
106+
s.loader.GetSchema(),
107+
s.loader.GetDatabaseHost(),
108+
s.manifestPath,
109+
s.OutputModuleName(),
110+
s.OutputModuleHash(),
111+
strconv.FormatUint(s.BlockRange().StartBlock(), 10),
112+
endBlockStr,
113+
strconv.FormatUint(cursor.Block().Num(), 10))
93114
s.Sinker.Run(ctx, cursor, s)
94115
}
95116

sinker/sinker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func TestInserts(t *testing.T) {
216216
)
217217
s, err := sink.New(sink.SubstreamsModeDevelopment, false, testPackage, testPackage.Modules.Modules[0], []byte("unused"), testClientConfig, logger, nil)
218218
require.NoError(t, err)
219-
sinker, _ := New(s, l, logger, nil)
219+
sinker, _ := New(s, l, logger, nil, "")
220220

221221
for _, evt := range test.events {
222222
if evt.undoSignal {

0 commit comments

Comments
 (0)