attachment/attachments/nginx/ngx_module/ngx_cp_initializer.c
2024-06-09 11:27:25 +00:00

761 lines
23 KiB
C

// Copyright (C) 2022 Check Point Software Technologies Ltd. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// @file ngx_cp_initializer.c
#include "ngx_cp_initializer.h"
#include <poll.h>
#include <stdint.h>
#include <dirent.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <ngx_log.h>
#include <ngx_core.h>
#include <ngx_string.h>
#include <ngx_files.h>
#include "nginx_attachment_common.h"
#include "ngx_cp_io.h"
#include "ngx_cp_utils.h"
#include "ngx_cp_static_content.h"
#include "ngx_cp_compression.h"
#include "attachment_types.h"
#include "ngx_http_cp_attachment_module.h"
// Registration life-cycle of this attachment, stored in `need_registration`.
typedef enum ngx_cp_attachment_registration_state {
NOT_REGISTERED, // Initial state; registration may be started from here.
PENDING, // Set right before the background registration thread is created.
REGISTERED // Set once registration with the attachments manager has succeeded.
} ngx_cp_attachment_registration_state_e; ///< Indicates the current attachment registration stage.
// Module-wide state. Only `need_registration` is protected by `mutex`.
char unique_id[MAX_NGINX_UID_LEN] = ""; // Holds the unique identifier for this instance (empty until it is set).
char shared_verdict_signal_path[128]; // Unix-socket path used to signal the service; filled in during registration.
int registration_socket = -1; // Holds the file descriptor used for registering the instance (-1 when closed).
static pthread_t registration_thread; // Handle of the background registration thread.
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // Guards `need_registration`.
static ngx_cp_attachment_registration_state_e need_registration = NOT_REGISTERED; // Current registration state.
struct sockaddr_un server; // Address structure reused for both the registration and the signaling connections.
uint32_t nginx_user_id, nginx_group_id; // Hold the process UID and GID respectively.
///
/// @brief Store a new value in the shared registration state while holding the module mutex.
/// @param[in] state The registration state to store.
///
static void
set_need_registration(ngx_cp_attachment_registration_state_e state)
{
    // The state is also touched by the registration thread, so every write is done under the lock.
    pthread_mutex_lock(&mutex);
    need_registration = state;
    pthread_mutex_unlock(&mutex);
}
///
/// @brief Read the shared registration state while holding the module mutex.
/// @returns ngx_cp_attachment_registration_state_e
/// - #NOT_REGISTERED,
/// - #PENDING,
/// - #REGISTERED
///
static ngx_cp_attachment_registration_state_e
get_need_registration()
{
    // Take a snapshot under the lock and hand the copy back to the caller.
    pthread_mutex_lock(&mutex);
    const ngx_cp_attachment_registration_state_e snapshot = need_registration;
    pthread_mutex_unlock(&mutex);

    return snapshot;
}
///
/// @brief Thread routine: keep trying to register the workers with the attachments manager.
/// @details Waits 5 seconds before the first attempt and 5 seconds between attempts.
///          The loop stops once registration succeeds, or once the shared state is no longer #PENDING.
/// @returns Always NULL.
///
void *
register_workers() {
    const int worker_count = get_saved_num_of_workers();

    // Nothing to register - put the state back so a later attempt can start from scratch.
    if (worker_count == 0) {
        write_dbg(DBG_LEVEL_INFO, "Number of workers is 0, ignore registration");
        set_need_registration(NOT_REGISTERED);
        return NULL;
    }

    write_dbg(
        DBG_LEVEL_INFO,
        "Initiating registration of %d workers to the attachment",
        worker_count
    );
    sleep(5);

    for (;;) {
        if (register_to_attachments_manager(worker_count) == NGX_OK) break;

        write_dbg(
            DBG_LEVEL_INFO,
            "unable to register %d workers to the attachment, will try again in 5 seconds",
            worker_count
        );

        // Somebody else changed the state while we were trying - give up quietly.
        if (get_need_registration() != PENDING) {
            write_dbg(DBG_LEVEL_INFO, "Drop registration attempt, registration is not needed anymore");
            return NULL;
        }

        sleep(5);
    }

    set_need_registration(REGISTERED);
    return NULL;
}
///
/// @brief Start the background registration thread, unless a registration is already pending or done.
/// @details The state is moved from #NOT_REGISTERED to #PENDING under the mutex before the thread is
///          created, so concurrent callers cannot start a second thread. If the thread cannot be created
///          the state is rolled back to #NOT_REGISTERED; otherwise it would stay #PENDING forever with
///          nobody left to complete (or retry) the registration.
///
void
init_attachment_registration_thread()
{
    pthread_mutex_lock(&mutex);
    if (need_registration != NOT_REGISTERED) {
        pthread_mutex_unlock(&mutex);
        return;
    }
    need_registration = PENDING;
    pthread_mutex_unlock(&mutex);

    int result = pthread_create(&registration_thread, NULL, register_workers, NULL);
    if (result != 0) {
        write_dbg(DBG_LEVEL_INFO, "Failed to create thread");
        // No thread is running, so nothing will ever leave the PENDING state on its own.
        set_need_registration(NOT_REGISTERED);
    }
}
///
/// @brief Cancel the registration thread and mark the attachment as not registered.
/// @details A failure to cancel is only logged; the state is reset in any case.
///
void
reset_attachment_registration()
{
    // NOTE(review): the thread handle is cancelled unconditionally, even if no thread was ever created -
    // confirm that callers only get here after init_attachment_registration_thread() has run.
    const int cancel_status = pthread_cancel(registration_thread);
    if (cancel_status != 0) {
        write_dbg(DBG_LEVEL_INFO, "Failed to cancel thread %d", cancel_status);
    }

    set_need_registration(NOT_REGISTERED);
}
///
/// @brief Transfer `size` bytes between `data` and `socket`, in as many partial reads/writes as needed.
/// @param[in] socket File descriptor to read from or write to.
/// @param[in, out] data Buffer that is sent (write) or filled (read).
/// @param[in] size Number of bytes to transfer.
/// @param[in] direction #WRITE_TO_SOCKET to send, anything else to receive.
/// @param[in] remaining_timeout Deadline checked after every transfer step.
/// @returns int
///      - A positive value (the byte count of the last transfer step) when all bytes were transferred.
///      - 0 when `size` is 0 (nothing is done and the descriptor is left open).
///      - -1 on failure: I/O error, socket not readable in time, peer closed the connection, or timeout.
/// @note On every failure the descriptor is closed here. `socket` is passed by value, so the caller's own
///       copy still holds the old number and must be invalidated by the caller (and must not be closed again).
///
int
exchange_communication_data_with_service(
    int socket,
    void *data,
    uint32_t size,
    ngx_cp_comm_direction_e direction,
    struct timeval *remaining_timeout)
{
    int res = 0;
    ngx_int_t retry;
    // `remaining_size` and `cur_data_ptr` are used to keep track of where we are in the memory.
    // This allows us to read to\write from the socket in parts (if we need to).
    int remaining_size = size;
    char *cur_data_ptr = data;

    while (remaining_size > 0) {
        if (direction == WRITE_TO_SOCKET) {
            res = write(socket, (void *)cur_data_ptr, remaining_size);
        } else {
            // The read operation must not block the attachment indefinitely.
            // To avoid that we check whether the socket has data to be read prior to the read operation.
            // If the socket doesn't have data to be read from within a reasonable time, we treat this as an error.
            for (retry = 0; retry < 3; retry++) {
                struct pollfd s_poll;
                s_poll.fd = socket;
                s_poll.events = POLLIN;
                s_poll.revents = 0;
                res = poll(&s_poll, 1, 1000);
                if (res > 0 && (s_poll.revents & POLLIN) != 0) break; // Socket is ready to be read from
                res = -1;
            }

            if (res != -1) {
                res = read(socket, (void *)cur_data_ptr, remaining_size);
                if (res == 0) {
                    // End of stream: the peer closed the connection. Retrying would only spin until the timeout.
                    close(socket);
                    write_dbg(DBG_LEVEL_TRACE, "Failed to communicate with the socket, Error: peer closed the connection");
                    return -1;
                }
            }
        }

        // `res` is -1 in case of an error: either write or read functions failed or socket wasn't available.
        if (res < 0) {
            // Keep the error code of the failed call - close() is allowed to overwrite errno.
            int saved_errno = errno;
            close(socket);
            write_dbg(DBG_LEVEL_TRACE, "Failed to communicate with the socket, Error: %s", strerror(saved_errno));
            break;
        }

        remaining_size -= res;
        cur_data_ptr += res;

        // If the operation exceeded the allowed time, treat it as a failure.
        if (is_timeout_reached(remaining_timeout)) {
            close(socket);
            write_dbg(DBG_LEVEL_TRACE, "Reached timeout while communicating with the socket");
            // The descriptor is gone, so the caller must not see the last (positive) byte count as success.
            res = -1;
            break;
        }
    }

    return res;
}
///
/// @brief Initialize socket communication with the service.
/// @details Opens a new unix socket to `shared_verdict_signal_path`, sends the unique id (length first),
///          the process UID and GID, and waits for a one-byte acknowledgement.
///          On every failure `comm_socket` is left at -1, so later `comm_socket < 0` checks see that
///          the channel is down and no stale descriptor number is closed a second time.
/// @returns ngx_int_t
///      - #NGX_OK
///      - #NGX_ERROR
///
static ngx_int_t
init_signaling_socket()
{
    uint8_t initialization_ack;
    int res = 0;
    uint8_t uid_size_to_send = strlen(unique_id);
    struct timeval timeout = get_timeout_val_sec(1);

    // Close the old socket if there was one.
    if (comm_socket > 0) {
        close(comm_socket);
        comm_socket = -1;
    }

    // Setup a new socket
    comm_socket = socket(AF_UNIX, SOCK_STREAM, 0);
    if (comm_socket < 0) {
        write_dbg(DBG_LEVEL_WARNING, "Could not create socket, Error: %s", strerror(errno));
        return NGX_ERROR;
    }

    server.sun_family = AF_UNIX;
    strncpy(server.sun_path, shared_verdict_signal_path, sizeof(server.sun_path) - 1);

    if (connect(comm_socket, (struct sockaddr *)&server, sizeof(struct sockaddr_un)) == -1) {
        // Keep the connect() error - close() is allowed to overwrite errno.
        int connect_errno = errno;
        close(comm_socket);
        comm_socket = -1;
        write_dbg(
            DBG_LEVEL_DEBUG,
            "Could not connect to nano service. Path: %s, Error: %s",
            server.sun_path,
            strerror(connect_errno)
        );
        return NGX_ERROR;
    }

    // Pass the following information to the service (in this order):
    // 1. The length of the unique identifier for this instance.
    // 2. The unique identifier for this instance.
    // 3. The process UID.
    // 4. The process GID.
    // If any of them fail - return an error.
    // A failed exchange has already closed the descriptor, so the failure path below only has to
    // forget the stale descriptor number.
    res = exchange_communication_data_with_service(
        comm_socket,
        &uid_size_to_send,
        sizeof(uid_size_to_send),
        WRITE_TO_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to send unique id size");
        goto communication_failed;
    }

    if (uid_size_to_send == 0) {
        // A zero-length exchange transfers nothing and reports 0, which counts as a failure here.
        // It also leaves the descriptor open, so this is the one place where it is closed explicitly.
        write_dbg(DBG_LEVEL_WARNING, "Failed to send unique id %s", unique_id);
        close(comm_socket);
        goto communication_failed;
    }

    res = exchange_communication_data_with_service(
        comm_socket,
        unique_id,
        uid_size_to_send,
        WRITE_TO_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to send unique id %s", unique_id);
        goto communication_failed;
    }

    res = exchange_communication_data_with_service(
        comm_socket,
        &nginx_user_id,
        sizeof(uint32_t),
        WRITE_TO_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to send nginx user id");
        goto communication_failed;
    }

    res = exchange_communication_data_with_service(
        comm_socket,
        &nginx_group_id,
        sizeof(uint32_t),
        WRITE_TO_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to send nginx group id");
        goto communication_failed;
    }

    // Get an acknowledgement from the service that communication has been established.
    timeout = get_timeout_val_sec(1);
    res = exchange_communication_data_with_service(
        comm_socket,
        &initialization_ack,
        sizeof(initialization_ack),
        READ_FROM_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to read registration ack");
        goto communication_failed;
    }

    write_dbg(DBG_LEVEL_DEBUG, "Successfully connected on client socket %d", comm_socket);
    return NGX_OK;

communication_failed:
    // The descriptor is already closed at this point; only the stale number has to be dropped.
    comm_socket = -1;
    return NGX_ERROR;
}
#define MAX_CONTAINER_LEN 12 // Maximum size for container identifier
///
/// @brief Provide the container identifier of this process, as found in /proc/self/cgroup.
/// @details The identifier is the text that follows the first "docker/" in the file, cut to
///          MAX_CONTAINER_LEN characters. If no such line exists the identifier is an empty string.
///          The file is parsed only once; later calls return the stored result.
/// @param[out] _docker_id Set to point at the internal static buffer holding the identifier.
/// @returns ngx_int_t
///      - #NGX_OK
///      - #NGX_ERROR - the file could not be opened (`_docker_id` is left untouched).
///
ngx_int_t
get_docker_id(char **_docker_id)
{
    // The identifier lives in a static buffer so the file has to be read only once.
    // `already_evaluated` tells whether the buffer already holds the answer.
    static char docker_id[MAX_CONTAINER_LEN + 1];
    static int already_evaluated = 0;
    static const char docker_marker[] = "docker/";
    const char *container_id_file_path = "/proc/self/cgroup";

    if (!already_evaluated) {
        docker_id[0] = '\0';

        FILE *cgroup_file = fopen(container_id_file_path, "r");
        if (cgroup_file == NULL) {
            write_dbg(DBG_LEVEL_WARNING, "Failed to open %s", container_id_file_path);
            return NGX_ERROR;
        }
        write_dbg(DBG_LEVEL_DEBUG, "opened file %s", container_id_file_path);

        // Scan the file line by line until the marker shows up.
        char *current_line = NULL;
        size_t line_capacity = 0;
        while (getline(&current_line, &line_capacity, cgroup_file) != -1) {
            char *marker_pos = strstr(current_line, docker_marker);
            if (marker_pos != NULL) {
                // The identifier starts right after the marker.
                snprintf(docker_id, sizeof(docker_id), "%s", marker_pos + strlen(docker_marker));
                break;
            }
        }
        free(current_line);
        fclose(cgroup_file);

        // Remember that the answer is known so we won't have to read the file again.
        already_evaluated = 1;
    }

    *_docker_id = docker_id;
    return NGX_OK;
}
///
/// @brief Register this worker with the attachments manager and receive the signaling socket path.
/// @details Connects to SHARED_REGISTRATION_SIGNAL_PATH, sends the attachment type, worker id, number of
///          workers and the container ("family") id, then reads back the path that is stored in
///          `shared_verdict_signal_path`. If the registration socket does not exist (ENOENT) the default
///          SHARED_VERDICT_SIGNAL_PATH is used and the call succeeds.
///          `registration_socket` is always -1 when this function returns.
/// @param[in] num_of_workers Total number of workers, sent to the manager as a single byte.
/// @returns ngx_int_t
///      - #NGX_OK
///      - #NGX_ERROR
///
ngx_int_t
register_to_attachments_manager(ngx_int_t num_of_workers)
{
    uint8_t path_length = 0;
    int res = 0;
    uint8_t family_name_size = 0;
    uint8_t attachment_type = NGINX_ATT_ID;
    uint8_t worker_id = ngx_worker + 1;
    uint8_t workers_amount = num_of_workers;
    char *family_name = NULL;
    int cur_errno = 0; // errno of connect(), saved because close() and logging may overwrite it
    struct timeval timeout = get_timeout_val_sec(1);

    if (get_docker_id(&family_name) == NGX_ERROR) {
        write_dbg(DBG_LEVEL_WARNING, "Could not evaluate family name");
        return NGX_ERROR;
    }
    family_name_size = strlen(family_name);

    // If there was an old socket, close it.
    if (registration_socket > 0) {
        close(registration_socket);
        registration_socket = -1;
    }

    // Connect a new socket.
    registration_socket = socket(AF_UNIX, SOCK_STREAM, 0);
    if (registration_socket < 0) {
        write_dbg(DBG_LEVEL_WARNING, "Could not create socket, Error: %s", strerror(errno));
        return NGX_ERROR;
    }

    server.sun_family = AF_UNIX;
    strncpy(server.sun_path, SHARED_REGISTRATION_SIGNAL_PATH, sizeof(server.sun_path) - 1);

    if (connect(registration_socket, (struct sockaddr *)&server, sizeof(struct sockaddr_un)) == -1) {
        cur_errno = errno;
        close(registration_socket);
        registration_socket = -1;
        write_dbg(
            DBG_LEVEL_DEBUG,
            "Could not connect to nano service. Path: %s, Error: %s, Errno: %d",
            server.sun_path,
            strerror(cur_errno),
            cur_errno
        );
        if (cur_errno == ENOENT) {
            // No attachments manager socket - fall back to the default path (always NUL-terminated).
            snprintf(
                shared_verdict_signal_path,
                sizeof(shared_verdict_signal_path),
                "%s",
                SHARED_VERDICT_SIGNAL_PATH
            );
            return NGX_OK;
        }
        return NGX_ERROR;
    }

    // Send to the attachment manager the following details:
    // 1. The type of the attachment (fixed NGINX).
    // 2. The number of this worker.
    // 3. The total amount of workers.
    // 4. The size of the docker ID.
    // 5. If the docker ID isn't empty (size 0), the docker id itself.
    // If any of these fail - return an error.
    // A failed (non-empty) exchange has already closed the descriptor, so the failure path must not
    // close it a second time - it only forgets the stale descriptor number.
    res = exchange_communication_data_with_service(
        registration_socket,
        &attachment_type,
        sizeof(attachment_type),
        WRITE_TO_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to send attachment type");
        goto registration_failed;
    }

    res = exchange_communication_data_with_service(
        registration_socket,
        &worker_id,
        sizeof(worker_id),
        WRITE_TO_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to send worker ID");
        goto registration_failed;
    }

    res = exchange_communication_data_with_service(
        registration_socket,
        &workers_amount,
        sizeof(workers_amount),
        WRITE_TO_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to send workers amount");
        goto registration_failed;
    }

    res = exchange_communication_data_with_service(
        registration_socket,
        &family_name_size,
        sizeof(family_name_size),
        WRITE_TO_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to send family name size");
        goto registration_failed;
    }

    if (family_name_size > 0) {
        res = exchange_communication_data_with_service(
            registration_socket,
            family_name,
            family_name_size,
            WRITE_TO_SOCKET,
            &timeout
        );
        if (res <= 0) {
            write_dbg(DBG_LEVEL_WARNING, "Failed to send family name");
            goto registration_failed;
        }
    }

    // Read from the attachment manager:
    // 1. The length of signal path.
    // 2. The signal path itself.
    // If that fails - return an error.
    timeout = get_timeout_val_sec(1);
    res = exchange_communication_data_with_service(
        registration_socket,
        &path_length,
        sizeof(path_length),
        READ_FROM_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to read path length");
        goto registration_failed;
    }

    // The length arrives from the peer as a byte (0..255) while the destination holds fewer bytes.
    // Reject anything that is empty or would not leave room for the terminating NUL.
    if (path_length == 0 || path_length >= sizeof(shared_verdict_signal_path)) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to read socket path, invalid path length: %d", (int)path_length);
        // Nothing failed on the socket itself, so it is still open and has to be closed here.
        close(registration_socket);
        goto registration_failed;
    }

    res = exchange_communication_data_with_service(
        registration_socket,
        shared_verdict_signal_path,
        path_length,
        READ_FROM_SOCKET,
        &timeout
    );
    if (res <= 0) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to read socket path");
        goto registration_failed;
    }

    // Successfully got the shared communication path - add null termination and exit.
    shared_verdict_signal_path[path_length] = '\0';

    write_dbg(
        DBG_LEVEL_DEBUG,
        "Successfully registered on client. socket: %d, instance ID: %d, instances amount: %d, received path: %s",
        registration_socket,
        (int32_t)worker_id,
        (int32_t)workers_amount,
        shared_verdict_signal_path
    );

    close(registration_socket);
    registration_socket = -1;
    return NGX_OK;

registration_failed:
    // The descriptor is already closed at this point; only the stale number has to be dropped.
    registration_socket = -1;
    return NGX_ERROR;
}
///
/// @brief Accessor for the unique identifier of this instance.
/// @returns Pointer to the module's `unique_id` buffer (an empty string until the identifier is set).
///
const char *
get_unique_id()
{
    return &unique_id[0];
}
///
/// @brief Build the unique identifier of this instance, if it has not been built yet.
/// @details The identifier is "<container id>_<worker number>" when /proc/self/cgroup contains a
///          "docker/" entry, and just "<worker number>" otherwise. The worker number is `ngx_worker + 1`.
///          The write into `unique_id` is bounded by the size of that buffer.
/// @returns ngx_int_t
///      - #NGX_OK
///      - #NGX_ERROR - /proc/self/cgroup could not be opened.
///
static ngx_int_t
set_unique_id()
{
    // A true compile-time constant, so the scratch buffer below is a plain array and not a VLA.
    enum { max_container_id_len = 12 };
    int is_container_env = 0;
    const char *container_id_file_path = "/proc/self/cgroup";

    // Already set - nothing to do.
    if (strlen(unique_id) > 0) return NGX_OK;

    FILE *file = fopen(container_id_file_path, "r");
    if (file == NULL) {
        write_dbg(DBG_LEVEL_WARNING, "Failed to open %s", container_id_file_path);
        return NGX_ERROR;
    }

    char *line = NULL;
    char docker_id[max_container_id_len + 1];
    memset(docker_id, '\0', sizeof(docker_id));
    size_t len = 0;
    while (getline(&line, &len, file) != -1) {
        char *docker_ptr = strstr(line, "docker/");
        if (docker_ptr == NULL) continue;

        // The container id follows the marker; keep at most `max_container_id_len` characters of it.
        is_container_env = 1;
        docker_ptr += strlen("docker/");
        snprintf(docker_id, sizeof(docker_id), "%s", docker_ptr);
        break;
    }
    free(line);
    fclose(file);

    long unsigned int ngx_worker_id = ngx_worker + 1;
    // Bound the write by the capacity of the destination, not by the expected length of the text.
    if (is_container_env) {
        snprintf(unique_id, sizeof(unique_id), "%s_%lu", docker_id, ngx_worker_id);
    } else {
        snprintf(unique_id, sizeof(unique_id), "%lu", ngx_worker_id);
    }

    write_dbg(DBG_LEVEL_INFO, "Successfully set attachment's unique_id: '%s'", unique_id);
    return NGX_OK;
}
///
/// @brief Bring up everything the attachment needs in order to talk to the nano service.
/// @details Steps, each performed only if still needed:
///          1. Set the unique identifier.
///          2. Register with the attachments manager.
///          3. Open the signaling socket to the service.
///          4. Load the configuration.
///          5. Initialize the IPC channel, the static resources and the compression debug printing.
///          The number of consecutive IPC initialization failures is kept across calls; after
///          10 failures the whole communication is restarted.
/// @param[in] request The request that triggered the initialization; passed to helpers that need it.
/// @returns ngx_int_t
///      - #NGX_OK
///      - #NGX_ERROR
///      - #NGX_ABORT - the registration socket path does not exist.
///
ngx_int_t
ngx_cp_attachment_init_process(ngx_http_request_t *request)
{
    // Retry budget for IPC initialization. It is static so that it survives between calls -
    // a plain local would be reset on every call and the restart below could never trigger.
    enum { max_ipc_init_retries = 10 };
    static int remaining_ipc_retries = max_ipc_init_retries;

    ngx_pool_t *memory_pool;
    nginx_user_id = getuid();
    nginx_group_id = getgid();
    num_of_connection_attempts++;

    // Best-effort attempt to read the configuration before we start.
    init_general_config(SHARED_ATTACHMENT_CONF_PATH);

    // Initializing the various elements of the system (if needed):
    // 1. Get the unique identifier.
    // 2. Register with the attachment manager.
    // 3. Signal to the service to start communication.
    // 4. Make sure that the configuration is up-to-date.
    if (access(SHARED_REGISTRATION_SIGNAL_PATH, F_OK) != 0) {
        write_dbg(DBG_LEVEL_TRACE, "Attachment registration manager is turned off");
        return NGX_ABORT;
    }

    if (strncmp(unique_id, "", 1) == 0) {
        write_dbg(DBG_LEVEL_DEBUG, "Setting attachment's unique id");
        if (set_unique_id() == NGX_ERROR) {
            write_dbg(DBG_LEVEL_INFO, "Failed to set attachment's unique_id");
            return NGX_ERROR;
        }
    }

    // The background thread is still registering - do not compete with it.
    if (get_need_registration() == PENDING) {
        write_dbg(DBG_LEVEL_INFO, "Registration to the Attachments Manager is in process");
        return NGX_ERROR;
    }

    if (get_need_registration() == NOT_REGISTERED) {
        if (register_to_attachments_manager(get_num_of_workers(request)) == NGX_ERROR) {
            write_dbg(DBG_LEVEL_INFO, "Failed to register to Attachments Manager service");
            return NGX_ERROR;
        }
        set_need_registration(REGISTERED);
    }

    if (comm_socket < 0) {
        write_dbg(DBG_LEVEL_DEBUG, "Registering to nano service");
        if (init_signaling_socket() == NGX_ERROR) {
            write_dbg(DBG_LEVEL_DEBUG, "Failed to register to the Nano Service");
            // Force a fresh registration next time, unless a background registration is running.
            pthread_mutex_lock(&mutex);
            if (need_registration != PENDING) {
                need_registration = NOT_REGISTERED;
            }
            pthread_mutex_unlock(&mutex);
            return NGX_ERROR;
        }
    }

    if (init_general_config(SHARED_ATTACHMENT_CONF_PATH) == NGX_ERROR) {
        write_dbg(DBG_LEVEL_INFO, "Failed to initialize attachment's configuration");
        return NGX_ERROR;
    }

    // Initialize the communication channel with the service.
    // If we encounter repeated failures - we will restart the whole communication.
    if (nano_service_ipc == NULL) {
        write_dbg(DBG_LEVEL_INFO, "Initializing IPC channel");
        nano_service_ipc = initIpc(
            unique_id,
            nginx_user_id,
            nginx_group_id,
            0,
            num_of_nginx_ipc_elements,
            write_dbg_impl
        );
        if (nano_service_ipc == NULL) {
            if (remaining_ipc_retries-- == 0) {
                // Budget exhausted: start a new budget and rebuild the communication from scratch.
                remaining_ipc_retries = max_ipc_init_retries;
                restart_communication(request);
            }
            write_dbg(DBG_LEVEL_INFO, "Failed to initialize IPC with nano service");
            return NGX_ERROR;
        }
    }
    // The IPC channel is up - only consecutive failures count towards a restart.
    remaining_ipc_retries = max_ipc_init_retries;

    // Initialize internal resources.
    if (!is_static_resources_table_initialized()) {
        memory_pool = get_memory_pool();
        if (memory_pool == NULL) {
            write_dbg(DBG_LEVEL_WARNING, "Cannot initialize static resources. No memory pool has been allocated.");
            return NGX_ERROR;
        }
        write_dbg(DBG_LEVEL_DEBUG, "Initializing static resources");
        if (init_static_resources(memory_pool) != NGX_OK) {
            write_dbg(DBG_LEVEL_WARNING, "Failed to initialize static resources");
            return NGX_ERROR;
        }
    }

    if (!is_compression_debug_printing_initialized()) {
        write_dbg(DBG_LEVEL_DEBUG, "Initializing compression debug message printing");
        initialize_compression_debug_printing();
    }

    // we want to indicate about successful registration only once in default level
    write_dbg(
        dbg_is_needed ? DBG_LEVEL_DEBUG : DBG_LEVEL_INFO,
        "NGINX attachment (UID='%s') successfully registered to nano service after %d attempts.",
        unique_id,
        num_of_connection_attempts
    );
    dbg_is_needed = 1;
    num_of_connection_attempts = 0;

    return NGX_OK;
}
///
/// @brief Tear down the IPC channel and rebuild the communication with the nano service.
/// @details The signaling socket is re-opened first. If that fails, the attachment registers again
///          with the attachments manager and retries the signaling socket once. Finally a new IPC
///          channel is created.
/// @param[in] request The current request; used to find the number of workers when re-registering.
/// @returns int
///      - 0 on success.
///      - -1 when registration or the signaling socket could not be set up.
///      - -2 when the IPC channel could not be created.
///
int
restart_communication(ngx_http_request_t *request)
{
    write_dbg(DBG_LEVEL_TRACE, "Restarting communication channels with nano service");

    // Drop the current IPC channel, if there is one.
    if (nano_service_ipc != NULL) {
        destroyIpc(nano_service_ipc, 0);
        nano_service_ipc = NULL;
    }

    const ngx_int_t first_attempt = init_signaling_socket();
    if (first_attempt == NGX_ERROR) {
        // The path may be outdated - ask the attachments manager again and retry once.
        const ngx_int_t registration = register_to_attachments_manager(get_num_of_workers(request));
        if (registration == NGX_ERROR) {
            write_dbg(DBG_LEVEL_DEBUG, "Failed to register to Attachments Manager service");
            return -1;
        }

        const ngx_int_t second_attempt = init_signaling_socket();
        if (second_attempt == NGX_ERROR) {
            return -1;
        }
    }

    nano_service_ipc = initIpc(
        unique_id,
        nginx_user_id,
        nginx_group_id,
        0,
        num_of_nginx_ipc_elements,
        write_dbg_impl
    );

    return (nano_service_ipc == NULL) ? -2 : 0;
}
///
/// @brief Close the signaling socket, destroy the IPC channel and mark the attachment as not registered.
///
void
disconnect_communication()
{
    // Signaling socket.
    if (comm_socket > 0) {
        close(comm_socket);
        comm_socket = -1;
    }

    // IPC channel.
    if (nano_service_ipc != NULL) {
        destroyIpc(nano_service_ipc, 0);
        nano_service_ipc = NULL;
    }

    // A fresh registration is required before the next connection.
    set_need_registration(NOT_REGISTERED);
}
///
/// @brief Check the IPC channel for corruption and disconnect if it is corrupted.
/// @returns ngx_int_t
///      - #NGX_OK - there is no IPC channel, or it is not reported as corrupted.
///      - #NGX_ERROR - corruption was reported and the communication was disconnected.
///
ngx_int_t
handle_shmem_corruption()
{
    // The corruption check is evaluated only when an IPC channel exists.
    if (nano_service_ipc != NULL && isCorruptedShmem(nano_service_ipc, 0)) {
        write_dbg(DBG_LEVEL_WARNING, "Shared memory is corrupted! restarting communication");
        disconnect_communication();
        return NGX_ERROR;
    }

    return NGX_OK;
}
///
/// @brief Tell whether both the IPC channel and the signaling socket are available.
/// @returns 1 when `nano_service_ipc` is set and `comm_socket` is positive, 0 otherwise.
///
ngx_int_t
isIpcReady()
{
    if (nano_service_ipc == NULL) {
        return 0;
    }

    return comm_socket > 0;
}