Commit dff685d0 authored by Dan Notestein, committed by Dan Notestein

Add stats for compression and decompression time

parent b7387da4
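The pattern this commit applies to each codec (zstd, brotli, deflate) is the same: take a timestamp before compressing, another after compressing, a third after immediately decompressing the result, then add the two deltas to per-codec accumulators under the existing queue mutex. Below is a minimal, self-contained sketch of that pattern (not part of the commit); the header path and the stand-in callbacks are assumptions, while fc::time_point::now(), fc::microseconds, and the locking mirror the diff.

#include <fc/time.hpp>   // assumed header for fc::time_point / fc::microseconds
#include <mutex>

std::mutex stats_mutex;                     // plays the role of queue_mutex in the diff
fc::microseconds total_compression_time;    // shared accumulators, summed across worker threads
fc::microseconds total_decompression_time;

// compress_fn / decompress_fn stand in for the block_log codec calls
// (compress_block_zstd / decompress_block_zstd, etc.).
template <typename CompressFn, typename DecompressFn>
void time_one_block(CompressFn compress_fn, DecompressFn decompress_fn)
{
  fc::time_point before = fc::time_point::now();
  compress_fn();
  fc::time_point after_compress = fc::time_point::now();
  decompress_fn();                          // decompress right away to measure read-side cost
  fc::time_point after_decompress = fc::time_point::now();

  std::unique_lock<std::mutex> lock(stats_mutex);
  total_compression_time   += after_compress - before;
  total_decompression_time += after_decompress - after_compress;
}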
......@@ -44,6 +44,7 @@ fc::optional<int> brotli_quality;
bool enable_deflate = true;
fc::optional<int> deflate_level;
uint32_t starting_block_number = 1;
fc::optional<uint32_t> blocks_to_compress;
std::mutex queue_mutex;
......@@ -56,8 +57,14 @@ uint64_t total_compressed_size = 0;
uint64_t total_uncompressed_size = 0;
uint64_t size_of_start_positions = 0;
uint64_t total_zstd_size = 0;
fc::microseconds total_zstd_compression_time;
fc::microseconds total_zstd_decompression_time;
uint64_t total_brotli_size = 0;
fc::microseconds total_brotli_compression_time;
fc::microseconds total_brotli_decompression_time;
uint64_t total_deflate_size = 0;
fc::microseconds total_deflate_compression_time;
fc::microseconds total_deflate_decompression_time;
std::map<hive::chain::block_log::block_flags, uint32_t> total_count_by_method;
const uint32_t blocks_to_prefetch = 100;
......@@ -105,12 +112,18 @@ void compress_blocks()
try
{
compressed_data zstd_compressed_data;
fc::time_point before = fc::time_point::now();
std::tie(zstd_compressed_data.data, zstd_compressed_data.size) = hive::chain::block_log::compress_block_zstd(uncompressed->uncompressed_block_data.get(), uncompressed->uncompressed_block_size, zstd_level);
fc::time_point after_compress = fc::time_point::now();
hive::chain::block_log::decompress_block_zstd(zstd_compressed_data.data.get(), zstd_compressed_data.size);
fc::time_point after_decompress = fc::time_point::now();
zstd_compressed_data.method = hive::chain::block_log::block_flags::zstd;
{
std::unique_lock<std::mutex> lock(queue_mutex);
total_zstd_size += zstd_compressed_data.size;
total_zstd_compression_time += after_compress - before;
total_zstd_decompression_time += after_decompress - after_compress;
}
compressed_versions.push_back(std::move(zstd_compressed_data));
}
......@@ -124,12 +137,18 @@ void compress_blocks()
try
{
compressed_data brotli_compressed_data;
fc::time_point before = fc::time_point::now();
std::tie(brotli_compressed_data.data, brotli_compressed_data.size) = hive::chain::block_log::compress_block_brotli(uncompressed->uncompressed_block_data.get(), uncompressed->uncompressed_block_size, brotli_quality);
fc::time_point after_compress = fc::time_point::now();
hive::chain::block_log::decompress_block_brotli(brotli_compressed_data.data.get(), brotli_compressed_data.size);
fc::time_point after_decompress = fc::time_point::now();
brotli_compressed_data.method = hive::chain::block_log::block_flags::brotli;
{
std::unique_lock<std::mutex> lock(queue_mutex);
total_brotli_size += brotli_compressed_data.size;
total_brotli_compression_time += after_compress - before;
total_brotli_decompression_time += after_decompress - after_compress;
}
compressed_versions.push_back(std::move(brotli_compressed_data));
}
......@@ -143,12 +162,18 @@ void compress_blocks()
try
{
compressed_data deflate_compressed_data;
fc::time_point before = fc::time_point::now();
std::tie(deflate_compressed_data.data, deflate_compressed_data.size) = hive::chain::block_log::compress_block_deflate(uncompressed->uncompressed_block_data.get(), uncompressed->uncompressed_block_size, deflate_level);
fc::time_point after_compress = fc::time_point::now();
hive::chain::block_log::decompress_block_deflate(deflate_compressed_data.data.get(), deflate_compressed_data.size);
fc::time_point after_decompress = fc::time_point::now();
deflate_compressed_data.method = hive::chain::block_log::block_flags::deflate;
{
std::unique_lock<std::mutex> lock(queue_mutex);
total_deflate_size += deflate_compressed_data.size;
total_deflate_compression_time += after_compress - before;
total_deflate_decompression_time += after_decompress - after_compress;
}
compressed_versions.push_back(std::move(deflate_compressed_data));
}
......@@ -284,14 +309,14 @@ void fill_pending_queue(const fc::path& block_log)
uint32_t head_block_num = log.head()->block_num();
if (blocks_to_compress && *blocks_to_compress > head_block_num - 1)
{
elog("Error: input block log does not contain ${blocks_to_compress} blocks", (blocks_to_compress));
elog("Error: input block log does not contain ${blocks_to_compress} blocks (it's head block number is ${head_block_num})", (blocks_to_compress)(head_block_num));
exit(1);
}
uint32_t stop_at_block = blocks_to_compress.value_or(head_block_num - 1);
ilog("Compressing up to block ${stop_at_block}", (stop_at_block));
uint32_t stop_at_block = blocks_to_compress ? starting_block_number + *blocks_to_compress - 1 : head_block_num - 1;
ilog("Compressing blocks ${starting_block_number} to ${stop_at_block}", (starting_block_number)(stop_at_block));
uint32_t current_block_number = 1;
uint32_t current_block_number = starting_block_number;
while (current_block_number <= stop_at_block)
{
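As a worked example of the new range arithmetic (the numbers below are made up, not from the commit): --starting-block-number and --block-count define an inclusive range, so the loop visits exactly the requested number of blocks.

#include <cstdint>

int main()
{
  // Hypothetical benchmarking run: start at block 1000 and compress 500 blocks.
  uint32_t starting_block_number = 1000;
  uint32_t blocks_to_compress = 500;
  uint32_t stop_at_block = starting_block_number + blocks_to_compress - 1; // 1499
  // The while loop above walks current_block_number from 1000 to 1499 inclusive,
  // i.e. exactly blocks_to_compress blocks.
  return stop_at_block == 1499 ? 0 : 1;
}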
......@@ -355,6 +380,7 @@ int main(int argc, char** argv)
options.add_options()("jobs,j", boost::program_options::value<int>()->default_value(1), "The number of threads to use for compression");
options.add_options()("input-block-log,i", boost::program_options::value<std::string>()->required(), "The directory containing the input block log");
options.add_options()("output-block-log,o", boost::program_options::value<std::string>()->required(), "The directory to contain the compressed block log");
options.add_options()("starting-block-number,s", boost::program_options::value<uint32_t>()->default_value(1), "Start at the given block number (for benchmarking only, values > 1 will generate an unusable block log)");
options.add_options()("block-count,n", boost::program_options::value<uint32_t>(), "Stop after this many blocks");
options.add_options()("help,h", "Print usage instructions");
......@@ -378,6 +404,7 @@ int main(int argc, char** argv)
unsigned jobs = options_map["jobs"].as<int>();
starting_block_number = options_map["starting-block-number"].as<uint32_t>();
if (options_map.count("block-count"))
blocks_to_compress = options_map["block-count"].as<uint32_t>();
......@@ -399,7 +426,6 @@ int main(int argc, char** argv)
fc::path output_block_log_path;
if (options_map.count("output-block-log"))
output_block_log_path = options_map["output-block-log"].as<std::string>();
std::shared_ptr<std::thread> fill_queue_thread = std::make_shared<std::thread>([&](){ fill_pending_queue(input_block_log_path / "block_log"); });
std::shared_ptr<std::thread> drain_queue_thread = std::make_shared<std::thread>([&](){ drain_completed_queue(output_block_log_path / "block_log"); });
std::vector<std::shared_ptr<std::thread>> compress_blocks_threads;
......@@ -410,12 +436,46 @@ int main(int argc, char** argv)
compress_blocks_thread->join();
drain_queue_thread->join();
ilog("Total number of blocks by compression method:");
uint32_t total_blocks_processed = 0;
for (const auto& value : total_count_by_method)
{
ilog(" ${method}: ${count}", ("method", value.first)("count", value.second));
total_blocks_processed += value.second;
}
ilog("Total bytes if all blocks compressed by compression method:");
ilog(" zstd: ${total_zstd_size}", ("total_zstd_size", total_zstd_size + size_of_start_positions));
ilog(" brotli: ${total_brotli_size}", ("total_brotli_size", total_brotli_size + size_of_start_positions));
ilog(" deflate: ${total_deflate_size}", ("total_deflate_size", total_deflate_size + size_of_start_positions));
if (enable_zstd)
{
ilog(" zstd: ${total_zstd_size} bytes, total time: ${total_zstd_compression_time}μs, average time per block: ${average_zstd_time}μs",
("total_zstd_size", total_zstd_size + size_of_start_positions)
(total_zstd_compression_time)
("average_zstd_time", total_zstd_compression_time.count() / total_blocks_processed));
ilog(" decompression total time: ${total_zstd_decompression_time}μs, average time per block: ${average_zstd_decompression_time}μs",
("total_zstd_size", total_zstd_size + size_of_start_positions)
(total_zstd_decompression_time)
("average_zstd_decompression_time", total_zstd_decompression_time.count() / total_blocks_processed));
}
if (enable_brotli)
{
ilog(" brotli: ${total_brotli_size} bytes, total time: ${total_brotli_compression_time}μs, average time per block: ${average_brotli_time}μs",
("total_brotli_size", total_brotli_size + size_of_start_positions)
(total_brotli_compression_time)
("average_brotli_time", total_brotli_compression_time.count() / total_blocks_processed));
ilog(" decompression total time: ${total_brotli_decompression_time}μs, average time per block: ${average_brotli_decompression_time}μs",
("total_brotli_size", total_brotli_size + size_of_start_positions)
(total_brotli_decompression_time)
("average_brotli_decompression_time", total_brotli_decompression_time.count() / total_blocks_processed));
}
if (enable_deflate)
{
ilog(" deflate: ${total_deflate_size} bytes, total time: ${total_deflate_compression_time}μs, average time per block: ${average_deflate_time}μs",
("total_deflate_size", total_deflate_size + size_of_start_positions)
(total_deflate_compression_time)
("average_deflate_time", total_deflate_compression_time.count() / total_blocks_processed));
ilog(" decompression total time: ${total_deflate_decompression_time}μs, average time per block: ${average_deflate_decompression_time}μs",
("total_deflate_size", total_deflate_size + size_of_start_positions)
(total_deflate_decompression_time)
("average_deflate_decompression_time", total_deflate_decompression_time.count() / total_blocks_processed));
}
}
catch ( const std::exception& e )
{
......