Skip to content

Commit

Permalink
Split reader (#9)
Browse files Browse the repository at this point in the history
* split reader, remove test block range
* update
* update deploy script
* change unprocessed block queue
* clear chache
* remove BlockState types
* increase list limit
* update logs, fix batch to single switch
* 0.0.12
  • Loading branch information
rkamysz authored Nov 7, 2023
1 parent f704ee9 commit f2368d9
Show file tree
Hide file tree
Showing 29 changed files with 718 additions and 540 deletions.
2 changes: 1 addition & 1 deletion .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ END_BLOCK=238581000
START_FROM_HEAD=0
MODE='default'
MAX_BLOCK_NUMBER=0xffffffff
UNPROCESSED_BLOCK_QUEUE_MAX_BYTES_SIZE=256000000
UNPROCESSED_BLOCK_QUEUE_MAX_BYTES_SIZE=1024000000
UNPROCESSED_BLOCK_QUEUE_SIZE_CHECK_INTERVAL=2000
UNPROCESSED_BLOCK_QUEUE_BATCH_SIZE=100
BROADCAST_PORT=9000
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@alien-worlds/aw-history",
"version": "0.0.9",
"version": "0.0.12",
"description": "",
"packageManager": "[email protected]",
"main": "build/index.js",
Expand Down Expand Up @@ -38,9 +38,9 @@
"typescript": "^4.8.2"
},
"dependencies": {
"@alien-worlds/aw-broadcast": "^0.0.9",
"@alien-worlds/aw-core": "^0.0.15",
"@alien-worlds/aw-workers": "^0.0.4",
"@alien-worlds/aw-broadcast": "^0.0.12",
"@alien-worlds/aw-core": "^0.0.21",
"@alien-worlds/aw-workers": "^0.0.5",
"async": "^3.2.4",
"commander": "^10.0.1",
"crypto": "^1.0.1",
Expand Down
7 changes: 5 additions & 2 deletions scripts/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ error_exit()
exit 1
}

# major | minor | patch
# major | minor | patch | pre
TYPE=patch

if [[ $1 =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]
Expand All @@ -15,11 +15,14 @@ if [[ $1 =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]
elif [[ $1 == "major" ]] || [[ $1 == "minor" ]] || [[ $1 == "patch" ]]
then
TYPE=$1
elif [[ $1 == "pre" ]]
then
TYPE=prepatch
elif [[ $1 == "" ]]
then
TYPE=patch
else
printf "Unknown update type \"%s\".\n- Please use \"major\" | \"minor\" | \"patch\"\n" "$1"
printf "Unknown update type \"%s\".\n- Please use \"major\" | \"minor\" | \"patch\" | \"pre\"\n" "$1"
exit 1;
fi

Expand Down
55 changes: 0 additions & 55 deletions src/bootstrap/__tests__/bootstrap.utils.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Result } from '@alien-worlds/aw-core';
import {
createDefaultModeBlockRange,
createReplayModeBlockRange,
createTestModeBlockRange,
} from '../bootstrap.utils';
import { Mode } from '../../common';
import {
Expand Down Expand Up @@ -78,60 +77,6 @@ describe('createDefaultModeBlockRange', () => {
});
});

describe('createTestModeBlockRange', () => {
const originalLog = console.log;
const blockchain = {
getLastIrreversibleBlockNumber: jest.fn().mockResolvedValue(Result.withContent(100n)),
getHeadBlockNumber: jest.fn().mockResolvedValue(Result.withContent(100n)),
} as any;
beforeEach(() => {
jest.clearAllMocks();
console.log = jest.fn();
});

afterEach(() => {
console.log = originalLog;
});

it('should create a block range in test mode when startBlock is not a bigint', async () => {
const config = {
blockchain: {},
startBlock: null,
mode: Mode.Test,
scanner: { scanKey: 'scanKey' },
startFromHead: true,
} as any;

const result = await createTestModeBlockRange(blockchain, config);

expect(result).toEqual({
startBlock: 99n,
endBlock: 100n,
mode: Mode.Test,
scanKey: 'scanKey',
});
});

it('should create a block range in test mode when startBlock is a bigint', async () => {
const config = {
blockchain: {},
startBlock: 50n,
mode: Mode.Test,
scanner: { scanKey: 'scanKey' },
startFromHead: true,
} as any;

const result = await createTestModeBlockRange(blockchain, config);

expect(result).toEqual({
startBlock: 50n,
endBlock: 51n,
mode: Mode.Test,
scanKey: 'scanKey',
});
});
});

describe('createReplayModeBlockRange', () => {
const originalLog = console.log;
const blockchain = {
Expand Down
2 changes: 1 addition & 1 deletion src/bootstrap/bootstrap.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ bootstrapCommand
.version('1.0', '-v, --version')
.option('-k, --scan-key <scan-key>', 'Scan key')
.option('-s, --start-block <start-block>', 'Start at this block')
.option('-m, --mode <mode>', 'Mode (default/replay/test)')
.option('-m, --mode <mode>', 'Mode (default/replay)')
.option('-e, --end-block <end-block>', 'End block (exclusive)');
45 changes: 6 additions & 39 deletions src/bootstrap/bootstrap.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@ export const createBlockRangeTaskInput = async (
} else if (mode === Mode.Replay) {
//
return createReplayModeBlockRange(scanner, blockchain, config);
} else if (mode === Mode.Test) {
//
return createTestModeBlockRange(blockchain, config);
} else {
//
throw new UnknownModeError(mode);
throw new UnknownModeError(mode, [Mode.Default, Mode.Replay]);
}
};

Expand Down Expand Up @@ -117,41 +114,11 @@ export const createDefaultModeBlockRange = async (
throw new StartBlockHigherThanEndBlockError(lowEdge, highEdge);
}

return { startBlock: lowEdge, endBlock: highEdge, mode, scanKey };
};

/**
* Creates a block range in test mode.
*
* @async
* @param {BlockchainService} blockchain - The blockchain service.
* @param {BootstrapConfig} config - The bootstrap configuration.
* @returns {Promise<BlockRangeData>} The block range data.
*/
export const createTestModeBlockRange = async (
blockchain: BlockchainService,
config: BootstrapConfig
): Promise<BlockRangeData> => {
const {
startBlock,
mode,
scanner: { scanKey },
startFromHead,
} = config;

const { content: lastIrreversibleBlock } =
await blockchain.getLastIrreversibleBlockNumber();
const { content: headBlock } = await blockchain.getHeadBlockNumber();

let highEdge: bigint;
let lowEdge: bigint;

if (typeof startBlock !== 'bigint') {
highEdge = startFromHead ? headBlock : lastIrreversibleBlock;
lowEdge = highEdge - 1n;
} else {
lowEdge = startBlock;
highEdge = startBlock + 1n;
if (highEdge === currentBlockNumber + 1n) {
log(
` The current state of the block indicates that the block range up to ${highEdge.toString()} has been read. If this is an error, check the database and reset the process.`
);
return null;
}

return { startBlock: lowEdge, endBlock: highEdge, mode, scanKey };
Expand Down
24 changes: 13 additions & 11 deletions src/bootstrap/start-bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
import {
createDefaultModeBlockRange,
createReplayModeBlockRange,
createTestModeBlockRange,
} from './bootstrap.utils';
import { BootstrapCommandOptions } from './bootstrap.types';
import { NoAbisError } from './bootstrap.errors';
Expand Down Expand Up @@ -80,6 +79,14 @@ export const bootstrap = async (
throw new NoAbisError();
}

log(` * Initialize block state ... [starting]`);
const initStateResult = await blockState.initState();

if (initStateResult.isFailure) {
throw initStateResult.failure.error;
}
log(` * Initialize block state ... [done]`);

if (config.mode === Mode.Replay) {
blockRange = await createReplayModeBlockRange(scanner, blockchain, config);
}
Expand All @@ -90,16 +97,11 @@ export const bootstrap = async (
if (message.name === InternalBroadcastMessageName.DefaultModeReaderReady) {
if (config.mode === Mode.Default) {
blockRange = await createDefaultModeBlockRange(blockState, blockchain, config);
broadcastClient.sendMessage(
ReaderBroadcastMessage.newDefaultModeTask(blockRange)
);
}

if (config.mode === Mode.Test) {
blockRange = await createTestModeBlockRange(blockchain, config);
broadcastClient.sendMessage(
ReaderBroadcastMessage.newDefaultModeTask(blockRange)
);
if (blockRange) {
broadcastClient.sendMessage(
ReaderBroadcastMessage.newDefaultModeTask(blockRange)
);
}
}
} else if (message.name === InternalBroadcastMessageName.ReplayModeReaderReady) {
broadcastClient.sendMessage(ReaderBroadcastMessage.newReplayModeTask(blockRange));
Expand Down
10 changes: 5 additions & 5 deletions src/common/abis/abis.repository-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ export class AbisRepositoryImpl
const where = Where.bind<ContractEncodedAbi>();

if (startBlock && endBlock) {
where.props().blockNumber.isGte(startBlock).isLte(endBlock);
where.prototype().blockNumber.isGte(startBlock).isLte(endBlock);
}

if (contracts) {
where.props().contract.isIn(contracts);
where.prototype().contract.isIn(contracts);
}

return this.find(FindParams.create({ where }));
Expand All @@ -89,7 +89,7 @@ export class AbisRepositoryImpl
}

const where = Where.bind<ContractEncodedAbi>();
where.props().blockNumber.isLte(blockNumber).props().contract.isEq(contract);
where.prototype().blockNumber.isLte(blockNumber).prototype().contract.isEq(contract);
const { content, failure } = await this.find(
FindParams.create({ where, limit: 1, sort: { block_number: -1 } })
);
Expand Down Expand Up @@ -135,11 +135,11 @@ export class AbisRepositoryImpl
const where = Where.bind<ContractEncodedAbi>();

if (typeof startBlock === 'bigint') {
where.props().blockNumber.isGte(startBlock);
where.prototype().blockNumber.isGte(startBlock);
}

if (typeof endBlock === 'bigint') {
where.props().blockNumber.isLte(endBlock);
where.prototype().blockNumber.isLte(endBlock);
}

return this.count(CountParams.create({ where }));
Expand Down
58 changes: 36 additions & 22 deletions src/common/block-state/block-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,55 +7,69 @@ import {
RepositoryImpl,
Result,
} from '@alien-worlds/aw-core';
import { BlockStateModel } from './block-state.types';
import { BlockStateEntity } from './block-state.types';

/**
* A class representing a block state.
*/
export class BlockState extends RepositoryImpl<BlockStateModel, unknown> {
export class BlockState extends RepositoryImpl<BlockStateEntity, unknown> {
/**
* Creates an instance of the BlockState class.
*
* @param {DataSource<BlockStateMongoModel>} source - The data source.
* @param {BlockStateMongoMapper} mapper - The data mapper.
* @param {DataSource} source - The data source.
* @param {Mapper<BlockStateEntity>} mapper - The data mapper.
* @param {QueryBuilders} queryBuilders - The query builders.
* @param {QueryBuilder} updateBlockNumberQueryBuilder - The query builder to update block number.
*/
constructor(
source: DataSource,
mapper: Mapper<BlockStateModel>,
mapper: Mapper<BlockStateEntity>,
queryBuilders: QueryBuilders,
private updateBlockNumberQueryBuilder: QueryBuilder
) {
super(source, mapper, queryBuilders);
}

/**
* Initialize state if not already set.
*/
public async initState(): Promise<Result<void>> {
try {
const { content: states } = await this.find();

if (states.length === 0) {
await this.add([
{
lastModifiedTimestamp: new Date(),
actions: [],
tables: [],
blockNumber: 0n,
},
]);
}
return Result.withoutContent();
} catch (error) {
return Result.withFailure(Failure.fromError(error));
}
}

/**
* Fetches the current state of the data source.
*
* @returns {Promise<Result<BlockStateModel>>} - The result of the operation.
* @returns {Promise<Result<BlockStateEntity>>} - The result of the operation.
*/
public async getState(): Promise<Result<BlockStateModel>> {
public async getState(): Promise<Result<BlockStateEntity>> {
try {
const { content: states } = await this.find();

if (states) {
const state = states[0];
const { lastModifiedTimestamp, actions, tables, blockNumber } = state;

return Result.withContent({
lastModifiedTimestamp: lastModifiedTimestamp || new Date(),
actions: actions || [],
tables: tables || [],
blockNumber: blockNumber || 0n,
});
}
const state = states[0];
const { lastModifiedTimestamp, actions, tables, blockNumber } = state;

return Result.withContent({
lastModifiedTimestamp: new Date(),
actions: [],
tables: [],
blockNumber: 0n,
lastModifiedTimestamp,
actions,
tables,
blockNumber,
});
} catch (error) {
return Result.withFailure(Failure.fromError(error));
Expand Down
2 changes: 1 addition & 1 deletion src/common/block-state/block-state.types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export type BlockStateModel = {
export type BlockStateEntity = {
lastModifiedTimestamp: Date;
blockNumber: bigint;
actions: string[];
Expand Down
1 change: 0 additions & 1 deletion src/common/common.enums.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export enum Mode {
Default = 'default',
Replay = 'replay',
Test = 'test',
}
4 changes: 2 additions & 2 deletions src/common/common.errors.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export class UnknownModeError extends Error {
constructor(mode: string) {
super(`Unknown mode "${mode}"`);
constructor(mode: string, modes: string[]) {
super(`Unknown mode "${mode}". Use: ${modes.join(', ')}`);
}
}
Loading

0 comments on commit f2368d9

Please sign in to comment.