Skip to content

Commit

Permalink
test read/write_credentials for valid/invalid content, missing/empty …
Browse files Browse the repository at this point in the history
…file
  • Loading branch information
natthan-pigoux committed Oct 15, 2024
1 parent 62b9d6c commit 7315d77
Showing 1 changed file with 86 additions and 11 deletions.
97 changes: 86 additions & 11 deletions diracx-core/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import fcntl
import time
from datetime import datetime, timedelta, timezone
from multiprocessing import Pool
from pathlib import Path
from tempfile import NamedTemporaryFile
Expand All @@ -12,6 +13,7 @@
from diracx.core.utils import (
dotenv_files_from_environment,
read_credentials,
serialize_credentials,
write_credentials,
)

Expand Down Expand Up @@ -39,7 +41,13 @@ def test_dotenv_files_from_environment(monkeypatch):
assert dotenv_files_from_environment("TEST_PREFIX") == ["/a", "/b"]


TOKEN_STR = '{"access_token": "test", "expires_on": 10, "refresh_token": "test"}'
TOKEN_RESPONSE_DICT = {
"access_token": "test_token",
"expires_in": int((datetime.now(tz=timezone.utc) + timedelta(days=1)).timestamp()),
"token_type": "Bearer",
"refresh_token": "test_refresh",
}
CREDENTIALS_CONTENT = serialize_credentials(TokenResponse(**TOKEN_RESPONSE_DICT))


def lock_and_read_file(file_path):
Expand All @@ -55,22 +63,16 @@ def lock_and_write_file(file_path: Path):
"""Lock and write file."""
with open(file_path, "a") as f:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
f.write(TOKEN_STR)
f.write(CREDENTIALS_CONTENT)
time.sleep(2)
fcntl.flock(f, fcntl.LOCK_UN)


@pytest.fixture
def token_setup():
def token_setup() -> tuple[TokenResponse, Path]:
"""Setup token response and location."""
token_location = Path(NamedTemporaryFile().name)
token_response = {
"access_token": "test",
"expires_in": 10,
"token_type": "Bearer",
"refresh_token": "test",
}
token_response = TokenResponse(**token_response)
token_response = TokenResponse(**TOKEN_RESPONSE_DICT)
return token_response, token_location


Expand All @@ -85,7 +87,7 @@ def run_processes(proc_to_test, *, read=True):
if read:
# Creating the file before reading it
with open(location, "w") as f:
f.write(TOKEN_STR)
f.write(CREDENTIALS_CONTENT)
pool.apply_async(
lock_and_read_file,
args=(location,),
Expand Down Expand Up @@ -121,6 +123,10 @@ def error_callback(error, error_dict, process_name):
error_dict[process_name] = error


def assert_read_credentials_error_message(exc_info):
assert "Error reading credentials:" in exc_info.value.args[0]


def test_read_credentials_reading_locked_file(
token_setup, concurrent_access_to_lock_file
):
Expand Down Expand Up @@ -154,3 +160,72 @@ def test_write_credentials_writing_locked_file(
raise AssertionError(
"Expected a BlockingIOError while writing locked credentials."
)


def create_temp_file(content=None) -> Path:
"""Helper function to create a temporary file with optional content."""
temp_file = NamedTemporaryFile(delete=False)
temp_path = Path(temp_file.name)
temp_file.close()
if content is not None:
temp_path.write_text(content)
return temp_path


def test_read_credentials_empty_file():
"""Test that read_credentials raises an appropriate error for an empty file."""
temp_file = create_temp_file("")

with pytest.raises(RuntimeError) as exc_info:
read_credentials(location=temp_file)

temp_file.unlink()
assert_read_credentials_error_message(exc_info)


def test_write_credentials_empty_file(token_setup):
"""Test that write_credentials raises an appropriate error for an empty file."""
temp_file = create_temp_file("")
token_response, _ = token_setup
write_credentials(token_response, location=temp_file)
temp_file.unlink()


def test_read_credentials_missing_file():
"""Test that read_credentials raises an appropriate error for a missing file."""
missing_file = Path("/path/to/nonexistent/file.txt")
with pytest.raises(RuntimeError) as exc_info:
read_credentials(location=missing_file)
assert_read_credentials_error_message(exc_info)


def test_write_credentials_unavailable_path(token_setup):
"""Test that write_credentials raises error when it can't create path."""
wrong_path = Path("/wrong/path/file.txt")
token_response, _ = token_setup
with pytest.raises(PermissionError):
write_credentials(token_response, location=wrong_path)


def test_read_credentials_invalid_content():
"""Test that read_credentials raises an appropriate error for a file with invalid content."""
temp_file = create_temp_file("invalid content")

with pytest.raises(RuntimeError) as exc_info:
read_credentials(location=temp_file)

temp_file.unlink()
assert_read_credentials_error_message(exc_info)


def test_read_credentials_valid_file(token_setup):
"""Test that read_credentials works correctly with a valid file."""
token_response, _ = token_setup
temp_file = create_temp_file(content=CREDENTIALS_CONTENT)

credentials = read_credentials(location=temp_file)
temp_file.unlink()
assert credentials.access_token == token_response.access_token
assert credentials.expires_in < token_response.expires_in
assert credentials.token_type == token_response.token_type
assert credentials.refresh_token == token_response.refresh_token

0 comments on commit 7315d77

Please sign in to comment.