Skip to content
Snippets Groups Projects
Commit c99b2bd8 authored by Marcin's avatar Marcin
Browse files

generate notifications with SQL query

Notifications about:
votes, reblogs, subscribe, replies, new communitiy
are dynamically generated

Blocks number are added for some tables.
parent e3cb8b90
No related branches found
No related tags found
No related merge requests found
This commit is part of merge request !123. Comments created here will be created in the context of that merge request.
...@@ -115,6 +115,9 @@ class DbState: ...@@ -115,6 +115,9 @@ class DbState:
'hive_votes_voter_id_idx', 'hive_votes_voter_id_idx',
'hive_votes_block_num_idx', 'hive_votes_block_num_idx',
'hive_subscriptions_block_num_idx',
'hive_communities_block_num_idx',
#'hive_posts_cache_ix6a', # (sc_trend, post_id, paidout=0) #'hive_posts_cache_ix6a', # (sc_trend, post_id, paidout=0)
#'hive_posts_cache_ix6b', # (post_id, sc_trend, paidout=0) #'hive_posts_cache_ix6b', # (post_id, sc_trend, paidout=0)
#'hive_posts_cache_ix7a', # (sc_hot, post_id, paidout=0) #'hive_posts_cache_ix7a', # (sc_hot, post_id, paidout=0)
...@@ -308,7 +311,7 @@ class DbState: ...@@ -308,7 +311,7 @@ class DbState:
from hive.indexer.accounts import Accounts from hive.indexer.accounts import Accounts
names = SteemClient().get_all_account_names() names = SteemClient().get_all_account_names()
Accounts.load_ids() Accounts.load_ids()
Accounts.register(names, '1970-01-01T00:00:00') Accounts.register(names, '1970-01-01T00:00:00', 0)
Accounts.clear_ids() Accounts.clear_ids()
cls._set_ver(6) cls._set_ver(6)
......
...@@ -59,6 +59,7 @@ def build_metadata(): ...@@ -59,6 +59,7 @@ def build_metadata():
sa.UniqueConstraint('name', name='hive_accounts_ux1'), sa.UniqueConstraint('name', name='hive_accounts_ux1'),
sa.Index('hive_accounts_ix5', 'cached_at'), # core/listen sweep sa.Index('hive_accounts_ix5', 'cached_at'), # core/listen sweep
sa.Index('hive_accounts_ix6', 'reputation'),
) )
sa.Table( sa.Table(
...@@ -169,6 +170,7 @@ def build_metadata(): ...@@ -169,6 +170,7 @@ def build_metadata():
sa.Table( sa.Table(
'hive_votes', metadata, 'hive_votes', metadata,
sa.Column('id', sa.BigInteger, primary_key=True),
sa.Column('post_id', sa.Integer, nullable=False), sa.Column('post_id', sa.Integer, nullable=False),
sa.Column('voter_id', sa.Integer, nullable=False), sa.Column('voter_id', sa.Integer, nullable=False),
sa.Column('author_id', sa.Integer, nullable=False), sa.Column('author_id', sa.Integer, nullable=False),
...@@ -181,7 +183,7 @@ def build_metadata(): ...@@ -181,7 +183,7 @@ def build_metadata():
sa.Column('block_num', sa.Integer, nullable=False ), sa.Column('block_num', sa.Integer, nullable=False ),
sa.Column('is_effective', BOOLEAN, nullable=False, server_default='0'), sa.Column('is_effective', BOOLEAN, nullable=False, server_default='0'),
sa.PrimaryKeyConstraint('author_id', 'permlink_id', 'voter_id', name='hive_votes_pk'), sa.UniqueConstraint('voter_id', 'author_id', 'permlink_id', name='hive_votes_ux1'),
sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']), sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']), sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
...@@ -214,6 +216,7 @@ def build_metadata(): ...@@ -214,6 +216,7 @@ def build_metadata():
sa.Table( sa.Table(
'hive_follows', metadata, 'hive_follows', metadata,
sa.Column('id', sa.Integer, primary_key=True ),
sa.Column('follower', sa.Integer, nullable=False), sa.Column('follower', sa.Integer, nullable=False),
sa.Column('following', sa.Integer, nullable=False), sa.Column('following', sa.Integer, nullable=False),
sa.Column('state', SMALLINT, nullable=False, server_default='1'), sa.Column('state', SMALLINT, nullable=False, server_default='1'),
...@@ -222,7 +225,7 @@ def build_metadata(): ...@@ -222,7 +225,7 @@ def build_metadata():
sa.Column('follow_blacklists', sa.Boolean, nullable=False, server_default='0'), sa.Column('follow_blacklists', sa.Boolean, nullable=False, server_default='0'),
sa.Column('block_num', sa.Integer, nullable=False ), sa.Column('block_num', sa.Integer, nullable=False ),
sa.PrimaryKeyConstraint('following', 'follower', name='hive_follows_pk'), # core sa.UniqueConstraint('following', 'follower', name='hive_follows_ux1'), # core
sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_follows_fk1'), sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_follows_fk1'),
sa.Index('hive_follows_ix5a', 'following', 'state', 'created_at', 'follower'), sa.Index('hive_follows_ix5a', 'following', 'state', 'created_at', 'follower'),
sa.Index('hive_follows_ix5b', 'follower', 'state', 'created_at', 'following'), sa.Index('hive_follows_ix5b', 'follower', 'state', 'created_at', 'following'),
...@@ -231,6 +234,7 @@ def build_metadata(): ...@@ -231,6 +234,7 @@ def build_metadata():
sa.Table( sa.Table(
'hive_reblogs', metadata, 'hive_reblogs', metadata,
sa.Column('id', sa.Integer, primary_key=True ),
sa.Column('account', VARCHAR(16), nullable=False), sa.Column('account', VARCHAR(16), nullable=False),
sa.Column('post_id', sa.Integer, nullable=False), sa.Column('post_id', sa.Integer, nullable=False),
sa.Column('created_at', sa.DateTime, nullable=False), sa.Column('created_at', sa.DateTime, nullable=False),
...@@ -239,7 +243,7 @@ def build_metadata(): ...@@ -239,7 +243,7 @@ def build_metadata():
sa.ForeignKeyConstraint(['account'], ['hive_accounts.name'], name='hive_reblogs_fk1'), sa.ForeignKeyConstraint(['account'], ['hive_accounts.name'], name='hive_reblogs_fk1'),
sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_reblogs_fk2'), sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_reblogs_fk2'),
sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_reblogs_fk3'), sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_reblogs_fk3'),
sa.PrimaryKeyConstraint('account', 'post_id', name='hive_reblogs_pk'), # core sa.UniqueConstraint('account', 'post_id', name='hive_reblogs_ux1'), # core
sa.Index('hive_reblogs_account', 'account'), sa.Index('hive_reblogs_account', 'account'),
sa.Index('hive_reblogs_post_id', 'post_id'), sa.Index('hive_reblogs_post_id', 'post_id'),
sa.Index('hive_reblogs_block_num_idx', 'block_num') sa.Index('hive_reblogs_block_num_idx', 'block_num')
...@@ -322,9 +326,11 @@ def build_metadata_community(metadata=None): ...@@ -322,9 +326,11 @@ def build_metadata_community(metadata=None):
sa.Column('description', sa.String(5000), nullable=False, server_default=''), sa.Column('description', sa.String(5000), nullable=False, server_default=''),
sa.Column('flag_text', sa.String(5000), nullable=False, server_default=''), sa.Column('flag_text', sa.String(5000), nullable=False, server_default=''),
sa.Column('settings', TEXT, nullable=False, server_default='{}'), sa.Column('settings', TEXT, nullable=False, server_default='{}'),
sa.Column('block_num', sa.Integer, nullable=False ),
sa.UniqueConstraint('name', name='hive_communities_ux1'), sa.UniqueConstraint('name', name='hive_communities_ux1'),
sa.Index('hive_communities_ix1', 'rank', 'id') sa.Index('hive_communities_ix1', 'rank', 'id'),
sa.Index('hive_communities_block_num_idx', 'block_num')
) )
sa.Table( sa.Table(
...@@ -341,12 +347,15 @@ def build_metadata_community(metadata=None): ...@@ -341,12 +347,15 @@ def build_metadata_community(metadata=None):
sa.Table( sa.Table(
'hive_subscriptions', metadata, 'hive_subscriptions', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('account_id', sa.Integer, nullable=False), sa.Column('account_id', sa.Integer, nullable=False),
sa.Column('community_id', sa.Integer, nullable=False), sa.Column('community_id', sa.Integer, nullable=False),
sa.Column('created_at', sa.DateTime, nullable=False), sa.Column('created_at', sa.DateTime, nullable=False),
sa.Column('block_num', sa.Integer, nullable=False ),
sa.UniqueConstraint('account_id', 'community_id', name='hive_subscriptions_ux1'), sa.UniqueConstraint('account_id', 'community_id', name='hive_subscriptions_ux1'),
sa.Index('hive_subscriptions_ix1', 'community_id', 'account_id', 'created_at'), sa.Index('hive_subscriptions_ix1', 'community_id', 'account_id', 'created_at'),
sa.Index('hive_subscriptions_block_num_idx', 'block_num')
) )
sa.Table( sa.Table(
...@@ -694,6 +703,7 @@ def setup(db): ...@@ -694,6 +703,7 @@ def setup(db):
hp.is_grayed, hp.is_grayed,
hp.total_vote_weight, hp.total_vote_weight,
ha_pp.name AS parent_author, ha_pp.name AS parent_author,
ha_pp.id AS parent_author_id,
( CASE hp.depth > 0 ( CASE hp.depth > 0
WHEN True THEN hpd_pp.permlink WHEN True THEN hpd_pp.permlink
ELSE hcd.category ELSE hcd.category
...@@ -841,7 +851,7 @@ def setup(db): ...@@ -841,7 +851,7 @@ def setup(db):
LANGUAGE 'plpgsql' LANGUAGE 'plpgsql'
AS AS
$function$ $function$
DECLARE DECLARE
post_id INT; post_id INT;
BEGIN BEGIN
SELECT INTO post_id COALESCE( (SELECT hp.id SELECT INTO post_id COALESCE( (SELECT hp.id
...@@ -1012,7 +1022,7 @@ def setup(db): ...@@ -1012,7 +1022,7 @@ def setup(db):
hp.allow_curation_rewards, hp.beneficiaries, hp.url, hp.root_title, hp.abs_rshares, hp.allow_curation_rewards, hp.beneficiaries, hp.url, hp.root_title, hp.abs_rshares,
hp.active, hp.author_rewards hp.active, hp.author_rewards
FROM hive_posts_view hp FROM hive_posts_view hp
INNER JOIN INNER JOIN
( (
SELECT hp2.id, hp2.root_id FROM hive_posts hp2 SELECT hp2.id, hp2.root_id FROM hive_posts hp2
WHERE NOT hp2.is_muted WHERE NOT hp2.is_muted
...@@ -1037,7 +1047,7 @@ def setup(db): ...@@ -1037,7 +1047,7 @@ def setup(db):
in _start_post_author hive_accounts.name%TYPE, in _start_post_author hive_accounts.name%TYPE,
in _start_post_permlink hive_permlink_data.permlink%TYPE, in _start_post_permlink hive_permlink_data.permlink%TYPE,
in _limit INT) in _limit INT)
RETURNS SETOF database_api_post RETURNS SETOF database_api_post
LANGUAGE sql LANGUAGE sql
COST 100 COST 100
STABLE STABLE
...@@ -1164,6 +1174,43 @@ def setup(db): ...@@ -1164,6 +1174,43 @@ def setup(db):
LANGUAGE plpgsql LANGUAGE plpgsql
; ;
""" """
db.query_no_return(sql)
sql = """
DROP FUNCTION IF EXISTS score_for_account(in _account_id hive_accounts.id%TYPE)
;
CREATE OR REPLACE FUNCTION score_for_account(in _account_id hive_accounts.id%TYPE)
RETURNS SMALLINT
AS
$function$
DECLARE
score SMALLINT;
BEGIN
SELECT INTO score
CASE
WHEN rank.position < 200 THEN 70
WHEN rank.position < 1000 THEN 60
WHEN rank.position < 6500 THEN 50
WHEN rank.position < 25000 THEN 40
WHEN rank.position < 100000 THEN 30
ELSE 20
END as score
FROM (
SELECT
(
SELECT COUNT(*)
FROM hive_accounts ha_for_rank2
WHERE ha_for_rank2.reputation > ha_for_rank.reputation
) as position
FROM hive_accounts ha_for_rank WHERE ha_for_rank.id = _account_id
) as rank;
return score;
END
$function$
LANGUAGE plpgsql
;
"""
db.query_no_return(sql) db.query_no_return(sql)
# hot and tranding functions # hot and tranding functions
...@@ -1372,6 +1419,41 @@ def setup(db): ...@@ -1372,6 +1419,41 @@ def setup(db):
""" """
db.query_no_return(sql) db.query_no_return(sql)
sql = """
DROP FUNCTION IF EXISTS public.calculate_notify_vote_score(_payout hive_posts.payout%TYPE, _abs_rshares hive_posts_view.abs_rshares%TYPE, _rshares hive_votes.rshares%TYPE) CASCADE
;
CREATE OR REPLACE FUNCTION public.calculate_notify_vote_score(_payout hive_posts.payout%TYPE, _abs_rshares hive_posts_view.abs_rshares%TYPE, _rshares hive_votes.rshares%TYPE)
RETURNS INT
LANGUAGE 'sql'
IMMUTABLE
AS $BODY$
SELECT CASE
WHEN ((( _payout )/_abs_rshares) * 1000 * _rshares < 20 ) THEN -1
ELSE LEAST(100, (LENGTH(CAST( ( (( _payout )/_abs_rshares) * 1000 * _rshares ) as text)) - 1) * 25)
END;
$BODY$;
"""
db.query_no_return(sql)
sql = """
DROP FUNCTION IF EXISTS notification_id(in _block_number INTEGER, in _notifyType INTEGER, in _id INTEGER)
;
CREATE OR REPLACE FUNCTION notification_id(in _block_number INTEGER, in _notifyType INTEGER, in _id INTEGER)
RETURNS BIGINT
AS
$function$
BEGIN
RETURN CAST( _block_number as BIGINT ) << 32
| ( _notifyType << 16 )
| ( _id & CAST( x'00FF' as INTEGER) );
END
$function$
LANGUAGE plpgsql IMMUTABLE
;
"""
db.query_no_return(sql)
def reset_autovac(db): def reset_autovac(db):
"""Initializes/resets per-table autovacuum/autoanalyze params. """Initializes/resets per-table autovacuum/autoanalyze params.
......
...@@ -70,7 +70,7 @@ class Accounts: ...@@ -70,7 +70,7 @@ class Accounts:
return False return False
@classmethod @classmethod
def register(cls, name, block_date): def register(cls, name, block_date, block_num):
"""Block processing: register "candidate" names. """Block processing: register "candidate" names.
There are four ops which can result in account creation: There are four ops which can result in account creation:
...@@ -95,7 +95,7 @@ class Accounts: ...@@ -95,7 +95,7 @@ class Accounts:
# post-insert: pass to communities to check for new registrations # post-insert: pass to communities to check for new registrations
from hive.indexer.community import Community, START_DATE from hive.indexer.community import Community, START_DATE
if block_date > START_DATE: if block_date > START_DATE:
Community.register(name, block_date) Community.register(name, block_date, block_num)
# account cache methods # account cache methods
# --------------------- # ---------------------
......
...@@ -213,7 +213,7 @@ class Blocks: ...@@ -213,7 +213,7 @@ class Blocks:
elif op_type == 'create_claimed_account_operation': elif op_type == 'create_claimed_account_operation':
account_name = op['new_account_name'] account_name = op['new_account_name']
Accounts.register(account_name, cls._head_block_date) Accounts.register(account_name, cls._head_block_date, num)
# account metadata updates # account metadata updates
if op_type == 'account_update_operation': if op_type == 'account_update_operation':
......
...@@ -57,9 +57,9 @@ def assert_keys_match(keys, expected, allow_missing=True): ...@@ -57,9 +57,9 @@ def assert_keys_match(keys, expected, allow_missing=True):
extra = keys - expected extra = keys - expected
assert not extra, 'extraneous keys: %s' % extra assert not extra, 'extraneous keys: %s' % extra
def process_json_community_op(actor, op_json, date): def process_json_community_op(actor, op_json, date, block_num):
"""Validates community op and apply state changes to db.""" """Validates community op and apply state changes to db."""
CommunityOp.process_if_valid(actor, op_json, date) CommunityOp.process_if_valid(actor, op_json, date, block_num)
def read_key_bool(op, key): def read_key_bool(op, key):
"""Reads a key from dict, ensuring valid bool if present.""" """Reads a key from dict, ensuring valid bool if present."""
...@@ -104,7 +104,7 @@ class Community: ...@@ -104,7 +104,7 @@ class Community:
_names = {} _names = {}
@classmethod @classmethod
def register(cls, name, block_date): def register(cls, name, block_date, block_num):
"""Block processing: hooks into new account registration. """Block processing: hooks into new account registration.
`Accounts` calls this method with any newly registered names. `Accounts` calls this method with any newly registered names.
...@@ -119,8 +119,8 @@ class Community: ...@@ -119,8 +119,8 @@ class Community:
# insert community # insert community
sql = """INSERT INTO hive_communities (id, name, type_id, created_at) sql = """INSERT INTO hive_communities (id, name, type_id, created_at)
VALUES (:id, :name, :type_id, :date)""" VALUES (:id, :name, :type_id, :date, :block_num)"""
DB.query(sql, id=_id, name=name, type_id=type_id, date=block_date) DB.query(sql, id=_id, name=name, type_id=type_id, date=block_date, block_num=block_num)
# insert owner # insert owner
sql = """INSERT INTO hive_roles (community_id, account_id, role_id, created_at) sql = """INSERT INTO hive_roles (community_id, account_id, role_id, created_at)
...@@ -128,9 +128,6 @@ class Community: ...@@ -128,9 +128,6 @@ class Community:
DB.query(sql, community_id=_id, account_id=_id, DB.query(sql, community_id=_id, account_id=_id,
role_id=Role.owner.value, date=block_date) role_id=Role.owner.value, date=block_date)
Notify('new_community', src_id=None, dst_id=_id,
when=block_date, community_id=_id).write()
@classmethod @classmethod
def validated_id(cls, name): def validated_id(cls, name):
"""Verify `name` as a candidate and check for record id.""" """Verify `name` as a candidate and check for record id."""
...@@ -275,9 +272,10 @@ class CommunityOp: ...@@ -275,9 +272,10 @@ class CommunityOp:
'unsubscribe': ['community'], 'unsubscribe': ['community'],
} }
def __init__(self, actor, date): def __init__(self, actor, date, block_num):
"""Inits a community op for validation and processing.""" """Inits a community op for validation and processing."""
self.date = date self.date = date
self.block_num = block_num
self.valid = False self.valid = False
self.action = None self.action = None
self.op = None self.op = None
...@@ -302,9 +300,9 @@ class CommunityOp: ...@@ -302,9 +300,9 @@ class CommunityOp:
self.props = None self.props = None
@classmethod @classmethod
def process_if_valid(cls, actor, op_json, date): def process_if_valid(cls, actor, op_json, date, block_num):
"""Helper to instantiate, validate, process an op.""" """Helper to instantiate, validate, process an op."""
op = CommunityOp(actor, date) op = CommunityOp(actor, date, block_num)
if op.validate(op_json): if op.validate(op_json):
op.process() op.process()
return True return True
...@@ -353,6 +351,7 @@ class CommunityOp: ...@@ -353,6 +351,7 @@ class CommunityOp:
role_id=self.role_id, role_id=self.role_id,
notes=self.notes, notes=self.notes,
title=self.title, title=self.title,
block_num=self.block_num
) )
# Community-level commands # Community-level commands
...@@ -364,12 +363,11 @@ class CommunityOp: ...@@ -364,12 +363,11 @@ class CommunityOp:
elif action == 'subscribe': elif action == 'subscribe':
DB.query("""INSERT INTO hive_subscriptions DB.query("""INSERT INTO hive_subscriptions
(account_id, community_id, created_at) (account_id, community_id, created_at, block_num)
VALUES (:actor_id, :community_id, :date)""", **params) VALUES (:actor_id, :community_id, :date, :block_num)""", **params)
DB.query("""UPDATE hive_communities DB.query("""UPDATE hive_communities
SET subscribers = subscribers + 1 SET subscribers = subscribers + 1
WHERE id = :community_id""", **params) WHERE id = :community_id""", **params)
self._notify('subscribe')
elif action == 'unsubscribe': elif action == 'unsubscribe':
DB.query("""DELETE FROM hive_subscriptions DB.query("""DELETE FROM hive_subscriptions
WHERE account_id = :actor_id WHERE account_id = :actor_id
...@@ -384,7 +382,7 @@ class CommunityOp: ...@@ -384,7 +382,7 @@ class CommunityOp:
(account_id, community_id, role_id, created_at) (account_id, community_id, role_id, created_at)
VALUES (:account_id, :community_id, :role_id, :date) VALUES (:account_id, :community_id, :role_id, :date)
ON CONFLICT (account_id, community_id) ON CONFLICT (account_id, community_id)
DO UPDATE SET role_id = :role_id""", **params) DO UPDATE SET role_id = :role_id """, **params)
self._notify('set_role', payload=Role(self.role_id).name) self._notify('set_role', payload=Role(self.role_id).name)
elif action == 'setUserTitle': elif action == 'setUserTitle':
DB.query("""INSERT INTO hive_roles DB.query("""INSERT INTO hive_roles
......
...@@ -59,7 +59,7 @@ class CustomOp: ...@@ -59,7 +59,7 @@ class CustomOp:
cls._process_legacy(account, op_json, block_date, block_num) cls._process_legacy(account, op_json, block_date, block_num)
elif op['id'] == 'community': elif op['id'] == 'community':
if block_num > START_BLOCK: if block_num > START_BLOCK:
process_json_community_op(account, op_json, block_date) process_json_community_op(account, op_json, block_date, block_num)
elif op['id'] == 'notify': elif op['id'] == 'notify':
cls._process_notify(account, op_json, block_date) cls._process_notify(account, op_json, block_date)
OPSM.op_stats(opName, OPSM.stop(start)) OPSM.op_stats(opName, OPSM.stop(start))
......
...@@ -18,18 +18,18 @@ FOLLOWING = 'following' ...@@ -18,18 +18,18 @@ FOLLOWING = 'following'
FOLLOW_ITEM_INSERT_QUERY = """ FOLLOW_ITEM_INSERT_QUERY = """
INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists, block_num) INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists, block_num)
VALUES VALUES
( (
:flr, :flr,
:flg, :flg,
:at, :at,
:state, :state,
(CASE :state (CASE :state
WHEN 3 THEN TRUE WHEN 3 THEN TRUE
WHEN 4 THEN FALSE WHEN 4 THEN FALSE
ELSE FALSE ELSE FALSE
END END
), ),
(CASE :state (CASE :state
WHEN 3 THEN FALSE WHEN 3 THEN FALSE
WHEN 4 THEN TRUE WHEN 4 THEN TRUE
...@@ -38,18 +38,18 @@ FOLLOW_ITEM_INSERT_QUERY = """ ...@@ -38,18 +38,18 @@ FOLLOW_ITEM_INSERT_QUERY = """
), ),
:block_num :block_num
) )
ON CONFLICT (follower, following) DO UPDATE ON CONFLICT (follower, following) DO UPDATE
SET SET
state = (CASE EXCLUDED.state state = (CASE EXCLUDED.state
WHEN 0 THEN 0 -- 0 blocks possibility to update state WHEN 0 THEN 0 -- 0 blocks possibility to update state
ELSE EXCLUDED.state ELSE EXCLUDED.state
END), END),
blacklisted = (CASE EXCLUDED.state blacklisted = (CASE EXCLUDED.state
WHEN 3 THEN TRUE WHEN 3 THEN TRUE
WHEN 5 THEN FALSE WHEN 5 THEN FALSE
ELSE EXCLUDED.blacklisted ELSE EXCLUDED.blacklisted
END), END),
follow_blacklists = (CASE EXCLUDED.state follow_blacklists = (CASE EXCLUDED.state
WHEN 4 THEN TRUE WHEN 4 THEN TRUE
WHEN 6 THEN FALSE WHEN 6 THEN FALSE
ELSE EXCLUDED.follow_blacklists ELSE EXCLUDED.follow_blacklists
...@@ -89,7 +89,7 @@ class Follow: ...@@ -89,7 +89,7 @@ class Follow:
if k in cls.follow_items_to_flush: if k in cls.follow_items_to_flush:
old_value = cls.follow_items_to_flush.get(k) old_value = cls.follow_items_to_flush.get(k)
old_value['state'] = op['state'] old_value['state'] = op['state']
cls.follow_items_to_flush[k] = old_value cls.follow_items_to_flush[k] = old_value
else: else:
cls.follow_items_to_flush[k] = dict( cls.follow_items_to_flush[k] = dict(
...@@ -105,10 +105,6 @@ class Follow: ...@@ -105,10 +105,6 @@ class Follow:
DB.query(FOLLOW_ITEM_INSERT_QUERY, **op) DB.query(FOLLOW_ITEM_INSERT_QUERY, **op)
if new_state == 1: if new_state == 1:
Follow.follow(op['flr'], op['flg']) Follow.follow(op['flr'], op['flg'])
if old_state is None:
score = Accounts.default_score(op_json['follower'])
Notify('follow', src_id=op['flr'], dst_id=op['flg'],
when=op['at'], score=score).write()
if old_state == 1: if old_state == 1:
Follow.unfollow(op['flr'], op['flg']) Follow.unfollow(op['flr'], op['flg'])
...@@ -178,18 +174,18 @@ class Follow: ...@@ -178,18 +174,18 @@ class Follow:
VALUES """ VALUES """
sql_postfix = """ sql_postfix = """
ON CONFLICT ON CONSTRAINT hive_follows_pk DO UPDATE ON CONFLICT ON CONSTRAINT hive_follows_ux1 DO UPDATE
SET SET
state = (CASE EXCLUDED.state state = (CASE EXCLUDED.state
WHEN 0 THEN 0 -- 0 blocks possibility to update state WHEN 0 THEN 0 -- 0 blocks possibility to update state
ELSE EXCLUDED.state ELSE EXCLUDED.state
END), END),
blacklisted = (CASE EXCLUDED.state blacklisted = (CASE EXCLUDED.state
WHEN 3 THEN TRUE WHEN 3 THEN TRUE
WHEN 5 THEN FALSE WHEN 5 THEN FALSE
ELSE EXCLUDED.blacklisted ELSE EXCLUDED.blacklisted
END), END),
follow_blacklists = (CASE EXCLUDED.state follow_blacklists = (CASE EXCLUDED.state
WHEN 4 THEN TRUE WHEN 4 THEN TRUE
WHEN 6 THEN FALSE WHEN 6 THEN FALSE
ELSE EXCLUDED.follow_blacklists ELSE EXCLUDED.follow_blacklists
...@@ -276,15 +272,15 @@ class Follow: ...@@ -276,15 +272,15 @@ class Follow:
log.info("[SYNC] query follower counts") log.info("[SYNC] query follower counts")
sql = """ sql = """
CREATE TEMPORARY TABLE following_counts AS ( CREATE TEMPORARY TABLE following_counts AS (
SELECT id account_id, COUNT(state) num SELECT ha.id account_id, COUNT(state) num
FROM hive_accounts FROM hive_accounts ha
LEFT JOIN hive_follows hf ON id = hf.follower AND state = 1 LEFT JOIN hive_follows hf ON ha.id = hf.follower AND state = 1
GROUP BY id); GROUP BY ha.id);
CREATE TEMPORARY TABLE follower_counts AS ( CREATE TEMPORARY TABLE follower_counts AS (
SELECT id account_id, COUNT(state) num SELECT ha.id account_id, COUNT(state) num
FROM hive_accounts FROM hive_accounts ha
LEFT JOIN hive_follows hf ON id = hf.following AND state = 1 LEFT JOIN hive_follows hf ON ha.id = hf.following AND state = 1
GROUP BY id); GROUP BY ha.id);
""" """
DB.query(sql) DB.query(sql)
......
...@@ -22,7 +22,7 @@ DELETE_SQL = """ ...@@ -22,7 +22,7 @@ DELETE_SQL = """
INNER JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id INNER JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
WHERE ha.name = :a AND hpd.permlink = :permlink AND hp.depth <= 0 WHERE ha.name = :a AND hpd.permlink = :permlink AND hp.depth <= 0
) )
DELETE FROM hive_reblogs AS hr DELETE FROM hive_reblogs AS hr
WHERE hr.account = :a AND hr.post_id IN (SELECT ps.post_id FROM processing_set ps) WHERE hr.account = :a AND hr.post_id IN (SELECT ps.post_id FROM processing_set ps)
RETURNING hr.post_id, (SELECT ps.account_id FROM processing_set ps) AS account_id RETURNING hr.post_id, (SELECT ps.account_id FROM processing_set ps) AS account_id
""" """
...@@ -38,8 +38,8 @@ SELECT_SQL = """ ...@@ -38,8 +38,8 @@ SELECT_SQL = """
INSERT_SQL = """ INSERT_SQL = """
INSERT INTO hive_reblogs (account, post_id, created_at, block_num) INSERT INTO hive_reblogs (account, post_id, created_at, block_num)
""" + SELECT_SQL + """ """ + SELECT_SQL + """
ON CONFLICT ON CONSTRAINT hive_reblogs_pk DO NOTHING ON CONFLICT ON CONSTRAINT hive_reblogs_ux1 DO NOTHING
RETURNING post_id RETURNING post_id
""" """
class Reblog(): class Reblog():
...@@ -84,9 +84,6 @@ class Reblog(): ...@@ -84,9 +84,6 @@ class Reblog():
result = dict(row) result = dict(row)
post_id = result['post_id'] post_id = result['post_id']
FeedCache.insert(post_id, blogger_id, block_date) FeedCache.insert(post_id, blogger_id, block_date)
Notify('reblog', src_id=blogger_id, dst_id=author_id,
post_id=post_id, when=block_date,
score=Accounts.default_score(blogger)).write()
else: else:
log.warning("Error in reblog: Insert operation returned `None` as `post_id`. Op details: {}".format(op_json)) log.warning("Error in reblog: Insert operation returned `None` as `post_id`. Op details: {}".format(op_json))
@classmethod @classmethod
...@@ -96,7 +93,7 @@ class Reblog(): ...@@ -96,7 +93,7 @@ class Reblog():
VALUES VALUES
""" """
sql_postfix = """ sql_postfix = """
ON CONFLICT ON CONSTRAINT hive_reblogs_pk DO NOTHING ON CONFLICT ON CONSTRAINT hive_reblogs_ux1 DO NOTHING
""" """
values = [] values = []
...@@ -121,4 +118,4 @@ class Reblog(): ...@@ -121,4 +118,4 @@ class Reblog():
query += sql_postfix query += sql_postfix
DB.query(query) DB.query(query)
cls.reblog_items_to_flush.clear() cls.reblog_items_to_flush.clear()
return item_count return item_count
\ No newline at end of file
...@@ -87,7 +87,7 @@ class Votes: ...@@ -87,7 +87,7 @@ class Votes:
INNER JOIN hive_permlink_data hpd_p ON hpd_p.permlink = t.permlink INNER JOIN hive_permlink_data hpd_p ON hpd_p.permlink = t.permlink
INNER JOIN hive_posts hp ON hp.author_id = ha_a.id AND hp.permlink_id = hpd_p.id INNER JOIN hive_posts hp ON hp.author_id = ha_a.id AND hp.permlink_id = hpd_p.id
WHERE hp.counter_deleted = 0 WHERE hp.counter_deleted = 0
ON CONFLICT ON CONSTRAINT hive_votes_pk DO ON CONFLICT ON CONSTRAINT hive_votes_ux1 DO
UPDATE UPDATE
SET SET
weight = CASE EXCLUDED.is_effective WHEN true THEN EXCLUDED.weight ELSE hive_votes.weight END, weight = CASE EXCLUDED.is_effective WHEN true THEN EXCLUDED.weight ELSE hive_votes.weight END,
......
...@@ -63,7 +63,7 @@ async def get_followers(db, account: str, start: str, follow_type: str, limit: i ...@@ -63,7 +63,7 @@ async def get_followers(db, account: str, start: str, follow_type: str, limit: i
sql = """ sql = """
SELECT name FROM hive_follows hf SELECT name FROM hive_follows hf
LEFT JOIN hive_accounts ON hf.follower = id LEFT JOIN hive_accounts ha ON hf.follower = ha.id
WHERE hf.following = :account_id WHERE hf.following = :account_id
AND state = :state %s AND state = :state %s
ORDER BY hf.created_at DESC ORDER BY hf.created_at DESC
......
...@@ -4,6 +4,7 @@ import logging ...@@ -4,6 +4,7 @@ import logging
from hive.server.common.helpers import return_error_info, json_date from hive.server.common.helpers import return_error_info, json_date
from hive.indexer.notify import NotifyType from hive.indexer.notify import NotifyType
from hive.server.hive_api.common import get_account_id, valid_limit, get_post_id from hive.server.hive_api.common import get_account_id, valid_limit, get_post_id
from hive.server.common.mutes import Mutes
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -62,15 +63,7 @@ async def account_notifications(context, account, min_score=25, last_id=None, li ...@@ -62,15 +63,7 @@ async def account_notifications(context, account, min_score=25, last_id=None, li
limit = valid_limit(limit, 100) limit = valid_limit(limit, 100)
account_id = await get_account_id(db, account) account_id = await get_account_id(db, account)
if account[:5] == 'hive-': min_score = 0 return await _dynamic_notifications(db = db, limit=limit, min_score=min_score, last_id = last_id, account_id = account_id)
seek = ' AND hn.id < :last_id' if last_id else ''
col = 'hn.community_id' if account[:5] == 'hive-' else 'dst_id'
sql = _notifs_sql(col + " = :dst_id" + seek)
rows = await db.query_all(sql, min_score=min_score, dst_id=account_id,
last_id=last_id, limit=limit)
return [_render(row) for row in rows]
@return_error_info @return_error_info
async def post_notifications(context, author, permlink, min_score=25, last_id=None, limit=100): async def post_notifications(context, author, permlink, min_score=25, last_id=None, limit=100):
...@@ -80,18 +73,13 @@ async def post_notifications(context, author, permlink, min_score=25, last_id=No ...@@ -80,18 +73,13 @@ async def post_notifications(context, author, permlink, min_score=25, last_id=No
limit = valid_limit(limit, 100) limit = valid_limit(limit, 100)
post_id = await get_post_id(db, author, permlink) post_id = await get_post_id(db, author, permlink)
seek = ' AND hn.id < :last_id' if last_id else '' return await _dynamic_notifications(db = db, limit=limit, min_score=min_score, last_id = last_id, post_id = post_id)
sql = _notifs_sql("post_id = :post_id" + seek)
rows = await db.query_all(sql, min_score=min_score, post_id=post_id,
last_id=last_id, limit=limit)
return [_render(row) for row in rows]
def _notifs_sql(where): def _notifs_sql(where):
sql = """SELECT hn.id, hn.type_id, hn.score, hn.created_at, sql = """SELECT hn.id, hn.type_id, hn.score, hn.created_at,
src.name src, dst.name dst, src.name src, dst.name dst,
(SELECT name FROM hive_accounts WHERE id = hp.author_id) as author, (SELECT name FROM hive_accounts WHERE id = hp.author_id) as author,
(SELECT permlink FROM hive_permlink_data WHERE id = hp.permlink_id) as permlink, (SELECT permlink FROM hive_permlink_data WHERE id = hp.permlink_id) as permlink,
hc.name community, hc.name community,
hc.title community_title, payload hc.title community_title, payload
FROM hive_notifs hn FROM hive_notifs hn
...@@ -148,3 +136,289 @@ def _render_url(row): ...@@ -148,3 +136,289 @@ def _render_url(row):
if row['dst']: return '@' + row['dst'] if row['dst']: return '@' + row['dst']
assert False, 'no url for %s' % row assert False, 'no url for %s' % row
return None return None
def _vote_notifs_sql(min_score, account_id = None, post_id = None, last_id = None, ):
conditions = ()
if ( account_id ):
conditions = conditions + ( "hpv.author_id = {}".format( account_id ), )
if ( post_id ):
conditions = conditions + ( "hv1.post_id = {}".format( post_id ), )
conditions = conditions + ( "hv1.rshares >= 10e9", "ar.abs_rshares != 0", )
condition = "WHERE " + ' AND '.join( conditions )
last_id_where = ""
if last_id:
last_id_where = "AND scores.notif_id < {}".format(last_id)
return """
SELECT
scores.notif_id as id
, 17 as type_id
, hv.last_update as created_at
, scores.src as src
, scores.dst as dst
, scores.dst as author
, scores.permlink as permlink
, '' as community
, '' as community_title
, '' as payload
, scores.score as score
FROM hive_votes hv
JOIN (
SELECT
hv1.id as id
, notification_id(hv1.block_num, 17, CAST( hv1.id as INT) ) as notif_id
, calculate_notify_vote_score( (hpv.payout + hpv.pending_payout), ar.abs_rshares, hv1.rshares ) as score
, hpv.author as dst
, ha.name as src
, hpv.permlink as permlink
FROM hive_votes hv1
JOIN hive_posts_view hpv ON hv1.post_id = hpv.id
JOIN hive_accounts ha ON ha.id = hv1.voter_id
JOIN (
SELECT
v.post_id as post_id
, COALESCE(
SUM( CASE v.rshares >= 0 WHEN True THEN v.rshares ELSE -v.rshares END )
, 0
) as abs_rshares
FROM hive_votes v
WHERE NOT v.rshares = 0
GROUP BY v.post_id
) as ar ON ar.post_id = hpv.id
{}
) as scores ON scores.id = hv.id
WHERE scores.score >= {} {}
""".format( condition, min_score, last_id_where )
def _new_community_notifs_sql( min_score, account_id, last_id = None ):
last_id_where = ""
if last_id:
last_id_where = "AND hc_id.notif_id < {}".format(last_id)
return """
SELECT
hc_id.notif_id as id
, 1 as type_id
, hc.created_at as created_at
, '' as src
, ha.name as dst
, '' as author
, '' as permlink
, hc.name as community
, '' as community_title
, '' as payload
, 35 as score
FROM
hive_communities hc
JOIN hive_accounts ha ON ha.id = hc.id
JOIN (
SELECT
hc2.id as id
, notification_id(hc2.block_num, 11, hc2.id) as notif_id
FROM hive_communities hc2
) as hc_id ON hc_id.id = hc.id
WHERE hc.id={} {}
""".format( account_id, last_id_where )
def _subsription_notifs_sql( min_score, account_id, last_id = None ):
last_id_where = ""
if last_id:
last_id_where = "AND hs_scores.notif_id < {}".format(last_id)
return """
SELECT
hs_scores.notif_id as id
, 11 as type_id
, hs.created_at as created_at
, hs_scores.src as src
, ha_com.name as dst
, '' as author
, '' as permlink
, hc.name as community
, hc.title as community_title
, '' as payload
, hs_scores.score
FROM
hive_subscriptions hs
JOIN hive_communities hc ON hs.community_id = hc.id
JOIN (
SELECT
hs2.id as id
, notification_id(hs2.block_num, 11, hs2.id) as notif_id
, score_for_account( ha.id ) as score
, ha.name as src
FROM hive_subscriptions hs2
JOIN hive_accounts ha ON hs2.account_id = ha.id
) as hs_scores ON hs_scores.id = hs.id
JOIN hive_accounts ha_com ON hs.community_id = ha_com.id
WHERE {} = hs.community_id {}
""".format( account_id, last_id_where )
def _reblog_notifs_sql( min_score, last_id = None, account_id = None, post_id = None ):
conditions = ()
if ( last_id ):
conditions = conditions + ( "hr_scores.id < {}".format( last_id ), )
if ( post_id ):
conditions = conditions + ( "hr.post_id = {}".format( post_id ), )
if ( account_id ):
conditions = conditions + ( "hp.author_id = {}".format( account_id ), )
conditions = conditions + ( "hr_scores.score >= {}".format( min_score ), )
conditions = "WHERE " + ' AND '.join( conditions )
sql = """
SELECT
hr_scores.notif_id as id
, 14 as type_id
, hr.created_at as created_at
, hr.account as src
, ha.name as dst
, ha.name as author
, hpd.permlink as permlink
, '' as community
, '' as community_title
, '' as payload
, hr_scores.score as score
FROM
hive_reblogs hr
JOIN hive_posts hp ON hr.post_id = hp.id
JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
JOIN (
SELECT
hr2.id as id
, notification_id(hr2.block_num, 14, hr2.id) as notif_id
, score_for_account( has.id ) as score
FROM hive_reblogs hr2
JOIN hive_accounts has ON hr2.account = has.name
) as hr_scores ON hr_scores.id = hr.id
JOIN hive_accounts ha ON hp.author_id = ha.id
{}
"""
return sql.format( conditions )
def _follow_notifications_sql(min_score, account_id, last_id = None ):
last_id_where = ""
if last_id:
last_id_where = "AND notifs_id.notif_id < {}".format(last_id)
return """
SELECT
notifs_id.notif_id as id
, 15 as type_id
, hf.created_at as created_at
, followers_scores.follower_name as src
, ha2.name as dst
, '' as author
, '' as permlink
, '' as community
, '' as community_title
, '' as payload
, followers_scores.score as score
FROM
hive_follows hf
JOIN hive_accounts ha2 ON hf.following = ha2.id
JOIN (
SELECT
ha.id as follower_id
, ha.name as follower_name
, score_for_account( ha.id ) as score
FROM hive_accounts ha
) as followers_scores ON followers_scores.follower_id = hf.follower
JOIN (
SELECT
hf2.id as id
, notification_id(hf2.block_num, 15, hf2.id) as notif_id
FROM hive_follows hf2
) as notifs_id ON notifs_id.id = hf.id
WHERE {} = hf.following AND score >= {} {}
""".format( account_id, min_score, last_id_where )
def _replies_notifications_sql( min_score, account_id = None, post_id = None, last_id = None ):
replies_conditions = ("WHERE hpv.depth > 0".format(min_score),)
if ( post_id ):
replies_conditions = replies_conditions + ( "hpv.parent_id = {}".format( post_id ), )
if ( account_id ):
replies_conditions = replies_conditions + ( "hpv.parent_author_id = {}".format( account_id ), )
last_id_where = ""
if ( last_id ):
last_id_where = "posts_and_scores.id < {} AND ".format(last_id)
replies_conditions = ' AND '.join( replies_conditions )
return """
SELECT
posts_and_scores.id as id
, posts_and_scores.type_id as type_id
, posts_and_scores.created_at as created_at
, posts_and_scores.author as src
, posts_and_scores.parent_author as dst
, posts_and_scores.author as author
, posts_and_scores.permlink as permlink
, '' as community
, '' as community_title
, '' as payload
, posts_and_scores.score as score
FROM
(
SELECT
notification_id(
block_num
, CASE ( hpv.depth )
WHEN 1 THEN 12
ELSE 13
END
, hpv.id ) as id
, CASE ( hpv.depth )
WHEN 1 THEN 12
ELSE 13
END as type_id
, created_at
, author
, parent_author
, permlink
, depth
, parent_author_id
, author_id
, score_for_account( hpv.author_id ) as score
FROM
hive_posts_view hpv
{}
) as posts_and_scores
WHERE {} posts_and_scores.score >= {} AND NOT EXISTS(
SELECT 1
FROM
hive_follows hf
WHERE hf.follower = posts_and_scores.parent_author_id AND hf.following = posts_and_scores.author_id AND hf.state = 2
)
""".format( replies_conditions, last_id_where, min_score )
async def _dynamic_notifications( db, limit, min_score, account_id = None, post_id = None, last_id = None ):
# posts and account notifs
sub_queries = ( _replies_notifications_sql( min_score, account_id, post_id, last_id ), )
sub_queries += ( _reblog_notifs_sql( min_score, account_id, post_id, last_id ), )
sub_queries += ( _vote_notifs_sql( min_score, account_id, post_id, last_id ), )
if ( account_id ):
sub_queries += ( _follow_notifications_sql(min_score, account_id, last_id), )
sub_queries += ( _subsription_notifs_sql(min_score, account_id, last_id), )
sub_queries += ( _new_community_notifs_sql(min_score, account_id, last_id), )
sql_query = ' UNION ALL '.join( sub_queries )
sql_query += " ORDER BY id DESC, type_id LIMIT {}".format(limit)
print(sql_query)
rows = await db.query_all(sql_query)
rows = [row for row in rows if row['author'] not in Mutes.all()]
return [_render(row) for row in rows]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment