Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:[TS-5776]add raw type from consumer #29666

Merged
merged 36 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
cd16068
feat:add test script
wangmm0220 Jan 8, 2025
dc708a4
feat:add test script
Jan 13, 2025
def67c0
feat:add test script
Jan 20, 2025
a4235fc
Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/…
wangmm0220 Jan 20, 2025
7c5c9f6
fix:[TS-5776]add raw type from consumer
wangmm0220 Jan 22, 2025
b9d966f
Merge branch 'feat/TS-5776' of https://github.com/taosdata/TDengine i…
wangmm0220 Jan 22, 2025
3fb2ec4
fix:[TS-5776]add raw type from consumer
wangmm0220 Jan 22, 2025
7fad4bc
fix:[TS-5776]add raw type from consumer
wangmm0220 Jan 24, 2025
902d067
fix:[TS-5776]add raw type from consumer
wangmm0220 Jan 24, 2025
9074a99
fix:[TS-5776]add raw type from consumer
wangmm0220 Jan 24, 2025
b5edb79
fix:[TS-5776]add raw type from consumer
wangmm0220 Jan 24, 2025
14bc7b0
feat:add test logic
Feb 5, 2025
fc0cf9b
feat:add test logic
Feb 5, 2025
9bde3ca
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 6, 2025
c5eeadd
Merge branch 'feat/TS-5776' of https://github.com/taosdata/TDengine i…
wangmm0220 Feb 6, 2025
c81a696
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 7, 2025
94b74cc
Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/…
wangmm0220 Feb 7, 2025
170d78b
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 10, 2025
878bac1
fix:[TS-5776]avoid memcpy fo DataRspObj
wangmm0220 Feb 11, 2025
f0b9e7b
fix:[TS-5776]avoid memcpy fo DataRspObj
wangmm0220 Feb 11, 2025
020840b
fix:[TS-5776]avoid memcpy fo DataRspObj
wangmm0220 Feb 12, 2025
891a605
fix:[TS-5776]avoid memcpy fo DataRspObj
wangmm0220 Feb 12, 2025
c3f4c4c
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 12, 2025
daea0ec
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 13, 2025
3440006
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 13, 2025
d67de02
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 14, 2025
f16f218
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 14, 2025
39f77e4
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 14, 2025
47a6836
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 15, 2025
3da00b7
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 17, 2025
420c222
fix:[TS-5776]add raw type from consumer
wangmm0220 Feb 17, 2025
bf5dc63
fix:[TS-5776]add test case
wangmm0220 Feb 17, 2025
6f2ccfc
Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/…
wangmm0220 Feb 17, 2025
551eb5b
fix:[TS-5776]add test case
wangmm0220 Feb 17, 2025
9dcac19
fix:[TS-5776]error in ci
wangmm0220 Feb 18, 2025
07e4265
fix:[TS-5776]error in create table time
wangmm0220 Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/client/taos.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ typedef enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
TMQ_RES_METADATA = 3
TMQ_RES_METADATA = 3,
TMQ_RES_RAWDATA = 4
} tmq_res_t;

typedef struct tmq_topic_assignment {
Expand Down
3 changes: 2 additions & 1 deletion include/common/tcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ enum {
TMQ_MSG_TYPE__POLL_DATA_META_RSP,
TMQ_MSG_TYPE__WALINFO_RSP,
TMQ_MSG_TYPE__POLL_BATCH_META_RSP,
TMQ_MSG_TYPE__POLL_RAW_DATA_RSP,
};

static char* tmqMsgTypeStr[] = {
"data", "meta", "ask ep", "meta data", "wal info", "batch meta"
"data", "meta", "ask ep", "meta data", "wal info", "batch meta", "raw data"
};

enum {
Expand Down
18 changes: 17 additions & 1 deletion include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,10 @@ typedef struct SSysTableSchema {
int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);

#define RETRIEVE_TABLE_RSP_VERSION 0
#define RETRIEVE_TABLE_RSP_TMQ_VERSION 1
#define RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION 2

typedef struct {
int64_t useconds;
int8_t completed; // all results are returned to client
Expand Down Expand Up @@ -4184,7 +4188,9 @@ typedef struct {
STqOffsetVal reqOffset;
int8_t enableReplay;
int8_t sourceExcluded;
int8_t rawData;
int8_t enableBatchMeta;
SHashObj *uidHash; // to find if uid is duplicated
} SMqPollReq;

int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
Expand Down Expand Up @@ -4258,13 +4264,21 @@ typedef struct {
SArray* createTableLen;
SArray* createTableReq;
};
struct{
int32_t len;
void* rawData;
};
};
void* data; //for free in client, only effected if type is data or metadata. raw data not effected
bool blockDataElementFree; // if true, free blockDataElement in blockData,(true in server, false in client)

} SMqDataRsp;

int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
int32_t tDecodeMqRawDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteMqDataRsp(SMqDataRsp* pRsp);
void tDeleteMqRawDataRsp(SMqDataRsp* pRsp);

int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
Expand Down Expand Up @@ -4514,6 +4528,7 @@ typedef struct {

typedef struct {
SArray* aSubmitTbData; // SArray<SSubmitTbData>
bool raw;
} SSubmitReq2;

typedef struct {
Expand All @@ -4522,8 +4537,9 @@ typedef struct {
char data[]; // SSubmitReq2
} SSubmitReq2Msg;

int32_t transformRawSSubmitTbData(void* data, int64_t suid, int64_t uid, int32_t sver);
int32_t tEncodeSubmitReq(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq);
int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq, SArray* rawList);
void tDestroySubmitTbData(SSubmitTbData* pTbData, int32_t flag);
void tDestroySubmitReq(SSubmitReq2* pReq, int32_t flag);

Expand Down
3 changes: 3 additions & 0 deletions include/libs/parser/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
char* msgBuf, int32_t msgBufLen, void* charsetCxt);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int32_t smlBuildOutputRaw(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data);
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw);
int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen);

int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);
Expand Down
2 changes: 2 additions & 0 deletions include/util/taoserror.h
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,8 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017)
#define TSDB_CODE_TMQ_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x4018)
#define TSDB_CODE_TMQ_RAW_DATA_SPLIT TAOS_DEF_ERROR_CODE(0, 0x4019)

// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
Expand Down
2 changes: 2 additions & 0 deletions source/client/inc/clientInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum {
RES_TYPE__TMQ_META,
RES_TYPE__TMQ_METADATA,
RES_TYPE__TMQ_BATCH_META,
RES_TYPE__TMQ_RAWDATA,
};

#define SHOW_VARIABLES_RESULT_COLS 5
Expand All @@ -55,6 +56,7 @@ enum {
#define SHOW_VARIABLES_RESULT_FIELD5_LEN (TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE)

#define TD_RES_QUERY(res) (*(int8_t*)(res) == RES_TYPE__QUERY)
#define TD_RES_TMQ_RAW(res) (*(int8_t*)(res) == RES_TYPE__TMQ_RAWDATA)
#define TD_RES_TMQ(res) (*(int8_t*)(res) == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_META)
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA)
Expand Down
36 changes: 19 additions & 17 deletions source/client/src/clientMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ void taos_close(TAOS *taos) {
}

int taos_errno(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return terrno;
}

Expand All @@ -514,7 +514,7 @@ int taos_errno(TAOS_RES *res) {
}

const char *taos_errstr(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return (const char *)tstrerror(terrno);
}

Expand Down Expand Up @@ -554,6 +554,8 @@ void taos_free_result(TAOS_RES *res) {
tDeleteMqMetaRsp(&pRsp->metaRsp);
} else if (TD_RES_TMQ_BATCH_META(res)) {
tDeleteMqBatchMetaRsp(&pRsp->batchMetaRsp);
} else if (TD_RES_TMQ_RAW(res)) {
tDeleteMqRawDataRsp(&pRsp->dataRsp);
}
taosMemoryFree(pRsp);
}
Expand All @@ -572,7 +574,7 @@ void taos_kill_query(TAOS *taos) {
}

int taos_field_count(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0;
}

Expand All @@ -583,7 +585,7 @@ int taos_field_count(TAOS_RES *res) {
int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); }

TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (taos_num_fields(res) == 0 || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return NULL;
}

Expand Down Expand Up @@ -643,7 +645,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL;
} else {
tscError("invalid result passed to taos_fetch_row");
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
terrno = TSDB_CODE_TMQ_INVALID_DATA;
return NULL;
}
}
Expand Down Expand Up @@ -764,7 +766,7 @@ int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD
}

int *taos_fetch_lengths(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return NULL;
}

Expand All @@ -773,7 +775,7 @@ int *taos_fetch_lengths(TAOS_RES *res) {
}

TAOS_ROW *taos_result_block(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
Expand Down Expand Up @@ -841,7 +843,7 @@ const char *taos_get_client_info() { return td_version; }

// return int32_t
int taos_affected_rows(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
Expand All @@ -853,7 +855,7 @@ int taos_affected_rows(TAOS_RES *res) {

// return int64_t
int64_t taos_affected_rows64(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return 0;
}
Expand All @@ -864,7 +866,7 @@ int64_t taos_affected_rows64(TAOS_RES *res) {
}

int taos_result_precision(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return TSDB_TIME_PRECISION_MILLI;
}

Expand Down Expand Up @@ -904,7 +906,7 @@ int taos_select_db(TAOS *taos, const char *db) {
}

void taos_stop_query(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
TD_RES_TMQ_BATCH_META(res)) {
return;
}
Expand All @@ -913,7 +915,7 @@ void taos_stop_query(TAOS_RES *res) {
}

bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return true;
}
SReqResultInfo *pResultInfo = tscGetCurResInfo(res);
Expand All @@ -938,7 +940,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
}

int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0;
}

Expand Down Expand Up @@ -973,15 +975,15 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
return 0;
} else {
tscError("taos_fetch_block_s invalid res type");
return -1;
return TSDB_CODE_TMQ_INVALID_DATA;
}
}

int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
*numOfRows = 0;
*pData = NULL;

if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0;
}

Expand Down Expand Up @@ -1018,7 +1020,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
}

int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
if (res == NULL || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return 0;
}

Expand All @@ -1038,7 +1040,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {

int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) {
if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) ||
TD_RES_TMQ_BATCH_META(res)) {
TD_RES_TMQ_RAW(res) || TD_RES_TMQ_BATCH_META(res)) {
return TSDB_CODE_INVALID_PARA;
}

Expand Down
Loading