
Commit ab4bc47

[FLINK-10632][e2e] Running general purpose testing job with failure in per-job mode

1 parent 25d03b6 commit ab4bc47

File tree

6 files changed: +269 −33 lines


flink-end-to-end-tests/run-nightly-tests.sh

+5
@@ -61,6 +61,11 @@ run_test "Running HA (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts
 run_test "Running HA (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true false" "skip_check_exceptions"
 run_test "Running HA (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_datastream.sh rocks true true" "skip_check_exceptions"
 
+run_test "Running HA per-job cluster (file, async) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh file true false" "skip_check_exceptions"
+run_test "Running HA per-job cluster (file, sync) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh file false false" "skip_check_exceptions"
+run_test "Running HA per-job cluster (rocks, non-incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh rocks true false" "skip_check_exceptions"
+run_test "Running HA per-job cluster (rocks, incremental) end-to-end test" "$END_TO_END_DIR/test-scripts/test_ha_per_job_cluster_datastream.sh rocks true true" "skip_check_exceptions"
+
 run_test "Resuming Savepoint (file, async, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file true"
 run_test "Resuming Savepoint (file, sync, no parallelism change) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 file false"
 run_test "Resuming Savepoint (file, async, scale up) end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true"

flink-end-to-end-tests/test-scripts/common.sh

+35 −4
@@ -33,6 +33,7 @@ case "$(uname -s)" in
 esac
 
 export EXIT_CODE=0
+export TASK_SLOTS_PER_TM_HA=4
 
 echo "Flink dist directory: $FLINK_DIR"
 
@@ -123,7 +124,7 @@ function create_ha_config() {
 jobmanager.rpc.port: 6123
 jobmanager.heap.mb: 1024
 taskmanager.heap.mb: 1024
-taskmanager.numberOfTaskSlots: 4
+taskmanager.numberOfTaskSlots: ${TASK_SLOTS_PER_TM_HA}
 
 #==============================================================================
 # High Availability
@@ -238,9 +239,7 @@ function start_local_zk {
     done < <(grep "^server\." "${FLINK_DIR}/conf/zoo.cfg")
 }
 
-function start_cluster {
-  "$FLINK_DIR"/bin/start-cluster.sh
-
+function wait_dispatcher_running {
   # wait at most 10 seconds until the dispatcher is up
   local QUERY_URL
   if [ "x$USE_SSL" = "xON" ]; then
@@ -264,6 +263,11 @@ function start_cluster {
   done
 }
 
+function start_cluster {
+  "$FLINK_DIR"/bin/start-cluster.sh
+  wait_dispatcher_running
+}
+
 function start_taskmanagers {
   tmnum=$1
   echo "Start ${tmnum} more task managers"
@@ -599,6 +603,33 @@ function wait_oper_metric_num_in_records {
   done
 }
 
+function wait_num_of_occurence_in_logs {
+  local text=$1
+  local number=$2
+  local logs
+  if [ -z "$3" ]; then
+    logs="standalonesession"
+  else
+    logs="$3"
+  fi
+
+  echo "Waiting for text ${text} to appear ${number} of times in logs..."
+
+  while : ; do
+    N=$(grep -o "${text}" $FLINK_DIR/log/*${logs}*.log | wc -l)
+
+    if [ -z $N ]; then
+      N=0
+    fi
+
+    if (( N < number )); then
+      sleep 1
+    else
+      break
+    fi
+  done
+}
+
 function wait_num_checkpoints {
   JOB=$1
   NUM_CHECKPOINTS=$2
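
The new wait_num_of_occurence_in_logs polls the matching log files once per second until the given pattern has appeared at least the requested number of times; test_ha_datastream.sh further down in this commit uses it to replace fixed sleeps. Usage as it appears later in the diff:

  # block until two completed checkpoints for the job are logged by the standalonesession JM
  wait_num_of_occurence_in_logs "Completed checkpoint [1-9]* for job ${JOB_ID}" 2 "standalonesession"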

flink-end-to-end-tests/test-scripts/common_ha.sh

+31 −9
@@ -24,7 +24,7 @@ CLEARED=0
 JM_WATCHDOG_PID=0
 TM_WATCHDOG_PID=0
 
-function stop_cluster_and_watchdog() {
+function stop_watchdogs() {
   if [ ${CLEARED} -eq 0 ]; then
 
     if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
@@ -43,6 +43,15 @@ function stop_cluster_and_watchdog() {
   fi
 }
 
+function verify_num_occurences_in_logs() {
+  local log_pattern="$1"
+  local text="$2"
+  local expected_no="$3"
+
+  local actual_no=$(grep -r --include "*${log_pattern}*.log" -e "${text}" "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l)
+  [[ "${expected_no}" -eq "${actual_no}" ]]
+}
+
 function verify_logs() {
   local OUTPUT=$FLINK_DIR/log/*.out
   local JM_FAILURES=$1
@@ -56,14 +65,14 @@ function verify_logs() {
   fi
 
   # checks that all apart from the first JM recover the failed jobgraph.
-  if ! [ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then
+  if ! verify_num_occurences_in_logs 'standalonesession' 'Recovered SubmittedJobGraph' ${JM_FAILURES}; then
     echo "FAILURE: A JM did not take over."
     EXIT_CODE=1
   fi
 
   if [ "$VERIFY_CHECKPOINTS" = true ]; then
     # search the logs for JMs that log completed checkpoints
-    if ! [ `grep -r --include '*standalonesession*.log' 'Completed checkpoint' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((JM_FAILURES + 1)) ]; then
+    if ! verify_num_occurences_in_logs 'standalonesession' 'Completed checkpoint' $((JM_FAILURES + 1)); then
       echo "FAILURE: A JM did not execute the job."
       EXIT_CODE=1
     fi
@@ -77,26 +86,39 @@ function verify_logs() {
 
 function jm_watchdog() {
   local EXPECTED_JMS=$1
-  local IP_PORT=$2
+  local PROCESS_NAME=$2
 
   while true; do
-    local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`;
+    local RUNNING_JMS=`jps | grep "${PROCESS_NAME}" | wc -l`;
     local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
     for (( c=0; c<MISSING_JMS; c++ )); do
-      "$FLINK_DIR"/bin/jobmanager.sh start "localhost" ${IP_PORT}
+      ${@:3}
     done
     sleep 1;
   done
 }
 
+function start_jm_cmd {
+  local IP_PORT=$1
+  "$FLINK_DIR/bin/jobmanager.sh" "start" "localhost" "${IP_PORT}"
+}
+
+#######################################
+# Start watchdog for JM process
+#
+# Arguments:
+#   $1: expected number of jms to run
+#   $2: process name to monitor
+#   $3: command to start new jm
+#######################################
 function start_ha_jm_watchdog() {
-  jm_watchdog $1 $2 &
+  jm_watchdog $1 $2 ${@:3} &
   JM_WATCHDOG_PID=$!
   echo "Running JM watchdog @ ${JM_WATCHDOG_PID}"
 }
 
-function kill_jm {
-  local JM_PIDS=`jps | grep 'StandaloneSessionClusterEntrypoint' | cut -d " " -f 1`
+function kill_single {
+  local JM_PIDS=`jps | grep "$1" | cut -d " " -f 1`
   local JM_PIDS=(${JM_PIDS[@]})
   local PID=${JM_PIDS[0]}
   kill -9 ${PID}
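
With the process name and the start command now passed through to jm_watchdog, the same watchdog serves different cluster modes. The session-cluster tests in this commit call it as in the first line below; the second line is a purely hypothetical per-job variant (entrypoint class and start function are illustrative, not taken from this excerpt):

  # session cluster, as used by test_ha_dataset.sh / test_ha_datastream.sh
  start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"

  # hypothetical per-job cluster: watch a different JVM and restart it with a custom command
  start_ha_jm_watchdog 1 "SomeJobClusterEntrypoint" my_start_job_cluster_cmd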

flink-end-to-end-tests/test-scripts/test_ha_dataset.sh

+15 −8
@@ -22,6 +22,19 @@ source "$(dirname "$0")"/common_ha.sh
 
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
 
+function ha_cleanup() {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  # kill the cluster and zookeeper
+  stop_watchdogs
+}
+
+trap ha_cleanup INT
+trap ha_cleanup EXIT
+
 function run_ha_test() {
   local PARALLELISM=$1
 
@@ -47,25 +60,19 @@ function run_ha_test() {
   wait_job_running ${JOB_ID}
 
   # start the watchdog that keeps the number of JMs stable
-  start_ha_jm_watchdog 1 "8081"
+  start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
 
   for (( c=0; c<${JM_KILLS}; c++ )); do
     # kill the JM and wait for watchdog to
     # create a new one which will take over
-    kill_jm
+    kill_single 'StandaloneSessionClusterEntrypoint'
     wait_job_running ${JOB_ID}
   done
 
   cancel_job ${JOB_ID}
 
   # do not verify checkpoints in the logs
   verify_logs ${JM_KILLS} false
-
-  # kill the cluster and zookeeper
-  stop_cluster_and_watchdog
 }
 
-trap stop_cluster_and_watchdog INT
-trap stop_cluster_and_watchdog EXIT
-
 run_ha_test 4
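
Registering ha_cleanup for both INT and EXIT guarantees the watchdogs are stopped however the test ends, while resetting the traps inside the handler keeps it from running twice (the EXIT trap would otherwise fire the handler again after it has already run on an interrupt). A self-contained sketch of the pattern, with placeholder commands in place of the real cleanup:

  #!/usr/bin/env bash
  cleanup() {
    trap "exit -1" INT   # a second interrupt just exits
    trap "" EXIT         # do not re-enter cleanup when the shell exits
    echo "stopping watchdogs"   # placeholder for stop_watchdogs
  }
  trap cleanup INT
  trap cleanup EXIT
  echo "test body runs here"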

flink-end-to-end-tests/test-scripts/test_ha_datastream.sh

+24 −12
@@ -22,6 +22,19 @@ source "$(dirname "$0")"/common_ha.sh
 
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
 
+function ha_cleanup() {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  # kill the cluster and zookeeper
+  stop_watchdogs
+}
+
+trap ha_cleanup INT
+trap ha_cleanup EXIT
+
 function run_ha_test() {
   local PARALLELISM=$1
   local BACKEND=$2
@@ -34,7 +47,12 @@ function run_ha_test() {
   CLEARED=0
 
   # start the cluster on HA mode
-  start_ha_cluster
+  create_ha_config
+  # change the pid dir to start log files always from 0, this is important for checks in the
+  # jm killing loop
+  set_conf "env.pid.dir" "${TEST_DATA_DIR}"
+  start_local_zk
+  start_cluster
 
   echo "Running on HA mode: parallelism=${PARALLELISM}, backend=${BACKEND}, asyncSnapshots=${ASYNC}, and incremSnapshots=${INCREM}."
 
@@ -58,34 +76,28 @@ function run_ha_test() {
   wait_job_running ${JOB_ID}
 
   # start the watchdog that keeps the number of JMs stable
-  start_ha_jm_watchdog 1 "8081"
+  start_ha_jm_watchdog 1 "StandaloneSessionClusterEntrypoint" start_jm_cmd "8081"
 
   sleep 5
 
   # start the watchdog that keeps the number of TMs stable
   start_ha_tm_watchdog ${JOB_ID} 1
 
   # let the job run for a while to take some checkpoints
-  sleep 20
+  wait_num_of_occurence_in_logs "Completed checkpoint [1-9]* for job ${JOB_ID}" 2 "standalonesession"
 
-  for (( c=0; c<${JM_KILLS}; c++ )); do
+  for (( c=1; c<=${JM_KILLS}; c++ )); do
     # kill the JM and wait for watchdog to
     # create a new one which will take over
-    kill_jm
+    kill_single 'StandaloneSessionClusterEntrypoint'
     # let the job start and take some checkpoints
-    sleep 60
+    wait_num_of_occurence_in_logs "Completed checkpoint [1-9]* for job ${JOB_ID}" 2 "standalonesession-${c}"
   done
 
   # verify checkpoints in the logs
   verify_logs ${JM_KILLS} true
-
-  # kill the cluster and zookeeper
-  stop_cluster_and_watchdog
 }
 
-trap stop_cluster_and_watchdog INT
-trap stop_cluster_and_watchdog EXIT
-
 STATE_BACKEND_TYPE=${1:-file}
 STATE_BACKEND_FILE_ASYNC=${2:-true}
 STATE_BACKEND_ROCKS_INCREMENTAL=${3:-false}
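
The positional defaults at the bottom mirror the arguments passed from run-nightly-tests.sh, so the script can also be exercised manually; for example, assuming the usual FLINK_DIR/END_TO_END_DIR setup (arguments are <state backend> <async snapshots> <incremental snapshots>):

  "$END_TO_END_DIR"/test-scripts/test_ha_datastream.sh rocks true false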
