diff --git a/pkg/proxy/tunnel.go b/pkg/proxy/tunnel.go index c138056d2ed04..8a312f63c79c3 100644 --- a/pkg/proxy/tunnel.go +++ b/pkg/proxy/tunnel.go @@ -520,6 +520,9 @@ func (p *pipe) kickoff(ctx context.Context, peer *pipe) (e error) { p.mu.started = false p.mu.cond.Broadcast() } + + var firstCond bool + var currSeq int16 var lastSeq int16 = -1 var rotated bool prepareNextMessage := func() (terminate bool, err error) { @@ -557,8 +560,6 @@ func (p *pipe) kickoff(ctx context.Context, peer *pipe) (e error) { // set txn status and cmd time within the mutex together. // only server->client pipe need to set the txn status. if p.name == pipeServerToClient { - var currSeq int16 - // issue#16042 if len(tempBuf) > 3 { currSeq = int16(tempBuf[3]) @@ -576,7 +577,27 @@ func (p *pipe) kickoff(ctx context.Context, peer *pipe) (e error) { rotated = false } - inTxn, ok := checkTxnStatus(tempBuf) + // seqID is mainly used for server side. It records the sequence ID of + // each packet. + // In the case of "load data local infile" statement, client sends the + // first packet, then server sends response, which is "0xFB + filename", + // after that, client sends content of filename and an empty packet, at + // last, server sends OK packet. The sequence ID of this OK packet is not + // 1, and will cause the session cannot be transferred after this stmt + // finished. + // So, the solution is: when server sends 0xFB and the sequence ID of + // next packet is 3 bigger than last one, the next packet MUST be an + // OK packet, and the transfer is allowed. + // Related issue: https://github.com/matrixorigin/mo-cloud/issues/4088 + var mustOK bool + if !firstCond { + firstCond = isLoadDataLocalInfileRespPacket(tempBuf) + } else { + mustOK = currSeq-lastSeq == 3 + firstCond = false + } + + inTxn, ok := checkTxnStatus(tempBuf, mustOK) if ok { p.mu.inTxn = inTxn } @@ -721,10 +742,10 @@ func txnStatus(status uint16) bool { } // handleOKPacket handles the OK packet from server to update the txn state. -func handleOKPacket(msg []byte) bool { +func handleOKPacket(msg []byte, mustOK bool) bool { var mp *frontend.MysqlProtocolImpl - // the sequence ID should be 1 for OK packet. - if msg[3] != 1 { + // if the mustOK is false, then the sequence ID should be 1 for OK packet. + if !mustOK && msg[3] != 1 { return txnStatus(0) } pos := 5 @@ -754,14 +775,14 @@ func handleEOFPacket(msg []byte) bool { // the first return value is the txn status, and the second return value // indicates if we can get the txn status from the packet. If it is a ERROR // packet, the second return value is false. -func checkTxnStatus(msg []byte) (bool, bool) { +func checkTxnStatus(msg []byte, mustOK bool) (bool, bool) { ok := true inTxn := true // For the server->client pipe, we get the transaction status from the // OK and EOF packet, which is used in connection transfer. If the session // is in a transaction, a transfer should not start. if isOKPacket(msg) { - inTxn = handleOKPacket(msg) + inTxn = handleOKPacket(msg, mustOK) } else if isEOFPacket(msg) { inTxn = handleEOFPacket(msg) } else if isErrPacket(msg) { diff --git a/pkg/proxy/tunnel_test.go b/pkg/proxy/tunnel_test.go index 323b859746134..61ff6846d4194 100644 --- a/pkg/proxy/tunnel_test.go +++ b/pkg/proxy/tunnel_test.go @@ -691,24 +691,64 @@ func TestReplaceServerConn(t *testing.T) { } func TestCheckTxnStatus(t *testing.T) { - inTxn, ok := checkTxnStatus(nil) - require.True(t, ok) - require.True(t, inTxn) - - inTxn, ok = checkTxnStatus(makeErrPacket(8)) - require.False(t, ok) - require.True(t, inTxn) - - p1 := makeOKPacket(5) - value := frontend.SERVER_QUERY_WAS_SLOW | frontend.SERVER_STATUS_NO_GOOD_INDEX_USED - binary.LittleEndian.PutUint16(p1[7:], value) - inTxn, ok = checkTxnStatus(p1) - require.True(t, ok) - require.False(t, inTxn) - - value |= frontend.SERVER_STATUS_IN_TRANS - binary.LittleEndian.PutUint16(p1[7:], value) - inTxn, ok = checkTxnStatus(p1) - require.True(t, ok) - require.True(t, inTxn) + t.Run("mustOK false", func(t *testing.T) { + inTxn, ok := checkTxnStatus(nil, false) + require.True(t, ok) + require.True(t, inTxn) + + inTxn, ok = checkTxnStatus(makeErrPacket(8), false) + require.False(t, ok) + require.True(t, inTxn) + + p1 := makeOKPacket(5) + value := frontend.SERVER_QUERY_WAS_SLOW | frontend.SERVER_STATUS_NO_GOOD_INDEX_USED + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, false) + require.True(t, ok) + require.False(t, inTxn) + + value |= frontend.SERVER_STATUS_IN_TRANS + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, false) + require.True(t, ok) + require.True(t, inTxn) + }) + + t.Run("mustOK true", func(t *testing.T) { + inTxn, ok := checkTxnStatus(nil, true) + require.True(t, ok) + require.True(t, inTxn) + + inTxn, ok = checkTxnStatus(makeErrPacket(8), true) + require.False(t, ok) + require.True(t, inTxn) + + p1 := makeOKPacket(5) + value := frontend.SERVER_QUERY_WAS_SLOW | frontend.SERVER_STATUS_NO_GOOD_INDEX_USED + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, true) + require.True(t, ok) + require.False(t, inTxn) + + value |= frontend.SERVER_STATUS_IN_TRANS + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, true) + require.True(t, ok) + require.True(t, inTxn) + + value ^= frontend.SERVER_STATUS_IN_TRANS + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, true) + require.True(t, ok) + require.False(t, inTxn) + + p1[3] = 4 + inTxn, ok = checkTxnStatus(p1, false) + require.True(t, ok) + require.True(t, inTxn) + + inTxn, ok = checkTxnStatus(p1, true) + require.True(t, ok) + require.False(t, inTxn) + }) } diff --git a/pkg/proxy/util.go b/pkg/proxy/util.go index ecc73506a05c3..20dfd4b6ff417 100644 --- a/pkg/proxy/util.go +++ b/pkg/proxy/util.go @@ -98,6 +98,15 @@ func isErrPacket(p []byte) bool { return false } +// isLoadDataLocalInfileRespPacket returns true if []byte is a packet +// of load data local infile response. +func isLoadDataLocalInfileRespPacket(p []byte) bool { + if len(p) > 4 && p[4] == 0xFB { + return true + } + return false +} + // isEmptyPacket returns true if []byte is an empty packet. func isEmptyPacket(p []byte) bool { return len(p) == 0 diff --git a/pkg/proxy/util_test.go b/pkg/proxy/util_test.go index 8667f35c42106..902d31be03f3c 100644 --- a/pkg/proxy/util_test.go +++ b/pkg/proxy/util_test.go @@ -200,6 +200,20 @@ func TestIsErrPacket(t *testing.T) { require.True(t, ret) } +func TestIsLoadDataLocalInfileRespPacket(t *testing.T) { + var data []byte + ret := isLoadDataLocalInfileRespPacket(data) + require.False(t, ret) + + data = []byte{0, 0, 0, 0, 2, 0} + ret = isLoadDataLocalInfileRespPacket(data) + require.False(t, ret) + + data = []byte{0, 0, 0, 0, 0xFB, 0} + ret = isLoadDataLocalInfileRespPacket(data) + require.True(t, ret) +} + func TestIsDeallocatePacket(t *testing.T) { var data []byte ret := isDeallocatePacket(data)