Commit 66c81e9a authored by Bartek Wrona's avatar Bartek Wrona
Browse files

Merge branch 'km_issue_151' into 'develop'

removed Posts._ids

See merge request !485
parents 2956ce10 86d1e6d9
......@@ -78,6 +78,13 @@ class Accounts(DbAdapterHolder):
assert name in cls._ids, 'Account \'%s\' does not exist' % name
return cls._ids[name]
@classmethod
def get_id_noexept(cls, name):
    """Get account id by name. Return None if not found."""
    assert isinstance(name, str), "account name should be string"
    # EAFP lookup against the in-memory name -> id map; a missing
    # name yields None instead of raising (hence "noexept").
    try:
        return cls._ids[name]
    except KeyError:
        return None
@classmethod
def exists(cls, names):
"""Check if an account name exists."""
......
......@@ -425,21 +425,29 @@ class CommunityOp:
def _read_account(self):
    """Read the 'account' key from the op and resolve it.

    Sets `self.account` (name) and `self.account_id` (numeric id).
    Raises AssertionError if the key is missing or the account is unknown.
    """
    _name = read_key_str(self.op, 'account', 16)
    assert _name, 'must name an account'
    assert Accounts.exists(_name), 'account `%s` not found' % _name
    # resolve the id once; the duplicated `self.account = _name`
    # assignment from the merged diff is removed
    self.account_id = Accounts.get_id(_name)
    self.account = _name
def _read_permlink(self):
    """Read the 'permlink' key from the op and validate the target post.

    Looks up the post by (author_id, permlink) in a single joined query
    and asserts it belongs to this operation's community.
    Sets `self.permlink`.
    """
    assert self.account, 'permlink requires named account'
    _permlink = read_key_str(self.op, 'permlink', 256)
    assert _permlink, 'must name a permlink'

    # single joined lookup; the old Posts.get_id round trip plus a
    # second community query was dead code left from the merged diff
    sql = """
        SELECT hp.id, community_id
          FROM hive_posts hp
          JOIN hive_permlink_data hpd ON hp.permlink_id = hpd.id
         WHERE author_id = :_author AND hpd.permlink = :_permlink
    """
    result = DB.query_row(sql, _author=self.account_id, _permlink=_permlink)
    # do not leak the raw SQL text into the assertion message
    assert result, f'post does not exist: {self.account}/{_permlink}'

    result = dict(result)
    _pid = result.get('id', None)
    assert _pid, f'post does not exist {self.account}/{_permlink}'
    _comm = result.get('community_id', None)
    assert self.community_id == _comm, 'post does not belong to community'
    self.permlink = _permlink
......
......@@ -19,22 +19,49 @@ class Payments:
@classmethod
def op_transfer(cls, op, tx_idx, num, date):
    """Process raw transfer op; apply balance if valid post promote.

    Inserts a hive_payments row, resolving the post id from
    (author_id, permlink) inside the INSERT itself, and — when the post
    exists and the amount is non-zero — bumps hive_posts.promoted
    atomically in SQL. The old two-step path (separate insert, SELECT
    of the current amount, then UPDATE with the computed sum) was a
    non-atomic read-modify-write and is removed.
    """
    result = cls._validated(op, tx_idx, num, date)
    if not result:
        return
    record, author_id, permlink = result

    # add payment record and return post id
    sql = """
        INSERT INTO hive_payments(block_num, tx_idx, post_id, from_account, to_account, amount, token)
        SELECT bn, tx, hp.id, fa, ta, am, tkn
        FROM
        (
            SELECT bn, tx, hpd.id, auth_id, fa, ta, am, tkn
            FROM (VALUES (:_block_num, :_tx_idx, :_permlink, :_author_id, :_from_account, :_to_account, :_amount, :_token))
                AS v(bn, tx, perm, auth_id, fa, ta, am, tkn)
            JOIN hive_permlink_data hpd
                ON v.perm = hpd.permlink
        ) AS vv(bn, tx, hpd_id, auth_id, fa, ta, am, tkn)
        JOIN hive_posts hp
            ON hp.author_id = vv.auth_id AND hp.permlink_id = vv.hpd_id
        RETURNING post_id
    """
    post_id = DB.query_one(
        sql,
        _block_num=record['block_num'],
        _tx_idx=record['tx_idx'],
        _permlink=permlink,
        _author_id=author_id,
        _from_account=record['from_account'],
        _to_account=record['to_account'],
        _amount=record['amount'],
        _token=record['token'])

    amount = record['amount']
    if not isinstance(amount, float):
        amount = float(amount)
    # post_id is None when the post was not found (nothing was inserted)
    if amount != 0.0 and post_id is not None:
        # update post record; increment server-side to stay atomic
        sql = "UPDATE hive_posts SET promoted = promoted + :val WHERE id = :id"
        DB.query(sql, val=amount, id=post_id)
@classmethod
def _validated(cls, op, tx_idx, num, date):
......@@ -53,22 +80,18 @@ class Payments:
return # invalid url
author, permlink = cls._split_url(url)
if not Accounts.exists(author):
author_id = Accounts.get_id_noexept(author)
if not author_id:
return
post_id = Posts.get_id(author, permlink)
if not post_id:
log.debug("post does not exist: %s", url)
return
return {'id': None,
return [{'id': None,
'block_num': num,
'tx_idx': tx_idx,
'post_id': post_id,
'from_account': Accounts.get_id(op['from']),
'to_account': Accounts.get_id(op['to']),
'amount': amount,
'token': token}
'token': token}, author_id, permlink]
@staticmethod
def _validate_url(url):
......
......@@ -27,7 +27,6 @@ class Posts(DbAdapterHolder):
# LRU cache for (author-permlink -> id) lookup (~400mb per 1M entries)
CACHE_SIZE = 2000000
_ids = collections.OrderedDict()
_hits = 0
_miss = 0
......@@ -40,43 +39,6 @@ class Posts(DbAdapterHolder):
sql = "SELECT MAX(id) FROM hive_posts WHERE counter_deleted = 0"
return DB.query_one(sql) or 0
@classmethod
def get_id(cls, author, permlink):
    """Look up id by author/permlink, making use of LRU cache."""
    key = author + '/' + permlink
    post_id = cls._ids.pop(key, None)
    if post_id is not None:
        # cache hit: re-insert so the entry becomes most-recently-used
        cls._hits += 1
        cls._ids[key] = post_id
    else:
        cls._miss += 1
        sql = """
            SELECT hp.id
            FROM hive_posts hp
            INNER JOIN hive_accounts ha_a ON ha_a.id = hp.author_id
            INNER JOIN hive_permlink_data hpd_p ON hpd_p.id = hp.permlink_id
            WHERE ha_a.name = :a AND hpd_p.permlink = :p
        """
        post_id = DB.query_one(sql, a=author, p=permlink)
        if post_id:
            cls._set_id(key, post_id)

    # log cache statistics every 100K lookups
    total = cls._hits + cls._miss
    if total % 100000 == 0:
        log.info("pid lookups: %d, hits: %d (%.1f%%), entries: %d",
                 total, cls._hits, 100.0*cls._hits/total, len(cls._ids))
    return post_id
@classmethod
def _set_id(cls, url, pid):
    """Add an entry to the LRU, maintaining max size.

    :param url: cache key, "author/permlink"
    :param pid: post id (must be truthy)
    """
    assert pid, "no pid provided for %s" % url
    # evict the oldest entry BEFORE the cap is exceeded; the previous
    # `>` comparison let the cache grow to CACHE_SIZE + 1 entries
    if len(cls._ids) >= cls.CACHE_SIZE:
        cls._ids.popitem(last=False)
    cls._ids[url] = pid
@classmethod
def delete_op(cls, op, block_date):
"""Given a delete_comment op, mark the post as deleted.
......@@ -119,8 +81,6 @@ class Posts(DbAdapterHolder):
# TODO we need to enhance checking related community post validation and honor is_muted.
error = cls._verify_post_against_community(op, result['community_id'], result['is_valid'], result['is_muted'])
cls._set_id(op['author']+'/'+op['permlink'], result['id'])
img_url = None
if 'image' in md:
img_url = md['image']
......
......@@ -29,6 +29,7 @@ from hive.utils.stats import WaitingStatusManager as WSM
from hive.utils.stats import PrometheusClient as PC
from hive.utils.stats import BroadcastObject
from hive.utils.communities_rank import update_communities_posts_and_rank
from hive.utils.misc import show_app_version, log_memory_usage
from hive.indexer.mock_block_provider import MockBlockProvider
from hive.indexer.mock_vops_provider import MockVopsProvider
......@@ -145,6 +146,7 @@ def _block_consumer(blocks_data_provider, is_initial_sync, lbound, ubound):
log.info(timer.batch_status(prefix))
log.info("[INITIAL SYNC] Time elapsed: %fs", time_current - time_start)
log.info("[INITIAL SYNC] Current system time: %s", datetime.now().strftime("%H:%M:%S"))
log.info(log_memory_usage())
rate = minmax(rate, len(vops_and_blocks['blocks']), time_current - time_before_waiting_for_data, lbound)
if block_end - block_start > 1.0 or is_debug:
......@@ -260,7 +262,6 @@ class Sync:
sql = "SELECT level, patch_date, patched_to_revision FROM hive_db_patch_level ORDER BY level DESC LIMIT 1"
patch_level_data = self._db.query_row(sql)
from hive.utils.misc import show_app_version;
show_app_version(log, database_head_block, patch_level_data)
set_handlers()
......
import os, psutil
from hive.utils.stats import PrometheusClient, BroadcastObject
def log_memory_usage(memtypes=("rss", "vms", "shared"), broadcast=True) -> str:
    """
    Build a human-readable report of current process memory usage,
    additionally broadcast if broadcast set to True (default).
    Available memtypes: rss, vms, shared, text, lib, data, dirty

    :param memtypes: iterable of psutil memory_info field names to report
        (immutable tuple default — the old list default was a mutable
        default argument)
    :param broadcast: when True, push EVERY available counter (not only
        `memtypes`) to Prometheus
    :return: single-line formatted report string
    """
    def format_bytes(val: int) -> str:
        # psutil reports ints, but accept floats as well
        assert isinstance(val, (int, float)), 'invalid data type, required int or float'
        return f'{ val / 1024.0 / 1024.0 :.2f} MB'

    human_readable = {"rss": "physical_memory", "vms": "virtual_memory", "shared": "shared_memory",
                      "text": "used_by_executable", "lib": "used_by_shared_libraries"}

    # docs: https://psutil.readthedocs.io/en/latest/#psutil.Process.memory_info
    stats = psutil.Process(os.getpid()).memory_info()
    if broadcast:
        # broadcast to prometheus
        PrometheusClient.broadcast([BroadcastObject(f'hivemind_memory_{key}', getattr(stats, key), 'b')
                                    for key in stats._fields])

    return f"memory usage report: { ', '.join( [ f'{ human_readable.get(k, k) } = { format_bytes(getattr(stats, k)) }' for k in memtypes ] ) }"
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
......
......@@ -49,7 +49,7 @@ class PrometheusClient:
gauges = {}
while pid_exists(pid):
value : BroadcastObject = PrometheusClient.logs_to_broadcast.get()
value : BroadcastObject = PrometheusClient.logs_to_broadcast.get(True)
value.debug()
value_name = value.name()
......@@ -75,11 +75,13 @@ class PrometheusClient:
log.warn("Failed to import prometheus client. Online stats disabled")
return
from threading import Thread
deamon = Thread(target=PrometheusClient.work, args=[ port, getpid() ], daemon=True)
deamon.start()
PrometheusClient.deamon = Thread(target=PrometheusClient.work, args=[ port, getpid() ], daemon=True)
PrometheusClient.deamon.start()
@staticmethod
def broadcast(obj):
if PrometheusClient.deamon is None:
return
if type(obj) == type(list()):
for v in obj:
PrometheusClient.broadcast(v)
......
{
"4000001": {
"previous": "",
"timestamp": "",
"witness": "",
"transaction_merkle_root": "",
"extensions": [],
"witness_signature": "",
"transactions": [
{
"ref_block_num": 100000,
"ref_block_prefix": 1,
"expiration": "2020-03-23T12:17:00",
"operations": [
{
"type": "transfer_operation",
"value": {
"from": "gtg",
"to": "null",
"amount": "0.02 HBD",
"memo": "@alice/firstpost______20"
}
}
]
}
],
"block_id": "",
"signing_key": "",
"transaction_ids": []
},
"4010001": {
"previous": "",
"timestamp": "",
"witness": "",
"transaction_merkle_root": "",
"extensions": [],
"witness_signature": "",
"transactions": [
{
"ref_block_num": 100001,
"ref_block_prefix": 1,
"expiration": "2020-03-23T12:17:00",
"operations": [
{
"type": "comment_operation",
"value": {
"parent_author": "",
"parent_permlink": "",
"author": "alice",
"permlink": "firstpost______20",
"title": "firstpost______20",
"body": "XXXXXX",
"json_metadata": "{}"
}
},
{
"type": "delete_comment_operation",
"value": {
"author": "alice",
"permlink": "firstpost______20"
}
},
{
"type": "transfer_operation",
"value": {
"from": "gtg",
"to": "null",
"amount": "0.02 HBD",
"memo": "@alice/firstpost______20"
}
}
]
}
],
"block_id": "",
"signing_key": "",
"transaction_ids": []
},
"4020001": {
"previous": "",
"timestamp": "",
"witness": "",
"transaction_merkle_root": "",
"extensions": [],
"witness_signature": "",
"transactions": [
{
"ref_block_num": 100003,
"ref_block_prefix": 1,
"expiration": "2020-03-23T12:17:00",
"operations": [
{
"type": "transfer_operation",
"value": {
"from": "gtg",
"to": "null",
"amount": "0.02 HBD",
"memo": "@alice/firstpost______20"
}
},
{
"type": "comment_operation",
"value": {
"parent_author": "",
"parent_permlink": "",
"author": "alice",
"permlink": "firstpost______20",
"title": "firstpost______20",
"body": "XXXXXX",
"json_metadata": "{}"
}
},
{
"type": "transfer_operation",
"value": {
"from": "gtg",
"to": "null",
"amount": "0.02 HBD",
"memo": "@alice/firstpost______20"
}
}
]
}
],
"block_id": "",
"signing_key": "",
"transaction_ids": []
}
}
\ No newline at end of file
......@@ -32,6 +32,7 @@ EOF
mock_data/block_data/community_op/mock_block_data_community.json \
mock_data/block_data/reblog_op/mock_block_data_reblog.json \
mock_data/block_data/reblog_op/mock_block_data_reblog_delete.json \
mock_data/block_data/payments_op/mock_block_data_payments.json \
--community-start-block 4999998 \
2>&1 | tee -i hivemind-sync.log
......
Subproject commit c737db85c8fcb68426a232438c9228bd13b9847e
Subproject commit 88caa692f239eb4a35754b0168673a131c3c7ab2
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment