Skip to content
Snippets Groups Projects
Verified Commit 2eefbba2 authored by Mateusz Tyszczak's avatar Mateusz Tyszczak :scroll:
Browse files

WIP: Further work on provide past operations

parent d4225056
No related branches found
No related tags found
No related merge requests found
Pipeline #119447 failed
......@@ -329,6 +329,37 @@ test.describe("WorkerBee Bot events test", () => {
expect(result).toBeLessThanOrEqual(3);
});
test("Should be able to parse blocks in the past", async({ workerbeeTest }) => {
const result = await workerbeeTest(async({ WorkerBee }) => {
const bot = new WorkerBee();
await bot.start();
let calls = 0;
await new Promise<void>(resolve => {
bot.providePastOperations(500017, 500020).onBlock().provideBlockHeaderData().subscribe({
next(data) {
console.log(`Got block #${data.block.number}`);
++calls;
if (calls === 4)
resolve();
},
error(err) {
console.error(err);
}
});
})
bot.stop();
bot.delete();
return calls;
});
expect(result).toBeLessThanOrEqual(4);
});
test.afterAll(async() => {
await browser.close();
});
......
......@@ -75,7 +75,7 @@ export class WorkerBee implements IWorkerBee {
public providePastOperations(fromBlock: number, toBlock: number): PastQueen {
if(typeof this.chain === "undefined")
throw new WorkerBeeError("Chain is not initialized");
throw new WorkerBeeError("Chain is not initialized. Either provide explicit chain or run start()");
return new PastQueen(this, fromBlock, toBlock);
}
......
import type { WorkerBee } from "../../bot";
import { IEvaluationContextClass, TRegisterEvaluationContext } from "../classifiers/collector-classifier-base";
import { CollectorBase } from "../collectors/collector-base";
import type { ObserverMediator } from "../observer-mediator";
import { DataEvaluationContext } from "./data-evaluation-context";
export class FactoryBase {
......@@ -10,6 +11,9 @@ export class FactoryBase {
protected readonly worker: WorkerBee
) {}
public preNotify(_mediator: ObserverMediator): void {}
public postNotify(_mediator: ObserverMediator, _context: DataEvaluationContext): void {}
public pushClassifier(classifier: TRegisterEvaluationContext): void {
const classifierClass = "class" in classifier ? classifier.class : classifier;
......
import { WorkerBee } from "../../../bot";
import { DynamicGlobalPropertiesClassifier } from "../../classifiers";
import { ObserverMediator } from "../../observer-mediator";
import { DataEvaluationContext } from "../data-evaluation-context";
import { FactoryBase } from "../factory-base";
import { HistoryDataFactoryData } from "./factory-data";
......@@ -12,4 +15,13 @@ export class HistoryDataFactory extends FactoryBase {
this.collectors = new Map(HistoryDataFactoryData(worker, fromBlock, toBlock));
}
public postNotify(mediator: ObserverMediator, context: DataEvaluationContext): void {
context.get(DynamicGlobalPropertiesClassifier).then(dgp => {
if (dgp.headBlockNumber >= this.toBlock)
mediator.unregisterAllListeners();
else
mediator.notify();
});
}
}
import { type Observer } from "../types/subscribable";
import { DataEvaluationContext } from "./factories/data-evaluation-context";
import { FactoryBase } from "./factories/factory-base";
import type { FactoryBase } from "./factories/factory-base";
import { FilterBase } from "./filters/filter-base";
import { ProviderBase } from "./providers/provider-base";
......@@ -16,6 +16,8 @@ export class ObserverMediator {
public notify() {
const context = this.factory.collect();
this.factory.preNotify(this);
// Start providing parsed, cached data to filters
for(const [listener, { filter, providers }] of this.filters.entries())
filter.match(context).then(async(matched) => {
......@@ -35,6 +37,8 @@ export class ObserverMediator {
listener.next?.(providedData);
}).catch(error => listener.error?.(error));
this.factory.postNotify(this, context);
}
public registerListener(listener: Partial<Observer<any>>, filter: FilterBase, providers: Iterable<ProviderBase>) {
......@@ -61,6 +65,8 @@ export class ObserverMediator {
this.factory.popClassifier(usedContext);
this.filters.delete(listener);
listener.complete?.();
}
public unregisterAllListeners() {
......
......@@ -11,4 +11,8 @@ export class PastQueen<TPreviousSubscriberData extends object = {}> extends Quee
) {
super(worker, new ObserverMediator(new HistoryDataFactory(worker, fromBlock, toBlock)));
}
protected onSubscribe(): void {
this.mediator.notify();
}
}
......@@ -51,6 +51,11 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
protected operands: FilterBase[] = [];
protected filterContainers: FilterBase[] = [];
/**
* Internal function to be called when the subscription is created.
*/
protected onSubscribe(): void {}
public subscribe(observer: Partial<Observer<TPreviousSubscriberData>>): Unsubscribable {
if (this.operands.length > 0) {
if (this.operands.length === 1) // Optimize by not creating a logical AND filter for only one filter
......@@ -66,6 +71,8 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
this.mediator.registerListener(observer, orFilter, Array.from(this.providers.values()));
this.onSubscribe();
this.filterContainers = [];
this.providers = new Map();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment