diff --git a/.gitignore b/.gitignore index ae112248130fa235dae62b779070f81642cc578b..b82e46f22b462519cb9e05963728b494fc595d1b 100644 --- a/.gitignore +++ b/.gitignore @@ -151,3 +151,4 @@ pghero.yml # report file tavern_benchmarks_report.html +endpoints_openapi diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 1de1ff7a73fa5611eee31559307d87a247fd2e58..3883346be71467ad8a67aaeebcea083a04dcdb1c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -84,11 +84,10 @@ prepare_haf_image: extends: .prepare_haf_image variables: SUBMODULE_DIR: "$CI_PROJECT_DIR/submodules/haf" - REGISTRY_USER: "$CI_REGISTRY_USER" - REGISTRY_PASS: "$CI_REGISTRY_PASSWORD" + REGISTRY_USER: "$HAF_IMG_BUILDER_USER" + REGISTRY_PASS: "$HAF_IMG_BUILDER_PASSWORD" before_script: - git config --global --add safe.directory $CI_PROJECT_DIR/submodules/haf - - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY tags: - public-runner-docker - hived-for-tests @@ -204,4 +203,4 @@ sync: expire_in: 1 week when: always tags: - - data-cache-storage \ No newline at end of file + - data-cache-storage diff --git a/Dockerfile b/Dockerfile index 05fd52bca2d8ba7e95f38d33555fe0488a846b3a..9f6601093acc7bb214281d35e250b9d7f2ac1e2d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -53,4 +53,4 @@ COPY db /app/db COPY endpoints /app/endpoints COPY docker/scripts/docker-entrypoint.sh /app/docker-entrypoint.sh -ENTRYPOINT ["/app/docker-entrypoint.sh"] \ No newline at end of file +ENTRYPOINT ["/app/docker-entrypoint.sh"] diff --git a/Dockerfile.syncer b/Dockerfile.syncer new file mode 100644 index 0000000000000000000000000000000000000000..4ece20050385fa3a8cc8ebf106626c6b79a16433 --- /dev/null +++ b/Dockerfile.syncer @@ -0,0 +1,25 @@ +# Stage 1 – base image with only needed packages +FROM registry.gitlab.syncad.com/hive/hivesense/python:3.13-slim + +# Set environment variables to reduce image size and avoid Python warnings +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + POETRY_VIRTUALENVS_CREATE=false + +# Install system packages required for psycopg2 (libpq) +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Create working directory and copy script +WORKDIR /app +COPY scripts/sync_embeddings.py . 
+ +# Install only necessary Python packages +RUN pip install --no-cache-dir \ + psycopg2-binary \ + requests + +# Default command +CMD ["python", "sync_embeddings.py"] diff --git a/db/database_schema.sql b/db/database_schema.sql index bdc8a55b78998011a381b17865015d134248e51f..7a58ddbc86ea2b3983ed93157d05d34dd67ddc47 100644 --- a/db/database_schema.sql +++ b/db/database_schema.sql @@ -39,7 +39,9 @@ BEGIN embedding_dimensionality INT NOT NULL DEFAULT 768, -- size of embedding vectors generated by `llm` min_token_threshold INT NOT NULL DEFAULT 75, -- don't generate embeddings for posts shorter than this number of tokens min_token_search_threshold INT NOT NULL DEFAULT 0, -- ignore posts < this size when *searching* (0 = disabled) - max_embeddings_per_post INT -- max number of chunks per post, NULL for unlimited + max_embeddings_per_post INT, -- max number of chunks per post, NULL for unlimited + advisory_lock_namespace_begin INT, -- start of advisory lock namespace, if running multiple instances, use different values (separated by, say, 10 or so) + max_visible_sync_seq INT NOT NULL DEFAULT 0 -- highest sync sequence number to publish, anything higher may have gaps that will be filled later ); IF NOT hive.app_context_exists(__schema_name) THEN @@ -54,15 +56,28 @@ BEGIN runtime_hash TEXT ); + -- Monotonic sequence that orders every logical operation + CREATE SEQUENCE IF NOT EXISTS hivesense_app.sync_seq; + + -- This table will hold, for every post_id that we generate embeddings for, + -- the total number of tokens in that post. We insert one row per post_id. + CREATE TABLE IF NOT EXISTS hivesense_app.post_data + ( + post_id INT PRIMARY KEY REFERENCES hivemind_app.hive_posts(id), + number_of_tokens INT NOT NULL, + last_vectors_block INT NOT NULL DEFAULT -1 + ); + -- extend to public, to find vector from pgvector EXECUTE format( 'SET SEARCH_PATH TO %s, public', __schema_name ); IF __store_halfvec_embeddings THEN EXECUTE format($$ CREATE TABLE IF NOT EXISTS posts_vectors ( - post_id INT NOT NULL, + post_id INT NOT NULL REFERENCES hivesense_app.post_data(post_id), chunk_number INT NOT NULL, embedding public.halfvec(%s) NOT NULL, + sync_seq INT NOT NULL, -- drawn from hivesense_app.sync_seq PRIMARY KEY (post_id, chunk_number) ); $$, __vector_size); @@ -70,21 +85,16 @@ BEGIN EXECUTE format($$ CREATE TABLE IF NOT EXISTS posts_vectors ( - post_id INT NOT NULL, + post_id INT NOT NULL REFERENCES hivesense_app.post_data(post_id), chunk_number INT NOT NULL, embedding vector(%s) NOT NULL, + sync_seq INT NOT NULL, -- drawn from hivesense_app.sync_seq PRIMARY KEY (post_id, chunk_number) ); $$, __vector_size); END IF; - - -- This table will hold, for every post_id that we generate embeddings for, - -- the total number of tokens in that post. We insert one row per post_id. 
-    CREATE TABLE IF NOT EXISTS hivesense_app.post_data
-    (
-        post_id INT PRIMARY KEY,
-        number_of_tokens INT NOT NULL
-    );
+    -- Helpful index for “give me everything > after_seq”
+    CREATE INDEX IF NOT EXISTS posts_vectors_sync_seq_idx ON hivesense_app.posts_vectors(sync_seq);
 
     -- the current version of sqlfluff doesn't understand 'GRANT MAINTAIN'
     EXECUTE format( 'GRANT MAINTAIN ON ALL TABLES IN SCHEMA %s TO hived_group' , __schema_name );
@@ -113,7 +123,9 @@ INSERT INTO hivesense_app_status
     embedding_dimensionality,
     min_token_threshold,
     min_token_search_threshold,
-    max_embeddings_per_post
+    max_embeddings_per_post,
+    advisory_lock_namespace_begin,
+    max_visible_sync_seq
 )
 VALUES
 (
@@ -135,7 +147,9 @@ VALUES
     current_setting('PG_TEMP.VECTOR_SIZE', TRUE)::INT,
     current_setting('PG_TEMP.MIN_TOKEN_THRESHOLD', TRUE)::INT,
     current_setting('PG_TEMP.MIN_TOKEN_SEARCH_THRESHOLD', TRUE)::INT,
-    NULLIF(current_setting('PG_TEMP.MAX_EMBEDINGS_PER_POST', TRUE)::INT, 0)
+    NULLIF(current_setting('PG_TEMP.MAX_EMBEDINGS_PER_POST', TRUE)::INT, 0),
+    10000,
+    0
 )
 ON CONFLICT (id) DO
 UPDATE SET
@@ -152,10 +166,18 @@ CREATE TABLE IF NOT EXISTS hivesense_app.block_tasks (
     shard INT, -- worker number (1…N), NULL if unclaimed
     first_block INT NOT NULL,
     last_block INT NOT NULL,
+    post_ids INT[] NOT NULL DEFAULT '{}',
     status TEXT NOT NULL DEFAULT 'pending', -- pending | running | done
     claimed_at TIMESTAMPTZ,
     finished_at TIMESTAMPTZ
 );
 
 CREATE INDEX IF NOT EXISTS block_tasks_pending_idx ON hivesense_app.block_tasks (status, shard);
 
+-- Table that records logical deletions
+CREATE TABLE IF NOT EXISTS hivesense_app.deleted_embeddings (
+    post_id INT NOT NULL REFERENCES hivesense_app.post_data (post_id),
+    sync_seq INT NOT NULL UNIQUE,
+    PRIMARY KEY (post_id, sync_seq)
+);
+
 RESET ROLE;
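The sync_seq / deleted_embeddings / max_visible_sync_seq trio above defines the replication contract: consumers only read sequence numbers up to max_visible_sync_seq, and classify each one as an insert, update, or delete depending on which tables it appears in. A minimal consumer sketch against these tables, assuming only read access and a POSTGRES_URI env var (both hypothetical here); it mirrors the classification that the /embedding-updates endpoint added later in this diff formalizes:

```python
import os
import psycopg2

conn = psycopg2.connect(os.environ["POSTGRES_URI"])

with conn.cursor() as cur:
    # Sequence numbers above max_visible_sync_seq may still have gaps that
    # in-flight workers will fill, so never read past it.
    cur.execute(
        "SELECT max_visible_sync_seq FROM hivesense_app.hivesense_app_status WHERE id = 1"
    )
    (max_visible,) = cur.fetchone()

    after_seq = 0  # a real client persists the highest sync_seq it has applied
    cur.execute(
        """
        SELECT COALESCE(pv.sync_seq, de.sync_seq) AS sync_seq,
               COALESCE(pv.post_id, de.post_id)   AS post_id,
               CASE WHEN pv.sync_seq IS NOT NULL AND de.sync_seq IS NOT NULL THEN 'update'
                    WHEN pv.sync_seq IS NOT NULL THEN 'insert'
                    ELSE 'delete'
               END AS op
          FROM (SELECT DISTINCT sync_seq, post_id
                  FROM hivesense_app.posts_vectors
                 WHERE sync_seq > %(after)s AND sync_seq <= %(vis)s) pv
          FULL OUTER JOIN (SELECT sync_seq, post_id
                             FROM hivesense_app.deleted_embeddings
                            WHERE sync_seq > %(after)s AND sync_seq <= %(vis)s) de
               USING (sync_seq, post_id)
         ORDER BY 1
        """,
        {"after": after_seq, "vis": max_visible},
    )
    for sync_seq, post_id, op in cur.fetchall():
        print(sync_seq, post_id, op)
```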
diff --git a/db/helpers.sql b/db/helpers.sql
index 6411cf9b08ac2c15786f2ebad2c8c5ba3991565c..3783415112580a62fd68a73fdb054fb4e4b996f8 100644
--- a/db/helpers.sql
+++ b/db/helpers.sql
@@ -96,29 +96,69 @@ $$;
 
 CREATE OR REPLACE PROCEDURE ENSURE_INDEXES_ARE_CREATED()
 LANGUAGE plpgsql
-AS
-$$
+AS $$
 DECLARE
     dim   int     := hivesense_app.embedding_dims();
     half  boolean := hivesense_app.use_halfvec_index();
     store boolean := hivesense_app.store_halfvec_embeddings();
+    idx_exists boolean;
 BEGIN
     IF store THEN
-        RAISE NOTICE 'Creating half-precision HNSW index (%s-d)…', dim;
-        CREATE INDEX IF NOT EXISTS posts_vectors_embedding_half_hnsw
-        ON hivesense_app.posts_vectors
-        USING hnsw (embedding public.halfvec_cosine_ops);
+        -- half-precision on-disk embedding index
+        SELECT EXISTS(
+            SELECT 1
+            FROM pg_class c
+            JOIN pg_namespace n ON n.oid = c.relnamespace
+            WHERE n.nspname = 'hivesense_app'
+              AND c.relname = 'posts_vectors_embedding_half_hnsw'
+        ) INTO idx_exists;
+
+        IF NOT idx_exists THEN
+            RAISE NOTICE 'Creating half-precision HNSW index (% dims)…', dim;
+            CREATE INDEX posts_vectors_embedding_half_hnsw
+            ON hivesense_app.posts_vectors
+            USING hnsw (embedding public.halfvec_cosine_ops)
+            WITH (m = 32, ef_construction = 400);
+        END IF;
+
     ELSIF half THEN
-        RAISE NOTICE 'Creating half-precision HNSW index (%s-d)…', dim;
-        EXECUTE format(
-            'CREATE INDEX IF NOT EXISTS posts_vectors_embedding_half_hnsw
-             ON hivesense_app.posts_vectors
-             USING hnsw ((embedding::public.halfvec(%1$s)) public.halfvec_cosine_ops);', dim);
+        -- half-precision casted at index time
+        SELECT EXISTS(
+            SELECT 1
+            FROM pg_class c
+            JOIN pg_namespace n ON n.oid = c.relnamespace
+            WHERE n.nspname = 'hivesense_app'
+              AND c.relname = 'posts_vectors_embedding_half_hnsw'
+        ) INTO idx_exists;
+
+        IF NOT idx_exists THEN
+            RAISE NOTICE 'Creating half-precision HNSW index (% dims)…', dim;
+            EXECUTE format(
+                'CREATE INDEX posts_vectors_embedding_half_hnsw
+                 ON hivesense_app.posts_vectors
+                 USING hnsw ((embedding::public.halfvec(%1$s)) public.halfvec_cosine_ops)
+                 WITH (m = 32, ef_construction = 400)',
+                dim
+            );
+        END IF;
+
     ELSE
-        RAISE NOTICE 'Creating full-precision HNSW index (%s-d)…', dim;
-        CREATE INDEX IF NOT EXISTS posts_vectors_embedding_hnsw
-        ON hivesense_app.posts_vectors
-        USING hnsw (embedding public.vector_cosine_ops);
+        -- full-precision embedding index
+        SELECT EXISTS(
+            SELECT 1
+            FROM pg_class c
+            JOIN pg_namespace n ON n.oid = c.relnamespace
+            WHERE n.nspname = 'hivesense_app'
+              AND c.relname = 'posts_vectors_embedding_hnsw'
+        ) INTO idx_exists;
+
+        IF NOT idx_exists THEN
+            RAISE NOTICE 'Creating full-precision HNSW index (% dims)…', dim;
+            CREATE INDEX posts_vectors_embedding_hnsw
+            ON hivesense_app.posts_vectors
+            USING hnsw (embedding public.vector_cosine_ops)
+            WITH (m = 32, ef_construction = 400);
+        END IF;
     END IF;
 END;
 $$;
diff --git a/db/main_loop.sql b/db/main_loop.sql
index b17b0b445e5bf3fcad4397c8ead92760298814b0..31f9b0f2ee44b640d1514e6523db565c07c5de7a 100644
--- a/db/main_loop.sql
+++ b/db/main_loop.sql
@@ -35,47 +35,38 @@ BEGIN
 END;
 $BODY$;
 
-CREATE OR REPLACE FUNCTION hivesense_block_range_data(
-    _first_block_num INT,
-    _last_block_num INT,
-    _logs BOOLEAN,
-    _worker INT
+CREATE OR REPLACE FUNCTION generate_embeddings_for_posts(
+    _post_ids INT[],
+    _logs BOOLEAN,
+    _worker INT
 )
-RETURNS INT -- NULL need to wait for hivemind, otherwise number of vectorized posts
+RETURNS INT
 LANGUAGE plpgsql
 VOLATILE
 PARALLEL SAFE
 AS
 $$
 DECLARE
-    __hivemind_current_block INT;
-    __start_ts timestamptz;
-    __end_ts timestamptz;
-    __number_of_posts INT;
-    __number_of_chunks INT;
-
-    __number_of_workers INT;
-    __tokenizer_name TEXT;
-    __max_tokens INT;
-    __min_new_ratio REAL;
-    __lang_model TEXT;
-    __doc_prefix TEXT;
-    __min_token_threshold INT;
+    __number_of_posts INT := 0;
+    __number_of_chunks INT := 0;
+    __c INT;
+    __tokenizer_name TEXT;
+    __max_tokens INT;
+    __min_new_ratio REAL;
+    __lang_model TEXT;
+    __doc_prefix TEXT;
+    __min_token_threshold INT;
     __max_embeddings_per_post INT;
+    rec RECORD;
+    __sync_seq INT;
 BEGIN
-    ASSERT _first_block_num <= _last_block_num, 'Invalid range of blocks';
-
-    -- will RAISE when hivemind context does not exist
-    SELECT hive.app_get_current_block_num('hivemind_app') INTO __hivemind_current_block;
-
-    SELECT parallel_workers,
-           tokenizer_model,
+    SELECT tokenizer_model,
            tokens_per_chunk,
            1 - overlap_amount,
            sentence_language_model,
            document_prefix,
            min_token_threshold,
            max_embeddings_per_post
-    INTO __number_of_workers,
-         __tokenizer_name,
+    INTO __tokenizer_name,
          __max_tokens,
          __min_new_ratio,
          __lang_model,
@@ -85,55 +76,26 @@ BEGIN
     FROM hivesense_app.hivesense_app_status
     WHERE id = 1;
 
-    ASSERT __number_of_workers IS NOT NULL, 'NULL number of workers';
-    ASSERT __number_of_workers > 0 , 'number of workers less than 1';
-
-    -- hivemind exists
-    IF __hivemind_current_block < _first_block_num THEN
-        RETURN NULL;
-    END IF;
-
-    IF __hivemind_current_block < _last_block_num THEN
-        RETURN NULL;
-    END IF;
-
-    -- TODO(mickiewicz@syncad.com) when hivemind is not in a live stage then do not process
-    -- maybe it is not required because last_completed in enough ?
- -- but last completed does not guaranteen index on block_num_created, but maybe this is an edge case - --IF hive.get_current_stage_name( 'hivemind_app' ) != 'live' THEN - -- RETURN NULL; - --END IF; - - IF _logs THEN - RAISE NOTICE 'Hivesense % is processing block range: <%, %>', _worker, _first_block_num, _last_block_num; - __start_ts := clock_timestamp(); - END IF; - - -- The preprocessing step is pretty expensive. Way cheaper than generating embeddings, of course, but - -- still more expensive than postgres thinks. If we're not really careful about our CTEs below, - -- postgresql will happily preprocess posts multiple times. Instead, we don't let it, explicitly - -- preprocessing into a temp table here - CREATE TEMP TABLE tmp_posts_to_vectorize ( - post_id INT PRIMARY KEY, - bodies TEXT[], - token_count INT - ) ON COMMIT DROP; - - -- Fill tmp_posts_to_vectorize exactly once for each post - INSERT INTO tmp_posts_to_vectorize(post_id, bodies, token_count) + /* ===================================================================== + * 0️⃣ PRE-PROCESS EVERY POST ONCE + * --------------------------------- + * We call preprocess_post with _min_token_threshold = 0 so we + * *always* get (token_count, chunks[]). Later we decide whether + * the post is “big enough” (token_count ≥ __min_token_threshold). + * ====================================================================*/ + CREATE TEMP TABLE tmp_pre ON COMMIT DROP AS SELECT - hp.id, - pp.chunks, - pp.token_count - FROM hivemind_app.hive_posts AS hp - JOIN hivemind_app.hive_post_data AS hpd - ON hpd.id = hp.id - - -- Single CROSS JOIN LATERAL that calls preprocess_post exactly once - CROSS JOIN LATERAL ( - SELECT * - FROM preprocess_post( - hpd.title || '.\n\n' || hpd.body, + hp.id AS post_id, + hp.block_num, + COALESCE(pp.token_count, 0) AS token_count, + COALESCE(pp.chunks, ARRAY[]::TEXT[]) AS chunks + FROM unnest(_post_ids) AS sel(id) + JOIN hivemind_app.hive_posts hp ON hp.id = sel.id + LEFT JOIN LATERAL preprocess_post( + /* body ---------------------------------------------------- */ + (SELECT hpd.title || '.\n\n' || hpd.body + FROM hivemind_app.hive_post_data hpd + WHERE hpd.id = hp.id), hp.id, '[permlink disabled]', __tokenizer_name, @@ -141,71 +103,139 @@ BEGIN __min_new_ratio, __lang_model, __max_embeddings_per_post, - TRUE, -- _truncate_long_sentences + TRUE, __doc_prefix, - __min_token_threshold - ) - ) AS pp(chunks, token_count) - - WHERE (hp.root_id = hp.id OR hp.root_id = 0) - AND hp.block_num_created BETWEEN _first_block_num AND _last_block_num - AND pp.chunks IS NOT NULL; - - - -- record the token counts for posterity - INSERT INTO hivesense_app.post_data(post_id, number_of_tokens) - SELECT post_id, token_count - FROM tmp_posts_to_vectorize - ON CONFLICT (post_id) DO NOTHING; - - -- Explode bodies[] into chunks - WITH post_chunks AS ( - SELECT + 0 -- ← return *even if very short* + ) AS pp + ON TRUE; + + /* ===================================================================== + * 1️⃣ CHUNKS FOR POSTS THAT ARE STILL “BIG ENOUGH” + * ====================================================================*/ + CREATE TEMP TABLE tmp_chunks ON COMMIT DROP AS + SELECT post_id, - bodies[idx] AS chunk_text, - (idx - 1) AS chunk_number - FROM tmp_posts_to_vectorize - CROSS JOIN LATERAL generate_subscripts(bodies, 1) AS idx - ), id_and_body_agg AS ( - SELECT - ARRAY_AGG( (pc.post_id, pc.chunk_text, pc.chunk_number) - ::hivesense_app.id_and_post_chunk ) AS id_and_body - FROM post_chunks pc - ), embeddings AS ( - SELECT - (pv).post_id, - 
(pv).chunk_number, - (pv).vec - FROM ( - SELECT UNNEST(hivesense_app.hivesense_embed( ibagg.id_and_body) - ) AS pv - FROM id_and_body_agg ibagg - ) AS subquery - ), insert_into_posts_vectors AS ( - INSERT INTO hivesense_app.posts_vectors(post_id, chunk_number, embedding) - SELECT e.post_id, - e.chunk_number, - CASE - WHEN hivesense_app.store_halfvec_embeddings() - THEN e.vec::public.halfvec - ELSE e.vec - END - FROM embeddings e + generate_subscripts(chunks, 1) - 1 AS chunk_number, + chunks[generate_subscripts(chunks, 1)] AS chunk_text, + token_count, + block_num + FROM tmp_pre + WHERE token_count >= __min_token_threshold + AND array_length(chunks, 1) IS NOT NULL; + + /* ===================================================================== + * 2️⃣ EMBED THOSE CHUNKS + * ====================================================================*/ + CREATE TEMP TABLE tmp_vectors ON COMMIT DROP AS + WITH all_chunks AS ( + SELECT ARRAY_AGG( + (post_id, chunk_text, chunk_number) + ::hivesense_app.id_and_post_chunk + ORDER BY post_id, chunk_number + ) AS arr + FROM tmp_chunks ) SELECT - (SELECT CARDINALITY(id_and_body) FROM id_and_body_agg), - (SELECT COUNT(*) FROM embeddings) - INTO __number_of_posts, __number_of_chunks; + pv.post_id, + pv.chunk_number, + pv.vec + FROM all_chunks + CROSS JOIN LATERAL UNNEST(hivesense_app.hivesense_embed(all_chunks.arr)) AS pv; + + /* ===================================================================== + * 3️⃣ POSTS THAT NOW PRODUCE ZERO CHUNKS + * (true deletions / under-threshold edits) + * ====================================================================*/ + CREATE TEMP TABLE tmp_empty_posts ON COMMIT DROP AS + SELECT post_id, + block_num, + token_count + FROM tmp_pre + WHERE token_count < __min_token_threshold + OR array_length(chunks,1) IS NULL; + + /* nothing to do if no deletions */ + IF EXISTS (SELECT 1 FROM tmp_empty_posts) THEN + + /* ➤ fresh sync_seq for every empty post */ + CREATE TEMP TABLE tmp_empty_seq ON COMMIT DROP AS + SELECT post_id, + nextval('hivesense_app.sync_seq') AS sync_seq + FROM tmp_empty_posts; + + /* ➤ delete lingering vectors */ + DELETE FROM hivesense_app.posts_vectors pv + USING tmp_empty_posts ep + WHERE pv.post_id = ep.post_id; + + /* ➤ record the logical deletion */ + INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq) + SELECT post_id, sync_seq + FROM tmp_empty_seq; + + /* ➤ bring post_data up to date */ + INSERT INTO hivesense_app.post_data(post_id, number_of_tokens, last_vectors_block) + SELECT post_id, token_count, block_num + FROM tmp_empty_posts + ON CONFLICT (post_id) DO UPDATE + SET number_of_tokens = EXCLUDED.number_of_tokens, + last_vectors_block = EXCLUDED.last_vectors_block; + END IF; - -- clean up - DROP TABLE tmp_posts_to_vectorize; + /* ===================================================================== + * (the original code starting at + * “-- 3) per-post sync_seq, delete & insert” continues unmodified) + * ====================================================================*/ - --RAISE NOTICE 'End of hivesense_block_range_data, % posts, % chunks', __number_of_posts, __number_of_chunks; - RETURN coalesce(__number_of_posts, 0); -END; -$$; + -- 3) per-post sync_seq, delete & insert + FOR rec IN + SELECT DISTINCT post_id, block_num, + MAX(token_count) AS tcnt + FROM tmp_chunks + GROUP BY post_id, block_num + LOOP + SELECT nextval('hivesense_app.sync_seq') INTO __sync_seq; + + IF EXISTS ( + SELECT 1 FROM hivesense_app.posts_vectors + WHERE post_id = rec.post_id + ) THEN + DELETE FROM 
hivesense_app.posts_vectors + WHERE post_id = rec.post_id; + INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq) + VALUES (rec.post_id, __sync_seq); + END IF; + + INSERT INTO hivesense_app.posts_vectors( + post_id, chunk_number, embedding, sync_seq + ) + SELECT + post_id, + chunk_number, + CASE WHEN hivesense_app.store_halfvec_embeddings() + THEN vec::public.halfvec + ELSE vec + END, + __sync_seq + FROM tmp_vectors + WHERE post_id = rec.post_id + ORDER BY chunk_number; + + GET DIAGNOSTICS __c = ROW_COUNT; + __number_of_chunks := __number_of_chunks + __c; + + UPDATE hivesense_app.post_data + SET number_of_tokens = rec.tcnt, + last_vectors_block = rec.block_num + WHERE post_id = rec.post_id; + + __number_of_posts := __number_of_posts + 1; + END LOOP; + RETURN __number_of_posts; +END; +$$; CREATE OR REPLACE PROCEDURE hivesense_massive_processing( IN _from INT, IN _to INT, IN _logs BOOLEAN, IN _worker INT, OUT _done INT @@ -320,23 +350,27 @@ CREATE OR REPLACE PROCEDURE hivesense_app.scheduler( LANGUAGE plpgsql AS $$ DECLARE - __context_name hive.context_name := _app_context_base_name; -- single context - __start_block INT := 0; - __blocks_range hive.blocks_range := (0,0); - __batch_id BIGINT; - __todo INT; - __breaking_reason break_reason := NULL; - __blocks INT; - __blocks_per_chunk INT; - __number_of_chunks INT; - __chunks_per_worker INT; - __extra INT; - __from_block INT; - __to_block INT; - _start_key BIGINT; - _done_key BIGINT; - _ack_key BIGINT; - __shard INT; + __hivemind_current_block INT; + __context_name hive.context_name := _app_context_base_name; -- single context + __start_block INT := 0; + __blocks_range hive.blocks_range := (0,0); + __planned_range hive.blocks_range; + __batch_id BIGINT; + __todo INT; + __breaking_reason break_reason := NULL; + __blocks INT; + __blocks_per_chunk INT; + __number_of_chunks INT; + __chunks_per_worker INT; + __extra INT; + __from_block INT; + __to_block INT; + __advisory_lock_namespace_begin INT; + __start_key_namespace INT; + __done_key_namespace INT; + __ack_key_namespace INT; + __shard INT; + __posts_per_chunk INT; BEGIN -- by default, postgresql logs when threads are blocked on a lock for more than a second. -- we use locks for synchronization, and expect threads to be blocked for at least 3s @@ -346,27 +380,27 @@ BEGIN -- -- PERFORM set_config('deadlock_timeout', '5s', true); -- PERFORM set_config('log_lock_waits', 'off', true); + + -- read configured start_block + SELECT start_block, advisory_lock_namespace_begin INTO __start_block, __advisory_lock_namespace_begin + FROM hivesense_app.hivesense_app_status; + + __start_key_namespace := __advisory_lock_namespace_begin; + __done_key_namespace := __advisory_lock_namespace_begin + 1; + __ack_key_namespace := __advisory_lock_namespace_begin + 2; + -------------------------------------------------------------------------------- -- **At initialization: acquire every start_key_i** so that workers block. - -- - -- We'll define: - -- start_key_i := 10_000_000 + i - -- done_key_i := 20_000_000 + i - -- ack_key_i := 30_000_000 + i - -- -- Here, we grab start_key_i and ack_key_i. done_key_i is left unlocked, -- so that as soon as a worker tries to lock it at loop-top, it succeeds. 
-------------------------------------------------------------------------------- FOR __shard IN 1.._workers LOOP - PERFORM pg_advisory_lock(10_000_000 + __shard); -- hold start_key_i - PERFORM pg_advisory_lock(30_000_000 + __shard); -- hold ack_key_i + PERFORM pg_advisory_lock(__start_key_namespace, __shard); -- hold start_key_i + PERFORM pg_advisory_lock(__ack_key_namespace, __shard); -- hold ack_key_i END LOOP; RAISE NOTICE 'Scheduler: start_keys locked for all % workers; entering main loop...', _workers; - -- read configured start_block - SELECT start_block INTO __start_block - FROM hivesense_app.hivesense_app_status; IF _max_block_limit IS NOT NULL THEN RAISE NOTICE 'Max block limit is specified as: %', _max_block_limit; @@ -394,8 +428,9 @@ BEGIN CONTINUE; END IF; - RAISE NOTICE 'Past start block...'; - -- request the next range from HAF + -- RAISE NOTICE 'Past start block...'; + -- request the next range from HAF *inside a tx* … + -- … but roll it back immediately so we don’t keep xmin. CALL hive.app_next_iteration( __context_name, __blocks_range, @@ -403,13 +438,58 @@ BEGIN _limit => _max_block_limit ); - RAISE NOTICE 'App_next_iteration returned'; + -- keep a copy across the ROLLBACK + __planned_range := __blocks_range; + + ROLLBACK; -- <<< frees RowExclusiveLock on context & xmin + -- pl/pgsql variables survive the ROLLBACK, so __planned_range is safe + + -- nothing to do yet + IF __blocks_range IS NULL THEN + RAISE NOTICE '__blocks_range IS NULL'; + CONTINUE; + END IF; + + ------------------------------------------------------------------ + -- Wait for hivemind in a *read-only* loop (no open tx) + ------------------------------------------------------------------ + LOOP + SELECT hive.app_get_current_block_num('hivemind_app') + INTO __hivemind_current_block; + + EXIT WHEN __hivemind_current_block >= __planned_range.last_block; + + RAISE NOTICE 'Waiting for hivemind to reach block % (currently at %) [not in transaction]', + __planned_range.last_block, + __hivemind_current_block; + ---------------------------------------------------------------- + -- finish the txn *immediately* so backend_xmin is released + ---------------------------------------------------------------- + COMMIT; + + PERFORM pg_sleep(1); + + __breaking_reason := isbreakingpending(__context_name, _max_block_limit, NULL); + IF __breaking_reason IS NOT NULL THEN + RETURN; + END IF; + END LOOP; + + ------------------------------------------------------------------ + -- Re-enter a write tx and (re)claim the same block range. + -- This updates hafd.contexts correctly *after* the long wait. 
+ ------------------------------------------------------------------ + CALL hive.app_next_iteration( + __context_name, + __blocks_range, + _override_max_batch => NULL, + _limit => _max_block_limit + ); + + -- RAISE NOTICE 'App_next_iteration returned'; -- check global break conditions - __breaking_reason := isbreakingpending( - __context_name, - _max_block_limit, - __blocks_range); + __breaking_reason := isbreakingpending(__context_name, _max_block_limit, __blocks_range); IF __breaking_reason IS NOT NULL THEN ROLLBACK; IF __breaking_reason = 'BLOCK_LIMIT_REACHED' @@ -426,51 +506,83 @@ BEGIN CONTINUE; END IF; - ------------------------------------------------------------------- - -- Split the range into contiguous slices and enqueue one per shard - ------------------------------------------------------------------- - __blocks := __blocks_range.last_block - __blocks_range.first_block + 1; - __chunks_per_worker := 50; - __number_of_chunks := _workers * __chunks_per_worker; - __blocks_per_chunk := GREATEST(1, CEILING(__blocks / (_workers * __chunks_per_worker))); - __extra := __blocks % __number_of_chunks; -- first ⟂extra⟂ chunks get +1 - - __batch_id := nextval('hivesense_app.batch_seq'); - __from_block := __blocks_range.first_block; - - RAISE NOTICE 'Splitting range % to %', __blocks_range.first_block, __blocks_range.last_block; - WHILE __from_block <= __blocks_range.last_block LOOP - -- RAISE NOTICE 'SCHEDULER: computing work for'; - __to_block := __from_block + __blocks_per_chunk - 1; - IF __extra > 0 THEN - __to_block := __to_block + 1; - __extra := __extra - 1; - END IF; - - IF __to_block > __blocks_range.last_block THEN - __to_block := __blocks_range.last_block; + -- wait until hivemind has processed through this range + LOOP + SELECT hive.app_get_current_block_num('hivemind_app') + INTO __hivemind_current_block; + EXIT WHEN __hivemind_current_block >= __blocks_range.last_block; + RAISE NOTICE 'Waiting for hivemind to reach block % (currently at %) [in transaction]', + __blocks_range.last_block, + __hivemind_current_block; + PERFORM pg_sleep(1); + __breaking_reason := isbreakingpending(__context_name, _max_block_limit, NULL); + IF __breaking_reason IS NOT NULL THEN + RETURN; END IF; + END LOOP; - -- only insert if this worker actually has work - IF __to_block >= __from_block THEN - -- RAISE NOTICE 'SCHEDULER: enqueuing work [%, %]', __from_block, __to_block; - INSERT INTO hivesense_app.block_tasks( - batch_id, - shard, - first_block, - last_block - ) - VALUES ( - __batch_id, - NULL, -- unclaimed - __from_block, - __to_block - ); - END IF; + /* ------------------------------------------------------------------ + * Build ordered list of posts in this range + * ----------------------------------------------------------------*/ + __batch_id := nextval('hivesense_app.batch_seq'); + WITH posts AS ( + SELECT hp.id AS post_id, + GREATEST(hp.block_num_created, hp.block_num) AS blk + FROM hivemind_app.hive_posts hp + JOIN hivemind_app.hive_post_data hpd USING(id) + WHERE (hp.root_id = hp.id OR hp.root_id = 0) + AND ( + hp.block_num_created BETWEEN __blocks_range.first_block AND __blocks_range.last_block + OR hp.block_num BETWEEN __blocks_range.first_block AND __blocks_range.last_block + ) + ), + numbered AS ( + SELECT post_id, + blk, + ROW_NUMBER() OVER (ORDER BY blk) AS rn, + COUNT(*) OVER () AS total_posts + FROM posts + ), + chunked AS ( + SELECT post_id, + blk, + ((rn - 1) / CEILING(total_posts::NUMERIC / + (_workers * 50)))::INT AS chunk_idx + FROM numbered + ), + grouped AS ( + SELECT 
chunk_idx, + ARRAY_AGG(post_id ORDER BY post_id) AS pids, + MIN(blk) AS first_blk, + MAX(blk) AS last_blk + FROM chunked + GROUP BY chunk_idx + ) + INSERT INTO hivesense_app.block_tasks( + batch_id, shard, post_ids, first_block, last_block + ) + SELECT + __batch_id, + NULL, + pids, + first_blk, + last_blk + FROM grouped; + + LOCK TABLE hivemind_app.hive_posts IN SHARE MODE; + + /* bulk-upsert post_data for this batch (unchanged but re-uses grouped) */ + INSERT INTO hivesense_app.post_data (post_id, number_of_tokens, last_vectors_block) + SELECT id, 0, -1 + FROM ( + SELECT DISTINCT UNNEST(post_ids) AS id + FROM hivesense_app.block_tasks + WHERE batch_id = __batch_id + ) AS src + ORDER BY id -- ☚ guarantees lock order + ON CONFLICT (post_id) DO NOTHING; - __from_block := __to_block + 1; - END LOOP; - COMMIT; -- we have to commit here so the workers can pick up the tasks + COMMIT; -- let workers see tasks -------------------------------------------------------------------------------- -- WAKE ALL WORKERS by dropping each start_key_i @@ -481,8 +593,7 @@ BEGIN -- • then signal back on done_key_i. -------------------------------------------------------------------------------- FOR __shard IN 1.._workers LOOP - _start_key := 10_000_000 + __shard; - PERFORM pg_advisory_unlock(_start_key); + PERFORM pg_advisory_unlock(__start_key_namespace, __shard); END LOOP; -------------------------------------------------------------------------------- @@ -497,24 +608,21 @@ BEGIN -- We do it in the order: LOCK(done_key_i) → UNLOCK(done_key_i) → UNLOCK(ack_key_i) → LOCK(ack_key_i) → LOCK(start_key_i) -------------------------------------------------------------------------------- FOR __shard IN 1.._workers LOOP - _done_key := 20_000_000 + __shard; - _ack_key := 30_000_000 + __shard; - _start_key := 10_000_000 + __shard; -- Wait for worker_i to signal “done”: - PERFORM pg_advisory_lock(_done_key); + PERFORM pg_advisory_lock(__done_key_namespace, __shard); -- Immediately drop done_key_i so that next time the worker can LOCK it: - PERFORM pg_advisory_unlock(_done_key); + PERFORM pg_advisory_unlock(__done_key_namespace, __shard); -- ACK the worker’s “done” by unlocking ack_key_i - PERFORM pg_advisory_unlock(_ack_key); + PERFORM pg_advisory_unlock(__ack_key_namespace, __shard); -- re‐grab ack_key_i so that the next time the worker tries to LOCK it, it will block - PERFORM pg_advisory_lock(_ack_key); + PERFORM pg_advisory_lock(__ack_key_namespace, __shard); -- Pre‐lock start_key_i again so that the worker will block on it next loop - PERFORM pg_advisory_lock(_start_key); + PERFORM pg_advisory_lock(__start_key_namespace, __shard); END LOOP; ------------------------------------------------------------------- @@ -539,6 +647,14 @@ BEGIN -- clean up the tasks table, the tasks are all done, we don't need to keep that info around forever TRUNCATE hivesense_app.block_tasks; + -- mark all new sync_seq as visible only after batch completion + UPDATE hivesense_app.hivesense_app_status + SET max_visible_sync_seq = GREATEST( + COALESCE((SELECT MAX(sync_seq) FROM hivesense_app.posts_vectors),0), + COALESCE((SELECT MAX(sync_seq) FROM hivesense_app.deleted_embeddings),0) + ) + WHERE id = 1; + ------------------------------------------------------------------- -- After the batch, create indexes when appropriate ------------------------------------------------------------------- @@ -558,6 +674,44 @@ END $$; +CREATE OR REPLACE PROCEDURE hivesense_app.wait_for_advisory_lock(_namespace INT, _worker INT, _timeout_ms INT DEFAULT 5000) 
+LANGUAGE plpgsql +AS $$ +BEGIN + LOOP + -- a) close out any open tx so xmin can advance + COMMIT; + -- b) bound our blocking lock to _timeout_ms + PERFORM set_config('lock_timeout', _timeout_ms::text, true); + + BEGIN + -- c) normal blocking advisory lock + PERFORM pg_advisory_lock(_namespace, _worker); + -- d) on success, clear the timeout and stop looping + PERFORM set_config('lock_timeout', '0', true); + EXIT; + EXCEPTION + WHEN SQLSTATE '55P03' -- lock_timeout + OR SQLSTATE '57014' -- statement_timeout + THEN + -- If the race gave us the lock just as the timeout fired, + -- detect it in pg_locks, clear timeout, and exit. + IF EXISTS (SELECT FROM pg_locks + WHERE locktype = 'advisory' + AND classid = _namespace + AND objid = _worker + AND pid = pg_backend_pid() + ) THEN + PERFORM set_config('lock_timeout', '0', true); + EXIT; + END IF; + -- otherwise fall through to retry + END; + END LOOP; +END; +$$; + + CREATE OR REPLACE PROCEDURE hivesense_app.worker_loop( IN _worker INT, IN _app_context_name hive.context_name, @@ -566,13 +720,21 @@ CREATE OR REPLACE PROCEDURE hivesense_app.worker_loop( LANGUAGE plpgsql AS $$ DECLARE - __task RECORD; - __done_posts INT; - __breaking_reason break_reason := NULL; - _start_key BIGINT := 10_000_000 + _worker; - _done_key BIGINT := 20_000_000 + _worker; - _ack_key BIGINT := 30_000_000 + _worker; + __task RECORD; + __done_posts INT; + __breaking_reason break_reason := NULL; + __advisory_lock_namespace_begin INT; + __start_key_namespace INT; + __done_key_namespace INT; + __ack_key_namespace INT; BEGIN + SELECT advisory_lock_namespace_begin INTO __advisory_lock_namespace_begin + FROM hivesense_app.hivesense_app_status; + + __start_key_namespace := __advisory_lock_namespace_begin; + __done_key_namespace := __advisory_lock_namespace_begin + 1; + __ack_key_namespace := __advisory_lock_namespace_begin + 2; + -- by default, postgresql logs when threads are blocked on a lock for more than a second. -- we use locks for synchronization, and expect threads to be blocked for at least 3s -- at a time. Disable that logging to avoid spamming the log file @@ -583,22 +745,21 @@ BEGIN -- PERFORM set_config('log_lock_waits', 'off', true); -- workers run forever until break conditions tell them to exit LOOP - -------------------------------------------------------------------------------- - -- Grab done_key_i so scheduler can wait on it later. This succeeds - -- immediately the very first time (because we left done_key_i unlocked). - -------------------------------------------------------------------------------- - PERFORM pg_advisory_lock(_done_key); + -------------------------------------------------------------------- + -- Take done_key_i (never blocks for long) + -------------------------------------------------------------------- + PERFORM pg_advisory_lock(__done_key_namespace, _worker); -------------------------------------------------------------------------------- -- Now block until the scheduler says “go” by unlocking start_key_i. -- As soon as that happens, we acquire start_key_i. -------------------------------------------------------------------------------- - PERFORM pg_advisory_lock(_start_key); + CALL hivesense_app.wait_for_advisory_lock(__start_key_namespace, _worker); -------------------------------------------------------------------------------- -- Immediately release start_key_i so it’s available for the next batch. 
-------------------------------------------------------------------------------- - PERFORM pg_advisory_unlock(_start_key); + PERFORM pg_advisory_unlock(__start_key_namespace, _worker); LOOP ------------------------------------------------------------------ @@ -606,11 +767,12 @@ BEGIN ------------------------------------------------------------------ -- RAISE NOTICE 'getting new task for worker %', _worker; SELECT task_id, + post_ids, first_block, last_block - INTO __task - FROM hivesense_app.block_tasks - WHERE status = 'pending' + INTO __task + FROM hivesense_app.block_tasks + WHERE status = 'pending' FOR UPDATE SKIP LOCKED LIMIT 1; @@ -638,22 +800,22 @@ BEGIN -- Execute the heavy work for this block sub-range ------------------------------------------------------------------ -- RAISE NOTICE 'worker % processing block range % to %', _worker, __task.first_block, __task.last_block; - SELECT hivesense_block_range_data( - __task.first_block, - __task.last_block, - TRUE, -- logs - _worker -- keep original param order - ) - INTO __done_posts; - - RAISE NOTICE 'worker % processed block range % to % (% blocks) containing % posts', _worker, __task.first_block, __task.last_block, __task.last_block - __task.first_block, __done_posts; - -- If Hivemind isn’t caught up yet → rollback & wait0 - IF __done_posts IS NULL THEN - ROLLBACK; - PERFORM pg_sleep(5); - CONTINUE; + IF __task.last_block = __task.first_block THEN + RAISE NOTICE 'worker % processing block % (% posts)', + _worker, + __task.first_block, + array_length(__task.post_ids,1); + ELSE + RAISE NOTICE 'worker % processing blocks % to % (% blocks, % posts)', + LPAD(_worker::text, 2), + __task.first_block, + __task.last_block, + __task.last_block - __task.first_block + 1, + array_length(__task.post_ids,1); END IF; + SELECT generate_embeddings_for_posts(__task.post_ids, TRUE, _worker) INTO __done_posts; + ------------------------------------------------------------------ -- Mark task finished ------------------------------------------------------------------ @@ -669,16 +831,16 @@ BEGIN -- We have emptied the queue (or hit break conditions). -- Signal “I’m done” by UNLOCKing done_key_i. -------------------------------------------------------------------------------- - PERFORM pg_advisory_unlock(_done_key); + PERFORM pg_advisory_unlock(__done_key_namespace, _worker); -- Now block on ack_key_i until the scheduler “acks” our done_key_i. - PERFORM pg_advisory_lock(_ack_key); + CALL hivesense_app.wait_for_advisory_lock(__ack_key_namespace, _worker); -- As soon as the scheduler does `UNLOCK(ack_key_i)`, this returns. 
         --------------------------------------------------------------------------------
         -- Immediately release ack_key_i so that the scheduler can lock it for the next iteration
         --------------------------------------------------------------------------------
-        PERFORM pg_advisory_unlock(_ack_key);
+        PERFORM pg_advisory_unlock(__ack_key_namespace, _worker);
 
         __breaking_reason := isbreakingpending(_app_context_name, _max_block_limit, NULL);
         IF __breaking_reason IS NOT NULL THEN
diff --git a/db/search.sql b/db/search.sql
index 57bd1faf1aedec5cf90625a98799f2468528c2c0..0ef181f23d5e88438898457cfdb3d086a51945cf 100644
--- a/db/search.sql
+++ b/db/search.sql
@@ -31,13 +31,37 @@ DECLARE
     max_posts int := LEAST(_limit, 1000);
     -- if _start_post_id=0 we collect immediately; otherwise skip until we see it
     collecting bool := (_start_post_id = 0);
-    batch_size int := GREATEST(_limit * 5, 50); -- start with 5×
-    sql text;
+
+    -- pull headers and defaults
+    req_headers json;
+    batch_multiplier int := 5;       -- default multiplier
+    exploratory_factor int := 1000;  -- default ef_search
+
+    -- batch size will be set in BEGIN
+    batch_size int;
+    sql text;
 
     __min_search_tokens int;
 BEGIN
+    -- grab the incoming headers JSON (if any)
+    SELECT current_setting('request.headers', true)::json
+    INTO req_headers;
+
+    -- override defaults if headers are present
+    batch_multiplier := COALESCE(
+        (req_headers->>'x-batch-size-multiplier')::int,
+        batch_multiplier
+    );
+    exploratory_factor := COALESCE(
+        (req_headers->>'x-exploratory-factor')::int,
+        exploratory_factor
+    );
+
+    -- initialize batch_size using the (possibly overridden) multiplier
+    batch_size := GREATEST(_limit * batch_multiplier, 50);
+
     -- tune pgvector index parameters
-    PERFORM set_config('ivfflat.probes', '4', true);
-    PERFORM set_config('hnsw.ef_search', '1000', true);
+    PERFORM set_config('ivfflat.probes', '4', true);
+    PERFORM set_config('hnsw.ef_search', exploratory_factor::text, true);
 
     SELECT min_token_search_threshold
     INTO __min_search_tokens
@@ -47,7 +71,7 @@ BEGIN
     RAISE NOTICE 'In find_nearest_posts_with_embedding(vec, %, %, %, %)', _limit, _exclude_post_id, _observer_id, _start_post_id;
 
     LOOP
-        RAISE NOTICE 'Getting % posts', batch_size;
+        RAISE NOTICE 'Getting % posts (multiplier %, ef_search %)', batch_size, batch_multiplier, exploratory_factor;
         sql := format($q$
             SELECT hpv.post_id, %s AS similarity
             FROM hivesense_app.posts_vectors hpv
diff --git a/docker/rewriter_entrypoint.sh b/docker/rewriter_entrypoint.sh
index 0072e77ea772815ffcae2eabdd9ae85761ab7a7b..530b8673939fece5a8093c2312d39d4a75ba8f41 100755
--- a/docker/rewriter_entrypoint.sh
+++ b/docker/rewriter_entrypoint.sh
@@ -11,4 +11,4 @@ fi
 sed "s|\${REWRITE_LOG}|$REWRITE_LOG|g" /usr/local/openresty/nginx/conf/nginx.conf.template > /usr/local/openresty/nginx/conf/nginx.conf
 
 # Start nginx
-/usr/local/openresty/bin/openresty -g 'daemon off;'
+exec /usr/local/openresty/bin/openresty -g 'daemon off;'
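The header overrides added to db/search.sql above come in through PostgREST's request.headers GUC, so recall depth can be tuned per request without redeploying. A hedged sketch using requests; the base URL and query parameters are illustrative, and only the two header names come from this diff:

```python
import requests

# Hypothetical base URL; the real deployment sits behind the postgrest-rewriter.
BASE = "https://api.example.com"

resp = requests.get(
    f"{BASE}/similarposts",
    # Query parameter names here are placeholders; see the endpoint's own spec.
    params={"query": "beekeeping in winter", "limit": 20},
    headers={
        # Fetch 10x the requested limit per candidate batch instead of the default 5x.
        "x-batch-size-multiplier": "10",
        # Widen the HNSW search (hnsw.ef_search) from the default 1000.
        "x-exploratory-factor": "2000",
    },
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```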
diff --git a/endpoints/embedding_updates.sql b/endpoints/embedding_updates.sql
new file mode 100644
index 0000000000000000000000000000000000000000..4a10d9921a74bd641fb4ed16ffdeb29e290584cf
--- /dev/null
+++ b/endpoints/embedding_updates.sql
@@ -0,0 +1,118 @@
+SET ROLE hivesense_owner;
+
+/** openapi:paths
+/embedding-updates:
+  get:
+    tags:
+      - AI
+    summary: Stream post-level embedding operations since a given sequence number
+    operationId: hivesense_endpoints.embedding_updates
+    parameters:
+      - in: query
+        name: after_seq
+        required: true
+        schema: { type: integer }
+        description: |
+          Clients pass the highest sync_seq they have applied.
+          The server returns every operation with sync_seq > after_seq.
+      - in: query
+        name: page_size
+        required: true
+        schema: { type: integer }
+        description: Maximum number of operations to return.
+    responses:
+      '200':
+        description: JSON array of operations, ordered by sync_seq.
+        content:
+          application/json:
+            schema: { type: string, x-sql-datatype: JSONB }
+*/
+-- openapi-generated-code-begin
+DROP FUNCTION IF EXISTS hivesense_endpoints.embedding_updates;
+CREATE OR REPLACE FUNCTION hivesense_endpoints.embedding_updates(
+    "after_seq" INT,
+    "page_size" INT
+)
+RETURNS JSONB
+-- openapi-generated-code-end
+LANGUAGE plpgsql STABLE
+AS
+$$
+DECLARE
+    __result JSONB;
+    __max_visible INT;
+BEGIN
+    -- only expose completed sync_seq up to this threshold
+    SELECT max_visible_sync_seq
+    INTO __max_visible
+    FROM hivesense_app.hivesense_app_status
+    WHERE id = 1;
+
+    WITH joined_ops AS (
+        SELECT
+            COALESCE(pv.sync_seq, de.sync_seq) AS sync_seq,
+            COALESCE(pv.post_id, de.post_id)   AS post_id,
+            CASE
+                WHEN pv.sync_seq IS NOT NULL AND de.sync_seq IS NOT NULL THEN 'update'
+                WHEN pv.sync_seq IS NOT NULL THEN 'insert'
+                ELSE 'delete'
+            END AS op
+        FROM (
+            SELECT DISTINCT sync_seq, post_id
+            FROM hivesense_app.posts_vectors
+            WHERE sync_seq > after_seq
+              AND sync_seq <= __max_visible
+        ) pv
+        FULL OUTER JOIN (
+            SELECT sync_seq, post_id
+            FROM hivesense_app.deleted_embeddings
+            WHERE sync_seq > after_seq
+              AND sync_seq <= __max_visible
+        ) de USING (sync_seq, post_id)
+    ),
+    ops_with_embeddings AS (
+        SELECT
+            jo.sync_seq,
+            jo.op,
+            ha.name AS author,
+            hpd.permlink AS permlink,
+            pd.number_of_tokens,
+            pd.last_vectors_block,
+            CASE
+                WHEN jo.op IN ('insert','update') THEN (
+                    -- array of embeddings, ordered by chunk_number
+                    SELECT jsonb_agg(to_jsonb(pv2.embedding::real[]) ORDER BY pv2.chunk_number)
+                    FROM hivesense_app.posts_vectors pv2
+                    WHERE pv2.sync_seq = jo.sync_seq
+                      AND pv2.post_id = jo.post_id
+                )
+                ELSE '[]'::JSONB
+            END AS embeddings
+        FROM joined_ops jo
+        JOIN hivemind_app.hive_posts hp ON hp.id = jo.post_id
+        JOIN hivemind_app.hive_accounts ha ON ha.id = hp.author_id
+        JOIN hivemind_app.hive_permlink_data hpd ON hpd.id = hp.permlink_id
+        JOIN hivesense_app.post_data pd ON pd.post_id = jo.post_id
+        ORDER BY jo.sync_seq
+        LIMIT "page_size"
+    )
+
+    SELECT jsonb_agg(
+        jsonb_build_object(
+            'sync_seq', sync_seq,
+            'op', op,
+            'author', author,
+            'permlink', permlink,
+            'number_of_tokens', number_of_tokens,
+            'last_vectors_block', last_vectors_block,
+            'embeddings', embeddings
+        ) ORDER BY sync_seq
+    )
+    INTO __result
+    FROM ops_with_embeddings;
+
+    RETURN COALESCE(__result, '[]'::JSONB);
+END
+$$;
+
+RESET ROLE;
diff --git a/endpoints/endpoint_schema.sql b/endpoints/endpoint_schema.sql
index 3c017ada18c73d1a20d0e2b180056a0972ed83f0..2c0a4e8f85e77025be28d2a79798a442422a67d3 100644
--- a/endpoints/endpoint_schema.sql
+++ b/endpoints/endpoint_schema.sql
@@ -299,6 +299,48 @@ DO $__$
           }
         }
       }
+    },
+    "/embedding-updates": {
+      "get": {
+        "tags": [
+          "AI"
+        ],
+        "summary": "Stream post-level embedding operations since a given sequence number",
+        "operationId": "hivesense_endpoints.embedding_updates",
+        "parameters": [
+          {
+            "in": "query",
+            "name": "after_seq",
+            "required": true,
+            "schema": {
+              "type": "integer"
+            },
+            "description": "Clients pass the highest sync_seq they have applied. \nThe server returns every operation with sync_seq > after_seq.\n"
+          },
+          {
+            "in": "query",
+            "name": "page_size",
+            "required": true,
+            "schema": {
+              "type": "integer"
+            },
+            "description": "Maximum number of operations to return."
+ } + ], + "responses": { + "200": { + "description": "JSON array of operations, ordered by sync_seq.", + "content": { + "application/json": { + "schema": { + "type": "string", + "x-sql-datatype": "JSONB" + } + } + } + } + } + } } } } diff --git a/readme.md b/readme.md index 3208d7dc99bc1c4b49dce1cfe156e9ce03a13b54..a434741398461572777d7dd344bc9f4964f58c8b 100644 --- a/readme.md +++ b/readme.md @@ -298,7 +298,7 @@ add hivesense to `COMPOSE_PROFILES` variable in th `.env` file. To customize set ```bash registry.gitlab.syncad.com/hive/hivesense:<8 digit git sha> - registry.gitlab.syncad.com/hive/hivesense/rewiter:<8 digit git sha> + registry.gitlab.syncad.com/hive/hivesense/postgrest-rewriter:<8 digit git sha> ``` With using switch '--push' new images will also be pushed to registry diff --git a/rewrite_rules.conf b/rewrite_rules.conf index aafe6ce323bf882393d492abf648fbbaf1d2c9e7..05c621cbeb2f17c3b131c5023d8ab03c66591bd7 100644 --- a/rewrite_rules.conf +++ b/rewrite_rules.conf @@ -1,12 +1,15 @@ +rewrite ^/embedding-updates /rpc/embedding_updates break; +# endpoint for get /embedding-updates + +rewrite ^/thematiccontributors /rpc/find_thematic_contributors break; +# endpoint for get /thematiccontributors + rewrite ^/similarpostsbypost /rpc/get_similar_posts_by_post break; # endpoint for get /similarpostsbypost rewrite ^/similarposts /rpc/get_similar_posts break; # endpoint for get /similarposts -rewrite ^/thematiccontributors /rpc/find_thematic_contributors break; -# endpoint for get /similarposts - rewrite ^/$ / break; # endpoint for openapi spec itself diff --git a/scripts/build_images.sh b/scripts/build_images.sh index d501a5c5513803865fbca338b508d18ca9187904..7981024e8660af73107c83d42f67cf7cecf33d8f 100755 --- a/scripts/build_images.sh +++ b/scripts/build_images.sh @@ -51,12 +51,14 @@ set -eu pipefail SCRIPTPATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" docker build -t "registry.gitlab.syncad.com/hive/hivesense:${TAG}" "${SCRIPTPATH}/.." -docker build -t "registry.gitlab.syncad.com/hive/hivesense/rewiter:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.rewriter" "${SCRIPTPATH}/.." +docker build -t "registry.gitlab.syncad.com/hive/hivesense/postgrest-rewriter:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.rewriter" "${SCRIPTPATH}/.." +docker build -t "registry.gitlab.syncad.com/hive/hivesense/syncer:${TAG}" -f "${SCRIPTPATH}/../Dockerfile.syncer" "${SCRIPTPATH}/.." 
echo "Build images tag ${TAG}" if [ -n "${PUSH:-}" ]; then docker push "registry.gitlab.syncad.com/hive/hivesense:${TAG}" - docker push "registry.gitlab.syncad.com/hive/hivesense/rewiter:${TAG}" + docker push "registry.gitlab.syncad.com/hive/hivesense/postgrest-rewriter:${TAG}" + docker push "registry.gitlab.syncad.com/hive/hivesense/syncer:${TAG}" echo "Pushed images tag ${TAG}" fi diff --git a/scripts/ci-helpers/compose_variables.sh b/scripts/ci-helpers/compose_variables.sh index e255961e5c6bd615b907197b30a0b5504f5ec8a6..9c91fefd31071fce4beceadb93c767451d81bedb 100644 --- a/scripts/ci-helpers/compose_variables.sh +++ b/scripts/ci-helpers/compose_variables.sh @@ -39,7 +39,7 @@ HIVEMIND_SYNC_ARGS="--test-max-block=${NUMBER_OF_BLOCKS_TO_SYNC}" # Hivesense HIVESENSE_IMAGE="registry.gitlab.syncad.com/hive/hivesense" HIVESENSE_VERSION="$HIVESENSE_TAG" -HIVESENSE_REWRITER_IMAGE="registry.gitlab.syncad.com/hive/hivesense/rewiter" +HIVESENSE_REWRITER_IMAGE="registry.gitlab.syncad.com/hive/hivesense/postgrest-rewriter" HIVESENSE_SYNC_ARGS="--stop-at-block=${NUMBER_OF_BLOCKS_TO_SYNC}" diff --git a/scripts/ci-helpers/wait-for-hivesense-startup.sh b/scripts/ci-helpers/wait-for-hivesense-startup.sh index 05677b12514802fd283767b5ffa0ade6113673d9..10282d46cbc8d5cd3c5c5ff1bf6caff1dfc438f6 100755 --- a/scripts/ci-helpers/wait-for-hivesense-startup.sh +++ b/scripts/ci-helpers/wait-for-hivesense-startup.sh @@ -7,7 +7,7 @@ wait_for_hivesense_startup() { MESSAGE="Waiting for Hivesense to finish processing blocks..." HIVEMIND_BLOCK_COMMAND="SELECT last_completed_block_num FROM hivemind_app.hive_state" HAF_BLOCK_COMMAND="SELECT consistent_block FROM hafd.hive_state" - HIVESENSE_BLOCK_COMMAND="SELECT hive.app_get_current_block_num('hivesense_app1')" + HIVESENSE_BLOCK_COMMAND="SELECT hive.app_get_current_block_num('hivesense_app')" i=0 while : diff --git a/scripts/install_app.sh b/scripts/install_app.sh index a8a07ae8fda32908a5c6138ca44aede4c4288b6f..1fc2815227dfdb468d27ecf964fa8e616b5156c1 100755 --- a/scripts/install_app.sh +++ b/scripts/install_app.sh @@ -212,6 +212,7 @@ psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET custom.swagger_url = '$SWAGG psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET SEARCH_PATH TO ${HIVESENSE_SCHEMA};" -f "$SRCPATH/endpoints/find_similar_posts.sql" psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET SEARCH_PATH TO ${HIVESENSE_SCHEMA};" -f "$SRCPATH/endpoints/get_similar_posts_by_post.sql" psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET SEARCH_PATH TO ${HIVESENSE_SCHEMA};" -f "$SRCPATH/endpoints/find_thematic_contributors.sql" +psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET SEARCH_PATH TO ${HIVESENSE_SCHEMA};" -f "$SRCPATH/endpoints/embedding_updates.sql" psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET ROLE hivesense_owner;GRANT USAGE ON SCHEMA ${HIVESENSE_SCHEMA} to hivesense_user;" psql "$POSTGRES_ACCESS" -v ON_ERROR_STOP=on -c "SET ROLE hivesense_owner;GRANT SELECT ON ALL TABLES IN SCHEMA ${HIVESENSE_SCHEMA} TO hivesense_user;" diff --git a/scripts/openapi_rewrite.sh b/scripts/openapi_rewrite.sh index c8aade4418ef546cbf9b688323ba0e26e2f6e944..21bfe2892d39515a285a3fc6f7492bcb0b6e11a4 100755 --- a/scripts/openapi_rewrite.sh +++ b/scripts/openapi_rewrite.sh @@ -18,7 +18,8 @@ ENDPOINTS_IN_ORDER=" ../$endpoints/endpoint_schema.sql ../$endpoints/find_similar_posts.sql ../$endpoints/get_similar_posts_by_post.sql -../$endpoints/find_thematic_contributors.sql" +../$endpoints/find_thematic_contributors.sql +../$endpoints/embedding_updates.sql" # Function to reverse the 
lines
 reverse_lines() {
diff --git a/scripts/sync_embeddings.py b/scripts/sync_embeddings.py
new file mode 100755
index 0000000000000000000000000000000000000000..896ee49b887c9297dea2ef8bf6ebd473e0f9fa19
--- /dev/null
+++ b/scripts/sync_embeddings.py
@@ -0,0 +1,188 @@
+import os
+import time
+import logging
+import requests
+import psycopg2
+from psycopg2.extras import execute_values
+from datetime import datetime, timezone
+
+API_URL = os.environ.get("HIVESENSE_EMBED_API")  # e.g. https://upstream/embedding-updates
+DB_DSN = os.environ.get("POSTGRES_URI")          # postgres://… (same schema names)
+
+BATCH = 1000
+RETRY_SLEEP = 3
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s %(levelname)s %(message)s"
+)
+
+
+def get_last_seq(cur) -> int:
+    cur.execute("""
+        SELECT COALESCE(MAX(sync_seq), 0) FROM (
+            SELECT MAX(sync_seq) AS sync_seq FROM hivesense_app.posts_vectors
+            UNION ALL
+            SELECT MAX(sync_seq) FROM hivesense_app.deleted_embeddings
+        ) AS t;
+    """)
+    return cur.fetchone()[0] or 0
+
+
+POST_LOOKUP_SQL = """
+SELECT hp.id
+  FROM hivemind_app.hive_posts hp
+  JOIN hivemind_app.hive_accounts ha ON ha.id = hp.author_id
+  JOIN hivemind_app.hive_permlink_data pd ON pd.id = hp.permlink_id
+ WHERE ha.name = %s
+   AND pd.permlink = %s
+   AND hp.counter_deleted = 0
+ LIMIT 1;
+"""
+
+
+def resolve_post_id(cur, author, permlink):
+    cur.execute(POST_LOOKUP_SQL, (author, permlink))
+    row = cur.fetchone()
+    return row[0] if row else None
+
+
+def upsert_vectors(cur, post_id, sync_seq, embeddings):
+    # Insert one row per chunk. Each embedding arrives as a JSON list of
+    # floats; the ::real[]::vector cast below assumes pgvector's array cast
+    # (a halfvec column would need a further explicit ::halfvec here).
+    execute_values(
+        cur,
+        """
+        INSERT INTO hivesense_app.posts_vectors(post_id, chunk_number, embedding, sync_seq)
+        VALUES %s
+        """,
+        [(post_id, chunk_number, vec, sync_seq)
+         for chunk_number, vec in enumerate(embeddings)],
+        template="(%s, %s, %s::real[]::vector, %s)"
+    )
+
+
+def apply_op(cur, op, post_id):
+    """
+    Apply a single operation; assumes post_id already resolved.
+    Uses a savepoint so individual ops can roll back without aborting the whole batch.
+    """
+    cur.execute("SAVEPOINT op_sp")
+    try:
+        if op["op"] == "delete":
+            cur.execute(
+                "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s",
+                (post_id,)
+            )
+            cur.execute(
+                """
+                INSERT INTO hivesense_app.deleted_embeddings(post_id, sync_seq)
+                VALUES (%s, %s)
+                ON CONFLICT (sync_seq, post_id) DO NOTHING
+                """,
+                (post_id, op["sync_seq"])
+            )
+        else:
+            cur.execute(
+                "DELETE FROM hivesense_app.posts_vectors WHERE post_id = %s",
+                (post_id,)
+            )
+            upsert_vectors(cur, post_id, op["sync_seq"], op["embeddings"])
+            cur.execute(
+                """
+                INSERT INTO hivesense_app.post_data
+                    (post_id, number_of_tokens, last_vectors_block)
+                VALUES (%s, %s, %s)
+                ON CONFLICT (post_id) DO UPDATE
+                SET number_of_tokens = EXCLUDED.number_of_tokens,
+                    last_vectors_block = EXCLUDED.last_vectors_block
+                """,
+                (post_id, op["number_of_tokens"], op["last_vectors_block"])
+            )
+    except Exception:
+        cur.execute("ROLLBACK TO SAVEPOINT op_sp")
+        raise
+    finally:
+        cur.execute("RELEASE SAVEPOINT op_sp")
+
+
+def main():
+    conn = psycopg2.connect(DB_DSN)  # psycopg2 connections are non-autocommit by default
+    # Fast-fail on deadlocks
+    with conn.cursor() as cur:
+        cur.execute("SET deadlock_timeout = '1s'")
+
+    while True:
+        # determine how far we've synced
+        with conn.cursor() as cur:
+            after_seq = get_last_seq(cur)
+
+        response = requests.get(
+            API_URL,
+            params={"after_seq": after_seq, "page_size": BATCH},
+            timeout=60
+        )
+        response.raise_for_status()
+        ops = response.json()
+        if not ops:
+            time.sleep(5)
+            continue
+
+        # wait until hivemind has caught up to the highest block in this batch
+        block_nums = [op.get('last_vectors_block') for op in ops if op.get('last_vectors_block') is not None]
+        max_block = max(block_nums) if block_nums else None
+        if max_block is not None:
+            while True:
+                with conn.cursor() as cur:
+                    cur.execute("SELECT hive.app_get_current_block_num('hivemind_app')")
+                    head = cur.fetchone()[0]
+                if head >= max_block:
+                    break
+                logging.info(
+                    "Waiting for hivemind head block %s (currently at %s)",
+                    max_block, head
+                )
+                time.sleep(1)
+
+        # resolve all post_ids (should now exist)
+        resolved = []  # list of tuples (op, post_id)
+        with conn.cursor() as cur:
+            for op in ops:
+                post_id = resolve_post_id(cur, op["author"], op["permlink"])
+                if post_id is None:
+                    raise RuntimeError(
+                        f"Post {op['author']}/{op['permlink']} not found after block {max_block}"
+                    )
+                resolved.append((op, post_id))
+
+        # apply each operation in one batch transaction
+        with conn.cursor() as cur:
+            for op, post_id in resolved:
+                logging.info(
+                    "Applying %s %s/%s (seq %s, block %s)",
+                    op["op"], op["author"], op["permlink"],
+                    op["sync_seq"], op.get("last_vectors_block")
+                )
+                apply_op(cur, op, post_id)
+
+            # advance local sequence & visibility
+            max_seq = max(op["sync_seq"] for op, _ in resolved)
+            cur.execute(
+                "SELECT setval('hivesense_app.sync_seq', %s, true)",
+                (max_seq,)
+            )
+            cur.execute(
+                """UPDATE hivesense_app.hivesense_app_status
+                      SET max_visible_sync_seq = %s
+                    WHERE id = 1""",
+                (max_seq,)
+            )
+        conn.commit()
+
+        # Optionally create indexes if caught up
+        if max_block is not None:
+            with conn.cursor() as cur:
+                cur.execute(
+                    "SELECT created_at FROM hafd.blocks WHERE num = %s",
+                    (max_block,)
+                )
+                row = cur.fetchone()
+                if row:
+                    created_at = row[0].replace(tzinfo=timezone.utc)
+                    age = datetime.now(timezone.utc) - created_at
+                    if age.total_seconds() <= 60:
+                        logging.info(
+                            "Latest block %s is recent (%.1fs old); creating indexes",
+                            max_block, age.total_seconds()
+                        )
+                        cur.execute("CALL hivesense_app.ensure_indexes_are_created()")
+                        conn.commit()
+
+
+if __name__ == "__main__":
+    main()
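The same wire format the syncer consumes can be paged by hand, which is useful for smoke-testing a deployment before pointing the syncer at it. A minimal paging sketch (hypothetical base URL; field names follow endpoints/embedding_updates.sql):

```python
import requests

BASE = "https://api.example.com"  # hypothetical; any hivesense deployment

after_seq = 0
while True:
    ops = requests.get(
        f"{BASE}/embedding-updates",
        params={"after_seq": after_seq, "page_size": 1000},
        timeout=60,
    ).json()
    if not ops:
        break  # caught up to max_visible_sync_seq
    for op in ops:
        # op carries: sync_seq, op (insert|update|delete), author, permlink,
        # number_of_tokens, last_vectors_block, embeddings (list of float lists)
        print(op["sync_seq"], op["op"], f"{op['author']}/{op['permlink']}",
              len(op["embeddings"]), "chunks")
    after_seq = max(op["sync_seq"] for op in ops)
```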
diff --git a/submodules/haf b/submodules/haf
index 61e4e13a348ea3deac6ac118bfc7b99e23fec0be..9ef8b5df2abe939aa0e4b219b903b2cd4fcc0bb5 160000
--- a/submodules/haf
+++ b/submodules/haf
@@ -1 +1 @@
-Subproject commit 61e4e13a348ea3deac6ac118bfc7b99e23fec0be
+Subproject commit 9ef8b5df2abe939aa0e4b219b903b2cd4fcc0bb5
diff --git a/submodules/hivemind b/submodules/hivemind
index 1242d377bf0bc6b35ede2567e7a0340f2efdd178..e37876656e274b5657b682a2c57978603340a4e6 160000
--- a/submodules/hivemind
+++ b/submodules/hivemind
@@ -1 +1 @@
-Subproject commit 1242d377bf0bc6b35ede2567e7a0340f2efdd178
+Subproject commit e37876656e274b5657b682a2c57978603340a4e6
diff --git a/tests/integration/api_node/hivesense_synced_api_node_test.sh b/tests/integration/api_node/hivesense_synced_api_node_test.sh
index 31c744aa7123f9516f623a6c9d09d9d23942268d..495e2d6fd528555c507aa6ca572efdcfa0e912ac 100755
--- a/tests/integration/api_node/hivesense_synced_api_node_test.sh
+++ b/tests/integration/api_node/hivesense_synced_api_node_test.sh
@@ -9,7 +9,7 @@ query_database() {
 }
 
 # 1. check if hivesense is synced
-hivesense_block_num=$(query_database "SELECT current_block_num FROM hafd.contexts WHERE name = 'hivesense_app1'")
+hivesense_block_num=$(query_database "SELECT current_block_num FROM hafd.contexts WHERE name = 'hivesense_app'")
 
 if [ "$hivesense_block_num" -ne 1000000 ]; then
     echo "Current block num ${hivesense_block_num} != 1000000" >&2
     exit 1