Skip to content
Snippets Groups Projects
Verified Commit 60b086b9 authored by Mateusz Tyszczak's avatar Mateusz Tyszczak :scroll:
Browse files

Create only one provider per type when called multiple on methods in queen bee

parent 40a8b4a9
No related branches found
No related tags found
1 merge request!25Add missing filters and providers
Pipeline #114180 failed
......@@ -20,7 +20,7 @@ export class ObserverMediator {
this.factory = new factory(worker);
}
private filters = new Map<Partial<Observer<any>>, { filter: FilterBase; providers: ProviderBase[]; }>();
private filters = new Map<Partial<Observer<any>>, { filter: FilterBase; providers: Iterable<ProviderBase>; }>();
public notify() {
const context = this.factory.collect();
......@@ -46,7 +46,7 @@ export class ObserverMediator {
}).catch(error => listener.error?.(error));
}
public registerListener(listener: Partial<Observer<any>>, filter: FilterBase, providers: ProviderBase[]) {
public registerListener(listener: Partial<Observer<any>>, filter: FilterBase, providers: Iterable<ProviderBase>) {
this.filters.set(listener, { filter, providers });
for(const classifier of filter.usedContexts())
......
......@@ -13,11 +13,15 @@ export interface IAccountProviderData<TAccounts extends Array<TAccountName>> {
accounts: TAccountProvided<TAccounts>;
};
export class AccountProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase {
public constructor(
private readonly accounts: TAccounts
) {
super();
export interface IAccountProviderOptions {
accounts: string[];
}
export class AccountProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase<IAccountProviderOptions> {
public readonly accounts: string[] = [];
public pushOptions(options: IAccountProviderOptions): void {
this.accounts.push(...options.accounts);
}
public usedContexts(): Array<TRegisterEvaluationContext> {
......
......@@ -21,11 +21,15 @@ export interface IAlarmAccountsData<TAccounts extends Array<TAccountName>> {
alarmsPerAccount: TAlarmAccounts<TAccounts>;
};
export class AlarmProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase {
public constructor(
private readonly accounts: TAccounts
) {
super();
export interface IAlarmProviderOptions {
accounts: string[];
}
export class AlarmProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase<IAlarmProviderOptions> {
public readonly accounts: string[] = [];
public pushOptions(options: IAlarmProviderOptions): void {
this.accounts.push(...options.accounts);
}
public usedContexts(): Array<TRegisterEvaluationContext> {
......
......@@ -13,11 +13,15 @@ export interface IMentionedAccountProviderData<TMentions extends Array<TAccountN
mentioned: TMentionedAccountProvided<TMentions>;
};
export class MentionedAccountProvider<TMentions extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase {
public constructor(
private readonly accounts: TMentions
) {
super();
export interface IMentionedAccountProviderOptions {
accounts: string[];
}
export class MentionedAccountProvider<TMentions extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase<IMentionedAccountProviderOptions> {
public readonly accounts: string[] = [];
public pushOptions(options: IMentionedAccountProviderOptions): void {
this.accounts.push(...options.accounts);
}
public usedContexts(): Array<TRegisterEvaluationContext> {
......
import { TRegisterEvaluationContext } from "../classifiers/collector-classifier-base";
import { DataEvaluationContext } from "../factories/data-evaluation-context";
export abstract class ProviderBase {
export abstract class ProviderBase<IOptions extends object = {}> {
public abstract usedContexts(): Array<TRegisterEvaluationContext>;
public pushOptions?(options: IOptions): void;
public abstract provide(data: DataEvaluationContext): Promise<any>;
}
......@@ -12,11 +12,15 @@ export interface IRcAccountProviderData<TAccounts extends Array<TAccountName>> {
rcAccounts: TRcAccountProvided<TAccounts>;
};
export class RcAccountProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase {
public constructor(
private readonly rcAccounts: TAccounts
) {
super();
export interface IRcAccountsProviderOptions {
accounts: string[];
}
export class RcAccountProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase<IRcAccountsProviderOptions> {
public readonly rcAccounts: string[] = [];
public pushOptions(options: IRcAccountsProviderOptions): void {
this.rcAccounts.push(...options.accounts);
}
public usedContexts(): Array<TRegisterEvaluationContext> {
......
......@@ -12,11 +12,16 @@ export interface ITransactionProviderData<TIdOfTx extends Array<string>> {
transactions: Partial<TTransactionProvider<TIdOfTx>>;
};
export class TransactionByIdProvider<TIdOfTx extends Array<string> = Array<string>> extends ProviderBase {
public constructor(
private readonly transactionIds: TIdOfTx
) {
super();
export interface ITransactionByIdProviderOptions {
transactionIds: string[];
}
export class TransactionByIdProvider<TIdOfTx extends Array<string> = Array<string>> extends ProviderBase<ITransactionByIdProviderOptions> {
public readonly transactionIds = new Set<string>();
public pushOptions(options: ITransactionByIdProviderOptions): void {
for(const id of options.transactionIds)
this.transactionIds.add(id);
}
public usedContexts(): Array<TRegisterEvaluationContext> {
......
......@@ -16,11 +16,16 @@ export interface IWhaleAlertProviderData {
whaleOperations: WorkerBeeIterable<IOperationTransactionPair<IWhaleAlertMetadata>>;
};
export class WhaleAlertProvider extends ProviderBase {
public constructor(
private readonly asset: asset
) {
super();
export interface IWhaleAlertProviderOptions {
assets: asset[];
}
export class WhaleAlertProvider extends ProviderBase<IWhaleAlertProviderOptions> {
public readonly assets = new Map<string, asset>();
public pushOptions(options: IWhaleAlertProviderOptions): void {
for(const asset of options.assets)
this.assets.set(asset.nai, asset);
}
public usedContexts(): Array<TRegisterEvaluationContext> {
......@@ -39,62 +44,64 @@ export class WhaleAlertProvider extends ProviderBase {
const whaleOperations: IOperationTransactionPair<IWhaleAlertMetadata>[] = [];
if(transfer)
for(const op of transfer)
if(isGreaterThan(this.asset, op.operation.amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.amount!
},
transaction: op.transaction
});
for(const asset of this.assets.values()) {
if(transfer)
for(const op of transfer)
if(isGreaterThan(asset, op.operation.amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.amount!
},
transaction: op.transaction
});
if(fromSavings)
for(const op of fromSavings)
if(isGreaterThan(this.asset, op.operation.amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.amount!
},
transaction: op.transaction
});
if(fromSavings)
for(const op of fromSavings)
if(isGreaterThan(asset, op.operation.amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.amount!
},
transaction: op.transaction
});
if(escrow)
for(const op of escrow)
if(isGreaterThan(this.asset, op.operation.hbd_amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.hbd_amount!
},
transaction: op.transaction
});
else if(isGreaterThan(this.asset, op.operation.hive_amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.hive_amount!
},
transaction: op.transaction
});
if(escrow)
for(const op of escrow)
if(isGreaterThan(asset, op.operation.hbd_amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.hbd_amount!
},
transaction: op.transaction
});
else if(isGreaterThan(asset, op.operation.hive_amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.hive_amount!
},
transaction: op.transaction
});
if(recurrent)
for(const op of recurrent)
if(isGreaterThan(this.asset, op.operation.amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.amount!
},
transaction: op.transaction
});
if(recurrent)
for(const op of recurrent)
if(isGreaterThan(asset, op.operation.amount!))
whaleOperations.push({
operation: {
from: op.operation.from_account,
to: op.operation.to_account,
amount: op.operation.amount!
},
transaction: op.transaction
});
}
return {
whaleOperations: new WorkerBeeIterable(whaleOperations)
......
......@@ -12,11 +12,15 @@ export interface IWitnessProviderData<TAccounts extends Array<TAccountName>> {
witnesses: TWitnessProvider<TAccounts>;
};
export class WitnessProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase {
public constructor(
private readonly witnesses: TAccounts
) {
super();
export interface IWitnessProviderOptions {
accounts: string[];
}
export class WitnessProvider<TAccounts extends Array<TAccountName> = Array<TAccountName>> extends ProviderBase<IWitnessProviderOptions> {
public readonly witnesses: string[] = [];
public pushOptions(options: IWitnessProviderOptions): void {
this.witnesses.push(...options.accounts);
}
public usedContexts(): Array<TRegisterEvaluationContext> {
......
......@@ -44,7 +44,7 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
private readonly worker: WorkerBee
) {}
private providers: ProviderBase[] = [];
private providers = new Map<new () => ProviderBase, ProviderBase>();
private operands: FilterBase[] = [];
private filterContainers: FilterBase[] = [];
......@@ -61,10 +61,10 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
// Optimize by not creating a logical OR filter for only one filter
const orFilter: FilterBase = committedFilters.length === 1 ? committedFilters[0] : new LogicalOrFilter(this.worker, committedFilters);
this.worker.mediator.registerListener(observer, orFilter, this.providers);
this.worker.mediator.registerListener(observer, orFilter, this.providers.values());
this.filterContainers = [];
this.providers = [];
this.providers = new Map();
return {
unsubscribe: () => {
......@@ -86,6 +86,18 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
return this as unknown as QueenBee<TPreviousSubscriberData>;
}
private pushProvider<T extends new () => ProviderBase>(
provider: T,
options: InstanceType<T>["pushOptions"] extends undefined ? {} : Parameters<Exclude<InstanceType<T>["pushOptions"], undefined>>[0] = {}
) {
let instance = this.providers.get(provider);
if (!instance)
this.providers.set(provider, instance = new provider());
instance.pushOptions?.(options);
}
public onBlockNumber(number: number): QueenBee<TPreviousSubscriberData> {
this.operands.push(new BlockNumberFilter(this.worker, number));
......@@ -96,7 +108,7 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
TIdTx extends string
>(transactionId: TIdTx): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<TransactionByIdProvider<[TIdTx]>["provide"]>>> {
this.operands.push(new TransactionIdFilter(this.worker, transactionId));
this.providers.push(new TransactionByIdProvider<[TIdTx]>([transactionId]));
this.pushProvider(TransactionByIdProvider, { transactionIds: [transactionId] });
return this;
}
......@@ -156,7 +168,7 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
}
public provideFeedPriceData(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<FeedPriceProvider["provide"]>>> {
this.providers.push(new FeedPriceProvider());
this.pushProvider(FeedPriceProvider);
return this;
}
......@@ -177,7 +189,7 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
TMention extends TAccountName
>(mentionedAccount: TMention): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<MentionedAccountProvider<[TMention]>["provide"]>>> {
this.operands.push(new PostMentionFilter(this.worker, mentionedAccount));
this.providers.push(new MentionedAccountProvider<[TMention]>([mentionedAccount]));
this.pushProvider(MentionedAccountProvider, { accounts: [mentionedAccount] });
return this;
}
......@@ -186,7 +198,7 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
TAccount extends TAccountName
>(watchAccount: TAccount): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<AlarmProvider<[TAccount]>["provide"]>>> {
this.operands.push(new AlarmFilter(this.worker, watchAccount));
this.providers.push(new AlarmProvider<[TAccount]>([watchAccount]));
this.pushProvider(AlarmProvider, { accounts: [watchAccount] });
return this;
}
......@@ -205,14 +217,14 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
public onInternalMarketOperation(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<InternalMarketProvider["provide"]>>> {
this.operands.push(new InternalMarketFilter(this.worker));
this.providers.push(new InternalMarketProvider());
this.pushProvider(InternalMarketProvider);
return this;
}
public onBlock(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<BlockHeaderProvider["provide"]>>> {
this.operands.push(new BlockChangedFilter(this.worker));
this.providers.push(new BlockHeaderProvider());
this.pushProvider(BlockHeaderProvider);
return this;
}
......@@ -220,7 +232,7 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
public provideAccounts<
TAccounts extends Array<TAccountName>
>(...accounts: TAccounts): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<AccountProvider<TAccounts>["provide"]>>> {
this.providers.push(new AccountProvider<TAccounts>(accounts));
this.pushProvider(AccountProvider, { accounts });
return this;
}
......@@ -228,7 +240,7 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
public provideWitnesses<
TAccounts extends Array<TAccountName>
>(...witnesses: TAccounts): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<WitnessProvider<TAccounts>["provide"]>>> {
this.providers.push(new WitnessProvider<TAccounts>(witnesses));
this.pushProvider(WitnessProvider, { accounts: witnesses });
return this;
}
......@@ -236,27 +248,27 @@ export class QueenBee<TPreviousSubscriberData extends object = {}> {
public provideRcAccounts<
TAccounts extends Array<TAccountName>
>(...accounts: TAccounts): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<RcAccountProvider<TAccounts>["provide"]>>> {
this.providers.push(new RcAccountProvider<TAccounts>(accounts));
this.pushProvider(RcAccountProvider, { accounts });
return this;
}
public onWhaleAlert(asset: asset): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<WhaleAlertProvider["provide"]>>> {
this.operands.push(new WhaleAlertFilter(this.worker, asset));
this.providers.push(new WhaleAlertProvider(asset));
this.pushProvider(WhaleAlertProvider, { assets: [asset] });
return this;
}
public onExchangeTransfer(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<ExchangeTransferProvider["provide"]>>> {
this.operands.push(new ExchangeTransferFilter(this.worker));
this.providers.push(new ExchangeTransferProvider());
this.pushProvider(ExchangeTransferProvider);
return this;
}
public provideBlockData(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<BlockProvider["provide"]>>> {
this.providers.push(new BlockProvider());
this.pushProvider(BlockProvider);
return this;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment