-
Notifications
You must be signed in to change notification settings - Fork 264
/
Copy pathsentinel.c
5430 lines (4945 loc) · 237 KB
/
sentinel.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* Redis Sentinel implementation
* Redis 哨兵实现
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
#include "hiredis.h"
#include "async.h"
#include <ctype.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h>
extern char **environ;
/* Default TCP port for Redis Sentinel; initSentinelConfig() stores it in
 * server.port. */
#define REDIS_SENTINEL_PORT 26379
/* ======================== Sentinel global state =========================== */

/* Address object, used to describe an ip:port pair. */
typedef struct sentinelAddr {
    char *ip; /* IP address string; created with sdsnew() and released with
                 sdsfree() by the sentinelAddr helpers in this file. */
    int port; /* TCP port number. */
} sentinelAddr;
/* A Sentinel Redis Instance object is monitoring.
 *
 * The SRI_* values below are bit flags stored in the 'flags' field of each
 * monitored instance. A monitored instance can be a master, a slave or
 * another Sentinel. */
/* The instance is a master. */
#define SRI_MASTER (1<<0)
/* The instance is a slave. */
#define SRI_SLAVE (1<<1)
/* The instance is a Sentinel. */
#define SRI_SENTINEL (1<<2)
#define SRI_S_DOWN (1<<3) /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<4) /* Objectively down (confirmed by others). */
#define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
                                  its master is down. */
#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
                                           this master. */
/* Set on the slave chosen as the new master while it is still a slave. */
#define SRI_PROMOTED (1<<7) /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<8) /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<9) /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<10) /* Slave synchronized with new master. */
#define SRI_FORCE_FAILOVER (1<<11) /* Force failover with master up. */
#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */
/* Note: times are in milliseconds. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
/* Period at which the PUBLISH command is issued. */
#define SENTINEL_PUBLISH_PERIOD 2000
#define SENTINEL_DEFAULT_DOWN_AFTER 30000
/* NOTE(review): presumably the Pub/Sub channel used for Sentinel hello
 * messages; its users are outside this view. */
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
/* 30 ping periods, i.e. 30000 ms with the value defined above. */
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_SLAVE_RECONF_TIMEOUT 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
/* Three minutes, expressed in milliseconds. */
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_ELECTION_TIMEOUT 10000
#define SENTINEL_MAX_DESYNC 1000
/* Failover machine different states. These are the values stored in the
 * 'failover_state' field of a sentinelRedisInstance. */
#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
/* SLAVEOF NO ONE is sent so that the selected slave becomes a master. */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
/* SLAVEOF <newmaster> is sent to the slaves so they replicate from the new
 * master. */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
/* Monitor the promoted slave and update the configuration. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted slave. */
/* Values for the 'slave_master_link_status' field of a
 * sentinelRedisInstance (master link status as reported by INFO). */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
/* Generic flags that can be used with different functions.
 * They use higher bits to avoid colliding with the function specific
 * flags. */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT (1<<16)
#define SENTINEL_LEADER (1<<17)
#define SENTINEL_OBSERVER (1<<18)
/* Script execution flags and limits. */
#define SENTINEL_SCRIPT_NONE 0
#define SENTINEL_SCRIPT_RUNNING 1
#define SENTINEL_SCRIPT_MAX_QUEUE 256
#define SENTINEL_SCRIPT_MAX_RUNNING 16
#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
#define SENTINEL_SCRIPT_MAX_RETRY 10
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
/* SENTINEL SIMULATE-FAILURE command flags. These are bit flags kept in
 * sentinel.simfailure_flags. */
#define SENTINEL_SIMFAILURE_NONE 0
/* Simulate a failure right after the slave to promote has been selected. */
#define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0)
/* Simulate a failure right after the slave has been promoted. */
#define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1)
/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
 * monitoring many masters, we have different instances representing the
 * same Sentinels, one per master, and we need to share the hiredis connections
 * among them. Otherwise if 5 Sentinels are monitoring 100 masters we create
 * 500 outgoing connections instead of 5.
 *
 * So this structure represents a reference counted link in terms of the two
 * hiredis connections for commands and Pub/Sub, and the fields needed for
 * failure detection, since the ping/pong time are now local to the link: if
 * the link is available, the instance is available. This way we don't just
 * have 5 connections instead of 500, we also send 5 pings instead of 500.
 *
 * Links are shared only for Sentinels: master and slave instances have
 * a link with refcount = 1, always. */
typedef struct instanceLink {
    int refcount; /* Number of sentinelRedisInstance owners. */
    int disconnected; /* Non-zero if we need to reconnect cc or pc. */
    int pending_commands; /* Number of commands sent waiting for a reply. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t act_ping_time; /* Time at which the last pending ping (no pong
                               received after it) was sent. This field is
                               set to 0 when a pong is received, and set again
                               to the current time if the value is 0 and a new
                               ping is sent. */
    mstime_t last_ping_time; /* Time at which we sent the last ping. This is
                                only used to avoid sending too many pings
                                during failure. Idle time is computed using
                                the act_ping_time field. */
    mstime_t last_pong_time; /* Last time the instance replied to ping,
                                whatever the reply was. That's used to check
                                if the link is idle and must be reconnected. */
    mstime_t last_reconn_time; /* Last reconnection attempt performed when
                                  the link was down. */
} instanceLink;
/* State kept for every monitored instance. The same structure describes
 * masters, slaves and other Sentinels; the SRI_* bits in 'flags' tell which
 * kind of instance this is and which of the sections below apply. */
typedef struct sentinelRedisInstance {
    /* Type and current state of the instance. */
    int flags; /* See SRI_... defines */
    /* For masters the name is set by the user in the configuration file.
     * For slaves and Sentinels the name is generated automatically by
     * Sentinel in the form ip:port. */
    char *name; /* Master name from the point of view of this sentinel. */
    char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/
    /* Used to implement failover. */
    uint64_t config_epoch; /* Configuration epoch. */
    /* Address (ip and port) of the instance. */
    sentinelAddr *addr; /* Master host. */
    instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
    mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received a hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    /* Milliseconds without a valid reply after which the instance is
     * considered subjectively down. Set by the
     * "SENTINEL down-after-milliseconds" configuration option. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh; /* Time at which we received INFO output from it. */
    /* Role and the first time we observed it.
     * This is useful in order to delay replacing what the instance reports
     * with our own configuration. We need to always wait some time in order
     * to give a chance to the leader to report the new configuration before
     * we do silly things. */
    int role_reported; /* Role reported by the instance. */
    mstime_t role_reported_time; /* Time at which the role was updated. */
    mstime_t slave_conf_change_time; /* Last time slave master addr changed. */

    /* Master specific. */
    dict *sentinels; /* Other sentinels monitoring the same master. */
    /* Keys are slave names, values are the sentinelRedisInstance of each
     * slave. */
    dict *slaves; /* Slaves for this master instance. */
    /* Set by "SENTINEL monitor <master-name> <ip> <port> <quorum>". */
    unsigned int quorum;/* Number of sentinels that need to agree on failure. */
    /* Set by "sentinel parallel-syncs <master-name> <number>". */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */
    char *auth_pass; /* Password to use for AUTH against master & slaves. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
    struct sentinelRedisInstance *master; /* Master instance if it's slave. */
    char *slave_master_host; /* Master host as reported by INFO */
    int slave_master_port; /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO */
    unsigned long long slave_repl_offset; /* Slave replication offset. */

    /* Failover */
    char *leader; /* If this is a master instance, this is the runid of
                     the Sentinel that should perform the failover. If
                     this is a Sentinel, this is the runid of the Sentinel
                     that this Sentinel voted as leader. */
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time; /* Time of the last failover state
                                            change. */
    mstime_t failover_start_time; /* Last failover attempt start time. */
    mstime_t failover_timeout; /* Max time to refresh failover state. */
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script; /* Path of the script used to notify the
                                  admin. */
    char *client_reconfig_script; /* Path of the script used to reconfigure
                                     clients. */
    sds info; /* cached INFO output */
} sentinelRedisInstance;
/* Main state. A single global instance named 'sentinel' is defined together
 * with the structure; initSentinel() sets its initial values. */
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
    uint64_t current_epoch; /* Current epoch. */
    dict *masters; /* Dictionary of master sentinelRedisInstances.
                      Key is the instance name, value is the
                      sentinelRedisInstance structure pointer. */
    /* NOTE(review): according to the Redis Sentinel documentation, while in
     * TILT mode Sentinel keeps collecting information but does not act
     * (no failover); the code implementing that is outside this view. */
    int tilt; /* Are we in TILT mode? */
    int running_scripts; /* Number of scripts in execution right now. */
    mstime_t tilt_start_time; /* When TILT started. */
    mstime_t previous_time; /* Last time we ran the time handler. */
    list *scripts_queue; /* Queue of user scripts to execute. */
    char *announce_ip; /* IP addr that is gossiped to other sentinels if
                          not NULL. */
    int announce_port; /* Port that is gossiped to other sentinels if
                          non zero. */
    /* Combination of SENTINEL_SIMFAILURE_* flags. */
    unsigned long simfailure_flags; /* Failures simulation. */
} sentinel;
/* A script execution job. */
typedef struct sentinelScriptJob {
    int flags; /* Script job flags: SENTINEL_SCRIPT_* */
    int retry_num; /* Number of times we tried to execute it. */
    /* sentinelReleaseScriptJob() walks this array up to the first NULL
     * entry, freeing each element with sdsfree(). */
    char **argv; /* Arguments to call the script. */
    mstime_t start_time; /* Script execution time if the script is running,
                            otherwise 0 if we are allowed to retry the
                            execution at any time. If the script is not
                            running and it's not 0, it means: do not run
                            before the specified time. */
    /* Pid of the child process executing the script. */
    pid_t pid; /* Script execution pid. */
} sentinelScriptJob;
/* ======================= hiredis ae.c adapters =============================
 * Note: this implementation is taken from hiredis/adapters/ae.h, however
 * we have our modified copy for Sentinel in order to use our allocator
 * and to have full control over how the adapter works. */

/* Adapter object tying one hiredis async context to the ae event loop. */
typedef struct redisAeEvents {
    redisAsyncContext *context; /* hiredis async context served by this adapter. */
    aeEventLoop *loop; /* Event loop the file events are registered with. */
    int fd; /* File descriptor, copied from the context by redisAeAttach(). */
    int reading, writing; /* Non-zero while the readable / writable file
                             event is installed in the event loop. */
} redisAeEvents;
/* Readable-event handler installed by redisAeAddRead(): forwards the
 * notification to hiredis. 'privdata' is the redisAeEvents adapter; the
 * event loop, fd and mask arguments are not used. */
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *adapter = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleRead(adapter->context);
}
/* Writable-event handler installed by redisAeAddWrite(): forwards the
 * notification to hiredis. 'privdata' is the redisAeEvents adapter; the
 * event loop, fd and mask arguments are not used. */
static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *adapter = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleWrite(adapter->context);
}
/* hiredis "addRead" hook: start watching the adapter's fd for readability,
 * with redisAeReadEvent() as handler. Does nothing if the readable event
 * is already installed. */
static void redisAeAddRead(void *privdata) {
    redisAeEvents *adapter = privdata;

    if (adapter->reading) return; /* Already registered. */

    adapter->reading = 1;
    aeCreateFileEvent(adapter->loop,adapter->fd,AE_READABLE,
                      redisAeReadEvent,adapter);
}
/* hiredis "delRead" hook: stop watching the adapter's fd for readability.
 * Does nothing if the readable event is not installed. */
static void redisAeDelRead(void *privdata) {
    redisAeEvents *adapter = privdata;

    if (!adapter->reading) return; /* Nothing to remove. */

    adapter->reading = 0;
    aeDeleteFileEvent(adapter->loop,adapter->fd,AE_READABLE);
}
/* hiredis "addWrite" hook: start watching the adapter's fd for writability,
 * with redisAeWriteEvent() as handler. Does nothing if the writable event
 * is already installed. */
static void redisAeAddWrite(void *privdata) {
    redisAeEvents *adapter = privdata;

    if (adapter->writing) return; /* Already registered. */

    adapter->writing = 1;
    aeCreateFileEvent(adapter->loop,adapter->fd,AE_WRITABLE,
                      redisAeWriteEvent,adapter);
}
/* hiredis "delWrite" hook: stop watching the adapter's fd for writability.
 * Does nothing if the writable event is not installed. */
static void redisAeDelWrite(void *privdata) {
    redisAeEvents *adapter = privdata;

    if (!adapter->writing) return; /* Nothing to remove. */

    adapter->writing = 0;
    aeDeleteFileEvent(adapter->loop,adapter->fd,AE_WRITABLE);
}
/* hiredis "cleanup" hook: remove both the readable and the writable file
 * events of the adapter (when installed), then free the adapter itself. */
static void redisAeCleanup(void *privdata) {
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(privdata);
}
/* Attach the hiredis async context 'ac' to the ae event loop 'loop'.
 *
 * A redisAeEvents adapter is allocated and stored in ac->ev.data, and the
 * ac->ev hooks are pointed at the redisAe* functions above so that hiredis
 * can add/remove the file events itself.
 *
 * Returns C_OK on success, or C_ERR if the context already has an adapter
 * attached (in which case nothing is modified). */
static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
    redisAeEvents *adapter;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL) return C_ERR;

    /* Create container for context and r/w events */
    adapter = zmalloc(sizeof(*adapter));
    adapter->context = ac;
    adapter->loop = loop;
    adapter->fd = ac->c.fd;
    adapter->reading = 0;
    adapter->writing = 0;

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisAeAddRead;
    ac->ev.delRead = redisAeDelRead;
    ac->ev.addWrite = redisAeAddWrite;
    ac->ev.delWrite = redisAeDelWrite;
    ac->ev.cleanup = redisAeCleanup;
    ac->ev.data = adapter;
    return C_OK;
}
/* ============================= Prototypes ================================= */
/* Forward declarations of functions that are used before their definition
 * in this file or that are defined elsewhere. */
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
sentinelRedisInstance *sentinelGetMasterByName(char *name);
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
int yesnotoi(char *s);
void instanceLinkConnectionError(const redisAsyncContext *c);
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
void sentinelAbortFailover(sentinelRedisInstance *ri);
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
void sentinelScheduleScriptExecution(char *path, ...);
void sentinelStartFailover(sentinelRedisInstance *master);
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port);
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
void sentinelFlushConfig(void);
void sentinelGenerateInitialMonitorEvents(void);
int sentinelSendPing(sentinelRedisInstance *ri);
int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master);
sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid);
void sentinelSimFailureCrash(void);
/* ========================= Dictionary types =============================== */
/* Helpers referenced by the dictionary types defined below. */
unsigned int dictSdsHash(const void *key);
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
/* Value destructor for instancesDictType: releases the stored
 * sentinelRedisInstance via releaseSentinelRedisInstance(). 'privdata' is
 * not used. */
void dictInstancesValDestructor (void *privdata, void *obj) {
    UNUSED(privdata);
    releaseSentinelRedisInstance(obj);
}
/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
 *
 * also used for: sentinelRedisInstance->sentinels dictionary that maps
 * sentinels ip:port to last seen time in Pub/Sub hello message.
 *
 * Values are released with dictInstancesValDestructor(); no key destructor
 * or key/value duplication callbacks are set. */
dictType instancesDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    NULL, /* key destructor */
    dictInstancesValDestructor /* val destructor */
};
/* Instance runid (sds) -> votes (long casted to void*)
 *
 * This is useful into sentinelGetObjectiveLeader() function in order to
 * count the votes and understand who is the leader.
 *
 * No destructor or duplication callbacks are set for keys or values. */
dictType leaderVotesDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    NULL, /* key destructor */
    NULL /* val destructor */
};
/* =========================== Initialization =============================== */
/* Handlers of the Sentinel specific commands listed in the table below. */
void sentinelCommand(client *c);
void sentinelInfoCommand(client *c);
void sentinelSetCommand(client *c);
void sentinelPublishCommand(client *c);
void sentinelRoleCommand(client *c);
/* Command table used in Sentinel mode. initSentinel() empties
 * server.commands and then adds these entries, keyed by command name. */
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};
/* This function overwrites a few normal Redis config default with Sentinel
 * specific defaults. At present the only value changed is server.port,
 * which is set to REDIS_SENTINEL_PORT. */
void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT;
}
/* Perform the Sentinel mode initialization. */
// 执行Sentinel模式的初始化操作
void initSentinel(void) {
unsigned int j;
/* Remove usual Redis commands from the command table, then just add
* the SENTINEL command. */
// 将服务器的命令表清空
dictEmpty(server.commands,NULL);
// 只添加Sentinel模式的相关命令
for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
int retval;
struct redisCommand *cmd = sentinelcmds+j;
retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
serverAssert(retval == DICT_OK);
}
/* Initialize various data structures. */
// 初始化各种Sentinel状态的数据结构
// 当前纪元,用于实现故障转移操作
sentinel.current_epoch = 0;
// 监控的主节点信息的字典
sentinel.masters = dictCreate(&instancesDictType,NULL);
// TILT模式
sentinel.tilt = 0;
sentinel.tilt_start_time = 0;
// 最后执行时间处理程序的时间
sentinel.previous_time = mstime();
// 正在执行的脚本数量
sentinel.running_scripts = 0;
// 用户脚本的队列
sentinel.scripts_queue = listCreate();
// Sentinel通过流言协议接收关于主服务器的ip和port
sentinel.announce_ip = NULL;
sentinel.announce_port = 0;
// 故障模拟
sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
// Sentinel的ID置为0
memset(sentinel.myid,0,sizeof(sentinel.myid));
}
/* This function gets called when the server is in Sentinel mode, started,
 * loaded the configuration, and is ready for normal operations.
 *
 * It exits the process when there is no configuration file or when the
 * file is not writable, makes sure this Sentinel has an ID (generating and
 * persisting one if needed), logs the ID, and emits the initial +monitor
 * events. */
void sentinelIsRunning(void) {
    int j;

    /* A configuration file is mandatory. */
    if (server.configfile == NULL) {
        serverLog(LL_WARNING,
            "Sentinel started without a config file. Exiting...");
        exit(1);
    }

    /* The configuration file must also be writable. */
    if (access(server.configfile,W_OK) == -1) {
        serverLog(LL_WARNING,
            "Sentinel config file %s is not writable: %s. Exiting...",
            server.configfile,strerror(errno));
        exit(1);
    }

    /* If this Sentinel has yet no ID set in the configuration file, we
     * pick a random one and persist the config on disk. From now on this
     * will be this Sentinel ID across restarts.
     *
     * The ID is considered unset when all its bytes are zero. */
    j = 0;
    while (j < CONFIG_RUN_ID_SIZE && sentinel.myid[j] == 0) j++;

    if (j == CONFIG_RUN_ID_SIZE) {
        /* Pick ID and persist the config. */
        getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
        sentinelFlushConfig();
    }

    /* Log its ID to make debugging of issues simpler. */
    serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);

    /* We want to generate a +monitor event for every configured master
     * at startup. */
    sentinelGenerateInitialMonitorEvents();
}
/* ============================== sentinelAddr ============================== */

/* Create a sentinelAddr object and return it on success.
 * On error NULL is returned and errno is set to:
 * ENOENT: Can't resolve the hostname.
 * EINVAL: Invalid port number.
 *
 * The port is validated before the hostname is resolved. The returned
 * object stores the resolved IP as an sds string. */
sentinelAddr *createSentinelAddr(char *hostname, int port) {
    char resolved[NET_IP_STR_LEN];
    sentinelAddr *addr;

    /* Only ports in the 0..65535 range are accepted. */
    if (!(port >= 0 && port <= 65535)) {
        errno = EINVAL;
        return NULL;
    }

    /* Resolve the hostname into its textual IP address. */
    if (anetResolve(NULL,hostname,resolved,sizeof(resolved)) == ANET_ERR) {
        errno = ENOENT;
        return NULL;
    }

    addr = zmalloc(sizeof(*addr));
    addr->ip = sdsnew(resolved);
    addr->port = port;
    return addr;
}
/* Return a newly allocated copy of 'src'. The IP string is duplicated, so
 * the copy does not share it with the source address. */
sentinelAddr *dupSentinelAddr(sentinelAddr *src) {
    sentinelAddr *copy = zmalloc(sizeof(*copy));

    copy->ip = sdsnew(src->ip);
    copy->port = src->port;
    return copy;
}
/* Free a Sentinel address. Can't fail.
 * Both the IP string and the structure itself are released. 'sa' is
 * dereferenced unconditionally, so it must not be NULL. */
void releaseSentinelAddr(sentinelAddr *sa) {
    sdsfree(sa->ip);
    zfree(sa);
}
/* Return 1 if the two addresses are equal, 0 otherwise. Two addresses are
 * equal when the ports match and the IP strings compare equal ignoring
 * case. */
int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
    if (a->port != b->port) return 0;
    return strcasecmp(a->ip,b->ip) == 0;
}
/* =========================== Events notification ========================== */

/* Send an event to log, pub/sub, user notification script.
 *
 * 'level' is the log level for logging. Only LL_WARNING events will trigger
 * the execution of the user notification script.
 *
 * 'type' is the message type, also used as a pub/sub channel name.
 *
 * 'ri', is the redis instance target of this event if applicable, and is
 * used to obtain the path of the notification script to execute.
 *
 * The remaining arguments are printf-alike.
 * If the format specifier starts with the two characters "%@" then 'ri'
 * must not be NULL (it is dereferenced without a check), and the message is
 * prefixed with an instance identifier in the following format:
 *
 *  <instance type> <instance name> <ip> <port>
 *
 * If the instance type is not master, then the additional string is
 * added to specify the originating master:
 *
 *  @ <master name> <master ip> <master port>
 *
 * Any other specifier after "%@" is processed by printf itself.
 *
 * The message is logged when 'level' is at least server.verbosity, and it
 * is published via Pub/Sub unless 'level' is LL_DEBUG. */
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
                   const char *fmt, ...) {
    char msg[LOG_MAX_LEN];

    /* Handle %@: build the instance prefix, or start with an empty message
     * when no prefix was requested. */
    msg[0] = '\0';
    if (fmt[0] == '%' && fmt[1] == '@') {
        /* For a master there is no originating master to report; for a
         * slave or a Sentinel it is the master the instance belongs to. */
        sentinelRedisInstance *master = NULL;
        if (!(ri->flags & SRI_MASTER)) master = ri->master;

        if (master == NULL) {
            snprintf(msg, sizeof(msg), "%s %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, ri->addr->ip, ri->addr->port);
        } else {
            snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, ri->addr->ip, ri->addr->port,
                master->name, master->addr->ip, master->addr->port);
        }
        fmt += 2; /* Skip the "%@" specifier. */
    }

    /* Format the rest of the message, if any, right after the prefix. */
    if (fmt[0] != '\0') {
        va_list ap;
        size_t used = strlen(msg);

        va_start(ap, fmt);
        vsnprintf(msg+used, sizeof(msg)-used, fmt, ap);
        va_end(ap);
    }

    /* Log the message if the log level allows it to be logged. */
    if (level >= server.verbosity)
        serverLog(level,"%s %s",type,msg);

    /* Publish the message via Pub/Sub if it's not a debugging one. The
     * channel name is the event type. */
    if (level != LL_DEBUG) {
        robj *channel = createStringObject(type,strlen(type));
        robj *payload = createStringObject(msg,strlen(msg));

        pubsubPublishMessage(channel,payload);
        decrRefCount(channel);
        decrRefCount(payload);
    }

    /* Call the notification script if applicable: only for warnings that
     * refer to an instance. The script is the one configured on the master
     * the instance is, or belongs to. */
    if (level != LL_WARNING || ri == NULL) return;

    {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                        ri : ri->master;

        if (master && master->notification_script) {
            sentinelScheduleScriptExecution(master->notification_script,
                type,msg,NULL);
        }
    }
}
/* Emit a "+monitor" event, at LL_WARNING level, for every master found in
 * sentinel.masters. Meant to be called at startup; the same event is also
 * generated when a master is added at runtime via SENTINEL MONITOR. */
void sentinelGenerateInitialMonitorEvents(void) {
    dictIterator *iter = dictGetIterator(sentinel.masters);
    dictEntry *entry;

    /* Walk every monitored master and announce it with its quorum. */
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *master = dictGetVal(entry);

        sentinelEvent(LL_WARNING,"+monitor",master,"%@ quorum %d",
            master->quorum);
    }
    dictReleaseIterator(iter);
}
/* ============================ script execution ============================ */
/* Free a script job: every string of its NULL-terminated argv array, the
 * array itself, and finally the job structure. */
void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
    int i;

    /* argv entries are freed one by one up to the NULL terminator. */
    for (i = 0; sj->argv[i] != NULL; i++)
        sdsfree(sj->argv[i]);

    zfree(sj->argv);
    zfree(sj);
}
#define SENTINEL_SCRIPT_MAX_ARGS 16

/* Queue a script for later execution.
 *
 * path: path of the script; it becomes argv[0] of the job.
 * ...:  the script arguments as 'char*' values, terminated by a NULL
 *       pointer. Every argument is copied, so the caller keeps ownership
 *       of what it passed. At most SENTINEL_SCRIPT_MAX_ARGS-1 arguments
 *       are stored; anything beyond that is ignored.
 *
 * The new job is appended to sentinel.scripts_queue. If the queue then
 * exceeds SENTINEL_SCRIPT_MAX_QUEUE entries, the oldest job that is not
 * currently running is removed and released. */
void sentinelScheduleScriptExecution(char *path, ...) {
    va_list ap;
    char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
    int argc = 1;
    sentinelScriptJob *sj;

    /* Collect and copy the arguments, starting at slot 1. */
    va_start(ap, path);
    while (argc < SENTINEL_SCRIPT_MAX_ARGS) {
        char *arg = va_arg(ap,char*);

        if (arg == NULL) break;
        argv[argc++] = sdsnew(arg); /* Copy the string. */
    }
    va_end(ap);

    /* Slot 0 is the script path. */
    argv[0] = sdsnew(path);

    /* Always terminate the vector explicitly. When the loop above stops
     * because the argument cap was reached (and not on a NULL argument)
     * nothing else writes this slot, and both sentinelReleaseScriptJob()
     * and execve() walk argv until they find a NULL pointer. */
    argv[argc] = NULL;

    /* Build the job; argv gets argc entries plus the NULL terminator. */
    sj = zmalloc(sizeof(*sj));
    sj->flags = SENTINEL_SCRIPT_NONE;
    sj->retry_num = 0;
    sj->argv = zmalloc(sizeof(char*)*(argc+1));
    sj->start_time = 0;
    sj->pid = 0;
    memcpy(sj->argv,argv,sizeof(char*)*(argc+1));

    listAddNodeTail(sentinel.scripts_queue,sj);

    /* Remove the oldest non running script if we already hit the limit. */
    if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
        listNode *ln;
        listIter li;

        listRewind(sentinel.scripts_queue,&li);
        while ((ln = listNext(&li)) != NULL) {
            sentinelScriptJob *oldest = ln->value;

            /* Jobs that are running are never evicted. */
            if (oldest->flags & SENTINEL_SCRIPT_RUNNING) continue;

            /* The first node is the oldest as we add on tail. */
            listDelNode(sentinel.scripts_queue,ln);
            sentinelReleaseScriptJob(oldest);
            break;
        }
        serverAssert(listLength(sentinel.scripts_queue) <=
                     SENTINEL_SCRIPT_MAX_QUEUE);
    }
}
/* Search sentinel.scripts_queue for a job flagged as running whose pid
 * equals 'pid'. Returns the list node holding it (so the caller can remove
 * it from the queue if needed), or NULL when there is no such job. */
listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
    listIter iter;
    listNode *node;

    listRewind(sentinel.scripts_queue,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        sentinelScriptJob *job = node->value;

        /* Only jobs that are currently running are candidates. */
        if (!(job->flags & SENTINEL_SCRIPT_RUNNING)) continue;
        if (job->pid == pid) return node;
    }
    return NULL;
}
/* Start queued scripts, oldest first, for as long as the number of running
 * scripts stays below SENTINEL_SCRIPT_MAX_RUNNING. */
void sentinelRunPendingScripts(void) {
    listIter iter;
    listNode *node;
    mstime_t now = mstime();

    /* The queue is scanned from head to tail so older jobs run first. */
    listRewind(sentinel.scripts_queue,&iter);
    while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
           (node = listNext(&iter)) != NULL)
    {
        sentinelScriptJob *job = node->value;
        pid_t child;

        /* Skip jobs that are already running, and jobs whose start_time
         * is set and still lies after 'now' (a retry that is not due). */
        if ((job->flags & SENTINEL_SCRIPT_RUNNING) ||
            (job->start_time && job->start_time > now))
        {
            continue;
        }

        /* Mark the job as started before forking. */
        job->flags |= SENTINEL_SCRIPT_RUNNING;
        job->start_time = mstime();
        job->retry_num++;

        child = fork();

        if (child == 0) {
            /* Child: replace the process image with the script. */
            execve(job->argv[0],job->argv,environ);
            /* Reached only if execve() failed. */
            _exit(2); /* Don't retry execution. */
        }

        if (child == -1) {
            /* Parent, fork error. It is reported as signal 99 in order to
             * unify the reporting with other kinds of errors. */
            sentinelEvent(LL_WARNING,"-script-error",NULL,
                          "%s %d %d", job->argv[0], 99, 0);
            job->flags &= ~SENTINEL_SCRIPT_RUNNING;
            job->pid = 0;
            continue;
        }

        /* Parent, fork succeeded: account for the child and record it. */
        sentinel.running_scripts++;
        job->pid = child;
        sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)child);
    }
}