Compare commits

...

18 Commits

Author SHA1 Message Date
Peter Thorson f6fd37ae05 reset some defaults 2013-11-01 16:39:52 -05:00
Peter Thorson 2f2213f562 increase listen backlog 2013-11-01 16:11:52 -05:00
Peter Thorson 9d51669fb8 adds port and num_threads command line parameters 2013-11-01 09:09:29 -05:00
Peter Thorson 01969452fd switches testee_server back to non-TLS 2013-11-01 09:09:29 -05:00
Peter Thorson c0a28e27aa separates read and write handler allocators 2013-10-31 18:53:06 -05:00
Peter Thorson a48fc73edb splits tcp init into pre and post init 2013-10-24 07:30:55 -05:00
Peter Thorson 531a98fcad update connection interface to message creation 2013-10-24 07:30:28 -05:00
Peter Thorson fc0a8755ad fixed message reset upon recycling 2013-10-24 07:30:14 -05:00
Peter Thorson b783abe6c6 add message reset method 2013-10-24 07:29:52 -05:00
Peter Thorson 0e00cdcfee add ability for fixed con_msg_manager to also be an endpoint_msg_manager 2013-10-24 07:29:42 -05:00
Peter Thorson 382b4433cf add write handler hook 2013-10-24 07:29:12 -05:00
Peter Thorson 2ceac23e60 add additional statistics to utility client 2013-10-24 07:26:29 -05:00
Peter Thorson 7c6b781433 add server.pem to root directory for convenience
add some initial tls support to the testee server
2013-10-24 07:25:38 -05:00
Peter Thorson 3597ab0a26 switch testee server to fixed buffer policy 2013-10-21 08:49:51 -05:00
Peter Thorson 6f46a2c2da update message buffer policies for incoming/outgoing distinction 2013-10-21 08:49:39 -05:00
Peter Thorson 1b85ae7a9d update library to reflect incoming/outgoing message distinction 2013-10-21 08:49:22 -05:00
Peter Thorson 195a29dc46 update ''alloc"strategy and library to use new message buffer api 2013-10-21 07:10:08 -05:00
Peter Thorson 2ff367514f add new 'fixed' message buffer strategy 2013-10-21 07:09:41 -05:00
12 changed files with 745 additions and 65 deletions
+3 -2
View File
@@ -6,6 +6,7 @@ Import('env_cpp11')
Import('boostlibs')
Import('platform_libs')
Import('polyfill_libs')
Import('tls_libs')
env = env.Clone ()
env_cpp11 = env_cpp11.Clone ()
@@ -14,10 +15,10 @@ prgs = []
# if a C++11 environment is available build using that, otherwise use boost
if env_cpp11.has_key('WSPP_CPP11_ENABLED'):
ALL_LIBS = boostlibs(['system'],env_cpp11) + [platform_libs] + [polyfill_libs]
ALL_LIBS = boostlibs(['system'],env_cpp11) + [platform_libs] + [polyfill_libs] + [tls_libs]
prgs += env_cpp11.Program('testee_server', ["testee_server.cpp"], LIBS = ALL_LIBS)
else:
ALL_LIBS = boostlibs(['system'],env) + [platform_libs] + [polyfill_libs]
ALL_LIBS = boostlibs(['system'],env) + [platform_libs] + [polyfill_libs] + [tls_libs]
prgs += env.Program('testee_server', ["testee_server.cpp"], LIBS = ALL_LIBS)
Return('prgs')
+63 -15
View File
@@ -25,10 +25,37 @@
*
*/
#include <websocketpp/config/asio_no_tls.hpp>
//#include <websocketpp/config/asio_no_tls.hpp>
#include <websocketpp/config/asio.hpp>
#include <websocketpp/server.hpp>
#include <websocketpp/message_buffer/fixed.hpp>
#include <iostream>
typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context> context_ptr;
std::string get_password() {
return "test";
}
context_ptr on_tls_init(websocketpp::connection_hdl hdl) {
//std::cout << "on_tls_init called with hdl: " << hdl.lock().get() << std::endl;
context_ptr ctx(new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1));
try {
ctx->set_options(boost::asio::ssl::context::default_workarounds |
boost::asio::ssl::context::no_sslv2 |
boost::asio::ssl::context::single_dh_use);
ctx->set_password_callback(bind(&get_password));
ctx->use_certificate_chain_file("server.pem");
ctx->use_private_key_file("server.pem", boost::asio::ssl::context::pem);
} catch (std::exception& e) {
std::cout << e.what() << std::endl;
}
return ctx;
}
struct testee_config : public websocketpp::config::asio {
// pull default settings from our core config
typedef websocketpp::config::asio core;
@@ -39,12 +66,18 @@ struct testee_config : public websocketpp::config::asio {
typedef core::message_type message_type;
typedef core::con_msg_manager_type con_msg_manager_type;
typedef core::endpoint_msg_manager_type endpoint_msg_manager_type;
//typedef websocketpp::message_buffer::fixed::policy::message message_type;
//typedef websocketpp::message_buffer::fixed::policy::con_msg_manager con_msg_manager_type;
////typedef websocketpp::message_buffer::fixed::policy::con_msg_manager endpoint_msg_manager_type;
//typedef websocketpp::message_buffer::fixed::policy::endpoint_msg_manager endpoint_msg_manager_type;
typedef core::alog_type alog_type;
typedef core::elog_type elog_type;
typedef core::rng_type rng_type;
typedef core::endpoint_base endpoint_base;
static bool const enable_multithreading = false;
static bool const enable_multithreading = true;
struct transport_config : public core::transport_config {
typedef core::concurrency_type concurrency_type;
@@ -53,7 +86,7 @@ struct testee_config : public websocketpp::config::asio {
typedef core::request_type request_type;
typedef core::response_type response_type;
static bool const enable_multithreading = false;
static bool const enable_multithreading = true;
};
typedef websocketpp::transport::asio::endpoint<transport_config>
@@ -79,36 +112,51 @@ void on_message(server* s, websocketpp::connection_hdl hdl, message_ptr msg) {
s->send(hdl, msg->get_payload(), msg->get_opcode());
}
int main() {
int main(int argc, char * argv[]) {
// Create a server endpoint
server testee_server;
short port = 9002;
size_t num_threads = 1;
if (argc == 3) {
port = atoi(argv[1]);
num_threads = atoi(argv[2]);
}
try {
// Total silence
testee_server.clear_access_channels(websocketpp::log::alevel::all);
testee_server.clear_error_channels(websocketpp::log::alevel::all);
//testee_server.clear_access_channels(websocketpp::log::alevel::all);
//testee_server.clear_error_channels(websocketpp::log::alevel::all);
// Initialize ASIO
testee_server.init_asio();
// Register our message handler
testee_server.set_message_handler(bind(&on_message,&testee_server,::_1,::_2));
//testee_server.set_tls_init_handler(bind(&on_tls_init,::_1));
// Listen on port 9002
testee_server.listen(9002);
// Listen on port
testee_server.listen(port);
// Start the server accept loop
testee_server.start_accept();
// Start the ASIO io_service run loop
testee_server.run();
/*websocketpp::lib::thread t1(&server::run, &testee_server);
websocketpp::lib::thread t2(&server::run, &testee_server);
t1.join();
t2.join();*/
if (num_threads == 1) {
testee_server.run();
} else {
typedef websocketpp::lib::shared_ptr<websocketpp::lib::thread> thread_ptr;
std::vector<thread_ptr> ts;
for (size_t i = 0; i < num_threads; i++) {
ts.push_back(thread_ptr(new websocketpp::lib::thread(&server::run, &testee_server)));
}
for (size_t i = 0; i < num_threads; i++) {
ts[i]->join();
}
}
} catch (const std::exception & e) {
std::cout << "exception: " << e.what() << std::endl;
} catch (websocketpp::lib::error_code e) {
+49 -9
View File
@@ -29,15 +29,18 @@ public:
typedef std::chrono::duration<int,std::micro> dur_type;
perftest () {
m_endpoint.set_access_channels(websocketpp::log::alevel::all);
m_endpoint.set_error_channels(websocketpp::log::elevel::all);
m_endpoint.set_access_channels(websocketpp::log::alevel::none);
m_endpoint.set_error_channels(websocketpp::log::elevel::none);
// Initialize ASIO
m_endpoint.init_asio();
// Register our handlers
m_endpoint.set_socket_init_handler(bind(&type::on_socket_init,this,::_1));
m_endpoint.set_tls_init_handler(bind(&type::on_tls_init,this,::_1));
m_endpoint.set_tcp_pre_init_handler(bind(&type::on_tcp_pre_init,this,::_1));
m_endpoint.set_tcp_post_init_handler(bind(&type::on_tcp_post_init,this,::_1));
m_endpoint.set_socket_init_handler(bind(&type::on_socket_init,this,::_1));
m_endpoint.set_message_handler(bind(&type::on_message,this,::_1,::_2));
m_endpoint.set_open_handler(bind(&type::on_open,this,::_1));
m_endpoint.set_close_handler(bind(&type::on_close,this,::_1));
@@ -60,12 +63,17 @@ public:
m_endpoint.run();
}
void on_tcp_pre_init(websocketpp::connection_hdl hdl) {
m_tcp_pre_init = std::chrono::high_resolution_clock::now();
}
void on_tcp_post_init(websocketpp::connection_hdl hdl) {
m_tcp_post_init = std::chrono::high_resolution_clock::now();
}
void on_socket_init(websocketpp::connection_hdl hdl) {
m_socket_init = std::chrono::high_resolution_clock::now();
}
context_ptr on_tls_init(websocketpp::connection_hdl hdl) {
m_tls_init = std::chrono::high_resolution_clock::now();
context_ptr ctx(new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1));
try {
@@ -80,27 +88,59 @@ public:
void on_open(websocketpp::connection_hdl hdl) {
m_open = std::chrono::high_resolution_clock::now();
m_endpoint.send(hdl, "", websocketpp::frame::opcode::text);
client::connection_ptr con = m_endpoint.get_con_from_hdl(hdl);
m_msg = con->get_message(websocketpp::frame::opcode::text,64);
m_msg->append_payload(std::string(60,'*'));
m_msg_count = 1;
//m_message_stamps.reserve(1000);
m_con_start = std::chrono::high_resolution_clock::now();
m_endpoint.send(hdl, m_msg);
//m_endpoint.send(hdl, "", websocketpp::frame::opcode::text);
}
void on_message(websocketpp::connection_hdl hdl, message_ptr msg) {
m_message = std::chrono::high_resolution_clock::now();
m_endpoint.close(hdl,websocketpp::close::status::going_away,"");
if (m_msg_count == 1000) {
m_message = std::chrono::high_resolution_clock::now();
m_endpoint.close(hdl,websocketpp::close::status::going_away,"");
} else {
m_msg_count++;
m_endpoint.send(hdl, m_msg);
}
}
void on_close(websocketpp::connection_hdl hdl) {
m_close = std::chrono::high_resolution_clock::now();
std::cout << "Socket Init: " << std::chrono::duration_cast<dur_type>(m_socket_init-m_start).count() << std::endl;
std::cout << "TLS Init: " << std::chrono::duration_cast<dur_type>(m_tls_init-m_start).count() << std::endl;
std::cout << "TCP Pre Init: " << std::chrono::duration_cast<dur_type>(m_tcp_pre_init-m_start).count() << std::endl;
std::cout << "TCP Post Init: " << std::chrono::duration_cast<dur_type>(m_tcp_post_init-m_start).count() << std::endl;
std::cout << "Open: " << std::chrono::duration_cast<dur_type>(m_open-m_start).count() << std::endl;
std::cout << "Start: " << std::chrono::duration_cast<dur_type>(m_con_start-m_start).count() << std::endl;
std::cout << "Message: " << std::chrono::duration_cast<dur_type>(m_message-m_start).count() << std::endl;
std::cout << "Close: " << std::chrono::duration_cast<dur_type>(m_close-m_start).count() << std::endl;
std::cout << std::endl;
std::cout << "Message: " << std::chrono::duration_cast<dur_type>(m_message-m_con_start).count() << std::endl;
std::cout << "Close: " << std::chrono::duration_cast<dur_type>(m_close-m_message).count() << std::endl;
}
private:
client m_endpoint;
client::message_ptr m_msg;
size_t m_msg_count;
std::chrono::high_resolution_clock::time_point m_start;
std::chrono::high_resolution_clock::time_point m_tcp_pre_init;
std::chrono::high_resolution_clock::time_point m_tcp_post_init;
std::chrono::high_resolution_clock::time_point m_socket_init;
std::chrono::high_resolution_clock::time_point m_tls_init;
std::vector<std::chrono::high_resolution_clock::time_point> m_message_stamps;
std::chrono::high_resolution_clock::time_point m_con_start;
std::chrono::high_resolution_clock::time_point m_open;
std::chrono::high_resolution_clock::time_point m_message;
std::chrono::high_resolution_clock::time_point m_close;
+1 -1
View File
@@ -998,7 +998,7 @@ public:
message_ptr get_message(websocketpp::frame::opcode::value op, size_t size)
const
{
return m_msg_manager->get_message(op, size);
return m_msg_manager->get_outgoing_message(op, size);
}
void start();
+23 -15
View File
@@ -75,17 +75,17 @@ template <typename config>
lib::error_code connection<config>::send(const std::string& payload,
frame::opcode::value op)
{
message_ptr msg = m_msg_manager->get_message(op,payload.size());
message_ptr msg = m_msg_manager->get_outgoing_message(op,payload.size());
msg->append_payload(payload);
return send(msg);
}
template <typename config>
lib::error_code connection<config>::send(const void* payload, size_t len,
lib::error_code connection<config>::send(void const * payload, size_t len,
frame::opcode::value op)
{
message_ptr msg = m_msg_manager->get_message(op,len);
message_ptr msg = m_msg_manager->get_outgoing_message(op,len);
msg->append_payload(payload,len);
return send(msg);
@@ -111,7 +111,7 @@ lib::error_code connection<config>::send(typename config::message_type::ptr msg)
write_push(outgoing_msg);
needs_writing = !m_write_flag && !m_send_queue.empty();
} else {
outgoing_msg = m_msg_manager->get_message();
outgoing_msg = m_msg_manager->get_outgoing_message(msg->get_opcode(),0);
if (!outgoing_msg) {
return error::make_error_code(error::no_outgoing_buffers);
@@ -139,7 +139,8 @@ lib::error_code connection<config>::send(typename config::message_type::ptr msg)
}
template <typename config>
void connection<config>::ping(const std::string& payload, lib::error_code& ec) {
void connection<config>::ping(std::string const & payload, lib::error_code & ec)
{
m_alog.write(log::alevel::devel,"connection ping");
if (m_state != session::state::open) {
@@ -147,7 +148,7 @@ void connection<config>::ping(const std::string& payload, lib::error_code& ec) {
return;
}
message_ptr msg = m_msg_manager->get_message();
message_ptr msg = m_msg_manager->get_outgoing_message(frame::opcode::ping, payload.size());
if (!msg) {
ec = error::make_error_code(error::no_outgoing_buffers);
return;
@@ -207,8 +208,8 @@ void connection<config>::ping(const std::string& payload) {
}
template<typename config>
void connection<config>::handle_pong_timeout(std::string payload, const lib::error_code &
ec)
void connection<config>::handle_pong_timeout(std::string payload,
lib::error_code const & ec)
{
if (ec) {
if (ec == transport::error::operation_aborted) {
@@ -226,7 +227,8 @@ void connection<config>::handle_pong_timeout(std::string payload, const lib::err
}
template <typename config>
void connection<config>::pong(const std::string& payload, lib::error_code& ec) {
void connection<config>::pong(std::string const & payload, lib::error_code & ec)
{
m_alog.write(log::alevel::devel,"connection pong");
if (m_state != session::state::open) {
@@ -234,7 +236,7 @@ void connection<config>::pong(const std::string& payload, lib::error_code& ec) {
return;
}
message_ptr msg = m_msg_manager->get_message();
message_ptr msg = m_msg_manager->get_outgoing_message(frame::opcode::pong, payload.size());
if (!msg) {
ec = error::make_error_code(error::no_outgoing_buffers);
return;
@@ -261,7 +263,7 @@ void connection<config>::pong(const std::string& payload, lib::error_code& ec) {
}
template<typename config>
void connection<config>::pong(const std::string& payload) {
void connection<config>::pong(std::string const & payload) {
lib::error_code ec;
pong(payload,ec);
if (ec) {
@@ -270,8 +272,8 @@ void connection<config>::pong(const std::string& payload) {
}
template <typename config>
void connection<config>::close(const close::status::value code,
const std::string & reason, lib::error_code & ec)
void connection<config>::close(close::status::value const code,
std::string const & reason, lib::error_code & ec)
{
m_alog.write(log::alevel::devel,"connection close");
@@ -288,7 +290,7 @@ void connection<config>::close(const close::status::value code,
}
template<typename config>
void connection<config>::close(const close::status::value code,
void connection<config>::close(close::status::value const code,
const std::string & reason)
{
lib::error_code ec;
@@ -914,6 +916,9 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
message_ptr msg = m_processor->get_message();
// TODO: do we need to catch exceptions here and call the message
// handler hook. WebSocket++ doesn't throw any exceptions here, is
// there anything that could be thrown that could be recovered from?
if (!msg) {
m_alog.write(log::alevel::devel,
"null message from m_processor");
@@ -925,8 +930,10 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
} else if (m_message_handler) {
m_message_handler(m_connection_hdl, msg);
}
m_msg_manager->message_handler_hook(msg);
} else {
process_control_frame(msg);
m_msg_manager->message_handler_hook(msg);
}
}
}
@@ -1587,6 +1594,7 @@ void connection<config>::handle_write_frame(lib::error_code const & ec)
bool terminate = m_current_msg->get_terminal();
m_send_buffer.clear();
m_msg_manager->write_handler_hook(m_current_msg);
m_current_msg.reset();
if (ec) {
@@ -1839,7 +1847,7 @@ lib::error_code connection<config>::send_close_frame(close::status::value code,
<< m_local_close_reason;
m_alog.write(log::alevel::devel,s.str());
message_ptr msg = m_msg_manager->get_message();
message_ptr msg = m_msg_manager->get_outgoing_message(frame::opcode::close,m_local_close_reason.size());
if (!msg) {
return error::make_error_code(error::no_outgoing_buffers);
}
+24
View File
@@ -67,6 +67,30 @@ public:
return message_ptr(new message(type::shared_from_this(),op,size));
}
/*message_ptr get_incoming_message() {
return get_message();
}
message_ptr get_outgoing_message() {
return get_message();
}*/
message_ptr get_incoming_message(frame::opcode::value op, size_t size) {
return get_message(op,size);
}
message_ptr get_outgoing_message(frame::opcode::value op, size_t size) {
return get_message(op,size);
}
void message_handler_hook(message_ptr msg) {
// nothing to do here
}
void write_handler_hook(message_ptr msg) {
// nothing to do here
}
/// Recycle a message
/**
* This method shouldn't be called. If it is, return false to indicate an
+160
View File
@@ -0,0 +1,160 @@
/*
* Copyright (c) 2013, Peter Thorson. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the WebSocket++ Project nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef WEBSOCKETPP_MESSAGE_BUFFER_FIXED_HPP
#define WEBSOCKETPP_MESSAGE_BUFFER_FIXED_HPP
#include <websocketpp/message_buffer/fixed_message.hpp>
#include <websocketpp/common/memory.hpp>
#include <websocketpp/frame.hpp>
namespace websocketpp {
namespace message_buffer {
namespace fixed {
class con_msg_manager;
class endpoint_msg_manager;
struct policy {
typedef websocketpp::message_buffer::fixed_message message;
typedef websocketpp::message_buffer::fixed::con_msg_manager con_msg_manager;
typedef websocketpp::message_buffer::fixed::endpoint_msg_manager endpoint_msg_manager;
};
/// A connection message manager that allocates a new message for each
/// request.
class con_msg_manager : public lib::enable_shared_from_this<con_msg_manager> {
public:
typedef con_msg_manager type;
typedef std::shared_ptr<type> ptr;
typedef policy::message message;
typedef policy::message::ptr message_ptr;
con_msg_manager()
: m_incoming_message(new message())
, m_incoming_message_busy(false)
, m_outgoing_message(new message())
, m_outgoing_message_busy(false) {}
/// Get an empty message buffer
/**
* @return A shared pointer to an empty new message
*/
message_ptr get_message() {
return message_ptr(new message());
}
message_ptr get_incoming_message(frame::opcode::value op, size_t size) {
if (frame::opcode::is_control(op)) {
return message_ptr(new message(op, size));
} else {
if (m_incoming_message_busy) {
return message_ptr();
} else {
m_incoming_message->set_opcode(op);
m_incoming_message->reserve(size);
return m_incoming_message;
}
}
}
message_ptr get_outgoing_message(frame::opcode::value op, size_t size) {
if (frame::opcode::is_control(op)) {
return message_ptr(new message(op, size));
} else {
if (m_outgoing_message_busy) {
return message_ptr();
} else {
m_outgoing_message->set_opcode(op);
m_outgoing_message->reserve(size);
return m_outgoing_message;
}
}
}
void message_handler_hook(message_ptr msg) {
recycle(msg);
}
void write_handler_hook(message_ptr msg) {
// TODO: we shouldn't recycle a message every time it is written. If a
// message is queued for writing multiple times... Needs a reference
// count in the message to deal with this.
recycle(msg);
}
bool recycle(message_ptr msg) {
if (msg == m_incoming_message) {
m_incoming_message_busy = false;
msg->reset();
return true;
} else if (msg == m_outgoing_message) {
m_outgoing_message_busy = false;
msg->reset();
return true;
} else {
// not a message we are managing, ignore
return false;
}
}
/// Get a pointer to a connection message manager
/**
* @return A pointer to the requested connection message manager.
*/
ptr get_manager() {
return shared_from_this();
}
private:
message_ptr m_incoming_message;
bool m_incoming_message_busy;
message_ptr m_outgoing_message;
bool m_outgoing_message_busy;
};
/// An endpoint message manager that allocates a new manager for each
/// connection.
class endpoint_msg_manager {
public:
typedef policy::con_msg_manager::ptr con_msg_man_ptr;
/// Get a pointer to a connection message manager
/**
* @return A pointer to the requested connection message manager.
*/
con_msg_man_ptr get_manager() const {
return con_msg_man_ptr(new con_msg_manager());
}
};
} // namespace fixed
} // namespace message_buffer
} // namespace websocketpp
#endif // WEBSOCKETPP_MESSAGE_BUFFER_FIXED_HPP
@@ -0,0 +1,321 @@
/*
* Copyright (c) 2013, Peter Thorson. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the WebSocket++ Project nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef WEBSOCKETPP_MESSAGE_BUFFER_FIXED_MESSAGE_HPP
#define WEBSOCKETPP_MESSAGE_BUFFER_FIXED_MESSAGE_HPP
#include <websocketpp/common/memory.hpp>
#include <websocketpp/frame.hpp>
#include <string>
namespace websocketpp {
namespace message_buffer {
/* # message:
* object that stores a message while it is being sent or received. Contains
* the message payload itself, the message header, the extension data, and the
* opcode.
*
* # connection_message_manager:
* An object that manages all of the message_buffers associated with a given
* connection. Impliments the get_message_buffer(size) method that returns
* a message buffer at least size bytes long.
*
* Message buffers are reference counted with shared ownership semantics. Once
* requested from the manager the requester and it's associated downstream code
* may keep a pointer to the message indefinitely at a cost of extra resource
* usage. Once the reference count drops to the point where the manager is the
* only reference the messages is recycled using whatever method is implemented
* in the manager.
*
* # endpoint_message_manager:
* An object that manages connection_message_managers. Impliments the
* get_message_manager() method. This is used once by each connection to
* request the message manager that they are supposed to use to manage message
* buffers for their own use.
*
* TYPES OF CONNECTION_MESSAGE_MANAGERS
* - allocate a message with the exact size every time one is requested
* - maintain a pool of pre-allocated messages and return one when needed.
* Recycle previously used messages back into the pool
*
* TYPES OF ENDPOINT_MESSAGE_MANAGERS
* - allocate a new connection manager for each connection. Message pools
* become connection specific. This increases memory usage but improves
* concurrency.
* - allocate a single connection manager and share a pointer to it with all
* connections created by this endpoint. The message pool will be shared
* among all connections, improving memory usage and performance at the cost
* of reduced concurrency
*/
/// Represents a buffer for a single WebSocket message.
/**
*
*
*/
class fixed_message {
public:
typedef lib::shared_ptr<fixed_message> ptr;
/// Construct an empty message
/**
* Construct an empty message
*/
fixed_message()
: m_prepared(false)
, m_fin(true)
, m_terminal(false)
, m_compressed(false) {}
/// Construct a message and fill in some values
/**
*
*/
fixed_message(frame::opcode::value op, size_t size = 128)
: m_opcode(op)
, m_prepared(false)
, m_fin(true)
, m_terminal(false)
, m_compressed(false)
{
m_payload.reserve(size);
}
void reset() {
m_payload.clear();
m_prepared = false;
m_fin = true;
m_terminal = false;
m_compressed = false;
}
/// Return whether or not the message has been prepared for sending
/**
* The prepared flag indicates that the message has been prepared by a
* websocket protocol processor and is ready to be written to the wire.
*
* @return whether or not the message has been prepared for sending
*/
bool get_prepared() const {
return m_prepared;
}
/// Set or clear the flag that indicates that the message has been prepared
/**
* This flag should not be set by end user code without a very good reason.
*
* @param value The value to set the prepared flag to
*/
void set_prepared(bool value) {
m_prepared = value;
}
/// Return whether or not the message is flagged as compressed
/**
* @return whether or not the message is/should be compressed
*/
bool get_compressed() const {
return m_compressed;
}
/// Set or clear the compression flag
/**
* The compression flag is used to indicate whether or not the message is
* or should be compressed. Compression is not guaranteed. Both endpoints
* must support a compression extension and the connection must have had
* that extension negotiated in its handshake.
*
* @param value The value to set the compressed flag to
*/
void set_compressed(bool value) {
m_compressed = value;
}
/// Get whether or not the message is terminal
/**
* Messages can be flagged as terminal, which results in the connection
* being close after they are written rather than the implementation going
* on to the next message in the queue. This is typically used internally
* for close messages only.
*
* @return Whether or not this message is marked terminal
*/
bool get_terminal() const {
return m_terminal;
}
/// Set the terminal flag
/**
* This flag should not be set by end user code without a very good reason.
*
* @see get_terminal()
*
* @param value The value to set the terminal flag to.
*/
void set_terminal(bool value) {
m_terminal = value;
}
/// Read the fin bit
/**
* A message with the fin bit set will be sent as the last message of its
* sequence. A message with the fin bit cleared will require subsequent
* frames of opcode continuation until one of them has the fin bit set.
*
* The remote end likely will not deliver any bytes until the frame with the fin
* bit set has been received.
*
* @return Whether or not the fin bit is set
*/
bool get_fin() const {
return m_fin;
}
/// Set the fin bit
/**
* @see get_fin for a more detailed explaination of the fin bit
*
* @param value The value to set the fin bit to.
*/
void set_fin(bool value) {
m_fin = value;
}
/// Return the message opcode
frame::opcode::value get_opcode() const {
return m_opcode;
}
/// Set the opcode
void set_opcode(frame::opcode::value op) {
m_opcode = op;
}
/// Return the prepared frame header
/**
* This value is typically set by a websocket protocol processor
* and shouldn't be tampered with.
*/
std::string const & get_header() const {
return m_header;
}
/// Set prepared frame header
/**
* Under normal circumstances this should not be called by end users
*
* @param header A string to set the header to.
*/
void set_header(std::string const & header) {
m_header = header;
}
std::string const & get_extension_data() const {
return m_extension_data;
}
/// Get a reference to the payload string
/**
* @return A const reference to the message's payload string
*/
std::string const & get_payload() const {
return m_payload;
}
/// Get a non-const reference to the payload string
/**
* @return A reference to the message's payload string
*/
std::string & get_raw_payload() {
return m_payload;
}
/// Set payload data
/**
* Set the message buffer's payload to the given value.
*
* @param payload A string to set the payload to.
*/
void set_payload(std::string const & payload) {
m_payload = payload;
}
/// Set payload data
/**
* Set the message buffer's payload to the given value.
*
* @param payload A pointer to a data array to set to.
* @param len The length of new payload in bytes.
*/
void set_payload(void const * payload, size_t len) {
m_payload.reserve(len);
char const * pl = static_cast<char const *>(payload);
m_payload.assign(pl, pl + len);
}
/// Append payload data
/**
* Append data to the message buffer's payload.
*
* @param payload A string containing the data array to append.
*/
void append_payload(std::string const & payload) {
m_payload.append(payload);
}
/// Append payload data
/**
* Append data to the message buffer's payload.
*
* @param payload A pointer to a data array to append
* @param len The length of payload in bytes
*/
void append_payload(void const * payload, size_t len) {
m_payload.reserve(m_payload.size()+len);
m_payload.append(static_cast<char const *>(payload),len);
}
void reserve(size_t size) {
m_payload.reserve(size);
}
private:
std::string m_header;
std::string m_extension_data;
std::string m_payload;
frame::opcode::value m_opcode;
bool m_prepared;
bool m_fin;
bool m_terminal;
bool m_compressed;
};
} // namespace message_buffer
} // namespace websocketpp
#endif // WEBSOCKETPP_MESSAGE_BUFFER_FIXED_MESSAGE_HPP
+1 -1
View File
@@ -217,7 +217,7 @@ public:
if (m_state == HEADER) {
if (buf[p] == msg_hdr) {
p++;
m_msg_ptr = m_msg_manager->get_message(frame::opcode::text,1);
m_msg_ptr = m_msg_manager->get_incoming_message(frame::opcode::text,1);
if (!m_msg_ptr) {
ec = make_error_code(websocketpp::error::no_incoming_buffers);
+2 -2
View File
@@ -362,7 +362,7 @@ public:
if (frame::opcode::is_control(op)) {
m_control_msg = msg_metadata(
m_msg_manager->get_message(op,m_bytes_needed),
m_msg_manager->get_incoming_message(op,m_bytes_needed),
frame::get_masking_key(m_basic_header,m_extended_header)
);
@@ -370,7 +370,7 @@ public:
} else {
if (!m_data_msg.msg_ptr) {
m_data_msg = msg_metadata(
m_msg_manager->get_message(op,m_bytes_needed),
m_msg_manager->get_incoming_message(op,m_bytes_needed),
frame::get_masking_key(m_basic_header,m_extended_header)
);
} else {
+50 -12
View File
@@ -109,15 +109,47 @@ public:
return socket_con_type::is_secure();
}
/// Sets the tcp init handler
/// Sets the tcp pre init handler
/**
* The tcp init handler is called after the tcp connection has been
* established.
* The tcp pre init handler is called after the raw tcp connection has been
* established but before any additional wrappers (proxy connects, TLS
* handshakes, etc) have been performed.
*
* @param h The handler to call on tcp init.
* @since 0.4.0-alpha1
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_pre_init_handler(tcp_init_handler h) {
m_tcp_pre_init_handler = h;
}
/// Sets the tcp pre init handler (deprecated)
/**
* The tcp pre init handler is called after the raw tcp connection has been
* established but before any additional wrappers (proxy connects, TLS
* handshakes, etc) have been performed.
*
* @deprecated Use set_tcp_pre_init_handler instead
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_init_handler(tcp_init_handler h) {
m_tcp_init_handler = h;
set_tcp_pre_init_handler(h);
}
/// Sets the tcp post init handler
/**
* The tcp post init handler is called after the tcp connection has been
* established and all additional wrappers (proxy connects, TLS handshakes,
* etc have been performed. This is fired before any bytes are read or any
* WebSocket specific handshake logic has been performed.
*
* @since 0.4.0-alpha1
*
* @param h The handler to call on tcp post init.
*/
void set_tcp_post_init_handler(tcp_init_handler h) {
m_tcp_post_init_handler = h;
}
/// Set the proxy to connect through (exception free)
@@ -419,8 +451,8 @@ protected:
m_alog.write(log::alevel::devel,"asio connection handle pre_init");
}
if (m_tcp_init_handler) {
m_tcp_init_handler(m_connection_hdl);
if (m_tcp_pre_init_handler) {
m_tcp_pre_init_handler(m_connection_hdl);
}
if (ec) {
@@ -507,6 +539,10 @@ protected:
m_alog.write(log::alevel::devel,"asio connection handle_post_init");
}
if (m_tcp_post_init_handler) {
m_tcp_post_init_handler(m_connection_hdl);
}
callback(ec);
}
@@ -761,7 +797,7 @@ protected:
lib::placeholders::_1,
lib::placeholders::_2
))*/
make_custom_alloc_handler(m_handler_allocator,m_async_read_handler)
make_custom_alloc_handler(m_read_handler_allocator,m_async_read_handler)
);
}
@@ -801,7 +837,7 @@ protected:
handler,
lib::placeholders::_1
))*/
make_custom_alloc_handler(m_handler_allocator,m_async_write_handler)
make_custom_alloc_handler(m_write_handler_allocator,m_async_write_handler)
);
}
@@ -823,7 +859,7 @@ protected:
handler,
lib::placeholders::_1
))*/
make_custom_alloc_handler(m_handler_allocator,m_async_write_handler)
make_custom_alloc_handler(m_write_handler_allocator,m_async_write_handler)
);
}
@@ -998,9 +1034,11 @@ private:
std::vector<boost::asio::const_buffer> m_bufs;
// Handlers
tcp_init_handler m_tcp_init_handler;
tcp_init_handler m_tcp_pre_init_handler;
tcp_init_handler m_tcp_post_init_handler;
handler_allocator m_handler_allocator;
handler_allocator m_read_handler_allocator;
handler_allocator m_write_handler_allocator;
read_handler m_read_handler;
write_handler m_write_handler;
+48 -8
View File
@@ -231,18 +231,56 @@ public:
return *m_io_service;
}
/// Sets the tcp init handler
/// Sets the default tcp pre init handler
/**
* The tcp init handler is called after the tcp connection has been
* established.
* The tcp pre init handler is called after the raw tcp connection has been
* established but before any additional wrappers (proxy connects, TLS
* handshakes, etc) have been performed.
*
* @see WebSocket++ handler documentation for more information about
* handlers.
*
* @param h The handler to call on tcp init.
* @since 0.4.0-alpha1
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_pre_init_handler(tcp_init_handler h) {
m_tcp_pre_init_handler = h;
}
/// Sets the default tcp pre init handler (deprecated)
/**
* The tcp pre init handler is called after the raw tcp connection has been
* established but before any additional wrappers (proxy connects, TLS
* handshakes, etc) have been performed.
*
* @see WebSocket++ handler documentation for more information about
* handlers.
*
* @deprecated Use set_tcp_pre_init_handler instead
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_init_handler(tcp_init_handler h) {
m_tcp_init_handler = h;
set_tcp_pre_init_handler(h);
}
/// Sets the default tcp pre init handler
/**
* The tcp post init handler is called after the tcp connection has been
* established and all additional wrappers (proxy connects, TLS handshakes,
* etc have been performed. This is fired before any bytes are read or any
* WebSocket specific handshake logic has been performed.
*
* @see WebSocket++ handler documentation for more information about
* handlers.
*
* @since 0.4.0-alpha1
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_post_init_handler(tcp_init_handler h) {
m_tcp_post_init_handler = h;
}
/// Set up endpoint for listening manually (exception free)
@@ -268,7 +306,7 @@ public:
m_acceptor->open(ep.protocol());
m_acceptor->set_option(boost::asio::socket_base::reuse_address(true));
m_acceptor->bind(ep);
m_acceptor->listen();
m_acceptor->listen(8192); // this should be a settable parameter
m_state = LISTENING;
ec = lib::error_code();
}
@@ -899,7 +937,8 @@ protected:
ec = tcon->init_asio(m_io_service);
if (ec) {return ec;}
tcon->set_tcp_init_handler(m_tcp_init_handler);
tcon->set_tcp_pre_init_handler(m_tcp_pre_init_handler);
tcon->set_tcp_post_init_handler(m_tcp_post_init_handler);
return lib::error_code();
}
@@ -919,7 +958,8 @@ private:
};
// Handlers
tcp_init_handler m_tcp_init_handler;
tcp_init_handler m_tcp_pre_init_handler;
tcp_init_handler m_tcp_post_init_handler;
// Network Resources
io_service_ptr m_io_service;