Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add monitoring endpoints #22

Merged
merged 13 commits into from
Nov 7, 2023
1,376 changes: 947 additions & 429 deletions bridgectrl/pb/query.pb.go

Large diffs are not rendered by default.

1,423 changes: 271 additions & 1,152 deletions bridgectrl/pb/query.pb.gw.go

Large diffs are not rendered by default.

147 changes: 104 additions & 43 deletions bridgectrl/pb/query_grpc.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion db/pgstorage/migrations/0008.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ CREATE UNIQUE INDEX IF NOT EXISTS deposit_test_uidx ON sync.deposit_test (networ
CREATE INDEX IF NOT EXISTS deposit_dest_addr_idx ON sync.deposit (dest_addr);
CREATE INDEX IF NOT EXISTS deposit_test_dest_addr_idx ON sync.deposit_test (dest_addr);
CREATE INDEX IF NOT EXISTS claim_dest_addr_idx ON sync.claim (dest_addr);
CREATE INDEX IF NOT EXISTS claim_test_dest_addr_idx ON sync.claim (dest_addr);
CREATE INDEX IF NOT EXISTS claim_test_dest_addr_idx ON sync.claim_test (dest_addr);
62 changes: 62 additions & 0 deletions db/pgstorage/pgstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,37 @@ func (p *PostgresStorage) GetPendingTransactions(ctx context.Context, destAddr s
return deposits, nil
}

// GetNotReadyTransactions returns all the deposit transactions with ready_for_claim = false
func (p *PostgresStorage) GetNotReadyTransactions(ctx context.Context, limit uint, offset uint, dbTx pgx.Tx) ([]*etherman.Deposit, error) {
getDepositsSQL := fmt.Sprintf(`SELECT d.id, leaf_type, orig_net, orig_addr, amount, dest_net, dest_addr, deposit_cnt, block_id, b.block_num, d.network_id, tx_hash, metadata, ready_for_claim, b.received_at
FROM sync.deposit%[1]v as d INNER JOIN sync.block%[1]v as b ON d.network_id = b.network_id AND d.block_id = b.id
WHERE ready_for_claim = false
ORDER BY d.block_id DESC, d.deposit_cnt DESC LIMIT $1 OFFSET $2`, p.tableSuffix)

rows, err := p.getExecQuerier(dbTx).Query(ctx, getDepositsSQL, limit, offset)
if err != nil {
return nil, err
}

deposits := make([]*etherman.Deposit, 0, len(rows.RawValues()))

for rows.Next() {
var (
deposit etherman.Deposit
amount string
)
err = rows.Scan(&deposit.Id, &deposit.LeafType, &deposit.OriginalNetwork, &deposit.OriginalAddress, &amount, &deposit.DestinationNetwork, &deposit.DestinationAddress,
&deposit.DepositCount, &deposit.BlockID, &deposit.BlockNumber, &deposit.NetworkID, &deposit.TxHash, &deposit.Metadata, &deposit.ReadyForClaim, &deposit.Time)
if err != nil {
return nil, err
}
deposit.Amount, _ = new(big.Int).SetString(amount, 10) //nolint:gomnd
deposits = append(deposits, &deposit)
}

return deposits, nil
}

// GetDepositCount gets the deposit count for the destination address.
func (p *PostgresStorage) GetDepositCount(ctx context.Context, destAddr string, dbTx pgx.Tx) (uint64, error) {
getDepositCountSQL := fmt.Sprintf("SELECT COUNT(*) FROM sync.deposit%[1]v WHERE dest_addr = $1", p.tableSuffix)
Expand Down Expand Up @@ -606,6 +637,37 @@ func (p *PostgresStorage) GetClaimTxsByStatus(ctx context.Context, statuses []ct
return mTxs, nil
}

func (p *PostgresStorage) GetClaimTxsByStatusWithLimit(ctx context.Context, statuses []ctmtypes.MonitoredTxStatus, limit uint, offset uint, dbTx pgx.Tx) ([]ctmtypes.MonitoredTx, error) {
getMonitoredTxsSQL := fmt.Sprintf("SELECT * FROM sync.monitored_txs%[1]v WHERE status = ANY($1) ORDER BY created_at DESC LIMIT $2 OFFSET $3", p.tableSuffix)
rows, err := p.getExecQuerier(dbTx).Query(ctx, getMonitoredTxsSQL, pq.Array(statuses), limit, offset)
if errors.Is(err, pgx.ErrNoRows) {
return []ctmtypes.MonitoredTx{}, nil
} else if err != nil {
return nil, err
}

mTxs := make([]ctmtypes.MonitoredTx, 0, len(rows.RawValues()))
for rows.Next() {
var (
value string
history [][]byte
)
mTx := ctmtypes.MonitoredTx{}
err = rows.Scan(&mTx.ID, &mTx.BlockID, &mTx.From, &mTx.To, &mTx.Nonce, &value, &mTx.Data, &mTx.Gas, &mTx.Status, pq.Array(&history), &mTx.CreatedAt, &mTx.UpdatedAt)
if err != nil {
return mTxs, err
}
mTx.Value, _ = new(big.Int).SetString(value, 10) //nolint:gomnd
mTx.History = make(map[common.Hash]bool)
for _, h := range history {
mTx.History[common.BytesToHash(h)] = true
}
mTxs = append(mTxs, mTx)
}

return mTxs, nil
}

// GetClaimTxById gets the monitored transactions by id (depositCount)
func (p *PostgresStorage) GetClaimTxById(ctx context.Context, id uint, dbTx pgx.Tx) (*ctmtypes.MonitoredTx, error) {
getClaimSql := fmt.Sprintf("SELECT * FROM sync.monitored_txs%[1]v WHERE id = $1", p.tableSuffix)
Expand Down
112 changes: 67 additions & 45 deletions proto/src/proto/bridge/v1/query.proto
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you delete the prefix /priapi/v1/ob/

Original file line number Diff line number Diff line change
Expand Up @@ -14,111 +14,91 @@ service BridgeService {
/// Get api version
rpc CheckAPI(CheckAPIRequest) returns (CheckAPIResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/api",
additional_bindings: {
get: "/api"
}
get: "/api",
};
}

/// Get bridges for the destination address both in L1 and L2
rpc GetBridges(GetBridgesRequest) returns (GetBridgesResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/bridges/{dest_addr}",
additional_bindings: {
get: "/bridges/{dest_addr}"
}
get: "/bridges/{dest_addr}",
};
}

/// Get the merkle proof for the specific deposit
rpc GetProof(GetProofRequest) returns (GetProofResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/merkle-proof",
additional_bindings: {
get: "/merkle-proof"
}
get: "/merkle-proof",
};
}

/// Get the specific deposit
rpc GetBridge(GetBridgeRequest) returns (GetBridgeResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/bridge",
additional_bindings: {
get: "/bridge"
}
get: "/bridge",
};
}

/// Get claims for the specific smart contract address both in L1 and L2
rpc GetClaims(GetClaimsRequest) returns (GetClaimsResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/claims/{dest_addr}",
additional_bindings: {
get: "/claims/{dest_addr}"
}
get: "/claims/{dest_addr}",
};
}

/// Get token wrapped for the specific smart contract address both in L1 and L2
rpc GetTokenWrapped(GetTokenWrappedRequest) returns (GetTokenWrappedResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/tokenwrapped",
additional_bindings: {
get: "/tokenwrapped"
}
get: "/bridge/tokenwrapped",
};
}

/// Get the latest price of the specified coins
rpc GetCoinPrice(GetCoinPriceRequest) returns (CommonCoinPricesResponse) {
option (google.api.http) = {
post: "/priapi/v1/ob/bridge/coin-price",
additional_bindings: {
post: "/coin-price",
body: "*",
},
post: "/coin-price",
body: "*",
};
}

/// Get the list of all the main coins of a specified network
rpc GetMainCoins(GetMainCoinsRequest) returns (CommonCoinsResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/main-coins/{networkId}",
additional_bindings: {
get: "/main-coins/{networkId}"
}
get: "/main-coins/{networkId}",
};
}

/// Get the pending (not claimed) transactions of an account
rpc GetPendingTransactions(GetPendingTransactionsRequest) returns (CommonTransactionsResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/pending/{destAddr}",
additional_bindings: {
get: "/pending/{destAddr}"
}
get: "/pending/{destAddr}",
};
}

/// Get all the transactions of an account. Similar to GetBridges but the field names are changed
rpc GetAllTransactions(GetAllTransactionsRequest) returns (CommonTransactionsResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/all/{destAddr}",
additional_bindings: {
get: "/all/{destAddr}"
}
get: "/all/{destAddr}",
};
}

rpc GetSmtProof(GetSmtProofRequest) returns (CommonProofResponse) {
option (google.api.http) = {
get: "/priapi/v1/ob/bridge/smt-proof",
additional_bindings: {
get: "/smt-proof"
}
get: "/smt-proof",
};
}

/// Get all transactions with ready_for_claim = false
rpc GetNotReadyTransactions(GetNotReadyTransactionsRequest) returns (CommonTransactionsResponse) {
option (google.api.http) = {
get: "/not-ready",
};
}

/// Get list of monitored transactions, filtered by status
rpc GetMonitoredTxsByStatus(GetMonitoredTxsByStatusRequest) returns (CommonMonitoredTxsResponse) {
option (google.api.http) = {
get: "/monitored-txs/status/{status}",
};
}
}
Expand Down Expand Up @@ -214,6 +194,23 @@ message Transaction {
string metadata = 15;
}

// Monitored tx
message MonitoredTx {
uint64 id = 1;
string from = 2; // Sender address of the tx
string to = 3; // Receiver address of the tx
uint64 nonce = 4; // Nonce used to create the tx
string value = 5; // Transaction value
string data = 6; // Transaction data
uint64 gas = 7;
string gasPrice = 8;
string status = 9; // created/confirmed/failed
uint64 blockId = 10;
repeated string history = 11; // List of all transaction hashes created from this tx and sent to the network. The order of transactions is NOT guaranteed.
uint64 createdAt = 12; // Unix timestamp ms
uint64 updatedAt = 13; // Unix timestamp ms
}

// Get requests

message CheckAPIRequest {}
Expand Down Expand Up @@ -265,6 +262,17 @@ message GetAllTransactionsRequest {
uint32 limit = 3;
}

message GetNotReadyTransactionsRequest {
uint64 offset = 1;
uint32 limit = 2;
}

message GetMonitoredTxsByStatusRequest {
string status = 1; // created/failed/confirmed
uint64 offset = 2;
uint32 limit = 3;
}

// Get responses

message CheckAPIResponse {
Expand Down Expand Up @@ -359,4 +367,18 @@ message ProofDetail {
message GetSmtProofRequest {
uint32 index = 1;
uint64 fromChain = 2;
}

message CommonMonitoredTxsResponse {
uint32 code = 1;
string msg = 2;
string error_code = 3;
string error_message = 4;
string detailMsg = 5;
MonitoredTxsDetail data = 6;
}

message MonitoredTxsDetail {
bool hasNext = 1;
repeated MonitoredTx transactions = 2;
}
12 changes: 12 additions & 0 deletions sentinel/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@
"controlBehavior": 1,
"threshold": 50,
"maxQueueingTimeMs": 3000
},
{
"resource": "/bridge.v1.BridgeService/GetNotReadyTransactions",
"controlBehavior": 1,
"threshold": 1,
"maxQueueingTimeMs": 3000
},
{
"resource": "/bridge.v1.BridgeService/GetMonitoredTxsByStatus",
"controlBehavior": 1,
"threshold": 1,
"maxQueueingTimeMs": 3000
}
]
}
2 changes: 2 additions & 0 deletions server/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ type BridgeServiceStorage interface {
GetDepositCount(ctx context.Context, destAddr string, dbTx pgx.Tx) (uint64, error)
GetTokenWrapped(ctx context.Context, originalNetwork uint, originalTokenAddress common.Address, dbTx pgx.Tx) (*etherman.TokenWrapped, error)
GetPendingTransactions(ctx context.Context, destAddr string, limit uint, offset uint, dbTx pgx.Tx) ([]*etherman.Deposit, error)
GetNotReadyTransactions(ctx context.Context, limit uint, offset uint, dbTx pgx.Tx) ([]*etherman.Deposit, error)
GetClaimTxById(ctx context.Context, id uint, dbTx pgx.Tx) (*ctmtypes.MonitoredTx, error)
GetClaimTxsByStatusWithLimit(ctx context.Context, statuses []ctmtypes.MonitoredTxStatus, limit uint, offset uint, dbTx pgx.Tx) ([]ctmtypes.MonitoredTx, error)
}
10 changes: 9 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

const (
bridgeEndpointPath = "/priapi/v1/ob/bridge"
)

func RegisterNacos(cfg nacos.Config) {
log.Info(fmt.Sprintf("nacos config NacosUrls %s NamespaceId %s ApplicationName %s ExternalListenAddr %s", cfg.NacosUrls, cfg.NamespaceId, cfg.ApplicationName, cfg.ExternalListenAddr))
if cfg.NacosUrls != "" {
Expand Down Expand Up @@ -151,14 +155,18 @@ func runRestServer(ctx context.Context, grpcPort, httpPort string) error {
})
mux := runtime.NewServeMux(muxJSONOpt, muxHealthOpt)

httpMux := http.NewServeMux()
httpMux.Handle(bridgeEndpointPath+"/", http.StripPrefix(bridgeEndpointPath, mux))
httpMux.Handle("/", mux)

if err := pb.RegisterBridgeServiceHandler(ctx, mux, conn); err != nil {
return err
}

srv := &http.Server{
ReadTimeout: 1 * time.Second, //nolint:gomnd
Addr: ":" + httpPort,
Handler: allowCORS(mux),
Handler: allowCORS(httpMux),
}

c := make(chan os.Signal, 1)
Expand Down
Loading
Loading