mirror of
https://github.com/openappsec/openappsec.git
synced 2026-01-17 16:00:26 +03:00
Jan 06 2026 dev (#387)
* sync code * update code to support brotli * update code to support brotli * update code to support brotli * sync code * fix findBrotli * sync code * sync code * sync code * sync code --------- Co-authored-by: Ned Wright <nedwright@proton.me> Co-authored-by: Daniel Eisenberg <danielei@checkpoint.com>
This commit is contained in:
@@ -39,7 +39,7 @@ using namespace smartBIO;
|
||||
USE_DEBUG_FLAG(D_CONNECTION);
|
||||
|
||||
static const HTTPResponse sending_timeout(HTTPStatusCode::HTTP_UNKNOWN, "Failed to send all data in time");
|
||||
static const HTTPResponse receving_timeout(HTTPStatusCode::HTTP_UNKNOWN, "Failed to receive all data in time");
|
||||
static const HTTPResponse receiving_timeout(HTTPStatusCode::HTTP_UNKNOWN, "Failed to receive all data in time");
|
||||
static const HTTPResponse parsing_error(HTTPStatusCode::HTTP_UNKNOWN, "Failed to parse the HTTP response");
|
||||
static const HTTPResponse close_error(
|
||||
HTTPStatusCode::HTTP_UNKNOWN,
|
||||
@@ -271,18 +271,11 @@ private:
|
||||
return *details_ssl_dir;
|
||||
}
|
||||
|
||||
// Use detail_resolver to determine platform-specific certificate directory
|
||||
#if defined(alpine)
|
||||
string platform = "alpine";
|
||||
return "/etc/ssl/certs/";
|
||||
#else
|
||||
string platform = "linux";
|
||||
#endif
|
||||
|
||||
if (platform == "alpine") {
|
||||
return "/etc/ssl/certs/";
|
||||
}
|
||||
|
||||
return "/usr/lib/ssl/certs/";
|
||||
#endif
|
||||
}
|
||||
|
||||
Maybe<void>
|
||||
@@ -741,20 +734,54 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
auto receiving_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
|
||||
auto base_timeout_config = getProfileAgentSettingWithDefault<uint>(
|
||||
10,
|
||||
"agent.config.message.chunk.connection.timeout"
|
||||
);
|
||||
auto base_timeout = chrono::seconds(base_timeout_config); // 10 seconds between data chunks
|
||||
|
||||
auto global_timeout_config = getProfileAgentSettingWithDefault<uint>(
|
||||
600,
|
||||
"agent.config.message.global.connection.timeout"
|
||||
);
|
||||
auto global_timeout = chrono::seconds(global_timeout_config); // 600 seconds maximum for entire download
|
||||
|
||||
auto receiving_end_time = i_time->getMonotonicTime() + base_timeout;
|
||||
auto global_end_time = i_time->getMonotonicTime() + global_timeout;
|
||||
HTTPResponseParser http_parser;
|
||||
dbgTrace(D_CONNECTION) << "Sent the message, now waiting for response";
|
||||
dbgTrace(D_CONNECTION)
|
||||
<< "Sent the message, now waiting for response (global timeout: "
|
||||
<< global_timeout.count()
|
||||
<< " seconds)";
|
||||
|
||||
while (!http_parser.hasReachedError()) {
|
||||
// Check global timeout first
|
||||
if (i_time->getMonotonicTime() > global_end_time) {
|
||||
should_close_connection = true;
|
||||
dbgWarning(D_CONNECTION)
|
||||
<< "Global receive timeout reached after "
|
||||
<< global_timeout.count() << " seconds";
|
||||
return genError(receiving_timeout);
|
||||
}
|
||||
|
||||
// Check per-chunk timeout
|
||||
if (i_time->getMonotonicTime() > receiving_end_time) {
|
||||
should_close_connection = true;
|
||||
return genError(receving_timeout);
|
||||
};
|
||||
dbgWarning(D_CONNECTION) << "No data received for " << base_timeout.count() << " seconds";
|
||||
return genError(receiving_timeout);
|
||||
}
|
||||
|
||||
auto receieved = receiveData();
|
||||
if (!receieved.ok()) {
|
||||
should_close_connection = true;
|
||||
return receieved.passErr();
|
||||
}
|
||||
// Reset timeout each time we receive data
|
||||
if (!receieved.unpack().empty()) {
|
||||
receiving_end_time = i_time->getMonotonicTime() + base_timeout;
|
||||
}
|
||||
auto response = http_parser.parseData(*receieved, is_connect);
|
||||
|
||||
i_mainloop->yield(receieved.unpack().empty());
|
||||
if (response.ok()) {
|
||||
dbgTrace(D_MESSAGING) << printOut(response.unpack().toString());
|
||||
|
||||
@@ -48,6 +48,13 @@ public:
|
||||
return establishNewConnection(metadata, category);
|
||||
}
|
||||
|
||||
void
|
||||
clearConnections() override
|
||||
{
|
||||
dbgTrace(D_CONNECTION) << "Clearing all persistent connections";
|
||||
persistent_connections.clear();
|
||||
}
|
||||
|
||||
Maybe<Connection>
|
||||
getPersistentConnection(const string &host_name, uint16_t port, MessageCategory category) override
|
||||
{
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "rest.h"
|
||||
#include "rest_server.h"
|
||||
#include "dummy_socket.h"
|
||||
#include <atomic>
|
||||
|
||||
using namespace std;
|
||||
using namespace testing;
|
||||
@@ -100,6 +101,11 @@ TEST_F(TestConnectionComp, testSetAndGetConnection)
|
||||
EXPECT_EQ(get_conn.getConnKey().getHostName(), "127.0.0.1");
|
||||
EXPECT_EQ(get_conn.getConnKey().getPort(), 8080);
|
||||
EXPECT_EQ(get_conn.getConnKey().getCategory(), MessageCategory::LOG);
|
||||
|
||||
i_conn->clearConnections();
|
||||
maybe_get_connection = i_conn->getPersistentConnection("127.0.0.1", 8080, MessageCategory::LOG);
|
||||
ASSERT_FALSE(maybe_get_connection.ok());
|
||||
|
||||
}
|
||||
|
||||
TEST_F(TestConnectionComp, testEstablishNewConnection)
|
||||
@@ -279,19 +285,27 @@ TEST_F(TestConnectionComp, testSendRequestWithOneTimeFogConnection)
|
||||
auto req = HTTPRequest::prepareRequest(conn, HTTPMethod::POST, "/test", conn_metadata.getHeaders(), "test-body");
|
||||
ASSERT_TRUE(req.ok());
|
||||
|
||||
// Ensure we accept+respond exactly once regardless of yield overload order
|
||||
std::atomic<bool> responded{false};
|
||||
EXPECT_CALL(mock_mainloop, yield(A<std::chrono::microseconds>()))
|
||||
.WillOnce(
|
||||
InvokeWithoutArgs(
|
||||
[&]() {
|
||||
cerr << "accepting socket" << endl;
|
||||
dummy_socket.acceptSocket();
|
||||
dummy_socket.writeToSocket("HTTP/1.1 200 OK\r\nContent-Length: 7\r\n\r\nmy-test");
|
||||
}
|
||||
)
|
||||
).WillRepeatedly(Return());
|
||||
.WillRepeatedly(InvokeWithoutArgs([&]() {
|
||||
if (!responded.exchange(true)) {
|
||||
cerr << "accepting socket" << endl;
|
||||
dummy_socket.acceptSocket();
|
||||
dummy_socket.writeToSocket("HTTP/1.1 200 OK\r\nContent-Length: 7\r\n\r\nmy-test");
|
||||
}
|
||||
}));
|
||||
EXPECT_CALL(mock_mainloop, yield(A<bool>()))
|
||||
.WillRepeatedly(InvokeWithoutArgs([&]() {
|
||||
if (!responded.exchange(true)) {
|
||||
cerr << "accepting socket while receiving" << endl;
|
||||
dummy_socket.acceptSocket();
|
||||
dummy_socket.writeToSocket("HTTP/1.1 200 OK\r\nContent-Length: 7\r\n\r\nmy-test");
|
||||
}
|
||||
}));
|
||||
|
||||
EXPECT_CALL(mock_timer, getMonotonicTime())
|
||||
.WillRepeatedly(Invoke([]() { static int j = 0; return chrono::microseconds(++j * 10); }));
|
||||
.WillRepeatedly(Invoke([]() { static int j = 0; return chrono::microseconds(++j * 1000 * 1000); }));
|
||||
|
||||
auto maybe_response = i_conn->sendRequest(conn, *req);
|
||||
if (!maybe_response.ok()) {
|
||||
|
||||
@@ -27,6 +27,7 @@ class I_MessagingConnection
|
||||
{
|
||||
public:
|
||||
virtual Maybe<Connection> establishConnection(const MessageMetadata &metadata, MessageCategory category) = 0;
|
||||
virtual void clearConnections() = 0;
|
||||
|
||||
virtual Maybe<Connection> getPersistentConnection(
|
||||
const std::string &host_name, uint16_t port, MessageCategory category
|
||||
|
||||
@@ -71,6 +71,7 @@ public:
|
||||
|
||||
bool setFogConnection(const std::string &host, uint16_t port, bool is_secure, MessageCategory category);
|
||||
bool setFogConnection(MessageCategory category);
|
||||
void clearConnections();
|
||||
|
||||
private:
|
||||
Maybe<Connection> getConnection(MessageCategory category, const MessageMetadata &message_metadata);
|
||||
@@ -96,6 +97,7 @@ private:
|
||||
I_MessageBuffer *i_messaging_buffer;
|
||||
I_AgentDetails *agent_details;
|
||||
bool should_buffer_failed_messages;
|
||||
std::string proxy_addr;
|
||||
TemporaryCache<std::string, HTTPResponse> fog_get_requests_cache;
|
||||
};
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ public:
|
||||
|
||||
MOCK_METHOD3(mockSendRequest, Maybe<HTTPResponse, HTTPResponse>(Connection &, HTTPRequest, bool));
|
||||
|
||||
MOCK_METHOD0(clearConnections, void());
|
||||
MOCK_METHOD3(getPersistentConnection, Maybe<Connection>(const string &, uint16_t, MessageCategory));
|
||||
MOCK_METHOD1(getFogConnectionByCategory, Maybe<Connection>(MessageCategory));
|
||||
};
|
||||
|
||||
@@ -97,6 +97,12 @@ public:
|
||||
return messaging_comp.setFogConnection(category);
|
||||
}
|
||||
|
||||
void
|
||||
clearConnections() override
|
||||
{
|
||||
messaging_comp.clearConnections();
|
||||
}
|
||||
|
||||
private:
|
||||
MessagingComp messaging_comp;
|
||||
ConnectionComponent connection_comp;
|
||||
@@ -119,7 +125,7 @@ void
|
||||
Messaging::preload()
|
||||
{
|
||||
registerExpectedConfiguration<int>("message", "Cache timeout");
|
||||
registerExpectedConfiguration<uint>("message", "Connection timeout");
|
||||
registerExpectedConfigurationWithCache<uint>("assetId", "message", "Connection timeout");
|
||||
registerExpectedConfiguration<uint>("message", "Connection handshake timeout");
|
||||
registerExpectedConfiguration<bool>("message", "Verify SSL pinning");
|
||||
registerExpectedConfiguration<bool>("message", "Buffer Failed Requests");
|
||||
|
||||
@@ -204,12 +204,15 @@ MessagingBufferComponent::Impl::pushNewBufferedMessage(
|
||||
Maybe<BufferedMessage>
|
||||
MessagingBufferComponent::Impl::peekMessage()
|
||||
{
|
||||
auto move_cmd =
|
||||
"if [ -s " + buffer_input + " ] && [ ! -s " + buffer_output + " ];"
|
||||
"then mv " + buffer_input + " " + buffer_output + ";"
|
||||
"fi";
|
||||
|
||||
shell_cmd->getExecOutput(move_cmd);
|
||||
// Native replacement for shell mv command
|
||||
struct stat stat_input, stat_output;
|
||||
bool input_exists = (stat(buffer_input.c_str(), &stat_input) == 0 && stat_input.st_size > 0);
|
||||
bool output_exists = (stat(buffer_output.c_str(), &stat_output) == 0 && stat_output.st_size > 0);
|
||||
if (input_exists && !output_exists) {
|
||||
if (rename(buffer_input.c_str(), buffer_output.c_str()) != 0) {
|
||||
dbgWarning(D_MESSAGING_BUFFER) << "Failed to move buffer input to output: " << strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
if (!checkExistence(buffer_output)) return genError(buffer_output + " does not exist");
|
||||
|
||||
|
||||
@@ -72,6 +72,11 @@ MessagingComp::init()
|
||||
auto i_time_get = Singleton::Consume<I_TimeGet>::by<Messaging>();
|
||||
auto cache_timeout = getConfigurationWithDefault<int>(40, "message", "Cache timeout");
|
||||
fog_get_requests_cache.startExpiration(chrono::seconds(cache_timeout), i_mainloop, i_time_get);
|
||||
proxy_addr = getConfigurationWithDefault<string>(
|
||||
getProfileAgentSettingWithDefault<string>("", "proxy.address"),
|
||||
"message",
|
||||
"Proxy Address"
|
||||
);
|
||||
|
||||
should_buffer_failed_messages = getConfigurationWithDefault<bool>(
|
||||
getProfileAgentSettingWithDefault<bool>(true, "eventBuffer.bufferFailedRequests"),
|
||||
@@ -125,7 +130,7 @@ MessagingComp::sendMessage(
|
||||
dbgWarning(D_MESSAGING) << "Failed to get connection. Error: " << maybe_conn.getErr();
|
||||
return genError<HTTPResponse>(HTTPStatusCode::HTTP_UNKNOWN, maybe_conn.getErr());
|
||||
}
|
||||
|
||||
|
||||
Connection conn = maybe_conn.unpack();
|
||||
if (message_metadata.shouldSuspend() && conn.isSuspended()) {
|
||||
return suspendMessage(body, method, uri, category, message_metadata);
|
||||
@@ -133,12 +138,11 @@ MessagingComp::sendMessage(
|
||||
|
||||
bool is_to_fog = isMessageToFog(message_metadata);
|
||||
auto metadata = message_metadata;
|
||||
|
||||
|
||||
if (is_to_fog) {
|
||||
if (method == HTTPMethod::GET && fog_get_requests_cache.doesKeyExists(uri)) {
|
||||
HTTPResponse res = fog_get_requests_cache.getEntry(uri);
|
||||
dbgTrace(D_MESSAGING) << "Response returned from Fog cache. res body: " << res.getBody();
|
||||
|
||||
return fog_get_requests_cache.getEntry(uri);
|
||||
}
|
||||
|
||||
@@ -197,7 +201,6 @@ MessagingComp::sendSyncMessage(
|
||||
)
|
||||
{
|
||||
Maybe<HTTPResponse, HTTPResponse> is_msg_send = sendMessage(method, uri, body, category, message_metadata);
|
||||
|
||||
if (is_msg_send.ok()) return *is_msg_send;
|
||||
|
||||
if (should_buffer_failed_messages && message_metadata.shouldBufferMessage()) {
|
||||
@@ -412,3 +415,10 @@ MessagingComp::suspendMessage(
|
||||
HTTPStatusCode::HTTP_SUSPEND, "The connection is suspended due to consecutive message sending errors."
|
||||
);
|
||||
}
|
||||
|
||||
void
|
||||
MessagingComp::clearConnections()
|
||||
{
|
||||
dbgTrace(D_MESSAGING) << "Clearing all connections (called from AgentDetails)";
|
||||
i_conn->clearConnections();
|
||||
}
|
||||
|
||||
@@ -62,6 +62,7 @@ public:
|
||||
{
|
||||
EXPECT_CALL(mock_agent_details, getFogDomain()).WillRepeatedly(Return(string(fog_addr)));
|
||||
EXPECT_CALL(mock_agent_details, getFogPort()).WillRepeatedly(Return(fog_port));
|
||||
EXPECT_CALL(mock_agent_details, getProxy()).WillRepeatedly(Return(string("")));
|
||||
EXPECT_CALL(mock_agent_details, getOpenSSLDir()).WillRepeatedly(Return(string("/usr/lib/ssl/certs/")));
|
||||
EXPECT_CALL(mock_agent_details, getAccessToken()).WillRepeatedly(Return(string("accesstoken")));
|
||||
EXPECT_CALL(mock_agent_details, readAgentDetails()).WillRepeatedly(Return(true));
|
||||
@@ -262,6 +263,28 @@ operator==(const MessageMetadata &one, const MessageMetadata &two)
|
||||
one.isDualAuth() == two.isDualAuth();
|
||||
}
|
||||
|
||||
TEST_F(TestMessagingComp, testClearConnections)
|
||||
{
|
||||
setAgentDetails();
|
||||
|
||||
MessageCategory category = MessageCategory::GENERIC;
|
||||
MessageConnectionKey conn_key(fog_addr, fog_port, category);
|
||||
MessageMetadata metadata(fog_addr, fog_port, true);
|
||||
MessageProxySettings proxy_settings("7.7.7.7", "cred", 8080);
|
||||
metadata.setProxySettings(proxy_settings);
|
||||
Connection conn(conn_key, metadata);
|
||||
|
||||
EXPECT_CALL(mock_messaging_connection, establishConnection(metadata, category)).WillOnce(Return(conn));
|
||||
EXPECT_TRUE(messaging_comp.setFogConnection(category));
|
||||
|
||||
EXPECT_CALL(mock_messaging_connection, clearConnections()).Times(1);
|
||||
messaging_comp.clearConnections();
|
||||
|
||||
EXPECT_CALL(mock_messaging_connection, establishConnection(metadata, category)).WillOnce(Return(conn));
|
||||
EXPECT_TRUE(messaging_comp.setFogConnection(category));
|
||||
}
|
||||
|
||||
|
||||
TEST_F(TestMessagingComp, testSetFogConnection)
|
||||
{
|
||||
setAgentDetails();
|
||||
|
||||
Reference in New Issue
Block a user