Skip to content

Commit

Permalink
fix panic
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored and Sidddddarth committed Oct 25, 2024
1 parent 8e12242 commit 9cb1ff2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
17 changes: 8 additions & 9 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1764,7 +1764,7 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
for _, singularEvent := range gatewayBatchEvent.Batch {
messageId := stringify.Any(singularEvent["messageId"])
payloadFunc := ro.Memoize(func() json.RawMessage {
return getEventFromBatch(batchEvent.EventPayload)
return getEventFromBatch(batchEvent.EventPayload, singularEvent)
})
dedupKey := dedupTypes.KeyValue{
Key: fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId),
Expand Down Expand Up @@ -2325,7 +2325,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans
messageId := stringify.Any(singularEvent["messageId"])

payloadFunc := ro.Memoize(func() json.RawMessage {
return getEventFromBatch(batchEvent.EventPayload)
return getEventFromBatch(batchEvent.EventPayload, singularEvent)
})

if proc.config.enableDedup {
Expand Down Expand Up @@ -3739,14 +3739,13 @@ func (proc *Handle) countPendingEvents(ctx context.Context) error {
})
}

func getEventFromBatch(batch []byte) []byte {
end := []byte(`], "writeKey": `)
start := []byte(`{"batch": [`)
endIndex := bytes.Index(batch, end)
if endIndex == -1 {
panic(string(batch))
func getEventFromBatch(batch []byte, singularEvent types.SingularEventT) []byte {
end := bytes.LastIndex(batch, []byte(`]`))
start := bytes.Index(batch, []byte(`[{`))
if end == -1 || start == -1 {
return getPayloadOld(singularEvent)
}
res := batch[len(start):endIndex]
res := batch[start+1 : end]
return res
}

Expand Down
14 changes: 12 additions & 2 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6985,7 +6985,7 @@ PASS
ok github.com/rudderlabs/rudder-server/processor 3.779s
*/
func BenchmarkGetPayloadOld(b *testing.B) {
payload := []byte(`{"batch": [{"type": "track", "event": "Demo Track", "sentAt": "2019-08-12T05:08:30.909Z", "channel": "android-sdk", "context": {"app": {"name": "RudderAndroidClient", "build": "1", "version": "1.0", "namespace": "com.rudderlabs.android.sdk"}, "device": {"id": "49e4bdd1c280bc00", "name": "generic_x86", "model": "Android SDK built for x86", "manufacturer": "Google"}, "locale": "en-US", "screen": {"width": 1080, "height": 1794, "density": 420}, "traits": {"anonymousId": "49e4bdd1c280bc00"}, "library": {"name": "com.rudderstack.android.sdk.core"}, "network": {"carrier": "Android"}, "user_agent": "Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"}, "rudderId": "90ca6da0-292e-4e79-9880-f8009e0ae4a3", "messageId": "71d09e20-1d96-42d6-8642-d1319b9e3ce9", "properties": {"label": "Demo Label", "value": 5, "testMap": {"t1": "a", "t2": 4}, "category": "Demo Category", "floatVal": 4.501, "testArray": [{"id": "elem1", "value": "e1"}, {"id": "elem2", "value": "e2"}]}, "receivedAt": "2024-10-22T17:30:03.697+05:30", "request_ip": "[::1]", "anonymousId": "anon_id", "integrations": {"All": true}, "originalTimestamp": "2019-08-12T05:08:30.909Z"}], "writeKey": "2Lj2CdRryCKqlWkUtaKV6Kb2NDP", "requestIP": "[::1]", "receivedAt": "2024-10-22T17:30:03.697+05:30"}`)
payload := []byte(`{"batch":[{"_order":{"from":"wrk-deployment-js-6546f8cf66-g7hgz","item":"1","req":"301527","size":"3","thread":"3"},"anonymousId":"36BJBqlido81qfiHe9ju","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"50e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"event_2","integrations":{"All":true},"originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","label":"Demo Label","value":5},"responseTime":"0s","sentAt":"2024-10-25T14:24:02.000+0000","successProbability":1,"type":"track","messageId":"2eae1263-921d-43c2-aeea-8193faf6f616","rudderId":"f20501a9-e741-43b7-82e9-f64f07564d68","receivedAt":"2024-10-25T14:24:03.306Z","request_ip":"10.9.196.79"}],"receivedAt":"2024-10-25T14:24:03.306Z","requestIP":"10.9.196.79","writeKey":"2nVv1Uge9ebQK7pGD2qP9XNY70k"}`)

var gatewayBatchEvent types.GatewayBatchRequest
err := json.Unmarshal(payload, &gatewayBatchEvent)
Expand All @@ -7003,7 +7003,17 @@ func BenchmarkGetPayloadOld(b *testing.B) {
b.ResetTimer()
b.Run("GetPayload", func(b *testing.B) {
for i := 0; i < b.N; i++ {
getEventFromBatch(payload)
getEventFromBatch(payload, nil)
}
})
}

func TestGetEventFromBatch(t *testing.T) {
payload := []byte(`{"batch": [{"_order":{"from":"wrk-deployment-js-6546f8cf66-g7hgz","item":"1","req":"301527","size":"3","thread":"3"},"anonymousId":"36BJBqlido81qfiHe9ju","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"50e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"event_2","integrations":{"All":true},"originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","label":"Demo Label","value":5},"responseTime":"0s","sentAt":"2024-10-25T14:24:02.000+0000","successProbability":1,"type":"track","messageId":"2eae1263-921d-43c2-aeea-8193faf6f616","rudderId":"f20501a9-e741-43b7-82e9-f64f07564d68","receivedAt":"2024-10-25T14:24:03.306Z","request_ip":"10.9.196.79"}],"receivedAt":"2024-10-25T14:24:03.306Z","requestIP":"10.9.196.79","writeKey":"2nVv1Uge9ebQK7pGD2qP9XNY70k"}`)
event := getEventFromBatch(payload, nil)
require.Equal(t, `{"_order":{"from":"wrk-deployment-js-6546f8cf66-g7hgz","item":"1","req":"301527","size":"3","thread":"3"},"anonymousId":"36BJBqlido81qfiHe9ju","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"50e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"event_2","integrations":{"All":true},"originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","label":"Demo Label","value":5},"responseTime":"0s","sentAt":"2024-10-25T14:24:02.000+0000","successProbability":1,"type":"track","messageId":"2eae1263-921d-43c2-aeea-8193faf6f616","rudderId":"f20501a9-e741-43b7-82e9-f64f07564d68","receivedAt":"2024-10-25T14:24:03.306Z","request_ip":"10.9.196.79"}`, string(event))

payload = []byte(`{"writeKey":"2nVv1Uge9ebQK7pGD2qP9XNY70k", "batch": [{"_order":{"from":"wrk-deployment-js-6546f8cf66-g7hgz","item":"1","req":"301527","size":"3","thread":"3"},"anonymousId":"36BJBqlido81qfiHe9ju","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"50e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"event_2","integrations":{"All":true},"originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","label":"Demo Label","value":5},"responseTime":"0s","sentAt":"2024-10-25T14:24:02.000+0000","successProbability":1,"type":"track","messageId":"2eae1263-921d-43c2-aeea-8193faf6f616","rudderId":"f20501a9-e741-43b7-82e9-f64f07564d68","receivedAt":"2024-10-25T14:24:03.306Z","request_ip":"10.9.196.79"}],"receivedAt":"2024-10-25T14:24:03.306Z","requestIP":"10.9.196.79"}`)
event = getEventFromBatch(payload, nil)
require.Equal(t, `{"_order":{"from":"wrk-deployment-js-6546f8cf66-g7hgz","item":"1","req":"301527","size":"3","thread":"3"},"anonymousId":"36BJBqlido81qfiHe9ju","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"50e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"event_2","integrations":{"All":true},"originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","label":"Demo Label","value":5},"responseTime":"0s","sentAt":"2024-10-25T14:24:02.000+0000","successProbability":1,"type":"track","messageId":"2eae1263-921d-43c2-aeea-8193faf6f616","rudderId":"f20501a9-e741-43b7-82e9-f64f07564d68","receivedAt":"2024-10-25T14:24:03.306Z","request_ip":"10.9.196.79"}`, string(event))
}

0 comments on commit 9cb1ff2

Please sign in to comment.