Skip to content
Snippets Groups Projects
Commit 1d1c6898 authored by Krzysztof Leśniak's avatar Krzysztof Leśniak Committed by Dan Notestein
Browse files

Fill subscriptions notifications from the indexer

parent 9e86cfcf
No related branches found
No related tags found
No related merge requests found
This commit is part of merge request !863. Comments created here will be created in the context of that merge request.
...@@ -94,19 +94,6 @@ CREATE OR REPLACE VIEW hivemind_app.hive_raw_notifications_as_view ...@@ -94,19 +94,6 @@ CREATE OR REPLACE VIEW hivemind_app.hive_raw_notifications_as_view
notifs.payload, notifs.payload,
harv.score harv.score
FROM ( FROM (
SELECT hs.block_num,
0 AS post_id,
11 AS type_id,
hs.created_at,
hs.account_id AS src,
hs.community_id AS dst,
0 as dst_post_id,
hc.name AS community,
hc.title AS community_title,
''::character varying AS payload
FROM hivemind_app.hive_subscriptions hs -- subscriptions
JOIN hivemind_app.hive_communities hc ON hs.community_id = hc.id
UNION ALL
SELECT hm.block_num, SELECT hm.block_num,
hm.post_id, hm.post_id,
16 AS type_id, 16 AS type_id,
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
from enum import IntEnum from enum import IntEnum
import logging import logging
import re import re
from time import perf_counter
import ujson as json import ujson as json
...@@ -12,6 +13,7 @@ from hive.conf import SCHEMA_NAME ...@@ -12,6 +13,7 @@ from hive.conf import SCHEMA_NAME
from hive.indexer.db_adapter_holder import DbAdapterHolder from hive.indexer.db_adapter_holder import DbAdapterHolder
from hive.indexer.accounts import Accounts from hive.indexer.accounts import Accounts
from hive.indexer.notify import Notify from hive.indexer.notify import Notify
from hive.utils.stats import FlushStatusManager as FSM
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -256,6 +258,8 @@ class Community: ...@@ -256,6 +258,8 @@ class Community:
class CommunityOp: class CommunityOp:
"""Handles validating and processing of community custom_json ops.""" """Handles validating and processing of community custom_json ops."""
_notification_first_block = None
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
SCHEMA = { SCHEMA = {
...@@ -337,6 +341,8 @@ class CommunityOp: ...@@ -337,6 +341,8 @@ class CommunityOp:
"""Applies a validated operation.""" """Applies a validated operation."""
assert self.valid, 'cannot apply invalid op' assert self.valid, 'cannot apply invalid op'
time_start = perf_counter()
action = self.action action = self.action
params = dict( params = dict(
date=self.date, date=self.date,
...@@ -375,6 +381,46 @@ class CommunityOp: ...@@ -375,6 +381,46 @@ class CommunityOp:
WHERE id = :community_id""", WHERE id = :community_id""",
**params, **params,
) )
if CommunityOp._notification_first_block is None:
CommunityOp._notification_first_block = DbAdapterHolder.common_block_processing_db().query_row("select hivemind_app.block_before_irreversible( '90 days' ) AS num")['num']
if self.block_num > CommunityOp._notification_first_block:
# With clause is inlined, modified call to reptracker_endpoints.get_account_reputation.
# Reputation is multiplied by 7.5 rather than 9 to bring the max value to 100 rather than 115.
# In case of reputation being 0, the score is set to 25 rather than 0.
sql = f"""
WITH log_account_rep AS
(
SELECT
account_id,
LOG(10, ABS(nullif(reputation, 0))) AS rep,
(CASE WHEN reputation < 0 THEN -1 ELSE 1 END) AS is_neg
FROM reptracker_app.account_reputations
),
calculate_rep AS
(
SELECT
account_id,
GREATEST(lar.rep - 9, 0) * lar.is_neg AS rep
FROM log_account_rep lar
),
final_rep AS
(
SELECT account_id, (cr.rep * 7.5 + 25)::INT AS rep FROM calculate_rep AS cr
)
INSERT INTO {SCHEMA_NAME}.hive_notification_cache
(block_num, type_id, created_at, src, dst, dst_post_id, post_id, score, payload, community, community_title)
SELECT n.block_num, 11, n.created_at, r.id, hc.id, 0, 0, COALESCE(rep.rep, 25), '', hc.name, hc.title
FROM
(VALUES (:block_num, (:date)::timestamp, :actor_id, :community_id)) AS n(block_num, created_at, src, dst)
JOIN {SCHEMA_NAME}.hive_accounts AS r ON n.src = r.id
JOIN {SCHEMA_NAME}.hive_communities AS hc ON n.dst = hc.id
LEFT JOIN final_rep AS rep ON r.haf_id = rep.account_id
WHERE n.block_num > hivemind_app.block_before_irreversible( '90 days' )
AND COALESCE(rep.rep, 25) > 0
AND n.src IS DISTINCT FROM n.dst
ORDER BY n.block_num, n.created_at, r.id, hc.id
"""
DbAdapterHolder.common_block_processing_db().query_no_return(sql, **params)
elif action == 'unsubscribe': elif action == 'unsubscribe':
DbAdapterHolder.common_block_processing_db().query( DbAdapterHolder.common_block_processing_db().query(
f"""DELETE FROM {SCHEMA_NAME}.hive_subscriptions f"""DELETE FROM {SCHEMA_NAME}.hive_subscriptions
...@@ -445,6 +491,7 @@ class CommunityOp: ...@@ -445,6 +491,7 @@ class CommunityOp:
elif action == 'flagPost': elif action == 'flagPost':
self._notify('flag_post', payload=self.notes) self._notify('flag_post', payload=self.notes)
FSM.flush_stat('Community', perf_counter() - time_start, 1)
return True return True
def _notify(self, op, **kwargs): def _notify(self, op, **kwargs):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment