Skip to content

Commit

Permalink
Merge pull request #8 from BITNP:explicit-actions
Browse files Browse the repository at this point in the history
refactor: use explicit login and logouts
  • Loading branch information
spencerwooo authored Jan 24, 2023
2 parents 9cf29b6 + a4b56fd commit 260bf84
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 63 deletions.
6 changes: 0 additions & 6 deletions bitsrun/action.py

This file was deleted.

16 changes: 9 additions & 7 deletions bitsrun/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import click

from bitsrun.action import Action
from bitsrun.config import get_config_paths, read_config
from bitsrun.user import User

Expand Down Expand Up @@ -68,26 +67,29 @@ def do_action(action, username, password, verbose, silent):

try:
if action == "login":
res = user.do_action(Action.LOGIN)
res = user.login()

# Output login result by default if not silent
if not silent:
click.echo(f"{res.get('username')} ({res.get('online_ip')}) logged in")

else:
res = user.do_action(Action.LOGOUT)
elif action == "logout":
res = user.logout()

# Output logout result by default if not silent
if not silent:
click.echo(f"{res.get('online_ip')} logged out")

else:
# Should not reach here, but just in case
raise ValueError(f"unknown action `{action}`")

# Output direct result of response if verbose
if verbose:
click.secho(f"Info: {res}", fg="blue")
click.echo(f"{click.style('info:', fg='blue')} {res}")

except Exception as e:
click.secho(f"Error: {e}", fg="red")

click.echo(f"{click.style('error:', fg='red')} {e}")
# Throw with error code 1 for scripts to pick up error state
sys.exit(1)

Expand Down
30 changes: 30 additions & 0 deletions bitsrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@


def get_config_paths() -> map:
r"""Enumerate possible paths of the configuration file.
On Windows, the possible paths are:
- `C:\ProgramData\bitsrun\bit-user.json`
- `~\AppData\Roaming\bitsrun\bit-user.json`
On macOS and Linux:
- `/etc/bitsrun/bit-user.json`
- `$XDG_CONFIG_HOME/bitsrun/bit-user.json`
- `~/.config/bitsrun/bit-user.json`
- `~/.config/bit-user.json`
Returns:
A map of possible paths of the configuration file based on the current platform.
"""

paths = [
site_config_path(_APP_NAME, appauthor=False),
user_config_path(_APP_NAME, appauthor=False, roaming=True),
Expand All @@ -32,6 +50,18 @@ def get_config_paths() -> map:


def read_config() -> Optional[Tuple[str, str]]:
"""Read config from the first available config file with name `bit-user.json`.
The config file should be a JSON file with the following structure:
```json
{ "username": "xxxx", "password": "xxxx" }
```
Returns:
A tuple of (username, password) if the config file is found.
"""

paths = get_config_paths()
for path in paths:
try:
Expand Down
79 changes: 61 additions & 18 deletions bitsrun/user.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,90 @@
import hmac
import json
from enum import Enum
from hashlib import sha1
from typing import Dict, Union
from typing import Dict, Optional, Union

from requests import Session

from bitsrun.action import Action
from bitsrun.utils import fkbase64, get_user_info, parse_homepage, xencode
from bitsrun.utils import fkbase64, parse_homepage, xencode

API_BASE = "http://10.0.0.55"
TYPE_CONST = 1
N_CONST = 200


class Action(Enum):
LOGIN = "login"
LOGOUT = "logout"


class User:
def __init__(self, username: str, password: str):
self.username = username
self.password = password

self.ip, self.acid = parse_homepage()
self.ip, self.acid = parse_homepage(api_base=API_BASE)
self.session = Session()

def do_action(self, action: Action) -> Dict[str, Union[str, int]]:
# Check current state - whether device is logged in
# and whether current user the same as the provided one
is_logged_in, username = get_user_info()
def login(self) -> Dict[str, Union[str, int]]:
logged_in_user = self._user_validate()

# Raise exception if device is already logged in
if logged_in_user == self.username:
raise Exception(f"{logged_in_user}, you are already online")

return self._do_action(Action.LOGIN)

def logout(self) -> Dict[str, Union[str, int]]:
logged_in_user = self._user_validate()

# Raise exception if device is not logged in
if logged_in_user is None:
raise Exception("you have already logged out")

return self._do_action(Action.LOGOUT)

def _do_action(self, action: Action) -> Dict[str, Union[str, int]]:
params = self._make_params(action)
response = self.session.get(API_BASE + "/cgi-bin/srun_portal", params=params)
return json.loads(response.text[6:-1])

def _get_user_info(self) -> Optional[str]:
"""Get current logged in user info if exists.
Returns:
The username of the current logged in user if exists.
"""

resp = self.session.get(API_BASE + "/cgi-bin/rad_user_info")
data = resp.text

if data == "not_online_error":
return None

return data.split(",")[0]

def _user_validate(self) -> Optional[str]:
"""Check if current logged in user matches the username provided.
Raises:
Exception: If current logged in user and username provided does not match.
Returns:
The username of the current logged in user if exists.
"""

logged_in_user = self._get_user_info()

# Raise exception only if username exists on this IP and
# command line arguments provided another username
if username and username != self.username:
if logged_in_user and logged_in_user != self.username:
raise Exception(
f"Current logged in user ({username}) and "
f"Current logged in user ({logged_in_user}) and "
f"yours ({self.username}) does not match"
)
if is_logged_in and action is Action.LOGIN:
raise Exception(f"{username}, you are already online")
if not is_logged_in and action is Action.LOGOUT:
raise Exception("you have already logged out")

# Perform login or logout action
params = self._make_params(action)
response = self.session.get(API_BASE + "/cgi-bin/srun_portal", params=params)
return json.loads(response.text[6:-1])
return logged_in_user

def _get_token(self) -> str:
params = {"callback": "jsonp", "username": self.username, "ip": self.ip}
Expand Down
39 changes: 7 additions & 32 deletions bitsrun/utils.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
import math
from base64 import b64encode
from html.parser import HTMLParser
from typing import Optional, Tuple
from typing import Tuple
from urllib.parse import parse_qs, urlparse

import requests

API_BASE = "http://10.0.0.55"


def parse_homepage() -> Tuple[str, str]:
"""Parse homepage of 10.0.0.55 and get the acid + ip of current session
def parse_homepage(api_base: str) -> Tuple[str, str]:
"""Parse homepage of 10.0.0.55 and get the acid + ip of current session.
Raises:
Exception: Throw exception if acid not present in the redirected URL
Exception: Throw exception if response text does not contain IP
Exception: Throw exception if acid not present in the redirected URL.
Exception: Throw exception if response text does not contain IP.
Returns:
Tuple[str, str]: Both the ip and the acid of the current session
A tuple of (ip, acid) of the current session.
"""

res = requests.get(API_BASE)
res = requests.get(api_base)

# ac_id appears in the url query parameter of the redirected URL
query = parse_qs(urlparse(res.url).query)
Expand Down Expand Up @@ -54,29 +52,6 @@ def feed(self, *args, **kwargs):
return ip, ac_id[0]


def get_user_info() -> Tuple[bool, Optional[str]]:
"""Get current logged in user info if exists
Returns:
tuple[bool, Optional[str]]
- a boolean indicating whether the current IP is logged in
- the username of the current logged in user if exists
"""

is_logged_in = True
username = None

resp = requests.get(API_BASE + "/cgi-bin/rad_user_info")
data = resp.text

if data == "not_online_error":
is_logged_in = False
else:
username = data.split(",")[0]

return is_logged_in, username


def fkbase64(raw_s: str) -> str:
"""Encode string with a magic base64 mask"""
trans = str.maketrans(
Expand Down

0 comments on commit 260bf84

Please sign in to comment.