diff --git a/hive/db/schema.py b/hive/db/schema.py index 8eadb9ec6fb90b0d9b809620e0e9701adb0713b5..01c60aa5adc0008974ed21c48b3d78a54c393bdf 100644 --- a/hive/db/schema.py +++ b/hive/db/schema.py @@ -197,6 +197,7 @@ def build_metadata(): sa.Column('vote_percent', sa.Integer, server_default='0'), sa.Column('last_update', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'), sa.Column('num_changes', sa.Integer, server_default='0'), + sa.Column('block_num', sa.Integer, nullable=False ), sa.UniqueConstraint('voter_id', 'author_id', 'permlink_id', name='hive_votes_ux1'), @@ -204,13 +205,15 @@ def build_metadata(): sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']), sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']), sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id']), + sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']), sa.Index('hive_votes_post_id_idx', 'post_id'), sa.Index('hive_votes_voter_id_idx', 'voter_id'), sa.Index('hive_votes_author_id_idx', 'author_id'), sa.Index('hive_votes_permlink_id_idx', 'permlink_id'), sa.Index('hive_votes_upvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent > 0")), - sa.Index('hive_votes_downvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent < 0")) + sa.Index('hive_votes_downvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent < 0")), + sa.Index('hive_votes_block_num_idx', 'block_num') ) sa.Table( @@ -960,7 +963,7 @@ def setup(db): hp.counter_deleted = 0 AND -- ABW: wrong! 
fat node required _start_post_author+_start_port_permlink to exist (when given) and sorted by ( _parent_author, updated_at, comment_id ) hp.parent_author > _parent_author COLLATE "C" OR - hp.parent_author = _parent_author AND hp.updated_at >= _updated_at AND + hp.parent_author = _parent_author AND hp.updated_at >= _updated_at AND hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author >= _start_post_author AND hp1.permlink >= _start_post_permlink ORDER BY id LIMIT 1) ORDER BY hp.parent_author ASC, @@ -1003,7 +1006,7 @@ def setup(db): -- ABW: wrong! fat node required _start_post_author+_start_post_permlink to exist (when given) and sorted just like -- in case of by_last_update (but in fat node) but should by ( _author, updated_at, comment_id ) hp.author > _author COLLATE "C" OR - hp.author = _author AND hp.updated_at >= _updated_at AND + hp.author = _author AND hp.updated_at >= _updated_at AND hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author > _start_post_author COLLATE "C" OR hp1.author = _start_post_author AND hp1.permlink >= _start_post_permlink COLLATE "C" ORDER BY id LIMIT 1) ORDER BY hp.parent_author ASC, diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index a469d5be8bee86082121bff3eae7f0913786bbda..8c0e26d82ac3bebfb7009b8bc130aa37946e2c22 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -55,10 +55,10 @@ class Blocks: time_start = perf_counter() #assert is_trx_active(), "Block.process must be in a trx" ret = cls._process(block, vops_in_block, hived, is_initial_sync=False) + cls._flush_blocks() PostDataCache.flush() Tags.flush() Votes.flush() - cls._flush_blocks() Posts.flush() time_end = perf_counter() log.info("[PROCESS BLOCK] %fs", time_end - time_start) @@ -89,10 +89,10 @@ class Blocks: return FSM.start() log.info("#############################################################################") + flush_time = register_time(flush_time, "Blocks", cls._flush_blocks()) flush_time = register_time(flush_time, 
"PostDataCache", PostDataCache.flush()) flush_time = register_time(flush_time, "Tags", Tags.flush()) flush_time = register_time(flush_time, "Votes", Votes.flush()) - flush_time = register_time(flush_time, "Blocks", cls._flush_blocks()) folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False) flush_time = register_time(flush_time, "Follow", folllow_items) flush_time = register_time(flush_time, "Posts", Posts.flush()) @@ -102,7 +102,7 @@ class Blocks: log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s") @staticmethod - def prepare_vops(comment_payout_ops, vopsList, date): + def prepare_vops(comment_payout_ops, vopsList, date, block_num): vote_ops = {} inefficient_deleted_ops = {} @@ -115,6 +115,7 @@ class Blocks: op_type = vop['type'] op_value = vop['value'] + op_value['block_num'] = block_num key = "{}/{}".format(op_value['author'], op_value['permlink']) if op_type == 'author_reward_operation': @@ -175,10 +176,10 @@ class Blocks: if is_initial_sync: if num in virtual_operations: - (vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date) + (vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num) else: vops = hived.get_virtual_operations(num) - (vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date) + (vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num) json_ops = [] for tx_idx, tx in enumerate(block['transactions']): @@ -187,6 +188,9 @@ class Blocks: op_type = operation['type'] op = operation['value'] + assert 'block_num' not in op + op['block_num'] = num + account_name = None # account ops if op_type == 'pow_operation': @@ -307,9 +311,9 @@ class Blocks: @classmethod def _flush_blocks(cls): query = """ - INSERT INTO - hive_blocks (num, 
hash, prev, txs, ops, created_at) - VALUES + INSERT INTO + hive_blocks (num, hash, prev, txs, ops, created_at) + VALUES """ values = [] for block in cls.blocks_to_flush: diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py index e5387d3854f5b9a479203e2aa76dc3d7b2d44422..955fa31a9cf4162e5ba8376feaff200158f9759d 100644 --- a/hive/indexer/votes.py +++ b/hive/indexer/votes.py @@ -2,7 +2,9 @@ import logging +from hive.db.db_state import DbState from hive.db.adapter import Db +from hive.utils.trends import update_hot_and_tranding_for_block_range log = logging.getLogger(__name__) DB = Db.instance() @@ -85,6 +87,7 @@ class Votes: author = vote_operation['author'] permlink = vote_operation['permlink'] weight = vote_operation['weight'] + block_num = vote_operation['block_num'] if cls.inside_flush: log.exception("Adding new vote-info into '_votes_data' dict") @@ -95,6 +98,7 @@ class Votes: if key in cls._votes_data: cls._votes_data[key]["vote_percent"] = weight cls._votes_data[key]["last_update"] = date + cls._votes_data[key]["block_num"] = block_num else: cls._votes_data[key] = dict(voter=voter, author=author, @@ -103,7 +107,8 @@ class Votes: weight=0, rshares=0, last_update=date, - is_effective=False) + is_effective=False, + block_num=block_num) @classmethod def effective_comment_vote_op(cls, key, vop): @@ -118,6 +123,7 @@ class Votes: cls._votes_data[key]["weight"] = vop["weight"] cls._votes_data[key]["rshares"] = vop["rshares"] cls._votes_data[key]["is_effective"] = True + cls._votes_data[key]["block_num"] = vop['block_num'] @classmethod def flush(cls): @@ -127,14 +133,14 @@ class Votes: if cls._votes_data: sql = """ INSERT INTO hive_votes - (post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update) - SELECT hp.id as post_id, ha_v.id as voter_id, ha_a.id as author_id, hpd_p.id as permlink_id, t.weight, t.rshares, t.vote_percent, t.last_update + (post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update, 
block_num) + SELECT hp.id as post_id, ha_v.id as voter_id, ha_a.id as author_id, hpd_p.id as permlink_id, t.weight, t.rshares, t.vote_percent, t.last_update, t.block_num FROM ( VALUES - -- voter, author, permlink, weight, rshares, vote_percent, last_update + -- voter, author, permlink, weight, rshares, vote_percent, last_update, block_num {} - ) AS T(voter, author, permlink, weight, rshares, vote_percent, last_update) + ) AS T(voter, author, permlink, weight, rshares, vote_percent, last_update, block_num) INNER JOIN hive_accounts ha_v ON ha_v.name = t.voter INNER JOIN hive_accounts ha_a ON ha_a.name = t.author INNER JOIN hive_permlink_data hpd_p ON hpd_p.permlink = t.permlink @@ -155,10 +161,14 @@ class Votes: values_skip = [] values_override = [] values_limit = 1000 + first_block = None + last_block = None for _, vd in cls._votes_data.items(): values = None on_conflict_data_source = None + first_block = vd['block_num'] if first_block is None else min( first_block, vd['block_num'] ) + last_block = vd['block_num'] if last_block is None else max( last_block, vd['block_num'] ) if vd['is_effective']: values = values_override @@ -167,8 +177,8 @@ class Votes: values = values_skip on_conflict_data_source = 'hive_votes' - values.append("('{}', '{}', '{}', {}, {}, {}, '{}'::timestamp)".format( - vd['voter'], vd['author'], vd['permlink'], vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update'])) + values.append("('{}', '{}', '{}', {}, {}, {}, '{}'::timestamp, {})".format( + vd['voter'], vd['author'], vd['permlink'], vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update'], vd['block_num'])) if len(values) >= values_limit: values_str = ','.join(values) @@ -188,6 +198,8 @@ class Votes: values_override.clear() n = len(cls._votes_data) + if first_block is not None and not DbState.is_initial_sync(): + update_hot_and_tranding_for_block_range( first_block, last_block ) cls._votes_data.clear() cls.inside_flush = False return n diff --git a/hive/utils/trends.py b/hive/utils/trends.py index 0d5342c134de9aef274930b39825168e10b2a10a..b33bfc3a4c1c5424bd117ae6fdd3f5dbd230b5c5 100644 --- 
a/hive/utils/trends.py +++ b/hive/utils/trends.py @@ -1,13 +1,20 @@ import math import decimal + from hive.db.adapter import Db DB = Db.instance() def update_all_hot_and_tranding(): """Calculate and set hot and trending values of all posts""" - sql = """ + update_hot_and_tranding_for_block_range() + +NO_CONSTRAINT = -1 + +def update_hot_and_tranding_for_block_range( first_block = NO_CONSTRAINT, last_block = NO_CONSTRAINT): + """Calculate and set hot and trending values of posts that received votes in blocks first_block..last_block (inclusive), summing ALL votes of each such post; a bound left at NO_CONSTRAINT is open, and with both bounds open every post that has votes is updated""" + hot_and_trend_sql = """ UPDATE hive_posts ihp set sc_hot = calculate_hot(ds.rshares_sum, ihp.created_at), sc_trend = calculate_tranding(ds.rshares_sum, ihp.created_at) @@ -15,9 +22,19 @@ def update_all_hot_and_tranding(): ( SELECT hv.post_id as id, CAST(sum(hv.rshares) AS BIGINT) as rshares_sum FROM hive_votes hv - group by hv.post_id + {} + GROUP BY hv.post_id ) as ds WHERE ihp.id = ds.id """ - DB.query_no_return(sql) + sql = "" + if first_block == NO_CONSTRAINT and last_block == NO_CONSTRAINT: + sql = hot_and_trend_sql.format( "" ) + elif last_block == NO_CONSTRAINT: + sql = hot_and_trend_sql.format( "WHERE hv.post_id IN (SELECT post_id FROM hive_votes WHERE block_num >= {})".format( first_block ) ) + elif first_block == NO_CONSTRAINT: + sql = hot_and_trend_sql.format( "WHERE hv.post_id IN (SELECT post_id FROM hive_votes WHERE block_num <= {})".format( last_block ) ) + else: + sql = hot_and_trend_sql.format( "WHERE hv.post_id IN (SELECT post_id FROM hive_votes WHERE block_num >= {} AND block_num <= {})".format( first_block, last_block ) ) + DB.query_no_return(sql)