Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
7f88c60
docs: add ApplicationQueryService release notes (#13587)
robert-zaremba Oct 21, 2022
f538e09
feat(cli): add module-account cli cmd and grpc get api (backport #136…
mergify[bot] Oct 24, 2022
afdf06a
fix(x/auth): allow multiple = signs in `GetTxsEvent` (backport #12474…
mergify[bot] Oct 24, 2022
ec09eae
fix: app-hash mismatch if upgrade migration commit is interrupted (ba…
mergify[bot] Oct 25, 2022
a653798
feat(cli): Add iavl-disable-fastnode cmd flag with proper description…
mergify[bot] Oct 26, 2022
4eed46e
build(deps): Bump github.com/cosmos/iavl from 0.19.3 to 0.19.4 (#13680)
julienrbrt Oct 28, 2022
9d7028d
feat: emit cached context events (backport #13063) (#13702)
mergify[bot] Oct 31, 2022
eb1e3eb
chore: prepare 0.46.4 changelog (#13716)
julienrbrt Nov 1, 2022
68ee142
ci: notify for 0.46.x releases (#13719)
julienrbrt Nov 1, 2022
c7a9435
docs: fix algolia on 0.46 (#13730)
julienrbrt Nov 2, 2022
ba49bff
ci: modifying docs in 0.46 should not redeploy docs (#13732)
julienrbrt Nov 2, 2022
e4ae994
fix: propagate msg events correctly in x/gov (backport #13728) (#13748)
mergify[bot] Nov 3, 2022
2899dcf
refactor: add error log when iavl set failed (backport #13803) (#13804)
mergify[bot] Nov 9, 2022
0d1ed1a
chore: bump tendermint to `0.34.23` (#13814)
julienrbrt Nov 9, 2022
2bd2cf1
fix: propagate events in x/group through sdk.Results (backport #13808…
mergify[bot] Nov 9, 2022
3a809e2
docs: update algolia index (#13823)
julienrbrt Nov 10, 2022
62443b8
fix: bank store migration (backport #13821) (#13829)
mergify[bot] Nov 10, 2022
68e54fa
feat(types): set custom GasConfig on Context for GasKVStore (backport…
mergify[bot] Nov 11, 2022
2114ec4
chore: prepare 0.46.5 release (#13816)
julienrbrt Nov 14, 2022
2d515e0
fix: Allow underscores in EventRegex (backport #13861) (#13864)
mergify[bot] Nov 15, 2022
0b81939
fix(group): add group members weight checks (backport #13869) (#13880)
mergify[bot] Nov 16, 2022
6b633ef
fix(group)!: Fix group min execution period (backport #13876) (#13885)
mergify[bot] Nov 16, 2022
29cf4bc
feat(bank): Add helper for v0.46 denom migration (#13891)
amaury1093 Nov 16, 2022
e5fef13
chore: impove 0.46.5 release notes (#13898)
julienrbrt Nov 17, 2022
c80177e
refactor: State Streaming Docs + Explicit Config Support (backport #1…
mergify[bot] Nov 17, 2022
b88f086
chore: remove typo (#13914) (#13917)
mergify[bot] Nov 18, 2022
2f55513
fix: correctly propagate msg errors in gov (backport #13918) (#13928)
mergify[bot] Nov 18, 2022
8cce748
feat(bank): enable 0.46.5 bank migration fix through migrator keeper …
robert-zaremba Nov 18, 2022
8339276
chore: bump tendermint to 0.34.24 (#13972)
julienrbrt Nov 22, 2022
9765823
chore: add group and gov/v1 in swagger (backport #13984) (#13996)
mergify[bot] Nov 23, 2022
7bb7fa9
fix(baseapp): fix snapshot interval bug (backport #14049) (#14053)
mergify[bot] Nov 29, 2022
c1f4ca1
refactor: use viper unmarshal in `config.GetConfig` function (backpor…
mergify[bot] Nov 30, 2022
72699f7
fix(group)!: Don't re-tally proposals after VP end (backport #14071) …
mergify[bot] Nov 30, 2022
85d6f66
fix: do not shadow clientCtx in start.go (backport #14086) (#14101)
mergify[bot] Nov 30, 2022
6e5e302
chore: Test for Server Config Read #14125 (#14127)
mergify[bot] Dec 1, 2022
ff27cd3
fix: remove duplicate ante events (backport #13983) (#14142)
mergify[bot] Dec 3, 2022
a627446
feat: support alternative query multistore (backport #13529) (#14169)
mergify[bot] Dec 6, 2022
cc06bce
fix: state listener observe writes at wrong time (backport #13516) (#…
mergify[bot] Dec 6, 2022
45d2f08
refactor: provide a helper for baseapp options (backport #14175) (#14…
mergify[bot] Dec 6, 2022
463f4e4
feat: add --grpc client option (backport #14051) (#14192)
mergify[bot] Dec 7, 2022
eb6d0ef
refactor: cleanup store/streaming/constructor.go (backport #14044) (#…
mergify[bot] Dec 9, 2022
fcfa361
chore: audit store/streaming/file/service.go (backport #14234) (#14241)
mergify[bot] Dec 10, 2022
79ac73d
fix: fix grpc flag conflict (backport #14244) (#14248)
mergify[bot] Dec 10, 2022
4153b12
chore: prepare 0.46.7 release (#14103)
julienrbrt Dec 13, 2022
f71df80
fix(gov): Fix v3 votes migrations (backport #14214) (#14277)
mergify[bot] Dec 13, 2022
54bcb8b
Merge tag 'v0.46.7' into prov/dwedul/v0.46.7-to-our-0.46.x
SpicyLemon Dec 13, 2022
65ee327
Add changelog entry for bringing in v0.46.7 changes.
SpicyLemon Dec 13, 2022
cb3382f
Fix the changelog. The merge duplicated several SDK sections.
SpicyLemon Dec 13, 2022
17b0ad1
Fix the TestFileStreamingService.
SpicyLemon Dec 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: State Streaming Docs + Explicit Config Support (backport co…
  • Loading branch information
mergify[bot] authored Nov 17, 2022
commit c80177e785a44f0bfb5351fe9741fe5b55c82a60
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

### Improvements

* (config) [#13894](https://github.com/cosmos/cosmos-sdk/pull/13894) Support state streaming configuration in `app.toml` template and default configuration.

## [v0.46.5](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.46.5) - 2022-11-17

### Features
Expand Down
35 changes: 35 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
DefaultGRPCMaxSendMsgSize = math.MaxInt32

// FileStreamer defines the store streaming type for file streaming.
FileStreamer = "file"
)

// BaseConfig defines the server's basic configuration
Expand Down Expand Up @@ -196,6 +199,28 @@ type StateSyncConfig struct {
SnapshotKeepRecent uint32 `mapstructure:"snapshot-keep-recent"`
}

type (
// StoreConfig defines application configuration for state streaming and other
// storage related operations.
StoreConfig struct {
Streamers []string `mapstructure:"streamers"`
}

// StreamersConfig defines concrete state streaming configuration options. These
// fields are required to be set when state streaming is enabled via a non-empty
// list defined by 'StoreConfig.Streamers'.
StreamersConfig struct {
File FileStreamerConfig `mapstructure:"file"`
}

// FileStreamerConfig defines the file streaming configuration options.
FileStreamerConfig struct {
Keys []string `mapstructure:"keys"`
WriteDir string `mapstructure:"write_dir"`
Prefix string `mapstructure:"prefix"`
}
)

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`
Expand All @@ -207,6 +232,8 @@ type Config struct {
Rosetta RosettaConfig `mapstructure:"rosetta"`
GRPCWeb GRPCWebConfig `mapstructure:"grpc-web"`
StateSync StateSyncConfig `mapstructure:"state-sync"`
Store StoreConfig `mapstructure:"store"`
Streamers StreamersConfig `mapstructure:"streamers"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -288,6 +315,14 @@ func DefaultConfig() *Config {
SnapshotInterval: 0,
SnapshotKeepRecent: 2,
},
Store: StoreConfig{
Streamers: []string{},
},
Streamers: StreamersConfig{
File: FileStreamerConfig{
Keys: []string{"*"},
},
},
}
}

Expand Down
35 changes: 30 additions & 5 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand All @@ -32,28 +31,54 @@ func TestIndexEventsMarshalling(t *testing.T) {
err := configTemplate.Execute(&buffer, cfg)
require.NoError(t, err, "executing template")
actual := buffer.String()
assert.Contains(t, actual, expectedIn, "config file contents")
require.Contains(t, actual, expectedIn, "config file contents")
}

func TestParseStoreStreaming(t *testing.T) {
expectedContents := `[store]
streamers = ["file", ]

[streamers]
[streamers.file]
keys = ["*", ]
write_dir = "/foo/bar"
prefix = ""`

cfg := DefaultConfig()
cfg.Store.Streamers = []string{FileStreamer}
cfg.Streamers.File.Keys = []string{"*"}
cfg.Streamers.File.WriteDir = "/foo/bar"

var buffer bytes.Buffer
require.NoError(t, configTemplate.Execute(&buffer, cfg), "executing template")
require.Contains(t, buffer.String(), expectedContents, "config file contents")
}

func TestIndexEventsWriteRead(t *testing.T) {
expected := []string{"key3", "key4"}

// Create config with two IndexEvents entries, and write it to a file.
confFile := filepath.Join(t.TempDir(), "app.toml")
conf := DefaultConfig()
conf.IndexEvents = expected

WriteConfigFile(confFile, conf)

// Read that file into viper.
// read the file into Viper
vpr := viper.New()
vpr.SetConfigFile(confFile)

err := vpr.ReadInConfig()
require.NoError(t, err, "reading config file into viper")

// Check that the raw viper value is correct.
actualRaw := vpr.GetStringSlice("index-events")
require.Equal(t, expected, actualRaw, "viper's index events")

// Check that it is parsed into the config correctly.
cfg, perr := ParseConfig(vpr)
require.NoError(t, perr, "parsing config")

actual := cfg.IndexEvents
require.Equal(t, expected, actual, "config value")
}
Expand All @@ -62,15 +87,15 @@ func TestGlobalLabelsEventsMarshalling(t *testing.T) {
expectedIn := `global-labels = [
["labelname1", "labelvalue1"],
["labelname2", "labelvalue2"],
]` + "\n"
]`
cfg := DefaultConfig()
cfg.Telemetry.GlobalLabels = [][]string{{"labelname1", "labelvalue1"}, {"labelname2", "labelvalue2"}}
var buffer bytes.Buffer

err := configTemplate.Execute(&buffer, cfg)
require.NoError(t, err, "executing template")
actual := buffer.String()
assert.Contains(t, actual, expectedIn, "config file contents")
require.Contains(t, actual, expectedIn, "config file contents")
}

func TestGlobalLabelsWriteRead(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,19 @@ snapshot-interval = {{ .StateSync.SnapshotInterval }}

# snapshot-keep-recent specifies the number of recent snapshots to keep and serve (0 to keep all).
snapshot-keep-recent = {{ .StateSync.SnapshotKeepRecent }}

###############################################################################
### Store / State Streaming ###
###############################################################################

[store]
streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}]

[streamers]
[streamers.file]
keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}]
write_dir = "{{ .Streamers.File.WriteDir }}"
prefix = "{{ .Streamers.File.Prefix }}"
`

var configTemplate *template.Template
Expand Down
7 changes: 4 additions & 3 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package simapp

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -228,10 +229,10 @@ func NewSimApp(
// not include this key.
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
// load state streaming if enabled
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil {
tmos.Exit(err.Error())
fmt.Printf("failed to load state streaming: %s", err)
os.Exit(1)
}

app := &SimApp{
Expand Down
73 changes: 47 additions & 26 deletions store/streaming/README.md
Original file line number Diff line number Diff line change
@@ -1,29 +1,38 @@
# State Streaming Service

This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a
file or stream, as described in [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
This package contains the constructors for the `StreamingService`s used to write
state changes out from individual KVStores to a file or stream, as described in
[ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md)
and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go).
The child directories contain the implementations for specific output destinations.

Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional
output destinations can be added.
Currently, a `StreamingService` implementation that writes state changes out to
files is supported, in the future support for additional output destinations can
be added.

The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file:
The `StreamingService` is configured from within an App using the `AppOptions`
loaded from the `app.toml` file:

```toml
# ...

[store]
streamers = [ # if len(streamers) > 0 we are streaming
"file", # name of the streaming service, used by constructor
]
# streaming is enabled if one or more streamers are defined
streamers = [
# name of the streaming service, used by constructor
"file"
]

[streamers]
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
```

`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString`
to return the `ServiceConstructor` for that particular implementation:
The `store.streamers` field contains a list of the names of the `StreamingService`
implementations to employ which are used by `ServiceTypeFromString` to return
the `ServiceConstructor` for that particular implementation:

```go
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
Expand All @@ -35,18 +44,27 @@ for _, listenerName := range listeners {
}
```

`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service.
`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`.
In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off.
The `streamers` field contains a mapping of the specific `StreamingService`
implementation name to the configuration parameters for that specific service.

The `streamers.x.keys` field contains the list of `StoreKey` names for the
KVStores to expose using this service and is required by every type of
`StreamingService`. In order to expose *ALL* KVStores, we can include `*` in
this list. An empty list is equivalent to turning the service off.

Additional configuration parameters are optional and specific to the implementation.
In the case of the file streaming service, `streamers.file.write_dir` contains the path to the
directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions
with other App `StreamingService` output files.
In the case of the file streaming service, the `streamers.file.write_dir` field
contains the path to the directory to write the files to, and `streamers.file.prefix`
contains an optional prefix to prepend to the output files to prevent potential
collisions with other App `StreamingService` output files.

The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and
returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options,
e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`.
The `ServiceConstructor` accepts `AppOptions`, the store keys collected using
`streamers.x.keys`, a `BinaryMarshaller` and returns a `StreamingService
implementation.

The `AppOptions` are passed in to provide access to any implementation specific
configuration options, e.g. in the case of the file streaming service the
`streamers.file.write_dir` and `streamers.file.prefix`.

```go
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
Expand All @@ -55,9 +73,12 @@ if err != nil {
}
```

The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method.
The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process
may be synchronous or asynchronous with the message processing of the state machine.
The returned `StreamingService` is loaded into the BaseApp using the BaseApp's
`SetStreamingService` method.

The `Stream` method is called on the service to begin the streaming process.
Depending on the implementation this process may be synchronous or asynchronous
with the message processing of the state machine.

```go
bApp.SetStreamingService(streamingService)
Expand Down