Commit 02b28201 authored by Dan Notestein

Merge branch 'v1.24.1-RC' into 'master'

v1.24.1 Release

See merge request !474
parents 46906b99 ebc75a89
......@@ -146,3 +146,6 @@ pghero.yml
.tmp
.private
# report file
tavern_benchmarks_report.html
......@@ -258,8 +258,9 @@ api_smoketest_benchmark:
tags:
- hivemind
script:
- ./scripts/ci_start_api_benchmarks.sh localhost $HIVEMIND_HTTP_PORT 5
- ./scripts/ci_start_api_benchmarks.sh localhost $HIVEMIND_HTTP_PORT 5 auto $CI_PROJECT_DIR/tests/tests_api/hivemind/tavern
artifacts:
when: always
paths:
- tavern_benchmarks_report.html
- request_process_times.log
......@@ -36,7 +36,7 @@ stages:
- echo "HIVEMIND_DB_NAME is $HIVEMIND_DB_NAME"
.fetch-git-tags: &fetch-git-tags
# - git fetch --tags # Looks to be unnecessary.
#- git fetch --tags # Looks to be unnecessary.
- git tag -f ci_implicit_tag # Needed to build python package
.start-timer: &start-timer
......@@ -366,8 +366,8 @@ stages:
./scripts/ci/start-api-benchmarks.sh \
localhost $RUNNER_HIVEMIND_SERVER_HTTP_PORT \
$RUNNER_BENCHMARK_ITERATIONS \
$RUNNER_BENCHMARK_JOBS
- ./scripts/xml_report_parser.py --time-threshold=1.5 . ./tests/tests_api/hivemind/tavern
$RUNNER_BENCHMARK_JOBS \
$CI_PROJECT_DIR/tests/tests_api/hivemind/tavern
.api-smoketest-benchmark:
stage: benchmark-tests
......@@ -414,6 +414,7 @@ sync-e2e-benchmark:
- hive-sync-runner-id.txt
- tavern_benchmarks_report.html
- tests/tests_api/hivemind/tavern/**/*.out.json
- request_process_times.log
reports:
junit: "*.xml"
expire_in: 7 days
......
......@@ -70,7 +70,7 @@ $ export HIVEMIND_ADDRESS=127.0.0.1
```
5. Run tests using tox:
```bash
$ tox -- -v -n auto --durations=0
$ tox -e tavern -- --workers auto --tests-per-worker auto --durations=0
```
## Production Environment
......
......@@ -37,36 +37,41 @@ def setup_logging(conf):
def run():
"""Run the service specified in the `--mode` argument."""
conf = Conf.init_argparse()
mode = conf.mode()
PrometheusClient( conf.get('prometheus_port') )
setup_logging(conf)
if mode == 'completion':
conf.generate_completion()
return
Db.set_shared_instance(conf.db())
pid_file_name = conf.pid_file()
if pid_file_name is not None:
fh = open(pid_file_name, 'w')
if fh is None:
print("Cannot write into specified pid_file: %s", pid_file_name)
else:
pid = os.getpid()
fh.write(str(pid))
fh.close()
if conf.get('test_profile'):
from hive.utils.profiler import Profiler
with Profiler():
launch_mode(mode, conf)
else:
launch_mode(mode, conf)
with Conf() as conf:
conf.init_argparse()
mode = conf.mode()
PrometheusClient( conf.get('prometheus_port') )
setup_logging(conf)
if mode == 'completion':
conf.generate_completion()
return
#Calculate the maximum number of connections, then close the database.
#In the next step the database will be reopened with the correct number of connections.
Db.set_max_connections(conf.db())
conf.disconnect()
Db.set_shared_instance(conf.db())
pid_file_name = conf.pid_file()
if pid_file_name is not None:
fh = open(pid_file_name, 'w')
if fh is None:
print("Cannot write into specified pid_file: %s", pid_file_name)
else:
pid = os.getpid()
fh.write(str(pid))
fh.close()
if conf.get('test_profile'):
from hive.utils.profiler import Profiler
with Profiler():
launch_mode(mode, conf)
else:
launch_mode(mode, conf)
def launch_mode(mode, conf):
"""Launch a routine as indicated by `mode`."""
......@@ -76,7 +81,8 @@ def launch_mode(mode, conf):
elif mode == 'sync':
from hive.indexer.sync import Sync
Sync(conf=conf).run()
with Sync(conf=conf) as sync:
sync.run()
elif mode == 'status':
from hive.db.db_state import DbState
......
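Note: the reworked run() above relies on Conf being a context manager; the matching __enter__/__exit__ methods are added in the hive/conf.py hunk below, so disconnect() is guaranteed to run when run() exits. A minimal sketch of that lifecycle (illustrative only, not part of the commit):

```python
class Conf:
    def init_argparse(self, strict=True, **kwargs):
        ...  # parse CLI args / ENV / hive.conf as in the hunk below

    def disconnect(self):
        ...  # close the root db connection and dispose of the engine

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        # runs on normal return and on exceptions alike
        self.disconnect()

# usage mirroring run():
with Conf() as conf:
    conf.init_argparse()
    # ... probe max_connections, reconnect, launch the requested mode ...
# by this point disconnect() has been called, even if the body raised
```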
......@@ -9,6 +9,8 @@ from hive.db.adapter import Db
from hive.utils.normalize import strtobool, int_log_level
from hive.utils.stats import DbStats
log = logging.getLogger(__name__)
def _sanitized_conf(parser):
"""Formats parser config, redacting database url password."""
out = parser.format_values()
......@@ -17,8 +19,14 @@ def _sanitized_conf(parser):
class Conf():
""" Manages sync/server configuration via args, ENVs, and hive.conf. """
@classmethod
def init_argparse(cls, strict=True, **kwargs):
def __init__(self):
self._args = None
self._env = None
self._db = None
self._steem = None
self.arguments = None
def init_argparse(self, strict=True, **kwargs):
"""Read hive config (CLI arg > ENV var > config)"""
#pylint: disable=line-too-long
......@@ -69,11 +77,12 @@ class Conf():
args = (parser.parse_args() if strict
else parser.parse_known_args()[0])
conf = Conf(args=vars(args), arguments=parser._actions)
self._args = vars(args)
self.arguments = parser._actions
# configure logger and print config
root = logging.getLogger()
root.setLevel(conf.log_level())
root.setLevel(self.log_level())
try:
if 'auto_http_server_port' in vars(args) and vars(args)['auto_http_server_port'] is not None:
......@@ -90,7 +99,7 @@ class Conf():
# Print command line args, but on continuous integration server
# hide db connection string.
from sys import argv
if conf.get('log_mask_sensitive_data'):
if self.get('log_mask_sensitive_data'):
my_args = []
upcoming_connection_string = False
for elem in argv[1:]:
......@@ -109,23 +118,15 @@ class Conf():
#args_list = ["--" + k + " " + str(v) for k,v in vars(args).items()]
#root.info("Full command line args: %s", " ".join(args_list))
if conf.mode() == 'server':
if self.mode() == 'server':
#DbStats.SLOW_QUERY_MS = 750
DbStats.SLOW_QUERY_MS = 200 # TODO
return conf
def __enter__(self):
return self
@classmethod
def init_test(cls):
"""Initialize hive config for testing."""
return cls.init_argparse(strict=False)
def __init__(self, args, env=None, arguments=None):
self._args = args
self._env = env
self._db = None
self._steem = None
self.arguments = arguments
def __exit__(self, exc_type, value, traceback):
self.disconnect()
def args(self):
"""Get the raw Namespace object as generated by configargparse"""
......@@ -143,11 +144,12 @@ class Conf():
def db(self):
"""Get a configured instance of Db."""
if not self._db:
if self._db is None:
url = self.get('database_url')
assert url, ('--database-url (or DATABASE_URL env) not specified; '
'e.g. postgresql://user:pass@localhost:5432/hive')
self._db = Db(url)
self._db = Db(url, "root db creation")
log.info("The database created...")
return self._db
def get(self, param):
......@@ -188,3 +190,10 @@ class Conf():
f'complete -f -W "{arguments}" hive\n',
"\n"
])
def disconnect(self):
"""Close the root db connection and dispose of the SQL engine."""
if self._db is not None:
self._db.close()
self._db.close_engine()
self._db = None
log.info("Database disconnected...")
......@@ -18,6 +18,10 @@ class Db:
_instance = None
#number of connections required to execute some tasks concurrently
necessary_connections = 15
max_connections = 1
@classmethod
def instance(cls):
"""Get the shared instance."""
......@@ -29,7 +33,17 @@ class Db:
"""Set the global/shared db instance. Do not use."""
cls._instance = db
def __init__(self, url):
@classmethod
def set_max_connections(cls, db):
"""Remember maximum connections offered by postgres database."""
assert db is not None, "Database has to be initialized"
cls.max_connections = db.query_one("SELECT setting::int FROM pg_settings WHERE name = 'max_connections'")
if cls.necessary_connections > cls.max_connections:
log.info("A database offers only {} connections, but it's required {} connections".format(cls.max_connections, cls.necessary_connections))
else:
log.info("A database offers maximum connections: {}. Required {} connections.".format(cls.max_connections, cls.necessary_connections))
def __init__(self, url, name):
"""Initialize an instance.
No work is performed here. Some modules might initialize an
......@@ -38,38 +52,75 @@ class Db:
assert url, ('--database-url (or DATABASE_URL env) not specified; '
'e.g. postgresql://user:pass@localhost:5432/hive')
self._url = url
self._conn = None
self._conn = []
self._engine = None
self._trx_active = False
self._prep_sql = {}
self._conn = self.engine().connect()
self.name = name
self._conn.append( { "connection" : self.engine().connect(), "name" : name } )
# Since we need to manage transactions ourselves, yet the
# core behavior of DBAPI (per PEP-0249) is that a transaction
# is always in progress, this COMMIT is a workaround to get
# back control (and used with autocommit=False query exec).
self._exec = self._conn.execute
self._exec = self.get_connection(0).execute
self._exec(sqlalchemy.text("COMMIT"))
def clone(self):
return Db(self._url)
def clone(self, name):
"""Create a named Db instance that shares this instance's engine."""
cloned = Db(self._url, name)
cloned._engine = self._engine
return cloned
def close(self):
"""Close connection."""
try:
for item in self._conn:
if item is not None:
log.info("Closing database connection: '{}'".format(item['name']))
item['connection'].close()
item = None
self._conn = []
except Exception as ex:
log.exception("Error during connections closing: {}".format(ex))
raise ex
def close_engine(self):
"""Dispose db instance."""
try:
if self._engine is not None:
log.info("Disposing SQL engine")
self._engine.dispose()
self._engine = None
else:
log.info("SQL engine was already disposed")
except Exception as ex:
log.exception("Error during database closing: {}".format(ex))
raise ex
def get_connection(self, number):
"""Return the connection stored at the given index."""
assert len(self._conn) > number, "Incorrect connection index. Total: {}, requested: {}".format(len(self._conn), number)
assert 'connection' in self._conn[number], 'Incorrect construction of db connection'
return self._conn[number]['connection']
def engine(self):
"""Lazy-loaded SQLAlchemy engine."""
if not self._engine:
pool_size = os.cpu_count()
if pool_size > 5:
pool_size = pool_size - 1
else:
pool_size = 5
if self._engine is None:
self._engine = sqlalchemy.create_engine(
self._url,
isolation_level="READ UNCOMMITTED", # only supported in mysql
pool_size=pool_size,
pool_size=self.max_connections,
pool_recycle=3600,
echo=False)
return self._engine
def get_new_connection(self, name):
self._conn.append( { "connection" : self.engine().connect(), "name" : name } )
return self.get_connection(len(self._conn) - 1)
def get_dialect(self):
return self.get_connection(0).dialect
def is_trx_active(self):
"""Check if a transaction is in progress."""
return self._trx_active
......@@ -113,10 +164,10 @@ class Db:
def engine_name(self):
"""Get the name of the engine (e.g. `postgresql`, `mysql`)."""
engine = self._conn.dialect.name
if engine not in ['postgresql', 'mysql']:
raise Exception("db engine %s not supported" % engine)
return engine
_engine_name = self.get_dialect().name
if _engine_name not in ['postgresql', 'mysql']:
raise Exception("db engine %s not supported" % _engine_name)
return _engine_name
def batch_queries(self, queries, trx):
"""Process batches of prepared SQL tuples.
......
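A standalone sketch of the pool-sizing idea introduced by set_max_connections() and the pool_size=self.max_connections change above: the server's max_connections setting is probed first and then used to size the SQLAlchemy pool. db_url and make_engine are placeholders for illustration, not the commit's code:

```python
import sqlalchemy

def make_engine(db_url, required_connections=15):
    """Size the SQLAlchemy pool from the server's max_connections setting."""
    # probe the server limit with a short-lived engine
    probe = sqlalchemy.create_engine(db_url)
    with probe.connect() as conn:
        max_connections = conn.execute(sqlalchemy.text(
            "SELECT setting::int FROM pg_settings WHERE name = 'max_connections'"
        )).scalar()
    probe.dispose()

    if required_connections > max_connections:
        # mirrors the warning logged by Db.set_max_connections above
        print("Server allows only {} connections, {} are required".format(
            max_connections, required_connections))

    # the commit sizes the pool directly from the server limit
    return sqlalchemy.create_engine(
        db_url,
        pool_size=max_connections,
        pool_recycle=3600,
        echo=False)
```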
......@@ -13,9 +13,6 @@ log = logging.getLogger(__name__)
#pylint: disable=line-too-long, too-many-lines, bad-whitespace
# [DK] we changed and removed some tables so i upgraded DB_VERSION to 18
DB_VERSION = 18
def build_metadata():
"""Build schema def with SqlAlchemy"""
metadata = sa.MetaData()
......@@ -139,7 +136,7 @@ def build_metadata():
sa.Index('hive_posts_root_id_id_idx', 'root_id','id'),
sa.Index('hive_posts_parent_id_counter_deleted_id_idx', 'parent_id', 'counter_deleted', 'id'),
sa.Index('hive_posts_parent_id_id_idx', sa.text('parent_id, id DESC'), postgresql_where=sql_text("counter_deleted = 0")),
sa.Index('hive_posts_community_id_id_idx', 'community_id', sa.text('id DESC')),
sa.Index('hive_posts_payout_at_idx', 'payout_at'),
......@@ -440,7 +437,7 @@ def drop_fk(db):
def create_fk(db):
from sqlalchemy.schema import AddConstraint
from sqlalchemy import text
connection = db.engine().connect()
connection = db.get_new_connection('create_fk')
connection.execute(text("START TRANSACTION"))
for table in build_metadata().sorted_tables:
for fk in table.foreign_keys:
......@@ -463,7 +460,7 @@ def setup(db):
# default rows
sqls = [
"INSERT INTO hive_state (block_num, db_version, steem_per_mvest, usd_per_steem, sbd_per_steem, dgpo) VALUES (0, %d, 0, 0, 0, '')" % DB_VERSION,
"INSERT INTO hive_state (block_num, db_version, steem_per_mvest, usd_per_steem, sbd_per_steem, dgpo) VALUES (0, 0, 0, 0, 0, '')",
"INSERT INTO hive_blocks (num, hash, created_at) VALUES (0, '0000000000000000000000000000000000000000', '2016-03-24 16:04:57')",
"INSERT INTO hive_permlink_data (id, permlink) VALUES (0, '')",
......@@ -537,15 +534,6 @@ def setup(db):
);
"""
db.query_no_return(sql)
sql = """
INSERT INTO hive_db_patch_level
(patch_date, patched_to_revision)
values
(now(), '{}');
"""
from hive.version import GIT_REVISION
db.query_no_return(sql.format(GIT_REVISION))
# max_time_stamp definition moved into utility_functions.sql
......@@ -561,6 +549,7 @@ def setup(db):
"hive_muted_accounts_view.sql",
"hive_muted_accounts_by_id_view.sql",
"hive_blacklisted_accounts_by_observer_view.sql",
"get_post_view_by_id.sql",
"hive_post_operations.sql",
"head_block_time.sql",
"update_feed_cache.sql",
......@@ -604,15 +593,33 @@ def setup(db):
"bridge_get_account_posts_by_blog.sql",
"condenser_get_names_by_reblogged.sql",
"condenser_get_account_reputations.sql",
"bridge_get_community.sql",
"bridge_get_community_context.sql",
"bridge_list_all_subscriptions.sql",
"bridge_list_communities.sql",
"bridge_list_community_roles.sql",
"bridge_list_pop_communities.sql",
"bridge_list_subscribers.sql",
"update_follow_count.sql",
"delete_reblog_feed_cache.sql"
"delete_reblog_feed_cache.sql",
"follows.sql",
"upgrade/update_db_patchlevel.sql" #Additionally execute db patchlevel import to mark (already done) upgrade changes and avoid its reevaluation during next upgrade.
]
from os.path import dirname, realpath
dir_path = dirname(realpath(__file__))
for script in sql_scripts:
execute_sql_script(db.query_no_return, "{}/sql_scripts/{}".format(dir_path, script))
# Moved here so that the latest db patch level is recorded as the current Hivemind revision (which has just created the schema).
sql = """
INSERT INTO hive_db_patch_level
(patch_date, patched_to_revision)
values
(now(), '{}');
"""
from hive.version import GIT_REVISION
db.query_no_return(sql.format(GIT_REVISION))
......
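execute_sql_script() itself is outside this diff; a plausible minimal reading, assuming it simply loads each file and hands the SQL to the supplied query function, would be:

```python
def execute_sql_script(query_fn, script_path):
    """Assumed behavior: read the SQL file and execute it via the given query function."""
    with open(script_path, 'r') as sql_file:
        query_fn(sql_file.read())
```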
......@@ -23,7 +23,26 @@ BEGIN
WHERE hfc.account_id = __account_id AND hfc.post_id = __post_id;
END IF;
RETURN QUERY SELECT -- bridge_get_account_posts_by_blog
RETURN QUERY
WITH blog AS -- bridge_get_account_posts_by_blog
(
SELECT
hfc.post_id,
hfc.created_at
FROM hive_feed_cache hfc
WHERE hfc.account_id = __account_id
AND ( __post_id = 0 OR hfc.created_at < __created_at
OR (hfc.created_at = __created_at AND hfc.post_id < __post_id) )
AND ( NOT _bridge_api OR
NOT EXISTS (SELECT NULL FROM live_posts_comments_view hp1 --should this just be live_posts_view?
WHERE hp1.id = hfc.post_id AND hp1.community_id IS NOT NULL
AND NOT EXISTS (SELECT NULL FROM hive_reblogs hr WHERE hr.blogger_id = __account_id AND hr.post_id = hp1.id)
)
)
ORDER BY hfc.created_at DESC, hfc.post_id DESC
LIMIT _limit
)
SELECT
hp.id,
hp.author,
hp.parent_author,
......@@ -62,21 +81,8 @@ BEGIN
hp.curator_payout_value,
hp.is_muted,
NULL
FROM hive_posts_view hp
JOIN
(
SELECT hfc.post_id, hfc.created_at
FROM hive_feed_cache hfc
WHERE hfc.account_id = __account_id AND ( __post_id = 0 OR hfc.created_at < __created_at OR ( hfc.created_at = __created_at AND hfc.post_id < __post_id ) )
AND ( NOT _bridge_api OR
NOT EXISTS (SELECT NULL FROM hive_posts hp1
WHERE hp1.id = hfc.post_id AND hp1.counter_deleted = 0 AND hp1.depth = 0 AND hp1.community_id IS NOT NULL
AND NOT EXISTS (SELECT NULL FROM hive_reblogs hr WHERE hr.blogger_id = __account_id AND hr.post_id = hp1.id)
)
)
ORDER BY hfc.created_at DESC, hfc.post_id DESC
LIMIT _limit
) blog ON hp.id = blog.post_id
FROM blog,
LATERAL get_post_view_by_id(blog.post_id) hp
ORDER BY blog.created_at DESC, blog.post_id DESC
LIMIT _limit;
END
......
......@@ -10,7 +10,17 @@ DECLARE
BEGIN
__account_id = find_account_id( _account, True );
__post_id = find_comment_id( _author, _permlink, True );
RETURN QUERY SELECT
RETURN QUERY
WITH ds AS --bridge_get_account_posts_by_comments
(
SELECT hp1.id
FROM live_comments_view hp1
WHERE hp1.author_id = __account_id
AND (__post_id = 0 OR hp1.id < __post_id)
ORDER BY hp1.id DESC
LIMIT _limit
)
SELECT
hp.id,
hp.author,
hp.parent_author,
......@@ -49,16 +59,9 @@ BEGIN
hp.curator_payout_value,
hp.is_muted,
NULL
FROM
(
SELECT hp1.id
FROM hive_posts hp1
WHERE hp1.author_id = __account_id AND hp1.counter_deleted = 0 AND hp1.depth > 0 AND ( __post_id = 0 OR hp1.id < __post_id )
ORDER BY hp1.id DESC
LIMIT _limit
) ds
JOIN hive_posts_view hp ON ds.id = hp.id
ORDER BY hp.id DESC
FROM ds,
LATERAL get_post_view_by_id(ds.id) hp
ORDER BY ds.id DESC
LIMIT _limit;
END
$function$
......
......@@ -14,7 +14,22 @@ BEGIN
IF __post_id <> 0 THEN
SELECT ( hp.payout + hp.pending_payout ) INTO __payout_limit FROM hive_posts hp WHERE hp.id = __post_id;
END IF;
RETURN QUERY SELECT
RETURN QUERY
WITH payouts AS
(
SELECT
id,
(hp.payout + hp.pending_payout) as total_payout
FROM live_posts_comments_view hp
WHERE
hp.author_id = __account_id
AND NOT hp.is_paidout
AND ( __post_id = 0 OR (hp.payout + hp.pending_payout) < __payout_limit
OR ((hp.payout + hp.pending_payout) = __payout_limit AND hp.id < __post_id) )
ORDER BY (hp.payout + hp.pending_payout) DESC, hp.id DESC
LIMIT _limit
)
SELECT
hp.id,
hp.author,
hp.parent_author,
......@@ -53,12 +68,9 @@ BEGIN
hp.curator_payout_value,
hp.is_muted,
NULL
FROM
hive_posts_view hp
WHERE
hp.author_id = __account_id AND NOT hp.is_paidout
AND ( __post_id = 0 OR ( hp.payout + hp.pending_payout ) < __payout_limit OR ( ( hp.payout + hp.pending_payout ) = __payout_limit AND hp.id < __post_id ) )
ORDER BY ( hp.payout + hp.pending_payout ) DESC, hp.id DESC
FROM payouts,
LATERAL get_post_view_by_id(payouts.id) hp
ORDER BY payouts.total_payout DESC, payouts.id DESC
LIMIT _limit;
END
$function$
......
......@@ -10,7 +10,18 @@ DECLARE
BEGIN
__account_id = find_account_id( _account, True );
__post_id = find_comment_id( _author, _permlink, True );
RETURN QUERY SELECT
RETURN QUERY
WITH posts AS
(
SELECT id
FROM live_posts_view hp
WHERE
hp.author_id = __account_id
AND ( __post_id = 0 OR hp.id < __post_id )
ORDER BY hp.id DESC