diff --git a/attachments/nginx/nginx_attachment_util/nginx_attachment_util.cc b/attachments/nginx/nginx_attachment_util/nginx_attachment_util.cc index 32e66cd..5135c72 100644 --- a/attachments/nginx/nginx_attachment_util/nginx_attachment_util.cc +++ b/attachments/nginx/nginx_attachment_util/nginx_attachment_util.cc @@ -155,6 +155,24 @@ getWaitingForVerdictThreadTimeout() return conf_data.getNumericalValue("waiting_for_verdict_thread_timeout_msec"); } +unsigned int +getMinRetriesForVerdict() +{ + return conf_data.getNumericalValue("min_retries_for_verdict"); +} + +unsigned int +getMaxRetriesForVerdict() +{ + return conf_data.getNumericalValue("max_retries_for_verdict"); +} + +unsigned int +getReqBodySizeTrigger() +{ + return conf_data.getNumericalValue("body_size_trigger"); +} + int isIPAddress(c_str ip_str) { diff --git a/attachments/nginx/ngx_module/ngx_cp_hook_threads.c b/attachments/nginx/ngx_module/ngx_cp_hook_threads.c index 95911fc..8f332fe 100644 --- a/attachments/nginx/ngx_module/ngx_cp_hook_threads.c +++ b/attachments/nginx/ngx_module/ngx_cp_hook_threads.c @@ -168,7 +168,8 @@ end_req_header_handler( session_data_p->session_id, request, modifications, - REQUEST_END + REQUEST_END, + 0 ); } @@ -282,7 +283,8 @@ ngx_http_cp_req_body_filter_thread(void *_ctx) session_data_p->session_id, request, &ctx->modifications, - REQUEST_BODY + REQUEST_BODY, + session_data_p->processed_req_body_size ); if (is_last_part) session_data_p->was_request_fully_inspected = 1; @@ -322,7 +324,8 @@ ngx_http_cp_req_end_transaction_thread(void *_ctx) session_data_p->session_id, request, &ctx->modifications, - REQUEST_END + REQUEST_END, + session_data_p->processed_req_body_size ); } @@ -433,7 +436,8 @@ ngx_http_cp_res_header_filter_thread(void *_ctx) session_data_p->session_id, request, &ctx->modifications, - RESPONSE_HEADER + RESPONSE_HEADER, + 0 ); return NULL; @@ -497,7 +501,8 @@ ngx_http_cp_res_body_filter_thread(void *_ctx) session_data_p->session_id, request, &ctx->modifications, - RESPONSE_BODY + RESPONSE_BODY, + session_data_p->processed_res_body_size ); return NULL; @@ -533,7 +538,8 @@ ngx_http_cp_hold_verdict_thread(void *_ctx) session_data_p->session_id, request, &ctx->modifications, - HOLD_DATA + HOLD_DATA, + 0 ); write_dbg( diff --git a/attachments/nginx/ngx_module/ngx_cp_hooks.c b/attachments/nginx/ngx_module/ngx_cp_hooks.c index 7cd83bc..e5f0537 100644 --- a/attachments/nginx/ngx_module/ngx_cp_hooks.c +++ b/attachments/nginx/ngx_module/ngx_cp_hooks.c @@ -83,7 +83,7 @@ init_cp_session_data(ngx_http_request_t *request) session_data->req_proccesing_time = 0; session_data->res_proccesing_time = 0; session_data->processed_req_body_size = 0; - session_data->processed_req_body_size = 0; + session_data->processed_res_body_size = 0; ngx_http_set_ctx(request, session_data, ngx_http_cp_attachment_module); diff --git a/attachments/nginx/ngx_module/ngx_cp_hooks.h b/attachments/nginx/ngx_module/ngx_cp_hooks.h index 9d172b4..72d036f 100644 --- a/attachments/nginx/ngx_module/ngx_cp_hooks.h +++ b/attachments/nginx/ngx_module/ngx_cp_hooks.h @@ -38,7 +38,7 @@ typedef struct ngx_http_cp_session_data { ngx_int_t was_request_fully_inspected; ///< Holds if the request fully inspected. ngx_http_cp_verdict_e verdict; ///< Holds the session's verdict from the Nano Service. uint32_t session_id; ///< Current session's Id. - ngx_int_t remaining_messages_to_reply; ///< Remaining messages left for the agent to respond to. + ngx_int_t remaining_messages_to_reply; ///< Remaining messages left for the agent to respond to. ngx_http_response_data response_data; ///< Holds session's response data. struct timespec session_start_time; ///< Holds session's start time. double req_proccesing_time; ///< Holds session's request processing time. @@ -99,7 +99,7 @@ ngx_int_t ngx_http_cp_req_header_handler(ngx_http_request_t *request); /// /// @brief Sends a request to the nano service to update the verdict. -/// @note Should be called after the nano service provided the verdict TRAFFIC_VERDICT_WAIT to get the updated verdict. +/// @note Should be called after the nano service provided the verdict TRAFFIC_VERDICT_WAIT to get the updated verdict. /// @param[in, out] request Event thread context to be updated. /// @returns ngx_int_t /// - #1 if request was properly communicated with the nano service and provided an updated response. diff --git a/attachments/nginx/ngx_module/ngx_cp_io.c b/attachments/nginx/ngx_module/ngx_cp_io.c index bf25d21..666a955 100644 --- a/attachments/nginx/ngx_module/ngx_cp_io.c +++ b/attachments/nginx/ngx_module/ngx_cp_io.c @@ -82,6 +82,7 @@ ngx_http_cp_signal_to_service(uint32_t cur_session_id) /// @brief Signals and recieve signal to/from nano service about new session to inspect. /// @param[in] cur_session_id Session's Id. /// @param[in] chunk_type Chunk type that the attachment is waiting for a response from nano service. +/// @param[in] tout_retries NUmber of retries to wait for a response from nano service. /// @returns ngx_int_t /// - #NGX_OK /// - #NGX_ERROR @@ -89,7 +90,7 @@ ngx_http_cp_signal_to_service(uint32_t cur_session_id) /// - #NGX_AGAIN /// static ngx_int_t -ngx_http_cp_wait_for_service(uint32_t cur_session_id, ngx_http_chunk_type_e chunk_type) +ngx_http_cp_wait_for_service(uint32_t cur_session_id, ngx_http_chunk_type_e chunk_type, ngx_int_t tout_retries) { static int dbg_count = 0; static clock_t clock_start = (clock_t) 0; @@ -103,8 +104,8 @@ ngx_http_cp_wait_for_service(uint32_t cur_session_id, ngx_http_chunk_type_e chun res = ngx_http_cp_signal_to_service(cur_session_id); if (res != NGX_OK) return res; - write_dbg(DBG_LEVEL_TRACE, "Successfully signaled to the service! pending to receive ack"); - for (retry = 0; retry < 3; ) { + write_dbg(DBG_LEVEL_TRACE, "Successfully signaled to the service! pending to receive ack, retries = %d", tout_retries); + for (retry = 0; retry < tout_retries; ) { // If inspection_mode is different from NON_BLOCKING_THREAD, then the loop will run indefinitely. if (!is_fail_open_disabled) { retry++; @@ -195,7 +196,8 @@ ngx_http_cp_send_data_to_service( uint8_t num_of_data_elem, uint32_t cur_session_id, int *was_waiting, - ngx_http_chunk_type_e chunk_type + ngx_http_chunk_type_e chunk_type, + ngx_int_t tout_retries ) { ngx_int_t max_retries; @@ -215,7 +217,7 @@ ngx_http_cp_send_data_to_service( *was_waiting = 1; } - res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type); + res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type, tout_retries); if (res != NGX_OK && res != NGX_AGAIN) return res; } @@ -453,7 +455,8 @@ ngx_http_cp_reply_receiver( uint32_t cur_session_id, ngx_http_request_t *request, ngx_http_cp_modification_list **modification_list, - ngx_http_chunk_type_e chunk_type + ngx_http_chunk_type_e chunk_type, + uint64_t processed_body_size ) { ngx_http_cp_reply_from_service_t *reply_p; @@ -461,6 +464,7 @@ ngx_http_cp_reply_receiver( ngx_http_cp_modification_list *current_modification = NULL; ngx_http_cp_inject_data_t *current_inject_data = NULL; ngx_int_t res; + ngx_int_t tout_retries = min_retries_for_verdict; uint8_t modification_count; unsigned int modification_index; @@ -471,8 +475,11 @@ ngx_http_cp_reply_receiver( return NGX_OK; } + if (processed_body_size > body_size_trigger) + tout_retries = max_retries_for_verdict; + do { - res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type); + res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type, tout_retries); } while (res == NGX_AGAIN); if (res != NGX_OK) return NGX_ERROR; @@ -782,7 +789,7 @@ ngx_http_cp_meta_data_sender(ngx_http_request_t *request, uint32_t cur_request_i set_fragment_elem(fragments, fragments_sizes, parsed_uri.data, parsed_uri.len, PARSED_URI_DATA + 2); // Sends all the data to the nano service. - res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, META_DATA_COUNT + 2, cur_request_id, NULL, fail_open_timeout); + res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, META_DATA_COUNT + 2, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict); if (res != NGX_OK) { // Failed to send the metadata to nano service. if (res == NGX_ERROR && failure_count++ == 5) { @@ -828,7 +835,7 @@ ngx_http_cp_end_transaction_sender( set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&end_transaction_type, &cur_request_id); - res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout); + res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict); if (res != NGX_OK) { return NGX_ERROR; } @@ -851,7 +858,7 @@ ngx_http_cp_wait_sender(uint32_t cur_request_id, ngx_uint_t *num_messages_sent) write_dbg(DBG_LEVEL_TRACE, "Sending wait event flag for inspection"); - res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout); + res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict); if (res != NGX_OK) { return NGX_ERROR; } @@ -876,7 +883,7 @@ ngx_http_cp_res_code_sender(uint16_t response_code, uint32_t cur_req_id, ngx_uin set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &cur_req_id); set_fragment_elem(fragments, fragments_sizes, &response_code, sizeof(uint16_t), 2); - if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, res_code_num_fragments, cur_req_id, NULL, fail_open_hold_timeout) != NGX_OK) { + if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, res_code_num_fragments, cur_req_id, NULL, fail_open_hold_timeout, min_retries_for_verdict) != NGX_OK) { return NGX_ERROR; } @@ -900,7 +907,7 @@ ngx_http_cp_content_length_sender(uint64_t content_length_n, uint32_t cur_req_id set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &cur_req_id); set_fragment_elem(fragments, fragments_sizes, &content_length_val, sizeof(content_length_val), 2); - if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, content_length_num_fragments, cur_req_id, NULL, fail_open_timeout) != NGX_OK) { + if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, content_length_num_fragments, cur_req_id, NULL, fail_open_timeout, min_retries_for_verdict) != NGX_OK) { return NGX_ERROR; } @@ -953,7 +960,7 @@ send_header_bulk( set_fragment_elem(data, data_sizes, &is_last_part, sizeof(is_last_part), 2); set_fragment_elem(data, data_sizes, &bulk_part_index, sizeof(bulk_part_index), 3); - res = ngx_http_cp_send_data_to_service(data, data_sizes, HEADER_DATA_COUNT * num_headers + 4, cur_request_id, NULL, fail_open_timeout); + res = ngx_http_cp_send_data_to_service(data, data_sizes, HEADER_DATA_COUNT * num_headers + 4, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict); if (res != NGX_OK) { write_dbg(DBG_LEVEL_TRACE, "Failed to send bulk of %iu headers", num_headers); return NGX_ERROR; @@ -1102,6 +1109,7 @@ ngx_http_cp_body_sender( ngx_int_t res = NGX_ERROR; uint8_t is_last_chunk; uint8_t part_count; + ngx_int_t tout_retries = min_retries_for_verdict; char *fragments[num_body_chunk_fragments]; uint16_t fragments_sizes[num_body_chunk_fragments]; int was_waiting = 0; @@ -1130,13 +1138,17 @@ ngx_http_cp_body_sender( set_fragment_elem(fragments, fragments_sizes, buf->pos, buf->last - buf->pos, 4); if (body_type == REQUEST_BODY) { - session_data->processed_req_body_size = (buf->last - buf->pos); + session_data->processed_req_body_size += (buf->last - buf->pos); + if (session_data->processed_req_body_size > body_size_trigger) + tout_retries = max_retries_for_verdict; } else if (body_type == RESPONSE_BODY) { - session_data->processed_res_body_size = (buf->last - buf->pos); + session_data->processed_res_body_size += (buf->last - buf->pos); + if (session_data->processed_res_body_size > body_size_trigger) + tout_retries = max_retries_for_verdict; } // Sending the data to the nano service. res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, num_body_chunk_fragments, session_data->session_id, - &was_waiting, fail_open_timeout); + &was_waiting, fail_open_timeout, tout_retries); if (res != NGX_OK) { // Failed to send the fragments to the nano service. @@ -1179,7 +1191,7 @@ ngx_http_cp_metric_data_sender() fragments = (char *)&data_to_send; fragments_sizes = sizeof(ngx_http_cp_metric_data_t); - res = ngx_http_cp_send_data_to_service(&fragments, &fragments_sizes, 1, 0, NULL, fail_open_timeout); + res = ngx_http_cp_send_data_to_service(&fragments, &fragments_sizes, 1, 0, NULL, fail_open_timeout, min_retries_for_verdict); reset_metric_data(); return res; } diff --git a/attachments/nginx/ngx_module/ngx_cp_io.h b/attachments/nginx/ngx_module/ngx_cp_io.h index d662bfe..6a0ef17 100644 --- a/attachments/nginx/ngx_module/ngx_cp_io.h +++ b/attachments/nginx/ngx_module/ngx_cp_io.h @@ -48,6 +48,7 @@ extern int comm_socket; ///< Communication socket. /// @param[in, out] request NGINX request. /// @param[in] modification_list /// @param[in] chunk_type Chunk type that the attachment is waiting for a response from nano service. +/// @param[in] processed_body_size Processed body size to determinate number of retries from nano service. /// @returns ngx_int_t /// - #NGX_OK /// - #NGX_HTTP_FORBIDDEN @@ -60,7 +61,8 @@ ngx_http_cp_reply_receiver( uint32_t cur_session_id, ngx_http_request_t *request, ngx_http_cp_modification_list **modification_list, - ngx_http_chunk_type_e chunk_type + ngx_http_chunk_type_e chunk_type, + uint64_t processed_body_size ); /// diff --git a/attachments/nginx/ngx_module/ngx_cp_utils.c b/attachments/nginx/ngx_module/ngx_cp_utils.c index 8d5c6aa..7c22bc0 100644 --- a/attachments/nginx/ngx_module/ngx_cp_utils.c +++ b/attachments/nginx/ngx_module/ngx_cp_utils.c @@ -100,6 +100,9 @@ ngx_uint_t waiting_for_verdict_thread_timeout_msec = 150; ///< Wait thread proce ngx_http_inspection_mode_e inspection_mode = NON_BLOCKING_THREAD; ///< Default inspection mode. ngx_uint_t num_of_nginx_ipc_elements = 200; ///< Number of NGINX IPC elements. ngx_msec_t keep_alive_interval_msec = DEFAULT_KEEP_ALIVE_INTERVAL_MSEC; +ngx_uint_t min_retries_for_verdict = 3; ///< Minimum number of retries for verdict. +ngx_uint_t max_retries_for_verdict = 15; ///< Maximum number of retries for verdict. +ngx_uint_t body_size_trigger = 200000; ///< Request body size in bytes to switch to maximum retries for verdict. static struct timeval getCurrTimeFast() @@ -948,6 +951,9 @@ init_general_config(const char *conf_path) res_header_thread_timeout_msec = getResHeaderThreadTimeout(); res_body_thread_timeout_msec = getResBodyThreadTimeout(); waiting_for_verdict_thread_timeout_msec = getWaitingForVerdictThreadTimeout(); + min_retries_for_verdict = getMinRetriesForVerdict(); + max_retries_for_verdict = getMaxRetriesForVerdict(); + body_size_trigger = getReqBodySizeTrigger(); num_of_nginx_ipc_elements = getNumOfNginxIpcElements(); keep_alive_interval_msec = (ngx_msec_t) getKeepAliveIntervalMsec(); @@ -976,7 +982,10 @@ init_general_config(const char *conf_path) "wait thread timeout: %u msec, " "static resources path: %s, " "num of nginx ipc elements: %u, " - "keep alive interval msec: %u msec", + "keep alive interval msec: %u msec" + "min retries for verdict: %u" + "max retries for verdict: %u" + "body size trigger for request: %u", inspection_mode, new_dbg_level, (fail_mode_verdict == NGX_OK ? "fail-open" : "fail-close"), @@ -995,7 +1004,10 @@ init_general_config(const char *conf_path) waiting_for_verdict_thread_timeout_msec, getStaticResourcesPath(), num_of_nginx_ipc_elements, - keep_alive_interval_msec + keep_alive_interval_msec, + min_retries_for_verdict, + max_retries_for_verdict, + body_size_trigger ); diff --git a/attachments/nginx/ngx_module/ngx_cp_utils.h b/attachments/nginx/ngx_module/ngx_cp_utils.h index 83db947..ad79d84 100644 --- a/attachments/nginx/ngx_module/ngx_cp_utils.h +++ b/attachments/nginx/ngx_module/ngx_cp_utils.h @@ -62,6 +62,9 @@ extern ngx_uint_t res_body_thread_timeout_msec; extern ngx_uint_t waiting_for_verdict_thread_timeout_msec; extern ngx_http_inspection_mode_e inspection_mode; extern ngx_uint_t num_of_nginx_ipc_elements; +extern ngx_uint_t min_retries_for_verdict; +extern ngx_uint_t max_retries_for_verdict; +extern ngx_uint_t body_size_trigger; /// /// @struct ngx_http_cp_list_iterator diff --git a/core/attachments/http_configuration/http_configuration.cc b/core/attachments/http_configuration/http_configuration.cc index f88ebbb..896c01e 100644 --- a/core/attachments/http_configuration/http_configuration.cc +++ b/core/attachments/http_configuration/http_configuration.cc @@ -108,7 +108,10 @@ HttpAttachmentConfiguration::save(cereal::JSONOutputArchive &archive) const ), cereal::make_nvp("nginx_inspection_mode", getNumericalValue("inspection_mode")), cereal::make_nvp("num_of_nginx_ipc_elements", getNumericalValue("num_of_nginx_ipc_elements")), - cereal::make_nvp("keep_alive_interval_msec", getNumericalValue("keep_alive_interval_msec")) + cereal::make_nvp("keep_alive_interval_msec", getNumericalValue("keep_alive_interval_msec")), + cereal::make_nvp("min_retries_for_verdict", getNumericalValue("min_retries_for_verdict")), + cereal::make_nvp("max_retries_for_verdict", getNumericalValue("max_retries_for_verdict")), + cereal::make_nvp("body_size_trigger", getNumericalValue("body_size_trigger")) ); } @@ -161,6 +164,9 @@ HttpAttachmentConfiguration::load(cereal::JSONInputArchive &archive) loadNumericalValue(archive, "nginx_inspection_mode", 0); loadNumericalValue(archive, "num_of_nginx_ipc_elements", 200); loadNumericalValue(archive, "keep_alive_interval_msec", DEFAULT_KEEP_ALIVE_INTERVAL_MSEC); + loadNumericalValue(archive, "min_retries_for_verdict", 3); + loadNumericalValue(archive, "max_retries_for_verdict", 15); + loadNumericalValue(archive, "body_size_trigger", 200000); } bool diff --git a/core/include/attachments/nginx_attachment_util.h b/core/include/attachments/nginx_attachment_util.h index b3240f4..5f0c533 100644 --- a/core/include/attachments/nginx_attachment_util.h +++ b/core/include/attachments/nginx_attachment_util.h @@ -54,6 +54,9 @@ unsigned int getReqBodyThreadTimeout(); unsigned int getResProccessingTimeout(); unsigned int getResHeaderThreadTimeout(); unsigned int getResBodyThreadTimeout(); +unsigned int getMinRetriesForVerdict(); +unsigned int getMaxRetriesForVerdict(); +unsigned int getReqBodySizeTrigger(); unsigned int getWaitingForVerdictThreadTimeout();