Mar 21st 2024 update

This commit is contained in:
Ned Wright
2024-03-21 15:31:38 +00:00
parent 0d22790ebe
commit c20fa9f966
100 changed files with 3851 additions and 453 deletions

View File

@@ -38,6 +38,10 @@ USE_DEBUG_FLAG(D_CONNECTION);
static const HTTPResponse sending_timeout(HTTPStatusCode::HTTP_UNKNOWN, "Failed to send all data in time");
static const HTTPResponse receving_timeout(HTTPStatusCode::HTTP_UNKNOWN, "Failed to receive all data in time");
static const HTTPResponse parsing_error(HTTPStatusCode::HTTP_UNKNOWN, "Failed to parse the HTTP response");
static const HTTPResponse close_error(
HTTPStatusCode::HTTP_UNKNOWN,
"The previous request failed to receive a response. Closing the connection"
);
const string &
MessageConnectionKey::getHostName() const
@@ -125,6 +129,12 @@ public:
return key;
}
bool
shouldCloseConnection() const
{
return should_close_connection;
}
bool
isOverProxy() const
{
@@ -152,12 +162,12 @@ public:
if (establishConnection().ok()) {
dbgDebug(D_MESSAGING) << "Reestablish connection";
return true;
return false;
}
dbgWarning(D_MESSAGING) << "Reestablish connection failed";
active = genError(curr_time + chrono::seconds(300));
return false;
return true;
}
Maybe<void>
@@ -200,6 +210,7 @@ public:
<< key.getPort()
<< (isOverProxy() ? ", Over proxy: " + settings.getProxyHost() + ":" + to_string(key.getPort()) : "");
active = Maybe<void, chrono::seconds>();
should_close_connection = false;
return Maybe<void>();
}
@@ -443,25 +454,40 @@ private:
I_MainLoop *i_mainloop = Singleton::Consume<I_MainLoop>::by<Messaging>();
I_TimeGet *i_time = Singleton::Consume<I_TimeGet>::by<Messaging>();
auto conn_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
auto bio_connect = BIO_do_connect(bio.get());
uint attempts_count = 0;
while (i_time->getMonotonicTime() < conn_end_time) {
attempts_count++;
if (BIO_do_connect(bio.get()) > 0) {
if (isUnsecure() || isOverProxy()) return Maybe<void>();
return performHandshakeAndVerifyCert(i_time, i_mainloop);
}
auto conn_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
while (i_time->getMonotonicTime() < conn_end_time && bio_connect <= 0) {
if (!BIO_should_retry(bio.get())) {
auto curr_time = chrono::duration_cast<chrono::seconds>(i_time->getMonotonicTime());
active = genError(curr_time + chrono::seconds(60));
string bio_error = ERR_error_string(ERR_get_error(), nullptr);
return genError("Failed to connect (BIO won't retry!): " + bio_error);
return genError(
"Failed to connect to: " +
full_address +
", error: " +
bio_error +
". Connection suspended for 60 seconds");
}
if ((attempts_count % 10) == 0) i_mainloop->yield(true);
attempts_count++;
if (!isBioSocketReady()) {
i_mainloop->yield((attempts_count % 10) == 0);
continue;
}
bio_connect = BIO_do_connect(bio.get());
}
return genError("Failed to establish new connection to " + full_address + " after reaching timeout.");
if (bio_connect > 0) {
if (isUnsecure() || isOverProxy()) return Maybe<void>();
return performHandshakeAndVerifyCert(i_time, i_mainloop);
}
auto curr_time = chrono::duration_cast<chrono::seconds>(i_time->getMonotonicTime());
active = genError(curr_time + chrono::seconds(60));
return genError(
"Failed to establish new connection to: " +
full_address +
" after reaching timeout." +
" Connection suspended for 60 seconds");
}
Maybe<uint, HTTPResponse>
@@ -544,6 +570,7 @@ private:
Maybe<HTTPResponse, HTTPResponse>
sendAndReceiveData(const string &request, bool is_connect)
{
dbgFlow(D_CONNECTION) << "Sending and receiving data";
I_MainLoop *i_mainloop = Singleton::Consume<I_MainLoop>::by<Messaging>();
while (lock) {
i_mainloop->yield(true);
@@ -551,6 +578,11 @@ private:
lock = true;
auto unlock = make_scope_exit([&] () { lock = false; });
if (should_close_connection) {
dbgWarning(D_CONNECTION) << close_error.getBody();
return genError(close_error);
}
I_TimeGet *i_time = Singleton::Consume<I_TimeGet>::by<Messaging>();
auto sending_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
size_t data_left_to_send = request.length();
@@ -567,9 +599,15 @@ private:
HTTPResponseParser http_parser;
dbgTrace(D_CONNECTION) << "Sent the message, now waiting for response";
while (!http_parser.hasReachedError()) {
if (i_time->getMonotonicTime() > receiving_end_time) return genError(receving_timeout);
if (i_time->getMonotonicTime() > receiving_end_time) {
should_close_connection = true;
return genError(receving_timeout);
};
auto receieved = receiveData();
if (!receieved.ok()) return receieved.passErr();
if (!receieved.ok()) {
should_close_connection = true;
return receieved.passErr();
}
auto response = http_parser.parseData(*receieved, is_connect);
if (response.ok()) {
dbgTrace(D_MESSAGING) << printOut(response.unpack().toString());
@@ -611,6 +649,7 @@ private:
uint failed_attempts = 0;
bool lock = false;
bool should_close_connection = false;
};
Connection::Connection(const MessageConnectionKey &key, const MessageMetadata &metadata)
@@ -663,6 +702,12 @@ Connection::getConnKey() const
return pimpl->getConnKey();
}
bool
Connection::shouldCloseConnection() const
{
return pimpl->shouldCloseConnection();
}
bool
Connection::isOverProxy() const
{

View File

@@ -53,6 +53,10 @@ public:
{
auto conn = persistent_connections.find(MessageConnectionKey(host_name, port, category));
if (conn == persistent_connections.end()) return genError("No persistent connection found");
if (conn->second.shouldCloseConnection()) {
persistent_connections.erase(conn);
return genError("The connection needs to reestablish");
}
return conn->second;
}
@@ -93,6 +97,7 @@ private:
if (!external_certificate.empty()) conn.setExternalCertificate(external_certificate);
auto connected = conn.establishConnection();
persistent_connections.emplace(conn_key, conn);
if (!connected.ok()) {
string connection_err = "Failed to establish connection. Error: " + connected.getErr();
@@ -101,7 +106,6 @@ private:
}
dbgTrace(D_CONNECTION) << "Connection establish succssesfuly";
persistent_connections.emplace(conn_key, conn);
return conn;
}

View File

@@ -23,83 +23,13 @@
#include "mock/mock_encryptor.h"
#include "rest.h"
#include "rest_server.h"
#include "dummy_socket.h"
using namespace std;
using namespace testing;
USE_DEBUG_FLAG(D_CONNECTION);
class DummySocket : Singleton::Consume<I_MainLoop>
{
public:
~DummySocket()
{
if (server_fd != -1) close(server_fd);
if (connection_fd != -1) close(connection_fd);
}
void
init()
{
server_fd = socket(AF_INET, SOCK_STREAM, 0);
dbgAssert(server_fd >= 0) << "Failed to open a socket";
int socket_enable = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &socket_enable, sizeof(int));
struct sockaddr_in addr;
bzero(&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons(8080);
bind(server_fd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
listen(server_fd, 100);
}
void
acceptSocket()
{
if (connection_fd == -1) connection_fd = accept(server_fd, nullptr, nullptr);
}
string
readFromSocket()
{
acceptSocket();
string res;
char buffer[1024];
while (int bytesRead = readRaw(buffer, sizeof(buffer))) {
res += string(buffer, bytesRead);
}
return res;
}
void
writeToSocket(const string &msg)
{
acceptSocket();
EXPECT_EQ(write(connection_fd, msg.data(), msg.size()), msg.size());
}
private:
int
readRaw(char *buf, uint len)
{
struct pollfd s_poll;
s_poll.fd = connection_fd;
s_poll.events = POLLIN;
s_poll.revents = 0;
if (poll(&s_poll, 1, 0) <= 0 || (s_poll.revents & POLLIN) == 0) return 0;
return read(connection_fd, buf, len);
}
int server_fd = -1;
int connection_fd = -1;
};
static ostream &
operator<<(ostream &os, const BufferedMessage &)
{
@@ -179,7 +109,6 @@ TEST_F(TestConnectionComp, testEstablishNewConnection)
conn_metadata.setExternalCertificate("external cert");
auto maybe_connection = i_conn->establishConnection(conn_metadata, MessageCategory::LOG);
cout << "[OREN] read: " << endl;
ASSERT_TRUE(maybe_connection.ok());
auto get_conn = maybe_connection.unpack();
EXPECT_EQ(get_conn.getConnKey().getHostName(), "127.0.0.1");
@@ -228,6 +157,43 @@ TEST_F(TestConnectionComp, testSendRequest)
EXPECT_EQ(dummy_socket.readFromSocket(), expected_msg);
}
TEST_F(TestConnectionComp, testCloseConnectionBeforeResponse)
{
// Create a connection
Flags<MessageConnectionConfig> conn_flags;
conn_flags.setFlag(MessageConnectionConfig::UNSECURE_CONN);
MessageMetadata conn_metadata("127.0.0.1", 8080, conn_flags);
// Insert the connection to the map
auto maybe_connection = i_conn->establishConnection(conn_metadata, MessageCategory::LOG);
ASSERT_TRUE(maybe_connection.ok());
// Get the connection from the map - Should be successful
auto maybe_get_connection = i_conn->getPersistentConnection("127.0.0.1", 8080, MessageCategory::LOG);
ASSERT_TRUE(maybe_get_connection.ok());
auto conn = maybe_get_connection.unpack();
auto req = HTTPRequest::prepareRequest(conn, HTTPMethod::POST, "/test", conn_metadata.getHeaders(), "test-body");
ASSERT_TRUE(req.ok());
// force the connection to be closed
ON_CALL(mock_mainloop, yield(false)).WillByDefault(InvokeWithoutArgs([&] () { return; }));
EXPECT_CALL(mock_timer, getMonotonicTime())
.WillRepeatedly(Invoke([] () {static int j = 0; return chrono::microseconds(++j * 1000 * 1000);}));
auto maybe_response = i_conn->sendRequest(conn, *req);
ASSERT_TRUE(!maybe_response.ok());
ASSERT_EQ(
maybe_response.getErr().toString(),
"[Status-code]: -1 - HTTP_UNKNOWN, [Body]: Failed to receive all data in time"
);
auto maybe_get_closed_connection = i_conn->getPersistentConnection("127.0.0.1", 8080, MessageCategory::LOG);
ASSERT_TRUE(!maybe_get_closed_connection.ok());
ASSERT_EQ(maybe_get_closed_connection.getErr(), "The connection needs to reestablish");
}
TEST_F(TestConnectionComp, testSendRequestReplyChunked)
{
Flags<MessageConnectionConfig> conn_flags;

View File

@@ -63,6 +63,7 @@ public:
bool isOverProxy() const;
bool isUnsecure() const;
bool isSuspended();
bool shouldCloseConnection() const;
Maybe<void> establishConnection();
Maybe<HTTPResponse, HTTPResponse> sendRequest(const std::string &request);

View File

@@ -0,0 +1,100 @@
// Copyright (C) 2022 Check Point Software Technologies Ltd. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __DUMMY_SOCKET_H__
#define __DUMMY_SOCKET_H__
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <thread>
#include <fcntl.h>
#include <poll.h>
#include "singleton.h"
#include "i_mainloop.h"
#include "agent_core_utilities.h"
class DummySocket : Singleton::Consume<I_MainLoop>
{
public:
~DummySocket()
{
if (server_fd != -1) close(server_fd);
if (connection_fd != -1) close(connection_fd);
}
void
init()
{
server_fd = socket(AF_INET, SOCK_STREAM, 0);
dbgAssert(server_fd >= 0) << "Failed to open a socket";
int socket_enable = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &socket_enable, sizeof(int));
struct sockaddr_in addr;
bzero(&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = htons(8080);
bind(server_fd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
listen(server_fd, 100);
}
void
acceptSocket()
{
if (connection_fd == -1) connection_fd = accept(server_fd, nullptr, nullptr);
}
std::string
readFromSocket()
{
acceptSocket();
std::string res;
char buffer[1024];
while (int bytesRead = readRaw(buffer, sizeof(buffer))) {
res += std::string(buffer, bytesRead);
}
return res;
}
void
writeToSocket(const std::string &msg)
{
acceptSocket();
EXPECT_EQ(write(connection_fd, msg.data(), msg.size()), msg.size());
}
private:
int
readRaw(char *buf, uint len)
{
struct pollfd s_poll;
s_poll.fd = connection_fd;
s_poll.events = POLLIN;
s_poll.revents = 0;
if (poll(&s_poll, 1, 0) <= 0 || (s_poll.revents & POLLIN) == 0) return 0;
return read(connection_fd, buf, len);
}
int server_fd = -1;
int connection_fd = -1;
};
#endif // __DUMMY_SOCKET_H__

View File

@@ -50,7 +50,8 @@ public:
const std::string &uri,
const std::string &body,
MessageCategory category,
const MessageMetadata &message_metadata
const MessageMetadata &message_metadata,
bool force_buffering = true
);
Maybe<HTTPStatusCode, HTTPResponse> downloadFile(

View File

@@ -55,10 +55,11 @@ public:
const std::string &uri,
const std::string &body,
const MessageCategory category,
MessageMetadata message_metadata
const MessageMetadata &message_metadata,
bool force_buffering
) override
{
return messaging_comp.sendAsyncMessage(method, uri, body, category, message_metadata);
return messaging_comp.sendAsyncMessage(method, uri, body, category, message_metadata, force_buffering);
}
Maybe<HTTPStatusCode, HTTPResponse>

View File

@@ -153,8 +153,6 @@ MessagingBufferComponent::Impl::pushNewBufferedMessage(
{
dbgTrace(D_MESSAGING_BUFFER) << "Pushing new message to buffer";
message_metadata.setShouldBufferMessage(false);
if (!force_immediate_writing) {
dbgDebug(D_MESSAGING_BUFFER) << "Holding message temporarily in memory";
memory_messages.emplace_back(body, method, uri, category, message_metadata);
@@ -296,12 +294,14 @@ MessagingBufferComponent::Impl::sendMessage()
HTTPStatusCode
MessagingBufferComponent::Impl::sendMessage(const BufferedMessage &message) const
{
MessageMetadata message_metadata = message.getMessageMetadata();
message_metadata.setShouldBufferMessage(false);
auto res = messaging->sendSyncMessage(
message.getMethod(),
message.getURI(),
message.getBody(),
message.getCategory(),
message.getMessageMetadata()
message_metadata
);
if (res.ok()) return HTTPStatusCode::HTTP_OK;
@@ -316,7 +316,9 @@ MessagingBufferComponent::Impl::handleInMemoryMessages()
memory_messages.reserve(32);
for (const auto &message : messages) {
if (sendMessage(message) != HTTPStatusCode::HTTP_OK) writeToDisk(message);
if (sendMessage(message) != HTTPStatusCode::HTTP_OK) {
if (message.getMessageMetadata().shouldBufferMessage()) writeToDisk(message);
}
mainloop->yield();
}
}

View File

@@ -233,17 +233,36 @@ TEST_F(TestMessagingBuffer, testRoutinInMemory)
string body_1 = "body1";
string body_2 = "body2";
string body_3 = "body3";
string body_4 = "body4";
string uri_1 = "uri_1";
string uri_2 = "uri_2";
string uri_3 = "uri_3";
string uri_4 = "uri_4";
MessageCategory category = MessageCategory::GENERIC;
MessageMetadata message_metadata = MessageMetadata();
MessageMetadata msg_2_message_metadata = MessageMetadata();
msg_2_message_metadata.setShouldBufferMessage(true);
HTTPMethod method = HTTPMethod::POST;
buffer_provider->pushNewBufferedMessage(body_1, method, uri_1, category, message_metadata, false);
buffer_provider->pushNewBufferedMessage(body_2, method, uri_2, category, message_metadata, false);
buffer_provider->pushNewBufferedMessage(
body_2,
method,
uri_2,
category,
msg_2_message_metadata,
false
); // should be buffered
buffer_provider->pushNewBufferedMessage(body_3, method, uri_3, category, message_metadata, false);
buffer_provider->pushNewBufferedMessage(
body_4,
method,
uri_4,
category,
message_metadata,
false
); // shouldn't be buffered
HTTPResponse res(HTTPStatusCode::HTTP_OK, "");
Maybe<HTTPResponse, HTTPResponse> err = genError(res);
@@ -251,6 +270,7 @@ TEST_F(TestMessagingBuffer, testRoutinInMemory)
EXPECT_CALL(mock_messaging, sendSyncMessage(method, uri_1, body_1, _, _)).WillOnce(Return(res));
EXPECT_CALL(mock_messaging, sendSyncMessage(method, uri_2, body_2, _, _)).WillOnce(Return(err));
EXPECT_CALL(mock_messaging, sendSyncMessage(method, uri_3, body_3, _, _)).WillOnce(Return(res));
EXPECT_CALL(mock_messaging, sendSyncMessage(method, uri_4, body_4, _, _)).WillOnce(Return(err));
memory_routine();

View File

@@ -136,10 +136,13 @@ MessagingComp::sendAsyncMessage(
const string &uri,
const string &body,
MessageCategory category,
const MessageMetadata &message_metadata
const MessageMetadata &message_metadata,
bool force_buffering
)
{
i_messaging_buffer->pushNewBufferedMessage(body, method, uri, category, message_metadata, false);
MessageMetadata new_message_metadata = message_metadata;
new_message_metadata.setShouldBufferMessage(force_buffering);
i_messaging_buffer->pushNewBufferedMessage(body, method, uri, category, new_message_metadata, false);
}
Maybe<HTTPStatusCode, HTTPResponse>

View File

@@ -15,6 +15,7 @@
#include "mocks/mock_messaging_connection.h"
#include "rest.h"
#include "rest_server.h"
#include "dummy_socket.h"
using namespace std;
using namespace testing;
@@ -48,11 +49,13 @@ class TestMessagingComp : public testing::Test
public:
TestMessagingComp()
{
Debug::setUnitTestFlag(D_MESSAGING, Debug::DebugLevel::TRACE);
EXPECT_CALL(mock_time_get, getMonotonicTime()).WillRepeatedly(Return(chrono::microseconds(0)));
ON_CALL(mock_agent_details, getFogDomain()).WillByDefault(Return(Maybe<string>(fog_addr)));
ON_CALL(mock_agent_details, getFogPort()).WillByDefault(Return(Maybe<uint16_t>(fog_port)));
messaging_comp.init();
dummy_socket.init();
}
void
@@ -70,8 +73,8 @@ public:
EXPECT_CALL(mock_proxy_conf, getProxyAuthentication(_)).WillRepeatedly(Return(string("cred")));
}
const string fog_addr = "1.2.3.4";
uint16_t fog_port = 80;
const string fog_addr = "127.0.0.1";
int fog_port = 8080;
CPTestTempfile agent_details_file;
MessagingComp messaging_comp;
::Environment env;
@@ -82,6 +85,7 @@ public:
NiceMock<MockTimeGet> mock_time_get;
NiceMock<MockAgentDetails> mock_agent_details;
NiceMock<MockProxyConfiguration> mock_proxy_conf;
DummySocket dummy_socket;
};
TEST_F(TestMessagingComp, testInitComp)
@@ -100,16 +104,19 @@ TEST_F(TestMessagingComp, testSendSyncMessage)
HTTPMethod method = HTTPMethod::POST;
string uri = "/test-uri";
MessageCategory category = MessageCategory::GENERIC;
MessageMetadata message_metadata;
MessageConnectionKey conn_key(fog_addr, fog_port, MessageCategory::GENERIC);
Connection conn(conn_key, message_metadata);
Flags<MessageConnectionConfig> conn_flags;
conn_flags.setFlag(MessageConnectionConfig::UNSECURE_CONN);
MessageMetadata conn_metadata(fog_addr, fog_port, conn_flags, false, true);
Connection conn(conn_key, conn_metadata);
EXPECT_CALL(mock_messaging_connection, getFogConnectionByCategory(MessageCategory::GENERIC))
.WillOnce(Return(conn));
HTTPResponse res(HTTPStatusCode::HTTP_OK, "response!!");
EXPECT_CALL(mock_messaging_connection, mockSendRequest(_, _, _)).WillOnce(Return(res));
auto sending_res = messaging_comp.sendSyncMessage(method, uri, body, category, message_metadata);
auto sending_res = messaging_comp.sendSyncMessage(method, uri, body, category, conn_metadata);
ASSERT_TRUE(sending_res.ok());
HTTPResponse http_res = sending_res.unpack();
EXPECT_EQ(http_res.getBody(), "response!!");
@@ -126,7 +133,7 @@ TEST_F(TestMessagingComp, testSendAsyncMessage)
MessageMetadata message_metadata;
EXPECT_CALL(mock_messaging_buffer, pushNewBufferedMessage(body, method, uri, category, _, _)).Times(1);
messaging_comp.sendAsyncMessage(method, uri, body, category, message_metadata);
messaging_comp.sendAsyncMessage(method, uri, body, category, message_metadata, true);
}
TEST_F(TestMessagingComp, testSendSyncMessageOnSuspendedConn)
@@ -163,16 +170,18 @@ TEST_F(TestMessagingComp, testUploadFile)
setAgentDetails();
string uri = "/test-uri";
MessageCategory category = MessageCategory::GENERIC;
MessageMetadata message_metadata;
MessageConnectionKey conn_key(fog_addr, fog_port, MessageCategory::GENERIC);
Connection conn(conn_key, message_metadata);
Flags<MessageConnectionConfig> conn_flags;
conn_flags.setFlag(MessageConnectionConfig::UNSECURE_CONN);
MessageMetadata conn_metadata(fog_addr, fog_port, conn_flags, false, true);
Connection conn(conn_key, conn_metadata);
EXPECT_CALL(mock_messaging_connection, getFogConnectionByCategory(MessageCategory::GENERIC))
.WillOnce(Return(conn));
HTTPResponse res(HTTPStatusCode::HTTP_OK, "");
EXPECT_CALL(mock_messaging_connection, mockSendRequest(_, _, _)).WillOnce(Return(res));
auto upload_res = messaging_comp.uploadFile(uri, path, category, message_metadata);
auto upload_res = messaging_comp.uploadFile(uri, path, category, conn_metadata);
ASSERT_TRUE(upload_res.ok());
EXPECT_EQ(upload_res.unpack(), HTTPStatusCode::HTTP_OK);
}
@@ -185,16 +194,18 @@ TEST_F(TestMessagingComp, testDownloadFile)
string uri = "/test-uri";
HTTPMethod method = HTTPMethod::GET;
MessageCategory category = MessageCategory::GENERIC;
MessageMetadata message_metadata;
MessageConnectionKey conn_key(fog_addr, fog_port, MessageCategory::GENERIC);
Connection conn(conn_key, message_metadata);
Flags<MessageConnectionConfig> conn_flags;
conn_flags.setFlag(MessageConnectionConfig::UNSECURE_CONN);
MessageMetadata conn_metadata(fog_addr, fog_port, conn_flags, false, true);
Connection conn(conn_key, conn_metadata);
EXPECT_CALL(mock_messaging_connection, getFogConnectionByCategory(MessageCategory::GENERIC))
.WillOnce(Return(conn));
HTTPResponse res(HTTPStatusCode::HTTP_OK, "");
EXPECT_CALL(mock_messaging_connection, mockSendRequest(_, _, _)).WillOnce(Return(res));
auto upload_res = messaging_comp.downloadFile(method, uri, "/tmp/test.txt", category, message_metadata);
auto upload_res = messaging_comp.downloadFile(method, uri, "/tmp/test.txt", category, conn_metadata);
ASSERT_TRUE(upload_res.ok());
EXPECT_EQ(upload_res.unpack(), HTTPStatusCode::HTTP_OK);
}