Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pydoll/browser/chromium/chrome.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def _get_default_binary_location():
'Linux': [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium',
'/usr/bin/chromium-browser',
],
'Darwin': [
'/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
Expand Down
208 changes: 147 additions & 61 deletions pydoll/browser/tab.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import base64 as _b64
import logging
import shutil
import warnings
from contextlib import asynccontextmanager
from functools import partial
from pathlib import Path
Expand Down Expand Up @@ -39,7 +40,6 @@
IFrameNotFound,
InvalidFileExtension,
InvalidIFrame,
InvalidScriptWithElement,
InvalidTabInitialization,
MissingScreenshotPath,
NetworkEventsNotEnabled,
Expand Down Expand Up @@ -67,12 +67,16 @@
from pydoll.protocol.page.events import FileChooserOpenedEvent, PageEvent
from pydoll.protocol.page.methods import CaptureScreenshotResponse, PrintToPDFResponse
from pydoll.protocol.page.types import ScreenshotFormat
from pydoll.protocol.runtime.methods import CallFunctionOnResponse, EvaluateResponse
from pydoll.protocol.runtime.methods import (
CallFunctionOnResponse,
EvaluateResponse,
SerializationOptions,
)
from pydoll.protocol.runtime.types import CallArgument
from pydoll.protocol.storage.methods import GetCookiesResponse
from pydoll.utils import (
decode_base64_to_bytes,
has_return_outside_function,
is_script_already_function,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -711,35 +715,158 @@ async def handle_dialog(self, accept: bool, prompt_text: Optional[str] = None):
)

@overload
async def execute_script(self, script: str) -> EvaluateResponse: ...
async def execute_script(
self,
script: str,
*,
object_group: Optional[str] = None,
include_command_line_api: Optional[bool] = None,
silent: Optional[bool] = None,
context_id: Optional[int] = None,
return_by_value: Optional[bool] = None,
generate_preview: Optional[bool] = None,
user_gesture: Optional[bool] = None,
await_promise: Optional[bool] = None,
throw_on_side_effect: Optional[bool] = None,
timeout: Optional[float] = None,
disable_breaks: Optional[bool] = None,
repl_mode: Optional[bool] = None,
allow_unsafe_eval_blocked_by_csp: Optional[bool] = None,
unique_context_id: Optional[str] = None,
serialization_options: Optional[SerializationOptions] = None,
) -> EvaluateResponse: ...

@overload
async def execute_script(self, script: str, element: WebElement) -> CallFunctionOnResponse: ...
async def execute_script(
self,
script: str,
element: WebElement,
*,
arguments: Optional[list[CallArgument]] = None,
silent: Optional[bool] = None,
return_by_value: Optional[bool] = None,
generate_preview: Optional[bool] = None,
user_gesture: Optional[bool] = None,
await_promise: Optional[bool] = None,
execution_context_id: Optional[int] = None,
object_group: Optional[str] = None,
throw_on_side_effect: Optional[bool] = None,
unique_context_id: Optional[str] = None,
serialization_options: Optional[SerializationOptions] = None,
) -> CallFunctionOnResponse: ...

async def execute_script(
self, script: str, element: Optional[WebElement] = None
self,
script: str,
element: Optional[WebElement] = None,
*,
arguments: Optional[list[CallArgument]] = None,
object_group: Optional[str] = None,
include_command_line_api: Optional[bool] = None,
silent: Optional[bool] = None,
context_id: Optional[int] = None,
return_by_value: Optional[bool] = None,
generate_preview: Optional[bool] = None,
user_gesture: Optional[bool] = None,
await_promise: Optional[bool] = None,
execution_context_id: Optional[int] = None,
throw_on_side_effect: Optional[bool] = None,
timeout: Optional[float] = None,
disable_breaks: Optional[bool] = None,
repl_mode: Optional[bool] = None,
allow_unsafe_eval_blocked_by_csp: Optional[bool] = None,
unique_context_id: Optional[str] = None,
serialization_options: Optional[SerializationOptions] = None,
) -> Union[EvaluateResponse, CallFunctionOnResponse]:
"""
Execute JavaScript in page context.

Args:
script: JavaScript code to execute.
element: Element context (use 'argument' in script to reference).
script (str): JavaScript code to execute.
element (Optional[WebElement]): Optional WebElement to execute script on.
arguments (Optional[list[CallArgument]]): Arguments to pass to the function.
object_group (Optional[str]): Symbolic group name for the result (Runtime.evaluate).
include_command_line_api (Optional[bool]): Whether to include command line API
(Runtime.evaluate).
silent (Optional[bool]): Whether to silence exceptions (Runtime.evaluate).
context_id (Optional[int]): ID of the execution context to evaluate in
(Runtime.evaluate).
return_by_value (Optional[bool]): Whether to return the result by value instead of
reference (Runtime.evaluate).
generate_preview (Optional[bool]): Whether to generate a preview for the result
(Runtime.evaluate).
user_gesture (Optional[bool]): Whether to treat evaluation as initiated by user
gesture (Runtime.evaluate).
await_promise (Optional[bool]): Whether to await promise result (Runtime.evaluate).
execution_context_id (Optional[int]): ID of the execution context to call the
function in.
throw_on_side_effect (Optional[bool]): Whether to throw if side effect cannot be
ruled out (Runtime.evaluate).
timeout (Optional[float]): Timeout in milliseconds (Runtime.evaluate).
disable_breaks (Optional[bool]): Whether to disable breakpoints during evaluation
(Runtime.evaluate).
repl_mode (Optional[bool]): Whether to execute in REPL mode (Runtime.evaluate).
allow_unsafe_eval_blocked_by_csp (Optional[bool]): Allow unsafe evaluation
(Runtime.evaluate).
unique_context_id (Optional[str]): Unique context ID for evaluation
(Runtime.evaluate).
serialization_options (Optional[SerializationOptions]): Serialization options for
the result (Runtime.evaluate).

Examples:
await page.execute_script('argument.click()', element)
await page.execute_script('argument.value = "Hello"', element)
Returns:
Union[EvaluateResponse, CallFunctionOnResponse]: The result of the script execution.

Raises:
InvalidScriptWithElement: If script contains 'argument' but no element is provided.
Examples:
await page.execute_script('console.log("Hello World")')
await page.execute_script('return document.title')
"""
if 'argument' in script and element is None:
raise InvalidScriptWithElement('Script contains "argument" but no element was provided')

logger.debug(f'Executing script: with_element={bool(element)}, length={len(script)}')
if element:
return await self._execute_script_with_element(script, element)
return await self._execute_script_without_element(script)
if element is not None:
warnings.warn(
'Passing a WebElement to Tab.execute_script() is deprecated. '
'Use WebElement.execute_script() instead.',
DeprecationWarning,
stacklevel=2,
)

return await element.execute_script(
script,
arguments=arguments,
silent=silent,
return_by_value=return_by_value,
generate_preview=generate_preview,
user_gesture=user_gesture,
await_promise=await_promise,
execution_context_id=execution_context_id,
object_group=object_group,
throw_on_side_effect=throw_on_side_effect,
unique_context_id=unique_context_id,
serialization_options=serialization_options,
)

if has_return_outside_function(script):
script = f'(function(){{ {script} }})()'

command = RuntimeCommands.evaluate(
expression=script,
object_group=object_group,
include_command_line_api=include_command_line_api,
silent=silent,
context_id=context_id,
return_by_value=return_by_value,
generate_preview=generate_preview,
user_gesture=user_gesture,
await_promise=await_promise,
throw_on_side_effect=throw_on_side_effect,
timeout=timeout,
disable_breaks=disable_breaks,
repl_mode=repl_mode,
allow_unsafe_eval_blocked_by_csp=allow_unsafe_eval_blocked_by_csp,
unique_context_id=unique_context_id,
serialization_options=serialization_options,
)
logger.debug(f'Executing script without element: length={len(script)}')
return await self._execute_command(command)

# TODO: think about how to remove these duplications with the base class
async def continue_request(
Expand Down Expand Up @@ -1115,47 +1242,6 @@ def _get_connection_handler(self) -> ConnectionHandler:
)
return ConnectionHandler(self._connection_port, self._target_id)

async def _execute_script_with_element(self, script: str, element: WebElement):
"""
Execute script with element context.

Args:
script: JavaScript code to execute.
element: Element context (use 'argument' in script to reference).

Returns:
The result of the script execution.
"""
if 'argument' not in script:
raise InvalidScriptWithElement('Script does not contain "argument"')

script = script.replace('argument', 'this')

if not is_script_already_function(script):
script = f'function(){{ {script} }}'

command = RuntimeCommands.call_function_on(
object_id=element._object_id, function_declaration=script, return_by_value=True
)
return await self._execute_command(command)

async def _execute_script_without_element(self, script: str):
"""
Execute script without element context.

Args:
script: JavaScript code to execute.

Returns:
The result of the script execution.
"""
if has_return_outside_function(script):
script = f'(function(){{ {script} }})()'

command = RuntimeCommands.evaluate(expression=script)
logger.debug(f'Executing script without element: length={len(script)}')
return await self._execute_command(command)

async def _refresh_if_url_not_changed(self, url: str) -> bool:
"""Refresh page if URL hasn't changed."""
current_url = await self.current_url
Expand Down Expand Up @@ -1200,7 +1286,7 @@ async def _bypass_cloudflare(
element = cast(WebElement, element)
if element:
# adjust the external div size to shadow root width (usually 300px)
await self.execute_script('argument.style="width: 300px"', element)
await element.execute_script('this.style="width: 300px"')
await asyncio.sleep(time_before_click)
await element.click()
except Exception as exc:
Expand Down
89 changes: 80 additions & 9 deletions pydoll/elements/web_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@
)
from pydoll.protocol.page.methods import CaptureScreenshotResponse
from pydoll.protocol.page.types import ScreenshotFormat, Viewport
from pydoll.protocol.runtime.methods import GetPropertiesResponse
from pydoll.protocol.runtime.methods import (
CallFunctionOnResponse,
GetPropertiesResponse,
SerializationOptions,
)
from pydoll.protocol.runtime.types import CallArgument
from pydoll.utils import (
decode_base64_to_bytes,
extract_text_from_html,
is_script_already_function,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -520,22 +526,87 @@ async def is_interactable(self):
result = await self.execute_script(Scripts.ELEMENT_INTERACTIVE, return_by_value=True)
return result['result']['result']['value']

async def execute_script(self, script: str, return_by_value: bool = False):
async def execute_script(
self,
script: str,
*,
arguments: Optional[list[CallArgument]] = None,
silent: Optional[bool] = None,
return_by_value: Optional[bool] = None,
generate_preview: Optional[bool] = None,
user_gesture: Optional[bool] = None,
await_promise: Optional[bool] = None,
execution_context_id: Optional[int] = None,
object_group: Optional[str] = None,
throw_on_side_effect: Optional[bool] = None,
unique_context_id: Optional[str] = None,
serialization_options: Optional[SerializationOptions] = None,
) -> CallFunctionOnResponse:
"""
Execute JavaScript in element context.

Element is available as 'this' within the script.
Args:
script (str): JavaScript code to execute. Use 'this' to reference this element.
arguments (Optional[list[CallArgument]]): Arguments to pass to the function
(Runtime.callFunctionOn).
silent (Optional[bool]): Whether to silence exceptions (Runtime.callFunctionOn).
return_by_value (Optional[bool]): Whether to return the result by value instead of
reference (Runtime.callFunctionOn).
generate_preview (Optional[bool]): Whether to generate a preview for the result
(Runtime.callFunctionOn).
user_gesture (Optional[bool]): Whether to treat the call as initiated by user
gesture (Runtime.callFunctionOn).
await_promise (Optional[bool]): Whether to await promise result
(Runtime.callFunctionOn).
execution_context_id (Optional[int]): ID of the execution context to call the
function in (Runtime.callFunctionOn).
object_group (Optional[str]): Symbolic group name for the result
(Runtime.callFunctionOn).
throw_on_side_effect (Optional[bool]): Whether to throw if side effect cannot be
ruled out (Runtime.callFunctionOn).
unique_context_id (Optional[str]): Unique context ID for the function call
(Runtime.callFunctionOn).
serialization_options (Optional[SerializationOptions]): Serialization options for
the result (Runtime.callFunctionOn).

Returns:
CallFunctionOnResponse: The result of the script execution.

Examples:
# Click the element
await element.execute_script('this.click()')

# Modify element style
await element.execute_script('this.style.border = "2px solid red"')

# Get element text
result = await element.execute_script('return this.textContent', return_by_value=True)

# Set element content
await element.execute_script('this.textContent = "Hello World"')
"""
if not is_script_already_function(script):
script = f'function(){{ {script} }}'

logger.debug(
f'Executing script on element: return_by_value={return_by_value}, length={len(script)}'
)
return await self._execute_command(
RuntimeCommands.call_function_on(
object_id=self._object_id,
function_declaration=script,
return_by_value=return_by_value,
)
command = RuntimeCommands.call_function_on(
function_declaration=script,
object_id=self._object_id,
arguments=arguments,
silent=silent,
return_by_value=return_by_value,
generate_preview=generate_preview,
user_gesture=user_gesture,
await_promise=await_promise,
execution_context_id=execution_context_id,
object_group=object_group,
throw_on_side_effect=throw_on_side_effect,
unique_context_id=unique_context_id,
serialization_options=serialization_options,
)
return await self._execute_command(command)

async def _get_family_elements(
self, script: str, max_depth: int = 1, tag_filter: list[str] = []
Expand Down
Loading