Skip to content

Commit 63ed1fd

Browse files
authored
Add support for Redis Streams Idempotent Production (#3693)
1 parent 3e119d7 commit 63ed1fd

File tree

2 files changed

+214
-10
lines changed

2 files changed

+214
-10
lines changed

commands_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7010,6 +7010,162 @@ var _ = Describe("Commands", func() {
70107010
Expect(vals).To(HaveLen(0))
70117011
})
70127012

7013+
It("should XAdd with IDMP (idempotent production)", func() {
7014+
SkipBeforeRedisVersion(8.6, "IDMP requires Redis 8.6+")
7015+
streamName := "idmp-stream"
7016+
defer client.Del(ctx, streamName)
7017+
7018+
// First add with IDMP
7019+
id1, err := client.XAdd(ctx, &redis.XAddArgs{
7020+
Stream: streamName,
7021+
ProducerID: "producer1",
7022+
IdempotentID: "msg1",
7023+
Values: map[string]interface{}{"field": "value1"},
7024+
}).Result()
7025+
Expect(err).NotTo(HaveOccurred())
7026+
Expect(id1).NotTo(BeEmpty())
7027+
7028+
// Add the same message again - should return the same ID (duplicate detection)
7029+
id2, err := client.XAdd(ctx, &redis.XAddArgs{
7030+
Stream: streamName,
7031+
ProducerID: "producer1",
7032+
IdempotentID: "msg1",
7033+
Values: map[string]interface{}{"field": "value2"},
7034+
}).Result()
7035+
Expect(err).NotTo(HaveOccurred())
7036+
Expect(id2).To(Equal(id1)) // Should return the same entry ID
7037+
7038+
// Verify only one message was added
7039+
vals, err := client.XRange(ctx, streamName, "-", "+").Result()
7040+
Expect(err).NotTo(HaveOccurred())
7041+
Expect(vals).To(HaveLen(1))
7042+
Expect(vals[0].ID).To(Equal(id1))
7043+
7044+
// Add a different message with different idempotent ID
7045+
id3, err := client.XAdd(ctx, &redis.XAddArgs{
7046+
Stream: streamName,
7047+
ProducerID: "producer1",
7048+
IdempotentID: "msg2",
7049+
Values: map[string]interface{}{"field": "value3"},
7050+
}).Result()
7051+
Expect(err).NotTo(HaveOccurred())
7052+
Expect(id3).NotTo(Equal(id1)) // Should be a different entry ID
7053+
7054+
// Verify two messages now exist
7055+
vals, err = client.XRange(ctx, streamName, "-", "+").Result()
7056+
Expect(err).NotTo(HaveOccurred())
7057+
Expect(vals).To(HaveLen(2))
7058+
7059+
// Verify XINFO STREAM shows idempotent stats
7060+
info, err := client.XInfoStream(ctx, streamName).Result()
7061+
Expect(err).NotTo(HaveOccurred())
7062+
Expect(info.PIDsTracked).To(Equal(int64(1)))
7063+
Expect(info.IIDsTracked).To(BeNumerically(">", 0))
7064+
Expect(info.IIDsAdded).To(Equal(int64(2)))
7065+
Expect(info.IIDsDuplicates).To(Equal(int64(1)))
7066+
})
7067+
7068+
It("should XAdd with IDMPAUTO (auto-generated idempotent ID)", func() {
7069+
SkipBeforeRedisVersion(8.6, "IDMPAUTO requires Redis 8.6+")
7070+
streamName := "idmpauto-stream"
7071+
defer client.Del(ctx, streamName)
7072+
7073+
id1, err := client.XAdd(ctx, &redis.XAddArgs{
7074+
Stream: streamName,
7075+
ProducerID: "producer1",
7076+
IdempotentAuto: true,
7077+
Values: map[string]interface{}{"field": "value1", "order": "123"},
7078+
}).Result()
7079+
Expect(err).NotTo(HaveOccurred())
7080+
Expect(id1).NotTo(BeEmpty())
7081+
7082+
// Add the same message again - should return the same ID (duplicate detection)
7083+
// Redis will calculate the same idempotent ID based on content
7084+
id2, err := client.XAdd(ctx, &redis.XAddArgs{
7085+
Stream: streamName,
7086+
ProducerID: "producer1",
7087+
IdempotentAuto: true,
7088+
Values: map[string]interface{}{"field": "value1", "order": "123"},
7089+
}).Result()
7090+
Expect(err).NotTo(HaveOccurred())
7091+
Expect(id2).To(Equal(id1)) // Should return the same entry ID
7092+
7093+
// Verify only one message was added
7094+
vals, err := client.XRange(ctx, streamName, "-", "+").Result()
7095+
Expect(err).NotTo(HaveOccurred())
7096+
Expect(vals).To(HaveLen(1))
7097+
7098+
// Add a different message - should create a new entry
7099+
id3, err := client.XAdd(ctx, &redis.XAddArgs{
7100+
Stream: streamName,
7101+
ProducerID: "producer1",
7102+
IdempotentAuto: true,
7103+
Values: map[string]interface{}{"field": "value2", "order": "456"},
7104+
}).Result()
7105+
Expect(err).NotTo(HaveOccurred())
7106+
Expect(id3).NotTo(Equal(id1))
7107+
7108+
// Verify two messages now exist
7109+
vals, err = client.XRange(ctx, streamName, "-", "+").Result()
7110+
Expect(err).NotTo(HaveOccurred())
7111+
Expect(vals).To(HaveLen(2))
7112+
7113+
// Verify XINFO STREAM shows idempotent stats
7114+
info, err := client.XInfoStream(ctx, streamName).Result()
7115+
Expect(err).NotTo(HaveOccurred())
7116+
Expect(info.IIDsAdded).To(Equal(int64(2)))
7117+
Expect(info.IIDsDuplicates).To(Equal(int64(1)))
7118+
})
7119+
7120+
It("should XCfgSet configure idempotent production settings", func() {
7121+
SkipBeforeRedisVersion(8.6, "XCFGSET requires Redis 8.6+")
7122+
streamName := "xcfgset-stream"
7123+
defer client.Del(ctx, streamName)
7124+
7125+
_, err := client.XAdd(ctx, &redis.XAddArgs{
7126+
Stream: streamName,
7127+
Values: map[string]interface{}{"field": "value"},
7128+
}).Result()
7129+
Expect(err).NotTo(HaveOccurred())
7130+
7131+
// Configure IDMP settings
7132+
status, err := client.XCfgSet(ctx, &redis.XCfgSetArgs{
7133+
Stream: streamName,
7134+
Duration: 200, // 200 seconds
7135+
MaxSize: 500, // 500 idempotent IDs
7136+
}).Result()
7137+
Expect(err).NotTo(HaveOccurred())
7138+
Expect(status).To(Equal("OK"))
7139+
7140+
// Verify the settings were applied
7141+
info, err := client.XInfoStream(ctx, streamName).Result()
7142+
Expect(err).NotTo(HaveOccurred())
7143+
Expect(info.IDMPDuration).To(Equal(int64(200)))
7144+
Expect(info.IDMPMaxSize).To(Equal(int64(500)))
7145+
7146+
status, err = client.XCfgSet(ctx, &redis.XCfgSetArgs{
7147+
Stream: streamName,
7148+
Duration: 300,
7149+
}).Result()
7150+
Expect(err).NotTo(HaveOccurred())
7151+
Expect(status).To(Equal("OK"))
7152+
7153+
info, err = client.XInfoStream(ctx, streamName).Result()
7154+
Expect(err).NotTo(HaveOccurred())
7155+
Expect(info.IDMPDuration).To(Equal(int64(300)))
7156+
7157+
status, err = client.XCfgSet(ctx, &redis.XCfgSetArgs{
7158+
Stream: streamName,
7159+
MaxSize: 1000,
7160+
}).Result()
7161+
Expect(err).NotTo(HaveOccurred())
7162+
Expect(status).To(Equal("OK"))
7163+
7164+
info, err = client.XInfoStream(ctx, streamName).Result()
7165+
Expect(err).NotTo(HaveOccurred())
7166+
Expect(info.IDMPMaxSize).To(Equal(int64(1000)))
7167+
})
7168+
70137169
It("should XDel", func() {
70147170
n, err := client.XDel(ctx, "stream", "1-0", "2-0", "3-0").Result()
70157171
Expect(err).NotTo(HaveOccurred())

stream_commands.go

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type StreamCmdable interface {
4343
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
4444
XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
4545
XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd
46+
XCfgSet(ctx context.Context, a *XCfgSetArgs) *StatusCmd
4647
}
4748

4849
// XAddArgs accepts values in the following formats:
@@ -52,25 +53,51 @@ type StreamCmdable interface {
5253
//
5354
// Note that map will not preserve the order of key-value pairs.
5455
// MaxLen/MaxLenApprox and MinID are in conflict, only one of them can be used.
56+
//
57+
// For idempotent production (at-most-once production):
58+
// - ProducerID: A unique identifier for the producer (required for both IDMP and IDMPAUTO)
59+
// - IdempotentID: A unique identifier for the message (used with IDMP)
60+
// - IdempotentAuto: If true, Redis will auto-generate an idempotent ID based on message content (IDMPAUTO)
61+
//
62+
// ProducerID and IdempotentID are mutually exclusive with IdempotentAuto.
63+
// When using idempotent production, ID must be "*" or empty.
5564
type XAddArgs struct {
5665
Stream string
5766
NoMkStream bool
5867
MaxLen int64 // MAXLEN N
5968
MinID string
6069
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
61-
Approx bool
62-
Limit int64
63-
Mode string
64-
ID string
65-
Values interface{}
70+
Approx bool
71+
Limit int64
72+
Mode string
73+
ID string
74+
Values interface{}
75+
ProducerID string // Producer ID for idempotent production (IDMP or IDMPAUTO)
76+
IdempotentID string // Idempotent ID for IDMP
77+
IdempotentAuto bool // Use IDMPAUTO to auto-generate idempotent ID based on content
6678
}
6779

6880
func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
69-
args := make([]interface{}, 0, 11)
81+
args := make([]interface{}, 0, 15)
7082
args = append(args, "xadd", a.Stream)
7183
if a.NoMkStream {
7284
args = append(args, "nomkstream")
7385
}
86+
87+
if a.Mode != "" {
88+
args = append(args, a.Mode)
89+
}
90+
91+
if a.ProducerID != "" {
92+
if a.IdempotentAuto {
93+
// IDMPAUTO pid
94+
args = append(args, "idmpauto", a.ProducerID)
95+
} else if a.IdempotentID != "" {
96+
// IDMP pid iid
97+
args = append(args, "idmp", a.ProducerID, a.IdempotentID)
98+
}
99+
}
100+
74101
switch {
75102
case a.MaxLen > 0:
76103
if a.Approx {
@@ -89,10 +116,6 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
89116
args = append(args, "limit", a.Limit)
90117
}
91118

92-
if a.Mode != "" {
93-
args = append(args, a.Mode)
94-
}
95-
96119
if a.ID != "" {
97120
args = append(args, a.ID)
98121
} else {
@@ -527,3 +550,28 @@ func (c cmdable) XInfoStreamFull(ctx context.Context, key string, count int) *XI
527550
_ = c(ctx, cmd)
528551
return cmd
529552
}
553+
554+
// XCfgSetArgs represents the arguments for the XCFGSET command.
555+
// Duration is the duration, in seconds, that Redis keeps each idempotent ID.
556+
// MaxSize is the maximum number of most recent idempotent IDs that Redis keeps for each producer ID.
557+
type XCfgSetArgs struct {
558+
Stream string
559+
Duration int64
560+
MaxSize int64
561+
}
562+
563+
// XCfgSet sets the idempotent production configuration for a stream.
564+
// XCFGSET key [IDMP-DURATION duration] [IDMP-MAXSIZE maxsize]
565+
func (c cmdable) XCfgSet(ctx context.Context, a *XCfgSetArgs) *StatusCmd {
566+
args := make([]interface{}, 0, 6)
567+
args = append(args, "xcfgset", a.Stream)
568+
if a.Duration > 0 {
569+
args = append(args, "idmp-duration", a.Duration)
570+
}
571+
if a.MaxSize > 0 {
572+
args = append(args, "idmp-maxsize", a.MaxSize)
573+
}
574+
cmd := NewStatusCmd(ctx, args...)
575+
_ = c(ctx, cmd)
576+
return cmd
577+
}

0 commit comments

Comments
 (0)