32
32
import org .apache .kafka .streams .errors .InvalidStateStoreException ;
33
33
import org .apache .kafka .streams .kstream .Consumed ;
34
34
import org .apache .kafka .streams .kstream .TimeWindowedDeserializer ;
35
- import org .apache .kafka .streams .kstream .Transformer ;
36
35
import org .apache .kafka .streams .kstream .Windowed ;
37
36
import org .apache .kafka .streams .kstream .internals .TimeWindow ;
38
- import org .apache .kafka .streams .processor .ProcessorContext ;
39
37
import org .apache .kafka .streams .processor .StateStoreContext ;
38
+ import org .apache .kafka .streams .processor .api .Processor ;
39
+ import org .apache .kafka .streams .processor .api .ProcessorContext ;
40
+ import org .apache .kafka .streams .processor .api .Record ;
40
41
import org .apache .kafka .streams .processor .internals .MockStreamsMetrics ;
41
42
import org .apache .kafka .streams .processor .internals .ProcessorRecordContext ;
42
43
import org .apache .kafka .streams .query .Position ;
@@ -102,7 +103,7 @@ public class CachingPersistentWindowStoreTest {
102
103
private static final String TOPIC = "topic" ;
103
104
private static final String CACHE_NAMESPACE = "0_0-store-name" ;
104
105
105
- private InternalMockProcessorContext context ;
106
+ private InternalMockProcessorContext <?, ?> context ;
106
107
private RocksDBSegmentedBytesStore bytesStore ;
107
108
private WindowStore <Bytes , byte []> underlyingStore ;
108
109
private CachingWindowStore cachingStore ;
@@ -138,8 +139,8 @@ public void shouldDelegateDeprecatedInit() {
138
139
final WindowStore <Bytes , byte []> inner = mock (WindowStore .class );
139
140
final CachingWindowStore outer = new CachingWindowStore (inner , WINDOW_SIZE , SEGMENT_INTERVAL );
140
141
when (inner .name ()).thenReturn ("store" );
141
- outer .init ((ProcessorContext ) context , outer );
142
- verify (inner ).init ((ProcessorContext ) context , outer );
142
+ outer .init ((org . apache . kafka . streams . processor . ProcessorContext ) context , outer );
143
+ verify (inner ).init ((org . apache . kafka . streams . processor . ProcessorContext ) context , outer );
143
144
}
144
145
145
146
@ SuppressWarnings ("unchecked" )
@@ -153,30 +154,28 @@ public void shouldDelegateInit() {
153
154
}
154
155
155
156
@ Test
156
- @ SuppressWarnings ("deprecation" )
157
157
public void shouldNotReturnDuplicatesInRanges () {
158
158
final StreamsBuilder builder = new StreamsBuilder ();
159
159
160
160
final StoreBuilder <WindowStore <String , String >> storeBuilder = Stores .windowStoreBuilder (
161
- Stores .persistentWindowStore ("store-name" , ofHours (1L ), ofMinutes (1L ), false ),
162
- Serdes .String (),
163
- Serdes .String ())
161
+ Stores .persistentWindowStore ("store-name" , ofHours (1L ), ofMinutes (1L ), false ),
162
+ Serdes .String (),
163
+ Serdes .String ())
164
164
.withCachingEnabled ();
165
165
166
166
builder .addStateStore (storeBuilder );
167
167
168
168
builder .stream (TOPIC ,
169
169
Consumed .with (Serdes .String (), Serdes .String ()))
170
- .transform (() -> new Transformer <String , String , KeyValue < String , String > >() {
170
+ .process (() -> new Processor <String , String , String , String >() {
171
171
private WindowStore <String , String > store ;
172
172
private int numRecordsProcessed ;
173
- private ProcessorContext context ;
173
+ private ProcessorContext < String , String > context ;
174
174
175
- @ SuppressWarnings ("unchecked" )
176
175
@ Override
177
- public void init (final ProcessorContext processorContext ) {
176
+ public void init (final ProcessorContext < String , String > processorContext ) {
178
177
this .context = processorContext ;
179
- this .store = ( WindowStore < String , String >) processorContext .getStateStore ("store-name" );
178
+ this .store = processorContext .getStateStore ("store-name" );
180
179
int count = 0 ;
181
180
182
181
try (final KeyValueIterator <Windowed <String >, String > all = store .all ()) {
@@ -190,7 +189,7 @@ public void init(final ProcessorContext processorContext) {
190
189
}
191
190
192
191
@ Override
193
- public KeyValue < String , String > transform (final String key , final String value ) {
192
+ public void process (final Record < String , String > record ) {
194
193
int count = 0 ;
195
194
196
195
try (final KeyValueIterator <Windowed <String >, String > all = store .all ()) {
@@ -202,22 +201,18 @@ public KeyValue<String, String> transform(final String key, final String value)
202
201
203
202
assertThat (count , equalTo (numRecordsProcessed ));
204
203
205
- store .put (value , value , context .timestamp ());
204
+ store .put (record . value (), record . value (), record .timestamp ());
206
205
207
206
numRecordsProcessed ++;
208
207
209
- return new KeyValue <>(key , value );
210
- }
211
-
212
- @ Override
213
- public void close () {
208
+ context .forward (record );
214
209
}
215
210
}, "store-name" );
216
211
217
212
final Properties streamsConfiguration = new Properties ();
218
213
streamsConfiguration .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
219
- streamsConfiguration .put (StreamsConfig .DEFAULT_KEY_SERDE_CLASS_CONFIG , Serdes .String (). getClass (). getName () );
220
- streamsConfiguration .put (StreamsConfig .DEFAULT_VALUE_SERDE_CLASS_CONFIG , Serdes .String (). getClass (). getName () );
214
+ streamsConfiguration .put (StreamsConfig .DEFAULT_KEY_SERDE_CLASS_CONFIG , Serdes .StringSerde . class );
215
+ streamsConfiguration .put (StreamsConfig .DEFAULT_VALUE_SERDE_CLASS_CONFIG , Serdes .StringSerde . class );
221
216
streamsConfiguration .put (StreamsConfig .STATE_DIR_CONFIG , TestUtils .tempDirectory ().getPath ());
222
217
streamsConfiguration .put (StreamsConfig .COMMIT_INTERVAL_MS_CONFIG , 10 * 1000L );
223
218
0 commit comments