Optimize subscribe performance to process twice as fast

Based on my testing on 10k subscribe mocks:

develop:

INFO - hive.utils.stats:156 - Community: Pushed 10000 records in 21.8617 seconds

This branch:

INFO - hive.utils.stats:156 - Community: Pushed 10000 records in 11.1936 seconds

Script to generate mocks:

#!/usr/bin/env python3
"""
Script to generate large mock data for testing:
- Block 10: 10 communities (hive-100000 to hive-100009) + 10,000 accounts
- Block 11: 10,000 subscribes distributed across the 10 communities
"""

import json
import sys
from datetime import datetime, timedelta


def generate_mock_data():
    """Generate mock data with communities, accounts, and subscribes."""

    base_time = datetime(2020, 1, 1, 12, 0, 0)

    communities = [f"hive-287{str(i).zfill(3)}" for i in range(10)]

    # Number of accounts and subscribes
    num_accounts = 10000
    num_subscribes = 10000

    print(f"Generating mock data for blocks 10-11...")
    print(f"Block 10: Creating {len(communities)} communities and {num_accounts} accounts")
    print(f"Block 11: Creating {num_subscribes} subscribes")

    # === BLOCK 10: Account Creations ===
    block_10_operations = []
    expiration_time_10 = (base_time + timedelta(minutes=5)).strftime("%Y-%m-%dT%H:%M:%S")

    # 1. Create community accounts
    print("Generating community creation operations (Block 10)...")
    for i, community_name in enumerate(communities):
        block_10_operations.append({
            "type": "account_create_operation",
            "value": {
                "creator": "initminer",
                "new_account_name": community_name,
                "owner": {
                    "weight_threshold": 1,
                    "account_auths": [],
                    "key_auths": [
                        ["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
                    ]
                },
                "active": {
                    "weight_threshold": 1,
                    "account_auths": [],
                    "key_auths": [
                        ["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
                    ]
                },
                "posting": {
                    "weight_threshold": 1,
                    "account_auths": [],
                    "key_auths": [
                        ["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
                    ]
                },
                "memo_key": "STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25",
                "json_metadata": "{}",
                "extensions": []
            }
        })
        if (i + 1) % 100 == 0 or (i + 1) == len(communities):
            print(f"  Generated {i + 1}/{len(communities)} community operations")

    # 2. Create regular user accounts
    print("Generating account creation operations (Block 10)...")
    for i in range(num_accounts):
        account_name = f"howo{i}"
        block_10_operations.append({
            "type": "account_create_operation",
            "value": {
                "creator": "initminer",
                "new_account_name": account_name,
                "owner": {
                    "weight_threshold": 1,
                    "account_auths": [],
                    "key_auths": [
                        ["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
                    ]
                },
                "active": {
                    "weight_threshold": 1,
                    "account_auths": [],
                    "key_auths": [
                        ["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
                    ]
                },
                "posting": {
                    "weight_threshold": 1,
                    "account_auths": [],
                    "key_auths": [
                        ["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
                    ]
                },
                "memo_key": "STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25",
                "json_metadata": "{}",
                "extensions": []
            }
        })
        if (i + 1) % 10000 == 0 or (i + 1) == num_accounts:
            print(f"  Generated {i + 1}/{num_accounts} account operations")

    # === BLOCK 11: Subscribe Operations ===
    block_11_operations = []
    expiration_time_11 = (base_time + timedelta(minutes=8)).strftime("%Y-%m-%dT%H:%M:%S")

    # 3. Create subscribe operations distributed across communities
    print("Generating subscribe operations (Block 11)...")
    for i in range(num_subscribes):
        account_name = f"howo{i}"
        # Distribute subscribes evenly across the 10 communities
        community_name = communities[i % len(communities)]

        block_11_operations.append({
            "type": "custom_json_operation",
            "value": {
                "required_auths": [],
                "required_posting_auths": [account_name],
                "id": "community",
                "json": json.dumps(["subscribe", {"community": community_name}])
            }
        })
        if (i + 1) % 10000 == 0 or (i + 1) == num_subscribes:
            print(f"  Generated {i + 1}/{num_subscribes} subscribe operations")

    # Create the transaction structures
    transaction_10 = {
        "ref_block_num": 10,
        "ref_block_prefix": 1,
        "expiration": expiration_time_10,
        "operations": block_10_operations
    }

    transaction_11 = {
        "ref_block_num": 11,
        "ref_block_prefix": 1,
        "expiration": expiration_time_11,
        "operations": block_11_operations
    }

    # Create the final mock data structure with two blocks
    mock_data = {
        "10": {
            "transactions": [transaction_10]
        },
        "11": {
            "transactions": [transaction_11]
        }
    }

    return mock_data


def main():
    """Main function to generate and save mock data."""
    print("=" * 80)
    print("Large Mock Data Generator")
    print("=" * 80)
    print()

    # Generate the mock data
    mock_data = generate_mock_data()

    # Output file
    output_file = "large_mock_block_10.json"

    print()
    print(f"Writing mock data to {output_file}...")

    # Write to file with indentation for readability (warning: large file!)
    with open(output_file, 'w') as f:
        json.dump(mock_data, f, indent=2)

    print(f"Mock data successfully written to {output_file}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\nOperation cancelled by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n\nError: {e}", file=sys.stderr)
        sys.exit(1)
Edited by Howo

Merge request reports

Loading