@@ -194,7 +194,6 @@ def _test_transform_with_state_basic(
        q.awaitTermination(10)
        self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_basic(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -211,7 +210,6 @@ def check_results(batch_df, batch_id):

        self._test_transform_with_state_basic(SimpleStatefulProcessorFactory(), check_results)

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_non_exist_value_state(self):
        def check_results(batch_df, _):
            batch_df.collect()
@@ -224,7 +222,6 @@ def check_results(batch_df, _):
            InvalidSimpleStatefulProcessorFactory(), check_results, True
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_query_restarts(self):
        root_path = tempfile.mkdtemp()
        input_path = root_path + "/input"
@@ -299,7 +296,6 @@ def test_transform_with_state_query_restarts(self):
            Row(id="1", countAsString="2"),
        }

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_list_state(self):
        def check_results(batch_df, _):
            batch_df.collect()
@@ -312,7 +308,6 @@ def check_results(batch_df, _):
            ListStateProcessorFactory(), check_results, True, "processingTime"
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_list_state_large_list(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -388,7 +383,6 @@ def check_results(batch_df, batch_id):
        self.assertTrue(q.exception() is None)

    # test list state with ttl has the same behavior as list state when state doesn't expire.
-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_list_state_large_ttl(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -401,7 +395,6 @@ def check_results(batch_df, batch_id):
            ListStateLargeTTLProcessorFactory(), check_results, True, "processingTime"
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_map_state(self):
        def check_results(batch_df, _):
            batch_df.collect()
@@ -413,7 +406,6 @@ def check_results(batch_df, _):
        self._test_transform_with_state_basic(MapStateProcessorFactory(), check_results, True)

    # test map state with ttl has the same behavior as map state when state doesn't expire.
-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_map_state_large_ttl(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -428,7 +420,6 @@ def check_results(batch_df, batch_id):

    # test value state with ttl has the same behavior as value state when
    # state doesn't expire.
-    @unittest.skip("Temporarily disabled for testing")
    def test_value_state_ttl_basic(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -449,7 +440,6 @@ def check_results(batch_df, batch_id):

    # TODO SPARK-50908 holistic fix for TTL suite
    @unittest.skip("test is flaky and it is only a timing issue, skipping until we can resolve")
-    @unittest.skip("Temporarily disabled for testing")
    def test_value_state_ttl_expiration(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -599,7 +589,6 @@ def _test_transform_with_state_proc_timer(self, stateful_processor_factory, chec
        q.awaitTermination(10)
        self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_proc_timer(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -717,7 +706,6 @@ def prepare_batch3(input_path):
        q.awaitTermination(10)
        self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_event_time(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -750,7 +738,6 @@ def check_results(batch_df, batch_id):
            EventTimeStatefulProcessorFactory(), check_results
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_with_wmark_and_non_event_time(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -849,7 +836,6 @@ def _test_transform_with_state_init_state(
        q.awaitTermination(10)
        self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_init_state(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -874,7 +860,6 @@ def check_results(batch_df, batch_id):
            SimpleStatefulProcessorWithInitialStateFactory(), check_results
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_init_state_with_extra_transformation(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -954,7 +939,6 @@ def _test_transform_with_state_non_contiguous_grouping_cols(
        q.awaitTermination(10)
        self.assertTrue(q.exception() is None)

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_non_contiguous_grouping_cols(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -967,7 +951,6 @@ def check_results(batch_df, batch_id):
            SimpleStatefulProcessorWithInitialStateFactory(), check_results
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_non_contiguous_grouping_cols_with_init_state(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -1051,7 +1034,6 @@ def _test_transform_with_state_chaining_ops(
        q.processAllAvailable()
        q.awaitTermination(10)

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_chaining_ops(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -1088,7 +1070,6 @@ def check_results(batch_df, batch_id):
            ["outputTimestamp", "id"],
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_init_state_with_timers(self):
        def check_results(batch_df, batch_id):
            batch_df.collect()
@@ -1119,7 +1100,6 @@ def check_results(batch_df, batch_id):
            StatefulProcessorWithInitialStateTimersFactory(), check_results, "processingTime"
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_batch_query(self):
        data = [("0", 123), ("0", 46), ("1", 146), ("1", 346)]
        df = self.spark.createDataFrame(data, "id string, temperature int")
@@ -1151,7 +1131,6 @@ def test_transform_with_state_batch_query(self):
            Row(id="1", countAsString="2"),
        }

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_batch_query_initial_state(self):
        data = [("0", 123), ("0", 46), ("1", 146), ("1", 346)]
        df = self.spark.createDataFrame(data, "id string, temperature int")
@@ -1196,11 +1175,9 @@ def test_transform_with_state_batch_query_initial_state(self):
    @unittest.skipIf(
        "COVERAGE_PROCESS_START" in os.environ, "Flaky with coverage enabled, skipping for now."
    )
-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_map_state_metadata(self):
        self._test_transform_with_map_state_metadata(None)

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_map_state_metadata_with_init_state(self):
        # run the same test suite again but with no-op initial state
        # TWS with initial state is using a different python runner
@@ -1333,7 +1310,6 @@ def check_results(batch_df, batch_id):
        )

    # This test covers multiple list state variables and flatten option
-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_list_state_metadata(self):
        checkpoint_path = tempfile.mktemp()

@@ -1414,7 +1390,6 @@ def check_results(batch_df, batch_id):

    # This test covers value state variable and read change feed,
    # snapshotStartBatchId related options
-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_value_state_metadata(self):
        checkpoint_path = tempfile.mktemp()

@@ -1505,7 +1480,6 @@ def check_results(batch_df, batch_id):
            checkpoint_path=checkpoint_path,
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_restart_with_multiple_rows_init_state(self):
        def check_results(batch_df, _):
            batch_df.collect()
@@ -1565,7 +1539,6 @@ def dataframe_to_value_list(output_df):
            initial_state=init_df,
        )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_in_pandas_composite_type(self):
        def check_results(batch_df, batch_id):
            if batch_id == 0:
@@ -1624,7 +1597,6 @@ def check_results(batch_df, batch_id):
        )

    # run the same test suites again but with single shuffle partition
-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_with_timers_single_partition(self):
        with self.sql_conf({"spark.sql.shuffle.partitions": "1"}):
            self.test_transform_with_state_init_state_with_timers()
@@ -1673,7 +1645,6 @@ def _run_evolution_test(self, processor_factory, checkpoint_dir, check_results,
        q.processAllAvailable()
        q.awaitTermination(10)

-    @unittest.skip("Temporarily disabled for testing")
    def test_schema_evolution_scenarios(self):
        """Test various schema evolution scenarios"""
        with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}):
@@ -1742,7 +1713,6 @@ def check_upcast(batch_df, batch_id):

    # This test case verifies that an exception is thrown when downcasting, which violates
    # Avro's schema evolution rules
-    @unittest.skip("Temporarily disabled for testing")
    def test_schema_evolution_fails(self):
        with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}):
            with tempfile.TemporaryDirectory() as checkpoint_dir:
@@ -1795,7 +1765,6 @@ def check_basic_state(batch_df, batch_id):
                and "Schema evolution is not possible" in error_msg
            )

-    @unittest.skip("Temporarily disabled for testing")
    def test_not_nullable_fails(self):
        with self.sql_conf({"spark.sql.streaming.stateStore.encodingFormat": "avro"}):
            with tempfile.TemporaryDirectory() as checkpoint_dir:
@@ -1828,7 +1797,6 @@ def check_basic_state(batch_df, batch_id):
                and "column family state must be nullable" in error_msg
            )

-    @unittest.skip("Temporarily disabled for testing")
    def test_transform_with_state_int_to_decimal_coercion(self):
        if not self.use_pandas():
            return