diff --git a/hive/db/adapter.py b/hive/db/adapter.py index e175c540bdb4e9e754e2b8e8e9c80aa862f12e08..bddf312530bdcc7696c76e94cf2ecdff9050d66f 100644 --- a/hive/db/adapter.py +++ b/hive/db/adapter.py @@ -177,7 +177,11 @@ class Db: try: start = perf() query = self._sql_text(sql) + if 'log_query' in kwargs and kwargs['log_query']: + log.info("QUERY: {}".format(query)) result = self._exec(query, **kwargs) + if 'log_result' in kwargs and kwargs['log_result']: + log.info("RESULT: {}".format(result)) Stats.log_db(sql, perf() - start) return result except Exception as e: diff --git a/hive/db/schema.py b/hive/db/schema.py index fc3734cac881052f35083935e2128c4c4cc4e81a..fa2b8fcd155a2be1916c1280628cbe0e87e1b753 100644 --- a/hive/db/schema.py +++ b/hive/db/schema.py @@ -224,10 +224,13 @@ def build_metadata(): sa.Column('created_at', sa.DateTime, nullable=False), sa.Column('blacklisted', sa.Boolean, nullable=False, server_default='0'), sa.Column('follow_blacklists', sa.Boolean, nullable=False, server_default='0'), + sa.Column('block_num', sa.Integer, nullable=False ), sa.PrimaryKeyConstraint('following', 'follower', name='hive_follows_pk'), # core + sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_follows_fk1'), sa.Index('hive_follows_ix5a', 'following', 'state', 'created_at', 'follower'), sa.Index('hive_follows_ix5b', 'follower', 'state', 'created_at', 'following'), + sa.Index('hive_follows_block_num_idx', 'block_num') ) sa.Table( @@ -235,12 +238,15 @@ def build_metadata(): sa.Column('account', VARCHAR(16), nullable=False), sa.Column('post_id', sa.Integer, nullable=False), sa.Column('created_at', sa.DateTime, nullable=False), + sa.Column('block_num', sa.Integer, nullable=False ), sa.ForeignKeyConstraint(['account'], ['hive_accounts.name'], name='hive_reblogs_fk1'), sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_reblogs_fk2'), + sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_reblogs_fk3'), 
sa.PrimaryKeyConstraint('account', 'post_id', name='hive_reblogs_pk'), # core sa.Index('hive_reblogs_account', 'account'), sa.Index('hive_reblogs_post_id', 'post_id'), + sa.Index('hive_reblogs_block_num_idx', 'block_num') ) sa.Table( diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py index 56005d778b0adde5133a8d7c7e493fa7bc6aab51..eb4f6597542bd46373a30bc39576a5f5f435a210 100644 --- a/hive/indexer/blocks.py +++ b/hive/indexer/blocks.py @@ -1,5 +1,6 @@ """Blocks processor.""" +from hive.indexer.reblog import Reblog import logging import json @@ -62,6 +63,7 @@ class Blocks: Tags.flush() Votes.flush() Posts.flush() + Reblog.flush() block_num = int(block['block_id'][:8], base=16) cls.on_live_blocks_processed( block_num, block_num ) time_end = perf_counter() @@ -103,6 +105,7 @@ class Blocks: folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False) flush_time = register_time(flush_time, "Follow", folllow_items) flush_time = register_time(flush_time, "Posts", Posts.flush()) + flush_time = register_time(flush_time, "Reblog", Reblog.flush()) if (not is_initial_sync) and (first_block > -1): cls.on_live_blocks_processed( first_block, last_num ) diff --git a/hive/indexer/custom_op.py b/hive/indexer/custom_op.py index 27289e65ec099d0d5377b6aa5a12f9905e8de9dd..7c7ccd58d0a709fb8e57fb909fca03edd77905ca 100644 --- a/hive/indexer/custom_op.py +++ b/hive/indexer/custom_op.py @@ -1,14 +1,12 @@ """Main custom_json op handler.""" + import logging from funcy.seqs import first, second from hive.db.adapter import Db -from hive.db.db_state import DbState -from hive.indexer.accounts import Accounts -from hive.indexer.posts import Posts -from hive.indexer.feed_cache import FeedCache from hive.indexer.follow import Follow +from hive.indexer.reblog import Reblog from hive.indexer.notify import Notify from hive.indexer.community import process_json_community_op, START_BLOCK @@ -44,7 +42,7 @@ class CustomOp: """Given a list of operation in block, filter and process 
them.""" for op in ops: start = OPSM.start() - opName = str(op['id']) + ( '-ignored' if op['id'] not in ['follow', 'community', 'notify'] else '' ) + opName = str(op['id']) + ( '-ignored' if op['id'] not in ['follow', 'community', 'notify', 'reblog'] else '' ) account = _get_auth(op) if not account: @@ -54,13 +52,16 @@ class CustomOp: if op['id'] == 'follow': if block_num < 6000000 and not isinstance(op_json, list): op_json = ['follow', op_json] # legacy compat - cls._process_legacy(account, op_json, block_date) + cls._process_legacy(account, op_json, block_date, block_num) + elif op['id'] == 'reblog': + if block_num < 6000000 and not isinstance(op_json, list): + op_json = ['reblog', op_json] # legacy compat + cls._process_legacy(account, op_json, block_date, block_num) elif op['id'] == 'community': if block_num > START_BLOCK: process_json_community_op(account, op_json, block_date) elif op['id'] == 'notify': cls._process_notify(account, op_json, block_date) - OPSM.op_stats(opName, OPSM.stop(start)) @classmethod @@ -81,7 +82,7 @@ class CustomOp: log.warning("notify op fail: %s in %s", e, op_json) @classmethod - def _process_legacy(cls, account, op_json, block_date): + def _process_legacy(cls, account, op_json, block_date, block_num): """Handle legacy 'follow' plugin ops (follow/mute/clear, reblog) follow {follower: {type: 'account'}, @@ -103,70 +104,6 @@ class CustomOp: cmd, op_json = op_json # ['follow', {data...}] if cmd == 'follow': - Follow.follow_op(account, op_json, block_date) + Follow.follow_op(account, op_json, block_date, block_num) elif cmd == 'reblog': - cls.reblog(account, op_json, block_date) - - @classmethod - def reblog(cls, account, op_json, block_date): - """Handle legacy 'reblog' op""" - if ('account' not in op_json - or 'author' not in op_json - or 'permlink' not in op_json): - return - blogger = op_json['account'] - author = op_json['author'] - permlink = op_json['permlink'] - - if blogger != account: - return # impersonation - if not 
all(map(Accounts.exists, [author, blogger])): - return - - if 'delete' in op_json and op_json['delete'] == 'delete': - sql = """ - WITH processing_set AS ( - SELECT hp.id as post_id, ha.id as account_id - FROM hive_posts hp - INNER JOIN hive_accounts ha ON hp.author_id = ha.id - INNER JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id - WHERE ha.name = :a AND hpd.permlink = :permlink AND hp.depth <= 0 - ) - DELETE FROM hive_reblogs AS hr - WHERE hr.account = :a AND hr.post_id IN (SELECT ps.post_id FROM processing_set ps) - RETURNING hr.post_id, (SELECT ps.account_id FROM processing_set ps) AS account_id - """ - - row = DB.query_row(sql, a=blogger, permlink=permlink) - if row is None: - log.debug("reblog: post not found: %s/%s", author, permlink) - return - - if not DbState.is_initial_sync(): - result = dict(row) - FeedCache.delete(result['post_id'], result['account_id']) - - else: - sql = """ - INSERT INTO hive_reblogs (account, post_id, created_at) - SELECT ha.name, hp.id as post_id, :date - FROM hive_accounts ha - INNER JOIN hive_posts hp ON hp.author_id = ha.id - INNER JOIN hive_permlink_data hpd ON hpd.id = hp.permlink_id - WHERE ha.name = :a AND hpd.permlink = :p - ON CONFLICT (account, post_id) DO NOTHING - RETURNING post_id - """ - row = DB.query_row(sql, a=blogger, p=permlink, date=block_date) - if not DbState.is_initial_sync(): - author_id = Accounts.get_id(author) - blogger_id = Accounts.get_id(blogger) - if row is not None: - result = dict(row) - post_id = result['post_id'] - FeedCache.insert(post_id, blogger_id, block_date) - Notify('reblog', src_id=blogger_id, dst_id=author_id, - post_id=post_id, when=block_date, - score=Accounts.default_score(blogger)).write() - else: - log.error("Error in reblog: row is None!") + Reblog.reblog_op(account, op_json, block_date, block_num) diff --git a/hive/indexer/follow.py b/hive/indexer/follow.py index 92deadf60bc027199f45e9073d9b53cda288dfc7..1734b3eafd64eb203fb3e462e65db3a5c669319a 100644 --- 
a/hive/indexer/follow.py +++ b/hive/indexer/follow.py @@ -17,7 +17,7 @@ FOLLOWERS = 'followers' FOLLOWING = 'following' FOLLOW_ITEM_INSERT_QUERY = """ - INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists) + INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists, block_num) VALUES ( :flr, @@ -35,7 +35,8 @@ FOLLOW_ITEM_INSERT_QUERY = """ WHEN 4 THEN TRUE ELSE TRUE END - ) + ), + :block_num ) ON CONFLICT (follower, following) DO UPDATE SET @@ -71,11 +72,12 @@ class Follow: follow_items_to_flush = dict() @classmethod - def follow_op(cls, account, op_json, date): + def follow_op(cls, account, op_json, date, block_num): """Process an incoming follow op.""" op = cls._validated_op(account, op_json, date) if not op: return + op['block_num'] = block_num # perform delta check new_state = op['state'] @@ -94,7 +96,8 @@ class Follow: flr=op['flr'], flg=op['flg'], state=op['state'], - at=op['at']) + at=op['at'], + block_num=op['block_num']) else: old_state = cls._get_follow_db_state(op['flr'], op['flg']) @@ -171,7 +174,7 @@ class Follow: @classmethod def _flush_follow_items(cls): sql_prefix = """ - INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists) + INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists, block_num) VALUES """ sql_postfix = """ @@ -198,20 +201,22 @@ class Follow: count = 0 for _, follow_item in cls.follow_items_to_flush.items(): if count < limit: - values.append("({}, {}, '{}', {}, {}, {})".format(follow_item['flr'], follow_item['flg'], + values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'], follow_item['at'], follow_item['state'], follow_item['state'] == 3, - follow_item['state'] == 4)) + follow_item['state'] == 4, + follow_item['block_num'])) count = count + 1 else: query = sql_prefix + ",".join(values) query += sql_postfix 
DB.query(query) values.clear() - values.append("({}, {}, '{}', {}, {}, {})".format(follow_item['flr'], follow_item['flg'], + values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'], follow_item['at'], follow_item['state'], follow_item['state'] == 3, - follow_item['state'] == 4)) + follow_item['state'] == 4, + follow_item['block_num'])) count = 1 if len(values) > 0: diff --git a/hive/indexer/reblog.py b/hive/indexer/reblog.py new file mode 100644 index 0000000000000000000000000000000000000000..f5957b37e43861e2e204e043809ef71331d80b66 --- /dev/null +++ b/hive/indexer/reblog.py @@ -0,0 +1,124 @@ +""" Class for reblog operations """ + +import logging + +from hive.db.adapter import Db +from hive.db.db_state import DbState + +from hive.indexer.accounts import Accounts +from hive.indexer.feed_cache import FeedCache +from hive.indexer.notify import Notify + + +DB = Db.instance() + +log = logging.getLogger(__name__) + +DELETE_SQL = """ + WITH processing_set AS ( + SELECT hp.id as post_id, ha.id as account_id + FROM hive_posts hp + INNER JOIN hive_accounts ha ON hp.author_id = ha.id + INNER JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id + WHERE ha.name = :a AND hpd.permlink = :permlink AND hp.depth <= 0 + ) + DELETE FROM hive_reblogs AS hr + WHERE hr.account = :a AND hr.post_id IN (SELECT ps.post_id FROM processing_set ps) + RETURNING hr.post_id, (SELECT ps.account_id FROM processing_set ps) AS account_id +""" + +SELECT_SQL = """ + SELECT :blogger as blogger, hp.id as post_id, :date as date, :block_num as block_num + FROM hive_posts hp + INNER JOIN hive_accounts ha ON ha.id = hp.author_id + INNER JOIN hive_permlink_data hpd ON hpd.id = hp.permlink_id + WHERE ha.name = :author AND hpd.permlink = :permlink AND hp.depth <= 0 +""" + +INSERT_SQL = """ + INSERT INTO hive_reblogs (account, post_id, created_at, block_num) +""" + SELECT_SQL + """ + ON CONFLICT ON CONSTRAINT hive_reblogs_pk DO NOTHING + RETURNING post_id +""" + +class 
class Reblog:
    """Handle `reblog` custom_json operations.

    While the database is in initial sync, new reblogs are only resolved to a
    post id (SELECT_SQL) and collected in `reblog_items_to_flush`; `flush`
    later writes them to `hive_reblogs` with multi-row INSERT statements.
    Outside initial sync every reblog is inserted immediately (INSERT_SQL) and
    the feed cache and notifications are updated right away.
    """

    # Reblogs collected during initial sync. Every item is a dict holding the
    # SELECT_SQL columns (`blogger`, `post_id`, `date`, `block_num`) plus
    # `author` and `permlink`, which are only used to match a later delete
    # against the buffer.
    reblog_items_to_flush = []

    # Maximum number of rows written by a single INSERT statement in `flush`.
    _FLUSH_BATCH_SIZE = 1000

    @classmethod
    def reblog_op(cls, account, op_json, block_date, block_num):
        """Process one reblog (or un-reblog) operation.

        Args:
            account: name of the account that authorised the operation.
            op_json: operation payload; must be a dict with the keys
                `account`, `author` and `permlink`. A `delete` key equal to
                `'delete'` turns the operation into a removal.
            block_date: timestamp of the block that contains the operation.
            block_num: number of the block that contains the operation.

        Malformed payloads, impersonation attempts (payload `account` differs
        from `account`) and unknown accounts are ignored silently.
        """
        # The payload is decoded JSON and is not guaranteed to be an object;
        # a list or a number would otherwise raise TypeError below.
        if not isinstance(op_json, dict):
            return
        if not all(key in op_json for key in ('account', 'author', 'permlink')):
            return

        blogger = op_json['account']
        author = op_json['author']
        permlink = op_json['permlink']

        if blogger != account:
            return  # impersonation
        # author/permlink are compared against text columns in the SQL
        # statements, so anything but a string cannot match a post.
        if not isinstance(author, str) or not isinstance(permlink, str):
            return
        if not all(map(Accounts.exists, [author, blogger])):
            return

        if op_json.get('delete') == 'delete':
            cls._delete_reblog(blogger, author, permlink)
        elif DbState.is_initial_sync():
            cls._buffer_reblog(blogger, author, permlink, block_date, block_num)
        else:
            cls._insert_reblog(op_json, blogger, author, permlink, block_date, block_num)

    @classmethod
    def _drop_pending(cls, blogger, author, permlink):
        """Remove buffered (not yet flushed) reblogs of the given post by `blogger`."""
        wanted = (blogger, author, permlink)
        # Slice assignment keeps the list object itself, so other references
        # to the buffer stay valid.
        cls.reblog_items_to_flush[:] = [
            item for item in cls.reblog_items_to_flush
            if (item.get('blogger'), item.get('author'), item.get('permlink')) != wanted
        ]

    @classmethod
    def _delete_reblog(cls, blogger, author, permlink):
        """Undo a reblog in the pending buffer and in the database."""
        # A reblog buffered during initial sync is not in hive_reblogs yet, so
        # DELETE_SQL cannot see it. Without this step `flush` would insert the
        # reblog after it had already been deleted.
        cls._drop_pending(blogger, author, permlink)

        # NOTE(review): DELETE_SQL is bound with the blogger's name (`a`) and
        # the permlink only; `author` is not passed, so the post lookup looks
        # like it is keyed by the blogger rather than the author -- confirm
        # this is intended.
        row = DB.query_row(DELETE_SQL, a=blogger, permlink=permlink)
        if row is None:
            log.debug("reblog: post not found: %s/%s", author, permlink)
            return
        if not DbState.is_initial_sync():
            result = dict(row)
            FeedCache.delete(result['post_id'], result['account_id'])

    @classmethod
    def _buffer_reblog(cls, blogger, author, permlink, block_date, block_num):
        """Resolve the reblogged post and queue the reblog for `flush`."""
        row = DB.query_row(SELECT_SQL, blogger=blogger, author=author,
                           permlink=permlink, date=block_date, block_num=block_num)
        if row is None:
            return  # SELECT_SQL found no matching post
        item = dict(row)
        # Not written by `flush`; kept so a later delete can find this item.
        item['author'] = author
        item['permlink'] = permlink
        cls.reblog_items_to_flush.append(item)

    @classmethod
    def _insert_reblog(cls, op_json, blogger, author, permlink, block_date, block_num):
        """Insert the reblog immediately and update feed cache and notifications."""
        row = DB.query_row(INSERT_SQL, blogger=blogger, author=author,
                           permlink=permlink, date=block_date, block_num=block_num)
        if row is None:
            # INSERT_SQL returned no row: nothing was inserted.
            log.warning("Error in reblog: Insert operation returned `None` as `post_id`. "
                        "Op details: %s", op_json)
            return

        author_id = Accounts.get_id(author)
        blogger_id = Accounts.get_id(blogger)
        post_id = dict(row)['post_id']
        FeedCache.insert(post_id, blogger_id, block_date)
        Notify('reblog', src_id=blogger_id, dst_id=author_id,
               post_id=post_id, when=block_date,
               score=Accounts.default_score(blogger)).write()

    @classmethod
    def flush(cls):
        """Write all buffered reblogs to `hive_reblogs` and empty the buffer.

        Rows are written in batches of at most `_FLUSH_BATCH_SIZE` per INSERT
        statement; rows that already exist are skipped by the ON CONFLICT
        clause.

        Returns:
            Number of buffered items that were handed to the database.
        """
        sql_prefix = """
            INSERT INTO hive_reblogs (account, post_id, created_at, block_num)
            VALUES
        """
        sql_postfix = """
            ON CONFLICT ON CONSTRAINT hive_reblogs_pk DO NOTHING
        """

        pending = cls.reblog_items_to_flush
        item_count = len(pending)

        for start in range(0, item_count, cls._FLUSH_BATCH_SIZE):
            batch = pending[start:start + cls._FLUSH_BATCH_SIZE]
            # Values are formatted into the statement rather than bound. They
            # come from SELECT_SQL rows whose blogger passed Accounts.exists
            # in reblog_op.
            values = ",".join(
                "('{}', {}, '{}', {})".format(
                    item["blogger"], item["post_id"], item["date"], item["block_num"])
                for item in batch
            )
            DB.query(sql_prefix + values + sql_postfix)

        pending.clear()
        return item_count
DbState.db().query_one(sql) + log.info("database_head_block : %s", database_head_block) + if DbState.is_initial_sync(): last_imported_block = Blocks.head_num() # resume initial sync