From 367fefce7afed50f58b7cc505570ee120136962f Mon Sep 17 00:00:00 2001
From: Dariusz Kedzierski <dkedzierski@syncad.com>
Date: Mon, 22 Jun 2020 11:31:02 +0200
Subject: [PATCH] Cache for post data

---
 hive/db/schema.py               | 14 ++++---
 hive/indexer/blocks.py          |  2 ++
 hive/indexer/post_data_cache.py | 59 +++++++++++++++++++++++++++++
 hive/indexer/posts.py           | 71 ++++++++++++++++++++-------------
 hive/indexer/sync.py            |  3 ---
 5 files changed, 113 insertions(+), 36 deletions(-)
 create mode 100644 hive/indexer/post_data_cache.py

diff --git a/hive/db/schema.py b/hive/db/schema.py
index 55254cb82..7b927971b 100644
--- a/hive/db/schema.py
+++ b/hive/db/schema.py
@@ -166,11 +166,11 @@ def build_metadata():
     sa.Table(
         'hive_post_data', metadata,
         sa.Column('id', sa.Integer, primary_key=True, autoincrement=False),
-        sa.Column('title', VARCHAR(255), nullable=False),
-        sa.Column('preview', VARCHAR(1024), nullable=False),
-        sa.Column('img_url', VARCHAR(1024), nullable=False),
-        sa.Column('body', TEXT),
-        sa.Column('json', TEXT)
+        sa.Column('title', VARCHAR(255), server_default=''),
+        sa.Column('preview', VARCHAR(1024), server_default=''),
+        sa.Column('img_url', VARCHAR(1024), server_default=''),
+        sa.Column('body', TEXT, server_default=''),
+        sa.Column('json', TEXT, server_default='')
     )
 
     sa.Table(
@@ -198,7 +198,9 @@ def build_metadata():
         sa.Column('rshares', sa.BigInteger, nullable=False, server_default='0'),
         sa.Column('vote_percent', sa.Integer, server_default='0'),
         sa.Column('last_update', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'),
-        sa.Column('num_changes', sa.Integer, server_default='0'),
+        sa.Column('num_changes', sa.Integer, server_default='0'),
+
+        sa.UniqueConstraint('voter_id', 'author_id', 'permlink_id', name='hive_votes_ux1'),
 
         sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
         sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 0841c1451..69414049a 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -10,6 +10,7 @@ from hive.indexer.custom_op import CustomOp
 from hive.indexer.payments import Payments
 from hive.indexer.follow import Follow
 from hive.indexer.votes import Votes
+from hive.indexer.post_data_cache import PostDataCache
 
 log = logging.getLogger(__name__)
 
@@ -53,6 +54,7 @@ class Blocks:
         # Follows flushing needs to be atomic because recounts are
         # expensive. So is tracking follows at all; hence we track
         # deltas in memory and update follow/er counts in bulk.
+        PostDataCache.flush()
         cls._flush_blocks()
         Follow.flush(trx=False)
 
diff --git a/hive/indexer/post_data_cache.py b/hive/indexer/post_data_cache.py
new file mode 100644
index 000000000..65a40997e
--- /dev/null
+++ b/hive/indexer/post_data_cache.py
@@ -0,0 +1,59 @@
+import logging
+from hive.db.adapter import Db
+
+log = logging.getLogger(__name__)
+DB = Db.instance()
+
+def escape_characters(text):
+    # Escape the backslash first so characters escaped by later passes are not escaped twice.
+    characters = ["\\", "'", "_", "%"]
+    for ch in characters:
+        text = text.replace(ch, "\\" + ch)
+    return text
+
+class PostDataCache(object):
+    """ Provides a cache for DB operations on the post data table in order to speed up initial sync """
+    _data = {}
+
+    @classmethod
+    def is_cached(cls, pid):
+        """ Check if data is cached """
+        return pid in cls._data
+
+    @classmethod
+    def add_data(cls, pid, post_data):
+        """ Add data to cache """
+        cls._data[pid] = post_data
+
+    @classmethod
+    def flush(cls):
+        """ Flush data from cache to db """
+        if cls._data:
+            sql = """
+                INSERT INTO
+                    hive_post_data (id, title, preview, img_url, body, json)
+                VALUES
+            """
+            values = []
+            for k, data in cls._data.items():
+                title = "''" if not data['title'] else "'{}'".format(escape_characters(data['title']))
+                preview = "''" if not data['preview'] else "'{}'".format(escape_characters(data['preview']))
+                img_url = "''" if not data['img_url'] else "'{}'".format(escape_characters(data['img_url']))
+                body = "''" if not data['body'] else "'{}'".format(escape_characters(data['body']))
+                json = "'{}'" if not data['json'] else "'{}'".format(escape_characters(data['json']))
+                values.append("({},{},{},{},{},{})".format(k, title, preview, img_url, body, json))
+            sql += ','.join(values)
+            sql += """
+                ON CONFLICT (id)
+                DO
+                    UPDATE SET
+                        title = EXCLUDED.title,
+                        preview = EXCLUDED.preview,
+                        img_url = EXCLUDED.img_url,
+                        body = EXCLUDED.body,
+                        json = EXCLUDED.json
+                WHERE
+                    hive_post_data.id = EXCLUDED.id
+            """
+            DB.query(sql)
+            cls._data.clear()
diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py
index ecf05ea8d..cbbfed7a3 100644
--- a/hive/indexer/posts.py
+++ b/hive/indexer/posts.py
@@ -12,6 +12,7 @@ from hive.indexer.accounts import Accounts
 from hive.indexer.feed_cache import FeedCache
 from hive.indexer.community import Community, START_DATE
 from hive.indexer.notify import Notify
+from hive.indexer.post_data_cache import PostDataCache
 from hive.utils.normalize import legacy_amount, asset_to_hbd_hive
 
 log = logging.getLogger(__name__)
@@ -198,24 +199,32 @@ class Posts:
               FROM add_hive_post((:author)::varchar, (:permlink)::varchar, (:parent_author)::varchar, (:parent_permlink)::varchar, (:date)::timestamp, (:community_support_start_date)::timestamp);
             """
-        row = DB.query_row(sql, author=op['author'], permlink=op['permlink'], parent_author=op['parent_author'],
-                           parent_permlink=op['parent_permlink'], date=date, community_support_start_date=START_DATE)
+        row = DB.query_row(sql, author=op['author'], permlink=op['permlink'],
+                           parent_author=op['parent_author'], parent_permlink=op['parent_permlink'],
+                           date=date, community_support_start_date=START_DATE)
 
         result = dict(row)
 
         # TODO we need to enhance checking related community post validation and honor is_muted.
-        error = cls._verify_post_against_community(op, result['community_id'], result['is_valid'], result['is_muted'])
+        error = cls._verify_post_against_community(op, result['community_id'], result['is_valid'],
+                                                   result['is_muted'])
 
         cls._set_id(op['author']+'/'+op['permlink'], result['id'])
 
         # add content data to hive_post_data
-        sql = """
-            INSERT INTO hive_post_data (id, title, preview, img_url, body, json)
-            VALUES (:id, :title, :preview, :img_url, :body, :json)"""
-        DB.query(sql, id=result['id'], title=op['title'],
-                 preview=op['preview'] if 'preview' in op else "",
-                 img_url=op['img_url'] if 'img_url' in op else "",
-                 body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
+        if DbState.is_initial_sync():
+            post_data = dict(title=op['title'], preview=op['preview'] if 'preview' in op else "",
+                             img_url=op['img_url'] if 'img_url' in op else "", body=op['body'],
+                             json=op['json_metadata'] if op['json_metadata'] else '{}')
+            PostDataCache.add_data(result['id'], post_data)
+        else:
+            sql = """
+                INSERT INTO hive_post_data (id, title, preview, img_url, body, json)
+                VALUES (:id, :title, :preview, :img_url, :body, :json)"""
+            DB.query(sql, id=result['id'], title=op['title'],
+                     preview=op['preview'] if 'preview' in op else "",
+                     img_url=op['img_url'] if 'img_url' in op else "",
+                     body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
 
         if not DbState.is_initial_sync():
             if error:
@@ -224,9 +233,10 @@ class Posts:
                        post_id=result['id'], payload=error).write()
             cls._insert_feed_cache(result, date)
 
-        if op['parent_author']:
+        # TODO: we will do that in batches at the end of sync
+        #if op['parent_author']:
             #update parent child count
-            cls.update_child_count(result['id'])
+            #cls.update_child_count(result['id'])
 
     @classmethod
     def update_child_count(cls, child_id, op='+'):
@@ -333,22 +343,29 @@ class Posts:
         post['id'] = pid
         DB.query(sql, **post)
 
-        sql = """
-            UPDATE
-                hive_post_data
-            SET
-                title = :title,
-                preview = :preview,
-                img_url = :img_url,
-                body = :body,
-                json = :json
-            WHERE id = :id
-        """
+        # add content data to hive_post_data
+        if DbState.is_initial_sync():
+            post_data = dict(title=op['title'], preview=op['preview'] if 'preview' in op else "",
+                             img_url=op['img_url'] if 'img_url' in op else "", body=op['body'],
+                             json=op['json_metadata'] if op['json_metadata'] else '{}')
+            PostDataCache.add_data(pid, post_data)
+        else:
+            sql = """
+                UPDATE
+                    hive_post_data
+                SET
+                    title = :title,
+                    preview = :preview,
+                    img_url = :img_url,
+                    body = :body,
+                    json = :json
+                WHERE id = :id
+            """
 
-        DB.query(sql, id=pid, title=op['title'],
-                 preview=op['preview'] if 'preview' in op else "",
-                 img_url=op['img_url'] if 'img_url' in op else "",
-                 body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
+            DB.query(sql, id=pid, title=op['title'],
+                     preview=op['preview'] if 'preview' in op else "",
+                     img_url=op['img_url'] if 'img_url' in op else "",
+                     body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
 
     @classmethod
     def update_comment_pending_payouts(cls, hived, posts):
diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index a064b72bc..d93e88651 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -16,7 +16,6 @@ from hive.steem.block.stream import MicroForkException
 
 from hive.indexer.blocks import Blocks
 from hive.indexer.accounts import Accounts
-from hive.indexer.cached_post import CachedPost
 from hive.indexer.feed_cache import FeedCache
 from hive.indexer.follow import Follow
 from hive.indexer.community import Community
@@ -199,8 +198,6 @@ class Sync:
             num = Blocks.process(block, {}, steemd)
             follows = Follow.flush(trx=False)
             accts = Accounts.flush(steemd, trx=False, spread=8)
-            # CachedPost.dirty_paidouts(block['timestamp'])
-            # cnt = CachedPost.flush(steemd, trx=False)
             self._db.query("COMMIT")
 
             ms = (perf() - start_time) * 1000
-- 
GitLab
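
Editorial note on the pattern (illustrative sketch, not part of the patch): PostDataCache is a
write-behind cache. Post bodies seen during initial sync are buffered per post id in a dict and
written out in one multi-row upsert when Blocks flushes, instead of one INSERT per comment op.
The self-contained sketch below reproduces that pattern with sqlite3 standing in for hivemind's
PostgreSQL adapter (it assumes SQLite >= 3.24 for ON CONFLICT support); the class and table names
mirror the patch but are hypothetical here, and the parameterized executemany() deliberately
sidesteps the manual quoting that escape_characters() performs in the real code.

import sqlite3

class PostDataCacheSketch:
    """Accumulate post rows in memory, then flush them in one batched upsert."""
    _data = {}

    @classmethod
    def add_data(cls, pid, post_data):
        # A later edit of the same post overwrites the pending row, so at most
        # one row per post id reaches the database in a given flush.
        cls._data[pid] = post_data

    @classmethod
    def flush(cls, conn):
        """Write all pending rows in a single statement, then clear the cache."""
        if not cls._data:
            return
        rows = [(pid, d['title'], d['body']) for pid, d in cls._data.items()]
        # One batched statement instead of one round trip per post; ON CONFLICT
        # turns ids that already exist from an earlier flush into updates
        # (EXCLUDED in PostgreSQL is spelled "excluded" in SQLite).
        conn.executemany(
            """INSERT INTO hive_post_data (id, title, body)
               VALUES (?, ?, ?)
               ON CONFLICT (id) DO UPDATE SET
                   title = excluded.title,
                   body = excluded.body""",
            rows)
        conn.commit()
        cls._data.clear()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hive_post_data (id INTEGER PRIMARY KEY, title TEXT, body TEXT)")
PostDataCacheSketch.add_data(1, {'title': 'hello', 'body': 'v1'})
PostDataCacheSketch.add_data(1, {'title': 'hello', 'body': 'v2'})   # overwrites pending v1
PostDataCacheSketch.flush(conn)
PostDataCacheSketch.add_data(1, {'title': 'hello (edited)', 'body': 'v3'})
PostDataCacheSketch.flush(conn)                                     # exercises the upsert path
print(conn.execute("SELECT id, title, body FROM hive_post_data").fetchall())
# [(1, 'hello (edited)', 'v3')]

Buffering in a dict also deduplicates repeated edits of the same post within a batch, which is
why flushes stay small even when a post is updated many times between block flushes.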