Compare revisions: hive/hivemind (12 commits on source; changes shown as if the source revision were merged into the target)
.gitlab-ci.yml
@@ -284,23 +284,23 @@ follow_api_smoketest:
     reports:
       junit: api_smoketest_follow_api.xml
 
-tags_api_smoketest_old:
+tags_api_smoketest:
   <<: *common_api_smoketest_job
 
   script:
-    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_tags_api_patterns.tavern.yaml api_smoketest_tags_api_old.xml
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" tags_api_patterns/ api_smoketest_tags_api.xml
 
   artifacts:
     reports:
-      junit: api_smoketest_tags_api_old.xml
+      junit: api_smoketest_tags_api.xml
 
-tags_api_smoketest:
+tags_api_smoketest_negative:
   <<: *common_api_smoketest_job
 
   script:
-    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" tags_api_patterns/ api_smoketest_tags_api.xml
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" tags_api_negative/ api_smoketest_tags_api_negative.xml
 
   artifacts:
     reports:
-      junit: api_smoketest_tags_api.xml
+      junit: api_smoketest_tags_api_negative.xml
hive/db/adapter.py
@@ -50,6 +50,9 @@ class Db:
         self._exec = self._conn.execute
         self._exec(sqlalchemy.text("COMMIT"))
 
+    def clone(self):
+        return Db(self._url)
+
     def engine(self):
         """Lazy-loaded SQLAlchemy engine."""
         if not self._engine:
...
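Note on Db.clone(): the new method hands each consumer a private connection to the same database URL. Later in this changeset, DbAdapterHolder.setup_db_access() calls it so every flush worker queries over its own connection instead of the shared singleton. A minimal sketch of the intended use (names from this diff; illustrative, not the canonical API):

    shared_db = Db.instance()      # process-wide adapter, as used elsewhere in hivemind
    worker_db = shared_db.clone()  # fresh Db over the same self._url
    # worker_db now owns its connection and is safe to use from a single worker thread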
hive/db/db_state.py
@@ -8,7 +8,7 @@ from time import perf_counter
 import logging
 
 import sqlalchemy
 
-from hive.db.schema import (setup, reset_autovac, build_metadata,
+from hive.db.schema import (setup, reset_autovac, set_logged_table_attribute, build_metadata,
                             build_metadata_community, teardown, DB_VERSION)
 from hive.db.adapter import Db
@@ -46,8 +46,6 @@ class DbState:
         log.info("[INIT] Create db schema...")
         setup(cls.db())
 
-        cls._before_initial_sync()
-
         # perform db migrations
         cls._check_migrations()
@@ -91,15 +89,12 @@
     @classmethod
     def _disableable_indexes(cls):
         to_locate = [
-            #'hive_posts_ix3', # (author, depth, id)
-            #'hive_posts_ix4', # (parent_id, id, counter_deleted=0)
-            #'hive_posts_ix5', # (community_id>0, is_pinned=1)
             'hive_follows_ix5a', # (following, state, created_at, follower)
             'hive_follows_ix5b', # (follower, state, created_at, following)
-            # 'hive_posts_parent_id_idx',
+            'hive_posts_parent_id_idx',
             'hive_posts_depth_idx',
-            'hive_posts_created_at_idx',
             'hive_posts_root_id_id_idx',
 
             'hive_posts_community_id_idx',
@@ -115,20 +110,9 @@
             'hive_votes_voter_id_idx',
             'hive_votes_block_num_idx',
 
-            #'hive_posts_cache_ix6a', # (sc_trend, post_id, paidout=0)
-            #'hive_posts_cache_ix6b', # (post_id, sc_trend, paidout=0)
-            #'hive_posts_cache_ix7a', # (sc_hot, post_id, paidout=0)
-            #'hive_posts_cache_ix7b', # (post_id, sc_hot, paidout=0)
-            #'hive_posts_cache_ix8', # (category, payout, depth, paidout=0)
-            #'hive_posts_cache_ix9a', # (depth, payout, post_id, paidout=0)
-            #'hive_posts_cache_ix9b', # (category, depth, payout, post_id, paidout=0)
-            #'hive_posts_cache_ix10', # (post_id, payout, gray=1, payout>0)
-            #'hive_posts_cache_ix30', # API: community trend
-            #'hive_posts_cache_ix31', # API: community hot
-            #'hive_posts_cache_ix32', # API: community created
-            #'hive_posts_cache_ix33', # API: community payout
-            #'hive_posts_cache_ix34', # API: community muted
-            'hive_accounts_ix5' # (cached_at, name)
+            'hive_accounts_ix5', # (cached_at, name)
+
+            'hive_post_tags_tag_id_idx'
         ]
 
         to_return = []
@@ -145,12 +129,18 @@
         return to_return
 
     @classmethod
-    def _before_initial_sync(cls):
+    def before_initial_sync(cls, last_imported_block, hived_head_block):
         """Routine which runs *once* after db setup.
 
         Disables non-critical indexes for faster initial sync, as well
         as foreign key constraints."""
 
+        to_sync = hived_head_block - last_imported_block
+        if to_sync < SYNCED_BLOCK_LIMIT:
+            log.info("[INIT] Skipping pre-initial sync hooks")
+            return
+
         engine = cls.db().engine()
         log.info("[INIT] Begin pre-initial sync hooks")
@@ -162,13 +152,30 @@
         except sqlalchemy.exc.ProgrammingError as ex:
             log.warning("Ignoring ex: {}".format(ex))
 
-        # TODO: #111
-        #for key in cls._all_foreign_keys():
-        #    log.info("Drop fk %s", key.name)
-        #    key.drop(engine)
+        from hive.db.schema import drop_fk, set_logged_table_attribute
+        log.info("Dropping FKs")
+        drop_fk(cls.db())
+
+        set_logged_table_attribute(cls.db(), False)
 
         log.info("[INIT] Finish pre-initial sync hooks")
 
+    @classmethod
+    def update_work_mem(cls, workmem_value):
+        row = cls.db().query_row("SHOW work_mem")
+        current_work_mem = row['work_mem']
+
+        sql = """
+              DO $$
+              BEGIN
+                EXECUTE 'ALTER DATABASE '||current_database()||' SET work_mem TO "{}"';
+              END
+              $$;
+              """
+        cls.db().query_no_return(sql.format(workmem_value))
+
+        return current_work_mem
+
     @classmethod
     def _after_initial_sync(cls, current_imported_block, last_imported_block):
         """Routine which runs *once* after initial sync.
@@ -197,6 +204,8 @@
         else:
             log.info("[INIT] Post-initial sync hooks skipped")
 
+        current_work_mem = cls.update_work_mem('2GB')
+
         time_start = perf_counter()
 
         # Update count of all child posts (what was held during initial sync)
@@ -243,10 +252,15 @@
         time_end = perf_counter()
         log.info("[INIT] update_all_posts_active executed in %fs", time_end - time_start)
 
-        # TODO: #111
-        #for key in cls._all_foreign_keys():
-        #    log.info("Create fk %s", key.name)
-        #    key.create(engine)
+        cls.update_work_mem(current_work_mem)
+
+        if synced_blocks >= SYNCED_BLOCK_LIMIT:
+            from hive.db.schema import create_fk, set_logged_table_attribute
+            set_logged_table_attribute(cls.db(), True)
+            log.info("Recreating FKs")
+            create_fk(cls.db())
 
     @staticmethod
     def status():
...
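update_work_mem() reads the session's current work_mem, applies a new per-database value via ALTER DATABASE ... SET work_mem, and returns the previous setting; _after_initial_sync() raises it to '2GB' for the heavy post-sync updates and then restores the saved value. A sketch of the save/restore pattern (the try/finally is added here for illustration; the diff simply restores at the end, and assumes the DbState class above is importable):

    old_work_mem = DbState.update_work_mem('2GB')   # returns the previous setting
    try:
        pass  # expensive child-count / root-id / api-helper updates run here
    finally:
        DbState.update_work_mem(old_work_mem)       # put the original value back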
hive/db/schema.py
@@ -8,6 +8,9 @@ from sqlalchemy.types import VARCHAR
 from sqlalchemy.types import TEXT
 from sqlalchemy.types import BOOLEAN
 
+import logging
+log = logging.getLogger(__name__)
+
 #pylint: disable=line-too-long, too-many-lines, bad-whitespace
 
 # [DK] we changed and removed some tables so i upgraded DB_VERSION to 18
@@ -61,6 +64,23 @@ def build_metadata():
         sa.Index('hive_accounts_ix5', 'cached_at'), # core/listen sweep
     )
 
+    sa.Table(
+        'hive_reputation_data', metadata,
+        sa.Column('id', sa.Integer, primary_key=True),
+        sa.Column('author_id', sa.Integer, nullable=False),
+        sa.Column('voter_id', sa.Integer, nullable=False),
+        sa.Column('permlink', sa.String(255, collation='C'), nullable=False),
+        sa.Column('rshares', sa.BigInteger, nullable=False),
+        sa.Column('block_num', sa.Integer, nullable=False),
+
+#        sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
+#        sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']),
+#        sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']),
+
+        sa.UniqueConstraint('author_id', 'permlink', 'voter_id', name='hive_reputation_data_uk')
+    )
+
     sa.Table(
         'hive_posts', metadata,
         sa.Column('id', sa.Integer, primary_key=True),
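The hive_reputation_data_uk constraint on (author_id, permlink, voter_id) is more than a dedup guard: it is the conflict target for the upsert inside process_reputation_data() added further down in this file, so a re-vote on the same post replaces the stored rshares instead of inserting a second row. The core of that upsert, reduced to a sketch (parameter markers are illustrative):

    sql = """
        INSERT INTO hive_reputation_data (author_id, voter_id, permlink, block_num, rshares)
        VALUES (:author_id, :voter_id, :permlink, :block_num, :rshares)
        ON CONFLICT ON CONSTRAINT hive_reputation_data_uk
        DO UPDATE SET rshares = EXCLUDED.rshares
    """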
@@ -183,11 +203,11 @@ def build_metadata():
         sa.PrimaryKeyConstraint('author_id', 'permlink_id', 'voter_id', name='hive_votes_pk'),
 
-        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
-        sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
-        sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']),
-        sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id']),
-        sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']),
+        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_votes_fk1'),
+        sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id'], name='hive_votes_fk2'),
+        sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id'], name='hive_votes_fk3'),
+        sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id'], name='hive_votes_fk4'),
+        sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_votes_fk5'),
 
         sa.Index('hive_votes_post_id_idx', 'post_id'),
         sa.Index('hive_votes_voter_id_idx', 'voter_id'),
@@ -206,9 +226,10 @@
         sa.Column('post_id', sa.Integer, nullable=False),
         sa.Column('tag_id', sa.Integer, nullable=False),
         sa.PrimaryKeyConstraint('post_id', 'tag_id', name='hive_post_tags_pk1'),
-        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
-        sa.ForeignKeyConstraint(['tag_id'], ['hive_tag_data.id']),
-        sa.Index('hive_post_tags_post_id_idx', 'post_id'),
+
+        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_post_tags_fk1'),
+        sa.ForeignKeyConstraint(['tag_id'], ['hive_tag_data.id'], name='hive_post_tags_fk2'),
+
         sa.Index('hive_post_tags_tag_id_idx', 'tag_id')
     )
@@ -377,6 +398,24 @@ def teardown(db):
     """Drop all tables"""
     build_metadata().drop_all(db.engine())
 
+def drop_fk(db):
+    db.query_no_return("START TRANSACTION")
+    for table in build_metadata().sorted_tables:
+        for fk in table.foreign_keys:
+            sql = """ALTER TABLE {} DROP CONSTRAINT IF EXISTS {}""".format(table.name, fk.name)
+            db.query_no_return(sql)
+    db.query_no_return("COMMIT")
+
+def create_fk(db):
+    from sqlalchemy.schema import AddConstraint
+    from sqlalchemy import text
+    connection = db.engine().connect()
+    connection.execute(text("START TRANSACTION"))
+    for table in build_metadata().sorted_tables:
+        for fk in table.foreign_keys:
+            connection.execute(AddConstraint(fk.constraint))
+    connection.execute(text("COMMIT"))
+
 def setup(db):
     """Creates all tables and seed data"""
     # initialize schema
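drop_fk()/create_fk() bracket the initial sync: foreign keys are dropped before the bulk load (per-row constraint checks are the cost) and re-created afterwards from the same build_metadata() definitions, which is why the FK constraints above now carry explicit names; DROP CONSTRAINT IF EXISTS needs a stable name to target. The call order, matching the DbState hooks earlier in this diff (illustrative sketch):

    db = Db.instance()
    drop_fk(db)       # pre-initial sync: bulk inserts skip FK validation
    # ... massive block sync runs here ...
    create_fk(db)     # post-initial sync: constraints re-added and re-validated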
@@ -1372,6 +1411,64 @@ def setup(db):
     """
     db.query_no_return(sql)
 
+    sql = """
+          DROP FUNCTION IF EXISTS process_reputation_data(in _block_num hive_blocks.num%TYPE, in _author hive_accounts.name%TYPE,
+            in _permlink hive_permlink_data.permlink%TYPE, in _voter hive_accounts.name%TYPE, in _rshares hive_votes.rshares%TYPE)
+          ;
+
+          CREATE OR REPLACE FUNCTION process_reputation_data(in _block_num hive_blocks.num%TYPE,
+            in _author hive_accounts.name%TYPE, in _permlink hive_permlink_data.permlink%TYPE,
+            in _voter hive_accounts.name%TYPE, in _rshares hive_votes.rshares%TYPE)
+          RETURNS void
+          LANGUAGE sql
+          VOLATILE
+          AS $BODY$
+            WITH __insert_info AS (
+              INSERT INTO hive_reputation_data
+                (author_id, voter_id, permlink, block_num, rshares)
+              --- Warning: DISTINCT is needed here since there is no strict join to the hv table, so a CROSS JOIN
+              --- between ha and hv records is effectively made (producing 2 duplicated records)
+              SELECT DISTINCT ha.id as author_id, hv.id as voter_id, _permlink, _block_num, _rshares
+              FROM hive_accounts ha
+              JOIN hive_accounts hv ON hv.name = _voter
+              JOIN hive_posts hp ON hp.author_id = ha.id
+              JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
+              WHERE hpd.permlink = _permlink
+                AND ha.name = _author
+                AND NOT hp.is_paidout --- voting on paidout posts shall have no effect
+                AND hv.reputation >= 0 --- voter's negative reputation eliminates vote from processing
+                AND (_rshares >= 0
+                     OR (hv.reputation >= (ha.reputation - COALESCE((SELECT (hrd.rshares >> 6) -- if previous vote was a downvote we need to correct author reputation before current comparison to voter's reputation
+                                                                     FROM hive_reputation_data hrd
+                                                                     WHERE hrd.author_id = ha.id
+                                                                       AND hrd.voter_id = hv.id
+                                                                       AND hrd.permlink = _permlink
+                                                                       AND hrd.rshares < 0), 0)))
+                    )
+              ON CONFLICT ON CONSTRAINT hive_reputation_data_uk DO
+                UPDATE SET
+                  rshares = EXCLUDED.rshares
+              RETURNING (xmax = 0) AS is_new_vote,
+                        (SELECT hrd.rshares
+                         FROM hive_reputation_data hrd
+                         --- Warning: we want the OLD row here, not both, so we use the old ID to select the old one (the new record has a different value) !!!
+                         WHERE hrd.id = hive_reputation_data.id AND hrd.author_id = author_id and hrd.voter_id = voter_id and hrd.permlink = _permlink) AS old_rshares, author_id, voter_id
+            )
+            UPDATE hive_accounts uha
+            SET reputation = CASE __insert_info.is_new_vote
+                               WHEN true THEN ha.reputation + (_rshares >> 6)
+                               ELSE ha.reputation - (__insert_info.old_rshares >> 6) + (_rshares >> 6)
+                             END
+            FROM hive_accounts ha
+            JOIN __insert_info ON ha.id = __insert_info.author_id
+            WHERE uha.id = __insert_info.author_id
+            ;
+          $BODY$;
+          """
+    db.query_no_return(sql)
+
 def reset_autovac(db):
     """Initializes/resets per-table autovacuum/autoanalyze params.
@@ -1403,9 +1500,28 @@ def set_fillfactor(db):
     fillfactor_config = {
         'hive_posts': 70,
         'hive_post_data': 70,
-        'hive_votes': 70
+        'hive_votes': 70,
+        'hive_reputation_data': 50
     }
 
     for table, fillfactor in fillfactor_config.items():
         sql = """ALTER TABLE {} SET (FILLFACTOR = {})"""
         db.query(sql.format(table, fillfactor))
+
+def set_logged_table_attribute(db, logged):
+    """Initializes/resets the LOGGED/UNLOGGED attribute for tables which are intensively updated"""
+    logged_config = [
+        'hive_accounts',
+        'hive_permlink_data',
+        'hive_post_tags',
+        'hive_posts',
+        'hive_post_data',
+        'hive_votes',
+        'hive_reputation_data'
+    ]
+
+    for table in logged_config:
+        log.info("Setting {} attribute on a table: {}".format('LOGGED' if logged else 'UNLOGGED', table))
+        sql = """ALTER TABLE {} SET {}"""
+        db.query_no_return(sql.format(table, 'LOGGED' if logged else 'UNLOGGED'))
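ALTER TABLE ... SET UNLOGGED switches the hot tables out of write-ahead logging during massive sync, trading crash-safety (PostgreSQL truncates an unlogged table after a crash) for much cheaper bulk writes; the post-sync hook flips them back to LOGGED. What one loop iteration sends, expanded for a single table:

    db.query_no_return("ALTER TABLE hive_posts SET UNLOGGED")  # before initial sync
    db.query_no_return("ALTER TABLE hive_posts SET LOGGED")    # after initial sync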
hive/indexer/blocks.py
@@ -2,6 +2,7 @@
 from hive.indexer.reblog import Reblog
 import logging
+import concurrent
 
 from hive.db.adapter import Db
@@ -13,6 +14,8 @@ from hive.indexer.follow import Follow
 from hive.indexer.votes import Votes
 from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
+from hive.indexer.reputations import Reputations
+from hive.indexer.reblog import Reblog
 from time import perf_counter
 
 from hive.utils.stats import OPStatusManager as OPSM
@@ -20,15 +23,38 @@ from hive.utils.stats import FlushStatusManager as FSM
 from hive.utils.trends import update_hot_and_tranding_for_block_range
 from hive.utils.post_active import update_active_starting_from_posts_on_block
 
+from concurrent.futures import ThreadPoolExecutor
+
 log = logging.getLogger(__name__)
 
 DB = Db.instance()
 
+def time_collector(f):
+    startTime = FSM.start()
+    result = f()
+    elapsedTime = FSM.stop(startTime)
+    return (result, elapsedTime)
+
+def follows_flush_helper():
+    folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
+    return folllow_items
+
 class Blocks:
     """Processes blocks, dispatches work, manages `hive_blocks` table."""
     blocks_to_flush = []
-    _head_block_date = None # timestamp of last fully processed block ("previous block")
-    _current_block_date = None # timestamp of block currently being processes ("current block")
+    _head_block_date = None
+    _current_block_date = None
+
+    _concurrent_flush = [
+        ('Posts', Posts.flush, Posts),
+        ('PostDataCache', PostDataCache.flush, PostDataCache),
+        ('Reputations', Reputations.flush, Reputations),
+        ('Votes', Votes.flush, Votes),
+        ('Tags', Tags.flush, Tags),
+        ('Follow', follows_flush_helper, Follow),
+        ('Reblog', Reblog.flush, Reblog)
+    ]
 
     def __init__(cls):
         head_date = cls.head_date()
@@ -39,6 +65,16 @@
         cls._head_block_date = head_date
         cls._current_block_date = head_date
 
+    @classmethod
+    def setup_db_access(self, sharedDbAdapter):
+        PostDataCache.setup_db_access(sharedDbAdapter)
+        Reputations.setup_db_access(sharedDbAdapter)
+        Votes.setup_db_access(sharedDbAdapter)
+        Tags.setup_db_access(sharedDbAdapter)
+        Follow.setup_db_access(sharedDbAdapter)
+        Posts.setup_db_access(sharedDbAdapter)
+        Reblog.setup_db_access(sharedDbAdapter)
+
     @classmethod
     def head_num(cls):
         """Get hive's head block number."""
@@ -63,6 +99,9 @@
         Votes.flush()
         Posts.flush()
         Reblog.flush()
+        Follow.flush(trx=False)
+        Reputations.flush()
+
         block_num = int(block['block_id'][:8], base=16)
         cls.on_live_blocks_processed( block_num, block_num )
         time_end = perf_counter()
@@ -73,6 +112,7 @@
     def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
         """Batch-process blocks; wrapped in a transaction."""
         time_start = OPSM.start()
+
         DB.query("START TRANSACTION")
 
         last_num = 0
@@ -98,18 +138,36 @@
         log.info("#############################################################################")
         flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
-        flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
-        flush_time = register_time(flush_time, "Tags", Tags.flush())
-        flush_time = register_time(flush_time, "Votes", Votes.flush())
-        folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
-        flush_time = register_time(flush_time, "Follow", folllow_items)
-        flush_time = register_time(flush_time, "Posts", Posts.flush())
-        flush_time = register_time(flush_time, "Reblog", Reblog.flush())
+
+        DB.query("COMMIT")
+
+        completedThreads = 0;
+
+        pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
+        flush_futures = {pool.submit(time_collector, f): (description, c) for (description, f, c) in cls._concurrent_flush}
+        for future in concurrent.futures.as_completed(flush_futures):
+            (description, c) = flush_futures[future]
+            completedThreads = completedThreads + 1
+            try:
+                (n, elapsedTime) = future.result()
+                assert n is not None
+                assert not c.tx_active()
+
+                FSM.flush_stat(description, elapsedTime, n)
+
+#                if n > 0:
+#                    log.info('%r flush generated %d records' % (description, n))
+            except Exception as exc:
+                log.error('%r generated an exception: %s' % (description, exc))
+                raise exc
+        pool.shutdown()
+
+        assert completedThreads == len(cls._concurrent_flush)
 
         if (not is_initial_sync) and (first_block > -1):
+            DB.query("START TRANSACTION")
             cls.on_live_blocks_processed( first_block, last_num )
+            DB.query("COMMIT")
 
-        DB.query("COMMIT")
         log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
@@ -144,6 +202,7 @@
         elif op_type == 'effective_comment_vote_operation':
             Votes.effective_comment_vote_op( op_value )
+            Reputations.process_vote(block_num, op_value)
 
             if key not in comment_payout_ops:
                 comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None }
@@ -397,6 +456,6 @@
         update_hot_and_tranding_for_block_range( first_block, last_block )
         update_active_starting_from_posts_on_block( first_block, last_block )
-        DB.query("SELECT update_hive_posts_children_count({}, {})".format(first_block, last_block))
-        DB.query("SELECT update_hive_posts_root_id({},{})".format(first_block, last_block))
-        DB.query("SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block))
+        DB.query_no_return("SELECT update_hive_posts_children_count({}, {})".format(first_block, last_block))
+        DB.query_no_return("SELECT update_hive_posts_root_id({},{})".format(first_block, last_block))
+        DB.query_no_return("SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block))
hive/indexer/db_adapter_holder.py (new file)
import logging
log = logging.getLogger(__name__)

class DbAdapterHolder(object):
    db = None

    _inside_tx = False

    @classmethod
    def setup_db_access(self, sharedDb):
        self.db = sharedDb.clone()

    @classmethod
    def tx_active(self):
        return self._inside_tx

    @classmethod
    def beginTx(self):
        self.db.query("START TRANSACTION")
        self._inside_tx = True

    @classmethod
    def commitTx(self):
        self.db.query("COMMIT")
        self._inside_tx = False
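DbAdapterHolder is what makes the concurrent flush safe: each subclass receives its own Db clone once at startup and wraps its flush in an explicit transaction, and tx_active() is the flag process_multi() asserts on after every worker completes. The adoption pattern the caches below follow (SomeCache is an illustrative name):

    class SomeCache(DbAdapterHolder):
        @classmethod
        def flush(cls):
            cls.beginTx()      # START TRANSACTION on this class's private connection
            # ... batched INSERT/UPDATE statements issued via cls.db.query(...) ...
            cls.commitTx()     # COMMIT; tx_active() reads False again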
hive/indexer/follow.py
@@ -9,9 +9,9 @@ from hive.db.db_state import DbState
 from hive.indexer.accounts import Accounts
 from hive.indexer.notify import Notify
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
-log = logging.getLogger(__name__)
-
-DB = Db.instance()
+log = logging.getLogger(__name__)
 
 FOLLOWERS = 'followers'
 FOLLOWING = 'following'
@@ -66,7 +66,7 @@ def _flip_dict(dict_to_flip):
         flipped[value] = [key]
     return flipped
 
-class Follow:
+class Follow(DbAdapterHolder):
     """Handles processing of incoming follow ups and flushing to db."""
 
     follow_items_to_flush = dict()
@@ -102,7 +102,7 @@
         else:
             old_state = cls._get_follow_db_state(op['flr'], op['flg'])
             # insert or update state
-            DB.query(FOLLOW_ITEM_INSERT_QUERY, **op)
+            cls.db.query(FOLLOW_ITEM_INSERT_QUERY, **op)
             if new_state == 1:
                 Follow.follow(op['flr'], op['flg'])
                 if old_state is None:
@@ -145,7 +145,7 @@
         sql = """SELECT state FROM hive_follows
                   WHERE follower = :follower
                     AND following = :following"""
-        return DB.query_one(sql, follower=follower, following=following)
+        return cls.db.query_one(sql, follower=follower, following=following)
 
     # -- stat tracking --
@@ -210,7 +210,7 @@
                 else:
                     query = sql_prefix + ",".join(values)
                     query += sql_postfix
-                    DB.query(query)
+                    cls.db.query(query)
                     values.clear()
                 values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
                                                                       follow_item['at'], follow_item['state'],
@@ -222,7 +222,7 @@
             if len(values) > 0:
                 query = sql_prefix + ",".join(values)
                 query += sql_postfix
-                DB.query(query)
+                cls.db.query(query)
 
         cls.follow_items_to_flush.clear()
@@ -244,7 +244,7 @@
             return 0
 
         start = perf()
-        DB.batch_queries(sqls, trx=trx)
+        cls.db.batch_queries(sqls, trx=trx)
         if trx:
             log.info("[SYNC] flushed %d follow deltas in %ds",
                      updated, perf() - start)
@@ -268,7 +268,7 @@
               following = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND follower = hive_accounts.id)
         WHERE id IN :ids
         """
-        DB.query(sql, ids=tuple(ids))
+        cls.db.query(sql, ids=tuple(ids))
 
     @classmethod
     def force_recount(cls):
@@ -286,7 +286,7 @@
                 LEFT JOIN hive_follows hf ON id = hf.following AND state = 1
                GROUP BY id);
         """
-        DB.query(sql)
+        cls.db.query(sql)
 
         log.info("[SYNC] update follower counts")
         sql = """
@@ -296,4 +296,4 @@
             UPDATE hive_accounts SET following = num FROM following_counts
             WHERE id = account_id AND following != num;
         """
-        DB.query(sql)
+        cls.db.query(sql)
hive/indexer/post_data_cache.py
 import logging
-import logging
 from hive.utils.normalize import escape_characters
 from hive.db.adapter import Db
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
 
-DB = Db.instance()
-
-class PostDataCache(object):
+class PostDataCache(DbAdapterHolder):
     """ Provides cache for DB operations on post data table in order to speed up initial sync """
     _data = {}
 
     @classmethod
     def is_cached(cls, pid):
         """ Check if data is cached """
@@ -35,7 +38,7 @@ class PostDataCache(object):
         sql = """
            SELECT hpd.body FROM hive_post_data hpd WHERE hpd.id = :post_id;
         """
-        row = DB.query_row(sql, post_id = pid)
+        row = cls.db.query_row(sql, post_id = pid)
         post_data = dict(row)
         return post_data['body']
@@ -45,6 +48,13 @@
         if cls._data:
             values_insert = []
             values_update = []
+            cls.beginTx()
+            sql = """
+                INSERT INTO
+                    hive_post_data (id, title, preview, img_url, body, json)
+                VALUES
+            """
+            values = []
             for k, data in cls._data.items():
                 title = 'NULL' if data['title'] is None else "{}".format(escape_characters(data['title']))
                 body = 'NULL' if data['body'] is None else "{}".format(escape_characters(data['body']))
@@ -66,7 +76,7 @@
                 sql += ','.join(values_insert)
                 if print_query:
                     log.info("Executing query:\n{}".format(sql))
-                DB.query(sql)
+                cls.db.query(sql)
 
             if values_update:
                 sql = """
@@ -88,7 +98,9 @@
                 """
                 if print_query:
                     log.info("Executing query:\n{}".format(sql))
-                DB.query(sql)
+                cls.db.query(sql)
+
+            cls.commitTx()
 
         n = len(cls._data.keys())
         cls._data.clear()
...
hive/indexer/posts.py
@@ -16,12 +16,14 @@ from hive.indexer.community import Community, START_DATE
 from hive.indexer.notify import Notify
 from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 from hive.utils.normalize import sbd_amount, legacy_amount, asset_to_hbd_hive, safe_img_url
 
 log = logging.getLogger(__name__)
 DB = Db.instance()
 
-class Posts:
+class Posts(DbAdapterHolder):
     """Handles critical/core post ops and data."""
 
     # LRU cache for (author-permlink -> id) lookup (~400mb per 1M entries)
@@ -224,9 +226,13 @@
             yield lst[i:i + n]
 
         for chunk in chunks(cls._comment_payout_ops, 1000):
+            cls.beginTx()
+
             values_str = ','.join(chunk)
             actual_query = sql.format(values_str)
-            DB.query(actual_query)
+            cls.db.query(actual_query)
+
+            cls.commitTx()
 
         n = len(cls._comment_payout_ops)
         cls._comment_payout_ops.clear()
...
hive/indexer/reblog.py
@@ -8,9 +8,7 @@ from hive.db.db_state import DbState
 from hive.indexer.accounts import Accounts
 from hive.indexer.feed_cache import FeedCache
 from hive.indexer.notify import Notify
-
-DB = Db.instance()
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
@@ -42,7 +40,7 @@
     RETURNING post_id
 """
 
-class Reblog():
+class Reblog(DbAdapterHolder):
     """ Class for reblog operations """
     reblog_items_to_flush = []
@@ -63,7 +61,7 @@
             return
 
         if 'delete' in op_json and op_json['delete'] == 'delete':
-            row = DB.query_row(DELETE_SQL, a=blogger, permlink=permlink)
+            row = cls.db.query_row(DELETE_SQL, a=blogger, permlink=permlink)
             if row is None:
                 log.debug("reblog: post not found: %s/%s", author, permlink)
                 return
@@ -72,12 +70,12 @@
             FeedCache.delete(result['post_id'], result['account_id'])
         else:
             if DbState.is_initial_sync():
-                row = DB.query_row(SELECT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
+                row = cls.db.query_row(SELECT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
                 if row is not None:
                     result = dict(row)
                     cls.reblog_items_to_flush.append(result)
             else:
-                row = DB.query_row(INSERT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
+                row = cls.db.query_row(INSERT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
                 if row is not None:
                     author_id = Accounts.get_id(author)
                     blogger_id = Accounts.get_id(blogger)
@@ -111,7 +109,7 @@
                 else:
                     query = sql_prefix + ",".join(values)
                     query += sql_postfix
-                    DB.query(query)
+                    cls.db.query(query)
                     values.clear()
                 values.append("('{}', {}, '{}', {})".format(reblog_item["blogger"], reblog_item["post_id"], reblog_item["date"], reblog_item["block_num"]))
                 count = 1
@@ -119,6 +117,6 @@
         if len(values) > 0:
             query = sql_prefix + ",".join(values)
             query += sql_postfix
-            DB.query(query)
+            cls.db.query(query)
         cls.reblog_items_to_flush.clear()
         return item_count
\ No newline at end of file
""" Reputation update support """
import logging
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
CACHED_ITEMS_LIMIT = 200
class Reputations(DbAdapterHolder):
_queries = []
@classmethod
def process_vote(self, block_num, effective_vote_op):
return
self._queries.append("\nSELECT process_reputation_data({}, '{}', '{}', '{}', {});".format(block_num, effective_vote_op['author'], effective_vote_op['permlink'],
effective_vote_op['voter'], effective_vote_op['rshares']))
@classmethod
def flush(self):
if not self._queries:
return 0
self.beginTx()
query = ""
i = 0
items = 0
for s in self._queries:
query = query + str(self._queries[i]) + ";\n"
i = i + 1
items = items + 1
if items >= CACHED_ITEMS_LIMIT:
self.db.query_no_return(query)
query = ""
items = 0
if items >= CACHED_ITEMS_LIMIT:
self.db.query_no_return(query)
query = ""
items = 0
n = len(self._queries)
self._queries.clear()
self.commitTx()
return n
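Reputations.flush() concatenates the queued SELECT process_reputation_data(...) calls and ships them CACHED_ITEMS_LIMIT statements per round trip; note that process_vote() currently returns before queueing anything, so the pipeline is wired up but inert in this revision. A minimal sketch of the batching loop (the leftover batch is sent here with a plain "if batch:", where the code above re-tests against the limit):

    CACHED_ITEMS_LIMIT = 200

    def flush_statements(statements, run):        # run stands in for db.query_no_return
        batch = []
        for stmt in statements:
            batch.append(stmt)
            if len(batch) >= CACHED_ITEMS_LIMIT:  # full batch: one round trip
                run(";\n".join(batch))
                batch.clear()
        if batch:                                 # remainder still needs sending
            run(";\n".join(batch))
        return len(statements)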
"""Hive sync manager.""" """Hive sync manager."""
from hive.indexer.reblog import Reblog
import logging import logging
import glob import glob
from time import perf_counter as perf from time import perf_counter as perf
...@@ -25,6 +24,9 @@ from hive.indexer.accounts import Accounts ...@@ -25,6 +24,9 @@ from hive.indexer.accounts import Accounts
from hive.indexer.feed_cache import FeedCache from hive.indexer.feed_cache import FeedCache
from hive.indexer.follow import Follow from hive.indexer.follow import Follow
from hive.indexer.community import Community from hive.indexer.community import Community
from hive.indexer.reblog import Reblog
from hive.indexer.reputations import Reputations
from hive.server.common.mutes import Mutes from hive.server.common.mutes import Mutes
from hive.utils.stats import OPStatusManager as OPSM from hive.utils.stats import OPStatusManager as OPSM
...@@ -99,6 +101,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun ...@@ -99,6 +101,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
try: try:
count = ubound - lbound count = ubound - lbound
timer = Timer(count, entity='block', laps=['rps', 'wps']) timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound: while lbound < ubound:
wait_time_1 = WSM.start() wait_time_1 = WSM.start()
...@@ -221,6 +224,7 @@ class Sync: ...@@ -221,6 +224,7 @@ class Sync:
# ensure db schema up to date, check app status # ensure db schema up to date, check app status
DbState.initialize() DbState.initialize()
Blocks.setup_db_access(self._db)
# prefetch id->name and id->rank memory maps # prefetch id->name and id->rank memory maps
Accounts.load_ids() Accounts.load_ids()
...@@ -234,12 +238,14 @@ class Sync: ...@@ -234,12 +238,14 @@ class Sync:
# community stats # community stats
Community.recalc_pending_payouts() Community.recalc_pending_payouts()
sql = "SELECT num FROM hive_blocks ORDER BY num DESC LIMIT 1" last_imported_block = Blocks.head_num()
database_head_block = DbState.db().query_one(sql) hived_head_block = self._conf.get('test_max_block') or self._steem.last_irreversible()
log.info("database_head_block : %s", database_head_block)
log.info("database_head_block : %s", last_imported_block)
log.info("target_head_block : %s", hived_head_block)
if DbState.is_initial_sync(): if DbState.is_initial_sync():
last_imported_block = Blocks.head_num() DbState.before_initial_sync(last_imported_block, hived_head_block)
# resume initial sync # resume initial sync
self.initial() self.initial()
if not CONTINUE_PROCESSING: if not CONTINUE_PROCESSING:
...@@ -315,13 +321,14 @@ class Sync: ...@@ -315,13 +321,14 @@ class Sync:
remaining = drop(skip_lines, f) remaining = drop(skip_lines, f)
for lines in partition_all(chunk_size, remaining): for lines in partition_all(chunk_size, remaining):
raise RuntimeError("Sync from checkpoint disabled") raise RuntimeError("Sync from checkpoint disabled")
Blocks.process_multi(map(json.loads, lines), True) # Blocks.process_multi(map(json.loads, lines), True)
last_block = num last_block = num
last_read = num last_read = num
def from_steemd(self, is_initial_sync=False, chunk_size=1000): def from_steemd(self, is_initial_sync=False, chunk_size=1000):
"""Fast sync strategy: read/process blocks in batches.""" """Fast sync strategy: read/process blocks in batches."""
steemd = self._steem steemd = self._steem
lbound = Blocks.head_num() + 1 lbound = Blocks.head_num() + 1
ubound = self._conf.get('test_max_block') or steemd.last_irreversible() ubound = self._conf.get('test_max_block') or steemd.last_irreversible()
...@@ -367,6 +374,7 @@ class Sync: ...@@ -367,6 +374,7 @@ class Sync:
# debug: no max gap if disable_sync in effect # debug: no max gap if disable_sync in effect
max_gap = None if self._conf.get('test_disable_sync') else 100 max_gap = None if self._conf.get('test_disable_sync') else 100
assert self._blocksProcessor
steemd = self._steem steemd = self._steem
hive_head = Blocks.head_num() hive_head = Blocks.head_num()
......
hive/indexer/tags.py
 import logging
 from hive.db.adapter import Db
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
-DB = Db.instance()
 
 from hive.utils.normalize import escape_characters
 
-class Tags(object):
+class Tags(DbAdapterHolder):
     """ Tags cache """
     _tags = []
@@ -17,8 +17,9 @@
     @classmethod
     def flush(cls):
         """ Flush tags to table """
         if cls._tags:
+            cls.beginTx()
             limit = 1000
 
             sql = """
@@ -32,11 +33,11 @@
                 values.append("({})".format(escape_characters(tag[1])))
                 if len(values) >= limit:
                     tag_query = str(sql)
-                    DB.query(tag_query.format(','.join(values)))
+                    cls.db.query(tag_query.format(','.join(values)))
                     values.clear()
             if len(values) > 0:
                 tag_query = str(sql)
-                DB.query(tag_query.format(','.join(values)))
+                cls.db.query(tag_query.format(','.join(values)))
                 values.clear()
 
             sql = """
@@ -62,13 +63,13 @@
                 values.append("({}, {})".format(tag[0], escape_characters(tag[1])))
                 if len(values) >= limit:
                     tag_query = str(sql)
-                    DB.query(tag_query.format(','.join(values)))
+                    cls.db.query(tag_query.format(','.join(values)))
                     values.clear()
             if len(values) > 0:
                 tag_query = str(sql)
-                DB.query(tag_query.format(','.join(values)))
+                cls.db.query(tag_query.format(','.join(values)))
                 values.clear()
+            cls.commitTx()
 
         n = len(cls._tags)
         cls._tags.clear()
         return n
hive/indexer/votes.py
@@ -4,11 +4,11 @@ import logging
 from hive.db.db_state import DbState
 from hive.db.adapter import Db
+from hive.indexer.db_adapter_holder import DbAdapterHolder
 
 log = logging.getLogger(__name__)
 
-DB = Db.instance()
-
-class Votes:
+class Votes(DbAdapterHolder):
     """ Class for managing posts votes """
     _votes_data = {}
@@ -67,9 +67,12 @@
     @classmethod
     def flush(cls):
         """ Flush vote data from cache to database """
+
         cls.inside_flush = True
         n = 0
         if cls._votes_data:
+            cls.beginTx()
+
             sql = """
                 INSERT INTO hive_votes
                 (post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update, block_num, is_effective)
@@ -110,16 +113,19 @@
             if len(values) >= values_limit:
                 values_str = ','.join(values)
                 actual_query = sql.format(values_str)
-                DB.query(actual_query)
+                cls.db.query(actual_query)
                 values.clear()
 
             if len(values) > 0:
                 values_str = ','.join(values)
                 actual_query = sql.format(values_str)
-                DB.query(actual_query)
+                cls.db.query(actual_query)
                 values.clear()
 
             n = len(cls._votes_data)
             cls._votes_data.clear()
+
+            cls.commitTx()
 
         cls.inside_flush = False
 
         return n
hive/server/condenser_api/cursor.py
@@ -21,7 +21,7 @@ async def get_post_id(db, author, permlink):
              INNER JOIN hive_accounts ha_a ON ha_a.id = hp.author_id
              INNER JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id
              WHERE ha_a.name = :author AND hpd_p.permlink = :permlink
-               AND counter_deleted = 0 LIMIT 1"""  # ABW: replace with find_comment_id(:author,:permlink)?
+               AND counter_deleted = 0 LIMIT 1"""  # ABW: replace with find_comment_id(:author,:permlink,True)?
     return await db.query_one(sql, author=author, permlink=permlink)
 
 async def get_child_ids(db, post_id):
@@ -190,8 +190,8 @@ async def pids_by_query(db, sort, start_author, start_permlink, limit, tag):
         where.append(sql)
 
     start_id = None
-    if start_permlink and start_author:
-        sql = "%s <= (SELECT %s FROM %s WHERE id = find_comment_id('{}', '{}'))".format(start_author, start_permlink)
+    if start_permlink or start_author:
+        sql = "%s <= (SELECT %s FROM %s WHERE id = find_comment_id('{}', '{}', True))".format(start_author, start_permlink)
         where.append(sql % (field, raw_field, table))
 
     sql = """
...
(submodule pointer update)
-Subproject commit 9f6058b31adec6378ead1b15ae6c1e7bb75823f7
+Subproject commit 43f46f320af704f4ac173005696ab8526e8d08f2