Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 22 additions & 122 deletions sources/mimeogram/acquirers.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,18 @@ async def _acquire_from_file( location: __.Path ) -> _parts.Part:
async with _aiofiles.open( location, 'rb' ) as f: # pyright: ignore
content_bytes = await f.read( )
except Exception as exc: raise ContentAcquireFailure( location ) from exc
mimetype, charset = _detect_mimetype_and_charset( content_bytes, location )
mimetype, charset = __.detextive.detect_mimetype_and_charset(
content_bytes, location
)
if not __.detextive.is_textual_mimetype( mimetype ):
raise __.detextive.exceptions.TextualMimetypeInvalidity(
str( location ), mimetype
)
if charset is None: raise ContentDecodeFailure( location, '???' )
linesep = _parts.LineSeparators.detect_bytes( content_bytes )
linesep = __.detextive.LineSeparators.detect_bytes( content_bytes )
if linesep is None:
_scribe.warning( f"No line separator detected in '{location}'." )
linesep = _parts.LineSeparators( __.os.linesep )
linesep = __.detextive.LineSeparators( __.os.linesep )
try: content = content_bytes.decode( charset )
except Exception as exc:
raise ContentDecodeFailure( location, charset ) from exc
Expand All @@ -109,16 +115,22 @@ async def _acquire_via_http(
response.headers.get( 'content-type', 'application/octet-stream' )
.split( ';' )[ 0 ].strip( ) )
content_bytes = response.content
charset = response.encoding or _detect_charset( content_bytes )
charset = response.encoding or __.detextive.detect_charset(
content_bytes
)
if charset is None: raise ContentDecodeFailure( url, '???' )
if not _is_textual_mimetype( mimetype ):
mimetype, _ = (
_detect_mimetype_and_charset(
content_bytes, url, charset = charset ) )
linesep = _parts.LineSeparators.detect_bytes( content_bytes )
if not __.detextive.is_textual_mimetype( mimetype ):
mimetype, _ = __.detextive.detect_mimetype_and_charset(
content_bytes, url, charset = charset
)
if not __.detextive.is_textual_mimetype( mimetype ):
raise __.detextive.exceptions.TextualMimetypeInvalidity(
url, mimetype
)
linesep = __.detextive.LineSeparators.detect_bytes( content_bytes )
if linesep is None:
_scribe.warning( f"No line separator detected in '{url}'." )
linesep = _parts.LineSeparators( __.os.linesep )
linesep = __.detextive.LineSeparators( __.os.linesep )
try: content = content_bytes.decode( charset )
except Exception as exc:
raise ContentDecodeFailure( url, charset ) from exc
Expand Down Expand Up @@ -157,102 +169,6 @@ def _collect_directory_files(
return paths


def _detect_charset( content: bytes ) -> str | None:
    ''' Guesses character set of raw bytes, preferring UTF-8 where safe. '''
    from chardet import detect
    guess = detect( content )[ 'encoding' ]
    if guess is None or guess.startswith( 'utf' ): return guess
    if guess == 'ascii': return 'utf-8' # ASCII is a strict subset of UTF-8.
    # Detectors sometimes report exotic charsets (e.g. 'MacRoman') for
    # content which decodes cleanly as UTF-8; trial-decode to weed those out.
    try:
        content.decode( 'utf-8' )
    except UnicodeDecodeError:
        return guess
    return 'utf-8'


def _detect_mimetype( content: bytes, location: str | __.Path ) -> str | None:
    ''' Guesses MIME type from content magic bytes, else from location name. '''
    from mimetypes import guess_type
    from puremagic import PureError, from_string # pyright: ignore
    try:
        return from_string( content, mime = True )
    except ( PureError, ValueError ):
        # Magic-number sniff failed; fall back to the filename extension.
        return guess_type( str( location ) )[ 0 ]


def _detect_mimetype_and_charset(
    content: bytes,
    location: str | __.Path, *,
    mimetype: __.Absential[ str ] = __.absent,
    charset: __.Absential[ str ] = __.absent,
) -> tuple[ str, str | None ]:
    ''' Determines MIME type and charset for content at location.

        Explicitly supplied values override detection. Raises
        ``TextualMimetypeInvalidity`` when the content cannot be
        treated as text.
    '''
    from .exceptions import TextualMimetypeInvalidity
    mimetype_ = (
        _detect_mimetype( content, location )
        if __.is_absent( mimetype ) else mimetype )
    charset_ = (
        _detect_charset( content )
        if __.is_absent( charset ) else charset )
    if not mimetype_:
        if charset_:
            # Charset but no MIME type: assume plain text after trial decode.
            mimetype_ = 'text/plain'
            _validate_mimetype_with_trial_decode(
                content, location, mimetype_, charset_ )
            return mimetype_, charset_
        mimetype_ = 'application/octet-stream'
    if _is_textual_mimetype( mimetype_ ): return mimetype_, charset_
    if charset_ is None:
        raise TextualMimetypeInvalidity( location, mimetype_ )
    # Non-textual MIME type but a charset was detected: trial-decode
    # to decide whether the content may still be accepted as text.
    _validate_mimetype_with_trial_decode(
        content, location, mimetype_, charset_ )
    return mimetype_, charset_


def _is_reasonable_text_content( content: str ) -> bool:
''' Checks if decoded content appears to be meaningful text. '''
if not content: return False
# Check for excessive repetition of single characters (likely binary)
if len( set( content ) ) == 1: return False
# Check for excessive control characters (excluding common whitespace)
common_whitespace = '\t\n\r'
ascii_control_limit = 32
control_chars = sum(
1 for c in content
if ord( c ) < ascii_control_limit and c not in common_whitespace )
if control_chars > len( content ) * 0.1: return False # >10% control chars
# Check for reasonable printable character ratio
printable_chars = sum(
1 for c in content if c.isprintable( ) or c in common_whitespace )
return printable_chars >= len( content ) * 0.8 # >=80% printable


# MIME types that are considered textual beyond those starting with 'text/'.
_TEXTUAL_MIME_TYPES = frozenset( (
    'application/json',
    'application/xml',
    'application/xhtml+xml',
    'application/x-perl',
    'application/x-python',
    'application/x-php',
    'application/x-ruby',
    'application/x-shell',
    'application/javascript',
    'image/svg+xml',
) )
# MIME type suffixes that indicate textual content.
_TEXTUAL_SUFFIXES = ( '+xml', '+json', '+yaml', '+toml' )
def _is_textual_mimetype( mimetype: str ) -> bool:
    ''' Checks if MIME type represents textual content.

        Accepts any 'text/*' type, a known set of textual application
        types, and types with textual structured-syntax suffixes.
    '''
    _scribe.debug( f"MIME type: {mimetype}" )
    # 'text/' alone suffices: it already matches everything that the
    # previously-listed 'text/x-' prefix matched.
    if mimetype.startswith( 'text/' ): return True
    if mimetype in _TEXTUAL_MIME_TYPES: return True
    if mimetype.endswith( _TEXTUAL_SUFFIXES ):
        _scribe.debug(
            f"MIME type '{mimetype}' accepted due to textual suffix." )
        return True
    return False


def _produce_fs_tasks(
location: str | __.Path, recursive: bool = False
) -> tuple[ __.cabc.Coroutine[ None, None, _parts.Part ], ...]:
Expand All @@ -277,19 +193,3 @@ async def _execute_session( ) -> _parts.Part:
) as client: return await _acquire_via_http( client, url )

return _execute_session( )


def _validate_mimetype_with_trial_decode(
    content: bytes, location: str | __.Path, mimetype: str, charset: str
) -> None:
    ''' Validates content as text by trial-decoding with charset.

        Returns nothing on success. Raises ``TextualMimetypeInvalidity``
        when decoding fails or when the decoded text does not look like
        reasonable textual content. (Previous docstring incorrectly
        claimed a MIME type was returned.)
    '''
    from .exceptions import TextualMimetypeInvalidity
    try: text = content.decode( charset )
    except ( UnicodeDecodeError, LookupError ) as exc:
        raise TextualMimetypeInvalidity( location, mimetype ) from exc
    if not _is_reasonable_text_content( text ):
        raise TextualMimetypeInvalidity( location, mimetype )
    _scribe.debug(
        f"MIME type '{mimetype}' accepted after successful "
        f"decode test with charset '{charset}' for '{location}'." )
8 changes: 0 additions & 8 deletions sources/mimeogram/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,6 @@ def __init__( self, species: str ):
super( ).__init__( f"Could not discover valid {species}." )


class TextualMimetypeInvalidity( Omnierror ):
    ''' Invalid textual MIME type for content at location. '''

    def __init__( self, location: str | __.Path, mimetype: str ):
        message = (
            f"Invalid MIME type '{mimetype}' for content at '{location}'." )
        super( ).__init__( message )


class TokenizerVariantInvalidity( Omnierror ):
''' Invalid tokenizer variant. '''

Expand Down
2 changes: 1 addition & 1 deletion sources/mimeogram/formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def format_mimeogram(
location = 'mimeogram://message',
mimetype = 'text/plain', # TODO? Markdown
charset = 'utf-8',
linesep = _parts.LineSeparators.LF,
linesep = __.detextive.LineSeparators.LF,
content = message )
lines.append( format_part( message_part, boundary ) )
for part in parts:
Expand Down
8 changes: 5 additions & 3 deletions sources/mimeogram/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,19 @@ def _parse_descriptor_and_content(


_QUOTES = '"\''
def _parse_mimetype( header: str ) -> tuple[
    str, str, __.detextive.LineSeparators
]:
    ''' Extracts MIME type and charset from Content-Type header. '''
    segments = [ segment.strip( ) for segment in header.split( ';' ) ]
    mimetype = segments[ 0 ]
    charset = 'utf-8'
    linesep = __.detextive.LineSeparators.LF
    for parameter in segments[ 1: ]:
        if parameter.startswith( 'charset=' ):
            charset = parameter[ 8: ].strip( _QUOTES )
        elif parameter.startswith( 'linesep=' ):
            linesep = __.detextive.LineSeparators[
                parameter[ 8: ].strip( _QUOTES ).upper( ) ]
    return mimetype, charset, linesep

Expand Down
44 changes: 1 addition & 43 deletions sources/mimeogram/parts.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,48 +25,6 @@
from . import fsprotect as _fsprotect


class LineSeparators( __.enum.Enum ):
''' Line separators for various platforms. '''

CR = '\r' # Classic MacOS
CRLF = '\r\n' # DOS/Windows
LF = '\n' # Unix/Linux

@classmethod
def detect_bytes(
selfclass, content: bytes, limit = 1024
) -> "LineSeparators | None":
''' Detects newline characters in bytes array. '''
sample = content[ : limit ]
found_cr = False
for byte in sample:
match byte:
case 0xd:
if found_cr: return selfclass.CR
found_cr = True
case 0xa: # linefeed
if found_cr: return selfclass.CRLF
return selfclass.LF
case _:
if found_cr: return selfclass.CR
return None

@classmethod
def normalize_universal( selfclass, content: str ) -> str:
''' Normalizes all varieties of newline characters in text. '''
return content.replace( '\r\n', '\r' ).replace( '\r', '\n' )

def nativize( self, content: str ) -> str:
''' Nativizes specific variety newline characters in text. '''
if LineSeparators.LF is self: return content
return content.replace( '\n', self.value )

def normalize( self, content: str ) -> str:
''' Normalizes specific variety newline characters in text. '''
if LineSeparators.LF is self: return content
return content.replace( self.value, '\n' )


class Resolutions( __.enum.Enum ):
''' Available resolutions for each part. '''

Expand All @@ -79,7 +37,7 @@ class Part( __.immut.DataclassObject ):
location: str # TODO? 'Url' class
mimetype: str
charset: str
linesep: "LineSeparators"
linesep: __.detextive.LineSeparators
content: str

# TODO? 'format' method
Expand Down
2 changes: 1 addition & 1 deletion sources/mimeogram/updaters.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async def _update_content_atomic(
location: __.Path,
content: str,
charset: str = 'utf-8',
linesep: _parts.LineSeparators = _parts.LineSeparators.LF
linesep: __.detextive.LineSeparators = __.detextive.LineSeparators.LF
) -> None:
''' Updates file content atomically, if possible. '''
import aiofiles.os as os # noqa: PLR0402
Expand Down
12 changes: 0 additions & 12 deletions tests/test_000_mimeogram/test_100_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,6 @@ def test_070_mimeogram_format_failures( ):
assert reason in str( exc )


def test_080_mimetype_failures( ):
    ''' MIME type failure exceptions. '''
    exceptions = cache_import_module( f"{PACKAGE_NAME}.exceptions" )

    location = Path( '/test/path' )
    mimetype = 'application/octet-stream'
    exc = exceptions.TextualMimetypeInvalidity( location, mimetype )
    assert isinstance( exc, exceptions.Omnierror )
    rendition = str( exc )
    assert str( location ) in rendition
    assert mimetype in rendition


def test_090_program_absence( ):
''' Program absence error exceptions. '''
exceptions = cache_import_module( f"{PACKAGE_NAME}.exceptions" )
Expand Down
Loading
Loading