Updating local policy, metrics, and local update trigger

This commit is contained in:
Ned Wright
2023-12-14 16:02:53 +00:00
parent 9d848264f3
commit a3014ab381
56 changed files with 3781 additions and 331 deletions

View File

@@ -5,6 +5,10 @@ install(FILES cp-nano-http-transaction-handler-debug-conf.json DESTINATION http_
install(FILES cp-nano-http-transaction-handler.cfg DESTINATION http_transaction_handler_service/conf PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ)
install(FILES k8s-log-file-handler.sh DESTINATION http_transaction_handler_service/bin PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ)
install(DIRECTORY snort3_to_ips/ DESTINATION http_transaction_handler_service/scripts/snort3_to_ips FILES_MATCHING PATTERN "*" PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE)
install(FILES snort_to_ips_local.py DESTINATION http_transaction_handler_service/scripts PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ)
install(FILES exception.py DESTINATION http_transaction_handler_service/scripts PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ)
#install(DIRECTORY ${ng_module_osrc_pcre2_path}/lib/ DESTINATION http_transaction_handler_service/lib/ FILES_MATCHING PATTERN "libpcre2-8.so*" PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ)
#install(DIRECTORY ${ng_module_osrc_pcre2_path}/lib/ DESTINATION http_transaction_handler_service/lib/ FILES_MATCHING PATTERN "libpcre2-posix.so*" PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ)
#install(RUNTIME_DEPENDENCY_SET xml DESTINATION http_transaction_handler_service/lib/ PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ)

View File

@@ -0,0 +1,8 @@
class SnortHookException(Exception):
def __init__(self, message="", practice_id=""):
self.message = message
self.practice_id = practice_id
def __str__(self):
return "{}".format(self.message)

View File

@@ -7,6 +7,7 @@ INSTALLATION_TIME=$(date)
WAAP_POLICY_FOLDER_PATH=/etc/cp/conf/waap
IPS_POLICY_FOLDER_PATH=/etc/cp/conf/ips
SNORT_SCRIPTS_PATH=/etc/cp/scripts/
DEFAULT_HTTP_TRANSACTION_HANDLER_EVENT_BUFFER=/var/log/nano_agent/event_buffer/HTTP_TRANSACTION_HANDLER_events
@@ -216,6 +217,10 @@ run_installation()
install_policy $is_debug_mode "$var_certs_dir"
install_waap
cp_exec "cp -fr scripts/snort3_to_ips $SNORT_SCRIPTS_PATH/snort3_to_ips"
cp_exec "cp -f scripts/exception.py $SNORT_SCRIPTS_PATH/exception.py"
cp_exec "cp -f scripts/snort_to_ips_local.py $SNORT_SCRIPTS_PATH/snort_to_ips_local.py"
${INSTALL_COMMAND} lib/libshmem_ipc.so /usr/lib/cpnano/
${INSTALL_COMMAND} lib/libcompression_utils.so /usr/lib/
cp_exec "ldconfig"

View File

@@ -0,0 +1,53 @@
from exception import SnortHookException
class DetectionRules:
def __init__(self):
self.data = {
"type": "simple",
"SSM": "",
"keywords": "",
"context": []
}
def validate(self):
if self.data["context"] and (len(self.data["keywords"]) or len(self.data["SSM"])):
pass
else:
raise SnortHookException("No detection rule in the rule")
def set_default_if_needed(self):
pass
def is_key_valid(self, key):
if key in self.data.keys():
return True
return False
def parse_data(self, key, value):
self.parse_func_map[key](self, value)
def parse_keywords_data(self, value):
if len(self.data["keywords"]) != 0:
self.data["keywords"] += " "
self.data["keywords"] += value
def parse_context_data(self, value):
if value not in self.data["context"]:
self.data["context"].append(value)
def parse_ssm_data(self, value):
if self.data["SSM"] != "":
raise SnortHookException("two fast_pattern content")
self.data["SSM"] += value
def parse_type_data(self, value):
raise SnortHookException("DetectionRules of type not implemented")
parse_func_map = {
"type": parse_type_data,
"SSM": parse_ssm_data,
"keywords": parse_keywords_data,
"context": parse_context_data
}

View File

@@ -0,0 +1,91 @@
from exception import SnortHookException
class IpsKW:
def __init__(self, keyword, value, optional_modifier):
self.keyword = keyword
self.value = value
self.optional_modifiers = optional_modifier
def construct(self):
return self.construct_func_map[self.keyword](self)
def construct_name(self):
return [("protectionName", self.value)]
def construct_detection_keyword(self):
constructed_data = []
optional_modifier_str = ""
for key in self.optional_modifiers:
if key in ['nocase', 'relative']:
optional_modifier_str += ", " + key
elif key == 'depth':
optional_modifier_str += ", {} {}".format(key, self.optional_modifiers[key])
elif key == 'offset':
if self.optional_modifiers[key] == 0:
continue
optional_modifier_str += ", {} {}".format(key, self.optional_modifiers[key])
elif key == 'part':
optional_modifier_str += ", {} {}".format(key, self.optional_modifiers[key])
constructed_data.append(("context", self.optional_modifiers[key]))
else:
raise SnortHookException("Error: Key '{}' is not supported in keywords".format(key))
constructed_data.append(("keywords", "data: {}{};".format(self.value, optional_modifier_str)))
return constructed_data
def construct_pcre(self):
constructed_data = []
optional_modifier_str = ""
for key in self.optional_modifiers:
if key in ['nocase', 'relative']:
optional_modifier_str += ", " + key
elif key in ['offset']:
if self.optional_modifiers[key] == 0:
continue
optional_modifier_str += ", {} {}".format(key, self.optional_modifiers[key])
elif key == 'part':
optional_modifier_str += ", {} {}".format(key, self.optional_modifiers[key])
constructed_data.append(("context", self.optional_modifiers[key]))
else:
raise SnortHookException("Error: Key {} is not supported in pcre".format(key))
constructed_data.append(("keywords", "pcre: {}{};".format(self.value, optional_modifier_str)))
return constructed_data
def construct_SSM_keyword(self):
constructed_data = []
for key in self.optional_modifiers:
if key == 'part':
constructed_data.append(("context", self.optional_modifiers[key]))
constructed_data.append(("SSM", self.value.strip("\"")))
return constructed_data
def construct_length_keyword(self):
return [("keywords", "{}: {}, {}, part {};".format(self.keyword, self.optional_modifiers['var'],
self.value, self.optional_modifiers['part']))]
def construct_cvelist(self):
return [(self.keyword, self.value)]
def construct_severity(self):
return [(self.keyword, self.value)]
def construct_tags(self):
return [(self.keyword, "Vul_Type_{}".format(self.value))]
def construct_sid_rev(self):
return [("maintrainId", "{}:{}".format(self.keyword, self.value))]
construct_func_map = {
'protectionName': construct_name,
'keywords': construct_detection_keyword,
'length': construct_length_keyword,
'SSM': construct_SSM_keyword,
'tags': construct_tags,
'pcre': construct_pcre,
'cveList': construct_cvelist,
'severity': construct_severity,
'sid': construct_sid_rev,
'rev': construct_sid_rev
}

View File

@@ -0,0 +1,34 @@
from snort3_to_ips.Signature.ProtectionMetadata import ProtectionMetadata
from snort3_to_ips.Signature.DetectionRules import DetectionRules
from exception import SnortHookException
class IpsSignature:
def __init__(self):
self.protectionMetadata = ProtectionMetadata()
self.detectionRules = DetectionRules()
def __str__(self):
return str(self.output_signature())
def parse_data(self, constructs):
for json_key, json_value in constructs:
if self.protectionMetadata.is_key_valid(json_key):
self.protectionMetadata.parse_data(json_key, json_value)
elif self.detectionRules.is_key_valid(json_key):
self.detectionRules.parse_data(json_key, json_value)
else:
raise SnortHookException("'{}' is not a valid keyword in snort signature".format(json_key))
def validate(self):
self.protectionMetadata.validate()
self.detectionRules.validate()
def set_default_if_needed(self):
self.protectionMetadata.set_default_if_needed()
self.detectionRules.set_default_if_needed()
def output_signature(self):
self.validate()
self.set_default_if_needed()
return {"protectionMetadata": self.protectionMetadata.data, "detectionRules": self.detectionRules.data}

View File

@@ -0,0 +1,70 @@
class ProtectionMetadata:
def __init__(self):
self.data = {
"protectionName": "",
"severity": "",
"confidenceLevel": "",
"performanceImpact": "",
"lastUpdate": "",
"maintrainId": "",
"tags": ["Snort"],
"cveList": [],
"silent": False
}
def validate(self):
if len(self.data["protectionName"]) == 0:
raise Exception("msg field missing in the snort rule")
def set_default_if_needed(self):
if self.data["severity"] == "":
self.data["severity"] = "Critical"
if self.data["confidenceLevel"] == "":
self.data["confidenceLevel"] = "High"
if self.data["performanceImpact"] == "":
self.data["performanceImpact"] = "Medium"
if self.data["lastUpdate"] == "":
self.data["lastUpdate"] = "20210909"
def is_key_valid(self, key):
if key in self.data.keys():
return True
return False
def parse_data(self, key, value):
self.parse_func_map[key](self, key, value)
def parse_name_data(self, key, value):
self.data[key] = value
def parse_cvelist(self, key, value):
if value not in self.data[key]:
self.data[key].append(value)
def parse_severity(self, key, value):
self.data[key] = value
def parse_main_train_id(self, key, value):
if self.data[key] != "":
self.data[key] += " "
self.data[key] += value
def parse_tags(self, key, value):
if value not in self.data[key]:
self.data[key].append(value)
def not_implemented(self, key, value):
print("Not implemented for key {} with value {}".format(key, value))
parse_func_map = {
"protectionName": parse_name_data,
"severity": parse_severity,
"confidenceLevel": not_implemented,
"performanceImpact": not_implemented,
"lastUpdate": not_implemented,
"maintrainId": parse_main_train_id,
"tags": parse_tags,
"cveList": parse_cvelist,
"silent": not_implemented,
}

View File

@@ -0,0 +1,49 @@
import json
from snort3_to_ips.SnortRule.SnortRule import SnortRule
from snort3_to_ips.utils.utils import parse_snort_rules_line, is_invalid_line
from exception import SnortHookException
class Signatures:
def __init__(self):
self.signatures = []
self.error_rules = []
self.error_internal = []
def load_snort_signatures(self, rules_input):
line_number = 0
for line in rules_input.split("\n"):
line_number += 1
if is_invalid_line(line):
continue
rule = line.strip()
header, body = parse_snort_rules_line(rule)
try:
snort_rule = SnortRule(header)
if not snort_rule.is_http_rule(body):
continue
snort_rule.parse_body(body)
self.signatures.append(snort_rule.convert())
except SnortHookException as se:
self.error_rules.append({"Error": str(se), "Line": line_number})
except Exception as e:
self.error_internal.append({"Error": str(e), "Line": line_number})
def reset(self):
self.signatures = []
self.error_rules = []
def output_errors(self, output_pf):
output = {"Errors": self.error_rules}
with open(output_pf, 'w') as f:
json.dump(output, f, ensure_ascii=False, indent=4)
def output_ips_signature_package(self, output_pf):
output = {"IPSSnortSigs": {"protections": self.signatures}}
with open(output_pf, 'w') as f:
json.dump(output, f, ensure_ascii=False, indent=4)
def get_payload_data(self):
return self.signatures, self.error_rules, self.error_internal

View File

@@ -0,0 +1,190 @@
from snort3_to_ips.Signature.IpsKW import IpsKW
from snort3_to_ips.utils.utils import is_hex_segment_in_str
from exception import SnortHookException
class SnortKW:
def __init__(self, keyword, value, optional_modifiers):
self.keyword = keyword
self.value = value
self.optional_modifiers = optional_modifiers
def convert(self, snort_rule):
return self.convert_func_map[self.keyword](self, snort_rule)
def convert_snort_content_kw(self, snort_rule):
ips_data_modifiers = self.get_ips_modifiers_from_rule(snort_rule)
if ips_data_modifiers.pop('fast_pattern', False) and not is_hex_segment_in_str(self.value):
return [IpsKW("SSM", self.value, ips_data_modifiers)]
return [IpsKW("keywords", self.value, ips_data_modifiers)]
def convert_snort_pcre_kw(self, snort_rule):
ips_data_modifiers = snort_rule.get_ips_context()
return [IpsKW("pcre", self.value, ips_data_modifiers)]
def convert_snort_flow_kw(self, snort_rule):
for modifier in self.optional_modifiers:
if modifier == "to_server" or modifier == "from_client":
snort_rule.flow = "client_to_server"
elif modifier == "to_client" or modifier == "from_server":
snort_rule.flow = "server_to_client"
elif modifier in ["established", "not_established", "stateless"]:
pass
elif modifier in ["no_stream", "only_stream"]:
pass
elif modifier in ["no_frag", "only_frag"]:
pass
else:
raise SnortHookException("unsupported modifier for '{}': '{}'".format(self.keyword, modifier))
return []
def convert_snort_msg_kw(self, snort_rule):
return [IpsKW("protectionName", self.value.strip("\""), "")]
def convert_snort_sticky_buffer(self, snort_rule):
snort_rule.sticky_buffer = self.keyword
if self.value != "":
if self.keyword != "http_header":
raise SnortHookException("arguments are not supported for '{}' of value '{}'".format(self.keyword, self.value))
if self.value == "field":
snort_rule.dynamic_buffer = self.optional_modifiers
else:
raise SnortHookException("Unknown argument for '{}', '{}'".format(self.keyword, self.value))
return []
def convert_snort_reference(self, snort_rule):
if self.value == 'bugtraq':
return [IpsKW("cveList", "BUGTRAQ-{}".format(self.optional_modifiers[self.value]), "")]
elif self.value == 'cve':
return [IpsKW("cveList", "CVE-{}".format(self.optional_modifiers[self.value]), "")]
elif self.value == 'nessus':
return [IpsKW("cveList", "NESSUS-{}".format(self.optional_modifiers[self.value]), "")]
elif self.value == 'arachnids':
return [IpsKW("cveList", "ARACHNIDS-{}".format(self.optional_modifiers[self.value]), "")]
elif self.value == 'mcafee':
return [IpsKW("cveList", "MCAFEE-{}".format(self.optional_modifiers[self.value]), "")]
elif self.value == 'osvdb':
return [IpsKW("cveList", "OSVDB-{}".format(self.optional_modifiers[self.value]), "")]
elif self.value == 'msb':
return [IpsKW("cveList", "MSB-{}".format(self.optional_modifiers[self.value]), "")]
elif self.value == 'url':
return [IpsKW("cveList", "http://{}".format(self.optional_modifiers[self.value]), "")]
else:
raise SnortHookException("Unknown system in Reference of value: {}".format(self.value))
def convert_snort_classtype(self, snort_rule):
return [IpsKW("severity", self.optional_modifiers[self.value], ""),
IpsKW("tags", self.value.replace("-", " ").title().replace(" ", "_"), "")]
def convert_snort_priority(self, snort_rule):
return [IpsKW("severity", self.value, "")]
def convert_snort_bufferlen_kw(self, snort_rule):
ips_kw_list = []
ips_data_modifiers = snort_rule.get_ips_context()
if "<=>" in self.value:
left_var, right_var = self.value.split("<=>")
if not left_var.isnumeric() or not right_var.isnumeric():
raise SnortHookException("bufferlen - illegal numerical value")
ips_data_modifiers['var'] = left_var
ips_kw_list.append(IpsKW("length", "min", ips_data_modifiers))
ips_data_modifiers = snort_rule.get_ips_context()
ips_data_modifiers['var'] = right_var
ips_kw_list.append(IpsKW("length", "max", ips_data_modifiers))
elif "<>" in self.value:
left_var, right_var = self.value.split("<>")
ips_data_modifiers['var'] = left_var
if not left_var.isnumeric() or not right_var.isnumeric():
raise SnortHookException("bufferlen - illegal numerical value")
ips_kw_list.append(IpsKW("length", "min", ips_data_modifiers))
ips_data_modifiers = snort_rule.get_ips_context()
ips_data_modifiers['var'] = right_var
ips_kw_list.append(IpsKW("length", "max", ips_data_modifiers))
elif "<" in self.value:
if not self.value.split("<")[1].isnumeric():
raise SnortHookException("bufferlen - illegal numerical value")
ips_data_modifiers['var'] = self.value.split("<")[1]
ips_kw_list.append(IpsKW("length", "max", ips_data_modifiers))
elif ">" in self.value:
ips_data_modifiers['var'] = self.value.split(">")[1]
if not self.value.split(">")[1].isnumeric():
raise SnortHookException("bufferlen - illegal numerical value")
ips_kw_list.append(IpsKW("length", "min", ips_data_modifiers))
elif self.value.isnumeric():
ips_data_modifiers['var'] = self.value
ips_kw_list.append(IpsKW("length", "exact", ips_data_modifiers))
else:
raise SnortHookException("bufferlen operator is illegal")
return ips_kw_list
def convert_sid_kw(self, snort_rule):
return [IpsKW("sid", self.value, "")]
def convert_rev_kw(self, snort_rule):
return [IpsKW("rev", self.value, "")]
def not_implemented(self, snort_rule):
return []
convert_func_map = {
# Primarily functions
'content': convert_snort_content_kw,
'pcre': convert_snort_pcre_kw,
'bufferlen': convert_snort_bufferlen_kw,
'flow': convert_snort_flow_kw,
# metadata functions
'msg': convert_snort_msg_kw,
'reference': convert_snort_reference,
'gid': not_implemented,
'sid': convert_sid_kw,
'rev': convert_rev_kw,
'classtype': convert_snort_classtype,
'priority': convert_snort_priority,
'metadata': not_implemented,
# http functions
'pkt_data': convert_snort_sticky_buffer,
'http_uri': convert_snort_sticky_buffer,
'http_raw_uri': convert_snort_sticky_buffer,
'http_header': convert_snort_sticky_buffer,
'http_raw_header': convert_snort_sticky_buffer,
'http_method': convert_snort_sticky_buffer,
'http_client_body': convert_snort_sticky_buffer,
'http_cookie': convert_snort_sticky_buffer,
'http_raw_cookie': convert_snort_sticky_buffer,
'http_stat_code': convert_snort_sticky_buffer,
'http_stat_msg': convert_snort_sticky_buffer,
'http_encode': convert_snort_sticky_buffer,
# backward compatibility rules.
'service': not_implemented
}
def get_ips_modifiers_from_rule(self, snort_rule):
ips_data_modifiers = {}
for rule in self.optional_modifiers:
if rule == 'nocase':
ips_data_modifiers['nocase'] = self.optional_modifiers['nocase']
elif rule == 'depth':
ips_data_modifiers['depth'] = self.optional_modifiers['depth']
elif rule == 'distance':
if int(self.optional_modifiers['distance']) != 0:
ips_data_modifiers['offset'] = self.optional_modifiers['distance']
ips_data_modifiers['relative'] = True
elif rule == 'offset':
if int(self.optional_modifiers['offset']) != 0:
ips_data_modifiers['offset'] = self.optional_modifiers['offset']
elif rule == 'within':
ips_data_modifiers['depth'] = self.optional_modifiers['within']
ips_data_modifiers['relative'] = True
elif rule == 'fast_pattern':
ips_data_modifiers['fast_pattern'] = True
else:
# print("Error: Not supported convert from {}".format(rule))
raise SnortHookException("For keyword '{}', unsupported modifier '{}'".format(self.keyword, rule))
ips_data_modifiers.update(snort_rule.get_ips_context())
return ips_data_modifiers

View File

@@ -0,0 +1,206 @@
from exception import SnortHookException
class SnortKWParser:
def __init__(self):
pass
def parse_kw_parameters(self, snort_rule):
tmp_split = snort_rule.split(":")
keyword = tmp_split[0]
return self.parse_func_map[keyword](self, snort_rule)
def simple_rule(self, snort_rule):
return snort_rule.strip(), "", {}
def simple_binary_rule(self, snort_rule):
keyword, value = snort_rule.split(":")
return keyword, value, {}
def parse_content(self, snort_rule):
tmp_split = snort_rule.split(":", 1)
keyword = tmp_split[0]
value = tmp_split[1][:tmp_split[1].find("\"", 2) + 1].strip()
optional_string = tmp_split[1][tmp_split[1].find("\"", 2) + 1:].strip(",").strip()
if optional_string == "":
return keyword, value, {}
optional_modifiers = {}
for optional_command in optional_string.split(","):
optional_command = optional_command.strip()
command = optional_command.strip().split(" ", 1)
if len(command) == 1:
optional_modifiers[command[0]] = True
else:
optional_modifiers[command[0]] = command[1]
return keyword, value, optional_modifiers
def parse_pcre(self, snort_rule):
tmp_split = snort_rule.split(":", 1)
keyword = tmp_split[0]
value = tmp_split[1][:tmp_split[1].rfind("\"", 2) + 1].strip()
optional_string = tmp_split[1][tmp_split[1].rfind("/", 1) + 1:tmp_split[1].rfind("\"", 1)].strip()
optional_modifiers = {}
for modifier in optional_string:
if modifier in 'ismxGARE':
pass
elif modifier in 'O':
raise SnortHookException("unsupported {} modifier {}".format(keyword, modifier))
else:
raise SnortHookException("Unknown {} modifier {}".format(keyword, modifier))
return keyword, value, optional_modifiers
def parse_reference(self, snort_rule):
keyword = snort_rule.split(":")[0]
value = snort_rule.split(":")[1].split(",")[0]
optional_modifiers = {value: snort_rule.split(":")[1].split(",")[1]}
return keyword, value, optional_modifiers
def parse_classtype(self, snort_rule):
keyword = snort_rule.split(":")[0]
value = snort_rule.split(":")[1].strip()
optional_modifiers = {value: self.snort_default_classifications[value]}
return keyword, value, optional_modifiers
def parse_priority(self, snort_rule):
keyword = snort_rule.split(":")[0]
value = int(snort_rule.split(":")[1].strip())
if value > 4:
value = 4
return keyword, self.priority_map[value], {}
def parse_sticky_buffer(self, snort_rule):
split_rule = snort_rule.split(":", 1)
if len(split_rule) == 2:
arguments_split = split_rule[1].split(" ")
return split_rule[0], arguments_split[0].strip(), {arguments_split[0].strip(): arguments_split[1].strip()}
return snort_rule.strip(), "", {}
def parse_metadata(self, snort_rule):
keyword, value = snort_rule.split(":")
optional_modifiers = {}
for metadata in value.split(","):
key, value = metadata.strip().split(" ", 1)
if key not in optional_modifiers.keys():
optional_modifiers[key] = set()
optional_modifiers[key].add(value)
return keyword, "", optional_modifiers
def parse_flow(self, snort_rule):
keyword, value = snort_rule.split(":")
optional_modifiers = []
for modifier in value.split(','):
optional_modifiers.append(modifier.strip())
return keyword, value, optional_modifiers
def parse_service(self, snort_rule):
keyword, value = snort_rule.split(":")
return keyword, value, {}
def not_implemented(self, snort_rule):
raise SnortHookException("unsupported keyword '{}'".format(snort_rule.split(":")[0]))
parse_func_map = {
# Primarily functions
'content': parse_content,
'pcre': parse_pcre,
'flow': parse_flow,
'bufferlen': simple_binary_rule,
# metadata functions
'msg': simple_binary_rule,
'reference': parse_reference,
'gid': simple_binary_rule,
'sid': simple_binary_rule,
'rev': simple_binary_rule,
'classtype': parse_classtype,
'priority': parse_priority,
'metadata': parse_metadata,
# http functions
'pkt_data': simple_rule,
'http_uri': parse_sticky_buffer,
'http_raw_uri': parse_sticky_buffer,
'http_header': parse_sticky_buffer,
'http_raw_header': parse_sticky_buffer,
'http_method': parse_sticky_buffer,
'http_client_body': parse_sticky_buffer,
'http_cookie': parse_sticky_buffer,
'http_stat_code': parse_sticky_buffer,
'http_stat_msg': parse_sticky_buffer,
'http_raw_cookie': parse_sticky_buffer,
# Snort 2 functions
'service': parse_service,
# Not implemented
'byte_test': not_implemented,
'file_data': not_implemented,
'byte_jump': not_implemented,
'isdataat': not_implemented,
'dsize': not_implemented,
'icode': not_implemented,
'flowbits': not_implemented,
'itype': not_implemented,
'dce_iface': not_implemented,
'cmp_id': not_implemented,
'detection_filter': not_implemented,
'flags': not_implemented,
'sip_stat_code': not_implemented,
'ack': not_implemented,
'ip_proto': not_implemented,
'sip_method': not_implemented,
'asn1': not_implemented,
'ssl_version': not_implemented,
'base64_decode': not_implemented,
'ssl_state': not_implemented,
'sip_header': not_implemented,
'fragbits': not_implemented,
}
priority_map = {
1: "High",
2: "Medium",
3: "Low",
4: "Very Low"
}
# Default snort classification for reference.
# If needed custom config, we should provide it. (1 being High, 2 Medium, 3 Low, 4 Very Low)
snort_default_classifications = {
"attempted-admin": "High",
"attempted-user": "High",
"inappropriate-content": "High",
"policy-violation": "High",
"shellcode-detect": "High",
"successful-admin": "High",
"successful-user": "High",
"trojan-activity": "High",
"unsuccessful-user": "High",
"web-application-attack": "High",
"attempted-dos": "Medium",
"attempted-recon": "Medium",
"bad-unknown": "Medium",
"default-login-attempt": "Medium",
"denial-of-service": "Medium",
"misc-attack": "Medium",
"non-standard-protocol": "Medium",
"rpc-portmap-decode": "Medium",
"successful-dos": "Medium",
"successful-recon-largescale": "Medium",
"successful-recon-limited": "Medium",
"suspicious-filename-detect": "Medium",
"suspicious-login": "Medium",
"system-call-detect": "Medium",
"unusual-client-port-connection": "Medium",
"web-application-activity": "Medium",
"icmp-event": "Low",
"misc-activity": "Low",
"network-scan": "Low",
"not-suspicious": "Low",
"protocol-command-decode": "Low",
"string-detect": "Low",
"unknown": "Low",
"tcp-connection": "Very Low"
}

View File

@@ -0,0 +1,121 @@
from snort3_to_ips.SnortRule.SnortRuleHeader import SnortRuleHeader
from snort3_to_ips.SnortRule.SnortKW import SnortKW
from snort3_to_ips.SnortRule.SnortKWParser import SnortKWParser
from snort3_to_ips.Signature.IpsSignature import IpsSignature
from exception import SnortHookException
class SnortRule:
def __init__(self, rule_header):
self.header = SnortRuleHeader(rule_header)
self.keywords = []
self.sticky_buffer = "Default"
self.dynamic_buffer = {}
self.flow = "client_to_server"
def is_http_rule(self, parsed_body):
return self.header.is_http_header() or "service:http" in parsed_body
def add_keyword(self, snort_rule):
kw_parser = SnortKWParser()
keyword, value, optional_modifiers = kw_parser.parse_kw_parameters(snort_rule)
self.keywords.append(SnortKW(keyword, value, optional_modifiers))
def parse_body(self, input_snort_rule_body):
for keyword in input_snort_rule_body:
self.add_keyword(keyword)
def convert(self):
signature = IpsSignature()
for keyword in self.keywords:
for converted_keyword in keyword.convert(self):
signature.parse_data(converted_keyword.construct())
return signature.output_signature()
def get_ips_context(self):
return self.convert_http_map[self.sticky_buffer](self)
def convert_default(self):
return {"part": "HTTP_RAW"}
def convert_pkt_data(self):
raise SnortHookException("Unsupported keyword 'pkt_data'")
def convert_http_uri(self):
return {"part": "HTTP_COMPLETE_URL_DECODED"}
def convert_http_raw_uri(self):
return {"part": "HTTP_COMPLETE_URL_ENCODED"}
def convert_http_header(self):
if self.flow == "client_to_server":
if 'field' in self.dynamic_buffer.keys():
return {"part": 'HTTP_REQUEST_HEADER_{}'.format(self.dynamic_buffer['field'].upper())}
else:
return {"part": "HTTP_REQUEST_HEADER"}
elif self.flow == "server_to_client":
if 'field' in self.dynamic_buffer.keys():
return {"part": 'HTTP_RESPONSE_HEADER_{}'.format(self.dynamic_buffer['field'].upper())}
else:
return {"part": "HTTP_RESPONSE_HEADER"}
else:
raise SnortHookException("Unknown Flow {}".format(self.flow))
def convert_http_raw_header(self):
if self.flow == "client_to_server":
return {"part": "HTTP_REQUEST_HEADER"}
elif self.flow == "server_to_client":
return {"part": "HTTP_RESPONSE_HEADER"}
else:
raise SnortHookException("Unknown Flow {}".format(self.flow))
def convert_http_method(self):
return {"part": "HTTP_METHOD"}
def convert_http_client_body(self):
return {"part": "HTTP_REQUEST_BODY"}
def convert_http_cookie(self):
if self.flow == "client_to_server":
return {"part": "HTTP_REQUEST_HEADER_COOKIE"}
elif self.flow == "server_to_client":
return {"part": "HTTP_RESPONSE_HEADER_COOKIE"}
else:
raise SnortHookException("Unknown Flow {}".format(self.flow))
def convert_http_raw_cookie(self):
if self.flow == "client_to_server":
return {"part": "HTTP_REQUEST_HEADER_COOKIE"}
elif self.flow == "server_to_client":
return {"part": "HTTP_RESPONSE_HEADER_COOKIE"}
else:
raise SnortHookException("Unknown Flow {}".format(self.flow))
def convert_http_stat_code(self):
if self.flow == "client_to_server":
raise SnortHookException("http_stat_code isn't supported with flow: to_server, from_client")
elif self.flow == "server_to_client":
return {"part": "HTTP_RESPONSE_CODE"}
else:
raise SnortHookException("Unknown Flow {}".format(self.flow))
def not_implemented(self):
raise SnortHookException("unsupported keyword '{}'".format(self.sticky_buffer))
convert_http_map = {
'Default': convert_default,
'pkt_data': convert_pkt_data,
'http_uri': convert_http_uri,
'http_raw_uri': convert_http_raw_uri,
'http_header': convert_http_header,
'http_raw_header': convert_http_raw_header,
'http_method': convert_http_method,
'http_client_body': convert_http_client_body,
'http_cookie': convert_http_cookie,
'http_raw_cookie': convert_http_raw_cookie,
'http_stat_code': convert_http_stat_code,
'http_stat_msg': not_implemented,
'http_encode': not_implemented
}

View File

@@ -0,0 +1,60 @@
from exception import SnortHookException
class SnortRuleHeader:
def __init__(self, rule_header_str):
self.rules = {}
rule_list = rule_header_str.split(" ")
if len(rule_list) == 2:
# alert http
self.validate_and_parse_action(rule_list[0])
self.validate_and_parse_protocol(rule_list[1])
self.rules['source_ports'] = ""
self.rules['destination_ports'] = ""
self.rules['directional_op'] = ""
elif len(rule_list) == 7:
# alert http $HOME_NET any -> $EXTERNAL_NET $HTTP_PORTS
self.validate_and_parse_action(rule_list[0])
self.validate_and_parse_protocol(rule_list[1])
self.validate_and_parse_directional_operator(rule_list[4])
self.validate_and_parse_source_ports(rule_list[3])
self.validate_and_parse_destination_ports(rule_list[6])
else:
raise SnortHookException("Invalid Snort rule header")
def validate_and_parse_action(self, action):
if action in ['drop']:
self.rules['action'] = action
elif action in ['alert', 'log', 'pass', 'reject', 'sdrop']:
pass
# print("Header action {} not supported".format(action))
else:
raise SnortHookException("Unknown header action {}".format(action))
def validate_and_parse_protocol(self, protocol):
if protocol in ['http']:
self.rules['protocol'] = protocol
elif protocol in ['tcp', 'udp', 'icmp', 'ip', 'file']:
self.rules['protocol'] = protocol
else:
raise SnortHookException("Error: Unknown header protocol {}".format(protocol))
def validate_and_parse_directional_operator(self, directional_op):
if directional_op in ['->', '<>']:
self.rules['directional_op'] = directional_op
else:
raise SnortHookException("Error: Unknown unsupported header directional operator {}".format(directional_op))
def validate_and_parse_source_ports(self, ports):
if ports in ['$HTTP_PORTS']:
self.rules['source_ports'] = ports
else:
self.rules['source_ports'] = ""
def validate_and_parse_destination_ports(self, ports):
if ports in ['$HTTP_PORTS']:
self.rules['destination_ports'] = ports
else:
self.rules['destination_ports'] = ""
def is_http_header(self):
return self.rules['protocol'] == 'http' or self.rules['destination_ports'] == "$HTTP_PORTS"

View File

@@ -0,0 +1,7 @@
import snort3_to_ips.Signature.Signatures as Signatures
def convert_incoming_rules(rules):
signatures = Signatures.Signatures()
signatures.load_snort_signatures(rules)
return signatures.get_payload_data()

View File

@@ -0,0 +1,129 @@
import hashlib
import base64
from exception import SnortHookException
def parse_snort_rules_line(line):
header = line[0:line.find("(")].strip()
tmp_body = line[line.find("(") + 1:line.rfind(")")].replace("\n", "").strip().split(";")
body = []
tmp_body_segment = ""
for rule in tmp_body:
if len(rule) == 0:
continue
rule = rule.strip()
if rule[len(rule) - 1] == "\\":
tmp_body_segment += "{};".format(rule[:len(rule) - 1])
else:
body.append(tmp_body_segment + rule)
tmp_body_segment = ""
return header, body
def is_hex_segment_in_str(value):
if value.count("|") - value.count("\\|") > 0:
return True
return False
def is_invalid_line(line):
tmp_line = line.strip()
return len(tmp_line) == 0 or tmp_line[0] == "#"
def generate_version_id(data, hash_mode="md5", input_mode="decoded_data"):
if hash_mode == "md5":
hash_mod = hashlib.md5()
elif hash_mode == "sha1":
hash_mod = hashlib.sha1()
else:
return ""
if input_mode == "file":
with open(data, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_mod.update(chunk)
elif input_mode == "decoded_data":
hash_mod.update(data.encode())
return hash_mod.hexdigest()
def verify_custom_signatures(signatures):
if signatures is None:
return False
return signatures['isFileExist'] and signatures['size'] != 0
def decode_custom_signature(signatures):
data = signatures.split(",", 1)
metadata = data[0]
if metadata == 'data:':
return ""
elif metadata != 'data:application/octet-stream;base64':
raise SnortHookException("Invalid Snort file")
base64_message = base64.b64decode(data[1])
return base64_message.decode("utf-8")
def prepare_warnings_log(warnings):
if not warnings:
return []
if len(warnings) <= 10:
ret_warnings = []
for warning in warnings:
if warning['errorType'] == "SnortRule":
tmp_id = warning['id']
ret_warnings.append({"id": warning['id'],
"name": "Snort Warning",
"type": "Web Application",
"sub_type": "Snort Conversion",
"message": "Asset {}, skipped line {}: {}".format(warning["assetName"], warning['Line'], warning['Error'])
})
else:
assets_errors = {}
for warning in warnings:
if warning['errorType'] == "SnortRule":
tmp_id = warning['id']
asset_name = warning["assetName"]
if asset_name not in assets_errors.keys():
assets_errors[asset_name] = {}
if warning['Error'] not in assets_errors[asset_name].keys():
assets_errors[asset_name][warning['Error']] = 1
else:
assets_errors[asset_name][warning['Error']] += 1
ret_warnings = []
for asset_name in assets_errors.keys():
for err in assets_errors[asset_name].keys():
if assets_errors[asset_name][err] == 1:
message_format = "Asset {}: skipped {} {} time"
else:
message_format = "Asset {}: skipped {} {} times"
ret_warnings.append({"id": tmp_id,
"name": "Snort Warning",
"type": "Web Application",
"sub_type": "Snort Conversion",
"message": message_format.format(asset_name, err, assets_errors[asset_name][err])
})
for warning in warnings:
if warning['errorType'] != "SnortRule":
tmp_id = warning['id']
ret_warnings.append({"id": warning['id'],
"name": "Snort Warning",
"type": "Web Application",
"sub_type": "Snort Conversion",
"message": warning['Error']
})
if len(ret_warnings) == 1:
final_message_format = "To remove warning, please edit the Snort signatures file"
else:
final_message_format = "To remove warnings, please edit the Snort signatures file"
ret_warnings.append({"id": tmp_id,
"name": "Snort Warning",
"type": "Web Application",
"sub_type": "Snort Conversion",
"message": final_message_format
})
return ret_warnings

View File

@@ -0,0 +1,30 @@
import os
import snort3_to_ips.Signature.Signatures as Signatures
import sys
def convert_snort_to_ips_package(input_pf, output_pf, error_pf):
signatures = Signatures.Signatures()
with open(input_pf) as f:
input_data = f.read()
signatures.load_snort_signatures(input_data)
signatures.output_ips_signature_package(output_pf)
signatures.output_errors(error_pf)
if __name__ == '__main__':
if len(sys.argv) < 4:
print("Usage: python3 snort_to_ips_local.py <input_file> <output_file> <error_file>")
exit(1)
# Path to snort 3 rules file
in_pf = os.path.join("snort3_to_ips", "data", sys.argv[1])
# Path to output file (will create one if it does not exist)
out_pf = os.path.join("snort3_to_ips", "data", sys.argv[2])
# Path to output errors file (will create one if it does not exist)
err_pf = os.path.join("snort3_to_ips", "data", sys.argv[3])
convert_snort_to_ips_package(in_pf, out_pf, err_pf)