/* aof.c -- Redis append only file (AOF) implementation. */
/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
#include "bio.h"
#include "rio.h"
#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/param.h>
void aofUpdateCurrentSize(void);
void aofClosePipes(void);
/* ----------------------------------------------------------------------------
* AOF rewrite buffer implementation.
*
* The following code implements a simple buffer used in order to accumulate
* changes while the background process is rewriting the AOF file.
*
* We only need to append, but can't just use realloc with a large block
* because 'huge' reallocs are not always handled as one could expect
* (via remapping of pages at OS level) but may involve copying data.
*
* For this reason we use a list of blocks, every block is
* AOF_RW_BUF_BLOCK_SIZE bytes.
* ------------------------------------------------------------------------- */
/* Payload capacity, in bytes, of a single AOF rewrite buffer block. */
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */

/* One fixed-size block of the AOF rewrite buffer. The buffer is a list of
 * these blocks; data is appended at buf+used. */
typedef struct aofrwblock {
    unsigned long used;                 /* Bytes of buf[] holding data. */
    unsigned long free;                 /* Bytes of buf[] still available. */
    char buf[AOF_RW_BUF_BLOCK_SIZE];    /* Block payload. */
} aofrwblock;
/* This function free the old AOF rewrite buffer if needed, and initialize
* a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
* so can be used for the first initialization as well. */
// 按需释放旧的AOF缓冲块的,并且初始化一个新的。
void aofRewriteBufferReset(void) {
// 如果当期有缓冲块的链表,则释放旧的缓冲区
if (server.aof_rewrite_buf_blocks)
listRelease(server.aof_rewrite_buf_blocks);
// 初始化一个新的缓冲块链表
server.aof_rewrite_buf_blocks = listCreate();
// 设置链表的释放方法
listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
}
/* Return the number of bytes currently stored in the AOF rewrite buffer,
 * computed as the sum of the 'used' counters of every block in the list. */
unsigned long aofRewriteBufferSize(void) {
    unsigned long total = 0;
    listIter iter;
    listNode *node;

    /* Walk the block list from head to tail. */
    listRewind(server.aof_rewrite_buf_blocks,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        const aofrwblock *blk = listNodeValue(node);

        total += blk->used;
    }
    return total;
}
/* Writable-event handler that streams the accumulated AOF differences to the
 * child performing the rewrite, so that the final write done when the child
 * finishes is small.
 *
 * Blocks are consumed from the head of the list. The handler removes itself
 * from the event loop when there is nothing left to send or when
 * server.aof_stop_sending_diff is set, and simply returns (leaving the event
 * installed) when write(2) returns zero or an error. None of the event
 * callback arguments is used. */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    for (;;) {
        listNode *head = listFirst(server.aof_rewrite_buf_blocks);
        aofrwblock *blk = (head != NULL) ? head->value : NULL;

        /* Nothing to send, or asked to stop: uninstall the handler. */
        if (blk == NULL || server.aof_stop_sending_diff) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }

        if (blk->used != 0) {
            ssize_t sent = write(server.aof_pipe_write_data_to_child,
                                 blk->buf,blk->used);

            if (sent <= 0) return;
            /* Shift the unsent tail to the start of the block. */
            memmove(blk->buf,blk->buf+sent,blk->used-sent);
            blk->used -= sent;
        }

        /* A fully drained block is removed from the list. */
        if (blk->used == 0) listDelNode(server.aof_rewrite_buf_blocks,head);
    }
}
/* Append 'len' bytes starting at 's' to the AOF rewrite buffer.
 *
 * The data first fills whatever room is left in the tail block; every time
 * more room is needed a new AOF_RW_BUF_BLOCK_SIZE block is allocated and
 * linked at the tail. Finally, if no file event is registered for the pipe
 * used to feed the child, aofChildWriteDiffData() is installed as its
 * writable handler. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *tail = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *blk = tail ? tail->value : NULL;

    while (len > 0) {
        /* Copy as much as possible into the current block, if any. */
        if (blk != NULL) {
            unsigned long chunk = blk->free;

            if (len < chunk) chunk = len;
            if (chunk > 0) { /* The current block is not already full. */
                memcpy(blk->buf+blk->used, s, chunk);
                blk->used += chunk;
                blk->free -= chunk;
                s += chunk;
                len -= chunk;
            }
        }

        /* Everything was stored: no new block is needed. */
        if (len == 0) break;

        /* First block to allocate, or need another block. */
        blk = zmalloc(sizeof(*blk));
        blk->free = AOF_RW_BUF_BLOCK_SIZE;
        blk->used = 0;
        listAddNodeTail(server.aof_rewrite_buf_blocks,blk);

        /* Log the buffer size when the block count plus one is a multiple
         * of 10 (as a notice) or of 100 (as a warning). */
        {
            int nblocks = listLength(server.aof_rewrite_buf_blocks);

            if (((nblocks+1) % 10) == 0) {
                int level;

                if (((nblocks+1) % 100) == 0)
                    level = LL_WARNING;
                else
                    level = LL_NOTICE;
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }

    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}
/* Write the AOF rewrite buffer (possibly composed of multiple blocks) into
 * the specified fd.
 *
 * Returns the total number of bytes written on success. If any block is
 * not written entirely (short write or write error) -1 is returned and
 * errno is always meaningful: it is the value set by write(2) when the call
 * failed, or EIO when write(2) wrote fewer bytes than requested. */
ssize_t aofRewriteBufferWrite(int fd) {
    listNode *ln;
    listIter li;
    ssize_t count = 0;

    listRewind(server.aof_rewrite_buf_blocks,&li);
    while((ln = listNext(&li))) {
        aofrwblock *block = listNodeValue(ln);
        ssize_t nwritten;

        /* Empty blocks have nothing to contribute. */
        if (block->used == 0) continue;

        nwritten = write(fd,block->buf,block->used);
        if (nwritten != (ssize_t)block->used) {
            /* write(2) sets errno only when it returns -1. For a zero or
             * partial write set it explicitly, so that callers logging
             * strerror(errno) do not report a stale, unrelated error. */
            if (nwritten >= 0) errno = EIO;
            return -1;
        }
        count += nwritten;
    }
    return count;
}
/* ----------------------------------------------------------------------------
* AOF file implementation
* ------------------------------------------------------------------------- */
// AOF文件的实现
/* Queue a BIO_AOF_FSYNC background job for the given file descriptor (the
 * one of the AOF file), so that the fsync() is performed in another thread.
 * The descriptor travels as the first job argument, stored in a pointer. */
void aof_background_fsync(int fd) {
    void *job_arg = (void*)(long)fd;

    bioCreateBackgroundJob(BIO_AOF_FSYNC,job_arg,NULL,NULL);
}
/* Called when the user switches from "appendonly yes" to "appendonly no"
 * at runtime using the CONFIG command.
 *
 * The pending AOF buffer is force-flushed, the file is synced and closed,
 * and the AOF state is reset. If a rewrite child is running it is killed
 * and everything related to it (rewrite buffer, temp file, pipes) is
 * cleaned up. */
void stopAppendOnly(void) {
    int status;

    serverAssert(server.aof_state != AOF_OFF);

    /* Persist what we have, then release the file. */
    flushAppendOnlyFile(1);
    aof_fsync(server.aof_fd);
    close(server.aof_fd);

    server.aof_state = AOF_OFF;
    server.aof_selected_db = -1;
    server.aof_fd = -1;

    /* No rewrite in progress: nothing else to do. */
    if (server.aof_child_pid == -1) return;

    /* Rewrite operation in progress: kill it, wait for the child to exit. */
    serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
        (long) server.aof_child_pid);
    if (kill(server.aof_child_pid,SIGUSR1) != -1) {
        for (;;) {
            if (wait3(&status,0,NULL) == server.aof_child_pid) break;
        }
    }

    /* Reset the buffer accumulating changes while the child saves. */
    aofRewriteBufferReset();
    /* Remove the temp file the child was writing. */
    aofRemoveTempFile(server.aof_child_pid);
    server.aof_child_pid = -1;
    server.aof_rewrite_time_start = -1;
    /* Close pipes used for IPC between the two processes. */
    aofClosePipes();
}
/* Called when the user switches from "appendonly no" to "appendonly yes"
 * at runtime using the CONFIG command.
 *
 * Opens (creating it if needed) the AOF file in append mode and either
 * starts a background rewrite or, if an RDB child is currently saving,
 * schedules one for later. On success the AOF state becomes
 * AOF_WAIT_REWRITE and C_OK is returned. C_ERR is returned if the file
 * can't be opened or the background rewrite can't be started; in the latter
 * case the descriptor is closed and server.aof_fd is reset to -1. */
int startAppendOnly(void) {
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */

    /* Check the precondition before touching any server state. */
    serverAssert(server.aof_state == AOF_OFF);

    server.aof_last_fsync = server.unixtime;
    server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
    if (server.aof_fd == -1) {
        /* Save errno right away: getcwd() below may overwrite it and we
         * want to report the reason why open() failed. */
        int open_errno = errno;
        char *cwdp = getcwd(cwd,MAXPATHLEN);

        serverLog(LL_WARNING,
            "Redis needs to enable the AOF but can't open the "
            "append only file %s (in server root dir %s): %s",
            server.aof_filename,
            cwdp ? cwdp : "unknown",
            strerror(open_errno));
        return C_ERR;
    }
    if (server.rdb_child_pid != -1) {
        /* An RDB child is saving: schedule the rewrite for later. */
        server.aof_rewrite_scheduled = 1;
        serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible.");
    } else if (rewriteAppendOnlyFileBackground() == C_ERR) {
        close(server.aof_fd);
        /* Don't leave a closed descriptor number behind: it could later be
         * reused by an unrelated file. */
        server.aof_fd = -1;
        serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
        return C_ERR;
    }
    /* We correctly switched on AOF, now wait for the rewrite to be complete
     * in order to append data on disk. */
    server.aof_state = AOF_WAIT_REWRITE;
    return C_OK;
}
/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering the
 * event loop, we accumulate all the AOF writes in a memory buffer and
 * write it on disk using this function just before entering the event
 * loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync.
 *
 * On a failed or short write the error is logged (rate limited), its code is
 * stored in server.aof_last_write_errno and, unless the fsync policy is
 * 'always' (in which case the process exits), server.aof_last_write_status
 * is set to C_ERR and the unwritten data is left in the buffer for the next
 * call. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    int write_errno;
    mstime_t latency;

    /* Nothing buffered, nothing to do. */
    if (sdslen(server.aof_buf) == 0) return;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    latencyStartMonitor(latency);
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    /* Save errno immediately: the calls performed before the error is
     * handled (latency monitoring, logging) may overwrite it. */
    write_errno = errno;
    latencyEndMonitor(latency);

    /* We want to capture different events for delayed writes:
     * when the delay happens with a pending fsync, or with a saving child
     * active, and when the above two conditions are missing.
     * We also use an additional event name to save all samples which is
     * useful for graphing / monitoring purposes. */
    if (sync_in_progress) {
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
    } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
        latencyAddSampleIfNeeded("aof-write-active-child",latency);
    } else {
        latencyAddSampleIfNeeded("aof-write-alone",latency);
    }
    latencyAddSampleIfNeeded("aof-write",latency);

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* Compare as ssize_t: a narrower cast would truncate the length of
     * very large buffers. */
    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(write_errno));
            }
            /* Record the error code even when the log line is suppressed
             * by the rate limit. */
            server.aof_last_write_errno = write_errno;
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                     "the AOF file: (nwritten=%lld, "
                                     "expected=%lld)",
                                     (long long)nwritten,
                                     (long long)sdslen(server.aof_buf));
            }

            /* Try to remove the partial data by truncating the file back
             * to its size before this write. */
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file. Redis may refuse "
                             "to load the AOF the next time it starts. "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synced on disk. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* Successful write(2). If AOF was in error state, restore the
         * OK state and log the event. */
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        /* Start a background fsync unless one is already pending. */
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
/* Append to 'dst' the protocol representation of the command described by
 * 'argc' / 'argv' and return the resulting sds string:
 *
 *   *<argc>\r\n
 *   $<len of arg>\r\n<arg>\r\n      (repeated for every argument)
 *
 * Every argument goes through getDecodedObject() before being emitted, and
 * the reference obtained that way is released afterwards. */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char hdr[32];
    int hdrlen;
    int i;

    /* Multi bulk count: "*<argc>\r\n". */
    hdr[0] = '*';
    hdrlen = 1+ll2string(hdr+1,sizeof(hdr)-1,argc);
    hdr[hdrlen] = '\r';
    hdr[hdrlen+1] = '\n';
    hdrlen += 2;
    dst = sdscatlen(dst,hdr,hdrlen);

    for (i = 0; i < argc; i++) {
        robj *arg = getDecodedObject(argv[i]);
        size_t arglen = sdslen(arg->ptr);

        /* Bulk length: "$<len>\r\n". */
        hdr[0] = '$';
        hdrlen = 1+ll2string(hdr+1,sizeof(hdr)-1,arglen);
        hdr[hdrlen] = '\r';
        hdr[hdrlen+1] = '\n';
        hdrlen += 2;
        dst = sdscatlen(dst,hdr,hdrlen);

        /* Bulk payload followed by CRLF. */
        dst = sdscatlen(dst,arg->ptr,arglen);
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(arg);
    }
    return dst;
}
/* Append to 'buf' the protocol representation of a PEXPIREAT command for
 * 'key' and return the resulting sds string.
 *
 * 'seconds' holds the time argument of the original command and 'cmd' tells
 * which command is being translated, so that the value can be converted:
 * it is multiplied by 1000 for EXPIRE, SETEX and EXPIREAT, and the current
 * mstime() is added for EXPIRE, PEXPIRE, SETEX and PSETEX.
 *
 * This way the append only file retains precision and the time is always
 * absolute and not relative. */
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
    /* Decode the object to make sure we can use strtoll on it. */
    robj *decoded = getDecodedObject(seconds);
    long long when = strtoll(decoded->ptr,NULL,10);
    int unit_is_seconds = (cmd->proc == expireCommand ||
                           cmd->proc == setexCommand ||
                           cmd->proc == expireatCommand);
    int is_relative = (cmd->proc == expireCommand ||
                       cmd->proc == pexpireCommand ||
                       cmd->proc == setexCommand ||
                       cmd->proc == psetexCommand);
    robj *pexpireat[3];

    decrRefCount(decoded);

    /* Seconds to milliseconds, then relative to absolute. */
    if (unit_is_seconds) when *= 1000;
    if (is_relative) when += mstime();

    /* Emit "PEXPIREAT <key> <when>". */
    pexpireat[0] = createStringObject("PEXPIREAT",9);
    pexpireat[1] = key;
    pexpireat[2] = createStringObjectFromLongLong(when);
    buf = catAppendOnlyGenericCommand(buf, 3, pexpireat);

    /* Release only the objects created here: 'key' belongs to the caller. */
    decrRefCount(pexpireat[0]);
    decrRefCount(pexpireat[2]);
    return buf;
}
/* Serialize the command 'argv' / 'argc', executed against database 'dictid',
 * and feed it to the AOF.
 *
 * A SELECT is emitted first when 'dictid' differs from the database of the
 * last command appended. EXPIRE / PEXPIRE / EXPIREAT are translated into
 * PEXPIREAT, SETEX / PSETEX into SET followed by PEXPIREAT, while any other
 * command is emitted as it is.
 *
 * The result is appended to server.aof_buf when the AOF state is AOF_ON and
 * to the rewrite buffer when a rewrite child is active. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds payload = sdsempty();
    int is_expire_family = (cmd->proc == expireCommand ||
                            cmd->proc == pexpireCommand ||
                            cmd->proc == expireatCommand);
    int is_setex_family = (cmd->proc == setexCommand ||
                           cmd->proc == psetexCommand);

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char dbnum[64];

        snprintf(dbnum,sizeof(dbnum),"%d",dictid);
        payload = sdscatprintf(payload,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(dbnum),dbnum);
        server.aof_selected_db = dictid;
    }

    if (is_expire_family) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        payload = catAppendOnlyExpireAtCommand(payload,cmd,argv[1],argv[2]);
    } else if (is_setex_family) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT: the key is argv[1],
         * the time argv[2] and the value argv[3]. */
        robj *setargv[3];

        setargv[0] = createStringObject("SET",3);
        setargv[1] = argv[1];
        setargv[2] = argv[3];
        payload = catAppendOnlyGenericCommand(payload,3,setargv);
        decrRefCount(setargv[0]);
        payload = catAppendOnlyExpireAtCommand(payload,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        payload = catAppendOnlyGenericCommand(payload,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,payload,sdslen(payload));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)payload,sdslen(payload));

    sdsfree(payload);
}
/* ----------------------------------------------------------------------------
* AOF loading
* ------------------------------------------------------------------------- */
// AOF载入
/* In Redis commands are always executed in the context of a client, so in
 * order to load the append only file we need to create a fake client.
 *
 * The returned client has no socket (fd is -1), selects DB 0 and starts with
 * empty query buffer, reply list and watched keys list. */
struct client *createFakeClient(void) {
    struct client *fake = zmalloc(sizeof(*fake));

    selectDb(fake,0);

    /* Plain fields. */
    fake->fd = -1;
    fake->name = NULL;
    fake->peerid = NULL;
    fake->argc = 0;
    fake->argv = NULL;
    fake->flags = 0;
    fake->btype = BLOCKED_NONE;
    fake->bufpos = 0;
    fake->querybuf_peak = 0;
    fake->reply_bytes = 0;
    fake->obuf_soft_limit_reached_time = 0;
    /* We set the fake client as a slave waiting for the synchronization
     * so that Redis will not try to send replies to this client. */
    fake->replstate = SLAVE_STATE_WAIT_BGSAVE_START;

    /* Fields owning allocated objects. */
    fake->querybuf = sdsempty();
    fake->reply = listCreate();
    fake->watched_keys = listCreate();
    listSetFreeMethod(fake->reply,decrRefCountVoid);
    listSetDupMethod(fake->reply,dupClientReplyValue);

    initClientMultiState(fake);
    return fake;
}
/* Release the argument vector of the fake client: drop one reference for
 * each of the c->argc objects, then free the vector itself. */
void freeFakeClientArgv(struct client *c) {
    int idx = 0;

    while (idx < c->argc) {
        decrRefCount(c->argv[idx]);
        idx++;
    }
    zfree(c->argv);
}
/* Free a fake client: its MULTI state, watched keys list, reply list and
 * query buffer are released first, then the client structure itself. The
 * argument vector is not touched here. */
void freeFakeClient(struct client *c) {
    freeClientMultiState(c);
    listRelease(c->watched_keys);
    listRelease(c->reply);
    sdsfree(c->querybuf);
    zfree(c);
}
/* Replay the append log file. On success C_OK is returned. On non fatal
* error (the append only file is zero-length) C_ERR is returned. On
* fatal error an error message is logged and the program exits. */
// 执行AOF文件中的命令
// 成功返回C_OK,出现非致命错误返回C_ERR,例如AOF文件长度为0,出现致命错误打印日志退出
int loadAppendOnlyFile(char *filename) {
struct client *fakeClient;
FILE *fp = fopen(filename,"r"); //以读打开AOF文件
struct redis_stat sb;
int old_aof_state = server.aof_state; //备份当前AOF的状态
long loops = 0;
off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */
// 如果文件打开,但是大小为0,则返回C_ERR
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return C_ERR;
}
// 如果文件打开失败,打印日志,退出
if (fp == NULL) {
serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
exit(1);
}
/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
* to the same file we're about to read. */
// 暂时关闭AOF,防止在执行MULTI时,EXEC命令被传播到AOF文件中
server.aof_state = AOF_OFF;
// 生成一个伪client
fakeClient = createFakeClient();
// 设置载入的状态信息
startLoading(fp);
while(1) {
int argc, j;
unsigned long len;
robj **argv;
char buf[128];
sds argsds;
struct redisCommand *cmd;
/* Serve the clients from time to time */
// 间隔性的处理client请求
if (!(loops++ % 1000)) {
// ftello(fp)返回当前文件载入的偏移量
// 设置载入时server的状态信息,更新当前载入的进度
loadingProgress(ftello(fp));
// 在服务器被阻塞的状态下,仍然能处理请求
// 因为当前处于载入状态,当client的请求到来时,总是返回loading的状态错误
processEventsWhileBlocked();
}
// 将一行文件内容读到buf中,遇到"\r\n"停止
if (fgets(buf,sizeof(buf),fp) == NULL) {
if (feof(fp)) //如果文件已经读完了或数据库为空,则跳出while循环
break;
else
goto readerr;
}
// 检查文件格式 "*<argc>\r\n"
if (buf[0] != '*') goto fmterr;
if (buf[1] == '\0') goto readerr;
// 取出命令参数个数
argc = atoi(buf+1);
if (argc < 1) goto fmterr; //至少一个参数,就是当前命令
// 分配参数列表空间
argv = zmalloc(sizeof(robj*)*argc);
// 设置伪client的参数列表
fakeClient->argc = argc;
fakeClient->argv = argv;
// 遍历参数列表
// "$<command_len>\r\n<command>\r\n"
for (j = 0; j < argc; j++) {
// 读一行内容到buf中,遇到"\r\n"停止
if (fgets(buf,sizeof(buf),fp) == NULL) {
fakeClient->argc = j; /* Free up to j-1. */
freeFakeClientArgv(fakeClient);
goto readerr;
}
// 检查格式
if (buf[0] != '$') goto fmterr;
// 读出参数的长度len
len = strtol(buf+1,NULL,10);
// 初始化一个len长度的sds
argsds = sdsnewlen(NULL,len);
// 从文件中读出一个len字节长度,将值保存到argsds中
if (len && fread(argsds,len,1,fp) == 0) {
sdsfree(argsds);
fakeClient->argc = j; /* Free up to j-1. */
freeFakeClientArgv(fakeClient);
goto readerr;
}
// 创建一个字符串对象保存读出的参数argsds
argv[j] = createObject(OBJ_STRING,argsds);
// 读两个字节,跳过"\r\n"
if (fread(buf,2,1,fp) == 0) {
fakeClient->argc = j+1; /* Free up to j. */
freeFakeClientArgv(fakeClient);
goto readerr; /* discard CRLF */
}
}
/* Command lookup */
// 查找命令
cmd = lookupCommand(argv[0]->ptr);
if (!cmd) {
serverLog(LL_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
exit(1);
}
/* Run the command in the context of a fake client */
// 调用伪client执行命令
cmd->proc(fakeClient);
/* The fake client should not have a reply */
// 伪client不应该有回复
serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
/* The fake client should never get blocked */
// 伪client不应该是阻塞的
serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);
/* Clean up. Command code may have changed argv/argc so we use the
* argv/argc of the client instead of the local variables. */
// 释放伪client的参数列表
freeFakeClientArgv(fakeClient);
// 更新已载入且命令合法的当前文件的偏移量
if (server.aof_load_truncated) valid_up_to = ftello(fp);
}
/* This point can only be reached when EOF is reached without errors.
* If the client is in the middle of a MULTI/EXEC, log error and quit. */
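// Reaching this point means the end of the AOF file was hit without any read error.
// If the fake client is still inside a MULTI/EXEC transaction, the file ended before the transaction was closed: jump to uxeof.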
if (fakeClient->flags & CLIENT_MULTI) goto uxeof;
// 载入成功
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
fclose(fp); //关闭文件
freeFakeClient(fakeClient); //释放伪client
server.aof_state = old_aof_state; //还原AOF状态
stopLoading(); //设置载入完成的状态
aofUpdateCurrentSize(); //更新服务器状态,当前AOF文件的大小
server.aof_rewrite_base_size = server.aof_current_size; //更新重写的大小
return C_OK;
// 载入时读错误,如果feof(fp)为真,则直接执行 uxeof
readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
if (!feof(fp)) {
// 退出前释放伪client的空间
if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
exit(1);
}
// 不被预期的AOF文件结束格式
uxeof: /* Unexpected AOF end of file. */
// 如果发现末尾结束格式不完整则自动截掉,成功加载前面正确的数据。
if (server.aof_load_truncated) {
serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");
serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",
(unsigned long long) valid_up_to);
// 截断文件到正确加载的位置
if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
if (valid_up_to == -1) {
serverLog(LL_WARNING,"Last valid command offset is invalid");
} else {
serverLog(LL_WARNING,"Error truncating the AOF file: %s",
strerror(errno));
}
} else {
/* Make sure the AOF file descriptor points to the end of the
* file after the truncate call. */
// 确保截断后的文件指针指向文件的末尾
if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",
strerror(errno));
} else {
serverLog(LL_WARNING,
"AOF loaded anyway because aof-load-truncated is enabled");
goto loaded_ok; //跳转到loaded_ok,表截断成功,成功加载前面正确的数据。
}
}
}
// 退出前释放伪client的空间
if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
exit(1);
// 格式错误
fmterr: /* Format error. */
// 退出前释放伪client的空间
if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
exit(1);
}
/* ----------------------------------------------------------------------------
* AOF rewrite
* ------------------------------------------------------------------------- */
// AOF rewrite
/* Delegate writing an object to writing a bulk string or bulk long long.
* This is not placed in rio.c since that adds the server.h dependency. */
// Writes the object obj to the rio r in bulk format and returns the number of bytes written
int rioWriteBulkObject(rio *r, robj *obj) {
/* Avoid using getDecodedObject to help copy-on-write (we are often
* in a child process when this function is called). */
// obj对象指向的是整数类型
if (obj->encoding == OBJ_ENCODING_INT) {
return rioWriteBulkLongLong(r,(long)obj->ptr);
// obj对象指向的是字符串类型
} else if (sdsEncodedObject(obj)) {
return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
} else {
serverPanic("Unknown string encoding");
}