From 3ec9b03cadb469f2d3b8fe9c98876867812ea9b4 Mon Sep 17 00:00:00 2001 From: Daniel Toyama Date: Fri, 18 Aug 2023 14:07:10 -0700 Subject: [PATCH] Replace more cascading `if` with `match`. Another CL to cleanup a few conditionals with a cleaner `match` block. PiperOrigin-RevId: 558242072 --- android_env/components/adb_call_parser.py | 259 ++++++++++++---------- android_env/components/coordinator.py | 32 +-- android_env/components/utils.py | 25 ++- 3 files changed, 171 insertions(+), 145 deletions(-) diff --git a/android_env/components/adb_call_parser.py b/android_env/components/adb_call_parser.py index 5131140..da94298 100644 --- a/android_env/components/adb_call_parser.py +++ b/android_env/components/adb_call_parser.py @@ -261,24 +261,25 @@ def _install_apk( location_type = install_apk.WhichOneof('location') logging.info('location_type: %s', location_type) - if location_type == 'filesystem': - fpath = install_apk.filesystem.path - if not os.path.exists(fpath): - response.status = adb_pb2.AdbResponse.Status.INTERNAL_ERROR - response.error_message = f'Could not find local_apk_path: {fpath}' + match location_type: + case 'filesystem': + fpath = install_apk.filesystem.path + if not os.path.exists(fpath): + response.status = adb_pb2.AdbResponse.Status.INTERNAL_ERROR + response.error_message = f'Could not find local_apk_path: {fpath}' + return response + case 'blob': + with tempfile.NamedTemporaryFile( + dir=self._tmp_dir, suffix='.apk', delete=False + ) as f: + fpath = f.name + f.write(install_apk.blob.contents) + case _: + response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION + response.error_message = ( + f'Unsupported `install_apk.location` type: {location_type}' + ) return response - elif location_type == 'blob': - with tempfile.NamedTemporaryFile( - dir=self._tmp_dir, suffix='.apk', delete=False - ) as f: - fpath = f.name - f.write(install_apk.blob.contents) - else: - response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION - response.error_message = ( - f'Unsupported `install_apk.location` type: {location_type}' - ) - return response response, _ = self._execute_command( ['install', '-r', '-t', '-g', fpath], timeout=timeout @@ -647,64 +648,78 @@ def _handle_settings( namespace = adb_pb2.AdbRequest.SettingsRequest.Namespace.Name( request.name_space).lower() - verb = request.WhichOneof('verb') - if verb == 'get': - get = request.get - if not get.key: - response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION - response.error_message = ( - f'Empty SettingsRequest.get.key. Got: {request}.') - return response - response, command_output = self._execute_command( - ['shell', 'settings', 'get', namespace, get.key], timeout=timeout) - response.settings.output = command_output - elif verb == 'put': - put = request.put - if not put.key or not put.value: - response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION - response.error_message = ( - f'Empty SettingsRequest.put key or value. Got: {request}.') - return response - response, command_output = self._execute_command( - ['shell', 'settings', 'put', namespace, put.key, put.value], - timeout=timeout) - response.settings.output = command_output - elif verb == 'delete_key': - delete = request.delete_key - if not delete.key: + match request.WhichOneof('verb'): + case 'get': + get = request.get + if not get.key: + response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION + response.error_message = ( + f'Empty SettingsRequest.get.key. Got: {request}.' + ) + return response + response, command_output = self._execute_command( + ['shell', 'settings', 'get', namespace, get.key], timeout=timeout + ) + response.settings.output = command_output + case 'put': + put = request.put + if not put.key or not put.value: + response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION + response.error_message = ( + f'Empty SettingsRequest.put key or value. Got: {request}.' + ) + return response + response, command_output = self._execute_command( + ['shell', 'settings', 'put', namespace, put.key, put.value], + timeout=timeout, + ) + response.settings.output = command_output + case 'delete_key': + delete = request.delete_key + if not delete.key: + response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION + response.error_message = ( + f'Empty SettingsRequest.delete_key.key. Got: {request}.' + ) + return response + response, command_output = self._execute_command( + ['shell', 'settings', 'delete', namespace, delete.key], + timeout=timeout, + ) + response.settings.output = command_output + case 'reset': + reset = request.reset + # At least one of `package_name` or `mode` should be given. + if ( + not reset.package_name + and reset.mode + == adb_pb2.AdbRequest.SettingsRequest.Reset.Mode.UNKNOWN + ): + response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION + response.error_message = ( + 'At least one of SettingsRequest.reset package_name or mode' + f' should be given. Got: {request}.' + ) + return response + + mode = adb_pb2.AdbRequest.SettingsRequest.Reset.Mode.Name( + reset.mode + ).lower() + arg = reset.package_name or mode + response, command_output = self._execute_command( + ['shell', 'settings', 'reset', namespace, arg], timeout=timeout + ) + response.settings.output = command_output + case 'list': + response, command_output = self._execute_command( + ['shell', 'settings', 'list', namespace], timeout=timeout + ) + response.settings.output = command_output + case _: response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION response.error_message = ( - f'Empty SettingsRequest.delete_key.key. Got: {request}.') - return response - response, command_output = self._execute_command( - ['shell', 'settings', 'delete', namespace, delete.key], - timeout=timeout) - response.settings.output = command_output - elif verb == 'reset': - reset = request.reset - # At least one of `package_name` or `mode` should be given. - if (not reset.package_name and - reset.mode == adb_pb2.AdbRequest.SettingsRequest.Reset.Mode.UNKNOWN): - response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION - response.error_message = ( - 'At least one of SettingsRequest.reset package_name or mode should ' - f'be given. Got: {request}.') - return response - - mode = adb_pb2.AdbRequest.SettingsRequest.Reset.Mode.Name( - reset.mode).lower() - arg = reset.package_name or mode - response, command_output = self._execute_command( - ['shell', 'settings', 'reset', namespace, arg], timeout=timeout) - response.settings.output = command_output - elif verb == 'list': - response, command_output = self._execute_command( - ['shell', 'settings', 'list', namespace], timeout=timeout) - response.settings.output = command_output - else: - response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION - response.error_message = ( - f'Unknown SettingsRequest.verb. Got: {request}.') + f'Unknown SettingsRequest.verb. Got: {request}.' + ) return response @@ -744,54 +759,58 @@ def _handle_package_manager( request = request.package_manager response = adb_pb2.AdbResponse() - verb = request.WhichOneof('verb') - if verb == 'list': - what = request.list.WhichOneof('what') - response, output = self._execute_command(['shell', 'pm', 'list', what], - timeout=timeout) - - if output: - items = output.decode('utf-8').split() - # Remove prefix for each item. - prefix = { - 'features': 'feature:', - 'libraries': 'library:', - 'packages': 'package:', - }[what] - items = [x[len(prefix):] for x in items if x.startswith(prefix)] - response.package_manager.list.items.extend(items) - response.package_manager.output = output - elif verb == 'clear': - package_name = request.clear.package_name - if not package_name: - response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION - response.error_message = ( - f'Empty PackageManagerRequest.clear.package_name. Got: {request}.') - return response - - args = ['shell', 'pm', 'clear', package_name] - if request.clear.user_id: - args.insert(3, '-f') - args.insert(4, request.clear.user_id) - response, response.package_manager.output = self._execute_command( - args, timeout=timeout) - elif verb == 'grant': - grant = request.grant - if not grant.package_name: - response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION - response.error_message = ('`grant.package_name` cannot be empty.') - return response - - if not grant.permissions: - response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION - response.error_message = ('`grant.permissions` cannot be empty.') - return response - - for permission in grant.permissions: - logging.info('Granting permission: %r', permission) + match request.WhichOneof('verb'): + case 'list': + what = request.list.WhichOneof('what') + response, output = self._execute_command( + ['shell', 'pm', 'list', what], timeout=timeout + ) + + if output: + items = output.decode('utf-8').split() + # Remove prefix for each item. + prefix = { + 'features': 'feature:', + 'libraries': 'library:', + 'packages': 'package:', + }[what] + items = [x[len(prefix) :] for x in items if x.startswith(prefix)] + response.package_manager.list.items.extend(items) + response.package_manager.output = output + case 'clear': + package_name = request.clear.package_name + if not package_name: + response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION + response.error_message = ( + f'Empty PackageManagerRequest.clear.package_name. Got: {request}.' + ) + return response + + args = ['shell', 'pm', 'clear', package_name] + if request.clear.user_id: + args.insert(3, '-f') + args.insert(4, request.clear.user_id) response, response.package_manager.output = self._execute_command( - ['shell', 'pm', 'grant', grant.package_name, permission], - timeout=timeout) + args, timeout=timeout + ) + case 'grant': + grant = request.grant + if not grant.package_name: + response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION + response.error_message = '`grant.package_name` cannot be empty.' + return response + + if not grant.permissions: + response.status = adb_pb2.AdbResponse.Status.FAILED_PRECONDITION + response.error_message = '`grant.permissions` cannot be empty.' + return response + + for permission in grant.permissions: + logging.info('Granting permission: %r', permission) + response, response.package_manager.output = self._execute_command( + ['shell', 'pm', 'grant', grant.package_name, permission], + timeout=timeout, + ) return response diff --git a/android_env/components/coordinator.py b/android_env/components/coordinator.py index f421448..c309239 100644 --- a/android_env/components/coordinator.py +++ b/android_env/components/coordinator.py @@ -387,20 +387,24 @@ def _send_action_to_simulator(self, action: dict[str, np.ndarray]) -> None: """ try: - # If the action is a TOUCH or LIFT, send a touch event to the simulator. - if (action['action_type'] == action_type_lib.ActionType.TOUCH or - action['action_type'] == action_type_lib.ActionType.LIFT): - prepared_action = self._prepare_touch_action(action) - self._simulator.send_touch(prepared_action) - # If the action is a key event, send a key event to the simulator. - elif action['action_type'] == action_type_lib.ActionType.KEYDOWN: - self._simulator.send_key( - action['keycode'].item(0), event_type='keydown' - ) - elif action['action_type'] == action_type_lib.ActionType.KEYUP: - self._simulator.send_key(action['keycode'].item(0), event_type='keyup') - elif action['action_type'] == action_type_lib.ActionType.KEYPRESS: - self._simulator.send_key(action['keycode'].item(0), event_type='keypress') + match action['action_type']: + # If the action is a TOUCH or LIFT, send a touch event to the simulator. + case action_type_lib.ActionType.TOUCH | action_type_lib.ActionType.LIFT: + prepared_action = self._prepare_touch_action(action) + self._simulator.send_touch(prepared_action) + # If the action is a key event, send a key event to the simulator. + case action_type_lib.ActionType.KEYDOWN: + self._simulator.send_key( + action['keycode'].item(0), event_type='keydown' + ) + case action_type_lib.ActionType.KEYUP: + self._simulator.send_key( + action['keycode'].item(0), event_type='keyup' + ) + case action_type_lib.ActionType.KEYPRESS: + self._simulator.send_key( + action['keycode'].item(0), event_type='keypress' + ) except (socket.error, errors.SendActionError): logging.exception('Unable to execute action. Restarting simulator.') self._stats['relaunch_count_execute_action'] += 1 diff --git a/android_env/components/utils.py b/android_env/components/utils.py index ba18b19..8f630c0 100644 --- a/android_env/components/utils.py +++ b/android_env/components/utils.py @@ -39,17 +39,20 @@ def transpose_pixels(frame: np.ndarray) -> np.ndarray: def orient_pixels(frame: np.ndarray, orientation: int) -> np.ndarray: """Rotates screen pixels according to the given orientation.""" - if orientation == 0: # PORTRAIT_90 - return frame - elif orientation == 1: # LANDSCAPE_90 - return np.rot90(frame, k=3, axes=(0, 1)) - elif orientation == 2: # PORTRAIT_180 - return np.rot90(frame, k=2, axes=(0, 1)) - elif orientation == 3: # LANDSCAPE_270 - return np.rot90(frame, k=1, axes=(0, 1)) - else: - raise ValueError( - 'Orientation must be an integer in [0, 3] but is %r' % orientation) + + match orientation: + case 0: # PORTRAIT_90 + return frame + case 1: # LANDSCAPE_90 + return np.rot90(frame, k=3, axes=(0, 1)) + case 2: # PORTRAIT_180 + return np.rot90(frame, k=2, axes=(0, 1)) + case 3: # LANDSCAPE_270 + return np.rot90(frame, k=1, axes=(0, 1)) + case _: + raise ValueError( + 'Orientation must be an integer in [0, 3] but is %r' % orientation + ) def convert_int_to_float(data: np.ndarray,