import enum
import json
import logging
import os
import re
import textwrap
from collections import defaultdict, namedtuple
from typing import Callable, Dict, Optional, Tuple, TypeVar
import yara
# Names exported by a star-import of this module
__all__ = [
    "Yara",
    "YaraString",
    "YaraRulesetMatch",
    "YaraRulesetOffsets",
    "YaraRuleMatch",
    "YaraRuleOffsets",
    "YaraStringMatch",
    "YaraMatches",
    "YaraMatch",
]

# Module-level logger
log = logging.getLogger(__name__)

# Generic type variable
T = TypeVar("T")

# Signature of the ``offset_mapper`` callbacks accepted by the matching classes:
# called with (offset, length) of a matched string, returns the mapped offset
# or None when the match lies in an unmapped region.
OffsetMapper = Callable[[Optional[int], Optional[int]], Optional[int]]

# NOTE(review): presumably the tuple form of a string match (offset, identifier, data)
# returned by yara-python before 4.3.0 - confirm against callers.
YaraRulesString = Tuple[int, str, bytes]
class _Mapper:
    """
    Read-only wrapper around a dictionary whose entries can be reached by index
    (``mapper["key"]``), by attribute (``mapper.key``) or through :py:meth:`get`.

    :param elements: Dictionary of wrapped entries
    :type elements: dict
    :param default: Value returned by :py:meth:`get` for missing keys (default: None)
    """

    def __init__(self, elements, default=None):
        self.elements = elements
        self.default = default

    def keys(self):
        """List of matched string identifiers"""
        return self.elements.keys()

    def get(self, item):
        """Get matched string offsets or default if not matched"""
        return self.elements.get(item, self.default)

    def __bool__(self):
        return bool(self.elements)

    def __nonzero__(self):
        # Python 2 spelling of __bool__
        return self.__bool__()

    def __contains__(self, item):
        return item in self.elements

    def __getitem__(self, item):
        return self.elements[item]

    def __getattr__(self, item):
        """
        Attribute-style access to wrapped entries.

        Invoked only when regular attribute lookup fails.

        :raises AttributeError: if there is no entry called ``item``
        """
        # copy/pickle probe attributes on an instance whose __init__ has not run.
        # Without this guard the lookup of ``self.elements`` would re-enter
        # __getattr__ and recurse until RecursionError.
        if "elements" not in self.__dict__:
            raise AttributeError(item)
        try:
            return self[item]
        except (KeyError, IndexError):
            # A missing dictionary key raises KeyError; translate it so that
            # hasattr() and getattr(obj, name, default) behave as expected.
            raise AttributeError(item) from None
class Yara:
    """
    Represents Yara ruleset. Rules can be compiled from set of files or defined in code (single rule only).

    Most simple rule (with default identifiers left):

    .. code-block:: python

        from malduck.yara import Yara, YaraString

        Yara(strings="MALWR").match(data=b"MALWRMALWARMALWR").r.string == [0, 11]

    Example of more complex rule defined in Python:

    .. code-block:: python

        from malduck.yara import Yara, YaraString

        ruleset = Yara(name="MalwareRule",
        strings={
            "xor_stub": YaraString("This program cannot", xor=True, ascii=True),
            "code_ref": YaraString("E2 34 ?? C8 A? FB", type=YaraString.HEX),
            "mal1": "MALWR",
            "mal2": "MALRW"
        }, condition="( $xor_stub and $code_ref ) or any of ($mal*)")

        # If mal1 or mal2 are matched, they are grouped into "mal"

        # Print appropriate offsets

        match = ruleset.match(data=b"MALWR MALRW")

        if match:
            # ["mal1", "mal", "mal2"]
            print(match.MalwareRule.keys())
            if "mal" in match.MalwareRule:
                # Note: Order of offsets for grouped strings is undetermined
                print("mal*", match.MalwareRule["mal"])

    :param rule_paths: Dictionary of {"namespace": "rule_path"}. See also :py:meth:`Yara.from_dir`.
    :type rule_paths: dict
    :param name: Name of generated rule (default: "r")
    :type name: str
    :param strings: Dictionary representing set of string patterns ({"string_identifier": YaraString or plain str})
    :type strings: dict or str or :class:`YaraString`
    :param condition: Yara rule condition (default: "any of them")
    :type condition: str
    :raises ValueError: if neither ``rule_paths`` nor ``strings`` are provided
    """

    def __init__(
        self, rule_paths=None, name="r", strings=None, condition="any of them"
    ):
        # Rule files take precedence: remaining arguments are ignored in that case
        if rule_paths:
            self.rules = yara.compile(filepaths=rule_paths)
            return
        if not strings:
            raise ValueError("No strings specified")
        # A single pattern is registered under the default identifier "string"
        if isinstance(strings, (str, YaraString)):
            strings = {"string": strings}
        definitions = []
        for key, value in strings.items():
            # Plain str values are shorthand for a default (text) YaraString
            pattern = YaraString(value) if isinstance(value, str) else value
            definitions.append(f"${key} = {pattern!s}")
        # Continuation lines are indented like the placeholder in the template below
        yara_strings = "\n                    ".join(definitions)
        yara_source = textwrap.dedent(
            f"""
            rule {name} {{
                strings:
                    {yara_strings}
                condition:
                    {condition}
            }}
            """
        )
        self.rules = yara.compile(source=yara_source)

    @staticmethod
    def from_dir(path, recursive=True, followlinks=True):
        """
        Find rules (recursively) in specified path. Supported extensions: \\*.yar, \\*.yara

        The file name without extension is used as the namespace of each rule file.
        When two files share a name, the one found later overrides the earlier one
        and a warning is logged.

        :param path: Root path for searching
        :type path: str
        :param recursive: Search recursively (default: enabled)
        :type recursive: bool
        :param followlinks: Follow symbolic links (default: enabled)
        :type followlinks: bool
        :rtype: :class:`Yara`
        :raises ValueError: if no rule files were found under ``path``
        """
        rule_paths: Dict[str, str] = {}
        for root, _, files in os.walk(path, followlinks=followlinks):
            for fname in files:
                if not fname.endswith((".yar", ".yara")):
                    continue
                ruleset_name = os.path.splitext(os.path.basename(fname))[0]
                ruleset_path = os.path.join(root, fname)
                if ruleset_name in rule_paths:
                    log.warning(
                        "Yara file name collision - %s overridden by %s",
                        rule_paths[ruleset_name],
                        ruleset_path,
                    )
                rule_paths[ruleset_name] = ruleset_path
            # The first os.walk() entry is the root directory itself
            if not recursive:
                break
        if not rule_paths:
            # Without this check the constructor would complain about missing
            # strings, which says nothing about the actual problem.
            raise ValueError(f"No Yara rules (*.yar, *.yara) found in {path}")
        return Yara(rule_paths=rule_paths)

    def match(self, offset_mapper=None, extended=False, **kwargs):
        """
        Perform matching on file or data block

        Keyword arguments other than the ones listed below are passed unchanged
        to the ``match`` method of the compiled rules.

        :param filepath: Path to the file to be scanned
        :type filepath: str
        :param data: Data to be scanned
        :type data: str or bytes
        :param offset_mapper: Offset mapping function. For unmapped region, should return None.
                              Used by :py:meth:`malduck.procmem.ProcessMemory.yarav`
        :type offset_mapper: function
        :param extended: Returns extended information about matched strings and rules
        :type extended: bool (optional, default False)
        :rtype: :class:`malduck.yara.YaraRulesetOffsets` or :class:`malduck.yara.YaraRulesetMatch`
                if extended is set to True
        """
        matches = YaraRulesetMatch(
            self.rules.match(**kwargs), offset_mapper=offset_mapper
        )
        return matches if extended else YaraRulesetOffsets(matches)
class YaraStringType(enum.IntEnum):
    """Kinds of string patterns that :class:`YaraString` can format."""

    TEXT = 0
    HEX = 1
    REGEX = 2


class YaraString:
    """
    Formatter for Yara string patterns

    :param value: Pattern value
    :type value: str (bytes are accepted as well for text patterns)
    :param type: Pattern type (default is :py:attr:`YaraString.TEXT`)
    :type type: :py:attr:`YaraString.TEXT` / :py:attr:`YaraString.HEX` / :py:attr:`YaraString.REGEX`
    :param modifiers: Yara string modifier flags. Only modifiers set to ``True`` are emitted.
    """

    TEXT = YaraStringType.TEXT
    HEX = YaraStringType.HEX
    REGEX = YaraStringType.REGEX

    # Named escape sequences available in Yara text strings, keyed by byte value
    _TEXT_ESCAPES = {
        ord('"'): '\\"',
        ord("\\"): "\\\\",
        ord("\t"): "\\t",
        ord("\n"): "\\n",
        ord("\r"): "\\r",
    }
    # Either an escape pair (backslash + any character) or a bare slash
    _REGEX_TOKEN = re.compile(r"\\.|/", re.DOTALL)

    def __init__(self, value, type=YaraStringType.TEXT, **modifiers):
        self.value = value
        self.type = type
        self.modifiers = [k for k, v in modifiers.items() if v is True]

    @classmethod
    def _escape_text(cls, value):
        """Escape ``value`` for use between the double quotes of a Yara text string."""
        if isinstance(value, (bytes, bytearray)):
            raw = bytes(value)
        else:
            raw = str(value).encode("utf-8")
        escaped = []
        for byte in raw:
            if byte in cls._TEXT_ESCAPES:
                escaped.append(cls._TEXT_ESCAPES[byte])
            elif 0x20 <= byte <= 0x7E:
                escaped.append(chr(byte))
            else:
                # Yara has no \uXXXX, \b or \f escapes: anything that is not
                # printable ASCII is emitted byte by byte as \xNN.
                escaped.append(f"\\x{byte:02x}")
        return "".join(escaped)

    @classmethod
    def _escape_regex(cls, value):
        """Escape slashes that would terminate a ``/.../`` Yara regular expression."""
        # Escape pairs are consumed as a whole, so an already escaped "\/" stays
        # untouched while a slash following an escaped backslash is still escaped.
        return cls._REGEX_TOKEN.sub(
            lambda token: "\\/" if token.group(0) == "/" else token.group(0), value
        )

    def __str__(self):
        """
        Render the pattern (followed by its modifiers) as it appears in rule source.

        :raises ValueError: if the pattern type is not TEXT, HEX or REGEX
        """
        if self.type == YaraStringType.TEXT:
            str_value = f'"{self._escape_text(self.value)}"'
        elif self.type == YaraStringType.HEX:
            str_value = f"{{ {self.value} }}"
        elif self.type == YaraStringType.REGEX:
            str_value = f"/{self._escape_regex(self.value)}/"
        else:
            raise ValueError(f"Unknown YaraString type: {self.type}")
        return str_value + "".join(" " + modifier for modifier in self.modifiers)
class YaraRulesetMatch(_Mapper):
    """
    Yara ruleset matches. Returned by :py:meth:`Yara.match`.

    Rules can be referenced by both attribute and index.

    :param matches: Iterable of match objects produced by the compiled rules
    :param offset_mapper: Optional function called with (offset, length) for each
                          matched string; matches for which it returns None are dropped
    """

    def __init__(self, matches, offset_mapper=None):
        # Raw matches are kept so that remap() can rebuild the mapping later
        self._matches = matches
        super().__init__(elements=self._map_matches(matches, offset_mapper))

    def _map_matches(self, matches, offset_mapper):
        """Build {rule name: YaraRuleMatch}, leaving out rules without mapped strings."""
        by_rule = {}
        for rule_match in matches:
            mapped = self._map_strings(rule_match.strings, offset_mapper)
            if not mapped:
                continue
            by_rule[rule_match.rule] = YaraRuleMatch(
                rule_match.rule,
                mapped,
                rule_match.meta,
                rule_match.namespace,
                rule_match.tags,
            )
        return by_rule

    def _map_strings(self, strings, offset_mapper):
        """Group string matches by full identifier and by group identifier."""
        grouped = defaultdict(list)
        for entry in strings:
            # yara-python 4.3.0 broke compatibility and started returning a StringMatch object
            if type(entry) is tuple:
                identifier = entry[1]
                hits = [(entry[0], entry[2])]
            else:
                identifier = entry.identifier
                hits = [
                    (instance.offset, instance.matched_data)
                    for instance in entry.instances
                ]
            # Identifier without "$" plus the identifier of its group
            real_ident, group_ident = self._parse_string_identifier(identifier)
            if real_ident == group_ident:
                targets = (real_ident,)
            else:
                targets = (real_ident, group_ident)
            for offset, content in hits:
                if offset_mapper is not None:
                    mapped_offset = offset_mapper(offset, len(content))
                    # None means the match lies in an unmapped region: skip it
                    if mapped_offset is None:
                        continue
                    offset = mapped_offset
                # Registered under the full identifier and, if different, under the group one
                for target in targets:
                    grouped[target].append(
                        YaraStringMatch(real_ident, offset, content)
                    )
        return grouped

    def _parse_string_identifier(self, identifier):
        """Return (identifier without "$", group identifier), e.g. $str1, $str2 => "str"."""
        stripped = identifier.lstrip("$")
        found = re.match(r"^\$(\w+?[a-zA-Z])_?(\d*)$", identifier)
        if found:
            return stripped, found.group(1)
        # Identifiers that do not fit the pattern form a group on their own
        return stripped, stripped

    def remap(self, offset_mapper=None):
        """Return a new :class:`YaraRulesetMatch` built from the same raw matches."""
        return YaraRulesetMatch(self._matches, offset_mapper=offset_mapper)
class YaraRulesetOffsets(_Mapper):
    """
    Offsets-only view of ruleset matches: every rule is wrapped in :class:`YaraRuleOffsets`.

    :param matches: Object exposing matched rules through ``elements`` and supporting ``remap``
    """

    def __init__(self, matches):
        # Kept so that remap() can delegate to the underlying matches
        self._matches = matches
        per_rule = {}
        for rule_name, rule_match in matches.elements.items():
            per_rule[rule_name] = YaraRuleOffsets(rule_match)
        super().__init__(elements=per_rule)

    def remap(self, offset_mapper=None):
        """Remap the underlying matches and wrap the result in a new offsets view."""
        remapped = self._matches.remap(offset_mapper)
        return YaraRulesetOffsets(remapped)
# Single matched string: identifier (without the leading "$"), offset of the match
# and the matched content.
YaraStringMatch = namedtuple("YaraStringMatch", "identifier offset content")
class YaraRuleMatch(_Mapper):
    """
    Matches of a single rule, including rule metadata.

    Strings can be referenced by both attribute and index. Matches of each
    string are kept sorted by offset.

    :param rule: Rule name (also available as ``name``)
    :param strings: Dictionary of {identifier: list of string matches}
    :param meta: Rule metadata
    :param namespace: Rule namespace
    :param tags: Rule tags
    """

    def __init__(self, rule, strings, meta, namespace, tags):
        self.rule = self.name = rule
        self.meta = meta
        self.namespace = namespace
        self.tags = tags
        ordered = {}
        for identifier, hits in strings.items():
            ordered[identifier] = sorted(hits, key=lambda hit: hit.offset)
        super().__init__(elements=ordered)

    def get_offsets(self, string):
        """Return offsets of all matches of ``string`` (empty list if not matched)."""
        hits = self.elements.get(string, [])
        return [hit.offset for hit in hits]
class YaraRuleOffsets(_Mapper):
    """
    Offsets-only view of a single rule match: {identifier: list of offsets}.

    :param rule_match: Rule match exposing ``rule`` and ``elements``
    """

    def __init__(self, rule_match):
        self.rule = self.name = rule_match.rule
        offsets = {}
        for identifier, string_matches in rule_match.elements.items():
            offsets[identifier] = [hit.offset for hit in string_matches]
        # get() falls back to an empty list for identifiers that were not matched
        super().__init__(offsets, default=[])
# Legacy aliases, don't use them in new code:
# both names refer to the offsets-only classes defined above.
YaraMatches = YaraRulesetOffsets
YaraMatch = YaraRuleOffsets