# -*- coding: utf-8 -*- import json from math import floor import decimal from beemgraphenebase.py23 import py23_bytes, bytes_types, integer_types, string_types, text_type from collections import OrderedDict from beemgraphenebase.types import ( Uint8, Int16, Uint16, Uint32, Uint64, Varint32, Int64, String, Bytes, Void, Array, PointInTime, Signature, Bool, Set, Fixed_array, Optional, Static_variant, Map, Id ) from beemgraphenebase.objects import GrapheneObject, isArgsThisClass from .objecttypes import object_type from beemgraphenebase.account import PublicKey from beemgraphenebase.objects import Operation as GPHOperation from beemgraphenebase.chains import known_chains from .operationids import operations import struct default_prefix = "STM" def value_to_decimal(value, decimal_places): decimal.getcontext().rounding = decimal.ROUND_DOWN # define rounding method return decimal.Decimal(str(float(value))).quantize(decimal.Decimal('1e-{}'.format(decimal_places))) class Amount(object): def __init__(self, d, prefix=default_prefix, json_str=False): self.json_str = json_str if isinstance(d, string_types): self.amount, self.symbol = d.strip().split(" ") self.precision = None for c in known_chains: if self.precision is not None: continue if known_chains[c]["prefix"] != prefix: continue for asset in known_chains[c]["chain_assets"]: if self.precision is not None: continue if asset["symbol"] == self.symbol: self.precision = asset["precision"] self.asset = asset["asset"] elif asset["asset"] == self.symbol: self.precision = asset["precision"] self.asset = asset["asset"] if self.precision is None: raise Exception("Asset unknown") self.amount = round(value_to_decimal(self.amount, self.precision) * 10 ** self.precision) # Workaround to allow transfers in HIVE self.str_repr = '{:.{}f} {}'.format((float(self.amount) / 10 ** self.precision), self.precision, self.symbol) elif isinstance(d, list): self.amount = d[0] self.asset = d[2] self.precision = d[1] self.symbol = None for c in known_chains: if known_chains[c]["prefix"] != prefix: continue for asset in known_chains[c]["chain_assets"]: if asset["asset"] == self.asset: self.symbol = asset["symbol"] if self.symbol is None: raise ValueError("Unknown NAI, cannot resolve symbol") a = Array([String(d[0]), d[1], d[2]]) self.str_repr = str(a.__str__()) elif isinstance(d, dict) and "nai" in d: self.asset = d["nai"] self.symbol = None for c in known_chains: if known_chains[c]["prefix"] != prefix: continue for asset in known_chains[c]["chain_assets"]: if asset["asset"] == d["nai"]: self.symbol = asset["symbol"] if self.symbol is None: raise ValueError("Unknown NAI, cannot resolve symbol") self.amount = d["amount"] self.precision = d["precision"] self.str_repr = json.dumps(d) else: self.amount = d.amount self.symbol = d.symbol self.asset = d.asset["asset"] self.precision = d.asset["precision"] self.amount = round(value_to_decimal(self.amount, self.precision) * 10 ** self.precision) self.str_repr = str(d) # self.str_repr = json.dumps((d.json())) # self.str_repr = '{:.{}f} {}'.format((float(self.amount) / 10 ** self.precision), self.precision, self.asset) def __bytes__(self): # padding # Workaround to allow transfers in HIVE if self.symbol == "HBD": self.symbol = "SBD" elif self.symbol == "HIVE": self.symbol = "STEEM" symbol = self.symbol + "\x00" * (7 - len(self.symbol)) return (struct.pack("<q", int(self.amount)) + struct.pack("<b", self.precision) + py23_bytes(symbol, "ascii")) def __str__(self): if self.json_str: return json.dumps({"amount": str(self.amount), "precision": self.precision, "nai": self.asset}) return self.str_repr class Operation(GPHOperation): def __init__(self, *args, **kwargs): self.appbase = kwargs.pop("appbase", False) self.prefix = kwargs.pop("prefix", default_prefix) super(Operation, self).__init__(*args, **kwargs) def _getklass(self, name): module = __import__("beembase.operations", fromlist=["operations"]) class_ = getattr(module, name) return class_ def operations(self): return operations def getOperationNameForId(self, i): """ Convert an operation id into the corresponding string """ for key in self.operations(): if int(self.operations()[key]) is int(i): return key return "Unknown Operation ID %d" % i def json(self): return json.loads(str(self)) # return json.loads(str(json.dumps([self.name, self.op.toJson()]))) def __bytes__(self): return py23_bytes(Id(self.opId)) + py23_bytes(self.op) def __str__(self): if self.appbase: return json.dumps({'type': self.name.lower() + '_operation', 'value': self.op.toJson()}) else: return json.dumps([self.name.lower(), self.op.toJson()]) class Memo(GrapheneObject): def __init__(self, *args, **kwargs): if isArgsThisClass(self, args): self.data = args[0].data else: prefix = kwargs.pop("prefix", default_prefix) if "encrypted" not in kwargs or not kwargs["encrypted"]: super(Memo, self).__init__(None) else: if len(args) == 1 and len(kwargs) == 0: kwargs = args[0] if "encrypted" in kwargs and kwargs["encrypted"]: super(Memo, self).__init__(OrderedDict([ ('from', PublicKey(kwargs["from"], prefix=prefix)), ('to', PublicKey(kwargs["to"], prefix=prefix)), ('nonce', Uint64(int(kwargs["nonce"]))), ('check', Uint32(int(kwargs["check"]))), ('encrypted', Bytes(kwargs["encrypted"])) ])) class WitnessProps(GrapheneObject): def __init__(self, *args, **kwargs): if isArgsThisClass(self, args): self.data = args[0].data else: if len(args) == 1 and len(kwargs) == 0: kwargs = args[0] prefix = kwargs.get("prefix", default_prefix) if "sbd_interest_rate" in kwargs: super(WitnessProps, self).__init__(OrderedDict([ ('account_creation_fee', Amount(kwargs["account_creation_fee"], prefix=prefix)), ('maximum_block_size', Uint32(kwargs["maximum_block_size"])), ('sbd_interest_rate', Uint16(kwargs["sbd_interest_rate"])), ])) elif "hbd_interest_rate" in kwargs: super(WitnessProps, self).__init__(OrderedDict([ ('account_creation_fee', Amount(kwargs["account_creation_fee"], prefix=prefix)), ('maximum_block_size', Uint32(kwargs["maximum_block_size"])), ('hbd_interest_rate', Uint16(kwargs["hbd_interest_rate"])), ])) else: super(WitnessProps, self).__init__(OrderedDict([ ('account_creation_fee', Amount(kwargs["account_creation_fee"], prefix=prefix)), ('maximum_block_size', Uint32(kwargs["maximum_block_size"])), ])) class Price(GrapheneObject): def __init__(self, *args, **kwargs): if isArgsThisClass(self, args): self.data = args[0].data else: if len(args) == 1 and len(kwargs) == 0: kwargs = args[0] prefix = kwargs.get("prefix", default_prefix) super(Price, self).__init__(OrderedDict([ ('base', Amount(kwargs["base"], prefix=prefix)), ('quote', Amount(kwargs["quote"], prefix=prefix)) ])) class Permission(GrapheneObject): def __init__(self, *args, **kwargs): if isArgsThisClass(self, args): self.data = args[0].data else: prefix = kwargs.pop("prefix", default_prefix) if len(args) == 1 and len(kwargs) == 0: kwargs = args[0] # Sort keys (FIXME: ideally, the sorting is part of Public # Key and not located here) kwargs["key_auths"] = sorted( kwargs["key_auths"], key=lambda x: repr(PublicKey(x[0], prefix=prefix)), reverse=False, ) kwargs["account_auths"] = sorted( kwargs["account_auths"], key=lambda x: x[0], reverse=False, ) accountAuths = Map([ [String(e[0]), Uint16(e[1])] for e in kwargs["account_auths"] ]) keyAuths = Map([ [PublicKey(e[0], prefix=prefix), Uint16(e[1])] for e in kwargs["key_auths"] ]) super(Permission, self).__init__(OrderedDict([ ('weight_threshold', Uint32(int(kwargs["weight_threshold"]))), ('account_auths', accountAuths), ('key_auths', keyAuths), ])) class Extension(Array): def __str__(self): """ We overload the __str__ function because the json representation is different for extensions """ return json.dumps(self.json) class ExchangeRate(GrapheneObject): def __init__(self, *args, **kwargs): if isArgsThisClass(self, args): self.data = args[0].data else: if len(args) == 1 and len(kwargs) == 0: kwargs = args[0] prefix = kwargs.get("prefix", default_prefix) super(ExchangeRate, self).__init__( OrderedDict([ ('base', Amount(kwargs["base"], prefix=prefix)), ('quote', Amount(kwargs["quote"], prefix=prefix)), ])) class Beneficiary(GrapheneObject): def __init__(self, *args, **kwargs): if isArgsThisClass(self, args): self.data = args[0].data else: if len(args) == 1 and len(kwargs) == 0: kwargs = args[0] super(Beneficiary, self).__init__( OrderedDict([ ('account', String(kwargs["account"])), ('weight', Int16(kwargs["weight"])), ])) class Beneficiaries(GrapheneObject): def __init__(self, *args, **kwargs): if isArgsThisClass(self, args): self.data = args[0].data else: if len(args) == 1 and len(kwargs) == 0: kwargs = args[0] super(Beneficiaries, self).__init__( OrderedDict([ ('beneficiaries', Array([Beneficiary(o) for o in kwargs["beneficiaries"]])), ])) class CommentOptionExtensions(Static_variant): """ Serialize Comment Payout Beneficiaries. :param list beneficiaries: A static_variant containing beneficiaries. Example:: [0, {'beneficiaries': [ {'account': 'furion', 'weight': 10000} ]} ] """ def __init__(self, o): if type(o) == dict and 'type' in o and 'value' in o: if o['type'] == "comment_payout_beneficiaries": type_id = 0 else: type_id = ~0 data = o['value'] else: type_id, data = o if type_id == 0: data = (Beneficiaries(data)) else: raise Exception("Unknown CommentOptionExtension") super(CommentOptionExtensions, self).__init__(data, type_id) class UpdateProposalEndDate(GrapheneObject): def __init__(self, *args, **kwargs): if isArgsThisClass(self, args): self.data = args[0].data else: if len(args) == 1 and len(kwargs) == 0: kwargs = args[0] super(UpdateProposalEndDate, self).__init__( OrderedDict([ ('end_date', PointInTime(kwargs['end_date'])), ])) class UpdateProposalExtensions(Static_variant): """ Serialize Update proposal extensions. :param end_date: A static_variant containing the new end_date. Example:: { 'type': '1', 'value': { 'end_date': '2021-04-05T13:39:48' } } """ def __init__(self, o): if isinstance(o, dict) and 'type' in o and 'value' in o: if o['type'] == "update_proposal_end_date": type_id = 1 else: type_id = ~0 else: type_id, data = o if type_id == 1: data = (UpdateProposalEndDate(o['value'])) else: raise Exception("Unknown UpdateProposalExtension") super(UpdateProposalExtensions, self).__init__(data, type_id, False)