Compare revisions: hive/hivemind

Changes are shown as if the source revision was being merged into the target revision.
Commits on Source (12)
......@@ -284,23 +284,23 @@ follow_api_smoketest:
reports:
junit: api_smoketest_follow_api.xml
tags_api_smoketest_old:
tags_api_smoketest:
<<: *common_api_smoketest_job
script:
- scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_tags_api_patterns.tavern.yaml api_smoketest_tags_api_old.xml
- scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" tags_api_patterns/ api_smoketest_tags_api.xml
artifacts:
reports:
junit: api_smoketest_tags_api_old.xml
junit: api_smoketest_tags_api.xml
tags_api_smoketest:
tags_api_smoketest_negative:
<<: *common_api_smoketest_job
script:
- scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" tags_api_patterns/ api_smoketest_tags_api.xml
- scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" tags_api_negative/ api_smoketest_tags_api_negative.xml
artifacts:
reports:
junit: api_smoketest_tags_api.xml
junit: api_smoketest_tags_api_negative.xml
......@@ -50,6 +50,9 @@ class Db:
self._exec = self._conn.execute
self._exec(sqlalchemy.text("COMMIT"))
def clone(self):
return Db(self._url)
def engine(self):
"""Lazy-loaded SQLAlchemy engine."""
if not self._engine:
......
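The new Db.clone() helper returns a fresh adapter bound to the same URL. It appears to exist so that each indexer cache can obtain its own connection from the shared adapter (see DbAdapterHolder.setup_db_access further down), which the concurrent flush added to Blocks.process_multi relies on. A minimal sketch of the assumed usage, with Db.instance() being the existing singleton accessor used elsewhere in this diff:

from hive.db.adapter import Db

shared_db = Db.instance()        # process-wide adapter
worker_db = shared_db.clone()    # same URL, separate connection/session
# a flush thread can now run its own transaction without interleaving
# statements on the shared connection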
......@@ -8,7 +8,7 @@ from time import perf_counter
import logging
import sqlalchemy
from hive.db.schema import (setup, reset_autovac, build_metadata,
from hive.db.schema import (setup, reset_autovac, set_logged_table_attribute, build_metadata,
build_metadata_community, teardown, DB_VERSION)
from hive.db.adapter import Db
......@@ -46,8 +46,6 @@ class DbState:
log.info("[INIT] Create db schema...")
setup(cls.db())
cls._before_initial_sync()
# perform db migrations
cls._check_migrations()
......@@ -91,15 +89,12 @@ class DbState:
@classmethod
def _disableable_indexes(cls):
to_locate = [
#'hive_posts_ix3', # (author, depth, id)
#'hive_posts_ix4', # (parent_id, id, counter_deleted=0)
#'hive_posts_ix5', # (community_id>0, is_pinned=1)
'hive_follows_ix5a', # (following, state, created_at, follower)
'hive_follows_ix5b', # (follower, state, created_at, following)
# 'hive_posts_parent_id_idx',
'hive_posts_parent_id_idx',
'hive_posts_depth_idx',
'hive_posts_created_at_idx',
'hive_posts_root_id_id_idx',
'hive_posts_community_id_idx',
......@@ -115,20 +110,9 @@ class DbState:
'hive_votes_voter_id_idx',
'hive_votes_block_num_idx',
#'hive_posts_cache_ix6a', # (sc_trend, post_id, paidout=0)
#'hive_posts_cache_ix6b', # (post_id, sc_trend, paidout=0)
#'hive_posts_cache_ix7a', # (sc_hot, post_id, paidout=0)
#'hive_posts_cache_ix7b', # (post_id, sc_hot, paidout=0)
#'hive_posts_cache_ix8', # (category, payout, depth, paidout=0)
#'hive_posts_cache_ix9a', # (depth, payout, post_id, paidout=0)
#'hive_posts_cache_ix9b', # (category, depth, payout, post_id, paidout=0)
#'hive_posts_cache_ix10', # (post_id, payout, gray=1, payout>0)
#'hive_posts_cache_ix30', # API: community trend
#'hive_posts_cache_ix31', # API: community hot
#'hive_posts_cache_ix32', # API: community created
#'hive_posts_cache_ix33', # API: community payout
#'hive_posts_cache_ix34', # API: community muted
'hive_accounts_ix5' # (cached_at, name)
'hive_accounts_ix5', # (cached_at, name)
'hive_post_tags_tag_id_idx'
]
to_return = []
......@@ -145,12 +129,18 @@ class DbState:
return to_return
@classmethod
def _before_initial_sync(cls):
def before_initial_sync(cls, last_imported_block, hived_head_block):
"""Routine which runs *once* after db setup.
Disables non-critical indexes for faster initial sync, as well
as foreign key constraints."""
to_sync = hived_head_block - last_imported_block
if to_sync < SYNCED_BLOCK_LIMIT:
log.info("[INIT] Skipping pre-initial sync hooks")
return
engine = cls.db().engine()
log.info("[INIT] Begin pre-initial sync hooks")
......@@ -162,13 +152,30 @@ class DbState:
except sqlalchemy.exc.ProgrammingError as ex:
log.warning("Ignoring ex: {}".format(ex))
# TODO: #111
#for key in cls._all_foreign_keys():
# log.info("Drop fk %s", key.name)
# key.drop(engine)
from hive.db.schema import drop_fk, set_logged_table_attribute
log.info("Dropping FKs")
drop_fk(cls.db())
set_logged_table_attribute(cls.db(), False)
log.info("[INIT] Finish pre-initial sync hooks")
@classmethod
def update_work_mem(cls, workmem_value):
row = cls.db().query_row("SHOW work_mem")
current_work_mem = row['work_mem']
sql = """
DO $$
BEGIN
EXECUTE 'ALTER DATABASE '||current_database()||' SET work_mem TO "{}"';
END
$$;
"""
cls.db().query_no_return(sql.format(workmem_value))
return current_work_mem
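update_work_mem returns the previous setting, so the post-sync code can raise work_mem for the heavy UPDATE passes and then restore it, as the _after_initial_sync hunk below shows. A hedged sketch of that bracketing pattern (the try/finally is illustrative; the diff itself calls it unconditionally):

previous = DbState.update_work_mem('2GB')   # returns e.g. '4MB'
try:
    pass  # run the expensive post-initial-sync updates here
finally:
    DbState.update_work_mem(previous)       # restore the old value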
@classmethod
def _after_initial_sync(cls, current_imported_block, last_imported_block):
"""Routine which runs *once* after initial sync.
......@@ -197,6 +204,8 @@ class DbState:
else:
log.info("[INIT] Post-initial sync hooks skipped")
current_work_mem = cls.update_work_mem('2GB')
time_start = perf_counter()
# Update count of all child posts (which was held back during initial sync)
......@@ -243,10 +252,15 @@ class DbState:
time_end = perf_counter()
log.info("[INIT] update_all_posts_active executed in %fs", time_end - time_start)
# TODO: #111
#for key in cls._all_foreign_keys():
# log.info("Create fk %s", key.name)
# key.create(engine)
cls.update_work_mem(current_work_mem)
if synced_blocks >= SYNCED_BLOCK_LIMIT:
from hive.db.schema import create_fk, set_logged_table_attribute
set_logged_table_attribute(cls.db(), True)
log.info("Recreating FKs")
create_fk(cls.db())
@staticmethod
def status():
......
......@@ -8,6 +8,9 @@ from sqlalchemy.types import VARCHAR
from sqlalchemy.types import TEXT
from sqlalchemy.types import BOOLEAN
import logging
log = logging.getLogger(__name__)
#pylint: disable=line-too-long, too-many-lines, bad-whitespace
# [DK] we changed and removed some tables so i upgraded DB_VERSION to 18
......@@ -61,6 +64,23 @@ def build_metadata():
sa.Index('hive_accounts_ix5', 'cached_at'), # core/listen sweep
)
sa.Table(
'hive_reputation_data', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('author_id', sa.Integer, nullable=False),
sa.Column('voter_id', sa.Integer, nullable=False),
sa.Column('permlink', sa.String(255, collation='C'), nullable=False),
sa.Column('rshares', sa.BigInteger, nullable=False),
sa.Column('block_num', sa.Integer, nullable=False),
# sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
# sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']),
# sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']),
sa.UniqueConstraint('author_id', 'permlink', 'voter_id', name='hive_reputation_data_uk')
)
sa.Table(
'hive_posts', metadata,
sa.Column('id', sa.Integer, primary_key=True),
......@@ -183,11 +203,11 @@ def build_metadata():
sa.PrimaryKeyConstraint('author_id', 'permlink_id', 'voter_id', name='hive_votes_pk'),
sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']),
sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id']),
sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']),
sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_votes_fk1'),
sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id'], name='hive_votes_fk2'),
sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id'], name='hive_votes_fk3'),
sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id'], name='hive_votes_fk4'),
sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_votes_fk5'),
sa.Index('hive_votes_post_id_idx', 'post_id'),
sa.Index('hive_votes_voter_id_idx', 'voter_id'),
......@@ -206,9 +226,10 @@ def build_metadata():
sa.Column('post_id', sa.Integer, nullable=False),
sa.Column('tag_id', sa.Integer, nullable=False),
sa.PrimaryKeyConstraint('post_id', 'tag_id', name='hive_post_tags_pk1'),
sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
sa.ForeignKeyConstraint(['tag_id'], ['hive_tag_data.id']),
sa.Index('hive_post_tags_post_id_idx', 'post_id'),
sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_post_tags_fk1'),
sa.ForeignKeyConstraint(['tag_id'], ['hive_tag_data.id'], name='hive_post_tags_fk2'),
sa.Index('hive_post_tags_tag_id_idx', 'tag_id')
)
......@@ -377,6 +398,24 @@ def teardown(db):
"""Drop all tables"""
build_metadata().drop_all(db.engine())
def drop_fk(db):
db.query_no_return("START TRANSACTION")
for table in build_metadata().sorted_tables:
for fk in table.foreign_keys:
sql = """ALTER TABLE {} DROP CONSTRAINT IF EXISTS {}""".format(table.name, fk.name)
db.query_no_return(sql)
db.query_no_return("COMMIT")
def create_fk(db):
from sqlalchemy.schema import AddConstraint
from sqlalchemy import text
connection = db.engine().connect()
connection.execute(text("START TRANSACTION"))
for table in build_metadata().sorted_tables:
for fk in table.foreign_keys:
connection.execute(AddConstraint(fk.constraint))
connection.execute(text("COMMIT"))
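drop_fk and create_fk walk the same build_metadata() tables, so the constraints recreated after sync are exactly the ones dropped before it; the explicit names added in this revision (hive_votes_fk1..fk5, hive_post_tags_fk1/fk2) make the DROP CONSTRAINT IF EXISTS statements deterministic. A rough sketch of how the pair brackets a bulk sync, assuming db is a hive.db.adapter.Db instance as in DbState:

from hive.db.schema import drop_fk, create_fk

drop_fk(db)      # before initial sync: skip FK validation on bulk INSERTs
# ... bulk-load blocks ...
create_fk(db)    # after initial sync: PostgreSQL re-validates existing rows here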
def setup(db):
"""Creates all tables and seed data"""
# initialize schema
......@@ -1372,6 +1411,64 @@ def setup(db):
"""
db.query_no_return(sql)
sql = """
DROP FUNCTION IF EXISTS process_reputation_data(in _block_num hive_blocks.num%TYPE, in _author hive_accounts.name%TYPE,
in _permlink hive_permlink_data.permlink%TYPE, in _voter hive_accounts.name%TYPE, in _rshares hive_votes.rshares%TYPE)
;
CREATE OR REPLACE FUNCTION process_reputation_data(in _block_num hive_blocks.num%TYPE,
in _author hive_accounts.name%TYPE, in _permlink hive_permlink_data.permlink%TYPE,
in _voter hive_accounts.name%TYPE, in _rshares hive_votes.rshares%TYPE)
RETURNS void
LANGUAGE sql
VOLATILE
AS $BODY$
WITH __insert_info AS (
INSERT INTO hive_reputation_data
(author_id, voter_id, permlink, block_num, rshares)
--- Warning: DISTINCT is needed here since there is no strict join to the hv table, so effectively a CROSS JOIN
--- is made between ha and hv records (producing 2 duplicated records)
SELECT DISTINCT ha.id as author_id, hv.id as voter_id, _permlink, _block_num, _rshares
FROM hive_accounts ha
JOIN hive_accounts hv ON hv.name = _voter
JOIN hive_posts hp ON hp.author_id = ha.id
JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
WHERE hpd.permlink = _permlink
AND ha.name = _author
AND NOT hp.is_paidout --- voting on paidout posts shall have no effect
AND hv.reputation >= 0 --- voter's negative reputation eliminates vote from processing
AND (_rshares >= 0
OR (hv.reputation >= (ha.reputation - COALESCE((SELECT (hrd.rshares >> 6) -- if previous vote was a downvote we need to correct author reputation before current comparison to voter's reputation
FROM hive_reputation_data hrd
WHERE hrd.author_id = ha.id
AND hrd.voter_id=hv.id
AND hrd.permlink=_permlink
AND hrd.rshares < 0), 0)))
)
ON CONFLICT ON CONSTRAINT hive_reputation_data_uk DO
UPDATE SET
rshares = EXCLUDED.rshares
RETURNING (xmax = 0) AS is_new_vote,
(SELECT hrd.rshares
FROM hive_reputation_data hrd
--- Warning we want OLD row here, not both, so we're using old ID to select old one (new record has different value) !!!
WHERE hrd.id = hive_reputation_data.id AND hrd.author_id = author_id and hrd.voter_id=voter_id and hrd.permlink=_permlink) AS old_rshares, author_id, voter_id
)
UPDATE hive_accounts uha
SET reputation = CASE __insert_info.is_new_vote
WHEN true THEN ha.reputation + (_rshares >> 6)
ELSE ha.reputation - (__insert_info.old_rshares >> 6) + (_rshares >> 6)
END
FROM hive_accounts ha
JOIN __insert_info ON ha.id = __insert_info.author_id
WHERE uha.id = __insert_info.author_id
;
$BODY$;
"""
db.query_no_return(sql)
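The reputation delta applied by this function is the vote's rshares arithmetically shifted right by 6 bits, i.e. divided by 64, and on a repeated vote the previous contribution is removed first (the ON CONFLICT branch). A small illustrative calculation in Python:

# illustrative only: how the ">> 6" scaling in process_reputation_data behaves
rshares = 1_280_000
delta = rshares >> 6      # 20_000, i.e. rshares // 64
# re-vote case: reputation = reputation - (old_rshares >> 6) + (new_rshares >> 6)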
def reset_autovac(db):
"""Initializes/resets per-table autovacuum/autoanalyze params.
......@@ -1403,9 +1500,28 @@ def set_fillfactor(db):
fillfactor_config = {
'hive_posts': 70,
'hive_post_data': 70,
'hive_votes': 70
'hive_votes': 70,
'hive_reputation_data': 50
}
for table, fillfactor in fillfactor_config.items():
sql = """ALTER TABLE {} SET (FILLFACTOR = {})"""
db.query(sql.format(table, fillfactor))
def set_logged_table_attribute(db, logged):
"""Initializes/resets LOGGED/UNLOGGED attribute for tables which are intesively updated"""
logged_config = [
'hive_accounts',
'hive_permlink_data',
'hive_post_tags',
'hive_posts',
'hive_post_data',
'hive_votes',
'hive_reputation_data'
]
for table in logged_config:
log.info("Setting {} attribute on a table: {}".format('LOGGED' if logged else 'UNLOGGED', table))
sql = """ALTER TABLE {} SET {}"""
db.query_no_return(sql.format(table, 'LOGGED' if logged else 'UNLOGGED'))
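set_logged_table_attribute flips the heavily written tables to UNLOGGED for the initial sync and back to LOGGED afterwards; UNLOGGED tables bypass WAL writes (and are truncated after a crash, which is acceptable only while the sync can simply be restarted). The assumed call pattern, mirroring DbState.before_initial_sync and _after_initial_sync above:

set_logged_table_attribute(db, False)   # emits e.g. ALTER TABLE hive_posts SET UNLOGGED
# ... initial sync ...
set_logged_table_attribute(db, True)    # emits e.g. ALTER TABLE hive_posts SET LOGGED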
......@@ -2,6 +2,7 @@
from hive.indexer.reblog import Reblog
import logging
import concurrent
from hive.db.adapter import Db
......@@ -13,6 +14,8 @@ from hive.indexer.follow import Follow
from hive.indexer.votes import Votes
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from hive.indexer.reputations import Reputations
from hive.indexer.reblog import Reblog
from time import perf_counter
from hive.utils.stats import OPStatusManager as OPSM
......@@ -20,15 +23,38 @@ from hive.utils.stats import FlushStatusManager as FSM
from hive.utils.trends import update_hot_and_tranding_for_block_range
from hive.utils.post_active import update_active_starting_from_posts_on_block
from concurrent.futures import ThreadPoolExecutor
log = logging.getLogger(__name__)
DB = Db.instance()
def time_collector(f):
startTime = FSM.start()
result = f()
elapsedTime = FSM.stop(startTime)
return (result, elapsedTime)
def follows_flush_helper():
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
return folllow_items
class Blocks:
"""Processes blocks, dispatches work, manages `hive_blocks` table."""
blocks_to_flush = []
_head_block_date = None # timestamp of last fully processed block ("previous block")
_current_block_date = None # timestamp of block currently being processed ("current block")
_head_block_date = None
_current_block_date = None
_concurrent_flush = [
('Posts', Posts.flush, Posts),
('PostDataCache', PostDataCache.flush, PostDataCache),
('Reputations', Reputations.flush, Reputations),
('Votes', Votes.flush, Votes),
('Tags', Tags.flush, Tags),
('Follow', follows_flush_helper, Follow),
('Reblog', Reblog.flush, Reblog)
]
def __init__(cls):
head_date = cls.head_date()
......@@ -39,6 +65,16 @@ class Blocks:
cls._head_block_date = head_date
cls._current_block_date = head_date
@classmethod
def setup_db_access(self, sharedDbAdapter):
PostDataCache.setup_db_access(sharedDbAdapter)
Reputations.setup_db_access(sharedDbAdapter)
Votes.setup_db_access(sharedDbAdapter)
Tags.setup_db_access(sharedDbAdapter)
Follow.setup_db_access(sharedDbAdapter)
Posts.setup_db_access(sharedDbAdapter)
Reblog.setup_db_access(sharedDbAdapter)
@classmethod
def head_num(cls):
"""Get hive's head block number."""
......@@ -63,6 +99,9 @@ class Blocks:
Votes.flush()
Posts.flush()
Reblog.flush()
Follow.flush(trx=False)
Reputations.flush()
block_num = int(block['block_id'][:8], base=16)
cls.on_live_blocks_processed( block_num, block_num )
time_end = perf_counter()
......@@ -73,6 +112,7 @@ class Blocks:
def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
"""Batch-process blocks; wrapped in a transaction."""
time_start = OPSM.start()
DB.query("START TRANSACTION")
last_num = 0
......@@ -98,18 +138,36 @@ class Blocks:
log.info("#############################################################################")
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
flush_time = register_time(flush_time, "Tags", Tags.flush())
flush_time = register_time(flush_time, "Votes", Votes.flush())
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", folllow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush())
flush_time = register_time(flush_time, "Reblog", Reblog.flush())
DB.query("COMMIT")
completedThreads = 0
pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
flush_futures = {pool.submit(time_collector, f): (description, c) for (description, f, c) in cls._concurrent_flush}
for future in concurrent.futures.as_completed(flush_futures):
(description, c) = flush_futures[future]
completedThreads = completedThreads + 1
try:
(n, elapsedTime) = future.result()
assert n is not None
assert not c.tx_active()
FSM.flush_stat(description, elapsedTime, n)
# if n > 0:
# log.info('%r flush generated %d records' % (description, n))
except Exception as exc:
log.error('%r generated an exception: %s' % (description, exc))
raise exc
pool.shutdown()
assert completedThreads == len(cls._concurrent_flush)
if (not is_initial_sync) and (first_block > -1):
DB.query("START TRANSACTION")
cls.on_live_blocks_processed( first_block, last_num )
DB.query("COMMIT")
DB.query("COMMIT")
log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
......@@ -144,6 +202,7 @@ class Blocks:
elif op_type == 'effective_comment_vote_operation':
Votes.effective_comment_vote_op( op_value )
Reputations.process_vote(block_num, op_value)
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None }
......@@ -397,6 +456,6 @@ class Blocks:
update_hot_and_tranding_for_block_range( first_block, last_block )
update_active_starting_from_posts_on_block( first_block, last_block )
DB.query("SELECT update_hive_posts_children_count({}, {})".format(first_block, last_block))
DB.query("SELECT update_hive_posts_root_id({},{})".format(first_block, last_block))
DB.query("SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block))
DB.query_no_return("SELECT update_hive_posts_children_count({}, {})".format(first_block, last_block))
DB.query_no_return("SELECT update_hive_posts_root_id({},{})".format(first_block, last_block))
DB.query_no_return("SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block))
import logging
log = logging.getLogger(__name__)
class DbAdapterHolder(object):
db = None
_inside_tx = False
@classmethod
def setup_db_access(self, sharedDb):
self.db = sharedDb.clone()
@classmethod
def tx_active(self):
return self._inside_tx
@classmethod
def beginTx(self):
self.db.query("START TRANSACTION")
self._inside_tx = True
@classmethod
def commitTx(self):
self.db.query("COMMIT")
self._inside_tx = False
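DbAdapterHolder gives every indexer cache the same small protocol: setup_db_access() once with the shared adapter, then beginTx()/commitTx() around each flush on the cloned connection. A minimal sketch of a cache built on it (ExampleCache and its SQL are illustrative, not part of this change):

from hive.indexer.db_adapter_holder import DbAdapterHolder

class ExampleCache(DbAdapterHolder):
    _items = []

    @classmethod
    def flush(cls):
        n = len(cls._items)
        if n:
            cls.beginTx()
            cls.db.query_no_return("SELECT 1")  # placeholder for the real bulk statement
            cls._items.clear()
            cls.commitTx()
        return n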
......@@ -9,9 +9,9 @@ from hive.db.db_state import DbState
from hive.indexer.accounts import Accounts
from hive.indexer.notify import Notify
log = logging.getLogger(__name__)
from hive.indexer.db_adapter_holder import DbAdapterHolder
DB = Db.instance()
log = logging.getLogger(__name__)
FOLLOWERS = 'followers'
FOLLOWING = 'following'
......@@ -66,7 +66,7 @@ def _flip_dict(dict_to_flip):
flipped[value] = [key]
return flipped
class Follow:
class Follow(DbAdapterHolder):
"""Handles processing of incoming follow ups and flushing to db."""
follow_items_to_flush = dict()
......@@ -102,7 +102,7 @@ class Follow:
else:
old_state = cls._get_follow_db_state(op['flr'], op['flg'])
# insert or update state
DB.query(FOLLOW_ITEM_INSERT_QUERY, **op)
cls.db.query(FOLLOW_ITEM_INSERT_QUERY, **op)
if new_state == 1:
Follow.follow(op['flr'], op['flg'])
if old_state is None:
......@@ -145,7 +145,7 @@ class Follow:
sql = """SELECT state FROM hive_follows
WHERE follower = :follower
AND following = :following"""
return DB.query_one(sql, follower=follower, following=following)
return cls.db.query_one(sql, follower=follower, following=following)
# -- stat tracking --
......@@ -210,7 +210,7 @@ class Follow:
else:
query = sql_prefix + ",".join(values)
query += sql_postfix
DB.query(query)
cls.db.query(query)
values.clear()
values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
follow_item['at'], follow_item['state'],
......@@ -222,7 +222,7 @@ class Follow:
if len(values) > 0:
query = sql_prefix + ",".join(values)
query += sql_postfix
DB.query(query)
cls.db.query(query)
cls.follow_items_to_flush.clear()
......@@ -244,7 +244,7 @@ class Follow:
return 0
start = perf()
DB.batch_queries(sqls, trx=trx)
cls.db.batch_queries(sqls, trx=trx)
if trx:
log.info("[SYNC] flushed %d follow deltas in %ds",
updated, perf() - start)
......@@ -268,7 +268,7 @@ class Follow:
following = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND follower = hive_accounts.id)
WHERE id IN :ids
"""
DB.query(sql, ids=tuple(ids))
cls.db.query(sql, ids=tuple(ids))
@classmethod
def force_recount(cls):
......@@ -286,7 +286,7 @@ class Follow:
LEFT JOIN hive_follows hf ON id = hf.following AND state = 1
GROUP BY id);
"""
DB.query(sql)
cls.db.query(sql)
log.info("[SYNC] update follower counts")
sql = """
......@@ -296,4 +296,4 @@ class Follow:
UPDATE hive_accounts SET following = num FROM following_counts
WHERE id = account_id AND following != num;
"""
DB.query(sql)
cls.db.query(sql)
import logging
import logging
from hive.utils.normalize import escape_characters
from hive.db.adapter import Db
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
DB = Db.instance()
class PostDataCache(object):
class PostDataCache(DbAdapterHolder):
""" Procides cache for DB operations on post data table in order to speed up initial sync """
_data = {}
@classmethod
def is_cached(cls, pid):
""" Check if data is cached """
......@@ -35,7 +38,7 @@ class PostDataCache(object):
sql = """
SELECT hpd.body FROM hive_post_data hpd WHERE hpd.id = :post_id;
"""
row = DB.query_row(sql, post_id = pid)
row = cls.db.query_row(sql, post_id = pid)
post_data = dict(row)
return post_data['body']
......@@ -45,6 +48,13 @@ class PostDataCache(object):
if cls._data:
values_insert = []
values_update = []
cls.beginTx()
sql = """
INSERT INTO
hive_post_data (id, title, preview, img_url, body, json)
VALUES
"""
values = []
for k, data in cls._data.items():
title = 'NULL' if data['title'] is None else "{}".format(escape_characters(data['title']))
body = 'NULL' if data['body'] is None else "{}".format(escape_characters(data['body']))
......@@ -66,7 +76,7 @@ class PostDataCache(object):
sql += ','.join(values_insert)
if print_query:
log.info("Executing query:\n{}".format(sql))
DB.query(sql)
cls.db.query(sql)
if values_update:
sql = """
......@@ -88,7 +98,9 @@ class PostDataCache(object):
"""
if print_query:
log.info("Executing query:\n{}".format(sql))
DB.query(sql)
cls.db.query(sql)
cls.commitTx()
n = len(cls._data.keys())
cls._data.clear()
......
......@@ -16,12 +16,14 @@ from hive.indexer.community import Community, START_DATE
from hive.indexer.notify import Notify
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from hive.indexer.db_adapter_holder import DbAdapterHolder
from hive.utils.normalize import sbd_amount, legacy_amount, asset_to_hbd_hive, safe_img_url
log = logging.getLogger(__name__)
DB = Db.instance()
class Posts:
class Posts(DbAdapterHolder):
"""Handles critical/core post ops and data."""
# LRU cache for (author-permlink -> id) lookup (~400mb per 1M entries)
......@@ -224,9 +226,13 @@ class Posts:
yield lst[i:i + n]
for chunk in chunks(cls._comment_payout_ops, 1000):
cls.beginTx()
values_str = ','.join(chunk)
actual_query = sql.format(values_str)
DB.query(actual_query)
cls.db.query(actual_query)
cls.commitTx()
n = len(cls._comment_payout_ops)
cls._comment_payout_ops.clear()
......
......@@ -8,9 +8,7 @@ from hive.db.db_state import DbState
from hive.indexer.accounts import Accounts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.notify import Notify
DB = Db.instance()
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
......@@ -42,7 +40,7 @@ INSERT_SQL = """
RETURNING post_id
"""
class Reblog():
class Reblog(DbAdapterHolder):
""" Class for reblog operations """
reblog_items_to_flush = []
......@@ -63,7 +61,7 @@ class Reblog():
return
if 'delete' in op_json and op_json['delete'] == 'delete':
row = DB.query_row(DELETE_SQL, a=blogger, permlink=permlink)
row = cls.db.query_row(DELETE_SQL, a=blogger, permlink=permlink)
if row is None:
log.debug("reblog: post not found: %s/%s", author, permlink)
return
......@@ -72,12 +70,12 @@ class Reblog():
FeedCache.delete(result['post_id'], result['account_id'])
else:
if DbState.is_initial_sync():
row = DB.query_row(SELECT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
row = cls.db.query_row(SELECT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
if row is not None:
result = dict(row)
cls.reblog_items_to_flush.append(result)
else:
row = DB.query_row(INSERT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
row = cls.db.query_row(INSERT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
if row is not None:
author_id = Accounts.get_id(author)
blogger_id = Accounts.get_id(blogger)
......@@ -111,7 +109,7 @@ class Reblog():
else:
query = sql_prefix + ",".join(values)
query += sql_postfix
DB.query(query)
cls.db.query(query)
values.clear()
values.append("('{}', {}, '{}', {})".format(reblog_item["blogger"], reblog_item["post_id"], reblog_item["date"], reblog_item["block_num"]))
count = 1
......@@ -119,6 +117,6 @@ class Reblog():
if len(values) > 0:
query = sql_prefix + ",".join(values)
query += sql_postfix
DB.query(query)
cls.db.query(query)
cls.reblog_items_to_flush.clear()
return item_count
\ No newline at end of file
""" Reputation update support """
import logging
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
CACHED_ITEMS_LIMIT = 200
class Reputations(DbAdapterHolder):
_queries = []
@classmethod
def process_vote(self, block_num, effective_vote_op):
return
self._queries.append("\nSELECT process_reputation_data({}, '{}', '{}', '{}', {});".format(block_num, effective_vote_op['author'], effective_vote_op['permlink'],
effective_vote_op['voter'], effective_vote_op['rshares']))
@classmethod
def flush(self):
if not self._queries:
return 0
self.beginTx()
query = ""
i = 0
items = 0
for s in self._queries:
query = query + str(self._queries[i]) + ";\n"
i = i + 1
items = items + 1
if items >= CACHED_ITEMS_LIMIT:
self.db.query_no_return(query)
query = ""
items = 0
if items >= CACHED_ITEMS_LIMIT:
self.db.query_no_return(query)
query = ""
items = 0
n = len(self._queries)
self._queries.clear()
self.commitTx()
return n
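Reputations batches the generated SELECT process_reputation_data(...) calls and sends them CACHED_ITEMS_LIMIT at a time inside one transaction (process_vote above still returns before queuing anything, so the path looks disabled in this revision, and a trailing batch smaller than the limit appears not to be sent). A hedged sketch of the same batching with the remainder flushed, not the change itself:

def flush_batched(db, queries, limit=200):
    batch = []
    for q in queries:
        batch.append(q)
        if len(batch) >= limit:
            db.query_no_return("\n".join(batch))
            batch.clear()
    if batch:                      # remainder smaller than the limit
        db.query_no_return("\n".join(batch))
    return len(queries)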
"""Hive sync manager."""
from hive.indexer.reblog import Reblog
import logging
import glob
from time import perf_counter as perf
......@@ -25,6 +24,9 @@ from hive.indexer.accounts import Accounts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.follow import Follow
from hive.indexer.community import Community
from hive.indexer.reblog import Reblog
from hive.indexer.reputations import Reputations
from hive.server.common.mutes import Mutes
from hive.utils.stats import OPStatusManager as OPSM
......@@ -99,6 +101,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
try:
count = ubound - lbound
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound:
wait_time_1 = WSM.start()
......@@ -221,6 +224,7 @@ class Sync:
# ensure db schema up to date, check app status
DbState.initialize()
Blocks.setup_db_access(self._db)
# prefetch id->name and id->rank memory maps
Accounts.load_ids()
......@@ -234,12 +238,14 @@ class Sync:
# community stats
Community.recalc_pending_payouts()
sql = "SELECT num FROM hive_blocks ORDER BY num DESC LIMIT 1"
database_head_block = DbState.db().query_one(sql)
log.info("database_head_block : %s", database_head_block)
last_imported_block = Blocks.head_num()
hived_head_block = self._conf.get('test_max_block') or self._steem.last_irreversible()
log.info("database_head_block : %s", last_imported_block)
log.info("target_head_block : %s", hived_head_block)
if DbState.is_initial_sync():
last_imported_block = Blocks.head_num()
DbState.before_initial_sync(last_imported_block, hived_head_block)
# resume initial sync
self.initial()
if not CONTINUE_PROCESSING:
......@@ -315,13 +321,14 @@ class Sync:
remaining = drop(skip_lines, f)
for lines in partition_all(chunk_size, remaining):
raise RuntimeError("Sync from checkpoint disabled")
Blocks.process_multi(map(json.loads, lines), True)
# Blocks.process_multi(map(json.loads, lines), True)
last_block = num
last_read = num
def from_steemd(self, is_initial_sync=False, chunk_size=1000):
"""Fast sync strategy: read/process blocks in batches."""
steemd = self._steem
lbound = Blocks.head_num() + 1
ubound = self._conf.get('test_max_block') or steemd.last_irreversible()
......@@ -367,6 +374,7 @@ class Sync:
# debug: no max gap if disable_sync in effect
max_gap = None if self._conf.get('test_disable_sync') else 100
assert self._blocksProcessor
steemd = self._steem
hive_head = Blocks.head_num()
......
import logging
from hive.db.adapter import Db
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
DB = Db.instance()
from hive.utils.normalize import escape_characters
class Tags(object):
class Tags(DbAdapterHolder):
""" Tags cache """
_tags = []
......@@ -17,8 +17,9 @@ class Tags(object):
@classmethod
def flush(cls):
""" Flush tags to table """
""" Flush tags to table """
if cls._tags:
cls.beginTx()
limit = 1000
sql = """
......@@ -32,11 +33,11 @@ class Tags(object):
values.append("({})".format(escape_characters(tag[1])))
if len(values) >= limit:
tag_query = str(sql)
DB.query(tag_query.format(','.join(values)))
cls.db.query(tag_query.format(','.join(values)))
values.clear()
if len(values) > 0:
tag_query = str(sql)
DB.query(tag_query.format(','.join(values)))
cls.db.query(tag_query.format(','.join(values)))
values.clear()
sql = """
......@@ -62,13 +63,13 @@ class Tags(object):
values.append("({}, {})".format(tag[0], escape_characters(tag[1])))
if len(values) >= limit:
tag_query = str(sql)
DB.query(tag_query.format(','.join(values)))
cls.db.query(tag_query.format(','.join(values)))
values.clear()
if len(values) > 0:
tag_query = str(sql)
DB.query(tag_query.format(','.join(values)))
cls.db.query(tag_query.format(','.join(values)))
values.clear()
cls.commitTx()
n = len(cls._tags)
cls._tags.clear()
return n
......@@ -4,11 +4,11 @@ import logging
from hive.db.db_state import DbState
from hive.db.adapter import Db
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
DB = Db.instance()
class Votes:
class Votes(DbAdapterHolder):
""" Class for managing posts votes """
_votes_data = {}
......@@ -67,9 +67,12 @@ class Votes:
@classmethod
def flush(cls):
""" Flush vote data from cache to database """
cls.inside_flush = True
n = 0
if cls._votes_data:
cls.beginTx()
sql = """
INSERT INTO hive_votes
(post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update, block_num, is_effective)
......@@ -110,16 +113,19 @@ class Votes:
if len(values) >= values_limit:
values_str = ','.join(values)
actual_query = sql.format(values_str)
DB.query(actual_query)
cls.db.query(actual_query)
values.clear()
if len(values) > 0:
values_str = ','.join(values)
actual_query = sql.format(values_str)
DB.query(actual_query)
cls.db.query(actual_query)
values.clear()
n = len(cls._votes_data)
cls._votes_data.clear()
cls.commitTx()
cls.inside_flush = False
return n
......@@ -21,7 +21,7 @@ async def get_post_id(db, author, permlink):
INNER JOIN hive_accounts ha_a ON ha_a.id = hp.author_id
INNER JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id
WHERE ha_a.name = :author AND hpd_p.permlink = :permlink
AND counter_deleted = 0 LIMIT 1""" # ABW: replace with find_comment_id(:author,:permlink)?
AND counter_deleted = 0 LIMIT 1""" # ABW: replace with find_comment_id(:author,:permlink,True)?
return await db.query_one(sql, author=author, permlink=permlink)
async def get_child_ids(db, post_id):
......@@ -190,8 +190,8 @@ async def pids_by_query(db, sort, start_author, start_permlink, limit, tag):
where.append(sql)
start_id = None
if start_permlink and start_author:
sql = "%s <= (SELECT %s FROM %s WHERE id = find_comment_id('{}', '{}'))".format(start_author, start_permlink)
if start_permlink or start_author:
sql = "%s <= (SELECT %s FROM %s WHERE id = find_comment_id('{}', '{}', True))".format(start_author, start_permlink)
where.append(sql % (field, raw_field, table))
sql = """
......
Subproject commit 9f6058b31adec6378ead1b15ae6c1e7bb75823f7
Subproject commit 43f46f320af704f4ac173005696ab8526e8d08f2