Aug 08 2025 dev (#336)

* sync code

* sync code

* sync code

---------

Co-authored-by: Ned Wright <nedwright@proton.me>
This commit is contained in:
Daniel-Eisenberg
2025-08-10 13:21:52 +03:00
committed by GitHub
parent dd19bf6158
commit 6bbc89712a
153 changed files with 4864 additions and 1018 deletions

View File

@@ -21,6 +21,9 @@
#include <fstream>
#include <sstream>
#include <string>
#include <thread>
#include <future>
#include <atomic>
#include "config.h"
#include "http_request.h"
@@ -73,6 +76,7 @@ enum class ConnectionFlags
{
UNSECURE,
ONE_TIME,
ASYNC_ONE_TIME,
IGNORE_SSL_VALIDATION,
PROXY,
@@ -87,6 +91,9 @@ public:
auto metadata_flags = metadata.getConnectionFlags();
if (metadata_flags.isSet(MessageConnectionConfig::UNSECURE_CONN)) flags.setFlag(ConnectionFlags::UNSECURE);
if (metadata_flags.isSet(MessageConnectionConfig::ONE_TIME_CONN)) flags.setFlag(ConnectionFlags::ONE_TIME);
if (metadata_flags.isSet(MessageConnectionConfig::ONE_TIME_FOG_CONN)) {
flags.setFlag(ConnectionFlags::ASYNC_ONE_TIME);
}
if (metadata_flags.isSet(MessageConnectionConfig::IGNORE_SSL_VALIDATION)) {
flags.setFlag(ConnectionFlags::IGNORE_SSL_VALIDATION);
}
@@ -99,7 +106,6 @@ public:
sni_hostname = metadata.getSniHostName();
dn_host_name = metadata.getDnHostName();
}
void
@@ -511,12 +517,29 @@ private:
BioConnectionStatus bio_connect = tryToBioConnect(full_address);
uint attempts_count = 0;
auto conn_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
auto maybe_is_orch = Singleton::Consume<I_Environment>::by<Messaging>()->get<bool>("Is Orchestrator");
auto is_orch = maybe_is_orch.ok() && *maybe_is_orch;
while (i_time->getMonotonicTime() < conn_end_time && bio_connect == BioConnectionStatus::SHOULD_RETRY) {
if (is_orch) { // fixing code for orch case due to stability concerns - should be removed
if (isBioSocketReady()) {
bio_connect = tryToBioConnect(full_address);
} else {
i_mainloop->yield((attempts_count % 10) == 0);
}
continue;
}
if (isBioSocketReady()) {
bio_connect = tryToBioConnect(full_address);
} else {
i_mainloop->yield((attempts_count % 10) == 0);
}
dbgTrace(D_CONNECTION)
<< "Connection to: "
<< full_address
<< " should retry. number of made attempts: "
<< ++attempts_count;
i_mainloop->yield(true);
}
if (bio_connect == BioConnectionStatus::SUCCESS) {
@@ -553,7 +576,8 @@ private:
int data_sent_len = BIO_write(bio.get(), curr_data_to_send, data_left_to_send);
if (data_sent_len >= 0) {
dbgTrace(D_CONNECTION) << "Sent " << data_sent_len << " bytes, out of: " << data_left_to_send << " bytes.";
dbgTrace(D_CONNECTION) << "Sent " << data_sent_len << " bytes, out of: " << data_left_to_send
<< " bytes (total remaining: " << data_left_to_send - data_sent_len << " bytes).";
return data_sent_len;
}
@@ -637,13 +661,60 @@ private:
I_TimeGet *i_time = Singleton::Consume<I_TimeGet>::by<Messaging>();
auto sending_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
size_t data_left_to_send = request.length();
atomic<bool> cancel_task{false};
while (data_left_to_send > 0) {
if (i_time->getMonotonicTime() > sending_end_time) return genError(sending_timeout);
auto send_size = sendData(request, data_left_to_send);
if (!send_size.ok()) return send_size.passErr();
data_left_to_send -= *send_size;
i_mainloop->yield(*send_size == 0);
// Use smaller chunks for one-time connections with yielding between chunks
if (flags.isSet(ConnectionFlags::ASYNC_ONE_TIME)) {
// Launch async task to send full request only
// note do not add debug logs inside the async task
// as it will cause a crash
auto task = async(launch::async, [this, request, &cancel_task]() -> Maybe<void, HTTPResponse> {
size_t remaining = request.length();
while (remaining > 0) {
if (cancel_task.load()) {
return genError(HTTPResponse(HTTPStatusCode::HTTP_UNKNOWN, "Async send task was canceled"));
}
auto sent = sendData(request, remaining);
if (!sent.ok()) return genError(sent.getErr());
remaining -= *sent;
if (*sent == 0) {
// sleep for 25ms to avoid busy waiting
this_thread::sleep_for(chrono::milliseconds(25));
}
}
return Maybe<void, HTTPResponse>();
});
// Set a timeout of 1 minute for the task
auto timeout = chrono::minutes(1);
auto start_time = i_time->getMonotonicTime();
// poll and yield until send completes or timeout occurs
if (!task.valid()) {
return genError(HTTPResponse(HTTPStatusCode::HTTP_UNKNOWN,
"Async send future is not valid (no_state)"));
}
// Modify the loop to yield after setting cancel_task to true
while (task.wait_for(chrono::milliseconds(0)) != future_status::ready) {
if (i_time->getMonotonicTime() - start_time > timeout) {
cancel_task.store(true); // Signal the task to cancel
i_mainloop->yield(chrono::milliseconds(50)); // Yield for 50ms after canceling
return genError(HTTPResponse(HTTPStatusCode::HTTP_UNKNOWN, "Async send task timed out"));
}
i_mainloop->yield(chrono::milliseconds(30)); // Yield for 30ms
dbgTrace(D_CONNECTION) << "Waiting for async send to complete...";
}
dbgDebug(D_CONNECTION) << "Async send completed.";
auto send_res = task.get();
if (!send_res.ok()) return send_res.passErr();
} else {
while (data_left_to_send > 0) {
if (i_time->getMonotonicTime() > sending_end_time) return genError(sending_timeout);
auto send_size = sendData(request, data_left_to_send);
if (!send_size.ok()) return send_size.passErr();
data_left_to_send -= *send_size;
i_mainloop->yield(*send_size == 0);
}
}
auto receiving_end_time = i_time->getMonotonicTime() + getConnectionTimeout();
@@ -660,11 +731,11 @@ private:
return receieved.passErr();
}
auto response = http_parser.parseData(*receieved, is_connect);
i_mainloop->yield(receieved.unpack().empty());
if (response.ok()) {
dbgTrace(D_MESSAGING) << printOut(response.unpack().toString());
return response.unpack();
}
i_mainloop->yield(receieved.unpack().empty());
}
return genError(parsing_error);
}
@@ -708,7 +779,6 @@ private:
bool is_dual_auth = false;
Maybe<string> sni_hostname = genError<string>("Uninitialized");
Maybe<string> dn_host_name = genError<string>("Uninitialized");
};
Connection::Connection(const MessageConnectionKey &key, const MessageMetadata &metadata)