import os
from collections import defaultdict
from lark import Lark, Transformer, v_args
from firepit.props import parse_prop
def get_grammar():
    """Return the text of the ``paramstix.lark`` grammar file.

    The grammar is looked up in the same directory as this module.

    Returns:
        str: The full contents of the grammar file.

    Raises:
        OSError: If the grammar file cannot be opened or read.
    """
    pth = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                       "paramstix.lark")
    # Use a context manager so the file handle is closed deterministically,
    # and pin the encoding so the result does not depend on the platform.
    with open(pth, "r", encoding="utf-8") as grammar_file:
        return grammar_file.read()
def stix2sql(pattern, sco_type, dialect='sqlite3'):
    """Parse a STIX pattern and return the text produced by `_TranslateTree`.

    Args:
        pattern: The STIX pattern string to parse.
        sco_type: SCO type whose comparisons should be translated; object
            paths of other types are ignored by the transformer.
        dialect: SQL dialect name handed to the transformer
            (defaults to ``'sqlite3'``).

    Returns:
        Whatever the transformer's ``start`` rule yields for the pattern
        (a WHERE-clause fragment as a string).
    """
    grammar_text = get_grammar()
    translator = _TranslateTree(sco_type, dialect)
    # The transformer is embedded in the LALR parser, so parse() returns
    # the already-transformed result rather than a parse tree.
    parser = Lark(grammar_text, parser="lalr", transformer=translator)
    return parser.parse(pattern)
def _convert_op(sco_type, prop, op, rhs, dialect):
orig_op = op
neg, _, op = op.rpartition(' ')
if op == 'ISSUBSET':
#TODO: ipv6-addr
if sco_type == 'ipv4-addr' or prop in ['src_ref.value',
'dst_ref.value']:
return f'{neg} (in_subnet("{prop}", {rhs}))'
else:
raise ValueError(
f'{orig_op} not supported for SCO type {sco_type}')
elif op == 'ISSUPERSET': # When would anyone use ISSUPERSET?
#TODO: ipv6-addr
if sco_type == 'ipv4-addr' or prop in ['src_ref.value',
'dst_ref.value']:
return f'{neg} (in_subnet({rhs}, "{prop}"))' # FIXME!
else:
raise ValueError(
f'{orig_op} not supported for SCO type {sco_type}')
elif prop.endswith('payload_bin'):
if op == 'MATCHES':
return f'{neg} match_bin(CAST({rhs} AS TEXT), "{prop}")'
elif op == 'LIKE':
return f'{neg} like_bin(CAST({rhs} AS TEXT), "{prop}")'
elif op == 'MATCHES':
return f'{neg} match({rhs}, "{prop}")'
prop, chunk, subprop = prop.partition('[*]')
if chunk:
if op == '!=':
neg = 'NOT'
op = 'LIKE'
rhs = rhs.strip("'")
if subprop:
subprop = subprop.lstrip('.')
rhs = f"'%\"{subprop}\":\"{rhs}\"%'"
else:
rhs = f"'%{rhs}%'"
if dialect == 'postgresql' and op == 'LIKE':
rhs = rhs.replace("\\", r"\\") # PostgreSQL uses \ for escape with LIKE only!
return f'"{prop}" {neg} {op} {rhs}'
def comp2sql(sco_type, prop, op, value, dialect):
    """Render a comparison on a (possibly reference-following) property.

    The property path is split into links by ``parse_prop`` and processed
    from the last link back to the first, so the leaf comparison is
    rendered first and each reference wraps it in an ``IN (SELECT ...)``.

    Args:
        sco_type: SCO type the path starts from.
        prop: Property path within that type.
        op: Comparison operator text, passed through to `_convert_op`.
        value: Right-hand side as SQL literal text.
        dialect: SQL dialect name, passed through to `_convert_op`.

    Returns:
        str: The SQL fragment, or ``''`` if no link produced any output.
    """
    sql = ''
    for link in reversed(parse_prop(sco_type, prop)):
        kind = link[0]
        if kind == 'node':
            # A node without an explicit type belongs to the starting type.
            node_type = link[1] or sco_type
            sql = _convert_op(node_type, link[2], op, value, dialect)
            continue
        if kind != 'rel':
            continue
        _, from_type, ref_name, to_type = link
        if ref_name.endswith('_refs'):
            # Handle reflists
            ref_sql = (f'JOIN "__reflist" AS "r" ON "{from_type}"."id" = "r"."source_ref"'
                       f' WHERE "r"."target_ref"')
        else:
            ref_sql = f'"{ref_name}"'
        sql = f' {ref_sql} IN (SELECT "id" FROM "{to_type}" WHERE {sql})'
    return sql
def path2sql(sco_type, path):
    """Render the reference links of a property path as nested subqueries.

    Only ``'rel'`` links from ``parse_prop`` contribute; ``'node'`` links
    are skipped. Links are processed last-to-first, each one wrapping the
    text built so far in ``"<ref>" IN (SELECT "id" FROM "<type>" WHERE ...)``.

    Args:
        sco_type: SCO type the path starts from.
        path: Property path within that type.

    Returns:
        str: The nested SQL text, or ``''`` when the path has no
        reference links.
    """
    # NOTE(review): the innermost subquery is built with an empty WHERE
    # body; presumably the caller completes or post-processes it -- confirm.
    sql = ''
    for link in reversed(parse_prop(sco_type, path)):
        if link[0] != 'rel':
            continue
        ref_name, to_type = link[2], link[3]
        sql = f'"{ref_name}" IN (SELECT "id" FROM "{to_type}" WHERE {sql})'
    return sql
@v_args(inline=True)
class _TranslateTree(Transformer):
    """Transformer to convert relevant parts of STIX pattern to WHERE clause.

    Comparisons whose object path is not of type ``sco_type`` are rendered
    as the empty string and then dropped when expressions are combined.
    """

    def __init__(self, sco_type, dialect):
        # sco_type: the SCO type (table) being queried.
        # dialect: SQL dialect name forwarded to comp2sql.
        self.sco_type = sco_type
        self.dialect = dialect

    def _make_comp(self, lhs, op, rhs):
        """Render one ``type:prop op rhs`` comparison, or '' if off-type."""
        sco_type, _, prop = lhs.partition(':')
        # Ignore object paths that don't match table type
        if self.sco_type == sco_type:
            return comp2sql(sco_type, prop, op, rhs, self.dialect)
        return ''

    def _make_exp(self, lhs, op, rhs):
        """Join the non-empty operands with ``op``."""
        return op.join(filter(None, [lhs, rhs]))

    def disj(self, lhs, rhs):
        return self._make_exp(lhs, ' OR ', rhs)

    def conj(self, lhs, rhs):
        return self._make_exp(lhs, ' AND ', rhs)

    def obs_disj(self, lhs, rhs):
        return self.disj(lhs, rhs)

    def obs_conj(self, lhs, rhs):
        return self.conj(lhs, rhs)

    def comp_grp(self, exp):
        """Parenthesize a grouped expression.

        An empty group (every comparison inside was dropped) stays empty:
        emitting ``()`` would be invalid SQL and, being non-empty text,
        would also slip past the empty-operand filter in `_make_exp`.
        """
        if not exp:
            return ''
        return f'({exp})'

    def simple_comp_exp(self, lhs, op, rhs):
        return self._make_comp(lhs, op, rhs)

    def comp_disj(self, lhs, rhs):
        return self.disj(lhs, rhs)

    def comp_conj(self, lhs, rhs):
        return self.conj(lhs, rhs)

    def op(self, value):
        return f'{value}'

    def quoted_str(self, value):
        """Return ``value`` as a single-quoted SQL string literal."""
        # Adapt the string literal from STIX escapes to SQL escapes
        value = value.replace(r'\\', '\\')  # Convert double backslash to single
        value = value.replace(r"\'", "''")  # Convert escape from backslash to apostrophe
        return f"'{value}'"

    def lit_list(self, *args):
        """Render literals as a parenthesized, comma-separated list."""
        return "(" + ','.join(args) + ")"

    def start(self, exp, qualifier):
        # For now, drop the qualifier.  Assume the query handled it.
        return f'{exp}'

    def object_path(self, sco_type, prop):
        return f'{sco_type}:{prop}'
def summarize_pattern(pattern):
    """Collect the object paths used in a STIX pattern, grouped by type.

    Args:
        pattern: The STIX pattern string to parse.

    Returns:
        defaultdict(set): Maps each SCO type name (the text before the
        first ``:`` of an object path) to the set of property paths
        (the text after it) that the transformer reported.
    """
    parser = Lark(get_grammar(),
                  parser="lalr",
                  transformer=_SummarizePattern())
    summary = defaultdict(set)
    for object_path in parser.parse(pattern):
        type_name, _, prop_name = object_path.partition(':')
        summary[type_name].add(prop_name)
    return summary
@v_args(inline=True)
class _SummarizePattern(Transformer):
    """Transformer that reduces a STIX pattern to a set of object paths.

    Each comparison contributes a one-element set holding its
    ``type:prop`` string; the combining rules merge those sets.
    """

    # --- observation-level combinators ---

    def obs_disj(self, left, right):
        return left | right

    def obs_conj(self, left, right):
        # NOTE(review): this is an intersection, unlike comp_conj below;
        # looks deliberate given that method's comment -- confirm.
        return left & right

    # --- comparison-level rules ---

    def comp_grp(self, inner):
        # Grouping parentheses do not affect which paths are referenced.
        return inner

    def simple_comp_exp(self, path, _op, _rhs):
        # Only the object path matters; operator and value are discarded.
        return {path}

    def comp_disj(self, left, right):
        return left | right

    def comp_conj(self, left, right):
        return left | right  # Still want union here

    # None of these actually matter

    def op(self, _op):
        return None

    def quoted_str(self, _value):
        return None

    def lit_list(self, *args):
        return None

    def start(self, exp, _qualifier):
        # The qualifier plays no part in the summary.
        return exp

    def object_path(self, sco_type, prop):
        return f'{sco_type}:{prop}'