-
Notifications
You must be signed in to change notification settings - Fork 95
Expand file tree
/
Copy pathdaemon.go
More file actions
229 lines (202 loc) · 7.3 KB
/
daemon.go
File metadata and controls
229 lines (202 loc) · 7.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/algorand/go-algorand/rpcs"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/algorand/indexer/api"
"github.com/algorand/indexer/config"
"github.com/algorand/indexer/fetcher"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/importer"
"github.com/algorand/indexer/util/metrics"
)
// Flag values for the daemon command; bound to CLI flags in init below.
var (
	algodDataDir     string        // path to algod data dir; falls back to $ALGORAND_DATA
	algodAddr        string        // host:port of algod (used with algodToken)
	algodToken       string        // API access token for algod
	daemonServerAddr string        // host:port the indexer REST API listens on
	noAlgod          bool          // disable algod block following entirely
	developerMode    bool          // enable performance-intensive API operations
	allowMigration   bool          // permit DB migrations even when no algod is connected
	metricsMode      string        // /metrics endpoint mode: ON, OFF, or VERBOSE
	tokenString      string        // optional API auth token required on REST calls
	writeTimeout     time.Duration // max duration for writing an HTTP response
	readTimeout      time.Duration // max duration for reading an entire request
	maxConn          uint32        // max DB connection pool size (0 = unlimited)
)
// daemonCmd runs the indexer daemon: it optionally follows blocks from algod,
// imports them into the indexer database, and serves the REST API over HTTP.
var daemonCmd = &cobra.Command{
	Use:   "daemon",
	Short: "run indexer daemon",
	Long:  "run indexer daemon. Serve api on HTTP.",
	//Args:
	Run: func(cmd *cobra.Command, args []string) {
		var err error
		config.BindFlags(cmd)
		err = configureLogger()
		if err != nil {
			fmt.Fprintf(os.Stderr, "failed to configure logger: %v", err)
			os.Exit(1)
		}

		// Fall back to the standard algod environment variable when no
		// data dir was given on the command line.
		if algodDataDir == "" {
			algodDataDir = os.Getenv("ALGORAND_DATA")
		}

		ctx, cf := context.WithCancel(context.Background())
		defer cf()
		{
			// Cancel the shared context on SIGTERM/SIGINT so the fetcher
			// and the API server shut down gracefully.
			cancelCh := make(chan os.Signal, 1)
			signal.Notify(cancelCh, syscall.SIGTERM, syscall.SIGINT)
			go func() {
				<-cancelCh
				logger.Println("Stopping Indexer.")
				cf()
			}()
		}

		// Select a block fetcher: explicit net+token takes precedence over
		// a local data dir; with neither configured, run without algod.
		var bot fetcher.Fetcher
		if noAlgod {
			logger.Info("algod block following disabled")
		} else if algodAddr != "" && algodToken != "" {
			bot, err = fetcher.ForNetAndToken(algodAddr, algodToken, logger)
			maybeFail(err, "fetcher setup, %v", err)
		} else if algodDataDir != "" {
			bot, err = fetcher.ForDataDir(algodDataDir, logger)
			maybeFail(err, "fetcher setup, %v", err)
		} else {
			// no algod was found
			noAlgod = true
		}

		// Without algod and without explicit permission, open the DB
		// read-only so no migrations are started.
		opts := idb.IndexerDbOptions{}
		if noAlgod && !allowMigration {
			opts.ReadOnly = true
		}
		opts.MaxConn = maxConn
		db, availableCh := indexerDbFromFlags(opts)
		defer db.Close()
		var wg sync.WaitGroup
		if bot != nil {
			// Run the block importer in the background; wg lets us wait for
			// it after the API server returns.
			wg.Add(1)
			go func() {
				defer wg.Done()
				// Wait until the database is available.
				<-availableCh
				// Initial import if needed.
				genesisReader := importer.GetGenesisFile(genesisJSONPath, bot.Algod(), logger)
				_, err := importer.EnsureInitialImport(db, genesisReader, logger)
				maybeFail(err, "importer.EnsureInitialImport() error")
				logger.Info("Initializing block import handler.")
				nextRound, err := db.GetNextRoundToAccount()
				maybeFail(err, "failed to get next round, %v", err)
				bot.SetNextRound(nextRound)
				imp := importer.NewImporter(db)
				// Retry failed block imports once per second until shutdown.
				handler := blockHandler(imp, 1*time.Second)
				bot.SetBlockHandler(handler)
				logger.Info("Starting block importer.")
				err = bot.Run(ctx)
				if err != nil {
					// If context is not expired.
					if ctx.Err() == nil {
						logger.WithError(err).Errorf("fetcher exited with error")
						os.Exit(1)
					}
				}
			}()
		} else {
			logger.Info("No block importer configured.")
		}

		fmt.Printf("serving on %s\n", daemonServerAddr)
		logger.Infof("serving on %s", daemonServerAddr)
		// Serve blocks until the context is cancelled, then wait for the
		// importer goroutine (if any) to finish.
		api.Serve(ctx, daemonServerAddr, db, bot, logger, makeOptions())
		wg.Wait()
	},
}
// init wires up the daemon command's CLI flags and the viper aliases that
// map flag names to their configuration-file keys.
func init() {
	flags := daemonCmd.Flags()

	// algod connection settings.
	flags.StringVarP(&algodDataDir, "algod", "d", "", "path to algod data dir, or $ALGORAND_DATA")
	flags.StringVarP(&algodAddr, "algod-net", "", "", "host:port of algod")
	flags.StringVarP(&algodToken, "algod-token", "", "", "api access token for algod")
	flags.StringVarP(&genesisJSONPath, "genesis", "g", "", "path to genesis.json (defaults to genesis.json in algod data dir if that was set)")

	// API server settings.
	flags.StringVarP(&daemonServerAddr, "server", "S", ":8980", "host:port to serve API on (default :8980)")
	flags.BoolVarP(&noAlgod, "no-algod", "", false, "disable connecting to algod for block following")
	flags.StringVarP(&tokenString, "token", "t", "", "an optional auth token, when set REST calls must use this token in a bearer format, or in a 'X-Indexer-API-Token' header")
	flags.BoolVarP(&developerMode, "dev-mode", "", false, "allow performance intensive operations like searching for accounts at a particular round")
	flags.BoolVarP(&allowMigration, "allow-migration", "", false, "allow migrations to happen even when no algod connected")
	flags.StringVarP(&metricsMode, "metrics-mode", "", "OFF", "configure the /metrics endpoint to [ON, OFF, VERBOSE]")
	flags.DurationVarP(&writeTimeout, "write-timeout", "", 30*time.Second, "set the maximum duration to wait before timing out writes to a http response, breaking connection")
	flags.DurationVarP(&readTimeout, "read-timeout", "", 5*time.Second, "set the maximum duration for reading the entire request")
	flags.Uint32VarP(&maxConn, "max-conn", "", 0, "set the maximum connections allowed in the connection pool, if the maximum is reached subsequent connections will wait until a connection becomes available, or timeout according to the read-timeout setting")

	viper.RegisterAlias("algod", "algod-data-dir")
	viper.RegisterAlias("algod-net", "algod-address")
	viper.RegisterAlias("server", "server-address")
	viper.RegisterAlias("token", "api-token")
}
// makeOptions converts the daemon's CLI flag values into API server options.
//
// The returned options carry developer mode, the optional auth token, the
// /metrics endpoint configuration, and the HTTP read/write timeouts.
func makeOptions() (options api.ExtraOptions) {
	options.DeveloperMode = developerMode
	if tokenString != "" {
		options.Tokens = append(options.Tokens, tokenString)
	}

	// metrics-mode is case-insensitive; [ON, OFF, VERBOSE] per the flag help.
	switch strings.ToUpper(metricsMode) {
	case "OFF":
		options.MetricsEndpoint = false
		options.MetricsEndpointVerbose = false
	case "ON":
		options.MetricsEndpoint = true
		options.MetricsEndpointVerbose = false
	case "VERBOSE":
		options.MetricsEndpoint = true
		options.MetricsEndpointVerbose = true
	default:
		// Previously an unrecognized value silently behaved like OFF; report
		// the misconfiguration instead of hiding it (still defaults to OFF).
		fmt.Fprintf(os.Stderr, "unknown metrics-mode %q, expected one of [ON, OFF, VERBOSE]; defaulting to OFF\n", metricsMode)
		options.MetricsEndpoint = false
		options.MetricsEndpointVerbose = false
	}

	options.WriteTimeout = writeTimeout
	options.ReadTimeout = readTimeout
	return
}
// blockHandler creates a handler complying to the fetcher block handler
// interface. In case of a failure it keeps retrying every retryDelay until
// the block is added successfully or the context is cancelled (i.e. the
// fetcher shuts down), in which case the last import error is returned.
func blockHandler(imp importer.Importer, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error {
	return func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
		for {
			err := handleBlock(block, imp)
			if err == nil {
				// return on success.
				return nil
			}
			// Delay or terminate before next attempt. The redundant `break`
			// that used to follow the timer case only exited the select (not
			// the loop) and has been removed; falling out of the select
			// naturally continues the retry loop.
			select {
			case <-ctx.Done():
				return err
			case <-time.After(retryDelay):
			}
		}
	}
}
// handleBlock imports a single block into the database and records import
// metrics. A failed import is logged and returned as a wrapped error.
func handleBlock(block *rpcs.EncodedBlockCert, imp importer.Importer) error {
	importStart := time.Now()
	if err := imp.ImportBlock(block); err != nil {
		logger.WithError(err).Errorf(
			"adding block %d to database failed", block.Block.Round())
		return fmt.Errorf("handleBlock() err: %w", err)
	}
	elapsed := time.Since(importStart)

	// Ignore round 0 (which is empty).
	if round := block.Block.Round(); round > 0 {
		metrics.BlockImportTimeSeconds.Observe(elapsed.Seconds())
		metrics.ImportedRoundGauge.Set(float64(round))

		// Count transactions per type, then publish one gauge per type.
		perType := make(map[string]int)
		for _, stxn := range block.Block.Payset {
			perType[string(stxn.Txn.Type)]++
		}
		for txnType, count := range perType {
			metrics.ImportedTxnsPerBlock.WithLabelValues(txnType).Set(float64(count))
		}
	}

	logger.Infof("round r=%d (%d txn) imported in %s", block.Block.Round(), len(block.Block.Payset), elapsed.String())
	return nil
}