sync code

This commit is contained in:
Ned Wright
2025-08-08 11:06:28 +00:00
parent dd19bf6158
commit da20943c09
145 changed files with 4157 additions and 1016 deletions

View File

@@ -21,6 +21,9 @@
#include <fstream>
#include <sstream>
#include <string>
#include <thread>
#include <future>
#include <atomic>
#include "config.h"
#include "http_request.h"
@@ -73,6 +76,7 @@ enum class ConnectionFlags
{
UNSECURE,
ONE_TIME,
ASYNC_ONE_TIME,
IGNORE_SSL_VALIDATION,
PROXY,
@@ -87,6 +91,9 @@ public:
auto metadata_flags = metadata.getConnectionFlags();
if (metadata_flags.isSet(MessageConnectionConfig::UNSECURE_CONN)) flags.setFlag(ConnectionFlags::UNSECURE);
if (metadata_flags.isSet(MessageConnectionConfig::ONE_TIME_CONN)) flags.setFlag(ConnectionFlags::ONE_TIME);
if (metadata_flags.isSet(MessageConnectionConfig::ONE_TIME_FOG_CONN)) {
flags.setFlag(ConnectionFlags::ASYNC_ONE_TIME);
}
if (metadata_flags.isSet(MessageConnectionConfig::IGNORE_SSL_VALIDATION)) {
flags.setFlag(ConnectionFlags::IGNORE_SSL_VALIDATION);
}
@@ -99,7 +106,6 @@ public:
sni_hostname = metadata.getSniHostName();
dn_host_name = metadata.getDnHostName();
}
void
@@ -511,12 +517,29 @@ private:
BioConnectionStatus bio_connect = tryToBioConnect(full_address);
uint attempts_count = 0;
auto conn_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
auto maybe_is_orch = Singleton::Consume<I_Environment>::by<Messaging>()->get<bool>("Is Orchestrator");
auto is_orch = maybe_is_orch.ok() && *maybe_is_orch;
while (i_time->getMonotonicTime() < conn_end_time && bio_connect == BioConnectionStatus::SHOULD_RETRY) {
if (is_orch) { // fixing code for orch case due to stability concerns - should be removed
if (isBioSocketReady()) {
bio_connect = tryToBioConnect(full_address);
} else {
i_mainloop->yield((attempts_count % 10) == 0);
}
continue;
}
if (isBioSocketReady()) {
bio_connect = tryToBioConnect(full_address);
} else {
i_mainloop->yield((attempts_count % 10) == 0);
}
dbgTrace(D_CONNECTION)
<< "Connection to: "
<< full_address
<< " should retry. number of made attempts: "
<< ++attempts_count;
i_mainloop->yield(true);
}
if (bio_connect == BioConnectionStatus::SUCCESS) {
@@ -553,7 +576,8 @@ private:
int data_sent_len = BIO_write(bio.get(), curr_data_to_send, data_left_to_send);
if (data_sent_len >= 0) {
dbgTrace(D_CONNECTION) << "Sent " << data_sent_len << " bytes, out of: " << data_left_to_send << " bytes.";
dbgTrace(D_CONNECTION) << "Sent " << data_sent_len << " bytes, out of: " << data_left_to_send
<< " bytes (total remaining: " << data_left_to_send - data_sent_len << " bytes).";
return data_sent_len;
}
@@ -637,13 +661,60 @@ private:
I_TimeGet *i_time = Singleton::Consume<I_TimeGet>::by<Messaging>();
auto sending_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
size_t data_left_to_send = request.length();
atomic<bool> cancel_task{false};
while (data_left_to_send > 0) {
if (i_time->getMonotonicTime() > sending_end_time) return genError(sending_timeout);
auto send_size = sendData(request, data_left_to_send);
if (!send_size.ok()) return send_size.passErr();
data_left_to_send -= *send_size;
i_mainloop->yield(*send_size == 0);
// Use smaller chunks for one-time connections with yielding between chunks
if (flags.isSet(ConnectionFlags::ASYNC_ONE_TIME)) {
// Launch async task to send full request only
// note do not add debug logs inside the async task
// as it will cause a crash
auto task = async(launch::async, [this, request, &cancel_task]() -> Maybe<void, HTTPResponse> {
size_t remaining = request.length();
while (remaining > 0) {
if (cancel_task.load()) {
return genError(HTTPResponse(HTTPStatusCode::HTTP_UNKNOWN, "Async send task was canceled"));
}
auto sent = sendData(request, remaining);
if (!sent.ok()) return genError(sent.getErr());
remaining -= *sent;
if (*sent == 0) {
// sleep for 25ms to avoid busy waiting
this_thread::sleep_for(chrono::milliseconds(25));
}
}
return Maybe<void, HTTPResponse>();
});
// Set a timeout of 1 minute for the task
auto timeout = chrono::minutes(1);
auto start_time = i_time->getMonotonicTime();
// poll and yield until send completes or timeout occurs
if (!task.valid()) {
return genError(HTTPResponse(HTTPStatusCode::HTTP_UNKNOWN,
"Async send future is not valid (no_state)"));
}
// Modify the loop to yield after setting cancel_task to true
while (task.wait_for(chrono::milliseconds(0)) != future_status::ready) {
if (i_time->getMonotonicTime() - start_time > timeout) {
cancel_task.store(true); // Signal the task to cancel
i_mainloop->yield(chrono::milliseconds(50)); // Yield for 50ms after canceling
return genError(HTTPResponse(HTTPStatusCode::HTTP_UNKNOWN, "Async send task timed out"));
}
i_mainloop->yield(chrono::milliseconds(30)); // Yield for 30ms
dbgTrace(D_CONNECTION) << "Waiting for async send to complete...";
}
dbgDebug(D_CONNECTION) << "Async send completed.";
auto send_res = task.get();
if (!send_res.ok()) return send_res.passErr();
} else {
while (data_left_to_send > 0) {
if (i_time->getMonotonicTime() > sending_end_time) return genError(sending_timeout);
auto send_size = sendData(request, data_left_to_send);
if (!send_size.ok()) return send_size.passErr();
data_left_to_send -= *send_size;
i_mainloop->yield(*send_size == 0);
}
}
auto receiving_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
@@ -660,11 +731,11 @@ private:
return receieved.passErr();
}
auto response = http_parser.parseData(*receieved, is_connect);
i_mainloop->yield(receieved.unpack().empty());
if (response.ok()) {
dbgTrace(D_MESSAGING) << printOut(response.unpack().toString());
return response.unpack();
}
i_mainloop->yield(receieved.unpack().empty());
}
return genError(parsing_error);
}
@@ -708,7 +779,6 @@ private:
bool is_dual_auth = false;
Maybe<string> sni_hostname = genError<string>("Uninitialized");
Maybe<string> dn_host_name = genError<string>("Uninitialized");
};
Connection::Connection(const MessageConnectionKey &key, const MessageMetadata &metadata)

View File

@@ -97,7 +97,9 @@ private:
if (!external_certificate.empty()) conn.setExternalCertificate(external_certificate);
auto connected = conn.establishConnection();
persistent_connections.emplace(conn_key, conn);
if (!metadata.getConnectionFlags().isSet(MessageConnectionConfig::ONE_TIME_FOG_CONN)) {
persistent_connections.emplace(conn_key, conn);
}
if (!connected.ok()) {
string connection_err = "Failed to establish connection. Error: " + connected.getErr();
@@ -140,8 +142,9 @@ private:
}
dbgTrace(D_CONNECTION) << "Connection over proxy established succssesfuly";
persistent_connections.emplace(conn_key, conn);
if (!metadata.getConnectionFlags().isSet(MessageConnectionConfig::ONE_TIME_FOG_CONN)) {
persistent_connections.emplace(conn_key, conn);
}
return conn;
}

View File

@@ -264,3 +264,51 @@ TEST_F(TestConnectionComp, testEstablishNewProxyConnection)
auto maybe_connection = i_conn->establishConnection(conn_metadata, MessageCategory::LOG);
}
TEST_F(TestConnectionComp, testSendRequestWithOneTimeFogConnection)
{
Flags<MessageConnectionConfig> conn_flags;
conn_flags.setFlag(MessageConnectionConfig::UNSECURE_CONN);
conn_flags.setFlag(MessageConnectionConfig::ONE_TIME_FOG_CONN);
MessageMetadata conn_metadata(fog_addr, fog_port, conn_flags);
auto maybe_connection = i_conn->establishConnection(conn_metadata, MessageCategory::LOG);
ASSERT_TRUE(maybe_connection.ok());
auto conn = maybe_connection.unpack();
auto req = HTTPRequest::prepareRequest(conn, HTTPMethod::POST, "/test", conn_metadata.getHeaders(), "test-body");
ASSERT_TRUE(req.ok());
EXPECT_CALL(mock_mainloop, yield(A<std::chrono::microseconds>()))
.WillOnce(
InvokeWithoutArgs(
[&]() {
cerr << "accepting socket" << endl;
dummy_socket.acceptSocket();
dummy_socket.writeToSocket("HTTP/1.1 200 OK\r\nContent-Length: 7\r\n\r\nmy-test");
}
)
).WillRepeatedly(Return());
EXPECT_CALL(mock_timer, getMonotonicTime())
.WillRepeatedly(Invoke([]() { static int j = 0; return chrono::microseconds(++j * 10); }));
auto maybe_response = i_conn->sendRequest(conn, *req);
if (!maybe_response.ok()) {
cout << "Error: " << maybe_response.getErr().toString() << endl;
}
ASSERT_TRUE(maybe_response.ok());
EXPECT_EQ((*maybe_response).getBody(), "my-test");
string expected_msg =
"POST /test HTTP/1.1\r\n"
"Accept-Encoding: identity\r\n"
"Authorization: Bearer accesstoken\r\n"
"Connection: keep-alive\r\n"
"Content-Length: 9\r\n"
"Content-type: application/json\r\n"
"Host: 127.0.0.1\r\n"
"\r\n"
"test-body";
EXPECT_EQ(dummy_socket.readFromSocket(), expected_msg);
}

View File

@@ -25,6 +25,7 @@
using namespace std;
USE_DEBUG_FLAG(D_MESSAGING);
USE_DEBUG_FLAG(D_TRACE_ID);
class FogConnectionChecker : public ServerRest
{
@@ -95,13 +96,14 @@ isMessageToFog(const MessageMetadata message_metadata)
Maybe<Connection>
MessagingComp::getConnection(MessageCategory category, const MessageMetadata &metadata)
{
auto persistant_conn = getPersistentConnection(metadata, category);
if (persistant_conn.ok()) {
dbgTrace(D_MESSAGING) << "Found a persistant connection";
return persistant_conn;
if (!metadata.getConnectionFlags().isSet(MessageConnectionConfig::ONE_TIME_FOG_CONN)) {
auto persistant_conn = getPersistentConnection(metadata, category);
if (persistant_conn.ok()) {
dbgTrace(D_MESSAGING) << "Found a persistant connection";
return persistant_conn;
}
dbgDebug(D_MESSAGING) << persistant_conn.getErr();
}
dbgDebug(D_MESSAGING) << persistant_conn.getErr();
auto maybe_conn = i_conn->establishConnection(metadata, category);
if (!maybe_conn.ok()) {
dbgWarning(D_MESSAGING) << maybe_conn.getErr();
@@ -131,6 +133,7 @@ MessagingComp::sendMessage(
bool is_to_fog = isMessageToFog(message_metadata);
auto metadata = message_metadata;
if (is_to_fog) {
if (method == HTTPMethod::GET && fog_get_requests_cache.doesKeyExists(uri)) {
HTTPResponse res = fog_get_requests_cache.getEntry(uri);
@@ -141,7 +144,16 @@ MessagingComp::sendMessage(
auto i_env = Singleton::Consume<I_Environment>::by<Messaging>();
metadata.insertHeader("User-Agent", "Infinity Next (a7030abf93a4c13)");
metadata.insertHeaders(i_env->getCurrentHeadersMap());
if (!metadata.getTraceId().ok()) {
metadata.insertHeaders(i_env->getCurrentHeadersMap());
}
Maybe<string> trace_id = metadata.getTraceId();
if (trace_id.ok()) {
dbgTrace(D_TRACE_ID) << "Sending message to fog (trace ID: " << trace_id.unpack() << ")";
} else {
dbgTrace(D_TRACE_ID) << "Could not retrieve trace ID for fog message. Error: " << trace_id.getErr();
}
}
auto req = HTTPRequest::prepareRequest(