# Source code for niemafs.hfs

#! /usr/bin/env python
'''
Handle Apple HFS file systems
'''

# NiemaFS imports
from niemafs.common import FileSystem

# imports
from codecs import lookup as lookup_codec
from datetime import datetime, timedelta
from math import ceil
from pathlib import Path
from struct import unpack
from warnings import warn

# constants
# Timestamps are converted by adding a number of seconds to this starting point.
MACTIME_START = datetime(year=1904, month=1, day=1)

# Unit (in bytes) used for the MDB location and the first-allocation-block offset.
HFS_LOGICAL_BLOCK_SIZE = 512

# Two-byte signatures compared against the start of on-disk structures.
HFS_MDB_SIGNATURE = b'BD'   # classic HFS MDB signature
HFS_DDR_SIGNATURE = b'ER'   # Apple Driver Descriptor Record
HFS_APM_SIGNATURE = b'PM'   # Apple Partition Map entry

# Catalog node IDs used by this module.
HFS_ROOT_CNID = 2       # where directory traversal starts
HFS_EXTENTS_CNID = 3    # key under which the extents-overflow file looks itself up
HFS_CATALOG_CNID = 4    # key under which the catalog file's extra extents are looked up

# Fork-type values used as part of extents-overflow lookup keys.
HFS_DATA_FORK = 0x00
HFS_RESOURCE_FORK = 0xFF

# How many bytes from the start of the stream are searched for signatures (1 MiB).
HFS_SIGNATURE_SCAN_SIZE = 1 << 20

# Codec used to decode names when the caller does not pick one.
DEFAULT_HFS_TEXT_ENCODING = 'cp932'

# Each candidate is a tuple of:
#   (physical sector size, user-data offset within the sector, user-data bytes per sector)
# Candidates are tried in the order listed here.
# The 2352/16 case is what makes an APM/DDR "ER" appear at file offset 0x10.
COMMON_HFS_LAYOUT_CANDIDATES = [
    # raw HFS with 512 sector size
    (512, 0, 512),
    # raw HFS with 2048 sector size
    (2048, 0, 2048),
    # CD-ROM Mode 1 raw
    (2352, 16, 2048),
    # CD-ROM XA Form 1 raw
    (2352, 24, 2048),
    # raw + subchannel
    (2448, 16, 2048),
    # XA raw + subchannel
    (2448, 24, 2048),
    # no sync, 4-byte header
    (2340, 4, 2048),
    # sync/header + data
    (2064, 16, 2048),
    # sync/header + data + EDC/zero
    (2076, 16, 2048),
]


class HfsBTree:
    '''Class to represent HFS B-Tree

    The tree is parsed from an in-memory copy of the whole B-tree file. Node 0
    must be a header node; its first record supplies the tree geometry, which is
    stored on the instance (`depth`, `root`, `record_count`, `first_leaf`,
    `last_leaf`, `node_size`, `key_len`, `node_count`, `free_nodes`).

    Structural problems in the data are reported as `ValueError`.
    '''
    NODE_HEADER_SIZE = 14       # bytes at the start of a node that records may not overlap
    NODE_TYPE_HEADER = 0x01
    NODE_TYPE_LEAF = 0xFF

    def __init__(self, data=b''):
        '''Parse the header node of a B-tree

        Args:
            `data` (`bytes`): The raw contents of the B-tree file

        Raises:
            `ValueError`: If node 0 is missing, is not a header node, has no records, or has a header record that is too short, or if the declared node size is too small
        '''
        # set things up
        self.data = data
        self.node_size = 512    # used to read node 0; replaced by the header's value below

        # parse B-tree header
        header = self.read_node(0)
        desc = self.node_descriptor(header)
        if desc['type'] != self.NODE_TYPE_HEADER:
            raise ValueError("HFS B-tree node 0 is not a header node")
        header_records = self.node_records(header)
        if not header_records:
            # without this check, a zero record count would surface as IndexError
            raise ValueError("HFS B-tree header node has no records")
        rec0 = header_records[0]
        if len(rec0) < 106:
            raise ValueError("HFS B-tree header record is too short")
        self.depth =        unpack('>H', rec0[0:2])[0]
        self.root =         unpack('>I', rec0[2:6])[0]
        self.record_count = unpack('>I', rec0[6:10])[0]
        self.first_leaf =   unpack('>I', rec0[10:14])[0]
        self.last_leaf =    unpack('>I', rec0[14:18])[0]
        self.node_size =    unpack('>H', rec0[18:20])[0] or 512
        self.key_len =      unpack('>H', rec0[20:22])[0]
        self.node_count =   unpack('>I', rec0[22:26])[0]
        self.free_nodes =   unpack('>I', rec0[26:30])[0]

        # a node must hold at least its descriptor plus one 2-byte offset entry;
        # anything smaller would make the descriptor/offset parsing read out of bounds
        if self.node_size < self.NODE_HEADER_SIZE + 2:
            raise ValueError("HFS B-tree node size is too small")

    def read_node(self, node_number):
        '''Read a B-Tree node

        Args:
            `node_number` (`int`): The node number to read

        Returns:
            `bytes`: The node that was read

        Raises:
            `ValueError`: If the data does not contain the complete node
        '''
        start = node_number * self.node_size
        end = start + self.node_size
        node = self.data[start:end]
        if len(node) != self.node_size:
            raise ValueError("HFS B-tree node %d is incomplete" % node_number)
        return node

    def node_descriptor(self, node):
        '''Parse a descriptor of a node

        Args:
            `node` (`bytes`): The node to parse

        Returns:
            `dict`: The parsed node descriptor, with keys `flink`, `blink`, `type`, `height`, and `record_count`
        '''
        return {
            'flink':        unpack('>I', node[0:4])[0],
            'blink':        unpack('>I', node[4:8])[0],
            'type':         node[8],
            'height':       node[9],
            'record_count': unpack('>H', node[10:12])[0],
        }

    def node_records(self, node):
        '''Parse the records of a node

        Record boundaries come from a table of big-endian 16-bit offsets stored
        backward from the end of the node: entry `i` is the start of record `i`,
        and entry `i + 1` is its end.

        Args:
            `node` (`bytes`): The node to parse

        Returns:
            `list`: The parsed node records (each a `bytes` slice of `node`)

        Raises:
            `ValueError`: If the record count or any record offset is inconsistent with the node size
        '''
        n = unpack('>H', node[10:12])[0]

        # The table needs n + 1 entries. If it cannot fit after the descriptor, the
        # slices below would run off the front of the node (negative indices) and
        # fail with struct.error instead of a clear ValueError.
        if self.NODE_HEADER_SIZE + 2 * (n + 1) > self.node_size:
            raise ValueError("Bad HFS B-tree record count")

        offsets = [
            unpack('>H', node[self.node_size - 2 * (i + 1): self.node_size - 2 * i])[0]
            for i in range(n + 1)
        ]
        out = list()
        for i in range(n):
            start, end = offsets[i], offsets[i + 1]
            if start < self.NODE_HEADER_SIZE or end < start or end > self.node_size:
                raise ValueError("Bad HFS B-tree record offsets")
            out.append(node[start:end])
        return out

    def leaf_records(self):
        '''Iterate over the leaf records

        Walks the leaf chain from `first_leaf` by following each node's forward
        link. The walk stops at node number 0, at an already-visited node (which
        guards against cyclic links), at a node that is not a leaf, or after
        `last_leaf` has been processed.

        Yields:
            `bytes`: Each raw leaf record, in chain order
        '''
        node_number = self.first_leaf
        seen = set()
        while node_number and node_number not in seen:
            seen.add(node_number)
            node = self.read_node(node_number)
            desc = self.node_descriptor(node)
            if desc['type'] != self.NODE_TYPE_LEAF:
                break
            for rec in self.node_records(node):
                yield rec
            if node_number == self.last_leaf:
                break
            node_number = desc['flink']


class HfsFS(FileSystem):
    '''Minimal reader for classic Apple HFS, not HFS+.

    On construction the image layout is detected, the Master Directory Block
    (MDB) is parsed, and the extents-overflow records are loaded. Iterating over
    the instance walks the catalog from the root directory and yields
    `(path, modified, data)` tuples.
    '''

    def __init__(
        self,
        file_obj,
        path=None,
        text_encoding=DEFAULT_HFS_TEXT_ENCODING,
        text_errors='replace',
    ):
        '''Open an HFS image

        Args:
            `file_obj` (file-like): The image to read; passed through to `FileSystem.__init__`
            `path` (`Path`): Passed through to `FileSystem.__init__`
            `text_encoding` (`str`): Python codec name used to decode names
            `text_errors` (`str`): Python decode error handling, e.g. 'replace' or 'strict'

        Raises:
            `ValueError`: If `file_obj` is `None`, `text_encoding` is empty or unknown, or no classic HFS volume can be found
        '''
        # set things up
        if file_obj is None:
            raise ValueError("file_obj must be a file-like")
        if not text_encoding:
            raise ValueError("text_encoding must be a non-empty Python codec name")

        # Validate early so bad codec names fail clearly.
        try:
            lookup_codec(text_encoding)
        except LookupError as exc:
            raise ValueError("Unknown HFS text encoding: %r" % text_encoding) from exc

        self.text_encoding = text_encoding
        self.text_errors = text_errors
        super().__init__(path=path, file_obj=file_obj)

        # layout of the image file (filled in by `detect_layout`)
        self.physical_logical_block_size = None
        self.user_data_offset = None
        self.user_data_size = None
        self.image_base_offset = 0
        self.volume_offset = None

        # volume metadata (filled in by `parse_master_directory_block`)
        self.mdb = None
        self.allocation_block_size = None
        self.first_allocation_block = None

        # lazily/explicitly loaded B-tree contents
        self.extents_overflow = {}
        self.catalog_records = None

        self.detect_layout()
        self.parse_master_directory_block()
        self.load_extents_overflow()

    @staticmethod
    def get_mac_time(seconds):
        '''Parse integer seconds as Mac Time

        Args:
            `seconds` (`int`): The seconds since the Mac Time start

        Returns:
            `datetime`: The parsed Mac Time, or `None` if `seconds` cannot be converted
        '''
        try:
            return MACTIME_START + timedelta(seconds=seconds)
        except (OverflowError, TypeError, ValueError):
            # out-of-range or non-numeric input: keep the best-effort `None` result
            return None

    @staticmethod
    def decode_text_bytes(data, encoding, errors='replace'):
        '''Decode HFS text bytes using the caller-supplied codec.

        Everything from the first NUL byte onward is dropped before decoding, and
        trailing whitespace is stripped from the decoded string.

        Args:
            `data` (`bytes`): The raw HFS bytes representing the string
            `encoding` (`str`): Python codec name, e.g. 'mac_roman', 'cp932', 'shift_jis'
            `errors` (`str`): Python decode error handling, e.g. 'replace' or 'strict'

        Returns:
            `str`: The decoded string
        '''
        data = bytes(data).split(b'\0', 1)[0]
        return data.decode(encoding, errors=errors).rstrip()

    def decode_text(self, data):
        '''Decode HFS text using this filesystem instance's configured encoding.

        Args:
            `data` (`bytes`): The raw bytes to decode

        Returns:
            `str`: The decoded string
        '''
        return self.decode_text_bytes(
            data,
            encoding=self.text_encoding,
            errors=self.text_errors,
        )

    def pstring(self, data):
        '''Decode a pstring, such as a volume name.

        The first byte is the length; it is clamped to the bytes actually
        available. Any '/' in the decoded text is replaced with ':'.

        Args:
            `data` (`bytes`): The raw bytes to decode

        Returns:
            `str`: The decoded string
        '''
        if not data:
            return ''
        n = min(data[0], len(data) - 1)
        return self.decode_text(data[1:1 + n]).replace('/', ':')

    def read_stream(self, offset, length):
        '''Read bytes from the logical user-data stream, stripping raw CD sector headers if needed.

        Args:
            `offset` (`int`): The offset (within the logical stream) from which to start reading.
            `length` (`int`): The number of bytes to read. Values `<= 0` return `b''`.

        Returns:
            `bytes`: The read data. May be shorter than `length` if the underlying read comes up short.
        '''
        if length <= 0:
            return b''

        # identity layout: the logical stream is the image itself
        if self.physical_logical_block_size == self.user_data_size and self.user_data_offset == 0:
            return self.read_file(self.image_base_offset + offset, length)

        # otherwise read sector by sector, taking only the user-data part of each
        out = bytearray()
        pos = offset
        remaining = length
        while remaining > 0:
            sector, inside = divmod(pos, self.user_data_size)
            n = min(remaining, self.user_data_size - inside)
            raw_off = (
                self.image_base_offset
                + sector * self.physical_logical_block_size
                + self.user_data_offset
                + inside
            )
            chunk = self.read_file(raw_off, n)
            if not chunk:
                break
            out.extend(chunk)
            got = len(chunk)
            pos += got
            remaining -= got
            if got < n:
                # short read: stop rather than skip over missing bytes
                break
        return bytes(out)

    def read_volume(self, offset, length):
        '''Read bytes relative to the start of the detected HFS volume

        Args:
            `offset` (`int`): The offset (within the volume) from which to start reading.
            `length` (`int`): The number of bytes to read. Values `<= 0` return `b''`.

        Returns:
            `bytes`: The read data.
        '''
        return self.read_stream(self.volume_offset + offset, length)

    def parse_extents(self, data):
        '''Parse extents

        Reads up to three 4-byte descriptors (big-endian 16-bit start block, then
        16-bit block count) from the first 12 bytes of `data`. Descriptors with a
        zero block count are dropped. A trailing partial descriptor (fewer than 4
        bytes) is ignored.

        Args:
            `data` (`bytes`): The raw data to parse

        Returns:
            `list`: The parsed extents, as `(start, count)` tuples
        '''
        out = list()
        usable = min(len(data), 12)
        # stop 3 bytes early so only complete 4-byte descriptors are unpacked
        for i in range(0, usable - 3, 4):
            start = unpack('>H', data[i : i+2])[0]
            count = unpack('>H', data[i+2 : i+4])[0]
            if count:
                out.append((start, count))
        return out

    def check_hfs(self, volume_offset):
        '''Check if this looks like a valid HFS starting at a given volume offset

        The candidate MDB must carry the MDB signature, a non-zero allocation
        block size that is a multiple of the logical block size, a non-zero
        allocation block count, a first allocation block of at least 3, and a
        non-empty catalog file with at least one extent.

        Args:
            `volume_offset` (`int`): The volume offset to check

        Returns:
            `bool`: `True` if this is a valid HFS, otherwise `False`.
        '''
        mdb = self.read_stream(volume_offset + 2 * HFS_LOGICAL_BLOCK_SIZE, 162)
        if len(mdb) < 162 or mdb[0:2] != HFS_MDB_SIGNATURE:
            return False
        alloc_size = unpack('>I', mdb[20 : 24])[0]
        alloc_count = unpack('>H', mdb[18 : 20])[0]
        first_alloc = unpack('>H', mdb[28 : 30])[0]
        cat_size = unpack('>I', mdb[146 : 150])[0]
        cat_ext = self.parse_extents(mdb[150:162])
        if alloc_size == 0 or alloc_size % HFS_LOGICAL_BLOCK_SIZE != 0:
            return False
        if alloc_count == 0 or first_alloc < 3:
            return False
        if cat_size == 0 or not cat_ext:
            return False
        return True

    def find_apm(self, apm_base, block_size):
        '''Find an HFS volume via the Apple Partition Map (APM)

        Args:
            `apm_base` (`int`): The stream offset at which the partitioned device is assumed to start
            `block_size` (`int`): The block size; only 512, 1024, 2048, and 4096 are accepted

        Returns:
            `int`: The volume offset of the first partition that passes `check_hfs` and whose type is 'Apple_HFS'; failing that, the first partition of any type that passes `check_hfs`; otherwise `None`.
        '''
        if block_size not in {512, 1024, 2048, 4096}:
            return None
        first = self.read_stream(apm_base + block_size, min(block_size, 512))
        if len(first) < 136 or first[0:2] != HFS_APM_SIGNATURE:
            return None
        count = unpack('>I', first[4 : 8])[0]
        if count == 0 or count > 4096:
            # reject empty or implausibly large partition maps
            return None
        fallback = None
        for i in range(1, count + 1):
            entry = self.read_stream(apm_base + i * block_size, min(block_size, 512))
            if len(entry) < 136 or entry[0:2] != HFS_APM_SIGNATURE:
                continue
            start_block = unpack('>I', entry[8 : 12])[0]
            part_type = self.decode_text(entry[48:80])
            vol_off = apm_base + start_block * block_size
            if self.check_hfs(vol_off):
                if part_type == 'Apple_HFS':
                    return vol_off
                if fallback is None:
                    fallback = vol_off
        return fallback

    def probe_layout(self, scan=False):
        '''Probe the currently configured sector layout for an HFS volume

        Tries, in order: a bare volume at offset 0; a partition map described by
        a Driver Descriptor Record at offset 0; a partition map with block size
        512 or 2048. If `scan` is set and nothing was found, the first
        `HFS_SIGNATURE_SCAN_SIZE` bytes are searched for DDR signatures and then
        for MDB signatures.

        Args:
            `scan` (`bool`): `True` to also scan for signatures, otherwise `False`.

        Returns:
            `int`: The offset of the HFS volume within the logical stream, otherwise `None`.
        '''
        # bare HFS volume
        if self.check_hfs(0):
            return 0

        # partition map, using the block size declared by the DDR
        ddr = self.read_stream(0, 512)
        if len(ddr) >= 8 and ddr[0:2] == HFS_DDR_SIGNATURE:
            block_size = unpack('>H', ddr[2 : 4])[0]
            found = self.find_apm(0, block_size)
            if found is not None:
                return found

        # partition map, guessing common block sizes
        for block_size in (512, 2048):
            found = self.find_apm(0, block_size)
            if found is not None:
                return found

        if not scan:
            return None
        buf = self.read_stream(0, HFS_SIGNATURE_SCAN_SIZE)

        # scan for a DDR at any offset
        pos = 0
        while True:
            pos = buf.find(HFS_DDR_SIGNATURE, pos)
            if pos < 0:
                break
            ddr = self.read_stream(pos, 512)
            if len(ddr) >= 8 and ddr[0:2] == HFS_DDR_SIGNATURE:
                block_size = unpack('>H', ddr[2 : 4])[0]
                found = self.find_apm(pos, block_size)
                if found is not None:
                    return found
            pos += 1

        # scan for an MDB; the volume would start two logical blocks before it
        pos = 0
        while True:
            pos = buf.find(HFS_MDB_SIGNATURE, pos)
            if pos < 0:
                break
            vol_off = pos - 2 * HFS_LOGICAL_BLOCK_SIZE
            if vol_off >= 0 and vol_off % HFS_LOGICAL_BLOCK_SIZE == 0:
                if self.check_hfs(vol_off):
                    return vol_off
            pos += 1
        return None

    def detect_layout(self):
        '''Detect HFS image layout

        Tries every entry of `COMMON_HFS_LAYOUT_CANDIDATES` and keeps the first
        one under which `probe_layout` finds a volume, setting
        `physical_logical_block_size`, `user_data_offset`, `user_data_size`,
        `image_base_offset`, and `volume_offset`. Does nothing if
        `volume_offset` is already set.

        Raises:
            `ValueError`: If no candidate layout yields a classic HFS volume
        '''
        if self.volume_offset is not None:
            return

        # first pass: exact probes; this catches direct images and raw CD sectors cleanly
        for phys, off, user_size in COMMON_HFS_LAYOUT_CANDIDATES:
            self.physical_logical_block_size = phys
            self.user_data_offset = off
            self.user_data_size = user_size
            self.image_base_offset = 0
            found = self.probe_layout(scan=False)
            if found is not None:
                self.volume_offset = found
                return

        # second pass: signature scan; prefer non-identity/raw-CD layouts to avoid falsely
        # treating a raw sector header as a one-time file prefix
        scan_order = [c for c in COMMON_HFS_LAYOUT_CANDIDATES if c[0] != c[2] or c[1] != 0]
        scan_order += [c for c in COMMON_HFS_LAYOUT_CANDIDATES if c not in scan_order]
        for phys, off, user_size in scan_order:
            self.physical_logical_block_size = phys
            self.user_data_offset = off
            self.user_data_size = user_size
            self.image_base_offset = 0
            found = self.probe_layout(scan=True)
            if found is not None:
                self.volume_offset = found
                return
        raise ValueError("No classic HFS volume found")

    def parse_master_directory_block(self):
        '''Parse Master Directory Block

        The result is cached in `self.mdb`; `allocation_block_size` and
        `first_allocation_block` are set from it.

        Returns:
            `dict`: The parsed Master Directory Block

        Raises:
            `ValueError`: If the block is too short or lacks the MDB signature
        '''
        if self.mdb is not None:
            return self.mdb
        mdb = self.read_volume(2 * HFS_LOGICAL_BLOCK_SIZE, HFS_LOGICAL_BLOCK_SIZE)
        if len(mdb) < 162 or mdb[0:2] != HFS_MDB_SIGNATURE:
            raise ValueError("Not a classic HFS volume; missing MDB signature")
        self.mdb = {
            'signature': mdb[0:2],
            'created': HfsFS.get_mac_time(unpack('>I', mdb[2 : 6])[0]),
            'modified': HfsFS.get_mac_time(unpack('>I', mdb[6 : 10])[0]),
            'allocation_block_count': unpack('>H', mdb[18 : 20])[0],
            'allocation_block_size': unpack('>I', mdb[20 : 24])[0],
            'first_allocation_block': unpack('>H', mdb[28 : 30])[0],
            'volume_name': self.pstring(mdb[36:64]),
            'extents_file_size': unpack('>I', mdb[130 : 134])[0],
            'extents_file_extents': self.parse_extents(mdb[134:146]),
            'catalog_file_size': unpack('>I', mdb[146 : 150])[0],
            'catalog_file_extents': self.parse_extents(mdb[150:162]),
        }
        self.allocation_block_size = self.mdb['allocation_block_size']
        self.first_allocation_block = self.mdb['first_allocation_block']
        return self.mdb

    def read_from_extents(self, extents, length):
        '''Read from extents

        Extents are read in order and concatenated until `length` bytes have been
        requested. A warning is issued if fewer than `length` bytes were obtained.

        Args:
            `extents` (`list`): The extents to read from, as `(start, count)` tuples in allocation blocks
            `length` (`int`): The number of bytes to read

        Returns:
            `bytes`: The read data (at most `length` bytes)
        '''
        if length <= 0:
            return b''
        out = bytearray()
        remaining = length
        for start, count in extents:
            if remaining <= 0:
                break
            n = min(remaining, count * self.allocation_block_size)
            # allocation blocks are numbered from `first_allocation_block`,
            # which is expressed in logical blocks
            allocation_block_offset = (
                self.first_allocation_block * HFS_LOGICAL_BLOCK_SIZE
                + start * self.allocation_block_size
            )
            out.extend(self.read_volume(allocation_block_offset, n))
            remaining -= n
        if len(out) < length:
            warn("HFS fork is shorter than expected; extents-overflow data may be incomplete")
        return bytes(out[:length])

    def resolve_extents_from_overflow(self, file_id, fork_type, initial_extents, length):
        '''Extend a fork's initial extents with records from the extents-overflow data

        Args:
            `file_id` (`int`): The CNID of the file
            `fork_type` (`int`): The fork type (e.g. `HFS_DATA_FORK`)
            `initial_extents` (`list`): The extents already known for the fork
            `length` (`int`): The fork length in bytes

        Returns:
            `list`: A new list of `(start, count)` extents; overflow extents are appended, in order of their first block number, until enough allocation blocks are covered or no more records exist
        '''
        extents = list(initial_extents)
        needed = ceil(length / self.allocation_block_size) if length else 0
        have = sum(count for _, count in extents)
        if have >= needed:
            return extents
        for fabn, more_extents in sorted(self.extents_overflow.get((file_id, fork_type), [])):
            if fabn < have:
                # this record describes blocks that are already covered
                continue
            extents.extend(more_extents)
            have = sum(count for _, count in extents)
            if have >= needed:
                break
        return extents

    def read_fork(self, file_id, fork_type, initial_extents, length):
        '''Read a whole fork, consulting the extents-overflow data if needed

        Args:
            `file_id` (`int`): The CNID of the file
            `fork_type` (`int`): The fork type (e.g. `HFS_DATA_FORK`)
            `initial_extents` (`list`): The extents stored with the file's own record
            `length` (`int`): The fork length in bytes

        Returns:
            `bytes`: The fork contents
        '''
        extents = self.resolve_extents_from_overflow(
            file_id,
            fork_type,
            initial_extents,
            length,
        )
        return self.read_from_extents(extents, length)

    def parse_extents_overflow_records(self, data):
        '''Parse the leaf records of the extents-overflow B-tree

        Only records of at least 20 bytes whose first byte (key length) is 7 are
        considered; records with no non-empty extents are dropped.

        Args:
            `data` (`bytes`): The raw contents of the extents-overflow file

        Returns:
            `dict`: Maps `(file_id, fork_type)` to a list of `(first_block, extents)` tuples
        '''
        records = {}
        if not data:
            return records
        tree = HfsBTree(data)
        for rec in tree.leaf_records():
            if len(rec) < 20 or rec[0] != 7:
                continue
            fork_type = rec[1]
            file_id = unpack('>I', rec[2 : 6])[0]
            fabn = unpack('>H', rec[6 : 8])[0]
            extents = self.parse_extents(rec[8:20])
            if extents:
                records.setdefault((file_id, fork_type), []).append((fabn, extents))
        return records

    def load_extents_overflow(self):
        '''Load the extents-overflow records into `self.extents_overflow`

        Returns:
            `dict`: The loaded records (empty if the MDB declares no extents-overflow file)
        '''
        self.extents_overflow = {}
        size = self.mdb['extents_file_size']
        initial = self.mdb['extents_file_extents']
        if not size or not initial:
            return self.extents_overflow
        extents = list(initial)

        # Usually one pass is enough. Extra passes allow the extents-overflow file
        # to describe additional extents of itself.
        for _ in range(3):
            data = self.read_from_extents(extents, size)
            parsed = self.parse_extents_overflow_records(data)
            self.extents_overflow = parsed
            new_extents = self.resolve_extents_from_overflow(
                HFS_EXTENTS_CNID,
                HFS_DATA_FORK,
                initial,
                size,
            )
            if new_extents == extents:
                break
            extents = new_extents
        return self.extents_overflow

    def catalog_leaf_records(self):
        '''Read the catalog file and iterate over its raw leaf records

        Returns:
            generator of `bytes`: The raw catalog leaf records
        '''
        data = self.read_fork(
            HFS_CATALOG_CNID,
            HFS_DATA_FORK,
            self.mdb['catalog_file_extents'],
            self.mdb['catalog_file_size'],
        )
        return HfsBTree(data).leaf_records()

    def parse_catalog_key(self, rec):
        '''Parse the key at the start of a catalog leaf record

        Args:
            `rec` (`bytes`): The raw catalog leaf record

        Returns:
            `dict`: With keys `key_len`, `data_offset`, `parent_id`, and `name`, or `None` if the key is malformed
        '''
        if len(rec) < 7:
            return None
        key_len = rec[0]
        if key_len == 0:
            return None

        # HFS B-tree keys use a one-byte key length that does not include itself.
        # The following catalog record data is word-aligned:
        #   key_len even -> 1 + key_len is odd, so there is one pad byte.
        #   key_len odd  -> 1 + key_len is already even.
        data_offset = (key_len | 1) + 1
        if len(rec) < data_offset:
            return None
        parent_id = unpack('>I', rec[2 : 6])[0]
        name_len = rec[6]

        # HFS names are Str31 values.
        if name_len > 31:
            return None

        # Make sure the declared name fits inside the key area, excluding padding.
        if 7 + name_len > 1 + key_len:
            return None
        name = self.decode_text(rec[7:7 + name_len]).replace('/', ':')
        return {
            'key_len': key_len,
            'data_offset': data_offset,
            'parent_id': parent_id,
            'name': name,
        }

    def parse_catalog_record(self, rec):
        '''Parse a catalog leaf record into a directory or file description

        Args:
            `rec` (`bytes`): The raw catalog leaf record

        Returns:
            `dict`: A record whose `kind` is 'directory' or 'file', or `None` if the key is malformed, the data is too short, or the record is of another type
        '''
        key = self.parse_catalog_key(rec)
        if key is None or len(rec) <= key['data_offset']:
            return None
        data = rec[key['data_offset']:]
        record_type = data[0]

        # directory record
        if record_type == 1 and len(data) >= 70:
            return {
                'kind': 'directory',
                'name': key['name'],
                'parent_id': key['parent_id'],
                'cnid': unpack('>I', data[6 : 10])[0],
                'created': HfsFS.get_mac_time(unpack('>I', data[10 : 14])[0]),
                'modified': HfsFS.get_mac_time(unpack('>I', data[14 : 18])[0]),
            }

        # file record
        if record_type == 2 and len(data) >= 102:
            return {
                'kind': 'file',
                'name': key['name'],
                'parent_id': key['parent_id'],
                'cnid': unpack('>I', data[20 : 24])[0],
                'created': HfsFS.get_mac_time(unpack('>I', data[44 : 48])[0]),
                'modified': HfsFS.get_mac_time(unpack('>I', data[48 : 52])[0]),

                # Data fork: this is what `__iter__` reads and yields.
                'data_length': unpack('>I', data[26 : 30])[0],
                'data_extents': self.parse_extents(data[74:86]),

                # Resource fork: parsed, but not yielded.
                'resource_length': unpack('>I', data[36 : 40])[0],
                'resource_extents': self.parse_extents(data[86:98]),
            }
        return None

    def load_catalog_records(self):
        '''Load and cache the parsed catalog records

        Records that fail to parse or that have an empty name are skipped.

        Returns:
            `list`: The parsed catalog records (see `parse_catalog_record`)
        '''
        if self.catalog_records is not None:
            return self.catalog_records
        records = list()
        for raw in self.catalog_leaf_records():
            parsed = self.parse_catalog_record(raw)
            if parsed is not None and parsed.get('name'):
                records.append(parsed)
        self.catalog_records = records
        return records

    def __iter__(self):
        '''Iterate over the directories and files of the volume

        Traversal starts at the root directory (`HFS_ROOT_CNID`), which itself is
        not yielded. A directory that is its own ancestor (possible only in a
        corrupt catalog) is skipped with a warning so that iteration terminates.

        Yields:
            `tuple`: `(path, modified, data)`, where `data` is `None` for a directory and the data-fork bytes for a file
        '''
        # map parent directory IDs to children
        records = self.load_catalog_records()
        children = dict()
        directories = dict()
        for rec in records:
            children.setdefault(rec['parent_id'], list()).append(rec)
            if rec['kind'] == 'directory':
                directories[rec['cnid']] = rec

        # perform search starting from root directory; each entry also carries the
        # CNIDs of its ancestors so that cycles can be detected
        to_visit = [(Path(''), HFS_ROOT_CNID, frozenset())]
        while to_visit:
            curr_path, curr_id, ancestors = to_visit.pop()

            # handle current directory
            if curr_path != Path(''):
                curr_directory_entry = directories[curr_id]
                yield (curr_path, curr_directory_entry['modified'], None)

            # load children of current directory
            lineage = ancestors | {curr_id}
            for next_entry in children.get(curr_id, []):
                next_entry_fn = next_entry['name']
                next_path = curr_path / next_entry_fn

                # next entry is a directory: add it to `to_visit`
                if next_entry['kind'] == 'directory':
                    if next_entry['cnid'] in lineage:
                        warn("HFS catalog contains a directory cycle; skipping %s" % next_path)
                        continue
                    to_visit.append((next_path, next_entry['cnid'], lineage))

                # next entry is a file: read and yield it
                elif next_entry['kind'] == 'file':
                    next_data = self.read_fork(
                        next_entry['cnid'],
                        HFS_DATA_FORK,
                        next_entry['data_extents'],
                        next_entry['data_length'],
                    )
                    yield (next_path, next_entry['modified'], next_data)