From 53008412a4fac75f31e01be0b124f68d801321e5 Mon Sep 17 00:00:00 2001 From: Eric Brown Date: Tue, 19 Mar 2024 20:43:30 -0700 Subject: [PATCH] Add new rule to detect regex denial-of-service patterns (#372) New rule that checks various functions of the re module for suspicious patterns that might cause catastrophic backtracking. Closes: #371 Signed-off-by: Eric Brown --- docs/rules.md | 1 + .../python/stdlib/re-denial-of-service.md | 10 + precli/core/redos.py | 309 ++++++++++++++++++ .../python/stdlib/re_denial_of_service.py | 100 ++++++ setup.cfg | 3 + tests/unit/rules/python/stdlib/re/__init__.py | 0 .../python/stdlib/re/examples/re_compile.py | 11 + .../stdlib/re/examples/re_compile_good.py | 6 + .../python/stdlib/re/examples/re_findall.py | 11 + .../python/stdlib/re/examples/re_finditer.py | 11 + .../python/stdlib/re/examples/re_fullmatch.py | 11 + .../python/stdlib/re/examples/re_match.py | 11 + .../python/stdlib/re/examples/re_search.py | 11 + .../stdlib/re/examples/re_search_good.py | 6 + .../python/stdlib/re/examples/re_split.py | 11 + .../rules/python/stdlib/re/examples/re_sub.py | 11 + .../python/stdlib/re/examples/re_subn.py | 11 + .../stdlib/re/test_re_denial_of_service.py | 55 ++++ 18 files changed, 589 insertions(+) create mode 100644 docs/rules/python/stdlib/re-denial-of-service.md create mode 100644 precli/core/redos.py create mode 100644 precli/rules/python/stdlib/re_denial_of_service.py create mode 100644 tests/unit/rules/python/stdlib/re/__init__.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_compile.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_compile_good.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_findall.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_finditer.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_fullmatch.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_match.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_search.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_search_good.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_split.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_sub.py create mode 100644 tests/unit/rules/python/stdlib/re/examples/re_subn.py create mode 100644 tests/unit/rules/python/stdlib/re/test_re_denial_of_service.py diff --git a/docs/rules.md b/docs/rules.md index d14c6a8f..d6d451b1 100644 --- a/docs/rules.md +++ b/docs/rules.md @@ -44,3 +44,4 @@ | PY030 | [socketserver — unrestricted bind](rules/python/stdlib/socketserver-unrestricted-bind.md) | Binding to an Unrestricted IP Address in `socketserver` Module | | PY031 | [http — unrestricted bind](rules/python/stdlib/http-server-unrestricted-bind.md) | Binding to an Unrestricted IP Address in `http.server` Module | | PY032 | [xmlrpc — unrestricted bind](rules/python/stdlib/xmlrpc-server-unrestricted-bind.md) | Binding to an Unrestricted IP Address in `xmlrpc.server` Module | +| PY033 | [re — denial of service](rules/python/stdlib/re-denial-of-service.md) | Inefficient Regular Expression Complexity in `re` Module | diff --git a/docs/rules/python/stdlib/re-denial-of-service.md b/docs/rules/python/stdlib/re-denial-of-service.md new file mode 100644 index 00000000..b4baeeef --- /dev/null +++ b/docs/rules/python/stdlib/re-denial-of-service.md @@ -0,0 +1,10 @@ +--- +id: PY033 +title: re — denial of service +hide_title: true +pagination_prev: null +pagination_next: null +slug: /rules/PY033 +--- + +::: precli.rules.python.stdlib.re_denial_of_service diff --git a/precli/core/redos.py b/precli/core/redos.py new file mode 100644 index 00000000..8b744d62 --- /dev/null +++ b/precli/core/redos.py @@ -0,0 +1,309 @@ +# Copyright 2024 Secure Saurce LLC +# Copyright 2019 Duo Security +import collections +import itertools +import sys +from re import _constants as constants +from re import _parser as parser + + +CR = collections.namedtuple("CR", ["cr_min", "cr_max"]) + +CATEGORY_TO_RANGE = { + constants.CATEGORY_DIGIT: [(48, 57)], + constants.CATEGORY_NOT_DIGIT: [(0, 47), (58, sys.maxunicode)], + constants.CATEGORY_SPACE: [(9, 13), (32, 32)], + constants.CATEGORY_NOT_SPACE: [(0, 8), (14, 31), (33, sys.maxunicode)], + constants.CATEGORY_WORD: [(48, 57), (65, 90), (95, 95), (97, 122)], + constants.CATEGORY_NOT_WORD: [ + (0, 47), + (58, 64), + (91, 94), + (96, 96), + (123, sys.maxunicode), + ], +} + + +class OpNode: + def __init__( + self, op: constants._NamedIntConstant, args: tuple, backtrackable=False + ): + self.op = op + self.args = args + self.backtrackable = backtrackable + self.children = collections.deque() + + def __str__(self, level: int = 0) -> str: + result = ( + f"{' ' * level}{self.op}: args={self.args} " + f"backtrackable={self.backtrackable}\n" + ) + + for child in self.children: + result += child.__str__(level + 1) + + return result + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__} - op={self.op} args={self.args} " + f"backtrackable={self.backtrackable}>" + ) + + +def _build_op_tree_helper( + node: OpNode, subpattern: parser.SubPattern, parent_backtrackable: bool +): + prev_sibling_backtrackable = False + + # Iterating in reverse helps with determining backtrackability. A + # subpattern's ability to backtrack depends on subsequent subpatterns, so + # it's easier to determine the last subpattern's backtrackability then + # propagate that information backwards. + for op, av in reversed(subpattern.data): + args = [] + subpatterns = [] + + if op is constants.BRANCH: + for a in av[1]: + subpatterns.append(a) + elif op is constants.GROUPREF_EXISTS: + condgroup, item_yes, item_no = av + subpatterns.append(item_yes) + if item_no: + subpatterns.append(item_no) + elif isinstance(av, (tuple, list)): + for a in av: + if isinstance(a, parser.SubPattern): + subpatterns.append(a) + else: + args.append(a) + else: + args.append(av) + + current_backtrackable = ( + parent_backtrackable or prev_sibling_backtrackable + ) + new_node = OpNode(op, tuple(args), current_backtrackable) + for sp in reversed(subpatterns): + _build_op_tree_helper(new_node, sp, current_backtrackable) + + prev_sibling_backtrackable = ( + prev_sibling_backtrackable or not optional_repeat(new_node) + ) + node.children.appendleft(new_node) + + +def build_op_tree(node: OpNode, subpattern: parser.SubPattern): + _build_op_tree_helper(node, subpattern, False) + + +class CharacterRange: + def __init__(self, character_ranges: CR, negate: bool = False): + self.character_ranges = character_ranges + self.negate = negate + + @classmethod + def from_any(cls, _any: tuple): + """E.g. '.'""" + return cls([CR(cr_min=0, cr_max=sys.maxunicode)]) + + @classmethod + def from_literal(cls, literal: tuple): + """E.g. 'a'""" + return cls([CR(cr_min=literal[0], cr_max=literal[0])]) + + @classmethod + def from_not_literal(cls, not_literal: tuple): + """E.g. '[^a]'""" + return cls( + [CR(cr_min=not_literal[0], cr_max=not_literal[0])], negate=True + ) + + @staticmethod + def _parse_in_nodes(nodes: tuple): + results = [] + for node_type, args in nodes: + match node_type: + case constants.LITERAL: + results.append(CR(cr_min=args, cr_max=args)) + case constants.RANGE: + results.append(CR(cr_min=args[0], cr_max=args[1])) + case constants.CATEGORY: + for c, r in CATEGORY_TO_RANGE.items(): + if args is c: + results.extend( + CR(cr_min=r_min, cr_max=r_max) + for r_min, r_max in r + ) + + return results + + @classmethod + def from_in(cls, _in: tuple): + """E.g. '[abcA-Z]'""" + character_ranges = cls._parse_in_nodes(_in) + return cls(character_ranges) + + @classmethod + def from_not_in(cls, not_in: tuple): + """E.g. '[^abcA-Z]'""" + # Avoid initial NEGATE + character_ranges = cls._parse_in_nodes(not_in[1:]) + return cls(character_ranges, negate=True) + + @classmethod + def from_op_node(cls, node: OpNode): + match node.op: + case constants.ANY: + return cls.from_any(node.args) + case constants.LITERAL: + return cls.from_literal(node.args) + case constants.NOT_LITERAL: + return cls.from_not_literal(node.args) + case constants.IN: + if node.args and node.args[0] == (constants.NEGATE, None): + return cls.from_not_in(node.args) + else: + return cls.from_in(node.args) + + # Unsupported OpNode + return None + + def overlap(self, other_character_range: CR): + if self.negate and other_character_range.negate: + # Unless the sets are disjoint and cover the entire character + # space they will have overlap - let's punt on the logic and + # assume this is true + return True + elif self.negate: + character_set = { + i + for cr in self.character_ranges + for i in range(cr.cr_min, cr.cr_max + 1) + } + other_character_set = { + i + for cr in other_character_range.character_ranges + for i in range(cr.cr_min, cr.cr_max + 1) + } + return bool(other_character_set - character_set) + elif other_character_range.negate: + character_set = { + i + for cr in self.character_ranges + for i in range(cr.cr_min, cr.cr_max + 1) + } + other_character_set = { + i + for cr in other_character_range.character_ranges + for i in range(cr.cr_min, cr.cr_max + 1) + } + return bool(character_set - other_character_set) + + return any( + cr1.cr_min <= cr2.cr_min <= cr1.cr_max + or cr1.cr_min <= cr2.cr_max <= cr1.cr_max + for cr1, cr2 in itertools.product( + self.character_ranges, other_character_range.character_ranges + ) + ) + + def __repr__(self) -> str: + return "<{} - negate={} ranges={}>".format( + self.__class__.__name__, + self.negate, + ", ".join( + str((cr.cr_min, cr.cr_max)) for cr in self.character_ranges + ), + ) + + +def optional_repeat(node: OpNode): + if node.op not in parser._REPEATCODES: + return False + + repeat_min, _ = node.args + + return repeat_min == 0 + + +def large_repeat(node: OpNode): + if node.op not in parser._REPEATCODES: + return False + + _, repeat_max = node.args + + # Repetition sizes that cause catastrophic backtracking depend on many + # factors including subject length, machine hardware, and the repetition + # size itself. This value was mostly arbitrarily chosen after running a + # few basic catastrophic cases. We may consider making it configurable + # in the future. + large_max = 10 + + return ( + # e.g. '{min,}', '+', '*' + repeat_max is constants.MAXREPEAT + or repeat_max >= large_max + ) + + +def max_nested_quantifiers(node: OpNode): + if not node.children: + return 0 + + child_max = max(max_nested_quantifiers(child) for child in node.children) + is_catastrophic = int(large_repeat(node) and node.backtrackable) + + return is_catastrophic + child_max + + +def inclusive_alternation_branch(branch_node: OpNode): + character_ranges = ( + CharacterRange.from_op_node(node) for node in branch_node.children + ) + return any( + cr1.overlap(cr2) + for cr1, cr2 in itertools.combinations( + filter(None, character_ranges), 2 + ) + ) + + +def _mutually_inclusive_alternation_helper( + node: OpNode, nested_quantifier: bool +): + if not node.children: + return False + + nested_quantifier = nested_quantifier or large_repeat(node) + + inclusive_alternation = False + if node.op is constants.BRANCH: + inclusive_alternation = inclusive_alternation_branch(node) + + return any( + (nested_quantifier and inclusive_alternation and node.backtrackable) + or _mutually_inclusive_alternation_helper(child, nested_quantifier) + for child in node.children + ) + + +def mutually_inclusive_alternation(node: OpNode): + return _mutually_inclusive_alternation_helper(node, False) + + +def catastrophic(pattern: str) -> bool: + try: + subpattern = parser.parse(pattern) + except constants.error: + return False + + root = OpNode(None, ()) + + build_op_tree(root, subpattern) + nested_quantifiers = max_nested_quantifiers(root) > 1 + alternation = mutually_inclusive_alternation(root) + + return any([nested_quantifiers, alternation]) diff --git a/precli/rules/python/stdlib/re_denial_of_service.py b/precli/rules/python/stdlib/re_denial_of_service.py new file mode 100644 index 00000000..47e6da93 --- /dev/null +++ b/precli/rules/python/stdlib/re_denial_of_service.py @@ -0,0 +1,100 @@ +# Copyright 2024 Secure Saurce LLC +r""" +# Inefficient Regular Expression Complexity in `re` Module + +Patterns in Python's re module that are susceptible to catastrophic +backtracking. Such patterns can lead to performance issues and may cause +a Denial-of-Service (DoS) condition in applications by consuming an +excessive amount of CPU time on certain inputs Vulnerability Explanation + +Catastrophic backtracking occurs in regex evaluation when the engine +tries to match complex patterns that contain nested quantifiers or +ambiguous constructs. In certain cases, especially with maliciously +crafted input, this can lead to an exponential number of combinations +being checked, severely impacting application performance and potentially +causing it to hang or crash. + +## Examples + +```python +import re + + +pattern = re.compile('(a+)+$') +result = pattern.match('aaaaaaaaaaaaaaaaaaaa!') +``` + +## Remediation + +When using Python's re module to compile or match regular expressions, ensure +that patterns are designed to avoid ambiguous repetition and nested +quantifiers that can cause catastrophic backtracking. Regular expressions +should be reviewed and tested for efficiency and resistance to DoS attacks. + +```python +import re + + +pattern = re.compile('a+$') +result = pattern.match('aaaaaaaaaaaaaaaaaaaa!') +``` + +## See also + +- [re — Regular expression operations](https://docs.python.org/3/library/re.html) +- [Regular expression Denial of Service - ReDoS OWASP Foundation](https://owasp.org/www-community/attacks/Regular_expression_Denial_of_Service_-_ReDoS) +- [CWE-1333: Inefficient Regular Expression Complexity](https://cwe.mitre.org/data/definitions/1333.html) + +_New in version 0.3.14_ + +""" # noqa: E501 +from precli.core import redos +from precli.core.config import Config +from precli.core.level import Level +from precli.core.location import Location +from precli.core.result import Result +from precli.rules import Rule + + +class ReDenialOfService(Rule): + def __init__(self, id: str): + super().__init__( + id=id, + name="regex_denial_of_service", + description=__doc__, + cwe_id=1333, + message="The call to '{0}'' with regex pattern '{1}'' is " + "susceptible to catastrophic backtracking and may cause " + "performance degradation.", + targets=("call"), + wildcards={}, + config=Config(level=Level.ERROR), + ) + + def analyze(self, context: dict, **kwargs: dict) -> Result: + call = kwargs.get("call") + + if call.name_qualified not in [ + "re.compile", + "re.search", + "re.match", + "re.fullmatch", + "re.split", + "re.findall", + "re.finditer", + "re.sub", + "re.subn", + ]: + return + + arg = call.get_argument(position=0, name="pattern") + pattern = arg.value + if not isinstance(pattern, str): + return + + if redos.catastrophic(pattern) is True: + return Result( + rule_id=self.id, + location=Location(node=arg.node), + message=self.message.format(call.name_qualified, pattern), + ) diff --git a/setup.cfg b/setup.cfg index 7ba44387..5045d35f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -142,5 +142,8 @@ precli.rules.python = # precli/rules/python/stdlib/xmlrpc_server_unrestricted_bind.py PY032 = precli.rules.python.stdlib.xmlrpc_server_unrestricted_bind:XmlrpcServerUnrestrictedBind + # precli/rules/python/stdlib/re_denial_of_service.py + PY033 = precli.rules.python.stdlib.re_denial_of_service:ReDenialOfService + [build_sphinx] all_files = 1 diff --git a/tests/unit/rules/python/stdlib/re/__init__.py b/tests/unit/rules/python/stdlib/re/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/rules/python/stdlib/re/examples/re_compile.py b/tests/unit/rules/python/stdlib/re/examples/re_compile.py new file mode 100644 index 00000000..8dc3863b --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_compile.py @@ -0,0 +1,11 @@ +# level: ERROR +# start_line: 10 +# end_line: 10 +# start_column: 17 +# end_column: 28 +import re + + +IPv6address = r"([A-Fa-f0-9:]+:+)+[A-Fa-f0-9]+" +reg = re.compile(IPv6address) +reg.search("http://[:::::::::::::::::::::::::::::::::::::::]/path") diff --git a/tests/unit/rules/python/stdlib/re/examples/re_compile_good.py b/tests/unit/rules/python/stdlib/re/examples/re_compile_good.py new file mode 100644 index 00000000..ce6c6ee4 --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_compile_good.py @@ -0,0 +1,6 @@ +# level: NONE +import re + +IPv6address = r"([A-Fa-f0-9:]+[:$])[A-Fa-f0-9]{1,4}" +reg = re.compile(IPv6address) +reg.search("http://[:::::::::::::::::::::::::::::::::::::::]/path") diff --git a/tests/unit/rules/python/stdlib/re/examples/re_findall.py b/tests/unit/rules/python/stdlib/re/examples/re_findall.py new file mode 100644 index 00000000..e115056e --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_findall.py @@ -0,0 +1,11 @@ +# level: ERROR +# start_line: 11 +# end_line: 11 +# start_column: 11 +# end_column: 18 +import re + + +pattern = r"(a+)+" +string = "aaaaaaaaaaaaaaaaaaaaaaaa!" +re.findall(pattern, string) diff --git a/tests/unit/rules/python/stdlib/re/examples/re_finditer.py b/tests/unit/rules/python/stdlib/re/examples/re_finditer.py new file mode 100644 index 00000000..76a7934d --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_finditer.py @@ -0,0 +1,11 @@ +# level: ERROR +# start_line: 11 +# end_line: 11 +# start_column: 12 +# end_column: 19 +import re + + +pattern = r"(a+)+" +string = "aaaaaaaaaaaaaaaaaaaaaaaa!" +re.finditer(pattern, string) diff --git a/tests/unit/rules/python/stdlib/re/examples/re_fullmatch.py b/tests/unit/rules/python/stdlib/re/examples/re_fullmatch.py new file mode 100644 index 00000000..5c738687 --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_fullmatch.py @@ -0,0 +1,11 @@ +# level: ERROR +# start_line: 11 +# end_line: 11 +# start_column: 13 +# end_column: 20 +import re + + +pattern = r"(a+)+" +string = "aaaaaaaaaaaaaaaaaaaaaaaa!" +re.fullmatch(pattern, string) diff --git a/tests/unit/rules/python/stdlib/re/examples/re_match.py b/tests/unit/rules/python/stdlib/re/examples/re_match.py new file mode 100644 index 00000000..977a1acb --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_match.py @@ -0,0 +1,11 @@ +# level: ERROR +# start_line: 11 +# end_line: 11 +# start_column: 9 +# end_column: 16 +import re + + +pattern = r"(a+)+" +string = "aaaaaaaaaaaaaaaaaaaaaaaa!" +re.match(pattern, string) diff --git a/tests/unit/rules/python/stdlib/re/examples/re_search.py b/tests/unit/rules/python/stdlib/re/examples/re_search.py new file mode 100644 index 00000000..6e38daa9 --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_search.py @@ -0,0 +1,11 @@ +# level: ERROR +# start_line: 11 +# end_line: 11 +# start_column: 10 +# end_column: 17 +import re + + +pattern = r"(a+)+" +string = "aaaaaaaaaaaaaaaaaaaaaaaa!" +re.search(pattern, string) diff --git a/tests/unit/rules/python/stdlib/re/examples/re_search_good.py b/tests/unit/rules/python/stdlib/re/examples/re_search_good.py new file mode 100644 index 00000000..2b02f2c6 --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_search_good.py @@ -0,0 +1,6 @@ +# level: NONE +import re + + +string = "aaaaaaaaaaaaaaaaaaaaaaaa!" +re.search(r"a+", string) diff --git a/tests/unit/rules/python/stdlib/re/examples/re_split.py b/tests/unit/rules/python/stdlib/re/examples/re_split.py new file mode 100644 index 00000000..1b76c75d --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_split.py @@ -0,0 +1,11 @@ +# level: ERROR +# start_line: 11 +# end_line: 11 +# start_column: 9 +# end_column: 16 +import re + + +pattern = r"(a+)+" +string = "aaaaaaaaaaaaaaaaaaaaaaaa!" +re.split(pattern, string) diff --git a/tests/unit/rules/python/stdlib/re/examples/re_sub.py b/tests/unit/rules/python/stdlib/re/examples/re_sub.py new file mode 100644 index 00000000..8ec85f2d --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_sub.py @@ -0,0 +1,11 @@ +# level: ERROR +# start_line: 11 +# end_line: 11 +# start_column: 7 +# end_column: 14 +import re + + +pattern = r"(a+)+" +string = "aaaaaaaaaaaaaaaaaaaaaaaa!" +re.sub(pattern, print, string) diff --git a/tests/unit/rules/python/stdlib/re/examples/re_subn.py b/tests/unit/rules/python/stdlib/re/examples/re_subn.py new file mode 100644 index 00000000..bc38b594 --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/examples/re_subn.py @@ -0,0 +1,11 @@ +# level: ERROR +# start_line: 11 +# end_line: 11 +# start_column: 8 +# end_column: 15 +import re + + +pattern = r"(a+)+" +string = "aaaaaaaaaaaaaaaaaaaaaaaa!" +re.subn(pattern, print, string) diff --git a/tests/unit/rules/python/stdlib/re/test_re_denial_of_service.py b/tests/unit/rules/python/stdlib/re/test_re_denial_of_service.py new file mode 100644 index 00000000..c1af2da0 --- /dev/null +++ b/tests/unit/rules/python/stdlib/re/test_re_denial_of_service.py @@ -0,0 +1,55 @@ +# Copyright 2024 Secure Saurce LLC +import os + +from parameterized import parameterized + +from precli.core.level import Level +from precli.parsers import python +from precli.rules import Rule +from tests.unit.rules import test_case + + +class ReDenialOfService(test_case.TestCase): + def setUp(self): + super().setUp() + self.rule_id = "PY033" + self.parser = python.Python() + self.base_path = os.path.join( + "tests", + "unit", + "rules", + "python", + "stdlib", + "re", + "examples", + ) + + def test_rule_meta(self): + rule = Rule.get_by_id(self.rule_id) + self.assertEqual(self.rule_id, rule.id) + self.assertEqual("regex_denial_of_service", rule.name) + self.assertEqual( + f"https://docs.securesauce.dev/rules/{self.rule_id}", rule.help_url + ) + self.assertEqual(True, rule.default_config.enabled) + self.assertEqual(Level.ERROR, rule.default_config.level) + self.assertEqual(-1.0, rule.default_config.rank) + self.assertEqual("1333", rule.cwe.cwe_id) + + @parameterized.expand( + [ + "re_compile.py", + "re_compile_good.py", + "re_search.py", + "re_search_good.py", + "re_match.py", + "re_fullmatch.py", + "re_split.py", + "re_findall.py", + "re_finditer.py", + "re_sub.py", + "re_subn.py", + ] + ) + def test(self, filename): + self.check(filename)