Source code for niemafs.common
#! /usr/bin/env python
'''
Common variables, classes, functions, etc.
'''
# standard imports
from abc import ABC, abstractmethod
from gzip import open as gopen
from io import DEFAULT_BUFFER_SIZE
from pathlib import Path
from sys import stdin, stdout
# constants
DEFAULT_BUFFER_SIZE = 8388608 # 8 MB
DEFAULT_COMPRESS_LEVEL = 9
SAFE_CHARS = set('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._-')
[docs]
def clean_string(s):
'''Clean a string (binary or normal) by right-stripping 0x00 and spaces.
Args:
`s` (`bytes`): The ISO 9660 string to clean.
Returns:
`str`: The cleaned string.
'''
if isinstance(s, bytes):
return s.rstrip(b'\x00').decode().rstrip()
else:
return s.rstrip()
[docs]
def safename(s):
'''Convert a string into a version that is safe for a filename
Args:
`s` (`str`): The original string.
Returns:
`str`: A version of `s` that is safe for a filename.
'''
return ''.join(c if c in SAFE_CHARS else '_' for c in s)
[docs]
def open_file(path, mode='rb', buffering=DEFAULT_BUFFER_SIZE, compresslevel=DEFAULT_COMPRESS_LEVEL):
'''Open a file for reading, writing, or appending. Automatically handles GZIP compression.
Args:
`path` (`Path`): The path of the file, or `None` for `stdin`/`stdout`.
`mode` (`str`): The mode in which to open the file.
`buffering` (`int`): The buffer size for buffered input/output.
Returns:
`file`-like object
'''
mode = mode.strip().lower()
if path is None:
if 'r' in mode:
return stdin
else:
return stdout
if isinstance(path, str):
path = Path(path)
ext = path.suffix.strip().lower()
if ext == '.gz':
return gopen(path, mode=mode, compresslevel=compresslevel)
else:
return open(path, mode=mode, buffering=buffering)
[docs]
class FileSystem(ABC):
'''Base class to represent a file system'''
def __init__(self, path=None, file_obj=None):
'''Initialize this `FileSystem` object.
Args:
`path` (`Path`): The path of this `FileSystem` object (e.g. the file on disk, or directory if it's a folder on disk).
`file_obj` (`file`-like): The input stream of data for this `FileSystem`, or `None` if the `FileSystem` will be a folder on disk. Use `open` for files on disk, `gzip.open` for GZIP files on disk, `io.BytesIO` for bytes in-memory, etc.
'''
self.path = path
self.file = file_obj
@abstractmethod
def __iter__(self):
'''Iterate over the files and folders in this `FileSystem`
Yields:
Each file or folder in this `FileSystem` as a `tuple` containing the following elements: (1) the `Path` of the file/folder within this `FileSystem`, (2) the modification timestamp of this file/folder, and (3) the `bytes` of data for files or `None` for folders.
'''
pass
[docs]
def read_file(self, offset, length=None, return_to_init=False):
'''Read data from the underlying `file`-like object.
Args:
`offset` (`int`): The offset from which to start reading.
`length` (`int`): The number of bytes to read, or `None` to read to the end.
`return_to_init` (`bool`): `True` to seek back to the initial offset after finishing the read, otherwise `False` (faster)
Returns:
`bytes`: The read data.
'''
if return_to_init:
start_offset = self.file.tell()
self.file.seek(offset)
data = self.file.read(length)
if return_to_init:
self.file.seek(start_offset)
return data