From 1d1c6898b7ebb7ea530048d9255c8da3594eb49f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20Le=C5=9Bniak?= <klesniak@syncad.com> Date: Mon, 24 Mar 2025 12:49:17 +0100 Subject: [PATCH] Fill subscriptions notifications from the indexer --- hive/db/sql_scripts/notifications_view.sql | 13 ------ hive/indexer/community.py | 47 ++++++++++++++++++++++ 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/hive/db/sql_scripts/notifications_view.sql b/hive/db/sql_scripts/notifications_view.sql index de75c10e2..7a11e6223 100644 --- a/hive/db/sql_scripts/notifications_view.sql +++ b/hive/db/sql_scripts/notifications_view.sql @@ -94,19 +94,6 @@ CREATE OR REPLACE VIEW hivemind_app.hive_raw_notifications_as_view notifs.payload, harv.score FROM ( - SELECT hs.block_num, - 0 AS post_id, - 11 AS type_id, - hs.created_at, - hs.account_id AS src, - hs.community_id AS dst, - 0 as dst_post_id, - hc.name AS community, - hc.title AS community_title, - ''::character varying AS payload - FROM hivemind_app.hive_subscriptions hs -- subscriptions - JOIN hivemind_app.hive_communities hc ON hs.community_id = hc.id -UNION ALL SELECT hm.block_num, hm.post_id, 16 AS type_id, diff --git a/hive/indexer/community.py b/hive/indexer/community.py index b375a0c0c..266bcb1f1 100644 --- a/hive/indexer/community.py +++ b/hive/indexer/community.py @@ -5,6 +5,7 @@ from enum import IntEnum import logging import re +from time import perf_counter import ujson as json @@ -12,6 +13,7 @@ from hive.conf import SCHEMA_NAME from hive.indexer.db_adapter_holder import DbAdapterHolder from hive.indexer.accounts import Accounts from hive.indexer.notify import Notify +from hive.utils.stats import FlushStatusManager as FSM log = logging.getLogger(__name__) @@ -256,6 +258,8 @@ class Community: class CommunityOp: """Handles validating and processing of community custom_json ops.""" + _notification_first_block = None + # pylint: disable=too-many-instance-attributes SCHEMA = { @@ -337,6 +341,8 @@ class CommunityOp: """Applies a validated operation.""" assert self.valid, 'cannot apply invalid op' + time_start = perf_counter() + action = self.action params = dict( date=self.date, @@ -375,6 +381,46 @@ class CommunityOp: WHERE id = :community_id""", **params, ) + if CommunityOp._notification_first_block is None: + CommunityOp._notification_first_block = DbAdapterHolder.common_block_processing_db().query_row("select hivemind_app.block_before_irreversible( '90 days' ) AS num")['num'] + if self.block_num > CommunityOp._notification_first_block: + # With clause is inlined, modified call to reptracker_endpoints.get_account_reputation. + # Reputation is multiplied by 7.5 rather than 9 to bring the max value to 100 rather than 115. + # In case of reputation being 0, the score is set to 25 rather than 0. + sql = f""" + WITH log_account_rep AS + ( + SELECT + account_id, + LOG(10, ABS(nullif(reputation, 0))) AS rep, + (CASE WHEN reputation < 0 THEN -1 ELSE 1 END) AS is_neg + FROM reptracker_app.account_reputations + ), + calculate_rep AS + ( + SELECT + account_id, + GREATEST(lar.rep - 9, 0) * lar.is_neg AS rep + FROM log_account_rep lar + ), + final_rep AS + ( + SELECT account_id, (cr.rep * 7.5 + 25)::INT AS rep FROM calculate_rep AS cr + ) + INSERT INTO {SCHEMA_NAME}.hive_notification_cache + (block_num, type_id, created_at, src, dst, dst_post_id, post_id, score, payload, community, community_title) + SELECT n.block_num, 11, n.created_at, r.id, hc.id, 0, 0, COALESCE(rep.rep, 25), '', hc.name, hc.title + FROM + (VALUES (:block_num, (:date)::timestamp, :actor_id, :community_id)) AS n(block_num, created_at, src, dst) + JOIN {SCHEMA_NAME}.hive_accounts AS r ON n.src = r.id + JOIN {SCHEMA_NAME}.hive_communities AS hc ON n.dst = hc.id + LEFT JOIN final_rep AS rep ON r.haf_id = rep.account_id + WHERE n.block_num > hivemind_app.block_before_irreversible( '90 days' ) + AND COALESCE(rep.rep, 25) > 0 + AND n.src IS DISTINCT FROM n.dst + ORDER BY n.block_num, n.created_at, r.id, hc.id + """ + DbAdapterHolder.common_block_processing_db().query_no_return(sql, **params) elif action == 'unsubscribe': DbAdapterHolder.common_block_processing_db().query( f"""DELETE FROM {SCHEMA_NAME}.hive_subscriptions @@ -445,6 +491,7 @@ class CommunityOp: elif action == 'flagPost': self._notify('flag_post', payload=self.notes) + FSM.flush_stat('Community', perf_counter() - time_start, 1) return True def _notify(self, op, **kwargs): -- GitLab