Skip to content
Snippets Groups Projects
Commit 1e627fa0 authored by Krzysztof Leśniak's avatar Krzysztof Leśniak
Browse files

Fill reblog notifications from the indexer

parent 7ea188d2
No related branches found
No related tags found
No related merge requests found
...@@ -96,19 +96,6 @@ CREATE OR REPLACE VIEW hivemind_app.hive_raw_notifications_as_view ...@@ -96,19 +96,6 @@ CREATE OR REPLACE VIEW hivemind_app.hive_raw_notifications_as_view
notifs.payload, notifs.payload,
harv.score harv.score
FROM ( FROM (
SELECT hr.block_num,
hp.id AS post_id,
14 AS type_id,
hr.created_at,
hr.blogger_id AS src,
hp.author_id AS dst,
hr.post_id as dst_post_id,
''::character varying(16) AS community,
''::character varying AS community_title,
''::character varying AS payload
FROM hivemind_app.hive_reblogs hr -- reblogs
JOIN hivemind_app.hive_posts hp ON hr.post_id = hp.id
UNION ALL
SELECT hs.block_num, SELECT hs.block_num,
0 AS post_id, 0 AS post_id,
11 AS type_id, 11 AS type_id,
......
...@@ -7,6 +7,7 @@ from hive.db.adapter import Db ...@@ -7,6 +7,7 @@ from hive.db.adapter import Db
from hive.indexer.accounts import Accounts from hive.indexer.accounts import Accounts
from hive.indexer.db_adapter_holder import DbAdapterHolder from hive.indexer.db_adapter_holder import DbAdapterHolder
from hive.utils.normalize import escape_characters from hive.utils.normalize import escape_characters
from hive.utils.misc import chunks
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -14,6 +15,8 @@ class Reblog(DbAdapterHolder): ...@@ -14,6 +15,8 @@ class Reblog(DbAdapterHolder):
"""Class for reblog operations""" """Class for reblog operations"""
reblog_items_to_flush = {} reblog_items_to_flush = {}
reblog_notifications_to_flush = []
_notification_first_block = None
@classmethod @classmethod
def _validated_op(cls, actor, op, block_date, block_num): def _validated_op(cls, actor, op, block_date, block_num):
...@@ -54,6 +57,13 @@ class Reblog(DbAdapterHolder): ...@@ -54,6 +57,13 @@ class Reblog(DbAdapterHolder):
cls.delete(op['author'], op['permlink'], op['account']) cls.delete(op['author'], op['permlink'], op['account'])
else: else:
cls.reblog_items_to_flush[key] = {'op': op} cls.reblog_items_to_flush[key] = {'op': op}
cls.reblog_notifications_to_flush.append({
"block_num": block_num,
"created_at": block_date,
"src": op['account'],
"dst": op['author'],
"permlink": op['permlink'],
})
@classmethod @classmethod
def delete(cls, author, permlink, account): def delete(cls, author, permlink, account):
...@@ -66,6 +76,10 @@ class Reblog(DbAdapterHolder): ...@@ -66,6 +76,10 @@ class Reblog(DbAdapterHolder):
@classmethod @classmethod
def flush(cls): def flush(cls):
return cls.flush_reblogs() + cls.flush_notifications()
@classmethod
def flush_reblogs(cls):
"""Flush collected data to database""" """Flush collected data to database"""
sql_prefix = f""" sql_prefix = f"""
INSERT INTO {SCHEMA_NAME}.hive_reblogs (blogger_id, post_id, created_at, block_num) INSERT INTO {SCHEMA_NAME}.hive_reblogs (blogger_id, post_id, created_at, block_num)
...@@ -119,3 +133,60 @@ class Reblog(DbAdapterHolder): ...@@ -119,3 +133,60 @@ class Reblog(DbAdapterHolder):
cls.reblog_items_to_flush.clear() cls.reblog_items_to_flush.clear()
return item_count return item_count
@classmethod
def flush_notifications(cls):
if cls._notification_first_block is None:
cls._notification_first_block = cls.db.query_row("select hivemind_app.block_before_irreversible( '90 days' ) AS num")['num']
n = len(cls.reblog_notifications_to_flush)
max_block_num = max(n['block_num'] for n in cls.reblog_notifications_to_flush or [{"block_num": 0}])
if n > 0 and max_block_num > cls._notification_first_block:
# With clause is inlined, modified call to reptracker_endpoints.get_account_reputation.
# Reputation is multiplied by 7.5 rather than 9 to bring the max value to 100 rather than 115.
# In case of reputation being 0, the score is set to 25 rather than 0.
sql = f"""
WITH log_account_rep AS
(
SELECT
account_id,
LOG(10, ABS(nullif(reputation, 0))) AS rep,
(CASE WHEN reputation < 0 THEN -1 ELSE 1 END) AS is_neg
FROM reptracker_app.account_reputations
),
calculate_rep AS
(
SELECT
account_id,
GREATEST(lar.rep - 9, 0) * lar.is_neg AS rep
FROM log_account_rep lar
),
final_rep AS
(
SELECT account_id, (cr.rep * 7.5 + 25)::INT AS rep FROM calculate_rep AS cr
)
INSERT INTO {SCHEMA_NAME}.hive_notification_cache
(block_num, type_id, created_at, src, dst, dst_post_id, post_id, score, payload, community, community_title)
SELECT n.block_num, 14, n.created_at, r.id, g.id, pp.parent_id, p.id, COALESCE(rep.rep, 25), '', '', ''
FROM
(VALUES {{}})
AS n(block_num, created_at, src, dst, permlink)
JOIN {SCHEMA_NAME}.hive_accounts AS r ON n.src = r.name
JOIN {SCHEMA_NAME}.hive_accounts AS g ON n.dst = g.name
JOIN {SCHEMA_NAME}.hive_permlink_data AS p ON n.permlink = p.permlink
JOIN {SCHEMA_NAME}.hive_posts AS pp ON pp.id = p.id
LEFT JOIN final_rep AS rep ON r.haf_id = rep.account_id
WHERE n.block_num > hivemind_app.block_before_irreversible( '90 days' )
AND COALESCE(rep.rep, 25) > 0
AND n.src IS DISTINCT FROM n.dst
ORDER BY n.block_num, n.created_at, r.id, g.id, pp.parent_id, p.id
"""
for chunk in chunks(cls.reblog_notifications_to_flush, 1000):
cls.beginTx()
values_str = ','.join(f"({n['block_num']}, {escape_characters(n['created_at'])}::timestamp, {escape_characters(n['src'])}, {escape_characters(n['dst'])}, {escape_characters(n['permlink'])})" for n in chunk)
cls.db.query_prepared(sql.format(values_str))
cls.commitTx()
else:
n = 0
cls.reblog_notifications_to_flush.clear()
return n
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment