diff --git a/pkg/sqlreplay/replay/replay.go b/pkg/sqlreplay/replay/replay.go index 20dbf8426..e3339699c 100644 --- a/pkg/sqlreplay/replay/replay.go +++ b/pkg/sqlreplay/replay/replay.go @@ -214,6 +214,7 @@ func (r *replay) readCommands(ctx context.Context) { Dir: r.cfg.Input, EncryptionKey: r.cfg.encryptionKey, EncryptionMethod: r.meta.EncryptMethod, + CommandStartTime: r.cfg.CommandStartTime, }) if err != nil { r.stop(err) diff --git a/pkg/sqlreplay/store/line.go b/pkg/sqlreplay/store/line.go index 13a606dea..d9d6f6a84 100644 --- a/pkg/sqlreplay/store/line.go +++ b/pkg/sqlreplay/store/line.go @@ -7,6 +7,7 @@ import ( "bufio" "fmt" "io" + "time" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd" @@ -32,6 +33,8 @@ type ReaderCfg struct { Format string EncryptionMethod string EncryptionKey []byte + // CommandStartTime filters input files by the timestamp in their names (presumably the file's end time; confirm). For the audit log plugin format, the reader skips files whose name timestamp is not after CommandStartTime or cannot be parsed; a zero value keeps every file with a parsable timestamp. Other formats are not filtered. + CommandStartTime time.Time } var _ cmd.LineReader = (*loader)(nil) diff --git a/pkg/sqlreplay/store/rotate.go b/pkg/sqlreplay/store/rotate.go index 42885c756..c911e7aef 100644 --- a/pkg/sqlreplay/store/rotate.go +++ b/pkg/sqlreplay/store/rotate.go @@ -169,12 +169,16 @@ func (r *rotateReader) nextReader() error { var minFileName string fileNamePrefix := getFileNamePrefix(r.cfg.Format) parseFunc := getParseFileNameFunc(r.cfg.Format) + fileFilter := getFilterFileNameFunc(r.cfg.Format, r.cfg.CommandStartTime) ctx, cancel := context.WithTimeout(context.Background(), opTimeout) err := r.storage.WalkDir(ctx, &storage.WalkOption{}, func(name string, size int64) error { if !strings.HasPrefix(name, fileNamePrefix) { return nil } + if !fileFilter(name, fileNamePrefix) { + return nil + } fileIdx := parseFunc(name, fileNamePrefix) if fileIdx == 0 { r.lg.Warn("traffic file name is invalid", zap.String("filename", name), zap.String("format", r.cfg.Format)) @@ -230,11 +234,21 @@ func getFileNamePrefix(format string) string { func getParseFileNameFunc(format string) func(string,
string) int64 { switch format { case cmd.FormatAuditLogPlugin: - return parseFileTime + return parseFileTimeToIdx } return parseFileIdx } +func getFilterFileNameFunc(format string, commandStartTime time.Time) func(string, string) bool { + switch format { + case cmd.FormatAuditLogPlugin: + return func(name, fileNamePrefix string) bool { + return filterFileByTime(name, fileNamePrefix, commandStartTime) + } + } + return func(string, string) bool { return true } +} + // Parse the file name to get the file index. // filename pattern: traffic-1.log.gz func parseFileIdx(name, fileNamePrefix string) int64 { @@ -262,25 +276,44 @@ func parseFileIdx(name, fileNamePrefix string) int64 { // Parse the file name to get the file timestamp. // filename pattern: tidb-audit-2025-09-10T17-01-56.073.log -func parseFileTime(name, fileNamePrefix string) int64 { +func parseFileTime(name, fileNamePrefix string) time.Time { if !strings.HasPrefix(name, fileNamePrefix) { - return 0 + return time.Time{} } startIdx := len(fileNamePrefix) if len(name) <= startIdx+len(fileNameSuffix) { - return 0 + return time.Time{} } endIdx := len(name) if strings.HasSuffix(name, fileCompressFormat) { endIdx -= len(fileCompressFormat) } if !strings.HasSuffix(name[:endIdx], fileNameSuffix) { - return 0 + return time.Time{} } endIdx -= len(fileNameSuffix) ts, err := time.Parse(logTimeLayout, name[startIdx:endIdx]) if err != nil { + return time.Time{} + } + return ts +} + +func parseFileTimeToIdx(name, fileNamePrefix string) int64 { + ts := parseFileTime(name, fileNamePrefix) + if ts.IsZero() { return 0 } return ts.UnixNano() / 1000000 } + +func filterFileByTime(name, fileNamePrefix string, commandStartTime time.Time) bool { + fileTime := parseFileTime(name, fileNamePrefix) + if fileTime.IsZero() { + return false + } + // Be careful: the log file name carries no timezone info. NOTE(review): parseFileTime uses time.Parse, which + // returns UTC when the layout has no zone, so the file time is likely compared as UTC, not local time (confirm logTimeLayout). + // Callers can work around a zone mismatch by adjusting commandStartTime. After is strict: a file exactly at commandStartTime is excluded.
+ return fileTime.After(commandStartTime) +} diff --git a/pkg/sqlreplay/store/rotate_test.go b/pkg/sqlreplay/store/rotate_test.go index 217cd0f01..6d7ec7aff 100644 --- a/pkg/sqlreplay/store/rotate_test.go +++ b/pkg/sqlreplay/store/rotate_test.go @@ -12,6 +12,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/pingcap/tiproxy/lib/util/logger" "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd" @@ -140,7 +141,7 @@ func TestParseFileTime(t *testing.T) { } for i, test := range tests { - idx := parseFileTime(test.fileName, auditFileNamePrefix) + idx := parseFileTimeToIdx(test.fileName, auditFileNamePrefix) require.Equal(t, test.fileIdx, idx, "case %d", i) } } @@ -367,3 +368,91 @@ func TestCompressAndEncrypt(t *testing.T) { require.Equal(t, []byte("testtest"), data[:8]) require.NoError(t, reader.Close()) } + +func TestFilterFileNameByStartTime(t *testing.T) { + commandStartTime, err := time.Parse(logTimeLayout, "2025-09-10T17-01-56.050") + require.NoError(t, err) + tests := []struct { + fileName string + expectToInclude bool + }{ + // Files after start time should be included + { + fileName: "tidb-audit-2025-09-10T17-01-56.073.log", + expectToInclude: true, + }, + { + fileName: "tidb-audit-2025-09-10T17-01-56.172.log.gz", + expectToInclude: true, + }, + { + fileName: "tidb-audit-2025-09-11T10-30-00.500.log", + expectToInclude: true, + }, + // Files before or equal to start time should be excluded + { + fileName: "tidb-audit-2025-09-10T17-01-55.073.log", + expectToInclude: false, + }, + { + fileName: "tidb-audit-2025-09-10T17-01-56.000.log", + expectToInclude: false, + }, + // Invalid file names should be excluded + { + fileName: "tidb-audit-invalid-timestamp.log", + expectToInclude: false, + }, + { + fileName: "traffic-1.log", + expectToInclude: false, + }, + { + fileName: "tidb-audit.log", + expectToInclude: false, + }, + { + fileName: "tidb-audit-2025-13-40T25-70-70.log", + expectToInclude: false, + }, + } + expectedFileOrder := []string{
"tidb-audit-2025-09-10T17-01-56.073.log", + "tidb-audit-2025-09-10T17-01-56.172.log.gz", + "tidb-audit-2025-09-11T10-30-00.500.log", + } + for i, test := range tests { + included := filterFileByTime(test.fileName, auditFileNamePrefix, commandStartTime) + require.Equal(t, test.expectToInclude, included, "case %d", i) + } + + dir := t.TempDir() + require.NoError(t, os.RemoveAll(dir)) + require.NoError(t, os.MkdirAll(dir, 0777)) + for _, test := range tests { + f, err := os.Create(filepath.Join(dir, test.fileName)) + require.NoError(t, err) + if strings.HasSuffix(test.fileName, ".gz") { + w := gzip.NewWriter(f) + _, err := w.Write([]byte{}) + require.NoError(t, err) + require.NoError(t, w.Close()) + } + require.NoError(t, f.Close()) + } + storage, err := NewStorage(dir) + require.NoError(t, err) + defer storage.Close() + lg, _ := logger.CreateLoggerForTest(t) + l, err := newRotateReader(lg, storage, ReaderCfg{Dir: dir, Format: cmd.FormatAuditLogPlugin, CommandStartTime: commandStartTime}) + require.NoError(t, err) + var fileOrder []string + for { + if err := l.nextReader(); err != nil { + require.True(t, errors.Is(err, io.EOF)) + break + } + fileOrder = append(fileOrder, l.curFileName) + } + require.Equal(t, expectedFileOrder, fileOrder) +}