Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/performance test #850

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions openadapt/a11y/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""This module provides platform-specific implementations for window and element
interactions using accessibility APIs. It abstracts the platform differences
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove unnecessary indent

and provides a unified interface for retrieving the active window, finding
display elements, and getting element values.
"""

import sys

from loguru import logger

if sys.platform == "darwin":
from . import _macos as impl

role = "AXStaticText"
elif sys.platform in ("win32", "linux"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove linux for now

from . import _windows as impl

role = "Text"
else:
raise Exception(f"Unsupported platform: {sys.platform}")


def get_active_window():
"""Get the active window object.

Returns:
The active window object.
"""
try:
return impl.get_active_window()
except Exception as exc:
logger.warning(f"{exc=}")
return None


def get_element_value(active_window, role=role):
"""Find the display of active_window.

Args:
active_window: The parent window to search within.

Returns:
The found active_window.
"""
try:
return impl.get_element_value(active_window, role)
except Exception as exc:
logger.warning(f"{exc=}")
return None
61 changes: 61 additions & 0 deletions openadapt/a11y/_macos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import AppKit
import ApplicationServices


def get_attribute(element, attribute):
result, value = ApplicationServices.AXUIElementCopyAttributeValue(
element, attribute, None
)
if result == 0:
return value
return None


def find_element_by_attribute(element, attribute, value):
if get_attribute(element, attribute) == value:
return element
children = get_attribute(element, ApplicationServices.kAXChildrenAttribute) or []
for child in children:
found = find_element_by_attribute(child, attribute, value)
if found:
return found
return None


def get_active_window():
"""Get the active window object.

Returns:
AXUIElement: The active window object.
"""
workspace = AppKit.NSWorkspace.sharedWorkspace()
active_app = workspace.frontmostApplication()
app_element = ApplicationServices.AXUIElementCreateApplication(
active_app.processIdentifier()
)

error_code, focused_window = ApplicationServices.AXUIElementCopyAttributeValue(
app_element, ApplicationServices.kAXFocusedWindowAttribute, None
)
if error_code:
raise Exception("Could not get the active window.")
return focused_window


def get_element_value(element, role="AXStaticText"):
"""Get the value of a specific element .

Args:
element: The AXUIElement to search within.

Returns:
str: The value of the element, or an error message if not found.
"""
target_element = find_element_by_attribute(
element, ApplicationServices.kAXRoleAttribute, role
)
if not target_element:
return f"AXStaticText element not found."

value = get_attribute(target_element, ApplicationServices.kAXValueAttribute)
return value if value else f"No value for AXStaticText element."
44 changes: 44 additions & 0 deletions openadapt/a11y/_windows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from loguru import logger
import pywinauto
import re


def get_active_window() -> pywinauto.application.WindowSpecification:
"""Get the active window object.

Returns:
pywinauto.application.WindowSpecification: The active window object.
"""
app = pywinauto.application.Application(backend="uia").connect(active_only=True)
window = app.top_window()
return window.wrapper_object()


def get_element_value(active_window, role="Text"):
"""Find the display element.

Args:
active_window: The parent window to search within.
role (str): The role of the element to search for.

Returns:
The found display element value.

Raises:
ValueError: If the element is not found.
"""
try:
elements = active_window.descendants() # Retrieve all descendants
for elem in elements:
if (
elem.element_info.control_type == role
and elem.element_info.name.startswith("Display is")
):
# Extract the number from the element's name
match = re.search(r"[-+]?\d*\.?\d+", elem.element_info.name)
if match:
return str(match.group())
raise ValueError("Display element not found")
except Exception as exc:
logger.warning(f"Error in get_element_value: {exc}")
return None
101 changes: 60 additions & 41 deletions openadapt/scripts/generate_db_fixtures.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
import os
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
from openadapt.db.db import Base
from openadapt.config import DATA_DIR_PATH, PARENT_DIR_PATH, RECORDING_DIR_PATH
from openadapt.config import PARENT_DIR_PATH, RECORDING_DIR_PATH
import openadapt.db.crud as crud
from loguru import logger


def get_session():
"""
Establishes a database connection and returns a session and engine.

Returns:
tuple: A tuple containing the SQLAlchemy session and engine.
"""
db_url = RECORDING_DIR_PATH / "recording.db"
print(f"Database URL: {db_url}")
logger.info(f"Database URL: {db_url}")
engine = create_engine(f"sqlite:///{db_url}")
# SessionLocal = sessionmaker(bind=engine)
Base.metadata.create_all(bind=engine)
session = crud.get_new_session(read_only=True)
print("Database connection established.")
logger.info("Database connection established.")
return session, engine


def check_tables_exist(engine):
"""
Checks if the expected tables exist in the database.

Args:
engine: SQLAlchemy engine object.

Returns:
list: A list of table names in the database.
"""
inspector = inspect(engine)
tables = inspector.get_table_names()
expected_tables = [
Expand All @@ -32,13 +44,21 @@ def check_tables_exist(engine):
for table_name in expected_tables:
table_exists = table_name in tables
logger.info(f"{table_name=} {table_exists=}")
return tables
return tables


def fetch_data(session):
"""
Fetches the most recent recordings and related data from the database.

Args:
session: SQLAlchemy session object.

Returns:
dict: A dictionary containing fetched data.
"""
# get the most recent three recordings
recordings = crud.get_recordings(session, max_rows=3)
recording_ids = [recording.id for recording in recordings]

action_events = []
screenshots = []
Expand All @@ -63,17 +83,27 @@ def fetch_data(session):
}

# Debug prints to verify data fetching
print(f"Recordings: {len(data['recordings'])} found.")
print(f"Action Events: {len(data['action_events'])} found.")
print(f"Screenshots: {len(data['screenshots'])} found.")
print(f"Window Events: {len(data['window_events'])} found.")
print(f"Performance Stats: {len(data['performance_stats'])} found.")
print(f"Memory Stats: {len(data['memory_stats'])} found.")
logger.info(f"Recordings: {len(data['recordings'])} found.")
logger.info(f"Action Events: {len(data['action_events'])} found.")
logger.info(f"Screenshots: {len(data['screenshots'])} found.")
logger.info(f"Window Events: {len(data['window_events'])} found.")
logger.info(f"Performance Stats: {len(data['performance_stats'])} found.")
logger.info(f"Memory Stats: {len(data['memory_stats'])} found.")

return data


def format_sql_insert(table_name, rows):
"""
Formats SQL insert statements for a given table and rows.

Args:
table_name (str): The name of the table.
rows (list): A list of SQLAlchemy ORM objects representing the rows.

Returns:
str: A string containing the SQL insert statements.
"""
if not rows:
return ""

Expand All @@ -94,35 +124,24 @@ def format_sql_insert(table_name, rows):


def dump_to_fixtures(filepath):
"""
Dumps the fetched data into an SQL file.

Args:
filepath (str): The path to the SQL file.
"""
session, engine = get_session()
check_tables_exist(engine)
data = fetch_data(session)

with open(filepath, "a", encoding="utf-8") as file:
if data["recordings"]:
file.write("-- Insert sample recordings\n")
file.write(format_sql_insert("recording", data["recordings"]))

if data["action_events"]:
file.write("-- Insert sample action_events\n")
file.write(format_sql_insert("action_event", data["action_events"]))

if data["screenshots"]:
file.write("-- Insert sample screenshots\n")
file.write(format_sql_insert("screenshot", data["screenshots"]))

if data["window_events"]:
file.write("-- Insert sample window_events\n")
file.write(format_sql_insert("window_event", data["window_events"]))

if data["performance_stats"]:
file.write("-- Insert sample performance_stats\n")
file.write(format_sql_insert("performance_stat", data["performance_stats"]))

if data["memory_stats"]:
file.write("-- Insert sample memory_stats\n")
file.write(format_sql_insert("memory_stat", data["memory_stats"]))
print(f"Data appended to {filepath}")
rows_by_table_name = fetch_data(session)

for table_name, rows in rows_by_table_name.items():
if not rows:
logger.warning(f"No rows for {table_name=}")
continue
with open(filepath, "a", encoding="utf-8") as file:
logger.info(f"Writing {len(rows)=} to {filepath=} for {table_name=}")
file.write(f"-- Insert sample rows for {table_name}\n")
file.write(format_sql_insert(table_name, rows))


if __name__ == "__main__":
Expand Down
12 changes: 0 additions & 12 deletions openadapt/window/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import sys

from loguru import logger
import pywinauto

from openadapt.config import config

Expand Down Expand Up @@ -68,17 +67,6 @@ def get_active_window_state(read_window_data: bool) -> dict | None:
return None


def get_active_window() -> pywinauto.application.WindowSpecification:
"""Get the active window object.

Returns:
pywinauto.application.WindowSpecification: The active window object.
"""
app = pywinauto.application.Application(backend="uia").connect(active_only=True)
window = app.top_window()
return window.wrapper_object()


def get_active_element_state(x: int, y: int) -> dict | None:
"""Get the state of the active element at the specified coordinates.

Expand Down
Loading
Loading