Skip to content

Commit 3531905

Browse files
Import validation tool implementation. (algorand#775)
1 parent a35ec31 commit 3531905

File tree

5 files changed

+504
-4
lines changed

5 files changed

+504
-4
lines changed

cmd/import-validator/main.go

Lines changed: 395 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,395 @@
1+
// The import validator tool imports blocks into indexer database and algod's sqlite
2+
// database in lockstep and checks that the modified accounts are the same in the two
3+
// databases. It lets detect the first round where an accounting discrepancy occurs
4+
// and it prints out what the difference is before crashing.
5+
// There is a small limitation, however. The set of modified accounts is computed using
6+
// the sqlite database. Thus, if indexer's accounting were to modify a superset of
7+
// those accounts, this tool would not detect it. This, however, should be unlikely.
8+
9+
package main
10+
11+
import (
12+
"context"
13+
"fmt"
14+
"os"
15+
"path"
16+
"reflect"
17+
"sync"
18+
"time"
19+
20+
"github.com/algorand/go-algorand-sdk/client/v2/algod"
21+
"github.com/algorand/go-algorand/agreement"
22+
"github.com/algorand/go-algorand/config"
23+
"github.com/algorand/go-algorand/data/basics"
24+
"github.com/algorand/go-algorand/data/bookkeeping"
25+
"github.com/algorand/go-algorand/ledger"
26+
"github.com/algorand/go-algorand/ledger/ledgercore"
27+
"github.com/algorand/go-algorand/logging"
28+
"github.com/algorand/go-algorand/protocol"
29+
"github.com/algorand/go-algorand/rpcs"
30+
"github.com/sirupsen/logrus"
31+
"github.com/spf13/cobra"
32+
33+
"github.com/algorand/indexer/fetcher"
34+
"github.com/algorand/indexer/idb"
35+
"github.com/algorand/indexer/idb/postgres"
36+
"github.com/algorand/indexer/util"
37+
)
38+
39+
type blockHandler struct {
40+
f func(*rpcs.EncodedBlockCert) error
41+
}
42+
43+
func (h blockHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
44+
err := h.f(block)
45+
if err != nil {
46+
fmt.Printf("error handling block %d err: %v\n", block.Block.Round(), err)
47+
os.Exit(1)
48+
}
49+
}
50+
51+
func getGenesisBlock(client *algod.Client) (bookkeeping.Block, error) {
52+
data, err := client.BlockRaw(0).Do(context.Background())
53+
if err != nil {
54+
return bookkeeping.Block{}, fmt.Errorf("getGenesisBlock() client err: %w", err)
55+
}
56+
57+
var block rpcs.EncodedBlockCert
58+
err = protocol.Decode(data, &block)
59+
if err != nil {
60+
return bookkeeping.Block{}, fmt.Errorf("getGenesisBlock() decode err: %w", err)
61+
}
62+
63+
return block.Block, nil
64+
}
65+
66+
func getGenesis(client *algod.Client) (bookkeeping.Genesis, error) {
67+
data, err := client.GetGenesis().Do(context.Background())
68+
if err != nil {
69+
return bookkeeping.Genesis{}, fmt.Errorf("getGenesis() client err: %w", err)
70+
}
71+
72+
var res bookkeeping.Genesis
73+
err = protocol.DecodeJSON([]byte(data), &res)
74+
if err != nil {
75+
return bookkeeping.Genesis{}, fmt.Errorf("getGenesis() decode err: %w", err)
76+
}
77+
78+
return res, nil
79+
}
80+
81+
func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, genesisBlock *bookkeeping.Block, logger *logrus.Logger) (*postgres.IndexerDb, error) {
82+
db, availableCh, err :=
83+
postgres.OpenPostgres(postgresConnStr, idb.IndexerDbOptions{}, logger)
84+
if err != nil {
85+
return nil, fmt.Errorf("openIndexerDb() err: %w", err)
86+
}
87+
<-availableCh
88+
89+
_, err = db.GetNextRoundToAccount()
90+
if err != idb.ErrorNotInitialized {
91+
if err != nil {
92+
return nil, fmt.Errorf("openIndexerDb() err: %w", err)
93+
}
94+
} else {
95+
err = db.LoadGenesis(*genesis)
96+
if err != nil {
97+
return nil, fmt.Errorf("openIndexerDb() err: %w", err)
98+
}
99+
}
100+
101+
nextRound, err := db.GetNextRoundToAccount()
102+
if err != nil {
103+
return nil, fmt.Errorf("openIndexerDb() err: %w", err)
104+
}
105+
106+
if nextRound == 0 {
107+
err = db.AddBlock(genesisBlock)
108+
if err != nil {
109+
return nil, fmt.Errorf("openIndexerDb() err: %w", err)
110+
}
111+
}
112+
113+
return db, nil
114+
}
115+
116+
func openLedger(ledgerPath string, genesis *bookkeeping.Genesis, genesisBlock *bookkeeping.Block) (*ledger.Ledger, error) {
117+
logger := logging.NewLogger()
118+
119+
accounts := make(map[basics.Address]basics.AccountData)
120+
for _, alloc := range genesis.Allocation {
121+
address, err := basics.UnmarshalChecksumAddress(alloc.Address)
122+
if err != nil {
123+
return nil, fmt.Errorf("openLedger() decode address err: %w", err)
124+
}
125+
accounts[address] = alloc.State
126+
}
127+
128+
initState := ledgercore.InitState{
129+
Block: *genesisBlock,
130+
Accounts: accounts,
131+
GenesisHash: genesisBlock.GenesisHash(),
132+
}
133+
134+
ledger, err := ledger.OpenLedger(
135+
logger, path.Join(ledgerPath, "ledger"), false, initState, config.GetDefaultLocal())
136+
if err != nil {
137+
return nil, fmt.Errorf("openLedger() open err: %w", err)
138+
}
139+
140+
return ledger, nil
141+
}
142+
143+
func getModifiedAccounts(l *ledger.Ledger, block *bookkeeping.Block) ([]basics.Address, error) {
144+
eval, err := l.StartEvaluator(block.BlockHeader, len(block.Payset), 0)
145+
if err != nil {
146+
return nil, fmt.Errorf("changedAccounts() start evaluator err: %w", err)
147+
}
148+
149+
paysetgroups, err := block.DecodePaysetGroups()
150+
if err != nil {
151+
return nil, fmt.Errorf("changedAccounts() decode payset groups err: %w", err)
152+
}
153+
154+
for _, group := range paysetgroups {
155+
err = eval.TransactionGroup(group)
156+
if err != nil {
157+
return nil, fmt.Errorf("changedAccounts() apply transaction group err: %w", err)
158+
}
159+
}
160+
161+
vb, err := eval.GenerateBlock()
162+
if err != nil {
163+
return nil, fmt.Errorf("changedAccounts() generate block err: %w", err)
164+
}
165+
166+
accountDeltas := vb.Delta().Accts
167+
return accountDeltas.ModifiedAccounts(), nil
168+
}
169+
170+
func checkModifiedAccounts(db *postgres.IndexerDb, l *ledger.Ledger, block *bookkeeping.Block, addresses []basics.Address) error {
171+
var accountsIndexer map[basics.Address]basics.AccountData
172+
var err0 error
173+
var accountsAlgod map[basics.Address]basics.AccountData
174+
var err1 error
175+
var wg sync.WaitGroup
176+
177+
wg.Add(1)
178+
go func() {
179+
defer wg.Done()
180+
181+
accountsIndexer, err0 = db.GetAccountData(addresses)
182+
if err0 != nil {
183+
err0 = fmt.Errorf("checkModifiedAccounts() err0: %w", err0)
184+
return
185+
}
186+
}()
187+
188+
wg.Add(1)
189+
go func() {
190+
defer wg.Done()
191+
192+
accountsAlgod = make(map[basics.Address]basics.AccountData, len(addresses))
193+
for _, address := range addresses {
194+
var accountData basics.AccountData
195+
accountData, _, err1 = l.LookupWithoutRewards(block.Round(), address)
196+
if err1 != nil {
197+
err1 = fmt.Errorf("checkModifiedAccounts() lookup err1: %w", err1)
198+
return
199+
}
200+
201+
// Indexer returns nil for these maps if they are empty. Unfortunately,
202+
// in go-algorand it's not well defined, and sometimes ledger returns empty
203+
// maps and sometimes nil maps. So we set those maps to nil if they are empty so
204+
// that comparison works.
205+
if len(accountData.AssetParams) == 0 {
206+
accountData.AssetParams = nil
207+
}
208+
if len(accountData.Assets) == 0 {
209+
accountData.Assets = nil
210+
}
211+
212+
if accountData.AppParams != nil {
213+
// Make a copy of `AppParams` to avoid modifying ledger's storage.
214+
appParams :=
215+
make(map[basics.AppIndex]basics.AppParams, len(accountData.AppParams))
216+
for index, params := range accountData.AppParams {
217+
if len(params.GlobalState) == 0 {
218+
params.GlobalState = nil
219+
}
220+
appParams[index] = params
221+
}
222+
accountData.AppParams = appParams
223+
}
224+
225+
if accountData.AppLocalStates != nil {
226+
// Make a copy of `AppLocalStates` to avoid modifying ledger's storage.
227+
appLocalStates :=
228+
make(map[basics.AppIndex]basics.AppLocalState, len(accountData.AppLocalStates))
229+
for index, state := range accountData.AppLocalStates {
230+
if len(state.KeyValue) == 0 {
231+
state.KeyValue = nil
232+
}
233+
appLocalStates[index] = state
234+
}
235+
accountData.AppLocalStates = appLocalStates
236+
}
237+
238+
accountsAlgod[address] = accountData
239+
}
240+
}()
241+
242+
wg.Wait()
243+
if err0 != nil {
244+
return err0
245+
}
246+
if err1 != nil {
247+
return err1
248+
}
249+
250+
if !reflect.DeepEqual(accountsIndexer, accountsAlgod) {
251+
diff := util.Diff(accountsAlgod, accountsIndexer)
252+
return fmt.Errorf(
253+
"checkModifiedAccounts() accounts differ,"+
254+
"\naccountsIndexer: %+v,\naccountsAlgod: %+v,\ndiff: %s",
255+
accountsIndexer, accountsAlgod, diff)
256+
}
257+
258+
return nil
259+
}
260+
261+
func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logger *logrus.Logger) error {
262+
nextRoundIndexer, err := db.GetNextRoundToAccount()
263+
if err != nil {
264+
return fmt.Errorf("catchup err: %w", err)
265+
}
266+
nextRoundLedger := uint64(l.Latest()) + 1
267+
268+
if nextRoundLedger > nextRoundIndexer {
269+
return fmt.Errorf(
270+
"catchup() ledger is ahead of indexer nextRoundIndexer: %d nextRoundLedger: %d",
271+
nextRoundIndexer, nextRoundLedger)
272+
}
273+
274+
if nextRoundIndexer > nextRoundLedger+1 {
275+
return fmt.Errorf(
276+
"catchup() indexer is too ahead of ledger "+
277+
"nextRoundIndexer: %d nextRoundLedger: %d",
278+
nextRoundIndexer, nextRoundLedger)
279+
}
280+
281+
blockHandlerFunc := func(block *rpcs.EncodedBlockCert) error {
282+
var modifiedAccounts []basics.Address
283+
var err0 error
284+
var err1 error
285+
var wg sync.WaitGroup
286+
287+
wg.Add(1)
288+
go func() {
289+
modifiedAccounts, err0 = getModifiedAccounts(l, &block.Block)
290+
wg.Done()
291+
}()
292+
293+
if nextRoundLedger >= nextRoundIndexer {
294+
wg.Add(1)
295+
go func() {
296+
start := time.Now()
297+
err1 = db.AddBlock(&block.Block)
298+
fmt.Printf(
299+
"%d transactions imported in %v\n",
300+
len(block.Block.Payset), time.Since(start))
301+
wg.Done()
302+
}()
303+
}
304+
305+
wg.Wait()
306+
if err0 != nil {
307+
return fmt.Errorf("catchup() err0: %w", err0)
308+
}
309+
if nextRoundLedger >= nextRoundIndexer {
310+
if err1 != nil {
311+
return fmt.Errorf("catchup() err1: %w", err1)
312+
}
313+
nextRoundIndexer++
314+
}
315+
316+
err0 = l.AddBlock(block.Block, agreement.Certificate{})
317+
if err0 != nil {
318+
return fmt.Errorf("catchup() err0: %w", err0)
319+
}
320+
nextRoundLedger++
321+
322+
return checkModifiedAccounts(db, l, &block.Block, modifiedAccounts)
323+
}
324+
bot.AddBlockHandler(blockHandler{f: blockHandlerFunc})
325+
bot.SetNextRound(nextRoundLedger)
326+
bot.Run()
327+
328+
return nil
329+
}
330+
331+
func main() {
332+
var algodAddr string
333+
var algodToken string
334+
var algodLedger string
335+
var postgresConnStr string
336+
337+
var rootCmd = &cobra.Command{
338+
Use: "import-validator",
339+
Short: "Import validator",
340+
Run: func(cmd *cobra.Command, args []string) {
341+
logger := logrus.New()
342+
343+
bot, err := fetcher.ForNetAndToken(algodAddr, algodToken, logger)
344+
if err != nil {
345+
fmt.Printf("error initializing fetcher err: %v", err)
346+
os.Exit(1)
347+
}
348+
349+
genesis, err := getGenesis(bot.Algod())
350+
if err != nil {
351+
fmt.Printf("error getting genesis err: %v", err)
352+
os.Exit(1)
353+
}
354+
genesisBlock, err := getGenesisBlock(bot.Algod())
355+
if err != nil {
356+
fmt.Printf("error getting genesis block err: %v", err)
357+
os.Exit(1)
358+
}
359+
360+
db, err := openIndexerDb(postgresConnStr, &genesis, &genesisBlock, logger)
361+
if err != nil {
362+
fmt.Printf("error opening indexer database err: %v", err)
363+
os.Exit(1)
364+
}
365+
l, err := openLedger(algodLedger, &genesis, &genesisBlock)
366+
if err != nil {
367+
fmt.Printf("error opening algod database err: %v", err)
368+
os.Exit(1)
369+
}
370+
371+
err = catchup(db, l, bot, logger)
372+
if err != nil {
373+
fmt.Printf("error catching up err: %v", err)
374+
os.Exit(1)
375+
}
376+
},
377+
}
378+
379+
rootCmd.Flags().StringVar(&algodAddr, "algod-net", "", "host:port of algod")
380+
rootCmd.MarkFlagRequired("algod-net")
381+
382+
rootCmd.Flags().StringVar(
383+
&algodToken, "algod-token", "", "api access token for algod")
384+
rootCmd.MarkFlagRequired("algod-token")
385+
386+
rootCmd.Flags().StringVar(
387+
&algodLedger, "algod-ledger", "", "path to algod ledger directory")
388+
rootCmd.MarkFlagRequired("algod-ledger")
389+
390+
rootCmd.Flags().StringVar(
391+
&postgresConnStr, "postgres", "", "connection string for postgres database")
392+
rootCmd.MarkFlagRequired("postgres")
393+
394+
rootCmd.Execute()
395+
}

0 commit comments

Comments
 (0)