From fcc3e9ad4095ba7f769e458d32a3dec13880e753 Mon Sep 17 00:00:00 2001 From: Ned Wright Date: Wed, 31 Jan 2024 17:52:50 +0000 Subject: [PATCH] Jan_31_2024-Dev --- .../nginx/ngx_module/ngx_cp_compression.c | 14 ++++- .../nginx/ngx_module/ngx_cp_hook_threads.c | 4 +- attachments/nginx/ngx_module/ngx_cp_hooks.c | 4 ++ .../nginx/ngx_module/ngx_cp_initializer.c | 5 +- attachments/nginx/ngx_module/ngx_cp_utils.c | 61 +++++++++++++++++++ attachments/nginx/ngx_module/ngx_cp_utils.h | 19 ++++++ .../ngx_http_cp_attachment_module.c | 2 +- core/compression/compression_utils.cc | 2 +- 8 files changed, 100 insertions(+), 11 deletions(-) diff --git a/attachments/nginx/ngx_module/ngx_cp_compression.c b/attachments/nginx/ngx_module/ngx_cp_compression.c index 35c4d68..39add8f 100644 --- a/attachments/nginx/ngx_module/ngx_cp_compression.c +++ b/attachments/nginx/ngx_module/ngx_cp_compression.c @@ -190,6 +190,8 @@ compression_data_filter( if (should_compress) { // Compressing data. + write_dbg(DBG_LEVEL_TRACE, "compressing data(%d), is last: %d", input->len, params->is_last_part); + compression_result = compressData( compression_stream, params->compression_type, @@ -258,7 +260,6 @@ compression_buffer_filter( ngx_str_t src_data; ngx_str_t dest_data; ngx_int_t compression_result; - write_dbg(DBG_LEVEL_TRACE, "Performing %s on buffer", should_compress ? "compression" : "decompression"); if (is_valid_compression_buffer(should_compress, src) != NGX_OK) { @@ -270,7 +271,7 @@ compression_buffer_filter( if (should_compress) { // Preparing data for compression. - params->is_last_part = src->last_buf; + params->is_last_part |= src->last_buf; if (params->is_last_part && src->pos == NULL) { src->start = (u_char *)""; @@ -336,6 +337,10 @@ compression_chain_filter( ngx_buf_t *output_buffer = ngx_calloc_buf(pool); ngx_chain_t *curr_input_link = NULL; ngx_chain_t *curr_original_contents_link = original_body_contents == NULL ? NULL : *original_body_contents; + ngx_cp_http_compression_params params_copy; + if (params != NULL) { + params_copy = *params; + } if (body == NULL) { // Null body parameter has been passed. @@ -351,6 +356,9 @@ compression_chain_filter( for (curr_input_link = *body; curr_input_link != NULL; curr_input_link = curr_input_link->next) { // Decompress or compresses buffer + if (params != NULL) { + params_copy.is_last_part = (params->is_last_part && curr_input_link->next == NULL); + } compression_result = compression_buffer_filter( should_compress, compression_stream, @@ -358,7 +366,7 @@ compression_chain_filter( output_buffer, curr_input_link->buf, pool, - params + params != NULL ? ¶ms_copy : NULL ); if (compression_result != NGX_OK) { // Failed to decompress or compress. diff --git a/attachments/nginx/ngx_module/ngx_cp_hook_threads.c b/attachments/nginx/ngx_module/ngx_cp_hook_threads.c index eacdfcb..3896c9a 100644 --- a/attachments/nginx/ngx_module/ngx_cp_hook_threads.c +++ b/attachments/nginx/ngx_module/ngx_cp_hook_threads.c @@ -132,7 +132,7 @@ ngx_http_cp_registration_thread(void *_ctx) /// /// @brief Sends end request header to the attachment's service. -/// @details Communicates with the attachment service by sending end request header +/// @details Communicates with the attachment service by sending end request header /// to the attachment's service and returns verdict. /// @param[in, out] session_data_p If the function returns NGX_OK, session data will be modified. /// @param[in, out] request NGINX Request. @@ -383,7 +383,7 @@ ngx_http_cp_res_header_filter_thread(void *_ctx) } session_data_p->remaining_messages_to_reply += num_messages_sent; - + // Sets response body's content encoding. set_response_content_encoding_res = set_response_content_encoding( &session_data_p->response_data.original_compression_type, diff --git a/attachments/nginx/ngx_module/ngx_cp_hooks.c b/attachments/nginx/ngx_module/ngx_cp_hooks.c index 58a0aa5..2c3bdc1 100644 --- a/attachments/nginx/ngx_module/ngx_cp_hooks.c +++ b/attachments/nginx/ngx_module/ngx_cp_hooks.c @@ -934,6 +934,8 @@ ngx_http_cp_res_body_filter(ngx_http_request_t *request, ngx_chain_t *body_chain set_current_session_id(session_data_p->session_id); write_dbg(DBG_LEVEL_DEBUG, "Response body filter handling response ID: %d", session_data_p->session_id); + print_buffer_chain(body_chain, "incoming", 32, DBG_LEVEL_TRACE); + if (!isIpcReady()) { write_dbg( DBG_LEVEL_TRACE, @@ -1213,5 +1215,7 @@ ngx_http_cp_res_body_filter(ngx_http_request_t *request, ngx_chain_t *body_chain } } + print_buffer_chain(body_chain, "outgoing", 32, DBG_LEVEL_TRACE); + return ngx_http_next_response_body_filter(request, body_chain); } diff --git a/attachments/nginx/ngx_module/ngx_cp_initializer.c b/attachments/nginx/ngx_module/ngx_cp_initializer.c index 7764d4f..5a26959 100644 --- a/attachments/nginx/ngx_module/ngx_cp_initializer.c +++ b/attachments/nginx/ngx_module/ngx_cp_initializer.c @@ -557,8 +557,7 @@ ngx_cp_attachment_init_process(ngx_http_request_t *request) // Initalize the the communication channel with the service. // If we encounter repeated failures - we will restart the whole communication. - static const int max_ipc_init_retry_count = 10; - static int max_retry_count = max_ipc_init_retry_count; + int max_retry_count = 10; if (nano_service_ipc == NULL) { write_dbg(DBG_LEVEL_INFO, "Initializing IPC channel"); nano_service_ipc = initIpc( @@ -572,13 +571,11 @@ ngx_cp_attachment_init_process(ngx_http_request_t *request) if (nano_service_ipc == NULL) { if (max_retry_count-- == 0) { restart_communication(request); - max_retry_count = max_ipc_init_retry_count; } write_dbg(DBG_LEVEL_INFO, "Failed to initialize IPC with nano service"); return NGX_ERROR; } } - max_retry_count = max_ipc_init_retry_count; // Initialize internal resources. if (!is_static_resources_table_initialized()) { diff --git a/attachments/nginx/ngx_module/ngx_cp_utils.c b/attachments/nginx/ngx_module/ngx_cp_utils.c index 21ac8bf..d49f671 100644 --- a/attachments/nginx/ngx_module/ngx_cp_utils.c +++ b/attachments/nginx/ngx_module/ngx_cp_utils.c @@ -1123,3 +1123,64 @@ set_metric_memory_usage(void) updateMetricField(MAX_VM_MEMORY_USAGE, vm_size); updateMetricField(MAX_RSS_MEMORY_USAGE, rss_size); } + +void +print_buffer(ngx_buf_t *buf, int num_bytes, int _dbg_level) +{ + if (_dbg_level < dbg_level || !is_ctx_match) return; + + char fmt[64]; + unsigned char *pos = buf->pos; + unsigned char *last = buf->last; + int i = 0; + + if (num_bytes > 0) { + if (last - pos > num_bytes) { + last = pos + num_bytes; + } + } else if (num_bytes < 0) { + if (last - pos > -num_bytes) { + pos = last + num_bytes; + } + } + + for (; pos + 16 < last; pos += 16) + { + write_dbg( + _dbg_level, + "%02X %02X %02X %02X %02X %02X %02X %02X %02X %02X %02X %02X %02X %02X %02X %02X", + *pos, *(pos+1), *(pos+2), *(pos+3), *(pos+4), *(pos+5), *(pos+6), *(pos+7), + *(pos+8), *(pos+9), *(pos+10), *(pos+11), *(pos+12), *(pos+13), *(pos+14), *(pos+15) + ); + } + + for(; pos < last; pos++){ + sprintf(fmt+i*3, "%02X ", *pos); + i++; + } + if (i > 0) { + fmt[i*3 - 1] = 0; + write_dbg( + _dbg_level, + "%s", + fmt + ); + } +} + +void +print_buffer_chain(ngx_chain_t *chain, char *msg, int num_bytes, int _dbg_level) +{ + if (_dbg_level < dbg_level || !is_ctx_match) return; + + for (ngx_chain_t *chain_elem = chain; chain_elem != NULL; chain_elem = chain_elem->next) { + write_dbg( + DBG_LEVEL_WARNING, + "%s chain elem: size: %d, is last buf: %d", + msg, + chain_elem->buf->last - chain_elem->buf->pos, + chain_elem->buf->last_buf + ); + print_buffer(chain_elem->buf, num_bytes, _dbg_level); + } +} diff --git a/attachments/nginx/ngx_module/ngx_cp_utils.h b/attachments/nginx/ngx_module/ngx_cp_utils.h index 46b9452..83db947 100644 --- a/attachments/nginx/ngx_module/ngx_cp_utils.h +++ b/attachments/nginx/ngx_module/ngx_cp_utils.h @@ -436,4 +436,23 @@ void set_metric_cpu_usage(void); /// void set_metric_memory_usage(void); +/// +/// @brief prints to debug the buffer in hex with optional limit +/// @param[in] buf nginx buffer to print +/// @param[in] num_bytes number of bytes to print in the buffer. +/// num_bytes > 0 : prints prefix, num_bytes < 0 : prints suffix, num_bytes = 0 : prints all +/// @param[in] _dbg_level debug level +/// +void print_buffer(ngx_buf_t *buf, int num_bytes, int _dbg_level); + +/// +/// @brief prints to debug the buffer chain in hex with optional limit +/// @param[in] chain nginx buffers chain to print +/// @param[in] num_bytes number of bytes to print of each buffer in the chain. +/// num_bytes > 0 : prints prefix, num_bytes < 0 : prints suffix, num_bytes = 0 : prints all +/// @param[in] _dbg_level debug level +/// +void print_buffer_chain(ngx_chain_t *chain, char *msg, int num_bytes, int _dbg_level); + + #endif // __NGX_CP_UTILS_H__ diff --git a/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.c b/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.c index ac61ff7..f4f6284 100644 --- a/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.c +++ b/attachments/nginx/ngx_module/ngx_http_cp_attachment_module.c @@ -34,7 +34,7 @@ typedef struct { /// /// @brief Creates NGINX cp attachment configuration. /// @param[in, out] conf NGINX configuration. -/// @return +/// @return /// - #ngx_cp_attachment_conf_t if successed to create conf. /// - #NULL if failed to create conf. /// diff --git a/core/compression/compression_utils.cc b/core/compression/compression_utils.cc index 2b79d45..0aaa656 100644 --- a/core/compression/compression_utils.cc +++ b/core/compression/compression_utils.cc @@ -167,7 +167,7 @@ struct CompressionStream basic_string res; int retries = 0; - while (stream.avail_in != 0) { + while (stream.avail_in != 0 || is_last_chunk) { stream.avail_out = work_space.capacity(); stream.next_out = work_space.data();