Skip to content
Snippets Groups Projects

Prerequisuites to Reputation api support

Merged Bartek Wrona requested to merge reputation_api_support into develop
8 files
+ 136
73
Compare changes
  • Side-by-side
  • Inline
Files
8
+ 46
23
@@ -2,6 +2,7 @@
@@ -2,6 +2,7 @@
import logging
import logging
import json
import json
 
import concurrent
from hive.db.adapter import Db
from hive.db.adapter import Db
@@ -13,7 +14,7 @@ from hive.indexer.follow import Follow
@@ -13,7 +14,7 @@ from hive.indexer.follow import Follow
from hive.indexer.votes import Votes
from hive.indexer.votes import Votes
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from hive.indexer.tags import Tags
from hive.indexer.reputations import Reputations
from time import perf_counter
from time import perf_counter
@@ -22,6 +23,8 @@ from hive.utils.stats import FlushStatusManager as FSM
@@ -22,6 +23,8 @@ from hive.utils.stats import FlushStatusManager as FSM
from hive.utils.trends import update_hot_and_tranding_for_block_range
from hive.utils.trends import update_hot_and_tranding_for_block_range
from hive.utils.post_active import update_active_starting_from_posts_on_block
from hive.utils.post_active import update_active_starting_from_posts_on_block
 
from concurrent.futures import ThreadPoolExecutor
 
log = logging.getLogger(__name__)
log = logging.getLogger(__name__)
DB = Db.instance()
DB = Db.instance()
@@ -30,13 +33,16 @@ class Blocks:
@@ -30,13 +33,16 @@ class Blocks:
"""Processes blocks, dispatches work, manages `hive_blocks` table."""
"""Processes blocks, dispatches work, manages `hive_blocks` table."""
blocks_to_flush = []
blocks_to_flush = []
_head_block_date = None
_head_block_date = None
_reputations = None
_current_block_date = None
_current_block_date = None
def __init__(cls):
_concurrent_flush = [
log.info("Creating a reputations processor")
('PostDataCache', PostDataCache.flush, PostDataCache),
log.info("Built blocks object: {}".format(cls))
('Reputations', Reputations.flush, Reputations),
 
('Votes', Votes.flush, Votes),
 
('Tags', Tags.flush, Tags),
 
]
 
def __init__(cls):
head_date = cls.head_date()
head_date = cls.head_date()
if(head_date == ''):
if(head_date == ''):
cls._head_block_date = None
cls._head_block_date = None
@@ -45,11 +51,13 @@ class Blocks:
@@ -45,11 +51,13 @@ class Blocks:
cls._head_block_date = head_date
cls._head_block_date = head_date
cls._current_block_date = head_date
cls._current_block_date = head_date
@classmethod
@classmethod
def set_reputations_processor(cls, reputations_processor):
def setup_db_access(self, sharedDbAdapter):
cls._reputations = reputations_processor
PostDataCache.setup_db_access(sharedDbAdapter)
assert cls._reputations is not None, "Reputation object is None"
Reputations.setup_db_access(sharedDbAdapter)
log.info("Built reputations object: {}".format(cls._reputations))
Votes.setup_db_access(sharedDbAdapter)
 
Tags.setup_db_access(sharedDbAdapter)
 
Follow.setup_db_access(sharedDbAdapter)
@classmethod
@classmethod
def head_num(cls):
def head_num(cls):
@@ -74,7 +82,7 @@ class Blocks:
@@ -74,7 +82,7 @@ class Blocks:
Tags.flush()
Tags.flush()
Votes.flush()
Votes.flush()
Posts.flush()
Posts.flush()
cls._reputations.flush()
Reputations.flush()
block_num = int(block['block_id'][:8], base=16)
block_num = int(block['block_id'][:8], base=16)
cls.on_live_blocks_processed( block_num, block_num )
cls.on_live_blocks_processed( block_num, block_num )
time_end = perf_counter()
time_end = perf_counter()
@@ -86,8 +94,6 @@ class Blocks:
@@ -86,8 +94,6 @@ class Blocks:
"""Batch-process blocks; wrapped in a transaction."""
"""Batch-process blocks; wrapped in a transaction."""
time_start = OPSM.start()
time_start = OPSM.start()
log.info("Blocks object: {}".format(cls))
DB.query("START TRANSACTION")
DB.query("START TRANSACTION")
last_num = 0
last_num = 0
@@ -113,13 +119,34 @@ class Blocks:
@@ -113,13 +119,34 @@ class Blocks:
log.info("#############################################################################")
log.info("#############################################################################")
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
flush_time = register_time(flush_time, "Posts", Posts.flush())
flush_time = register_time(flush_time, "Tags", Tags.flush())
flush_time = register_time(flush_time, "Votes", Votes.flush())
# flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
 
# flush_time = register_time(flush_time, "Tags", Tags.flush())
 
# flush_time = register_time(flush_time, "Votes", Votes.flush())
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", folllow_items)
flush_time = register_time(flush_time, "Follow", folllow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush())
# flush_time = register_time(flush_time, "Reputations", cls._flush_reputations())
flush_time = register_time(flush_time, "Reputations", cls._flush_reputations())
 
completedThreads = 0;
 
 
pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
 
flush_futures = {pool.submit(f): (description, c) for (description, f, c) in cls._concurrent_flush}
 
for future in concurrent.futures.as_completed(flush_futures):
 
(description, c) = flush_futures[future]
 
completedThreads = completedThreads + 1
 
try:
 
n = future.result()
 
assert not c.tx_active()
 
 
if n > 0:
 
log.info('%r flush generated %d records' % (description, n))
 
except Exception as exc:
 
log.error('%r generated an exception: %s' % (description, exc))
 
raise exc
 
pool.shutdown()
 
 
assert completedThreads == len(cls._concurrent_flush)
if (not is_initial_sync) and (first_block > -1):
if (not is_initial_sync) and (first_block > -1):
cls.on_live_blocks_processed( first_block, last_num )
cls.on_live_blocks_processed( first_block, last_num )
@@ -162,7 +189,7 @@ class Blocks:
@@ -162,7 +189,7 @@ class Blocks:
elif op_type == 'effective_comment_vote_operation':
elif op_type == 'effective_comment_vote_operation':
key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink'])
key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink'])
cls._reputations.process_vote(block_num, op_value)
Reputations.process_vote(block_num, op_value)
vote_ops[ key_vote ] = op_value
vote_ops[ key_vote ] = op_value
@@ -339,10 +366,6 @@ class Blocks:
@@ -339,10 +366,6 @@ class Blocks:
'date': block['timestamp']})
'date': block['timestamp']})
return num
return num
@classmethod
def _flush_reputations(cls):
return cls._reputations.flush()
@classmethod
@classmethod
def _flush_blocks(cls):
def _flush_blocks(cls):
query = """
query = """
Loading