
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: hive/hivemind
Commits on Source (15)
......@@ -150,6 +150,8 @@ checkpoint_timeout = 30min
max_wal_size = 4GB
```
The 'intarray' extension is required. A PostgreSQL user with the CREATE privilege can load it with `CREATE EXTENSION intarray`.
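For illustration, a minimal sketch of loading and verifying the extension from a `psql` session (assumes a role with the CREATE privilege on the hivemind database; the `pg_extension` check mirrors the one hivemind's setup performs):
```sql
-- Load the intarray extension (a no-op if it is already installed).
CREATE EXTENSION IF NOT EXISTS intarray;

-- Verify it is visible; hivemind's setup asserts on a similar query.
SELECT extname, extversion FROM pg_extension WHERE extname = 'intarray';
```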
## JSON-RPC API
The minimum viable API removes the backend node's dependency on the `follow` and `tags` plugins (now rolled into [`condenser_api`](https://gitlab.syncad.com/hive/hive/-/tree/master/libraries/plugins/apis/condenser_api/condenser_api.cpp)) while still powering condenser's non-wallet features. This is the core API set:
......
......@@ -119,6 +119,7 @@ class DbState:
'hive_posts_updated_at_idx',
'hive_posts_payout_plus_pending_payout_id_idx',
'hive_posts_category_id_payout_plus_pending_payout_depth_idx',
'hive_posts_tags_ids_idx',
'hive_posts_api_helper_author_s_permlink_idx',
......@@ -130,8 +131,6 @@ class DbState:
'hive_communities_block_num_idx',
'hive_reblogs_created_at_idx',
'hive_post_tags_tag_id_idx',
'hive_votes_voter_id_post_id_idx',
'hive_votes_post_id_voter_id_idx',
......
......@@ -128,6 +128,7 @@ def build_metadata():
sa.Column('beneficiaries', sa.JSON, nullable=False, server_default='[]'),
sa.Column('block_num', sa.Integer, nullable=False ),
sa.Column('block_num_created', sa.Integer, nullable=False ),
sa.Column('tags_ids', sa.ARRAY(sa.Integer), nullable=True ),
sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id'], name='hive_posts_fk1'),
sa.ForeignKeyConstraint(['root_id'], ['hive_posts.id'], name='hive_posts_fk2'),
......@@ -152,8 +153,9 @@ def build_metadata():
sa.Index('hive_posts_cashout_time_id_idx', 'cashout_time', 'id'),
sa.Index('hive_posts_updated_at_idx', sa.text('updated_at DESC')),
sa.Index('hive_posts_payout_plus_pending_payout_id_idx', sa.text('(payout+pending_payout), id, is_paidout'), postgresql_where=sql_text("counter_deleted = 0 AND NOT is_paidout")),
sa.Index('hive_posts_category_id_payout_plus_pending_payout_depth_idx', sa.text('category_id, (payout+pending_payout), depth'), postgresql_where=sql_text("NOT is_paidout AND counter_deleted = 0"))
)
sa.Index('hive_posts_category_id_payout_plus_pending_payout_depth_idx', sa.text('category_id, (payout+pending_payout), depth'), postgresql_where=sql_text("NOT is_paidout AND counter_deleted = 0")),
sa.Index('hive_posts_tags_ids_idx', 'tags_ids', postgresql_using="gin", postgresql_ops={'tags_ids': 'gin__int_ops'})
)
sa.Table(
'hive_post_data', metadata,
......@@ -215,18 +217,6 @@ def build_metadata():
sa.UniqueConstraint('tag', name='hive_tag_data_ux1')
)
sa.Table(
'hive_post_tags', metadata,
sa.Column('post_id', sa.Integer, nullable=False),
sa.Column('tag_id', sa.Integer, nullable=False),
sa.PrimaryKeyConstraint('post_id', 'tag_id', name='hive_post_tags_pk1'),
sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_post_tags_fk1'),
sa.ForeignKeyConstraint(['tag_id'], ['hive_tag_data.id'], name='hive_post_tags_fk2'),
sa.Index('hive_post_tags_tag_id_idx', 'tag_id')
)
sa.Table(
'hive_follows', metadata,
sa.Column('id', sa.Integer, primary_key=True ),
......@@ -457,6 +447,9 @@ def create_fk(db):
def setup(db):
"""Creates all tables and seed data"""
sql = """SELECT * FROM pg_extension WHERE extname='intarray'"""
assert db.query_row( sql ), "The database requires the 'intarray' extension"
# initialize schema
build_metadata().create_all(db.engine())
......@@ -617,8 +610,8 @@ def setup(db):
dir_path = dirname(realpath(__file__))
for script in sql_scripts:
execute_sql_script(db.query_no_return, "{}/sql_scripts/{}".format(dir_path, script))
......@@ -631,7 +624,6 @@ def reset_autovac(db):
autovac_config = { # vacuum analyze
'hive_accounts': (50000, 100000),
'hive_posts': (2500, 10000),
'hive_post_tags': (5000, 10000),
'hive_follows': (5000, 5000),
'hive_feed_cache': (5000, 5000),
'hive_blocks': (5000, 25000),
......@@ -667,7 +659,6 @@ def set_logged_table_attribute(db, logged):
logged_config = [
'hive_accounts',
'hive_permlink_data',
'hive_post_tags',
'hive_posts',
'hive_post_data',
'hive_votes',
......
......@@ -5,11 +5,11 @@ AS
$function$
DECLARE
__post_id INT;
__hive_tag INT;
__hive_tag INT[];
__observer_id INT;
BEGIN
__post_id = find_comment_id( _author, _permlink, True );
__hive_tag = find_tag_id( _tag, True );
__hive_tag = ARRAY_APPEND( __hive_tag, find_tag_id( _tag, True ));
__observer_id = find_account_id(_observer, False);
RETURN QUERY SELECT
hp.id,
......@@ -54,10 +54,9 @@ BEGIN
SELECT
hp1.id
FROM
hive_post_tags hpt
JOIN hive_posts hp1 ON hp1.id = hpt.post_id
hive_posts hp1
JOIN hive_accounts_view ha ON hp1.author_id = ha.id
WHERE hpt.tag_id = __hive_tag AND hp1.counter_deleted = 0 AND hp1.depth = 0 AND NOT ha.is_grayed AND ( __post_id = 0 OR hp1.id < __post_id )
WHERE hp1.tags_ids @> __hive_tag AND hp1.counter_deleted = 0 AND hp1.depth = 0 AND NOT ha.is_grayed AND ( __post_id = 0 OR hp1.id < __post_id )
--ORDER BY hp1.id + 0 DESC -- this workaround helped the query to better choose indexes, but after some time it started to significantly slow down
AND (NOT EXISTS (SELECT 1 FROM muted_accounts_by_id_view WHERE observer_id = __observer_id AND muted_id = hp1.author_id))
ORDER BY hp1.id DESC
......@@ -78,14 +77,14 @@ $function$
DECLARE
__post_id INT;
__hot_limit FLOAT;
__hive_tag INT;
__hive_tag INT[];
__observer_id INT;
BEGIN
__post_id = find_comment_id( _author, _permlink, True );
IF __post_id <> 0 THEN
SELECT hp.sc_hot INTO __hot_limit FROM hive_posts hp WHERE hp.id = __post_id;
END IF;
__hive_tag = find_tag_id( _tag, True );
__hive_tag = ARRAY_APPEND( __hive_tag, find_tag_id( _tag, True ));
__observer_id = find_account_id(_observer, False);
RETURN QUERY SELECT
hp.id,
......@@ -131,9 +130,8 @@ BEGIN
hp1.id
, hp1.sc_hot as hot
FROM
hive_post_tags hpt
JOIN hive_posts hp1 ON hp1.id = hpt.post_id
WHERE hpt.tag_id = __hive_tag AND hp1.counter_deleted = 0 AND NOT hp1.is_paidout AND hp1.depth = 0
hive_posts hp1
WHERE hp1.tags_ids @> __hive_tag AND hp1.counter_deleted = 0 AND NOT hp1.is_paidout AND hp1.depth = 0
AND ( __post_id = 0 OR hp1.sc_hot < __hot_limit OR ( hp1.sc_hot = __hot_limit AND hp1.id < __post_id ) )
AND (NOT EXISTS (SELECT 1 FROM muted_accounts_by_id_view WHERE observer_id = __observer_id AND muted_id = hp1.author_id))
ORDER BY hp1.sc_hot DESC, hp1.id DESC
......@@ -154,13 +152,13 @@ $function$
DECLARE
__post_id INT;
__payout_limit hive_posts.payout%TYPE;
__hive_tag INT;
__hive_tag INT[];
BEGIN
__post_id = find_comment_id( _author, _permlink, True );
IF __post_id <> 0 THEN
SELECT ( hp.payout + hp.pending_payout ) INTO __payout_limit FROM hive_posts hp WHERE hp.id = __post_id;
END IF;
__hive_tag = find_tag_id( _tag, True );
__hive_tag = ARRAY_APPEND( __hive_tag, find_tag_id( _tag, True ) );
RETURN QUERY SELECT
hp.id,
hp.author,
......@@ -206,9 +204,8 @@ BEGIN
, ( hp1.payout + hp1.pending_payout ) as all_payout
FROM
hive_posts hp1
JOIN hive_post_tags hpt ON hp1.id = hpt.post_id
JOIN hive_accounts_view ha ON hp1.author_id = ha.id
WHERE hpt.tag_id = __hive_tag AND hp1.counter_deleted = 0 AND NOT hp1.is_paidout AND ha.is_grayed AND ( hp1.payout + hp1.pending_payout ) > 0
WHERE hp1.tags_ids @> __hive_tag AND hp1.counter_deleted = 0 AND NOT hp1.is_paidout AND ha.is_grayed AND ( hp1.payout + hp1.pending_payout ) > 0
AND ( __post_id = 0 OR ( hp1.payout + hp1.pending_payout ) < __payout_limit OR ( ( hp1.payout + hp1.pending_payout ) = __payout_limit AND hp1.id < __post_id ) )
ORDER BY ( hp1.payout + hp1.pending_payout ) DESC, hp1.id DESC
LIMIT _limit
......@@ -381,14 +378,14 @@ $function$
DECLARE
__post_id INT;
__promoted_limit hive_posts.promoted%TYPE;
__hive_tag INT;
__hive_tag INT[];
__observer_id INT;
BEGIN
__post_id = find_comment_id( _author, _permlink, True );
IF __post_id <> 0 THEN
SELECT hp.promoted INTO __promoted_limit FROM hive_posts hp WHERE hp.id = __post_id;
END IF;
__hive_tag = find_tag_id( _tag, True );
__hive_tag = ARRAY_APPEND( __hive_tag, find_tag_id( _tag, True ) );
__observer_id = find_account_id(_observer, False);
RETURN QUERY SELECT
hp.id,
......@@ -434,9 +431,8 @@ BEGIN
hp1.id
, hp1.promoted as promoted
FROM
hive_post_tags hpt
JOIN hive_posts hp1 ON hp1.id = hpt.post_id
WHERE hpt.tag_id = __hive_tag AND hp1.counter_deleted = 0 AND NOT hp1.is_paidout AND hp1.promoted > 0
hive_posts hp1
WHERE hp1.tags_ids @> __hive_tag AND hp1.counter_deleted = 0 AND NOT hp1.is_paidout AND hp1.promoted > 0
AND ( __post_id = 0 OR hp1.promoted < __promoted_limit OR ( hp1.promoted = __promoted_limit AND hp1.id < __post_id ) )
AND (NOT EXISTS (SELECT 1 FROM muted_accounts_by_id_view WHERE observer_id = __observer_id AND muted_id = hp1.author_id))
ORDER BY hp1.promoted DESC, hp1.id DESC
......@@ -457,14 +453,14 @@ $function$
DECLARE
__post_id INT;
__trending_limit FLOAT;
__hive_tag INT;
__hive_tag INT[];
__observer_id INT;
BEGIN
__post_id = find_comment_id( _author, _permlink, True );
IF __post_id <> 0 THEN
SELECT hp.sc_trend INTO __trending_limit FROM hive_posts hp WHERE hp.id = __post_id;
END IF;
__hive_tag = find_tag_id( _tag, True );
__hive_tag = ARRAY_APPEND( __hive_tag, find_tag_id( _tag, True ));
__observer_id = find_account_id(_observer, False);
RETURN QUERY SELECT
hp.id,
......@@ -510,15 +506,15 @@ BEGIN
hp1.id
, hp1.sc_trend as trend
FROM
hive_post_tags hpt
JOIN hive_posts hp1 ON hp1.id = hpt.post_id
WHERE hpt.tag_id = __hive_tag AND hp1.counter_deleted = 0 AND NOT hp1.is_paidout AND hp1.depth = 0
hive_posts hp1
WHERE hp1.tags_ids @> __hive_tag AND hp1.counter_deleted = 0 AND NOT hp1.is_paidout AND hp1.depth = 0
AND ( __post_id = 0 OR hp1.sc_trend < __trending_limit OR ( hp1.sc_trend = __trending_limit AND hp1.id < __post_id ) )
AND (NOT EXISTS (SELECT 1 FROM muted_accounts_by_id_view WHERE observer_id = __observer_id AND muted_id = hp1.author_id))
ORDER BY hp1.sc_trend DESC, hp1.id DESC
LIMIT _limit
) as trends
JOIN hive_posts_view hp ON hp.id = trends.id
WHERE (CASE WHEN _observer IS NOT NULL THEN NOT EXISTS (SELECT 1 FROM muted_accounts_view WHERE observer = _observer AND muted = hp.author) ELSE true END)
ORDER BY trends.trend DESC, trends.id DESC
LIMIT _limit;
END
......
DROP FUNCTION if exists process_hive_post_operation(character varying,character varying,character varying,character varying,timestamp without time zone,integer,integer)
DROP FUNCTION IF EXISTS prepare_tags;
CREATE OR REPLACE FUNCTION prepare_tags( in _raw_tags VARCHAR[] )
RETURNS SETOF hive_tag_data.id%TYPE
LANGUAGE 'plpgsql'
VOLATILE
AS
$function$
DECLARE
__i INTEGER;
__tags VARCHAR[];
__tag VARCHAR;
BEGIN
FOR __i IN 1 .. ARRAY_UPPER( _raw_tags, 1)
LOOP
__tag = CAST( LEFT(LOWER(REGEXP_REPLACE( _raw_tags[ __i ], '[#\s]', '', 'g' )),32) as VARCHAR);
CONTINUE WHEN __tag = '' OR __tag = ANY(__tags);
__tags = ARRAY_APPEND( __tags, __tag );
END LOOP;
RETURN QUERY INSERT INTO
hive_tag_data AS htd(tag)
SELECT UNNEST( __tags )
ON CONFLICT("tag") DO UPDATE SET tag=EXCLUDED.tag --trick to always return id
RETURNING htd.id;
END
$function$;
DROP FUNCTION IF EXISTS process_hive_post_operation;
;
CREATE OR REPLACE FUNCTION process_hive_post_operation(
in _author hive_accounts.name%TYPE,
......@@ -7,7 +34,8 @@ CREATE OR REPLACE FUNCTION process_hive_post_operation(
in _parent_permlink hive_permlink_data.permlink%TYPE,
in _date hive_posts.created_at%TYPE,
in _community_support_start_block hive_posts.block_num%TYPE,
in _block_num hive_posts.block_num%TYPE)
in _block_num hive_posts.block_num%TYPE,
in _metadata_tags VARCHAR[])
RETURNS TABLE (is_new_post boolean, id hive_posts.id%TYPE, author_id hive_posts.author_id%TYPE, permlink_id hive_posts.permlink_id%TYPE,
post_category hive_category_data.category%TYPE, parent_id hive_posts.parent_id%TYPE, community_id hive_posts.community_id%TYPE,
is_valid hive_posts.is_valid%TYPE, is_muted hive_posts.is_muted%TYPE, depth hive_posts.depth%TYPE)
......@@ -74,7 +102,9 @@ ELSE
RETURN QUERY INSERT INTO hive_posts as hp
(parent_id, depth, community_id, category_id,
root_id, is_muted, is_valid,
author_id, permlink_id, created_at, updated_at, sc_hot, sc_trend, active, payout_at, cashout_time, counter_deleted, block_num, block_num_created)
author_id, permlink_id, created_at, updated_at, sc_hot, sc_trend,
active, payout_at, cashout_time, counter_deleted, block_num, block_num_created,
tags_ids)
SELECT 0 AS parent_id, 0 AS depth,
(CASE
WHEN _block_num > _community_support_start_block THEN
......@@ -90,6 +120,10 @@ ELSE
calculate_time_part_of_trending(_date) AS sc_trend,
_date AS active, (_date + INTERVAL '7 days') AS payout_at, (_date + INTERVAL '7 days') AS cashout_time, 0
, _block_num as block_num, _block_num as block_num_created
, (
SELECT ARRAY_AGG( prepare_tags )
FROM prepare_tags( ARRAY_APPEND(_metadata_tags, _parent_permlink ) )
) as tags_ids
FROM hive_accounts ha,
hive_permlink_data hpd
WHERE ha.name = _author and hpd.permlink = _permlink
......@@ -100,7 +134,8 @@ ELSE
--- post edit part
updated_at = _date,
active = _date,
block_num = _block_num
block_num = _block_num,
tags_ids = EXCLUDED.tags_ids
RETURNING (xmax = 0) as is_new_post, hp.id, hp.author_id, hp.permlink_id, _parent_permlink as post_category, hp.parent_id, hp.community_id, hp.is_valid, hp.is_muted, hp.depth
;
......
do $$
BEGIN
ASSERT EXISTS (SELECT * FROM pg_extension WHERE extname='intarray'), 'The database requires the "intarray" extension';
END$$;
CREATE TABLE IF NOT EXISTS hive_db_patch_level
(
level SERIAL NOT NULL PRIMARY KEY,
......@@ -215,6 +220,29 @@ IF NOT EXISTS (SELECT data_type FROM information_schema.columns
ELSE
RAISE NOTICE 'SKIPPING hive_posts upgrade - adding total_votes and net_votes columns';
END IF;
IF NOT EXISTS(SELECT data_type FROM information_schema.columns
WHERE table_name = 'hive_posts' AND column_name = 'tags_ids') THEN
ALTER TABLE ONLY hive_posts
ADD COLUMN tags_ids INTEGER[];
UPDATE hive_posts hp
SET
tags_ids = tags.tags
FROM
(
SELECT
post_id as post_id,
array_agg( hpt.tag_id ) as tags
FROM
hive_post_tags hpt
GROUP BY post_id
) as tags
WHERE hp.id = tags.post_id;
ELSE
RAISE NOTICE 'SKIPPING hive_posts upgrade - adding a tags_ids column';
END IF;
END
$BODY$
......@@ -390,3 +418,8 @@ DROP INDEX IF EXISTS hive_posts_promoted_idx;
CREATE INDEX IF NOT EXISTS hive_posts_promoted_id_idx ON hive_posts (promoted, id)
WHERE NOT is_paidout AND counter_deleted = 0
;
CREATE INDEX IF NOT EXISTS hive_posts_tags_ids_idx ON hive_posts USING gin(tags_ids gin__int_ops);
--DROP TABLE IF EXISTS hive_post_tags;
......@@ -14,7 +14,6 @@ from hive.indexer.payments import Payments
from hive.indexer.follow import Follow
from hive.indexer.votes import Votes
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from hive.indexer.reputations import Reputations
from hive.indexer.reblog import Reblog
from hive.indexer.notify import Notify
......@@ -49,7 +48,6 @@ class Blocks:
('PostDataCache', PostDataCache.flush, PostDataCache),
('Reputations', Reputations.flush, Reputations),
('Votes', Votes.flush, Votes),
('Tags', Tags.flush, Tags),
('Follow', Follow.flush, Follow),
('Reblog', Reblog.flush, Reblog),
('Notify', Notify.flush, Notify),
......@@ -70,7 +68,6 @@ class Blocks:
PostDataCache.setup_own_db_access(sharedDbAdapter)
Reputations.setup_own_db_access(sharedDbAdapter)
Votes.setup_own_db_access(sharedDbAdapter)
Tags.setup_own_db_access(sharedDbAdapter)
Follow.setup_own_db_access(sharedDbAdapter)
Posts.setup_own_db_access(sharedDbAdapter)
Reblog.setup_own_db_access(sharedDbAdapter)
......@@ -413,7 +410,6 @@ class Blocks:
# remove posts: core, tags, cache entries
if post_ids:
DB.query("DELETE FROM hive_post_tags WHERE post_id IN :ids", ids=post_ids)
DB.query("DELETE FROM hive_posts WHERE id IN :ids", ids=post_ids)
DB.query("DELETE FROM hive_post_data WHERE id IN :ids", ids=post_ids)
......
......@@ -14,7 +14,6 @@ from hive.indexer.feed_cache import FeedCache
from hive.indexer.community import Community
from hive.indexer.notify import Notify
from hive.indexer.post_data_cache import PostDataCache
from hive.indexer.tags import Tags
from hive.indexer.db_adapter_holder import DbAdapterHolder
from hive.utils.misc import chunks
......@@ -90,13 +89,27 @@ class Posts(DbAdapterHolder):
def comment_op(cls, op, block_date):
"""Register new/edited/undeleted posts; insert into feed cache."""
md = {}
# At least one case where jsonMetadata was double-encoded: condenser#895
# jsonMetadata = JSON.parse(jsonMetadata);
try:
md = loads(op['json_metadata'])
if not isinstance(md, dict):
md = {}
except Exception:
pass
tags = []
if md and 'tags' in md and isinstance(md['tags'], list):
tags = md['tags']
sql = """
SELECT is_new_post, id, author_id, permlink_id, post_category, parent_id, community_id, is_valid, is_muted, depth
FROM process_hive_post_operation((:author)::varchar, (:permlink)::varchar, (:parent_author)::varchar, (:parent_permlink)::varchar, (:date)::timestamp, (:community_support_start_block)::integer, (:block_num)::integer);
FROM process_hive_post_operation((:author)::varchar, (:permlink)::varchar, (:parent_author)::varchar, (:parent_permlink)::varchar, (:date)::timestamp, (:community_support_start_block)::integer, (:block_num)::integer, (:tags)::VARCHAR[]);
"""
row = DB.query_row(sql, author=op['author'], permlink=op['permlink'], parent_author=op['parent_author'],
parent_permlink=op['parent_permlink'], date=block_date, community_support_start_block=Community.start_block, block_num=op['block_num'])
parent_permlink=op['parent_permlink'], date=block_date, community_support_start_block=Community.start_block, block_num=op['block_num'], tags=tags)
result = dict(row)
......@@ -105,16 +118,6 @@ class Posts(DbAdapterHolder):
cls._set_id(op['author']+'/'+op['permlink'], result['id'])
md = {}
# At least one case where jsonMetadata was double-encoded: condenser#895
# jsonMetadata = JSON.parse(jsonMetadata);
try:
md = loads(op['json_metadata'])
if not isinstance(md, dict):
md = {}
except Exception:
pass
img_url = None
if 'image' in md:
img_url = md['image']
......@@ -143,18 +146,6 @@ class Posts(DbAdapterHolder):
# log.info("Adding author: {} permlink: {}".format(op['author'], op['permlink']))
PostDataCache.add_data(result['id'], post_data, is_new_post)
if not result['depth']:
tags = [result['post_category']]
if md and 'tags' in md and isinstance(md['tags'], list):
tags = tags + md['tags']
tags = map(lambda tag: (str(tag) or '').strip('# ').lower()[:32], tags)
tags = filter(None, tags)
from funcy.seqs import distinct
tags = list(distinct(tags))[:5]
for tag in tags:
Tags.add_tag(result['id'], tag)
if not DbState.is_initial_sync():
if error:
author_id = result['author_id']
......
......@@ -34,10 +34,33 @@ from hive.indexer.mock_vops_provider import MockVopsProvider
from datetime import datetime
from signal import signal, SIGINT, SIGTERM
from atomic import AtomicLong
log = logging.getLogger(__name__)
CONTINUE_PROCESSING = True
EXCEPTION_THROWN = AtomicLong(0)
FINISH_SIGNAL_DURING_SYNC = AtomicLong(0)
def finish_signals_handler(signal, frame):
global FINISH_SIGNAL_DURING_SYNC
FINISH_SIGNAL_DURING_SYNC += 1
log.info("""
**********************************************************
CAUGHT {}. PLEASE WAIT... PROCESSING DATA IN QUEUES...
**********************************************************
""".format( "SIGINT" if signal == SIGINT else "SIGTERM" ) )
def set_exception_thrown():
global EXCEPTION_THROWN
EXCEPTION_THROWN += 1
def can_continue_thread():
return EXCEPTION_THROWN.value == 0 and FINISH_SIGNAL_DURING_SYNC.value == 0
def prepare_vops(vops_by_block):
preparedVops = {}
......@@ -47,26 +70,33 @@ def prepare_vops(vops_by_block):
return preparedVops
def put_to_queue( data_queue, value ):
while can_continue_thread():
try:
data_queue.put( value, True, 1)
return
except queue.Full:
continue
def _block_provider(node, queue, lbound, ubound, chunk_size):
try:
num = 0
count = ubound - lbound
log.info("[SYNC] start block %d, +%d to sync", lbound, count)
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while CONTINUE_PROCESSING and lbound < ubound:
while can_continue_thread() and lbound < ubound:
to = min(lbound + chunk_size, ubound)
timer.batch_start()
blocks = node.get_blocks_range(lbound, to)
lbound = to
timer.batch_lap()
queue.put(blocks)
put_to_queue( queue, blocks )
num = num + 1
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during fetching blocks")
set_exception_thrown()
raise
def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
try:
......@@ -75,21 +105,31 @@ def _vops_provider(conf, node, queue, lbound, ubound, chunk_size):
log.info("[SYNC] start vops %d, +%d to sync", lbound, count)
timer = Timer(count, entity='vops-chunk', laps=['rps', 'wps'])
while CONTINUE_PROCESSING and lbound < ubound:
while can_continue_thread() and lbound < ubound:
to = min(lbound + chunk_size, ubound)
timer.batch_start()
vops = node.enum_virtual_ops(conf, lbound, to)
preparedVops = prepare_vops(vops)
lbound = to
timer.batch_lap()
queue.put(preparedVops)
put_to_queue( queue, preparedVops )
num = num + 1
return num
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during fetching vops...")
set_exception_thrown()
raise
def get_from_queue( data_queue ):
while can_continue_thread():
try:
ret = data_queue.get(True, 1)
data_queue.task_done()
except queue.Empty:
continue
return ret
return []
def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size):
from hive.utils.stats import minmax
......@@ -97,36 +137,49 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = 0
time_start = OPSM.start()
rate = {}
def print_summary():
stop = OPSM.stop(time_start)
log.info("=== TOTAL STATS ===")
wtm = WSM.log_global("Total waiting times")
ftm = FSM.log_global("Total flush times")
otm = OPSM.log_global("All operations present in the processed blocks")
ttm = ftm + otm + wtm
log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}")
log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}")
log.info("=== TOTAL STATS ===")
try:
count = ubound - lbound
timer = Timer(count, entity='block', laps=['rps', 'wps'])
while lbound < ubound:
wait_time_1 = WSM.start()
if blocksQueue.empty() and CONTINUE_PROCESSING:
if blocksQueue.empty() and can_continue_thread():
log.info("Awaiting any block to process...")
blocks = []
if not blocksQueue.empty() or CONTINUE_PROCESSING:
blocks = blocksQueue.get()
blocksQueue.task_done()
if not blocksQueue.empty() or can_continue_thread():
blocks = get_from_queue( blocksQueue )
WSM.wait_stat('block_consumer_block', WSM.stop(wait_time_1))
wait_time_2 = WSM.start()
if vopsQueue.empty() and CONTINUE_PROCESSING:
if vopsQueue.empty() and can_continue_thread():
log.info("Awaiting any vops to process...")
preparedVops = []
if not vopsQueue.empty() or CONTINUE_PROCESSING:
preparedVops = vopsQueue.get()
vopsQueue.task_done()
if not vopsQueue.empty() or can_continue_thread():
preparedVops = get_from_queue(vopsQueue)
WSM.wait_stat('block_consumer_vop', WSM.stop(wait_time_2))
to = min(lbound + chunk_size, ubound)
timer.batch_start()
if not can_continue_thread():
break;
block_start = perf()
Blocks.process_multi(blocks, preparedVops, is_initial_sync)
block_end = perf()
......@@ -157,47 +210,45 @@ def _block_consumer(node, blocksQueue, vopsQueue, is_initial_sync, lbound, uboun
num = num + 1
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
if not can_continue_thread() and blocksQueue.empty() and vopsQueue.empty():
break
except KeyboardInterrupt:
log.info("Caught SIGINT")
except Exception:
log.exception("Exception caught during processing blocks...")
finally:
stop = OPSM.stop(time_start)
log.info("=== TOTAL STATS ===")
wtm = WSM.log_global("Total waiting times")
ftm = FSM.log_global("Total flush times")
otm = OPSM.log_global("All operations present in the processed blocks")
ttm = ftm + otm + wtm
log.info(f"Elapsed time: {stop :.4f}s. Calculated elapsed time: {ttm :.4f}s. Difference: {stop - ttm :.4f}s")
log.info(f"Highest block processing rate: {rate['max'] :.4f} bps. From: {rate['max_from']} To: {rate['max_to']}")
log.info(f"Lowest block processing rate: {rate['min'] :.4f} bps. From: {rate['min_from']} To: {rate['min_to']}")
log.info("=== TOTAL STATS ===")
return num
set_exception_thrown()
print_summary()
raise
print_summary()
return num
def _node_data_provider(self, is_initial_sync, lbound, ubound, chunk_size):
blocksQueue = queue.Queue(maxsize=10)
vopsQueue = queue.Queue(maxsize=10)
global CONTINUE_PROCESSING
old_sig_int_handler = signal(SIGINT, finish_signals_handler)
old_sig_term_handler = signal(SIGTERM, finish_signals_handler)
with ThreadPoolExecutor(max_workers = 4) as pool:
try:
pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
blockConsumerFuture.result()
if not CONTINUE_PROCESSING and blocksQueue.empty() and vopsQueue.empty():
pool.shutdown(False)
except KeyboardInterrupt:
log.info(""" **********************************************************
CAUGHT SIGINT. PLEASE WAIT... PROCESSING DATA IN QUEUES...
**********************************************************
""")
CONTINUE_PROCESSING = False
blocksQueue.join()
vopsQueue.join()
block_provider_future = pool.submit(_block_provider, self._steem, blocksQueue, lbound, ubound, chunk_size)
vops_provider_future = pool.submit(_vops_provider, self._conf, self._steem, vopsQueue, lbound, ubound, chunk_size)
blockConsumerFuture = pool.submit(_block_consumer, self._steem, blocksQueue, vopsQueue, is_initial_sync, lbound, ubound, chunk_size)
consumer_exception = blockConsumerFuture.exception()
block_exception = block_provider_future.exception()
vops_exception = vops_provider_future.exception()
if consumer_exception:
raise consumer_exception
if block_exception:
raise block_exception
if vops_exception:
raise vops_exception
signal(SIGINT, old_sig_int_handler)
signal(SIGTERM, old_sig_term_handler)
blocksQueue.queue.clear()
vopsQueue.queue.clear()
class Sync:
"""Manages the sync/index process.
......@@ -229,7 +280,7 @@ class Sync:
Community.start_block = self._conf.get("community_start_block")
paths = self._conf.get("mock_block_data_path")
paths = self._conf.get("mock_block_data_path") or []
for path in paths:
self.load_mock_data(path)
......@@ -264,7 +315,7 @@ class Sync:
DbState.before_initial_sync(last_imported_block, hived_head_block)
# resume initial sync
self.initial()
if not CONTINUE_PROCESSING:
if not can_continue_thread():
return
current_imported_block = Blocks.head_num()
DbState.finish_initial_sync(current_imported_block)
......@@ -287,6 +338,8 @@ class Sync:
while True:
# sync up to irreversible block
self.from_steemd()
if not can_continue_thread():
return
try:
# listen for new blocks
......@@ -301,7 +354,7 @@ class Sync:
log.info("[INIT] *** Initial fast sync ***")
self.from_steemd(is_initial_sync=True)
if not CONTINUE_PROCESSING:
if not can_continue_thread():
return
def from_steemd(self, is_initial_sync=False, chunk_size=1000):
......
import logging
from hive.indexer.db_adapter_holder import DbAdapterHolder
log = logging.getLogger(__name__)
from hive.utils.normalize import escape_characters
class Tags(DbAdapterHolder):
""" Tags cache """
_tags = []
@classmethod
def add_tag(cls, tid, tag):
""" Add tag to cache """
cls._tags.append((tid, tag))
@classmethod
def flush(cls):
""" Flush tags to table """
if cls._tags:
cls.beginTx()
limit = 1000
sql = """
INSERT INTO
hive_tag_data (tag)
VALUES {}
ON CONFLICT DO NOTHING
"""
values = []
for tag in cls._tags:
values.append("({})".format(escape_characters(tag[1])))
if len(values) >= limit:
tag_query = str(sql)
cls.db.query(tag_query.format(','.join(values)))
values.clear()
if len(values) > 0:
tag_query = str(sql)
cls.db.query(tag_query.format(','.join(values)))
values.clear()
sql = """
INSERT INTO
hive_post_tags (post_id, tag_id)
SELECT
data_source.post_id, data_source.tag_id
FROM
(
SELECT
post_id, htd.id
FROM
(
VALUES
{}
) AS T(post_id, tag)
INNER JOIN hive_tag_data htd ON htd.tag = T.tag
) AS data_source(post_id, tag_id)
ON CONFLICT DO NOTHING
"""
values = []
for tag in cls._tags:
values.append("({}, {})".format(tag[0], escape_characters(tag[1])))
if len(values) >= limit:
tag_query = str(sql)
cls.db.query(tag_query.format(','.join(values)))
values.clear()
if len(values) > 0:
tag_query = str(sql)
cls.db.query(tag_query.format(','.join(values)))
values.clear()
cls.commitTx()
n = len(cls._tags)
cls._tags.clear()
return n
......@@ -92,6 +92,7 @@ if __name__ == "__main__":
'diff-match-patch',
'prometheus-client',
'psutil',
'atomic',
],
extras_require={
'dev': [
......
Subproject commit 0f0fd1af6d7e367849a87443c0137702b135e297
Subproject commit eb454fc8250a988c1c5a4440a3d138b8f3941232