Skip to content
Snippets Groups Projects
Commit d58d9b4d authored by Dariusz Kędzierski, committed by Jason Salyers
Browse files

Removed delta calculations from follow.py. Follower/following calculation...

Removed delta calculations from follow.py. Follower/following calculation moved to dedicated SQL function.
parent 5c33c134
No related branches found
No related tags found
2 merge requests: !456 "Release candidate v1 24", !370 "Jsalyers muting at sql level"
...@@ -358,6 +358,14 @@ class DbState: ...@@ -358,6 +358,14 @@ class DbState:
time_end = perf_counter() time_end = perf_counter()
log.info("[INIT] update_notification_cache executed in %.4fs", time_end - time_start) log.info("[INIT] update_notification_cache executed in %.4fs", time_end - time_start)
time_start = perf_counter()
sql = """
SELECT update_follow_count({}, {});
""".format(last_imported_block, current_imported_block)
DbState.db().query_no_return(sql)
time_end = perf_counter()
log.info("[INIT] update_follow_count executed in %.4fs", time_end - time_start)
# Update a block num immediately # Update a block num immediately
DbState.db().query_no_return("UPDATE hive_state SET block_num = :block_num", block_num = current_imported_block) DbState.db().query_no_return("UPDATE hive_state SET block_num = :block_num", block_num = current_imported_block)
...@@ -458,8 +466,8 @@ class DbState: ...@@ -458,8 +466,8 @@ class DbState:
cls._set_ver(9) cls._set_ver(9)
if cls._ver == 9: if cls._ver == 9:
from hive.indexer.follow import Follow #from hive.indexer.follow import Follow
Follow.force_recount() #Follow.force_recount()
cls._set_ver(10) cls._set_ver(10)
if cls._ver == 10: if cls._ver == 10:
......
...@@ -606,8 +606,8 @@ def setup(db): ...@@ -606,8 +606,8 @@ def setup(db):
"bridge_get_account_posts_by_blog.sql", "bridge_get_account_posts_by_blog.sql",
"condenser_get_names_by_reblogged.sql", "condenser_get_names_by_reblogged.sql",
"condenser_get_discussions_by_comments.sql", "condenser_get_discussions_by_comments.sql",
"condenser_get_account_reputations.sql" "condenser_get_account_reputations.sql",
"update_follow_count.sql"
] ]
from os.path import dirname, realpath from os.path import dirname, realpath
dir_path = dirname(realpath(__file__)) dir_path = dirname(realpath(__file__))
......
-- update_follow_count(_first_block, _last_block)
--
-- Refreshes hive_accounts.followers / hive_accounts.following for every
-- account that appears (as follower or as following) in a hive_follows row
-- whose block_num lies in [_first_block, _last_block]. The totals are
-- recounted over all hive_follows rows with state = 1 for those accounts,
-- not only over the rows inside the block range.
DROP FUNCTION IF EXISTS update_follow_count(hive_blocks.num%TYPE, hive_blocks.num%TYPE);
CREATE OR REPLACE FUNCTION update_follow_count(
  in _first_block hive_blocks.num%TYPE,
  in _last_block hive_blocks.num%TYPE
)
RETURNS VOID
LANGUAGE plpgsql
AS
$BODY$
BEGIN
  WITH touched_accounts(user_id) AS (
    -- UNION already removes duplicates, so each account id appears once.
    SELECT hf.following
      FROM hive_follows hf
     WHERE hf.block_num BETWEEN _first_block AND _last_block
    UNION
    SELECT hf.follower
      FROM hive_follows hf
     WHERE hf.block_num BETWEEN _first_block AND _last_block
  ),
  fresh_counts(user_id, followers_count, following_count) AS (
    SELECT
      ta.user_id,
      (SELECT COUNT(*) FROM hive_follows hf1 WHERE hf1.following = ta.user_id AND hf1.state = 1),
      (SELECT COUNT(*) FROM hive_follows hf2 WHERE hf2.follower = ta.user_id AND hf2.state = 1)
    FROM touched_accounts ta
  )
  UPDATE hive_accounts ha
     SET followers = fc.followers_count,
         following = fc.following_count
    FROM fresh_counts fc
   WHERE ha.id = fc.user_id;
END
$BODY$
;
\ No newline at end of file
...@@ -441,7 +441,8 @@ class Blocks: ...@@ -441,7 +441,8 @@ class Blocks:
"SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block), "SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block),
"SELECT update_feed_cache({}, {})".format(first_block, last_block), "SELECT update_feed_cache({}, {})".format(first_block, last_block),
"SELECT update_hive_posts_mentions({}, {})".format(first_block, last_block), "SELECT update_hive_posts_mentions({}, {})".format(first_block, last_block),
"SELECT update_notification_cache({}, {}, {})".format(first_block, last_block, is_hour_action) "SELECT update_notification_cache({}, {}, {})".format(first_block, last_block, is_hour_action),
"SELECT update_follow_count({}, {})".format(first_block, last_block),
#,"SELECT update_account_reputations({}, {})".format(first_block, last_block) #,"SELECT update_account_reputations({}, {})".format(first_block, last_block)
] ]
......
...@@ -16,21 +16,9 @@ from hive.utils.normalize import escape_characters ...@@ -16,21 +16,9 @@ from hive.utils.normalize import escape_characters
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
FOLLOWERS = 'followers'
FOLLOWING = 'following'
DB = Db.instance() DB = Db.instance()
def _flip_dict(dict_to_flip):
    """Invert a mapping, grouping together the keys that share a value.

    Args:
        dict_to_flip: mapping whose values are hashable.

    Returns:
        A dict that maps each distinct value of ``dict_to_flip`` to the list
        of keys that carried it, in the input's iteration order.
    """
    flipped = {}
    for key, value in dict_to_flip.items():
        # setdefault creates the bucket on first sight of a value.
        flipped.setdefault(value, []).append(key)
    return flipped
class Follow(DbAdapterHolder): class Follow(DbAdapterHolder):
"""Handles processing of incoming follow ups and flushing to db.""" """Handles processing of incoming follow ups and flushing to db."""
...@@ -74,15 +62,6 @@ class Follow(DbAdapterHolder): ...@@ -74,15 +62,6 @@ class Follow(DbAdapterHolder):
block_num=block_num) block_num=block_num)
cls.idx += 1 cls.idx += 1
if not DbState.is_initial_sync():
for following in op['flg']:
new_state = state
old_state = cls._get_follow_db_state(op['flr'], following)
if new_state == 1:
Follow.follow(op['flr'], following)
if old_state == 1:
Follow.unfollow(op['flr'], following)
if state > 8: if state > 8:
# check if given state exists in dict # check if given state exists in dict
# if exists add follower to a list for a given state # if exists add follower to a list for a given state
...@@ -134,43 +113,7 @@ class Follow(DbAdapterHolder): ...@@ -134,43 +113,7 @@ class Follow(DbAdapterHolder):
at=date) at=date)
@classmethod @classmethod
def _get_follow_db_state(cls, follower, following): def flush(cls):
"""Retrieve current follow state of an account pair."""
sql = """
SELECT
state
FROM
hive_follows hf
INNER JOIN hive_accounts ha_flr ON ha_flr.id = hf.follower AND ha_flr.name = :follower
INNER JOIN hive_accounts ha_flg ON ha_flg.id = hf.following AND ha_flg.name = :following
"""
return cls.db.query_one(sql, follower=follower, following=following)
# -- stat tracking --
_delta = {FOLLOWERS: {}, FOLLOWING: {}}
@classmethod
def follow(cls, follower, following):
    """Queue a +1 count change for both sides of a follow.

    The follower's FOLLOWING delta is bumped first, then the followed
    account's FOLLOWERS delta; both are recorded through `_apply_delta`.
    """
    for account, role in ((follower, FOLLOWING), (following, FOLLOWERS)):
        cls._apply_delta(account, role, 1)
@classmethod
def unfollow(cls, follower, following):
    """Queue a -1 count change for both sides of an unfollow.

    The follower's FOLLOWING delta is lowered first, then the followed
    account's FOLLOWERS delta; both are recorded through `_apply_delta`.
    """
    for account, role in ((follower, FOLLOWING), (following, FOLLOWERS)):
        cls._apply_delta(account, role, -1)
@classmethod
def _apply_delta(cls, account, role, direction):
    """Accumulate a pending count change for one account.

    Args:
        account: key identifying the account inside the role's delta map.
        role: key of `cls._delta` selecting which delta map to adjust.
        direction: signed amount added to the account's running delta.
    """
    role_deltas = cls._delta[role]
    # Accounts seen for the first time start from a delta of zero.
    role_deltas[account] = role_deltas.get(account, 0) + direction
@classmethod
def _flush_follow_items(cls):
n = 0 n = 0
if cls.follow_items_to_flush: if cls.follow_items_to_flush:
sql_prefix = """ sql_prefix = """
...@@ -220,7 +163,8 @@ class Follow(DbAdapterHolder): ...@@ -220,7 +163,8 @@ class Follow(DbAdapterHolder):
WHEN 7 THEN TRUE WHEN 7 THEN TRUE
WHEN 8 THEN FALSE WHEN 8 THEN FALSE
ELSE EXCLUDED.follow_muted ELSE EXCLUDED.follow_muted
END) END),
block_num = EXCLUDED.block_num
WHERE hf.following = EXCLUDED.following AND hf.follower = EXCLUDED.follower WHERE hf.following = EXCLUDED.following AND hf.follower = EXCLUDED.follower
""" """
values = [] values = []
...@@ -473,95 +417,3 @@ class Follow(DbAdapterHolder): ...@@ -473,95 +417,3 @@ class Follow(DbAdapterHolder):
cls.follow_update_items_to_flush.clear() cls.follow_update_items_to_flush.clear()
cls.idx = 0 cls.idx = 0
return n return n
@classmethod
def flush(cls, trx=False):
"""Flush queued follow rows, then apply pending follow count deltas.

Calls `_flush_follow_items()` first and keeps its result as `n`. When no
count deltas are pending, `n` is returned on its own; otherwise the number
of account deltas applied is added to it. The timing log line is emitted
only when `trx` is truthy.
"""
n = cls._flush_follow_items()
updated = 0
sqls = []
# Each key of cls._delta is substituted below as the hive_accounts column to
# adjust; its value maps account -> pending delta.
for col, deltas in cls._delta.items():
# Group accounts by identical delta so every (column, delta) pair needs
# only a single UPDATE statement.
for delta, names in _flip_dict(deltas).items():
updated += len(names)
# NOTE(review): account values are interpolated into the SQL text verbatim;
# presumably they are already quoted/escaped upstream - confirm.
query_values = ','.join(["({})".format(account) for account in names])
sql = """
UPDATE
hive_accounts ha
SET
%s = %s + :mag
FROM
(
VALUES
%s
) AS T(name)
WHERE ha.name = T.name
"""
# The delta itself travels as the bound parameter :mag.
sqls.append((sql % (col, col, query_values), dict(mag=delta)))
# Nothing queued: report only what _flush_follow_items() flushed.
if not updated:
return n
start = perf()
cls.db.batch_queries(sqls, trx=trx)
if trx:
log.info("[SYNC] flushed %d follow deltas in %ds",
updated, perf() - start)
# Reset the accumulators once the batch has been submitted.
cls._delta = {FOLLOWERS: {}, FOLLOWING: {}}
return updated + n
@classmethod
def flush_recount(cls):
"""Recounts follows/following counts for all queued accounts.

The accounts are the union of the keys queued under FOLLOWERS and
FOLLOWING in `cls._delta`; for each, both counters are recomputed from
hive_follows rows with state = 1. `cls._delta` is not cleared here.

This is currently not used; this approach was shown to be too
expensive, but it's useful in case follow counts manage to get
out of sync.
"""
# Deduplicate accounts that have both a followers and a following delta.
names = set([*cls._delta[FOLLOWERS].keys(),
*cls._delta[FOLLOWING].keys()])
# NOTE(review): account values are interpolated into the SQL text verbatim;
# presumably they are already quoted/escaped upstream - confirm.
# NOTE(review): with no queued accounts the VALUES list would be empty,
# which looks like invalid SQL - confirm callers guard against that.
query_values = ','.join(["({})".format(account) for account in names])
sql = """
UPDATE
hive_accounts ha
SET
followers = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND following = ha.id),
following = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND follower = ha.id)
FROM
(
VALUES
{}
) AS T(name)
WHERE ha.name = T.name
""".format(query_values)
cls.db.query(sql)
@classmethod
def force_recount(cls):
"""Recounts all follows after init sync.

Runs in two steps: first builds the temporary tables `following_counts`
and `follower_counts` holding, per hive_accounts row, the number of
hive_follows rows with state = 1 in which the account is the follower /
the followed side; then copies those numbers into hive_accounts for rows
whose stored value differs. The temporary tables are not dropped here.
"""
log.info("[SYNC] query follower counts")
# LEFT JOIN + COUNT(state) gives every account a row, with 0 for accounts
# that have no matching hive_follows entry.
sql = """
CREATE TEMPORARY TABLE following_counts AS (
SELECT ha.id account_id, COUNT(state) num
FROM hive_accounts ha
LEFT JOIN hive_follows hf ON ha.id = hf.follower AND state = 1
GROUP BY ha.id);
CREATE TEMPORARY TABLE follower_counts AS (
SELECT ha.id account_id, COUNT(state) num
FROM hive_accounts ha
LEFT JOIN hive_follows hf ON ha.id = hf.following AND state = 1
GROUP BY ha.id);
"""
cls.db.query(sql)
log.info("[SYNC] update follower counts")
# The `!= num` filters restrict the writes to rows whose count changed.
sql = """
UPDATE hive_accounts SET followers = num FROM follower_counts
WHERE id = account_id AND followers != num;
UPDATE hive_accounts SET following = num FROM following_counts
WHERE id = account_id AND following != num;
"""
cls.db.query(sql)
...@@ -297,9 +297,6 @@ class Sync: ...@@ -297,9 +297,6 @@ class Sync:
if not CONTINUE_PROCESSING: if not CONTINUE_PROCESSING:
return return
log.info("[INIT] *** Initial cache build ***")
Follow.force_recount()
def from_steemd(self, is_initial_sync=False, chunk_size=1000): def from_steemd(self, is_initial_sync=False, chunk_size=1000):
"""Fast sync strategy: read/process blocks in batches.""" """Fast sync strategy: read/process blocks in batches."""
steemd = self._steem steemd = self._steem
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment