Source code for firepit.stix21

# Reference: https://docs.oasis-open.org/cti/stix/v2.1/os/stix-v2.1-os.html

import uuid

import ujson


ID_NAMESPACE = uuid.UUID('00abedb4-aa42-466c-9c01-fed23315a9b7')

HASHES_PREF_LIST = ["MD5", "'SHA-1'", "'SHA-256'", "'SHA-512'"]

ID_PROPS = {
    'artifact': ('hashes', 'payload_bin'),
    'autonomous-system': ('number',),
    'directory': ('path',),
    'domain-name': ('value',),
    'email-addr': ('value',),
    'email-message': ('from_ref', 'subject', 'body'),
    'file': ('hashes', 'name', 'extensions', 'parent_directory_ref'),
    'ipv4-addr': ('value',),
    'ipv6-addr': ('value',),
    'mac-addr': ('value',),
    'mutex': ('name',),
    'network-traffic': ('start', 'end', 'src_ref', 'dst_ref', 'src_port', 'dst_port', 'protocols', 'extensions'),
    'process': ('x_unique_id',),  # This is non-standard, obviously
    'software': ('name', 'cpe', 'swid', 'vendor', 'version'),
    'url': ('value',),
    'user-account': ('account_type', 'user_id', 'account_login'),
    'windows-registry-keys': ('key', 'values'),
    'x509-certificate': ('hashes', 'serial_number'),

    # Common extensions
    'x-oca-asset': ('host_id', 'device_id', 'name', 'hostname'),
}

PROCESS_UNIQUE_ID_PROPS = [
    'process_id', # reaqta (older version)
    'process_uid', # reaqta (newer)
    'process_unique_id', # sentinelone
    'process_guid', # just in case
]


def _get_asset_id(obs):
    """Find the first x-oca-asset and return ID or hostname"""
    for _, sco in obs.get('objects', {}).items():
        if sco['type'] == 'x-oca-asset':
            for prop in ('host_id', 'hostname'):
                if prop in sco:
                    return sco[prop]


[docs]def makeid(sco, obs=None): sco_type = sco['type'] contrib = {} # the ID contributing properties props = ID_PROPS.get(sco_type, []) for prop in props: if prop == 'hashes': # hashes is a special case. Choose first hash according to spec. hashes = sco.get('hashes') if hashes: for hash_type in HASHES_PREF_LIST: value = hashes.get(hash_type) if value: contrib['hashes'] = {hash_type.strip("'"): value} break else: # None of the preferred hashes found prop = sorted(list(hashes.keys()))[0] contrib['hashes'] = {prop.strip("'"): hashes[prop]} elif prop in sco: value = sco[prop] if prop.endswith('_ref') and obs: # Hook for STIX 2.0 SCOs target = obs['objects'].get(value) if target: value = makeid(target) contrib[prop] = value else: contrib[prop] = value if sco_type == 'process' and 'x_unique_id' not in contrib: unique_id = None for _, ext in sco.get('extensions', {}).items(): for prop in PROCESS_UNIQUE_ID_PROPS: unique_id = ext.get(prop) if unique_id: contrib['x_unique_id'] = unique_id break if unique_id: break else: # Still don't have unique_id if obs: # Try to use other SCOs pid = sco.get('pid') asset = _get_asset_id(obs) if pid and asset and obs: ts = obs['last_observed'] contrib['x_unique_id'] = f'{pid}_{asset}_{ts}' if contrib: name = ujson.dumps(contrib, sort_keys=True, ensure_ascii=False) oid = f'{sco_type}--{str(uuid.uuid5(ID_NAMESPACE, name))}' else: oid = f'{sco_type}--{str(uuid.uuid4())}' return oid