fix config.go file

wiaamm 2025-05-27 16:29:25 +03:00
parent 5962caf2e9
commit 589aafdd1f
3 changed files with 147 additions and 147 deletions

View File

@@ -60,7 +60,6 @@ func getEnvoyConcurrency() int {
        api.LogWarnf("using number of CPU cores")
        return runtime.NumCPU()
    }
-   defer resp.Body.Close()

    var conc_number string
@@ -91,9 +90,9 @@ func getEnvoyConcurrency() int {
}

func configurationServer() {
    r := chi.NewRouter()
    r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) {
        mutex.Lock()
        defer mutex.Unlock()
        worker_ids := make([]int, 0)
@@ -137,7 +136,7 @@ func configurationServer() {
            workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded"
        }
        response, err := json.Marshal(workers_reload_status)
        if err != nil {
            api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error())
            response = []byte(`{"error": "Internal Error"}`)
@@ -147,23 +146,22 @@ func configurationServer() {
            w.WriteHeader(http.StatusInternalServerError)
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write(response)
    })
    http.ListenAndServe(":8119", r)
}

func init() {
    last_keep_alive = time.Time{}
-   envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, ConfigFactory, &parser{})
+   envoyHttp.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{})
    go configurationServer()
}

-type config struct{}
-type parser struct{}
+type config struct {}
+type parser struct {}

func sendKeepAlive() {
    for {
@@ -178,14 +176,14 @@ func sendKeepAlive() {
}

func (p *parser) initFilterStructs() *filterRequestStructs {
-   return &filterRequestStructs{
+   return &filterRequestStructs {
        http_start_data:   (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)),
        http_meta_data:    (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)),
        http_headers:      (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)),
        http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)),
        http_res_headers:  (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)),
        http_body_data:    (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)),
        attachment_data:   (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)),
    }
}
@@ -226,7 +224,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
        //mutex.Unlock()
    }

-   go func() {
+   go func (){
        sendKeepAlive()
    }()
@@ -242,43 +240,45 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
    return &newConfig
}

-func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
+func ConfigFactory(c interface{}) api.StreamFilterFactory {
    conf, ok := c.(*config)
    if !ok {
        panic("unexpected config type")
    }
+   return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
    worker_thread_id := int(C.get_thread_id())
    api.LogDebugf("worker_thread_id: %d", worker_thread_id)
    if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok {
        api.LogDebugf("need to add new thread to the map")
        map_size := len(attachment_to_thread_mapping)
        if map_size < len(attachments_map) {
            attachment_to_thread_mapping[map_size] = worker_thread_id
            thread_to_attachment_mapping[worker_thread_id] = map_size
            api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping))
            api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping)
            api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping)
        } else {
            panic("unexpected thread id")
        }
    }
    worker_id := thread_to_attachment_mapping[int(worker_thread_id)]
    api.LogDebugf("worker_id: %d", worker_id)
    filter_id.Add(1)
    session_id := filter_id.Load()
    attachment_ptr := attachments_map[worker_id]
    session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id))
    return &filter{
        callbacks:       callbacks,
        config:          conf,
        session_id:      session_id,
        cp_attachment:   attachment_ptr,
        session_data:    session_data,
        request_structs: attachment_to_filter_request_structs[worker_id],
    }
+   }
}
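
The functional change, repeated in each of the three files, is the factory shape: ConfigFactory no longer receives the per-stream callbacks directly but returns an api.StreamFilterFactory closure, and init() registers it with RegisterHttpFilterConfigFactoryAndParser instead of RegisterHttpFilterFactoryAndConfigParser. The sketch below is not code from this repository; it contrasts the two shapes with an illustrative filter type and import paths following the Envoy contrib layout, and which registration helper (and which StreamFilterFactory signature) actually exists depends on the Envoy release being targeted.

    package main

    import (
        "google.golang.org/protobuf/types/known/anypb"

        "github.com/envoyproxy/envoy/contrib/golang/common/go/api"
        envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
    )

    type config struct{}

    type parser struct{}

    func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
        return &config{}, nil
    }

    func (p *parser) Merge(parent interface{}, child interface{}) interface{} { return child }

    // Illustrative filter embedding the pass-through base so the interface is satisfied.
    type filter struct {
        api.PassThroughStreamFilter
        callbacks api.FilterCallbackHandler
        config    *config
    }

    // Removed shape: Envoy hands the per-stream callbacks straight to the factory.
    func directFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
        return &filter{callbacks: callbacks, config: c.(*config)}
    }

    // Added shape: the factory runs once per configuration and returns a closure
    // that Envoy calls per stream with the callbacks.
    func closureFactory(c interface{}) api.StreamFilterFactory {
        conf, ok := c.(*config)
        if !ok {
            panic("unexpected config type")
        }
        return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
            return &filter{callbacks: callbacks, config: conf}
        }
    }

    func init() {
        // The registration call must match the factory shape used.
        // envoyHttp.RegisterHttpFilterFactoryAndConfigParser("example", directFactory, &parser{})
        envoyHttp.RegisterHttpFilterConfigFactoryAndParser("example", closureFactory, &parser{})
    }

    // Envoy Go plugins are built as shared objects; main is required but never runs.
    func main() {}

The closure form lets per-configuration work (the type assertion, any shared setup) run once, while the per-stream work happens inside the returned function.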

View File

@@ -60,7 +60,6 @@ func getEnvoyConcurrency() int {
        api.LogWarnf("using number of CPU cores")
        return runtime.NumCPU()
    }
-   defer resp.Body.Close()

    var conc_number string
@@ -91,9 +90,9 @@ func getEnvoyConcurrency() int {
}

func configurationServer() {
    r := chi.NewRouter()
    r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) {
        mutex.Lock()
        defer mutex.Unlock()
        worker_ids := make([]int, 0)
@@ -137,7 +136,7 @@ func configurationServer() {
            workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded"
        }
        response, err := json.Marshal(workers_reload_status)
        if err != nil {
            api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error())
            response = []byte(`{"error": "Internal Error"}`)
@@ -147,23 +146,22 @@ func configurationServer() {
            w.WriteHeader(http.StatusInternalServerError)
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write(response)
    })
    http.ListenAndServe(":8119", r)
}

func init() {
    last_keep_alive = time.Time{}
-   envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, ConfigFactory, &parser{})
+   envoyHttp.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{})
    go configurationServer()
}

-type config struct{}
-type parser struct{}
+type config struct {}
+type parser struct {}

func sendKeepAlive() {
    for {
@@ -178,14 +176,14 @@ func sendKeepAlive() {
}

func (p *parser) initFilterStructs() *filterRequestStructs {
-   return &filterRequestStructs{
+   return &filterRequestStructs {
        http_start_data:   (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)),
        http_meta_data:    (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)),
        http_headers:      (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)),
        http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)),
        http_res_headers:  (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)),
        http_body_data:    (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)),
        attachment_data:   (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)),
    }
}
@@ -226,7 +224,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
        //mutex.Unlock()
    }

-   go func() {
+   go func (){
        sendKeepAlive()
    }()
@@ -242,43 +240,45 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
    return &newConfig
}

-func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
+func ConfigFactory(c interface{}) api.StreamFilterFactory {
    conf, ok := c.(*config)
    if !ok {
        panic("unexpected config type")
    }
+   return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
    worker_thread_id := int(C.get_thread_id())
    api.LogDebugf("worker_thread_id: %d", worker_thread_id)
    if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok {
        api.LogDebugf("need to add new thread to the map")
        map_size := len(attachment_to_thread_mapping)
        if map_size < len(attachments_map) {
            attachment_to_thread_mapping[map_size] = worker_thread_id
            thread_to_attachment_mapping[worker_thread_id] = map_size
            api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping))
            api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping)
            api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping)
        } else {
            panic("unexpected thread id")
        }
    }
    worker_id := thread_to_attachment_mapping[int(worker_thread_id)]
    api.LogDebugf("worker_id: %d", worker_id)
    filter_id.Add(1)
    session_id := filter_id.Load()
    attachment_ptr := attachments_map[worker_id]
    session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id))
    return &filter{
        callbacks:       callbacks,
        config:          conf,
        session_id:      session_id,
        cp_attachment:   attachment_ptr,
        session_data:    session_data,
        request_structs: attachment_to_filter_request_structs[worker_id],
    }
+   }
}
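
Unchanged context in the hunks also shows the plugin's configuration side-channel: a chi router listening on :8119 whose /load-config handler locks a shared mutex, reloads each worker, and reports per-worker status as JSON. A trimmed-down sketch of that endpoint pattern follows; it assumes chi v5, and workersReloadStatus plus the elided reload step are placeholders rather than the repository's actual identifiers.

    package main

    import (
        "encoding/json"
        "log"
        "net/http"
        "sync"

        "github.com/go-chi/chi/v5"
    )

    var (
        mu                  sync.Mutex
        workersReloadStatus = map[string]string{}
    )

    func configurationServer() {
        r := chi.NewRouter()
        r.Get("/load-config", func(w http.ResponseWriter, req *http.Request) {
            mu.Lock()
            defer mu.Unlock()

            // ... ask each worker to reload and record the outcome in workersReloadStatus ...

            response, err := json.Marshal(workersReloadStatus)
            w.Header().Set("Content-Type", "application/json")
            if err != nil {
                response = []byte(`{"error": "Internal Error"}`)
                w.WriteHeader(http.StatusInternalServerError)
            }
            w.Write(response)
        })
        log.Fatal(http.ListenAndServe(":8119", r))
    }

    func main() { configurationServer() }

One detail worth noting with this pattern: net/http ignores headers set after WriteHeader has been called, so Content-Type is set before the error path writes the 500 status.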

View File

@@ -60,7 +60,6 @@ func getEnvoyConcurrency() int {
        api.LogWarnf("using number of CPU cores")
        return runtime.NumCPU()
    }
-   defer resp.Body.Close()

    var conc_number string
@@ -91,9 +90,9 @@ func getEnvoyConcurrency() int {
}

func configurationServer() {
    r := chi.NewRouter()
    r.Get("/load-config", func(w http.ResponseWriter, r *http.Request) {
        mutex.Lock()
        defer mutex.Unlock()
        worker_ids := make([]int, 0)
@@ -137,7 +136,7 @@ func configurationServer() {
            workers_reload_status[strconv.Itoa(worker_id)] = "Reload Configuraiton Succeded"
        }
        response, err := json.Marshal(workers_reload_status)
        if err != nil {
            api.LogWarnf("Error while sending reponse about reload configuration. Err: %s", err.Error())
            response = []byte(`{"error": "Internal Error"}`)
@@ -147,23 +146,22 @@ func configurationServer() {
            w.WriteHeader(http.StatusInternalServerError)
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write(response)
    })
    http.ListenAndServe(":8119", r)
}

func init() {
    last_keep_alive = time.Time{}
-   envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, ConfigFactory, &parser{})
+   envoyHttp.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{})
    go configurationServer()
}

-type config struct{}
-type parser struct{}
+type config struct {}
+type parser struct {}

func sendKeepAlive() {
    for {
@@ -178,14 +176,14 @@ func sendKeepAlive() {
}

func (p *parser) initFilterStructs() *filterRequestStructs {
-   return &filterRequestStructs{
+   return &filterRequestStructs {
        http_start_data:   (*C.HttpRequestFilterData)(C.malloc(C.sizeof_HttpRequestFilterData)),
        http_meta_data:    (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData)),
        http_headers:      (*C.HttpHeaders)(C.malloc(C.sizeof_HttpHeaders)),
        http_headers_data: (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData)),
        http_res_headers:  (*C.ResHttpHeaders)(C.malloc(C.sizeof_ResHttpHeaders)),
        http_body_data:    (*C.nano_str_t)(C.malloc(10000 * C.sizeof_nano_str_t)),
        attachment_data:   (*C.AttachmentData)(C.malloc(C.sizeof_AttachmentData)),
    }
}
@@ -226,7 +224,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
        //mutex.Unlock()
    }

-   go func() {
+   go func (){
        sendKeepAlive()
    }()
@@ -242,43 +240,45 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
    return &newConfig
}

-func ConfigFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
+func ConfigFactory(c interface{}) api.StreamFilterFactory {
    conf, ok := c.(*config)
    if !ok {
        panic("unexpected config type")
    }
+   return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
    worker_thread_id := int(C.get_thread_id())
    api.LogDebugf("worker_thread_id: %d", worker_thread_id)
    if _, ok := thread_to_attachment_mapping[int(worker_thread_id)]; !ok {
        api.LogDebugf("need to add new thread to the map")
        map_size := len(attachment_to_thread_mapping)
        if map_size < len(attachments_map) {
            attachment_to_thread_mapping[map_size] = worker_thread_id
            thread_to_attachment_mapping[worker_thread_id] = map_size
            api.LogDebugf("len(attachment_to_thread_mapping): %d", len(attachment_to_thread_mapping))
            api.LogDebugf("thread_to_attachment_mapping: %v", thread_to_attachment_mapping)
            api.LogDebugf("attachment_to_thread_mapping: %v", attachment_to_thread_mapping)
        } else {
            panic("unexpected thread id")
        }
    }
    worker_id := thread_to_attachment_mapping[int(worker_thread_id)]
    api.LogDebugf("worker_id: %d", worker_id)
    filter_id.Add(1)
    session_id := filter_id.Load()
    attachment_ptr := attachments_map[worker_id]
    session_data := C.InitSessionData((*C.NanoAttachment)(attachment_ptr), C.SessionID(session_id))
    return &filter{
        callbacks:       callbacks,
        config:          conf,
        session_id:      session_id,
        cp_attachment:   attachment_ptr,
        session_data:    session_data,
        request_structs: attachment_to_filter_request_structs[worker_id],
    }
+   }
}
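
In the unchanged context above, initFilterStructs preallocates every request-scoped buffer with C.malloc, including 10000-entry arrays for header and body chunks, so the attachment can be handed stable C memory without per-request allocation. The sketch below shows that cgo pattern in isolation; the two typedefs are one-field stand-ins, not the attachment's real struct definitions.

    package main

    /*
    #include <stdlib.h>
    typedef struct { int worker_id; } HttpMetaData;  // stand-in, not the real layout
    typedef struct { int count; } HttpHeaderData;    // stand-in, not the real layout
    */
    import "C"

    import "unsafe"

    func main() {
        // One struct allocated up front, as with http_meta_data above.
        meta := (*C.HttpMetaData)(C.malloc(C.sizeof_HttpMetaData))
        defer C.free(unsafe.Pointer(meta)) // C.malloc memory is not garbage-collected
        meta.worker_id = 7

        // A fixed-capacity array, as with the 10000-entry header and body buffers.
        headers := (*C.HttpHeaderData)(C.malloc(10000 * C.sizeof_HttpHeaderData))
        defer C.free(unsafe.Pointer(headers))
        view := unsafe.Slice(headers, 10000) // index the C array from Go without copying
        view[0].count = 1
    }

Memory obtained from C.malloc lives outside Go's garbage collector, so each allocation needs a matching C.free once the structs are no longer in use.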