diff --git a/hive/conf.py b/hive/conf.py index 60bd4e682d0d4c32e827d999cfa0cf39521ef4c3..e5dd9a5bb6d13e2fac12185d11884fbc81e1f187 100644 --- a/hive/conf.py +++ b/hive/conf.py @@ -50,7 +50,6 @@ class Conf(): add('--log-level', env_var='LOG_LEVEL', default='INFO') add('--test-disable-sync', type=strtobool, env_var='TEST_DISABLE_SYNC', help='(debug) skip sync and sweep; jump to block streaming', default=False) add('--test-max-block', type=int, env_var='TEST_MAX_BLOCK', help='(debug) only sync to given block, for running sync test', default=None) - add('--exit-after-sync', help='exit when sync is completed', action='store_true') add('--test-profile', type=strtobool, env_var='TEST_PROFILE', help='(debug) profile execution', default=False) add('--log-virtual-op-calls', type=strtobool, env_var='LOG_VIRTUAL_OP_CALLS', help='(debug) log virtual op calls and responses', default=False) add('--mock-block-data-path', type=str, nargs='+', env_var='MOCK_BLOCK_DATA_PATH', help='(debug/testing) load additional data from block data file') diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index 89050cace91b6fc89207cbbb41f5199c9cb91ca1..420f550fcd87140fb7d937a552a8b30540ec1553 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -4,7 +4,6 @@ import logging import concurrent from time import perf_counter from concurrent.futures import ThreadPoolExecutor - from hive.db.adapter import Db from hive.indexer.accounts import Accounts diff --git a/hive/indexer/mock_block_provider.py b/hive/indexer/mock_block_provider.py index 7b8adced9e8df98834a7ee871dd071f7744a4326..32a38b84bf71b07122faf6cd7253a12fbe02f23a 100644 --- a/hive/indexer/mock_block_provider.py +++ b/hive/indexer/mock_block_provider.py @@ -1,27 +1,25 @@ """ Data provider for test operations """ +import datetime +import dateutil.parser import logging -import os -from hive.indexer.mock_data_provider import MockDataProvider + +from hive.indexer.mock_data_provider import MockDataProvider, MockDataProviderException log = logging.getLogger(__name__) class MockBlockProvider(MockDataProvider): """ Data provider for test ops """ - @classmethod - def load_block_data(cls, data_path): - if os.path.isdir(data_path): - log.warning("Loading mock block data from directory: {}".format(data_path)) - cls.add_block_data_from_directory(data_path) - else: - log.warning("Loading mock block data from file: {}".format(data_path)) - cls.add_block_data_from_file(data_path) + + min_block = 0 + max_block = 0 + + last_real_block_num = 1 + last_real_block_time = dateutil.parser.isoparse("2016-03-24T16:05:00") @classmethod - def add_block_data_from_directory(cls, dir_name): - for name in os.listdir(dir_name): - file_path = os.path.join(dir_name, name) - if os.path.isfile(file_path) and file_path.endswith(".json"): - cls.add_block_data_from_file(file_path) + def set_last_real_block_num_date(cls, block_num, block_date): + cls.last_real_block_num = int(block_num) + cls.last_real_block_time = dateutil.parser.isoparse(block_date) @classmethod def add_block_data_from_file(cls, file_name): @@ -33,26 +31,67 @@ class MockBlockProvider(MockDataProvider): cls.add_block_data(block_num, block_content) @classmethod - def add_block_data(cls, block_num, block_content): + def add_block_data(cls, _block_num, block_content): + block_num = int(_block_num) + + if block_num > cls.max_block: + cls.max_block = block_num + if block_num < cls.min_block: + cls.min_block = block_num + + #log.info("Loading mock data for block {} with timestamp: {}".format(block_num, block_content['timestamp'])) + if block_num in cls.block_data: - assert 'transactions' in cls.block_data[str(block_num)] + assert 'transactions' in cls.block_data[block_num] assert 'transactions' in block_content - cls.block_data[str(block_num)]['transactions'] = cls.block_data[str(block_num)]['transactions'] + block_content['transactions'] + cls.block_data[block_num]['transactions'] = cls.block_data[block_num]['transactions'] + block_content['transactions'] else: - cls.block_data[str(block_num)] = block_content + cls.block_data[block_num] = dict(block_content) @classmethod - def get_block_data(cls, block_num, pop=False): - if pop: - return cls.block_data.pop(str(block_num), None) - return cls.block_data.get(str(block_num), None) + def get_block_data(cls, block_num, make_on_empty=False): + data = cls.block_data.get(block_num, None) + + #if data is not None: + #log.info("Block {} has timestamp: {}".format(block_num, data['timestamp'])) + + if make_on_empty and data is None: + data = cls.make_empty_block(block_num) + if data is None: + raise MockDataProviderException("No more blocks to serve") + return data @classmethod def get_max_block_number(cls): - block_numbers = [int(block) for block in cls.block_data] - block_numbers.append(0) - return max(block_numbers) + return cls.max_block + + @classmethod + def make_block_id(cls, block_num): + return "{:08x}00000000000000000000000000000000".format(block_num) + + @classmethod + def make_block_timestamp(cls, block_num): + block_delta = block_num - cls.last_real_block_num + time_delta = datetime.timedelta(days=0, seconds=block_delta*3, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0) + ret_time = cls.last_real_block_time + time_delta + return ret_time.replace(microsecond=0).isoformat() @classmethod - def get_blocks_greater_than(cls, block_num): - return sorted([int(block) for block in cls.block_data if int(block) >= block_num]) + def make_empty_block(cls, block_num, witness="initminer"): + fake_block = dict({ + "previous": cls.make_block_id(block_num - 1), + "timestamp": cls.make_block_timestamp(block_num), + "witness": witness, + "transaction_merkle_root": "0000000000000000000000000000000000000000", + "extensions": [], + "witness_signature": "", + "transactions": [], + "block_id": cls.make_block_id(block_num), + "signing_key": "", + "transaction_ids": [] + }) + # supply enough blocks to fill block queue with empty blocks only + # throw exception if there is no more data to serve + if cls.min_block < block_num < cls.max_block + 3: + return fake_block + return None diff --git a/hive/indexer/mock_data_provider.py b/hive/indexer/mock_data_provider.py index 4745003adf0d0c4b28eb4eaec8d9bd8ec6b07e9f..ddf768f946867d2dde70110b37453c31e61f6b7d 100644 --- a/hive/indexer/mock_data_provider.py +++ b/hive/indexer/mock_data_provider.py @@ -1,6 +1,14 @@ """ Data provider for test operations """ +import os +import logging + from json import dumps +log = logging.getLogger(__name__) + +class MockDataProviderException(Exception): + pass + class MockDataProvider(): """ Data provider for test operations """ block_data = {} @@ -8,3 +16,25 @@ class MockDataProvider(): @classmethod def print_data(cls): print(dumps(cls.block_data, indent=4, sort_keys=True)) + + @classmethod + def add_block_data_from_directory(cls, dir_name): + from fnmatch import fnmatch + pattern = "*.json" + for path, _, files in os.walk(dir_name): + for name in files: + if fnmatch(name, pattern): + cls.add_block_data_from_file(os.path.join(path, name)) + + @classmethod + def add_block_data_from_file(cls, file_name): + raise NotImplementedError("add_block_data_from_file is not implemented") + + @classmethod + def load_block_data(cls, data_path): + if os.path.isdir(data_path): + log.warning("Loading mock ops data from directory: {}".format(data_path)) + cls.add_block_data_from_directory(data_path) + else: + log.warning("Loading mock ops data from file: {}".format(data_path)) + cls.add_block_data_from_file(data_path) diff --git a/hive/indexer/mock_vops_provider.py b/hive/indexer/mock_vops_provider.py index 6716e4643de44068cb3d4ff0c46fedfe1d41a820..133f62985752ebbe30f9436226dadf5fb2eb3cfb 100644 --- a/hive/indexer/mock_vops_provider.py +++ b/hive/indexer/mock_vops_provider.py @@ -1,27 +1,12 @@ """ Data provider for test vops """ -import logging -import os from hive.indexer.mock_data_provider import MockDataProvider -log = logging.getLogger(__name__) - class MockVopsProvider(MockDataProvider): """ Data provider for test vops """ - @classmethod - def load_block_data(cls, data_path): - if os.path.isdir(data_path): - log.warning("Loading mock virtual ops data from directory: {}".format(data_path)) - cls.add_block_data_from_directory(data_path) - else: - log.warning("Loading mock virtual ops data from file: {}".format(data_path)) - cls.add_block_data_from_file(data_path) - - @classmethod - def add_block_data_from_directory(cls, dir_name): - for name in os.listdir(dir_name): - file_path = os.path.join(dir_name, name) - if os.path.isfile(file_path) and file_path.endswith(".json"): - cls.add_block_data_from_file(file_path) + block_data = { + 'ops' : {}, + 'ops_by_block' : {} + } @classmethod def add_block_data_from_file(cls, file_name): @@ -34,37 +19,57 @@ class MockVopsProvider(MockDataProvider): @classmethod def add_block_data(cls, data): if 'ops' in data: - if 'ops' in cls.block_data: - cls.block_data['ops'].extend(data['ops']) - else: - cls.block_data['ops'] = data['ops'] + for op in data['ops']: + if 'ops' in cls.block_data and op['block'] in cls.block_data['ops']: + cls.block_data['ops'][op['block']].append(op) + else: + cls.block_data['ops'][op['block']] = [op] if 'ops_by_block' in data: - if 'ops_by_block' not in cls.block_data: - cls.block_data['ops_by_block'] = [] - - for ops in data['ops_by_block']: - for obb_ops in cls.block_data['ops_by_block']: - if ops['block'] == obb_ops['block']: - obb_ops['ops'].extend(ops['ops']) + for ops in data['ops_by_block']: + if 'ops_by_block' in cls.block_data and ops['block'] in cls.block_data['ops_by_block']: + cls.block_data['ops_by_block'][ops['block']].extend(ops['ops']) + else: + cls.block_data['ops_by_block'][ops['block']] = ops @classmethod def get_block_data(cls, block_num): ret = {} - if 'ops' in cls.block_data: - for ops in cls.block_data['ops']: - if ops['block'] == block_num: - ret['timestamp'] = ops['timestamp'] - if 'ops' in ret: - ret['ops'].append(ops) - else: - ret['ops'] = [ops] - if 'ops_by_block' in cls.block_data: - for ops in cls.block_data['ops_by_block']: - if ops['block'] == block_num: - ret['timestamp'] = ops['timestamp'] - if 'ops_by_block' in ret: - ret['ops_by_block'].extend(ops['ops']) - else: - ret['ops_by_block'] = ops['ops'] + if 'ops' in cls.block_data and block_num in cls.block_data['ops']: + data = cls.block_data['ops'][block_num] + if data: + ret['timestamp'] = data[0]['timestamp'] + if 'ops' in ret: + ret['ops'].extend([op['op'] for op in data]) + else: + ret['ops'] = [op['op'] for op in data] + + if 'ops_by_block' in cls.block_data and block_num in cls.block_data['ops_by_block']: + data = cls.block_data['ops_by_block'][block_num] + if data: + ret['timestamp'] = data['timestamp'] + if 'ops_by_block' in ret: + ret['ops_by_block'].extend([ops['op'] for ops in data['ops']]) + else: + ret['ops_by_block'] = [ops['op'] for ops in data['ops']] return ret + + @classmethod + def add_mock_vops(cls, ret, from_block, end_block): + # dont do anyting when there is no block data + if not cls.block_data['ops_by_block'] and not cls.block_data['ops']: + return + for block_num in range(from_block, end_block): + mock_vops = cls.get_block_data(block_num) + if mock_vops: + if block_num in ret: + if 'ops_by_block' in mock_vops: + ret[block_num]['ops'].extend(mock_vops['ops_by_block']) + if 'ops' in mock_vops: + ret[block_num]['ops'].extend(mock_vops['ops']) + else: + if 'ops' in mock_vops: + ret[block_num] = {'timestamp':mock_vops['timestamp'], "ops" : mock_vops['ops']} + if 'ops_by_block' in mock_vops: + ret[block_num] = {'timestamp':mock_vops['timestamp'], "ops" : mock_vops['ops_by_block']} + diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py index cbc1591c9047af2f980161f336663f77731a23f9..ed3f511aecd16e225083e853d41ad6bb832498d9 100644 --- a/hive/indexer/sync.py +++ b/hive/indexer/sync.py @@ -138,6 +138,8 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun time_start = OPSM.start() rate = {} + rate = minmax(rate, 0, 1.0, 0) + def print_summary(): stop = OPSM.stop(time_start) log.info("=== TOTAL STATS ===") @@ -269,6 +271,15 @@ class Sync: MockBlockProvider.load_block_data(mock_block_data_path) MockBlockProvider.print_data() + def refresh_sparse_stats(self): + # normally it should be refreshed in various time windows + # but we need the ability to do it all at the same time + self._update_chain_state() + update_communities_posts_and_rank() + with ThreadPoolExecutor(max_workers=2) as executor: + executor.submit(PayoutStats.generate) + executor.submit(Mentions.refresh) + def run(self): """Initialize state; setup/recovery checks; sync and runloop.""" from hive.version import VERSION, GIT_REVISION @@ -325,15 +336,23 @@ class Sync: self._update_chain_state() + trail_blocks = self._conf.get('trail_blocks') + assert trail_blocks >= 0 + assert trail_blocks <= 100 + + import sys + max_block_limit = sys.maxsize + do_stale_block_check = True if self._conf.get('test_max_block'): - # debug mode: partial sync - return self.from_steemd() - if self._conf.get("exit_after_sync"): - log.info("Exiting after sync on user request...") - return + max_block_limit = self._conf.get('test_max_block') + do_stale_block_check = False + # Correct max_block_limit by trail_blocks + max_block_limit = max_block_limit - trail_blocks + log.info("max_block_limit corrected by specified trail_blocks number: %d is: %d", trail_blocks, max_block_limit) + if self._conf.get('test_disable_sync'): # debug mode: no sync, just stream - return self.listen() + return self.listen(trail_blocks, max_block_limit, do_stale_block_check) while True: # sync up to irreversible block @@ -341,13 +360,25 @@ class Sync: if not can_continue_thread(): return + head = Blocks.head_num() + if head >= max_block_limit: + self.refresh_sparse_stats() + log.info("Exiting [LIVE SYNC] because irreversible block sync reached specified block limit: %d", max_block_limit) + break; + try: # listen for new blocks - self.listen() + self.listen(trail_blocks, max_block_limit, do_stale_block_check) except MicroForkException as e: # attempt to recover by restarting stream log.error("microfork: %s", repr(e)) + head = Blocks.head_num() + if head >= max_block_limit: + self.refresh_sparse_stats() + log.info("Exiting [LIVE SYNC] because of specified block limit: %d", max_block_limit) + break; + def initial(self): """Initial sync routine.""" assert DbState.is_initial_sync(), "already synced" @@ -362,7 +393,10 @@ class Sync: steemd = self._steem lbound = Blocks.head_num() + 1 - ubound = self._conf.get('test_max_block') or steemd.last_irreversible() + ubound = steemd.last_irreversible() + + if self._conf.get('test_max_block') and self._conf.get('test_max_block') < ubound: + ubound = self._conf.get('test_max_block') count = ubound - lbound if count < 1: @@ -401,11 +435,8 @@ class Sync: PC.broadcast(BroadcastObject('sync_current_block', to, 'blocks')) - def listen(self): + def listen(self, trail_blocks, max_sync_block, do_stale_block_check): """Live (block following) mode.""" - trail_blocks = self._conf.get('trail_blocks') - assert trail_blocks >= 0 - assert trail_blocks <= 100 # debug: no max gap if disable_sync in effect max_gap = None if self._conf.get('test_disable_sync') else 100 @@ -413,11 +444,20 @@ class Sync: steemd = self._steem hive_head = Blocks.head_num() - for block in steemd.stream_blocks(hive_head + 1, trail_blocks, max_gap): - start_time = perf() + log.info("[LIVE SYNC] Entering listen with HM head: %d", hive_head) + + if hive_head >= max_sync_block: + self.refresh_sparse_stats() + log.info("[LIVE SYNC] Exiting due to block limit exceeded: synced block number: %d, max_sync_block: %d", hive_head, max_sync_block) + return + + for block in steemd.stream_blocks(hive_head + 1, trail_blocks, max_gap, do_stale_block_check): num = int(block['block_id'][:8], base=16) - log.info("[LIVE SYNC] =====> About to process block %d", num) + log.info("[LIVE SYNC] =====> About to process block %d with timestamp %s", num, block['timestamp']) + + start_time = perf() + vops = steemd.enum_virtual_ops(self._conf, num, num + 1) prepared_vops = prepare_vops(vops) @@ -431,11 +471,10 @@ class Sync: ms, ' SLOW' if ms > 1000 else '') log.info("[LIVE SYNC] Current system time: %s", datetime.now().strftime("%H:%M:%S")) - if num % 1200 == 0: #1hr + if num % 1200 == 0: #1hour log.warning("head block %d @ %s", num, block['timestamp']) log.info("[LIVE SYNC] hourly stats") - if num % 1200 == 0: #1hour log.info("[LIVE SYNC] filling payout_stats_view executed") with ThreadPoolExecutor(max_workers=2) as executor: executor.submit(PayoutStats.generate) @@ -449,6 +488,10 @@ class Sync: FSM.next_blocks() OPSM.next_blocks() + if num >= max_sync_block: + log.info("Stopping [LIVE SYNC] because of specified block limit: %d", max_sync_block) + break + # refetch dynamic_global_properties, feed price, etc def _update_chain_state(self): """Update basic state props (head block, feed price) in db.""" diff --git a/hive/server/hive_api/community.py b/hive/server/hive_api/community.py index 08c2e770593e538fed20d3db4ee5d68eb76e6fe8..1faf7793d2a4cb6e091a1b01ad41d7c6e649ee9b 100644 --- a/hive/server/hive_api/community.py +++ b/hive/server/hive_api/community.py @@ -134,7 +134,7 @@ async def list_subscribers(context, community): AND hs.community_id = hr.community_id JOIN hive_accounts ha ON hs.account_id = ha.id WHERE hs.community_id = :cid - ORDER BY hs.created_at DESC + ORDER BY hs.created_at DESC, hs.id ASC LIMIT 250""" rows = await db.query_all(sql, cid=cid) return [(r['name'], ROLES[r['role_id'] or 0], r['title'], @@ -277,7 +277,7 @@ async def _community_team(db, community_id): JOIN hive_accounts a ON r.account_id = a.id WHERE r.community_id = :community_id AND r.role_id BETWEEN 4 AND 8 - ORDER BY r.role_id DESC""" + ORDER BY r.role_id DESC, r.account_id DESC""" rows = await db.query_all(sql, community_id=community_id) return [(r['name'], ROLES[r['role_id']], r['title']) for r in rows] diff --git a/hive/steem/block/schedule.py b/hive/steem/block/schedule.py index 1958232715702c87de916f21199bfe9e8244eac7..b7226d2743c28c9aa60b2a4fc21437d529b1f243 100644 --- a/hive/steem/block/schedule.py +++ b/hive/steem/block/schedule.py @@ -15,13 +15,14 @@ class BlockSchedule: BLOCK_INTERVAL = 3 - def __init__(self, current_head_block): + def __init__(self, current_head_block, do_stale_block_check): self._start_block = current_head_block self._head_num = current_head_block self._next_expected = time() + self.BLOCK_INTERVAL / 2 self._drift = self.BLOCK_INTERVAL / 2 self._missed = 0 self._last_date = None + self._do_stale_block_check = do_stale_block_check def wait_for_block(self, num): """Sleep until the requested block is expected to be available. @@ -70,7 +71,7 @@ class BlockSchedule: It's possible a steemd node could fall behind or stop syncing; we can identify this case by comparing current time to latest received block time.""" - if num == self._head_num: + if self._do_stale_block_check and num == self._head_num: gap = int(time() - utc_timestamp(date)) assert gap > -60, 'system clock is %ds behind chain' % gap if gap > 60: diff --git a/hive/steem/block/stream.py b/hive/steem/block/stream.py index 02bfe24a40d9b28695442574a23ebead4c5f6656..38f950543e89b08b2dcca4ad6fd0b139f72afdf5 100644 --- a/hive/steem/block/stream.py +++ b/hive/steem/block/stream.py @@ -55,10 +55,10 @@ class BlockStream: """ETA-based block streamer.""" @classmethod - def stream(cls, client, start_block, min_gap=0, max_gap=100): + def stream(cls, client, start_block, min_gap=0, max_gap=100, do_stale_block_check=True): """Instantiates a BlockStream and returns a generator.""" streamer = BlockStream(client, min_gap, max_gap) - return streamer.start(start_block) + return streamer.start(start_block, do_stale_block_check) def __init__(self, client, min_gap=0, max_gap=100): assert not (min_gap < 0 or min_gap > 100) @@ -70,7 +70,7 @@ class BlockStream: """Ensures gap between curr and head is within limits (max_gap).""" return not self._max_gap or head - curr < self._max_gap - def start(self, start_block): + def start(self, start_block, do_stale_block_check): """Stream blocks starting from `start_block`. Will run forever unless `max_gap` is specified and exceeded. @@ -80,11 +80,16 @@ class BlockStream: prev = self._client.get_block(curr - 1)['block_id'] queue = BlockQueue(self._min_gap, prev) - schedule = BlockSchedule(head) + + schedule = BlockSchedule(head, do_stale_block_check) while self._gap_ok(curr, head): head = schedule.wait_for_block(curr) - block = self._client.get_block(curr, strict=False) + block = self._client.get_block(curr) + + block_num = int(block['block_id'][:8], base=16) + #log.info("stream is processing a block %d with timestamp: %s", block_num, block['timestamp']) + schedule.check_block(curr, block) if not block: diff --git a/hive/steem/client.py b/hive/steem/client.py index cb48631cc0a9e973fcf63f52b88514ce3cc74c6c..932c8acbed7327a809d14f08fda8829bc1aefd91 100644 --- a/hive/steem/client.py +++ b/hive/steem/client.py @@ -1,5 +1,6 @@ """Tight and reliable steem API client for hive indexer.""" +from hive.indexer.mock_data_provider import MockDataProviderException import logging from time import perf_counter as perf @@ -53,7 +54,7 @@ class SteemClient: """Fetch multiple comment objects.""" raise NotImplementedError("get_content is not implemented in hived") - def get_block(self, num, strict=True): + def get_block(self, num): """Fetches a single block. If the result does not contain a `block` key, it's assumed @@ -62,24 +63,25 @@ class SteemClient: result = self.__exec('get_block', {'block_num': num}) if 'block' in result: ret = result['block'] - data = MockBlockProvider.get_block_data(num, True) + + #logger.info("Found real block %d with timestamp: %s", num, ret['timestamp']) + + MockBlockProvider.set_last_real_block_num_date(num, ret['timestamp']) + data = MockBlockProvider.get_block_data(num) if data is not None: ret["transactions"].extend(data["transactions"]) ret["transaction_ids"].extend(data["transaction_ids"]) return ret - elif strict: - raise Exception('block %d not available' % num) else: # if block does not exist in hived but exist in Mock Provider # return block from block provider - data = MockBlockProvider.get_block_data(num, True) - if data is not None: - return data - return None + mocked_block = MockBlockProvider.get_block_data(num, True) + #logger.info("Found real block %d with timestamp: %s", num, mocked_block['timestamp']) + return mocked_block - def stream_blocks(self, start_from, trail_blocks=0, max_gap=100): + def stream_blocks(self, start_from, trail_blocks=0, max_gap=100, do_stale_block_check=True): """Stream blocks. Returns a generator.""" - return BlockStream.stream(self, start_from, trail_blocks, max_gap) + return BlockStream.stream(self, start_from, trail_blocks, max_gap, do_stale_block_check) def _gdgp(self): ret = self.__exec('get_dynamic_global_properties') @@ -152,20 +154,22 @@ class SteemClient: blocks = {} batch_params = [{'block_num': i} for i in block_nums] + idx = 0 for result in self.__exec_batch('get_block', batch_params): + block_num = batch_params[idx]['block_num'] if 'block' in result: block = result['block'] num = int(block['block_id'][:8], base=16) + assert block_num == num, "Reference block number and block number from result does not match" blocks[num] = block - - for block_num in block_nums: - data = MockBlockProvider.get_block_data(block_num, True) - if data is not None: - if block_num in blocks: - blocks[block_num]["transactions"].extend(data["transactions"]) - blocks[block_num]["transaction_ids"].extend(data["transaction_ids"]) - else: - blocks[block_num] = data + MockBlockProvider.set_last_real_block_num_date(num, block['timestamp']) + data = MockBlockProvider.get_block_data(num) + if data is not None: + blocks[num]["transactions"].extend(data["transactions"]) + blocks[num]["transaction_ids"].extend(data["transaction_ids"]) + else: + blocks[block_num] = MockBlockProvider.get_block_data(block_num, True) + idx += 1 return [blocks[x] for x in block_nums] @@ -182,21 +186,7 @@ class SteemClient: def enum_virtual_ops(self, conf, begin_block, end_block): """ Get virtual ops for range of blocks """ - def add_mock_vops(ret, from_block, end_block): - for block_num in range(from_block, end_block): - mock_vops = MockVopsProvider.get_block_data(block_num) - if mock_vops: - if block_num in ret: - if 'ops_by_block' in mock_vops: - ret[block_num]['ops'].extend([op['op'] for op in mock_vops['ops_by_block'] if op['block'] == block_num]) - if 'ops' in mock_vops: - ret[block_num]['ops'].extend([op['op'] for op in mock_vops['ops'] if op['block'] == block_num]) - else: - if 'ops_by_block' in mock_vops: - ret[block_num] = {'timestamp':mock_vops['timestamp'], "ops" : [op['op'] for op in mock_vops['ops_by_block'] if op['block'] == block_num]} - if 'ops' in mock_vops: - ret[block_num] = {'timestamp':mock_vops['timestamp'], "ops" : [op['op'] for op in mock_vops['ops'] if op['block'] == block_num]} - + ret = {} from_block = begin_block @@ -241,18 +231,16 @@ class SteemClient: next_block = call_result['next_block_range_begin'] if next_block == 0: - add_mock_vops(ret, from_block, end_block) - return ret + break if next_block < begin_block: logger.error( "Next next block nr {} returned by enum_virtual_ops is smaller than begin block {}.".format( next_block, begin_block ) ) - add_mock_vops(ret, from_block, end_block) - return ret + break # Move to next block only if operations from current one have been processed completely. from_block = next_block - add_mock_vops(ret, begin_block, end_block) + MockVopsProvider.add_mock_vops(ret, begin_block, end_block) return ret diff --git a/mock_data/block_data/follow_op/mock_block_data_follow.json b/mock_data/block_data/follow_op/mock_block_data_follow.json index f7ac08d2d6d644e47623bf4c0c65ef4c2fdda5fb..37bebb093cb916bf88c1bf8f34e5a22494b6ad03 100644 --- a/mock_data/block_data/follow_op/mock_block_data_follow.json +++ b/mock_data/block_data/follow_op/mock_block_data_follow.json @@ -1196,30 +1196,6 @@ "signing_key": "", "transaction_ids": [] }, - "5000015": { - "previous": "004c4b4e00000000000000000000000000000000", - "timestamp": "2016-09-15T19:48:06", - "witness": "initminer", - "transaction_merkle_root": "0000000000000000000000000000000000000000", - "extensions": [], - "witness_signature": "", - "transactions": [], - "block_id": "004c4b4f00000000000000000000000000000000", - "signing_key": "", - "transaction_ids": [] - }, - "5000016": { - "previous": "004c4b4f00000000000000000000000000000000", - "timestamp": "2016-09-15T19:48:09", - "witness": "initminer", - "transaction_merkle_root": "0000000000000000000000000000000000000000", - "extensions": [], - "witness_signature": "", - "transactions": [], - "block_id": "004c4b5000000000000000000000000000000000", - "signing_key": "", - "transaction_ids": [] - }, "5000017": { "previous": "004c4b5000000000000000000000000000000000", "timestamp": "2016-09-15T19:48:12", diff --git a/mock_data/block_data/vops_tests/mock_block_data_vops_prepare.json b/mock_data/block_data/vops_tests/mock_block_data_vops_prepare.json new file mode 100644 index 0000000000000000000000000000000000000000..8d87c746735bcfc556f504d0df6527a523eea304 --- /dev/null +++ b/mock_data/block_data/vops_tests/mock_block_data_vops_prepare.json @@ -0,0 +1,34 @@ +{ + "5000014": { + "previous": "004c4b4d00000000000000000000000000000000", + "timestamp": "2016-09-15T19:48:03", + "witness": "initminer", + "transaction_merkle_root": "0000000000000000000000000000000000000000", + "extensions": [], + "witness_signature": "", + "transactions": [ + { + "ref_block_num": 100001, + "ref_block_prefix": 1, + "expiration": "2020-03-23T12:17:00", + "operations": [ + { + "type" : "comment_operation", + "value" : { + "author": "tester1", + "body": "Tester1 post is here. This post was added by mock block provider. Enjoy!", + "json_metadata": "{\"app\":\"mock-block-provider\"}", + "parent_author": "", + "parent_permlink": "", + "permlink": "tester1-post", + "title": "tester1-post" + } + } + ] + } + ], + "block_id": "004c4b4e00000000000000000000000000000000", + "signing_key": "", + "transaction_ids": [] + } +} \ No newline at end of file diff --git a/mock_data/examples/vops_data/mock_vops_data_example_003.json b/mock_data/examples/vops_data/mock_vops_data_example_003.json new file mode 100644 index 0000000000000000000000000000000000000000..639201f9f283169e91199ca0fb7a99bd67ba118d --- /dev/null +++ b/mock_data/examples/vops_data/mock_vops_data_example_003.json @@ -0,0 +1,34 @@ +{ + "ops": [], + "ops_by_block": [ + { + "block": 10532, + "irreversible": true, + "timestamp": "2016-03-25T01:01:24", + "ops": [ + { + "trx_id": "0000000000000000000000000000000000000000", + "block": 10532, + "trx_in_block": 4294967295, + "op_in_trx": 0, + "virtual_op": 1, + "timestamp": "2016-03-25T01:01:24", + "op": { + "type": "producer_reward_operation", + "value": { + "producer": "analisa", + "vesting_shares": { + "amount": "1000000", + "precision": 6, + "nai": "@@000000037" + } + } + }, + "operation_id": "9223372036854776508" + } + ] + } + ], + "next_block_range_begin": 10977, + "next_operation_begin": 0 +} \ No newline at end of file diff --git a/mock_data/vops_data/vops_tests/mock_vops_data_vops_tests.json b/mock_data/vops_data/vops_tests/mock_vops_data_vops_tests.json new file mode 100644 index 0000000000000000000000000000000000000000..5216effde6c4138cd0ce339f3cef5352b82f681c --- /dev/null +++ b/mock_data/vops_data/vops_tests/mock_vops_data_vops_tests.json @@ -0,0 +1,68 @@ +{ + "ops": [], + "ops_by_block": [ + { + "block": 5000015, + "irreversible": true, + "timestamp": "2016-09-15T19:48:06", + "ops": [ + { + "trx_id": "0000000000000000000000000000000000000000", + "block": 5000015, + "trx_in_block": 4294967295, + "op_in_trx": 0, + "virtual_op": 1, + "timestamp": "2016-09-15T19:48:06", + "op": { + "type": "author_reward_operation", + "value": { + "author": "tester1", + "permlink": "tester1-post", + + "hbd_payout": { + "amount": "123321", + "nai": "@@000000013", + "precision": 3 + }, + "hive_payout": { + "amount": "123321", + "nai": "@@000000021", + "precision": 3 + }, + "vesting_payout": { + "amount": "123321", + "nai": "@@000000037", + "precision": 6 + } + } + }, + "operation_id": "9223372036854776508" + }, + { + "trx_id": "0000000000000000000000000000000000000000", + "block": 5000015, + "trx_in_block": 4294967295, + "op_in_trx": 0, + "virtual_op": 1, + "timestamp": "2016-09-15T19:48:06", + "op": { + "type": "comment_reward_operation", + "value": { + "author": "tester1", + "permlink": "tester1-post", + + "payout": "123321", + "author_rewards": "123321", + "total_payout_value": "123321 HBD", + "curator_payout_value": "123321 HBD", + "beneficiary_payout_value": "123321" + } + }, + "operation_id": "9223372036854776508" + } + ] + } + ], + "next_block_range_begin": 1, + "next_operation_begin": 0 +} \ No newline at end of file diff --git a/scripts/ci/hive-sync.sh b/scripts/ci/hive-sync.sh index 5d9b9f08851752935385f753bc3d276cb9d47909..eadeab5aaceac4b3f5eb6183dd169b193d334f19 100755 --- a/scripts/ci/hive-sync.sh +++ b/scripts/ci/hive-sync.sh @@ -23,12 +23,13 @@ EOF --log-mask-sensitive-data \ --pid-file hive_sync.pid \ --test-max-block=${RUNNER_HIVEMIND_SYNC_MAX_BLOCK} \ - --exit-after-sync \ --test-profile=False \ --steemd-url "${RUNNER_HIVED_URL}" \ --prometheus-port 11011 \ --database-url "${DATABASE_URL}" \ - --mock-block-data-path mock_data/block_data/follow_op/mock_block_data_follow.json mock_data/block_data/community_op/mock_block_data_community.json mock_data/block_data/reblog_op/mock_block_data_reblog.json \ + --mock-block-data-path mock_data/block_data/follow_op/mock_block_data_follow.json \ + mock_data/block_data/community_op/mock_block_data_community.json \ + mock_data/block_data/reblog_op/mock_block_data_reblog.json \ --community-start-block 4999998 \ 2>&1 | tee -i hivemind-sync.log diff --git a/scripts/ci_sync.sh b/scripts/ci_sync.sh index c4aab0ab78131ef3ef59654f65b3f40f311a3cdd..0950cdd666794d4eef103d03b833c0c5c319d77d 100755 --- a/scripts/ci_sync.sh +++ b/scripts/ci_sync.sh @@ -58,5 +58,7 @@ fi echo Attempting to starting hive sync using hived node: $HIVEMIND_SOURCE_HIVED_URL . Max sync block is: $HIVEMIND_MAX_BLOCK echo Attempting to access database $DB_URL -./$HIVE_NAME sync --pid-file hive_sync.pid --test-max-block=$HIVEMIND_MAX_BLOCK --exit-after-sync --test-profile=False --steemd-url "$HIVEMIND_SOURCE_HIVED_URL" --prometheus-port 11011 --database-url $DB_URL --mock-block-data-path mock_data/block_data/follow_op/mock_block_data_follow.json mock_data/block_data/community_op/mock_block_data_community.json mock_data/block_data/reblog_op/mock_block_data_reblog.json --community-start-block 4999998 2>&1 | tee -i hivemind-sync.log +./$HIVE_NAME sync --pid-file hive_sync.pid --test-max-block=$HIVEMIND_MAX_BLOCK --test-profile=False --steemd-url "$HIVEMIND_SOURCE_HIVED_URL" --prometheus-port 11011 \ + --database-url $DB_URL --mock-block-data-path mock_data/block_data/follow_op/mock_block_data_follow.json mock_data/block_data/community_op/mock_block_data_community.json mock_data/block_data/reblog_op/mock_block_data_reblog.json \ + --community-start-block 4999998 2>&1 | tee -i hivemind-sync.log rm hive_sync.pid diff --git a/tests/tests_api b/tests/tests_api index 9adc05954e347c3e2441e64c9e0e523d9f21361a..47a3e3580d78ebda3fd792ba25a13cb1c926201d 160000 --- a/tests/tests_api +++ b/tests/tests_api @@ -1 +1 @@ -Subproject commit 9adc05954e347c3e2441e64c9e0e523d9f21361a +Subproject commit 47a3e3580d78ebda3fd792ba25a13cb1c926201d