Comparing revisions in hive/hivemind: 31 commits on source, with 549 additions and 255 deletions. Changes are shown as if the source revision were being merged into the target revision.
@@ -174,51 +174,131 @@ hivemind_stop_server:
   tags:
     - hivemind

+bridge_api_smoketest_old:
+  <<: *common_api_smoketest_job
+
+  script:
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_bridge_api_patterns.tavern.yaml api_smoketest_bridge_old.xml
+
+  artifacts:
+    reports:
+      junit: api_smoketest_bridge_old.xml
+
 bridge_api_smoketest:
   <<: *common_api_smoketest_job

   script:
-    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_bridge_api_patterns.tavern.yaml api_smoketest_bridge.xml
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" bridge_api_patterns/ api_smoketest_bridge.xml

   artifacts:
     reports:
       junit: api_smoketest_bridge.xml

+bridge_api_smoketest_negative:
+  <<: *common_api_smoketest_job
+
+  script:
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" bridge_api_negative/ api_smoketest_bridge_negative.xml
+
+  artifacts:
+    reports:
+      junit: api_smoketest_bridge_negative.xml
+
+condenser_api_smoketest_old:
+  <<: *common_api_smoketest_job
+
+  script:
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_condenser_api_patterns.tavern.yaml api_smoketest_condenser_api_old.xml
+
+  artifacts:
+    reports:
+      junit: api_smoketest_condenser_api_old.xml
+
 condenser_api_smoketest:
   <<: *common_api_smoketest_job

   script:
-    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_condenser_api_patterns.tavern.yaml api_smoketest_condenser_api.xml
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" condenser_api_patterns/ api_smoketest_condenser_api.xml

   artifacts:
     reports:
       junit: api_smoketest_condenser_api.xml

+condenser_api_smoketest_negative:
+  <<: *common_api_smoketest_job
+
+  script:
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" condenser_api_negative/ api_smoketest_condenser_api_negative.xml
+
+  artifacts:
+    reports:
+      junit: api_smoketest_condenser_api_negative.xml
+
+database_api_smoketest_old:
+  <<: *common_api_smoketest_job
+
+  script:
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_database_api_patterns.tavern.yaml api_smoketest_database_api_old.xml
+
+  artifacts:
+    reports:
+      junit: api_smoketest_database_api_old.xml
+
 database_api_smoketest:
   <<: *common_api_smoketest_job

   script:
-    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_database_api_patterns.tavern.yaml api_smoketest_database_api.xml
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" database_api_patterns/ api_smoketest_database_api.xml

   artifacts:
     reports:
       junit: api_smoketest_database_api.xml

+database_api_smoketest_negative:
+  <<: *common_api_smoketest_job
+
+  script:
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" database_api_negative/ api_smoketest_database_api_negative.xml
+
+  artifacts:
+    reports:
+      junit: api_smoketest_database_api_negative.xml
+
+follow_api_smoketest_old:
+  <<: *common_api_smoketest_job
+
+  script:
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_follow_api_patterns.tavern.yaml api_smoketest_follow_api_old.xml
+
+  artifacts:
+    reports:
+      junit: api_smoketest_follow_api_old.xml
+
 follow_api_smoketest:
   <<: *common_api_smoketest_job

   script:
-    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_follow_api_patterns.tavern.yaml api_smoketest_follow_api.xml
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" follow_api_patterns/ api_smoketest_follow_api.xml

   artifacts:
     reports:
       junit: api_smoketest_follow_api.xml

+tags_api_smoketest_old:
+  <<: *common_api_smoketest_job
+
+  script:
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_tags_api_patterns.tavern.yaml api_smoketest_tags_api_old.xml
+
+  artifacts:
+    reports:
+      junit: api_smoketest_tags_api_old.xml
+
 tags_api_smoketest:
   <<: *common_api_smoketest_job

   script:
-    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" test_tags_api_patterns.tavern.yaml api_smoketest_tags_api.xml
+    - scripts/ci_start_api_smoketest.sh localhost "$HIVEMIND_HTTP_PORT" tags_api_patterns/ api_smoketest_tags_api.xml

   artifacts:
     reports:
...
@@ -177,7 +177,11 @@ class Db:
         try:
             start = perf()
             query = self._sql_text(sql)
+            if 'log_query' in kwargs and kwargs['log_query']:
+                log.info("QUERY: {}".format(query))
             result = self._exec(query, **kwargs)
+            if 'log_result' in kwargs and kwargs['log_result']:
+                log.info("RESULT: {}".format(result))
             Stats.log_db(sql, perf() - start)
             return result
         except Exception as e:
...
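The two new kwargs make query debugging opt-in per call site instead of requiring a global log level change. A minimal usage sketch, assuming the public query helpers forward their **kwargs into the instrumented code path shown above (the helper name is taken from elsewhere in this compare):

    from hive.db.adapter import Db

    DB = Db.instance()

    # Logs "QUERY: ..." before execution and "RESULT: ..." after it,
    # for this call only.
    row = DB.query_row("SELECT num FROM hive_blocks ORDER BY num DESC LIMIT 1",
                       log_query=True, log_result=True)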
@@ -128,7 +128,6 @@ class DbState:
             #'hive_posts_cache_ix32', # API: community created
             #'hive_posts_cache_ix33', # API: community payout
             #'hive_posts_cache_ix34', # API: community muted
-            'hive_accounts_ix1', # (cached_at, name)
             'hive_accounts_ix5' # (cached_at, name)
         ]
@@ -163,10 +162,9 @@ class DbState:
         except sqlalchemy.exc.ProgrammingError as ex:
             log.warning("Ignoring ex: {}".format(ex))

-        # TODO: #111
-        #for key in cls._all_foreign_keys():
-        #    log.info("Drop fk %s", key.name)
-        #    key.drop(engine)
+        from hive.db.schema import drop_fk, create_fk
+        log.info("Dropping FKs")
+        drop_fk(cls.db())

         log.info("[INIT] Finish pre-initial sync hooks")
@@ -244,10 +242,10 @@ class DbState:
         time_end = perf_counter()
         log.info("[INIT] update_all_posts_active executed in %fs", time_end - time_start)

-        # TODO: #111
-        #for key in cls._all_foreign_keys():
-        #    log.info("Create fk %s", key.name)
-        #    key.create(engine)
+        log.info("Recreating FKs")
+        from hive.db.schema import create_fk
+        create_fk(cls.db())

     @staticmethod
     def status():
@@ -297,7 +295,6 @@ class DbState:
             cls._set_ver(3)

         if cls._ver == 3:
-            cls.db().query("CREATE INDEX hive_accounts_ix3 ON hive_accounts (vote_weight, name varchar_pattern_ops)")
             cls._set_ver(4)

         if cls._ver == 4:
...
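Dropping foreign keys before the initial massive sync and recreating them afterwards trades per-row constraint checks for one bulk validation at the end, a common way to speed up large PostgreSQL loads. The shape of the lifecycle as a sketch; drop_fk and create_fk are the real names from this diff, while the wrapper below is illustrative:

    from hive.db.schema import drop_fk, create_fk

    def massive_sync(db, sync_all_blocks):
        drop_fk(db)          # pre-initial-sync hook: inserts skip FK validation
        sync_all_blocks()    # placeholder for the heavy block-import phase
        create_fk(db)        # post-initial-sync hook: constraints re-added once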
@@ -49,19 +49,15 @@ def build_metadata():
         sa.Column('following', sa.Integer, nullable=False, server_default='0'),
         sa.Column('proxy', VARCHAR(16), nullable=False, server_default=''),
-        sa.Column('post_count', sa.Integer, nullable=False, server_default='0'),
         sa.Column('proxy_weight', sa.Float(precision=6), nullable=False, server_default='0'),
-        sa.Column('vote_weight', sa.Float(precision=6), nullable=False, server_default='0'),
         sa.Column('kb_used', sa.Integer, nullable=False, server_default='0'), # deprecated
         sa.Column('rank', sa.Integer, nullable=False, server_default='0'),

         sa.Column('lastread_at', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'),
-        sa.Column('active_at', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'),
         sa.Column('cached_at', sa.DateTime, nullable=False, server_default='1970-01-01 00:00:00'),
         sa.Column('raw_json', sa.Text),

         sa.UniqueConstraint('name', name='hive_accounts_ux1'),
-        sa.Index('hive_accounts_ix1', 'vote_weight'), # core: quick ranks
         sa.Index('hive_accounts_ix5', 'cached_at'), # core/listen sweep
     )
@@ -151,8 +147,8 @@ def build_metadata():
         'hive_post_data', metadata,
         sa.Column('id', sa.Integer, primary_key=True, autoincrement=False),
         sa.Column('title', VARCHAR(512), nullable=False, server_default=''),
-        sa.Column('preview', VARCHAR(1024), nullable=False, server_default=''),
-        sa.Column('img_url', VARCHAR(1024), nullable=False, server_default=''),
+        sa.Column('preview', VARCHAR(1024), nullable=False, server_default=''), # first 1k of 'body'
+        sa.Column('img_url', VARCHAR(1024), nullable=False, server_default=''), # first 'image' from 'json'
         sa.Column('body', TEXT, nullable=False, server_default=''),
         sa.Column('json', TEXT, nullable=False, server_default='')
     )
@@ -187,11 +183,11 @@ def build_metadata():
         sa.PrimaryKeyConstraint('author_id', 'permlink_id', 'voter_id', name='hive_votes_pk'),

-        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
-        sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id']),
-        sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id']),
-        sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id']),
-        sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num']),
+        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_votes_fk1'),
+        sa.ForeignKeyConstraint(['voter_id'], ['hive_accounts.id'], name='hive_votes_fk2'),
+        sa.ForeignKeyConstraint(['author_id'], ['hive_accounts.id'], name='hive_votes_fk3'),
+        sa.ForeignKeyConstraint(['permlink_id'], ['hive_permlink_data.id'], name='hive_votes_fk4'),
+        sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_votes_fk5'),

         sa.Index('hive_votes_post_id_idx', 'post_id'),
         sa.Index('hive_votes_voter_id_idx', 'voter_id'),
@@ -210,8 +206,8 @@ def build_metadata():
         sa.Column('post_id', sa.Integer, nullable=False),
         sa.Column('tag_id', sa.Integer, nullable=False),
         sa.PrimaryKeyConstraint('post_id', 'tag_id', name='hive_post_tags_pk1'),

-        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id']),
-        sa.ForeignKeyConstraint(['tag_id'], ['hive_tag_data.id']),
+        sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_post_tags_fk1'),
+        sa.ForeignKeyConstraint(['tag_id'], ['hive_tag_data.id'], name='hive_post_tags_fk2'),

         sa.Index('hive_post_tags_post_id_idx', 'post_id'),
         sa.Index('hive_post_tags_tag_id_idx', 'tag_id')
     )
@@ -224,10 +220,13 @@ def build_metadata():
         sa.Column('created_at', sa.DateTime, nullable=False),
         sa.Column('blacklisted', sa.Boolean, nullable=False, server_default='0'),
         sa.Column('follow_blacklists', sa.Boolean, nullable=False, server_default='0'),
+        sa.Column('block_num', sa.Integer, nullable=False ),

         sa.PrimaryKeyConstraint('following', 'follower', name='hive_follows_pk'), # core
+        sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_follows_fk1'),
         sa.Index('hive_follows_ix5a', 'following', 'state', 'created_at', 'follower'),
         sa.Index('hive_follows_ix5b', 'follower', 'state', 'created_at', 'following'),
+        sa.Index('hive_follows_block_num_idx', 'block_num')
     )

     sa.Table(
@@ -235,12 +234,15 @@ def build_metadata():
         sa.Column('account', VARCHAR(16), nullable=False),
         sa.Column('post_id', sa.Integer, nullable=False),
         sa.Column('created_at', sa.DateTime, nullable=False),
+        sa.Column('block_num', sa.Integer, nullable=False ),

         sa.ForeignKeyConstraint(['account'], ['hive_accounts.name'], name='hive_reblogs_fk1'),
         sa.ForeignKeyConstraint(['post_id'], ['hive_posts.id'], name='hive_reblogs_fk2'),
+        sa.ForeignKeyConstraint(['block_num'], ['hive_blocks.num'], name='hive_reblogs_fk3'),
         sa.PrimaryKeyConstraint('account', 'post_id', name='hive_reblogs_pk'), # core
         sa.Index('hive_reblogs_account', 'account'),
         sa.Index('hive_reblogs_post_id', 'post_id'),
+        sa.Index('hive_reblogs_block_num_idx', 'block_num')
     )

     sa.Table(
@@ -375,6 +377,24 @@ def teardown(db):
     """Drop all tables"""
     build_metadata().drop_all(db.engine())

+def drop_fk(db):
+    db.query_no_return("START TRANSACTION")
+    for table in build_metadata().sorted_tables:
+        for fk in table.foreign_keys:
+            sql = """ALTER TABLE {} DROP CONSTRAINT {}""".format(table.name, fk.name)
+            db.query_no_return(sql)
+    db.query_no_return("COMMIT")
+
+def create_fk(db):
+    from sqlalchemy.schema import AddConstraint
+    from sqlalchemy import text
+    connection = db.engine().connect()
+    connection.execute(text("START TRANSACTION"))
+    for table in build_metadata().sorted_tables:
+        for fk in table.foreign_keys:
+            connection.execute(AddConstraint(fk.constraint))
+    connection.execute(text("COMMIT"))
+
 def setup(db):
     """Creates all tables and seed data"""
     # initialize schema
@@ -554,6 +574,74 @@ def setup(db):
     """
     db.query_no_return(sql)

+    # In original hivemind, the value of 'active_at' was calculated as
+    # max
+    # {
+    #   created             ( account_create_operation ),
+    #   last_account_update ( account_update_operation/account_update2_operation ),
+    #   last_post           ( comment_operation - only creation ),
+    #   last_root_post      ( comment_operation - only creation + only ROOT ),
+    #   last_vote_time      ( vote_operation )
+    # }
+    # To simplify the calculation, `last_account_update` is not taken into
+    # consideration, because account updates are very rare, and posting/voting
+    # after an account update fixes the `active_at` value immediately.
+    sql = """
+        DROP VIEW IF EXISTS public.hive_accounts_info_view;
+
+        CREATE OR REPLACE VIEW public.hive_accounts_info_view
+        AS
+        SELECT
+          id,
+          name,
+          (
+            select count(*) post_count
+            FROM hive_posts hp
+            WHERE ha.id=hp.author_id
+          ) post_count,
+          created_at,
+          (
+            SELECT GREATEST
+            (
+              created_at,
+              COALESCE(
+                (
+                  select max(hp.created_at)
+                  FROM hive_posts hp
+                  WHERE ha.id=hp.author_id
+                ),
+                '1970-01-01 00:00:00.0'
+              ),
+              COALESCE(
+                (
+                  select max(hv.last_update)
+                  from hive_votes hv
+                  WHERE ha.id=hv.voter_id
+                ),
+                '1970-01-01 00:00:00.0'
+              )
+            )
+          ) active_at,
+          display_name,
+          about,
+          reputation,
+          profile_image,
+          location,
+          website,
+          cover_image,
+          rank,
+          following,
+          followers,
+          proxy,
+          proxy_weight,
+          lastread_at,
+          cached_at,
+          raw_json
+        FROM
+          hive_accounts ha
+    """
+    db.query_no_return(sql)
+
     sql = """
       DROP VIEW IF EXISTS public.hive_posts_view;
@@ -761,21 +849,31 @@ def setup(db):
     db.query_no_return(sql)

     sql = """
-          DROP FUNCTION IF EXISTS find_comment_id(character varying, character varying)
+          DROP FUNCTION IF EXISTS find_comment_id(character varying, character varying, boolean)
           ;
           CREATE OR REPLACE FUNCTION find_comment_id(
             in _author hive_accounts.name%TYPE,
-            in _permlink hive_permlink_data.permlink%TYPE)
-          RETURNS INT AS
+            in _permlink hive_permlink_data.permlink%TYPE,
+            in _check boolean)
+          RETURNS INT
+          LANGUAGE 'plpgsql'
+          AS
           $function$
-          SELECT COALESCE( (SELECT hp.id
-          FROM hive_posts hp
-          JOIN hive_accounts ha ON ha.id = hp.author_id
-          JOIN hive_permlink_data hpd ON hpd.id = hp.permlink_id
-          WHERE ha.name = _author AND hpd.permlink = _permlink AND hp.counter_deleted = 0
-          ), 0 );
+          DECLARE
+            post_id INT;
+          BEGIN
+            SELECT INTO post_id COALESCE( (SELECT hp.id
+              FROM hive_posts hp
+              JOIN hive_accounts ha ON ha.id = hp.author_id
+              JOIN hive_permlink_data hpd ON hpd.id = hp.permlink_id
+              WHERE ha.name = _author AND hpd.permlink = _permlink AND hp.counter_deleted = 0
+            ), 0 );
+            IF _check AND (_author <> '' OR _permlink <> '') AND post_id = 0 THEN
+              RAISE EXCEPTION 'Post %/% does not exist', _author, _permlink;
+            END IF;
+            RETURN post_id;
+          END
           $function$
-          LANGUAGE sql
           ;
     """
     db.query_no_return(sql)
@@ -839,7 +937,7 @@ def setup(db):
           DECLARE
             __post_id INT;
           BEGIN
-            __post_id = find_comment_id(_author,_permlink);
+            __post_id = find_comment_id(_author,_permlink, False);
             RETURN QUERY
             SELECT
               hp.id, hp.community_id, hp.author, hp.permlink, hp.title, hp.body,
@@ -919,8 +1017,8 @@ def setup(db):
             __root_id INT;
             __post_id INT;
           BEGIN
-            __root_id = find_comment_id(_root_author,_root_permlink);
-            __post_id = find_comment_id(_start_post_author,_start_post_permlink);
+            __root_id = find_comment_id(_root_author, _root_permlink, True);
+            __post_id = find_comment_id(_start_post_author, _start_post_permlink, True);
             RETURN QUERY
             SELECT
               hp.id, hp.community_id, hp.author, hp.permlink, hp.title, hp.body,
@@ -959,11 +1057,10 @@ def setup(db):
             in _limit INT)
           RETURNS SETOF database_api_post
           LANGUAGE sql
           COST 100
           STABLE
           ROWS 1000
-          AS $BODY$
+          AS $function$
           SELECT
             hp.id, hp.community_id, hp.author, hp.permlink, hp.title, hp.body,
             hp.category, hp.depth, hp.promoted, hp.payout, hp.last_payout_at, hp.cashout_time, hp.is_paidout,
@@ -982,7 +1079,7 @@ def setup(db):
           WHERE
             h.parent_author > _parent_author OR
             h.parent_author = _parent_author AND ( h.parent_permlink_or_category > _parent_permlink OR
-            h.parent_permlink_or_category = _parent_permlink AND h.id >= find_comment_id(_start_post_author,_start_post_permlink) )
+            h.parent_permlink_or_category = _parent_permlink AND h.id >= find_comment_id(_start_post_author, _start_post_permlink, True) )
           ORDER BY
             h.parent_author ASC,
             h.parent_permlink_or_category ASC,
@@ -993,8 +1090,8 @@ def setup(db):
           WHERE
             NOT hp.is_muted
           ;
-          $BODY$;
+          $function$
           ;

           DROP FUNCTION IF EXISTS list_comments_by_last_update(character varying, timestamp, character varying, character varying, int)
           ;
@@ -1010,7 +1107,7 @@ def setup(db):
           DECLARE
             __post_id INT;
           BEGIN
-            __post_id = find_comment_id(_start_post_author,_start_post_permlink);
+            __post_id = find_comment_id(_start_post_author, _start_post_permlink, True);
             RETURN QUERY
             SELECT
               hp.id, hp.community_id, hp.author, hp.permlink, hp.title, hp.body,
@@ -1054,7 +1151,7 @@ def setup(db):
           DECLARE
             __post_id INT;
           BEGIN
-            __post_id = find_comment_id(_start_post_author,_start_post_permlink);
+            __post_id = find_comment_id(_start_post_author, _start_post_permlink, True);
             RETURN QUERY
             SELECT
               hp.id, hp.community_id, hp.author, hp.permlink, hp.title, hp.body,
...
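The new boolean argument to find_comment_id turns a silent 0 result into a database-side exception, which the API layer can surface as a client error (note the psycopg2.errors.RaiseException import added at the end of this compare). A hedged sketch of both calling conventions, with invented account and permlink values:

    from hive.db.adapter import Db

    DB = Db.instance()

    # _check = False: a missing post yields 0, the pre-change behaviour.
    pid = DB.query_one("SELECT find_comment_id(:a, :p, False)", a='alice', p='my-post')

    # _check = True: a missing post raises 'Post alice/my-post does not exist',
    # which reaches Python as psycopg2.errors.RaiseException.
    pid = DB.query_one("SELECT find_comment_id(:a, :p, True)", a='alice', p='my-post')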
@@ -59,7 +59,7 @@ class Accounts:
     def get_id(cls, name):
         """Get account id by name. Throw if not found."""
         assert isinstance(name, str), "account name should be string"
-        assert name in cls._ids, "account does not exist or was not registered"
+        assert name in cls._ids, 'Account \'%s\' does not exist' % name
         return cls._ids[name]

     @classmethod
@@ -140,13 +140,6 @@ class Accounts:
         cls._cache_accounts(accounts, steem, trx=trx)
         return count

-    @classmethod
-    def fetch_ranks(cls):
-        """Rebuild account ranks and store in memory for next update."""
-        sql = "SELECT id FROM hive_accounts ORDER BY vote_weight DESC"
-        for rank, _id in enumerate(DB.query_col(sql)):
-            cls._ranks[_id] = rank + 1
-
     @classmethod
     def _cache_accounts(cls, accounts, steem, trx=True):
         """Fetch all `accounts` and write to db."""
@@ -170,9 +163,10 @@ class Accounts:
         """Prepare a SQL query from a steemd account."""
         vests = vests_amount(account['vesting_shares'])

-        vote_weight = (vests
-                       + vests_amount(account['received_vesting_shares'])
-                       - vests_amount(account['delegated_vesting_shares']))
+        #Not used. The member `vote_weight` from `hive_accounts` is removed.
+        # vote_weight = (vests
+        #             + vests_amount(account['received_vesting_shares'])
+        #             - vests_amount(account['delegated_vesting_shares']))

         proxy_weight = 0 if account['proxy'] else float(vests)
         for satoshis in account['proxied_vsf_votes']:
@@ -190,21 +184,12 @@ class Accounts:
         del account['json_metadata']
         del account['posting_json_metadata']

-        active_at = max(account['created'],
-                        account['last_account_update'],
-                        account['last_post'],
-                        account['last_root_post'],
-                        account['last_vote_time'])
-
         values = {
             'name': account['name'],
             'created_at': account['created'],
             'proxy': account['proxy'],
-            'post_count': account['post_count'],
             'reputation': rep_log10(account['reputation']),
             'proxy_weight': proxy_weight,
-            'vote_weight': vote_weight,
-            'active_at': active_at,
             'cached_at': cached_at,
             'display_name': profile['name'],
...
"""Blocks processor.""" """Blocks processor."""
from hive.indexer.reblog import Reblog
import logging import logging
import json
from hive.db.adapter import Db from hive.db.adapter import Db
...@@ -62,6 +62,7 @@ class Blocks: ...@@ -62,6 +62,7 @@ class Blocks:
Tags.flush() Tags.flush()
Votes.flush() Votes.flush()
Posts.flush() Posts.flush()
Reblog.flush()
block_num = int(block['block_id'][:8], base=16) block_num = int(block['block_id'][:8], base=16)
cls.on_live_blocks_processed( block_num, block_num ) cls.on_live_blocks_processed( block_num, block_num )
time_end = perf_counter() time_end = perf_counter()
...@@ -103,6 +104,7 @@ class Blocks: ...@@ -103,6 +104,7 @@ class Blocks:
folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False) folllow_items = len(Follow.follow_items_to_flush) + Follow.flush(trx=False)
flush_time = register_time(flush_time, "Follow", folllow_items) flush_time = register_time(flush_time, "Follow", folllow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush()) flush_time = register_time(flush_time, "Posts", Posts.flush())
flush_time = register_time(flush_time, "Reblog", Reblog.flush())
if (not is_initial_sync) and (first_block > -1): if (not is_initial_sync) and (first_block > -1):
cls.on_live_blocks_processed( first_block, last_num ) cls.on_live_blocks_processed( first_block, last_num )
...@@ -113,8 +115,6 @@ class Blocks: ...@@ -113,8 +115,6 @@ class Blocks:
@staticmethod @staticmethod
def prepare_vops(comment_payout_ops, vopsList, date, block_num): def prepare_vops(comment_payout_ops, vopsList, date, block_num):
vote_ops = {}
ineffective_deleted_ops = {} ineffective_deleted_ops = {}
registered_ops_stats = [ 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation', 'comment_payout_update_operation', 'ineffective_delete_comment_operation'] registered_ops_stats = [ 'author_reward_operation', 'comment_reward_operation', 'effective_comment_vote_operation', 'comment_payout_update_operation', 'ineffective_delete_comment_operation']
...@@ -143,8 +143,7 @@ class Blocks: ...@@ -143,8 +143,7 @@ class Blocks:
comment_payout_ops[key][op_type] = ( op_value, date ) comment_payout_ops[key][op_type] = ( op_value, date )
elif op_type == 'effective_comment_vote_operation': elif op_type == 'effective_comment_vote_operation':
key_vote = "{}/{}/{}".format(op_value['voter'], op_value['author'], op_value['permlink']) Votes.effective_comment_vote_op( op_value )
vote_ops[ key_vote ] = op_value
if key not in comment_payout_ops: if key not in comment_payout_ops:
comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None } comment_payout_ops[key] = { 'author_reward_operation':None, 'comment_reward_operation':None, 'effective_comment_vote_operation':None, 'comment_payout_update_operation':None }
...@@ -163,7 +162,7 @@ class Blocks: ...@@ -163,7 +162,7 @@ class Blocks:
if op_type in registered_ops_stats: if op_type in registered_ops_stats:
OPSM.op_stats(op_type, OPSM.stop(start)) OPSM.op_stats(op_type, OPSM.stop(start))
return (vote_ops, ineffective_deleted_ops) return ineffective_deleted_ops
@classmethod @classmethod
...@@ -181,16 +180,15 @@ class Blocks: ...@@ -181,16 +180,15 @@ class Blocks:
if cls._head_block_date is None: if cls._head_block_date is None:
cls._head_block_date = cls._current_block_date cls._head_block_date = cls._current_block_date
vote_ops = None
comment_payout_stats = None comment_payout_stats = None
ineffective_deleted_ops = None ineffective_deleted_ops = None
if is_initial_sync: if is_initial_sync:
if num in virtual_operations: if num in virtual_operations:
(vote_ops, ineffective_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num) ineffective_deleted_ops = Blocks.prepare_vops(Posts.comment_payout_ops, virtual_operations[num], cls._current_block_date, num)
else: else:
vops = hived.get_virtual_operations(num) vops = hived.get_virtual_operations(num)
(vote_ops, ineffective_deleted_ops ) = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num) ineffective_deleted_ops = Blocks.prepare_vops(Posts.comment_payout_ops, vops, cls._current_block_date, num)
json_ops = [] json_ops = []
for tx_idx, tx in enumerate(block['transactions']): for tx_idx, tx in enumerate(block['transactions']):
...@@ -255,10 +253,6 @@ class Blocks: ...@@ -255,10 +253,6 @@ class Blocks:
if json_ops: if json_ops:
CustomOp.process_ops(json_ops, num, cls._head_block_date) CustomOp.process_ops(json_ops, num, cls._head_block_date)
if vote_ops is not None:
for k, v in vote_ops.items():
Votes.effective_comment_vote_op(k, v)
cls._head_block_date = cls._current_block_date cls._head_block_date = cls._current_block_date
return num return num
...@@ -403,6 +397,6 @@ class Blocks: ...@@ -403,6 +397,6 @@ class Blocks:
update_hot_and_tranding_for_block_range( first_block, last_block ) update_hot_and_tranding_for_block_range( first_block, last_block )
update_active_starting_from_posts_on_block( first_block, last_block ) update_active_starting_from_posts_on_block( first_block, last_block )
DB.query("SELECT update_hive_posts_children_count({}, {})".format(first_block, last_block)) DB.query_no_return("SELECT update_hive_posts_children_count({}, {})".format(first_block, last_block))
DB.query("SELECT update_hive_posts_root_id({},{})".format(first_block, last_block)) DB.query_no_return("SELECT update_hive_posts_root_id({},{})".format(first_block, last_block))
DB.query("SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block)) DB.query_no_return("SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block))
@@ -461,7 +461,7 @@ class CommunityOp:
         _name = read_key_str(self.op, 'community', 16)
         assert _name, 'must name a community'
         _id = Community.validated_id(_name)
-        assert _id, 'community `%s` does not exist' % _name
+        assert _id, 'Community \'%s\' does not exist' % _name

         self.community = _name
         self.community_id = _id
...
"""Main custom_json op handler.""" """Main custom_json op handler."""
import logging import logging
from funcy.seqs import first, second from funcy.seqs import first, second
from hive.db.adapter import Db from hive.db.adapter import Db
from hive.db.db_state import DbState
from hive.indexer.accounts import Accounts
from hive.indexer.posts import Posts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.follow import Follow from hive.indexer.follow import Follow
from hive.indexer.reblog import Reblog
from hive.indexer.notify import Notify from hive.indexer.notify import Notify
from hive.indexer.community import process_json_community_op, START_BLOCK from hive.indexer.community import process_json_community_op, START_BLOCK
...@@ -44,7 +42,7 @@ class CustomOp: ...@@ -44,7 +42,7 @@ class CustomOp:
"""Given a list of operation in block, filter and process them.""" """Given a list of operation in block, filter and process them."""
for op in ops: for op in ops:
start = OPSM.start() start = OPSM.start()
opName = str(op['id']) + ( '-ignored' if op['id'] not in ['follow', 'community', 'notify'] else '' ) opName = str(op['id']) + ( '-ignored' if op['id'] not in ['follow', 'community', 'notify', 'reblog'] else '' )
account = _get_auth(op) account = _get_auth(op)
if not account: if not account:
...@@ -54,13 +52,16 @@ class CustomOp: ...@@ -54,13 +52,16 @@ class CustomOp:
if op['id'] == 'follow': if op['id'] == 'follow':
if block_num < 6000000 and not isinstance(op_json, list): if block_num < 6000000 and not isinstance(op_json, list):
op_json = ['follow', op_json] # legacy compat op_json = ['follow', op_json] # legacy compat
cls._process_legacy(account, op_json, block_date) cls._process_legacy(account, op_json, block_date, block_num)
elif op['id'] == 'reblog':
if block_num < 6000000 and not isinstance(op_json, list):
op_json = ['reblog', op_json] # legacy compat
cls._process_legacy(account, op_json, block_date, block_num)
elif op['id'] == 'community': elif op['id'] == 'community':
if block_num > START_BLOCK: if block_num > START_BLOCK:
process_json_community_op(account, op_json, block_date) process_json_community_op(account, op_json, block_date)
elif op['id'] == 'notify': elif op['id'] == 'notify':
cls._process_notify(account, op_json, block_date) cls._process_notify(account, op_json, block_date)
OPSM.op_stats(opName, OPSM.stop(start)) OPSM.op_stats(opName, OPSM.stop(start))
@classmethod @classmethod
...@@ -81,7 +82,7 @@ class CustomOp: ...@@ -81,7 +82,7 @@ class CustomOp:
log.warning("notify op fail: %s in %s", e, op_json) log.warning("notify op fail: %s in %s", e, op_json)
@classmethod @classmethod
def _process_legacy(cls, account, op_json, block_date): def _process_legacy(cls, account, op_json, block_date, block_num):
"""Handle legacy 'follow' plugin ops (follow/mute/clear, reblog) """Handle legacy 'follow' plugin ops (follow/mute/clear, reblog)
follow {follower: {type: 'account'}, follow {follower: {type: 'account'},
...@@ -103,70 +104,6 @@ class CustomOp: ...@@ -103,70 +104,6 @@ class CustomOp:
cmd, op_json = op_json # ['follow', {data...}] cmd, op_json = op_json # ['follow', {data...}]
if cmd == 'follow': if cmd == 'follow':
Follow.follow_op(account, op_json, block_date) Follow.follow_op(account, op_json, block_date, block_num)
elif cmd == 'reblog': elif cmd == 'reblog':
cls.reblog(account, op_json, block_date) Reblog.reblog_op(account, op_json, block_date, block_num)
@classmethod
def reblog(cls, account, op_json, block_date):
"""Handle legacy 'reblog' op"""
if ('account' not in op_json
or 'author' not in op_json
or 'permlink' not in op_json):
return
blogger = op_json['account']
author = op_json['author']
permlink = op_json['permlink']
if blogger != account:
return # impersonation
if not all(map(Accounts.exists, [author, blogger])):
return
if 'delete' in op_json and op_json['delete'] == 'delete':
sql = """
WITH processing_set AS (
SELECT hp.id as post_id, ha.id as account_id
FROM hive_posts hp
INNER JOIN hive_accounts ha ON hp.author_id = ha.id
INNER JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
WHERE ha.name = :a AND hpd.permlink = :permlink AND hp.depth <= 0
)
DELETE FROM hive_reblogs AS hr
WHERE hr.account = :a AND hr.post_id IN (SELECT ps.post_id FROM processing_set ps)
RETURNING hr.post_id, (SELECT ps.account_id FROM processing_set ps) AS account_id
"""
row = DB.query_row(sql, a=blogger, permlink=permlink)
if row is None:
log.debug("reblog: post not found: %s/%s", author, permlink)
return
if not DbState.is_initial_sync():
result = dict(row)
FeedCache.delete(result['post_id'], result['account_id'])
else:
sql = """
INSERT INTO hive_reblogs (account, post_id, created_at)
SELECT ha.name, hp.id as post_id, :date
FROM hive_accounts ha
INNER JOIN hive_posts hp ON hp.author_id = ha.id
INNER JOIN hive_permlink_data hpd ON hpd.id = hp.permlink_id
WHERE ha.name = :a AND hpd.permlink = :p
ON CONFLICT (account, post_id) DO NOTHING
RETURNING post_id
"""
row = DB.query_row(sql, a=blogger, p=permlink, date=block_date)
if not DbState.is_initial_sync():
author_id = Accounts.get_id(author)
blogger_id = Accounts.get_id(blogger)
if row is not None:
result = dict(row)
post_id = result['post_id']
FeedCache.insert(post_id, blogger_id, block_date)
Notify('reblog', src_id=blogger_id, dst_id=author_id,
post_id=post_id, when=block_date,
score=Accounts.default_score(blogger)).write()
else:
log.error("Error in reblog: row is None!")
@@ -17,7 +17,7 @@ FOLLOWERS = 'followers'
 FOLLOWING = 'following'

 FOLLOW_ITEM_INSERT_QUERY = """
-    INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists)
+    INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists, block_num)
     VALUES
     (
         :flr,
@@ -35,7 +35,8 @@ FOLLOW_ITEM_INSERT_QUERY = """
             WHEN 4 THEN TRUE
             ELSE TRUE
         END
-    )
+    ),
+    :block_num
     )
     ON CONFLICT (follower, following) DO UPDATE
     SET
@@ -71,11 +72,12 @@ class Follow:
     follow_items_to_flush = dict()

     @classmethod
-    def follow_op(cls, account, op_json, date):
+    def follow_op(cls, account, op_json, date, block_num):
         """Process an incoming follow op."""
         op = cls._validated_op(account, op_json, date)
         if not op:
             return
+        op['block_num'] = block_num

         # perform delta check
         new_state = op['state']
@@ -94,7 +96,8 @@ class Follow:
                      flr=op['flr'],
                      flg=op['flg'],
                      state=op['state'],
-                     at=op['at'])
+                     at=op['at'],
+                     block_num=op['block_num'])
         else:
             old_state = cls._get_follow_db_state(op['flr'], op['flg'])
@@ -171,7 +174,7 @@ class Follow:
     @classmethod
     def _flush_follow_items(cls):
         sql_prefix = """
-            INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists)
+            INSERT INTO hive_follows as hf (follower, following, created_at, state, blacklisted, follow_blacklists, block_num)
             VALUES """

         sql_postfix = """
@@ -198,20 +201,22 @@ class Follow:
         count = 0
         for _, follow_item in cls.follow_items_to_flush.items():
             if count < limit:
-                values.append("({}, {}, '{}', {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
-                                                                  follow_item['at'], follow_item['state'],
-                                                                  follow_item['state'] == 3,
-                                                                  follow_item['state'] == 4))
+                values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
+                                                                      follow_item['at'], follow_item['state'],
+                                                                      follow_item['state'] == 3,
+                                                                      follow_item['state'] == 4,
+                                                                      follow_item['block_num']))
                 count = count + 1
             else:
                 query = sql_prefix + ",".join(values)
                 query += sql_postfix
                 DB.query(query)
                 values.clear()
-                values.append("({}, {}, '{}', {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
-                                                                  follow_item['at'], follow_item['state'],
-                                                                  follow_item['state'] == 3,
-                                                                  follow_item['state'] == 4))
+                values.append("({}, {}, '{}', {}, {}, {}, {})".format(follow_item['flr'], follow_item['flg'],
+                                                                      follow_item['at'], follow_item['state'],
+                                                                      follow_item['state'] == 3,
+                                                                      follow_item['state'] == 4,
+                                                                      follow_item['block_num']))
                 count = 1

         if len(values) > 0:
...
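_flush_follow_items accumulates rows and emits multi-row VALUES inserts in chunks of 1000; Reblog.flush further down uses the same pattern. The underlying idea, reduced to a generic sketch (the function and parameter names here are hypothetical):

    def flush_in_chunks(db, sql_prefix, sql_postfix, value_tuples, limit=1000):
        """Emit INSERT ... VALUES (...),(...) statements of at most `limit` rows each."""
        pending = []
        for value in value_tuples:
            pending.append(value)
            if len(pending) >= limit:
                db.query(sql_prefix + ",".join(pending) + sql_postfix)
                pending.clear()
        if pending:
            db.query(sql_prefix + ",".join(pending) + sql_postfix)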
@@ -15,9 +15,16 @@ class PostDataCache(object):
         return pid in cls._data

     @classmethod
-    def add_data(cls, pid, post_data, print_query = False):
+    def add_data(cls, pid, post_data, is_new_post):
         """ Add data to cache """
-        cls._data[pid] = post_data
+        if not cls.is_cached(pid):
+            cls._data[pid] = post_data
+            cls._data[pid]['is_new_post'] = is_new_post
+        else:
+            assert not is_new_post
+            for k, data in post_data.items():
+                if data is not None:
+                    cls._data[pid][k] = data

     @classmethod
     def get_post_body(cls, pid):
@@ -36,37 +43,53 @@ class PostDataCache(object):
     def flush(cls, print_query = False):
         """ Flush data from cache to db """
         if cls._data:
-            sql = """
-                INSERT INTO
-                    hive_post_data (id, title, preview, img_url, body, json)
-                VALUES
-            """
-            values = []
+            values_insert = []
+            values_update = []
             for k, data in cls._data.items():
-                title = "''" if not data['title'] else "{}".format(escape_characters(data['title']))
-                preview = "''" if not data['preview'] else "{}".format(escape_characters(data['preview']))
-                img_url = "''" if not data['img_url'] else "{}".format(escape_characters(data['img_url']))
-                body = "''" if not data['body'] else "{}".format(escape_characters(data['body']))
-                json = "'{}'" if not data['json'] else "{}".format(escape_characters(data['json']))
-                values.append("({},{},{},{},{},{})".format(k, title, preview, img_url, body, json))
-            sql += ','.join(values)
-            sql += """
-                ON CONFLICT (id)
-                DO
-                    UPDATE SET
-                        title = EXCLUDED.title,
-                        preview = EXCLUDED.preview,
-                        img_url = EXCLUDED.img_url,
-                        body = EXCLUDED.body,
-                        json = EXCLUDED.json
-                    WHERE
-                        hive_post_data.id = EXCLUDED.id
-            """
-
-            if(print_query):
-                log.info("Executing query:\n{}".format(sql))
-
-            DB.query(sql)
+                title = 'NULL' if data['title'] is None else "{}".format(escape_characters(data['title']))
+                body = 'NULL' if data['body'] is None else "{}".format(escape_characters(data['body']))
+                preview = 'NULL' if data['body'] is None else "{}".format(escape_characters(data['body'][0:1024]))
+                json = 'NULL' if data['json'] is None else "{}".format(escape_characters(data['json']))
+                img_url = 'NULL' if data['img_url'] is None else "{}".format(escape_characters(data['img_url']))
+                value = "({},{},{},{},{},{})".format(k, title, preview, img_url, body, json)
+                if data['is_new_post']:
+                    values_insert.append(value)
+                else:
+                    values_update.append(value)
+
+            if values_insert:
+                sql = """
+                    INSERT INTO
+                        hive_post_data (id, title, preview, img_url, body, json)
+                    VALUES
+                """
+                sql += ','.join(values_insert)
+                if print_query:
+                    log.info("Executing query:\n{}".format(sql))
+                DB.query(sql)
+
+            if values_update:
+                sql = """
+                    UPDATE hive_post_data AS hpd SET
+                        title = COALESCE( data_source.title, hpd.title ),
+                        preview = COALESCE( data_source.preview, hpd.preview ),
+                        img_url = COALESCE( data_source.img_url, hpd.img_url ),
+                        body = COALESCE( data_source.body, hpd.body ),
+                        json = COALESCE( data_source.json, hpd.json )
+                    FROM
+                    ( SELECT * FROM
+                        ( VALUES
+                """
+                sql += ','.join(values_update)
+                sql += """
+                        ) AS T(id, title, preview, img_url, body, json)
+                    ) AS data_source
+                    WHERE hpd.id = data_source.id
+                """
+                if print_query:
+                    log.info("Executing query:\n{}".format(sql))
+                DB.query(sql)

             n = len(cls._data.keys())
             cls._data.clear()
             return n
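Under the new contract a None field means "not edited": only freshly inserted rows carry a full payload, while the UPDATE branch relies on COALESCE to keep stored values for fields the edit left out. A usage sketch with invented ids and content:

    # New post: full payload, flushed through the INSERT branch.
    PostDataCache.add_data(101, dict(title='Hello', img_url='', body='First post', json='{}'), True)
    PostDataCache.flush()

    # Later edit, after the insert was flushed: only the body is supplied;
    # COALESCE in the UPDATE branch leaves title/img_url/json untouched.
    PostDataCache.add_data(101, dict(title=None, img_url=None, body='First post (edited)', json=None), False)
    PostDataCache.flush()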
@@ -3,7 +3,7 @@
 import logging
 import collections

-from json import dumps, loads
+from ujson import dumps, loads

 from diff_match_patch import diff_match_patch
@@ -16,7 +16,7 @@ from hive.indexer.community import Community, START_DATE
 from hive.indexer.notify import Notify
 from hive.indexer.post_data_cache import PostDataCache
 from hive.indexer.tags import Tags
-from hive.utils.normalize import sbd_amount, legacy_amount, asset_to_hbd_hive
+from hive.utils.normalize import sbd_amount, legacy_amount, asset_to_hbd_hive, safe_img_url

 log = logging.getLogger(__name__)
 DB = Db.instance()
@@ -103,23 +103,6 @@ class Posts:
         cls._set_id(op['author']+'/'+op['permlink'], result['id'])

-        if result['is_new_post']:
-            # add content data to hive_post_data
-            post_data = dict(title=op['title'], preview=op['preview'] if 'preview' in op else "",
-                             img_url=op['img_url'] if 'img_url' in op else "", body=op['body'],
-                             json=op['json_metadata'] if op['json_metadata'] else '{}')
-        else:
-            # edit case. Now we need to (potentially) apply patch to the post body.
-            new_body = cls._merge_post_body(id=result['id'], new_body_def=op['body'])
-            post_data = dict(title=op['title'], preview=op['preview'] if 'preview' in op else "",
-                             img_url=op['img_url'] if 'img_url' in op else "", body=new_body,
-                             json=op['json_metadata'] if op['json_metadata'] else '{}')
-        # log.info("Adding author: {}  permlink: {}".format(op['author'], op['permlink']))
-        printQuery = False # op['author'] == 'xeroc' and op['permlink'] == 're-piston-20160818t080811'
-        PostDataCache.add_data(result['id'], post_data, printQuery)
-
         md = {}
         # At least one case where jsonMetadata was double-encoded: condenser#895
         # jsonMetadata = JSON.parse(jsonMetadata);
@@ -130,6 +113,34 @@ class Posts:
         except Exception:
             pass

+        img_url = None
+        if 'image' in md:
+            img_url = md['image']
+            if isinstance(img_url, list) and img_url:
+                img_url = img_url[0]
+        if img_url:
+            img_url = safe_img_url(img_url)
+
+        is_new_post = result['is_new_post']
+        if is_new_post:
+            # add content data to hive_post_data
+            post_data = dict(title=op['title'] if op['title'] else '',
+                             img_url=img_url if img_url else '',
+                             body=op['body'] if op['body'] else '',
+                             json=op['json_metadata'] if op['json_metadata'] else '')
+        else:
+            # edit case. Now we need to (potentially) apply patch to the post body.
+            # empty new body means no body edit, not clear (same with other data)
+            new_body = cls._merge_post_body(id=result['id'], new_body_def=op['body']) if op['body'] else None
+            new_title = op['title'] if op['title'] else None
+            new_json = op['json_metadata'] if op['json_metadata'] else None
+            # when 'new_json' is not empty, 'img_url' should be overwritten even if it is itself empty
+            new_img = img_url if img_url else '' if new_json else None
+            post_data = dict(title=new_title, img_url=new_img, body=new_body, json=new_json)
+        # log.info("Adding author: {}  permlink: {}".format(op['author'], op['permlink']))
+        PostDataCache.add_data(result['id'], post_data, is_new_post)
+
         if not result['depth']:
             tags = [result['post_category']]
             if md and 'tags' in md and isinstance(md['tags'], list):
...
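The edit-case img_url expression is easy to misread: new_img = img_url if img_url else '' if new_json else None parses as img_url if img_url else ('' if new_json else None). A small sketch of the three outcomes (URL and metadata values invented):

    def resolve_new_img(img_url, new_json):
        # Parenthesized equivalent of the expression in the hunk above.
        return img_url if img_url else ('' if new_json else None)

    assert resolve_new_img('https://example.com/x.png', None) == 'https://example.com/x.png'
    assert resolve_new_img(None, '{"tags": []}') == ''   # metadata edited, image gone: clear it
    assert resolve_new_img(None, None) is None           # nothing edited: keep the stored value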
""" Class for reblog operations """
import logging
from hive.db.adapter import Db
from hive.db.db_state import DbState
from hive.indexer.accounts import Accounts
from hive.indexer.feed_cache import FeedCache
from hive.indexer.notify import Notify
DB = Db.instance()
log = logging.getLogger(__name__)
DELETE_SQL = """
WITH processing_set AS (
SELECT hp.id as post_id, ha.id as account_id
FROM hive_posts hp
INNER JOIN hive_accounts ha ON hp.author_id = ha.id
INNER JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
WHERE ha.name = :a AND hpd.permlink = :permlink AND hp.depth <= 0
)
DELETE FROM hive_reblogs AS hr
WHERE hr.account = :a AND hr.post_id IN (SELECT ps.post_id FROM processing_set ps)
RETURNING hr.post_id, (SELECT ps.account_id FROM processing_set ps) AS account_id
"""
SELECT_SQL = """
SELECT :blogger as blogger, hp.id as post_id, :date as date, :block_num as block_num
FROM hive_posts hp
INNER JOIN hive_accounts ha ON ha.id = hp.author_id
INNER JOIN hive_permlink_data hpd ON hpd.id = hp.permlink_id
WHERE ha.name = :author AND hpd.permlink = :permlink AND hp.depth <= 0
"""
INSERT_SQL = """
INSERT INTO hive_reblogs (account, post_id, created_at, block_num)
""" + SELECT_SQL + """
ON CONFLICT ON CONSTRAINT hive_reblogs_pk DO NOTHING
RETURNING post_id
"""
class Reblog():
""" Class for reblog operations """
reblog_items_to_flush = []
@classmethod
def reblog_op(cls, account, op_json, block_date, block_num):
if 'account' not in op_json or \
'author' not in op_json or \
'permlink' not in op_json:
return
blogger = op_json['account']
author = op_json['author']
permlink = op_json['permlink']
if blogger != account:
return # impersonation
if not all(map(Accounts.exists, [author, blogger])):
return
if 'delete' in op_json and op_json['delete'] == 'delete':
row = DB.query_row(DELETE_SQL, a=blogger, permlink=permlink)
if row is None:
log.debug("reblog: post not found: %s/%s", author, permlink)
return
if not DbState.is_initial_sync():
result = dict(row)
FeedCache.delete(result['post_id'], result['account_id'])
else:
if DbState.is_initial_sync():
row = DB.query_row(SELECT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
if row is not None:
result = dict(row)
cls.reblog_items_to_flush.append(result)
else:
row = DB.query_row(INSERT_SQL, blogger=blogger, author=author, permlink=permlink, date=block_date, block_num=block_num)
if row is not None:
author_id = Accounts.get_id(author)
blogger_id = Accounts.get_id(blogger)
result = dict(row)
post_id = result['post_id']
FeedCache.insert(post_id, blogger_id, block_date)
Notify('reblog', src_id=blogger_id, dst_id=author_id,
post_id=post_id, when=block_date,
score=Accounts.default_score(blogger)).write()
else:
log.warning("Error in reblog: Insert operation returned `None` as `post_id`. Op details: {}".format(op_json))
@classmethod
def flush(cls):
sql_prefix = """
INSERT INTO hive_reblogs (account, post_id, created_at, block_num)
VALUES
"""
sql_postfix = """
ON CONFLICT ON CONSTRAINT hive_reblogs_pk DO NOTHING
"""
values = []
limit = 1000
count = 0
item_count = len(cls.reblog_items_to_flush)
for reblog_item in cls.reblog_items_to_flush:
if count < limit:
values.append("('{}', {}, '{}', {})".format(reblog_item["blogger"], reblog_item["post_id"], reblog_item["date"], reblog_item["block_num"]))
count = count + 1
else:
query = sql_prefix + ",".join(values)
query += sql_postfix
DB.query(query)
values.clear()
values.append("('{}', {}, '{}', {})".format(reblog_item["blogger"], reblog_item["post_id"], reblog_item["date"], reblog_item["block_num"]))
count = 1
if len(values) > 0:
query = sql_prefix + ",".join(values)
query += sql_postfix
DB.query(query)
cls.reblog_items_to_flush.clear()
return item_count
\ No newline at end of file
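The new Reblog class keeps two paths: during initial sync, rows resolved via SELECT_SQL are buffered and batch-inserted by flush(), while in live mode INSERT_SQL runs immediately and feeds FeedCache and Notify. A sketch of the op shapes it accepts, matching the dispatch from CustomOp above (all values invented):

    # Reblog: legacy custom_json payload with account/author/permlink.
    op_json = {'account': 'alice', 'author': 'bob', 'permlink': 'my-post'}
    Reblog.reblog_op('alice', op_json, '2020-06-01 12:00:00', 44332211)

    # Un-reblog: the same payload plus the 'delete' marker.
    Reblog.reblog_op('alice', dict(op_json, delete='delete'), '2020-06-01 12:01:00', 44332212)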
"""Hive sync manager.""" """Hive sync manager."""
from hive.indexer.reblog import Reblog
import logging import logging
import glob import glob
from time import perf_counter as perf from time import perf_counter as perf
...@@ -211,13 +212,18 @@ class Sync: ...@@ -211,13 +212,18 @@ class Sync:
def run(self): def run(self):
"""Initialize state; setup/recovery checks; sync and runloop.""" """Initialize state; setup/recovery checks; sync and runloop."""
from hive.version import VERSION, GIT_REVISION
log.info("hivemind_version : %s", VERSION)
log.info("hivemind_git_rev : %s", GIT_REVISION)
from hive.db.schema import DB_VERSION as SCHEMA_DB_VERSION
log.info("database_schema_version : %s", SCHEMA_DB_VERSION)
# ensure db schema up to date, check app status # ensure db schema up to date, check app status
DbState.initialize() DbState.initialize()
# prefetch id->name and id->rank memory maps # prefetch id->name and id->rank memory maps
Accounts.load_ids() Accounts.load_ids()
Accounts.fetch_ranks()
# load irredeemables # load irredeemables
mutes = Mutes( mutes = Mutes(
...@@ -228,6 +234,10 @@ class Sync: ...@@ -228,6 +234,10 @@ class Sync:
# community stats # community stats
Community.recalc_pending_payouts() Community.recalc_pending_payouts()
sql = "SELECT num FROM hive_blocks ORDER BY num DESC LIMIT 1"
database_head_block = DbState.db().query_one(sql)
log.info("database_head_block : %s", database_head_block)
if DbState.is_initial_sync(): if DbState.is_initial_sync():
last_imported_block = Blocks.head_num() last_imported_block = Blocks.head_num()
# resume initial sync # resume initial sync
...@@ -377,7 +387,6 @@ class Sync: ...@@ -377,7 +387,6 @@ class Sync:
if num % 1200 == 0: #1hr if num % 1200 == 0: #1hr
log.warning("head block %d @ %s", num, block['timestamp']) log.warning("head block %d @ %s", num, block['timestamp'])
log.info("[LIVE] hourly stats") log.info("[LIVE] hourly stats")
Accounts.fetch_ranks()
#Community.recalc_pending_payouts() #Community.recalc_pending_payouts()
if num % 200 == 0: #10min if num % 200 == 0: #10min
Community.recalc_pending_payouts() Community.recalc_pending_payouts()
......
@@ -27,12 +27,11 @@ class Votes:
             log.exception("Adding new vote-info into '_votes_data' dict")
             raise RuntimeError("Fatal error")

-        key = voter + "/" + author + "/" + permlink
+        key = "{}/{}/{}".format(voter, author, permlink)
         if key in cls._votes_data:
             cls._votes_data[key]["vote_percent"] = weight
             cls._votes_data[key]["last_update"] = date
-            cls._votes_data[key]["block_num"] = block_num
         else:
             cls._votes_data[key] = dict(voter=voter,
                                         author=author,
@@ -45,20 +44,26 @@ class Votes:
                                         block_num=block_num)

     @classmethod
-    def effective_comment_vote_op(cls, key, vop):
+    def effective_comment_vote_op(cls, vop):
         """ Process effective_comment_vote_operation """

-        if cls.inside_flush:
-            log.exception("Updating data in '_votes_data' using effective comment")
-            raise RuntimeError("Fatal error")
-
-        assert key in cls._votes_data
-
-        cls._votes_data[key]["weight"] = vop["weight"]
-        cls._votes_data[key]["rshares"] = vop["rshares"]
-        cls._votes_data[key]["is_effective"] = True
-        cls._votes_data[key]["block_num"] = vop['block_num']
+        key = "{}/{}/{}".format(vop['voter'], vop['author'], vop['permlink'])
+
+        if key in cls._votes_data:
+            cls._votes_data[key]["weight"] = vop["weight"]
+            cls._votes_data[key]["rshares"] = vop["rshares"]
+            cls._votes_data[key]["is_effective"] = True
+            cls._votes_data[key]["block_num"] = vop['block_num']
+        else:
+            cls._votes_data[key] = dict(voter=vop['voter'],
+                                        author=vop['author'],
+                                        permlink=vop['permlink'],
+                                        vote_percent=0,
+                                        weight=vop["weight"],
+                                        rshares=vop["rshares"],
+                                        last_update='1970-01-01 00:00:00',
+                                        is_effective=True,
+                                        block_num=vop['block_num'])

     @classmethod
     def flush(cls):
         """ Flush vote data from cache to database """
...
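The assert on a previously seen key is gone: an effective_comment_vote_operation can now arrive for a vote that is not in the current cache window (for example, right after a flush), in which case a placeholder entry is built from the virtual op alone. A sketch with an invented payload, using only the fields the method reads:

    vop = {'voter': 'alice', 'author': 'bob', 'permlink': 'my-post',
           'weight': 12345, 'rshares': 98765, 'block_num': 44332211}

    # Safe whether or not a matching vote_operation was cached first.
    Votes.effective_comment_vote_op(vop)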
@@ -66,7 +66,7 @@ async def get_profile(context, account, observer=None):
     """Load account/profile data."""
     db = context['db']
     ret = await load_profiles(db, [valid_account(account)])
-    assert ret, 'account \'{}\' does not exist'.format(account)
+    assert ret, 'Account \'{}\' does not exist'.format(account)

     observer_id = await get_account_id(db, observer) if observer else None
     if observer_id:
...
...@@ -16,10 +16,8 @@ log = logging.getLogger(__name__) ...@@ -16,10 +16,8 @@ log = logging.getLogger(__name__)
async def load_profiles(db, names): async def load_profiles(db, names):
"""`get_accounts`-style lookup for `get_state` compat layer.""" """`get_accounts`-style lookup for `get_state` compat layer."""
sql = """SELECT id, name, display_name, about, reputation, vote_weight, sql = """SELECT * FROM hive_accounts_info_view
created_at, post_count, profile_image, location, website, WHERE name IN :names"""
cover_image, rank, following, followers, active_at
FROM hive_accounts WHERE name IN :names"""
rows = await db.query_all(sql, names=tuple(names)) rows = await db.query_all(sql, names=tuple(names))
return [_condenser_profile_object(row) for row in rows] return [_condenser_profile_object(row) for row in rows]
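Switching load_profiles to SELECT * FROM hive_accounts_info_view moves the column list (including the rank and follow counts) into a database view, so this module no longer needs updating when that set changes. Call sites are untouched; a usage sketch (the account names and db handle are illustrative):

async def show_profiles(db):
    # Same call as before the change; the view supplies the columns.
    profiles = await load_profiles(db, ['alice', 'bob'])
    for p in profiles:
        print(p['name'], p['stats']['rank'], p['stats']['followers'])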
@@ -197,6 +195,8 @@ def _condenser_profile_object(row):
     blacklists = Mutes.lists(row['name'], row['reputation'])

+    # Important: the member `sp` in `stats` is removed because hivemind doesn't currently hold any balances.
+    # The member `vote_weight` from `hive_accounts` is removed as well.
     return {
         'id': row['id'],
         'name': row['name'],
@@ -206,16 +206,15 @@ def _condenser_profile_object(row):
         'reputation': row['reputation'],
         'blacklists': blacklists,
         'stats': {
-            'sp': int(row['vote_weight'] * 0.0005037),
             'rank': row['rank'],
             'following': row['following'],
             'followers': row['followers'],
         },
         'metadata': {
-            'profile': {'name': row['display_name'],
-                        'about': row['about'],
-                        'website': row['website'],
-                        'location': row['location'],
+            'profile': {'name': row['display_name'] if row['display_name'] else "",
+                        'about': row['about'] if row['about'] else "",
+                        'website': row['website'] if row['website'] else "",
+                        'location': row['location'] if row['location'] else "",
                         'cover_image': row['cover_image'],
                         'profile_image': row['profile_image'],
                         }}}
@@ -232,7 +231,10 @@ def _bridge_post_object(row, truncate_body=0):
     post['title'] = row['title']
     post['body'] = row['body'][0:truncate_body] if truncate_body else row['body']
-    post['json_metadata'] = json.loads(row['json'])
+    try:
+        post['json_metadata'] = json.loads(row['json'])
+    except Exception:
+        post['json_metadata'] = {}
     post['created'] = json_date(row['created_at'])
     post['updated'] = json_date(row['updated_at'])
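Wrapping json.loads means a post whose stored json_metadata is malformed degrades to {} instead of failing the whole API response. The same defensive pattern as a standalone sketch:

import json

def safe_json_metadata(raw):
    # Bad or missing metadata should not break the post object.
    try:
        return json.loads(raw)
    except Exception:   # ValueError for bad JSON, TypeError for None, ...
        return {}

assert safe_json_metadata('{"tags": ["hive"]}') == {"tags": ["hive"]}
assert safe_json_metadata('not json') == {}
assert safe_json_metadata(None) == {}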
...

@@ -34,7 +34,7 @@ async def get_post_header(context, author, permlink):
     row = await db.query_row(sql, author=author, permlink=permlink)

-    assert row, 'post \'@{}/{}\' does not exist'.format(author, permlink)
+    assert row, 'Post {}/{} does not exist'.format(author, permlink)

     return dict(
         author=row['author'],
...

@@ -5,6 +5,7 @@ from functools import wraps
 import traceback
 import logging
 import datetime
+from psycopg2.errors import RaiseException

 log = logging.getLogger(__name__)
@@ -20,6 +21,9 @@ def return_error_info(function):
         """Catch ApiError and AssertionError (always due to user error)."""
         try:
             return await function(*args, **kwargs)
+        except RaiseException as e:
+            log.error("PGSQL: %s\n%s", repr(e), traceback.format_exc())
+            raise AssertionError(e.diag.message_primary)
         except (ApiError, AssertionError, TypeError, Exception) as e:
             if isinstance(e, KeyError):
                 #TODO: KeyError overloaded for method not found. Any KeyErrors
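psycopg2 (2.8+) maps PostgreSQL's RAISE EXCEPTION (SQLSTATE P0001) to psycopg2.errors.RaiseException, and e.diag.message_primary carries the message text, so validation errors raised inside SQL functions are logged and re-surfaced as the AssertionErrors this decorator already treats as user errors. A sketch of the flow — the SQL function name is hypothetical, and the db wrapper is the one used elsewhere in this diff:

from psycopg2.errors import RaiseException

async def fetch_post(db, author, permlink):
    # hypothetical_get_post is illustrative only; suppose its body does:
    #   RAISE EXCEPTION 'Post %/% does not exist', _author, _permlink;
    sql = "SELECT * FROM hypothetical_get_post(:author, :permlink)"
    try:
        return await db.query_row(sql, author=author, permlink=permlink)
    except RaiseException as e:
        # diag.message_primary holds the text given to RAISE EXCEPTION
        raise AssertionError(e.diag.message_primary)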
@@ -115,3 +119,23 @@ def valid_follow_type(follow_type: str):
     """Ensure follow type is valid steemd type."""
     assert follow_type in ['blog', 'ignore'], 'invalid follow_type `%s`' % follow_type
     return follow_type
+
+def valid_date(date, allow_empty=False):
+    """Ensure that date is in a correct format."""
+    if not date:
+        assert allow_empty, 'Date is blank'
+        return
+    check_date = False
+    # check format "%Y-%m-%d %H:%M:%S"
+    try:
+        check_date = (date == datetime.datetime.strptime(date, "%Y-%m-%d %H:%M:%S").strftime('%Y-%m-%d %H:%M:%S'))
+    except ValueError:
+        check_date = False
+    # if the check failed for the format above, try another format
+    # check format "%Y-%m-%dT%H:%M:%S"
+    if not check_date:
+        try:
+            check_date = (date == datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S").strftime('%Y-%m-%dT%H:%M:%S'))
+        except ValueError:
+            pass
+
+    assert check_date, "Date should be in format Y-m-d H:M:S or Y-m-dTH:M:S"
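Usage of valid_date: a value must round-trip through strptime/strftime exactly in one of the two supported layouts, and a blank value passes only with allow_empty=True:

valid_date('2020-06-01 12:30:00')          # space-separated form
valid_date('2020-06-01T12:30:00')          # "T"-separated form
valid_date('', allow_empty=True)           # blank explicitly allowed

try:
    valid_date('01.06.2020')
except AssertionError as e:
    print(e)    # Date should be in format Y-m-d H:M:S or Y-m-dTH:M:S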
@@ -439,11 +439,8 @@ async def get_accounts(db, accounts: list):
     ret = []

     names = ["'{}'".format(a) for a in accounts]
-    sql = """SELECT created_at, reputation, display_name, about,
-                    location, website, profile_image, cover_image, followers, following,
-                    proxy, post_count, proxy_weight, vote_weight, rank,
-                    lastread_at, active_at, cached_at, raw_json
-               FROM hive_accounts WHERE name IN ({})""".format(",".join(names))
+    sql = """SELECT *
+               FROM hive_accounts_info_view WHERE name IN ({})""".format(",".join(names))
     result = await db.query_all(sql)
     for row in result:
@@ -463,10 +460,11 @@ async def get_accounts(db, accounts: list):
         account_data['proxy'] = row.proxy
         account_data['post_count'] = row.post_count
         account_data['proxy_weight'] = row.proxy_weight
-        account_data['vote_weight'] = row.vote_weight
         account_data['rank'] = row.rank

         account_data['lastread_at'] = row.lastread_at.isoformat()
         account_data['active_at'] = row.active_at.isoformat()
         account_data['cached_at'] = row.cached_at.isoformat()

         ret.append(account_data)
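get_accounts interpolates pre-quoted names with format(), which is workable because account names are validated upstream; the neighboring lookups (load_profiles, load_accounts below) use bound parameters instead. The bound-parameter equivalent of this query, assuming the same db wrapper:

async def get_accounts_rows(db, accounts):
    # Let the driver handle quoting instead of building the IN list by hand.
    sql = "SELECT * FROM hive_accounts_info_view WHERE name IN :names"
    return await db.query_all(sql, names=tuple(accounts))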
...

@@ -14,10 +14,8 @@ log = logging.getLogger(__name__)

 async def load_accounts(db, names):
     """`get_accounts`-style lookup for `get_state` compat layer."""
-    sql = """SELECT id, name, display_name, about, reputation, vote_weight,
-                    created_at, post_count, profile_image, location, website,
-                    cover_image
-               FROM hive_accounts WHERE name IN :names"""
+    sql = """SELECT * FROM hive_accounts_info_view
+              WHERE name IN :names"""
     rows = await db.query_all(sql, names=tuple(names))
     return [_condenser_account_object(row) for row in rows]
@@ -156,12 +154,13 @@ async def _query_author_rep_map(db, posts):

 def _condenser_account_object(row):
     """Convert an internal account record into legacy-steemd style."""
+    # The member `vote_weight` from `hive_accounts` is removed, so the member `net_vesting_shares` is currently equal to zero.
     return {
         'name': row['name'],
         'created': str(row['created_at']),
         'post_count': row['post_count'],
         'reputation': rep_to_raw(row['reputation']),
-        'net_vesting_shares': row['vote_weight'],
+        'net_vesting_shares': 0,
         'transfer_history': [],
         'json_metadata': json.dumps({
             'profile': {'name': row['display_name'],
...