@@ -25,9 +25,16 @@ import akka.testkit.TestActor.AutoPilot
25
25
import akka .testkit .{ImplicitSender , TestActor , TestDuration , TestKit , TestProbe }
26
26
import com .amazonaws .services .kinesis .clientlibrary .interfaces .IRecordProcessorCheckpointer
27
27
import com .amazonaws .services .kinesis .clientlibrary .lib .worker .Worker
28
- import com .amazonaws .services .kinesis .clientlibrary .types .{InitializationInput , ProcessRecordsInput , UserRecord }
28
+ import com .amazonaws .services .kinesis .clientlibrary .types .{
29
+ InitializationInput ,
30
+ ProcessRecordsInput ,
31
+ UserRecord
32
+ }
29
33
import com .amazonaws .services .kinesis .model .Record
30
- import com .weightwatchers .reactive .kinesis .consumer .ConsumerWorker .{ProcessEvents , ProcessingComplete }
34
+ import com .weightwatchers .reactive .kinesis .consumer .ConsumerWorker .{
35
+ ProcessEvents ,
36
+ ProcessingComplete
37
+ }
31
38
import com .weightwatchers .reactive .kinesis .models .{CompoundSequenceNumber , ConsumerEvent }
32
39
import org .joda .time .{DateTime , DateTimeZone }
33
40
import org .mockito .Mockito
@@ -43,7 +50,7 @@ import scala.concurrent.duration.{DurationDouble, FiniteDuration}
43
50
import scala .concurrent .{Await , Future , Promise }
44
51
45
52
class ConsumerProcessingManagerSpec
46
- extends TestKit (ActorSystem (" checkpoint-worker-spec" ))
53
+ extends TestKit (ActorSystem (" checkpoint-worker-spec" ))
47
54
with ImplicitSender
48
55
with FreeSpecLike
49
56
with Matchers
@@ -61,10 +68,10 @@ class ConsumerProcessingManagerSpec
61
68
Await .result(system.whenTerminated, 5 .seconds)
62
69
}
63
70
64
- " The KinesisRecordProcessingManager " - {
71
+ " The ConsumerProcessingManager " - {
65
72
" Should set the shardId on init" in {
66
- val worker = TestProbe ()
67
- val kcl = mock[Worker ]
73
+ val worker = TestProbe ()
74
+ val kcl = mock[Worker ]
68
75
val shardId = " 12345"
69
76
70
77
val manager = new ConsumerProcessingManager (worker.ref, kcl, batchTimeout)
@@ -84,17 +91,17 @@ class ConsumerProcessingManagerSpec
84
91
val worker = TestProbe ()
85
92
worker.setAutoPilot(workerAutoPilot(workerResponse))
86
93
87
- val kcl = mock[Worker ]
94
+ val kcl = mock[Worker ]
88
95
val shardId = " 12345"
89
96
90
97
val checkpointer = mock[IRecordProcessorCheckpointer ]
91
98
92
- val record1 = buildRecord(" 1" , " payload1" , new Date ())
93
- val record2 = buildRecord(" 2" , " payload2" , new Date ())
94
- val record3 = buildRecord(" 3" , " payload3" , new Date ())
95
- val record4 = buildAggregatedRecord(" 4" , 1 , " payload3" , new Date ())
96
- val record5 = buildAggregatedRecord(" 4" , 2 , " payload3" , new Date ())
97
- val record6 = buildAggregatedRecord(" 4" , 3 , " payload3" , new Date ())
99
+ val record1 = buildRecord(" 1" , " payload1" , new Date ())
100
+ val record2 = buildRecord(" 2" , " payload2" , new Date ())
101
+ val record3 = buildRecord(" 3" , " payload3" , new Date ())
102
+ val record4 = buildAggregatedRecord(" 4" , 1 , " payload3" , new Date ())
103
+ val record5 = buildAggregatedRecord(" 4" , 2 , " payload3" , new Date ())
104
+ val record6 = buildAggregatedRecord(" 4" , 3 , " payload3" , new Date ())
98
105
val records : Seq [Record ] = Seq (record1, record2, record3, record4, record5, record6)
99
106
100
107
val processInput = new ProcessRecordsInput ()
@@ -113,11 +120,11 @@ class ConsumerProcessingManagerSpec
113
120
// validate the probe received the Seq of ConsumerEvents
114
121
val expectedMsg = ProcessEvents (
115
122
ArrayBuffer (toConsumerEvent(record1),
116
- toConsumerEvent(record2),
117
- toConsumerEvent(record3),
118
- toConsumerEvent(record4),
119
- toConsumerEvent(record5),
120
- toConsumerEvent(record6)),
123
+ toConsumerEvent(record2),
124
+ toConsumerEvent(record3),
125
+ toConsumerEvent(record4),
126
+ toConsumerEvent(record5),
127
+ toConsumerEvent(record6)),
121
128
checkpointer,
122
129
shardId
123
130
)
@@ -144,9 +151,9 @@ class ConsumerProcessingManagerSpec
144
151
145
152
" When the response is a failed batch it should shutdown and stop processing" in new ProcessingSetup {
146
153
147
- workerResponse.success(ProcessingComplete (false )) // complete with a failed batch
148
-
149
154
whenReady(processResult) { _ =>
155
+ workerResponse.success(ProcessingComplete (false )) // complete with a failed batch
156
+
150
157
processResult.isCompleted should be(true )
151
158
152
159
manager.shuttingDown.get() should be(true )
@@ -181,10 +188,10 @@ class ConsumerProcessingManagerSpec
181
188
// use a promise to block the response
182
189
val worker = TestProbe ()
183
190
184
- val kcl = mock[Worker ]
191
+ val kcl = mock[Worker ]
185
192
val shardId = " 12345"
186
193
187
- val checkpointer = mock[IRecordProcessorCheckpointer ]
194
+ val checkpointer = mock[IRecordProcessorCheckpointer ]
188
195
val records : Seq [Record ] = Seq (buildRecord(" 1" , " payload1" , new Date ()))
189
196
val processInput = new ProcessRecordsInput ()
190
197
.withCheckpointer(checkpointer)
0 commit comments