Jan 06 2026 dev (#56)

* sync code

* sync code

* sync code

* sync code

* sync code

* sync code

---------

Co-authored-by: Daniel Eisenberg <danielei@checkpoint.com>
Co-authored-by: Ned Wright <nedwright@proton.me>
This commit is contained in:
Daniel-Eisenberg
2026-01-13 17:17:18 +02:00
committed by GitHub
parent b799acf8ff
commit 5dfa150635
91 changed files with 7906 additions and 804 deletions

View File

@@ -0,0 +1,148 @@
#include "ngx_cp_async_body.h"
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_event.h>
#include <errno.h>
#include <stddef.h>
#include "ngx_cp_async_core.h"
#include "ngx_cp_async_ctx_validation.h"
#include "ngx_cp_async_sender.h"
#include "../ngx_cp_hooks.h"
#include "../ngx_cp_initializer.h"
#include "../ngx_cp_utils.h"
ngx_int_t
ngx_http_cp_req_body_filter_async(ngx_http_request_t *r, ngx_chain_t *in)
{
ngx_http_cp_session_data *sd = recover_cp_session_data(r);
ngx_http_cp_async_ctx_t *ctx;
write_dbg(DBG_LEVEL_DEBUG, "=== ASYNC REQUEST BODY FILTER START ===");
print_buffer_chain(in, "outgoing", 32, DBG_LEVEL_TRACE);
if (!sd->initial_async_mode || (sd->initial_async_mode && !is_async_mode_enabled)) {
write_dbg(DBG_LEVEL_WARNING, "Async mode not initialized or changed - passing through");
return ngx_http_next_request_body_filter(r, in);
}
if (!isIpcReady() || !sd->async_processing_needed) {
write_dbg(DBG_LEVEL_DEBUG, "No async processing needed - passing through");
return ngx_http_next_request_body_filter(r, in);
}
ctx = ngx_cp_async_find_ctx(sd->session_id);
if (!ctx) {
write_dbg(DBG_LEVEL_DEBUG, "No async ctx; pass-through session %d", sd->session_id);
return ngx_http_next_request_body_filter(r, in);
}
if (sd->verdict != TRAFFIC_VERDICT_INSPECT) {
write_dbg(DBG_LEVEL_DEBUG, "Request already inspected; applying verdict for session %d", sd->session_id);
SAFE_DESTROY_CTX(ctx);
return sd->verdict == TRAFFIC_VERDICT_ACCEPT ? ngx_http_next_request_body_filter(r, in) : NGX_HTTP_FORBIDDEN;
}
if (ngx_cp_async_ctx_get_flow_error_safe(ctx)) {
write_dbg(DBG_LEVEL_DEBUG, "Flow error detected for session %d", ngx_cp_async_ctx_get_session_id_safe(ctx));
SAFE_DESTROY_CTX(ctx);
return ngx_http_next_request_body_filter(r, in);
}
write_dbg(DBG_LEVEL_DEBUG, "Found async context for session %d - processing body", sd->session_id);
if (!ctx->body_phase_started){
r->request_body->filter_need_buffering = 1;
r->request_body_no_buffering = 0;
ctx->body_phase_started = 1;
}
if (ngx_cp_async_ctx_get_released_safe(ctx) && ngx_cp_async_ctx_get_queue_head_safe(ctx)) {
write_dbg(DBG_LEVEL_DEBUG, "Agent released; forwarding queued body for session %d", ngx_cp_async_ctx_get_session_id_safe(ctx));
ngx_int_t rc = ngx_http_next_request_body_filter(r, ctx->queue_head);
queue_free(r, ctx);
ctx->queue_head = ctx->queue_tail = NULL;
SAFE_DESTROY_CTX(ctx);
return rc;
}
if (!ctx->req_seq && ctx->queue_head && !in) {
write_dbg(DBG_LEVEL_DEBUG, "Forwarding queued body (no new data) for session %d", ctx->session_id);
ngx_int_t rc = ngx_http_next_request_body_filter(r, ctx->queue_head);
queue_free(r, ctx);
ctx->queue_head = ctx->queue_tail = NULL;
return rc;
}
if (in) {
write_dbg(DBG_LEVEL_DEBUG, "New body chunk received for session %d", ctx->session_id);
if (chain_add_copy(r, ctx, in) != NGX_OK) {
write_dbg(DBG_LEVEL_ERROR, "Queue copy failed; session %d", ctx->session_id);
ctx->session_data->async_processing_needed = 0;
queue_free(r, ctx);
ctx->queue_head = ctx->queue_tail = NULL;
SAFE_DESTROY_CTX(ctx);
return NGX_ERROR;
}
for (ngx_chain_t *cl = in; cl; cl = cl->next) {
ngx_buf_t *b = cl->buf;
if (b == NULL) continue;
ngx_uint_t nmsgs = 0;
ngx_int_t rc = NGX_OK;
if (ngx_cp_async_send_single_body_chunk_nonblocking(ctx, cl, &nmsgs) != NGX_OK) {
write_dbg(DBG_LEVEL_DEBUG, "IPC send failed; fail-safe pass-through session %d", ctx->session_id);
if (ctx->queue_head) {
rc = ngx_http_next_request_body_filter(r, ctx->queue_head);
ctx->session_data->async_processing_needed = 0;
queue_free(r, ctx);
ctx->queue_head = ctx->queue_tail = NULL;
}
SAFE_DESTROY_CTX(ctx);
return rc;
}
ctx->req_seq += nmsgs;
sd->remaining_messages_to_reply += nmsgs;
if (b->last_buf) {
if (ngx_cp_async_send_end_transaction_nonblocking(ctx, &nmsgs) != NGX_OK) {
write_dbg(DBG_LEVEL_DEBUG, "IPC send failed; fail-safe pass-through session %d", ctx->session_id);
if (ctx->queue_head) {
rc = ngx_http_next_request_body_filter(r, ctx->queue_head);
queue_free(r, ctx);
ctx->queue_head = ctx->queue_tail = NULL;
}
SAFE_DESTROY_CTX(ctx);
return rc;
}
ctx->req_seen_last = 1;
ctx->req_seq += nmsgs;
sd->remaining_messages_to_reply += nmsgs;
write_dbg(DBG_LEVEL_DEBUG, "Seen last body chunk for session %d", ctx->session_id);
}
}
write_dbg(DBG_LEVEL_DEBUG, "Queued body chunk and waiting for verdict; session %d", ctx->session_id);
if (ctx->req_seen_last == 1) {
write_dbg(DBG_LEVEL_DEBUG, "Last chunk sent; waiting for release; session %d", ctx->session_id);
ngx_cp_async_start_deadline_timer(ctx, ngx_max(req_max_proccessing_ms_time, async_body_stage_timeout));
ctx->waiting = 1;
if (!ctx->request_ref_incremented && r->http_version == NGX_HTTP_VERSION_20) {
r->main->count++;
ctx->request_ref_incremented = 1;
write_dbg(DBG_LEVEL_DEBUG, "Incremented request main reference count for HTTP/2 session %d", ctx->session_id);
}
return NGX_DONE;
}
return ngx_http_next_request_body_filter(r, NULL);
}
write_dbg(DBG_LEVEL_DEBUG, "No new input; pass-through session %d", ctx->session_id);
return ngx_http_next_request_body_filter(r, in);
}

View File

@@ -0,0 +1,30 @@
// Copyright (C) 2022 Check Point Software Technologies Ltd. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// @file ngx_cp_async_body.h
///
/// Body async filter processing for Check Point Nano Agent NGINX module.
///
#ifndef __NGX_CP_ASYNC_BODY_H__
#define __NGX_CP_ASYNC_BODY_H__
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_event.h>
ngx_int_t ngx_http_cp_req_body_filter_async(ngx_http_request_t *r, ngx_chain_t *in);
#endif // __NGX_CP_ASYNC_BODY_H__

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,169 @@
#ifndef __NGX_CP_ASYNC_CORE_H__
#define __NGX_CP_ASYNC_CORE_H__
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_event.h>
#include "../ngx_cp_hook_threads.h"
#include "nano_attachment_common.h"
extern ngx_module_t ngx_http_cp_attachment_module; ///< CP Attachment module
extern ngx_http_request_body_filter_pt ngx_http_next_request_body_filter; ///< NGINX request body filter.
extern ngx_uint_t async_backpressure_threshold;
extern ngx_msec_t async_header_timeout_ms; // Default 3s for headers/meta_data/end_transaction
extern ngx_msec_t async_body_stage_timeout; // Default 5s for body stage
extern ngx_msec_t async_wait_verdict_timeout_ms; // Default 50ms for wait verdict polling
extern ngx_msec_t async_first_wait_verdict_timeout_ms; // Default 10s for first wait verdict deadline timer
extern ngx_msec_t async_signal_timeout_ms; // Default 10ms for service signal timeout
extern ngx_msec_t async_context_cleanup_timeout_ms; // Default 5 minutes for context cleanup
/// @struct ngx_http_cp_async_ctx
/// @brief Simplified async context for handling non-blocking agent communication
typedef struct ngx_http_cp_async_ctx {
ngx_http_request_t *request; ///< Original request
ngx_http_cp_session_data *session_data; ///< Session data
uint32_t session_id; ///< Session ID for this context
ngx_int_t stage; ///< Current processing stage
ngx_event_t agent_event; ///< Event for agent communication
ngx_event_t deadline_event; ///< Deadline timeout event for current stage
ngx_event_t cleanup_event; ///< Context cleanup timeout event
ngx_event_t resume_event; ///< Event to resume request processing
struct timespec start_time; ///< Processing start time
ngx_str_t waf_tag; ///< WAF tag for this request
ngx_http_cp_modification_list *modifications; ///< Modifications data
unsigned waiting:1; ///< Flag to indicate waiting for verdict
unsigned body_phase_started:1; ///< Flag to indicate if body phase started
unsigned released:1; ///< Flag to indicate if request is released
unsigned req_seen_last:1; ///< Flag to indicate if last chunk seen
ngx_uint_t req_seq; ///< Request body chunk sequence number
ngx_chain_t *queue_head; ///< Saved chains to forward later
ngx_chain_t *queue_tail; ///< Tail of saved chains
ngx_uint_t meta_data_sent; ///< Flag to track if meta data was sent
ngx_uint_t headers_sent; ///< Flag to track if headers were sent
ngx_uint_t end_transaction_sent; ///< Flag to track if end transaction was sent
ngx_uint_t header_declined; ///< Flag to track if headers were declined
unsigned first_wait_verdict_encountered:1; ///< Flag to track if first wait verdict was encountered
struct ngx_http_cp_async_ctx *map_next; ///< Next context in hash bucket chain
unsigned flow_error:1; ///< Flag to indicate flow error/failure/abort occurred
unsigned request_ref_incremented:1; ///< Flag to track if request reference count was incremented
struct timespec request_start_time; ///< Current stage start time
} ngx_http_cp_async_ctx_t;
///
/// @brief Initialize the async connection management system
/// @return NGX_OK on success, NGX_ERROR on failure
///
ngx_int_t ngx_cp_async_init();
///
/// @brief Cleanup the async connection management system
///
void ngx_cp_async_cleanup();
///
/// @brief Create and initialize async context
/// @param[in] request NGINX request
/// @param[in] session_data Session data
/// @return Async context pointer or NULL on failure
///
ngx_http_cp_async_ctx_t *ngx_cp_async_create_ctx(ngx_http_request_t *request, ngx_http_cp_session_data *session_data);
///
/// @brief Destroy async context
/// @param[in] ctx Async context to destroy
///
void ngx_cp_async_destroy_ctx(ngx_http_cp_async_ctx_t *ctx);
///
/// @brief Find async context by session ID
/// @param[in] session_id Session ID to find
/// @return Async context pointer or NULL if not found
///
ngx_http_cp_async_ctx_t *ngx_cp_async_find_ctx(uint32_t session_id);
///
/// @brief Add async context to connection map
/// @param[in] ctx Async context to add
/// @return NGX_OK on success, NGX_ERROR on failure
///
ngx_int_t ngx_cp_async_add_ctx(ngx_http_cp_async_ctx_t *ctx);
///
/// @brief Remove async context from connection map
/// @param[in] ctx Async context to remove
///
void ngx_cp_async_remove_ctx(ngx_http_cp_async_ctx_t *ctx);
///
/// @brief Main async event handler
/// @param[in] ev Event that triggered the handler
///
void ngx_cp_async_event_handler(ngx_event_t *ev);
///
/// @brief Start async agent communication
/// @param[in] ctx Async context
/// @return NGX_OK on success, NGX_ERROR on failure
///
ngx_int_t ngx_cp_async_start_agent_communication(ngx_http_cp_async_ctx_t *ctx);
///
/// @brief Continue processing to next stage
/// @param[in] ctx Async context
/// @return NGX_OK, NGX_AGAIN, NGX_HTTP_FORBIDDEN, or NGX_ERROR
///
ngx_int_t ngx_cp_async_continue_processing(ngx_http_cp_async_ctx_t *ctx);
///
/// @brief Start deadline timer for current stage
/// @param[in] ctx Async context
/// @param[in] timeout_ms Timeout in milliseconds
/// @return NGX_OK on success, NGX_ERROR on failure
///
ngx_int_t ngx_cp_async_start_deadline_timer(ngx_http_cp_async_ctx_t *ctx, ngx_msec_t timeout_ms);
///
/// @brief Disable IPC verdict event handler and free connection
///
void disable_ipc_verdict_event_handler(void);
///
/// @brief Enable IPC verdict event handler and setup connection
///
void enable_ipc_verdict_event_handler(void);
///
/// @brief Setup IPC verdict event handler
/// @return NGX_OK on success, NGX_ERROR on failure
///
ngx_int_t ngx_cp_async_setup_verdict_event_handler(void);
///
/// @brief Add chain of buffers to async context queue
/// @param[in] request NGINX request
/// @param[in] ctx Async context
/// @param[in] in Chain of buffers to add
/// @return NGX_OK on success, NGX_ERROR on failure
///
ngx_int_t chain_add_copy(ngx_http_request_t *request, ngx_http_cp_async_ctx_t *ctx, ngx_chain_t *in);
///
/// @brief Free queued chains in async context
/// @param[in] r NGINX request
/// @param[in] ctx Async context
///
void queue_free(ngx_http_request_t *r, ngx_http_cp_async_ctx_t *ctx);
void ngx_cp_async_increment_pending_chunks(uint32_t session_id, const char *chunk_type);
void ngx_cp_async_decrement_pending_chunks(uint32_t session_id, const char *verdict_type);
///
/// @brief Post backpressure drain event if conditions are met
///
void ngx_cp_async_post_backpressure_drain_event(void);
#endif // __NGX_CP_ASYNC_CORE_H__

View File

@@ -0,0 +1,202 @@
#include "ngx_cp_async_ctx_validation.h"
#include "ngx_cp_async_core.h"
#include "../ngx_cp_utils.h"
///
/// @brief Check if a context pointer is valid and not destroyed
/// @param[in] ctx Context to validate
/// @return 1 if valid, 0 if invalid/destroyed
///
ngx_int_t
ngx_cp_async_ctx_is_valid(ngx_http_cp_async_ctx_t *ctx)
{
if (!ctx) {
return 0;
}
if (!ctx->session_id) {
write_dbg(DBG_LEVEL_WARNING, "Context validation failed: invalid session_id 0");
return 0;
}
if (!ctx->request) {
write_dbg(DBG_LEVEL_WARNING, "Context validation failed: NULL request for session %d", ctx->session_id);
return 0;
}
if (!ctx->session_data) {
write_dbg(DBG_LEVEL_WARNING, "Context validation failed: NULL session_data for session %d", ctx->session_id);
return 0;
}
return 1;
}
///
/// @brief Safely get session ID from context
/// @param[in] ctx Context to get session ID from
/// @return Session ID or 0 if invalid
///
uint32_t
ngx_cp_async_ctx_get_session_id_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return 0;
}
return ctx->session_id;
}
///
/// @brief Safely get request from context
/// @param[in] ctx Context to get request from
/// @return Request pointer or NULL if invalid
///
ngx_http_request_t *
ngx_cp_async_ctx_get_request_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return NULL;
}
return ctx->request;
}
///
/// @brief Safely get session data from context
/// @param[in] ctx Context to get session data from
/// @return Session data pointer or NULL if invalid
///
ngx_http_cp_session_data *
ngx_cp_async_ctx_get_session_data_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return NULL;
}
return ctx->session_data;
}
///
/// @brief Safely get stage from context
/// @param[in] ctx Context to get stage from
/// @return Stage or NGX_CP_ASYNC_STAGE_ERROR if invalid
///
ngx_cp_async_stage_t
ngx_cp_async_ctx_get_stage_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return NGX_CP_ASYNC_STAGE_ERROR;
}
return ctx->stage;
}
///
/// @brief Safely get flow error flag from context
/// @param[in] ctx Context to get flow error from
/// @return Flow error flag or 1 (error) if invalid
///
ngx_int_t
ngx_cp_async_ctx_get_flow_error_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return 1; // Assume error if context is invalid
}
return ctx->flow_error;
}
///
/// @brief Safely get header declined flag from context
/// @param[in] ctx Context to get header declined from
/// @return Header declined flag or 0 if invalid
///
ngx_int_t
ngx_cp_async_ctx_get_header_declined_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return 0;
}
return ctx->header_declined;
}
///
/// @brief Safely get request sequence from context
/// @param[in] ctx Context to get req_seq from
/// @return Request sequence or 0 if invalid
///
ngx_uint_t
ngx_cp_async_ctx_get_req_seq_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return 0;
}
return ctx->req_seq;
}
///
/// @brief Safely get waiting flag from context
/// @param[in] ctx Context to get waiting from
/// @return Waiting flag or 0 if invalid
///
ngx_int_t
ngx_cp_async_ctx_get_waiting_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return 0;
}
return ctx->waiting;
}
///
/// @brief Safely get released flag from context
/// @param[in] ctx Context to get released from
/// @return Released flag or 0 if invalid
///
ngx_int_t
ngx_cp_async_ctx_get_released_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return 0;
}
return ctx->released;
}
///
/// @brief Safely get queue head from context
/// @param[in] ctx Context to get queue_head from
/// @return Queue head or NULL if invalid
///
ngx_chain_t *
ngx_cp_async_ctx_get_queue_head_safe(ngx_http_cp_async_ctx_t *ctx)
{
if (!ngx_cp_async_ctx_is_valid(ctx)) {
return NULL;
}
return ctx->queue_head;
}
///
/// @brief Nullify all references to a context in event handlers
/// @param[in] ctx Context being destroyed
///
void
ngx_cp_async_nullify_ctx_refs(ngx_http_cp_async_ctx_t *ctx)
{
if (ctx == NULL) {
return;
}
// Clear event data pointers to prevent dangling references
if (ctx->agent_event.data == ctx) {
ctx->agent_event.data = NULL;
}
if (ctx->cleanup_event.data == ctx) {
ctx->cleanup_event.data = NULL;
}
if (ctx->resume_event.data == ctx) {
ctx->resume_event.data = NULL;
}
if (ctx->deadline_event.data == ctx) {
ctx->deadline_event.data = NULL;
}
write_dbg(DBG_LEVEL_DEBUG, "Nullified context references for session %d", ctx->session_id);
}

View File

@@ -0,0 +1,33 @@
#ifndef __NGX_CP_ASYNC_CTX_VALIDATION_H__
#define __NGX_CP_ASYNC_CTX_VALIDATION_H__
#include "ngx_cp_async_types.h"
// Context validation functions
ngx_int_t ngx_cp_async_ctx_is_valid(ngx_http_cp_async_ctx_t *ctx);
uint32_t ngx_cp_async_ctx_get_session_id_safe(ngx_http_cp_async_ctx_t *ctx);
ngx_http_request_t *ngx_cp_async_ctx_get_request_safe(ngx_http_cp_async_ctx_t *ctx);
ngx_http_cp_session_data *ngx_cp_async_ctx_get_session_data_safe(ngx_http_cp_async_ctx_t *ctx);
ngx_cp_async_stage_t ngx_cp_async_ctx_get_stage_safe(ngx_http_cp_async_ctx_t *ctx);
ngx_int_t ngx_cp_async_ctx_get_flow_error_safe(ngx_http_cp_async_ctx_t *ctx);
ngx_int_t ngx_cp_async_ctx_get_header_declined_safe(ngx_http_cp_async_ctx_t *ctx);
ngx_uint_t ngx_cp_async_ctx_get_req_seq_safe(ngx_http_cp_async_ctx_t *ctx);
ngx_int_t ngx_cp_async_ctx_get_waiting_safe(ngx_http_cp_async_ctx_t *ctx);
ngx_int_t ngx_cp_async_ctx_get_released_safe(ngx_http_cp_async_ctx_t *ctx);
ngx_chain_t *ngx_cp_async_ctx_get_queue_head_safe(ngx_http_cp_async_ctx_t *ctx);
// Context nullification function
void ngx_cp_async_nullify_ctx_refs(ngx_http_cp_async_ctx_t *ctx);
// Forward declaration for find function
ngx_http_cp_async_ctx_t *ngx_cp_async_find_ctx(uint32_t session_id);
// Macro for safe context destruction with null assignment
#define SAFE_DESTROY_CTX(ctx_ptr) do { \
if (ctx_ptr) { \
ngx_cp_async_destroy_ctx(ctx_ptr); \
ctx_ptr = NULL; \
} \
} while(0)
#endif // __NGX_CP_ASYNC_CTX_VALIDATION_H__

View File

@@ -0,0 +1,263 @@
#include "ngx_cp_async_headers.h"
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_event.h>
#include <errno.h>
#include <stddef.h>
#include "ngx_cp_async_core.h"
#include "ngx_cp_async_ctx_validation.h"
#include "ngx_cp_async_sender.h"
#include "../ngx_cp_hooks.h"
#include "../ngx_cp_initializer.h"
#include "../ngx_http_cp_attachment_module.h"
#include "../ngx_cp_utils.h"
#include "../ngx_cp_failing_state.h"
#include "../ngx_cp_metric.h"
#include "../ngx_cp_thread.h"
#include "../ngx_cp_static_content.h"
extern ngx_int_t is_initialized;
ngx_int_t
ngx_http_cp_req_header_handler_async(ngx_http_request_t *request)
{
ngx_http_cp_session_data *session_data_p;
ngx_http_cp_async_ctx_t *ctx;
ngx_int_t handle_static_resource_result;
ServiceVerdict sessions_per_minute_verdict;
ngx_cp_attachment_conf_t *conf;
ngx_int_t final_res;
struct timespec hook_time_begin;
static int is_failure_state_initialized = 0;
static int is_metric_data_initialized = 0;
write_dbg(DBG_LEVEL_DEBUG, "=== ASYNC REQUEST HEADER HANDLER START ===");
clock_gettime(CLOCK_REALTIME, &hook_time_begin);
if (is_async_mode_enabled && !is_initialized) {
ngx_cp_async_init();
}
if (is_failure_state_initialized == 0) {
write_dbg(DBG_LEVEL_ERROR, "Initializing failure state (first time)");
reset_transparent_mode();
is_failure_state_initialized = 1;
}
if (is_metric_data_initialized == 0) {
write_dbg(DBG_LEVEL_ERROR, "Initializing metric data (first time)");
reset_metric_data();
is_metric_data_initialized = 1;
}
set_current_session_id(0);
reset_dbg_ctx();
if (is_in_transparent_mode()) {
write_dbg(DBG_LEVEL_DEBUG, "In transparent mode - updating metrics and returning");
updateMetricField(TRANSPARENTS_COUNT, 1);
return fail_mode_verdict == NGX_OK ? NGX_DECLINED : NGX_ERROR;
}
if (is_ngx_cp_attachment_disabled(request)) {
write_dbg(DBG_LEVEL_DEBUG, "Ignoring inspection of request on a disabled location");
return NGX_DECLINED;
}
conf = ngx_http_get_module_loc_conf(request, ngx_http_cp_attachment_module);
if (conf == NULL) {
write_dbg(DBG_LEVEL_WARNING, "Failed to get module configuration");
return NGX_DECLINED;
}
session_data_p = ngx_http_get_module_ctx(request, ngx_http_cp_attachment_module);
if (session_data_p == NULL) {
write_dbg(DBG_LEVEL_DEBUG, "No existing session data - initializing new session");
session_data_p = init_cp_session_data(request);
if (session_data_p == NULL) {
write_dbg(DBG_LEVEL_WARNING, "Failed to initialize session data");
return NGX_DECLINED;
}
}
set_current_session_id(session_data_p->session_id);
write_dbg(DBG_LEVEL_DEBUG, "Async request header filter handling session ID: %d", session_data_p->session_id);
session_data_p->initial_async_mode = 1;
if (!is_async_mode_enabled) {
write_dbg(DBG_LEVEL_WARNING, "Async mode is not enabled for request");
return NGX_DECLINED;
}
sessions_per_minute_verdict = enforce_sessions_rate();
if (sessions_per_minute_verdict != TRAFFIC_VERDICT_INSPECT) {
session_data_p->verdict = sessions_per_minute_verdict;
return sessions_per_minute_verdict == TRAFFIC_VERDICT_ACCEPT ? NGX_DECLINED : NGX_ERROR;
}
// Do immediate blocking registration (same as sync version)
if (!get_already_registered() || !isIpcReady()) {
struct ngx_http_cp_event_thread_ctx_t ctx;
int res;
init_thread_ctx(&ctx, request, session_data_p, NULL);
ctx.waf_tag = conf->waf_tag;
if (is_registration_timeout_reached()) {
write_dbg(DBG_LEVEL_DEBUG, "spawn ngx_http_cp_registration_thread");
reset_registration_timeout();
res = ngx_cp_run_in_thread_timeout(
ngx_http_cp_registration_thread,
(void *)&ctx,
ngx_max(registration_thread_timeout_msec, 200),
"ngx_http_cp_registration_thread"
);
} else {
res = 0;
write_dbg(DBG_LEVEL_DEBUG, "Attachment registration has recently started, wait for timeout");
}
if (!res) {
// failed to execute thread task, or it timed out
session_data_p->verdict = fail_mode_verdict == NGX_OK ? TRAFFIC_VERDICT_ACCEPT : TRAFFIC_VERDICT_DROP;
write_dbg(
DBG_LEVEL_DEBUG,
"registraton thread failed, returning default fail mode verdict. Session id: %d, verdict: %s",
session_data_p->session_id,
session_data_p->verdict == TRAFFIC_VERDICT_ACCEPT ? "accept" : "drop"
);
updateMetricField(REG_THREAD_TIMEOUT, 1);
return fail_mode_verdict == NGX_OK ? NGX_DECLINED : fail_mode_verdict;
}
write_dbg(
DBG_LEVEL_DEBUG,
"finished ngx_http_cp_registration_thread successfully. return=%d res=%d",
ctx.should_return,
ctx.res
);
if (ctx.should_return) {
session_data_p->verdict = TRAFFIC_VERDICT_ACCEPT;
return ctx.res == NGX_OK ? NGX_DECLINED : ctx.res;
}
if (ngx_cp_async_setup_verdict_event_handler() != NGX_OK) {
write_dbg(DBG_LEVEL_WARNING, "Failed to set up verdict event handler for session %d", session_data_p->session_id);
return fail_mode_verdict == NGX_OK ? NGX_DECLINED : fail_mode_verdict;
}
}
set_already_registered(1);
reset_registration_timeout_duration();
if (handle_shmem_corruption() == NGX_ERROR) {
session_data_p->verdict = fail_mode_verdict == NGX_OK ? TRAFFIC_VERDICT_ACCEPT : TRAFFIC_VERDICT_DROP;
write_dbg(
DBG_LEVEL_DEBUG,
"Shared memory is corrupted, returning default fail mode verdict. Session id: %d, verdict: %s",
session_data_p->session_id,
session_data_p->verdict == TRAFFIC_VERDICT_ACCEPT ? "accept" : "drop"
);
return fail_mode_verdict == NGX_OK ? NGX_DECLINED : fail_mode_verdict;
}
ctx = ngx_cp_async_find_ctx(session_data_p->session_id);
if (ctx != NULL) {
write_dbg(
DBG_LEVEL_DEBUG,
"Found existing async context for session %d - stage: %d, header_declined: %d",
session_data_p->session_id,
ctx->stage, ctx->header_declined
);
if (ctx->header_declined) {
write_dbg(
DBG_LEVEL_DEBUG,
"Header already declined for body processing - returning NGX_DECLINED again for session %d",
session_data_p->session_id
);
return NGX_DECLINED;
}
return ngx_cp_async_continue_processing(ctx);
}
if (
session_data_p->async_processing_needed == 0
&& (session_data_p->verdict != TRAFFIC_VERDICT_INSPECT || session_data_p->was_request_fully_inspected)
) {
write_dbg(DBG_LEVEL_DEBUG, "Async processing already completed for session %d - allowing to pass through", session_data_p->session_id);
SAFE_DESTROY_CTX(ctx);
return NGX_DECLINED;
}
handle_static_resource_result = handle_static_resource_request(
session_data_p->session_id,
&session_data_p->verdict,
request
);
if (handle_static_resource_result != NOT_A_STATIC_RESOURCE) {
write_dbg(DBG_LEVEL_DEBUG, "Static resource handled - result: %d", handle_static_resource_result);
return handle_static_resource_result;
}
ctx = ngx_cp_async_create_ctx(request, session_data_p);
if (ctx == NULL) {
write_dbg(DBG_LEVEL_WARNING, "Failed to create async context - allowing request to continue");
return NGX_DECLINED;
}
ctx->waf_tag.data = conf->waf_tag.data;
ctx->waf_tag.len = conf->waf_tag.len;
ngx_cp_async_add_ctx(ctx);
final_res = ngx_cp_async_start_agent_communication(ctx);
session_data_p->async_processing_needed = 1;
write_dbg(
DBG_LEVEL_DEBUG,
"Async processing started with result: %d for session %d",
final_res,
session_data_p->session_id
);
if (final_res == NGX_AGAIN) {
write_dbg(
DBG_LEVEL_DEBUG,
"Async processing in progress - HOLDING REQUEST until verdict received for session %d",
session_data_p->session_id
);
ngx_cp_async_start_deadline_timer(ctx, ngx_max(req_header_thread_timeout_msec, async_header_timeout_ms));
ctx->waiting = 1;
if (!ctx->request_ref_incremented && ctx->request->http_version == NGX_HTTP_VERSION_20) {
ctx->request->main->count++;
ctx->request_ref_incremented = 1;
write_dbg(DBG_LEVEL_DEBUG, "Incremented request main reference count for HTTP/2 session %d", session_data_p->session_id);
}
return NGX_DONE;
} else if (final_res == NGX_DECLINED) {
write_dbg(
DBG_LEVEL_DEBUG,
"Async processing completed immediately - allowing request to continue for session %d",
session_data_p->session_id
);
return NGX_DECLINED;
} else {
write_dbg(
DBG_LEVEL_WARNING,
"Async processing failed - fail-open request for session %d",
session_data_p->session_id
);
return final_res;
}
write_dbg(DBG_LEVEL_DEBUG, "=== ASYNC REQUEST HEADER HANDLER END ===");
}

View File

@@ -0,0 +1,11 @@
#ifndef __NGX_CP_ASYNC_HEADERS_H__
#define __NGX_CP_ASYNC_HEADERS_H__
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_event.h>
ngx_int_t ngx_http_cp_req_header_handler_async(ngx_http_request_t *request);
#endif // __NGX_CP_ASYNC_HEADERS_H__

View File

@@ -0,0 +1,605 @@
#include "ngx_cp_async_sender.h"
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_event.h>
#include <errno.h>
#include <stddef.h>
#include <poll.h>
#include "ngx_cp_async_core.h"
#include "ngx_cp_async_ctx_validation.h"
#include "../ngx_cp_utils.h"
#include "../ngx_cp_io.h"
#include "../ngx_cp_initializer.h"
///
/// @brief Signals nano service about new session to inspect with timeout protection.
/// @param[in] cur_session_id Session's Id.
/// @param[in] ctx Async context for setting flow_error on timeout (can be NULL).
/// @param[in] timeout_ms Write timeout in milliseconds (default 200ms if 0).
/// @returns ngx_int_t
/// - #NGX_OK
/// - #NGX_ERROR
/// - #NGX_HTTP_REQUEST_TIME_OUT
///
static ngx_int_t
ngx_http_cp_signal_to_service_with_timeout(uint32_t cur_session_id, ngx_uint_t timeout_ms)
{
int res = 0;
size_t bytes_written = 0;
ngx_uint_t actual_timeout_ms = timeout_ms > 0 ? timeout_ms : 200; // Default 200ms
struct pollfd poll_fd;
int poll_result;
write_dbg(DBG_LEVEL_TRACE, "Sending signal to the service to notify about new session data to inspect (timeout: %dms)", actual_timeout_ms);
while (bytes_written < sizeof(cur_session_id)) {
res = write(comm_socket, ((char *)&cur_session_id) + bytes_written, sizeof(cur_session_id) - bytes_written);
if (res > 0) {
bytes_written += res;
continue;
}
if (res < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Socket would block - use poll to wait for write readiness with timeout
poll_fd.fd = comm_socket;
poll_fd.events = POLLOUT;
poll_fd.revents = 0;
poll_result = poll(&poll_fd, 1, actual_timeout_ms);
if (poll_result < 0) {
write_dbg(DBG_LEVEL_WARNING, "Poll failed for comm_socket write: %s", strerror(errno));
disconnect_communication();
return NGX_ERROR;
} else if (poll_result == 0) {
write_dbg(DBG_LEVEL_DEBUG, "Write timeout (%dms) reached during signal to nano service for session %d", actual_timeout_ms, cur_session_id);
return NGX_HTTP_REQUEST_TIME_OUT;
} else {
// Socket is ready for writing, continue the loop
continue;
}
} else {
// Fatal write error - disconnect and return error
write_dbg(DBG_LEVEL_WARNING, "Fatal write error on comm_socket: %s", strerror(errno));
disconnect_communication();
return NGX_ERROR;
}
} else {
// res == 0, which shouldn't happen for write() on a socket
write_dbg(DBG_LEVEL_WARNING, "Unexpected write() return value 0 on comm_socket");
disconnect_communication();
return NGX_ERROR;
}
}
write_dbg(DBG_LEVEL_DEBUG, "Successfully signaled nano service for session %d", cur_session_id);
return NGX_OK;
}
/// @brief Generic wait verdict handler for all wait stages
/// @param[in] ctx Async context
/// @param[in] stage_name Stage name for logging
/// @return NGX_OK, NGX_AGAIN, NGX_HTTP_FORBIDDEN, or NGX_ERROR
///
ngx_int_t
ngx_cp_async_wait_signal_sender(ngx_http_cp_async_ctx_t *ctx, ngx_uint_t *num_messages_sent)
{
int err_code = 0;
ngx_int_t signal_res;
uint32_t session_id = ngx_cp_async_ctx_get_session_id_safe(ctx);
if (session_id == 0) {
write_dbg(DBG_LEVEL_WARNING, "Wait signal sender: invalid session ID");
return NGX_ERROR;
}
static const ngx_uint_t wait_fragments_count = 2;
char *fragments[wait_fragments_count];
uint16_t fragments_sizes[wait_fragments_count];
AttachmentDataType transaction_type = REQUEST_DELAYED_VERDICT;
set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&transaction_type, &session_id);
write_dbg(DBG_LEVEL_DEBUG, "Sending async wait data to shared memory for session %d", session_id);
err_code = sendChunkedData(nano_service_ipc, fragments_sizes, (const char **)fragments, wait_fragments_count);
if (err_code != 0) {
write_dbg(DBG_LEVEL_WARNING, "Failed to send wait data to shared memory for session %d, error: %d", session_id, err_code);
return NGX_ERROR;
}
ngx_cp_async_increment_pending_chunks(session_id, "wait_signal");
write_dbg(DBG_LEVEL_DEBUG, "Signaling agent service about wait data for session %d with %dms timeout protection", session_id, async_signal_timeout_ms);
signal_res = ngx_http_cp_signal_to_service_with_timeout(session_id, async_signal_timeout_ms);
if (signal_res != NGX_OK) {
if (signal_res == NGX_HTTP_REQUEST_TIME_OUT) {
write_dbg(DBG_LEVEL_DEBUG, "Signal timeout (%dms) reached for wait data, session %d - flow_error set", async_signal_timeout_ms, session_id);
ngx_cp_async_post_backpressure_drain_event();
return NGX_HTTP_REQUEST_TIME_OUT;
} else {
write_dbg(DBG_LEVEL_WARNING, "Failed to signal service for wait data, session %d", session_id);
}
return NGX_ERROR;
}
write_dbg(DBG_LEVEL_DEBUG, "Successfully sent async wait signal for session %d", session_id);
*num_messages_sent = 1;
return NGX_OK;
}
///
/// @brief Async version of ngx_http_cp_meta_data_sender - sends data but doesn't wait
/// @param[in] ctx Async context
/// @param[out] num_messages_sent Number of messages sent
/// @return NGX_OK on success, NGX_ERROR on failure, INSPECTION_IRRELEVANT if irrelevant
///
ngx_int_t
ngx_cp_async_send_meta_data_nonblocking(ngx_http_cp_async_ctx_t *ctx, ngx_uint_t *num_messages_sent)
{
static ngx_str_t ngx_parsed_host_str = ngx_string("host");
char client_ip[INET6_ADDRSTRLEN];
char listening_ip[INET6_ADDRSTRLEN];
uint16_t client_ip_len;
uint16_t listening_ip_len;
uint16_t client_port;
uint16_t chunck_type;
uint16_t listening_port;
ngx_int_t res;
ngx_str_t maybe_host = { 0, (u_char *)"" };
ngx_str_t ngx_parsed_host = { 0, (u_char *)"" };
ngx_str_t parsed_uri = { 0, (u_char *)"" };
ngx_http_variable_value_t *ngx_var;
char *fragments[META_DATA_COUNT + 2];
uint16_t fragments_sizes[META_DATA_COUNT + 2];
int err_code = 0;
write_dbg(DBG_LEVEL_DEBUG, "Async sending request start meta data for inspection");
convert_sock_addr_to_string(((struct sockaddr *)ctx->request->connection->sockaddr), client_ip);
if(!is_inspection_required_for_source(client_ip)) return INSPECTION_IRRELEVANT;
chunck_type = REQUEST_START;
set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &ctx->session_id);
set_fragment_elem(
fragments,
fragments_sizes,
&ctx->request->http_protocol.len,
sizeof(uint16_t),
HTTP_PROTOCOL_SIZE + 2
);
set_fragment_elem(
fragments,
fragments_sizes,
ctx->request->http_protocol.data,
ctx->request->http_protocol.len,
HTTP_PROTOCOL_DATA + 2
);
set_fragment_elem(fragments, fragments_sizes, &ctx->request->method_name.len, sizeof(uint16_t), HTTP_METHOD_SIZE + 2);
set_fragment_elem(
fragments,
fragments_sizes,
ctx->request->method_name.data,
ctx->request->method_name.len,
HTTP_METHOD_DATA + 2
);
ngx_var = ngx_http_get_variable(ctx->request, &ngx_parsed_host_str, ngx_hash_key(ngx_parsed_host_str.data, ngx_parsed_host_str.len));
if (ngx_var == NULL || ngx_var->not_found) {
write_dbg(DBG_LEVEL_DEBUG, "No parsed host found, using headers host");
if (ctx->request->headers_in.host != NULL) {
maybe_host.data = ctx->request->headers_in.host->value.data;
maybe_host.len = ctx->request->headers_in.host->value.len;
}
} else {
ngx_parsed_host.data = ngx_var->data;
ngx_parsed_host.len = ngx_var->len;
}
if (ctx->request->uri.len > 0) {
parsed_uri.data = ctx->request->uri.data;
parsed_uri.len = ctx->request->uri.len;
} else {
parsed_uri.data = ctx->request->unparsed_uri.data;
parsed_uri.len = ctx->request->unparsed_uri.len;
}
set_fragment_elem(
fragments,
fragments_sizes,
&maybe_host.len,
sizeof(uint16_t),
HOST_NAME_SIZE + 2
);
set_fragment_elem(
fragments,
fragments_sizes,
maybe_host.data,
maybe_host.len,
HOST_NAME_DATA + 2
);
// Add listening IP and port data (exact same logic)
convert_sock_addr_to_string(((struct sockaddr *)ctx->request->connection->local_sockaddr), listening_ip);
listening_ip_len = strlen(listening_ip);
set_fragment_elem(fragments, fragments_sizes, &listening_ip_len, sizeof(uint16_t), LISTENING_ADDR_SIZE + 2);
set_fragment_elem(fragments, fragments_sizes, listening_ip, listening_ip_len, LISTENING_ADDR_DATA + 2);
listening_port = htons(((struct sockaddr_in *)ctx->request->connection->local_sockaddr)->sin_port);
set_fragment_elem(fragments, fragments_sizes, &listening_port, sizeof(listening_port), LISTENING_PORT + 2);
// Add URI data (exact same logic)
set_fragment_elem(fragments, fragments_sizes, &ctx->request->unparsed_uri.len, sizeof(uint16_t), URI_SIZE + 2);
set_fragment_elem(fragments, fragments_sizes, ctx->request->unparsed_uri.data, ctx->request->unparsed_uri.len, URI_DATA + 2);
// Add client IP and port data (exact same logic)
client_ip_len = strlen(client_ip);
set_fragment_elem(fragments, fragments_sizes, &client_ip_len, sizeof(uint16_t), CLIENT_ADDR_SIZE + 2);
set_fragment_elem(fragments, fragments_sizes, client_ip, client_ip_len, CLIENT_ADDR_DATA + 2);
client_port = htons(((struct sockaddr_in *)ctx->request->connection->sockaddr)->sin_port);
set_fragment_elem(fragments, fragments_sizes, &client_port, sizeof(client_port), CLIENT_PORT + 2);
// Add parsed host and URI data (exact same logic)
set_fragment_elem(fragments, fragments_sizes, &ngx_parsed_host.len, sizeof(uint16_t), PARSED_HOST_SIZE + 2);
set_fragment_elem(fragments, fragments_sizes, ngx_parsed_host.data, ngx_parsed_host.len, PARSED_HOST_DATA + 2);
set_fragment_elem(fragments, fragments_sizes, &parsed_uri.len, sizeof(uint16_t), PARSED_URI_SIZE + 2);
set_fragment_elem(fragments, fragments_sizes, parsed_uri.data, parsed_uri.len, PARSED_URI_DATA + 2);
// Add WAF tag data (exact same logic)
if (ctx->waf_tag.len > 0) {
set_fragment_elem(fragments, fragments_sizes, &ctx->waf_tag.len, sizeof(uint16_t), WAF_TAG_SIZE + 2);
set_fragment_elem(fragments, fragments_sizes, ctx->waf_tag.data, ctx->waf_tag.len, WAF_TAG_DATA + 2);
} else {
uint16_t zero = 0;
set_fragment_elem(fragments, fragments_sizes, &zero, sizeof(uint16_t), WAF_TAG_SIZE + 2);
set_fragment_elem(fragments, fragments_sizes, "", 0, WAF_TAG_DATA + 2);
}
write_dbg(DBG_LEVEL_DEBUG, "Async sending meta data chunk to shared memory");
err_code = sendChunkedData(nano_service_ipc, fragments_sizes, (const char **)fragments, META_DATA_COUNT + 2);
if (err_code != 0) {
write_dbg(DBG_LEVEL_WARNING, "Failed to send meta data chunk - error code %d", err_code);
return NGX_ERROR;
}
ngx_cp_async_increment_pending_chunks(ctx->session_id, "meta_data");
write_dbg(DBG_LEVEL_DEBUG, "Async signaling agent for meta data with %dms timeout protection", async_signal_timeout_ms);
res = ngx_http_cp_signal_to_service_with_timeout(ctx->session_id, async_signal_timeout_ms);
if (res != NGX_OK && res != NGX_HTTP_REQUEST_TIME_OUT) {
write_dbg(DBG_LEVEL_WARNING, "Failed to signal agent for single body chunk, session %d", ctx->session_id);
return NGX_ERROR;
}
if (res == NGX_HTTP_REQUEST_TIME_OUT) {
write_dbg(DBG_LEVEL_DEBUG, "Signal timeout (%dms) reached for single body chunk, session %d", async_signal_timeout_ms, ctx->session_id);
ngx_cp_async_post_backpressure_drain_event();
}
*num_messages_sent = 1;
write_dbg(DBG_LEVEL_DEBUG, "Async meta data sent and signaled successfully");
return NGX_OK;
}
///
/// @brief Async version of ngx_http_cp_header_sender - sends data but doesn't wait
/// @param[in] ctx Async context
/// @param[out] num_messages_sent Number of messages sent
/// @return NGX_OK on success, NGX_ERROR on failure
///
ngx_int_t
ngx_cp_async_send_headers_nonblocking(ngx_http_cp_async_ctx_t *ctx, ngx_uint_t *num_messages_sent)
{
ngx_uint_t header_idx = 0;
ngx_uint_t idx_in_bulk = 0;
ngx_uint_t num_of_bulks_sent = 0;
uint8_t part_count = 0;
uint8_t bulk_part_idx = 0;
uint8_t is_last_part;
ngx_list_part_t *headers_iter;
ngx_table_elt_t *headers_to_inspect;
ngx_table_elt_t *header;
const ngx_uint_t max_bulk_size = 10;
char *fragments[HEADER_DATA_COUNT * max_bulk_size + 4];
uint16_t fragments_sizes[HEADER_DATA_COUNT * max_bulk_size + 4];
int err_code = 0;
ngx_int_t res;
write_dbg(DBG_LEVEL_DEBUG, "Async sending request headers for inspection");
uint16_t header_type = REQUEST_HEADER;
set_fragments_identifiers(fragments, fragments_sizes, &header_type, &ctx->session_id);
for (headers_iter = &(ctx->request->headers_in.headers.part); headers_iter; headers_iter = headers_iter->next) {
for (header_idx = 0; header_idx < headers_iter->nelts; ++header_idx) {
headers_to_inspect = headers_iter->elts;
header = headers_to_inspect + header_idx;
write_dbg(
DBG_LEVEL_DEBUG,
"Async sending header (key: '%.*s', value: '%.*s')",
header->key.len,
header->key.data,
header->value.len,
header->value.data
);
is_last_part = (headers_iter->next == NULL && header_idx + 1 == headers_iter->nelts) ? 1 : 0;
add_header_to_bulk(fragments, fragments_sizes, header, idx_in_bulk);
idx_in_bulk++;
part_count++;
if (idx_in_bulk < max_bulk_size && !is_last_part) continue;
set_fragment_elem(fragments, fragments_sizes, &is_last_part, sizeof(is_last_part), 2);
set_fragment_elem(fragments, fragments_sizes, &bulk_part_idx, sizeof(bulk_part_idx), 3);
write_dbg(DBG_LEVEL_DEBUG, "Async sending header bulk to shared memory");
err_code = sendChunkedData(
nano_service_ipc,
fragments_sizes,
(const char **)fragments,
HEADER_DATA_COUNT * idx_in_bulk + 4
);
if (err_code != 0) {
write_dbg(DBG_LEVEL_WARNING, "Failed to send header bulk - error code %d", err_code);
return NGX_ERROR;
}
ngx_cp_async_increment_pending_chunks(ctx->session_id, "headers");
num_of_bulks_sent++;
write_dbg(DBG_LEVEL_DEBUG, "Async header bulk sent successfully (no signal yet)");
if (is_last_part) break;
idx_in_bulk = 0;
bulk_part_idx = part_count;
}
}
if (part_count == 0) {
write_dbg(DBG_LEVEL_DEBUG, "Async sending empty header list");
uint8_t is_last_part = 1;
uint8_t bulk_part_idx = 0;
set_fragment_elem(fragments, fragments_sizes, &is_last_part, sizeof(is_last_part), 2);
set_fragment_elem(fragments, fragments_sizes, &bulk_part_idx, sizeof(bulk_part_idx), 3);
err_code = sendChunkedData(
nano_service_ipc,
fragments_sizes,
(const char **)fragments,
HEADER_DATA_COUNT * 1 + 4
);
if (err_code != 0) {
write_dbg(DBG_LEVEL_WARNING, "Failed to send empty header list - error code %d", err_code);
return NGX_ERROR;
}
// Increment pending chunks counter for empty headers
ngx_cp_async_increment_pending_chunks(ctx->session_id, "headers");
num_of_bulks_sent = 1;
}
// Signal agent once after all header bulks are sent
write_dbg(DBG_LEVEL_DEBUG, "Async signaling agent for all headers with %dms timeout protection", async_signal_timeout_ms);
res = ngx_http_cp_signal_to_service_with_timeout(ctx->session_id, async_signal_timeout_ms);
if (res != NGX_OK && res != NGX_HTTP_REQUEST_TIME_OUT) {
write_dbg(DBG_LEVEL_WARNING, "Failed to signal agent for single body chunk, session %d", ctx->session_id);
return NGX_ERROR;
}
if (res == NGX_HTTP_REQUEST_TIME_OUT) {
write_dbg(DBG_LEVEL_DEBUG, "Signal timeout (%dms) reached for single body chunk, session %d", async_signal_timeout_ms, ctx->session_id);
ngx_cp_async_post_backpressure_drain_event();
}
*num_messages_sent = num_of_bulks_sent;
write_dbg(DBG_LEVEL_DEBUG, "Async headers sent and signaled successfully - %d bulks", num_of_bulks_sent);
return NGX_OK;
}
///
/// @brief Async version of ngx_http_cp_end_transaction_sender - sends data but doesn't wait
/// @param[in] ctx Async context
/// @param[out] num_messages_sent Number of messages sent
/// @return NGX_OK on success, NGX_ERROR on failure
///
ngx_int_t
ngx_cp_async_send_end_transaction_nonblocking(ngx_http_cp_async_ctx_t *ctx, ngx_uint_t *num_messages_sent)
{
char *fragments[2];
uint16_t fragments_sizes[2];
uint16_t chunck_type = REQUEST_END;
int err_code = 0;
ngx_int_t res;
write_dbg(DBG_LEVEL_DEBUG, "Async sending end transaction for inspection");
set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &ctx->session_id);
write_dbg(DBG_LEVEL_DEBUG, "Async sending end transaction to shared memory");
err_code = sendChunkedData(nano_service_ipc, fragments_sizes, (const char **)fragments, 2);
if (err_code != 0) {
write_dbg(DBG_LEVEL_WARNING, "Failed to send end transaction - error code %d", err_code);
return NGX_ERROR;
}
ngx_cp_async_increment_pending_chunks(ctx->session_id, "end_transaction");
write_dbg(DBG_LEVEL_DEBUG, "Async signaling agent for end transaction with %dms timeout protection", async_signal_timeout_ms);
res = ngx_http_cp_signal_to_service_with_timeout(ctx->session_id, async_signal_timeout_ms);
if (res != NGX_OK && res != NGX_HTTP_REQUEST_TIME_OUT) {
write_dbg(DBG_LEVEL_WARNING, "Failed to signal agent for end transaction");
return NGX_ERROR;
}
if (res == NGX_HTTP_REQUEST_TIME_OUT) {
write_dbg(DBG_LEVEL_DEBUG, "Signal timeout (%dms) reached for end transaction, session %d", async_signal_timeout_ms, ctx->session_id);
ngx_cp_async_post_backpressure_drain_event();
}
*num_messages_sent = 1;
ctx->end_transaction_sent = 1;
write_dbg(DBG_LEVEL_DEBUG, "Async end transaction sent and signaled successfully");
return NGX_OK;
}
ngx_int_t
ngx_cp_async_send_single_body_chunk_nonblocking(ngx_http_cp_async_ctx_t *ctx, ngx_chain_t *chunk, ngx_uint_t *num_messages_sent)
{
static const ngx_uint_t num_body_chunk_fragments = 5;
ngx_buf_t *buf;
ngx_int_t res = NGX_ERROR;
uint8_t is_last_chunk;
uint8_t part_count = 0;
size_t buf_size;
char *fragments[num_body_chunk_fragments];
uint16_t fragments_sizes[num_body_chunk_fragments];
AttachmentDataType body_type = REQUEST_BODY;
write_dbg(DBG_LEVEL_DEBUG, "Sending single body chunk for session %d", ctx->session_id);
if (chunk == NULL) {
write_dbg(DBG_LEVEL_WARNING, "No chunk data to send for session %d", ctx->session_id);
*num_messages_sent = 0;
return NGX_OK;
}
set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&body_type, &ctx->session_id);
buf = chunk->buf;
is_last_chunk = buf->last_buf ? 1 : 0;
buf_size = buf->last - buf->pos;
write_dbg(
DBG_LEVEL_DEBUG,
"Processing single body chunk of size: %zu, last_chunk: %d for session %d",
buf_size,
is_last_chunk,
ctx->session_id
);
if (buf_size > 0 || is_last_chunk) {
set_fragment_elem(fragments, fragments_sizes, &is_last_chunk, sizeof(is_last_chunk), 2);
set_fragment_elem(fragments, fragments_sizes, &part_count, sizeof(part_count), 3);
set_fragment_elem(fragments, fragments_sizes, buf->pos, buf->last - buf->pos, 4);
ctx->session_data->processed_req_body_size += (buf->last - buf->pos);
write_dbg(DBG_LEVEL_DEBUG, "Sending single body chunk to agent for session %d", ctx->session_id);
res = sendChunkedData(nano_service_ipc, fragments_sizes, (const char **)fragments, num_body_chunk_fragments);
if (res != 0) {
write_dbg(DBG_LEVEL_WARNING, "Failed to send single body chunk to agent for session %d: %d", ctx->session_id, res);
return NGX_ERROR;
}
ngx_cp_async_increment_pending_chunks(ctx->session_id, "body_chunk");
write_dbg(DBG_LEVEL_DEBUG, "Successfully sent single body chunk to agent for session %d", ctx->session_id);
res = ngx_http_cp_signal_to_service_with_timeout(ctx->session_id, async_signal_timeout_ms);
if (res != NGX_OK && res != NGX_HTTP_REQUEST_TIME_OUT) {
write_dbg(DBG_LEVEL_WARNING, "Failed to signal agent for single body chunk, session %d", ctx->session_id);
return NGX_ERROR;
}
if (res == NGX_HTTP_REQUEST_TIME_OUT) {
write_dbg(DBG_LEVEL_DEBUG, "Signal timeout (%dms) reached for single body chunk, session %d", async_signal_timeout_ms, ctx->session_id);
ngx_cp_async_post_backpressure_drain_event();
}
*num_messages_sent = 1;
write_dbg(DBG_LEVEL_DEBUG, "Single chunk sender completed successfully for session %d", ctx->session_id);
return NGX_OK;
}
write_dbg(DBG_LEVEL_DEBUG, "Empty single chunk for session %d", ctx->session_id);
*num_messages_sent = 0;
return NGX_OK;
}
ngx_int_t
ngx_cp_async_send_to_agent_nonblocking(
ngx_http_cp_async_ctx_t *ctx,
AttachmentDataType chunk_type,
const void *data,
uint16_t data_size)
{
ngx_int_t res;
NanoHttpRequestData *request_data;
uint16_t total_size;
const char *chunks[2];
uint16_t chunk_sizes[2];
write_dbg(
DBG_LEVEL_DEBUG,
"Sending non-blocking data to agent for session %d, type: %d",
ctx->session_id,
chunk_type
);
total_size = sizeof(NanoHttpRequestData) + data_size;
request_data = ngx_palloc(ctx->request->pool, total_size);
if (request_data == NULL) {
write_dbg(DBG_LEVEL_WARNING, "Failed to allocate request data for session %d", ctx->session_id);
return NGX_ERROR;
}
request_data->data_type = chunk_type;
request_data->session_id = ctx->session_id;
if (data && data_size > 0) {
ngx_memcpy(request_data->data, data, data_size);
}
chunks[0] = (const char *)request_data;
chunk_sizes[0] = total_size;
res = sendChunkedData(nano_service_ipc, chunk_sizes, chunks, 1);
if (res != 0) {
write_dbg(DBG_LEVEL_WARNING, "Failed to send data to agent for session %d: %d", ctx->session_id, res);
return NGX_ERROR;
}
write_dbg(DBG_LEVEL_DEBUG, "Successfully sent data to agent for session %d", ctx->session_id);
return NGX_OK;
}
ngx_int_t
ngx_cp_async_signal_agent_nonblocking(ngx_http_cp_async_ctx_t *ctx)
{
ssize_t bytes_written;
uint32_t session_id = ctx->session_id;
write_dbg(DBG_LEVEL_DEBUG, "Signaling agent for session %d (non-blocking)", ctx->session_id);
if (comm_socket < 0) {
write_dbg(DBG_LEVEL_ERROR, "Communication socket not ready yet for session %d - skipping signal", ctx->session_id);
return NGX_OK;
}
bytes_written = write(comm_socket, &session_id, sizeof(session_id));
if (bytes_written != sizeof(session_id)) {
write_dbg(DBG_LEVEL_WARNING, "Failed to signal agent for session %d: %zd", ctx->session_id, bytes_written);
return NGX_ERROR;
}
write_dbg(DBG_LEVEL_DEBUG, "Successfully signaled agent for session %d", ctx->session_id);
return NGX_OK;
}

View File

@@ -0,0 +1,37 @@
#ifndef __NGX_CP_ASYNC_SENDER_H__
#define __NGX_CP_ASYNC_SENDER_H__
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <ngx_event.h>
#include "ngx_cp_async_core.h"
#include "nano_attachment_common.h"
ngx_int_t ngx_cp_async_wait_signal_sender(ngx_http_cp_async_ctx_t *ctx, ngx_uint_t *num_messages_sent);
ngx_int_t ngx_cp_async_send_meta_data_nonblocking(ngx_http_cp_async_ctx_t *ctx, ngx_uint_t *num_messages_sent);
ngx_int_t ngx_cp_async_wait_signal_sender(ngx_http_cp_async_ctx_t *ctx, ngx_uint_t *num_messages_sent);
ngx_int_t ngx_cp_async_send_headers_nonblocking(ngx_http_cp_async_ctx_t *ctx, ngx_uint_t *num_messages_sent);
ngx_int_t ngx_cp_async_send_end_transaction_nonblocking(ngx_http_cp_async_ctx_t *ctx, ngx_uint_t *num_messages_sent);
ngx_int_t
ngx_cp_async_send_single_body_chunk_nonblocking(
ngx_http_cp_async_ctx_t *ctx,
ngx_chain_t *chunk,
ngx_uint_t *num_messages_sent
);
ngx_int_t
ngx_cp_async_send_to_agent_nonblocking(
ngx_http_cp_async_ctx_t *ctx,
AttachmentDataType chunk_type,
const void *data,
uint16_t data_size
);
#endif // __NGX_CP_ASYNC_SENDER_H__

View File

@@ -0,0 +1,29 @@
#ifndef __NGX_CP_ASYNC_TYPES_H__
#define __NGX_CP_ASYNC_TYPES_H__
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
// Forward declarations
typedef struct ngx_http_cp_async_ctx ngx_http_cp_async_ctx_t;
typedef struct ngx_http_cp_session_data ngx_http_cp_session_data;
/// @enum ngx_cp_async_stage_t
/// @brief Processing stages for async operations
typedef enum {
NGX_CP_ASYNC_STAGE_INIT = 0,
NGX_CP_ASYNC_STAGE_META_DATA,
NGX_CP_ASYNC_STAGE_WAIT_META_VERDICT,
NGX_CP_ASYNC_STAGE_HEADERS,
NGX_CP_ASYNC_STAGE_WAIT_HEADER_VERDICT,
NGX_CP_ASYNC_STAGE_END_TRANSACTION,
NGX_CP_ASYNC_STAGE_WAIT_END_VERDICT,
NGX_CP_ASYNC_STAGE_BODY,
NGX_CP_ASYNC_STAGE_WAIT_BODY_VERDICT,
NGX_CP_ASYNC_STAGE_VERDICT,
NGX_CP_ASYNC_STAGE_COMPLETE,
NGX_CP_ASYNC_STAGE_ERROR = -1
} ngx_cp_async_stage_t;
#endif // __NGX_CP_ASYNC_TYPES_H__