diff --git a/include/chainbase.hpp b/include/chainbase/chainbase.hpp similarity index 86% rename from include/chainbase.hpp rename to include/chainbase/chainbase.hpp index 0790e59519ec588b3254b69e5d0409c624f969f9..b9f59799ccb0ce532bd62c92005293b8c01b5753 100644 --- a/include/chainbase.hpp +++ b/include/chainbase/chainbase.hpp @@ -7,17 +7,27 @@ #include <boost/interprocess/containers/deque.hpp> #include <boost/interprocess/containers/string.hpp> #include <boost/interprocess/allocators/allocator.hpp> +#include <boost/interprocess/sync/interprocess_sharable_mutex.hpp> +#include <boost/interprocess/sync/sharable_lock.hpp> +#include <boost/interprocess/sync/file_lock.hpp> #include <boost/multi_index_container.hpp> +#include <boost/chrono.hpp> #include <boost/filesystem.hpp> -#include <boost/lexical_cast.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/thread.hpp> #include <boost/throw_exception.hpp> -#include <stdexcept> -#include <typeindex> +#include <array> +#include <atomic> #include <fstream> +#include <stdexcept> +#include <typeindex> +#ifndef CHAINBASE_DEFAULT_NUM_RW_LOCKS + #define CHAINBASE_DEFAULT_NUM_RW_LOCKS 10 +#endif namespace chainbase { @@ -54,6 +64,9 @@ namespace chainbase { } }; + typedef boost::interprocess::interprocess_sharable_mutex read_write_mutex; + typedef boost::interprocess::sharable_lock< read_write_mutex > read_lock; + typedef boost::unique_lock< read_write_mutex > write_lock; class database; @@ -93,7 +106,7 @@ namespace chainbase { * This macro must be used at global scope and OBJECT_TYPE and INDEX_TYPE must be fully qualified */ #define CHAINBASE_SET_INDEX_TYPE( OBJECT_TYPE, INDEX_TYPE ) \ - namespace chainbase { template<> struct get_index_type<OBJECT_TYPE> { typedef INDEX_TYPE type; }; } + namespace chainbase { template<> struct get_index_type<OBJECT_TYPE> { typedef INDEX_TYPE type; }; } #define CHAINBASE_DEFAULT_CONSTRUCTOR( OBJECT_TYPE ) \ template<typename Constructor, typename Allocator> \ @@ -117,7 +130,7 @@ namespace chainbase { :_stack(a),_indices( a ),_size_of_value_type( sizeof(typename MultiIndexType::node_type) ),_size_of_this(sizeof(*this)){} void validate()const { - if( sizeof(typename MultiIndexType::node_type) != _size_of_value_type || sizeof(*this) != _size_of_this ) + if( sizeof(typename MultiIndexType::node_type) != _size_of_value_type || sizeof(*this) != _size_of_this ) BOOST_THROW_EXCEPTION( std::runtime_error("content of memory does not match data expected by executable") ); } @@ -137,7 +150,7 @@ namespace chainbase { auto insert_result = _indices.emplace( constructor, _indices.get_allocator() ); if( !insert_result.second ) { - throw std::logic_error("could not insert object, most likely a uniqueness constraint was violated"); + BOOST_THROW_EXCEPTION( std::logic_error("could not insert object, most likely a uniqueness constraint was violated") ); } ++_next_id; @@ -559,6 +572,33 @@ namespace chainbase { }; + class read_write_mutex_manager + { + public: + read_write_mutex_manager() + { + _current_lock = 0; + } + + ~read_write_mutex_manager(){} + + void next_lock() + { + _current_lock++; + new( &_locks[ _current_lock % 10 ] ) read_write_mutex(); + } + + read_write_mutex& current_lock() + { + return _locks[ _current_lock % 10 ]; + } + + private: + std::array< read_write_mutex, 10 > _locks; + std::atomic< uint32_t > _current_lock; + }; + + /** * This class */ @@ -570,7 +610,7 @@ namespace chainbase { read_write = 1 }; - void open( const bfs::path& dir, int write = read_only, uint64_t shared_file_size = 0 ); + void open( const bfs::path& dir, uint32_t write = read_only, uint64_t shared_file_size = 0 ); void close(); void flush(); void wipe( const bfs::path& dir ); @@ -663,7 +703,6 @@ namespace chainbase { auto new_index = new index<index_type>( *idx_ptr ); _index_map[ type_id ].reset( new_index ); _index_list.push_back( new_index ); - _type_name_to_id.emplace( type_name, type_id ); } auto get_segment_manager() -> decltype( ((bip::managed_mapped_file*)nullptr)->get_segment_manager()) { @@ -674,6 +713,8 @@ namespace chainbase { const generic_index<MultiIndexType>& get_index()const { typedef generic_index<MultiIndexType> index_type; typedef index_type* index_type_ptr; + assert( _index_map.size() > index_type::value_type::type_id ); + assert( _index_map[index_type::value_type::type_id] ); return *index_type_ptr( _index_map[index_type::value_type::type_id]->get() ); } @@ -681,14 +722,18 @@ namespace chainbase { auto get_index()const -> decltype( ((generic_index<MultiIndexType>*)( nullptr ))->indicies().template get<ByIndex>() ) { typedef generic_index<MultiIndexType> index_type; typedef index_type* index_type_ptr; + assert( _index_map.size() > index_type::value_type::type_id ); + assert( _index_map[index_type::value_type::type_id] ); return index_type_ptr( _index_map[index_type::value_type::type_id]->get() )->indicies().template get<ByIndex>(); } template<typename MultiIndexType> generic_index<MultiIndexType>& get_mutable_index() { - typedef generic_index<MultiIndexType> index_type; - typedef index_type* index_type_ptr; - return *index_type_ptr( _index_map[index_type::value_type::type_id]->get() ); + typedef generic_index<MultiIndexType> index_type; + typedef index_type* index_type_ptr; + assert( _index_map.size() > index_type::value_type::type_id ); + assert( _index_map[index_type::value_type::type_id] ); + return *index_type_ptr( _index_map[index_type::value_type::type_id]->get() ); } template< typename ObjectType, typename IndexedByType, typename CompatibleKey > @@ -720,7 +765,7 @@ namespace chainbase { } template< typename ObjectType > - const ObjectType& get( oid< ObjectType > key = oid< ObjectType >() )const + const ObjectType& get( const oid< ObjectType >& key = oid< ObjectType >() )const { auto obj = find< ObjectType >( key ); if( !obj ) BOOST_THROW_EXCEPTION( std::out_of_range( "unknown key") ); @@ -748,22 +793,67 @@ namespace chainbase { return get_mutable_index<index_type>().emplace( std::forward<Constructor>(con) ); } + template< typename Lambda > + auto with_read_lock( Lambda&& callback, uint64_t wait_micro = 1000000 ) -> decltype( (*(Lambda*)nullptr)() ) + { + read_lock lock( _rw_manager->current_lock(), bip::defer_lock_type() ); + + if( !wait_micro ) + { + lock.lock(); + } + else + { + + if( !lock.timed_lock( boost::posix_time::microsec_clock::local_time() + boost::posix_time::microseconds( wait_micro ) ) ) + BOOST_THROW_EXCEPTION( std::runtime_error( "unable to acquire lock" ) ); + } + + return callback(); + } + + template< typename Lambda > + auto with_write_lock( Lambda&& callback, uint64_t wait_micro = 1000000 ) -> decltype( (*(Lambda*)nullptr)() ) + { + if( _read_only ) + BOOST_THROW_EXCEPTION( std::logic_error( "cannot acquire write lock on read-only process" ) ); + + write_lock lock( _rw_manager->current_lock(), boost::defer_lock_t() ); + + if( !wait_micro ) + { + lock.lock(); + } + else + { + while( !lock.timed_lock( boost::posix_time::microsec_clock::local_time() + boost::posix_time::microseconds( wait_micro ) ) ) + { + _rw_manager->next_lock(); + lock = write_lock( _rw_manager->current_lock(), boost::defer_lock_t() ); + } + } + + return callback(); + } + private: - unique_ptr<bip::managed_mapped_file> _segment; - bool _read_only = false; + unique_ptr<bip::managed_mapped_file> _segment; + unique_ptr<bip::managed_mapped_file> _meta; + read_write_mutex_manager* _rw_manager = nullptr; + bool _read_only = false; + bip::file_lock _flock; /** * This is a sparse list of known indicies kept to accelerate creation of undo sessions */ - vector<abstract_index*> _index_list; + vector<abstract_index*> _index_list; + /** * This is a full map (size 2^16) of all possible index designed for constant time lookup */ - vector<unique_ptr<abstract_index>> _index_map; - - bfs::path _data_dir; + vector<unique_ptr<abstract_index>> _index_map; - boost::container::flat_map< std::string, uint16_t > _type_name_to_id; + bfs::path _data_dir; }; diff --git a/src/chainbase.cpp b/src/chainbase.cpp index cd96468810d283c51a9481739bbf91245146bf30..d92989c9e612d904231f2943f4de9afd3bf72695 100644 --- a/src/chainbase.cpp +++ b/src/chainbase.cpp @@ -1,4 +1,4 @@ -#include <chainbase.hpp> +#include <chainbase/chainbase.hpp> #include <boost/array.hpp> #include <iostream> @@ -30,23 +30,31 @@ namespace chainbase { bool windows = false; }; - void database::open( const bfs::path& dir, int flags, uint64_t shared_file_size ) { + void database::open( const bfs::path& dir, uint32_t flags, uint64_t shared_file_size ) { bool write = flags & database::read_write; if( !bfs::exists( dir ) ) { if( !write ) BOOST_THROW_EXCEPTION( std::runtime_error( "database file not found at " + dir.native() ) ); } - + bfs::create_directories( dir ); if( _data_dir != dir ) close(); _data_dir = dir; - auto abs_path = bfs::absolute( dir / "shared_memory" ); + auto abs_path = bfs::absolute( dir / "shared_memory.bin" ); if( bfs::exists( abs_path ) ) { - if( write ) { + if( write ) + { + auto existing_file_size = bfs::file_size( abs_path ); + if( shared_file_size > existing_file_size ) + { + if( !bip::managed_mapped_file::grow( abs_path.generic_string().c_str(), shared_file_size - existing_file_size ) ) + BOOST_THROW_EXCEPTION( std::runtime_error( "could not grow database file to requested size." ) ); + } + _segment.reset( new bip::managed_mapped_file( bip::open_only, abs_path.generic_string().c_str() ) ); @@ -67,24 +75,59 @@ namespace chainbase { ) ); _segment->find_or_construct< environment_check >( "environment" )(); } + + + abs_path = bfs::absolute( dir / "shared_memory.meta" ); + + if( bfs::exists( abs_path ) ) + { + _meta.reset( new bip::managed_mapped_file( bip::open_only, abs_path.generic_string().c_str() + ) ); + + _rw_manager = _meta->find< read_write_mutex_manager >( "rw_manager" ).first; + if( !_rw_manager ) + BOOST_THROW_EXCEPTION( std::runtime_error( "could not find read write lock manager" ) ); + } + else + { + _meta.reset( new bip::managed_mapped_file( bip::create_only, + abs_path.generic_string().c_str(), sizeof( read_write_mutex_manager ) * 2 + ) ); + + _rw_manager = _meta->find_or_construct< read_write_mutex_manager >( "rw_manager" )(); + } + + if( write ) + { + _flock = bip::file_lock( abs_path.generic_string().c_str() ); + if( !_flock.try_lock() ) + BOOST_THROW_EXCEPTION( std::runtime_error( "could not gain write access to the shared memory file" ) ); + } } void database::flush() { if( _segment ) _segment->flush(); + if( _meta ) + _meta->flush(); } void database::close() { _segment.reset(); + _meta.reset(); _data_dir = bfs::path(); } void database::wipe( const bfs::path& dir ) { _segment.reset(); - bfs::remove_all( dir / "shared_memory" ); + _meta.reset(); + bfs::remove_all( dir / "shared_memory.bin" ); + bfs::remove_all( dir / "shared_memory.meta" ); _data_dir = bfs::path(); + _index_list.clear(); + _index_map.clear(); } void database::undo() @@ -94,6 +137,7 @@ namespace chainbase { item->undo(); } } + void database::squash() { for( auto& item : _index_list ) @@ -101,6 +145,7 @@ namespace chainbase { item->squash(); } } + void database::commit( int64_t revision ) { for( auto& item : _index_list ) diff --git a/test/test.cpp b/test/test.cpp index 2282238e746b641d5c22c6c5f9f4dd9593f5b565..6efbd0bb7a5cec8cd5680de59dd336be621c0409 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -1,7 +1,7 @@ #define BOOST_TEST_MODULE chainbase test #include <boost/test/unit_test.hpp> -#include <chainbase.hpp> +#include <chainbase/chainbase.hpp> #include <boost/multi_index_container.hpp> #include <boost/multi_index/ordered_index.hpp> @@ -15,12 +15,12 @@ using namespace boost::multi_index; //BOOST_TEST_SUITE( serialization_tests, clean_database_fixture ) struct book : public chainbase::object<0, book> { - + template<typename Constructor, typename Allocator> book( Constructor&& c, Allocator&& a ) { c(*this); } - + id_type id; int a = 0; int b = 1;