From 206eb02bd19f2e856bf932645a43d1573ebc4d57 Mon Sep 17 00:00:00 2001 From: b1v1r Date: Fri, 24 Jul 2009 05:04:55 +0000 Subject: [PATCH] Allow mlogc to periodically flush memory pools (MODSEC-68). --- CHANGES | 2 + apache2/mlogc-src/mlogc.c | 99 +++++++++++++++++++++++++++------------ 2 files changed, 72 insertions(+), 29 deletions(-) diff --git a/CHANGES b/CHANGES index 9537ef53..3b30bb90 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,8 @@ 23 July 2009 - 2.5.10-dev1 -------------------------- + * Allow mlogc to periodically flush memory pools. + * Using nolog,auditlog will now log the "Message:" line to the auditlog, but nothing to the error log. Prior versions dropped the "Message:" line from both logs. To do this now, just use "nolog" or "nolog,noauditlog". diff --git a/apache2/mlogc-src/mlogc.c b/apache2/mlogc-src/mlogc.c index b7aacdd0..de540cc5 100644 --- a/apache2/mlogc-src/mlogc.c +++ b/apache2/mlogc-src/mlogc.c @@ -154,6 +154,8 @@ int max_connections = 10; apr_global_mutex_t *gmutex = NULL; apr_thread_mutex_t *mutex = NULL; apr_pool_t *pool = NULL; +apr_pool_t *thread_pool = NULL; +apr_pool_t *recv_pool = NULL; apr_array_header_t *queue = NULL; const char *queue_path = NULL; /* apr_time_t queue_time = 0; */ @@ -176,7 +178,7 @@ int opt_force = 0; /* -- Code -- */ -static char *_log_escape(const char *input, apr_size_t input_len) +static char *_log_escape(apr_pool_t *mp, const char *input, apr_size_t input_len) { static const char c2x_table[] = "0123456789abcdef"; unsigned char *d = NULL; @@ -185,7 +187,7 @@ static char *_log_escape(const char *input, apr_size_t input_len) if (input == NULL) return NULL; - ret = apr_palloc(pool, input_len * 4 + 1); + ret = apr_palloc(mp, input_len * 4 + 1); if (ret == NULL) return NULL; d = (unsigned char *)ret; @@ -565,6 +567,7 @@ static void transaction_checkpoint(void) apr_hash_index_t *hi = NULL; char msg[256]; int i; + apr_pool_t *cpool; apr_snprintf(new_queue_path, sizeof(new_queue_path), "%s.new", queue_path); apr_snprintf(old_queue_path, sizeof(old_queue_path), "%s.old", queue_path); @@ -584,12 +587,15 @@ static void transaction_checkpoint(void) error_log(LOG_DEBUG, NULL, "Checkpoint started."); + apr_pool_create(&cpool, NULL); + /* Dump active entries into a new queue file. */ if (apr_file_open(&queue_fd, new_queue_path, APR_WRITE | APR_CREATE - | APR_EXCL | APR_TRUNCATE | APR_FILE_NOCLEANUP, APR_OS_DEFAULT, pool) != APR_SUCCESS) + | APR_EXCL | APR_TRUNCATE | APR_FILE_NOCLEANUP, APR_OS_DEFAULT, cpool) != APR_SUCCESS) { error_log(LOG_ERROR, NULL, "Failed to create file: %s", new_queue_path); error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex."); + apr_pool_destroy(cpool); apr_global_mutex_unlock(gmutex); return; } @@ -622,14 +628,16 @@ static void transaction_checkpoint(void) apr_file_close(queue_fd); /* Switch the files and truncate the transaction log file. */ - apr_file_remove(old_queue_path, pool); - apr_file_rename(queue_path, old_queue_path, pool); - apr_file_rename(new_queue_path, queue_path, pool); - apr_file_remove(old_queue_path, pool); + apr_file_remove(old_queue_path, cpool); + apr_file_rename(queue_path, old_queue_path, cpool); + apr_file_rename(new_queue_path, queue_path, cpool); + apr_file_remove(old_queue_path, cpool); apr_file_trunc(transaction_log_fd, 0); error_log(LOG_DEBUG, NULL, "Checkpoint completed."); + apr_pool_destroy(cpool); + /* Unlock and exit. */ error_log(LOG_DEBUG, NULL, "Checkpoint unlocking global mutex."); apr_global_mutex_unlock(gmutex); @@ -1016,6 +1024,7 @@ size_t curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) int curl_debugfunction(CURL *curl, curl_infotype infotype, char *data, size_t datalen, void *ourdata) { apr_size_t i, effectivelen; + apr_thread_t *thread = (apr_thread_t *)ourdata; if (error_log_level < LOG_DEBUG) return 0; @@ -1029,21 +1038,21 @@ int curl_debugfunction(CURL *curl, curl_infotype infotype, char *data, size_t da if (error_log_level >= LOG_DEBUG) { if (infotype == CURLINFO_TEXT) { - error_log(LOG_DEBUG, ourdata, "CURL: %s", _log_escape(data, effectivelen)); + error_log(LOG_DEBUG, thread, "CURL: %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); } } if (error_log_level >= LOG_DEBUG2) { if (infotype == CURLINFO_HEADER_IN) { - error_log(LOG_DEBUG2, ourdata, "CURL: HEADER_IN %s", _log_escape(data, effectivelen)); + error_log(LOG_DEBUG2, thread, "CURL: HEADER_IN %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); } else if (infotype == CURLINFO_HEADER_OUT) { - error_log(LOG_DEBUG2, ourdata, "CURL: HEADER_OUT %s", _log_escape(data, effectivelen)); + error_log(LOG_DEBUG2, thread, "CURL: HEADER_OUT %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); } else if (infotype == CURLINFO_DATA_IN) { - error_log(LOG_DEBUG2, ourdata, "CURL: DATA_IN %s", _log_escape(data, effectivelen)); + error_log(LOG_DEBUG2, thread, "CURL: DATA_IN %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); } else if (infotype == CURLINFO_DATA_OUT) { - error_log(LOG_DEBUG2, ourdata, "CURL: DATA_OUT %s", _log_escape(data, effectivelen)); + error_log(LOG_DEBUG2, thread, "CURL: DATA_OUT %s", _log_escape(apr_thread_pool_get(thread), data, effectivelen)); } } @@ -1173,7 +1182,9 @@ static void keep_entries_hack(apr_pool_t *mp, apr_thread_t *thread, const char * return; } - error_log(LOG_DEBUG, thread, "STAT \"%s\" {uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT "}", fn, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime); + if (error_log_level >= LOG_DEBUG) { + error_log(LOG_DEBUG, thread, "STAT \"%s\" {uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT "}", fn, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime); + } if (finfo.mtime != KEEP_ENTRIES_REMOVE_TIME) { error_log(LOG_DEBUG2, thread, "Set mtime: %s", fn); @@ -1219,13 +1230,13 @@ static void * APR_THREAD_FUNC thread_worker(apr_thread_t *thread, void *data) int nodelay = 0; + error_log(LOG_DEBUG, thread, "Worker thread starting."); + /* Each worker uses its own pool to manage memory. To avoid * memory leaks the pool is cleared after each processed * entry. */ - apr_pool_create(&tpool, NULL); - - error_log(LOG_DEBUG, thread, "Worker thread starting."); + apr_pool_create(&tpool, thread_pool); /* Process jobs in a queue until there are no more jobs to process. */ for(;;) { @@ -1303,12 +1314,12 @@ static void * APR_THREAD_FUNC thread_worker(apr_thread_t *thread, void *data) rc = pcre_exec(logline_regex, NULL, entry->line, entry->line_size, 0, 0, capturevector, CAPTUREVECTORSIZE); if (rc == PCRE_ERROR_NOMATCH) { /* No match. */ - error_log(LOG_WARNING, thread, "Invalid entry (failed to match regex): %s", _log_escape(entry->line, entry->line_size)); + error_log(LOG_WARNING, thread, "Invalid entry (failed to match regex): %s", _log_escape(tpool, entry->line, entry->line_size)); take_new = 1; nodelay = 1; } else if (rc < 0) { /* Error condition. */ - error_log(LOG_WARNING, thread, "Invalid entry (PCRE error %d): %s", rc, _log_escape(entry->line, entry->line_size)); + error_log(LOG_WARNING, thread, "Invalid entry (PCRE error %d): %s", rc, _log_escape(tpool, entry->line, entry->line_size)); take_new = 1; nodelay = 1; } @@ -1339,7 +1350,9 @@ static void * APR_THREAD_FUNC thread_worker(apr_thread_t *thread, void *data) char response_buf[STATUSBUF_SIZE]; CURLcode res; - error_log(LOG_DEBUG, thread, "STAT \"%s\" {uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT "}", auditlogentry, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime); + if (error_log_level >= LOG_DEBUG) { + error_log(LOG_DEBUG, thread, "STAT \"%s\" {uid=%d; gid=%d; size=%" APR_OFF_T_FMT "; csize=%" APR_OFF_T_FMT "; atime=%" APR_TIME_T_FMT "; ctime=%" APR_TIME_T_FMT "; mtime=%" APR_TIME_T_FMT "}", auditlogentry, finfo.user, finfo.group, finfo.size, finfo.csize, finfo.atime, finfo.ctime, finfo.mtime); + } /* Initialize the respone buffer with a hidden value */ response_buf[0] = 0; @@ -1561,16 +1574,27 @@ static void create_new_worker(int lock) return; } + /* Cleanup thread pool when idle */ + if (current_workers <= 0) { + if (thread_pool != NULL) { + error_log(LOG_DEBUG, NULL, "Destroying thread_pool."); + apr_pool_destroy(thread_pool); + thread_pool = NULL; + } + error_log(LOG_DEBUG, NULL, "Creating thread_pool."); + apr_pool_create(&thread_pool, NULL); + } + curlptr = (CURL **)apr_array_pop(curl_handles); if (curlptr != NULL) { apr_threadattr_t *thread_attrs; apr_status_t rc; - apr_threadattr_create(&thread_attrs, pool); + apr_threadattr_create(&thread_attrs, thread_pool); apr_threadattr_detach_set(thread_attrs, 1); apr_threadattr_stacksize_set(thread_attrs, 1024); - rc = apr_thread_create(&thread, thread_attrs, thread_worker, *curlptr, pool); + rc = apr_thread_create(&thread, thread_attrs, thread_worker, *curlptr, thread_pool); if (rc != APR_SUCCESS) { if (lock) { error_log(LOG_DEBUG, NULL, "Worker creation unlocking thread mutex."); @@ -1711,6 +1735,8 @@ static void receive_loop(void) { int done = 0; int drop_next = 0; int buffered_events = 0; + int count = 0; + apr_pool_t *tmp_pool; /* Open stdin. */ if (apr_file_open_stdin(&fd_stdin, pool) != APR_SUCCESS) { @@ -1721,11 +1747,15 @@ static void receive_loop(void) { /* Always want this NUL terminated */ buf[PIPE_BUF_SIZE] = '\0'; + apr_pool_create(&tmp_pool, NULL); + /* Loop forever receiving entries from stdin. */ while(!done || (curr < next)) { apr_status_t rc; - error_log(LOG_DEBUG2, NULL, "Internal state: [evnt \"%" APR_SIZE_T_FMT "\"][curr \"%" APR_SIZE_T_FMT "\"][next \"%" APR_SIZE_T_FMT "\"][nbytes \"%" APR_SIZE_T_FMT "\"]", evnt, curr, next, nbytes); + if (error_log_level >= LOG_DEBUG2) { + error_log(LOG_DEBUG2, NULL, "Internal state: [evnt \"%" APR_SIZE_T_FMT "\"][curr \"%" APR_SIZE_T_FMT "\"][next \"%" APR_SIZE_T_FMT "\"][nbytes \"%" APR_SIZE_T_FMT "\"]", evnt, curr, next, nbytes); + } /* If we are not done and have the space, read more */ if (!done && (nbytes > 0)) { @@ -1744,7 +1774,7 @@ static void receive_loop(void) { error_log(LOG_DEBUG, NULL, "Read %" APR_SIZE_T_FMT " bytes from pipe", nbytes); } else { - error_log(LOG_DEBUG2, NULL, "Read %" APR_SIZE_T_FMT " bytes from pipe: `%s'", nbytes, _log_escape((buf + next), nbytes)); + error_log(LOG_DEBUG2, NULL, "Read %" APR_SIZE_T_FMT " bytes from pipe: `%s'", nbytes, _log_escape(tmp_pool, (buf + next), nbytes)); } } @@ -1768,13 +1798,13 @@ static void receive_loop(void) { /* We may have to drop this one if it previously failed */ if (drop_next) { - error_log(LOG_ERROR, NULL, "Dropping remaining portion of failed event: `%s'", _log_escape((buf + evnt), (curr - evnt))); + error_log(LOG_ERROR, NULL, "Dropping remaining portion of failed event: `%s'", _log_escape(tmp_pool, (buf + evnt), (curr - evnt))); drop_next = 0; } else { transaction_log(IN, buf + evnt); error_log(LOG_DEBUG2, NULL, "Received audit log entry (count %lu queue %d workers %d): %s", - entry_counter, queue->nelts, current_workers, _log_escape((buf + evnt), strlen(buf + evnt))); + entry_counter, queue->nelts, current_workers, _log_escape(tmp_pool, (buf + evnt), strlen(buf + evnt))); add_entry(buf + evnt, 1); buffered_events++; } @@ -1783,7 +1813,7 @@ static void receive_loop(void) { evnt = curr = curr + 1; } else { - error_log(LOG_DEBUG2, NULL, "Event buffer contains partial event: `%s'", _log_escape((buf + evnt), (next - evnt))); + error_log(LOG_DEBUG2, NULL, "Event buffer contains partial event: `%s'", _log_escape(tmp_pool, (buf + evnt), (next - evnt))); break; } } @@ -1797,7 +1827,7 @@ static void receive_loop(void) { curr -= evnt; memmove(buf, (buf + evnt), next); - error_log(LOG_DEBUG2, NULL, "Shifted buffer back %" APR_SIZE_T_FMT " and offset %" APR_SIZE_T_FMT " bytes for next read: `%s'", evnt, next, _log_escape(buf, next)); + error_log(LOG_DEBUG2, NULL, "Shifted buffer back %" APR_SIZE_T_FMT " and offset %" APR_SIZE_T_FMT " bytes for next read: `%s'", evnt, next, _log_escape(tmp_pool, buf, next)); evnt = 0; } @@ -1809,10 +1839,10 @@ static void receive_loop(void) { */ if (drop_next) { - error_log(LOG_ERROR, NULL, "Event continuation too large, dropping it as well: `%s'", _log_escape(buf, PIPE_BUF_SIZE)); + error_log(LOG_ERROR, NULL, "Event continuation too large, dropping it as well: `%s'", _log_escape(tmp_pool, buf, PIPE_BUF_SIZE)); } else { - error_log(LOG_ERROR, NULL, "Event too large, dropping event: `%s'", _log_escape(buf, PIPE_BUF_SIZE)); + error_log(LOG_ERROR, NULL, "Event too large, dropping event: `%s'", _log_escape(tmp_pool, buf, PIPE_BUF_SIZE)); } /* Rewind buf and mark that we need to drop up to the next event */ @@ -1821,6 +1851,16 @@ static void receive_loop(void) { } nbytes = PIPE_BUF_SIZE - next; + + if (count++ > 1000) { + count = 0; + error_log(LOG_DEBUG, NULL, "Recycling tmp_pool."); + apr_pool_destroy(tmp_pool); + apr_pool_create(&tmp_pool, NULL); + } + else { + apr_pool_clear(tmp_pool); + } } /* Wait for queue to empty if specified */ @@ -1918,6 +1958,7 @@ int main(int argc, const char * const argv[]) { logc_pid = getpid(); apr_pool_create(&pool, NULL); + apr_pool_create(&recv_pool, NULL); apr_setup_signal_thread(); if (argc < 2) {