Skip to content

Commit 24175ab

Browse files
committed
newfix: enable dedup on stmt/stmt2 inserts in interlace mode
1 parent fe6ca7b commit 24175ab

File tree

8 files changed

+360
-115
lines changed

8 files changed

+360
-115
lines changed

include/common/tdataformat.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ typedef struct {
378378
TAOS_MULTI_BIND *bind;
379379
} SBindInfo;
380380
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
381-
SArray *rowArray);
381+
SArray *rowArray, bool *orderedDup);
382382

383383
// stmt2 binding
384384
int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen, initGeosFn igeos,
@@ -392,7 +392,7 @@ typedef struct {
392392
} SBindInfo2;
393393

394394
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
395-
SArray *rowArray);
395+
SArray *rowArray, bool *orderedDup);
396396

397397
#endif
398398

source/client/src/clientStmt.c

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1557,12 +1557,6 @@ int stmtExec(TAOS_STMT* stmt) {
15571557
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
15581558
} else {
15591559
if (pStmt->sql.stbInterlaceMode) {
1560-
STableDataCxt *pTableCxt = pStmt->exec.pCurrBlock;
1561-
if (!pTableCxt->ordered || pTableCxt->duplicateTs) {
1562-
tscError("failed to insert disordered or duplicate data");
1563-
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
1564-
}
1565-
15661560
int64_t startTs = taosGetTimestampUs();
15671561
while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
15681562
taosUsleep(1);

source/client/src/clientStmt2.c

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1669,12 +1669,6 @@ int stmtExec2(TAOS_STMT2* stmt, int* affected_rows) {
16691669

16701670
if (STMT_TYPE_QUERY != pStmt->sql.type) {
16711671
if (pStmt->sql.stbInterlaceMode) {
1672-
STableDataCxt *pTableCxt = pStmt->exec.pCurrBlock;
1673-
if (!pTableCxt->ordered || pTableCxt->duplicateTs) {
1674-
tscError("failed to insert disordered or duplicate data");
1675-
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
1676-
}
1677-
16781672
int64_t startTs = taosGetTimestampUs();
16791673
while (atomic_load_64(&pStmt->sql.siInfo.tbRemainNum)) {
16801674
taosUsleep(1);

source/common/src/tdataformat.c

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,9 +449,10 @@ static int32_t tBindInfoCompare(const void *p1, const void *p2, const void *para
449449
* `infoSorted` is whether the bind information is sorted by column id
450450
* `pTSchema` is the schema of the table
451451
* `rowArray` is the array to store the rows
452+
* `orderedDup` is an array to store ordered and duplicateTs
452453
*/
453454
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
454-
SArray *rowArray) {
455+
SArray *rowArray, bool *orderedDup) {
455456
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
456457
return TSDB_CODE_INVALID_PARA;
457458
}
@@ -469,6 +470,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
469470
return terrno;
470471
}
471472

473+
SRowKey rowKey, lastRowKey;
472474
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
473475
taosArrayClear(colValArray);
474476

@@ -507,6 +509,24 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
507509
code = terrno;
508510
goto _exit;
509511
}
512+
513+
if (orderedDup) {
514+
tRowGetKey(row, &rowKey);
515+
if (iRow == 0) {
516+
// init to ordered by default
517+
orderedDup[0] = true;
518+
// init to non-duplicate by default
519+
orderedDup[1] = false;
520+
} else {
521+
// no more compare if we already get disordered or duplicate rows
522+
if (orderedDup[0] && !orderedDup[1]) {
523+
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
524+
orderedDup[0] = (code >= 0);
525+
orderedDup[1] = (code == 0);
526+
}
527+
}
528+
lastRowKey = rowKey;
529+
}
510530
}
511531

512532
_exit:
@@ -3235,9 +3255,10 @@ int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int3
32353255
* `infoSorted` is whether the bind information is sorted by column id
32363256
* `pTSchema` is the schema of the table
32373257
* `rowArray` is the array to store the rows
3258+
* `orderedDup` is an array to store ordered and duplicateTs
32383259
*/
32393260
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
3240-
SArray *rowArray) {
3261+
SArray *rowArray, bool *orderedDup) {
32413262
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
32423263
return TSDB_CODE_INVALID_PARA;
32433264
}
@@ -3266,6 +3287,7 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
32663287
}
32673288
}
32683289

3290+
SRowKey rowKey, lastRowKey;
32693291
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
32703292
taosArrayClear(colValArray);
32713293

@@ -3317,6 +3339,24 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
33173339
code = terrno;
33183340
goto _exit;
33193341
}
3342+
3343+
if (orderedDup) {
3344+
tRowGetKey(row, &rowKey);
3345+
if (iRow == 0) {
3346+
// init to ordered by default
3347+
orderedDup[0] = true;
3348+
// init to non-duplicate by default
3349+
orderedDup[1] = false;
3350+
} else {
3351+
// no more compare if we already get disordered or duplicate rows
3352+
if (orderedDup[0] && !orderedDup[1]) {
3353+
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
3354+
orderedDup[0] = (code >= 0);
3355+
orderedDup[1] = (code == 0);
3356+
}
3357+
}
3358+
lastRowKey = rowKey;
3359+
}
33203360
}
33213361

33223362
_exit:

source/libs/parser/src/parInsertStmt.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
323323
int32_t code = 0;
324324
int16_t lastColId = -1;
325325
bool colInOrder = true;
326+
bool orderedDup[2];
326327

327328
if (NULL == *pTSchema) {
328329
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
@@ -368,7 +369,9 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
368369
// }
369370
}
370371

371-
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
372+
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
373+
pDataBlock->ordered = orderedDup[0];
374+
pDataBlock->duplicateTs = orderedDup[1];
372375

373376
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
374377

@@ -682,6 +685,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
682685
int16_t lastColId = -1;
683686
bool colInOrder = true;
684687
int ncharColNums = 0;
688+
bool orderedDup[2];
685689

686690
if (NULL == *pTSchema) {
687691
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
@@ -738,7 +742,9 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
738742
pBindInfos[c].bytes = pColSchema->bytes;
739743
}
740744

741-
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
745+
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
746+
pDataBlock->ordered = orderedDup[0];
747+
pDataBlock->duplicateTs = orderedDup[1];
742748

743749
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
744750

tests/script/api/makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ exe:
2929
# gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS)
3030
gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS)
3131
gcc $(CFLAGS) ./stmt-insert-dupkeys.c -o $(ROOT)stmt-insert-dupkeys $(LFLAGS)
32+
gcc $(CFLAGS) ./stmt2-insert-dupkeys.c -o $(ROOT)stmt2-insert-dupkeys $(LFLAGS)
3233

3334
clean:
3435
rm $(ROOT)batchprepare
@@ -48,3 +49,4 @@ clean:
4849
rm $(ROOT)stmt2-nohole
4950
rm $(ROOT)stmt-crash
5051
rm $(ROOT)stmt-insert-dupkeys
52+
rm $(ROOT)stmt2-insert-dupkeys

0 commit comments

Comments
 (0)