Skip to content
Snippets Groups Projects
Commit 611bc98b authored by Marcin
Browse files

issue #58: hot/trends computed for live sync

parent 75b31450
No related branches found
No related tags found
No related merge requests found
...@@ -197,6 +197,7 @@ def build_metadata(): ...@@ -197,6 +197,7 @@ def build_metadata():
sa.Column('vote_percent', sa.Integer, server_default='0'), sa.Column('vote_percent', sa.Integer, server_default='0'),
sa.Column('last_update', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'), sa.Column('last_update', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'),
sa.Column('num_changes', sa.Integer, server_default='0'), sa.Column('num_changes', sa.Integer, server_default='0'),
sa.Column('block_num', sa.Integer, nullable=False ),
sa.UniqueConstraint('voter_id', 'author_id', 'permlink_id', name='hive_votes_ux1'), sa.UniqueConstraint('voter_id', 'author_id', 'permlink_id', name='hive_votes_ux1'),
...@@ -204,13 +205,15 @@ def build_metadata(): ...@@ -204,13 +205,15 @@ def build_metadata():
sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']), sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']), sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']),
sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id']), sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id']),
sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']),
sa.Index('hive_votes_post_id_idx', 'post_id'), sa.Index('hive_votes_post_id_idx', 'post_id'),
sa.Index('hive_votes_voter_id_idx', 'voter_id'), sa.Index('hive_votes_voter_id_idx', 'voter_id'),
sa.Index('hive_votes_author_id_idx', 'author_id'), sa.Index('hive_votes_author_id_idx', 'author_id'),
sa.Index('hive_votes_permlink_id_idx', 'permlink_id'), sa.Index('hive_votes_permlink_id_idx', 'permlink_id'),
sa.Index('hive_votes_upvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent > 0")), sa.Index('hive_votes_upvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent > 0")),
sa.Index('hive_votes_downvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent < 0")) sa.Index('hive_votes_downvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent < 0")),
sa.Index('hive_votes_block_num_idx', 'block_num')
) )
sa.Table( sa.Table(
...@@ -960,7 +963,7 @@ def setup(db): ...@@ -960,7 +963,7 @@ def setup(db):
hp.counter_deleted = 0 AND hp.counter_deleted = 0 AND
-- ABW: wrong! fat node required _start_post_author+_start_port_permlink to exist (when given) and sorted by ( _parent_author, updated_at, comment_id ) -- ABW: wrong! fat node required _start_post_author+_start_port_permlink to exist (when given) and sorted by ( _parent_author, updated_at, comment_id )
hp.parent_author > _parent_author COLLATE "C" OR hp.parent_author > _parent_author COLLATE "C" OR
hp.parent_author = _parent_author AND hp.updated_at >= _updated_at AND hp.parent_author = _parent_author AND hp.updated_at >= _updated_at AND
hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author >= _start_post_author AND hp1.permlink >= _start_post_permlink ORDER BY id LIMIT 1) hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author >= _start_post_author AND hp1.permlink >= _start_post_permlink ORDER BY id LIMIT 1)
ORDER BY ORDER BY
hp.parent_author ASC, hp.parent_author ASC,
...@@ -1003,7 +1006,7 @@ def setup(db): ...@@ -1003,7 +1006,7 @@ def setup(db):
-- ABW: wrong! fat node required _start_post_author+_start_post_permlink to exist (when given) and sorted just like -- ABW: wrong! fat node required _start_post_author+_start_post_permlink to exist (when given) and sorted just like
-- in case of by_last_update (but in fat node) but should by ( _author, updated_at, comment_id ) -- in case of by_last_update (but in fat node) but should by ( _author, updated_at, comment_id )
hp.author > _author COLLATE "C" OR hp.author > _author COLLATE "C" OR
hp.author = _author AND hp.updated_at >= _updated_at AND hp.author = _author AND hp.updated_at >= _updated_at AND
hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author > _start_post_author COLLATE "C" OR hp1.author = _start_post_author AND hp1.permlink >= _start_post_permlink COLLATE "C" ORDER BY id LIMIT 1) hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author > _start_post_author COLLATE "C" OR hp1.author = _start_post_author AND hp1.permlink >= _start_post_permlink COLLATE "C" ORDER BY id LIMIT 1)
ORDER BY ORDER BY
hp.parent_author ASC, hp.parent_author ASC,
......
...@@ -64,10 +64,10 @@ class Blocks: ...@@ -64,10 +64,10 @@ class Blocks:
time_start = perf_counter() time_start = perf_counter()
#assert is_trx_active(), "Block.process must be in a trx" #assert is_trx_active(), "Block.process must be in a trx"
ret = cls._process(block, vops_in_block, hived, is_initial_sync=False) ret = cls._process(block, vops_in_block, hived, is_initial_sync=False)
cls._flush_blocks()
PostDataCache.flush() PostDataCache.flush()
Tags.flush() Tags.flush()
Votes.flush() Votes.flush()
cls._flush_blocks()
Posts.flush() Posts.flush()
time_end = perf_counter() time_end = perf_counter()
log.info("[PROCESS BLOCK] %fs", time_end - time_start) log.info("[PROCESS BLOCK] %fs", time_end - time_start)
...@@ -90,10 +90,10 @@ class Blocks: ...@@ -90,10 +90,10 @@ class Blocks:
# Follows flushing needs to be atomic because recounts are # Follows flushing needs to be atomic because recounts are
# expensive. So is tracking follows at all; hence we track # expensive. So is tracking follows at all; hence we track
# deltas in memory and update follow/er counts in bulk. # deltas in memory and update follow/er counts in bulk.
cls._flush_blocks()
PostDataCache.flush() PostDataCache.flush()
Tags.flush() Tags.flush()
Votes.flush() Votes.flush()
cls._flush_blocks()
Follow.flush(trx=False) Follow.flush(trx=False)
Posts.flush() Posts.flush()
...@@ -104,7 +104,7 @@ class Blocks: ...@@ -104,7 +104,7 @@ class Blocks:
return cls.ops_stats return cls.ops_stats
@staticmethod @staticmethod
def prepare_vops(comment_payout_ops, vopsList, date): def prepare_vops(comment_payout_ops, vopsList, date, block_num):
vote_ops = {} vote_ops = {}
ops_stats = { 'author_reward_operation' : 0, 'comment_reward_operation' : 0, 'effective_comment_vote_operation' : 0, 'comment_payout_update_operation' : 0, 'ineffective_delete_comment_operation' : 0 } ops_stats = { 'author_reward_operation' : 0, 'comment_reward_operation' : 0, 'effective_comment_vote_operation' : 0, 'comment_payout_update_operation' : 0, 'ineffective_delete_comment_operation' : 0 }
inefficient_deleted_ops = {} inefficient_deleted_ops = {}
...@@ -115,6 +115,7 @@ class Blocks: ...@@ -115,6 +115,7 @@ class Blocks:
op_type = vop['type'] op_type = vop['type']
op_value = vop['value'] op_value = vop['value']
op_value['block_num'] = block_num
key = "{}/{}".format(op_value['author'], op_value['permlink']) key = "{}/{}".format(op_value['author'], op_value['permlink'])
if op_type == 'author_reward_operation': if op_type == 'author_reward_operation':
...@@ -180,10 +181,10 @@ class Blocks: ...@@ -180,10 +181,10 @@ class Blocks:
if is_initial_sync: if is_initial_sync:
if num in virtual_operations: if num in virtual_operations:
(vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date) (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num)
else: else:
vops = hived.get_virtual_operations(num) vops = hived.get_virtual_operations(num)
(vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date) (vote_ops, comment_payout_stats, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num)
json_ops = [] json_ops = []
for tx_idx, tx in enumerate(block['transactions']): for tx_idx, tx in enumerate(block['transactions']):
...@@ -191,6 +192,9 @@ class Blocks: ...@@ -191,6 +192,9 @@ class Blocks:
op_type = operation['type'] op_type = operation['type']
op = operation['value'] op = operation['value']
assert 'block_num' not in op
op['block_num'] = num
account_name = None account_name = None
# account ops # account ops
if op_type == 'pow_operation': if op_type == 'pow_operation':
......
...@@ -45,7 +45,7 @@ def prepare_vops(vops_by_block): ...@@ -45,7 +45,7 @@ def prepare_vops(vops_by_block):
for blockNum, blockDict in vops_by_block.items(): for blockNum, blockDict in vops_by_block.items():
vopsList = blockDict['ops'] vopsList = blockDict['ops']
preparedVops[blockNum] = vopsList preparedVops[blockNum] = vopsList
return preparedVops return preparedVops
def _block_provider(node, queue, lbound, ubound, chunk_size): def _block_provider(node, queue, lbound, ubound, chunk_size):
...@@ -102,7 +102,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -102,7 +102,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
while lbound < ubound: while lbound < ubound:
if blocksQueue.empty() and CONTINUE_PROCESSING: if blocksQueue.empty() and CONTINUE_PROCESSING:
log.info("Awaiting any block to process...") log.info("Awaiting any block to process...")
blocks = [] blocks = []
if not blocksQueue.empty() or CONTINUE_PROCESSING: if not blocksQueue.empty() or CONTINUE_PROCESSING:
blocks = blocksQueue.get() blocks = blocksQueue.get()
...@@ -110,7 +110,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -110,7 +110,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
if vopsQueue.empty() and CONTINUE_PROCESSING: if vopsQueue.empty() and CONTINUE_PROCESSING:
log.info("Awaiting any vops to process...") log.info("Awaiting any vops to process...")
preparedVops = [] preparedVops = []
if not vopsQueue.empty() or CONTINUE_PROCESSING: if not vopsQueue.empty() or CONTINUE_PROCESSING:
preparedVops = vopsQueue.get() preparedVops = vopsQueue.get()
...@@ -119,11 +119,11 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -119,11 +119,11 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
to = min(lbound + chunk_size, ubound) to = min(lbound + chunk_size, ubound)
timer.batch_start() timer.batch_start()
block_start = perf() block_start = perf()
ops_stats = dict(Blocks.process_multi(blocks, preparedVops, node, is_initial_sync))
Blocks.ops_stats.clear() Blocks.ops_stats.clear()
block_end = perf() block_end = perf()
ops_stats = dict(Blocks.process_multi(blocks, preparedVops, node, is_initial_sync))
timer.batch_lap() timer.batch_lap()
timer.batch_finish(len(blocks)) timer.batch_finish(len(blocks))
......
...@@ -2,7 +2,9 @@ ...@@ -2,7 +2,9 @@
import logging import logging
from hive.db.db_state import DbState
from hive.db.adapter import Db from hive.db.adapter import Db
from hive.utils.trends import update_hot_and_tranding_for_block_range
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
DB = Db.instance() DB = Db.instance()
...@@ -50,11 +52,11 @@ class Votes: ...@@ -50,11 +52,11 @@ class Votes:
def get_total_vote_weight(cls, author, permlink): def get_total_vote_weight(cls, author, permlink):
""" Get total vote weight for selected post """ """ Get total vote weight for selected post """
sql = """ sql = """
SELECT SELECT
sum(weight) sum(weight)
FROM FROM
hive_votes_accounts_permlinks_view hv hive_votes_accounts_permlinks_view hv
WHERE WHERE
hv.author = :author AND hv.author = :author AND
hv.permlink = :permlink hv.permlink = :permlink
""" """
...@@ -65,11 +67,11 @@ class Votes: ...@@ -65,11 +67,11 @@ class Votes:
def get_total_vote_rshares(cls, author, permlink): def get_total_vote_rshares(cls, author, permlink):
""" Get total vote rshares for selected post """ """ Get total vote rshares for selected post """
sql = """ sql = """
SELECT SELECT
sum(rshares) sum(rshares)
FROM FROM
hive_votes_accounts_permlinks_view hv hive_votes_accounts_permlinks_view hv
WHERE WHERE
hv.author = :author AND hv.author = :author AND
hv.permlink = :permlink hv.permlink = :permlink
""" """
...@@ -85,6 +87,7 @@ class Votes: ...@@ -85,6 +87,7 @@ class Votes:
author = vote_operation['author'] author = vote_operation['author']
permlink = vote_operation['permlink'] permlink = vote_operation['permlink']
weight = vote_operation['weight'] weight = vote_operation['weight']
block_num = vote_operation['block_num']
if cls.inside_flush: if cls.inside_flush:
log.exception("Adding new vote-info into '_votes_data' dict") log.exception("Adding new vote-info into '_votes_data' dict")
...@@ -95,6 +98,7 @@ class Votes: ...@@ -95,6 +98,7 @@ class Votes:
if key in cls._votes_data: if key in cls._votes_data:
cls._votes_data[key]["vote_percent"] = weight cls._votes_data[key]["vote_percent"] = weight
cls._votes_data[key]["last_update"] = date cls._votes_data[key]["last_update"] = date
cls._votes_data[key]["block_num"] = block_num
else: else:
cls._votes_data[key] = dict(voter=voter, cls._votes_data[key] = dict(voter=voter,
author=author, author=author,
...@@ -103,7 +107,8 @@ class Votes: ...@@ -103,7 +107,8 @@ class Votes:
weight=0, weight=0,
rshares=0, rshares=0,
last_update=date, last_update=date,
is_effective=False) is_effective=False,
block_num=block_num)
@classmethod @classmethod
def effective_comment_vote_op(cls, key, vop): def effective_comment_vote_op(cls, key, vop):
...@@ -118,6 +123,7 @@ class Votes: ...@@ -118,6 +123,7 @@ class Votes:
cls._votes_data[key]["weight"] = vop["weight"] cls._votes_data[key]["weight"] = vop["weight"]
cls._votes_data[key]["rshares"] = vop["rshares"] cls._votes_data[key]["rshares"] = vop["rshares"]
cls._votes_data[key]["is_effective"] = True cls._votes_data[key]["is_effective"] = True
cls._votes_data[key]["block_num"] = vop['block_num']
@classmethod @classmethod
def flush(cls): def flush(cls):
...@@ -126,14 +132,14 @@ class Votes: ...@@ -126,14 +132,14 @@ class Votes:
if cls._votes_data: if cls._votes_data:
sql = """ sql = """
INSERT INTO hive_votes INSERT INTO hive_votes
(post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update) (post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update, block_num)
SELECT hp.id as post_id, ha_v.id as voter_id, ha_a.id as author_id, hpd_p.id as permlink_id, t.weight, t.rshares, t.vote_percent, t.last_update SELECT hp.id as post_id, ha_v.id as voter_id, ha_a.id as author_id, hpd_p.id as permlink_id, t.weight, t.rshares, t.vote_percent, t.last_update, t.block_num
FROM FROM
( (
VALUES VALUES
-- voter, author, permlink, weight, rshares, vote_percent, last_update -- voter, author, permlink, weight, rshares, vote_percent, last_update, block_num
{} {}
) AS T(voter, author, permlink, weight, rshares, vote_percent, last_update) ) AS T(voter, author, permlink, weight, rshares, vote_percent, last_update, block_num)
INNER JOIN hive_accounts ha_v ON ha_v.name = t.voter INNER JOIN hive_accounts ha_v ON ha_v.name = t.voter
INNER JOIN hive_accounts ha_a ON ha_a.name = t.author INNER JOIN hive_accounts ha_a ON ha_a.name = t.author
INNER JOIN hive_permlink_data hpd_p ON hpd_p.permlink = t.permlink INNER JOIN hive_permlink_data hpd_p ON hpd_p.permlink = t.permlink
...@@ -146,7 +152,8 @@ class Votes: ...@@ -146,7 +152,8 @@ class Votes:
rshares = {}.rshares, rshares = {}.rshares,
vote_percent = EXCLUDED.vote_percent, vote_percent = EXCLUDED.vote_percent,
last_update = EXCLUDED.last_update, last_update = EXCLUDED.last_update,
num_changes = hive_votes.num_changes + 1 num_changes = hive_votes.num_changes + 1,
block_num = EXCLUDED.block_num
WHERE hive_votes.voter_id = EXCLUDED.voter_id and hive_votes.author_id = EXCLUDED.author_id and hive_votes.permlink_id = EXCLUDED.permlink_id; WHERE hive_votes.voter_id = EXCLUDED.voter_id and hive_votes.author_id = EXCLUDED.author_id and hive_votes.permlink_id = EXCLUDED.permlink_id;
""" """
# WHERE clause above seems superfluous (and works all the same without it, at least up to 5mln) # WHERE clause above seems superfluous (and works all the same without it, at least up to 5mln)
...@@ -154,10 +161,14 @@ class Votes: ...@@ -154,10 +161,14 @@ class Votes:
values_skip = [] values_skip = []
values_override = [] values_override = []
values_limit = 1000 values_limit = 1000
first_block = 0
last_block = 0
for _, vd in cls._votes_data.items(): for _, vd in cls._votes_data.items():
values = None values = None
on_conflict_data_source = None on_conflict_data_source = None
first_block = min( first_block, vd['block_num'] )
last_block = max( last_block, vd['block_num'] )
if vd['is_effective']: if vd['is_effective']:
values = values_override values = values_override
...@@ -166,8 +177,8 @@ class Votes: ...@@ -166,8 +177,8 @@ class Votes:
values = values_skip values = values_skip
on_conflict_data_source = 'hive_votes' on_conflict_data_source = 'hive_votes'
values.append("('{}', '{}', '{}', {}, {}, {}, '{}'::timestamp)".format( values.append("('{}', '{}', '{}', {}, {}, {}, '{}'::timestamp, {})".format(
vd['voter'], vd['author'], vd['permlink'], vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update'])) vd['voter'], vd['author'], vd['permlink'], vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update'], vd['block_num']))
if len(values) >= values_limit: if len(values) >= values_limit:
values_str = ','.join(values) values_str = ','.join(values)
...@@ -186,5 +197,7 @@ class Votes: ...@@ -186,5 +197,7 @@ class Votes:
DB.query(actual_query) DB.query(actual_query)
values_override.clear() values_override.clear()
if not DbState.is_initial_sync():
update_hot_and_tranding_for_block_range( first_block, last_block )
cls._votes_data.clear() cls._votes_data.clear()
cls.inside_flush = False cls.inside_flush = False
import math import math
import decimal import decimal
from hive.db.adapter import Db from hive.db.adapter import Db
DB = Db.instance() DB = Db.instance()
def update_all_hot_and_tranding(): def update_all_hot_and_tranding():
"""Calculate and set hot and trending values of all posts""" """Calculate and set hot and trending values of all posts"""
sql = """ update_hot_and_tranding_for_block_range()
NO_CONSTRAINT = -1
def update_hot_and_tranding_for_block_range( first_block = NO_CONSTRAINT, last_block = NO_CONSTRAINT):
"""Calculate and set hot and trending values of all posts"""
hot_and_trend_sql = """
UPDATE hive_posts ihp UPDATE hive_posts ihp
set sc_hot = calculate_hot(ds.rshares_sum, ihp.created_at), set sc_hot = calculate_hot(ds.rshares_sum, ihp.created_at),
sc_trend = calculate_tranding(ds.rshares_sum, ihp.created_at) sc_trend = calculate_tranding(ds.rshares_sum, ihp.created_at)
...@@ -15,9 +22,19 @@ def update_all_hot_and_tranding(): ...@@ -15,9 +22,19 @@ def update_all_hot_and_tranding():
( (
SELECT hv.post_id as id, CAST(sum(hv.rshares) AS BIGINT) as rshares_sum SELECT hv.post_id as id, CAST(sum(hv.rshares) AS BIGINT) as rshares_sum
FROM hive_votes hv FROM hive_votes hv
group by hv.post_id {}
GROUP BY hv.post_id
) as ds ) as ds
WHERE ihp.id = ds.id WHERE ihp.id = ds.id
""" """
DB.query_no_return(sql)
sql = ""
if first_block == NO_CONSTRAINT and last_block == NO_CONSTRAINT:
sql = hot_and_trend_sql.format( "" )
elif last_block == NO_CONSTRAINT:
sql = hot_and_trend_sql.format( "WHERE block_num >= {}".format( first_block ) )
elif first_block == NO_CONSTRAINT:
sql = hot_and_trend_sql.format( "WHERE block_num <= {}".format( last_block ) )
else:
sql = hot_and_trend_sql.format( "WHERE block_num >= {} AND block_num <= {}".format( first_block, last_block ) )
DB.query_no_return(sql)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment