diff --git a/attachments/envoy/1.31/config.go b/attachments/envoy/1.31/config.go index 0f210f8..a2c9687 100755 --- a/attachments/envoy/1.31/config.go +++ b/attachments/envoy/1.31/config.go @@ -35,53 +35,59 @@ unsigned long get_thread_id() { import "C" const Name = "cp_nano_filter" - -const admin_api_url = "http://127.0.0.1:%s/server_info" +const admin_api_server_info = "http://127.0.0.1:%s/server_info" +const keep_alive_interval = 10 * time.Second var filter_id atomic.Int64 - -type nano_attachment C.struct_NanoAttachment - var attachments_map map[int]*nano_attachment = nil var thread_to_attachment_mapping map[int]int = nil var attachment_to_thread_mapping map[int]int = nil - var attachment_to_filter_request_structs map[int]*filterRequestStructs = nil - var mutex sync.Mutex - -const keep_alive_interval = 10 * time.Second - var last_keep_alive time.Time +type nano_attachment C.struct_NanoAttachment + // EnvoyServerInfo represents the structure of the JSON response from /server_info type EnvoyServerInfo struct { Concurrency int `json:"concurrency"` } -// getEnvoyConcurrency fetches and returns the concurrency level of Envoy from the admin API -func getEnvoyConcurrency(admin_api_address string) (int, error) { - resp, err := http.Get(admin_api_address) - if err != nil { - return 0, fmt.Errorf("failed to reach Envoy admin API: %w", err) +func getEnvoyConcurrency() int { + concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores") + + if concurrency_method == "numOfCores" { + api.LogWarnf("using number of CPU cores") + return runtime.NumCPU() } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return 0, fmt.Errorf("unexpected status code from Envoy admin API: %d", resp.StatusCode) + var conc_number string + + switch concurrency_method { + case "istioCpuLimit": + conc_number = getEnv("ISTIO_CPU_LIMIT", "-1") + api.LogWarnf("using istioCpuLimit, conc_number %s", conc_number) + case "custom": + conc_number = getEnv("CONCURRENCY_NUMBER", "-1") + api.LogWarnf("using custom concurrency number, conc_number %s", conc_number) + default: + api.LogWarnf("unknown concurrency method %s, using number of CPU cores", concurrency_method) + return runtime.NumCPU() } - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, fmt.Errorf("failed to read response body: %w", err) + if conc_number == "-1" { + api.LogWarnf("concurrency number is not set as an env variable, using number of CPU cores") + return runtime.NumCPU() } - var info EnvoyServerInfo - if err := json.Unmarshal(body, &info); err != nil { - return 0, fmt.Errorf("failed to parse JSON response: %w", err) + conc_num, err := strconv.Atoi(conc_number) + if err != nil || conc_num <= 0 { + api.LogWarnf("error converting concurrency number %s, using number of CPU cores", conc_number) + return runtime.NumCPU() } - return info.Concurrency, nil + return conc_num } func configurationServer() { @@ -193,37 +199,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int return conf, nil } - var num_of_workers int - concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores") - - if concurrency_method == "numOfCores" { - num_of_workers = runtime.NumCPU() - api.LogInfof("using number of cpu cores %d", num_of_workers) - } else if concurrency_method == "config" { - config_port := getEnv("CONFIG_PORT", "15000") - admin_api := fmt.Sprintf(admin_api_url, config_port) - workers, err := getEnvoyConcurrency(admin_api) - if err != nil { - api.LogWarnf("unable to fetch concurrency from admin server, using cpu cores. err: %s", err.Error()) - num_of_workers = runtime.NumCPU() - } else { - num_of_workers = workers - } - } else if concurrency_method == "custom" { - conc_number := getEnv("CONCURRENCY_NUMBER", "-1") - if conc_number == "-1" { - api.LogWarnf("concurrency number is not set as an env variable, using cpu cores") - num_of_workers = runtime.NumCPU() - } else if conc_num, err := strconv.Atoi(conc_number); err == nil && conc_num > 0 { - num_of_workers = conc_num - } else { - api.LogWarnf("error converting conc_number %s, using num of cpu cores", conc_number) - num_of_workers = runtime.NumCPU() - } - } else { - api.LogWarnf("unable to fetch concurrency from %s, using cpu cores", concurrency_method) - num_of_workers = runtime.NumCPU() - } + num_of_workers := getEnvoyConcurrency() configStruct := &xds.TypedStruct{} if err := any.UnmarshalTo(configStruct); err != nil { diff --git a/attachments/envoy/1.32/config.go b/attachments/envoy/1.32/config.go index 4229f33..a2c9687 100755 --- a/attachments/envoy/1.32/config.go +++ b/attachments/envoy/1.32/config.go @@ -35,59 +35,65 @@ unsigned long get_thread_id() { import "C" const Name = "cp_nano_filter" - -const Admin_api = "http://127.0.0.1:%s/server_info" +const admin_api_server_info = "http://127.0.0.1:%s/server_info" +const keep_alive_interval = 10 * time.Second var filter_id atomic.Int64 - -type nano_attachment C.struct_NanoAttachment - var attachments_map map[int]*nano_attachment = nil var thread_to_attachment_mapping map[int]int = nil var attachment_to_thread_mapping map[int]int = nil - var attachment_to_filter_request_structs map[int]*filterRequestStructs = nil - var mutex sync.Mutex - -const keep_alive_interval = 10 * time.Second - var last_keep_alive time.Time +type nano_attachment C.struct_NanoAttachment + // EnvoyServerInfo represents the structure of the JSON response from /server_info type EnvoyServerInfo struct { Concurrency int `json:"concurrency"` } -// getEnvoyConcurrency fetches and returns the concurrency level of Envoy from the admin API -func getEnvoyConcurrency(admin_api_address string) (int, error) { - resp, err := http.Get(admin_api_address) - if err != nil { - return 0, fmt.Errorf("failed to reach Envoy admin API: %w", err) +func getEnvoyConcurrency() int { + concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores") + + if concurrency_method == "numOfCores" { + api.LogWarnf("using number of CPU cores") + return runtime.NumCPU() } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return 0, fmt.Errorf("unexpected status code from Envoy admin API: %d", resp.StatusCode) + var conc_number string + + switch concurrency_method { + case "istioCpuLimit": + conc_number = getEnv("ISTIO_CPU_LIMIT", "-1") + api.LogWarnf("using istioCpuLimit, conc_number %s", conc_number) + case "custom": + conc_number = getEnv("CONCURRENCY_NUMBER", "-1") + api.LogWarnf("using custom concurrency number, conc_number %s", conc_number) + default: + api.LogWarnf("unknown concurrency method %s, using number of CPU cores", concurrency_method) + return runtime.NumCPU() } - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, fmt.Errorf("failed to read response body: %w", err) + if conc_number == "-1" { + api.LogWarnf("concurrency number is not set as an env variable, using number of CPU cores") + return runtime.NumCPU() } - var info EnvoyServerInfo - if err := json.Unmarshal(body, &info); err != nil { - return 0, fmt.Errorf("failed to parse JSON response: %w", err) + conc_num, err := strconv.Atoi(conc_number) + if err != nil || conc_num <= 0 { + api.LogWarnf("error converting concurrency number %s, using number of CPU cores", conc_number) + return runtime.NumCPU() } - return info.Concurrency, nil + return conc_num } func configurationServer() { - r := chi.NewRouter() + r := chi.NewRouter() - r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { + r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { mutex.Lock() defer mutex.Unlock() worker_ids := make([]int, 0) @@ -131,7 +137,7 @@ func configurationServer() { workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded" } - response, err := json.Marshal(workers_reload_status) + response, err := json.Marshal(workers_reload_status) if err != nil { api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error()) response = []byte(`{"error": "Internal Error"}`) @@ -141,11 +147,11 @@ func configurationServer() { w.WriteHeader(http.StatusInternalServerError) } - w.Header().Set("Content-Type", "application/json") - w.Write(response) - }) + w.Header().Set("Content-Type", "application/json") + w.Write(response) + }) - http.ListenAndServe(":8119", r) + http.ListenAndServe(":8119", r) } func init() { @@ -155,9 +161,9 @@ func init() { go configurationServer() } -type config struct {} +type config struct{} -type parser struct {} +type parser struct{} func sendKeepAlive() { for { @@ -172,14 +178,14 @@ func sendKeepAlive() { } func (p *parser) initFilterStructs() *filterRequestStructs { - return &filterRequestStructs { - http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), - http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), - http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), - http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), - http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), - http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), - attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), + return &filterRequestStructs{ + http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), + http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), + http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), + http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), + http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), + http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), + attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), } } @@ -193,37 +199,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int return conf, nil } - var num_of_workers int - concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores") - - if concurrency_method == "numOfCores" { - num_of_workers = runtime.NumCPU() - api.LogInfof("using number of cpu cores %d", num_of_workers) - } else if concurrency_method == "config" { - config_port := getEnv("CONFIG_PORT", "15000") - admin_api := fmt.Sprintf(Admin_api, config_port) - workers, err := getEnvoyConcurrency(admin_api) - if err != nil { - api.LogWarnf("unable to fetch concurrency from admin server, using cpu cores. err: %s", err.Error()) - num_of_workers = runtime.NumCPU() - } else { - num_of_workers = workers - } - } else if concurrency_method == "custom" { - conc_number := getEnv("CONCURRENCY_NUMBER", "-1") - if conc_number == "-1" { - api.LogWarnf("concurrency number is not set as an env variable, using cpu cores") - num_of_workers = runtime.NumCPU() - } else if conc_num, err := strconv.Atoi(conc_number); err == nil && conc_num > 0 { - num_of_workers = conc_num - } else { - api.LogWarnf("error converting conc_number %s, using num of cpu cores", conc_number) - num_of_workers = runtime.NumCPU() - } - } else { - api.LogWarnf("unable to fetch concurrency from %s, using cpu cores", concurrency_method) - num_of_workers = runtime.NumCPU() - } + num_of_workers := getEnvoyConcurrency() configStruct := &xds.TypedStruct{} if err := any.UnmarshalTo(configStruct); err != nil { @@ -250,7 +226,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int //mutex.Unlock() } - go func (){ + go func() { sendKeepAlive() }() @@ -297,14 +273,13 @@ func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Strea session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id)) return &filter{ - callbacks: callbacks, - config: conf, - session_id: session_id, - cp_attachment: attachment_ptr, - session_data: session_data, + callbacks: callbacks, + config: conf, + session_id: session_id, + cp_attachment: attachment_ptr, + session_data: session_data, request_structs: attachment_to_filter_request_structs[worker_id], } } - func main() {} diff --git a/attachments/envoy/1.33/config.go b/attachments/envoy/1.33/config.go index 4229f33..a2c9687 100755 --- a/attachments/envoy/1.33/config.go +++ b/attachments/envoy/1.33/config.go @@ -35,59 +35,65 @@ unsigned long get_thread_id() { import "C" const Name = "cp_nano_filter" - -const Admin_api = "http://127.0.0.1:%s/server_info" +const admin_api_server_info = "http://127.0.0.1:%s/server_info" +const keep_alive_interval = 10 * time.Second var filter_id atomic.Int64 - -type nano_attachment C.struct_NanoAttachment - var attachments_map map[int]*nano_attachment = nil var thread_to_attachment_mapping map[int]int = nil var attachment_to_thread_mapping map[int]int = nil - var attachment_to_filter_request_structs map[int]*filterRequestStructs = nil - var mutex sync.Mutex - -const keep_alive_interval = 10 * time.Second - var last_keep_alive time.Time +type nano_attachment C.struct_NanoAttachment + // EnvoyServerInfo represents the structure of the JSON response from /server_info type EnvoyServerInfo struct { Concurrency int `json:"concurrency"` } -// getEnvoyConcurrency fetches and returns the concurrency level of Envoy from the admin API -func getEnvoyConcurrency(admin_api_address string) (int, error) { - resp, err := http.Get(admin_api_address) - if err != nil { - return 0, fmt.Errorf("failed to reach Envoy admin API: %w", err) +func getEnvoyConcurrency() int { + concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores") + + if concurrency_method == "numOfCores" { + api.LogWarnf("using number of CPU cores") + return runtime.NumCPU() } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return 0, fmt.Errorf("unexpected status code from Envoy admin API: %d", resp.StatusCode) + var conc_number string + + switch concurrency_method { + case "istioCpuLimit": + conc_number = getEnv("ISTIO_CPU_LIMIT", "-1") + api.LogWarnf("using istioCpuLimit, conc_number %s", conc_number) + case "custom": + conc_number = getEnv("CONCURRENCY_NUMBER", "-1") + api.LogWarnf("using custom concurrency number, conc_number %s", conc_number) + default: + api.LogWarnf("unknown concurrency method %s, using number of CPU cores", concurrency_method) + return runtime.NumCPU() } - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, fmt.Errorf("failed to read response body: %w", err) + if conc_number == "-1" { + api.LogWarnf("concurrency number is not set as an env variable, using number of CPU cores") + return runtime.NumCPU() } - var info EnvoyServerInfo - if err := json.Unmarshal(body, &info); err != nil { - return 0, fmt.Errorf("failed to parse JSON response: %w", err) + conc_num, err := strconv.Atoi(conc_number) + if err != nil || conc_num <= 0 { + api.LogWarnf("error converting concurrency number %s, using number of CPU cores", conc_number) + return runtime.NumCPU() } - return info.Concurrency, nil + return conc_num } func configurationServer() { - r := chi.NewRouter() + r := chi.NewRouter() - r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { + r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) { mutex.Lock() defer mutex.Unlock() worker_ids := make([]int, 0) @@ -131,7 +137,7 @@ func configurationServer() { workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded" } - response, err := json.Marshal(workers_reload_status) + response, err := json.Marshal(workers_reload_status) if err != nil { api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error()) response = []byte(`{"error": "Internal Error"}`) @@ -141,11 +147,11 @@ func configurationServer() { w.WriteHeader(http.StatusInternalServerError) } - w.Header().Set("Content-Type", "application/json") - w.Write(response) - }) + w.Header().Set("Content-Type", "application/json") + w.Write(response) + }) - http.ListenAndServe(":8119", r) + http.ListenAndServe(":8119", r) } func init() { @@ -155,9 +161,9 @@ func init() { go configurationServer() } -type config struct {} +type config struct{} -type parser struct {} +type parser struct{} func sendKeepAlive() { for { @@ -172,14 +178,14 @@ func sendKeepAlive() { } func (p *parser) initFilterStructs() *filterRequestStructs { - return &filterRequestStructs { - http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), - http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), - http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), - http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), - http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), - http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), - attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), + return &filterRequestStructs{ + http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)), + http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)), + http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)), + http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)), + http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)), + http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)), + attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)), } } @@ -193,37 +199,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int return conf, nil } - var num_of_workers int - concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores") - - if concurrency_method == "numOfCores" { - num_of_workers = runtime.NumCPU() - api.LogInfof("using number of cpu cores %d", num_of_workers) - } else if concurrency_method == "config" { - config_port := getEnv("CONFIG_PORT", "15000") - admin_api := fmt.Sprintf(Admin_api, config_port) - workers, err := getEnvoyConcurrency(admin_api) - if err != nil { - api.LogWarnf("unable to fetch concurrency from admin server, using cpu cores. err: %s", err.Error()) - num_of_workers = runtime.NumCPU() - } else { - num_of_workers = workers - } - } else if concurrency_method == "custom" { - conc_number := getEnv("CONCURRENCY_NUMBER", "-1") - if conc_number == "-1" { - api.LogWarnf("concurrency number is not set as an env variable, using cpu cores") - num_of_workers = runtime.NumCPU() - } else if conc_num, err := strconv.Atoi(conc_number); err == nil && conc_num > 0 { - num_of_workers = conc_num - } else { - api.LogWarnf("error converting conc_number %s, using num of cpu cores", conc_number) - num_of_workers = runtime.NumCPU() - } - } else { - api.LogWarnf("unable to fetch concurrency from %s, using cpu cores", concurrency_method) - num_of_workers = runtime.NumCPU() - } + num_of_workers := getEnvoyConcurrency() configStruct := &xds.TypedStruct{} if err := any.UnmarshalTo(configStruct); err != nil { @@ -250,7 +226,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int //mutex.Unlock() } - go func (){ + go func() { sendKeepAlive() }() @@ -297,14 +273,13 @@ func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Strea session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id)) return &filter{ - callbacks: callbacks, - config: conf, - session_id: session_id, - cp_attachment: attachment_ptr, - session_data: session_data, + callbacks: callbacks, + config: conf, + session_id: session_id, + cp_attachment: attachment_ptr, + session_data: session_data, request_structs: attachment_to_filter_request_structs[worker_id], } } - func main() {}