Skip to content

Commit ae314ec

Browse files
joostjagerchampo
andcommitted
htlcswitch: add an always on mode to interceptable switch
Co-authored-by: Juan Pablo Civile <elementohb@gmail.com>
1 parent 169f0c0 commit ae314ec

File tree

10 files changed

+201
-54
lines changed

10 files changed

+201
-54
lines changed

config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,10 @@ type Config struct {
354354

355355
RejectHTLC bool `long:"rejecthtlc" description:"If true, lnd will not forward any HTLCs that are meant as onward payments. This option will still allow lnd to send HTLCs and receive HTLCs but lnd won't be used as a hop."`
356356

357+
// RequireInterceptor determines whether the HTLC interceptor is
358+
// registered regardless of whether the RPC is called or not.
359+
RequireInterceptor bool `long:"requireinterceptor" description:"Whether to always intercept HTLCs, even if no stream is attached"`
360+
357361
StaggerInitialReconnect bool `long:"stagger-initial-reconnect" description:"If true, will apply a randomized staggering between 0s and 30s when reconnecting to persistent peers on startup. The first 10 reconnections will be attempted instantly, regardless of the flag's value"`
358362

359363
MaxOutgoingCltvExpiry uint32 `long:"max-cltv-expiry" description:"The maximum number of blocks funds could be locked up for when forwarding payments."`

docs/release-notes/release-notes-0.15.0.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@
101101
change, it allows encrypted failure messages to be returned to the sender.
102102
Additionally it is possible to signal a malformed htlc.
103103

104+
* Add an [always on](https://github.com/lightningnetwork/lnd/pull/6232) mode to
105+
the HTLC interceptor API. This enables interception applications where every
106+
packet must be intercepted.
107+
104108
## Database
105109

106110
* [Add ForAll implementation for etcd to speed up

htlcswitch/interceptable_switch.go

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ type InterceptableSwitch struct {
4545
// client connect and disconnect.
4646
interceptorRegistration chan ForwardInterceptor
4747

48+
// requireInterceptor indicates whether processing should block if no
49+
// interceptor is connected.
50+
requireInterceptor bool
51+
4852
// interceptor is the handler for intercepted packets.
4953
interceptor ForwardInterceptor
5054

@@ -58,6 +62,7 @@ type InterceptableSwitch struct {
5862
type interceptedPackets struct {
5963
packets []*htlcPacket
6064
linkQuit chan struct{}
65+
isReplay bool
6166
}
6267

6368
// FwdAction defines the various resolution types.
@@ -101,13 +106,16 @@ type fwdResolution struct {
101106
}
102107

103108
// NewInterceptableSwitch returns an instance of InterceptableSwitch.
104-
func NewInterceptableSwitch(s *Switch) *InterceptableSwitch {
109+
func NewInterceptableSwitch(s *Switch,
110+
requireInterceptor bool) *InterceptableSwitch {
111+
105112
return &InterceptableSwitch{
106113
htlcSwitch: s,
107114
intercepted: make(chan *interceptedPackets),
108115
interceptorRegistration: make(chan ForwardInterceptor),
109116
holdForwards: make(map[channeldb.CircuitKey]InterceptedForward),
110117
resolutionChan: make(chan *fwdResolution),
118+
requireInterceptor: requireInterceptor,
111119

112120
quit: make(chan struct{}),
113121
}
@@ -155,9 +163,7 @@ func (s *InterceptableSwitch) run() {
155163
case packets := <-s.intercepted:
156164
var notIntercepted []*htlcPacket
157165
for _, p := range packets.packets {
158-
if s.interceptor == nil ||
159-
!s.interceptForward(p) {
160-
166+
if !s.interceptForward(p, packets.isReplay) {
161167
notIntercepted = append(
162168
notIntercepted, p,
163169
)
@@ -178,7 +184,6 @@ func (s *InterceptableSwitch) run() {
178184
}
179185
}
180186
}
181-
182187
func (s *InterceptableSwitch) sendForward(fwd InterceptedForward) {
183188
err := s.interceptor(fwd.Packet())
184189
if err != nil {
@@ -191,12 +196,28 @@ func (s *InterceptableSwitch) sendForward(fwd InterceptedForward) {
191196
func (s *InterceptableSwitch) setInterceptor(interceptor ForwardInterceptor) {
192197
s.interceptor = interceptor
193198

199+
// Replay all currently held htlcs. When an interceptor is not required,
200+
// there may be none because they've been cleared after the previous
201+
// disconnect.
194202
if interceptor != nil {
195203
log.Debugf("Interceptor connected")
196204

205+
for _, fwd := range s.holdForwards {
206+
s.sendForward(fwd)
207+
}
208+
197209
return
198210
}
199211

212+
// The interceptor disconnects. If an interceptor is required, keep the
213+
// held htlcs.
214+
if s.requireInterceptor {
215+
log.Infof("Interceptor disconnected, retaining held packets")
216+
217+
return
218+
}
219+
220+
// Interceptor is not required. Release held forwards.
200221
log.Infof("Interceptor disconnected, resolving held packets")
201222

202223
for _, fwd := range s.holdForwards {
@@ -260,7 +281,7 @@ func (s *InterceptableSwitch) Resolve(res *FwdResolution) error {
260281
// interceptor. If the interceptor signals the resume action, the htlcs are
261282
// forwarded to the switch. The link's quit signal should be provided to allow
262283
// cancellation of forwarding during link shutdown.
263-
func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{},
284+
func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{}, isReplay bool,
264285
packets ...*htlcPacket) error {
265286

266287
// Synchronize with the main event loop. This should be light in the
@@ -269,6 +290,7 @@ func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{},
269290
case s.intercepted <- &interceptedPackets{
270291
packets: packets,
271292
linkQuit: linkQuit,
293+
isReplay: isReplay,
272294
}:
273295

274296
case <-linkQuit:
@@ -283,7 +305,15 @@ func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{},
283305

284306
// interceptForward forwards the packet to the external interceptor after
285307
// checking the interception criteria.
286-
func (s *InterceptableSwitch) interceptForward(packet *htlcPacket) bool {
308+
func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,
309+
isReplay bool) bool {
310+
311+
// Process normally if an interceptor is not required and not
312+
// registered.
313+
if !s.requireInterceptor && s.interceptor == nil {
314+
return false
315+
}
316+
287317
switch htlc := packet.htlc.(type) {
288318
case *lnwire.UpdateAddHTLC:
289319
// We are not interested in intercepting initiated payments.
@@ -307,9 +337,31 @@ func (s *InterceptableSwitch) interceptForward(packet *htlcPacket) bool {
307337
htlcSwitch: s.htlcSwitch,
308338
}
309339

340+
if s.interceptor == nil && !isReplay {
341+
// There is no interceptor registered, we are in
342+
// interceptor-required mode, and this is a new packet
343+
//
344+
// Because the interceptor has never seen this packet
345+
// yet, it is still safe to fail back. This limits the
346+
// backlog of htlcs when the interceptor is down.
347+
err := intercepted.FailWithCode(
348+
lnwire.CodeTemporaryChannelFailure,
349+
)
350+
if err != nil {
351+
log.Errorf("Cannot fail packet: %v", err)
352+
}
353+
354+
return true
355+
}
356+
310357
s.holdForwards[inKey] = intercepted
311358

312-
s.sendForward(intercepted)
359+
// If there is no interceptor registered, we must be in
360+
// interceptor-required mode. The packet is kept in the queue
361+
// until the interceptor registers itself.
362+
if s.interceptor != nil {
363+
s.sendForward(intercepted)
364+
}
313365

314366
return true
315367

htlcswitch/link.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ type ChannelLinkConfig struct {
141141
// switch. The function returns and error in case it fails to send one or
142142
// more packets. The link's quit signal should be provided to allow
143143
// cancellation of forwarding during link shutdown.
144-
ForwardPackets func(chan struct{}, ...*htlcPacket) error
144+
ForwardPackets func(chan struct{}, bool, ...*htlcPacket) error
145145

146146
// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
147147
// blobs, which are then used to inform how to forward an HTLC.
@@ -1720,7 +1720,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
17201720
l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
17211721

17221722
// Pipeline this settle, send it to the switch.
1723-
go l.forwardBatch(settlePacket)
1723+
go l.forwardBatch(false, settlePacket)
17241724

17251725
case *lnwire.UpdateFailMalformedHTLC:
17261726
// Convert the failure type encoded within the HTLC fail
@@ -2744,7 +2744,7 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
27442744

27452745
// Only spawn the task forward packets we have a non-zero number.
27462746
if len(switchPackets) > 0 {
2747-
go l.forwardBatch(switchPackets...)
2747+
go l.forwardBatch(false, switchPackets...)
27482748
}
27492749
}
27502750

@@ -3043,14 +3043,17 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
30433043
return
30443044
}
30453045

3046-
l.log.Debugf("forwarding %d packets to switch", len(switchPackets))
3046+
replay := fwdPkg.State != channeldb.FwdStateLockedIn
3047+
3048+
l.log.Debugf("forwarding %d packets to switch: replay=%v",
3049+
len(switchPackets), replay)
30473050

30483051
// NOTE: This call is made synchronous so that we ensure all circuits
30493052
// are committed in the exact order that they are processed in the link.
30503053
// Failing to do this could cause reorderings/gaps in the range of
30513054
// opened circuits, which violates assumptions made by the circuit
30523055
// trimming.
3053-
l.forwardBatch(switchPackets...)
3056+
l.forwardBatch(replay, switchPackets...)
30543057
}
30553058

30563059
// processExitHop handles an htlc for which this link is the exit hop. It
@@ -3184,7 +3187,7 @@ func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
31843187
// forwardBatch forwards the given htlcPackets to the switch, and waits on the
31853188
// err chan for the individual responses. This method is intended to be spawned
31863189
// as a goroutine so the responses can be handled in the background.
3187-
func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
3190+
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
31883191
// Don't forward packets for which we already have a response in our
31893192
// mailbox. This could happen if a packet fails and is buffered in the
31903193
// mailbox, and the incoming link flaps.
@@ -3197,7 +3200,8 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
31973200
filteredPkts = append(filteredPkts, pkt)
31983201
}
31993202

3200-
if err := l.cfg.ForwardPackets(l.quit, filteredPkts...); err != nil {
3203+
err := l.cfg.ForwardPackets(l.quit, replay, filteredPkts...)
3204+
if err != nil {
32013205
log.Errorf("Unhandled error while reforwarding htlc "+
32023206
"settle/fail over htlcswitch: %v", err)
32033207
}

htlcswitch/link_test.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1940,12 +1940,14 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
19401940
// the firing via force feeding.
19411941
bticker := ticker.NewForce(time.Hour)
19421942
aliceCfg := ChannelLinkConfig{
1943-
FwrdingPolicy: globalPolicy,
1944-
Peer: alicePeer,
1945-
Switch: aliceSwitch,
1946-
BestHeight: aliceSwitch.BestHeight,
1947-
Circuits: aliceSwitch.CircuitModifier(),
1948-
ForwardPackets: aliceSwitch.ForwardPackets,
1943+
FwrdingPolicy: globalPolicy,
1944+
Peer: alicePeer,
1945+
Switch: aliceSwitch,
1946+
BestHeight: aliceSwitch.BestHeight,
1947+
Circuits: aliceSwitch.CircuitModifier(),
1948+
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
1949+
return aliceSwitch.ForwardPackets(linkQuit, packets...)
1950+
},
19491951
DecodeHopIterators: decoder.DecodeHopIterators,
19501952
ExtractErrorEncrypter: func(*btcec.PublicKey) (
19511953
hop.ErrorEncrypter, lnwire.FailCode) {
@@ -4491,12 +4493,14 @@ func (h *persistentLinkHarness) restartLink(
44914493
// the firing via force feeding.
44924494
bticker := ticker.NewForce(time.Hour)
44934495
aliceCfg := ChannelLinkConfig{
4494-
FwrdingPolicy: globalPolicy,
4495-
Peer: alicePeer,
4496-
Switch: aliceSwitch,
4497-
BestHeight: aliceSwitch.BestHeight,
4498-
Circuits: aliceSwitch.CircuitModifier(),
4499-
ForwardPackets: aliceSwitch.ForwardPackets,
4496+
FwrdingPolicy: globalPolicy,
4497+
Peer: alicePeer,
4498+
Switch: aliceSwitch,
4499+
BestHeight: aliceSwitch.BestHeight,
4500+
Circuits: aliceSwitch.CircuitModifier(),
4501+
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
4502+
return aliceSwitch.ForwardPackets(linkQuit, packets...)
4503+
},
45004504
DecodeHopIterators: decoder.DecodeHopIterators,
45014505
ExtractErrorEncrypter: func(*btcec.PublicKey) (
45024506
hop.ErrorEncrypter, lnwire.FailCode) {
@@ -6694,7 +6698,7 @@ func TestPipelineSettle(t *testing.T) {
66946698
// erroneously forwarded. If the forwardChan is closed before the last
66956699
// step, then the test will fail.
66966700
forwardChan := make(chan struct{})
6697-
fwdPkts := func(c chan struct{}, hp ...*htlcPacket) error {
6701+
fwdPkts := func(c chan struct{}, _ bool, hp ...*htlcPacket) error {
66986702
close(forwardChan)
66996703
return nil
67006704
}

0 commit comments

Comments
 (0)