Skip to content

Commit 7981f9b

Browse files
committed
fixes #247: implement msg id function as pubsub option
1 parent 7269e41 commit 7981f9b

File tree

6 files changed

+43
-15
lines changed

6 files changed

+43
-15
lines changed

gossipsub.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ func (gs *GossipSubRouter) Protocols() []protocol.ID {
7575
func (gs *GossipSubRouter) Attach(p *PubSub) {
7676
gs.p = p
7777
gs.tracer = p.tracer
78+
// start using the same msg ID function as PubSub for caching messages.
79+
gs.mcache.ChangeMsgIdFn(p.msgID)
7880
go gs.heartbeatTimer()
7981
}
8082

mcache.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,19 @@ func NewMessageCache(gossip, history int) *MessageCache {
2828
msgs: make(map[string]*pb.Message),
2929
history: make([][]CacheEntry, history),
3030
gossip: gossip,
31+
msgID: DefaultMsgIdFn,
3132
}
3233
}
3334

3435
type MessageCache struct {
3536
msgs map[string]*pb.Message
3637
history [][]CacheEntry
3738
gossip int
39+
msgID MsgIdFunction
40+
}
41+
42+
func (mc *MessageCache) ChangeMsgIdFn(msgID MsgIdFunction) {
43+
mc.msgID = msgID
3844
}
3945

4046
type CacheEntry struct {
@@ -43,7 +49,7 @@ type CacheEntry struct {
4349
}
4450

4551
func (mc *MessageCache) Put(msg *pb.Message) {
46-
mid := msgID(msg)
52+
mid := mc.msgID(msg)
4753
mc.msgs[mid] = msg
4854
mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topics: msg.GetTopicIDs()})
4955
}

mcache_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
func TestMessageCache(t *testing.T) {
1212
mcache := NewMessageCache(3, 5)
13+
msgID := DefaultMsgIdFn
1314

1415
msgs := make([]*pb.Message, 60)
1516
for i := range msgs {

pubsub.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ type PubSub struct {
117117
seenMessagesMx sync.Mutex
118118
seenMessages *timecache.TimeCache
119119

120+
// function used to compute the ID for a message
121+
msgID MsgIdFunction
122+
120123
// key for signing messages; nil when signing is disabled (default for now)
121124
signKey crypto.PrivKey
122125
// source ID for signed messages; corresponds to signKey
@@ -208,6 +211,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
208211
blacklist: NewMapBlacklist(),
209212
blacklistPeer: make(chan peer.ID),
210213
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
214+
msgID: DefaultMsgIdFn,
211215
counter: uint64(time.Now().UnixNano()),
212216
}
213217

@@ -240,6 +244,20 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
240244
return ps, nil
241245
}
242246

247+
// MsgIdFunction returns a unique ID for the passed Message, and PubSub can be customized to use any
248+
// implementation of this function by configuring it with the Option from WithMessageIdFn.
249+
type MsgIdFunction func(pmsg *pb.Message) string
250+
251+
// WithMessageIdFn is an option to customize the way a message ID is computed for a pubsub message.
252+
// The default ID function is DefaultMsgIdFn (concatenate source and seq nr.),
253+
// but it can be customized to e.g. the hash of the message.
254+
func WithMessageIdFn(fn MsgIdFunction) Option {
255+
return func(p *PubSub) error {
256+
p.msgID = fn
257+
return nil
258+
}
259+
}
260+
243261
// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer
244262
// We start dropping messages to a peer if the outbound queue if full
245263
func WithPeerOutboundQueueSize(size int) Option {
@@ -326,7 +344,7 @@ func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
326344
// WithEventTracer provides a tracer for the pubsub system
327345
func WithEventTracer(tracer EventTracer) Option {
328346
return func(p *PubSub) error {
329-
p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID()}
347+
p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID(), msgID: p.msgID}
330348
return nil
331349
}
332350
}
@@ -730,8 +748,8 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
730748
p.rt.HandleRPC(rpc)
731749
}
732750

733-
// msgID returns a unique ID of the passed Message
734-
func msgID(pmsg *pb.Message) string {
751+
// DefaultMsgIdFn returns a unique ID of the passed Message
752+
func DefaultMsgIdFn(pmsg *pb.Message) string {
735753
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno())
736754
}
737755

@@ -760,7 +778,7 @@ func (p *PubSub) pushMsg(msg *Message) {
760778
}
761779

762780
// have we already seen and validated this message?
763-
id := msgID(msg.Message)
781+
id := p.msgID(msg.Message)
764782
if p.seenMessage(id) {
765783
p.tracer.DuplicateMessage(msg)
766784
return

trace.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type EventTracer interface {
1818
type pubsubTracer struct {
1919
tracer EventTracer
2020
pid peer.ID
21+
msgID MsgIdFunction
2122
}
2223

2324
func (t *pubsubTracer) PublishMessage(msg *Message) {
@@ -31,7 +32,7 @@ func (t *pubsubTracer) PublishMessage(msg *Message) {
3132
PeerID: []byte(t.pid),
3233
Timestamp: &now,
3334
PublishMessage: &pb.TraceEvent_PublishMessage{
34-
MessageID: []byte(msgID(msg.Message)),
35+
MessageID: []byte(t.msgID(msg.Message)),
3536
Topics: msg.Message.TopicIDs,
3637
},
3738
}
@@ -50,7 +51,7 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
5051
PeerID: []byte(t.pid),
5152
Timestamp: &now,
5253
RejectMessage: &pb.TraceEvent_RejectMessage{
53-
MessageID: []byte(msgID(msg.Message)),
54+
MessageID: []byte(t.msgID(msg.Message)),
5455
ReceivedFrom: []byte(msg.ReceivedFrom),
5556
Reason: &reason,
5657
},
@@ -70,7 +71,7 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) {
7071
PeerID: []byte(t.pid),
7172
Timestamp: &now,
7273
DuplicateMessage: &pb.TraceEvent_DuplicateMessage{
73-
MessageID: []byte(msgID(msg.Message)),
74+
MessageID: []byte(t.msgID(msg.Message)),
7475
ReceivedFrom: []byte(msg.ReceivedFrom),
7576
},
7677
}
@@ -89,7 +90,7 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) {
8990
PeerID: []byte(t.pid),
9091
Timestamp: &now,
9192
DeliverMessage: &pb.TraceEvent_DeliverMessage{
92-
MessageID: []byte(msgID(msg.Message)),
93+
MessageID: []byte(t.msgID(msg.Message)),
9394
},
9495
}
9596

@@ -146,7 +147,7 @@ func (t *pubsubTracer) RecvRPC(rpc *RPC) {
146147
Timestamp: &now,
147148
RecvRPC: &pb.TraceEvent_RecvRPC{
148149
ReceivedFrom: []byte(rpc.from),
149-
Meta: traceRPCMeta(rpc),
150+
Meta: t.traceRPCMeta(rpc),
150151
},
151152
}
152153

@@ -165,7 +166,7 @@ func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) {
165166
Timestamp: &now,
166167
SendRPC: &pb.TraceEvent_SendRPC{
167168
SendTo: []byte(rpc.from),
168-
Meta: traceRPCMeta(rpc),
169+
Meta: t.traceRPCMeta(rpc),
169170
},
170171
}
171172

@@ -184,20 +185,20 @@ func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
184185
Timestamp: &now,
185186
DropRPC: &pb.TraceEvent_DropRPC{
186187
SendTo: []byte(rpc.from),
187-
Meta: traceRPCMeta(rpc),
188+
Meta: t.traceRPCMeta(rpc),
188189
},
189190
}
190191

191192
t.tracer.Trace(evt)
192193
}
193194

194-
func traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta {
195+
func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta {
195196
rpcMeta := new(pb.TraceEvent_RPCMeta)
196197

197198
var msgs []*pb.TraceEvent_MessageMeta
198199
for _, m := range rpc.Publish {
199200
msgs = append(msgs, &pb.TraceEvent_MessageMeta{
200-
MessageID: []byte(msgID(m)),
201+
MessageID: []byte(t.msgID(m)),
201202
Topics: m.TopicIDs,
202203
})
203204
}

validation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
201201

202202
// we can mark the message as seen now that we have verified the signature
203203
// and avoid invoking user validators more than once
204-
id := msgID(msg.Message)
204+
id := v.p.msgID(msg.Message)
205205
if !v.p.markSeen(id) {
206206
v.tracer.DuplicateMessage(msg)
207207
return

0 commit comments

Comments
 (0)