Skip to content

Commit 9bf01ba

Browse files
authored
Merge pull request #89 from deso-protocol/z/caching-and-txindex
Add caching and txindex updates
2 parents d419bc9 + 2c1a300 commit 9bf01ba

File tree

9 files changed

+94
-29
lines changed

9 files changed

+94
-29
lines changed

entries/bls_pkid_pair.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"github.com/deso-protocol/core/lib"
66
"github.com/deso-protocol/state-consumer/consumer"
7+
lru "github.com/hashicorp/golang-lru/v2"
78
"github.com/pkg/errors"
89
"github.com/uptrace/bun"
910
)
@@ -77,15 +78,15 @@ func BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct(
7778

7879
// BLSPublicKeyPKIDPairBatchOperation is the entry point for processing a batch of BLSPublicKeyPKIDPair entries.
7980
// It determines the appropriate handler based on the operation type and executes it.
80-
func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
81+
func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
8182
// We check before we call this function that there is at least one operation type.
8283
// We also ensure before this that all entries have the same operation type.
8384
operationType := entries[0].OperationType
8485
var err error
8586
if operationType == lib.DbOperationTypeDelete {
86-
err = bulkDeleteBLSPkidPairEntry(entries, db, operationType)
87+
err = bulkDeleteBLSPkidPairEntry(entries, db, operationType, cachedEntries)
8788
} else {
88-
err = bulkInsertBLSPkidPairEntry(entries, db, operationType, params)
89+
err = bulkInsertBLSPkidPairEntry(entries, db, operationType, params, cachedEntries)
8990
}
9091
if err != nil {
9192
return errors.Wrapf(err, "entries.StakeBatchOperation: Problem with operation type %v", operationType)
@@ -95,10 +96,13 @@ func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db bun.
9596

9697
// bulkInsertBLSPkidPairEntry inserts a batch of stake entries into the database.
9798
func bulkInsertBLSPkidPairEntry(
98-
entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams,
99-
) error {
99+
entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
100100
// Track the unique entries we've inserted so we don't insert the same entry twice.
101101
uniqueEntries := consumer.UniqueEntries(entries)
102+
103+
// Filter out any entries that are already tracked in the cache.
104+
uniqueEntries = consumer.FilterCachedEntries(uniqueEntries, cachedEntries)
105+
102106
uniqueBLSPkidPairEntries := consumer.FilterEntriesByPrefix(
103107
uniqueEntries, lib.Prefixes.PrefixValidatorBLSPublicKeyPKIDPairEntry)
104108
uniqueBLSPkidPairSnapshotEntries := consumer.FilterEntriesByPrefix(
@@ -145,11 +149,16 @@ func bulkInsertBLSPkidPairEntry(
145149
}
146150
}
147151

152+
// Update the cache with the new entries.
153+
for _, entry := range uniqueEntries {
154+
cachedEntries.Add(string(entry.KeyBytes), entry.EncoderBytes)
155+
}
156+
148157
return nil
149158
}
150159

151160
// bulkDeleteBLSPkidPairEntry deletes a batch of stake entries from the database.
152-
func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
161+
func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, cachedEntries *lru.Cache[string, []byte]) error {
153162
// Track the unique entries we've inserted so we don't insert the same entry twice.
154163
uniqueEntries := consumer.UniqueEntries(entries)
155164

@@ -182,5 +191,10 @@ func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db bun.IDB, ope
182191
}
183192
}
184193

194+
// Remove the deleted entries from the cache.
195+
for _, key := range keysToDelete {
196+
cachedEntries.Remove(string(key))
197+
}
198+
185199
return nil
186200
}

entries/jailed_history.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"github.com/deso-protocol/core/lib"
66
"github.com/deso-protocol/state-consumer/consumer"
7+
lru "github.com/hashicorp/golang-lru/v2"
78
"github.com/pkg/errors"
89
"github.com/uptrace/bun"
910
)
@@ -39,15 +40,15 @@ func UnjailValidatorStateChangeMetadataEncoderToPGStruct(
3940

4041
// ValidatorBatchOperation is the entry point for processing a batch of Validator entries.
4142
// It determines the appropriate handler based on the operation type and executes it.
42-
func JailedHistoryEventBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
43+
func JailedHistoryEventBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
4344
// We check before we call this function that there is at least one operation type.
4445
// We also ensure before this that all entries have the same operation type.
4546
operationType := entries[0].OperationType
4647
var err error
4748
if operationType == lib.DbOperationTypeDelete {
48-
err = bulkDeleteValidatorEntry(entries, db, operationType)
49+
err = bulkDeleteValidatorEntry(entries, db, operationType, cachedEntries)
4950
} else {
50-
err = bulkInsertValidatorEntry(entries, db, operationType, params)
51+
err = bulkInsertValidatorEntry(entries, db, operationType, params, cachedEntries)
5152
}
5253
if err != nil {
5354
return errors.Wrapf(err, "entries.ValidatorBatchOperation: Problem with operation type %v", operationType)

entries/pkid.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/deso-protocol/core/lib"
66
"github.com/deso-protocol/state-consumer/consumer"
77
"github.com/golang/glog"
8+
lru "github.com/hashicorp/golang-lru/v2"
89
"github.com/pkg/errors"
910
"github.com/uptrace/bun"
1011
)
@@ -127,15 +128,15 @@ func bulkDeletePkidEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationT
127128
return nil
128129
}
129130

130-
func PkidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
131+
func PkidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
131132
// We check before we call this function that there is at least one operation type.
132133
// We also ensure before this that all entries have the same operation type.
133134
operationType := entries[0].OperationType
134135
var err error
135136
if operationType == lib.DbOperationTypeDelete {
136-
err = bulkDeletePkid(entries, db, operationType)
137+
err = bulkDeletePkid(entries, db, operationType, cachedEntries)
137138
} else {
138-
err = bulkInsertPkid(entries, db, operationType, params)
139+
err = bulkInsertPkid(entries, db, operationType, params, cachedEntries)
139140
}
140141
if err != nil {
141142
return errors.Wrapf(err, "entries.PostBatchOperation: Problem with operation type %v", operationType)
@@ -144,12 +145,16 @@ func PkidBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib
144145
}
145146

146147
// bulkInsertPkid inserts a batch of PKIDs into the database.
147-
func bulkInsertPkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
148+
func bulkInsertPkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
148149
// Track the unique entries we've inserted so we don't insert the same entry twice.
149150
uniqueEntries := consumer.UniqueEntries(entries)
150151

151152
uniqueLeaderScheduleEntries := consumer.FilterEntriesByPrefix(
152153
uniqueEntries, lib.Prefixes.PrefixSnapshotLeaderSchedule)
154+
155+
// Filter out any entries that are already in the cache.
156+
uniqueLeaderScheduleEntries = consumer.FilterCachedEntries(uniqueLeaderScheduleEntries, cachedEntries)
157+
153158
// NOTE: if we need to support parsing other indexes for PKIDs beyond LeaderSchedule,
154159
// we will need to filter the uniqueEntries by the appropriate prefix and then convert
155160
// the entries to the appropriate PG struct.
@@ -178,11 +183,16 @@ func bulkInsertPkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType l
178183
}
179184
}
180185

186+
// Update the cached entries with the new entries.
187+
for _, entry := range uniqueLeaderScheduleEntries {
188+
cachedEntries.Add(string(entry.KeyBytes), entry.EncoderBytes)
189+
}
190+
181191
return nil
182192
}
183193

184194
// bulkDeletePKID deletes a batch of PKIDs from the database.
185-
func bulkDeletePkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
195+
func bulkDeletePkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, cachedEntries *lru.Cache[string, []byte]) error {
186196
// Track the unique entries we've inserted so we don't insert the same entry twice.
187197
uniqueEntries := consumer.UniqueEntries(entries)
188198

@@ -201,5 +211,10 @@ func bulkDeletePkid(entries []*lib.StateChangeEntry, db bun.IDB, operationType l
201211
}
202212
}
203213

214+
// Remove the entries from the cache.
215+
for _, entry := range uniqueEntries {
216+
cachedEntries.Remove(string(entry.KeyBytes))
217+
}
218+
204219
return nil
205220
}

entries/transaction.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ type TransactionEntry struct {
2121
FeeNanos uint64
2222
NonceExpirationBlockHeight uint64
2323
NoncePartialId uint64
24-
TxnMeta lib.DeSoTxnMetadata `bun:"type:jsonb"`
25-
TxIndexMetadata lib.DeSoEncoder `bun:"type:jsonb"`
26-
TxIndexBasicTransferMetadata lib.DeSoEncoder `bun:"type:jsonb"`
24+
TxnMeta lib.DeSoTxnMetadata `bun:"type:jsonb"`
25+
TxIndexMetadata consumer.ConsumerTxIndexMetadata `bun:"type:jsonb"`
26+
TxIndexBasicTransferMetadata lib.DeSoEncoder `bun:"type:jsonb"`
2727
TxnMetaBytes []byte
2828
TxnBytes []byte
2929
TxnType uint16

entries/utxo_operation.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ func parseUtxoOperationBundle(
430430
entry.BlockHeight,
431431
)
432432
}
433-
txIndexMetadata, err := consumer.ComputeTransactionMetadata(transaction, blockHashHex, params, transaction.TxnFeeNanos, uint64(jj), utxoOps)
433+
txIndexMetadata, txnExtraMetadata, err := consumer.ComputeTransactionMetadata(transaction, blockHashHex, params, transaction.TxnFeeNanos, uint64(jj), utxoOps)
434434
if err != nil {
435435
// If we fail to compute txindex metadata, log the error and continue to the next transaction.
436436
// We still append this txn to the transactionUpdates slice so that we can have it in the db.
@@ -453,7 +453,10 @@ func parseUtxoOperationBundle(
453453
innerTxnMetadata.BasicTransferTxindexMetadata.UtxoOps = nil
454454
}
455455
}
456-
transactions[jj].TxIndexMetadata = metadata
456+
transactions[jj].TxIndexMetadata = consumer.ConsumerTxIndexMetadata{
457+
DeSoEncoder: metadata,
458+
TransactionExtraMetadata: txnExtraMetadata,
459+
}
457460

458461
transactions[jj].TxIndexBasicTransferMetadata = txIndexMetadata.GetEncoderForTxType(lib.TxnTypeBasicTransfer)
459462

entries/validator.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"github.com/deso-protocol/core/lib"
66
"github.com/deso-protocol/state-consumer/consumer"
7+
lru "github.com/hashicorp/golang-lru/v2"
78
"github.com/pkg/errors"
89
"github.com/uptrace/bun"
910
"github.com/uptrace/bun/extra/bunbig"
@@ -98,15 +99,15 @@ func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []b
9899

99100
// ValidatorBatchOperation is the entry point for processing a batch of Validator entries.
100101
// It determines the appropriate handler based on the operation type and executes it.
101-
func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
102+
func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
102103
// We check before we call this function that there is at least one operation type.
103104
// We also ensure before this that all entries have the same operation type.
104105
operationType := entries[0].OperationType
105106
var err error
106107
if operationType == lib.DbOperationTypeDelete {
107-
err = bulkDeleteValidatorEntry(entries, db, operationType)
108+
err = bulkDeleteValidatorEntry(entries, db, operationType, cachedEntries)
108109
} else {
109-
err = bulkInsertValidatorEntry(entries, db, operationType, params)
110+
err = bulkInsertValidatorEntry(entries, db, operationType, params, cachedEntries)
110111
}
111112
if err != nil {
112113
return errors.Wrapf(err, "entries.ValidatorBatchOperation: Problem with operation type %v", operationType)
@@ -115,9 +116,13 @@ func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params
115116
}
116117

117118
// bulkInsertValidatorEntry inserts a batch of validator entries into the database.
118-
func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
119+
func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams, cachedEntries *lru.Cache[string, []byte]) error {
119120
// Track the unique entries we've inserted so we don't insert the same entry twice.
120121
uniqueEntries := consumer.UniqueEntries(entries)
122+
123+
// Filter out any entries that are already tracked in the cache.
124+
uniqueEntries = consumer.FilterCachedEntries(uniqueEntries, cachedEntries)
125+
121126
uniqueValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID)
122127
uniqueSnapshotValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorSetByPKID)
123128
// Create a new array to hold the bun struct.
@@ -156,11 +161,17 @@ func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, opera
156161
return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting snapshot validator entries")
157162
}
158163
}
164+
165+
// Add any new entries to the cache.
166+
for _, entry := range uniqueEntries {
167+
cachedEntries.Add(string(entry.KeyBytes), entry.EncoderBytes)
168+
}
169+
159170
return nil
160171
}
161172

162173
// bulkDeleteValidatorEntry deletes a batch of validator entries from the database.
163-
func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
174+
func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, cachedEntries *lru.Cache[string, []byte]) error {
164175
// Track the unique entries we've inserted so we don't insert the same entry twice.
165176
uniqueEntries := consumer.UniqueEntries(entries)
166177
uniqueKeys := consumer.KeysToDelete(uniqueEntries)
@@ -195,6 +206,12 @@ func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db bun.IDB, opera
195206
}
196207
}
197208

209+
// Delete cached validator entries.
210+
for _, key := range validatorKeysToDelete {
211+
keyStr := string(key)
212+
cachedEntries.Remove(keyStr)
213+
}
214+
198215
return nil
199216
}
200217

handler/data_handler.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/deso-protocol/postgres-data-handler/migrations/post_sync_migrations"
1212
"github.com/deso-protocol/state-consumer/consumer"
1313
"github.com/golang/glog"
14+
lru "github.com/hashicorp/golang-lru/v2"
1415
"github.com/pkg/errors"
1516
"github.com/uptrace/bun"
1617
)
@@ -25,6 +26,9 @@ type PostgresDataHandler struct {
2526
// Params is a struct containing the current blockchain parameters.
2627
// It is used to determine which prefix to use for public keys.
2728
Params *lib.DeSoParams
29+
30+
// LRU containing cached entries, to reduce duplicative database operations
31+
CachedEntries *lru.Cache[string, []byte]
2832
}
2933

3034
// HandleEntryBatch performs a bulk operation for a batch of entries, based on the encoder type.
@@ -92,7 +96,7 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries
9296
case lib.EncoderTypeStakeEntry:
9397
err = entries.StakeBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params)
9498
case lib.EncoderTypeValidatorEntry:
95-
err = entries.ValidatorBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params)
99+
err = entries.ValidatorBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params, postgresDataHandler.CachedEntries)
96100
case lib.EncoderTypeLockedStakeEntry:
97101
err = entries.LockedStakeBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params)
98102
case lib.EncoderTypeLockedBalanceEntry:
@@ -102,11 +106,11 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries
102106
case lib.EncoderTypeEpochEntry:
103107
err = entries.EpochEntryBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params)
104108
case lib.EncoderTypePKID:
105-
err = entries.PkidBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params)
109+
err = entries.PkidBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params, postgresDataHandler.CachedEntries)
106110
case lib.EncoderTypeGlobalParamsEntry:
107111
err = entries.GlobalParamsBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params)
108112
case lib.EncoderTypeBLSPublicKeyPKIDPairEntry:
109-
err = entries.BLSPublicKeyPKIDPairBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params)
113+
err = entries.BLSPublicKeyPKIDPairBatchOperation(batchedEntries, dbHandle, postgresDataHandler.Params, postgresDataHandler.CachedEntries)
110114
case lib.EncoderTypeBlockNode:
111115
err = entries.BlockNodeOperation(batchedEntries, dbHandle, postgresDataHandler.Params)
112116
}

handler/db_utils.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ const (
1717
MigrationTypePostHypersync MigrationType = 1
1818
)
1919

20+
const (
21+
EntryCacheSize uint = 1000000 // 1M entries
22+
)
23+
2024
// TODO: Make this a method on the PostgresDataHandler struct.
2125
func RunMigrations(db *bun.DB, reset bool, migrationType MigrationType) error {
2226
ctx := context.Background()

main.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/deso-protocol/postgres-data-handler/migrations/post_sync_migrations"
1111
"github.com/deso-protocol/state-consumer/consumer"
1212
"github.com/golang/glog"
13+
lru "github.com/hashicorp/golang-lru/v2"
1314
"github.com/spf13/viper"
1415
"github.com/uptrace/bun"
1516
"github.com/uptrace/bun/dialect/pgdialect"
@@ -70,6 +71,11 @@ func main() {
7071
}
7172
lib.GlobalDeSoParams = *params
7273

74+
cachedEntries, err := lru.New[string, []byte](int(handler.EntryCacheSize))
75+
if err != nil {
76+
glog.Fatalf("Error creating LRU cache: %v", err)
77+
}
78+
7379
// Initialize and run a state syncer consumer.
7480
stateSyncerConsumer := &consumer.StateSyncerConsumer{}
7581
err = stateSyncerConsumer.InitializeAndRun(
@@ -79,8 +85,9 @@ func main() {
7985
threadLimit,
8086
syncMempool,
8187
&handler.PostgresDataHandler{
82-
DB: db,
83-
Params: params,
88+
DB: db,
89+
Params: params,
90+
CachedEntries: cachedEntries,
8491
},
8592
)
8693
if err != nil {

0 commit comments

Comments
 (0)