From 4dac1e5a0a517be45f555a98823cef03fec2f10a Mon Sep 17 00:00:00 2001 From: mtyszczak <mateusz.tyszczak@gmail.com> Date: Wed, 6 Nov 2024 12:22:30 +0100 Subject: [PATCH] Rewrite queen bee interface to allow for any blockchain struct listening --- __tests__/assets/jest-helper.ts | 2 +- __tests__/detailed/bot_events.ts | 200 ++++++---- eslint.config.js | 20 +- package.json | 5 +- pnpm-lock.yaml | 21 -- src/bot.ts | 159 ++++---- .../classifiers/account-classifier.ts | 40 ++ .../classifiers/block-classifier.ts | 16 + .../classifiers/block-header-classifier.ts | 12 + .../classifiers/collector-classifier-base.ts | 21 ++ .../dynamic-global-properties-classifier.ts | 12 + .../impacted-account-classifier.ts | 14 + src/chain-observers/classifiers/index.ts | 7 + .../classifiers/operation-classifier.ts | 22 ++ .../classifiers/rc-account-classifier.ts | 18 + .../collectors/collector-base.ts | 52 +++ .../common/block-header-collector.ts | 23 ++ .../common/impacted-account-collector.ts | 43 +++ .../collectors/common/operation-collector.ts | 53 +++ .../collectors/jsonrpc/account-collector.ts | 100 +++++ .../collectors/jsonrpc/block-collector.ts | 39 ++ .../dynamic-global-properties-collector.ts | 22 ++ .../jsonrpc/rc-account-collector.ts | 51 +++ .../factories/data-evaluation-context.ts | 45 +++ src/chain-observers/factories/factory-base.ts | 58 +++ .../factories/jsonrpc/factory-data.ts | 24 ++ .../factories/jsonrpc/factory.ts | 13 + .../filters/account-created-filter.ts | 34 ++ .../filters/account-full-manabar-filter.ts | 38 ++ .../filters/account-metadata-change-filter.ts | 49 +++ .../filters/balance-change-filter.ts | 64 ++++ src/chain-observers/filters/block-filter.ts | 26 ++ .../filters/composite-filter.ts | 75 ++++ .../filters/custom-operation-filter.ts | 34 ++ src/chain-observers/filters/filter-base.ts | 13 + src/chain-observers/filters/follow-filter.ts | 34 ++ .../filters/impacted-account-filter.ts | 26 ++ .../filters/new-block-filter.ts | 24 ++ src/chain-observers/filters/post-filter.ts | 34 ++ src/chain-observers/filters/post-mention.ts | 31 ++ src/chain-observers/filters/reblog-filter.ts | 34 ++ .../filters/transaction-id-filter.ts | 26 ++ src/chain-observers/filters/vote-filter.ts | 37 ++ .../filters/whale-alert-filter.ts | 59 +++ src/chain-observers/observer-mediator.ts | 79 ++++ .../providers/account-provider.ts | 40 ++ .../providers/block-header-provider.ts | 26 ++ .../providers/block-provider.ts | 30 ++ .../providers/mention-provider.ts | 45 +++ .../providers/provider-base.ts | 8 + .../providers/rc-account-provider.ts | 39 ++ .../providers/transaction-provider.ts | 40 ++ .../providers/whale-alert-provider.ts | 103 +++++ src/errors.ts | 7 +- src/interfaces.ts | 96 +---- src/queen.ts | 356 +++++++++--------- src/types/iterator.ts | 30 ++ src/types/subscribable.ts | 35 ++ src/utils/assets.ts | 5 + src/utils/mention.ts | 22 ++ 60 files changed, 2223 insertions(+), 468 deletions(-) create mode 100644 src/chain-observers/classifiers/account-classifier.ts create mode 100644 src/chain-observers/classifiers/block-classifier.ts create mode 100644 src/chain-observers/classifiers/block-header-classifier.ts create mode 100644 src/chain-observers/classifiers/collector-classifier-base.ts create mode 100644 src/chain-observers/classifiers/dynamic-global-properties-classifier.ts create mode 100644 src/chain-observers/classifiers/impacted-account-classifier.ts create mode 100644 src/chain-observers/classifiers/index.ts create mode 100644 src/chain-observers/classifiers/operation-classifier.ts create mode 100644 src/chain-observers/classifiers/rc-account-classifier.ts create mode 100644 src/chain-observers/collectors/collector-base.ts create mode 100644 src/chain-observers/collectors/common/block-header-collector.ts create mode 100644 src/chain-observers/collectors/common/impacted-account-collector.ts create mode 100644 src/chain-observers/collectors/common/operation-collector.ts create mode 100644 src/chain-observers/collectors/jsonrpc/account-collector.ts create mode 100644 src/chain-observers/collectors/jsonrpc/block-collector.ts create mode 100644 src/chain-observers/collectors/jsonrpc/dynamic-global-properties-collector.ts create mode 100644 src/chain-observers/collectors/jsonrpc/rc-account-collector.ts create mode 100644 src/chain-observers/factories/data-evaluation-context.ts create mode 100644 src/chain-observers/factories/factory-base.ts create mode 100644 src/chain-observers/factories/jsonrpc/factory-data.ts create mode 100644 src/chain-observers/factories/jsonrpc/factory.ts create mode 100644 src/chain-observers/filters/account-created-filter.ts create mode 100644 src/chain-observers/filters/account-full-manabar-filter.ts create mode 100644 src/chain-observers/filters/account-metadata-change-filter.ts create mode 100644 src/chain-observers/filters/balance-change-filter.ts create mode 100644 src/chain-observers/filters/block-filter.ts create mode 100644 src/chain-observers/filters/composite-filter.ts create mode 100644 src/chain-observers/filters/custom-operation-filter.ts create mode 100644 src/chain-observers/filters/filter-base.ts create mode 100644 src/chain-observers/filters/follow-filter.ts create mode 100644 src/chain-observers/filters/impacted-account-filter.ts create mode 100644 src/chain-observers/filters/new-block-filter.ts create mode 100644 src/chain-observers/filters/post-filter.ts create mode 100644 src/chain-observers/filters/post-mention.ts create mode 100644 src/chain-observers/filters/reblog-filter.ts create mode 100644 src/chain-observers/filters/transaction-id-filter.ts create mode 100644 src/chain-observers/filters/vote-filter.ts create mode 100644 src/chain-observers/filters/whale-alert-filter.ts create mode 100644 src/chain-observers/observer-mediator.ts create mode 100644 src/chain-observers/providers/account-provider.ts create mode 100644 src/chain-observers/providers/block-header-provider.ts create mode 100644 src/chain-observers/providers/block-provider.ts create mode 100644 src/chain-observers/providers/mention-provider.ts create mode 100644 src/chain-observers/providers/provider-base.ts create mode 100644 src/chain-observers/providers/rc-account-provider.ts create mode 100644 src/chain-observers/providers/transaction-provider.ts create mode 100644 src/chain-observers/providers/whale-alert-provider.ts create mode 100644 src/types/iterator.ts create mode 100644 src/types/subscribable.ts create mode 100644 src/utils/assets.ts create mode 100644 src/utils/mention.ts diff --git a/__tests__/assets/jest-helper.ts b/__tests__/assets/jest-helper.ts index cff9630..5c6344e 100644 --- a/__tests__/assets/jest-helper.ts +++ b/__tests__/assets/jest-helper.ts @@ -30,13 +30,13 @@ const envTestFor = <GlobalType extends IWorkerBeeGlobals>( const runner = async<R, Args extends any[]>(checkEqual: boolean, fn: TWorkerBeeTestCallable<R, Args>, ...args: Args): Promise<R> => { + let nodeData = await fn(await (globalFunction as (...args: any[]) => any)("node"), ...args); const webData = await page.evaluate(async({ args: pageArgs, globalFunction: globalFn, webFn }) => { /* eslint-disable no-eval */ eval(`window.webEvalFn = ${webFn};`); return (window as Window & typeof globalThis & { webEvalFn: (...args: any[]) => any }).webEvalFn(await globalThis[globalFn]("web"), ...pageArgs); }, { args, globalFunction: globalFunction.name, webFn: fn.toString() }); - let nodeData = await fn(await (globalFunction as (...args: any[]) => any)("node"), ...args); if(typeof nodeData === "object") // Remove prototype data from the node result to match webData nodeData = JSON.parse(JSON.stringify(nodeData)); diff --git a/__tests__/detailed/bot_events.ts b/__tests__/detailed/bot_events.ts index c436322..b853217 100644 --- a/__tests__/detailed/bot_events.ts +++ b/__tests__/detailed/bot_events.ts @@ -1,10 +1,12 @@ /* eslint-disable no-console */ + + +/* eslint-disable @typescript-eslint/no-unused-vars */ import type { ApiAccount } from "@hiveio/wax"; import { expect } from "@playwright/test"; import { ChromiumBrowser, ConsoleMessage, chromium } from "playwright"; import type { IStartConfiguration } from "../../src/bot"; -import type { IBlockData } from "../../src/interfaces"; import { test } from "../assets/jest-helper"; @@ -27,6 +29,14 @@ test.describe("WorkerBee Bot events test", () => { await page.goto("http://localhost:8080/__tests__/assets/test.html", { waitUntil: "load" }); }); + test("Should have a destroyable global module", async({ workerbeeTest }) => { + await workerbeeTest(({ WorkerBee }) => { + const bot = new WorkerBee(); + + bot.delete(); + }); + }); + test("Allow to pass explicit chain", async({ workerbeeTest }) => { const explicitChainTest = await workerbeeTest(async({ WorkerBee }) => { @@ -41,7 +51,7 @@ test.describe("WorkerBee Bot events test", () => { // Call start just to initialize chain member in WorkerBee object. await chainOwner.start(); // Stop does not affect chain property, so we can avoid making ineffective api calls. - await chainOwner.stop(); + chainOwner.stop(); const localChain = chainOwner.chain; @@ -52,9 +62,9 @@ test.describe("WorkerBee Bot events test", () => { // Validate endpoints to easily check that instances match const validChainInstance = bot.chain !== undefined && localChain !== undefined && bot.chain.endpointUrl === localChain.endpointUrl; - await bot.delete(); + bot.delete(); - await chainOwner.delete(); + chainOwner.delete(); return validChainInstance; }); @@ -62,55 +72,26 @@ test.describe("WorkerBee Bot events test", () => { expect(explicitChainTest).toEqual(true); }); - test("Should have a destroyable global module", async({ workerbeeTest }) => { - await workerbeeTest(async({ WorkerBee }) => { - const bot = new WorkerBee(); - - await bot.delete(); - }); - }); - - test("Should call proper events", async({ workerbeeTest }) => { - const result = await workerbeeTest(async({ WorkerBee }) => { - const bot = new WorkerBee(); - bot.on("error", console.error); - - let handlersCalled = 0; - - bot.on("start", () => { ++handlersCalled; }); - bot.on("stop", () => { ++handlersCalled; }); - - await bot.start(); - await bot.stop(); - - return handlersCalled; - }); - - expect(result).toStrictEqual(2); - }); - - test("Should be able to parse at least 2 blocks from the remote", async({ workerbeeTest }) => { + test("Should be able to parse at least 2 blocks from the remote using block observer", async({ workerbeeTest }) => { const result = await workerbeeTest.dynamic(async({ WorkerBee }, hiveBlockInterval) => { const bot = new WorkerBee(); - bot.on("error", console.error); let blocksParsed = 0; - bot.on("block", ({ block, number }) => { - console.info(`Got block #${block.block_id} (${number})`); - ++blocksParsed; + const observer = bot.observe.onBlock().subscribe({ + next(data) { + console.info(`Got block #${data.block.number}`); + ++blocksParsed; + }, + error(err) { + console.error(err); + } }); await bot.start(); - await Promise.race([ - new Promise(res => { setTimeout(res, hiveBlockInterval * 4); }), - new Promise<void>(res => { - bot.on("stop", res); - }) - ]); + await new Promise(res => { setTimeout(res, hiveBlockInterval * 2); }); - await bot.stop(); - await bot.delete(); + observer.unsubscribe(); return blocksParsed; }, HIVE_BLOCK_INTERVAL); @@ -121,7 +102,6 @@ test.describe("WorkerBee Bot events test", () => { test("Should be able to use async iterator on bot", async({ workerbeeTest }) => { const result = await workerbeeTest.dynamic(async({ WorkerBee }, hiveBlockInterval) => { const bot = new WorkerBee(); - bot.on("error", console.error); let blocksParsed = 0; @@ -130,8 +110,8 @@ test.describe("WorkerBee Bot events test", () => { new Promise<void>(async res => { await bot.start(); - for await(const { block, number } of bot) { - console.info(`Got block #${block.block_id} (${number})`); + for await(const { number } of bot) { + console.info(`Got block #${number}`); ++blocksParsed; if(blocksParsed > 1) @@ -140,11 +120,11 @@ test.describe("WorkerBee Bot events test", () => { res(); }), - new Promise(res => { setTimeout(res, hiveBlockInterval * 4); }) + new Promise((_, rej) => { setTimeout(rej, hiveBlockInterval * 2, new Error("Test timeout")); }) ]); - await bot.stop(); - await bot.delete(); + bot.stop(); + bot.delete(); return blocksParsed; }, HIVE_BLOCK_INTERVAL); @@ -152,67 +132,127 @@ test.describe("WorkerBee Bot events test", () => { expect(result).toBeGreaterThanOrEqual(1); }); - test("Should be able to use block observer", async({ workerbeeTest }) => { + test("Should be able to use full manabar regeneration time observer", async({ workerbeeTest }) => { + const result = await workerbeeTest(async({ WorkerBee }, hiveBlockInterval) => { + const bot = new WorkerBee(); + + const result = await Promise.race([ + new Promise<string>(res => { + bot.start(); + + console.info("Waiting for full manabar regeneration on initminer"); + + const observer = bot.observe.onAccountFullManabar("initminer").provideRcAccounts("initminer"); + observer.subscribe({ + next(data) { + console.info(`Account has full manabar: ${data.rcAccounts["initminer"].manabar.currentMana}`); + + res(data.rcAccounts["initminer"].manabar.currentMana); + }, + error(err) { + console.error(err); + } + }); + }), + new Promise<string>((_, rej) => { setTimeout(rej, hiveBlockInterval * 2, new Error("Test timeout")); }) + ]); + + bot.stop(); + bot.delete(); + + return result; + }, HIVE_BLOCK_INTERVAL); + + expect(result.length).toBeGreaterThan(0); + }); + + test("Should be able to evaluate or condition in first statement", async({ workerbeeTest }) => { await workerbeeTest(async({ WorkerBee }, hiveBlockInterval) => { const bot = new WorkerBee(); - bot.on("error", console.error); await Promise.race([ - /* eslint-disable-next-line no-async-promise-executor */ - new Promise<void>(async res => { - await bot.start(); - - const block = await new Promise(blockResolve => { - bot.once("block", blockResolve); - }) as IBlockData; + new Promise<void>(res => { + bot.start(); - console.info(`Waiting for block: #${block.number + 1}`); - const observer = bot.observe.block(block.number + 1); + const observer = bot.observe.onAccountFullManabar("initminer").or.onBlockNumber(1); observer.subscribe({ next() { - console.info("Block detected"); - res(); + }, + error(err) { + console.error(err); } }); }), - new Promise(res => { setTimeout(res, hiveBlockInterval * 4); }) + new Promise<void>((_, rej) => { setTimeout(rej, hiveBlockInterval * 2, new Error("Test timeout")); }) ]); - await bot.stop(); - await bot.delete(); + bot.stop(); + bot.delete(); }, HIVE_BLOCK_INTERVAL); }); - test("Should be able to use full manabar regeneration time observer", async({ workerbeeTest }) => { + test("Should be able to evaluate or condition in second statement", async({ workerbeeTest }) => { await workerbeeTest(async({ WorkerBee }, hiveBlockInterval) => { const bot = new WorkerBee(); - bot.on("error", console.error); await Promise.race([ - /* eslint-disable-next-line no-async-promise-executor */ - new Promise<void>(async res => { - await bot.start(); - - console.info("Waiting for full manabar regeneration on initminer"); + new Promise<void>(res => { + bot.start(); - const observer = bot.observe.accountFullManabar("initminer"); + const observer = bot.observe.onBlockNumber(1).or.onAccountFullManabar("initminer"); observer.subscribe({ - next(acc: ApiAccount) { - console.info(`Account has full manabar: ${acc.voting_manabar.current_mana}`); - + next() { res(); + }, + error(err) { + console.error(err); } }); }), - new Promise(res => { setTimeout(res, hiveBlockInterval * 4); }) + new Promise<void>((_, rej) => { setTimeout(rej, hiveBlockInterval * 2, new Error("Test timeout")); }) ]); - await bot.stop(); - await bot.delete(); + bot.stop(); + bot.delete(); }, HIVE_BLOCK_INTERVAL); }); + test("Should call next() only once when all or statements evaluate to true", async({ workerbeeTest }) => { + const result = await workerbeeTest(async({ WorkerBee }, hiveBlockInterval) => { + const bot = new WorkerBee(); + + let calls = 0; + let res: () => void; + + await Promise.race([ + new Promise<void>(_res => { + res = _res; + bot.start(); + + const observer = bot.observe.onBlock().or.onBlock().or.onBlock().or.onBlock().or.onBlock(); + observer.subscribe({ + next() { + ++calls; + }, + error(err) { + console.error(err); + } + }); + }), + new Promise<void>(() => { setTimeout(res, hiveBlockInterval * 2); }) + ]); + + bot.stop(); + bot.delete(); + + return calls; + }, HIVE_BLOCK_INTERVAL); + + // We accept 3 block events triggerred at most - 2 block intervals + 1 for any possible race condition + expect(result).toBeLessThanOrEqual(3); + }); + test.afterAll(async() => { await browser.close(); }); diff --git a/eslint.config.js b/eslint.config.js index 3f2059b..c456e66 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -20,6 +20,7 @@ export default [ { ignores: [ "**/node_modules", "**/dist", + "**/build", "examples" ] }, ...compat.extends("eslint:recommended"), { @@ -37,6 +38,7 @@ export default [ { }, rules: { + "no-empty-pattern": 0, "require-atomic-updates": 1, "no-unused-private-class-members": 1, "capitalized-comments": 1, @@ -63,7 +65,7 @@ export default [ { "id-length": [ 2, { min: 2, max: 30, - exceptionPatterns: [ "[ei-l]" ] + exceptionPatterns: [ "[_ei-l]" ] } ], "eol-last": 1, "no-multiple-empty-lines": 1, @@ -121,21 +123,13 @@ export default [ { varsIgnorePattern: "^_" } ], "@typescript-eslint/explicit-member-accessibility": 2, - "@typescript-eslint/explicit-function-return-type": 2, + "@typescript-eslint/explicit-function-return-type": 0, "@typescript-eslint/no-non-null-assertion": 0, "@typescript-eslint/no-inferrable-types": 0, "@typescript-eslint/no-explicit-any": 0, - "@typescript-eslint/typedef": [ 2, { - arrayDestructuring: false, - objectDestructuring: false, - arrowParameter: false, - memberVariableDeclaration: true, - parameter: true, - propertyDeclaration: true, - variableDeclaration: false, - variableDeclarationIgnoreFunction: true - } ], + "@typescript-eslint/typedef": 0, "@typescript-eslint/no-extra-semi": 0, - "@typescript-eslint/no-empty-function": 0 + "@typescript-eslint/no-empty-function": 0, + "@typescript-eslint/no-empty-object-type": 0 } } ]; diff --git a/package.json b/package.json index 8a05cb3..0818c8c 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,6 @@ "@playwright/test": "^1.39.0", "@rollup/plugin-commonjs": "^27.0.0", "@rollup/plugin-node-resolve": "^15.3.0", - "@types/events": "^3.0.3", "@types/node": "^18.19.50", "@typescript-eslint/eslint-plugin": "^8.13.0", "@typescript-eslint/parser": "^8.13.0", @@ -67,9 +66,7 @@ }, "dependencies": { "@hiveio/beekeeper": "1.27.6-rc4", - "@hiveio/wax": "1.27.6-rc6", - "events": "^3.3.0", - "rxjs": "^7.8.1" + "@hiveio/wax": "1.27.6-rc6" }, "repository": { "type": "git", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4c82a3e..a155b73 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -14,12 +14,6 @@ importers: '@hiveio/wax': specifier: 1.27.6-rc6 version: 1.27.6-rc6 - events: - specifier: ^3.3.0 - version: 3.3.0 - rxjs: - specifier: ^7.8.1 - version: 7.8.1 devDependencies: '@eslint/compat': specifier: ^1.2.2 @@ -39,9 +33,6 @@ importers: '@rollup/plugin-node-resolve': specifier: ^15.3.0 version: 15.3.0(rollup@4.24.4) - '@types/events': - specifier: ^3.0.3 - version: 3.0.3 '@types/node': specifier: ^18.19.50 version: 18.19.50 @@ -460,9 +451,6 @@ packages: '@types/estree@1.0.6': resolution: {integrity: sha512-AYnb1nQyY49te+VRAVgmzfcgjYS91mY5P0TKUDCLEM+gNnA+3T6rWITXRLYCpahpqSQbN5cE+gHpnPyXjHWxcw==} - '@types/events@3.0.3': - resolution: {integrity: sha512-trOc4AAUThEz9hapPtSd7wf5tiQKvTtu5b371UxXdTuqzIh0ArcRspRP0i0Viu+LXstIQ1z96t1nsPxT9ol01g==} - '@types/hast@3.0.4': resolution: {integrity: sha512-WPs+bbQw5aCj+x6laNGWLH3wviHtoCv/P3+otBhbOhJgG8qtpdAMlTCxLtsTWA7LH1Oh/bFCHsBn0TPS5m30EQ==} @@ -2142,9 +2130,6 @@ packages: run-parallel@1.2.0: resolution: {integrity: sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==} - rxjs@7.8.1: - resolution: {integrity: sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==} - safe-array-concat@1.0.1: resolution: {integrity: sha512-6XbUAseYE2KtOuGueyeobCySj9L4+66Tn6KQMOPQJrAJEowYKW/YR/MGJZl7FdydUdaFu4LYyDZjxf4/Nmo23Q==} engines: {node: '>=0.4'} @@ -2986,8 +2971,6 @@ snapshots: '@types/estree@1.0.6': {} - '@types/events@3.0.3': {} - '@types/hast@3.0.4': dependencies: '@types/unist': 3.0.3 @@ -5014,10 +4997,6 @@ snapshots: dependencies: queue-microtask: 1.2.3 - rxjs@7.8.1: - dependencies: - tslib: 2.6.2 - safe-array-concat@1.0.1: dependencies: call-bind: 1.0.5 diff --git a/src/bot.ts b/src/bot.ts index 9f51930..db4aef0 100644 --- a/src/bot.ts +++ b/src/bot.ts @@ -1,11 +1,13 @@ -import EventEmitter from "events"; import type { IBeekeeperOptions, IBeekeeperUnlockedWallet } from "@hiveio/beekeeper"; -import { calculateExpiration, IWaxOptionsChain, IHiveChainInterface, TWaxExtended, ITransaction, ApiTransaction } from "@hiveio/wax"; -import type { Subscribable } from "rxjs"; +import { calculateExpiration, IWaxOptionsChain, IHiveChainInterface, TWaxExtended, ITransaction, ApiTransaction, transaction } from "@hiveio/wax"; +import { IBlockData } from "./chain-observers/classifiers/block-classifier"; +import { IBlockHeaderData } from "./chain-observers/classifiers/block-header-classifier"; +import { ObserverMediator } from "./chain-observers/observer-mediator"; import { WorkerBeeError } from "./errors"; -import type { IWorkerBee, IBlockData, ITransactionData, IBroadcastOptions } from "./interfaces"; +import type { IWorkerBee, IBroadcastOptions } from "./interfaces"; import { QueenBee } from "./queen"; +import type { Subscribable } from "./types/subscribable"; import { getWax, WaxExtendTypes } from "./wax"; const ONE_MINUTE = 1000 * 60; @@ -38,39 +40,38 @@ export const DEFAULT_WORKERBEE_OPTIONS = { chainOptions: {} }; -export const DEFAULT_BLOCK_INTERVAL_TIMEOUT = 1500; - -export class WorkerBee extends EventEmitter implements IWorkerBee { - public running: boolean = false; +export const DEFAULT_BLOCK_INTERVAL_TIMEOUT_MS = 2000; +export class WorkerBee implements IWorkerBee { public readonly configuration: IStartConfiguration; public chain?: TWaxExtended<typeof WaxExtendTypes>; private wallet?: IBeekeeperUnlockedWallet; - private headBlockNumber: number = 0; + private intervalId: NodeJS.Timeout | undefined = undefined; + + public get running() { + return this.intervalId !== undefined; + } + + public get observe() { + return new QueenBee(this); + } - public readonly observe: QueenBee = new QueenBee(this); + public mediator = new ObserverMediator(this); public constructor( configuration: IStartConfiguration = {} ) { - super(); - this.configuration = { ...DEFAULT_WORKERBEE_OPTIONS, ...configuration }; if(typeof configuration.explicitChain !== "undefined" && typeof configuration.chainOptions !== "undefined") throw new WorkerBeeError("explicitChain and chainOptions parameters are exclusive"); - - // When halt is requested, indicate we are not going to do the task again - super.on("halt", () => { - this.running = false; - }); } - public async broadcast(tx: ApiTransaction | ITransaction, options: IBroadcastOptions = {}): Promise<Subscribable<ITransactionData>> { + public async broadcast(tx: ApiTransaction | ITransaction, options: IBroadcastOptions = {}): Promise<Subscribable<transaction>> { const toBroadcast: ApiTransaction = "toApiJson" in tx ? tx.toApiJson() as ApiTransaction : tx as ApiTransaction; if(toBroadcast.signatures.length === 0) @@ -92,96 +93,76 @@ export class WorkerBee extends EventEmitter implements IWorkerBee { const apiTx = this.chain!.createTransactionFromJson(toBroadcast); - return this.observe.transaction(apiTx.id, expireDate.getTime()); + return { + subscribe: observer => { + const listener = this.observe.onTransactionId(apiTx.id).subscribe({ + next(val) { + observer.next?.(val.transactions[apiTx.id]!); + }, + error(val) { + observer.error?.(val); + }, + complete() { + observer.complete?.(); + } + }); + const timeoutId = setTimeout(() => { + listener.unsubscribe(); + observer.error?.(new WorkerBeeError(`Transaction ${apiTx.id} has expired`)); + }, expireDate.getTime() - Date.now()); + + return { + unsubscribe: () => { + clearTimeout(timeoutId); + listener.unsubscribe(); + } + } + } + }; } public async start(wallet?: IBeekeeperUnlockedWallet): Promise<void> { // Initialize chain and beekepeer if required - if(typeof this.chain === "undefined") { + if(typeof this.chain === "undefined") this.chain = await getWax(this.configuration.explicitChain, this.configuration.chainOptions); - ({ head_block_number: this.headBlockNumber } = await this.chain.api.database_api.get_dynamic_global_properties({})); - } - if(typeof this.wallet === "undefined") this.wallet = wallet; - // Ensure the app is not running - await this.stop(); - - // Do the first task and run the app - this.running = true; - super.emit("start"); - - this.doTask(); - } + this.stop(); - public async *[Symbol.asyncIterator](): AsyncIterator<IBlockData> { - while(this.running) - yield await new Promise(res => { - super.once("block", res); - }); + setInterval(() => { + this.mediator.notify(); + }, DEFAULT_BLOCK_INTERVAL_TIMEOUT_MS); } - public async doTask(): Promise<void> { - try { - // Get the head block, but wait at least DEFAULT_BLOCK_INTERVAL_TIMEOUT ms - const [ { block } ] = await Promise.all([ - this.chain!.api.block_api.get_block({ block_num: this.headBlockNumber }), - new Promise(res => { setTimeout(res, DEFAULT_BLOCK_INTERVAL_TIMEOUT); }) - ]); - - if(typeof block === "object") { - const blockData = { - number: this.headBlockNumber, - block - }; - - super.emit("block", blockData); - - for(let i = 0; i < block.transaction_ids.length; ++i) - super.emit("transaction", { - id: block.transaction_ids[i], - transaction: block.transactions[i], - block: blockData + public [Symbol.asyncIterator](): AsyncIterator<IBlockData & IBlockHeaderData> { + // TODO: Optimize this + return { + next: (): Promise<IteratorResult<IBlockData & IBlockHeaderData, void>> => { + return new Promise(res => { + const listener = this.observe.onBlock().provideBlockData().subscribe({ + next: block => { + listener.unsubscribe(); + res({ value: block.block, done: false }); + } }); - - ++this.headBlockNumber; - } // Else -> no new block - } catch (error) { - // Ensure we are emitting the Error instance - if (super.listenerCount("error") > 0) - super.emit("error", new WorkerBeeError(`Error occurred during automation: ${String(error)}`, error)); - - // Wait before any next operation is performed to reduce spamming the API - await new Promise(res => { setTimeout(res, DEFAULT_BLOCK_INTERVAL_TIMEOUT); }); - } finally { - // Do the task if running - if(this.running) - this.doTask(); - else // Inform about the application stop otherwise - super.emit("stop"); - } + }); + } + }; } - public stop(): Promise<void> { - return new Promise<void>(res => { - if(!this.running) - res(); - - // Request application stop - super.emit("halt"); + public stop(): void { + if(!this.running) + return; - // Wait for the stop and resolve - super.once("stop", res); - }); + clearTimeout(this.intervalId); } - public async delete(): Promise<void> { - // This function actually allows you to actually reset the bot instance - await this.stop(); + public delete(): void { + this.stop(); - super.removeAllListeners(); + this.mediator.unregisterAllListeners(); if(typeof this.configuration.explicitChain === "undefined") this.chain?.delete(); diff --git a/src/chain-observers/classifiers/account-classifier.ts b/src/chain-observers/classifiers/account-classifier.ts new file mode 100644 index 0000000..17d14c3 --- /dev/null +++ b/src/chain-observers/classifiers/account-classifier.ts @@ -0,0 +1,40 @@ +import type { asset } from "@hiveio/wax"; +import { CollectorClassifierBase } from "./collector-classifier-base"; + +export interface IHiveAssetDetailedBalance { + liquid: asset; + unclaimed: asset; + total: asset; +} + + +export interface IHiveAssetWithSavingsDetailedBalance extends IHiveAssetDetailedBalance { + savings: asset; +} + +export interface IHiveHPAssetDetailedBalance extends IHiveAssetDetailedBalance { + delegated: asset; + received: asset; + poweringDown: asset; +} + +export interface IAccountBalance { + HBD: IHiveAssetWithSavingsDetailedBalance; + HIVE: IHiveAssetWithSavingsDetailedBalance; + HP: IHiveHPAssetDetailedBalance; +} + +export interface IAccount { + name: string; + postingJsonMetadata: Record<string, any>; + jsonMetadata: Record<string, any>; + balance: IAccountBalance; +} + +export interface IAccountData { + accounts: Record<string, IAccount>; +} + +export class AccountClassifier extends CollectorClassifierBase { + public type!: IAccountData; +} diff --git a/src/chain-observers/classifiers/block-classifier.ts b/src/chain-observers/classifiers/block-classifier.ts new file mode 100644 index 0000000..e6f8ebf --- /dev/null +++ b/src/chain-observers/classifiers/block-classifier.ts @@ -0,0 +1,16 @@ +import type { transaction } from "@hiveio/wax"; +import { CollectorClassifierBase } from "./collector-classifier-base"; + +export interface ITransactionData { + transaction: transaction; + id: string; +} + +export interface IBlockData { + transactions: ITransactionData[]; + transactionsPerId: Map<string, transaction>; +} + +export class BlockClassifier extends CollectorClassifierBase { + public type!: IBlockData; +} diff --git a/src/chain-observers/classifiers/block-header-classifier.ts b/src/chain-observers/classifiers/block-header-classifier.ts new file mode 100644 index 0000000..3b4298f --- /dev/null +++ b/src/chain-observers/classifiers/block-header-classifier.ts @@ -0,0 +1,12 @@ +import { CollectorClassifierBase } from "./collector-classifier-base"; + +export interface IBlockHeaderData { + timestamp: Date; + witness: string; + number: number; + id: string; +} + +export class BlockHeaderClassifier extends CollectorClassifierBase { + public type!: IBlockHeaderData; +} diff --git a/src/chain-observers/classifiers/collector-classifier-base.ts b/src/chain-observers/classifiers/collector-classifier-base.ts new file mode 100644 index 0000000..3081987 --- /dev/null +++ b/src/chain-observers/classifiers/collector-classifier-base.ts @@ -0,0 +1,21 @@ +export interface IEvaluationContextClass { + name: string; + + new(): CollectorClassifierBase; +} + +export type TRegisterEvaluationContext = IEvaluationContextClass | { + options?: Record<string, any>; + class: IEvaluationContextClass; +} + +export class CollectorClassifierBase { + public type!: any; + + public static forOptions(options?: Record<string, any>): TRegisterEvaluationContext { + return { + class: this, + options + }; + } +} diff --git a/src/chain-observers/classifiers/dynamic-global-properties-classifier.ts b/src/chain-observers/classifiers/dynamic-global-properties-classifier.ts new file mode 100644 index 0000000..04de60f --- /dev/null +++ b/src/chain-observers/classifiers/dynamic-global-properties-classifier.ts @@ -0,0 +1,12 @@ +import { CollectorClassifierBase } from "./collector-classifier-base"; + +export interface IDynamicGlobalPropertiesData { + headBlockNumber: number; + currentWitness: string; + headBlockTime: Date; + headBlockId: string; +} + +export class DynamicGlobalPropertiesClassifier extends CollectorClassifierBase { + public type!: IDynamicGlobalPropertiesData; +} diff --git a/src/chain-observers/classifiers/impacted-account-classifier.ts b/src/chain-observers/classifiers/impacted-account-classifier.ts new file mode 100644 index 0000000..2da36d4 --- /dev/null +++ b/src/chain-observers/classifiers/impacted-account-classifier.ts @@ -0,0 +1,14 @@ +import { CollectorClassifierBase } from "./collector-classifier-base"; +import type { IOperationBaseData } from "./operation-classifier"; + +export interface IImpactedAccount extends IOperationBaseData { + name: string; +} + +export interface IImpactedAccountData { + impactedAccounts: Record<string, IImpactedAccount>; +} + +export class ImpactedAccountClassifier extends CollectorClassifierBase { + public type!: IImpactedAccountData; +} diff --git a/src/chain-observers/classifiers/index.ts b/src/chain-observers/classifiers/index.ts new file mode 100644 index 0000000..d70374d --- /dev/null +++ b/src/chain-observers/classifiers/index.ts @@ -0,0 +1,7 @@ +export { BlockHeaderClassifier } from "./block-header-classifier"; +export { DynamicGlobalPropertiesClassifier } from "./dynamic-global-properties-classifier"; +export { BlockClassifier } from "./block-classifier"; +export { AccountClassifier } from "./account-classifier"; +export { RcAccountClassifier } from "./rc-account-classifier"; +export { ImpactedAccountClassifier } from "./impacted-account-classifier"; +export { OperationClassifier } from "./operation-classifier"; diff --git a/src/chain-observers/classifiers/operation-classifier.ts b/src/chain-observers/classifiers/operation-classifier.ts new file mode 100644 index 0000000..5a0c92b --- /dev/null +++ b/src/chain-observers/classifiers/operation-classifier.ts @@ -0,0 +1,22 @@ +import type { operation } from "@hiveio/wax"; +import { ITransactionData } from "./block-classifier"; +import { CollectorClassifierBase } from "./collector-classifier-base"; + +export interface IOperationTransactionPair<OpType = operation> { + operation: OpType; + transaction: ITransactionData; +} + +export interface IOperationBaseData { + operations: Iterable<IOperationTransactionPair>; +} + +export interface IOperationData extends IOperationBaseData { + operationsPerType: { + [key in keyof operation]: Iterable<IOperationTransactionPair<Exclude<operation[key], undefined>>>; + }; +} + +export class OperationClassifier extends CollectorClassifierBase { + public type!: IOperationData; +} diff --git a/src/chain-observers/classifiers/rc-account-classifier.ts b/src/chain-observers/classifiers/rc-account-classifier.ts new file mode 100644 index 0000000..d703c83 --- /dev/null +++ b/src/chain-observers/classifiers/rc-account-classifier.ts @@ -0,0 +1,18 @@ +import { CollectorClassifierBase } from "./collector-classifier-base"; + +export interface IRcAccount { + name: string; + manabar: { + currentMana: string; + lastUpdateTime: Date; + }; + maxRc: string; +} + +export interface IRcAccountData { + rcAccounts: Record<string, IRcAccount>; +} + +export class RcAccountClassifier extends CollectorClassifierBase { + public type!: IRcAccountData; +} diff --git a/src/chain-observers/collectors/collector-base.ts b/src/chain-observers/collectors/collector-base.ts new file mode 100644 index 0000000..5766924 --- /dev/null +++ b/src/chain-observers/collectors/collector-base.ts @@ -0,0 +1,52 @@ +import type { WorkerBee } from "../../bot"; + +import type * as TClassifiers from "../classifiers"; +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; + +export type TAvailableClassifiers = { + [key in keyof typeof TClassifiers]: InstanceType<(typeof TClassifiers)[key]>["type"]; +}; + +export abstract class CollectorBase { + public constructor( + protected readonly worker: WorkerBee + ) {} + + private registersCount = 0; + + /* + * We need to return the data in the format of { [classifierName]: { [key]: value } } so + * overriding can be properly deduced later by the data evaluation context + */ + public abstract fetchData(data: DataEvaluationContext): Promise<Partial<TAvailableClassifiers>>; + + /** + * If the collector uses any context, it should report it as dependency by overriding this function + */ + public usedContexts(): Array<TRegisterEvaluationContext> { + return []; + } + + // TODO: Overriding logic + + protected pushOptions?(data: Record<string, any>): void; + protected popOptions?(data: Record<string, any>): void; + + public get hasRegistered() { + return this.registersCount > 0; + } + + public register(data?: Record<string, any>) { + ++this.registersCount; + + if (data !== undefined) + this.pushOptions?.(data); + } + public unregister(data?: Record<string, any>) { + --this.registersCount; + + if (data !== undefined) + this.popOptions?.(data); + } +} diff --git a/src/chain-observers/collectors/common/block-header-collector.ts b/src/chain-observers/collectors/common/block-header-collector.ts new file mode 100644 index 0000000..0ec2ebd --- /dev/null +++ b/src/chain-observers/collectors/common/block-header-collector.ts @@ -0,0 +1,23 @@ +import { TRegisterEvaluationContext } from "../../classifiers/collector-classifier-base"; +import { DynamicGlobalPropertiesClassifier } from "../../classifiers/dynamic-global-properties-classifier"; +import { DataEvaluationContext } from "../../factories/data-evaluation-context"; +import { CollectorBase, TAvailableClassifiers } from "../collector-base"; + +export class BlockHeaderCollector extends CollectorBase { + public usedContexts(): Array<TRegisterEvaluationContext> { + return [DynamicGlobalPropertiesClassifier]; + } + + public async fetchData(data: DataEvaluationContext) { + const { headBlockNumber, currentWitness, headBlockTime, headBlockId } = await data.get(DynamicGlobalPropertiesClassifier); + + return { + BlockHeaderClassifier: { + number: headBlockNumber, + timestamp: headBlockTime, + witness: currentWitness, + id: headBlockId + } + } satisfies Partial<TAvailableClassifiers>; + }; +} diff --git a/src/chain-observers/collectors/common/impacted-account-collector.ts b/src/chain-observers/collectors/common/impacted-account-collector.ts new file mode 100644 index 0000000..73d7329 --- /dev/null +++ b/src/chain-observers/collectors/common/impacted-account-collector.ts @@ -0,0 +1,43 @@ +import { OperationClassifier } from "../../classifiers"; +import { TRegisterEvaluationContext } from "../../classifiers/collector-classifier-base"; +import { IImpactedAccount } from "../../classifiers/impacted-account-classifier"; +import { IOperationTransactionPair } from "../../classifiers/operation-classifier"; +import { DataEvaluationContext } from "../../factories/data-evaluation-context"; +import { CollectorBase, TAvailableClassifiers } from "../collector-base"; + +export class ImpactedAccountCollector extends CollectorBase { + public usedContexts(): Array<TRegisterEvaluationContext> { + return [OperationClassifier]; + } + + private ensureStructIntegrity(impactedAccounts: Record<string, IImpactedAccount>, accountName: string): Array<IOperationTransactionPair> { + let struct = impactedAccounts[accountName]; + + if (struct === undefined) + struct = { + name: accountName, + operations: [] + }; + + return struct.operations as IOperationTransactionPair[]; + } + + public async fetchData(data: DataEvaluationContext) { + const { operations } = await data.get(OperationClassifier); + + const impactedAccounts: Record<string, IImpactedAccount> = {}; + + for(const operation of operations) { + const impactedOperationAccounts = this.worker.chain!.operationGetImpactedAccounts(operation.operation); + + for(const accountName of impactedOperationAccounts) + this.ensureStructIntegrity(impactedAccounts, accountName).push(operation); + } + + return { + ImpactedAccountClassifier: { + impactedAccounts + } + } satisfies Partial<TAvailableClassifiers>; + }; +} diff --git a/src/chain-observers/collectors/common/operation-collector.ts b/src/chain-observers/collectors/common/operation-collector.ts new file mode 100644 index 0000000..a69185f --- /dev/null +++ b/src/chain-observers/collectors/common/operation-collector.ts @@ -0,0 +1,53 @@ +import { BlockClassifier } from "../../classifiers"; +import { TRegisterEvaluationContext } from "../../classifiers/collector-classifier-base"; +import { IOperationTransactionPair } from "../../classifiers/operation-classifier"; +import { DataEvaluationContext } from "../../factories/data-evaluation-context"; +import { CollectorBase, TAvailableClassifiers } from "../collector-base"; + +export class OperationCollector extends CollectorBase { + public usedContexts(): Array<TRegisterEvaluationContext> { + return [BlockClassifier]; + } + + public async fetchData(data: DataEvaluationContext) { + const { transactions } = await data.get(BlockClassifier); + + const operations = transactions.flatMap(transaction => + transaction.transaction.operations.map(operation => ({ + operation, + transaction + })) + ); + + const operationsPerType = operations.reduce((operations, { operation, transaction }) => { + let opType = ""; + for(const type in operation) + if(operation[type] !== undefined) { + opType = type; + break; + } + + if(operations[opType] === undefined) + operations[opType] = []; + + operations[opType].push({ + operation: operation[opType], + transaction + }); + + return operations; + }, {} as Record<string, Array<IOperationTransactionPair>>); + + /* + * XXX: Debugging code: + * console.log("-", Object.entries(operationsPerType).map(([type, operations]) => `${type}: ${operations.length}`).join("\n- ")); + */ + + return { + OperationClassifier: { + operations, + operationsPerType + } + } satisfies Partial<TAvailableClassifiers>; + }; +} diff --git a/src/chain-observers/collectors/jsonrpc/account-collector.ts b/src/chain-observers/collectors/jsonrpc/account-collector.ts new file mode 100644 index 0000000..316d98c --- /dev/null +++ b/src/chain-observers/collectors/jsonrpc/account-collector.ts @@ -0,0 +1,100 @@ +import { IAccount } from "../../classifiers/account-classifier"; +import { DataEvaluationContext } from "../../factories/data-evaluation-context"; +import { CollectorBase, TAvailableClassifiers } from "../collector-base"; + +export interface IAccountCollectorOptions { + account: string; +} + +const MAX_ACCOUNT_GET_LIMIT = 100; + +export class AccountCollector extends CollectorBase { + private readonly accounts: Record<string, number> = {}; + + protected pushOptions(data: IAccountCollectorOptions): void { + this.accounts[data.account] = (this.accounts[data.account] || 0) + 1; + } + + protected popOptions(data: IAccountCollectorOptions): void { + this.accounts[data.account] = (this.accounts[data.account] || 1) - 1; + + if (this.accounts[data.account] === 0) + delete this.accounts[data.account]; + } + + public async fetchData(_: DataEvaluationContext) { + const accounts: Record<string, IAccount> = {}; + + const tryParseJson = (data: string) => { + try { + return JSON.parse(data); + } catch { + return {}; + } + } + + const accountNames = Object.keys(this.accounts); + for (let i = 0; i < accountNames.length; i += MAX_ACCOUNT_GET_LIMIT) { + const chunk = accountNames.slice(i, i + MAX_ACCOUNT_GET_LIMIT); + + const { accounts: apiAccounts } = await this.worker.chain!.api.database_api.find_accounts({ accounts: chunk }); + + for(const account of apiAccounts) + accounts[account.name] = { + name: account.name, + postingJsonMetadata: tryParseJson(account.posting_json_metadata), + jsonMetadata: tryParseJson(account.json_metadata), + balance: { + HBD: { + liquid: account.hbd_balance, + unclaimed: account.reward_hbd_balance, + total: { + amount: ( BigInt(account.hbd_balance.amount) + + BigInt(account.reward_hbd_balance.amount) + ).toString(), + precision: account.hbd_balance.precision, + nai: account.hbd_balance.nai + }, + savings: account.savings_hbd_balance + }, + HIVE: { + liquid: account.balance, + unclaimed: account.reward_hive_balance, + total: { + amount: ( BigInt(account.balance.amount) + + BigInt(account.reward_hive_balance.amount) + + BigInt(account.savings_balance.amount) + ).toString(), + precision: account.balance.precision, + nai: account.balance.nai + }, + savings: account.savings_balance + }, + HP: { + liquid: account.vesting_shares, + unclaimed: account.reward_vesting_balance, + total: { + amount: ( BigInt(account.vesting_shares.amount) + + BigInt(account.reward_vesting_balance.amount) + + BigInt(account.delegated_vesting_shares.amount) + + BigInt(account.received_vesting_shares.amount) + + BigInt(account.vesting_withdraw_rate.amount) + ).toString(), + precision: account.vesting_shares.precision, + nai: account.vesting_shares.nai + }, + delegated: account.delegated_vesting_shares, + received: account.received_vesting_shares, + poweringDown: account.vesting_withdraw_rate + } + } + }; + } + + return { + AccountClassifier: { + accounts + } + } satisfies Partial<TAvailableClassifiers>; + }; +} diff --git a/src/chain-observers/collectors/jsonrpc/block-collector.ts b/src/chain-observers/collectors/jsonrpc/block-collector.ts new file mode 100644 index 0000000..3c24a23 --- /dev/null +++ b/src/chain-observers/collectors/jsonrpc/block-collector.ts @@ -0,0 +1,39 @@ +import { transaction } from "@hiveio/wax"; +import { WorkerBeeError } from "../../../errors"; +import { DynamicGlobalPropertiesClassifier } from "../../classifiers"; +import { DataEvaluationContext } from "../../factories/data-evaluation-context"; +import { CollectorBase, TAvailableClassifiers } from "../collector-base"; + +export class BlockCollector extends CollectorBase { + private currentHeadBlock = -1; + + private cachedBlockData!: TAvailableClassifiers["BlockClassifier"]; + + public async fetchData(data: DataEvaluationContext) { + const { headBlockNumber } = await data.get(DynamicGlobalPropertiesClassifier); + + if(this.currentHeadBlock === headBlockNumber) + return { + BlockClassifier: this.cachedBlockData + } satisfies Partial<TAvailableClassifiers>; + + this.currentHeadBlock = headBlockNumber; + + const { block } = await this.worker.chain!.api.block_api.get_block({ block_num: headBlockNumber }); + + if (block === undefined) + throw new WorkerBeeError(`Block ${headBlockNumber} is not available`); + + const transactions = block.transactions.map((tx, index) => ({ + id: block.transaction_ids[index], + transaction: this.worker.chain!.createTransactionFromJson(tx).transaction + })); + + return { + BlockClassifier: { + transactionsPerId: new Map<string, transaction>(block.transaction_ids.map((id, index) => [id, transactions[index].transaction])), + transactions + } + } satisfies Partial<TAvailableClassifiers>; + }; +} diff --git a/src/chain-observers/collectors/jsonrpc/dynamic-global-properties-collector.ts b/src/chain-observers/collectors/jsonrpc/dynamic-global-properties-collector.ts new file mode 100644 index 0000000..664c68c --- /dev/null +++ b/src/chain-observers/collectors/jsonrpc/dynamic-global-properties-collector.ts @@ -0,0 +1,22 @@ +import { DataEvaluationContext } from "../../factories/data-evaluation-context"; +import { CollectorBase, TAvailableClassifiers } from "../collector-base"; + +export class DynamicGlobalPropertiesCollector extends CollectorBase { + public async fetchData(_: DataEvaluationContext) { + const { + current_witness, + head_block_number, + time, + head_block_id + } = await this.worker.chain!.api.database_api.get_dynamic_global_properties({}); + + return { + DynamicGlobalPropertiesClassifier: { + currentWitness: current_witness, + headBlockNumber: head_block_number, + headBlockTime: new Date(`${time}Z`), + headBlockId: head_block_id + } + } satisfies Partial<TAvailableClassifiers>; + }; +} diff --git a/src/chain-observers/collectors/jsonrpc/rc-account-collector.ts b/src/chain-observers/collectors/jsonrpc/rc-account-collector.ts new file mode 100644 index 0000000..5475627 --- /dev/null +++ b/src/chain-observers/collectors/jsonrpc/rc-account-collector.ts @@ -0,0 +1,51 @@ +import { IRcAccount } from "../../classifiers/rc-account-classifier"; +import { DataEvaluationContext } from "../../factories/data-evaluation-context"; +import { CollectorBase, TAvailableClassifiers } from "../collector-base"; + +export interface IRcAccountCollectorOptions { + rcAccount: string; +} + +const MAX_RC_ACCOUNT_GET_LIMIT = 100; + +export class RcAccountCollector extends CollectorBase { + private readonly rcAccounts: Record<string, number> = {}; + + protected pushOptions(data: IRcAccountCollectorOptions): void { + this.rcAccounts[data.rcAccount] = (this.rcAccounts[data.rcAccount] || 0) + 1; + } + + protected popOptions(data: IRcAccountCollectorOptions): void { + this.rcAccounts[data.rcAccount] = (this.rcAccounts[data.rcAccount] || 1) - 1; + + if (this.rcAccounts[data.rcAccount] === 0) + delete this.rcAccounts[data.rcAccount]; + } + + public async fetchData(_: DataEvaluationContext) { + const rcAccounts: Record<string, IRcAccount> = {}; + + const accountNames = Object.keys(this.rcAccounts); + for (let i = 0; i < accountNames.length; i += MAX_RC_ACCOUNT_GET_LIMIT) { + const chunk = accountNames.slice(i, i + MAX_RC_ACCOUNT_GET_LIMIT); + + const { rc_accounts: apiRcAccounts } = await this.worker.chain!.api.rc_api.find_rc_accounts({ accounts: chunk }); + + for(const rcAccount of apiRcAccounts) + rcAccounts[rcAccount.account] = { + name: rcAccount.account, + manabar: { + currentMana: String(rcAccount.rc_manabar.current_mana), + lastUpdateTime: new Date(rcAccount.rc_manabar.last_update_time * 1000) + }, + maxRc: String(rcAccount.max_rc) + }; + } + + return { + RcAccountClassifier: { + rcAccounts + } + } satisfies Partial<TAvailableClassifiers>; + }; +} diff --git a/src/chain-observers/factories/data-evaluation-context.ts b/src/chain-observers/factories/data-evaluation-context.ts new file mode 100644 index 0000000..baf37de --- /dev/null +++ b/src/chain-observers/factories/data-evaluation-context.ts @@ -0,0 +1,45 @@ +import { WorkerBeeError } from "../../errors"; +import type * as TClassifiers from "../classifiers"; +import { CollectorClassifierBase, IEvaluationContextClass } from "../classifiers/collector-classifier-base"; + +import { CollectorBase, TAvailableClassifiers } from "../collectors/collector-base"; + +export type TAvailableCollectorFunctions = { + [key in keyof typeof TClassifiers]: CollectorBase; +}; + +export class DataEvaluationContext { + private readonly cachedFunctions = new Map<CollectorBase, Promise<Partial<TAvailableClassifiers>>>(); + private readonly collectors: TAvailableCollectorFunctions = {} as TAvailableCollectorFunctions; + + public inject<T extends IEvaluationContextClass>( + evaluationContext: T, + collector: CollectorBase + ): void { + if (this.collectors[evaluationContext.name] !== undefined) + return; // Already registered + + this.collectors[evaluationContext.name] = collector; + } + + public async get<T extends IEvaluationContextClass, R = T extends new () => infer R ? R : never>( + evaluationContext: T + ): Promise<R extends CollectorClassifierBase ? R["type"] : never> { + const collector = this.collectors[evaluationContext.name]; + + if (collector === undefined) + throw new WorkerBeeError(`Collector for evaluation context: "${evaluationContext.name}" not found`); + + let cached = this.cachedFunctions.get(collector); + + if (cached === undefined) { + cached = collector.fetchData(this); + + this.cachedFunctions.set(collector, cached!); + } + + const result = await cached!; + + return result[evaluationContext.name] as any; + } +} diff --git a/src/chain-observers/factories/factory-base.ts b/src/chain-observers/factories/factory-base.ts new file mode 100644 index 0000000..bb1f54e --- /dev/null +++ b/src/chain-observers/factories/factory-base.ts @@ -0,0 +1,58 @@ +import type { WorkerBee } from "../../bot"; +import { IEvaluationContextClass, TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { CollectorBase } from "../collectors/collector-base"; +import { DataEvaluationContext } from "./data-evaluation-context"; + +export class FactoryBase { + protected collectors!: Map<IEvaluationContextClass, CollectorBase>; + + public constructor( + protected readonly worker: WorkerBee + ) {} + + public pushClassifier(classifier: TRegisterEvaluationContext): void { + const classifierClass = "class" in classifier ? classifier.class : classifier; + + const instance = this.collectors.get(classifierClass); + if (instance === undefined) + throw new Error(`Classifier "${classifierClass.name}" is not supported by factory "${(this as any).__proto__.constructor.name}"`); + + instance.register("options" in classifier ? classifier.options : undefined); + + for(const dependency of instance.usedContexts()) + this.pushClassifier(dependency); + } + + public popClassifier(classifier: TRegisterEvaluationContext): void { + const classifierClass = "class" in classifier ? classifier.class : classifier; + + const instance = this.collectors.get(classifierClass); + if (instance === undefined) + throw new Error(`Classifier "${classifierClass.name}" is not supported by factory "${(this as any).__proto__.constructor.name}"`); + + instance.unregister("options" in classifier ? classifier.options : undefined); + + for(const dependency of instance.usedContexts()) + this.popClassifier(dependency); + } + + private rebuildDataEvaluationContext(): DataEvaluationContext { + const context = new DataEvaluationContext(); + + for(const [contextClass, collectorInstance] of this.collectors) { + // XXX: Rewrite this on overriding logic: + if (!collectorInstance.hasRegistered) // Ignore collectors that have no registered classifiers + continue; + + // Dependencies are already pushed by the classifier + + context.inject(contextClass, collectorInstance); + } + + return context; + } + + public collect(): DataEvaluationContext { + return this.rebuildDataEvaluationContext(); + } +} diff --git a/src/chain-observers/factories/jsonrpc/factory-data.ts b/src/chain-observers/factories/jsonrpc/factory-data.ts new file mode 100644 index 0000000..9d6ddde --- /dev/null +++ b/src/chain-observers/factories/jsonrpc/factory-data.ts @@ -0,0 +1,24 @@ +import type { WorkerBee } from "../../../bot"; +import { + AccountClassifier, BlockClassifier, BlockHeaderClassifier, + DynamicGlobalPropertiesClassifier, ImpactedAccountClassifier, OperationClassifier, RcAccountClassifier +} from "../../classifiers"; +import { IEvaluationContextClass } from "../../classifiers/collector-classifier-base"; +import { CollectorBase } from "../../collectors/collector-base"; +import { BlockHeaderCollector } from "../../collectors/common/block-header-collector"; +import { ImpactedAccountCollector } from "../../collectors/common/impacted-account-collector"; +import { OperationCollector } from "../../collectors/common/operation-collector"; +import { AccountCollector } from "../../collectors/jsonrpc/account-collector"; +import { BlockCollector } from "../../collectors/jsonrpc/block-collector"; +import { DynamicGlobalPropertiesCollector } from "../../collectors/jsonrpc/dynamic-global-properties-collector"; +import { RcAccountCollector } from "../../collectors/jsonrpc/rc-account-collector"; + +export const JsonRpcFactoryData: (worker: WorkerBee) => Array<[IEvaluationContextClass, CollectorBase]> = (worker: WorkerBee) => ([ + [BlockHeaderClassifier, new BlockHeaderCollector(worker)], + [DynamicGlobalPropertiesClassifier, new DynamicGlobalPropertiesCollector(worker)], + [BlockClassifier, new BlockCollector(worker)], + [AccountClassifier, new AccountCollector(worker)], + [RcAccountClassifier, new RcAccountCollector(worker)], + [ImpactedAccountClassifier, new ImpactedAccountCollector(worker)], + [OperationClassifier, new OperationCollector(worker)] +]); diff --git a/src/chain-observers/factories/jsonrpc/factory.ts b/src/chain-observers/factories/jsonrpc/factory.ts new file mode 100644 index 0000000..4421ab6 --- /dev/null +++ b/src/chain-observers/factories/jsonrpc/factory.ts @@ -0,0 +1,13 @@ +import { WorkerBee } from "../../../bot"; +import { FactoryBase } from "../factory-base"; +import { JsonRpcFactoryData } from "./factory-data"; + +export class JsonRpcFactory extends FactoryBase { + public constructor( + protected readonly worker: WorkerBee + ) { + super(worker); + + this.collectors = new Map(JsonRpcFactoryData(worker)); + } +} diff --git a/src/chain-observers/filters/account-created-filter.ts b/src/chain-observers/filters/account-created-filter.ts new file mode 100644 index 0000000..9171c77 --- /dev/null +++ b/src/chain-observers/filters/account-created-filter.ts @@ -0,0 +1,34 @@ +import type { WorkerBee } from "../../bot"; +import { OperationClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class AccountCreatedFilter extends FilterBase { + public constructor( + worker: WorkerBee + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + OperationClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const { operationsPerType } = await data.get(OperationClassifier); + + for(const {} of (operationsPerType.account_create ?? [])) + return true; + + for(const {} of (operationsPerType.account_create_with_delegation ?? [])) + return true; + + for(const {} of (operationsPerType.create_claimed_account ?? [])) + return true; + + return false; + } +} diff --git a/src/chain-observers/filters/account-full-manabar-filter.ts b/src/chain-observers/filters/account-full-manabar-filter.ts new file mode 100644 index 0000000..d6a0e82 --- /dev/null +++ b/src/chain-observers/filters/account-full-manabar-filter.ts @@ -0,0 +1,38 @@ +import type { WorkerBee } from "../../bot"; +import { DynamicGlobalPropertiesClassifier, RcAccountClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { IRcAccountCollectorOptions } from "../collectors/jsonrpc/rc-account-collector"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class AccountFullManabarFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly account: string + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + DynamicGlobalPropertiesClassifier, + RcAccountClassifier.forOptions({ rcAccount: this.account } satisfies IRcAccountCollectorOptions) + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const rcAccountClassifier = await data.get(RcAccountClassifier); + const dgpo = await data.get(DynamicGlobalPropertiesClassifier); + + const rcAccount = rcAccountClassifier.rcAccounts[this.account]; + + const manabarData = this.worker.chain!.calculateCurrentManabarValue( + Math.round(dgpo.headBlockTime.getTime() / 1000), + rcAccount.maxRc, + rcAccount.manabar.currentMana, + rcAccount.manabar.lastUpdateTime.getTime() / 1000 + ); + + return manabarData.percent >= 98; + } +} diff --git a/src/chain-observers/filters/account-metadata-change-filter.ts b/src/chain-observers/filters/account-metadata-change-filter.ts new file mode 100644 index 0000000..e426b92 --- /dev/null +++ b/src/chain-observers/filters/account-metadata-change-filter.ts @@ -0,0 +1,49 @@ +import type { WorkerBee } from "../../bot"; +import { AccountClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class AccountMetadataChangeFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly account: string + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + AccountClassifier.forOptions({ + account: this.account + }) + ]; + } + + private previousJsonMetadata?: string; + private previousPostingJsonMetadata?: string; + + public async match(data: DataEvaluationContext): Promise<boolean> { + const { accounts } = await data.get(AccountClassifier); + + const account = accounts[this.account]; + + if (this.previousJsonMetadata === undefined) { + this.previousJsonMetadata = JSON.stringify(account.jsonMetadata); + this.previousPostingJsonMetadata = JSON.stringify(account.postingJsonMetadata); + + return false; + } + + const postingMeta = JSON.stringify(account.postingJsonMetadata); + const accMeta = JSON.stringify(account.jsonMetadata); + + const changedAccMeta = accMeta !== this.previousJsonMetadata; + const changedPosting = postingMeta !== this.previousPostingJsonMetadata; + + this.previousJsonMetadata = accMeta; + this.previousPostingJsonMetadata = postingMeta; + + return changedAccMeta || changedPosting; + } +} diff --git a/src/chain-observers/filters/balance-change-filter.ts b/src/chain-observers/filters/balance-change-filter.ts new file mode 100644 index 0000000..fe1b7a9 --- /dev/null +++ b/src/chain-observers/filters/balance-change-filter.ts @@ -0,0 +1,64 @@ +import type { WorkerBee } from "../../bot"; +import { AccountClassifier } from "../classifiers"; +import { IAccountBalance } from "../classifiers/account-classifier"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class BalanceChangeFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly account: string, + private readonly includeInternalTransfers: boolean = false + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + AccountClassifier.forOptions({ + account: this.account + }) + ]; + } + + private previousBalance?: IAccountBalance; + + private parseInternalTransfers(balance: IAccountBalance): boolean { + for(const asset in this.previousBalance) + for(const type in this.previousBalance[asset]) + if (this.previousBalance[asset][type].amount !== balance[asset][type].amount) { + this.previousBalance = balance; + + return true; + } + + + this.previousBalance = balance; + + return false; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const { accounts } = await data.get(AccountClassifier); + + const account = accounts[this.account]; + + if (this.previousBalance === undefined) { + this.previousBalance = account.balance; + + return false; + } + + if (this.includeInternalTransfers) + return this.parseInternalTransfers(account.balance); + + const changedHP = this.previousBalance.HP.total.amount !== account.balance.HP.total.amount; + const changedHIVE = this.previousBalance.HIVE.total.amount !== account.balance.HIVE.total.amount; + const changedHBD = this.previousBalance.HBD.total.amount !== account.balance.HBD.total.amount; + + this.previousBalance = account.balance; + + return changedHP || changedHIVE || changedHBD; + } +} diff --git a/src/chain-observers/filters/block-filter.ts b/src/chain-observers/filters/block-filter.ts new file mode 100644 index 0000000..bcb72b5 --- /dev/null +++ b/src/chain-observers/filters/block-filter.ts @@ -0,0 +1,26 @@ +import type { WorkerBee } from "../../bot"; +import { BlockHeaderClassifier } from "../classifiers/block-header-classifier"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class BlockNumberFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly number: number + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + BlockHeaderClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const block = await data.get(BlockHeaderClassifier); + + return block.number === this.number; + } +} diff --git a/src/chain-observers/filters/composite-filter.ts b/src/chain-observers/filters/composite-filter.ts new file mode 100644 index 0000000..b3c7ff3 --- /dev/null +++ b/src/chain-observers/filters/composite-filter.ts @@ -0,0 +1,75 @@ +import type { WorkerBee } from "../../bot"; +import { WorkerBeeUnsatisfiedFilterError } from "../../errors"; +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "../filters/filter-base"; + +abstract class CompositeFilter extends FilterBase { + public constructor( + worker: WorkerBee, + protected readonly operands: FilterBase[] + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + const collectorSet = new Set<TRegisterEvaluationContext>(); + for(const operand of this.operands) + for(const collector of operand.usedContexts()) + collectorSet.add(collector); + + return [...collectorSet]; + } + + protected async evaluateOperands(context: DataEvaluationContext, forceCancelValue?: boolean, forceResolveValue?: boolean): Promise<void> { + let forceResolve = () => {}; + let forceReject = (_: WorkerBeeUnsatisfiedFilterError) => {}; + + const forcePromise = new Promise<void>((resolve, reject) => { + forceResolve = resolve; + forceReject = reject; + }); + + await Promise.race([ + forcePromise, + Promise.all(this.operands.map((filter) => filter.match(context).then(evaluationResult => { + if(evaluationResult === forceResolveValue) + forceResolve(); + else if (evaluationResult === forceCancelValue) + forceReject(new WorkerBeeUnsatisfiedFilterError()); + + return evaluationResult; + }))) + ]); + } +}; + +export class LogicalAndFilter extends CompositeFilter { + public async match(context: DataEvaluationContext): Promise<boolean> { + try { + await this.evaluateOperands(context, false); + + return true; + } catch(error) { + if(typeof error === "object" && error instanceof WorkerBeeUnsatisfiedFilterError) + return false; + + throw error; + } + } +}; + +export class LogicalOrFilter extends CompositeFilter { + public async match(context: DataEvaluationContext): Promise<boolean> { + try { + await this.evaluateOperands(context, undefined, true); + + return true; + } catch(error) { + if(typeof error === "object" && error instanceof WorkerBeeUnsatisfiedFilterError) + return false; + + throw error; + } + } +}; diff --git a/src/chain-observers/filters/custom-operation-filter.ts b/src/chain-observers/filters/custom-operation-filter.ts new file mode 100644 index 0000000..0ad57bf --- /dev/null +++ b/src/chain-observers/filters/custom-operation-filter.ts @@ -0,0 +1,34 @@ +import type { WorkerBee } from "../../bot"; +import { OperationClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class CustomOperationFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly id: string | number + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + OperationClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const { operationsPerType } = await data.get(OperationClassifier); + + for(const { operation } of (operationsPerType.custom_json ?? [])) + if (operation.id === this.id) + return true; + + for(const { operation } of (operationsPerType.custom ?? [])) + if (operation.id === this.id) + return true; + + return false; + } +} diff --git a/src/chain-observers/filters/filter-base.ts b/src/chain-observers/filters/filter-base.ts new file mode 100644 index 0000000..01b113c --- /dev/null +++ b/src/chain-observers/filters/filter-base.ts @@ -0,0 +1,13 @@ +import type { WorkerBee } from "../../bot"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; + +export abstract class FilterBase { + public constructor( + protected readonly worker: WorkerBee + ) {} + + public abstract usedContexts(): Array<TRegisterEvaluationContext>; + + public abstract match(data: DataEvaluationContext): Promise<boolean>; +} diff --git a/src/chain-observers/filters/follow-filter.ts b/src/chain-observers/filters/follow-filter.ts new file mode 100644 index 0000000..f4f1202 --- /dev/null +++ b/src/chain-observers/filters/follow-filter.ts @@ -0,0 +1,34 @@ +import type { WorkerBee } from "../../bot"; +import { OperationClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class FollowFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly account: string + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + OperationClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const { operationsPerType } = await data.get(OperationClassifier); + + for(const { operation } of (operationsPerType.custom_json ?? [])) + if (operation.id === "follow") { + const json = JSON.parse(operation.json); + + if (json[0] === "follow" && json[1].follower === this.account) + return true; + } + + return false; + } +} diff --git a/src/chain-observers/filters/impacted-account-filter.ts b/src/chain-observers/filters/impacted-account-filter.ts new file mode 100644 index 0000000..72cb497 --- /dev/null +++ b/src/chain-observers/filters/impacted-account-filter.ts @@ -0,0 +1,26 @@ +import type { WorkerBee } from "../../bot"; +import { ImpactedAccountClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class ImpactedAccountFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly account: string + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + ImpactedAccountClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const impactedAccount = await data.get(ImpactedAccountClassifier); + + return impactedAccount.impactedAccounts[this.account] !== undefined; + } +} diff --git a/src/chain-observers/filters/new-block-filter.ts b/src/chain-observers/filters/new-block-filter.ts new file mode 100644 index 0000000..90d3a6a --- /dev/null +++ b/src/chain-observers/filters/new-block-filter.ts @@ -0,0 +1,24 @@ +import { BlockHeaderClassifier } from "../classifiers/block-header-classifier"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class BlockChangedFilter extends FilterBase { + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + BlockHeaderClassifier + ]; + } + + private previousBlock?: number; + + public async match(data: DataEvaluationContext): Promise<boolean> { + const block = await data.get(BlockHeaderClassifier); + + const blockChanged = this.previousBlock !== block.number; + + this.previousBlock = block.number; + + return blockChanged; + } +} diff --git a/src/chain-observers/filters/post-filter.ts b/src/chain-observers/filters/post-filter.ts new file mode 100644 index 0000000..08879bf --- /dev/null +++ b/src/chain-observers/filters/post-filter.ts @@ -0,0 +1,34 @@ +import type { WorkerBee } from "../../bot"; +import { ImpactedAccountClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class PostFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly account: string + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + ImpactedAccountClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const { impactedAccounts } = await data.get(ImpactedAccountClassifier); + + const account = impactedAccounts[this.account]; + + for(const { operation } of account.operations) + if(operation.comment) + if (operation.comment.author === this.account) + return true; + + + return false; + } +} diff --git a/src/chain-observers/filters/post-mention.ts b/src/chain-observers/filters/post-mention.ts new file mode 100644 index 0000000..7add925 --- /dev/null +++ b/src/chain-observers/filters/post-mention.ts @@ -0,0 +1,31 @@ +import type { WorkerBee } from "../../bot"; +import { mentionedAccount } from "../../utils/mention"; +import { OperationClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class PostMentionFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly account: string + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + OperationClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const { operationsPerType } = await data.get(OperationClassifier); + + for(const { operation } of (operationsPerType.comment ?? [])) + if (mentionedAccount(this.account, operation.body)) + return true; + + return false; + } +} diff --git a/src/chain-observers/filters/reblog-filter.ts b/src/chain-observers/filters/reblog-filter.ts new file mode 100644 index 0000000..b43d3d2 --- /dev/null +++ b/src/chain-observers/filters/reblog-filter.ts @@ -0,0 +1,34 @@ +import type { WorkerBee } from "../../bot"; +import { OperationClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class ReblogFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly account: string + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + OperationClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const { operationsPerType } = await data.get(OperationClassifier); + + for(const { operation } of (operationsPerType.custom_json ?? [])) + if (operation.id === "follow") { + const json = JSON.parse(operation.json); + + if (json[0] === "reblog" && json[1].account === this.account) + return true; + } + + return false; + } +} diff --git a/src/chain-observers/filters/transaction-id-filter.ts b/src/chain-observers/filters/transaction-id-filter.ts new file mode 100644 index 0000000..f6f3aa2 --- /dev/null +++ b/src/chain-observers/filters/transaction-id-filter.ts @@ -0,0 +1,26 @@ +import type { WorkerBee } from "../../bot"; +import { BlockClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class TransactionIdFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly transactionId: string + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + BlockClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const block = await data.get(BlockClassifier); + + return block.transactionsPerId.has(this.transactionId); + } +} diff --git a/src/chain-observers/filters/vote-filter.ts b/src/chain-observers/filters/vote-filter.ts new file mode 100644 index 0000000..677d08a --- /dev/null +++ b/src/chain-observers/filters/vote-filter.ts @@ -0,0 +1,37 @@ +import type { WorkerBee } from "../../bot"; +import { ImpactedAccountClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class VoteFilter extends FilterBase { + public constructor( + worker: WorkerBee, + private readonly account: string + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + ImpactedAccountClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const { impactedAccounts } = await data.get(ImpactedAccountClassifier); + + const account = impactedAccounts[this.account]; + + for(const { operation } of account.operations) + if(operation.vote) { + if (operation.vote.author === this.account) + return true; + } else if (operation.account_witness_vote) + if (operation.account_witness_vote.account === this.account && operation.account_witness_vote.approve) + return true; + + + return false; + } +} diff --git a/src/chain-observers/filters/whale-alert-filter.ts b/src/chain-observers/filters/whale-alert-filter.ts new file mode 100644 index 0000000..4373551 --- /dev/null +++ b/src/chain-observers/filters/whale-alert-filter.ts @@ -0,0 +1,59 @@ +import type { asset } from "@hiveio/wax"; +import type { WorkerBee } from "../../bot"; +import { isGreaterThan } from "../../utils/assets"; +import { OperationClassifier } from "../classifiers"; +import type { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import type { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { FilterBase } from "./filter-base"; + +export class WhaleAlertFilter extends FilterBase { + /** + * @param worker @internal + * @param amount Any amount - HIVE/HBD in coins (no precision) + */ + public constructor( + worker: WorkerBee, + private readonly asset: asset + ) { + super(worker); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + OperationClassifier + ]; + } + + public async match(data: DataEvaluationContext): Promise<boolean> { + const operations = await data.get(OperationClassifier); + + const transfer = operations.operationsPerType["transfer"]; + const fromSavings = operations.operationsPerType["transfer_from_savings"]; + const escrow = operations.operationsPerType["escrow_transfer"]; + const recurrent = operations.operationsPerType["recurrent_transfer"]; + + if(transfer) + for(const op of transfer) + if(isGreaterThan(this.asset, op.operation.amount!)) + return true; + + if(fromSavings) + for(const op of fromSavings) + if(isGreaterThan(this.asset, op.operation.amount!)) + return true; + + if(escrow) + for(const op of escrow) + if(isGreaterThan(this.asset, op.operation.hbd_amount!)) + return true; + else if(isGreaterThan(this.asset, op.operation.hive_amount!)) + return true; + + if(recurrent) + for(const op of recurrent) + if(isGreaterThan(this.asset, op.operation.amount!)) + return true; + + return false; + } +} diff --git a/src/chain-observers/observer-mediator.ts b/src/chain-observers/observer-mediator.ts new file mode 100644 index 0000000..86e90b1 --- /dev/null +++ b/src/chain-observers/observer-mediator.ts @@ -0,0 +1,79 @@ +import { WorkerBee } from "../bot"; +import { type Observer } from "../types/subscribable"; +import { DataEvaluationContext } from "./factories/data-evaluation-context"; +import { FactoryBase } from "./factories/factory-base"; +import { JsonRpcFactory } from "./factories/jsonrpc/factory"; +import { FilterBase } from "./filters/filter-base"; +import { ProviderBase } from "./providers/provider-base"; + +export type ListenerType = (data: DataEvaluationContext) => void; + +export type FactoryBaseConstructor = new (worker: WorkerBee) => FactoryBase; + +export class ObserverMediator { + private readonly factory: FactoryBase; + + public constructor( + worker: WorkerBee, + factory: FactoryBaseConstructor = JsonRpcFactory + ) { + this.factory = new factory(worker); + } + + private filters = new Map<Partial<Observer<any>>, { filter: FilterBase; providers: ProviderBase[]; }>(); + + public notify() { + const context = this.factory.collect(); + + // Start providing parsed, cached data to filters + for(const [listener, { filter, providers }] of this.filters.entries()) + filter.match(context).then(async(matched) => { + if(!matched) + return; + + // Join all providers data for user (1 level nested) + const providedData = {}; + for(const provider of providers) { + const providerResult = await provider.provide(context); + for(const key in providerResult) + if (Array.isArray(providerResult[key])) + providedData[key] = (providedData[key] ?? []).concat(providerResult[key]); + else + providedData[key] = { ...(providedData[key] ?? {}), ...providerResult[key] }; + } + + listener.next?.(providedData); + }).catch(error => listener.error?.(error)); + } + + public registerListener(listener: Partial<Observer<any>>, filter: FilterBase, providers: ProviderBase[]) { + this.filters.set(listener, { filter, providers }); + + for(const classifier of filter.usedContexts()) + this.factory.pushClassifier(classifier); + + for(const classifier of providers) + for(const usedContext of classifier.usedContexts()) + this.factory.pushClassifier(usedContext); + } + + public unregisterListener(listener: Partial<Observer<any>>) { + const filter = this.filters.get(listener); + if (!filter) + return; + + for(const classifier of filter.filter.usedContexts()) + this.factory.popClassifier(classifier); + + for(const classifier of filter.providers) + for(const usedContext of classifier.usedContexts()) + this.factory.popClassifier(usedContext); + + this.filters.delete(listener); + } + + public unregisterAllListeners() { + for(const listener of this.filters.keys()) + this.unregisterListener(listener); + } +} diff --git a/src/chain-observers/providers/account-provider.ts b/src/chain-observers/providers/account-provider.ts new file mode 100644 index 0000000..68a1bb5 --- /dev/null +++ b/src/chain-observers/providers/account-provider.ts @@ -0,0 +1,40 @@ +import { TAccountName } from "@hiveio/wax"; +import { AccountClassifier } from "../classifiers"; +import { IAccount } from "../classifiers/account-classifier"; +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { ProviderBase } from "./provider-base"; + +export type TAccountProvided<TAccounts extends Array<TAccountName>> = { + [K in TAccounts[number]]: IAccount; +}; + +export interface IAccountProviderData<TAccounts extends Array<TAccountName>> { + accounts: TAccountProvided<TAccounts>; +}; + +export class AccountProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase { + public constructor( + private readonly accounts: TAccounts + ) { + super(); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return this.accounts.map(account => AccountClassifier.forOptions({ + account + })); + } + + public async provide(data: DataEvaluationContext): Promise<IAccountProviderData<TAccounts>> { + const result = { + accounts: {} + }; + + const accounts = await data.get(AccountClassifier); + for(const account of this.accounts) + result.accounts[account] = accounts.accounts[account]; + + return result as IAccountProviderData<TAccounts>; + } +} diff --git a/src/chain-observers/providers/block-header-provider.ts b/src/chain-observers/providers/block-header-provider.ts new file mode 100644 index 0000000..187516e --- /dev/null +++ b/src/chain-observers/providers/block-header-provider.ts @@ -0,0 +1,26 @@ +import { BlockHeaderClassifier, IBlockHeaderData } from "../classifiers/block-header-classifier"; +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { ProviderBase } from "./provider-base"; + +export interface IBlockHeaderProviderData { + block: IBlockHeaderData; +}; + +export class BlockHeaderProvider extends ProviderBase { + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + BlockHeaderClassifier + ] + } + + public async provide(data: DataEvaluationContext): Promise<IBlockHeaderProviderData> { + const blockHeader = await data.get(BlockHeaderClassifier); + + return { + block: { + ...blockHeader + } + }; + } +} diff --git a/src/chain-observers/providers/block-provider.ts b/src/chain-observers/providers/block-provider.ts new file mode 100644 index 0000000..be8449b --- /dev/null +++ b/src/chain-observers/providers/block-provider.ts @@ -0,0 +1,30 @@ +import { BlockClassifier, IBlockData } from "../classifiers/block-classifier"; +import { BlockHeaderClassifier, IBlockHeaderData } from "../classifiers/block-header-classifier"; +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { ProviderBase } from "./provider-base"; + +export interface IBlockProviderData { + block: IBlockHeaderData & IBlockData; +}; + +export class BlockProvider extends ProviderBase { + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + BlockHeaderClassifier, + BlockClassifier + ] + } + + public async provide(data: DataEvaluationContext): Promise<IBlockProviderData> { + const blockHeader = await data.get(BlockHeaderClassifier); + const block = await data.get(BlockClassifier); + + return { + block: { + ...blockHeader, + ...block + } + }; + } +} diff --git a/src/chain-observers/providers/mention-provider.ts b/src/chain-observers/providers/mention-provider.ts new file mode 100644 index 0000000..2e4aa99 --- /dev/null +++ b/src/chain-observers/providers/mention-provider.ts @@ -0,0 +1,45 @@ +import { comment, TAccountName } from "@hiveio/wax"; +import { WorkerBeeIterable } from "../../types/iterator"; +import { mentionedAccount } from "../../utils/mention"; +import { OperationClassifier } from "../classifiers"; +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { ProviderBase } from "./provider-base"; + +export type TMentionedAccountProvided<TMentions extends Array<TAccountName>> = { + [K in TMentions[number]]: WorkerBeeIterable<comment>; +}; + +export interface IMentionedAccountProviderData<TMentions extends Array<TAccountName>> { + mentioned: TMentionedAccountProvided<TMentions>; +}; + +export class MentionedAccountProvider<TMentions extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase { + public constructor( + private readonly accounts: TMentions + ) { + super(); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + OperationClassifier + ]; + } + + public async provide(data: DataEvaluationContext): Promise<IMentionedAccountProviderData<TMentions>> { + const mentioned: Record<string, comment[]> = Object.fromEntries(this.accounts.map(account => [account, []])); + + const { operationsPerType } = await data.get(OperationClassifier); + + for(const { operation } of (operationsPerType.comment ?? [])) + for(const account of this.accounts) + if (mentionedAccount(account, operation.body)) + mentioned[account].push(operation); + + + return { + mentioned: Object.entries(([account, comments]) => [account, { iterate: () => comments }]) + } as IMentionedAccountProviderData<TMentions>; + } +} diff --git a/src/chain-observers/providers/provider-base.ts b/src/chain-observers/providers/provider-base.ts new file mode 100644 index 0000000..bd689da --- /dev/null +++ b/src/chain-observers/providers/provider-base.ts @@ -0,0 +1,8 @@ +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; + +export abstract class ProviderBase { + public abstract usedContexts(): Array<TRegisterEvaluationContext>; + + public abstract provide(data: DataEvaluationContext): Promise<any>; +} diff --git a/src/chain-observers/providers/rc-account-provider.ts b/src/chain-observers/providers/rc-account-provider.ts new file mode 100644 index 0000000..b87ead2 --- /dev/null +++ b/src/chain-observers/providers/rc-account-provider.ts @@ -0,0 +1,39 @@ +import { TAccountName } from "@hiveio/wax"; +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { IRcAccount, RcAccountClassifier } from "../classifiers/rc-account-classifier"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { ProviderBase } from "./provider-base"; + +export type TRcAccountProvided<TAccounts extends Array<TAccountName>> = { + [K in TAccounts[number]]: IRcAccount; +}; + +export interface IAccountProviderData<TAccounts extends Array<TAccountName>> { + rcAccounts: TRcAccountProvided<TAccounts>; +}; + +export class RcAccountProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase { + public constructor( + private readonly rcAccounts: TAccounts + ) { + super(); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return this.rcAccounts.map(rcAccount => RcAccountClassifier.forOptions({ + rcAccount + })); + } + + public async provide(data: DataEvaluationContext): Promise<IAccountProviderData<TAccounts>> { + const result = { + rcAccounts: {} + }; + + const rcAccounts = await data.get(RcAccountClassifier); + for(const rcAccount of this.rcAccounts) + result.rcAccounts[rcAccount] = rcAccounts.rcAccounts[rcAccount]; + + return result as IAccountProviderData<TAccounts>; + } +} diff --git a/src/chain-observers/providers/transaction-provider.ts b/src/chain-observers/providers/transaction-provider.ts new file mode 100644 index 0000000..5a8deb9 --- /dev/null +++ b/src/chain-observers/providers/transaction-provider.ts @@ -0,0 +1,40 @@ +import { transaction } from "@hiveio/wax"; +import { BlockClassifier } from "../classifiers"; +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { ProviderBase } from "./provider-base"; + +export type TTransactionProvider<TIdOfTx extends Array<string>> = { + [K in TIdOfTx[number]]: transaction; +}; + +export interface IAccountProviderData<TIdOfTx extends Array<string>> { + transactions: Partial<TTransactionProvider<TIdOfTx>>; +}; + +export class TransactionByIdProvider<TIdOfTx extends Array<string> = Array<string>> extends ProviderBase { + public constructor( + private readonly transactionIds: TIdOfTx + ) { + super(); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + BlockClassifier + ]; + } + + public async provide(data: DataEvaluationContext): Promise<IAccountProviderData<TIdOfTx>> { + const result = { + transactions: {} + }; + + const block = await data.get(BlockClassifier); + for(const txId of this.transactionIds) + if (block.transactionsPerId.has(txId)) + result.transactions[txId] = block.transactionsPerId.get(txId); + + return result as IAccountProviderData<TIdOfTx>; + } +} diff --git a/src/chain-observers/providers/whale-alert-provider.ts b/src/chain-observers/providers/whale-alert-provider.ts new file mode 100644 index 0000000..f9c9740 --- /dev/null +++ b/src/chain-observers/providers/whale-alert-provider.ts @@ -0,0 +1,103 @@ +import type { asset } from "@hiveio/wax"; +import { WorkerBeeIterable } from "../../types/iterator"; +import { isGreaterThan } from "../../utils/assets"; +import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base"; +import { OperationClassifier, IOperationTransactionPair } from "../classifiers/operation-classifier"; +import { DataEvaluationContext } from "../factories/data-evaluation-context"; +import { ProviderBase } from "./provider-base"; + +export interface IWhaleAlertMetadata { + from: string; + to: string; + amount: asset; +} + +export interface IWhaleAlertProviderData { + whaleOperations: WorkerBeeIterable<IOperationTransactionPair<IWhaleAlertMetadata>>; +}; + +export class WhaleAlertProvider extends ProviderBase { + public constructor( + private readonly asset: asset + ) { + super(); + } + + public usedContexts(): Array<TRegisterEvaluationContext> { + return [ + OperationClassifier + ] + } + + public async provide(data: DataEvaluationContext): Promise<IWhaleAlertProviderData> { + const operations = await data.get(OperationClassifier); + + const transfer = operations.operationsPerType["transfer"]; + const fromSavings = operations.operationsPerType["transfer_from_savings"]; + const escrow = operations.operationsPerType["escrow_transfer"]; + const recurrent = operations.operationsPerType["recurrent_transfer"]; + + const whaleOperations: IOperationTransactionPair<IWhaleAlertMetadata>[] = []; + + if(transfer) + for(const op of transfer) + if(isGreaterThan(this.asset, op.operation.amount!)) + whaleOperations.push({ + operation: { + from: op.operation.from_account, + to: op.operation.to_account, + amount: op.operation.amount! + }, + transaction: op.transaction + }); + + if(fromSavings) + for(const op of fromSavings) + if(isGreaterThan(this.asset, op.operation.amount!)) + whaleOperations.push({ + operation: { + from: op.operation.from_account, + to: op.operation.to_account, + amount: op.operation.amount! + }, + transaction: op.transaction + }); + + if(escrow) + for(const op of escrow) + if(isGreaterThan(this.asset, op.operation.hbd_amount!)) + whaleOperations.push({ + operation: { + from: op.operation.from_account, + to: op.operation.to_account, + amount: op.operation.hbd_amount! + }, + transaction: op.transaction + }); + else if(isGreaterThan(this.asset, op.operation.hive_amount!)) + whaleOperations.push({ + operation: { + from: op.operation.from_account, + to: op.operation.to_account, + amount: op.operation.hive_amount! + }, + transaction: op.transaction + }); + + if(recurrent) + for(const op of recurrent) + if(isGreaterThan(this.asset, op.operation.amount!)) + whaleOperations.push({ + operation: { + from: op.operation.from_account, + to: op.operation.to_account, + amount: op.operation.amount! + }, + transaction: op.transaction + }); + + return { + whaleOperations: new WorkerBeeIterable(whaleOperations) + }; + } +} diff --git a/src/errors.ts b/src/errors.ts index 963f7ef..3021a64 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -4,6 +4,11 @@ export class WorkerBeeError extends Error { public constructor(message: string, public readonly originator?: Error | any) { super(message); - this.name = "WorkerBeeError"; + } +} + +export class WorkerBeeUnsatisfiedFilterError extends WorkerBeeError { + public constructor() { + super("Unsatisfied filter"); } } diff --git a/src/interfaces.ts b/src/interfaces.ts index cb30acd..6882c04 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -1,66 +1,10 @@ import type { IBeekeeperUnlockedWallet } from "@hiveio/beekeeper"; -import type { ApiAccount, ApiBlock, ApiTransaction, IHiveChainInterface, ITransaction, operation } from "@hiveio/wax"; -import type { Subscribable } from "rxjs"; +import type { ApiTransaction, IHiveChainInterface, ITransaction, transaction} from "@hiveio/wax"; import type { IStartConfiguration } from "./bot"; -import type { WorkerBeeError } from "./errors"; - -export interface IBlockData { - number: number; - block: ApiBlock; -} - -export interface ITransactionData { - id: string; - transaction: ApiTransaction; - block: IBlockData; -} - -export interface IOperationData { - op: operation; - transaction: ITransactionData; -} - -export interface IQueenBee { - /** - * Observes block with given id and notifies on its detection - * - * @param blockId block id to observe - * @returns subscribable object that will call `next` only once and completes - */ - block(blockId: string): Subscribable<IBlockData>; - /** - * Observes block with given number and notifies on its detection - * - * @param blockNumber block number to observe - * @returns subscribable object that will call `next` only once and completes - */ - block(blockNumber: number): Subscribable<IBlockData>; - - /** - * Observes transaction with given id and notifies on its detection - * - * @param transactionId transaction id to observe - * @returns subscribable object that will call `next` only once and completes - */ - transaction(transactionId: string): Subscribable<ITransactionData>; - - /** - * Observes given account and notifies when new operation in blockchain related to the given account is detected (no virtual operations for now) - * - * @param name account name to observe - * @returns subscribable object that will call `next` on every operation related to the given account - */ - accountOperations(name: string): Subscribable<IOperationData>; - - /** - * Observes given account and notifies when its manabar is 98 percent loaded - * Note: This function will be called on every new block detected if manabar is full on every new block - * - * @param name account name to observe - * @returns subscribable object that will call `next` each time time its manabar is 98 percent loaded - */ - accountFullManabar(name: string): Subscribable<ApiAccount>; -} +import { IBlockData } from "./chain-observers/classifiers/block-classifier"; +import { IBlockHeaderData } from "./chain-observers/classifiers/block-header-classifier"; +import type { QueenBee } from "./queen"; +import type { Subscribable } from "./types/subscribable"; export interface IBroadcastOptions { /** @@ -73,14 +17,6 @@ export interface IBroadcastOptions { throwAfter?: string | number | Date; } -export interface IWorkerBeeEvents { - "stop": () => void | Promise<void>; - "start": () => void | Promise<void>; - "block": (blockData: IBlockData) => void | Promise<void>; - "transaction": (transactionData: ITransactionData) => void | Promise<void>; - "error": (error: WorkerBeeError) => void | Promise<void>; -} - export interface IWorkerBee { readonly running: boolean; readonly configuration: Readonly<IStartConfiguration>; @@ -103,15 +39,15 @@ export interface IWorkerBee { /** * Request automation stop */ - stop(): Promise<void>; + stop(): void; /** * Deletes the current bot instance and underlying wax and beekepeer objects. * wax chain object is deleted only when its instance was managed by workerbee itself. */ - delete(): Promise<void>; + delete(): void; - readonly observe: IQueenBee; + get observe(): QueenBee; /** * Broadcast given transaction to the remote and returns a subscribable object @@ -124,24 +60,12 @@ export interface IWorkerBee { * @param tx Protobuf transactoin to broadcast * @param options Options for broadcasting */ - broadcast(tx: ApiTransaction | ITransaction, options?: IBroadcastOptions): Promise<Subscribable<ITransactionData>>; + broadcast(tx: ApiTransaction | ITransaction, options?: IBroadcastOptions): Promise<Subscribable<transaction>>; /** * Allows you to iterate over blocks indefinitely */ - [Symbol.asyncIterator](): AsyncIterator<IBlockData>; - - on<U extends keyof IWorkerBeeEvents>( - event: U, listener: IWorkerBeeEvents[U] - ): this; - - once<U extends keyof IWorkerBeeEvents>( - event: U, listener: IWorkerBeeEvents[U] - ): this; - - off<U extends keyof IWorkerBeeEvents>( - event: U, listener: IWorkerBeeEvents[U] - ): this; + [Symbol.asyncIterator](): AsyncIterator<IBlockData & IBlockHeaderData>; } export interface IWorkerBeeConstructor { diff --git a/src/queen.ts b/src/queen.ts index b50b0f5..ddb78ab 100644 --- a/src/queen.ts +++ b/src/queen.ts @@ -1,187 +1,197 @@ -import type { ApiAccount, operation } from "@hiveio/wax"; -import type { Subscribable, Observer, Unsubscribable } from "rxjs"; - -import { WorkerBeeError } from "./errors"; -import type { IBlockData, ITransactionData, IOperationData, IWorkerBee } from "./interfaces"; - -export class QueenBee { +import type { asset, TAccountName } from "@hiveio/wax"; + +import { WorkerBee } from "./bot"; +import { AccountCreatedFilter } from "./chain-observers/filters/account-created-filter"; +import { AccountFullManabarFilter } from "./chain-observers/filters/account-full-manabar-filter"; +import { AccountMetadataChangeFilter } from "./chain-observers/filters/account-metadata-change-filter"; +import { BalanceChangeFilter } from "./chain-observers/filters/balance-change-filter"; +import { BlockNumberFilter } from "./chain-observers/filters/block-filter"; +import { LogicalAndFilter, LogicalOrFilter } from "./chain-observers/filters/composite-filter"; +import { CustomOperationFilter } from "./chain-observers/filters/custom-operation-filter"; +import type { FilterBase } from "./chain-observers/filters/filter-base"; +import { FollowFilter } from "./chain-observers/filters/follow-filter"; +import { ImpactedAccountFilter } from "./chain-observers/filters/impacted-account-filter"; +import { BlockChangedFilter } from "./chain-observers/filters/new-block-filter"; +import { PostFilter } from "./chain-observers/filters/post-filter"; +import { PostMentionFilter } from "./chain-observers/filters/post-mention"; +import { ReblogFilter } from "./chain-observers/filters/reblog-filter"; +import { TransactionIdFilter } from "./chain-observers/filters/transaction-id-filter"; +import { VoteFilter } from "./chain-observers/filters/vote-filter"; +import { WhaleAlertFilter } from "./chain-observers/filters/whale-alert-filter"; +import { AccountProvider } from "./chain-observers/providers/account-provider"; +import { BlockHeaderProvider } from "./chain-observers/providers/block-header-provider"; +import { BlockProvider } from "./chain-observers/providers/block-provider"; +import { MentionedAccountProvider } from "./chain-observers/providers/mention-provider"; +import { ProviderBase } from "./chain-observers/providers/provider-base"; +import { RcAccountProvider } from "./chain-observers/providers/rc-account-provider"; +import { TransactionByIdProvider } from "./chain-observers/providers/transaction-provider"; +import { WhaleAlertProvider } from "./chain-observers/providers/whale-alert-provider"; +import type { Observer, Unsubscribable } from "./types/subscribable"; + +export class QueenBee<TPreviousSubscriberData extends object = {}> { public constructor( - private readonly worker: IWorkerBee + private readonly worker: WorkerBee ) {} - public block(idOrNumber: string | number): Subscribable<IBlockData> { + private providers: ProviderBase[] = []; + private operands: FilterBase[] = []; + private filterContainers: FilterBase[] = []; + + public subscribe(observer: Partial<Observer<TPreviousSubscriberData>>): Unsubscribable { + if (this.operands.length > 0) { + if (this.operands.length === 1) // Optimize by not creating a logical AND filter for only one filter + this.filterContainers.push(this.operands[0]); + else + this.filterContainers.push(new LogicalAndFilter(this.worker, this.operands)); + this.operands = []; + } + + const committedFilters = this.filterContainers; + // Optimize by not creating a logical OR filter for only one filter + const orFilter: FilterBase = committedFilters.length === 1 ? committedFilters[0] : new LogicalOrFilter(this.worker, committedFilters); + + this.worker.mediator.registerListener(observer, orFilter, this.providers); + + this.filterContainers = []; + this.providers = []; + return { - subscribe: (observer: Partial<Observer<IBlockData>>): Unsubscribable => { - const complete = (): void => { - try { - observer.complete?.(); - } catch (error) { - observer.error?.(error); - } finally { - this.worker.off("block", listener); - } - }; - - const listener = (blockData: IBlockData): void => { - const confirm = (): void => { - try { - observer.next?.(blockData); - } catch (error) { - observer.error?.(error); - } finally { - complete(); - } - }; - - if(typeof idOrNumber === "string") { - if(idOrNumber === blockData.block.block_id) - confirm(); - } else if(idOrNumber === blockData.number) - confirm(); - }; - this.worker.on("block", listener); - - return { - unsubscribe: (): void => { - complete(); - } - }; + unsubscribe: () => { + this.worker.mediator.unregisterListener(observer); + // XXX: Maybe force cancel here } }; } - public transaction(txId: string, expireIn?: number): Subscribable<ITransactionData> { - return { - subscribe: (observer: Partial<Observer<ITransactionData>>): Unsubscribable => { - let timeoutId: undefined | NodeJS.Timeout = undefined; - - const complete = (): void => { - try { - observer.complete?.(); - } catch (error) { - observer.error?.(error); - } finally { - this.worker.off("transaction", listener); - clearTimeout(timeoutId); - } - }; - - const listener = (transactionData: ITransactionData): void => { - const confirm = (): void => { - try { - observer.next?.(transactionData); - } catch (error) { - observer.error?.(error); - } finally { - complete(); - } - }; - - if(txId === transactionData.id) - confirm(); - }; - this.worker.on("transaction", listener); - - if(typeof expireIn === "number") - timeoutId = setTimeout(() => { - try { - observer.error?.(new WorkerBeeError("Transaction expired")); - } catch (error) { - observer.error?.(error); - } finally { - complete(); - } - }, expireIn); - - return { - unsubscribe: (): void => { - complete(); - } - }; - } - }; + public get or(): QueenBee<TPreviousSubscriberData> { + if (this.operands.length > 0) { + if (this.operands.length === 1) // Optimize by not creating a logical AND filter for only one filter + this.filterContainers.push(this.operands[0]); + else + this.filterContainers.push(new LogicalAndFilter(this.worker, this.operands)); + this.operands = []; + } + + return this as unknown as QueenBee<TPreviousSubscriberData>; } - public accountOperations(name: string): Subscribable<IOperationData> { - return { - subscribe: (observer: Partial<Observer<IOperationData>>): Unsubscribable => { - const complete = (): void => { - try { - observer.complete?.(); - } catch (error) { - observer.error?.(error); - } finally { - this.worker.off("transaction", listener); - } - }; - - const listener = (transactionData: ITransactionData): void => { - const confirm = (result: operation): void => { - try { - observer.next?.({ op: result, transaction: transactionData }); - } catch (error) { - observer.error?.(error); - } - }; - - const proto = this.worker.chain!.createTransactionFromJson(transactionData.transaction).transaction; - - for(const op of proto.operations) - if(this.worker.chain!.operationGetImpactedAccounts(op).has(name)) - confirm(op); - }; - this.worker.on("transaction", listener); - - return { - unsubscribe: (): void => { - complete(); - } - }; - } - }; + public onBlockNumber(number: number): QueenBee<TPreviousSubscriberData> { + this.operands.push(new BlockNumberFilter(this.worker, number)); + + return this; } - public accountFullManabar(name: string): Subscribable<ApiAccount> { - return { - subscribe: (observer: Partial<Observer<ApiAccount>>): Unsubscribable => { - const listener = async(): Promise<void> => { - try { - const { accounts: [ account ] } = await this.worker.chain!.api.database_api.find_accounts({ - accounts: [ name ] - }); - const dgpo = await this.worker.chain!.api.database_api.get_dynamic_global_properties({}); - - const { percent } = this.worker.chain!.calculateCurrentManabarValue( - Math.round(new Date(`${dgpo.time}Z`).getTime() / 1000), // Convert API time to seconds - account.post_voting_power.amount, - account.voting_manabar.current_mana, - account.voting_manabar.last_update_time - ); - - if(percent >= 98) - observer.next?.(account); - } catch (error) { - observer.error?.(error); - } - }; - this.worker.on("block", listener); - - const complete = (): void => { - try { - observer.complete?.(); - } catch (error) { - observer.error?.(error); - } finally { - this.worker.off("block", listener); - } - }; - - return { - unsubscribe: (): void => { - complete(); - } - }; - } - }; + public onTransactionId< + TIdTx extends string + >(transactionId: TIdTx): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<TransactionByIdProvider<[TIdTx]>["provide"]>>> { + this.operands.push(new TransactionIdFilter(this.worker, transactionId)); + this.providers.push(new TransactionByIdProvider<[TIdTx]>([transactionId])); + + return this; + } + + public onAccountFullManabar(account: TAccountName): QueenBee<TPreviousSubscriberData> { + this.operands.push(new AccountFullManabarFilter(this.worker, account)); + + return this; + } + + public onAccountBalanceChange(account: TAccountName, includeInternalTransfers: boolean = false): QueenBee<TPreviousSubscriberData> { + this.operands.push(new BalanceChangeFilter(this.worker, account, includeInternalTransfers)); + + return this; + } + + public onAccountMetadataChange(account: TAccountName): QueenBee<TPreviousSubscriberData> { + this.operands.push(new AccountMetadataChangeFilter(this.worker, account)); + + return this; + } + + public onVoteCreated(voter: TAccountName): QueenBee<TPreviousSubscriberData> { + this.operands.push(new VoteFilter(this.worker, voter)); + + return this; + } + + public onPostCreated(author: TAccountName): QueenBee<TPreviousSubscriberData> { + this.operands.push(new PostFilter(this.worker, author)); + + return this; + } + + public onNewAccount(): QueenBee<TPreviousSubscriberData> { + this.operands.push(new AccountCreatedFilter(this.worker)); + + return this; + } + + public onCustomOperation(id: string | number): QueenBee<TPreviousSubscriberData> { + this.operands.push(new CustomOperationFilter(this.worker, id)); + + return this; + } + + public onReblog(reblogger: TAccountName): QueenBee<TPreviousSubscriberData> { + this.operands.push(new ReblogFilter(this.worker, reblogger)); + + return this; + } + + public onFollow(follower: TAccountName): QueenBee<TPreviousSubscriberData> { + this.operands.push(new FollowFilter(this.worker, follower)); + + return this; + } + + public onMention< + TMention extends TAccountName + >(mentionedAccount: TMention): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<MentionedAccountProvider<[TMention]>["provide"]>>> { + this.operands.push(new PostMentionFilter(this.worker, mentionedAccount)); + this.providers.push(new MentionedAccountProvider<[TMention]>([mentionedAccount])); + + return this; + } + + public onImpactedAccount(account: TAccountName): QueenBee<TPreviousSubscriberData> { + this.operands.push(new ImpactedAccountFilter(this.worker, account)); + + return this; + } + + public onBlock(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<BlockHeaderProvider["provide"]>>> { + this.operands.push(new BlockChangedFilter(this.worker)); + this.providers.push(new BlockHeaderProvider()); + + return this; + } + + public provideAccounts< + TAccounts extends Array<TAccountName> + >(...accounts: TAccounts): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<AccountProvider<TAccounts>["provide"]>>> { + this.providers.push(new AccountProvider<TAccounts>(accounts)); + + return this; + } + + public provideRcAccounts< + TAccounts extends Array<TAccountName> + >(...accounts: TAccounts): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<RcAccountProvider<TAccounts>["provide"]>>> { + this.providers.push(new RcAccountProvider<TAccounts>(accounts)); + + return this; + } + + public onWhaleAlert(asset: asset): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<WhaleAlertProvider["provide"]>>> { + this.operands.push(new WhaleAlertFilter(this.worker, asset)); + this.providers.push(new WhaleAlertProvider(asset)); + + return this; + } + + public provideBlockData(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<BlockProvider["provide"]>>> { + this.providers.push(new BlockProvider()); + + return this; } } diff --git a/src/types/iterator.ts b/src/types/iterator.ts new file mode 100644 index 0000000..7b0682f --- /dev/null +++ b/src/types/iterator.ts @@ -0,0 +1,30 @@ +/** + * Container that can be iterated over the elements of the collection. + * + * @example + * ```typescript + * for (const { transaction } of data.whaleOperations) + * console.log(`Got transaction: #${transaction.id}`); + * ``` + * + * @example + * ```typescript + * data.whaleOperations.forEach(({ transaction }) => console.log(`Got transaction: #${transaction.id}`))); + * ``` + */ +export class WorkerBeeIterable<T> implements Iterable<T> { + public constructor (private readonly iterable: Iterable<T>) {} + + public [Symbol.iterator](): Iterator<T> { + return this.iterable[Symbol.iterator](); + } + + public values(): Iterable<T> { + return this; + } + + public forEach(callbackfn: (value: T) => void): void { + for(const value of this.iterable) + callbackfn(value); + } +} diff --git a/src/types/subscribable.ts b/src/types/subscribable.ts new file mode 100644 index 0000000..af24292 --- /dev/null +++ b/src/types/subscribable.ts @@ -0,0 +1,35 @@ +/** + * An object interface that defines a set of callback functions a user can use to get notified + */ +export interface Observer<T> { + /** + * A callback function that gets called by the producer during the subscription when + * the producer "has" the `value`. It won't be called if `error` or `complete` callback + * functions have been called, nor after the consumer has unsubscribed. + */ + next: (value: T) => void; + /** + * A callback function that gets called by the producer if and when it encountered a + * problem of any kind. The errored value will be provided through the `err` parameter. + * This callback can't be called more than one time, it can't be called if the + * `complete` callback function have been called previously, nor it can't be called if + * the consumer has unsubscribed. + */ + error: (err: any) => void; + /** + * A callback function that gets called by the producer if and when it has no more + * values to provide (by calling `next` callback function). This means that no error + * has happened. This callback can't be called more than one time, it can't be called + * if the `error` callback function have been called previously, nor it can't be called + * if the consumer has unsubscribed. + */ + complete: () => void; +} + +export interface Unsubscribable { + unsubscribe(): void; +} + +export interface Subscribable<T> { + subscribe(observer: Partial<Observer<T>>): Unsubscribable; +} diff --git a/src/utils/assets.ts b/src/utils/assets.ts new file mode 100644 index 0000000..b2fa748 --- /dev/null +++ b/src/utils/assets.ts @@ -0,0 +1,5 @@ +import type { asset } from "@hiveio/wax"; + +export const isGreaterThan = (base: asset, other: asset): boolean => { + return other.amount > base.amount && other.nai === base.nai; +}; diff --git a/src/utils/mention.ts b/src/utils/mention.ts new file mode 100644 index 0000000..d3374e0 --- /dev/null +++ b/src/utils/mention.ts @@ -0,0 +1,22 @@ +import type { TAccountName } from "@hiveio/wax"; + +export const mentionedAccount = (accountName: TAccountName, message: string): boolean => { + let lastIndexOf = -1; + while((lastIndexOf = message.indexOf(`@${accountName}`, lastIndexOf + 1)) !== -1) { + const nextChar = message[lastIndexOf + accountName.length + 1]; + switch(nextChar) { + case "a": case "b": case "c": case "d": case "e": case "f": case "g": case "h": + case "i": case "j": case "k": case "l": case "m": case "n": case "o": case "p": + case "q": case "r": case "s": case "t": case "u": case "v": case "w": case "x": + case "y": case "z": + case "0": case "1": case "2": case "3": case "4": case "5": case "6": case "7": + case "8": case "9": + case "-": case ".": + continue; + default: + return true; // - next char is not a possible username character, so we have a mention + } + } + + return false; +}; -- GitLab