Skip to content
Snippets Groups Projects
Commit 50eda842 authored by Michael Vandeberg's avatar Michael Vandeberg
Browse files

Add support for multiple readers single writer on shared memory file

parent 5374cc07
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,9 @@
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/sync/interprocess_upgradable_mutex.hpp>
#include <boost/interprocess/sync/interprocess_sharable_mutex.hpp>
#include <boost/interprocess/sync/sharable_lock.hpp>
#include <boost/interprocess/sync/file_lock.hpp>
#include <boost/multi_index_container.hpp>
......@@ -39,8 +41,8 @@ namespace chainbase {
typedef bip::basic_string< char, std::char_traits< char >, allocator< char > > shared_string;
typedef boost::interprocess::interprocess_upgradable_mutex read_write_mutex;
typedef boost::shared_lock< read_write_mutex > read_lock;
typedef boost::interprocess::interprocess_sharable_mutex read_write_mutex;
typedef boost::interprocess::sharable_lock< read_write_mutex > read_lock;
typedef boost::unique_lock< read_write_mutex > write_lock;
class database;
......@@ -560,12 +562,12 @@ namespace chainbase {
void next_lock()
{
_current_lock++;
new( &_locks[ _current_lock ] ) read_write_mutex();
new( &_locks[ _current_lock % 10 ] ) read_write_mutex();
}
read_write_mutex& current_lock()
{
return _locks[ _current_lock ];
return _locks[ _current_lock % 10 ];
}
private:
......@@ -585,7 +587,7 @@ namespace chainbase {
read_write = 1
};
void open( const bfs::path& dir, int write = read_only, uint64_t shared_file_size = 0 );
void open( const bfs::path& dir, uint32_t write = read_only, uint64_t shared_file_size = 0 );
void close();
void flush();
void wipe( const bfs::path& dir );
......@@ -688,6 +690,8 @@ namespace chainbase {
const generic_index<MultiIndexType>& get_index()const {
typedef generic_index<MultiIndexType> index_type;
typedef index_type* index_type_ptr;
assert( _index_map.size() > index_type::value_type::type_id );
assert( _index_map[index_type::value_type::type_id] );
return *index_type_ptr( _index_map[index_type::value_type::type_id]->get() );
}
......@@ -695,14 +699,18 @@ namespace chainbase {
auto get_index()const -> decltype( ((generic_index<MultiIndexType>*)( nullptr ))->indicies().template get<ByIndex>() ) {
typedef generic_index<MultiIndexType> index_type;
typedef index_type* index_type_ptr;
assert( _index_map.size() > index_type::value_type::type_id );
assert( _index_map[index_type::value_type::type_id] );
return index_type_ptr( _index_map[index_type::value_type::type_id]->get() )->indicies().template get<ByIndex>();
}
template<typename MultiIndexType>
generic_index<MultiIndexType>& get_mutable_index() {
typedef generic_index<MultiIndexType> index_type;
typedef index_type* index_type_ptr;
return *index_type_ptr( _index_map[index_type::value_type::type_id]->get() );
typedef generic_index<MultiIndexType> index_type;
typedef index_type* index_type_ptr;
assert( _index_map.size() > index_type::value_type::type_id );
assert( _index_map[index_type::value_type::type_id] );
return *index_type_ptr( _index_map[index_type::value_type::type_id]->get() );
}
template< typename ObjectType, typename IndexedByType, typename CompatibleKey >
......@@ -763,21 +771,29 @@ namespace chainbase {
}
template< typename Lambda >
void with_read_lock( Lambda&& callback, uint64_t wait_micro = 1000000 )
auto with_read_lock( Lambda&& callback, uint64_t wait_micro = 1000000 ) -> decltype( (*(Lambda*)nullptr)() )
{
read_lock lock( _rw_manager->current_lock(), boost::defer_lock_t() );
if( !wait_micro )
lock.lock();
else
lock.try_lock_for( boost::chrono::microseconds( wait_micro ) );
//if( !wait_micro )
{
read_lock lock( _rw_manager->current_lock() );
return callback();
}
/*else
{
read_lock lock( _rw_manager->current_lock(), bip::defer_lock_type() );
if( !lock.timed_lock( boost::chrono::system_clock::now() + boost::chrono::microseconds( wait_micro ) ) )
BOOST_THROW_EXCEPTION( std::runtime_error( "unable to acquire lock" ) );
if( lock.owns_lock() )
callback();
return callback();
}*/
}
template< typename Lambda >
void with_write_lock( Lambda&& callback, uint64_t wait_micro = 1000000 )
{
if( _read_only )
BOOST_THROW_EXCEPTION( std::logic_error( "cannot acquire write lock on read-only process" ) );
write_lock lock( _rw_manager->current_lock() );
/*if( !wait_micro )
lock.lock();
......@@ -797,8 +813,10 @@ namespace chainbase {
private:
unique_ptr<bip::managed_mapped_file> _segment;
read_write_mutex_manager* _rw_manager;
unique_ptr<bip::managed_mapped_file> _meta;
read_write_mutex_manager* _rw_manager = nullptr;
bool _read_only = false;
bip::file_lock _flock;
/**
* This is a sparse list of known indicies kept to accelerate creation of undo sessions
......
......@@ -30,7 +30,7 @@ namespace chainbase {
bool windows = false;
};
void database::open( const bfs::path& dir, int flags, uint64_t shared_file_size ) {
void database::open( const bfs::path& dir, uint32_t flags, uint64_t shared_file_size ) {
bool write = flags & database::read_write;
......@@ -42,7 +42,7 @@ namespace chainbase {
if( _data_dir != dir ) close();
_data_dir = dir;
auto abs_path = bfs::absolute( dir / "shared_memory" );
auto abs_path = bfs::absolute( dir / "shared_memory.bin" );
if( bfs::exists( abs_path ) )
{
......@@ -54,10 +54,6 @@ namespace chainbase {
if( !bip::managed_mapped_file::grow( abs_path.generic_string().c_str(), shared_file_size - existing_file_size ) )
BOOST_THROW_EXCEPTION( std::runtime_error( "could not grow database file to requested size." ) );
}
else
{
shared_file_size = existing_file_size;
}
_segment.reset( new bip::managed_mapped_file( bip::open_only,
abs_path.generic_string().c_str()
......@@ -69,7 +65,6 @@ namespace chainbase {
_read_only = true;
}
_rw_manager = _segment->find< read_write_mutex_manager >( "rw_manager" ).first;
auto env = _segment->find< environment_check >( "environment" );
if( !env.first || !( *env.first == environment_check()) ) {
BOOST_THROW_EXCEPTION( std::runtime_error( "database created by a different compiler, build, or operating system" ) );
......@@ -78,26 +73,58 @@ namespace chainbase {
_segment.reset( new bip::managed_mapped_file( bip::create_only,
abs_path.generic_string().c_str(), shared_file_size
) );
_rw_manager = _segment->find_or_construct< read_write_mutex_manager >( "rw_manager" )();
_segment->find_or_construct< environment_check >( "environment" )();
}
abs_path = bfs::absolute( dir / "shared_memory.meta" );
if( bfs::exists( abs_path ) )
{
_meta.reset( new bip::managed_mapped_file( bip::open_only, abs_path.generic_string().c_str()
) );
_rw_manager = _meta->find< read_write_mutex_manager >( "rw_manager" ).first;
if( !_rw_manager )
BOOST_THROW_EXCEPTION( std::runtime_error( "could not find read write lock manager" ) );
}
else
{
_meta.reset( new bip::managed_mapped_file( bip::create_only,
abs_path.generic_string().c_str(), sizeof( read_write_mutex_manager ) * 2
) );
_rw_manager = _meta->find_or_construct< read_write_mutex_manager >( "rw_manager" )();
}
if( write )
{
_flock = bip::file_lock( abs_path.generic_string().c_str() );
if( !_flock.try_lock() )
BOOST_THROW_EXCEPTION( std::runtime_error( "could not gain write access to the shared memory file" ) );
}
}
void database::flush() {
if( _segment )
_segment->flush();
if( _meta )
_meta->flush();
}
void database::close()
{
_segment.reset();
_meta.reset();
_data_dir = bfs::path();
}
void database::wipe( const bfs::path& dir )
{
_segment.reset();
bfs::remove_all( dir / "shared_memory" );
_meta.reset();
bfs::remove_all( dir / "shared_memory.bin" );
bfs::remove_all( dir / "shared_memory.meta" );
_data_dir = bfs::path();
_index_list.clear();
_index_map.clear();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment