Skip to content
Snippets Groups Projects
Commit cb3844bf authored by Dariusz Kędzierski's avatar Dariusz Kędzierski
Browse files

[WIP] Works on sync and removing get_content call to download comment data....

[WIP] Works on sync and removing get_content call to download comment data. NOT TESTED! Probably will mess your db badly!
parent a1093a63
No related branches found
No related tags found
5 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!16Dk issue 3 concurrent block query rebase,!15Dk issue 3 concurrent block query
......@@ -63,8 +63,8 @@ class Blocks:
num = cls._push(block)
date = block['timestamp']
# [DK] we will make two scans, first scan will register all accounts
account_names = set()
json_ops = []
for tx_idx, tx in enumerate(block['transactions']):
for operation in tx['operations']:
op_type = operation['type']
......@@ -81,9 +81,17 @@ class Blocks:
account_names.add(op['new_account_name'])
elif op_type == 'create_claimed_account_operation':
account_names.add(op['new_account_name'])
Accounts.register(account_names, date) # register any new names
# second scan will process all other ops
json_ops = []
for tx_idx, tx in enumerate(block['transactions']):
for operation in tx['operations']:
op_type = operation['type']
op = operation['value']
# account metadata updates
elif op_type == 'account_update_operation':
if op_type == 'account_update_operation':
if not is_initial_sync:
Accounts.dirty(op['account']) # full
elif op_type == 'account_update2_operation':
......@@ -110,7 +118,6 @@ class Blocks:
elif op_type == 'custom_json_operation':
json_ops.append(op)
Accounts.register(account_names, date) # register any new names
CustomOp.process_ops(json_ops, num, date) # follow/reblog/community ops
return num
......
......@@ -27,7 +27,7 @@ def _keyify(items):
return dict(map(lambda x: ("val_%d" % x[0], x[1]), enumerate(items)))
class CachedPost:
"""Maintain update queue and writing to `hive_posts_cache`."""
"""Maintain update queue and writing to `hive_posts`."""
# cursor signifying upper bound of cached post span
_last_id = -1
......@@ -116,6 +116,7 @@ class CachedPost:
- author/permlink is unique and always references the same post
- you can always get_content on any author/permlink you see in an op
"""
raise NotImplementedError("Cannot delete from CachedPost!!!")
DB.query("DELETE FROM hive_posts_cache WHERE post_id = :id", id=post_id)
DB.query("DELETE FROM hive_post_tags WHERE post_id = :id", id=post_id)
......@@ -230,7 +231,7 @@ class CachedPost:
@classmethod
def _select_paidout_tuples(cls, date):
    """Query hive_posts for payout sweep.

    Select all posts which should have been paid out before `date`
    yet do not have the `is_paidout` flag set.

    Returns the result of `Posts.save_ids_from_tuples` over the
    matching (id, author, permlink) rows.
    """
    from hive.indexer.posts import Posts

    # first pass: collect ids of all posts due for payout
    sql = """SELECT id FROM hive_posts
              WHERE is_paidout = '0' AND payout_at <= :date"""
    ids = DB.query_col(sql, date=date)
    if not ids:
        return []

    # second pass: resolve author/permlink through the normalized
    # lookup tables. BUG FIX: removed the trailing comma after
    # `permlink` in the SELECT list — it was a SQL syntax error.
    sql = """
        SELECT
            hp.id, ha_a.name as author, hpd_p.permlink as permlink
        FROM
            hive_posts hp
        LEFT JOIN hive_accounts ha_a ON ha_a.id = hp.author_id
        LEFT JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id
        WHERE hp.id IN :ids"""
    results = DB.query_all(sql, ids=tuple(ids))
    return Posts.save_ids_from_tuples(results)
......@@ -394,7 +401,7 @@ class CachedPost:
"""Retrieve the latest post_id that was cached."""
if cls._last_id == -1:
# after initial query, we maintain last_id w/ _bump_last_id()
sql = "SELECT COALESCE(MAX(post_id), 0) FROM hive_posts_cache"
sql = "SELECT id FROM hive_posts ORDER BY id DESC LIMIT 1"
cls._last_id = DB.query_one(sql)
return cls._last_id
......@@ -687,8 +694,10 @@ class CachedPost:
@classmethod
def _insert(cls, values):
    """Disabled: inserts into `hive_posts_cache` are no longer supported.

    Post data now lives directly in `hive_posts`; any attempt to write
    through CachedPost is a programming error.

    Raises:
        NotImplementedError: always.
    """
    # Dead code after the raise (the old DB.build_insert call) removed.
    raise NotImplementedError("Cannot insert from CachedPost")
@classmethod
def _update(cls, values):
    """Disabled: updates of `hive_posts_cache` are no longer supported.

    Post data now lives directly in `hive_posts`; any attempt to write
    through CachedPost is a programming error.

    Raises:
        NotImplementedError: always.
    """
    # Dead code after the raise (the old DB.build_update call) removed.
    raise NotImplementedError("Cannot update from CachedPost")
......@@ -15,6 +15,7 @@ def _last_cached_post_id(db):
def audit_cache_missing(db, steem):
"""Scan all posts to check for missing cache entries."""
raise NotImplementedError("Post cache is disabled in this version")
last_id = _last_cached_post_id(db)
step = 1000000
steps = int(last_id / step) + 1
......
......@@ -117,26 +117,80 @@ class Posts:
@classmethod
def insert(cls, op, date):
    """Inserts new post records.

    Steps:
      * ensure permlink, parent_permlink, root_permlink rows exist
      * ensure author, parent_author, root_author rows exist
      * ensure the category row exists
      * insert the post base record and obtain its id
      * insert the post content data into hive_post_data
    """
    # add permlinks to permlink table.
    # BUG FIX: sql is built once (loop-invariant) and passed to
    # DB.query — the original omitted the `sql` argument entirely.
    sql = """
        INSERT INTO hive_permlink_data (permlink)
        VALUES (:permlink)
        ON CONFLICT (permlink) DO NOTHING"""
    for permlink in [op['permlink'], op['parent_permlink'], op['root_permlink']]:
        DB.query(sql, permlink=permlink)

    # add category to category table.
    # BUG FIX: original omitted the `sql` argument to DB.query.
    sql = """
        INSERT INTO hive_category_data (category)
        VALUES (:category)
        ON CONFLICT (category) DO NOTHING"""
    DB.query(sql, category=op['category'])

    # insert the base post record; names are resolved to ids via
    # scalar subselects against the lookup tables populated above.
    # BUG FIX: added the missing comma after the category subselect.
    sql = """
        INSERT INTO hive_posts (parent_id, author_id, permlink_id,
            category_id, community_id, created_at, depth, is_deleted, is_pinned,
            is_muted, is_valid, promoted, children, author_rep, flag_weight,
            total_votes, up_votes, payout, payout_at, updated_at, is_paidout,
            is_nsfw, is_declined, is_full_power, is_hidden, is_grayed, rshares,
            sc_trend, sc_hot, parent_author_id, parent_permlink_id,
            curator_payout_value, root_author_id, root_permlink_id,
            max_accepted_payout, percent_steem_dollars, allow_replies, allow_votes,
            allow_curation_rewards, beneficiaries, url, root_title)
        VALUES (:parent_id,
            (SELECT id FROM hive_accounts WHERE name = :author),
            (SELECT id FROM hive_permlink_data WHERE permlink = :permlink),
            (SELECT id FROM hive_category_data WHERE category = :category),
            :community_id, :created_at, :depth, :is_deleted, :is_pinned,
            :is_muted, :is_valid, :promoted, :children, :author_rep, :flag_weight,
            :total_votes, :up_votes, :payout, :payout_at, :updated_at, :is_paidout,
            :is_nsfw, :is_declined, :is_full_power, :is_hidden, :is_grayed, :rshares,
            :sc_trend, :sc_hot,
            (SELECT id FROM hive_accounts WHERE name = :parent_author),
            (SELECT id FROM hive_permlink_data WHERE permlink = :parent_permlink),
            :curator_payout_value,
            (SELECT id FROM hive_accounts WHERE name = :root_author),
            (SELECT id FROM hive_permlink_data WHERE permlink = :root_permlink),
            :max_accepted_payout, :percent_steem_dollars, :allow_replies, :allow_votes,
            :allow_curation_rewards, :beneficiaries, :url, :root_title)"""
    # fetch the freshly-assigned id in the same round-trip
    sql += ";SELECT currval(pg_get_serial_sequence('hive_posts','id'))"
    post = cls._build_post(op, date)
    result = DB.query(sql, **post)
    post['id'] = int(list(result)[0][0])
    cls._set_id(op['author']+'/'+op['permlink'], post['id'])

    # add content data to hive_post_data
    sql = """
        INSERT INTO hive_post_data (id, title, preview, img_url, body,
                                    votes, json)
        VALUES (:id, :title, :preview, :img_url, :body, :votes, :json)"""
    DB.query(sql, id=post['id'], title=op['title'], preview=op['preview'],
             img_url=op['img_url'], body=op['body'], votes=op['votes'],
             json=op['json_metadata'])

    if not DbState.is_initial_sync():
        if post['error']:
            author_id = Accounts.get_id(post['author'])
            Notify('error', dst_id=author_id, when=date,
                   post_id=post['id'], payload=post['error']).write()
        # TODO: [DK] Remove CachedPost
        # CachedPost.insert(op['author'], op['permlink'], post['id'])
        if op['parent_author']:  # update parent's child count
            CachedPost.recount(op['parent_author'], op['parent_permlink'], post['parent_id'])
        cls._insert_feed_cache(post)
@classmethod
......@@ -156,8 +210,8 @@ class Posts:
Notify('error', dst_id=author_id, when=date,
post_id=post['id'], payload=post['error']).write()
CachedPost.undelete(pid, post['author'], post['permlink'],
post['category'])
# TODO: [DK] Remove CachedPost
#CachedPost.undelete(pid, post['author'], post['permlink'], post['category'])
cls._insert_feed_cache(post)
@classmethod
......@@ -167,9 +221,11 @@ class Posts:
DB.query("UPDATE hive_posts SET is_deleted = '1' WHERE id = :id", id=pid)
if not DbState.is_initial_sync():
CachedPost.delete(pid, op['author'], op['permlink'])
# TODO: [DK] Remove CachedPost
#CachedPost.delete(pid, op['author'], op['permlink'])
if depth == 0:
# TODO: delete from hive_reblogs -- otherwise feed cache gets populated with deleted posts sometimes
# TODO: delete from hive_reblogs -- otherwise feed cache gets
# populated with deleted posts sometimes
FeedCache.delete(pid)
else:
# force parent child recount when child is deleted
......@@ -185,15 +241,65 @@ class Posts:
a signal to update cache record.
"""
# pylint: disable=unused-argument
if not DbState.is_initial_sync():
CachedPost.update(op['author'], op['permlink'], pid)
#if not DbState.is_initial_sync():
# CachedPost.update(op['author'], op['permlink'], pid)
sql = """
UPDATE hive_posts
SET
parent_id = :parent_id,
community_id = :community_id,
created_at = :created_at,
is_deleted = :is_deleted,
is_pinned = :is_pinned,
is_muted = :is_muted,
is_valid = :is_valid,
promoted = :promoted,
children = :children,
author_rep = :author_rep,
flag_weight = :flag_weight,
total_votes = :total_votes,
up_votes = :up_votes,
payout = :payout,
payout_at = :payout_at,
updated_at = :updated_at,
is_paidout = :is_paidout,
is_nsfw = :is_nsfw,
is_declined = :is_declined,
is_full_power = :is_full_power,
is_hidden = :is_hidden,
is_grayed = :is_grayed,
rshares = :rshares,
sc_trend = :sc_trend,
sc_hot = :sc_hot,
parent_author_id = (SELECT id FROM hive_accounts WHERE name = :parent_author),
parent_permlink_id = (SELECT id FROM hive_permlink_data WHERE permlink = :parent_permlink),
curator_payout_value = :curator_payout_value,
root_author_id = (SELECT id FROM hive_accounts WHERE name = :root_author),
root_permlink_id = (SELECT id FROM hive_permlink_data WHERE permlink = :root_permlink),
max_accepted_payout = :max_accepted_payout,
percent_steem_dollars = :percent_steem_dollars,
allow_replies = :allow_replies,
allow_votes = :allow_votes,
allow_curation_rewards = :allow_curation_rewards,
beneficiaries = :beneficiaries,
url = :url,
root_title = :root_title
WHERE id = :id"""
post = cls._build_post(op, date, pid)
DB.query(sql, **post)
@classmethod
def _get_parent_by_child_id(cls, child_id):
    """Get parent's `id`, `author`, `permlink` by child id.

    Raises AssertionError when no parent row exists for `child_id`.
    """
    # BUG FIX: removed the trailing comma after `permlink` in the
    # SELECT list (SQL syntax error) and dropped the superseded
    # pre-normalization query that was shadowed immediately below.
    sql = """
        SELECT
            hp.id, ha_a.name as author, hpd_p.permlink as permlink
        FROM
            hive_posts hp
        LEFT JOIN hive_accounts ha_a ON ha_a.id = hp.author_id
        LEFT JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id
        WHERE
            hp.id = (SELECT parent_id FROM hive_posts WHERE id = :child_id)"""
    result = DB.query_row(sql, child_id=child_id)
    assert result, "parent of %d not found" % child_id
    return result
......@@ -233,8 +339,11 @@ class Posts:
# this is a comment; inherit parent props.
else:
parent_id = cls.get_id(op['parent_author'], op['parent_permlink'])
sql = """SELECT depth, category, community_id, is_valid, is_muted
FROM hive_posts WHERE id = :id"""
sql = """
SELECT depth, hcd.category as category, community_id, is_valid, is_muted
FROM hive_posts hp
LEFT JOIN hive_category_data hcd ON hcd.id = hp.category_id
WHERE hp.id = :id"""
(parent_depth, category, community_id, is_valid,
is_muted) = DB.query_row(sql, id=parent_id)
depth = parent_depth + 1
......@@ -248,7 +357,21 @@ class Posts:
#is_valid = False # TODO: reserved for future blacklist status?
is_muted = True
return dict(author=op['author'], permlink=op['permlink'], id=pid,
is_valid=is_valid, is_muted=is_muted, parent_id=parent_id,
depth=depth, category=category, community_id=community_id,
date=date, error=error)
return dict(parent_id=parent_id,
author=op['author'], permlink=op['permlink'], id=pid,
category=category, community_id=community_id, created_at=op['created_at'],
depth=depth, is_deleted=op['is_deleted'], is_pinned=op['is_pinned'],
is_muted=op['is_muted'], is_valid=is_valid, promoted=op['promoted'],
children=op['children'], author_rep=op['author_rep'], flag_weight=op['flag_weight'],
total_votes=op['total_votes'], up_votes=op['up_votes'], payout=op['payout'],
payout_at=op['payout_at'], updated_at=op['updated_at'], is_paidout=op['is_paidout'],
is_nsfw=op['is_nsfw'], is_declined=op['is_declined'], is_full_power=op['is_full_power'],
is_hidden=op['is_hidden'], is_grayed=op['is_grayed'], rshares=op['rshares'],
sc_trend=op['sc_trend'], sc_hot=op['sc_hot'], parent_author=op['parent_author'],
parent_permlink=op['parent_permlink'],
curator_payout_value=op['curator_payout_value'], root_author=op['root_author'],
root_permlink=op['root_permlink'],
max_accepted_payout=op['max_accepted_payout'], percent_steem_dollars=op['percent_steem_dollars'],
allow_replies=op['allow_replies'], allow_votes=op['allow_votes'],
allow_curation_rewards=op['allow_curation_rewards'], beneficiaries=op['beneficiaries'], url=op['url'],
root_title=op['root_title'], date=date, error=error)
......@@ -42,7 +42,7 @@ class PayoutStats:
SUM(payout) payout,
COUNT(*) posts,
NULL authors
FROM hive_posts_cache
FROM hive_posts
WHERE is_paidout = '0'
GROUP BY community_id, author
......@@ -52,8 +52,8 @@ class PayoutStats:
NULL author,
SUM(payout) payout,
COUNT(*) posts,
COUNT(DISTINCT(author)) authors
FROM hive_posts_cache
COUNT(DISTINCT(author_id)) authors
FROM hive_posts
WHERE is_paidout = '0'
GROUP BY community_id
"""
......
......@@ -44,11 +44,7 @@ class SteemClient:
def get_content_batch(self, tuples):
    """Fetch multiple comment objects.

    Disabled: the `get_content` API has been removed from hived, so
    comment data can no longer be downloaded this way.

    Raises:
        NotImplementedError: always.
    """
    # Unreachable remnants of the old __exec_batch implementation
    # (left behind by the diff after the raise) removed.
    raise NotImplementedError("get_content is not implemented in hived")
def get_block(self, num, strict=True):
"""Fetches a single block.
......
......@@ -292,6 +292,7 @@ ALTER TABLE hive_reblogs ADD CONSTRAINT hive_reblogs_fk2 FOREIGN KEY (post_id) R
-- Foreign keys tying hive_posts to its normalized lookup tables.
ALTER TABLE hive_posts ADD CONSTRAINT hive_posts_fk1 FOREIGN KEY (author_id) REFERENCES hive_accounts(id);
ALTER TABLE hive_posts ADD CONSTRAINT hive_posts_fk3 FOREIGN KEY (parent_id) REFERENCES hive_posts(id);
ALTER TABLE hive_posts ADD CONSTRAINT hive_posts_fk4 FOREIGN KEY (permlink_id) REFERENCES hive_permlink_data(id);
-- Enforce at most one post per (author, permlink) pair.
ALTER TABLE hive_posts ADD CONSTRAINT hive_posts_ux1 UNIQUE (author_id, permlink_id);
-- Make indexes in hive_posts
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment