From c309f7fa6a62af2aea826aaeee82dbfec96d7bba Mon Sep 17 00:00:00 2001 From: MeditationDuck Date: Sun, 15 Dec 2024 00:27:40 +0100 Subject: [PATCH] :construction: remove pickle. other than --random-state option are removed. need to think about random state input --- wake/cli/test.py | 79 +++------ wake/development/globals.py | 6 +- wake/testing/fuzzing/fuzz_shrink.py | 161 ++++++++++-------- wake/testing/fuzzing/fuzz_test.py | 4 +- wake/testing/pytest_plugin_multiprocess.py | 55 ++++-- .../pytest_plugin_multiprocess_server.py | 1 + wake/testing/pytest_plugin_single.py | 64 ++++--- 7 files changed, 192 insertions(+), 178 deletions(-) diff --git a/wake/cli/test.py b/wake/cli/test.py index 6a692ada..d1759461 100644 --- a/wake/cli/test.py +++ b/wake/cli/test.py @@ -240,6 +240,7 @@ def run_test( set_shrank_path, ) from wake.testing.pytest_plugin_single import PytestWakePluginSingle + import json def get_single_test_path(args: list[str]) -> tuple[bool, str | None]: has_path = False @@ -254,48 +255,13 @@ def get_single_test_path(args: list[str]) -> tuple[bool, str | None]: path = arg return has_path, path - def extract_test_path(crash_log_file_path: Path) -> str: - if crash_log_file_path is not None: - with open(crash_log_file_path, "r") as file: - for line in file: - if "Current test file" in line: - # Extract the part after the colon - parts = line.split(":") - if len(parts) == 2: - return parts[1].strip() - raise ValueError("Unexpected file format") - - def extract_executed_flow_number(crash_log_file_path: Path) -> int: - if crash_log_file_path is not None: - with open(crash_log_file_path, "r") as file: - for line in file: - if "executed flow number" in line: - # Extract the number after the colon - parts = line.split(":") - if len(parts) == 2: - try: - executed_flow_number = int(parts[1].strip()) - return executed_flow_number - except ValueError: - pass # Handle the case where the value after ":" is not an integer - raise ValueError("Unexpected file format") - - def extract_internal_state(crash_log_file_path: Path) -> bytes: - if crash_log_file_path is not None: + def extract_crash_log_dict(crash_log_file_path: Path) -> dict: + try: with open(crash_log_file_path, "r") as file: - for line in file: - if "Internal state of beginning of sequence" in line: - # Extract the part after the colon - parts = line.split(":") - if len(parts) == 2: - hex_string = parts[1].strip() - try: - # Convert the hex string to bytes - internal_state_bytes = bytes.fromhex(hex_string) - return internal_state_bytes - except ValueError: - pass # Handle the case where the value after ":" is not a valid hex string - raise ValueError("Unexpected file format") + crash_log_dict = json.load(file) + return crash_log_dict + except json.JSONDecodeError: + raise ValueError(f"Invalid JSON format in crash log file: {crash_log_file_path}") def get_shrink_argument_path(shrink_path_str: str) -> Path: try: @@ -315,7 +281,7 @@ def get_shrink_argument_path(shrink_path_str: str) -> Path: ) index = int(shrink_path_str) crash_logs = sorted( - crash_logs_dir.glob("*.txt"), key=os.path.getmtime, reverse=True + crash_logs_dir.glob("*.json"), key=os.path.getmtime, reverse=True ) if abs(index) > len(crash_logs): raise click.BadParameter(f"Invalid crash log index: {index}") @@ -350,15 +316,17 @@ def get_shrank_argument_path(shrank_path_str: str) -> Path: "Both shrink and shrieked cannot be provided at the same time." ) + pytest_path_specified, test_path = get_single_test_path(pytest_args) + + if shrink is not None: set_fuzz_mode(1) - pytest_path_specified, test_path = get_single_test_path(pytest_args) shrink_crash_path = get_shrink_argument_path(shrink) - path = extract_test_path(shrink_crash_path) - number = extract_executed_flow_number(shrink_crash_path) - set_error_flow_num(number) - beginning_random_state_bytes = extract_internal_state(shrink_crash_path) - set_sequence_initial_internal_state(beginning_random_state_bytes) + print("shrink from crash log: ", shrink_crash_path) + crash_log_dict = extract_crash_log_dict(shrink_crash_path) + path = crash_log_dict["test_file"] + set_error_flow_num(crash_log_dict["crash_flow_number"]) + set_sequence_initial_internal_state(crash_log_dict["initial_random_state"]) if pytest_path_specified: assert ( path == test_path @@ -367,20 +335,11 @@ def get_shrank_argument_path(shrank_path_str: str) -> Path: pytest_args.insert(0, path) if shrank: - import pickle - from wake.testing.fuzzing.fuzz_shrink import ShrankInfoFile - set_fuzz_mode(2) - pytest_path_specified, test_path = get_single_test_path(pytest_args) shrank_data_path = get_shrank_argument_path(shrank) - - set_fuzz_mode(2) - pytest_path_specified, test_path = get_single_test_path(pytest_args) - shrank_data_path = get_shrank_argument_path(shrank) - - with open(shrank_data_path, "rb") as f: - store_data: ShrankInfoFile = pickle.load(f) - target_fuzz_path = store_data.target_fuzz_path + print("shrank from shrank data: ", shrank_data_path) + with open(shrank_data_path, "r") as f: + target_fuzz_path = json.load(f)["target_fuzz_path"] if pytest_path_specified: assert ( target_fuzz_path == test_path diff --git a/wake/development/globals.py b/wake/development/globals.py index cc804899..773e4c15 100644 --- a/wake/development/globals.py +++ b/wake/development/globals.py @@ -50,7 +50,7 @@ ] = None _exception_handled = False -_initial_internal_state: bytes = b"" +_initial_internal_state: dict = {} _coverage_handler: Optional[CoverageHandler] = None @@ -161,11 +161,11 @@ def set_exception_handler( global _exception_handler _exception_handler = handler -def set_sequence_initial_internal_state(intenral_state: bytes): +def set_sequence_initial_internal_state(intenral_state: dict): global _initial_internal_state _initial_internal_state = intenral_state -def get_sequence_initial_internal_state() -> bytes: +def get_sequence_initial_internal_state() -> dict: return _initial_internal_state def reset_exception_handled(): diff --git a/wake/testing/fuzzing/fuzz_shrink.py b/wake/testing/fuzzing/fuzz_shrink.py index 11d25ca7..a0866c6e 100644 --- a/wake/testing/fuzzing/fuzz_shrink.py +++ b/wake/testing/fuzzing/fuzz_shrink.py @@ -2,6 +2,7 @@ from collections import defaultdict +import json from typing import Callable, DefaultDict, List, Optional, Any, Tuple from typing_extensions import get_type_hints @@ -27,7 +28,6 @@ ) from wake.development.core import Chain -import pickle from pathlib import Path from datetime import datetime, timedelta @@ -118,6 +118,23 @@ def compare_exceptions(e1, e2): return True +def serialize_random_state(state: tuple[int, tuple[int, ...], float | None]) -> dict: + """Convert random state to JSON-serializable format""" + version, state_tuple, gauss = state + return { + "version": version, + "state_tuple": list(state_tuple), # convert tuple to list for JSON + "gauss": gauss + } + +def deserialize_random_state(state_dict: dict) -> tuple[int, tuple[int, ...], float | None]: + """Convert JSON format back to random state tuple""" + return ( + state_dict["version"], + tuple(state_dict["state_tuple"]), # convert list back to tuple + state_dict["gauss"] + ) + class StateSnapShot: _python_state: FuzzTest | None chain_states: List[str] @@ -189,33 +206,60 @@ class OverRunException(Exception): def __init__(self): super().__init__("Overrun") - @dataclass -class FlowState: - random_state: bytes +class ReproducibleFlowState: + random_state: tuple[int, tuple[int, ...], float | None] flow_num: int flow_name: str - flow: Callable # Store the function itself - flow_params: List[Any] # Store the list of arguments - required: bool = True - before_inv_random_state: bytes = b"" - + flow_params: List[Any] + + def to_dict(self): + return { + "random_state": serialize_random_state(self.random_state), + "flow_num": self.flow_num, + "flow_name": self.flow_name, + "flow_params": self.flow_params, + } + + @classmethod + def from_dict(cls, data): + return cls( + random_state=deserialize_random_state(data["random_state"]), + flow_num=data["flow_num"], + flow_name=data["flow_name"], + flow_params=data["flow_params"], + ) @dataclass -class FlowStateForFile: - random_state: bytes - flow_num: int - flow_name: str - flow_params: List[Any] # Store the list of arguments - required: bool = True - before_inv_random_state: bytes = b"" - +class FlowState(ReproducibleFlowState): + flow: Callable # Runtime-only field for analysis + required: bool = True # Runtime-only field for analysis @dataclass class ShrankInfoFile: target_fuzz_path: str - initial_state: bytes - required_flows: List[FlowStateForFile] + initial_state: dict + required_flows: List[ReproducibleFlowState] + + def to_dict(self): + return { + "target_fuzz_path": self.target_fuzz_path, + "initial_state": self.initial_state, + "required_flows": [ + flow.to_dict() for flow in self.required_flows + ] + } + + @classmethod + def from_dict(cls, data): + return cls( + target_fuzz_path=data["target_fuzz_path"], + initial_state=data["initial_state"], + required_flows=[ + ReproducibleFlowState.from_dict(flow) + for flow in data["required_flows"] + ] + ) @contextmanager @@ -260,10 +304,12 @@ def shrank_reproduce(test_class: type[FuzzTest], dry_run: bool = False): shrank_path = get_shrank_path() if shrank_path is None: raise Exception("Shrunken data file path not found") - with open(shrank_path, "rb") as f: - store_data: ShrankInfoFile = pickle.load(f) + # read shrank json file + with open(shrank_path, "r") as f: + serialized_shrank_info = f.read() + store_data: ShrankInfoFile = ShrankInfoFile.from_dict(json.loads(serialized_shrank_info)) - random.setstate(pickle.loads(store_data.initial_state)) + random.setstate(deserialize_random_state(store_data.initial_state)) test_instance._flow_num = 0 test_instance._sequence_num = 0 test_instance.pre_sequence() @@ -286,18 +332,11 @@ def shrank_reproduce(test_class: type[FuzzTest], dry_run: bool = False): if not hasattr(flow, "precondition") or getattr(flow, "precondition")( test_instance ): - random.setstate(pickle.loads(store_data.required_flows[j].random_state)) + random.setstate(store_data.required_flows[j].random_state) test_instance.pre_flow(flow) flow(test_instance, *flow_params) test_instance.post_flow(flow) - try: - random.setstate( - pickle.loads(store_data.required_flows[j].before_inv_random_state) - ) - except Exception as e: - pass - if not dry_run: test_instance.pre_invariants() for inv in invariants: @@ -312,7 +351,6 @@ def shrank_reproduce(test_class: type[FuzzTest], dry_run: bool = False): print("Shrunken test passed") - def shrink_collecting_phase( test_instance: FuzzTest, flows, @@ -320,6 +358,7 @@ def shrink_collecting_phase( flow_states: List[FlowState], chains: Tuple[Chain, ...], flows_count: int, + initial_state: Any, ) -> Tuple[Exception, timedelta]: data_time = datetime.now() flows_counter: DefaultDict[Callable, int] = defaultdict(int) @@ -327,9 +366,7 @@ def shrink_collecting_phase( # Snapshot all connected chains initial_chain_state_snapshots = [chain.snapshot() for chain in chains] error_flow_num = get_error_flow_num() # argument - initial_state = get_sequence_initial_internal_state() # argument - - random.setstate(pickle.loads(initial_state)) + random.setstate(initial_state) with print_ignore(): test_instance._flow_num = 0 test_instance.pre_sequence() @@ -377,14 +414,14 @@ def shrink_collecting_phase( if k != "return" ] - random_state = pickle.dumps(random.getstate()) + random_state = random.getstate() flow_states.append( FlowState( random_state=random_state, flow_name=flow.__name__, - flow=flow, flow_params=flow_params, flow_num=j, + flow=flow, ) ) @@ -394,8 +431,6 @@ def shrink_collecting_phase( flows_counter[flow] += 1 test_instance.post_flow(flow) - flow_states[j].before_inv_random_state = pickle.dumps(random.getstate()) - test_instance.pre_invariants() for inv in invariants: if invariant_periods[inv] == 0: @@ -424,6 +459,8 @@ def shrink_collecting_phase( second_time = datetime.now() time_spent = second_time - data_time print("Time spent for one fuzz test: ", time_spent) + + print("exception_content: ", exception_content) assert exception_content is not None return exception_content, time_spent @@ -433,6 +470,8 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): print( "Fuzz test shrink start! First of all, collect random/flow information!!! >_<" ) + initial_state: tuple[int, tuple[int, ...], float | None] = deserialize_random_state(get_sequence_initial_internal_state()) + shrink_start_time = datetime.now() print("Start time: ", shrink_start_time) test_instance = test_class() @@ -446,7 +485,7 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): raise Exception("Flow number is less than 1, not supported for shrinking") exception_content, time_spent_for_one_fuzz = shrink_collecting_phase( - test_instance, flows, invariants, flow_states, chains, flows_count + test_instance, flows, invariants, flow_states, chains, flows_count, initial_state ) print( "Estimated completion time for shrinking:", @@ -455,7 +494,7 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): ) # estimate around half of flow is not related to the error print("Starting shrinking") - random.setstate(pickle.loads(get_sequence_initial_internal_state())) + random.setstate(initial_state) # ignore print for pre_sequence logging with print_ignore(): test_instance._flow_num = 0 @@ -532,7 +571,7 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): raise OverRunException() curr_flow_state = flow_states[j] - random.setstate(pickle.loads(curr_flow_state.random_state)) + random.setstate(curr_flow_state.random_state) flow = curr_flow_state.flow flow_params = curr_flow_state.flow_params test_instance._flow_num = j @@ -544,11 +583,6 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): flow(test_instance, *flow_params) test_instance.post_flow(flow) - if curr_flow_state.before_inv_random_state != b"": - random.setstate( - pickle.loads(curr_flow_state.before_inv_random_state) - ) - test_instance.pre_invariants() if ( not ONLY_TARGET_INVARIANTS @@ -671,7 +705,7 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): print("flow: ", j, flow_states[j].flow.__name__) curr_flow_state = flow_states[j] - random.setstate(pickle.loads(curr_flow_state.random_state)) + random.setstate(curr_flow_state.random_state) flow = curr_flow_state.flow flow_params = curr_flow_state.flow_params @@ -684,11 +718,6 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): flow(test_instance, *flow_params) test_instance.post_flow(flow) - if curr_flow_state.before_inv_random_state != b"": - random.setstate( - pickle.loads(curr_flow_state.before_inv_random_state) - ) - test_instance.pre_invariants() if (not ONLY_TARGET_INVARIANTS and flow_states[j].required) or ( ONLY_TARGET_INVARIANTS and j == error_flow_num @@ -795,7 +824,7 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): # write crash log file. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Assuming `call.execinfo` contains the crash information - crash_log_file = crash_logs_dir / f"{timestamp}.bin" + shrank_file = crash_logs_dir / f"{timestamp}.json" import inspect @@ -805,30 +834,19 @@ def shrink_test(test_class: type[FuzzTest], flows_count: int): relative_test_path = os.path.relpath(current_file_path, project_root_path) # initial_state - required_flows: List[FlowStateForFile] = [] - for i in range(len(flow_states)): - if flow_states[i].required: - required_flows.append( - FlowStateForFile( - random_state=flow_states[i].random_state, - flow_num=flow_states[i].flow_num, - flow_name=flow_states[i].flow_name, - # ignore flow_states[i].flow - flow_params=flow_states[i].flow_params, - required=flow_states[i].required, - before_inv_random_state=flow_states[i].before_inv_random_state, - ) - ) + required_flows: List[ReproducibleFlowState] = [flow_states[i] for i in range(len(flow_states)) if flow_states[i].required] store_data: ShrankInfoFile = ShrankInfoFile( target_fuzz_path=relative_test_path, initial_state=get_sequence_initial_internal_state(), required_flows=required_flows, ) + + import json # Write to a JSON file - with open(crash_log_file, "wb") as f: - pickle.dump(store_data, f) - print(f"Shrunken data file written to {crash_log_file}") + with open(shrank_file, "w") as f: + json.dump(store_data.to_dict(), f, indent=2) + print(f"Shrunken data file written to {shrank_file}") def single_fuzz_test( @@ -855,7 +873,8 @@ def single_fuzz_test( # Snapshot all connected chains snapshots = [chain.snapshot() for chain in chains] - set_sequence_initial_internal_state(pickle.dumps(random.getstate())) + state = serialize_random_state(random.getstate()) + set_sequence_initial_internal_state(state) test_instance._flow_num = 0 test_instance._sequence_num = i diff --git a/wake/testing/fuzzing/fuzz_test.py b/wake/testing/fuzzing/fuzz_test.py index 23d01671..e7db1028 100644 --- a/wake/testing/fuzzing/fuzz_test.py +++ b/wake/testing/fuzzing/fuzz_test.py @@ -43,14 +43,14 @@ def flow_num(self): @classmethod def run( - self, + cls, sequences_count: int, flows_count: int, *, dry_run: bool = False, ): from .fuzz_shrink import fuzz_shrink - fuzz_shrink(self, sequences_count, flows_count, dry_run) + fuzz_shrink(cls, sequences_count, flows_count, dry_run) def pre_sequence(self) -> None: pass diff --git a/wake/testing/pytest_plugin_multiprocess.py b/wake/testing/pytest_plugin_multiprocess.py index 5defdfc6..0f812f37 100644 --- a/wake/testing/pytest_plugin_multiprocess.py +++ b/wake/testing/pytest_plugin_multiprocess.py @@ -17,6 +17,7 @@ from tblib import pickling_support from wake.cli.console import console +from wake.config.wake_config import WakeConfig from wake.development.globals import ( attach_debugger, chain_interfaces_manager, @@ -32,15 +33,13 @@ from wake.utils.tee import StderrTee, StdoutTee from wake.testing.custom_pdb import CustomPdb -from datetime import datetime -import rich.traceback -from rich.console import Console class PytestWakePluginMultiprocess: _index: int _conn: multiprocessing.connection.Connection + _config: WakeConfig _coverage: Optional[CoverageHandler] _log_file: Path _crash_log_file: Path @@ -58,6 +57,7 @@ def __init__( index: int, conn: multiprocessing.connection.Connection, queue: multiprocessing.Queue, + config: WakeConfig, coverage: Optional[CoverageHandler], log_dir: Path, crash_log_dir: Path, @@ -68,6 +68,7 @@ def __init__( ): self._conn = conn self._index = index + self._config = config self._queue = queue self._coverage = coverage self._log_file = log_dir / sanitize_filename(f"process-{index}.ansi") @@ -165,6 +166,8 @@ def pytest_internalerror( self._queue.put(("pytest_internalerror", self._index, pickled), block=True) def pytest_exception_interact(self, node, call, report): + import json + from datetime import datetime if self._debug and not self._exception_handled: self._exception_handler( call.excinfo.type, call.excinfo.value, call.excinfo.tb @@ -173,22 +176,40 @@ def pytest_exception_interact(self, node, call, report): state = get_sequence_initial_internal_state() timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - crash_log_file = self._crash_log_dir / F"crash_log_{timestamp}.txt" - - relative_path = os.path.relpath(node.fspath, self._config.project_root_path) + crash_log_file = self._crash_log_dir / F"{timestamp}.json" + + # Find the test file in the traceback that's within project root + tb = call.excinfo.tb + test_file_path = None + while tb: + filename = tb.tb_frame.f_code.co_filename + try: + # Check if the file is within project root + relative = os.path.relpath(filename, self._config.project_root_path) + if not relative.startswith('..') and filename.endswith('.py'): + test_file_path = relative + break + except ValueError: + # relpath raises ValueError if paths are on different drives + pass + if hasattr(tb, 'tb_next'): + tb = tb.tb_next + + if test_file_path is None: + test_file_path = node.fspath # fallback to node's path if no test file found + + crash_data = { + "test_file": test_file_path, + "crash_flow_number": get_error_flow_num(), + "exception_content": { + "type": str(call.excinfo.type), + "value": str(call.excinfo.value), + }, + "initial_random_state": state, + } with crash_log_file.open('w') as f: - f.write(f"Current test file: {relative_path}\n") - f.write(f"executed flow number : {get_error_flow_num()}\n") - f.write(f"Internal state of beginning of sequence : {state.hex()}\n") - f.write(f"Assertion type: {call.excinfo.type}\n") - f.write(f"Assertion value: {call.excinfo.value}\n") - # Create the rich traceback object - rich_tb = rich.traceback.Traceback.from_exception( - call.excinfo.type, call.excinfo.value, call.excinfo.tb - ) - file_console = Console(file=f, force_terminal=False) - file_console.print(rich_tb) + json.dump(crash_data, f, indent=2) def pytest_runtestloop(self, session: Session): if ( diff --git a/wake/testing/pytest_plugin_multiprocess_server.py b/wake/testing/pytest_plugin_multiprocess_server.py index d87c9534..2f4c9fbf 100644 --- a/wake/testing/pytest_plugin_multiprocess_server.py +++ b/wake/testing/pytest_plugin_multiprocess_server.py @@ -98,6 +98,7 @@ def pytest_sessionstart(self, session: pytest.Session): i, child_conn, # pyright: ignore reportGeneralTypeIssues self._queue, + self._config, empty_coverage if i < self._coverage else None, logs_dir, crash_logs_process_dir, diff --git a/wake/testing/pytest_plugin_single.py b/wake/testing/pytest_plugin_single.py index 2f33d6bb..09f5a135 100644 --- a/wake/testing/pytest_plugin_single.py +++ b/wake/testing/pytest_plugin_single.py @@ -1,12 +1,9 @@ from functools import partial +import os from typing import Iterable, List, Optional from pytest import Session -from datetime import datetime -import rich.traceback -from rich.console import Console - from wake.cli.console import console from wake.config import WakeConfig from wake.development.globals import ( @@ -53,6 +50,10 @@ def pytest_runtest_setup(self, item): reset_exception_handled() def pytest_exception_interact(self, node, call, report): + import json + from datetime import datetime + import os + if self._debug: attach_debugger( call.excinfo.type, @@ -61,11 +62,10 @@ def pytest_exception_interact(self, node, call, report): seed=self._random_seeds[0], ) - import os if get_fuzz_mode() != 0: return - state = get_sequence_initial_internal_state() - if state == b"": + random_state_dict = get_sequence_initial_internal_state() + if random_state_dict == {}: return crash_logs_dir = self._config.project_root_path / ".wake" / "logs" / "crashes" # shutil.rmtree(crash_logs_dir, ignore_errors=True) @@ -73,25 +73,39 @@ def pytest_exception_interact(self, node, call, report): # write crash log file. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # Assuming `call.execinfo` contains the crash information - crash_log_file = crash_logs_dir / F"{timestamp}.txt" - - relative_path = os.path.relpath(node.fspath, self._config.project_root_path) - - - # Write contents to the crash log file + crash_log_file = crash_logs_dir / F"{timestamp}.json" + + # Find the test file in the traceback that's within project root + tb = call.excinfo.tb + test_file_path = None + while tb: + filename = tb.tb_frame.f_code.co_filename + try: + # Check if the file is within project root + relative = os.path.relpath(filename, self._config.project_root_path) + if not relative.startswith('..') and filename.endswith('.py'): + test_file_path = relative + break + except ValueError: + # relpath raises ValueError if paths are on different drives + pass + if hasattr(tb, 'tb_next'): + tb = tb.tb_next + + if test_file_path is None: + test_file_path = node.fspath # fallback to node's path if no test file found + + crash_data = { + "test_file": test_file_path, + "crash_flow_number": get_error_flow_num(), + "exception_content": { + "type": str(call.excinfo.type), + "value": str(call.excinfo.value), + }, + "initial_random_state": random_state_dict, + } with crash_log_file.open('w') as f: - f.write(f"Current test file: {relative_path}\n") - f.write(f"executed flow number : {get_error_flow_num()}\n") - f.write(f"Internal state of beginning of sequence : {state.hex()}\n") - f.write(f"Assertion type: {call.excinfo.type}\n") - f.write(f"Assertion value: {call.excinfo.value}\n") - # Create the rich traceback object - rich_tb = rich.traceback.Traceback.from_exception( - call.excinfo.type, call.excinfo.value, call.excinfo.tb - ) - file_console = Console(file=f, force_terminal=False) - file_console.print(rich_tb) - + json.dump(crash_data, f, indent=2) console.print(f"Crash log written to {crash_log_file}")