-
Notifications
You must be signed in to change notification settings - Fork 264
/
Copy pathreplication.c
3022 lines (2791 loc) · 137 KB
/
replication.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* Asynchronous replication implementation.
* 异步复制
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/stat.h>
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd);
void replicationSendAck(void);
void putSlaveOnline(client *slave);
int cancelReplicationHandshake(void);
/* --------------------------- Utility functions ---------------------------- */
/* Return the pointer to a string representing the slave ip:listening_port
* pair. Mostly useful for logging, since we want to log a slave using its
* IP address and its listening port which is more clear for the user, for
* example: "Closing connection with slave 10.1.2.3:6380". */
// 返回一个字符串指针,指向从节点的ip:listening_port。对日志记录非常有用,因为我们想使用其IP地址和其监听端口来记录从节点,这对用户来说更为清晰。例如:"Closing connection with slave 10.1.2.3:6380"
char *replicationGetSlaveName(client *c) {
static char buf[NET_PEER_ID_LEN];
char ip[NET_IP_STR_LEN];
ip[0] = '\0';
buf[0] = '\0';
// 如果slave_ip保存有从节点的IP,则不执行anetPeerToString
// 否则,获取连接client的IP和端口号,这里只获取IP保存在ip数组中
if (c->slave_ip[0] != '\0' ||
anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1)
{
/* Note that the 'ip' buffer is always larger than 'c->slave_ip' */
// 如果slave_ip保存有IP,则将从节点的IP拷贝到ip数组中
if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
// 如果slave_listening_port保存有从节点的端口号,则将ip和port以"[%s]:%d"的格式写入buf
if (c->slave_listening_port)
anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
else
snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
} else {
snprintf(buf,sizeof(buf),"client id #%llu",
(unsigned long long) c->id);
}
return buf;
}
/* ---------------------------------- MASTER -------------------------------- */
// 创建复制操作的积压缓冲区Backlog
void createReplicationBacklog(void) {
serverAssert(server.repl_backlog == NULL);
// 复制操作的积压缓冲区分配空间,默认为1M大小
server.repl_backlog = zmalloc(server.repl_backlog_size);
// 复制积压缓冲区backlog中实际的数据长度为0
server.repl_backlog_histlen = 0;
// 复制积压缓冲区backlog当前的偏移量,下次写操作的下标为0
server.repl_backlog_idx = 0;
/* When a new backlog buffer is created, we increment the replication
* offset by one to make sure we'll not be able to PSYNC with any
* previous slave. This is needed because we avoid incrementing the
* master_repl_offset if no backlog exists nor slaves are attached. */
// 当新创建一个backlog时,我们将master_repl_offset加1,确保之前使用过backlog的从节点不进行错误的PSYNC操作
// 如果backlog既不存在数据,也没有从节点服务器连接,我们会避免增加master_repl_offset
server.master_repl_offset++;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
* replication stream. */
// repl_backlog_off记录的是积压缓冲区针对复制的最近一部分备份的偏移量
// 我们的缓冲区中没有任何数据,但实际上backlog的第一个字节的逻辑位置是master_repl_offset的下一个字节
server.repl_backlog_off = server.master_repl_offset+1;
}
/* This function is called when the user modifies the replication backlog
* size at runtime. It is up to the function to both update the
* server.repl_backlog_size and to resize the buffer and setup it so that
* it contains the same data as the previous one (possibly less data, but
* the most recent bytes, or the same data and more free space in case the
* buffer is enlarged). */
// 动态调整backlog的大小。
// 如果backlog是被扩大时,原有的数据会被保留
void resizeReplicationBacklog(long long newsize) {
// 调整的大小不能小于最小的16k
if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
// 大小相等,直接返回
if (server.repl_backlog_size == newsize) return;
// 设置新的大小
server.repl_backlog_size = newsize;
if (server.repl_backlog != NULL) {
/* What we actually do is to flush the old buffer and realloc a new
* empty one. It will refill with new data incrementally.
* The reason is that copying a few gigabytes adds latency and even
* worse often we need to alloc additional space before freeing the
* old buffer. */
// 释放原来的backlog
zfree(server.repl_backlog);
// 重新分配newsize大小的空间
server.repl_backlog = zmalloc(server.repl_backlog_size);
// 初始化数据长度和下标
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
/* Next byte we have is... the next since the buffer is empty. */
// 设置复制的偏移量 = 全局复制偏移量 + 1
server.repl_backlog_off = server.master_repl_offset+1;
}
}
// 释放复制积压缓冲区
void freeReplicationBacklog(void) {
serverAssert(listLength(server.slaves) == 0);
zfree(server.repl_backlog);
server.repl_backlog = NULL;
}
/* Add data to the replication backlog.
* This function also increments the global replication offset stored at
* server.master_repl_offset, because there is no case where we want to feed
* the backlog without incrementing the buffer. */
// 添加数据到backlog中,并且会根据增量更新server.master_repl_offset的偏移量
void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;
// 更新全局复制的偏移量
server.master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */
// 环形的缓冲区,每次迭代时尽可能的写更多的数据。如果写到尾部要将下标idx重置
while(len) {
// 计算环形缓冲区还有多少空间
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
// 如果空间足够,设置thislen写的长度为len
if (thislen > len) thislen = len;
// 空间不足够或着刚刚好,那么只写入剩余的空间数,等待下次循环时写入
// 将数据拷贝到复制积压缓冲区中
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
// 更新下次写的下标
server.repl_backlog_idx += thislen;
// 如果idx已经到达缓冲区的尾部,那么重置它
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
// 更新未写入的数据长度
len -= thislen;
// 更新未写入数据的地址
p += thislen;
// 更新实际数据的长度
server.repl_backlog_histlen += thislen;
}
// 实际数据的长度最大只能为复制缓冲区的大小,因为之后环形写入时会覆盖开头位置的数据
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;
/* Set the offset of the first byte we have in the backlog. */
// 设置backlog所备份已复制的数据的偏移量,用于处理复制时的断线
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
}
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
* as input. */
// 使用字符串大小来封装feedReplicationBacklog()作为输入
void feedReplicationBacklogWithObject(robj *o) {
char llstr[LONG_STR_SIZE];
void *p;
size_t len;
// 整型编码转为字符串
if (o->encoding == OBJ_ENCODING_INT) {
len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
p = llstr;
} else {
len = sdslen(o->ptr);
p = o->ptr;
}
// 写入backlog中
feedReplicationBacklog(p,len);
}
// 将参数列表中的参数发送给从服务器
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
// 如果没有backlog且没有从节点服务器,直接返回
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
/* Send SELECT command to every slave if needed. */
// 如果当前从节点使用的数据库不是目标的数据库,则要生成一个select命令
if (server.slaveseldb != dictid) {
robj *selectcmd;
/* For a few DBs we have pre-computed SELECT command. */
// 0 <= id < 10 ,可以使用共享的select命令对象
if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
// 否则自行按照协议格式构建select命令对象
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
/* Add the SELECT command into the backlog. */
// 将select 命令添加到backlog中
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
/* Send it to slaves. */
// 发送给从服务器
listRewind(slaves,&li);
// 遍历所有的从服务器节点
while((ln = listNext(&li))) {
client *slave = ln->value;
// 从节点服务器状态为等待BGSAVE的开始,因此跳过回复,遍历下一个节点
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
// 添加select命令到当前从节点的回复中
addReply(slave,selectcmd);
}
// 释放临时对象
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
// 设置当前从节点使用的数据库ID
server.slaveseldb = dictid;
/* Write the command to the replication backlog if any. */
// 将命令写到backlog中
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];
/* Add the multi bulk reply length. */
// 将参数个数构建成协议标准的字符串
// *<argc>\r\n
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
// 添加到backlog中
feedReplicationBacklog(aux,len+3);
// 遍历所有的参数
for (j = 0; j < argc; j++) {
// 返回参数对象的长度
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
// 构建成协议标准的字符串,并添加到backlog中
// $<len>\r\n<argv>\r\n
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
// 添加$<len>\r\n
feedReplicationBacklog(aux,len+3);
// 添加参数对象<argv>
feedReplicationBacklogWithObject(argv[j]);
// 添加\r\n
feedReplicationBacklog(aux+len+1,2);
}
}
/* Write the command to every slave. */
// 将命令写到每一个从节点中
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
client *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start */
// 从节点服务器状态为等待BGSAVE的开始,因此跳过回复,遍历下一个节点
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
// 将命令写给正在等待初次SYNC的从节点(所以这些命令在输出缓冲区中排队,直到初始SYNC完成),或已经与主节点同步
/* Add the multi bulk length. */
// 添加回复的长度
addReplyMultiBulkLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
// 将所有的参数列表添加到从节点的输出缓冲区
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
// 将参数列表中的参数发送给监控器
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j;
sds cmdrepr = sdsnew("+");
robj *cmdobj;
struct timeval tv;
// 获取当前时间
gettimeofday(&tv,NULL);
// 将时间保存在cmdrepr中
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
// 根据client不同的状态,将不同信息追加到cmdrepr中
if (c->flags & CLIENT_LUA) {
cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
} else if (c->flags & CLIENT_UNIX_SOCKET) {
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
} else {
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
}
// 遍历所有的参数,将参数添加到cmdrepr中
for (j = 0; j < argc; j++) {
if (argv[j]->encoding == OBJ_ENCODING_INT) {
cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
} else {
cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
sdslen(argv[j]->ptr));
}
if (j != argc-1)
cmdrepr = sdscatlen(cmdrepr," ",1);
}
cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
// 将cmdrepr构建成字符串对象
cmdobj = createObject(OBJ_STRING,cmdrepr);
listRewind(monitors,&li);
// 遍历监控器链表
while((ln = listNext(&li))) {
client *monitor = ln->value;
// 将命令对象添加到当前监控器的回复中
addReply(monitor,cmdobj);
}
decrRefCount(cmdobj);
}
/* Feed the slave 'c' with the replication backlog starting from the
* specified 'offset' up to the end of the backlog. */
// 将backlog所备份已复制数据的一部分,按照指定的offset发送给client
// 返回写入client的数据长度
long long addReplyReplicationBacklog(client *c, long long offset) {
long long j, skip, len;
serverLog(LL_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
// backlog中没有数据,返回0
if (server.repl_backlog_histlen == 0) {
serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
return 0;
}
serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
server.repl_backlog_size);
serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
server.repl_backlog_off);
serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
server.repl_backlog_histlen);
serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
server.repl_backlog_idx);
/* Compute the amount of bytes we need to discard. */
// 计算需要跳过的数据长度
skip = offset - server.repl_backlog_off;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
/* Point j to the oldest byte, that is actaully our
* server.repl_backlog_off byte. */
// 继续写入数据的起始下标
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
server.repl_backlog_size;
serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
/* Discard the amount of data to seek to the specified 'offset'. */
// 根据offset要跳过一些数据
j = (j + skip) % server.repl_backlog_size;
/* Feed slave with data. Since it is a circular buffer we have to
* split the reply in two parts if we are cross-boundary. */
// 计算要写入数据的长度
len = server.repl_backlog_histlen - skip;
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
while(len) {
// 计算backlog最多写入的长度
long long thislen =
((server.repl_backlog_size - j) < len) ?
(server.repl_backlog_size - j) : len;
serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
// 写入到client的输出缓冲区中
addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
len -= thislen;
j = 0;
}
// 返回写入的长度
return server.repl_backlog_histlen - skip;
}
/* Return the offset to provide as reply to the PSYNC command received
* from the slave. The returned value is only valid immediately after
* the BGSAVE process started and before executing any other command
* from clients. */
// 返回一个偏移量,作为从从节点接受到的 PSYNC 命令的回复。返回的值只有在BGSAVE执行之后和其他client命令执行之前才有效
long long getPsyncInitialOffset(void) {
// 获取全局的复制偏移量
long long psync_offset = server.master_repl_offset;
/* Add 1 to psync_offset if it the replication backlog does not exists
* as when it will be created later we'll increment the offset by one. */
// 如果backlog不存在,将psync_offset加1
if (server.repl_backlog == NULL) psync_offset++;
return psync_offset;
}
/* Send a FULLRESYNC reply in the specific case of a full resynchronization,
* as a side effect setup the slave for a full sync in different ways:
*
* 1) Remember, into the slave client structure, the offset we sent
* here, so that if new slaves will later attach to the same
* background RDB saving process (by duplicating this client output
* buffer), we can get the right offset from this slave.
* 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
* we start accumulating differences from this point.
* 3) Force the replication stream to re-emit a SELECT statement so
* the new slave incremental differences will start selecting the
* right database number.
*
* Normally this function should be called immediately after a successful
* BGSAVE for replication was started, or when there is one already in
* progress that we attached our slave to. */
// 在完全重新同步的特定情况下发送FULLRESYNC回复,以以下不同的方式设置从服务器进行完全同步:
/*
1.我们在这里发送的偏移量到从节点的客户端结构中,以便如果新的从节点随后将附加到相同的BGSAVE进程(通过复制此客户端输出缓冲区),我们可以从该从节点获得正确的偏移量
2.将从节点的复制状态设置为WAIT_BGSAVE_END,以便从此开始积累差异
3.强制复制流重新发出SELECT命令,以便新的从节点增量差异可以选择正确的数据库编号
*/
// 通常在启动成功的BGSAVE进行复制后立即调用此函数,或者当我们的从节点已经有一个在执行BGSAVE
// 设置全量重同步从节点的状态
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;
// 设置全量重同步的偏移量
slave->psync_initial_offset = offset;
// 设置从节点复制状态,开始累计差异数据
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this
* slave as well. Set slaveseldb to -1 in order to force to re-emit
* a SLEECT statement in the replication stream. */
// 将slaveseldb设置为-1,是为了强制发送一个select命令在复制流中
server.slaveseldb = -1;
/* Don't send this reply to slaves that approached us with
* the old SYNC command. */
// 如果从节点的状态是CLIENT_PRE_PSYNC,则表示是Redis是2.8之前的版本,则不将这些信息发送给从节点。
// 因为在2.8之前只支持SYNC的全量复制同步,而在之后的版本提供了部分的重同步
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.runid,offset);
// 否则会将全量复制的信息写给从节点
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
*
* On success return C_OK, otherwise C_ERR is returned and we proceed
* with the usual full resync. */
// 该函数从主节点接收到部分重新同步请求的角度处理PSYNC命令
// 成功返回C_OK,否则返回C_ERR
int masterTryPartialResynchronization(client *c) {
long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr; //主节点的运行ID
char buf[128];
int buflen;
/* Is the runid of this master the same advertised by the wannabe slave
* via PSYNC? If runid changed this master is a different instance and
* there is no way to continue. */
// 主节点的运行ID是否和从节点执行PSYNC的参数提供的运行ID相同。
// 如果运行ID发生了改变,则主节点是一个不同的实例,那么就不能进行继续执行原有的复制进程
if (strcasecmp(master_runid, server.runid)) {
/* Run id "?" is used by slaves that want to force a full resync. */
// 如果从节点的运行ID是"?",表示想要强制进行一个全量同步
if (master_runid[0] != '?') {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
master_runid, server.runid);
} else {
serverLog(LL_NOTICE,"Full resync requested by slave %s",
replicationGetSlaveName(c));
}
goto need_full_resync;
}
/* We still have the data our slave is asking for? */
// 从参数对象中获取psync_offset
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
C_OK) goto need_full_resync;
// 如果psync_offset小于repl_backlog_off,说明backlog所备份的数据的已经太新了,有一些数据被覆盖,则需要进行全量复制
// 如果psync_offset大于(server.repl_backlog_off + server.repl_backlog_histlen),表示当前backlog的数据不够全,则需要进行全量复制
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
if (psync_offset > server.master_repl_offset) {
serverLog(LL_WARNING,
"Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
}
goto need_full_resync;
}
/* If we reached this point, we are able to perform a partial resync:
* 1) Set client state to make it a slave.
* 2) Inform the client we can continue with +CONTINUE
* 3) Send the backlog data (from the offset to the end) to the slave. */
// 执行到这里,则可以进行部分重同步
// 1. 设置client状态为从节点
// 2. 向从节点发送 +CONTINUE 表示接受 partial resync 被接受
// 3. 发送backlog的数据给从节点
// 设置client状态为从节点
c->flags |= CLIENT_SLAVE;
// 设置复制状态为在线,此时RDB文件传输完成,发送差异数据
c->replstate = SLAVE_STATE_ONLINE;
// 设置从节点收到ack的时间
c->repl_ack_time = server.unixtime;
// slave向master发送ack标志设置为0
c->repl_put_online_on_ack = 0;
// 将当前client加入到从节点链表中
listAddNodeTail(server.slaves,c);
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* empty so this write will never fail actually. */
// 向从节点发送 +CONTINUE
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return C_OK;
}
// 将backlog的数据发送从节点
psync_len = addReplyReplicationBacklog(c,psync_offset);
serverLog(LL_NOTICE,
"Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
replicationGetSlaveName(c),
psync_len, psync_offset);
/* Note that we don't need to set the selected DB at server.slaveseldb
* to -1 to force the master to emit SELECT, since the slave already
* has this state from the previous connection with the master. */
// 计算延迟值小于min-slaves-max-lag的从节点的个数
refreshGoodSlavesCount();
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
/* We need a full resync for some reason... Note that we can't
* reply to PSYNC right now if a full SYNC is needed. The reply
* must include the master offset at the time the RDB file we transfer
* is generated, so we need to delay the reply to that moment. */
return C_ERR;
}
/* Start a BGSAVE for replication goals, which is, selecting the disk or
* socket target depending on the configuration, and making sure that
* the script cache is flushed before to start.
*
* The mincapa argument is the bitwise AND among all the slaves capabilities
* of the slaves waiting for this BGSAVE, so represents the slave capabilities
* all the slaves support. Can be tested via SLAVE_CAPA_* macros.
*
* Side effects, other than starting a BGSAVE:
*
* 1) Handle the slaves in WAIT_START state, by preparing them for a full
* sync if the BGSAVE was succesfully started, or sending them an error
* and dropping them from the list of slaves.
*
* 2) Flush the Lua scripting script cache if the BGSAVE was actually
* started.
*
* Returns C_OK on success or C_ERR otherwise. */
// 开始为复制执行BGSAVE,根据配置选择磁盘或套接字作为RDB发送的目标,在开始之前确保冲洗脚本缓存
// mincapa参数是SLAVE_CAPA_*按位与的结果
int startBgsaveForReplication(int mincapa) {
int retval;
// 是否直接写到socket
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "slaves sockets" : "disk");
if (socket_target)
// 直接写到socket中
// fork一个子进程将rdb写到 状态为等待BGSAVE开始 的从节点的socket中
retval = rdbSaveToSlavesSockets();
else
// 否则后台进行RDB持久化BGSAVE操作,保存到磁盘上
retval = rdbSaveBackground(server.rdb_filename);
/* If we failed to BGSAVE, remove the slaves waiting for a full
* resynchorinization from the list of salves, inform them with
* an error about what happened, close the connection ASAP. */
// BGSAVE执行错误,将等待全量同步的从节点从从节点链表中删除,打印发生错误,立即关闭连接
if (retval == C_ERR) {
serverLog(LL_WARNING,"BGSAVE for replication failed");
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
client *slave = ln->value;
// 将等待全量同步的从节点从从节点链表中删除
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->flags &= ~CLIENT_SLAVE;
listDelNode(server.slaves,ln);
addReplyError(slave,
"BGSAVE failed, replication can't continue");
// 立即关闭client的连接
slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
return retval;
}
/* If the target is socket, rdbSaveToSlavesSockets() already setup
* the salves for a full resync. Otherwise for disk target do it now.*/
// 如果是直接写到socket中,rdbSaveToSlavesSockets()已经会设置从节点为全量复制
// 否则直接写到磁盘上,执行以下代码
if (!socket_target) {
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
client *slave = ln->value;
// 设置等待全量同步的从节点的状态
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
// 设置要执行全量重同步从节点的状态
replicationSetupSlaveForFullResync(slave,
getPsyncInitialOffset());
}
}
}
/* Flush the script cache, since we need that slave differences are
* accumulated without requiring slaves to match our cached scripts. */
// 刷新脚本的缓存
if (retval == C_OK) replicationScriptCacheFlush();
return retval;
}
/* SYNC and PSYNC command implemenation. */
// SYNC and PSYNC 命令实现
void syncCommand(client *c) {
/* ignore SYNC if already slave or in monitor mode */
// 如果client是从节点,那么忽略同步命令
if (c->flags & CLIENT_SLAVE) return;
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
// 如果服务器是从节点,但是状态未处于和主节点连接状态,则发送错误回复,直接返回
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
addReplyError(c,"Can't SYNC while not connected with my master");
return;
}
/* SYNC can't be issued when the server has pending data to send to
* the client about already issued commands. We need a fresh reply
* buffer registering the differences between the BGSAVE and the current
* dataset, so that we can copy to other slaves if needed. */
// 如果指定的client的回复缓冲区中还有数据,则不能执行同步
if (clientHasPendingReplies(c)) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}
serverLog(LL_NOTICE,"Slave %s asks for synchronization",
replicationGetSlaveName(c));
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <runid> <offset>
*
* So the slave knows the new runid and offset to try a PSYNC later
* if the connection with the master is lost. */
// 尝试执行一个部分同步PSYNC的命令,则masterTryPartialResynchronization()会回复一个 "+FULLRESYNC <runid> <offset>",如果失败则执行全量同步
// 所以,从节点会如果和主节点连接断开,从节点会知道runid和offset,随后会尝试执行PSYNC
// 如果是执行PSYNC命令
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 主节点尝试执行部分重同步,执行成功返回C_OK
if (masterTryPartialResynchronization(c) == C_OK) {
// 可以执行PSYNC命令,则将接受PSYNC命令的个数加1
server.stat_sync_partial_ok++;
// 不需要执行后面的全量同步,直接返回
return; /* No full resync needed, return. */
// 不能执行PSYNC部分重同步,需要进行全量同步
} else {
char *master_runid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* runid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
// 从节点以强制全量同步为目的,所以不能执行部分重同步,因此增加PSYNC命令失败的次数
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
// 执行SYNC命令
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
// 设置标识,执行SYNC命令,不接受REPLCONF ACK
c->flags |= CLIENT_PRE_PSYNC;
}
/* Full resynchronization. */
// 全量重同步次数加1
server.stat_sync_full++;
/* Setup the slave as one waiting for BGSAVE to start. The following code
* paths will change the state if we handle the slave differently. */
// 设置client状态为:从服务器节点等待BGSAVE节点的开始
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
// 执行SYNC命令后是否关闭TCP_NODELAY
if (server.repl_disable_tcp_nodelay)
// 是的话,则启用nagle算法
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
// 保存主服务器传来的RDB文件的fd,设置为-1
c->repldbfd = -1;
// 设置client状态为从节点,标识client是一个从服务器
c->flags |= CLIENT_SLAVE;
// 添加到服务器从节点链表中
listAddNodeTail(server.slaves,c);
/* CASE 1: BGSAVE is in progress, with disk target. */
// 情况1. 正在执行 BGSAVE ,且是同步到磁盘上
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save. */
client *slave;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
slave = ln->value;
// 如果有从节点已经创建子进程执行写RDB操作,等待完成,那么退出循环
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}
/* To attach this slave, we check that it has at least all the
* capabilities of the slave that triggered the current BGSAVE. */
// 对于这个从节点,我们检查它是否具有触发当前BGSAVE操作的能力
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
// 将slave的输出缓冲区所有内容拷贝给c的所有输出缓冲区中
copyClientOutputBuffer(c,slave);
// 设置全量重同步从节点的状态,设置部分重同步的偏移量
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences. */
serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
}
/* CASE 2: BGSAVE is in progress, with socket target. */
// 情况2. 正在执行BGSAVE,且是无盘同步,直接写到socket中
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
/* There is an RDB child process but it is writing directly to
* children sockets. We need to wait for the next BGSAVE
* in order to synchronize. */
// 虽然有子进程在执行写RDB,但是它直接写到socket中,所以等待下次执行BGSAVE
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
/* CASE 3: There is no BGSAVE is progress. */
// 情况3:没有执行BGSAVE的进程
} else {
// 服务器支持无盘同步
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
// 无盘同步复制的子进程被创建在replicationCron()中,因为想等待更多的从节点可以到来而延迟
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
// 服务器不支持无盘复制
} else {
/* Target is disk (or the slave is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
// 如果没有正在执行BGSAVE,且没有进行写AOF文件,则开始为复制执行BGSAVE,并且是将RDB文件写到磁盘上
if (server.aof_child_pid == -1) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but an AOF rewrite is active. "
"BGSAVE for replication delayed");
}
}
}
// 只有一个从节点,且backlog为空,则创建一个新的backlog
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
createReplicationBacklog();
return;
}
/* REPLCONF <option> <value> <option> <value> ...
* This command is used by a slave in order to configure the replication
* process before starting it with the SYNC command.
*
* Currently the only use of this command is to communicate to the master
* what is the listening port of the Slave redis instance, so that the
* master can accurately list slaves and their listening ports in
* the INFO output.
*
* In the future the same command can be used in order to configure
* the replication to initiate an incremental replication instead of a
* full resync. */
// REPLCONF <option> <value> <option> <value> ... 命令实现
// 被从节点使用来去配置复制进程,在SYNC之前
// 唯一使用的是用来让从节点告知主机点它的监听端口,以便主节点可以通过INFO命令来输出
void replconfCommand(client *c) {
int j;
// 检查参数个数是否正确,每一个<option>都要对应<value>
if ((c->argc % 2) == 0) {
/* Number of arguments must be odd to make sure that every
* option has a corresponding value. */
addReply(c,shared.syntaxerr);
return;
}
/* Process every option-value pair. */
// 处理每一个对<option> <value>
for (j = 1; j < c->argc; j+=2) {
// REPLCONF listening-port <port> 命令
if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
long port;
// 获取端口号
if ((getLongFromObjectOrReply(c,c->argv[j+1],
&port,NULL) != C_OK))
return;
// 设置从节点监听的端口号
c->slave_listening_port = port;
// REPLCONF ip-address ip
} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
sds ip = c->argv[j+1]->ptr;
// 设置从节点的ip
if (sdslen(ip) < sizeof(c->slave_ip)) {
memcpy(c->slave_ip,ip,sdslen(ip)+1);
} else {
addReplyErrorFormat(c,"REPLCONF ip-address provided by "
"slave instance is too long: %zd bytes", sdslen(ip));
return;
}
// REPLCONF capa eof
} else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
/* Ignore capabilities not understood by this master. */
// 设置client的能力值capa,忽略其他的capabilities
if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
c->slave_capa |= SLAVE_CAPA_EOF;
// REPLCONF ack <offset>
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
* internal only command that normal clients should never use. */
// 从节点使用REPLCONF ACK通知主机到目前为止处理的复制偏移量。 这是一个内部唯一的命令,普通客户端不应该使用它
long long offset;
// client不是从节点,直接返回
if (!(c->flags & CLIENT_SLAVE)) return;
// 获取offset
if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
return;
// 设置从节点通过ack命令接收到的偏移量
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
// 通过ack命令接收到的偏移量所用的时间
c->repl_ack_time = server.unixtime;
/* If this was a diskless replication, we need to really put
* the slave online when the first ACK is received (which
* confirms slave is online and ready to get more data). */
// 如果这是一个无盘复制,我们需要在接收到第一个ACK时确实将从节点设置为在线状态
// (这确认从节点在线并准备好获取更多的数据)
// 将从节点设置为在线状态
if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
putSlaveOnline(c);
/* Note: this command does not reply anything! */
return;
// REPLCONF getack
} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
/* REPLCONF GETACK is used in order to request an ACK ASAP
* to the slave. */
// REPLCONF GETACK 被用来取请求一个ACK给从节点
if (server.masterhost && server.master) replicationSendAck();
/* Note: this command does not reply anything! */
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
return;
}
}
addReply(c,shared.ok);
}
/* This function puts a slave in the online state, and should be called just
* after a slave received the RDB file for the initial synchronization, and
* we are finally ready to send the incremental stream of commands.
*
* It does a few things:
*
* 1) Put the slave in ONLINE state (useless when the function is called
* because state is already ONLINE but repl_put_online_on_ack is true).
* 2) Make sure the writable event is re-installed, since calling the SYNC
* command disables it, so that we can accumulate output buffer without
* sending it to the slave.
* 3) Update the count of good slaves. */
// 该函数将从节点置于在线状态,并且应该在从节点接收到初始同步的RDB文件之后调用,并且我们终于准备好发送增量命令流
/*
1. 将从节点设置为ONLINE状态,除非当函数被调用时,由于状态已经是ONLINE而是repl_put_online_on_ack为真
2. 确保可写事件已经被重新设置,因为调用SYNC命令不能使用它,所以我们可以累加输出缓冲区而不将其发送到从节点
3. 更新当前状态良好的从节点的个数
*/
void putSlaveOnline(client *slave) {
// 设置从节点的状态为ONLINE
slave->replstate = SLAVE_STATE_ONLINE;
// 不设置从节点的写处理器
slave->repl_put_online_on_ack = 0;
// 设置通过ack命令接收到的偏移量所用的时间
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
// 重新设置文件的可写事件的处理程序为sendReplyToClient
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
freeClient(slave);
return;
}
// 更新当前状态良好的从节点的个数
refreshGoodSlavesCount();
serverLog(LL_NOTICE,"Synchronization with slave %s succeeded",
replicationGetSlaveName(slave));
}