Skip to content

Commit 2e85d1e

Browse files
feat & fix: fix data duplication issue, skip failed transaction (#8785)
* skip failed txn * fixed dup data issue
1 parent ec77e75 commit 2e85d1e

File tree

1 file changed

+46
-42
lines changed

1 file changed

+46
-42
lines changed

ingest/indexer/service/indexer_streaming_service.go

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -189,53 +189,57 @@ func (s *indexerStreamingService) publishTxn(ctx context.Context, req abci.Reque
189189

190190
// Looping through the transaction results, each result has a list of events to be looped through
191191
var includedEvents []domain.EventWrapper
192-
txResults := res.GetTxResults()
193-
for _, txResult := range txResults {
194-
events := txResult.GetEvents()
195-
// Iterate through the events in the transaction
196-
// Include these events only:
197-
// - token_swapped
198-
// - pool_joined
199-
// - pool_exited
200-
// - create_position
201-
// - withdraw_position
202-
for i, event := range events {
203-
clonedEvent := deepCloneEvent(&event)
204-
// Add the token liquidity to the event
205-
err := s.addTokenLiquidity(ctx, clonedEvent)
192+
193+
txnResult := res.TxResults[txnIndex]
194+
if txnResult.IsErr() {
195+
// Skip if the transaction is not successful, so that its corresponding events are not included in the publishing and not counted in by the dexscreener
196+
continue
197+
}
198+
events := txnResult.GetEvents()
199+
200+
// Iterate through the events in the transaction
201+
// Include these events only:
202+
// - token_swapped
203+
// - pool_joined
204+
// - pool_exited
205+
// - create_position
206+
// - withdraw_position
207+
for i, event := range events {
208+
clonedEvent := deepCloneEvent(&event)
209+
// Add the token liquidity to the event
210+
err := s.addTokenLiquidity(ctx, clonedEvent)
211+
if err != nil {
212+
s.logger.Error("Error adding token liquidity to event", "error", err)
213+
return err
214+
}
215+
err = s.adjustTokenInAmountBySpreadFactor(ctx, clonedEvent)
216+
if err != nil {
217+
s.logger.Error("Error adjusting amount by spread factor", "error", err)
218+
continue
219+
}
220+
eventType := clonedEvent.Type
221+
if eventType == gammtypes.TypeEvtTokenSwapped {
222+
// Set the spot price for the token swapped event in the event's attributes map
223+
err := s.setSpotPrice(ctx, clonedEvent)
206224
if err != nil {
207-
s.logger.Error("Error adding token liquidity to event", "error", err)
208-
return err
225+
s.logger.Error("Error setting spot price", "error", err)
226+
continue
209227
}
210-
err = s.adjustTokenInAmountBySpreadFactor(ctx, clonedEvent)
228+
}
229+
if eventType == gammtypes.TypeEvtTokenSwapped || eventType == gammtypes.TypeEvtPoolJoined || eventType == gammtypes.TypeEvtPoolExited || eventType == concentratedliquiditytypes.TypeEvtCreatePosition || eventType == concentratedliquiditytypes.TypeEvtWithdrawPosition {
230+
includedEvents = append(includedEvents, domain.EventWrapper{Index: i, Event: *clonedEvent})
231+
}
232+
// Track the newly created pool ID
233+
// IMPORTANT NOTE:
234+
// 1. Using event attributes in a transaction, ONLY pool ID of the newly created pool is available and being tracked by the underlying pool tracker.
235+
// 2. For the other pool metadata of the newly created pool, such as denoms and fees, they are available and tracked thru OnWrite listeners in the common/writelistener package.
236+
// 3. See: block_updates_indexer_block_process_strategy.go::publishCreatedPools for more details.
237+
if eventType == poolmanagertypes.TypeEvtPoolCreated {
238+
err := s.trackCreatedPoolID(event, sdkCtx.BlockHeight(), sdkCtx.BlockTime().UTC(), txHash)
211239
if err != nil {
212-
s.logger.Error("Error adjusting amount by spread factor", "error", err)
240+
s.logger.Error("Error tracking newly created pool ID %v. event skipped.", err)
213241
continue
214242
}
215-
eventType := clonedEvent.Type
216-
if eventType == gammtypes.TypeEvtTokenSwapped {
217-
// Set the spot price for the token swapped event in the event's attributes map
218-
err := s.setSpotPrice(ctx, clonedEvent)
219-
if err != nil {
220-
s.logger.Error("Error setting spot price", "error", err)
221-
continue
222-
}
223-
}
224-
if eventType == gammtypes.TypeEvtTokenSwapped || eventType == gammtypes.TypeEvtPoolJoined || eventType == gammtypes.TypeEvtPoolExited || eventType == concentratedliquiditytypes.TypeEvtCreatePosition || eventType == concentratedliquiditytypes.TypeEvtWithdrawPosition {
225-
includedEvents = append(includedEvents, domain.EventWrapper{Index: i, Event: *clonedEvent})
226-
}
227-
// Track the newly created pool ID
228-
// IMPORTANT NOTE:
229-
// 1. Using event attributes in a transaction, ONLY pool ID of the newly created pool is available and being tracked by the underlying pool tracker.
230-
// 2. For the other pool metadata of the newly created pool, such as denoms and fees, they are available and tracked thru OnWrite listeners in the common/writelistener package.
231-
// 3. See: block_updates_indexer_block_process_strategy.go::publishCreatedPools for more details.
232-
if eventType == poolmanagertypes.TypeEvtPoolCreated {
233-
err := s.trackCreatedPoolID(event, sdkCtx.BlockHeight(), sdkCtx.BlockTime().UTC(), txHash)
234-
if err != nil {
235-
s.logger.Error("Error tracking newly created pool ID %v. event skipped.", err)
236-
continue
237-
}
238-
}
239243
}
240244
}
241245
// Publish the transaction

0 commit comments

Comments
 (0)