fix config.go file

This commit is contained in:
wiaamm 2025-05-27 15:59:44 +03:00
parent cc7e65729c
commit 5962caf2e9
3 changed files with 139 additions and 213 deletions

View File

@ -35,53 +35,59 @@ unsigned long get_thread_id() {
import "C"
const Name = "cp_nano_filter"
const admin_api_url = "http://127.0.0.1:%s/server_info"
const admin_api_server_info = "http://127.0.0.1:%s/server_info"
const keep_alive_interval = 10 * time.Second
var filter_id atomic.Int64
type nano_attachment C.struct_NanoAttachment
var attachments_map map[int]*nano_attachment = nil
var thread_to_attachment_mapping map[int]int = nil
var attachment_to_thread_mapping map[int]int = nil
var attachment_to_filter_request_structs map[int]*filterRequestStructs = nil
var mutex sync.Mutex
const keep_alive_interval = 10 * time.Second
var last_keep_alive time.Time
type nano_attachment C.struct_NanoAttachment
// EnvoyServerInfo represents the structure of the JSON response from /server_info
type EnvoyServerInfo struct {
Concurrency int `json:"concurrency"`
}
// getEnvoyConcurrency fetches and returns the concurrency level of Envoy from the admin API
func getEnvoyConcurrency(admin_api_address string) (int, error) {
resp, err := http.Get(admin_api_address)
if err != nil {
return 0, fmt.Errorf("failed to reach Envoy admin API: %w", err)
func getEnvoyConcurrency() int {
concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores")
if concurrency_method == "numOfCores" {
api.LogWarnf("using number of CPU cores")
return runtime.NumCPU()
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("unexpected status code from Envoy admin API: %d", resp.StatusCode)
var conc_number string
switch concurrency_method {
case "istioCpuLimit":
conc_number = getEnv("ISTIO_CPU_LIMIT", "-1")
api.LogWarnf("using istioCpuLimit, conc_number %s", conc_number)
case "custom":
conc_number = getEnv("CONCURRENCY_NUMBER", "-1")
api.LogWarnf("using custom concurrency number, conc_number %s", conc_number)
default:
api.LogWarnf("unknown concurrency method %s, using number of CPU cores", concurrency_method)
return runtime.NumCPU()
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read response body: %w", err)
if conc_number == "-1" {
api.LogWarnf("concurrency number is not set as an env variable, using number of CPU cores")
return runtime.NumCPU()
}
var info EnvoyServerInfo
if err := json.Unmarshal(body, &info); err != nil {
return 0, fmt.Errorf("failed to parse JSON response: %w", err)
conc_num, err := strconv.Atoi(conc_number)
if err != nil || conc_num <= 0 {
api.LogWarnf("error converting concurrency number %s, using number of CPU cores", conc_number)
return runtime.NumCPU()
}
return info.Concurrency, nil
return conc_num
}
func configurationServer() {
@ -193,37 +199,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
return conf, nil
}
var num_of_workers int
concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores")
if concurrency_method == "numOfCores" {
num_of_workers = runtime.NumCPU()
api.LogInfof("using number of cpu cores %d", num_of_workers)
} else if concurrency_method == "config" {
config_port := getEnv("CONFIG_PORT", "15000")
admin_api := fmt.Sprintf(admin_api_url, config_port)
workers, err := getEnvoyConcurrency(admin_api)
if err != nil {
api.LogWarnf("unable to fetch concurrency from admin server, using cpu cores. err: %s", err.Error())
num_of_workers = runtime.NumCPU()
} else {
num_of_workers = workers
}
} else if concurrency_method == "custom" {
conc_number := getEnv("CONCURRENCY_NUMBER", "-1")
if conc_number == "-1" {
api.LogWarnf("concurrency number is not set as an env variable, using cpu cores")
num_of_workers = runtime.NumCPU()
} else if conc_num, err := strconv.Atoi(conc_number); err == nil && conc_num > 0 {
num_of_workers = conc_num
} else {
api.LogWarnf("error converting conc_number %s, using num of cpu cores", conc_number)
num_of_workers = runtime.NumCPU()
}
} else {
api.LogWarnf("unable to fetch concurrency from %s, using cpu cores", concurrency_method)
num_of_workers = runtime.NumCPU()
}
num_of_workers := getEnvoyConcurrency()
configStruct := &xds.TypedStruct{}
if err := any.UnmarshalTo(configStruct); err != nil {

View File

@ -35,59 +35,65 @@ unsigned long get_thread_id() {
import "C"
const Name = "cp_nano_filter"
const Admin_api = "http://127.0.0.1:%s/server_info"
const admin_api_server_info = "http://127.0.0.1:%s/server_info"
const keep_alive_interval = 10 * time.Second
var filter_id atomic.Int64
type nano_attachment C.struct_NanoAttachment
var attachments_map map[int]*nano_attachment = nil
var thread_to_attachment_mapping map[int]int = nil
var attachment_to_thread_mapping map[int]int = nil
var attachment_to_filter_request_structs map[int]*filterRequestStructs = nil
var mutex sync.Mutex
const keep_alive_interval = 10 * time.Second
var last_keep_alive time.Time
type nano_attachment C.struct_NanoAttachment
// EnvoyServerInfo represents the structure of the JSON response from /server_info
type EnvoyServerInfo struct {
Concurrency int `json:"concurrency"`
}
// getEnvoyConcurrency fetches and returns the concurrency level of Envoy from the admin API
func getEnvoyConcurrency(admin_api_address string) (int, error) {
resp, err := http.Get(admin_api_address)
if err != nil {
return 0, fmt.Errorf("failed to reach Envoy admin API: %w", err)
func getEnvoyConcurrency() int {
concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores")
if concurrency_method == "numOfCores" {
api.LogWarnf("using number of CPU cores")
return runtime.NumCPU()
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("unexpected status code from Envoy admin API: %d", resp.StatusCode)
var conc_number string
switch concurrency_method {
case "istioCpuLimit":
conc_number = getEnv("ISTIO_CPU_LIMIT", "-1")
api.LogWarnf("using istioCpuLimit, conc_number %s", conc_number)
case "custom":
conc_number = getEnv("CONCURRENCY_NUMBER", "-1")
api.LogWarnf("using custom concurrency number, conc_number %s", conc_number)
default:
api.LogWarnf("unknown concurrency method %s, using number of CPU cores", concurrency_method)
return runtime.NumCPU()
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read response body: %w", err)
if conc_number == "-1" {
api.LogWarnf("concurrency number is not set as an env variable, using number of CPU cores")
return runtime.NumCPU()
}
var info EnvoyServerInfo
if err := json.Unmarshal(body, &info); err != nil {
return 0, fmt.Errorf("failed to parse JSON response: %w", err)
conc_num, err := strconv.Atoi(conc_number)
if err != nil || conc_num <= 0 {
api.LogWarnf("error converting concurrency number %s, using number of CPU cores", conc_number)
return runtime.NumCPU()
}
return info.Concurrency, nil
return conc_num
}
func configurationServer() {
r := chi.NewRouter()
r := chi.NewRouter()
r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) {
r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) {
mutex.Lock()
defer mutex.Unlock()
worker_ids := make([]int, 0)
@ -131,7 +137,7 @@ func configurationServer() {
workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded"
}
response, err := json.Marshal(workers_reload_status)
response, err := json.Marshal(workers_reload_status)
if err != nil {
api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error())
response = []byte(`{"error": "Internal Error"}`)
@ -141,11 +147,11 @@ func configurationServer() {
w.WriteHeader(http.StatusInternalServerError)
}
w.Header().Set("Content-Type", "application/json")
w.Write(response)
})
w.Header().Set("Content-Type", "application/json")
w.Write(response)
})
http.ListenAndServe(":8119", r)
http.ListenAndServe(":8119", r)
}
func init() {
@ -155,9 +161,9 @@ func init() {
go configurationServer()
}
type config struct {}
type config struct{}
type parser struct {}
type parser struct{}
func sendKeepAlive() {
for {
@ -172,14 +178,14 @@ func sendKeepAlive() {
}
func (p *parser) initFilterStructs() *filterRequestStructs {
return &filterRequestStructs {
http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)),
http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)),
http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)),
http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)),
http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)),
http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)),
attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)),
return &filterRequestStructs{
http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)),
http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)),
http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)),
http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)),
http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)),
http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)),
attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)),
}
}
@ -193,37 +199,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
return conf, nil
}
var num_of_workers int
concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores")
if concurrency_method == "numOfCores" {
num_of_workers = runtime.NumCPU()
api.LogInfof("using number of cpu cores %d", num_of_workers)
} else if concurrency_method == "config" {
config_port := getEnv("CONFIG_PORT", "15000")
admin_api := fmt.Sprintf(Admin_api, config_port)
workers, err := getEnvoyConcurrency(admin_api)
if err != nil {
api.LogWarnf("unable to fetch concurrency from admin server, using cpu cores. err: %s", err.Error())
num_of_workers = runtime.NumCPU()
} else {
num_of_workers = workers
}
} else if concurrency_method == "custom" {
conc_number := getEnv("CONCURRENCY_NUMBER", "-1")
if conc_number == "-1" {
api.LogWarnf("concurrency number is not set as an env variable, using cpu cores")
num_of_workers = runtime.NumCPU()
} else if conc_num, err := strconv.Atoi(conc_number); err == nil && conc_num > 0 {
num_of_workers = conc_num
} else {
api.LogWarnf("error converting conc_number %s, using num of cpu cores", conc_number)
num_of_workers = runtime.NumCPU()
}
} else {
api.LogWarnf("unable to fetch concurrency from %s, using cpu cores", concurrency_method)
num_of_workers = runtime.NumCPU()
}
num_of_workers := getEnvoyConcurrency()
configStruct := &xds.TypedStruct{}
if err := any.UnmarshalTo(configStruct); err != nil {
@ -250,7 +226,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
//mutex.Unlock()
}
go func (){
go func() {
sendKeepAlive()
}()
@ -297,14 +273,13 @@ func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Strea
session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id))
return &filter{
callbacks: callbacks,
config: conf,
session_id: session_id,
cp_attachment: attachment_ptr,
session_data: session_data,
callbacks: callbacks,
config: conf,
session_id: session_id,
cp_attachment: attachment_ptr,
session_data: session_data,
request_structs: attachment_to_filter_request_structs[worker_id],
}
}
func main() {}

View File

@ -35,59 +35,65 @@ unsigned long get_thread_id() {
import "C"
const Name = "cp_nano_filter"
const Admin_api = "http://127.0.0.1:%s/server_info"
const admin_api_server_info = "http://127.0.0.1:%s/server_info"
const keep_alive_interval = 10 * time.Second
var filter_id atomic.Int64
type nano_attachment C.struct_NanoAttachment
var attachments_map map[int]*nano_attachment = nil
var thread_to_attachment_mapping map[int]int = nil
var attachment_to_thread_mapping map[int]int = nil
var attachment_to_filter_request_structs map[int]*filterRequestStructs = nil
var mutex sync.Mutex
const keep_alive_interval = 10 * time.Second
var last_keep_alive time.Time
type nano_attachment C.struct_NanoAttachment
// EnvoyServerInfo represents the structure of the JSON response from /server_info
type EnvoyServerInfo struct {
Concurrency int `json:"concurrency"`
}
// getEnvoyConcurrency fetches and returns the concurrency level of Envoy from the admin API
func getEnvoyConcurrency(admin_api_address string) (int, error) {
resp, err := http.Get(admin_api_address)
if err != nil {
return 0, fmt.Errorf("failed to reach Envoy admin API: %w", err)
func getEnvoyConcurrency() int {
concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores")
if concurrency_method == "numOfCores" {
api.LogWarnf("using number of CPU cores")
return runtime.NumCPU()
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("unexpected status code from Envoy admin API: %d", resp.StatusCode)
var conc_number string
switch concurrency_method {
case "istioCpuLimit":
conc_number = getEnv("ISTIO_CPU_LIMIT", "-1")
api.LogWarnf("using istioCpuLimit, conc_number %s", conc_number)
case "custom":
conc_number = getEnv("CONCURRENCY_NUMBER", "-1")
api.LogWarnf("using custom concurrency number, conc_number %s", conc_number)
default:
api.LogWarnf("unknown concurrency method %s, using number of CPU cores", concurrency_method)
return runtime.NumCPU()
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read response body: %w", err)
if conc_number == "-1" {
api.LogWarnf("concurrency number is not set as an env variable, using number of CPU cores")
return runtime.NumCPU()
}
var info EnvoyServerInfo
if err := json.Unmarshal(body, &info); err != nil {
return 0, fmt.Errorf("failed to parse JSON response: %w", err)
conc_num, err := strconv.Atoi(conc_number)
if err != nil || conc_num <= 0 {
api.LogWarnf("error converting concurrency number %s, using number of CPU cores", conc_number)
return runtime.NumCPU()
}
return info.Concurrency, nil
return conc_num
}
func configurationServer() {
r := chi.NewRouter()
r := chi.NewRouter()
r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) {
r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) {
mutex.Lock()
defer mutex.Unlock()
worker_ids := make([]int, 0)
@ -131,7 +137,7 @@ func configurationServer() {
workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded"
}
response, err := json.Marshal(workers_reload_status)
response, err := json.Marshal(workers_reload_status)
if err != nil {
api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error())
response = []byte(`{"error": "Internal Error"}`)
@ -141,11 +147,11 @@ func configurationServer() {
w.WriteHeader(http.StatusInternalServerError)
}
w.Header().Set("Content-Type", "application/json")
w.Write(response)
})
w.Header().Set("Content-Type", "application/json")
w.Write(response)
})
http.ListenAndServe(":8119", r)
http.ListenAndServe(":8119", r)
}
func init() {
@ -155,9 +161,9 @@ func init() {
go configurationServer()
}
type config struct {}
type config struct{}
type parser struct {}
type parser struct{}
func sendKeepAlive() {
for {
@ -172,14 +178,14 @@ func sendKeepAlive() {
}
func (p *parser) initFilterStructs() *filterRequestStructs {
return &filterRequestStructs {
http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)),
http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)),
http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)),
http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)),
http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)),
http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)),
attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)),
return &filterRequestStructs{
http_start_data: (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)),
http_meta_data: (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)),
http_headers: (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)),
http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)),
http_res_headers: (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)),
http_body_data: (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)),
attachment_data: (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)),
}
}
@ -193,37 +199,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
return conf, nil
}
var num_of_workers int
concurrency_method := getEnv("CONCURRENCY_CALC", "numOfCores")
if concurrency_method == "numOfCores" {
num_of_workers = runtime.NumCPU()
api.LogInfof("using number of cpu cores %d", num_of_workers)
} else if concurrency_method == "config" {
config_port := getEnv("CONFIG_PORT", "15000")
admin_api := fmt.Sprintf(Admin_api, config_port)
workers, err := getEnvoyConcurrency(admin_api)
if err != nil {
api.LogWarnf("unable to fetch concurrency from admin server, using cpu cores. err: %s", err.Error())
num_of_workers = runtime.NumCPU()
} else {
num_of_workers = workers
}
} else if concurrency_method == "custom" {
conc_number := getEnv("CONCURRENCY_NUMBER", "-1")
if conc_number == "-1" {
api.LogWarnf("concurrency number is not set as an env variable, using cpu cores")
num_of_workers = runtime.NumCPU()
} else if conc_num, err := strconv.Atoi(conc_number); err == nil && conc_num > 0 {
num_of_workers = conc_num
} else {
api.LogWarnf("error converting conc_number %s, using num of cpu cores", conc_number)
num_of_workers = runtime.NumCPU()
}
} else {
api.LogWarnf("unable to fetch concurrency from %s, using cpu cores", concurrency_method)
num_of_workers = runtime.NumCPU()
}
num_of_workers := getEnvoyConcurrency()
configStruct := &xds.TypedStruct{}
if err := any.UnmarshalTo(configStruct); err != nil {
@ -250,7 +226,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
//mutex.Unlock()
}
go func (){
go func() {
sendKeepAlive()
}()
@ -297,14 +273,13 @@ func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Strea
session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id))
return &filter{
callbacks: callbacks,
config: conf,
session_id: session_id,
cp_attachment: attachment_ptr,
session_data: session_data,
callbacks: callbacks,
config: conf,
session_id: session_id,
cp_attachment: attachment_ptr,
session_data: session_data,
request_structs: attachment_to_filter_request_structs[worker_id],
}
}
func main() {}