Commit b846e5f1 authored by inertia's avatar inertia
Browse files

Merge branch 'get-block-range' into 'master'

Get block range

Closes #1

See merge request !3
parents 6927f6ab 7230bb75
Pipeline #26141 failed with stage
......@@ -153,9 +153,13 @@ namespace :stream do
first_block_num = args[:at_block_num].to_i if !!args[:at_block_num]
stream = Hive::Stream.new(url: ENV['TEST_NODE'], mode: mode)
api = Hive::Api.new(url: ENV['TEST_NODE'])
block_api = Hive::BlockApi.new(url: ENV['TEST_NODE'])
last_block_num = nil
last_timestamp = nil
range_complete = false
round_pool = {}
aging_blocks = {}
aged_block_interval = 630
api.get_dynamic_global_properties do |properties|
current_block_num = if mode == :head
......@@ -165,13 +169,14 @@ namespace :stream do
end
# First pass replays a random number of the latest blocks to test chunking.
first_block_num ||= current_block_num - (rand * 200).to_i
first_block_num ||= current_block_num - (rand * 2000).to_i
range = first_block_num..current_block_num
puts "Initial block range: #{range.size}"
stream.blocks(at_block_num: range.first) do |block, block_num|
current_timestamp = Time.parse(block.timestamp + 'Z')
round_pool[current_timestamp] = {block_num: block_num, block: block}
if !range_complete && block_num > range.last
puts 'Done with initial range.'
......@@ -188,9 +193,35 @@ namespace :stream do
exit
end
puts "\t#{block_num} Timestamp: #{current_timestamp}, witness: #{block.witness}"
round_pool.each do |k, v|
aging_blocks[k] = v if Time.now - k > aged_block_interval
end
round_pool = round_pool.select{|k, v| Time.now - k <= aged_block_interval}.to_h
drift = last_timestamp.nil? ? 0 : (current_timestamp - last_timestamp) - Hive::Stream::BLOCK_INTERVAL.to_f
puts "\t#{block_num} Timestamp: #{current_timestamp}, witness: #{block.witness}, aging blocks: #{aging_blocks.size}, drift: #{drift}"
last_block_num = block_num
last_timestamp = current_timestamp
if range_complete && aging_blocks.any?
aging_block_nums = aging_blocks.map{|k, v| v[:block_num]}
wire_block_range = (aging_block_nums.first..aging_block_nums.last)
block_api.get_block_headers(block_range: wire_block_range) do |wire_header, wire_block_num|
wire_timestamp = Time.parse(wire_header.timestamp + 'Z')
aging_block = aging_blocks[wire_timestamp][:block]
if wire_header.previous == aging_block.previous
puts "\t\tAged block test #{wire_block_num}: √"
aging_blocks.delete(wire_timestamp)
else
puts "\t\tAged block test #{wire_block_num}: detected block-reorganization (#{wire_header.previous} != #{aging_block.previous})"
exit
end
end
end
end
end
end
......@@ -247,6 +278,8 @@ namespace :stream do
first_block_num = args[:at_block_num].to_i if !!args[:at_block_num]
stream = Hive::Stream.new(url: ENV['TEST_NODE'], mode: mode)
api = Hive::Api.new(url: ENV['TEST_NODE'])
ah_api = Hive::AccountHistoryApi.new(url: ENV['TEST_NODE'])
round_vops = {}
api.get_dynamic_global_properties do |properties|
current_block_num = if mode == :head
......@@ -259,6 +292,31 @@ namespace :stream do
first_block_num ||= current_block_num - (rand * 200).to_i
stream.operations(at_block_num: first_block_num, only_virtual: true) do |op, trx_id, block_num|
# 126 is about two shuffle rounds (if mode == :head); we need to avoid
# the current block_num because we're still in the middle of reading
# all of the vops for that block.
if round_vops.size > 126 && !round_vops.include?(block_num)
ah_api.enum_virtual_ops(block_range_begin: round_vops.keys.min, block_range_end: round_vops.keys.max + 1, include_reversible: true) do |result|
round_vops.each do |k, v|
later_ops = result.ops.select{|vop| vop.block == k}
if (verify_count = later_ops.size) == v.size
puts "\t\t#{k} :: streamed vop count was #{v.size} √"
else
puts "\t\t#{k} :: streamed vop count was #{v.size}, later became #{verify_count}"
puts "\t\t\t#{v.map{|op| op.type}.join(', ')}"
puts "\t\tLater ops:\n\t\t\t#{later_ops.map{|vop| vop.op.type}.join(', ')}"
exit
end
end
end
round_vops = {}
end
round_vops[block_num] ||= []
round_vops[block_num] << op
puts "#{block_num} :: #{trx_id}; op: #{op.type}"
end
end
......@@ -286,6 +344,78 @@ namespace :stream do
end
end
end
desc 'Test the ability to stream all operations (including virtual) that match a pattern.'
task :op_pattern, [:pattern, :mode, :at_block_num] do |t, args|
  # Stream mode defaults to :irreversible unless overridden by the task args.
  stream_mode = (args[:mode] || 'irreversible').to_sym
  start_block = args[:at_block_num].to_i if !!args[:at_block_num]
  op_stream = Hive::Stream.new(url: ENV['TEST_NODE'], mode: stream_mode)
  node_api = Hive::Api.new(url: ENV['TEST_NODE'])
  # Case-insensitive regexp compiled from the raw :pattern task argument.
  pattern = /#{args[:pattern]}/i

  node_api.get_dynamic_global_properties do |properties|
    # Pick the reference block height matching the chosen stream mode.
    current_block_num = case stream_mode
    when :head then properties.head_block_number
    else properties.last_irreversible_block_num
    end

    # First pass replays a random number of the latest blocks to test chunking.
    start_block ||= current_block_num - (rand * 200).to_i

    op_stream.operations(at_block_num: start_block, include_virtual: true) do |op, trx_id, block_num|
      # Match the pattern against the serialized operation; skip non-matches.
      next unless op.to_json =~ pattern

      puts "#{block_num} :: #{trx_id}; op: #{op.to_json}"
    end
  end
end
desc 'Test the ability to stream all effective_comment_vote_operation operations.'
task :effective_comment_vote_operation, [:mode, :at_block_num] do |t, args|
  # Stream mode defaults to :irreversible unless overridden by the task args.
  stream_mode = (args[:mode] || 'irreversible').to_sym
  start_block = args[:at_block_num].to_i if !!args[:at_block_num]
  vote_stream = Hive::Stream.new(url: ENV['TEST_NODE'], mode: stream_mode, no_warn: true)
  node_api = Hive::Api.new(url: ENV['TEST_NODE'])

  node_api.get_dynamic_global_properties do |properties|
    # Pick the reference block height matching the chosen stream mode.
    current_block_num =
      if stream_mode == :head
        properties.head_block_number
      else
        properties.last_irreversible_block_num
      end

    # First pass replays a random number of the latest blocks to test chunking.
    start_block ||= current_block_num - (rand * 200).to_i

    vote_stream.operations(at_block_num: start_block, include_virtual: true) do |op, trx_id, block_num|
      # Only effective_comment_vote_operation virtual ops are of interest.
      next if op.type != 'effective_comment_vote_operation'

      pending_payout = Hive::Type::Amount.new(op.value.pending_payout)
      puts "#{block_num} :: #{trx_id}; voter: #{op.value.voter}, author: #{op.value.author}, pending_payout: #{pending_payout}"
    end
  end
end
end
desc 'List hardforks.'
task :hardforks do
  database_api = Hive::DatabaseApi.new(url: ENV['TEST_NODE'])

  # NOTE(review): the previous version also constructed a BlockApi and an
  # AccountHistoryApi and fetched block 1's timestamp into a local that was
  # never read — dead code costing two extra RPC round-trips.  Removed.

  # Print each processed hardfork with its activation timestamp.  The node
  # reports timestamps without a zone suffix; 'Z' is appended so Time.parse
  # treats them as UTC.
  database_api.get_hardfork_properties do |properties|
    properties.processed_hardforks.each_with_index do |raw_timestamp, index|
      puts "HF#{index}: #{Time.parse(raw_timestamp + 'Z')}"
    end
  end
end
YARD::Rake::YardocTask.new do |t|
......
......@@ -32,7 +32,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'json', '~> 2.1', '>= 2.1.0'
spec.add_dependency 'logging', '~> 2.2', '>= 2.2.0'
spec.add_dependency 'hashie', '~> 4.1', '>= 3.5.7'
spec.add_dependency 'hashie', '>= 3.5'
spec.add_dependency 'bitcoin-ruby', '~> 0.0', '0.0.20'
spec.add_dependency 'ffi', '~> 1.9', '>= 1.9.23'
spec.add_dependency 'bindata', '~> 2.4', '>= 2.4.4'
......
......@@ -193,7 +193,18 @@ module Hive
# Some arguments are optional, but if the number of arguments passed is greater
# than the expected arguments size, we can warn.
if args_size > expected_args_size
@error_pipe.puts "Warning #{rpc_method_name} expects arguments: #{expected_args_size}, got: #{args_size}"
if rpc_method_name == 'account_history_api.get_account_history' && expected_args_size == 3 && args_size == 6
# TODO Remove this condition if they ever fix this issue:
# https://gitlab.syncad.com/hive/hive/-/issues/100
elsif rpc_method_name == 'account_history_api.get_ops_in_block' && expected_args_size == 2 && args_size == 3
# TODO Remove this condition if they ever fix this issue:
# https://gitlab.syncad.com/hive/hive/-/issues/100
elsif rpc_method_name == 'account_history_api.enum_virtual_ops' && expected_args_size == 2 && args_size == 3
# TODO Remove this condition if they ever fix this issue:
# https://gitlab.syncad.com/hive/hive/-/issues/100
else
@error_pipe.puts "Warning #{rpc_method_name} expects arguments: #{expected_args_size}, got: #{args_size}"
end
end
rescue NoMethodError => e
error = Hive::ArgumentError.new("#{rpc_method_name} expects arguments: #{expected_args_size}", e)
......
......@@ -6,6 +6,8 @@ module Hive
# Also see: {https://developers.hive.io/apidefinitions/block-api.html Block API Definitions}
class BlockApi < Api
MAX_RANGE_SIZE = 50
MAX_NO_BATCH_RANGE_SIZE = 200
MAX_NO_BATCH_NO_RANGE_SIZE = 1
def initialize(options = {})
self.class.api_name = :block_api
......@@ -20,24 +22,30 @@ module Hive
get_block_objects(options.merge(object: :block_header), block)
end
# Uses a batched request on a range of blocks.
# Uses get_block_range (or batched requests) on a range of blocks.
#
# @param options [Hash] The attributes to get a block range with.
# @option options [Range] :block_range starting on one block number and ending on an higher block number.
def get_blocks(options = {block_range: (0..0)}, &block)
# @option options [Boolean] :use_batch use json-rpc batch instead of get_block_range (preferred)
def get_blocks(options = {block_range: (0..0), use_batch: false}, &block)
get_block_objects(options.merge(object: :block), block)
end
private
def get_block_objects(options = {block_range: (0..0)}, block = nil)
def get_block_objects(options = {block_range: (0..0), use_batch: false}, block = nil)
object = options[:object]
object_method = "get_#{object}".to_sym
block_range = options[:block_range] || (0..0)
use_batch = !!options[:use_batch]
object = :block_range if object == :block && !use_batch
object_method = "get_#{object}".to_sym
if (start = block_range.first) < 1
if !!block_range && block_range.any? && (start = block_range.first) < 1
raise Hive::ArgumentError, "Invalid starting block: #{start}"
end
chunks = if block_range.size > MAX_RANGE_SIZE
chunks = if object == :block_range
block_range.each_slice(MAX_NO_BATCH_RANGE_SIZE)
elsif block_range.size > MAX_RANGE_SIZE
block_range.each_slice(MAX_RANGE_SIZE)
else
[block_range]
......@@ -46,27 +54,65 @@ module Hive
for sub_range in chunks do
request_object = []
for i in sub_range do
@rpc_client.put(self.class.api_name, object_method, block_num: i, request_object: request_object)
if !!use_batch
for i in sub_range do
@rpc_client.put(self.class.api_name, object_method, block_num: i, request_object: request_object)
end
else
case object
when :block_header
# Must use json-rpc batch for block headers request.
for i in sub_range do
@rpc_client.put(self.class.api_name, :get_block_header, block_num: i, request_object: request_object)
end
when :block, :block_range
if sub_range.size == 1
@rpc_client.put(self.class.api_name, :get_block, block_num: sub_range.first, request_object: request_object)
else
@rpc_client.put(self.class.api_name, :get_block_range, starting_block_num: sub_range.first, count: sub_range.size, request_object: request_object)
end
end
end
if !!block
index = 0
@rpc_client.rpc_batch_execute(api_name: self.class.api_name, request_object: request_object) do |result, error, id|
raise Hive::RemoteNodeError, error.to_json if !!error
block_num = sub_range.to_a[index]
index = index + 1
case object
when :block_header
block.call(result.nil? ? nil : result[:header], block_num)
block.call(result[:header], block_num)
else
block.call(result.nil? ? nil : result[object], block_num)
if !!use_batch || !!result[:block]
block.call(result[:block] || result[object], block_num)
else
current_block_num = block_num
result[:blocks].each do |b|
# Now verify that the previous block_num really is the
# previous block.
decoded_previous_block_num = b.previous[0..7].to_i(16)
previous_block_num = current_block_num - 1
unless decoded_previous_block_num == previous_block_num
raise Hive::RemoteNodeError, "Wrong block_num. Got #{decoded_previous_block_num}, expected #{previous_block_num}"
end
block.call(b, current_block_num)
current_block_num = current_block_num + 1
end
end
end
end
else
blocks = []
@rpc_client.rpc_batch_execute(api_name: self.class.api_name, request_object: request_object) do |result, error, id|
raise Hive::RemoteNodeError, error.to_json if !!error
blocks << result
end
end
......
......@@ -5,7 +5,7 @@ module Hive
include Utils
# IDs derived from:
# https://gitlab.syncad.com/hive/hive/-/blob/master/libraries/protocol/include/steem/protocol/operations.hpp
# https://gitlab.syncad.com/hive/hive/-/blob/master/libraries/protocol/include/hive/protocol/operations.hpp
IDS = [
:vote_operation,
......@@ -64,18 +64,18 @@ module Hive
:create_proposal_operation,
:update_proposal_votes_operation,
:remove_proposal_operation,
:update_proposal_operation,
# SMT operations
:claim_reward_balance2_operation,
:smt_setup_operation,
:smt_cap_reveal_operation,
:smt_refund_operation,
:smt_setup_emissions_operation,
:smt_set_setup_parameters_operation,
:smt_set_runtime_parameters_operation,
:smt_create_operation,
:smt_contribute_operation
] + VIRTUAL_OP_IDS = [
# virtual operations below this point
:fill_convert_request_operation,
:author_reward_operation,
......@@ -92,7 +92,16 @@ module Hive
:return_vesting_delegation_operation,
:comment_benefactor_reward_operation,
:producer_reward_operation,
:clear_null_account_balance_operation
:clear_null_account_balance_operation,
:proposal_pay_operation,
:sps_fund_operation,
:hardfork_hive_operation,
:hardfork_hive_restore_operation,
:delayed_voting_operation,
:consolidate_treasury_balance_operation,
:effective_comment_vote_operation,
:ineffective_delete_comment_operation,
:sps_convert_operation
]
def self.op_id(op)
......
......@@ -62,12 +62,13 @@ module Hive
response = nil
loop do
sub_options = options.dup
request = http_post(api_name)
request_object = if !!api_name && !!api_method
put(api_name, api_method, options)
elsif !!options && defined?(options.delete)
options.delete(:request_object)
put(api_name, api_method, sub_options)
elsif !!options && defined?(sub_options.delete)
sub_options.delete(:request_object)
end
if request_object.size > JSON_RPC_BATCH_SIZE_MAXIMUM
......@@ -124,7 +125,7 @@ module Hive
raise_error_response rpc_method_name, rpc_args, r
rescue *TIMEOUT_ERRORS => e
timeout_detected = true
timeout_cause = nil
timeout_cause = JSON[e.message]['error'] + " while posting: #{rpc_args}" rescue e.to_s
break # fail fast
end
......
......@@ -35,7 +35,9 @@ module Hive
MAX_RETRY_COUNT = 10
VOP_TRX_ID = ('0' * 40).freeze
MAX_VOP_READ_AHEAD = 100
SHUFFLE_ROUND_LENGTH = 21
# @param options [Hash] additional options
# @option options [Hive::DatabaseApi] :database_api
# @option options [Hive::BlockApi] :block_api
......@@ -92,7 +94,7 @@ module Hive
def transactions(options = {}, &block)
blocks(options) do |block, block_num|
if block.nil?
warn "Batch missing block_num: #{block_num}, retrying ..."
warn "Batch missing block_num: #{block_num}, retrying ..." unless @no_warn
block = block_api.get_block(block_num: block_num) do |result|
result.block
......@@ -214,6 +216,10 @@ module Hive
only_virtual = false
include_virtual = false
last_block_num = nil
within_shuffle_round = nil
initial_head_block_number = database_api.get_dynamic_global_properties do |dgpo|
dgpo.head_block_number
end
case args.first
when Hash
......@@ -226,7 +232,9 @@ module Hive
if only_virtual
block_numbers(options) do |block_num|
get_virtual_ops(types, block_num, block)
within_shuffle_round ||= initial_head_block_number - block_num < SHUFFLE_ROUND_LENGTH * 2
get_virtual_ops(types, block_num, within_shuffle_round, block)
end
else
transactions(options) do |transaction, trx_id, block_num|
......@@ -236,8 +244,9 @@ module Hive
next unless last_block_num != block_num
last_block_num = block_num
within_shuffle_round ||= initial_head_block_number - block_num < SHUFFLE_ROUND_LENGTH * 2
get_virtual_ops(types, block_num, block) if include_virtual
get_virtual_ops(types, block_num, within_shuffle_round, block) if include_virtual
end
end
end
......@@ -257,6 +266,7 @@ module Hive
object = options[:object]
object_method = "get_#{object}".to_sym
block_interval = BLOCK_INTERVAL
use_block_range = true
at_block_num, until_block_num = if !!block_range = options[:block_range]
[block_range.first, block_range.last]
......@@ -281,9 +291,32 @@ module Hive
block_interval = BLOCK_INTERVAL
end
else
block_api.send(object_method, block_range: range) do |b, n|
block.call b, n
block_interval = BLOCK_INTERVAL
loop do
begin
if use_block_range
block_api.send(object_method, block_range: range) do |b, n|
block.call b, n
block_interval = BLOCK_INTERVAL
end
else
range.each do |block_num|
block_api.get_block(block_num: block_num) do |b, n|
block.call b.block, b.block.block_id[0..7].to_i(16)
block_interval = BLOCK_INTERVAL
end
end
end
rescue Hive::UnknownError => e
if e.message =~ /Could not find method get_block_range/
use_block_range = false
redo
end
raise e
end
break
end
end
......@@ -325,22 +358,96 @@ module Hive
end
# @private
def get_virtual_ops(types, block_num, block)
def get_virtual_ops(types, block_num, within_shuffle_round, block)
retries = 0
vop_read_ahead = within_shuffle_round ? 1 : MAX_VOP_READ_AHEAD
@virtual_ops_cache ||= {}
@virtual_ops_cache = @virtual_ops_cache.reject do |k, v|
if k < block_num
warn "Found orphaned virtual operations for block_num #{k}: #{v.to_json}" unless @no_warn
true
end
false
end
loop do
get_ops_in_block_options = case account_history_api
vops_found = false
if account_history_api.class == Hive::AccountHistoryApi || @enum_virtual_ops_supported.nil? && @enum_virtual_ops_supported != false
begin
# Use account_history_api.enum_virtual_ops, if supported.
if @virtual_ops_cache.empty? || !@virtual_ops_cache.keys.include?(block_num)
(block_num..(block_num + vop_read_ahead)).each do |block_num|
@virtual_ops_cache[block_num] = []
end
enum_virtual_ops_options = {
block_range_begin: block_num,
block_range_end: block_num + vop_read_ahead,
# TODO Use: mode != :irreversible
include_reversible: true
}
account_history_api.enum_virtual_ops(enum_virtual_ops_options) do |result|
@enum_virtual_ops_supported = true
result.ops.each do |vop|
@virtual_ops_cache[vop.block] << vop
end
end
end
vops_found = true
if !!@virtual_ops_cache[block_num]
@virtual_ops_cache[block_num].each do |vop|
next unless block_num == vop.block
next if types.any? && !types.include?(vop.op.type)
if vop.virtual_op == 0
# require 'pry' ; binding.pry if vop.op.type == 'producer_reward_operation'
warn "Found non-virtual operation (#{vop.op.type}) in enum_virtual_ops result for block: #{block_num}" unless @no_warn
next
end
block.call vop.op, vop.trx_id, block_num
end
@virtual_ops_cache.delete(block_num)
end
rescue Hive::UnknownError => e
if e.message =~ /This API is not supported for account history backed by Chainbase/
warn "Retrying with get_ops_in_block (api does not support enum_virtual_ops)" unless @no_warn
@enum_virtual_ops_supported = false
vops_found = false
else
raise e
end
end
end
break if vops_found
# Fallback to previous method.
warn "Retrying with get_ops_in_block (did not find ops for block #{block_num} using enum_virtual_ops)" unless @no_warn
response = case account_history_api
when Hive::CondenserApi
[block_num, true]
account_history_api.get_ops_in_block(block_num, true)
when Hive::AccountHistoryApi
{
account_history_api.get_ops_in_block(
block_num: block_num,
only_virtual: true
}
only_virtual: true,
# TODO Use: mode != :irreversible
include_reversible: true
)
end
response = account_history_api.get_ops_in_block(*get_ops_in_block_options)
if response.nil? || (result = response.result).nil?
if retries < MAX_RETRY_COUNT
warn "Retrying get_ops_in_block on block #{block_num}" unless @no_warn
......@@ -367,7 +474,7 @@ module Hive
retries = retries + 1
redo
else
warn "unable to find virtual operations for block: #{block_num}"
warn "unable to find virtual operations for block: #{block_num}" unless @no_warn
# raise TooManyRetriesError, "unable to find virtual operations for block: #{block_num}"
end
end
......@@ -375,7 +482,7 @@ module Hive