diff --git a/hive/db/adapter.py b/hive/db/adapter.py
index bddf312530bdcc7696c76e94cf2ecdff9050d66f..c8cfe44b9c7a4e5219b295e7d13db0bce7be37c6 100644
--- a/hive/db/adapter.py
+++ b/hive/db/adapter.py
@@ -50,6 +50,9 @@ class Db:
         self._exec = self._conn.execute
         self._exec(sqlalchemy.text("COMMIT"))
 
+    def clone(self):
+        return Db(self._url)
+
     def engine(self):
         """Lazy-loaded SQLAlchemy engine."""
         if not self._engine:
diff --git a/hive/db/db_state.py b/hive/db/db_state.py
index 836d6c2f24d09eabb0fe15b71c2c0eb681ab343c..d146f724569a4c8e4a7f3e26ca325255d74a387c 100644
--- a/hive/db/db_state.py
+++ b/hive/db/db_state.py
@@ -8,7 +8,7 @@ from time import perf_counter
 import logging
 import sqlalchemy
 
-from hive.db.schema import (setup, reset_autovac, build_metadata,
+from hive.db.schema import (setup, reset_autovac, set_logged_table_attribute, build_metadata,
                             build_metadata_community, teardown, DB_VERSION)
 from hive.db.adapter import Db
 
@@ -91,15 +91,12 @@ class DbState:
     @classmethod
     def _disableable_indexes(cls):
         to_locate = [
-            #'hive_posts_ix3', # (author, depth, id)
-            #'hive_posts_ix4', # (parent_id, id, counter_deleted=0)
-            #'hive_posts_ix5', # (community_id>0, is_pinned=1)
             'hive_follows_ix5a', # (following, state, created_at, follower)
             'hive_follows_ix5b', # (follower, state, created_at, following)
-#            'hive_posts_parent_id_idx',
+            'hive_posts_parent_id_idx',
             'hive_posts_depth_idx',
-
+            'hive_posts_created_at_idx',
             'hive_posts_root_id_id_idx',
             'hive_posts_community_id_idx',
@@ -115,20 +112,9 @@ class DbState:
             'hive_votes_voter_id_idx',
             'hive_votes_block_num_idx',
 
-            #'hive_posts_cache_ix6a', # (sc_trend, post_id, paidout=0)
-            #'hive_posts_cache_ix6b', # (post_id, sc_trend, paidout=0)
-            #'hive_posts_cache_ix7a', # (sc_hot, post_id, paidout=0)
-            #'hive_posts_cache_ix7b', # (post_id, sc_hot, paidout=0)
-            #'hive_posts_cache_ix8', # (category, payout, depth, paidout=0)
-            #'hive_posts_cache_ix9a', # (depth, payout, post_id, paidout=0)
-            #'hive_posts_cache_ix9b', # (category, depth, payout, post_id, paidout=0)
-            #'hive_posts_cache_ix10', # (post_id, payout, gray=1, payout>0)
-            #'hive_posts_cache_ix30', # API: community trend
-            #'hive_posts_cache_ix31', # API: community hot
-            #'hive_posts_cache_ix32', # API: community created
-            #'hive_posts_cache_ix33', # API: community payout
-            #'hive_posts_cache_ix34', # API: community muted
-            'hive_accounts_ix5' # (cached_at, name)
+            'hive_accounts_ix5', # (cached_at, name)
+
+            'hive_post_tags_tag_id_idx'
         ]
 
         to_return = []
@@ -162,10 +148,12 @@ class DbState:
             except sqlalchemy.exc.ProgrammingError as ex:
                 log.warning("Ignoring ex: {}".format(ex))
 
-        from hive.db.schema import drop_fk, create_fk
+        from hive.db.schema import drop_fk, set_logged_table_attribute
         log.info("Dropping FKs")
         drop_fk(cls.db())
 
+        set_logged_table_attribute(cls.db(), False)
+
         log.info("[INIT] Finish pre-initial sync hooks")
 
     @classmethod
@@ -243,8 +231,11 @@ class DbState:
         time_end = perf_counter()
         log.info("[INIT] update_all_posts_active executed in %fs", time_end - time_start)
 
+        from hive.db.schema import create_fk, set_logged_table_attribute
+        set_logged_table_attribute(cls.db(), True)
+
         log.info("Recreating FKs")
-        from hive.db.schema import create_fk
         create_fk(cls.db())
 
     @staticmethod
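The pre- and post-sync hooks above bracket the initial sync with set_logged_table_attribute(db, False) and set_logged_table_attribute(db, True) (defined in hive/db/schema.py below). Switching a table to UNLOGGED makes PostgreSQL skip write-ahead logging for it, which speeds up bulk loads at the cost of crash-safety and replication; SET LOGGED restores durability afterwards. A minimal sketch of the round-trip, assuming a configured adapter (table name chosen for illustration):

    # Illustrative sketch only -- this is what set_logged_table_attribute()
    # executes once per table in its logged_config list.
    from hive.db.adapter import Db

    db = Db.instance()
    db.query_no_return("ALTER TABLE hive_posts SET UNLOGGED")  # fast, not crash-safe
    # ... massive initial sync runs here ...
    db.query_no_return("ALTER TABLE hive_posts SET LOGGED")    # durability restored
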
diff --git a/hive/db/schema.py b/hive/db/schema.py
index 7d630aaf1f7792803230a425b665d93565e6b637..f9917be87bf62ef2a80e231ba3ed5e4dc0d7d7c5 100644
--- a/hive/db/schema.py
+++ b/hive/db/schema.py
@@ -8,6 +8,9 @@ from sqlalchemy.types import VARCHAR
 from sqlalchemy.types import TEXT
 from sqlalchemy.types import BOOLEAN
 
+import logging
+log = logging.getLogger(__name__)
+
 #pylint: disable=line-too-long, too-many-lines, bad-whitespace
 
 # [DK] we changed and removed some tables so i upgraded DB_VERSION to 18
@@ -61,6 +64,23 @@ def build_metadata():
         sa.Index('hive_accounts_ix5', 'cached_at'), # core/listen sweep
     )
 
+    sa.Table(
+        'hive_reputation_data', metadata,
+        sa.Column('id', sa.Integer, primary_key=True),
+        sa.Column('author_id', sa.Integer, nullable=False),
+        sa.Column('voter_id', sa.Integer, nullable=False),
+        sa.Column('permlink', sa.String(255, collation='C'), nullable=False),
+        sa.Column('rshares', sa.BigInteger, nullable=False),
+        sa.Column('block_num', sa.Integer, nullable=False),
+
+#        sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
+#        sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']),
+#        sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']),
+
+        sa.UniqueConstraint('author_id', 'permlink', 'voter_id', name='hive_reputation_data_uk')
+    )
+
     sa.Table(
         'hive_posts', metadata,
         sa.Column('id', sa.Integer, primary_key=True),
@@ -206,9 +226,10 @@ def build_metadata():
         sa.Column('post_id', sa.Integer, nullable=False),
         sa.Column('tag_id', sa.Integer, nullable=False),
         sa.PrimaryKeyConstraint('post_id', 'tag_id', name='hive_post_tags_pk1'),
+        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_post_tags_fk1'),
         sa.ForeignKeyConstraint(['tag_id'], ['hive_tag_data.id'], name='hive_post_tags_fk2'),
-        sa.Index('hive_post_tags_post_id_idx', 'post_id'),
+        sa.Index('hive_post_tags_tag_id_idx', 'tag_id')
     )
 
@@ -1390,6 +1411,64 @@ def setup(db):
           """
     db.query_no_return(sql)
 
+    sql = """
+          DROP FUNCTION IF EXISTS process_reputation_data(in _block_num hive_blocks.num%TYPE, in _author hive_accounts.name%TYPE,
+            in _permlink hive_permlink_data.permlink%TYPE, in _voter hive_accounts.name%TYPE, in _rshares hive_votes.rshares%TYPE)
+          ;
+
+          CREATE OR REPLACE FUNCTION process_reputation_data(in _block_num hive_blocks.num%TYPE,
+            in _author hive_accounts.name%TYPE, in _permlink hive_permlink_data.permlink%TYPE,
+            in _voter hive_accounts.name%TYPE, in _rshares hive_votes.rshares%TYPE)
+          RETURNS void
+          LANGUAGE sql
+          VOLATILE
+          AS $BODY$
+          WITH __insert_info AS (
+            INSERT INTO hive_reputation_data
+              (author_id, voter_id, permlink, block_num, rshares)
+            --- Warning: DISTINCT is needed here since there is no strict join to the hv table, so effectively a CROSS JOIN
+            --- between ha and hv records is made (producing 2 duplicated records)
+            SELECT DISTINCT ha.id as author_id, hv.id as voter_id, _permlink, _block_num, _rshares
+            FROM hive_accounts ha
+            JOIN hive_accounts hv ON hv.name = _voter
+            JOIN hive_posts hp ON hp.author_id = ha.id
+            JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
+            WHERE hpd.permlink = _permlink
+              AND ha.name = _author
+
+              AND NOT hp.is_paidout --- voting on paidout posts shall have no effect
+              AND hv.reputation >= 0 --- voter's negative reputation eliminates vote from processing
+              AND (_rshares >= 0
+                   OR (hv.reputation >= (ha.reputation - COALESCE((SELECT (hrd.rshares >> 6) -- if the previous vote was a downvote we need to correct the author's reputation before comparing it to the voter's reputation
+                                                                   FROM hive_reputation_data hrd
+                                                                   WHERE hrd.author_id = ha.id
+                                                                     AND hrd.voter_id = hv.id
+                                                                     AND hrd.permlink = _permlink
+                                                                     AND hrd.rshares < 0), 0)))
+                  )
+            ON CONFLICT ON CONSTRAINT hive_reputation_data_uk DO
+              UPDATE SET
+                rshares = EXCLUDED.rshares
+            RETURNING (xmax = 0) AS is_new_vote,
+                      (SELECT hrd.rshares
+                       FROM hive_reputation_data hrd
+                       --- Warning: we want the OLD row here, not both, so we use the old id to select it (the new record has a different value) !!!
+                       WHERE hrd.id = hive_reputation_data.id AND hrd.author_id = author_id AND hrd.voter_id = voter_id AND hrd.permlink = _permlink) AS old_rshares,
+                      author_id, voter_id
+          )
+          UPDATE hive_accounts uha
+          SET reputation = CASE __insert_info.is_new_vote
+                             WHEN true THEN ha.reputation + (_rshares >> 6)
+                             ELSE ha.reputation - (__insert_info.old_rshares >> 6) + (_rshares >> 6)
+                           END
+          FROM hive_accounts ha
+          JOIN __insert_info ON ha.id = __insert_info.author_id
+          WHERE uha.id = __insert_info.author_id
+          ;
+          $BODY$;
+          """
+
+    db.query_no_return(sql)
+
 def reset_autovac(db):
     """Initializes/resets per-table autovacuum/autoanalyze params.
@@ -1421,9 +1500,28 @@ def set_fillfactor(db):
     fillfactor_config = {
         'hive_posts': 70,
         'hive_post_data': 70,
-        'hive_votes': 70
+        'hive_votes': 70,
+        'hive_reputation_data': 50
     }
 
     for table, fillfactor in fillfactor_config.items():
         sql = """ALTER TABLE {} SET (FILLFACTOR = {})"""
         db.query(sql.format(table, fillfactor))
+
+def set_logged_table_attribute(db, logged):
+    """Initializes/resets the LOGGED/UNLOGGED attribute for tables which are intensively updated"""
+
+    logged_config = [
+        'hive_accounts',
+        'hive_permlink_data',
+        'hive_post_tags',
+        'hive_posts',
+        'hive_post_data',
+        'hive_votes',
+        'hive_reputation_data'
+    ]
+
+    for table in logged_config:
+        log.info("Setting {} attribute on a table: {}".format('LOGGED' if logged else 'UNLOGGED', table))
+        sql = """ALTER TABLE {} SET {}"""
+        db.query_no_return(sql.format(table, 'LOGGED' if logged else 'UNLOGGED'))
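process_reputation_data folds one effective vote into hive_reputation_data and the author's cached reputation; the delta applied is rshares >> 6, an arithmetic shift equal to integer division by 64. Reputations.process_vote (hive/indexer/reputations.py below) queues one such SELECT per vote; a sketch of the call shape, with invented literal values:

    # Hypothetical values for illustration; the format string mirrors the one
    # built by Reputations.process_vote().
    from hive.db.adapter import Db

    db = Db.instance()  # assuming a configured adapter
    sql = "SELECT process_reputation_data({}, '{}', '{}', '{}', {});".format(
        48_000_000,      # _block_num
        'alice',         # _author
        'my-post',       # _permlink
        'bob',           # _voter
        1_000_000_000)   # _rshares: reputation delta is 1_000_000_000 >> 6 == 15_625_000
    db.query_no_return(sql)
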
diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 1baa3794707805ac65e33bf4bc9e6005cf3b29c3..5e96e25df3d332d1d07bd1f32a90059776d06aa6 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -2,6 +2,7 @@
 from hive.indexer.reblog import Reblog
 import logging
+import concurrent.futures
 
 from hive.db.adapter import Db
@@ -13,6 +14,8 @@
 from hive.indexer.follow import Follow
 from hive.indexer.votes import Votes
 from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
+from hive.indexer.reputations import Reputations
+from hive.indexer.reblog import Reblog
 
 from time import perf_counter
 
 from hive.utils.stats import OPStatusManager as OPSM
@@ -20,15 +23,38 @@
 from hive.utils.stats import FlushStatusManager as FSM
 from hive.utils.trends import update_hot_and_tranding_for_block_range
 from hive.utils.post_active import update_active_starting_from_posts_on_block
 
+from concurrent.futures import ThreadPoolExecutor
+
 log = logging.getLogger(__name__)
 
 DB = Db.instance()
 
+def time_collector(f):
+    # Run a flush callable, returning its result together with FSM wall-clock time.
+    startTime = FSM.start()
+    result = f()
+    elapsedTime = FSM.stop(startTime)
+
+    return (result, elapsedTime)
+
+def follows_flush_helper():
+    follow_items = len(Follow.follow_items_to_flush)
+    Follow.flush(trx=False)
+    return follow_items
+
 class Blocks:
     """Processes blocks, dispatches work, manages `hive_blocks` table."""
     blocks_to_flush = []
-    _head_block_date = None # timestamp of last fully processed block ("previous block")
-    _current_block_date = None # timestamp of block currently being processes ("current block")
+    _head_block_date = None
+    _current_block_date = None
+
+    _concurrent_flush = [
+        ('Posts', Posts.flush, Posts),
+        ('PostDataCache', PostDataCache.flush, PostDataCache),
+        ('Reputations', Reputations.flush, Reputations),
+        ('Votes', Votes.flush, Votes),
+        ('Tags', Tags.flush, Tags),
+        ('Follow', follows_flush_helper, Follow),
+        ('Reblog', Reblog.flush, Reblog)
+    ]
 
     def __init__(cls):
         head_date = cls.head_date()
@@ -39,6 +65,16 @@ class Blocks:
         cls._head_block_date = head_date
         cls._current_block_date = head_date
 
+    @classmethod
+    def setup_db_access(cls, sharedDbAdapter):
+        PostDataCache.setup_db_access(sharedDbAdapter)
+        Reputations.setup_db_access(sharedDbAdapter)
+        Votes.setup_db_access(sharedDbAdapter)
+        Tags.setup_db_access(sharedDbAdapter)
+        Follow.setup_db_access(sharedDbAdapter)
+        Posts.setup_db_access(sharedDbAdapter)
+        Reblog.setup_db_access(sharedDbAdapter)
+
     @classmethod
     def head_num(cls):
         """Get hive's head block number."""
@@ -64,6 +100,8 @@ class Blocks:
         Posts.flush()
         Reblog.flush()
         Follow.flush(trx=False)
+        Reputations.flush()
+
         block_num = int(block['block_id'][:8], base=16)
         cls.on_live_blocks_processed( block_num, block_num )
         time_end = perf_counter()
@@ -74,6 +112,7 @@ class Blocks:
     def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
         """Batch-process blocks; wrapped in a transaction."""
         time_start = OPSM.start()
+
         DB.query("START TRANSACTION")
 
         last_num = 0
@@ -99,18 +138,36 @@ class Blocks:
         log.info("#############################################################################")
         flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
-        flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
-        flush_time = register_time(flush_time, "Tags", Tags.flush())
-        flush_time = register_time(flush_time, "Votes", Votes.flush())
-        folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
-        flush_time = register_time(flush_time, "Follow", folllow_items)
-        flush_time = register_time(flush_time, "Posts", Posts.flush())
-        flush_time = register_time(flush_time, "Reblog", Reblog.flush())
+
+        DB.query("COMMIT")
+
+        completedThreads = 0
+
+        pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
+        flush_futures = {pool.submit(time_collector, f): (description, c) for (description, f, c) in cls._concurrent_flush}
+        for future in concurrent.futures.as_completed(flush_futures):
+            (description, c) = flush_futures[future]
+            completedThreads = completedThreads + 1
+            try:
+                (n, elapsedTime) = future.result()
+                assert n is not None
+                assert not c.tx_active()
+
+                FSM.flush_stat(description, elapsedTime, n)
+
+#                if n > 0:
+#                    log.info('%r flush generated %d records' % (description, n))
+            except Exception as exc:
+                log.error('%r generated an exception: %s' % (description, exc))
+                raise exc
+        pool.shutdown()
+
+        assert completedThreads == len(cls._concurrent_flush)
 
         if (not is_initial_sync) and (first_block > -1):
+            DB.query("START TRANSACTION")
             cls.on_live_blocks_processed( first_block, last_num )
-
-        DB.query("COMMIT")
+            DB.query("COMMIT")
 
         log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
@@ -145,6 +202,7 @@ class Blocks:
 
         elif op_type == 'effective_comment_vote_operation':
             Votes.effective_comment_vote_op( op_value )
+            Reputations.process_vote(block_num, op_value)
 
             if key not in comment_payout_ops:
                 comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None }
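The flush fan-out in process_multi is the standard ThreadPoolExecutor pattern: each cache class flushes on its own thread (hence the per-class connections introduced below), as_completed() yields futures as they finish, and future.result() re-raises any worker exception on the main thread. Stripped of the FSM stats plumbing, the skeleton looks like this (a sketch with simplified names):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_flushes(flushers):
        """flushers: iterable of (name, zero-arg callable returning a row count)."""
        with ThreadPoolExecutor(max_workers=len(flushers)) as pool:
            futures = {pool.submit(fn): name for name, fn in flushers}
            for future in as_completed(futures):
                name = futures[future]
                rows = future.result()  # re-raises the worker's exception, if any
                assert rows is not None, name
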
diff --git a/hive/indexer/db_adapter_holder.py b/hive/indexer/db_adapter_holder.py
new file mode 100644
index 0000000000000000000000000000000000000000..db720051e2e11f1a634b14eccd24ea7eea312086
--- /dev/null
+++ b/hive/indexer/db_adapter_holder.py
@@ -0,0 +1,27 @@
+import logging
+log = logging.getLogger(__name__)
+
+class DbAdapterHolder(object):
+    """Gives each subclass its own cloned Db adapter plus simple transaction-state tracking."""
+
+    db = None
+
+    _inside_tx = False
+
+    @classmethod
+    def setup_db_access(cls, sharedDb):
+        cls.db = sharedDb.clone()
+
+    @classmethod
+    def tx_active(cls):
+        return cls._inside_tx
+
+    @classmethod
+    def beginTx(cls):
+        cls.db.query("START TRANSACTION")
+        cls._inside_tx = True
+
+    @classmethod
+    def commitTx(cls):
+        cls.db.query("COMMIT")
+        cls._inside_tx = False
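DbAdapterHolder is the glue between Db.clone() and the thread pool: every cache class that flushes concurrently inherits from it, and Blocks.setup_db_access() hands each one its own cloned connection so that no two flush threads share a SQLAlchemy connection. The intended wiring, in miniature (a sketch of how sync.py uses it):

    from hive.db.adapter import Db
    from hive.indexer.blocks import Blocks

    shared_db = Db.instance()
    Blocks.setup_db_access(shared_db)  # fans out to Votes, Tags, Follow, etc.
    # Each holder class can now beginTx()/commitTx() independently on its own thread.
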
diff --git a/hive/indexer/follow.py b/hive/indexer/follow.py
index 1734b3eafd64eb203fb3e462e65db3a5c669319a..fa494a9b7e1adc8d1b93e11142c5900903fb72a2 100644
--- a/hive/indexer/follow.py
+++ b/hive/indexer/follow.py
@@ -9,9 +9,9 @@ from hive.db.db_state import DbState
 from hive.indexer.accounts import Accounts
 from hive.indexer.notify import Notify
 
-log = logging.getLogger(__name__)
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
-DB = Db.instance()
+log = logging.getLogger(__name__)
 
 FOLLOWERS = 'followers'
 FOLLOWING = 'following'
@@ -66,7 +66,7 @@ def _flip_dict(dict_to_flip):
             flipped[value] = [key]
     return flipped
 
-class Follow:
+class Follow(DbAdapterHolder):
     """Handles processing of incoming follow ups and flushing to db."""
 
     follow_items_to_flush = dict()
@@ -102,7 +102,7 @@ class Follow:
             else:
                 old_state = cls._get_follow_db_state(op['flr'], op['flg'])
                 # insert or update state
-                DB.query(FOLLOW_ITEM_INSERT_QUERY, **op)
+                cls.db.query(FOLLOW_ITEM_INSERT_QUERY, **op)
                 if new_state == 1:
                     Follow.follow(op['flr'], op['flg'])
                     if old_state is None:
@@ -145,7 +145,7 @@ class Follow:
         sql = """SELECT state FROM hive_follows
                   WHERE follower = :follower
                     AND following = :following"""
-        return DB.query_one(sql, follower=follower, following=following)
+        return cls.db.query_one(sql, follower=follower, following=following)
 
     # -- stat tracking --
 
@@ -210,7 +210,7 @@ class Follow:
                 else:
                     query = sql_prefix + ",".join(values)
                     query += sql_postfix
-                    DB.query(query)
+                    cls.db.query(query)
                     values.clear()
                     values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
                                                                           follow_item['at'], follow_item['state'],
@@ -222,7 +222,7 @@ class Follow:
         if len(values) > 0:
             query = sql_prefix + ",".join(values)
             query += sql_postfix
-            DB.query(query)
+            cls.db.query(query)
 
         cls.follow_items_to_flush.clear()
 
@@ -244,7 +244,7 @@ class Follow:
             return 0
 
         start = perf()
-        DB.batch_queries(sqls, trx=trx)
+        cls.db.batch_queries(sqls, trx=trx)
         if trx:
             log.info("[SYNC] flushed %d follow deltas in %ds",
                      updated, perf() - start)
@@ -268,7 +268,7 @@ class Follow:
                 following = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND follower = hive_accounts.id)
             WHERE id IN :ids
         """
-        DB.query(sql, ids=tuple(ids))
+        cls.db.query(sql, ids=tuple(ids))
 
     @classmethod
     def force_recount(cls):
@@ -286,7 +286,7 @@ class Follow:
               LEFT JOIN hive_follows hf ON id = hf.following AND state = 1
               GROUP BY id);
         """
-        DB.query(sql)
+        cls.db.query(sql)
 
         log.info("[SYNC] update follower counts")
         sql = """
@@ -296,4 +296,4 @@ class Follow:
             UPDATE hive_accounts SET following = num FROM following_counts
              WHERE id = account_id AND following != num;
         """
-        DB.query(sql)
+        cls.db.query(sql)
diff --git a/hive/indexer/post_data_cache.py b/hive/indexer/post_data_cache.py
index 3b90035b0771670ed3dac0476774690b27eac42f..83ea68ad382145de8eb54acf59256ca9c879f15f 100644
--- a/hive/indexer/post_data_cache.py
+++ b/hive/indexer/post_data_cache.py
@@ -1,14 +1,16 @@
 import logging
 
 from hive.utils.normalize import escape_characters
 from hive.db.adapter import Db
 
+from hive.indexer.db_adapter_holder import DbAdapterHolder
+
 log = logging.getLogger(__name__)
-DB = Db.instance()
 
-class PostDataCache(object):
+class PostDataCache(DbAdapterHolder):
     """ Provides cache for DB operations on post data table in order to speed up initial sync """
     _data = {}
 
+    @classmethod
     def is_cached(cls, pid):
         """ Check if data is cached """
@@ -35,7 +38,7 @@ class PostDataCache(object):
         sql = """
               SELECT hpd.body FROM hive_post_data hpd WHERE hpd.id = :post_id;
               """
-        row = DB.query_row(sql, post_id = pid)
+        row = cls.db.query_row(sql, post_id = pid)
         post_data = dict(row)
         return post_data['body']
 
@@ -45,6 +48,13 @@ class PostDataCache(object):
         if cls._data:
             values_insert = []
             values_update = []
+            cls.beginTx()
+            sql = """
+                INSERT INTO
+                    hive_post_data (id, title, preview, img_url, body, json)
+                VALUES
+            """
+            values = []
             for k, data in cls._data.items():
                 title = 'NULL' if data['title'] is None else "{}".format(escape_characters(data['title']))
                 body = 'NULL' if data['body'] is None else "{}".format(escape_characters(data['body']))
@@ -66,7 +76,7 @@ class PostDataCache(object):
                 sql += ','.join(values_insert)
                 if print_query:
                     log.info("Executing query:\n{}".format(sql))
-                DB.query(sql)
+                cls.db.query(sql)
 
             if values_update:
                 sql = """
@@ -88,7 +98,9 @@ class PostDataCache(object):
                 """
                 if print_query:
                     log.info("Executing query:\n{}".format(sql))
-                DB.query(sql)
+                cls.db.query(sql)
+
+            cls.commitTx()
 
         n = len(cls._data.keys())
         cls._data.clear()
diff --git a/hive/indexer/posts.py b/hive/indexer/posts.py
index f2560d54a90fb4f28cf31fdd4bd5c23e5e755082..f85f45b8fd46e9f978821a6b788a4d1b40ed051a 100644
--- a/hive/indexer/posts.py
+++ b/hive/indexer/posts.py
@@ -16,12 +16,14 @@
 from hive.indexer.community import Community, START_DATE
 from hive.indexer.notify import Notify
 from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
+from hive.indexer.db_adapter_holder import DbAdapterHolder
+
 from hive.utils.normalize import sbd_amount, legacy_amount, asset_to_hbd_hive, safe_img_url
 
 log = logging.getLogger(__name__)
 DB = Db.instance()
 
-class Posts:
+class Posts(DbAdapterHolder):
     """Handles critical/core post ops and data."""
 
     # LRU cache for (author-permlink -> id) lookup (~400mb per 1M entries)
@@ -224,9 +226,13 @@ class Posts:
                 yield lst[i:i + n]
 
         for chunk in chunks(cls._comment_payout_ops, 1000):
+            cls.beginTx()
+
             values_str = ','.join(chunk)
             actual_query = sql.format(values_str)
-            DB.query(actual_query)
+            cls.db.query(actual_query)
+
+            cls.commitTx()
 
         n = len(cls._comment_payout_ops)
         cls._comment_payout_ops.clear()
diff --git a/hive/indexer/reblog.py b/hive/indexer/reblog.py
index f5957b37e43861e2e204e043809ef71331d80b66..1b3788b4212913841006e2bfd966c342ff399d72 100644
--- a/hive/indexer/reblog.py
+++ b/hive/indexer/reblog.py
@@ -8,9 +8,7 @@
 from hive.db.db_state import DbState
 from hive.indexer.accounts import Accounts
 from hive.indexer.feed_cache import FeedCache
 from hive.indexer.notify import Notify
-
-
-DB = Db.instance()
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
@@ -42,7 +40,7 @@
 INSERT_SQL = """
     RETURNING post_id
 """
 
-class Reblog():
+class Reblog(DbAdapterHolder):
     """ Class for reblog operations """
 
     reblog_items_to_flush = []
@@ -63,7 +61,7 @@ class Reblog():
             return
 
         if 'delete' in op_json and op_json['delete'] == 'delete':
-            row = DB.query_row(DELETE_SQL, a=blogger, permlink=permlink)
+            row = cls.db.query_row(DELETE_SQL, a=blogger, permlink=permlink)
             if row is None:
                 log.debug("reblog: post not found: %s/%s", author, permlink)
                 return
@@ -72,12 +70,12 @@ class Reblog():
                 FeedCache.delete(result['post_id'], result['account_id'])
         else:
             if DbState.is_initial_sync():
-                row = DB.query_row(SELECT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
+                row = cls.db.query_row(SELECT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
                 if row is not None:
                     result = dict(row)
                     cls.reblog_items_to_flush.append(result)
             else:
-                row = DB.query_row(INSERT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
+                row = cls.db.query_row(INSERT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
                 if row is not None:
                     author_id = Accounts.get_id(author)
                     blogger_id = Accounts.get_id(blogger)
@@ -111,7 +109,7 @@ class Reblog():
                 else:
                     query = sql_prefix + ",".join(values)
                     query += sql_postfix
-                    DB.query(query)
+                    cls.db.query(query)
                     values.clear()
                     values.append("('{}', {}, '{}', {})".format(reblog_item["blogger"], reblog_item["post_id"], reblog_item["date"], reblog_item["block_num"]))
                     count = 1
@@ -119,6 +117,6 @@ class Reblog():
         if len(values) > 0:
             query = sql_prefix + ",".join(values)
             query += sql_postfix
-            DB.query(query)
+            cls.db.query(query)
         cls.reblog_items_to_flush.clear()
         return item_count
\ No newline at end of file
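Posts._comment_payout_ops is flushed through the local chunks generator shown above, opening and committing a transaction per 1000-row slice so that no single transaction grows unbounded. The helper, shown standalone with a worked example:

    def chunks(lst, n):
        """Yield successive n-sized slices of lst."""
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    assert list(chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]
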
diff --git a/hive/indexer/reputations.py b/hive/indexer/reputations.py
new file mode 100644
index 0000000000000000000000000000000000000000..e14a5484f90fead6fbd3d49c41a3722a469844b4
--- /dev/null
+++ b/hive/indexer/reputations.py
@@ -0,0 +1,43 @@
+""" Reputation update support """
+
+import logging
+from hive.indexer.db_adapter_holder import DbAdapterHolder
+
+log = logging.getLogger(__name__)
+
+CACHED_ITEMS_LIMIT = 200
+
+class Reputations(DbAdapterHolder):
+    _queries = []
+
+    @classmethod
+    def process_vote(cls, block_num, effective_vote_op):
+        cls._queries.append("\nSELECT process_reputation_data({}, '{}', '{}', '{}', {});".format(block_num,
+            effective_vote_op['author'], effective_vote_op['permlink'], effective_vote_op['voter'], effective_vote_op['rshares']))
+
+    @classmethod
+    def flush(cls):
+        if not cls._queries:
+            return 0
+
+        cls.beginTx()
+
+        query = ""
+        items = 0
+        for q in cls._queries:
+            query += q + "\n"
+            items += 1
+            if items >= CACHED_ITEMS_LIMIT:
+                cls.db.query_no_return(query)
+                query = ""
+                items = 0
+
+        if items > 0:
+            # drain the final partial batch
+            cls.db.query_no_return(query)
+
+        n = len(cls._queries)
+        cls._queries.clear()
+
+        cls.commitTx()
+        return n
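Reputations ships its queued statements in batches of CACHED_ITEMS_LIMIT (200) per round-trip, draining the final partial batch after the loop, all inside a single transaction. Assuming setup_db_access() has already been called, usage reduces to the following (payload values invented for illustration; the field names come from the format string in process_vote):

    op = {'author': 'alice', 'permlink': 'my-post', 'voter': 'bob', 'rshares': 1000}
    Reputations.process_vote(48_000_000, op)
    n = Reputations.flush()  # executes the queued SELECTs; returns how many were queued
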
diff --git a/hive/indexer/sync.py b/hive/indexer/sync.py
index 8635f05a298263ee4c30a429c88d3b7f15a5b7f7..6f82908ab37ff0684d3b2f38a93ef113d1835848 100644
--- a/hive/indexer/sync.py
+++ b/hive/indexer/sync.py
@@ -1,6 +1,5 @@
 """Hive sync manager."""
 
-from hive.indexer.reblog import Reblog
 import logging
 import glob
 from time import perf_counter as perf
@@ -25,6 +24,9 @@
 from hive.indexer.accounts import Accounts
 from hive.indexer.feed_cache import FeedCache
 from hive.indexer.follow import Follow
 from hive.indexer.community import Community
+from hive.indexer.reblog import Reblog
+from hive.indexer.reputations import Reputations
+
 from hive.server.common.mutes import Mutes
 
 from hive.utils.stats import OPStatusManager as OPSM
@@ -99,6 +101,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound):
     try:
         count = ubound - lbound
         timer = Timer(count, entity='block', laps=['rps', 'wps'])
+
         while lbound < ubound:
 
             wait_time_1 = WSM.start()
@@ -221,6 +224,7 @@ class Sync:
 
         # ensure db schema up to date, check app status
         DbState.initialize()
+        Blocks.setup_db_access(self._db)
 
         # prefetch id->name and id->rank memory maps
         Accounts.load_ids()
@@ -315,13 +319,14 @@ class Sync:
                 remaining = drop(skip_lines, f)
                 for lines in partition_all(chunk_size, remaining):
                     raise RuntimeError("Sync from checkpoint disabled")
-                    Blocks.process_multi(map(json.loads, lines), True)
+#                    Blocks.process_multi(map(json.loads, lines), True)
                 last_block = num
             last_read = num
 
     def from_steemd(self, is_initial_sync=False, chunk_size=1000):
         """Fast sync strategy: read/process blocks in batches."""
         steemd = self._steem
+
         lbound = Blocks.head_num() + 1
         ubound = self._conf.get('test_max_block') or steemd.last_irreversible()
@@ -367,6 +372,7 @@ class Sync:
         # debug: no max gap if disable_sync in effect
         max_gap = None if self._conf.get('test_disable_sync') else 100
 
+        assert self._blocksProcessor
         steemd = self._steem
         hive_head = Blocks.head_num()
diff --git a/hive/indexer/tags.py b/hive/indexer/tags.py
index 44d942395456ee39d48f8236b095f8f379ce0c05..1179c3c50029b8d620ffb1254f075e37a790b928 100644
--- a/hive/indexer/tags.py
+++ b/hive/indexer/tags.py
@@ -1,12 +1,12 @@
 import logging
 
 from hive.db.adapter import Db
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
-DB = Db.instance()
 
 from hive.utils.normalize import escape_characters
 
-class Tags(object):
+class Tags(DbAdapterHolder):
     """ Tags cache """
     _tags = []
 
@@ -17,8 +17,9 @@ class Tags(object):
 
     @classmethod
     def flush(cls):
-        """ Flush tags to table """ 
+        """ Flush tags to table """
         if cls._tags:
+            cls.beginTx()
             limit = 1000
 
             sql = """
@@ -32,11 +33,11 @@ class Tags(object):
                     values.append("({})".format(escape_characters(tag[1])))
                     if len(values) >= limit:
                         tag_query = str(sql)
-                        DB.query(tag_query.format(','.join(values)))
+                        cls.db.query(tag_query.format(','.join(values)))
                         values.clear()
                 if len(values) > 0:
                     tag_query = str(sql)
-                    DB.query(tag_query.format(','.join(values)))
+                    cls.db.query(tag_query.format(','.join(values)))
                     values.clear()
 
             sql = """
@@ -62,13 +63,13 @@ class Tags(object):
                     values.append("({}, {})".format(tag[0], escape_characters(tag[1])))
                     if len(values) >= limit:
                         tag_query = str(sql)
-                        DB.query(tag_query.format(','.join(values)))
+                        cls.db.query(tag_query.format(','.join(values)))
                         values.clear()
                 if len(values) > 0:
                     tag_query = str(sql)
-                    DB.query(tag_query.format(','.join(values)))
+                    cls.db.query(tag_query.format(','.join(values)))
                     values.clear()
-
+            cls.commitTx()
 
         n = len(cls._tags)
         cls._tags.clear()
         return n
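Tags, Votes, Follow, and Reblog all flush with the same shape: render each row as a parenthesized tuple string, join up to limit of them into one multi-row INSERT ... VALUES statement, and drain whatever remains after the loop. Reduced to a generic skeleton (a sketch; sql_template is assumed to contain a {} placeholder for the VALUES list):

    def flush_values(db, sql_template, rendered_rows, limit=1000):
        values = []
        for row in rendered_rows:
            values.append(row)
            if len(values) >= limit:
                db.query(sql_template.format(','.join(values)))
                values.clear()
        if values:  # final partial batch
            db.query(sql_template.format(','.join(values)))
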
diff --git a/hive/indexer/votes.py b/hive/indexer/votes.py
index 07952303d7d90d7e852c0214a7b547d13c0d6575..c8b0b7194d28359eb2c88a301f5ffd83a774413e 100644
--- a/hive/indexer/votes.py
+++ b/hive/indexer/votes.py
@@ -4,11 +4,11 @@
 import logging
 
 from hive.db.db_state import DbState
 from hive.db.adapter import Db
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
-DB = Db.instance()
 
-class Votes:
+class Votes(DbAdapterHolder):
     """ Class for managing posts votes """
     _votes_data = {}
 
@@ -67,9 +67,12 @@ class Votes:
     @classmethod
     def flush(cls):
         """ Flush vote data from cache to database """
+        cls.inside_flush = True
         n = 0
         if cls._votes_data:
+            cls.beginTx()
+
             sql = """
                 INSERT INTO hive_votes
                 (post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update, block_num, is_effective)
@@ -110,16 +113,19 @@ class Votes:
                 if len(values) >= values_limit:
                     values_str = ','.join(values)
                     actual_query = sql.format(values_str)
-                    DB.query(actual_query)
+                    cls.db.query(actual_query)
                     values.clear()
 
             if len(values) > 0:
                 values_str = ','.join(values)
                 actual_query = sql.format(values_str)
-                DB.query(actual_query)
+                cls.db.query(actual_query)
                 values.clear()
 
             n = len(cls._votes_data)
             cls._votes_data.clear()
+            cls.commitTx()
+
+        cls.inside_flush = False
 
         return n