From dc255c151f147c2462cea6e4af3916c13fcb6d90 Mon Sep 17 00:00:00 2001 From: Dariusz Kedzierski <dkedzierski@syncad.com> Date: Tue, 23 Jun 2020 21:13:03 +0200 Subject: [PATCH] Removed track_tx: it was causing missing COMMIT for START TRANSACTION error. Improved keyboard interruption. Added missing APIs: with placeholders but it will be filled with proper methods. Other minor fixes. --- hive/cli.py | 1 - hive/indexer/blocks.py | 42 +++++--------------- hive/indexer/posts.py | 2 +- hive/indexer/sync.py | 63 ++++++++++++++++-------------- hive/indexer/votes.py | 8 ++-- hive/server/follow_api/__init__.py | 0 hive/server/follow_api/methods.py | 16 ++++++++ hive/server/serve.py | 12 ++++++ hive/server/tags_api/methods.py | 33 +++++++++++++++- 9 files changed, 105 insertions(+), 72 deletions(-) create mode 100644 hive/server/follow_api/__init__.py create mode 100644 hive/server/follow_api/methods.py diff --git a/hive/cli.py b/hive/cli.py index 8d6179609..9df933413 100755 --- a/hive/cli.py +++ b/hive/cli.py @@ -22,7 +22,6 @@ def run(): else: launch_mode(mode, conf) - def launch_mode(mode, conf): """Launch a routine as indicated by `mode`.""" if mode == 'server': diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index 223e27f79..d71bc0957 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -22,18 +22,14 @@ class Blocks: blocks_to_flush = [] ops_stats = {} - operations_in_tx = 0 - opened_tx = False - - OPERATIONS_IN_TX_TRESHOLD = 500000 - @staticmethod def merge_ops_stats(od1, od2): - for (k, v) in od2.items(): - if k in od1: - od1[k] += v - else: - od1[k] = v + if od2 is not None: + for k, v in od2.items(): + if k in od1: + od1[k] += v + else: + od1[k] = v return od1 @@ -65,7 +61,7 @@ class Blocks: def process_multi(cls, blocks, vops, hived, is_initial_sync=False): """Batch-process blocks; wrapped in a transaction.""" time_start = perf_counter() -# DB.query("START TRANSACTION") + DB.query("START TRANSACTION") last_num = 0 try: @@ -83,7 +79,7 @@ class Blocks: cls._flush_blocks() Follow.flush(trx=False) -# DB.query("COMMIT") + DB.query("COMMIT") time_end = perf_counter() log.info("[PROCESS MULTI] %i blocks in %fs", len(blocks), time_end - time_start) @@ -119,19 +115,6 @@ class Blocks: return (vote_ops, comment_payout_ops) - @classmethod - def _track_tx(cls, opCount = 1): - if(cls.opened_tx == False): - DB.query("START TRANSACTION") - cls.operations_in_tx = 0 - cls.opened_tx = True - - cls.operations_in_tx += opCount - - if(cls.operations_in_tx >= cls.OPERATIONS_IN_TX_TRESHOLD): - DB.query("COMMIT") - DB.query("START TRANSACTION") - cls.operations_in_tx = 0 @classmethod def _process(cls, block, virtual_operations, hived, is_initial_sync=False): @@ -159,7 +142,6 @@ class Blocks: elif op_type == 'create_claimed_account_operation': account_names.add(op['new_account_name']) - cls._track_tx() Accounts.register(account_names, date) # register any new names # second scan will process all other ops @@ -176,8 +158,6 @@ class Blocks: else: cls.ops_stats[op_type] = 1 - cls._track_tx() - # account metadata updates if op_type == 'account_update_operation': if not is_initial_sync: @@ -211,12 +191,10 @@ class Blocks: if json_ops: custom_ops_stats = CustomOp.process_ops(json_ops, num, date) cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, custom_ops_stats) - cls._track_tx(len(json_ops)) if update_comment_pending_payouts: payout_ops_stat = Posts.update_comment_pending_payouts(hived, update_comment_pending_payouts) cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, payout_ops_stat) - cls._track_tx(len(update_comment_pending_payouts)) # virtual ops comment_payout_ops = {} @@ -228,7 +206,7 @@ class Blocks: (vote_ops, comment_payout_ops) = virtual_operations[num] if num in virtual_operations else empty_vops else: vops = hived.get_virtual_operations(num) - (vote_ops, comment_payout_ops) = prepare_vops(vops, date) + (vote_ops, comment_payout_ops) = Blocks.prepare_vops(vops, date) for v in vote_ops: Votes.vote_op(v, date) @@ -238,12 +216,10 @@ class Blocks: else: cls.ops_stats[op_type] = 1 - cls._track_tx(len(vote_ops)) if comment_payout_ops: comment_payout_stats = Posts.comment_payout_op(comment_payout_ops, date) cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, comment_payout_stats) - cls._track_tx(len(comment_payout_ops)) return num diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py index a271e6b03..d060fe44c 100644 --- a/hive/indexer/posts.py +++ b/hive/indexer/posts.py @@ -269,7 +269,7 @@ class Posts: max_accepted_payout = :max_accepted_payout, percent_hbd = :percent_hbd, allow_votes = :allow_votes, - allow_curation_rewards = :allow_curation_rewards + allow_curation_rewards = :allow_curation_rewards, beneficiaries = :beneficiaries WHERE hp.author_id = (SELECT id FROM hive_accounts WHERE name = :author) AND diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index 93a28b20b..8677fb67b 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -30,7 +30,7 @@ from hive.server.common.mutes import Mutes log = logging.getLogger(__name__) -CONTINUE_PROCESSING = 1 +CONTINUE_PROCESSING = True def print_ops_stats(prefix, ops_stats): log.info("############################################################################") @@ -52,20 +52,16 @@ def prepare_vops(vops_by_block): def _block_provider(node, queue, lbound, ubound, chunk_size): try: + num = 0 count = ubound - lbound log.info("[SYNC] start block %d, +%d to sync", lbound, count) timer = Timer(count, entity='block', laps=['rps', 'wps']) - num = 0 while CONTINUE_PROCESSING and lbound < ubound: to = min(lbound + chunk_size, ubound) - # log.info("Querying a node for blocks from range: [%d, %d]", lbound, to) timer.batch_start() blocks = node.get_blocks_range(lbound, to) lbound = to timer.batch_lap() - # if(queue.full()): - # log.info("Block queue is full - Enqueuing retrieved block-data for block range: [%d, %d] will block... Block queue size: %d", lbound, to, queue.qsize()) - queue.put(blocks) num = num + 1 return num @@ -77,24 +73,20 @@ def _block_provider(node, queue, lbound, ubound, chunk_size): def _vops_provider(node, queue, lbound, ubound, chunk_size): try: + num = 0 count = ubound - lbound log.info("[SYNC] start vops %d, +%d to sync", lbound, count) timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps']) - num = 0 - while CONTINUE_PROCESSING and lbound < ubound: + while CONTINUE_PROCESSING and lbound < ubound: to = min(lbound + chunk_size, ubound) - # log.info("Querying a node for vops from block range: [%d, %d]", lbound, to) timer.batch_start() vops = node.enum_virtual_ops(lbound, to) preparedVops = prepare_vops(vops) lbound = to timer.batch_lap() - # if(queue.full()): - # log.info("Vops queue is full - Enqueuing retrieved vops for block range: [%d, %d] will block... Vops queue size: %d", lbound, to, queue.qsize()) queue.put(preparedVops) num = num + 1 - return num except KeyboardInterrupt: log.info("Caught SIGINT") @@ -103,28 +95,28 @@ def _vops_provider(node, queue, lbound, ubound, chunk_size): log.exception("Exception caught during fetching vops...") def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size): + num = 0 try: count = ubound - lbound - num = 0 timer = Timer(count, entity='block', laps=['rps', 'wps']) - total_ops_stats = {} - time_start = perf() - - while CONTINUE_PROCESSING and lbound < ubound: - if(blocksQueue.empty()): + while lbound < ubound: + if blocksQueue.empty() and CONTINUE_PROCESSING: log.info("Awaiting any block to process...") + + blocks = [] + if not blocksQueue.empty() or CONTINUE_PROCESSING: + blocks = blocksQueue.get() - blocks = blocksQueue.get() - - if(vopsQueue.empty()): + if vopsQueue.empty() and CONTINUE_PROCESSING: log.info("Awaiting any vops to process...") - - preparedVops = vopsQueue.get() + + preparedVops = [] + if not vopsQueue.empty() or CONTINUE_PROCESSING: + preparedVops = vopsQueue.get() to = min(lbound + chunk_size, ubound) - # log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to) timer.batch_start() @@ -140,7 +132,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun prefix = ("[SYNC] Got block %d @ %s" % ( to - 1, blocks[-1]['timestamp'])) log.info(timer.batch_status(prefix)) - log.info("Time elapsed: %fs", time_current - time_start) + log.info("[SYNC] Time elapsed: %fs", time_current - time_start) total_ops_stats = Blocks.merge_ops_stats(total_ops_stats, ops_stats) @@ -154,16 +146,21 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun num = num + 1 + if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty(): + break + print_ops_stats("All operations present in the processed blocks:", total_ops_stats) return num except KeyboardInterrupt: log.info("Caught SIGINT") + except Exception: log.exception("Exception caught during processing blocks...") def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): blocksQueue = queue.Queue(maxsize=10) vopsQueue = queue.Queue(maxsize=10) + global CONTINUE_PROCESSING with ThreadPoolExecutor(max_workers = 4) as pool: try: @@ -172,12 +169,14 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size): blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size) blockConsumerFuture.result() - # pool.shutdown() + if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty(): + pool.shutdown(False) except KeyboardInterrupt: - global CONTINUE_PROCESSING - CONTINUE_PROCESSING = 0 - pool.shutdown(False) - + log.info(""" ********************************************************** + CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES... + ********************************************************** + """) + CONTINUE_PROCESSING = False blocksQueue.join() vopsQueue.join() @@ -214,6 +213,8 @@ class Sync: if DbState.is_initial_sync(): # resume initial sync self.initial() + if not CONTINUE_PROCESSING: + return DbState.finish_initial_sync() else: @@ -257,6 +258,8 @@ class Sync: log.info("[INIT] *** Initial fast sync ***") self.from_checkpoints() self.from_steemd(is_initial_sync=True) + if not CONTINUE_PROCESSING: + return log.info("[INIT] *** Initial cache build ***") # CachedPost.recover_missing_posts(self._steem) diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py index 2248e8e75..218c9f354 100644 --- a/hive/indexer/votes.py +++ b/hive/indexer/votes.py @@ -77,9 +77,8 @@ class Votes: @classmethod def flush(cls): - log.info("Inside Votes.flush") - cls.inside_flush = True """ Flush vote data from cache to database """ + cls.inside_flush = True if cls._votes_data: sql = """ INSERT INTO hive_votes @@ -128,7 +127,6 @@ class Votes: actual_query = sql.format(values_str) DB.query(actual_query) values.clear() - - cls._votes_data.clear() + + cls._votes_data.clear() cls.inside_flush = False - log.info("Exiting Votes.flush") diff --git a/hive/server/follow_api/__init__.py b/hive/server/follow_api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/hive/server/follow_api/methods.py b/hive/server/follow_api/methods.py new file mode 100644 index 000000000..58e38914a --- /dev/null +++ b/hive/server/follow_api/methods.py @@ -0,0 +1,16 @@ +from hive.server.common.helpers import return_error_info + +@return_error_info +async def get_feed_entries(context, account: str, start_entry_id: int, limit: int): + """ Returns a list of entries in an account’s feed. """ + raise NotImplementedError() + +@return_error_info +async def get_feed(context, account: str, start_entry_id: int, limit: int): + """ Returns a list of items in an account’s feed. """ + raise NotImplementedError() + +@return_error_info +async def get_blog_authors(context, blog_account: str): + """ Returns a list of authors that have had their content reblogged on a given blog account. """ + raise NotImplementedError() diff --git a/hive/server/serve.py b/hive/server/serve.py index 9593dce1c..6b7ecad02 100644 --- a/hive/server/serve.py +++ b/hive/server/serve.py @@ -26,6 +26,9 @@ from hive.server.hive_api import community as hive_api_community from hive.server.hive_api import notify as hive_api_notify from hive.server.hive_api import stats as hive_api_stats +from hive.server.follow_api import methods as follow_api +from hive.server.tags_api import methods as tags_api + from hive.server.database_api import methods as database_api from hive.server.db import Db @@ -93,6 +96,9 @@ def build_methods(): 'follow_api.get_blog': condenser_api.get_blog, 'follow_api.get_blog_entries': condenser_api.get_blog_entries, 'follow_api.get_reblogged_by': condenser_api.get_reblogged_by, + 'follow_api.get_feed_entries': follow_api.get_feed_entries, + 'follow_api.get_feed': follow_api.get_feed, + 'follow_api.get_blog_authors': follow_api.get_blog_authors }) # tags_api aliases @@ -108,6 +114,12 @@ def build_methods(): 'tags_api.get_discussions_by_author_before_date': condenser_api.get_discussions_by_author_before_date, 'tags_api.get_post_discussions_by_payout': condenser_api.get_post_discussions_by_payout, 'tags_api.get_comment_discussions_by_payout': condenser_api.get_comment_discussions_by_payout, + 'tags_api.get_active_votes' : tags_api.get_active_votes, + 'tags_api.get_tags_used_by_author' : tags_api.get_tags_used_by_author, + 'tags_api.get_discussions_by_active' : tags_api.get_discussions_by_active, + 'tags_api.get_discussions_by_cashout' : tags_api.get_discussions_by_cashout, + 'tags_api.get_discussions_by_votes' : tags_api.get_discussions_by_votes, + 'tags_api.get_discussions_by_children' : tags_api.get_discussions_by_children }) # legacy `call` style adapter diff --git a/hive/server/tags_api/methods.py b/hive/server/tags_api/methods.py index 5024bcdcb..db846c5f7 100644 --- a/hive/server/tags_api/methods.py +++ b/hive/server/tags_api/methods.py @@ -1,5 +1,4 @@ from hive.server.common.helpers import ( - ApiError, return_error_info, valid_account, valid_permlink) @@ -9,4 +8,34 @@ async def get_active_votes(context, author: str, permlink: str): """ Returns all votes for the given post. """ valid_account(author) valid_permlink(permlink) - # TODO: body \ No newline at end of file + # TODO: body + raise NotImplementedError() + +@return_error_info +async def get_tags_used_by_author(context, author: str): + """ Returns a list of tags used by an author. """ + raise NotImplementedError() + +@return_error_info +async def get_discussions_by_active(context, tag: str, limit: int, filter_tags: list, + select_authors: list, select_tags: list, truncate_body: int): + """ Returns a list of discussions based on active. """ + raise NotImplementedError() + +@return_error_info +async def get_discussions_by_cashout(context, tag: str, limit: int, filter_tags: list, + select_authors: list, select_tags: list, truncate_body: int): + """ Returns a list of discussions by cashout. """ + raise NotImplementedError() + +@return_error_info +async def get_discussions_by_votes(context, tag: str, limit: int, filter_tags: list, + select_authors: list, select_tags: list, truncate_body: int): + """ Returns a list of discussions by votes. """ + raise NotImplementedError() + +@return_error_info +async def get_discussions_by_children(context, tag: str, limit: int, filter_tags: list, + select_authors: list, select_tags: list, truncate_body: int): + """ Returns a list of discussions by children. """ + raise NotImplementedError() -- GitLab