From b065da0d8fd48ef55a8a61177b7d69f4e87992d2 Mon Sep 17 00:00:00 2001 From: tuxuser <462620+tuxuser@users.noreply.github.com> Date: Thu, 30 Nov 2023 00:54:31 +0100 Subject: [PATCH] feat: (wip) OAuth2 device code --- xbox/webapi/authentication/manager.py | 56 +++++++++++++++++++++++++-- xbox/webapi/authentication/models.py | 25 +++++++++--- xbox/webapi/scripts/authenticate.py | 14 ++++++- 3 files changed, 84 insertions(+), 11 deletions(-) diff --git a/xbox/webapi/authentication/manager.py b/xbox/webapi/authentication/manager.py index 1552e32..5fde514 100644 --- a/xbox/webapi/authentication/manager.py +++ b/xbox/webapi/authentication/manager.py @@ -3,12 +3,15 @@ Authenticate with Windows Live Server and Xbox Live. """ +import asyncio import logging -from typing import List, Optional +import time +from typing import Callable, List, Optional import httpx from xbox.webapi.authentication.models import ( + OAuth2DeviceCodeResponse, OAuth2TokenResponse, XAUResponse, XSTSResponse, @@ -101,7 +104,9 @@ async def refresh_oauth_token(self) -> OAuth2TokenResponse: } ) - async def _oauth2_token_request(self, data: dict) -> OAuth2TokenResponse: + async def _oauth2_token_request( + self, data: dict, raise_error: bool = True + ) -> OAuth2TokenResponse: """Execute token requests.""" data["client_id"] = self._client_id if self._client_secret: @@ -109,9 +114,52 @@ async def _oauth2_token_request(self, data: dict) -> OAuth2TokenResponse: resp = await self.session.post( "https://login.live.com/oauth20_token.srf", data=data ) - resp.raise_for_status() + + if raise_error: + resp.raise_for_status() return OAuth2TokenResponse(**resp.json()) + async def device_code_auth( + self, cb: Callable[[OAuth2DeviceCodeResponse], None] + ) -> OAuth2TokenResponse: + # HACK: Do not hardcode client id.. + self._client_id = "000000004C12AE6F" + data = { + "client_id": self._client_id, + "response_type": "device_code", + "scope": "service::user.auth.xboxlive.com::MBI_SSL", + } + resp = await self.session.post( + "https://login.live.com/oauth20_connect.srf", data=data + ) + resp.raise_for_status() + resp_model = OAuth2DeviceCodeResponse(**resp.json()) + + # Pass back device code response to the caller + cb(resp_model) + + polling_start = time.time() + authorization_resp = None + poll_data = { + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + "device_code": resp_model.device_code, + } + + log.info(f"Waiting {resp_model.expires_in} seconds for user authorization") + while time.time() - polling_start < resp_model.expires_in: + resp = await self._oauth2_token_request(poll_data, raise_error=False) + # Check if response body contains token aka. user has authorized + # Then this polling-loop can be exited + if resp.access_token: + authorization_resp = resp + break + await asyncio.sleep(resp_model.interval) + + if not authorization_resp: + raise Exception("Waiting time for user authorization expired") + + return authorization_resp + async def request_user_token( self, relying_party: str = "http://auth.xboxlive.com", @@ -153,7 +201,7 @@ async def request_xsts_token( resp = await self.session.post(url, json=data, headers=headers) if resp.status_code == 401: # if unauthorized - print( + log.error( "Failed to authorize you! Your password or username may be wrong or you are trying to use child account (< 18 years old)" ) raise AuthenticationException() diff --git a/xbox/webapi/authentication/models.py b/xbox/webapi/authentication/models.py index 4557837..570306e 100644 --- a/xbox/webapi/authentication/models.py +++ b/xbox/webapi/authentication/models.py @@ -83,18 +83,31 @@ def authorization_header_value(self) -> str: class OAuth2TokenResponse(BaseModel): - token_type: str - expires_in: int - scope: str - access_token: str + token_type: Optional[str] = None + expires_in: Optional[int] = None + scope: Optional[str] = None + access_token: Optional[str] = None refresh_token: Optional[str] = None - user_id: str - issued: datetime = Field(default_factory=utc_now) + user_id: Optional[str] = None + issued: Optional[datetime] = Field(default_factory=utc_now) + + # Error type + error: Optional[str] = None + error_description: Optional[str] = None + correlation_id: Optional[str] = None def is_valid(self) -> bool: return (self.issued + timedelta(seconds=self.expires_in)) > utc_now() +class OAuth2DeviceCodeResponse(BaseModel): + user_code: str + device_code: str + verification_uri: str + interval: int + expires_in: int + + """XAL related models""" diff --git a/xbox/webapi/scripts/authenticate.py b/xbox/webapi/scripts/authenticate.py index 58f67cd..a863ce7 100644 --- a/xbox/webapi/scripts/authenticate.py +++ b/xbox/webapi/scripts/authenticate.py @@ -12,7 +12,10 @@ import webbrowser from xbox.webapi.authentication.manager import AuthenticationManager -from xbox.webapi.authentication.models import OAuth2TokenResponse +from xbox.webapi.authentication.models import ( + OAuth2DeviceCodeResponse, + OAuth2TokenResponse, +) from xbox.webapi.common.signed_session import SignedSession from xbox.webapi.scripts import CLIENT_ID, CLIENT_SECRET, REDIRECT_URI, TOKENS_FILE @@ -78,6 +81,15 @@ async def do_auth( session, client_id, client_secret, redirect_uri ) + def print_verification(resp: OAuth2DeviceCodeResponse): + print( + f"Navigate to URL: {resp.verification_uri}\nEnter following CODE: {resp.user_code}" + ) + + resp = await auth_mgr.device_code_auth(print_verification) + print(resp) + return + # Refresh tokens if we have them if os.path.exists(token_filepath): with open(token_filepath) as f: