From 31e5c984f1a57a79055f473f20e6ca52867ac214 Mon Sep 17 00:00:00 2001 From: Dariusz Kedzierski <dkedzierski@syncad.com> Date: Tue, 10 Nov 2020 00:26:06 +0100 Subject: [PATCH] Removed delta calculations from follow.py. Follower/following calculation moved to dedicated SQL function. --- hive/db/db_state.py | 12 +- hive/db/schema.py | 4 +- hive/db/sql_scripts/update_follow_count.sql | 33 +++++ hive/indexer/blocks.py | 3 +- hive/indexer/follow.py | 154 +------------------- hive/indexer/sync.py | 3 - 6 files changed, 50 insertions(+), 159 deletions(-) create mode 100644 hive/db/sql_scripts/update_follow_count.sql diff --git a/hive/db/db_state.py b/hive/db/db_state.py index b6af6fa05..3772593ec 100644 --- a/hive/db/db_state.py +++ b/hive/db/db_state.py @@ -358,6 +358,14 @@ class DbState: time_end = perf_counter() log.info("[INIT] update_notification_cache executed in %.4fs", time_end - time_start) + time_start = perf_counter() + sql = """ + SELECT update_follow_count({}, {}); + """.format(last_imported_block, current_imported_block) + DbState.db().query_no_return(sql) + time_end = perf_counter() + log.info("[INIT] update_follow_count executed in %.4fs", time_end - time_start) + # Update a block num immediately DbState.db().query_no_return("UPDATE hive_state SET block_num = :block_num", block_num = current_imported_block) @@ -458,8 +466,8 @@ class DbState: cls._set_ver(9) if cls._ver == 9: - from hive.indexer.follow import Follow - Follow.force_recount() + #from hive.indexer.follow import Follow + #Follow.force_recount() cls._set_ver(10) if cls._ver == 10: diff --git a/hive/db/schema.py b/hive/db/schema.py index a1fc3ec4c..56501b157 100644 --- a/hive/db/schema.py +++ b/hive/db/schema.py @@ -606,8 +606,8 @@ def setup(db): "bridge_get_account_posts_by_blog.sql", "condenser_get_names_by_reblogged.sql", "condenser_get_discussions_by_comments.sql", - "condenser_get_account_reputations.sql" - + "condenser_get_account_reputations.sql", + "update_follow_count.sql" ] from os.path import dirname, realpath dir_path = dirname(realpath(__file__)) diff --git a/hive/db/sql_scripts/update_follow_count.sql b/hive/db/sql_scripts/update_follow_count.sql new file mode 100644 index 000000000..90376dda9 --- /dev/null +++ b/hive/db/sql_scripts/update_follow_count.sql @@ -0,0 +1,33 @@ +DROP FUNCTION IF EXISTS update_follow_count(hive_blocks.num%TYPE, hive_blocks.num%TYPE); +CREATE OR REPLACE FUNCTION update_follow_count( + in _first_block hive_blocks.num%TYPE, + in _last_block hive_blocks.num%TYPE +) +RETURNS VOID +LANGUAGE 'plpgsql' +AS +$BODY$ +BEGIN +UPDATE hive_accounts ha +SET + followers = data_set.followers_count, + following = data_set.following_count +FROM + ( + WITH data_cfe(user_id) AS ( + SELECT DISTINCT following FROM hive_follows WHERE block_num BETWEEN _first_block AND _last_block + UNION + SELECT DISTINCT follower FROM hive_follows WHERE block_num BETWEEN _first_block AND _last_block + ) + SELECT + data_cfe.user_id AS user_id, + (SELECT COUNT(1) FROM hive_follows hf1 WHERE hf1.following = data_cfe.user_id AND hf1.state = 1) AS followers_count, + (SELECT COUNT(1) FROM hive_follows hf2 WHERE hf2.follower = data_cfe.user_id AND hf2.state = 1) AS following_count + FROM + data_cfe + ) AS data_set(user_id, followers_count, following_count) +WHERE + ha.id = data_set.user_id; +END +$BODY$ +; \ No newline at end of file diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index 7a94abcac..f814e8554 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -441,7 +441,8 @@ class Blocks: "SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block), "SELECT update_feed_cache({}, {})".format(first_block, last_block), "SELECT update_hive_posts_mentions({}, {})".format(first_block, last_block), - "SELECT update_notification_cache({}, {}, {})".format(first_block, last_block, is_hour_action) + "SELECT update_notification_cache({}, {}, {})".format(first_block, last_block, is_hour_action), + "SELECT update_follow_count({}, {})".format(first_block, last_block), #,"SELECT update_account_reputations({}, {})".format(first_block, last_block) ] diff --git a/hive/indexer/follow.py b/hive/indexer/follow.py index f2c727a5b..7d5a0d55e 100644 --- a/hive/indexer/follow.py +++ b/hive/indexer/follow.py @@ -16,21 +16,9 @@ from hive.utils.normalize import escape_characters log = logging.getLogger(__name__) -FOLLOWERS = 'followers' -FOLLOWING = 'following' DB = Db.instance() -def _flip_dict(dict_to_flip): - """Swap keys/values. Returned dict values are array of keys.""" - flipped = {} - for key, value in dict_to_flip.items(): - if value in flipped: - flipped[value].append(key) - else: - flipped[value] = [key] - return flipped - class Follow(DbAdapterHolder): """Handles processing of incoming follow ups and flushing to db.""" @@ -74,15 +62,6 @@ class Follow(DbAdapterHolder): block_num=block_num) cls.idx += 1 - if not DbState.is_initial_sync(): - for following in op['flg']: - new_state = state - old_state = cls._get_follow_db_state(op['flr'], following) - if new_state == 1: - Follow.follow(op['flr'], following) - if old_state == 1: - Follow.unfollow(op['flr'], following) - if state > 8: # check if given state exists in dict # if exists add follower to a list for a given state @@ -134,43 +113,7 @@ class Follow(DbAdapterHolder): at=date) @classmethod - def _get_follow_db_state(cls, follower, following): - """Retrieve current follow state of an account pair.""" - sql = """ - SELECT - state - FROM - hive_follows hf - INNER JOIN hive_accounts ha_flr ON ha_flr.id = hf.follower AND ha_flr.name = :follower - INNER JOIN hive_accounts ha_flg ON ha_flg.id = hf.following AND ha_flg.name = :following - """ - return cls.db.query_one(sql, follower=follower, following=following) - - # -- stat tracking -- - - _delta = {FOLLOWERS: {}, FOLLOWING: {}} - - @classmethod - def follow(cls, follower, following): - """Applies follow count change the next flush.""" - cls._apply_delta(follower, FOLLOWING, 1) - cls._apply_delta(following, FOLLOWERS, 1) - - @classmethod - def unfollow(cls, follower, following): - """Applies follow count change the next flush.""" - cls._apply_delta(follower, FOLLOWING, -1) - cls._apply_delta(following, FOLLOWERS, -1) - - @classmethod - def _apply_delta(cls, account, role, direction): - """Modify an account's follow delta in specified direction.""" - if not account in cls._delta[role]: - cls._delta[role][account] = 0 - cls._delta[role][account] += direction - - @classmethod - def _flush_follow_items(cls): + def flush(cls): n = 0 if cls.follow_items_to_flush: sql_prefix = """ @@ -220,7 +163,8 @@ class Follow(DbAdapterHolder): WHEN 7 THEN TRUE WHEN 8 THEN FALSE ELSE EXCLUDED.follow_muted - END) + END), + block_num = EXCLUDED.block_num WHERE hf.following = EXCLUDED.following AND hf.follower = EXCLUDED.follower """ values = [] @@ -473,95 +417,3 @@ class Follow(DbAdapterHolder): cls.follow_update_items_to_flush.clear() cls.idx = 0 return n - - @classmethod - def flush(cls, trx=False): - """Flushes pending follow count deltas.""" - - n = cls._flush_follow_items() - - updated = 0 - sqls = [] - for col, deltas in cls._delta.items(): - for delta, names in _flip_dict(deltas).items(): - updated += len(names) - query_values = ','.join(["({})".format(account) for account in names]) - sql = """ - UPDATE - hive_accounts ha - SET - %s = %s + :mag - FROM - ( - VALUES - %s - ) AS T(name) - WHERE ha.name = T.name - """ - sqls.append((sql % (col, col, query_values), dict(mag=delta))) - - if not updated: - return n - - start = perf() - cls.db.batch_queries(sqls, trx=trx) - if trx: - log.info("[SYNC] flushed %d follow deltas in %ds", - updated, perf() - start) - - cls._delta = {FOLLOWERS: {}, FOLLOWING: {}} - return updated + n - - @classmethod - def flush_recount(cls): - """Recounts follows/following counts for all queued accounts. - - This is currently not used; this approach was shown to be too - expensive, but it's useful in case follow counts manage to get - out of sync. - """ - names = set([*cls._delta[FOLLOWERS].keys(), - *cls._delta[FOLLOWING].keys()]) - query_values = ','.join(["({})".format(account) for account in names]) - sql = """ - UPDATE - hive_accounts ha - SET - followers = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND following = ha.id), - following = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND follower = ha.id) - FROM - ( - VALUES - {} - ) AS T(name) - WHERE ha.name = T.name - """.format(query_values) - cls.db.query(sql) - - @classmethod - def force_recount(cls): - """Recounts all follows after init sync.""" - log.info("[SYNC] query follower counts") - sql = """ - CREATE TEMPORARY TABLE following_counts AS ( - SELECT ha.id account_id, COUNT(state) num - FROM hive_accounts ha - LEFT JOIN hive_follows hf ON ha.id = hf.follower AND state = 1 - GROUP BY ha.id); - CREATE TEMPORARY TABLE follower_counts AS ( - SELECT ha.id account_id, COUNT(state) num - FROM hive_accounts ha - LEFT JOIN hive_follows hf ON ha.id = hf.following AND state = 1 - GROUP BY ha.id); - """ - cls.db.query(sql) - - log.info("[SYNC] update follower counts") - sql = """ - UPDATE hive_accounts SET followers = num FROM follower_counts - WHERE id = account_id AND followers != num; - - UPDATE hive_accounts SET following = num FROM following_counts - WHERE id = account_id AND following != num; - """ - cls.db.query(sql) diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index 2f6813c23..05c63d4d3 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -297,9 +297,6 @@ class Sync: if not CONTINUE_PROCESSING: return - log.info("[INIT] *** Initial cache build ***") - Follow.force_recount() - def from_steemd(self, is_initial_sync=False, chunk_size=1000): """Fast sync strategy: read/process blocks in batches.""" steemd = self._steem -- GitLab