adds a vectored write handler for iostream transport
This commit is contained in:
@@ -117,7 +117,7 @@ struct stub_con : public iostream_con {
|
||||
indef_read_size = num_bytes;
|
||||
indef_read_buf = buf;
|
||||
indef_read_len = len;
|
||||
|
||||
|
||||
indef_read();
|
||||
}
|
||||
|
||||
@@ -138,7 +138,7 @@ struct stub_con : public iostream_con {
|
||||
void handle_indef(websocketpp::lib::error_code const & e, size_t amt_read) {
|
||||
ec = e;
|
||||
indef_read_total += amt_read;
|
||||
|
||||
|
||||
indef_read();
|
||||
}
|
||||
|
||||
@@ -203,6 +203,15 @@ websocketpp::lib::error_code write_handler(std::string & o, websocketpp::connect
|
||||
return websocketpp::lib::error_code();
|
||||
}
|
||||
|
||||
websocketpp::lib::error_code vector_write_handler(std::string & o, websocketpp::connection_hdl, std::vector<websocketpp::transport::buffer> const & bufs) {
|
||||
std::vector<websocketpp::transport::buffer>::const_iterator it;
|
||||
for (it = bufs.begin(); it != bufs.end(); it++) {
|
||||
o += std::string((*it).buf, (*it).len);
|
||||
}
|
||||
|
||||
return websocketpp::lib::error_code();
|
||||
}
|
||||
|
||||
websocketpp::lib::error_code write_handler_error(websocketpp::connection_hdl, char const *, size_t) {
|
||||
return make_error_code(websocketpp::transport::error::general);
|
||||
}
|
||||
@@ -249,7 +258,7 @@ BOOST_AUTO_TEST_CASE( async_write_vector_0_write_handler ) {
|
||||
std::string output;
|
||||
|
||||
stub_con::ptr con(new stub_con(true,alogger,elogger));
|
||||
|
||||
|
||||
con->set_write_handler(websocketpp::lib::bind(
|
||||
&write_handler,
|
||||
websocketpp::lib::ref(output),
|
||||
@@ -354,6 +363,31 @@ BOOST_AUTO_TEST_CASE( async_write_vector_2_write_handler ) {
|
||||
BOOST_CHECK_EQUAL( output, "foobar" );
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( async_write_vector_2_vector_write_handler ) {
|
||||
std::string output;
|
||||
|
||||
stub_con::ptr con(new stub_con(true,alogger,elogger));
|
||||
con->set_vector_write_handler(websocketpp::lib::bind(
|
||||
&vector_write_handler,
|
||||
websocketpp::lib::ref(output),
|
||||
websocketpp::lib::placeholders::_1,
|
||||
websocketpp::lib::placeholders::_2
|
||||
));
|
||||
|
||||
std::vector<websocketpp::transport::buffer> bufs;
|
||||
|
||||
std::string foo = "foo";
|
||||
std::string bar = "bar";
|
||||
|
||||
bufs.push_back(websocketpp::transport::buffer(foo.data(),foo.size()));
|
||||
bufs.push_back(websocketpp::transport::buffer(bar.data(),bar.size()));
|
||||
|
||||
con->write(bufs);
|
||||
|
||||
BOOST_CHECK( !con->ec );
|
||||
BOOST_CHECK_EQUAL( output, "foobar" );
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE( async_read_at_least_too_much ) {
|
||||
stub_con::ptr con(new stub_con(true,alogger,elogger));
|
||||
|
||||
@@ -483,7 +517,7 @@ BOOST_AUTO_TEST_CASE( async_read_at_least_read_some_indef ) {
|
||||
BOOST_CHECK( !con->ec );
|
||||
BOOST_CHECK_EQUAL( std::string(buf,10), "aaaaaxxxxx" );
|
||||
BOOST_CHECK_EQUAL( con->indef_read_total, 5 );
|
||||
|
||||
|
||||
// A subsequent read should read 5 more because the indef read refreshes
|
||||
// itself. The new read will start again at the beginning of the buffer.
|
||||
BOOST_CHECK_EQUAL(con->read_some(input+5,5), 5);
|
||||
@@ -539,7 +573,7 @@ websocketpp::lib::error_code sd_handler(websocketpp::connection_hdl) {
|
||||
|
||||
BOOST_AUTO_TEST_CASE( shutdown_handler ) {
|
||||
stub_con::ptr con(new stub_con(true,alogger,elogger));
|
||||
|
||||
|
||||
con->set_shutdown_handler(&sd_handler);
|
||||
BOOST_CHECK_EQUAL( con->ec, make_error_code(websocketpp::error::test) );
|
||||
con->shutdown();
|
||||
|
||||
@@ -33,7 +33,10 @@
|
||||
#include <websocketpp/common/functional.hpp>
|
||||
#include <websocketpp/common/connection_hdl.hpp>
|
||||
|
||||
#include <websocketpp/transport/base/connection.hpp>
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace websocketpp {
|
||||
namespace transport {
|
||||
@@ -41,10 +44,19 @@ namespace transport {
|
||||
namespace iostream {
|
||||
|
||||
/// The type and signature of the callback used by iostream transport to write
|
||||
typedef lib::function<lib::error_code(connection_hdl, char const *, size_t)>
|
||||
typedef lib::function<lib::error_code(connection_hdl, char const *, size_t)>
|
||||
write_handler;
|
||||
|
||||
/// The type and signature of the callback used by iostream transport to signal
|
||||
/// The type and signature of the callback used by iostream transport to perform
|
||||
/// vectored writes.
|
||||
/**
|
||||
* If a vectored write handler is not set the standard write handler will be
|
||||
* called multiple times.
|
||||
*/
|
||||
typedef lib::function<lib::error_code(connection_hdl, std::vector<transport::buffer> const
|
||||
& bufs)> vector_write_handler;
|
||||
|
||||
/// The type and signature of the callback used by iostream transport to signal
|
||||
/// a transport shutdown.
|
||||
typedef lib::function<lib::error_code(connection_hdl)> shutdown_handler;
|
||||
|
||||
|
||||
@@ -168,7 +168,7 @@ public:
|
||||
|
||||
return this->read_some_impl(buf,len);
|
||||
}
|
||||
|
||||
|
||||
/// Manual input supply (read all)
|
||||
/**
|
||||
* Similar to read_some, but continues to read until all bytes in the
|
||||
@@ -188,7 +188,7 @@ public:
|
||||
size_t read_all(char const * buf, size_t len) {
|
||||
// this serializes calls to external read.
|
||||
scoped_lock_type lock(m_read_mutex);
|
||||
|
||||
|
||||
size_t total_read = 0;
|
||||
size_t temp_read = 0;
|
||||
|
||||
@@ -327,25 +327,60 @@ public:
|
||||
timer_ptr set_timer(long, timer_handler) {
|
||||
return timer_ptr();
|
||||
}
|
||||
|
||||
|
||||
/// Sets the write handler
|
||||
/**
|
||||
* The write handler is called when the iostream transport receives data
|
||||
* that needs to be written to the appropriate output location. This handler
|
||||
* can be used in place of registering an ostream for output.
|
||||
*
|
||||
* The signature of the handler is
|
||||
* The signature of the handler is
|
||||
* `lib::error_code (connection_hdl, char const *, size_t)` The
|
||||
* code returned will be reported and logged by the core library.
|
||||
*
|
||||
* See also, set_vector_write_handler, for an optional write handler that
|
||||
* allows more efficient handling of multiple writes at once.
|
||||
*
|
||||
* @see set_vector_write_handler
|
||||
*
|
||||
* @since 0.5.0
|
||||
*
|
||||
* @param h The handler to call on connection shutdown.
|
||||
* @param h The handler to call when data is to be written.
|
||||
*/
|
||||
void set_write_handler(write_handler h) {
|
||||
m_write_handler = h;
|
||||
}
|
||||
|
||||
|
||||
/// Sets the vectored write handler
|
||||
/**
|
||||
* The vectored write handler is called when the iostream transport receives
|
||||
* multiple chunks of data that need to be written to the appropriate output
|
||||
* location. This handler can be used in conjunction with the write_handler
|
||||
* in place of registering an ostream for output.
|
||||
*
|
||||
* The sequence of buffers represents bytes that should be written
|
||||
* consecutively and it is suggested to group the buffers into as few next
|
||||
* layer packets as possible. Vector write is used to allow implementations
|
||||
* that support it to coalesce writes into a single TCP packet or TLS
|
||||
* segment for improved efficiency.
|
||||
*
|
||||
* This is an optional handler. If it is not defined then multiple calls
|
||||
* will be made to the standard write handler.
|
||||
*
|
||||
* The signature of the handler is
|
||||
* `lib::error_code (connection_hdl, std::vector<websocketpp::transport::buffer>
|
||||
* const & bufs)`. The code returned will be reported and logged by the core
|
||||
* library. The `websocketpp::transport::buffer` type is a struct with two
|
||||
* data members. buf (char const *) and len (size_t).
|
||||
*
|
||||
* @since 0.6.0
|
||||
*
|
||||
* @param h The handler to call when vectored data is to be written.
|
||||
*/
|
||||
void set_vector_write_handler(vector_write_handler h) {
|
||||
m_vector_write_handler = h;
|
||||
}
|
||||
|
||||
/// Sets the shutdown handler
|
||||
/**
|
||||
* The shutdown handler is called when the iostream transport receives a
|
||||
@@ -449,7 +484,7 @@ protected:
|
||||
* @param len number of bytes to write
|
||||
* @param handler Callback to invoke with operation status.
|
||||
*/
|
||||
void async_write(char const * buf, size_t len, transport::write_handler
|
||||
void async_write(char const * buf, size_t len, transport::write_handler
|
||||
handler)
|
||||
{
|
||||
m_alog.write(log::alevel::devel,"iostream_con async_write");
|
||||
@@ -459,7 +494,7 @@ protected:
|
||||
|
||||
if (m_output_stream) {
|
||||
m_output_stream->write(buf,len);
|
||||
|
||||
|
||||
if (m_output_stream->bad()) {
|
||||
ec = make_error_code(error::bad_stream);
|
||||
}
|
||||
@@ -507,17 +542,19 @@ protected:
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else if (m_vector_write_handler) {
|
||||
ec = m_vector_write_handler(m_connection_hdl, bufs);
|
||||
} else if (m_write_handler) {
|
||||
std::vector<buffer>::const_iterator it;
|
||||
for (it = bufs.begin(); it != bufs.end(); it++) {
|
||||
ec = m_write_handler(m_connection_hdl, (*it).buf, (*it).len);
|
||||
if (ec) {break;}
|
||||
}
|
||||
|
||||
|
||||
} else {
|
||||
ec = make_error_code(error::output_stream_required);
|
||||
}
|
||||
|
||||
|
||||
handler(ec);
|
||||
}
|
||||
|
||||
@@ -555,11 +592,11 @@ protected:
|
||||
*/
|
||||
void async_shutdown(transport::shutdown_handler handler) {
|
||||
lib::error_code ec;
|
||||
|
||||
|
||||
if (m_shutdown_handler) {
|
||||
ec = m_shutdown_handler(m_connection_hdl);
|
||||
}
|
||||
|
||||
|
||||
handler(ec);
|
||||
}
|
||||
private:
|
||||
@@ -651,6 +688,7 @@ private:
|
||||
std::ostream * m_output_stream;
|
||||
connection_hdl m_connection_hdl;
|
||||
write_handler m_write_handler;
|
||||
vector_write_handler m_vector_write_handler;
|
||||
shutdown_handler m_shutdown_handler;
|
||||
|
||||
bool m_reading;
|
||||
|
||||
Reference in New Issue
Block a user