Skip to content

Commit

Permalink
添加了中文注释,方便初学者阅读理解
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiao1024 committed Jul 25, 2024
1 parent 90f6ddf commit a95f64e
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 229 deletions.
178 changes: 91 additions & 87 deletions aof/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,114 +21,116 @@ import (
"github.com/hdt3213/godis/redis/protocol"
)

/*在aof过程中需要注意的事情
get 之类的读命令并不需要进行持久化
expire 命令要用等效的 expireat 命令替换。
举例说明,10:00 执行 expire a 3600 表示键 a 在 11:00 过期,
在 10:30 载入AOF文件时执行 expire a 3600 就成了 11:30 过期与原数据不符。
*/
// CmdLine is alias for [][]byte, represents a command line
type CmdLine = [][]byte

// aofQueueSize 定义AOF队列大小
const (
aofQueueSize = 1 << 20
)

// Fsync 策略常量
const (
// FsyncAlways do fsync for every command
// 每个命令执行后都同步到磁盘
FsyncAlways = "always"
// FsyncEverySec do fsync every second
// 每秒同步一次到磁盘
FsyncEverySec = "everysec"
// FsyncNo lets operating system decides when to do fsync
// 由操作系统决定何时同步到磁盘
FsyncNo = "no"
)

// payload 结构体用于封装要写入AOF的命令数据
type payload struct {
cmdLine CmdLine
dbIndex int
wg *sync.WaitGroup
wg *sync.WaitGroup // WaitGroup,用于同步等待所有命令的写入操作完成,主要用于测试和确保数据一致性
}

// Listener will be called-back after receiving a aof payload
// with a listener we can forward the updates to slave nodes etc.
// Listener 接口用于回调监听AOF的变更
type Listener interface {
// Callback will be called-back after receiving a aof payload
// Callback 会在接收到AOF 的 payload后被调用
Callback([]CmdLine)
}

// Persister receive msgs from channel and write to AOF file
// Aof的处理抽象结构体,AOF持久化处理器
type Persister struct {
ctx context.Context
cancel context.CancelFunc
db database.DBEngine
tmpDBMaker func() database.DBEngine
// aofChan is the channel to receive aof payload(listenCmd will send payload to this channel)
aofChan chan *payload
// aofFile is the file handler of aof file
aofFile *os.File
// aofFilename is the path of aof file
aofFilename string
// aofFsync is the strategy of fsync
aofFsync string
// aof goroutine will send msg to main goroutine through this channel when aof tasks finished and ready to shut down
aofFinished chan struct{}
// pause aof for start/finish aof rewrite progress
pausingAof sync.Mutex
currentDB int
listeners map[Listener]struct{}
// reuse cmdLine buffer
buffer []CmdLine
ctx context.Context // 上下文,用于管理和取消长时间运行的goroutine
cancel context.CancelFunc // 取消函数,用于在关闭Persister时停止所有goroutine
db database.DBEngine // 数据库引擎接口,必须持有以便操作redis的业务核心
tmpDBMaker func() database.DBEngine // 用于创建临时数据库实例的函数,通常在重写AOF时使用
aofChan chan *payload // AOF载荷通道,用于接收来自其他组件的命令,以便异步写入AOF文件
aofFile *os.File // AOF文件的文件句柄,用于文件读写操作
aofFilename string // AOF文件的路径,用于文件操作时指定正确的文件
aofFsync string // Fsync策略,控制数据何时从内存同步到磁盘("always", "everysec", "no")

aofFinished chan struct{} // 当AOF处理goroutine完成后,通过此通道通知主goroutine
pausingAof sync.Mutex // 在AOF重写过程中用于暂停AOF记录的互斥锁
currentDB int // 当前数据库的索引,用于支持多数据库环境,保证命令在正确的数据库上执行
listeners map[Listener]struct{} // 监听器集合,用于实现发布-订阅模式,当AOF有更新时通知这些监听器
buffer []CmdLine // 命令行缓冲区,用于减少内存分配,重用命令行数据
}

// NewPersister creates a new aof.Persister
// NewPersister 创建新的AOF持久化处理器
func NewPersister(db database.DBEngine, filename string, load bool, fsync string, tmpDBMaker func() database.DBEngine) (*Persister, error) {
persister := &Persister{}
persister := &Persister{} // 初始化Persister结构体实例
persister.aofFilename = filename
persister.aofFsync = strings.ToLower(fsync)
persister.aofFsync = strings.ToLower(fsync) // 设置文件同步策略,统一转为小写以防止大小写错误
persister.db = db
persister.tmpDBMaker = tmpDBMaker
persister.tmpDBMaker = tmpDBMaker // 设置临时数据库创建函数,用于AOF重写等场景
persister.currentDB = 0
// load aof file if needed
if load {
persister.LoadAof(0)
persister.LoadAof(0) // 根据需要加载AOF文件
}
aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
}
persister.aofFile = aofFile
persister.aofChan = make(chan *payload, aofQueueSize)
persister.aofFinished = make(chan struct{})
persister.aofChan = make(chan *payload, aofQueueSize) // 创建一个负载通道,用于异步接收要写入的命令
persister.aofFinished = make(chan struct{}) // 创建一个通道,用于通知AOF处理完成
persister.listeners = make(map[Listener]struct{})
// start aof goroutine to write aof file in background and fsync periodically if needed (see fsyncEverySecond)
// 启动一个协程来监听和处理AOF命令
go func() {
persister.listenCmd()
persister.listenCmd() // 启动一个协程监听和处理AOF命令
}()
// 设置context和cancel,用于管理协程的生命周期
ctx, cancel := context.WithCancel(context.Background())
persister.ctx = ctx
persister.cancel = cancel
// fsync every second if needed
if persister.aofFsync == FsyncEverySec {
// 如果策略是每秒同步,则启动定时同步
if persister.aofFsync == FsyncEverySec { // 如果策略是每秒同步,则启动定时同步
persister.fsyncEverySecond()
}
return persister, nil
}

// RemoveListener removes a listener from aof handler, so we can close the listener
// RemoveListener 移除一个监听器
func (persister *Persister) RemoveListener(listener Listener) {
persister.pausingAof.Lock()
defer persister.pausingAof.Unlock()
delete(persister.listeners, listener)
}

// SaveCmdLine send command to aof goroutine through channel
// SaveCmdLine 将命令行数据发送到AOF处理协程
func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) {
// aofChan will be set as nil temporarily during load aof see Persister.LoadAof
//如果没有初始化管道,会出错
if persister.aofChan == nil {
return
}

//判断一下开启的aof的策略
if persister.aofFsync == FsyncAlways {
p := &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
persister.writeAof(p)
return
persister.writeAof(p) // 如果策略是立即同步,则直接写入文件
}

persister.aofChan <- &payload{
Expand All @@ -138,50 +140,52 @@ func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) {

}

// listenCmd listen aof channel and write into file
// listenCmd 监听AOF队列,并将命令写入文件
func (persister *Persister) listenCmd() {
for p := range persister.aofChan {
persister.writeAof(p)
}
persister.aofFinished <- struct{}{}
}

// writeAof 处理写入AOF文件的具体逻辑
func (persister *Persister) writeAof(p *payload) {
persister.buffer = persister.buffer[:0] // reuse underlying array
persister.pausingAof.Lock() // prevent other goroutines from pausing aof
persister.buffer = persister.buffer[:0] // 清空缓冲区以重用
persister.pausingAof.Lock() // 加锁以同步对AOF操作的访问,防止重写冲突
defer persister.pausingAof.Unlock()
// ensure aof is in the right database
// 确认我当前操作是否需要切换db,如果跟上次相同则不需要再插入select db相关命令
if p.dbIndex != persister.currentDB {
// select db
selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))
// 如果当前数据库索引不正确,则发送SELECT命令切换数据库
selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex)) //ToCmdLine功能是把输入的字符串序列,变成[][] string的切片
persister.buffer = append(persister.buffer, selectCmd)
data := protocol.MakeMultiBulkReply(selectCmd).ToBytes()
_, err := persister.aofFile.Write(data)
data := protocol.MakeMultiBulkReply(selectCmd).ToBytes() //调用ToBytes之后会变成redis协议的格式
_, err := persister.aofFile.Write(data) //写入文件
if err != nil {
logger.Warn(err)
return // skip this command
}
persister.currentDB = p.dbIndex
persister.currentDB = p.dbIndex // 更新当前数据库索引
}
// save command
// 写入实际的命令
data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes()
persister.buffer = append(persister.buffer, p.cmdLine)
_, err := persister.aofFile.Write(data)
if err != nil {
logger.Warn(err)
}
// 通知所有注册的监听器
for listener := range persister.listeners {
listener.Callback(persister.buffer)
}
// 如果同步策略为每个命令后立即同步,则执行同步
if persister.aofFsync == FsyncAlways {
_ = persister.aofFile.Sync()
}
}

// LoadAof read aof file, can only be used before Persister.listenCmd started
// LoadAof 从AOF文件加载数据,通常在服务器启动时调用
func (persister *Persister) LoadAof(maxBytes int) {
// persister.db.Exec may call persister.AddAof
// delete aofChan to prevent loaded commands back into aofChan
// 在加载过程中暂时关闭aofChan,防止新的写入命令干扰加载过程
aofChan := persister.aofChan
persister.aofChan = nil
defer func(aofChan chan *payload) {
Expand All @@ -198,14 +202,14 @@ func (persister *Persister) LoadAof(maxBytes int) {
}
defer file.Close()

// load rdb preamble if needed
// 从文件中解析可能的RDB预数据
decoder := rdb.NewDecoder(file)
err = persister.db.LoadRDB(decoder)
if err != nil {
// no rdb preamble
// 如果没有RDB预数据,从文件开头开始加载
file.Seek(0, io.SeekStart)
} else {
// has rdb preamble
// 如果存在RDB预数据,从预数据后开始读取
_, _ = file.Seek(int64(decoder.GetReadCount())+1, io.SeekStart)
maxBytes = maxBytes - decoder.GetReadCount()
}
Expand All @@ -215,12 +219,13 @@ func (persister *Persister) LoadAof(maxBytes int) {
} else {
reader = file
}
// 解析AOF数据流
ch := parser.ParseStream(reader)
fakeConn := connection.NewFakeConn() // only used for save dbIndex
fakeConn := connection.NewFakeConn() // 用于保存数据库索引的虚拟连接
for p := range ch {
if p.Err != nil {
if p.Err == io.EOF {
break
break // 文件读取完成
}
logger.Error("parse error: " + p.Err.Error())
continue
Expand Down Expand Up @@ -248,69 +253,68 @@ func (persister *Persister) LoadAof(maxBytes int) {
}
}

// Fsync flushes aof file to disk
// Fsync 将AOF文件的内容同步到磁盘
func (persister *Persister) Fsync() {
persister.pausingAof.Lock()
persister.pausingAof.Lock() // 加锁以防止在同步时发生并发写操作
if err := persister.aofFile.Sync(); err != nil {
logger.Errorf("fsync failed: %v", err)
logger.Errorf("fsync failed: %v", err) // 同步失败时记录错误
}
persister.pausingAof.Unlock()
}

// Close gracefully stops aof persistence procedure
// Close 优雅地停止AOF持久化过程
func (persister *Persister) Close() {
if persister.aofFile != nil {
close(persister.aofChan)
<-persister.aofFinished // wait for aof finished
err := persister.aofFile.Close()
close(persister.aofChan) // 关闭AOF命令通道
<-persister.aofFinished // 等待后台goroutine完成AOF处理
err := persister.aofFile.Close() // 关闭AOF文件
if err != nil {
logger.Warn(err)
}
}
persister.cancel()
persister.cancel() // 取消相关的context,确保所有goroutine可以清理并退出
}

// fsyncEverySecond fsync aof file every second
// fsyncEverySecond 每秒同步AOF文件到磁盘
func (persister *Persister) fsyncEverySecond() {
ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(time.Second) // 创建一个定时器,每秒触发一次
go func() {
for {
select {
case <-ticker.C:
persister.Fsync()
case <-persister.ctx.Done():
return
case <-ticker.C: // 每秒触发
persister.Fsync() // 调用Fsync方法同步数据到磁盘
case <-persister.ctx.Done(): // 监听context的取消信号
return // 如果接收到取消信号,退出goroutine
}
}
}()
}

// generateAof 根据当前数据库状态生成新的AOF文件
func (persister *Persister) generateAof(ctx *RewriteCtx) error {
// rewrite aof tmpFile
tmpFile := ctx.tmpFile
// load aof tmpFile
tmpAof := persister.newRewriteHandler()
tmpAof.LoadAof(int(ctx.fileSize))
tmpFile := ctx.tmpFile // 获取临时文件的文件句柄
tmpAof := persister.newRewriteHandler() // 创建一个新的AOF重写处理器
tmpAof.LoadAof(int(ctx.fileSize)) // 加载AOF数据到临时处理器
for i := 0; i < config.Properties.Databases; i++ {
// select db
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
_, err := tmpFile.Write(data)
_, err := tmpFile.Write(data) // 写入选择数据库的命令到临时文件
if err != nil {
return err
}
// dump db
// 遍历数据库中的每个键值对,并写入临时文件
tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
cmd := EntityToCmd(key, entity)
if cmd != nil {
_, _ = tmpFile.Write(cmd.ToBytes())
}
if expiration != nil {
cmd := MakeExpireCmd(key, *expiration)
cmd := MakeExpireCmd(key, *expiration) // 如果有过期时间,生成过期命令
if cmd != nil {
_, _ = tmpFile.Write(cmd.ToBytes())
_, _ = tmpFile.Write(cmd.ToBytes()) // 写入过期命令到临时文件
}
}
return true
return true // 继续遍历
})
}
return nil
Expand Down
Loading

0 comments on commit a95f64e

Please sign in to comment.