May 27 update

This commit is contained in:
Ned Wright
2024-05-27 08:45:25 +00:00
parent 253ca70de6
commit fdc148aa9b
39 changed files with 1140 additions and 359 deletions

View File

@@ -36,7 +36,9 @@ I_Intelligence_IS_V2::queryIntelligence(
query_request.setCursor(Intelligence_IS_V2::CursorState::DONE, "");
} else {
query_request.setCursor(Intelligence_IS_V2::CursorState::IN_PROGRESS, response->getCursor());
if (ignore_in_progress) return genError("Query intelligence response with InProgress status");
if (ignore_in_progress && response->getResponseStatus() == Intelligence_IS_V2::ResponseStatus::IN_PROGRESS) {
return genError("Query intelligence response with InProgress status");
}
}
return serializable_response.getData();

View File

@@ -61,27 +61,34 @@ DEFINE_FLAG(D_COMPONENT, D_ALL)
DEFINE_FLAG(D_STREAMING, D_COMPONENT)
DEFINE_FLAG(D_STREAMING_DATA, D_STREAMING)
DEFINE_FLAG(D_CHECKSUM, D_STREAMING)
DEFINE_FLAG(D_WAAP, D_COMPONENT)
DEFINE_FLAG(D_OA_SCHEMA_UPDATER, D_WAAP)
DEFINE_FLAG(D_WAAP_API, D_WAAP)
DEFINE_FLAG(D_WAAP_AUTOMATION, D_WAAP)
DEFINE_FLAG(D_WAAP_REGEX, D_WAAP)
DEFINE_FLAG(D_WAAP_SAMPLE_PREPROCESS, D_WAAP)
DEFINE_FLAG(D_WAAP_SAMPLE_SCAN, D_WAAP)
DEFINE_FLAG(D_WAAP_EVASIONS, D_WAAP)
DEFINE_FLAG(D_WAAP_ASSET_STATE, D_WAAP)
DEFINE_FLAG(D_WAAP_CONFIDENCE_CALCULATOR, D_WAAP)
DEFINE_FLAG(D_WAAP_REPUTATION, D_WAAP)
DEFINE_FLAG(D_WAAP_SCORE_BUILDER, D_WAAP)
DEFINE_FLAG(D_WAAP_ULIMITS, D_WAAP)
DEFINE_FLAG(D_WAAP_SCANNER, D_WAAP)
DEFINE_FLAG(D_WAAP_DEEP_PARSER, D_WAAP)
DEFINE_FLAG(D_WAAP_BASE64, D_WAAP)
DEFINE_FLAG(D_WAAP_JSON, D_WAAP)
DEFINE_FLAG(D_WAAP_BOT_PROTECTION, D_WAAP)
DEFINE_FLAG(D_WAAP_STREAMING_PARSING, D_WAAP)
DEFINE_FLAG(D_WAAP_HEADERS, D_WAAP)
DEFINE_FLAG(D_WAAP_PARSER, D_WAAP)
DEFINE_FLAG(D_WAAP_GLOBAL, D_COMPONENT)
DEFINE_FLAG(D_WAAP, D_WAAP_GLOBAL)
DEFINE_FLAG(D_NGINX_EVENTS, D_WAAP)
DEFINE_FLAG(D_OA_SCHEMA_UPDATER, D_WAAP)
DEFINE_FLAG(D_WAAP_API, D_WAAP)
DEFINE_FLAG(D_WAAP_AUTOMATION, D_WAAP)
DEFINE_FLAG(D_WAAP_REGEX, D_WAAP)
DEFINE_FLAG(D_WAAP_SAMPLE_SCAN, D_WAAP)
DEFINE_FLAG(D_WAAP_ASSET_STATE, D_WAAP)
DEFINE_FLAG(D_WAAP_CONFIDENCE_CALCULATOR, D_WAAP)
DEFINE_FLAG(D_WAAP_REPUTATION, D_WAAP)
DEFINE_FLAG(D_WAAP_SCORE_BUILDER, D_WAAP)
DEFINE_FLAG(D_WAAP_ULIMITS, D_WAAP)
DEFINE_FLAG(D_WAAP_SCANNER, D_WAAP)
DEFINE_FLAG(D_WAAP_DEEP_PARSER, D_WAAP)
DEFINE_FLAG(D_WAAP_BASE64, D_WAAP)
DEFINE_FLAG(D_WAAP_JSON, D_WAAP)
DEFINE_FLAG(D_WAAP_BOT_PROTECTION, D_WAAP)
DEFINE_FLAG(D_WAAP_STREAMING_PARSING, D_WAAP)
DEFINE_FLAG(D_WAAP_HEADERS, D_WAAP)
DEFINE_FLAG(D_WAAP_OVERRIDE, D_WAAP)
DEFINE_FLAG(D_WAAP_SAMPLE_HANDLING, D_WAAP_GLOBAL)
DEFINE_FLAG(D_WAAP_SAMPLE_PREPROCESS, D_WAAP_SAMPLE_HANDLING)
DEFINE_FLAG(D_WAAP_EVASIONS, D_WAAP_SAMPLE_HANDLING)
DEFINE_FLAG(D_WAAP_PARSER, D_WAAP_GLOBAL)
DEFINE_FLAG(D_WAAP_PARSER_XML, D_WAAP_PARSER)
DEFINE_FLAG(D_WAAP_PARSER_HTML, D_WAAP_PARSER)
DEFINE_FLAG(D_WAAP_PARSER_BINARY, D_WAAP_PARSER)
@@ -98,7 +105,7 @@ DEFINE_FLAG(D_COMPONENT, D_ALL)
DEFINE_FLAG(D_WAAP_PARSER_PERCENT, D_WAAP_PARSER)
DEFINE_FLAG(D_WAAP_PARSER_PAIRS, D_WAAP_PARSER)
DEFINE_FLAG(D_WAAP_PARSER_PDF, D_WAAP_PARSER)
DEFINE_FLAG(D_WAAP_OVERRIDE, D_WAAP)
DEFINE_FLAG(D_WAAP_PARSER_BINARY_FILE, D_WAAP_PARSER)
DEFINE_FLAG(D_IPS, D_COMPONENT)
DEFINE_FLAG(D_FILE_UPLOAD, D_COMPONENT)

View File

@@ -20,7 +20,6 @@
#include "intelligence_invalidation.h"
#include "intelligence_is_v2/intelligence_response.h"
#include "intelligence_request.h"
#include "intelligence_server.h"
using namespace std;
using namespace chrono;
@@ -33,6 +32,8 @@ static const string primary_port_setting = "local intelligence server primary po
static const string secondary_port_setting = "local intelligence server secondary port";
static const string invalidation_uri = "/api/v2/intelligence/invalidation";
static const string registration_uri = "/api/v2/intelligence/invalidation/register";
static const string query_uri = "/api/v2/intelligence/assets/query";
static const string queries_uri = "/api/v2/intelligence/assets/queries";
class I_InvalidationCallBack
{
@@ -245,6 +246,51 @@ private:
C2S_OPTIONAL_PARAM(string, invalidationType);
};
class PagingController
{
public:
PagingController()
{
uint request_overall_timeout_conf = getConfigurationWithDefault<uint>(
20,
"intelligence",
"request overall timeout"
);
timer = Singleton::Consume<I_TimeGet>::by<IntelligenceComponentV2>();
mainloop = Singleton::Consume<I_MainLoop>::by<IntelligenceComponentV2>();
paging_timeout = timer->getMonotonicTime() + chrono::microseconds(request_overall_timeout_conf * 1000000);
}
bool
isMoreResponses(const Maybe<Response> &res, const IntelligenceRequest &req)
{
response = res;
if (!res.ok() || req.getPagingStatus().ok()) return false;
if (res->getResponseStatus() != ResponseStatus::IN_PROGRESS) return false;
dbgTrace(D_INTELLIGENCE) << "Intelligence paging response is in progress";
mainloop->yield(true);
return hasTimeoutRemaining();
}
Maybe<Response> getResponse() const { return response; }
private:
bool
hasTimeoutRemaining() const
{
if (timer->getMonotonicTime() < paging_timeout) return true;
dbgDebug(D_INTELLIGENCE) << "Intelligence paging response reached timeout";
return false;
}
chrono::microseconds paging_timeout;
Maybe<Response> response = genError("Paging response is uninitialized");
I_TimeGet *timer;
I_MainLoop *mainloop;
};
class IntelligenceComponentV2::Impl
:
Singleton::Provide<I_Intelligence_IS_V2>::From<IntelligenceComponentV2>
@@ -255,13 +301,12 @@ public:
init()
{
message = Singleton::Consume<I_Messaging>::by<IntelligenceComponentV2>();
timer = Singleton::Consume<I_TimeGet>::by<IntelligenceComponentV2>();
mainloop = Singleton::Consume<I_MainLoop>::by<IntelligenceComponentV2>();
mainloop->addRecurringRoutine(
I_MainLoop::RoutineType::System,
chrono::minutes(12),
[this] () { sendReccurringInvalidationRegistration(); },
[this] () { sendRecurringInvalidationRegistration(); },
"Sending intelligence invalidation"
);
@@ -272,12 +317,7 @@ public:
bool
sendInvalidation(const Invalidation &invalidation) const override
{
if (hasLocalInvalidationSupport()) {
return sendLocalInvalidation(invalidation);
}
else {
return sendGlobalInvalidation(invalidation);
}
return sendIntelligence(invalidation).ok();
}
Maybe<uint>
@@ -285,7 +325,7 @@ public:
{
if (!invalidation.isLegalInvalidation()) return genError("Attempting to register invalid invalidation");
auto res = invalidations.emplace(invalidation, cb);
sendReccurringInvalidationRegistration();
sendRecurringInvalidationRegistration();
return res;
}
@@ -312,8 +352,7 @@ public:
return genError("Paging is activated and already finished. No need for more queries.");
}
}
Sender intelligence_server(intelligence_req);
auto response = intelligence_server.sendIntelligenceRequest();
auto response = sendIntelligenceRequest(intelligence_req);
return response;
}
@@ -326,86 +365,163 @@ public:
private:
bool
hasLocalInvalidationSupport() const
hasLocalIntelligenceSupport() const
{
auto is_supported = getProfileAgentSettingWithDefault<bool>(false, "agent.config.useLocalIntelligence");
if (getProfileAgentSettingWithDefault<bool>(false, "agent.config.useLocalIntelligence")) return true;
if (!is_supported) {
is_supported = getProfileAgentSettingWithDefault<bool>(false, "agent.config.supportInvalidation");
auto crowsec_env = getenv("CROWDSEC_ENABLED");
bool crowdsec_enabled = crowsec_env != nullptr && string(crowsec_env) == "true";
if (getProfileAgentSettingWithDefault<bool>(crowdsec_enabled, "layer7AccessControl.crowdsec.enabled")) {
return true;
}
if (!is_supported) {
is_supported = getConfigurationWithDefault(false, "intelligence", "support Invalidation");
if (getProfileAgentSettingWithDefault<bool>(false, "agent.config.supportInvalidation")) return true;
dbgTrace(D_INTELLIGENCE) << "Local intelligence not supported";
return false;
}
template <typename IntelligenceRest>
Maybe<Response>
sendIntelligence(const IntelligenceRest &rest_req) const
{
dbgFlow(D_INTELLIGENCE) << "Sending intelligence request";
auto res = sendLocalIntelligenceToLocalServer(rest_req);
if (res.ok()) return res;
return sendGlobalIntelligence(rest_req);
}
template <typename IntelligenceRest>
Maybe<Response>
sendLocalIntelligenceToLocalServer(const IntelligenceRest &rest_req) const
{
dbgFlow(D_INTELLIGENCE) << "Sending local intelligence request";
if (!hasLocalIntelligenceSupport()) {
dbgDebug(D_INTELLIGENCE) << "Local intelligence not supported";
return genError("Local intelligence not configured");
}
return is_supported;
}
bool
sendLocalInvalidation(const Invalidation &invalidation) const
{
dbgFlow(D_INTELLIGENCE) << "Starting local invalidation";
return sendLocalInvalidationImpl(invalidation) || sendGlobalInvalidation(invalidation);
}
bool
sendLocalInvalidationImpl(const Invalidation &invalidation) const
{
auto server = getSetting<string>("intelligence", "local intelligence server ip");
if (!server.ok()) {
dbgWarning(D_INTELLIGENCE) << "Local intelligence server not configured";
return false;
dbgWarning(D_INTELLIGENCE) << "Local intelligence server ip not configured";
return genError("Local intelligence server ip not configured");
}
return
sendLocalInvalidationImpl(invalidation, *server, primary_port_setting) ||
sendLocalInvalidationImpl(invalidation, *server, secondary_port_setting);
auto res = sendLocalIntelligenceToLocalServer(rest_req, *server, primary_port_setting);
if (res.ok()) return res;
return sendLocalIntelligenceToLocalServer(rest_req, *server, secondary_port_setting);
}
bool
sendLocalInvalidationImpl(const Invalidation &invalidation, const string &server, const string &port_setting) const
template <typename IntelligenceRest>
Maybe<Response>
sendLocalIntelligenceToLocalServer(
const IntelligenceRest &rest_req,
const string &server,
const string &port_setting
) const
{
dbgFlow(D_INTELLIGENCE) << "Sending to local intelligence";
auto port = getSetting<uint>("intelligence", port_setting);
if (!port.ok()) {
dbgWarning(D_INTELLIGENCE) << "Could not resolve port for " << port_setting;
return false;
dbgWarning(D_INTELLIGENCE) << "Could not resolve port for " + port_setting;
return genError("Could not resolve port for " + port_setting);
}
dbgTrace(D_INTELLIGENCE)
<< "Invalidation value: "
<< (invalidation.genJson().ok() ? invalidation.genJson().unpack() : invalidation.genJson().getErr());
<< "Intelligence rest request value: "
<< (rest_req.genJson().ok() ? rest_req.genJson().unpack() : rest_req.genJson().getErr());
MessageMetadata invalidation_req_md(server, *port);
invalidation_req_md.insertHeaders(getHTTPHeaders());
invalidation_req_md.setConnectioFlag(MessageConnectionConfig::UNSECURE_CONN);
return message->sendSyncMessageWithoutResponse(
HTTPMethod::POST,
invalidation_uri,
invalidation,
MessageCategory::INTELLIGENCE,
invalidation_req_md
);
MessageMetadata req_md(server, *port);
req_md.insertHeaders(getHTTPHeaders());
req_md.setConnectioFlag(MessageConnectionConfig::UNSECURE_CONN);
return sendIntelligenceRequestImpl(rest_req, req_md);
}
bool
sendGlobalInvalidation(const Invalidation &invalidation) const
template <typename IntelligenceRest>
Maybe<Response>
sendGlobalIntelligence(const IntelligenceRest &rest_req) const
{
dbgFlow(D_INTELLIGENCE) << "Starting global invalidation";
dbgFlow(D_INTELLIGENCE) << "Sending global intelligence request";
dbgTrace(D_INTELLIGENCE)
<< "Invalidation value: "
<< (invalidation.genJson().ok() ? invalidation.genJson().unpack() : invalidation.genJson().getErr());
MessageMetadata global_invalidation_req_md;
global_invalidation_req_md.insertHeaders(getHTTPHeaders());
return message->sendSyncMessageWithoutResponse(
<< "Intelligence rest value: "
<< (rest_req.genJson().ok() ? rest_req.genJson().unpack() : rest_req.genJson().getErr());
MessageMetadata global_req_md;
global_req_md.insertHeaders(getHTTPHeaders());
return sendIntelligenceRequestImpl(rest_req, global_req_md);
}
Maybe<Response>
createResponse(const string &response_body, const IntelligenceRequest &query_request) const
{
Response response(response_body, query_request.getSize(), query_request.isBulk());
auto load_status = response.load();
if (load_status.ok()) return response;
dbgWarning(D_INTELLIGENCE) << "Could not create intelligence response.";
return load_status.passErr();
}
Maybe<Response>
sendIntelligenceRequestImpl(const Invalidation &invalidation, const MessageMetadata &local_req_md) const
{
dbgFlow(D_INTELLIGENCE) << "Sending intelligence invalidation";
auto res = message->sendSyncMessageWithoutResponse(
HTTPMethod::POST,
invalidation_uri,
invalidation,
MessageCategory::INTELLIGENCE,
global_invalidation_req_md
local_req_md
);
if (res) return Response();
dbgWarning(D_INTELLIGENCE) << "Could not send local intelligence invalidation.";
return genError("Could not send local intelligence invalidation");
}
Maybe<Response>
sendIntelligenceRequestImpl(
const InvalidationRegistration::RestCall &registration,
const MessageMetadata &registration_req_md
) const
{
dbgFlow(D_INTELLIGENCE) << "Sending intelligence invalidation registration";
auto res = message->sendSyncMessageWithoutResponse(
HTTPMethod::POST,
registration_uri,
registration,
MessageCategory::INTELLIGENCE,
registration_req_md
);
if (res) return Response();
dbgWarning(D_INTELLIGENCE) << "Could not send intelligence invalidation registration.";
return genError("Could not send intelligence invalidation registration");
}
Maybe<Response>
sendIntelligenceRequestImpl(const IntelligenceRequest &query_request, const MessageMetadata &global_req_md) const
{
dbgFlow(D_INTELLIGENCE) << "Sending intelligence query";
auto json_body = query_request.genJson();
if (!json_body.ok()) return json_body.passErr();
auto req_data = message->sendSyncMessage(
HTTPMethod::POST,
query_request.isBulk() ? queries_uri : query_uri,
*json_body,
MessageCategory::INTELLIGENCE,
global_req_md
);
if (!req_data.ok()) {
auto response_error = req_data.getErr().toString();
dbgWarning(D_INTELLIGENCE)
<< "Could not send intelligence query. "
<< req_data.getErr().getBody()
<< " "
<< req_data.getErr().toString();
return genError("Could not send intelligence query.");
} else if (req_data->getHTTPStatusCode() != HTTPStatusCode::HTTP_OK) {
dbgWarning(D_INTELLIGENCE) << "Invalid intelligence response: " << req_data->toString();
return genError(req_data->toString());
}
return createResponse(req_data->getBody(), query_request);
}
map<string, string>
@@ -423,66 +539,26 @@ private:
return headers;
}
bool
sendRegistration(const Invalidation &invalidation) const
{
InvalidationRegistration registration;
registration.addInvalidation(invalidation);
return sendLocalRegistrationImpl(registration.genJson());
}
bool
sendLocalRegistrationImpl(const InvalidationRegistration::RestCall &registration) const
{
auto server = getSetting<string>("intelligence", "local intelligence server ip");
if (!server.ok()) {
dbgWarning(D_INTELLIGENCE) << "Local intelligence server not configured";
return false;
}
return
sendLocalRegistrationImpl(registration, *server, primary_port_setting) ||
sendLocalRegistrationImpl(registration, *server, secondary_port_setting);
}
bool
sendLocalRegistrationImpl(
const InvalidationRegistration::RestCall &registration,
const string &server,
const string &port_setting
) const
{
dbgFlow(D_INTELLIGENCE) << "Sending to local registration";
auto port = getSetting<uint>("intelligence", port_setting);
if (!port.ok()) {
dbgWarning(D_INTELLIGENCE) << "Could not resolve port for " << port_setting;
return false;
}
dbgTrace(D_INTELLIGENCE) << "Invalidation value: " << registration.genJson();
MessageMetadata registration_req_md(server, *port);
registration_req_md.setConnectioFlag(MessageConnectionConfig::UNSECURE_CONN);
return message->sendSyncMessageWithoutResponse(
HTTPMethod::POST,
registration_uri,
registration,
MessageCategory::INTELLIGENCE,
registration_req_md
);
}
void
sendReccurringInvalidationRegistration() const
sendRecurringInvalidationRegistration() const
{
if (!hasLocalInvalidationSupport() || invalidations.empty()) return;
if (invalidations.empty()) return;
sendLocalRegistrationImpl(invalidations.getRegistration());
sendLocalIntelligenceToLocalServer(invalidations.getRegistration());
}
Maybe<Response>
sendIntelligenceRequest(const IntelligenceRequest& req) const
{
PagingController paging;
while (paging.isMoreResponses(sendIntelligence(req), req));
return paging.getResponse();
}
InvalidationCallBack invalidations;
I_Messaging *message = nullptr;
I_TimeGet *timer = nullptr;
I_MainLoop *mainloop = nullptr;
};

View File

@@ -8,6 +8,7 @@
#include "mock/mock_messaging.h"
#include "mock/mock_rest_api.h"
#include "mock/mock_time_get.h"
#include "mock/mock_agent_details.h"
#include "read_attribute_v2.h"
#include "singleton.h"
@@ -44,6 +45,9 @@ public:
addRecurringRoutine(I_MainLoop::RoutineType::System, chrono::microseconds(720000000), _, _, _)
).WillRepeatedly(Return(0));
EXPECT_CALL(mock_agent_details, getAgentId()).WillRepeatedly(Return("dummy_agent_id"));
EXPECT_CALL(mock_agent_details, getTenantId()).WillRepeatedly(Return("dummy_tenant_id"));
EXPECT_CALL(
mock_rest,
mockRestCall(_, "new-invalidation/source/invalidation", _)
@@ -62,6 +66,7 @@ public:
stringstream debug_output;
StrictMock<MockMainLoop> mock_ml;
StrictMock<MockRestApi> mock_rest;
StrictMock<MockAgentDetails> mock_agent_details;
NiceMock<MockTimeGet> mock_time;
::Environment env;
ConfigComponent conf;
@@ -411,6 +416,8 @@ TEST_F(IntelligenceComponentTestV2, fakeOnlineIntelligenceTest)
"}\n"
);
EXPECT_CALL(mock_rest, getListeningPort()).WillOnce(Return(8888));
MessageMetadata md;
EXPECT_CALL(messaging_mock, sendSyncMessage(HTTPMethod::POST, _, _, MessageCategory::INTELLIGENCE, _)
).WillOnce(DoAll(SaveArg<4>(&md), Return(HTTPResponse(HTTPStatusCode::HTTP_OK, response_str))));
@@ -485,6 +492,8 @@ TEST_F(IntelligenceComponentTestV2, fakeLocalIntelligenceTest)
MessageMetadata md;
EXPECT_CALL(mock_rest, getListeningPort()).WillOnce(Return(8888));
EXPECT_CALL(messaging_mock, sendSyncMessage(HTTPMethod::POST, _, _, MessageCategory::INTELLIGENCE, _)
).WillOnce(DoAll(SaveArg<4>(&md), Return(HTTPResponse(HTTPStatusCode::HTTP_OK, response_str))));
@@ -625,6 +634,8 @@ TEST_F(IntelligenceComponentTestV2, multiAssetsIntelligenceTest)
"}\n"
);
EXPECT_CALL(mock_rest, getListeningPort()).WillOnce(Return(8888));
EXPECT_CALL(messaging_mock, sendSyncMessage(HTTPMethod::POST, _, _, MessageCategory::INTELLIGENCE, _)
).WillOnce(Return(HTTPResponse(HTTPStatusCode::HTTP_OK, response_str1)));
@@ -754,6 +765,8 @@ TEST_F(IntelligenceComponentTestV2, inProgressQueryTest)
"}\n"
);
EXPECT_CALL(mock_rest, getListeningPort()).Times(2).WillRepeatedly(Return(8888));
EXPECT_CALL(messaging_mock, sendSyncMessage(HTTPMethod::POST, _, _, MessageCategory::INTELLIGENCE, _)
).WillOnce(Return(HTTPResponse(HTTPStatusCode::HTTP_OK, in_progress_response_str))
).WillOnce(Return(HTTPResponse(HTTPStatusCode::HTTP_OK, done_response_str)));
@@ -943,6 +956,8 @@ TEST_F(IntelligenceComponentTestV2, pagingQueryTest)
"}\n"
);
EXPECT_CALL(mock_rest, getListeningPort()).Times(3).WillRepeatedly(Return(8888));
EXPECT_CALL(messaging_mock, sendSyncMessage(HTTPMethod::POST, _, _, MessageCategory::INTELLIGENCE, _)
).WillOnce(Return(HTTPResponse(HTTPStatusCode::HTTP_OK, paging_in_progress_response_str1)));
@@ -1126,6 +1141,8 @@ TEST_F(IntelligenceComponentTestV2, bulkOnlineIntelligenceTest)
"}\n"
);
Debug::setNewDefaultStdout(&cout);
EXPECT_CALL(mock_rest, getListeningPort()).WillOnce(Return(8888));
EXPECT_CALL(messaging_mock, sendSyncMessage(HTTPMethod::POST, _, _, MessageCategory::INTELLIGENCE, _)
).WillOnce(Return(HTTPResponse(HTTPStatusCode::HTTP_OK, response_str)));
@@ -1289,6 +1306,8 @@ TEST_F(IntelligenceComponentTestV2, ignoreInProgressQueryTest_2)
"}\n"
);
EXPECT_CALL(mock_rest, getListeningPort()).Times(2).WillRepeatedly(Return(8888));
EXPECT_CALL(messaging_mock, sendSyncMessage(HTTPMethod::POST, _, _, MessageCategory::INTELLIGENCE, _))
.WillOnce(Return(HTTPResponse(HTTPStatusCode::HTTP_OK, paging_in_progress_response_str)))
.WillOnce(Return(HTTPResponse(HTTPStatusCode::HTTP_OK, paging_done_response_str)));

View File

@@ -86,6 +86,12 @@ public:
void resume(RoutineID id) override;
void
reloadConfigurationCb()
{
reload_configuration = true;
}
void
init()
{
@@ -120,7 +126,7 @@ public:
private:
void reportStartupEvent();
void stop(const RoutineMap::iterator &iter);
uint32_t getCurrentTimeSlice(uint32_t current_stress);
uint32_t getCurrentTimeSlice(uint32_t current_stress, int idle_time_slice, int busy_time_slice);
RoutineID getNextID();
I_TimeGet *
@@ -130,7 +136,14 @@ private:
return timer;
}
I_Environment *
getEnvironment() {
if (env == nullptr) env = Singleton::Consume<I_Environment>::by<MainloopComponent>();
return env;
}
I_TimeGet *timer = nullptr;
I_Environment *env = nullptr;
RoutineMap routines;
RoutineMap::iterator curr_iter = routines.end();
@@ -144,6 +157,7 @@ private:
chrono::seconds metric_report_interval;
MainloopEvent mainloop_event;
MainloopMetric mainloop_metric;
bool reload_configuration = false;
};
static I_MainLoop::RoutineType rounds[] = {
@@ -171,7 +185,7 @@ MainloopComponent::Impl::reportStartupEvent()
chrono::microseconds curr_time = Singleton::Consume<I_TimeGet>::by<MainloopComponent>()->getWalltime();
ReportIS::AudienceTeam audience_team = ReportIS::AudienceTeam::NONE;
auto i_env = Singleton::Consume<I_Environment>::by<MainloopComponent>();
auto i_env = getEnvironment();
auto team = i_env->get<ReportIS::AudienceTeam>("Audience Team");
if (team.ok()) audience_team = *team;
@@ -222,18 +236,27 @@ MainloopComponent::Impl::run()
const chrono::seconds one_sec(1);
string service_name = "Unnamed Nano Service";
auto name = Singleton::Consume<I_Environment>::by<MainloopComponent>()->get<string>("Service Name");
auto name = getEnvironment()->get<string>("Service Name");
if (name.ok()) service_name = *name;
string error_prefix = "Service " + service_name + " crashed. Error details: ";
string error;
int idle_time_slice = getConfigurationWithDefault<int>(1500, "Mainloop", "Idle routine time slice");
int busy_time_slice = getConfigurationWithDefault<int>(100, "Mainloop", "Busy routine time slice");
int exceed_warning_slice = getConfigurationWithDefault(100, "Mainloop", "Exceed Warning");
while (has_primary_routines) {
mainloop_event.setStressValue(current_stress);
int time_slice_to_use = getCurrentTimeSlice(current_stress);
if (reload_configuration) {
idle_time_slice = getConfigurationWithDefault<int>(1500, "Mainloop", "Idle routine time slice");
busy_time_slice = getConfigurationWithDefault<int>(100, "Mainloop", "Busy routine time slice");
exceed_warning_slice = getConfigurationWithDefault(100, "Mainloop", "Exceed Warning");
reload_configuration = false;
}
int time_slice_to_use = getCurrentTimeSlice(current_stress, idle_time_slice, busy_time_slice);
mainloop_event.setTimeSlice(time_slice_to_use);
chrono::microseconds basic_time_slice(time_slice_to_use);
chrono::milliseconds large_exceeding(getConfigurationWithDefault(100u, "Mainloop", "Exceed Warning"));
chrono::milliseconds large_exceeding(exceed_warning_slice);
auto start_time = getTimer()->getMonotonicTime();
has_primary_routines = false;
@@ -351,9 +374,9 @@ MainloopComponent::Impl::addOneTimeRoutine(
auto id = getNextID();
string routine_name = _routine_name.empty() ? string("Generic routine, id: " + to_string(id)) : _routine_name;
auto env = Singleton::Consume<I_Environment>::by<MainloopComponent>()->createEnvironment();
auto env = getEnvironment()->createEnvironment();
Routine func_wrapper = [this, env, func, routine_name] () mutable {
Singleton::Consume<I_Environment>::by<MainloopComponent>()->loadEnvironment(move(env));
getEnvironment()->loadEnvironment(move(env));
try {
if (this->do_stop) return;
@@ -446,9 +469,9 @@ MainloopComponent::Impl::yield(bool force)
if (do_stop) throw MainloopStop();
if (!force && getTimer()->getMonotonicTime() < stop_time) return;
auto env = Singleton::Consume<I_Environment>::by<MainloopComponent>()->saveEnvironment();
auto env = getEnvironment()->saveEnvironment();
curr_iter->second.yield();
Singleton::Consume<I_Environment>::by<MainloopComponent>()->loadEnvironment(move(env));
getEnvironment()->loadEnvironment(move(env));
if (do_stop) throw MainloopStop();
}
@@ -533,7 +556,7 @@ MainloopComponent::Impl::stop(const RoutineMap::iterator &iter)
if (iter->second.isActive()) {
dbgDebug(D_MAINLOOP) << "Stoping the routine " << iter->first;
do_stop = true;
auto env = Singleton::Consume<I_Environment>::by<MainloopComponent>()->saveEnvironment();
auto env = getEnvironment()->saveEnvironment();
RoutineMap::iterator save_routine = curr_iter;
curr_iter = iter;
// We are going to let the routine run one last time, so it can throw an exception which will cause the stack
@@ -560,10 +583,13 @@ MainloopComponent::Impl::getNextID()
void
MainloopComponent::Impl::updateCurrentStress(bool is_busy)
{
const int stress_factor = 6; // calculated by trial and error, should be revisited
const int ramp_stress_factor = 10; // calculated by trial and error, should be revisited
const int steady_stress_factor = 6; // calculated by trial and error, should be revisited
if (is_busy) {
if (current_stress < 95) {
current_stress += stress_factor;
if (current_stress < 50) {
current_stress += ramp_stress_factor;
} else if (current_stress < 95) {
current_stress += steady_stress_factor;
} else {
current_stress = 100;
}
@@ -573,10 +599,8 @@ MainloopComponent::Impl::updateCurrentStress(bool is_busy)
}
uint32_t
MainloopComponent::Impl::getCurrentTimeSlice(uint32_t current_stress)
MainloopComponent::Impl::getCurrentTimeSlice(uint32_t current_stress, int idle_time_slice, int busy_time_slice)
{
int idle_time_slice = getConfigurationWithDefault<int>(1000, "Mainloop", "Idle routine time slice");
int busy_time_slice = getConfigurationWithDefault<int>(1, "Mainloop", "Busy routine time slice");
return idle_time_slice - (((idle_time_slice - busy_time_slice) * current_stress) / 100);
}
@@ -598,4 +622,5 @@ MainloopComponent::preload()
registerExpectedConfiguration<int>("Mainloop", "Busy routine time slice");
registerExpectedConfiguration<uint>("Mainloop", "metric reporting interval");
registerExpectedConfiguration<uint>("Mainloop", "Exceed Warning");
registerConfigLoadCb([&] () { pimpl->reloadConfigurationCb(); });
}

View File

@@ -131,12 +131,12 @@ TEST_F(MainloopTest, basic_metrics_check)
"{\n"
" \"Metric\": \"Mainloop sleep time data\",\n"
" \"Reporting interval\": 600,\n"
" \"mainloopMaxTimeSliceSample\": 1000,\n"
" \"mainloopAvgTimeSliceSample\": 1000.0,\n"
" \"mainloopLastTimeSliceSample\": 1000,\n"
" \"mainloopMaxSleepTimeSample\": 1000,\n"
" \"mainloopAvgSleepTimeSample\": 1000.0,\n"
" \"mainloopLastSleepTimeSample\": 1000,\n"
" \"mainloopMaxTimeSliceSample\": 1500,\n"
" \"mainloopAvgTimeSliceSample\": 1500.0,\n"
" \"mainloopLastTimeSliceSample\": 1500,\n"
" \"mainloopMaxSleepTimeSample\": 1500,\n"
" \"mainloopAvgSleepTimeSample\": 1500.0,\n"
" \"mainloopLastSleepTimeSample\": 1500,\n"
" \"mainloopMaxStressValueSample\": 0,\n"
" \"mainloopAvgStressValueSample\": 0.0,\n"
" \"mainloopLastStressValueSample\": 0\n"

View File

@@ -91,8 +91,9 @@ public:
}
char buffer[128];
if (fgets(buffer, sizeof(buffer)-1, pipe) != nullptr) result += buffer;
if (do_yield && mainloop != nullptr) mainloop->yield();
bool did_get = fgets(buffer, sizeof(buffer)-1, pipe) != nullptr;
if (did_get) result += buffer;
if (do_yield && mainloop != nullptr) mainloop->yield(!did_get);
}
auto code = pclose(pipe) / 256;