diff --git a/pkg/proxy/net/mysql.go b/pkg/proxy/net/mysql.go index aeefacfb4..44d3d4c72 100644 --- a/pkg/proxy/net/mysql.go +++ b/pkg/proxy/net/mysql.go @@ -877,3 +877,7 @@ func MakeCloseStmtRequest(stmtID uint32) []byte { binary.LittleEndian.PutUint32(request[1:], stmtID) return request } + +func MakeInitDBRequest(db string) []byte { + return append([]byte{ComInitDB.Byte()}, hack.Slice(db)...) +} diff --git a/pkg/sqlreplay/cmd/audit_log_plugin.go b/pkg/sqlreplay/cmd/audit_log_plugin.go index b36e7c708..26694e072 100644 --- a/pkg/sqlreplay/cmd/audit_log_plugin.go +++ b/pkg/sqlreplay/cmd/audit_log_plugin.go @@ -4,7 +4,6 @@ package cmd import ( - "bytes" "strconv" "strings" "time" @@ -16,13 +15,16 @@ import ( const ( auditPluginKeyTimeStamp = "TIMESTAMP" - auditPluginKeyDatabase = "DATABASES" auditPluginKeySQL = "SQL_TEXT" auditPluginKeyConnID = "CONNECTION_ID" auditPluginKeyClass = "EVENT_CLASS" auditPluginKeySubClass = "EVENT_SUBCLASS" auditPluginKeyCommand = "COMMAND" auditPluginKeyStmtType = "SQL_STATEMENTS" + auditPluginKeyParams = "EXECUTE_PARAMS" + auditPluginKeyCurDB = "CURRENT_DB" + auditPluginKeyEvent = "EVENT" + auditPluginKeyCostTime = "COST_TIME" auditPluginClassGeneral = "GENERAL" auditPluginClassTableAccess = "TABLE_ACCESS" @@ -31,12 +33,13 @@ const ( auditPluginSubClassConnected = "Connected" auditPluginSubClassDisconnect = "Disconnect" + auditPluginEventEnd = "COMPLETED" + timeLayout = "2006/01/02 15:04:05.999 -07:00" ) type auditLogPluginConnCtx struct { - beginCmd *Command - inited bool + currentDB string } func NewAuditLogPluginDecoder() *AuditLogPluginDecoder { @@ -49,9 +52,16 @@ var _ CmdDecoder = (*AuditLogPluginDecoder)(nil) type AuditLogPluginDecoder struct { connInfo map[uint64]auditLogPluginConnCtx + // pendingCmds contains the commands that has not been returned yet. + pendingCmds []*Command } func (decoder *AuditLogPluginDecoder) Decode(reader LineReader) (*Command, error) { + if len(decoder.pendingCmds) > 0 { + cmd := decoder.pendingCmds[0] + decoder.pendingCmds = decoder.pendingCmds[1:] + return cmd, nil + } for { line, filename, lineIdx, err := reader.ReadLine() if err != nil { @@ -69,35 +79,42 @@ func (decoder *AuditLogPluginDecoder) Decode(reader LineReader) (*Command, error if err != nil { return nil, errors.Errorf("%s, line %d: parsing connection id failed: %s", filename, lineIdx, connStr) } - tsStr := kvs[auditPluginKeyTimeStamp] - if len(tsStr) == 0 { - return nil, errors.Errorf("%s, line %d: no timestamp in line: '%s", filename, lineIdx, line) - } - startTs, err := time.Parse(timeLayout, tsStr) - if err != nil { - return nil, errors.Errorf("%s, line %d: parsing timestamp failed: %s", filename, lineIdx, tsStr) - } - var c *Command + + var cmds []*Command eventClass := kvs[auditPluginKeyClass] switch eventClass { case auditPluginClassGeneral, auditPluginClassTableAccess: - c, err = decoder.parseGeneralEvent(kvs, connID) + cmds, err = decoder.parseGeneralEvent(kvs, connID) case auditPluginClassConnect: + var c *Command c, err = decoder.parseConnectEvent(kvs, connID) + if c != nil { + cmds = []*Command{c} + } default: return nil, errors.Errorf("%s, line %d: unknown event class: %s", filename, lineIdx, eventClass) } if err != nil { - return c, err + return nil, errors.Wrapf(err, "%s, line %d", filename, lineIdx) } // The log is ignored, skip. - if c == nil { + if len(cmds) == 0 { continue } - c.Succeess = true - c.ConnID = connID - c.StartTs = startTs - return c, nil + + startTs, err := parseStartTs(kvs) + if err != nil { + return nil, errors.Wrapf(err, "%s, line %d", filename, lineIdx) + } + for _, cmd := range cmds { + cmd.Success = true + cmd.ConnID = connID + cmd.StartTs = startTs + } + if len(cmds) > 1 { + decoder.pendingCmds = cmds[1:] + } + return cmds[0], nil } } @@ -165,27 +182,6 @@ func skipQuotes(line string, singleQuote bool) (endIdx int) { return -1 } -// [DATABASES="[test]"] -func parseDB(value string) []string { - var err error - value, err = strconv.Unquote(value) - if err != nil { - return nil - } - if len(value) == 0 { - return nil - } - if value[0] != '[' || value[len(value)-1] != ']' { - // impossible - return nil - } - value = value[1 : len(value)-1] - if len(value) == 0 { - return nil - } - return strings.Split(value, ",") -} - // [COMMAND="Init DB"], [COMMAND=Query] func parseCommand(value string) string { if len(value) == 0 { @@ -202,59 +198,160 @@ func parseCommand(value string) string { return value } -func (decoder *AuditLogPluginDecoder) parseGeneralEvent(kvs map[string]string, connID uint64) (*Command, error) { +func parseStartTs(kvs map[string]string) (time.Time, error) { + endTs, err := time.Parse(timeLayout, kvs[auditPluginKeyTimeStamp]) + if err != nil { + return time.Time{}, errors.Errorf("parsing timestamp failed: %s", kvs[auditPluginKeyTimeStamp]) + } + costTime := kvs[auditPluginKeyCostTime] + if len(costTime) == 0 { + return endTs, nil + } + millis, err := strconv.ParseFloat(costTime, 32) + if err != nil { + return endTs, errors.Errorf("parsing cost time failed: %s", costTime) + } + return endTs.Add(-time.Duration(millis * 1000)), nil +} + +// "[\"KindInt64 1\",\"KindInt64 1\"]" +func parseExecuteParams(value string) ([]any, error) { + v, err := strconv.Unquote(value) + if err != nil { + return nil, errors.Errorf("no quotes in params: %s", value) + } + if len(v) == 0 { + return nil, nil + } + if v[0] != '[' || v[len(v)-1] != ']' { + return nil, errors.Errorf("no brackets in params: %s", value) + } + v = v[1 : len(v)-1] + if len(v) == 0 { + return nil, nil + } + params := make([]any, 0, 10) + for idx := 0; idx < len(v); idx++ { + switch v[idx] { + case '"', '\'': + endIdx := skipQuotes(v[idx+1:], v[idx] == '\'') + if endIdx == -1 { + return nil, errors.Errorf("unterminated quote in params: %s", v[idx+1:]) + } + param, err := parseSingleParam(v[idx+1 : idx+endIdx+1]) + idx += endIdx + 1 + if err != nil { + return nil, err + } + params = append(params, param) + case ',', ' ': + default: + return nil, errors.Errorf("expected char in params: %s", v[idx:]) + } + } + return params, nil +} + +func parseSingleParam(value string) (any, error) { + idx := strings.IndexByte(value, ' ') + if idx < 0 { + return nil, errors.Errorf("no space in param: %s", value) + } + tpStr := value[:idx] + value = value[idx+1:] + switch tpStr { + case "KindNull": + return nil, nil + case "KindInt64": + return strconv.ParseInt(value, 10, 64) + case "KindUint64": + return strconv.ParseUint(value, 10, 64) + case "KindFloat32": + return strconv.ParseFloat(value, 32) + case "KindFloat64", "KindMysqlDecimal": + return strconv.ParseFloat(value, 64) + case "KindString", "KindBinaryLiteral", "KindMysqlBit", "KindMysqlSet", "KindMysqlTime", "KindMysqlJSON": + return value, nil + case "KindBytes": + return hack.Slice(value), nil + case "KindMysqlDuration", "KindMysqlEnum", "KindInterface", "KindMinNotNull", "KindMaxValue", "KindRaw": + return nil, errors.Errorf("unsupported param type: %s", tpStr) + } + return nil, errors.Errorf("unknown param type: %s", tpStr) +} + +func (decoder *AuditLogPluginDecoder) parseGeneralEvent(kvs map[string]string, connID uint64) ([]*Command, error) { connInfo := decoder.connInfo[connID] - var cmd *Command + event, ok := kvs[auditPluginKeyEvent] + if !ok || event != auditPluginEventEnd { + // Old version doesn't have the EVENT key. + // The STARTING event is wrong, we only care about the COMPLETED event. + return nil, nil + } + cmdStr := parseCommand(kvs[auditPluginKeyCommand]) + cmds := make([]*Command, 0, 4) + db := kvs[auditPluginKeyCurDB] + if len(db) > 0 && db != connInfo.currentDB { + cmds = append(cmds, &Command{ + Type: pnet.ComInitDB, + Payload: pnet.MakeInitDBRequest(db), + }) + connInfo.currentDB = db + decoder.connInfo[connID] = connInfo + } + switch cmdStr { case "Query", "Init DB": sql, err := strconv.Unquote(kvs[auditPluginKeySQL]) if err != nil { return nil, errors.Wrapf(err, "unquote sql failed: %s", kvs[auditPluginKeySQL]) - // We also ignore "Quit" since disconnection is handled in parseConnectEvent. } - cmd = &Command{ + cmds = append(cmds, &Command{ Type: pnet.ComQuery, StmtType: kvs[auditPluginKeyStmtType], Payload: append([]byte{pnet.ComQuery.Byte()}, hack.Slice(sql)...), + }) + case "Execute": + params, ok := kvs[auditPluginKeyParams] + if !ok { + // the old format doesn't output params + break } - // Ignore StmtExecute since the params are not outputted. + sql, err := strconv.Unquote(kvs[auditPluginKeySQL]) + if err != nil { + return nil, errors.Wrapf(err, "unquote sql failed: %s", kvs[auditPluginKeySQL]) + } + args, err := parseExecuteParams(params) + if err != nil { + return nil, err + } + executeReq, err := pnet.MakeExecuteStmtRequest(0, args, true) + if err != nil { + return nil, errors.Wrapf(err, "make execute request failed") + } + cmds = append(cmds, &Command{ + Type: pnet.ComStmtPrepare, + StmtType: kvs[auditPluginKeyStmtType], + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, hack.Slice(sql)...), + }, &Command{ + Type: pnet.ComStmtExecute, + StmtType: kvs[auditPluginKeyStmtType], + Payload: executeReq, + }, &Command{ + Type: pnet.ComStmtClose, + StmtType: kvs[auditPluginKeyStmtType], + Payload: pnet.MakeCloseStmtRequest(0), + }) // Ignore Quit since disconnection is handled in parseConnectEvent. } - // Audit logs record both the beginning and end of each statement, but we only need the first one. - if cmd == nil { - return nil, nil - } - if connInfo.beginCmd != nil && bytes.Equal(cmd.Payload, connInfo.beginCmd.Payload) { - cmd = nil - } else if !connInfo.inited && cmd.StmtType == "Use" { - connInfo.inited = true - } else if kvs[auditPluginKeyClass] == auditPluginClassTableAccess && !connInfo.inited { - cmd = nil - } - connInfo.beginCmd = cmd - decoder.connInfo[connID] = connInfo - return cmd, nil + return cmds, nil } func (decoder *AuditLogPluginDecoder) parseConnectEvent(kvs map[string]string, connID uint64) (*Command, error) { switch kvs[auditPluginKeySubClass] { - case auditPluginSubClassConnected: - // The connection is treated as initialized no matter the current db is set or not. - connInfo := decoder.connInfo[connID] - connInfo.inited = true - decoder.connInfo[connID] = connInfo - - db := kvs[auditPluginKeyDatabase] - dbs := parseDB(db) - if len(dbs) == 1 { - return &Command{ - Type: pnet.ComInitDB, - Payload: append([]byte{pnet.ComInitDB.Byte()}, hack.Slice(dbs[0])...), - }, nil - } - return nil, nil case auditPluginSubClassDisconnect: + delete(decoder.connInfo, connID) return &Command{ Type: pnet.ComQuit, Payload: []byte{pnet.ComQuit.Byte()}, diff --git a/pkg/sqlreplay/cmd/audit_log_plugin_test.go b/pkg/sqlreplay/cmd/audit_log_plugin_test.go index c2ccf34ba..94988414c 100644 --- a/pkg/sqlreplay/cmd/audit_log_plugin_test.go +++ b/pkg/sqlreplay/cmd/audit_log_plugin_test.go @@ -4,6 +4,7 @@ package cmd import ( + "io" "testing" "time" @@ -294,98 +295,302 @@ func TestParseLog(t *testing.T) { } } -func TestParseDB(t *testing.T) { +func TestCommand(t *testing.T) { tests := []struct { s string - expect []string + expect string }{ + { + s: ``, + expect: "", + }, + { + s: `"Init DB"`, + expect: "Init DB", + }, + { + s: `Query`, + expect: "Query", + }, + } + for i, test := range tests { + cmd := parseCommand(test.s) + require.EqualValues(t, test.expect, cmd, "case %d", i) + } +} + +func TestParseStartTs(t *testing.T) { + tests := []struct { + kvs map[string]string + ts time.Time + errMsg string + }{ + { + kvs: map[string]string{ + auditPluginKeyTimeStamp: "2025/09/06 17:03:50.888 +08:10", + auditPluginKeyCostTime: "666000", + }, + ts: time.Date(2025, 9, 6, 17, 3, 50, 222000000, time.FixedZone("", 8*3600+600)), + }, + { + kvs: map[string]string{ + auditPluginKeyTimeStamp: "2025/09/06 17:03:53.717 +08:10", + auditPluginKeyCostTime: "", + }, + ts: time.Date(2025, 9, 6, 17, 3, 53, 717000000, time.FixedZone("", 8*3600+600)), + }, + { + kvs: map[string]string{ + auditPluginKeyTimeStamp: "2025/09/06 17:03:53.717 +08:10", + }, + ts: time.Date(2025, 9, 6, 17, 3, 53, 717000000, time.FixedZone("", 8*3600+600)), + }, + { + kvs: map[string]string{ + auditPluginKeyTimeStamp: "2025/09/06", + }, + errMsg: "parsing timestamp failed", + }, + } + + for i, test := range tests { + ts, err := parseStartTs(test.kvs) + if test.errMsg != "" { + require.Error(t, err, "case %d", i) + require.Contains(t, err.Error(), test.errMsg, "case %d", i) + continue + } else { + require.NoError(t, err, "case %d", i) + } + require.EqualValues(t, test.ts, ts, "case %d", i) + } +} + +func TestParseParams(t *testing.T) { + tests := []struct { + s string + expect []any + errMsg string + }{ + { + s: ``, + errMsg: "no quotes in params", + }, { s: `""`, - expect: nil, + expect: []any{}, }, { s: `"[]"`, - expect: nil, + expect: []any{}, + }, + { + s: `"[\"KindInt64\"]"`, + errMsg: "no space in param", + }, + { + s: `"[\"KindInt64 \"]"`, + errMsg: "invalid syntax", }, { - s: `"[test]"`, - expect: []string{"test"}, + s: `"[\"1\"]"`, + errMsg: "no space in param", }, { - s: `"[hello,world]"`, - expect: []string{"hello", "world"}, + s: `"[\"Unknown 1\"]"`, + errMsg: "unknown param type", + }, + { + s: `"[\"KindInt64 1\"]"`, + expect: []any{ + int64(1), + }, + }, + { + s: `"[\"KindInt64 -9223372036854775808\"]"`, + expect: []any{ + int64(-9223372036854775808), + }, + }, + { + s: `"[\"KindFloat64 -123.45600128173828\"]"`, + expect: []any{ + float64(-123.45600128173828), + }, + }, + { + s: `"[\"KindString \"]"`, + expect: []any{ + "", + }, + }, + { + s: `"[\"KindString '单引号' \\\\\\\"双引号\\\\\\\"\\\\\\\\n\\\\\\\\t😊\\\\x00\\\\x00\"]"`, + expect: []any{ + "'单引号' \\\\\\\"双引号\\\\\\\"\\\\\\\\n\\\\\\\\t😊\\\\x00\\\\x00", + }, + }, + { + s: `"[\"KindNull \"]"`, + expect: []any{ + nil, + }, + }, + { + s: `"[\"KindString 37\",\"KindInt64 2\",\"KindString user_5556\",\"KindInt64 1\"]"`, + expect: []any{ + string("37"), + int64(2), + string("user_5556"), + int64(1), + }, }, } + for i, test := range tests { - dbs := parseDB(test.s) - if len(dbs) == 0 && len(test.expect) == 0 { + params, err := parseExecuteParams(test.s) + if test.errMsg != "" { + require.Error(t, err, "case %d", i) + require.Contains(t, err.Error(), test.errMsg, "case %d", i) continue + } else { + require.NoError(t, err, "case %d", i) } - require.EqualValues(t, test.expect, parseDB(test.s), "case %d", i) + if len(params) == 0 && len(test.expect) == 0 { + continue + } + require.Equal(t, test.expect, params, "case %d", i) } } -func TestDecodeAuditLogPlugin(t *testing.T) { +func TestDecodeSingleLine(t *testing.T) { tests := []struct { line string - cmd *Command + cmds []*Command errMsg string }{ { - line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, - cmd: &Command{ - Type: pnet.ComQuery, - ConnID: 3695181836, - StartTs: time.Date(2025, 9, 6, 17, 3, 53, 720000000, time.FixedZone("", 8*3600+600)), - Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), - StmtType: "Select", - Succeess: true, + // sql with current db + line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, + cmds: []*Command{ + { + Type: pnet.ComInitDB, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Success: true, + }, + { + Type: pnet.ComQuery, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), + StmtType: "Select", + Success: true, + }, }, }, { - // connect with an initial database - line: `[2025/09/08 21:15:12.904 +08:00] [INFO] [logger.go:77] [ID=17573373120] [TIMESTAMP=2025/09/08 21:15:12.904 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Connected] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=49278] [CONNECTION_ID=3552575510] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=0] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=]`, - cmd: &Command{ - Type: pnet.ComInitDB, - ConnID: 3552575510, - StartTs: time.Date(2025, 9, 8, 21, 15, 12, 904000000, time.FixedZone("", 8*3600+600)), - Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), - Succeess: true, + // sql without current database + line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=] [EVENT=COMPLETED]`, + cmds: []*Command{ + { + Type: pnet.ComQuery, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), + StmtType: "Select", + Success: true, + }, }, }, { - // no initial database - line: `[2025/09/08 21:15:12.904 +08:00] [INFO] [logger.go:77] [ID=17573373120] [TIMESTAMP=2025/09/08 21:15:12.904 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Connected] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=49278] [CONNECTION_ID=3552575510] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=0] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=]`, - errMsg: "EOF", + // prepared statement + line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"?\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Execute] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[\"KindInt64 1\"]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, + cmds: []*Command{ + { + Type: pnet.ComInitDB, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Success: true, + }, + { + Type: pnet.ComStmtPrepare, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("select \"?\"")...), + StmtType: "Select", + Success: true, + }, + { + Type: pnet.ComStmtExecute, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 1, 0, 0, 0, 0, 0, 0, 0}...), + StmtType: "Select", + Success: true, + }, + { + Type: pnet.ComStmtClose, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{0, 0, 0, 0}...), + StmtType: "Select", + Success: true, + }, + }, }, { - line: `[2025/09/08 21:15:35.621 +08:00] [INFO] [logger.go:77] [ID=17573373350] [TIMESTAMP=2025/09/08 21:15:35.621 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Disconnect] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=49278] [CONNECTION_ID=3552575510] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=22716.871792] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=]`, - cmd: &Command{ - Type: pnet.ComQuit, - ConnID: 3552575510, - StartTs: time.Date(2025, 9, 8, 21, 15, 35, 621000000, time.FixedZone("", 8*3600+600)), - Payload: []byte{pnet.ComQuit.Byte()}, - Succeess: true, + // prepared statement without params field + line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"?\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Execute] [SQL_STATEMENTS=Select] [CURRENT_DB=test] [EVENT=COMPLETED]`, + cmds: []*Command{ + { + Type: pnet.ComInitDB, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Success: true, + }, }, }, { - line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, - errMsg: "no timestamp", + // ignore starting event + line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=STARTING]`, + cmds: []*Command{}, + }, + { + // ignore new connection + line: `[2025/09/08 21:15:12.904 +08:00] [INFO] [logger.go:77] [ID=17573373120] [TIMESTAMP=2025/09/08 21:15:12.904 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Connected] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=49278] [CONNECTION_ID=3552575510] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=0] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=]`, + cmds: []*Command{}, }, { - line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, + // quit + line: `[2025/09/08 21:15:35.621 +08:00] [INFO] [logger.go:77] [ID=17573373350] [TIMESTAMP=2025/09/08 21:15:35.621 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Disconnect] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=49278] [CONNECTION_ID=3552575510] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=22716.871792] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=]`, + cmds: []*Command{ + { + Type: pnet.ComQuit, + ConnID: 3552575510, + StartTs: time.Date(2025, 9, 8, 21, 15, 35, 621000000, time.FixedZone("", 8*3600+600)), + Payload: []byte{pnet.ComQuit.Byte()}, + Success: true, + }, + }, + }, + { + line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, errMsg: "parsing timestamp failed", }, { - line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, + line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, errMsg: "no connection id", }, { - line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=abc] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, + line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=abc] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, errMsg: "parsing connection id failed", }, { - line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=HELLO] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, + line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=HELLO] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, errMsg: "unknown event class", }, } @@ -393,121 +598,204 @@ func TestDecodeAuditLogPlugin(t *testing.T) { for i, test := range tests { decoder := NewAuditLogPluginDecoder() mr := mockReader{data: append([]byte(test.line), '\n')} - cmd, err := decoder.Decode(&mr) + cmds := make([]*Command, 0, 4) + var err error + for { + var cmd *Command + cmd, err = decoder.Decode(&mr) + if cmd == nil { + break + } + cmds = append(cmds, cmd) + } + require.Error(t, err, "case %d", i) if len(test.errMsg) > 0 { - require.Error(t, err, "case %d", i) require.ErrorContains(t, err, test.errMsg, "case %d", i) continue } else { - require.NoError(t, err, "case %d", i) + require.ErrorIs(t, err, io.EOF, "case %d", i) } - require.Equal(t, test.cmd, cmd, "case %d", i) + require.Equal(t, test.cmds, cmds, "case %d", i) } } -func TestIgnoreCmds(t *testing.T) { +func TestDecodeMultiLines(t *testing.T) { tests := []struct { lines string cmds []*Command }{ { - // db not initialized, start with not a table access - lines: `[2025/09/08 21:16:29.585 +08:00] [INFO] [logger.go:77] [ID=17573373891] [TIMESTAMP=2025/09/08 21:16:29.585 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1057.834] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="set sql_mode=''"] [ROWS=0] [CONNECTION_ID=3552575564] [CLIENT_PORT=52611] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Set]`, + // db is changed in the second sql + lines: `[2025/09/08 21:16:29.585 +08:00] [INFO] [logger.go:77] [ID=17573373891] [TIMESTAMP=2025/09/06 16:16:29.585 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1057.834] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="set sql_mode=''"] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=52611] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Set] [EXECUTE_PARAMS="[]"] [CURRENT_DB=] [EVENT=COMPLETED] +[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=STARTING] +[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, cmds: []*Command{ { - StartTs: time.Date(2025, 9, 8, 21, 16, 29, 585000000, time.FixedZone("", 8*3600+600)), - ConnID: 3552575564, + StartTs: time.Date(2025, 9, 6, 16, 16, 29, 583942167, time.FixedZone("", 8*3600+600)), + ConnID: 3695181836, Type: pnet.ComQuery, Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("set sql_mode=''")...), StmtType: "Set", - Succeess: true, + Success: true, + }, + { + Type: pnet.ComInitDB, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Success: true, + }, + { + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + ConnID: 3695181836, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), + StmtType: "Select", + Success: true, }, }, }, { - // db not initialized, start with a table access, ignore it - lines: `[2025/09/08 21:16:52.630 +08:00] [INFO] [logger.go:77] [ID=17573374120] [TIMESTAMP=2025/09/08 21:16:52.630 +08:10] [EVENT_CLASS=TABLE_ACCESS] [EVENT_SUBCLASS=Set] [STATUS_CODE=0] [COST_TIME=1509.417] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[t]"] [SQL_TEXT="insert t value(1)"] [ROWS=1] [CONNECTION_ID=3552575570] [CLIENT_PORT=52709] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Insert]`, - cmds: []*Command{}, - }, - { - // start with a use statement, the duplicated sql is ignored - lines: `[2025/09/08 21:17:55.686 +08:00] [INFO] [logger.go:77] [ID=17573374751] [TIMESTAMP=2025/09/08 21:15:55.686 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=70.708] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="use tiproxy_traffic_replay"] [ROWS=0] [CONNECTION_ID=3552575570] [CLIENT_PORT=52709] [PID=89967] [COMMAND="Init DB"] [SQL_STATEMENTS=Use] - [2025/09/08 21:16:52.630 +08:00] [INFO] [logger.go:77] [ID=17573374120] [TIMESTAMP=2025/09/08 21:16:52.630 +08:10] [EVENT_CLASS=TABLE_ACCESS] [EVENT_SUBCLASS=Insert] [STATUS_CODE=0] [COST_TIME=1509.417] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[t]"] [SQL_TEXT="insert t value(1)"] [ROWS=1] [CONNECTION_ID=3552575570] [CLIENT_PORT=52709] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Insert] - [2025/09/08 21:16:52.634 +08:00] [INFO] [logger.go:77] [ID=17573374121] [TIMESTAMP=2025/09/08 21:16:52.634 +08:10] [EVENT_CLASS=TABLE_ACCESS] [EVENT_SUBCLASS=Insert] [STATUS_CODE=0] [COST_TIME=5637.042] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[t]"] [SQL_TEXT="insert t value(1)"] [ROWS=1] [CONNECTION_ID=3552575570] [CLIENT_PORT=52709] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Insert]`, + // db stays the same in the second sql + lines: `[2025/09/08 21:16:29.585 +08:00] [INFO] [logger.go:77] [ID=17573373891] [TIMESTAMP=2025/09/06 16:16:29.585 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1057.834] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="set sql_mode=''"] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=52611] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Set] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED] +[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=STARTING] +[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, cmds: []*Command{ { - StartTs: time.Date(2025, 9, 8, 21, 15, 55, 686000000, time.FixedZone("", 8*3600+600)), - ConnID: 3552575570, - Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("use tiproxy_traffic_replay")...), + Type: pnet.ComInitDB, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 16, 16, 29, 583942167, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Success: true, + }, + { + StartTs: time.Date(2025, 9, 6, 16, 16, 29, 583942167, time.FixedZone("", 8*3600+600)), + ConnID: 3695181836, Type: pnet.ComQuery, - StmtType: "Use", - Succeess: true, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("set sql_mode=''")...), + StmtType: "Set", + Success: true, }, { - StartTs: time.Date(2025, 9, 8, 21, 16, 52, 630000000, time.FixedZone("", 8*3600+600)), - ConnID: 3552575570, - Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("insert t value(1)")...), + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + ConnID: 3695181836, Type: pnet.ComQuery, - StmtType: "Insert", - Succeess: true, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), + StmtType: "Select", + Success: true, }, }, }, { - // a new connection without current db - lines: `[2025/09/08 17:23:58.279 +08:00] [INFO] [logger.go:77] [ID=17574098380] [TIMESTAMP=2025/09/08 17:23:58.277 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Connected] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=52797] [CONNECTION_ID=3552575570] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=0] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=] -[2025/09/08 21:16:52.630 +08:00] [INFO] [logger.go:77] [ID=17573374120] [TIMESTAMP=2025/09/08 21:16:52.630 +08:10] [EVENT_CLASS=TABLE_ACCESS] [EVENT_SUBCLASS=Insert] [STATUS_CODE=0] [COST_TIME=1509.417] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[t]"] [SQL_TEXT="insert test.t value(1)"] [ROWS=1] [CONNECTION_ID=3552575570] [CLIENT_PORT=52709] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Insert]`, + // new connection + quit connection + lines: `[2025/09/08 17:23:58.279 +08:00] [INFO] [logger.go:77] [ID=17574098380] [TIMESTAMP=2025/09/08 17:23:58.277 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Connected] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=52797] [CONNECTION_ID=3552575570] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=0] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=] +[2025/09/08 21:16:52.630 +08:00] [INFO] [logger.go:77] [ID=17573374120] [TIMESTAMP=2025/09/08 21:16:52.630 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Disconnect] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=52620] [CONNECTION_ID=3552575570] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=0.0445] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=]`, cmds: []*Command{ { - StartTs: time.Date(2025, 9, 8, 21, 16, 52, 630000000, time.FixedZone("", 8*3600+600)), - ConnID: 3552575570, - Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("insert test.t value(1)")...), - Type: pnet.ComQuery, - StmtType: "Insert", - Succeess: true, + StartTs: time.Date(2025, 9, 8, 21, 16, 52, 630000000, time.FixedZone("", 8*3600+600)), + ConnID: 3552575570, + Payload: []byte{pnet.ComQuit.Byte()}, + Type: pnet.ComQuit, + Success: true, }, }, }, { - // a new connection with current db - lines: `[2025/09/08 17:23:58.279 +08:00] [INFO] [logger.go:77] [ID=17574098380] [TIMESTAMP=2025/09/08 17:23:58.277 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Connected] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=52797] [CONNECTION_ID=3552575570] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=0] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=] -[2025/09/08 21:16:52.630 +08:00] [INFO] [logger.go:77] [ID=17573374120] [TIMESTAMP=2025/09/08 21:16:52.630 +08:10] [EVENT_CLASS=TABLE_ACCESS] [EVENT_SUBCLASS=Insert] [STATUS_CODE=0] [COST_TIME=1509.417] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[t]"] [SQL_TEXT="insert t value(1)"] [ROWS=1] [CONNECTION_ID=3552575570] [CLIENT_PORT=52709] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Insert]`, + // 2 prepared statements + lines: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"?\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Execute] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[\"KindInt64 1\"]"] [CURRENT_DB=test] [EVENT=COMPLETED] +[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"?\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Execute] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[\"KindInt64 1\"]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, cmds: []*Command{ { - StartTs: time.Date(2025, 9, 8, 17, 23, 58, 277000000, time.FixedZone("", 8*3600+600)), - ConnID: 3552575570, - Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), - Type: pnet.ComInitDB, - Succeess: true, + Type: pnet.ComInitDB, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Success: true, }, { - StartTs: time.Date(2025, 9, 8, 21, 16, 52, 630000000, time.FixedZone("", 8*3600+600)), - ConnID: 3552575570, - Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("insert t value(1)")...), - Type: pnet.ComQuery, - StmtType: "Insert", - Succeess: true, + Type: pnet.ComStmtPrepare, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("select \"?\"")...), + StmtType: "Select", + Success: true, + }, + { + Type: pnet.ComStmtExecute, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 1, 0, 0, 0, 0, 0, 0, 0}...), + StmtType: "Select", + Success: true, + }, + { + Type: pnet.ComStmtClose, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{0, 0, 0, 0}...), + StmtType: "Select", + Success: true, + }, + { + Type: pnet.ComStmtPrepare, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("select \"?\"")...), + StmtType: "Select", + Success: true, + }, + { + Type: pnet.ComStmtExecute, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 1, 0, 0, 0, 0, 0, 0, 0}...), + StmtType: "Select", + Success: true, + }, + { + Type: pnet.ComStmtClose, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{0, 0, 0, 0}...), + StmtType: "Select", + Success: true, }, }, }, { - // new connection + quit connection - lines: `[2025/09/08 17:23:58.279 +08:00] [INFO] [logger.go:77] [ID=17574098380] [TIMESTAMP=2025/09/08 17:23:58.277 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Connected] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[test]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=52797] [CONNECTION_ID=3552575570] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=0] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=] -[2025/09/08 21:16:52.630 +08:00] [INFO] [logger.go:77] [ID=17573374120] [TIMESTAMP=2025/09/08 21:16:52.630 +08:10] [EVENT_CLASS=CONNECTION] [EVENT_SUBCLASS=Disconnect] [STATUS_CODE=0] [COST_TIME=0] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT=] [ROWS=0] [CLIENT_PORT=52620] [CONNECTION_ID=3552575570] [CONNECTION_TYPE=SSL/TLS] [SERVER_ID=1] [SERVER_PORT=4000] [DURATION=0.0445] [SERVER_OS_LOGIN_USER=test] [OS_VERSION=darwin.arm64] [CLIENT_VERSION=] [SERVER_VERSION=v9.0.0] [AUDIT_VERSION=] [SSL_VERSION=TLSv1.3] [PID=89967] [Reason=]`, + // 2 different connections + lines: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED] +[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181837] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`, cmds: []*Command{ { - StartTs: time.Date(2025, 9, 8, 17, 23, 58, 277000000, time.FixedZone("", 8*3600+600)), - ConnID: 3552575570, - Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), - Type: pnet.ComInitDB, - Succeess: true, + Type: pnet.ComInitDB, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Success: true, }, { - StartTs: time.Date(2025, 9, 8, 21, 16, 52, 630000000, time.FixedZone("", 8*3600+600)), - ConnID: 3552575570, - Payload: []byte{pnet.ComQuit.Byte()}, - Type: pnet.ComQuit, - Succeess: true, + Type: pnet.ComQuery, + ConnID: 3695181836, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), + StmtType: "Select", + Success: true, + }, + { + Type: pnet.ComInitDB, + ConnID: 3695181837, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Success: true, + }, + { + Type: pnet.ComQuery, + ConnID: 3695181837, + StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), + StmtType: "Select", + Success: true, }, }, }, diff --git a/pkg/sqlreplay/cmd/cmd.go b/pkg/sqlreplay/cmd/cmd.go index b6d632081..e521a5ec4 100644 --- a/pkg/sqlreplay/cmd/cmd.go +++ b/pkg/sqlreplay/cmd/cmd.go @@ -60,7 +60,7 @@ type Command struct { // Logged only in audit log. StmtType string // Logged only in native log. - Succeess bool + Success bool } func NewCommand(packet []byte, startTs time.Time, connID uint64) *Command { @@ -69,11 +69,11 @@ func NewCommand(packet []byte, startTs time.Time, connID uint64) *Command { } // TODO: handle load infile specially return &Command{ - Payload: packet, - StartTs: startTs, - ConnID: connID, - Type: pnet.Command(packet[0]), - Succeess: true, + Payload: packet, + StartTs: startTs, + ConnID: connID, + Type: pnet.Command(packet[0]), + Success: true, } } @@ -84,7 +84,7 @@ func (c *Command) Equal(that *Command) bool { return c.StartTs.Equal(that.StartTs) && c.ConnID == that.ConnID && c.Type == that.Type && - c.Succeess == that.Succeess && + c.Success == that.Success && bytes.Equal(c.Payload, that.Payload) } diff --git a/pkg/sqlreplay/cmd/native.go b/pkg/sqlreplay/cmd/native.go index 79d0c72f1..18facc642 100644 --- a/pkg/sqlreplay/cmd/native.go +++ b/pkg/sqlreplay/cmd/native.go @@ -48,7 +48,7 @@ func (rw *NativeEncoder) Encode(c *Command, writer *bytes.Buffer) error { return err } } - if !c.Succeess { + if !c.Success { if err = writeString(nativeKeySuccess, "false", writer); err != nil { return err } @@ -81,7 +81,7 @@ type NativeDecoder struct { func (rw *NativeDecoder) Decode(reader LineReader) (c *Command, err error) { c = &Command{} - c.Succeess = true + c.Success = true c.Type = pnet.ComQuery for { line, filename, lineIdx, err := reader.ReadLine() @@ -124,7 +124,7 @@ func (rw *NativeDecoder) Decode(reader LineReader) (c *Command, err error) { } c.Type = pnet.CommandFromString(value) case nativeKeySuccess: - c.Succeess = value == "true" + c.Success = value == "true" case nativeKeyPayloadLen: var payloadLen int if payloadLen, err = strconv.Atoi(value); err != nil {