From 43036a92dc49ee91bd8f92114bd60b88c425e719 Mon Sep 17 00:00:00 2001 From: Dariusz Kedzierski <dkedzierski@syncad.com> Date: Wed, 10 Jun 2020 23:35:23 +0200 Subject: [PATCH] Cleanup and fixes. Testing in progress. --- hive/db/schema.py | 11 +++--- hive/indexer/blocks.py | 10 +++--- hive/indexer/community.py | 8 ++--- hive/indexer/payments.py | 8 ++--- hive/indexer/posts.py | 66 +++++++++++++++++++--------------- hive/indexer/sync.py | 12 +++---- hive/steem/client.py | 14 ++++---- hive/steem/http_client.py | 5 +-- hive/utils/normalize.py | 1 + hive/utils/post.py | 2 +- hive/utils/stats.py | 5 +-- scripts/update_hivemind_db.sql | 15 ++++---- 12 files changed, 85 insertions(+), 72 deletions(-) diff --git a/hive/db/schema.py b/hive/db/schema.py index 8320842dd..430059b80 100644 --- a/hive/db/schema.py +++ b/hive/db/schema.py @@ -111,12 +111,11 @@ def build_metadata(): sa.Column('sc_trend', sa.Float(precision=6), nullable=False, server_default='0'), sa.Column('sc_hot', sa.Float(precision=6), nullable=False, server_default='0'), - sa.Column('total_payout_value', sa.String(16), nullable=False, server_default=''), + sa.Column('total_payout_value', sa.String(19), nullable=False, server_default=''), sa.Column('author_rewards', sa.Integer, nullable=False, server_default='0'), sa.Column('children_abs_rshares', sa.Integer, nullable=False, server_default='0'), - sa.Column('net_rshares', sa.Integer, nullable=False, server_default='0'), - sa.Column('abs_rshares', sa.Integer, nullable=False, server_default='0'), - sa.Column('vote_rshares', sa.Integer, nullable=False, server_default='0'), + sa.Column('abs_rshares', sa.BigInteger, nullable=False, server_default='0'), + sa.Column('vote_rshares', sa.BigInteger, nullable=False, server_default='0'), sa.Column('net_votes', sa.Integer, nullable=False, server_default='0'), sa.Column('active', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'), sa.Column('last_payout', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'), @@ -127,10 +126,10 @@ def build_metadata(): sa.Column('parent_author_id', sa.Integer, nullable=False), sa.Column('parent_permlink_id', sa.Integer, nullable=False), - sa.Column('curator_payout_value', sa.String(16), nullable=False, server_default=''), + sa.Column('curator_payout_value', sa.String(19), nullable=False, server_default=''), sa.Column('root_author_id', sa.Integer, nullable=False), sa.Column('root_permlink_id', sa.Integer, nullable=False), - sa.Column('max_accepted_payout', sa.String(16), nullable=False, server_default=''), + sa.Column('max_accepted_payout', sa.String(19), nullable=False, server_default=''), sa.Column('percent_steem_dollars', sa.Integer, nullable=False, server_default='-1'), sa.Column('allow_replies', BOOLEAN, nullable=False, server_default='1'), sa.Column('allow_votes', BOOLEAN, nullable=False, server_default='1'), diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index 51c4a51ad..ffa8a6695 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -6,7 +6,6 @@ from hive.db.adapter import Db from hive.indexer.accounts import Accounts from hive.indexer.posts import Posts -from hive.indexer.cached_post import CachedPost from hive.indexer.custom_op import CustomOp from hive.indexer.payments import Payments from hive.indexer.follow import Follow @@ -116,7 +115,10 @@ class Blocks: Payments.op_transfer(op, tx_idx, num, date) elif op_type == 'custom_json_operation': json_ops.append(op) + # follow/reblog/community ops + CustomOp.process_ops(json_ops, num, date) + # virtual ops comment_payout_ops = {} for vop in hived.get_virtual_operations(num): key = None @@ -139,8 +141,8 @@ class Blocks: else: comment_payout_ops[key] = [{vop['op']['type']:val}] if comment_payout_ops: - Posts.comment_payout_op(comment_payout_ops, date) - CustomOp.process_ops(json_ops, num, date) # follow/reblog/community ops + price = hived.get_price() + Posts.comment_payout_op(comment_payout_ops, date, price) return num @@ -244,7 +246,7 @@ class Blocks: # remove all recent records -- core DB.query("DELETE FROM hive_feed_cache WHERE created_at >= :date", date=date) DB.query("DELETE FROM hive_reblogs WHERE created_at >= :date", date=date) - DB.query("DELETE FROM hive_follows WHERE created_at >= :date", date=date) #* + DB.query("DELETE FROM hive_follows WHERE created_at >= :date", date=date) # remove posts: core, tags, cache entries if post_ids: diff --git a/hive/indexer/community.py b/hive/indexer/community.py index 51df84475..9e12b9845 100644 --- a/hive/indexer/community.py +++ b/hive/indexer/community.py @@ -401,15 +401,15 @@ class CommunityOp: DB.query("""UPDATE hive_posts SET is_muted = '1' WHERE id = :post_id""", **params) self._notify('mute_post', payload=self.notes) - if not DbState.is_initial_sync(): - CachedPost.update(self.account, self.permlink, self.post_id) + #if not DbState.is_initial_sync(): + # CachedPost.update(self.account, self.permlink, self.post_id) elif action == 'unmutePost': DB.query("""UPDATE hive_posts SET is_muted = '0' WHERE id = :post_id""", **params) self._notify('unmute_post', payload=self.notes) - if not DbState.is_initial_sync(): - CachedPost.update(self.account, self.permlink, self.post_id) + #if not DbState.is_initial_sync(): + # CachedPost.update(self.account, self.permlink, self.post_id) elif action == 'pinPost': DB.query("""UPDATE hive_posts SET is_pinned = '1' diff --git a/hive/indexer/payments.py b/hive/indexer/payments.py index 08892070b..f13cf2f41 100644 --- a/hive/indexer/payments.py +++ b/hive/indexer/payments.py @@ -39,10 +39,10 @@ class Payments: DB.query(sql, val=new_amount, id=record['post_id']) # notify cached_post of new promoted balance, and trigger update - if not DbState.is_initial_sync(): - CachedPost.update_promoted_amount(record['post_id'], new_amount) - author, permlink = cls._split_url(op['memo']) - CachedPost.vote(author, permlink, record['post_id']) + #if not DbState.is_initial_sync(): + # CachedPost.update_promoted_amount(record['post_id'], new_amount) + # author, permlink = cls._split_url(op['memo']) + # CachedPost.vote(author, permlink, record['post_id']) @classmethod def _validated(cls, op, tx_idx, num, date): diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py index 5cd140c3f..5d7b58eab 100644 --- a/hive/indexer/posts.py +++ b/hive/indexer/posts.py @@ -9,11 +9,10 @@ from hive.db.adapter import Db from hive.db.db_state import DbState from hive.indexer.accounts import Accounts -from hive.indexer.cached_post import CachedPost from hive.indexer.feed_cache import FeedCache from hive.indexer.community import Community, START_DATE from hive.indexer.notify import Notify -from hive.utils.normalize import legacy_amount, parse_amount +from hive.utils.normalize import legacy_amount, asset_to_hbd_hive log = logging.getLogger(__name__) DB = Db.instance() @@ -151,38 +150,39 @@ class Posts: DB.query(sql, id=pid, votes=dumps(votes)) @classmethod - def comment_payout_op(cls, ops, date): + def comment_payout_op(cls, ops, date, price): """ Process comment payment operations """ for k, v in ops.items(): author, permlink = k.split("/") pid = cls.get_id(author, permlink) curator_rewards_sum = 0 - author_rewards = '' + author_rewards_sum = 0 comment_author_reward = None - print(v) for operation in v: for op, value in operation.items(): if op == 'curation_reward_operation': curator_rewards_sum = curator_rewards_sum + int(value['reward']['amount']) if op == 'author_reward_operation': - author_rewards = "{}, {}, {}".format(legacy_amount(value['hbd_payout']), legacy_amount(value['hive_payout']), legacy_amount(value['vesting_payout'])) + hive_to_hbd = asset_to_hbd_hive(price, value['hive_payout']) + author_rewards_sum = int(hive_to_hbd['amount']) + int(value['hbd_payout']['amount']) if op == 'comment_reward_operation': comment_author_reward = value['payout'] - curator_rewards = {'amount' : str(curator_rewards_sum), 'precision': 6, 'nai': '@@000000037'} - print("COMMENT OP REWARDS ==> {}/{} > AUTHOR > {} > CURATORS > {} > TOTAL > {}".format(author, permlink, author_rewards, legacy_amount(curator_rewards), legacy_amount(comment_author_reward))) - raise RuntimeError("Comment payout op") + curator_rewards = {'amount' : str(curator_rewards_sum), 'precision': 6, 'nai': '@@000000037'} sql = """UPDATE hive_posts SET total_payout_value = :total_payout_value, curator_payout_value = :curator_payout_value, - max_accepted_payout = :max_accepted_payout, author_rewards = :author_rewards, last_payout = :last_payout, cashout_time = :cashout_time, is_paidout = true WHERE id = :id """ + DB.query(sql, total_payout_value=legacy_amount(comment_author_reward), + curator_payout_value=legacy_amount(curator_rewards), + author_rewards=author_rewards_sum, last_payout=date, + cashout_time=date, id=pid) @classmethod def insert(cls, hived, op, date): @@ -245,7 +245,7 @@ class Posts: max_accepted_payout = :max_accepted_payout, author_rewards = :author_rewards, children_abs_rshares = :children_abs_rshares, - net_rshares = :net_rshares, + rshares = :net_rshares, abs_rshares = :abs_rshares, vote_rshares = :vote_rshares, net_votes = :net_votes, @@ -298,12 +298,21 @@ class Posts: author_id = Accounts.get_id(post['author']) Notify('error', dst_id=author_id, when=date, post_id=post['id'], payload=post['error']).write() - # TODO: [DK] Remove CachedPost - # CachedPost.insert(op['author'], op['permlink'], post['id']) if op['parent_author']: # update parent's child count - CachedPost.recount(op['parent_author'], op['parent_permlink'], post['parent_id']) + cls.update_child_count(post['parent_id']) cls._insert_feed_cache(post) + @classmethod + def update_child_count(cls, parent_id, op='+'): + """ Increase/decrease child count by 1 """ + sql = """ + UPDATE + hive_posts + SET + children = (SELECT children FROM hive_posts WHERE id = :id) :op 1 + WHERE id = :id""" + DB.query(sql, id=parent_id, op=op) + @classmethod def undelete(cls, op, date, pid): """Re-allocates an existing record flagged as deleted.""" @@ -328,9 +337,6 @@ class Posts: author_id = Accounts.get_id(post['author']) Notify('error', dst_id=author_id, when=date, post_id=post['id'], payload=post['error']).write() - - # TODO: [DK] Remove CachedPost - #CachedPost.undelete(pid, post['author'], post['permlink'], post['category']) cls._insert_feed_cache(post) @classmethod @@ -340,8 +346,6 @@ class Posts: DB.query("UPDATE hive_posts SET is_deleted = '1' WHERE id = :id", id=pid) if not DbState.is_initial_sync(): - # TODO: [DK] Remove CachedPost - #CachedPost.delete(pid, op['author'], op['permlink']) if depth == 0: # TODO: delete from hive_reblogs -- otherwise feed cache gets # populated with deleted posts somwrimas @@ -349,8 +353,7 @@ class Posts: else: # force parent child recount when child is deleted prnt = cls._get_parent_by_child_id(pid) - CachedPost.recount(prnt['author'], prnt['permlink'], prnt['id']) - + cls.update_child_count(prnt['id'], '-') @classmethod def update(cls, hived, op, date, pid): @@ -360,8 +363,7 @@ class Posts: a signal to update cache record. """ # pylint: disable=unused-argument - #if not DbState.is_initial_sync(): - # CachedPost.update(op['author'], op['permlink'], pid) + # add category to category table if 'category' in op: sql = """ @@ -399,7 +401,7 @@ class Posts: max_accepted_payout = :max_accepted_payout, author_rewards = :author_rewards, children_abs_rshares = :children_abs_rshares, - net_rshares = :net_rshares, + rshares = :net_rshares, abs_rshares = :abs_rshares, vote_rshares = :vote_rshares, net_votes = :net_votes, @@ -435,7 +437,6 @@ class Posts: id=pid ) - votes = hived.get_votes(op['author'], op['permlink']) sql = """ UPDATE hive_post_data @@ -444,14 +445,13 @@ class Posts: preview = :preview, img_url = :img_url, body = :body, - votes = :votes, json = :json WHERE id = :id""" DB.query(sql, id=pid, title=op['title'], preview=op['preview'] if 'preview' in op else "", img_url=op['img_url'] if 'img_url' in op else "", - body=op['body'], votes=dumps(votes), + body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}') @classmethod @@ -518,7 +518,17 @@ class Posts: if not is_valid: error = 'replying to invalid post' elif is_muted: error = 'replying to muted post' #find root comment - + root_id = cls.find_root(op['author'], op['permlink']) + sql = """ + SELECT + ha_a.name as author, hpd_p.permlink as permlink + FROM + hive_posts hp + LEFT JOIN hive_accounts ha_a ON ha_a.id = hp.author_id + LEFT JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id + WHERE + hp.id = :id""" + root_author, root_permlink = DB.query_row(sql, id=root_id) # check post validity in specified context error = None diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index 142c32ec6..e5716cd2e 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -66,7 +66,7 @@ class Sync: Blocks.verify_head(self._steem) # perform cleanup if process did not exit cleanly - CachedPost.recover_missing_posts(self._steem) + # CachedPost.recover_missing_posts(self._steem) #audit_cache_missing(self._db, self._steem) #audit_cache_deleted(self._db) @@ -85,8 +85,8 @@ class Sync: self.from_steemd() # take care of payout backlog - CachedPost.dirty_paidouts(Blocks.head_date()) - #CachedPost.flush(self._steem, trx=True) + # CachedPost.dirty_paidouts(Blocks.head_date()) + # CachedPost.flush(self._steem, trx=True) try: # listen for new blocks @@ -104,7 +104,7 @@ class Sync: self.from_steemd(is_initial_sync=True) log.info("[INIT] *** Initial cache build ***") - CachedPost.recover_missing_posts(self._steem) + # CachedPost.recover_missing_posts(self._steem) FeedCache.rebuild() Follow.force_recount() @@ -195,8 +195,8 @@ class Sync: num = Blocks.process(block, steemd) follows = Follow.flush(trx=False) accts = Accounts.flush(steemd, trx=False, spread=8) - CachedPost.dirty_paidouts(block['timestamp']) - #cnt = CachedPost.flush(steemd, trx=False) + # CachedPost.dirty_paidouts(block['timestamp']) + # cnt = CachedPost.flush(steemd, trx=False) self._db.query("COMMIT") ms = (perf() - start_time) * 1000 diff --git a/hive/steem/client.py b/hive/steem/client.py index a86acc483..a190f8fa6 100644 --- a/hive/steem/client.py +++ b/hive/steem/client.py @@ -154,13 +154,13 @@ class SteemClient: def get_votes(self, author, permlink): """ Get list of votes """ - call = self.__exec("list_votes", {'start':[author, permlink, ""], - 'limit':1000, 'order':'by_comment_voter'}) - ret = [] - for vote in call['votes']: - if vote['author'] == author and vote['permlink'] == permlink: - ret.append(vote) - return ret + call = self.__exec("find_votes", {'author':author, 'permlink':permlink}) + return call['votes'] + + def get_price(self): + """ Get current price feed """ + call = self.__exec("get_current_price_feed") + return call def __exec(self, method, params=None): """Perform a single steemd call.""" diff --git a/hive/steem/http_client.py b/hive/steem/http_client.py index eff65df2d..1233ec439 100644 --- a/hive/steem/http_client.py +++ b/hive/steem/http_client.py @@ -88,9 +88,10 @@ class HttpClient(object): get_order_book='condenser_api', get_feed_history='condenser_api', get_dynamic_global_properties='database_api', - list_votes='database_api', + find_votes='database_api', get_comment_pending_payouts='database_api', - get_ops_in_block='account_history_api' + get_ops_in_block='account_history_api', + get_current_price_feed='database_api' ) def __init__(self, nodes, **kwargs): diff --git a/hive/utils/normalize.py b/hive/utils/normalize.py index 10b3bcd51..f640bd278 100644 --- a/hive/utils/normalize.py +++ b/hive/utils/normalize.py @@ -189,3 +189,4 @@ def asset_to_hbd_hive(price, asset): elif asset['nai'] == price['quote']['nai']: result = int(asset['amount']) * int(price['base']['amount']) / int(price['quote']['amount']) return {'amount' : result, 'nai' : price['base']['nai'], 'precision' : price['base']['precision']} + raise ValueError("Asset not supported") diff --git a/hive/utils/post.py b/hive/utils/post.py index 8e4de28b7..ad247655b 100644 --- a/hive/utils/post.py +++ b/hive/utils/post.py @@ -31,7 +31,7 @@ def post_to_internal(post, post_id, level='insert', promoted=None): #post['gray'] = core['is_muted'] #post['hide'] = not core['is_valid'] - values = [('post_id', post_id)] + values = [('id', post_id)] # immutable; write only once (*edge case: undeleted posts) if level == 'insert': diff --git a/hive/utils/stats.py b/hive/utils/stats.py index 5ae4f1f74..3d804d6d7 100644 --- a/hive/utils/stats.py +++ b/hive/utils/stats.py @@ -89,9 +89,10 @@ class SteemStats(StatsAbstract): 'get_order_book': 20, 'get_feed_history': 20, 'lookup_accounts': 1000, - 'list_votes':1000, + 'find_votes':1000, 'get_comment_pending_payouts':1000, - 'get_ops_in_block':500 + 'get_ops_in_block':500, + 'get_current_price_feed':50 } def __init__(self): diff --git a/scripts/update_hivemind_db.sql b/scripts/update_hivemind_db.sql index 42175c3db..7dc4272cc 100644 --- a/scripts/update_hivemind_db.sql +++ b/scripts/update_hivemind_db.sql @@ -106,12 +106,11 @@ CREATE TABLE IF NOT EXISTS hive_posts_new ( sc_trend NUMERIC(6) DEFAULT '0.0', sc_hot NUMERIC(6) DEFAULT '0.0', - total_payout_value VARCHAR(16) DEFAULT '', + total_payout_value VARCHAR(19) DEFAULT '', author_rewards INT DEFAULT '0', children_abs_rshares INT DEFAULT '0', - net_rshares INT DEFAULT '0', - abs_rshares INT DEFAULT '0', - vote_rshares INT DEFAULT '0', + abs_rshares BIGINT DEFAULT '0', + vote_rshares BIGINT DEFAULT '0', net_votes INT DEFAULT '0', active DATE DEFAULT '1970-01-01T00:00:00', last_payout DATE DEFAULT '1970-01-01T00:00:00', @@ -123,10 +122,10 @@ CREATE TABLE IF NOT EXISTS hive_posts_new ( -- columns from raw_json parent_author_id INT DEFAULT '-1', parent_permlink_id INT DEFAULT '-1', - curator_payout_value VARCHAR(16) DEFAULT '', + curator_payout_value VARCHAR(19) DEFAULT '', root_author_id INT DEFAULT '-1', root_permlink_id INT DEFAULT '-1', - max_accepted_payout VARCHAR(16) DEFAULT '', + max_accepted_payout VARCHAR(19) DEFAULT '', percent_steem_dollars INT DEFAULT '-1', allow_replies BOOLEAN DEFAULT '1', allow_votes BOOLEAN DEFAULT '1', @@ -209,10 +208,10 @@ CREATE TABLE legacy_comment_data ( raw_json TEXT, parent_author VARCHAR(16), parent_permlink VARCHAR(255), - curator_payout_value VARCHAR(16), + curator_payout_value VARCHAR(19), root_author VARCHAR(16), root_permlink VARCHAR(255), - max_accepted_payout VARCHAR(16), + max_accepted_payout VARCHAR(19), percent_steem_dollars INT, allow_replies BOOLEAN, allow_votes BOOLEAN, -- GitLab