From 52fca5bb98b9732900af8724cfc4049a80adcd79 Mon Sep 17 00:00:00 2001 From: MeditationDuck Date: Tue, 10 Dec 2024 17:45:46 +0100 Subject: [PATCH] :sparkles: remove developping feature and update comments --- wake/cli/test.py | 34 +---- wake/testing/fuzzing/fuzz_shrink.py | 222 ++-------------------------- 2 files changed, 15 insertions(+), 241 deletions(-) diff --git a/wake/cli/test.py b/wake/cli/test.py index 178283a4..aa095b88 100644 --- a/wake/cli/test.py +++ b/wake/cli/test.py @@ -120,16 +120,6 @@ def shell_complete( default=None, required=False, ) -@click.option( - "-SF", - "--step-flow", - type=str, - help="Path to the crash log file.", - is_flag=False, - flag_value="0", - default=None, - required=False, -) @click.argument("paths_or_pytest_args", nargs=-1, type=FileAndPassParamType()) @click.pass_context def run_test( @@ -145,7 +135,6 @@ def run_test( verbosity: int, shrink: Optional[str], shrank: Optional[str], - step_flow: Optional[str], paths_or_pytest_args: Tuple[str, ...], ) -> None: """Execute Wake tests using pytest.""" @@ -361,27 +350,10 @@ def get_shrank_argument_path(shrank_path_str: str) -> Path: "Both shrink and shrieked cannot be provided at the same time." ) - if step_flow is not None and shrank is not None: - raise click.BadParameter( - "Both step-flow and shrank cannot be provided at the same time." - ) - - if step_flow and shrink is not None: - raise click.BadParameter( - "Both step-flow and shrink cannot be provided at the same time." - ) - - if shrink is not None or step_flow is not None: - if step_flow: - set_fuzz_mode(3) - argument_path = step_flow - else: - assert shrink is not None, "Shrink must be provided when step-flow is not used" - set_fuzz_mode(1) - argument_path = shrink - + if shrink is not None: + set_fuzz_mode(1) pytest_path_specified, test_path = get_single_test_path(pytest_args) - shrink_crash_path = get_shrink_argument_path(argument_path) + shrink_crash_path = get_shrink_argument_path(shrink) path = extract_test_path(shrink_crash_path) number = extract_executed_flow_number(shrink_crash_path) set_error_flow_num(number) diff --git a/wake/testing/fuzzing/fuzz_shrink.py b/wake/testing/fuzzing/fuzz_shrink.py index 67503a16..11d25ca7 100644 --- a/wake/testing/fuzzing/fuzz_shrink.py +++ b/wake/testing/fuzzing/fuzz_shrink.py @@ -43,14 +43,15 @@ from wake.testing.core import default_chain as global_default_chain -EXACT_FLOW_INDEX = False # False if you accept it could reproduce same error earlier. +EXACT_FLOW_INDEX = False # Set True if you accept the same error that happened earlier than the crash log. -EXECT_EXCEPTION_MATCH = False # False if you accept the same kind of error. -# The meaining of the same kind of error is that -# If the error was in transaction, Same Error is emit and ignore arguments value. except for Error with only message, we compare the message. -# If the error was in test like assertion error, we care the file and exception line in python code. +EXACT_EXCEPTION_MATCH = False # True if you do not accept the same kind of error but different arguments. +# The meaning of the same kind of error is that +# # If the Error was in the transaction, the same error emits and ignores the argument's value. Except for errors with only the message, we compare the message. +# # If the Error was in a test like an assertion error, we care about the file and exception line in Python code. -ONLY_TARGET_INVARIANTS = False # True if you want to Care only target invariants. +ONLY_TARGET_INVARIANTS = False # True if you want to check only target invariants. +# True makes one fuzz test run faster. but it may lose chance to shortcut that finding the same error earlier. def __get_methods(target, attr: str) -> List[Callable]: @@ -70,7 +71,7 @@ def clear_previous_lines(num_lines): def compare_exceptions(e1, e2): - if EXECT_EXCEPTION_MATCH: + if EXACT_EXCEPTION_MATCH: if e1 == e2: return True else: @@ -247,8 +248,6 @@ def fuzz_shrink( shrink_test(test_class, flows_count) elif fuzz_mode == 2: shrank_reproduce(test_class, dry_run) - elif fuzz_mode == 3: - flow_step_execution(test_class, flows_count) else: raise Exception("Invalid fuzz mode") @@ -428,206 +427,6 @@ def shrink_collecting_phase( assert exception_content is not None return exception_content, time_spent - -def flow_step_execution(test_class: type[FuzzTest], flows_count: int): - error_flow_num = get_error_flow_num() - user_number = input(f"SNAPSHOT FLOW NUMBER ({error_flow_num}) >") - if user_number != "": - error_flow_num = int(user_number) - - assert error_flow_num != flows_count, "Does not support post sequence, comming soon" - test_instance = test_class() - chains = get_connected_chains() - flows: List[Callable] = __get_methods(test_instance, "flow") - invariants: List[Callable] = __get_methods(test_instance, "invariant") - flows_counter: DefaultDict[Callable, int] = defaultdict(int) - invariant_periods: DefaultDict[Callable[[None], None], int] = defaultdict(int) - print("Shrinking flow length: ", error_flow_num) - - if error_flow_num < 1: - raise Exception("Flow number is less than 1, not supported for shrinking") - - random.setstate(pickle.loads(get_sequence_initial_internal_state())) - # ignore print for pre_sequence logging - test_instance._flow_num = 0 - test_instance.pre_sequence() - try: - for j in range(error_flow_num): - valid_flows = [ - f - for f in flows - if ( - not hasattr(f, "max_times") - or flows_counter[f] < getattr(f, "max_times") - ) - and ( - not hasattr(f, "precondition") - or getattr(f, "precondition")(test_instance) - ) - ] - weights = [getattr(f, "weight") for f in valid_flows] - if len(valid_flows) == 0: - max_times_flows = [ - f - for f in flows - if hasattr(f, "max_times") - and flows_counter[f] >= getattr(f, "max_times") - ] - precondition_flows = [ - f - for f in flows - if hasattr(f, "precondition") - and not getattr(f, "precondition")(test_instance) - ] - raise Exception( - f"Could not find a valid flow to run.\nFlows that have reached their max_times: {max_times_flows}\nFlows that do not satisfy their precondition: {precondition_flows}" - ) - - # Pick a flow and generate the parameters - flow = random.choices(valid_flows, weights=weights)[0] - flow_params = [ - generate(v) - for k, v in get_type_hints(flow, include_extras=True).items() - if k != "return" - ] - - test_instance._flow_num = j - test_instance.pre_flow(flow) - flow(test_instance, *flow_params) # Execute the selected flow - flows_counter[flow] += 1 - test_instance.post_flow(flow) - # DO NOT RUN INVARIANTS HERE - # REQUIREMENT: DO NOT CHANGE STATE IN INVARIANTS - except Exception as e: - print(f"MUST NOT FAIL HERE {e}") - raise e - - # take snapshot of previous state!! - states = StateSnapShot() - states.take_snapshot( - test_instance, - test_class(), - chains, - overwrite=False, - random_state=random.getstate(), - ) - error_place = None - j = 0 - for j in range(test_instance._flow_num, flows_count): - try: - valid_flows = [ - f - for f in flows - if ( - not hasattr(f, "max_times") - or flows_counter[f] < getattr(f, "max_times") - ) - and ( - not hasattr(f, "precondition") - or getattr(f, "precondition")(test_instance) - ) - ] - weights = [getattr(f, "weight") for f in valid_flows] - if len(valid_flows) == 0: - max_times_flows = [ - f - for f in flows - if hasattr(f, "max_times") - and flows_counter[f] >= getattr(f, "max_times") - ] - precondition_flows = [ - f - for f in flows - if hasattr(f, "precondition") - and not getattr(f, "precondition")(test_instance) - ] - raise Exception( - f"Could not find a valid flow to run.\nFlows that have reached their max_times: {max_times_flows}\nFlows that do not satisfy their precondition: {precondition_flows}" - ) - - # Pick a flow and generate the parameters - flow = random.choices(valid_flows, weights=weights)[0] - flow_params = [ - generate(v) - for k, v in get_type_hints(flow, include_extras=True).items() - if k != "return" - ] - - test_instance._flow_num = j - test_instance.pre_flow(flow) - commands = [] - commands.append("s") - commands.append("s") - error_place = "flow" - from ipdb.__main__ import _init_pdb - - frame = sys._getframe() # Get the parent frame - p = _init_pdb(commands=["s"]) # Initialize with two step commands - p.set_trace(frame) - # this flow cause error or vioration of invariant. - flow(test_instance, *flow_params) # Execute the selected flow - # After flow execution, I would have option to take snapshot and execute or - # Revert to previous state and execute this flow again. - # Print flow information before execution - - flows_counter[flow] += 1 - error_place = "post_flow" - test_instance.post_flow(flow) - error_place = "pre_invariants" - test_instance.pre_invariants() - error_place = "invariants" - for inv in invariants: - if invariant_periods[inv] == 0: - test_instance.pre_invariant(inv) - inv(test_instance) - test_instance.post_invariant(inv) - - invariant_periods[inv] += 1 - if invariant_periods[inv] == getattr(inv, "period"): - invariant_periods[inv] = 0 - except Exception as e: - exception_content = e - print(f"Error at {error_place} with\n{e}") - finally: - # repeat option till user type valid input - quit = False - while True: - print("Flow Step Execution") - print("Options:") - print("1. take snapshot and continue") - print("2. Revert and repeat current flow") - print("3. run post_sequence and exit") - choice = input("Enter your choice (1-3): ").strip() - if choice == "1": - states.take_snapshot( - test_instance, - test_class(), - chains, - overwrite=True, - random_state=random.getstate(), - ) - break - elif choice == "2": - states.revert(test_instance, chains, with_random_state=True) - states.take_snapshot( - test_instance, - test_class(), - chains, - overwrite=False, - random_state=random.getstate(), - ) - j -= 1 - break - elif choice == "3": - quit = True - break - else: - print("Invalid choice. Please try again.") - if quit: - break - test_instance.post_sequence() - - def shrink_test(test_class: type[FuzzTest], flows_count: int): error_flow_num = get_error_flow_num() # argument actual_error_flow_num = error_flow_num @@ -721,6 +520,8 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): break # since it is sorted, and not required to snapshot, already taken. success = False shortcut = False + j: int = 0 + reason: str = "" with print_ignore(debug=False): try: for j in range(flows_count): @@ -846,7 +647,8 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): success = False shortcut = False - reason = None + reason: str = "" + j: int = 0 with print_ignore(debug=False): try: # Python state and chain state is same as snapshot