From 6f924f0b1955d91b6a033b60d8dd44f53cf1822c Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Thu, 12 Jun 2025 22:12:26 +0000 Subject: [PATCH 01/15] Fix building HAF in CI --- .gitlab-ci.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 1de1ff7..3883346 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -84,11 +84,10 @@ prepare_haf_image: extends: .prepare_haf_image variables: SUBMODULE_DIR: "$CI_PROJECT_DIR/submodules/haf" - REGISTRY_USER: "$CI_REGISTRY_USER" - REGISTRY_PASS: "$CI_REGISTRY_PASSWORD" + REGISTRY_USER: "$HAF_IMG_BUILDER_USER" + REGISTRY_PASS: "$HAF_IMG_BUILDER_PASSWORD" before_script: - git config --global --add safe.directory $CI_PROJECT_DIR/submodules/haf - - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY tags: - public-runner-docker - hived-for-tests @@ -204,4 +203,4 @@ sync: expire_in: 1 week when: always tags: - - data-cache-storage \ No newline at end of file + - data-cache-storage -- GitLab From 4fea7019549ba7607cb444f50db7e8d5ec0e4a7f Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Sun, 15 Jun 2025 21:50:20 +0000 Subject: [PATCH 02/15] Update embeddings when posts are modified --- db/database_schema.sql | 3 +- db/main_loop.sql | 168 ++++++++++++++++++++++------------------- 2 files changed, 94 insertions(+), 77 deletions(-) diff --git a/db/database_schema.sql b/db/database_schema.sql index bdc8a55..93d3aac 100644 --- a/db/database_schema.sql +++ b/db/database_schema.sql @@ -83,7 +83,8 @@ BEGIN CREATE TABLE IF NOT EXISTS hivesense_app.post_data ( post_id INT PRIMARY KEY, - number_of_tokens INT NOT NULL + number_of_tokens INT NOT NULL, + last_vectors_block INT NOT NULL DEFAULT -1 ); -- the current version of sqlfluff doesn't understand 'GRANT MAINTAIN' diff --git a/db/main_loop.sql b/db/main_loop.sql index b17b0b4..6c2beed 100644 --- a/db/main_loop.sql +++ b/db/main_loop.sql @@ -37,30 +37,35 @@ $BODY$; CREATE OR REPLACE FUNCTION 
hivesense_block_range_data( _first_block_num INT, - _last_block_num INT, - _logs BOOLEAN, - _worker INT + _last_block_num INT, + _logs BOOLEAN, + _worker INT ) -RETURNS INT -- NULL need to wait for hivemind, otherwise number of vectorized posts +RETURNS INT LANGUAGE plpgsql VOLATILE PARALLEL SAFE AS $$ DECLARE __hivemind_current_block INT; - __start_ts timestamptz; - __end_ts timestamptz; - __number_of_posts INT; - __number_of_chunks INT; - - __number_of_workers INT; - __tokenizer_name TEXT; - __max_tokens INT; - __min_new_ratio REAL; - __lang_model TEXT; - __doc_prefix TEXT; - __min_token_threshold INT; + __start_ts timestamptz; + __end_ts timestamptz; + __number_of_posts INT := 0; -- counter for this run + __number_of_chunks INT := 0; -- counter for this run + __c INT; -- temp for ROW_COUNT + + __number_of_workers INT; + __tokenizer_name TEXT; + __max_tokens INT; + __min_new_ratio REAL; + __lang_model TEXT; + __doc_prefix TEXT; + __min_token_threshold INT; __max_embeddings_per_post INT; + + -- for the FOR loop + rec RECORD; + __prev_block INT; BEGIN ASSERT _first_block_num <= _last_block_num, 'Invalid range of blocks'; @@ -116,20 +121,18 @@ BEGIN CREATE TEMP TABLE tmp_posts_to_vectorize ( post_id INT PRIMARY KEY, bodies TEXT[], - token_count INT + token_count INT, + block_num INT ) ON COMMIT DROP; - -- Fill tmp_posts_to_vectorize exactly once for each post - INSERT INTO tmp_posts_to_vectorize(post_id, bodies, token_count) + INSERT INTO tmp_posts_to_vectorize(post_id, bodies, token_count, block_num) SELECT hp.id, pp.chunks, - pp.token_count + pp.token_count, + hp.block_num FROM hivemind_app.hive_posts AS hp - JOIN hivemind_app.hive_post_data AS hpd - ON hpd.id = hp.id - - -- Single CROSS JOIN LATERAL that calls preprocess_post exactly once + JOIN hivemind_app.hive_post_data AS hpd ON hpd.id = hp.id CROSS JOIN LATERAL ( SELECT * FROM preprocess_post( @@ -146,62 +149,75 @@ BEGIN __min_token_threshold ) ) AS pp(chunks, token_count) - WHERE (hp.root_id = hp.id OR 
hp.root_id = 0) - AND hp.block_num_created BETWEEN _first_block_num AND _last_block_num - AND pp.chunks IS NOT NULL; - - - -- record the token counts for posterity - INSERT INTO hivesense_app.post_data(post_id, number_of_tokens) - SELECT post_id, token_count - FROM tmp_posts_to_vectorize - ON CONFLICT (post_id) DO NOTHING; - - -- Explode bodies[] into chunks - WITH post_chunks AS ( - SELECT - post_id, - bodies[idx] AS chunk_text, - (idx - 1) AS chunk_number - FROM tmp_posts_to_vectorize - CROSS JOIN LATERAL generate_subscripts(bodies, 1) AS idx - ), id_and_body_agg AS ( - SELECT - ARRAY_AGG( (pc.post_id, pc.chunk_text, pc.chunk_number) - ::hivesense_app.id_and_post_chunk ) AS id_and_body - FROM post_chunks pc - ), embeddings AS ( - SELECT - (pv).post_id, - (pv).chunk_number, - (pv).vec - FROM ( - SELECT UNNEST(hivesense_app.hivesense_embed( ibagg.id_and_body) - ) AS pv - FROM id_and_body_agg ibagg - ) AS subquery - ), insert_into_posts_vectors AS ( - INSERT INTO hivesense_app.posts_vectors(post_id, chunk_number, embedding) - SELECT e.post_id, - e.chunk_number, - CASE - WHEN hivesense_app.store_halfvec_embeddings() - THEN e.vec::public.halfvec - ELSE e.vec - END - FROM embeddings e - ) - SELECT - (SELECT CARDINALITY(id_and_body) FROM id_and_body_agg), - (SELECT COUNT(*) FROM embeddings) - INTO __number_of_posts, __number_of_chunks; + AND ( + hp.block_num_created BETWEEN _first_block_num AND _last_block_num + OR hp.block_num BETWEEN _first_block_num AND _last_block_num + ); + + -- ◉◉◉ PER-POST LOOP WITH SERIALIZATION ◉◉◉ + FOR rec IN + SELECT post_id, bodies, token_count, block_num + FROM tmp_posts_to_vectorize + LOOP + -- ensure there's a metadata row to lock + INSERT INTO hivesense_app.post_data(post_id, number_of_tokens, last_vectors_block) + VALUES (rec.post_id, + COALESCE(rec.token_count, 0), + -1) + ON CONFLICT (post_id) DO NOTHING; + + -- serialize on this post_id + SELECT last_vectors_block + INTO __prev_block + FROM hivesense_app.post_data + WHERE post_id = 
rec.post_id + FOR UPDATE; + + IF rec.block_num > COALESCE(__prev_block, -1) THEN + __number_of_posts := __number_of_posts + 1; + + -- delete any prior vectors + DELETE FROM hivesense_app.posts_vectors + WHERE post_id = rec.post_id; + + -- generate & insert new embeddings + INSERT INTO hivesense_app.posts_vectors(post_id, chunk_number, embedding) + SELECT + (pv).post_id, + (pv).chunk_number, + CASE + WHEN hivesense_app.store_halfvec_embeddings() + THEN (pv).vec::public.halfvec + ELSE (pv).vec + END + FROM ( + SELECT UNNEST( + hivesense_app.hivesense_embed( + ARRAY( + SELECT (rec.post_id, rec.bodies[idx], idx-1) + ::hivesense_app.id_and_post_chunk + FROM generate_subscripts(rec.bodies,1) AS idx + ) + ) + ) AS pv + ) AS sub; + + -- count how many chunks we just wrote + GET DIAGNOSTICS __c = ROW_COUNT; + __number_of_chunks := __number_of_chunks + __c; + + -- bump metadata to block_num + UPDATE hivesense_app.post_data + SET number_of_tokens = COALESCE(rec.token_count, 0), + last_vectors_block = rec.block_num + WHERE post_id = rec.post_id; + END IF; + END LOOP; - -- clean up DROP TABLE tmp_posts_to_vectorize; - --RAISE NOTICE 'End of hivesense_block_range_data, % posts, % chunks', __number_of_posts, __number_of_chunks; - RETURN coalesce(__number_of_posts, 0); + RETURN COALESCE(__number_of_posts, 0); END; $$; -- GitLab From a867457f426dac9d8b888ef3678e7eeb7985e36f Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 16 Jun 2025 18:54:36 +0000 Subject: [PATCH 03/15] Add code for syncing embeddings from other machines --- Dockerfile.syncer | 25 ++++++++ db/database_schema.sql | 26 +++++++- db/main_loop.sql | 23 +++++-- scripts/build_images.sh | 1 + scripts/sync_embeddings.py | 119 +++++++++++++++++++++++++++++++++++++ 5 files changed, 187 insertions(+), 7 deletions(-) create mode 100644 Dockerfile.syncer create mode 100755 scripts/sync_embeddings.py diff --git a/Dockerfile.syncer b/Dockerfile.syncer new file mode 100644 index 0000000..e9d2597 --- /dev/null +++ 
b/Dockerfile.syncer @@ -0,0 +1,25 @@ +# Stage 1 – base image with only needed packages +FROM python:3.11-slim + +# Set environment variables to reduce image size and avoid Python warnings +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + POETRY_VIRTUALENVS_CREATE=false + +# Install system packages required for psycopg2 (libpq) +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Create working directory and copy script +WORKDIR /app +COPY scripts/sync_embeddings.py . + +# Install only necessary Python packages +RUN pip install --no-cache-dir \ + psycopg2-binary \ + requests + +# Default command +CMD ["python", "sync_embeddings.py"] diff --git a/db/database_schema.sql b/db/database_schema.sql index 93d3aac..78a6348 100644 --- a/db/database_schema.sql +++ b/db/database_schema.sql @@ -54,6 +54,9 @@ BEGIN runtime_hash TEXT ); + -- Monotonic sequence that orders every logical operation + CREATE SEQUENCE IF NOT EXISTS hivesense_app.sync_seq; + -- extend to public, to find vector from pgvector EXECUTE format( 'SET SEARCH_PATH TO %s, public', __schema_name ); IF __store_halfvec_embeddings THEN @@ -63,7 +66,8 @@ BEGIN post_id INT NOT NULL, chunk_number INT NOT NULL, embedding public.halfvec(%s) NOT NULL, - PRIMARY KEY (post_id, chunk_number) + sync_seq INT, -- drawn from hivesense_app.sync_seq + PRIMARY KEY (sync_seq, post_id, chunk_number) ); $$, __vector_size); ELSE @@ -73,7 +77,8 @@ BEGIN post_id INT NOT NULL, chunk_number INT NOT NULL, embedding vector(%s) NOT NULL, - PRIMARY KEY (post_id, chunk_number) + sync_seq INT, -- drawn from hivesense_app.sync_seq + PRIMARY KEY (sync_seq, post_id, chunk_number) ); $$, __vector_size); END IF; @@ -159,4 +164,21 @@ CREATE TABLE IF NOT EXISTS hivesense_app.block_tasks ( ); CREATE INDEX IF NOT EXISTS block_tasks_pending_idx ON hivesense_app.block_tasks (status, shard); +-- Helpful index for “give me everything > after_seq” +CREATE INDEX IF NOT EXISTS 
posts_vectors_sync_seq_idx + ON hivesense_app.posts_vectors(sync_seq); + +-- 2️⃣ Table that records logical deletions +CREATE TABLE IF NOT EXISTS hivesense_app.deleted_embeddings ( + post_id INT NOT NULL, + sync_seq INT NOT NULL, + PRIMARY KEY (sync_seq, post_id) +); + +CREATE INDEX IF NOT EXISTS deleted_embeddings_sync_seq_idx + ON hivesense_app.deleted_embeddings(sync_seq); + +RESET ROLE; + + RESET ROLE; diff --git a/db/main_loop.sql b/db/main_loop.sql index 6c2beed..b9ab439 100644 --- a/db/main_loop.sql +++ b/db/main_loop.sql @@ -66,6 +66,7 @@ DECLARE -- for the FOR loop rec RECORD; __prev_block INT; + __sync_seq INT; -- seq that orders insert / delete ops BEGIN ASSERT _first_block_num <= _last_block_num, 'Invalid range of blocks'; @@ -175,14 +176,25 @@ BEGIN FOR UPDATE; IF rec.block_num > COALESCE(__prev_block, -1) THEN + -- Reserve a sequence value that will identify this logical operation + SELECT nextval('hivesense_app.sync_seq') INTO __sync_seq; + __number_of_posts := __number_of_posts + 1; - -- delete any prior vectors - DELETE FROM hivesense_app.posts_vectors - WHERE post_id = rec.post_id; + -- Was the post previously embedded? 
If so, log a delete operation + IF EXISTS ( + SELECT 1 FROM hivesense_app.posts_vectors + WHERE post_id = rec.post_id + ) THEN + DELETE FROM hivesense_app.posts_vectors + WHERE post_id = rec.post_id; + + INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq) + VALUES (rec.post_id, __sync_seq); + END IF; -- generate & insert new embeddings - INSERT INTO hivesense_app.posts_vectors(post_id, chunk_number, embedding) + INSERT INTO hivesense_app.posts_vectors(post_id, chunk_number, embedding, sync_seq) SELECT (pv).post_id, (pv).chunk_number, @@ -190,7 +202,8 @@ BEGIN WHEN hivesense_app.store_halfvec_embeddings() THEN (pv).vec::public.halfvec ELSE (pv).vec - END + END, + __sync_seq FROM ( SELECT UNNEST( hivesense_app.hivesense_embed( diff --git a/scripts/build_images.sh b/scripts/build_images.sh index d501a5c..45e752f 100755 --- a/scripts/build_images.sh +++ b/scripts/build_images.sh @@ -52,6 +52,7 @@ SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" docker build -t "registry.gitlab.syncad.com/hive/hivesense:${TAG}" "${SCRIPTPATH}/.." docker build -t "registry.gitlab.syncad.com/hive/hivesense/rewiter:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.rewriter" "${SCRIPTPATH}/.." +docker build -t "registry.gitlab.syncad.com/hive/hivesense/syncer:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.syncer" "${SCRIPTPATH}/.." echo "Build images tag ${TAG}" diff --git a/scripts/sync_embeddings.py b/scripts/sync_embeddings.py new file mode 100755 index 0000000..64eb34f --- /dev/null +++ b/scripts/sync_embeddings.py @@ -0,0 +1,119 @@ +#! /usr/bin/env python3 +""" +sync_embeddings.py – pull /embedding-updates and mirror them locally +""" + +import os, time, json, logging, requests, psycopg2 +from psycopg2.extras import execute_values + +API_URL = os.environ["HS_EMBED_API"] # e.g. 
https://upstream/embedding-updates +DB_DSN = os.environ["LOCAL_PG_DSN"] # postgres://… (same schema names) + +BATCH = 1000 +RETRY_SLEEP = 3 + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s" +) + + +def get_last_seq(cur) -> int: + cur.execute(""" + SELECT COALESCE(MAX(sync_seq), 0) FROM ( + SELECT MAX(sync_seq) AS sync_seq FROM hivesense_app.posts_vectors + UNION ALL + SELECT MAX(sync_seq) FROM hivesense_app.deleted_embeddings + ) AS t; + """) + return cur.fetchone()[0] or 0 + + +POST_LOOKUP_SQL = """ +SELECT hp.id + FROM hivemind_app.hive_posts hp + JOIN hivemind_app.hive_accounts ha ON ha.id = hp.author_id + JOIN hivemind_app.hive_permlink_data pd ON pd.id = hp.permlink_id + WHERE ha.name = %s + AND pd.permlink = %s + AND hp.counter_deleted = 0 + LIMIT 1; +""" + + +def resolve_post_id(cur, author, permlink): + cur.execute(POST_LOOKUP_SQL, (author, permlink)) + row = cur.fetchone() + return row[0] if row else None + + +def upsert_vectors(cur, post_id, sync_seq, rows): + payload = [ + (sync_seq, post_id, r["chunk_number"], r["embedding"]) + for r in rows + ] + execute_values( + cur, + """INSERT INTO hivesense_app.posts_vectors + (sync_seq, post_id, chunk_number, embedding) + VALUES %s""", + payload + ) + + +def apply_op(conn, op): + """Apply a single operation; block-retry until the post_id is resolvable.""" + while True: + with conn.cursor() as cur: + post_id = resolve_post_id(cur, op["author"], op["permlink"]) + if post_id is None: + logging.info( + "Post %s/%s not yet present locally – retry in %ds", + op["author"], op["permlink"], RETRY_SLEEP + ) + conn.rollback() + time.sleep(RETRY_SLEEP) + continue # try again + # ---- we have a post_id ---- + if op["op"] == "delete": + cur.execute( + "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s", + (post_id,) + ) + cur.execute( + """INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq) + VALUES (%s, %s) + ON CONFLICT (sync_seq, post_id) DO NOTHING""", + 
(post_id, op["sync_seq"]) + ) + else: # insert | update + cur.execute( + "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s", + (post_id,) + ) + upsert_vectors(cur, post_id, op["sync_seq"], op["embedding_rows"]) + return # success – caller will COMMIT + + +def main(): + conn = psycopg2.connect(DB_DSN, autocommit=False) + + while True: + with conn.cursor() as cur: + after_seq = get_last_seq(cur) + params = {"after_seq": after_seq, "limit": BATCH} + r = requests.get(API_URL, params=params, timeout=60) + r.raise_for_status() + ops = r.json() + if not ops: + time.sleep(5) + continue + + for op in ops: + logging.info("Applying %s %s/%s (seq %s)", + op["op"], op["author"], op["permlink"], op["sync_seq"]) + apply_op(conn, op) + conn.commit() # durable after every logical op + +if __name__ == "__main__": + main() -- GitLab From 877ca7922a232be84085831c7c4f890225471453 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 16 Jun 2025 20:52:23 +0000 Subject: [PATCH 04/15] Fix tests that expect old-style threading --- scripts/ci-helpers/wait-for-hivesense-startup.sh | 2 +- tests/integration/api_node/hivesense_synced_api_node_test.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/ci-helpers/wait-for-hivesense-startup.sh b/scripts/ci-helpers/wait-for-hivesense-startup.sh index 05677b1..10282d4 100755 --- a/scripts/ci-helpers/wait-for-hivesense-startup.sh +++ b/scripts/ci-helpers/wait-for-hivesense-startup.sh @@ -7,7 +7,7 @@ wait_for_hivesense_startup() { MESSAGE="Waiting for Hivesense to finish processing blocks..." 
HIVEMIND_BLOCK_COMMAND="SELECT last_completed_block_num FROM hivemind_app.hive_state" HAF_BLOCK_COMMAND="SELECT consistent_block FROM hafd.hive_state" - HIVESENSE_BLOCK_COMMAND="SELECT hive.app_get_current_block_num('hivesense_app1')" + HIVESENSE_BLOCK_COMMAND="SELECT hive.app_get_current_block_num('hivesense_app')" i=0 while : diff --git a/tests/integration/api_node/hivesense_synced_api_node_test.sh b/tests/integration/api_node/hivesense_synced_api_node_test.sh index 31c744a..495e2d6 100755 --- a/tests/integration/api_node/hivesense_synced_api_node_test.sh +++ b/tests/integration/api_node/hivesense_synced_api_node_test.sh @@ -9,7 +9,7 @@ query_database() { } # 1. check if hivesense is synced -hivesense_block_num=$(query_database "SELECT current_block_num FROM hafd.contexts WHERE name = 'hivesense_app1'") +hivesense_block_num=$(query_database "SELECT current_block_num FROM hafd.contexts WHERE name = 'hivesense_app'") if [ "$hivesense_block_num" -ne 1000000 ]; then echo "Current block num ${hivesense_block_num} != 1000000" >&2 exit 1 -- GitLab From 0cb28d366c620aed3c805d2356ce66a7a36782b4 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Sun, 22 Jun 2025 20:14:13 +0000 Subject: [PATCH 05/15] Crank up quality parameters on hnsw index, add request headers to allow the caller to experiment with different search parameters when doing a/b testing --- db/helpers.sql | 6 ++++-- db/search.sql | 34 +++++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/db/helpers.sql b/db/helpers.sql index 6411cf9..cb4fde2 100644 --- a/db/helpers.sql +++ b/db/helpers.sql @@ -107,7 +107,8 @@ BEGIN RAISE NOTICE 'Creating half-precision HNSW index (%s-d)…', dim; CREATE INDEX IF NOT EXISTS posts_vectors_embedding_half_hnsw ON hivesense_app.posts_vectors - USING hnsw (embedding public.halfvec_cosine_ops); + USING hnsw (embedding public.halfvec_cosine_ops) + WITH (m = 32, ef_construction = 400); ELSIF half THEN RAISE NOTICE 'Creating half-precision HNSW 
index (%s-d)…', dim; EXECUTE format( @@ -118,7 +119,8 @@ BEGIN RAISE NOTICE 'Creating full-precision HNSW index (%s-d)…', dim; CREATE INDEX IF NOT EXISTS posts_vectors_embedding_hnsw ON hivesense_app.posts_vectors - USING hnsw (embedding public.vector_cosine_ops); + USING hnsw (embedding public.vector_cosine_ops) + WITH (m = 32, ef_construction = 400); END IF; END; $$; diff --git a/db/search.sql b/db/search.sql index 57bd1fa..0ef181f 100644 --- a/db/search.sql +++ b/db/search.sql @@ -31,13 +31,37 @@ DECLARE max_posts int := LEAST(_limit, 1000); -- if _start_post_id=0 we collect immediately; otherwise skip until we see it collecting bool := (_start_post_id = 0); - batch_size int := GREATEST(_limit * 5, 50); -- start with 5× - sql text; + + -- pull headers and defaults + req_headers json; + batch_multiplier int := 5; -- default multiplier + exploratory_factor int := 1000; -- default ef_search + + -- batch size will be set in BEGIN + batch_size int; + sql text; __min_search_tokens int; BEGIN + -- grab the incoming headers JSON (if any) + SELECT current_setting('request.headers', true)::json + INTO req_headers; + + -- override defaults if headers are present + batch_multiplier := COALESCE( + (req_headers->>'x-batch-size-multiplier')::int, + batch_multiplier + ); + exploratory_factor := COALESCE( + (req_headers->>'x-exploratory-factor')::int, + exploratory_factor + ); + + -- initialize batch_size using the (possibly overridden) multiplier + batch_size := GREATEST(_limit * batch_multiplier, 50); + -- tune pgvector index parameters - PERFORM set_config('ivfflat.probes', '4', true); - PERFORM set_config('hnsw.ef_search', '1000', true); + PERFORM set_config('ivfflat.probes', '4', true); + PERFORM set_config('hnsw.ef_search', exploratory_factor::text, true); SELECT min_token_search_threshold INTO __min_search_tokens @@ -47,7 +71,7 @@ BEGIN RAISE NOTICE 'In find_nearest_posts_with_embedding(vec, %, %, %, %)', _limit, _exclude_post_id, _observer_id, _start_post_id; LOOP - 
RAISE NOTICE 'Getting % posts', batch_size; + RAISE NOTICE 'Getting % posts (batch_size: %)', batch_size, batch_size; sql := format($q$ SELECT hpv.post_id, %s AS similarity FROM hivesense_app.posts_vectors hpv -- GitLab From 8dc2eccb28e969c2ae13ea7e6ec11c7e97156496 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Sun, 22 Jun 2025 22:52:53 +0000 Subject: [PATCH 06/15] Bump haf and hivemind submodules to latest develop --- submodules/haf | 2 +- submodules/hivemind | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/submodules/haf b/submodules/haf index 61e4e13..9ef8b5d 160000 --- a/submodules/haf +++ b/submodules/haf @@ -1 +1 @@ -Subproject commit 61e4e13a348ea3deac6ac118bfc7b99e23fec0be +Subproject commit 9ef8b5df2abe939aa0e4b219b903b2cd4fcc0bb5 diff --git a/submodules/hivemind b/submodules/hivemind index 1242d37..e378766 160000 --- a/submodules/hivemind +++ b/submodules/hivemind @@ -1 +1 @@ -Subproject commit 1242d377bf0bc6b35ede2567e7a0340f2efdd178 +Subproject commit e37876656e274b5657b682a2c57978603340a4e6 -- GitLab From f6acaf4dac69979f0ede2e1d3c315c82c87caddc Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 23 Jun 2025 14:23:57 +0000 Subject: [PATCH 07/15] Use local copy of python docker image --- Dockerfile.syncer | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.syncer b/Dockerfile.syncer index e9d2597..4ece200 100644 --- a/Dockerfile.syncer +++ b/Dockerfile.syncer @@ -1,5 +1,5 @@ # Stage 1 – base image with only needed packages -FROM python:3.11-slim +FROM registry.gitlab.syncad.com/hive/hivesense/python:3.13-slim # Set environment variables to reduce image size and avoid Python warnings ENV PYTHONDONTWRITEBYTECODE=1 \ -- GitLab From c9a787fa7f2832d53fc2ead36f3dc49b80603c63 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 23 Jun 2025 15:39:22 +0000 Subject: [PATCH 08/15] Prevent deadlock by inserting the post metadata we serialize on from the scheduler instead of the workers --- db/main_loop.sql | 50 
++++++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/db/main_loop.sql b/db/main_loop.sql index b9ab439..ba77517 100644 --- a/db/main_loop.sql +++ b/db/main_loop.sql @@ -47,7 +47,6 @@ VOLATILE PARALLEL SAFE AS $$ DECLARE - __hivemind_current_block INT; __start_ts timestamptz; __end_ts timestamptz; __number_of_posts INT := 0; -- counter for this run @@ -70,8 +69,6 @@ DECLARE BEGIN ASSERT _first_block_num <= _last_block_num, 'Invalid range of blocks'; - -- will RAISE when hivemind context does not exist - SELECT hive.app_get_current_block_num('hivemind_app') INTO __hivemind_current_block; SELECT parallel_workers, tokenizer_model, tokens_per_chunk, @@ -94,15 +91,6 @@ BEGIN ASSERT __number_of_workers IS NOT NULL, 'NULL number of workers'; ASSERT __number_of_workers > 0 , 'number of workers less than 1'; - -- hivemind exists - IF __hivemind_current_block < _first_block_num THEN - RETURN NULL; - END IF; - - IF __hivemind_current_block < _last_block_num THEN - RETURN NULL; - END IF; - -- TODO(mickiewicz@syncad.com) when hivemind is not in a live stage then do not process -- maybe it is not required because last_completed in enough ? 
-- but last completed does not guaranteen index on block_num_created, but maybe this is an edge case @@ -160,14 +148,8 @@ BEGIN FOR rec IN SELECT post_id, bodies, token_count, block_num FROM tmp_posts_to_vectorize + ORDER BY post_id LOOP - -- ensure there's a metadata row to lock - INSERT INTO hivesense_app.post_data(post_id, number_of_tokens, last_vectors_block) - VALUES (rec.post_id, - COALESCE(rec.token_count, 0), - -1) - ON CONFLICT (post_id) DO NOTHING; - -- serialize on this post_id SELECT last_vectors_block INTO __prev_block @@ -349,6 +331,7 @@ CREATE OR REPLACE PROCEDURE hivesense_app.scheduler( LANGUAGE plpgsql AS $$ DECLARE + __hivemind_current_block INT; __context_name hive.context_name := _app_context_base_name; -- single context __start_block INT := 0; __blocks_range hive.blocks_range := (0,0); @@ -455,6 +438,17 @@ BEGIN CONTINUE; END IF; + -- ── NEW: wait until hivemind has processed through this range ─────────────── + LOOP + SELECT hive.app_get_current_block_num('hivemind_app') + INTO __hivemind_current_block; + EXIT WHEN __hivemind_current_block >= __blocks_range.last_block; + RAISE NOTICE 'Waiting for hivemind to reach block % (currently at %)', + __blocks_range.last_block, + __hivemind_current_block; + PERFORM pg_sleep(1); + END LOOP; + ------------------------------------------------------------------- -- Split the range into contiguous slices and enqueue one per shard ------------------------------------------------------------------- @@ -499,6 +493,24 @@ BEGIN __from_block := __to_block + 1; END LOOP; + -------------------------------------------------------------------------------- + -- bulk-upsert post_data for every post in this batch + -------------------------------------------------------------------------------- + WITH posts_to_seed AS ( + SELECT DISTINCT hp.id AS post_id + FROM hivemind_app.hive_posts hp + JOIN hivemind_app.hive_post_data hpd ON hpd.id = hp.id + WHERE (hp.root_id = hp.id OR hp.root_id = 0) + AND ( + 
hp.block_num_created BETWEEN __blocks_range.first_block AND __blocks_range.last_block + OR hp.block_num BETWEEN __blocks_range.first_block AND __blocks_range.last_block + ) + ) + INSERT INTO hivesense_app.post_data(post_id, number_of_tokens, last_vectors_block) + SELECT post_id, 0, -1 + FROM posts_to_seed + ON CONFLICT (post_id) DO NOTHING; + COMMIT; -- we have to commit here so the workers can pick up the tasks -------------------------------------------------------------------------------- -- GitLab From f0571b354928c7b756c7e0abadf64135938d7c7d Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 23 Jun 2025 18:39:10 +0000 Subject: [PATCH 09/15] Switch to advisory locks for preventing workers from working on the same post simultaneously. Put synchronization advisory locks in their own namespaces --- db/database_schema.sql | 9 ++- db/main_loop.sql | 136 +++++++++++++++++++++++------------------ 2 files changed, 84 insertions(+), 61 deletions(-) diff --git a/db/database_schema.sql b/db/database_schema.sql index 78a6348..9447346 100644 --- a/db/database_schema.sql +++ b/db/database_schema.sql @@ -39,7 +39,8 @@ BEGIN embedding_dimensionality INT NOT NULL DEFAULT 768, -- size of embedding vectors generated by `llm` min_token_threshold INT NOT NULL DEFAULT 75, -- don't generate embeddings for posts shorter than this number of tokens min_token_search_threshold INT NOT NULL DEFAULT 0, -- ignore posts < this size when *searching* (0 = disabled) - max_embeddings_per_post INT -- max number of chunks per post, NULL for unlimited + max_embeddings_per_post INT, -- max number of chunks per post, NULL for unlimited + advisory_lock_namespace_begin INT -- start of advisory lock namespace, if running multiple instances, use different values (separated by, say, 10 or so) ); IF NOT hive.app_context_exists(__schema_name) THEN @@ -119,7 +120,8 @@ INSERT INTO hivesense_app_status embedding_dimensionality, min_token_threshold, min_token_search_threshold, - max_embeddings_per_post + 
max_embeddings_per_post, + advisory_lock_namespace_begin ) VALUES ( @@ -141,7 +143,8 @@ VALUES current_setting('PG_TEMP.VECTOR_SIZE', TRUE)::INT, current_setting('PG_TEMP.MIN_TOKEN_THRESHOLD', TRUE)::INT, current_setting('PG_TEMP.MIN_TOKEN_SEARCH_THRESHOLD', TRUE)::INT, - NULLIF(current_setting('PG_TEMP.MAX_EMBEDINGS_PER_POST', TRUE)::INT, 0) + NULLIF(current_setting('PG_TEMP.MAX_EMBEDINGS_PER_POST', TRUE)::INT, 0), + 10000 ) ON CONFLICT (id) DO UPDATE SET diff --git a/db/main_loop.sql b/db/main_loop.sql index ba77517..8b56e8e 100644 --- a/db/main_loop.sql +++ b/db/main_loop.sql @@ -53,14 +53,17 @@ DECLARE __number_of_chunks INT := 0; -- counter for this run __c INT; -- temp for ROW_COUNT - __number_of_workers INT; - __tokenizer_name TEXT; - __max_tokens INT; - __min_new_ratio REAL; - __lang_model TEXT; - __doc_prefix TEXT; - __min_token_threshold INT; - __max_embeddings_per_post INT; + __number_of_workers INT; + __tokenizer_name TEXT; + __max_tokens INT; + __min_new_ratio REAL; + __lang_model TEXT; + __doc_prefix TEXT; + __min_token_threshold INT; + __max_embeddings_per_post INT; + __advisory_lock_namespace_begin INT; + + __post_id_lock_namespace INT; -- for the FOR loop rec RECORD; @@ -76,7 +79,8 @@ BEGIN sentence_language_model, document_prefix, min_token_threshold, - max_embeddings_per_post + max_embeddings_per_post, + advisory_lock_namespace_begin INTO __number_of_workers, __tokenizer_name, __max_tokens, @@ -84,13 +88,16 @@ BEGIN __lang_model, __doc_prefix, __min_token_threshold, - __max_embeddings_per_post + __max_embeddings_per_post, + __advisory_lock_namespace_begin FROM hivesense_app.hivesense_app_status WHERE id = 1; ASSERT __number_of_workers IS NOT NULL, 'NULL number of workers'; ASSERT __number_of_workers > 0 , 'number of workers less than 1'; + __post_id_lock_namespace := __advisory_lock_namespace_begin + 3; + -- TODO(mickiewicz@syncad.com) when hivemind is not in a live stage then do not process -- maybe it is not required because last_completed in 
enough ? -- but last completed does not guaranteen index on block_num_created, but maybe this is an edge case @@ -150,12 +157,14 @@ BEGIN FROM tmp_posts_to_vectorize ORDER BY post_id LOOP - -- serialize on this post_id + -- grab an transaction‐scoped advisory lock in our own namespace to ensure no other workers + -- are working on this same post while we are + PERFORM pg_advisory_xact_lock(__post_id_lock_namespace, rec.post_id); + SELECT last_vectors_block INTO __prev_block FROM hivesense_app.post_data - WHERE post_id = rec.post_id - FOR UPDATE; + WHERE post_id = rec.post_id; IF rec.block_num > COALESCE(__prev_block, -1) THEN -- Reserve a sequence value that will identify this logical operation @@ -331,24 +340,25 @@ CREATE OR REPLACE PROCEDURE hivesense_app.scheduler( LANGUAGE plpgsql AS $$ DECLARE - __hivemind_current_block INT; - __context_name hive.context_name := _app_context_base_name; -- single context - __start_block INT := 0; - __blocks_range hive.blocks_range := (0,0); - __batch_id BIGINT; - __todo INT; - __breaking_reason break_reason := NULL; - __blocks INT; - __blocks_per_chunk INT; - __number_of_chunks INT; - __chunks_per_worker INT; - __extra INT; - __from_block INT; - __to_block INT; - _start_key BIGINT; - _done_key BIGINT; - _ack_key BIGINT; - __shard INT; + __hivemind_current_block INT; + __context_name hive.context_name := _app_context_base_name; -- single context + __start_block INT := 0; + __blocks_range hive.blocks_range := (0,0); + __batch_id BIGINT; + __todo INT; + __breaking_reason break_reason := NULL; + __blocks INT; + __blocks_per_chunk INT; + __number_of_chunks INT; + __chunks_per_worker INT; + __extra INT; + __from_block INT; + __to_block INT; + __advisory_lock_namespace_begin INT; + __start_key_namespace INT; + __done_key_namespace INT; + __ack_key_namespace INT; + __shard INT; BEGIN -- by default, postgresql logs when threads are blocked on a lock for more than a second. 
-- we use locks for synchronization, and expect threads to be blocked for at least 3s @@ -358,6 +368,15 @@ BEGIN -- -- PERFORM set_config('deadlock_timeout', '5s', true); -- PERFORM set_config('log_lock_waits', 'off', true); + + -- read configured start_block + SELECT start_block, advisory_lock_namespace_begin INTO __start_block, __advisory_lock_namespace_begin + FROM hivesense_app.hivesense_app_status; + + __start_key_namespace := __advisory_lock_namespace_begin; + __done_key_namespace := __advisory_lock_namespace_begin + 1; + __ack_key_namespace := __advisory_lock_namespace_begin + 2; + -------------------------------------------------------------------------------- -- **At initialization: acquire every start_key_i** so that workers block. -- @@ -371,14 +390,11 @@ BEGIN -- so that as soon as a worker tries to lock it at loop-top, it succeeds. -------------------------------------------------------------------------------- FOR __shard IN 1.._workers LOOP - PERFORM pg_advisory_lock(10_000_000 + __shard); -- hold start_key_i - PERFORM pg_advisory_lock(30_000_000 + __shard); -- hold ack_key_i + PERFORM pg_advisory_lock(__start_key_namespace, __shard); -- hold start_key_i + PERFORM pg_advisory_lock(__ack_key_namespace, __shard); -- hold ack_key_i END LOOP; RAISE NOTICE 'Scheduler: start_keys locked for all % workers; entering main loop...', _workers; - -- read configured start_block - SELECT start_block INTO __start_block - FROM hivesense_app.hivesense_app_status; IF _max_block_limit IS NOT NULL THEN RAISE NOTICE 'Max block limit is specified as: %', _max_block_limit; @@ -522,8 +538,7 @@ BEGIN -- • then signal back on done_key_i. 
-------------------------------------------------------------------------------- FOR __shard IN 1.._workers LOOP - _start_key := 10_000_000 + __shard; - PERFORM pg_advisory_unlock(_start_key); + PERFORM pg_advisory_unlock(__start_key_namespace, __shard); END LOOP; -------------------------------------------------------------------------------- @@ -538,24 +553,21 @@ BEGIN -- We do it in the order: LOCK(done_key_i) → UNLOCK(done_key_i) → UNLOCK(ack_key_i) → LOCK(ack_key_i) → LOCK(start_key_i) -------------------------------------------------------------------------------- FOR __shard IN 1.._workers LOOP - _done_key := 20_000_000 + __shard; - _ack_key := 30_000_000 + __shard; - _start_key := 10_000_000 + __shard; -- Wait for worker_i to signal “done”: - PERFORM pg_advisory_lock(_done_key); + PERFORM pg_advisory_lock(__done_key_namespace, __shard); -- Immediately drop done_key_i so that next time the worker can LOCK it: - PERFORM pg_advisory_unlock(_done_key); + PERFORM pg_advisory_unlock(__done_key_namespace, __shard); -- ACK the worker’s “done” by unlocking ack_key_i - PERFORM pg_advisory_unlock(_ack_key); + PERFORM pg_advisory_unlock(__ack_key_namespace, __shard); -- re‐grab ack_key_i so that the next time the worker tries to LOCK it, it will block - PERFORM pg_advisory_lock(_ack_key); + PERFORM pg_advisory_lock(__ack_key_namespace, __shard); -- Pre‐lock start_key_i again so that the worker will block on it next loop - PERFORM pg_advisory_lock(_start_key); + PERFORM pg_advisory_lock(__start_key_namespace, __shard); END LOOP; ------------------------------------------------------------------- @@ -607,13 +619,21 @@ CREATE OR REPLACE PROCEDURE hivesense_app.worker_loop( LANGUAGE plpgsql AS $$ DECLARE - __task RECORD; - __done_posts INT; - __breaking_reason break_reason := NULL; - _start_key BIGINT := 10_000_000 + _worker; - _done_key BIGINT := 20_000_000 + _worker; - _ack_key BIGINT := 30_000_000 + _worker; + __task RECORD; + __done_posts INT; + __breaking_reason 
break_reason := NULL; + __advisory_lock_namespace_begin INT; + __start_key_namespace INT; + __done_key_namespace INT; + __ack_key_namespace INT; BEGIN + SELECT advisory_lock_namespace_begin INTO __advisory_lock_namespace_begin + FROM hivesense_app.hivesense_app_status; + + __start_key_namespace := __advisory_lock_namespace_begin + __done_key_namespace := __advisory_lock_namespace_begin + 1 + __ack_key_namespace := __advisory_lock_namespace_begin + 2 + -- by default, postgresql logs when threads are blocked on a lock for more than a second. -- we use locks for synchronization, and expect threads to be blocked for at least 3s -- at a time. Disable that logging to avoid spamming the log file @@ -628,18 +648,18 @@ BEGIN -- Grab done_key_i so scheduler can wait on it later. This succeeds -- immediately the very first time (because we left done_key_i unlocked). -------------------------------------------------------------------------------- - PERFORM pg_advisory_lock(_done_key); + PERFORM pg_advisory_lock(__done_key_namespace, _worker); -------------------------------------------------------------------------------- -- Now block until the scheduler says “go” by unlocking start_key_i. -- As soon as that happens, we acquire start_key_i. -------------------------------------------------------------------------------- - PERFORM pg_advisory_lock(_start_key); + PERFORM pg_advisory_lock(__start_key_namespace, _worker); -------------------------------------------------------------------------------- -- Immediately release start_key_i so it’s available for the next batch. -------------------------------------------------------------------------------- - PERFORM pg_advisory_unlock(_start_key); + PERFORM pg_advisory_unlock(__start_key_namespace, _worker); LOOP ------------------------------------------------------------------ @@ -710,16 +730,16 @@ BEGIN -- We have emptied the queue (or hit break conditions). -- Signal “I’m done” by UNLOCKing done_key_i. 
-------------------------------------------------------------------------------- - PERFORM pg_advisory_unlock(_done_key); + PERFORM pg_advisory_unlock(__done_key_namespace, _worker); -- Now block on ack_key_i until the scheduler “acks” our done_key_i. - PERFORM pg_advisory_lock(_ack_key); + PERFORM pg_advisory_lock(__ack_key_namespace, _worker); -- As soon as the scheduler does `UNLOCK(ack_key_i)`, this returns. -------------------------------------------------------------------------------- -- Immediately release ack_key_i so that the scheduler can lock it for the next iteration -------------------------------------------------------------------------------- - PERFORM pg_advisory_unlock(_ack_key); + PERFORM pg_advisory_unlock(__ack_key_namespace, _worker); __breaking_reason := isbreakingpending(_app_context_name, _max_block_limit, NULL); IF __breaking_reason IS NOT NULL THEN -- GitLab From cad3b4cc164c7c295997612b528d5aaaeaecdf39 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Mon, 23 Jun 2025 19:09:44 +0000 Subject: [PATCH 10/15] Clean up unnecessary or confusing logging --- db/helpers.sql | 72 ++++++++++++++++++++++++++++++++++++------------ db/main_loop.sql | 50 +++++++++++++++++++++++++++------ 2 files changed, 96 insertions(+), 26 deletions(-) diff --git a/db/helpers.sql b/db/helpers.sql index cb4fde2..3783415 100644 --- a/db/helpers.sql +++ b/db/helpers.sql @@ -96,31 +96,69 @@ $$; CREATE OR REPLACE PROCEDURE ENSURE_INDEXES_ARE_CREATED() LANGUAGE plpgsql -AS -$$ +AS $$ DECLARE dim int := hivesense_app.embedding_dims(); half boolean := hivesense_app.use_halfvec_index(); store boolean := hivesense_app.store_halfvec_embeddings(); + idx_exists boolean; BEGIN IF store THEN - RAISE NOTICE 'Creating half-precision HNSW index (%s-d)…', dim; - CREATE INDEX IF NOT EXISTS posts_vectors_embedding_half_hnsw - ON hivesense_app.posts_vectors - USING hnsw (embedding public.halfvec_cosine_ops) - WITH (m = 32, ef_construction = 400); + -- half‐precision on‐disk embedding index 
+ SELECT EXISTS( + SELECT 1 + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'hivesense_app' + AND c.relname = 'posts_vectors_embedding_half_hnsw' + ) INTO idx_exists; + + IF NOT idx_exists THEN + RAISE NOTICE 'Creating half-precision HNSW index (%s-d)…', dim; + CREATE INDEX posts_vectors_embedding_half_hnsw + ON hivesense_app.posts_vectors + USING hnsw (embedding public.halfvec_cosine_ops) + WITH (m = 32, ef_construction = 400); + END IF; + ELSIF half THEN - RAISE NOTICE 'Creating half-precision HNSW index (%s-d)…', dim; - EXECUTE format( - 'CREATE INDEX IF NOT EXISTS posts_vectors_embedding_half_hnsw - ON hivesense_app.posts_vectors - USING hnsw ((embedding::public.halfvec(%1$s)) public.halfvec_cosine_ops);', dim); + -- half‐precision casted at index time + SELECT EXISTS( + SELECT 1 + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'hivesense_app' + AND c.relname = 'posts_vectors_embedding_half_hnsw' + ) INTO idx_exists; + + IF NOT idx_exists THEN + RAISE NOTICE 'Creating half-precision HNSW index (%s-d)…', dim; + EXECUTE format( + 'CREATE INDEX posts_vectors_embedding_half_hnsw + ON hivesense_app.posts_vectors + USING hnsw ((embedding::public.halfvec(%1$s)) public.halfvec_cosine_ops) + WITH (m = 32, ef_construction = 400)', + dim + ); + END IF; + ELSE - RAISE NOTICE 'Creating full-precision HNSW index (%s-d)…', dim; - CREATE INDEX IF NOT EXISTS posts_vectors_embedding_hnsw - ON hivesense_app.posts_vectors - USING hnsw (embedding public.vector_cosine_ops) - WITH (m = 32, ef_construction = 400); + -- full‐precision embedding index + SELECT EXISTS( + SELECT 1 + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'hivesense_app' + AND c.relname = 'posts_vectors_embedding_hnsw' + ) INTO idx_exists; + + IF NOT idx_exists THEN + RAISE NOTICE 'Creating full-precision HNSW index (%s-d)…', dim; + CREATE INDEX posts_vectors_embedding_hnsw + ON 
hivesense_app.posts_vectors + USING hnsw (embedding public.vector_cosine_ops) + WITH (m = 32, ef_construction = 400); + END IF; END IF; END; $$; diff --git a/db/main_loop.sql b/db/main_loop.sql index 8b56e8e..04ffdc6 100644 --- a/db/main_loop.sql +++ b/db/main_loop.sql @@ -69,6 +69,10 @@ DECLARE rec RECORD; __prev_block INT; __sync_seq INT; -- seq that orders insert / delete ops + + __lock_start timestamptz; + __lock_end timestamptz; + __wait_interval interval; BEGIN ASSERT _first_block_num <= _last_block_num, 'Invalid range of blocks'; @@ -106,7 +110,11 @@ BEGIN --END IF; IF _logs THEN - RAISE NOTICE 'Hivesense % is processing block range: <%, %>', _worker, _first_block_num, _last_block_num; + IF _first_block_num = _last_block_num THEN + RAISE NOTICE 'worker % is processing block %', _worker, _first_block_num; + ELSE + RAISE NOTICE 'worker % is processing block range: <%, %>', _worker, _first_block_num, _last_block_num; + END IF; __start_ts := clock_timestamp(); END IF; @@ -159,7 +167,23 @@ BEGIN LOOP -- grab an transaction‐scoped advisory lock in our own namespace to ensure no other workers -- are working on this same post while we are - PERFORM pg_advisory_xact_lock(__post_id_lock_namespace, rec.post_id); + -- Try to grab the lock immediately + IF NOT pg_try_advisory_xact_lock(__post_id_lock_namespace, rec.post_id) THEN + -- If it failed, we know there’s contention: measure how long it takes to acquire + __lock_start := clock_timestamp(); + PERFORM pg_advisory_xact_lock(__post_id_lock_namespace, rec.post_id); + __lock_end := clock_timestamp(); + + -- Compute and log any non-zero wait + __wait_interval := __lock_end - __lock_start; + IF __wait_interval > '0s' THEN + RAISE NOTICE + 'worker % was blocked waiting to work on post % for %s seconds', + _worker, + rec.post_id, + to_char(EXTRACT(EPOCH FROM __wait_interval), 'FM999999.000'); + END IF; + END IF; SELECT last_vectors_block INTO __prev_block @@ -422,7 +446,7 @@ BEGIN CONTINUE; END IF; - RAISE NOTICE 'Past 
start block...'; + -- RAISE NOTICE 'Past start block...'; -- request the next range from HAF CALL hive.app_next_iteration( __context_name, @@ -431,7 +455,7 @@ BEGIN _limit => _max_block_limit ); - RAISE NOTICE 'App_next_iteration returned'; + -- RAISE NOTICE 'App_next_iteration returned'; -- check global break conditions __breaking_reason := isbreakingpending( @@ -477,7 +501,10 @@ BEGIN __batch_id := nextval('hivesense_app.batch_seq'); __from_block := __blocks_range.first_block; - RAISE NOTICE 'Splitting range % to %', __blocks_range.first_block, __blocks_range.last_block; + IF __blocks_range.last_block <> __blocks_range.first_block THEN + RAISE NOTICE 'Splitting range % to %', __blocks_range.first_block, __blocks_range.last_block; + END IF; + WHILE __from_block <= __blocks_range.last_block LOOP -- RAISE NOTICE 'SCHEDULER: computing work for'; __to_block := __from_block + __blocks_per_chunk - 1; @@ -630,9 +657,9 @@ BEGIN SELECT advisory_lock_namespace_begin INTO __advisory_lock_namespace_begin FROM hivesense_app.hivesense_app_status; - __start_key_namespace := __advisory_lock_namespace_begin - __done_key_namespace := __advisory_lock_namespace_begin + 1 - __ack_key_namespace := __advisory_lock_namespace_begin + 2 + __start_key_namespace := __advisory_lock_namespace_begin; + __done_key_namespace := __advisory_lock_namespace_begin + 1; + __ack_key_namespace := __advisory_lock_namespace_begin + 2; -- by default, postgresql logs when threads are blocked on a lock for more than a second. 
-- we use locks for synchronization, and expect threads to be blocked for at least 3s @@ -707,7 +734,12 @@ BEGIN ) INTO __done_posts; - RAISE NOTICE 'worker % processed block range % to % (% blocks) containing % posts', _worker, __task.first_block, __task.last_block, __task.last_block - __task.first_block, __done_posts; + IF __task.last_block = __task.first_block THEN + RAISE NOTICE 'worker % processed block % containing % posts', _worker, __task.first_block, __done_posts; + ELSE + RAISE NOTICE 'worker % processed block range % to % (% blocks) containing % posts', _worker, __task.first_block, __task.last_block, __task.last_block - __task.first_block + 1, __done_posts; + END IF; + -- If Hivemind isn’t caught up yet → rollback & wait0 IF __done_posts IS NULL THEN ROLLBACK; -- GitLab From 23423df9c35cc06a89a27218376be3eb13f4e2a4 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Tue, 24 Jun 2025 20:48:07 +0000 Subject: [PATCH 11/15] Rename rewriter image to postgrest-rewriter to match other projects, fix slow shutdown of same --- docker/rewriter_entrypoint.sh | 2 +- scripts/build_images.sh | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docker/rewriter_entrypoint.sh b/docker/rewriter_entrypoint.sh index 0072e77..530b867 100755 --- a/docker/rewriter_entrypoint.sh +++ b/docker/rewriter_entrypoint.sh @@ -11,4 +11,4 @@ fi sed "s|\${REWRITE_LOG}|$REWRITE_LOG|g" /usr/local/openresty/nginx/conf/nginx.conf.template > /usr/local/openresty/nginx/conf/nginx.conf # Start nginx -/usr/local/openresty/bin/openresty -g 'daemon off;' +exec /usr/local/openresty/bin/openresty -g 'daemon off;' diff --git a/scripts/build_images.sh b/scripts/build_images.sh index 45e752f..65a1b1c 100755 --- a/scripts/build_images.sh +++ b/scripts/build_images.sh @@ -51,13 +51,14 @@ set -eu pipefail SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" docker build -t "registry.gitlab.syncad.com/hive/hivesense:${TAG}" "${SCRIPTPATH}/.." 
-docker build -t "registry.gitlab.syncad.com/hive/hivesense/rewiter:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.rewriter" "${SCRIPTPATH}/.." +docker build -t "registry.gitlab.syncad.com/hive/hivesense/postgrest-rewiter:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.rewriter" "${SCRIPTPATH}/.." docker build -t "registry.gitlab.syncad.com/hive/hivesense/syncer:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.syncer" "${SCRIPTPATH}/.." echo "Build images tag ${TAG}" if [ -n "${PUSH:-}" ]; then docker push "registry.gitlab.syncad.com/hive/hivesense:${TAG}" - docker push "registry.gitlab.syncad.com/hive/hivesense/rewiter:${TAG}" + docker push "registry.gitlab.syncad.com/hive/hivesense/postgrest-rewiter:${TAG}" + docker push "registry.gitlab.syncad.com/hive/hivesense/syncer:${TAG}" echo "Pushed images tag ${TAG}" fi -- GitLab From 559f271e60add1807ee660abfbc2599980f4b2d9 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Wed, 25 Jun 2025 14:06:09 +0000 Subject: [PATCH 12/15] Several changes to syncing: dividing up work by posts instead of by blocks, adding foreign keys, numerous other fixes --- .gitignore | 1 + Dockerfile | 2 +- db/database_schema.sql | 60 ++-- db/main_loop.sql | 477 +++++++++++++++----------------- endpoints/embedding_updates.sql | 118 ++++++++ endpoints/endpoint_schema.sql | 42 +++ rewrite_rules.conf | 9 +- scripts/install_app.sh | 1 + scripts/openapi_rewrite.sh | 3 +- scripts/sync_embeddings.py | 123 ++++++-- 10 files changed, 522 insertions(+), 314 deletions(-) create mode 100644 endpoints/embedding_updates.sql diff --git a/.gitignore b/.gitignore index ae11224..b82e46f 100644 --- a/.gitignore +++ b/.gitignore @@ -151,3 +151,4 @@ pghero.yml # report file tavern_benchmarks_report.html +endpoints_openapi diff --git a/Dockerfile b/Dockerfile index 05fd52b..9f66010 100644 --- a/Dockerfile +++ b/Dockerfile @@ -53,4 +53,4 @@ COPY db /app/db COPY endpoints /app/endpoints COPY docker/scripts/docker-entrypoint.sh /app/docker-entrypoint.sh -ENTRYPOINT ["/app/docker-entrypoint.sh"] \ 
No newline at end of file +ENTRYPOINT ["/app/docker-entrypoint.sh"] diff --git a/db/database_schema.sql b/db/database_schema.sql index 9447346..7a58ddb 100644 --- a/db/database_schema.sql +++ b/db/database_schema.sql @@ -40,7 +40,8 @@ BEGIN min_token_threshold INT NOT NULL DEFAULT 75, -- don't generate embeddings for posts shorter than this number of tokens min_token_search_threshold INT NOT NULL DEFAULT 0, -- ignore posts < this size when *searching* (0 = disabled) max_embeddings_per_post INT, -- max number of chunks per post, NULL for unlimited - advisory_lock_namespace_begin INT -- start of advisory lock namespace, if running multiple instances, use different values (separated by, say, 10 or so) + advisory_lock_namespace_begin INT, -- start of advisory lock namespace, if running multiple instances, use different values (separated by, say, 10 or so) + max_visible_sync_seq INT NOT NULL DEFAULT 0 -- highest sync sequence number to publish, anything higher may have gaps that will be filled later ); IF NOT hive.app_context_exists(__schema_name) THEN @@ -58,40 +59,42 @@ BEGIN -- Monotonic sequence that orders every logical operation CREATE SEQUENCE IF NOT EXISTS hivesense_app.sync_seq; + -- This table will hold, for every post_id that we generate embeddings for, + -- the total number of tokens in that post. We insert one row per post_id. 
+ CREATE TABLE IF NOT EXISTS hivesense_app.post_data + ( + post_id INT PRIMARY KEY REFERENCES hivemind_app.hive_posts(id), + number_of_tokens INT NOT NULL, + last_vectors_block INT NOT NULL DEFAULT -1 + ); + -- extend to public, to find vector from pgvector EXECUTE format( 'SET SEARCH_PATH TO %s, public', __schema_name ); IF __store_halfvec_embeddings THEN EXECUTE format($$ CREATE TABLE IF NOT EXISTS posts_vectors ( - post_id INT NOT NULL, + post_id INT NOT NULL REFERENCES hivesense_app.post_data(post_id), chunk_number INT NOT NULL, embedding public.halfvec(%s) NOT NULL, - sync_seq INT, -- drawn from hivesense_app.sync_seq - PRIMARY KEY (sync_seq, post_id, chunk_number) + sync_seq INT NOT NULL, -- drawn from hivesense_app.sync_seq + PRIMARY KEY (post_id, chunk_number) ); $$, __vector_size); ELSE EXECUTE format($$ CREATE TABLE IF NOT EXISTS posts_vectors ( - post_id INT NOT NULL, + post_id INT NOT NULL REFERENCES hivesense_app.post_data(post_id), chunk_number INT NOT NULL, embedding vector(%s) NOT NULL, - sync_seq INT, -- drawn from hivesense_app.sync_seq - PRIMARY KEY (sync_seq, post_id, chunk_number) + sync_seq INT NOT NULL, -- drawn from hivesense_app.sync_seq + PRIMARY KEY (post_id, chunk_number) ); $$, __vector_size); END IF; - - -- This table will hold, for every post_id that we generate embeddings for, - -- the total number of tokens in that post. We insert one row per post_id. 
- CREATE TABLE IF NOT EXISTS hivesense_app.post_data - ( - post_id INT PRIMARY KEY, - number_of_tokens INT NOT NULL, - last_vectors_block INT NOT NULL DEFAULT -1 - ); + -- Helpful index for “give me everything > after_seq” + CREATE INDEX IF NOT EXISTS posts_vectors_sync_seq_idx ON hivesense_app.posts_vectors(sync_seq); -- the current version of sqlfluff doesn't understand 'GRANT MAINTAIN' EXECUTE format( 'GRANT MAINTAIN ON ALL TABLES IN SCHEMA %s TO hived_group' , __schema_name ); @@ -121,7 +124,8 @@ INSERT INTO hivesense_app_status min_token_threshold, min_token_search_threshold, max_embeddings_per_post, - advisory_lock_namespace_begin + advisory_lock_namespace_begin, + max_visible_sync_seq ) VALUES ( @@ -144,7 +148,8 @@ VALUES current_setting('PG_TEMP.MIN_TOKEN_THRESHOLD', TRUE)::INT, current_setting('PG_TEMP.MIN_TOKEN_SEARCH_THRESHOLD', TRUE)::INT, NULLIF(current_setting('PG_TEMP.MAX_EMBEDINGS_PER_POST', TRUE)::INT, 0), - 10000 + 10000, + 0 ) ON CONFLICT (id) DO UPDATE SET @@ -161,27 +166,18 @@ CREATE TABLE IF NOT EXISTS hivesense_app.block_tasks ( shard INT, -- worker number (1…N), NULL if unclaimed first_block INT NOT NULL, last_block INT NOT NULL, + post_ids INT[] NOT NULL DEFAULT '{}', status TEXT NOT NULL DEFAULT 'pending', -- pending | running | done claimed_at TIMESTAMPTZ, finished_at TIMESTAMPTZ ); CREATE INDEX IF NOT EXISTS block_tasks_pending_idx ON hivesense_app.block_tasks (status, shard); --- Helpful index for “give me everything > after_seq” -CREATE INDEX IF NOT EXISTS posts_vectors_sync_seq_idx - ON hivesense_app.posts_vectors(sync_seq); - --- 2️⃣ Table that records logical deletions +-- Table that records logical deletions CREATE TABLE IF NOT EXISTS hivesense_app.deleted_embeddings ( - post_id INT NOT NULL, - sync_seq INT NOT NULL, - PRIMARY KEY (sync_seq, post_id) + post_id INT NOT NULL REFERENCES hivesense_app.post_data (post_id), + sync_seq INT NOT NULL UNIQUE, + PRIMARY KEY (post_id, sync_seq) ); -CREATE INDEX IF NOT EXISTS 
deleted_embeddings_sync_seq_idx - ON hivesense_app.deleted_embeddings(sync_seq); - -RESET ROLE; - - RESET ROLE; diff --git a/db/main_loop.sql b/db/main_loop.sql index 04ffdc6..a29bde8 100644 --- a/db/main_loop.sql +++ b/db/main_loop.sql @@ -35,11 +35,10 @@ BEGIN END; $BODY$; -CREATE OR REPLACE FUNCTION hivesense_block_range_data( - _first_block_num INT, - _last_block_num INT, - _logs BOOLEAN, - _worker INT +CREATE OR REPLACE FUNCTION generate_embeddings_for_posts( + _post_ids INT[], + _logs BOOLEAN, + _worker INT ) RETURNS INT LANGUAGE plpgsql @@ -47,100 +46,56 @@ VOLATILE PARALLEL SAFE AS $$ DECLARE - __start_ts timestamptz; - __end_ts timestamptz; - __number_of_posts INT := 0; -- counter for this run - __number_of_chunks INT := 0; -- counter for this run - __c INT; -- temp for ROW_COUNT - - __number_of_workers INT; - __tokenizer_name TEXT; - __max_tokens INT; - __min_new_ratio REAL; - __lang_model TEXT; - __doc_prefix TEXT; - __min_token_threshold INT; - __max_embeddings_per_post INT; - __advisory_lock_namespace_begin INT; - - __post_id_lock_namespace INT; - - -- for the FOR loop + __number_of_posts INT := 0; + __number_of_chunks INT := 0; + __c INT; + __tokenizer_name TEXT; + __max_tokens INT; + __min_new_ratio REAL; + __lang_model TEXT; + __doc_prefix TEXT; + __min_token_threshold INT; + __max_embeddings_per_post INT; rec RECORD; - __prev_block INT; - __sync_seq INT; -- seq that orders insert / delete ops - - __lock_start timestamptz; - __lock_end timestamptz; - __wait_interval interval; + __sync_seq INT; BEGIN - ASSERT _first_block_num <= _last_block_num, 'Invalid range of blocks'; - - SELECT parallel_workers, - tokenizer_model, + SELECT tokenizer_model, tokens_per_chunk, 1 - overlap_amount, sentence_language_model, document_prefix, min_token_threshold, - max_embeddings_per_post, - advisory_lock_namespace_begin - INTO __number_of_workers, - __tokenizer_name, + max_embeddings_per_post + INTO __tokenizer_name, __max_tokens, __min_new_ratio, __lang_model, 
__doc_prefix, __min_token_threshold, - __max_embeddings_per_post, - __advisory_lock_namespace_begin + __max_embeddings_per_post FROM hivesense_app.hivesense_app_status WHERE id = 1; - ASSERT __number_of_workers IS NOT NULL, 'NULL number of workers'; - ASSERT __number_of_workers > 0 , 'number of workers less than 1'; - - __post_id_lock_namespace := __advisory_lock_namespace_begin + 3; - - -- TODO(mickiewicz@syncad.com) when hivemind is not in a live stage then do not process - -- maybe it is not required because last_completed in enough ? - -- but last completed does not guaranteen index on block_num_created, but maybe this is an edge case - --IF hive.get_current_stage_name( 'hivemind_app' ) != 'live' THEN - -- RETURN NULL; - --END IF; - - IF _logs THEN - IF _first_block_num = _last_block_num THEN - RAISE NOTICE 'worker % is processing block %', _worker, _first_block_num; - ELSE - RAISE NOTICE 'worker % is processing block range: <%, %>', _worker, _first_block_num, _last_block_num; - END IF; - __start_ts := clock_timestamp(); - END IF; - - -- The preprocessing step is pretty expensive. Way cheaper than generating embeddings, of course, but - -- still more expensive than postgres thinks. If we're not really careful about our CTEs below, - -- postgresql will happily preprocess posts multiple times. Instead, we don't let it, explicitly - -- preprocessing into a temp table here - CREATE TEMP TABLE tmp_posts_to_vectorize ( - post_id INT PRIMARY KEY, - bodies TEXT[], - token_count INT, - block_num INT - ) ON COMMIT DROP; - - INSERT INTO tmp_posts_to_vectorize(post_id, bodies, token_count, block_num) + /* ===================================================================== + * 0️⃣ PRE-PROCESS EVERY POST ONCE + * --------------------------------- + * We call preprocess_post with _min_token_threshold = 0 so we + * *always* get (token_count, chunks[]). Later we decide whether + * the post is “big enough” (token_count ≥ __min_token_threshold). 
+ * ====================================================================*/ + CREATE TEMP TABLE tmp_pre ON COMMIT DROP AS SELECT - hp.id, - pp.chunks, - pp.token_count, - hp.block_num - FROM hivemind_app.hive_posts AS hp - JOIN hivemind_app.hive_post_data AS hpd ON hpd.id = hp.id - CROSS JOIN LATERAL ( - SELECT * - FROM preprocess_post( - hpd.title || '.\n\n' || hpd.body, + hp.id AS post_id, + hp.block_num, + COALESCE(pp.token_count, 0) AS token_count, + COALESCE(pp.chunks, ARRAY[]::TEXT[]) AS chunks + FROM unnest(_post_ids) AS sel(id) + JOIN hivemind_app.hive_posts hp ON hp.id = sel.id + LEFT JOIN LATERAL preprocess_post( + /* body ---------------------------------------------------- */ + (SELECT hpd.title || '.\n\n' || hpd.body + FROM hivemind_app.hive_post_data hpd + WHERE hpd.id = hp.id), hp.id, '[permlink disabled]', __tokenizer_name, @@ -148,109 +103,140 @@ BEGIN __min_new_ratio, __lang_model, __max_embeddings_per_post, - TRUE, -- _truncate_long_sentences + TRUE, __doc_prefix, - __min_token_threshold - ) - ) AS pp(chunks, token_count) - WHERE (hp.root_id = hp.id OR hp.root_id = 0) - AND ( - hp.block_num_created BETWEEN _first_block_num AND _last_block_num - OR hp.block_num BETWEEN _first_block_num AND _last_block_num - ); - - -- ◉◉◉ PER-POST LOOP WITH SERIALIZATION ◉◉◉ + 0 -- ← return *even if very short* + ) AS pp + ON TRUE; + + /* ===================================================================== + * 1️⃣ CHUNKS FOR POSTS THAT ARE STILL “BIG ENOUGH” + * ====================================================================*/ + CREATE TEMP TABLE tmp_chunks ON COMMIT DROP AS + SELECT + post_id, + generate_subscripts(chunks, 1) - 1 AS chunk_number, + chunks[generate_subscripts(chunks, 1)] AS chunk_text, + token_count, + block_num + FROM tmp_pre + WHERE token_count >= __min_token_threshold + AND array_length(chunks, 1) IS NOT NULL; + + /* ===================================================================== + * 2️⃣ EMBED THOSE CHUNKS + * 
====================================================================*/ + CREATE TEMP TABLE tmp_vectors ON COMMIT DROP AS + WITH all_chunks AS ( + SELECT ARRAY_AGG( + (post_id, chunk_text, chunk_number) + ::hivesense_app.id_and_post_chunk + ORDER BY post_id, chunk_number + ) AS arr + FROM tmp_chunks + ) + SELECT + pv.post_id, + pv.chunk_number, + pv.vec + FROM all_chunks + CROSS JOIN LATERAL UNNEST(hivesense_app.hivesense_embed(all_chunks.arr)) AS pv; + + /* ===================================================================== + * 3️⃣ POSTS THAT NOW PRODUCE ZERO CHUNKS + * (true deletions / under-threshold edits) + * ====================================================================*/ + CREATE TEMP TABLE tmp_empty_posts ON COMMIT DROP AS + SELECT post_id, + block_num, + token_count + FROM tmp_pre + WHERE token_count < __min_token_threshold + OR array_length(chunks,1) IS NULL; + + /* nothing to do if no deletions */ + IF EXISTS (SELECT 1 FROM tmp_empty_posts) THEN + + /* ➤ fresh sync_seq for every empty post */ + CREATE TEMP TABLE tmp_empty_seq ON COMMIT DROP AS + SELECT post_id, + nextval('hivesense_app.sync_seq') AS sync_seq + FROM tmp_empty_posts; + + /* ➤ delete lingering vectors */ + DELETE FROM hivesense_app.posts_vectors pv + USING tmp_empty_posts ep + WHERE pv.post_id = ep.post_id; + + /* ➤ record the logical deletion */ + INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq) + SELECT post_id, sync_seq + FROM tmp_empty_seq; + + /* ➤ bring post_data up to date */ + INSERT INTO hivesense_app.post_data(post_id, number_of_tokens, last_vectors_block) + SELECT post_id, token_count, block_num + FROM tmp_empty_posts + ON CONFLICT (post_id) DO UPDATE + SET number_of_tokens = EXCLUDED.number_of_tokens, + last_vectors_block = EXCLUDED.last_vectors_block; + END IF; + + /* ===================================================================== + * (the original code starting at + * “-- 3) per-post sync_seq, delete & insert” continues unmodified) + * 
====================================================================*/ + + + -- 3) per-post sync_seq, delete & insert FOR rec IN - SELECT post_id, bodies, token_count, block_num - FROM tmp_posts_to_vectorize - ORDER BY post_id + SELECT DISTINCT post_id, block_num, + MAX(token_count) AS tcnt + FROM tmp_chunks + GROUP BY post_id, block_num LOOP - -- grab an transaction‐scoped advisory lock in our own namespace to ensure no other workers - -- are working on this same post while we are - -- Try to grab the lock immediately - IF NOT pg_try_advisory_xact_lock(__post_id_lock_namespace, rec.post_id) THEN - -- If it failed, we know there’s contention: measure how long it takes to acquire - __lock_start := clock_timestamp(); - PERFORM pg_advisory_xact_lock(__post_id_lock_namespace, rec.post_id); - __lock_end := clock_timestamp(); - - -- Compute and log any non-zero wait - __wait_interval := __lock_end - __lock_start; - IF __wait_interval > '0s' THEN - RAISE NOTICE - 'worker % was blocked waiting to work on post % for %s seconds', - _worker, - rec.post_id, - to_char(EXTRACT(EPOCH FROM __wait_interval), 'FM999999.000'); - END IF; - END IF; - - SELECT last_vectors_block - INTO __prev_block - FROM hivesense_app.post_data - WHERE post_id = rec.post_id; - - IF rec.block_num > COALESCE(__prev_block, -1) THEN - -- Reserve a sequence value that will identify this logical operation SELECT nextval('hivesense_app.sync_seq') INTO __sync_seq; - __number_of_posts := __number_of_posts + 1; - - -- Was the post previously embedded? 
If so, log a delete operation IF EXISTS ( - SELECT 1 FROM hivesense_app.posts_vectors - WHERE post_id = rec.post_id + SELECT 1 FROM hivesense_app.posts_vectors + WHERE post_id = rec.post_id ) THEN DELETE FROM hivesense_app.posts_vectors WHERE post_id = rec.post_id; - INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq) VALUES (rec.post_id, __sync_seq); END IF; - -- generate & insert new embeddings - INSERT INTO hivesense_app.posts_vectors(post_id, chunk_number, embedding, sync_seq) + INSERT INTO hivesense_app.posts_vectors( + post_id, chunk_number, embedding, sync_seq + ) SELECT - (pv).post_id, - (pv).chunk_number, - CASE - WHEN hivesense_app.store_halfvec_embeddings() - THEN (pv).vec::public.halfvec - ELSE (pv).vec + post_id, + chunk_number, + CASE WHEN hivesense_app.store_halfvec_embeddings() + THEN vec::public.halfvec + ELSE vec END, __sync_seq - FROM ( - SELECT UNNEST( - hivesense_app.hivesense_embed( - ARRAY( - SELECT (rec.post_id, rec.bodies[idx], idx-1) - ::hivesense_app.id_and_post_chunk - FROM generate_subscripts(rec.bodies,1) AS idx - ) - ) - ) AS pv - ) AS sub; - - -- count how many chunks we just wrote + FROM tmp_vectors + WHERE post_id = rec.post_id + ORDER BY chunk_number; + GET DIAGNOSTICS __c = ROW_COUNT; __number_of_chunks := __number_of_chunks + __c; - -- bump metadata to block_num UPDATE hivesense_app.post_data - SET number_of_tokens = COALESCE(rec.token_count, 0), + SET number_of_tokens = rec.tcnt, last_vectors_block = rec.block_num WHERE post_id = rec.post_id; - END IF; - END LOOP; - DROP TABLE tmp_posts_to_vectorize; + __number_of_posts := __number_of_posts + 1; + END LOOP; - RETURN COALESCE(__number_of_posts, 0); + RETURN __number_of_posts; END; $$; - - CREATE OR REPLACE PROCEDURE hivesense_massive_processing( IN _from INT, IN _to INT, IN _logs BOOLEAN, IN _worker INT, OUT _done INT ) @@ -383,6 +369,7 @@ DECLARE __done_key_namespace INT; __ack_key_namespace INT; __shard INT; + __posts_per_chunk INT; BEGIN -- by default, postgresql 
logs when threads are blocked on a lock for more than a second. -- we use locks for synchronization, and expect threads to be blocked for at least 3s @@ -403,12 +390,6 @@ BEGIN -------------------------------------------------------------------------------- -- **At initialization: acquire every start_key_i** so that workers block. - -- - -- We'll define: - -- start_key_i := 10_000_000 + i - -- done_key_i := 20_000_000 + i - -- ack_key_i := 30_000_000 + i - -- -- Here, we grab start_key_i and ack_key_i. done_key_i is left unlocked, -- so that as soon as a worker tries to lock it at loop-top, it succeeds. @@ -458,10 +439,7 @@ BEGIN -- RAISE NOTICE 'App_next_iteration returned'; -- check global break conditions - __breaking_reason := isbreakingpending( - __context_name, - _max_block_limit, - __blocks_range); + __breaking_reason := isbreakingpending(__context_name, _max_block_limit, __blocks_range); IF __breaking_reason IS NOT NULL THEN ROLLBACK; IF __breaking_reason = 'BLOCK_LIMIT_REACHED' @@ -478,7 +456,7 @@ BEGIN CONTINUE; END IF; - -- ── NEW: wait until hivemind has processed through this range ─────────────── + -- wait until hivemind has processed through this range LOOP SELECT hive.app_get_current_block_num('hivemind_app') INTO __hivemind_current_block; @@ -487,74 +465,68 @@ BEGIN __blocks_range.last_block, __hivemind_current_block; PERFORM pg_sleep(1); + __breaking_reason := isbreakingpending(__context_name, _max_block_limit, NULL); + IF __breaking_reason IS NOT NULL THEN + RETURN; + END IF; END LOOP; - ------------------------------------------------------------------- - -- Split the range into contiguous slices and enqueue one per shard - ------------------------------------------------------------------- - __blocks := __blocks_range.last_block - __blocks_range.first_block + 1; - __chunks_per_worker := 50; - __number_of_chunks := _workers * __chunks_per_worker; - __blocks_per_chunk := GREATEST(1, CEILING(__blocks / (_workers * __chunks_per_worker))); - __extra 
:= __blocks % __number_of_chunks; -- first ⟂extra⟂ chunks get +1 - + /* ------------------------------------------------------------------ + * Build ordered list of posts in this range + * ----------------------------------------------------------------*/ __batch_id := nextval('hivesense_app.batch_seq'); - __from_block := __blocks_range.first_block; - - IF __blocks_range.last_block <> __blocks_range.first_block THEN - RAISE NOTICE 'Splitting range % to %', __blocks_range.first_block, __blocks_range.last_block; - END IF; - - WHILE __from_block <= __blocks_range.last_block LOOP - -- RAISE NOTICE 'SCHEDULER: computing work for'; - __to_block := __from_block + __blocks_per_chunk - 1; - IF __extra > 0 THEN - __to_block := __to_block + 1; - __extra := __extra - 1; - END IF; - - IF __to_block > __blocks_range.last_block THEN - __to_block := __blocks_range.last_block; - END IF; - - -- only insert if this worker actually has work - IF __to_block >= __from_block THEN - -- RAISE NOTICE 'SCHEDULER: enqueuing work [%, %]', __from_block, __to_block; - INSERT INTO hivesense_app.block_tasks( - batch_id, - shard, - first_block, - last_block - ) - VALUES ( - __batch_id, - NULL, -- unclaimed - __from_block, - __to_block - ); - END IF; - - __from_block := __to_block + 1; - END LOOP; - -------------------------------------------------------------------------------- - -- bulk-upsert post_data for every post in this batch - -------------------------------------------------------------------------------- - WITH posts_to_seed AS ( - SELECT DISTINCT hp.id AS post_id - FROM hivemind_app.hive_posts hp - JOIN hivemind_app.hive_post_data hpd ON hpd.id = hp.id - WHERE (hp.root_id = hp.id OR hp.root_id = 0) - AND ( - hp.block_num_created BETWEEN __blocks_range.first_block AND __blocks_range.last_block - OR hp.block_num BETWEEN __blocks_range.first_block AND __blocks_range.last_block - ) + WITH posts AS ( + SELECT hp.id AS post_id, + GREATEST(hp.block_num_created, hp.block_num) AS blk + FROM 
hivemind_app.hive_posts hp + JOIN hivemind_app.hive_post_data hpd USING(id) + WHERE (hp.root_id = hp.id OR hp.root_id = 0) + AND ( + hp.block_num_created BETWEEN __blocks_range.first_block AND __blocks_range.last_block + OR hp.block_num BETWEEN __blocks_range.first_block AND __blocks_range.last_block + ) + ), + numbered AS ( + SELECT post_id, + blk, + ROW_NUMBER() OVER (ORDER BY blk) AS rn, + COUNT(*) OVER () AS total_posts + FROM posts + ), + chunked AS ( + SELECT post_id, + blk, + ((rn - 1) / CEILING(total_posts::NUMERIC / + (_workers * 50)))::INT AS chunk_idx + FROM numbered + ), + grouped AS ( + SELECT chunk_idx, + ARRAY_AGG(post_id ORDER BY post_id) AS pids, + MIN(blk) AS first_blk, + MAX(blk) AS last_blk + FROM chunked + GROUP BY chunk_idx + ) + INSERT INTO hivesense_app.block_tasks( + batch_id, shard, post_ids, first_block, last_block ) + SELECT + __batch_id, + NULL, + pids, + first_blk, + last_blk + FROM grouped; + + /* bulk-upsert post_data for this batch (unchanged but re-uses grouped) */ INSERT INTO hivesense_app.post_data(post_id, number_of_tokens, last_vectors_block) - SELECT post_id, 0, -1 - FROM posts_to_seed + SELECT UNNEST(post_ids), 0, -1 + FROM hivesense_app.block_tasks + WHERE batch_id = __batch_id ON CONFLICT (post_id) DO NOTHING; - COMMIT; -- we have to commit here so the workers can pick up the tasks + COMMIT; -- let workers see tasks -------------------------------------------------------------------------------- -- WAKE ALL WORKERS by dropping each start_key_i @@ -619,6 +591,14 @@ BEGIN -- clean up the tasks table, the tasks are all done, we don't need to keep that info around forever TRUNCATE hivesense_app.block_tasks; + -- mark all new sync_seq as visible only after batch completion + UPDATE hivesense_app.hivesense_app_status + SET max_visible_sync_seq = GREATEST( + COALESCE((SELECT MAX(sync_seq) FROM hivesense_app.posts_vectors),0), + COALESCE((SELECT MAX(sync_seq) FROM hivesense_app.deleted_embeddings),0) + ) + WHERE id = 1; + 
------------------------------------------------------------------- -- After the batch, create indexes when appropriate ------------------------------------------------------------------- @@ -637,7 +617,6 @@ BEGIN END $$; - CREATE OR REPLACE PROCEDURE hivesense_app.worker_loop( IN _worker INT, IN _app_context_name hive.context_name, @@ -694,11 +673,12 @@ BEGIN ------------------------------------------------------------------ -- RAISE NOTICE 'getting new task for worker %', _worker; SELECT task_id, + post_ids, first_block, last_block - INTO __task - FROM hivesense_app.block_tasks - WHERE status = 'pending' + INTO __task + FROM hivesense_app.block_tasks + WHERE status = 'pending' FOR UPDATE SKIP LOCKED LIMIT 1; @@ -726,26 +706,21 @@ BEGIN -- Execute the heavy work for this block sub-range ------------------------------------------------------------------ -- RAISE NOTICE 'worker % processing block range % to %', _worker, __task.first_block, __task.last_block; - SELECT hivesense_block_range_data( - __task.first_block, - __task.last_block, - TRUE, -- logs - _worker -- keep original param order - ) - INTO __done_posts; - IF __task.last_block = __task.first_block THEN - RAISE NOTICE 'worker % processed block % containing % posts', _worker, __task.first_block, __done_posts; + RAISE NOTICE 'worker % processing block % (% posts)', + _worker, + __task.first_block, + array_length(__task.post_ids,1); ELSE - RAISE NOTICE 'worker % processed block range % to % (% blocks) containing % posts', _worker, __task.first_block, __task.last_block, __task.last_block - __task.first_block + 1, __done_posts; + RAISE NOTICE 'worker % processing blocks % to % (% blocks, % posts)', + LPAD(_worker::text, 2), + __task.first_block, + __task.last_block, + __task.last_block - __task.first_block + 1, + array_length(__task.post_ids,1); END IF; - -- If Hivemind isn’t caught up yet → rollback & wait0 - IF __done_posts IS NULL THEN - ROLLBACK; - PERFORM pg_sleep(5); - CONTINUE; - END IF; + SELECT 
generate_embeddings_for_posts(__task.post_ids, TRUE, _worker) INTO __done_posts;
 
       ------------------------------------------------------------------
       -- Mark task finished
diff --git a/endpoints/embedding_updates.sql b/endpoints/embedding_updates.sql
new file mode 100644
index 0000000..4a10d99
--- /dev/null
+++ b/endpoints/embedding_updates.sql
@@ -0,0 +1,118 @@
+SET ROLE hivesense_owner;
+
+/** openapi:paths
+/embedding-updates:
+  get:
+    tags:
+      - AI
+    summary: Stream post-level embedding operations since a given sequence number
+    operationId: hivesense_endpoints.embedding_updates
+    parameters:
+      - in: query
+        name: after_seq
+        required: true
+        schema: { type: integer }
+        description: |
+          Clients pass the highest sync_seq they have applied.
+          The server returns every operation with sync_seq > after_seq.
+      - in: query
+        name: page_size
+        required: true
+        schema: { type: integer }
+        description: Maximum number of operations to return.
+    responses:
+      '200':
+        description: JSON array of operations, ordered by sync_seq.
+        content:
+          application/json:
+            schema: { type: string, x-sql-datatype: JSONB }
+*/
+-- openapi-generated-code-begin
+DROP FUNCTION IF EXISTS hivesense_endpoints.embedding_updates;
+CREATE OR REPLACE FUNCTION hivesense_endpoints.embedding_updates(
+    "after_seq" INT,
+    "page_size" INT
+)
+RETURNS JSONB
+-- openapi-generated-code-end
+LANGUAGE plpgsql STABLE
+AS
+$$
+DECLARE
+  __result JSONB;
+  __max_visible INT;
+BEGIN
+  -- only expose completed sync_seq up to this threshold
+  SELECT max_visible_sync_seq
+  INTO __max_visible
+  FROM hivesense_app.hivesense_app_status
+  WHERE id = 1;
+
+  WITH joined_ops AS (
+    SELECT
+      COALESCE(pv.sync_seq, de.sync_seq) AS sync_seq,
+      COALESCE(pv.post_id, de.post_id) AS post_id,
+      CASE
+        WHEN pv.sync_seq IS NOT NULL AND de.sync_seq IS NOT NULL THEN 'update'
+        WHEN pv.sync_seq IS NOT NULL THEN 'insert'
+        ELSE 'delete'
+      END AS op
+    FROM (
+      SELECT DISTINCT sync_seq, post_id
+      FROM hivesense_app.posts_vectors
+      WHERE sync_seq > after_seq
+        AND sync_seq <= __max_visible
+    ) pv
+    FULL OUTER JOIN (
+      SELECT sync_seq, post_id
+      FROM hivesense_app.deleted_embeddings
+      WHERE sync_seq > after_seq
+        AND sync_seq <= __max_visible
+    ) de USING (sync_seq, post_id)
+  ),
+  ops_with_embeddings AS (
+    SELECT
+      jo.sync_seq,
+      jo.op,
+      ha.name AS author,
+      hpd.permlink AS permlink,
+      pd.number_of_tokens,
+      pd.last_vectors_block,
+      CASE
+        WHEN jo.op IN ('insert','update') THEN (
+          -- array of embeddings, ordered by chunk_number
+          SELECT jsonb_agg(to_jsonb(pv2.embedding::real[]) ORDER BY pv2.chunk_number)
+          FROM hivesense_app.posts_vectors pv2
+          WHERE pv2.sync_seq = jo.sync_seq
+            AND pv2.post_id = jo.post_id
+        )
+        ELSE '[]'::JSONB
+      END AS embeddings
+    FROM joined_ops jo
+    JOIN hivemind_app.hive_posts hp ON hp.id = jo.post_id
+    JOIN hivemind_app.hive_accounts ha ON ha.id = hp.author_id
+    JOIN hivemind_app.hive_permlink_data hpd ON hpd.id = hp.permlink_id
+    JOIN hivesense_app.post_data pd ON pd.post_id = jo.post_id
+    ORDER BY jo.sync_seq, jo.post_id  -- post_id tie-break makes the page cut deterministic
+    LIMIT "page_size"
+  )
+
+  SELECT jsonb_agg(
+    jsonb_build_object(
+      'sync_seq', sync_seq,
+      'op', op,
+      'author', author,
+      'permlink', permlink,
+      'number_of_tokens', number_of_tokens,
+      'last_vectors_block', last_vectors_block,
+      'embeddings', embeddings
+    ) ORDER BY sync_seq  -- order inside the aggregate: the CTE's row order is not guaranteed to reach jsonb_agg
+  )
+  INTO __result
+  FROM ops_with_embeddings;
+
+  RETURN COALESCE(__result, '[]'::JSONB);
+END
+$$;
+
+RESET ROLE;
diff --git a/endpoints/endpoint_schema.sql b/endpoints/endpoint_schema.sql
index 3c017ad..2c0a4e8 100644
--- a/endpoints/endpoint_schema.sql
+++ b/endpoints/endpoint_schema.sql
@@ -299,6 +299,48 @@ DO $__$
             }
           }
         }
+      },
+      "/embedding-updates": {
+        "get": {
+          "tags": [
+            "AI"
+          ],
+          "summary": "Stream post-level embedding operations since a given sequence number",
+          "operationId": "hivesense_endpoints.embedding_updates",
+          "parameters": [
+            {
+              "in": "query",
+              "name": "after_seq",
+              "required": true,
+              "schema": {
+                "type": "integer"
+              },
+              "description": "Clients pass the highest sync_seq they have applied. \nThe server returns every operation with sync_seq > after_seq.\n"
+            },
+            {
+              "in": "query",
+              "name": "page_size",
+              "required": true,
+              "schema": {
+                "type": "integer"
+              },
+              "description": "Maximum number of operations to return."
+ } + ], + "responses": { + "200": { + "description": "JSON array of operations, ordered by sync_seq.", + "content": { + "application/json": { + "schema": { + "type": "string", + "x-sql-datatype": "JSONB" + } + } + } + } + } + } } } } diff --git a/rewrite_rules.conf b/rewrite_rules.conf index aafe6ce..05c621c 100644 --- a/rewrite_rules.conf +++ b/rewrite_rules.conf @@ -1,12 +1,15 @@ +rewrite ^/embedding-updates /rpc/embedding_updates break; +# endpoint for get /embedding-updates + +rewrite ^/thematiccontributors /rpc/find_thematic_contributors break; +# endpoint for get /thematiccontributors + rewrite ^/similarpostsbypost /rpc/get_similar_posts_by_post break; # endpoint for get /similarpostsbypost rewrite ^/similarposts /rpc/get_similar_posts break; # endpoint for get /similarposts -rewrite ^/thematiccontributors /rpc/find_thematic_contributors break; -# endpoint for get /similarposts - rewrite ^/$ / break; # endpoint for openapi spec itself diff --git a/scripts/install_app.sh b/scripts/install_app.sh index a8a07ae..1fc2815 100755 --- a/scripts/install_app.sh +++ b/scripts/install_app.sh @@ -212,6 +212,7 @@ psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET custom.swagger_url = '$SWAGG psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET SEARCH_PATH TO ${HIVESENSE_SCHEMA};" -f "$SRCPATH/endpoints/find_similar_posts.sql" psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET SEARCH_PATH TO ${HIVESENSE_SCHEMA};" -f "$SRCPATH/endpoints/get_similar_posts_by_post.sql" psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET SEARCH_PATH TO ${HIVESENSE_SCHEMA};" -f "$SRCPATH/endpoints/find_thematic_contributors.sql" +psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET SEARCH_PATH TO ${HIVESENSE_SCHEMA};" -f "$SRCPATH/endpoints/embedding_updates.sql" psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET ROLE hivesense_owner;GRANT USAGE ON SCHEMA ${HIVESENSE_SCHEMA} to hivesense_user;" psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET ROLE hivesense_owner;GRANT SELECT ON ALL TABLES 
IN SCHEMA ${HIVESENSE_SCHEMA} TO hivesense_user;"
 
diff --git a/scripts/openapi_rewrite.sh b/scripts/openapi_rewrite.sh
index c8aade4..21bfe28 100755
--- a/scripts/openapi_rewrite.sh
+++ b/scripts/openapi_rewrite.sh
@@ -18,7 +18,8 @@ ENDPOINTS_IN_ORDER="
 ../$endpoints/endpoint_schema.sql
 ../$endpoints/find_similar_posts.sql
 ../$endpoints/get_similar_posts_by_post.sql
-../$endpoints/find_thematic_contributors.sql"
+../$endpoints/find_thematic_contributors.sql
+../$endpoints/embedding_updates.sql"
 
 # Function to reverse the lines
 reverse_lines() {
diff --git a/scripts/sync_embeddings.py b/scripts/sync_embeddings.py
index 64eb34f..201e18a 100755
--- a/scripts/sync_embeddings.py
+++ b/scripts/sync_embeddings.py
@@ -3,11 +3,16 @@
 sync_embeddings.py – pull /embedding-updates and mirror them locally
 """
 
-import os, time, json, logging, requests, psycopg2
+import os
+import time
+import logging
+import requests
+import psycopg2
 from psycopg2.extras import execute_values
+from datetime import datetime, timezone
 
-API_URL = os.environ["HS_EMBED_API"] # e.g. https://upstream/embedding-updates
-DB_DSN = os.environ["LOCAL_PG_DSN"] # postgres://… (same schema names)
+API_URL = os.environ.get("HIVESENSE_EMBED_API") # e.g. https://upstream/embedding-updates
+DB_DSN = os.environ.get("POSTGRES_URI") # postgres://… (same schema names)
 BATCH = 1000
 RETRY_SLEEP = 3
 
@@ -25,7 +30,8 @@ def get_last_seq(cur) -> int:
             UNION ALL
             SELECT MAX(sync_seq) FROM hivesense_app.deleted_embeddings
         ) AS t;
-    """)
+    """
+    )
     return cur.fetchone()[0] or 0
 
 
@@ -47,17 +53,22 @@ def resolve_post_id(cur, author, permlink):
     return row[0] if row else None
 
 
-def upsert_vectors(cur, post_id, sync_seq, rows):
-    payload = [
-        (sync_seq, post_id, r["chunk_number"], r["embedding"])
-        for r in rows
+def upsert_vectors(cur, post_id, sync_seq, embeddings):
+    """
+    embeddings: list of numeric lists, in chunk order.
+    """
+    rows = [
+        (sync_seq, post_id, idx, emb)
+        for idx, emb in enumerate(embeddings)
     ]
     execute_values(
         cur,
-        """INSERT INTO hivesense_app.posts_vectors
-        (sync_seq, post_id, chunk_number, embedding)
-        VALUES %s""",
-        payload
+        """
+        INSERT INTO hivesense_app.posts_vectors
+        (sync_seq, post_id, chunk_number, embedding)
+        VALUES %s
+        """,
+        rows
     )
 
 
@@ -73,25 +84,38 @@ def apply_op(conn, op):
                 )
                 conn.rollback()
                 time.sleep(RETRY_SLEEP)
-                continue # try again
-            # ---- we have a post_id ----
+                continue
+
             if op["op"] == "delete":
                 cur.execute(
                     "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s",
                     (post_id,)
                 )
                 cur.execute(
-                    """INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq)
-                    VALUES (%s, %s)
-                    ON CONFLICT (sync_seq, post_id) DO NOTHING""",
+                    """
+                    INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq)
+                    VALUES (%s, %s)
+                    ON CONFLICT (sync_seq, post_id) DO NOTHING
+                    """,
                     (post_id, op["sync_seq"])
                 )
-            else:                       # insert | update
+            else: # insert | update
                 cur.execute(
                     "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s",
                     (post_id,)
                 )
-                upsert_vectors(cur, post_id, op["sync_seq"], op["embedding_rows"])
+                upsert_vectors(cur, post_id, op["sync_seq"], op["embeddings"])
+                cur.execute(
+                    """
+                    INSERT INTO hivesense_app.post_data
+                    (post_id, number_of_tokens, last_vectors_block)
+                    VALUES (%s, %s, %s)
+                    ON CONFLICT (post_id) DO UPDATE
+                    SET number_of_tokens = EXCLUDED.number_of_tokens,
+                    last_vectors_block = EXCLUDED.last_vectors_block
+                    """,
+                    (post_id, op["number_of_tokens"], op["last_vectors_block"])
+                )
             return # success – caller will COMMIT
 
 
@@ -99,21 +123,68 @@ def main():
     conn = psycopg2.connect(DB_DSN, autocommit=False)
 
     while True:
+        # determine how far we've synced
         with conn.cursor() as cur:
             after_seq = get_last_seq(cur)
-        params = {"after_seq": after_seq, "limit": BATCH}
-        r = requests.get(API_URL, params=params, timeout=60)
-        r.raise_for_status()
-        ops = r.json()
+
+        response = requests.get(
+            API_URL,
+            params={"after_seq": after_seq, "limit": BATCH},
+            timeout=60
+        )
+        response.raise_for_status()
+        ops = response.json()
         if not ops:
             time.sleep(5)
             continue
 
+        # Track highest block seen
+        block_nums = [op.get('last_vectors_block') for op in ops]
+        max_block = max(block_nums) if block_nums else None
+
+        # apply each operation
         for op in ops:
-            logging.info("Applying %s %s/%s (seq %s)",
-                         op["op"], op["author"], op["permlink"], op["sync_seq"])
+            logging.info(
+                "Applying %s %s/%s (seq %s, block %s)",
+                op["op"], op["author"], op["permlink"],
+                op["sync_seq"], op.get("last_vectors_block")
+            )
             apply_op(conn, op)
-            conn.commit() # durable after every logical op
+            conn.commit()
+
+        # Advance local sequence & visibility
+        max_seq = max(op["sync_seq"] for op in ops)
+        with conn.cursor() as cur:
+            cur.execute(
+                "SELECT setval('hivesense_app.sync_seq', %s, true)",
+                (max_seq,)
+            )
+            cur.execute(
+                """UPDATE hivesense_app.hivesense_app_status
+                   SET max_visible_sync_seq = %s
+                   WHERE id = 1""",
+                (max_seq,)
+            )
+        conn.commit()
+
+        # If we're caught up, create indexes
+        if max_block is not None:
+            with conn.cursor() as cur:
+                cur.execute(
+                    "SELECT created_at FROM hafd.blocks WHERE num = %s",
+                    (max_block,)
+                )
+                row = cur.fetchone()
+                if row:
+                    created_at = row[0].replace(tzinfo=timezone.utc)
+                    age = datetime.now(timezone.utc) - created_at
+                    if age.total_seconds() <= 60:
+                        logging.info(
+                            "Latest block %s is recent (%.1fs old); creating indexes",
+                            max_block, age.total_seconds()
+                        )
+                        cur.execute("CALL ensure_indexes_are_created()")
+            conn.commit()
 
 if __name__ == "__main__":
     main()
-- 
GitLab

From f181c6b5149cd26dbd374dc9c0a5429af248bc9a Mon Sep 17 00:00:00 2001
From: Eric Frias
Date: Wed, 25 Jun 2025 16:31:56 +0000
Subject: [PATCH 13/15] Change waiting in sync script to wait on hivemind by block number instead of by individual post permlink to avoid deadlock, rename rewiter to postgrest-rewriter

---
 readme.md | 2 +-
 scripts/build_images.sh | 4 +-
 scripts/ci-helpers/compose_variables.sh | 2 +-
scripts/sync_embeddings.py | 170 ++++++++++++------------ 4 files changed, 88 insertions(+), 90 deletions(-) diff --git a/readme.md b/readme.md index 3208d7d..a434741 100644 --- a/readme.md +++ b/readme.md @@ -298,7 +298,7 @@ add hivesense to `COMPOSE_PROFILES` variable in th `.env` file. To customize set ```bash registry.gitlab.syncad.com/hive/hivesense:<8 digit git sha> - registry.gitlab.syncad.com/hive/hivesense/rewiter:<8 digit git sha> + registry.gitlab.syncad.com/hive/hivesense/postgrest-rewriter:<8 digit git sha> ``` With using switch '--push' new images will also be pushed to registry diff --git a/scripts/build_images.sh b/scripts/build_images.sh index 65a1b1c..7981024 100755 --- a/scripts/build_images.sh +++ b/scripts/build_images.sh @@ -51,14 +51,14 @@ set -eu pipefail SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" docker build -t "registry.gitlab.syncad.com/hive/hivesense:${TAG}" "${SCRIPTPATH}/.." -docker build -t "registry.gitlab.syncad.com/hive/hivesense/postgrest-rewiter:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.rewriter" "${SCRIPTPATH}/.." +docker build -t "registry.gitlab.syncad.com/hive/hivesense/postgrest-rewriter:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.rewriter" "${SCRIPTPATH}/.." docker build -t "registry.gitlab.syncad.com/hive/hivesense/syncer:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.syncer" "${SCRIPTPATH}/.." 
echo "Build images tag ${TAG}" if [ -n "${PUSH:-}" ]; then docker push "registry.gitlab.syncad.com/hive/hivesense:${TAG}" - docker push "registry.gitlab.syncad.com/hive/hivesense/postgrest-rewiter:${TAG}" + docker push "registry.gitlab.syncad.com/hive/hivesense/postgrest-rewriter:${TAG}" docker push "registry.gitlab.syncad.com/hive/hivesense/syncer:${TAG}" echo "Pushed images tag ${TAG}" fi diff --git a/scripts/ci-helpers/compose_variables.sh b/scripts/ci-helpers/compose_variables.sh index e255961..9c91fef 100644 --- a/scripts/ci-helpers/compose_variables.sh +++ b/scripts/ci-helpers/compose_variables.sh @@ -39,7 +39,7 @@ HIVEMIND_SYNC_ARGS="--test-max-block=${NUMBER_OF_BLOCKS_TO_SYNC}" # Hivesense HIVESENSE_IMAGE="registry.gitlab.syncad.com/hive/hivesense" HIVESENSE_VERSION="$HIVESENSE_TAG" -HIVESENSE_REWRITER_IMAGE="registry.gitlab.syncad.com/hive/hivesense/rewiter" +HIVESENSE_REWRITER_IMAGE="registry.gitlab.syncad.com/hive/hivesense/postgrest-rewriter" HIVESENSE_SYNC_ARGS="--stop-at-block=${NUMBER_OF_BLOCKS_TO_SYNC}" diff --git a/scripts/sync_embeddings.py b/scripts/sync_embeddings.py index 201e18a..896ee49 100755 --- a/scripts/sync_embeddings.py +++ b/scripts/sync_embeddings.py @@ -1,8 +1,3 @@ -#! /usr/bin/env python3 -""" -sync_embeddings.py – pull /embedding-updates and mirror them locally -""" - import os import time import logging @@ -30,8 +25,7 @@ def get_last_seq(cur) -> int: UNION ALL SELECT MAX(sync_seq) FROM hivesense_app.deleted_embeddings ) AS t; - """ - ) + """) return cur.fetchone()[0] or 0 @@ -53,74 +47,55 @@ def resolve_post_id(cur, author, permlink): return row[0] if row else None -def upsert_vectors(cur, post_id, sync_seq, embeddings): +def apply_op(cur, op, post_id): """ - embeddings: list of numeric lists, in chunk order. + Apply a single operation; assumes post_id already resolved. + Uses a savepoint so individual ops can roll back without aborting the whole batch. 
""" - rows = [ - (sync_seq, post_id, idx, emb) - for idx, emb in enumerate(embeddings) - ] - execute_values( - cur, - """ - INSERT INTO hivesense_app.posts_vectors - (sync_seq, post_id, chunk_number, embedding) - VALUES %s - """, - rows - ) - - -def apply_op(conn, op): - """Apply a single operation; block-retry until the post_id is resolvable.""" - while True: - with conn.cursor() as cur: - post_id = resolve_post_id(cur, op["author"], op["permlink"]) - if post_id is None: - logging.info( - "Post %s/%s not yet present locally – retry in %ds", - op["author"], op["permlink"], RETRY_SLEEP - ) - conn.rollback() - time.sleep(RETRY_SLEEP) - continue - - if op["op"] == "delete": - cur.execute( - "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s", - (post_id,) - ) - cur.execute( - """ - INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq) - VALUES (%s, %s) - ON CONFLICT (sync_seq, post_id) DO NOTHING - """, - (post_id, op["sync_seq"]) - ) - else: # insert | update - cur.execute( - "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s", - (post_id,) - ) - upsert_vectors(cur, post_id, op["sync_seq"], op["embeddings"]) - cur.execute( - """ - INSERT INTO hivesense_app.post_data - (post_id, number_of_tokens, last_vectors_block) - VALUES (%s, %s, %s) - ON CONFLICT (post_id) DO UPDATE - SET number_of_tokens = EXCLUDED.number_of_tokens, - last_vectors_block = EXCLUDED.last_vectors_block - """, - (post_id, op["number_of_tokens"], op["last_vectors_block"]) - ) - return # success – caller will COMMIT + cur.execute("SAVEPOINT op_sp") + try: + if op["op"] == "delete": + cur.execute( + "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s", + (post_id,) + ) + cur.execute( + """ + INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq) + VALUES (%s, %s) + ON CONFLICT (sync_seq, post_id) DO NOTHING + """, + (post_id, op["sync_seq"]) + ) + else: + cur.execute( + "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s", + (post_id,) + ) + 
upsert_vectors(cur, post_id, op["sync_seq"], op["embeddings"]) + cur.execute( + """ + INSERT INTO hivesense_app.post_data + (post_id, number_of_tokens, last_vectors_block) + VALUES (%s, %s, %s) + ON CONFLICT (post_id) DO UPDATE + SET number_of_tokens = EXCLUDED.number_of_tokens, + last_vectors_block = EXCLUDED.last_vectors_block + """, + (post_id, op["number_of_tokens"], op["last_vectors_block"]) + ) + except Exception: + cur.execute("ROLLBACK TO SAVEPOINT op_sp") + raise + finally: + cur.execute("RELEASE SAVEPOINT op_sp") def main(): conn = psycopg2.connect(DB_DSN, autocommit=False) + # Fast-fail on deadlocks + with conn.cursor() as cur: + cur.execute("SET deadlock_timeout = '1s'") while True: # determine how far we've synced @@ -129,7 +104,7 @@ def main(): response = requests.get( API_URL, - params={"after_seq": after_seq, "limit": BATCH}, + params={"after_seq": after_seq, "page_size": BATCH}, timeout=60 ) response.raise_for_status() @@ -138,23 +113,45 @@ def main(): time.sleep(5) continue - # Track highest block seen - block_nums = [op.get('last_vectors_block') for op in ops] + # wait until hivemind has caught up to the highest block in this batch + block_nums = [op.get('last_vectors_block') for op in ops if op.get('last_vectors_block') is not None] max_block = max(block_nums) if block_nums else None + if max_block is not None: + while True: + with conn.cursor() as cur: + cur.execute("SELECT hive.app_get_current_block_num('hivemind_app')") + head = cur.fetchone()[0] + if head >= max_block: + break + logging.info( + "Waiting for hivemind head block %s (currently at %s)", + max_block, head + ) + time.sleep(1) - # apply each operation - for op in ops: - logging.info( - "Applying %s %s/%s (seq %s, block %s)", - op["op"], op["author"], op["permlink"], - op["sync_seq"], op.get("last_vectors_block") - ) - apply_op(conn, op) - conn.commit() - - # Advance local sequence & visibility - max_seq = max(op["sync_seq"] for op in ops) + # resolve all post_ids (should now exist) 
+ resolved = [] # list of tuples (op, post_id) + with conn.cursor() as cur: + for op in ops: + post_id = resolve_post_id(cur, op["author"], op["permlink"]) + if post_id is None: + raise RuntimeError( + f"Post {op['author']}/{op['permlink']} not found after block {max_block}" + ) + resolved.append((op, post_id)) + + # apply each operation in one batch transaction with conn.cursor() as cur: + for op, post_id in resolved: + logging.info( + "Applying %s %s/%s (seq %s, block %s)", + op["op"], op["author"], op["permlink"], + op["sync_seq"], op.get("last_vectors_block") + ) + apply_op(cur, op, post_id) + + # advance local sequence & visibility + max_seq = max(op["sync_seq"] for op, _ in resolved) cur.execute( "SELECT setval('hivesense_app.sync_seq', %s, true)", (max_seq,) @@ -167,7 +164,7 @@ def main(): ) conn.commit() - # If we're caught up, create indexes + # Optionally create indexes if caught up if max_block is not None: with conn.cursor() as cur: cur.execute( @@ -183,8 +180,9 @@ def main(): "Latest block %s is recent (%.1fs old); creating indexes", max_block, age.total_seconds() ) - cur.execute("CALL ensure_indexes_are_created()") + cur.execute("CALL hivesense_app.ensure_indexes_are_created()") conn.commit() + if __name__ == "__main__": main() -- GitLab From c04c12bd103125bfc6992dfd9479d18fc007c57a Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Wed, 25 Jun 2025 23:35:47 +0000 Subject: [PATCH 14/15] Explicit table lock to prevent deadlock with hivemind --- db/main_loop.sql | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/db/main_loop.sql b/db/main_loop.sql index a29bde8..92b3c6f 100644 --- a/db/main_loop.sql +++ b/db/main_loop.sql @@ -519,11 +519,17 @@ BEGIN last_blk FROM grouped; + LOCK TABLE hivemind_app.hive_posts IN SHARE MODE; + /* bulk-upsert post_data for this batch (unchanged but re-uses grouped) */ - INSERT INTO hivesense_app.post_data(post_id, number_of_tokens, last_vectors_block) - SELECT UNNEST(post_ids), 0, -1 - FROM 
hivesense_app.block_tasks - WHERE batch_id = __batch_id + INSERT INTO hivesense_app.post_data (post_id, number_of_tokens, last_vectors_block) + SELECT id, 0, -1 + FROM ( + SELECT DISTINCT UNNEST(post_ids) AS id + FROM hivesense_app.block_tasks + WHERE batch_id = __batch_id + ) AS src + ORDER BY id -- ☚ guarantees lock order ON CONFLICT (post_id) DO NOTHING; COMMIT; -- let workers see tasks -- GitLab From d4b79115eea583da8c651a10bfd37cd288c958b7 Mon Sep 17 00:00:00 2001 From: Eric Frias Date: Fri, 27 Jun 2025 20:35:07 +0000 Subject: [PATCH 15/15] Avoid long-running transactions. When hivesense is syncing at the same time as hivemind, it may need to sleep until hivemind has processed blocks. We were sleeping with a transaction open, and holding open a long-running transaction kept the autovacuum system from working, and that crippled hivemind and reptracker (at least). Also fix a similar problem with the workers, which were waiting inside a transaction. --- db/main_loop.sql | 104 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 96 insertions(+), 8 deletions(-) diff --git a/db/main_loop.sql b/db/main_loop.sql index 92b3c6f..31f9b0f 100644 --- a/db/main_loop.sql +++ b/db/main_loop.sql @@ -354,6 +354,7 @@ DECLARE __context_name hive.context_name := _app_context_base_name; -- single context __start_block INT := 0; __blocks_range hive.blocks_range := (0,0); + __planned_range hive.blocks_range; __batch_id BIGINT; __todo INT; __breaking_reason break_reason := NULL; @@ -428,7 +429,8 @@ BEGIN END IF; -- RAISE NOTICE 'Past start block...'; - -- request the next range from HAF + -- request the next range from HAF *inside a tx* … + -- … but roll it back immediately so we don’t keep xmin. 
CALL hive.app_next_iteration( __context_name, __blocks_range, @@ -436,6 +438,54 @@ BEGIN _limit => _max_block_limit ); + -- keep a copy across the ROLLBACK + __planned_range := __blocks_range; + + ROLLBACK; -- <<< frees RowExclusiveLock on context & xmin + -- pl/pgsql variables survive the ROLLBACK, so __planned_range is safe + + -- nothing to do yet + IF __blocks_range IS NULL THEN + RAISE NOTICE '__blocks_range IS NULL'; + CONTINUE; + END IF; + + ------------------------------------------------------------------ + -- Wait for hivemind in a *read-only* loop (no open tx) + ------------------------------------------------------------------ + LOOP + SELECT hive.app_get_current_block_num('hivemind_app') + INTO __hivemind_current_block; + + EXIT WHEN __hivemind_current_block >= __planned_range.last_block; + + RAISE NOTICE 'Waiting for hivemind to reach block % (currently at %) [not in transaction]', + __planned_range.last_block, + __hivemind_current_block; + ---------------------------------------------------------------- + -- finish the txn *immediately* so backend_xmin is released + ---------------------------------------------------------------- + COMMIT; + + PERFORM pg_sleep(1); + + __breaking_reason := isbreakingpending(__context_name, _max_block_limit, NULL); + IF __breaking_reason IS NOT NULL THEN + RETURN; + END IF; + END LOOP; + + ------------------------------------------------------------------ + -- Re-enter a write tx and (re)claim the same block range. + -- This updates hafd.contexts correctly *after* the long wait. 
+    ------------------------------------------------------------------
+    CALL hive.app_next_iteration(
+      __context_name,
+      __blocks_range,
+      _override_max_batch => NULL,
+      _limit => _max_block_limit
+    );
+
     -- RAISE NOTICE 'App_next_iteration returned';
 
     -- check global break conditions
@@ -461,7 +511,7 @@ BEGIN
       SELECT hive.app_get_current_block_num('hivemind_app')
       INTO __hivemind_current_block;
       EXIT WHEN __hivemind_current_block >= __blocks_range.last_block;
-      RAISE NOTICE 'Waiting for hivemind to reach block % (currently at %)',
+      RAISE NOTICE 'Waiting for hivemind to reach block % (currently at %) [in transaction]',
         __blocks_range.last_block,
         __hivemind_current_block;
       PERFORM pg_sleep(1);
@@ -623,6 +673,45 @@ BEGIN
 END
 $$;
+
+
+CREATE OR REPLACE PROCEDURE hivesense_app.wait_for_advisory_lock(_namespace INT, _worker INT, _timeout_ms INT DEFAULT 5000)
+LANGUAGE plpgsql
+AS $$
+BEGIN
+  LOOP
+    -- a) close out any open tx so xmin can advance
+    COMMIT;
+    -- b) bound our blocking lock to _timeout_ms
+    PERFORM set_config('lock_timeout', _timeout_ms::text, true);
+
+    BEGIN
+      -- c) normal blocking advisory lock
+      PERFORM pg_advisory_lock(_namespace, _worker);
+      -- d) on success, clear the timeout and stop looping
+      PERFORM set_config('lock_timeout', '0', true);
+      EXIT;
+    EXCEPTION
+      WHEN SQLSTATE '55P03' -- lock_not_available, raised when lock_timeout expires
+        OR SQLSTATE '57014' -- query_canceled: statement_timeout, but also explicit cancel requests
+      THEN
+        -- If the race gave us the lock just as the timeout fired,
+        -- detect it in pg_locks, clear timeout, and exit.
+        IF EXISTS (SELECT FROM pg_locks
+                   WHERE locktype = 'advisory'
+                     AND classid = _namespace
+                     AND objid = _worker AND objsubid = 2 -- (int, int) keys are listed with objsubid = 2
+                     AND pid = pg_backend_pid() AND granted -- only a lock this backend actually holds counts
+        ) THEN
+          PERFORM set_config('lock_timeout', '0', true);
+          EXIT;
+        END IF;
+        -- otherwise fall through to retry
+    END;
+  END LOOP;
+END;
+$$;
+
+
 CREATE OR REPLACE PROCEDURE hivesense_app.worker_loop(
     IN _worker INT,
     IN _app_context_name hive.context_name,
@@ -656,17 +745,16 @@ BEGIN
   -- PERFORM set_config('log_lock_waits', 'off', true);
   -- workers run forever until break conditions tell them to exit
   LOOP
-    --------------------------------------------------------------------------------
-    -- Grab done_key_i so scheduler can wait on it later. This succeeds
-    -- immediately the very first time (because we left done_key_i unlocked).
-    --------------------------------------------------------------------------------
+    --------------------------------------------------------------------
+    -- Take done_key_i (never blocks for long)
+    --------------------------------------------------------------------
     PERFORM pg_advisory_lock(__done_key_namespace, _worker);
 
     --------------------------------------------------------------------------------
     -- Now block until the scheduler says “go” by unlocking start_key_i.
     -- As soon as that happens, we acquire start_key_i.
     --------------------------------------------------------------------------------
-    PERFORM pg_advisory_lock(__start_key_namespace, _worker);
+    CALL hivesense_app.wait_for_advisory_lock(__start_key_namespace, _worker);
 
     --------------------------------------------------------------------------------
     -- Immediately release start_key_i so it’s available for the next batch.
@@ -746,7 +834,7 @@ BEGIN
     PERFORM pg_advisory_unlock(__done_key_namespace, _worker);
 
     -- Now block on ack_key_i until the scheduler “acks” our done_key_i.
- PERFORM pg_advisory_lock(__ack_key_namespace, _worker); + CALL hivesense_app.wait_for_advisory_lock(__ack_key_namespace, _worker); -- As soon as the scheduler does `UNLOCK(ack_key_i)`, this returns. -------------------------------------------------------------------------------- -- GitLab