# Source code for malduck.extractor.extract_manager
import json
import logging
import os
from .extractor import Extractor
from .loaders import load_modules
from ..py2compat import binary_type
from ..yara import Yara
log = logging.getLogger(__name__)
__all__ = ["ExtractManager", "ExtractorModules"]
def is_config_better(base_config, new_config):
    """
    Tell whether ``new_config`` looks more reliable than ``base_config``.

    The only heuristic applied is the number of keys holding a truthy
    (non-empty) value: the new config wins only if it has strictly more of them.

    :param base_config: Configuration stored so far
    :param new_config: Candidate configuration
    :return: True if the candidate has more non-empty keys than the base
    """

    def count_filled(config):
        # Only the values matter for the score; keys with falsy values are skipped
        return sum(1 for value in config.values() if value)

    base_score = count_filled(base_config)
    new_score = count_filled(new_config)
    return new_score > base_score
def encode_for_json(data):
    """
    Recursively convert a value into a form that uses text instead of binary strings.

    * binary strings are decoded as UTF-8,
    * lists and tuples become lists with every item converted,
    * dict values are converted (keys are left untouched),
    * anything else is returned unchanged.

    :param data: Value to convert
    :return: Converted value
    """
    if isinstance(data, binary_type):
        return data.decode("utf-8")
    if isinstance(data, (list, tuple)):
        return list(map(encode_for_json, data))
    if isinstance(data, dict):
        return dict((key, encode_for_json(value)) for key, value in data.items())
    return data
def sanitize_config(config):
    """
    Sanitize static configuration by dropping entries without a meaningful value.

    Falsy values (empty strings/collections, None) are removed, except for
    values equal to ``0`` or ``False``, which are kept.

    :param config: Configuration to sanitize
    :return: Sanitized configuration (a new dict)
    """
    sanitized = {}
    for key, value in config.items():
        # Zero and False are legitimate settings, so they survive the truthiness filter
        if value in [0, False] or value:
            sanitized[key] = value
    return sanitized
def merge_configs(base_config, new_config):
    """
    Merge static configurations.

    Used internally. The "family" key of ``new_config`` is never merged, because
    the family is set explicitly by ExtractManager.push_config.

    Rules applied for every other key of ``new_config``:

    * key missing in base - the value is taken as is,
    * value equal to the one in base - nothing changes,
    * base value is a list - elements not present yet are appended
      (to a new list, the list held by ``base_config`` is not modified),
    * anything else is a conflict.

    :param base_config: Base configuration
    :param new_config: Changes to apply
    :return: Merged configuration
    :raises RuntimeError: when a non-list value would be overridden with a different one
    """
    merged = dict(base_config)
    for key, value in new_config.items():
        if key == "family":
            continue
        if key not in merged:
            merged[key] = value
            continue
        current = merged[key]
        if current == value:
            continue
        if not isinstance(current, list):
            raise RuntimeError(
                "Extractor tries to override '{old_value}' "
                "value of '{key}' with '{new_value}'".format(
                    key=key, old_value=current, new_value=value
                )
            )
        # Concatenate instead of appending in place: 'merged' is only a shallow
        # copy, so the list object is still shared with base_config
        combined = current
        for element in value:
            if element not in combined:
                combined = combined + [element]
        merged[key] = combined
    return merged
class ExtractorModules(object):
    """
    Holder of the Yara rules and Extractor classes used by ExtractManager

    :param modules_path: Path with module files (Extractor classes and Yara files, default '~/.malduck')
    :type modules_path: str
    """

    def __init__(self, modules_path=None):
        if modules_path is None:
            home_dir = os.path.expanduser("~")
            modules_path = os.path.join(home_dir, ".malduck")
            # The default directory is created when missing.
            # NOTE(review): nesting reconstructed from a flattened listing - confirm that
            # explicitly passed paths are not supposed to be created as well.
            if not os.path.exists(modules_path):
                os.makedirs(modules_path)
        #: Yara rules loaded from the modules directory
        self.rules = Yara.from_dir(modules_path)
        # Modules are loaded before the subclass lookup below; load failures go to on_error
        load_modules(modules_path, onerror=self.on_error)
        #: Extractor subclasses known after the modules were loaded
        self.extractors = Extractor.__subclasses__()

    def on_error(self, exc, module_name):
        """
        Called for every exception raised while a module is being loaded.
        The default implementation only logs a warning.

        Override this method if you want to set your own error handler.

        :param exc: Exception object
        :type exc: :class:`Exception`
        :param module_name: Name of the module which raised the exception
        :type module_name: str
        """
        message = "{} not loaded: {}".format(module_name, exc)
        log.warning(message)
class ExtractManager(object):
    """
    Multi-dump extraction context. Handles merging configs from different dumps, additional dropped families etc.

    :param modules: Object with loaded extractor modules
    :type modules: :class:`ExtractorModules`
    """

    def __init__(self, modules):
        self.modules = modules
        #: Best configuration pushed so far, keyed by family name
        self.configs = {}

    @property
    def rules(self):
        """
        Bound Yara rules

        :rtype: :class:`malduck.yara.Yara`
        """
        return self.modules.rules

    @property
    def extractors(self):
        """
        Bound extractor modules

        :rtype: List[Type[:class:`malduck.extractor.Extractor`]]
        """
        return self.modules.extractors

    def on_error(self, exc, extractor):
        """
        Handler for all Exception's thrown by :py:meth:`Extractor.handle_yara`.

        .. deprecated:: 2.1.0
           Look at :py:meth:`ExtractManager.on_extractor_error` instead.

        :param exc: Exception object
        :type exc: :class:`Exception`
        :param extractor: Extractor object which raised the exception
        :type extractor: :class:`malduck.extractor.Extractor`
        """
        self.on_extractor_error(exc, extractor, "handle_yara")

    def on_extractor_error(self, exc, extractor, method_name):
        """
        Handler for all Exception's thrown by extractor methods (including :py:meth:`Extractor.handle_yara`).
        The default implementation logs a warning with the current traceback.

        Override this method if you want to set your own error handler.

        :param exc: Exception object
        :type exc: :class:`Exception`
        :param extractor: Extractor instance
        :type extractor: :class:`extractor.Extractor`
        :param method_name: Name of the method which raised the exception
        :type method_name: str
        """
        import traceback

        # format_exc() reports the exception currently being handled, so this
        # handler is expected to be invoked from within an 'except' block
        log.warning(
            "{}.{} throwed exception: {}".format(
                extractor.__class__.__name__, method_name, traceback.format_exc()
            )
        )

    def push_file(self, filepath, base=0):
        """
        Pushes file for extraction. Config extractor entrypoint.

        :param filepath: Path to extracted file
        :type filepath: str
        :param base: Memory dump base address
        :type base: int
        :return: Family name if ripped successfully and provided better configuration than previous files.
                 Returns None otherwise.
        """
        from ..procmem import ProcessMemory

        log.debug("Started extraction of file {}:{:x}".format(filepath, base))
        with ProcessMemory.from_file(filepath, base=base) as p:
            return self.push_procmem(p, rip_binaries=True)

    def push_config(self, family, config):
        """
        Stores configuration extracted for given family.

        The config is stored if nothing was stored for that family yet or if it
        looks better than the stored one (see ``is_config_better``). The stored
        dict is a copy of ``config`` with the "family" key set; the dict passed
        by the caller is left unmodified.

        :param family: Family name
        :type family: str
        :param config: Extracted configuration
        :type config: dict
        :return: Family name if the config was stored, None if it was ignored
        """
        # Shallow copy, so adding "family" doesn't leak into the caller's dict
        # (push_procmem passes the extractor's collected config here)
        config = dict(config)
        config["family"] = family
        if family not in self.configs:
            self.configs[family] = config
            return family
        if is_config_better(self.configs[family], config):
            log.debug("Config looks better")
            self.configs[family] = config
            return family
        log.debug("Config doesn't look better - ignoring.")
        return None

    def push_procmem(self, p, rip_binaries=False):
        """
        Pushes ProcessMemory object for extraction

        :param p: ProcessMemory object
        :type p: :class:`malduck.procmem.ProcessMemory`
        :param rip_binaries: Look for binaries (PE, ELF) in provided ProcessMemory and try to perform extraction using
                             specialized variants (ProcessMemoryPE, ProcessMemoryELF)
        :type rip_binaries: bool (default: False)
        :return: Family name if ripped successfully and provided better configuration than previous procmems.
                 Returns None otherwise.
        """
        from ..procmem import ProcessMemoryPE, ProcessMemoryELF
        from ..procmem.binmem import ProcessMemoryBinary

        matches = p.yarav(self.rules)
        if not matches:
            log.debug("No Yara matches.")
            return None

        # The raw procmem is always tried; embedded binaries only on request
        binaries = [p]
        if rip_binaries:
            binaries += list(ProcessMemoryPE.load_binaries_from_memory(p)) + list(
                ProcessMemoryELF.load_binaries_from_memory(p)
            )

        def fmt_procmem(procmem):
            # Short label for logs: class name, image/dump marker and image base
            return "{}:{}:{:x}".format(
                procmem.__class__.__name__,
                "IMG" if getattr(procmem, "is_image", False) else "DMP",
                procmem.imgbase,
            )

        def extract_config(procmem):
            # Runs all extractors on a single procmem using a fresh single-dump context
            log.debug("{} - ripping...".format(fmt_procmem(procmem)))
            extractor = ProcmemExtractManager(self)
            # NOTE(review): presumably re-bases the match offsets using the
            # procmem's p2v mapping - confirm against YaraMatches.remap
            matches.remap(procmem.p2v)
            extractor.push_procmem(procmem, _matches=matches)
            if extractor.family:
                log.debug(
                    "{} - found {}!".format(fmt_procmem(procmem), extractor.family)
                )
                return self.push_config(extractor.family, extractor.config)
            log.debug("{} - No luck.".format(fmt_procmem(procmem)))
            return None

        # 'list()' for prettier logs
        log.debug("Matched rules: {}".format(list(matches.keys())))

        # The family reported is the one from the last successful push
        ripped_family = None
        for binary in binaries:
            found_family = extract_config(binary)
            if found_family is not None:
                ripped_family = found_family
            # Binaries that carry an 'image' counterpart get a second attempt on it
            if isinstance(binary, ProcessMemoryBinary) and binary.image is not None:
                found_family = extract_config(binary.image)
                if found_family is not None:
                    ripped_family = found_family
        return ripped_family

    @property
    def config(self):
        """
        Extracted configuration (list of configs for each extracted family)
        """
        return list(self.configs.values())
class ProcmemExtractManager(object):
    """
    Extraction context for a single dump (and therefore a single family)
    """

    def __init__(self, parent):
        self.parent = parent  #: Bound ExtractManager instance
        self.family = None  #: Matched family
        self.globals = {}
        #: Collected configuration so far (especially useful for "final" extractors)
        self.collected_config = {}

    def on_extractor_error(self, exc, extractor, method_name):
        """
        Handler for all Exception's raised by extractor methods.
        Simply forwards the call to the bound ExtractManager.

        :param exc: Exception object
        :type exc: :class:`Exception`
        :param extractor: Extractor instance
        :type extractor: :class:`extractor.Extractor`
        :param method_name: Name of the method which raised the exception
        :type method_name: str
        """
        self.parent.on_extractor_error(exc, extractor, method_name)

    def push_procmem(self, p, _matches=None):
        """
        Pushes ProcessMemory object for extraction

        :param p: ProcessMemory object
        :type p: :class:`malduck.procmem.ProcessMemory`
        :param _matches: YaraMatches object (used internally)
        :type _matches: :class:`malduck.yara.YaraMatches`
        """
        # Scan only when no (non-empty) matches were handed over by the caller
        matches = _matches if _matches else p.yarav(self.parent.rules)
        for extractor_class in self.parent.extractors:
            extractor = extractor_class(self)
            for rule_name in extractor.yara_rules:
                if rule_name not in matches:
                    continue
                try:
                    extractor.handle_yara(p, matches[rule_name])
                except Exception as exc:
                    # A failing extractor is reported to the parent and does not stop the others
                    self.parent.on_error(exc, extractor)

    def push_config(self, config, extractor):
        """
        Pushes new partial config

        The config is made JSON-friendly, sanitized and merged into the config
        collected so far. If it carries a "family" key, the stored family is set
        when none is stored yet, or when the stored family differs from the
        extractor's family and is listed in the extractor's overrides.

        Example: citadel overrides zeus

        :param config: Partial config object
        :type config: dict
        :param extractor: Extractor object reference
        :type extractor: :class:`malduck.extractor.Extractor`
        :raises RuntimeError: when the config can't be encoded as JSON
        """
        config = encode_for_json(config)
        try:
            # Result is discarded - the call only verifies that the config is serializable
            json.dumps(config)
        except (TypeError, OverflowError) as e:
            log.debug(
                "Config is not JSON-encodable ({}): {}".format(str(e), repr(config))
            )
            raise RuntimeError("Config must be JSON-encodable")

        config = sanitize_config(config)
        if not config:
            return

        extractor_name = extractor.__class__.__name__
        log.debug(
            "%s found the following config parts: %s",
            extractor_name,
            sorted(config),
        )
        self.collected_config = merge_configs(self.collected_config, config)

        if "family" not in config:
            return
        if self.family:
            # An already matched family is replaced only by an extractor that overrides it
            if (
                self.family == extractor.family
                or self.family not in extractor.overrides
            ):
                return
        self.family = config["family"]
        log.debug("%s tells it's %s", extractor.__class__.__name__, self.family)

    @property
    def config(self):
        """
        Collected config, or an empty dict as long as no family was matched.

        Family is not included in config itself, look at :py:attr:`ProcmemExtractManager.family`.
        """
        return {} if self.family is None else self.collected_config