From db304bfec4d3cc1d16a6cf2505c9d29774a24d9c Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Sun, 21 Feb 2016 18:23:58 -0500 Subject: [PATCH] Remove the use of cached read and write handlers in the asio transport. references #490 #525 There isn't a clean way to implement this performance optimization without adding global state/locking, which performs worse. This should fix --- changelog.md | 6 + websocketpp/transport/asio/connection.hpp | 165 ++++++++-------------- 2 files changed, 67 insertions(+), 104 deletions(-) diff --git a/changelog.md b/changelog.md index 5471095..7c0c013 100644 --- a/changelog.md +++ b/changelog.md @@ -46,6 +46,12 @@ HEAD Bastien Brunnenstein for reporting and a patch. #462 - Bug: Fix an issue where TLS includes were broken for Asio Standalone builds. Thank you giachi and Bastien Brunnenstein for reporting. #491 +- Bug: Remove the use of cached read and write handlers in the Asio transport. + This feature caused memory leaks when the io_service the connection was + running on was abruptly stopped. There isn't a clean and safe way of using + this optimization without global state and the associated locks. The locks + perform worse. Thank you Xavier Gibert for reporting, test cases, and code. + Fixes #490. - Compatibility: Fixes a number of build & config issues on Visual Studio 2015 - Compatibility: Removes non-standards compliant masking behavior. #395, #469 - Compatibility: Replace deprecated use of auto_ptr on systems where unique_ptr diff --git a/websocketpp/transport/asio/connection.hpp b/websocketpp/transport/asio/connection.hpp index 8121598..8eb8c75 100644 --- a/websocketpp/transport/asio/connection.hpp +++ b/websocketpp/transport/asio/connection.hpp @@ -415,12 +415,11 @@ protected: // TODO: pre-init timeout. Right now no implemented socket policies // actually have an asyncronous pre-init - m_init_handler = callback; - socket_con_type::pre_init( lib::bind( &type::handle_pre_init, get_shared(), + callback, lib::placeholders::_1 ) ); @@ -465,26 +464,14 @@ protected: m_strand = lib::make_shared( lib::ref(*io_service)); } - m_async_read_handler = lib::bind(&type::handle_async_read, - get_shared(), lib::placeholders::_1, lib::placeholders::_2); - - m_async_write_handler = lib::bind(&type::handle_async_write, - get_shared(), lib::placeholders::_1, lib::placeholders::_2); lib::error_code ec = socket_con_type::init_asio(io_service, m_strand, m_is_server); - if (ec) { - // reset the handlers to break the circular reference: - // this->handler->this - lib::clear_function(m_async_read_handler); - lib::clear_function(m_async_write_handler); - } - return ec; } - void handle_pre_init(lib::error_code const & ec) { + void handle_pre_init(init_handler callback, lib::error_code const & ec) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"asio connection handle pre_init"); } @@ -494,19 +481,19 @@ protected: } if (ec) { - m_init_handler(ec); + callback(ec); } // If we have a proxy set issue a proxy connect, otherwise skip to // post_init if (!m_proxy.empty()) { - proxy_write(); + proxy_write(callback); } else { - post_init(); + post_init(callback); } } - void post_init() { + void post_init(init_handler callback) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"asio connection post_init"); } @@ -520,7 +507,7 @@ protected: &type::handle_post_init_timeout, get_shared(), post_timer, - m_init_handler, + callback, lib::placeholders::_1 ) ); @@ -531,7 +518,7 @@ protected: &type::handle_post_init, get_shared(), post_timer, - m_init_handler, + callback, lib::placeholders::_1 ) ); @@ -607,7 +594,7 @@ protected: callback(ec); } - void proxy_write() { + void proxy_write(init_handler callback) { if (m_alog.static_test(log::alevel::devel)) { m_alog.write(log::alevel::devel,"asio connection proxy_write"); } @@ -615,7 +602,7 @@ protected: if (!m_proxy_data) { m_elog.write(log::elevel::library, "assertion failed: !m_proxy_data in asio::connection::proxy_write"); - m_init_handler(make_error_code(error::general)); + callback(make_error_code(error::general)); return; } @@ -632,7 +619,7 @@ protected: lib::bind( &type::handle_proxy_timeout, get_shared(), - m_init_handler, + callback, lib::placeholders::_1 ) ); @@ -644,7 +631,7 @@ protected: m_bufs, m_strand->wrap(lib::bind( &type::handle_proxy_write, get_shared(), - m_init_handler, + callback, lib::placeholders::_1 )) ); @@ -654,7 +641,7 @@ protected: m_bufs, lib::bind( &type::handle_proxy_write, get_shared(), - m_init_handler, + callback, lib::placeholders::_1 ) ); @@ -825,15 +812,11 @@ protected: m_proxy_data.reset(); // Continue with post proxy initialization - post_init(); + post_init(callback); } } /// read at least num_bytes bytes into buf and then call handler. - /** - * - * - */ void async_read_at_least(size_t num_bytes, char *buf, size_t len, read_handler handler) { @@ -843,13 +826,6 @@ protected: m_alog.write(log::alevel::devel,s.str()); } - if (!m_async_read_handler) { - m_alog.write(log::alevel::devel, - "async_read_at_least called after async_shutdown"); - handler(make_error_code(transport::error::action_after_shutdown),0); - return; - } - // TODO: safety vs speed ? // maybe move into an if devel block /*if (num_bytes > len) { @@ -860,22 +836,19 @@ protected: return; }*/ - m_read_handler = handler; - - if (!m_read_handler) { - m_alog.write(log::alevel::devel, - "asio con async_read_at_least called with bad handler"); - } - if (config::enable_multithreading) { lib::asio::async_read( socket_con_type::get_socket(), lib::asio::buffer(buf,len), lib::asio::transfer_at_least(num_bytes), - m_strand->wrap( - make_custom_alloc_handler( - m_read_handler_allocator, - m_async_read_handler)) + m_strand->wrap(make_custom_alloc_handler( + m_read_handler_allocator, + lib::bind( + &type::handle_async_read, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) + )) ); } else { lib::asio::async_read( @@ -884,14 +857,18 @@ protected: lib::asio::transfer_at_least(num_bytes), make_custom_alloc_handler( m_read_handler_allocator, - m_async_read_handler + lib::bind( + &type::handle_async_read, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) ) ); } } - void handle_async_read(lib::asio::error_code const & ec, + void handle_async_read(read_handler handler, lib::asio::error_code const & ec, size_t bytes_transferred) { m_alog.write(log::alevel::devel, "asio con handle_async_read"); @@ -915,10 +892,8 @@ protected: log_err(log::elevel::info,"asio async_read_at_least",ec); } } - if (m_read_handler) { - m_read_handler(tec,bytes_transferred); - // TODO: why does this line break things? - //m_read_handler = _WEBSOCKETPP_NULL_FUNCTION_; + if (handler) { + handler(tec,bytes_transferred); } else { // This can happen in cases where the connection is terminated while // the transport is waiting on a read. @@ -927,26 +902,22 @@ protected: } } + /// Initiate a potentially asyncronous write of the given buffer void async_write(const char* buf, size_t len, write_handler handler) { - if (!m_async_write_handler) { - m_alog.write(log::alevel::devel, - "async_write (single) called after async_shutdown"); - handler(make_error_code(transport::error::action_after_shutdown)); - return; - } - m_bufs.push_back(lib::asio::buffer(buf,len)); - m_write_handler = handler; - if (config::enable_multithreading) { lib::asio::async_write( socket_con_type::get_socket(), m_bufs, - m_strand->wrap( - make_custom_alloc_handler( - m_write_handler_allocator, - m_async_write_handler)) + m_strand->wrap(make_custom_alloc_handler( + m_write_handler_allocator, + lib::bind( + &type::handle_async_write, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) + )) ); } else { lib::asio::async_write( @@ -954,35 +925,36 @@ protected: m_bufs, make_custom_alloc_handler( m_write_handler_allocator, - m_async_write_handler + lib::bind( + &type::handle_async_write, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) ) ); } } + /// Initiate a potentially asyncronous write of the given buffers void async_write(std::vector const & bufs, write_handler handler) { - if (!m_async_write_handler) { - m_alog.write(log::alevel::devel, - "async_write (vector) called after async_shutdown"); - handler(make_error_code(transport::error::action_after_shutdown)); - return; - } std::vector::const_iterator it; for (it = bufs.begin(); it != bufs.end(); ++it) { m_bufs.push_back(lib::asio::buffer((*it).buf,(*it).len)); } - m_write_handler = handler; - if (config::enable_multithreading) { lib::asio::async_write( socket_con_type::get_socket(), m_bufs, - m_strand->wrap( - make_custom_alloc_handler( - m_write_handler_allocator, - m_async_write_handler)) + m_strand->wrap(make_custom_alloc_handler( + m_write_handler_allocator, + lib::bind( + &type::handle_async_write, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) + )) ); } else { lib::asio::async_write( @@ -990,7 +962,11 @@ protected: m_bufs, make_custom_alloc_handler( m_write_handler_allocator, - m_async_write_handler + lib::bind( + &type::handle_async_write, get_shared(), + handler, + lib::placeholders::_1, lib::placeholders::_2 + ) ) ); } @@ -1001,17 +977,15 @@ protected: * @param ec The status code * @param bytes_transferred The number of bytes read */ - void handle_async_write(lib::asio::error_code const & ec, size_t) { + void handle_async_write(write_handler handler, lib::asio::error_code const & ec, size_t) { m_bufs.clear(); lib::error_code tec; if (ec) { log_err(log::elevel::info,"asio async_write",ec); tec = make_error_code(transport::error::pass_through); } - if (m_write_handler) { - m_write_handler(tec); - // TODO: why does this line break things? - //m_write_handler = _WEBSOCKETPP_NULL_FUNCTION_; + if (handler) { + handler(tec); } else { // This can happen in cases where the connection is terminated while // the transport is waiting on a read. @@ -1064,16 +1038,6 @@ protected: m_alog.write(log::alevel::devel,"asio connection async_shutdown"); } - // Reset cached handlers now that we won't be reading or writing anymore - // These cached handlers store shared pointers to this connection and - // will leak the connection if not destroyed. - lib::clear_function(m_async_read_handler); - lib::clear_function(m_async_write_handler); - lib::clear_function(m_init_handler); - - lib::clear_function(m_read_handler); - lib::clear_function(m_write_handler); - timer_ptr shutdown_timer; shutdown_timer = set_timer( config::timeout_socket_shutdown, @@ -1230,13 +1194,6 @@ private: handler_allocator m_read_handler_allocator; handler_allocator m_write_handler_allocator; - - read_handler m_read_handler; - write_handler m_write_handler; - init_handler m_init_handler; - - async_read_handler m_async_read_handler; - async_write_handler m_async_write_handler; };