Source code for malduck.extractor.extractor

import functools
import logging

from ..procmem import ProcessMemoryPE, ProcessMemoryELF

from ..py2compat import add_metaclass

log = logging.getLogger(__name__)

__all__ = ["Extractor"]


[docs]class MetaExtractor(type): """ Metaclass for Extractor. Handles proper registration of decorated extraction methods """ def __new__(cls, name, bases, attrs): """ Collect ext_yara_string and ext_final methods """ klass = type.__new__(cls, name, bases, attrs) klass.extractor_methods = dict(getattr(klass, "extractor_methods", {})) klass.final_methods = list(getattr(klass, "final_methods", [])) if type(getattr(klass, "yara_rules")) not in (list, tuple): raise TypeError( "'yara_rules' field must be 'list' or 'tuple' in {}".format(str(name)) ) for name, method in attrs.items(): if isinstance(method, ExtractorMethod): if method.final: klass.final_methods.append(name) else: if method.yara_string in klass.extractor_methods: raise TypeError( "There can be only one extractor method " 'for "{}" string'.format(method.yara_string) ) klass.extractor_methods[method.yara_string] = name return klass
class ExtractorMethod(object): """ Represents registered extractor method """ def __init__(self, method): self.method = method self.weak = False self.needs_exec = None self.final = False self.yara_string = method.__name__ functools.update_wrapper(self, method) def __call__(self, extractor, *args, **kwargs): # Get config from extractor method config = self.method(extractor, *args, **kwargs) if not config: return # If method returns True - family matched (for non-weak methods) if config is True: config = {} # If method is "strong" - "family" key will be automatically added if not self.weak and extractor.family and "family" not in config: config["family"] = extractor.family # If config is not empty - push it if config: extractor.push_config(config)
[docs]class ExtractorBase(object): family = None #: Extracted malware family, automatically added to "family" key for strong extraction methods overrides = [] #: Family match overrides another match e.g. citadel overrides zeus def __init__(self, parent): self.parent = parent #: ProcmemExtractManager instance
[docs] def push_procmem(self, procmem, **info): """ Push procmem object for further analysis :param procmem: ProcessMemory object :type procmem: :class:`malduck.procmem.ProcessMemory` :param info: Additional info about object """ return self.parent.push_procmem(procmem, **info)
[docs] def push_config(self, config): """ Push partial config (used by :py:meth:`Extractor.handle_yara`) :param config: Partial config element :type config: dict """ return self.parent.push_config(config, self)
@property def matched(self): """ Returns True if family has been matched so far :rtype: bool """ return self.parent.family is not None @property def collected_config(self): """ Shows collected config so far (useful in "final" extractors) :rtype: dict """ return self.parent.collected_config @property def globals(self): """ Container for global variables associated with analysis :rtype: dict """ return self.parent.globals @property def log(self): """ Logger instance for Extractor methods :return: :class:`logging.Logger` """ return logging.getLogger( "{}.{}".format( # should be malduck.extractor.modules (see # malduck.extractor.loaders) self.__class__.__module__, self.__class__.__name__, ) )
[docs]@add_metaclass(MetaExtractor) class Extractor(ExtractorBase): """ Base class for extractor modules Following parameters need to be defined: * :py:attr:`family` (see :py:attr:`extractor.ExtractorBase.family`) * :py:attr:`yara_rules` * :py:attr:`overrides` (optional, see :py:attr:`extractor.ExtractorBase.overrides`) Example extractor code for Citadel: .. code-block:: Python from ripper import Extractor class Citadel(Extractor): family = "citadel" yara_rules = ["citadel"] overrides = ["zeus"] @Extractor.extractor("briankerbs") def citadel_found(self, p, addr): log.info('[+] `Coded by Brian Krebs` str @ %X' % addr) return True @Extractor.extractor def cit_login(self, p, addr): log.info('[+] Found login_key xor @ %X' % addr) hit = p.uint32v(addr + 4) print(hex(hit)) if p.is_addr(hit): return {'login_key': p.asciiz(hit)} hit = p.uint32v(addr + 5) print(hex(hit)) if p.is_addr(hit): return {'login_key': p.asciiz(hit)} .. py:decoratormethod:: Extractor.extractor Decorator for string-based extractor methods. Method is called each time when string with the same identifier as method name has matched Extractor can be called for many number-suffixed strings e.g. `$keyex1` and `$keyex2` will call `keyex` method. .. py:decoratormethod:: Extractor.extractor(string_or_method, final=False) Specialized `@extractor` variant :param string_or_method: If method name doesn't match the string identifier pass yara string identifier as decorator argument :type string_or_method: str :param final: Extractor will be called whenever Yara rule has been matched, but always after string-based extractors :type final: bool .. py:decoratormethod:: Extractor.final Decorator for final extractors, called after regular extraction methods. .. code-block:: Python from ripper import Extractor class Evil(Extractor): yara_rules = ["evil"] family = "evil" ... @Extractor.needs_pe @Extractor.final def get_config(self, p): cfg = {"urls": self.get_cncs_from_rsrc(p)} if "role" not in self.collected_config: cfg["role"] = "loader" return cfg .. py:decoratormethod:: Extractor.weak Use this decorator for extractors when successful extraction is not sufficient to mark family as matched. All "weak configs" will be flushed when "strong config" appears. .. py:decoratormethod:: Extractor.needs_pe Use this decorator for extractors that need PE instance. (:class:`malduck.procmem.ProcessMemoryPE`) .. py:decoratormethod:: Extractor.needs_elf Use this decorator for extractors that need ELF instance. (:class:`malduck.procmem.ProcessMemoryELF`) """ yara_rules = () #: Names of Yara rules for which handle_yara is called
[docs] def on_error(self, exc, method_name): """ Handler for all Exception's throwed by extractor methods. :param exc: Exception object :type exc: :class:`Exception` :param method_name: Name of method which throwed exception :type method_name: str """ self.parent.on_extractor_error(exc, self, method_name)
[docs] def handle_yara(self, p, match): """ Override this if you don't want to use decorators and customize ripping process (e.g. yara-independent, brute-force techniques) :param p: ProcessMemory object :type p: :class:`malduck.procmem.ProcessMemory` :param match: Found yara matches for this family :type match: List[:class:`malduck.yara.YaraMatch`] """ # Call string-based extractors for identifier, method_name in self.extractor_methods.items(): if identifier not in match: continue method = getattr(self, method_name) for va in match[identifier]: try: if method.needs_exec and not isinstance(p, method.needs_exec): log.debug( "Omitting %s.%s for %s@%x - %s is not %s", self.__class__.__name__, method_name, identifier, va, p.__class__.__name__, method.needs_exec.__name__, ) continue log.debug( "Trying %s.%s for %s@%x", self.__class__.__name__, method_name, identifier, va, ) method(self, p, va) except Exception as exc: self.on_error(exc, method_name) # Call final extractors for method_name in self.final_methods: method = getattr(self, method_name) if method.needs_exec and not isinstance(p, method.needs_exec): log.debug( "Omitting %s.%s (final) - %s is not %s", self.__class__.__name__, method_name, p.__class__.__name__, method.needs_exec.__name__, ) continue log.debug("Trying %s.%s (final)", self.__class__.__name__, method_name) try: method(self, p) except Exception as exc: self.on_error(exc, method_name)
# Extractor method decorators
[docs] @staticmethod def needs_pe(method): method = Extractor._extractor_method(method) method.needs_exec = ProcessMemoryPE return method
[docs] @staticmethod def needs_elf(method): method = Extractor._extractor_method(method) method.needs_exec = ProcessMemoryELF return method
[docs] @staticmethod def weak(method): method = Extractor._extractor_method(method) method.weak = True return method
[docs] @staticmethod def extractor(string_or_method=None, final=False): if final and string_or_method: raise ValueError("String identifier is unnecessary for final methods") def extractor_wrapper(method): extractor_method = Extractor._extractor_method(method) # If there is string provided, use it as yara_string if string_or_method and not callable(string_or_method): extractor_method.yara_string = string_or_method extractor_method.final = final return extractor_method if callable(string_or_method): return extractor_wrapper(string_or_method) else: return extractor_wrapper
[docs] @staticmethod def final(method): return Extractor.extractor(final=True)(method)
# Internals @staticmethod def _extractor_method(method): # Check whether method is already wrapped by ExtractorMethod if isinstance(method, ExtractorMethod): return method else: return ExtractorMethod(method)