Skip to content

Commit 903ced6

Browse files
committed
WIP: Move closer to supporting sandboxes
1 parent 715e1d9 commit 903ced6

File tree

6 files changed

+260
-182
lines changed

6 files changed

+260
-182
lines changed

src/diracx/core/models.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,18 @@ class UserInfo(BaseModel):
112112
preferred_username: str
113113
dirac_group: str
114114
vo: str
115+
116+
117+
class SandboxChecksum(StrEnum):
118+
SHA256 = "sha256"
119+
120+
121+
class SandboxFormat(StrEnum):
122+
TAR_GZ = "tar.gz"
123+
124+
125+
class SandboxInfo(BaseModel):
    """Metadata describing a sandbox: its checksum, size and archive format."""

    # Algorithm that produced ``checksum``.
    checksum_algorithm: SandboxChecksum
    # Hex-encoded digest of exactly 64 characters. The hexadecimal character
    # class is spelled out explicitly: a range such as ``[0-f]`` covers every
    # ASCII code point between "0" and "f", so it would also accept
    # punctuation (":", "@", "_", ...) and the non-hex letters "G"-"Z".
    checksum: str = Field(pattern=r"^[0-9a-fA-F]{64}$")
    # Must be at least 1. NOTE(review): presumably a size in bytes - confirm.
    size: int = Field(ge=1)
    # Archive format of the sandbox.
    format: SandboxFormat

src/diracx/core/s3.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""Utilities for interacting with S3-compatible storage."""
2+
from __future__ import annotations
3+
4+
import base64
5+
from typing import TypedDict
6+
7+
from botocore.errorfactory import ClientError
8+
9+
# Validity period of the presigned upload requests created by
# generate_presigned_upload, which passes it on as ``ExpiresIn``
# (5 * 60 = 300; presumably seconds, as in the boto3 API - confirm).
PRESIGNED_URL_TIMEOUT = 5 * 60
10+
11+
12+
class S3PresignedPostInfo(TypedDict):
    """Return type of ``generate_presigned_upload``."""

    # Where the upload request should be sent.
    url: str
    # Form fields that accompany the upload request.
    fields: dict[str, str]
15+
16+
17+
def hack_get_s3_client(
    endpoint_url="http://christohersmbp4.localdomain:32000",
    access_key_id="console",
    secret_access_key="console123",
    bucket_name="sandboxes",
):
    """Create an S3 client and make a best-effort attempt to create the bucket.

    Temporary development helper. The defaults reproduce the original
    hard-coded development endpoint and credentials so that existing
    zero-argument callers keep working; they must not be relied upon outside
    of local testing.

    Args:
        endpoint_url: URL of the S3-compatible service.
        access_key_id: Access key used to authenticate.
        secret_access_key: Secret key used to authenticate.
        bucket_name: Bucket to create if possible and to return.

    Returns:
        A ``(client, bucket_name)`` tuple.
    """
    # TODO: Use async
    import logging

    import boto3
    from botocore.config import Config
    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    s3 = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
        config=Config(signature_version="v4"),
    )
    try:
        s3.create_bucket(Bucket=bucket_name)
    except ClientError as e:
        # The bucket already existing is the expected outcome on every call
        # but the first, so only report other error codes.
        error_code = e.response.get("Error", {}).get("Code")
        if error_code not in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
            logger.warning(
                "Could not create bucket %r: %s", bucket_name, error_code, exc_info=True
            )
    except Exception:
        # Bucket creation stays best-effort (e.g. the endpoint may not be
        # reachable yet), but the failure is no longer hidden.
        logger.warning("Could not create bucket %r", bucket_name, exc_info=True)
    return s3, bucket_name
41+
42+
43+
def s3_object_exists(s3_client, bucket_name, key) -> bool:
    """Report whether ``key`` is present in ``bucket_name``.

    The object is probed with ``head_object``. A ``ClientError`` whose error
    code is ``"404"`` means the object is absent and yields ``False``; any
    other ``ClientError`` is re-raised unchanged.
    """
    try:
        s3_client.head_object(Bucket=bucket_name, Key=key)
    except ClientError as err:
        error_code = err.response["Error"]["Code"]
        if error_code == "404":
            return False
        raise
    return True
53+
54+
55+
def generate_presigned_upload(
    s3_client, bucket_name, key, checksum_algorithm, checksum, size
) -> S3PresignedPostInfo:
    """Generate a presigned URL and form fields for uploading a file to S3.

    The request is constrained so that only a body of exactly ``size`` bytes
    is accepted, and the checksum fields must match those derived from
    ``checksum_algorithm`` and the hex-encoded ``checksum`` (sent to S3 in
    base64 form).
    """
    checksum_field = f"x-amz-checksum-{checksum_algorithm}"
    fields = {
        "x-amz-checksum-algorithm": checksum_algorithm,
        checksum_field: b16_to_b64(checksum),
    }
    # Identical lower and upper bounds pin the upload to exactly ``size``.
    conditions = [["content-length-range", size, size]]
    # Every form field must be sent with exactly the value signed here.
    conditions.extend({name: value} for name, value in fields.items())
    return s3_client.generate_presigned_post(
        Bucket=bucket_name,
        Key=key,
        Fields=fields,
        Conditions=conditions,
        ExpiresIn=PRESIGNED_URL_TIMEOUT,
    )
76+
77+
78+
def b16_to_b64(hex_string: str) -> str:
    """Convert hexadecimal encoded data to base64 encoded data.

    The input is upper-cased before decoding, so both lowercase and uppercase
    hex digits are accepted.
    """
    raw_bytes = base64.b16decode(hex_string.upper())
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()

src/diracx/db/sql/sandbox_metadata/db.py

Lines changed: 49 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,96 +1,86 @@
1-
""" SandboxMetadataDB frontend
2-
"""
3-
41
from __future__ import annotations
52

6-
import datetime
7-
83
import sqlalchemy
94

10-
from diracx.db.sql.utils import BaseSQLDB
5+
from diracx.core.models import SandboxInfo, UserInfo
6+
from diracx.db.sql.utils import BaseSQLDB, utcnow
117

128
from .schema import Base as SandboxMetadataDBBase
139
from .schema import sb_Owners, sb_SandBoxes
1410

1511
# In legacy DIRAC the SEName column was used to support multiple different
1612
# storage backends. This is no longer the case, so we hardcode the value to
1713
# a fixed name (see SE_NAME below) to represent the new DiracX system.
18-
SE_NAME = "S3"
14+
SE_NAME = "ProductionSandboxSE"
15+
# Presumably the leading path component of S3-backed sandbox PFNs.
# NOTE(review): not referenced in the visible part of this module, and
# get_pfn builds its path from a literal "S3" without a leading slash -
# confirm which form is intended.
PFN_PREFIX = "/S3/"
1916

2017

2118
class SandboxMetadataDB(BaseSQLDB):
2219
metadata = SandboxMetadataDBBase.metadata
2320

24-
async def _get_put_owner(self, owner: str, owner_group: str) -> int:
25-
"""adds a new owner/ownerGroup pairs, while returning their ID if already existing
26-
27-
Args:
28-
owner (str): user name
29-
owner_group (str): group of the owner
30-
"""
21+
async def upsert_owner(self, user: UserInfo) -> int:
    """Return the ID of the owner row for ``user``, inserting it if missing.

    The owner is looked up by ``preferred_username`` and ``dirac_group``.
    When no row matches, a new one is inserted and its ID is returned.

    Args:
        user: The user whose owner row is wanted.

    Returns:
        The ``OwnerID`` of the existing or newly inserted row.
    """
    # TODO: Follow https://github.com/DIRACGrid/diracx/issues/49
    stmt = sqlalchemy.select(sb_Owners.OwnerID).where(
        sb_Owners.Owner == user.preferred_username,
        sb_Owners.OwnerGroup == user.dirac_group,
        # TODO: Add VO
    )
    result = await self.conn.execute(stmt)
    # Compare with None explicitly: a plain truthiness test would treat a
    # falsy ID (0) as "not found" and insert a duplicate owner.
    if (owner_id := result.scalar_one_or_none()) is not None:
        return owner_id

    stmt = sqlalchemy.insert(sb_Owners).values(
        Owner=user.preferred_username,
        OwnerGroup=user.dirac_group,
    )
    result = await self.conn.execute(stmt)
    return result.lastrowid
4139

42-
async def insert(
43-
self, owner: str, owner_group: str, sb_SE: str, se_PFN: str, size: int = 0
44-
) -> int:
45-
"""inserts a new sandbox in SandboxMetadataDB
46-
this is "equivalent" of DIRAC registerAndGetSandbox
47-
48-
Args:
49-
owner (str): user name_
50-
owner_group (str): groupd of the owner
51-
sb_SE (str): _description_
52-
sb_PFN (str): _description_
53-
size (int, optional): _description_. Defaults to 0.
54-
"""
55-
owner_id = await self._get_put_owner(owner, owner_group)
40+
@staticmethod
41+
def get_pfn(bucket_name: str, user: UserInfo, sandbox_info: SandboxInfo) -> str:
42+
"""Get the sandbox's user namespaced and content addressed PFN"""
43+
parts = [
44+
"S3",
45+
bucket_name,
46+
user.vo,
47+
user.dirac_group,
48+
user.preferred_username,
49+
f"{sandbox_info.checksum_algorithm}:{sandbox_info.checksum}.{sandbox_info.format}",
50+
]
51+
return "/".join(parts)
52+
53+
async def insert_sandbox(self, user: UserInfo, pfn: str, size: int):
    """Register a sandbox in SandboxMetadataDB.

    The owner row for ``user`` is obtained via ``upsert_owner`` and a row
    for ``pfn`` is inserted under ``SE_NAME``. If the insert fails with an
    ``IntegrityError``, the last access time of the sandbox at ``pfn`` is
    refreshed instead.
    """
    # TODO: Follow https://github.com/DIRACGrid/diracx/issues/49
    owner_id = await self.upsert_owner(user)
    row = {"OwnerId": owner_id, "SEName": SE_NAME, "SEPFN": pfn, "Bytes": size}
    stmt = sqlalchemy.insert(sb_SandBoxes).values(**row)
    try:
        result = await self.conn.execute(stmt)
    except sqlalchemy.exc.IntegrityError:
        await self.update_sandbox_last_access_time(pfn)
        return
    # Sanity check: a successful insert affects exactly one row.
    assert result.rowcount == 1
66+
67+
async def update_sandbox_last_access_time(self, pfn: str) -> None:
    """Set ``LastAccessTime`` of the sandbox stored at ``pfn`` to ``utcnow()``.

    The sandbox is identified by ``SE_NAME`` together with ``pfn``; exactly
    one row is expected to be updated.
    """
    matches_sandbox = (sb_SandBoxes.SEName == SE_NAME, sb_SandBoxes.SEPFN == pfn)
    stmt = sqlalchemy.update(sb_SandBoxes).where(*matches_sandbox)
    stmt = stmt.values(LastAccessTime=utcnow())
    result = await self.conn.execute(stmt)
    # Sanity check: the (SEName, SEPFN) pair should match a single row.
    assert result.rowcount == 1
8175

82-
As sandboxes are registered in the DB before uploading to the storage
83-
backend we can't on their existence in the database to determine if
84-
they have been uploaded. Instead we check if the sandbox has been
85-
assigned to a job. If it has then we know it has been uploaded and we
86-
can avoid communicating with the storage backend.
87-
"""
76+
async def sandbox_is_assigned(self, pfn: str) -> bool:
    """Return the ``Assigned`` value of the sandbox stored at ``pfn``.

    The sandbox is looked up by ``SE_NAME`` together with ``pfn``. The value
    is read with ``scalar_one()``, which is expected to raise unless exactly
    one row matches.
    """
    query: sqlalchemy.Executable = sqlalchemy.select(sb_SandBoxes.Assigned).where(
        sb_SandBoxes.SEName == SE_NAME,
        sb_SandBoxes.SEPFN == pfn,
    )
    rows = await self.conn.execute(query)
    return rows.scalar_one()
9484

9585
async def delete(self, sandbox_ids: list[int]) -> bool:
9686
stmt: sqlalchemy.Executable = sqlalchemy.delete(sb_SandBoxes).where(

0 commit comments

Comments
 (0)