From e91c4bd28b251f97f443e06c42a3037ab39f3fee Mon Sep 17 00:00:00 2001 From: Daniel Eisenberg Date: Mon, 18 Aug 2025 20:52:30 +0300 Subject: [PATCH] add envoy 1.34 --- attachments/envoy/1.34/CMakeLists.txt | 33 ++ attachments/envoy/1.34/build_template | 13 + attachments/envoy/1.34/config.go | 282 +++++++++++++++ attachments/envoy/1.34/filter.go | 495 ++++++++++++++++++++++++++ attachments/envoy/1.34/go.sum | 23 ++ attachments/envoy/1.34/utils.go | 108 ++++++ attachments/envoy/CMakeLists.txt | 3 +- 7 files changed, 956 insertions(+), 1 deletion(-) create mode 100755 attachments/envoy/1.34/CMakeLists.txt create mode 100755 attachments/envoy/1.34/build_template create mode 100755 attachments/envoy/1.34/config.go create mode 100755 attachments/envoy/1.34/filter.go create mode 100755 attachments/envoy/1.34/go.sum create mode 100755 attachments/envoy/1.34/utils.go diff --git a/attachments/envoy/1.34/CMakeLists.txt b/attachments/envoy/1.34/CMakeLists.txt new file mode 100755 index 0000000..832a6de --- /dev/null +++ b/attachments/envoy/1.34/CMakeLists.txt @@ -0,0 +1,33 @@ +if(CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND ATTACHMENT_TYPE STREQUAL "envoy") + set(ATTACHMENTS_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/core/include/attachments) + set(NANO_ATTACHMENT_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/attachments/nano_attachment) + set(SHMEM_LIBRARY_DIR ${CMAKE_BINARY_DIR}/core/shmem_ipc_2) + set(NANO_ATTACHMENT_LIBRARY_DIR ${CMAKE_BINARY_DIR}/attachments/nano_attachment) + set(NANO_ATTACHMENT_UTIL_LIBRARY_DIR ${CMAKE_BINARY_DIR}/attachments/nano_attachment/nano_attachment_util) + set(LIBRARIES "-lnano_attachment -lnano_attachment_util -lshmem_ipc_2") + set(ENVOY_ATTACHMENT_DIR ${CMAKE_CURRENT_SOURCE_DIR}) + + get_filename_component(CURRENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} NAME) + + # Configure the build.sh script from the template + configure_file( + ${PROJECT_SOURCE_DIR}/attachments/envoy/${CURRENT_DIR}/build_template + ${CMAKE_BINARY_DIR}/attachments/envoy/${CURRENT_DIR}/build.sh + @ONLY + ) + + # Define a custom command to run the bash script + add_custom_target( + envoy_attachment${CURRENT_DIR} ALL + COMMAND chmod +x ${CMAKE_BINARY_DIR}/attachments/envoy/${CURRENT_DIR}/build.sh + COMMAND ${CMAKE_BINARY_DIR}/attachments/envoy/${CURRENT_DIR}/build.sh + WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/attachments/envoy + COMMENT "Building envoy attachment ${CURRENT_DIR}" + ) + + add_dependencies(envoy_attachment${CURRENT_DIR} shmem_ipc_2 nano_attachment nano_attachment_util) + + install(FILES libenvoy_attachment.so DESTINATION ${CMAKE_BINARY_DIR}/attachments/envoy/${CURRENT_DIR} PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ) + install(FILES libenvoy_attachment.so DESTINATION envoy/${CURRENT_DIR} PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ) +endif() + diff --git a/attachments/envoy/1.34/build_template b/attachments/envoy/1.34/build_template new file mode 100755 index 0000000..d54bb36 --- /dev/null +++ b/attachments/envoy/1.34/build_template @@ -0,0 +1,13 @@ +#!/bin/bash + +# Set environment variables +SHMEM_LIBRARY_DIR="@SHMEM_LIBRARY_DIR@" +NANO_ATTACHMENT_LIBRARY_DIR="@NANO_ATTACHMENT_LIBRARY_DIR@" +NANO_ATTACHMENT_UTIL_LIBRARY_DIR="@NANO_ATTACHMENT_UTIL_LIBRARY_DIR@" +LIBRARIES="@LIBRARIES@" +ENVOY_ATTACHMENT_DIR="@ENVOY_ATTACHMENT_DIR@" + +cd $ENVOY_ATTACHMENT_DIR + +# Run the go build command +CGO_CFLAGS="-I@ATTACHMENTS_INCLUDE_DIR@ -I@NANO_ATTACHMENT_INCLUDE_DIR@" go build -o ${ENVOY_ATTACHMENT_DIR}/libenvoy_attachment.so -buildmode=c-shared -ldflags="-extldflags '-L${SHMEM_LIBRARY_DIR} -L${NANO_ATTACHMENT_LIBRARY_DIR} -L${NANO_ATTACHMENT_UTIL_LIBRARY_DIR} ${LIBRARIES}'" diff --git a/attachments/envoy/1.34/config.go b/attachments/envoy/1.34/config.go new file mode 100755 index 0000000..a0db7ba --- /dev/null +++ b/attachments/envoy/1.34/config.go @@ -0,0 +1,282 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "runtime" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/go-chi/chi/v5" + + xds "github.com/cncf/xds/go/xds/type/v3" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http" +) + +/* +#include + +unsigned long get_thread_id() { + return (unsigned long)pthread_self(); +} + +#include "nano_attachment_common.h" +#include "nano_initializer.h" +#include "nano_attachment.h" +*/ +import "C" + +const Name = "cp_nano_filter" +const admin_api_server_info = "http://127.0.0.1:%s/server_info" +const keep_alive_interval = 10 * time.Second + +var filter_id atomic.Int64 +var attachments_map map[int]*nano_attachment = nil +var thread_to_attachment_mapping map[int]int = nil +var attachment_to_thread_mapping map[int]int = nil +var attachment_to_filter_request_structs map[int]*filterRequestStructs = nil +var mutex sync.Mutex +var last_keep_alive time.Time + +type nano_attachment C.struct_NanoAttachment + +// EnvoyServerInfo represents the structure of the JSON response from /server_info +type EnvoyServerInfo struct { + Concurrency int `json:"concurrency"` +} + +func getEnvoyConcurrency() int { + concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores") + + if concurrency_method == "numOfCores" { + api.LogWarnf("using number of CPU cores") + return runtime.NumCPU() + } + + var conc_number string + + switch concurrency_method { + case "istioCpuLimit": + conc_number = getEnv("ISTIO_CPU_LIMIT", "-1") + api.LogWarnf("using istioCpuLimit, conc_number %s", conc_number) + case "custom": + conc_number = getEnv("CONCURRENCY_NUMBER", "-1") + api.LogWarnf("using custom concurrency number, conc_number %s", conc_number) + default: + api.LogWarnf("unknown concurrency method %s, using number of CPU cores", concurrency_method) + return runtime.NumCPU() + } + + if conc_number == "-1" { + api.LogWarnf("concurrency number is not set as an env variable, using number of CPU cores") + return runtime.NumCPU() + } + + conc_num, err := strconv.Atoi(conc_number) + if err != nil || conc_num <= 0 { + api.LogWarnf("error converting concurrency number %s, using number of CPU cores", conc_number) + return runtime.NumCPU() + } + + return conc_num +} + +func configurationServer() { + r := chi.NewRouter() + + r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { + mutex.Lock() + defer mutex.Unlock() + worker_ids := make([]int, 0) + workersParam := r.URL.Query().Get("workers") + num_of_workers := len(attachments_map) // concurrency + if workersParam == "" { + for i := 0; i < num_of_workers; i++ { + worker_ids = append(worker_ids, i) + } + } else { + workers := strings.Split(workersParam, ",") + for _, worker := range workers { + worker_id, err := strconv.Atoi(worker) + + if worker_id >= num_of_workers { + api.LogWarnf( + "Can not load configuration of invalid worker ID %d. worker ID should be lower than: %d", + worker_id, + num_of_workers) + } + + if err != nil || worker_id >= num_of_workers { + w.WriteHeader(http.StatusBadRequest) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(fmt.Sprintf(`{"error": "invalid worker ID: %s"}`, worker))) + return + } + worker_ids = append(worker_ids, worker_id) + } + } + + workers_reload_status := make(map[string]string, len(worker_ids)) + res := C.NANO_OK + for _, worker_id := range worker_ids { + worker_reload_res := C.RestartAttachmentConfiguration((*C.NanoAttachment)(attachments_map[worker_id])) + if worker_reload_res == C.NANO_ERROR { + res = C.NANO_ERROR + workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Failed" + continue + } + workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded" + } + + response, err := json.Marshal(workers_reload_status) + if err != nil { + api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error()) + response = []byte(`{"error": "Internal Error"}`) + } + + if res == C.NANO_ERROR || err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + + w.Header().Set("Content-Type", "application/json") + w.Write(response) + }) + + http.ListenAndServe(":8119", r) +} + +func init() { + last_keep_alive = time.Time{} + envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, ConfigFactory, &parser{}) + go configurationServer() +} + +type config struct {} + +type parser struct {} + +func sendKeepAlive() { + for { + attachment_ptr := (*C.NanoAttachment)(attachments_map[0]) + if attachment_ptr == nil { + return + } + + C.SendKeepAlive(attachment_ptr) + time.Sleep(30 * time.Second) + } +} + +func (p *parser) initFilterStructs() *filterRequestStructs { + return &filterRequestStructs { + http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), + http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), + http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), + http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), + http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), + http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), + attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), + } +} + +// Parse the filter configuration. We can call the ConfigCallbackHandler to control the filter's +// behavior +func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) { + conf := &config{} + + if attachments_map != nil { + api.LogInfof("Waf Configuration already loaded") + return conf, nil + } + + num_of_workers := getEnvoyConcurrency() + + configStruct := &xds.TypedStruct{} + if err := any.UnmarshalTo(configStruct); err != nil { + return nil, err + } + + attachments_map = make(map[int]*nano_attachment) + attachment_to_filter_request_structs = make(map[int]*filterRequestStructs) + attachment_to_thread_mapping = make(map[int]int, 0) + thread_to_attachment_mapping = make(map[int]int, 0) + api.LogInfof("Number of worker threds: %d", num_of_workers) + for worker_id := 0; worker_id < num_of_workers; worker_id++ { + + attachment := C.InitNanoAttachment(C.uint8_t(0), C.int(worker_id), C.int(num_of_workers), C.int(C.fileno(C.stdout))) + for attachment == nil { + api.LogWarnf("attachment is nill going to sleep for two seconds and retry") + time.Sleep(2 * time.Second) + attachment = C.InitNanoAttachment(C.uint8_t(0), C.int(worker_id), C.int(num_of_workers), C.int(C.fileno(C.stdout))) + } + + //mutex.Lock() + attachments_map[worker_id] = (*nano_attachment)(attachment) + attachment_to_filter_request_structs[worker_id] = p.initFilterStructs() + //mutex.Unlock() + } + + go func (){ + sendKeepAlive() + }() + + return conf, nil +} + +// Merge configuration from the inherited parent configuration +func (p *parser) Merge(parent interface{}, child interface{}) interface{} { + parentConfig := parent.(*config) + + // copy one, do not update parentConfig directly. + newConfig := *parentConfig + return &newConfig +} + +func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter { + conf, ok := c.(*config) + if !ok { + panic("unexpected config type") + } + + worker_thread_id := int(C.get_thread_id()) + api.LogDebugf("worker_thread_id: %d", worker_thread_id) + if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok { + api.LogDebugf("need to add new thread to the map") + map_size := len(attachment_to_thread_mapping) + if map_size < len(attachments_map) { + attachment_to_thread_mapping[map_size] = worker_thread_id + thread_to_attachment_mapping[worker_thread_id] = map_size + api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping)) + api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping) + api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping) + } else { + panic("unexpected thread id") + } + } + + worker_id := thread_to_attachment_mapping[int(worker_thread_id)] + api.LogDebugf("worker_id: %d", worker_id) + + filter_id.Add(1) + session_id := filter_id.Load() + attachment_ptr := attachments_map[worker_id] + session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id)) + + return &filter{ + callbacks: callbacks, + config: conf, + session_id: session_id, + cp_attachment: attachment_ptr, + session_data: session_data, + request_structs: attachment_to_filter_request_structs[worker_id], + } +} + +func main() {} \ No newline at end of file diff --git a/attachments/envoy/1.34/filter.go b/attachments/envoy/1.34/filter.go new file mode 100755 index 0000000..1a47336 --- /dev/null +++ b/attachments/envoy/1.34/filter.go @@ -0,0 +1,495 @@ +package main + +/* +#include + +unsigned long get_thread_id_2() { + return (unsigned long)pthread_self(); +} + +#include +#include +#include "nano_attachment_common.h" +#include "nano_attachment.h" + +HttpHeaderData* createHttpHeaderDataArray(int size) { + return (HttpHeaderData*)malloc(size * sizeof(HttpHeaderData)); +} + +HttpMetaData* createHttpMetaData() { + return (HttpMetaData*)malloc(sizeof(HttpMetaData)); +} + +void setHeaderElement(HttpHeaderData* arr, int index, nano_str_t key, nano_str_t value) { + if (arr == NULL) { + return; + } + + arr[index].key = key; + arr[index].value = value; +} +*/ +import "C" +import ( + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + + "strconv" + "strings" + "unsafe" +) + +func convertBlockPageToString(block_page C.BlockPageData) string { + block_page_size := block_page.title_prefix.len + + block_page.title.len + + block_page.body_prefix.len + + block_page.body.len + + block_page.uuid_prefix.len + + block_page.uuid.len + + block_page.uuid_suffix.len + + block_page_bytes := make([]byte, block_page_size) + + location := 0 + location = copyToSlice( + block_page_bytes, + unsafe.Pointer(block_page.title_prefix.data), + C.size_t(block_page.title_prefix.len), + location) + + location = copyToSlice( + block_page_bytes, + unsafe.Pointer(block_page.title.data), + C.size_t(block_page.title.len), + location) + + location = copyToSlice( + block_page_bytes, + unsafe.Pointer(block_page.body_prefix.data), + C.size_t(block_page.body_prefix.len), + location) + + location = copyToSlice( + block_page_bytes, + unsafe.Pointer(block_page.body.data), + C.size_t(block_page.body.len), + location) + + location = copyToSlice( + block_page_bytes, + unsafe.Pointer(block_page.uuid_prefix.data), + C.size_t(block_page.uuid_prefix.len), + location) + + location = copyToSlice( + block_page_bytes, + unsafe.Pointer(block_page.uuid.data), + C.size_t(block_page.uuid.len), + location) + + copyToSlice( + block_page_bytes, + unsafe.Pointer(block_page.uuid_suffix.data), + C.size_t(block_page.uuid_suffix.len), + location) + + return string(block_page_bytes) +} + +// The callbacks in the filter, like `DecodeHeaders`, can be implemented on demand. +// Because api.PassThroughStreamFilter provides a default implementation. +type filter struct { + api.PassThroughStreamFilter + + callbacks api.FilterCallbackHandler + path string + config *config + session_id int64 + session_data *C.HttpSessionData + cp_attachment *nano_attachment + request_structs *filterRequestStructs + body_buffer_chunk int +} + +type filterRequestStructs struct { + http_start_data *C.HttpRequestFilterData + http_meta_data *C.HttpMetaData + http_headers *C.HttpHeaders + http_headers_data *C.HttpHeaderData + http_res_headers *C.ResHttpHeaders + http_body_data *C.nano_str_t + attachment_data *C.AttachmentData +} + +func (f *filterRequestStructs) ZeroInitialize() { + if f.http_start_data != nil { + C.memset(unsafe.Pointer(f.http_start_data), 0, C.size_t(unsafe.Sizeof(*f.http_start_data))) + } + if f.http_meta_data != nil { + C.memset(unsafe.Pointer(f.http_meta_data), 0, C.size_t(unsafe.Sizeof(*f.http_meta_data))) + } + if f.http_headers != nil { + C.memset(unsafe.Pointer(f.http_headers), 0, C.size_t(unsafe.Sizeof(*f.http_headers))) + } + if f.http_headers_data != nil { + C.memset(unsafe.Pointer(f.http_headers_data), 0, C.size_t(unsafe.Sizeof(*f.http_headers_data))) + } + if f.attachment_data != nil { + C.memset(unsafe.Pointer(f.attachment_data), 0, C.size_t(unsafe.Sizeof(*f.attachment_data))) + } +} + +func (f *filter) isSessionFinalized() bool { + return C.IsSessionFinalized((*C.NanoAttachment)(f.cp_attachment), (*C.HttpSessionData)(f.session_data)) == 1 +} + +func (f *filter) sendData(data unsafe.Pointer, chunkType C.HttpChunkType) C.AttachmentVerdictResponse { + + attachment_data := f.request_structs.attachment_data + attachment_data.session_id = C.uint32_t(f.session_id) + attachment_data.chunk_type = chunkType // Adjust type as needed + attachment_data.session_data = f.session_data // Ensure `f.session_data` is compatible + attachment_data.data = C.DataBuffer(data) // Ensure `data` is compatible with `C.DataBuffer` + + return C.SendDataNanoAttachment((*C.NanoAttachment)(f.cp_attachment), attachment_data) +} + +func (f *filter) handleCustomResponse(verdict_response *C.AttachmentVerdictResponse) api.StatusType { + if verdict_response.web_response_data.web_response_type == C.RESPONSE_CODE_ONLY { + response_code := C.GetResponseCode((*C.AttachmentVerdictResponse)(verdict_response)) + return f.sendLocalReplyInternal(int(response_code), "", nil) + } + + if verdict_response.web_response_data.web_response_type == C.CUSTOM_WEB_RESPONSE { + headers := map[string][]string{ + "Content-Type": []string{"text/html"}, + } + block_page_parts := C.GetBlockPage( + (*C.NanoAttachment)(f.cp_attachment), + (*C.HttpSessionData)(f.session_data), + (*C.AttachmentVerdictResponse)(verdict_response)) + return f.sendLocalReplyInternal(int(block_page_parts.response_code), convertBlockPageToString(block_page_parts), headers) + } + + redirect_data := C.GetRedirectPage( + (*C.NanoAttachment)(f.cp_attachment), + (*C.HttpSessionData)(f.session_data), + (*C.AttachmentVerdictResponse)(verdict_response)) + redirect_location := redirect_data.redirect_location + + redirect_location_slice := unsafe.Slice((*byte)(unsafe.Pointer(redirect_location.data)), redirect_location.len) + headers := map[string][]string{ + "Location": []string{string(redirect_location_slice)}, + } + + return f.sendLocalReplyInternal(307, "", headers) +} + +func (f *filter) finalizeRequest(verdict_response *C.AttachmentVerdictResponse) api.StatusType { + if C.AttachmentVerdict(verdict_response.verdict) == C.ATTACHMENT_VERDICT_DROP { + return f.handleCustomResponse(verdict_response) + } + + return api.Continue +} + +func (f *filter) handleHeaders(header api.HeaderMap) { + const envoy_headers_prefix = "x-envoy" + i := 0 + header.Range(func(key, value string) bool { + if i > 10000 { + return true + } + + api.LogInfof("inserting headers: key %s, value %s", key, value) + + if strings.HasPrefix(key, envoy_headers_prefix) || + key == "x-request-id" || + key == ":method" || + key == ":path" || + key == ":scheme" || + key == "x-forwarded-proto" { + return true + } + + if key == ":authority" { + key = "Host" + } + + key_nano_str := createNanoStrWithoutCopy(key) + value_nano_str := createNanoStrWithoutCopy(value) + C.setHeaderElement((*C.HttpHeaderData)(f.request_structs.http_headers_data), C.int(i), key_nano_str, value_nano_str) + i++ + return true + }) + + http_headers := f.request_structs.http_headers + http_headers.data = f.request_structs.http_headers_data + http_headers.headers_count = C.size_t(i) +} + +func (f *filter) sendBody(buffer api.BufferInstance, is_req bool) C.AttachmentVerdictResponse { + chunk_type := C.HTTP_REQUEST_BODY + if !is_req { + chunk_type = C.HTTP_RESPONSE_BODY + } + + data := buffer.Bytes() + data_len := len(data) + buffer_size := 8 * 1024 + + num_of_buffers := ((data_len - 1) / buffer_size) + 1 + + // TO DO: FIX THIS ASAP + if num_of_buffers > 10000 { + num_of_buffers = 10000 + } + + + for i := 0; i < num_of_buffers; i++ { + nanoStrPtr := (*C.nano_str_t)(unsafe.Pointer(uintptr(unsafe.Pointer(f.request_structs.http_body_data)) + uintptr(i)*unsafe.Sizeof(*f.request_structs.http_body_data))) + nanoStrPtr.data = (*C.uchar)(unsafe.Pointer(&data[i * buffer_size])) + + if i + 1 == num_of_buffers { + nanoStrPtr.len = C.size_t(data_len - (i * buffer_size)) + } else { + nanoStrPtr.len = C.size_t(buffer_size) + } + + } + + http_chunks_array := C.HttpBody{ + data: f.request_structs.http_body_data, + bodies_count: C.size_t(num_of_buffers), + } + + api.LogInfof("sending body data: %+v", http_chunks_array) + return f.sendData(unsafe.Pointer(&http_chunks_array), C.HttpChunkType(chunk_type)) + +} + +func (f *filter) sendStartTransaction(start_transaction_data *C.HttpRequestFilterData) C.AttachmentVerdictResponse { + return f.sendData(unsafe.Pointer(&start_transaction_data), C.HTTP_REQUEST_FILTER) +} + +func (f *filter) handleStartTransaction(header api.RequestHeaderMap) { + stream_info := f.callbacks.StreamInfo() + + ip_location := 0 + port_location := 1 + + listening_address := stream_info.DownstreamLocalAddress() + listening_address_arr := strings.Split(listening_address, ":") + listening_port, _ := strconv.Atoi(listening_address_arr[port_location]) + + client_address := stream_info.DownstreamRemoteAddress() + client_addr_arr := strings.Split(client_address, ":") + client_port, _ := strconv.Atoi(client_addr_arr[port_location]) + + host := strings.Split(header.Host(), ":")[0] + + protocol, _ := stream_info.Protocol() + + // init start transaction struct + meta_data := f.request_structs.http_meta_data + meta_data.http_protocol = createNanoStr(protocol) + meta_data.method_name = createNanoStr(header.Method()) + meta_data.host = createNanoStr(host) + meta_data.listening_ip = createNanoStr(listening_address_arr[ip_location]) + meta_data.listening_port = C.uint16_t(listening_port) + meta_data.uri = createNanoStr(header.Path()) + meta_data.client_ip = createNanoStr(client_addr_arr[ip_location]) + meta_data.client_port = C.uint16_t(client_port) +} + +func (f *filter) sendLocalReplyInternal(ret_code int, custom_response string, headers map[string][]string) api.StatusType { + f.callbacks.DecoderFilterCallbacks().SendLocalReply(ret_code, custom_response, headers, 0, "") + return api.LocalReply +} + +func (f *filter) endInspectionPart(chunk_type C.HttpChunkType) api.StatusType { + api.LogInfof("Ending inspection for current chunk") + res := f.sendData(nil, chunk_type) + + if C.AttachmentVerdict(res.verdict) != C.ATTACHMENT_VERDICT_INSPECT { + api.LogInfof("got final verict: %v", res.verdict) + return f.finalizeRequest(&res) + } + + return api.Continue +} + +// Callbacks which are called in request path +// The endStream is true if the request doesn't have body +func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.StatusType { + ret := api.Continue + + defer RecoverPanic(&ret) + + if f.isSessionFinalized() { + api.LogInfof("session has already been inspected, no need for further inspection") + return api.Continue + } + + f.handleStartTransaction(header) + f.handleHeaders(header) + + http_start_data := f.request_structs.http_start_data + http_start_data.meta_data = f.request_structs.http_meta_data + http_start_data.req_headers = f.request_structs.http_headers + http_start_data.contains_body = C.bool(!endStream) + + res := f.sendData(unsafe.Pointer(http_start_data), C.HTTP_REQUEST_FILTER) + if C.AttachmentVerdict(res.verdict) != C.ATTACHMENT_VERDICT_INSPECT { + api.LogInfof("got final verict: %v", res.verdict) + return f.finalizeRequest(&res) + } + + return ret +} + +// DecodeData might be called multiple times during handling the request body. +// The endStream is true when handling the last piece of the body. +func (f *filter) DecodeData(buffer api.BufferInstance, endStream bool) api.StatusType { + ret := api.Continue + + defer RecoverPanic(&ret) + + if f.isSessionFinalized() { + return api.Continue + } + + if endStream && buffer.Len() == 0 { + return f.endInspectionPart(C.HttpChunkType(C.HTTP_REQUEST_END)) + } + + if buffer.Len() == 0 { + return ret + } + + res := f.sendBody(buffer, true) + if C.AttachmentVerdict(res.verdict) != C.ATTACHMENT_VERDICT_INSPECT { + api.LogInfof("got final verict: %v", res.verdict) + return f.finalizeRequest(&res) + } + + if endStream { + return f.endInspectionPart(C.HttpChunkType(C.HTTP_REQUEST_END)) + } + + return ret +} + +// Callbacks which are called in response path +// The endStream is true if the response doesn't have body +func (f *filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api.StatusType { + ret := api.Continue + + defer RecoverPanic(&ret) + + if f.isSessionFinalized() { + return api.Continue + } + + const content_length_key = "content-length" + const status_code_key = ":status" + + + content_length_str, _ := header.Get(content_length_key) + status_code_str, _ := header.Get(status_code_key) + content_length, _ := strconv.Atoi(content_length_str) + status_code, _ := strconv.Atoi(status_code_str) + + f.handleHeaders(header) + res_http_headers := f.request_structs.http_res_headers + res_http_headers.headers = f.request_structs.http_headers + res_http_headers.content_length = C.uint64_t(content_length) + res_http_headers.response_code = C.uint16_t(status_code) + + res := f.sendData(unsafe.Pointer(res_http_headers), C.HTTP_RESPONSE_HEADER) + if C.AttachmentVerdict(res.verdict) != C.ATTACHMENT_VERDICT_INSPECT { + api.LogInfof("got final verict: %v", res.verdict) + return f.finalizeRequest(&res) + } + + if endStream { + return f.endInspectionPart(C.HttpChunkType(C.HTTP_RESPONSE_END)) + } + + return ret +} + +func injectBodyChunk( + curr_modification *C.struct_NanoHttpModificationList, + body_buffer_chunk int, + buffer *api.BufferInstance) { + for curr_modification != nil { + if (int(curr_modification.modification.orig_buff_index) == body_buffer_chunk) { + mod := curr_modification.modification // type: HttpInjectData + modifications := C.GoString(curr_modification.modification_buffer) + new_buffer:= insertAtPosition((*buffer).String(), modifications, int(mod.injection_pos)) + (*buffer).SetString(new_buffer) + } + curr_modification = curr_modification.next + } +} + +// EncodeData might be called multiple times during handling the response body. +// The endStream is true when handling the last piece of the body. +func (f *filter) EncodeData(buffer api.BufferInstance, endStream bool) api.StatusType { + ret := api.Continue + + defer RecoverPanic(&ret) + + if f.isSessionFinalized() { + return api.Continue + } + + if endStream && buffer.Len() == 0 { + return f.endInspectionPart(C.HttpChunkType(C.HTTP_RESPONSE_END)) + } + + if buffer.Len() == 0 { + return ret + } + + res := f.sendBody(buffer, false) + injectBodyChunk(res.modifications, f.body_buffer_chunk, &buffer) + f.body_buffer_chunk++ + if C.AttachmentVerdict(res.verdict) != C.ATTACHMENT_VERDICT_INSPECT { + api.LogInfof("got final verict: %v", res.verdict) + return f.finalizeRequest(&res) + } + + if endStream { + return f.endInspectionPart(C.HttpChunkType(C.HTTP_RESPONSE_END)) + } + + return ret +} + +// ____________NOT IMPLEMENTED AT THE MOMENT____________ +func (f *filter) DecodeTrailers(trailers api.RequestTrailerMap) api.StatusType { + // support suspending & resuming the filter in a background goroutine + return api.Continue +} + +func (f *filter) EncodeTrailers(trailers api.ResponseTrailerMap) api.StatusType { + return api.Continue +} + +// OnLog is called when the HTTP stream is ended on HTTP Connection Manager filter. +func (f *filter) OnLog(api.RequestHeaderMap, api.RequestTrailerMap, api.ResponseHeaderMap, api.ResponseTrailerMap) {} + +// OnLogDownstreamStart is called when HTTP Connection Manager filter receives a new HTTP request +// (required the corresponding access log type is enabled) +func (f *filter) OnLogDownstreamStart(api.RequestHeaderMap) {} + +// OnLogDownstreamPeriodic is called on any HTTP Connection Manager periodic log record +// (required the corresponding access log type is enabled) +func (f *filter) OnLogDownstreamPeriodic(api.RequestHeaderMap, api.RequestTrailerMap, api.ResponseHeaderMap, api.ResponseTrailerMap) {} + +func (f *filter) OnDestroy(reason api.DestroyReason) { + freeHttpMetaDataFields(f.request_structs.http_meta_data) + f.request_structs.ZeroInitialize() + C.FiniSessionData((*C.NanoAttachment)(f.cp_attachment), f.session_data) +} diff --git a/attachments/envoy/1.34/go.sum b/attachments/envoy/1.34/go.sum new file mode 100755 index 0000000..93a51f4 --- /dev/null +++ b/attachments/envoy/1.34/go.sum @@ -0,0 +1,23 @@ +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= +github.com/envoyproxy/envoy v1.34.4 h1:C7icH8oLgy7Fx4A5AaljRrNBoLO7xeyN4XEJhYSlL1U= +github.com/envoyproxy/envoy v1.34.4/go.mod h1:A/vRPuqivdZBAr0NfT3sccV8KtY07B2PyvILAdV0qCU= +github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= +github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= +github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= +github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 h1:rcS6EyEaoCO52hQDupoSfrxI3R6C2Tq741is7X8OvnM= +google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917/go.mod h1:CmlNWB9lSezaYELKS5Ym1r44VrrbPUa7JTvw+6MbpJ0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 h1:6G8oQ016D88m1xAKljMlBOOGWDZkes4kMhgGFlf8WcQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917/go.mod h1:xtjpI3tXFPP051KaWnhvxkiubL/6dJ18vLVf7q2pTOU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= diff --git a/attachments/envoy/1.34/utils.go b/attachments/envoy/1.34/utils.go new file mode 100755 index 0000000..0d0c90e --- /dev/null +++ b/attachments/envoy/1.34/utils.go @@ -0,0 +1,108 @@ +package main + +/* +#include +#include "nano_attachment_common.h" +#include "nano_attachment.h" +#include +*/ +import "C" +import ( + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + + "reflect" + "unsafe" + "os" + "runtime" + "strconv" +) +func getEnv(key, defaultValue string) string { + value, exists := os.LookupEnv(key) + if !exists { + return defaultValue + } + return value +} + +var INSERT_POS_ERR_MSG = "Got invalid insertion position, will not insert." + +func copyToSlice(dest []byte, src unsafe.Pointer, size C.size_t, location int) int { + C.memcpy(unsafe.Pointer(&dest[location]), src, size) + return location + int(size) +} + +func newNanoStr(data []byte) *C.nano_str_t { + nanoStr := (*C.nano_str_t)(C.malloc(C.size_t(unsafe.Sizeof(C.nano_str_t{})))) + if nanoStr == nil { + panic("failed to allocate memory for nano_str_t struct") + } + + nanoStr.len = C.size_t(len(data)) + return nanoStr +} + +func insertAtPosition(buff string, injection string, pos int) string { + if pos < 0 || pos > len(buff) { + api.LogDebugf( + INSERT_POS_ERR_MSG + + " Position: " + + strconv.Itoa(pos) + + ", buffer's lenght: " + + strconv.Itoa(len(buff))) + return buff + } + return_buff := buff[:pos] + injection + buff[pos:] + return return_buff +} + +func createNanoStr(str string) C.nano_str_t { + c_str := C.CString(str) + nanoStr := C.nano_str_t{ + len: C.size_t(len(str)), + data: (*C.uchar)(unsafe.Pointer(c_str)), + } + + return nanoStr +} + +func createNanoStrWithoutCopy(str string) C.nano_str_t { + nanoStr := C.nano_str_t{ + len: C.size_t(len(str)), + data: (*C.uchar)(unsafe.Pointer((*(*reflect.StringHeader)(unsafe.Pointer(&str))).Data)), + } + + return nanoStr +} + +func freeNanoStr(str *C.nano_str_t) { + C.free(unsafe.Pointer(str.data)) +} + +func freeHttpMetaDataFields(meta_data *C.HttpMetaData) { + freeNanoStr(&(*meta_data).http_protocol) + freeNanoStr(&(*meta_data).method_name) + freeNanoStr(&(*meta_data).host) + freeNanoStr(&(*meta_data).listening_ip) + freeNanoStr(&(*meta_data).uri) + freeNanoStr(&(*meta_data).client_ip) +} + +func freeHeaders(header_arr *C.HttpHeaderData, header_slice []C.HttpHeaderData) { + C.free(unsafe.Pointer(header_arr)) + + for _, header := range header_slice { + freeNanoStr(&(header.key)) + freeNanoStr(&(header.value)) + } +} + +func RecoverPanic(ret *api.StatusType) { + if e := recover(); e != nil { + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + api.LogErrorf("http: panic serving: %v\n%s", e, buf) + + *ret = api.Continue + } +} diff --git a/attachments/envoy/CMakeLists.txt b/attachments/envoy/CMakeLists.txt index 0c68476..d0f9274 100755 --- a/attachments/envoy/CMakeLists.txt +++ b/attachments/envoy/CMakeLists.txt @@ -1,3 +1,4 @@ add_subdirectory(1.31) add_subdirectory(1.32) -add_subdirectory(1.33) \ No newline at end of file +add_subdirectory(1.33) +add_subdirectory(1.34)