Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from alembic import context
from sqlalchemy import engine_from_config, pool

from print_service.models import Model
from print_service.models import BaseDbModel
from print_service.settings import get_settings


Expand All @@ -20,7 +20,7 @@
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Model.metadata
target_metadata = BaseDbModel.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
Expand Down
34 changes: 34 additions & 0 deletions migrations/versions/90539e2253b3_add_soft_delete.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add soft delete

Revision ID: 90539e2253b3
Revises: a68c6bb2972c
Create Date: 2025-06-01 17:29:08.641697

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '90539e2253b3'
down_revision = 'a68c6bb2972c'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('file', 'source',
existing_type=sa.VARCHAR(),
nullable=False)
op.add_column('union_member', sa.Column('is_deleted', sa.Boolean(), nullable=False))
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('union_member', 'is_deleted')
op.alter_column('file', 'source',
existing_type=sa.VARCHAR(),
nullable=True)
# ### end Alembic commands ###
21 changes: 19 additions & 2 deletions print_service/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@

settings = get_settings()


class ObjectNotFound(Exception):
pass
def __init__(self, obj: type, obj_id_or_name: int | str):
super().__init__(
f"Object {obj.__name__} {obj_id_or_name=} not found",
)


class AlreadyExists(Exception):
def __init__(self, obj: type, obj_id_or_name: int | str):
super().__init__(
f"Object {obj.__name__}, {obj_id_or_name=} already exists",
)


class TerminalTokenNotFound(ObjectNotFound):
Expand Down Expand Up @@ -71,6 +80,14 @@ def __init__(self, content_type: str):
f'Only {", ".join(settings.CONTENT_TYPES)} files allowed, but {content_type} was recieved'
)

class PrintCodeExpired(Exception):
def __init__(self):
super().__init__(f'Print code expired')

class PrintLimitExceed(Exception):
def __init__(self):
super().__init__(f'Print limit exceed')


class AlreadyUploaded(Exception):
def __init__(self):
Expand Down
93 changes: 3 additions & 90 deletions print_service/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,92 +1,5 @@
from __future__ import annotations
from .base import Base, BaseDbModel
from .db import *

import math
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.ext.declarative import as_declarative
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql.sqltypes import Boolean


@as_declarative()
class Model:
pass


class UnionMember(Model):
__tablename__ = 'union_member'

id: Mapped[int] = mapped_column(Integer, primary_key=True)
surname: Mapped[str] = mapped_column(String, nullable=False)
union_number: Mapped[str] = mapped_column(String, nullable=True)
student_number: Mapped[str] = mapped_column(String, nullable=True)

files: Mapped[list[File]] = relationship('File', back_populates='owner')
print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='owner')


class File(Model):
__tablename__ = 'file'

id: Mapped[int] = Column(Integer, primary_key=True)
pin: Mapped[str] = Column(String, nullable=False)
file: Mapped[str] = Column(String, nullable=False)
owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False)
option_pages: Mapped[str] = Column(String)
option_copies: Mapped[int] = Column(Integer)
option_two_sided: Mapped[bool] = Column(Boolean)
created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow)
updated_at: Mapped[datetime] = Column(
DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow
)
number_of_pages: Mapped[int] = Column(Integer)
source: Mapped[str] = Column(String, default='unknown', nullable=False)

owner: Mapped[UnionMember] = relationship('UnionMember', back_populates='files')
print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='file')

@property
def flatten_pages(self) -> list[int] | None:
'''Возвращает расширенный список из элементов списков внутренних целочисленных точек переданного множества отрезков
"1-5, 3, 2" --> [1, 2, 3, 4, 5, 3, 2]'''
if self.number_of_pages is None:
return None
result = list()
if self.option_pages == '':
return result
for part in self.option_pages.split(','):
x = part.split('-')
result.extend(range(int(x[0]), int(x[-1]) + 1))
return result

@property
def sheets_count(self) -> int | None:
'''Возвращает количество элементов списков внутренних целочисленных точек переданного множества отрезков
"1-5, 3, 2" --> 7
P.S. 1, 2, 3, 4, 5, 3, 2 -- 7 чисел'''
if self.number_of_pages is None:
return None
if not self.flatten_pages:
return (
math.ceil(self.number_of_pages - (self.option_two_sided * self.number_of_pages / 2))
* self.option_copies
)
if self.option_two_sided:
return math.ceil(len(self.flatten_pages) / 2) * self.option_copies
else:
return len(self.flatten_pages) * self.option_copies


class PrintFact(Model):
__tablename__ = 'print_fact'

id: Mapped[int] = Column(Integer, primary_key=True)
file_id: Mapped[int] = Column(Integer, ForeignKey('file.id'), nullable=False)
owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False)
created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow)

owner: Mapped[UnionMember] = relationship('UnionMember', back_populates='print_facts')
file: Mapped[File] = relationship('File', back_populates='print_facts')
sheets_used: Mapped[int] = Column(Integer)
__all__ = ["Base", "BaseDbModel", "UnionMember", "File", "PrintFact"]
87 changes: 87 additions & 0 deletions print_service/models/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

import re

from sqlalchemy import not_
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Query, Session, as_declarative, declared_attr

from ..exceptions import AlreadyExists, ObjectNotFound


@as_declarative()
class Base:
"""Base class for all database entities"""

@declared_attr
def __tablename__(cls) -> str: # pylint: disable=no-self-argument
"""Generate database table name automatically.
Convert CamelCase class name to snake_case db table name.
"""
return re.sub(r"(?<!^)(?=[A-Z])", "_", cls.__name__).lower()

def __repr__(self):
attrs = []
for c in self.__table__.columns:
attrs.append(f"{c.name}={getattr(self, c.name)}")
return "{}({})".format(c.__class__.__name__, ', '.join(attrs))


class BaseDbModel(Base):
__abstract__ = True

@classmethod
def create(cls, *, session: Session, **kwargs) -> BaseDbModel:
obj = cls(**kwargs)
session.add(obj)
session.flush()
return obj

@classmethod
def query(cls, *, with_deleted: bool = False, session: Session) -> Query:
"""Get all objects with soft deletes"""
objs = session.query(cls)
if not with_deleted and hasattr(cls, "is_deleted"):
objs = objs.filter(not_(cls.is_deleted))
return objs

@classmethod
def get(cls, id: int | str, *, with_deleted=False, session: Session) -> BaseDbModel:
"""Get object with soft deletes"""
objs = session.query(cls)
if not with_deleted and hasattr(cls, "is_deleted"):
objs = objs.filter(not_(cls.is_deleted))
try:
if hasattr(cls, "uuid"):
return objs.filter(cls.uuid == id).one()
return objs.filter(cls.id == id).one()
except NoResultFound:
raise ObjectNotFound(obj=cls, obj_id_or_name=id)

@classmethod
def update(cls, id: int | str, *, session: Session, **kwargs) -> BaseDbModel:
"""Update model with new values from kwargs.
If no new values are given, raise HTTP 409 error.
"""
get_new_values = False
obj = cls.get(id, session=session)
for k, v in kwargs.items():
cur_v = getattr(obj, k)
if cur_v != v:
setattr(obj, k, v)
get_new_values = True
if not get_new_values:
raise AlreadyExists(cls, id)
session.add(obj)
session.flush()
return obj

@classmethod
def delete(cls, id: int | str, *, session: Session) -> None:
"""Soft delete object if possible, else hard delete"""
obj = cls.get(id, session=session)
if hasattr(obj, "is_deleted"):
obj.is_deleted = True
else:
session.delete(obj)
session.flush()
97 changes: 97 additions & 0 deletions print_service/models/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

import math
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.schema import ForeignKey
from sqlalchemy.sql.sqltypes import Boolean

from .base import BaseDbModel


class UnionMember(BaseDbModel):
__tablename__ = 'union_member'

id: Mapped[int] = mapped_column(Integer, primary_key=True)
surname: Mapped[str] = mapped_column(String, nullable=False)
union_number: Mapped[str] = mapped_column(String, nullable=True)
student_number: Mapped[str] = mapped_column(String, nullable=True)

files: Mapped[list[File]] = relationship('File', back_populates='owner')
print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='owner')
is_deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)


class File(BaseDbModel):
__tablename__ = 'file'

id: Mapped[int] = Column(Integer, primary_key=True)
pin: Mapped[str] = Column(String, nullable=False)
file: Mapped[str] = Column(String, nullable=False)
owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False)
option_pages: Mapped[str] = Column(String)
option_copies: Mapped[int] = Column(Integer)
option_two_sided: Mapped[bool] = Column(Boolean)
created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow)
updated_at: Mapped[datetime] = Column(
DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow
)
number_of_pages: Mapped[int] = Column(Integer)
source: Mapped[str] = Column(String, default='unknown', nullable=False)

owner: Mapped[UnionMember] = relationship(
'UnionMember',
primaryjoin="and_(File.owner_id==UnionMember.id, not_(UnionMember.is_deleted))",
back_populates='files',
)
print_facts: Mapped[list[PrintFact]] = relationship('PrintFact', back_populates='file')

@property
def flatten_pages(self) -> list[int] | None:
'''Возвращает расширенный список из элементов списков внутренних целочисленных точек переданного множества отрезков
"1-5, 3, 2" --> [1, 2, 3, 4, 5, 3, 2]'''
if self.number_of_pages is None:
return None
result = list()
if self.option_pages == '':
return result
for part in self.option_pages.split(','):
x = part.split('-')
result.extend(range(int(x[0]), int(x[-1]) + 1))
return result

@property
def sheets_count(self) -> int | None:
'''Возвращает количество элементов списков внутренних целочисленных точек переданного множества отрезков
"1-5, 3, 2" --> 7
P.S. 1, 2, 3, 4, 5, 3, 2 -- 7 чисел'''
if self.number_of_pages is None:
return None
if not self.flatten_pages:
return (
math.ceil(self.number_of_pages - (self.option_two_sided * self.number_of_pages / 2))
* self.option_copies
)
if self.option_two_sided:
return math.ceil(len(self.flatten_pages) / 2) * self.option_copies
else:
return len(self.flatten_pages) * self.option_copies


class PrintFact(BaseDbModel):
__tablename__ = 'print_fact'

id: Mapped[int] = Column(Integer, primary_key=True)
file_id: Mapped[int] = Column(Integer, ForeignKey('file.id'), nullable=False)
owner_id: Mapped[int] = Column(Integer, ForeignKey('union_member.id'), nullable=False)
created_at: Mapped[datetime] = Column(DateTime, nullable=False, default=datetime.utcnow)

owner: Mapped[UnionMember] = relationship(
'UnionMember',
primaryjoin="and_(PrintFact.owner_id==UnionMember.id, not_(UnionMember.is_deleted))",
back_populates='print_facts',
)
file: Mapped[File] = relationship('File', back_populates='print_facts')
sheets_used: Mapped[int] = Column(Integer)
2 changes: 1 addition & 1 deletion print_service/routes/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

from auth_lib.fastapi import UnionAuth
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends
from redis import Redis

from print_service.exceptions import TerminalTokenNotFound
Expand Down
Loading
Loading