From 1de6c82df136cd1f9a117966c5683d7c46f38354 Mon Sep 17 00:00:00 2001
From: roadscape <roadscape@users.noreply.github.com>
Date: Tue, 13 Feb 2018 15:30:41 -0600
Subject: [PATCH] cleanup, bugfixes

---
 grep                         |  2 +-
 hive/db/methods.py           | 65 +-----------------------------------
 hive/db/schema.py            | 21 +++++++-----
 hive/indexer/blocks.py       |  3 +-
 hive/indexer/steem_client.py | 10 +++---
 hive/utils/query_stats.py    | 65 ++++++++++++++++++++++++++++++++++++
 pylintrc                     |  2 +-
 7 files changed, 88 insertions(+), 80 deletions(-)
 create mode 100644 hive/utils/query_stats.py

diff --git a/grep b/grep
index dff44be1c..fab21b350 100755
--- a/grep
+++ b/grep
@@ -1 +1 @@
-grep -rin $1 * --color=always | grep -v __pycache__ --color=auto
+grep -rn $1 * --color=always | grep -v __pycache__ --color=auto
diff --git a/hive/db/methods.py b/hive/db/methods.py
index 80790654d..3efcf8dc2 100644
--- a/hive/db/methods.py
+++ b/hive/db/methods.py
@@ -1,12 +1,10 @@
 import logging
-import time
-import re
-import atexit

 from funcy.seqs import first
 from sqlalchemy import text
 from hive.db.schema import connect
+from hive.utils.query_stats import QueryStats

 _conn = None

 def conn():
@@ -15,67 +13,6 @@ def conn():
         _conn = connect(echo=False)
     return _conn

-class QueryStats:
-    stats = {}
-    ttl_time = 0.0
-
-    def __init__(self):
-        atexit.register(QueryStats.print)
-
-    def __call__(self, fn):
-        def wrap(*args, **kwargs):
-            time_start = time.perf_counter()
-            result = fn(*args, **kwargs)
-            time_end = time.perf_counter()
-            QueryStats.log(args[0], (time_end - time_start) * 1000)
-            return result
-        return wrap
-
-    @classmethod
-    def log(cls, sql, ms):
-        nsql = cls.normalize_sql(sql)
-        cls.add_nsql_ms(nsql, ms)
-        cls.check_timing(nsql, ms)
-        if cls.ttl_time > 30 * 60 * 1000:
-            cls.print()
-
-    @classmethod
-    def add_nsql_ms(cls, nsql, ms):
-        if nsql not in cls.stats:
-            cls.stats[nsql] = [ms, 1]
-        else:
-            cls.stats[nsql][0] += ms
-            cls.stats[nsql][1] += 1
-        cls.ttl_time += ms
-
-    @classmethod
-    def normalize_sql(cls, sql):
-        nsql = re.sub(r'\s+', ' ', sql).strip()[0:256]
-        nsql = re.sub(r'VALUES (\s*\([^\)]+\),?)+', 'VALUES (...)', nsql)
-        return nsql
-
-    @classmethod
-    def check_timing(cls, nsql, ms):
-        if ms > 100:
-            print("\033[93m[SQL][%dms] %s\033[0m" % (ms, nsql[:250]))
-
-    @classmethod
-    def print(cls):
-        if not cls.stats:
-            return
-        ttl = cls.ttl_time
-        print("[DEBUG] total SQL time: {}s".format(int(ttl / 1000)))
-        for arr in sorted(cls.stats.items(), key=lambda x: -x[1][0])[0:40]:
-            sql, vals = arr
-            ms, calls = vals
-            print("% 5.1f%% % 7dms % 9.2favg % 8dx -- %s"
-                  % (100 * ms/ttl, ms, ms/calls, calls, sql[0:180]))
-        cls.clear()
-
-    @classmethod
-    def clear(cls):
-        cls.stats = {}
-        cls.ttl_time = 0

 logger = logging.getLogger(__name__)

diff --git a/hive/db/schema.py b/hive/db/schema.py
index f5f449841..df83d53b8 100644
--- a/hive/db/schema.py
+++ b/hive/db/schema.py
@@ -282,13 +282,20 @@ logging.basicConfig()
 # logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
 logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)

-def connect(**kwargs):
-    connection_url = Conf.get('database_url')
-    return sa.create_engine(connection_url, isolation_level="READ UNCOMMITTED", pool_recycle=3600, **kwargs).connect()
+
+def _create_engine(echo=False):
+    engine = sa.create_engine(
+        Conf.get('database_url'),
+        isolation_level="READ UNCOMMITTED", # only works in mysql
+        pool_recycle=3600,
+        echo=echo)
+    return engine
+
+def connect(echo=False):
+    return _create_engine(echo=echo).connect()

 def setup():
-    connection_url = Conf.get('database_url')
-    engine = sa.create_engine(connection_url)
+    engine = _create_engine(echo=True)
     metadata.create_all(engine)

     conn = engine.connect()
@@ -310,10 +317,8 @@ def setup():
     insert = hive_state.insert().values(block_num=0, db_version=3, steem_per_mvest=0, usd_per_steem=0, sbd_per_steem=0, dgpo='')
     conn.execute(insert)

-
 def teardown():
-    connection_url = Conf.get('database_url')
-    engine = sa.create_engine(connection_url)
+    engine = _create_engine(echo=True)
     metadata.drop_all(engine)
diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 4d6f838e6..950e69703 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -1,5 +1,3 @@
-import time
-
 from hive.db.methods import query_row, query_col, query_one, query, is_trx_active
 from hive.indexer.steem_client import SteemClient

@@ -7,6 +5,7 @@ from hive.indexer.accounts import Accounts
 from hive.indexer.posts import Posts
 from hive.indexer.cached_post import CachedPost
 from hive.indexer.custom_op import CustomOp
+from hive.indexer.follow import Follow

 class Blocks:
diff --git a/hive/indexer/steem_client.py b/hive/indexer/steem_client.py
index 3b5051388..8cbbe18cf 100644
--- a/hive/indexer/steem_client.py
+++ b/hive/indexer/steem_client.py
@@ -146,7 +146,7 @@ class SteemClient:
         last = self.get_block_simple(start_from - 1)
         head_num = self.head_block()
-        next_expected = time.time() + 1.5
+        next_expected = time.time()

         start_head = head_num
         lag_secs = 0

@@ -177,11 +177,13 @@ class SteemClient:
             block_num = last['num'] + 1
             block = self.get_block(block_num)
             if not block:
-                lag_secs = (lag_secs + 0.5) % 3 # tune inter-slot timing
-                print("[LIVE] block %d failed. delay 1/2s. head: %d/%d."
-                      % (block_num, head_num, self.head_block()))
+                lag_secs = min(3, lag_secs + 0.25) # tune inter-slot timing
+                print("[LIVE] block %d failed. hive:%d steem:%d. lag:%f"
+                      % (block_num, head_num, self.head_block(), lag_secs))
                 time.sleep(0.5)
                 continue
+            else:
+                lag_secs = max(0, lag_secs - 0.001)

             last['num'] = block_num
             # if block doesn't link, we're forked
diff --git a/hive/utils/query_stats.py b/hive/utils/query_stats.py
new file mode 100644
index 000000000..014fe5027
--- /dev/null
+++ b/hive/utils/query_stats.py
@@ -0,0 +1,65 @@
+import time
+import re
+import atexit
+
+class QueryStats:
+    stats = {}
+    ttl_time = 0.0
+
+    def __init__(self):
+        atexit.register(QueryStats.print)
+
+    def __call__(self, fn):
+        def wrap(*args, **kwargs):
+            time_start = time.perf_counter()
+            result = fn(*args, **kwargs)
+            time_end = time.perf_counter()
+            QueryStats.log(args[0], (time_end - time_start) * 1000)
+            return result
+        return wrap
+
+    @classmethod
+    def log(cls, sql, ms):
+        nsql = cls.normalize_sql(sql)
+        cls.add_nsql_ms(nsql, ms)
+        cls.check_timing(nsql, ms)
+        if cls.ttl_time > 30 * 60 * 1000:
+            cls.print()
+
+    @classmethod
+    def add_nsql_ms(cls, nsql, ms):
+        if nsql not in cls.stats:
+            cls.stats[nsql] = [ms, 1]
+        else:
+            cls.stats[nsql][0] += ms
+            cls.stats[nsql][1] += 1
+        cls.ttl_time += ms
+
+    @classmethod
+    def normalize_sql(cls, sql):
+        nsql = re.sub(r'\s+', ' ', sql).strip()[0:256]
+        nsql = re.sub(r'VALUES (\s*\([^\)]+\),?)+', 'VALUES (...)', nsql)
+        return nsql
+
+    @classmethod
+    def check_timing(cls, nsql, ms):
+        if ms > 100:
+            print("\033[93m[SQL][%dms] %s\033[0m" % (ms, nsql[:250]))
+
+    @classmethod
+    def print(cls):
+        if not cls.stats:
+            return
+        ttl = cls.ttl_time
+        print("[DEBUG] total SQL time: {}s".format(int(ttl / 1000)))
+        for arr in sorted(cls.stats.items(), key=lambda x: -x[1][0])[0:40]:
+            sql, vals = arr
+            ms, calls = vals
+            print("% 5.1f%% % 7dms % 9.2favg % 8dx -- %s"
+                  % (100 * ms/ttl, ms, ms/calls, calls, sql[0:180]))
+        cls.clear()
+
+    @classmethod
+    def clear(cls):
+        cls.stats = {}
+        cls.ttl_time = 0
diff --git a/pylintrc b/pylintrc
index fbbe13b14..18b49f02e 100644
--- a/pylintrc
+++ b/pylintrc
@@ -153,7 +153,7 @@ evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
 [BASIC]

 # Good variable names which should always be accepted, separated by a comma
-good-names=i,j,k,ex,Run,_,op,tx,md,e,f,ms,db
+good-names=i,j,k,ex,Run,_,op,tx,md,e,f,ms,db,fn

 # Bad variable names which should always be refused, separated by a comma
 bad-names=foo,bar,baz,toto,tutu,tata
--
GitLab