From 77f1bf593d9796e7fec26bb4b430623cc7f8f341 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Mon, 8 Dec 2014 18:56:44 -0500 Subject: [PATCH] Adds telemetry_server example --- SConstruct | 3 + examples/telemetry_server/CMakeLists.txt | 10 + examples/telemetry_server/SConscript | 23 ++ examples/telemetry_server/index.html | 85 +++++++ .../telemetry_server/telemetry_server.cpp | 211 ++++++++++++++++++ 5 files changed, 332 insertions(+) create mode 100644 examples/telemetry_server/CMakeLists.txt create mode 100644 examples/telemetry_server/SConscript create mode 100644 examples/telemetry_server/index.html create mode 100644 examples/telemetry_server/telemetry_server.cpp diff --git a/SConstruct b/SConstruct index 2f35a05..3da1733 100644 --- a/SConstruct +++ b/SConstruct @@ -239,6 +239,9 @@ debug_server = SConscript('#/examples/debug_server/SConscript',variant_dir = bui # subprotocol_server subprotocol_server = SConscript('#/examples/subprotocol_server/SConscript',variant_dir = builddir + 'subprotocol_server',duplicate = 0) +# telemetry_server +telemetry_server = SConscript('#/examples/telemetry_server/SConscript',variant_dir = builddir + 'telemetry_server',duplicate = 0) + if not env['PLATFORM'].startswith('win'): # iostream_server iostream_server = SConscript('#/examples/iostream_server/SConscript',variant_dir = builddir + 'iostream_server',duplicate = 0) diff --git a/examples/telemetry_server/CMakeLists.txt b/examples/telemetry_server/CMakeLists.txt new file mode 100644 index 0000000..7ee569b --- /dev/null +++ b/examples/telemetry_server/CMakeLists.txt @@ -0,0 +1,10 @@ + +file (GLOB SOURCE_FILES *.cpp) +file (GLOB HEADER_FILES *.hpp) + +init_target (telemetry_server) + +build_executable (${TARGET_NAME} ${SOURCE_FILES} ${HEADER_FILES}) + +link_boost () +final_target () diff --git a/examples/telemetry_server/SConscript b/examples/telemetry_server/SConscript new file mode 100644 index 0000000..1b8ff22 --- /dev/null +++ b/examples/telemetry_server/SConscript @@ -0,0 +1,23 @@ +## Main development example +## + +Import('env') +Import('env_cpp11') +Import('boostlibs') +Import('platform_libs') +Import('polyfill_libs') + +env = env.Clone () +env_cpp11 = env_cpp11.Clone () + +prgs = [] + +# if a C++11 environment is available build using that, otherwise use boost +if env_cpp11.has_key('WSPP_CPP11_ENABLED'): + ALL_LIBS = boostlibs(['system'],env_cpp11) + [platform_libs] + [polyfill_libs] + prgs += env_cpp11.Program('telemetry_server', ["telemetry_server.cpp"], LIBS = ALL_LIBS) +else: + ALL_LIBS = boostlibs(['system'],env) + [platform_libs] + [polyfill_libs] + prgs += env.Program('telemetry_server', ["telemetry_server.cpp"], LIBS = ALL_LIBS) + +Return('prgs') diff --git a/examples/telemetry_server/index.html b/examples/telemetry_server/index.html new file mode 100644 index 0000000..def50dd --- /dev/null +++ b/examples/telemetry_server/index.html @@ -0,0 +1,85 @@ + + + +WebSocket++ Telemetry Client + + + + + + + +
+
+
+ +
+
+
+ + + diff --git a/examples/telemetry_server/telemetry_server.cpp b/examples/telemetry_server/telemetry_server.cpp new file mode 100644 index 0000000..2582a2b --- /dev/null +++ b/examples/telemetry_server/telemetry_server.cpp @@ -0,0 +1,211 @@ +#include + +#include + +#include +#include +#include +#include +#include + +/** + * The telemetry server accepts connections and sends a message every second to + * each client containing an integer count. This example can be used as the + * basis for programs that expose a stream of telemetry data for logging, + * dashboards, etc. + * + * This example uses the timer based concurrency method and is self contained + * and singled threaded. Refer to telemetry client for an example of a similar + * telemetry setup using threads rather than timers. + * + * This example also includes an example simple HTTP server that serves a web + * dashboard displaying the count. This simple design is suitable for use + * delivering a small number of files to a small number of clients. It is ideal + * for cases like embedded dashboards that don't want the complexity of an extra + * HTTP server to serve static files. + * + * This design *will* fall over under high traffic or DoS conditions. In such + * cases you are much better off proxying to a real HTTP server for the http + * requests. + */ +class telemetry_server { +public: + typedef websocketpp::connection_hdl connection_hdl; + typedef websocketpp::server server; + typedef websocketpp::lib::lock_guard scoped_lock; + + telemetry_server() : m_count(0) { + // set up access channels to only log interesting things + m_endpoint.clear_access_channels(websocketpp::log::alevel::all); + m_endpoint.set_access_channels(websocketpp::log::alevel::connect); + m_endpoint.set_access_channels(websocketpp::log::alevel::disconnect); + m_endpoint.set_access_channels(websocketpp::log::alevel::app); + + // Initialize the Asio transport policy + m_endpoint.init_asio(); + + // Bind the handlers we are using + using websocketpp::lib::placeholders::_1; + using websocketpp::lib::bind; + m_endpoint.set_open_handler(bind(&telemetry_server::on_open,this,::_1)); + m_endpoint.set_close_handler(bind(&telemetry_server::on_close,this,::_1)); + m_endpoint.set_fail_handler(bind(&telemetry_server::on_fail,this,::_1)); + m_endpoint.set_http_handler(bind(&telemetry_server::on_http,this,::_1)); + } + + void run(std::string docroot, uint16_t port) { + std::stringstream ss; + ss << "Running telemetry server on port "<< port <<" using docroot=" << docroot; + m_endpoint.get_alog().write(websocketpp::log::alevel::app,ss.str()); + + m_docroot = docroot; + + // listen on specified port + m_endpoint.listen(port); + + // Start the server accept loop + m_endpoint.start_accept(); + + // Set the initial timer to start telemetry + set_timer(); + + // Start the ASIO io_service run loop + try { + m_endpoint.run(); + } catch (websocketpp::exception const & e) { + std::cout << e.what() << std::endl; + } + } + + void set_timer() { + m_timer = m_endpoint.set_timer( + 1000, + websocketpp::lib::bind( + &telemetry_server::on_timer, + this, + websocketpp::lib::placeholders::_1 + ) + ); + } + + void on_timer(websocketpp::lib::error_code const & ec) { + if (ec) { + // there was an error, stop telemetry + m_endpoint.get_alog().write(websocketpp::log::alevel::app, + "Timer Error: "+ec.message()); + return; + } + + std::stringstream val; + val << "count is " << m_count++; + + // Broadcast count to all connections + con_list::iterator it; + for (it = m_connections.begin(); it != m_connections.end(); ++it) { + m_endpoint.send(*it,val.str(),websocketpp::frame::opcode::text); + } + + // set timer for next telemetry check + set_timer(); + } + + void on_http(connection_hdl hdl) { + // Upgrade our connection handle to a full connection_ptr + server::connection_ptr con = m_endpoint.get_con_from_hdl(hdl); + + std::ifstream file; + std::string filename = con->get_uri()->get_resource(); + std::string response; + + m_endpoint.get_alog().write(websocketpp::log::alevel::app, + "http request1: "+filename); + + if (filename == "/") { + filename = m_docroot+"index.html"; + } else { + filename = m_docroot+filename.substr(1); + } + + m_endpoint.get_alog().write(websocketpp::log::alevel::app, + "http request2: "+filename); + + file.open(filename.c_str(), std::ios::in); + if (!file) { + // 404 error + std::stringstream ss; + + ss << "" + << "Error 404 (Resource not found)" + << "

Error 404

" + << "

The requested URL " << filename << " was not found on this server.

" + << ""; + + con->set_body(ss.str()); + con->set_status(websocketpp::http::status_code::not_found); + return; + } + + file.seekg(0, std::ios::end); + response.reserve(file.tellg()); + file.seekg(0, std::ios::beg); + + response.assign((std::istreambuf_iterator(file)), + std::istreambuf_iterator()); + + con->set_body(response); + con->set_status(websocketpp::http::status_code::ok); + } + + // The open handler will signal that we are ready to start sending telemetry + void on_open(connection_hdl hdl) { + m_connections.insert(hdl); + } + + // The close handler will signal that we should stop sending telemetry + void on_close(connection_hdl hdl) { + m_connections.erase(hdl); + } + + // The fail handler will signal that we should stop sending telemetry + void on_fail(connection_hdl) { + } +private: + typedef std::set> con_list; + + server m_endpoint; + con_list m_connections; + server::timer_ptr m_timer; + + std::string m_docroot; + + // Telemetry data + uint64_t m_count; +}; + +int main(int argc, char* argv[]) { + telemetry_server s; + + std::string docroot; + uint16_t port = 9002; + + if (argc == 1) { + std::cout << "Usage: telemetry_server [documentroot] [port]" << std::endl; + } + + if (argc >= 2) { + docroot = std::string(argv[1]); + } + + if (argc >= 3) { + int i = atoi(argv[2]); + if (i <= 0 || i > 65535) { + std::cout << "invalid port" << std::endl; + return 1; + } + + port = uint16_t(i); + } + + s.run(docroot, port); + return 0; +} \ No newline at end of file