Skip to content
Snippets Groups Projects

Implement `Power up` part of hive power management

Merged Jakub Ziebinski requested to merge jziebinski/prepare-hp-management into develop
Compare and Show latest version
202 files
+ 2570
4902
Compare changes
  • Side-by-side
  • Inline
Files
202
@@ -12,7 +12,8 @@ if TYPE_CHECKING:
from clive.__private.core.node import Node
from clive.models.aliased import DynamicGlobalProperties, SchemasAccount
from schemas.apis.database_api import FindAccounts, ListWithdrawVestingRoutes
from schemas.apis.database_api import FindAccounts, FindVestingDelegations, ListWithdrawVestingRoutes
from schemas.apis.database_api.fundaments_of_reponses import VestingDelegationsFundament as VestingDelegation
from schemas.apis.database_api.fundaments_of_reponses import WithdrawVestingRoutesFundament as WithdrawRoute
@@ -21,6 +22,7 @@ class HarvestedDataRaw:
dgpo: DynamicGlobalProperties | None = None
core_account: FindAccounts | None = None
withdraw_routes: ListWithdrawVestingRoutes | None = None
delegations: FindVestingDelegations | None = None
@dataclass
@@ -28,6 +30,7 @@ class SanitizedData:
dgpo: DynamicGlobalProperties
core_account: SchemasAccount
withdraw_routes: list[WithdrawRoute]
delegations: list[VestingDelegation[Asset.Vests]]
@dataclass
@@ -46,9 +49,11 @@ class HivePowerData:
delegated_balance: SharesBalance
next_vesting_withdrawal: datetime
withdraw_routes: list[WithdrawRoute]
delegations: list[VestingDelegation[Asset.Vests]]
to_withdraw: SharesBalance
next_power_down: Asset.Hive
next_power_down: SharesBalance
current_hp_apr: str
dgpo: DynamicGlobalProperties
@dataclass(kw_only=True)
@@ -64,6 +69,7 @@ class HivePowerDataRetrieval(CommandDataRetrieval[HarvestedDataRaw, SanitizedDat
await node.api.database_api.list_withdraw_vesting_routes(
start=(self.account_name, ""), limit=10, order="by_withdraw_route"
),
await node.api.database_api.find_vesting_delegations(account=self.account_name),
)
async def _sanitize_data(self, data: HarvestedDataRaw) -> SanitizedData:
@@ -71,13 +77,14 @@ class HivePowerDataRetrieval(CommandDataRetrieval[HarvestedDataRaw, SanitizedDat
dgpo=self._assert_gdpo(data.dgpo),
core_account=self._assert_core_account(data.core_account),
withdraw_routes=self._assert_withdraw_routes(data.withdraw_routes),
delegations=self._assert_delegations(data.delegations),
)
async def _process_data(self, data: SanitizedData) -> HivePowerData:
owned_shares = data.core_account.vesting_shares
received_shares = data.core_account.received_vesting_shares
delegated_shares = data.core_account.delegated_vesting_shares
total_shares = owned_shares + received_shares - delegated_shares
total_shares = owned_shares + received_shares - delegated_shares - data.core_account.vesting_withdraw_rate
return HivePowerData(
owned_balance=self._create_balance_representation(data.dgpo, owned_shares),
@@ -86,11 +93,13 @@ class HivePowerDataRetrieval(CommandDataRetrieval[HarvestedDataRaw, SanitizedDat
delegated_balance=self._create_balance_representation(data.dgpo, delegated_shares),
next_vesting_withdrawal=data.core_account.next_vesting_withdrawal,
withdraw_routes=[route for route in data.withdraw_routes if route.from_account == self.account_name],
delegations=data.delegations,
to_withdraw=self._create_balance_representation(
data.dgpo, Asset.vests(data.core_account.to_withdraw / 10**data.dgpo.total_vesting_shares.precision)
),
next_power_down=vests_to_hive(data.core_account.vesting_withdraw_rate, data.dgpo),
next_power_down=self._create_balance_representation(data.dgpo, data.core_account.vesting_withdraw_rate),
current_hp_apr=self._calculate_current_hp_apr(data.dgpo),
dgpo=data.dgpo,
)
def _assert_gdpo(self, data: DynamicGlobalProperties | None) -> DynamicGlobalProperties:
@@ -109,6 +118,10 @@ class HivePowerDataRetrieval(CommandDataRetrieval[HarvestedDataRaw, SanitizedDat
assert data is not None, "ListWithdrawVestingRoutes data is missing"
return data.routes
def _assert_delegations(self, data: FindVestingDelegations | None) -> list[VestingDelegation[Asset.Vests]]:
assert data is not None, "FindVestingDelegations data is missing"
return data.delegations
def _create_balance_representation(self, dgdpo: DynamicGlobalProperties, vests_value: Asset.Vests) -> SharesBalance:
return SharesBalance(hp_balance=vests_to_hive(vests_value, dgdpo), vests_balance=vests_value)
Loading