1+ package grpc
2+
3+ import (
4+ "testing"
5+ "time"
6+
7+ "github.com/netobserv/loki-client-go/pkg/logproto"
8+ "github.com/prometheus/common/model"
9+ "github.com/stretchr/testify/assert"
10+ "github.com/stretchr/testify/require"
11+ )
12+
13+ func TestNewBatch (t * testing.T ) {
14+ tenantID := "test-tenant"
15+
16+ // Test empty batch
17+ b := newBatch (tenantID )
18+ assert .Equal (t , tenantID , b .tenantID )
19+ assert .Equal (t , 0 , b .bytes )
20+ assert .True (t , b .isEmpty ())
21+ assert .Equal (t , 0 , b .streamCount ())
22+ assert .Equal (t , 0 , b .entryCount ())
23+
24+ // Test batch with initial entries
25+ entry1 := entry {
26+ tenantID : tenantID ,
27+ labels : model.LabelSet {"job" : "test" },
28+ Entry : logproto.Entry {
29+ Timestamp : time .Now (),
30+ Line : "test log line 1" ,
31+ },
32+ }
33+
34+ entry2 := entry {
35+ tenantID : tenantID ,
36+ labels : model.LabelSet {"job" : "test" },
37+ Entry : logproto.Entry {
38+ Timestamp : time .Now (),
39+ Line : "test log line 2" ,
40+ },
41+ }
42+
43+ b2 := newBatch (tenantID , entry1 , entry2 )
44+ assert .Equal (t , tenantID , b2 .tenantID )
45+ assert .Equal (t , len ("test log line 1" )+ len ("test log line 2" ), b2 .bytes )
46+ assert .False (t , b2 .isEmpty ())
47+ assert .Equal (t , 1 , b2 .streamCount ()) // Same labels, so same stream
48+ assert .Equal (t , 2 , b2 .entryCount ())
49+ }
50+
51+ func TestBatchAdd (t * testing.T ) {
52+ tenantID := "test-tenant"
53+ b := newBatch (tenantID )
54+
55+ entry := entry {
56+ tenantID : tenantID ,
57+ labels : model.LabelSet {"job" : "test" , "instance" : "localhost" },
58+ Entry : logproto.Entry {
59+ Timestamp : time .Now (),
60+ Line : "test log line" ,
61+ },
62+ }
63+
64+ // Add first entry
65+ b .add (entry )
66+ assert .Equal (t , len ("test log line" ), b .bytes )
67+ assert .Equal (t , 1 , b .streamCount ())
68+ assert .Equal (t , 1 , b .entryCount ())
69+
70+ // Add entry with same labels (should go to same stream)
71+ entry2 := entry
72+ entry2 .Line = "another line"
73+ b .add (entry2 )
74+ assert .Equal (t , len ("test log line" )+ len ("another line" ), b .bytes )
75+ assert .Equal (t , 1 , b .streamCount ())
76+ assert .Equal (t , 2 , b .entryCount ())
77+
78+ // Add entry with different labels (should create new stream)
79+ entry3 := entry
80+ entry3 .labels = model.LabelSet {"job" : "different" }
81+ entry3 .Line = "different stream"
82+ b .add (entry3 )
83+ assert .Equal (t , len ("test log line" )+ len ("another line" )+ len ("different stream" ), b .bytes )
84+ assert .Equal (t , 2 , b .streamCount ())
85+ assert .Equal (t , 3 , b .entryCount ())
86+ }
87+
88+ func TestBatchSizeBytes (t * testing.T ) {
89+ tenantID := "test-tenant"
90+ b := newBatch (tenantID )
91+
92+ entry := entry {
93+ tenantID : tenantID ,
94+ labels : model.LabelSet {"job" : "test" },
95+ Entry : logproto.Entry {
96+ Timestamp : time .Now (),
97+ Line : "test" ,
98+ },
99+ }
100+
101+ assert .Equal (t , 0 , b .sizeBytes ())
102+
103+ expectedSize := len ("test" )
104+ assert .Equal (t , expectedSize , b .sizeBytesAfter (entry ))
105+
106+ b .add (entry )
107+ assert .Equal (t , expectedSize , b .sizeBytes ())
108+ }
109+
110+ func TestBatchAge (t * testing.T ) {
111+ tenantID := "test-tenant"
112+ b := newBatch (tenantID )
113+
114+ // Should be very recent
115+ age := b .age ()
116+ assert .True (t , age < 100 * time .Millisecond )
117+
118+ // Wait a bit and check again
119+ time .Sleep (10 * time .Millisecond )
120+ age2 := b .age ()
121+ assert .True (t , age2 > age )
122+ }
123+
124+ func TestCreatePushRequest (t * testing.T ) {
125+ tenantID := "test-tenant"
126+ timestamp := time .Now ()
127+
128+ entry1 := entry {
129+ tenantID : tenantID ,
130+ labels : model.LabelSet {"job" : "test1" },
131+ Entry : logproto.Entry {
132+ Timestamp : timestamp ,
133+ Line : "line 1" ,
134+ },
135+ }
136+
137+ entry2 := entry {
138+ tenantID : tenantID ,
139+ labels : model.LabelSet {"job" : "test1" },
140+ Entry : logproto.Entry {
141+ Timestamp : timestamp .Add (time .Second ),
142+ Line : "line 2" ,
143+ },
144+ }
145+
146+ entry3 := entry {
147+ tenantID : tenantID ,
148+ labels : model.LabelSet {"job" : "test2" },
149+ Entry : logproto.Entry {
150+ Timestamp : timestamp .Add (2 * time .Second ),
151+ Line : "line 3" ,
152+ },
153+ }
154+
155+ b := newBatch (tenantID , entry1 , entry2 , entry3 )
156+
157+ req , entriesCount := b .createPushRequest ()
158+ require .NotNil (t , req )
159+ assert .Equal (t , 3 , entriesCount )
160+ assert .Equal (t , 2 , len (req .Streams )) // Two different label sets
161+
162+ // Check streams
163+ streamsByLabel := make (map [string ]logproto.Stream )
164+ for _ , stream := range req .Streams {
165+ streamsByLabel [stream .Labels ] = stream
166+ }
167+
168+ // Check first stream (job=test1)
169+ stream1 , exists := streamsByLabel [`{job="test1"}` ]
170+ require .True (t , exists )
171+ assert .Equal (t , 2 , len (stream1 .Entries ))
172+ assert .Equal (t , "line 1" , stream1 .Entries [0 ].Line )
173+ assert .Equal (t , "line 2" , stream1 .Entries [1 ].Line )
174+
175+ // Check second stream (job=test2)
176+ stream2 , exists := streamsByLabel [`{job="test2"}` ]
177+ require .True (t , exists )
178+ assert .Equal (t , 1 , len (stream2 .Entries ))
179+ assert .Equal (t , "line 3" , stream2 .Entries [0 ].Line )
180+ }
181+
182+ func TestBatchIsEmpty (t * testing.T ) {
183+ tenantID := "test-tenant"
184+ b := newBatch (tenantID )
185+ assert .True (t , b .isEmpty ())
186+
187+ entry := entry {
188+ tenantID : tenantID ,
189+ labels : model.LabelSet {"job" : "test" },
190+ Entry : logproto.Entry {
191+ Timestamp : time .Now (),
192+ Line : "test" ,
193+ },
194+ }
195+
196+ b .add (entry )
197+ assert .False (t , b .isEmpty ())
198+ }
0 commit comments