Skip to content
Snippets Groups Projects
Commit 84dc1e5c authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Merge branch 'mi_58_columns_with_blocks_numbers' into 'develop'

Mi #58 hot and trends computed for live sync

See merge request !89
parents 17bb5c24 86b544d8
No related branches found
No related tags found
4 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!89Mi #58 hot and trends computed for live sync
......@@ -197,6 +197,7 @@ def build_metadata():
sa.Column('vote_percent', sa.Integer, server_default='0'),
sa.Column('last_update', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'),
sa.Column('num_changes', sa.Integer, server_default='0'),
sa.Column('block_num', sa.Integer, nullable=False ),
sa.UniqueConstraint('voter_id', 'author_id', 'permlink_id', name='hive_votes_ux1'),
......@@ -204,13 +205,15 @@ def build_metadata():
sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']),
sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id']),
sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']),
sa.Index('hive_votes_post_id_idx', 'post_id'),
sa.Index('hive_votes_voter_id_idx', 'voter_id'),
sa.Index('hive_votes_author_id_idx', 'author_id'),
sa.Index('hive_votes_permlink_id_idx', 'permlink_id'),
sa.Index('hive_votes_upvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent > 0")),
sa.Index('hive_votes_downvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent < 0"))
sa.Index('hive_votes_downvote_idx', 'vote_percent', postgresql_where=sql_text("vote_percent < 0")),
sa.Index('hive_votes_block_num_idx', 'block_num')
)
sa.Table(
......@@ -960,7 +963,7 @@ def setup(db):
hp.counter_deleted = 0 AND
-- ABW: wrong! fat node required _start_post_author+_start_port_permlink to exist (when given) and sorted by ( _parent_author, updated_at, comment_id )
hp.parent_author > _parent_author COLLATE "C" OR
hp.parent_author = _parent_author AND hp.updated_at >= _updated_at AND
hp.parent_author = _parent_author AND hp.updated_at >= _updated_at AND
hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author >= _start_post_author AND hp1.permlink >= _start_post_permlink ORDER BY id LIMIT 1)
ORDER BY
hp.parent_author ASC,
......@@ -1003,7 +1006,7 @@ def setup(db):
-- ABW: wrong! fat node required _start_post_author+_start_post_permlink to exist (when given) and sorted just like
-- in case of by_last_update (but in fat node) but should by ( _author, updated_at, comment_id )
hp.author > _author COLLATE "C" OR
hp.author = _author AND hp.updated_at >= _updated_at AND
hp.author = _author AND hp.updated_at >= _updated_at AND
hp.id >= (SELECT id FROM hive_posts_view hp1 WHERE hp1.author > _start_post_author COLLATE "C" OR hp1.author = _start_post_author AND hp1.permlink >= _start_post_permlink COLLATE "C" ORDER BY id LIMIT 1)
ORDER BY
hp.parent_author ASC,
......
......@@ -55,10 +55,10 @@ class Blocks:
time_start = perf_counter()
#assert is_trx_active(), "Block.process must be in a trx"
ret = cls._process(block, vops_in_block, hived, is_initial_sync=False)
cls._flush_blocks()
PostDataCache.flush()
Tags.flush()
Votes.flush()
cls._flush_blocks()
Posts.flush()
time_end = perf_counter()
log.info("[PROCESS BLOCK] %fs", time_end - time_start)
......@@ -89,10 +89,10 @@ class Blocks:
return FSM.start()
log.info("#############################################################################")
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
flush_time = register_time(flush_time, "Tags", Tags.flush())
flush_time = register_time(flush_time, "Votes", Votes.flush())
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", folllow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush())
......@@ -102,7 +102,7 @@ class Blocks:
log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
@staticmethod
def prepare_vops(comment_payout_ops, vopsList, date):
def prepare_vops(comment_payout_ops, vopsList, date, block_num):
vote_ops = {}
inefficient_deleted_ops = {}
......@@ -115,6 +115,7 @@ class Blocks:
op_type = vop['type']
op_value = vop['value']
op_value['block_num'] = block_num
key = "{}/{}".format(op_value['author'], op_value['permlink'])
if op_type == 'author_reward_operation':
......@@ -175,10 +176,10 @@ class Blocks:
if is_initial_sync:
if num in virtual_operations:
(vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date)
(vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num)
else:
vops = hived.get_virtual_operations(num)
(vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date)
(vote_ops, inefficient_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num)
json_ops = []
for tx_idx, tx in enumerate(block['transactions']):
......@@ -187,6 +188,9 @@ class Blocks:
op_type = operation['type']
op = operation['value']
assert 'block_num' not in op
op['block_num'] = num
account_name = None
# account ops
if op_type == 'pow_operation':
......@@ -307,9 +311,9 @@ class Blocks:
@classmethod
def _flush_blocks(cls):
query = """
INSERT INTO
hive_blocks (num, hash, prev, txs, ops, created_at)
VALUES
INSERT INTO
hive_blocks (num, hash, prev, txs, ops, created_at)
VALUES
"""
values = []
for block in cls.blocks_to_flush:
......
......@@ -2,7 +2,9 @@
import logging
from hive.db.db_state import DbState
from hive.db.adapter import Db
from hive.utils.trends import update_hot_and_tranding_for_block_range
log = logging.getLogger(__name__)
DB = Db.instance()
......@@ -85,6 +87,7 @@ class Votes:
author = vote_operation['author']
permlink = vote_operation['permlink']
weight = vote_operation['weight']
block_num = vote_operation['block_num']
if cls.inside_flush:
log.exception("Adding new vote-info into '_votes_data' dict")
......@@ -95,6 +98,7 @@ class Votes:
if key in cls._votes_data:
cls._votes_data[key]["vote_percent"] = weight
cls._votes_data[key]["last_update"] = date
cls._votes_data[key]["block_num"] = block_num
else:
cls._votes_data[key] = dict(voter=voter,
author=author,
......@@ -103,7 +107,8 @@ class Votes:
weight=0,
rshares=0,
last_update=date,
is_effective=False)
is_effective=False,
block_num=block_num)
@classmethod
def effective_comment_vote_op(cls, key, vop):
......@@ -118,6 +123,7 @@ class Votes:
cls._votes_data[key]["weight"] = vop["weight"]
cls._votes_data[key]["rshares"] = vop["rshares"]
cls._votes_data[key]["is_effective"] = True
cls._votes_data[key]["block_num"] = vop['block_num']
@classmethod
def flush(cls):
......@@ -127,14 +133,14 @@ class Votes:
if cls._votes_data:
sql = """
INSERT INTO hive_votes
(post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update)
SELECT hp.id as post_id, ha_v.id as voter_id, ha_a.id as author_id, hpd_p.id as permlink_id, t.weight, t.rshares, t.vote_percent, t.last_update
(post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update, block_num)
SELECT hp.id as post_id, ha_v.id as voter_id, ha_a.id as author_id, hpd_p.id as permlink_id, t.weight, t.rshares, t.vote_percent, t.last_update, t.block_num
FROM
(
VALUES
-- voter, author, permlink, weight, rshares, vote_percent, last_update
-- voter, author, permlink, weight, rshares, vote_percent, last_update, block_num
{}
) AS T(voter, author, permlink, weight, rshares, vote_percent, last_update)
) AS T(voter, author, permlink, weight, rshares, vote_percent, last_update, block_num)
INNER JOIN hive_accounts ha_v ON ha_v.name = t.voter
INNER JOIN hive_accounts ha_a ON ha_a.name = t.author
INNER JOIN hive_permlink_data hpd_p ON hpd_p.permlink = t.permlink
......@@ -155,10 +161,14 @@ class Votes:
values_skip = []
values_override = []
values_limit = 1000
first_block = 0
last_block = 0
for _, vd in cls._votes_data.items():
values = None
on_conflict_data_source = None
first_block = min( first_block, vd['block_num'] )
last_block = max( last_block, vd['block_num'] )
if vd['is_effective']:
values = values_override
......@@ -167,8 +177,8 @@ class Votes:
values = values_skip
on_conflict_data_source = 'hive_votes'
values.append("('{}', '{}', '{}', {}, {}, {}, '{}'::timestamp)".format(
vd['voter'], vd['author'], vd['permlink'], vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update']))
values.append("('{}', '{}', '{}', {}, {}, {}, '{}'::timestamp, {})".format(
vd['voter'], vd['author'], vd['permlink'], vd['weight'], vd['rshares'], vd['vote_percent'], vd['last_update'], vd['block_num']))
if len(values) >= values_limit:
values_str = ','.join(values)
......@@ -188,6 +198,8 @@ class Votes:
values_override.clear()
n = len(cls._votes_data)
if not DbState.is_initial_sync():
update_hot_and_tranding_for_block_range( first_block, last_block )
cls._votes_data.clear()
cls.inside_flush = False
return n
......
import math
import decimal
from hive.db.adapter import Db
DB = Db.instance()
def update_all_hot_and_tranding():
"""Calculate and set hot and trending values of all posts"""
sql = """
update_hot_and_tranding_for_block_range()
NO_CONSTRAINT = -1
def update_hot_and_tranding_for_block_range( first_block = NO_CONSTRAINT, last_block = NO_CONSTRAINT):
"""Calculate and set hot and trending values of all posts"""
hot_and_trend_sql = """
UPDATE hive_posts ihp
set sc_hot = calculate_hot(ds.rshares_sum, ihp.created_at),
sc_trend = calculate_tranding(ds.rshares_sum, ihp.created_at)
......@@ -15,9 +22,19 @@ def update_all_hot_and_tranding():
(
SELECT hv.post_id as id, CAST(sum(hv.rshares) AS BIGINT) as rshares_sum
FROM hive_votes hv
group by hv.post_id
{}
GROUP BY hv.post_id
) as ds
WHERE ihp.id = ds.id
"""
DB.query_no_return(sql)
sql = ""
if first_block == NO_CONSTRAINT and last_block == NO_CONSTRAINT:
sql = hot_and_trend_sql.format( "" )
elif last_block == NO_CONSTRAINT:
sql = hot_and_trend_sql.format( "WHERE block_num >= {}".format( first_block ) )
elif first_block == NO_CONSTRAINT:
sql = hot_and_trend_sql.format( "WHERE block_num <= {}".format( last_block ) )
else:
sql = hot_and_trend_sql.format( "WHERE block_num >= {} AND block_num <= {}".format( first_block, last_block ) )
DB.query_no_return(sql)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment