Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

should leave isr if local data need fix #182

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions consistence/data_placement_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,7 @@ func (dpm *DataPlacement) prepareCandidateNodesForNewLeader(topicInfo *TopicPart
coordLog.Infof("failed to get log id on replica: %v, %v", replica, err)
continue
}
// TODO: maybe we should check the epoch in the log, is it possible we have higher epoch with less log id?
if cid > newestLogID {
newestReplicas = newestReplicas[0:0]
newestReplicas = append(newestReplicas, replica)
Expand Down
35 changes: 31 additions & 4 deletions consistence/nsqd_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,11 @@ func (ncoord *NsqdCoordinator) loadLocalTopicData() error {
partition := topic.GetTopicPart()
if tc, err := ncoord.getTopicCoordData(topicName, partition); err == nil && tc != nil {
// already loaded
if tc.IsNeedFix() {
go ncoord.requestLeaveFromISR(topicName, partition)
}
if tc.topicLeaderSession.LeaderNode == nil || tc.topicLeaderSession.Session == "" {
if tc.topicInfo.Leader == ncoord.myNode.GetID() {
if tc.topicInfo.Leader == ncoord.myNode.GetID() && !tc.IsNeedFix() {
err := ncoord.acquireTopicLeader(&tc.topicInfo)
if err != nil {
coordLog.Infof("failed to acquire leader : %v", err)
Expand Down Expand Up @@ -869,25 +872,28 @@ func (ncoord *NsqdCoordinator) loadLocalTopicData() error {
if localErr != nil {
coordLog.Errorf("check local topic %v data need to be fixed:%v", topicInfo.GetTopicDesp(), localErr)
topic.SetDataFixState(true)
tc.GetData().setNeedFix(true)
go ncoord.requestLeaveFromISR(topicInfo.Name, topicInfo.Partition)
} else if !topicInfo.OrderedMulti {
delayQ := topic.GetDelayedQueue()
localErr = checkAndFixLocalLogQueueData(tc.GetData(), delayQ, tc.GetData().delayedLogMgr, forceFix)
if localErr != nil {
coordLog.Errorf("check local topic %v delayed queue data need to be fixed:%v", topicInfo.GetTopicDesp(), localErr)
delayQ.SetDataFixState(true)
tc.GetData().setNeedFix(true)
go ncoord.requestLeaveFromISR(topicInfo.Name, topicInfo.Partition)
}
if delayQ != nil {
localErr = delayQ.CheckConsistence()
if localErr != nil {
coordLog.Errorf("check local topic %v delayed queue data need to be fixed:%v", topicInfo.GetTopicDesp(), localErr)
delayQ.SetDataFixState(true)
tc.GetData().setNeedFix(true)
go ncoord.requestLeaveFromISR(topicInfo.Name, topicInfo.Partition)
}
}
}
if topicInfo.Leader == ncoord.myNode.GetID() {
if !tc.IsNeedFix() && topicInfo.Leader == ncoord.myNode.GetID() {
coordLog.Infof("topic %v starting as leader.", topicInfo.GetTopicDesp())
tc.DisableWrite(true)
err := ncoord.acquireTopicLeader(topicInfo)
Expand Down Expand Up @@ -1575,6 +1581,8 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator,
localLogSegStart, _, _ := logMgr.GetLogStartInfo()
countNumIndex, _ := logMgr.ConvertToCountIndex(logIndex, offset)

// TODO: here we need to check whether our last commit log has the same epoch as the leader's.
// If not, we need to truncate the catchup to make sure the last old-epoch log is the same as the leader's.
coordLog.Infof("topic %v catchup commit log begin :%v at: %v:%v:%v", topicInfo.GetTopicDesp(),
localLogSegStart, logIndex, offset, countNumIndex)
for offset > localLogSegStart.SegmentStartOffset || logIndex > localLogSegStart.SegmentStartIndex {
Expand Down Expand Up @@ -1693,6 +1701,7 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator,
if localErr != nil {
coordLog.Errorf("check local topic %v data need to be fixed:%v", topicInfo.GetTopicDesp(), localErr)
localLogQ.SetDataFixState(true)
tc.setNeedFix(true)
}

if localLogQ.IsDataNeedFix() {
Expand All @@ -1714,6 +1723,7 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator,
localTopic.Unlock()
if localErr != nil {
localLogQ.SetDataFixState(true)
tc.setNeedFix(true)
coordLog.Errorf("failed to reset local topic %v data: %v", localTopic.GetFullName(), localErr)
return logIndex, offset, needFullSync, &CoordErr{localErr.Error(), RpcNoErr, CoordLocalErr}
}
Expand Down Expand Up @@ -1743,6 +1753,7 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator,
if localErr != nil {
coordLog.Errorf("failed to reset local topic data: %v", localErr)
localLogQ.SetDataFixState(true)
tc.setNeedFix(true)
return logIndex, offset, needFullSync, &CoordErr{localErr.Error(), RpcNoErr, CoordLocalErr}
}
_, localErr = logMgr.TruncateToOffsetV2(0, 0)
Expand Down Expand Up @@ -1779,6 +1790,7 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator,
localTopic.Unlock()
if localErr != nil {
localLogQ.SetDataFixState(true)
tc.setNeedFix(true)
coordLog.Warningf("reset topic %v queue with start %v failed: %v", topicInfo.GetTopicDesp(), firstLogData, localErr)
return logIndex, offset, needFullSync, &CoordErr{localErr.Error(), RpcNoErr, CoordLocalErr}
}
Expand Down Expand Up @@ -1914,6 +1926,7 @@ func (ncoord *NsqdCoordinator) pullCatchupDataFromLeader(tc *TopicCoordinator,
localLogQ.ForceFlush()
logIndex, offset = logMgr.GetCurrentEnd()
localLogQ.SetDataFixState(false)
tc.setNeedFix(false)
if synced {
break
}
Expand Down Expand Up @@ -2326,6 +2339,9 @@ func (ncoord *NsqdCoordinator) updateTopicInfo(topicCoord *TopicCoordinator, sho
return err
}

if newCoordData.IsNeedFix() {
go ncoord.requestLeaveFromISR(newTopicInfo.Name, newTopicInfo.Partition)
}
if newTopicInfo.Leader == ncoord.myNode.GetID() {
// not leader before and became new leader
if oldData.GetLeader() != ncoord.myNode.GetID() {
Expand All @@ -2337,7 +2353,7 @@ func (ncoord *NsqdCoordinator) updateTopicInfo(topicCoord *TopicCoordinator, sho
if shouldDisableWrite {
topicCoord.DisableWrite(true)
}
if needAcquireLeaderSession {
if needAcquireLeaderSession && !newCoordData.IsNeedFix() {
go ncoord.acquireTopicLeader(newTopicInfo)
}
} else {
Expand All @@ -2359,6 +2375,10 @@ func (ncoord *NsqdCoordinator) notifyAcquireTopicLeader(coord *coordData) *Coord
if atomic.LoadInt32(&ncoord.stopping) == 1 {
return ErrClusterChanged
}
if coord.IsNeedFix() {
coordLog.Warningf("topic %v is need fix, should not acquire leader", coord.topicInfo.GetTopicDesp())
return nil
}
coordLog.Infof("I am notified to acquire topic leader %v.", coord.topicInfo)
go ncoord.acquireTopicLeader(&coord.topicInfo)
return nil
Expand All @@ -2384,6 +2404,7 @@ func (ncoord *NsqdCoordinator) TryFixLocalTopic(topic string, pid int) error {
localTopic.Unlock()
if localErr == nil {
localTopic.SetDataFixState(false)
topicCoord.setNeedFix(false)
}
return nil
}
Expand All @@ -2410,6 +2431,7 @@ func (ncoord *NsqdCoordinator) switchStateForMaster(topicCoord *TopicCoordinator
atomic.StoreInt32(&topicCoord.disableWrite, 1)
isWriteDisabled = true
localTopic.SetDataFixState(true)
tcData.setNeedFix(true)
localTopic.DisableForSlave(master)
}
if tcData.delayedLogMgr != nil && !tcData.topicInfo.OrderedMulti {
Expand All @@ -2420,6 +2442,7 @@ func (ncoord *NsqdCoordinator) switchStateForMaster(topicCoord *TopicCoordinator
isWriteDisabled = true
localTopic.GetDelayedQueue().SetDataFixState(true)
localTopic.DisableForSlave(master)
tcData.setNeedFix(true)
}
}
localTopic.Unlock()
Expand Down Expand Up @@ -2555,6 +2578,10 @@ func (ncoord *NsqdCoordinator) updateTopicLeaderSession(topicCoord *TopicCoordin
}
tcData.updateBufferSize(int(dyConf.SyncEvery - 1))
localTopic.SetDynamicInfo(*dyConf, tcData.logMgr)

if tcData.IsNeedFix() {
go ncoord.requestLeaveFromISR(tcData.topicInfo.Name, tcData.topicInfo.Partition)
}
// leader changed (maybe down), we make sure our data is flushed to keep data safe
ncoord.switchStateForMaster(topicCoord, localTopic, false)

Expand All @@ -2564,7 +2591,7 @@ func (ncoord *NsqdCoordinator) updateTopicLeaderSession(topicCoord *TopicCoordin
} else {
if newLS == nil || newLS.LeaderNode == nil || newLS.Session == "" {
coordLog.Infof("topic leader is missing : %v", tcData.topicInfo.GetTopicDesp())
if tcData.GetLeader() == ncoord.myNode.GetID() {
if tcData.GetLeader() == ncoord.myNode.GetID() && !tcData.IsNeedFix() {
go ncoord.acquireTopicLeader(&tcData.topicInfo)
}
} else {
Expand Down
13 changes: 13 additions & 0 deletions consistence/topic_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ type coordData struct {
syncedConsumeMgr *ChannelConsumeMgr
logMgr *TopicCommitLogMgr
delayedLogMgr *TopicCommitLogMgr
needFix int32
}

// setNeedFix atomically stores the need-fix flag on cd: 1 when needFix is
// true and 0 otherwise. The stored value is what IsNeedFix reads back.
func (cd *coordData) setNeedFix(needFix bool) {
	var flag int32
	if needFix {
		flag = 1
	}
	atomic.StoreInt32(&cd.needFix, flag)
}

func (cd *coordData) updateBufferSize(bs int) {
Expand Down Expand Up @@ -326,6 +335,10 @@ func (tc *TopicCoordinator) IsForceLeave() bool {
return atomic.LoadInt32(&tc.forceLeave) == 1
}

// IsNeedFix reports whether the need-fix flag on cd is set. The flag is
// read with an atomic load and counts as set only when it equals 1.
func (cd *coordData) IsNeedFix() bool {
	flag := atomic.LoadInt32(&cd.needFix)
	return flag == 1
}

// GetLeader returns the Leader value recorded in cd's topic info.
func (cd *coordData) GetLeader() string {
	leader := cd.topicInfo.Leader
	return leader
}
Expand Down
6 changes: 6 additions & 0 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,9 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h
s.ctx.nsqadmin.logf("WARNING: %s", err)
messages = append(messages, pe.Error())
}
if producers.Len() == 0 {
return nil, http_api.Err{404, "NODE_NOT_FOUND"}
}
topicStats, _, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
Expand Down Expand Up @@ -587,6 +590,9 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
s.ctx.nsqadmin.logf("WARNING: %s", err)
messages = append(messages, pe.Error())
}
if producers.Len() == 0 {
return nil, http_api.Err{404, "NODE_NOT_FOUND"}
}
_, allChannelStats, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
Expand Down
7 changes: 4 additions & 3 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1627,6 +1627,8 @@ func TestChannelSkipZanTestForOrdered(t *testing.T) {
topic := nsqd.GetTopicWithExt(topicName, 0, true)
channel := topic.GetChannel("order_channel")
channel.SetOrdered(true)
// sleep here since set order will reset reader
time.Sleep(time.Millisecond)
channel.doSkipZanTest(false)

msgs := make([]*Message, 0, 3)
Expand Down Expand Up @@ -1663,7 +1665,7 @@ func TestChannelSkipZanTestForOrdered(t *testing.T) {
}
topic.PutMessages(msgs)
topic.flushBuffer(true)
time.Sleep(time.Second)
time.Sleep(time.Millisecond)
// consume normal message and some test message
for i := 0; i < 3; i++ {
outputMsg := <-channel.clientMsgChan
Expand All @@ -1672,7 +1674,6 @@ func TestChannelSkipZanTestForOrdered(t *testing.T) {
channel.FinishMessageForce(0, "", outputMsg.ID, true)
channel.ContinueConsumeForOrder()
}
time.Sleep(time.Millisecond * 10)
// make sure zan test timeout
outputMsg := <-channel.clientMsgChan
t.Logf("consume %v", string(outputMsg.Body))
Expand All @@ -1681,7 +1682,7 @@ func TestChannelSkipZanTestForOrdered(t *testing.T) {
time.Sleep(time.Millisecond * 10)
// skip zan test soon to make sure the zan test is inflight
channel.doSkipZanTest(true)
time.Sleep(time.Second * 3)
time.Sleep(opts.MsgTimeout * 2)
toC := time.After(time.Second * 30)

// set zan test skip and should continue consume normal messages
Expand Down
19 changes: 11 additions & 8 deletions nsqd/delay_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,14 +828,17 @@ func (q *DelayQueue) put(m *Message, rawData []byte, trace bool, checkSize int64
}
oldMsgKey = getDelayedMsgDBKey(int(m.DelayedType), m.DelayedChannel, ts, oldID)
oldV = b.Get(oldMsgKey)
oldMsg, err := DecodeDelayedMessage(oldV, q.IsExt())
if err != nil {
return err
}
// the value from old index is not the same message we are inserting, this may happend when old code use the wrong id in index
if oldMsg.DelayedOrigID != m.DelayedOrigID || oldMsg.DelayedChannel != m.DelayedChannel || oldMsg.DelayedType != m.DelayedType {
nsqLog.Infof("found old delayed index key %v (%v, %v) msg value not matched : %v, %v", newIndexKey, iv, oldMsgKey, oldMsg, m)
oldV = nil
// it may have some old data which only have index but no msg value
if len(oldV) > 0 {
oldMsg, err := DecodeDelayedMessage(oldV, q.IsExt())
if err != nil {
nsqLog.Warningf("found old delayed index key %v (%v, %v) msg value data wrong: %v, %v", newIndexKey, iv, oldMsgKey, oldV, m)
// we can just delete this safely
} else if oldMsg.DelayedOrigID != m.DelayedOrigID || oldMsg.DelayedChannel != m.DelayedChannel || oldMsg.DelayedType != m.DelayedType {
// the value from the old index is not the same message we are inserting; this may happen when old code used the wrong id in the index
nsqLog.Infof("found old delayed index key %v (%v, %v) msg value not matched : %v, %v", newIndexKey, iv, oldMsgKey, oldMsg, m)
oldV = nil
}
}
}
exists := oldV != nil
Expand Down