Skip to content
Snippets Groups Projects
Commit 4a133e81 authored by Marcin's avatar Marcin
Browse files

recursive query to update comments parents

parent d8dfa799
No related branches found
No related tags found
4 merge requests!456Release candidate v1 24,!230Setup monitoring with pghero,!135Enable postgres monitoring on CI server,!94Mi posts active computed and block num column hive posts
...@@ -13,6 +13,7 @@ from hive.db.schema import (setup, reset_autovac, build_metadata, ...@@ -13,6 +13,7 @@ from hive.db.schema import (setup, reset_autovac, build_metadata,
from hive.db.adapter import Db from hive.db.adapter import Db
from hive.utils.trends import update_all_hot_and_tranding from hive.utils.trends import update_all_hot_and_tranding
from hive.utils.post_active import update_all_posts_active
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -197,6 +198,12 @@ class DbState: ...@@ -197,6 +198,12 @@ class DbState:
time_end = perf_counter() time_end = perf_counter()
log.info("[INIT] update_all_hot_and_tranding executed in %fs", time_end - time_start) log.info("[INIT] update_all_hot_and_tranding executed in %fs", time_end - time_start)
time_start = perf_counter()
update_all_posts_active()
time_end = perf_counter()
log.info("[INIT] update_all_posts_active executed in %fs", time_end - time_start)
# TODO: #111 # TODO: #111
#for key in cls._all_foreign_keys(): #for key in cls._all_foreign_keys():
# log.info("Create fk %s", key.name) # log.info("Create fk %s", key.name)
......
...@@ -1228,6 +1228,24 @@ def setup(db): ...@@ -1228,6 +1228,24 @@ def setup(db):
""" """
db.query_no_return(sql) db.query_no_return(sql)
sql = """
DROP FUNCTION IF EXISTS public.max_time_stamp() CASCADE;
CREATE OR REPLACE FUNCTION public.max_time_stamp( _first TIMESTAMP, _second TIMESTAMP )
RETURNS TIMESTAMP
LANGUAGE 'plpgsql'
IMMUTABLE
AS $BODY$
BEGIN
IF _first > _second THEN
RETURN _first;
ELSE
RETURN _second;
END IF;
END
$BODY$;
"""
db.query_no_return(sql)
def reset_autovac(db): def reset_autovac(db):
"""Initializes/resets per-table autovacuum/autoanalyze params. """Initializes/resets per-table autovacuum/autoanalyze params.
......
...@@ -17,6 +17,7 @@ from time import perf_counter ...@@ -17,6 +17,7 @@ from time import perf_counter
from hive.utils.stats import OPStatusManager as OPSM from hive.utils.stats import OPStatusManager as OPSM
from hive.utils.stats import FlushStatusManager as FSM from hive.utils.stats import FlushStatusManager as FSM
from hive.utils.post_active import update_active_starting_from_posts_on_block
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -60,6 +61,8 @@ class Blocks: ...@@ -60,6 +61,8 @@ class Blocks:
Tags.flush() Tags.flush()
Votes.flush() Votes.flush()
Posts.flush() Posts.flush()
block_num = int(block['block_id'][:8], base=16)
cls.on_live_blocks_processed( block_num, block_num, is_initial_sync )
time_end = perf_counter() time_end = perf_counter()
log.info("[PROCESS BLOCK] %fs", time_end - time_start) log.info("[PROCESS BLOCK] %fs", time_end - time_start)
return ret return ret
...@@ -71,8 +74,11 @@ class Blocks: ...@@ -71,8 +74,11 @@ class Blocks:
DB.query("START TRANSACTION") DB.query("START TRANSACTION")
last_num = 0 last_num = 0
first_block = -1
try: try:
for block in blocks: for block in blocks:
if first_block == -1:
first_block = int(block['block_id'][:8], base=16)
last_num = cls._process(block, vops, hived, is_initial_sync) last_num = cls._process(block, vops, hived, is_initial_sync)
except Exception as e: except Exception as e:
log.error("exception encountered block %d", last_num + 1) log.error("exception encountered block %d", last_num + 1)
...@@ -97,6 +103,9 @@ class Blocks: ...@@ -97,6 +103,9 @@ class Blocks:
flush_time = register_time(flush_time, "Follow", folllow_items) flush_time = register_time(flush_time, "Follow", folllow_items)
flush_time = register_time(flush_time, "Posts", Posts.flush()) flush_time = register_time(flush_time, "Posts", Posts.flush())
if first_block > -1:
cls.on_live_blocks_processed( first_block, last_num, is_initial_sync )
DB.query("COMMIT") DB.query("COMMIT")
log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s") log.info(f"[PROCESS MULTI] {len(blocks)} blocks in {OPSM.stop(time_start) :.4f}s")
...@@ -382,3 +391,13 @@ class Blocks: ...@@ -382,3 +391,13 @@ class Blocks:
DB.query("COMMIT") DB.query("COMMIT")
log.warning("[FORK] recovery complete") log.warning("[FORK] recovery complete")
# TODO: manually re-process here the blocks which were just popped. # TODO: manually re-process here the blocks which were just popped.
@classmethod
def on_live_blocks_processed( cls, first_block, last_block, is_initial_sync ):
    """Hook invoked once a range of blocks has been processed and the
    information received from hived is already stored in the database.

    During initial sync this is a no-op; otherwise it propagates the
    'active' timestamps for posts touched by the given block range.
    """
    if not is_initial_sync:
        update_active_starting_from_posts_on_block( first_block, last_block )
from hive.db.adapter import Db
DB = Db.instance()
"""
There are three cases when 'active' field in post is updated:
1) when a descendant post comment was added (recursively at any depth)
2) when a descendant post comment was deleted (recursively at any depth)
3) when the post is updated
It means that when a comment on a post is updated, its 'active' field
does not propagate to its ancestors.
"""
update_active_sql = """
WITH RECURSIVE parent_posts ( parent_id, post_id, intrusive_active) AS (
SELECT
parent_id as parent_id,
id as post_id,
CASE WHEN hp1.active = hp1.created_at OR hp1.counter_deleted > 0 THEN hp1.active
ELSE hp1.created_at
END as intrusive_active
FROM hive_posts hp1 {}
UNION
SELECT
hp2.parent_id as parent_id,
id as post_id,
max_time_stamp(
CASE WHEN hp2.active = hp2.created_at OR hp2.counter_deleted > 0 THEN hp2.active
ELSE hp2.created_at
END
, pp.intrusive_active
) as intrusive_active
FROM parent_posts pp
JOIN hive_posts hp2 ON pp.parent_id = hp2.id
WHERE hp2.depth > 0 AND pp.intrusive_active > hp2.active
)
UPDATE
hive_posts
SET
active = new_active
FROM
(
SELECT hp.id as post_id, MAX(pp.intrusive_active) as new_active
FROM parent_posts pp
JOIN hive_posts hp ON pp.parent_id = hp.id GROUP BY hp.id
) as dataset
WHERE dataset.post_id = hive_posts.id;
"""
def update_all_posts_active():
    """Recompute the 'active' timestamp for all posts in the database.

    Seeds the recursive update with leaf comments (children = 0) and
    deleted posts (counter_deleted > 0), then propagates upward to
    their ancestors via ``update_active_sql``.
    """
    seed_filter = "WHERE ( children = 0 OR hp1.counter_deleted > 0 ) AND depth > 0"
    DB.query_no_return(update_active_sql.format(seed_filter))
def update_active_starting_from_posts_on_block( first_block_num, last_block_num ):
    """Recompute the 'active' timestamp for posts whose block_num falls in
    the inclusive range [first_block_num, last_block_num], propagating the
    result to their ancestors.

    :param first_block_num: int, first block of the processed range
    :param last_block_num: int, last block of the processed range
    """
    # Build the complete seed WHERE clause first, then substitute it into
    # the SQL template in a single step. The original chained
    # ``.format(...).format(...)`` depended on the first format's output
    # still containing "{}" placeholders — fragile if the template or the
    # condition ever gains literal braces. The emitted SQL is unchanged.
    if first_block_num == last_block_num:
        condition = "WHERE block_num={} AND depth > 0".format(first_block_num)
    else:
        condition = "WHERE block_num>={} AND block_num <={} AND depth > 0".format(first_block_num, last_block_num)
    DB.query_no_return(update_active_sql.format(condition))
Subproject commit bae23397a6538852645dc8f3c4f08eca06f18144 Subproject commit 385a6826f33d7818ba0e04bb2144bf5f275d1acf
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment