From 611bc98b6633733a9ea268e36611677e576dae00 Mon Sep 17 00:00:00 2001 From: Marcin Ickiewicz <mickiewicz@syncad.com> Date: Mon, 17 Aug 2020 14:20:51 +0200 Subject: [PATCH] issue #58: hot/trends computed for lie sync --- hive/db/schema.py | 9 ++++++--- hive/indexer/blocks.py | 14 +++++++++----- hive/indexer/sync.py | 10 +++++----- hive/indexer/votes.py | 41 +++++++++++++++++++++++++++-------------- hive/utils/trends.py | 23 ++++++++++++++++++++--- 5 files changed, 67 insertions(+), 30 deletions(-) diff --git a/hive/db/schema.py b/hive/db/schema.py index 8eadb9ec6..01c60aa5a 100644 --- a/hive/db/schema.py +++ b/hive/db/schema.py @@ -197,6 +197,7 @@ def build_metadata(): sa.Column('vote_percent', sa.Integer, server_default='0'), sa.Column('last_update', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'), sa.Column('num_changes', sa.Integer, server_default='0'), + sa.Column('block_num', sa.Integer, nullable=False ), sa.UniqueConstraint('voter_id', 'author_id', 'permlink_id', name='hive_votes_ux1'), @@ -204,13 +205,15 @@ def build_metadata(): sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']), sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']), sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id']), + sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']), sa.Index('hive_votes_post_id_idx', 'post_id'), sa.Index('hive_votes_voter_id_idx', 'voter_id'), sa.Index('hive_votes_author_id_idx', 'author_id'), sa.Index('hive_votes_permlink_id_idx', 'permlink_id'), sa.Index('hive_votes_upvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent > 0")), - sa.Index('hive_votes_downvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent < 0")) + sa.Index('hive_votes_downvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent < 0")), + sa.Index('hive_votes_block_num_idx', 'block_num') ) sa.Table( @@ -960,7 +963,7 @@ def setup(db): hp.counter_deleted = 0 AND -- ABW: wrong! fat node required _start_post_author+_start_port_permlink to exist (when given) and sorted by ( _parent_author, updated_at, comment_id ) hp.parent_author > _parent_author COLLATE "C" OR - hp.parent_author = _parent_author AND hp.updated_at >= _updated_at AND + hp.parent_author = _parent_author AND hp.updated_at >= _updated_at AND hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author >= _start_post_author AND hp1.permlink >= _start_post_permlink ORDER BY id LIMIT 1) ORDER BY hp.parent_author ASC, @@ -1003,7 +1006,7 @@ def setup(db): -- ABW: wrong! fat node required _start_post_author+_start_post_permlink to exist (when given) and sorted just like -- in case of by_last_update (but in fat node) but should by ( _author, updated_at, comment_id ) hp.author > _author COLLATE "C" OR - hp.author = _author AND hp.updated_at >= _updated_at AND + hp.author = _author AND hp.updated_at >= _updated_at AND hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author > _start_post_author COLLATE "C" OR hp1.author = _start_post_author AND hp1.permlink >= _start_post_permlink COLLATE "C" ORDER BY id LIMIT 1) ORDER BY hp.parent_author ASC, diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index 589d3c9ee..07d5e677b 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -64,10 +64,10 @@ class Blocks: time_start = perf_counter() #assert is_trx_active(), "Block.process must be in a trx" ret = cls._process(block, vops_in_block, hived, is_initial_sync=False) + cls._flush_blocks() PostDataCache.flush() Tags.flush() Votes.flush() - cls._flush_blocks() Posts.flush() time_end = perf_counter() log.info("[PROCESS BLOCK] %fs", time_end - time_start) @@ -90,10 +90,10 @@ class Blocks: # Follows flushing needs to be atomic because recounts are # expensive. So is tracking follows at all; hence we track # deltas in memory and update follow/er counts in bulk. + cls._flush_blocks() PostDataCache.flush() Tags.flush() Votes.flush() - cls._flush_blocks() Follow.flush(trx=False) Posts.flush() @@ -104,7 +104,7 @@ class Blocks: return cls.ops_stats @staticmethod - def prepare_vops(comment_payout_ops, vopsList, date): + def prepare_vops(comment_payout_ops, vopsList, date, block_num): vote_ops = {} ops_stats = { 'author_reward_operation' : 0, 'comment_reward_operation' : 0, 'effective_comment_vote_operation' : 0, 'comment_payout_update_operation' : 0, 'ineffective_delete_comment_operation' : 0 } inefficient_deleted_ops = {} @@ -115,6 +115,7 @@ class Blocks: op_type = vop['type'] op_value = vop['value'] + op_value['block_num'] = block_num key = "{}/{}".format(op_value['author'], op_value['permlink']) if op_type == 'author_reward_operation': @@ -180,10 +181,10 @@ class Blocks: if is_initial_sync: if num in virtual_operations: - (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date) + (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num) else: vops = hived.get_virtual_operations(num) - (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date) + (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num) json_ops = [] for tx_idx, tx in enumerate(block['transactions']): @@ -191,6 +192,9 @@ class Blocks: op_type = operation['type'] op = operation['value'] + assert 'block_num' not in op + op['block_num'] = num + account_name = None # account ops if op_type == 'pow_operation': diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index 9c6488a82..c2a41f7f1 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -45,7 +45,7 @@ def prepare_vops(vops_by_block): for blockNum, blockDict in vops_by_block.items(): vopsList = blockDict['ops'] preparedVops[blockNum] = vopsList - + return preparedVops def _block_provider(node, queue, lbound, ubound, chunk_size): @@ -102,7 +102,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun while lbound < ubound: if blocksQueue.empty() and CONTINUE_PROCESSING: log.info("Awaiting any block to process...") - + blocks = [] if not blocksQueue.empty() or CONTINUE_PROCESSING: blocks = blocksQueue.get() @@ -110,7 +110,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun if vopsQueue.empty() and CONTINUE_PROCESSING: log.info("Awaiting any vops to process...") - + preparedVops = [] if not vopsQueue.empty() or CONTINUE_PROCESSING: preparedVops = vopsQueue.get() @@ -119,11 +119,11 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun to = min(lbound + chunk_size, ubound) timer.batch_start() - + block_start = perf() - ops_stats = dict(Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)) Blocks.ops_stats.clear() block_end = perf() + ops_stats = dict(Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)) timer.batch_lap() timer.batch_finish(len(blocks)) diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py index 2e62d0187..5efe549da 100644 --- a/hive/indexer/votes.py +++ b/hive/indexer/votes.py @@ -2,7 +2,9 @@ import logging +from hive.db.db_state import DbState from hive.db.adapter import Db +from hive.utils.trends import update_hot_and_tranding_for_block_range log = logging.getLogger(__name__) DB = Db.instance() @@ -50,11 +52,11 @@ class Votes: def get_total_vote_weight(cls, author, permlink): """ Get total vote weight for selected post """ sql = """ - SELECT + SELECT sum(weight) - FROM + FROM hive_votes_accounts_permlinks_view hv - WHERE + WHERE hv.author = :author AND hv.permlink = :permlink """ @@ -65,11 +67,11 @@ class Votes: def get_total_vote_rshares(cls, author, permlink): """ Get total vote rshares for selected post """ sql = """ - SELECT + SELECT sum(rshares) - FROM + FROM hive_votes_accounts_permlinks_view hv - WHERE + WHERE hv.author = :author AND hv.permlink = :permlink """ @@ -85,6 +87,7 @@ class Votes: author = vote_operation['author'] permlink = vote_operation['permlink'] weight = vote_operation['weight'] + block_num = vote_operation['block_num'] if cls.inside_flush: log.exception("Adding new vote-info into '_votes_data' dict") @@ -95,6 +98,7 @@ class Votes: if key in cls._votes_data: cls._votes_data[key]["vote_percent"] = weight cls._votes_data[key]["last_update"] = date + cls._votes_data[key]["block_num"] = block_num else: cls._votes_data[key] = dict(voter=voter, author=author, @@ -103,7 +107,8 @@ class Votes: weight=0, rshares=0, last_update=date, - is_effective=False) + is_effective=False, + block_num=block_num) @classmethod def effective_comment_vote_op(cls, key, vop): @@ -118,6 +123,7 @@ class Votes: cls._votes_data[key]["weight"] = vop["weight"] cls._votes_data[key]["rshares"] = vop["rshares"] cls._votes_data[key]["is_effective"] = True + cls._votes_data[key]["block_num"] = vop['block_num'] @classmethod def flush(cls): @@ -126,14 +132,14 @@ class Votes: if cls._votes_data: sql = """ INSERT INTO hive_votes - (post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update) - SELECT hp.id as post_id, ha_v.id as voter_id, ha_a.id as author_id, hpd_p.id as permlink_id, t.weight, t.rshares, t.vote_percent, t.last_update + (post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update, block_num) + SELECT hp.id as post_id, ha_v.id as voter_id, ha_a.id as author_id, hpd_p.id as permlink_id, t.weight, t.rshares, t.vote_percent, t.last_update, t.block_num FROM ( VALUES - -- voter, author, permlink, weight, rshares, vote_percent, last_update + -- voter, author, permlink, weight, rshares, vote_percent, last_update, block_num {} - ) AS T(voter, author, permlink, weight, rshares, vote_percent, last_update) + ) AS T(voter, author, permlink, weight, rshares, vote_percent, last_update, block_num) INNER JOIN hive_accounts ha_v ON ha_v.name = t.voter INNER JOIN hive_accounts ha_a ON ha_a.name = t.author INNER JOIN hive_permlink_data hpd_p ON hpd_p.permlink = t.permlink @@ -146,7 +152,8 @@ class Votes: rshares = {}.rshares, vote_percent = EXCLUDED.vote_percent, last_update = EXCLUDED.last_update, - num_changes = hive_votes.num_changes + 1 + num_changes = hive_votes.num_changes + 1, + block_num = EXCLUDED.block_num WHERE hive_votes.voter_id = EXCLUDED.voter_id and hive_votes.author_id = EXCLUDED.author_id and hive_votes.permlink_id = EXCLUDED.permlink_id; """ # WHERE clause above seems superfluous (and works all the same without it, at least up to 5mln) @@ -154,10 +161,14 @@ class Votes: values_skip = [] values_override = [] values_limit = 1000 + first_block = 0 + last_block = 0 for _, vd in cls._votes_data.items(): values = None on_conflict_data_source = None + first_block = min( first_block, vd['block_num'] ) + last_block = max( last_block, vd['block_num'] ) if vd['is_effective']: values = values_override @@ -166,8 +177,8 @@ class Votes: values = values_skip on_conflict_data_source = 'hive_votes' - values.append("('{}', '{}', '{}', {}, {}, {}, '{}'::timestamp)".format( - vd['voter'], vd['author'], vd['permlink'], vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update'])) + values.append("('{}', '{}', '{}', {}, {}, {}, '{}'::timestamp, {})".format( + vd['voter'], vd['author'], vd['permlink'], vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update'], vd['block_num'])) if len(values) >= values_limit: values_str = ','.join(values) @@ -186,5 +197,7 @@ class Votes: DB.query(actual_query) values_override.clear() + if not DbState.is_initial_sync(): + update_hot_and_tranding_for_block_range( first_block, last_block ) cls._votes_data.clear() cls.inside_flush = False diff --git a/hive/utils/trends.py b/hive/utils/trends.py index 0d5342c13..b33bfc3a4 100644 --- a/hive/utils/trends.py +++ b/hive/utils/trends.py @@ -1,13 +1,20 @@ import math import decimal + from hive.db.adapter import Db DB = Db.instance() def update_all_hot_and_tranding(): """Calculate and set hot and trending values of all posts""" - sql = """ + update_hot_and_tranding_for_block_range() + +NO_CONSTRAINT = -1 + +def update_hot_and_tranding_for_block_range( first_block = NO_CONSTRAINT, last_block = NO_CONSTRAINT): + """Calculate and set hot and trending values of all posts""" + hot_and_trend_sql = """ UPDATE hive_posts ihp set sc_hot = calculate_hot(ds.rshares_sum, ihp.created_at), sc_trend = calculate_tranding(ds.rshares_sum, ihp.created_at) @@ -15,9 +22,19 @@ def update_all_hot_and_tranding(): ( SELECT hv.post_id as id, CAST(sum(hv.rshares) AS BIGINT) as rshares_sum FROM hive_votes hv - group by hv.post_id + {} + GROUP BY hv.post_id ) as ds WHERE ihp.id = ds.id """ - DB.query_no_return(sql) + sql = "" + if first_block == NO_CONSTRAINT and last_block == NO_CONSTRAINT: + sql = hot_and_trend_sql.format( "" ) + elif last_block == NO_CONSTRAINT: + sql = hot_and_trend_sql.format( "WHERE block_num >= {}".format( first_block ) ) + elif first_block == NO_CONSTRAINT: + sql = hot_and_trend_sql.format( "WHERE block_num <= {}".format( last_block ) ) + else: + sql = hot_and_trend_sql.format( "WHERE block_num >= {} AND block_num <= {}".format( first_block, last_block ) ) + DB.query_no_return(sql) -- GitLab