First release of open-appsec source code

This commit is contained in:
roybarda
2022-10-26 19:33:19 +03:00
parent 3883109caf
commit a883352f79
1353 changed files with 276290 additions and 1 deletions

View File

@@ -0,0 +1,6 @@
include_directories(${ng_module_osrc_zlib_path}/include)
include_directories(${Boost_INCLUDE_DIRS})
add_library(messaging_buffer messaging_buffer.cc event_queue.cc bucket_manager.cc)
add_subdirectory(messaging_buffer_ut)

View File

@@ -0,0 +1,217 @@
// Copyright (C) 2022 Check Point Software Technologies Ltd. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "messaging_buffer/bucket_manager.h"
#include "messaging_buffer.h"
#include "debug.h"
#include "config.h"
using namespace std;
USE_DEBUG_FLAG(D_EVENT_BUFFER);
void
BucketManager::init(const string &_service_name)
{
dbgTrace(D_EVENT_BUFFER) << "Initializing Bucket Manager: Service name: " << _service_name;
encryptor = Singleton::Consume<I_Encryptor>::by<MessagingBuffer>();
instance_awareness = Singleton::Consume<I_InstanceAwareness>::by<MessagingBuffer>();
string log_files_prefix = getLogFilesPathConfig();
string buffer_dir_base_folder_setting = getProfileAgentSettingWithDefault<string>(
log_files_prefix + "/nano_agent/event_buffer",
"eventBuffer.baseFolder"
);
dbgTrace(D_EVENT_BUFFER) << "buffer dir base folder setting path: " << buffer_dir_base_folder_setting;
buffer_directory = getConfigurationWithDefault<string>(
buffer_dir_base_folder_setting,
"Event Buffer",
"base folder"
);
uint buffer_max_size_base_settings = getProfileAgentSettingWithDefault<uint>(
1000,
"eventBuffer.maxBufferSizeInMB"
);
buffer_max_size = getConfigurationWithDefault<uint>(
buffer_max_size_base_settings,
"Event Buffer",
"max buffer size in MB"
);
uint max_buffer_files_base_settings = getProfileAgentSettingWithDefault<uint>(10, "eventBuffer.maxBufferFiles");
max_buffer_files = getConfigurationWithDefault<uint>(
max_buffer_files_base_settings,
"Event Buffer",
"max buffer files"
);
service_name = _service_name;
management_file_path = resolveFilesName(buffer_directory + "/manager");
iterator.init(management_file_path, buffer_max_size/max_buffer_files);
}
void
BucketManager::fini()
{
dbgTrace(D_EVENT_BUFFER) << "Finalizing Bucket Manager";
iterator.fini();
for (auto &bucket : buckets) {
bucket.second.fini();
}
}
bool
BucketManager::doesExist(const bucketName &name)
{
dbgTrace(D_EVENT_BUFFER) << "Checking if bucket exists and containing data: Bucket name: " << name;
string base64_name = encryptor->base64Encode(name);
if (buckets.find(base64_name) == buckets.end()) {
string management_file = resolveFilesName(buffer_directory + "/" + base64_name);
buckets.emplace(
piecewise_construct,
forward_as_tuple(base64_name),
forward_as_tuple()
);
buckets[base64_name].init(management_file, buffer_max_size/max_buffer_files);
}
return !buckets[base64_name].isEmpty();
}
void
BucketManager::push(const bucketName &name, string &&data)
{
dbgTrace(D_EVENT_BUFFER) << "Pushing data into bucket: Bucket name: " << name;
string base64_name = encryptor->base64Encode(name);
if (buckets.find(base64_name) == buckets.end()) {
dbgTrace(D_EVENT_BUFFER) << "Bucket does not exist, creating new. Bucket name: " << name;
string management_file = resolveFilesName(buffer_directory + "/" + base64_name);
buckets.emplace(
piecewise_construct,
forward_as_tuple(base64_name),
forward_as_tuple()
);
buckets[base64_name].init(management_file, buffer_max_size/max_buffer_files);
}
string copy_name = base64_name;
string copy_data = encryptor->base64Encode(data);
buckets[base64_name].push(move(copy_data));
iterator.push(move(copy_name));
if (next_bucket.empty()) {
next_bucket = base64_name;
}
}
bool
BucketManager::handleNextBucket()
{
if (!next_bucket.empty()) {
const string &iterator_peek = iterator.peek();
if (next_bucket != iterator_peek) {
dbgWarning(D_EVENT_BUFFER)
<< "Invalid Iteration value, current iteration value does not equal to next bucket"
<< endl
<< "Current iteration value:"
<< iterator_peek
<< endl
<< "Next bucket value:"
<< next_bucket;
}
if (!iterator_peek.empty()) {
iterator.trim();
}
buckets[next_bucket].trim();
}
if (iterator.isEmpty()) {
next_bucket.clear();
dbgTrace(D_EVENT_BUFFER) << "Iteration bucket is empty";
return false;
}
const string &next_req_bucket = iterator.peek();
if (next_req_bucket.empty()) {
dbgDebug(D_EVENT_BUFFER)
<< "Next request within iteration bucket is empty, removing sent messages from file:"
<< management_file_path;
iterator.refreshBufferFile();
next_bucket.clear();
return false;
}
dbgDebug(D_EVENT_BUFFER)
<< "Next request within iteration bucket is :"
<< next_req_bucket;
string bucket_path = resolveFilesName(buffer_directory + "/" + next_req_bucket);
auto bucket = buckets.find(next_req_bucket);
if (bucket == buckets.end()) {
dbgDebug(D_EVENT_BUFFER)
<< "Next request bucket was not found within the manager. trying to load it, bucket: "
<< next_req_bucket;
buckets.emplace(
std::piecewise_construct,
std::forward_as_tuple(next_req_bucket),
std::forward_as_tuple()
);
buckets[next_req_bucket].init(bucket_path, buffer_max_size/max_buffer_files);
}
next_bucket = next_req_bucket;
return true;
}
bool
BucketManager::hasValue()
{
if (iterator.isEmpty()) {
dbgDebug(D_EVENT_BUFFER) << "Iterator is empty";
return false;
}
if (next_bucket.empty()) {
dbgDebug(D_EVENT_BUFFER) << "Next bucket is empty";
return handleNextBucket();
}
return true;
}
EventQueue &
BucketManager::peek()
{
dbgAssert(!next_bucket.empty()) << "Invalid call, handleNextBucket must be called before";
return buckets[next_bucket];
}
void
BucketManager::flush()
{
dbgTrace(D_EVENT_BUFFER) << "Flushing all data from the Bucket Manager";
iterator.flush();
for (auto &bucket : buckets) {
bucket.second.flush();
}
}
string
BucketManager::resolveFilesName(const string &file_name)
{
string new_name = file_name;
if (instance_awareness != nullptr) new_name = new_name + instance_awareness->getUniqueID("");
return new_name + service_name;
}

View File

@@ -0,0 +1,846 @@
// Copyright (C) 2022 Check Point Software Technologies Ltd. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "messaging_buffer/event_queue.h"
#include <string>
#include <cstdio>
#include <stdio.h>
#include <vector>
#include <algorithm>
#include <sys/stat.h>
#include <chrono>
#include <dirent.h>
#include "zlib.h"
#include "config.h"
#include "debug.h"
#include "messaging_buffer.h"
#include "i_mainloop.h"
using namespace std;
USE_DEBUG_FLAG(D_EVENT_BUFFER);
class MessagingBuffer;
const string EventQueueFile::zip_file_suffix = ".cpz";
static const size_t size_of_new_line = strlen("\n");
static const uint MB_in_bytes = 1<<20;
string
parseFilePath(int suffix, string file_path)
{
return suffix == -1 ? file_path : file_path + string(".") + to_string(suffix);
}
static void
performSafeYield()
{
auto env = Singleton::Consume<I_Environment>::by<MessagingBuffer>();
auto can_not_yield = env->get<bool>("Not part of coroutine");
if (can_not_yield.ok() && *can_not_yield) return;
Singleton::Consume<I_MainLoop>::by<MessagingBuffer>()->yield();
}
// If the program crashed during rotation, a tmp file was created without removing the old one
// Thus need to remove non tmp file and rename the tmp file
void
restoreTmpFile(const string &file_name)
{
dbgTrace(D_EVENT_BUFFER) << "Handling a temporary file during init. File: " << file_name;
string new_file_name = file_name.substr(0, strlen(file_name.c_str()) - strlen(".tmp"));
remove(new_file_name.c_str());
if (rename(file_name.c_str(), new_file_name.c_str()) != 0) {
dbgWarning(D_EVENT_BUFFER)
<< "Couldn't handle a temporary file during init. Couldn't rename: "
<< file_name
<< ", to: "
<< new_file_name
<< ". Errno: "
<< strerror(errno);
}
}
EventQueue::~EventQueue()
{
if (reader.is_open()) reader.close();
if (writer.is_open()) writer.close();
}
void
EventQueue::setReaderFileAndOpen(const EventQueueFile &file)
{
string file_path = parseFilePath(file.getSuffix(), file.getFilePath());
reader.open(file_path);
if (!reader.is_open() && ifstream(file_path).good()) {
dbgWarning(D_EVENT_BUFFER)
<< "Failed to open the file for read. File name: "
<< file_path
<< ". Errno: "
<< errno
<< ", Errno: "
<< strerror(errno);
return;
}
}
void
EventQueue::sortEventFilesBySuffix(std::vector<EventQueueFile> &tmp_vec)
{
sort(
tmp_vec.begin(),
tmp_vec.end(),
[](const EventQueueFile &first, const EventQueueFile &second)
{
return first.getSuffix() < second.getSuffix();
}
);
for (const EventQueueFile &file: tmp_vec) {
files.emplace_back(file);
}
for (auto &file: files) {
file.restoreNumberOfLines();
num_of_events_on_disk += file.getNumOfEvents();
size_on_disk += file.getFileSizeInBytes();
}
}
void
EventQueue::enforceMaxNumberOfFiles()
{
uint max_files_to_rotate = getProfileAgentSettingWithDefault<uint>(
10,
"eventBuffer.maxNumberOfEventQueueFiles"
);
while (files.size() >= max_files_to_rotate) {
performSafeYield();
string file_to_delete = files.back().getFilePath() + string(".") + to_string(files.back().getSuffix());
dbgDebug(D_EVENT_BUFFER)
<< "Event Queue passed the maximum number of files it should contain by "
<< files.size() - max_files_to_rotate
<< " files. Removing the file: "
<< file_to_delete
<< ". This action reduced the number of events on disk by "
<< files.back().getNumOfEvents()
<< " and reduced the events buffers' size of disk by "
<< files.back().getFileSizeInBytes()
<< " bytes.";
num_of_events_on_disk -= files.back().getNumOfEvents();
size_on_disk -= files.back().getFileSizeInBytes();
updateReadFile();
}
}
// In case the agent crashed or stopped, iterate over all files.
// if files with same path prefix exist, insert them to the list.
void
EventQueue::reloadEventsIntoList(const string &path) {
dbgFlow(D_EVENT_BUFFER) << "Trying to reload event buffer from persistent location. Path: " << path;
string dir_path;
if (path.find("/") != string::npos) {
dir_path = path.substr(0, path.find_last_of("/"));
} else {
dir_path = path;
}
dbgDebug(D_EVENT_BUFFER) << "Event queue directory to iterate: " << dir_path;
struct dirent *entry = nullptr;
DIR *directory = opendir(dir_path.c_str());
if (directory == nullptr) {
int orig_errno = errno;
dbgWarning(D_EVENT_BUFFER) << "Failed to open directory. Path: " << dir_path << ", Errno: " << orig_errno;
return;
}
vector<EventQueueFile> tmp_vec;
while ((entry = readdir(directory))) {
string entry_file_name = entry->d_name;
static const string curr_dir(".");
static const string parent_dir("..");
if (entry_file_name == curr_dir || entry_file_name == parent_dir) {
dbgTrace(D_EVENT_BUFFER)
<< "Skipping reload of events from irrelevant directory entries. Entry name: "
<< entry_file_name;
continue;
}
bool file_has_extension = entry_file_name.find(".") != string::npos;
if (!file_has_extension) {
dbgTrace(D_EVENT_BUFFER)
<< "Skipping reload of events who's entry lack extension. Entry name: "
<< entry_file_name
<< ", Path: "
<< path;
continue;
}
string file_extension = entry_file_name.substr(entry_file_name.find_last_of("."));
dbgDebug(D_EVENT_BUFFER) << "Event queue file current extension: " << file_extension;
if (file_extension == ".tmp") {
restoreTmpFile(entry_file_name);
continue;
}
bool is_compressed = file_extension == EventQueueFile::zip_file_suffix;
string base_name =
file_has_extension ?
entry_file_name.substr(0, entry_file_name.find_last_of(".")) :
entry_file_name;
if (is_compressed && base_name.find(".") != string::npos) {
file_extension = base_name.substr(base_name.find_last_of(".") + 1);
base_name = base_name.substr(0, base_name.find_last_of("."));
}
dbgDebug(D_EVENT_BUFFER)
<< "Trying to load event queue file from directory. File name: "
<< entry_file_name
<< ", does file has extension: "
<< (file_has_extension ? "true" : "false")
<< ", base name: "
<< base_name
<< ", is compressed: "
<< (is_compressed ? "true" : "false");
if (path.find(base_name) == string::npos) {
dbgTrace(D_EVENT_BUFFER)
<< "Skipping reload of events from irrelevant directory entries. Entry name: "
<< entry_file_name
<< ", Entry path: "
<< path
<< ", Entry file base name: "
<< base_name;
continue;
}
int max_files_to_rotate = getProfileAgentSettingWithDefault<int>(
10,
"eventBuffer.maxNumberOfEventQueueFiles"
);
EventQueueFile new_file(path, file_extension, is_compressed);
if (new_file.getSuffix() < max_files_to_rotate) {
dbgDebug(D_EVENT_BUFFER)
<< "Reloading file "
<< new_file.getFilePath()
<< " with suffix "
<< new_file.getSuffix();
tmp_vec.push_back(new_file);
} else {
dbgWarning(D_EVENT_BUFFER)
<< "File "
<< new_file.getFilePath()
<< " with suffix "
<< new_file.getSuffix()
<< " will not be reloaded due to limitation of maximum number of event queue files.";
}
}
sortEventFilesBySuffix(tmp_vec);
}
void
EventQueueFile::restoreNumberOfLines()
{
string tmp_name;
if (isCompressed()) {
string compressed_name = getFilePath() + "." + to_string(getSuffix()) + zip_file_suffix;
tmp_name = getFilePath() + "." + to_string(getSuffix());
decompress(compressed_name, tmp_name, false);
}
string line;
ifstream reader(parseFilePath(getSuffix(), getFilePath()));
while (getline(reader, line)) {
incFileSize(line.size() + size_of_new_line);
}
remove(tmp_name.c_str());
}
void
EventQueue::init(const string &path, uint max_buff_size)
{
dbgTrace(D_EVENT_BUFFER) << "Initializing Event Queue. Path: " << path << ", Max buffer size: " << max_buff_size;
max_size = max_buff_size;
files.emplace_front(EventQueueFile(path));
reloadEventsIntoList(path);
if (timer == nullptr) timer = Singleton::Consume<I_TimeGet>::by<MessagingBuffer>();;
dbgAssert(timer != nullptr) << "Failed to find the time component";
uint next_sync_in_sec_base_settings = getProfileAgentSettingWithDefault<uint>(
10,
"eventBuffer.syncToDiskFrequencyInSec"
);
next_sync_freq_in_sec =
timer->getMonotonicTime() +
chrono::seconds(getConfigurationWithDefault<uint>(
next_sync_in_sec_base_settings,
"Event Buffer",
"sync to disk frequency in sec"
));
setReaderFileAndOpen(files.back());
reader.seekg(0, ios::beg);
}
// if current reader file is empty, iterate over to the next one
Maybe<void>
EventQueue::refreshBufferFile()
{
if (read_events_on_disk == 0) {
dbgDebug(D_EVENT_BUFFER) << "Nothing to refresh: all events on the disk still pending";
return Maybe<void>();
}
if (!reader.is_open()) return genError("nothing to trim since the file is still unopened");
int num_of_events_to_transfare = 0;
uint64_t size_of_events_to_transfare = 0;
string line;
vector<string> file_content;
while (getline(reader, line)) {
performSafeYield();
file_content.push_back(line);
num_of_events_to_transfare++;
size_of_events_to_transfare += (line.size() + size_of_new_line);
}
reader.close();
string read_file = parseFilePath(files.back().getSuffix(), files.back().getFilePath());
string temp_file = read_file + ".tmp";
remove(temp_file.c_str());
writer.open(temp_file, ios_base::app);
if (!writer.is_open()) {
dbgWarning(D_EVENT_BUFFER)
<< "Failed to open the file for write (append): "
<< temp_file
<< ". Errno: "
<< errno
<< ", Errno: "
<< strerror(errno);
for (auto &line : file_content) {
performSafeYield();
read_cache_buff.push_back(line);
read_cache_size += line.size();
}
return genError("cannot open new cache file");
}
num_of_events_on_disk -= files.back().getNumOfEvents();
size_on_disk -= files.back().getFileSizeInBytes();
for (const string &single_event: file_content) {
performSafeYield();
writer << single_event << '\n';
num_of_events_on_disk++;
size_on_disk += (single_event.size() + size_of_new_line);
}
writer.close();
remove(read_file.c_str());
rename(temp_file.c_str(), read_file.c_str());
reader.open(read_file);
if (!reader.is_open()) return genError("failed to open cache file to skip cached events");
EventQueueFile updated_file{
files.back(),
num_of_events_to_transfare,
size_of_events_to_transfare
};
files.pop_back();
files.emplace_back(updated_file);
return Maybe<void>();
}
void
EventQueue::push(string &&event_data)
{
if (files.front().getFilePath() == "") {
dbgWarning(D_EVENT_BUFFER) << "Cannot save events to a non-existent file";
return;
}
event_data.erase(remove(event_data.begin(), event_data.end(), '\n'), event_data.end()); // remove all new-line
write_cache_size += event_data.size();
write_cache_buff.push_back(move(event_data)); // hold data in RAM in case write will fail
if (is_pending_rotate) {
dbgDebug(D_EVENT_BUFFER)
<< "Rotation pending. Accumulating events (write_cache_buff size="
<< write_cache_buff.size()
<< ")";
return;
}
uint cache_buff_max_size_base_settings = getProfileAgentSettingWithDefault<uint>(
100,
"eventBuffer.syncToDiskWriteCacheBufferSize"
);
uint cache_buff_max_size = getConfigurationWithDefault<uint>(
cache_buff_max_size_base_settings,
"Event Buffer",
"sync to disk write cache buffer size"
);
if (timer->getMonotonicTime() < next_sync_freq_in_sec && write_cache_buff.size() < cache_buff_max_size) {
dbgTrace(D_EVENT_BUFFER)
<< "Not writing event to disk because cache buffer is not full and time is before sync time interval ";
return;
}
uint next_sync_in_sec_base_settings = getProfileAgentSettingWithDefault<uint>(
10,
"eventBuffer.syncToDiskFrequencyInSec"
);
next_sync_freq_in_sec =
timer->getMonotonicTime() +
chrono::seconds(getConfigurationWithDefault<uint>(
next_sync_in_sec_base_settings,
"Event Buffer",
"sync to disk frequency in sec"
));
if (
files.front().getNumOfEvents() != 0 &&
getSizeMB(write_cache_size + files.front().getFileSizeInBytes()) >= max_size
) {
dbgTrace(D_EVENT_BUFFER) << "Event buffer queue reached max size, pending files rotation.";
is_pending_rotate = true;
Singleton::Consume<I_MainLoop>::by<MessagingBuffer>()->addOneTimeRoutine(
I_MainLoop::RoutineType::System,
[&] ()
{
dbgWarning(D_EVENT_BUFFER)
<< "Failed to buffer a message after reaching the maximum buffer size."
<< "Compressing the buffer and creating a new one.";
rotate();
files.push_front(EventQueueFile(files.front().getFilePath()));
dbgInfo(D_EVENT_BUFFER) << "Successfully appended new buffer to list";
is_pending_rotate = false;
},
"Event queue rotation",
false
);
return;
}
if (is_pending_write) {
dbgDebug(D_EVENT_BUFFER)
<< "Writing events pending. Accumulating events (write_cache_buff size="
<< write_cache_buff.size()
<< ")";
return;
}
is_pending_write = true;
Singleton::Consume<I_MainLoop>::by<MessagingBuffer>()->addOneTimeRoutine(
I_MainLoop::RoutineType::System,
[&] ()
{
writer.open(files.front().getFilePath(), ios_base::app);
if (!writer.is_open()) {
dbgWarning(D_EVENT_BUFFER)
<< "Failed to open the file for write (append):"
<< files.front().getFilePath()
<< ". Errno: "
<< errno
<< ", Errno: "
<< strerror(errno);
return;
}
for_each(
write_cache_buff.begin(),
write_cache_buff.end(),
[this](string &single_event)
{
size_t event_size = single_event.size();
write_cache_size -= event_size;
writer << single_event << '\n';
num_of_events_on_disk++;
files.front().incFileSize(event_size + size_of_new_line);
size_on_disk += (event_size + size_of_new_line);
performSafeYield();
}
);
write_cache_buff.clear();
writer.close();
is_pending_write = false;
},
"Event queue rotation",
false
);
}
Maybe<void>
EventQueue::writeCachesToFile()
{
vector<string> file_content(read_cache_buff.begin(), read_cache_buff.end());
if (num_of_events_on_disk > 0) {
reader.close();
reader.open(files.front().getFilePath());
if (!reader.is_open()) {
return genError("Failed to open the file for read: " + files.front().getFilePath());
}
string line;
reader.clear();
while (getline(reader, line)) {
file_content.push_back(line);
}
}
file_content.insert(file_content.end(), write_cache_buff.begin(), write_cache_buff.end());
string temp_file_name = files.front().getFilePath() + ".tmp";
writer.open(temp_file_name, ios_base::app|ios_base::out);
if (!writer.is_open()) {
return genError("Failed to open the file for write, file: " + temp_file_name);
}
int current_num_of_events = 0;
uint64_t current_size_of_events = 0;
for (const string &single_event: file_content) {
writer << single_event << '\n';
current_num_of_events++;
current_size_of_events = (single_event.size() + size_of_new_line);
}
writer.close();
remove(files.front().getFilePath().c_str()); // File possibly can be uncreated by this point
if (rename(temp_file_name.c_str(), files.front().getFilePath().c_str()) != 0) {
return genError("Error renaming temp file " + temp_file_name + " to " + files.front().getFilePath());
}
EventQueueFile new_file{files.front(), current_num_of_events, current_size_of_events};
files.pop_front();
files.emplace_front(new_file);
return Maybe<void>();
}
bool
EventQueue::isEmpty() const
{
return num_of_events_on_disk + read_events_on_disk + read_cache_buff.size() + write_cache_buff.size() == 0;
}
void
EventQueue::fini()
{
auto write_caches = writeCachesToFile();
if (!write_caches.ok()) {
dbgWarning(D_EVENT_BUFFER)
<< "Failed to write cache to file, Error: "
<< write_caches.getErr();
}
}
const string &
EventQueue::peek()
{
static const string error_reading = "";
if (isEmpty()) {
dbgDebug(D_EVENT_BUFFER)
<< "Number of events on disk: "
<< num_of_events_on_disk
<< endl
<< "Number of read events on disk: "
<< read_events_on_disk
<< endl
<< "Read cache size: "
<< read_cache_buff.size()
<< endl
<< "Write cache size: "
<< write_cache_buff.size();
dbgWarning(D_EVENT_BUFFER)
<< "Cannot peek at an empty queue. file: "
<< files.back().getFilePath();
return error_reading;
}
if (read_cache_buff.empty()) {
refreshReadBuff();
if (read_cache_buff.empty()) {
dbgDebug(D_EVENT_BUFFER) << "Read cache buffer is empty";
return error_reading;
}
}
return read_cache_buff.front();
}
void
EventQueue::refreshReadBuff()
{
if (files.empty()) {
dbgDebug(D_EVENT_BUFFER) << "Buffer files are empty";
return;
}
if (files.back().getNumOfEvents() == 0) {
updateReadFile();
if (files.empty() | (files.back().getNumOfEvents() == 0)) {
dbgDebug(D_EVENT_BUFFER) << "Buffered events file is empty.";
read_cache_buff.splice(read_cache_buff.begin(), write_cache_buff);
read_cache_size += write_cache_size;
write_cache_size = 0;
return;
}
}
if (!reader.is_open()) {
dbgTrace(D_EVENT_BUFFER)
<< "Buffered events file is closed trying to open it. file: "
<< files.back().getFilePath();
setReaderFileAndOpen(files.back());
}
uint cache_buff_max_size_base_settings = getProfileAgentSettingWithDefault<uint>(
100,
"eventBuffer.syncToDiskWriteCacheBufferSize"
);
uint cache_buff_max_size = getConfigurationWithDefault<uint>(
cache_buff_max_size_base_settings,
"Event Buffer",
"sync to disk write cache buffer size"
);
int counter = 0;
while (read_cache_buff.size() < cache_buff_max_size && counter < files.back().getNumOfEvents()) {
performSafeYield();
string line;
if (!getline(reader, line)) {
reader.clear();
break;
}
read_events_on_disk ++;
counter++;
read_cache_buff.push_back(line);
read_cache_size += line.size();
}
refreshBufferFile();
}
void
EventQueue::updateReadFile()
{
if (files.back().getSuffix() == -1) {
return;
}
if (!reader.is_open()) {
dbgTrace(D_EVENT_BUFFER)
<< "Buffered events file is closed trying to open it. file: "
<< files.back().getFilePath();
setReaderFileAndOpen(files.back());
}
string file_to_delete = files.back().getFilePath() + string(".") + to_string(files.back().getSuffix());
string new_file =
files.back().getSuffix() == 0 ?
files.back().getFilePath() :
files.back().getFilePath() + string(".") + to_string(files.back().getSuffix() - 1);
dbgDebug(D_EVENT_BUFFER)
<< "Updating the reader file. Current file: "
<< file_to_delete
<< ", New file: "
<< new_file;
reader.close();
files.pop_back();
remove(file_to_delete.c_str());
if (files.back().isCompressed()) files.back().decompress(new_file + files.back().zip_file_suffix, new_file);
reader.open(new_file);
if (!reader.is_open() && ifstream(new_file).good()) {
dbgWarning(D_EVENT_BUFFER)
<< "Failed to open the file for read: "
<< new_file
<< ". Errno: "
<< errno
<< ", Errno: "
<< strerror(errno);
return;
}
}
void
EventQueue::trim()
{
if (!read_cache_buff.empty()) {
read_cache_size -= read_cache_buff.front().size();
read_cache_buff.pop_front();
dbgTrace(D_EVENT_BUFFER) << "Removed first element in read cache buffer";
if (!read_cache_buff.empty()) return;
}
refreshReadBuff();
}
void
EventQueue::flush()
{
for (auto &file: files) {
string file_path = parseFilePath(file.getSuffix(), file.getFilePath());
remove(file_path.c_str());
}
write_cache_buff.clear();
read_cache_buff.clear();
size_on_disk = 0;
num_of_events_on_disk = 0;
write_cache_size = 0;
read_cache_size = 0;
read_events_on_disk = 0;
reader.close();
writer.close();
}
double
EventQueue::getSizeMB(double size_in_B) const
{
return size_in_B/MB_in_bytes;
}
void
EventQueue::rotate()
{
enforceMaxNumberOfFiles();
for_each(
files.rbegin(),
files.rend(),
[&](EventQueueFile &file)
{
file.handleCompression(files.size());
});
}
void
EventQueueFile::handleCompression(int list_length)
{
bool should_rename = true;
suffix++;
string old_name = suffix == 0 ? file_path : file_path + string(".") + to_string(suffix - 1);
string new_name = file_path + string(".") + to_string(suffix);
auto rename_on_exit = make_scope_exit(
[&]()
{
if (should_rename) rename(old_name.c_str(), new_name.c_str());
dbgTrace(D_EVENT_BUFFER)
<< "Renamed a file during rotation. Old file name: "
<< old_name
<< ". New file name: "
<< new_name;
}
);
if (suffix != list_length - 1) { // not the read file
new_name = new_name + zip_file_suffix;
if (is_compressed) {
old_name = old_name + zip_file_suffix;
return;
}
compress();
should_rename = false;
return;
}
if (is_compressed) {
old_name = old_name + zip_file_suffix;
decompress(old_name, new_name);
should_rename = false;
}
}
void
EventQueueFile::decompress(const string &infilename, const string &outfilename, bool remove_old)
{
gzFile infile = gzopen(infilename.c_str(), "rb");
FILE *outfile = fopen(outfilename.c_str(), "wb");
char buffer[128];
int num_read = 0;
while ((num_read = gzread(infile, buffer, sizeof(buffer))) > 0) {
fwrite(buffer, 1, num_read, outfile);
performSafeYield();
}
gzclose(infile);
fclose(outfile);
if (remove_old) {
remove(infilename.c_str());
is_compressed = false;
}
}
void
EventQueueFile::compress()
{
string infilename =
suffix == 0 ? file_path : file_path + string(".") + to_string(suffix - 1);
string outfilename = file_path + string(".") + to_string(suffix) + zip_file_suffix;
FILE *infile = fopen(infilename.c_str(), "rb");
gzFile outfile = gzopen(outfilename.c_str(), "wb");
char inbuffer[128];
int num_read = 0;
unsigned long total_read = 0;
while ((num_read = fread(inbuffer, 1, sizeof(inbuffer), infile)) > 0) {
total_read += num_read;
gzwrite(outfile, inbuffer, num_read);
performSafeYield();
}
fclose(infile);
gzclose(outfile);
dbgTrace(D_EVENT_BUFFER)
<< "After file compression: Read "
<< total_read
<< "bytes, Wrote "
<< getFileSizeInBytes()
<< "bytes, Compression factor "
<< ((1.0-getFileSizeInBytes()*1.0/total_read)*100.0);
remove(infilename.c_str());
is_compressed = true;
}
void
EventQueueFile::incFileSize(uint64_t size_to_add)
{
size_of_file += size_to_add;
num_of_events_in_file++;
}
EventQueueFile::EventQueueFile(
const string &file_location_path,
const string &file_extension_raw,
bool is_file_compressed)
{
dbgInfo(D_EVENT_BUFFER)
<< "Creating new event queue file. File's location path: "
<< file_location_path
<< ", File extension: "
<< file_extension_raw
<< "Is Compressed: "
<< (is_file_compressed ? "true" : "false");
file_path = file_location_path;
is_compressed = is_file_compressed;
string file_extension = file_extension_raw;
try {
if (!file_extension.empty() && file_extension.front() == '.') {
file_extension.erase(0, 1); // delete the '.' before the suffix
}
suffix = stoi(file_extension);
} catch (const exception &e) {
dbgWarning(D_EVENT_BUFFER)
<< "Error reloading event files. File: "
<< file_path
<< ", Error: "
<< e.what();
}
}

View File

@@ -0,0 +1,275 @@
// Copyright (C) 2022 Check Point Software Technologies Ltd. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "messaging_buffer.h"
#include <string>
#include <unordered_map>
#include <sstream>
#include <chrono>
#include <sys/stat.h>
#include "cereal/archives/json.hpp"
#include "messaging_buffer/event_queue.h"
#include "messaging_buffer/bucket_manager.h"
#include "maybe_res.h"
#include "config.h"
#include "debug.h"
USE_DEBUG_FLAG(D_EVENT_BUFFER);
using namespace std;
class MessagingBuffer::Impl
:
Singleton::Provide<I_MessagingBuffer>::From<MessagingBuffer>
{
public:
void init();
void fini();
Maybe<HTTPRequestEvent> peekRequest() override;
void popRequest() override;
void bufferNewRequest(const HTTPRequestEvent &request, bool is_rejected) override;
bool isPending(const HTTPRequestSignature &request) override;
void cleanBuffer() override;
private:
void loadConfig();
void initComponents();
void initRejectedQueue(const string &process_path);
Maybe<string> deserialize(const HTTPRequestEvent &req);
Maybe<HTTPRequestEvent> serialize(const string &data);
string buffer_directory = "";
string log_files_prefix = "";
I_TimeGet *timer = nullptr;
I_Encryptor *encryptor = nullptr;
I_InstanceAwareness *instance_awareness = nullptr;
BucketManager bucket_manager;
EventQueue rejected_events;
};
void
MessagingBuffer::Impl::init()
{
ScopedContext ctx;
ctx.registerValue<bool>("Not part of coroutine", true);
log_files_prefix = getLogFilesPathConfig();
dbgTrace(D_EVENT_BUFFER) << "Messaging buffer init, log files prefix: " << log_files_prefix;
I_Environment *env = Singleton::Consume<I_Environment>::by<MessagingBuffer>();
string process_path = env->get<string>("Executable Name").unpack();
string service_name = process_path.substr(process_path.find_last_of("/") + 1);
loadConfig();
initComponents();
mkdir(buffer_directory.c_str(), 0644);
bucket_manager.init(service_name);
initRejectedQueue(service_name);
}
void
MessagingBuffer::Impl::fini()
{
ScopedContext ctx;
ctx.registerValue<bool>("Not part of coroutine", true);
bucket_manager.fini();
}
Maybe<HTTPRequestEvent>
MessagingBuffer::Impl::peekRequest()
{
if (!bucket_manager.hasValue()) {
dbgDebug(D_EVENT_BUFFER) << "No data avaliable";
return genError("No data avaliable");
}
EventQueue &tmp = bucket_manager.peek();
if (tmp.isEmpty()) {
dbgDebug(D_EVENT_BUFFER) << "Next bucket returned empty queue";
return genError("No data available in empty bucket");
}
auto request = tmp.peek();
if (request.empty()) {
popRequest();
return genError("Request is empty, message is popped");
}
return serialize(encryptor->base64Decode(request));
}
void
MessagingBuffer::Impl::popRequest()
{
bucket_manager.handleNextBucket();
}
void
MessagingBuffer::Impl::bufferNewRequest(const HTTPRequestEvent &request, bool is_rejected)
{
auto raw_data = deserialize(request);
if (!raw_data.ok()) {
string dbg_msg =
"Cannot buffer the request. Error: " +
raw_data.getErr() +
". Request: "
+ request.getSignature();
dbgWarning(D_EVENT_BUFFER) << dbg_msg;
dbgDebug(D_EVENT_BUFFER)
<< dbg_msg
<< ", headers: "
<< request.getHeaders()
<< ", body: "
<< request.getBody();
return;
}
if (is_rejected) {
rejected_events.push(raw_data.unpackMove());
return;
}
string req_bucket_name = request.getSignature();
bucket_manager.push(req_bucket_name, raw_data.unpackMove());
}
bool
MessagingBuffer::Impl::isPending(const HTTPRequestSignature &request)
{
string req_bucket_name = request.getSignature();
return bucket_manager.doesExist(req_bucket_name);
}
void
MessagingBuffer::Impl::cleanBuffer()
{
bucket_manager.flush();
rejected_events.flush();
}
void
MessagingBuffer::Impl::initComponents()
{
encryptor = Singleton::Consume<I_Encryptor>::by<MessagingBuffer>();
instance_awareness = Singleton::Consume<I_InstanceAwareness>::by<MessagingBuffer>();
}
void
MessagingBuffer::Impl::loadConfig()
{
string base_folder_setting = getProfileAgentSettingWithDefault<string>(
log_files_prefix + "/nano_agent/event_buffer",
"eventBuffer.baseFolder"
);
buffer_directory = getConfigurationWithDefault<string>(
base_folder_setting,
"Event Buffer",
"base folder"
);
}
void
MessagingBuffer::Impl::initRejectedQueue(const string &service_name)
{
string buffer_dir_base_folder_setting = getProfileAgentSettingWithDefault<string>(
log_files_prefix + "/nano_agent/event_buffer",
"eventBuffer.baseFolder"
);
string buffer_directory = getConfigurationWithDefault<string>(
buffer_dir_base_folder_setting,
"Event Buffer",
"base folder"
);
uint buffer_max_size_base_settings = getProfileAgentSettingWithDefault<uint>(
1000,
"eventBuffer.maxBufferSizeInMB"
);
uint buffer_max_size = getConfigurationWithDefault<uint>(
buffer_max_size_base_settings,
"Event Buffer",
"max buffer size in MB"
);
uint max_buffer_files_base_settings = getProfileAgentSettingWithDefault<uint>(
10,
"eventBuffer.maxBufferFiles"
);
uint max_buffer_files = getConfigurationWithDefault<uint>(
max_buffer_files_base_settings,
"Event Buffer",
"max buffer files"
);
string service_file_name = instance_awareness->getUniqueID("") + service_name;
rejected_events.init(
buffer_directory + "/rejected_events" + service_file_name, buffer_max_size/max_buffer_files
);
}
Maybe<string>
MessagingBuffer::Impl::deserialize(const HTTPRequestEvent &req)
{
try {
stringstream out;
{
cereal::JSONOutputArchive out_ar(out);
req.save(out_ar);
}
return out.str();
} catch (cereal::Exception &e) {
return genError(e.what());
}
}
Maybe<HTTPRequestEvent>
MessagingBuffer::Impl::serialize(const string &data)
{
try {
HTTPRequestEvent req;
stringstream in;
in.str(data);
try {
cereal::JSONInputArchive in_ar(in);
req.load(in_ar);
} catch (cereal::Exception &e) {
return genError("JSON parsing failed: " + string(e.what()));
}
return req;
} catch (exception &e) {
return genError(e.what());
}
}
MessagingBuffer::MessagingBuffer()
:
Component("MessagingBuffer"),
pimpl(make_unique<Impl>())
{
}
void MessagingBuffer::init() { pimpl->init(); }
void MessagingBuffer::fini() { pimpl->fini(); }
MessagingBuffer::~MessagingBuffer() {}
void
MessagingBuffer::preload()
{
registerExpectedConfiguration<string>("Event Buffer", "base folder");
registerExpectedConfiguration<string>("Event Buffer", "base file name");
registerExpectedConfiguration<uint>("Event Buffer", "max buffer size in MB");
registerExpectedConfiguration<uint>("Event Buffer", "max buffer files");
registerExpectedConfiguration<uint>("Event Buffer", "sync to disk frequency in sec");
registerExpectedConfiguration<uint>("Event Buffer", "send event retry in sec");
}

View File

@@ -0,0 +1,6 @@
link_directories(${BOOST_ROOT}/lib)
include_directories(${ng_module_osrc_zlib_path}/include)
link_directories(${ng_module_osrc_zlib_path}/lib)
add_unit_test(messaging_buffer_ut "messaging_buffer_ut.cc" "singleton;environment;time_proxy;instance_awareness;messaging_buffer;-lz;encryptor;event_is;metric;-lboost_regex;-lboost_filesystem;-lcrypto")

View File

@@ -0,0 +1,755 @@
#include "messaging_buffer.h"
#include <chrono>
#include <vector>
#include <memory>
#include <boost/filesystem.hpp>
#include "singleton.h"
#include "environment.h"
#include "config.h"
#include "config_component.h"
#include "messaging_buffer/event_queue.h"
#include "messaging_buffer/http_request_event.h"
#include "cptest.h"
#include "mock/mock_instance_awareness.h"
#include "mock/mock_time_get.h"
#include "encryptor.h"
#include "mock/mock_mainloop.h"
using namespace testing;
using namespace std;
USE_DEBUG_FLAG(D_EVENT_BUFFER);
bool
operator==(const HTTPRequestEvent &a, const HTTPRequestEvent &b)
{
return
a.getBody() == b.getBody() &&
a.getHeaders() == b.getHeaders() &&
a.getMethod() == b.getMethod() &&
a.getURL() == b.getURL();
}
class MessagingBufferTest : public Test
{
public:
MessagingBufferTest()
{
mkdir("/tmp/event_buffer/", 0777);
instance_awareness_value = "ia";
process_name_value = "pn";
Debug::setNewDefaultStdout(&capture_debug);
EXPECT_CALL(instance_awareness, getUniqueID(_)).WillRepeatedly(Return(instance_awareness_value));
i_messaging_buffer = Singleton::Consume<I_MessagingBuffer>::from(messaging_buffer);
i_encryptor = Singleton::Consume<I_Encryptor>::from(encryptor);
env.preload();
}
void
init(bool proccess_name_value = true)
{
setConfiguration<string>("/tmp/event_buffer", "Event Buffer", "base folder");
string process_path = "";
if (proccess_name_value) process_path = "a/b/" + process_name_value;
Singleton::Consume<I_Environment>::from(env)->registerValue<string>("Executable Name", process_path);
messaging_buffer.init();
}
~MessagingBufferTest()
{
i_messaging_buffer->cleanBuffer();
Debug::setNewDefaultStdout(&cout);
boost::filesystem::path dir_path("/tmp/event_buffer/");
remove_all(dir_path);
rmdir(dir_path.filename().c_str());
}
I_MessagingBuffer *i_messaging_buffer;
ostringstream capture_debug;
string instance_awareness_value;
string process_name_value;
Encryptor encryptor;
I_Encryptor *i_encryptor;
StrictMock<MockTimeGet> timer;
NaggyMock<MockMainLoop> mock_mainloop;
StrictMock<MockInstanceAwareness> instance_awareness;
MessagingBuffer messaging_buffer;
private:
::Environment env;
ConfigComponent config;
};
TEST_F(MessagingBufferTest, doNothing)
{
}
TEST_F(MessagingBufferTest, init)
{
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
static int counter = 1;
return chrono::microseconds(10000001 * (++counter));
}
)
);
init();
}
TEST_F(MessagingBufferTest, popRequestFromEmpty)
{
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
static int counter = 1;
return chrono::microseconds(10000001 * (++counter));
}
)
);
init();
auto req = i_messaging_buffer->peekRequest();
EXPECT_FALSE(req.ok());
}
TEST_F(MessagingBufferTest, popRequestFromNonEmpty)
{
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
static int counter = 1;
return chrono::microseconds(10000001 * (++counter));
}
)
);
init();
auto empty_req = i_messaging_buffer->peekRequest();
EXPECT_FALSE(empty_req.ok());
HTTPRequestEvent req("0", "1", "2", "3");
i_messaging_buffer->bufferNewRequest(req);
auto req_1 = i_messaging_buffer->peekRequest();
EXPECT_TRUE(req_1.ok());
i_messaging_buffer->popRequest();
auto req_2 = i_messaging_buffer->peekRequest();
EXPECT_FALSE(req_2.ok());
}
TEST_F(MessagingBufferTest, MultiRequestBuffering)
{
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
static int counter = 1;
return chrono::microseconds(10000001 * (++counter));
}
)
);
init();
HTTPRequestEvent req0123("0", "1", "2", "3");
i_messaging_buffer->bufferNewRequest(req0123);
HTTPRequestEvent req0124("0", "1", "2", "4");
i_messaging_buffer->bufferNewRequest(req0124);
HTTPRequestEvent req1124("1", "1", "2", "4");
i_messaging_buffer->bufferNewRequest(req1124);
auto req_1 = i_messaging_buffer->peekRequest();
EXPECT_TRUE(req_1.ok());
i_messaging_buffer->popRequest();
auto req_2 = i_messaging_buffer->peekRequest();
EXPECT_TRUE(req_2.ok());
i_messaging_buffer->popRequest();
auto req_3 = i_messaging_buffer->peekRequest();
EXPECT_TRUE(req_3.ok());
i_messaging_buffer->popRequest();
EXPECT_EQ(req_1.unpack(), req0123);
EXPECT_EQ(req_2.unpack(), req0124);
EXPECT_EQ(req_3.unpack(), req1124);
}
TEST_F(MessagingBufferTest, isPendingTrue)
{
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(Return(chrono::microseconds(10000000)));
init();
HTTPRequestEvent req0123("0", "1", "2", "3");
i_messaging_buffer->bufferNewRequest(req0123);
HTTPRequestEvent req0124("0", "1", "2", "4");
EXPECT_TRUE(i_messaging_buffer->isPending(req0124));
i_messaging_buffer->bufferNewRequest(req0124);
HTTPRequestEvent req1124("1", "1", "2", "4");
i_messaging_buffer->bufferNewRequest(req1124);
}
TEST_F(MessagingBufferTest, isPendingFalse)
{
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(Return(chrono::microseconds(10000000)));
init();
HTTPRequestEvent req0123("0", "1", "2", "3");
i_messaging_buffer->bufferNewRequest(req0123);
HTTPRequestEvent req0124("0", "1", "2", "4");
i_messaging_buffer->bufferNewRequest(req0124);
HTTPRequestEvent req1124("1", "1", "2", "4");
EXPECT_FALSE(i_messaging_buffer->isPending(req1124));
i_messaging_buffer->bufferNewRequest(req1124);
}
TEST_F(MessagingBufferTest, noPopGivesSameRequest)
{
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(Return(chrono::microseconds(10000000)));
init();
HTTPRequestEvent req0123("0", "1", "2", "3");
i_messaging_buffer->bufferNewRequest(req0123);
HTTPRequestEvent req0124("0", "1", "2", "4");
i_messaging_buffer->bufferNewRequest(req0124);
HTTPRequestEvent req1124("1", "1", "2", "4");
i_messaging_buffer->bufferNewRequest(req1124);
auto req_1 = i_messaging_buffer->peekRequest();
EXPECT_TRUE(req_1.ok());
auto req_2 = i_messaging_buffer->peekRequest();
EXPECT_TRUE(req_2.ok());
auto req_3 = i_messaging_buffer->peekRequest();
EXPECT_TRUE(req_3.ok());
EXPECT_EQ(req_1.unpack(), req0123);
EXPECT_EQ(req_2.unpack(), req0123);
EXPECT_EQ(req_3.unpack(), req0123);
}
TEST_F(MessagingBufferTest, nothingLeft)
{
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
static int counter = 1;
return chrono::microseconds(10000001 * (++counter));
}
)
);
init();
HTTPRequestEvent req0123("0", "1", "2", "3");
i_messaging_buffer->bufferNewRequest(req0123);
HTTPRequestEvent req0124("0", "1", "2", "4");
i_messaging_buffer->bufferNewRequest(req0124);
HTTPRequestEvent req1124("1", "1", "2", "4");
i_messaging_buffer->bufferNewRequest(req1124);
i_messaging_buffer->popRequest();
i_messaging_buffer->popRequest();
i_messaging_buffer->popRequest();
auto req_1 = i_messaging_buffer->peekRequest();
EXPECT_FALSE(req_1.ok());
}
TEST_F(MessagingBufferTest, hugeBuffering)
{
messaging_buffer.preload();
setConfiguration<uint>(0, "Event Buffer", "max buffer size in MB");
setConfiguration<uint>(1, "Event Buffer", "max buffer files");
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
return chrono::microseconds(10000001);
}
)
);
init();
const size_t events_size = 499;
vector<HTTPRequestEvent> events;
for (size_t i = 0; i < events_size; i++) {
events.push_back(HTTPRequestEvent("0", std::to_string(i/10), "2", std::to_string(i)));
i_messaging_buffer->bufferNewRequest(events[i]);
}
for (size_t i = 0; i < events_size; i++) {
auto req = i_messaging_buffer->peekRequest();
i_messaging_buffer->popRequest();
ASSERT_TRUE(req.ok());
EXPECT_EQ(req.unpack(), events[i]);
}
events.clear();
for (size_t i = 0; i < events_size; i++) {
events.push_back(HTTPRequestEvent("0", std::to_string(i/10), "2", std::to_string(i)));
i_messaging_buffer->bufferNewRequest(events[i]);
}
for (size_t i = 0; i < events_size; i++) {
auto req = i_messaging_buffer->peekRequest();
i_messaging_buffer->popRequest();
ASSERT_TRUE(req.ok());
EXPECT_EQ(req.unpack(), events[i]);
}
}
TEST_F(MessagingBufferTest, rejectedBufferOk)
{
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
static int counter = 1;
return chrono::microseconds(10000001 * (++counter));
}
)
);
init();
I_MainLoop::Routine timer_routine;
EXPECT_CALL(mock_mainloop, addOneTimeRoutine(
I_MainLoop::RoutineType::System,
_,
_,
_
)).WillOnce(DoAll(SaveArg<1>(&timer_routine), Return(0)));
HTTPRequestEvent req("0", "1", "2", "2");
const size_t events_size = 3;
for (size_t i = 0; i < events_size; i++) {
i_messaging_buffer->bufferNewRequest(req, true);
}
timer_routine();
ifstream buffer_file(
"/tmp/event_buffer/rejected_events" + instance_awareness_value + process_name_value
);
ASSERT_TRUE(buffer_file.is_open());
string line;
vector<string> file_content;
while (getline(buffer_file, line)) {
file_content.push_back(line);
}
buffer_file.close();
ASSERT_FALSE(buffer_file.is_open());
for (auto content_line: file_content) {
HTTPRequestEvent rejected_req;
stringstream in;
in.str(content_line);
cereal::JSONInputArchive in_ar(in);
rejected_req.load(in_ar);
EXPECT_EQ(rejected_req, req);
}
}
TEST_F(MessagingBufferTest, startFromFile)
{
string event_as_string;
HTTPRequestEvent event("0", "1", "2", "3");
stringstream out;
{
cereal::JSONOutputArchive out_ar(out);
event.save(out_ar);
}
event_as_string = i_encryptor->base64Encode(out.str());
ofstream write_initial_file(
"/tmp/event_buffer/" + i_encryptor->base64Encode("01") + instance_awareness_value + process_name_value,
ios_base::app
);
ofstream manager_file(
"/tmp/event_buffer/manager" + instance_awareness_value + process_name_value, ios_base::app
);
ASSERT_TRUE(write_initial_file.is_open());
ASSERT_TRUE(manager_file.is_open());
for (int i = 0 ; i < 101 ; i++) {
write_initial_file << event_as_string << "\n";
manager_file << i_encryptor->base64Encode("01") << "\n";
}
write_initial_file.close();
ASSERT_FALSE(write_initial_file.is_open());
manager_file.close();
ASSERT_FALSE(manager_file.is_open());
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
return chrono::microseconds(10000001);
}
)
);
init();
for (int i = 0 ; i < 101 ; i++) {
auto req = i_messaging_buffer->peekRequest();
i_messaging_buffer->popRequest();
EXPECT_TRUE(req.ok());
EXPECT_EQ(req.unpack(), event);
}
i_messaging_buffer->bufferNewRequest(event);
}
TEST_F(MessagingBufferTest, PushToBufferedFile)
{
messaging_buffer.preload();
setConfiguration<uint>(0, "Event Buffer", "max buffer size in MB");
setConfiguration<uint>(1, "Event Buffer", "max buffer files");
string event_as_string;
HTTPRequestEvent event("0", "1", "2", "3");
stringstream out;
{
cereal::JSONOutputArchive out_ar(out);
event.save(out_ar);
}
event_as_string = i_encryptor->base64Encode(out.str());
ofstream write_initial_file(
"/tmp/event_buffer/" + i_encryptor->base64Encode("01") + instance_awareness_value + process_name_value,
ios_base::app
);
ofstream manager_file(
"/tmp/event_buffer/manager" + instance_awareness_value + process_name_value, ios_base::app
);
ASSERT_TRUE(write_initial_file.is_open());
ASSERT_TRUE(manager_file.is_open());
for (int i = 0 ; i < 101 ; i++) {
write_initial_file << event_as_string << "\n";
manager_file << i_encryptor->base64Encode("01") << "\n";
}
write_initial_file.close();
ASSERT_FALSE(write_initial_file.is_open());
manager_file.close();
ASSERT_FALSE(manager_file.is_open());
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
return chrono::microseconds(10000001);
}
)
);
init();
i_messaging_buffer->bufferNewRequest(event);
for (int i = 0 ; i < 101 ; i++) {
auto req = i_messaging_buffer->peekRequest();
i_messaging_buffer->popRequest();
EXPECT_TRUE(req.ok());
EXPECT_EQ(req.unpack(), event);
}
}
TEST_F(MessagingBufferTest, max_buffer_size)
{
messaging_buffer.preload();
setConfiguration<uint>(0, "Event Buffer", "max buffer size in MB");
setConfiguration<uint>(1, "Event Buffer", "max buffer files");
ostringstream capture_debug;
Debug::setNewDefaultStdout(&capture_debug);
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
static int counter = 1;
return chrono::microseconds(10000001 * (++counter));
}
)
);
init();
vector<I_MainLoop::Routine> timer_routines;
EXPECT_CALL(mock_mainloop, addOneTimeRoutine(
I_MainLoop::RoutineType::System,
_,
_,
_
)).WillRepeatedly(
WithArgs<1>(
Invoke(
[&](const I_MainLoop::Routine &routine)
{
timer_routines.push_back(routine);
return 0;
}
)
));
const size_t events_size = 3;
vector<HTTPRequestEvent> events;
for (size_t i = 0; i < events_size; i++) {
events.push_back(
HTTPRequestEvent(
"0",
to_string(i),
"00",
to_string(i)
)
);
i_messaging_buffer->bufferNewRequest(events[i]);
// Run all pending timers
for (auto &routine : timer_routines) routine();
timer_routines.clear();
}
EXPECT_THAT(
capture_debug.str(),
HasSubstr("Failed to buffer a message after reaching the maximum buffer size")
);
}
class MessagingBufferFiniTest : public Test
{
public:
MessagingBufferFiniTest()
{
messaging_buffer = make_unique<MessagingBuffer>();
mkdir("/tmp/event_buffer/", 0777);
instance_awareness_value = "ia";
process_name_value = "pn";
Debug::setUnitTestFlag(D_EVENT_BUFFER, Debug::DebugLevel::DEBUG);
EXPECT_CALL(instance_awareness, getUniqueID(_)).WillRepeatedly(Return(instance_awareness_value));
i_messaging_buffer = Singleton::Consume<I_MessagingBuffer>::from(*messaging_buffer);
i_encryptor = Singleton::Consume<I_Encryptor>::from(encryptor);
env.preload();
env.init();
}
void
init(bool proccess_name_value = true)
{
setConfiguration<string>("/tmp/event_buffer", "Event Buffer", "base folder");
string process_path = "";
if (proccess_name_value) process_path = "a/b/" + process_name_value;
Singleton::Consume<I_Environment>::from(env)->registerValue<string>("Executable Name", process_path);
messaging_buffer->init();
}
~MessagingBufferFiniTest()
{
i_messaging_buffer->cleanBuffer();
Debug::setUnitTestFlag(D_EVENT_BUFFER, Debug::DebugLevel::INFO);
Debug::setNewDefaultStdout(&cout);
boost::filesystem::path dir_path("/tmp/event_buffer/");
remove_all(dir_path);
rmdir(dir_path.filename().c_str());
}
void
preload()
{
messaging_buffer->preload();
}
void
release()
{
messaging_buffer->fini();
delete messaging_buffer.release();
messaging_buffer = make_unique<MessagingBuffer>();
i_messaging_buffer = Singleton::Consume<I_MessagingBuffer>::from(*messaging_buffer);
}
I_MessagingBuffer *i_messaging_buffer;
string instance_awareness_value;
string process_name_value;
Encryptor encryptor;
I_Encryptor *i_encryptor;
StrictMock<MockTimeGet> timer;
NiceMock<MockMainLoop> mock_mainloop;
StrictMock<MockInstanceAwareness> instance_awareness;
unique_ptr<MessagingBuffer> messaging_buffer;
private:
::Environment env;
ConfigComponent config;
};
TEST_F(MessagingBufferFiniTest, fini)
{
messaging_buffer->preload();
setConfiguration<uint>(1, "Event Buffer", "max buffer size in MB");
setConfiguration<uint>(1, "Event Buffer", "max buffer files");
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
return chrono::microseconds(10000001);
}
)
);
init();
const size_t events_size = 101;
vector<HTTPRequestEvent> events;
for (size_t i = 0; i < events_size; i++) {
events.push_back(HTTPRequestEvent("0", to_string(i), "2", to_string(i)));
i_messaging_buffer->bufferNewRequest(events[i]);
}
release();
init();
for (size_t i = 0; i < events_size; i++) {
auto req = i_messaging_buffer->peekRequest();
i_messaging_buffer->popRequest();
ASSERT_TRUE(req.ok());
EXPECT_EQ(req.unpack(), events[i]);
}
}
static inline ostream &
operator<<(ostream &os, const HTTPRequestEvent &req)
{
return os
<< "Signature: "
<< req.getSignature()
<< ", Headers: "
<< req.getHeaders()
<< ", Body "
<< req.getBody();
}
TEST_F(MessagingBufferFiniTest, hugeBufferingDoubleInit)
{
messaging_buffer->preload();
setConfiguration<uint>(0, "Event Buffer", "max buffer size in MB");
setConfiguration<uint>(1, "Event Buffer", "max buffer files");
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
return chrono::microseconds(10000001);
}
)
);
init();
vector<I_MainLoop::Routine> timer_routines;
EXPECT_CALL(mock_mainloop, addOneTimeRoutine(
I_MainLoop::RoutineType::System,
_,
_,
_
)).WillRepeatedly(
WithArgs<1>(
Invoke(
[&](const I_MainLoop::Routine &routine)
{
timer_routines.push_back(routine);
return 0;
}
)
));
const size_t events_size = 499;
vector<HTTPRequestEvent> events;
for (size_t i = 0; i < events_size; i++) {
events.push_back(HTTPRequestEvent("0", "1", "2", std::to_string(i)));
i_messaging_buffer->bufferNewRequest(events[i]);
// Run all pending timers
for (auto &routine : timer_routines) routine();
timer_routines.clear();
}
release();
init();
for (size_t i = 0; i < events_size; i++) {
auto req = i_messaging_buffer->peekRequest();
i_messaging_buffer->popRequest();
ASSERT_TRUE(req.ok());
EXPECT_EQ(req.unpack(), events[i]);
}
}
TEST_F(MessagingBufferFiniTest, initTempFile)
{
messaging_buffer->preload();
setConfiguration<uint>(0, "Event Buffer", "max buffer size in MB");
setConfiguration<uint>(1, "Event Buffer", "max buffer files");
EXPECT_CALL(timer, getMonotonicTime()).WillRepeatedly(
InvokeWithoutArgs(
[&]()
{
return chrono::microseconds(10000001);
}
)
);
init();
const size_t events_size = 1;
vector<HTTPRequestEvent> events;
for (size_t i = 0; i < events_size; i++) {
events.push_back(HTTPRequestEvent("0", "1", "2", "temp_file"));
i_messaging_buffer->bufferNewRequest(events[i]);
}
release();
std::ofstream outfile("/tmp/event_buffer/MDFidWZmZXJlZCBtZXNzYWdlcw==iapn.tmp");
string tmp_file =
"ewogICAgInRhZyI6ICJidWZmZXJlZCBtZXNzYWdlcyIsCiAgICAidmFsdWUwIjogIjAiLAo" \
"gICAgInZhbHVlMSI6ICIxIiwKICAgICJ2YWx1ZTIiOiAiMiIsCiAgICAidmFsdWUzIjogInRlbXBfZmlsZSIKfQ==";
outfile << tmp_file;
outfile.close();
init();
for (size_t i = 0; i < events_size; i++) {
auto req = i_messaging_buffer->peekRequest();
i_messaging_buffer->popRequest();
ASSERT_TRUE(req.ok());
EXPECT_EQ(req.unpack(), events[i]);
}
}