Optimize subscribe performance to process twice as fast
Based on my testing on 10k subscribe mocks:
develop:
INFO - hive.utils.stats:156 - Community: Pushed 10000 records in 21.8617 seconds
This branch:
INFO - hive.utils.stats:156 - Community: Pushed 10000 records in 11.1936 seconds
Script to generate mocks:
#!/usr/bin/env python3
"""
Script to generate large mock data for testing:
- Block 10: 10 communities (hive-100000 to hive-100009) + 10,000 accounts
- Block 11: 10,000 subscribes distributed across the 10 communities
"""
import json
import sys
from datetime import datetime, timedelta
def generate_mock_data():
"""Generate mock data with communities, accounts, and subscribes."""
base_time = datetime(2020, 1, 1, 12, 0, 0)
communities = [f"hive-287{str(i).zfill(3)}" for i in range(10)]
# Number of accounts and subscribes
num_accounts = 10000
num_subscribes = 10000
print(f"Generating mock data for blocks 10-11...")
print(f"Block 10: Creating {len(communities)} communities and {num_accounts} accounts")
print(f"Block 11: Creating {num_subscribes} subscribes")
# === BLOCK 10: Account Creations ===
block_10_operations = []
expiration_time_10 = (base_time + timedelta(minutes=5)).strftime("%Y-%m-%dT%H:%M:%S")
# 1. Create community accounts
print("Generating community creation operations (Block 10)...")
for i, community_name in enumerate(communities):
block_10_operations.append({
"type": "account_create_operation",
"value": {
"creator": "initminer",
"new_account_name": community_name,
"owner": {
"weight_threshold": 1,
"account_auths": [],
"key_auths": [
["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
]
},
"active": {
"weight_threshold": 1,
"account_auths": [],
"key_auths": [
["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
]
},
"posting": {
"weight_threshold": 1,
"account_auths": [],
"key_auths": [
["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
]
},
"memo_key": "STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25",
"json_metadata": "{}",
"extensions": []
}
})
if (i + 1) % 100 == 0 or (i + 1) == len(communities):
print(f" Generated {i + 1}/{len(communities)} community operations")
# 2. Create regular user accounts
print("Generating account creation operations (Block 10)...")
for i in range(num_accounts):
account_name = f"howo{i}"
block_10_operations.append({
"type": "account_create_operation",
"value": {
"creator": "initminer",
"new_account_name": account_name,
"owner": {
"weight_threshold": 1,
"account_auths": [],
"key_auths": [
["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
]
},
"active": {
"weight_threshold": 1,
"account_auths": [],
"key_auths": [
["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
]
},
"posting": {
"weight_threshold": 1,
"account_auths": [],
"key_auths": [
["STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25", 1]
]
},
"memo_key": "STM8JH4fTJr73FQimysjmXCEh2UvRwZsG6ftjxsVTmYCeEehZgh25",
"json_metadata": "{}",
"extensions": []
}
})
if (i + 1) % 10000 == 0 or (i + 1) == num_accounts:
print(f" Generated {i + 1}/{num_accounts} account operations")
# === BLOCK 11: Subscribe Operations ===
block_11_operations = []
expiration_time_11 = (base_time + timedelta(minutes=8)).strftime("%Y-%m-%dT%H:%M:%S")
# 3. Create subscribe operations distributed across communities
print("Generating subscribe operations (Block 11)...")
for i in range(num_subscribes):
account_name = f"howo{i}"
# Distribute subscribes evenly across the 10 communities
community_name = communities[i % len(communities)]
block_11_operations.append({
"type": "custom_json_operation",
"value": {
"required_auths": [],
"required_posting_auths": [account_name],
"id": "community",
"json": json.dumps(["subscribe", {"community": community_name}])
}
})
if (i + 1) % 10000 == 0 or (i + 1) == num_subscribes:
print(f" Generated {i + 1}/{num_subscribes} subscribe operations")
# Create the transaction structures
transaction_10 = {
"ref_block_num": 10,
"ref_block_prefix": 1,
"expiration": expiration_time_10,
"operations": block_10_operations
}
transaction_11 = {
"ref_block_num": 11,
"ref_block_prefix": 1,
"expiration": expiration_time_11,
"operations": block_11_operations
}
# Create the final mock data structure with two blocks
mock_data = {
"10": {
"transactions": [transaction_10]
},
"11": {
"transactions": [transaction_11]
}
}
return mock_data
def main():
"""Main function to generate and save mock data."""
print("=" * 80)
print("Large Mock Data Generator")
print("=" * 80)
print()
# Generate the mock data
mock_data = generate_mock_data()
# Output file
output_file = "large_mock_block_10.json"
print()
print(f"Writing mock data to {output_file}...")
# Write to file with indentation for readability (warning: large file!)
with open(output_file, 'w') as f:
json.dump(mock_data, f, indent=2)
print(f"Mock data successfully written to {output_file}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\nOperation cancelled by user")
sys.exit(1)
except Exception as e:
print(f"\n\nError: {e}", file=sys.stderr)
sys.exit(1)
Edited by Howo