diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 952392fbc1e8..be6535b465aa 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -47,6 +47,7 @@ typedef struct { SMsgCb* pMsgCb; int64_t version; uint64_t checkpointId; + bool enableMaxDelay; /* true when the stream task has a non-zero delaySchedParam (see tqExpandStreamTask) */ bool initTableReader; bool initTqReader; bool skipRollup; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index eb9d2de05c63..bf1c6684a683 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -75,6 +75,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { .fillHistory = pTask->info.fillHistory, .winRange = pTask->dataRange.window, .pOtherBackend = NULL, + .enableMaxDelay = (pTask->info.delaySchedParam != 0) }; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__MERGE) { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 52fca46e20c0..c7c7b5f3b7cd 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -875,6 +875,7 @@ typedef struct SStreamEventAggOperatorInfo { SSHashObj* pSeDeleted; void* pDelIterator; SArray* pChildren; // cache for children's result; + bool enableMaxDelay; /* copied from the read handle only when the trigger type is STREAM_TRIGGER_WINDOW_CLOSE */ bool ignoreExpiredData; bool ignoreExpiredDataSaved; SArray* pUpdated; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 79cbbf4a43d7..fbcfa4f9178a 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -30,6 +30,7 @@ #include "tfill.h" #include "tglobal.h" #include "tlog.h" +#include "tmsg.h" #include "ttime.h" #define STREAM_EVENT_OP_STATE_NAME "StreamEventHistoryState" @@ -440,6 +441,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl QUERY_CHECK_CODE(code, lino, _end); 
continue; } + code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput, pOperator, 0); QUERY_CHECK_CODE(code, lino, _end); @@ -455,27 +457,44 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl QUERY_CHECK_CODE(code, lino, _end); } - if (isWindowIncomplete(&curWin)) { - releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore); - continue; - } + // if (isWindowIncomplete(&curWin)) { + // releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore); + // continue; + // } if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator)) { code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin); QUERY_CHECK_CODE(code, lino, _end); } + // always output the result for current window if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { code = saveResult(curWin.winInfo, pSeUpdated); QUERY_CHECK_CODE(code, lino, _end); } + // let's put it into result map for window close trigger, even if it is incomplete + // only a window that has started but not yet ended is kept for the next data batch + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pInfo->enableMaxDelay && + (curWin.pWinFlag->startFlag && !curWin.pWinFlag->endFlag)) { + curWin.winInfo.pStatePos->beUpdated = true; + SSessionKey key = {0}; + getSessionHashKey(&curWin.winInfo.sessionWin, &key); + + code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin, sizeof(SEventWindowInfo)); + QUERY_CHECK_CODE(code, lino, _end); + } + + if (isWindowIncomplete(&curWin)) { + releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore); /* NOTE(review): the max-delay branch above may have just stored this curWin (and its pStatePos) in pResultRows; confirm that releasing the buffer here is safe */ + continue; + } + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); - code = - tSimpleHashPut(pAggSup->pResultRows, &key, 
sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); + code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin, sizeof(SEventWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); } @@ -514,7 +533,9 @@ int32_t doStreamEventEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOper while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { void* key = tSimpleHashGetKey(pIte, &keyLen); tlen += encodeSSessionKey(buf, key); - tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); + + SEventWindowInfo* pWinInfo = pIte; /* map values are read as SEventWindowInfo; only the embedded winInfo is encoded */ + tlen += encodeSResultWindowInfo(buf, &pWinInfo->winInfo, pInfo->streamAggSup.resultRowSize); } // 2.twAggSup @@ -571,8 +592,12 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera QUERY_CHECK_CODE(code, lino, _end); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); - code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, - sizeof(SResultWindowInfo)); + + SEventWindowInfo eventWinInfo = {0}; /* wraps the decoded window so the stored map value has sizeof(SEventWindowInfo) */ + setEventWindowInfo(pAggSup, &winfo.sessionWin, winfo.pStatePos, &eventWinInfo); + + code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &eventWinInfo, + sizeof(SEventWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); } @@ -666,6 +691,72 @@ static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { return code; } +bool isEventWindowClosed(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) { /* true when ekey is not INT64_MIN and ekey <= maxTs - waterMark; INT64_MIN is presumably the "not ended yet" marker - TODO confirm */ + return pWin->ekey != INT64_MIN && pWin->ekey <= pTwSup->maxTs - pTwSup->waterMark; +} + +int32_t closeEventWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed) { /* Scans pHashMap (values are read as SEventWindowInfo). Each entry that isEventWindowClosed() and is not isWindowIncomplete() is saved into pClosed when calTrigger is STREAM_TRIGGER_WINDOW_CLOSE and pClosed is non-NULL, then removed from pHashMap. Returns code; a non-success code is logged with qError. */ + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { + SEventWindowInfo* pInfo = pIte; + + if 
(isEventWindowClosed(&pInfo->winInfo.sessionWin.win, pTwSup) && !isWindowIncomplete(pInfo)) { + if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) { + code = saveResult(pInfo->winInfo, pClosed); + QUERY_CHECK_CODE(code, lino, _end); + } + + SSessionKey* pKey = tSimpleHashGetKey(pIte, NULL); + code = tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter); /* removal goes through the iterator-aware call, which receives &pIte and &iter */ + QUERY_CHECK_CODE(code, lino, _end); + } + } +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +void resetUncloseEventWinInfo(SSHashObj* winMap) { /* sets pStatePos->beUsed = true on every SEventWindowInfo stored in winMap */ + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) { + SEventWindowInfo* pWinInfo = pIte; + pWinInfo->winInfo.pStatePos->beUsed = true; + } +} + +int32_t getAllEventWindows(SSHashObj* pHashMap, SSHashObj* pStUpdated, const char* id) { /* For every entry of pHashMap whose pStatePos->beUpdated is set: clears that flag and saves winInfo into pStUpdated; other entries are skipped. id is only used as the prefix of the qDebug message. Returns code; a non-success code is logged with qError. */ + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + void* pIte = NULL; + int32_t iter = 0; + + while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { + SEventWindowInfo* pWinInfo = pIte; + if (!pWinInfo->winInfo.pStatePos->beUpdated) { + continue; + } + + pWinInfo->winInfo.pStatePos->beUpdated = false; + code = saveResult(pWinInfo->winInfo, pStUpdated); + + QUERY_CHECK_CODE(code, lino, _end); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } else { + qDebug("%s retrieved all event windows, total:%d", id, tSimpleHashGetSize(pStUpdated)); + } + return code; +} + static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { if (pOperator->status == OP_EXEC_DONE) { (*ppRes) = NULL; @@ -690,7 +781,7 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe if (pInfo->recvGetAll) { pInfo->recvGetAll = false; - resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); + 
resetUncloseEventWinInfo(pInfo->streamAggSup.pResultRows); } if (pInfo->reCkBlock) { @@ -737,7 +828,7 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe continue; } else if (pBlock->info.type == STREAM_GET_ALL) { pInfo->recvGetAll = true; - code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); + code = getAllEventWindows(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated, GET_TASKID(pTaskInfo)); QUERY_CHECK_CODE(code, lino, _end); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { @@ -765,13 +856,16 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe // the pDataBlock are always the same one, no need to call this again code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); QUERY_CHECK_CODE(code, lino, _end); + doStreamEventAggImpl(pOperator, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } + // restore the value pOperator->status = OP_RES_TO_RETURN; - - code = closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated); + + // output the closed window results + code = closeEventWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated); QUERY_CHECK_CODE(code, lino, _end); code = copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc); @@ -925,8 +1019,8 @@ void streamEventReloadState(SOperatorInfo* pOperator) { } SSessionKey key = {0}; getSessionHashKey(&curInfo.winInfo.sessionWin, &key); - code = - tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo)); + + code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo, sizeof(SEventWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); } } @@ -1020,6 +1114,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* if (!pInfo->historyWins) { goto _error; 
} + if (pHandle) { pInfo->isHistoryOp = (pHandle->fillHistory == STREAM_HISTORY_OPERATOR); } @@ -1059,6 +1154,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initStreamBasicInfo(&pInfo->basic, pOperator); QUERY_CHECK_CODE(code, lino, _error); + if (pEventNode->window.triggerType == STREAM_TRIGGER_WINDOW_CLOSE) { + pInfo->enableMaxDelay = pHandle->enableMaxDelay; /* NOTE(review): pHandle is NULL-checked earlier in this function but dereferenced without a guard here; confirm it cannot be NULL on this path */ + } + if (pEventNode->window.triggerType == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) { if (pHandle->fillHistory == STREAM_HISTORY_OPERATOR) { setFillHistoryOperatorFlag(&pInfo->basic); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0fc2100e204a..a44e8776abda 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3141,11 +3141,13 @@ int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHa int32_t iter = 0; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { SResultWindowInfo* pWinInfo = pIte; + if (isCloseWindow(&pWinInfo->sessionWin.win, pTwSup)) { if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) { code = saveResult(*pWinInfo, pClosed); QUERY_CHECK_CODE(code, lino, _end); } + SSessionKey* pKey = tSimpleHashGetKey(pIte, NULL); code = tSimpleHashIterateRemove(pHashMap, pKey, sizeof(SSessionKey), &pIte, &iter); QUERY_CHECK_CODE(code, lino, _end);