Commit cb7050d3 authored by Dariusz Kędzierski

Add block_num to tables, add proper reblog support

* Added block_num to the hive_follows and hive_reblogs tables,
* Added a Reblog class to process reblog operations,
* Sync now logs version info, git revision, and db schema version on start
parent ab277b83
Related merge requests:
  !456 Release candidate v1.24
  !230 Setup monitoring with pghero
  !138 Small typos fixed
  !135 Enable postgres monitoring on CI server
  !112 Add num_block to tables, add proper reblog support
hive/db/adapter.py
@@ -177,7 +177,11 @@ class Db:
        try:
            start = perf()
            query = self._sql_text(sql)
+           if 'log_query' in kwargs and kwargs['log_query']:
+               log.info("QUERY: {}".format(query))
            result = self._exec(query, **kwargs)
+           if 'log_result' in kwargs and kwargs['log_result']:
+               log.info("RESULT: {}".format(result))
            Stats.log_db(sql, perf() - start)
            return result
        except Exception as e:
......
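The new kwargs make it possible to trace a single query from any call site. A minimal sketch, assuming the public query helpers (query_row and friends, used throughout this commit) forward **kwargs down to the traced method above:

# Illustrative only, not part of this commit. Assumes the Db query helpers
# pass **kwargs through to the traced method shown above.
from hive.db.adapter import Db

db = Db.instance()
row = db.query_row("SELECT num FROM hive_blocks ORDER BY num DESC LIMIT 1",
                   log_query=True,   # logs "QUERY: ..." before execution
                   log_result=True)  # logs "RESULT: ..." after execution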
hive/db/schema.py
@@ -235,10 +235,13 @@ def build_metadata():
        sa.Column('created_at', sa.DateTime, nullable=False),
        sa.Column('blacklisted', sa.Boolean, nullable=False, server_default='0'),
        sa.Column('follow_blacklists', sa.Boolean, nullable=False, server_default='0'),
+       sa.Column('block_num', sa.Integer, nullable=False),
        sa.PrimaryKeyConstraint('following', 'follower', name='hive_follows_pk'), # core
+       sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_follows_fk1'),
        sa.Index('hive_follows_ix5a', 'following', 'state', 'created_at', 'follower'),
        sa.Index('hive_follows_ix5b', 'follower', 'state', 'created_at', 'following'),
+       sa.Index('hive_follows_block_num_idx', 'block_num')
    )
    sa.Table(
@@ -246,12 +249,15 @@ def build_metadata():
        sa.Column('account', VARCHAR(16), nullable=False),
        sa.Column('post_id', sa.Integer, nullable=False),
        sa.Column('created_at', sa.DateTime, nullable=False),
+       sa.Column('block_num', sa.Integer, nullable=False),
        sa.ForeignKeyConstraint(['account'], ['hive_accounts.name'], name='hive_reblogs_fk1'),
        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_reblogs_fk2'),
+       sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_reblogs_fk3'),
        sa.PrimaryKeyConstraint('account', 'post_id', name='hive_reblogs_pk'), # core
        sa.Index('hive_reblogs_account', 'account'),
        sa.Index('hive_reblogs_post_id', 'post_id'),
+       sa.Index('hive_reblogs_block_num_idx', 'block_num')
    )
    sa.Table(
......
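Because the new block_num columns are NOT NULL with foreign keys into hive_blocks, a database created by an older schema cannot simply be reused; it needs a resync or an out-of-band migration. A rough sketch of the latter, assuming PostgreSQL; the backfill value is a placeholder and none of these statements are part of this commit:

# Hypothetical one-off migration for a pre-existing database (illustrative).
from hive.db.adapter import Db

DB = Db.instance()
for table in ('hive_follows', 'hive_reblogs'):
    DB.query("ALTER TABLE {} ADD COLUMN block_num INTEGER".format(table))
    # Placeholder backfill; a real migration would derive the true block number.
    DB.query("UPDATE {} SET block_num = (SELECT MIN(num) FROM hive_blocks)".format(table))
    DB.query("ALTER TABLE {} ALTER COLUMN block_num SET NOT NULL".format(table))
DB.query("CREATE INDEX hive_follows_block_num_idx ON hive_follows (block_num)")
DB.query("CREATE INDEX hive_reblogs_block_num_idx ON hive_reblogs (block_num)")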
"""Blocks processor."""
from hive.indexer.reblog import Reblog
import logging
import json
@@ -62,6 +63,7 @@ class Blocks:
        Tags.flush()
        Votes.flush()
        Posts.flush()
+       Reblog.flush()
        block_num = int(block['block_id'][:8], base=16)
        cls.on_live_blocks_processed(block_num, block_num)
        time_end = perf_counter()
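As the block_num line above relies on, the first four bytes (eight hex characters) of a Graphene-style block_id encode the block height, so the number can be recovered without a separate lookup. For example:

# The leading eight hex chars of a block_id are the block height.
block_id = "004c4b40" + "e" * 32       # illustrative 40-char block_id
assert int(block_id[:8], base=16) == 5000000  # block 5,000,000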
@@ -103,6 +105,7 @@ class Blocks:
        follow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
        flush_time = register_time(flush_time, "Follow", follow_items)
        flush_time = register_time(flush_time, "Posts", Posts.flush())
+       flush_time = register_time(flush_time, "Reblog", Reblog.flush())
        if (not is_initial_sync) and (first_block > -1):
            cls.on_live_blocks_processed(first_block, last_num)
......
"""Main custom_json op handler."""
import logging
from funcy.seqs import first, second
from hive.db.adapter import Db
from hive.db.db_state import DbState
from hive.indexer.accounts import Accounts
from hive.indexer.posts import Posts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.follow import Follow
from hive.indexer.reblog import Reblog
from hive.indexer.notify import Notify
from hive.indexer.community import process_json_community_op, START_BLOCK
@@ -44,7 +42,7 @@ class CustomOp:
        """Given a list of operations in a block, filter and process them."""
        for op in ops:
            start = OPSM.start()
-           opName = str(op['id']) + ('-ignored' if op['id'] not in ['follow', 'community', 'notify'] else '')
+           opName = str(op['id']) + ('-ignored' if op['id'] not in ['follow', 'community', 'notify', 'reblog'] else '')
            account = _get_auth(op)
            if not account:
@@ -54,13 +52,19 @@ class CustomOp:
            if op['id'] == 'follow':
                if block_num < 6000000 and not isinstance(op_json, list):
                    op_json = ['follow', op_json]  # legacy compat
-               cls._process_legacy(account, op_json, block_date)
+               cls._process_legacy(account, op_json, block_date, block_num)
+           elif op['id'] == 'reblog':
+               if block_num < 6000000 and not isinstance(op_json, list):
+                   op_json = ['reblog', op_json]  # legacy compat
+               cls._process_legacy(account, op_json, block_date, block_num)
+               with open("reblog.log", "a") as reblog_file:
+                   reblog_file.write("{} -> {}".format(account, op_json))
+                   reblog_file.write("\n")
            elif op['id'] == 'community':
                if block_num > START_BLOCK:
                    process_json_community_op(account, op_json, block_date)
            elif op['id'] == 'notify':
                cls._process_notify(account, op_json, block_date)
            OPSM.op_stats(opName, OPSM.stop(start))
    @classmethod
@@ -81,7 +85,7 @@ class CustomOp:
            log.warning("notify op fail: %s in %s", e, op_json)

    @classmethod
-   def _process_legacy(cls, account, op_json, block_date):
+   def _process_legacy(cls, account, op_json, block_date, block_num):
        """Handle legacy 'follow' plugin ops (follow/mute/clear, reblog)

        follow {follower: {type: 'account'},
@@ -103,70 +107,6 @@ class CustomOp:
        cmd, op_json = op_json  # ['follow', {data...}]
        if cmd == 'follow':
-           Follow.follow_op(account, op_json, block_date)
+           Follow.follow_op(account, op_json, block_date, block_num)
        elif cmd == 'reblog':
-           cls.reblog(account, op_json, block_date)
-
-   @classmethod
-   def reblog(cls, account, op_json, block_date):
-       """Handle legacy 'reblog' op"""
-       if ('account' not in op_json
-               or 'author' not in op_json
-               or 'permlink' not in op_json):
-           return
-
-       blogger = op_json['account']
-       author = op_json['author']
-       permlink = op_json['permlink']
-
-       if blogger != account:
-           return  # impersonation
-       if not all(map(Accounts.exists, [author, blogger])):
-           return
-
-       if 'delete' in op_json and op_json['delete'] == 'delete':
-           sql = """
-               WITH processing_set AS (
-                   SELECT hp.id as post_id, ha.id as account_id
-                   FROM hive_posts hp
-                   INNER JOIN hive_accounts ha ON hp.author_id = ha.id
-                   INNER JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
-                   WHERE ha.name = :a AND hpd.permlink = :permlink AND hp.depth <= 0
-               )
-               DELETE FROM hive_reblogs AS hr
-               WHERE hr.account = :a AND hr.post_id IN (SELECT ps.post_id FROM processing_set ps)
-               RETURNING hr.post_id, (SELECT ps.account_id FROM processing_set ps) AS account_id
-           """
-           row = DB.query_row(sql, a=blogger, permlink=permlink)
-           if row is None:
-               log.debug("reblog: post not found: %s/%s", author, permlink)
-               return
-           if not DbState.is_initial_sync():
-               result = dict(row)
-               FeedCache.delete(result['post_id'], result['account_id'])
-       else:
-           sql = """
-               INSERT INTO hive_reblogs (account, post_id, created_at)
-               SELECT ha.name, hp.id as post_id, :date
-               FROM hive_accounts ha
-               INNER JOIN hive_posts hp ON hp.author_id = ha.id
-               INNER JOIN hive_permlink_data hpd ON hpd.id = hp.permlink_id
-               WHERE ha.name = :a AND hpd.permlink = :p
-               ON CONFLICT (account, post_id) DO NOTHING
-               RETURNING post_id
-           """
-           row = DB.query_row(sql, a=blogger, p=permlink, date=block_date)
-           if not DbState.is_initial_sync():
-               author_id = Accounts.get_id(author)
-               blogger_id = Accounts.get_id(blogger)
-               if row is not None:
-                   result = dict(row)
-                   post_id = result['post_id']
-                   FeedCache.insert(post_id, blogger_id, block_date)
-                   Notify('reblog', src_id=blogger_id, dst_id=author_id,
-                          post_id=post_id, when=block_date,
-                          score=Accounts.default_score(blogger)).write()
-               else:
-                   log.error("Error in reblog: row is None!")
+           Reblog.reblog_op(account, op_json, block_date, block_num)
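For reference, this is the shape of the legacy custom_json payloads that reach the reblog path after the ['reblog', {...}] unpacking above; the account names and permlink are illustrative, while the keys are exactly the ones Reblog.reblog_op validates:

# Illustrative reblog / unreblog payloads (values are made up).
reblog_json = ['reblog', {'account': 'alice',
                          'author': 'bob',
                          'permlink': 'my-first-post'}]
# Setting 'delete' to 'delete' turns the op into an unreblog.
unreblog_json = ['reblog', {'account': 'alice',
                            'author': 'bob',
                            'permlink': 'my-first-post',
                            'delete': 'delete'}]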
hive/indexer/follow.py
@@ -17,7 +17,7 @@ FOLLOWERS = 'followers'
FOLLOWING = 'following'

FOLLOW_ITEM_INSERT_QUERY = """
-   INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists)
+   INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists, block_num)
    VALUES
    (
        :flr,
@@ -35,7 +35,8 @@ FOLLOW_ITEM_INSERT_QUERY = """
            WHEN 4 THEN TRUE
            ELSE TRUE
        END
-       )
+       ),
+       :block_num
    )
    ON CONFLICT (follower, following) DO UPDATE
    SET
@@ -71,11 +72,12 @@ class Follow:
    follow_items_to_flush = dict()

    @classmethod
-   def follow_op(cls, account, op_json, date):
+   def follow_op(cls, account, op_json, date, block_num):
        """Process an incoming follow op."""
        op = cls._validated_op(account, op_json, date)
        if not op:
            return
+       op['block_num'] = block_num

        # perform delta check
        new_state = op['state']
@@ -94,7 +96,8 @@ class Follow:
                     flr=op['flr'],
                     flg=op['flg'],
                     state=op['state'],
-                    at=op['at'])
+                    at=op['at'],
+                    block_num=op['block_num'])
        else:
            old_state = cls._get_follow_db_state(op['flr'], op['flg'])
@@ -171,7 +174,7 @@ class Follow:
    @classmethod
    def _flush_follow_items(cls):
        sql_prefix = """
-           INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists)
+           INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists, block_num)
            VALUES """

        sql_postfix = """
@@ -198,20 +201,22 @@ class Follow:
        count = 0
        for _, follow_item in cls.follow_items_to_flush.items():
            if count < limit:
-               values.append("({}, {}, '{}', {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
+               values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
                                                                  follow_item['at'], follow_item['state'],
                                                                  follow_item['state'] == 3,
-                                                                  follow_item['state'] == 4))
+                                                                  follow_item['state'] == 4,
+                                                                  follow_item['block_num']))
                count = count + 1
            else:
                query = sql_prefix + ",".join(values)
                query += sql_postfix
                DB.query(query)
                values.clear()
-               values.append("({}, {}, '{}', {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
+               values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
                                                                  follow_item['at'], follow_item['state'],
                                                                  follow_item['state'] == 3,
-                                                                  follow_item['state'] == 4))
+                                                                  follow_item['state'] == 4,
+                                                                  follow_item['block_num']))
                count = 1

        if len(values) > 0:
......
""" Class for reblog operations """
import logging
from hive.db.adapter import Db
from hive.db.db_state import DbState
from hive.indexer.accounts import Accounts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.notify import Notify
DB = Db.instance()
log = logging.getLogger(__name__)
DELETE_SQL = """
    WITH processing_set AS (
        SELECT hp.id as post_id, ha.id as account_id
        FROM hive_posts hp
        INNER JOIN hive_accounts ha ON hp.author_id = ha.id
        INNER JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
        WHERE ha.name = :a AND hpd.permlink = :permlink AND hp.depth <= 0
    )
    DELETE FROM hive_reblogs AS hr
    WHERE hr.account = :a AND hr.post_id IN (SELECT ps.post_id FROM processing_set ps)
    RETURNING hr.post_id, (SELECT ps.account_id FROM processing_set ps) AS account_id
"""

SELECT_SQL = """
    SELECT :blogger as blogger, hp.id as post_id, :date as date, :block_num as block_num
    FROM hive_posts hp
    INNER JOIN hive_accounts ha ON ha.id = hp.author_id
    INNER JOIN hive_permlink_data hpd ON hpd.id = hp.permlink_id
    WHERE ha.name = :author AND hpd.permlink = :permlink AND hp.depth <= 0
"""

INSERT_SQL = """
    INSERT INTO hive_reblogs (account, post_id, created_at, block_num)
""" + SELECT_SQL + """
    ON CONFLICT ON CONSTRAINT hive_reblogs_pk DO NOTHING
    RETURNING post_id
"""
class Reblog:
    """ Class for reblog operations """
    reblog_items_to_flush = []

    @classmethod
    def reblog_op(cls, account, op_json, block_date, block_num):
        if ('account' not in op_json
                or 'author' not in op_json
                or 'permlink' not in op_json):
            return

        blogger = op_json['account']
        author = op_json['author']
        permlink = op_json['permlink']

        if blogger != account:
            return  # impersonation
        if not all(map(Accounts.exists, [author, blogger])):
            return

        if 'delete' in op_json and op_json['delete'] == 'delete':
            row = DB.query_row(DELETE_SQL, a=blogger, permlink=permlink)
            if row is None:
                log.debug("reblog: post not found: %s/%s", author, permlink)
                return
            if not DbState.is_initial_sync():
                result = dict(row)
                FeedCache.delete(result['post_id'], result['account_id'])
        else:
            if DbState.is_initial_sync():
                row = DB.query_row(SELECT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
                if row is not None:
                    result = dict(row)
                    cls.reblog_items_to_flush.append(result)
            else:
                row = DB.query_row(INSERT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
                if row is not None:
                    author_id = Accounts.get_id(author)
                    blogger_id = Accounts.get_id(blogger)
                    result = dict(row)
                    post_id = result['post_id']
                    FeedCache.insert(post_id, blogger_id, block_date)
                    Notify('reblog', src_id=blogger_id, dst_id=author_id,
                           post_id=post_id, when=block_date,
                           score=Accounts.default_score(blogger)).write()
                else:
                    log.warning("reblog: insert returned no row; op details: %s", op_json)
    @classmethod
    def flush(cls):
        sql_prefix = """
            INSERT INTO hive_reblogs (account, post_id, created_at, block_num)
            VALUES
        """
        sql_postfix = """
            ON CONFLICT ON CONSTRAINT hive_reblogs_pk DO NOTHING
        """
        values = []
        limit = 1000
        count = 0
        item_count = len(cls.reblog_items_to_flush)

        for reblog_item in cls.reblog_items_to_flush:
            if count < limit:
                values.append("('{}', {}, '{}', {})".format(reblog_item["blogger"], reblog_item["post_id"],
                                                            reblog_item["date"], reblog_item["block_num"]))
                count = count + 1
            else:
                query = sql_prefix + ",".join(values)
                query += sql_postfix
                DB.query(query)
                values.clear()
                values.append("('{}', {}, '{}', {})".format(reblog_item["blogger"], reblog_item["post_id"],
                                                            reblog_item["date"], reblog_item["block_num"]))
                count = 1

        if len(values) > 0:
            query = sql_prefix + ",".join(values)
            query += sql_postfix
            DB.query(query)

        cls.reblog_items_to_flush.clear()
        return item_count
\ No newline at end of file
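A minimal usage sketch of the new class; the real call sites are CustomOp._process_legacy and the Blocks/Sync flush paths shown in this commit, and the values below are illustrative:

# Illustrative wiring of Reblog (values are made up; requires an initialized
# hivemind database and populated Accounts cache).
from datetime import datetime
from hive.indexer.reblog import Reblog

op_json = {'account': 'alice', 'author': 'bob', 'permlink': 'my-first-post'}
Reblog.reblog_op('alice', op_json, datetime(2020, 6, 1), 44000000)

# During initial sync reblog_op only queues rows; flush() batch-inserts them
# (1000 rows per statement) and returns how many items were queued.
flushed = Reblog.flush()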
"""Hive sync manager."""
from hive.indexer.reblog import Reblog
import logging
import glob
from time import perf_counter as perf
@@ -211,6 +212,12 @@ class Sync:
    def run(self):
        """Initialize state; setup/recovery checks; sync and runloop."""
+       from hive.version import VERSION, GIT_REVISION
+       log.info("hivemind_version : %s", VERSION)
+       log.info("hivemind_git_rev : %s", GIT_REVISION)
+
+       from hive.db.schema import DB_VERSION as SCHEMA_DB_VERSION
+       log.info("database_schema_version : %s", SCHEMA_DB_VERSION)

        # ensure db schema up to date, check app status
        DbState.initialize()
@@ -228,6 +235,10 @@ class Sync:
        # community stats
        Community.recalc_pending_payouts()

+       sql = "SELECT num FROM hive_blocks ORDER BY num DESC LIMIT 1"
+       database_head_block = DbState.db().query_one(sql)
+       log.info("database_head_block : %s", database_head_block)
+
        if DbState.is_initial_sync():
            # resume initial sync
            self.initial()
......