// Copyright (C) 2022 Check Point Software Technologies Ltd. All rights reserved. // Licensed under the Apache License, Version 2.0 (the "License"); // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "proto_message_comp.h" #include #include #include #include #include #include "boost/regex.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" #include "singleton.h" #include "debug.h" #include "rest.h" #include "config.h" #include "cache.h" #include "messaging/http_core.h" #include "http_encoder.h" #include "http_decoder.h" #include "agent_details.h" #include "messaging_buffer/http_request_event.h" #include "boost/asio.hpp" #include "message_metric.h" #include "smart_bio.h" #include "connkey.h" #include "agent_core_utilities.h" using namespace std; using namespace chrono; using namespace smartBIO; USE_DEBUG_FLAG(D_COMMUNICATION); USE_DEBUG_FLAG(D_HTTP_REQUEST); static string tagToString(MessageTypeTag tag) { switch(tag) { case MessageTypeTag::GENERIC: return "generic"; case MessageTypeTag::LOG: return "log"; case MessageTypeTag::DEBUG: return "debug"; case MessageTypeTag::METRIC: return "metric"; case MessageTypeTag::REPORT: return "report"; case MessageTypeTag::WAAP_LEARNING: return "waap learning"; case MessageTypeTag::INTELLIGENCE: return "intelligence"; case MessageTypeTag::BUFFERED_MESSAGES: return "buffered messages"; case MessageTypeTag::COUNT: break; } dbgAssert(false) << "Unsupported message type tag " << static_cast(tag); return ""; } class MessageConnection { public: MessageConnection(MessageConnection &&other); ~MessageConnection(); // To prevent wrong usage MessageConnection() = delete; MessageConnection(const MessageConnection &other) = delete; MessageConnection& operator=(const MessageConnection &other) = delete; MessageConnection& operator=(MessageConnection &&other) = delete; static Maybe startNewConnection( const string &host, const uint16_t port, const bool _is_secure, MessageTypeTag _tag, const bool over_proxy = false, bool _is_external = false, bool is_ssl_ignore_validation = false ); bool lock(); bool unlock(); bool sendData(const string &data) const; template Maybe receiveResponse(I_MessageDecoder &decoder); Maybe reconnect(bool should_lock = true); const string & getHost() const { return host; } uint16_t getPort() const { return port_num; } MessageTypeTag getTag() const { return tag; } bool shouldYieldOnFailure() const { return should_yield_on_failure; } bool overProxy() const { return over_proxy; } bool isSecure() const { return is_secure; } bool isExternal() const { return is_external; } bool isReady() const; void setShouldYieldOnFailure(const bool should_yield) { should_yield_on_failure = should_yield; } void waitForQueue(); void releaseQueue(); ostream & print(ostream &os) const; static I_MainLoop *mainloop; static I_TimeGet *timer; static I_Encryptor *encryptor; static string proxy_host; static uint16_t proxy_port; static string proxy_auth; private: MessageConnection( const string &_host, const uint16_t _port, bool _is_secure, const bool _over_proxy, bool _is_external, MessageTypeTag _tag, bool _is_ssl_ignore_validation = false ) : is_secure(_is_secure), is_external(_is_external), over_proxy(_over_proxy), host(_host), port_num(_port), current_messaging_queue_pos(0), available_messaging_queue_pos(0), tag(_tag), is_ssl_ignore_validation(_is_ssl_ignore_validation) {} Maybe doHandshake(const BioUniquePtr &bio); bool verifyCert(); bool encrypt(); bool setCnVerification(); bool setCtx(); bool setSocket(); bool connect(const string &host, const string &overwrite_port); bool shouldIgnoreSslValidation()const; Maybe calculatePublicKey(const BioUniquePtr &cert) const; Maybe getPinnedCertificate(); bool verifyCertPinning(const BioUniquePtr &cert); Maybe establishConnection(); Maybe establishConnectionOverProxy(); void getIpFromHostname(const string &hostname); static string printData(const string &data); static uint getConnectionTimeout(); bool is_secure; bool is_external; bool over_proxy; bool connection_lock = false; bool should_yield_on_failure = false; string ca_chain_dir; string host; string pinned_cert_pub_key; uint16_t port_num; uint64_t current_messaging_queue_pos = 0; uint64_t available_messaging_queue_pos = 0; MessageTypeTag tag; string filesystem_prefix = ""; vector current_ips; bool is_ssl_ignore_validation; uint connection_closed_count = 0; public: SSL *ssl_socket = nullptr; BioUniquePtr ssl_ctx = nullptr; BioUniquePtr bio = nullptr; static uint64_t metrics_current_size; }; I_MainLoop * MessageConnection::mainloop = nullptr; I_TimeGet * MessageConnection::timer = nullptr; I_Encryptor * MessageConnection::encryptor = nullptr; string MessageConnection::proxy_host = ""; uint16_t MessageConnection::proxy_port = 0; string MessageConnection::proxy_auth = ""; uint64_t MessageConnection::metrics_current_size = 0; class ProtoMessageComp::Impl : Singleton::Provide::From { using Method = I_Messaging::Method; using MessageConnKey = tuple; public: Impl() : active_connections() {} ~Impl() {} void init() { proxies = { {ProxyProtocol::HTTP, ProxyData()}, {ProxyProtocol::HTTPS, ProxyData()} }; initSSL(); timer = Singleton::Consume::by(); encryptor = Singleton::Consume::by(); MessageConnection::encryptor = Singleton::Consume::by(); msg_buffer = Singleton::Consume::by(); MessageConnection::timer = Singleton::Consume::by(); agent_details = Singleton::Consume::by(); agent_details->readAgentDetails(); loadAccessToken(); if (!setActiveFog()) { dbgDebug(D_COMMUNICATION) << "Could not initialize active fog connection"; } mainloop = Singleton::Consume::by(); MessageConnection::mainloop = Singleton::Consume::by(); auto cache_timeout = getConfigurationWithDefault(2, "message", "Cache timeout"); cache.startExpiration(seconds(cache_timeout), mainloop, timer); auto proxy_config = getProfileAgentSetting("agent.config.message.proxy"); if (proxy_config.ok()) { agent_details->setProxy(*proxy_config); agent_details->writeAgentDetails(); } registerConfigLoadCb( [&]() { auto proxy_config = getProfileAgentSetting("agent.config.message.proxy"); if (proxy_config.ok()) { is_proxy_configured_via_settings = true; agent_details->setProxy(*proxy_config); agent_details->writeAgentDetails(); } else if (is_proxy_configured_via_settings) { is_proxy_configured_via_settings = false; agent_details->setProxy(string("")); agent_details->writeAgentDetails(); } } ); auto load_env_proxy = loadProxy(); if (!load_env_proxy.ok()) { dbgDebug(D_COMMUNICATION) << "Could not initialize load proxy from environment, Error: " << load_env_proxy.getErr(); } Singleton::Consume::by()->addRecurringRoutine( I_MainLoop::RoutineType::System, seconds(60), [this] () { loadAccessToken(); }, "Load access token" ); auto metrics_debugs_interval = chrono::seconds(getConfigurationWithDefault( 600, "message", "Metrics Routine Interval" ) ); message_queue_metric.init( "Message queue elements", ReportIS::AudienceTeam::AGENT_CORE, ReportIS::IssuingEngine::AGENT_CORE, metrics_debugs_interval, false ); message_queue_metric.registerListener(); mainloop->addOneTimeRoutine( I_MainLoop::RoutineType::System, [&] () { while (true) { if ( agent_details->getOrchestrationMode() == OrchestrationMode::OFFLINE || handleBufferedEvents() == 0 ) { uint tmo = getConfigurationWithDefault(5, "message", "send event retry in sec"); mainloop->yield(chrono::seconds(tmo)); } else { mainloop->yield(false); } } }, "Persistent messaging stream", false ); } void fini() { MessageConnection::proxy_host = ""; MessageConnection::proxy_port = 0; MessageConnection::proxy_auth = ""; MessageConnection::encryptor = nullptr; MessageConnection::mainloop = nullptr; MessageConnection::timer = nullptr; } void loadAccessToken() { filesystem_prefix = getFilesystemPathConfig(); dbgTrace(D_COMMUNICATION) << "ProtoMessageComp, file systen prefix: " << filesystem_prefix << endl; auto data_path = getConfigurationWithDefault( filesystem_prefix + "/data/", "encryptor", "Data files directory" ); ifstream token_file(data_path + session_token_file_name); if (!token_file.is_open()) { dbgWarning(D_COMMUNICATION) << "Failed to open session token file"; return; } stringstream token_steam; token_steam << token_file.rdbuf(); auto new_token = token_steam.str(); if (access_token != new_token) { access_token = new_token; dbgTrace(D_COMMUNICATION) << "Loaded the new token"; } } // LCOV_EXCL_START Reason: No proxy for ut void setFogProxy(const string &host, const uint16_t port, ProxyProtocol proto) { dbgTrace(D_COMMUNICATION) << "Proxy was set. Proxy: " << host << ":" << port; MessageConnection::proxy_host = host; MessageConnection::proxy_port = port; auto proxy_auth = getProxyCredentials(proto); if (proxy_auth.ok()) { MessageConnection::proxy_auth = proxy_auth.unpack(); } } // LCOV_EXCL_STOP bool setActiveFog(const string &host, const uint16_t port, bool is_secure, MessageTypeTag tag) override { MessageConnKey fog_key = make_tuple("fog", 0, tag); proxy_protocol = is_secure ? ProxyProtocol::HTTPS : ProxyProtocol::HTTP; auto load_env_proxy = loadProxy(); if (!load_env_proxy.ok()) { dbgDebug(D_COMMUNICATION) << "Could not initialize load proxy from environment, Error: " << load_env_proxy.getErr(); } if (getProxyExists(proxy_protocol)) { auto proxy_host = getProxyDomain(proxy_protocol); auto proxy_port = getProxyPort(proxy_protocol); if (proxy_host.ok() && proxy_port.ok()) { setFogProxy(proxy_host.unpack(), proxy_port.unpack(), proxy_protocol); } } Maybe conn = MessageConnection::startNewConnection( host, port, is_secure, tag, getProxyExists(proxy_protocol) ); if (!conn.ok()) { dbgWarning(D_COMMUNICATION) << "Failed to establish connection to the Fog: " << conn.getErr(); return false; } if (active_connections.find(fog_key) == active_connections.end()) { active_connections.emplace(fog_key, conn.unpackMove()); } dbgInfo(D_COMMUNICATION) << "Successfully connected to the Fog: " << host << ":" << port << " via " << (getProxyExists(proxy_protocol) ? "proxy, using " : "") << (is_secure ? "secure" : "clear") << " connection"; tag_to_active_conn_key[tag] = fog_key; return true; } bool setActiveFog(MessageTypeTag tag = MessageTypeTag::GENERIC) override { string fog_host = ""; uint16_t fog_port = 0; bool is_secure_connection = false; if (agent_details->readAgentDetails()) { auto domain = agent_details->getFogDomain(); auto port = agent_details->getFogPort(); is_secure_connection = agent_details->getSSLFlag(); if (domain.ok() && port.ok()) { fog_host = domain.unpack(); fog_port = port.unpack(); } } if (agent_details->getOrchestrationMode() == OrchestrationMode::OFFLINE) { dbgDebug(D_COMMUNICATION) << "Agent Is in offline mode and would not attempt connecting to the fog"; return true; } if(fog_host.empty() || fog_port == 0) { dbgWarning(D_COMMUNICATION) << "Cannot establish connection to the Fog: " << "failed to get host and port details"; return false; } return setActiveFog( fog_host, fog_port, is_secure_connection, tag ); } string buildFogHeaders(const string &headers) { string modified_headers = headers; modified_headers += "User-Agent: Infinity Next (a7030abf93a4c13)\r\n"; auto i_env = Singleton::Consume::by(); modified_headers += i_env->getCurrentHeaders(); return modified_headers; } Maybe sendPersistentMessage( bool get_reply, const string &&body, Method method, const string &url, const string &headers, bool should_yield, MessageTypeTag tag = MessageTypeTag::GENERIC, bool skip_sending = false) override { if (agent_details->getOrchestrationMode() == OrchestrationMode::OFFLINE) { return genError("Agent is in offline mode and cannot communicate with the fog"); } string method_as_string; switch (method) { case Method::GET: { method_as_string = "GET"; break; } case Method::POST: { method_as_string = "POST"; break; } case Method::PUT: { method_as_string = "PUT"; break; } case Method::PATCH: { method_as_string = "PATCH"; break; } case Method::CONNECT: { method_as_string = "CONNECT"; break; } } HTTPRequestSignature req_sig(method_as_string, url, tagToString(tag)); bool should_buffer = false; if (pending_signatures.find(req_sig) != pending_signatures.end()) { dbgDebug(D_COMMUNICATION) << "Previous HTTP Request is already in queue. Buffering the request"; should_buffer = true; } bool is_rejected = false; if (!should_buffer && !skip_sending) { ErrorCB fog_server_err = [&] (HTTPStatusCode http_status_code) mutable { is_rejected = http_status_code == HTTPStatusCode::HTTP_PAYLOAD_TOO_LARGE || http_status_code == HTTPStatusCode::HTTP_MULTI_STATUS || http_status_code == HTTPStatusCode::HTTP_BAD_REQUEST; }; pending_signatures.insert(req_sig); try { auto res = sendMessage(get_reply, body, method, url, headers, fog_server_err, should_yield, tag); pending_signatures.erase(req_sig); if (res.ok()) return res; bool should_buffer_default = getProfileAgentSettingWithDefault( true, "eventBuffer.bufferFailedRequests" ); if (!getConfigurationWithDefault(should_buffer_default, "message", "Buffer Failed Requests")) { dbgWarning(D_COMMUNICATION) << "Failed to send Request."; return res; } } catch (...) { dbgWarning(D_COMMUNICATION) << "Can't send a persistent message, mainloop has been stopped"; return genError("mainloop has been stopped"); } dbgWarning(D_COMMUNICATION) << "Failed to send Request. Buffering the request."; } HTTPRequestEvent request_event(move(req_sig), headers, move(body)); msg_buffer->bufferNewRequest(request_event, is_rejected); return genError("HTTP Request is buffered"); } Maybe sendMessage( bool get_reply, const string &body, Method method, const string &url, const string &headers, ErrorCB err_callback, bool should_yield, MessageTypeTag tag = MessageTypeTag::GENERIC) override { bool reuse_conns = getConfigurationWithDefault(true, "message", "Reuse connection"); if (agent_details->getOrchestrationMode() == OrchestrationMode::OFFLINE) { return genError("Agent is in offline mode and cannot communicate with the fog"); } if (tag_to_active_conn_key.find(tag) == tag_to_active_conn_key.end()) { if (!setActiveFog(tag)) { dbgWarning(D_COMMUNICATION) << "Connection to fog for tag " << tagToString(tag) << " does not exist."; return genError("Cannot send message to the Fog"); } reuse_conns = true; } MessageConnection &curr_conn = active_connections.at(tag_to_active_conn_key[tag]); if (!reuse_conns) { Maybe res = curr_conn.reconnect(); if(!res.ok()) { active_connections.erase(tag_to_active_conn_key[tag]); tag_to_active_conn_key.erase(tag); return genError( "Cannot send message after failure in establishing new connection with the fog: " + res.getErr() ); } } ErrorCB fog_server_err = [this, err_callback, &curr_conn] (HTTPStatusCode http_status_code) { bool is_server_error = ( http_status_code >= HTTPStatusCode::HTTP_INTERNAL_SERVER_ERROR && http_status_code <= HTTPStatusCode::HTTP_NETWORK_AUTHENTICATION_REQUIRED ); if (is_server_error) { if (last_fog_server_error == chrono::microseconds(0)) { last_fog_server_error = timer->getMonotonicTime(); } chrono::seconds dead_fog_timeout( getConfigurationWithDefault( 300, "message", "Internal Fog error timeout" ) ); if (last_fog_server_error + chrono::microseconds(dead_fog_timeout) < timer->getMonotonicTime()) { curr_conn.reconnect(); dbgWarning(D_COMMUNICATION) << "Restarting the Fog connection after Fog error persists for more than " << dead_fog_timeout.count() << " seconds"; last_fog_server_error == chrono::microseconds(0); } } if (err_callback != nullptr) err_callback(http_status_code); }; auto fog_res = sendMessage( curr_conn, get_reply, body, method, url, buildFogHeaders(headers), fog_server_err, should_yield ); if (fog_res.ok()) last_fog_server_error = chrono::microseconds(0); return fog_res; } Maybe sendMessage( bool get_reply, const string &body, Method method, const string &host, uint16_t port, Flags &conn_flags, const string &url, const string &headers, ErrorCB err_call_back, MessageTypeTag tag = MessageTypeTag::GENERIC) override { const MessageConnKey key = make_tuple(host, port, tag); bool is_one_time_conn = conn_flags.isSet(MessageConnConfig::ONE_TIME_CONN); bool is_secure_conn = conn_flags.isSet(MessageConnConfig::SECURE_CONN); bool is_external = conn_flags.isSet(MessageConnConfig::EXTERNAL); bool is_ssl_ignore_validation = conn_flags.isSet(MessageConnConfig::IGNORE_SSL_VALIDATION); auto reuse_conns = getConfigurationWithDefault(true, "message", "Reuse connection"); if (reuse_conns) { map::iterator conn_iter = active_connections.find(key); if (conn_iter != active_connections.end()) { return sendMessage(conn_iter->second, get_reply, body, method, url, headers, err_call_back); } } auto load_env_proxy = loadProxy(); if (!load_env_proxy.ok()) return genError(load_env_proxy.getErr()); Maybe conn = MessageConnection::startNewConnection( host, port, is_secure_conn, tag, false, is_external, is_ssl_ignore_validation ); if (!conn.ok()) return conn.passErr(); if (!is_one_time_conn) { active_connections.emplace(key, conn.unpackMove()); return sendMessage( active_connections.find(key)->second, get_reply, body, method, url, headers, err_call_back ); } MessageConnection active_conn = conn.unpackMove(); return sendMessage(active_conn, get_reply, body, method, url, headers, err_call_back); } string getAccessToken() override { return access_token; } Maybe getProxyDomain(ProxyProtocol protocol) const override { if (proxies.find(protocol) == proxies.end()) { return genError("Proxy type is not loaded in map, type: " + convertProxyProtocolToString(protocol)); } if (proxies.at(protocol).domain.empty()) return genError( convertProxyProtocolToString(protocol) + string(" proxy domain is unset") ); return proxies.at(protocol).domain; } Maybe getProxyCredentials(ProxyProtocol protocol) const override { if (proxies.find(protocol) == proxies.end()) { return genError("Proxy type is not loaded in map, type: " + convertProxyProtocolToString(protocol)); } if (proxies.at(protocol).auth.empty()) return genError( convertProxyProtocolToString(protocol) + string(" proxy auth is unset") ); return proxies.at(protocol).auth; } Maybe getProxyPort(ProxyProtocol protocol) const override { if (proxies.find(protocol) == proxies.end()) { return genError("Proxy type is not loaded in map, type: " + convertProxyProtocolToString(protocol)); } if (proxies.at(protocol).port == 0) return genError( convertProxyProtocolToString(protocol) + string(" proxy port is unset") ); return proxies.at(protocol).port; } bool getProxyExists(ProxyProtocol protocol) const override { if (proxies.find(protocol) == proxies.end()) { dbgInfo(D_COMMUNICATION) << "Proxy type is not loaded in map, type: " << convertProxyProtocolToString(protocol); return false; } return proxies.at(protocol).is_exists; } Maybe getProxyAddress(ProxyProtocol protocol) const override { if (proxies.find(protocol) == proxies.end()) { return genError("Proxy type is not loaded in map, type: " + convertProxyProtocolToString(protocol)); } if (proxies.at(protocol).protocol.empty() || proxies.at(protocol).domain.empty() || proxies.at(protocol).port == 0) { return genError( string("Can't construct ") + convertProxyProtocolToString(protocol) + string(" proxy address") ); } return proxies.at(protocol).protocol + "://" + proxies.at(protocol).domain + ":" + to_string(proxies.at(protocol).port); } Maybe loadProxy() override { if (getConfigurationFlag("orchestration-mode") == "offline_mode") return Maybe(); for (const auto &proxy_type : proxies) { auto loaded_proxy = loadProxyType(proxy_type.first); if (!loaded_proxy.ok()) return loaded_proxy; } return Maybe(); } Maybe loadProxyType(ProxyProtocol protocol) { dbgAssert(protocol == ProxyProtocol::HTTP || protocol == ProxyProtocol::HTTPS) << "Unsupported Proxy Protocol " << static_cast(protocol); static const map env_var_name = { {ProxyProtocol::HTTPS, "https_proxy"}, {ProxyProtocol::HTTP, "http_proxy"} }; auto env_proxy = loadProxyType(env_var_name.at(protocol)); if (!env_proxy.ok()) return genError(env_proxy.getErr()); if (env_proxy.unpack().empty()) { return Maybe(); } string protocol_regex = "(http|https)://"; const static boost::regex no_auth_proxy_regex(protocol_regex + "(.)*:[0-9]{0,5}(/|)"); const static boost::regex auth_proxy_regex(protocol_regex + "(.)*:(.)*@(.)*:[0-9]{0,5}(/|)"); ProxyData env_proxy_data; env_proxy_data.is_exists = true; string proxy_copy; if (!NGEN::Regex::regexMatch(__FILE__, __LINE__, env_proxy.unpack(), boost::regex(protocol_regex + "(.)*"))) { env_proxy = "http://" + env_proxy.unpack(); } proxy_copy.assign(env_proxy.unpack()); env_proxy_data.protocol = env_proxy.unpack().substr(0, proxy_copy.find(":")); proxy_copy.erase(0, proxy_copy.find(":") + 3); //remove "http://" or "https://" if (NGEN::Regex::regexMatch(__FILE__, __LINE__, env_proxy.unpack(), auth_proxy_regex)) { env_proxy_data.auth = string(&proxy_copy[0], &proxy_copy[proxy_copy.find("@")]); proxy_copy.erase(0, proxy_copy.find("@") + 1); // remove "user:pass@" } else if (!NGEN::Regex::regexMatch(__FILE__, __LINE__, env_proxy.unpack(), no_auth_proxy_regex)) { return genError(string("Provided proxy has wrong syntax: ") + env_proxy.unpack()); } env_proxy_data.domain = proxy_copy.substr(0, proxy_copy.find(":")); proxy_copy.erase(0, proxy_copy.find(":") + 1); // remove "host:" env_proxy_data.port = static_cast(stoi(proxy_copy)); auto proxy_syntax = verifyProxySyntax( env_proxy_data.protocol, env_proxy_data.auth, env_proxy_data.domain, to_string(env_proxy_data.port), env_proxy.unpack() ); if (!proxy_syntax.ok()) return proxy_syntax; if (env_proxy_data == proxies.at(protocol)) { return Maybe(); } proxies.at(protocol) = env_proxy_data; dbgInfo(D_COMMUNICATION) << convertProxyProtocolToString(protocol) << " proxy was successfully loaded, " << getProxyAddress(protocol).unpack(); return Maybe(); } private: Maybe sendMessage( MessageConnection &conn, bool get_reply, const string &body, Method method, const string &url, const string &headers, ErrorCB err_call_back, bool should_yield = true) { dbgDebug(D_COMMUNICATION) << "Sending a new message"; if (conn.getHost() == "") return genError("No host provided"); if (mainloop && should_yield) mainloop->yield(false); string full_url = conn.getHost() + url; if (method == Method::GET && cache.doesKeyExists(full_url)) return cache.getEntry(full_url); while (!conn.lock()) { mainloop->yield(true); } conn.setShouldYieldOnFailure(should_yield); Maybe response = sendHTTPRequest(conn, body, method, url, headers); conn.unlock(); if (response.ok()) { auto response_data = response->getResponse(); if (response_data.ok()) { if (get_reply && method == Method::GET) cache.emplaceEntry(full_url, response_data.unpack()); } else { if (err_call_back != nullptr) err_call_back(response->getStatusCode()); } return response_data; } number_of_send_failure += 1; dbgTrace(D_COMMUNICATION) << "Number of a failed attempt to send a message " << number_of_send_failure; return response.passErr(); } Maybe sendHTTPRequest( MessageConnection &conn, const string &body, Method method, const string &url, const string &headers) { auto maybe_data = buildHTTPRequest(method, url, headers, body, conn); if (!maybe_data.ok()) { return maybe_data.passErr(); } string data = maybe_data.unpack(); uint num_of_retries = 0; const uint max_retries = 2; while (num_of_retries < max_retries) { Maybe response = sendMessage(conn, data); if (response.ok()) return response; dbgDebug(D_COMMUNICATION) << "Failed to send HTTP request, trying to restart the connection. " << "Error: " << response.getErr(); Maybe connection_result = conn.reconnect(false); number_of_reconnects += 1; dbgTrace(D_COMMUNICATION) << "Number of an attempt to reconnect is " << number_of_reconnects; if (!connection_result.ok()) { number_of_reconnect_failures += 1; dbgTrace(D_COMMUNICATION) << "Number of a failed attempt to reconnect is " << number_of_reconnect_failures; return genError( string("Failed to reconnect after send request failure. Error: ") + connection_result.getErr() ); } dbgDebug(D_COMMUNICATION) << "Successfully reconnected after a failure to send a request."; num_of_retries++; } return genError("Failed to send an HTTP request, reached the maximum number of retries " + max_retries); } Maybe sendMessage(MessageConnection &conn, const string &data) { dbgTrace(D_COMMUNICATION) << "Acquiring connection lock. Connection: " << conn; if (!conn.isReady()) { dbgTrace(D_COMMUNICATION) << "Cannot send data over uninitialized connection"; return genError("Failed to send HTTP request. The connection is uninitialized."); } conn.waitForQueue(); auto release_queue_on_exit = make_scope_exit([&conn] () { conn.releaseQueue(); }); if (conn.sendData(data)) { return getHttpResponse(conn); } return genError("Failed to send HTTP request"); } Maybe verifyProxySyntax( const string &protocol, const string &auth, const string &domain, const string &port, const string &env_proxy) { stringstream verify_string; verify_string << protocol << "://" << (!auth.empty() ? auth + string("@") : "") << domain << ":" << port << (env_proxy.back() == '/' ? "/" : ""); if (env_proxy.compare(verify_string.str()) != 0) { return genError(string("Provided proxy has the wrong syntax:" ) + env_proxy); } return Maybe(); } Maybe loadProxyType(const string &proxy_type) { agent_details->readAgentDetails(); auto proxy_config = agent_details->getProxy(); if (proxy_config.ok()) { if (proxy_config.unpack() == "none") { return Maybe(string()); } return proxy_config; } #ifdef gaia I_ShellCmd *shell_cmd = Singleton::Consume::by(); auto proxy_ip = shell_cmd->getExecOutput("dbget proxy:ip-address| tr -d '\n'"); if (!proxy_ip.ok()) return proxy_ip; auto proxy_port = shell_cmd->getExecOutput("dbget proxy:port| tr -d '\n'"); if (!proxy_port.ok()) return proxy_port; if (*proxy_port != "" && *proxy_ip != "") return ("http://" + *proxy_ip + ":" + *proxy_port); const string umis_file_path(string(getenv("CPDIR")) + "/tmp/umis_objects.C"); { ifstream umis_file(umis_file_path.c_str()); if (!umis_file.good()) return Maybe(string()); } const string read_umis_cmd = "cat " + umis_file_path + " | grep -w \""; const string parse_value_command = "\" | awk -F \"[ \\t]+\" '{printf $NF}' | tr -d \"()\""; auto use_proxy = shell_cmd->getExecOutput(read_umis_cmd + "use_proxy" + parse_value_command); if (!use_proxy.ok()) return genError("Failed to read use_proxy from " + umis_file_path + ": " + use_proxy.getErr()); if (use_proxy.unpack() == "true") { auto umis_proxy_add = shell_cmd->getExecOutput(read_umis_cmd + "proxy_address" + parse_value_command); if (!umis_proxy_add.ok() || *umis_proxy_add == "") return umis_proxy_add; auto umis_proxy_port = shell_cmd->getExecOutput(read_umis_cmd + "proxy_port" + parse_value_command); if (!umis_proxy_port.ok() || *umis_proxy_port == "") return umis_proxy_port; return ("http://" + *umis_proxy_add + ":" + *umis_proxy_port); } else { dbgTrace(D_COMMUNICATION) << "Smart Console Proxy is turned off"; } return Maybe(string()); #else // not gaia char *proxy = getenv(proxy_type.c_str()); if (proxy) return string(proxy); proxy = getenv(boost::algorithm::to_upper_copy(proxy_type).c_str()); if (proxy) return string(proxy); return Maybe(string()); #endif // gaia } string base64Decode(const string &input) const { string out; vector T(256, -1); static const string base = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; for (int i = 0; i < 64; i++) { T[base[i]] = i; } int val = 0, val_balancer = -8; for (unsigned char c : input) { if (T[c] == -1) break; val = (val << 6) + T[c]; val_balancer += 6; if (val_balancer >= 0) { out.push_back(char((val >> val_balancer) & 0xFF)); val_balancer -= 8; } } return out; } void initSSL() { SSL_library_init(); SSL_load_error_strings(); OpenSSL_add_all_algorithms(); } Maybe getHttpResponse(MessageConnection &conn) { HTTPDecoder http_decoder(Method::GET); return conn.receiveResponse(http_decoder); } Maybe buildHTTPRequest( Method _method, const string &url, const string &headers, const string &body, const MessageConnection &conn) { HTTPRequest req; HTTPEncoder http_encoder(conn.getHost(), to_string(conn.getPort())); if (conn.isSecure()) http_encoder.isOverSSL(); if (conn.overProxy()) http_encoder.isOverProxy(); switch (_method) { case Method::GET: { req = http_encoder.Get(url); break; } case Method::POST: { req = http_encoder.Post(url); break; } case Method::PATCH: { req = http_encoder.Patch(url); break; } case Method::PUT: { req = http_encoder.Put(url); break; } case Method::CONNECT: { return http_encoder.Connect().toString(); } } if (!conn.isExternal() && !access_token.empty() && headers.find("Authorization") == std::string::npos) { req.insertHeader("Authorization", "Bearer " + access_token); } // Proxy-Authorization if (conn.overProxy() && !conn.isSecure()) { if (!MessageConnection::proxy_auth.empty()) { req.insertHeader( "Proxy-Authorization", "Basic " + encryptor->base64Encode(MessageConnection::proxy_auth) ); } else { return genError("Failed to authenticate on a proxy with empty token."); } } req.insertHeader("Content-Length", to_string(body.size())); req.insertHeader("Content-type: application/json"); req.insertHeader("Accept-Encoding: identity"); if(headers.find("Connection:") == std::string::npos){ req.insertHeader("Connection: keep-alive"); } req.insertHeaders(headers); req.insertBody(body); return req.toString(); } int handleBufferedEvents() { bool should_buffer_default = getProfileAgentSettingWithDefault( true, "eventBuffer.bufferFailedRequests" ); if (!getConfigurationWithDefault( should_buffer_default, "message", "Buffer Failed Requests" ) ) { return 0; } int count = 0; Maybe event = genError("empty"); while ((event = msg_buffer->peekRequest()).ok()) { dbgTrace(D_COMMUNICATION) << "Trying to send HTTPEvent " << event.unpack().getSignature(); bool is_rejected = false; // LCOV_EXCL_START Reason: We can't check it, since we don't control the response on ut yet ErrorCB fog_server_err = [&] (HTTPStatusCode http_status_code) mutable { is_rejected = http_status_code == HTTPStatusCode::HTTP_PAYLOAD_TOO_LARGE || http_status_code == HTTPStatusCode::HTTP_MULTI_STATUS || http_status_code == HTTPStatusCode::HTTP_BAD_REQUEST; }; // LCOV_EXCL_STOP auto maybe_method = stringToMethod(event.unpack().getMethod()); if (!maybe_method.ok()) { dbgTrace(D_COMMUNICATION) << "Failed to sent the buffered request. Error: " << maybe_method.getErr(); msg_buffer->popRequest(); count++; mainloop->yield(false); continue; } auto resp = sendMessage( false, event.unpack().getBody(), *maybe_method, event.unpack().getURL(), event.unpack().getHeaders(), fog_server_err, false, MessageTypeTag::BUFFERED_MESSAGES ); if (resp.ok()) { dbgTrace(D_COMMUNICATION) << "Successfully sent the buffered request" << event.unpack().getSignature(); msg_buffer->popRequest(); count++; mainloop->yield(false); } else { if (!is_rejected) { dbgWarning(D_COMMUNICATION) << "Failed to send HTTPEvent " << event.unpack().getSignature(); return count; } msg_buffer->popRequest(); msg_buffer->bufferNewRequest(*event, is_rejected); dbgWarning(D_COMMUNICATION) << "HTTPEvent " << event.unpack().getSignature() << " was rejected"; mainloop->yield(false); } } return count; } string convertProxyProtocolToString(ProxyProtocol proto) const { switch(proto) { case ProxyProtocol::HTTP: return "http"; case ProxyProtocol::HTTPS: return "https"; } dbgAssert(false) << "Unsupported Proxy Protocol " << static_cast(proto); return ""; } Maybe stringToMethod(const string &name) { if (name == "GET") return Method::GET; if (name == "POST") return Method::POST; if (name == "PATCH") return Method::PATCH; if (name == "CONNECT") return Method::CONNECT; if (name == "PUT") return Method::PUT; return genError("Cannot convert unknown HTTP method to Enum. Method name: " + name); } class ProxyData { public: bool operator==(const ProxyData &other) const { return protocol==other.protocol && domain==other.domain && is_exists==other.is_exists && port==other.port && auth==other.auth; } string protocol = ""; string domain = ""; string auth = ""; bool is_exists = false; uint16_t port = 0; }; bool is_proxy_configured_via_settings = false; uint64_t number_of_reconnects = 0; uint64_t number_of_reconnect_failures = 0; uint64_t number_of_send_failure = 0; I_AgentDetails *agent_details = nullptr; I_MainLoop *mainloop = nullptr; I_TimeGet *timer = nullptr; I_Encryptor *encryptor = nullptr; I_MessagingBuffer *msg_buffer = nullptr; map active_connections; map tag_to_active_conn_key; map proxies; ProxyProtocol proxy_protocol; string access_token; TemporaryCache cache; static const map proxyProtocolToString; set pending_signatures; chrono::microseconds last_fog_server_error = chrono::microseconds(0); MessageQueueMetric message_queue_metric; string filesystem_prefix = ""; }; MessageConnection::MessageConnection(MessageConnection &&other) : is_secure(other.is_secure), is_external(other.is_external), over_proxy(other.over_proxy), host(other.host), pinned_cert_pub_key(other.pinned_cert_pub_key), port_num(other.port_num), current_messaging_queue_pos(other.current_messaging_queue_pos), available_messaging_queue_pos(other.available_messaging_queue_pos), tag(other.tag), is_ssl_ignore_validation(other.is_ssl_ignore_validation), connection_closed_count(other.connection_closed_count), ssl_socket(move(other.ssl_socket)), ssl_ctx(move(other.ssl_ctx)), bio(move(other.bio)) { other.ssl_socket = nullptr; other.ssl_ctx = nullptr; other.bio = nullptr; } bool MessageConnection::lock() { if (connection_lock) return false; connection_lock = true; dbgTrace(D_COMMUNICATION) << "The connection lock was taken. Connection: " << this; return true; } bool MessageConnection::unlock() { if (!connection_lock) return false; connection_lock = false; dbgTrace(D_COMMUNICATION) << "The connection lock was released. Connection: " << *this; return true; } // LCOV_EXCL_START Reason: No proxy for ut bool MessageConnection::shouldIgnoreSslValidation() const { if(is_ssl_ignore_validation) { dbgTrace(D_COMMUNICATION) << "Ignoring SSL validation"; return true; } bool ignore_ssl_validation = getProfileAgentSettingWithDefault( false, "agent.config.message.ignoreSslValidation"); if (ignore_ssl_validation) { dbgTrace(D_COMMUNICATION) << "ignoreSslValidation: " << (ignore_ssl_validation ? "true, Ignoring ssl validation of the current connection" : "false"); } return ignore_ssl_validation; } // LCOV_EXCL_START Reason: No proxy for ut bool MessageConnection::verifyCert() { dbgFlow(D_COMMUNICATION); BioUniquePtr cert = BioUniquePtr(SSL_get_peer_certificate(ssl_socket)); // In this case cert returned null from SSL_get_peer_certificate if (shouldIgnoreSslValidation()) return true; if (cert.get() == nullptr) { dbgWarning(D_COMMUNICATION) << "Server did not provide a certificate during handshake"; return false; } // Verify the result of chain verification int res = SSL_get_verify_result(ssl_socket); if (res != X509_V_OK) { dbgWarning(D_COMMUNICATION) << "Failed to verify server certificate. OpenSSL error: " << string(ERR_error_string(res, nullptr)) << ", OpenSSL error code: " << res; return false; } auto verify_pining_required = getConfigurationWithDefault(false, "message", "Verify SSL pinning"); if (verify_pining_required && !verifyCertPinning(cert)) { dbgWarning(D_COMMUNICATION) << "Couldn't verify server public certificate (pinning)"; return false; } return true; } Maybe MessageConnection::calculatePublicKey(const BioUniquePtr &cert) const { if (cert.get() == nullptr) return genError("Certificate is null"); BioUniquePtr outbio = BioUniquePtr(BIO_new(BIO_s_mem())); BioUniquePtr pkey = BioUniquePtr(X509_get_pubkey(cert.get())); if (pkey.get() == nullptr) { return genError("Error getting public key from certificate"); } if(!PEM_write_bio_PUBKEY(outbio.get(), pkey.get())) { return genError("Error writing public key data in PEM format"); } char *buf; size_t len = BIO_get_mem_data(outbio.get(), &buf); dbgTrace(D_COMMUNICATION) << "Provide public key has been loaded"; return move(string(buf, len)); } Maybe MessageConnection::getPinnedCertificate() { if (!pinned_cert_pub_key.empty()) return pinned_cert_pub_key; filesystem_prefix = getFilesystemPathConfig(); dbgTrace(D_COMMUNICATION) << "MessageConnection, file systen prefix: " << filesystem_prefix << endl; string public_key_path = getConfigurationWithDefault( filesystem_prefix + "/certs/public-key.pem", "message", "Public key path" ); dbgTrace(D_COMMUNICATION) << "Load public key path. Path: " << public_key_path; ifstream pinned_public_file(public_key_path); if (!pinned_public_file.is_open()) { return genError("Failed to open pinned public key file"); } stringstream pinned_public_key_steam; pinned_public_key_steam << pinned_public_file.rdbuf(); pinned_cert_pub_key = pinned_public_key_steam.str(); dbgTrace(D_COMMUNICATION) << "Pinned public key has been loaded"; return pinned_cert_pub_key; } bool MessageConnection::verifyCertPinning(const BioUniquePtr &cert) { dbgFlow(D_COMMUNICATION); if (cert.get() == nullptr) { dbgWarning(D_COMMUNICATION) << "Certificate is missing"; return false; } auto public_key = calculatePublicKey(cert); if (!public_key.ok()) { dbgWarning(D_COMMUNICATION) << "The provided public key is not valid. Error: " << public_key.getErr(); return false; } auto pinned_key = getPinnedCertificate(); if (!pinned_key.ok()) { dbgWarning(D_COMMUNICATION) << "The pinned public key is not valid. Error: " << pinned_key.getErr(); return false; } if(public_key.unpackMove().compare(pinned_key.unpack()) != 0) { dbgWarning(D_COMMUNICATION) << "The provided public key and the pinned public key are diffrent"; return false; } dbgTrace(D_COMMUNICATION) << "The provided public key is valid"; return true; } Maybe MessageConnection::doHandshake(const BioUniquePtr &bio) { auto timeout = chrono::microseconds( getConfigurationWithDefault(500000, "message", "Connection handshake timeout") ); auto end_time = timer->getMonotonicTime() + timeout; while (timer->getMonotonicTime() < end_time) { if (BIO_do_handshake(bio.get()) > 0 || shouldIgnoreSslValidation()) { return Maybe(); } if (!BIO_should_retry(bio.get())) { unsigned long ssl_err = ERR_get_error(); return genError( "Failed to obtain a successful SSL handshake. OpenSSL error: " + string(ERR_error_string(ssl_err, nullptr)) + ", OpenSSL error code: " + to_string(ssl_err) ); } } return genError("SSL handshake timed out"); } bool MessageConnection::setCnVerification() { SSL_set_hostflags(ssl_socket, X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS); return SSL_set1_host(ssl_socket, host.c_str()) != 0; } bool MessageConnection::encrypt() { if (ssl_ctx.get() == nullptr) { dbgWarning(D_COMMUNICATION) << "SSL context does not exist"; return false; } BioUniquePtr s_bio = BioUniquePtr(BIO_new_ssl(ssl_ctx.get(), 1)); if (s_bio.get() == nullptr) { dbgWarning(D_COMMUNICATION) << "Failed to create encrypted BIO socket"; return false; } bio = BioUniquePtr(BIO_push(s_bio.release(), bio.release())); BIO_get_ssl(bio.get(), &ssl_socket); if (!ssl_socket) { dbgWarning(D_COMMUNICATION) << "Failed to locate SSL pointer"; return false; } if (!setCnVerification()) { dbgWarning(D_COMMUNICATION) << "Failed to set host name (CN) verification"; return false; } auto handshake_result = doHandshake(bio); if (!handshake_result.ok()) { dbgWarning(D_COMMUNICATION) << handshake_result.getErr(); return false; } if (!verifyCert()) { dbgWarning(D_COMMUNICATION) << "Failed to verify the certificate"; return false; } dbgTrace(D_COMMUNICATION) << "Successfully secured BIO socket for connection " << *this; return true; } // LCOV_EXCL_STOP bool MessageConnection::setSocket() { auto is_secure_conn = (is_secure && !over_proxy); bio = is_secure_conn ? BioUniquePtr(BIO_new_ssl_connect(ssl_ctx.get())) : BioUniquePtr(BIO_new(BIO_s_connect())); if (!bio.get()) { dbgWarning(D_COMMUNICATION) << "Failed to create new " << (is_secure_conn ? "secure" : "clear") << " BIO connection"; return false; } if (is_secure_conn) { BIO_get_ssl(bio.get(), &ssl_socket); if (!ssl_socket) { dbgWarning(D_COMMUNICATION) << "Failed to locate SSL pointer"; return false; } SSL_set_mode(ssl_socket, SSL_MODE_AUTO_RETRY); if (!setCnVerification()) { dbgWarning(D_COMMUNICATION) << "Failed to set host name (CN) verification"; return false; } if (!SSL_set_tlsext_host_name(ssl_socket, host.c_str())) { dbgWarning(D_COMMUNICATION) << "Failed to set TLS host name extension (SNI)"; return false; } } return true; } bool MessageConnection::connect(const string &host, const string &overwrite_port) { string address = host + ":" + overwrite_port; BIO_set_conn_hostname(bio.get(), address.c_str()); BIO_set_nbio(bio.get(), 1); auto timer = Singleton::Consume::by(); auto conn_timeout = chrono::microseconds(getConnectionTimeout()); auto end_time = timer->getMonotonicTime() + conn_timeout; int counter = 0; while (timer->getMonotonicTime() < end_time) { counter++; if (BIO_do_connect(bio.get()) > 0) { dbgDebug(D_COMMUNICATION) << "Successfully established new BIO connection. " << "Number of attempts: " << counter; if (is_secure && !over_proxy) { auto handshake_result = doHandshake(bio); if (!handshake_result.ok()) { dbgWarning(D_COMMUNICATION) << handshake_result.getErr(); return false; } return verifyCert(); } return true; } if (!BIO_should_retry(bio.get())) { int bio_err = ERR_get_error(); dbgWarning(D_COMMUNICATION) << "Failed completely to establish new BIO connection (BIO won't retry!)." << "trying next address. OpenSSL error: " << string(ERR_error_string(bio_err, nullptr)) << ", OpenSSL error code: " << bio_err << ", Number of attempts: " << counter; return false; } if (mainloop != nullptr && (counter % 10) == 0) mainloop->yield(true); } dbgWarning(D_COMMUNICATION) << "Failed to establish new connection after reaching timeout. " << "address: " << address << ", Number of attempts: " << counter; return false; } template Maybe MessageConnection::receiveResponse(I_MessageDecoder &decoder) { auto end_time = timer->getMonotonicTime() + chrono::microseconds(getConnectionTimeout()); uint counter = 0; char buf[1000]; while (timer->getMonotonicTime() < end_time) { int len_or_error_ret_val = BIO_read(bio.get(), buf, sizeof(buf) - 1); if (len_or_error_ret_val <= 0) { if (!BIO_should_retry(bio.get())) { if (len_or_error_ret_val == 0) { if (connection_closed_count == 1) { dbgWarning(D_COMMUNICATION) << "Connection closed. Type: " << tagToString(tag) << ", Count: " << connection_closed_count; } else { dbgDebug(D_COMMUNICATION) << "Connection closed. Type: " << tagToString(tag) << ", Count: " << connection_closed_count; } auto maybe_message = decoder.decodeBytes(string()); if (maybe_message.ok()) { return maybe_message.unpackMove(); } } if (connection_closed_count == 1) { dbgWarning(D_COMMUNICATION) << "Failed to read data from BIO socket. Type: " << tagToString(tag) << ", Count: " << connection_closed_count << ", Error code: " << len_or_error_ret_val; } else { dbgDebug(D_COMMUNICATION) << "Failed to read data from BIO socket. Type: " << tagToString(tag) << ", Count: " << connection_closed_count << ", Error code: " << len_or_error_ret_val; } connection_closed_count++; return genError("Error reading from BIO socket"); } if (mainloop != nullptr) mainloop->yield(true); continue; } if (connection_closed_count > 0) { dbgInfo(D_COMMUNICATION) << "Connection was reconnected. Type: " << tagToString(tag) << ", number of attempts: " << connection_closed_count; connection_closed_count = 0; } string data = string(buf, len_or_error_ret_val); dbgTrace(D_HTTP_REQUEST) << "Received the following data:\n" << data; auto maybe_message = decoder.decodeBytes(data); if (maybe_message.ok()) { return maybe_message.unpackMove(); } if (mainloop != nullptr && (counter++ % 5) == 0) mainloop->yield(true); } dbgWarning(D_COMMUNICATION) << "Failed to receive data after reaching timeout"; return genError("Reading took too long"); } Maybe MessageConnection::startNewConnection( const string &host, const uint16_t port_num, const bool is_secure, MessageTypeTag tag, const bool over_proxy, bool is_external, bool is_ssl_ignore_validation ) { MessageConnection conn = MessageConnection( host, port_num, is_secure, over_proxy, is_external, tag, is_ssl_ignore_validation ); Maybe conn_res = conn.establishConnection(); if (!conn_res.ok()) return conn_res.passErr(); dbgTrace(D_COMMUNICATION) << "Started new connection for tag: " << tagToString(tag); return move(conn); } MessageConnection::~MessageConnection() {} bool MessageConnection::setCtx() { if (!is_secure) return true; ssl_ctx = BioUniquePtr(SSL_CTX_new(TLS_client_method())); if (ssl_ctx.get() == nullptr) { dbgWarning(D_COMMUNICATION) << "Failed to initialize SSL context"; return false; } if (shouldIgnoreSslValidation()) return true; SSL_CTX_set_verify(ssl_ctx.get(), SSL_VERIFY_PEER, nullptr); filesystem_prefix = getFilesystemPathConfig(); dbgTrace(D_COMMUNICATION) << "MessageConnection, file systen prefix: " << filesystem_prefix << endl; string cert_file_path = getConfigurationWithDefault( filesystem_prefix + "/certs/fog.pem", "message", "Certificate chain file path" ); string openssl_dir = "/usr/lib/ssl/certs/"; auto openssl_dir_maybe = Singleton::Consume::by()->getOpenSSLDir(); if (openssl_dir_maybe.ok()) openssl_dir = openssl_dir_maybe.unpack(); auto trusted_ca_directory = getConfigurationWithDefault( openssl_dir, "message", "Trusted CA directory" ); const char *ca_dir_path = nullptr; if (!trusted_ca_directory.empty()) { ca_dir_path = trusted_ca_directory.c_str(); } if (SSL_CTX_load_verify_locations(ssl_ctx.get(), cert_file_path.c_str(), ca_dir_path) == 1) { return true; } dbgWarning(D_COMMUNICATION) << "Failed to load fog's certificate file. Path: " << cert_file_path; return false; } bool MessageConnection::isReady() const { dbgFlow(D_COMMUNICATION); if (!bio.get()) { dbgTrace(D_COMMUNICATION) << "Bio is uninitialized"; return false; } if (!is_secure) return true; if (!ssl_socket) { dbgTrace(D_COMMUNICATION) << "SSL socket is uninitialized"; return false; } if (!ssl_ctx.get()) { dbgTrace(D_COMMUNICATION) << "SSL context is uninitialized"; return false; } return true; } string MessageConnection::printData(const string &data) { auto type = getConfigurationWithDefault("chopped", "message", "Data printout type"); if (type == "chopped") return data.substr(0, 10) + (data.size() > 10 ? " ..." : ""); if (type == "full") return data; if (type == "size") return to_string(data.size()) + " bytes"; if (type == "none") return ""; dbgWarning(D_COMMUNICATION) << "Unknown data printout option '" << type << "' - going with 'chopped' instead."; return data.substr(0, 10) + (data.size() > 10 ? " ..." : ""); } uint MessageConnection::getConnectionTimeout() { I_Environment *environment = Singleton::Consume::by(); auto tmo_override = environment->get("Connection timeout Override"); uint conf_tmo = tmo_override.ok() ? *tmo_override : getConfigurationWithDefault(2000000, "message", "Connection timeout"); uint profile_setting_tmo = getProfileAgentSettingWithDefault( conf_tmo, "agent.config.message.connectionTimeout" ); auto executable = environment->get("Service Name"); auto nano_service_name_tmo = getProfileAgentSetting("agent.config.message.connectionTimeoutServiceName"); if (!nano_service_name_tmo.ok() || !executable.ok()) { dbgTrace(D_COMMUNICATION) << "Could not identify service name. Executable env state: " << (executable.ok() ? "true" : "false") << ", state of nano service name from settings: " << (nano_service_name_tmo.ok() ? "true" : "false") << ", timeout value to use: " << conf_tmo; return conf_tmo; } if (*nano_service_name_tmo == *executable) { dbgTrace(D_COMMUNICATION) << "Using profile setting for specific nano service. " << " nano service name: " << *nano_service_name_tmo << ", timeout value used: " << profile_setting_tmo; return profile_setting_tmo; } dbgTrace(D_COMMUNICATION) << "Using non profile config setting for nano service. " << " profile configuration for nano service name: " << *nano_service_name_tmo << ", actual service name: " << *executable << ", timeout value used: " << conf_tmo; return conf_tmo; } bool MessageConnection::sendData(const string &data) const { dbgTrace(D_HTTP_REQUEST) << "Sending the following data " << *this << ":\n" << printData(data); auto end_time = timer->getMonotonicTime() + chrono::microseconds(getConnectionTimeout()); uint counter = 0; int remaining_data_len = data.length(); while (timer->getMonotonicTime() < end_time) { int offset = data.length() - remaining_data_len; int data_sent_len = BIO_write(bio.get(), data.c_str() + offset, remaining_data_len); if (data_sent_len > 0) { if (remaining_data_len - data_sent_len < 0) { dbgWarning(D_COMMUNICATION) << "Sent data length exceeded actual data length (" << to_string(data_sent_len) << " > " << to_string(remaining_data_len) << ")"; return false; } dbgTrace(D_COMMUNICATION) << "Successfully sent " << to_string(data_sent_len) << " bytes of data out of total " << to_string(data.length()) << " bytes."; remaining_data_len -= data_sent_len; if (remaining_data_len == 0) return true; if (mainloop != nullptr && (counter++ % 5) == 0) mainloop->yield(true); continue; } if(!BIO_should_retry(bio.get())) { dbgWarning(D_COMMUNICATION) << "Failed to Write data into BIO socket. Error code: " << data_sent_len; return false; } dbgTrace(D_COMMUNICATION) << "Temporarily cannot send data. Will retry."; if (mainloop != nullptr) mainloop->yield(true); } dbgWarning(D_COMMUNICATION) << "Failed to send data after reaching timeout"; return false; } ostream & MessageConnection::print(ostream &os) const { os << "<" << host << ":" << port_num << " over " << (is_secure? "secure" : "clear") << " socket>"; return os; } ostream & operator<<(ostream &os, const MessageConnection &conn) { return conn.print(os); } void MessageConnection::getIpFromHostname(const string &hostname) { struct addrinfo *servinfo = nullptr; auto __scope_exit = make_scope_exit([&servinfo] () { if (servinfo) freeaddrinfo(servinfo); }); struct addrinfo hints; memset (&hints, 0, sizeof (hints)); hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags |= AI_CANONNAME; hints.ai_protocol = IPPROTO_TCP; vector res; if (IPAddr::isValidIPAddr(hostname)) { dbgDebug(D_COMMUNICATION) << "Provided host name is already an IP address. Host: " << hostname; current_ips.clear(); current_ips.push_back(hostname); return; } if (getaddrinfo(hostname.c_str(), nullptr, &hints, &servinfo) != 0) { dbgWarning(D_COMMUNICATION) << "IP address was not found for the given host name. Host: " << hostname; return; } for(struct addrinfo *addr_iter = servinfo; addr_iter != nullptr; addr_iter = addr_iter->ai_next) { char buf[INET6_ADDRSTRLEN]; const char *formatted_addr; if (addr_iter->ai_addr->sa_family == AF_INET) { struct in_addr addr = reinterpret_cast(addr_iter->ai_addr)->sin_addr; formatted_addr = inet_ntop(AF_INET, &addr, buf, sizeof(buf)); } else if (addr_iter->ai_addr->sa_family == AF_INET6) { struct in6_addr addr = reinterpret_cast(addr_iter->ai_addr)->sin6_addr; formatted_addr = inet_ntop(AF_INET6, &addr, buf, sizeof(buf)); } else { continue; } res.push_back(string(formatted_addr)); dbgDebug(D_COMMUNICATION) << "Successfully resolved host name to IP address. Host: " << hostname << ", IP: " << res.back(); } if (res.empty()) { dbgWarning(D_COMMUNICATION) << "No IPv4 / IPv6 addresses were found for the given host. Host: " << hostname; return; } current_ips = res; return; } Maybe MessageConnection::establishConnection() { if (!setCtx()) return genError("Failed to initialize SSL context"); dbgDebug(D_COMMUNICATION) << "Succesfully initialized SSL context"; if (!setSocket()) return genError("Failed to create new socket"); dbgDebug(D_COMMUNICATION) << "Succesfully created new socket"; string conn_host = over_proxy ? proxy_host : host; string conn_port = over_proxy ? to_string(proxy_port) : to_string(port_num); getIpFromHostname(conn_host); Maybe is_connected = genError("Failed to establish new connection with: " + conn_host + ":" + conn_port); for (const string &address : current_ips) { if (is_connected.ok()) break; dbgDebug(D_COMMUNICATION) << "Trying to connect to " << address << ":" << conn_port; if (!connect(address, conn_port)) { dbgWarning(D_COMMUNICATION) << "Failed to connect " << address << ":" << conn_port; continue; } is_connected = (over_proxy && is_secure) ? establishConnectionOverProxy() : Maybe(); if (!is_connected.ok()) { dbgWarning(D_COMMUNICATION) << "Failed to connect " << address << ":" << conn_port; continue; } dbgDebug(D_COMMUNICATION) << "Successfully connected to " << address << ":" << conn_port; } return is_connected; } // LCOV_EXCL_START Reason: No proxy for ut Maybe MessageConnection::establishConnectionOverProxy() { HTTPEncoder http_encoder(host, to_string(port_num)); HTTPRequest req = http_encoder.Connect(); if (!proxy_auth.empty()) req.insertHeader("Proxy-Authorization", "Basic " + encryptor->base64Encode(proxy_auth)); waitForQueue(); auto release_queue_on_exit = make_scope_exit([this] () { releaseQueue(); }); if (!sendData(req.toString())) { return genError("Failed to send CONNECT request to proxy"); } HTTPDecoder http_decoder(I_Messaging::Method::CONNECT); Maybe response = receiveResponse(http_decoder); if (!response.ok()) { return genError("Failed to receive a response from proxy"); } auto response_data = response->getResponse(); if (!response_data.ok()) { return genError("Failed to connect via proxy"); } if (!encrypt()) { return genError("Failed to encrypt the socket after the CONNECT request"); } return Maybe(); } // LCOV_EXCL_STOP Maybe MessageConnection::reconnect(bool should_lock) { if (should_lock) { while (!lock()) { mainloop->yield(true); } } auto res = establishConnection(); if (should_lock) { unlock(); } return res; } void MessageConnection::waitForQueue() { dbgTrace(D_COMMUNICATION) << "Pending queue position"; while (available_messaging_queue_pos == UINT64_MAX) { mainloop->yield(true); } uint64_t messaging_queue_pos = available_messaging_queue_pos++; dbgTrace(D_COMMUNICATION) << "Received an available queue position: " << messaging_queue_pos; metrics_current_size++; MessageQueueEvent queue_event; queue_event.setMessageQueueSize(metrics_current_size); queue_event.notify(); while (messaging_queue_pos != current_messaging_queue_pos) { mainloop->yield(true); } dbgTrace(D_COMMUNICATION) << "Reached the current queue position: " << messaging_queue_pos; return; } void MessageConnection::releaseQueue() { dbgTrace(D_COMMUNICATION) << "Released the queue position " << current_messaging_queue_pos; current_messaging_queue_pos++; if (current_messaging_queue_pos == UINT64_MAX) { current_messaging_queue_pos = 0; available_messaging_queue_pos = 0; } metrics_current_size--; dbgTrace(D_COMMUNICATION) << "Queue position was advanced"; } ProtoMessageComp::ProtoMessageComp() : Component("ProtoMessageComp"), pimpl(make_unique()) {} ProtoMessageComp::~ProtoMessageComp() {} void ProtoMessageComp::init() { pimpl->init(); } void ProtoMessageComp::fini() { pimpl->fini(); } void ProtoMessageComp::preload() { registerExpectedConfiguration("message", "Cache timeout"); registerExpectedConfiguration("message", "Connection timeout"); registerExpectedConfiguration("message", "send event retry in sec"); registerExpectedConfiguration("message", "Reuse connection"); registerExpectedConfiguration("message", "Verify SSL pinning"); registerExpectedConfiguration("message", "Buffer Failed Requests"); registerExpectedConfiguration("message", "Certificate chain file path"); registerExpectedConfiguration("message", "Trusted CA directory"); registerExpectedConfiguration("message", "Public key path"); registerExpectedConfiguration("message", "Metrics Routine Interval"); registerExpectedConfiguration("message", "Data printout type"); registerExpectedConfiguration("message", "Internal Fog error timeout"); }