Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/libs/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ typedef struct {
SMsgCb* pMsgCb;
int64_t version;
uint64_t checkpointId;
bool enableMaxDelay;
bool initTableReader;
bool initTqReader;
bool skipRollup;
Expand Down
1 change: 1 addition & 0 deletions source/dnode/vnode/src/tqCommon/tqCommon.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
.pOtherBackend = NULL,
.enableMaxDelay = (pTask->info.delaySchedParam != 0)
};

if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__MERGE) {
Expand Down
1 change: 1 addition & 0 deletions source/libs/executor/inc/executorInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ typedef struct SStreamEventAggOperatorInfo {
SSHashObj* pSeDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool enableMaxDelay;
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated;
Expand Down
129 changes: 114 additions & 15 deletions source/libs/executor/src/streameventwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "tfill.h"
#include "tglobal.h"
#include "tlog.h"
#include "tmsg.h"
#include "ttime.h"

#define STREAM_EVENT_OP_STATE_NAME "StreamEventHistoryState"
Expand Down Expand Up @@ -440,6 +441,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
QUERY_CHECK_CODE(code, lino, _end);
continue;
}

code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
pOperator, 0);
QUERY_CHECK_CODE(code, lino, _end);
Expand All @@ -455,27 +457,44 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
QUERY_CHECK_CODE(code, lino, _end);
}

if (isWindowIncomplete(&curWin)) {
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore);
continue;
}
// if (isWindowIncomplete(&curWin)) {
// releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore);
// continue;
// }
Comment on lines +460 to +463
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of commented-out code seems to be a leftover from refactoring, as the same logic is now present a few lines below. To improve code clarity and maintainability, it would be best to remove this commented-out block.


if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator)) {
code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
QUERY_CHECK_CODE(code, lino, _end);
}

// alwasys output the result for current window
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = saveResult(curWin.winInfo, pSeUpdated);
QUERY_CHECK_CODE(code, lino, _end);
}

// let's put it into result map for window close trigger, even it is incomplete
// only the started window but not ended window will be kept for next data batch processing
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pInfo->enableMaxDelay &&
(curWin.pWinFlag->startFlag && !curWin.pWinFlag->endFlag)) {
curWin.winInfo.pStatePos->beUpdated = true;
SSessionKey key = {0};
getSessionHashKey(&curWin.winInfo.sessionWin, &key);

code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin, sizeof(SEventWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}

if (isWindowIncomplete(&curWin)) {
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore);
continue;
}

if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
curWin.winInfo.pStatePos->beUpdated = true;
SSessionKey key = {0};
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
code =
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin, sizeof(SEventWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}

Expand Down Expand Up @@ -514,7 +533,9 @@ int32_t doStreamEventEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
void* key = tSimpleHashGetKey(pIte, &keyLen);
tlen += encodeSSessionKey(buf, key);
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);

SEventWindowInfo* pWinInfo = pIte;
tlen += encodeSResultWindowInfo(buf, &pWinInfo->winInfo, pInfo->streamAggSup.resultRowSize);
}

// 2.twAggSup
Expand Down Expand Up @@ -571,8 +592,12 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
QUERY_CHECK_CODE(code, lino, _end);

buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo,
sizeof(SResultWindowInfo));

SEventWindowInfo eventWinInfo = {0};
setEventWindowInfo(pAggSup, &winfo.sessionWin, winfo.pStatePos, &eventWinInfo);

code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &eventWinInfo,
sizeof(SEventWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}

Expand Down Expand Up @@ -666,6 +691,72 @@ static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
return code;
}

bool isEventWindowClosed(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
return pWin->ekey != INT64_MIN && pWin->ekey <= pTwSup->maxTs - pTwSup->waterMark;
}

int32_t closeEventWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
SEventWindowInfo* pInfo = pIte;

if (isEventWindowClosed(&pInfo->winInfo.sessionWin.win, pTwSup) && !isWindowIncomplete(pInfo)) {
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) {
code = saveResult(pInfo->winInfo, pClosed);
QUERY_CHECK_CODE(code, lino, _end);
}

SSessionKey* pKey = tSimpleHashGetKey(pIte, NULL);
code = tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
QUERY_CHECK_CODE(code, lino, _end);
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}

void resetUncloseEventWinInfo(SSHashObj* winMap) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
SEventWindowInfo* pWinInfo = pIte;
pWinInfo->winInfo.pStatePos->beUsed = true;
}
}

int32_t getAllEventWindows(SSHashObj* pHashMap, SSHashObj* pStUpdated, const char* id) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* pIte = NULL;
int32_t iter = 0;

while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
SEventWindowInfo* pWinInfo = pIte;
if (!pWinInfo->winInfo.pStatePos->beUpdated) {
continue;
}

pWinInfo->winInfo.pStatePos->beUpdated = false;
code = saveResult(pWinInfo->winInfo, pStUpdated);

QUERY_CHECK_CODE(code, lino, _end);
}

_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
} else {
qDebug("%s retrieved all event windows, total:%d", id, tSimpleHashGetSize(pStUpdated));
}
return code;
}

static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
Expand All @@ -690,7 +781,7 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe

if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
resetUncloseEventWinInfo(pInfo->streamAggSup.pResultRows);
}

if (pInfo->reCkBlock) {
Expand Down Expand Up @@ -737,7 +828,7 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
code = getAllEventWindows(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated, GET_TASKID(pTaskInfo));
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
Expand Down Expand Up @@ -765,13 +856,16 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
// the pDataBlock are always the same one, no need to call this again
code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
QUERY_CHECK_CODE(code, lino, _end);

doStreamEventAggImpl(pOperator, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
}

// restore the value
pOperator->status = OP_RES_TO_RETURN;

code = closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);

// output the closed window results
code = closeEventWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);
QUERY_CHECK_CODE(code, lino, _end);

code = copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
Expand Down Expand Up @@ -925,8 +1019,8 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
}
SSessionKey key = {0};
getSessionHashKey(&curInfo.winInfo.sessionWin, &key);
code =
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo));

code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo, sizeof(SEventWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}
}
Expand Down Expand Up @@ -1020,6 +1114,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
if (!pInfo->historyWins) {
goto _error;
}

if (pHandle) {
pInfo->isHistoryOp = (pHandle->fillHistory == STREAM_HISTORY_OPERATOR);
}
Expand Down Expand Up @@ -1059,6 +1154,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
code = initStreamBasicInfo(&pInfo->basic, pOperator);
QUERY_CHECK_CODE(code, lino, _error);

if (pEventNode->window.triggerType == STREAM_TRIGGER_WINDOW_CLOSE) {
pInfo->enableMaxDelay = pHandle->enableMaxDelay;
}

if (pEventNode->window.triggerType == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
if (pHandle->fillHistory == STREAM_HISTORY_OPERATOR) {
setFillHistoryOperatorFlag(&pInfo->basic);
Expand Down
2 changes: 2 additions & 0 deletions source/libs/executor/src/streamtimewindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -3141,11 +3141,13 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
SResultWindowInfo* pWinInfo = pIte;

if (isCloseWindow(&pWinInfo->sessionWin.win, pTwSup)) {
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) {
code = saveResult(*pWinInfo, pClosed);
QUERY_CHECK_CODE(code, lino, _end);
}

SSessionKey* pKey = tSimpleHashGetKey(pIte, NULL);
code = tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter);
QUERY_CHECK_CODE(code, lino, _end);
Expand Down
Loading