Aug_23_2023-Dev

This commit is contained in:
Ned Wright
2023-08-23 14:15:32 +00:00
parent 702c1184ea
commit b25fd8def5
115 changed files with 8292 additions and 1189 deletions

View File

@@ -216,10 +216,6 @@ public:
void
init()
{
proxies = {
{ProxyProtocol::HTTP, ProxyData()},
{ProxyProtocol::HTTPS, ProxyData()}
};
initSSL();
timer = Singleton::Consume<I_TimeGet>::by<ProtoMessageComp>();
encryptor = Singleton::Consume<I_Encryptor>::by<ProtoMessageComp>();
@@ -227,8 +223,8 @@ public:
msg_buffer = Singleton::Consume<I_MessagingBuffer>::by<ProtoMessageComp>();
MessageConnection::timer = Singleton::Consume<I_TimeGet>::by<ProtoMessageComp>();
agent_details = Singleton::Consume<I_AgentDetails>::by<ProtoMessageComp>();
proxy_configuration = Singleton::Consume<I_ProxyConfiguration>::by<ProtoMessageComp>();
agent_details->readAgentDetails();
loadAccessToken();
if (!setActiveFog()) {
dbgDebug(D_COMMUNICATION) << "Could not initialize active fog connection";
@@ -240,42 +236,6 @@ public:
auto cache_timeout = getConfigurationWithDefault<int>(2, "message", "Cache timeout");
cache.startExpiration(seconds(cache_timeout), mainloop, timer);
auto proxy_config = getProfileAgentSetting<string>("agent.config.message.proxy");
if (proxy_config.ok()) {
agent_details->setProxy(*proxy_config);
agent_details->writeAgentDetails();
}
registerConfigLoadCb(
[&]()
{
auto proxy_config = getProfileAgentSetting<string>("agent.config.message.proxy");
if (proxy_config.ok()) {
is_proxy_configured_via_settings = true;
agent_details->setProxy(*proxy_config);
agent_details->writeAgentDetails();
} else if (is_proxy_configured_via_settings) {
is_proxy_configured_via_settings = false;
agent_details->setProxy(string(""));
agent_details->writeAgentDetails();
}
}
);
auto load_env_proxy = loadProxy();
if (!load_env_proxy.ok()) {
dbgDebug(D_COMMUNICATION)
<< "Could not initialize load proxy from environment, Error: "
<< load_env_proxy.getErr();
}
Singleton::Consume<I_MainLoop>::by<ProtoMessageComp>()->addRecurringRoutine(
I_MainLoop::RoutineType::System,
seconds(60),
[this] () { loadAccessToken(); },
"Load access token"
);
auto metrics_debugs_interval =
chrono::seconds(getConfigurationWithDefault<uint64_t>(
600,
@@ -324,30 +284,6 @@ public:
MessageConnection::timer = nullptr;
}
void
loadAccessToken()
{
filesystem_prefix = getFilesystemPathConfig();
dbgTrace(D_COMMUNICATION) << "ProtoMessageComp, file systen prefix: " << filesystem_prefix << endl;
auto data_path = getConfigurationWithDefault<string>(
filesystem_prefix + "/data/",
"encryptor",
"Data files directory"
);
ifstream token_file(data_path + session_token_file_name);
if (!token_file.is_open()) {
dbgWarning(D_COMMUNICATION) << "Failed to open session token file";
return;
}
stringstream token_steam;
token_steam << token_file.rdbuf();
auto new_token = token_steam.str();
if (access_token != new_token) {
access_token = new_token;
dbgTrace(D_COMMUNICATION) << "Loaded the new token";
}
}
// LCOV_EXCL_START Reason: No proxy for ut
void
setFogProxy(const string &host, const uint16_t port, ProxyProtocol proto)
@@ -355,7 +291,7 @@ public:
dbgTrace(D_COMMUNICATION) << "Proxy was set. Proxy: " << host << ":" << port;
MessageConnection::proxy_host = host;
MessageConnection::proxy_port = port;
auto proxy_auth = getProxyCredentials(proto);
auto proxy_auth = proxy_configuration->getProxyCredentials(proto);
if (proxy_auth.ok()) {
MessageConnection::proxy_auth = proxy_auth.unpack();
}
@@ -368,23 +304,23 @@ public:
MessageConnKey fog_key = make_tuple("fog", 0, tag);
proxy_protocol = is_secure ? ProxyProtocol::HTTPS : ProxyProtocol::HTTP;
auto load_env_proxy = loadProxy();
auto load_env_proxy = proxy_configuration->loadProxy();
if (!load_env_proxy.ok()) {
dbgDebug(D_COMMUNICATION)
<< "Could not initialize load proxy from environment, Error: "
<< load_env_proxy.getErr();
}
if (getProxyExists(proxy_protocol)) {
auto proxy_host = getProxyDomain(proxy_protocol);
auto proxy_port = getProxyPort(proxy_protocol);
if (proxy_configuration->getProxyExists(proxy_protocol)) {
auto proxy_host = proxy_configuration->getProxyDomain(proxy_protocol);
auto proxy_port = proxy_configuration->getProxyPort(proxy_protocol);
if (proxy_host.ok() && proxy_port.ok()) {
setFogProxy(proxy_host.unpack(), proxy_port.unpack(), proxy_protocol);
}
}
Maybe<MessageConnection> conn = MessageConnection::startNewConnection(
host, port, is_secure, tag, getProxyExists(proxy_protocol)
host, port, is_secure, tag, proxy_configuration->getProxyExists(proxy_protocol)
);
if (!conn.ok()) {
dbgWarning(D_COMMUNICATION)
@@ -403,7 +339,7 @@ public:
<< ":"
<< port
<< " via "
<< (getProxyExists(proxy_protocol) ? "proxy, using " : "")
<< (proxy_configuration->getProxyExists(proxy_protocol) ? "proxy, using " : "")
<< (is_secure ? "secure" : "clear")
<< " connection";
@@ -652,7 +588,7 @@ public:
return sendMessage(conn_iter->second, get_reply, body, method, url, headers, err_call_back);
}
}
auto load_env_proxy = loadProxy();
auto load_env_proxy = proxy_configuration->loadProxy();
if (!load_env_proxy.ok()) return genError(load_env_proxy.getErr());
Maybe<MessageConnection> conn = MessageConnection::startNewConnection(
@@ -682,153 +618,6 @@ public:
return sendMessage(active_conn, get_reply, body, method, url, headers, err_call_back);
}
string getAccessToken() override
{
return access_token;
}
Maybe<string>
getProxyDomain(ProxyProtocol protocol) const override
{
if (proxies.find(protocol) == proxies.end()) {
return genError("Proxy type is not loaded in map, type: " + convertProxyProtocolToString(protocol));
}
if (proxies.at(protocol).domain.empty()) return genError(
convertProxyProtocolToString(protocol) + string(" proxy domain is unset")
);
return proxies.at(protocol).domain;
}
Maybe<string>
getProxyCredentials(ProxyProtocol protocol) const override
{
if (proxies.find(protocol) == proxies.end()) {
return genError("Proxy type is not loaded in map, type: " + convertProxyProtocolToString(protocol));
}
if (proxies.at(protocol).auth.empty()) return genError(
convertProxyProtocolToString(protocol) + string(" proxy auth is unset")
);
return proxies.at(protocol).auth;
}
Maybe<uint16_t>
getProxyPort(ProxyProtocol protocol) const override
{
if (proxies.find(protocol) == proxies.end()) {
return genError("Proxy type is not loaded in map, type: " + convertProxyProtocolToString(protocol));
}
if (proxies.at(protocol).port == 0) return genError(
convertProxyProtocolToString(protocol) + string(" proxy port is unset")
);
return proxies.at(protocol).port;
}
bool
getProxyExists(ProxyProtocol protocol) const override
{
if (proxies.find(protocol) == proxies.end()) {
dbgInfo(D_COMMUNICATION)
<< "Proxy type is not loaded in map, type: "
<< convertProxyProtocolToString(protocol);
return false;
}
return proxies.at(protocol).is_exists;
}
Maybe<string>
getProxyAddress(ProxyProtocol protocol) const override
{
if (proxies.find(protocol) == proxies.end()) {
return genError("Proxy type is not loaded in map, type: " + convertProxyProtocolToString(protocol));
}
if (proxies.at(protocol).protocol.empty() ||
proxies.at(protocol).domain.empty() ||
proxies.at(protocol).port == 0) {
return genError(
string("Can't construct ") +
convertProxyProtocolToString(protocol) +
string(" proxy address")
);
}
return proxies.at(protocol).protocol +
"://" +
proxies.at(protocol).domain +
":" +
to_string(proxies.at(protocol).port);
}
Maybe<void>
loadProxy() override
{
if (getConfigurationFlag("orchestration-mode") == "offline_mode") return Maybe<void>();
for (const auto &proxy_type : proxies) {
auto loaded_proxy = loadProxyType(proxy_type.first);
if (!loaded_proxy.ok()) return loaded_proxy;
}
return Maybe<void>();
}
Maybe<void>
loadProxyType(ProxyProtocol protocol)
{
dbgAssert(protocol == ProxyProtocol::HTTP || protocol == ProxyProtocol::HTTPS)
<< "Unsupported Proxy Protocol " << static_cast<int>(protocol);
static const map<ProxyProtocol, string> env_var_name = {
{ProxyProtocol::HTTPS, "https_proxy"},
{ProxyProtocol::HTTP, "http_proxy"}
};
auto env_proxy = loadProxyType(env_var_name.at(protocol));
if (!env_proxy.ok()) return genError(env_proxy.getErr());
if (env_proxy.unpack().empty()) {
return Maybe<void>();
}
string protocol_regex = "(http|https)://";
const static boost::regex no_auth_proxy_regex(protocol_regex + "(.)*:[0-9]{0,5}(/|)");
const static boost::regex auth_proxy_regex(protocol_regex + "(.)*:(.)*@(.)*:[0-9]{0,5}(/|)");
ProxyData env_proxy_data;
env_proxy_data.is_exists = true;
string proxy_copy;
if (!NGEN::Regex::regexMatch(__FILE__, __LINE__, env_proxy.unpack(), boost::regex(protocol_regex + "(.)*"))) {
env_proxy = "http://" + env_proxy.unpack();
}
proxy_copy.assign(env_proxy.unpack());
env_proxy_data.protocol = env_proxy.unpack().substr(0, proxy_copy.find(":"));
proxy_copy.erase(0, proxy_copy.find(":") + 3); //remove "http://" or "https://"
if (NGEN::Regex::regexMatch(__FILE__, __LINE__, env_proxy.unpack(), auth_proxy_regex)) {
env_proxy_data.auth = string(&proxy_copy[0], &proxy_copy[proxy_copy.find("@")]);
proxy_copy.erase(0, proxy_copy.find("@") + 1); // remove "user:pass@"
} else if (!NGEN::Regex::regexMatch(__FILE__, __LINE__, env_proxy.unpack(), no_auth_proxy_regex)) {
return genError(string("Provided proxy has wrong syntax: ") + env_proxy.unpack());
}
env_proxy_data.domain = proxy_copy.substr(0, proxy_copy.find(":"));
proxy_copy.erase(0, proxy_copy.find(":") + 1); // remove "host:"
env_proxy_data.port = static_cast<uint16_t>(stoi(proxy_copy));
auto proxy_syntax = verifyProxySyntax(
env_proxy_data.protocol,
env_proxy_data.auth,
env_proxy_data.domain,
to_string(env_proxy_data.port),
env_proxy.unpack()
);
if (!proxy_syntax.ok()) return proxy_syntax;
if (env_proxy_data == proxies.at(protocol)) {
return Maybe<void>();
}
proxies.at(protocol) = env_proxy_data;
dbgInfo(D_COMMUNICATION)
<< convertProxyProtocolToString(protocol)
<< " proxy was successfully loaded, "
<< getProxyAddress(protocol).unpack();
return Maybe<void>();
}
private:
Maybe<string>
sendMessage(
@@ -935,85 +724,6 @@ private:
return genError("Failed to send HTTP request");
}
Maybe<void>
verifyProxySyntax(
const string &protocol,
const string &auth,
const string &domain,
const string &port,
const string &env_proxy)
{
stringstream verify_string;
verify_string
<< protocol
<< "://"
<< (!auth.empty() ? auth + string("@") : "")
<< domain
<< ":"
<< port
<< (env_proxy.back() == '/' ? "/" : "");
if (env_proxy.compare(verify_string.str()) != 0) {
return genError(string("Provided proxy has the wrong syntax:" ) + env_proxy);
}
return Maybe<void>();
}
Maybe<string>
loadProxyType(const string &proxy_type)
{
agent_details->readAgentDetails();
auto proxy_config = agent_details->getProxy();
if (proxy_config.ok()) {
if (proxy_config.unpack() == "none") {
return Maybe<string>(string());
}
return proxy_config;
}
#ifdef gaia
I_ShellCmd *shell_cmd = Singleton::Consume<I_ShellCmd>::by<ProtoMessageComp>();
auto proxy_ip = shell_cmd->getExecOutput("dbget proxy:ip-address| tr -d '\n'");
if (!proxy_ip.ok()) return proxy_ip;
auto proxy_port = shell_cmd->getExecOutput("dbget proxy:port| tr -d '\n'");
if (!proxy_port.ok()) return proxy_port;
if (*proxy_port != "" && *proxy_ip != "") return ("http://" + *proxy_ip + ":" + *proxy_port);
const string umis_file_path(string(getenv("CPDIR")) + "/tmp/umis_objects.C");
{
ifstream umis_file(umis_file_path.c_str());
if (!umis_file.good()) return Maybe<string>(string());
}
const string read_umis_cmd = "cat " + umis_file_path + " | grep -w \"";
const string parse_value_command = "\" | awk -F \"[ \\t]+\" '{printf $NF}' | tr -d \"()\"";
auto use_proxy = shell_cmd->getExecOutput(read_umis_cmd + "use_proxy" + parse_value_command);
if (!use_proxy.ok())
return genError("Failed to read use_proxy from " + umis_file_path + ": " + use_proxy.getErr());
if (use_proxy.unpack() == "true") {
auto umis_proxy_add = shell_cmd->getExecOutput(read_umis_cmd + "proxy_address" + parse_value_command);
if (!umis_proxy_add.ok() || *umis_proxy_add == "") return umis_proxy_add;
auto umis_proxy_port = shell_cmd->getExecOutput(read_umis_cmd + "proxy_port" + parse_value_command);
if (!umis_proxy_port.ok() || *umis_proxy_port == "") return umis_proxy_port;
return ("http://" + *umis_proxy_add + ":" + *umis_proxy_port);
} else {
dbgTrace(D_COMMUNICATION) << "Smart Console Proxy is turned off";
}
return Maybe<string>(string());
#else // not gaia
char *proxy = getenv(proxy_type.c_str());
if (proxy) return string(proxy);
proxy = getenv(boost::algorithm::to_upper_copy(proxy_type).c_str());
if (proxy) return string(proxy);
return Maybe<string>(string());
#endif // gaia
}
string
base64Decode(const string &input) const
{
@@ -1087,6 +797,8 @@ private:
}
}
const string &access_token = agent_details->getAccessToken();
if (!conn.isExternal() && !access_token.empty() && headers.find("Authorization") == std::string::npos) {
req.insertHeader("Authorization", "Bearer " + access_token);
}
@@ -1186,17 +898,6 @@ private:
return count;
}
string
convertProxyProtocolToString(ProxyProtocol proto) const
{
switch(proto) {
case ProxyProtocol::HTTP: return "http";
case ProxyProtocol::HTTPS: return "https";
}
dbgAssert(false) << "Unsupported Proxy Protocol " << static_cast<int>(proto);
return "";
}
Maybe<Method>
stringToMethod(const string &name)
{
@@ -1209,40 +910,19 @@ private:
return genError("Cannot convert unknown HTTP method to Enum. Method name: " + name);
}
class ProxyData
{
public:
bool
operator==(const ProxyData &other) const
{
return protocol==other.protocol &&
domain==other.domain &&
is_exists==other.is_exists &&
port==other.port &&
auth==other.auth;
}
string protocol = "";
string domain = "";
string auth = "";
bool is_exists = false;
uint16_t port = 0;
};
bool is_proxy_configured_via_settings = false;
uint64_t number_of_reconnects = 0;
uint64_t number_of_reconnect_failures = 0;
uint64_t number_of_send_failure = 0;
I_AgentDetails *agent_details = nullptr;
I_MainLoop *mainloop = nullptr;
I_TimeGet *timer = nullptr;
I_Encryptor *encryptor = nullptr;
I_MessagingBuffer *msg_buffer = nullptr;
bool is_proxy_configured_via_settings = false;
uint64_t number_of_reconnects = 0;
uint64_t number_of_reconnect_failures = 0;
uint64_t number_of_send_failure = 0;
I_AgentDetails *agent_details = nullptr;
I_MainLoop *mainloop = nullptr;
I_TimeGet *timer = nullptr;
I_Encryptor *encryptor = nullptr;
I_MessagingBuffer *msg_buffer = nullptr;
I_ProxyConfiguration *proxy_configuration = nullptr;
map<MessageConnKey, MessageConnection> active_connections;
map<MessageTypeTag, MessageConnKey> tag_to_active_conn_key;
map<ProxyProtocol, ProxyData> proxies;
ProxyProtocol proxy_protocol;
string access_token;
TemporaryCache<string, string> cache;
static const map<ProxyProtocol, string> proxyProtocolToString;
set<HTTPRequestSignature> pending_signatures;
@@ -1671,7 +1351,7 @@ MessageConnection::receiveResponse(I_MessageDecoder<T> &decoder)
}
if (connection_closed_count > 0) {
dbgInfo(D_COMMUNICATION)
dbgTrace(D_COMMUNICATION)
<< "Connection was reconnected. Type: "
<< tagToString(tag)
<< ", number of attempts: "