From 093388a90c2935f4d208b91807a6a3c9c742873e Mon Sep 17 00:00:00 2001 From: Howo Date: Thu, 27 Nov 2025 21:33:49 -0500 Subject: [PATCH 1/3] move sync to single transactions to increase speed --- hive/indexer/notification_cache.py | 16 ++++++++-------- hive/indexer/posts.py | 8 ++++---- hive/indexer/votes.py | 16 +++++++++++----- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/hive/indexer/notification_cache.py b/hive/indexer/notification_cache.py index 1155ca289..eec3d5825 100644 --- a/hive/indexer/notification_cache.py +++ b/hive/indexer/notification_cache.py @@ -84,14 +84,14 @@ class VoteNotificationCache(NotificationCache): ORDER BY hn.block_num, created_at, hn.src, hn.dst ON CONFLICT (src, dst, type_id, post_id, block_num) DO NOTHING """ + cls.beginTx() for chunk in chunks(cls.vote_notifications, 1000): - cls.beginTx() values_str = ",".join( f"({n['block_num']}, {escape_characters(n['voter'])}, {escape_characters(n['author'])}, {escape_characters(n['permlink'])}, {escape_characters(n['last_update'])}::timestamp, {n['rshares']}, {n['counter']})" for k, n in chunk.items() ) cls.db.query_prepared(sql.format(values_str)) - cls.commitTx() + cls.commitTx() else: n = 0 cls.vote_notifications.clear() @@ -150,14 +150,14 @@ class PostNotificationCache(NotificationCache): ORDER BY n.block_num, n.type_id, n.created_at, n.src, n.dst, n.dst_post_id, n.post_id ON CONFLICT (src, dst, type_id, post_id, block_num) DO NOTHING """ + cls.beginTx() for chunk in chunks(cls.comment_notifications, 1000): - cls.beginTx() values_str = ",".join( f"({n['block_num']}, {n['type_id']}, {escape_characters(n['created_at'])}::timestamp, {n['src']}, {n['dst']}, {n['dst_post_id']}, {n['post_id']}, {n['counter']})" for _, n in chunk.items() ) cls.db.query_prepared(sql.format(values_str)) - cls.commitTx() + cls.commitTx() else: n = 0 cls.comment_notifications.clear() @@ -225,14 +225,14 @@ class FollowNotificationCache(NotificationCache): ORDER BY nd.block_num, created_at, r.id, r.id ON CONFLICT (src, dst, type_id, post_id, block_num) DO NOTHING """ + cls.beginTx() for chunk in chunks(cls.follow_notifications_to_flush, 1000): - cls.beginTx() values_str = ",".join( f"({follower}, {following}, {block_num}, {counter})" for (follower, following, block_num, counter) in chunk ) cls.db.query_prepared(sql.format(values_str)) - cls.commitTx() + cls.commitTx() else: n = 0 cls.follow_notifications_to_flush.clear() @@ -295,14 +295,14 @@ class ReblogNotificationCache(NotificationCache): ORDER BY n.block_num, n.created_at, r.id, g.id, pp.parent_id, pp.id ON CONFLICT (src, dst, type_id, post_id, block_num) DO NOTHING """ + cls.beginTx() for chunk in chunks(cls.reblog_notifications_to_flush, 1000): - cls.beginTx() values_str = ",".join( f"({n['block_num']}, {escape_characters(n['created_at'])}::timestamp, {escape_characters(n['src'])}, {escape_characters(n['dst'])}, {escape_characters(n['permlink'])}, {n['counter']})" for _, n in chunk.items() ) cls.db.query_prepared(sql.format(values_str)) - cls.commitTx() + cls.commitTx() else: n = 0 cls.reblog_notifications_to_flush.clear() diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py index 10a2fd2f4..3a8792d11 100644 --- a/hive/indexer/posts.py +++ b/hive/indexer/posts.py @@ -217,15 +217,15 @@ class Posts(DbAdapterHolder): WHERE ihp.permlink_id = data_source.permlink_id and ihp.author_id = data_source.author_id """ - for chunk in chunks(cls._comment_payout_ops, 1000): - cls.beginTx() + cls.beginTx() + cls.db.query_no_return('SELECT pg_advisory_xact_lock(777)') - cls.db.query_no_return('SELECT pg_advisory_xact_lock(777)') # synchronise with update_posts_rshares in votes + for chunk in chunks(cls._comment_payout_ops, 1000): values_str = ','.join(chunk) actual_query = sql.format(values_str) cls.db.query_prepared(actual_query) - cls.commitTx() + cls.commitTx() n = len(cls._comment_payout_ops) cls._comment_payout_ops.clear() diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py index 4044b52ff..484068491 100644 --- a/hive/indexer/votes.py +++ b/hive/indexer/votes.py @@ -174,11 +174,14 @@ class Votes(DbAdapterHolder): # WHERE clause above seems superfluous (and works all the same without it, at least up to 5mln) cnt = count() + cls.beginTx() + cls.db.query_no_return('SELECT pg_advisory_xact_lock(777)') + + all_post_ids = [] for chunk in chunks(cls._votes_data, 1000): - cls.beginTx() values_str = ','.join( "({}, '{}', '{}', {}, {}, {}, {}, '{}'::timestamp, {}, {}, {})".format( - next(cnt), # for ordering + next(cnt), vd['voter'], vd['author'], vd['permlink'], @@ -193,9 +196,12 @@ class Votes(DbAdapterHolder): ) actual_query = sql.format(values_str) post_ids = cls.db.query_prepared_all(actual_query) - cls.db.query_no_return('SELECT pg_advisory_xact_lock(777)') # synchronise with update hive_posts in posts - cls.db.query_no_return("SELECT * FROM hivemind_app.update_posts_rshares(:post_ids)", post_ids=[id[0] for id in post_ids]) - cls.commitTx() + all_post_ids.extend([id[0] for id in post_ids]) + + if all_post_ids: + cls.db.query_no_return("SELECT * FROM hivemind_app.update_posts_rshares(:post_ids)", post_ids=all_post_ids) + + cls.commitTx() n = len(cls._votes_data) cls._votes_data.clear() -- GitLab From 398a3fd54ccd4d51c4ab432e759ce4587d227bb5 Mon Sep 17 00:00:00 2001 From: Howo Date: Thu, 27 Nov 2025 23:17:32 -0500 Subject: [PATCH 2/3] try with vote only --- hive/indexer/notification_cache.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hive/indexer/notification_cache.py b/hive/indexer/notification_cache.py index eec3d5825..c8592cbd6 100644 --- a/hive/indexer/notification_cache.py +++ b/hive/indexer/notification_cache.py @@ -150,14 +150,14 @@ class PostNotificationCache(NotificationCache): ORDER BY n.block_num, n.type_id, n.created_at, n.src, n.dst, n.dst_post_id, n.post_id ON CONFLICT (src, dst, type_id, post_id, block_num) DO NOTHING """ - cls.beginTx() for chunk in chunks(cls.comment_notifications, 1000): + cls.beginTx() values_str = ",".join( f"({n['block_num']}, {n['type_id']}, {escape_characters(n['created_at'])}::timestamp, {n['src']}, {n['dst']}, {n['dst_post_id']}, {n['post_id']}, {n['counter']})" for _, n in chunk.items() ) cls.db.query_prepared(sql.format(values_str)) - cls.commitTx() + cls.commitTx() else: n = 0 cls.comment_notifications.clear() @@ -225,14 +225,14 @@ class FollowNotificationCache(NotificationCache): ORDER BY nd.block_num, created_at, r.id, r.id ON CONFLICT (src, dst, type_id, post_id, block_num) DO NOTHING """ - cls.beginTx() for chunk in chunks(cls.follow_notifications_to_flush, 1000): + cls.beginTx() values_str = ",".join( f"({follower}, {following}, {block_num}, {counter})" for (follower, following, block_num, counter) in chunk ) cls.db.query_prepared(sql.format(values_str)) - cls.commitTx() + cls.commitTx() else: n = 0 cls.follow_notifications_to_flush.clear() @@ -295,14 +295,14 @@ class ReblogNotificationCache(NotificationCache): ORDER BY n.block_num, n.created_at, r.id, g.id, pp.parent_id, pp.id ON CONFLICT (src, dst, type_id, post_id, block_num) DO NOTHING """ - cls.beginTx() for chunk in chunks(cls.reblog_notifications_to_flush, 1000): + cls.beginTx() values_str = ",".join( f"({n['block_num']}, {escape_characters(n['created_at'])}::timestamp, {escape_characters(n['src'])}, {escape_characters(n['dst'])}, {escape_characters(n['permlink'])}, {n['counter']})" for _, n in chunk.items() ) cls.db.query_prepared(sql.format(values_str)) - cls.commitTx() + cls.commitTx() else: n = 0 cls.reblog_notifications_to_flush.clear() -- GitLab From a2c8ff525b54576814c9c422fbcf7a4d52ea4a73 Mon Sep 17 00:00:00 2001 From: Howo Date: Fri, 28 Nov 2025 11:44:05 -0500 Subject: [PATCH 3/3] revert modifications but keep voteNotification changes --- hive/indexer/posts.py | 8 ++++---- hive/indexer/votes.py | 16 +++++----------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py index 3a8792d11..10a2fd2f4 100644 --- a/hive/indexer/posts.py +++ b/hive/indexer/posts.py @@ -217,15 +217,15 @@ class Posts(DbAdapterHolder): WHERE ihp.permlink_id = data_source.permlink_id and ihp.author_id = data_source.author_id """ - cls.beginTx() - cls.db.query_no_return('SELECT pg_advisory_xact_lock(777)') - for chunk in chunks(cls._comment_payout_ops, 1000): + cls.beginTx() + + cls.db.query_no_return('SELECT pg_advisory_xact_lock(777)') # synchronise with update_posts_rshares in votes values_str = ','.join(chunk) actual_query = sql.format(values_str) cls.db.query_prepared(actual_query) - cls.commitTx() + cls.commitTx() n = len(cls._comment_payout_ops) cls._comment_payout_ops.clear() diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py index 484068491..4044b52ff 100644 --- a/hive/indexer/votes.py +++ b/hive/indexer/votes.py @@ -174,14 +174,11 @@ class Votes(DbAdapterHolder): # WHERE clause above seems superfluous (and works all the same without it, at least up to 5mln) cnt = count() - cls.beginTx() - cls.db.query_no_return('SELECT pg_advisory_xact_lock(777)') - - all_post_ids = [] for chunk in chunks(cls._votes_data, 1000): + cls.beginTx() values_str = ','.join( "({}, '{}', '{}', {}, {}, {}, {}, '{}'::timestamp, {}, {}, {})".format( - next(cnt), + next(cnt), # for ordering vd['voter'], vd['author'], vd['permlink'], @@ -196,12 +193,9 @@ class Votes(DbAdapterHolder): ) actual_query = sql.format(values_str) post_ids = cls.db.query_prepared_all(actual_query) - all_post_ids.extend([id[0] for id in post_ids]) - - if all_post_ids: - cls.db.query_no_return("SELECT * FROM hivemind_app.update_posts_rshares(:post_ids)", post_ids=all_post_ids) - - cls.commitTx() + cls.db.query_no_return('SELECT pg_advisory_xact_lock(777)') # synchronise with update hive_posts in posts + cls.db.query_no_return("SELECT * FROM hivemind_app.update_posts_rshares(:post_ids)", post_ids=[id[0] for id in post_ids]) + cls.commitTx() n = len(cls._votes_data) cls._votes_data.clear() -- GitLab