Skip to content

Commit b9e7e53

Browse files
yihuangi-norden
andauthored
feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (backport #8664) (#13325)
* feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (backport #8664) <!-- < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < ☺ v ✰ Thanks for creating a PR! ✰ v Before smashing the submit button please review the checkboxes. v If a checkbox is n/a - please still include it but + a little note why ☺ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > --> <!-- Add a description of the changes that this PR introduces and the files that are the most critical to review. --> Hello 👋 this PR introduces the second stage of changes to support [ADR-038](#8012) state listening. This is rebased on top of the [first segment](#8551), which introduces the low level changes to the MultiStore and KVStore interfaces and implementations, the new WriteListener types, and the new listen.KVStore type. In this segment we introduce the StreamingService interface, an implementation that writes out to files, and it's integration and configuration at the BaseApp level. The idea was to have the first segment reviewed independently first but if people think it is easier/more time efficient to review both at the same time then we could start here. Thanks! This work is towards satisfying [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md) --- Before we can merge this PR, please make sure that all the following items have been checked off. If any of the checklist items are not applicable, please leave them but write a little note why. - [x] Targeted PR against correct branch (see [CONTRIBUTING.md](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting)) - [x] Linked to Github issue with discussion and accepted design OR link to spec that describes this work. - [x] Code follows the [module structure standards](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/structure.md). - [x] Wrote unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing) - [x] Updated relevant documentation (`docs/`) or specification (`x/<module>/spec/`) - [x] Added relevant `godoc` [comments](https://blog.golang.org/godoc-documenting-go-code). - [x] Added a relevant changelog entry to the `Unreleased` section in `CHANGELOG.md` - [x] Re-reviewed `Files changed` in the Github PR explorer - [x] Review `Codecov Report` in the comment section below once CI passes * Update CHANGELOG.md Co-authored-by: Ian Norden <iansnorden@gmail.com>
1 parent 48467cd commit b9e7e53

File tree

16 files changed

+1245
-91
lines changed

16 files changed

+1245
-91
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
4040
### Improvements
4141

4242
* [\#11693](https://github.com/cosmos/cosmos-sdk/pull/11693) Add validation for gentx cmd.
43+
* (store) [\#13325](https://github.com/cosmos/cosmos-sdk/pull/13325) Implementation of ADR-038 file StreamingService, backport #8664
4344

4445
## [v0.44.8](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.44.8) - 2022-04-12
4546

baseapp/abci.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,14 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
196196
}
197197
// set the signed validators for addition to context in deliverTx
198198
app.voteInfos = req.LastCommitInfo.GetVotes()
199+
200+
// call the hooks with the BeginBlock messages
201+
for _, streamingListener := range app.abciListeners {
202+
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
203+
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
204+
}
205+
}
206+
199207
return res
200208
}
201209

@@ -216,6 +224,13 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
216224
res.ConsensusParamUpdates = cp
217225
}
218226

227+
// call the streaming service hooks with the EndBlock messages
228+
for _, streamingListener := range app.abciListeners {
229+
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
230+
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
231+
}
232+
}
233+
219234
return res
220235
}
221236

@@ -260,12 +275,20 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
260275
// Otherwise, the ResponseDeliverTx will contain releveant error information.
261276
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
262277
// gas execution context.
263-
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
278+
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
264279
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
265280

266281
gInfo := sdk.GasInfo{}
267282
resultStr := "successful"
268283

284+
defer func() {
285+
for _, streamingListener := range app.abciListeners {
286+
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
287+
app.logger.Error("DeliverTx listening hook failed", "err", err)
288+
}
289+
}
290+
}()
291+
269292
defer func() {
270293
telemetry.IncrCounter(1, "tx", "count")
271294
telemetry.IncrCounter(1, "tx", resultStr)

baseapp/baseapp.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ type BaseApp struct { // nolint: maligned
133133
// indexEvents defines the set of events in the form {eventType}.{attributeKey},
134134
// which informs Tendermint what to index. If empty, all events will be indexed.
135135
indexEvents map[string]struct{}
136+
137+
// abciListeners for hooking into the ABCI message processing of the BaseApp
138+
// and exposing the requests and responses to external consumers
139+
abciListeners []ABCIListener
136140
}
137141

138142
// NewBaseApp returns a reference to an initialized BaseApp. It accepts a

baseapp/options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,3 +237,14 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
237237
app.grpcQueryRouter.SetInterfaceRegistry(registry)
238238
app.msgServiceRouter.SetInterfaceRegistry(registry)
239239
}
240+
241+
// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
242+
func (app *BaseApp) SetStreamingService(s StreamingService) {
243+
// add the listeners for each StoreKey
244+
for key, lis := range s.Listeners() {
245+
app.cms.AddListeners(key, lis)
246+
}
247+
// register the StreamingService within the BaseApp
248+
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
249+
app.abciListeners = append(app.abciListeners, s)
250+
}

baseapp/streaming.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package baseapp
2+
3+
import (
4+
"io"
5+
"sync"
6+
7+
abci "github.com/tendermint/tendermint/abci/types"
8+
9+
store "github.com/cosmos/cosmos-sdk/store/types"
10+
"github.com/cosmos/cosmos-sdk/types"
11+
)
12+
13+
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
14+
type ABCIListener interface {
15+
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
16+
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
17+
// ListenEndBlock updates the steaming service with the latest EndBlock messages
18+
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
19+
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
20+
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
21+
}
22+
23+
// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
24+
type StreamingService interface {
25+
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
26+
Stream(wg *sync.WaitGroup) error
27+
// Listeners returns the streaming service's listeners for the BaseApp to register
28+
Listeners() map[store.StoreKey][]store.WriteListener
29+
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
30+
ABCIListener
31+
// Closer interface
32+
io.Closer
33+
}

0 commit comments

Comments
 (0)