diff --git a/.changeset/ripe-plums-agree.md b/.changeset/ripe-plums-agree.md new file mode 100644 index 000000000..02eaef32f --- /dev/null +++ b/.changeset/ripe-plums-agree.md @@ -0,0 +1,5 @@ +--- +"chainlink-deployments-framework": minor +--- + +(feat) Support catalog deletes for the datastore diff --git a/datastore/catalog/remote/address_ref_store.go b/datastore/catalog/remote/address_ref_store.go index 21dbada35..ddbdba308 100644 --- a/datastore/catalog/remote/address_ref_store.go +++ b/datastore/catalog/remote/address_ref_store.go @@ -212,97 +212,31 @@ func (s *catalogAddressRefStore) Filter( } func (s *catalogAddressRefStore) Add(_ context.Context, record datastore.AddressRef) error { - // Convert the datastore record to protobuf - protoRef := s.addressRefToProto(record) - - // Create the edit request with INSERT semantics - editRequest := &pb.AddressReferenceEditRequest{ - Record: protoRef, - Semantics: pb.EditSemantics_SEMANTICS_INSERT, - } - - // Create the request - editReq := &pb.DataAccessRequest{ + req := &pb.DataAccessRequest{ Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{ - AddressReferenceEditRequest: editRequest, + AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{ + Record: s.addressRefToProto(record), + Semantics: pb.EditSemantics_SEMANTICS_INSERT, + }, }, } - // Create a bidirectional stream with the initial request for HMAC - stream, err := s.client.DataAccess(editReq) - if err != nil { - return fmt.Errorf("failed to create data access stream: %w", err) - } - - if sendErr := stream.Send(editReq); sendErr != nil { - return fmt.Errorf("failed to send edit request: %w", sendErr) - } - - // Receive the edit response - editResponse, err := stream.Recv() - if err != nil { - return fmt.Errorf("failed to receive edit response: %w", err) - } - - // Check for errors in the edit response - if err := parseResponseStatus(editResponse.Status); err != nil { - return fmt.Errorf("add address ref failed: %w", err) - } - - // Extract the edit response to validate it - editResp := editResponse.GetAddressReferenceEditResponse() - if editResp == nil { - return errors.New("unexpected edit response type") - } - - return nil + return executeEdit(s.client, req, + (*pb.DataAccessResponse).GetAddressReferenceEditResponse, nil) } func (s *catalogAddressRefStore) Upsert(_ context.Context, record datastore.AddressRef) error { - // Convert the datastore record to protobuf - protoRef := s.addressRefToProto(record) - - // Create the edit request with UPSERT semantics - editRequest := &pb.AddressReferenceEditRequest{ - Record: protoRef, - Semantics: pb.EditSemantics_SEMANTICS_UPSERT, - } - - // Create the request - request := &pb.DataAccessRequest{ + req := &pb.DataAccessRequest{ Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{ - AddressReferenceEditRequest: editRequest, + AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{ + Record: s.addressRefToProto(record), + Semantics: pb.EditSemantics_SEMANTICS_UPSERT, + }, }, } - // Create a bidirectional stream with the initial request for HMAC - stream, err := s.client.DataAccess(request) - if err != nil { - return fmt.Errorf("failed to create data access stream: %w", err) - } - - if sendErr := stream.Send(request); sendErr != nil { - return fmt.Errorf("failed to send edit request: %w", sendErr) - } - - // Receive the response - response, err := stream.Recv() - if err != nil { - return fmt.Errorf("failed to receive response: %w", err) - } - - // Check for errors in the response - if err := parseResponseStatus(response.Status); err != nil { - return fmt.Errorf("upsert address ref failed: %w", err) - } - - // Extract the edit response to validate it - editResponse := response.GetAddressReferenceEditResponse() - if editResponse == nil { - return errors.New("unexpected response type") - } - - return nil + return executeEdit(s.client, req, + (*pb.DataAccessResponse).GetAddressReferenceEditResponse, nil) } func (s *catalogAddressRefStore) Update(ctx context.Context, record datastore.AddressRef) error { @@ -319,56 +253,42 @@ func (s *catalogAddressRefStore) Update(ctx context.Context, record datastore.Ad } // Record exists, proceed with updating it - // Convert the datastore record to protobuf - protoRef := s.addressRefToProto(record) - - // Create the edit request with UPDATE semantics - editRequest := &pb.AddressReferenceEditRequest{ - Record: protoRef, - Semantics: pb.EditSemantics_SEMANTICS_UPDATE, - } - - // Create the request - editReq := &pb.DataAccessRequest{ + req := &pb.DataAccessRequest{ Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{ - AddressReferenceEditRequest: editRequest, + AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{ + Record: s.addressRefToProto(record), + Semantics: pb.EditSemantics_SEMANTICS_UPDATE, + }, }, } - // Create a bidirectional stream with the initial request for HMAC - stream, streamErr := s.client.DataAccess(editReq) - if streamErr != nil { - return fmt.Errorf("failed to create data access stream: %w", streamErr) - } - - if sendErr := stream.Send(editReq); sendErr != nil { - return fmt.Errorf("failed to send edit request: %w", sendErr) - } - - // Receive the edit response - editResponse, err := stream.Recv() - if err != nil { - return fmt.Errorf("failed to receive edit response: %w", err) - } + return executeEdit(s.client, req, + (*pb.DataAccessResponse).GetAddressReferenceEditResponse, nil) +} - // Check for errors in the edit response - if err := parseResponseStatus(editResponse.Status); err != nil { - return fmt.Errorf("update address ref failed: %w", err) - } +func (s *catalogAddressRefStore) Delete(_ context.Context, key datastore.AddressRefKey) error { + return s.deleteRecord(key) +} - // Extract the edit response to validate it - editResp := editResponse.GetAddressReferenceEditResponse() - if editResp == nil { - return errors.New("unexpected edit response type") +func (s *catalogAddressRefStore) deleteRecord(key datastore.AddressRefKey) error { + req := &pb.DataAccessRequest{ + Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{ + AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{ + Record: &pb.AddressReference{ + Domain: s.domain, + Environment: s.environment, + ChainSelector: key.ChainSelector(), + ContractType: string(key.Type()), + Version: key.Version().String(), + Qualifier: key.Qualifier(), + }, + Semantics: pb.EditSemantics_SEMANTICS_DELETE, + }, + }, } - return nil -} - -func (s *catalogAddressRefStore) Delete(_ context.Context, _ datastore.AddressRefKey) error { - // The catalog API does not support delete operations - // This is intentional as catalogs are typically immutable reference stores - return errors.New("delete operation not supported by catalog API") + return executeEdit(s.client, req, + (*pb.DataAccessResponse).GetAddressReferenceEditResponse, nil) } // keyToFilter converts a datastore.AddressRefKey to a protobuf AddressReferenceKeyFilter diff --git a/datastore/catalog/remote/address_ref_store_test.go b/datastore/catalog/remote/address_ref_store_test.go index c6d1326af..ace032fea 100644 --- a/datastore/catalog/remote/address_ref_store_test.go +++ b/datastore/catalog/remote/address_ref_store_test.go @@ -288,17 +288,93 @@ func TestCatalogAddressRefStore_Upsert(t *testing.T) { func TestCatalogAddressRefStore_Delete(t *testing.T) { t.Parallel() - store := setupTestStore(t, "", "") - version := semver.MustParse("1.0.0") - key := datastore.NewAddressRefKey(12345, "LinkToken", version, "test") + tests := []struct { + name string + run func(t *testing.T, store *catalogAddressRefStore) + }{ + { + name: "delete_existing_record", + run: func(t *testing.T, store *catalogAddressRefStore) { + t.Helper() + ref := newRandomAddressRef() + require.NoError(t, store.Add(t.Context(), ref)) + + key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier) + require.NoError(t, store.Delete(t.Context(), key)) + + _, err := store.Get(t.Context(), key) + require.ErrorIs(t, err, datastore.ErrAddressRefNotFound) + }, + }, + { + name: "delete_nonexistent_key_is_noop", + run: func(t *testing.T, store *catalogAddressRefStore) { + t.Helper() + ref := newRandomAddressRef() + key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier) + require.NoError(t, store.Delete(t.Context(), key)) + }, + }, + { + name: "delete_already_deleted_is_noop", + run: func(t *testing.T, store *catalogAddressRefStore) { + t.Helper() + ref := newRandomAddressRef() + require.NoError(t, store.Add(t.Context(), ref)) + + key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier) + require.NoError(t, store.Delete(t.Context(), key)) + require.NoError(t, store.Delete(t.Context(), key)) + }, + }, + { + name: "delete_then_readd", + run: func(t *testing.T, store *catalogAddressRefStore) { + t.Helper() + ref := newRandomAddressRef() + require.NoError(t, store.Add(t.Context(), ref)) + + key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier) + require.NoError(t, store.Delete(t.Context(), key)) + + resurrected := ref + resurrected.Address = "0x" + randomHex(40) + resurrected.Labels = datastore.NewLabelSet("resurrected") + require.NoError(t, store.Add(t.Context(), resurrected)) + + got, err := store.Get(t.Context(), key) + require.NoError(t, err) + require.Equal(t, resurrected.Address, got.Address) + require.Equal(t, resurrected.Labels.List(), got.Labels.List()) + }, + }, + { + name: "delete_excluded_from_find", + run: func(t *testing.T, store *catalogAddressRefStore) { + t.Helper() + ref := newRandomAddressRef() + require.NoError(t, store.Add(t.Context(), ref)) - // Execute - err := store.Delete(t.Context(), key) + key := datastore.NewAddressRefKey(ref.ChainSelector, ref.Type, ref.Version, ref.Qualifier) + _, err := store.Get(t.Context(), key) + require.NoError(t, err) - // Verify - require.Error(t, err) - require.Contains(t, err.Error(), "delete operation not supported") + require.NoError(t, store.Delete(t.Context(), key)) + + _, err = store.Get(t.Context(), key) + require.ErrorIs(t, err, datastore.ErrAddressRefNotFound) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + store := setupTestStore(t, "test-domain", "catalog_testing") + tt.run(t, store) + }) + } } func TestCatalogAddressRefStore_FetchAndFilter(t *testing.T) { @@ -657,7 +733,7 @@ func setupTestStore(t *testing.T, domain, environment string) *catalogAddressRef } // randomHex generates a random hex string of specified length -func randomHex(length int) string { +func randomHex(length int) string { //nolint:unparam // this is a test function and we usually want a 40 digit hex string bytes := make([]byte, length/2) if _, err := rand.Read(bytes); err != nil { panic(fmt.Sprintf("failed to generate random bytes: %v", err)) diff --git a/datastore/catalog/remote/chain_metadata_store.go b/datastore/catalog/remote/chain_metadata_store.go index 10abf2ee1..ef1628967 100644 --- a/datastore/catalog/remote/chain_metadata_store.go +++ b/datastore/catalog/remote/chain_metadata_store.go @@ -346,18 +346,46 @@ func (s *catalogChainMetadataStore) Update(ctx context.Context, key datastore.Ch return s.editRecord(record, pb.EditSemantics_SEMANTICS_UPDATE) } -func (s *catalogChainMetadataStore) Delete(_ context.Context, _ datastore.ChainMetadataKey) error { - return errors.New("delete operation not supported for catalog chain metadata store") +func (s *catalogChainMetadataStore) Delete(_ context.Context, key datastore.ChainMetadataKey) error { + return s.deleteRecord(key) +} + +func (s *catalogChainMetadataStore) deleteRecord(key datastore.ChainMetadataKey) error { + req := &pb.DataAccessRequest{ + Operation: &pb.DataAccessRequest_ChainMetadataEditRequest{ + ChainMetadataEditRequest: &pb.ChainMetadataEditRequest{ + Record: &pb.ChainMetadata{ + Domain: s.domain, + Environment: s.environment, + ChainSelector: key.ChainSelector(), + }, + Semantics: pb.EditSemantics_SEMANTICS_DELETE, + }, + }, + } + + if err := executeEdit(s.client, req, + (*pb.DataAccessResponse).GetChainMetadataEditResponse, nil); err != nil { + return err + } + + s.clearVersion(key) + + return nil +} + +func (s *catalogChainMetadataStore) clearVersion(key datastore.ChainMetadataKey) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.versionCache, key.String()) } // editRecord is a helper method that handles Add, Upsert, and Update operations func (s *catalogChainMetadataStore) editRecord(record datastore.ChainMetadata, semantics pb.EditSemantics) error { - // Get the current version for this record key := record.Key() version := s.getVersion(key) - // Create edit request - editReq := &pb.DataAccessRequest{ + req := &pb.DataAccessRequest{ Operation: &pb.DataAccessRequest_ChainMetadataEditRequest{ ChainMetadataEditRequest: &pb.ChainMetadataEditRequest{ Record: s.chainMetadataToProto(record, version), @@ -366,46 +394,22 @@ func (s *catalogChainMetadataStore) editRecord(record datastore.ChainMetadata, s }, } - // Create stream with the initial request for HMAC - stream, err := s.client.DataAccess(editReq) - if err != nil { - return fmt.Errorf("failed to create gRPC stream: %w", err) - } - - if sendErr := stream.Send(editReq); sendErr != nil { - return fmt.Errorf("failed to send edit request: %w", sendErr) - } - - // Receive response - resp, err := stream.Recv() - if err != nil { - return fmt.Errorf("failed to receive response: %w", err) - } - - // Check for errors in the edit response - if statusErr := parseResponseStatus(resp.Status); statusErr != nil { - st, err := parseStatusError(statusErr) - if err != nil { - return err - } - - switch st.Code() { //nolint:exhaustive // We don't need to handle all codes here - case codes.NotFound: - return fmt.Errorf("%w: %s", datastore.ErrChainMetadataNotFound, statusErr.Error()) - case codes.Aborted: - return fmt.Errorf("%w: %s", datastore.ErrChainMetadataStale, statusErr.Error()) - default: - return fmt.Errorf("edit request failed: %w", statusErr) - } - } - editResp := resp.GetChainMetadataEditResponse() - if editResp == nil { - return errors.New("unexpected response type") + if err := executeEdit(s.client, req, + (*pb.DataAccessResponse).GetChainMetadataEditResponse, + func(statusErr error, code codes.Code) error { + switch code { //nolint:exhaustive // We don't need to handle all codes here + case codes.NotFound: + return fmt.Errorf("%w: %s", datastore.ErrChainMetadataNotFound, statusErr.Error()) + case codes.Aborted: + return fmt.Errorf("%w: %s", datastore.ErrChainMetadataStale, statusErr.Error()) + default: + return fmt.Errorf("edit request failed: %w", statusErr) + } + }); err != nil { + return err } - // Update the version cache - increment the version after successful edit - newVersion := s.getVersion(key) + 1 - s.setVersion(key, newVersion) + s.setVersion(key, s.getVersion(key)+1) return nil } diff --git a/datastore/catalog/remote/chain_metadata_store_test.go b/datastore/catalog/remote/chain_metadata_store_test.go index 92669e919..44b7d763d 100644 --- a/datastore/catalog/remote/chain_metadata_store_test.go +++ b/datastore/catalog/remote/chain_metadata_store_test.go @@ -715,16 +715,94 @@ func TestCatalogChainMetadataStore_Upsert_StaleVersion(t *testing.T) { func TestCatalogChainMetadataStore_Delete(t *testing.T) { t.Parallel() - store := setupTestChainStore(t, "", "") - key := datastore.NewChainMetadataKey(12345) + tests := []struct { + name string + run func(t *testing.T, store *catalogChainMetadataStore) + }{ + { + name: "delete_existing_record", + run: func(t *testing.T, store *catalogChainMetadataStore) { + t.Helper() + record := newRandomChainMetadata() + require.NoError(t, store.Add(t.Context(), record)) + + key := datastore.NewChainMetadataKey(record.ChainSelector) + require.NoError(t, store.Delete(t.Context(), key)) + + _, err := store.Get(t.Context(), key) + require.ErrorIs(t, err, datastore.ErrChainMetadataNotFound) + }, + }, + { + name: "delete_nonexistent_key_is_noop", + run: func(t *testing.T, store *catalogChainMetadataStore) { + t.Helper() + key := datastore.NewChainMetadataKey(generateRandomChainSelector()) + require.NoError(t, store.Delete(t.Context(), key)) + }, + }, + { + name: "delete_already_deleted_is_noop", + run: func(t *testing.T, store *catalogChainMetadataStore) { + t.Helper() + record := newRandomChainMetadata() + require.NoError(t, store.Add(t.Context(), record)) + + key := datastore.NewChainMetadataKey(record.ChainSelector) + require.NoError(t, store.Delete(t.Context(), key)) + require.NoError(t, store.Delete(t.Context(), key)) + }, + }, + { + name: "delete_then_readd", + run: func(t *testing.T, store *catalogChainMetadataStore) { + t.Helper() + record := newRandomChainMetadata() + require.NoError(t, store.Add(t.Context(), record)) + + key := datastore.NewChainMetadataKey(record.ChainSelector) + require.NoError(t, store.Delete(t.Context(), key)) - // Execute - err := store.Delete(t.Context(), key) + resurrected := datastore.ChainMetadata{ + ChainSelector: record.ChainSelector, + Metadata: newTestChainMetadata("Resurrected"), + } + require.NoError(t, store.Add(t.Context(), resurrected)) + + got, err := store.Get(t.Context(), key) + require.NoError(t, err) + concrete, err := datastore.As[TestChainMetadata](got.Metadata) + require.NoError(t, err) + require.Equal(t, "Resurrected", concrete.Name) + }, + }, + { + name: "delete_excluded_from_find", + run: func(t *testing.T, store *catalogChainMetadataStore) { + t.Helper() + record := newRandomChainMetadata() + require.NoError(t, store.Add(t.Context(), record)) + + key := datastore.NewChainMetadataKey(record.ChainSelector) + _, err := store.Get(t.Context(), key) + require.NoError(t, err) + + require.NoError(t, store.Delete(t.Context(), key)) - // Verify - require.Error(t, err) - require.Contains(t, err.Error(), "delete operation not supported") + _, err = store.Get(t.Context(), key) + require.ErrorIs(t, err, datastore.ErrChainMetadataNotFound) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + store := setupTestChainStore(t, "test-domain", "catalog_testing") + tt.run(t, store) + }) + } } func TestCatalogChainMetadataStore_FetchAndFilter(t *testing.T) { diff --git a/datastore/catalog/remote/contract_metadata_store.go b/datastore/catalog/remote/contract_metadata_store.go index 2a8ac45ca..1930dc52c 100644 --- a/datastore/catalog/remote/contract_metadata_store.go +++ b/datastore/catalog/remote/contract_metadata_store.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "strings" "sync" @@ -353,71 +352,71 @@ func (s *catalogContractMetadataStore) Update(ctx context.Context, key datastore return s.editRecord(record, pb.EditSemantics_SEMANTICS_UPDATE) } -func (s *catalogContractMetadataStore) Delete(_ context.Context, _ datastore.ContractMetadataKey) error { - return errors.New("delete operation not supported for catalog contract metadata store") +func (s *catalogContractMetadataStore) Delete(_ context.Context, key datastore.ContractMetadataKey) error { + return s.deleteRecord(key) } -// editRecord is a helper method that handles Add, Upsert, and Update operations -func (s *catalogContractMetadataStore) editRecord(record datastore.ContractMetadata, semantics pb.EditSemantics) error { - // Get the current version for this record - key := record.Key() - version := s.getVersion(key) - - // Create edit request - editReq := &pb.DataAccessRequest{ +func (s *catalogContractMetadataStore) deleteRecord(key datastore.ContractMetadataKey) error { + req := &pb.DataAccessRequest{ Operation: &pb.DataAccessRequest_ContractMetadataEditRequest{ ContractMetadataEditRequest: &pb.ContractMetadataEditRequest{ - Record: s.contractMetadataToProto(record, version), - Semantics: semantics, + Record: &pb.ContractMetadata{ + Domain: s.domain, + Environment: s.environment, + ChainSelector: key.ChainSelector(), + Address: key.Address(), + }, + Semantics: pb.EditSemantics_SEMANTICS_DELETE, }, }, } - // Create stream with the initial request for HMAC - stream, err := s.client.DataAccess(editReq) - if err != nil { - return fmt.Errorf("failed to create gRPC stream: %w", err) + if err := executeEdit(s.client, req, + (*pb.DataAccessResponse).GetContractMetadataEditResponse, nil); err != nil { + return err } - if sendErr := stream.Send(editReq); sendErr != nil { - return fmt.Errorf("failed to send edit request: %w", sendErr) - } + s.clearVersion(key) - // Receive response - resp, recvErr := stream.Recv() - if recvErr != nil { - if errors.Is(recvErr, io.EOF) { - return errors.New("unexpected end of stream") - } + return nil +} - return fmt.Errorf("failed to receive response: %w", recvErr) - } +func (s *catalogContractMetadataStore) clearVersion(key datastore.ContractMetadataKey) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.versionCache, key.String()) +} - // Check for errors in the edit response - if statusErr := parseResponseStatus(resp.Status); statusErr != nil { - st, err := parseStatusError(statusErr) - if err != nil { - return err - } +// editRecord is a helper method that handles Add, Upsert, and Update operations +func (s *catalogContractMetadataStore) editRecord(record datastore.ContractMetadata, semantics pb.EditSemantics) error { + key := record.Key() + version := s.getVersion(key) - switch st.Code() { //nolint:exhaustive // We don't need to handle all codes here - case codes.NotFound: - return fmt.Errorf("%w: %s", datastore.ErrContractMetadataNotFound, statusErr.Error()) - case codes.Aborted: - return fmt.Errorf("%w: %s", datastore.ErrContractMetadataStale, statusErr.Error()) - default: - return fmt.Errorf("edit request failed: %w", statusErr) - } + req := &pb.DataAccessRequest{ + Operation: &pb.DataAccessRequest_ContractMetadataEditRequest{ + ContractMetadataEditRequest: &pb.ContractMetadataEditRequest{ + Record: s.contractMetadataToProto(record, version), + Semantics: semantics, + }, + }, } - editResp := resp.GetContractMetadataEditResponse() - if editResp == nil { - return errors.New("unexpected response type") + if err := executeEdit(s.client, req, + (*pb.DataAccessResponse).GetContractMetadataEditResponse, + func(statusErr error, code codes.Code) error { + switch code { //nolint:exhaustive // We don't need to handle all codes here + case codes.NotFound: + return fmt.Errorf("%w: %s", datastore.ErrContractMetadataNotFound, statusErr.Error()) + case codes.Aborted: + return fmt.Errorf("%w: %s", datastore.ErrContractMetadataStale, statusErr.Error()) + default: + return fmt.Errorf("edit request failed: %w", statusErr) + } + }); err != nil { + return err } - // Update the version cache - increment the version after successful edit - newVersion := s.getVersion(key) + 1 - s.setVersion(key, newVersion) + s.setVersion(key, s.getVersion(key)+1) return nil } diff --git a/datastore/catalog/remote/contract_metadata_store_test.go b/datastore/catalog/remote/contract_metadata_store_test.go index 4702146b9..5d7a33769 100644 --- a/datastore/catalog/remote/contract_metadata_store_test.go +++ b/datastore/catalog/remote/contract_metadata_store_test.go @@ -501,16 +501,96 @@ func TestCatalogContractMetadataStore_Upsert_StaleVersion(t *testing.T) { func TestCatalogContractMetadataStore_Delete(t *testing.T) { t.Parallel() - store := setupTestContractStore(t, "", "") - key := datastore.NewContractMetadataKey(12345, "0x1234567890abcdef1234567890abcdef12345678") + tests := []struct { + name string + run func(t *testing.T, store *catalogContractMetadataStore) + }{ + { + name: "delete_existing_record", + run: func(t *testing.T, store *catalogContractMetadataStore) { + t.Helper() + record := newRandomContractMetadata() + require.NoError(t, store.Add(t.Context(), record)) + + key := datastore.NewContractMetadataKey(record.ChainSelector, record.Address) + require.NoError(t, store.Delete(t.Context(), key)) + + _, err := store.Get(t.Context(), key) + require.ErrorIs(t, err, datastore.ErrContractMetadataNotFound) + }, + }, + { + name: "delete_nonexistent_key_is_noop", + run: func(t *testing.T, store *catalogContractMetadataStore) { + t.Helper() + record := newRandomContractMetadata() + key := datastore.NewContractMetadataKey(record.ChainSelector, record.Address) + require.NoError(t, store.Delete(t.Context(), key)) + }, + }, + { + name: "delete_already_deleted_is_noop", + run: func(t *testing.T, store *catalogContractMetadataStore) { + t.Helper() + record := newRandomContractMetadata() + require.NoError(t, store.Add(t.Context(), record)) + + key := datastore.NewContractMetadataKey(record.ChainSelector, record.Address) + require.NoError(t, store.Delete(t.Context(), key)) + require.NoError(t, store.Delete(t.Context(), key)) + }, + }, + { + name: "delete_then_readd", + run: func(t *testing.T, store *catalogContractMetadataStore) { + t.Helper() + record := newRandomContractMetadata() + require.NoError(t, store.Add(t.Context(), record)) + + key := datastore.NewContractMetadataKey(record.ChainSelector, record.Address) + require.NoError(t, store.Delete(t.Context(), key)) - // Execute - err := store.Delete(t.Context(), key) + resurrected := datastore.ContractMetadata{ + Address: record.Address, + ChainSelector: record.ChainSelector, + Metadata: newTestContractMetadata("Resurrected"), + } + require.NoError(t, store.Add(t.Context(), resurrected)) + + got, err := store.Get(t.Context(), key) + require.NoError(t, err) + concrete, err := datastore.As[TestContractMetadata](got.Metadata) + require.NoError(t, err) + require.Equal(t, "Resurrected", concrete.Name) + }, + }, + { + name: "delete_excluded_from_find", + run: func(t *testing.T, store *catalogContractMetadataStore) { + t.Helper() + record := newRandomContractMetadata() + require.NoError(t, store.Add(t.Context(), record)) + + key := datastore.NewContractMetadataKey(record.ChainSelector, record.Address) + _, err := store.Get(t.Context(), key) + require.NoError(t, err) + + require.NoError(t, store.Delete(t.Context(), key)) - // Verify - require.Error(t, err) - require.Contains(t, err.Error(), "delete operation not supported") + _, err = store.Get(t.Context(), key) + require.ErrorIs(t, err, datastore.ErrContractMetadataNotFound) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + store := setupTestContractStore(t, "test-domain", "catalog_testing") + tt.run(t, store) + }) + } } func TestCatalogContractMetadataStore_FetchAndFilter(t *testing.T) { diff --git a/datastore/catalog/remote/edit_request.go b/datastore/catalog/remote/edit_request.go new file mode 100644 index 000000000..1678c7026 --- /dev/null +++ b/datastore/catalog/remote/edit_request.go @@ -0,0 +1,120 @@ +package remote + +import ( + "errors" + "fmt" + "io" + + "google.golang.org/grpc/codes" + + pb "github.com/smartcontractkit/chainlink-protos/op-catalog/v1/datastore" +) + +func getOpName(req *pb.DataAccessRequest) (string, error) { + var op, entity string + var err error + switch r := req.Operation.(type) { + case *pb.DataAccessRequest_AddressReferenceEditRequest: + entity = "address ref" + op, err = semanticsLabel(r.AddressReferenceEditRequest.Semantics) + if err != nil { + return "", err + } + case *pb.DataAccessRequest_ChainMetadataEditRequest: + entity = "chain metadata" + op, err = semanticsLabel(r.ChainMetadataEditRequest.Semantics) + if err != nil { + return "", err + } + case *pb.DataAccessRequest_ContractMetadataEditRequest: + entity = "contract metadata" + op, err = semanticsLabel(r.ContractMetadataEditRequest.Semantics) + if err != nil { + return "", err + } + case *pb.DataAccessRequest_EnvironmentMetadataEditRequest: + entity = "env metadata" + op, err = semanticsLabel(r.EnvironmentMetadataEditRequest.Semantics) + if err != nil { + return "", err + } + default: + return "", errors.New("unknown operation type") + } + + return op + " " + entity, nil +} + +func semanticsLabel(s pb.EditSemantics) (string, error) { + switch s { + case pb.EditSemantics_SEMANTICS_INSERT: + return "add", nil + case pb.EditSemantics_SEMANTICS_UPSERT: + return "upsert", nil + case pb.EditSemantics_SEMANTICS_UPDATE: + return "update", nil + case pb.EditSemantics_SEMANTICS_DELETE: + return "delete", nil + default: + return "", errors.New("unknown semantics") + } +} + +// errorMapper translates a non-nil gRPC status error into a domain-specific +// error. Receives the raw status error and its extracted code for switch-based +// mapping. If nil, executeEdit falls back to: fmt.Errorf("%s failed: %w", opName, statusErr). +type errorMapper func(statusErr error, code codes.Code) error + +// executeEdit handles the open-stream / send / recv / status-check sequence +// shared by every *EditRequest call. R must be the concrete edit-response +// pointer type (e.g. *pb.AddressReferenceEditResponse); extract returns the +// typed response from the DataAccessResponse oneof and executeEdit returns +// "unexpected response type" when it is nil. +func executeEdit[R comparable]( + client *CatalogClient, + req *pb.DataAccessRequest, + extract func(*pb.DataAccessResponse) R, + mapErr errorMapper, +) error { + opName, err := getOpName(req) + if err != nil { + return fmt.Errorf("failed to get operation name: %w", err) + } + stream, clientErr := client.DataAccess(req) + if clientErr != nil { + return fmt.Errorf("failed to create gRPC stream: %w", clientErr) + } + + if sendErr := stream.Send(req); sendErr != nil { + return fmt.Errorf("failed to send %s request: %w", opName, sendErr) + } + + resp, recvErr := stream.Recv() + if recvErr != nil { + if errors.Is(recvErr, io.EOF) { + return errors.New("unexpected end of stream") + } + + return fmt.Errorf("failed to receive %s response: %w", opName, recvErr) + } + + if statusErr := parseResponseStatus(resp.Status); statusErr != nil { + if mapErr != nil { + st, parseErr := parseStatusError(statusErr) + if parseErr != nil { + return parseErr + } + + return mapErr(statusErr, st.Code()) + } + + return fmt.Errorf("%s failed: %w", opName, statusErr) + } + + var zero R + if extract(resp) == zero { + return errors.New("unexpected response type") + } + + return nil +} diff --git a/datastore/catalog/remote/edit_request_test.go b/datastore/catalog/remote/edit_request_test.go new file mode 100644 index 000000000..d1a3e99cc --- /dev/null +++ b/datastore/catalog/remote/edit_request_test.go @@ -0,0 +1,91 @@ +package remote + +import ( + "testing" + + pb "github.com/smartcontractkit/chainlink-protos/op-catalog/v1/datastore" + "github.com/stretchr/testify/require" +) + +func TestGetOpName(t *testing.T) { + t.Parallel() + + addressRefOp := func(s pb.EditSemantics) *pb.DataAccessRequest { + return &pb.DataAccessRequest{ + Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{ + AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{Semantics: s}, + }, + } + } + chainMetadataOp := func(s pb.EditSemantics) *pb.DataAccessRequest { + return &pb.DataAccessRequest{ + Operation: &pb.DataAccessRequest_ChainMetadataEditRequest{ + ChainMetadataEditRequest: &pb.ChainMetadataEditRequest{Semantics: s}, + }, + } + } + contractMetadataOp := func(s pb.EditSemantics) *pb.DataAccessRequest { + return &pb.DataAccessRequest{ + Operation: &pb.DataAccessRequest_ContractMetadataEditRequest{ + ContractMetadataEditRequest: &pb.ContractMetadataEditRequest{Semantics: s}, + }, + } + } + envMetadataOp := func(s pb.EditSemantics) *pb.DataAccessRequest { + return &pb.DataAccessRequest{ + Operation: &pb.DataAccessRequest_EnvironmentMetadataEditRequest{ + EnvironmentMetadataEditRequest: &pb.EnvironmentMetadataEditRequest{Semantics: s}, + }, + } + } + + tests := []struct { + name string + req *pb.DataAccessRequest + expected string + expectedErr string + }{ + // address ref + {name: "add address ref", req: addressRefOp(pb.EditSemantics_SEMANTICS_INSERT), expected: "add address ref"}, + {name: "upsert address ref", req: addressRefOp(pb.EditSemantics_SEMANTICS_UPSERT), expected: "upsert address ref"}, + {name: "update address ref", req: addressRefOp(pb.EditSemantics_SEMANTICS_UPDATE), expected: "update address ref"}, + {name: "delete address ref", req: addressRefOp(pb.EditSemantics_SEMANTICS_DELETE), expected: "delete address ref"}, + // chain metadata + {name: "add chain metadata", req: chainMetadataOp(pb.EditSemantics_SEMANTICS_INSERT), expected: "add chain metadata"}, + {name: "upsert chain metadata", req: chainMetadataOp(pb.EditSemantics_SEMANTICS_UPSERT), expected: "upsert chain metadata"}, + {name: "update chain metadata", req: chainMetadataOp(pb.EditSemantics_SEMANTICS_UPDATE), expected: "update chain metadata"}, + {name: "delete chain metadata", req: chainMetadataOp(pb.EditSemantics_SEMANTICS_DELETE), expected: "delete chain metadata"}, + // contract metadata + {name: "add contract metadata", req: contractMetadataOp(pb.EditSemantics_SEMANTICS_INSERT), expected: "add contract metadata"}, + {name: "upsert contract metadata", req: contractMetadataOp(pb.EditSemantics_SEMANTICS_UPSERT), expected: "upsert contract metadata"}, + {name: "update contract metadata", req: contractMetadataOp(pb.EditSemantics_SEMANTICS_UPDATE), expected: "update contract metadata"}, + {name: "delete contract metadata", req: contractMetadataOp(pb.EditSemantics_SEMANTICS_DELETE), expected: "delete contract metadata"}, + // env metadata + {name: "add env metadata", req: envMetadataOp(pb.EditSemantics_SEMANTICS_INSERT), expected: "add env metadata"}, + {name: "upsert env metadata", req: envMetadataOp(pb.EditSemantics_SEMANTICS_UPSERT), expected: "upsert env metadata"}, + {name: "update env metadata", req: envMetadataOp(pb.EditSemantics_SEMANTICS_UPDATE), expected: "update env metadata"}, + // edge cases + {name: "unknown operation type", req: &pb.DataAccessRequest{}, expectedErr: "unknown operation type"}, + {name: "unknown semantics", req: &pb.DataAccessRequest{ + Operation: &pb.DataAccessRequest_AddressReferenceEditRequest{ + AddressReferenceEditRequest: &pb.AddressReferenceEditRequest{ + Semantics: 100, + }, + }, + }, expectedErr: "unknown semantics"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + op, err := getOpName(tt.req) + + if tt.expectedErr != "" { + require.ErrorContains(t, err, tt.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, tt.expected, op) + } + }) + } +} diff --git a/datastore/catalog/remote/env_metadata_store.go b/datastore/catalog/remote/env_metadata_store.go index 5fcd22750..106118db5 100644 --- a/datastore/catalog/remote/env_metadata_store.go +++ b/datastore/catalog/remote/env_metadata_store.go @@ -223,63 +223,30 @@ func (s *catalogEnvMetadataStore) Set(ctx context.Context, metadata any, opts .. // editRecord is a helper method that handles the edit operation func (s *catalogEnvMetadataStore) editRecord(record datastore.EnvMetadata) error { - // Get the current version for this record version := s.getVersion() - // Create the protobuf record - protoRecord := s.envMetadataToProto(record, version) - // Create edit request with UPSERT semantics (since Set should always work) - editReq := &pb.DataAccessRequest{ + req := &pb.DataAccessRequest{ Operation: &pb.DataAccessRequest_EnvironmentMetadataEditRequest{ EnvironmentMetadataEditRequest: &pb.EnvironmentMetadataEditRequest{ - Record: protoRecord, + Record: s.envMetadataToProto(record, version), Semantics: pb.EditSemantics_SEMANTICS_UPSERT, }, }, } - // Create stream with the initial request for HMAC - stream, err := s.client.DataAccess(editReq) - if err != nil { - return fmt.Errorf("failed to create gRPC stream: %w", err) - } - - if sendErr := stream.Send(editReq); sendErr != nil { - return fmt.Errorf("failed to send edit request: %w", sendErr) - } - - // Receive response - resp, err := stream.Recv() - if err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return fmt.Errorf("request canceled or deadline exceeded: %w", err) - } - - return fmt.Errorf("failed to receive response: %w", err) - } - - // Check for errors in the edit response - if statusErr := parseResponseStatus(resp.Status); statusErr != nil { - st, err := parseStatusError(statusErr) - if err != nil { - return err - } - - if st.Code() == codes.Aborted { - return fmt.Errorf("%w: %s", datastore.ErrEnvMetadataStale, statusErr.Error()) - } - - return fmt.Errorf("edit request failed: %w", statusErr) - } + if err := executeEdit(s.client, req, + (*pb.DataAccessResponse).GetEnvironmentMetadataEditResponse, + func(statusErr error, code codes.Code) error { + if code == codes.Aborted { + return fmt.Errorf("%w: %s", datastore.ErrEnvMetadataStale, statusErr.Error()) + } - editResp := resp.GetEnvironmentMetadataEditResponse() - if editResp == nil { - return errors.New("unexpected response type") + return fmt.Errorf("edit request failed: %w", statusErr) + }); err != nil { + return err } - // Update the version cache - increment the version after successful edit - newVersion := s.getVersion() + 1 - s.setVersion(newVersion) + s.setVersion(s.getVersion() + 1) return nil } diff --git a/datastore/catalog/remote/testcontainer_setup.go b/datastore/catalog/remote/testcontainer_setup.go index 9179351f8..004cd52da 100644 --- a/datastore/catalog/remote/testcontainer_setup.go +++ b/datastore/catalog/remote/testcontainer_setup.go @@ -23,7 +23,7 @@ const ( postgresPort = "5432" catalogImageEnv = "CATALOG_SERVICE_IMAGE" catalogImageRepo = "op-catalog-service" - catalogImageTag = "latest" + catalogImageTag = "v0.3.0" catalogPort = "8080" networkName = "chainlink_catalog_network" ) diff --git a/datastore/catalog_syncer.go b/datastore/catalog_syncer.go index 31ed7f4ee..4ccdda3b9 100644 --- a/datastore/catalog_syncer.go +++ b/datastore/catalog_syncer.go @@ -29,6 +29,18 @@ func MergeDataStoreToCatalog(ctx context.Context, sourceDS DataStore, catalog Ca } } + if src, ok := sourceDS.Addresses().(*MemoryAddressRefStore); ok { + for _, dk := range src.DeletedRemoteKeys { + key, keyErr := NewAddressRefKeyFromString(dk) + if keyErr != nil { + return fmt.Errorf("failed to parse address ref key: %w", keyErr) + } + if deleteErr := txCatalog.Addresses().Delete(ctx, key); deleteErr != nil { + return fmt.Errorf("failed to delete address reference from catalog: %w", deleteErr) + } + } + } + // Merge all chain metadata to the catalog chainMetadata, err := sourceDS.ChainMetadata().Fetch() if err != nil { @@ -42,6 +54,18 @@ func MergeDataStoreToCatalog(ctx context.Context, sourceDS DataStore, catalog Ca } } + if src, ok := sourceDS.ChainMetadata().(*MemoryChainMetadataStore); ok { + for _, dk := range src.DeletedRemoteKeys { + key, keyErr := NewChainMetadataKeyFromString(dk) + if keyErr != nil { + return fmt.Errorf("failed to parse chain metadata key: %w", keyErr) + } + if deleteErr := txCatalog.ChainMetadata().Delete(ctx, key); deleteErr != nil { + return fmt.Errorf("failed to delete chain metadata from catalog: %w", deleteErr) + } + } + } + // Merge all contract metadata to the catalog contractMetadata, err := sourceDS.ContractMetadata().Fetch() if err != nil { @@ -55,6 +79,18 @@ func MergeDataStoreToCatalog(ctx context.Context, sourceDS DataStore, catalog Ca } } + if src, ok := sourceDS.ContractMetadata().(*MemoryContractMetadataStore); ok { + for _, dk := range src.DeletedRemoteKeys { + key, keyErr := NewContractMetadataKeyFromString(dk) + if keyErr != nil { + return fmt.Errorf("failed to parse contract metadata key: %w", keyErr) + } + if deleteErr := txCatalog.ContractMetadata().Delete(ctx, key); deleteErr != nil { + return fmt.Errorf("failed to delete contract metadata from catalog: %w", deleteErr) + } + } + } + // Merge environment metadata to the catalog envMetadata, err := sourceDS.EnvMetadata().Get() if err != nil { diff --git a/datastore/catalog_syncer_test.go b/datastore/catalog_syncer_test.go index f81372c3a..208d11768 100644 --- a/datastore/catalog_syncer_test.go +++ b/datastore/catalog_syncer_test.go @@ -78,13 +78,14 @@ func TestMergeDataStoreToCatalog(t *testing.T) { ).Once() // Setup source datastore expectations - fetch from source - mockSourceDS.EXPECT().Addresses().Return(mockSourceAddressStore).Once() + // Each store is called twice: once for Fetch, once for the DeletedRemoteKeys type assertion. + mockSourceDS.EXPECT().Addresses().Return(mockSourceAddressStore).Times(2) mockSourceAddressStore.EXPECT().Fetch().Return(testAddressRefs, nil).Once() - mockSourceDS.EXPECT().ChainMetadata().Return(mockSourceChainStore).Once() + mockSourceDS.EXPECT().ChainMetadata().Return(mockSourceChainStore).Times(2) mockSourceChainStore.EXPECT().Fetch().Return(testChainMetadata, nil).Once() - mockSourceDS.EXPECT().ContractMetadata().Return(mockSourceContractStore).Once() + mockSourceDS.EXPECT().ContractMetadata().Return(mockSourceContractStore).Times(2) mockSourceContractStore.EXPECT().Fetch().Return(testContractMetadata, nil).Once() mockSourceDS.EXPECT().EnvMetadata().Return(mockSourceEnvStore).Once() @@ -139,13 +140,13 @@ func TestMergeDataStoreToCatalog(t *testing.T) { ).Once() // Setup source datastore expectations - mockSourceDS.EXPECT().Addresses().Return(mockSourceAddressStore).Once() + mockSourceDS.EXPECT().Addresses().Return(mockSourceAddressStore).Times(2) mockSourceAddressStore.EXPECT().Fetch().Return([]AddressRef{}, nil).Once() - mockSourceDS.EXPECT().ChainMetadata().Return(mockSourceChainStore).Once() + mockSourceDS.EXPECT().ChainMetadata().Return(mockSourceChainStore).Times(2) mockSourceChainStore.EXPECT().Fetch().Return([]ChainMetadata{}, nil).Once() - mockSourceDS.EXPECT().ContractMetadata().Return(mockSourceContractStore).Once() + mockSourceDS.EXPECT().ContractMetadata().Return(mockSourceContractStore).Times(2) mockSourceContractStore.EXPECT().Fetch().Return([]ContractMetadata{}, nil).Once() mockSourceDS.EXPECT().EnvMetadata().Return(mockSourceEnvStore).Once() @@ -364,13 +365,13 @@ func TestMergeDataStoreToCatalog(t *testing.T) { ).Once() // Setup source datastore expectations - all stores are empty - mockSourceDS.EXPECT().Addresses().Return(mockSourceAddressStore).Once() + mockSourceDS.EXPECT().Addresses().Return(mockSourceAddressStore).Times(2) mockSourceAddressStore.EXPECT().Fetch().Return([]AddressRef{}, nil).Once() - mockSourceDS.EXPECT().ChainMetadata().Return(mockSourceChainStore).Once() + mockSourceDS.EXPECT().ChainMetadata().Return(mockSourceChainStore).Times(2) mockSourceChainStore.EXPECT().Fetch().Return([]ChainMetadata{}, nil).Once() - mockSourceDS.EXPECT().ContractMetadata().Return(mockSourceContractStore).Once() + mockSourceDS.EXPECT().ContractMetadata().Return(mockSourceContractStore).Times(2) mockSourceContractStore.EXPECT().Fetch().Return([]ContractMetadata{}, nil).Once() mockSourceDS.EXPECT().EnvMetadata().Return(mockSourceEnvStore).Once() @@ -405,4 +406,126 @@ func TestMergeDataStoreToCatalog(t *testing.T) { require.Error(t, err) assert.Equal(t, expectedErr, err) }) + + t.Run("merges deletions to catalog", func(t *testing.T) { + t.Parallel() + + mockCatalog := NewMockCatalogStore(t) + mockTxCatalog := NewMockCatalogStore(t) + mockCatalogAddressStore := NewMockMutableRefStoreV2[AddressRefKey, AddressRef](t) + mockCatalogChainStore := NewMockMutableStoreV2[ChainMetadataKey, ChainMetadata](t) + mockCatalogContractStore := NewMockMutableStoreV2[ContractMetadataKey, ContractMetadata](t) + + // Use a real MemoryDataStore so the type assertions in the syncer succeed. + // RemoteDelete populates DeletedRemoteKeys (the field the syncer iterates). + sourceDS := NewMemoryDataStore() + + addrKey := NewAddressRefKey(1, "TestContract", semver.MustParse("1.0.0"), "to-delete") + require.NoError(t, sourceDS.AddressRefStore.RemoteDelete(addrKey)) + + chainKey := NewChainMetadataKey(42) + require.NoError(t, sourceDS.ChainMetadataStore.RemoteDelete(chainKey)) + + contractKey := NewContractMetadataKey(99, "0xbeef") + require.NoError(t, sourceDS.ContractMetadataStore.RemoteDelete(contractKey)) + + mockCatalog.EXPECT().WithTransaction(ctx, mock.Anything).RunAndReturn( + func(ctx context.Context, fn TransactionLogic) error { + return fn(ctx, mockTxCatalog) + }, + ).Once() + + // Fetch returns empty for all stores (records were deleted), so no upserts. + // Delete is called once per deleted key. + mockTxCatalog.EXPECT().Addresses().Return(mockCatalogAddressStore).Once() + mockCatalogAddressStore.EXPECT().Delete(ctx, addrKey).Return(nil).Once() + + mockTxCatalog.EXPECT().ChainMetadata().Return(mockCatalogChainStore).Once() + mockCatalogChainStore.EXPECT().Delete(ctx, chainKey).Return(nil).Once() + + mockTxCatalog.EXPECT().ContractMetadata().Return(mockCatalogContractStore).Once() + mockCatalogContractStore.EXPECT().Delete(ctx, contractKey).Return(nil).Once() + + // Env metadata is not set in a fresh MemoryDataStore, syncer returns nil early. + err := MergeDataStoreToCatalog(ctx, sourceDS.Seal(), mockCatalog) + require.NoError(t, err) + }) + + t.Run("returns error when address ref delete fails", func(t *testing.T) { + t.Parallel() + + mockCatalog := NewMockCatalogStore(t) + mockTxCatalog := NewMockCatalogStore(t) + mockCatalogAddressStore := NewMockMutableRefStoreV2[AddressRefKey, AddressRef](t) + + sourceDS := NewMemoryDataStore() + addrKey := NewAddressRefKey(1, "TestContract", semver.MustParse("1.0.0"), "to-delete") + require.NoError(t, sourceDS.AddressRefStore.RemoteDelete(addrKey)) + + mockCatalog.EXPECT().WithTransaction(ctx, mock.Anything).RunAndReturn( + func(ctx context.Context, fn TransactionLogic) error { + return fn(ctx, mockTxCatalog) + }, + ).Once() + + mockTxCatalog.EXPECT().Addresses().Return(mockCatalogAddressStore).Once() + mockCatalogAddressStore.EXPECT().Delete(ctx, addrKey).Return(errors.New("remote delete failed")).Once() + + err := MergeDataStoreToCatalog(ctx, sourceDS.Seal(), mockCatalog) + require.Error(t, err) + require.ErrorContains(t, err, "failed to delete address reference from catalog") + require.ErrorContains(t, err, "remote delete failed") + }) + + t.Run("returns error when chain metadata delete fails", func(t *testing.T) { + t.Parallel() + + mockCatalog := NewMockCatalogStore(t) + mockTxCatalog := NewMockCatalogStore(t) + mockCatalogChainStore := NewMockMutableStoreV2[ChainMetadataKey, ChainMetadata](t) + + sourceDS := NewMemoryDataStore() + chainKey := NewChainMetadataKey(42) + require.NoError(t, sourceDS.ChainMetadataStore.RemoteDelete(chainKey)) + + mockCatalog.EXPECT().WithTransaction(ctx, mock.Anything).RunAndReturn( + func(ctx context.Context, fn TransactionLogic) error { + return fn(ctx, mockTxCatalog) + }, + ).Once() + + mockTxCatalog.EXPECT().ChainMetadata().Return(mockCatalogChainStore).Once() + mockCatalogChainStore.EXPECT().Delete(ctx, chainKey).Return(errors.New("remote delete failed")).Once() + + err := MergeDataStoreToCatalog(ctx, sourceDS.Seal(), mockCatalog) + require.Error(t, err) + require.ErrorContains(t, err, "failed to delete chain metadata from catalog") + require.ErrorContains(t, err, "remote delete failed") + }) + + t.Run("returns error when contract metadata delete fails", func(t *testing.T) { + t.Parallel() + + mockCatalog := NewMockCatalogStore(t) + mockTxCatalog := NewMockCatalogStore(t) + mockCatalogContractStore := NewMockMutableStoreV2[ContractMetadataKey, ContractMetadata](t) + + sourceDS := NewMemoryDataStore() + contractKey := NewContractMetadataKey(99, "0xbeef") + require.NoError(t, sourceDS.ContractMetadataStore.RemoteDelete(contractKey)) + + mockCatalog.EXPECT().WithTransaction(ctx, mock.Anything).RunAndReturn( + func(ctx context.Context, fn TransactionLogic) error { + return fn(ctx, mockTxCatalog) + }, + ).Once() + + mockTxCatalog.EXPECT().ContractMetadata().Return(mockCatalogContractStore).Once() + mockCatalogContractStore.EXPECT().Delete(ctx, contractKey).Return(errors.New("remote delete failed")).Once() + + err := MergeDataStoreToCatalog(ctx, sourceDS.Seal(), mockCatalog) + require.Error(t, err) + require.ErrorContains(t, err, "failed to delete contract metadata from catalog") + require.ErrorContains(t, err, "remote delete failed") + }) } diff --git a/go.mod b/go.mod index d368fc3e0..54de7ead2 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260121163256-85accaf3d28d github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20250912190424-fd2e35d7deb5 github.com/smartcontractkit/chainlink-protos/job-distributor v0.18.0 - github.com/smartcontractkit/chainlink-protos/op-catalog v0.0.4 + github.com/smartcontractkit/chainlink-protos/op-catalog v0.1.0 github.com/smartcontractkit/chainlink-testing-framework/framework v0.15.18 github.com/smartcontractkit/chainlink-testing-framework/seth v1.51.5 github.com/smartcontractkit/chainlink-ton v0.0.0-20260219201907-054376f21418 diff --git a/go.sum b/go.sum index adaf4f330..547456256 100644 --- a/go.sum +++ b/go.sum @@ -767,8 +767,8 @@ github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-202510021 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b/go.mod h1:dkR2uYg9XYJuT1JASkPzWE51jjFkVb86P7a/yXe5/GM= -github.com/smartcontractkit/chainlink-protos/op-catalog v0.0.4 h1:AEnxv4HM3WD1RbQkRiFyb9cJ6YKAcqBp1CpIcFdZfuo= -github.com/smartcontractkit/chainlink-protos/op-catalog v0.0.4/go.mod h1:PjZD54vr6rIKEKQj6HNA4hllvYI/QpT+Zefj3tqkFAs= +github.com/smartcontractkit/chainlink-protos/op-catalog v0.1.0 h1:hGEJFD2X3oNIPXQbtIPxCJyg5CcKglRCYBmESS+gmeQ= +github.com/smartcontractkit/chainlink-protos/op-catalog v0.1.0/go.mod h1:PjZD54vr6rIKEKQj6HNA4hllvYI/QpT+Zefj3tqkFAs= github.com/smartcontractkit/chainlink-sui v0.0.0-20260205175622-33e65031f9a9 h1:KyPROV+v7P8VdiU7JhVuGLcDlEBsURSpQmSCgNBTY+s= github.com/smartcontractkit/chainlink-sui v0.0.0-20260205175622-33e65031f9a9/go.mod h1:KpEWZJMLwbdMHeHQz9rbkES0vRrx4nk6OQXyhlHb9/8= github.com/smartcontractkit/chainlink-testing-framework/framework v0.15.18 h1:ZPYXn3VvaZhWOyVHFBsKC543EJbL2d4PthQbdL3WgHs=