Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pysnc/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
AsyncServiceNowFlow,
AsyncServiceNowJWTAuth,
AsyncServiceNowPasswordGrantFlow,
AsyncServiceNowClientCredentialsFlow,
)
from .client import (
AsyncAttachmentAPI,
Expand All @@ -28,5 +29,6 @@
"AsyncAttachment",
"AsyncServiceNowFlow",
"AsyncServiceNowPasswordGrantFlow",
"AsyncServiceNowClientCredentialsFlow",
"AsyncServiceNowJWTAuth",
]
130 changes: 130 additions & 0 deletions pysnc/asyncio/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,136 @@ async def authenticate(self, instance: str, **kwargs) -> httpx.AsyncClient: # t
return client


class AsyncServiceNowClientCredentialsFlow(AsyncServiceNowFlow):
"""
OAuth2 Client Credentials Grant Flow for async ServiceNow client.

This flow is ideal for machine-to-machine authentication where no user context is needed.
Only requires client_id and client_secret (no username/password).

Example:
>>> flow = AsyncServiceNowClientCredentialsFlow('my_client_id', 'my_client_secret')
>>> client = AsyncServiceNowClient('dev12345', flow)
"""

def __init__(self, client_id: str, client_secret: str):
"""
Client Credentials flow authentication (OAuth 2.0)

:param client_id: The OAuth application client ID
:param client_secret: The OAuth application client secret
"""
self.client_id = client_id
self.__secret = client_secret
self.__token: Optional[str] = None
self.__expires_at: Optional[float] = None

def authorization_url(self, authorization_base_url: str) -> str:
"""Generate the token endpoint URL"""
return f"{authorization_base_url}/oauth_token.do"

async def _get_access_token(self, instance: str) -> str:
"""
Request an access token from ServiceNow using client credentials.

:param instance: The ServiceNow instance URL
:return: Access token string
:raises AuthenticationException: If token request fails
"""
token_url = self.authorization_url(instance)
headers = {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
}
data = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.__secret
}

async with httpx.AsyncClient() as client:
try:
r = await client.post(token_url, headers=headers, data=data, timeout=30.0)
except httpx.RequestError as e:
raise AuthenticationException(f"Failed to connect to token endpoint: {e}")

if r.status_code != 200:
try:
error_data = r.json()
error_msg = error_data.get('error_description', error_data.get('error', r.text))
except Exception:
error_msg = r.text
raise AuthenticationException(
f"Failed to obtain access token: {r.status_code} {r.reason_phrase} - {error_msg}"
)

try:
token_data = r.json()
except Exception:
raise AuthenticationException(f"Invalid JSON response from token endpoint: {r.text}")

if 'access_token' not in token_data:
raise AuthenticationException(f"No access_token in response: {token_data}")

self.__token = token_data['access_token']
# Use expires_in from response, default to 3600 seconds (1 hour) if not provided
expires_in = token_data.get('expires_in', 3600)
# Refresh 60 seconds before actual expiry to avoid edge cases
self.__expires_at = time.time() + expires_in - 60

return self.__token

async def authenticate(self, instance: str, **kwargs) -> httpx.AsyncClient:
"""
Create and return an authenticated httpx.AsyncClient with Bearer token.
The client will automatically refresh the token when it expires.

:param instance: The ServiceNow instance URL
:param kwargs: Additional arguments (proxies, verify, timeout, etc.)
:return: Authenticated httpx.AsyncClient
"""
verify = kwargs.get("verify", True)
proxies = kwargs.get("proxies", None)
timeout = kwargs.get("timeout", 30.0)

# Get initial token
if not self.__token or (self.__expires_at is not None and time.time() > self.__expires_at):
await self._get_access_token(instance)

# Create client with custom auth handler that refreshes tokens
client = httpx.AsyncClient(
base_url=instance,
headers={"Accept": "application/json"},
auth=_AsyncClientCredentialsAuth(self, instance),
verify=verify,
proxy=proxies,
timeout=timeout,
follow_redirects=True,
)

return client


class _AsyncClientCredentialsAuth(httpx.Auth):
"""
Internal auth handler that automatically refreshes client credentials tokens for async client.
"""

def __init__(self, flow: AsyncServiceNowClientCredentialsFlow, instance: str):
self._flow = flow
self._instance = instance

async def async_auth_flow(self, request: httpx.Request):
"""httpx Auth flow that checks and refreshes token before each request"""
# Check if token needs refresh
if not self._flow._AsyncServiceNowClientCredentialsFlow__token or \
(self._flow._AsyncServiceNowClientCredentialsFlow__expires_at is not None and
time.time() > self._flow._AsyncServiceNowClientCredentialsFlow__expires_at):
await self._flow._get_access_token(self._instance)

request.headers['Authorization'] = f"Bearer {self._flow._AsyncServiceNowClientCredentialsFlow__token}"
yield request


class AsyncServiceNowJWTAuth(httpx.Auth):
"""
JWT-based authentication for async client
Expand Down
25 changes: 23 additions & 2 deletions pysnc/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,29 @@ def __init__(self, instance, auth, proxy=None, verify=None, cert=None, auto_retr
self.__session = auth
# best-effort header merge
self.__session.headers.update(headers)
elif isinstance(auth, AsyncServiceNowFlow): # accept either, adapt
raise NotImplementedError("AsyncServiceNowFlow is not supported yet for async client")
elif isinstance(auth, AsyncServiceNowFlow):
# Use the async flow to authenticate and get a client
import asyncio
# We need to run the async authenticate method
# This is a bit awkward but necessary for __init__
loop = None
try:
loop = asyncio.get_running_loop()
except RuntimeError:
pass

if loop and loop.is_running():
# If we're already in an async context, we can't use run_until_complete
# Store the flow and defer authentication
self.__pending_flow = auth
self.__session = None
else:
# Not in an async context, we can authenticate now
self.__session = asyncio.run(auth.authenticate(
self.__instance,
proxies=self.__proxy_url,
verify=verify if verify is not None else True
))
elif cert is not None:
# cert-only client (no auth)
self.__session = httpx.AsyncClient(
Expand Down
124 changes: 124 additions & 0 deletions pysnc/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@

JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'

__all__ = [
'ServiceNowFlow',
'ServiceNowPasswordGrantFlow',
'ServiceNowClientCredentialsFlow',
'ServiceNowJWTAuth',
]


class ServiceNowFlow:
def authenticate(self, instance: str, **kwargs) -> requests.Session:
Expand Down Expand Up @@ -56,6 +63,123 @@ def authenticate(self, instance: str, **kwargs) -> requests.Session:
raise AuthenticationException('Install dependency requests-oauthlib')


class ServiceNowClientCredentialsFlow(ServiceNowFlow):
"""
OAuth2 Client Credentials Grant Flow for ServiceNow.

This flow is ideal for machine-to-machine authentication where no user context is needed.
Only requires client_id and client_secret (no username/password).

Example:
>>> flow = ServiceNowClientCredentialsFlow('my_client_id', 'my_client_secret')
>>> client = ServiceNowClient('dev12345', flow)
"""

def __init__(self, client_id, client_secret):
"""
Client Credentials flow authentication (OAuth 2.0)

:param client_id: The OAuth application client ID
:param client_secret: The OAuth application client secret
"""
self.client_id = client_id
self.__secret = client_secret
self.__token = None
self.__expires_at = None

def authorization_url(self, authorization_base_url):
"""Generate the token endpoint URL"""
return f"{authorization_base_url}/oauth_token.do"

def _get_access_token(self, instance):
"""
Request an access token from ServiceNow using client credentials.

:param instance: The ServiceNow instance URL
:return: Access token string
:raises AuthenticationException: If token request fails
"""
token_url = self.authorization_url(instance)
headers = {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
}
data = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.__secret
}

try:
r = requests.post(token_url, headers=headers, data=data, timeout=30)
except requests.exceptions.RequestException as e:
raise AuthenticationException(f"Failed to connect to token endpoint: {e}")

if r.status_code != 200:
try:
error_data = r.json()
error_msg = error_data.get('error_description', error_data.get('error', r.text))
except ValueError:
error_msg = r.text
raise AuthenticationException(
f"Failed to obtain access token: {r.status_code} {r.reason} - {error_msg}"
)

try:
token_data = r.json()
except ValueError:
raise AuthenticationException(f"Invalid JSON response from token endpoint: {r.text}")

if 'access_token' not in token_data:
raise AuthenticationException(f"No access_token in response: {token_data}")

self.__token = token_data['access_token']
# Use expires_in from response, default to 3600 seconds (1 hour) if not provided
expires_in = token_data.get('expires_in', 3600)
# Refresh 60 seconds before actual expiry to avoid edge cases
self.__expires_at = int(time.time() + expires_in - 60)

return self.__token

def authenticate(self, instance: str, **kwargs) -> requests.Session:
"""
Create and return an authenticated requests.Session with Bearer token.
The session will automatically refresh the token when it expires.

:param instance: The ServiceNow instance URL
:param kwargs: Additional arguments (proxies, verify, etc.)
:return: Authenticated requests.Session
"""
session = requests.Session()

# Get initial token
if not self.__token or time.time() > (self.__expires_at or 0):
self._get_access_token(instance)

# Use a custom auth handler that refreshes tokens
session.auth = _ClientCredentialsAuth(self, instance)

return session


class _ClientCredentialsAuth(AuthBase):
"""
Internal auth handler that automatically refreshes client credentials tokens.
"""

def __init__(self, flow, instance):
self._flow = flow
self._instance = instance

def __call__(self, request):
# Check if token needs refresh
if not self._flow._ServiceNowClientCredentialsFlow__token or \
time.time() > (self._flow._ServiceNowClientCredentialsFlow__expires_at or 0):
self._flow._get_access_token(self._instance)

request.headers['Authorization'] = f"Bearer {self._flow._ServiceNowClientCredentialsFlow__token}"
return request


class ServiceNowJWTAuth(AuthBase):

def __init__(self, client_id, client_secret, jwt):
Expand Down
Loading