Remove the use of cached read and write handlers in the asio transport. references #490 #525

There isn't a clean way to implement this performance optimization without adding global state/locking, which performs worse. This should fix
This commit is contained in:
Peter Thorson
2016-02-21 18:23:58 -05:00
parent 9dc013a3aa
commit db304bfec4
2 changed files with 67 additions and 104 deletions
+6
View File
@@ -46,6 +46,12 @@ HEAD
Bastien Brunnenstein for reporting and a patch. #462
- Bug: Fix an issue where TLS includes were broken for Asio Standalone builds.
Thank you giachi and Bastien Brunnenstein for reporting. #491
- Bug: Remove the use of cached read and write handlers in the Asio transport.
This feature caused memory leaks when the io_service the connection was
running on was abruptly stopped. There isn't a clean and safe way of using
this optimization without global state and the associated locks. The locks
perform worse. Thank you Xavier Gibert for reporting, test cases, and code.
Fixes #490.
- Compatibility: Fixes a number of build & config issues on Visual Studio 2015
- Compatibility: Removes non-standards compliant masking behavior. #395, #469
- Compatibility: Replace deprecated use of auto_ptr on systems where unique_ptr
+61 -104
View File
@@ -415,12 +415,11 @@ protected:
// TODO: pre-init timeout. Right now no implemented socket policies
// actually have an asyncronous pre-init
m_init_handler = callback;
socket_con_type::pre_init(
lib::bind(
&type::handle_pre_init,
get_shared(),
callback,
lib::placeholders::_1
)
);
@@ -465,26 +464,14 @@ protected:
m_strand = lib::make_shared<lib::asio::io_service::strand>(
lib::ref(*io_service));
}
m_async_read_handler = lib::bind(&type::handle_async_read,
get_shared(), lib::placeholders::_1, lib::placeholders::_2);
m_async_write_handler = lib::bind(&type::handle_async_write,
get_shared(), lib::placeholders::_1, lib::placeholders::_2);
lib::error_code ec = socket_con_type::init_asio(io_service, m_strand,
m_is_server);
if (ec) {
// reset the handlers to break the circular reference:
// this->handler->this
lib::clear_function(m_async_read_handler);
lib::clear_function(m_async_write_handler);
}
return ec;
}
void handle_pre_init(lib::error_code const & ec) {
void handle_pre_init(init_handler callback, lib::error_code const & ec) {
if (m_alog.static_test(log::alevel::devel)) {
m_alog.write(log::alevel::devel,"asio connection handle pre_init");
}
@@ -494,19 +481,19 @@ protected:
}
if (ec) {
m_init_handler(ec);
callback(ec);
}
// If we have a proxy set issue a proxy connect, otherwise skip to
// post_init
if (!m_proxy.empty()) {
proxy_write();
proxy_write(callback);
} else {
post_init();
post_init(callback);
}
}
void post_init() {
void post_init(init_handler callback) {
if (m_alog.static_test(log::alevel::devel)) {
m_alog.write(log::alevel::devel,"asio connection post_init");
}
@@ -520,7 +507,7 @@ protected:
&type::handle_post_init_timeout,
get_shared(),
post_timer,
m_init_handler,
callback,
lib::placeholders::_1
)
);
@@ -531,7 +518,7 @@ protected:
&type::handle_post_init,
get_shared(),
post_timer,
m_init_handler,
callback,
lib::placeholders::_1
)
);
@@ -607,7 +594,7 @@ protected:
callback(ec);
}
void proxy_write() {
void proxy_write(init_handler callback) {
if (m_alog.static_test(log::alevel::devel)) {
m_alog.write(log::alevel::devel,"asio connection proxy_write");
}
@@ -615,7 +602,7 @@ protected:
if (!m_proxy_data) {
m_elog.write(log::elevel::library,
"assertion failed: !m_proxy_data in asio::connection::proxy_write");
m_init_handler(make_error_code(error::general));
callback(make_error_code(error::general));
return;
}
@@ -632,7 +619,7 @@ protected:
lib::bind(
&type::handle_proxy_timeout,
get_shared(),
m_init_handler,
callback,
lib::placeholders::_1
)
);
@@ -644,7 +631,7 @@ protected:
m_bufs,
m_strand->wrap(lib::bind(
&type::handle_proxy_write, get_shared(),
m_init_handler,
callback,
lib::placeholders::_1
))
);
@@ -654,7 +641,7 @@ protected:
m_bufs,
lib::bind(
&type::handle_proxy_write, get_shared(),
m_init_handler,
callback,
lib::placeholders::_1
)
);
@@ -825,15 +812,11 @@ protected:
m_proxy_data.reset();
// Continue with post proxy initialization
post_init();
post_init(callback);
}
}
/// read at least num_bytes bytes into buf and then call handler.
/**
*
*
*/
void async_read_at_least(size_t num_bytes, char *buf, size_t len,
read_handler handler)
{
@@ -843,13 +826,6 @@ protected:
m_alog.write(log::alevel::devel,s.str());
}
if (!m_async_read_handler) {
m_alog.write(log::alevel::devel,
"async_read_at_least called after async_shutdown");
handler(make_error_code(transport::error::action_after_shutdown),0);
return;
}
// TODO: safety vs speed ?
// maybe move into an if devel block
/*if (num_bytes > len) {
@@ -860,22 +836,19 @@ protected:
return;
}*/
m_read_handler = handler;
if (!m_read_handler) {
m_alog.write(log::alevel::devel,
"asio con async_read_at_least called with bad handler");
}
if (config::enable_multithreading) {
lib::asio::async_read(
socket_con_type::get_socket(),
lib::asio::buffer(buf,len),
lib::asio::transfer_at_least(num_bytes),
m_strand->wrap(
make_custom_alloc_handler(
m_read_handler_allocator,
m_async_read_handler))
m_strand->wrap(make_custom_alloc_handler(
m_read_handler_allocator,
lib::bind(
&type::handle_async_read, get_shared(),
handler,
lib::placeholders::_1, lib::placeholders::_2
)
))
);
} else {
lib::asio::async_read(
@@ -884,14 +857,18 @@ protected:
lib::asio::transfer_at_least(num_bytes),
make_custom_alloc_handler(
m_read_handler_allocator,
m_async_read_handler
lib::bind(
&type::handle_async_read, get_shared(),
handler,
lib::placeholders::_1, lib::placeholders::_2
)
)
);
}
}
void handle_async_read(lib::asio::error_code const & ec,
void handle_async_read(read_handler handler, lib::asio::error_code const & ec,
size_t bytes_transferred)
{
m_alog.write(log::alevel::devel, "asio con handle_async_read");
@@ -915,10 +892,8 @@ protected:
log_err(log::elevel::info,"asio async_read_at_least",ec);
}
}
if (m_read_handler) {
m_read_handler(tec,bytes_transferred);
// TODO: why does this line break things?
//m_read_handler = _WEBSOCKETPP_NULL_FUNCTION_;
if (handler) {
handler(tec,bytes_transferred);
} else {
// This can happen in cases where the connection is terminated while
// the transport is waiting on a read.
@@ -927,26 +902,22 @@ protected:
}
}
/// Initiate a potentially asyncronous write of the given buffer
void async_write(const char* buf, size_t len, write_handler handler) {
if (!m_async_write_handler) {
m_alog.write(log::alevel::devel,
"async_write (single) called after async_shutdown");
handler(make_error_code(transport::error::action_after_shutdown));
return;
}
m_bufs.push_back(lib::asio::buffer(buf,len));
m_write_handler = handler;
if (config::enable_multithreading) {
lib::asio::async_write(
socket_con_type::get_socket(),
m_bufs,
m_strand->wrap(
make_custom_alloc_handler(
m_write_handler_allocator,
m_async_write_handler))
m_strand->wrap(make_custom_alloc_handler(
m_write_handler_allocator,
lib::bind(
&type::handle_async_write, get_shared(),
handler,
lib::placeholders::_1, lib::placeholders::_2
)
))
);
} else {
lib::asio::async_write(
@@ -954,35 +925,36 @@ protected:
m_bufs,
make_custom_alloc_handler(
m_write_handler_allocator,
m_async_write_handler
lib::bind(
&type::handle_async_write, get_shared(),
handler,
lib::placeholders::_1, lib::placeholders::_2
)
)
);
}
}
/// Initiate a potentially asyncronous write of the given buffers
void async_write(std::vector<buffer> const & bufs, write_handler handler) {
if (!m_async_write_handler) {
m_alog.write(log::alevel::devel,
"async_write (vector) called after async_shutdown");
handler(make_error_code(transport::error::action_after_shutdown));
return;
}
std::vector<buffer>::const_iterator it;
for (it = bufs.begin(); it != bufs.end(); ++it) {
m_bufs.push_back(lib::asio::buffer((*it).buf,(*it).len));
}
m_write_handler = handler;
if (config::enable_multithreading) {
lib::asio::async_write(
socket_con_type::get_socket(),
m_bufs,
m_strand->wrap(
make_custom_alloc_handler(
m_write_handler_allocator,
m_async_write_handler))
m_strand->wrap(make_custom_alloc_handler(
m_write_handler_allocator,
lib::bind(
&type::handle_async_write, get_shared(),
handler,
lib::placeholders::_1, lib::placeholders::_2
)
))
);
} else {
lib::asio::async_write(
@@ -990,7 +962,11 @@ protected:
m_bufs,
make_custom_alloc_handler(
m_write_handler_allocator,
m_async_write_handler
lib::bind(
&type::handle_async_write, get_shared(),
handler,
lib::placeholders::_1, lib::placeholders::_2
)
)
);
}
@@ -1001,17 +977,15 @@ protected:
* @param ec The status code
* @param bytes_transferred The number of bytes read
*/
void handle_async_write(lib::asio::error_code const & ec, size_t) {
void handle_async_write(write_handler handler, lib::asio::error_code const & ec, size_t) {
m_bufs.clear();
lib::error_code tec;
if (ec) {
log_err(log::elevel::info,"asio async_write",ec);
tec = make_error_code(transport::error::pass_through);
}
if (m_write_handler) {
m_write_handler(tec);
// TODO: why does this line break things?
//m_write_handler = _WEBSOCKETPP_NULL_FUNCTION_;
if (handler) {
handler(tec);
} else {
// This can happen in cases where the connection is terminated while
// the transport is waiting on a read.
@@ -1064,16 +1038,6 @@ protected:
m_alog.write(log::alevel::devel,"asio connection async_shutdown");
}
// Reset cached handlers now that we won't be reading or writing anymore
// These cached handlers store shared pointers to this connection and
// will leak the connection if not destroyed.
lib::clear_function(m_async_read_handler);
lib::clear_function(m_async_write_handler);
lib::clear_function(m_init_handler);
lib::clear_function(m_read_handler);
lib::clear_function(m_write_handler);
timer_ptr shutdown_timer;
shutdown_timer = set_timer(
config::timeout_socket_shutdown,
@@ -1230,13 +1194,6 @@ private:
handler_allocator m_read_handler_allocator;
handler_allocator m_write_handler_allocator;
read_handler m_read_handler;
write_handler m_write_handler;
init_handler m_init_handler;
async_read_handler m_async_read_handler;
async_write_handler m_async_write_handler;
};