Add docket support

This commit is contained in:
Ned Wright
2022-10-25 14:45:02 +00:00
parent 09847ac179
commit 3703b14ba6
71 changed files with 7126 additions and 2644 deletions

View File

@@ -81,6 +81,7 @@ ngx_http_cp_signal_to_service(uint32_t cur_session_id)
///
/// @brief Signals and recieve signal to/from nano service about new session to inspect.
/// @param[in] cur_session_id Session's Id.
/// @param[in] chunk_type Chunk type that the attachment is waiting for a response from nano service.
/// @returns ngx_int_t
/// - #NGX_OK
/// - #NGX_ERROR
@@ -88,7 +89,7 @@ ngx_http_cp_signal_to_service(uint32_t cur_session_id)
/// - #NGX_AGAIN
///
static ngx_int_t
ngx_http_cp_wait_for_service(uint32_t cur_session_id)
ngx_http_cp_wait_for_service(uint32_t cur_session_id, ngx_http_chunk_type_e chunk_type)
{
static int dbg_count = 0;
static clock_t clock_start = (clock_t) 0;
@@ -97,6 +98,7 @@ ngx_http_cp_wait_for_service(uint32_t cur_session_id)
uint32_t reply_from_service;
ngx_int_t retry;
int is_fail_open_disabled = (inspection_mode != NON_BLOCKING_THREAD);
ngx_uint_t timeout = chunk_type == HOLD_DATA ? fail_open_hold_timeout : fail_open_timeout;
res = ngx_http_cp_signal_to_service(cur_session_id);
if (res != NGX_OK) return res;
@@ -111,7 +113,7 @@ ngx_http_cp_wait_for_service(uint32_t cur_session_id)
s_poll.fd = comm_socket;
s_poll.events = POLLIN;
s_poll.revents = 0;
res = poll(&s_poll, 1, is_fail_open_disabled ? 150 : fail_open_timeout);
res = poll(&s_poll, 1, is_fail_open_disabled ? 150 : timeout);
if (res < 0) {
// Polling from the nano service has failed.
@@ -192,7 +194,9 @@ ngx_http_cp_send_data_to_service(
const uint16_t *fragments_sizes,
uint8_t num_of_data_elem,
uint32_t cur_session_id,
int *was_waiting)
int *was_waiting,
ngx_http_chunk_type_e chunk_type
)
{
ngx_int_t max_retries;
ngx_int_t res = NGX_OK;
@@ -209,7 +213,7 @@ ngx_http_cp_send_data_to_service(
*was_waiting = 1;
}
res = ngx_http_cp_wait_for_service(cur_session_id);
res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type);
if (res != NGX_OK && res != NGX_AGAIN) return res;
}
@@ -428,7 +432,9 @@ ngx_http_cp_reply_receiver(
ngx_http_cp_verdict_e *verdict,
uint32_t cur_session_id,
ngx_http_request_t *request,
ngx_http_cp_modification_list **modification_list)
ngx_http_cp_modification_list **modification_list,
ngx_http_chunk_type_e chunk_type
)
{
ngx_http_cp_reply_from_service_t *reply_p;
ngx_http_cp_modification_list *new_modification = NULL;
@@ -446,7 +452,7 @@ ngx_http_cp_reply_receiver(
}
do {
res = ngx_http_cp_wait_for_service(cur_session_id);
res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type);
} while (res == NGX_AGAIN);
if (res != NGX_OK) return NGX_ERROR;
@@ -554,10 +560,19 @@ ngx_http_cp_reply_receiver(
break;
}
case TRAFFIC_VERDICT_INSPECT:
case TRAFFIC_VERDICT_INSPECT: {
// After an irrelevant verdict, ignore the verdict and continue to the next response.
write_dbg(DBG_LEVEL_TRACE, "Verdict inspect received from the nano service");
updateMetricField(INSPECT_VERDICTS_COUNT, 1);
break;
}
case TRAFFIC_VERDICT_WAIT: {
// After a wait verdict, query the nano agent again to get an updated verdict.
write_dbg(DBG_LEVEL_DEBUG, "Verdict wait received from the nano service");
updateMetricField(HOLD_VERDICTS_COUNT, 1);
break;
}
}
free_data_from_service();
@@ -718,7 +733,7 @@ ngx_http_cp_meta_data_sender(ngx_http_request_t *request, uint32_t cur_request_i
set_fragment_elem(fragments, fragments_sizes, &client_port, sizeof(client_port), CLIENT_PORT + 2);
// Sends all the data to the nano service.
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, META_DATA_COUNT + 2, cur_request_id, NULL);
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, META_DATA_COUNT + 2, cur_request_id, NULL, fail_open_timeout);
if (res != NGX_OK) {
// Failed to send the metadata to nano service.
if (res == NGX_ERROR && failure_count++ == 5) {
@@ -764,7 +779,7 @@ ngx_http_cp_end_transaction_sender(
set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&end_transaction_type, &cur_request_id);
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL);
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout);
if (res != NGX_OK) {
return NGX_ERROR;
}
@@ -773,6 +788,30 @@ ngx_http_cp_end_transaction_sender(
return NGX_OK;
}
ngx_int_t
ngx_http_cp_wait_sender(uint32_t cur_request_id, ngx_uint_t *num_messages_sent)
{
static const ngx_uint_t end_transaction_num_fragments = 2;
char *fragments[end_transaction_num_fragments];
uint16_t fragments_sizes[end_transaction_num_fragments];
ngx_http_chunk_type_e transaction_type = HOLD_DATA;
ngx_int_t res;
set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&transaction_type, &cur_request_id);
write_dbg(DBG_LEVEL_TRACE, "Sending wait event flag for inspection");
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout);
if (res != NGX_OK) {
return NGX_ERROR;
}
write_dbg(DBG_LEVEL_TRACE, "Successfully sent wait event");
*num_messages_sent = 1;
return NGX_OK;
}
ngx_int_t
ngx_http_cp_res_code_sender(uint16_t response_code, uint32_t cur_req_id, ngx_uint_t *num_messages_sent)
{
@@ -788,7 +827,7 @@ ngx_http_cp_res_code_sender(uint16_t response_code, uint32_t cur_req_id, ngx_uin
set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &cur_req_id);
set_fragment_elem(fragments, fragments_sizes, &response_code, sizeof(uint16_t), 2);
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, res_code_num_fragments, cur_req_id, NULL) != NGX_OK) {
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, res_code_num_fragments, cur_req_id, NULL, fail_open_hold_timeout) != NGX_OK) {
return NGX_ERROR;
}
@@ -812,7 +851,7 @@ ngx_http_cp_content_length_sender(uint64_t content_length_n, uint32_t cur_req_id
set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &cur_req_id);
set_fragment_elem(fragments, fragments_sizes, &content_length_val, sizeof(content_length_val), 2);
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, content_length_num_fragments, cur_req_id, NULL) != NGX_OK) {
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, content_length_num_fragments, cur_req_id, NULL, fail_open_timeout) != NGX_OK) {
return NGX_ERROR;
}
@@ -865,7 +904,7 @@ send_header_bulk(
set_fragment_elem(data, data_sizes, &is_last_part, sizeof(is_last_part), 2);
set_fragment_elem(data, data_sizes, &bulk_part_index, sizeof(bulk_part_index), 3);
res = ngx_http_cp_send_data_to_service(data, data_sizes, HEADER_DATA_COUNT * num_headers + 4, cur_request_id, NULL);
res = ngx_http_cp_send_data_to_service(data, data_sizes, HEADER_DATA_COUNT * num_headers + 4, cur_request_id, NULL, fail_open_timeout);
if (res != NGX_OK) {
write_dbg(DBG_LEVEL_TRACE, "Failed to send bulk of %iu headers", num_headers);
return NGX_ERROR;
@@ -1048,7 +1087,7 @@ ngx_http_cp_body_sender(
}
// Sending the data to the nano service.
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, num_body_chunk_fragments, session_data->session_id,
&was_waiting);
&was_waiting, fail_open_timeout);
if (res != NGX_OK) {
// Failed to send the fragments to the nano service.
@@ -1091,7 +1130,7 @@ ngx_http_cp_metric_data_sender()
fragments = (char *)&data_to_send;
fragments_sizes = sizeof(ngx_http_cp_metric_data_t);
res = ngx_http_cp_send_data_to_service(&fragments, &fragments_sizes, 1, 0, NULL);
res = ngx_http_cp_send_data_to_service(&fragments, &fragments_sizes, 1, 0, NULL, fail_open_timeout);
reset_metric_data();
return res;
}