Source code for malduck.pe

# Copyright (C) 2018 Jurriaan Bremer.
# This file is part of Roach - https://github.com/jbremer/roach.
# See the file 'docs/LICENSE.txt' for copying permission.

from typing import TYPE_CHECKING, Any, Iterator, Optional, Tuple, Union

import pefile

if TYPE_CHECKING:
    from .procmem import ProcessMemory

__all__ = ["pe", "PE", "MemoryPEData"]


class MemoryPEData:
    """
    `pefile.PE.__data__` represents image file usually aligned to 512 bytes.
    MemoryPEData perform mapping from pefile's offset-access to Memory object va-access
    based on section layout.
    """

    def __init__(self, memory: "ProcessMemory", fast_load: bool) -> None:
        self.memory = memory
        # Preload headers
        self.pe = pefile.PE(data=self, fast_load=True)
        # Perform full_load if needed
        if not fast_load:
            self.pe.full_load()

    def map_offset(self, offs: int) -> int:
        if not hasattr(self, "pe") or not self.pe.sections:
            return self.memory.imgbase + offs
        return self.memory.imgbase + (self.pe.get_rva_from_offset(offs) or offs)

    def __len__(self) -> int:
        return (
            self.memory.regions[-1].addr
            - self.memory.regions[0].addr
            + self.memory.regions[-1].size
        )

    def __getitem__(self, item: Any) -> object:
        if isinstance(item, slice):
            start = self.map_offset(item.start or 0)
            stop = self.map_offset(item.stop - 1)
        else:
            start = self.map_offset(item)
            stop = start
        return self.memory.readv(start, stop - start + 1)

    def find(self, str: bytes, beg: int = 0, end: Optional[int] = None) -> int:
        if end and beg >= end:
            return -1
        try:
            return next(
                self.memory.regexv(str, self.memory.imgbase + beg, end and end - beg)
            )
        except StopIteration:
            return -1


[docs]class PE(object): """ Wrapper around :class:`pefile.PE`, accepts either bytes (raw file contents) or :class:`ProcessMemory` instance. """ def __init__( self, data: Union["ProcessMemory", bytes], fast_load: bool = False ) -> None: from .procmem import ProcessMemory if isinstance(data, ProcessMemory): self.pe = MemoryPEData(data, fast_load).pe else: self.pe = pefile.PE(data=data, fast_load=fast_load) @property def data(self) -> bytes: return self.pe.__data__ @property def dos_header(self) -> Any: """Dos header""" return self.pe.DOS_HEADER @property def nt_headers(self) -> Any: """NT headers""" return self.pe.NT_HEADERS @property def file_header(self) -> Any: """File header""" return self.pe.FILE_HEADER @property def optional_header(self) -> Any: """Optional header""" return self.pe.OPTIONAL_HEADER @property def sections(self) -> list: """Sections""" return self.pe.sections @property def is32bit(self) -> Any: """ Is it 32-bit file (PE)? """ return self.optional_header.Magic == pefile.OPTIONAL_HEADER_MAGIC_PE @property def is64bit(self) -> Any: """ Is it 64-bit file (PE+)? """ return self.optional_header.Magic == pefile.OPTIONAL_HEADER_MAGIC_PE_PLUS @property def headers_size(self) -> int: """ Estimated size of PE headers (first section offset). If there are no sections: returns 0x1000 or size of input if provided data are shorter than single page """ return ( self.sections[0].PointerToRawData if self.sections else min(len(self.pe.__data__), 0x1000) )
[docs] def section(self, name: Union[str, bytes]) -> Any: """ Get section by name :param name: Section name :type name: str or bytes """ if isinstance(name, str): name = name.encode() for section in self.pe.sections: if section.Name.rstrip(b"\x00") == name: return section
[docs] def directory(self, name: str) -> Any: """ Get pefile directory entry by identifier :param name: shortened pefile directory entry identifier (e.g. 'IMPORT' for 'IMAGE_DIRECTORY_ENTRY_IMPORT') :rtype: :class:`pefile.Structure` """ return self.optional_header.DATA_DIRECTORY[ pefile.DIRECTORY_ENTRY.get("IMAGE_DIRECTORY_ENTRY_" + name) ]
[docs] def structure(self, rva: int, format: Any) -> Any: """ Get internal pefile Structure from specified rva :param rva: Relative virtual address of structure :param format: :class:`pefile.Structure` format (e.g. :py:attr:`pefile.PE.__IMAGE_LOAD_CONFIG_DIRECTORY64_format__`) :rtype: :class:`pefile.Structure` """ structure = pefile.Structure(format) structure.__unpack__(self.pe.get_data(rva, structure.sizeof())) return structure
[docs] def validate_import_names(self) -> bool: """ Returns True if the first 8 imported library entries have valid library names """ import_dir = self.directory("IMPORT") if not import_dir.VirtualAddress: # There's nothing wrong with no imports return True try: import_rva = import_dir.VirtualAddress # Don't go further than 8 entries for _ in range(8): import_desc = self.structure( import_rva, pefile.PE.__IMAGE_IMPORT_DESCRIPTOR_format__ ) if import_desc.all_zeroes(): # End of import-table break import_dllname = self.pe.get_string_at_rva( import_desc.Name, pefile.MAX_DLL_LENGTH ) if not pefile.is_valid_dos_filename(import_dllname): # Invalid import filename found return False import_rva += import_desc.sizeof() return True except pefile.PEFormatError: return False
[docs] def validate_resources(self) -> bool: """ Returns True if first level of resource tree looks consistent """ resource_dir = self.directory("RESOURCE") if not resource_dir.VirtualAddress: # There's nothing wrong with no resources return True try: resource_rva = resource_dir.VirtualAddress resource_desc = self.structure( resource_rva, pefile.PE.__IMAGE_RESOURCE_DIRECTORY_format__ ) resource_no = ( resource_desc.NumberOfNamedEntries + resource_desc.NumberOfIdEntries ) if not 0 <= resource_no < 128: # Incorrect resource number return False for rsrc_idx in range(resource_no): resource_entry_desc = self.structure( resource_rva + resource_desc.sizeof() + rsrc_idx * 8, pefile.PE.__IMAGE_RESOURCE_DIRECTORY_ENTRY_format__, ) if ( self.pe.get_word_at_rva( resource_rva + resource_entry_desc.OffsetToData & 0x7FFFFFFF ) is None ): return False return True except pefile.PEFormatError: return False
[docs] def validate_padding(self) -> bool: """ Returns True if area between first non-bss section and first 4kB doesn't have only null-bytes """ section_start_offs = None for section in self.sections: if section.SizeOfRawData > 0: section_start_offs = section.PointerToRawData break if section_start_offs is None: # No non-bss sections? Is it real PE file? return False if section_start_offs > 0x1000: # Unusual - try checking last 512 bytes section_start_offs = 0x800 try: data_len = 0x1000 - section_start_offs if not data_len: # Probably fixpe'd - seems to be ok return True return not all( b in [0, "\0"] for b in self.pe.__data__[ section_start_offs : section_start_offs + data_len ] ) except pefile.PEFormatError: return False
def iterate_resources( self, ) -> Iterator[ Tuple[ pefile.ResourceDirEntryData, pefile.ResourceDirEntryData, pefile.ResourceDirEntryData, ] ]: for e1 in self.pe.DIRECTORY_ENTRY_RESOURCE.entries: for e2 in e1.directory.entries: for e3 in e2.directory.entries: yield (e1, e2, e3)
[docs] def resources(self, name: Union[int, str, bytes]) -> Iterator[bytes]: """ Finds resource objects by specified name or type :param name: String name (e2) or type (e1), numeric identifier name (e2) or RT_* type (e1) :type name: int or str or bytes :rtype: Iterator[bytes] """ def name_str(e1, e2, e3): return (e1.name and e1.name.string == name) or ( e2.name and e2.name.string == name ) def name_int(e1, e2, e3): return e2.struct.Name == name def type_int(e1, e2, e3): return e1.id == type_id # Broken PE files will not have this directory and it's better to return no value # than to throw a meaningless exception if not hasattr(self.pe, "DIRECTORY_ENTRY_RESOURCE"): return if isinstance(name, str): name = name.encode() if isinstance(name, bytes): if name.startswith(b"RT_"): compare = type_int type_id = pefile.RESOURCE_TYPE[name.decode()] else: compare = name_str else: compare = name_int for e1, e2, e3 in self.iterate_resources(): if compare(e1, e2, e3): yield self.pe.get_data(e3.data.struct.OffsetToData, e3.data.struct.Size)
[docs] def resource(self, name: Union[int, str, bytes]) -> Optional[bytes]: """ Retrieves single resource by specified name or type :param name: String name (e2) or type (e1), numeric identifier name (e2) or RT_* type (e1) :type name: int or str or bytes :rtype: bytes or None """ try: return next(self.resources(name)) except StopIteration: return None
pe = PE