Skip to content

Commit 6540abd

Browse files
authored
sessiontxn/staleread: don't overwrite StaleTSOProvider for single statement (#65110)
close #65090
1 parent b260730 commit 6540abd

File tree

5 files changed

+43
-6
lines changed

5 files changed

+43
-6
lines changed

pkg/expression/sessionexpr/sessionctx_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func TestSessionEvalContextCurrentTime(t *testing.T) {
122122
impl := sessionexpr.NewEvalContext(ctx)
123123

124124
var now atomic.Pointer[time.Time]
125-
sc.SetStaleTSOProvider(func() (uint64, error) {
125+
sc.SetStaleTSOProviderIfNotExist(func() (uint64, error) {
126126
v := time.UnixMilli(123456789)
127127
// should only be called once
128128
require.True(t, now.CompareAndSwap(nil, &v))
@@ -142,7 +142,7 @@ func TestSessionEvalContextCurrentTime(t *testing.T) {
142142
require.Equal(t, v.UnixNano(), tm.UnixNano())
143143

144144
// now should return the system variable if "timestamp" is set
145-
sc.SetStaleTSOProvider(nil)
145+
sc.SetStaleTSOProviderIfNotExist(nil)
146146
sc.Reset()
147147
require.NoError(t, vars.SetSystemVar("timestamp", "7654321.875"))
148148
tm, err = impl.CurrentTime()

pkg/sessionctx/stmtctx/stmtctx.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,10 +1191,13 @@ func (sc *StatementContext) DetachMemDiskTracker() {
11911191
}
11921192
}
11931193

1194-
// SetStaleTSOProvider sets the stale TSO provider.
1195-
func (sc *StatementContext) SetStaleTSOProvider(eval func() (uint64, error)) {
1194+
// SetStaleTSOProviderIfNotExist sets the stale TSO provider.
1195+
func (sc *StatementContext) SetStaleTSOProviderIfNotExist(eval func() (uint64, error)) {
11961196
sc.StaleTSOProvider.Lock()
11971197
defer sc.StaleTSOProvider.Unlock()
1198+
if sc.StaleTSOProvider.eval != nil {
1199+
return
1200+
}
11981201
sc.StaleTSOProvider.value = nil
11991202
sc.StaleTSOProvider.eval = eval
12001203
}

pkg/sessiontxn/staleread/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ go_test(
4646
"provider_test.go",
4747
],
4848
flaky = True,
49-
shard_count = 6,
49+
shard_count = 7,
5050
deps = [
5151
":staleread",
5252
"//pkg/domain",
@@ -56,6 +56,7 @@ go_test(
5656
"//pkg/parser/ast",
5757
"//pkg/parser/auth",
5858
"//pkg/sessionctx",
59+
"//pkg/sessionctx/stmtctx",
5960
"//pkg/sessiontxn",
6061
"//pkg/store/mockstore",
6162
"//pkg/table/temptable",

pkg/sessiontxn/staleread/processor_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/pingcap/tidb/pkg/parser"
2828
"github.com/pingcap/tidb/pkg/parser/ast"
2929
"github.com/pingcap/tidb/pkg/sessionctx"
30+
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
3031
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
3132
"github.com/pingcap/tidb/pkg/store/mockstore"
3233
"github.com/pingcap/tidb/pkg/table/temptable"
@@ -428,3 +429,35 @@ func createProcessor(t *testing.T, se sessionctx.Context) staleread.Processor {
428429
require.Nil(t, processor.GetStalenessInfoSchema())
429430
return processor
430431
}
432+
433+
func TestConsistentCalculateAsOfTsExpr(t *testing.T) {
434+
store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.EmbedUnistore))
435+
tk := testkit.NewTestKit(t, store)
436+
tk.MustExec("set time_zone = '+00:00'")
437+
tk.Session().GetSessionVars().TimeZone = time.UTC
438+
439+
p := parser.New()
440+
stmt, err := p.ParseOneStmt(`select now(3) - interval 1 second`, "", "")
441+
require.NoError(t, err)
442+
secondTsExpr := stmt.(*ast.SelectStmt).Fields.Fields[0].Expr
443+
stmt, err = p.ParseOneStmt(`select now(3) - interval 3 second`, "", "")
444+
require.NoError(t, err)
445+
threeSecondTsExpr := stmt.(*ast.SelectStmt).Fields.Fields[0].Expr
446+
447+
se := tk.Session()
448+
se.GetSessionVars().StmtCtx = stmtctx.NewStmtCtxWithTimeZone(time.UTC)
449+
450+
ts1, err := staleread.CalculateAsOfTsExpr(context.Background(), se.GetPlanCtx(), secondTsExpr)
451+
require.NoError(t, err)
452+
453+
time.Sleep(10 * time.Millisecond)
454+
455+
ts2, err := staleread.CalculateAsOfTsExpr(context.Background(), se.GetPlanCtx(), secondTsExpr)
456+
require.NoError(t, err)
457+
require.Equal(t, ts1, ts2)
458+
459+
ts3, err := staleread.CalculateAsOfTsExpr(context.Background(), se.GetPlanCtx(), threeSecondTsExpr)
460+
require.NoError(t, err)
461+
require.True(t, ts3 < ts1)
462+
require.Equal(t, ts1-ts3, uint64(2000<<18))
463+
}

pkg/sessiontxn/staleread/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434

3535
// CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
3636
func CalculateAsOfTsExpr(ctx context.Context, sctx planctx.PlanContext, tsExpr ast.ExprNode) (uint64, error) {
37-
sctx.GetSessionVars().StmtCtx.SetStaleTSOProvider(func() (uint64, error) {
37+
sctx.GetSessionVars().StmtCtx.SetStaleTSOProviderIfNotExist(func() (uint64, error) {
3838
failpoint.Inject("mockStaleReadTSO", func(val failpoint.Value) (uint64, error) {
3939
return uint64(val.(int)), nil
4040
})

0 commit comments

Comments
 (0)