Source code for malduck.extractor.extract_manager
import json
import logging
import warnings
from typing import Any, Dict, List, Optional, Type
from ..procmem import ProcessMemory, ProcessMemoryELF, ProcessMemoryPE
from ..procmem.binmem import ProcessMemoryBinary
from ..yara import Yara, YaraRuleOffsets, YaraRulesetMatch
from .config_utils import (
Config,
apply_config_part,
encode_for_json,
is_config_better,
sanitize_config,
)
from .extractor import Extractor
from .modules import ExtractorModules
log = logging.getLogger(__name__)
__all__ = ["ExtractManager"]
class ExtractManager:
    """
    Multi-dump extraction context. Handles merging configs from different dumps, additional dropped families etc.

    :param modules: Object with loaded extractor modules
    :type modules: :class:`ExtractorModules`
    """

    def __init__(self, modules: ExtractorModules) -> None:
        self.modules = modules
        # Binary formats that carve_procmem() tries to locate inside a dump
        self.binary_classes: List[Type[ProcessMemoryBinary]] = [
            ProcessMemoryPE,
            ProcessMemoryELF,
        ]
        # Best configuration accepted so far, keyed by family name
        self.configs: Dict[str, Config] = {}

    @property
    def rules(self) -> Yara:
        """
        Bound Yara rules

        :rtype: :class:`malduck.yara.Yara`
        """
        return self.modules.rules

    @property
    def extractors(self) -> List[Type[Extractor]]:
        """
        Bound extractor modules

        :rtype: List[Type[:class:`malduck.extractor.Extractor`]]
        """
        return self.modules.extractors

    def on_error(self, exc: Exception, extractor: Extractor) -> None:
        """
        Handler for all exceptions raised by :py:meth:`Extractor.handle_yara`.

        .. deprecated:: 2.1.0
           Look at :py:meth:`ExtractManager.on_extractor_error` instead.

        :param exc: Exception object
        :type exc: :class:`Exception`
        :param extractor: Extractor object which raised the exception
        :type extractor: :class:`malduck.extractor.Extractor`
        """
        # Legacy entry point: always reports "handle_yara" as the failing method
        self.on_extractor_error(exc, extractor, "handle_yara")

    def on_extractor_error(
        self, exc: Exception, extractor: Extractor, method_name: str
    ) -> None:
        """
        Handler for all exceptions raised by extractor methods (including :py:meth:`Extractor.handle_yara`).

        Override this method if you want to set your own error handler.
        The default implementation logs a warning with the formatted traceback of ``exc``.

        :param exc: Exception object
        :type exc: :class:`Exception`
        :param extractor: Extractor instance
        :type extractor: :class:`extractor.Extractor`
        :param method_name: Name of method which raised the exception
        :type method_name: str
        """
        import traceback

        # Format the traceback from the exception object that was passed in
        # instead of the interpreter's "currently handled" exception, so the
        # handler also works when invoked outside of an ``except`` block.
        formatted = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        log.warning(
            "%s.%s raised an exception: %s",
            extractor.__class__.__name__,
            method_name,
            formatted,
        )

    def push_file(self, filepath: str, base: int = 0) -> Optional[str]:
        """
        Pushes file for extraction. Config extractor entrypoint.

        :param filepath: Path to extracted file
        :type filepath: str
        :param base: Memory dump base address
        :type base: int
        :return: Detected family if configuration looks better than already stored one
        """
        log.debug("Started extraction of file %s:%x", filepath, base)
        with ProcessMemory.from_file(filepath, base=base) as p:
            # Files are always scanned for embedded binaries as well
            return self.push_procmem(p, rip_binaries=True)

    def match_procmem(self, p: ProcessMemory) -> YaraRulesetMatch:
        """
        Performs Yara matching on ProcessMemory using modules
        bound with current ExtractManager.

        :param p: ProcessMemory object to scan
        :type p: :class:`malduck.procmem.ProcessMemory`
        :return: Result of ``p.yarap`` for the bound rules (extended mode)
        """
        matches = p.yarap(self.rules, extended=True)
        log.debug("Matched rules: %s", ",".join(matches.keys()))
        return matches

    def carve_procmem(self, p: ProcessMemory) -> List[ProcessMemoryBinary]:
        """
        Carves binaries from ProcessMemory to try configuration extraction
        using every possible address mapping.

        :param p: ProcessMemory object to carve binaries from
        :type p: :class:`malduck.procmem.ProcessMemory`
        :return: All binaries found by every class listed in ``binary_classes``
        """
        binaries: List[ProcessMemoryBinary] = []
        for binclass in self.binary_classes:
            carved_bins = list(binclass.load_binaries_from_memory(p))
            for carved_bin in carved_bins:
                log.debug(
                    "carve: Found %s at offset %s",
                    carved_bin.__class__.__name__,
                    carved_bin.regions[0].offset,
                )
            binaries.extend(carved_bins)
        return binaries

    def push_config(self, config: Config) -> bool:
        """
        Offers a complete config to the manager and stores it if it is preferable.

        :param config: Config object (ignored when it has no truthy "family" key)
        :type config: dict
        :return: True if the config was stored, False if it was rejected
        """
        family = config.get("family")
        if not family:
            return False

        if family in self.configs:
            # Same family already collected: keep whichever config is better
            if is_config_better(base_config=self.configs[family], new_config=config):
                self.configs[family] = config
                log.debug("%s config looks better than previous one", family)
                return True
            log.debug("%s config doesn't look better than previous one", family)
            return False

        if family in self.modules.override_paths:
            # 'citadel' > 'zeus'
            # If 'zeus' appears but we have already 'citadel', we should ignore 'zeus'
            # Otherwise we should get 'citadel' instead of 'zeus'
            #
            # `family` itself is known not to be stored at this point (handled
            # above). Iterate over a snapshot of the keys because an entry may
            # be deleted below.
            for stored_family in list(self.configs):
                score = self.modules.compare_family_overrides(family, stored_family)
                if score == -1:
                    del self.configs[stored_family]
                    self.configs[family] = config
                    log.debug(
                        "%s config looks better (overrides %s)", family, stored_family
                    )
                    return True
                if score == 1:
                    log.debug(
                        "%s config doesn't look better than previous one (overridden by %s)",
                        family,
                        stored_family,
                    )
                    return False

        log.debug("New %s config collected", family)
        self.configs[family] = config
        return True

    def _extract_procmem(
        self, p: ProcessMemory, matches: YaraRulesetMatch
    ) -> Optional[str]:
        """
        Runs extraction for a single ProcessMemory object in a fresh ExtractionContext.

        :param p: ProcessMemory object
        :param matches: Offset-based Yara matches, remapped here using ``p.p2v``
        :return: Family name if a config was extracted and accepted by
                 :py:meth:`push_config`, otherwise None
        """
        log.debug("%s - ripping...", repr(p))
        # Create extraction context for single file
        manager = ExtractionContext(parent=self)
        # Map offset matches to VA using procmem address mapping
        va_matches = matches.remap(p.p2v)
        # Push ProcessMemory for extraction with mapped Yara matches
        manager.push_procmem(p, _matches=va_matches)
        # Get final configurations
        config = manager.collected_config
        if not config.get("family"):
            log.debug("%s - no luck.", repr(p))
            return None
        log.debug("%s - found %s!", repr(p), config.get("family"))
        if self.push_config(config):
            return config["family"]
        return None

    def push_procmem(
        self, p: ProcessMemory, rip_binaries: bool = False
    ) -> Optional[str]:
        """
        Pushes ProcessMemory object for extraction

        :param p: ProcessMemory object
        :type p: :class:`malduck.procmem.ProcessMemory`
        :param rip_binaries: Look for binaries (PE, ELF) in provided ProcessMemory and try to perform extraction using
                             specialized variants (ProcessMemoryPE, ProcessMemoryELF)
        :type rip_binaries: bool (default: False)
        :return: Detected family if configuration looks better than already stored one
        """
        matches = self.match_procmem(p)
        if not matches:
            log.debug("No Yara matches.")
            return None
        binaries = self.carve_procmem(p) if rip_binaries else []
        family = self._extract_procmem(p, matches)
        for binary in binaries:
            # A later successful extraction takes precedence over an earlier one
            family = self._extract_procmem(binary, matches) or family
        return family

    @property
    def config(self) -> List[Config]:
        """
        Extracted configuration (list of configs for each extracted family)
        """
        return list(self.configs.values())
class ExtractionContext:
    """
    Single-dump extraction context (single family)

    :param parent: ExtractManager that owns this context
    :type parent: :class:`ExtractManager`
    """

    def __init__(self, parent: ExtractManager) -> None:
        #: Collected configuration so far (especially useful for "final" extractors)
        self.collected_config: Config = {}
        # Free-form dictionary; it is only initialized here and not used
        # anywhere else in this class.
        self.globals: Dict[str, Any] = {}
        self.parent = parent  #: Bound ExtractManager instance

    @property
    def family(self) -> Optional[str]:
        """Matched family"""
        return self.collected_config.get("family")

    def on_extractor_error(
        self, exc: Exception, extractor: Extractor, method_name: str
    ) -> None:
        """
        Handler for all exceptions raised by extractor methods.

        Delegates to :py:meth:`ExtractManager.on_extractor_error` of the parent manager.

        :param exc: Exception object
        :type exc: :class:`Exception`
        :param extractor: Extractor instance
        :type extractor: :class:`extractor.Extractor`
        :param method_name: Name of method which raised the exception
        :type method_name: str
        """
        self.parent.on_extractor_error(exc, extractor, method_name)

    def push_procmem(
        self, p: ProcessMemory, _matches: Optional[YaraRulesetMatch] = None
    ) -> None:
        """
        Pushes ProcessMemory object for extraction

        Every extractor class bound to the parent manager is instantiated and
        invoked once for each of its ``yara_rules`` present in the matches.
        Exceptions raised by extractor handlers are passed to the error
        handlers instead of being propagated.

        :param p: ProcessMemory object
        :type p: :class:`malduck.procmem.ProcessMemory`
        :param _matches: YaraRulesetMatch object (used internally)
        :type _matches: :class:`malduck.yara.YaraRulesetMatch`
        :raises TypeError: If ``yara_rules`` of an extractor is a string
        """
        matches = _matches or p.yarav(self.parent.rules, extended=True)
        # For each extractor...
        for ext_class in self.parent.extractors:
            extractor = ext_class(self)
            # Iterating over a string would silently yield single characters
            if isinstance(extractor.yara_rules, str):
                raise TypeError(
                    f'"{extractor.__class__.__name__}.yara_rules" cannot be a string, convert it into a list of strings'
                )
            # For each rule identifier in extractor.yara_rules...
            for rule in extractor.yara_rules:
                if rule not in matches:
                    continue
                uses_legacy_handler = hasattr(extractor, "handle_yara")
                try:
                    if uses_legacy_handler:
                        warnings.warn(
                            "Extractor.handle_yara is deprecated, use Extractor.handle_match",
                            DeprecationWarning,
                        )
                        getattr(extractor, "handle_yara")(
                            p, YaraRuleOffsets(matches[rule])
                        )
                    else:
                        extractor.handle_match(p, matches[rule])
                except Exception as exc:
                    if uses_legacy_handler:
                        # Deprecated hook, kept for handlers overriding on_error;
                        # it reports the failure as coming from "handle_yara".
                        self.parent.on_error(exc, extractor)
                    else:
                        # Report the method that actually failed rather than
                        # the hard-coded "handle_yara" used by on_error.
                        self.on_extractor_error(exc, extractor, "handle_match")

    def push_config(self, config: Config, extractor: Extractor) -> None:
        """
        Pushes new partial config

        If strong config provides different family than stored so far
        and that family overrides stored family - set stored family

        Example: citadel overrides zeus

        :param config: Partial config object
        :type config: dict
        :param extractor: Extractor object reference
        :type extractor: :class:`malduck.extractor.Extractor`
        :raises RuntimeError: If the config is not JSON-encodable, or if it names a
                              family different from the stored one and
                              ``compare_family_overrides`` reports no relation between them
        """
        config = encode_for_json(config)
        try:
            # Serialization result is discarded; this only validates encodability
            json.dumps(config)
        except (TypeError, OverflowError) as e:
            log.debug("Config is not JSON-encodable (%s): %s", str(e), repr(config))
            raise RuntimeError("Config must be JSON-encodable") from e

        config = sanitize_config(config)
        if not config:
            # Nothing left to merge after sanitization
            return

        log.debug(
            "%s found the following config parts: %s",
            extractor.__class__.__name__,
            sorted(config.keys()),
        )

        if "family" in config:
            log.debug(
                "%s tells it's %s", extractor.__class__.__name__, config["family"]
            )
            if (
                "family" in self.collected_config
                and self.collected_config["family"] != config["family"]
            ):
                overrides = self.parent.modules.compare_family_overrides(
                    config["family"], self.collected_config["family"]
                )
                if not overrides:
                    raise RuntimeError(
                        f"Ripped both {self.collected_config['family']} and {config['family']} "
                        f"from the same ProcessMemory which is not expected"
                    )
                if overrides == -1:
                    # New family wins: replace the stored family name
                    self.collected_config["family"] = config["family"]
                else:
                    # Stored family wins: rewrite the incoming part to match it
                    config["family"] = self.collected_config["family"]

        self.collected_config = apply_config_part(self.collected_config, config)

    @property
    def config(self) -> Config:
        """
        Returns collected config, but if family is not matched - returns empty dict.

        The matched family is stored under the "family" key of the returned config
        and is also available as :py:attr:`ExtractionContext.family`.
        """
        if self.family is None:
            return {}
        return self.collected_config