Bartek Wrona authored
Eliminated performance problems in the above implementation. Defined 2 new indexes: hive_feed_cache_block_num_idx and hive_feed_cache_created_at_idx.
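The two new indexes named above also appear in the _disableable_indexes list further down in db_state.py. A minimal sketch of how such definitions might look in SQLAlchemy (the column set below is an assumption for illustration; the authoritative table and index definitions live in hive/db/schema.py):

    # Sketch only -- assumed columns; see hive/db/schema.py for the real definition.
    import sqlalchemy as sa

    metadata = sa.MetaData()
    sa.Table(
        'hive_feed_cache', metadata,
        sa.Column('post_id', sa.Integer, primary_key=True),
        sa.Column('account_id', sa.Integer, nullable=False),
        sa.Column('created_at', sa.DateTime, nullable=False),
        sa.Column('block_num', sa.Integer, nullable=False),
        sa.Index('hive_feed_cache_block_num_idx', 'block_num'),
        sa.Index('hive_feed_cache_created_at_idx', 'created_at'))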
db_state.py 21.21 KiB
"""Hive db state manager. Check if schema loaded, init synced, etc."""
#pylint: disable=too-many-lines
import time
from time import perf_counter
import logging
import sqlalchemy
from hive.db.schema import (setup, reset_autovac, set_logged_table_attribute, build_metadata,
build_metadata_community, teardown, DB_VERSION)
from hive.db.adapter import Db
from hive.utils.post_active import update_active_starting_from_posts_on_block
from hive.utils.communities_rank import update_communities_posts_and_rank
from hive.server.common.payout_stats import PayoutStats
log = logging.getLogger(__name__)
SYNCED_BLOCK_LIMIT = 7*24*1200 # ~7 days of blocks (1200 blocks per hour at the 3-second block interval)
class DbState:
"""Manages database state: sync status, migrations, etc."""
_db = None
# prop is true until initial sync complete
_is_initial_sync = True
# db schema version
_ver = None
@classmethod
def initialize(cls):
"""Perform startup database checks.
1) Load schema if needed
2) Run migrations if needed
3) Check if initial sync has completed
"""
log.info("[INIT] Welcome to hive!")
# create db schema if needed
if not cls._is_schema_loaded():
log.info("[INIT] Create db schema...")
setup(cls.db())
# perform db migrations
cls._check_migrations()
        # initial sync is assumed to be in progress until finish_initial_sync() is called
cls._is_initial_sync = True
log.info("[INIT] Continue with initial sync...")
@classmethod
def teardown(cls):
"""Drop all tables in db."""
teardown(cls.db())
@classmethod
def db(cls):
"""Get a db adapter instance."""
if not cls._db:
cls._db = Db.instance()
return cls._db
@classmethod
def finish_initial_sync(cls, current_imported_block):
"""Set status to initial sync complete."""
assert cls._is_initial_sync, "initial sync was not started."
cls._after_initial_sync(current_imported_block)
cls._is_initial_sync = False
log.info("[INIT] Initial sync complete!")
@classmethod
def is_initial_sync(cls):
"""Check if we're still in the process of initial sync."""
return cls._is_initial_sync
@classmethod
def _all_foreign_keys(cls):
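        """Return all ForeignKey objects defined in the schema metadata."""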
md = build_metadata()
out = []
for table in md.tables.values():
out.extend(table.foreign_keys)
return out
@classmethod
def _disableable_indexes(cls):
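        """Return the Index objects that may be dropped for initial sync and recreated afterwards."""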
to_locate = [
'hive_blocks_created_at_idx',
'hive_feed_cache_block_num_idx',
'hive_feed_cache_created_at_idx',
'hive_follows_ix5a', # (following, state, created_at, follower)
'hive_follows_ix5b', # (follower, state, created_at, following)
'hive_follows_block_num_idx',
'hive_follows_created_at_idx',
'hive_posts_parent_id_idx',
'hive_posts_depth_idx',
'hive_posts_author_id_created_at_idx',
'hive_posts_root_id_id_idx',
'hive_posts_community_id_idx',
'hive_posts_category_id_idx',
'hive_posts_payout_at_idx',
'hive_posts_payout_idx',
'hive_posts_promoted_idx',
'hive_posts_sc_trend_id_is_paidout_idx',
'hive_posts_sc_hot_id_is_paidout_idx',
'hive_posts_block_num_idx',
'hive_posts_block_num_created_idx',
'hive_posts_cashout_time_id_idx',
'hive_posts_updated_at_idx',
'hive_posts_payout_plus_pending_payout_id_is_paidout_idx',
'hive_posts_api_helper_author_s_permlink_idx',
'hive_votes_voter_id_last_update_idx',
'hive_votes_block_num_idx',
'hive_subscriptions_block_num_idx',
'hive_subscriptions_community_idx',
'hive_communities_block_num_idx',
'hive_reblogs_created_at_idx',
'hive_post_tags_tag_id_idx',
'hive_votes_voter_id_post_id_idx',
'hive_votes_post_id_voter_id_idx',
'hive_reputation_data_block_num_idx',
'hive_notification_cache_block_num_idx',
'hive_notification_cache_dst_score_idx'
]
to_return = []
md = build_metadata()
for table in md.tables.values():
for index in table.indexes:
if index.name not in to_locate:
continue
to_locate.remove(index.name)
to_return.append(index)
# ensure we found all the items we expected
assert not to_locate, "indexes not located: {}".format(to_locate)
return to_return
@classmethod
def has_index(cls, idx_name):
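        """Check whether an index with the given name already exists in the database."""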
sql = "SELECT count(*) FROM pg_class WHERE relname = :relname"
count = cls.db().query_one(sql, relname=idx_name)
        return count == 1
@classmethod
def processing_indexes(cls, is_pre_process, drop, create):
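        """Drop and/or recreate the disableable indexes, logging how long each operation takes."""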
DB = cls.db()
engine = DB.engine()
log.info("[INIT] Begin %s-initial sync hooks", "pre" if is_pre_process else "post")
for index in cls._disableable_indexes():
log.info("%s index %s.%s", ("Drop" if is_pre_process else "Recreate"), index.table, index.name)
try:
if drop:
if cls.has_index(index.name):
time_start = perf_counter()
index.drop(engine)
end_time = perf_counter()
elapsed_time = end_time - time_start
log.info("Index %s dropped in time %.4f s", index.name, elapsed_time)
except sqlalchemy.exc.ProgrammingError as ex:
log.warning("Ignoring ex: {}".format(ex))
            if create and not cls.has_index(index.name):
time_start = perf_counter()
index.create(engine)
end_time = perf_counter()
elapsed_time = end_time - time_start
log.info("Index %s created in time %.4f s", index.name, elapsed_time)
@classmethod
def before_initial_sync(cls, last_imported_block, hived_head_block):
"""Routine which runs *once* after db setup.
        Drops non-critical indexes and foreign key constraints so that the
        initial sync runs faster."""
to_sync = hived_head_block - last_imported_block
if to_sync < SYNCED_BLOCK_LIMIT:
log.info("[INIT] Skipping pre-initial sync hooks")
return
        # is_pre_process, drop, create
        cls.processing_indexes(True, True, False)
from hive.db.schema import drop_fk, set_logged_table_attribute
log.info("Dropping FKs")
drop_fk(cls.db())
# intentionally disabled since it needs a lot of WAL disk space when switching back to LOGGED
#set_logged_table_attribute(cls.db(), False)
log.info("[INIT] Finish pre-initial sync hooks")
@classmethod
def update_work_mem(cls, workmem_value):
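        """Set the database-level work_mem and return the previous value so it can be restored later."""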
row = cls.db().query_row("SHOW work_mem")
current_work_mem = row['work_mem']
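        # ALTER DATABASE needs an explicit database name, so the statement is
        # built dynamically inside a DO block using current_database()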
sql = """
DO $$
BEGIN
EXECUTE 'ALTER DATABASE '||current_database()||' SET work_mem TO "{}"';
END
$$;
"""
cls.db().query_no_return(sql.format(workmem_value))
return current_work_mem
@classmethod
def _after_initial_sync(cls, current_imported_block):
"""Routine which runs *once* after initial sync.
        Re-creates the non-core indexes used for serving APIs, as well as
        all foreign keys."""
last_imported_block = DbState.db().query_one("SELECT block_num FROM hive_state LIMIT 1")
log.info("[INIT] Current imported block: %s Last imported block: %s", current_imported_block, last_imported_block)
if last_imported_block > current_imported_block:
last_imported_block = current_imported_block
synced_blocks = current_imported_block - last_imported_block
force_index_rebuild = False
massive_sync_preconditions = False
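        # mirrors before_initial_sync(): indexes and FKs are only dropped when the
        # sync spans at least SYNCED_BLOCK_LIMIT blocks, so only then force a rebuild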
if synced_blocks >= SYNCED_BLOCK_LIMIT:
force_index_rebuild = True
massive_sync_preconditions = True
        # is_pre_process, drop, create
        cls.processing_indexes(False, force_index_rebuild, True)
current_work_mem = cls.update_work_mem('2GB')
time_start = perf_counter()
if massive_sync_preconditions:
            # Update the children count of all posts (this work was deferred during initial sync)
sql = """
select update_all_hive_posts_children_count()
"""
row = DbState.db().query_row(sql)
time_end = perf_counter()
log.info("[INIT] update_all_hive_posts_children_count executed in %.4fs", time_end - time_start)
else:
            # Update the children count only for posts processed during partial sync (deferred during initial sync)
sql = """
select update_hive_posts_children_count({}, {})
""".format(last_imported_block, current_imported_block)
row = DbState.db().query_row(sql)
time_end = perf_counter()
log.info("[INIT] update_hive_posts_children_count executed in %.4fs", time_end - time_start)
time_start = perf_counter()
        # Update root_id for posts in the synced block range
sql = """
select update_hive_posts_root_id({}, {})
""".format(last_imported_block, current_imported_block)
row = DbState.db().query_row(sql)
time_end = perf_counter()
log.info("[INIT] update_hive_posts_root_id executed in %.4fs", time_end - time_start)
time_start = perf_counter()
        # Update hive_posts_api_helper (author/permlink lookup data) for the synced block range
sql = """
select update_hive_posts_api_helper({}, {})
""".format(last_imported_block, current_imported_block)
row = DbState.db().query_row(sql)
time_end = perf_counter()
log.info("[INIT] update_hive_posts_api_helper executed in %.4fs", time_end - time_start)
time_start = perf_counter()
update_active_starting_from_posts_on_block(last_imported_block, current_imported_block)
time_end = perf_counter()
log.info("[INIT] update_all_posts_active executed in %.4fs", time_end - time_start)
time_start = perf_counter()
sql = """
SELECT update_feed_cache({}, {});
""".format(last_imported_block, current_imported_block)
DbState.db().query_no_return(sql)
time_end = perf_counter()
log.info("[INIT] update_feed_cache executed in %.4fs", time_end - time_start)
time_start = perf_counter()
sql = """
SELECT update_hive_posts_mentions({}, {});
""".format(last_imported_block, current_imported_block)
DbState.db().query_no_return(sql)
time_end = perf_counter()
log.info("[INIT] update_hive_posts_mentions executed in %.4fs", time_end - time_start)
time_start = perf_counter()
PayoutStats.generate()
time_end = perf_counter()
log.info("[INIT] filling payout_stats_view executed in %.4fs", time_end - time_start)
time_start = perf_counter()
sql = """
SELECT update_account_reputations({}, {});
""".format(last_imported_block, current_imported_block)
DbState.db().query_no_return(sql)
time_end = perf_counter()
log.info("[INIT] update_account_reputations executed in %.4fs", time_end - time_start)
time_start = perf_counter()
update_communities_posts_and_rank()
time_end = perf_counter()
log.info("[INIT] update_communities_posts_and_rank executed in %.4fs", time_end - time_start)
time_start = perf_counter()
sql = """
SELECT update_posts_rshares({}, {});
""".format(last_imported_block, current_imported_block)
DbState.db().query_no_return(sql)
time_end = perf_counter()
log.info("[INIT] update_posts_rshares executed in %.4fs", time_end - time_start)
time_start = perf_counter()
sql = """
SELECT update_notification_cache(NULL, NULL, False);
"""
DbState.db().query_no_return(sql)
time_end = perf_counter()
log.info("[INIT] update_notification_cache executed in %.4fs", time_end - time_start)
        # Persist the last imported block number immediately
DbState.db().query_no_return("UPDATE hive_state SET block_num = :block_num", block_num = current_imported_block)
cls.update_work_mem(current_work_mem)
if massive_sync_preconditions:
from hive.db.schema import create_fk, set_logged_table_attribute
# intentionally disabled since it needs a lot of WAL disk space when switching back to LOGGED
#set_logged_table_attribute(cls.db(), True)
log.info("Recreating FKs")
create_fk(cls.db())
@staticmethod
def status():
"""Basic health status: head block/time, current age (secs)."""
sql = ("SELECT num, created_at, extract(epoch from created_at) ts "
"FROM hive_blocks ORDER BY num DESC LIMIT 1")
row = DbState.db().query_row(sql)
return dict(db_head_block=row['num'],
db_head_time=str(row['created_at']),
db_head_age=int(time.time() - row['ts']))
@classmethod
def _is_schema_loaded(cls):
"""Check if the schema has been loaded into db yet."""
# check if database has been initialized (i.e. schema loaded)
engine = cls.db().engine_name()
if engine == 'postgresql':
return bool(cls.db().query_one("""
SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public'
"""))
if engine == 'mysql':
return bool(cls.db().query_one('SHOW TABLES'))
raise Exception("unknown db engine %s" % engine)
@classmethod
def _is_feed_cache_empty(cls):
"""Check if the hive_feed_cache table is empty.
If empty, it indicates that the initial sync has not finished.
"""
return not cls.db().query_one("SELECT 1 FROM hive_feed_cache LIMIT 1")
@classmethod
def _check_migrations(cls):
"""Check current migration version and perform updates as needed."""
#pylint: disable=line-too-long,too-many-branches,too-many-statements
cls._ver = cls.db().query_one("SELECT db_version FROM hive_state LIMIT 1")
assert cls._ver is not None, 'could not load state record'
if cls._ver == 0:
raise Exception("dbv cannot be 0; reindex required")
if cls._ver == 1:
cls._set_ver(2)
if cls._ver == 2:
cls._set_ver(3)
if cls._ver == 3:
cls._set_ver(4)
if cls._ver == 4:
cls.db().query("CREATE INDEX hive_follows_ix4 ON hive_follows (follower, following) WHERE state = 2")
cls._set_ver(5)
if cls._ver == 5:
# recover acct names lost to issue #151
from hive.steem.client import SteemClient
from hive.indexer.accounts import Accounts
names = SteemClient().get_all_account_names()
Accounts.load_ids()
Accounts.register(names, None, '1970-01-01T00:00:00', 0)
Accounts.clear_ids()
cls._set_ver(6)
if cls._ver == 6:
cls.db().query("DROP INDEX hive_posts_cache_ix6")
#cls.db().query("CREATE INDEX hive_posts_cache_ix6a ON hive_posts_cache (sc_trend, post_id) WHERE is_paidout = '0'")
#cls.db().query("CREATE INDEX hive_posts_cache_ix6b ON hive_posts_cache (post_id, sc_trend) WHERE is_paidout = '0'")
#cls.db().query("DROP INDEX hive_posts_cache_ix7")
#cls.db().query("CREATE INDEX hive_posts_cache_ix7a ON hive_posts_cache (sc_hot, post_id) WHERE is_paidout = '0'")
#cls.db().query("CREATE INDEX hive_posts_cache_ix7b ON hive_posts_cache (post_id, sc_hot) WHERE is_paidout = '0'")
cls._set_ver(7)
if cls._ver == 7:
cls.db().query("DROP INDEX IF EXISTS hive_accounts_ix4; CREATE INDEX hive_accounts_ix4 ON hive_accounts (id, name)")
cls._set_ver(8)
if cls._ver == 8:
cls.db().query("DROP INDEX hive_follows_ix2")
cls.db().query("DROP INDEX hive_follows_ix3")
cls.db().query("DROP INDEX hive_follows_ix4")
cls.db().query("CREATE INDEX hive_follows_5a ON hive_follows (following, state, created_at, follower)")
cls.db().query("CREATE INDEX hive_follows_5b ON hive_follows (follower, state, created_at, following)")
cls._set_ver(9)
if cls._ver == 9:
from hive.indexer.follow import Follow
Follow.force_recount()
cls._set_ver(10)
if cls._ver == 10:
#cls.db().query("CREATE INDEX hive_posts_cache_ix8 ON hive_posts_cache (category, payout, depth) WHERE is_paidout = '0'")
#cls.db().query("CREATE INDEX hive_posts_cache_ix9a ON hive_posts_cache (depth, payout, post_id) WHERE is_paidout = '0'")
#cls.db().query("CREATE INDEX hive_posts_cache_ix9b ON hive_posts_cache (category, depth, payout, post_id) WHERE is_paidout = '0'")
cls._set_ver(11)
if cls._ver == 11:
cls.db().query("DROP INDEX hive_posts_ix1")
cls.db().query("DROP INDEX hive_posts_ix2")
cls.db().query("CREATE INDEX hive_posts_ix3 ON hive_posts (author, depth, id) WHERE counter_deleted = 0")
cls.db().query("CREATE INDEX hive_posts_ix4 ON hive_posts (parent_id, id) WHERE counter_deleted = 0")
cls._set_ver(12)
if cls._ver == 12: # community schema
assert False, 'not finalized'
for table in ['hive_members', 'hive_flags', 'hive_modlog',
'hive_communities', 'hive_subscriptions',
'hive_roles', 'hive_notifs']:
cls.db().query("DROP TABLE IF EXISTS %s" % table)
build_metadata_community().create_all(cls.db().engine())
cls.db().query("ALTER TABLE hive_accounts ADD COLUMN lr_notif_id integer")
cls.db().query("ALTER TABLE hive_posts DROP CONSTRAINT hive_posts_fk2")
cls.db().query("ALTER TABLE hive_posts DROP COLUMN community")
cls.db().query("ALTER TABLE hive_posts ADD COLUMN community_id integer")
cls.db().query("ALTER TABLE hive_posts_cache ADD COLUMN community_id integer")
cls._set_ver(13)
if cls._ver == 13:
sqls = ("CREATE INDEX hive_posts_ix5 ON hive_posts (id) WHERE is_pinned = '1' AND counter_deleted = 0",
"CREATE INDEX hive_posts_ix6 ON hive_posts (community_id, id) WHERE community_id IS NOT NULL AND is_pinned = '1' AND counter_deleted = 0",)
for sql in sqls:
cls.db().query(sql)
cls._set_ver(14)
if cls._ver == 14:
cls.db().query("ALTER TABLE hive_communities ADD COLUMN primary_tag VARCHAR(32) NOT NULL DEFAULT ''")
cls.db().query("ALTER TABLE hive_communities ADD COLUMN category VARCHAR(32) NOT NULL DEFAULT ''")
cls.db().query("ALTER TABLE hive_communities ADD COLUMN avatar_url VARCHAR(1024) NOT NULL DEFAULT ''")
cls.db().query("ALTER TABLE hive_communities ADD COLUMN num_authors INTEGER NOT NULL DEFAULT 0")
#cls.db().query("CREATE INDEX hive_posts_cache_ix20 ON hive_posts_cache (community_id, author, payout, post_id) WHERE is_paidout = '0'")
cls._set_ver(15)
if cls._ver == 15:
cls.db().query("ALTER TABLE hive_accounts DROP COLUMN lr_notif_id")
cls.db().query("ALTER TABLE hive_accounts ADD COLUMN lastread_at TIMESTAMP WITHOUT TIME ZONE DEFAULT '1970-01-01 00:00:00' NOT NULL")
cls.db().query("CREATE INDEX hive_notifs_ix6 ON hive_notifs (dst_id, created_at, score, id) WHERE dst_id IS NOT NULL")
cls._set_ver(16)
if cls._ver == 16:
cls.db().query("CREATE INDEX hive_communities_ft1 ON hive_communities USING GIN (to_tsvector('english', title || ' ' || about))")
cls._set_ver(17)
if cls._ver == 17:
cls.db().query("INSERT INTO hive_accounts (name, created_at) VALUES ('', '1970-01-01T00:00:00') ON CONFLICT (name) DO NOTHING")
cls.db().query("INSERT INTO hive_permlink_data (permlink) VALUES ('') ON CONFLICT (permlink) DO NOTHING")
cls.db().query("INSERT INTO hive_category_data (category) VALUES ('') ON CONFLICT (category) DO NOTHING")
cls._set_ver(18)
reset_autovac(cls.db())
log.info("[HIVE] db version: %d", cls._ver)
assert cls._ver == DB_VERSION, "migration missing or invalid DB_VERSION"
# Example migration:
#if cls._ver == 1:
# cls.db().query("ALTER TABLE hive_posts ALTER COLUMN author SET DEFAULT ''")
# cls._set_ver(2)
@classmethod
def _set_ver(cls, ver):
"""Sets the db/schema version number. Enforce sequential."""
assert cls._ver is not None, 'version needs to be read before updating'
assert ver == cls._ver + 1, 'version must follow previous'
cls.db().query("UPDATE hive_state SET db_version = %d" % ver)
cls._ver = ver
log.info("[HIVE] db migrated to version: %d", ver)