From 82c0eb71368dfb2ad0f2034fa9b7a3b13a58f3a8 Mon Sep 17 00:00:00 2001
From: Bartek Wrona <wrona@syncad.com>
Date: Wed, 4 Nov 2020 22:20:08 +0100
Subject: [PATCH] Implemented periodic fill of notification cache to
 significantly speed up notification API calls.

---
 hive/db/db_state.py                           | 13 +++++++-
 hive/db/schema.py                             | 21 +++++++++++-
 hive/db/sql_scripts/notifications_api.sql     | 32 +++++++++++++++++--
 hive/db/sql_scripts/notifications_view.sql    |  6 +++-
 .../upgrade/upgrade_runtime_migration.sql     | 17 ++++++++++
 .../upgrade/upgrade_table_schema.sql          | 27 ++++++++++++++++
 hive/indexer/blocks.py                        |  5 ++-
 7 files changed, 114 insertions(+), 7 deletions(-)

diff --git a/hive/db/db_state.py b/hive/db/db_state.py
index 2f23629e6..a20a36ea0 100644
--- a/hive/db/db_state.py
+++ b/hive/db/db_state.py
@@ -132,7 +132,10 @@ class DbState:
             'hive_votes_voter_id_post_id_idx',
             'hive_votes_post_id_voter_id_idx',
 
-            'hive_reputation_data_block_num_idx'
+            'hive_reputation_data_block_num_idx',
+
+            'hive_notification_cache_block_num_idx',
+            'hive_notification_cache_dst_score_idx'
         ]
 
         to_return = []
@@ -344,6 +347,14 @@ class DbState:
         time_end = perf_counter()
         log.info("[INIT] update_posts_rshares executed in %.4fs", time_end - time_start)
 
+        time_start = perf_counter()
+        sql = """
+              SELECT update_notification_cache(NULL, NULL, False);
+              """
+        DbState.db().query_no_return(sql)
+        time_end = perf_counter()
+        log.info("[INIT] update_notification_cache executed in %.4fs", time_end - time_start)
+
         # Update a block num immediately
         DbState.db().query_no_return("UPDATE hive_state SET block_num = :block_num", block_num = current_imported_block)
 
diff --git a/hive/db/schema.py b/hive/db/schema.py
index 40460ad3d..1b9a6973f 100644
--- a/hive/db/schema.py
+++ b/hive/db/schema.py
@@ -395,7 +395,7 @@ def build_metadata_community(metadata=None):
         sa.Column('dst_id',       sa.Integer,  nullable=True),
         sa.Column('post_id',      sa.Integer,  nullable=True),
         sa.Column('community_id', sa.Integer,  nullable=True),
-        sa.Column('block_num',    sa.Integer,  nullable=True),
+        sa.Column('block_num',    sa.Integer,  nullable=False),
         sa.Column('payload',      sa.Text,     nullable=True),
 
         sa.Index('hive_notifs_ix1', 'dst_id',                  'id', postgresql_where=sql_text("dst_id IS NOT NULL")),
@@ -406,6 +406,25 @@ def build_metadata_community(metadata=None):
         sa.Index('hive_notifs_ix6', 'dst_id', 'created_at', 'score', 'id', postgresql_where=sql_text("dst_id IS NOT NULL")), # unread
     )
 
+    sa.Table('hive_notification_cache', metadata,
+        sa.Column('id', sa.BigInteger, primary_key=True),
+        sa.Column('block_num', sa.Integer, nullable = False),
+        sa.Column('type_id', sa.Integer, nullable = False),
+        sa.Column('dst', sa.Integer, nullable=True), # dst account id except persistent notifs from hive_notifs
+        sa.Column('src', sa.Integer, nullable=True), # src account id
+        sa.Column('dst_post_id', sa.Integer, nullable=True), # destination post id
+        sa.Column('post_id', sa.Integer, nullable=True),
+        sa.Column('created_at', sa.DateTime, nullable=False), # notification creation time
+        sa.Column('score', sa.Integer, nullable=False),
+        sa.Column('community_title', sa.String(32), nullable=True),
+        sa.Column('community', sa.String(16), nullable=True),
+        sa.Column('payload', sa.String, nullable=True),
+
+        sa.Index('hive_notification_cache_block_num_idx', 'block_num'),
+        sa.Index('hive_notification_cache_dst_score_idx', 'dst', 'score', postgresql_where=sql_text("dst IS NOT NULL"))
+
+    )
+
     return metadata
 
 
diff --git a/hive/db/sql_scripts/notifications_api.sql b/hive/db/sql_scripts/notifications_api.sql
index 46090298d..8c0cf1cf5 100644
--- a/hive/db/sql_scripts/notifications_api.sql
+++ b/hive/db/sql_scripts/notifications_api.sql
@@ -44,7 +44,7 @@ BEGIN
   RETURN QUERY SELECT
     __last_read_at as lastread_at,
     count(1) as unread
-  FROM hive_raw_notifications_view hnv
+  FROM hive_notification_cache hnv
   WHERE hnv.dst = __account_id  AND hnv.block_num > __limit_block AND hnv.block_num > __last_read_at_block AND hnv.score >= _minimum_score
   ;
 END
@@ -82,7 +82,7 @@ BEGIN
   FROM
   (
     select nv.id, nv.type_id, nv.created_at, nv.src, nv.dst, nv.dst_post_id, nv.score, nv.community, nv.community_title, nv.payload
-      from hive_raw_notifications_view nv
+      from hive_notification_cache nv
   WHERE nv.dst = __account_id  AND nv.block_num > __limit_block AND nv.score >= _min_score AND ( _last_id = 0 OR nv.id < _last_id )
   ORDER BY nv.id DESC
   LIMIT _limit
@@ -123,7 +123,7 @@ BEGIN
   FROM
   (
     SELECT nv.id, nv.type_id, nv.created_at, nv.src, nv.dst, nv.dst_post_id, nv.score, nv.community, nv.community_title, nv.payload
-    FROM hive_raw_notifications_view nv
+    FROM hive_notification_cache nv
     WHERE nv.post_id = __post_id AND nv.block_num > __limit_block AND nv.score >= _min_score AND ( _last_id = 0 OR nv.id < _last_id )
     ORDER BY nv.id DESC
     LIMIT _limit
@@ -139,3 +139,29 @@ END
 $function$
 LANGUAGE plpgsql STABLE
 ;
+
+DROP FUNCTION IF EXISTS update_notification_cache;
+;
+CREATE OR REPLACE FUNCTION update_notification_cache(in _first_block_num INT, in _last_block_num INT, in _prune_old BOOLEAN)
+RETURNS VOID
+AS
+$function$
+DECLARE
+  __limit_block hive_blocks.num%TYPE = block_before_head( '90 days' );
+BEGIN
+  IF _first_block_num IS NULL THEN
+    TRUNCATE TABLE hive_notification_cache;
+  ELSE
+    DELETE FROM hive_notification_cache nc WHERE _prune_old AND nc.block_num <= __limit_block;
+  END IF;
+
+  INSERT INTO hive_notification_cache
+  (id, block_num, type_id, created_at, src, dst, dst_post_id, post_id, score, payload, community, community_title)
+  SELECT nv.id, nv.block_num, nv.type_id, nv.created_at, nv.src, nv.dst, nv.dst_post_id, nv.post_id, nv.score, nv.payload, nv.community, nv.community_title
+  FROM hive_raw_notifications_view nv
+  WHERE nv.block_num > __limit_block AND (_first_block_num IS NULL OR nv.block_num BETWEEN _first_block_num AND _last_block_num)
+  ;
+END
+$function$
+LANGUAGE plpgsql VOLATILE
+;
diff --git a/hive/db/sql_scripts/notifications_view.sql b/hive/db/sql_scripts/notifications_view.sql
index 38ba082d8..ba797e9ab 100644
--- a/hive/db/sql_scripts/notifications_view.sql
+++ b/hive/db/sql_scripts/notifications_view.sql
@@ -70,7 +70,11 @@ RETURNS FLOAT
 LANGUAGE 'sql'
 IMMUTABLE
 AS $BODY$
-    SELECT CAST( ( _post_payout/_post_rshares ) * _vote_rshares as FLOAT);
+    SELECT CASE _post_rshares != 0
+              WHEN TRUE THEN CAST( ( _post_payout/_post_rshares ) * _vote_rshares as FLOAT)
+           ELSE
+              CAST(0 AS FLOAT)
+           END
 $BODY$;
 
 
diff --git a/hive/db/sql_scripts/upgrade/upgrade_runtime_migration.sql b/hive/db/sql_scripts/upgrade/upgrade_runtime_migration.sql
index c7fb77d69..7e02aa97d 100644
--- a/hive/db/sql_scripts/upgrade/upgrade_runtime_migration.sql
+++ b/hive/db/sql_scripts/upgrade/upgrade_runtime_migration.sql
@@ -108,6 +108,23 @@ END
 $BODY$;
 COMMIT;
 
+START TRANSACTION;
+DO
+$BODY$
+BEGIN
+IF EXISTS (SELECT * FROM hive_db_data_migration WHERE migration = 'Notification cache initial fill') THEN
+  RAISE NOTICE 'Performing notification cache initial fill...';
+  SET work_mem='2GB';
+  PERFORM update_notification_cache(NULL, NULL, False);
+  DELETE FROM hive_db_data_migration WHERE migration = 'Notification cache initial fill';
+ELSE
+  RAISE NOTICE 'Skipping notification cache initial fill...';
+END IF;
+
+END
+$BODY$;
+COMMIT;
+
 START TRANSACTION;
 
 TRUNCATE TABLE hive_db_data_migration;
diff --git a/hive/db/sql_scripts/upgrade/upgrade_table_schema.sql b/hive/db/sql_scripts/upgrade/upgrade_table_schema.sql
index 23e37c22a..03abe3a36 100644
--- a/hive/db/sql_scripts/upgrade/upgrade_table_schema.sql
+++ b/hive/db/sql_scripts/upgrade/upgrade_table_schema.sql
@@ -295,3 +295,30 @@ CREATE INDEX IF NOT EXISTS hive_posts_author_id_created_at_idx ON public.hive_po
 
 CREATE INDEX IF NOT EXISTS hive_blocks_created_at_idx ON hive_blocks (created_at);
 
+INSERT INTO hive_db_data_migration
+SELECT 'Notification cache initial fill'
+WHERE NOT EXISTS (SELECT data_type
+              FROM information_schema.columns
+              WHERE table_name = 'hive_notification_cache');
+
+--- Notification cache to significantly speed up notification APIs.
+CREATE TABLE IF NOT EXISTS hive_notification_cache
+(
+  id BIGINT NOT NULL,
+  block_num INT NOT NULL,
+  type_id INT NOT NULL,
+  dst INT NULL,
+  src INT NULL,
+  dst_post_id INT NULL,
+  post_id INT NULL,
+  score INT NOT NULL,
+  created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  community_title VARCHAR(32) NULL,
+  community VARCHAR(16) NULL,
+  payload VARCHAR NULL,
+
+  CONSTRAINT hive_notification_cache_pk PRIMARY KEY (id)
+);
+
+CREATE INDEX IF NOT EXISTS hive_notification_cache_block_num_idx ON hive_notification_cache (block_num);
+CREATE INDEX IF NOT EXISTS hive_notification_cache_dst_score_idx ON hive_notification_cache (dst, score) WHERE dst IS NOT NULL;
diff --git a/hive/indexer/blocks.py b/hive/indexer/blocks.py
index 0e2604195..7a94abcac 100644
--- a/hive/indexer/blocks.py
+++ b/hive/indexer/blocks.py
@@ -432,13 +432,16 @@ class Blocks:
         """
         update_active_starting_from_posts_on_block( first_block, last_block )
 
+        is_hour_action = last_block % 1200 == 0
+
         queries = [
             "SELECT update_posts_rshares({}, {})".format(first_block, last_block),
             "SELECT update_hive_posts_children_count({}, {})".format(first_block, last_block),
             "SELECT update_hive_posts_root_id({},{})".format(first_block, last_block),
             "SELECT update_hive_posts_api_helper({},{})".format(first_block, last_block),
             "SELECT update_feed_cache({}, {})".format(first_block, last_block),
-            "SELECT update_hive_posts_mentions({}, {})".format(first_block, last_block)
+            "SELECT update_hive_posts_mentions({}, {})".format(first_block, last_block),
+            "SELECT update_notification_cache({}, {}, {})".format(first_block, last_block, is_hour_action)
             #,"SELECT update_account_reputations({}, {})".format(first_block, last_block)
         ]
 
-- 
GitLab