Skip to content
Snippets Groups Projects
Commit dc255c15 authored by Dariusz Kędzierski's avatar Dariusz Kędzierski
Browse files

Removed track_tx: it was causing missing COMMIT for START TRANSACTION error....

Removed track_tx: it was causing missing COMMIT for START TRANSACTION error. Improved keyboard interruption. Added missing APIs: with placeholders but it will be filled with proper methods. Other minor fixes.
parent 9e297b0f
No related branches found
No related tags found
5 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!16Dk issue 3 concurrent block query rebase,!15Dk issue 3 concurrent block query
......@@ -22,7 +22,6 @@ def run():
else:
launch_mode(mode, conf)
def launch_mode(mode, conf):
"""Launch a routine as indicated by `mode`."""
if mode == 'server':
......
......@@ -22,18 +22,14 @@ class Blocks:
blocks_to_flush = []
ops_stats = {}
operations_in_tx = 0
opened_tx = False
OPERATIONS_IN_TX_TRESHOLD = 500000
@staticmethod
def merge_ops_stats(od1, od2):
for (k, v) in od2.items():
if k in od1:
od1[k] += v
else:
od1[k] = v
if od2 is not None:
for k, v in od2.items():
if k in od1:
od1[k] += v
else:
od1[k] = v
return od1
......@@ -65,7 +61,7 @@ class Blocks:
def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
"""Batch-process blocks; wrapped in a transaction."""
time_start = perf_counter()
# DB.query("START TRANSACTION")
DB.query("START TRANSACTION")
last_num = 0
try:
......@@ -83,7 +79,7 @@ class Blocks:
cls._flush_blocks()
Follow.flush(trx=False)
# DB.query("COMMIT")
DB.query("COMMIT")
time_end = perf_counter()
log.info("[PROCESS MULTI] %i blocks in %fs", len(blocks), time_end - time_start)
......@@ -119,19 +115,6 @@ class Blocks:
return (vote_ops, comment_payout_ops)
@classmethod
def _track_tx(cls, opCount = 1):
if(cls.opened_tx == False):
DB.query("START TRANSACTION")
cls.operations_in_tx = 0
cls.opened_tx = True
cls.operations_in_tx += opCount
if(cls.operations_in_tx >= cls.OPERATIONS_IN_TX_TRESHOLD):
DB.query("COMMIT")
DB.query("START TRANSACTION")
cls.operations_in_tx = 0
@classmethod
def _process(cls, block, virtual_operations, hived, is_initial_sync=False):
......@@ -159,7 +142,6 @@ class Blocks:
elif op_type == 'create_claimed_account_operation':
account_names.add(op['new_account_name'])
cls._track_tx()
Accounts.register(account_names, date) # register any new names
# second scan will process all other ops
......@@ -176,8 +158,6 @@ class Blocks:
else:
cls.ops_stats[op_type] = 1
cls._track_tx()
# account metadata updates
if op_type == 'account_update_operation':
if not is_initial_sync:
......@@ -211,12 +191,10 @@ class Blocks:
if json_ops:
custom_ops_stats = CustomOp.process_ops(json_ops, num, date)
cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, custom_ops_stats)
cls._track_tx(len(json_ops))
if update_comment_pending_payouts:
payout_ops_stat = Posts.update_comment_pending_payouts(hived, update_comment_pending_payouts)
cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, payout_ops_stat)
cls._track_tx(len(update_comment_pending_payouts))
# virtual ops
comment_payout_ops = {}
......@@ -228,7 +206,7 @@ class Blocks:
(vote_ops, comment_payout_ops) = virtual_operations[num] if num in virtual_operations else empty_vops
else:
vops = hived.get_virtual_operations(num)
(vote_ops, comment_payout_ops) = prepare_vops(vops, date)
(vote_ops, comment_payout_ops) = Blocks.prepare_vops(vops, date)
for v in vote_ops:
Votes.vote_op(v, date)
......@@ -238,12 +216,10 @@ class Blocks:
else:
cls.ops_stats[op_type] = 1
cls._track_tx(len(vote_ops))
if comment_payout_ops:
comment_payout_stats = Posts.comment_payout_op(comment_payout_ops, date)
cls.ops_stats = Blocks.merge_ops_stats(cls.ops_stats, comment_payout_stats)
cls._track_tx(len(comment_payout_ops))
return num
......
......@@ -269,7 +269,7 @@ class Posts:
max_accepted_payout = :max_accepted_payout,
percent_hbd = :percent_hbd,
allow_votes = :allow_votes,
allow_curation_rewards = :allow_curation_rewards
allow_curation_rewards = :allow_curation_rewards,
beneficiaries = :beneficiaries
WHERE
hp.author_id = (SELECT id FROM hive_accounts WHERE name = :author) AND
......
......@@ -30,7 +30,7 @@ from hive.server.common.mutes import Mutes
log = logging.getLogger(__name__)
CONTINUE_PROCESSING = 1
CONTINUE_PROCESSING = True
def print_ops_stats(prefix, ops_stats):
log.info("############################################################################")
......@@ -52,20 +52,16 @@ def prepare_vops(vops_by_block):
def _block_provider(node, queue, lbound, ubound, chunk_size):
try:
num = 0
count = ubound - lbound
log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps'])
num = 0
while CONTINUE_PROCESSING and lbound < ubound:
to = min(lbound + chunk_size, ubound)
# log.info("Querying a node for blocks from range: [%d, %d]", lbound, to)
timer.batch_start()
blocks = node.get_blocks_range(lbound, to)
lbound = to
timer.batch_lap()
# if(queue.full()):
# log.info("Block queue is full - Enqueuing retrieved block-data for block range: [%d, %d] will block... Block queue size: %d", lbound, to, queue.qsize())
queue.put(blocks)
num = num + 1
return num
......@@ -77,24 +73,20 @@ def _block_provider(node, queue, lbound, ubound, chunk_size):
def _vops_provider(node, queue, lbound, ubound, chunk_size):
try:
num = 0
count = ubound - lbound
log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
num = 0
while CONTINUE_PROCESSING and lbound < ubound:
while CONTINUE_PROCESSING and lbound < ubound:
to = min(lbound + chunk_size, ubound)
# log.info("Querying a node for vops from block range: [%d, %d]", lbound, to)
timer.batch_start()
vops = node.enum_virtual_ops(lbound, to)
preparedVops = prepare_vops(vops)
lbound = to
timer.batch_lap()
# if(queue.full()):
# log.info("Vops queue is full - Enqueuing retrieved vops for block range: [%d, %d] will block... Vops queue size: %d", lbound, to, queue.qsize())
queue.put(preparedVops)
num = num + 1
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
......@@ -103,28 +95,28 @@ def _vops_provider(node, queue, lbound, ubound, chunk_size):
log.exception("Exception caught during fetching vops...")
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
num = 0
try:
count = ubound - lbound
num = 0
timer = Timer(count, entity='block', laps=['rps', 'wps'])
total_ops_stats = {}
time_start = perf()
while CONTINUE_PROCESSING and lbound < ubound:
if(blocksQueue.empty()):
while lbound < ubound:
if blocksQueue.empty() and CONTINUE_PROCESSING:
log.info("Awaiting any block to process...")
blocks = []
if not blocksQueue.empty() or CONTINUE_PROCESSING:
blocks = blocksQueue.get()
blocks = blocksQueue.get()
if(vopsQueue.empty()):
if vopsQueue.empty() and CONTINUE_PROCESSING:
log.info("Awaiting any vops to process...")
preparedVops = vopsQueue.get()
preparedVops = []
if not vopsQueue.empty() or CONTINUE_PROCESSING:
preparedVops = vopsQueue.get()
to = min(lbound + chunk_size, ubound)
# log.info("Processing retrieved blocks and vops from range: [%d, %d].", lbound, to)
timer.batch_start()
......@@ -140,7 +132,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
prefix = ("[SYNC] Got block %d @ %s" % (
to - 1, blocks[-1]['timestamp']))
log.info(timer.batch_status(prefix))
log.info("Time elapsed: %fs", time_current - time_start)
log.info("[SYNC] Time elapsed: %fs", time_current - time_start)
total_ops_stats = Blocks.merge_ops_stats(total_ops_stats, ops_stats)
......@@ -154,16 +146,21 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = num + 1
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
break
print_ops_stats("All operations present in the processed blocks:", total_ops_stats)
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during processing blocks...")
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10)
vopsQueue = queue.Queue(maxsize=10)
global CONTINUE_PROCESSING
with ThreadPoolExecutor(max_workers = 4) as pool:
try:
......@@ -172,12 +169,14 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
# pool.shutdown()
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
pool.shutdown(False)
except KeyboardInterrupt:
global CONTINUE_PROCESSING
CONTINUE_PROCESSING = 0
pool.shutdown(False)
log.info(""" **********************************************************
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
**********************************************************
""")
CONTINUE_PROCESSING = False
blocksQueue.join()
vopsQueue.join()
......@@ -214,6 +213,8 @@ class Sync:
if DbState.is_initial_sync():
# resume initial sync
self.initial()
if not CONTINUE_PROCESSING:
return
DbState.finish_initial_sync()
else:
......@@ -257,6 +258,8 @@ class Sync:
log.info("[INIT] *** Initial fast sync ***")
self.from_checkpoints()
self.from_steemd(is_initial_sync=True)
if not CONTINUE_PROCESSING:
return
log.info("[INIT] *** Initial cache build ***")
# CachedPost.recover_missing_posts(self._steem)
......
......@@ -77,9 +77,8 @@ class Votes:
@classmethod
def flush(cls):
log.info("Inside Votes.flush")
cls.inside_flush = True
""" Flush vote data from cache to database """
cls.inside_flush = True
if cls._votes_data:
sql = """
INSERT INTO hive_votes
......@@ -128,7 +127,6 @@ class Votes:
actual_query = sql.format(values_str)
DB.query(actual_query)
values.clear()
cls._votes_data.clear()
cls._votes_data.clear()
cls.inside_flush = False
log.info("Exiting Votes.flush")
from hive.server.common.helpers import return_error_info
@return_error_info
async def get_feed_entries(context, account: str, start_entry_id: int, limit: int):
""" Returns a list of entries in an account’s feed. """
raise NotImplementedError()
@return_error_info
async def get_feed(context, account: str, start_entry_id: int, limit: int):
""" Returns a list of items in an account’s feed. """
raise NotImplementedError()
@return_error_info
async def get_blog_authors(context, blog_account: str):
""" Returns a list of authors that have had their content reblogged on a given blog account. """
raise NotImplementedError()
......@@ -26,6 +26,9 @@ from hive.server.hive_api import community as hive_api_community
from hive.server.hive_api import notify as hive_api_notify
from hive.server.hive_api import stats as hive_api_stats
from hive.server.follow_api import methods as follow_api
from hive.server.tags_api import methods as tags_api
from hive.server.database_api import methods as database_api
from hive.server.db import Db
......@@ -93,6 +96,9 @@ def build_methods():
'follow_api.get_blog': condenser_api.get_blog,
'follow_api.get_blog_entries': condenser_api.get_blog_entries,
'follow_api.get_reblogged_by': condenser_api.get_reblogged_by,
'follow_api.get_feed_entries': follow_api.get_feed_entries,
'follow_api.get_feed': follow_api.get_feed,
'follow_api.get_blog_authors': follow_api.get_blog_authors
})
# tags_api aliases
......@@ -108,6 +114,12 @@ def build_methods():
'tags_api.get_discussions_by_author_before_date': condenser_api.get_discussions_by_author_before_date,
'tags_api.get_post_discussions_by_payout': condenser_api.get_post_discussions_by_payout,
'tags_api.get_comment_discussions_by_payout': condenser_api.get_comment_discussions_by_payout,
'tags_api.get_active_votes' : tags_api.get_active_votes,
'tags_api.get_tags_used_by_author' : tags_api.get_tags_used_by_author,
'tags_api.get_discussions_by_active' : tags_api.get_discussions_by_active,
'tags_api.get_discussions_by_cashout' : tags_api.get_discussions_by_cashout,
'tags_api.get_discussions_by_votes' : tags_api.get_discussions_by_votes,
'tags_api.get_discussions_by_children' : tags_api.get_discussions_by_children
})
# legacy `call` style adapter
......
from hive.server.common.helpers import (
ApiError,
return_error_info,
valid_account,
valid_permlink)
......@@ -9,4 +8,34 @@ async def get_active_votes(context, author: str, permlink: str):
""" Returns all votes for the given post. """
valid_account(author)
valid_permlink(permlink)
# TODO: body
\ No newline at end of file
# TODO: body
raise NotImplementedError()
@return_error_info
async def get_tags_used_by_author(context, author: str):
""" Returns a list of tags used by an author. """
raise NotImplementedError()
@return_error_info
async def get_discussions_by_active(context, tag: str, limit: int, filter_tags: list,
select_authors: list, select_tags: list, truncate_body: int):
""" Returns a list of discussions based on active. """
raise NotImplementedError()
@return_error_info
async def get_discussions_by_cashout(context, tag: str, limit: int, filter_tags: list,
select_authors: list, select_tags: list, truncate_body: int):
""" Returns a list of discussions by cashout. """
raise NotImplementedError()
@return_error_info
async def get_discussions_by_votes(context, tag: str, limit: int, filter_tags: list,
select_authors: list, select_tags: list, truncate_body: int):
""" Returns a list of discussions by votes. """
raise NotImplementedError()
@return_error_info
async def get_discussions_by_children(context, tag: str, limit: int, filter_tags: list,
select_authors: list, select_tags: list, truncate_body: int):
""" Returns a list of discussions by children. """
raise NotImplementedError()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment