Commit 3b01c2aa authored by Bartek Wrona

branch 'reputation_api_support' rebased onto develop

parents 763dbde8 dbf23919
1 merge request: !121 "Prerequisuites to Reputation api support"
@@ -50,6 +50,9 @@ class Db:
self._exec = self._conn.execute
self._exec(sqlalchemy.text("COMMIT"))
def clone(self):
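"""Return a new Db instance bound to the same database URL, i.e. a private connection (used by DbAdapterHolder.setup_db_access below)."""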
return Db(self._url)
def engine(self):
"""Lazy-loaded SQLAlchemy engine."""
if not self._engine:
@@ -91,15 +91,12 @@ class DbState:
@classmethod
def _disableable_indexes(cls):
to_locate = [
#'hive_posts_ix3', # (author, depth, id)
#'hive_posts_ix4', # (parent_id, id, counter_deleted=0)
#'hive_posts_ix5', # (community_id>0, is_pinned=1)
'hive_follows_ix5a', # (following, state, created_at, follower)
'hive_follows_ix5b', # (follower, state, created_at, following)
'hive_posts_parent_id_idx',
'hive_posts_depth_idx',
'hive_posts_created_at_idx',
'hive_posts_root_id_id_idx',
'hive_posts_community_id_idx',
@@ -115,20 +112,9 @@ class DbState:
'hive_votes_voter_id_idx',
'hive_votes_block_num_idx',
#'hive_posts_cache_ix6a', # (sc_trend, post_id, paidout=0)
#'hive_posts_cache_ix6b', # (post_id, sc_trend, paidout=0)
#'hive_posts_cache_ix7a', # (sc_hot, post_id, paidout=0)
#'hive_posts_cache_ix7b', # (post_id, sc_hot, paidout=0)
#'hive_posts_cache_ix8', # (category, payout, depth, paidout=0)
#'hive_posts_cache_ix9a', # (depth, payout, post_id, paidout=0)
#'hive_posts_cache_ix9b', # (category, depth, payout, post_id, paidout=0)
#'hive_posts_cache_ix10', # (post_id, payout, gray=1, payout>0)
#'hive_posts_cache_ix30', # API: community trend
#'hive_posts_cache_ix31', # API: community hot
#'hive_posts_cache_ix32', # API: community created
#'hive_posts_cache_ix33', # API: community payout
#'hive_posts_cache_ix34', # API: community muted
'hive_accounts_ix5', # (cached_at, name)
'hive_post_tags_tag_id_idx'
]
to_return = []
@@ -61,6 +61,23 @@ def build_metadata():
sa.Index('hive_accounts_ix5', 'cached_at'), # core/listen sweep
)
sa.Table(
'hive_reputation_data', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('author_id', sa.Integer, nullable=False),
sa.Column('voter_id', sa.Integer, nullable=False),
sa.Column('permlink', sa.String(255, collation='C'), nullable=False),
sa.Column('rshares', sa.BigInteger, nullable=False),
sa.Column('block_num', sa.Integer, nullable=False),
# sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
# sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']),
# sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']),
sa.UniqueConstraint('author_id', 'permlink', 'voter_id', name='hive_reputation_data_uk')
)
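
The foreign-key constraints are left commented out; functionally, the unique constraint is what matters, since it makes (author_id, permlink, voter_id) the upsert key used by the ON CONFLICT clause in process_reputation_data() further down. A minimal sketch of that upsert via SQLAlchemy's PostgreSQL dialect (the helper and its arguments are illustrative, not part of this commit):

from sqlalchemy.dialects.postgresql import insert as pg_insert

def upsert_reputation_row(engine, metadata, author_id, voter_id, permlink,
                          rshares, block_num):
    """Illustrative upsert against hive_reputation_data via its unique key."""
    table = metadata.tables['hive_reputation_data']
    stmt = pg_insert(table).values(
        author_id=author_id, voter_id=voter_id, permlink=permlink,
        rshares=rshares, block_num=block_num)
    # On a re-vote the (author_id, permlink, voter_id) key collides: keep the
    # existing row and replace its rshares, as the SQL function below does.
    stmt = stmt.on_conflict_do_update(
        constraint='hive_reputation_data_uk',
        set_={'rshares': stmt.excluded.rshares})
    with engine.begin() as conn:
        conn.execute(stmt)
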
sa.Table(
'hive_posts', metadata,
sa.Column('id', sa.Integer, primary_key=True),
@@ -206,9 +223,10 @@ def build_metadata():
sa.Column('post_id', sa.Integer, nullable=False),
sa.Column('tag_id', sa.Integer, nullable=False),
sa.PrimaryKeyConstraint('post_id', 'tag_id', name='hive_post_tags_pk1'),
sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_post_tags_fk1'),
sa.ForeignKeyConstraint(['tag_id'], ['hive_tag_data.id'], name='hive_post_tags_fk2'),
sa.Index('hive_post_tags_post_id_idx', 'post_id'),
sa.Index('hive_post_tags_tag_id_idx', 'tag_id')
)
@@ -1390,6 +1408,64 @@ def setup(db):
"""
db.query_no_return(sql)
sql = """
DROP FUNCTION IF EXISTS process_reputation_data(in _block_num hive_blocks.num%TYPE, in _author hive_accounts.name%TYPE,
in _permlink hive_permlink_data.permlink%TYPE, in _voter hive_accounts.name%TYPE, in _rshares hive_votes.rshares%TYPE)
;
CREATE OR REPLACE FUNCTION process_reputation_data(in _block_num hive_blocks.num%TYPE,
in _author hive_accounts.name%TYPE, in _permlink hive_permlink_data.permlink%TYPE,
in _voter hive_accounts.name%TYPE, in _rshares hive_votes.rshares%TYPE)
RETURNS void
LANGUAGE sql
VOLATILE
AS $BODY$
WITH __insert_info AS (
INSERT INTO hive_reputation_data
(author_id, voter_id, permlink, block_num, rshares)
--- Warning: DISTINCT is needed here because there is no strict join condition to the hv table,
--- so ha and hv are effectively CROSS JOINed (producing duplicated records)
SELECT DISTINCT ha.id as author_id, hv.id as voter_id, _permlink, _block_num, _rshares
FROM hive_accounts ha
JOIN hive_accounts hv ON hv.name = _voter
JOIN hive_posts hp ON hp.author_id = ha.id
JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
WHERE hpd.permlink = _permlink
AND ha.name = _author
AND NOT hp.is_paidout --- voting on paidout posts shall have no effect
AND hv.reputation >= 0 --- voter's negative reputation eliminates vote from processing
AND (_rshares >= 0
OR (hv.reputation >= (ha.reputation - COALESCE((SELECT (hrd.rshares >> 6) -- if previous vote was a downvote we need to correct author reputation before current comparison to voter's reputation
FROM hive_reputation_data hrd
WHERE hrd.author_id = ha.id
AND hrd.voter_id=hv.id
AND hrd.permlink=_permlink
AND hrd.rshares < 0), 0)))
)
ON CONFLICT ON CONSTRAINT hive_reputation_data_uk DO
UPDATE SET
rshares = EXCLUDED.rshares
RETURNING (xmax = 0) AS is_new_vote,
(SELECT hrd.rshares
FROM hive_reputation_data hrd
--- Warning: we want the OLD row here, not both, so we use the old id to select it (the new record has a different value)
WHERE hrd.id = hive_reputation_data.id AND hrd.author_id = author_id and hrd.voter_id=voter_id and hrd.permlink=_permlink) AS old_rshares, author_id, voter_id
)
UPDATE hive_accounts uha
SET reputation = CASE __insert_info.is_new_vote
WHEN true THEN ha.reputation + (_rshares >> 6)
ELSE ha.reputation - (__insert_info.old_rshares >> 6) + (_rshares >> 6)
END
FROM hive_accounts ha
JOIN __insert_info ON ha.id = __insert_info.author_id
WHERE uha.id = __insert_info.author_id
;
$BODY$;
"""
db.query_no_return(sql)
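
The update rule this function encodes is easier to see outside SQL. Below is a hedged, in-memory restatement in Python; it is not the code path hivemind uses (the SQL function above is authoritative), and the names and dict-backed storage are illustrative. Python's >> on negative integers floor-shifts, matching PostgreSQL's arithmetic shift for illustration purposes. Each effective vote contributes rshares >> 6 to the author's reputation, a re-vote on the same (author, permlink, voter) key replaces the earlier contribution, and a downvote counts only when the voter out-reputes the author:

def apply_effective_vote(accounts, vote_rows, author, voter, permlink, rshares,
                         is_paidout=False):
    """accounts: name -> reputation; vote_rows: (author, permlink, voter) -> prior rshares."""
    if is_paidout or accounts[voter] < 0:
        return  # paid-out posts and negative-reputation voters have no effect

    key = (author, permlink, voter)
    prev = vote_rows.get(key)

    if rshares < 0:
        # A downvote counts only if the voter's reputation is at least the author's,
        # after first backing the voter's own earlier downvote out of the author's score.
        correction = (prev >> 6) if (prev is not None and prev < 0) else 0
        if accounts[voter] < accounts[author] - correction:
            return

    if prev is None:
        accounts[author] += rshares >> 6                  # new vote: add contribution
    else:
        accounts[author] += (rshares >> 6) - (prev >> 6)  # re-vote: replace contribution
    vote_rows[key] = rshares

# Example: a fresh 6400-rshares vote raises the author's reputation by 6400 >> 6 == 100.
accounts = {'alice': 0, 'bob': 50}
vote_rows = {}
apply_effective_vote(accounts, vote_rows, 'alice', 'bob', 'my-post', 6400)
assert accounts['alice'] == 100
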
def reset_autovac(db):
"""Initializes/resets per-table autovacuum/autoanalyze params.
@@ -1421,9 +1497,27 @@ def set_fillfactor(db):
fillfactor_config = {
'hive_posts': 70,
'hive_post_data': 70,
'hive_votes': 70,
'hive_reputation_data': 50
}
for table, fillfactor in fillfactor_config.items():
sql = """ALTER TABLE {} SET (FILLFACTOR = {})"""
db.query(sql.format(table, fillfactor))
def set_logged(db, logged):
"""Initializes/resets LOGGED/UNLOGGED attribute for tables which are intesively updated"""
logged_config = [
'hive_author',
'hive_permlink_data',
'hive_post_tags',
'hive_posts',
'hive_post_data',
'hive_votes',
'hive_reputation_data'
]
for table in logged_config:
sql = """ALTER TABLE {} SET {}"""
db.query_no_return(sql.format(table, 'LOGGED' if logged else 'UNLOGGED'))
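
Switching these tables to UNLOGGED skips write-ahead logging, which speeds up massive sync at the cost of crash-safety. A hedged sequencing sketch (run_massive_sync is a placeholder, not hivemind's actual entry point):

set_logged(db, False)     # ALTER TABLE ... SET UNLOGGED before the bulk replay
try:
    run_massive_sync(db)  # placeholder for the initial-sync phase
finally:
    set_logged(db, True)  # restore ALTER TABLE ... SET LOGGED (WAL, crash-safe)
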
@@ -2,6 +2,7 @@
from hive.indexer.reblog import Reblog
import logging
import concurrent
from hive.db.adapter import Db
@@ -13,6 +14,8 @@ from hive.indexer.follow import Follow
from hive.indexer.votes import Votes
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from hive.indexer.reputations import Reputations
from hive.indexer.reblog import Reblog
from time import perf_counter
from hive.utils.stats import OPStatusManager as OPSM
@@ -20,15 +23,38 @@ from hive.utils.stats import FlushStatusManager as FSM
from hive.utils.trends import update_hot_and_tranding_for_block_range
from hive.utils.post_active import update_active_starting_from_posts_on_block
from concurrent.futures import ThreadPoolExecutor
log = logging.getLogger(__name__)
DB = Db.instance()
def time_collector(f):
startTime = FSM.start()
result = f()
elapsedTime = FSM.stop(startTime)
return (result, elapsedTime)
def follows_flush_helper():
    follow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
    return follow_items
class Blocks:
"""Processes blocks, dispatches work, manages `hive_blocks` table."""
blocks_to_flush = []
_head_block_date = None # timestamp of last fully processed block ("previous block")
_current_block_date = None # timestamp of block currently being processed ("current block")
_concurrent_flush = [
('Posts', Posts.flush, Posts),
('PostDataCache', PostDataCache.flush, PostDataCache),
('Reputations', Reputations.flush, Reputations),
('Votes', Votes.flush, Votes),
('Tags', Tags.flush, Tags),
('Follow', follows_flush_helper, Follow),
('Reblog', Reblog.flush, Reblog)
]
def __init__(cls):
head_date = cls.head_date()
@@ -39,6 +65,16 @@ class Blocks:
cls._head_block_date = head_date
cls._current_block_date = head_date
@classmethod
def setup_db_access(self, sharedDbAdapter):
PostDataCache.setup_db_access(sharedDbAdapter)
Reputations.setup_db_access(sharedDbAdapter)
Votes.setup_db_access(sharedDbAdapter)
Tags.setup_db_access(sharedDbAdapter)
Follow.setup_db_access(sharedDbAdapter)
Posts.setup_db_access(sharedDbAdapter)
Reblog.setup_db_access(sharedDbAdapter)
@classmethod
def head_num(cls):
"""Get hive's head block number."""
@@ -63,6 +99,8 @@ class Blocks:
Votes.flush()
Posts.flush()
Reblog.flush()
Reputations.flush()
block_num = int(block['block_id'][:8], base=16)
cls.on_live_blocks_processed( block_num, block_num )
time_end = perf_counter()
@@ -73,6 +111,7 @@
def process_multi(cls, blocks, vops, hived, is_initial_sync=False):
"""Batch-process blocks; wrapped in a transaction."""
time_start = OPSM.start()
DB.query("START TRANSACTION")
last_num = 0
@@ -98,18 +137,36 @@
log.info("#############################################################################")
flush_time = register_time(flush_time, "Blocks", cls._flush_blocks())
flush_time = register_time(flush_time, "PostDataCache", PostDataCache.flush())
flush_time = register_time(flush_time, "Tags", Tags.flush())
flush_time = register_time(flush_time, "Votes", Votes.flush())
follow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", follow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush())
flush_time = register_time(flush_time, "Reblog", Reblog.flush())
DB.query("COMMIT")
completedThreads = 0
pool = ThreadPoolExecutor(max_workers = len(cls._concurrent_flush))
flush_futures = {pool.submit(time_collector, f): (description, c) for (description, f, c) in cls._concurrent_flush}
for future in concurrent.futures.as_completed(flush_futures):
(description, c) = flush_futures[future]
completedThreads = completedThreads + 1
try:
(n, elapsedTime) = future.result()
assert n is not None
assert not c.tx_active()
FSM.flush_stat(description, elapsedTime, n)
# if n > 0:
# log.info('%r flush generated %d records' % (description, n))
except Exception as exc:
log.error('%r generated an exception: %s' % (description, exc))
raise exc
pool.shutdown()
assert completedThreads == len(cls._concurrent_flush)
if (not is_initial_sync) and (first_block > -1):
DB.query("START TRANSACTION")
cls.on_live_blocks_processed( first_block, last_num )
DB.query("COMMIT")
DB.query("COMMIT")
log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
@@ -144,6 +201,7 @@ class Blocks:
elif op_type == 'effective_comment_vote_operation':
Votes.effective_comment_vote_op( op_value )
Reputations.process_vote(block_num, op_value)
if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None }
import logging
log = logging.getLogger(__name__)
class DbAdapterHolder(object):
db = None
_inside_tx = False
@classmethod
def setup_db_access(self, sharedDb):
self.db = sharedDb.clone()
@classmethod
def tx_active(self):
return self._inside_tx
@classmethod
def beginTx(self):
self.db.query("START TRANSACTION")
self._inside_tx = True
@classmethod
def commitTx(self):
self.db.query("COMMIT")
self._inside_tx = False
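
Each cache class below (Follow, PostDataCache, Posts, Reblog, Reputations, Tags, Votes) now inherits from DbAdapterHolder, and Blocks.setup_db_access() hands each one a clone of the shared Db, i.e. a private connection. That is what lets the concurrent flush in Blocks.process_multi run each flush on its own thread in its own transaction. A minimal sketch of the pattern (ExampleCache and its buffer are illustrative, not part of this commit):

class ExampleCache(DbAdapterHolder):
    """Illustrative holder: buffers statements, flushes them in its own transaction."""
    _rows = []

    @classmethod
    def flush(cls):
        if not cls._rows:
            return 0
        cls.beginTx()                    # runs on this class's cloned connection
        for sql in cls._rows:
            cls.db.query_no_return(sql)
        n = len(cls._rows)
        cls._rows.clear()
        cls.commitTx()
        return n

# Wiring, as Blocks.setup_db_access() does for the real caches:
# ExampleCache.setup_db_access(shared_db)   # shared_db.clone() -> private connection
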
@@ -9,9 +9,9 @@ from hive.db.db_state import DbState
from hive.indexer.accounts import Accounts
from hive.indexer.notify import Notify
from hive.indexer.db_adapter_holder import DbAdapterHolder
DB = Db.instance()
log = logging.getLogger(__name__)
FOLLOWERS = 'followers'
FOLLOWING = 'following'
@@ -66,7 +66,7 @@ def _flip_dict(dict_to_flip):
flipped[value] = [key]
return flipped
class Follow(DbAdapterHolder):
"""Handles processing of incoming follow ups and flushing to db."""
follow_items_to_flush = dict()
@@ -102,7 +102,7 @@ class Follow:
else:
old_state = cls._get_follow_db_state(op['flr'], op['flg'])
# insert or update state
cls.db.query(FOLLOW_ITEM_INSERT_QUERY, **op)
if new_state == 1:
Follow.follow(op['flr'], op['flg'])
if old_state is None:
@@ -145,7 +145,7 @@ class Follow:
sql = """SELECT state FROM hive_follows
WHERE follower = :follower
AND following = :following"""
return cls.db.query_one(sql, follower=follower, following=following)
# -- stat tracking --
@@ -210,7 +210,7 @@ class Follow:
else:
query = sql_prefix + ",".join(values)
query += sql_postfix
cls.db.query(query)
values.clear()
values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
follow_item['at'], follow_item['state'],
@@ -222,7 +222,7 @@ class Follow:
if len(values) > 0:
query = sql_prefix + ",".join(values)
query += sql_postfix
cls.db.query(query)
cls.follow_items_to_flush.clear()
@@ -244,7 +244,7 @@ class Follow:
return 0
start = perf()
cls.db.batch_queries(sqls, trx=trx)
if trx:
log.info("[SYNC] flushed %d follow deltas in %ds",
updated, perf() - start)
@@ -268,7 +268,7 @@ class Follow:
following = (SELECT COUNT(*) FROM hive_follows WHERE state = 1 AND follower = hive_accounts.id)
WHERE id IN :ids
"""
cls.db.query(sql, ids=tuple(ids))
@classmethod
def force_recount(cls):
@@ -286,7 +286,7 @@ class Follow:
LEFT JOIN hive_follows hf ON id = hf.following AND state = 1
GROUP BY id);
"""
cls.db.query(sql)
log.info("[SYNC] update follower counts")
sql = """
@@ -296,4 +296,4 @@ class Follow:
UPDATE hive_accounts SET following = num FROM following_counts
WHERE id = account_id AND following != num;
"""
cls.db.query(sql)
import logging
from hive.utils.normalize import escape_characters
from hive.db.adapter import Db
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
DB = Db.instance()
class PostDataCache(DbAdapterHolder):
""" Procides cache for DB operations on post data table in order to speed up initial sync """
_data = {}
@classmethod
def is_cached(cls, pid):
""" Check if data is cached """
@@ -35,7 +38,7 @@ class PostDataCache(object):
sql = """
SELECT hpd.body FROM hive_post_data hpd WHERE hpd.id = :post_id;
"""
row = cls.db.query_row(sql, post_id = pid)
post_data = dict(row)
return post_data['body']
@@ -45,6 +48,13 @@
if cls._data:
values_insert = []
values_update = []
cls.beginTx()
sql = """
INSERT INTO
hive_post_data (id, title, preview, img_url, body, json)
VALUES
"""
values = []
for k, data in cls._data.items():
title = 'NULL' if data['title'] is None else "{}".format(escape_characters(data['title']))
body = 'NULL' if data['body'] is None else "{}".format(escape_characters(data['body']))
@@ -66,7 +76,7 @@ class PostDataCache(object):
sql += ','.join(values_insert)
if print_query:
log.info("Executing query:\n{}".format(sql))
cls.db.query(sql)
if values_update:
sql = """
@@ -88,7 +98,9 @@
"""
if print_query:
log.info("Executing query:\n{}".format(sql))
cls.db.query(sql)
cls.commitTx()
n = len(cls._data.keys())
cls._data.clear()
@@ -16,12 +16,14 @@ from hive.indexer.community import Community, START_DATE
from hive.indexer.notify import Notify
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from hive.indexer.db_adapter_holder import DbAdapterHolder
from hive.utils.normalize import sbd_amount, legacy_amount, asset_to_hbd_hive, safe_img_url
log = logging.getLogger(__name__)
DB = Db.instance()
class Posts(DbAdapterHolder):
"""Handles critical/core post ops and data."""
# LRU cache for (author-permlink -> id) lookup (~400mb per 1M entries)
@@ -224,9 +226,13 @@ class Posts:
yield lst[i:i + n]
for chunk in chunks(cls._comment_payout_ops, 1000):
cls.beginTx()
values_str = ','.join(chunk)
actual_query = sql.format(values_str)
cls.db.query(actual_query)
cls.commitTx()
n = len(cls._comment_payout_ops)
cls._comment_payout_ops.clear()
@@ -8,9 +8,7 @@ from hive.db.db_state import DbState
from hive.indexer.accounts import Accounts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.notify import Notify
DB = Db.instance()
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
@@ -42,7 +40,7 @@ INSERT_SQL = """
RETURNING post_id
"""
class Reblog(DbAdapterHolder):
""" Class for reblog operations """
reblog_items_to_flush = []
@@ -63,7 +61,7 @@ class Reblog():
return
if 'delete' in op_json and op_json['delete'] == 'delete':
row = cls.db.query_row(DELETE_SQL, a=blogger, permlink=permlink)
if row is None:
log.debug("reblog: post not found: %s/%s", author, permlink)
return
@@ -72,12 +70,12 @@ class Reblog():
FeedCache.delete(result['post_id'], result['account_id'])
else:
if DbState.is_initial_sync():
row = cls.db.query_row(SELECT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
if row is not None:
result = dict(row)
cls.reblog_items_to_flush.append(result)
else:
row = cls.db.query_row(INSERT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
if row is not None:
author_id = Accounts.get_id(author)
blogger_id = Accounts.get_id(blogger)
@@ -111,7 +109,7 @@ class Reblog():
else:
query = sql_prefix + ",".join(values)
query += sql_postfix
cls.db.query(query)
values.clear()
values.append("('{}', {}, '{}', {})".format(reblog_item["blogger"], reblog_item["post_id"], reblog_item["date"], reblog_item["block_num"]))
count = 1
@@ -119,6 +117,6 @@ class Reblog():
if len(values) > 0:
query = sql_prefix + ",".join(values)
query += sql_postfix
cls.db.query(query)
cls.reblog_items_to_flush.clear()
return item_count
""" Reputation update support """
import logging
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
CACHED_ITEMS_LIMIT = 200
class Reputations(DbAdapterHolder):
_queries = []
@classmethod
def process_vote(self, block_num, effective_vote_op):
self._queries.append("\nSELECT process_reputation_data({}, '{}', '{}', '{}', {});".format(block_num, effective_vote_op['author'], effective_vote_op['permlink'],
effective_vote_op['voter'], effective_vote_op['rshares']))
@classmethod
def flush(self):
if not self._queries:
return 0
self.beginTx()
query = ""
items = 0
for s in self._queries:
    query = query + s + "\n"   # each buffered statement already ends with ';'
    items = items + 1
    if items >= CACHED_ITEMS_LIMIT:
        self.db.query_no_return(query)
        query = ""
        items = 0
if items > 0:   # flush the final partial batch
    self.db.query_no_return(query)
n = len(self._queries)
self._queries.clear()
self.commitTx()
return n
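
In use, process_vote() is called from the effective_comment_vote_operation branch in Blocks (shown earlier) and only buffers a SELECT process_reputation_data(...) statement; flush() then replays the buffer in batches of CACHED_ITEMS_LIMIT statements inside a single transaction. A hedged usage sketch (names and values illustrative; shared_db stands for the main Db instance passed to Blocks.setup_db_access):

Reputations.setup_db_access(shared_db)   # gives Reputations its own cloned connection

op = {'author': 'alice', 'permlink': 'my-post', 'voter': 'bob', 'rshares': 6400}
Reputations.process_vote(12345, op)      # buffers: SELECT process_reputation_data(12345, 'alice', ...)

flushed = Reputations.flush()            # executes the buffered statements in batches, returns count
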
"""Hive sync manager."""
from hive.indexer.reblog import Reblog
import logging
import glob
from time import perf_counter as perf
@@ -25,6 +24,9 @@ from hive.indexer.accounts import Accounts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.follow import Follow
from hive.indexer.community import Community
from hive.indexer.reblog import Reblog
from hive.indexer.reputations import Reputations
from hive.server.common.mutes import Mutes
from hive.utils.stats import OPStatusManager as OPSM
@@ -99,6 +101,7 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound):
try:
count = ubound - lbound
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound:
wait_time_1 = WSM.start()
@@ -221,6 +224,7 @@ class Sync:
# ensure db schema up to date, check app status
DbState.initialize()
Blocks.setup_db_access(self._db)
# prefetch id->name and id->rank memory maps
Accounts.load_ids()
@@ -315,13 +319,14 @@ class Sync:
remaining = drop(skip_lines, f)
for lines in partition_all(chunk_size, remaining):
raise RuntimeError("Sync from checkpoint disabled")
# Blocks.process_multi(map(json.loads, lines), True)
last_block = num
last_read = num
def from_steemd(self, is_initial_sync=False, chunk_size=1000):
"""Fast sync strategy: read/process blocks in batches."""
steemd = self._steem
lbound = Blocks.head_num() + 1
ubound = self._conf.get('test_max_block') or steemd.last_irreversible()
@@ -367,6 +372,7 @@
# debug: no max gap if disable_sync in effect
max_gap = None if self._conf.get('test_disable_sync') else 100
assert self._blocksProcessor
steemd = self._steem
hive_head = Blocks.head_num()
import logging
from hive.db.adapter import Db
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
DB = Db.instance()
from hive.utils.normalize import escape_characters
class Tags(DbAdapterHolder):
""" Tags cache """
_tags = []
@@ -17,8 +17,9 @@ class Tags(object):
@classmethod
def flush(cls):
""" Flush tags to table """
""" Flush tags to table """
if cls._tags:
cls.beginTx()
limit = 1000
sql = """
@@ -32,11 +33,11 @@ class Tags(object):
values.append("({})".format(escape_characters(tag[1])))
if len(values) >= limit:
tag_query = str(sql)
cls.db.query(tag_query.format(','.join(values)))
values.clear()
if len(values) > 0:
tag_query = str(sql)
cls.db.query(tag_query.format(','.join(values)))
values.clear()
sql = """
@@ -62,13 +63,13 @@ class Tags(object):
values.append("({}, {})".format(tag[0], escape_characters(tag[1])))
if len(values) >= limit:
tag_query = str(sql)
cls.db.query(tag_query.format(','.join(values)))
values.clear()
if len(values) > 0:
tag_query = str(sql)
cls.db.query(tag_query.format(','.join(values)))
values.clear()
cls.commitTx()
n = len(cls._tags)
cls._tags.clear()
return n
@@ -4,11 +4,11 @@ import logging
from hive.db.db_state import DbState
from hive.db.adapter import Db
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
DB = Db.instance()
class Votes(DbAdapterHolder):
""" Class for managing posts votes """
_votes_data = {}
@@ -67,9 +67,12 @@ class Votes:
@classmethod
def flush(cls):
""" Flush vote data from cache to database """
cls.inside_flush = True
n = 0
if cls._votes_data:
cls.beginTx()
sql = """
INSERT INTO hive_votes
(post_id, voter_id, author_id, permlink_id, weight, rshares, vote_percent, last_update, block_num, is_effective)
@@ -110,16 +113,19 @@ class Votes:
if len(values) >= values_limit:
values_str = ','.join(values)
actual_query = sql.format(values_str)
cls.db.query(actual_query)
values.clear()
if len(values) > 0:
values_str = ','.join(values)
actual_query = sql.format(values_str)
cls.db.query(actual_query)
values.clear()
n = len(cls._votes_data)
cls._votes_data.clear()
cls.commitTx()
cls.inside_flush = False
return n