Skip to content

Commit

Permalink
Make MatchFiltersForEncodedEvent thread safe
Browse files Browse the repository at this point in the history
Also: add warnings to methods which are not intended to be called concurrently
  • Loading branch information
reductionista committed Jan 17, 2025
1 parent 4b77708 commit 11d5d68
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func newFilters(lggr logger.SugaredLogger, orm ORM) *filters {
}

// IncrementSeqNum increments the sequence number for a filterID and returns the new
// number. This means the sequence number assigned to the first log matched after registration will be 1
// number. This means the sequence number assigned to the first log matched after registration will be 1.
// WARNING: not thread safe, should only be called while fl.filtersMutex is locked, and after filters have been loaded.
func (fl *filters) IncrementSeqNum(filterID int64) int64 {
fl.seqNums[filterID]++
return fl.seqNums[filterID]
Expand Down Expand Up @@ -270,24 +271,31 @@ func (fl *filters) matchingFilters(addr PublicKey, eventSignature EventSignature
// this will be called on every new event that happens on the blockchain, so it's important it returns immediately if it
// doesn't match any registered filters.
func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter] {
if _, ok := fl.knownPrograms[event.Program]; !ok {
return nil
}

// If this log message corresponds to an anchor event, then it must begin with an 8 byte discriminator,
// which will appear as the first 11 bytes of base64-encoded data. Standard base64 encoding RFC requires
// that any base64-encoded string must be padding with the = char to make its length a multiple of 4, so
// 12 is the minimum length for a valid anchor event.
if len(event.Data) < 12 {
return nil
}
isKnown := func() (ok bool) {
fl.filtersMutex.RLock()
defer fl.filtersMutex.RUnlock()

if _, ok = fl.knownPrograms[event.Program]; !ok {
return ok
}

// The first 64-bits of the event data is the event sig. Because it's base64 encoded, this corresponds to
// the first 10 characters plus 4 bits of the 11th character. We can quickly rule it out as not matching any known
// discriminators if the first 10 characters don't match. If it passes that initial test, we base64-decode the
// first 12 characters, and use the first 8 bytes of that as the event sig to call MatchingFilters. The address
// also needs to be base58-decoded to pass to MatchingFilters
_, ok = fl.knownDiscriminators[event.Data[:10]]
return ok
}

// The first 64-bits of the event data is the event sig. Because it's base64 encoded, this corresponds to
// the first 10 characters plus 4 bits of the 11th character. We can quickly rule it out as not matching any known
// discriminators if the first 10 characters don't match. If it passes that initial test, we base64-decode the
// first 12 characters, and use the first 8 bytes of that as the event sig to call MatchingFilters. The address
// also needs to be base58-decoded to pass to MatchingFilters
if _, ok := fl.knownDiscriminators[event.Data[:10]]; !ok {
if !isKnown() {
return nil
}

Expand Down Expand Up @@ -421,6 +429,9 @@ func (fl *filters) LoadFilters(ctx context.Context) error {
return nil
}

// DecodeSubKey accepts raw Borsh-encoded event data, a filter ID and a subkeyPath. It uses the decoder
// associated with that filter to decode the event and extract the subkey value from the specified subKeyPath.
// WARNING: not thread safe, should only be called while fl.filtersMutex is held and after filters have been loaded.
func (fl *filters) DecodeSubKey(ctx context.Context, raw []byte, ID int64, subKeyPath []string) (any, error) {
filter, ok := fl.filtersByID[ID]
if !ok {
Expand Down

0 comments on commit 11d5d68

Please sign in to comment.