Commit cd57e251 authored by Bartek Wrona

Preliminary implementation of MT flush.

parent 13afb0fd
1 merge request: !121 Prerequisites to Reputation api support
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -2,6 +2,7 @@
 import logging
 import json
+import concurrent
 from hive.db.adapter import Db
@@ -13,7 +14,7 @@ from hive.indexer.follow import Follow
 from hive.indexer.votes import Votes
 from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
+from hive.indexer.reputations import Reputations
 from time import perf_counter
@@ -22,6 +23,8 @@ from hive.utils.stats import FlushStatusManager as FSM
 from hive.utils.trends import update_hot_and_tranding_for_block_range
 from hive.utils.post_active import update_active_starting_from_posts_on_block
+from concurrent.futures import ThreadPoolExecutor
 
 log = logging.getLogger(__name__)
 DB = Db.instance()
@@ -30,13 +33,16 @@ class Blocks:
     """Processes blocks, dispatches work, manages `hive_blocks` table."""
     blocks_to_flush = []
     _head_block_date = None
-    _reputations = None
     _current_block_date = None
 
+    _concurrent_flush = [
+        ('PostDataCache', PostDataCache.flush, PostDataCache),
+        ('Reputations', Reputations.flush, Reputations),
+        ('Votes', Votes.flush, Votes),
+        ('Tags', Tags.flush, Tags),
+    ]
+
     def __init__(cls):
-        log.info("Creating a reputations processor")
-        log.info("Built blocks object: {}".format(cls))
         head_date = cls.head_date()
         if(head_date == ''):
             cls._head_block_date = None
@@ -45,11 +51,13 @@
             cls._head_block_date = head_date
             cls._current_block_date = head_date
 
     @classmethod
-    def set_reputations_processor(cls, reputations_processor):
-        cls._reputations = reputations_processor
-        assert cls._reputations is not None, "Reputation object is None"
-        log.info("Built reputations object: {}".format(cls._reputations))
+    def setup_db_access(self, sharedDbAdapter):
+        PostDataCache.setup_db_access(sharedDbAdapter)
+        Reputations.setup_db_access(sharedDbAdapter)
+        Votes.setup_db_access(sharedDbAdapter)
+        Tags.setup_db_access(sharedDbAdapter)
+        Follow.setup_db_access(sharedDbAdapter)
 
     @classmethod
     def head_num(cls):
@@ -74,7 +82,7 @@
         Tags.flush()
         Votes.flush()
         Posts.flush()
-        cls._reputations.flush()
+        Reputations.flush()
         block_num = int(block['block_id'][:8], base=16)
         cls.on_live_blocks_processed( block_num, block_num )
         time_end = perf_counter()
@@ -86,8 +94,6 @@
         """Batch-process blocks; wrapped in a transaction."""
         time_start = OPSM.start()
 
-        log.info("Blocks object: {}".format(cls))
-
         DB.query("START TRANSACTION")
         last_num = 0
@@ -113,13 +119,34 @@
         log.info("#############################################################################")
         flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
-        flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
-        flush_time = register_time(flush_time, "Tags", Tags.flush())
-        flush_time = register_time(flush_time, "Votes", Votes.flush())
+        flush_time = register_time(flush_time, "Posts", Posts.flush())
+
+        # flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
+        # flush_time = register_time(flush_time, "Tags", Tags.flush())
+        # flush_time = register_time(flush_time, "Votes", Votes.flush())
         folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
         flush_time = register_time(flush_time, "Follow", folllow_items)
-        flush_time = register_time(flush_time, "Posts", Posts.flush())
-        flush_time = register_time(flush_time, "Reputations", cls._flush_reputations())
+        # flush_time = register_time(flush_time, "Reputations", cls._flush_reputations())
+
+        completedThreads = 0;
+
+        pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
+        flush_futures = {pool.submit(f): (description, c) for (description, f, c) in cls._concurrent_flush}
+        for future in concurrent.futures.as_completed(flush_futures):
+            (description, c) = flush_futures[future]
+            completedThreads = completedThreads + 1
+            try:
+                n = future.result()
+                assert not c.tx_active()
+                if n > 0:
+                    log.info('%r flush generated %d records' % (description, n))
+            except Exception as exc:
+                log.error('%r generated an exception: %s' % (description, exc))
+                raise exc
+
+        pool.shutdown()
+        assert completedThreads == len(cls._concurrent_flush)
 
         if (not is_initial_sync) and (first_block > -1):
             cls.on_live_blocks_processed( first_block, last_num )
@@ -162,7 +189,7 @@
         elif op_type == 'effective_comment_vote_operation':
             key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink'])
-            cls._reputations.process_vote(block_num, op_value)
+            Reputations.process_vote(block_num, op_value)
             vote_ops[ key_vote ] = op_value
@@ -339,10 +366,6 @@
             'date': block['timestamp']})
         return num
 
-    @classmethod
-    def _flush_reputations(cls):
-        return cls._reputations.flush()
-
     @classmethod
     def _flush_blocks(cls):
         query = """
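For reference, here is the fan-out/fan-in pattern that the new `_concurrent_flush` table drives in `process_multi`, condensed into a self-contained sketch; `DummyCache` is an illustrative stand-in (not part of the commit) for classes such as PostDataCache:

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

class DummyCache:                        # illustrative stand-in, not in the commit
    _items = ['a', 'b']
    @classmethod
    def tx_active(cls):
        return False                     # a finished flush must leave no open transaction
    @classmethod
    def flush(cls):
        n = len(cls._items)
        cls._items.clear()
        return n                         # number of records flushed

concurrent_flush = [('DummyCache', DummyCache.flush, DummyCache)]

pool = ThreadPoolExecutor(max_workers=len(concurrent_flush))
futures = {pool.submit(f): (label, c) for (label, f, c) in concurrent_flush}
for future in concurrent.futures.as_completed(futures):
    label, c = futures[future]
    n = future.result()                  # re-raises any exception from the worker thread
    assert not c.tx_active(), "flush left a transaction open"
    if n > 0:
        print('%s flush generated %d records' % (label, n))
pool.shutdown()

Each flusher runs on its own thread and must commit before returning, which is why the loop asserts that `tx_active()` is false after every future resolves.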
--- /dev/null
+++ b/hive/indexer/db_adapter_holder.py
@@ ... @@
+import logging
+
+log = logging.getLogger(__name__)
+
+class DbAdapterHolder(object):
+    db = None
+
+    _inside_tx = False
+
+    @classmethod
+    def setup_db_access(self, sharedDb):
+        self.db = sharedDb.clone()
+
+    @classmethod
+    def tx_active(self):
+        return self._inside_tx
+
+    @classmethod
+    def beginTx(self):
+        self.db.query("START TRANSACTION")
+        self._inside_tx = True
+
+    @classmethod
+    def commitTx(self):
+        self.db.query("COMMIT")
+        self._inside_tx = False
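A minimal sketch of how a flush class is meant to use this holder, assuming only what the diff shows (a Db object exposing `clone()` and `query()`); `ExampleCache` and `shared_db` are hypothetical names:

class ExampleCache(DbAdapterHolder):     # hypothetical subclass
    _rows = []

    @classmethod
    def flush(cls):
        if not cls._rows:
            return 0
        cls.beginTx()                    # START TRANSACTION on this class's cloned connection
        for sql in cls._rows:
            cls.db.query(sql)
        n = len(cls._rows)
        cls._rows.clear()
        cls.commitTx()                   # COMMIT, so tx_active() is False once flush() returns
        return n

# ExampleCache.setup_db_access(shared_db)   # each class clones its own connection

Cloning one connection per class is what allows the flushes in blocks.py to run on separate threads without sharing a transaction.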
--- a/hive/indexer/follow.py
+++ b/hive/indexer/follow.py
@@ -9,9 +9,9 @@
 from hive.db.db_state import DbState
 from hive.indexer.accounts import Accounts
 from hive.indexer.notify import Notify
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 log = logging.getLogger(__name__)
-DB = Db.instance()
 FOLLOWERS = 'followers'
 FOLLOWING = 'following'
@@ -65,7 +65,7 @@ def _flip_dict(dict_to_flip):
             flipped[value] = [key]
     return flipped
 
-class Follow:
+class Follow(DbAdapterHolder):
     """Handles processing of incoming follow ups and flushing to db."""
     follow_items_to_flush = dict()
@@ -99,7 +99,7 @@
         else:
             old_state = cls._get_follow_db_state(op['flr'], op['flg'])
             # insert or update state
-            DB.query(FOLLOW_ITEM_INSERT_QUERY, **op)
+            cls.db.query(FOLLOW_ITEM_INSERT_QUERY, **op)
             if new_state == 1:
                 Follow.follow(op['flr'], op['flg'])
                 if old_state is None:
@@ -142,7 +142,7 @@
         sql = """SELECT state FROM hive_follows
                   WHERE follower = :follower
                     AND following = :following"""
-        return DB.query_one(sql, follower=follower, following=following)
+        return cls.db.query_one(sql, follower=follower, following=following)
 
     # -- stat tracking --
@@ -206,7 +206,7 @@
                 else:
                     query = sql_prefix + ",".join(values)
                     query += sql_postfix
-                    DB.query(query)
+                    cls.db.query(query)
                     values.clear()
                 values.append("({}, {}, '{}', {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
                                                                   follow_item['at'], follow_item['state'],
@@ -217,7 +217,7 @@
             if len(values) > 0:
                 query = sql_prefix + ",".join(values)
                 query += sql_postfix
-                DB.query(query)
+                cls.db.query(query)
 
         cls.follow_items_to_flush.clear()
@@ -239,7 +239,7 @@
             return 0
 
         start = perf()
-        DB.batch_queries(sqls, trx=trx)
+        cls.db.batch_queries(sqls, trx=trx)
         if trx:
             log.info("[SYNC] flushed %d follow deltas in %ds",
                      updated, perf() - start)
@@ -263,7 +263,7 @@
           following = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND follower = hive_accounts.id)
         WHERE id IN :ids
        """
-        DB.query(sql, ids=tuple(ids))
+        cls.db.query(sql, ids=tuple(ids))
 
     @classmethod
     def force_recount(cls):
@@ -281,7 +281,7 @@
             LEFT JOIN hive_follows hf ON id = hf.following AND state = 1
             GROUP BY id);
        """
-        DB.query(sql)
+        cls.db.query(sql)
 
         log.info("[SYNC] update follower counts")
         sql = """
@@ -291,4 +291,4 @@
             UPDATE hive_accounts SET following = num FROM following_counts
             WHERE id = account_id AND following != num;
        """
-        DB.query(sql)
+        cls.db.query(sql)
--- a/hive/indexer/post_data_cache.py
+++ b/hive/indexer/post_data_cache.py
@@ ... @@
 import logging
-import logging
 from hive.utils.normalize import escape_characters
 from hive.db.adapter import Db
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
-DB = Db.instance()
 
-class PostDataCache(object):
+class PostDataCache(DbAdapterHolder):
     """ Procides cache for DB operations on post data table in order to speed up initial sync """
     _data = {}
 
     @classmethod
     def is_cached(cls, pid):
         """ Check if data is cached """
@@ -28,7 +31,7 @@
         sql = """
               SELECT hpd.body FROM hive_post_data hpd WHERE hpd.id = :post_id;
              """
-        row = DB.query_row(sql, post_id = pid)
+        row = cls.db.query_row(sql, post_id = pid)
         post_data = dict(row)
         return post_data['body']
@@ -36,6 +39,7 @@
     def flush(cls, print_query = False):
         """ Flush data from cache to db """
         if cls._data:
+            cls.beginTx()
             sql = """
                 INSERT INTO
                     hive_post_data (id, title, preview, img_url, body, json)
@@ -66,7 +70,9 @@
             if(print_query):
                 log.info("Executing query:\n{}".format(sql))
 
-            DB.query(sql)
+            cls.db.query(sql)
+            cls.commitTx()
+
         n = len(cls._data.keys())
         cls._data.clear()
         return n
""" Reputation update support """ """ Reputation update support """
import logging import logging
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
CACHED_ITEMS_LIMIT = 200 CACHED_ITEMS_LIMIT = 200
class Reputations: class Reputations(DbAdapterHolder):
_queries = [] _queries = []
_db = None
def __init__(self, database):
log.info("Cloning database...")
self._db = database.clone()
assert self._db is not None, "Database not cloned"
log.info("Database object at: {}".format(self._db))
@classmethod
def process_vote(self, block_num, effective_vote_op): def process_vote(self, block_num, effective_vote_op):
self._queries.append("\nSELECT process_reputation_data({}, '{}', '{}', '{}', {});".format(block_num, effective_vote_op['author'], effective_vote_op['permlink'], self._queries.append("\nSELECT process_reputation_data({}, '{}', '{}', '{}', {});".format(block_num, effective_vote_op['author'], effective_vote_op['permlink'],
effective_vote_op['voter'], effective_vote_op['rshares'])) effective_vote_op['voter'], effective_vote_op['rshares']))
@classmethod
def flush(self): def flush(self):
if not self._queries:
return 0
self.beginTx()
query = "" query = ""
i = 0 i = 0
items = 0 items = 0
...@@ -28,16 +30,18 @@ class Reputations: ...@@ -28,16 +30,18 @@ class Reputations:
i = i + 1 i = i + 1
items = items + 1 items = items + 1
if items >= CACHED_ITEMS_LIMIT: if items >= CACHED_ITEMS_LIMIT:
self._db.query_no_return(query) self.db.query_no_return(query)
query = "" query = ""
items = 0 items = 0
if items >= CACHED_ITEMS_LIMIT: if items >= CACHED_ITEMS_LIMIT:
self._db.query_no_return(query) self.db.query_no_return(query)
query = "" query = ""
items = 0 items = 0
n = len(self._queries) n = len(self._queries)
self._queries.clear() self._queries.clear()
self.commitTx()
return n return n
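For clarity, the batching logic in `flush()` condensed into a standalone function; `send` stands in for `self.db.query_no_return`. Note that this sketch ships the final partial batch with `items > 0`, whereas the diff above keeps the pre-existing `items >= CACHED_ITEMS_LIMIT` guard after the loop, a condition that can never be true at that point:

CACHED_ITEMS_LIMIT = 200

def flush_batched(queries, send):
    """Concatenate statements and execute them in batches of CACHED_ITEMS_LIMIT."""
    query, items = "", 0
    for q in queries:
        query += q
        items += 1
        if items >= CACHED_ITEMS_LIMIT:
            send(query)                  # ship a full batch of statements
            query, items = "", 0
    if items > 0:                        # ship whatever remains
        send(query)
    return len(queries)

# flush_batched(["\nSELECT 1;"] * 450, print) executes three batches: 200, 200, 50.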
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -91,7 +91,7 @@ def _vops_provider(node, queue, lbound, ubound, chunk_size):
     except Exception:
         log.exception("Exception caught during fetching vops...")
 
-def _block_consumer(node, blocksProcessor, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
+def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
     from hive.utils.stats import minmax
     is_debug = log.isEnabledFor(10)
     num = 0
@@ -128,7 +128,7 @@ def _block_consumer(node, blocksProcessor, blocksQueue, vopsQueue, is_initial_sy
             timer.batch_start()
             block_start = perf()
-            blocksProcessor.process_multi(blocks, preparedVops, node, is_initial_sync)
+            Blocks.process_multi(blocks, preparedVops, node, is_initial_sync)
             block_end = perf()
             timer.batch_lap()
@@ -184,7 +184,7 @@ def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
         try:
             pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
             pool.submit(_vops_provider, self._steem, vopsQueue, lbound, ubound, chunk_size)
-            blockConsumerFuture = pool.submit(_block_consumer, self._steem, self._blocksProcessor, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
+            blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
 
             blockConsumerFuture.result()
             if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
@@ -211,15 +211,13 @@ class Sync:
         log.info("Using hived url: `%s'", self._conf.get('steemd_url'))
         self._steem = conf.steem()
-        self._blocksProcessor = None
 
     def run(self):
         """Initialize state; setup/recovery checks; sync and runloop."""
         # ensure db schema up to date, check app status
         DbState.initialize()
-        Blocks.set_reputations_processor(Reputations(self._db))
-        self._blocksProcessor = Blocks()
+        Blocks.setup_db_access(self._db)
 
         # prefetch id->name and id->rank memory maps
         Accounts.load_ids()
@@ -235,16 +233,16 @@
         Community.recalc_pending_payouts()
 
         if DbState.is_initial_sync():
-            last_imported_block = self._blocksProcessor.head_num()
+            last_imported_block = Blocks.head_num()
             # resume initial sync
             self.initial()
             if not CONTINUE_PROCESSING:
                 return
-            current_imported_block = self._blocksProcessor.head_num()
+            current_imported_block = Blocks.head_num()
             DbState.finish_initial_sync(current_imported_block, last_imported_block)
         else:
             # recover from fork
-            self._blocksProcessor.verify_head(self._steem)
+            Blocks.verify_head(self._steem)
 
         self._update_chain_state()
@@ -319,7 +317,7 @@
         """Fast sync strategy: read/process blocks in batches."""
         steemd = self._steem
-        lbound = self._blocksProcessor.head_num() + 1
+        lbound = Blocks.head_num() + 1
         ubound = self._conf.get('test_max_block') or steemd.last_irreversible()
 
         count = ubound - lbound
@@ -344,7 +342,7 @@
             timer.batch_lap()
 
             # process blocks
-            self._blocksProcessor.process_multi(blocks, preparedVops, steemd, is_initial_sync)
+            Blocks.process_multi(blocks, preparedVops, steemd, is_initial_sync)
             timer.batch_finish(len(blocks))
             _prefix = ("[SYNC] Got block %d @ %s" % (
@@ -366,13 +364,13 @@
         assert self._blocksProcessor
         steemd = self._steem
-        hive_head = self._blocksProcessor.head_num()
+        hive_head = Blocks.head_num()
 
         for block in steemd.stream_blocks(hive_head + 1, trail_blocks, max_gap):
             start_time = perf()
 
             self._db.query("START TRANSACTION")
-            num = self._blocksProcessor.process(block, {}, steemd)
+            num = Blocks.process(block, {}, steemd)
             follows = Follow.flush(trx=False)
             accts = Accounts.flush(steemd, trx=False, spread=8)
             self._db.query("COMMIT")
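The provider/consumer functions touched above form a queue-based pipeline: providers fill bounded queues, a single consumer drains them, and only the consumer's future is awaited. A condensed, runnable sketch of that shape, with all names illustrative:

import queue
from concurrent.futures import ThreadPoolExecutor

def block_provider(q):                   # stands in for _block_provider
    for block in range(3):
        q.put(block)
    q.put(None)                          # sentinel: no more blocks

def block_consumer(q):                   # stands in for _block_consumer
    processed = 0
    while True:
        block = q.get()
        if block is None:
            break
        processed += 1                   # here sync.py calls Blocks.process_multi(...)
    return processed

blocks_queue = queue.Queue(maxsize=10)
with ThreadPoolExecutor(max_workers=2) as pool:
    pool.submit(block_provider, blocks_queue)
    consumer_future = pool.submit(block_consumer, blocks_queue)
    print(consumer_future.result())      # -> 3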
--- a/hive/indexer/tags.py
+++ b/hive/indexer/tags.py
@@ ... @@
 import logging
 from hive.db.adapter import Db
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
-DB = Db.instance()
 
 from hive.utils.normalize import escape_characters
 
-class Tags(object):
+class Tags(DbAdapterHolder):
     """ Tags cache """
     _tags = []
@@ -17,8 +17,9 @@
     @classmethod
     def flush(cls):
         """ Flush tags to table """
 
         if cls._tags:
+            cls.beginTx()
             limit = 1000
 
             sql = """
@@ -32,11 +33,11 @@
                 values.append("({})".format(escape_characters(tag[1])))
                 if len(values) >= limit:
                     tag_query = str(sql)
-                    DB.query(tag_query.format(','.join(values)))
+                    cls.db.query(tag_query.format(','.join(values)))
                     values.clear()
             if len(values) > 0:
                 tag_query = str(sql)
-                DB.query(tag_query.format(','.join(values)))
+                cls.db.query(tag_query.format(','.join(values)))
                 values.clear()
 
             sql = """
@@ -62,13 +63,13 @@
                 values.append("({}, {})".format(tag[0], escape_characters(tag[1])))
                 if len(values) >= limit:
                     tag_query = str(sql)
-                    DB.query(tag_query.format(','.join(values)))
+                    cls.db.query(tag_query.format(','.join(values)))
                     values.clear()
             if len(values) > 0:
                 tag_query = str(sql)
-                DB.query(tag_query.format(','.join(values)))
+                cls.db.query(tag_query.format(','.join(values)))
                 values.clear()
+            cls.commitTx()
 
         n = len(cls._tags)
         cls._tags.clear()
         return n
--- a/hive/indexer/votes.py
+++ b/hive/indexer/votes.py
@@ -4,11 +4,11 @@ import logging
 from hive.db.db_state import DbState
 from hive.db.adapter import Db
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
-DB = Db.instance()
 
-class Votes:
+class Votes(DbAdapterHolder):
     """ Class for managing posts votes """
     _votes_data = {}
@@ -62,9 +62,12 @@
     @classmethod
     def flush(cls):
         """ Flush vote data from cache to database """
+
         cls.inside_flush = True
         n = 0
         if cls._votes_data:
+            cls.beginTx()
+
             sql = """
                 INSERT INTO hive_votes
                 (post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update, block_num, is_effective)
@@ -105,16 +108,19 @@
             if len(values) >= values_limit:
                 values_str = ','.join(values)
                 actual_query = sql.format(values_str)
-                DB.query(actual_query)
+                cls.db.query(actual_query)
                 values.clear()
 
             if len(values) > 0:
                 values_str = ','.join(values)
                 actual_query = sql.format(values_str)
-                DB.query(actual_query)
+                cls.db.query(actual_query)
                 values.clear()
 
             n = len(cls._votes_data)
             cls._votes_data.clear()
+            cls.commitTx()
+
         cls.inside_flush = False
 
         return n