sync code

This commit is contained in:
Ned Wright
2024-12-29 12:13:27 +00:00
parent 96ce290e5f
commit 64ebf013eb
43 changed files with 1058 additions and 406 deletions

View File

@@ -98,6 +98,8 @@ public:
}
sni_hostname = metadata.getSniHostName();
dn_host_name = metadata.getDnHostName();
}
void
@@ -328,17 +330,24 @@ private:
SSL_set_hostflags(ssl_socket, X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS);
auto host = key.getHostName().c_str();
if (!SSL_set1_host(ssl_socket, host)) {
return genError("Failed to set host name verification. Host: " + string(host));
auto dn = host;
auto sni = host;
if (dn_host_name.ok()) {
dn = dn_host_name->c_str();
}
dbgDebug(D_CONNECTION) << "Setting host DN: " << dn;
if (!SSL_set1_host(ssl_socket, dn)) {
return genError("Failed to set host name verification. Host: " + string(dn));
}
if (sni_hostname.ok()) {
host = sni_hostname->c_str();
sni = sni_hostname->c_str();
}
dbgDebug(D_CONNECTION) << "Setting TLS host name extension. Host: " << host;
if (!SSL_set_tlsext_host_name(ssl_socket, host)) {
return genError("Failed to set TLS host name extension. Host: " + string(host));
dbgDebug(D_CONNECTION) << "Setting TLS host name extension. Host: " << sni;
if (!SSL_set_tlsext_host_name(ssl_socket, sni)) {
return genError("Failed to set TLS host name extension. Host: " + string(sni));
}
return Maybe<void>();
@@ -698,6 +707,8 @@ private:
bool should_close_connection = false;
bool is_dual_auth = false;
Maybe<string> sni_hostname = genError<string>("Uninitialized");
Maybe<string> dn_host_name = genError<string>("Uninitialized");
};
Connection::Connection(const MessageConnectionKey &key, const MessageMetadata &metadata)
@@ -716,7 +727,7 @@ Connection::setProxySettings(const MessageProxySettings &settings)
map<string, string> headers;
auto i_encrypt = Singleton::Consume<I_Encryptor>::by<Messaging>();
if (!settings.getProxyAuth().empty()) {
headers["Proxy-Authorization"] = i_encrypt->base64Encode(settings.getProxyAuth());
headers["Proxy-Authorization"] = "Basic " + i_encrypt->base64Encode(settings.getProxyAuth());
}
auto req = HTTPRequest::prepareRequest(*this, HTTPMethod::CONNECT, "", headers, "");

View File

@@ -30,11 +30,15 @@ using namespace std;
USE_DEBUG_FLAG(D_MESSAGING_BUFFER);
#ifndef smb
static constexpr uint buffer_max_size_MB = 100;
static constexpr uint buffer_max_size_MB = 300;
#else
static constexpr uint buffer_max_size_MB = 3;
#endif
static const uint reservation_default_size = 32;
static const uint memory_messages_max_size_default = 1024;
static const uint additional_buffer_size_default = 128;
static bool
checkExistence(const string &path)
{
@@ -74,7 +78,7 @@ private:
void handleInMemoryMessages();
void writeToDisk(const BufferedMessage &message);
bool writeToDisk(const BufferedMessage &message);
static Maybe<uint32_t> seekStartOfMessage(FILE *file);
static bool readBytes(FILE *file, uint size_to_read, char *output_bytes);
@@ -94,6 +98,9 @@ private:
string buffer_output;
string buffer_root_path;
uint max_size_on_disk_MB = 0;
uint memory_messages_max_size = 0;
uint additional_buffer_size = 0;
uint memory_messages_reserve_size = reservation_default_size;
uint curr_no_retries = 0;
I_ShellCmd *shell_cmd = nullptr;
I_Encryptor *encryptor = nullptr;
@@ -105,6 +112,16 @@ void
MessagingBufferComponent::Impl::init()
{
max_size_on_disk_MB = getProfileAgentSettingWithDefault<uint>(buffer_max_size_MB, "eventBuffer.maxSizeOnDiskInMB");
memory_messages_max_size =
getProfileAgentSettingWithDefault<uint>(
memory_messages_max_size_default,
"eventBuffer.maxMemoryMessagesToStore"
);
additional_buffer_size =
getProfileAgentSettingWithDefault<uint>(
additional_buffer_size_default,
"eventBuffer.additionalBufferSize"
);
shell_cmd = Singleton::Consume<I_ShellCmd>::by<Messaging>();
encryptor = Singleton::Consume<I_Encryptor>::by<Messaging>();
mainloop = Singleton::Consume<I_MainLoop>::by<Messaging>();
@@ -121,7 +138,7 @@ MessagingBufferComponent::Impl::init()
string unique_id = instance_awareness->getInstanceID().ok() ? instance_awareness->getInstanceID().unpack() : "";
buffer_input = buffer_root_path + "/" + executable_name + unique_id + ".input";
buffer_output = buffer_root_path + "/" + executable_name + unique_id + ".output";
memory_messages.reserve(32);
memory_messages.reserve(memory_messages_reserve_size);
uint tmo = getConfigurationWithDefault<uint>(5, "message", "Send event retry in sec");
mainloop->addRecurringRoutine(
@@ -138,6 +155,26 @@ MessagingBufferComponent::Impl::init()
"Handling in-memory messages",
false
);
registerConfigLoadCb(
[this]() {
memory_messages_max_size =
getProfileAgentSettingWithDefault<uint>(
1000,
"eventBuffer.maxMemoryMessagesToStore"
);
max_size_on_disk_MB =
getProfileAgentSettingWithDefault<uint>(
buffer_max_size_MB,
"eventBuffer.maxSizeOnDiskInMB"
);
additional_buffer_size =
getProfileAgentSettingWithDefault<uint>(
100,
"eventBuffer.additionalBufferSize"
);
}
);
}
void
@@ -152,8 +189,10 @@ MessagingBufferComponent::Impl::pushNewBufferedMessage(
{
dbgTrace(D_MESSAGING_BUFFER) << "Pushing new message to buffer";
if (!force_immediate_writing) {
dbgDebug(D_MESSAGING_BUFFER) << "Holding message temporarily in memory";
if (!force_immediate_writing && memory_messages.size() < memory_messages_max_size + additional_buffer_size) {
dbgTrace(D_MESSAGING_BUFFER)
<< "Holding message temporarily in memory. Memory messages size: "
<< memory_messages.size();
memory_messages.emplace_back(body, method, uri, category, message_metadata);
return;
}
@@ -328,30 +367,80 @@ void
MessagingBufferComponent::Impl::handleInMemoryMessages()
{
auto messages = move(memory_messages);
memory_messages.reserve(32);
uint failed_messages = 0;
dbgDebug(D_MESSAGING_BUFFER) << "Handling " << to_string(messages.size()) <<" new in-memory messages";
for (const auto &message : messages) {
if (sendMessage(message) != HTTPStatusCode::HTTP_OK) {
if (message.getMessageMetadata().shouldBufferMessage()) writeToDisk(message);
memory_messages.reserve(memory_messages_reserve_size);
auto it = messages.begin();
for (; it != messages.end() && memory_messages.size() < memory_messages_max_size; ++it) {
if (sendMessage(*it) != HTTPStatusCode::HTTP_OK) {
if (it->getMessageMetadata().shouldBufferMessage()) {
if (!writeToDisk(*it)) ++failed_messages;
}
}
dbgTrace(D_MESSAGING_BUFFER)
<< "Processed "
<< (it - messages.begin() + 1)
<< " messages out of "
<< messages.size();
mainloop->yield();
}
if (it == messages.end()) {
memory_messages_reserve_size = reservation_default_size;
if (failed_messages > 0) {
dbgDebug(D_MESSAGING_BUFFER)
<< "Failed to handle "
<< to_string(failed_messages)
<< " messages out of "
<< to_string(messages.size());
}
return;
}
memory_messages_reserve_size =
min(memory_messages_reserve_size * 2, memory_messages_max_size + additional_buffer_size);
dbgDebug(D_MESSAGING_BUFFER) << "Heap buffer is full. Storing messages to disk";
auto it2 = messages.end() - 1;
do {
if (it2->getMessageMetadata().shouldBufferMessage() && !writeToDisk(*it2)) {
failed_messages += it2 - it + 1;
break;
}
} while(it2-- != it);
if (failed_messages > 0) {
dbgDebug(D_MESSAGING_BUFFER)
<< "Failed to handle "
<< to_string(failed_messages)
<< " messages out of "
<< to_string(messages.size());
}
}
void
bool
MessagingBufferComponent::Impl::writeToDisk(const BufferedMessage &message)
{
static uint full_buffer_failed_messages = 0;
auto serialized_message = message.toString();
if (!canWriteToDisk(serialized_message.size())) {
dbgWarning(D_MESSAGING_BUFFER) << "Buffer is full. Message will not be written to disk: " << message.getURI();
return;
full_buffer_failed_messages++;
if (full_buffer_failed_messages % 10 == 0) {
dbgWarning(D_MESSAGING_BUFFER)
<< "Buffer is full. "
<< full_buffer_failed_messages
<< " messages will not be written to disk";
}
dbgDebug(D_MESSAGING_BUFFER) << "Buffer is full. Message will not be written to disk: " << message.getURI();
return false;
}
full_buffer_failed_messages = 0;
ofstream file(buffer_input, ios::app);
if (!file.is_open()) {
dbgWarning(D_MESSAGING_BUFFER) << "Failed to open file for writing. File: " << buffer_input;
return;
return false;
}
uint32_t size = serialized_message.size();
@@ -359,6 +448,8 @@ MessagingBufferComponent::Impl::writeToDisk(const BufferedMessage &message)
file.write(reinterpret_cast<char *>(&size), sizeof(size));
char type = 0;
file.write(&type, 1);
return true;
}
Maybe<uint32_t>
@@ -538,7 +629,7 @@ MessagingBufferComponent::Impl::canWriteToDisk(size_t message_size) const
return true;
}
dbgWarning(D_MESSAGING_BUFFER)
dbgDebug(D_MESSAGING_BUFFER)
<< "Buffer size is full. Directry size: "
<< *maybe_directory_size
<< ", Message size: "

View File

@@ -282,3 +282,51 @@ TEST_F(TestMessagingBuffer, testRoutinInMemory)
msg = buffer_provider->peekMessage();
ASSERT_FALSE(msg.ok());
}
TEST_F(TestMessagingBuffer, testRoutinInMemoryOverflow)
{
string config_json =
"{"
" \"agentSettings\": [\n"
" {\n"
" \"id\": \"123\",\n"
" \"key\": \"eventBuffer.maxMemoryMessagesToStore\",\n"
" \"value\": \"5\"\n"
" },\n"
" {\n"
" \"id\": \"123\",\n"
" \"key\": \"eventBuffer.additionalBufferSize\",\n"
" \"value\": \"1\"\n"
" }]\n"
"}";
istringstream ss(config_json);
Singleton::Consume<Config::I_Config>::from(config)->loadConfiguration(ss);
MessageCategory category = MessageCategory::GENERIC;
MessageMetadata message_metadata = MessageMetadata();
message_metadata.setShouldBufferMessage(true);
HTTPMethod method = HTTPMethod::POST;
HTTPResponse res(HTTPStatusCode::HTTP_OK, "");
for (int i = 0; i < 6; i++) {
string body = "body" + to_string(i);
buffer_provider->pushNewBufferedMessage(body, method, "/" + to_string(i), category, message_metadata, false);
EXPECT_CALL(mock_messaging, sendSyncMessage(method, "/" + to_string(i), body, _, _)).WillOnce(Return(res));
}
for (int i = 0; i < 2; i++) {
string body = "body" + to_string(i);
buffer_provider->pushNewBufferedMessage(body, method, "/" + to_string(i), category, message_metadata, false);
}
memory_routine();
for (int i = 0; i < 2; i++) {
auto msg = buffer_provider->peekMessage();
ASSERT_TRUE(msg.ok());
buffer_provider->popMessage();
}
auto msg = buffer_provider->peekMessage();
ASSERT_FALSE(msg.ok());
}