Compare commits

..

42 Commits

Author SHA1 Message Date
Peter Thorson f6fd37ae05 reset some defaults 2013-11-01 16:39:52 -05:00
Peter Thorson 2f2213f562 increase listen backlog 2013-11-01 16:11:52 -05:00
Peter Thorson 9d51669fb8 adds port and num_threads command line parameters 2013-11-01 09:09:29 -05:00
Peter Thorson 01969452fd switches testee_server back to non-TLS 2013-11-01 09:09:29 -05:00
Peter Thorson c0a28e27aa separates read and write handler allocators 2013-10-31 18:53:06 -05:00
Peter Thorson a48fc73edb splits tcp init into pre and post init 2013-10-24 07:30:55 -05:00
Peter Thorson 531a98fcad update connection interface to message creation 2013-10-24 07:30:28 -05:00
Peter Thorson fc0a8755ad fixed message reset upon recycling 2013-10-24 07:30:14 -05:00
Peter Thorson b783abe6c6 add message reset method 2013-10-24 07:29:52 -05:00
Peter Thorson 0e00cdcfee add ability for fixed con_msg_manager to also be an endpoint_msg_manager 2013-10-24 07:29:42 -05:00
Peter Thorson 382b4433cf add write handler hook 2013-10-24 07:29:12 -05:00
Peter Thorson 2ceac23e60 add additional statistics to utility client 2013-10-24 07:26:29 -05:00
Peter Thorson 7c6b781433 add server.pem to root directory for convenience
add some initial tls support to the testee server
2013-10-24 07:25:38 -05:00
Peter Thorson 3597ab0a26 switch testee server to fixed buffer policy 2013-10-21 08:49:51 -05:00
Peter Thorson 6f46a2c2da update message buffer policies for incoming/outgoing distinction 2013-10-21 08:49:39 -05:00
Peter Thorson 1b85ae7a9d update library to reflect incoming/outgoing message distinction 2013-10-21 08:49:22 -05:00
Peter Thorson 195a29dc46 update ''alloc"strategy and library to use new message buffer api 2013-10-21 07:10:08 -05:00
Peter Thorson 2ff367514f add new 'fixed' message buffer strategy 2013-10-21 07:09:41 -05:00
Peter Thorson 00b5bed0d1 moves base64 code into websocketpp namespace 2013-10-20 15:02:51 -05:00
Peter Thorson 0f6e93a13e removes original copies of header-converted libraries to avoid confusion 2013-10-20 15:02:29 -05:00
Peter Thorson 84efd0425a moves sleep call out of a critical section references #283 2013-10-20 14:44:37 -05:00
Peter Thorson f4d3640870 update changelog 2013-10-20 14:30:54 -05:00
Peter Thorson 6680606523 Merge pull request #297 from evgeni/master
fix "recieve" typo in various places
2013-10-20 12:17:58 -07:00
Peter Thorson 962a090300 update code style and add docs+tests for uri::get_query references #298 2013-10-20 14:15:05 -05:00
Peter Thorson 4d51d990a4 Merge pull request #298 from Banaan/master
add get_query to URI handling
2013-10-20 12:00:54 -07:00
Peter Thorson 362f828767 update changelog 2013-10-20 13:14:12 -05:00
Peter Thorson aa74d2b295 switched asio read and write handler to use a custom allocator 2013-10-20 13:09:37 -05:00
Peter Thorson 746389efab removes some range checking in an inner read loop 2013-10-20 13:05:46 -05:00
Peter Thorson 5894601291 caches callback functions rather than copying them from handler to handler 2013-10-20 13:03:32 -05:00
Peter Thorson 17c9831449 cache the binding of async read and write handler callbacks 2013-10-20 12:57:02 -05:00
Peter Thorson 45a612f44b additional support for compile time disabling of multithreading features 2013-10-20 12:54:59 -05:00
Peter Thorson 2e3dfe7935 disable boundary checking in inner read loop 2013-10-20 12:50:10 -05:00
Peter Thorson 1b453e4679 adjust default read buffer size based on profiling 2013-10-20 12:49:22 -05:00
Peter Thorson 72a3bd6e4e add custom config to testee server to allow easier tuning 2013-10-20 12:48:09 -05:00
Banaan e73f4cdb10 Update uri.hpp
get query of URI method
2013-10-20 19:35:47 +02:00
Evgeni Golov 86fe22334c fix "recieve" typo in various places 2013-10-20 17:19:26 +02:00
Peter Thorson 71e6babd93 statically bind frame read and write handlers 2013-10-16 08:25:10 -05:00
Peter Thorson 8b9fa5db72 add switchable 1 vs 2 thread testee server operation 2013-10-16 08:24:28 -05:00
Peter Thorson 57d8e5cb6b add better clarification to exception printing 2013-10-16 08:24:02 -05:00
Peter Thorson 1e97f6c67c statically bind the async_read_handler 2013-10-16 08:23:42 -05:00
Peter Thorson 5c14c8e71e adds a compile time multithreading switch 2013-10-16 08:22:14 -05:00
Peter Thorson 933533c623 initial notes about tutorials 2013-10-13 12:49:30 -05:00
30 changed files with 1378 additions and 550 deletions
+8
View File
@@ -1,4 +1,12 @@
HEAD
- Adds URI method to extract query string from URI. Thank you Banaan for code.
#298
- Numerous performance improvements. Including: tuned default buffer sizes based
on profiling, caching of handler binding for async reads/writes, non-malloc
allocators for read/write handlers, disabling of a number of questionably
useful range sanity checks in tight inner loops.
- Adds a compile time switch to asio transport config to disable certain
multithreading features (some locks, asio strands)
0.3.0-alpha4 - 2013-10-11
- HTTP requests ending normally are no longer logged as errors. Thank you Banaan
@@ -97,6 +97,8 @@ public:
websocketpp::lib::error_code ec;
while(1) {
bool wait = false;
{
scoped_lock guard(m_lock);
// If the connection has been closed, stop generating telemetry
@@ -104,11 +106,15 @@ public:
// If the connection hasn't been opened yet wait a bit and retry
if (!m_open) {
sleep(1);
continue;
wait = true;
}
}
if (wait) {
sleep(1);
continue;
}
val.str("");
val << "count is " << count++;
+3 -2
View File
@@ -6,6 +6,7 @@ Import('env_cpp11')
Import('boostlibs')
Import('platform_libs')
Import('polyfill_libs')
Import('tls_libs')
env = env.Clone ()
env_cpp11 = env_cpp11.Clone ()
@@ -14,10 +15,10 @@ prgs = []
# if a C++11 environment is available build using that, otherwise use boost
if env_cpp11.has_key('WSPP_CPP11_ENABLED'):
ALL_LIBS = boostlibs(['system'],env_cpp11) + [platform_libs] + [polyfill_libs]
ALL_LIBS = boostlibs(['system'],env_cpp11) + [platform_libs] + [polyfill_libs] + [tls_libs]
prgs += env_cpp11.Program('testee_server', ["testee_server.cpp"], LIBS = ALL_LIBS)
else:
ALL_LIBS = boostlibs(['system'],env) + [platform_libs] + [polyfill_libs]
ALL_LIBS = boostlibs(['system'],env) + [platform_libs] + [polyfill_libs] + [tls_libs]
prgs += env.Program('testee_server', ["testee_server.cpp"], LIBS = ALL_LIBS)
Return('prgs')
+101 -10
View File
@@ -25,11 +25,80 @@
*
*/
#include <websocketpp/config/asio_no_tls.hpp>
//#include <websocketpp/config/asio_no_tls.hpp>
#include <websocketpp/config/asio.hpp>
#include <websocketpp/server.hpp>
#include <websocketpp/message_buffer/fixed.hpp>
#include <iostream>
typedef websocketpp::server<websocketpp::config::asio> server;
typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context> context_ptr;
std::string get_password() {
return "test";
}
context_ptr on_tls_init(websocketpp::connection_hdl hdl) {
//std::cout << "on_tls_init called with hdl: " << hdl.lock().get() << std::endl;
context_ptr ctx(new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1));
try {
ctx->set_options(boost::asio::ssl::context::default_workarounds |
boost::asio::ssl::context::no_sslv2 |
boost::asio::ssl::context::single_dh_use);
ctx->set_password_callback(bind(&get_password));
ctx->use_certificate_chain_file("server.pem");
ctx->use_private_key_file("server.pem", boost::asio::ssl::context::pem);
} catch (std::exception& e) {
std::cout << e.what() << std::endl;
}
return ctx;
}
struct testee_config : public websocketpp::config::asio {
// pull default settings from our core config
typedef websocketpp::config::asio core;
typedef core::concurrency_type concurrency_type;
typedef core::request_type request_type;
typedef core::response_type response_type;
typedef core::message_type message_type;
typedef core::con_msg_manager_type con_msg_manager_type;
typedef core::endpoint_msg_manager_type endpoint_msg_manager_type;
//typedef websocketpp::message_buffer::fixed::policy::message message_type;
//typedef websocketpp::message_buffer::fixed::policy::con_msg_manager con_msg_manager_type;
////typedef websocketpp::message_buffer::fixed::policy::con_msg_manager endpoint_msg_manager_type;
//typedef websocketpp::message_buffer::fixed::policy::endpoint_msg_manager endpoint_msg_manager_type;
typedef core::alog_type alog_type;
typedef core::elog_type elog_type;
typedef core::rng_type rng_type;
typedef core::endpoint_base endpoint_base;
static bool const enable_multithreading = true;
struct transport_config : public core::transport_config {
typedef core::concurrency_type concurrency_type;
typedef core::elog_type elog_type;
typedef core::alog_type alog_type;
typedef core::request_type request_type;
typedef core::response_type response_type;
static bool const enable_multithreading = true;
};
typedef websocketpp::transport::asio::endpoint<transport_config>
transport_type;
static const websocketpp::log::level elog_level =
websocketpp::log::elevel::none;
static const websocketpp::log::level alog_level =
websocketpp::log::alevel::none;
};
typedef websocketpp::server<testee_config> server;
using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
@@ -43,33 +112,55 @@ void on_message(server* s, websocketpp::connection_hdl hdl, message_ptr msg) {
s->send(hdl, msg->get_payload(), msg->get_opcode());
}
int main() {
int main(int argc, char * argv[]) {
// Create a server endpoint
server testee_server;
short port = 9002;
size_t num_threads = 1;
if (argc == 3) {
port = atoi(argv[1]);
num_threads = atoi(argv[2]);
}
try {
// Total silence
testee_server.clear_access_channels(websocketpp::log::alevel::all);
testee_server.clear_error_channels(websocketpp::log::alevel::all);
//testee_server.clear_access_channels(websocketpp::log::alevel::all);
//testee_server.clear_error_channels(websocketpp::log::alevel::all);
// Initialize ASIO
testee_server.init_asio();
// Register our message handler
testee_server.set_message_handler(bind(&on_message,&testee_server,::_1,::_2));
//testee_server.set_tls_init_handler(bind(&on_tls_init,::_1));
// Listen on port 9002
testee_server.listen(9002);
// Listen on port
testee_server.listen(port);
// Start the server accept loop
testee_server.start_accept();
// Start the ASIO io_service run loop
testee_server.run();
if (num_threads == 1) {
testee_server.run();
} else {
typedef websocketpp::lib::shared_ptr<websocketpp::lib::thread> thread_ptr;
std::vector<thread_ptr> ts;
for (size_t i = 0; i < num_threads; i++) {
ts.push_back(thread_ptr(new websocketpp::lib::thread(&server::run, &testee_server)));
}
for (size_t i = 0; i < num_threads; i++) {
ts[i]->join();
}
}
} catch (const std::exception & e) {
std::cout << e.what() << std::endl;
std::cout << "exception: " << e.what() << std::endl;
} catch (websocketpp::lib::error_code e) {
std::cout << e.message() << std::endl;
std::cout << "error code: " << e.message() << std::endl;
} catch (...) {
std::cout << "other exception" << std::endl;
}
+49 -9
View File
@@ -29,15 +29,18 @@ public:
typedef std::chrono::duration<int,std::micro> dur_type;
perftest () {
m_endpoint.set_access_channels(websocketpp::log::alevel::all);
m_endpoint.set_error_channels(websocketpp::log::elevel::all);
m_endpoint.set_access_channels(websocketpp::log::alevel::none);
m_endpoint.set_error_channels(websocketpp::log::elevel::none);
// Initialize ASIO
m_endpoint.init_asio();
// Register our handlers
m_endpoint.set_socket_init_handler(bind(&type::on_socket_init,this,::_1));
m_endpoint.set_tls_init_handler(bind(&type::on_tls_init,this,::_1));
m_endpoint.set_tcp_pre_init_handler(bind(&type::on_tcp_pre_init,this,::_1));
m_endpoint.set_tcp_post_init_handler(bind(&type::on_tcp_post_init,this,::_1));
m_endpoint.set_socket_init_handler(bind(&type::on_socket_init,this,::_1));
m_endpoint.set_message_handler(bind(&type::on_message,this,::_1,::_2));
m_endpoint.set_open_handler(bind(&type::on_open,this,::_1));
m_endpoint.set_close_handler(bind(&type::on_close,this,::_1));
@@ -60,12 +63,17 @@ public:
m_endpoint.run();
}
void on_tcp_pre_init(websocketpp::connection_hdl hdl) {
m_tcp_pre_init = std::chrono::high_resolution_clock::now();
}
void on_tcp_post_init(websocketpp::connection_hdl hdl) {
m_tcp_post_init = std::chrono::high_resolution_clock::now();
}
void on_socket_init(websocketpp::connection_hdl hdl) {
m_socket_init = std::chrono::high_resolution_clock::now();
}
context_ptr on_tls_init(websocketpp::connection_hdl hdl) {
m_tls_init = std::chrono::high_resolution_clock::now();
context_ptr ctx(new boost::asio::ssl::context(boost::asio::ssl::context::tlsv1));
try {
@@ -80,27 +88,59 @@ public:
void on_open(websocketpp::connection_hdl hdl) {
m_open = std::chrono::high_resolution_clock::now();
m_endpoint.send(hdl, "", websocketpp::frame::opcode::text);
client::connection_ptr con = m_endpoint.get_con_from_hdl(hdl);
m_msg = con->get_message(websocketpp::frame::opcode::text,64);
m_msg->append_payload(std::string(60,'*'));
m_msg_count = 1;
//m_message_stamps.reserve(1000);
m_con_start = std::chrono::high_resolution_clock::now();
m_endpoint.send(hdl, m_msg);
//m_endpoint.send(hdl, "", websocketpp::frame::opcode::text);
}
void on_message(websocketpp::connection_hdl hdl, message_ptr msg) {
m_message = std::chrono::high_resolution_clock::now();
m_endpoint.close(hdl,websocketpp::close::status::going_away,"");
if (m_msg_count == 1000) {
m_message = std::chrono::high_resolution_clock::now();
m_endpoint.close(hdl,websocketpp::close::status::going_away,"");
} else {
m_msg_count++;
m_endpoint.send(hdl, m_msg);
}
}
void on_close(websocketpp::connection_hdl hdl) {
m_close = std::chrono::high_resolution_clock::now();
std::cout << "Socket Init: " << std::chrono::duration_cast<dur_type>(m_socket_init-m_start).count() << std::endl;
std::cout << "TLS Init: " << std::chrono::duration_cast<dur_type>(m_tls_init-m_start).count() << std::endl;
std::cout << "TCP Pre Init: " << std::chrono::duration_cast<dur_type>(m_tcp_pre_init-m_start).count() << std::endl;
std::cout << "TCP Post Init: " << std::chrono::duration_cast<dur_type>(m_tcp_post_init-m_start).count() << std::endl;
std::cout << "Open: " << std::chrono::duration_cast<dur_type>(m_open-m_start).count() << std::endl;
std::cout << "Start: " << std::chrono::duration_cast<dur_type>(m_con_start-m_start).count() << std::endl;
std::cout << "Message: " << std::chrono::duration_cast<dur_type>(m_message-m_start).count() << std::endl;
std::cout << "Close: " << std::chrono::duration_cast<dur_type>(m_close-m_start).count() << std::endl;
std::cout << std::endl;
std::cout << "Message: " << std::chrono::duration_cast<dur_type>(m_message-m_con_start).count() << std::endl;
std::cout << "Close: " << std::chrono::duration_cast<dur_type>(m_close-m_message).count() << std::endl;
}
private:
client m_endpoint;
client::message_ptr m_msg;
size_t m_msg_count;
std::chrono::high_resolution_clock::time_point m_start;
std::chrono::high_resolution_clock::time_point m_tcp_pre_init;
std::chrono::high_resolution_clock::time_point m_tcp_post_init;
std::chrono::high_resolution_clock::time_point m_socket_init;
std::chrono::high_resolution_clock::time_point m_tls_init;
std::vector<std::chrono::high_resolution_clock::time_point> m_message_stamps;
std::chrono::high_resolution_clock::time_point m_con_start;
std::chrono::high_resolution_clock::time_point m_open;
std::chrono::high_resolution_clock::time_point m_message;
std::chrono::high_resolution_clock::time_point m_close;
+2
View File
@@ -95,6 +95,8 @@ struct config {
typedef websocketpp::http::parser::response response_type;
typedef websocketpp::transport::asio::tls_socket::endpoint socket_type;
static const bool enable_multithreading = true;
static const long timeout_socket_pre_init = 1000;
static const long timeout_proxy = 1000;
static const long timeout_socket_post_init = 1000;
+2
View File
@@ -43,6 +43,7 @@ BOOST_AUTO_TEST_CASE( uri_valid ) {
BOOST_CHECK_EQUAL( uri.get_host(), "localhost");
BOOST_CHECK_EQUAL( uri.get_port(), 9000 );
BOOST_CHECK_EQUAL( uri.get_resource(), "/chat" );
BOOST_CHECK_EQUAL( uri.get_query(), "" );
}
// Test a regular valid ws URI
@@ -201,6 +202,7 @@ BOOST_AUTO_TEST_CASE( uri_valid_4 ) {
BOOST_CHECK_EQUAL( uri.get_host(), "localhost");
BOOST_CHECK_EQUAL( uri.get_port(), 9000 );
BOOST_CHECK_EQUAL( uri.get_resource(), "/chat/foo/bar?foo=bar" );
BOOST_CHECK_EQUAL( uri.get_query(), "foo=bar" );
}
// Valid URI with a mapped v4 ipv6 literal
@@ -0,0 +1,45 @@
WebSocket++ (0.3.0-alpha3)
==========================
WebSocket++ is a header only C++ library that implements RFC6455 The WebSocket
Protocol. It allows integrating WebSocket client and server functionality into
C++ programs. It uses interchangeable network transport modules including one
based on C++ iostreams and one based on Boost Asio.
Major Features
==============
* Full support for RFC6455
* Partial support for Hixie 76 / Hybi 00, 07-17 draft specs (server only)
* Message/event based interface
* Supports secure WebSockets (TLS), IPv6, and explicit proxies.
* Flexible dependency management (C++11 Standard Library or Boost)
* Interchangeable network transport modules (iostream and Boost Asio)
* Portable/cross platform (Posix/Windows, 32/64bit, Intel/ARM/PPC)
* Thread-safe
Get Involved
============
[![Build Status](https://travis-ci.org/zaphoyd/websocketpp.png)](https://travis-ci.org/zaphoyd/websocketpp)
**Project Website**
http://www.zaphoyd.com/websocketpp/
**User Manual**
http://www.zaphoyd.com/websocketpp/manual/
**GitHub Repository**
https://github.com/zaphoyd/websocketpp/
**Announcements Mailing List**
http://groups.google.com/group/websocketpp-announcements/
**IRC Channel**
#websocketpp (freenode)
**Discussion / Development / Support Mailing List / Forum**
http://groups.google.com/group/websocketpp/
Author
======
Peter Thorson - websocketpp@zaphoyd.com
+75
View File
@@ -0,0 +1,75 @@
Chat Tutorial
=============
Goals of this tutorial:
- Impliment a realtime websocket chat server
Server
- Nicknames
- Channels
- Subprotocol
- Origin restrictions
- HTTP statistics page
Hi all,
I am working on some tutorials for WebSocket++ and was wondering if anyone here has specific features or concepts that they feel would be particularly useful to have covered? Anything that you wish there was a tutorial for now or wished there had been when you were first setting up your WebSocket++ application?
Best,
Peter
WebSocket++ is a header only C++ library that implements RFC6455 The WebSocket
Protocol. It allows integrating WebSocket client and server functionality into
C++ programs. It uses interchangeable network transport modules including one
based on C++ iostreams and one based on Boost Asio.
Major Features
==============
* Full support for RFC6455
* Partial support for Hixie 76 / Hybi 00, 07-17 draft specs (server only)
* Message/event based interface
* Supports secure WebSockets (TLS), IPv6, and explicit proxies.
* Flexible dependency management (C++11 Standard Library or Boost)
* Interchangeable network transport modules (iostream and Boost Asio)
* Portable/cross platform (Posix/Windows, 32/64bit, Intel/ARM/PPC)
* Thread-safe
Get Involved
============
[![Build Status](https://travis-ci.org/zaphoyd/websocketpp.png)](https://travis-ci.org/zaphoyd/websocketpp)
**Project Website**
http://www.zaphoyd.com/websocketpp/
**User Manual**
http://www.zaphoyd.com/websocketpp/manual/
**GitHub Repository**
https://github.com/zaphoyd/websocketpp/
**Announcements Mailing List**
http://groups.google.com/group/websocketpp-announcements/
**IRC Channel**
#websocketpp (freenode)
**Discussion / Development / Support Mailing List / Forum**
http://groups.google.com/group/websocketpp/
Author
======
Peter Thorson - websocketpp@zaphoyd.com
-123
View File
@@ -1,123 +0,0 @@
/*
base64.cpp and base64.h
Copyright (C) 2004-2008 René Nyffenegger
This source code is provided 'as-is', without any express or implied
warranty. In no event will the author be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this source code must not be misrepresented; you must not
claim that you wrote the original source code. If you use this source code
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original source code.
3. This notice may not be removed or altered from any source distribution.
René Nyffenegger rene.nyffenegger@adp-gmbh.ch
*/
#include "base64.h"
#include <iostream>
static const std::string base64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
static inline bool is_base64(unsigned char c) {
return (isalnum(c) || (c == '+') || (c == '/'));
}
std::string base64_encode(unsigned char const* bytes_to_encode, unsigned int in_len) {
std::string ret;
int i = 0;
int j = 0;
unsigned char char_array_3[3];
unsigned char char_array_4[4];
while (in_len--) {
char_array_3[i++] = *(bytes_to_encode++);
if (i == 3) {
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for(i = 0; (i <4) ; i++)
ret += base64_chars[char_array_4[i]];
i = 0;
}
}
if (i)
{
for(j = i; j < 3; j++)
char_array_3[j] = '\0';
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for (j = 0; (j < i + 1); j++)
ret += base64_chars[char_array_4[j]];
while((i++ < 3))
ret += '=';
}
return ret;
}
std::string base64_decode(std::string const& encoded_string) {
size_t in_len = encoded_string.size();
int i = 0;
int j = 0;
int in_ = 0;
unsigned char char_array_4[4], char_array_3[3];
std::string ret;
while (in_len-- && ( encoded_string[in_] != '=') && is_base64(encoded_string[in_])) {
char_array_4[i++] = encoded_string[in_]; in_++;
if (i ==4) {
for (i = 0; i <4; i++)
char_array_4[i] = static_cast<unsigned char>(base64_chars.find(char_array_4[i]));
char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];
for (i = 0; (i < 3); i++)
ret += char_array_3[i];
i = 0;
}
}
if (i) {
for (j = i; j <4; j++)
char_array_4[j] = 0;
for (j = 0; j <4; j++)
char_array_4[j] = static_cast<unsigned char>(base64_chars.find(char_array_4[j]));
char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];
for (j = 0; (j < i - 1); j++) ret += char_array_3[j];
}
return ret;
}
-4
View File
@@ -1,4 +0,0 @@
#include <string>
std::string base64_encode(unsigned char const* , unsigned int len);
std::string base64_decode(std::string const& s);
+8 -4
View File
@@ -38,7 +38,9 @@
#include <string>
static const std::string base64_chars =
namespace websocketpp {
static std::string const base64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
@@ -50,7 +52,7 @@ static inline bool is_base64(unsigned char c) {
(c >= 97 && c <= 122)); // a-z
}
inline std::string base64_encode(unsigned char const* bytes_to_encode, unsigned
inline std::string base64_encode(unsigned char const * bytes_to_encode, unsigned
int in_len)
{
std::string ret;
@@ -100,11 +102,11 @@ inline std::string base64_encode(unsigned char const* bytes_to_encode, unsigned
return ret;
}
inline std::string base64_encode(const std::string & data) {
inline std::string base64_encode(std::string const & data) {
return base64_encode(reinterpret_cast<const unsigned char *>(data.data()),data.size());
}
inline std::string base64_decode(std::string const& encoded_string) {
inline std::string base64_decode(std::string const & encoded_string) {
size_t in_len = encoded_string.size();
int i = 0;
int j = 0;
@@ -149,4 +151,6 @@ inline std::string base64_decode(std::string const& encoded_string) {
return ret;
}
} // namespace websocketpp
#endif // _BASE64_HPP_
+1 -1
View File
@@ -197,7 +197,7 @@ union code_converter {
*
* If the value is in an invalid or reserved range ec is set accordingly.
*
* @param [in] payload Close frame payload value recieved over the wire.
* @param [in] payload Close frame payload value received over the wire.
* @param [out] ec Set to indicate what error occurred, if any.
* @return The extracted value
*/
+11 -1
View File
@@ -91,6 +91,11 @@ struct core {
/// RNG policies
typedef websocketpp::random::none::int_generator<uint32_t> rng_type;
/// Controls compile time enabling/disabling of thread syncronization
/// code Disabling can provide a minor performance improvement to single
/// threaded applications
static bool const enable_multithreading = true;
struct transport_config {
typedef type::concurrency_type concurrency_type;
typedef type::elog_type elog_type;
@@ -98,6 +103,11 @@ struct core {
typedef type::request_type request_type;
typedef type::response_type response_type;
/// Controls compile time enabling/disabling of thread syncronization
/// code Disabling can provide a minor performance improvement to single
/// threaded applications
static bool const enable_multithreading = true;
/// Default timer values (in ms)
/// Length of time to wait for socket pre-initialization
@@ -179,7 +189,7 @@ struct core {
websocketpp::log::alevel::all ^ websocketpp::log::alevel::devel;
///
static const size_t connection_read_buffer_size = 512;
static const size_t connection_read_buffer_size = 16384;
/// Drop connections immediately on protocol error.
/**
+11 -1
View File
@@ -92,6 +92,11 @@ struct core_client {
typedef websocketpp::random::random_device::int_generator<uint32_t,
concurrency_type> rng_type;
/// Controls compile time enabling/disabling of thread syncronization code
/// Disabling can provide a minor performance improvement to single threaded
/// applications
static bool const enable_multithreading = true;
struct transport_config {
typedef type::concurrency_type concurrency_type;
typedef type::elog_type elog_type;
@@ -99,6 +104,11 @@ struct core_client {
typedef type::request_type request_type;
typedef type::response_type response_type;
/// Controls compile time enabling/disabling of thread syncronization
/// code Disabling can provide a minor performance improvement to single
/// threaded applications
static bool const enable_multithreading = true;
/// Default timer values (in ms)
/// Length of time to wait for socket pre-initialization
@@ -180,7 +190,7 @@ struct core_client {
websocketpp::log::alevel::all ^ websocketpp::log::alevel::devel;
///
static const size_t connection_read_buffer_size = 512;
static const size_t connection_read_buffer_size = 16384;
/// Drop connections immediately on protocol error.
/**
+11 -1
View File
@@ -92,6 +92,11 @@ struct debug_core {
/// RNG policies
typedef websocketpp::random::none::int_generator<uint32_t> rng_type;
/// Controls compile time enabling/disabling of thread syncronization
/// code Disabling can provide a minor performance improvement to single
/// threaded applications
static bool const enable_multithreading = true;
struct transport_config {
typedef type::concurrency_type concurrency_type;
typedef type::elog_type elog_type;
@@ -99,6 +104,11 @@ struct debug_core {
typedef type::request_type request_type;
typedef type::response_type response_type;
/// Controls compile time enabling/disabling of thread syncronization
/// code Disabling can provide a minor performance improvement to single
/// threaded applications
static bool const enable_multithreading = true;
/// Default timer values (in ms)
/// Length of time to wait for socket pre-initialization
@@ -180,7 +190,7 @@ struct debug_core {
websocketpp::log::alevel::all;
///
static const size_t connection_read_buffer_size = 512;
static const size_t connection_read_buffer_size = 16384;
/// Drop connections immediately on protocol error.
/**
+21 -2
View File
@@ -149,6 +149,10 @@ typedef lib::function<bool(connection_hdl)> validate_handler;
*/
typedef lib::function<void(connection_hdl)> http_handler;
//
typedef lib::function<void(lib::error_code const & ec, size_t bytes_transferred)> read_handler;
typedef lib::function<void(lib::error_code const & ec)> write_frame_handler;
// constants related to the default WebSocket protocol versions available
#ifdef _WEBSOCKETPP_INITIALIZER_LISTS_ // simplified C++11 version
/// Container that stores the list of protocol versions supported
@@ -278,6 +282,17 @@ public:
explicit connection(bool is_server, std::string const & ua, alog_type& alog,
elog_type& elog, rng_type & rng)
: transport_con_type(is_server,alog,elog)
, m_handle_read_frame(lib::bind(
&type::handle_read_frame,
this,
lib::placeholders::_1,
lib::placeholders::_2
))
, m_write_frame_handler(lib::bind(
&type::handle_write_frame,
this,
lib::placeholders::_1
))
, m_user_agent(ua)
, m_state(session::state::connecting)
, m_internal_state(session::internal_state::USER_INIT)
@@ -983,7 +998,7 @@ public:
message_ptr get_message(websocketpp::frame::opcode::value op, size_t size)
const
{
return m_msg_manager->get_message(op, size);
return m_msg_manager->get_outgoing_message(op, size);
}
void start();
@@ -1034,7 +1049,7 @@ public:
* @param ec A status code from the transport layer, zero on success,
* non-zero otherwise.
*/
void handle_write_frame(bool terminate, lib::error_code const & ec);
void handle_write_frame(lib::error_code const & ec);
protected:
void handle_transport_init(lib::error_code const & ec);
@@ -1190,6 +1205,10 @@ private:
*/
void log_fail_result();
// internal handler functions
read_handler m_handle_read_frame;
write_frame_handler m_write_frame_handler;
// static settings
const std::string m_user_agent;
+38 -26
View File
@@ -75,17 +75,17 @@ template <typename config>
lib::error_code connection<config>::send(const std::string& payload,
frame::opcode::value op)
{
message_ptr msg = m_msg_manager->get_message(op,payload.size());
message_ptr msg = m_msg_manager->get_outgoing_message(op,payload.size());
msg->append_payload(payload);
return send(msg);
}
template <typename config>
lib::error_code connection<config>::send(const void* payload, size_t len,
lib::error_code connection<config>::send(void const * payload, size_t len,
frame::opcode::value op)
{
message_ptr msg = m_msg_manager->get_message(op,len);
message_ptr msg = m_msg_manager->get_outgoing_message(op,len);
msg->append_payload(payload,len);
return send(msg);
@@ -111,7 +111,7 @@ lib::error_code connection<config>::send(typename config::message_type::ptr msg)
write_push(outgoing_msg);
needs_writing = !m_write_flag && !m_send_queue.empty();
} else {
outgoing_msg = m_msg_manager->get_message();
outgoing_msg = m_msg_manager->get_outgoing_message(msg->get_opcode(),0);
if (!outgoing_msg) {
return error::make_error_code(error::no_outgoing_buffers);
@@ -139,7 +139,8 @@ lib::error_code connection<config>::send(typename config::message_type::ptr msg)
}
template <typename config>
void connection<config>::ping(const std::string& payload, lib::error_code& ec) {
void connection<config>::ping(std::string const & payload, lib::error_code & ec)
{
m_alog.write(log::alevel::devel,"connection ping");
if (m_state != session::state::open) {
@@ -147,7 +148,7 @@ void connection<config>::ping(const std::string& payload, lib::error_code& ec) {
return;
}
message_ptr msg = m_msg_manager->get_message();
message_ptr msg = m_msg_manager->get_outgoing_message(frame::opcode::ping, payload.size());
if (!msg) {
ec = error::make_error_code(error::no_outgoing_buffers);
return;
@@ -207,8 +208,8 @@ void connection<config>::ping(const std::string& payload) {
}
template<typename config>
void connection<config>::handle_pong_timeout(std::string payload, const lib::error_code &
ec)
void connection<config>::handle_pong_timeout(std::string payload,
lib::error_code const & ec)
{
if (ec) {
if (ec == transport::error::operation_aborted) {
@@ -226,7 +227,8 @@ void connection<config>::handle_pong_timeout(std::string payload, const lib::err
}
template <typename config>
void connection<config>::pong(const std::string& payload, lib::error_code& ec) {
void connection<config>::pong(std::string const & payload, lib::error_code & ec)
{
m_alog.write(log::alevel::devel,"connection pong");
if (m_state != session::state::open) {
@@ -234,7 +236,7 @@ void connection<config>::pong(const std::string& payload, lib::error_code& ec) {
return;
}
message_ptr msg = m_msg_manager->get_message();
message_ptr msg = m_msg_manager->get_outgoing_message(frame::opcode::pong, payload.size());
if (!msg) {
ec = error::make_error_code(error::no_outgoing_buffers);
return;
@@ -261,7 +263,7 @@ void connection<config>::pong(const std::string& payload, lib::error_code& ec) {
}
template<typename config>
void connection<config>::pong(const std::string& payload) {
void connection<config>::pong(std::string const & payload) {
lib::error_code ec;
pong(payload,ec);
if (ec) {
@@ -270,8 +272,8 @@ void connection<config>::pong(const std::string& payload) {
}
template <typename config>
void connection<config>::close(const close::status::value code,
const std::string & reason, lib::error_code & ec)
void connection<config>::close(close::status::value const code,
std::string const & reason, lib::error_code & ec)
{
m_alog.write(log::alevel::devel,"connection close");
@@ -288,7 +290,7 @@ void connection<config>::close(const close::status::value code,
}
template<typename config>
void connection<config>::close(const close::status::value code,
void connection<config>::close(close::status::value const code,
const std::string & reason)
{
lib::error_code ec;
@@ -624,7 +626,7 @@ void connection<config>::handle_transport_init(lib::error_code const & ec) {
if (ec) {
std::stringstream s;
s << "handle_transport_init recieved error: "<< ec.message();
s << "handle_transport_init received error: "<< ec.message();
m_elog.write(log::elevel::fatal,s.str());
this->terminate(ec);
@@ -850,11 +852,11 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
}
// Boundaries checking. TODO: How much of this should be done?
if (bytes_transferred > config::connection_read_buffer_size) {
/*if (bytes_transferred > config::connection_read_buffer_size) {
m_elog.write(log::elevel::fatal,"Fatal boundaries checking error");
this->terminate(make_error_code(error::general));
return;
}
}*/
size_t p = 0;
@@ -914,6 +916,9 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
message_ptr msg = m_processor->get_message();
// TODO: do we need to catch exceptions here and call the message
// handler hook. WebSocket++ doesn't throw any exceptions here, is
// there anything that could be thrown that could be recovered from?
if (!msg) {
m_alog.write(log::alevel::devel,
"null message from m_processor");
@@ -925,8 +930,10 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
} else if (m_message_handler) {
m_message_handler(m_connection_hdl, msg);
}
m_msg_manager->message_handler_hook(msg);
} else {
process_control_frame(msg);
m_msg_manager->message_handler_hook(msg);
}
}
}
@@ -942,12 +949,13 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
1,
m_buf,
config::connection_read_buffer_size,
lib::bind(
/*lib::bind(
&type::handle_read_frame,
type::get_shared(),
lib::placeholders::_1,
lib::placeholders::_2
)
)*/
m_handle_read_frame
);
}
@@ -1540,8 +1548,8 @@ void connection<config>::write_frame() {
m_write_flag = true;
}
const std::string& header = m_current_msg->get_header();
const std::string& payload = m_current_msg->get_payload();
std::string const & header = m_current_msg->get_header();
std::string const & payload = m_current_msg->get_payload();
m_send_buffer.push_back(transport::buffer(header.c_str(),header.size()));
m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size()));
@@ -1563,26 +1571,30 @@ void connection<config>::write_frame() {
}
}
transport_con_type::async_write(
m_send_buffer,
lib::bind(
/*lib::bind(
&type::handle_write_frame,
type::get_shared(),
m_current_msg->get_terminal(),
lib::placeholders::_1
)
)*/
m_write_frame_handler
);
}
template <typename config>
void connection<config>::handle_write_frame(bool terminate,
const lib::error_code& ec)
void connection<config>::handle_write_frame(lib::error_code const & ec)
{
if (m_alog.static_test(log::alevel::devel)) {
m_alog.write(log::alevel::devel,"connection handle_write_frame");
}
bool terminate = m_current_msg->get_terminal();
m_send_buffer.clear();
m_msg_manager->write_handler_hook(m_current_msg);
m_current_msg.reset();
if (ec) {
@@ -1835,7 +1847,7 @@ lib::error_code connection<config>::send_close_frame(close::status::value code,
<< m_local_close_reason;
m_alog.write(log::alevel::devel,s.str());
message_ptr msg = m_msg_manager->get_message();
message_ptr msg = m_msg_manager->get_outgoing_message(frame::opcode::close,m_local_close_reason.size());
if (!msg) {
return error::make_error_code(error::no_outgoing_buffers);
}
+24
View File
@@ -67,6 +67,30 @@ public:
return message_ptr(new message(type::shared_from_this(),op,size));
}
/*message_ptr get_incoming_message() {
return get_message();
}
message_ptr get_outgoing_message() {
return get_message();
}*/
message_ptr get_incoming_message(frame::opcode::value op, size_t size) {
return get_message(op,size);
}
message_ptr get_outgoing_message(frame::opcode::value op, size_t size) {
return get_message(op,size);
}
void message_handler_hook(message_ptr msg) {
// nothing to do here
}
void write_handler_hook(message_ptr msg) {
// nothing to do here
}
/// Recycle a message
/**
* This method shouldn't be called. If it is, return false to indicate an
+160
View File
@@ -0,0 +1,160 @@
/*
* Copyright (c) 2013, Peter Thorson. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the WebSocket++ Project nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef WEBSOCKETPP_MESSAGE_BUFFER_FIXED_HPP
#define WEBSOCKETPP_MESSAGE_BUFFER_FIXED_HPP
#include <websocketpp/message_buffer/fixed_message.hpp>
#include <websocketpp/common/memory.hpp>
#include <websocketpp/frame.hpp>
namespace websocketpp {
namespace message_buffer {
namespace fixed {
class con_msg_manager;
class endpoint_msg_manager;
struct policy {
typedef websocketpp::message_buffer::fixed_message message;
typedef websocketpp::message_buffer::fixed::con_msg_manager con_msg_manager;
typedef websocketpp::message_buffer::fixed::endpoint_msg_manager endpoint_msg_manager;
};
/// A connection message manager that allocates a new message for each
/// request.
class con_msg_manager : public lib::enable_shared_from_this<con_msg_manager> {
public:
typedef con_msg_manager type;
typedef std::shared_ptr<type> ptr;
typedef policy::message message;
typedef policy::message::ptr message_ptr;
con_msg_manager()
: m_incoming_message(new message())
, m_incoming_message_busy(false)
, m_outgoing_message(new message())
, m_outgoing_message_busy(false) {}
/// Get an empty message buffer
/**
* @return A shared pointer to an empty new message
*/
message_ptr get_message() {
return message_ptr(new message());
}
message_ptr get_incoming_message(frame::opcode::value op, size_t size) {
if (frame::opcode::is_control(op)) {
return message_ptr(new message(op, size));
} else {
if (m_incoming_message_busy) {
return message_ptr();
} else {
m_incoming_message->set_opcode(op);
m_incoming_message->reserve(size);
return m_incoming_message;
}
}
}
message_ptr get_outgoing_message(frame::opcode::value op, size_t size) {
if (frame::opcode::is_control(op)) {
return message_ptr(new message(op, size));
} else {
if (m_outgoing_message_busy) {
return message_ptr();
} else {
m_outgoing_message->set_opcode(op);
m_outgoing_message->reserve(size);
return m_outgoing_message;
}
}
}
void message_handler_hook(message_ptr msg) {
recycle(msg);
}
void write_handler_hook(message_ptr msg) {
// TODO: we shouldn't recycle a message every time it is written. If a
// message is queued for writing multiple times... Needs a reference
// count in the message to deal with this.
recycle(msg);
}
bool recycle(message_ptr msg) {
if (msg == m_incoming_message) {
m_incoming_message_busy = false;
msg->reset();
return true;
} else if (msg == m_outgoing_message) {
m_outgoing_message_busy = false;
msg->reset();
return true;
} else {
// not a message we are managing, ignore
return false;
}
}
/// Get a pointer to a connection message manager
/**
* @return A pointer to the requested connection message manager.
*/
ptr get_manager() {
return shared_from_this();
}
private:
message_ptr m_incoming_message;
bool m_incoming_message_busy;
message_ptr m_outgoing_message;
bool m_outgoing_message_busy;
};
/// An endpoint message manager that allocates a new manager for each
/// connection.
class endpoint_msg_manager {
public:
typedef policy::con_msg_manager::ptr con_msg_man_ptr;
/// Get a pointer to a connection message manager
/**
* @return A pointer to the requested connection message manager.
*/
con_msg_man_ptr get_manager() const {
return con_msg_man_ptr(new con_msg_manager());
}
};
} // namespace fixed
} // namespace message_buffer
} // namespace websocketpp
#endif // WEBSOCKETPP_MESSAGE_BUFFER_FIXED_HPP
@@ -0,0 +1,321 @@
/*
* Copyright (c) 2013, Peter Thorson. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the WebSocket++ Project nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef WEBSOCKETPP_MESSAGE_BUFFER_FIXED_MESSAGE_HPP
#define WEBSOCKETPP_MESSAGE_BUFFER_FIXED_MESSAGE_HPP
#include <websocketpp/common/memory.hpp>
#include <websocketpp/frame.hpp>
#include <string>
namespace websocketpp {
namespace message_buffer {
/* # message:
* object that stores a message while it is being sent or received. Contains
* the message payload itself, the message header, the extension data, and the
* opcode.
*
* # connection_message_manager:
* An object that manages all of the message_buffers associated with a given
* connection. Impliments the get_message_buffer(size) method that returns
* a message buffer at least size bytes long.
*
* Message buffers are reference counted with shared ownership semantics. Once
* requested from the manager the requester and it's associated downstream code
* may keep a pointer to the message indefinitely at a cost of extra resource
* usage. Once the reference count drops to the point where the manager is the
* only reference the messages is recycled using whatever method is implemented
* in the manager.
*
* # endpoint_message_manager:
* An object that manages connection_message_managers. Impliments the
* get_message_manager() method. This is used once by each connection to
* request the message manager that they are supposed to use to manage message
* buffers for their own use.
*
* TYPES OF CONNECTION_MESSAGE_MANAGERS
* - allocate a message with the exact size every time one is requested
* - maintain a pool of pre-allocated messages and return one when needed.
* Recycle previously used messages back into the pool
*
* TYPES OF ENDPOINT_MESSAGE_MANAGERS
* - allocate a new connection manager for each connection. Message pools
* become connection specific. This increases memory usage but improves
* concurrency.
* - allocate a single connection manager and share a pointer to it with all
* connections created by this endpoint. The message pool will be shared
* among all connections, improving memory usage and performance at the cost
* of reduced concurrency
*/
/// Represents a buffer for a single WebSocket message.
/**
*
*
*/
class fixed_message {
public:
typedef lib::shared_ptr<fixed_message> ptr;
/// Construct an empty message
/**
* Construct an empty message
*/
fixed_message()
: m_prepared(false)
, m_fin(true)
, m_terminal(false)
, m_compressed(false) {}
/// Construct a message and fill in some values
/**
*
*/
fixed_message(frame::opcode::value op, size_t size = 128)
: m_opcode(op)
, m_prepared(false)
, m_fin(true)
, m_terminal(false)
, m_compressed(false)
{
m_payload.reserve(size);
}
void reset() {
m_payload.clear();
m_prepared = false;
m_fin = true;
m_terminal = false;
m_compressed = false;
}
/// Return whether or not the message has been prepared for sending
/**
* The prepared flag indicates that the message has been prepared by a
* websocket protocol processor and is ready to be written to the wire.
*
* @return whether or not the message has been prepared for sending
*/
bool get_prepared() const {
return m_prepared;
}
/// Set or clear the flag that indicates that the message has been prepared
/**
* This flag should not be set by end user code without a very good reason.
*
* @param value The value to set the prepared flag to
*/
void set_prepared(bool value) {
m_prepared = value;
}
/// Return whether or not the message is flagged as compressed
/**
* @return whether or not the message is/should be compressed
*/
bool get_compressed() const {
return m_compressed;
}
/// Set or clear the compression flag
/**
* The compression flag is used to indicate whether or not the message is
* or should be compressed. Compression is not guaranteed. Both endpoints
* must support a compression extension and the connection must have had
* that extension negotiated in its handshake.
*
* @param value The value to set the compressed flag to
*/
void set_compressed(bool value) {
m_compressed = value;
}
/// Get whether or not the message is terminal
/**
* Messages can be flagged as terminal, which results in the connection
* being close after they are written rather than the implementation going
* on to the next message in the queue. This is typically used internally
* for close messages only.
*
* @return Whether or not this message is marked terminal
*/
bool get_terminal() const {
return m_terminal;
}
/// Set the terminal flag
/**
* This flag should not be set by end user code without a very good reason.
*
* @see get_terminal()
*
* @param value The value to set the terminal flag to.
*/
void set_terminal(bool value) {
m_terminal = value;
}
/// Read the fin bit
/**
* A message with the fin bit set will be sent as the last message of its
* sequence. A message with the fin bit cleared will require subsequent
* frames of opcode continuation until one of them has the fin bit set.
*
* The remote end likely will not deliver any bytes until the frame with the fin
* bit set has been received.
*
* @return Whether or not the fin bit is set
*/
bool get_fin() const {
return m_fin;
}
/// Set the fin bit
/**
* @see get_fin for a more detailed explaination of the fin bit
*
* @param value The value to set the fin bit to.
*/
void set_fin(bool value) {
m_fin = value;
}
/// Return the message opcode
frame::opcode::value get_opcode() const {
return m_opcode;
}
/// Set the opcode
void set_opcode(frame::opcode::value op) {
m_opcode = op;
}
/// Return the prepared frame header
/**
* This value is typically set by a websocket protocol processor
* and shouldn't be tampered with.
*/
std::string const & get_header() const {
return m_header;
}
/// Set prepared frame header
/**
* Under normal circumstances this should not be called by end users
*
* @param header A string to set the header to.
*/
void set_header(std::string const & header) {
m_header = header;
}
std::string const & get_extension_data() const {
return m_extension_data;
}
/// Get a reference to the payload string
/**
* @return A const reference to the message's payload string
*/
std::string const & get_payload() const {
return m_payload;
}
/// Get a non-const reference to the payload string
/**
* @return A reference to the message's payload string
*/
std::string & get_raw_payload() {
return m_payload;
}
/// Set payload data
/**
* Set the message buffer's payload to the given value.
*
* @param payload A string to set the payload to.
*/
void set_payload(std::string const & payload) {
m_payload = payload;
}
/// Set payload data
/**
* Set the message buffer's payload to the given value.
*
* @param payload A pointer to a data array to set to.
* @param len The length of new payload in bytes.
*/
void set_payload(void const * payload, size_t len) {
m_payload.reserve(len);
char const * pl = static_cast<char const *>(payload);
m_payload.assign(pl, pl + len);
}
/// Append payload data
/**
* Append data to the message buffer's payload.
*
* @param payload A string containing the data array to append.
*/
void append_payload(std::string const & payload) {
m_payload.append(payload);
}
/// Append payload data
/**
* Append data to the message buffer's payload.
*
* @param payload A pointer to a data array to append
* @param len The length of payload in bytes
*/
void append_payload(void const * payload, size_t len) {
m_payload.reserve(m_payload.size()+len);
m_payload.append(static_cast<char const *>(payload),len);
}
void reserve(size_t size) {
m_payload.reserve(size);
}
private:
std::string m_header;
std::string m_extension_data;
std::string m_payload;
frame::opcode::value m_opcode;
bool m_prepared;
bool m_fin;
bool m_terminal;
bool m_compressed;
};
} // namespace message_buffer
} // namespace websocketpp
#endif // WEBSOCKETPP_MESSAGE_BUFFER_FIXED_MESSAGE_HPP
+1 -1
View File
@@ -217,7 +217,7 @@ public:
if (m_state == HEADER) {
if (buf[p] == msg_hdr) {
p++;
m_msg_ptr = m_msg_manager->get_message(frame::opcode::text,1);
m_msg_ptr = m_msg_manager->get_incoming_message(frame::opcode::text,1);
if (!m_msg_ptr) {
ec = make_error_code(websocketpp::error::no_incoming_buffers);
+2 -2
View File
@@ -362,7 +362,7 @@ public:
if (frame::opcode::is_control(op)) {
m_control_msg = msg_metadata(
m_msg_manager->get_message(op,m_bytes_needed),
m_msg_manager->get_incoming_message(op,m_bytes_needed),
frame::get_masking_key(m_basic_header,m_extended_header)
);
@@ -370,7 +370,7 @@ public:
} else {
if (!m_data_msg.msg_ptr) {
m_data_msg = msg_metadata(
m_msg_manager->get_message(op,m_bytes_needed),
m_msg_manager->get_incoming_message(op,m_bytes_needed),
frame::get_masking_key(m_basic_header,m_extended_header)
);
} else {
-185
View File
@@ -1,185 +0,0 @@
/*
Copyright (c) 2011, Micael Hildenborg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Micael Hildenborg nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY Micael Hildenborg ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL Micael Hildenborg BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
Contributors:
Gustav
Several members in the gamedev.se forum.
Gregory Petrosyan
*/
#include "sha1.h"
namespace sha1
{
namespace // local
{
// Rotate an integer value to left.
inline const unsigned int rol(const unsigned int value,
const unsigned int steps)
{
return ((value << steps) | (value >> (32 - steps)));
}
// Sets the first 16 integers in the buffert to zero.
// Used for clearing the W buffert.
inline void clearWBuffert(unsigned int* buffert)
{
for (int pos = 16; --pos >= 0;)
{
buffert[pos] = 0;
}
}
void innerHash(unsigned int* result, unsigned int* w)
{
unsigned int a = result[0];
unsigned int b = result[1];
unsigned int c = result[2];
unsigned int d = result[3];
unsigned int e = result[4];
int round = 0;
#define sha1macro(func,val) \
{ \
const unsigned int t = rol(a, 5) + (func) + e + val + w[round]; \
e = d; \
d = c; \
c = rol(b, 30); \
b = a; \
a = t; \
}
while (round < 16)
{
sha1macro((b & c) | (~b & d), 0x5a827999)
++round;
}
while (round < 20)
{
w[round] = rol((w[round - 3] ^ w[round - 8] ^ w[round - 14] ^ w[round - 16]), 1);
sha1macro((b & c) | (~b & d), 0x5a827999)
++round;
}
while (round < 40)
{
w[round] = rol((w[round - 3] ^ w[round - 8] ^ w[round - 14] ^ w[round - 16]), 1);
sha1macro(b ^ c ^ d, 0x6ed9eba1)
++round;
}
while (round < 60)
{
w[round] = rol((w[round - 3] ^ w[round - 8] ^ w[round - 14] ^ w[round - 16]), 1);
sha1macro((b & c) | (b & d) | (c & d), 0x8f1bbcdc)
++round;
}
while (round < 80)
{
w[round] = rol((w[round - 3] ^ w[round - 8] ^ w[round - 14] ^ w[round - 16]), 1);
sha1macro(b ^ c ^ d, 0xca62c1d6)
++round;
}
#undef sha1macro
result[0] += a;
result[1] += b;
result[2] += c;
result[3] += d;
result[4] += e;
}
} // namespace
void calc(const void* src, const int bytelength, unsigned char* hash)
{
// Init the result array.
unsigned int result[5] = { 0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0 };
// Cast the void src pointer to be the byte array we can work with.
const unsigned char* sarray = (const unsigned char*) src;
// The reusable round buffer
unsigned int w[80];
// Loop through all complete 64byte blocks.
const int endOfFullBlocks = bytelength - 64;
int endCurrentBlock;
int currentBlock = 0;
while (currentBlock <= endOfFullBlocks)
{
endCurrentBlock = currentBlock + 64;
// Init the round buffer with the 64 byte block data.
for (int roundPos = 0; currentBlock < endCurrentBlock; currentBlock += 4)
{
// This line will swap endian on big endian and keep endian on little endian.
w[roundPos++] = (unsigned int) sarray[currentBlock + 3]
| (((unsigned int) sarray[currentBlock + 2]) << 8)
| (((unsigned int) sarray[currentBlock + 1]) << 16)
| (((unsigned int) sarray[currentBlock]) << 24);
}
innerHash(result, w);
}
// Handle the last and not full 64 byte block if existing.
endCurrentBlock = bytelength - currentBlock;
clearWBuffert(w);
int lastBlockBytes = 0;
for (;lastBlockBytes < endCurrentBlock; ++lastBlockBytes)
{
w[lastBlockBytes >> 2] |= (unsigned int) sarray[lastBlockBytes + currentBlock] << ((3 - (lastBlockBytes & 3)) << 3);
}
w[lastBlockBytes >> 2] |= 0x80 << ((3 - (lastBlockBytes & 3)) << 3);
if (endCurrentBlock >= 56)
{
innerHash(result, w);
clearWBuffert(w);
}
w[15] = bytelength << 3;
innerHash(result, w);
// Store hash in result pointer, and make sure we get in in the correct order on both endian models.
for (int hashByte = 20; --hashByte >= 0;)
{
hash[hashByte] = (result[hashByte >> 2] >> (((3 - hashByte) & 0x3) << 3)) & 0xff;
}
}
void toHexString(const unsigned char* hash, char* hexstring)
{
const char hexDigits[] = { "0123456789abcdef" };
for (int hashByte = 20; --hashByte >= 0;)
{
hexstring[hashByte << 1] = hexDigits[(hash[hashByte] >> 4) & 0xf];
hexstring[(hashByte << 1) + 1] = hexDigits[hash[hashByte] & 0xf];
}
hexstring[40] = 0;
}
} // namespace sha1
-49
View File
@@ -1,49 +0,0 @@
/*
Copyright (c) 2011, Micael Hildenborg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Micael Hildenborg nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY Micael Hildenborg ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL Micael Hildenborg BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef SHA1_DEFINED
#define SHA1_DEFINED
namespace sha1
{
/**
@param src points to any kind of data to be hashed.
@param bytelength the number of bytes to hash from the src pointer.
@param hash should point to a buffer of at least 20 bytes of size for storing the sha1 result in.
*/
void calc(const void* src, const int bytelength, unsigned char* hash);
/**
@param hash is 20 bytes of sha1 hash. This is the same data that is the result from the calc function.
@param hexstring should point to a buffer of at least 41 bytes of size for storing the hexadecimal representation of the hash. A zero will be written at position 40, so the buffer will be a valid zero ended string.
*/
void toHexString(const unsigned char* hash, char* hexstring);
} // namespace sha1
#endif // SHA1_DEFINED
+123
View File
@@ -34,6 +34,10 @@
#include <boost/system/error_code.hpp>
#include <boost/aligned_storage.hpp>
#include <boost/noncopyable.hpp>
#include <boost/array.hpp>
#include <string>
namespace websocketpp {
@@ -45,6 +49,110 @@ namespace transport {
*/
namespace asio {
//
// Class to manage the memory to be used for handler-based custom allocation.
// It contains a single block of memory which may be returned for allocation
// requests. If the memory is in use when an allocation request is made, the
// allocator delegates allocation to the global heap.
class handler_allocator
: private boost::noncopyable
{
public:
handler_allocator()
: in_use_(false)
{
}
void* allocate(std::size_t size)
{
if (!in_use_ && size < storage_.size)
{
in_use_ = true;
return storage_.address();
}
else
{
return ::operator new(size);
}
}
void deallocate(void* pointer)
{
if (pointer == storage_.address())
{
in_use_ = false;
}
else
{
::operator delete(pointer);
}
}
private:
// Storage space used for handler-based custom memory allocation.
boost::aligned_storage<1024> storage_;
// Whether the handler-based custom allocation storage has been used.
bool in_use_;
};
// Wrapper class template for handler objects to allow handler memory
// allocation to be customised. Calls to operator() are forwarded to the
// encapsulated handler.
template <typename Handler>
class custom_alloc_handler
{
public:
custom_alloc_handler(handler_allocator& a, Handler h)
: allocator_(a),
handler_(h)
{
}
template <typename Arg1>
void operator()(Arg1 arg1)
{
handler_(arg1);
}
template <typename Arg1, typename Arg2>
void operator()(Arg1 arg1, Arg2 arg2)
{
handler_(arg1, arg2);
}
friend void* asio_handler_allocate(std::size_t size,
custom_alloc_handler<Handler>* this_handler)
{
return this_handler->allocator_.allocate(size);
}
friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/,
custom_alloc_handler<Handler>* this_handler)
{
this_handler->allocator_.deallocate(pointer);
}
private:
handler_allocator& allocator_;
Handler handler_;
};
// Helper function to wrap a handler object to add custom allocation.
template <typename Handler>
inline custom_alloc_handler<Handler> make_custom_alloc_handler(
handler_allocator& a, Handler h)
{
return custom_alloc_handler<Handler>(a, h);
}
// Forward declaration of class endpoint so that it can be friended/referenced
// before being included.
template <typename config>
@@ -53,6 +161,21 @@ class endpoint;
typedef lib::function<void(boost::system::error_code const &)>
socket_shutdown_handler;
typedef lib::function<void (boost::system::error_code const & ec,
size_t bytes_transferred)> async_read_handler;
typedef lib::function<void (boost::system::error_code const & ec,
size_t bytes_transferred)> async_write_handler;
typedef lib::function<void (lib::error_code const & ec)> pre_init_handler;
// handle_timer: dynamic parameters, multiple copies
// handle_proxy_write
// handle_proxy_read
// handle_async_write
// handle_pre_init
/// Asio transport errors
namespace error {
enum value {
+195 -76
View File
@@ -109,15 +109,47 @@ public:
return socket_con_type::is_secure();
}
/// Sets the tcp init handler
/// Sets the tcp pre init handler
/**
* The tcp init handler is called after the tcp connection has been
* established.
* The tcp pre init handler is called after the raw tcp connection has been
* established but before any additional wrappers (proxy connects, TLS
* handshakes, etc) have been performed.
*
* @param h The handler to call on tcp init.
* @since 0.4.0-alpha1
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_pre_init_handler(tcp_init_handler h) {
m_tcp_pre_init_handler = h;
}
/// Sets the tcp pre init handler (deprecated)
/**
* The tcp pre init handler is called after the raw tcp connection has been
* established but before any additional wrappers (proxy connects, TLS
* handshakes, etc) have been performed.
*
* @deprecated Use set_tcp_pre_init_handler instead
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_init_handler(tcp_init_handler h) {
m_tcp_init_handler = h;
set_tcp_pre_init_handler(h);
}
/// Sets the tcp post init handler
/**
* The tcp post init handler is called after the tcp connection has been
* established and all additional wrappers (proxy connects, TLS handshakes,
* etc have been performed. This is fired before any bytes are read or any
* WebSocket specific handshake logic has been performed.
*
* @since 0.4.0-alpha1
*
* @param h The handler to call on tcp post init.
*/
void set_tcp_post_init_handler(tcp_init_handler h) {
m_tcp_post_init_handler = h;
}
/// Set the proxy to connect through (exception free)
@@ -264,15 +296,21 @@ public:
)
);
new_timer->async_wait(
m_strand->wrap(lib::bind(
&type::handle_timer,
get_shared(),
if (config::enable_multithreading) {
new_timer->async_wait(m_strand->wrap(lib::bind(
&type::handle_timer, get_shared(),
new_timer,
callback,
lib::placeholders::_1
))
);
)));
} else {
new_timer->async_wait(lib::bind(
&type::handle_timer, get_shared(),
new_timer,
callback,
lib::placeholders::_1
));
}
return new_timer;
}
@@ -288,8 +326,8 @@ public:
* @param callback The function to call back
* @param ec The status code
*/
void handle_timer(timer_ptr t, timer_handler callback, const
boost::system::error_code& ec)
void handle_timer(timer_ptr t, timer_handler callback,
boost::system::error_code const & ec)
{
if (ec) {
if (ec == boost::asio::error::operation_aborted) {
@@ -333,11 +371,12 @@ protected:
// TODO: pre-init timeout. Right now no implemented socket policies
// actually have an asyncronous pre-init
m_init_handler = callback;
socket_con_type::pre_init(
lib::bind(
&type::handle_pre_init,
get_shared(),
callback,
lib::placeholders::_1
)
);
@@ -379,34 +418,57 @@ protected:
// do we need to store or use the io_service at this level?
m_io_service = io_service;
m_strand.reset(new boost::asio::strand(*io_service));
if (config::enable_multithreading) {
m_strand.reset(new boost::asio::strand(*io_service));
m_async_read_handler = m_strand->wrap(lib::bind(
&type::handle_async_read, get_shared(),
lib::placeholders::_1, lib::placeholders::_2
));
m_async_write_handler = m_strand->wrap(lib::bind(
&type::handle_async_write, get_shared(),
lib::placeholders::_1, lib::placeholders::_2
));
} else {
// TODO: goal: not have this line here
//m_strand.reset(new boost::asio::strand(*io_service));
m_async_read_handler = lib::bind(
&type::handle_async_read, get_shared(),
lib::placeholders::_1, lib::placeholders::_2
);
m_async_write_handler = lib::bind(&type::handle_async_write,
get_shared(), lib::placeholders::_1, lib::placeholders::_2);
}
return socket_con_type::init_asio(io_service, m_strand, m_is_server);
}
void handle_pre_init(init_handler callback, const lib::error_code& ec) {
void handle_pre_init(lib::error_code const & ec) {
if (m_alog.static_test(log::alevel::devel)) {
m_alog.write(log::alevel::devel,"asio connection handle pre_init");
}
if (m_tcp_init_handler) {
m_tcp_init_handler(m_connection_hdl);
if (m_tcp_pre_init_handler) {
m_tcp_pre_init_handler(m_connection_hdl);
}
if (ec) {
callback(ec);
m_init_handler(ec);
}
// If we have a proxy set issue a proxy connect, otherwise skip to
// post_init
if (!m_proxy.empty()) {
proxy_write(callback);
proxy_write();
} else {
post_init(callback);
post_init();
}
}
void post_init(init_handler callback) {
void post_init() {
if (m_alog.static_test(log::alevel::devel)) {
m_alog.write(log::alevel::devel,"asio connection post_init");
}
@@ -418,7 +480,7 @@ protected:
&type::handle_post_init_timeout,
get_shared(),
post_timer,
callback,
m_init_handler,
lib::placeholders::_1
)
);
@@ -428,7 +490,7 @@ protected:
&type::handle_post_init,
get_shared(),
post_timer,
callback,
m_init_handler,
lib::placeholders::_1
)
);
@@ -477,10 +539,14 @@ protected:
m_alog.write(log::alevel::devel,"asio connection handle_post_init");
}
if (m_tcp_post_init_handler) {
m_tcp_post_init_handler(m_connection_hdl);
}
callback(ec);
}
void proxy_write(init_handler callback) {
void proxy_write() {
if (m_alog.static_test(log::alevel::devel)) {
m_alog.write(log::alevel::devel,"asio connection proxy_write");
}
@@ -488,7 +554,7 @@ protected:
if (!m_proxy_data) {
m_elog.write(log::elevel::library,
"assertion failed: !m_proxy_data in asio::connection::proxy_write");
callback(make_error_code(error::general));
m_init_handler(make_error_code(error::general));
return;
}
@@ -505,25 +571,37 @@ protected:
lib::bind(
&type::handle_proxy_timeout,
get_shared(),
callback,
m_init_handler,
lib::placeholders::_1
)
);
// Send proxy request
boost::asio::async_write(
socket_con_type::get_next_layer(),
m_bufs,
m_strand->wrap(lib::bind(
&type::handle_proxy_write,
get_shared(),
callback,
lib::placeholders::_1
))
);
if (config::enable_multithreading) {
boost::asio::async_write(
socket_con_type::get_next_layer(),
m_bufs,
m_strand->wrap(lib::bind(
&type::handle_proxy_write, get_shared(),
m_init_handler,
lib::placeholders::_1
))
);
} else {
boost::asio::async_write(
socket_con_type::get_next_layer(),
m_bufs,
lib::bind(
&type::handle_proxy_write, get_shared(),
m_init_handler,
lib::placeholders::_1
)
);
}
}
void handle_proxy_timeout(init_handler callback, const lib::error_code & ec) {
void handle_proxy_timeout(init_handler callback, lib::error_code const & ec)
{
if (ec == transport::error::operation_aborted) {
m_alog.write(log::alevel::devel,
"asio handle_proxy_write timer cancelled");
@@ -539,8 +617,8 @@ protected:
}
}
void handle_proxy_write(init_handler callback, const
boost::system::error_code& ec)
void handle_proxy_write(init_handler callback,
boost::system::error_code const & ec)
{
if (m_alog.static_test(log::alevel::devel)) {
m_alog.write(log::alevel::devel,"asio connection handle_proxy_write");
@@ -581,22 +659,33 @@ protected:
return;
}
boost::asio::async_read_until(
socket_con_type::get_next_layer(),
m_proxy_data->read_buf,
"\r\n\r\n",
m_strand->wrap(lib::bind(
&type::handle_proxy_read,
get_shared(),
callback,
lib::placeholders::_1,
lib::placeholders::_2
))
);
if (config::enable_multithreading) {
boost::asio::async_read_until(
socket_con_type::get_next_layer(),
m_proxy_data->read_buf,
"\r\n\r\n",
m_strand->wrap(lib::bind(
&type::handle_proxy_read, get_shared(),
callback,
lib::placeholders::_1, lib::placeholders::_2
))
);
} else {
boost::asio::async_read_until(
socket_con_type::get_next_layer(),
m_proxy_data->read_buf,
"\r\n\r\n",
lib::bind(
&type::handle_proxy_read, get_shared(),
callback,
lib::placeholders::_1, lib::placeholders::_2
)
);
}
}
void handle_proxy_read(init_handler callback, const
boost::system::error_code& ec, size_t bytes_transferred)
void handle_proxy_read(init_handler callback,
boost::system::error_code const & ec, size_t bytes_transferred)
{
if (m_alog.static_test(log::alevel::devel)) {
m_alog.write(log::alevel::devel,"asio connection handle_proxy_read");
@@ -667,7 +756,7 @@ protected:
m_proxy_data.reset();
// Continue with post proxy initialization
post_init(callback);
post_init();
}
}
@@ -685,46 +774,51 @@ protected:
m_alog.write(log::alevel::devel,s.str());
}
if (num_bytes > len) {
// TODO: safety vs speed ?
// maybe move into an if devel block
/*if (num_bytes > len) {
m_elog.write(log::elevel::devel,
"asio async_read_at_least error::invalid_num_bytes");
handler(make_error_code(transport::error::invalid_num_bytes),
size_t(0));
return;
}
}*/
m_read_handler = handler;
boost::asio::async_read(
socket_con_type::get_socket(),
boost::asio::buffer(buf,len),
boost::asio::transfer_at_least(num_bytes),
m_strand->wrap(lib::bind(
/*m_strand->wrap(lib::bind(
&type::handle_async_read,
get_shared(),
handler,
lib::placeholders::_1,
lib::placeholders::_2
))
))*/
make_custom_alloc_handler(m_read_handler_allocator,m_async_read_handler)
);
}
void handle_async_read(read_handler handler, const
boost::system::error_code& ec, size_t bytes_transferred)
void handle_async_read(const boost::system::error_code& ec,
size_t bytes_transferred)
{
if (!ec) {
handler(lib::error_code(), bytes_transferred);
m_read_handler(lib::error_code(), bytes_transferred);
return;
}
// translate boost error codes into more lib::error_codes
if (ec == boost::asio::error::eof) {
handler(make_error_code(transport::error::eof),
m_read_handler(make_error_code(transport::error::eof),
bytes_transferred);
} else if (ec.value() == 335544539) {
handler(make_error_code(transport::error::tls_short_read),
m_read_handler(make_error_code(transport::error::tls_short_read),
bytes_transferred);
} else {
log_err(log::elevel::info,"asio async_read_at_least",ec);
handler(make_error_code(transport::error::pass_through),
m_read_handler(make_error_code(transport::error::pass_through),
bytes_transferred);
}
}
@@ -732,15 +826,18 @@ protected:
void async_write(const char* buf, size_t len, write_handler handler) {
m_bufs.push_back(boost::asio::buffer(buf,len));
m_write_handler = handler;
boost::asio::async_write(
socket_con_type::get_socket(),
m_bufs,
m_strand->wrap(lib::bind(
/*m_strand->wrap(lib::bind(
&type::handle_async_write,
get_shared(),
handler,
lib::placeholders::_1
))
))*/
make_custom_alloc_handler(m_write_handler_allocator,m_async_write_handler)
);
}
@@ -751,27 +848,30 @@ protected:
m_bufs.push_back(boost::asio::buffer((*it).buf,(*it).len));
}
m_write_handler = handler;
boost::asio::async_write(
socket_con_type::get_socket(),
m_bufs,
m_strand->wrap(lib::bind(
/*m_strand->wrap(lib::bind(
&type::handle_async_write,
get_shared(),
handler,
lib::placeholders::_1
))
))*/
make_custom_alloc_handler(m_write_handler_allocator,m_async_write_handler)
);
}
void handle_async_write(write_handler handler, const
boost::system::error_code& ec)
void handle_async_write(boost::system::error_code const & ec,
size_t bytes_transferred)
{
m_bufs.clear();
if (ec) {
log_err(log::elevel::info,"asio async_write",ec);
handler(make_error_code(transport::error::pass_through));
m_write_handler(make_error_code(transport::error::pass_through));
} else {
handler(lib::error_code());
m_write_handler(lib::error_code());
}
}
@@ -792,12 +892,20 @@ protected:
* This needs to be thread safe
*/
lib::error_code interrupt(interrupt_handler handler) {
m_io_service->post(m_strand->wrap(handler));
if (config::enable_multithreading) {
m_io_service->post(m_strand->wrap(handler));
} else {
m_io_service->post(handler);
}
return lib::error_code();
}
lib::error_code dispatch(dispatch_handler handler) {
m_io_service->post(m_strand->wrap(handler));
if (config::enable_multithreading) {
m_io_service->post(m_strand->wrap(handler));
} else {
m_io_service->post(handler);
}
return lib::error_code();
}
@@ -926,7 +1034,18 @@ private:
std::vector<boost::asio::const_buffer> m_bufs;
// Handlers
tcp_init_handler m_tcp_init_handler;
tcp_init_handler m_tcp_pre_init_handler;
tcp_init_handler m_tcp_post_init_handler;
handler_allocator m_read_handler_allocator;
handler_allocator m_write_handler_allocator;
read_handler m_read_handler;
write_handler m_write_handler;
init_handler m_init_handler;
async_read_handler m_async_read_handler;
async_write_handler m_async_write_handler;
};
+123 -41
View File
@@ -231,18 +231,56 @@ public:
return *m_io_service;
}
/// Sets the tcp init handler
/// Sets the default tcp pre init handler
/**
* The tcp init handler is called after the tcp connection has been
* established.
* The tcp pre init handler is called after the raw tcp connection has been
* established but before any additional wrappers (proxy connects, TLS
* handshakes, etc) have been performed.
*
* @see WebSocket++ handler documentation for more information about
* handlers.
*
* @param h The handler to call on tcp init.
* @since 0.4.0-alpha1
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_pre_init_handler(tcp_init_handler h) {
m_tcp_pre_init_handler = h;
}
/// Sets the default tcp pre init handler (deprecated)
/**
* The tcp pre init handler is called after the raw tcp connection has been
* established but before any additional wrappers (proxy connects, TLS
* handshakes, etc) have been performed.
*
* @see WebSocket++ handler documentation for more information about
* handlers.
*
* @deprecated Use set_tcp_pre_init_handler instead
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_init_handler(tcp_init_handler h) {
m_tcp_init_handler = h;
set_tcp_pre_init_handler(h);
}
/// Sets the default tcp pre init handler
/**
* The tcp post init handler is called after the tcp connection has been
* established and all additional wrappers (proxy connects, TLS handshakes,
* etc have been performed. This is fired before any bytes are read or any
* WebSocket specific handshake logic has been performed.
*
* @see WebSocket++ handler documentation for more information about
* handlers.
*
* @since 0.4.0-alpha1
*
* @param h The handler to call on tcp pre init.
*/
void set_tcp_post_init_handler(tcp_init_handler h) {
m_tcp_post_init_handler = h;
}
/// Set up endpoint for listening manually (exception free)
@@ -268,7 +306,7 @@ public:
m_acceptor->open(ep.protocol());
m_acceptor->set_option(boost::asio::socket_base::reuse_address(true));
m_acceptor->bind(ep);
m_acceptor->listen();
m_acceptor->listen(8192); // this should be a settable parameter
m_state = LISTENING;
ec = lib::error_code();
}
@@ -572,15 +610,27 @@ public:
m_alog->write(log::alevel::devel, "asio::async_accept");
m_acceptor->async_accept(
tcon->get_raw_socket(),
tcon->get_strand()->wrap(lib::bind(
&type::handle_accept,
this,
callback,
lib::placeholders::_1
))
);
if (config::enable_multithreading) {
m_acceptor->async_accept(
tcon->get_raw_socket(),
tcon->get_strand()->wrap(lib::bind(
&type::handle_accept,
this,
callback,
lib::placeholders::_1
))
);
} else {
m_acceptor->async_accept(
tcon->get_raw_socket(),
lib::bind(
&type::handle_accept,
this,
callback,
lib::placeholders::_1
)
);
}
}
/// Accept the next connection attempt and assign it to con.
@@ -683,18 +733,33 @@ protected:
)
);
m_resolver->async_resolve(
query,
tcon->get_strand()->wrap(lib::bind(
&type::handle_resolve,
this,
tcon,
dns_timer,
cb,
lib::placeholders::_1,
lib::placeholders::_2
))
);
if (config::enable_multithreading) {
m_resolver->async_resolve(
query,
tcon->get_strand()->wrap(lib::bind(
&type::handle_resolve,
this,
tcon,
dns_timer,
cb,
lib::placeholders::_1,
lib::placeholders::_2
))
);
} else {
m_resolver->async_resolve(
query,
lib::bind(
&type::handle_resolve,
this,
tcon,
dns_timer,
cb,
lib::placeholders::_1,
lib::placeholders::_2
)
);
}
}
void handle_resolve_timeout(timer_ptr dns_timer, connect_handler callback,
@@ -767,18 +832,33 @@ protected:
)
);
boost::asio::async_connect(
tcon->get_raw_socket(),
iterator,
tcon->get_strand()->wrap(lib::bind(
&type::handle_connect,
this,
tcon,
con_timer,
callback,
lib::placeholders::_1
))
);
if (config::enable_multithreading) {
boost::asio::async_connect(
tcon->get_raw_socket(),
iterator,
tcon->get_strand()->wrap(lib::bind(
&type::handle_connect,
this,
tcon,
con_timer,
callback,
lib::placeholders::_1
))
);
} else {
boost::asio::async_connect(
tcon->get_raw_socket(),
iterator,
lib::bind(
&type::handle_connect,
this,
tcon,
con_timer,
callback,
lib::placeholders::_1
)
);
}
}
void handle_connect_timeout(transport_con_ptr tcon, timer_ptr con_timer,
@@ -857,7 +937,8 @@ protected:
ec = tcon->init_asio(m_io_service);
if (ec) {return ec;}
tcon->set_tcp_init_handler(m_tcp_init_handler);
tcon->set_tcp_pre_init_handler(m_tcp_pre_init_handler);
tcon->set_tcp_post_init_handler(m_tcp_post_init_handler);
return lib::error_code();
}
@@ -877,7 +958,8 @@ private:
};
// Handlers
tcp_init_handler m_tcp_init_handler;
tcp_init_handler m_tcp_pre_init_handler;
tcp_init_handler m_tcp_post_init_handler;
// Network Resources
io_service_ptr m_io_service;
+19 -9
View File
@@ -228,15 +228,25 @@ protected:
m_ec = socket::make_error_code(socket::error::tls_handshake_timeout);
// TLS handshake
m_socket->async_handshake(
get_handshake_type(),
m_strand->wrap(lib::bind(
&type::handle_init,
get_shared(),
callback,
lib::placeholders::_1
))
);
if (m_strand) {
m_socket->async_handshake(
get_handshake_type(),
m_strand->wrap(lib::bind(
&type::handle_init, get_shared(),
callback,
lib::placeholders::_1
))
);
} else {
m_socket->async_handshake(
get_handshake_type(),
lib::bind(
&type::handle_init, get_shared(),
callback,
lib::placeholders::_1
)
);
}
}
/// Sets the connection handle
+16 -1
View File
@@ -282,7 +282,22 @@ public:
return s.str();
}
// get query?
/// Return the query portion
/**
* Returns the query portion (after the ?) of the URI or an empty string if
* there is none.
*
* @return query portion of the URI.
*/
std::string get_query() const {
std::size_t found = m_resource.find('?');
if (found != std::string::npos) {
return m_resource.substr(found + 1);
} else {
return "";
}
}
// get fragment
// hi <3