From 589aafdd1f28fb96d0be8a5ce1d6a0a78f586ca4 Mon Sep 17 00:00:00 2001 From: wiaamm Date: Tue, 27 May 2025 16:29:25 +0300 Subject: [PATCH] fix config.go file --- attachments/envoy/1.31/config.go | 98 ++++++++++++++++---------------- attachments/envoy/1.32/config.go | 98 ++++++++++++++++---------------- attachments/envoy/1.33/config.go | 98 ++++++++++++++++---------------- 3 files changed, 147 insertions(+), 147 deletions(-) diff --git a/attachments/envoy/1.31/config.go b/attachments/envoy/1.31/config.go index a2c9687..748ee30 100755 --- a/attachments/envoy/1.31/config.go +++ b/attachments/envoy/1.31/config.go @@ -60,7 +60,6 @@ func getEnvoyConcurrency() int { api.LogWarnf("using number of CPU cores") return runtime.NumCPU() } - defer resp.Body.Close() var conc_number string @@ -91,9 +90,9 @@ func getEnvoyConcurrency() int { } func configurationServer() { - r := chi.NewRouter() + r := chi.NewRouter() - r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { + r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { mutex.Lock() defer mutex.Unlock() worker_ids := make([]int, 0) @@ -137,7 +136,7 @@ func configurationServer() { workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded" } - response, err := json.Marshal(workers_reload_status) + response, err := json.Marshal(workers_reload_status) if err != nil { api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error()) response = []byte(`{"error": "Internal Error"}`) @@ -147,23 +146,22 @@ func configurationServer() { w.WriteHeader(http.StatusInternalServerError) } - w.Header().Set("Content-Type", "application/json") - w.Write(response) - }) + w.Header().Set("Content-Type", "application/json") + w.Write(response) + }) - http.ListenAndServe(":8119", r) + http.ListenAndServe(":8119", r) } func init() { last_keep_alive = time.Time{} - envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, ConfigFactory, &parser{}) - + envoyHttp.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{}) go configurationServer() } -type config struct{} +type config struct {} -type parser struct{} +type parser struct {} func sendKeepAlive() { for { @@ -178,14 +176,14 @@ func sendKeepAlive() { } func (p *parser) initFilterStructs() *filterRequestStructs { - return &filterRequestStructs{ - http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), - http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), - http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), - http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), - http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), - http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), - attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), + return &filterRequestStructs { + http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), + http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), + http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), + http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), + http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), + http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), + attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), } } @@ -226,7 +224,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int //mutex.Unlock() } - go func() { + go func (){ sendKeepAlive() }() @@ -242,43 +240,45 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} { return &newConfig } -func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter { +func ConfigFactory(c interface{}) api.StreamFilterFactory { conf, ok := c.(*config) if !ok { panic("unexpected config type") } - worker_thread_id := int(C.get_thread_id()) - api.LogDebugf("worker_thread_id: %d", worker_thread_id) - if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok { - api.LogDebugf("need to add new thread to the map") - map_size := len(attachment_to_thread_mapping) - if map_size < len(attachments_map) { - attachment_to_thread_mapping[map_size] = worker_thread_id - thread_to_attachment_mapping[worker_thread_id] = map_size - api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping)) - api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping) - api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping) - } else { - panic("unexpected thread id") + return func(callbacks api.FilterCallbackHandler) api.StreamFilter { + worker_thread_id := int(C.get_thread_id()) + api.LogDebugf("worker_thread_id: %d", worker_thread_id) + if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok { + api.LogDebugf("need to add new thread to the map") + map_size := len(attachment_to_thread_mapping) + if map_size < len(attachments_map) { + attachment_to_thread_mapping[map_size] = worker_thread_id + thread_to_attachment_mapping[worker_thread_id] = map_size + api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping)) + api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping) + api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping) + } else { + panic("unexpected thread id") + } } - } - worker_id := thread_to_attachment_mapping[int(worker_thread_id)] - api.LogDebugf("worker_id: %d", worker_id) + worker_id := thread_to_attachment_mapping[int(worker_thread_id)] + api.LogDebugf("worker_id: %d", worker_id) - filter_id.Add(1) - session_id := filter_id.Load() - attachment_ptr := attachments_map[worker_id] - session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id)) + filter_id.Add(1) + session_id := filter_id.Load() + attachment_ptr := attachments_map[worker_id] + session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id)) - return &filter{ - callbacks: callbacks, - config: conf, - session_id: session_id, - cp_attachment: attachment_ptr, - session_data: session_data, - request_structs: attachment_to_filter_request_structs[worker_id], + return &filter{ + callbacks: callbacks, + config: conf, + session_id: session_id, + cp_attachment: attachment_ptr, + session_data: session_data, + request_structs: attachment_to_filter_request_structs[worker_id], + } } } diff --git a/attachments/envoy/1.32/config.go b/attachments/envoy/1.32/config.go index a2c9687..748ee30 100755 --- a/attachments/envoy/1.32/config.go +++ b/attachments/envoy/1.32/config.go @@ -60,7 +60,6 @@ func getEnvoyConcurrency() int { api.LogWarnf("using number of CPU cores") return runtime.NumCPU() } - defer resp.Body.Close() var conc_number string @@ -91,9 +90,9 @@ func getEnvoyConcurrency() int { } func configurationServer() { - r := chi.NewRouter() + r := chi.NewRouter() - r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { + r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { mutex.Lock() defer mutex.Unlock() worker_ids := make([]int, 0) @@ -137,7 +136,7 @@ func configurationServer() { workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded" } - response, err := json.Marshal(workers_reload_status) + response, err := json.Marshal(workers_reload_status) if err != nil { api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error()) response = []byte(`{"error": "Internal Error"}`) @@ -147,23 +146,22 @@ func configurationServer() { w.WriteHeader(http.StatusInternalServerError) } - w.Header().Set("Content-Type", "application/json") - w.Write(response) - }) + w.Header().Set("Content-Type", "application/json") + w.Write(response) + }) - http.ListenAndServe(":8119", r) + http.ListenAndServe(":8119", r) } func init() { last_keep_alive = time.Time{} - envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, ConfigFactory, &parser{}) - + envoyHttp.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{}) go configurationServer() } -type config struct{} +type config struct {} -type parser struct{} +type parser struct {} func sendKeepAlive() { for { @@ -178,14 +176,14 @@ func sendKeepAlive() { } func (p *parser) initFilterStructs() *filterRequestStructs { - return &filterRequestStructs{ - http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), - http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), - http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), - http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), - http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), - http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), - attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), + return &filterRequestStructs { + http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), + http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), + http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), + http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), + http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), + http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), + attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), } } @@ -226,7 +224,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int //mutex.Unlock() } - go func() { + go func (){ sendKeepAlive() }() @@ -242,43 +240,45 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} { return &newConfig } -func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter { +func ConfigFactory(c interface{}) api.StreamFilterFactory { conf, ok := c.(*config) if !ok { panic("unexpected config type") } - worker_thread_id := int(C.get_thread_id()) - api.LogDebugf("worker_thread_id: %d", worker_thread_id) - if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok { - api.LogDebugf("need to add new thread to the map") - map_size := len(attachment_to_thread_mapping) - if map_size < len(attachments_map) { - attachment_to_thread_mapping[map_size] = worker_thread_id - thread_to_attachment_mapping[worker_thread_id] = map_size - api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping)) - api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping) - api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping) - } else { - panic("unexpected thread id") + return func(callbacks api.FilterCallbackHandler) api.StreamFilter { + worker_thread_id := int(C.get_thread_id()) + api.LogDebugf("worker_thread_id: %d", worker_thread_id) + if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok { + api.LogDebugf("need to add new thread to the map") + map_size := len(attachment_to_thread_mapping) + if map_size < len(attachments_map) { + attachment_to_thread_mapping[map_size] = worker_thread_id + thread_to_attachment_mapping[worker_thread_id] = map_size + api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping)) + api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping) + api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping) + } else { + panic("unexpected thread id") + } } - } - worker_id := thread_to_attachment_mapping[int(worker_thread_id)] - api.LogDebugf("worker_id: %d", worker_id) + worker_id := thread_to_attachment_mapping[int(worker_thread_id)] + api.LogDebugf("worker_id: %d", worker_id) - filter_id.Add(1) - session_id := filter_id.Load() - attachment_ptr := attachments_map[worker_id] - session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id)) + filter_id.Add(1) + session_id := filter_id.Load() + attachment_ptr := attachments_map[worker_id] + session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id)) - return &filter{ - callbacks: callbacks, - config: conf, - session_id: session_id, - cp_attachment: attachment_ptr, - session_data: session_data, - request_structs: attachment_to_filter_request_structs[worker_id], + return &filter{ + callbacks: callbacks, + config: conf, + session_id: session_id, + cp_attachment: attachment_ptr, + session_data: session_data, + request_structs: attachment_to_filter_request_structs[worker_id], + } } } diff --git a/attachments/envoy/1.33/config.go b/attachments/envoy/1.33/config.go index a2c9687..748ee30 100755 --- a/attachments/envoy/1.33/config.go +++ b/attachments/envoy/1.33/config.go @@ -60,7 +60,6 @@ func getEnvoyConcurrency() int { api.LogWarnf("using number of CPU cores") return runtime.NumCPU() } - defer resp.Body.Close() var conc_number string @@ -91,9 +90,9 @@ func getEnvoyConcurrency() int { } func configurationServer() { - r := chi.NewRouter() + r := chi.NewRouter() - r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { + r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { mutex.Lock() defer mutex.Unlock() worker_ids := make([]int, 0) @@ -137,7 +136,7 @@ func configurationServer() { workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded" } - response, err := json.Marshal(workers_reload_status) + response, err := json.Marshal(workers_reload_status) if err != nil { api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error()) response = []byte(`{"error": "Internal Error"}`) @@ -147,23 +146,22 @@ func configurationServer() { w.WriteHeader(http.StatusInternalServerError) } - w.Header().Set("Content-Type", "application/json") - w.Write(response) - }) + w.Header().Set("Content-Type", "application/json") + w.Write(response) + }) - http.ListenAndServe(":8119", r) + http.ListenAndServe(":8119", r) } func init() { last_keep_alive = time.Time{} - envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, ConfigFactory, &parser{}) - + envoyHttp.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{}) go configurationServer() } -type config struct{} +type config struct {} -type parser struct{} +type parser struct {} func sendKeepAlive() { for { @@ -178,14 +176,14 @@ func sendKeepAlive() { } func (p *parser) initFilterStructs() *filterRequestStructs { - return &filterRequestStructs{ - http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), - http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), - http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), - http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), - http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), - http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), - attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), + return &filterRequestStructs { + http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), + http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), + http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), + http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), + http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), + http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), + attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), } } @@ -226,7 +224,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int //mutex.Unlock() } - go func() { + go func (){ sendKeepAlive() }() @@ -242,43 +240,45 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} { return &newConfig } -func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter { +func ConfigFactory(c interface{}) api.StreamFilterFactory { conf, ok := c.(*config) if !ok { panic("unexpected config type") } - worker_thread_id := int(C.get_thread_id()) - api.LogDebugf("worker_thread_id: %d", worker_thread_id) - if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok { - api.LogDebugf("need to add new thread to the map") - map_size := len(attachment_to_thread_mapping) - if map_size < len(attachments_map) { - attachment_to_thread_mapping[map_size] = worker_thread_id - thread_to_attachment_mapping[worker_thread_id] = map_size - api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping)) - api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping) - api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping) - } else { - panic("unexpected thread id") + return func(callbacks api.FilterCallbackHandler) api.StreamFilter { + worker_thread_id := int(C.get_thread_id()) + api.LogDebugf("worker_thread_id: %d", worker_thread_id) + if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok { + api.LogDebugf("need to add new thread to the map") + map_size := len(attachment_to_thread_mapping) + if map_size < len(attachments_map) { + attachment_to_thread_mapping[map_size] = worker_thread_id + thread_to_attachment_mapping[worker_thread_id] = map_size + api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping)) + api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping) + api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping) + } else { + panic("unexpected thread id") + } } - } - worker_id := thread_to_attachment_mapping[int(worker_thread_id)] - api.LogDebugf("worker_id: %d", worker_id) + worker_id := thread_to_attachment_mapping[int(worker_thread_id)] + api.LogDebugf("worker_id: %d", worker_id) - filter_id.Add(1) - session_id := filter_id.Load() - attachment_ptr := attachments_map[worker_id] - session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id)) + filter_id.Add(1) + session_id := filter_id.Load() + attachment_ptr := attachments_map[worker_id] + session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id)) - return &filter{ - callbacks: callbacks, - config: conf, - session_id: session_id, - cp_attachment: attachment_ptr, - session_data: session_data, - request_structs: attachment_to_filter_request_structs[worker_id], + return &filter{ + callbacks: callbacks, + config: conf, + session_id: session_id, + cp_attachment: attachment_ptr, + session_data: session_data, + request_structs: attachment_to_filter_request_structs[worker_id], + } } }