diff --git a/scalene/scalene_code_executor.py b/scalene/scalene_code_executor.py new file mode 100644 index 000000000..f64698990 --- /dev/null +++ b/scalene/scalene_code_executor.py @@ -0,0 +1,259 @@ +""" +Code execution and tracing functionality for Scalene profiler. + +This module extracts code execution and tracing functionality from the main Scalene class +to improve code organization and reduce complexity. +""" + +import functools +import os +import pathlib +import re +import sys +import traceback +from typing import Any, Dict, List, Optional, Set + +from scalene.scalene_statistics import Filename, LineNumber +from scalene.scalene_utility import generate_html +from scalene import launchbrowser + + +class ScaleneCodeExecutor: + """Handles code execution and tracing for Scalene.""" + + def __init__(self, args, files_to_profile: Set[Filename], + functions_to_profile: Dict[Filename, Set[Any]], + program_being_profiled: Filename, + program_path: Filename, + entrypoint_dir: Filename): + """Initialize the code executor.""" + self.__args = args + self.__files_to_profile = files_to_profile + self.__functions_to_profile = functions_to_profile + self.__program_being_profiled = program_being_profiled + self.__program_path = program_path + self.__entrypoint_dir = entrypoint_dir + self.__error_message = "Error in program being profiled" + + def profile_code( + self, + code: str, + the_globals: Dict[str, str], + the_locals: Dict[str, str], + left: List[str], + start_func, + stop_func, + output_profile_func, + stats, + last_profiled_tuple_func, + ) -> int: + """Initiate execution and profiling.""" + if self.__args.memory: + from scalene import pywhere # type: ignore + + pywhere.populate_struct() + # If --off is set, tell all children to not profile and stop profiling before we even start. + if "off" not in self.__args or not self.__args.off: + start_func() + # Run the code being profiled. + exit_status = 0 + try: + exec(code, the_globals, the_locals) + except SystemExit as se: + # Intercept sys.exit and propagate the error code. + exit_status = se.code if isinstance(se.code, int) else 1 + except KeyboardInterrupt: + # Cleanly handle keyboard interrupts (quits execution and dumps the profile). + print("Scalene execution interrupted.", file=sys.stderr) + except Exception as e: + print(f"{self.__error_message}:\n", e, file=sys.stderr) + traceback.print_exc() + exit_status = 1 + + finally: + stop_func() + if self.__args.memory: + pywhere.disable_settrace() + pywhere.depopulate_struct() + + # Leaving here in case of reversion + # sys.settrace(None) + (last_file, last_line, _) = last_profiled_tuple_func() + stats.memory_stats.memory_malloc_count[last_file][last_line] += 1 + stats.memory_stats.memory_aggregate_footprint[last_file][ + last_line + ] += stats.memory_stats.memory_current_highwater_mark[last_file][last_line] + # If we've collected any samples, dump them. + did_output = output_profile_func(left) + if not did_output: + print( + "Scalene: The specified code did not run for long enough to profile.", + file=sys.stderr, + ) + # Print out hints to explain why the above message may have been printed. + if not self.__args.profile_all: + print( + "To track the time spent in all files, use the `--profile-all` option.", + file=sys.stderr, + ) + elif self.__args.profile_only or self.__args.profile_exclude: + # if --profile-only or --profile-exclude were + # specified, suggest that the patterns might be + # excluding too many files. Collecting the + # previously filtered out files could allow + # suggested fixes (as in, remove foo because it + # matches too many files). + print( + "The patterns used in `--profile-only` or `--profile-exclude` may be filtering out too many files.", + file=sys.stderr, + ) + else: + # if none of the above cases hold, indicate that + # Scalene can only profile code that runs for at + # least one second or allocates some threshold + # amount of memory. + print( + "Scalene can only profile code that runs for at least one second or allocates at least 10MB.", + file=sys.stderr, + ) + + if not ( + did_output + and self.__args.web + and not self.__args.cli + and not self.__args.is_child + ): + return exit_status + + assert did_output + if self.__args.web or self.__args.html: + profile_filename = self.__args.profile_filename + if self.__args.outfile: + profile_filename = Filename( + os.path.join( + os.path.dirname(self.__args.outfile), + os.path.basename(profile_filename), + ) + ) + # Generate HTML file + # (will also generate a JSON file to be consumed by the HTML) + html_output = generate_html( + profile_filename, + self.__args, + stats, + profile_metadata={}, + program_args=left, + ) + + if self.__args.web and not self.__args.cli and not self.__args.is_child: + launchbrowser.launch_browser(html_output) + + return exit_status + + @staticmethod + @functools.cache + def should_trace(filename: Filename, func: str) -> bool: + """Return true if we should trace this filename and function.""" + # Profile everything in a Jupyter notebook cell. + if re.match(r"", filename): + return True + + if ScaleneCodeExecutor._should_trace_decorated_function(filename, func): + return True + + if not ScaleneCodeExecutor._passes_exclusion_rules(filename): + return False + + if ScaleneCodeExecutor._handle_jupyter_cell(filename): + return True + + if not ScaleneCodeExecutor._passes_profile_only_rules(filename): + return False + + return ScaleneCodeExecutor._should_trace_by_location(filename) + + @staticmethod + def _should_trace_decorated_function(filename: Filename, func: str) -> bool: + """Check if this function is decorated with @profile.""" + # Import here to avoid circular imports + from scalene.scalene_profiler import Scalene + if filename in Scalene._Scalene__files_to_profile: + # If we have specified to profile functions in this file, + # check if this function is one of them. + return func in Scalene._Scalene__functions_to_profile[filename] + return False + + @staticmethod + def _passes_exclusion_rules(filename: Filename) -> bool: + """Check if filename passes exclusion rules (libraries, exclude patterns).""" + # Import here to avoid circular imports + from scalene.scalene_profiler import Scalene + args = Scalene._Scalene__args + + # Don't profile Scalene itself. + if "scalene" in filename: + return False + + # Don't profile Python builtins/standard library. + try: + if not args.profile_all: + if ( + ("python" in filename) + or ("site-packages" in filename) + or ("" in filename) + or (" bool: + """Handle special Jupyter cell profiling.""" + # Check for Jupyter cells + if "" in filename: + return True + + # Profile everything in a Jupyter notebook cell. + if re.match(r"", filename): + return True + + return False + + @staticmethod + def _passes_profile_only_rules(filename: Filename) -> bool: + """Check if filename passes profile-only patterns.""" + from scalene.scalene_profiler import Scalene + args = Scalene._Scalene__args + + if args.profile_only: + for pattern in args.profile_only: + if re.search(pattern, filename): + return True + return False + return True + + @staticmethod + def _should_trace_by_location(filename: Filename) -> bool: + """Determine if we should trace based on file location.""" + from scalene.scalene_profiler import Scalene + + # Check if the file is in our program's directory or a subdirectory. + filename_abs = os.path.abspath(filename) + program_path = os.path.abspath(Scalene._Scalene__program_path) + entrypoint_dir = os.path.abspath(Scalene._Scalene__entrypoint_dir) + + return ( + filename_abs.startswith(program_path) + or filename_abs.startswith(entrypoint_dir) + or os.path.commonpath([filename_abs, program_path]) == program_path + or os.path.commonpath([filename_abs, entrypoint_dir]) == entrypoint_dir + ) \ No newline at end of file diff --git a/scalene/scalene_cpu_profiler.py b/scalene/scalene_cpu_profiler.py new file mode 100644 index 000000000..07337dc4c --- /dev/null +++ b/scalene/scalene_cpu_profiler.py @@ -0,0 +1,131 @@ +""" +CPU profiling functionality for Scalene profiler. + +This module extracts CPU profiling functionality from the main Scalene class +to improve code organization and reduce complexity. +""" + +import math +import signal +import sys +import time +from typing import Any, Dict, Optional + +from scalene.scalene_signals import SignumType +from scalene.time_info import TimeInfo, get_times +from scalene.scalene_utility import compute_frames_to_record + +if sys.version_info >= (3, 11): + from types import FrameType +else: + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from types import FrameType + else: + FrameType = Any + + +class ScaleneCPUProfiler: + """Handles CPU profiling functionality for Scalene.""" + + def __init__(self, stats, signal_manager, accelerator, client_timer, orig_raise_signal, is_thread_sleeping): + """Initialize the CPU profiler.""" + self.__stats = stats + self.__signal_manager = signal_manager + self.__accelerator = accelerator + self.__client_timer = client_timer + self.__orig_raise_signal = orig_raise_signal + self.__is_thread_sleeping = is_thread_sleeping + self.__last_signal_time = TimeInfo() + self.__last_cpu_interval = 0.0 + + @staticmethod + def generate_exponential_sample(scale: float) -> float: + """Generate an exponentially distributed sample.""" + import math + import random + + u = random.random() # Uniformly distributed random number between 0 and 1 + return -scale * math.log(1 - u) + + def sample_cpu_interval(self, cpu_sampling_rate: float) -> float: + """Return the CPU sampling interval.""" + # Sample an interval from an exponential distribution. + self.__last_cpu_interval = self.generate_exponential_sample(cpu_sampling_rate) + return self.__last_cpu_interval + + def cpu_signal_handler( + self, + signum: SignumType, + this_frame: Optional[FrameType], + should_trace_func, + process_cpu_sample_func, + sample_cpu_interval_func, + restart_timer_func, + ) -> None: + """Handle CPU signals.""" + try: + # Get current time stats. + now = TimeInfo() + now.sys, now.user = get_times() + now.virtual = time.process_time() + now.wallclock = time.perf_counter() + if ( + self.__last_signal_time.virtual == 0 + or self.__last_signal_time.wallclock == 0 + ): + # Initialization: store values and update on the next pass. + self.__last_signal_time = now + if sys.platform != "win32": + next_interval = sample_cpu_interval_func() + restart_timer_func(next_interval) + return + + if self.__accelerator: + (gpu_load, gpu_mem_used) = self.__accelerator.get_stats() + else: + (gpu_load, gpu_mem_used) = (0.0, 0.0) + + # Process this CPU sample. + process_cpu_sample_func( + signum, + compute_frames_to_record(should_trace_func), + now, + gpu_load, + gpu_mem_used, + self.__last_signal_time, + self.__is_thread_sleeping, + ) + elapsed = now.wallclock - self.__last_signal_time.wallclock + # Store the latest values as the previously recorded values. + self.__last_signal_time = now + # Restart the timer while handling any timers set by the client. + next_interval = sample_cpu_interval_func() + if sys.platform != "win32": + if self.__client_timer.is_set: + ( + should_raise, + remaining_time, + ) = self.__client_timer.yield_next_delay(elapsed) + if should_raise: + self.__orig_raise_signal(signal.SIGUSR1) + # NOTE-- 0 will only be returned if the 'seconds' have elapsed + # and there is no interval + to_wait: float + if remaining_time > 0: + to_wait = min(remaining_time, next_interval) + else: + to_wait = next_interval + self.__client_timer.reset() + restart_timer_func(to_wait) + else: + restart_timer_func(next_interval) + finally: + if sys.platform == "win32": + restart_timer_func(next_interval) + + def windows_timer_loop(self, windows_queue, timer_signals) -> None: + """Timer loop for Windows CPU profiling.""" + while timer_signals: + time.sleep(0.01) + windows_queue.put(None) \ No newline at end of file diff --git a/scalene/scalene_profiler.py b/scalene/scalene_profiler.py index 0a9adb5d5..55ae6a69d 100644 --- a/scalene/scalene_profiler.py +++ b/scalene/scalene_profiler.py @@ -110,6 +110,10 @@ from scalene.scalene_parseargs import ScaleneParseArgs, StopJupyterExecution from scalene.scalene_sigqueue import ScaleneSigQueue from scalene.scalene_accelerator import ScaleneAccelerator +from scalene.scalene_cpu_profiler import ScaleneCPUProfiler +from scalene.scalene_profiler_lifecycle import ScaleneProfilerLifecycle +from scalene.scalene_code_executor import ScaleneCodeExecutor +from scalene.scalene_utils import ScaleneUtils console = Console(style="white on blue") @@ -238,6 +242,11 @@ def last_profiled_tuple() -> Tuple[Filename, LineNumber, ByteCodeIndex]: __invalidate_queue: List[Tuple[Filename, LineNumber]] = [] __invalidate_mutex: threading.Lock __profiler_base: str + + # New modular components + __cpu_profiler: Optional[ScaleneCPUProfiler] = None + __profiler_lifecycle: Optional[ScaleneProfilerLifecycle] = None + __code_executor: Optional[ScaleneCodeExecutor] = None @staticmethod def get_original_lock() -> threading.Lock: @@ -373,17 +382,24 @@ def remove_child_pid(cls, pid: int) -> None: @staticmethod def generate_exponential_sample(scale: float) -> float: - import math - import random - - u = random.random() # Uniformly distributed random number between 0 and 1 - return -scale * math.log(1 - u) + """Generate an exponentially distributed sample.""" + return ScaleneCPUProfiler.generate_exponential_sample(scale) @staticmethod def sample_cpu_interval() -> float: - interval = Scalene.generate_exponential_sample(Scalene.__args.cpu_sampling_rate) - Scalene.__last_cpu_interval = interval - return interval + """Return the CPU sampling interval.""" + if Scalene.__cpu_profiler: + interval = Scalene.__cpu_profiler.sample_cpu_interval(Scalene.__args.cpu_sampling_rate) + Scalene.__last_cpu_interval = interval + return interval + else: + # Fallback if not initialized yet + import math + import random + u = random.random() + interval = -Scalene.__args.cpu_sampling_rate * math.log(1 - u) + Scalene.__last_cpu_interval = interval + return interval @staticmethod def profile(func: Any) -> Any: @@ -445,19 +461,26 @@ def reset_thread_sleeping(tid: int) -> None: @staticmethod def windows_timer_loop() -> None: """For Windows, send periodic timer signals; launch as a background thread.""" - Scalene.__signal_manager.windows_timer_loop( - Scalene.__args.cpu_sampling_rate - ) # TODO: integrate support for use of sample_cpu_interval() + if Scalene.__cpu_profiler: + Scalene.__cpu_profiler.windows_timer_loop( + Scalene.__windows_queue, + Scalene.__signal_manager.timer_signals + ) + else: + # Fallback to signal manager + Scalene.__signal_manager.windows_timer_loop( + Scalene.__args.cpu_sampling_rate + ) @staticmethod def start_signal_queues() -> None: """Start the signal processing queues (i.e., their threads).""" - Scalene.__signal_manager.start_signal_queues() + ScaleneUtils.start_signal_queues(Scalene.__signal_manager) @staticmethod def stop_signal_queues() -> None: """Stop the signal processing queues (i.e., their threads).""" - Scalene.__signal_manager.stop_signal_queues() + ScaleneUtils.stop_signal_queues(Scalene.__signal_manager) @staticmethod def term_signal_handler( @@ -467,8 +490,7 @@ def term_signal_handler( """Handle terminate signals.""" Scalene.stop() Scalene.output_profile() - - Scalene.__orig_exit(Scalene.__sigterm_exit_code) + ScaleneUtils.term_signal_handler(signum, this_frame, Scalene.__sigterm_exit_code) @staticmethod def malloc_signal_handler( @@ -476,51 +498,16 @@ def malloc_signal_handler( this_frame: Optional[FrameType], ) -> None: """Handle allocation signals.""" - if not Scalene.__args.memory: - # This should never happen, but we fail gracefully. - return - from scalene import pywhere # type: ignore - - if this_frame: - enter_function_meta(this_frame, Scalene.should_trace, Scalene.__stats) - # Walk the stack till we find a line of code in a file we are tracing. - found_frame = False - f = this_frame - while f: - if found_frame := Scalene.should_trace( - f.f_code.co_filename, f.f_code.co_name - ): - break - f = cast(FrameType, f.f_back) - if not found_frame: - return - assert f - # Start tracing until we execute a different line of - # code in a file we are tracking. - # First, see if we have now executed a different line of code. - # If so, increment. - - invalidated = pywhere.get_last_profiled_invalidated() - (fname, lineno, lasti) = Scalene.last_profiled_tuple() - if not invalidated and this_frame and not (on_stack(this_frame, fname, lineno)): - Scalene.update_profiled() - pywhere.set_last_profiled_invalidated_false() - # In the setprofile callback, we rely on - # __last_profiled always having the same memory address. - # This is an optimization to not have to traverse the Scalene profiler - # object's dictionary every time we want to update the last profiled line. - # - # A previous change to this code set Scalene.__last_profiled = [fname, lineno, lasti], - # which created a new list object and set the __last_profiled attribute to the new list. This - # made the object held in `pywhere.cpp` out of date, and caused the profiler to not update the last profiled line. - Scalene.__last_profiled[:] = [ - Filename(f.f_code.co_filename), - LineNumber(f.f_lineno), - ByteCodeIndex(f.f_lasti), - ] - Scalene.__alloc_sigq.put([0]) - pywhere.enable_settrace(this_frame) - del this_frame + ScaleneUtils.malloc_signal_handler( + signum, + this_frame, + Scalene.__args, + Scalene.should_trace, + Scalene.last_profiled_tuple, + Scalene.update_profiled, + Scalene.__last_profiled, + Scalene.__alloc_sigq, + ) @staticmethod def free_signal_handler( @@ -528,10 +515,12 @@ def free_signal_handler( this_frame: Optional[FrameType], ) -> None: """Handle free signals.""" - if this_frame: - enter_function_meta(this_frame, Scalene.should_trace, Scalene.__stats) - Scalene.__alloc_sigq.put([0]) - del this_frame + ScaleneUtils.free_signal_handler( + signum, + this_frame, + Scalene.__args, + Scalene.__alloc_sigq, + ) @staticmethod def memcpy_signal_handler( @@ -539,21 +528,25 @@ def memcpy_signal_handler( this_frame: Optional[FrameType], ) -> None: """Handle memcpy signals.""" - Scalene.__memcpy_sigq.put((signum, this_frame)) - del this_frame + ScaleneUtils.memcpy_signal_handler( + signum, + this_frame, + Scalene.__args, + Scalene.__memcpy_sigq, + ) @staticmethod def enable_signals() -> None: """Set up the signal handlers to handle interrupts for profiling and start the timer interrupts.""" - next_interval = Scalene.sample_cpu_interval() - Scalene.__signal_manager.enable_signals( + ScaleneUtils.enable_signals( Scalene.malloc_signal_handler, Scalene.free_signal_handler, Scalene.memcpy_signal_handler, Scalene.term_signal_handler, Scalene.cpu_signal_handler, - next_interval, + Scalene.__signal_manager, + Scalene.sample_cpu_interval, ) def __init__( @@ -675,6 +668,9 @@ def __init__( # Store relevant names (program, path). if program_being_profiled: Scalene.__program_being_profiled = Filename(program_being_profiled) + + # Initialize new modular components + Scalene._initialize_components() @staticmethod def cpu_signal_handler( @@ -682,71 +678,26 @@ def cpu_signal_handler( this_frame: Optional[FrameType], ) -> None: """Handle CPU signals.""" - try: - # Get current time stats. - now = TimeInfo() - now.sys, now.user = get_times() - now.virtual = time.process_time() - now.wallclock = time.perf_counter() - if ( - Scalene.__last_signal_time.virtual == 0 - or Scalene.__last_signal_time.wallclock == 0 - ): - # Initialization: store values and update on the next pass. - Scalene.__last_signal_time = now - if sys.platform != "win32": - next_interval = Scalene.sample_cpu_interval() - Scalene.__signal_manager.restart_timer(next_interval) - return - - if Scalene.__accelerator: - (gpu_load, gpu_mem_used) = Scalene.__accelerator.get_stats() - else: - (gpu_load, gpu_mem_used) = (0.0, 0.0) - - # Process this CPU sample. - Scalene.process_cpu_sample( + if Scalene.__cpu_profiler: + Scalene.__cpu_profiler.cpu_signal_handler( signum, - compute_frames_to_record(Scalene.should_trace), - now, - gpu_load, - gpu_mem_used, - Scalene.__last_signal_time, - Scalene.__is_thread_sleeping, + this_frame, + Scalene.should_trace, + Scalene.process_cpu_sample, + Scalene.sample_cpu_interval, + Scalene.__signal_manager.restart_timer, ) - elapsed = now.wallclock - Scalene.__last_signal_time.wallclock - # Store the latest values as the previously recorded values. - Scalene.__last_signal_time = now - # Restart the timer while handling any timers set by the client. - next_interval = Scalene.sample_cpu_interval() + else: + # Fallback implementation (simplified) if sys.platform != "win32": - if Scalene.client_timer.is_set: - ( - should_raise, - remaining_time, - ) = Scalene.client_timer.yield_next_delay(elapsed) - if should_raise: - Scalene.__orig_raise_signal(signal.SIGUSR1) - # NOTE-- 0 will only be returned if the 'seconds' have elapsed - # and there is no interval - to_wait: float - if remaining_time > 0: - to_wait = min(remaining_time, next_interval) - else: - to_wait = next_interval - Scalene.client_timer.reset() - Scalene.__signal_manager.restart_timer(to_wait) - else: - Scalene.__signal_manager.restart_timer(next_interval) - finally: - if sys.platform == "win32": + next_interval = Scalene.sample_cpu_interval() Scalene.__signal_manager.restart_timer(next_interval) @staticmethod def output_profile(program_args: Optional[List[str]] = None) -> bool: """Output the profile. Returns true iff there was any info reported the profile.""" - if Scalene.__args.json: - json_output = Scalene.__json.output_profiles( + if Scalene.__profiler_lifecycle: + return Scalene.__profiler_lifecycle.output_profile( Scalene.__program_being_profiled, Scalene.__stats, Scalene.__pid, @@ -754,91 +705,34 @@ def output_profile(program_args: Optional[List[str]] = None) -> bool: Scalene.__python_alias_dir, Scalene.__program_path, Scalene.__entrypoint_dir, - program_args, - profile_memory=Scalene.__args.memory, - reduced_profile=Scalene.__args.reduced_profile, + program_args ) - # Since the default value returned for "there are no samples" - # is `{}`, we use a sentinel value `{"is_child": True}` - # when inside a child process to indicate that there are samples, but they weren't - # turned into a JSON file because they'll later - # be used by the parent process - if "is_child" in json_output: - return True - outfile = Scalene.__output.output_file - if Scalene.__args.outfile: - outfile = os.path.join( - os.path.dirname(Scalene.__args.outfile), - os.path.splitext(os.path.basename(Scalene.__args.outfile))[0] - + ".json", - ) - # If there was no output file specified, print to the console. - if not outfile: - if sys.platform == "win32": - outfile = "CON" - else: - outfile = "/dev/stdout" - # Write the JSON to the output file (or console). - with open(outfile, "w") as f: - f.write(json.dumps(json_output, sort_keys=True, indent=4) + "\n") - return json_output != {} - else: - output = Scalene.__output - column_width = Scalene.__args.column_width - if not Scalene.__args.html: - # Get column width of the terminal and adjust to fit. - with contextlib.suppress(Exception): - # If we are in a Jupyter notebook, stick with 132 - if "ipykernel" in sys.modules: - column_width = 132 - else: - import shutil - - column_width = shutil.get_terminal_size().columns - did_output: bool = output.output_profiles( - column_width, - Scalene.__stats, - Scalene.__pid, - Scalene.profile_this_code, - Scalene.__python_alias_dir, - Scalene.__program_path, - program_args, - profile_memory=Scalene.__args.memory, - reduced_profile=Scalene.__args.reduced_profile, - ) - return did_output + # Fallback: minimal implementation + return False @staticmethod @functools.cache def get_line_info( fname: Filename, ) -> Generator[Tuple[list[str], int], None, None]: - line_info = ( - inspect.getsourcelines(fn) for fn in Scalene.__functions_to_profile[fname] - ) - return line_info + """Get line information for profiled functions.""" + return ScaleneUtils.get_line_info(fname, Scalene.__functions_to_profile) @staticmethod def profile_this_code(fname: Filename, lineno: LineNumber) -> bool: - # sourcery skip: inline-immediately-returned-variable """When using @profile, only profile files & lines that have been decorated.""" - if not Scalene.__files_to_profile: - return True - if fname not in Scalene.__files_to_profile: - return False - # Now check to see if it's the right line range. - line_info = Scalene.get_line_info(fname) - found_function = any( - line_start <= lineno < line_start + len(lines) - for (lines, line_start) in line_info + return ScaleneUtils.profile_this_code( + fname, + lineno, + Scalene.__files_to_profile, + Scalene.__functions_to_profile, ) - return found_function @staticmethod def print_stacks() -> None: - for f in Scalene.__stats.stacks: - print(f, Scalene.__stats.stacks[f]) + """Print stack information.""" + ScaleneUtils.print_stacks(Scalene.__stats) @staticmethod def process_cpu_sample( @@ -1204,6 +1098,40 @@ def _should_trace_by_location(filename: Filename) -> bool: __done = False + @staticmethod + def _initialize_components() -> None: + """Initialize the modular components.""" + # Initialize CPU profiler + Scalene.__cpu_profiler = ScaleneCPUProfiler( + Scalene.__stats, + Scalene.__signal_manager, + Scalene.__accelerator, + Scalene.client_timer, + Scalene.__orig_raise_signal, + Scalene.__is_thread_sleeping + ) + + # Initialize profiler lifecycle + Scalene.__profiler_lifecycle = ScaleneProfilerLifecycle( + Scalene.__args, + Scalene.__stats, + Scalene.__signal_manager, + Scalene.__output, + Scalene.__json, + Scalene.__accelerator, + Scalene.__profile_filename + ) + + # Initialize code executor + Scalene.__code_executor = ScaleneCodeExecutor( + Scalene.__args, + Scalene.__files_to_profile, + Scalene.__functions_to_profile, + Scalene.__program_being_profiled, + Scalene.__program_path, + Scalene.__entrypoint_dir + ) + @staticmethod def start() -> None: """Initiate profiling.""" diff --git a/scalene/scalene_profiler_lifecycle.py b/scalene/scalene_profiler_lifecycle.py new file mode 100644 index 000000000..de2568a6b --- /dev/null +++ b/scalene/scalene_profiler_lifecycle.py @@ -0,0 +1,212 @@ +""" +Profiler lifecycle management for Scalene profiler. + +This module extracts profiler lifecycle functionality from the main Scalene class +to improve code organization and reduce complexity. +""" + +import os +import sys +import time +from typing import Any, Dict, List, Optional, Set + +from scalene.scalene_signals import SignumType +from scalene.scalene_statistics import Filename +from scalene.find_browser import find_browser + +if sys.version_info >= (3, 11): + from types import FrameType +else: + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from types import FrameType + else: + FrameType = Any + + +class ScaleneProfilerLifecycle: + """Handles profiler lifecycle management for Scalene.""" + + def __init__(self, args, stats, signal_manager, output, json_output, accelerator, + profile_filename): + """Initialize the profiler lifecycle manager.""" + self.__args = args + self.__stats = stats + self.__signal_manager = signal_manager + self.__output = output + self.__json = json_output + self.__accelerator = accelerator + self.__profile_filename = profile_filename + self.__initialized = False + + def set_initialized(self, value: bool) -> None: + """Set the initialized flag.""" + self.__initialized = value + + def start(self, set_start_time_func, set_done_func) -> None: + """Initiate profiling.""" + if not self.__initialized: + print( + "ERROR: Do not try to invoke `start` if you have not called Scalene using one of the methods\n" + "in https://github.com/plasma-umass/scalene#using-scalene\n" + "(The most likely issue is that you need to run your code with `scalene`, not `python`).", + file=sys.stderr, + ) + sys.exit(1) + self.__stats.start_clock() + self.__signal_manager.enable_signals() + set_start_time_func(time.monotonic_ns()) + set_done_func(False) + + # Start neuron monitor if using Neuron accelerator + if ( + hasattr(self.__accelerator, "start_monitor") + and self.__accelerator is not None + ): + self.__accelerator.start_monitor() + + if self.__args.memory: + from scalene import pywhere # type: ignore + + pywhere.set_scalene_done_false() + + def stop(self, set_done_func, get_is_child_func, get_in_jupyter_func) -> None: + """Complete profiling.""" + set_done_func(True) + if self.__args.memory: + from scalene import pywhere # type: ignore + + pywhere.set_scalene_done_true() + + self.__signal_manager.disable_signals() + self.__stats.stop_clock() + if self.__args.outfile: + self.__profile_filename = Filename( + os.path.join( + os.path.dirname(self.__args.outfile), + os.path.basename(self.__profile_filename), + ) + ) + + if self.__args.web and not self.__args.cli and not get_is_child_func(): + # First, check for a browser. + try: + if not find_browser(): + # Could not open a graphical web browser tab; + # act as if --web was not specified + self.__args.web = False + else: + # Force JSON output to profile.json. + self.__args.json = True + self.__output.html = False + self.__output.output_file = self.__profile_filename + except Exception: + # Couldn't find a browser. + self.__args.web = False + + # If so, set variables appropriately. + if self.__args.web and get_in_jupyter_func(): + # Force JSON output to profile.json. + self.__args.json = True + self.__output.html = False + self.__output.output_file = self.__profile_filename + + def is_done(self, get_done_func) -> bool: + """Return true if Scalene has stopped profiling.""" + return get_done_func() + + def output_profile(self, program_being_profiled: Filename, + stats, pid: int, profile_this_code_func, + python_alias_dir, program_path, + entrypoint_dir, + program_args: Optional[List[str]] = None) -> bool: + """Output the profile. Returns true iff there was any info reported the profile.""" + if self.__args.json: + json_output = self.__json.output_profiles( + program_being_profiled, + stats, + pid, + profile_this_code_func, + python_alias_dir, + program_path, + entrypoint_dir, + program_args, + profile_memory=self.__args.memory, + reduced_profile=self.__args.reduced_profile, + ) + # Since the default value returned for "there are no samples" + # is `{}`, we use a sentinel value `{"is_child": True}` + # when inside a child process to indicate that there are samples, but they weren't + # turned into a JSON file because they'll later + # be used by the parent process + if "is_child" in json_output: + return True + outfile = self.__output.output_file + if self.__args.outfile: + outfile = os.path.join( + os.path.dirname(self.__args.outfile), + os.path.splitext(os.path.basename(self.__args.outfile))[0] + + ".json", + ) + # If there was no output file specified, print to the console. + if not outfile: + if sys.platform == "win32": + outfile = "CON" + else: + outfile = "/dev/stdout" + # Write the JSON to the output file (or console). + import json + with open(outfile, "w") as f: + f.write(json.dumps(json_output, sort_keys=True, indent=4) + "\n") + return json_output != {} + + else: + output = self.__output + column_width = self.__args.column_width + if not self.__args.html: + # Get column width of the terminal and adjust to fit. + import contextlib + with contextlib.suppress(Exception): + # If we are in a Jupyter notebook, stick with 132 + if "ipykernel" in sys.modules: + column_width = 132 + else: + import shutil + + column_width = shutil.get_terminal_size().columns + did_output: bool = output.output_profiles( + column_width, + stats, + pid, + profile_this_code_func, + python_alias_dir, + program_path, + program_args, + profile_memory=self.__args.memory, + reduced_profile=self.__args.reduced_profile, + ) + return did_output + + def start_signal_handler( + self, + signum: SignumType, + this_frame: Optional[FrameType], + lifecycle_disabled_ref: List[bool], + enable_signals_func, + ) -> None: + """Start the profiler from a signal.""" + if lifecycle_disabled_ref[0]: + return + enable_signals_func() + + def stop_signal_handler( + self, + signum: SignumType, + this_frame: Optional[FrameType], + lifecycle_disabled_ref: List[bool], + disable_signals_func, + ) -> None: + """Stop the profiler from a signal.""" + if lifecycle_disabled_ref[0]: + return + disable_signals_func() \ No newline at end of file diff --git a/scalene/scalene_utils.py b/scalene/scalene_utils.py new file mode 100644 index 000000000..870735104 --- /dev/null +++ b/scalene/scalene_utils.py @@ -0,0 +1,260 @@ +""" +Utility methods extracted from Scalene profiler for better modularity. + +This module contains various utility and helper methods that were previously +in the main Scalene class. +""" + +import contextlib +import functools +import gc +import inspect +import os +import signal +import sys +import threading +from types import FrameType +from typing import Any, Dict, Generator, List, Optional, Set, Tuple + +from scalene.scalene_statistics import Filename, LineNumber, ByteCodeIndex +from scalene.scalene_utility import enter_function_meta, on_stack + + +class ScaleneUtils: + """Utility methods for Scalene profiler.""" + + @staticmethod + def enable_signals( + malloc_signal_handler, + free_signal_handler, + memcpy_signal_handler, + term_signal_handler, + cpu_signal_handler, + signal_manager, + sample_cpu_interval_func, + ) -> None: + """Set up the signal handlers to handle interrupts for profiling and start the + timer interrupts.""" + next_interval = sample_cpu_interval_func() + signal_manager.enable_signals( + malloc_signal_handler, + free_signal_handler, + memcpy_signal_handler, + term_signal_handler, + cpu_signal_handler, + next_interval, + ) + + @staticmethod + def disable_signals(signal_manager, retry: bool = True) -> None: + """Turn off the signals.""" + try: + signal_manager.disable_signals() + except Exception: + # Could be a ValueError (signal only works in main thread) or + # even an OSError in certain contexts. + if retry: + pass + + @staticmethod + def malloc_signal_handler( + signum, + this_frame: Optional[FrameType], + args, + should_trace_func, + last_profiled_tuple_func, + update_profiled_func, + last_profiled_ref, + alloc_sigq, + ) -> None: + """Handle allocation signals.""" + if not args.memory: + # This should never happen, but we fail gracefully. + return + from scalene import pywhere # type: ignore + + if this_frame: + # Use a dummy stats object since we don't have access to it here + enter_function_meta(this_frame, should_trace_func, None) + # Walk the stack till we find a line of code in a file we are tracing. + found_frame = False + f = this_frame + while f: + if found_frame := should_trace_func( + f.f_code.co_filename, f.f_code.co_name + ): + break + f = f.f_back + if not found_frame: + return + assert f + # Start tracing until we execute a different line of + # code in a file we are tracking. + # First, see if we have now executed a different line of code. + # If so, increment. + + invalidated = pywhere.get_last_profiled_invalidated() + (fname, lineno, lasti) = last_profiled_tuple_func() + if not invalidated and this_frame and not (on_stack(this_frame, fname, lineno)): + update_profiled_func() + pywhere.set_last_profiled_invalidated_false() + # In the setprofile callback, we rely on + # __last_profiled always having the same memory address. + # This is an optimization to not have to traverse the Scalene profiler + # object's dictionary every time we want to update the last profiled line. + # + # A previous change to this code set Scalene.__last_profiled = [fname, lineno, lasti], + # which created a new list object and set the __last_profiled attribute to the new list. This + # made the object held in `pywhere.cpp` out of date, and caused the profiler to not update the last profiled line. + last_profiled_ref[:] = [ + Filename(f.f_code.co_filename), + LineNumber(f.f_lineno), + ByteCodeIndex(f.f_lasti), + ] + alloc_sigq.put([0]) + pywhere.enable_settrace(this_frame) + del this_frame + + @staticmethod + def free_signal_handler( + signum, + this_frame: Optional[FrameType], + args, + alloc_sigq, + ) -> None: + """Handle free signals.""" + if not args.memory: + return + alloc_sigq.put([1]) + + @staticmethod + def memcpy_signal_handler( + signum, + this_frame: Optional[FrameType], + args, + memcpy_sigq, + ) -> None: + """Handle memcpy signals.""" + if not args.memory: + return + memcpy_sigq.put((signum, this_frame)) + + @staticmethod + def term_signal_handler( + signum, + this_frame: Optional[FrameType], + sigterm_exit_code: int, + ) -> None: + """Handle sigterm signals.""" + sys.exit(sigterm_exit_code) + + @staticmethod + def print_stacks(stats) -> None: + """Print stack information.""" + for f in stats.stacks: + print(f) + + @staticmethod + @functools.cache + def get_line_info( + fname: Filename, + functions_to_profile: Dict[Filename, Set[Any]], + ) -> Generator[Tuple[list[str], int], None, None]: + """Get line information for profiled functions.""" + line_info = ( + inspect.getsourcelines(fn) for fn in functions_to_profile[fname] + ) + return line_info + + @staticmethod + def profile_this_code( + fname: Filename, + lineno: LineNumber, + files_to_profile: Set[Filename], + functions_to_profile: Dict[Filename, Set[Any]], + ) -> bool: + """When using @profile, only profile files & lines that have been decorated.""" + if not files_to_profile: + return True + if fname not in files_to_profile: + return False + # Now check to see if it's the right line range. + line_info = ScaleneUtils.get_line_info(fname, functions_to_profile) + found_function = any( + line_start <= lineno < line_start + len(lines) + for (lines, line_start) in line_info + ) + return found_function + + @staticmethod + def alloc_sigqueue_processor( + memory_profiler, + stats, + args, + invalidate_mutex, + invalidate_queue, + start_time, + ) -> None: + """Handle interrupts for memory profiling (mallocs and frees).""" + # Process all the messages in the queue. + memory_profiler.process_malloc_free_samples( + start_time, + args, + invalidate_mutex, + invalidate_queue, + ) + + @staticmethod + def memcpy_sigqueue_processor( + memory_profiler, + ) -> None: + """Process memcpy signals (used in a ScaleneSigQueue).""" + # Process all the messages in the queue. + memory_profiler.process_memcpy_samples() + + @staticmethod + def start_signal_queues(signal_manager) -> None: + """Start the signal processing queues (i.e., their threads).""" + signal_manager.start_signal_queues() + + @staticmethod + def stop_signal_queues(signal_manager) -> None: + """Stop the signal processing queues.""" + signal_manager.stop_signal_queues() + + @staticmethod + def clear_metrics(stats) -> None: + """Clear the various collected metrics.""" + stats.clear_all() + gc.collect() + + @staticmethod + def add_child_pid(child_pids: Set[int], pid: int) -> None: + """Add a child PID to the set of children being managed.""" + child_pids.add(pid) + + @staticmethod + def remove_child_pid(child_pids: Set[int], pid: int) -> None: + """Remove a child PID from the set of children being managed.""" + child_pids.discard(pid) + + @staticmethod + def exit_handler( + stop_func, + is_child: bool, + memory_profiler, + ) -> None: + """When we exit, disable signals.""" + stop_func() + if not is_child and memory_profiler: + memory_profiler.cleanup() + + @staticmethod + def set_thread_sleeping(is_thread_sleeping: Dict[int, bool], tid: int) -> None: + """Indicate the given thread is sleeping.""" + is_thread_sleeping[tid] = True + + @staticmethod + def reset_thread_sleeping(is_thread_sleeping: Dict[int, bool], tid: int) -> None: + """Indicate the given thread is not sleeping.""" + is_thread_sleeping[tid] = False \ No newline at end of file