Skip to content

Commit eb6094f

Browse files
committed
Merge branch 'main' into release/ver-3.3.0.0
2 parents 296edc2 + f7f9496 commit eb6094f

36 files changed

+2011
-1948
lines changed

include/common/rsync.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ extern "C" {
1111

1212
#include "tarray.h"
1313

14-
void stopRsync();
15-
void startRsync();
16-
int uploadRsync(char* id, char* path);
17-
int downloadRsync(char* id, char* path);
18-
int deleteRsync(char* id);
14+
void stopRsync();
15+
void startRsync();
16+
int32_t uploadRsync(const char* id, const char* path);
17+
int32_t downloadRsync(const char* id, const char* path);
18+
int32_t deleteRsync(const char* id);
1919

2020
#ifdef __cplusplus
2121
}

include/dnode/vnode/tqCommon.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
3333
bool isLeader, bool restored);
3434
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
3535
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
36-
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
3736
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
3837
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
3938
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);

include/libs/executor/storageapi.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ extern "C" {
3535
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
3636
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
3737
#define CACHESCAN_RETRIEVE_LAST 0x8
38-
#define CACHESCAN_RETRIEVE_PK 0x10
3938

4039
#define META_READER_LOCK 0x0
4140
#define META_READER_NOLOCK 0x1

include/libs/stream/streammsg.h

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright (c) 2019 TAOS Data, Inc. <[email protected]>
3+
*
4+
* This program is free software: you can use, redistribute, and/or modify
5+
* it under the terms of the GNU Affero General Public License, version 3
6+
* or later ("AGPL"), as published by the Free Software Foundation.
7+
*
8+
* This program is distributed in the hope that it will be useful, but WITHOUT
9+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10+
* FITNESS FOR A PARTICULAR PURPOSE.
11+
*
12+
* You should have received a copy of the GNU Affero General Public License
13+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
14+
*/
15+
16+
#ifndef TDENGINE_STREAMMSG_H
17+
#define TDENGINE_STREAMMSG_H
18+
19+
#include "tmsg.h"
20+
21+
#ifdef __cplusplus
22+
extern "C" {
23+
#endif
24+
25+
typedef struct SStreamChildEpInfo {
26+
int32_t nodeId;
27+
int32_t childId;
28+
int32_t taskId;
29+
SEpSet epSet;
30+
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
31+
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
32+
} SStreamChildEpInfo;
33+
34+
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
35+
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
36+
37+
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
38+
typedef struct {
39+
int64_t streamId;
40+
int64_t checkpointId;
41+
int32_t taskId;
42+
int32_t nodeId;
43+
SEpSet mgmtEps;
44+
int32_t mnodeId;
45+
int32_t transId;
46+
int8_t mndTrigger;
47+
int64_t expireTime;
48+
} SStreamCheckpointSourceReq;
49+
50+
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
51+
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);
52+
53+
typedef struct {
54+
int64_t streamId;
55+
int64_t checkpointId;
56+
int32_t taskId;
57+
int32_t nodeId;
58+
int32_t mnodeId;
59+
int64_t expireTime;
60+
int8_t success;
61+
} SStreamCheckpointSourceRsp;
62+
63+
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
64+
65+
typedef struct SStreamTaskNodeUpdateMsg {
66+
int32_t transId; // to identify the msg
67+
int64_t streamId;
68+
int32_t taskId;
69+
SArray* pNodeList; // SArray<SNodeUpdateInfo>
70+
} SStreamTaskNodeUpdateMsg;
71+
72+
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
73+
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
74+
75+
typedef struct {
76+
int64_t reqId;
77+
int64_t stage;
78+
int64_t streamId;
79+
int32_t upstreamNodeId;
80+
int32_t upstreamTaskId;
81+
int32_t downstreamNodeId;
82+
int32_t downstreamTaskId;
83+
int32_t childId;
84+
} SStreamTaskCheckReq;
85+
86+
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
87+
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
88+
89+
typedef struct {
90+
int64_t reqId;
91+
int64_t streamId;
92+
int32_t upstreamNodeId;
93+
int32_t upstreamTaskId;
94+
int32_t downstreamNodeId;
95+
int32_t downstreamTaskId;
96+
int32_t childId;
97+
int64_t oldStage;
98+
int8_t status;
99+
} SStreamTaskCheckRsp;
100+
101+
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
102+
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
103+
104+
typedef struct {
105+
SMsgHead msgHead;
106+
int64_t streamId;
107+
int64_t checkpointId;
108+
int32_t downstreamTaskId;
109+
int32_t downstreamNodeId;
110+
int32_t upstreamTaskId;
111+
int32_t upstreamNodeId;
112+
int32_t childId;
113+
} SStreamCheckpointReadyMsg;
114+
115+
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
116+
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
117+
118+
struct SStreamDispatchReq {
119+
int32_t type;
120+
int64_t stage; // nodeId from upstream task
121+
int64_t streamId;
122+
int32_t taskId;
123+
int32_t msgId; // msg id to identify if the incoming msg from the same sender
124+
int32_t srcVgId;
125+
int32_t upstreamTaskId;
126+
int32_t upstreamChildId;
127+
int32_t upstreamNodeId;
128+
int32_t upstreamRelTaskId;
129+
int32_t blockNum;
130+
int64_t totalLen;
131+
SArray* dataLen; // SArray<int32_t>
132+
SArray* data; // SArray<SRetrieveTableRsp*>
133+
};
134+
135+
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const struct SStreamDispatchReq* pReq);
136+
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, struct SStreamDispatchReq* pReq);
137+
void tCleanupStreamDispatchReq(struct SStreamDispatchReq* pReq);
138+
139+
struct SStreamRetrieveReq {
140+
int64_t streamId;
141+
int64_t reqId;
142+
int32_t srcTaskId;
143+
int32_t srcNodeId;
144+
int32_t dstTaskId;
145+
int32_t dstNodeId;
146+
int32_t retrieveLen;
147+
SRetrieveTableRsp* pRetrieve;
148+
};
149+
150+
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const struct SStreamRetrieveReq* pReq);
151+
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, struct SStreamRetrieveReq* pReq);
152+
void tCleanupStreamRetrieveReq(struct SStreamRetrieveReq* pReq);
153+
154+
typedef struct SStreamTaskCheckpointReq {
155+
int64_t streamId;
156+
int32_t taskId;
157+
int32_t nodeId;
158+
} SStreamTaskCheckpointReq;
159+
160+
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
161+
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
162+
163+
typedef struct SStreamHbMsg {
164+
int32_t vgId;
165+
int32_t numOfTasks;
166+
SArray* pTaskStatus; // SArray<STaskStatusEntry>
167+
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
168+
} SStreamHbMsg;
169+
170+
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
171+
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
172+
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
173+
174+
typedef struct {
175+
SMsgHead head;
176+
int64_t streamId;
177+
int32_t taskId;
178+
int32_t reqType;
179+
} SStreamTaskRunReq;
180+
181+
#ifdef __cplusplus
182+
}
183+
#endif
184+
185+
#endif // TDENGINE_STREAMMSG_H

0 commit comments

Comments
 (0)