|
1 | 1 | """GitHub specific code"""
|
2 | 2 |
|
3 | 3 | import json
|
4 |
| -from datetime import datetime, timezone |
5 |
| -from typing import Final, TypedDict |
| 4 | +from datetime import datetime |
| 5 | +from typing import Final, Literal, TypedDict |
6 | 6 |
|
7 | 7 | import requests
|
| 8 | +from pydantic import BaseModel, Field, TypeAdapter, ValidationError |
| 9 | + |
| 10 | +PermARW = None | Literal["admin", "read", "write"] |
| 11 | +PermRW = None | Literal["read", "write"] |
| 12 | +PermR = None | Literal["read"] |
| 13 | +PermW = None | Literal["write"] |
8 | 14 |
|
9 | 15 |
|
10 | 16 | class TokenResponse(TypedDict, total=False):
|
@@ -32,6 +38,89 @@ class NotInstalledError(Exception):
|
32 | 38 | """The GitHub App isn't installed in the specified account"""
|
33 | 39 |
|
34 | 40 |
|
| 41 | +class GitHubErrors(BaseModel): |
| 42 | + """ |
| 43 | + https://docs.github.com/en/rest/overview/resources-in-the-rest-api?apiVersion=2022-11-28 |
| 44 | + """ |
| 45 | + |
| 46 | + message: str |
| 47 | + |
| 48 | + |
| 49 | +class AccountInfo(BaseModel): |
| 50 | + """Part of Installation""" |
| 51 | + |
| 52 | + login: str = Field( |
| 53 | + max_length=39, pattern=r"^[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?$" |
| 54 | + ) |
| 55 | + |
| 56 | + |
| 57 | +class Installation(BaseModel): |
| 58 | + """ |
| 59 | + https://docs.github.com/en/rest/apps/apps?apiVersion=2022-11-28#list-installations-for-the-authenticated-app |
| 60 | + """ |
| 61 | + |
| 62 | + id: int |
| 63 | + account: AccountInfo |
| 64 | + |
| 65 | + |
| 66 | +class TokenPermissions(BaseModel): |
| 67 | + """Part of AccessToken""" |
| 68 | + |
| 69 | + # Repository permissions |
| 70 | + actions: PermRW = None |
| 71 | + administration: PermRW = None |
| 72 | + checks: PermRW = None |
| 73 | + contents: PermRW = None |
| 74 | + deployments: PermRW = None |
| 75 | + environments: PermRW = None |
| 76 | + issues: PermRW = None |
| 77 | + metadata: PermRW = None |
| 78 | + packages: PermRW = None |
| 79 | + pages: PermRW = None |
| 80 | + pull_requests: PermRW = None |
| 81 | + repository_hooks: PermRW = None |
| 82 | + repository_projects: PermARW = None |
| 83 | + secret_scanning_alerts: PermRW = None |
| 84 | + secrets: PermRW = None |
| 85 | + security_events: PermRW = None |
| 86 | + single_file: PermRW = None |
| 87 | + statuses: PermRW = None |
| 88 | + vulnerability_alerts: PermRW = None |
| 89 | + workflows: PermW = None |
| 90 | + # Organizational permissions |
| 91 | + members: PermRW = None |
| 92 | + organization_administration: PermRW = None |
| 93 | + organization_custom_roles: PermRW = None |
| 94 | + organization_announcement_banners: PermRW = None |
| 95 | + organization_hooks: PermRW = None |
| 96 | + organization_personal_access_tokens: PermRW = None |
| 97 | + organization_personal_access_token_requests: PermRW = None |
| 98 | + organization_plan: PermR = None |
| 99 | + organization_projects: PermARW = None |
| 100 | + organization_packages: PermRW = None |
| 101 | + organization_secrets: PermRW = None |
| 102 | + organization_self_hosted_runners: PermRW = None |
| 103 | + organization_user_blocking: PermRW = None |
| 104 | + team_discussions: PermRW = None |
| 105 | + |
| 106 | + |
| 107 | +class Repository(BaseModel): |
| 108 | + """Part of AccessToken""" |
| 109 | + |
| 110 | + name: str = Field(max_length=100, pattern=r"^[a-zA-Z0-9_\-\.]+$") |
| 111 | + |
| 112 | + |
| 113 | +class AccessToken(BaseModel): |
| 114 | + """ |
| 115 | + https://docs.github.com/en/rest/apps/apps?apiVersion=2022-11-28#create-an-installation-access-token-for-an-app |
| 116 | + """ |
| 117 | + |
| 118 | + token: str |
| 119 | + expires_at: datetime |
| 120 | + permissions: TokenPermissions |
| 121 | + repositories: None | list[Repository] = None |
| 122 | + |
| 123 | + |
35 | 124 | class GitHubApp:
|
36 | 125 | """GitHub App Access Tokens, etc"""
|
37 | 126 |
|
@@ -68,16 +157,23 @@ def __find_installation(self) -> str:
|
68 | 157 | )
|
69 | 158 | response.raise_for_status()
|
70 | 159 | except requests.exceptions.HTTPError as http_error:
|
71 |
| - error_message: str |
72 | 160 | try:
|
73 |
| - error_message = http_error.response.json()["message"] |
| 161 | + errors_bm = GitHubErrors(**http_error.response.json()) |
| 162 | + error_message = errors_bm.message |
74 | 163 | except Exception: # pylint: disable=broad-exception-caught
|
75 | 164 | error_message = "<Failed to parse GitHub API error response>"
|
76 | 165 | raise InstallationLookupError(error_message) from http_error
|
77 | 166 |
|
78 |
| - for installation in response.json(): |
79 |
| - if installation["account"]["login"].lower() == self.account.lower(): |
80 |
| - return str(installation["id"]) |
| 167 | + try: |
| 168 | + ita = TypeAdapter(list[Installation]) |
| 169 | + installations = ita.validate_python(response.json()) |
| 170 | + except ValidationError as validation_error: |
| 171 | + error_message = "<Failed to parse Installations API response>" |
| 172 | + raise InstallationLookupError(error_message) from validation_error |
| 173 | + |
| 174 | + for installation in installations: |
| 175 | + if installation.account.login.lower() == self.account.lower(): |
| 176 | + return str(installation.id) |
81 | 177 |
|
82 | 178 | if "next" in response.links.keys():
|
83 | 179 | pagination_params["page"] += 1
|
@@ -125,26 +221,28 @@ def issue_token(
|
125 | 221 | )
|
126 | 222 | response.raise_for_status()
|
127 | 223 | except requests.exceptions.HTTPError as http_error:
|
128 |
| - error_message: str |
129 | 224 | try:
|
130 |
| - error_message = http_error.response.json()["message"] |
| 225 | + errors_bm = GitHubErrors(**http_error.response.json()) |
| 226 | + error_message = errors_bm.message |
131 | 227 | except Exception: # pylint: disable=broad-exception-caught
|
132 | 228 | error_message = "<Failed to parse GitHub API error response>"
|
133 | 229 | raise TokenIssueError(error_message) from http_error
|
134 | 230 |
|
135 |
| - expiry = datetime.strptime( |
136 |
| - response.json()["expires_at"], "%Y-%m-%dT%H:%M:%SZ" |
137 |
| - ).replace(tzinfo=timezone.utc) |
| 231 | + try: |
| 232 | + access_token_bm = AccessToken(**response.json()) |
| 233 | + except ValidationError as validation_error: |
| 234 | + error_message = "<Failed to parse Token Issue API response>" |
| 235 | + raise TokenIssueError(error_message) from validation_error |
138 | 236 |
|
139 | 237 | access_token: TokenResponse = {
|
140 |
| - "access_token": response.json()["token"], |
141 |
| - "expires_at": expiry, |
142 |
| - "permissions": response.json()["permissions"], |
| 238 | + "access_token": access_token_bm.token, |
| 239 | + "expires_at": access_token_bm.expires_at, |
| 240 | + "permissions": access_token_bm.permissions.model_dump(exclude_unset=True), |
143 | 241 | }
|
144 | 242 |
|
145 |
| - if "repositories" in response.json().keys(): |
| 243 | + if access_token_bm.repositories is not None: |
146 | 244 | access_token["repositories"] = sorted(
|
147 |
| - [repo["name"] for repo in response.json()["repositories"]] |
| 245 | + [repo.name for repo in access_token_bm.repositories] |
148 | 246 | )
|
149 | 247 |
|
150 | 248 | return access_token
|
0 commit comments