Commit 367fefce authored by Dariusz Kędzierski

Cache for post data

parent 9f66215b
hive/db/schema.py
@@ -166,11 +166,11 @@ def build_metadata():
    sa.Table(
        'hive_post_data', metadata,
        sa.Column('id', sa.Integer, primary_key=True, autoincrement=False),
        sa.Column('title', VARCHAR(255), nullable=False),
        sa.Column('preview', VARCHAR(1024), nullable=False),
        sa.Column('img_url', VARCHAR(1024), nullable=False),
        sa.Column('body', TEXT),
        sa.Column('json', TEXT)
        sa.Column('title', VARCHAR(255), server_default=''),
        sa.Column('preview', VARCHAR(1024), server_default=''),
        sa.Column('img_url', VARCHAR(1024), server_default=''),
        sa.Column('body', TEXT, server_default=''),
        sa.Column('json', TEXT, server_default='')
    )
    sa.Table(
@@ -198,7 +198,9 @@ def build_metadata():
        sa.Column('rshares', sa.BigInteger, nullable=False, server_default='0'),
        sa.Column('vote_percent', sa.Integer, server_default='0'),
        sa.Column('last_update', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'),
        sa.Column('num_changes', sa.Integer, server_default='0'),
        sa.UniqueConstraint('voter_id', 'author_id', 'permlink_id', name='hive_votes_ux1'),
        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
        sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
hive/indexer/blocks.py
@@ -10,6 +10,7 @@ from hive.indexer.custom_op import CustomOp
from hive.indexer.payments import Payments
from hive.indexer.follow import Follow
from hive.indexer.votes import Votes
from hive.indexer.post_data_cache import PostDataCache
log = logging.getLogger(__name__)
@@ -53,6 +54,7 @@ class Blocks:
        # Follows flushing needs to be atomic because recounts are
        # expensive. So is tracking follows at all; hence we track
        # deltas in memory and update follow/er counts in bulk.
        PostDataCache.flush()
        cls._flush_blocks()
        Follow.flush(trx=False)
hive/indexer/post_data_cache.py (new file)
import logging

from hive.db.adapter import Db

log = logging.getLogger(__name__)

DB = Db.instance()

def escape_characters(text):
    """ Escape special characters so a value can be embedded in a SQL string literal """
    # The backslash must be escaped first, otherwise the backslashes
    # introduced for the remaining characters would be escaped again.
    characters = ["\\", "'", "_", "%"]
    for ch in characters:
        text = text.replace(ch, "\\" + ch)
    return text

class PostDataCache(object):
    """ Provides a cache for DB operations on the post data table in order to speed up initial sync """
    _data = {}

    @classmethod
    def is_cached(cls, pid):
        """ Check if data is cached """
        return pid in cls._data

    @classmethod
    def add_data(cls, pid, post_data):
        """ Add data to cache """
        cls._data[pid] = post_data

    @classmethod
    def flush(cls):
        """ Flush data from cache to db """
        if cls._data:
            sql = """
                INSERT INTO
                    hive_post_data (id, title, preview, img_url, body, json)
                VALUES
            """
            values = []
            for k, data in cls._data.items():
                title = "''" if not data['title'] else "'{}'".format(escape_characters(data['title']))
                preview = "''" if not data['preview'] else "'{}'".format(escape_characters(data['preview']))
                img_url = "''" if not data['img_url'] else "'{}'".format(escape_characters(data['img_url']))
                body = "''" if not data['body'] else "'{}'".format(escape_characters(data['body']))
                # fall back to an empty JSON object when no metadata was given
                json = "'{}'" if not data['json'] else "'{}'".format(escape_characters(data['json']))
                values.append("({},{},{},{},{},{})".format(k, title, preview, img_url, body, json))
            sql += ','.join(values)
            sql += """
                ON CONFLICT (id) DO
                UPDATE SET
                    title = EXCLUDED.title,
                    preview = EXCLUDED.preview,
                    img_url = EXCLUDED.img_url,
                    body = EXCLUDED.body,
                    json = EXCLUDED.json
                WHERE
                    hive_post_data.id = EXCLUDED.id
            """
            DB.query(sql)
            cls._data.clear()
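Note: a minimal usage sketch of the cache above, with hypothetical post ids and content (not taken from the commit). Rows are accumulated with add_data() and written out in a single multi-row upsert by flush():

    from hive.indexer.post_data_cache import PostDataCache

    # hypothetical ids/values, for illustration only
    PostDataCache.add_data(1, dict(title="Hello", preview="", img_url="",
                                   body="first post", json='{}'))
    PostDataCache.add_data(2, dict(title="O'Hara", preview="", img_url="",
                                   body="quotes get escaped", json='{}'))
    assert PostDataCache.is_cached(1)

    # one round trip: INSERT INTO hive_post_data ... VALUES (1,...),(2,...)
    #                 ON CONFLICT (id) DO UPDATE SET ...
    PostDataCache.flush()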
hive/indexer/posts.py
@@ -12,6 +12,7 @@ from hive.indexer.accounts import Accounts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.community import Community, START_DATE
from hive.indexer.notify import Notify
from hive.indexer.post_data_cache import PostDataCache
from hive.utils.normalize import legacy_amount, asset_to_hbd_hive
log = logging.getLogger(__name__)
@@ -198,24 +199,32 @@ class Posts:
            FROM add_hive_post((:author)::varchar, (:permlink)::varchar, (:parent_author)::varchar, (:parent_permlink)::varchar, (:date)::timestamp, (:community_support_start_date)::timestamp);
        """
        row = DB.query_row(sql, author=op['author'], permlink=op['permlink'], parent_author=op['parent_author'],
                           parent_permlink=op['parent_permlink'], date=date, community_support_start_date=START_DATE)
        row = DB.query_row(sql, author=op['author'], permlink=op['permlink'],
                           parent_author=op['parent_author'], parent_permlink=op['parent_permlink'],
                           date=date, community_support_start_date=START_DATE)
        result = dict(row)
        # TODO we need to enhance checking related community post validation and honor is_muted.
        error = cls._verify_post_against_community(op, result['community_id'], result['is_valid'], result['is_muted'])
        error = cls._verify_post_against_community(op, result['community_id'], result['is_valid'],
                                                   result['is_muted'])
        cls._set_id(op['author']+'/'+op['permlink'], result['id'])
        # add content data to hive_post_data
        sql = """
            INSERT INTO hive_post_data (id, title, preview, img_url, body, json)
            VALUES (:id, :title, :preview, :img_url, :body, :json)"""
        DB.query(sql, id=result['id'], title=op['title'],
                 preview=op['preview'] if 'preview' in op else "",
                 img_url=op['img_url'] if 'img_url' in op else "",
                 body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
        if DbState.is_initial_sync():
            post_data = dict(title=op['title'], preview=op['preview'] if 'preview' in op else "",
                             img_url=op['img_url'] if 'img_url' in op else "", body=op['body'],
                             json=op['json_metadata'] if op['json_metadata'] else '{}')
            PostDataCache.add_data(result['id'], post_data)
        else:
            sql = """
                INSERT INTO hive_post_data (id, title, preview, img_url, body, json)
                VALUES (:id, :title, :preview, :img_url, :body, :json)"""
            DB.query(sql, id=result['id'], title=op['title'],
                     preview=op['preview'] if 'preview' in op else "",
                     img_url=op['img_url'] if 'img_url' in op else "",
                     body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
        if not DbState.is_initial_sync():
            if error:
@@ -224,9 +233,10 @@ class Posts:
                       post_id=result['id'], payload=error).write()
            cls._insert_feed_cache(result, date)
        if op['parent_author']:
        # TODO: we will do that in batches at the end of sync
        #if op['parent_author']:
            #update parent child count
            cls.update_child_count(result['id'])
        #cls.update_child_count(result['id'])

    @classmethod
    def update_child_count(cls, child_id, op='+'):
@@ -333,22 +343,29 @@ class Posts:
        post['id'] = pid
        DB.query(sql, **post)
        sql = """
            UPDATE
                hive_post_data
            SET
                title = :title,
                preview = :preview,
                img_url = :img_url,
                body = :body,
                json = :json
            WHERE id = :id
        """
        # add content data to hive_post_data
        if DbState.is_initial_sync():
            post_data = dict(title=op['title'], preview=op['preview'] if 'preview' in op else "",
                             img_url=op['img_url'] if 'img_url' in op else "", body=op['body'],
                             json=op['json_metadata'] if op['json_metadata'] else '{}')
            PostDataCache.add_data(pid, post_data)
        else:
            sql = """
                UPDATE
                    hive_post_data
                SET
                    title = :title,
                    preview = :preview,
                    img_url = :img_url,
                    body = :body,
                    json = :json
                WHERE id = :id
            """
            DB.query(sql, id=pid, title=op['title'],
                     preview=op['preview'] if 'preview' in op else "",
                     img_url=op['img_url'] if 'img_url' in op else "",
                     body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
        DB.query(sql, id=pid, title=op['title'],
                 preview=op['preview'] if 'preview' in op else "",
                 img_url=op['img_url'] if 'img_url' in op else "",
                 body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
    @classmethod
    def update_comment_pending_payouts(cls, hived, posts):
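Note: during initial sync both the new-post path and the edit path above only call PostDataCache.add_data(). Since the cache is keyed by post id, repeated edits of the same post before a flush simply overwrite the cached entry, and only the latest version reaches the database. A sketch with hypothetical values:

    from hive.indexer.post_data_cache import PostDataCache

    pid = 42  # hypothetical post id
    PostDataCache.add_data(pid, dict(title="t", preview="", img_url="", body="v1", json='{}'))
    PostDataCache.add_data(pid, dict(title="t", preview="", img_url="", body="v2 (edited)", json='{}'))
    PostDataCache.flush()  # emits a single row for id 42, with body "v2 (edited)"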
hive/indexer/sync.py
@@ -16,7 +16,6 @@ from hive.steem.block.stream import MicroForkException
from hive.indexer.blocks import Blocks
from hive.indexer.accounts import Accounts
from hive.indexer.cached_post import CachedPost
from hive.indexer.feed_cache import FeedCache
from hive.indexer.follow import Follow
from hive.indexer.community import Community
@@ -199,8 +198,6 @@ class Sync:
            num = Blocks.process(block, {}, steemd)
            follows = Follow.flush(trx=False)
            accts = Accounts.flush(steemd, trx=False, spread=8)
            # CachedPost.dirty_paidouts(block['timestamp'])
            # cnt = CachedPost.flush(steemd, trx=False)
            self._db.query("COMMIT")
            ms = (perf() - start_time) * 1000