Jul 23rd update

This commit is contained in:
Ned Wright
2024-07-23 11:00:05 +00:00
parent 7b5683e4b0
commit 0ba243d5c0
10 changed files with 92 additions and 30 deletions

View File

@@ -82,6 +82,7 @@ ngx_http_cp_signal_to_service(uint32_t cur_session_id)
/// @brief Signals and recieve signal to/from nano service about new session to inspect.
/// @param[in] cur_session_id Session's Id.
/// @param[in] chunk_type Chunk type that the attachment is waiting for a response from nano service.
/// @param[in] tout_retries NUmber of retries to wait for a response from nano service.
/// @returns ngx_int_t
/// - #NGX_OK
/// - #NGX_ERROR
@@ -89,7 +90,7 @@ ngx_http_cp_signal_to_service(uint32_t cur_session_id)
/// - #NGX_AGAIN
///
static ngx_int_t
ngx_http_cp_wait_for_service(uint32_t cur_session_id, ngx_http_chunk_type_e chunk_type)
ngx_http_cp_wait_for_service(uint32_t cur_session_id, ngx_http_chunk_type_e chunk_type, ngx_int_t tout_retries)
{
static int dbg_count = 0;
static clock_t clock_start = (clock_t) 0;
@@ -103,8 +104,8 @@ ngx_http_cp_wait_for_service(uint32_t cur_session_id, ngx_http_chunk_type_e chun
res = ngx_http_cp_signal_to_service(cur_session_id);
if (res != NGX_OK) return res;
write_dbg(DBG_LEVEL_TRACE, "Successfully signaled to the service! pending to receive ack");
for (retry = 0; retry < 3; ) {
write_dbg(DBG_LEVEL_TRACE, "Successfully signaled to the service! pending to receive ack, retries = %d", tout_retries);
for (retry = 0; retry < tout_retries; ) {
// If inspection_mode is different from NON_BLOCKING_THREAD, then the loop will run indefinitely.
if (!is_fail_open_disabled) {
retry++;
@@ -195,7 +196,8 @@ ngx_http_cp_send_data_to_service(
uint8_t num_of_data_elem,
uint32_t cur_session_id,
int *was_waiting,
ngx_http_chunk_type_e chunk_type
ngx_http_chunk_type_e chunk_type,
ngx_int_t tout_retries
)
{
ngx_int_t max_retries;
@@ -215,7 +217,7 @@ ngx_http_cp_send_data_to_service(
*was_waiting = 1;
}
res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type);
res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type, tout_retries);
if (res != NGX_OK && res != NGX_AGAIN) return res;
}
@@ -453,7 +455,8 @@ ngx_http_cp_reply_receiver(
uint32_t cur_session_id,
ngx_http_request_t *request,
ngx_http_cp_modification_list **modification_list,
ngx_http_chunk_type_e chunk_type
ngx_http_chunk_type_e chunk_type,
uint64_t processed_body_size
)
{
ngx_http_cp_reply_from_service_t *reply_p;
@@ -461,6 +464,7 @@ ngx_http_cp_reply_receiver(
ngx_http_cp_modification_list *current_modification = NULL;
ngx_http_cp_inject_data_t *current_inject_data = NULL;
ngx_int_t res;
ngx_int_t tout_retries = min_retries_for_verdict;
uint8_t modification_count;
unsigned int modification_index;
@@ -471,8 +475,11 @@ ngx_http_cp_reply_receiver(
return NGX_OK;
}
if (processed_body_size > body_size_trigger)
tout_retries = max_retries_for_verdict;
do {
res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type);
res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type, tout_retries);
} while (res == NGX_AGAIN);
if (res != NGX_OK) return NGX_ERROR;
@@ -782,7 +789,7 @@ ngx_http_cp_meta_data_sender(ngx_http_request_t *request, uint32_t cur_request_i
set_fragment_elem(fragments, fragments_sizes, parsed_uri.data, parsed_uri.len, PARSED_URI_DATA + 2);
// Sends all the data to the nano service.
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, META_DATA_COUNT + 2, cur_request_id, NULL, fail_open_timeout);
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, META_DATA_COUNT + 2, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict);
if (res != NGX_OK) {
// Failed to send the metadata to nano service.
if (res == NGX_ERROR && failure_count++ == 5) {
@@ -828,7 +835,7 @@ ngx_http_cp_end_transaction_sender(
set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&end_transaction_type, &cur_request_id);
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout);
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict);
if (res != NGX_OK) {
return NGX_ERROR;
}
@@ -851,7 +858,7 @@ ngx_http_cp_wait_sender(uint32_t cur_request_id, ngx_uint_t *num_messages_sent)
write_dbg(DBG_LEVEL_TRACE, "Sending wait event flag for inspection");
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout);
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict);
if (res != NGX_OK) {
return NGX_ERROR;
}
@@ -876,7 +883,7 @@ ngx_http_cp_res_code_sender(uint16_t response_code, uint32_t cur_req_id, ngx_uin
set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &cur_req_id);
set_fragment_elem(fragments, fragments_sizes, &response_code, sizeof(uint16_t), 2);
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, res_code_num_fragments, cur_req_id, NULL, fail_open_hold_timeout) != NGX_OK) {
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, res_code_num_fragments, cur_req_id, NULL, fail_open_hold_timeout, min_retries_for_verdict) != NGX_OK) {
return NGX_ERROR;
}
@@ -900,7 +907,7 @@ ngx_http_cp_content_length_sender(uint64_t content_length_n, uint32_t cur_req_id
set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &cur_req_id);
set_fragment_elem(fragments, fragments_sizes, &content_length_val, sizeof(content_length_val), 2);
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, content_length_num_fragments, cur_req_id, NULL, fail_open_timeout) != NGX_OK) {
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, content_length_num_fragments, cur_req_id, NULL, fail_open_timeout, min_retries_for_verdict) != NGX_OK) {
return NGX_ERROR;
}
@@ -953,7 +960,7 @@ send_header_bulk(
set_fragment_elem(data, data_sizes, &is_last_part, sizeof(is_last_part), 2);
set_fragment_elem(data, data_sizes, &bulk_part_index, sizeof(bulk_part_index), 3);
res = ngx_http_cp_send_data_to_service(data, data_sizes, HEADER_DATA_COUNT * num_headers + 4, cur_request_id, NULL, fail_open_timeout);
res = ngx_http_cp_send_data_to_service(data, data_sizes, HEADER_DATA_COUNT * num_headers + 4, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict);
if (res != NGX_OK) {
write_dbg(DBG_LEVEL_TRACE, "Failed to send bulk of %iu headers", num_headers);
return NGX_ERROR;
@@ -1102,6 +1109,7 @@ ngx_http_cp_body_sender(
ngx_int_t res = NGX_ERROR;
uint8_t is_last_chunk;
uint8_t part_count;
ngx_int_t tout_retries = min_retries_for_verdict;
char *fragments[num_body_chunk_fragments];
uint16_t fragments_sizes[num_body_chunk_fragments];
int was_waiting = 0;
@@ -1130,13 +1138,17 @@ ngx_http_cp_body_sender(
set_fragment_elem(fragments, fragments_sizes, buf->pos, buf->last - buf->pos, 4);
if (body_type == REQUEST_BODY) {
session_data->processed_req_body_size = (buf->last - buf->pos);
session_data->processed_req_body_size += (buf->last - buf->pos);
if (session_data->processed_req_body_size > body_size_trigger)
tout_retries = max_retries_for_verdict;
} else if (body_type == RESPONSE_BODY) {
session_data->processed_res_body_size = (buf->last - buf->pos);
session_data->processed_res_body_size += (buf->last - buf->pos);
if (session_data->processed_res_body_size > body_size_trigger)
tout_retries = max_retries_for_verdict;
}
// Sending the data to the nano service.
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, num_body_chunk_fragments, session_data->session_id,
&was_waiting, fail_open_timeout);
&was_waiting, fail_open_timeout, tout_retries);
if (res != NGX_OK) {
// Failed to send the fragments to the nano service.
@@ -1179,7 +1191,7 @@ ngx_http_cp_metric_data_sender()
fragments = (char *)&data_to_send;
fragments_sizes = sizeof(ngx_http_cp_metric_data_t);
res = ngx_http_cp_send_data_to_service(&fragments, &fragments_sizes, 1, 0, NULL, fail_open_timeout);
res = ngx_http_cp_send_data_to_service(&fragments, &fragments_sizes, 1, 0, NULL, fail_open_timeout, min_retries_for_verdict);
reset_metric_data();
return res;
}