You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
468 lines
17 KiB
468 lines
17 KiB
|
|
import sys, os |
|
from io import BytesIO |
|
from construct import * |
|
from pathlib import Path # overrides same name from construct |
|
import ctypes |
|
from textwrap import dedent |
|
|
|
def dump_sd(path): |
|
return getxattr(path, "system.ntfs_acl") |
|
|
|
def write_sd(path, sd): |
|
if isinstance(sd, SecurityDescriptor): sd = sd.build() |
|
elif isinstance(sd, Container): sd = SecurityDescriptorStruct.build(sd) |
|
os.setxattr(path, "system.ntfs_acl", sd) |
|
|
|
_cdll = ctypes.CDLL(None) |
|
|
|
#_cdll.libcnotify_verbose_set(1) |
|
|
|
def getxattr(path, name): |
|
# We have to use ctypes instead of os.getxattr, because os.getxattr uses a 128B buffer, |
|
# which is too small (ironically, it retries with larger buffer if it gets ERANGE, but |
|
# ntfs-3g returns EIO instead of ERANGE when buffer is too small). It cannot be forced |
|
# to use larger buffer by default. |
|
if isinstance(name, str): |
|
name = name.encode('ascii') |
|
if isinstance(path, os.PathLike): |
|
path = str(path) |
|
if isinstance(path, str): |
|
path = path.encode(sys.getfilesystemencoding()) |
|
global _cdll |
|
size = _cdll.getxattr(path, name, None, 0) |
|
buf = ctypes.create_string_buffer(b'', size) |
|
_cdll.getxattr(path, name, buf, size) |
|
return buf.raw |
|
|
|
|
|
#class ClassAdapter(Adapter): |
|
# def __init__(self, cls, *a, **kw): |
|
# self.adapt_cls = cls |
|
# super().__init__(*a, **kw) |
|
# |
|
# def _decode(self, obj, context, path): |
|
# return self.adapt_cls.from_construct(obj) |
|
# |
|
# def _encode(self, obj, context, path): |
|
# if not isinstance(obj, self.adapt_cls): raise TypeError |
|
# return obj.to_construct() |
|
# |
|
#class ACLAdapter(Adapter): |
|
# def _decode(self, obj, context, path): |
|
# assert len(obj.items) == obj.cnt |
|
# return obj.items |
|
# |
|
# def _encode(self, obj, context, path): |
|
# return dict(revision=2, cnt=len(obj), items=obj) |
|
|
|
|
|
def _getpath(obj, path): |
|
if isinstance(path, str): path = path.split('.') |
|
for c in path: |
|
obj = obj[c] |
|
return obj |
|
|
|
def _setpath(obj, path, val): |
|
if isinstance(path, str): path = path.split('.') |
|
for c in path[:-1]: |
|
obj = obj[c] |
|
obj[path[-1]] = val |
|
|
|
class WithSize(Subconstruct): |
|
def __init__(self, size_path, subcon, pattern=b"\x00"): |
|
if not isinstance(pattern, bytes) or len(pattern) != 1: |
|
raise PaddingError("pattern expected to be bytes of length 1") |
|
super().__init__(subcon) |
|
self.size_path = size_path |
|
self.pattern = pattern |
|
|
|
def _parse(self, stream, context, path): |
|
position1 = stream.tell() |
|
obj = self.subcon._parsereport(stream, context, path) |
|
#print("obj:", obj) |
|
position2 = stream.tell() |
|
|
|
actual_length = (position2 - position1) |
|
formal_length = _getpath(obj, self.size_path) |
|
if formal_length < 0: |
|
raise PaddingError("length cannot be negative", path=path) |
|
pad = formal_length - actual_length |
|
#print("Path:", path) |
|
#print("Subcon:", self.subcon, self is ACEStruct, self is ACLStruct) |
|
#print("Formal length:", formal_length) |
|
#print("Actual length:", actual_length, position1, "..", position2) |
|
if pad < 0: |
|
raise PaddingError("subcon parsed %d bytes but was allowed only %d" % (actual_length, formal_length), path=path) |
|
stream_read(stream, pad, path) |
|
return obj |
|
|
|
def _build(self, obj, stream, context, path): |
|
tmpstream = BytesIO() |
|
self.subcon._build(obj, tmpstream, context, path) |
|
size = len(tmpstream.getvalue()) |
|
_setpath(obj, self.size_path, size) |
|
#print(size, obj) |
|
return self.subcon._build(obj, stream, context, path) |
|
|
|
def _sizeof(self, context, path): |
|
return self.subcon._sizeof(context, path) |
|
|
|
|
|
# https://github.com/tuxera/ntfs-3g/blob/a4a837025b6ac2b0c44c93e34e22535fe9e95b27/ntfsprogs/ntfssecaudit.c#L1229 |
|
SDHeaderStruct = Struct( |
|
"revision" / Const(1, Byte), |
|
Padding(1), |
|
"flags" / FlagsEnum(Int16ul, |
|
# https://docs.microsoft.com/en-us/windows/win32/secauthz/security-descriptor-control |
|
SE_OWNER_DEFAULTED = 0x0001, |
|
SE_GROUP_DEFAULTED = 0x0002, |
|
SE_DACL_PRESENT = 0x0004, |
|
SE_DACL_DEFAULTED = 0x0008, |
|
SE_SACL_DEFAULTED = 0x0008, |
|
SE_SACL_PRESENT = 0x0010, |
|
SE_DACL_AUTO_INHERIT_REQ = 0x0100, |
|
SE_SACL_AUTO_INHERIT_REQ = 0x0200, |
|
SE_DACL_AUTO_INHERITED = 0x0400, |
|
SE_SACL_AUTO_INHERITED = 0x0800, |
|
SE_DACL_PROTECTED = 0x1000, |
|
SE_SACL_PROTECTED = 0x2000, |
|
SE_RM_CONTROL_VALID = 0x4000, |
|
SE_SELF_RELATIVE = 0x8000, |
|
), |
|
"owner_sid_offset" / Int32ul, |
|
"group_sid_offset" / Int32ul, |
|
"sacl_offset" / Int32ul, |
|
"dacl_offset" / Int32ul, |
|
) |
|
|
|
# https://docs.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-sid |
|
# https://docs.microsoft.com/en-us/windows/win32/secauthz/sid-components |
|
# https://github.com/tuxera/ntfs-3g/blob/a4a837025b6ac2b0c44c93e34e22535fe9e95b27/ntfsprogs/ntfssecaudit.c#L957 |
|
SIDStruct = Struct( |
|
"revision" / Const(1, Byte), |
|
"cnt" / Byte, |
|
"auth" / BytesInteger(6), # strangely, this is big endian, unlike other fields |
|
"subauth" / Int32ul[this.cnt], |
|
) |
|
|
|
# https://docs.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-acl |
|
# https://github.com/tuxera/ntfs-3g/blob/a4a837025b6ac2b0c44c93e34e22535fe9e95b27/ntfsprogs/ntfssecaudit.c#L1420 |
|
ACLHeaderStruct = Struct( |
|
"revision" / Byte, # Const(2, Byte), |
|
Padding(1), |
|
"size" / Int16ul, |
|
"cnt" / Int16ul, |
|
Padding(2), |
|
|
|
) |
|
|
|
# https://docs.microsoft.com/en-us/windows/win32/secauthz/ace |
|
# https://docs.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-ace_header |
|
ACCESS_MAX_MS_V2_ACE_TYPE = 0x3 |
|
ACCESS_MAX_MS_V3_ACE_TYPE = 0x4 |
|
ACCESS_MAX_MS_V4_ACE_TYPE = 0x8 |
|
ACCESS_MAX_MS_V5_ACE_TYPE = 0x14 |
|
ACEHeaderStruct = Struct( |
|
"type" / Enum(Byte, |
|
# from wine's winnt.h |
|
ACCESS_ALLOWED_ACE_TYPE = 0x0, |
|
ACCESS_DENIED_ACE_TYPE = 0x1, |
|
SYSTEM_AUDIT_ACE_TYPE = 0x2, |
|
SYSTEM_ALARM_ACE_TYPE = 0x3, |
|
ACCESS_ALLOWED_COMPOUND_ACE_TYPE = 0x4, |
|
ACCESS_ALLOWED_OBJECT_ACE_TYPE = 0x5, |
|
ACCESS_DENIED_OBJECT_ACE_TYPE = 0x6, |
|
ACCESS_AUDIT_OBJECT_ACE_TYPE = 0x7, |
|
ACCESS_ALARM_OBJECT_ACE_TYPE = 0x8, |
|
ACCESS_ALLOWED_CALLBACK_ACE_TYPE = 0x9, |
|
ACCESS_DENIED_CALLBACK_ACE_TYPE = 0xa, |
|
ACCESS_ALLOWED_CALLBACK_OBJECT_ACE_TYPE = 0xb, |
|
ACCESS_DENIED_CALLBACK_OBJECT_ACE_TYPE = 0xc, |
|
SYSTEM_AUDIT_CALLBACK_ACE_TYPE = 0xd, |
|
SYSTEM_ALARM_CALLBACK_ACE_TYPE = 0xe, |
|
SYSTEM_AUDIT_CALLBACK_OBJECT_ACE_TYPE = 0xf, |
|
SYSTEM_ALARM_CALLBACK_OBJECT_ACE_TYPE = 0x10, |
|
SYSTEM_MANDATORY_LABEL_ACE_TYPE = 0x11, |
|
SYSTEM_RESOURCE_ATTRIBUTE_ACE_TYPE = 0x12, |
|
SYSTEM_SCOPED_POLICY_ID_ACE_TYPE = 0x13, |
|
SYSTEM_PROCESS_TRUST_LABEL_ACE_TYPE = 0x14, |
|
), |
|
"flags" / FlagsEnum(Byte, |
|
# from wine's winnt.h |
|
# inheritance AceFlags |
|
OBJECT_INHERIT_ACE = 0x01, |
|
CONTAINER_INHERIT_ACE = 0x02, |
|
NO_PROPAGATE_INHERIT_ACE = 0x04, |
|
INHERIT_ONLY_ACE = 0x08, |
|
INHERITED_ACE = 0x10, |
|
|
|
# AceFlags mask for what events we (should) audit |
|
SUCCESSFUL_ACCESS_ACE_FLAG = 0x40, |
|
FAILED_ACCESS_ACE_FLAG = 0x80, |
|
), |
|
"size" / Int16ul, |
|
) |
|
|
|
|
|
# https://docs.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-access_allowed_ace |
|
AllowDenyBodyStruct = Struct( |
|
# https://docs.microsoft.com/en-us/windows/win32/secauthz/access-mask |
|
"access_mask" / Int32ul, |
|
"sid" / SIDStruct, |
|
) |
|
|
|
ACE_BODY_STRUCTS = { |
|
'ACCESS_ALLOWED_ACE_TYPE': AllowDenyBodyStruct, |
|
'ACCESS_DENIED_ACE_TYPE': AllowDenyBodyStruct, |
|
# ... |
|
} |
|
ACEStruct = WithSize('header.size', Struct( |
|
"header" / ACEHeaderStruct, |
|
"body" / Switch( |
|
this.header.type, |
|
ACE_BODY_STRUCTS, |
|
), |
|
#Padding(lambda this: this.header.size - this._subcons.header.sizeof() - this._subcons.body.sizeof()), |
|
)) |
|
|
|
ACLStruct = WithSize('header.size', Struct( |
|
"header" / ACLHeaderStruct, |
|
"items" / ACEStruct[this.header.cnt], |
|
#Padding(lambda this: this.header.size - this._subcons.header.sizeof() - this._subcons.items.sizeof()), |
|
)) |
|
|
|
SecurityDescriptorStruct = Struct( |
|
"header" / SDHeaderStruct, |
|
"owner_sid" / Pointer(this.header.owner_sid_offset, SIDStruct), |
|
"group_sid" / Pointer(this.header.group_sid_offset, SIDStruct), |
|
"sacl" / If(lambda this: this.header.flags.get('SE_SACL_PRESENT') and this.header.sacl_offset >= this._subcons.header.sizeof(), Pointer(this.header.sacl_offset, ACLStruct)), |
|
"dacl" / If(lambda this: this.header.flags.get('SE_DACL_PRESENT') and this.header.dacl_offset >= this._subcons.header.sizeof(), Pointer(this.header.dacl_offset, ACLStruct)), |
|
) |
|
|
|
class Constructable: |
|
@classmethod |
|
def parse(cls, s): |
|
return cls.from_construct(cls.STRUCT.parse(s)) |
|
|
|
def build(self): |
|
return self.STRUCT.build(self.to_construct()) |
|
|
|
|
|
|
|
class SID(Constructable): |
|
STRUCT = SIDStruct |
|
def __init__(self, revision, auth=None, *subauth): |
|
if isinstance(revision, str): |
|
if not revision.startswith('S-'): raise ValueError |
|
revision, auth, *subauth = [ int(x) for x in revision[2:].split('-') ] |
|
elif isinstance(revision, SID): |
|
sid = revision |
|
revision = sid.revision |
|
auth = sid.auth |
|
subauth = sid.subauth |
|
if auth is None: raise ValueError |
|
self.revision = revision |
|
self.auth = auth |
|
self.subauth = tuple(subauth) |
|
|
|
def __eq__(self, other): |
|
if isinstance(other, SID): |
|
return self.revision == other.revision and self.auth == other.auth and self.subauth == other.subauth |
|
elif isinstance(other, str): |
|
return self == SID(other) |
|
else: return NotImplemented |
|
|
|
__req__ = __eq__ |
|
|
|
def __hash__(self): |
|
return hash((self.revision, self.auth, self.subauth)) |
|
|
|
def __str__(self): |
|
return 'S-' + '-'.join( str(a) for a in (self.revision, self.auth)+self.subauth ) |
|
|
|
def __repr__(self): |
|
return f'SID({str(self)!r})' |
|
|
|
@classmethod |
|
def from_construct(cls, cont): |
|
return cls(cont.revision, cont.auth, *cont.subauth) |
|
|
|
def to_construct(self): |
|
return dict(revision=self.revision, cnt=len(self.subauth), auth=self.auth, subauth=self.subauth) |
|
|
|
SID_ADMINISTRATORS = SID('S-1-5-32-544') # the built-in Administrators group |
|
SID_SYSTEM = SID('S-1-5-18') # the Local System (NT AUTHORITY\SYSTEM) account |
|
SID_USERS = SID('S-1-5-32-545') |
|
SID_AUTH_USERS = SID('S-1-5-11') # Authenticated Users |
|
SID_EVERYONE = SID('S-1-1-0') |
|
|
|
# as empiricaly set by Windows 10 |
|
MASK_FULL_CONTROL = 0x1f01ff |
|
MASK_READ_EXECUTE = 0x1200a9 |
|
|
|
def flags2set(flags): |
|
return frozenset( str(k) for k,v in flags.items() if v if k != '_flagsenum' ) |
|
|
|
def set2flags(s): |
|
return Container({ f: True for f in set(s) }) |
|
|
|
class ACE(Constructable): |
|
STRUCT = ACEStruct |
|
ALLOW = 'ACCESS_ALLOWED_ACE_TYPE' |
|
DENY = 'ACCESS_DENIED_ACE_TYPE' |
|
def __init__(self, type, access_mask, sid, flags=None, inheritance=None): |
|
self.type = type |
|
self.access_mask = access_mask |
|
self.sid = sid |
|
if flags is None and inheritance is None: |
|
inheritance = 'all' |
|
flags = set(flags or []) |
|
if inheritance in ('object', 'file', 'all'): |
|
flags |= {'OBJECT_INHERIT_ACE'} |
|
if inheritance in ('container', 'dir', 'all'): |
|
flags |= {'CONTAINER_INHERIT_ACE'} |
|
self.flags = frozenset(flags) |
|
|
|
def __repr__(self): |
|
return f'ACE({self.type!r}, {self.access_mask!r}, {self.sid!r}, flags={set(self.flags)!r})' |
|
|
|
@classmethod |
|
def from_construct(cls, cont): |
|
if cont.header.type not in (cls.ALLOW, cls.DENY): |
|
raise NotImplementedError |
|
return ACE(str(cont.header.type), cont.body.access_mask, SID.from_construct(cont.body.sid), flags=flags2set(cont.header.flags)) |
|
|
|
def to_construct(self): |
|
r = Container() |
|
r.header = Container(type=self.type, size=0, flags=set2flags(self.flags)) |
|
r.body = Container(access_mask=self.access_mask, sid=self.sid.to_construct(), flags={ f: True for f in self.flags }) |
|
return r |
|
|
|
def make_inherited(self): |
|
return ACE(self.type, self.access_mask, self.sid, self.flags | {"INHERITED_ACE"} - {"INHERIT_ONLY_ACE"}) |
|
|
|
|
|
class SecurityDescriptor(Constructable): |
|
STRUCT = SecurityDescriptorStruct |
|
|
|
def __init__(self, *, owner_sid=SID_ADMINISTRATORS, group_sid=SID_ADMINISTRATORS, dacl=[], sacl=None, sacl_inherit=True, dacl_inherit=True): |
|
self.owner_sid = SID(owner_sid) |
|
self.group_sid = SID(group_sid) |
|
self.sacl = sacl |
|
self.dacl = dacl |
|
self.sacl_inherit = sacl_inherit |
|
self.dacl_inherit = dacl_inherit |
|
|
|
def __repr__(self): |
|
r = ['SecurityDescriptor('] |
|
r.append(f' owner_sid={self.owner_sid!r},') |
|
r.append(f' group_sid={self.group_sid!r},') |
|
for name in ('sacl', 'dacl'): |
|
acl = getattr(self, name) |
|
if not acl: continue |
|
r.append(f' {name}=[') |
|
for ace in acl: |
|
r.append(' '*8 + repr(ace)) |
|
r.append(' ],') |
|
if not self.sacl_inherit: r.append(' sacl_inherit=False,') |
|
if not self.dacl_inherit: r.append(' dacl_inherit=False,') |
|
r.append(')') |
|
return '\n'.join(r) |
|
|
|
@classmethod |
|
def acl_from_construct(cls, cont): |
|
if cont: return [ ACE.from_construct(itm) for itm in cont['items'] ] |
|
else: return None |
|
|
|
@classmethod |
|
def acl_to_construct(cls, acl): |
|
# Empty ACL is not the same as no ACL. |
|
# https://docs.microsoft.com/en-us/windows/win32/secauthz/null-dacls-and-empty-dacls |
|
if acl is None: |
|
return None |
|
else: |
|
r = Container() |
|
r.header = Container(revision=2, cnt=len(acl), size=0) |
|
r.items = [ itm.to_construct() for itm in acl ] |
|
return r |
|
|
|
|
|
@classmethod |
|
def from_construct(cls, cont): |
|
owner_sid = SID.from_construct(cont.owner_sid) |
|
group_sid = SID.from_construct(cont.group_sid) |
|
sacl = cls.acl_from_construct(cont.sacl) |
|
dacl = cls.acl_from_construct(cont.dacl) |
|
flags = flags2set(cont.header.flags) |
|
return SecurityDescriptor(owner_sid=owner_sid, group_sid=group_sid, sacl=sacl, dacl=dacl, |
|
sacl_inherit="SE_SACL_PROTECTED" not in flags, |
|
dacl_inherit="SE_DACL_PROTECTED" not in flags, |
|
) |
|
|
|
|
|
def to_construct(self): |
|
hdr = Container() |
|
r = Container() |
|
r.header = hdr |
|
r.owner_sid = self.owner_sid.to_construct() |
|
r.group_sid = self.group_sid.to_construct() |
|
r.dacl = self.acl_to_construct(self.dacl) |
|
r.sacl = self.acl_to_construct(self.sacl) |
|
|
|
# SE_SELF_RELATIVE specifies that offsets in header are relative to start of SD. |
|
# This should be always set in on-disk SDs. |
|
# https://docs.microsoft.com/en-us/windows/win32/secauthz/absolute-and-self-relative-security-descriptors |
|
flags = {"SE_SELF_RELATIVE"} |
|
# Windows seems to always set SE_DACL_AUTO_INHERITED but not SE_DACL_AUTO_INHERIT_REQ. We mirror that. |
|
if self.sacl is not None: flags |= {"SE_SACL_PRESENT", "SE_SACL_AUTO_INHERITED"} |
|
if self.dacl is not None: flags |= {"SE_DACL_PRESENT", "SE_DACL_AUTO_INHERITED"} |
|
if not self.sacl_inherit: |
|
flags |= {"SE_SACL_PROTECTED"} |
|
if not self.dacl_inherit: |
|
flags |= {"SE_DACL_PROTECTED"} |
|
|
|
hdr.flags = set2flags(flags) |
|
|
|
sacl_size = len(ACLStruct.build(r.sacl)) if r.sacl else 0 |
|
dacl_size = len(ACLStruct.build(r.dacl)) if r.dacl else 0 |
|
pos = SDHeaderStruct.sizeof() |
|
def add_element(struct, cont): |
|
nonlocal pos |
|
off = pos |
|
pos += len(struct.build(cont)) |
|
return off |
|
hdr.owner_sid_offset = add_element(SIDStruct, r.owner_sid) |
|
hdr.group_sid_offset = add_element(SIDStruct, r.group_sid) |
|
if r.sacl: hdr.sacl_offset = add_element(ACLStruct, r.sacl) |
|
else: hdr.sacl_offset = 0 |
|
if r.dacl: hdr.dacl_offset = add_element(ACLStruct, r.dacl) |
|
else: hdr.dacl_offset = 0 |
|
return r |
|
|
|
|
|
def apply_sd_recursively(path, sd=None, *, dacl=None, skip_protected=True, set_owner=False, set_group=False): |
|
path = Path(path) |
|
if sd is None: |
|
sd = SecurityDescriptor.parse(dump_sd(path)) |
|
else: |
|
write_sd(path, sd) |
|
|
|
def visit(p): |
|
is_dir = p.is_dir() |
|
kind = 'CONTAINER' if is_dir else 'OBJECT' |
|
acl = [ ace.make_inherited() for ace in sd.dacl if f'{kind}_INHERIT_ACE' in ace.flags ] |
|
child_sd = SecurityDescriptor.parse(dump_sd(p)) |
|
if (not child_sd.dacl_inherit) and skip_protected: return # do not modify ACL and do not recurse |
|
child_sd.dacl = acl |
|
if set_owner: child_sd.owner_sid = sd.owner_sid |
|
if set_group: child_sd.group_sid = sd.group_sid |
|
print(p, child_sd) |
|
write_sd(p, child_sd) |
|
if is_dir: |
|
for child in p.iterdir(): visit(child) |
|
|
|
for p in path.iterdir(): visit(p) |
|
|
|
|
|
|
|
|