From f2368d90711d49df1382acb381745c345c90a140 Mon Sep 17 00:00:00 2001 From: rkamysz Date: Tue, 7 Nov 2023 12:29:19 +0100 Subject: [PATCH] Split reader (#9) * split reader, remove test block range * update * update deploy script * change unprocessed block queue * clear chache * remove BlockState types * increase list limit * update logs, fix batch to single switch * 0.0.12 --- .env.template | 2 +- package.json | 8 +- scripts/deploy.sh | 7 +- .../__tests__/bootstrap.utils.unit.test.ts | 55 ----- src/bootstrap/bootstrap.command.ts | 2 +- src/bootstrap/bootstrap.utils.ts | 45 +--- src/bootstrap/start-bootstrap.ts | 24 +- src/common/abis/abis.repository-impl.ts | 10 +- src/common/block-state/block-state.ts | 58 +++-- src/common/block-state/block-state.types.ts | 2 +- src/common/common.enums.ts | 1 - src/common/common.errors.ts | 4 +- .../unprocessed-block-queue.ts | 153 +++++++------ .../unprocessed-block-queue.types.ts | 6 + src/config/index.ts | 2 +- src/filter/filter.command.ts | 2 +- src/reader/index.ts | 7 +- src/reader/read-blocks-in-default-mode.ts | 194 ++++++++++++++++ src/reader/read-blocks-in-replay-mode.ts | 151 +++++++++++++ src/reader/reader.command.ts | 2 +- src/reader/reader.dependencies.ts | 16 +- src/reader/reader.errors.ts | 5 + src/reader/reader.ts | 100 --------- src/reader/reader.worker-loader.ts | 41 +--- src/reader/reader.worker-message.ts | 42 ++++ src/reader/reader.worker.ts | 211 ++++++++---------- src/reader/start-reader.ts | 79 ++----- tutorials/config-vars.md | 3 +- yarn.lock | 26 +-- 29 files changed, 718 insertions(+), 540 deletions(-) create mode 100644 src/reader/read-blocks-in-default-mode.ts create mode 100644 src/reader/read-blocks-in-replay-mode.ts create mode 100644 src/reader/reader.errors.ts delete mode 100644 src/reader/reader.ts create mode 100644 src/reader/reader.worker-message.ts diff --git a/.env.template b/.env.template index 1ddc78d..67483da 100644 --- a/.env.template +++ b/.env.template @@ -22,7 +22,7 @@ END_BLOCK=238581000 START_FROM_HEAD=0 MODE='default' MAX_BLOCK_NUMBER=0xffffffff -UNPROCESSED_BLOCK_QUEUE_MAX_BYTES_SIZE=256000000 +UNPROCESSED_BLOCK_QUEUE_MAX_BYTES_SIZE=1024000000 UNPROCESSED_BLOCK_QUEUE_SIZE_CHECK_INTERVAL=2000 UNPROCESSED_BLOCK_QUEUE_BATCH_SIZE=100 BROADCAST_PORT=9000 diff --git a/package.json b/package.json index 82505a2..f3eda01 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@alien-worlds/aw-history", - "version": "0.0.9", + "version": "0.0.12", "description": "", "packageManager": "yarn@3.2.3", "main": "build/index.js", @@ -38,9 +38,9 @@ "typescript": "^4.8.2" }, "dependencies": { - "@alien-worlds/aw-broadcast": "^0.0.9", - "@alien-worlds/aw-core": "^0.0.15", - "@alien-worlds/aw-workers": "^0.0.4", + "@alien-worlds/aw-broadcast": "^0.0.12", + "@alien-worlds/aw-core": "^0.0.21", + "@alien-worlds/aw-workers": "^0.0.5", "async": "^3.2.4", "commander": "^10.0.1", "crypto": "^1.0.1", diff --git a/scripts/deploy.sh b/scripts/deploy.sh index d1a118b..9dc0496 100755 --- a/scripts/deploy.sh +++ b/scripts/deploy.sh @@ -6,7 +6,7 @@ error_exit() exit 1 } -# major | minor | patch +# major | minor | patch | pre TYPE=patch if [[ $1 =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]] @@ -15,11 +15,14 @@ if [[ $1 =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]] elif [[ $1 == "major" ]] || [[ $1 == "minor" ]] || [[ $1 == "patch" ]] then TYPE=$1 +elif [[ $1 == "pre" ]] + then + TYPE=prepatch elif [[ $1 == "" ]] then TYPE=patch else - printf "Unknown update type \"%s\".\n- Please use \"major\" | \"minor\" | \"patch\"\n" "$1" + printf "Unknown update type \"%s\".\n- Please use \"major\" | \"minor\" | \"patch\" | \"pre\"\n" "$1" exit 1; fi diff --git a/src/bootstrap/__tests__/bootstrap.utils.unit.test.ts b/src/bootstrap/__tests__/bootstrap.utils.unit.test.ts index ed94e33..0908568 100644 --- a/src/bootstrap/__tests__/bootstrap.utils.unit.test.ts +++ b/src/bootstrap/__tests__/bootstrap.utils.unit.test.ts @@ -2,7 +2,6 @@ import { Result } from '@alien-worlds/aw-core'; import { createDefaultModeBlockRange, createReplayModeBlockRange, - createTestModeBlockRange, } from '../bootstrap.utils'; import { Mode } from '../../common'; import { @@ -78,60 +77,6 @@ describe('createDefaultModeBlockRange', () => { }); }); -describe('createTestModeBlockRange', () => { - const originalLog = console.log; - const blockchain = { - getLastIrreversibleBlockNumber: jest.fn().mockResolvedValue(Result.withContent(100n)), - getHeadBlockNumber: jest.fn().mockResolvedValue(Result.withContent(100n)), - } as any; - beforeEach(() => { - jest.clearAllMocks(); - console.log = jest.fn(); - }); - - afterEach(() => { - console.log = originalLog; - }); - - it('should create a block range in test mode when startBlock is not a bigint', async () => { - const config = { - blockchain: {}, - startBlock: null, - mode: Mode.Test, - scanner: { scanKey: 'scanKey' }, - startFromHead: true, - } as any; - - const result = await createTestModeBlockRange(blockchain, config); - - expect(result).toEqual({ - startBlock: 99n, - endBlock: 100n, - mode: Mode.Test, - scanKey: 'scanKey', - }); - }); - - it('should create a block range in test mode when startBlock is a bigint', async () => { - const config = { - blockchain: {}, - startBlock: 50n, - mode: Mode.Test, - scanner: { scanKey: 'scanKey' }, - startFromHead: true, - } as any; - - const result = await createTestModeBlockRange(blockchain, config); - - expect(result).toEqual({ - startBlock: 50n, - endBlock: 51n, - mode: Mode.Test, - scanKey: 'scanKey', - }); - }); -}); - describe('createReplayModeBlockRange', () => { const originalLog = console.log; const blockchain = { diff --git a/src/bootstrap/bootstrap.command.ts b/src/bootstrap/bootstrap.command.ts index 13c2211..147133d 100644 --- a/src/bootstrap/bootstrap.command.ts +++ b/src/bootstrap/bootstrap.command.ts @@ -6,5 +6,5 @@ bootstrapCommand .version('1.0', '-v, --version') .option('-k, --scan-key ', 'Scan key') .option('-s, --start-block ', 'Start at this block') - .option('-m, --mode ', 'Mode (default/replay/test)') + .option('-m, --mode ', 'Mode (default/replay)') .option('-e, --end-block ', 'End block (exclusive)'); diff --git a/src/bootstrap/bootstrap.utils.ts b/src/bootstrap/bootstrap.utils.ts index a590ddc..21f9ba1 100644 --- a/src/bootstrap/bootstrap.utils.ts +++ b/src/bootstrap/bootstrap.utils.ts @@ -30,12 +30,9 @@ export const createBlockRangeTaskInput = async ( } else if (mode === Mode.Replay) { // return createReplayModeBlockRange(scanner, blockchain, config); - } else if (mode === Mode.Test) { - // - return createTestModeBlockRange(blockchain, config); } else { // - throw new UnknownModeError(mode); + throw new UnknownModeError(mode, [Mode.Default, Mode.Replay]); } }; @@ -117,41 +114,11 @@ export const createDefaultModeBlockRange = async ( throw new StartBlockHigherThanEndBlockError(lowEdge, highEdge); } - return { startBlock: lowEdge, endBlock: highEdge, mode, scanKey }; -}; - -/** - * Creates a block range in test mode. - * - * @async - * @param {BlockchainService} blockchain - The blockchain service. - * @param {BootstrapConfig} config - The bootstrap configuration. - * @returns {Promise} The block range data. - */ -export const createTestModeBlockRange = async ( - blockchain: BlockchainService, - config: BootstrapConfig -): Promise => { - const { - startBlock, - mode, - scanner: { scanKey }, - startFromHead, - } = config; - - const { content: lastIrreversibleBlock } = - await blockchain.getLastIrreversibleBlockNumber(); - const { content: headBlock } = await blockchain.getHeadBlockNumber(); - - let highEdge: bigint; - let lowEdge: bigint; - - if (typeof startBlock !== 'bigint') { - highEdge = startFromHead ? headBlock : lastIrreversibleBlock; - lowEdge = highEdge - 1n; - } else { - lowEdge = startBlock; - highEdge = startBlock + 1n; + if (highEdge === currentBlockNumber + 1n) { + log( + ` The current state of the block indicates that the block range up to ${highEdge.toString()} has been read. If this is an error, check the database and reset the process.` + ); + return null; } return { startBlock: lowEdge, endBlock: highEdge, mode, scanKey }; diff --git a/src/bootstrap/start-bootstrap.ts b/src/bootstrap/start-bootstrap.ts index 7292639..55e3bf7 100644 --- a/src/bootstrap/start-bootstrap.ts +++ b/src/bootstrap/start-bootstrap.ts @@ -5,7 +5,6 @@ import { import { createDefaultModeBlockRange, createReplayModeBlockRange, - createTestModeBlockRange, } from './bootstrap.utils'; import { BootstrapCommandOptions } from './bootstrap.types'; import { NoAbisError } from './bootstrap.errors'; @@ -80,6 +79,14 @@ export const bootstrap = async ( throw new NoAbisError(); } + log(` * Initialize block state ... [starting]`); + const initStateResult = await blockState.initState(); + + if (initStateResult.isFailure) { + throw initStateResult.failure.error; + } + log(` * Initialize block state ... [done]`); + if (config.mode === Mode.Replay) { blockRange = await createReplayModeBlockRange(scanner, blockchain, config); } @@ -90,16 +97,11 @@ export const bootstrap = async ( if (message.name === InternalBroadcastMessageName.DefaultModeReaderReady) { if (config.mode === Mode.Default) { blockRange = await createDefaultModeBlockRange(blockState, blockchain, config); - broadcastClient.sendMessage( - ReaderBroadcastMessage.newDefaultModeTask(blockRange) - ); - } - - if (config.mode === Mode.Test) { - blockRange = await createTestModeBlockRange(blockchain, config); - broadcastClient.sendMessage( - ReaderBroadcastMessage.newDefaultModeTask(blockRange) - ); + if (blockRange) { + broadcastClient.sendMessage( + ReaderBroadcastMessage.newDefaultModeTask(blockRange) + ); + } } } else if (message.name === InternalBroadcastMessageName.ReplayModeReaderReady) { broadcastClient.sendMessage(ReaderBroadcastMessage.newReplayModeTask(blockRange)); diff --git a/src/common/abis/abis.repository-impl.ts b/src/common/abis/abis.repository-impl.ts index b5bb511..659f735 100644 --- a/src/common/abis/abis.repository-impl.ts +++ b/src/common/abis/abis.repository-impl.ts @@ -58,11 +58,11 @@ export class AbisRepositoryImpl const where = Where.bind(); if (startBlock && endBlock) { - where.props().blockNumber.isGte(startBlock).isLte(endBlock); + where.prototype().blockNumber.isGte(startBlock).isLte(endBlock); } if (contracts) { - where.props().contract.isIn(contracts); + where.prototype().contract.isIn(contracts); } return this.find(FindParams.create({ where })); @@ -89,7 +89,7 @@ export class AbisRepositoryImpl } const where = Where.bind(); - where.props().blockNumber.isLte(blockNumber).props().contract.isEq(contract); + where.prototype().blockNumber.isLte(blockNumber).prototype().contract.isEq(contract); const { content, failure } = await this.find( FindParams.create({ where, limit: 1, sort: { block_number: -1 } }) ); @@ -135,11 +135,11 @@ export class AbisRepositoryImpl const where = Where.bind(); if (typeof startBlock === 'bigint') { - where.props().blockNumber.isGte(startBlock); + where.prototype().blockNumber.isGte(startBlock); } if (typeof endBlock === 'bigint') { - where.props().blockNumber.isLte(endBlock); + where.prototype().blockNumber.isLte(endBlock); } return this.count(CountParams.create({ where })); diff --git a/src/common/block-state/block-state.ts b/src/common/block-state/block-state.ts index a27ae7a..30cefbc 100644 --- a/src/common/block-state/block-state.ts +++ b/src/common/block-state/block-state.ts @@ -7,55 +7,69 @@ import { RepositoryImpl, Result, } from '@alien-worlds/aw-core'; -import { BlockStateModel } from './block-state.types'; +import { BlockStateEntity } from './block-state.types'; /** * A class representing a block state. */ -export class BlockState extends RepositoryImpl { +export class BlockState extends RepositoryImpl { /** * Creates an instance of the BlockState class. * - * @param {DataSource} source - The data source. - * @param {BlockStateMongoMapper} mapper - The data mapper. + * @param {DataSource} source - The data source. + * @param {Mapper} mapper - The data mapper. * @param {QueryBuilders} queryBuilders - The query builders. * @param {QueryBuilder} updateBlockNumberQueryBuilder - The query builder to update block number. */ constructor( source: DataSource, - mapper: Mapper, + mapper: Mapper, queryBuilders: QueryBuilders, private updateBlockNumberQueryBuilder: QueryBuilder ) { super(source, mapper, queryBuilders); } + /** + * Initialize state if not already set. + */ + public async initState(): Promise> { + try { + const { content: states } = await this.find(); + + if (states.length === 0) { + await this.add([ + { + lastModifiedTimestamp: new Date(), + actions: [], + tables: [], + blockNumber: 0n, + }, + ]); + } + return Result.withoutContent(); + } catch (error) { + return Result.withFailure(Failure.fromError(error)); + } + } + /** * Fetches the current state of the data source. * - * @returns {Promise>} - The result of the operation. + * @returns {Promise>} - The result of the operation. */ - public async getState(): Promise> { + public async getState(): Promise> { try { const { content: states } = await this.find(); - if (states) { - const state = states[0]; - const { lastModifiedTimestamp, actions, tables, blockNumber } = state; - - return Result.withContent({ - lastModifiedTimestamp: lastModifiedTimestamp || new Date(), - actions: actions || [], - tables: tables || [], - blockNumber: blockNumber || 0n, - }); - } + const state = states[0]; + const { lastModifiedTimestamp, actions, tables, blockNumber } = state; return Result.withContent({ - lastModifiedTimestamp: new Date(), - actions: [], - tables: [], - blockNumber: 0n, + lastModifiedTimestamp, + actions, + tables, + blockNumber, }); } catch (error) { return Result.withFailure(Failure.fromError(error)); diff --git a/src/common/block-state/block-state.types.ts b/src/common/block-state/block-state.types.ts index b7c8e9e..9a06e06 100644 --- a/src/common/block-state/block-state.types.ts +++ b/src/common/block-state/block-state.types.ts @@ -1,4 +1,4 @@ -export type BlockStateModel = { +export type BlockStateEntity = { lastModifiedTimestamp: Date; blockNumber: bigint; actions: string[]; diff --git a/src/common/common.enums.ts b/src/common/common.enums.ts index af0dcbb..0d2d8b6 100644 --- a/src/common/common.enums.ts +++ b/src/common/common.enums.ts @@ -1,5 +1,4 @@ export enum Mode { Default = 'default', Replay = 'replay', - Test = 'test', } diff --git a/src/common/common.errors.ts b/src/common/common.errors.ts index a10bca9..ba9ee80 100644 --- a/src/common/common.errors.ts +++ b/src/common/common.errors.ts @@ -1,5 +1,5 @@ export class UnknownModeError extends Error { - constructor(mode: string) { - super(`Unknown mode "${mode}"`); + constructor(mode: string, modes: string[]) { + super(`Unknown mode "${mode}". Use: ${modes.join(', ')}`); } } diff --git a/src/common/unprocessed-block-queue/unprocessed-block-queue.ts b/src/common/unprocessed-block-queue/unprocessed-block-queue.ts index e74390e..d2c1b0a 100644 --- a/src/common/unprocessed-block-queue/unprocessed-block-queue.ts +++ b/src/common/unprocessed-block-queue/unprocessed-block-queue.ts @@ -1,3 +1,4 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ import { Block, DataSourceError, @@ -7,13 +8,10 @@ import { parseToBigInt, Result, } from '@alien-worlds/aw-core'; -import { - BlockNotFoundError, - DuplicateBlocksError, - UnprocessedBlocksOverloadError, -} from './unprocessed-block-queue.errors'; +import { BlockNotFoundError } from './unprocessed-block-queue.errors'; import { UnprocessedBlockSource } from './unprocessed-block-queue.source'; import { BlockModel } from '../types/block.types'; +import { InsertionResult } from './unprocessed-block-queue.types'; export abstract class UnprocessedBlockQueueReader { public abstract next(): Promise>; @@ -25,7 +23,7 @@ export class UnprocessedBlockQueue protected cache: Block[] = []; protected overloadHandler: (size: number) => void; protected beforeSendBatchHandler: () => void; - protected afterSendBatchHandler: () => void; + protected afterSendBatchHandler: (successful: boolean) => void; constructor( protected collection: UnprocessedBlockSource, @@ -35,31 +33,49 @@ export class UnprocessedBlockQueue protected fastLaneBatchSize: number ) {} - private async sendBatch() { - const addedBlockNumbers = []; - this.beforeSendBatchHandler(); - const documnets = this.cache.map(block => this.mapper.fromEntity(block)); - const result = await this.collection.insert(documnets); - result.forEach(model => { - addedBlockNumbers.push(parseToBigInt((model as BlockModel).this_block.block_num)); - }); - this.cache = []; - - if (this.maxBytesSize > 0 && this.overloadHandler) { - const sorted = addedBlockNumbers.sort(); - const min = sorted[0]; - const max = sorted.reverse()[0]; + private async sendBatch(): Promise> { + const result: InsertionResult = { + insertedBlocks: [], + failedBlocks: [], + queueOverloadSize: 0, + }; - const currentSize = await this.collection.bytesSize(); - if (currentSize >= this.maxBytesSize) { - this.overloadHandler(currentSize); - throw new UnprocessedBlocksOverloadError(min, max); + try { + const documents = this.cache.map(block => this.mapper.fromEntity(block)); + this.cache = []; + const insertedModels = await this.collection.insert({ + documents, + options: { ordered: false }, + }); + insertedModels.forEach(model => { + result.insertedBlocks.push( + parseToBigInt((model as BlockModel).this_block.block_num) + ); + }); + } catch (error) { + const { additionalData, isDuplicateError } = error as DataSourceError<{ + failedDocuments: BlockModel[]; + }>; + if (isDuplicateError === false) { + if ( + Array.isArray(additionalData.failedDocuments) && + additionalData.failedDocuments.length > 0 + ) { + result.failedBlocks = additionalData.failedDocuments.map(model => + parseToBigInt(model.this_block.block_num) + ); + } else { + return Result.withFailure(error); + } } } - this.afterSendBatchHandler(); + if (this.maxBytesSize > 0) { + const currentSize = await this.collection.bytesSize(); + result.queueOverloadSize = currentSize - this.maxBytesSize; + } - return addedBlockNumbers; + return Result.withContent(result); } public async getBytesSize(): Promise> { @@ -74,49 +90,32 @@ export class UnprocessedBlockQueue public async add( block: Block, options?: { isFastLane?: boolean; isLast?: boolean; predictedRangeSize?: number } - ): Promise> { + ): Promise> { + let result: Result = Result.withoutContent(); const { isFastLane, isLast, predictedRangeSize } = options || {}; - try { - let addedBlockNumbers: bigint[] = []; - const currentBatchSize = isFastLane - ? predictedRangeSize < this.fastLaneBatchSize - ? predictedRangeSize - : this.fastLaneBatchSize - : predictedRangeSize < this.batchSize - ? predictedRangeSize - : this.batchSize; - - if (this.cache.length < currentBatchSize) { - this.cache.push(block); - } - if (this.cache.length === currentBatchSize || isLast) { - addedBlockNumbers = await this.sendBatch(); - } + const currentBatchSize = isFastLane + ? predictedRangeSize < this.fastLaneBatchSize + ? predictedRangeSize + : this.fastLaneBatchSize + : predictedRangeSize < this.batchSize + ? predictedRangeSize + : this.batchSize; - return Result.withContent(addedBlockNumbers); - } catch (error) { - // it is important to clear the cache in case of errors - this.cache = []; + if (this.cache.length < currentBatchSize) { + this.cache.push(block); + } - if (error instanceof DataSourceError && error.isDuplicateError) { - this.afterSendBatchHandler(); - return Result.withFailure(Failure.fromError(new DuplicateBlocksError())); - } - return Result.withFailure(Failure.fromError(error)); + if (this.cache.length >= currentBatchSize || isLast) { + result = await this.sendBatch(); } + return result; } public async next(): Promise> { try { const document = await this.collection.next(); if (document) { - if (this.maxBytesSize > -1 && this.afterSendBatchHandler) { - if ((await this.collection.count()) === 0 && this.afterSendBatchHandler) { - this.afterSendBatchHandler(); - } - } - return Result.withContent(this.mapper.toEntity(document)); } return Result.withFailure(Failure.fromError(new BlockNotFoundError())); @@ -141,15 +140,37 @@ export class UnprocessedBlockQueue } } - public afterSendBatch(handler: () => void): void { - this.afterSendBatchHandler = handler; - } + /** + * Waits for the queue to clear up to a maximum number of tries. Checks the queue size + * at the specified timeout interval. If the size reaches zero or the maximum number of + * tries is exceeded, the promise is resolved. + * + * @param {number} [timeoutMS=1000] - The time interval (in milliseconds) at which the queue size is checked. + * @param {number} [maxTries=10] - The maximum number of times the queue size should be checked before giving up. + * + * @returns {Promise} A promise that resolves when the queue is cleared or the maximum tries are reached. + */ + public async waitForQueueToClear(timeoutMS = 1000, maxTries = 10) { + const { maxBytesSize } = this; + return new Promise(resolve => { + let tries = 0; + const interval = setInterval(async () => { + if (maxTries && tries >= maxTries) { + log(`Max tries (${maxTries}) reached without clearing the collection.`); + clearInterval(interval); + resolve(null); + return; + } - public beforeSendBatch(handler: () => void): void { - this.beforeSendBatchHandler = handler; - } + const { content: currentSize } = await this.getBytesSize(); - public onOverload(handler: (size: number) => void): void { - this.overloadHandler = handler; + if (currentSize === 0) { + log(`Unprocessed blocks collection cleared, blockchain reading resumed.`); + clearInterval(interval); + resolve(null); + } + tries++; + }, timeoutMS); + }); } } diff --git a/src/common/unprocessed-block-queue/unprocessed-block-queue.types.ts b/src/common/unprocessed-block-queue/unprocessed-block-queue.types.ts index 4d6c6e8..2033de9 100644 --- a/src/common/unprocessed-block-queue/unprocessed-block-queue.types.ts +++ b/src/common/unprocessed-block-queue/unprocessed-block-queue.types.ts @@ -5,3 +5,9 @@ export type UnprocessedBlockQueueConfig = { sizeCheckInterval?: number; [key: string]: unknown; }; + +export type InsertionResult = { + insertedBlocks: bigint[]; + failedBlocks: bigint[]; + queueOverloadSize: number; +}; diff --git a/src/config/index.ts b/src/config/index.ts index 5bfce4c..345abe7 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -85,7 +85,7 @@ export const buildProcessorTaskQueueConfig = ( export const buildUnprocessedBlockQueueConfig = ( vars: ConfigVars ): UnprocessedBlockQueueConfig => ({ - maxBytesSize: vars.getNumberEnv('UNPROCESSED_BLOCK_QUEUE_MAX_BYTES_SIZE') || 256000000, + maxBytesSize: vars.getNumberEnv('UNPROCESSED_BLOCK_QUEUE_MAX_BYTES_SIZE') || 1024000000, sizeCheckInterval: vars.getNumberEnv('UNPROCESSED_BLOCK_QUEUE_SIZE_CHECK_INTERVAL') || 2000, batchSize: vars.getNumberEnv('UNPROCESSED_BLOCK_QUEUE_BATCH_SIZE') || 100, diff --git a/src/filter/filter.command.ts b/src/filter/filter.command.ts index c28bbb1..e6f5881 100644 --- a/src/filter/filter.command.ts +++ b/src/filter/filter.command.ts @@ -5,6 +5,6 @@ export const filterCommand = new Command(); filterCommand .version('1.0', '-v, --version') .option('-k, --scan-key ', 'Scan key') - .option('-m, --mode ', 'Mode (default/replay/test)') + .option('-m, --mode ', 'Mode (default/replay)') .option('-t, --threads ', 'Number of threads') .parse(process.argv); diff --git a/src/reader/index.ts b/src/reader/index.ts index cc4e5fd..8469e92 100644 --- a/src/reader/index.ts +++ b/src/reader/index.ts @@ -1,10 +1,13 @@ -export * from './reader'; -export * from './reader.config'; +export * from './read-blocks-in-default-mode'; +export * from './read-blocks-in-replay-mode'; export * from './reader.command'; +export * from './reader.config'; export * from './reader.consts'; export * from './reader.dependencies'; +export * from './reader.errors'; export * from './reader.types'; export * from './reader.worker'; export * from './reader.worker-loader'; export * from './reader.worker-loader.dependencies'; +export * from './reader.worker-message'; export * from './start-reader'; diff --git a/src/reader/read-blocks-in-default-mode.ts b/src/reader/read-blocks-in-default-mode.ts new file mode 100644 index 0000000..11d05b5 --- /dev/null +++ b/src/reader/read-blocks-in-default-mode.ts @@ -0,0 +1,194 @@ +import { Block, BlockReader, log, parseToBigInt } from '@alien-worlds/aw-core'; +import { + InternalBroadcastChannel, + InternalBroadcastMessageName, + ReaderBroadcastMessage, +} from '../broadcast'; +import { ReaderDependencies } from './reader.dependencies'; +import { BroadcastMessage } from '@alien-worlds/aw-broadcast'; +import { ReadTaskData } from './reader.types'; +import { ReaderConfig } from './reader.config'; +import { BlockState, UnprocessedBlockQueue } from '../common'; + +/** + * Updates the block state based on the highest block in the queue. + * + * @async + * @param {UnprocessedBlockQueue} blockQueue - The block queue. + * @param {BlockState} blockState - The block state. + * @returns {Promise} + */ +export const updateBlockState = async ( + blockQueue: UnprocessedBlockQueue, + blockState: BlockState +): Promise => { + const { content: maxBlock } = await blockQueue.getMax(); + if (maxBlock) { + const { failure } = await blockState.updateBlockNumber( + maxBlock.thisBlock.blockNumber + ); + if (failure) { + log('Something went wrong, the block state was not updated.'); + } + } else { + log( + 'Something went wrong, the block with the highest number was not found/received.' + ); + } +}; + +/** + * Logs the reading progress of block numbers. + * + * @param {bigint[]} blockNumbers - Array of block numbers to log. + */ +export const logReadingProgress = (blockNumbers: bigint[], blocksDiff?: bigint) => { + if (blocksDiff && blockNumbers.length === 1) { + const blockNumber = blockNumbers[0]; + log( + `The block range (${( + blockNumber - blocksDiff + ).toString()}-${blockNumber.toString()}) has been read.` + ); + } else { + const sorted = blockNumbers.sort(); + const min = sorted[0]; + const max = sorted.reverse()[0]; + log(`The block range (${min.toString()}-${max.toString()}) has been read.`); + } +}; + +/** + * Handles the block when received. This involves adding it to the queue, + * logging progress, updating block state, and handling any errors that occur. + * + * @async + * @param {Block} block - The block received. + * @param {ReadTaskData} task - The task data for reading blocks. + * @param {UnprocessedBlockQueue} blockQueue - The block queue. + * @param {BlockState} blockState - The block state. + * @param {number} maxBytesSize - The max byte size for the unprocessed block collection. + * @returns {Promise} + */ +export const handleReceivedBlock = async ( + block: Block, + task: ReadTaskData, + blockQueue: UnprocessedBlockQueue, + blockState: BlockState, + blockReader: BlockReader, + maxBytesSize, + batchSize +) => { + const { startBlock, endBlock } = task; + const isLast = endBlock === block.thisBlock.blockNumber; + const isFastLane = block.thisBlock.blockNumber >= block.lastIrreversible.blockNumber; + const blocksDiff = batchSize > 1 ? parseToBigInt(batchSize) : 100n; + blockReader.pause(); + + const { content: insertionResult, failure } = await blockQueue.add(block, { + isFastLane, + isLast, + predictedRangeSize: Number(endBlock - startBlock), + }); + + if (failure) { + log(failure.error); + } else if (insertionResult) { + if (insertionResult.insertedBlocks.length > 0) { + await updateBlockState(blockQueue, blockState); + + if ( + insertionResult.insertedBlocks.length === 1 && + (block.thisBlock.blockNumber - task.startBlock) % blocksDiff === 0n + ) { + logReadingProgress(insertionResult.insertedBlocks, blocksDiff); + } else if (insertionResult.insertedBlocks.length > 1) { + logReadingProgress(insertionResult.insertedBlocks); + } + } + if (insertionResult.queueOverloadSize > 0) { + log( + `The size limit ${maxBytesSize} of the unprocessed blocks collection has been exceeded by ${insertionResult.queueOverloadSize}. Blockchain reading suspended until the collection is cleared.` + ); + await blockQueue.waitForQueueToClear(1000, 10); + } + } + + blockReader.resume(); +}; + +/** + * Reads blocks in default mode. The function sets up the block reader, + * listens for messages, and manages block reading tasks. + * + * @param {ReaderConfig} config - Configuration for the reader. + * @param {ReaderDependencies} dependencies - Dependencies required by the reader. + */ +export const readBlocksInDefaultMode = ( + config: ReaderConfig, + dependencies: ReaderDependencies +) => { + let task: ReadTaskData; + const { broadcastClient, blockReader, unprocessedBlockQueue, blockState } = + dependencies; + const { + maxBlockNumber, + blockReader: { shouldFetchDeltas, shouldFetchTraces, shouldFetchBlock }, + unprocessedBlockQueue: { maxBytesSize, batchSize }, + } = config; + const channel = InternalBroadcastChannel.DefaultModeReader; + const readyMessage = ReaderBroadcastMessage.defaultModeReady(); + + log(`Reader started in "listening" mode`); + broadcastClient.onMessage(channel, async (message: BroadcastMessage) => { + const { data, name } = message; + task = data; + if (name === InternalBroadcastMessageName.ReaderTask) { + blockReader.readBlocks( + task.startBlock, + task.endBlock || parseToBigInt(maxBlockNumber || 0xffffffff), + { + shouldFetchBlock, + shouldFetchDeltas, + shouldFetchTraces, + } + ); + } + }); + + broadcastClient.connect(); + + blockReader.onConnected(async () => { + broadcastClient.sendMessage(readyMessage); + }); + + blockReader.onDisconnected(async () => { + if (config.blockReader.autoReconnect === false) { + blockReader.connect(); + } + }); + + blockReader.onReceivedBlock(async block => + handleReceivedBlock( + block, + task, + unprocessedBlockQueue, + blockState, + blockReader, + maxBytesSize, + batchSize + ) + ); + + blockReader.onError(error => { + log(error); + }); + + blockReader.onComplete(async () => { + log(`The block range (${task?.startBlock}-${task?.endBlock}) has been read.`); + }); + + blockReader.connect(); + + log(`Reader ... [ready]`); +}; diff --git a/src/reader/read-blocks-in-replay-mode.ts b/src/reader/read-blocks-in-replay-mode.ts new file mode 100644 index 0000000..9cbe850 --- /dev/null +++ b/src/reader/read-blocks-in-replay-mode.ts @@ -0,0 +1,151 @@ +import { WorkerPool } from '@alien-worlds/aw-workers'; +import { ReadCompleteData, ReadTaskData } from './reader.types'; +import { BlockRangeScanner, Mode } from '../common'; +import { log } from '@alien-worlds/aw-core'; +import { ReaderWorkerMessage } from './reader.worker-message'; +import { ReaderConfig } from './reader.config'; +import { ReaderDependencies } from './reader.dependencies'; +import { + FilterBroadcastMessage, + InternalBroadcastChannel, + ReaderBroadcastMessage, +} from '../broadcast'; +import { readerWorkerLoaderPath } from './reader.consts'; +import { BroadcastClient, BroadcastMessage } from '@alien-worlds/aw-broadcast'; + +let loop = false; +let currentTask: ReadTaskData; +let workerPool: WorkerPool; +let scanner: BlockRangeScanner; +let broadcastClient: BroadcastClient; + +/** + * Handles an error from a worker. + * + * @async + * @param {number} id - The worker ID. + * @param {Error} error - The error thrown by the worker. + */ +export const handleWorkerError = async (id: number, error: Error) => { + log(`Worker error:`, error); + workerPool.releaseWorker(id); +}; + +/** + * Handles messages from a worker. + * + * @async + * @param {ReaderWorkerMessage} message - The message from the worker. + */ +export const handleWorkerMessage = async (message: ReaderWorkerMessage) => { + const { data, error, workerId } = message; + + if (message.isTaskResolved()) { + const { startBlock, endBlock } = data; + log( + `All blocks in the range ${startBlock.toString()} - ${endBlock.toString()} (exclusive) have been read.` + ); + workerPool.releaseWorker(workerId, data); + } else if (message.isTaskRejected()) { + log(`An unexpected error occurred while reading blocks...`, error); + workerPool.releaseWorker(workerId); + } else if (message.isTaskProgress()) { + broadcastClient.sendMessage(FilterBroadcastMessage.refresh()); + } else { + log(`Unhandled message`, message); + } +}; + +/** + * Reads blocks based on the task provided. + * This function contains the main loop logic for managing tasks and worker threads. + * + * @async + * @param {ReadTaskData} task - Data about which blocks to read. + */ +export const read = async (task: ReadTaskData) => { + if (loop) { + return; + } + loop = true; + + if (!currentTask) { + currentTask = task; + log( + `Preparation for scanning block range (${task.startBlock}-${task.endBlock}) ${ + task.mode === Mode.Replay ? 'under the label ' + task.scanKey : '' + }` + ); + } + + while (loop) { + const worker = await workerPool.getWorker(); + if (worker) { + worker.onMessage((message: ReaderWorkerMessage) => handleWorkerMessage(message)); + worker.onError((id, error) => { + log(`Worker error:`, error); + workerPool.releaseWorker(id, task); + }); + + const scan = await scanner.getNextScanNode(task.scanKey); + + if (scan) { + worker.run({ + startBlock: scan.start, + endBlock: scan.end, + scanKey: task.scanKey, + }); + } else { + log( + `The scan of the range ${task.startBlock}-${task.endBlock}(exclusive) under the label "${task.scanKey}" has already been completed. No subranges to process.` + ); + workerPool.releaseWorker(worker.id, task); + loop = false; + } + } else { + loop = false; + } + } +}; + +/** + * Reads blocks in replay mode. + * Configures and initializes worker threads to read blocks, listens for messages from these workers, + * and manages tasks accordingly. + * + * @async + * @param {ReaderConfig} config - Configuration settings for the reader. + * @param {ReaderDependencies} dependencies - Necessary dependencies for the reader. + */ +export const readBlocksInReplayMode = async ( + config: ReaderConfig, + dependencies: ReaderDependencies +) => { + const { workerLoaderPath, workerLoaderDependenciesPath } = dependencies; + const channel = InternalBroadcastChannel.DefaultModeReader; + + broadcastClient = dependencies.broadcastClient; + scanner = dependencies.scanner; + workerPool = await WorkerPool.create({ + ...config.workers, + sharedData: { config }, + workerLoaderPath: workerLoaderPath || readerWorkerLoaderPath, + workerLoaderDependenciesPath, + }); + workerPool.onWorkerRelease(async (id, data: ReadTaskData) => { + if (await scanner.hasUnscannedBlocks(data.scanKey)) { + read(data); + } + }); + + log(`Reader started in "listening" mode`); + broadcastClient.onMessage(channel, async (message: BroadcastMessage) => { + const { data } = message; + read(data); + }); + + broadcastClient.connect(); + broadcastClient.sendMessage(ReaderBroadcastMessage.replayModeReady()); + + log(`Reader ... [ready]`); +}; diff --git a/src/reader/reader.command.ts b/src/reader/reader.command.ts index 388e387..b47c34a 100644 --- a/src/reader/reader.command.ts +++ b/src/reader/reader.command.ts @@ -6,7 +6,7 @@ readerCommand .version('1.0', '-v, --version') .option('-k, --scan-key ', 'Scan key') .option('-s, --start-block ', 'Start at this block') - .option('-m, --mode ', 'Mode (default/replay/test)') + .option('-m, --mode ', 'Mode (default/replay)') .option('-e, --end-block ', 'End block (exclusive)') .option('-t, --threads ', 'Number of threads') .parse(process.argv); diff --git a/src/reader/reader.dependencies.ts b/src/reader/reader.dependencies.ts index 2a2fec2..f568346 100644 --- a/src/reader/reader.dependencies.ts +++ b/src/reader/reader.dependencies.ts @@ -1,7 +1,12 @@ import { BroadcastClient } from '@alien-worlds/aw-broadcast'; -import { Result } from '@alien-worlds/aw-core'; +import { BlockReader, Result } from '@alien-worlds/aw-core'; import { Dependencies } from '../common/dependencies'; -import { BlockRangeScanner, DatabaseConfigBuilder } from '../common'; +import { + BlockRangeScanner, + BlockState, + DatabaseConfigBuilder, + UnprocessedBlockQueue, +} from '../common'; import { ReaderConfig } from './reader.config'; /** @@ -10,9 +15,12 @@ import { ReaderConfig } from './reader.config'; */ export abstract class ReaderDependencies extends Dependencies { public broadcastClient: BroadcastClient; - public scanner: BlockRangeScanner; + public blockState?: BlockState; + public unprocessedBlockQueue?: UnprocessedBlockQueue; + public blockReader?: BlockReader; + public scanner?: BlockRangeScanner; public workerLoaderPath?: string; - public workerLoaderDependenciesPath: string; + public workerLoaderDependenciesPath?: string; public databaseConfigBuilder: DatabaseConfigBuilder; diff --git a/src/reader/reader.errors.ts b/src/reader/reader.errors.ts new file mode 100644 index 0000000..daaf8f7 --- /dev/null +++ b/src/reader/reader.errors.ts @@ -0,0 +1,5 @@ +export class BlockReaderNotConnected extends Error { + constructor() { + super(`The block reader is not connected.`); + } +} diff --git a/src/reader/reader.ts b/src/reader/reader.ts deleted file mode 100644 index a98be8e..0000000 --- a/src/reader/reader.ts +++ /dev/null @@ -1,100 +0,0 @@ -import { ReadCompleteData, ReadTaskData } from './reader.types'; -import { FilterBroadcastMessage } from '../broadcast/messages'; -import { log } from '@alien-worlds/aw-core'; -import { BroadcastClient } from '@alien-worlds/aw-broadcast'; -import { WorkerPool, WorkerMessage } from '@alien-worlds/aw-workers'; -import { BlockRangeScanner, Mode } from '../common'; - -export class Reader { - private loop = false; - private initTaskData: ReadTaskData; - - constructor( - protected broadcastClient: BroadcastClient, - protected scanner: BlockRangeScanner, - protected workerPool: WorkerPool - ) { - workerPool.onWorkerRelease(async () => { - const { initTaskData } = this; - if (initTaskData.mode === Mode.Replay) { - if (await scanner.hasUnscannedBlocks(initTaskData.scanKey)) { - this.read(initTaskData); - } - } else { - // - } - }); - } - - private async handleWorkerMessage(message: WorkerMessage) { - const { workerPool, broadcastClient } = this; - const { data, error, workerId } = message; - - if (message.isTaskResolved()) { - const { startBlock, endBlock } = data; - log( - `All blocks in the range ${startBlock.toString()} - ${endBlock.toString()} (exclusive) have been read.` - ); - workerPool.releaseWorker(workerId, data); - } else if (message.isTaskRejected()) { - log(`An unexpected error occurred while reading blocks...`, error); - workerPool.releaseWorker(workerId); - } else if (message.isTaskProgress()) { - broadcastClient.sendMessage(FilterBroadcastMessage.refresh()); - } - } - - private async handleWorkerError(id: number, error: Error) { - const { workerPool } = this; - log(`Worker error:`, error); - workerPool.releaseWorker(id); - } - - public async read(task: ReadTaskData) { - if (this.loop) { - return; - } - this.loop = true; - - if (!this.initTaskData) { - this.initTaskData = task; - log( - `Preparation for scanning block range (${task.startBlock}-${task.endBlock}) ${ - task.mode === Mode.Replay ? 'under the label ' + task.scanKey : '' - }` - ); - } - - const { workerPool, scanner, initTaskData } = this; - - while (this.loop) { - const worker = await workerPool.getWorker(); - if (worker) { - worker.onMessage(message => this.handleWorkerMessage(message)); - worker.onError((id, error) => this.handleWorkerError(id, error)); - - if (task.mode === Mode.Default || task.mode === Mode.Test) { - worker.run({ startBlock: task.startBlock, endBlock: task.endBlock }); - } else if (task.mode === Mode.Replay) { - const scan = await scanner.getNextScanNode(task.scanKey); - - if (scan) { - worker.run({ - startBlock: scan.start, - endBlock: scan.end, - scanKey: initTaskData.scanKey, - }); - } else { - log( - `The scan of the range ${initTaskData.startBlock}-${initTaskData.endBlock}(exclusive) under the label "${initTaskData.scanKey}" has already been completed. No subranges to process.` - ); - workerPool.releaseWorker(worker.id); - this.loop = false; - } - } - } else { - this.loop = false; - } - } - } -} diff --git a/src/reader/reader.worker-loader.ts b/src/reader/reader.worker-loader.ts index a204f03..a0d0638 100644 --- a/src/reader/reader.worker-loader.ts +++ b/src/reader/reader.worker-loader.ts @@ -1,7 +1,11 @@ -import { Worker, DefaultWorkerLoader } from '@alien-worlds/aw-workers'; +import { + Worker, + DefaultWorkerLoader, +} from '@alien-worlds/aw-workers'; import { ReaderWorkerLoaderDependencies } from './reader.worker-loader.dependencies'; -import { log } from '@alien-worlds/aw-core'; import ReaderWorker, { ReaderSharedData } from './reader.worker'; +import { threadId } from 'worker_threads'; +import { ReaderWorkerMessage } from './reader.worker-message'; export default class ReaderWorkerLoader extends DefaultWorkerLoader< ReaderSharedData, @@ -12,36 +16,15 @@ export default class ReaderWorkerLoader extends DefaultWorkerLoader< await super.setup(sharedData, config); // const { - unprocessedBlockQueue: { maxBytesSize, sizeCheckInterval }, - } = config; - const { - dependencies: { blockQueue: blocksQueue, blockReader }, + dependencies: { blockReader }, } = this; - blocksQueue.onOverload(size => { - const overload = size - maxBytesSize; - log(`Overload: ${overload} bytes.`); - blockReader.pause(); - - let interval = setInterval(async () => { - const { content: size, failure } = await blocksQueue.getBytesSize(); - if (failure) { - log( - `Failed to get unprocessed blocks collection size: ${failure.error.message}` - ); - } else if (size === 0) { - log(`Unprocessed blocks collection cleared, blockchain reading resumed.`); - blockReader.resume(); - clearInterval(interval); - interval = null; - } - }, sizeCheckInterval || 1000); + blockReader.onConnected(async () => { + this.sendMessage(ReaderWorkerMessage.createBlockReaderConnectInfo(threadId)); }); - blocksQueue.beforeSendBatch(() => { - blockReader.pause(); - }); - blocksQueue.afterSendBatch(() => { - blockReader.resume(); + + blockReader.onDisconnected(async () => { + this.sendMessage(ReaderWorkerMessage.createBlockReaderDisconnectWarning(threadId)); }); await blockReader.connect(); diff --git a/src/reader/reader.worker-message.ts b/src/reader/reader.worker-message.ts new file mode 100644 index 0000000..9426b18 --- /dev/null +++ b/src/reader/reader.worker-message.ts @@ -0,0 +1,42 @@ +import { + WorkerMessage, + WorkerMessageName, + WorkerMessageType, +} from '@alien-worlds/aw-workers'; + +export enum ReaderWorkerMessageName { + BlockReaderConnected = 'block_reader_connected', + BlockReaderDisconnected = 'block_reader_disconnected', +} + +export class ReaderWorkerMessage extends WorkerMessage { + public static createBlockReaderDisconnectWarning(workerId: number) { + return WorkerMessage.create({ + workerId, + type: WorkerMessageType.Info, + name: ReaderWorkerMessageName.BlockReaderDisconnected, + }); + } + + public static createBlockReaderConnectInfo(workerId: number) { + return WorkerMessage.create({ + workerId, + type: WorkerMessageType.Info, + name: ReaderWorkerMessageName.BlockReaderConnected, + }); + } + + public isBlockReaderDisconnectWarning(): boolean { + return ( + this.type === WorkerMessageType.Warning && + this.name === ReaderWorkerMessageName.BlockReaderDisconnected + ); + } + + public isBlockReaderConnectInfo(): boolean { + return ( + this.type === WorkerMessageType.Info && + this.name === ReaderWorkerMessageName.BlockReaderDisconnected + ); + } +} diff --git a/src/reader/reader.worker.ts b/src/reader/reader.worker.ts index 49198c9..6c77dae 100644 --- a/src/reader/reader.worker.ts +++ b/src/reader/reader.worker.ts @@ -1,14 +1,40 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ import { Worker } from '@alien-worlds/aw-workers'; import { ReaderConfig } from './reader.config'; -import { BlockReader, log, parseToBigInt } from '@alien-worlds/aw-core'; +import { Block, BlockReader, log, parseToBigInt } from '@alien-worlds/aw-core'; import { UnprocessedBlockQueue, BlockState, BlockRangeScanner, Mode } from '../common'; +import { BlockReaderNotConnected } from './reader.errors'; +/** + * The shared data type that the ReaderWorker class uses. + * @typedef {Object} ReaderSharedData + * @property {ReaderConfig} config - Configuration settings for the reader. + */ export type ReaderSharedData = { config: ReaderConfig; }; +/** + * A worker class that reads blocks and reports progress/errors. This class utilizes + * the worker pattern to offload specific tasks away from the main thread. + * + * @extends {Worker} + */ export default class ReaderWorker extends Worker { + private startBlock: bigint; + private endBlock: bigint; + private scanKey: string; + + /** + * Constructs a new ReaderWorker. + * + * @param {Object} dependencies - External services or classes required by the worker. + * @param {BlockReader} dependencies.blockReader - A service for reading blocks. + * @param {UnprocessedBlockQueue} dependencies.blockQueue - A queue for blocks that haven't been processed. + * @param {BlockState} dependencies.blockState - Represents the state of a block. + * @param {BlockRangeScanner} dependencies.scanner - A service for scanning block ranges. + * @param {ReaderSharedData} sharedData - Shared data used by the worker. + */ constructor( protected dependencies: { blockReader: BlockReader; @@ -22,25 +48,12 @@ export default class ReaderWorker extends Worker { this.sharedData = sharedData; } - private async updateBlockState(): Promise { - const { - dependencies: { blockQueue, blockState }, - } = this; - const { content: maxBlock } = await blockQueue.getMax(); - if (maxBlock) { - const { failure } = await blockState.updateBlockNumber( - maxBlock.thisBlock.blockNumber - ); - if (failure) { - log('Something went wrong, the block state was not updated.'); - } - } else { - log( - 'Something went wrong, the block with the highest number was not found/received.' - ); - } - } - + /** + * Logs the progress of reading blocks. + * + * @private + * @param {bigint[]} blockNumbers - List of block numbers that have been read. + */ private logProgress(blockNumbers: bigint[]) { const sorted = blockNumbers.sort(); const min = sorted[0]; @@ -49,103 +62,74 @@ export default class ReaderWorker extends Worker { log(`Blocks ${min.toString()}-${max.toString()} have been read.`); } - private async readInDefaultMode(startBlock: bigint, endBlock: bigint) { + /** + * Handles a received block, logs progress, updates scanning progress, and handles potential errors. + * + * @private + * @async + * @param {Block} block - The block that has been received. + */ + private async handleReceivedBlock(block: Block) { const { - dependencies: { blockReader, blockQueue }, + dependencies: { blockReader, blockQueue, scanner }, sharedData: { - config: { - maxBlockNumber, - blockReader: { shouldFetchDeltas, shouldFetchTraces, shouldFetchBlock }, - unprocessedBlockQueue, - }, + config: { unprocessedBlockQueue }, }, + startBlock, + endBlock, + scanKey, } = this; - const rangeSize = endBlock - startBlock; - blockReader.onReceivedBlock(async block => { - const isLast = endBlock === block.thisBlock.blockNumber; - const isFastLane = - block.thisBlock.blockNumber >= block.lastIrreversible.blockNumber; - - const { content: addedBlockNumbers, failure } = await blockQueue.add(block, { - isFastLane, - isLast, - predictedRangeSize: Number(rangeSize), - }); - if (Array.isArray(addedBlockNumbers) && addedBlockNumbers.length > 0) { - this.logProgress(addedBlockNumbers); - - await this.updateBlockState(); - this.progress(); - // - } else if (failure?.error.name === 'DuplicateBlocksError') { - log(failure.error.message); - } else if (failure?.error.name === 'UnprocessedBlocksOverloadError') { - log(failure.error.message); - log( - `The size limit ${unprocessedBlockQueue.maxBytesSize} of the unprocessed blocks collection has been exceeded. Blockchain reading suspended until the collection is cleared.` - ); - } else if (failure) { - this.reject(failure.error); - } else { - // - } - }); + blockReader.pause(); - blockReader.onError(error => { - this.reject(error); - }); + const { content: insertionResult, failure } = await blockQueue.add(block); - blockReader.onComplete(async () => { - this.resolve({ startBlock, endBlock }); - }); + if (failure) { + log(failure.error); + this.reject(failure.error); + } else if (insertionResult) { + this.logProgress(insertionResult.insertedBlocks); + this.progress({ startBlock, endBlock, scanKey }); - blockReader.readBlocks( - startBlock, - endBlock || parseToBigInt(maxBlockNumber || 0xffffffff), - { - shouldFetchBlock, - shouldFetchDeltas, - shouldFetchTraces, + for (const blockNumber of insertionResult.insertedBlocks) { + await scanner.updateScanProgress(scanKey, blockNumber); } - ); + + blockReader.resume(); + } } - private async readInReplayMode(startBlock: bigint, endBlock: bigint, scanKey: string) { + /** + * Initiates the reading process for a range of blocks. Reports errors and progress accordingly. + * + * @public + * @async + * @param {Object} args - Arguments specifying the blocks to read. + * @param {bigint} args.startBlock - The block number to start reading from. + * @param {bigint} args.endBlock - The block number to end reading at. + * @param {string} args.scanKey - The scanning key. + * @returns {Promise} + */ + public async run(args: { + startBlock: bigint; + endBlock: bigint; + scanKey: string; + }): Promise { + const { startBlock, endBlock, scanKey } = args; const { - dependencies: { blockReader, blockQueue, scanner }, + dependencies: { blockReader }, sharedData: { config: { blockReader: { shouldFetchDeltas, shouldFetchTraces, shouldFetchBlock }, - unprocessedBlockQueue, }, }, } = this; - const rangeSize = endBlock - startBlock; - - blockReader.onReceivedBlock(async block => { - const { content: addedBlockNumbers, failure } = await blockQueue.add(block); - if (Array.isArray(addedBlockNumbers) && addedBlockNumbers.length > 0) { - this.logProgress(addedBlockNumbers); - - for (const blockNumber of addedBlockNumbers) { - await scanner.updateScanProgress(scanKey, blockNumber); - } - - this.progress({ startBlock, endBlock, scanKey }); - // - } else if (failure?.error.name === 'DuplicateBlocksError') { - log(failure.error.message); - } else if (failure?.error.name === 'UnprocessedBlocksOverloadError') { - log(failure.error.message); - log( - `The size limit ${unprocessedBlockQueue.maxBytesSize} of the unprocessed blocks collection has been exceeded by bytes. Blockchain reading suspended until the collection is cleared.` - ); - } else if (failure) { - this.reject(failure.error); - } - }); + this.startBlock = startBlock; + this.endBlock = endBlock; + this.scanKey = scanKey; + + blockReader.onReceivedBlock(block => this.handleReceivedBlock(block)); blockReader.onError(error => { this.reject(error); @@ -155,31 +139,14 @@ export default class ReaderWorker extends Worker { this.resolve({ startBlock, endBlock, scanKey }); }); - blockReader.readBlocks(startBlock, endBlock, { - shouldFetchBlock, - shouldFetchDeltas, - shouldFetchTraces, - }); - } - - public async run(args: { - startBlock: bigint; - endBlock: bigint; - scanKey: string; - }): Promise { - const { startBlock, endBlock, scanKey } = args; - const { - sharedData: { - config: { mode }, - }, - } = this; - - if (mode === Mode.Replay) { - this.readInReplayMode(startBlock, endBlock, scanKey); - } else if (mode === Mode.Default || mode === Mode.Test) { - this.readInDefaultMode(startBlock, endBlock); + if (blockReader.isConnected()) { + blockReader.readBlocks(startBlock, endBlock, { + shouldFetchBlock, + shouldFetchDeltas, + shouldFetchTraces, + }); } else { - log(`Unknown mode ${mode}`); + this.reject(new BlockReaderNotConnected()); } } } diff --git a/src/reader/start-reader.ts b/src/reader/start-reader.ts index 71962e3..0f4ad7b 100644 --- a/src/reader/start-reader.ts +++ b/src/reader/start-reader.ts @@ -1,29 +1,29 @@ -/* eslint-disable @typescript-eslint/no-empty-function */ -/* eslint-disable @typescript-eslint/no-unused-vars */ -import { - InternalBroadcastChannel, - InternalBroadcastMessageName, -} from '../broadcast/internal-broadcast.enums'; -import { ReadTaskData, ReaderCommandOptions } from './reader.types'; -import { ReaderBroadcastMessage } from '../broadcast/messages/reader-broadcast.message'; -import { Reader } from './reader'; +import { ReaderCommandOptions } from './reader.types'; import { readerCommand } from './reader.command'; import { buildReaderConfig } from '../config'; -import { readerWorkerLoaderPath } from './reader.consts'; import { log, ConfigVars } from '@alien-worlds/aw-core'; -import { BroadcastMessage } from '@alien-worlds/aw-broadcast'; -import { WorkerPool } from '@alien-worlds/aw-workers'; -import { Mode } from '../common'; -import { ReaderConfig } from './reader.config'; +import { Mode, UnknownModeError } from '../common'; import { ReaderDependencies } from './reader.dependencies'; +import { readBlocksInDefaultMode } from './read-blocks-in-default-mode'; +import { readBlocksInReplayMode } from './read-blocks-in-replay-mode'; /** - * - * @param config - * @returns + * Initiates the reader based on provided arguments and dependencies. This function parses the provided arguments, + * builds the reader's configuration, and initializes the reading mode (Replay or Default). + * + * @param {string[]} args - Arguments that specify command options for the reader. + * @param {ReaderDependencies} dependencies - External dependencies required by the reader. + * + * @returns {Promise} Returns a promise which resolves when the reader has been successfully started or rejects + * with an error if an issue is encountered during initialization or an unknown mode is detected. + * + * @throws {UnknownModeError} Throws an error if the mode specified in the configuration is neither Replay nor Default. */ -export const read = async (config: ReaderConfig, dependencies: ReaderDependencies) => { +export const startReader = async (args: string[], dependencies: ReaderDependencies) => { log(`Reader ... [starting]`); + const vars = new ConfigVars(); + const options = readerCommand.parse(args).opts(); + const config = buildReaderConfig(vars, dependencies.databaseConfigBuilder, options); const initResult = await dependencies.initialize(config); @@ -31,46 +31,13 @@ export const read = async (config: ReaderConfig, dependencies: ReaderDependencie throw initResult.failure.error; } - const { broadcastClient, scanner, workerLoaderPath, workerLoaderDependenciesPath } = - dependencies; - - const workerPool = await WorkerPool.create({ - ...config.workers, - sharedData: { config }, - workerLoaderPath: workerLoaderPath || readerWorkerLoaderPath, - workerLoaderDependenciesPath, - }); - - const reader = new Reader(broadcastClient, scanner, workerPool); - - let channel: string; - let readyMessage; - if (config.mode === Mode.Replay) { - channel = InternalBroadcastChannel.ReplayModeReader; - readyMessage = ReaderBroadcastMessage.replayModeReady(); - } else { - channel = InternalBroadcastChannel.DefaultModeReader; - readyMessage = ReaderBroadcastMessage.defaultModeReady(); + return readBlocksInReplayMode(config, dependencies); } - log(`Reader started in "listening" mode`); - broadcastClient.onMessage(channel, async (message: BroadcastMessage) => { - const { data, name } = message; - if (name === InternalBroadcastMessageName.ReaderTask) { - reader.read(data); - } - }); - broadcastClient.connect(); - // Everything is ready, notify the bootstrap that the process is ready to work - broadcastClient.sendMessage(readyMessage); - - log(`Reader ... [ready]`); -}; + if (config.mode === Mode.Default) { + return readBlocksInDefaultMode(config, dependencies); + } -export const startReader = (args: string[], dependencies: ReaderDependencies) => { - const vars = new ConfigVars(); - const options = readerCommand.parse(args).opts(); - const config = buildReaderConfig(vars, dependencies.databaseConfigBuilder, options); - read(config, dependencies).catch(log); + throw new UnknownModeError(config.mode, [Mode.Default, Mode.Replay]); }; diff --git a/tutorials/config-vars.md b/tutorials/config-vars.md index 8e4eb0a..487eab8 100644 --- a/tutorials/config-vars.md +++ b/tutorials/config-vars.md @@ -45,7 +45,8 @@ The following settings are additional for more advanced users who want to tweak | `PROCESSOR_INVIOLABLE_THREADS_COUNT` | _number_ | The number of threads that cannot be allocated to the processor process. | 0 | | `FILTER_INVIOLABLE_THREADS_COUNT` | _number_ | The number of threads that cannot be allocated to the filter process. | 0 | | `START_FROM_HEAD` | _number_ | Specifies (1 = true/ 0 = false) whether reading a blocks should start with the head or the last irreversible block number. | 0 | -| `UNPROCESSED_BLOCK_QUEUE_MAX_BYTES_SIZE` | _number_ | The maximum size of the queue in bytes. | 256000000 | +| `UNPROCESSED_BLOCK_QUEUE_MAX_BYTES_SIZE` | _number_ | The maximum size of the queue in bytes. | 1024000000 + | | `UNPROCESSED_BLOCK_QUEUE_SIZE_CHECK_INTERVAL` | _number_ | Specifies the waiting time in milliseconds to check that the current queue size in bytes does not exceed the maximum allowed. | 2000 | | `UNPROCESSED_BLOCK_QUEUE_BATCH_SIZE` | _number_ | Batch size of unprocessed blocks sent to the database at one time. The batch setting can be modified to optimize the transfer consumption to the database. | 100 | | `UNPROCESSED_BLOCK_QUEUE_FAST_LANE_BATCH_SIZE` | _number_ | Value used when block number is greater than last irreversible block number. Batch size of unprocessed blocks sent to the database at one time. The batch setting can be modified to optimize the transfer consumption to the database. | 1 | diff --git a/yarn.lock b/yarn.lock index 21899ac..264ab39 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2,27 +2,27 @@ # yarn lockfile v1 -"@alien-worlds/aw-broadcast@^0.0.9": - version "0.0.9" - resolved "https://registry.yarnpkg.com/@alien-worlds/aw-broadcast/-/aw-broadcast-0.0.9.tgz#6befb593d342ca364a55b73e2b7a691f7d7ec7a6" - integrity sha512-hohs5qzmBIEwM0QIh0hAeh/XW3j2NIINlWDw6fNUNTYeBJlyXumrJmIuywIJzuKNkaoKGpj8LIzMkj0+90QeGw== +"@alien-worlds/aw-broadcast@^0.0.12": + version "0.0.12" + resolved "https://registry.yarnpkg.com/@alien-worlds/aw-broadcast/-/aw-broadcast-0.0.12.tgz#a9bcf9d54fa0f897bf50929ee2e6472337533808" + integrity sha512-I+ZnJO6x7aaV4txIrCmpgzwK8h3VgOfLhWGXoAZu1nCmJXb6c8eBsmajlybSak0aHs7+Y3R/kX69p0FTKFoyRg== dependencies: - "@alien-worlds/aw-core" "^0.0.15" + "@alien-worlds/aw-core" "^0.0.21" nanoid "^3.0.0" -"@alien-worlds/aw-core@^0.0.15": - version "0.0.15" - resolved "https://registry.yarnpkg.com/@alien-worlds/aw-core/-/aw-core-0.0.15.tgz#1dca120a8b17577be4f71b0cc83850893ba9caa4" - integrity sha512-8Q9gvAZTIBYdBvZJvEvK9D6gKzOhITkgS8Vlc6x4BrFNeLVTs8JJaV4S7ALbaKu5jy1udai1ehhJ5FZEn2OYpw== +"@alien-worlds/aw-core@^0.0.21": + version "0.0.21" + resolved "https://registry.yarnpkg.com/@alien-worlds/aw-core/-/aw-core-0.0.21.tgz#f98d105adae0685a014a831c81eec56f1c691c7b" + integrity sha512-WP6ludtofCphRdZkdcke91wZv03BJGMGHWuW5RG7wjHfhc4u5X4bnojkIZ9lA9F5ihpMAzdmieH1xo6n/M3EkQ== dependencies: inversify "^6.0.1" node-fetch "2.6.6" reflect-metadata "^0.1.13" -"@alien-worlds/aw-workers@^0.0.4": - version "0.0.4" - resolved "https://registry.yarnpkg.com/@alien-worlds/aw-workers/-/aw-workers-0.0.4.tgz#30bbc7e16a42f10dede57102dd83d3661ef30045" - integrity sha512-m7cS+uQtcKRyljS7xPccvEcZcLKx6E5O0PnwYCZ5VJPPqhdCrfJonn0ExBC/q4hsLTPjI+CK2edhhUHnCh0tlQ== +"@alien-worlds/aw-workers@^0.0.5": + version "0.0.5" + resolved "https://registry.yarnpkg.com/@alien-worlds/aw-workers/-/aw-workers-0.0.5.tgz#4d89e17d666e4a925c3f32728cf6dec16e74ff59" + integrity sha512-7th9biXDopccNKep5CJbEeSDtxQGDUBOROUZIj/GSIhY/XzKCGPOGFuLgn+N/ZoHjoQizdsSvphdXehvZrrmEw== dependencies: async "^3.2.4" ts-node "^10.9.1"