Commit 05339f4b authored by Krzysztof Mochocki's avatar Krzysztof Mochocki

removed mutexes

parent 2e0a6a6c
......@@ -29,34 +29,19 @@ namespace hive
namespace detail
{
struct mutex_deleter
{
std::mutex &mtx;
void operator()(work *ptr)
{
if (ptr)
{
mtx.unlock();
delete ptr;
}
}
};
using transaction_t = std::unique_ptr<work, mutex_deleter>;
// using transaction_t = std::unique_ptr<work>;
using transaction_t = std::unique_ptr<work>;
struct postgress_connection_holder
{
explicit postgress_connection_holder(const fc::string &url)
: _connection{std::make_unique<pqxx::connection>(url)} {}
: _connection{std::make_unique<pqxx::connection>(url)}, connection_string{url} {}
transaction_t start_transaction()
{
// std::cout << "started transaction" << std::endl;
transaction_mutex.lock();
work *trx = new work{*_connection};
trx->exec("SET CONSTRAINTS ALL DEFERRED;");
return transaction_t{trx, mutex_deleter{transaction_mutex}};
return transaction_t{trx};
}
bool exec_transaction(transaction_t &trx, const fc::string &sql)
......@@ -82,12 +67,11 @@ namespace hive
bool exec_no_transaction(const fc::string &sql, pqxx::result *result = nullptr)
{
const std::lock_guard<std::mutex> lck{ transaction_mutex };
if (sql == fc::string())
return true;
return sql_safe_execution([&]() {
pqxx::nontransaction _work{*_connection};
pqxx::connection conn{ this->connection_string };
pqxx::nontransaction _work{conn};
if (result)
*result = _work.exec(sql);
else
......@@ -119,8 +103,7 @@ namespace hive
private:
std::unique_ptr<pqxx::connection> _connection;
std::mutex transaction_mutex;
fc::string connection_string;
void destroy_transaction(transaction_t &trx) const { trx.~unique_ptr(); }
......@@ -232,9 +215,10 @@ namespace hive
if (accs != fc::string()) connection.exec_transaction(trx, accs);
}
bool process_and_send_data(PSQL::sql_dumper &dumper, const cached_data_t &data, transaction_t& transaction)
bool process_and_send_data(PSQL::sql_dumper &dumper, const cached_data_t &data)
{
constexpr size_t max_data_length = 256*1024*1024; // 256 KB
constexpr size_t max_data_length = 512*1024*1024; // 512 KB
transaction_t transaction = connection.start_transaction();
for (size_t i = 0; i < data.blocks.size(); i++)
{
......@@ -264,6 +248,8 @@ namespace hive
}
}
connection.commit_transaction(transaction);
for (size_t i = 0; i < data.operations.size(); i++)
{
const size_t sz = dumper.process_operation(data.operations[i]);
......@@ -273,8 +259,13 @@ namespace hive
i == data.operations.size() - 1
)
{
transaction = connection.start_transaction();
upload_caches(transaction, dumper);
if(!send_data( dumper.reset_operations_stream(), transaction )) return false;
if(!send_data( dumper.reset_operations_stream(), transaction ))
{
connection.abort_transaction(transaction);
return false;
}else connection.commit_transaction(transaction);
}
}
......@@ -287,8 +278,14 @@ namespace hive
i == data.virtual_operations.size() - 1
)
{
transaction = connection.start_transaction();
upload_caches(transaction, dumper);
if(!send_data( dumper.reset_virtual_operation_stream(), transaction )) return false;
if(!send_data( dumper.reset_virtual_operation_stream(), transaction ))
{
connection.abort_transaction(transaction);
return false;
}
connection.commit_transaction(transaction);
}
}
......@@ -324,35 +321,18 @@ namespace hive
{
if (worker.first.get())
return;
const auto cache_processor_function = [this](cached_containter_t input, bool *is_ready) {
auto tm = fc::time_point().now();
auto measure_time = [&](const char *caption) {
// std::cout << caption << (fc::time_point().now() - tm).count() << std::endl;
tm = fc::time_point().now();
};
const auto cache_processor_function = [this](cached_containter_t input, bool* is_ready) {
if (input.get() == nullptr)
return;
PSQL::sql_dumper dumper{_db, names_to_flush, this->connection.get_escaping_charachter_methode()};
// acquiring lock
transaction_t trx = this->connection.start_transaction();
measure_time("starting transaction: ");
// sending
if (!this->process_and_send_data(dumper, *input, trx))
if (!this->process_and_send_data(dumper, *input))
{
this->connection.abort_transaction(trx);
if(is_ready) *is_ready = true;
FC_ASSERT(false);
}
else
{
if(is_ready) *is_ready = true;
this->connection.commit_transaction(trx);
}
measure_time("processing and commiting: ");
}else if(is_ready) *is_ready = true;
};
if(in_new_thread)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment