Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cmd/replayer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,18 @@ func main() {
readonly := rootCmd.PersistentFlags().Bool("read-only", false, "only replay read-only queries, default is false")
format := rootCmd.PersistentFlags().String("format", "", "the format of traffic files")
logFile := rootCmd.PersistentFlags().String("log-file", "", "the output log file")
cmdStartTime := rootCmd.PersistentFlags().Time("command-start-time", time.Now(), []string{time.RFC3339, time.RFC3339Nano}, "the start time to replay the traffic, format is RFC3339. The command before this start time will be ignored.")

rootCmd.RunE = func(cmd *cobra.Command, _ []string) error {
replayCfg := replay.ReplayConfig{
Input: *input,
Speed: *speed,
Username: *username,
Password: *password,
Format: *format,
ReadOnly: *readonly,
StartTime: time.Now(),
Input: *input,
Speed: *speed,
Username: *username,
Password: *password,
Format: *format,
ReadOnly: *readonly,
StartTime: time.Now(),
CommandStartTime: *cmdStartTime,
}

r := &replayer{}
Expand Down
14 changes: 8 additions & 6 deletions lib/cli/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func GetTrafficReplayCmd(ctx *Context) *cobra.Command {
password := replayCmd.PersistentFlags().String("password", "", "the password to connect to TiDB for replay")
readonly := replayCmd.PersistentFlags().Bool("read-only", false, "only replay read-only queries, default is false")
format := replayCmd.PersistentFlags().String("format", "", "the format of traffic files")
cmdStartTime := replayCmd.PersistentFlags().String("command-start-time", "", "the start time to replay the traffic, format is RFC3339 or RFC3339Nano. The command before this start time will be ignored.")
replayCmd.RunE = func(cmd *cobra.Command, args []string) error {
username := *username
if len(username) == 0 {
Expand All @@ -79,12 +80,13 @@ func GetTrafficReplayCmd(ctx *Context) *cobra.Command {
password = string(bytePassword)
}
reader := GetFormReader(map[string]string{
"input": *input,
"speed": strconv.FormatFloat(*speed, 'f', -1, 64),
"username": username,
"password": password,
"readonly": strconv.FormatBool(*readonly),
"format": *format,
"input": *input,
"speed": strconv.FormatFloat(*speed, 'f', -1, 64),
"username": username,
"password": password,
"readonly": strconv.FormatBool(*readonly),
"format": *format,
"cmdstarttime": *cmdStartTime,
})
resp, err := doRequest(cmd.Context(), ctx, http.MethodPost, "/api/traffic/replay", reader)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions pkg/server/api/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ func (h *Server) TrafficReplay(c *gin.Context) {
cfg.Format = c.PostForm("format")
cfg.ReadOnly = strings.EqualFold(c.PostForm("readonly"), "true")
cfg.KeyFile = globalCfg.Security.EncryptionKeyPath
// By default, if `cmdstarttime` is not specified, use zero time
if cmdStartTimeStr := c.PostForm("cmdstarttime"); cmdStartTimeStr != "" {
cmdStartTime, err := time.Parse(time.RFC3339, cmdStartTimeStr)
if err != nil {
cmdStartTime, err = time.Parse(time.RFC3339Nano, cmdStartTimeStr)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
}
cfg.CommandStartTime = cmdStartTime
}

if err := h.mgr.ReplayJobMgr.StartReplay(cfg); err != nil {
c.String(http.StatusInternalServerError, err.Error())
Expand Down
20 changes: 15 additions & 5 deletions pkg/sqlreplay/cmd/audit_log_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func NewAuditLogPluginDecoder() *AuditLogPluginDecoder {
var _ CmdDecoder = (*AuditLogPluginDecoder)(nil)

type AuditLogPluginDecoder struct {
connInfo map[uint64]auditLogPluginConnCtx
connInfo map[uint64]auditLogPluginConnCtx
commandStartTime time.Time
// pendingCmds contains the commands that has not been returned yet.
pendingCmds []*Command
}
Expand Down Expand Up @@ -81,6 +82,15 @@ func (decoder *AuditLogPluginDecoder) Decode(reader LineReader) (*Command, error
return nil, errors.Errorf("%s, line %d: parsing connection id failed: %s", filename, lineIdx, connStr)
}

startTs, err := parseStartTs(kvs)
if err != nil {
return nil, errors.Wrapf(err, "%s, line %d", filename, lineIdx)
}
if startTs.Before(decoder.commandStartTime) {
// Ignore the commands before CommandStartTime.
continue
}

var cmds []*Command
eventClass := kvs[auditPluginKeyClass]
switch eventClass {
Expand All @@ -103,10 +113,6 @@ func (decoder *AuditLogPluginDecoder) Decode(reader LineReader) (*Command, error
continue
}

startTs, err := parseStartTs(kvs)
if err != nil {
return nil, errors.Wrapf(err, "%s, line %d", filename, lineIdx)
}
for _, cmd := range cmds {
cmd.Success = true
cmd.ConnID = connID
Expand All @@ -119,6 +125,10 @@ func (decoder *AuditLogPluginDecoder) Decode(reader LineReader) (*Command, error
}
}

func (decoder *AuditLogPluginDecoder) SetCommandStartTime(t time.Time) {
decoder.commandStartTime = t
}

// All SQL_TEXT are converted into one line in audit log.
func parseLog(line string) (map[string]string, error) {
kv := make(map[string]string)
Expand Down
71 changes: 71 additions & 0 deletions pkg/sqlreplay/cmd/audit_log_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,3 +858,74 @@ func TestDecodeMultiLines(t *testing.T) {
require.Equal(t, test.cmds, cmds, "case %d", i)
}
}

func TestDecodeAuditLogWithCommandStartTime(t *testing.T) {
tests := []struct {
lines string
cmds []*Command
}{
{
// db is changed in the second sql
lines: `[2025/09/14 16:16:29.585 +08:00] [INFO] [logger.go:77] [ID=17573373891] [TIMESTAMP=2025/09/14 16:16:29.585 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1057.834] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="set sql_mode=''"] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=52611] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Set] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]
[2025/09/14 16:16:30.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/14 16:16:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=STARTING]
[2025/09/14 16:16:31.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/14 16:16:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=test] [EVENT=COMPLETED]`,
cmds: []*Command{
{
Type: pnet.ComInitDB,
ConnID: 3695181836,
StartTs: time.Date(2025, 9, 14, 16, 16, 53, 718663917, time.FixedZone("", 8*3600+600)),
Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...),
Success: true,
},
{
StartTs: time.Date(2025, 9, 14, 16, 16, 53, 718663917, time.FixedZone("", 8*3600+600)),
ConnID: 3695181836,
Type: pnet.ComQuery,
Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...),
StmtType: "Select",
Success: true,
},
},
},
{
// The latest DB is used
lines: `[2025/09/14 16:16:29.585 +08:00] [INFO] [logger.go:77] [ID=17573373891] [TIMESTAMP=2025/09/14 16:16:29.585 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1057.834] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="set sql_mode=''"] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=52611] [PID=89967] [COMMAND=Query] [SQL_STATEMENTS=Set] [EXECUTE_PARAMS="[]"] [CURRENT_DB=a] [EVENT=COMPLETED]
[2025/09/14 16:16:30.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/14 16:16:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=b] [EVENT=STARTING]
[2025/09/14 16:16:31.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/14 16:16:53.720 +08:10] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select] [EXECUTE_PARAMS="[]"] [CURRENT_DB=b] [EVENT=COMPLETED]`,
cmds: []*Command{
{
Type: pnet.ComInitDB,
ConnID: 3695181836,
StartTs: time.Date(2025, 9, 14, 16, 16, 53, 718663917, time.FixedZone("", 8*3600+600)),
Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("b")...),
Success: true,
},
{
StartTs: time.Date(2025, 9, 14, 16, 16, 53, 718663917, time.FixedZone("", 8*3600+600)),
ConnID: 3695181836,
Type: pnet.ComQuery,
Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...),
StmtType: "Select",
Success: true,
},
},
},
}

commandStartTime := time.Date(2025, 9, 14, 16, 16, 53, 0, time.FixedZone("", 8*3600+600))
for i, test := range tests {
decoder := NewAuditLogPluginDecoder()
decoder.SetCommandStartTime(commandStartTime)
mr := mockReader{data: append([]byte(test.lines), '\n')}
cmds := make([]*Command, 0, len(test.cmds))
for {
cmd, err := decoder.Decode(&mr)
if err != nil {
require.ErrorContains(t, err, "EOF", "case %d", i)
break
}
cmds = append(cmds, cmd)
}
require.Equal(t, test.cmds, cmds, "case %d", i)
}
}
2 changes: 2 additions & 0 deletions pkg/sqlreplay/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func NewCmdDecoder(format string) CmdDecoder {

type CmdDecoder interface {
Decode(reader LineReader) (c *Command, err error)

SetCommandStartTime(t time.Time)
}

type Command struct {
Expand Down
20 changes: 20 additions & 0 deletions pkg/sqlreplay/cmd/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,15 @@ func NewNativeDecoder() *NativeDecoder {
var _ CmdDecoder = (*NativeDecoder)(nil)

type NativeDecoder struct {
commandStartTime time.Time
}

func (rw *NativeDecoder) Decode(reader LineReader) (c *Command, err error) {
c = &Command{}
c.Success = true
c.Type = pnet.ComQuery

var skipThisCommand bool
for {
line, filename, lineIdx, err := reader.ReadLine()
if err != nil {
Expand Down Expand Up @@ -110,6 +113,10 @@ func (rw *NativeDecoder) Decode(reader LineReader) (c *Command, err error) {
if err != nil {
return nil, errors.Errorf("%s, line %d: parsing Time failed: %s", filename, lineIdx, line)
}

if c.StartTs.Before(rw.commandStartTime) {
skipThisCommand = true
}
case nativeKeyConnID:
if c.ConnID > 0 {
return nil, errors.Errorf("%s, line %d: redundant Conn_ID: %s, Conn_ID was %d", filename, lineIdx, line, c.ConnID)
Expand Down Expand Up @@ -148,6 +155,15 @@ func (rw *NativeDecoder) Decode(reader LineReader) (c *Command, err error) {
if data[0] != '\n' {
return nil, errors.Errorf("%s, line %d: expected new line, but got: %s", filename, lineIdx, line)
}

if skipThisCommand {
c = &Command{}
c.Success = true
c.Type = pnet.ComQuery
skipThisCommand = false
// skip the payload
continue
}
if err = c.Validate(filename, lineIdx); err != nil {
return nil, err
}
Expand All @@ -156,6 +172,10 @@ func (rw *NativeDecoder) Decode(reader LineReader) (c *Command, err error) {
}
}

func (rw *NativeDecoder) SetCommandStartTime(t time.Time) {
rw.commandStartTime = t
}

func writeString(key, value string, writer *bytes.Buffer) error {
var err error
if _, err = writer.WriteString(key); err != nil {
Expand Down
40 changes: 40 additions & 0 deletions pkg/sqlreplay/cmd/native_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,43 @@ select 1
require.Error(t, err, test)
}
}

func TestDecodeNativeWithCommandStartTime(t *testing.T) {
tests := []struct {
lines string
cmds []*Command
}{
{
lines: `# Time: 2024-08-28T18:51:20.477067+08:10
# Conn_ID: 100
# Payload_len: 8
select 1
# Time: 2024-08-28T18:51:21.477067+08:10
# Conn_ID: 100
# Payload_len: 8
select 2
`,
cmds: []*Command{
{
Type: pnet.ComQuery,
ConnID: 100,
Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select 2")...),
StartTs: time.Date(2024, 8, 28, 18, 51, 21, 477067000, time.FixedZone("", 8*3600+600)),
Success: true,
},
},
},
}

commandStartTime := time.Date(2024, 8, 28, 18, 51, 21, 0, time.FixedZone("", 8*3600+600))
for i, test := range tests {
decoder := NewCmdDecoder(FormatNative)
decoder.SetCommandStartTime(commandStartTime)
mr := mockReader{data: []byte(test.lines)}
for j, cmd := range test.cmds {
newCmd, err := decoder.Decode(&mr)
require.NoError(t, err, "case %d-%d", i, j)
require.True(t, cmd.Equal(newCmd), "case %d-%d", i, j)
}
}
}
8 changes: 8 additions & 0 deletions pkg/sqlreplay/replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type ReplayConfig struct {
Speed float64
ReadOnly bool
encryptionKey []byte
// CommandStartTime is the start time of the command being replayed. It's different from StartTime,
// which means the start time of the whole replay job.
CommandStartTime time.Time
// the following fields are for testing
reader cmd.LineReader
report report.Report
Expand Down Expand Up @@ -226,6 +229,11 @@ func (r *replay) readCommands(ctx context.Context) {
maxPendingCmds := int64(0)
totalWaitTime := time.Duration(0)
decoder := cmd.NewCmdDecoder(r.cfg.Format)
// It's better to filter out the commands in `readCommands` instead of `Decoder`. However,
// the connection state is maintained in decoder. Filtering out commands here will make it'
// impossible for decoder to know whether `use xxx` will be executed, and thus cannot maintain
// the current session state correctly.
decoder.SetCommandStartTime(r.cfg.CommandStartTime)
for ctx.Err() == nil {
for hasCloseEvent := true; hasCloseEvent; {
select {
Expand Down