sync code

This commit is contained in:
Ned Wright
2024-11-28 10:41:59 +00:00
parent 6255e1f30d
commit 1c1f0b7e29
59 changed files with 842 additions and 707 deletions

View File

@@ -37,7 +37,7 @@ target_link_libraries(
ngen_core
-Wl,-whole-archive
"table;debug_is;shell_cmd;metric;tenant_manager;messaging;encryptor;time_proxy;singleton;mainloop;environment;logging;report;rest"
"config;intelligence_is_v2;event_is;memory_consumption;connkey"
"compression_utils;-lz;config;intelligence_is_v2;event_is;memory_consumption;connkey"
"instance_awareness;socket_is;agent_details;agent_details_reporter;buffers;cpu;agent_core_utilities"
"report_messaging"
-Wl,-no-whole-archive

View File

@@ -117,7 +117,7 @@ private:
return;
}
dbgInfo(D_AGENT_DETAILS)
dbgDebug(D_AGENT_DETAILS)
<< "Successfully handled attributes persistence. Operation: "
<< operation
<< ", Path "

View File

@@ -31,15 +31,26 @@ public:
HTTPResponse() = default;
// LCOV_EXCL_STOP
HTTPResponse(HTTPStatusCode _status_code, const std::string &_body) : status_code(_status_code), body(_body) {}
HTTPResponse(
HTTPStatusCode _status_code,
const std::string &_body,
std::unordered_map<std::string, std::string> _headers = std::unordered_map<std::string, std::string>()
)
:
status_code(_status_code),
body(_body),
headers(_headers)
{}
HTTPStatusCode getHTTPStatusCode() const;
const std::string & getBody() const;
std::string toString() const;
Maybe<std::string> getHeaderVal(const std::string &header_key);
private:
HTTPStatusCode status_code;
std::string body;
std::unordered_map<std::string, std::string> headers;
};
#endif // __HTTP_RESPONSE_H__

View File

@@ -63,6 +63,7 @@ enum class HTTPStatusCode
HTTP_PROXY_AUTHENTICATION_REQUIRED = 407,
HTTP_REQUEST_TIME_OUT = 408,
HTTP_PAYLOAD_TOO_LARGE = 413,
HTTP_TOO_MANY_REQUESTS = 429,
// 5xx - Server error responses.
HTTP_INTERNAL_SERVER_ERROR = 500,
HTTP_NOT_IMPLEMENTED = 501,

View File

@@ -8,6 +8,7 @@
#include "config.h"
#include "singleton.h"
#include "i_agent_details.h"
#include "i_time_get.h"
class MessageProxySettings
{
@@ -54,7 +55,7 @@ private:
uint16_t proxy_port = 0;
};
class MessageMetadata
class MessageMetadata : Singleton::Consume<I_TimeGet>
{
public:
inline MessageMetadata();
@@ -227,6 +228,26 @@ public:
return sni_host_name;
}
void
setRateLimitBlock(uint block_time)
{
is_rate_limit_block = true;
auto timer = Singleton::Consume<I_TimeGet>::by<MessageMetadata>();
auto current_timeout = timer->getMonotonicTime() + std::chrono::seconds(block_time);
rate_limit_block_time = current_timeout.count();
}
bool
isRateLimitBlock() const
{
if (is_rate_limit_block) {
auto timer = Singleton::Consume<I_TimeGet>::by<MessageMetadata>();
uint current_time = timer->getMonotonicTime().count();
if (current_time < rate_limit_block_time) return true;
}
return false;
}
template <class Archive>
void
serialize(Archive &ar)
@@ -243,7 +264,9 @@ public:
cereal::make_nvp("is_to_fog", is_to_fog),
cereal::make_nvp("ca_path", ca_path),
cereal::make_nvp("client_cert_path", client_cert_path),
cereal::make_nvp("client_key_path", client_key_path)
cereal::make_nvp("client_key_path", client_key_path),
cereal::make_nvp("is_rate_limit_block", is_rate_limit_block),
cereal::make_nvp("rate_limit_block_time", rate_limit_block_time)
);
}
@@ -262,6 +285,8 @@ private:
std::string external_certificate = "";
bool should_buffer = false;
bool is_to_fog = false;
bool is_rate_limit_block = false;
uint rate_limit_block_time = 0;
};
#endif // __MESSAGING_METADATA_H__

View File

@@ -107,6 +107,8 @@ DEFINE_FLAG(D_COMPONENT, D_ALL)
DEFINE_FLAG(D_WAAP_PARSER_PAIRS, D_WAAP_PARSER)
DEFINE_FLAG(D_WAAP_PARSER_PDF, D_WAAP_PARSER)
DEFINE_FLAG(D_WAAP_PARSER_BINARY_FILE, D_WAAP_PARSER)
DEFINE_FLAG(D_WAAP_PARSER_KNOWN_SOURCE_SKIPPER, D_WAAP_PARSER)
DEFINE_FLAG(D_WAAP_PARSER_SCREENED_JSON, D_WAAP_PARSER)
DEFINE_FLAG(D_IPS, D_COMPONENT)
DEFINE_FLAG(D_FILE_UPLOAD, D_COMPONENT)

View File

@@ -24,6 +24,7 @@
#include "i_mainloop.h"
#include "i_time_get.h"
#include "i_agent_details.h"
#include "i_encryptor.h"
#include "i_instance_awareness.h"
#include "i_environment.h"
#include "i_messaging.h"
@@ -57,6 +58,7 @@ class GenericMetric
Singleton::Consume<I_Environment>,
Singleton::Consume<I_Messaging>,
Singleton::Consume<I_RestApi>,
Singleton::Consume<I_Encryptor>,
public Listener<AllMetricEvent>
{
public:

View File

@@ -23,6 +23,8 @@
#include "report/report.h"
#include "customized_cereal_map.h"
#include "compression_utils.h"
#include "i_encryptor.h"
class GenericMetric;
@@ -56,6 +58,11 @@ public:
value(_value)
{
timestamp = Singleton::Consume<I_TimeGet>::by<GenericMetric>()->getWalltimeStr();
// convert timestamp to RFC 3339 format
std::size_t pos = timestamp.find('.');
if (pos != std::string::npos) {
timestamp = timestamp.substr(0, pos) + "Z";
}
asset_id = Singleton::Consume<I_AgentDetails>::by<GenericMetric>()->getAgentId();
}
@@ -118,7 +125,73 @@ public:
}
// LCOV_EXCL_START Reason: Tested in unit test (testAIOPSMapMetric), but not detected by coverage
std::string
Maybe<std::string>
toString() const
{
std::stringstream ss;
{
cereal::JSONOutputArchive ar(ss);
serialize(ar);
}
auto res = compressAndEncodeData(ss.str());
if (!res.ok()) {
return genError("Failed to compress and encode the data");
}
return res.unpack();
}
// LCOV_EXCL_STOP
private:
Maybe<std::string>
compressAndEncodeData(const std::string &unhandled_data) const
{
std::string data_holder = unhandled_data;
auto compression_stream = initCompressionStream();
CompressionResult compression_response = compressData(
compression_stream,
CompressionType::GZIP,
data_holder.size(),
reinterpret_cast<const unsigned char *>(data_holder.c_str()),
true
);
finiCompressionStream(compression_stream);
if (!compression_response.ok) {
// send log to Kibana
return genError("Failed to compress(gzip) data");
}
std::string compressed_data =
std::string((const char *)compression_response.output, compression_response.num_output_bytes);
auto encryptor = Singleton::Consume<I_Encryptor>::by<GenericMetric>();
Maybe<std::string> handled_data = encryptor->base64Encode(compressed_data);
if (compression_response.output) free(compression_response.output);
compression_response.output = nullptr;
compression_response.num_output_bytes = 0;
return handled_data;
}
std::vector<AiopsMetricData> metrics;
};
class CompressAndEncodeAIOPSMetrics
{
public:
CompressAndEncodeAIOPSMetrics(const AiopsMetricList &_aiops_metrics) : aiops_metrics(_aiops_metrics) {}
void
serialize(cereal::JSONOutputArchive &ar) const
{
auto metric_str = aiops_metrics.toString();
if (!metric_str.ok()) {
return;
}
ar(cereal::make_nvp("records", metric_str.unpack()));
}
// LCOV_EXCL_START Reason: Tested in unit test (testAIOPSMapMetric), but not detected by coverage
Maybe<std::string>
toString() const
{
std::stringstream ss;
@@ -131,7 +204,7 @@ public:
// LCOV_EXCL_STOP
private:
std::vector<AiopsMetricData> metrics;
AiopsMetricList aiops_metrics;
};
class MetricCalc

View File

@@ -33,6 +33,8 @@
#include "maybe_res.h"
#include "rest/schema_printer.h"
static const std::string BULK_ARRAY_NAME = "bulkArray";
/// @class JsonError
/// @brief Class representing JSON parsing errors.
///

View File

@@ -93,10 +93,13 @@ public:
res << "\"name\": \"" << details->getAgentId() << "\", ";
auto rest = Singleton::Consume<I_RestApi>::by<IntelligenceComponentV2>();
res << "\"url\": \"http://127.0.0.1:" << rest->getListeningPort() <<"/set-new-invalidation\", ";
res << "\"capabilities\": { \"getBulkCallback\": " << "true" << " }, ";
res << "\"dataMap\": [";
res << stream.str();
res << " ] }";
dbgTrace(D_INTELLIGENCE) << res.str();
return res;
}
@@ -148,13 +151,19 @@ public:
void
performCallBacks(const Invalidation &invalidation, const string &registration_id) const override
{
dbgDebug(D_INTELLIGENCE) << "Looking for callbacks for invalidation " << invalidation.genObject();
dbgTrace(D_INTELLIGENCE)
<< "Looking for callbacks for invalidation "
<< invalidation.genObject()
<< " registration id: "
<< registration_id;
if (registration_id != "") {
auto invalidation_cb = registration_id_to_cb.find(registration_id);
if (invalidation_cb != registration_id_to_cb.end()) return invalidation_cb->second(invalidation);
}
dbgDebug(D_INTELLIGENCE) << "Have not found callback per registration id";
for (auto &registed_invalidation : callbacks) {
dbgTrace(D_INTELLIGENCE) << "Checking against: " << registed_invalidation.second.first.genObject();
dbgDebug(D_INTELLIGENCE) << "Checking against: " << registed_invalidation.second.first.genObject();
performCallBacksImpl(invalidation, registed_invalidation.second);
}
}
@@ -169,6 +178,7 @@ private:
auto &registereed_invalidation = invalidation_and_cb.first;
auto &cb = invalidation_and_cb.second;
if (!registereed_invalidation.matches(actual_invalidation)) return;
dbgTrace(D_INTELLIGENCE) << "Found a matching invalidation registration, should callback";
cb(actual_invalidation);
}
@@ -177,7 +187,7 @@ private:
uint running_id = 0;
};
class ReceiveInvalidation : public ServerRest
class SingleReceivedInvalidation : public ServerRest
{
public:
void
@@ -248,6 +258,29 @@ private:
C2S_OPTIONAL_PARAM(string, invalidationType);
};
class ReceiveInvalidation : public ServerRest
{
public:
void
doCall() override
{
dbgTrace(D_INTELLIGENCE)
<< (bulkArray.isActive() ?
"BULK invalidations, receiving invalidations in bulks"
: "error in format, expected bulk invalidations, not single");
for (SingleReceivedInvalidation &r : bulkArray.get()) {
r.doCall();
}
return;
}
private:
C2S_LABEL_PARAM(vector<SingleReceivedInvalidation>, bulkArray, BULK_ARRAY_NAME);
};
class PagingController
{
public:

View File

@@ -486,6 +486,8 @@ TEST_F(IntelligenceInvalidation, register_for_invalidation)
EXPECT_THAT(body, HasSubstr("\"mainAttributes\": [ { \"attr2\": \"2\" } ]"));
EXPECT_THAT(body, HasSubstr("\"attributes\": [ { \"attr3\": \"3\" } ]"));
EXPECT_TRUE(md.getConnectionFlags().isSet(MessageConnectionConfig::UNSECURE_CONN));
EXPECT_THAT(body, HasSubstr("\"capabilities\": { \"getBulkCallback\": true }"));
}
TEST_F(IntelligenceInvalidation, register_for_multiple_assets_invalidation)
@@ -529,6 +531,7 @@ TEST_F(IntelligenceInvalidation, register_for_multiple_assets_invalidation)
));
EXPECT_NE(i_intelligence->registerInvalidation(invalidation, callback), 0);
EXPECT_THAT(body, HasSubstr("\"capabilities\": { \"getBulkCallback\": true }"));
EXPECT_THAT(
body,
@@ -606,7 +609,7 @@ TEST_F(IntelligenceInvalidation, invalidation_callback)
.setObjectType(Intelligence::ObjectType::ASSET);
stringstream json;
json << invalidation2.genObject();
json << "[" << invalidation2.genObject() << "]";
mock_invalidation->performRestCall(json);
EXPECT_EQ(recieved_invalidations.size(), 1u);
@@ -650,7 +653,7 @@ TEST_F(IntelligenceInvalidation, delete_invalidation_callback)
.setObjectType(Intelligence::ObjectType::ASSET);
stringstream json;
json << invalidation2.genObject();
json << "[" << invalidation2.genObject() << "]";
mock_invalidation->performRestCall(json);
EXPECT_EQ(recieved_invalidations.size(), 0u);
@@ -694,7 +697,7 @@ TEST_F(IntelligenceInvalidation, invalidation_short_handling)
.setObjectType(Intelligence::ObjectType::ASSET);
stringstream json;
json << invalidation2.genObject();
json << "[" << invalidation2.genObject() << "]";
mock_invalidation->performRestCall(json);
EXPECT_EQ(recieved_invalidations.size(), 0u);
@@ -789,7 +792,7 @@ TEST_F(IntelligenceInvalidation, invalidation_flow_with_multiple_assets)
.setObjectType(Intelligence::ObjectType::ASSET);
stringstream json1;
json1 << not_matching_invalidation.genObject();
json1 << "[" << not_matching_invalidation.genObject() << "]";
mock_invalidation->performRestCall(json1);
EXPECT_EQ(recieved_invalidations.size(), 0u);
@@ -805,7 +808,7 @@ TEST_F(IntelligenceInvalidation, invalidation_flow_with_multiple_assets)
.setObjectType(Intelligence::ObjectType::ASSET);
stringstream json2;
json2 << matching_invalidation.genObject();
json2 << "[" << matching_invalidation.genObject() << "]";
mock_invalidation->performRestCall(json2);
EXPECT_EQ(recieved_invalidations.size(), 1u);
@@ -865,7 +868,7 @@ TEST_F(IntelligenceInvalidation, invalidation_cb_match_2_registred_assets)
auto stop_listening_2 = make_scope_exit([&] { invalidation_2_to_register.stopListening(i_intelligence); });
stringstream json;
json << matching_invalidation.genObject();
json << "[" << matching_invalidation.genObject() << "]";
mock_invalidation->performRestCall(json);
EXPECT_EQ(recieved_invalidations.size(), 2u);
@@ -927,9 +930,43 @@ TEST_F(IntelligenceInvalidation, invalidation_cb_match_by_registration_id)
string modifiedJsonString = matching_invalidation.genObject().substr(2);
stringstream json;
json << "{ \"invalidationRegistrationId\": \""<< *registration_id << "\", " << modifiedJsonString;
json << "[{ \"invalidationRegistrationId\": \""<< *registration_id << "\", " << modifiedJsonString << "]";
cout << json.str() << endl;
mock_invalidation->performRestCall(json);
EXPECT_EQ(recieved_invalidations.size(), 1u);
}
TEST_F(IntelligenceInvalidation, bulk_invalidation_callback)
{
stringstream configuration;
configuration << "{";
configuration << " \"agentSettings\":[";
configuration << " {\"key\":\"agent.config.useLocalIntelligence\",\"id\":\"id1\",\"value\":\"true\"}";
configuration << " ],";
configuration << " \"intelligence\":{";
configuration << " \"local intelligence server ip\":\"127.0.0.1\",";
configuration << " \"local intelligence server primary port\":9090";
configuration << " }";
configuration << "}";
Singleton::Consume<Config::I_Config>::from(conf)->loadConfiguration(configuration);
EXPECT_CALL(
messaging_mock,
sendSyncMessage(_, "/api/v2/intelligence/invalidation/register", _, _, _)
).WillRepeatedly(Return(HTTPResponse(HTTPStatusCode::HTTP_OK, "")));
auto invalidation = Invalidation("agent_y");
EXPECT_NE(i_intelligence->registerInvalidation(invalidation, callback), 0);
auto invalidation2 = Invalidation("agent2");
EXPECT_NE(i_intelligence->registerInvalidation(invalidation2, callback), 0);
auto invalidation3 = Invalidation("agent3");
EXPECT_NE(i_intelligence->registerInvalidation(invalidation3, callback), 0);
dbgTrace(D_INTELLIGENCE) << "2 callbacks: ";
stringstream json3;
json3 << "[{\"class\":\"agent3\"},{\"class\":\"agent2\"}]";
mock_invalidation->performRestCall(json3);
EXPECT_EQ(recieved_invalidations.size(), 2u);
}

View File

@@ -25,7 +25,7 @@ private:
bool isLegalChunkedResponse(const std::string &res);
Maybe<HTTPStatusCode> status_code = genError("Not received");
Maybe<std::map<std::string, std::string>> headers = genError("Not received");
Maybe<std::unordered_map<std::string, std::string>> headers = genError("Not received");
std::string body;
std::string raw_response;
bool error = false;

View File

@@ -109,7 +109,7 @@ MessagingBufferComponent::Impl::init()
encryptor = Singleton::Consume<I_Encryptor>::by<Messaging>();
mainloop = Singleton::Consume<I_MainLoop>::by<Messaging>();
messaging = Singleton::Consume<I_Messaging>::from<Messaging>();
auto sub_path = getProfileAgentSettingWithDefault<string>("nano_agent/event_buffer/", "eventBuffer.baseFolder");
buffer_root_path = getLogFilesPathConfig() + "/" + sub_path;
string executable_name =
@@ -276,11 +276,18 @@ MessagingBufferComponent::Impl::sendMessage()
}
if (res == HTTPStatusCode::HTTP_SUSPEND) {
dbgDebug(D_MESSAGING) << "Suspended connection - sleeping for a while";
dbgDebug(D_MESSAGING) << "Message is Suspended - sleeping for a while";
mainloop->yield(chrono::seconds(1));
return true;
}
if (res == HTTPStatusCode::HTTP_TOO_MANY_REQUESTS) {
dbgDebug(D_MESSAGING) << "Suspended message due to rate limit block - sleeping for a while";
mainloop->yield(chrono::seconds(1));
popMessage();
return true;
}
++curr_no_retries;
if (curr_no_retries >= getProfileAgentSettingWithDefault<uint>(10, "eventBuffer.maxNumOfSendigRetries")) {
dbgWarning(D_MESSAGING) << "Reached maximum number of retries - poping message";
@@ -295,6 +302,12 @@ MessagingBufferComponent::Impl::sendMessage(const BufferedMessage &message) cons
{
MessageMetadata message_metadata = message.getMessageMetadata();
message_metadata.setShouldBufferMessage(false);
if (message_metadata.isRateLimitBlock()) {
mainloop->yield(chrono::seconds(1));
return HTTPStatusCode::HTTP_SUSPEND;
}
auto res = messaging->sendSyncMessage(
message.getMethod(),
message.getURI(),
@@ -305,6 +318,9 @@ MessagingBufferComponent::Impl::sendMessage(const BufferedMessage &message) cons
if (res.ok()) return HTTPStatusCode::HTTP_OK;
if (res.getErr().getHTTPStatusCode() == HTTPStatusCode::HTTP_SUSPEND) return HTTPStatusCode::HTTP_SUSPEND;
if (res.getErr().getHTTPStatusCode() == HTTPStatusCode::HTTP_TOO_MANY_REQUESTS) {
return HTTPStatusCode::HTTP_TOO_MANY_REQUESTS;
}
return HTTPStatusCode::HTTP_UNKNOWN;
}

View File

@@ -36,6 +36,7 @@ static const map<HTTPStatusCode, string> status_code_to_string = {
{ HTTPStatusCode::HTTP_PROXY_AUTHENTICATION_REQUIRED, "407 - HTTP_PROXY_AUTHENTICATION_REQUIRED" },
{ HTTPStatusCode::HTTP_REQUEST_TIME_OUT, "408 - HTTP_REQUEST_TIME_OUT" },
{ HTTPStatusCode::HTTP_PAYLOAD_TOO_LARGE, "413 - HTTP_PAYLOAD_TOO_LARGE" },
{ HTTPStatusCode::HTTP_TOO_MANY_REQUESTS, "429 - HTTP_TOO_MANY_REQUESTS" },
{ HTTPStatusCode::HTTP_INTERNAL_SERVER_ERROR, "500 - HTTP_INTERNAL_SERVER_ERROR" },
{ HTTPStatusCode::HTTP_NOT_IMPLEMENTED, "501 - HTTP_NOT_IMPLEMENTED" },
{ HTTPStatusCode::HTTP_BAD_GATEWAY, "502 - HTTP_BAD_GATEWAY" },
@@ -63,6 +64,7 @@ static const map<int, HTTPStatusCode> num_to_status_code = {
{ 407, HTTPStatusCode::HTTP_PROXY_AUTHENTICATION_REQUIRED },
{ 408, HTTPStatusCode::HTTP_REQUEST_TIME_OUT },
{ 413, HTTPStatusCode::HTTP_PAYLOAD_TOO_LARGE },
{ 429, HTTPStatusCode::HTTP_TOO_MANY_REQUESTS },
{ 500, HTTPStatusCode::HTTP_INTERNAL_SERVER_ERROR },
{ 501, HTTPStatusCode::HTTP_NOT_IMPLEMENTED },
{ 502, HTTPStatusCode::HTTP_BAD_GATEWAY },
@@ -100,6 +102,16 @@ HTTPResponse::toString() const
return "[Status-code]: " + code->second + ", [Body]: " + (body.empty() ? "{}" : body);
}
Maybe<string>
HTTPResponse::getHeaderVal(const string &header_key)
{
auto header = headers.find(header_key);
if (header == headers.end()) {
return genError("Header \'" + header_key + "\' not found.");
}
return header->second;
}
Maybe<HTTPResponse>
HTTPResponseParser::parseData(const string &data, bool is_connect)
{
@@ -116,7 +128,7 @@ HTTPResponseParser::parseData(const string &data, bool is_connect)
if (!handleBody(is_connect)) return genError("Response not ready!");
return HTTPResponse(status_code.unpack(), body);
return HTTPResponse(status_code.unpack(), body, headers.unpack());
}
static string
@@ -133,7 +145,7 @@ bool
HTTPResponseParser::handleHeaders()
{
stringstream ss(raw_response);
map<string, string> header_map;
unordered_map<string, string> header_map;
while (true) {
string header;
@@ -171,7 +183,7 @@ HTTPResponseParser::getHeaderVal(const string &header_key)
auto headers_map = headers.unpack();
auto header = headers_map.find(header_key);
if (header == headers_map.end()) {
return genError("Header\'" + header_key + "\' not found.");
return genError("Header \'" + header_key + "\' not found.");
}
return header->second;
}

View File

@@ -148,6 +148,21 @@ MessagingComp::sendMessage(
auto response = i_conn->sendRequest(conn, *req);
if (!response.ok()) return response.passErr();
auto response_data = response.unpack();
if (response_data.getHTTPStatusCode() == HTTPStatusCode::HTTP_TOO_MANY_REQUESTS) {
dbgDebug(D_MESSAGING) << "Too many requests. Suspend the message";
auto rate_limit_metadata = message_metadata;
uint retry_after_sec = 60;
auto retry_after_header = response_data.getHeaderVal("retry-after");
if (retry_after_header.ok()) {
retry_after_sec = stoi(*retry_after_header);
}
rate_limit_metadata.setShouldBufferMessage(true);
rate_limit_metadata.setRateLimitBlock(retry_after_sec);
return suspendMessage(body, method, uri, category, rate_limit_metadata);
}
if (is_to_fog && method == HTTPMethod::GET) fog_get_requests_cache.emplaceEntry(uri, *response);
return response;
}
@@ -355,6 +370,15 @@ MessagingComp::suspendMessage(
const MessageMetadata &message_metadata
) const
{
if (message_metadata.isRateLimitBlock()) {
dbgInfo(D_MESSAGING) << "Rate limit block is active, message is suspended, message is buffered.";
i_messaging_buffer->pushNewBufferedMessage(body, method, uri, category, message_metadata, false);
return genError<HTTPResponse>(
HTTPStatusCode::HTTP_TOO_MANY_REQUESTS,
"The connection is suspended due to rate limit block, message is buffered."
);
}
if (message_metadata.shouldBufferMessage()) {
dbgWarning(D_MESSAGING) << "Buffering message due to connection suspended";
i_messaging_buffer->pushNewBufferedMessage(body, method, uri, category, message_metadata, false);

View File

@@ -276,3 +276,30 @@ TEST_F(TestMessagingComp, testSetFogConnection)
EXPECT_CALL(mock_messaging_connection, establishConnection(metadata, category)).WillOnce(Return(conn));
EXPECT_TRUE(messaging_comp.setFogConnection(category));
}
TEST_F(TestMessagingComp, testRateLimitBlock)
{
setAgentDetails();
string body = "test body";
HTTPMethod method = HTTPMethod::POST;
string uri = "/test-uri";
MessageCategory category = MessageCategory::GENERIC;
MessageConnectionKey conn_key(fog_addr, fog_port, MessageCategory::GENERIC);
Flags<MessageConnectionConfig> conn_flags;
conn_flags.setFlag(MessageConnectionConfig::UNSECURE_CONN);
MessageMetadata conn_metadata(fog_addr, fog_port, conn_flags, false, true);
Connection conn(conn_key, conn_metadata);
EXPECT_CALL(mock_messaging_connection, getFogConnectionByCategory(MessageCategory::GENERIC))
.WillOnce(Return(conn));
unordered_map<string, string> res_headers = {{"Retry-After", "10"}};
HTTPResponse res(HTTPStatusCode::HTTP_TOO_MANY_REQUESTS, "response!!", res_headers);
EXPECT_CALL(mock_messaging_connection, mockSendRequest(_, _, _)).WillOnce(Return(res));
auto sending_res = messaging_comp.sendSyncMessage(method, uri, body, category, conn_metadata);
ASSERT_FALSE(sending_res.ok());
HTTPResponse http_res = sending_res.getErr();
EXPECT_EQ(http_res.getBody(), "The connection is suspended due to rate limit block, message is buffered.");
EXPECT_EQ(http_res.getHTTPStatusCode(), HTTPStatusCode::HTTP_TOO_MANY_REQUESTS);
}

View File

@@ -32,7 +32,10 @@ MetricMetadata::Units operator"" _unit(const char *str, size_t) { return MetricM
MetricMetadata::Description operator"" _desc(const char *str, size_t) { return MetricMetadata::Description{str}; }
// LCOV_EXCL_START Reason: Tested in unit test (testAIOPSMapMetric), but not detected by coverage
static ostream & operator<<(ostream &os, const AiopsMetricList &metrics) { return os << metrics.toString(); }
static ostream & operator<<(ostream &os, const CompressAndEncodeAIOPSMetrics &metrics)
{
return os << metrics.toString();
}
// LCOV_EXCL_STOP
vector<AiopsMetricData>
@@ -44,8 +47,8 @@ MetricCalc::getAiopsMetrics() const
string name = getMetricDotName() != "" ? getMetricDotName() : getMetricName();
string units = getMetircUnits();
string description = getMetircDescription();
string type = getMetricType() == MetricType::GAUGE ? "gauge" : "counter";
string type = getMetricType() == MetricType::GAUGE ? "Gauge" : "Counter";
return { AiopsMetricData(name, type, units, description, getBasicLabels(), value) };
}
@@ -320,17 +323,17 @@ class PrometheusRest : public ClientRest
public:
Metric(const string &n, const string &t, const string &d, const string &l, const string &v)
:
name(n),
type(t),
description(d),
metric_name(n),
metric_type(t),
metric_description(d),
labels(l),
value(v)
{}
private:
C2S_PARAM(string, name);
C2S_PARAM(string, type);
C2S_PARAM(string, description);
C2S_PARAM(string, metric_name);
C2S_PARAM(string, metric_type);
C2S_PARAM(string, metric_description);
C2S_PARAM(string, labels);
C2S_PARAM(string, value);
};
@@ -356,6 +359,7 @@ void
GenericMetric::generatePrometheus()
{
if (!getProfileAgentSettingWithDefault(false, "prometheus")) return;
dbgTrace(D_METRICS) << "Generate prometheus metric";
vector<PrometheusData> all_metrics;
for (auto &calc : calcs) {
@@ -371,7 +375,7 @@ GenericMetric::generatePrometheus()
new_config_req_md.setConnectioFlag(MessageConnectionConfig::UNSECURE_CONN);
Singleton::Consume<I_Messaging>::by<GenericMetric>()->sendSyncMessage(
HTTPMethod::POST,
"/set-prometheus-data",
"/add-metrics",
rest,
MessageCategory::GENERIC,
new_config_req_md
@@ -382,6 +386,7 @@ void
GenericMetric::generateAiopsLog()
{
if (!getConfigurationWithDefault<bool>(true, "metric", "aiopsMetricSendEnable")) return;
dbgTrace(D_METRICS) << "Generate AIOPS metric";
AiopsMetricList aiops_metrics;
@@ -397,18 +402,17 @@ GenericMetric::generateAiopsLog()
Level::LOG,
LogLevel::INFO,
audience,
team,
ReportIS::AudienceTeam::HORIZON_TELEMETRY,
Severity::INFO,
Priority::LOW,
report_interval,
LogField("agentId", Singleton::Consume<I_AgentDetails>::by<GenericMetric>()->getAgentId()),
tags,
Tags::INFORMATIONAL,
issuing_engine
ReportIS::IssuingEngine::HORIZON_TELEMETRY_METRICS
);
metric_to_fog << LogField("eventObject", aiops_metrics);
metric_to_fog << LogField("eventObject", CompressAndEncodeAIOPSMetrics(aiops_metrics));
LogRest metric_client_rest(metric_to_fog);
sendLog(metric_client_rest);
}

View File

@@ -9,6 +9,7 @@
#include "mock/mock_time_get.h"
#include "mock/mock_rest_api.h"
#include "agent_details.h"
#include "mock/mock_encryptor.h"
#include "mock/mock_messaging.h"
#include "mock/mock_instance_awareness.h"
#include "config.h"
@@ -219,6 +220,7 @@ public:
::Environment env;
ConfigComponent conf;
AgentDetails agent_details;
StrictMock<MockEncryptor> mock_encryptor;
NiceMock<MockMessaging> messaging_mock;
stringstream debug_output;
I_MainLoop::Routine routine;
@@ -556,7 +558,7 @@ TEST_F(MetricTest, printPromeathus)
cpu_event.notify();
string message_body;
EXPECT_CALL(messaging_mock, sendSyncMessage(_, "/set-prometheus-data", _, _, _))
EXPECT_CALL(messaging_mock, sendSyncMessage(_, "/add-metrics", _, _, _))
.WillOnce(DoAll(SaveArg<2>(&message_body), Return(HTTPResponse())));
routine();
@@ -564,44 +566,44 @@ TEST_F(MetricTest, printPromeathus)
"{\n"
" \"metrics\": [\n"
" {\n"
" \"name\": \"cpuMax\",\n"
" \"type\": \"gauge\",\n"
" \"description\": \"\",\n"
" \"metric_name\": \"cpuMax\",\n"
" \"metric_type\": \"gauge\",\n"
" \"metric_description\": \"\",\n"
" \"labels\": \"{agent=\\\"Unknown\\\",id=\\\"87\\\"}\",\n"
" \"value\": \"89\"\n"
" },\n"
" {\n"
" \"name\": \"cpuMin\",\n"
" \"type\": \"gauge\",\n"
" \"description\": \"\",\n"
" \"metric_name\": \"cpuMin\",\n"
" \"metric_type\": \"gauge\",\n"
" \"metric_description\": \"\",\n"
" \"labels\": \"{agent=\\\"Unknown\\\",id=\\\"87\\\"}\",\n"
" \"value\": \"89\"\n"
" },\n"
" {\n"
" \"name\": \"cpuAvg\",\n"
" \"type\": \"gauge\",\n"
" \"description\": \"\",\n"
" \"metric_name\": \"cpuAvg\",\n"
" \"metric_type\": \"gauge\",\n"
" \"metric_description\": \"\",\n"
" \"labels\": \"{agent=\\\"Unknown\\\",id=\\\"87\\\"}\",\n"
" \"value\": \"89\"\n"
" },\n"
" {\n"
" \"name\": \"cpuCurrent\",\n"
" \"type\": \"gauge\",\n"
" \"description\": \"\",\n"
" \"metric_name\": \"cpuCurrent\",\n"
" \"metric_type\": \"gauge\",\n"
" \"metric_description\": \"\",\n"
" \"labels\": \"{agent=\\\"Unknown\\\",id=\\\"87\\\"}\",\n"
" \"value\": \"89\"\n"
" },\n"
" {\n"
" \"name\": \"cpuCounter\",\n"
" \"type\": \"gauge\",\n"
" \"description\": \"\",\n"
" \"metric_name\": \"cpuCounter\",\n"
" \"metric_type\": \"gauge\",\n"
" \"metric_description\": \"\",\n"
" \"labels\": \"{agent=\\\"Unknown\\\",id=\\\"87\\\"}\",\n"
" \"value\": \"1\"\n"
" },\n"
" {\n"
" \"name\": \"cpuTotalCounter\",\n"
" \"type\": \"counter\",\n"
" \"description\": \"\",\n"
" \"metric_name\": \"cpuTotalCounter\",\n"
" \"metric_type\": \"counter\",\n"
" \"metric_description\": \"\",\n"
" \"labels\": \"{agent=\\\"Unknown\\\",id=\\\"87\\\"}\",\n"
" \"value\": \"1\"\n"
" }\n"
@@ -636,7 +638,7 @@ TEST_F(MetricTest, printPromeathusMultiMap)
HttpTransaction("/index.html", "POST", 40).notify();
string message_body;
EXPECT_CALL(messaging_mock, sendSyncMessage(_, "/set-prometheus-data", _, _, _))
EXPECT_CALL(messaging_mock, sendSyncMessage(_, "/add-metrics", _, _, _))
.WillOnce(DoAll(SaveArg<2>(&message_body), Return(HTTPResponse())));
routine();
@@ -644,25 +646,25 @@ TEST_F(MetricTest, printPromeathusMultiMap)
"{\n"
" \"metrics\": [\n"
" {\n"
" \"name\": \"request.total\",\n"
" \"type\": \"counter\",\n"
" \"description\": \"\",\n"
" \"metric_name\": \"request.total\",\n"
" \"metric_type\": \"counter\",\n"
" \"metric_description\": \"\",\n"
" \"labels\": \"{agent=\\\"Unknown\\\",id=\\\"87\\\","
"method=\\\"GET\\\",url=\\\"/index.html\\\"}\",\n"
" \"value\": \"1\"\n"
" },\n"
" {\n"
" \"name\": \"request.total\",\n"
" \"type\": \"counter\",\n"
" \"description\": \"\",\n"
" \"metric_name\": \"request.total\",\n"
" \"metric_type\": \"counter\",\n"
" \"metric_description\": \"\",\n"
" \"labels\": \"{agent=\\\"Unknown\\\",id=\\\"87\\\","
"method=\\\"POST\\\",url=\\\"/index.html\\\"}\",\n"
" \"value\": \"1\"\n"
" },\n"
" {\n"
" \"name\": \"request.total\",\n"
" \"type\": \"counter\",\n"
" \"description\": \"\",\n"
" \"metric_name\": \"request.total\",\n"
" \"metric_type\": \"counter\",\n"
" \"metric_description\": \"\",\n"
" \"labels\": \"{agent=\\\"Unknown\\\",id=\\\"87\\\","
"method=\\\"GET\\\",url=\\\"/index2.html\\\"}\",\n"
" \"value\": \"1\"\n"
@@ -1396,6 +1398,8 @@ TEST_F(MetricTest, testManyValuesOutOfOrder)
TEST_F(MetricTest, basicAIOPSMetricTest)
{
EXPECT_CALL(timer, getWalltimeStr()).WillRepeatedly(Return(string("2016-11-13T17:31:24.087")));
EXPECT_CALL(mock_encryptor, base64Encode(_)).WillRepeatedly(Return("compress and encode metric payload"));
CPUMetric cpu_mt;
cpu_mt.init(
"CPU usage",
@@ -1452,14 +1456,14 @@ TEST_F(MetricTest, basicAIOPSMetricTest)
" \"eventLevel\": \"Log\",\n"
" \"eventLogLevel\": \"info\",\n"
" \"eventAudience\": \"Internal\",\n"
" \"eventAudienceTeam\": \"Agent Core\",\n"
" \"eventAudienceTeam\": \"\",\n"
" \"eventFrequency\": 5,\n"
" \"eventTags\": [\n"
" \"Informational\"\n"
" ],\n"
" \"eventSource\": {\n"
" \"agentId\": \"Unknown\",\n"
" \"issuingEngine\": \"Agent Core\",\n"
" \"issuingEngine\": \"horizonTelemetryMetrics\",\n"
" \"eventTraceId\": \"\",\n"
" \"eventSpanId\": \"\",\n"
" \"issuingEngineVersion\": \"\",\n"
@@ -1469,92 +1473,7 @@ TEST_F(MetricTest, basicAIOPSMetricTest)
" },\n"
" \"eventData\": {\n"
" \"eventObject\": {\n"
" \"Metrics\": [\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"cpu.max\",\n"
" \"MetricType\": \"gauge\",\n"
" \"MetricUnit\": \"percrnt\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 89.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {},\n"
" \"AssetID\": \"Unknown\"\n"
" },\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"cpuMin\",\n"
" \"MetricType\": \"gauge\",\n"
" \"MetricUnit\": \"\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 89.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {},\n"
" \"AssetID\": \"Unknown\"\n"
" },\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"cpuAvg\",\n"
" \"MetricType\": \"gauge\",\n"
" \"MetricUnit\": \"\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 89.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {},\n"
" \"AssetID\": \"Unknown\"\n"
" },\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"cpuCurrent\",\n"
" \"MetricType\": \"gauge\",\n"
" \"MetricUnit\": \"\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 89.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {},\n"
" \"AssetID\": \"Unknown\"\n"
" },\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"cpuCounter\",\n"
" \"MetricType\": \"gauge\",\n"
" \"MetricUnit\": \"\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 1.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {},\n"
" \"AssetID\": \"Unknown\"\n"
" },\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"cpuTotalCounter\",\n"
" \"MetricType\": \"counter\",\n"
" \"MetricUnit\": \"\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 1.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {},\n"
" \"AssetID\": \"Unknown\"\n"
" }\n"
" ]\n"
" \"records\": \"compress and encode metric payload\"\n"
" }\n"
" }\n"
" }\n"
@@ -1595,8 +1514,68 @@ TEST_F(MetricTest, testAIOPSMapMetric)
_,
_
)).WillRepeatedly(SaveArg<2>(&message_body));
EXPECT_CALL(mock_encryptor, base64Encode(_)).WillRepeatedly(Return("compress and encode metric payload"));
routine();
// aiops data example
// " \"Metrics\": [\n"
// " {\n"
// " \"Timestamp\": \"2016-11-13T17:31:24Z\",\n"
// " \"MetricName\": \"/index.html\",\n"
// " \"MetricType\": \"Gauge\",\n"
// " \"MetricUnit\": \"\",\n"
// " \"MetricDescription\": \"\",\n"
// " \"MetricValue\": 0.0,\n"
// " \"ResourceAttributes\": {},\n"
// " \"MetricAttributes\": {\n"
// " \"key1\": \"value1\",\n"
// " \"key2\": \"value2\"\n"
// " },\n"
// " \"AssetID\": \"Unknown\"\n"
// " },\n"
// " {\n"
// " \"Timestamp\": \"2016-11-13T17:31:24Z\",\n"
// " \"MetricName\": \"/index2.html\",\n"
// " \"MetricType\": \"Gauge\",\n"
// " \"MetricUnit\": \"\",\n"
// " \"MetricDescription\": \"\",\n"
// " \"MetricValue\": 0.0,\n"
// " \"ResourceAttributes\": {},\n"
// " \"MetricAttributes\": {\n"
// " \"key1\": \"value1\",\n"
// " \"key2\": \"value2\"\n"
// " },\n"
// " \"AssetID\": \"Unknown\"\n"
// " },\n"
// " {\n"
// " \"Timestamp\": \"2016-11-13T17:31:24Z\",\n"
// " \"MetricName\": \"/index.html\",\n"
// " \"MetricType\": \"Counter\",\n"
// " \"MetricUnit\": \"\",\n"
// " \"MetricDescription\": \"\",\n"
// " \"MetricValue\": 0.0,\n"
// " \"ResourceAttributes\": {},\n"
// " \"MetricAttributes\": {},\n"
// " \"AssetID\": \"Unknown\"\n"
// " },\n"
// " {\n"
// " \"Timestamp\": \"2016-11-13T17:31:24Z\",\n"
// " \"MetricName\": \"/index2.html\",\n"
// " \"MetricType\": \"Counter\",\n"
// " \"MetricUnit\": \"\",\n"
// " \"MetricDescription\": \"\",\n"
// " \"MetricValue\": 0.0,\n"
// " \"ResourceAttributes\": {},\n"
// " \"MetricAttributes\": {},\n"
// " \"AssetID\": \"Unknown\"\n"
// " }\n"
// " ]\n"
// " }\n"
// " }\n"
// " }\n"
// "}";
string expected_message =
"{\n"
" \"log\": {\n"
@@ -1608,14 +1587,14 @@ TEST_F(MetricTest, testAIOPSMapMetric)
" \"eventLevel\": \"Log\",\n"
" \"eventLogLevel\": \"info\",\n"
" \"eventAudience\": \"Internal\",\n"
" \"eventAudienceTeam\": \"Agent Core\",\n"
" \"eventAudienceTeam\": \"\",\n"
" \"eventFrequency\": 5,\n"
" \"eventTags\": [\n"
" \"Informational\"\n"
" ],\n"
" \"eventSource\": {\n"
" \"agentId\": \"Unknown\",\n"
" \"issuingEngine\": \"Agent Core\",\n"
" \"issuingEngine\": \"horizonTelemetryMetrics\",\n"
" \"eventTraceId\": \"\",\n"
" \"eventSpanId\": \"\",\n"
" \"issuingEngineVersion\": \"\",\n"
@@ -1625,72 +1604,7 @@ TEST_F(MetricTest, testAIOPSMapMetric)
" },\n"
" \"eventData\": {\n"
" \"eventObject\": {\n"
" \"Metrics\": [\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"/index.html\",\n"
" \"MetricType\": \"gauge\",\n"
" \"MetricUnit\": \"\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 25.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {\n"
" \"url\": \"/index.html\"\n"
" },\n"
" \"AssetID\": \"Unknown\"\n"
" },\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"/index2.html\",\n"
" \"MetricType\": \"gauge\",\n"
" \"MetricUnit\": \"\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 20.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {\n"
" \"url\": \"/index2.html\"\n"
" },\n"
" \"AssetID\": \"Unknown\"\n"
" },\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"/index.html\",\n"
" \"MetricType\": \"counter\",\n"
" \"MetricUnit\": \"\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 2.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {\n"
" \"url\": \"/index.html\"\n"
" },\n"
" \"AssetID\": \"Unknown\"\n"
" },\n"
" {\n"
" \"Timestamp\": \"2016-11-13T17:31:24.087\",\n"
" \"MetricName\": \"/index2.html\",\n"
" \"MetricType\": \"counter\",\n"
" \"MetricUnit\": \"\",\n"
" \"MetricDescription\": \"\",\n"
" \"MetricValue\": 1.0,\n"
" \"ResourceAttributes\": {\n"
" \"agent\": \"Unknown\",\n"
" \"id\": \"87\"\n"
" },\n"
" \"MetricAttributes\": {\n"
" \"url\": \"/index2.html\"\n"
" },\n"
" \"AssetID\": \"Unknown\"\n"
" }\n"
" ]\n"
" \"records\": \"compress and encode metric payload\"\n"
" }\n"
" }\n"
" }\n"

View File

@@ -33,6 +33,18 @@ ServerRest::performRestCall(istream &in)
{
try {
try {
int firstChar = in.peek();
if (firstChar != EOF) {
// array as root is not supported in JSONInputArchive format
// but this struct is in bulk invalidations requirements
// we will change the format here
if (firstChar == '[') {
std::string content((std::istreambuf_iterator<char>(in)), std::istreambuf_iterator<char>());
std::string modifiedContent = "{\"" + BULK_ARRAY_NAME + "\": " + content + "}";
std::istringstream* modifiedStream = new std::istringstream(modifiedContent);
in.rdbuf(modifiedStream->rdbuf());
}
}
cereal::JSONInputArchive in_ar(in);
load(in_ar);
} catch (cereal::Exception &e) {