mirror of
https://github.com/openappsec/attachment.git
synced 2025-06-28 16:41:03 +03:00
1242 lines
45 KiB
C
1242 lines
45 KiB
C
// Copyright (C) 2022 Check Point Software Technologies Ltd. All rights reserved.
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
/// @file ngx_cp_io.h
|
|
#include "ngx_cp_io.h"
|
|
|
|
#include <ngx_core.h>
|
|
|
|
#include <poll.h>
|
|
#include <stdint.h>
|
|
#include <stdbool.h>
|
|
#include <arpa/inet.h>
|
|
|
|
#include "ngx_cp_utils.h"
|
|
#include "ngx_cp_initializer.h"
|
|
#include "ngx_http_cp_attachment_module.h"
|
|
#include "ngx_cp_metric.h"
|
|
|
|
#define NGX_CP_CONF_DISABLED 0
|
|
#define NGX_CP_CONF_ENABLED 1
|
|
|
|
static const ngx_int_t inspection_irrelevant = INSPECTION_IRRELEVANT;
|
|
|
|
extern uint64_t metric_data[METRIC_TYPES_COUNT];
|
|
|
|
SharedMemoryIPC *nano_service_ipc = NULL;
|
|
int comm_socket = -1;
|
|
|
|
///
|
|
/// @brief Signals the nano service about new session to inspect.
|
|
/// @param[in] cur_session_id Session's Id.
|
|
/// @returns ngx_int_t
|
|
/// - #NGX_OK
|
|
/// - #NGX_ERROR
|
|
/// - #NGX_HTTP_REQUEST_TIME_OUT
|
|
///
|
|
static ngx_int_t
|
|
ngx_http_cp_signal_to_service(uint32_t cur_session_id)
|
|
{
|
|
int res = 0;
|
|
int bytes_written = 0;
|
|
struct timeval timeout = get_timeout_val_sec(1);
|
|
int is_fail_open_disabled = (inspection_mode != NON_BLOCKING_THREAD);
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Sending signal to the service to notify about new session data to inspect");
|
|
|
|
while (res >= 0) {
|
|
res = write(comm_socket, ((char *)&cur_session_id) + bytes_written, sizeof(cur_session_id) - bytes_written);
|
|
if (res > 0) {
|
|
bytes_written += res;
|
|
if (bytes_written == sizeof(cur_session_id)) break;
|
|
continue;
|
|
}
|
|
|
|
if (res < 0) {
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to signal nano service, trying to restart communications");
|
|
disconnect_communication();
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
if (!is_fail_open_disabled && is_timeout_reached(&timeout)) {
|
|
write_dbg(DBG_LEVEL_WARNING, "Reached timeout during attempt to signal nano service");
|
|
return NGX_HTTP_REQUEST_TIME_OUT;
|
|
}
|
|
}
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
///
|
|
/// @brief Signals and recieve signal to/from nano service about new session to inspect.
|
|
/// @param[in] cur_session_id Session's Id.
|
|
/// @param[in] chunk_type Chunk type that the attachment is waiting for a response from nano service.
|
|
/// @param[in] tout_retries NUmber of retries to wait for a response from nano service.
|
|
/// @returns ngx_int_t
|
|
/// - #NGX_OK
|
|
/// - #NGX_ERROR
|
|
/// - #NGX_HTTP_REQUEST_TIME_OUT
|
|
/// - #NGX_AGAIN
|
|
///
|
|
static ngx_int_t
|
|
ngx_http_cp_wait_for_service(uint32_t cur_session_id, ngx_http_chunk_type_e chunk_type, ngx_int_t tout_retries)
|
|
{
|
|
static int dbg_count = 0;
|
|
static clock_t clock_start = (clock_t) 0;
|
|
int res = 0;
|
|
int bytes_read = 0;
|
|
uint32_t reply_from_service;
|
|
ngx_int_t retry;
|
|
int is_fail_open_disabled = (inspection_mode != NON_BLOCKING_THREAD);
|
|
ngx_uint_t timeout = chunk_type == HOLD_DATA ? fail_open_hold_timeout : fail_open_timeout;
|
|
|
|
res = ngx_http_cp_signal_to_service(cur_session_id);
|
|
if (res != NGX_OK) return res;
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Successfully signaled to the service! pending to receive ack, retries = %d", tout_retries);
|
|
for (retry = 0; retry < tout_retries; ) {
|
|
// If inspection_mode is different from NON_BLOCKING_THREAD, then the loop will run indefinitely.
|
|
if (!is_fail_open_disabled) {
|
|
retry++;
|
|
}
|
|
struct pollfd s_poll;
|
|
s_poll.fd = comm_socket;
|
|
s_poll.events = POLLIN;
|
|
s_poll.revents = 0;
|
|
res = poll(&s_poll, 1, is_fail_open_disabled ? 150 : timeout);
|
|
|
|
if (res < 0) {
|
|
// Polling from the nano service has failed.
|
|
dbg_count += 1;
|
|
if (dbg_count > 50 && (((double)(clock() - clock_start)) / CLOCKS_PER_SEC) > 60) {
|
|
write_dbg(DBG_LEVEL_WARNING, "Polling from nano service had fail");
|
|
clock_start = clock();
|
|
dbg_count = 0;
|
|
} else {
|
|
write_dbg(DBG_LEVEL_TRACE, "Polling from nano service had fail");
|
|
}
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
if (res == 0) {
|
|
// Polling from nano service has been timed out.
|
|
continue;
|
|
}
|
|
|
|
res = read(comm_socket, ((char *)&reply_from_service) + bytes_read, sizeof(reply_from_service) - bytes_read);
|
|
if (res <= 0) {
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to read ack from nano service");
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
bytes_read += res;
|
|
if (bytes_read != sizeof(reply_from_service)) continue;
|
|
bytes_read = 0;
|
|
|
|
if (reply_from_service == cur_session_id) {
|
|
// Read was successful and matches the current session Id.
|
|
write_dbg(
|
|
DBG_LEVEL_TRACE,
|
|
"Received signal from nano service to the current session. Current session id: %d",
|
|
cur_session_id
|
|
);
|
|
return NGX_OK;
|
|
} else if (reply_from_service == CORRUPTED_SESSION_ID) {
|
|
// Recieved corrupted session ID, returning error.
|
|
write_dbg(
|
|
DBG_LEVEL_WARNING,
|
|
"Received signal from nano service regarding a corrupted session. Current session id: %d",
|
|
cur_session_id
|
|
);
|
|
return NGX_ERROR;
|
|
} else {
|
|
// Recieved old session Id, attempting to poll again.
|
|
write_dbg(
|
|
DBG_LEVEL_DEBUG,
|
|
"Received signal from nano service regarding a previous session."
|
|
" Current session id: %d, Signaled session id: %d",
|
|
cur_session_id,
|
|
reply_from_service
|
|
);
|
|
return NGX_AGAIN;
|
|
}
|
|
}
|
|
|
|
write_dbg(DBG_LEVEL_WARNING, "Reached timeout during attempt to signal nano service");
|
|
return NGX_HTTP_REQUEST_TIME_OUT;
|
|
}
|
|
|
|
///
|
|
/// @brief Send data to the nano service.
|
|
/// @param[in] fragments Data to send.
|
|
/// @param[in] fragments_sizes Data size.
|
|
/// @param[in] num_of_data_elem Number of elements in the data.
|
|
/// @param[in] cur_session_id Session's Id.
|
|
/// @param[in, out] was_waiting A pointer to an int
|
|
/// that symbolize if the function waited to send the data to the nano service.
|
|
/// @returns ngx_int_t
|
|
/// - #NGX_OK
|
|
/// - #NGX_ERROR
|
|
///
|
|
static ngx_int_t
|
|
ngx_http_cp_send_data_to_service(
|
|
char **fragments,
|
|
const uint16_t *fragments_sizes,
|
|
uint8_t num_of_data_elem,
|
|
uint32_t cur_session_id,
|
|
int *was_waiting,
|
|
ngx_http_chunk_type_e chunk_type,
|
|
ngx_int_t tout_retries
|
|
)
|
|
{
|
|
ngx_int_t max_retries;
|
|
ngx_int_t res = NGX_OK;
|
|
int err_code = 0;
|
|
write_dbg(DBG_LEVEL_TRACE, "Sending session data chunk for inspection");
|
|
|
|
for (max_retries = 5; max_retries > 0; max_retries--) {
|
|
err_code = sendChunkedData(nano_service_ipc, fragments_sizes, (const char **)fragments, num_of_data_elem);
|
|
if (res == NGX_OK && err_code == 0) {
|
|
return NGX_OK;
|
|
}
|
|
|
|
write_dbg(DBG_LEVEL_DEBUG, "Failed to send data for inspection - %d attempts remained", max_retries - 1);
|
|
|
|
if (was_waiting) {
|
|
*was_waiting = 1;
|
|
}
|
|
|
|
res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type, tout_retries);
|
|
if (res != NGX_OK && res != NGX_AGAIN) return res;
|
|
}
|
|
|
|
switch(err_code)
|
|
{
|
|
case -1:
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to send data for inspection - Corrupted shared memory");
|
|
break;
|
|
case -2:
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to send data for inspection - Requested write size exceeds the write limit");
|
|
break;
|
|
case -3:
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to send data for inspection - Cannot write to a full queue");
|
|
break;
|
|
case -4:
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to send data for inspection - Attempted write to a location outside the queue");
|
|
break;
|
|
default:
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to send data for inspection - Unknown error code %d", err_code);
|
|
break;
|
|
}
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
///
|
|
/// @brief Receieves data from service.
|
|
/// @returns ngx_http_cp_reply_from_service_t
|
|
/// - #A valid ngx_http_cp_reply_from_service_t pointer if valid.
|
|
/// - #NULL if failed.
|
|
///
|
|
static ngx_http_cp_reply_from_service_t *
|
|
ngx_http_cp_receive_data_from_service()
|
|
{
|
|
ngx_int_t res, retry;
|
|
const char *reply_data;
|
|
uint16_t reply_size;
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Receiving verdict data from nano service");
|
|
|
|
for (retry = 0; retry < 5; retry++) {
|
|
if (!isDataAvailable(nano_service_ipc)) {
|
|
usleep(1);
|
|
continue;
|
|
}
|
|
res = receiveData(nano_service_ipc, &reply_size, &reply_data);
|
|
if (res < 0 || reply_data == NULL) {
|
|
write_dbg(
|
|
DBG_LEVEL_TRACE,
|
|
"Failed to receive verdict data - trying again (retry = %d) in 1 u-seconds",
|
|
retry
|
|
);
|
|
|
|
usleep(1);
|
|
continue;
|
|
}
|
|
|
|
return (ngx_http_cp_reply_from_service_t *)reply_data;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
///
|
|
/// @brief Free data from nano service.
|
|
///
|
|
static void
|
|
free_data_from_service()
|
|
{
|
|
popData(nano_service_ipc);
|
|
}
|
|
|
|
///
|
|
/// @brief Create a custom web response by the provided data
|
|
/// @details If web_response_type is set to REDIRECT_WEB_RESPONSE, it will set a redirect response.
|
|
/// @param[in] web_response_data Web response data.
|
|
///
|
|
static void
|
|
handle_custom_web_response(ngx_http_cp_web_response_data_t *web_response_data)
|
|
{
|
|
ngx_str_t title;
|
|
ngx_str_t body;
|
|
ngx_str_t uuid;
|
|
ngx_str_t redirect_location;
|
|
|
|
uuid.len = web_response_data->uuid_size;
|
|
|
|
if (web_response_data->web_repsonse_type == REDIRECT_WEB_RESPONSE) {
|
|
// Settings a redirected web response.
|
|
write_dbg(DBG_LEVEL_TRACE, "Preparing to set redirect web response");
|
|
redirect_location.len = web_response_data->response_data.redirect_data.redirect_location_size;
|
|
if (redirect_location.len > 0) {
|
|
redirect_location.data = (u_char *)web_response_data->response_data.redirect_data.redirect_location;
|
|
}
|
|
uuid.data = (u_char *)web_response_data->response_data.redirect_data.redirect_location + redirect_location.len;
|
|
set_redirect_response(&redirect_location, &uuid, web_response_data->response_data.redirect_data.add_event_id);
|
|
return;
|
|
}
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Preparing to set custom web response page");
|
|
|
|
// Setting custom web response title's and body's length.
|
|
title.len = web_response_data->response_data.custom_response_data.title_size;
|
|
body.len = web_response_data->response_data.custom_response_data.body_size;
|
|
|
|
if (title.len > 0 && body.len > 0) {
|
|
// Setting custom web response title's and body's data.
|
|
title.data = (u_char *)web_response_data->response_data.custom_response_data.data;
|
|
body.data = (u_char *)web_response_data->response_data.custom_response_data.data + title.len;
|
|
}
|
|
uuid.data = (u_char *)web_response_data->response_data.custom_response_data.data + title.len + body.len;
|
|
set_custom_response(&title, &body, &uuid, web_response_data->response_data.custom_response_data.response_code);
|
|
}
|
|
|
|
///
|
|
/// @brief Allocate a modifications list buffer.
|
|
/// @param[in, out] target Pointer to the allocated buffer.
|
|
/// @param[in] data_size Desired allocated size.
|
|
/// @param[in] data Session's Id.
|
|
/// @param[in, out] pool NGINX pool.
|
|
/// @returns ngx_int_t
|
|
/// - #NGX_OK
|
|
/// - #NGX_ERROR
|
|
///
|
|
static ngx_int_t
|
|
create_modification_buffer(char **target, uint16_t data_size, char *data, ngx_pool_t *pool)
|
|
{
|
|
*target = (char *)ngx_pcalloc(pool, data_size + 1);
|
|
if (*target == NULL) {
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to allocate modification buffer of size: %d", data_size);
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
snprintf(*target, data_size + 1, "%s", data);
|
|
write_dbg(DBG_LEVEL_DEBUG, "Successfully created modification buffer: %s", *target);
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
///
|
|
/// @brief Create a modifications node.
|
|
/// @param[in] modification Modification data.
|
|
/// @param[in] request NGINX request.
|
|
/// @returns modification_node
|
|
/// - #ngx_http_cp_modification_list pointer on success.
|
|
/// - #NULL if the creation failed.
|
|
///
|
|
static ngx_http_cp_modification_list *
|
|
create_modification_node(ngx_http_cp_inject_data_t *modification, ngx_http_request_t *request)
|
|
{
|
|
ngx_int_t res;
|
|
ngx_http_cp_modification_list *modification_node = (ngx_http_cp_modification_list *)ngx_pcalloc(
|
|
request->pool,
|
|
sizeof(ngx_http_cp_modification_list)
|
|
);
|
|
if (modification_node == NULL) {
|
|
write_dbg(
|
|
DBG_LEVEL_WARNING,
|
|
"Failed to allocate modification node of size: %d",
|
|
sizeof(ngx_http_cp_modification_list)
|
|
);
|
|
return NULL;
|
|
}
|
|
|
|
res = create_modification_buffer(
|
|
&modification_node->modification_buffer,
|
|
modification->injection_size,
|
|
modification->data,
|
|
request->pool
|
|
);
|
|
|
|
if (res != NGX_OK) {
|
|
ngx_pfree(request->pool, modification_node);
|
|
return NULL;
|
|
}
|
|
|
|
modification_node->next = NULL;
|
|
modification_node->modification.is_header = modification->is_header;
|
|
modification_node->modification.mod_type = modification->mod_type;
|
|
modification_node->modification.injection_pos = modification->injection_pos;
|
|
modification_node->modification.injection_size = modification->injection_size;
|
|
modification_node->modification.orig_buff_index = modification->orig_buff_index;
|
|
|
|
write_dbg(
|
|
DBG_LEVEL_DEBUG,
|
|
"Successfully created modification node. "
|
|
"Is header: %d, \
|
|
Injection position: %d, \
|
|
Injection size: %d, \
|
|
Original buffer index: %d, \
|
|
Modification data: %s, \
|
|
Should change data: %d",
|
|
modification_node->modification.is_header,
|
|
modification_node->modification.injection_pos,
|
|
modification_node->modification.injection_size,
|
|
modification_node->modification.orig_buff_index,
|
|
modification_node->modification_buffer,
|
|
modification_node->modification.mod_type
|
|
);
|
|
|
|
return modification_node;
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_is_reconf_needed()
|
|
{
|
|
ngx_http_cp_reply_from_service_t *reply_p;
|
|
ngx_int_t res;
|
|
const char *reply_data;
|
|
uint16_t reply_size;
|
|
|
|
if (!nano_service_ipc) {
|
|
write_dbg(DBG_LEVEL_DEBUG, "Communication with nano service is not ready yet");
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
res = receiveData(nano_service_ipc, &reply_size, &reply_data);
|
|
if (res < 0 || reply_data == NULL) {
|
|
write_dbg(DBG_LEVEL_DEBUG, "Reconf verdict was not found");
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
reply_p = (ngx_http_cp_reply_from_service_t *)reply_data;
|
|
if (reply_p->verdict == TRAFFIC_VERDICT_RECONF) {
|
|
write_dbg(DBG_LEVEL_DEBUG, "Verdict reconf was received from the nano service. Performing reconf on the nginx worker attachment");
|
|
reset_attachment_config();
|
|
free_data_from_service();
|
|
return NGX_OK;
|
|
}
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_reply_receiver(
|
|
ngx_int_t *expected_replies,
|
|
ngx_http_cp_verdict_e *verdict,
|
|
uint32_t cur_session_id,
|
|
ngx_http_request_t *request,
|
|
ngx_http_cp_modification_list **modification_list,
|
|
ngx_http_chunk_type_e chunk_type,
|
|
uint64_t processed_body_size
|
|
)
|
|
{
|
|
ngx_http_cp_reply_from_service_t *reply_p;
|
|
ngx_http_cp_modification_list *new_modification = NULL;
|
|
ngx_http_cp_modification_list *current_modification = NULL;
|
|
ngx_http_cp_inject_data_t *current_inject_data = NULL;
|
|
ngx_int_t res;
|
|
ngx_int_t tout_retries = min_retries_for_verdict;
|
|
uint8_t modification_count;
|
|
unsigned int modification_index;
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Receiving verdict replies for %d chunks of inspected data", *expected_replies);
|
|
|
|
if (*expected_replies == 0) {
|
|
*verdict = TRAFFIC_VERDICT_INSPECT;
|
|
return NGX_OK;
|
|
}
|
|
|
|
if (processed_body_size > body_size_trigger)
|
|
tout_retries = max_retries_for_verdict;
|
|
|
|
do {
|
|
res = ngx_http_cp_wait_for_service(cur_session_id, chunk_type, tout_retries);
|
|
} while (res == NGX_AGAIN);
|
|
|
|
if (res != NGX_OK) return NGX_ERROR;
|
|
|
|
while (*expected_replies) {
|
|
// For each expected replies, receive the reply from the nano service.
|
|
reply_p = ngx_http_cp_receive_data_from_service();
|
|
if (reply_p == NULL) {
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to get reply from the nano service");
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
if (reply_p->verdict != TRAFFIC_VERDICT_RECONF) {
|
|
// Handling reconfiguration verdict.
|
|
if (reply_p->session_id != cur_session_id) {
|
|
write_dbg(DBG_LEVEL_DEBUG, "Ignoring verdict to an already handled request %d", reply_p->session_id);
|
|
free_data_from_service();
|
|
continue;
|
|
}
|
|
|
|
(*expected_replies)--;
|
|
}
|
|
|
|
*verdict = reply_p->verdict;
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Verdict %d received, number of repiles left: %d", *verdict, *expected_replies);
|
|
|
|
switch(*verdict) {
|
|
case TRAFFIC_VERDICT_INJECT: {
|
|
// Verdict inject received from the nano service.
|
|
write_dbg(DBG_LEVEL_TRACE, "Verdict inject received from the nano service");
|
|
updateMetricField(INJECT_VERDICTS_COUNT, 1);
|
|
current_inject_data = reply_p->modify_data->inject_data;
|
|
modification_count = reply_p->modification_count;
|
|
for (modification_index = 0; modification_index < modification_count; modification_index++) {
|
|
// Go over the modifications and create nodes.
|
|
new_modification = create_modification_node(current_inject_data, request);
|
|
if (new_modification == NULL) {
|
|
write_dbg(DBG_LEVEL_WARNING, "Failed to create modification node");
|
|
while (*modification_list) {
|
|
current_modification = *modification_list;
|
|
*modification_list = (*modification_list)->next;
|
|
ngx_pfree(request->pool, current_modification->modification_buffer);
|
|
ngx_pfree(request->pool, current_modification);
|
|
}
|
|
return NGX_ERROR;
|
|
}
|
|
if (*modification_list == NULL) {
|
|
*modification_list = new_modification;
|
|
current_modification = *modification_list;
|
|
} else {
|
|
current_modification->next = new_modification;
|
|
current_modification = current_modification->next;
|
|
}
|
|
// Saving injected data.
|
|
current_inject_data = (ngx_http_cp_inject_data_t *)(
|
|
(char *)current_inject_data +
|
|
sizeof(ngx_http_cp_inject_data_t) +
|
|
current_inject_data->injection_size
|
|
);
|
|
}
|
|
*verdict = TRAFFIC_VERDICT_INSPECT;
|
|
break;
|
|
}
|
|
|
|
case TRAFFIC_VERDICT_DROP: {
|
|
// After a drop verdict no more replies will be sent, so we can leave the loop
|
|
write_dbg(DBG_LEVEL_TRACE, "Verdict drop received from the nano service");
|
|
|
|
updateMetricField(DROP_VERDICTS_COUNT, 1);
|
|
handle_custom_web_response(reply_p->modify_data->web_response_data);
|
|
|
|
*expected_replies = 0;
|
|
free_data_from_service();
|
|
while (*modification_list) {
|
|
current_modification = *modification_list;
|
|
*modification_list = (*modification_list)->next;
|
|
ngx_pfree(request->pool, current_modification->modification.data);
|
|
ngx_pfree(request->pool, current_modification);
|
|
}
|
|
return NGX_HTTP_FORBIDDEN;
|
|
}
|
|
|
|
case TRAFFIC_VERDICT_ACCEPT: {
|
|
// After an accept verdict no more replies will be sent, so we can leave the loop
|
|
write_dbg(DBG_LEVEL_TRACE, "Verdict accept received from the nano service");
|
|
updateMetricField(ACCEPT_VERDICTS_COUNT, 1);
|
|
*expected_replies = 0;
|
|
free_data_from_service();
|
|
return NGX_OK;
|
|
}
|
|
|
|
case TRAFFIC_VERDICT_IRRELEVANT: {
|
|
// After an irrelevant verdict, ignore the verdict and continue to the next response.
|
|
write_dbg(DBG_LEVEL_TRACE, "Verdict irrelevant received from the nano service");
|
|
updateMetricField(IRRELEVANT_VERDICTS_COUNT, 1);
|
|
break;
|
|
}
|
|
|
|
case TRAFFIC_VERDICT_RECONF: {
|
|
// After a reconfiguration verdict, reset attachment config.
|
|
write_dbg(DBG_LEVEL_TRACE, "Verdict reconf received from the nano service");
|
|
updateMetricField(RECONF_VERDICTS_COUNT, 1);
|
|
reset_attachment_config();
|
|
break;
|
|
}
|
|
|
|
case TRAFFIC_VERDICT_INSPECT: {
|
|
// After an irrelevant verdict, ignore the verdict and continue to the next response.
|
|
write_dbg(DBG_LEVEL_TRACE, "Verdict inspect received from the nano service");
|
|
updateMetricField(INSPECT_VERDICTS_COUNT, 1);
|
|
break;
|
|
}
|
|
|
|
case TRAFFIC_VERDICT_WAIT: {
|
|
// After a wait verdict, query the nano agent again to get an updated verdict.
|
|
write_dbg(DBG_LEVEL_DEBUG, "Verdict wait received from the nano service");
|
|
updateMetricField(HOLD_VERDICTS_COUNT, 1);
|
|
break;
|
|
}
|
|
}
|
|
|
|
free_data_from_service();
|
|
}
|
|
|
|
write_dbg(DBG_LEVEL_DEBUG, "No verdict received from the nano service");
|
|
return NGX_OK;
|
|
}
|
|
|
|
///
|
|
/// @brief Set meta data fragment element data and size.
|
|
/// @param[in, out] meta_data_elems Fragments data array.
|
|
/// @param[in, out] meta_data_sizes Fragments data sizes array.
|
|
/// @param[in] data Data to set into the meta_data_elems array.
|
|
/// @param[in] size Size to be set into the meta_data_sizes array.
|
|
/// @param[in] idx Index of the arrays to set the data and size into.
|
|
///
|
|
static void
|
|
set_fragment_elem(char **meta_data_elems, uint16_t *meta_data_sizes, void *data, uint16_t size, uint idx)
|
|
{
|
|
meta_data_elems[idx] = data;
|
|
meta_data_sizes[idx] = size;
|
|
}
|
|
|
|
///
|
|
/// @brief Set meta data fragments identifiers.
|
|
/// @details The data identifiers will be set on the 0 and 1 slots of the array.
|
|
/// @param[in, out] meta_data_elems Fragments data array.
|
|
/// @param[in, out] meta_data_sizes Fragments data sizes array.
|
|
/// @param[in] data_type Data type identifier to be set.
|
|
/// @param[in] cur_request_id Request's Id.
|
|
///
|
|
static void
|
|
set_fragments_identifiers(
|
|
char **meta_data_elems,
|
|
uint16_t *meta_data_sizes,
|
|
uint16_t *data_type,
|
|
uint32_t *cur_request_id)
|
|
{
|
|
set_fragment_elem(meta_data_elems, meta_data_sizes, data_type, sizeof(uint16_t), 0);
|
|
set_fragment_elem(meta_data_elems, meta_data_sizes, cur_request_id, sizeof(uint32_t), 1);
|
|
}
|
|
|
|
///
|
|
/// @brief Convert sock address to a string.
|
|
/// @param[in, out] sockaddr Socker to convert.
|
|
/// @param[in, out] ip_addr Output location of the conversion.
|
|
///
|
|
static void
|
|
convert_sock_addr_to_string(const struct sockaddr *sa, char *ip_addr)
|
|
{
|
|
void *ip = NULL;
|
|
if (sa->sa_family == AF_INET) {
|
|
ip = (void *) &(((struct sockaddr_in*)sa)->sin_addr);
|
|
inet_ntop(AF_INET, ip, ip_addr, INET6_ADDRSTRLEN);
|
|
} else {
|
|
ip = (void *)&(((struct sockaddr_in6*)sa)->sin6_addr);
|
|
inet_ntop(AF_INET6, ip, ip_addr, INET6_ADDRSTRLEN);
|
|
}
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_meta_data_sender(ngx_http_request_t *request, uint32_t cur_request_id, ngx_uint_t *num_messages_sent)
|
|
{
|
|
char client_ip[INET6_ADDRSTRLEN];
|
|
char listening_ip[INET6_ADDRSTRLEN];
|
|
uint16_t client_ip_len;
|
|
uint16_t listening_ip_len;
|
|
uint16_t client_port;
|
|
uint16_t chunck_type;
|
|
uint16_t listening_port;
|
|
ngx_int_t res;
|
|
ngx_str_t ngx_parsed_host_str = ngx_string("host");
|
|
ngx_str_t maybe_host = { 0, (u_char *)"" };
|
|
ngx_str_t ngx_parsed_host = { 0, (u_char *)"" };
|
|
ngx_str_t parsed_uri = { 0, (u_char *)"" };
|
|
ngx_http_variable_value_t *ngx_var;
|
|
char *fragments[META_DATA_COUNT + 2];
|
|
uint16_t fragments_sizes[META_DATA_COUNT + 2];
|
|
static int failure_count = 0;
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Sending request start meta data for inspection");
|
|
|
|
convert_sock_addr_to_string(((struct sockaddr *)request->connection->sockaddr), client_ip);
|
|
if(!is_inspection_required_for_source(client_ip)) return inspection_irrelevant;
|
|
|
|
// Sets the fragments
|
|
chunck_type = REQUEST_START;
|
|
set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &cur_request_id);
|
|
|
|
// Add protocol length to fragments.
|
|
set_fragment_elem(
|
|
fragments,
|
|
fragments_sizes,
|
|
&request->http_protocol.len,
|
|
sizeof(uint16_t),
|
|
HTTP_PROTOCOL_SIZE + 2
|
|
);
|
|
// Add protocol data to fragments.
|
|
set_fragment_elem(
|
|
fragments,
|
|
fragments_sizes,
|
|
request->http_protocol.data,
|
|
request->http_protocol.len,
|
|
HTTP_PROTOCOL_DATA + 2
|
|
);
|
|
|
|
// Add method data length to fragments.
|
|
set_fragment_elem(fragments, fragments_sizes, &request->method_name.len, sizeof(uint16_t), HTTP_METHOD_SIZE + 2);
|
|
// Add method data to fragments
|
|
set_fragment_elem(
|
|
fragments,
|
|
fragments_sizes,
|
|
request->method_name.data,
|
|
request->method_name.len,
|
|
HTTP_METHOD_DATA + 2
|
|
);
|
|
|
|
if (request->headers_in.host != NULL) {
|
|
maybe_host.len = request->headers_in.host->value.len;
|
|
maybe_host.data = request->headers_in.host->value.data;
|
|
}
|
|
|
|
ngx_var = ngx_http_get_variable(request, &ngx_parsed_host_str, ngx_hash_key(ngx_parsed_host_str.data, ngx_parsed_host_str.len));
|
|
if (ngx_var != NULL && !ngx_var->not_found && ngx_var->len != 0) {
|
|
ngx_parsed_host.len = ngx_var->len;
|
|
ngx_parsed_host.data = ngx_var->data;
|
|
} else {
|
|
ngx_parsed_host.len = maybe_host.len;
|
|
ngx_parsed_host.data = maybe_host.data;
|
|
}
|
|
|
|
if (request->uri.len != 0) {
|
|
parsed_uri.data = request->uri.data;
|
|
parsed_uri.len = request->uri.len;
|
|
} else {
|
|
parsed_uri.data = request->unparsed_uri.data;
|
|
parsed_uri.len = request->unparsed_uri.len;
|
|
}
|
|
|
|
// Add host data length to the fragments.
|
|
set_fragment_elem(
|
|
fragments,
|
|
fragments_sizes,
|
|
&maybe_host.len,
|
|
sizeof(uint16_t),
|
|
HOST_NAME_SIZE + 2
|
|
);
|
|
// Add host data to the fragments.
|
|
set_fragment_elem(
|
|
fragments,
|
|
fragments_sizes,
|
|
maybe_host.data,
|
|
maybe_host.len,
|
|
HOST_NAME_DATA + 2
|
|
);
|
|
|
|
convert_sock_addr_to_string(((struct sockaddr *)request->connection->local_sockaddr), listening_ip);
|
|
listening_ip_len = strlen(listening_ip);
|
|
// Add listening IP metadata.
|
|
set_fragment_elem(fragments, fragments_sizes, &listening_ip_len, sizeof(uint16_t), LISTENING_ADDR_SIZE + 2);
|
|
set_fragment_elem(fragments, fragments_sizes, listening_ip, listening_ip_len, LISTENING_ADDR_DATA + 2);
|
|
|
|
// Add listening port data.
|
|
listening_port = htons(((struct sockaddr_in *)request->connection->local_sockaddr)->sin_port);
|
|
set_fragment_elem(fragments, fragments_sizes, &listening_port, sizeof(listening_port), LISTENING_PORT + 2);
|
|
|
|
// Add URI data.
|
|
set_fragment_elem(fragments, fragments_sizes, &request->unparsed_uri.len, sizeof(uint16_t), URI_SIZE + 2);
|
|
set_fragment_elem(fragments, fragments_sizes, request->unparsed_uri.data, request->unparsed_uri.len, URI_DATA + 2);
|
|
|
|
// Add client IP data length.
|
|
client_ip_len = strlen(client_ip);
|
|
set_fragment_elem(fragments, fragments_sizes, &client_ip_len, sizeof(uint16_t), CLIENT_ADDR_SIZE + 2);
|
|
set_fragment_elem(fragments, fragments_sizes, client_ip, client_ip_len, CLIENT_ADDR_DATA + 2);
|
|
|
|
// Add client IP data.
|
|
client_port = htons(((struct sockaddr_in *)request->connection->sockaddr)->sin_port);
|
|
set_fragment_elem(fragments, fragments_sizes, &client_port, sizeof(client_port), CLIENT_PORT + 2);
|
|
|
|
// Add NGX parsed host data.
|
|
set_fragment_elem(fragments, fragments_sizes, &ngx_parsed_host.len, sizeof(uint16_t), PARSED_HOST_SIZE + 2);
|
|
set_fragment_elem(fragments, fragments_sizes, ngx_parsed_host.data, ngx_parsed_host.len, PARSED_HOST_DATA + 2);
|
|
|
|
// Add parsed URI data.
|
|
set_fragment_elem(fragments, fragments_sizes, &parsed_uri.len, sizeof(uint16_t), PARSED_URI_SIZE + 2);
|
|
set_fragment_elem(fragments, fragments_sizes, parsed_uri.data, parsed_uri.len, PARSED_URI_DATA + 2);
|
|
|
|
// Sends all the data to the nano service.
|
|
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, META_DATA_COUNT + 2, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict);
|
|
if (res != NGX_OK) {
|
|
// Failed to send the metadata to nano service.
|
|
if (res == NGX_ERROR && failure_count++ == 5) {
|
|
disconnect_communication();
|
|
failure_count = 0;
|
|
}
|
|
|
|
return res;
|
|
}
|
|
failure_count = 0;
|
|
|
|
set_dbg_by_ctx(
|
|
client_ip,
|
|
listening_ip,
|
|
(char *)request->unparsed_uri.data,
|
|
(char *)(maybe_host.data),
|
|
(char *)request->method_name.data,
|
|
listening_port
|
|
);
|
|
|
|
*num_messages_sent = 1;
|
|
return NGX_OK;
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_end_transaction_sender(
|
|
ngx_http_chunk_type_e end_transaction_type,
|
|
uint32_t cur_request_id,
|
|
ngx_uint_t *num_messages_sent
|
|
)
|
|
{
|
|
static const ngx_uint_t end_transaction_num_fragments = 2;
|
|
|
|
char *fragments[end_transaction_num_fragments];
|
|
uint16_t fragments_sizes[end_transaction_num_fragments];
|
|
ngx_int_t res = NGX_ERROR;
|
|
|
|
write_dbg(
|
|
DBG_LEVEL_TRACE,
|
|
"Sending end %s event flag for inspection",
|
|
end_transaction_type == REQUEST_END ? "request" : "response"
|
|
);
|
|
|
|
set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&end_transaction_type, &cur_request_id);
|
|
|
|
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict);
|
|
if (res != NGX_OK) {
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
*num_messages_sent = 1;
|
|
return NGX_OK;
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_wait_sender(uint32_t cur_request_id, ngx_uint_t *num_messages_sent)
|
|
{
|
|
static const ngx_uint_t end_transaction_num_fragments = 2;
|
|
|
|
char *fragments[end_transaction_num_fragments];
|
|
uint16_t fragments_sizes[end_transaction_num_fragments];
|
|
ngx_http_chunk_type_e transaction_type = HOLD_DATA;
|
|
ngx_int_t res;
|
|
|
|
set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&transaction_type, &cur_request_id);
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Sending wait event flag for inspection");
|
|
|
|
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, end_transaction_num_fragments, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict);
|
|
if (res != NGX_OK) {
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Successfully sent wait event");
|
|
*num_messages_sent = 1;
|
|
return NGX_OK;
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_res_code_sender(uint16_t response_code, uint32_t cur_req_id, ngx_uint_t *num_messages_sent)
|
|
{
|
|
static const ngx_uint_t res_code_num_fragments = 3;
|
|
|
|
char *fragments[res_code_num_fragments];
|
|
uint16_t fragments_sizes[res_code_num_fragments];
|
|
uint16_t chunck_type;
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Sending response code (%d) for inspection", response_code);
|
|
|
|
chunck_type = RESPONSE_CODE;
|
|
set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &cur_req_id);
|
|
set_fragment_elem(fragments, fragments_sizes, &response_code, sizeof(uint16_t), 2);
|
|
|
|
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, res_code_num_fragments, cur_req_id, NULL, fail_open_hold_timeout, min_retries_for_verdict) != NGX_OK) {
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
*num_messages_sent = 1;
|
|
return NGX_OK;
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_content_length_sender(uint64_t content_length_n, uint32_t cur_req_id, ngx_uint_t *num_messages_sent)
|
|
{
|
|
static const ngx_uint_t content_length_num_fragments = 3;
|
|
|
|
char *fragments[content_length_num_fragments];
|
|
uint16_t fragments_sizes[content_length_num_fragments];
|
|
uint16_t chunck_type;
|
|
uint64_t content_length_val = content_length_n;
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Sending content length (%ld) to the intaker", content_length_n);
|
|
|
|
chunck_type = CONTENT_LENGTH;
|
|
set_fragments_identifiers(fragments, fragments_sizes, &chunck_type, &cur_req_id);
|
|
set_fragment_elem(fragments, fragments_sizes, &content_length_val, sizeof(content_length_val), 2);
|
|
|
|
if (ngx_http_cp_send_data_to_service(fragments, fragments_sizes, content_length_num_fragments, cur_req_id, NULL, fail_open_timeout, min_retries_for_verdict) != NGX_OK) {
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
*num_messages_sent = 1;
|
|
return NGX_OK;
|
|
}
|
|
|
|
///
|
|
/// @brief Create a header bulk.
|
|
/// @param[in, out] fragments Fragment data array.
|
|
/// @param[in, out] fragments_sizes Fragment data size array.
|
|
/// @param[in] header Header to add to the fragment array.
|
|
/// @param[in] index Index of the arrays to set the header into.
|
|
///
|
|
static inline void
|
|
add_header_to_bulk(char **fragments, uint16_t *fragments_sizes, ngx_table_elt_t *header, ngx_uint_t index)
|
|
{
|
|
ngx_uint_t pos = index * HEADER_DATA_COUNT;
|
|
set_fragment_elem(fragments, fragments_sizes, &header->key.len, sizeof(uint16_t), pos + HEADER_KEY_SIZE + 4);
|
|
set_fragment_elem(fragments, fragments_sizes, header->key.data, header->key.len, pos + HEADER_KEY_DATA + 4);
|
|
set_fragment_elem(fragments, fragments_sizes, &header->value.len, sizeof(uint16_t), pos + HEADER_VAL_SIZE + 4);
|
|
set_fragment_elem(fragments, fragments_sizes, header->value.data, header->value.len, pos + HEADER_VAL_DATA + 4);
|
|
}
|
|
|
|
///
|
|
/// @brief Send a headers bulk to the nano service.
|
|
/// @param[in, out] data Data array.
|
|
/// @param[in, out] data_sizes Data size array.
|
|
/// @param[in] num_headers Number of headers to be sent.
|
|
/// @param[in] is_last_part Is the header last.
|
|
/// @param[in] bulk_part_index Index of the data bulk.
|
|
/// @param[in, out] num_of_bulks_sent Number of bulks that's been sent will be written here.
|
|
/// @param[in] cur_request_id Request's Id.
|
|
/// @returns ngx_int_t
|
|
/// - #NGX_OK
|
|
/// - #NGX_ERROR
|
|
///
|
|
static ngx_int_t
|
|
send_header_bulk(
|
|
char **data,
|
|
uint16_t *data_sizes,
|
|
const ngx_uint_t num_headers,
|
|
uint8_t is_last_part,
|
|
uint8_t bulk_part_index,
|
|
ngx_uint_t *num_of_bulks_sent,
|
|
uint32_t cur_request_id
|
|
)
|
|
{
|
|
ngx_int_t res;
|
|
set_fragment_elem(data, data_sizes, &is_last_part, sizeof(is_last_part), 2);
|
|
set_fragment_elem(data, data_sizes, &bulk_part_index, sizeof(bulk_part_index), 3);
|
|
|
|
res = ngx_http_cp_send_data_to_service(data, data_sizes, HEADER_DATA_COUNT * num_headers + 4, cur_request_id, NULL, fail_open_timeout, min_retries_for_verdict);
|
|
if (res != NGX_OK) {
|
|
write_dbg(DBG_LEVEL_TRACE, "Failed to send bulk of %iu headers", num_headers);
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
(*num_of_bulks_sent)++;
|
|
write_dbg(DBG_LEVEL_TRACE, "Successfully sent bulk of %iu headers", num_headers);
|
|
return NGX_OK;
|
|
}
|
|
|
|
///
|
|
/// @brief Send an empty headers list to the nano service.
|
|
/// @param[in, out] sent_data Data array.
|
|
/// @param[in, out] sent_data_sizes Data size array.
|
|
/// @param[in, out] num_of_bulks_sent Number of bulks that's been sent will be written here.
|
|
/// @param[in] cur_request_id Request's Id.
|
|
/// @returns ngx_int_t
|
|
/// - #NGX_OK
|
|
/// - #NGX_ERROR
|
|
///
|
|
static ngx_int_t
|
|
send_empty_header_list(
|
|
char **sent_data,
|
|
uint16_t *sent_data_sizes,
|
|
ngx_uint_t *num_of_bulks_sent,
|
|
uint32_t cur_request_id)
|
|
{
|
|
static ngx_table_elt_t empty_header = {
|
|
.hash = 1,
|
|
.key = { .len = 0, .data = (u_char *)"" },
|
|
.value = { .len = 0, .data = (u_char *)"" },
|
|
.lowcase_key = NULL
|
|
};
|
|
|
|
add_header_to_bulk(sent_data, sent_data_sizes, &empty_header, 0);
|
|
if(send_header_bulk(sent_data, sent_data_sizes, 1, 1, 0, num_of_bulks_sent, cur_request_id) != NGX_OK) {
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
return NGX_OK;
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_header_sender(
|
|
ngx_list_part_t *headers_list,
|
|
ngx_http_chunk_type_e header_type,
|
|
uint32_t cur_request_id,
|
|
ngx_uint_t *num_messages_sent,
|
|
ngx_str_t *waf_tag
|
|
)
|
|
{
|
|
ngx_uint_t header_idx = 0;
|
|
ngx_uint_t idx_in_bulk = 0;
|
|
ngx_uint_t num_of_bulks_sent = 0;
|
|
uint8_t part_count = 0;
|
|
uint8_t bulk_part_idx = 0;
|
|
ngx_int_t send_bulk_result;
|
|
uint8_t is_last_part;
|
|
ngx_list_part_t *headers_iter;
|
|
ngx_table_elt_t *headers_to_inspect;
|
|
ngx_table_elt_t *header;
|
|
const ngx_uint_t max_bulk_size = 10;
|
|
char *fragments[HEADER_DATA_COUNT * max_bulk_size + 4];
|
|
uint16_t fragments_sizes[HEADER_DATA_COUNT * max_bulk_size + 4];
|
|
ngx_flag_t waf_tag_found = 0;
|
|
|
|
write_dbg(
|
|
DBG_LEVEL_TRACE,
|
|
"Sending %s headers for inspection",
|
|
header_type == REQUEST_HEADER ? "request" : "response"
|
|
);
|
|
|
|
// Sets fragments identifier to the provided body type.
|
|
set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&header_type, &cur_request_id);
|
|
|
|
// If waf_tag is provided and valid, check for existing x-waf-tag headers
|
|
if (waf_tag != NULL && waf_tag->len > 0) {
|
|
for (headers_iter = headers_list; headers_iter; headers_iter = headers_iter->next) {
|
|
headers_to_inspect = headers_iter->elts;
|
|
for (header_idx = 0; header_idx < headers_iter->nelts; ++header_idx) {
|
|
header = headers_to_inspect + header_idx;
|
|
if (header->key.len == 9 && ngx_strncasecmp(header->key.data, (u_char *)"x-waf-tag", 9) == 0) {
|
|
// Found existing x-waf-tag header, override its value
|
|
// header->value = *waf_tag;
|
|
waf_tag_found = 1;
|
|
write_dbg(DBG_LEVEL_DEBUG, "Overriding existing x-waf-tag header with value: %.*s", waf_tag->len, waf_tag->data);
|
|
break;
|
|
}
|
|
}
|
|
if (waf_tag_found) break;
|
|
}
|
|
|
|
// If no existing x-waf-tag header found, add a new one
|
|
if (!waf_tag_found) {
|
|
ngx_table_elt_t waf_header;
|
|
waf_header.hash = 1;
|
|
ngx_str_set(&waf_header.key, "x-waf-tag");
|
|
waf_header.value = *waf_tag;
|
|
waf_header.lowcase_key = NULL; // Not needed for sending to agent
|
|
|
|
add_header_to_bulk(fragments, fragments_sizes, &waf_header, idx_in_bulk);
|
|
idx_in_bulk++;
|
|
part_count++;
|
|
write_dbg(DBG_LEVEL_DEBUG, "Adding new x-waf-tag header with value: %.*s", waf_tag->len, waf_tag->data);
|
|
}
|
|
}
|
|
|
|
for (headers_iter = headers_list; headers_iter ; headers_iter = headers_iter->next) {
|
|
// Going over the header list.
|
|
for (header_idx = 0 ; header_idx < headers_iter->nelts ; ++header_idx) {
|
|
headers_to_inspect = headers_iter->elts;
|
|
header = headers_to_inspect + header_idx;
|
|
|
|
write_dbg(
|
|
DBG_LEVEL_TRACE,
|
|
"Sending current header (key: '%.*s', value: '%.*s') to inspection",
|
|
header->key.len,
|
|
header->key.data,
|
|
header->value.len,
|
|
header->value.data
|
|
);
|
|
|
|
is_last_part = (headers_iter->next == NULL && header_idx + 1 == headers_iter->nelts) ? 1 : 0;
|
|
// Create a header bulk to send.
|
|
if (waf_tag_found && header->key.len == 9 && ngx_strncasecmp(header->key.data, (u_char *)"x-waf-tag", 9) == 0) {
|
|
ngx_table_elt_t waf_header;
|
|
waf_header.hash = 1;
|
|
ngx_str_set(&waf_header.key, "x-waf-tag");
|
|
waf_header.value = *waf_tag;
|
|
waf_header.lowcase_key = NULL;
|
|
add_header_to_bulk(fragments, fragments_sizes, &waf_header, idx_in_bulk);
|
|
} else {
|
|
add_header_to_bulk(fragments, fragments_sizes, header, idx_in_bulk);
|
|
}
|
|
|
|
idx_in_bulk++;
|
|
part_count++;
|
|
if (idx_in_bulk < max_bulk_size && !is_last_part) continue;
|
|
|
|
// Send the headers to the nano agent.
|
|
send_bulk_result = send_header_bulk(
|
|
fragments,
|
|
fragments_sizes,
|
|
idx_in_bulk,
|
|
is_last_part,
|
|
bulk_part_idx,
|
|
&num_of_bulks_sent,
|
|
cur_request_id
|
|
);
|
|
if (send_bulk_result != NGX_OK) return NGX_ERROR;
|
|
|
|
if (is_last_part) break;
|
|
|
|
idx_in_bulk = 0;
|
|
bulk_part_idx = part_count;
|
|
}
|
|
}
|
|
|
|
if (part_count == 0 && header_type == RESPONSE_HEADER) {
|
|
// Handling an empty response header.
|
|
write_dbg(DBG_LEVEL_TRACE, "Empty list of headers received. Sending last header message to nano service");
|
|
if (send_empty_header_list(fragments, fragments_sizes, &num_of_bulks_sent, cur_request_id) != NGX_OK) {
|
|
return NGX_ERROR;
|
|
}
|
|
}
|
|
|
|
*num_messages_sent = num_of_bulks_sent;
|
|
|
|
write_dbg(DBG_LEVEL_TRACE, "Exit after inspection of %d headers", part_count);
|
|
return NGX_OK;
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_body_sender(
|
|
ngx_chain_t *input,
|
|
ngx_http_chunk_type_e body_type,
|
|
ngx_http_cp_session_data *session_data,
|
|
ngx_int_t *is_last_part,
|
|
ngx_uint_t *num_messages_sent,
|
|
ngx_chain_t **next_elem_to_inspect
|
|
)
|
|
{
|
|
static const ngx_uint_t num_body_chunk_fragments = 5;
|
|
|
|
ngx_chain_t *chain_iter = NULL;
|
|
ngx_buf_t *buf;
|
|
ngx_int_t num_parts_sent;
|
|
ngx_int_t is_empty_chain = 1;
|
|
ngx_int_t res = NGX_ERROR;
|
|
uint8_t is_last_chunk;
|
|
uint8_t part_count;
|
|
ngx_int_t tout_retries = min_retries_for_verdict;
|
|
char *fragments[num_body_chunk_fragments];
|
|
uint16_t fragments_sizes[num_body_chunk_fragments];
|
|
int was_waiting = 0;
|
|
|
|
write_dbg(
|
|
DBG_LEVEL_TRACE,
|
|
"Sending %s body chunk from session id %d for inspection",
|
|
body_type == REQUEST_BODY ? "request" : "response",
|
|
session_data->session_id
|
|
);
|
|
|
|
// Sets fragments identifier to the provided body type.
|
|
set_fragments_identifiers(fragments, fragments_sizes, (uint16_t *)&body_type, &session_data->session_id);
|
|
|
|
num_parts_sent = 0;
|
|
part_count = 0;
|
|
for (chain_iter = input; chain_iter; chain_iter = chain_iter->next) {
|
|
// For each NGINX buffer, fragment the buffer and then send the fragments to the nano service.
|
|
buf = chain_iter->buf;
|
|
is_last_chunk = buf->last_buf ? 1 : 0;
|
|
write_dbg(DBG_LEVEL_TRACE, "Sending last_buf: %d, part_count: %d", buf->last_buf ? 1: 0, part_count);
|
|
|
|
if (buf->last - buf->pos > 0 || is_last_chunk) {
|
|
// Setting the fragments, including in the case of the last chunk.
|
|
set_fragment_elem(fragments, fragments_sizes, &is_last_chunk, sizeof(is_last_chunk), 2);
|
|
set_fragment_elem(fragments, fragments_sizes, &part_count, sizeof(part_count), 3);
|
|
set_fragment_elem(fragments, fragments_sizes, buf->pos, buf->last - buf->pos, 4);
|
|
|
|
if (body_type == REQUEST_BODY) {
|
|
session_data->processed_req_body_size += (buf->last - buf->pos);
|
|
if (session_data->processed_req_body_size > body_size_trigger)
|
|
tout_retries = max_retries_for_verdict;
|
|
} else if (body_type == RESPONSE_BODY) {
|
|
session_data->processed_res_body_size += (buf->last - buf->pos);
|
|
if (session_data->processed_res_body_size > body_size_trigger)
|
|
tout_retries = max_retries_for_verdict;
|
|
}
|
|
// Sending the data to the nano service.
|
|
res = ngx_http_cp_send_data_to_service(fragments, fragments_sizes, num_body_chunk_fragments, session_data->session_id,
|
|
&was_waiting, fail_open_timeout, tout_retries);
|
|
|
|
if (res != NGX_OK) {
|
|
// Failed to send the fragments to the nano service.
|
|
return NGX_ERROR;
|
|
}
|
|
|
|
num_parts_sent++;
|
|
is_empty_chain = 0;
|
|
}
|
|
|
|
part_count++;
|
|
|
|
if (was_waiting) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
*is_last_part = is_last_chunk;
|
|
*num_messages_sent = num_parts_sent;
|
|
|
|
*next_elem_to_inspect = chain_iter;
|
|
|
|
return (!is_empty_chain && num_parts_sent == 0) ? NGX_ERROR : NGX_OK;
|
|
}
|
|
|
|
ngx_int_t
|
|
ngx_http_cp_metric_data_sender()
|
|
{
|
|
char *fragments;
|
|
uint16_t fragments_sizes;
|
|
uint16_t fragment_type;
|
|
ngx_int_t res;
|
|
|
|
write_dbg(DBG_LEVEL_DEBUG, "Sending metric data to service");
|
|
|
|
fragment_type = METRIC_DATA_FROM_PLUGIN;
|
|
ngx_http_cp_metric_data_t data_to_send;
|
|
data_to_send.data_type = fragment_type;
|
|
memcpy(data_to_send.data, metric_data, METRIC_TYPES_COUNT * sizeof(data_to_send.data[0]));
|
|
fragments = (char *)&data_to_send;
|
|
fragments_sizes = sizeof(ngx_http_cp_metric_data_t);
|
|
|
|
res = ngx_http_cp_send_data_to_service(&fragments, &fragments_sizes, 1, 0, NULL, fail_open_timeout, min_retries_for_verdict);
|
|
reset_metric_data();
|
|
return res;
|
|
}
|