from __future__ import absolute_import
import json
import logging
import os
import re
import yara
# Names exported by ``from <module> import *``. YaraMatches and YaraMatch are
# not listed here; instances of them are produced by Yara.match().
__all__ = ["Yara", "YaraString"]
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Source template for the single rule generated by Yara(strings=...).
# It is filled in with str.format, so the doubled braces are literal
# "{" / "}" characters, while {name}, {strings} and {condition} are
# substituted by Yara.__init__.
_YARA_RULE_FORMAT = """
rule {name} {{
strings:
{strings}
condition:
{condition}
}}"""
class Yara(object):
    """
    Represents Yara ruleset. Rules can be compiled from set of files or defined in code (single rule only).

    Most simple rule (with default identifiers left):

    .. code-block:: python

        from malduck.yara import Yara, YaraString

        Yara(strings="MALWR").match(data=b"MALWRMALWARMALWR").r.string == [0, 11]

    Example of more complex rule defined in Python:

    .. code-block:: python

        from malduck.yara import Yara, YaraString

        ruleset = Yara(name="MalwareRule",
        strings={
            "xor_stub": YaraString("This program cannot", xor=True, ascii=True),
            "code_ref": YaraString("E2 34 ?? C8 A? FB", type=YaraString.HEX),
            "mal1": "MALWR",
            "mal2": "MALRW"
        }, condition="( $xor_stub and $code_ref ) or any of ($mal*)")

        # If mal1 or mal2 are matched, they are grouped into "mal"

        # Print appropriate offsets

        match = ruleset.match(data=b"MALWR MALRW")

        if match:
            # ["mal1", "mal", "mal2"]
            print(match.MalwareRule.keys())
            if "mal" in match.MalwareRule:
                # Note: Order of offsets for grouped is arbitrary
                print("mal*", match.MalwareRule["mal"])

    :param rule_paths: Dictionary of {"namespace": "rule_path"}. See also :py:meth:`Yara.from_dir`.
    :type rule_paths: dict
    :param name: Name of generated rule (default: "r")
    :type name: str
    :param strings: Dictionary representing set of string patterns ({"string_identifier": YaraString or plain str})
    :type strings: dict or str or :class:`YaraString`
    :param condition: Yara rule condition (default: "any of them")
    :type condition: str
    :raises ValueError: if ``rule_paths`` is empty/None and ``strings`` is empty/None
    """

    def __init__(
        self, rule_paths=None, name="r", strings=None, condition="any of them"
    ):
        # A non-empty mapping of rule files takes precedence over an inline rule.
        if rule_paths:
            self.rules = yara.compile(filepaths=rule_paths)
            return
        # Note: an *empty* rule_paths mapping also ends up here.
        if not strings:
            raise ValueError("No strings specified")
        # A lone pattern is registered under the default identifier "string".
        if isinstance(strings, (str, YaraString)):
            strings = {"string": strings}
        # Plain str values are wrapped in YaraString (text pattern); other
        # values are rendered through their own str().
        yara_strings = "\n ".join(
            "${key} = {value}".format(
                key=key,
                value=str(YaraString(value) if isinstance(value, str) else value),
            )
            for key, value in strings.items()
        )
        yara_source = _YARA_RULE_FORMAT.format(
            name=name, strings=yara_strings, condition=condition
        )
        self.rules = yara.compile(source=yara_source)

    @staticmethod
    def from_dir(path, recursive=True, followlinks=True):
        """
        Find rules (recursively) in specified path. Supported extensions: *.yar, *.yara

        The file name without extension is used as the namespace. When two
        files share a name, the one visited later wins and a warning is logged.
        If no rule file is found, the resulting empty mapping makes the
        constructor raise ``ValueError``.

        :param path: Root path for searching
        :type path: str
        :param recursive: Search recursively (default: enabled)
        :type recursive: bool
        :param followlinks: Follow symbolic links (default: enabled)
        :type followlinks: bool
        :rtype: :class:`Yara`
        """
        rule_paths = {}
        for root, _, files in os.walk(path, followlinks=followlinks):
            for fname in files:
                if not fname.endswith((".yar", ".yara")):
                    continue
                ruleset_name = os.path.splitext(os.path.basename(fname))[0]
                ruleset_path = os.path.join(root, fname)
                if ruleset_name in rule_paths:
                    log.warning(
                        "Yara file name collision - {} overridden by {}".format(
                            rule_paths[ruleset_name], ruleset_path
                        )
                    )
                rule_paths[ruleset_name] = ruleset_path
            # os.walk yields the root directory first; stopping after the
            # first iteration limits the search to that directory.
            if not recursive:
                break
        return Yara(rule_paths=rule_paths)

    def match(self, offset_mapper=None, **kwargs):
        """
        Perform matching on file or data block

        Keyword arguments are passed through to the compiled rules' ``match``.

        :param filepath: Path to the file to be scanned
        :type filepath: str
        :param data: Data to be scanned
        :type data: str
        :param offset_mapper: Offset mapping function. For an unmapped region, it should return None.
                              Used by :py:meth:`malduck.procmem.ProcessMemory.yarav`
        :type offset_mapper: function
        :rtype: :class:`YaraMatches`
        """
        return YaraMatches(self.rules.match(**kwargs), offset_mapper=offset_mapper)
class YaraString(object):
    """
    Formatter for Yara string patterns

    ``str(YaraString(...))`` yields the right-hand side of a Yara string
    definition: the formatted pattern followed by the enabled modifiers.

    :param value: Pattern value
    :type value: str
    :param type: Pattern type (default is :py:attr:`YaraString.TEXT`)
    :type type: :py:attr:`YaraString.TEXT` / :py:attr:`YaraString.HEX` / :py:attr:`YaraString.REGEX`
    :param modifiers: Yara string modifier flags. Only flags passed as exactly ``True`` are emitted.
    """

    TEXT = 0  #: Text string ( `'value' => '"value"'` )
    HEX = 1  #: Hexadecimal string ( `"aa bb cc dd" => '{ aa bb cc dd }'` )
    REGEX = 2  #: Regex string ( `'value' => '/value/'` )

    # Matches either an escape sequence (backslash + any character) or a bare
    # slash. Consuming escape sequences as a unit lets __str__ tell an
    # already-escaped "\/" apart from an unescaped "/".
    _ESCAPE_OR_SLASH_RE = re.compile(r"\\.|/", re.DOTALL)

    def __init__(self, value, type=TEXT, **modifiers):
        self.value = value
        self.type = type
        # Identity check on purpose: truthy non-bool values do not enable a flag.
        self.modifiers = [k for k, v in modifiers.items() if v is True]

    def __str__(self):
        """
        Render the pattern in Yara source syntax.

        :raises ValueError: if ``type`` is not one of TEXT / HEX / REGEX
        """
        if self.type == YaraString.TEXT:
            # JSON string quoting is used to produce the quoted/escaped literal.
            str_value = json.dumps(self.value)
        elif self.type == YaraString.HEX:
            str_value = "{{ {} }}".format(self.value)
        elif self.type == YaraString.REGEX:
            # Escape only slashes that are not already escaped; blindly
            # escaping every "/" would turn "\/" into "\\/" and terminate
            # the regex early.
            escaped = self._ESCAPE_OR_SLASH_RE.sub(
                lambda m: "\\/" if m.group(0) == "/" else m.group(0), self.value
            )
            str_value = "/{}/".format(escaped)
        else:
            raise ValueError("Unknown YaraString type: {}".format(self.type))
        return str_value + "".join([" " + modifier for modifier in self.modifiers])
class YaraMatches(object):
    """
    Represents matching results. Returned by :py:meth:`Yara.match`.

    Rules can be referenced by both attribute and index. Index access raises
    ``KeyError`` for a rule that did not match; attribute access raises
    ``AttributeError``.

    :param match_results: Iterable of raw match objects (each exposing ``rule``)
    :param offset_mapper: Optional offset mapping function, passed to each :class:`YaraMatch`
    """

    def __init__(self, match_results, offset_mapper=None):
        self.match_results = match_results
        self.matched_rules = {}
        self.remap(offset_mapper)

    def remap(self, offset_mapper=None):
        """
        Rebuild ``matched_rules`` from the stored raw results.

        Rules whose :class:`YaraMatch` ends up empty (falsy) are left out.
        """
        self.matched_rules = {}
        for match in self.match_results:
            yara_match = YaraMatch(match, offset_mapper=offset_mapper)
            if yara_match:
                self.matched_rules[match.rule] = yara_match

    def keys(self):
        """List of matched rule identifiers"""
        return self.matched_rules.keys()

    def __bool__(self):
        return bool(self.matched_rules)

    def __nonzero__(self):
        # Python 2 spelling of __bool__.
        return self.__bool__()

    def __contains__(self, item):
        return item in self.matched_rules

    def __getitem__(self, item):
        return self.matched_rules[item]

    def __getattr__(self, item):
        # Only called when regular attribute lookup fails. If the backing
        # dict itself is missing (e.g. while copy/pickle probes a bare
        # instance), looking it up through self[...] would recurse forever.
        if item == "matched_rules":
            raise AttributeError(item)
        try:
            return self[item]
        except (KeyError, IndexError):
            # A dict lookup raises KeyError; translate it so hasattr() and
            # getattr(obj, name, default) behave as expected.
            raise AttributeError(item)
class YaraMatch(object):
    """
    Represents matching results for a single rule. Returned by `YaraMatches.<rule>`.

    Strings can be referenced by both attribute and index. Index access raises
    ``KeyError`` for a string that did not match; attribute access raises
    ``AttributeError``; :py:meth:`get` returns an empty list.

    :param match: Raw match object exposing ``rule`` and ``strings``; each entry of
                  ``strings`` is unpacked as ``(offset, identifier, data)``
    :param offset_mapper: Optional function called as ``offset_mapper(offset, length)``.
                          Returning None drops that occurrence.
    """

    # "$str1", "$str2" => group "str": a prefix of at least two word
    # characters ending with a letter, followed by optional trailing digits.
    _GROUP_IDENT_RE = re.compile(r"^\$(\w+?[a-zA-Z])(\d*)$")

    def __init__(self, match, offset_mapper=None):
        self.rule = self.name = match.rule
        self.offsets = {}
        for off, ident, buf in match.strings:
            real_ident = ident.lstrip("$")
            # Add group identifiers ($str1, $str2 => "str")
            grouped = self._GROUP_IDENT_RE.match(ident)
            group_ident = grouped.group(1) if grouped else real_ident
            # Map offset if offset_mapper is provided
            if offset_mapper is not None:
                off = offset_mapper(off, len(buf))
                if off is None:
                    # Ignore match for unmapped region
                    continue
            # Register offset for full identifier
            self.offsets.setdefault(real_ident, []).append(off)
            # Register offset for grouped identifier
            if real_ident != group_ident:
                self.offsets.setdefault(group_ident, []).append(off)

    def keys(self):
        """List of matched string identifiers"""
        return self.offsets.keys()

    def get(self, item):
        """Get matched string offsets or empty list if not matched"""
        return self.offsets.get(item, [])

    def __bool__(self):
        return bool(self.offsets)

    def __nonzero__(self):
        # Python 2 spelling of __bool__.
        return self.__bool__()

    def __contains__(self, item):
        return item in self.offsets

    def __getitem__(self, item):
        return self.offsets[item]

    def __getattr__(self, item):
        # Only called when regular attribute lookup fails. If the backing
        # dict itself is missing (e.g. while copy/pickle probes a bare
        # instance), looking it up through self[...] would recurse forever.
        if item == "offsets":
            raise AttributeError(item)
        try:
            return self[item]
        except (KeyError, IndexError):
            # A dict lookup raises KeyError; translate it so hasattr() and
            # getattr(obj, name, default) behave as expected.
            raise AttributeError(item)