Something went wrong on our end
-
Cast all follow name data to string. Should protect against setting non string data types as names in follower and following
Cast all follow name data to string. Should protect against setting non string data types as names in follower and following
follow.py 24.27 KiB
"""Handles follow operations."""
import logging
from time import perf_counter as perf
from json import dumps
from funcy.seqs import first
from hive.db.adapter import Db
from hive.db.db_state import DbState
from hive.utils.misc import chunks
from hive.indexer.accounts import Accounts
from hive.indexer.db_adapter_holder import DbAdapterHolder
from hive.utils.normalize import escape_characters
log = logging.getLogger(__name__)
FOLLOWERS = 'followers'
FOLLOWING = 'following'
DB = Db.instance()
def _flip_dict(dict_to_flip):
"""Swap keys/values. Returned dict values are array of keys."""
flipped = {}
for key, value in dict_to_flip.items():
if value in flipped:
flipped[value].append(key)
else:
flipped[value] = [key]
return flipped
class Follow(DbAdapterHolder):
"""Handles processing of incoming follow ups and flushing to db."""
follow_items_to_flush = dict()
# [DK] this dictionary will hold data for table update operations
# since for each status update query is different we will group
# follower id per status:
# {
# state_number_1 : [follower_id_1, follower_id_2, ...]
# state_number_2 : [follower_id_3, follower_id_4, ...]
# }
# we will use this dict later to perform batch updates
follow_update_items_to_flush = dict()
idx = 0
@classmethod
def follow_op(cls, account, op_json, date, block_num):
"""Process an incoming follow op."""
op = cls._validated_op(account, op_json, date)
if not op:
return
op['block_num'] = block_num
state = op['state']
for following in op['flg']:
k = '{}/{}'.format(op['flr'], following)
if k in cls.follow_items_to_flush:
cls.follow_items_to_flush[k]['state'] = state
cls.follow_items_to_flush[k]['idx'] = cls.idx
cls.follow_items_to_flush[k]['block_num'] = block_num
else:
cls.follow_items_to_flush[k] = dict(
idx=cls.idx,
flr=op['flr'],
flg=following,
state=state,
at=op['at'],
block_num=block_num)
cls.idx += 1
if not DbState.is_initial_sync():
for following in op['flg']:
new_state = state
old_state = cls._get_follow_db_state(op['flr'], following)
if new_state == 1:
Follow.follow(op['flr'], following)
if old_state == 1:
Follow.unfollow(op['flr'], following)
if state > 8:
# check if given state exists in dict
# if exists add follower to a list for a given state
# if not exists create list and set that list for given state
if state in cls.follow_update_items_to_flush:
cls.follow_update_items_to_flush[state].append(op['flr'])
else:
cls.follow_update_items_to_flush[state] = [op['flr']]
@classmethod
def _validated_op(cls, account, op, date):
"""Validate and normalize the operation."""
if ( not 'what' in op
or not isinstance(op['what'], list)
or not 'follower' in op
or not 'following' in op):
return None
op['following'] = op['following'] if isinstance(op['following'], list) else [op['following']]
# additional layer of protection against putting complex data types as user names
as_str = []
for following in op['following']:
if isinstance(following, list) or isinstance(following, dict):
as_str.append(dumps(following))
else:
as_str.append(str(following))
op['following'] = as_str
if isinstance(op['follower'], list) or isinstance(op['follower'], dict):
op['follower'] = dumps(op['follower'])
else:
op['follower'] = str(op['follower'])
# follower/following is empty
if not op['follower'] or not op['following']:
return None
what = first(op['what']) or ''
if not isinstance(what, str):
return None
defs = {'': 0, 'blog': 1, 'ignore': 2, 'blacklist': 3, 'follow_blacklist': 4, 'unblacklist': 5, 'unfollow_blacklist': 6,
'follow_muted': 7, 'unfollow_muted': 8, 'reset_blacklist' : 9, 'reset_following_list': 10, 'reset_muted_list': 11,
'reset_follow_blacklist': 12, 'reset_follow_muted_list': 13, 'reset_all_lists': 14}
if what not in defs:
return None
all_accounts = list(op['following'])
all_accounts.append(op['follower'])
if (op['follower'] in op['following']
or op['follower'] != account):
return None
non_existent_names = Accounts.check_names(all_accounts)
if non_existent_names:
log.warning("Follow op validation, following names does not exists in database: {}".format(non_existent_names))
return dict(flr=escape_characters(op['follower']),
flg=[escape_characters(following) for following in op['following']],
state=defs[what],
at=date)
@classmethod
def _get_follow_db_state(cls, follower, following):
"""Retrieve current follow state of an account pair."""
sql = """
SELECT
state
FROM
hive_follows hf
INNER JOIN hive_accounts ha_flr ON ha_flr.id = hf.follower AND ha_flr.name = :follower
INNER JOIN hive_accounts ha_flg ON ha_flg.id = hf.following AND ha_flg.name = :following
"""
return cls.db.query_one(sql, follower=follower, following=following)
# -- stat tracking --
_delta = {FOLLOWERS: {}, FOLLOWING: {}}
@classmethod
def follow(cls, follower, following):
"""Applies follow count change the next flush."""
cls._apply_delta(follower, FOLLOWING, 1)
cls._apply_delta(following, FOLLOWERS, 1)
@classmethod
def unfollow(cls, follower, following):
"""Applies follow count change the next flush."""
cls._apply_delta(follower, FOLLOWING, -1)
cls._apply_delta(following, FOLLOWERS, -1)
@classmethod
def _apply_delta(cls, account, role, direction):
"""Modify an account's follow delta in specified direction."""
if not account in cls._delta[role]:
cls._delta[role][account] = 0
cls._delta[role][account] += direction
@classmethod
def _flush_follow_items(cls):
n = 0
if cls.follow_items_to_flush:
sql_prefix = """
INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists, follow_muted, block_num)
SELECT ds.follower_id, ds.following_id, ds.created_at, ds.state, ds.blacklisted, ds.follow_blacklists, ds.follow_muted, ds.block_num
FROM
(
SELECT
t.id,
ha_flr.id as follower_id,
ha_flg.id as following_id,
t.created_at,
t.state,
t.blacklisted,
t.follow_blacklists,
t.follow_muted,
t.block_num
FROM
(
VALUES
{}
) as T (id, follower, following, created_at, state, blacklisted, follow_blacklists, follow_muted, block_num)
INNER JOIN hive_accounts ha_flr ON ha_flr.name = T.follower
INNER JOIN hive_accounts ha_flg ON ha_flg.name = T.following
ORDER BY T.block_num ASC, T.id ASC
) AS ds(id, follower_id, following_id, created_at, state, blacklisted, follow_blacklists, follow_muted, block_num)
ORDER BY ds.block_num ASC, ds.id ASC
"""
sql_postfix = """
ON CONFLICT ON CONSTRAINT hive_follows_ux1 DO UPDATE
SET
state = (CASE EXCLUDED.state
WHEN 0 THEN 0 -- 0 blocks possibility to update state
ELSE EXCLUDED.state
END),
blacklisted = (CASE EXCLUDED.state
WHEN 3 THEN TRUE
WHEN 5 THEN FALSE
ELSE EXCLUDED.blacklisted
END),
follow_blacklists = (CASE EXCLUDED.state
WHEN 4 THEN TRUE
WHEN 6 THEN FALSE
ELSE EXCLUDED.follow_blacklists
END),
follow_muted = (CASE EXCLUDED.state
WHEN 7 THEN TRUE
WHEN 8 THEN FALSE
ELSE EXCLUDED.follow_muted
END)
WHERE hf.following = EXCLUDED.following AND hf.follower = EXCLUDED.follower
"""
values = []
limit = 1000
count = 0
cls.beginTx()
for _, follow_item in cls.follow_items_to_flush.items():
if count < limit:
values.append("({}, {}, {}, '{}'::timestamp, {}, {}, {}, {}, {})".format(follow_item['idx'],
follow_item['flr'],
follow_item['flg'],
follow_item['at'],
follow_item['state'],
follow_item['state'] == 3,
follow_item['state'] == 4,
follow_item['state'] == 7,
follow_item['block_num']))
count = count + 1
else:
query = str(sql_prefix).format(",".join(values))
query += sql_postfix
cls.db.query(query)
values.clear()
values.append("({}, {}, {}, '{}'::timestamp, {}, {}, {}, {}, {})".format(follow_item['idx'],
follow_item['flr'],
follow_item['flg'],
follow_item['at'],
follow_item['state'],
follow_item['state'] == 3,
follow_item['state'] == 4,
follow_item['state'] == 7,
follow_item['block_num']))
count = 1
n += 1
if len(values) > 0:
query = str(sql_prefix).format(",".join(values))
query += sql_postfix
cls.db.query(query)
cls.commitTx()
cls.follow_items_to_flush.clear()
# process follow_update_items_to_flush dictionary
# .items() will return list of tuples: [(state_number, [list of follower ids]), ...]
# for each state get list of follower_id and make update query
# for that list, if list size is greater than 1000 it will be divided
# to chunks of 1000
#
for state, update_flush_items in cls.follow_update_items_to_flush.items():
for chunk in chunks(update_flush_items, 1000):
sql = None
query_values = ','.join(["({})".format(account) for account in chunk])
# [DK] probaly not a bad idea to move that logic to SQL function
if state == 9:
#reset blacklists for follower
sql = """
UPDATE
hive_follows hf
SET
blacklisted = false
FROM
(
SELECT
ha.id as follower_id
FROM
(
VALUES
{}
) AS T(name)
INNER JOIN hive_accounts ha ON ha.name = T.name
) AS ds (follower_id)
WHERE
hf.follower = ds.follower_id
""".format(query_values)
elif state == 10:
#reset following list for follower
sql = """
UPDATE
hive_follows hf
SET
state = 0
FROM
(
SELECT
ha.id as follower_id
FROM
(
VALUES
{}
) AS T(name)
INNER JOIN hive_accounts ha ON ha.name = T.name
) AS ds (follower_id)
WHERE
hf.follower = ds.follower_id
AND hf.state = 1
""".format(query_values)
elif state == 11:
#reset all muted list for follower
sql = """
UPDATE
hive_follows hf
SET
state = 0
FROM
(
SELECT
ha.id as follower_id
FROM
(
VALUES
{}
) AS T(name)
INNER JOIN hive_accounts ha ON ha.name = T.name
) AS ds (follower_id)
WHERE
hf.follower = ds.follower_id
AND hf.state = 2
""".format(query_values)
elif state == 12:
#reset followed blacklists
sql = """
UPDATE
hive_follows hf
SET
follow_blacklists = false
FROM
(
SELECT
ha.id as follower_id
FROM
(
VALUES
{0}
) AS T(name)
INNER JOIN hive_accounts ha ON ha.name = T.name
) AS ds (follower_id)
WHERE
hf.follower = ds.follower_id;
UPDATE
hive_follows hf
SET
follow_blacklists = true
FROM
(
SELECT
ha.id as follower_id
FROM
(
VALUES
{0}
) AS T(name)
INNER JOIN hive_accounts ha ON ha.name = T.name
) AS ds (follower_id)
WHERE
hf.follower = ds.follower_id
AND following = (SELECT id FROM hive_accounts WHERE name = 'null')
""".format(query_values)
elif state == 13:
#reset followed mute lists
sql = """
UPDATE
hive_follows hf
SET
follow_muted = false
FROM
(
SELECT
ha.id as follower_id
FROM
(
VALUES
{0}
) AS T(name)
INNER JOIN hive_accounts ha ON ha.name = T.name
) AS ds (follower_id)
WHERE
hf.follower = ds.follower_id;
UPDATE
hive_follows hf
SET
follow_muted = true
FROM
(
SELECT
ha.id as follower_id
FROM
(
VALUES
{0}
) AS T(name)
INNER JOIN hive_accounts ha ON ha.name = T.name
) AS ds (follower_id)
WHERE
hf.follower = ds.follower_id
AND following = (SELECT id FROM hive_accounts WHERE name = 'null')
""".format(query_values)
elif state == 14:
#reset all lists
sql = """
UPDATE
hive_follows hf
SET
blacklisted = false,
follow_blacklists = false,
follow_muted = false,
state = 0
FROM
(
SELECT
ha.id as follower_id
FROM
(
VALUES
{0}
) AS T(name)
INNER JOIN hive_accounts ha ON ha.name = T.name
) AS ds (follower_id)
WHERE
hf.follower = ds.follower_id;
UPDATE
hive_follows hf
SET
follow_blacklists = true,
follow_muted = true
FROM
(
SELECT
ha.id as follower_id
FROM
(
VALUES
{0}
) AS T(name)
INNER JOIN hive_accounts ha ON ha.name = T.name
) AS ds (follower_id)
WHERE
hf.follower = ds.follower_id
AND following = (SELECT id FROM hive_accounts WHERE name = 'null')
""".format(query_values)
if sql is not None:
cls.beginTx()
DB.query(sql)
cls.commitTx()
n += len(chunk)
cls.follow_update_items_to_flush.clear()
cls.idx = 0
return n
@classmethod
def flush(cls, trx=False):
"""Flushes pending follow count deltas."""
n = cls._flush_follow_items()
updated = 0
sqls = []
for col, deltas in cls._delta.items():
for delta, names in _flip_dict(deltas).items():
updated += len(names)
query_values = ','.join(["({})".format(account) for account in names])
sql = """
UPDATE
hive_accounts ha
SET
%s = %s + :mag
FROM
(
VALUES
%s
) AS T(name)
WHERE ha.name = T.name
"""
sqls.append((sql % (col, col, query_values), dict(mag=delta)))
if not updated:
return n
start = perf()
cls.db.batch_queries(sqls, trx=trx)
if trx:
log.info("[SYNC] flushed %d follow deltas in %ds",
updated, perf() - start)
cls._delta = {FOLLOWERS: {}, FOLLOWING: {}}
return updated + n
@classmethod
def flush_recount(cls):
"""Recounts follows/following counts for all queued accounts.
This is currently not used; this approach was shown to be too
expensive, but it's useful in case follow counts manage to get
out of sync.
"""
names = set([*cls._delta[FOLLOWERS].keys(),
*cls._delta[FOLLOWING].keys()])
query_values = ','.join(["({})".format(account) for account in names])
sql = """
UPDATE
hive_accounts ha
SET
followers = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND following = ha.id),
following = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND follower = ha.id)
FROM
(
VALUES
{}
) AS T(name)
WHERE ha.name = T.name
""".format(query_values)
cls.db.query(sql)
@classmethod
def force_recount(cls):
"""Recounts all follows after init sync."""
log.info("[SYNC] query follower counts")
sql = """
CREATE TEMPORARY TABLE following_counts AS (
SELECT ha.id account_id, COUNT(state) num
FROM hive_accounts ha
LEFT JOIN hive_follows hf ON ha.id = hf.follower AND state = 1
GROUP BY ha.id);
CREATE TEMPORARY TABLE follower_counts AS (
SELECT ha.id account_id, COUNT(state) num
FROM hive_accounts ha
LEFT JOIN hive_follows hf ON ha.id = hf.following AND state = 1
GROUP BY ha.id);
"""
cls.db.query(sql)
log.info("[SYNC] update follower counts")
sql = """
UPDATE hive_accounts SET followers = num FROM follower_counts
WHERE id = account_id AND followers != num;
UPDATE hive_accounts SET following = num FROM following_counts
WHERE id = account_id AND following != num;
"""
cls.db.query(sql)