diff --git a/example/websocket/client/CMakeLists.txt b/example/websocket/client/CMakeLists.txt index e399c3daa2..66b61a101f 100644 --- a/example/websocket/client/CMakeLists.txt +++ b/example/websocket/client/CMakeLists.txt @@ -17,5 +17,6 @@ if (OPENSSL_FOUND) add_subdirectory(async-ssl) add_subdirectory(async-ssl-system-executor) add_subdirectory(coro-ssl) + add_subdirectory(crypto-ai-ssl) add_subdirectory(sync-ssl) endif () diff --git a/example/websocket/client/Jamfile b/example/websocket/client/Jamfile index bd4402181d..0f1cec2d23 100644 --- a/example/websocket/client/Jamfile +++ b/example/websocket/client/Jamfile @@ -17,4 +17,5 @@ build-project sync ; build-project async-ssl ; build-project async-ssl-system-executor ; build-project coro-ssl ; +build-project crypto-ai-ssl ; build-project sync-ssl ; diff --git a/example/websocket/client/crypto-ai-ssl/CMakeLists.txt b/example/websocket/client/crypto-ai-ssl/CMakeLists.txt new file mode 100644 index 0000000000..e415e5bfd1 --- /dev/null +++ b/example/websocket/client/crypto-ai-ssl/CMakeLists.txt @@ -0,0 +1,31 @@ +# +# Copyright (c) 2025 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/boostorg/beast +# + +add_executable(websocket-client-crypto-ai-ssl + Jamfile + ai_querier.cpp + historic_price_fetcher.cpp + live_price_listener.cpp + websocket_client_crypto_ai_ssl.cpp) + +source_group("" FILES + Jamfile + ai_querier.cpp + historic_price_fetcher.cpp + live_price_listener.cpp + websocket_client_crypto_ai_ssl.cpp) + +target_include_directories(websocket-client-crypto-ai-ssl + PRIVATE ${PROJECT_SOURCE_DIR}) + +target_link_libraries(websocket-client-crypto-ai-ssl + PRIVATE Boost::beast Boost::json Boost::url OpenSSL::SSL OpenSSL::Crypto) + +set_target_properties(websocket-client-crypto-ai-ssl + PROPERTIES FOLDER "example-websocket-client") diff --git a/example/websocket/client/crypto-ai-ssl/Jamfile b/example/websocket/client/crypto-ai-ssl/Jamfile new file mode 100644 index 0000000000..6b8ae75055 --- /dev/null +++ b/example/websocket/client/crypto-ai-ssl/Jamfile @@ -0,0 +1,25 @@ +# +# Copyright (c) 2015 Mungo Gill +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/boostorg/beast +# + +import ac ; + +project + : requirements + [ ac.check-library /boost/beast/test//lib-asio-ssl : /boost/beast/test//lib-asio-ssl/static : no ] + ; + +exe websocket-client-crypto-ai-ssl : + ai_querier.cpp + historic_price_fetcher.cpp + live_price_listener.cpp + websocket_client_crypto_ai_ssl.cpp + : + coverage:no + ubasan:no + ; diff --git a/example/websocket/client/crypto-ai-ssl/historic_price_fetcher.cpp b/example/websocket/client/crypto-ai-ssl/historic_price_fetcher.cpp new file mode 100644 index 0000000000..7fa69752ad --- /dev/null +++ b/example/websocket/client/crypto-ai-ssl/historic_price_fetcher.cpp @@ -0,0 +1,34 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#include "historic_price_fetcher.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + diff --git a/example/websocket/client/crypto-ai-ssl/historic_price_fetcher.hpp b/example/websocket/client/crypto-ai-ssl/historic_price_fetcher.hpp new file mode 100644 index 0000000000..6114465820 --- /dev/null +++ b/example/websocket/client/crypto-ai-ssl/historic_price_fetcher.hpp @@ -0,0 +1,433 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_EXAMPLE_HISTORIC_PRICE_FETCHER +#define BOOST_BEAST_EXAMPLE_HISTORIC_PRICE_FETCHER + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "processor_base.hpp" + +namespace boost { + +template +class historic_fetcher; + +template +class historic_fetcher_op +{ + struct on_resolve {}; + struct on_connect {}; + struct on_ssl_handshake {}; + struct on_write_request {}; + struct on_read_result {}; + + using resolver_type = asio::ip::basic_resolver< + asio::ip::tcp, Executor>; + + using tcp_stream_type = beast::basic_stream< + asio::ip::tcp, + Executor>; + + using ssl_stream_type = boost::asio::ssl::stream; + + using client_type = historic_fetcher; + + using buffer_type = boost::beast::flat_buffer; + using request_type = beast::http::request; + using response_type = beast::http::response; + +public: + + explicit + historic_fetcher_op( + client_type& client) + : client_(client) + , start_of_day_(0) + { + } + + template + void operator()( + Self& self) + { + // Look up the domain name + client_.resolver_.async_resolve( + client_.get_host(), + "https", + asio::cancel_after( + std::chrono::seconds(30), + asio::prepend(std::move(self), on_resolve{})) + ); + }; + + + template + void operator()( + Self& self + , on_resolve + , system::error_code ec + , asio::ip::tcp::resolver::results_type results) + { + if (ec) { + return do_complete(self, ec); + } + + // Make the connection on the IP address we get from a lookup + beast::get_lowest_layer(client_.ssl_stream_).async_connect( + results, + asio::cancel_after( + std::chrono::seconds(30), + asio::prepend(std::move(self), on_connect{}))); + + }; + + + + template + void operator()( + Self& self + , on_connect + , system::error_code ec + , asio::ip::tcp::resolver::results_type::endpoint_type ep) + { + boost::ignore_unused(ep); + + if (ec) { + return do_complete(self, ec); + } + + // Set the expected hostname in the peer certificate for verification + client_.ssl_stream_.set_verify_callback(boost::asio::ssl::host_name_verification(client_.get_host())); + + // Set SNI Hostname (many hosts need this to handshake successfully) + if (!SSL_set_tlsext_host_name(client_.ssl_stream_.native_handle(), client_.get_host())) + { + system::error_code ssl_ec{ + static_cast(::ERR_get_error()), + asio::error::get_ssl_category() }; + return do_complete(self, ssl_ec); + } + + // Perform the SSL handshake + client_.ssl_stream_.async_handshake( + boost::asio::ssl::stream_base::client, + asio::cancel_after( + std::chrono::seconds(30), + asio::prepend(std::move(self), on_ssl_handshake{})) + ); + }; + + + template + void operator()( + Self& self + , on_ssl_handshake + , system::error_code ec) + { + if (ec) { + return do_complete(self, ec); + } + + // Find the subscription start time + posix_time::ptime ptime = posix_time::second_clock::universal_time(); + posix_time::ptime sod = posix_time::ptime(ptime.date()); + start_of_day_ = posix_time::to_time_t(sod); + + // Set up an HTTP GET request message + client_.request_.version(11); + client_.request_.method(beast::http::verb::get); + client_.request_.set(beast::http::field::host, client_.get_host()); + client_.request_.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING); + + // If there are any coins left to request, request one. + return send_next_request(self); + }; + + template + void operator()( + Self& self + , on_write_request + , system::error_code ec + , std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec) { + return do_complete(self, ec); + } + + // Read a message into our buffer + beast::http::async_read( + client_.ssl_stream_, + client_.buffer_, + client_.response_, + asio::cancel_after( + std::chrono::seconds(30), + asio::prepend(std::move(self), on_read_result{})) + ); + }; + + template + void operator()( + Self& self + , on_read_result + , system::error_code ec + , std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec == beast::http::error::end_of_stream) { + system::error_code ok{}; + return do_complete(self, ok); + } + else if (ec) { + return do_complete(self, ec); + } + + // Process the received message + + // Write the message to standard out + //std::cout << "Response: " << response_ << "\n" << std::endl; + std::cout << "Body: " << client_.response_.body() << "\n\n" << std::endl; + + // The response can be quite long, and we can avoid an allocation + // by swapping the body into an empty string. + std::string temp; + temp.swap(client_.response_.body()); + + client_.process_response(temp, start_of_day_, ec); + if (ec) { + return do_complete(self, ec); + } + + send_next_request(self); + + }; + + template + void do_complete(Self& self, system::error_code ec) + { + self.complete(ec); + client_.running_.clear(); + return; + } + + template + void send_next_request(Self& self) + { + if (!client_.requests_outstanding()) { + system::error_code ok{}; + return do_complete(self, ok); + } + + // Set up an HTTP GET request message + client_.request_.target(client_.next_request()); + + // Send the message + beast::http::async_write( + client_.ssl_stream_, + client_.request_, + asio::cancel_after( + std::chrono::seconds(30), + asio::prepend(std::move(self), on_write_request{})) + ); + } + +private: + client_type& client_; + std::time_t start_of_day_; +}; + +template +class historic_fetcher +{ + using resolver_type = asio::ip::basic_resolver< + asio::ip::tcp, Executor>; + + using tcp_stream_type = beast::basic_stream< + asio::ip::tcp, + Executor>; + + using ssl_stream_type = boost::asio::ssl::stream; + + using buffer_type = boost::beast::flat_buffer; + using request_type = beast::http::request; + using response_type = beast::http::response; + + friend class historic_fetcher_op; + +public: + + explicit + historic_fetcher( + Executor& exec + , boost::asio::ssl::context& ctx + , const boost::string_view host + , std::function receive_handler) + : receive_handler_(receive_handler) + , resolver_(exec) + , ssl_stream_(exec, ctx) + , host_(host) + { + running_.clear(); + } + + template< + BOOST_ASIO_COMPLETION_TOKEN_FOR(void( + system::error_code)) CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void( + system::error_code)) + async_historic_fetch( + const std::vector& coins, + CompletionToken&& token) + { + coins_ = coins; + + bool already_running = running_.test_and_set(); + BOOST_ASSERT(!already_running); + + return asio::async_compose< + CompletionToken, + void(system::error_code)>( + historic_fetcher_op( + *this) + , token + , ssl_stream_); + } + +private: + const char* get_host() { + return host_.c_str(); + } + + bool requests_outstanding() + { + return coins_.size() != 0; + } + + std::string next_request() + { + current_coin_ = coins_.back(); + coins_.pop_back(); + + urls::url url = + urls::format( + "{}://api.coinbase.com/api/v3/brokerage/market/products/{}/candles", + "https", + current_coin_); + + // Data + url.params().append({ "granularity", "ONE_MINUTE" }); + //url.params().append({ "start", std::to_string(start_of_day_) }); + url.params().append({ "limit", std::to_string(5) }); + + return url.buffer(); + } + + void process_response( + boost::core::string_view str, std::time_t start_of_day, boost::system::error_code& ec) + { + parser_.reset(); + + parser_.write(str, ec); + + if (ec) return; + + json::value jv(parser_.release()); + + // Design note: the json parsing could be done without using the exception + // interface, but the resulting code would be considerably more verbose. + try { + auto candle_list = jv.as_object().at("candles").as_array(); + + if (candle_list.size() == 0) { + ec = json::make_error_code(boost::json::error::size_mismatch); + return; + } + + // The coinbase API provides the most recent values first. + for (auto it = candle_list.crbegin(); it != candle_list.crend(); it++) { + core::string_view startstr = it->as_object().at("start").as_string(); + core::string_view openstr = it->as_object().at("open").as_string(); + core::string_view closestr = it->as_object().at("close").as_string(); + + std::size_t str_size = 0; + std::time_t start_time = static_cast(std::stoll(startstr, &str_size)); + if (start_time > start_of_day) { + double open = std::stod(openstr, &str_size); + + if (receive_handler_) + receive_handler_(current_coin_, std::chrono::system_clock::from_time_t(start_time), open); + + std::cout << "Decoded historic " << current_coin_ << " price: " << open << " at " << std::chrono::system_clock::from_time_t(start_time) << std::endl; + } + } + } + catch (boost::system::system_error se) { + ec = se.code(); + } + catch (std::invalid_argument se) { + ec = json::make_error_code(boost::json::error::incomplete); + } + catch (std::out_of_range se) { + ec = json::make_error_code(boost::json::error::out_of_range); + } + } + + std::function receive_handler_; + + resolver_type resolver_; + ssl_stream_type ssl_stream_; + + buffer_type buffer_; + request_type request_; + response_type response_; + + std::vector coins_; + std::string host_; + std::atomic_flag running_; + + std::string current_coin_; + + // It is more efficient to persist the json parser so that memory allocation does not need + // to be repeated each time we docode a message + json::parser parser_; +}; + +} // end namespace boost + +#endif + diff --git a/example/websocket/client/crypto-ai-ssl/live_price_listener.cpp b/example/websocket/client/crypto-ai-ssl/live_price_listener.cpp new file mode 100644 index 0000000000..d913a28004 --- /dev/null +++ b/example/websocket/client/crypto-ai-ssl/live_price_listener.cpp @@ -0,0 +1,421 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#include "processor_base.hpp" +#include "live_price_listener.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#if 0 + +using namespace boost; +using namespace std::placeholders; + +using namespace beast; // from +using namespace http; // from +using namespace websocket; // from + +namespace net = boost::asio; // from +using tcp = boost::asio::ip::tcp; // from + + +// Start the asynchronous operation +void live_price_listener::run() +{ + + + // Set SNI Hostname (many hosts need this to handshake successfully) + // Note that ws_.next_layer() references the asio::ssl::stream object. + // Note, SSL_set_tlsext_host_name is an OpenSSL C function, not + // part of asio or beast. + if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host_.c_str())) + { + system::error_code ec{ + static_cast(::ERR_get_error()), + net::error::get_ssl_category() }; + return error_handler_(ec, "SNI"); + } + + // Set the expected hostname in the peer certificate for verification. + // OpenSSL will, whenever a certificate is received, call this function + // to check that the certificate's host matches what we think it should be. + ws_.next_layer().set_verify_callback(boost::asio::ssl::host_name_verification(host_)); + + // Ensure any future callbacks do not early-exit. + // (design note: could also have been done at construction time). + active_ = true; + + // Request that ASIO lookup the domain name. For the sake of this example we have + // hard-coded the port to 443 (the usual port for this). + // For contrast with other examples, this has been written using a lambda, but + // `beast::bind_front_handler` is an equally viable alternative. + resolver_.async_resolve( + host_, + "https", + [this](error_code ec, tcp::resolver::results_type results) + { + // Note that `results` is actually an iterator into a container of + // endpoints representing all the IP addresses found by the DNS lookup. + on_resolve(ec, results); + } + ); +} + +void live_price_listener::cancel() +{ + // We set active_=false to rapidly consume all the pending + // completion handlers. + active_ = false; + + // We use net::post to ensure the websocket closure takes place after + // all the currently pending completion handlers. + net::post(strand_, [this]() + { + // If the websocket is still open, close it. + if (ws_.is_open()) + ws_.async_close(websocket::close_code::normal, + [this](error_code ec) + { + on_close(ec); + } + ); + } + ); +} + +// This is the function called when hostname resolution completes. +void live_price_listener::on_resolve(system::error_code ec, tcp::resolver::results_type results) +{ + // In the event of an error call the `cancel` function which will drain + // any pending completion handlers. In this case the websocket is not yet + // open so the `cancel` function will not attempt to close it. + if (ec) { + cancel(); + return error_handler_(ec, "resolve"); + } + + // If we have been asked to shut down then do no processing. + if (!active_) + return; + + // Set the timeout for the operation. Note that this needs to be set + // each time to reset the countdown. + // This is applied on the underlying socket because neither the ssl + // layer nor the websocket layer have been started yet. + beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); + + // Make the connection on on of the IP address we got from the lookup. + // If multiple IP addresses were found then the first one to sucessfully + // connect is used. + // ws_ has 3 layers websocket->ssl->socket + // `get_lowest_layer` returns the bottom-most socket. + beast::get_lowest_layer(ws_).async_connect( + results, + [this](error_code ec, tcp::resolver::results_type::endpoint_type ep) + { + // Note that the endpoint `ep` represents the single IP to which + // we successfully connected (if any). + on_connect(ec, ep); + } + ); +} + +// Once the underlying socket is connected, this function performs the next step, +// namely getting the SSL layer running. +void live_price_listener::on_connect(system::error_code ec, tcp::resolver::results_type::endpoint_type ep) +{ + if (ec) { + // In the event of a connection error call the `cancel` function which will drain + // any pending completion handlers. In this case the websocket is not yet + // open so the `cancel` function will not attempt to close it. + cancel(); + return error_handler_(ec, "connect"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // Set the timeout for the operation. Note that this needs to be set + // each time to reset the countdown. + // This is applied on the underlying socket because neither the ssl + // layer nor the websocket layer have been started yet. + beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); + + // Update the host_ string to add the port. This will provide the value of the + // Host HTTP header during the WebSocket handshake. + // See https://tools.ietf.org/html/rfc7230#section-5.4 + host_ += ':' + std::to_string(ep.port()); + + // Perform the SSL handshake + // ws_ has 3 layers websocket->ssl->socket + // `get_next_layer` returns the ssl layer. + ws_.next_layer().async_handshake( + boost::asio::ssl::stream_base::client, + [this](error_code ec) + { + on_ssl_handshake(ec); + } + ); +} + +void live_price_listener::on_ssl_handshake(system::error_code ec) +{ + if (ec) { + // In the event of an ssl error call the `cancel` function which will drain + // any pending completion handlers. In this case the websocket is not yet + // open so the `cancel` function will not attempt to close it. + cancel(); + return error_handler_(ec, "ssl_handshake"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // Turn off the timeout on the tcp_stream, because + // the websocket stream has its own timeout system. + beast::get_lowest_layer(ws_).expires_never(); + + // Set suggested timeout settings for the websocket + ws_.set_option( + websocket::stream_base::timeout::suggested( + beast::role_type::client)); + + // We need to set the User-Agent of the handshake. Beast's websocket + // requires that this be done using a decorator. + ws_.set_option(websocket::stream_base::decorator( + [](websocket::request_type& req) + { + req.set(http::field::user_agent, + std::string(BOOST_BEAST_VERSION_STRING) + + " websocket-client-async-ssl"); + }) + ); + + // The websocket should use deflate where possible, to reduce bandwidth + // by compressing the messages on the wire. + // This requires that zlib be included as a dependency. + websocket::permessage_deflate opt; + opt.client_enable = true; // for clients + opt.server_enable = true; // for servers + ws_.set_option(opt); + + // Perform the websocket handshake + ws_.async_handshake(host_, "/", + [this](error_code ec) + { + on_handshake(ec); + } + ); +} + +// This is the function that is called when the websocket is up and usable. +// The previous steps were relatively generic across all websocket connections, +// and from this point on we need to include business logic. +void live_price_listener::on_handshake(system::error_code ec) +{ + if (ec) { + cancel(); + return error_handler_(ec, "handshake"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // Construct a coinbase json subscription message, using Boost::json + json::value jv = { + { "type", "subscribe" }, + { "product_ids", json::array(coins_.cbegin(), coins_.cend()) }, + { "channels", json::array{ + "heartbeat", + "ticker_batch" } + } + }; + + // Convert the json object into a string. + subscribe_json_str_ = serialize(jv); + + // Send the subscription message to the server. + ws_.async_write( + net::buffer(subscribe_json_str_), + [this](error_code ec, std::size_t bytes_transferred) + { + on_write(ec, bytes_transferred); + } + ); +} + +void live_price_listener::on_write( + system::error_code ec + , std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + // Check for errors. + if (ec) { + cancel(); + return error_handler_(ec, "write"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // Read a message into our buffer. Note that buffer_ is a member variable as it has + // to persist for the life of the read and until the on_read completion handler is + // finished. + ws_.async_read( + buffer_, + [this](error_code ec, std::size_t bytes_transferred) + { + on_read(ec, bytes_transferred); + } + ); +} + +void live_price_listener::on_read(system::error_code ec, std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + // This indicates that the session was closed + if (ec == websocket::error::closed) { + cancel(); + return; + } + else if (ec) { + cancel(); + if (active_) return error_handler_(ec, "read"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // The asynchronous read performs its own commit() on the dynamic buffer, thus the readable + // section of the dynamic buffer contains the message we want to decode. + asio::const_buffer buf(buffer_.cdata()); + + // We convert the const_buffer buf into a string_view, and then parse the json string itself. + // Note that an alternative would be to use beast::buffers_to_string but that would + // perform an additional allocation. + parse_json(core::string_view(static_cast(buf.data()), buf.size())); + + std::cout << "Interim: " << beast::make_printable(buffer_.data()) << "\n\n" << std::endl; + + //receive_handler_(beast::buffers_to_string(buffer_.data())); + + // Erase the const section of the dynamic buffer. + // Note: the clear() function does not deallocate so the capactity of the flat_buffer is + // unchanged, preventing the need for a reallocation each time a message is received. + buffer_.clear(); + + testing_count++; + if (testing_count > 20) cancel(); + + // This is a very common idiom in async programming. As soon as a read completes, we + // initiate another asynchronous read, almost like an infinite loop. + ws_.async_read( + buffer_, + [this](error_code ec, std::size_t bytes_transferred) + { + on_read(ec, bytes_transferred); + } + ); +} + + + +void live_price_listener::parse_json(core::string_view str) +{ + parser_.reset(); + + boost::system::error_code ec; + parser_.write(str, ec); + + if (ec) + return error_handler_(ec, "json_price_decoder::parse_json"); + + json::value jv(parser_.release()); + + core::string_view productstr; + core::string_view timestr; + + double price = 0; + + // Design note: the json parsing could be done without using the exception + // interface, but the resulting code would be considerably more verbose. + try { + if (jv.as_object().at("type").as_string() != "ticker") + return; + + productstr = jv.as_object().at("product_id").as_string(); + + core::string_view pricestr = jv.as_object().at("price").as_string(); + + timestr = jv.as_object().at("time").as_string(); + + std::size_t str_size = 0; + price = std::stod(pricestr, &str_size); + } + catch (boost::system::system_error se) { + return error_handler_(ec, "json_price_decoder::parse_json parse failure"); + } + catch (std::invalid_argument se) { + return error_handler_(ec, "json_price_decoder::parse_json parse failure"); + } + catch (std::out_of_range se) { + return error_handler_(ec, "json_price_decoder::parse_json parse failure"); + } + + // As timestr is a *UTC* string, we want to generate a chrono::system_clock::time_point + // representing the UTC time. + posix_time::ptime ptime = posix_time::from_iso_extended_string(timestr); + std::time_t epoch_time = posix_time::to_time_t(ptime); + const auto price_time = std::chrono::system_clock::from_time_t(epoch_time); + + if (receive_handler_) + receive_handler_(productstr, price_time, price); + + std::cout << "Decoded live " << productstr << " price: " << price << " at " << price_time << std::endl; + + //std::this_thread::sleep_for(std::chrono::milliseconds(10*1000)); +} + +void live_price_listener::on_close(system::error_code ec) +{ + if (ec && ec != boost::asio::ssl::error::stream_truncated) + return error_handler_(ec, "close"); + + // If we get here then the connection is closed gracefully + + // The make_printable() function helps print a ConstBufferSequence + std::cout << "Final buffer content:" << beast::make_printable(buffer_.data()) << std::endl; +} + +#endif diff --git a/example/websocket/client/crypto-ai-ssl/live_price_listener.hpp b/example/websocket/client/crypto-ai-ssl/live_price_listener.hpp new file mode 100644 index 0000000000..e170865221 --- /dev/null +++ b/example/websocket/client/crypto-ai-ssl/live_price_listener.hpp @@ -0,0 +1,539 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_EXAMPLE_LIVE_PRICE_LISTENER +#define BOOST_BEAST_EXAMPLE_LIVE_PRICE_LISTENER + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "processor_base.hpp" + +namespace boost +{ + +// Opens a websocket and subsscribes to price ticks +template +class live_price_listener : public processor_base +{ + // This holds the function called when a live price is received. + std::function receive_handler_; + + // This holds the function called when an error happens. + std::function error_handler_; + + // Note that the websocket uses composed operations internally, as well as having internal + // timers. Thus this executor needs to be an (implicit or explicit) strand, which + // guarantees that no two completion handlers will be run simultaneously (from differnt threads). + Executor& exec_; + + // The resolver's role is to perform DNS lookups from a hostname to a set of ip addresses. + asio::ip::tcp::resolver resolver_; + + // The key structure in this listener is the websocket itself. + boost::beast::websocket::stream> ws_; + + // Any calls to async_read need to be passed a buffer into which the response will + // be written. That buffer needs to persist until after the read completes and the + // completion handler is called. + boost::beast::flat_buffer buffer_; + + // The subscription message needs to persist until the asynchronous operation initiated + // by ws_.async_write() completes. Thus it is held here as a member. + std::string subscribe_json_str_; + + // The host will be used at multiple stages during the websocket's setup process. + std::string host_; + + // A list of coins that we want to get the prices for. + std::vector coins_; + + // Provide a mechanism to exit the websocket subscription. When active_ is false, + // no more asynchronous calls will be made, and every completion handler called will exit + // immediately. + bool active_; + + // It is more efficient to persist the json parser so that memory allocation does not need + // to be repeated each time we docode a message + boost::json::parser parser_; + + int testing_count = 0; + +public: + // It is worth noting that we do not retain the reference to the passed-in io_context. + // The reason for this is that we use a strand to ensure the + explicit + live_price_listener( + Executor& exec + , asio::ssl::context& ctx + , string_view host + , const std::vector& coins + , std::function receive_handler + , std::function err_handler) + : receive_handler_(receive_handler) + , error_handler_(err_handler) + , exec_(exec) + , resolver_(exec_) + , ws_(exec_, ctx) + , host_(host) + , coins_(coins) + , active_(false) + { + // For this example hard-code the host. + //host_ = "ws-feed-public.sandbox.exchange.coinbase.com"; + host_ = "ws-feed.exchange.coinbase.com"; + } + + // Start the asynchronous operation + void + run(); + + void + cancel() override; + +private: + void + on_resolve( + system::error_code ec, + asio::ip::tcp::resolver::results_type results); + + void + on_connect( + system::error_code ec, + asio::ip::tcp::resolver::results_type::endpoint_type ep); + + void + on_ssl_handshake(system::error_code ec); + + void + on_handshake(system::error_code ec); + + void + on_write( + system::error_code ec, + std::size_t bytes_transferred); + + void + on_read( + system::error_code ec, + std::size_t bytes_transferred); + + void parse_json( + boost::core::string_view str); + + void + on_close(system::error_code ec); +}; + +// Start the asynchronous operation +template +void live_price_listener::run() +{ + + + // Set SNI Hostname (many hosts need this to handshake successfully) + // Note that ws_.next_layer() references the asio::ssl::stream object. + // Note, SSL_set_tlsext_host_name is an OpenSSL C function, not + // part of asio or beast. + if (!SSL_set_tlsext_host_name(ws_.next_layer().native_handle(), host_.c_str())) + { + system::error_code ec{ + static_cast(::ERR_get_error()), + asio::error::get_ssl_category() }; + return error_handler_(ec, "SNI"); + } + + // Set the expected hostname in the peer certificate for verification. + // OpenSSL will, whenever a certificate is received, call this function + // to check that the certificate's host matches what we think it should be. + ws_.next_layer().set_verify_callback(asio::ssl::host_name_verification(host_)); + + // Ensure any future callbacks do not early-exit. + // (design note: could also have been done at construction time). + active_ = true; + + // Request that ASIO lookup the domain name. For the sake of this example we have + // hard-coded the port to 443 (the usual port for this). + // For contrast with other examples, this has been written using a lambda, but + // `beast::bind_front_handler` is an equally viable alternative. + resolver_.async_resolve( + host_, + "https", + [this](system::error_code ec, asio::ip::tcp::resolver::results_type results) + { + // Note that `results` is actually an iterator into a container of + // endpoints representing all the IP addresses found by the DNS lookup. + on_resolve(ec, results); + } + ); +} + +template +void live_price_listener::cancel() +{ + // We set active_=false to rapidly consume all the pending + // completion handlers. + active_ = false; + + // We use asio::post to ensure the websocket closure takes place after + // all the currently pending completion handlers. + asio::post(exec_, [this]() + { + // If the websocket is still open, close it. + if (ws_.is_open()) + ws_.async_close(beast::websocket::close_code::normal, + [this](system::error_code ec) + { + on_close(ec); + } + ); + } + ); +} + +// This is the function called when hostname resolution completes. +template +void live_price_listener::on_resolve(system::error_code ec, asio::ip::tcp::resolver::results_type results) +{ + // In the event of an error call the `cancel` function which will drain + // any pending completion handlers. In this case the websocket is not yet + // open so the `cancel` function will not attempt to close it. + if (ec) { + cancel(); + return error_handler_(ec, "resolve"); + } + + // If we have been asked to shut down then do no processing. + if (!active_) + return; + + // Set the timeout for the operation. Note that this needs to be set + // each time to reset the countdown. + // This is applied on the underlying socket because neither the ssl + // layer nor the websocket layer have been started yet. + beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); + + // Make the connection on on of the IP address we got from the lookup. + // If multiple IP addresses were found then the first one to sucessfully + // connect is used. + // ws_ has 3 layers websocket->ssl->socket + // `get_lowest_layer` returns the bottom-most socket. + beast::get_lowest_layer(ws_).async_connect( + results, + [this](system::error_code ec, asio::ip::tcp::resolver::results_type::endpoint_type ep) + { + // Note that the endpoint `ep` represents the single IP to which + // we successfully connected (if any). + on_connect(ec, ep); + } + ); +} + +// Once the underlying socket is connected, this function performs the next step, +// namely getting the SSL layer running. +template +void live_price_listener::on_connect(system::error_code ec, asio::ip::tcp::resolver::results_type::endpoint_type ep) +{ + if (ec) { + // In the event of a connection error call the `cancel` function which will drain + // any pending completion handlers. In this case the websocket is not yet + // open so the `cancel` function will not attempt to close it. + cancel(); + return error_handler_(ec, "connect"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // Set the timeout for the operation. Note that this needs to be set + // each time to reset the countdown. + // This is applied on the underlying socket because neither the ssl + // layer nor the websocket layer have been started yet. + beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); + + // Update the host_ string to add the port. This will provide the value of the + // Host HTTP header during the WebSocket handshake. + // See https://tools.ietf.org/html/rfc7230#section-5.4 + host_ += ':' + std::to_string(ep.port()); + + // Perform the SSL handshake + // ws_ has 3 layers websocket->ssl->socket + // `get_next_layer` returns the ssl layer. + ws_.next_layer().async_handshake( + asio::ssl::stream_base::client, + [this](system::error_code ec) + { + on_ssl_handshake(ec); + } + ); +} + +template +void live_price_listener::on_ssl_handshake(system::error_code ec) +{ + if (ec) { + // In the event of an ssl error call the `cancel` function which will drain + // any pending completion handlers. In this case the websocket is not yet + // open so the `cancel` function will not attempt to close it. + cancel(); + return error_handler_(ec, "ssl_handshake"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // Turn off the timeout on the tcp_stream, because + // the websocket stream has its own timeout system. + beast::get_lowest_layer(ws_).expires_never(); + + // Set suggested timeout settings for the websocket + ws_.set_option( + beast::websocket::stream_base::timeout::suggested( + beast::role_type::client)); + + // We need to set the User-Agent of the handshake. Beast's websocket + // requires that this be done using a decorator. + ws_.set_option(beast::websocket::stream_base::decorator( + [](beast::websocket::request_type& req) + { + req.set(beast::http::field::user_agent, + std::string(BOOST_BEAST_VERSION_STRING) + + " websocket-client-async-ssl"); + }) + ); + + // The websocket should use deflate where possible, to reduce bandwidth + // by compressing the messages on the wire. + // This requires that zlib be included as a dependency. + beast::websocket::permessage_deflate opt; + opt.client_enable = true; // for clients + opt.server_enable = true; // for servers + ws_.set_option(opt); + + // Perform the websocket handshake + ws_.async_handshake(host_, "/", + [this](system::error_code ec) + { + on_handshake(ec); + } + ); +} + +// This is the function that is called when the websocket is up and usable. +// The previous steps were relatively generic across all websocket connections, +// and from this point on we need to include business logic. +template +void live_price_listener::on_handshake(system::error_code ec) +{ + if (ec) { + cancel(); + return error_handler_(ec, "handshake"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // Construct a coinbase json subscription message, using Boost::json + json::value jv = { + { "type", "subscribe" }, + { "product_ids", json::array(coins_.cbegin(), coins_.cend()) }, + { "channels", json::array{ + "heartbeat", + "ticker_batch" } + } + }; + + // Convert the json object into a string. + subscribe_json_str_ = serialize(jv); + + // Send the subscription message to the server. + ws_.async_write( + asio::buffer(subscribe_json_str_), + [this](system::error_code ec, std::size_t bytes_transferred) + { + on_write(ec, bytes_transferred); + } + ); +} + +template +void live_price_listener::on_write( + system::error_code ec + , std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + // Check for errors. + if (ec) { + cancel(); + return error_handler_(ec, "write"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // Read a message into our buffer. Note that buffer_ is a member variable as it has + // to persist for the life of the read and until the on_read completion handler is + // finished. + ws_.async_read( + buffer_, + [this](system::error_code ec, std::size_t bytes_transferred) + { + on_read(ec, bytes_transferred); + } + ); +} + +template +void live_price_listener::on_read(system::error_code ec, std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + // This indicates that the session was closed + if (ec == beast::websocket::error::closed) { + cancel(); + return; + } + else if (ec) { + cancel(); + if (active_) return error_handler_(ec, "read"); + } + + // If we have been asked to shut down then do no further processing. + if (!active_) + return; + + // The asynchronous read performs its own commit() on the dynamic buffer, thus the readable + // section of the dynamic buffer contains the message we want to decode. + asio::const_buffer buf(buffer_.cdata()); + + // We convert the const_buffer buf into a string_view, and then parse the json string itself. + // Note that an alternative would be to use beast::buffers_to_string but that would + // perform an additional allocation. + parse_json(core::string_view(static_cast(buf.data()), buf.size())); + + std::cout << "Interim: " << beast::make_printable(buffer_.data()) << "\n\n" << std::endl; + + //receive_handler_(beast::buffers_to_string(buffer_.data())); + + // Erase the const section of the dynamic buffer. + // Note: the clear() function does not deallocate so the capactity of the flat_buffer is + // unchanged, preventing the need for a reallocation each time a message is received. + buffer_.clear(); + + testing_count++; + if (testing_count > 20) cancel(); + + // This is a very common idiom in async programming. As soon as a read completes, we + // initiate another asynchronous read, almost like an infinite loop. + ws_.async_read( + buffer_, + [this](system::error_code ec, std::size_t bytes_transferred) + { + on_read(ec, bytes_transferred); + } + ); +} + + + +template +void live_price_listener::parse_json(core::string_view str) +{ + parser_.reset(); + + system::error_code ec; + parser_.write(str, ec); + + if (ec) + return error_handler_(ec, "json_price_decoder::parse_json"); + + json::value jv(parser_.release()); + + core::string_view productstr; + core::string_view timestr; + + double price = 0; + + // Design note: the json parsing could be done without using the exception + // interface, but the resulting code would be considerably more verbose. + try { + if (jv.as_object().at("type").as_string() != "ticker") + return; + + productstr = jv.as_object().at("product_id").as_string(); + + core::string_view pricestr = jv.as_object().at("price").as_string(); + + timestr = jv.as_object().at("time").as_string(); + + std::size_t str_size = 0; + price = std::stod(pricestr, &str_size); + } + catch (system::system_error se) { + return error_handler_(ec, "json_price_decoder::parse_json parse failure"); + } + catch (std::invalid_argument se) { + return error_handler_(ec, "json_price_decoder::parse_json parse failure"); + } + catch (std::out_of_range se) { + return error_handler_(ec, "json_price_decoder::parse_json parse failure"); + } + + // As timestr is a *UTC* string, we want to generate a chrono::system_clock::time_point + // representing the UTC time. + posix_time::ptime ptime = posix_time::from_iso_extended_string(timestr); + std::time_t epoch_time = posix_time::to_time_t(ptime); + const auto price_time = std::chrono::system_clock::from_time_t(epoch_time); + + if (receive_handler_) + receive_handler_(productstr, price_time, price); + + std::cout << "Decoded live " << productstr << " price: " << price << " at " << price_time << std::endl; + + //std::this_thread::sleep_for(std::chrono::milliseconds(10*1000)); +} + +template +void live_price_listener::on_close(system::error_code ec) +{ + if (ec && ec != asio::ssl::error::stream_truncated) + return error_handler_(ec, "close"); + + // If we get here then the connection is closed gracefully + + // The make_printable() function helps print a ConstBufferSequence + std::cout << "Final buffer content:" << beast::make_printable(buffer_.data()) << std::endl; +} + +} + +#endif diff --git a/example/websocket/client/crypto-ai-ssl/price_store.hpp b/example/websocket/client/crypto-ai-ssl/price_store.hpp new file mode 100644 index 0000000000..e173744747 --- /dev/null +++ b/example/websocket/client/crypto-ai-ssl/price_store.hpp @@ -0,0 +1,126 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_EXAMPLE_PRICE_STORE_H +#define BOOST_BEAST_EXAMPLE_PRICE_STORE_H + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + + +// Opens a websocket and subsscribes to price ticks +class price_store +{ + std::function update_handler_; + +public: + struct price_entry { + std::chrono::system_clock::time_point time_; + double price_; + }; + +private: + // As we wish to support one thread posting prices and one thread reading prices, + // without having a thread-safe vector and without copying the entire vector each + // time, and given we know we have a limited volume of data, we can adopt a + // "double-buffer" technique (and accept the memory hit). + std::map< + std::string, + std::pair, + std::vector > > entries_; + + std::mutex mutex_; + +public: + explicit + price_store( + const std::vector& coins + , std::function update_handler + ) + : update_handler_(update_handler) + { + const std::lock_guard guard(mutex_); + // Prepopulate the map. + // Note: this could, as an alternative design, be done "lazily" + // as prices come in. + for (auto& s : coins) { + auto rv = entries_.emplace(std::piecewise_construct, std::forward_as_tuple(s), std::make_tuple()); + rv.first->second.first.reserve(60 * 24); + rv.first->second.second.reserve(60 * 24); + } + } + + void + post(const std::string& coin, + std::chrono::system_clock::time_point time, + double price) + { + const std::lock_guard guard(mutex_); + bool update_callback_required = false; + auto it = entries_.find(coin); + if (it == entries_.end()) { + // Unsupported coin - do not record the price + } + else if (it->second.first.size() == 0) { + it->second.first.emplace_back(time, price); + update_callback_required = true; + } + else { + // We limit the stored prices to one every 5 minutes. + static const auto one_minute = + std::chrono::duration_cast( + std::chrono::seconds(60)); + auto gap = time - it->second.first.back().time_; + if (gap >= one_minute) { + it->second.first.emplace_back(time, price); + update_callback_required = true; + } + } + if (update_handler_ && update_callback_required) + update_handler_(coin); + return; + } + + const std::vector& + get(const std::string &coin) + { + const std::lock_guard guard(mutex_); + static const std::vector empty; + + auto pos = entries_.find(coin); + if (pos != entries_.end()) { + std::vector& v1 = pos->second.first; + std::vector& v2 = pos->second.second; + // Add the missing entries into v2. + v2.reserve(v1.capacity()); + for (std::size_t i = v2.size(); i < v1.size(); i++) { + v2.push_back(v1[i]); + } + return v2; + } + else + return empty; + } +}; + +#endif + diff --git a/example/websocket/client/crypto-ai-ssl/processor_base.hpp b/example/websocket/client/crypto-ai-ssl/processor_base.hpp new file mode 100644 index 0000000000..ba7ebdf021 --- /dev/null +++ b/example/websocket/client/crypto-ai-ssl/processor_base.hpp @@ -0,0 +1,28 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#ifndef BOOST_BEAST_EXAMPLE_PROCESSOR_BASE_H +#define BOOST_BEAST_EXAMPLE_PROCESSOR_BASE_H + +class processor_base { + +public: + + enum class input_type { + LIVE, + HISTORIC + }; + + virtual void cancel() = 0; + + virtual ~processor_base() {} +}; + +#endif + diff --git a/example/websocket/client/crypto-ai-ssl/websocket_client_crypto_ai_ssl.cpp b/example/websocket/client/crypto-ai-ssl/websocket_client_crypto_ai_ssl.cpp new file mode 100644 index 0000000000..731bd5123c --- /dev/null +++ b/example/websocket/client/crypto-ai-ssl/websocket_client_crypto_ai_ssl.cpp @@ -0,0 +1,395 @@ +// +// Copyright (c) 2025 Mungo Gill +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/boostorg/beast +// + +#if 1 + +#include "example/common/root_certificates.hpp" + +#include "historic_price_fetcher.hpp" +#include "live_price_listener.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "price_store.hpp" + +#include +#include + + +using namespace boost; +using namespace std::placeholders; + +using namespace beast; // from +using namespace http; // from +using namespace websocket; // from + +namespace net = boost::asio; // from +using tcp = boost::asio::ip::tcp; // from + +// Report a failure +void +fail(system::error_code ec, char const* what) +{ + std::cerr << what << ": " << ec.message() << "\n"; +} + +//------------------------------------------------------------------------------ + +int main(int argc, char** argv) +{ + // Check command line arguments. + if (argc != 2 && argc != 3) + { + std::cerr << + "Usage: " << *argv << " [] \n" << + "Example:\n" << + " " << *argv << " 'BTC-USD,ETH-USD' \n" << + " " << *argv << " 'BTC-USD,ETH-USD' ABC-DEF-GHI-JKL\n"; + return EXIT_FAILURE; + } + + std::string coin_list_str(argv[1]); + std::vector coins; + + char_separator separator(", "); + tokenizer> tokens(coin_list_str, separator); + for (auto it = tokens.begin(); it != tokens.end(); it++) { + coins.push_back(*it); + }; + + // The SSL context is required, and holds certificates + ssl::context ssl_ctx{ ssl::context::tlsv12_client }; + + // Verify the remote server's certificate + ssl_ctx.set_verify_mode(ssl::verify_peer); + + // This holds the root certificate used for verification + load_root_certificates(ssl_ctx); + + auto decoded_recv = [](const std::string& symbol, double price) { + std::cout << "Decoded Recv" << symbol << ":" << price << "\n" << std::endl; + }; + + auto price_store_update_recv = [](const std::string&) { + + }; + + price_store store(coins, price_store_update_recv); + + auto live_input_recv = [&store](const std::string& coin, + std::chrono::system_clock::time_point time, + double price) { + store.post(coin, time, price); + }; + + auto historic_input_recv = [&store](const std::string& coin, + std::chrono::system_clock::time_point time, + double price) { + store.post(coin, time, price); + }; + + // The io_context is required for all I/O + net::io_context listen_ioc; + + // Construct and start a the fetcher of historic prices. + asio::strand strand = asio::make_strand(listen_ioc); + + std::string host = "api.coinbase.com"; + + auto fetcher = boost::historic_fetcher( + strand, + ssl_ctx, + host, + historic_input_recv); + + fetcher.async_historic_fetch( + coins, + [](boost::system::error_code ec) + { + if (ec.failed()) + { + fail(ec, "async_historic_fetch"); + } + }); + //historic_price_fetcher historic_fetcher(listen_ioc, ssl_ctx, coins, historic_input_recv, fail); + //historic_fetcher.run(); + + // Now we run the event loop until all historic prices have been received. + listen_ioc.run(); + + // Skip the live stuff until we get the changes to historic pricing working. + //return EXIT_SUCCESS; + + // Construct and start a the websocket listener. + // For this example hard-code the host. + //host_ = "ws-feed-public.sandbox.exchange.coinbase.com"; + std::string ws_host = "ws-feed.exchange.coinbase.com"; + live_price_listener listen_worker(listen_ioc, ssl_ctx, ws_host, coins, live_input_recv, fail); + listen_worker.run(); + + // Restart the event loop. The run() call will return when + // the socket is closed. + listen_ioc.restart(); + listen_ioc.run(); + + return EXIT_SUCCESS; +} + +#endif + +#if 0 +// Type your code here, or load an example. +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace boost { + + template + class historic_fetcher_op + { + enum class state { + starting, + resolving, + connecting, + ssl_handshaking, + writing, + reading, + error, + complete + }; + + using resolver_type = asio::ip::basic_resolver< + asio::ip::tcp, Executor>; + + using tcp_stream_type = beast::basic_stream< + asio::ip::tcp, + Executor>; + + using ssl_stream_type = boost::asio::ssl::stream; + + std::unique_ptr resolver_p_; + std::unique_ptr ssl_stream_p_; + + std::string host_; + state state_; + + public: + explicit + historic_fetcher_op( + Executor& exec + , boost::asio::ssl::context& ctx) + : state_(state::starting) + { + // For this example use a hard-coded host name. + // In reality this would be stored in some form of configuration. + //host_ = "api.coinbase.com"; + host_ = "www.boost.org"; + //host_ = "ws-feed.exchange.coinbase.com"; + + resolver_p_ = std::make_unique(exec); + ssl_stream_p_ = std::make_unique(exec, ctx); + + // Set the expected hostname in the peer certificate for verification + + ssl_stream_p_->set_verify_callback(boost::asio::ssl::host_name_verification(host_)); + } + + template + void operator()( + Self& self + , system::error_code ec = {}) + { + if (ec) { + state_ = state::error; + return self.complete(ec); + } + + switch (state_) { + case state::starting: { + // Set SNI Hostname (many hosts need this to handshake successfully) + if (!SSL_set_tlsext_host_name(ssl_stream_p_->native_handle(), host_.c_str())) + { + system::error_code ssl_ec{ + static_cast(::ERR_get_error()), + asio::error::get_ssl_category() }; + state_ = state::error; + return self.complete(ssl_ec); + } + + // Look up the domain name + state_ = state::resolving; + resolver_p_->async_resolve( + host_, + "443", + std::move(self) + ); + } break; + case state::ssl_handshaking: { + // TODO: If there are any coins left to request, request one. + } break; + default: { + // This should not happen. + throw std::logic_error("unreachable"); + } + } + }; + + template + void operator()( + Self& self + , system::error_code ec + , asio::ip::tcp::resolver::results_type results) + { + if (ec) { + state_ = state::error; + return self.complete(ec); + } + + switch (state_) { + case state::resolving: { + // Set a timeout on the operation + beast::get_lowest_layer(*ssl_stream_p_).expires_after(std::chrono::seconds(30)); + + // Make the connection on the IP address we get from a lookup + state_ = state::connecting; + for (auto& r : results) { + std::cout << r.endpoint() << std::endl; + } + beast::get_lowest_layer(*ssl_stream_p_).async_connect( + results, + std::move(self) + ); + std::cout << "async_connect has been called" << std::endl; + } break; + default: { + // This should not happen. + throw std::logic_error("unreachable"); + } + } + }; + + template + void operator()( + Self& self + , system::error_code ec + , asio::ip::tcp::resolver::results_type::endpoint_type ep) + { + boost::ignore_unused(ep); + + if (ec) { + std::cout << "Connection failed" << std::endl; + state_ = state::error; + return self.complete(ec); + } + + switch (state_) { + case state::connecting: { + // Set a timeout on the operation + std::cout << "Connection succeeded" << std::endl; + + // This is where the ssl handshaking stuff will go if we + // can work out why the connection is failing. + + self.complete(ec); + } break; + default: { + // This should not happen. + throw std::logic_error("unreachable"); + } + } + }; + + }; + + template< + class Executor, + BOOST_ASIO_COMPLETION_TOKEN_FOR(void( + system::error_code)) CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void( + system::error_code)) + async_historic_fetch( + Executor& exec + , boost::asio::ssl::context& ssl_ctx + , CompletionToken&& token) + { + return asio::async_compose< + CompletionToken, + void(system::error_code)>( + historic_fetcher_op( + exec + , ssl_ctx) + , token + , exec); + } + +} // end namespace boost + +using namespace boost; + +int main() +{ + // The SSL context is required, and holds certificates + asio::ssl::context ssl_ctx{ asio::ssl::context::tlsv12_client }; + + // Verify the remote server's certificate + ssl_ctx.set_verify_mode(asio::ssl::verify_peer); + + // The io_context is required for all I/O + asio::io_context listen_ioc; + + // Construct and start a the fetcher of historic prices. + //asio::strand strand = asio::make_strand(listen_ioc); + auto executor = listen_ioc.get_executor(); + boost::async_historic_fetch( + executor, + ssl_ctx, + [](boost::system::error_code ec) + { + if (ec.failed()) + { + std::cerr << ec.message() << "\n"; + } + }); + + // Now we run the event loop until all historic prices have been received. + listen_ioc.run(); +} +#endif