Skip to content
Snippets Groups Projects
Commit 5374cc07 authored by Michael Vandeberg's avatar Michael Vandeberg
Browse files

Add rw locking

parent 0cfef4c4
No related branches found
No related tags found
No related merge requests found
...@@ -7,17 +7,25 @@ ...@@ -7,17 +7,25 @@
#include <boost/interprocess/containers/deque.hpp> #include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp> #include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp> #include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/sync/interprocess_upgradable_mutex.hpp>
#include <boost/multi_index_container.hpp> #include <boost/multi_index_container.hpp>
#include <boost/chrono.hpp>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <boost/throw_exception.hpp> #include <boost/throw_exception.hpp>
#include <stdexcept>
#include <typeindex> #include <array>
#include <atomic>
#include <fstream> #include <fstream>
#include <stdexcept>
#include <typeindex>
#ifndef CHAINBASE_DEFAULT_NUM_RW_LOCKS
#define CHAINBASE_DEFAULT_NUM_RW_LOCKS 10
#endif
namespace chainbase { namespace chainbase {
...@@ -31,6 +39,10 @@ namespace chainbase { ...@@ -31,6 +39,10 @@ namespace chainbase {
typedef bip::basic_string< char, std::char_traits< char >, allocator< char > > shared_string; typedef bip::basic_string< char, std::char_traits< char >, allocator< char > > shared_string;
typedef boost::interprocess::interprocess_upgradable_mutex read_write_mutex;
typedef boost::shared_lock< read_write_mutex > read_lock;
typedef boost::unique_lock< read_write_mutex > write_lock;
class database; class database;
/** /**
...@@ -69,7 +81,7 @@ namespace chainbase { ...@@ -69,7 +81,7 @@ namespace chainbase {
* This macro must be used at global scope and OBJECT_TYPE and INDEX_TYPE must be fully qualified * This macro must be used at global scope and OBJECT_TYPE and INDEX_TYPE must be fully qualified
*/ */
#define CHAINBASE_SET_INDEX_TYPE( OBJECT_TYPE, INDEX_TYPE ) \ #define CHAINBASE_SET_INDEX_TYPE( OBJECT_TYPE, INDEX_TYPE ) \
namespace chainbase { template<> struct get_index_type<OBJECT_TYPE> { typedef INDEX_TYPE type; }; } namespace chainbase { template<> struct get_index_type<OBJECT_TYPE> { typedef INDEX_TYPE type; }; }
#define CHAINBASE_DEFAULT_CONSTRUCTOR( OBJECT_TYPE ) \ #define CHAINBASE_DEFAULT_CONSTRUCTOR( OBJECT_TYPE ) \
template<typename Constructor, typename Allocator> \ template<typename Constructor, typename Allocator> \
...@@ -93,7 +105,7 @@ namespace chainbase { ...@@ -93,7 +105,7 @@ namespace chainbase {
:_stack(a),_indices( a ),_size_of_value_type( sizeof(typename MultiIndexType::node_type) ),_size_of_this(sizeof(*this)){} :_stack(a),_indices( a ),_size_of_value_type( sizeof(typename MultiIndexType::node_type) ),_size_of_this(sizeof(*this)){}
void validate()const { void validate()const {
if( sizeof(typename MultiIndexType::node_type) != _size_of_value_type || sizeof(*this) != _size_of_this ) if( sizeof(typename MultiIndexType::node_type) != _size_of_value_type || sizeof(*this) != _size_of_this )
BOOST_THROW_EXCEPTION( std::runtime_error("content of memory does not match data expected by executable") ); BOOST_THROW_EXCEPTION( std::runtime_error("content of memory does not match data expected by executable") );
} }
...@@ -113,7 +125,7 @@ namespace chainbase { ...@@ -113,7 +125,7 @@ namespace chainbase {
auto insert_result = _indices.emplace( constructor, _indices.get_allocator() ); auto insert_result = _indices.emplace( constructor, _indices.get_allocator() );
if( !insert_result.second ) { if( !insert_result.second ) {
throw std::logic_error("could not insert object, most likely a uniqueness constraint was violated"); BOOST_THROW_EXCEPTION( std::logic_error("could not insert object, most likely a uniqueness constraint was violated") );
} }
++_next_id; ++_next_id;
...@@ -535,6 +547,33 @@ namespace chainbase { ...@@ -535,6 +547,33 @@ namespace chainbase {
}; };
class read_write_mutex_manager
{
public:
read_write_mutex_manager()
{
_current_lock = 0;
}
~read_write_mutex_manager(){}
void next_lock()
{
_current_lock++;
new( &_locks[ _current_lock ] ) read_write_mutex();
}
read_write_mutex& current_lock()
{
return _locks[ _current_lock ];
}
private:
std::array< read_write_mutex, 10 > _locks;
std::atomic< uint32_t > _current_lock;
};
/** /**
* This class * This class
*/ */
...@@ -639,7 +678,6 @@ namespace chainbase { ...@@ -639,7 +678,6 @@ namespace chainbase {
auto new_index = new index<index_type>( *idx_ptr ); auto new_index = new index<index_type>( *idx_ptr );
_index_map[ type_id ].reset( new_index ); _index_map[ type_id ].reset( new_index );
_index_list.push_back( new_index ); _index_list.push_back( new_index );
_type_name_to_id.emplace( type_name, type_id );
} }
auto get_segment_manager() -> decltype( ((bip::managed_mapped_file*)nullptr)->get_segment_manager()) { auto get_segment_manager() -> decltype( ((bip::managed_mapped_file*)nullptr)->get_segment_manager()) {
...@@ -696,7 +734,7 @@ namespace chainbase { ...@@ -696,7 +734,7 @@ namespace chainbase {
} }
template< typename ObjectType > template< typename ObjectType >
const ObjectType& get( oid< ObjectType > key = oid< ObjectType >() )const const ObjectType& get( const oid< ObjectType >& key = oid< ObjectType >() )const
{ {
auto obj = find< ObjectType >( key ); auto obj = find< ObjectType >( key );
if( !obj ) BOOST_THROW_EXCEPTION( std::out_of_range( "unknown key") ); if( !obj ) BOOST_THROW_EXCEPTION( std::out_of_range( "unknown key") );
...@@ -724,22 +762,55 @@ namespace chainbase { ...@@ -724,22 +762,55 @@ namespace chainbase {
return get_mutable_index<index_type>().emplace( std::forward<Constructor>(con) ); return get_mutable_index<index_type>().emplace( std::forward<Constructor>(con) );
} }
template< typename Lambda >
void with_read_lock( Lambda&& callback, uint64_t wait_micro = 1000000 )
{
read_lock lock( _rw_manager->current_lock(), boost::defer_lock_t() );
if( !wait_micro )
lock.lock();
else
lock.try_lock_for( boost::chrono::microseconds( wait_micro ) );
if( lock.owns_lock() )
callback();
}
template< typename Lambda >
void with_write_lock( Lambda&& callback, uint64_t wait_micro = 1000000 )
{
write_lock lock( _rw_manager->current_lock() );
/*if( !wait_micro )
lock.lock();
else
{
lock.try_lock_for( boost::chrono::microseconds( wait_micro ) );
while( !lock.owns_lock() )
{
_rw_manager->next_lock();
lock = write_lock( _rw_manager->current_lock(), boost::defer_lock_t() );
lock.try_lock_for( boost::chrono::microseconds( wait_micro ) );
}
}*/
callback();
}
private: private:
unique_ptr<bip::managed_mapped_file> _segment; unique_ptr<bip::managed_mapped_file> _segment;
bool _read_only = false; read_write_mutex_manager* _rw_manager;
bool _read_only = false;
/** /**
* This is a sparse list of known indicies kept to accelerate creation of undo sessions * This is a sparse list of known indicies kept to accelerate creation of undo sessions
*/ */
vector<abstract_index*> _index_list; vector<abstract_index*> _index_list;
/** /**
* This is a full map (size 2^16) of all possible index designed for constant time lookup * This is a full map (size 2^16) of all possible index designed for constant time lookup
*/ */
vector<unique_ptr<abstract_index>> _index_map; vector<unique_ptr<abstract_index>> _index_map;
bfs::path _data_dir;
boost::container::flat_map< std::string, uint16_t > _type_name_to_id; bfs::path _data_dir;
}; };
......
#include <chainbase.hpp> #include <chainbase/chainbase.hpp>
#include <boost/array.hpp> #include <boost/array.hpp>
#include <iostream> #include <iostream>
...@@ -37,7 +37,7 @@ namespace chainbase { ...@@ -37,7 +37,7 @@ namespace chainbase {
if( !bfs::exists( dir ) ) { if( !bfs::exists( dir ) ) {
if( !write ) BOOST_THROW_EXCEPTION( std::runtime_error( "database file not found at " + dir.native() ) ); if( !write ) BOOST_THROW_EXCEPTION( std::runtime_error( "database file not found at " + dir.native() ) );
} }
bfs::create_directories( dir ); bfs::create_directories( dir );
if( _data_dir != dir ) close(); if( _data_dir != dir ) close();
...@@ -46,7 +46,19 @@ namespace chainbase { ...@@ -46,7 +46,19 @@ namespace chainbase {
if( bfs::exists( abs_path ) ) if( bfs::exists( abs_path ) )
{ {
if( write ) { if( write )
{
auto existing_file_size = bfs::file_size( abs_path );
if( shared_file_size > existing_file_size )
{
if( !bip::managed_mapped_file::grow( abs_path.generic_string().c_str(), shared_file_size - existing_file_size ) )
BOOST_THROW_EXCEPTION( std::runtime_error( "could not grow database file to requested size." ) );
}
else
{
shared_file_size = existing_file_size;
}
_segment.reset( new bip::managed_mapped_file( bip::open_only, _segment.reset( new bip::managed_mapped_file( bip::open_only,
abs_path.generic_string().c_str() abs_path.generic_string().c_str()
) ); ) );
...@@ -57,6 +69,7 @@ namespace chainbase { ...@@ -57,6 +69,7 @@ namespace chainbase {
_read_only = true; _read_only = true;
} }
_rw_manager = _segment->find< read_write_mutex_manager >( "rw_manager" ).first;
auto env = _segment->find< environment_check >( "environment" ); auto env = _segment->find< environment_check >( "environment" );
if( !env.first || !( *env.first == environment_check()) ) { if( !env.first || !( *env.first == environment_check()) ) {
BOOST_THROW_EXCEPTION( std::runtime_error( "database created by a different compiler, build, or operating system" ) ); BOOST_THROW_EXCEPTION( std::runtime_error( "database created by a different compiler, build, or operating system" ) );
...@@ -65,6 +78,7 @@ namespace chainbase { ...@@ -65,6 +78,7 @@ namespace chainbase {
_segment.reset( new bip::managed_mapped_file( bip::create_only, _segment.reset( new bip::managed_mapped_file( bip::create_only,
abs_path.generic_string().c_str(), shared_file_size abs_path.generic_string().c_str(), shared_file_size
) ); ) );
_rw_manager = _segment->find_or_construct< read_write_mutex_manager >( "rw_manager" )();
_segment->find_or_construct< environment_check >( "environment" )(); _segment->find_or_construct< environment_check >( "environment" )();
} }
} }
...@@ -85,6 +99,8 @@ namespace chainbase { ...@@ -85,6 +99,8 @@ namespace chainbase {
_segment.reset(); _segment.reset();
bfs::remove_all( dir / "shared_memory" ); bfs::remove_all( dir / "shared_memory" );
_data_dir = bfs::path(); _data_dir = bfs::path();
_index_list.clear();
_index_map.clear();
} }
void database::undo() void database::undo()
...@@ -94,6 +110,7 @@ namespace chainbase { ...@@ -94,6 +110,7 @@ namespace chainbase {
item->undo(); item->undo();
} }
} }
void database::squash() void database::squash()
{ {
for( auto& item : _index_list ) for( auto& item : _index_list )
...@@ -101,6 +118,7 @@ namespace chainbase { ...@@ -101,6 +118,7 @@ namespace chainbase {
item->squash(); item->squash();
} }
} }
void database::commit( int64_t revision ) void database::commit( int64_t revision )
{ {
for( auto& item : _index_list ) for( auto& item : _index_list )
......
#define BOOST_TEST_MODULE chainbase test #define BOOST_TEST_MODULE chainbase test
#include <boost/test/unit_test.hpp> #include <boost/test/unit_test.hpp>
#include <chainbase.hpp> #include <chainbase/chainbase.hpp>
#include <boost/multi_index_container.hpp> #include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp> #include <boost/multi_index/ordered_index.hpp>
...@@ -15,12 +15,12 @@ using namespace boost::multi_index; ...@@ -15,12 +15,12 @@ using namespace boost::multi_index;
//BOOST_TEST_SUITE( serialization_tests, clean_database_fixture ) //BOOST_TEST_SUITE( serialization_tests, clean_database_fixture )
struct book : public chainbase::object<0, book> { struct book : public chainbase::object<0, book> {
template<typename Constructor, typename Allocator> template<typename Constructor, typename Allocator>
book( Constructor&& c, Allocator&& a ) { book( Constructor&& c, Allocator&& a ) {
c(*this); c(*this);
} }
id_type id; id_type id;
int a = 0; int a = 0;
int b = 1; int b = 1;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment