Skip to content

Commit e6d828b

Browse files
authored
nits, improvements, cleanup after #1858 (#1863)
1 parent c429125 commit e6d828b

File tree

12 files changed

+172
-161
lines changed

12 files changed

+172
-161
lines changed

packages/client/lib/sync/fetcher/blockfetcher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ export class BlockFetcher extends BlockFetcherBase<Block[], Block> {
106106
blocks[0]?.header.number
107107
} last=${blocks[blocks.length - 1]?.header.number})`
108108
)
109-
this.config.events.emit(Event.SYNC_FETCHER_FETCHED, blocks.slice(0, num))
109+
this.config.events.emit(Event.SYNC_FETCHED_BLOCKS, blocks.slice(0, num))
110110
} catch (e: any) {
111111
this.debug(
112112
`Error storing fetcher results in blockchain (blocks num=${blocks.length} first=${

packages/client/lib/sync/fetcher/blockfetcherbase.ts

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,12 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
2828
> {
2929
protected chain: Chain
3030
/**
31-
* `first` is from where the fetcher pendancy starts apart from the jobs/tasks already
32-
* in the `in` queue.
31+
* Where the fetcher starts apart from the tasks already in the `in` queue
3332
*/
3433
first: BN
3534
/**
36-
* `count` are the number of items in fetcher pendancy starting from (and including)
37-
* `first`. `first + count - 1` gives the height fetcher is attempting to reach
35+
* Number of items in the fetcher starting from (and including) `first`.
36+
* `first + count - 1` gives the height fetcher is attempting to reach
3837
*/
3938
count: BN
4039

@@ -53,10 +52,10 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
5352
}
5453

5554
/**
56-
* Generate list of tasks to fetch, modifies `first`` and `count` to indicate any
57-
* remaning pendancy of the fetcher apart from jobs/tasks it pushes in the queue
55+
* Generate list of tasks to fetch. Modifies `first` and `count` to indicate
56+
* remaining items apart from the tasks it pushes in the queue
5857
*/
59-
tasks(first = this.first, count = this.count, maxTasks = Infinity): JobTask[] {
58+
tasks(first = this.first, count = this.count, maxTasks = this.config.maxFetcherJobs): JobTask[] {
6059
const max = this.config.maxPerRequest
6160
const tasks: JobTask[] = []
6261
let debugStr = `first=${first}`
@@ -72,15 +71,15 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
7271
tasks.push({ first: first.clone(), count: count.toNumber() })
7372
pushedCount.iadd(count)
7473
}
75-
debugStr = `${debugStr} count=${pushedCount}`
74+
debugStr += ` count=${pushedCount}`
7675
this.debug(`Created new tasks num=${tasks.length} ${debugStr}`)
7776
return tasks
7877
}
7978

8079
nextTasks(): void {
8180
if (this.in.length === 0 && this.count.gten(0)) {
82-
this.debug(`Fetcher has pendancy with first=${this.first} count=${this.count}`)
83-
const tasks = this.tasks(this.first, this.count, this.config.maxFetcherJobs)
81+
this.debug(`Fetcher pending with first=${this.first} count=${this.count}`)
82+
const tasks = this.tasks(this.first, this.count)
8483
for (const task of tasks) {
8584
this.enqueueTask(task)
8685
}
@@ -95,9 +94,9 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
9594
let first = this.first
9695
let last = this.first.add(this.count).subn(1)
9796

98-
// we have to loop and figure out because the jobs won't always be an monotonically
99-
// increasing order, some jobs could have refetch tasks enqueued. So better to find
100-
// the first and last examining each job
97+
// We have to loop because the jobs won't always be in increasing order.
98+
// Some jobs could have refetch tasks enqueued, so it is better to find
99+
// the first and last by examining each job.
101100
while (this.in.length > 0) {
102101
const job = this.in.remove()
103102
if (!job) break
@@ -111,7 +110,7 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
111110
}
112111
this.first = first
113112
this.count = last.sub(this.first).addn(1)
114-
// already removed jobs from the `in` heap, just pass to super for further cleanup
113+
// Already removed jobs from the `in` heap, just pass to super for further cleanup
115114
super.clear()
116115
}
117116

@@ -134,8 +133,8 @@ export abstract class BlockFetcherBase<JobResult, StorageItem> extends Fetcher<
134133
this.count.iadd(max.sub(last))
135134
updateHeightStr = `updated height=${max}`
136135
}
137-
// Re enqueue the numbers which are < `this.first` to refetch them else they will be
138-
// fetched in the future automatically on the jobs created by `nextTasks`
136+
// Re-enqueue the numbers which are less than `first` to refetch them,
137+
// else they will be fetched in the future with the jobs created by `nextTasks`.
139138
numberList = numberList.filter((num) => num.lte(this.first))
140139
const numBlocks = numberList.length
141140
let bulkRequest = true

packages/client/lib/sync/fetcher/fetcher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ export abstract class Fetcher<JobTask, JobResult, StorageItem> extends Readable
188188
state: 'idle',
189189
peer: null,
190190
}
191-
this.debug(`enqueueTask ${this.jobStr(job)}`)
191+
this.debug(`enqueueTask: ${this.jobStr(job)}`)
192192
this.in.insert(job)
193193
if (!this.running && autoRestart) {
194194
void this.fetch()

packages/client/lib/sync/fetcher/headerfetcher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ export class HeaderFetcher extends BlockFetcherBase<BlockHeaderResult, BlockHead
8888
headers[0]?.number
8989
} last=${headers[headers.length - 1]?.number})`
9090
)
91-
this.config.events.emit(Event.SYNC_FETCHER_FETCHED, headers.slice(0, num))
91+
this.config.events.emit(Event.SYNC_FETCHED_HEADERS, headers.slice(0, num))
9292
} catch (e: any) {
9393
this.debug(
9494
`Error storing fetcher results in blockchain (headers num=${headers.length} first=${

packages/client/lib/sync/fullsync.ts

Lines changed: 48 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { Event } from '../types'
66
import { Synchronizer, SynchronizerOptions } from './sync'
77
import { BlockFetcher } from './fetcher'
88
import { VMExecution } from '../execution'
9-
import type { Block, BlockHeader } from '@ethereumjs/block'
9+
import type { Block } from '@ethereumjs/block'
1010
import type { TxPool } from '../service/txpool'
1111

1212
interface FullSynchronizerOptions extends SynchronizerOptions {
@@ -39,7 +39,7 @@ export class FullSynchronizer extends Synchronizer {
3939
this.processBlocks = this.processBlocks.bind(this)
4040
this.stop = this.stop.bind(this)
4141

42-
this.config.events.on(Event.SYNC_FETCHER_FETCHED, this.processBlocks)
42+
this.config.events.on(Event.SYNC_FETCHED_BLOCKS, this.processBlocks)
4343
this.config.events.on(Event.SYNC_EXECUTION_VM_ERROR, this.stop)
4444

4545
void this.chain.update()
@@ -113,48 +113,50 @@ export class FullSynchronizer extends Synchronizer {
113113
}
114114

115115
/**
116-
* Sync all blocks and state from peer starting from current height.
116+
* Called from `sync()` to sync blocks and state from peer starting from current height.
117117
* @param peer remote peer to sync with
118-
* @return Resolves when sync completed
118+
* @returns a boolean if the setup was successful
119119
*/
120-
async syncWithPeer(peer?: Peer): Promise<BlockFetcher | null> {
121-
let latest
122-
if (peer && (latest = await this.latest(peer))) {
123-
const height = latest.number
124-
if (!this.config.syncTargetHeight || this.config.syncTargetHeight.lt(latest.number)) {
125-
this.config.syncTargetHeight = height
126-
this.config.logger.info(`New sync target height=${height} hash=${short(latest.hash())}`)
127-
}
120+
async syncWithPeer(peer?: Peer): Promise<boolean> {
121+
const latest = peer ? await this.latest(peer) : undefined
122+
if (!latest) return false
123+
124+
const height = latest.number
125+
if (!this.config.syncTargetHeight || this.config.syncTargetHeight.lt(latest.number)) {
126+
this.config.syncTargetHeight = height
127+
this.config.logger.info(`New sync target height=${height} hash=${short(latest.hash())}`)
128+
}
128129

129-
// Start fetcher from a safe distance behind because if the previous fetcher exited
130-
// due to a reorg, it would make sense to step back and refetch.
131-
const first = BN.max(
132-
this.chain.blocks.height.addn(1).subn(this.config.safeReorgDistance),
133-
new BN(1)
134-
)
135-
const count = height.sub(first).addn(1)
136-
if (count.gtn(0)) {
137-
if (!this.fetcher || this.fetcher.errored) {
138-
return new BlockFetcher({
139-
config: this.config,
140-
pool: this.pool,
141-
chain: this.chain,
142-
interval: this.interval,
143-
first,
144-
count,
145-
destroyWhenDone: false,
146-
})
147-
} else {
148-
const fetcherHeight = this.fetcher.first.add(this.fetcher.count).subn(1)
149-
if (height.gt(fetcherHeight)) this.fetcher.count.iadd(height.sub(fetcherHeight))
150-
this.config.logger.info(`peer=${peer} updated fetcher target to height=${height}`)
151-
}
152-
}
130+
// Start fetcher from a safe distance behind because if the previous fetcher exited
131+
// due to a reorg, it would make sense to step back and refetch.
132+
const first = BN.max(
133+
this.chain.blocks.height.addn(1).subn(this.config.safeReorgDistance),
134+
new BN(1)
135+
)
136+
const count = height.sub(first).addn(1)
137+
if (count.lten(0)) return false
138+
if (!this.fetcher || this.fetcher.errored) {
139+
this.fetcher = new BlockFetcher({
140+
config: this.config,
141+
pool: this.pool,
142+
chain: this.chain,
143+
interval: this.interval,
144+
first,
145+
count,
146+
destroyWhenDone: false,
147+
})
148+
} else {
149+
const fetcherHeight = this.fetcher.first.add(this.fetcher.count).subn(1)
150+
if (height.gt(fetcherHeight)) this.fetcher.count.iadd(height.sub(fetcherHeight))
151+
this.config.logger.info(`Updated fetcher target to height=${height} peer=${peer} `)
153152
}
154-
return null
153+
return true
155154
}
156155

157-
async processBlocks(blocks: Block[] | BlockHeader[]) {
156+
/**
157+
* Process blocks fetched from the fetcher.
158+
*/
159+
async processBlocks(blocks: Block[]) {
158160
if (this.config.chainCommon.gteHardfork(Hardfork.Merge)) {
159161
if (this.fetcher !== null) {
160162
// If we are beyond the merge block we should stop the fetcher
@@ -170,7 +172,6 @@ export class FullSynchronizer extends Synchronizer {
170172
return
171173
}
172174

173-
blocks = blocks as Block[]
174175
const first = blocks[0].header.number
175176
const last = blocks[blocks.length - 1].header.number
176177
const hash = short(blocks[0].hash())
@@ -305,38 +306,33 @@ export class FullSynchronizer extends Synchronizer {
305306
let min = new BN(-1)
306307
let newSyncHeight: [Buffer, BN] | undefined
307308
const blockNumberList: BN[] = []
308-
data.forEach((value) => {
309+
for (const value of data) {
309310
const blockNumber = value[1]
310311
blockNumberList.push(blockNumber)
311312
if (min.eqn(-1) || blockNumber.lt(min)) {
312313
min = blockNumber
313314
}
314315

315316
// Check if new sync target height can be set
316-
if (
317-
(!newSyncHeight &&
318-
(!this.config.syncTargetHeight || blockNumber.gt(this.config.syncTargetHeight))) ||
319-
(newSyncHeight && blockNumber.gt(newSyncHeight[1]))
320-
) {
321-
newSyncHeight = value
322-
}
323-
})
317+
if (newSyncHeight && blockNumber.lte(newSyncHeight[1])) continue
318+
if (this.config.syncTargetHeight && blockNumber.lte(this.config.syncTargetHeight)) continue
319+
newSyncHeight = value
320+
}
324321

325322
if (!newSyncHeight) return
326323
const [hash, height] = newSyncHeight
327324
this.config.syncTargetHeight = height
328325
this.config.logger.info(`New sync target height number=${height} hash=${short(hash)}`)
329-
// enqueue if we are close enough to chain head
326+
// Enqueue if we are close enough to chain head
330327
if (min.lt(this.chain.headers.height.addn(3000))) {
331328
this.fetcher.enqueueByNumberList(blockNumberList, min, height)
332329
}
333330
}
334331

335332
async stop(): Promise<boolean> {
336-
this.config.events.removeListener(Event.SYNC_FETCHER_FETCHED, this.processBlocks)
333+
this.config.events.removeListener(Event.SYNC_FETCHED_BLOCKS, this.processBlocks)
337334
this.config.events.removeListener(Event.SYNC_EXECUTION_VM_ERROR, this.stop)
338-
339-
return await super.stop()
335+
return super.stop()
340336
}
341337

342338
/**

0 commit comments

Comments
 (0)