Skip to content

Caicai000 master (for harmony) #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 15, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -160,4 +160,5 @@ cython_debug/
#.idea/

poetry.lock
window_dump.xml
window_dump.xml
.DS_Store
59 changes: 59 additions & 0 deletions e2etests/test_harmony_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# coding: utf-8
#
# 参考:https://github.com/codematrixer/awesome-hdc

import pytest

from uiautodev.driver.harmony import HDC
from uiautodev.driver.harmony import HarmonyDriver

@pytest.fixture
def hdc() -> HDC:
return HDC()

@pytest.fixture
def serial(hdc: HDC) -> str:
devices = hdc.list_device()
assert len(devices) == 1
return devices[0]


def test_list_device(hdc: HDC):
devices = hdc.list_device()
assert len(devices) == 1


def test_shell(hdc: HDC, serial: str):
assert hdc.shell(serial, 'pwd') == '/'

def test_get_model(hdc: HDC, serial: str):
assert hdc.get_model(serial) == 'ohos'


def test_screenshot(hdc: HDC, serial: str):
image = hdc.screenshot(serial)
assert image is not None
assert image.size is not None


def test_dump_layout(hdc: HDC, serial: str):
layout = hdc.dump_layout(serial)
assert layout is not None
assert isinstance(layout, dict)


@pytest.fixture
def driver(hdc: HDC, serial: str) -> HarmonyDriver:
return HarmonyDriver(hdc, serial)


def test_window_size(driver: HarmonyDriver):
size = driver.window_size()
assert size.width > 0
assert size.height > 0


def test_dump_hierarchy(driver: HarmonyDriver):
xml, hierarchy = driver.dump_hierarchy()
assert xml is not None
assert hierarchy is not None
16 changes: 11 additions & 5 deletions uiautodev/app.py
Original file line number Diff line number Diff line change
@@ -11,16 +11,16 @@
from pathlib import Path
from typing import List

import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
from pydantic import BaseModel
import uvicorn

from uiautodev import __version__
from uiautodev.common import convert_bytes_to_image, get_webpage_url, ocr_image
from uiautodev.model import Node
from uiautodev.provider import AndroidProvider, IOSProvider, MockProvider
from uiautodev.provider import AndroidProvider, HarmonyProvider, IOSProvider, MockProvider
from uiautodev.router.device import make_router
from uiautodev.router.xml import router as xml_router
from uiautodev.utils.envutils import Environment
@@ -39,16 +39,19 @@

android_router = make_router(AndroidProvider())
ios_router = make_router(IOSProvider())
harmony_router = make_router(HarmonyProvider())
mock_router = make_router(MockProvider())

app.include_router(mock_router, prefix="/api/mock", tags=["mock"])

if Environment.UIAUTODEV_MOCK:
app.include_router(mock_router, prefix="/api/android", tags=["mock"])
app.include_router(mock_router, prefix="/api/ios", tags=["mock"])
app.include_router(mock_router, prefix="/api/harmony", tags=["mock"])
else:
app.include_router(android_router, prefix="/api/android", tags=["android"])
app.include_router(ios_router, prefix="/api/ios", tags=["ios"])
app.include_router(harmony_router, prefix="/api/harmony", tags=["harmony"])

app.include_router(xml_router, prefix="/api/xml", tags=["xml"])

@@ -61,6 +64,7 @@ class InfoResponse(BaseModel):
cwd: str
drivers: List[str]


@app.get("/api/info")
def info() -> InfoResponse:
"""Information about the application"""
@@ -70,16 +74,18 @@ def info() -> InfoResponse:
platform=platform.system(), # Linux | Darwin | Windows
code_language="Python",
cwd=os.getcwd(),
drivers=["android", "ios"],
drivers=["android", "ios", "harmony"],
)


@app.post('/api/ocr_image')
async def _ocr_image(file: UploadFile = File(...)) -> List[Node]:
"""OCR an image"""
image_data = await file.read()
image = convert_bytes_to_image(image_data)
return ocr_image(image)


@app.get("/shutdown")
def shutdown() -> str:
"""Shutdown the server"""
@@ -88,7 +94,7 @@ def shutdown() -> str:


@app.get("/demo")
def demo() -> str:
def demo():
"""Demo endpoint"""
static_dir = Path(__file__).parent / "static"
print(static_dir / "demo.html")
@@ -104,4 +110,4 @@ def index_redirect():


if __name__ == '__main__':
uvicorn.run("uiautodev.app:app", port=4000, reload=True, use_colors=True)
uvicorn.run("uiautodev.app:app", port=4000, reload=True, use_colors=True)
2 changes: 1 addition & 1 deletion uiautodev/command_proxy.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
WindowSizeResponse
from uiautodev.driver.base_driver import BaseDriver
from uiautodev.exceptions import ElementNotFoundError
from uiautodev.model import Node, AppInfo
from uiautodev.model import AppInfo, Node
from uiautodev.utils.common import node_travel

COMMANDS: Dict[Command, Callable] = {}
1 change: 1 addition & 0 deletions uiautodev/common.py
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
import locale
import logging
from typing import List

from PIL import Image

from uiautodev.model import Node, OCRNode
2 changes: 1 addition & 1 deletion uiautodev/driver/android.py
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@
from uiautodev.command_types import CurrentAppResponse
from uiautodev.driver.base_driver import BaseDriver
from uiautodev.exceptions import AndroidDriverException, RequestError
from uiautodev.model import Node, AppInfo, Rect, ShellResponse, WindowSize
from uiautodev.model import AppInfo, Node, Rect, ShellResponse, WindowSize
from uiautodev.utils.common import fetch_through_socket

logger = logging.getLogger(__name__)
2 changes: 1 addition & 1 deletion uiautodev/driver/base_driver.py
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@
from pydantic import BaseModel

from uiautodev.command_types import CurrentAppResponse
from uiautodev.model import Node, AppInfo, ShellResponse, WindowSize
from uiautodev.model import AppInfo, Node, ShellResponse, WindowSize


class BaseDriver(abc.ABC):
228 changes: 228 additions & 0 deletions uiautodev/driver/harmony.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
import time
import uuid
from pathlib import Path
from typing import List, Optional, Tuple, Union, final

from PIL import Image

from uiautodev.command_types import CurrentAppResponse
from uiautodev.driver.base_driver import BaseDriver
from uiautodev.model import AppInfo, Node, Rect, ShellResponse, WindowSize

logger = logging.getLogger(__name__)

StrOrPath = Union[str, Path]


def run_command(command: str, timeout: int = 60) -> str:
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
timeout=timeout,
text=True,
input='' # this avoid stdout: "FreeChannelContinue handle->data is nullptr"
)
# the hdc shell stderr is (不仅没啥用,还没办法去掉)
# Remote PTY will not be allocated because stdin is not a terminal.
# Use multiple -t options to force remote PTY allocation.
output = result.stdout.strip()
return output
except subprocess.TimeoutExpired as e:
raise TimeoutError(f"{command:r} timeout {e}")


class HDCError(Exception):
pass


class HDC:
def __init__(self):
self.hdc = 'hdc'
self.tmpdir = tempfile.TemporaryDirectory()

def __del__(self):
self.tmpdir.cleanup()

def list_device(self) -> List[str]:
command = f"{self.hdc} list targets"
result = run_command(command)
if result and not "Empty" in result:
devices = []
for line in result.strip().split("\n"):
if '\t' in line:
serial, state = line.strip().split('\t', 1)
if state == 'device':
devices.append(serial)
else:
logger.warning(f"{serial} is {state}")
return devices
else:
return []

def shell(self, serial: str, command: str) -> str:
command = f"{self.hdc} -t {serial} shell \"{command}\""
result = run_command(command)
return result.strip()

def get_model(self, serial: str) -> str:
return self.shell(serial, "param get const.product.model")

def pull(self, serial: str, remote: StrOrPath, local: StrOrPath):
if isinstance(remote, Path):
remote = remote.as_posix()
command = f"{self.hdc} -t {serial} file recv {remote} {local}"
output = run_command(command)
if not os.path.exists(local):
raise HDCError(f"device file: {remote} not found", output)

def push(self, serial: str, local: StrOrPath, remote: StrOrPath) -> str:
if isinstance(remote, Path):
remote = remote.as_posix()
command = f"{self.hdc} -t {serial} file send {local} {remote}"
return run_command(command)

def screenshot(self, serial: str) -> Image.Image:
device_path = f'/data/local/tmp/screenshot-{int(time.time()*1000)}.png'
self.shell(serial, f"uitest screenCap -p {device_path}")
try:
local_path = os.path.join(self.tmpdir.name, f"{uuid.uuid4()}.png")
self.pull(serial, device_path, local_path)
with Image.open(local_path) as image:
image.load()
return image
finally:
self.shell(serial, f"rm {device_path}")

def dump_layout(self, serial: str) -> dict:
name = "{}.json".format(int(time.time() * 1000))
remote_path = f"/data/local/tmp/layout-{name}.json"
temp_path = os.path.join(self.tmpdir.name, f"layout-{name}.json")
output = self.shell(serial, f"uitest dumpLayout -p {remote_path}")
self.pull(serial, remote_path, temp_path)
# mock
# temp_path = Path(__file__).parent / 'testdata/layout.json'
try:
with open(temp_path, "rb") as f:
json_content = json.load(f)
return json_content
except json.JSONDecodeError:
raise HDCError(f"failed to dump layout: {output}")
finally:
self.shell(serial, f"rm {remote_path}")


class HarmonyDriver(BaseDriver):
def __init__(self, hdc: HDC, serial: str):
super().__init__(serial)
self.hdc = hdc

def screenshot(self, id: int = 0) -> Image.Image:
return self.hdc.screenshot(self.serial)

def window_size(self) -> WindowSize:
result = self.hdc.shell(self.serial, "hidumper -s 10 -a screen")
pattern = r"activeMode:\s*(\d+x\d+)"
match = re.search(pattern, result)
if match:
resolution = match.group(1).split("x")
return WindowSize(width=int(resolution[0]), height=int(resolution[1]))
else:
image = self.screenshot()
return WindowSize(width=image.width, height=image.height)

def dump_hierarchy(self) -> Tuple[str, Node]:
"""returns xml string and hierarchy object"""
layout = self.hdc.dump_layout(self.serial)
return json.dumps(layout), parse_json_element(layout)

def tap(self, x: int, y: int):
self.hdc.shell(self.serial, f"uinput -T -c {x} {y}")

def app_current(self) -> Optional[CurrentAppResponse]:
echo = self.hdc.shell(self.serial, "hidumper -s WindowManagerService -a '-a'")
focus_window = re.search(r"Focus window: (\d+)", echo)
if focus_window:
focus_window = focus_window.group(1)
mission_echo = self.hdc.shell(self.serial, "aa dump -a")
pkg_names = re.findall(r"Mission ID #(\d+)\s+mission name #\[(.*?)\]", mission_echo)
if focus_window and pkg_names:
for mission in pkg_names:
mission_id = mission[0]
if focus_window == mission_id:
mission_name = mission[1]
pkg_name = mission_name.split(":")[0].replace("#", "")
ability_name = mission_name.split(":")[-1]
pid = self.hdc.shell(self.serial, f"pidof {pkg_name}").strip()
return CurrentAppResponse(package=pkg_name, activity=ability_name, pid=int(pid))
else:
return None

def shell(self, command: str) -> ShellResponse:
result = self.hdc.shell(self.serial, command)
return ShellResponse(output=result)

def home(self):
self.hdc.shell(self.serial, "uinput -K -d 1 -u 1")

def back(self):
self.hdc.shell(self.serial, "uinput -K -d 2 -u 2")

def volume_up(self):
self.hdc.shell(self.serial, "uinput -K -d 16 -u 16")

def volume_down(self):
self.hdc.shell(self.serial, "uinput -K -d 17 -u 17")

def volume_mute(self):
self.hdc.shell(self.serial, "uinput -K -d 22 -u 22")

def app_switch(self):
self.hdc.shell(self.serial, "uinput -K -d 2076 -d 2049 -u 2076 -u 2049")

def app_list(self) -> List[AppInfo]:
results = []
output = self.hdc.shell(self.serial, "bm dump -a")
for i in output.split("\n"):
if "ID" in i:
continue
else:
results.append(AppInfo(packageName=i.strip()))
return results


def parse_json_element(element, indexes: List[int] = [0]) -> Node:
"""
Recursively parse an json element into a dictionary format.
"""
attributes = element.get("attributes", {})
name = attributes.get("type", "")
bounds = attributes.get("bounds", "")
bounds = list(map(int, re.findall(r"\d+", bounds)))
assert len(bounds) == 4
rect = Rect(x=bounds[0], y=bounds[1], width=bounds[2] - bounds[0], height=bounds[3] - bounds[1])
elem = Node(
key="-".join(map(str, indexes)),
name=name,
bounds=None,
rect=rect,
properties={key: attributes[key] for key in attributes},
children=[],
)
# Construct xpath for children
for index, child in enumerate(element.get("children", [])):
child_node = parse_json_element(child, indexes + [index])
if child_node:
elem.children.append(child_node)

return elem
1 change: 1 addition & 0 deletions uiautodev/driver/testdata/layout.json

Large diffs are not rendered by default.

21 changes: 18 additions & 3 deletions uiautodev/provider.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@

from uiautodev.driver.android import AndroidDriver
from uiautodev.driver.base_driver import BaseDriver
from uiautodev.driver.harmony import HarmonyDriver, HDC
from uiautodev.driver.ios import IOSDriver
from uiautodev.driver.mock import MockDriver
from uiautodev.exceptions import UiautoException
@@ -27,7 +28,7 @@ def list_devices(self) -> list[DeviceInfo]:
@abc.abstractmethod
def get_device_driver(self, serial: str) -> BaseDriver:
raise NotImplementedError()

def get_single_device_driver(self) -> BaseDriver:
""" debug use """
devs = self.list_devices()
@@ -66,11 +67,25 @@ def list_devices(self) -> list[DeviceInfo]:
@lru_cache
def get_device_driver(self, serial: str) -> BaseDriver:
return IOSDriver(serial)



class HarmonyProvider(BaseProvider):
def __init__(self):
super().__init__()
self.hdc = HDC()

def list_devices(self) -> list[DeviceInfo]:
devices = self.hdc.list_device()
return [DeviceInfo(serial=d, model=self.hdc.get_model(d), name=self.hdc.get_model(d)) for d in devices]

@lru_cache
def get_device_driver(self, serial: str) -> HarmonyDriver:
return HarmonyDriver(self.hdc, serial)


class MockProvider(BaseProvider):
def list_devices(self) -> list[DeviceInfo]:
return [DeviceInfo(serial="mock-serial", model="mock-model", name="mock-name")]

def get_device_driver(self, serial: str) -> BaseDriver:
return MockDriver(serial)
return MockDriver(serial)
18 changes: 11 additions & 7 deletions uiautodev/utils/common.py
Original file line number Diff line number Diff line change
@@ -5,11 +5,12 @@
import platform
import re
import socket
import subprocess
import sys
import typing
import uuid
from http.client import HTTPConnection, HTTPResponse
from typing import Optional, TypeVar, Union
from typing import List, Optional, TypeVar, Union

from pydantic import BaseModel
from pygments import formatters, highlight, lexers
@@ -61,10 +62,11 @@ def print_json(buf, colored=None, default=default_json_encoder):
print(colorful_json)
else:
print(formatted_json)


_T = TypeVar("_T")


def convert_to_type(value: str, _type: _T) -> _T:
""" usage example:
convert_to_type("123", int)
@@ -78,9 +80,9 @@ def convert_to_type(value: str, _type: _T) -> _T:
if _type == re.Pattern:
return re.compile(value)
raise NotImplementedError(f"convert {value} to {_type}")


def convert_params_to_model(params: list[str], model: BaseModel) -> BaseModel:

def convert_params_to_model(params: List[str], model: BaseModel) -> BaseModel:
""" used in cli.py """
assert len(params) > 0
if len(params) == 1:
@@ -114,7 +116,7 @@ class SocketHTTPConnection(HTTPConnection):
def __init__(self, conn: socket.socket, timeout: float):
super().__init__("localhost", timeout=timeout)
self.__conn = conn

def connect(self):
self.sock = self.__conn

@@ -131,7 +133,8 @@ def connect(self):
self.sock.settimeout(self.timeout)


def fetch_through_socket(sock: socket.socket, path: str, method: str = "GET", json: Optional[dict] = None, timeout: float = 60) -> bytearray:
def fetch_through_socket(sock: socket.socket, path: str, method: str = "GET", json: Optional[dict] = None,
timeout: float = 60) -> bytearray:
""" usage example:
with socket.create_connection((host, port)) as s:
request_through_socket(s, "GET", "/")
@@ -163,4 +166,5 @@ def node_travel(node: Node, dfs: bool = True):
for child in node.children:
yield from node_travel(child, dfs)
if dfs:
yield node
yield node

2 changes: 1 addition & 1 deletion uiautodev/utils/envutils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
import os


def is_enabled(name: str) -> bool: