#! /usr/bin/env python
'''
Handle ISO 9660 file systems
'''
# NiemaFS imports
from niemafs.common import clean_string, FileSystem
# imports
from datetime import datetime
from math import ceil
from pathlib import Path
from struct import unpack
from warnings import warn
# constants
ISO9660_PVD_MAGIC_WORD = b'CD001' # standard identifier found at bytes 1-5 of every Volume Descriptor: https://wiki.osdev.org/ISO_9660#Volume_Descriptors
MAGIC_WORD_SEARCH_SIZE = 50000 # fallback search window if auto-detect fails
# Common optical disc sector layouts (physical sector bytes, user data offset, user data size)
# NOTE: order matters. Layout detection accepts the FIRST candidate whose sector 16 starts with a valid
# Primary Volume Descriptor, and that check cannot tell apart two candidates sharing the same
# (physical sector bytes, user data offset). For every such pair, the 2048-byte user data variant
# (the ISO 9660 logical block size) must therefore be listed before the larger variant.
COMMON_LAYOUT_CANDIDATES = [
    (2048, 0, 2048), # Standard ISO/UDF logical blocks
    (2352, 16, 2048), # CD-ROM Mode 1 raw: sync(12)+hdr(4)+data(2048)+...
    (2352, 16, 2336), # CD-ROM Mode 2 raw "formless" data payload after hdr (rare for ISO9660)
    (2352, 24, 2048), # CD-ROM XA Mode 2 Form 1 raw: +subhdr(8) then 2048
    (2352, 24, 2324), # CD-ROM XA Mode 2 Form 2 raw: +subhdr(8) then 2324
    (2352, 0, 2352), # CD-DA audio frame (Red Book)
    (2448, 16, 2048), # Mode 1 raw + 96B subchannel
    (2448, 16, 2336), # Mode 2 raw + 96B subchannel (formless)
    (2448, 24, 2048), # XA Form 1 raw + 96B subchannel
    (2448, 24, 2324), # XA Form 2 raw + 96B subchannel
    (2448, 0, 2352), # Audio + 96B subchannel (CDG/Karaoke/CD-DA+sub)
    (2340, 4, 2048), # Mode 1 without sync (hdr at 0..3, user at 4)
    (2340, 4, 2336), # Mode 2 without sync (hdr at 0..3, "data" after)
    (2336, 0, 2048), # Mode 1 sans sync+hdr: first 2048 are user, then EDC/zero/ECC (listed before the 2336 variant so it can actually be selected)
    (2336, 0, 2336), # Mode 2 payload-only (no sync/hdr)
    (2324, 0, 2324), # XA Form 2 user data only
    (2328, 0, 2324), # XA Form 2 "data + 4 spare" (libcdio notes 2328 = 2324+4 spare)
    (2332, 8, 2324), # XA "subheader + 2324" (8 + 2324) a.k.a. M2SUB_SECTOR_SIZE
    (2056, 8, 2048), # XA Form 1 cooked with subheader preserved (8 + 2048)
    (2052, 0, 2048), # Mode 1 cooked-ish with EDC preserved (2048 + 4)
    (2064, 16, 2048), # Mode 1 with only sync+hdr kept (12+4+2048)
    (2076, 16, 2048), # Mode 1 with sync+hdr+data+EDC+zero (12+4+2048+4+8)
]
# classes
class IsoFS(FileSystem):
    '''Class to represent an `ISO 9660 <https://wiki.osdev.org/ISO_9660>`_ optical disc

    ISO logical block addresses (LBAs) are mapped one-to-one onto the physical sectors of the image:
    block `n` is read from byte offset `n * physical_logical_block_size + user_data_offset`, and
    `user_data_size` bytes are taken from each sector. The sector layout is auto-detected by trying
    each entry of `COMMON_LAYOUT_CANDIDATES` until the block at LBA 16 looks like a Primary Volume Descriptor.
    '''
    def __init__(self, file_obj, path=None):
        '''Initialize the file system, detect its sector layout, and load its header structures.

        Args:
            `file_obj` (file-like): The image to read (handed to the `FileSystem` base class).

            `path`: Passed through unchanged to the `FileSystem` base class.

        Raises:
            `ValueError`: If `file_obj` is `None`, or if no known sector layout matches the image.
        '''
        # set things up
        if file_obj is None:
            raise ValueError("file_obj must be a file-like")
        super().__init__(path=path, file_obj=file_obj)
        self.logical_block_size = None # ISO logical block size (usually 2048)
        self.physical_logical_block_size = None # bytes per physical sector in the image (e.g., 2048, 2352, 2448)
        self.user_data_offset = None # byte offset of ISO user data within a physical sector
        self.user_data_size = None # bytes of ISO user data per physical sector (usually 2048)
        self.system_area = None # system area data
        self.volume_descriptors = dict() # keys = Volume Descriptor Type codes, values = bytes

        # detect sector layout and load header to ensure file validity up-front
        self.detect_layout()
        self.get_system_area()
        self.get_volume_descriptors()

        # if the PVD states a logical block size, honor it (it should be 2048 for ISO 9660)
        try:
            pvd = self.parse_primary_volume_descriptor()
        except Exception as e:
            # best-effort: the image might still be readable via other volume descriptors, so report the problem instead of failing
            warn("Unable to parse Primary Volume Descriptor while initializing: %s" % e)
        else:
            if pvd is not None:
                lbs = pvd.get('logical_block_size_LE')
                if isinstance(lbs, int) and lbs > 0:
                    self.logical_block_size = lbs

    @staticmethod
    def tz_offset_to_datetime_str(x):
        '''Convert an ISO 9660 timezone offset to a `datetime` format string

        The value is interpreted as a count of 15-minute intervals starting at GMT-12:00, i.e., `0` becomes `-1200` and `48` becomes `+0000`.

        Args:
            `x` (`int`): The ISO 9660 timezone offset.

        Returns:
            `str`: The offset formatted as sign + `HH` + `MM` (the form the `%z` directive parses).
        '''
        # NOTE(review): ECMA-119 appears to define this byte as a *signed* number of 15-minute intervals from GMT
        # (so 0 would be GMT itself). The interpretation below is kept unchanged -- confirm against the standard.
        tz_offset_hours = (x / 4) - 12
        tz_offset_sign = '-' if tz_offset_hours < 0 else '+'
        tz_offset_hh = str(abs(int(tz_offset_hours))).zfill(2)
        tz_offset_mm = str(int((abs(tz_offset_hours) % 1) * 60)).zfill(2) # fractional hours are multiples of 0.25 --> 00, 15, 30, or 45
        return '%s%s%s' % (tz_offset_sign, tz_offset_hh, tz_offset_mm)

    @staticmethod
    def parse_pvd_datetime(data):
        '''Parse a date/time in the `ISO 9660 Primary Volume Descriptor (PVD) date/time format <https://wiki.osdev.org/ISO_9660#Date/time_format>`_.

        Args:
            `data` (`bytes`): A date/time (exactly 17 bytes) in the ISO 9660 PVD date/time format.

        Returns:
            `datetime`: A Python `datetime` object (timezone-aware if the timezone byte yields a parseable offset, otherwise naive).

        Raises:
            `ValueError`: If `data` is not exactly 17 bytes, or if the digits do not form a valid date/time.
        '''
        if len(data) != 17:
            raise ValueError("ISO 9660 PVD date/time must be exactly 17 bytes: %s" % data)
        # bytes 0-15 are normally ASCII digits; raw values below '0' (chr(48)) are written out as their number (e.g., NUL bytes of an unset field become '0')
        # the 16 digits end in hundredths of a second, so '0000' pads them to the 6 digits `%f` consumes
        dt_str = ''.join(str(v) if v < 48 else chr(v) for v in data[0:16]) + '0000'
        tz_offset_str = IsoFS.tz_offset_to_datetime_str(data[16])
        try:
            return datetime.strptime(dt_str + tz_offset_str, '%Y%m%d%H%M%S%f%z')
        except ValueError:
            return datetime.strptime(dt_str, '%Y%m%d%H%M%S%f') # timezone is sometimes messed up

    @staticmethod
    def parse_directory_datetime(data):
        '''Parse a date/time in the `ISO 9660 directory record date/time format <https://wiki.osdev.org/ISO_9660#Directories>`_.

        Args:
            `data` (`bytes`): A date/time (exactly 7 bytes) in the ISO 9660 directory record date/time format.

        Returns:
            `datetime`: A Python `datetime` object (timezone-aware if the timezone byte yields a parseable offset, otherwise naive).

        Raises:
            `ValueError`: If the fields do not form a valid date/time.

            `IndexError`: If `data` has fewer than 7 bytes.
        '''
        # byte 0 = years since 1900; bytes 1-5 = month, day, hour, minute, second (each zero-padded to 2 digits)
        dt_str = str(data[0] + 1900) + ''.join(str(x).zfill(2) for x in data[1:6])
        tz_offset_str = IsoFS.tz_offset_to_datetime_str(data[6])
        # this format has no sub-second field, so the pattern must not contain `%f`: with `%f` present,
        # strptime splits the two seconds digits between `%S` and `%f` (e.g., "59" --> 5 s + 900000 us)
        try:
            return datetime.strptime(dt_str + tz_offset_str, '%Y%m%d%H%M%S%z')
        except ValueError:
            return datetime.strptime(dt_str, '%Y%m%d%H%M%S') # timezone is sometimes messed up

    @staticmethod
    def parse_directory_record(data):
        '''Parse an `ISO 9660 directory record <https://wiki.osdev.org/ISO_9660#Directories>`_.

        Args:
            `data` (`bytes`): The raw bytes of the directory record.

        Returns:
            `dict`: The parsed directory record. `'file_flags'` is a `dict` of `bool` values, `'filename'` is cleaned via `clean_string`, and `'datetime'` is a `datetime` (each of the latter two is left as raw `bytes`, with a warning, if it cannot be converted).
        '''
        # parse directory record data
        out = dict()
        out['directory_record_length'] = data[0] # should be equal to len(data)
        out['extended_attribute_record_length'] = data[1] # extended attribute record length
        out['data_location_LE'] = unpack('<I', data[2:6])[0] # location (LBA) of data (little-endian)
        out['data_location_BE'] = unpack('>I', data[6:10])[0] # location (LBA) of data (big-endian) (should be equal to previous)
        out['data_length_LE'] = unpack('<I', data[10:14])[0] # length of data (little-endian)
        out['data_length_BE'] = unpack('>I', data[14:18])[0] # length of data (big-endian) (should be equal to previous)
        out['datetime'] = data[18:25] # recording date and time
        raw_flags = data[25] # file flags (bit field, expanded below)
        out['interleave_file_unit_size'] = data[26] # file unit size for files recorded in interleaved mode (otherwise 0)
        out['interleave_gap_size'] = data[27] # gap size for files recorded in interleaved mode (otherwise 0)
        out['volume_sequence_number_LE'] = unpack('<H', data[28:30])[0] # volume this extent is recorded on (little-endian)
        out['volume_sequence_number_BE'] = unpack('>H', data[30:32])[0] # volume this extent is recorded on (big-endian) (should be equal to previous)
        out['filename_length'] = data[32] # length of filename (terminated with ';1' where 1 is file version number)
        out['filename'] = data[33 : 33+out['filename_length']] # filename (terminated with ';1' where 1 is file version number)
        # the "padding field" and "system use" fields that may follow the filename are intentionally not parsed

        # parse file flags
        out['file_flags'] = {
            'is_hidden': bool(raw_flags & 0b00000001),
            'is_directory': bool(raw_flags & 0b00000010),
            'is_associated_file': bool(raw_flags & 0b00000100),
            'format_in_extended_attribute': bool(raw_flags & 0b00001000),
            'permissions_in_extended_attribute': bool(raw_flags & 0b00010000),
            'reserved_5': bool(raw_flags & 0b00100000),
            'reserved_6': bool(raw_flags & 0b01000000),
            'not_final_directory': bool(raw_flags & 0b10000000),
        }

        # clean strings
        for k in ['filename']:
            try:
                out[k] = clean_string(out[k])
            except Exception:
                warn("Unable to parse Directory Record '%s' as string: %s" % (k, out[k]))

        # parse date-times
        for k in ['datetime']:
            try:
                out[k] = IsoFS.parse_directory_datetime(out[k])
            except Exception:
                warn("Unable to parse Directory Record '%s' as date-time: %s" % (k, out[k]))

        # return final parsed data
        return out

    def read_user_blocks(self, lba, count=1):
        '''Read ISO logical blocks (user data blocks) starting at a specific LBA, returning concatenated user data.

        Args:
            `lba` (`int`): The first LBA of the read.

            `count` (`int`): The number of ISO logical blocks to read.

        Returns:
            `bytes`: The read data (empty if `count` is not positive).
        '''
        if count <= 0:
            return b''
        out = bytearray()
        for i in range(count):
            # each LBA lives in its own physical sector; skip that sector's leading non-user bytes (sync/header/subheader)
            phys_off = (lba + i) * self.physical_logical_block_size + self.user_data_offset
            out.extend(self.read_file(phys_off, self.user_data_size)) # `read_file` is not defined in this class (presumably provided by `FileSystem`)
        return bytes(out)

    def read_extent(self, lba, length):
        '''Read bytes from the ISO extent starting at a specific LBA (in user-data LBAs).

        Args:
            `lba` (`int`): The first LBA of the read.

            `length` (`int`): The number of bytes to read.

        Returns:
            `bytes`: The read data (empty if `length` is not positive).
        '''
        if length <= 0:
            return b''
        blocks = ceil(length / self.user_data_size) # whole blocks are read, then trimmed to the requested length
        data = self.read_user_blocks(lba, blocks)
        return data[:length]

    def looks_like_pvd(self, block: bytes) -> bool:
        '''Validate the start of an ISO 9660 Volume Descriptor block.

        Only the first 7 bytes are inspected: type code 1, the standard identifier (`ISO9660_PVD_MAGIC_WORD`), and version 1.

        Args:
            `block` (`bytes`): The block to validate.

        Returns:
            `bool`: `True` if the block looks valid, otherwise `False`.
        '''
        return (block is not None) and (len(block) > 6) and (block[0] == 1) and (block[1:6] == ISO9660_PVD_MAGIC_WORD) and (block[6] == 1)

    def detect_layout(self):
        '''Detect physical sector size, user data offset, and user data size by validating the PVD at LBA 16.

        Does nothing if a layout has already been detected. On failure, the layout attributes are reset to `None` so that a later call tries again rather than trusting a rejected candidate.

        Raises:
            `ValueError`: If no entry of `COMMON_LAYOUT_CANDIDATES` yields a valid PVD at LBA 16.
        '''
        if self.physical_logical_block_size is not None:
            return

        # try common known layouts by reading LBA 16 (PVD location); the first candidate that works wins
        for (phys, off, udsz) in COMMON_LAYOUT_CANDIDATES:
            try:
                self.physical_logical_block_size = phys
                self.user_data_offset = off
                self.user_data_size = udsz
                self.logical_block_size = udsz # provisional value (the constructor replaces it with the size stated in the PVD)
                pvd = self.read_user_blocks(16, 1)
                if self.looks_like_pvd(pvd):
                    return
            except Exception:
                pass # a candidate that cannot even be read simply does not match

        # if we reach here, we failed (unknown layout): do not leave the last rejected candidate behind
        self.physical_logical_block_size = None
        self.user_data_offset = None
        self.user_data_size = None
        self.logical_block_size = None
        raise ValueError("ISO layout does not match known existing layouts")

    def get_physical_logical_block_size(self):
        '''Return the ISO physical logical block size.

        Returns:
            `int`: The ISO physical logical block size.
        '''
        if self.physical_logical_block_size is None:
            self.detect_layout()
        return self.physical_logical_block_size

    def get_user_data_offset(self):
        '''Return the ISO user data offset.

        Returns:
            `int`: The ISO user data offset.
        '''
        if self.user_data_offset is None:
            self.detect_layout()
        return self.user_data_offset

    def get_user_data_size(self):
        '''Return the ISO user data size.

        Returns:
            `int`: The ISO user data size.
        '''
        if self.user_data_size is None:
            self.detect_layout()
        return self.user_data_size

    def get_logical_block_size(self):
        '''Return the ISO logical block size.

        Returns:
            `int`: The ISO logical block size in bytes.
        '''
        if self.logical_block_size is None:
            self.detect_layout()
        return self.logical_block_size

    def get_system_area(self):
        '''Return the System Area (logical sectors 0x00-0x0F = first 16 sectors) of the ISO.

        Returns:
            `bytes`: The System Area (first 16 ISO logical blocks).
        '''
        if self.system_area is None:
            self.system_area = self.read_user_blocks(0, 16)
        return self.system_area

    def get_volume_descriptors(self):
        '''Return the Volume Descriptors of the ISO.

        Descriptors are read from LBA 16 onwards until the Volume Descriptor Set Terminator (type code 255). Reading also stops, with a warning, at the first block that is too short or lacks the standard identifier. If a type code occurs more than once, the last occurrence is kept.

        Returns:
            `dict`: Keys are `Volume Descriptor Type codes <https://wiki.osdev.org/ISO_9660#Volume_Descriptor_Type_Codes>`_, and values are `bytes` of the corresponding volume descriptor.
        '''
        if len(self.volume_descriptors) == 0:
            lba = 16 # Volume Descriptors begin at LBA 16 and continue until type code 255
            while True:
                next_volume_descriptor = self.read_user_blocks(lba, 1)
                if len(next_volume_descriptor) < 7 or next_volume_descriptor[1:6] != ISO9660_PVD_MAGIC_WORD:
                    # end of image or not a descriptor: stop instead of indexing an empty read or scanning the rest of the image
                    warn("Volume Descriptor at LBA %d does not look like an ISO 9660 descriptor" % lba)
                    break
                self.volume_descriptors[next_volume_descriptor[0]] = next_volume_descriptor
                if next_volume_descriptor[0] == 255: # Volume Descriptor Set Terminator
                    break
                lba += 1
        return self.volume_descriptors

    def get_boot_record(self):
        '''Return the Boot Record (Volume Descriptor code 0) of the ISO.

        Returns:
            `bytes`: The Boot Record (Volume Descriptor code 0) of the ISO, or `None` if the ISO does not have one.
        '''
        return self.get_volume_descriptors().get(0)

    def get_primary_volume_descriptor(self):
        '''Return the Primary Volume Descriptor (PVD; Volume Descriptor code 1) of the ISO.

        Returns:
            `bytes`: The Primary Volume Descriptor (PVD; Volume Descriptor code 1) of the ISO, or `None` if the ISO does not have one.
        '''
        return self.get_volume_descriptors().get(1)

    def get_supplementary_volume_descriptor(self):
        '''Return the Supplementary Volume Descriptor (Volume Descriptor code 2) of the ISO.

        Returns:
            `bytes`: The Supplementary Volume Descriptor (Volume Descriptor code 2) of the ISO, or `None` if the ISO does not have one.
        '''
        return self.get_volume_descriptors().get(2)

    def get_volume_partition_descriptor(self):
        '''Return the Volume Partition Descriptor (Volume Descriptor code 3) of the ISO.

        Returns:
            `bytes`: The Volume Partition Descriptor (Volume Descriptor code 3) of the ISO, or `None` if the ISO does not have one.
        '''
        return self.get_volume_descriptors().get(3)

    def get_volume_descriptor_set_terminator(self):
        '''Return the Volume Descriptor Set Terminator (Volume Descriptor code 0xFF = 255) of the ISO.

        Unlike the other descriptor getters, a missing terminator is reported as an error rather than as `None`.

        Returns:
            `bytes`: The Volume Descriptor Set Terminator (Volume Descriptor code 0xFF = 255) of the ISO.

        Raises:
            `ValueError`: If the ISO does not have a Volume Descriptor Set Terminator.
        '''
        try:
            return self.get_volume_descriptors()[255]
        except KeyError:
            raise ValueError("ISO does not have a Volume Descriptor Set Terminator")

    def parse_boot_record(self):
        '''Return a parsed version of the `Boot Record <https://wiki.osdev.org/ISO_9660#The_Boot_Record>`_ of the ISO.

        Returns:
            `dict`: A parsed version of the Boot Record of the ISO, or `None` if the ISO does not have one.
        '''
        # set things up
        br = self.get_boot_record()
        if br is None:
            return None
        out = dict()

        # parse raw Boot Record data
        out['type_code'] = br[0] # should always be 0
        out['identifier'] = br[1:6] # should always be "CD001"
        out['version'] = br[6] # should always be 1?
        out['boot_system_identifier'] = br[7:39] # ID of the system which can act on and boot the system from the boot record
        out['boot_identifier'] = br[39:71] # ID of the boot system defined in the rest of this descriptor
        out['boot_system_use'] = br[71:] # Custom - used by the boot system

        # clean strings (a field that cannot be cleaned is left as raw bytes)
        for k in ['identifier', 'boot_system_identifier', 'boot_identifier']:
            try:
                out[k] = clean_string(out[k])
            except Exception:
                warn("Unable to parse Boot Record '%s' as string: %s" % (k, out[k]))

        # return final parsed data
        return out

    def parse_primary_volume_descriptor(self):
        '''Return a parsed version of the `Primary Volume Descriptor (PVD) <https://wiki.osdev.org/ISO_9660#The_Primary_Volume_Descriptor>`_ of the ISO.

        String and date/time fields that cannot be converted are left as raw `bytes` (with a warning). The root directory entry must parse successfully; any error from parsing it propagates to the caller.

        Returns:
            `dict`: A parsed version of the Primary Volume Descriptor (PVD) of the ISO, or `None` if the ISO does not have one.
        '''
        # set things up
        pvd = self.get_primary_volume_descriptor()
        if pvd is None:
            return None
        out = dict()

        # parse raw PVD data
        out['type_code'] = pvd[0] # should always be 1
        out['identifier'] = pvd[1:6] # should always be "CD001"
        out['version'] = pvd[6] # should always be 1?
        out['offset_7'] = pvd[7] # should always be 0
        out['system_identifier'] = pvd[8:40] # Name of the system that can act upon sectors 0x00-0x0F for the volume
        out['volume_identifier'] = pvd[40:72] # Identification (label) of this volume
        out['offsets_72_79'] = pvd[72:80] # should always be all 0s
        out['volume_space_size_LE'] = unpack('<I', pvd[80:84])[0] # Volume Space Size (little-endian)
        out['volume_space_size_BE'] = unpack('>I', pvd[84:88])[0] # Volume Space Size (big-endian) (should be equal to previous)
        out['offsets_88_119'] = pvd[88:120] # should always be all 0s
        out['volume_set_size_LE'] = unpack('<H', pvd[120:122])[0] # Volume Set Size (little-endian)
        out['volume_set_size_BE'] = unpack('>H', pvd[122:124])[0] # Volume Set Size (big-endian) (should be equal to previous)
        out['volume_sequence_number_LE'] = unpack('<H', pvd[124:126])[0] # Volume Sequence Number (little-endian)
        out['volume_sequence_number_BE'] = unpack('>H', pvd[126:128])[0] # Volume Sequence Number (big-endian) (should be equal to previous)
        out['logical_block_size_LE'] = unpack('<H', pvd[128:130])[0] # Logical Block Size (little-endian)
        out['logical_block_size_BE'] = unpack('>H', pvd[130:132])[0] # Logical Block Size (big-endian) (should be equal to previous)
        out['path_table_size_LE'] = unpack('<I', pvd[132:136])[0] # Path Table Size (little-endian)
        out['path_table_size_BE'] = unpack('>I', pvd[136:140])[0] # Path Table Size (big-endian) (should be equal to previous)
        out['location_L_path_table'] = unpack('<I', pvd[140:144])[0] # Location of Type-L Path Table
        out['location_optional_L_path_table'] = unpack('<I', pvd[144:148])[0] # Location of Optional Type-L Path Table
        out['location_M_path_table'] = unpack('>I', pvd[148:152])[0] # Location of Type-M Path Table
        out['location_optional_M_path_table'] = unpack('>I', pvd[152:156])[0] # Location of Optional Type-M Path Table
        out['root_directory_entry'] = pvd[156:190] # Directory Entry for Root Directory
        out['volume_set_identifier'] = pvd[190:318] # Volume Set Identifier
        out['publisher_identifier'] = pvd[318:446] # Publisher Identifier
        out['data_preparer_identifier'] = pvd[446:574] # Data Preparer Identifier
        out['application_identifier'] = pvd[574:702] # Application Identifier
        out['copyright_file_identifier'] = pvd[702:739] # Copyright File Identifier
        out['abstract_file_identifier'] = pvd[739:776] # Abstract File Identifier
        out['bibliographic_file_identifier'] = pvd[776:813] # Bibliographic File Identifier
        out['volume_creation_datetime'] = pvd[813:830] # Volume Creation Date and Time
        out['volume_modification_datetime'] = pvd[830:847] # Volume Modification Date and Time
        out['volume_expiration_datetime'] = pvd[847:864] # Volume Expiration Date and Time
        out['volume_effective_datetime'] = pvd[864:881] # Volume Effective Date and Time
        out['file_structure_version'] = pvd[881] # File Structure Version (should always be 1)
        out['offset_882'] = pvd[882] # should always be 0
        out['application_used'] = pvd[883:1395] # Application Used (not defined by ISO 9660)
        out['reserved'] = pvd[1395:2048] # Reserved by ISO

        # clean strings
        for k in ['identifier', 'system_identifier', 'volume_identifier', 'volume_set_identifier', 'publisher_identifier', 'data_preparer_identifier', 'application_identifier', 'copyright_file_identifier', 'abstract_file_identifier', 'bibliographic_file_identifier']:
            try:
                out[k] = clean_string(out[k])
            except Exception:
                warn("Unable to parse Primary Volume Descriptor '%s' as string: %s" % (k, out[k]))

        # parse date-times
        for k in ['volume_creation_datetime', 'volume_modification_datetime', 'volume_expiration_datetime', 'volume_effective_datetime']:
            try:
                out[k] = IsoFS.parse_pvd_datetime(out[k])
            except Exception:
                warn("Unable to parse PVD '%s' as date-time: %s" % (k, out[k]))

        # parse root directory entry (this must succeed to be able to parse files in this ISO)
        out['root_directory_entry'] = IsoFS.parse_directory_record(out['root_directory_entry'])

        # return final parsed data
        return out

    def parse_volume_descriptor_set_terminator(self):
        '''Return a parsed version of the `Volume Descriptor Set Terminator <https://wiki.osdev.org/ISO_9660#Volume_Descriptor_Set_Terminator>`_ of the ISO.

        Returns:
            `dict`: A parsed version of the Volume Descriptor Set Terminator of the ISO.

        Raises:
            `ValueError`: If the ISO does not have a Volume Descriptor Set Terminator (raised by `get_volume_descriptor_set_terminator`).
        '''
        # set things up
        vdst = self.get_volume_descriptor_set_terminator()
        if vdst is None: # defensive only: the getter currently raises instead of returning `None`
            return None
        out = dict()

        # parse raw VDST data
        out['type_code'] = vdst[0] # should always be 255
        out['identifier'] = vdst[1:6] # should always be "CD001"
        out['version'] = vdst[6] # should always be 1
        out['unused'] = vdst[7:] # remaining bytes are not part of ISO 9660

        # clean strings
        for k in ['identifier']:
            try:
                out[k] = clean_string(out[k])
            except Exception:
                warn("Unable to parse Volume Descriptor Set Terminator '%s' as string: %s" % (k, out[k]))

        # return final parsed data
        return out

    def __iter__(self):
        '''Iterate over the directories and files of the ISO, starting from the root directory of the PVD.

        Yields:
            `tuple`: `(path, datetime, data)`, where `path` is a `Path` relative to the root, `datetime` is the entry's parsed recording date/time, and `data` is the file contents as `bytes` (or `None` for a directory). The root directory itself is not yielded. Nothing is yielded if the ISO has no PVD.
        '''
        # load root directory entry from PVD
        pvd = self.parse_primary_volume_descriptor()
        if pvd is None:
            return
        sector_size = self.user_data_size # directory data is a concatenation of chunks of this size (one per LBA)
        to_visit = [(Path(''), pvd['root_directory_entry'])] # (Path, directory entry) tuples

        # perform search starting from root directory (`to_visit` only contains directories, not files)
        while len(to_visit) != 0:
            # handle current directory
            curr_path, curr_directory_entry = to_visit.pop()
            if curr_path != Path(''):
                yield (curr_path, curr_directory_entry['datetime'], None)

            # read directory data (extent) using ISO LBAs (data_location_LE)
            curr_data = self.read_extent(curr_directory_entry['data_location_LE'], curr_directory_entry['data_length_LE'])
            ind = 0
            while ind < len(curr_data):
                next_len = curr_data[ind]
                if next_len == 0:
                    # a record never straddles a sector boundary: a zero length byte marks padding up to the end
                    # of the current sector, so resume at the start of the next one instead of stopping early
                    if not sector_size:
                        break
                    ind = ((ind // sector_size) + 1) * sector_size
                    continue
                next_entry = IsoFS.parse_directory_record(curr_data[ind:ind + next_len])
                next_entry_fn = next_entry['filename']

                # next entry is a directory (add it to `to_visit`)
                if next_entry['file_flags']['is_directory']:
                    if next_entry_fn not in {'', '\x01'}: # ignore '.' and '..'
                        to_visit.append((curr_path / next_entry_fn, next_entry))

                # next entry is a file (yield it)
                else:
                    next_data = self.read_extent(next_entry['data_location_LE'], next_entry['data_length_LE'])
                    yield (curr_path / next_entry_fn, next_entry['datetime'], next_data)
                ind += next_len