Skip to content
Snippets Groups Projects
Commit 785e2301 authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Merge branch 'dk-issue-3'

Resolved Conflicts:
	hive/indexer/posts.py
parents 0df52b19 367fefce
No related branches found
No related tags found
5 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!16Dk issue 3 concurrent block query rebase,!15Dk issue 3 concurrent block query
......@@ -161,11 +161,11 @@ def build_metadata():
sa.Table(
'hive_post_data', metadata,
sa.Column('id', sa.Integer, primary_key=True, autoincrement=False),
sa.Column('title', VARCHAR(255), nullable=False),
sa.Column('preview', VARCHAR(1024), nullable=False),
sa.Column('img_url', VARCHAR(1024), nullable=False),
sa.Column('body', TEXT),
sa.Column('json', TEXT)
sa.Column('title', VARCHAR(255), server_default=''),
sa.Column('preview', VARCHAR(1024), server_default=''),
sa.Column('img_url', VARCHAR(1024), server_default=''),
sa.Column('body', TEXT, server_default=''),
sa.Column('json', TEXT, server_default='')
)
sa.Table(
......@@ -193,7 +193,9 @@ def build_metadata():
sa.Column('rshares', sa.BigInteger, nullable=False, server_default='0'),
sa.Column('vote_percent', sa.Integer, server_default='0'),
sa.Column('last_update', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'),
sa.Column('num_changes', sa.Integer, server_default='0'),
sa.Column('num_changes', sa.Integer, server_default='0'),
sa.UniqueConstraint('voter_id', 'author_id', 'permlink_id', name='hive_votes_ux1'),
sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
......
......@@ -10,6 +10,7 @@ from hive.indexer.custom_op import CustomOp
from hive.indexer.payments import Payments
from hive.indexer.follow import Follow
from hive.indexer.votes import Votes
from hive.indexer.post_data_cache import PostDataCache
log = logging.getLogger(__name__)
......@@ -53,6 +54,7 @@ class Blocks:
# Follows flushing needs to be atomic because recounts are
# expensive. So is tracking follows at all; hence we track
# deltas in memory and update follow/er counts in bulk.
PostDataCache.flush()
cls._flush_blocks()
Follow.flush(trx=False)
......
import logging
from hive.db.adapter import Db
log = logging.getLogger(__name__)
# Shared database adapter used by PostDataCache.flush().
# NOTE(review): Db.instance() is assumed to return a process-wide shared
# connection/adapter (name suggests a singleton) — defined in hive.db.adapter.
DB = Db.instance()
def escape_characters(text):
    """Return *text* with SQL-special characters backslash-escaped.

    Escapes ``\\``, ``'``, ``_`` and ``%`` so the value can be inlined
    into a SQL string literal built by :meth:`PostDataCache.flush`.

    The backslash MUST be escaped first: otherwise the backslashes added
    while escaping the other characters would themselves be doubled by a
    later pass (e.g. ``'`` -> ``\\'`` -> ``\\\\'``).

    NOTE(review): backslash escaping is only honoured by PostgreSQL when
    ``standard_conforming_strings`` is off or ``E''`` literals are used —
    confirm the server configuration; otherwise quote-doubling (``''``)
    would be the correct escape for ``'``.

    :param text: raw string destined for a SQL literal
    :return: escaped copy of *text* (the original version was missing the
        return statement and always returned ``None``)
    """
    characters = ["\\", "'", "_", "%"]
    for ch in characters:
        text = text.replace(ch, "\\" + ch)
    return text
class PostDataCache(object):
    """ Provides cache for DB operations on post data table in order to speed up initial sync """

    # pid -> dict of post content fields; flush() reads the keys
    # 'title', 'preview', 'img_url', 'body' and 'json' from each entry.
    _data = {}

    @classmethod
    def is_cached(cls, pid):
        """ Check if data is cached """
        return pid in cls._data

    @classmethod
    def add_data(cls, pid, post_data):
        """ Add data to cache """
        cls._data[pid] = post_data

    @classmethod
    def flush(cls):
        """ Flush data from cache to db """
        if cls._data:
            # Build a single multi-row INSERT ... ON CONFLICT upsert instead
            # of one query per post; values are inlined as SQL literals.
            sql = """
                INSERT INTO
                    hive_post_data (id, title, preview, img_url, body, json)
                VALUES
            """
            values = []
            for k, data in cls._data.items():
                # Empty/falsy fields become the SQL literal ''; non-empty
                # values are escaped via escape_characters before inlining.
                title = "''" if not data['title'] else "'{}'".format(escape_characters(data['title']))
                preview = "''" if not data['preview'] else "'{}'".format(escape_characters(data['preview']))
                img_url = "''" if not data['img_url'] else "'{}'".format(escape_characters(data['img_url']))
                body = "''" if not data['body'] else "'{}'".format(escape_characters(data['body']))
                # NOTE(review): a falsy json falls back to the literal '{}'
                # (empty JSON object) rather than '' like the other columns —
                # presumably intentional to keep the column valid JSON; the
                # caller in Posts already defaults json_metadata to '{}'.
                json = "'{}'" if not data['json'] else "'{}'".format(escape_characters(data['json']))
                values.append("({},{},{},{},{},{})".format(k, title, preview, img_url, body, json))
            sql += ','.join(values)
            # On a duplicate id, overwrite every content column with the
            # freshly cached values.
            sql += """
                ON CONFLICT (id)
                DO
                    UPDATE SET
                        title = EXCLUDED.title,
                        preview = EXCLUDED.preview,
                        img_url = EXCLUDED.img_url,
                        body = EXCLUDED.body,
                        json = EXCLUDED.json
                WHERE
                    hive_post_data.id = EXCLUDED.id
            """
            DB.query(sql)
        # Drop everything that was flushed so the next sync batch starts clean.
        cls._data.clear()
......@@ -12,6 +12,7 @@ from hive.indexer.accounts import Accounts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.community import Community, START_DATE
from hive.indexer.notify import Notify
from hive.indexer.post_data_cache import PostDataCache
from hive.utils.normalize import legacy_amount, asset_to_hbd_hive
log = logging.getLogger(__name__)
......@@ -106,21 +107,27 @@ class Posts:
cls._set_id(op['author']+'/'+op['permlink'], result['id'])
# add content data to hive_post_data
sql = """
INSERT INTO hive_post_data (id, title, preview, img_url, body, json)
VALUES (:id, :title, :preview, :img_url, :body, :json)
ON CONFLICT ON CONSTRAINT hive_post_data_pkey DO UPDATE SET
title = :title,
preview = :preview,
img_url = :img_url,
body = :body,
json = :json
"""
DB.query(sql, id=result['id'], title=op['title'],
preview=op['preview'] if 'preview' in op else "",
img_url=op['img_url'] if 'img_url' in op else "",
body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
# add content data to hive_post_data
if DbState.is_initial_sync():
post_data = dict(title=op['title'], preview=op['preview'] if 'preview' in op else "",
img_url=op['img_url'] if 'img_url' in op else "", body=op['body'],
json=op['json_metadata'] if op['json_metadata'] else '{}')
PostDataCache.add_data(result['id'], post_data)
else:
sql = """
INSERT INTO hive_post_data (id, title, preview, img_url, body, json)
VALUES (:id, :title, :preview, :img_url, :body, :json)
ON CONFLICT ON CONSTRAINT hive_post_data_pkey DO UPDATE SET
title = :title,
preview = :preview,
img_url = :img_url,
body = :body,
json = :json
"""
DB.query(sql, id=result['id'], title=op['title'],
preview=op['preview'] if 'preview' in op else "",
img_url=op['img_url'] if 'img_url' in op else "",
body=op['body'], json=op['json_metadata'] if op['json_metadata'] else '{}')
if not DbState.is_initial_sync():
if error:
......
......@@ -16,7 +16,6 @@ from hive.steem.block.stream import MicroForkException
from hive.indexer.blocks import Blocks
from hive.indexer.accounts import Accounts
from hive.indexer.cached_post import CachedPost
from hive.indexer.feed_cache import FeedCache
from hive.indexer.follow import Follow
from hive.indexer.community import Community
......@@ -199,8 +198,6 @@ class Sync:
num = Blocks.process(block, {}, steemd)
follows = Follow.flush(trx=False)
accts = Accounts.flush(steemd, trx=False, spread=8)
# CachedPost.dirty_paidouts(block['timestamp'])
# cnt = CachedPost.flush(steemd, trx=False)
self._db.query("COMMIT")
ms = (perf() - start_time) * 1000
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment