diff --git a/attachments/nginx/ngx_module/ngx_cp_hooks.c b/attachments/nginx/ngx_module/ngx_cp_hooks.c index 2c3bdc1..f7166ef 100644 --- a/attachments/nginx/ngx_module/ngx_cp_hooks.c +++ b/attachments/nginx/ngx_module/ngx_cp_hooks.c @@ -320,79 +320,19 @@ calcProcessingTime(ngx_http_cp_session_data *session_data_p, struct timespec *ho } } -/// -/// @brief Calculates the size of a request. -/// @details Calculates the size of a given request according to headers -/// and body parts lengths. -/// @param[in] request NGINX request. -/// @return the calculated size of the request. -/// -static uint64_t -calc_request_size(ngx_http_request_t *request) +ngx_int_t +ngx_http_cp_request_and_response_size_handler(ngx_http_request_t *request) { - uint64_t request_size = 0; - ngx_list_part_t *part; - ngx_table_elt_t *header; - static const uint64_t max_expected_request_size = 100ULL * 1024 * 1024; + write_dbg( + DBG_LEVEL_TRACE, + "Updateing metrics with request size %ld and response size %ld", + request->request_length, + request->connection->sent + ); - // Calculate the size of request headers - for (part = &request->headers_in.headers.part; part != NULL; part = part->next) { - header = part->elts; - for (ngx_uint_t i = 0; i < part->nelts; i++) { - request_size += header[i].key.len + header[i].value.len + 2; // 2 bytes for CRLF - } - } - request_size += 2; - - // Calculate the size of the request body - if (request->request_body && request->request_body->buf) { - request_size += ngx_buf_size(request->request_body->buf); - } - write_dbg(DBG_LEVEL_TRACE, "Request size %d", request_size); - if (request_size > max_expected_request_size) { - write_dbg(DBG_LEVEL_WARNING, "Request size is higher than expected: %d", request_size); - } - return request_size; -} - -/// -/// @brief Calculates the size of a response. -/// @details Calculates the size of a response according to Content-Length -/// header if available or according to header and body parts lengths if it -/// is not available. -/// @param[in] request NGINX request. -/// @return the calculated size of the response. -/// -static uint64_t -calc_response_size(ngx_http_request_t *request) -{ - uint64_t response_size = 0; - ngx_list_part_t *part; - ngx_table_elt_t *header; - - // Calculate the size of response headers - for (part = &request->headers_out.headers.part; part != NULL; part = part->next) { - header = part->elts; - for (ngx_uint_t i = 0; i < part->nelts; i++) { - response_size += header[i].key.len + header[i].value.len + 2; // 2 bytes for CRLF - } - } - response_size += 2; - - // Calculate the size of the request body - if (request->headers_out.content_length_n != -1) { - // If Content-Length header is set, use it - response_size += request->headers_out.content_length_n; - } else { - // Otherwise, iterate through response buffers and add their sizes - ngx_chain_t *chain = request->out; - for (chain = request->out; chain != NULL ; chain = chain->next) { - if (chain->buf) response_size += ngx_buf_size(chain->buf); - } - } - - write_dbg(DBG_LEVEL_TRACE, "Response size %d", response_size); - return response_size; + updateMetricField(REQUEST_OVERALL_SIZE_COUNT, request->request_length); + updateMetricField(RESPONSE_OVERALL_SIZE_COUNT, request->connection->sent); + return NGX_DECLINED; } ngx_int_t @@ -423,7 +363,6 @@ ngx_http_cp_req_header_handler(ngx_http_request_t *request) reset_dbg_ctx(); write_dbg(DBG_LEVEL_DEBUG, "Request headers received"); - updateMetricField(REQUEST_OVERALL_SIZE_COUNT, calc_request_size(request)); if (is_in_transparent_mode()) { updateMetricField(TRANSPARENTS_COUNT, 1); return fail_mode_verdict; @@ -800,8 +739,6 @@ ngx_http_cp_res_header_filter(ngx_http_request_t *request) write_dbg(DBG_LEVEL_DEBUG, "Response header filter handling session ID: %d", session_data_p->session_id); - updateMetricField(RESPONSE_OVERALL_SIZE_COUNT, calc_response_size(request)); - if (!isIpcReady()) { write_dbg( DBG_LEVEL_TRACE, diff --git a/attachments/nginx/ngx_module/ngx_cp_hooks.h b/attachments/nginx/ngx_module/ngx_cp_hooks.h index a8c5c00..9d172b4 100644 --- a/attachments/nginx/ngx_module/ngx_cp_hooks.h +++ b/attachments/nginx/ngx_module/ngx_cp_hooks.h @@ -127,4 +127,13 @@ ngx_int_t was_transaction_timedout(ngx_http_cp_session_data *ctx); /// ngx_http_cp_verdict_e enforce_sessions_rate(); + +/// +/// @Updates request and response sizes metrics. +/// @param[in, out] request NGINX request. +/// @returns ngx_int_t +/// - #NGX_DECLINED +/// +ngx_int_t ngx_http_cp_request_and_response_size_handler(ngx_http_request_t *request); + #endif // __NGX_CP_HOOKS_H__ diff --git a/attachments/nginx/ngx_module/ngx_cp_initializer.c b/attachments/nginx/ngx_module/ngx_cp_initializer.c index 5a26959..8f17864 100644 --- a/attachments/nginx/ngx_module/ngx_cp_initializer.c +++ b/attachments/nginx/ngx_module/ngx_cp_initializer.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -34,22 +35,116 @@ #include "ngx_http_cp_attachment_module.h" typedef enum ngx_cp_attachment_registration_state { - SET_UNIQUE_ID, - RESGISTER_TO_NODE, - LOAD_CONFIG, - LOAD_IPC, - DONE -} ngx_cp_attachment_registration_state_e; ///< Indicates the current initialization stage. + NOT_REGISTERED, + PENDING, + REGISTERED +} ngx_cp_attachment_registration_state_e; ///< Indicates the current attachment registation stage. char unique_id[MAX_NGINX_UID_LEN] = ""; // Holds the unique identifier for this instance. char shared_verdict_signal_path[128]; // Holds the path associating the attachment and service. int registration_socket = -1; // Holds the file descriptor used for registering the instance. +static pthread_t registration_thread; + +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +static ngx_cp_attachment_registration_state_e need_registration = NOT_REGISTERED; + struct sockaddr_un server; uint32_t nginx_user_id, nginx_group_id; // Hold the process UID and GID respectively. +/// +/// @brief Set shared registration value to the provided state. +/// @param[in] ngx_cp_attachment_registration_state_e Provided state that needs to be set. +/// +static void +set_need_registration(ngx_cp_attachment_registration_state_e state) +{ + pthread_mutex_lock(&mutex); + need_registration = state; + pthread_mutex_unlock(&mutex); +} + +/// +/// @brief Get shared registration value to the provided state. +/// @returns ngx_cp_attachment_registration_state_e +/// - #NOT_REGISTERED, +/// - #PENDING, +/// - #REGISTERED +/// +static ngx_cp_attachment_registration_state_e +get_need_registration() +{ + ngx_cp_attachment_registration_state_e state; + + pthread_mutex_lock(&mutex); + state = need_registration; + pthread_mutex_unlock(&mutex); + + return state; +} + +void * +register_workers() { + int num_of_workers = get_saved_num_of_workers(); + + if (num_of_workers == 0) { + write_dbg(DBG_LEVEL_INFO, "Number of workers is 0, ignore registration"); + set_need_registration(NOT_REGISTERED); + return NULL; + } + + write_dbg( + DBG_LEVEL_INFO, + "Initiating registration of %d workers to the attachment", + num_of_workers + ); + + while (register_to_attachments_manager(num_of_workers) != NGX_OK) { + write_dbg( + DBG_LEVEL_INFO, + "unable to register %d workers to the attachment, will try again in 5 seconds", + num_of_workers + ); + if (get_need_registration() != PENDING) { + write_dbg(DBG_LEVEL_INFO, "Drop registration attempt, registration is not needed anymore"); + return NULL; + } + sleep(5); + } + set_need_registration(REGISTERED); + return NULL; +} + +void +init_attachment_registration_thread() +{ + pthread_mutex_lock(&mutex); + if (need_registration != NOT_REGISTERED) { + pthread_mutex_unlock(&mutex); + return; + } + need_registration = PENDING; + pthread_mutex_unlock(&mutex); + + int result = pthread_create(®istration_thread, NULL, register_workers, NULL); + if (result != 0) { + write_dbg(DBG_LEVEL_INFO, "Failed to create thread"); + } +} + +void +reset_attachment_registration() +{ + int result = pthread_cancel(registration_thread); + if (result != 0) { + write_dbg(DBG_LEVEL_INFO, "Failed to cancel thread %d", result); + } + set_need_registration(NOT_REGISTERED); +} + int exchange_communication_data_with_service( int socket, @@ -114,7 +209,7 @@ exchange_communication_data_with_service( /// @returns ngx_int_t /// - #NGX_OK /// - #NGX_ERROR -/// +/// static ngx_int_t init_signaling_socket() { @@ -257,28 +352,21 @@ get_docker_id(char **_docker_id) free(line); fclose(file); - // Return the answer and set the indication so we won't have to + // Return the answer and set the indication so we won't have to *_docker_id = docker_id; already_evaluated = 1; return NGX_OK; } -/// -/// @brief Register the attachment instance with the attachment manager to associate it with a service. -/// @param[in] request Points to an HTTP request, needed to get the number of workers. -/// @returns ngx_int_t -/// - #NGX_OK -/// - #NGX_ERROR -/// -static ngx_int_t -register_to_attachments_manager(ngx_http_request_t *request) +ngx_int_t +register_to_attachments_manager(ngx_int_t num_of_workers) { uint8_t path_length; int res = 0; uint8_t family_name_size = strlen(unique_id); uint8_t attachment_type = NGINX_ATT_ID; uint8_t worker_id = ngx_worker + 1; - uint8_t workers_amount = get_num_of_workers(request); + uint8_t workers_amount = num_of_workers; char *family_name = NULL; int cur_errno = 0; // temp fix for errno changing during print struct timeval timeout = get_timeout_val_sec(1); @@ -509,7 +597,6 @@ ngx_cp_attachment_init_process(ngx_http_request_t *request) ngx_pool_t *memory_pool; nginx_user_id = getuid(); nginx_group_id = getgid(); - static int need_registration = 1; num_of_connection_attempts++; // Best-effort attempt to read the configuration before we start. @@ -533,19 +620,28 @@ ngx_cp_attachment_init_process(ngx_http_request_t *request) } } - if (need_registration) { - if (register_to_attachments_manager(request) == NGX_ERROR) { + if (get_need_registration() == PENDING) { + write_dbg(DBG_LEVEL_INFO, "Registration to the Attachments Manager is in process"); + return NGX_ERROR; + } + + if (get_need_registration() == NOT_REGISTERED) { + if (register_to_attachments_manager(get_num_of_workers(request)) == NGX_ERROR) { write_dbg(DBG_LEVEL_INFO, "Failed to register to Attachments Manager service"); return NGX_ERROR; } - need_registration = 0; + set_need_registration(REGISTERED); } if (comm_socket < 0) { write_dbg(DBG_LEVEL_DEBUG, "Registering to nano service"); if (init_signaling_socket() == NGX_ERROR) { write_dbg(DBG_LEVEL_DEBUG, "Failed to register to the Nano Service"); - need_registration = 1; + pthread_mutex_lock(&mutex); + if (need_registration != PENDING) { + need_registration = NOT_REGISTERED; + } + pthread_mutex_unlock(&mutex); return NGX_ERROR; } } @@ -615,7 +711,7 @@ restart_communication(ngx_http_request_t *request) } if (init_signaling_socket() == NGX_ERROR) { - if (register_to_attachments_manager(request) == NGX_ERROR) { + if (register_to_attachments_manager(get_num_of_workers(request)) == NGX_ERROR) { write_dbg(DBG_LEVEL_DEBUG, "Failed to register to Attachments Manager service"); return -1; } @@ -638,6 +734,9 @@ disconnect_communication() destroyIpc(nano_service_ipc, 0); nano_service_ipc = NULL; } + + set_need_registration(NOT_REGISTERED); + init_attachment_registration_thread(); } ngx_int_t diff --git a/attachments/nginx/ngx_module/ngx_cp_initializer.h b/attachments/nginx/ngx_module/ngx_cp_initializer.h index ed30a7e..207d309 100644 --- a/attachments/nginx/ngx_module/ngx_cp_initializer.h +++ b/attachments/nginx/ngx_module/ngx_cp_initializer.h @@ -23,7 +23,7 @@ typedef enum ngx_cp_comm_direction { READ_FROM_SOCKET, WRITE_TO_SOCKET -} ngx_cp_comm_direction_e; ///< Indicate whether communication exchange is to read or to write from a socket. +} ngx_cp_comm_direction_e; ///< Indicate whether communication exchange is to read or to write from a socket. /// /// @brief Initialize all the attachments resources and communication channels. @@ -43,7 +43,7 @@ ngx_int_t ngx_cp_attachment_init_process(ngx_http_request_t *); /// - #READ_FROM_SOCKET /// - #WRITE_TO_SOCKET /// @param[in] remaining_timeout Points to the maximal time point the function is allowed to reach. -/// @return int - positive if successful, other values indicate an error. +/// @return int - positive if successful, other values indicate an error. /// int exchange_communication_data_with_service( int socket, @@ -93,4 +93,20 @@ void disconnect_communication(); /// ngx_int_t isIpcReady(); +void init_attachment_registration_thread(); + +/// +/// @brief Cancels registration thread and reset attachment registration status. +/// +void reset_attachment_registration(); + +/// +/// @brief Register the attachment instance with the attachment manager to associate it with a service. +/// @param[in] num_of_workers The number of workers that is needed to be registered. +/// @returns ngx_int_t +/// - #NGX_OK +/// - #NGX_ERROR +/// +ngx_int_t register_to_attachments_manager(ngx_int_t num_of_workers); + #endif // __NGX_CP_INITIALIZER_H__ diff --git a/attachments/nginx/ngx_module/ngx_cp_io.c b/attachments/nginx/ngx_module/ngx_cp_io.c index 6376ffc..76a9909 100644 --- a/attachments/nginx/ngx_module/ngx_cp_io.c +++ b/attachments/nginx/ngx_module/ngx_cp_io.c @@ -647,11 +647,11 @@ convert_sock_addr_to_string(const struct sockaddr *sa, char *ip_addr) void *ip = NULL; if (sa->sa_family == AF_INET) { ip = (void *) &(((struct sockaddr_in*)sa)->sin_addr); + inet_ntop(AF_INET, ip, ip_addr, INET6_ADDRSTRLEN); } else { ip = (void *)&(((struct sockaddr_in6*)sa)->sin6_addr); + inet_ntop(AF_INET6, ip, ip_addr, INET6_ADDRSTRLEN); } - - inet_ntop(AF_INET, ip, ip_addr, INET6_ADDRSTRLEN); } ngx_int_t diff --git a/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.c b/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.c index f4f6284..997597c 100644 --- a/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.c +++ b/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.c @@ -17,6 +17,7 @@ #include #include #include +#include #include "ngx_cp_hooks.h" #include "ngx_cp_utils.h" @@ -213,6 +214,11 @@ ngx_cp_attachment_create_conf(ngx_conf_t *conf) return module_conf; } +ngx_uint_t +get_saved_num_of_workers() +{ + return workers_amount_to_send; +} ngx_uint_t get_num_of_workers(ngx_http_request_t *request) @@ -440,6 +446,8 @@ ngx_cp_attachment_init_worker(ngx_cycle_t *cycle) keep_alive_interval_msec, timer_interval_msec ); + + init_attachment_registration_thread(); } return NGX_OK; } @@ -452,6 +460,8 @@ ngx_cp_attachment_fini_worker(ngx_cycle_t *cycle) // only worker number 0 (always exists since it is worker number 1 is allowed to create // the single instance of the timer and destroy it) if (ngx_worker != 0) return; + + reset_attachment_registration(); (void)cycle; if (is_timer_active) ngx_del_timer(&ngx_keep_alive_event); @@ -463,6 +473,7 @@ static ngx_int_t ngx_cp_attachment_init(ngx_conf_t *conf) { ngx_http_handler_pt *handler; + ngx_http_handler_pt *size_metrics_handler; ngx_http_core_main_conf_t *http_core_main_conf; write_dbg(DBG_LEVEL_TRACE, "Setting the memory pool used in the current context"); if (conf->pool == NULL) { @@ -497,6 +508,14 @@ ngx_cp_attachment_init(ngx_conf_t *conf) ngx_http_next_response_body_filter = ngx_http_top_body_filter; ngx_http_top_body_filter = ngx_http_cp_res_body_filter; + size_metrics_handler = ngx_array_push(&http_core_main_conf->phases[NGX_HTTP_LOG_PHASE].handlers); + if (size_metrics_handler == NULL) { + write_dbg(DBG_LEVEL_WARNING, "Failed to set sizes calculation handler"); + return NGX_ERROR; + } + + *size_metrics_handler = ngx_http_cp_request_and_response_size_handler; + write_dbg(DBG_LEVEL_TRACE, "Successfully set attachment module's hooks"); return NGX_OK; diff --git a/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.h b/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.h index f44efd6..9e85b9e 100644 --- a/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.h +++ b/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.h @@ -36,6 +36,12 @@ extern ngx_module_t ngx_http_cp_attachment_module; ///< NGINX Module. /// ngx_int_t is_ngx_cp_attachment_disabled(ngx_http_request_t *request); +/// +/// @brief Returns the saved number of workers value. +/// @returns ngx_uint_t returns number of workers. +/// +ngx_uint_t get_saved_num_of_workers(); + /// /// @brief Get the number of workers. /// @param[in] request NGINX request.