Skip to content
Snippets Groups Projects
queen.ts 11.02 KiB
import type { asset, TAccountName } from "@hiveio/wax";

import { WorkerBee } from "./bot";
import { AccountCreatedFilter } from "./chain-observers/filters/account-created-filter";
import { AccountFullManabarFilter } from "./chain-observers/filters/account-full-manabar-filter";
import { AccountMetadataChangeFilter } from "./chain-observers/filters/account-metadata-change-filter";
import { AlarmFilter } from "./chain-observers/filters/alarm-filter";
import { BalanceChangeFilter } from "./chain-observers/filters/balance-change-filter";
import { BlockNumberFilter } from "./chain-observers/filters/block-filter";
import { LogicalAndFilter, LogicalOrFilter } from "./chain-observers/filters/composite-filter";
import { CustomOperationFilter } from "./chain-observers/filters/custom-operation-filter";
import { ExchangeTransferFilter } from "./chain-observers/filters/exchange-transfer-filter";
import { FeedPriceChangeFilter } from "./chain-observers/filters/feed-price-change-percent-filter";
import { FeedPriceNoChangeFilter } from "./chain-observers/filters/feed-price-no-change-filter";
import type { FilterBase } from "./chain-observers/filters/filter-base";
import { FollowFilter } from "./chain-observers/filters/follow-filter";
import { ImpactedAccountFilter } from "./chain-observers/filters/impacted-account-filter";
import { InternalMarketFilter } from "./chain-observers/filters/internal-market-filter";
import { BlockChangedFilter } from "./chain-observers/filters/new-block-filter";
import { PostFilter } from "./chain-observers/filters/post-filter";
import { PostMentionFilter } from "./chain-observers/filters/post-mention";
import { ReblogFilter } from "./chain-observers/filters/reblog-filter";
import { TransactionIdFilter } from "./chain-observers/filters/transaction-id-filter";
import { VoteFilter } from "./chain-observers/filters/vote-filter";
import { WhaleAlertFilter } from "./chain-observers/filters/whale-alert-filter";
import { WitnessMissedBlocksFilter } from "./chain-observers/filters/witness-miss-block-filter";
import { AccountProvider } from "./chain-observers/providers/account-provider";
import { AlarmProvider } from "./chain-observers/providers/alarm-provider";
import { BlockHeaderProvider } from "./chain-observers/providers/block-header-provider";
import { BlockProvider } from "./chain-observers/providers/block-provider";
import { ExchangeTransferProvider } from "./chain-observers/providers/exchange-transfer-provider";
import { FeedPriceProvider } from "./chain-observers/providers/feed-price-provider";
import { InternalMarketProvider } from "./chain-observers/providers/internal-market-provider";
import { MentionedAccountProvider } from "./chain-observers/providers/mention-provider";
import { ProviderBase } from "./chain-observers/providers/provider-base";
import { RcAccountProvider } from "./chain-observers/providers/rc-account-provider";
import { TransactionByIdProvider } from "./chain-observers/providers/transaction-provider";
import { WhaleAlertProvider } from "./chain-observers/providers/whale-alert-provider";
import { WitnessProvider } from "./chain-observers/providers/witness-provider";
import type { Observer, Unsubscribable } from "./types/subscribable";

export class QueenBee<TPreviousSubscriberData extends object = {}> {
  public constructor(
    private readonly worker: WorkerBee
  ) {}

  private providers = new Map<new () => ProviderBase, ProviderBase>();
  private operands: FilterBase[] = [];
  private filterContainers: FilterBase[] = [];

  public subscribe(observer: Partial<Observer<TPreviousSubscriberData>>): Unsubscribable {
    if (this.operands.length > 0) {
      if (this.operands.length === 1) // Optimize by not creating a logical AND filter for only one filter
        this.filterContainers.push(this.operands[0]);
      else
        this.filterContainers.push(new LogicalAndFilter(this.worker, this.operands));
      this.operands = [];
    }

    const committedFilters = this.filterContainers;
    // Optimize by not creating a logical OR filter for only one filter
    const orFilter: FilterBase = committedFilters.length === 1 ? committedFilters[0] : new LogicalOrFilter(this.worker, committedFilters);

    this.worker.mediator.registerListener(observer, orFilter, this.providers.values());

    this.filterContainers = [];
    this.providers = new Map();

    return {
      unsubscribe: () => {
        this.worker.mediator.unregisterListener(observer);
        // XXX: Maybe force cancel here
      }
    };
  }

  public get or(): QueenBee<TPreviousSubscriberData> {
    if (this.operands.length > 0) {
      if (this.operands.length === 1) // Optimize by not creating a logical AND filter for only one filter
        this.filterContainers.push(this.operands[0]);
      else
        this.filterContainers.push(new LogicalAndFilter(this.worker, this.operands));
      this.operands = [];
    }

    return this as unknown as QueenBee<TPreviousSubscriberData>;
  }

  private pushProvider<T extends new () => ProviderBase>(
    provider: T,
    options: InstanceType<T>["pushOptions"] extends undefined ? {} : Parameters<Exclude<InstanceType<T>["pushOptions"], undefined>>[0] = {}
  ) {
    let instance = this.providers.get(provider);

    if (!instance)
      this.providers.set(provider, instance = new provider());

    instance.pushOptions?.(options);
  }

  public onBlockNumber(number: number): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new BlockNumberFilter(this.worker, number));

    return this;
  }

  public onTransactionId<
    TIdTx extends string
  >(transactionId: TIdTx): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<TransactionByIdProvider<[TIdTx]>["provide"]>>> {
    this.operands.push(new TransactionIdFilter(this.worker, transactionId));
    this.pushProvider(TransactionByIdProvider, { transactionIds: [transactionId] });

    return this;
  }

  public onAccountFullManabar(account: TAccountName): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new AccountFullManabarFilter(this.worker, account));

    return this;
  }

  public onAccountBalanceChange(account: TAccountName, includeInternalTransfers: boolean = false): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new BalanceChangeFilter(this.worker, account, includeInternalTransfers));

    return this;
  }

  public onAccountMetadataChange(account: TAccountName): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new AccountMetadataChangeFilter(this.worker, account));

    return this;
  }

  public onVoteCreated(voter: TAccountName): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new VoteFilter(this.worker, voter));

    return this;
  }

  public onPostCreated(author: TAccountName): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new PostFilter(this.worker, author));

    return this;
  }

  public onNewAccount(): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new AccountCreatedFilter(this.worker));

    return this;
  }

  public onCustomOperation(id: string | number): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new CustomOperationFilter(this.worker, id));

    return this;
  }

  public onFeedPriceChange(percent: number): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new FeedPriceChangeFilter(this.worker, percent));

    return this;
  }

  public onFeedPriceNoChange(lastHoursCount: number = 24): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new FeedPriceNoChangeFilter(this.worker, lastHoursCount));

    return this;
  }

  public provideFeedPriceData(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<FeedPriceProvider["provide"]>>> {
    this.pushProvider(FeedPriceProvider);

    return this;
  }

  public onReblog(reblogger: TAccountName): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new ReblogFilter(this.worker, reblogger));

    return this;
  }

  public onFollow(follower: TAccountName): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new FollowFilter(this.worker, follower));

    return this;
  }

  public onMention<
    TMention extends TAccountName
  >(mentionedAccount: TMention): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<MentionedAccountProvider<[TMention]>["provide"]>>> {
    this.operands.push(new PostMentionFilter(this.worker, mentionedAccount));
    this.pushProvider(MentionedAccountProvider, { accounts: [mentionedAccount] });

    return this;
  }

  public onAlarm<
    TAccount extends TAccountName
  >(watchAccount: TAccount): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<AlarmProvider<[TAccount]>["provide"]>>> {
    this.operands.push(new AlarmFilter(this.worker, watchAccount));
    this.pushProvider(AlarmProvider, { accounts: [watchAccount] });

    return this;
  }

  public onImpactedAccount(account: TAccountName): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new ImpactedAccountFilter(this.worker, account));

    return this;
  }

  public onWitnessMissedBlocks(witness: TAccountName, missedBlocksMinCount: number): QueenBee<TPreviousSubscriberData> {
    this.operands.push(new WitnessMissedBlocksFilter(this.worker, witness, missedBlocksMinCount));

    return this;
  }

  public onInternalMarketOperation(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<InternalMarketProvider["provide"]>>> {
    this.operands.push(new InternalMarketFilter(this.worker));
    this.pushProvider(InternalMarketProvider);

    return this;
  }

  public onBlock(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<BlockHeaderProvider["provide"]>>> {
    this.operands.push(new BlockChangedFilter(this.worker));
    this.pushProvider(BlockHeaderProvider);

    return this;
  }

  public provideAccounts<
    TAccounts extends Array<TAccountName>
  >(...accounts: TAccounts): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<AccountProvider<TAccounts>["provide"]>>> {
    this.pushProvider(AccountProvider, { accounts });

    return this;
  }

  public provideWitnesses<
    TAccounts extends Array<TAccountName>
  >(...witnesses: TAccounts): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<WitnessProvider<TAccounts>["provide"]>>> {
    this.pushProvider(WitnessProvider, { accounts: witnesses });

    return this;
  }

  public provideRcAccounts<
    TAccounts extends Array<TAccountName>
  >(...accounts: TAccounts): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<RcAccountProvider<TAccounts>["provide"]>>> {
    this.pushProvider(RcAccountProvider, { accounts });

    return this;
  }

  public onWhaleAlert(asset: asset): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<WhaleAlertProvider["provide"]>>> {
    this.operands.push(new WhaleAlertFilter(this.worker, asset));
    this.pushProvider(WhaleAlertProvider, { assets: [asset] });

    return this;
  }

  public onExchangeTransfer(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<ExchangeTransferProvider["provide"]>>> {
    this.operands.push(new ExchangeTransferFilter(this.worker));
    this.pushProvider(ExchangeTransferProvider);

    return this;
  }

  public provideBlockData(): QueenBee<TPreviousSubscriberData & Awaited<ReturnType<BlockProvider["provide"]>>> {
    this.pushProvider(BlockProvider);

    return this;
  }
}