From cb3844bf1b337eaad3a47e1ed139dd0b0cdac8cb Mon Sep 17 00:00:00 2001 From: Dariusz Kedzierski <dkedzierski@syncad.com> Date: Thu, 4 Jun 2020 23:11:41 +0200 Subject: [PATCH] [WIP] Works on sync and removing get_content call to download comment data. NOT TESTED! Probably will mess your db badly! --- hive/indexer/blocks.py | 13 ++- hive/indexer/cached_post.py | 21 +++- hive/indexer/jobs.py | 1 + hive/indexer/posts.py | 169 +++++++++++++++++++++++++---- hive/server/common/payout_stats.py | 6 +- hive/steem/client.py | 6 +- scripts/update_hivemind_db.sql | 1 + 7 files changed, 177 insertions(+), 40 deletions(-) diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index fa47f4af8..bfee74c8d 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -63,8 +63,8 @@ class Blocks: num = cls._push(block) date = block['timestamp'] + # [DK] we will make two scans, first scan will register all accounts account_names = set() - json_ops = [] for tx_idx, tx in enumerate(block['transactions']): for operation in tx['operations']: op_type = operation['type'] @@ -81,9 +81,17 @@ class Blocks: account_names.add(op['new_account_name']) elif op_type == 'create_claimed_account_operation': account_names.add(op['new_account_name']) + Accounts.register(account_names, date) # register any new names + + # second scan will process all other ops + json_ops = [] + for tx_idx, tx in enumerate(block['transactions']): + for operation in tx['operations']: + op_type = operation['type'] + op = operation['value'] # account metadata updates - elif op_type == 'account_update_operation': + if op_type == 'account_update_operation': if not is_initial_sync: Accounts.dirty(op['account']) # full elif op_type == 'account_update2_operation': @@ -110,7 +118,6 @@ class Blocks: elif op_type == 'custom_json_operation': json_ops.append(op) - Accounts.register(account_names, date) # register any new names CustomOp.process_ops(json_ops, num, date) # follow/reblog/community ops return num diff --git 
a/hive/indexer/cached_post.py b/hive/indexer/cached_post.py index e0db3053b..48626828f 100644 --- a/hive/indexer/cached_post.py +++ b/hive/indexer/cached_post.py @@ -27,7 +27,7 @@ def _keyify(items): return dict(map(lambda x: ("val_%d" % x[0], x[1]), enumerate(items))) class CachedPost: - """Maintain update queue and writing to `hive_posts_cache`.""" + """Maintain update queue and writing to `hive_posts`.""" # cursor signifying upper bound of cached post span _last_id = -1 @@ -116,6 +116,7 @@ class CachedPost: - author/permlink is unique and always references the same post - you can always get_content on any author/permlink you see in an op """ + raise NotImplementedError("Cannot delete from CachedPost!!!") DB.query("DELETE FROM hive_posts_cache WHERE post_id = :id", id=post_id) DB.query("DELETE FROM hive_post_tags WHERE post_id = :id", id=post_id) @@ -230,7 +231,7 @@ class CachedPost: @classmethod def _select_paidout_tuples(cls, date): - """Query hive_posts_cache for payout sweep. + """Query hive_posts for payout sweep. Select all posts which should have been paid out before `date` yet do not have the `is_paidout` flag set. 
We perform this @@ -241,14 +242,20 @@ class CachedPost: """ from hive.indexer.posts import Posts - sql = """SELECT post_id FROM hive_posts_cache + sql = """SELECT id FROM hive_posts WHERE is_paidout = '0' AND payout_at <= :date""" ids = DB.query_col(sql, date=date) if not ids: return [] - sql = """SELECT id, author, permlink - FROM hive_posts WHERE id IN :ids""" + sql = """ + SELECT + hp.id, ha_a.name as author, hpd_p.permlink as permlink + FROM + hive_posts hp + LEFT JOIN hive_accounts ha_a ON ha_a.id = hp.author_id + LEFT JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id + WHERE hp.id IN :ids""" results = DB.query_all(sql, ids=tuple(ids)) return Posts.save_ids_from_tuples(results) @@ -394,7 +401,7 @@ class CachedPost: """Retrieve the latest post_id that was cached.""" if cls._last_id == -1: # after initial query, we maintain last_id w/ _bump_last_id() - sql = "SELECT COALESCE(MAX(post_id), 0) FROM hive_posts_cache" + sql = "SELECT COALESCE(MAX(id), 0) FROM hive_posts" cls._last_id = DB.query_one(sql) return cls._last_id @@ -687,8 +694,10 @@ class CachedPost: @classmethod def _insert(cls, values): + raise NotImplementedError("Cannot insert from CachedPost") return DB.build_insert('hive_posts_cache', values, pk='post_id') @classmethod def _update(cls, values): + raise NotImplementedError("Cannot update from CachedPost") return DB.build_update('hive_posts_cache', values, pk='post_id') diff --git a/hive/indexer/jobs.py b/hive/indexer/jobs.py index c64ff3768..e754df55f 100644 --- a/hive/indexer/jobs.py +++ b/hive/indexer/jobs.py @@ -15,6 +15,7 @@ def _last_cached_post_id(db): def audit_cache_missing(db, steem): """Scan all posts to check for missing cache entries.""" + raise NotImplementedError("Post cache is disabled in this version") last_id = _last_cached_post_id(db) step = 1000000 steps = int(last_id / step) + 1 diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py index 3917217a1..c66df2bc4 100644 --- a/hive/indexer/posts.py +++ 
b/hive/indexer/posts.py @@ -117,26 +117,80 @@ class Posts: @classmethod def insert(cls, op, date): """Inserts new post records.""" - # TODO check if category and permlink exists - sql = """INSERT INTO hive_posts (is_valid, is_muted, parent_id, author_id, - permlink_id, category_id, community_id, depth, created_at) - VALUES (:is_valid, :is_muted, :parent_id, (SELECT id FROM hive_accounts WHERE name = :author), - (SELECT id FROM hive_permlink_data WHERE permlink = :permlink), (SELECT :category), :community_id, :depth, :date)""" + + # inserting new post + # * Check for permlink, parent_permlink, root_permlink + # * Check for author, parent_author, root_author + # * check for category data + # * insert post basic data + # * obtain id + # * insert post content data + + # add permlinks to permlink table + for permlink in [op['permlink'], op['parent_permlink'], op['root_permlink']]: + sql = """ + INSERT INTO hive_permlink_data (permlink) + VALUES (:permlink) + ON CONFLICT (permlink) DO NOTHING""" + DB.query(sql, permlink=permlink) + + # add category to category table + sql = """ + INSERT INTO hive_category_data (category) + VALUES (:category) + ON CONFLICT (category) DO NOTHING""" + DB.query(sql, category=op['category']) + + sql = """ + INSERT INTO hive_posts (parent_id, author_id, permlink_id, + category_id, community_id, created_at, depth, is_deleted, is_pinned, + is_muted, is_valid, promoted, children, author_rep, flag_weight, + total_votes, up_votes, payout, payout_at, updated_at, is_paidout, + is_nsfw, is_declined, is_full_power, is_hidden, is_grayed, rshares, + sc_trend, sc_hot, parent_author_id, parent_permlink_id, + curator_payout_value, root_author_id, root_permlink_id, + max_accepted_payout, percent_steem_dollars, allow_replies, allow_votes, + allow_curation_rewards, beneficiaries, url, root_title) + VALUES (:parent_id, + (SELECT id FROM hive_accounts WHERE name = :author), + (SELECT id FROM hive_permlink_data WHERE permlink = :permlink), + (SELECT id FROM 
hive_category_data WHERE category = :category), + :community_id, :created_at, :depth, :is_deleted, :is_pinned, + :is_muted, :is_valid, :promoted, :children, :author_rep, :flag_weight, + :total_votes, :up_votes, :payout, :payout_at, :updated_at, :is_paidout, + :is_nsfw, :is_declined, :is_full_power, :is_hidden, :is_grayed, :rshares, + :sc_trend, :sc_hot, + (SELECT id FROM hive_accounts WHERE name = :parent_author), + (SELECT id FROM hive_permlink_data WHERE permlink = :parent_permlink), + :curator_payout_value, + (SELECT id FROM hive_accounts WHERE name = :root_author), + (SELECT id FROM hive_permlink_data WHERE permlink = :root_permlink), + :max_accepted_payout, :percent_steem_dollars, :allow_replies, :allow_votes, + :allow_curation_rewards, :beneficiaries, :url, :root_title)""" sql += ";SELECT currval(pg_get_serial_sequence('hive_posts','id'))" post = cls._build_post(op, date) result = DB.query(sql, **post) post['id'] = int(list(result)[0][0]) cls._set_id(op['author']+'/'+op['permlink'], post['id']) + # add content data to hive_post_data + sql = """ + INSERT INTO hive_post_data (id, title, preview, img_url, body, + votes, json) + VALUES (:id, :title, :preview, :img_url, :body, :votes, :json)""" + DB.query(sql, id=post['id'], title=op['title'], preview=op['preview'], + img_url=op['img_url'], body=op['body'], votes=op['votes'], + json=op['json_metadata']) + if not DbState.is_initial_sync(): if post['error']: author_id = Accounts.get_id(post['author']) Notify('error', dst_id=author_id, when=date, post_id=post['id'], payload=post['error']).write() - CachedPost.insert(op['author'], op['permlink'], post['id']) + # TODO: [DK] Remove CachedPost + # CachedPost.insert(op['author'], op['permlink'], post['id']) if op['parent_author']: # update parent's child count - CachedPost.recount(op['parent_author'], - op['parent_permlink'], post['parent_id']) + CachedPost.recount(op['parent_author'], op['parent_permlink'], post['parent_id']) cls._insert_feed_cache(post) @classmethod @@ 
-156,8 +210,8 @@ class Posts: Notify('error', dst_id=author_id, when=date, post_id=post['id'], payload=post['error']).write() - CachedPost.undelete(pid, post['author'], post['permlink'], - post['category']) + # TODO: [DK] Remove CachedPost + #CachedPost.undelete(pid, post['author'], post['permlink'], post['category']) cls._insert_feed_cache(post) @classmethod @@ -167,9 +221,11 @@ class Posts: DB.query("UPDATE hive_posts SET is_deleted = '1' WHERE id = :id", id=pid) if not DbState.is_initial_sync(): - CachedPost.delete(pid, op['author'], op['permlink']) + # TODO: [DK] Remove CachedPost + #CachedPost.delete(pid, op['author'], op['permlink']) if depth == 0: - # TODO: delete from hive_reblogs -- otherwise feed cache gets populated with deleted posts somwrimas + # TODO: delete from hive_reblogs -- otherwise feed cache gets + # populated with deleted posts sometimes FeedCache.delete(pid) else: # force parent child recount when child is deleted @@ -185,15 +241,65 @@ class Posts: a signal to update cache record. 
""" # pylint: disable=unused-argument - if not DbState.is_initial_sync(): - CachedPost.update(op['author'], op['permlink'], pid) + #if not DbState.is_initial_sync(): + # CachedPost.update(op['author'], op['permlink'], pid) + sql = """ + UPDATE hive_posts + SET + parent_id = :parent_id, + community_id = :community_id, + created_at = :created_at, + is_deleted = :is_deleted, + is_pinned = :is_pinned, + is_muted = :is_muted, + is_valid = :is_valid, + promoted = :promoted, + children = :children, + author_rep = :author_rep, + flag_weight = :flag_weight, + total_votes = :total_votes, + up_votes = :up_votes, + payout = :payout, + payout_at = :payout_at, + updated_at = :updated_at, + is_paidout = :is_paidout, + is_nsfw = :is_nsfw, + is_declined = :is_declined, + is_full_power = :is_full_power, + is_hidden = :is_hidden, + is_grayed = :is_grayed, + rshares = :rshares, + sc_trend = :sc_trend, + sc_hot = :sc_hot, + parent_author_id = (SELECT id FROM hive_accounts WHERE name = :parent_author), + parent_permlink_id = (SELECT id FROM hive_permlink_data WHERE permlink = :parent_permlink), + curator_payout_value = :curator_payout_value, + root_author_id = (SELECT id FROM hive_accounts WHERE name = :root_author), + root_permlink_id = (SELECT id FROM hive_permlink_data WHERE permlink = :root_permlink), + max_accepted_payout = :max_accepted_payout, + percent_steem_dollars = :percent_steem_dollars, + allow_replies = :allow_replies, + allow_votes = :allow_votes, + allow_curation_rewards = :allow_curation_rewards, + beneficiaries = :beneficiaries, + url = :url, + root_title = :root_title + WHERE id = :id""" + post = cls._build_post(op, date, pid) + DB.query(sql, **post) @classmethod def _get_parent_by_child_id(cls, child_id): """Get parent's `id`, `author`, `permlink` by child id.""" - sql = """SELECT id, author, permlink FROM hive_posts - WHERE id = (SELECT parent_id FROM hive_posts - WHERE id = :child_id)""" + sql = """ + SELECT + hp.id, ha_a.name as author, hpd_p.permlink as permlink, 
+ FROM + hive_posts hp + LEFT JOIN hive_accounts ha_a ON ha_a.id = hp.author_id + LEFT JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id + WHERE + hp.id = (SELECT parent_id FROM hive_posts WHERE id = :child_id)""" result = DB.query_row(sql, child_id=child_id) assert result, "parent of %d not found" % child_id return result @@ -233,8 +339,11 @@ class Posts: # this is a comment; inherit parent props. else: parent_id = cls.get_id(op['parent_author'], op['parent_permlink']) - sql = """SELECT depth, category, community_id, is_valid, is_muted - FROM hive_posts WHERE id = :id""" + sql = """ + SELECT depth, hcd.category as category, community_id, is_valid, is_muted + FROM hive_posts hp + LEFT JOIN hive_category_data hcd ON hcd.id = hp.category_id + WHERE hp.id = :id""" (parent_depth, category, community_id, is_valid, is_muted) = DB.query_row(sql, id=parent_id) depth = parent_depth + 1 @@ -248,7 +357,21 @@ class Posts: #is_valid = False # TODO: reserved for future blacklist status? is_muted = True - return dict(author=op['author'], permlink=op['permlink'], id=pid, - is_valid=is_valid, is_muted=is_muted, parent_id=parent_id, - depth=depth, category=category, community_id=community_id, - date=date, error=error) + return dict(parent_id=parent_id, + author=op['author'], permlink=op['permlink'], id=pid, + category=category, community_id=community_id, created_at=op['created_at'], + depth=depth, is_deleted=op['is_deleted'], is_pinned=op['is_pinned'], + is_muted=op['is_muted'], is_valid=is_valid, promoted=op['promoted'], + children=op['children'], author_rep=op['author_rep'], flag_weight=op['flag_weight'], + total_votes=op['total_votes'], up_votes=op['up_votes'], payout=op['payout'], + payout_at=op['payout_at'], updated_at=op['updated_at'], is_paidout=op['is_paidout'], + is_nsfw=op['is_nsfw'], is_declined=op['is_declined'], is_full_power=op['is_full_power'], + is_hidden=op['is_hidden'], is_grayed=op['is_grayed'], rshares=op['rshares'], + sc_trend=op['sc_trend'], 
sc_hot=op['sc_hot'], parent_author=op['parent_author'], + parent_permlink=op['parent_permlink'], + curator_payout_value=op['curator_payout_value'], root_author=op['root_author'], + root_permlink=op['root_permlink'], + max_accepted_payout=op['max_accepted_payout'], percent_steem_dollars=op['percent_steem_dollars'], + allow_replies=op['allow_replies'], allow_votes=op['allow_votes'], + allow_curation_rewards=op['allow_curation_rewards'], beneficiaries=op['beneficiaries'], url=op['url'], + root_title=op['root_title'], date=date, error=error) diff --git a/hive/server/common/payout_stats.py b/hive/server/common/payout_stats.py index 7a3e90873..f0c97594f 100644 --- a/hive/server/common/payout_stats.py +++ b/hive/server/common/payout_stats.py @@ -42,7 +42,7 @@ class PayoutStats: SUM(payout) payout, COUNT(*) posts, NULL authors - FROM hive_posts_cache + FROM hive_posts WHERE is_paidout = '0' GROUP BY community_id, author @@ -52,8 +52,8 @@ class PayoutStats: NULL author, SUM(payout) payout, COUNT(*) posts, - COUNT(DISTINCT(author)) authors - FROM hive_posts_cache + COUNT(DISTINCT(author_id)) authors + FROM hive_posts WHERE is_paidout = '0' GROUP BY community_id """ diff --git a/hive/steem/client.py b/hive/steem/client.py index f8efb7b3c..db2404945 100644 --- a/hive/steem/client.py +++ b/hive/steem/client.py @@ -44,11 +44,7 @@ class SteemClient: def get_content_batch(self, tuples): """Fetch multiple comment objects.""" - posts = self.__exec_batch('get_content', tuples) - # TODO: how are we ensuring sequential results? need to set and sort id. - for post in posts: # sanity-checking jussi responses - assert 'author' in post, "invalid post: %s" % post - return posts + raise NotImplementedError("get_content is not implemented in hived") def get_block(self, num, strict=True): """Fetches a single block. 
diff --git a/scripts/update_hivemind_db.sql b/scripts/update_hivemind_db.sql index c24d83f5c..9191e0ab6 100644 --- a/scripts/update_hivemind_db.sql +++ b/scripts/update_hivemind_db.sql @@ -292,6 +292,7 @@ ALTER TABLE hive_reblogs ADD CONSTRAINT hive_reblogs_fk2 FOREIGN KEY (post_id) R ALTER TABLE hive_posts ADD CONSTRAINT hive_posts_fk1 FOREIGN KEY (author_id) REFERENCES hive_accounts(id); ALTER TABLE hive_posts ADD CONSTRAINT hive_posts_fk3 FOREIGN KEY (parent_id) REFERENCES hive_posts(id); +ALTER TABLE hive_posts ADD CONSTRAINT hive_posts_fk4 FOREIGN KEY (permlink_id) REFERENCES hive_permlink_data(id); ALTER TABLE hive_posts ADD CONSTRAINT hive_posts_ux1 UNIQUE (author_id, permlink_id); -- Make indexes in hive_posts -- GitLab