Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sqlreplay/replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (r *replay) readCommands(ctx context.Context) {
Dir: r.cfg.Input,
EncryptionKey: r.cfg.encryptionKey,
EncryptionMethod: r.meta.EncryptMethod,
CommandStartTime: r.cfg.CommandStartTime,
})
if err != nil {
r.stop(err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sqlreplay/store/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bufio"
"fmt"
"io"
"time"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
Expand All @@ -32,6 +33,8 @@ type ReaderCfg struct {
Format string
EncryptionMethod string
EncryptionKey []byte
// Reader will skip the files whose end time is before CommandStartTime.
CommandStartTime time.Time
}

var _ cmd.LineReader = (*loader)(nil)
Expand Down
43 changes: 38 additions & 5 deletions pkg/sqlreplay/store/rotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,16 @@ func (r *rotateReader) nextReader() error {
var minFileName string
fileNamePrefix := getFileNamePrefix(r.cfg.Format)
parseFunc := getParseFileNameFunc(r.cfg.Format)
fileFilter := getFilterFileNameFunc(r.cfg.Format, r.cfg.CommandStartTime)
ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
err := r.storage.WalkDir(ctx, &storage.WalkOption{},
func(name string, size int64) error {
if !strings.HasPrefix(name, fileNamePrefix) {
return nil
}
if !fileFilter(name, fileNamePrefix) {
return nil
}
fileIdx := parseFunc(name, fileNamePrefix)
if fileIdx == 0 {
r.lg.Warn("traffic file name is invalid", zap.String("filename", name), zap.String("format", r.cfg.Format))
Expand Down Expand Up @@ -230,11 +234,21 @@ func getFileNamePrefix(format string) string {
func getParseFileNameFunc(format string) func(string, string) int64 {
switch format {
case cmd.FormatAuditLogPlugin:
return parseFileTime
return parseFileTimeToIdx
}
return parseFileIdx
}

func getFilterFileNameFunc(format string, commandStartTime time.Time) func(string, string) bool {
switch format {
case cmd.FormatAuditLogPlugin:
return func(name, fileNamePrefix string) bool {
return filterFileByTime(name, fileNamePrefix, commandStartTime)
}
}
return func(string, string) bool { return true }
}

// Parse the file name to get the file index.
// filename pattern: traffic-1.log.gz
func parseFileIdx(name, fileNamePrefix string) int64 {
Expand Down Expand Up @@ -262,25 +276,44 @@ func parseFileIdx(name, fileNamePrefix string) int64 {

// Parse the file name to get the file timestamp.
// filename pattern: tidb-audit-2025-09-10T17-01-56.073.log
func parseFileTime(name, fileNamePrefix string) int64 {
func parseFileTime(name, fileNamePrefix string) time.Time {
if !strings.HasPrefix(name, fileNamePrefix) {
return 0
return time.Time{}
}
startIdx := len(fileNamePrefix)
if len(name) <= startIdx+len(fileNameSuffix) {
return 0
return time.Time{}
}
endIdx := len(name)
if strings.HasSuffix(name, fileCompressFormat) {
endIdx -= len(fileCompressFormat)
}
if !strings.HasSuffix(name[:endIdx], fileNameSuffix) {
return 0
return time.Time{}
}
endIdx -= len(fileNameSuffix)
ts, err := time.Parse(logTimeLayout, name[startIdx:endIdx])
if err != nil {
return time.Time{}
}
return ts
}

func parseFileTimeToIdx(name, fileNamePrefix string) int64 {
ts := parseFileTime(name, fileNamePrefix)
if ts.IsZero() {
return 0
}
return ts.UnixNano() / 1000000
}

func filterFileByTime(name, fileNamePrefix string, commandStartTime time.Time) bool {
fileTime := parseFileTime(name, fileNamePrefix)
if fileTime.IsZero() {
return false
}
// Be careful that the log file name doesn't contain timezone info.
// We assume the log file time is the Local time. But anyway we could workaround it by
// adjusting the commandStartTime.
return fileTime.After(commandStartTime)
}
91 changes: 90 additions & 1 deletion pkg/sqlreplay/store/rotate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/pingcap/tiproxy/lib/util/logger"
"github.com/pingcap/tiproxy/pkg/sqlreplay/cmd"
Expand Down Expand Up @@ -140,7 +141,7 @@ func TestParseFileTime(t *testing.T) {
}

for i, test := range tests {
idx := parseFileTime(test.fileName, auditFileNamePrefix)
idx := parseFileTimeToIdx(test.fileName, auditFileNamePrefix)
require.Equal(t, test.fileIdx, idx, "case %d", i)
}
}
Expand Down Expand Up @@ -367,3 +368,91 @@ func TestCompressAndEncrypt(t *testing.T) {
require.Equal(t, []byte("testtest"), data[:8])
require.NoError(t, reader.Close())
}

func TestFilterFileNameByStartTime(t *testing.T) {
commandStartTime, err := time.Parse(logTimeLayout, "2025-09-10T17-01-56.050")
require.NoError(t, err)
tests := []struct {
fileName string
expectToInclude bool
}{
// Files after start time should be included
{
fileName: "tidb-audit-2025-09-10T17-01-56.073.log",
expectToInclude: true,
},
{
fileName: "tidb-audit-2025-09-10T17-01-56.172.log.gz",
expectToInclude: true,
},
{
fileName: "tidb-audit-2025-09-11T10-30-00.500.log",
expectToInclude: true,
},
// Files before or equal to start time should be excluded
{
fileName: "tidb-audit-2025-09-10T17-01-55.073.log",
expectToInclude: false,
},
{
fileName: "tidb-audit-2025-09-10T17-01-56.000.log",
expectToInclude: false,
},
// Invalid file names should be excluded
{
fileName: "tidb-audit-invalid-timestamp.log",
expectToInclude: false,
},
{
fileName: "traffic-1.log",
expectToInclude: false,
},
{
fileName: "tidb-audit.log",
expectToInclude: false,
},
{
fileName: "tidb-audit-2025-13-40T25-70-70.log",
expectToInclude: false,
},
}
expectedFileOrder := []string{
"tidb-audit-2025-09-10T17-01-56.073.log",
"tidb-audit-2025-09-10T17-01-56.172.log.gz",
"tidb-audit-2025-09-11T10-30-00.500.log",
}
for i, test := range tests {
included := filterFileByTime(test.fileName, auditFileNamePrefix, commandStartTime)
require.Equal(t, test.expectToInclude, included, "case %d", i)
}

dir := t.TempDir()
require.NoError(t, os.RemoveAll(dir))
require.NoError(t, os.MkdirAll(dir, 0777))
for _, test := range tests {
f, err := os.Create(filepath.Join(dir, test.fileName))
require.NoError(t, err)
if strings.HasSuffix(test.fileName, ".gz") {
w := gzip.NewWriter(f)
_, err := w.Write([]byte{})
require.NoError(t, err)
require.NoError(t, w.Close())
}
require.NoError(t, f.Close())
}
storage, err := NewStorage(dir)
require.NoError(t, err)
defer storage.Close()
lg, _ := logger.CreateLoggerForTest(t)
l, err := newRotateReader(lg, storage, ReaderCfg{Dir: dir, Format: cmd.FormatAuditLogPlugin, CommandStartTime: commandStartTime})
require.NoError(t, err)
var fileOrder []string
for {
if err := l.nextReader(); err != nil {
require.True(t, errors.Is(err, io.EOF))
break
}
fileOrder = append(fileOrder, l.curFileName)
}
require.Equal(t, expectedFileOrder, fileOrder)
}