Skip to content

Commit ccc8aa6

Browse files
committed
add metric to track sink info
1 parent a7b2373 commit ccc8aa6

File tree

5 files changed

+43
-3
lines changed

5 files changed

+43
-3
lines changed

cmd/substreams-sink-sql/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
110110
return fmt.Errorf("new db loader: %w", err)
111111
}
112112

113-
postgresSinker, err := sinker.New(sink, dbLoader, zlog, tracer)
113+
postgresSinker, err := sinker.New(sink, dbLoader, zlog, tracer, manifestPath)
114114
if err != nil {
115115
return fmt.Errorf("unable to setup postgres sinker: %w", err)
116116
}

db/db.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Loader struct {
3535

3636
database string
3737
schema string
38+
host string
3839
entries *OrderedMap[string, *OrderedMap[string, *Operation]]
3940
entriesCount uint64
4041
tables map[string]*TableInfo
@@ -78,6 +79,7 @@ func NewLoader(
7879
DB: db,
7980
database: dsn.database,
8081
schema: dsn.schema,
82+
host: dsn.host,
8183
entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](),
8284
tables: map[string]*TableInfo{},
8385
batchBlockFlushInterval: batchBlockFlushInterval,
@@ -297,6 +299,10 @@ func (l *Loader) GetSchema() string {
297299
return l.schema
298300
}
299301

302+
func (l *Loader) GetDatabaseHost() string {
303+
return l.host
304+
}
305+
300306
func (l *Loader) HasTable(tableName string) bool {
301307
if _, found := l.tables[tableName]; found {
302308
return true

sinker/metrics.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,16 @@ var FlushDuration = metrics.NewCounter("substreams_sink_postgres_store_flush_dur
1616

1717
var HeadBlockNumber = metrics.NewHeadBlockNumber("substreams_sink_postgres")
1818
var HeadBlockTimeDrift = metrics.NewHeadTimeDrift("substreams_sink_postgres")
19+
20+
var SinkInfo = metrics.NewGaugeVec("substreams_sink_sql_info", []string{
21+
"endpoint", // Substreams endpoint
22+
"database", // Database name
23+
"schema", // Schema name
24+
"db_host", // Database host
25+
"manifest", // Manifest path or URL
26+
"output_module", // Output module name
27+
"module_hash", // Output module hash
28+
"block_start", // Start block
29+
"block_end", // End block
30+
"block_restarted_at", // Restarted at block
31+
}, "Information about the SQL sink configuration")

sinker/sinker.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strconv"
78
"strings"
89
"time"
910

@@ -29,9 +30,10 @@ type SQLSinker struct {
2930

3031
stats *Stats
3132
lastAppliedBlockNum *uint64
33+
manifestPath string // Store manifest path for metrics
3234
}
3335

34-
func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer logging.Tracer) (*SQLSinker, error) {
36+
func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer logging.Tracer, manifestPath string) (*SQLSinker, error) {
3537
return &SQLSinker{
3638
Shutter: shutter.New(),
3739
Sinker: sink,
@@ -42,6 +44,7 @@ func New(sink *sink.Sinker, loader *db.Loader, logger *zap.Logger, tracer loggin
4244

4345
stats: NewStats(logger),
4446
lastAppliedBlockNum: nil,
47+
manifestPath: manifestPath,
4548
}, nil
4649
}
4750

@@ -90,6 +93,24 @@ func (s *SQLSinker) Run(ctx context.Context) {
9093
zap.String("database", s.loader.GetDatabase()),
9194
zap.String("schema", s.loader.GetSchema()),
9295
)
96+
97+
endpoint, _, _ := s.EndpointConfig()
98+
endBlockStr := "open"
99+
if endBlock := s.BlockRange().EndBlock(); endBlock != nil {
100+
endBlockStr = strconv.FormatUint(*endBlock, 10)
101+
}
102+
103+
SinkInfo.SetInt64(1,
104+
endpoint,
105+
s.loader.GetDatabase(),
106+
s.loader.GetSchema(),
107+
s.loader.GetDatabaseHost(),
108+
s.manifestPath,
109+
s.OutputModuleName(),
110+
s.OutputModuleHash(),
111+
strconv.FormatUint(s.BlockRange().StartBlock(), 10),
112+
endBlockStr,
113+
strconv.FormatUint(cursor.Block().Num(), 10))
93114
s.Sinker.Run(ctx, cursor, s)
94115
}
95116

sinker/sinker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func TestInserts(t *testing.T) {
216216
)
217217
s, err := sink.New(sink.SubstreamsModeDevelopment, false, testPackage, testPackage.Modules.Modules[0], []byte("unused"), testClientConfig, logger, nil)
218218
require.NoError(t, err)
219-
sinker, _ := New(s, l, logger, nil)
219+
sinker, _ := New(s, l, logger, nil, "")
220220

221221
for _, evt := range test.events {
222222
if evt.undoSignal {

0 commit comments

Comments
 (0)