Commit bd9d512a authored by Eric Frias's avatar Eric Frias Committed by Dan Notestein
Browse files

Include zstd as a submodule so we have a known recent version.

Improve zstd compression by:
 - supporting pre-computed dictionaries
 - tweaking compression flags to shrink zstd output by omitting
   magic numbers, dictionary ids, etc
(the format of the compressed block log has changed so it can
call out dictionaries; previously compressed block logs will
be unreadable)
parent dff685d0
......@@ -13,3 +13,6 @@
[submodule "schemas"]
path = tests/schemas
url = ../schemas.git
[submodule "libraries/vendor/zstd"]
path = libraries/vendor/zstd
url = https://github.com/facebook/zstd.git
......@@ -220,13 +220,13 @@ else()
set(BROTLI_INCLUDE_DIRS)
endif()
FIND_PACKAGE(Zstd)
if( Zstd_FOUND )
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DHAVE_ZSTD")
else()
set(Zstd_LIBRARIES)
set(Zstd_INCLUDE_DIRS)
endif()
# FIND_PACKAGE(Zstd)
# if( Zstd_FOUND )
# else()
# set(Zstd_LIBRARIES)
# set(Zstd_INCLUDE_DIRS)
# endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DHAVE_ZSTD")
FIND_PACKAGE(ZLIB)
if( ZLIB_FOUND )
......
file(GLOB HEADERS "include/hive/chain/*.hpp" "include/hive/chain/util/*.hpp" "include/hive/chain/smt_objects/*.hpp" "include/hive/chain/sps_objects/*.hpp")
add_subdirectory(zstd_dictionaries)
## SORT .cpp by most likely to change / break compile
add_library( hive_chain
......@@ -22,6 +23,7 @@ add_library( hive_chain
shared_authority.cpp
block_log.cpp
block_compression_dictionaries.cpp
generic_custom_operation_interpreter.cpp
......@@ -40,10 +42,10 @@ add_library( hive_chain
${HEADERS}
)
target_link_libraries( hive_chain hive_jsonball hive_protocol fc chainbase hive_schema appbase
${PATCH_MERGE_LIB} ${BROTLI_LIBRARIES} ${Zstd_LIBRARIES} ${ZLIB_LIBRARIES})
target_link_libraries( hive_chain hive_jsonball hive_protocol fc chainbase hive_schema appbase libzstd_static hive_chain_zstd_dictionaries
${PATCH_MERGE_LIB} ${BROTLI_LIBRARIES} ${ZLIB_LIBRARIES})
target_include_directories( hive_chain
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}" "${BROTLI_INCLUDE_DIRS}" "${Zstd_INCLUDE_DIRS}" "${ZLIB_INCLUDE_DIRS}"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}" "${BROTLI_INCLUDE_DIRS}" "${ZLIB_INCLUDE_DIRS}"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" )
if( CLANG_TIDY_EXE )
......
#include <thread>
#include <mutex>
#include <fc/log/logger.hpp>
#include <fc/exception/exception.hpp>
#include <hive/chain/block_compression_dictionaries.hpp>
// we store our dictionaries in compressed form, this is the maximum size
// one will be when decompressed
#define MAX_DICTIONARY_LENGTH (4 << 20)
// guards all four dictionary maps below; every public entry point takes this
// before touching them
std::mutex dictionaries_mutex;
// points at a zstd-compressed dictionary blob linked into the binary
// (non-owning; the storage is static)
struct raw_dictionary_info
{
const void* buffer;
unsigned size;
};
// owns the decompressed bytes of one dictionary
struct decompressed_raw_dictionary_info
{
std::unique_ptr<char[]> buffer;
size_t size;
};
// maps dictionary_number to raw data (zstd compressed zstd dictionaries)
std::map<uint8_t, raw_dictionary_info> raw_dictionaries;
// maps dictionary_number to zstd dictionaries
typedef std::map<uint8_t, decompressed_raw_dictionary_info> decompressed_raw_dictionary_map_t;
decompressed_raw_dictionary_map_t decompressed_raw_dictionaries;
// maps a dictionary_number to the ready-to-use decompression dictionary
std::map<uint8_t, ZSTD_DDict*> decompression_dictionaries;
// maps a (dictionary_number, compression_level) pair to a ready-to-use compression dictionary
std::map<std::pair<uint8_t, int>, ZSTD_CDict*> compression_dictionaries;
// Helper function; assumes the calling function holds dictionaries_mutex.
// Returns the decompressed bytes of the requested dictionary, expanding the
// compiled-in compressed blob and caching the result on first use.
// @param dictionary_number which dictionary to fetch
// @return reference to the cached decompressed dictionary (stable for the
//         program's lifetime, since entries are never erased)
// @throws fc::key_not_found_exception if no such dictionary was compiled in
// @throws fc::exception if zstd fails to decompress the blob
const decompressed_raw_dictionary_info& get_decompressed_raw_dictionary(uint8_t dictionary_number)
{
  // have we already decompressed this dictionary?
  auto decompressed_dictionary_iter = decompressed_raw_dictionaries.find(dictionary_number);
  if (decompressed_dictionary_iter == decompressed_raw_dictionaries.end())
  {
    // we don't. do we have the raw, compressed dictionary?
    auto raw_iter = raw_dictionaries.find(dictionary_number);
    if (raw_iter == raw_dictionaries.end())
      FC_THROW_EXCEPTION(fc::key_not_found_exception, "No dictionary ${dictionary_number} available", (dictionary_number));
    // if so, uncompress it into a scratch buffer big enough for any dictionary
    std::unique_ptr<char[]> buffer(new char[MAX_DICTIONARY_LENGTH]);
    size_t uncompressed_dictionary_size = ZSTD_decompress(buffer.get(), MAX_DICTIONARY_LENGTH,
                                                          raw_iter->second.buffer, raw_iter->second.size);
    if (ZSTD_isError(uncompressed_dictionary_size))
      FC_THROW("Error decompressing dictionary ${dictionary_number} with zstd", (dictionary_number));
    ilog("Decompressing dictionary number ${dictionary_number}, expanded ${compressed_size} to ${uncompressed_dictionary_size}",
         (dictionary_number)("compressed_size", raw_iter->second.size)(uncompressed_dictionary_size));
    // copy into a memory buffer of exactly the right size so we don't keep the
    // full MAX_DICTIONARY_LENGTH scratch allocation alive per dictionary
    std::unique_ptr<char[]> resized_buffer(new char[uncompressed_dictionary_size]);
    memcpy(resized_buffer.get(), buffer.get(), uncompressed_dictionary_size);
    // store off the uncompressed version
    bool insert_succeeded;
    std::tie(decompressed_dictionary_iter, insert_succeeded) = decompressed_raw_dictionaries.emplace(dictionary_number,
      decompressed_raw_dictionary_info{std::move(resized_buffer), uncompressed_dictionary_size});
    if (!insert_succeeded)
      FC_THROW("Error storing decompressed dictionary ${dictionary_number}", (dictionary_number));
  }
  return decompressed_dictionary_iter->second;
}
void init_raw_dictionaries();
// Returns a ready-to-use zstd decompression dictionary for the given
// dictionary number, creating and caching it on first use.  Thread-safe.
// @throws fc::exception if the dictionary is unavailable or cannot be built
ZSTD_DDict* get_zstd_decompression_dictionary(uint8_t dictionary_number)
{
  init_raw_dictionaries();
  std::lock_guard<std::mutex> guard(dictionaries_mutex);
  // try to find the dictionary already fully loaded for decompression
  auto iter = decompression_dictionaries.find(dictionary_number);
  if (iter != decompression_dictionaries.end())
    return iter->second;
  // it's not there, see if we have, or can create, a decompressed raw dictionary
  const decompressed_raw_dictionary_info& decompressed_dictionary = get_decompressed_raw_dictionary(dictionary_number);
  // then create a usable decompression dictionary for it.  _byReference means the
  // DDict borrows the cached raw buffer, which is never freed, so this is safe.
  ZSTD_DDict* dictionary = ZSTD_createDDict_byReference(decompressed_dictionary.buffer.get(), decompressed_dictionary.size);
  // don't cache (and silently hand back) a nullptr if zstd failed
  if (!dictionary)
    FC_THROW("Error creating zstd decompression dictionary ${dictionary_number}", (dictionary_number));
  decompression_dictionaries[dictionary_number] = dictionary;
  return dictionary;
}
// Returns a ready-to-use zstd compression dictionary for the given
// (dictionary number, compression level) pair, creating and caching it on
// first use.  Thread-safe.
// @throws fc::exception if the dictionary is unavailable or cannot be built
ZSTD_CDict* get_zstd_compression_dictionary(uint8_t dictionary_number, int compression_level)
{
  init_raw_dictionaries();
  std::lock_guard<std::mutex> guard(dictionaries_mutex);
  // CDicts are baked for a specific compression level, so the cache is keyed
  // on the (number, level) pair
  auto iter = compression_dictionaries.find(std::make_pair(dictionary_number, compression_level));
  if (iter != compression_dictionaries.end())
    return iter->second;
  // it's not there, see if we have, or can create, a decompressed raw dictionary
  const decompressed_raw_dictionary_info& decompressed_dictionary = get_decompressed_raw_dictionary(dictionary_number);
  // _byReference: the CDict borrows the cached raw buffer, which is never freed
  ZSTD_CDict* dictionary = ZSTD_createCDict_byReference(decompressed_dictionary.buffer.get(), decompressed_dictionary.size, compression_level);
  // don't cache (and silently hand back) a nullptr if zstd failed
  if (!dictionary)
    FC_THROW("Error creating zstd compression dictionary ${dictionary_number}", (dictionary_number));
  compression_dictionaries[std::make_pair(dictionary_number, compression_level)] = dictionary;
  return dictionary;
}
extern "C"
{
// These symbols are defined in the C files generated (via xxd) by the
// zstd_dictionaries subdirectory: for each dictionary NNN there is a byte
// array holding the zstd-compressed blob "NNNM.dict.zst" and its length.
// C linkage because the generated files are plain C.
#define DICT(prefixed_num) \
extern unsigned char __ ## prefixed_num ## M_dict_zst[]; \
extern unsigned __ ## prefixed_num ## M_dict_zst_len;
DICT(000);
DICT(001);
DICT(002);
DICT(003);
DICT(004);
DICT(005);
DICT(006);
DICT(007);
DICT(008);
DICT(009);
DICT(010);
DICT(011);
DICT(012);
DICT(013);
DICT(014);
DICT(015);
DICT(016);
DICT(017);
DICT(018);
DICT(019);
DICT(020);
DICT(021);
DICT(022);
DICT(023);
DICT(024);
DICT(025);
DICT(026);
DICT(027);
DICT(028);
DICT(029);
DICT(030);
DICT(031);
DICT(032);
DICT(033);
DICT(034);
DICT(035);
DICT(036);
DICT(037);
DICT(038);
DICT(039);
DICT(040);
DICT(041);
DICT(042);
DICT(043);
DICT(044);
DICT(045);
DICT(046);
DICT(047);
DICT(048);
DICT(049);
DICT(050);
DICT(051);
DICT(052);
DICT(053);
DICT(054);
DICT(055);
DICT(056);
DICT(057);
DICT(058);
DICT(059);
DICT(060);
DICT(061);
DICT(062);
#undef DICT
}
// Populates raw_dictionaries with pointers to the zstd-compressed dictionary
// blobs linked into this binary.  Safe to call from multiple threads and
// multiple times; only the first call does any work.
void init_raw_dictionaries()
{
  std::lock_guard<std::mutex> guard(dictionaries_mutex);
  // BUG FIX: this must be static.  As a plain local it was re-created as
  // false on every call, so the early-return guard never fired and the map
  // was rebuilt each time init_raw_dictionaries() was invoked.
  static bool initialized = false;
  if (initialized)
    return;
  initialized = true;
  // num is the map key (decimal); prefixed_num is the zero-padded token used
  // in the xxd-generated symbol names (e.g. __007M_dict_zst)
#define DICT(num, prefixed_num) \
  raw_dictionaries[num] = {(const void*)__ ## prefixed_num ## M_dict_zst, (unsigned)__ ## prefixed_num ## M_dict_zst_len};
  DICT(0, 000);
  DICT(1, 001);
  DICT(2, 002);
  DICT(3, 003);
  DICT(4, 004);
  DICT(5, 005);
  DICT(6, 006);
  DICT(7, 007);
  DICT(8, 008);
  DICT(9, 009);
  DICT(10, 010);
  DICT(11, 011);
  DICT(12, 012);
  DICT(13, 013);
  DICT(14, 014);
  DICT(15, 015);
  DICT(16, 016);
  DICT(17, 017);
  DICT(18, 018);
  DICT(19, 019);
  DICT(20, 020);
  DICT(21, 021);
  DICT(22, 022);
  DICT(23, 023);
  DICT(24, 024);
  DICT(25, 025);
  DICT(26, 026);
  DICT(27, 027);
  DICT(28, 028);
  DICT(29, 029);
  DICT(30, 030);
  DICT(31, 031);
  DICT(32, 032);
  DICT(33, 033);
  DICT(34, 034);
  DICT(35, 035);
  DICT(36, 036);
  DICT(37, 037);
  DICT(38, 038);
  DICT(39, 039);
  DICT(40, 040);
  DICT(41, 041);
  DICT(42, 042);
  DICT(43, 043);
  DICT(44, 044);
  DICT(45, 045);
  DICT(46, 046);
  DICT(47, 047);
  DICT(48, 048);
  DICT(49, 049);
  DICT(50, 050);
  DICT(51, 051);
  DICT(52, 052);
  DICT(53, 053);
  DICT(54, 054);
  DICT(55, 055);
  DICT(56, 056);
  DICT(57, 057);
  DICT(58, 058);
  DICT(59, 059);
  DICT(60, 060);
  DICT(61, 061);
  DICT(62, 062);
#undef DICT
}
This diff is collapsed.
// exposes zstd's advanced/static-linking-only APIs (ZSTD_CDict/ZSTD_DDict
// by-reference creation, etc.) used by the dictionary cache
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
// maps a block number to its dictionary number (the macro divides by
// 1,000,000, i.e. one dictionary per million-block range).
// BUG FIX: the macro argument must be parenthesized — with the old
// `(x / 1000000)`, an expression argument like `a + b` expanded to
// `a + b/1000000` because / binds tighter than +.
#define DICTIONARY_NUMBER_FROM_BLOCK_NUMBER(x) ((x) / 1000000)
// Returns a cached, ready-to-use compression dictionary for the given
// (dictionary number, compression level) pair.
ZSTD_CDict* get_zstd_compression_dictionary(uint8_t dictionary_number, int compression_level);
// Returns a cached, ready-to-use decompression dictionary for the given
// dictionary number.
ZSTD_DDict* get_zstd_decompression_dictionary(uint8_t dictionary_number);
......@@ -2,6 +2,11 @@
#include <fc/filesystem.hpp>
#include <hive/protocol/block.hpp>
struct ZSTD_CCtx_s;
typedef struct ZSTD_CCtx_s ZSTD_CCtx;
struct ZSTD_DCtx_s;
typedef struct ZSTD_DCtx_s ZSTD_DCtx;
namespace hive { namespace chain {
using namespace hive::protocol;
......@@ -35,13 +40,24 @@ namespace hive { namespace chain {
class block_log {
public:
typedef uint8_t block_flags_t;
// in the block log (and index), the positions are stored as 64-bit integers. We'll use the lower
// 48-bits as the actual position, and the upper 16 as flags that tell us how the block is stored
// hi lo|hi lo|hi | | | | | lo|
// cc.....d|<-dict->|<--------------------- position -------------------->|
// cc = block_flags, two bits specifying the compression method, or uncompressed
// d = one bit, if 1 the block uses a custom dictionary
// dict = the number specifying the dictionary used to compress the block, if d = 1, otherwise undefined
// . = unused
enum class block_flags {
uncompressed = 0,
deflate = 1,
brotli = 2,
zstd = 3
};
struct block_attributes_t {
block_flags flags = block_flags::uncompressed;
fc::optional<uint8_t> dictionary_number;
};
block_log();
~block_log();
......@@ -53,12 +69,12 @@ namespace hive { namespace chain {
void close();
bool is_open()const;
uint64_t append( const signed_block& b );
uint64_t append_raw(const char* raw_block_data, size_t raw_block_size, block_flags_t flags);
uint64_t append(const signed_block& b);
uint64_t append_raw(const char* raw_block_data, size_t raw_block_size, block_attributes_t flags);
void flush();
std::tuple<std::unique_ptr<char[]>, size_t, block_flags_t> read_raw_block_data_by_num(uint32_t block_num) const;
static std::tuple<std::unique_ptr<char[]>, size_t> decompress_raw_block(std::tuple<std::unique_ptr<char[]>, size_t, block_log::block_flags_t>&& raw_block_data_tuple);
std::tuple<std::unique_ptr<char[]>, size_t, block_attributes_t> read_raw_block_data_by_num(uint32_t block_num) const;
static std::tuple<std::unique_ptr<char[]>, size_t> decompress_raw_block(std::tuple<std::unique_ptr<char[]>, size_t, block_attributes_t>&& raw_block_data_tuple);
optional<signed_block> read_block_by_num( uint32_t block_num )const;
optional<signed_block_header> read_block_header_by_num( uint32_t block_num )const;
......@@ -68,19 +84,24 @@ namespace hive { namespace chain {
const boost::shared_ptr<signed_block> head() const;
void set_compression(bool enabled);
static std::tuple<std::unique_ptr<char[]>, size_t> compress_block_zstd(const char* uncompressed_block_data, size_t uncompressed_block_size, fc::optional<int> compression_level = fc::optional<int>());
static std::tuple<std::unique_ptr<char[]>, size_t> compress_block_zstd(const char* uncompressed_block_data, size_t uncompressed_block_size, fc::optional<uint8_t> dictionary_number,
fc::optional<int> compression_level = fc::optional<int>(),
fc::optional<ZSTD_CCtx*> compression_context = fc::optional<ZSTD_CCtx*>());
static std::tuple<std::unique_ptr<char[]>, size_t> compress_block_brotli(const char* uncompressed_block_data, size_t uncompressed_block_size, fc::optional<int> compression_quality = fc::optional<int>());
static std::tuple<std::unique_ptr<char[]>, size_t> compress_block_deflate(const char* uncompressed_block_data, size_t uncompressed_block_size, fc::optional<int> compression_level = fc::optional<int>());
static std::tuple<std::unique_ptr<char[]>, size_t> decompress_block_zstd(const char* compressed_block_data, size_t compressed_block_size);
static std::tuple<std::unique_ptr<char[]>, size_t> decompress_block_zstd(const char* compressed_block_data, size_t compressed_block_size,
fc::optional<uint8_t> dictionary_number = fc::optional<int>(),
fc::optional<ZSTD_DCtx*> decompression_context_for_reuse = fc::optional<ZSTD_DCtx*>());
static std::tuple<std::unique_ptr<char[]>, size_t> decompress_block_brotli(const char* compressed_block_data, size_t compressed_block_size);
static std::tuple<std::unique_ptr<char[]>, size_t> decompress_block_deflate(const char* compressed_block_data, size_t compressed_block_size);
private:
void construct_index( bool resume = false, uint64_t index_pos = 0 );
static std::tuple<std::unique_ptr<char[]>, size_t> decompress_raw_block(const char* raw_block_data, size_t raw_block_size, block_log::block_flags_t flags);
static std::tuple<std::unique_ptr<char[]>, size_t> decompress_raw_block(const char* raw_block_data, size_t raw_block_size, block_attributes_t attributes);
std::unique_ptr<detail::block_log_impl> my;
};
} }
FC_REFLECT_ENUM(hive::chain::block_log::block_flags, (uncompressed)(deflate)(brotli)(zstd))
FC_REFLECT(hive::chain::block_log::block_attributes_t, (flags)(dictionary_number))
# Builds a static library of pre-computed zstd dictionaries.  Each dictionary
# ships in this directory as a zstd-compressed blob named NNNM.dict.zst; xxd
# converts each blob into a C source file defining a byte array + length
# symbol so the data can be linked directly into the node binary.
SET(ZSTD_DICTIONARY_C_FILES)
FIND_PROGRAM(RESOURCE_COMPILER xxd)
# dictionaries are numbered 000 through 062 (RANGE is inclusive in CMake)
FOREACH(DICTIONARY_NUM RANGE 62)
# zero-pad the dictionary number to three digits.  Note: CMake's RANGE is
# inclusive, so the inner loop body runs LEADING_ZEROS_NEEDED + 1 times,
# which is exactly the 3 - strlen(num) zeros required — the off-by-one in
# the MATH expression is deliberate and compensates for the inclusive range.
STRING(LENGTH "${DICTIONARY_NUM}" DICTIONARY_NUM_LEN)
MATH(EXPR LEADING_ZEROS_NEEDED "2 - ${DICTIONARY_NUM_LEN}")
SET(DICTIONARY_NUM_STRING "${DICTIONARY_NUM}")
FOREACH(DUMMY RANGE ${LEADING_ZEROS_NEEDED})
SET(DICTIONARY_NUM_STRING "0${DICTIONARY_NUM_STRING}")
ENDFOREACH()
SET(ZSTD_DICTIONARY_SOURCE_FILENAME "${DICTIONARY_NUM_STRING}M.dict.zst")
MESSAGE("Looking at ${DICTIONARY_NUM_STRING}")
SET(ZSTD_DICTIONARY_C_FILENAME "${DICTIONARY_NUM_STRING}M.c")
LIST(APPEND ZSTD_DICTIONARY_C_FILES ${ZSTD_DICTIONARY_C_FILENAME})
# regenerate the .c file (in the binary dir) whenever the blob changes
ADD_CUSTOM_COMMAND(OUTPUT ${ZSTD_DICTIONARY_C_FILENAME}
DEPENDS ${ZSTD_DICTIONARY_SOURCE_FILENAME}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${RESOURCE_COMPILER} -i ${ZSTD_DICTIONARY_SOURCE_FILENAME} ${CMAKE_CURRENT_BINARY_DIR}/${ZSTD_DICTIONARY_C_FILENAME}
COMMENT "Compiling ${ZSTD_DICTIONARY_SOURCE_FILENAME} to .c")
ENDFOREACH()
add_library( hive_chain_zstd_dictionaries
${ZSTD_DICTIONARY_C_FILES} )
if( CLANG_TIDY_EXE )
set_target_properties(
hive_chain_zstd_dictionaries PROPERTIES
CXX_CLANG_TIDY "${DO_CLANG_TIDY}"
)
endif( CLANG_TIDY_EXE )
INSTALL( TARGETS
hive_chain_zstd_dictionaries
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
)
......@@ -28,12 +28,20 @@ if (NOT DEFINED BZIP2_ROOT_DIR)
endif()
endif()
file( GLOB children RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} * )
foreach( child ${children} )
if( IS_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/${child}" )
if( EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${child}/CMakeLists.txt" )
add_subdirectory( "${child}" )
endif()
endif()
endforeach()
# file( GLOB children RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} * )
# foreach( child ${children} )
# if( IS_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/${child}" )
# if( EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${child}/CMakeLists.txt" )
# add_subdirectory( "${child}" )
# endif()
# endif()
# endforeach()
add_subdirectory(rocksdb)
SET(ZSTD_BUILD_SHARED OFF)
SET(ZSTD_BUILD_STATIC ON)
SET(ZSTD_BUILD_CONTRIB OFF)
SET(ZSTD_BUILD_TESTS OFF)
SET(ZSTD_BUILD_PROGRAMS OFF CACHE BOOL "Build ZSTD programs")
add_subdirectory(zstd/build/cmake)
Subproject commit e47e674cd09583ff0503f0f6defd6d23d8b718d3
#include <fc/log/logger.hpp>
#include <fc/crypto/hex.hpp>
#include <fc/filesystem.hpp>
#include <hive/chain/block_log.hpp>
......@@ -9,6 +10,7 @@
#include <chrono>
#include <memory>
#include <iostream>
#include <fstream>
#include <mutex>
#include <condition_variable>
#include <queue>
......@@ -34,7 +36,7 @@ struct compressed_block
uint32_t block_number;
size_t compressed_block_size = 0;
std::unique_ptr<char[]> compressed_block_data;
hive::chain::block_log::block_flags_t flags = 0;
hive::chain::block_log::block_attributes_t attributes;
};
bool enable_zstd = true;
......@@ -46,6 +48,7 @@ fc::optional<int> deflate_level;
uint32_t starting_block_number = 1;
fc::optional<uint32_t> blocks_to_compress;
fc::optional<fc::path> raw_block_output_path;
std::mutex queue_mutex;
std::condition_variable queue_condition_variable;
......@@ -71,6 +74,11 @@ const uint32_t blocks_to_prefetch = 100;
void compress_blocks()
{
#ifdef HAVE_ZSTD
// each compression thread gets its own context
ZSTD_CCtx* zstd_compression_context = ZSTD_createCCtx();
ZSTD_DCtx* zstd_decompression_context = ZSTD_createDCtx();
#endif
while (true)
{
// wait until there is work in the pending queue
......@@ -104,20 +112,28 @@ void compress_blocks()
size_t size;
std::unique_ptr<char[]> data;
hive::chain::block_log::block_flags method;
fc::optional<uint8_t> dictionary_number;
};
std::vector<compressed_data> compressed_versions;
fc::optional<uint8_t> dictionary_number_to_use = std::min<uint8_t>(uncompressed->block_number / 1000000, 62);
// zstd
if (enable_zstd)
try
{
compressed_data zstd_compressed_data;
fc::time_point before = fc::time_point::now();
std::tie(zstd_compressed_data.data, zstd_compressed_data.size) = hive::chain::block_log::compress_block_zstd(uncompressed->uncompressed_block_data.get(), uncompressed->uncompressed_block_size, zstd_level);
//idump((uncompressed->block_number)(uncompressed->uncompressed_block_size));
std::tie(zstd_compressed_data.data, zstd_compressed_data.size) = hive::chain::block_log::compress_block_zstd(uncompressed->uncompressed_block_data.get(), uncompressed->uncompressed_block_size, dictionary_number_to_use, zstd_level, zstd_compression_context);
//idump((fc::to_hex(zstd_compressed_data.data.get(), zstd_compressed_data.size))(uncompressed->uncompressed_block_size)(zstd_compressed_data.size));
//idump((zstd_compressed_data.size));
fc::time_point after_compress = fc::time_point::now();
hive::chain::block_log::decompress_block_zstd(zstd_compressed_data.data.get(), zstd_compressed_data.size);
hive::chain::block_log::decompress_block_zstd(zstd_compressed_data.data.get(), zstd_compressed_data.size, dictionary_number_to_use, zstd_decompression_context);
fc::time_point after_decompress = fc::time_point::now();
zstd_compressed_data.method = hive::chain::block_log::block_flags::zstd;
zstd_compressed_data.dictionary_number = dictionary_number_to_use;
{
std::unique_lock<std::mutex> lock(queue_mutex);
......@@ -192,7 +208,8 @@ void compress_blocks()
if (compressed_versions.front().size < uncompressed->uncompressed_block_size)
{
++total_count_by_method[compressed_versions.front().method];
compressed->flags = (hive::chain::block_log::block_flags_t)compressed_versions.front().method;
compressed->attributes.flags = compressed_versions.front().method;
compressed->attributes.dictionary_number = compressed_versions.front().dictionary_number;
compressed->compressed_block_size = compressed_versions.front().size;
compressed->compressed_block_data = std::move(compressed_versions.front().data);
}
......@@ -201,7 +218,7 @@ void compress_blocks()
++total_count_by_method[hive::chain::block_log::block_flags::uncompressed];
compressed->compressed_block_size = uncompressed->uncompressed_block_size;
compressed->compressed_block_data = std::move(uncompressed->uncompressed_block_data);
compressed->flags = (hive::chain::block_log::block_flags_t)hive::chain::block_log::block_flags::uncompressed;
compressed->attributes.flags = hive::chain::block_log::block_flags::uncompressed;
}
}
else
......@@ -209,7 +226,7 @@ void compress_blocks()
++total_count_by_method[hive::chain::block_log::block_flags::uncompressed];
compressed->compressed_block_size = uncompressed->uncompressed_block_size;
compressed->compressed_block_data = std::move(uncompressed->uncompressed_block_data);
compressed->flags = (hive::chain::block_log::block_flags_t)hive::chain::block_log::block_flags::uncompressed;
compressed->attributes.flags = hive::chain::block_log::block_flags::uncompressed;
}
{
......@@ -274,7 +291,7 @@ void drain_completed_queue(const fc::path& block_log)
dlog("writer thread writing compressed block ${block_number} to the compressed block log", ("block_number", compressed->block_number));
// write it out
log.append_raw(compressed->compressed_block_data.get(), compressed->compressed_block_size, compressed->flags);
log.append_raw(compressed->compressed_block_data.get(), compressed->compressed_block_size, compressed->attributes);
if (compressed->block_number % 100000 == 0)
{
......@@ -307,6 +324,7 @@ void fill_pending_queue(const fc::path& block_log)
exit(1);
}
uint32_t head_block_num = log.head()->block_num();
idump((head_block_num));
if (blocks_to_compress && *blocks_to_compress > head_block_num - 1)
{
elog("Error: input block log does not contain ${blocks_to_compress} blocks (it's head block number is ${head_block_num})", (blocks_to_compress)(head_block_num));
......@@ -335,6 +353,18 @@ void fill_pending_queue(const fc::path& block_log)
uncompressed_block->uncompressed_block_size = std::get<1>(raw_block_data);
uncompressed_block->uncompressed_block_data = std::get<0>(std::move(raw_block_data));
idump((uncompressed_block->uncompressed_block_size));