|
2 | 2 | # License, v. 2.0. If a copy of the MPL was not distributed with this
|
3 | 3 | # file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
4 | 4 |
|
| 5 | +import copy |
5 | 6 | from functools import cache
|
6 |
| -from typing import Any |
| 7 | +from typing import Any, Dict, List, Union |
7 | 8 |
|
8 | 9 | import requests
|
9 | 10 | import taskcluster
|
10 |
| -from taskgraph.util.taskcluster import get_ancestors |
11 |
| -from taskgraph.util.taskcluster import logging |
| 11 | +from taskgraph.util.taskcluster import logging, get_task_url, get_session |
| 12 | +from taskgraph.util.memoize import memoize |
12 | 13 |
|
13 | 14 | FIREFOXCI_ROOT_URL = "https://firefox-ci-tc.services.mozilla.com"
|
14 | 15 | STAGING_ROOT_URL = "https://stage.taskcluster.nonprod.cloudops.mozgcp.net"
|
15 | 16 |
|
| 17 | +def _do_request(url, method=None, **kwargs): |
| 18 | + if method is None: |
| 19 | + method = "post" if kwargs else "get" |
| 20 | + |
| 21 | + session = get_session() |
| 22 | + if method == "get": |
| 23 | + kwargs["stream"] = True |
| 24 | + |
| 25 | + logging.info("making request") |
| 26 | + logging.info(f"url: {url}") |
| 27 | + logging.info(f"kwargs: {kwargs}") |
| 28 | + response = getattr(session, method)(url, **kwargs) |
| 29 | + logging.info(f"response code: {response.status_code}") |
| 30 | + |
| 31 | + if response.status_code >= 400: |
| 32 | + # Consume content before raise_for_status, so that the connection can be |
| 33 | + # reused. |
| 34 | + response.content |
| 35 | + response.raise_for_status() |
| 36 | + return response |
| 37 | + |
| 38 | + |
| 39 | +@memoize |
| 40 | +def get_task_definition(task_id, use_proxy=False): |
| 41 | + url = get_task_url(task_id, use_proxy) |
| 42 | + logging.info(f"fetching url: {url}") |
| 43 | + response = _do_request(url) |
| 44 | + logging.info(f"got url: {url}") |
| 45 | + logging.info(f"response is: {response}") |
| 46 | + return response.json() |
| 47 | + |
| 48 | + |
| 49 | +@memoize |
| 50 | +def _get_deps(task_ids, use_proxy): |
| 51 | + upstream_tasks = {} |
| 52 | + for task_id in task_ids: |
| 53 | + logging.info(f"fetching dep: {task_id}") |
| 54 | + task_def = get_task_definition(task_id, use_proxy) |
| 55 | + logging.info(f"got task def: {task_def}") |
| 56 | + upstream_tasks[task_def["metadata"]["name"]] = task_id |
| 57 | + |
| 58 | + upstream_tasks.update(_get_deps(tuple(task_def["dependencies"]), use_proxy)) |
| 59 | + |
| 60 | + return upstream_tasks |
| 61 | + |
| 62 | + |
| 63 | +def get_ancestors( |
| 64 | + task_ids: Union[List[str], str], use_proxy: bool = False |
| 65 | +) -> Dict[str, str]: |
| 66 | + """Gets the ancestor tasks of the given task_ids as a dictionary of label -> taskid. |
| 67 | +
|
| 68 | + Args: |
| 69 | + task_ids (str or [str]): A single task id or a list of task ids to find the ancestors of. |
| 70 | + use_proxy (bool): See get_root_url. |
| 71 | +
|
| 72 | + Returns: |
| 73 | + dict: A dict whose keys are task labels and values are task ids. |
| 74 | + """ |
| 75 | + upstream_tasks: Dict[str, str] = {} |
| 76 | + logging.info("A") |
| 77 | + |
| 78 | + if isinstance(task_ids, str): |
| 79 | + task_ids = [task_ids] |
| 80 | + |
| 81 | + for task_id in task_ids: |
| 82 | + logging.info(f"getting task_id: {task_id}") |
| 83 | + task_def = get_task_definition(task_id, use_proxy) |
| 84 | + logging.info(f"got task_id: {task_id}") |
| 85 | + |
| 86 | + upstream_tasks.update(_get_deps(tuple(task_def["dependencies"]), use_proxy)) |
| 87 | + |
| 88 | + return copy.deepcopy(upstream_tasks) |
16 | 89 |
|
17 | 90 | @cache
|
18 | 91 | def get_taskcluster_client(service: str):
|
|
0 commit comments