Skip to content

Commit 1d6adca

Browse files
yihuangi-norden
authored andcommitted
feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (cosmos#8664) (cosmos#13326)
* feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (cosmos#8664) <!-- < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < ☺ v ✰ Thanks for creating a PR! ✰ v Before smashing the submit button please review the checkboxes. v If a checkbox is n/a - please still include it but + a little note why ☺ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > --> <!-- Add a description of the changes that this PR introduces and the files that are the most critical to review. --> Hello 👋 this PR introduces the second stage of changes to support [ADR-038](cosmos#8012) state listening. This is rebased on top of the [first segment](cosmos#8551), which introduces the low level changes to the MultiStore and KVStore interfaces and implementations, the new WriteListener types, and the new listen.KVStore type. In this segment we introduce the StreamingService interface, an implementation that writes out to files, and it's integration and configuration at the BaseApp level. The idea was to have the first segment reviewed independently first but if people think it is easier/more time efficient to review both at the same time then we could start here. Thanks! This work is towards satisfying [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md) --- Before we can merge this PR, please make sure that all the following items have been checked off. If any of the checklist items are not applicable, please leave them but write a little note why. - [x] Targeted PR against correct branch (see [CONTRIBUTING.md](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting)) - [x] Linked to Github issue with discussion and accepted design OR link to spec that describes this work. - [x] Code follows the [module structure standards](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/structure.md). - [x] Wrote unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing) - [x] Updated relevant documentation (`docs/`) or specification (`x/<module>/spec/`) - [x] Added relevant `godoc` [comments](https://blog.golang.org/godoc-documenting-go-code). - [x] Added a relevant changelog entry to the `Unreleased` section in `CHANGELOG.md` - [x] Re-reviewed `Files changed` in the Github PR explorer - [x] Review `Codecov Report` in the comment section below once CI passes * fix lint * Update CHANGELOG.md Co-authored-by: Ian Norden <iansnorden@gmail.com>
1 parent 360e92c commit 1d6adca

File tree

15 files changed

+1253
-392
lines changed

15 files changed

+1253
-392
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ Ref: https://keepachangelog.com/en/1.0.0/
4848

4949
- (cli) [#13089](https://github.com/cosmos/cosmos-sdk/pull/13089) Fix rollback command don't actually delete multistore versions, added method `RollbackToVersion` to interface `CommitMultiStore` and added method `CommitMultiStore` to `Application` interface.
5050

51+
### Improvements
52+
53+
* (store) [\#13326](https://github.com/cosmos/cosmos-sdk/pull/13326) Implementation of ADR-038 file StreamingService, backport #8664
54+
5155
## v0.45.8 - 2022-08-25
5256

5357
### Improvements

baseapp/abci.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,14 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
242242
}
243243
// set the signed validators for addition to context in deliverTx
244244
app.voteInfos = req.LastCommitInfo.GetVotes()
245+
246+
// call the hooks with the BeginBlock messages
247+
for _, streamingListener := range app.abciListeners {
248+
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
249+
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
250+
}
251+
}
252+
245253
return res
246254
}
247255

@@ -264,7 +272,12 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
264272
res.ConsensusParamUpdates = cp
265273
}
266274

267-
app.deliverState.eventHistory = []abci.Event{}
275+
// call the streaming service hooks with the EndBlock messages
276+
for _, streamingListener := range app.abciListeners {
277+
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
278+
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
279+
}
280+
}
268281

269282
return res
270283
}
@@ -310,9 +323,17 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
310323
// Otherwise, the ResponseDeliverTx will contain releveant error information.
311324
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
312325
// gas execution context.
313-
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
326+
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
314327
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
315328

329+
defer func() {
330+
for _, streamingListener := range app.abciListeners {
331+
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
332+
app.logger.Error("DeliverTx listening hook failed", "err", err)
333+
}
334+
}
335+
}()
336+
316337
gInfo := sdk.GasInfo{}
317338
resultStr := "successful"
318339

baseapp/baseapp.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -167,20 +167,9 @@ type BaseApp struct { //nolint: maligned
167167
// which informs CometBFT what to index. If empty, all events will be indexed.
168168
indexEvents map[string]struct{}
169169

170-
// streamingManager for managing instances and configuration of ABCIListener services
171-
streamingManager storetypes.StreamingManager
172-
173-
chainID string
174-
175-
cdc codec.Codec
176-
177-
// optimisticExec contains the context required for Optimistic Execution,
178-
// including the goroutine handling.This is experimental and must be enabled
179-
// by developers.
180-
optimisticExec *oe.OptimisticExecution
181-
182-
// includeNestedMsgsGas holds a set of message types for which gas costs for its nested messages are calculated.
183-
includeNestedMsgsGas map[string]struct{}
170+
// abciListeners for hooking into the ABCI message processing of the BaseApp
171+
// and exposing the requests and responses to external consumers
172+
abciListeners []ABCIListener
184173
}
185174

186175
// NewBaseApp returns a reference to an initialized BaseApp. It accepts a

baseapp/options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,3 +397,14 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
397397
func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) {
398398
app.grpcQueryRouter = grpcQueryRouter
399399
}
400+
401+
// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
402+
func (app *BaseApp) SetStreamingService(s StreamingService) {
403+
// add the listeners for each StoreKey
404+
for key, lis := range s.Listeners() {
405+
app.cms.AddListeners(key, lis)
406+
}
407+
// register the StreamingService within the BaseApp
408+
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
409+
app.abciListeners = append(app.abciListeners, s)
410+
}

baseapp/streaming.go

Lines changed: 25 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -1,200 +1,33 @@
11
package baseapp
22

33
import (
4-
"context"
5-
"fmt"
6-
"sort"
7-
"strings"
4+
"io"
5+
"sync"
86

9-
abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
10-
"github.com/spf13/cast"
7+
abci "github.com/tendermint/tendermint/abci/types"
118

12-
"cosmossdk.io/log"
13-
"cosmossdk.io/schema"
14-
"cosmossdk.io/schema/appdata"
15-
"cosmossdk.io/schema/decoding"
16-
"cosmossdk.io/schema/indexer"
17-
"cosmossdk.io/store/streaming"
18-
storetypes "cosmossdk.io/store/types"
19-
20-
"github.com/cosmos/cosmos-sdk/client/flags"
21-
servertypes "github.com/cosmos/cosmos-sdk/server/types"
22-
)
23-
24-
const (
25-
StreamingTomlKey = "streaming"
26-
StreamingABCITomlKey = "abci"
27-
StreamingABCIPluginTomlKey = "plugin"
28-
StreamingABCIKeysTomlKey = "keys"
29-
StreamingABCIStopNodeOnErrTomlKey = "stop-node-on-err"
9+
store "github.com/cosmos/cosmos-sdk/store/types"
10+
"github.com/cosmos/cosmos-sdk/types"
3011
)
3112

32-
// EnableIndexer enables the built-in indexer with the provided options (usually from the app.toml indexer key),
33-
// kv-store keys, and app modules. Using the built-in indexer framework is mutually exclusive from using other
34-
// types of streaming listeners.
35-
func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*storetypes.KVStoreKey, appModules map[string]any) error {
36-
listener, err := indexer.StartManager(indexer.ManagerOptions{
37-
Config: indexerOpts,
38-
Resolver: decoding.ModuleSetDecoderResolver(appModules),
39-
SyncSource: nil,
40-
Logger: app.logger.With(log.ModuleKey, "indexer"),
41-
})
42-
if err != nil {
43-
return err
44-
}
45-
46-
exposedKeys := exposeStoreKeysSorted([]string{"*"}, keys)
47-
app.cms.AddListeners(exposedKeys)
48-
49-
app.streamingManager = storetypes.StreamingManager{
50-
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener}},
51-
StopNodeOnErr: true,
52-
}
53-
54-
return nil
55-
}
56-
57-
// RegisterStreamingServices registers streaming services with the BaseApp.
58-
func (app *BaseApp) RegisterStreamingServices(appOpts servertypes.AppOptions, keys map[string]*storetypes.KVStoreKey) error {
59-
// register streaming services
60-
streamingCfg := cast.ToStringMap(appOpts.Get(StreamingTomlKey))
61-
for service := range streamingCfg {
62-
pluginKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, service, StreamingABCIPluginTomlKey)
63-
pluginName := strings.TrimSpace(cast.ToString(appOpts.Get(pluginKey)))
64-
if len(pluginName) > 0 {
65-
logLevel := cast.ToString(appOpts.Get(flags.FlagLogLevel))
66-
plugin, err := streaming.NewStreamingPlugin(pluginName, logLevel)
67-
if err != nil {
68-
return fmt.Errorf("failed to load streaming plugin: %w", err)
69-
}
70-
if err := app.registerStreamingPlugin(appOpts, keys, plugin); err != nil {
71-
return fmt.Errorf("failed to register streaming plugin %w", err)
72-
}
73-
}
74-
}
75-
76-
return nil
77-
}
78-
79-
// registerStreamingPlugin registers streaming plugins with the BaseApp.
80-
func (app *BaseApp) registerStreamingPlugin(
81-
appOpts servertypes.AppOptions,
82-
keys map[string]*storetypes.KVStoreKey,
83-
streamingPlugin interface{},
84-
) error {
85-
v, ok := streamingPlugin.(storetypes.ABCIListener)
86-
if !ok {
87-
return fmt.Errorf("unexpected plugin type %T", v)
88-
}
89-
90-
app.registerABCIListenerPlugin(appOpts, keys, v)
91-
return nil
92-
}
93-
94-
// registerABCIListenerPlugin registers plugins that implement the ABCIListener interface.
95-
func (app *BaseApp) registerABCIListenerPlugin(
96-
appOpts servertypes.AppOptions,
97-
keys map[string]*storetypes.KVStoreKey,
98-
abciListener storetypes.ABCIListener,
99-
) {
100-
stopNodeOnErrKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIStopNodeOnErrTomlKey)
101-
stopNodeOnErr := cast.ToBool(appOpts.Get(stopNodeOnErrKey))
102-
keysKey := fmt.Sprintf("%s.%s.%s", StreamingTomlKey, StreamingABCITomlKey, StreamingABCIKeysTomlKey)
103-
exposeKeysStr := cast.ToStringSlice(appOpts.Get(keysKey))
104-
exposedKeys := exposeStoreKeysSorted(exposeKeysStr, keys)
105-
app.cms.AddListeners(exposedKeys)
106-
app.SetStreamingManager(
107-
storetypes.StreamingManager{
108-
ABCIListeners: []storetypes.ABCIListener{abciListener},
109-
StopNodeOnErr: stopNodeOnErr,
110-
},
111-
)
112-
}
113-
114-
func exposeAll(list []string) bool {
115-
for _, ele := range list {
116-
if ele == "*" {
117-
return true
118-
}
119-
}
120-
return false
121-
}
122-
123-
func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStoreKey) []storetypes.StoreKey {
124-
var exposeStoreKeys []storetypes.StoreKey
125-
if exposeAll(keysStr) {
126-
exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keys))
127-
for key := range keys {
128-
exposeStoreKeys = append(exposeStoreKeys, keys[key])
129-
}
130-
} else {
131-
exposeStoreKeys = make([]storetypes.StoreKey, 0, len(keysStr))
132-
for _, keyStr := range keysStr {
133-
if storeKey, ok := keys[keyStr]; ok {
134-
exposeStoreKeys = append(exposeStoreKeys, storeKey)
135-
}
136-
}
137-
}
138-
// sort storeKeys for deterministic output
139-
sort.SliceStable(exposeStoreKeys, func(i, j int) bool {
140-
return exposeStoreKeys[i].Name() < exposeStoreKeys[j].Name()
141-
})
142-
143-
return exposeStoreKeys
144-
}
145-
146-
type listenerWrapper struct {
147-
listener appdata.Listener
148-
}
149-
150-
func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
151-
if p.listener.StartBlock != nil {
152-
err := p.listener.StartBlock(appdata.StartBlockData{
153-
Height: uint64(req.Height),
154-
})
155-
if err != nil {
156-
return err
157-
}
158-
}
159-
160-
//// TODO txs, events
161-
162-
return nil
163-
}
164-
165-
func (p listenerWrapper) ListenCommit(ctx context.Context, res abci.CommitResponse, changeSet []*storetypes.StoreKVPair) error {
166-
if cb := p.listener.OnKVPair; cb != nil {
167-
updates := make([]appdata.ActorKVPairUpdate, len(changeSet))
168-
for i, pair := range changeSet {
169-
updates[i] = appdata.ActorKVPairUpdate{
170-
Actor: []byte(pair.StoreKey),
171-
StateChanges: []schema.KVPairUpdate{
172-
{
173-
Key: pair.Key,
174-
Value: pair.Value,
175-
Remove: pair.Delete,
176-
},
177-
},
178-
}
179-
}
180-
err := cb(appdata.KVPairData{Updates: updates})
181-
if err != nil {
182-
return err
183-
}
184-
}
185-
186-
if p.listener.Commit != nil {
187-
commitCb, err := p.listener.Commit(appdata.CommitData{})
188-
if err != nil {
189-
return err
190-
}
191-
if commitCb != nil {
192-
err := commitCb()
193-
if err != nil {
194-
return err
195-
}
196-
}
197-
}
198-
199-
return nil
13+
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
14+
type ABCIListener interface {
15+
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
16+
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
17+
// ListenEndBlock updates the steaming service with the latest EndBlock messages
18+
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
19+
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
20+
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
21+
}
22+
23+
// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
24+
type StreamingService interface {
25+
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
26+
Stream(wg *sync.WaitGroup) error
27+
// Listeners returns the streaming service's listeners for the BaseApp to register
28+
Listeners() map[store.StoreKey][]store.WriteListener
29+
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
30+
ABCIListener
31+
// Closer interface
32+
io.Closer
20033
}

0 commit comments

Comments
 (0)