# coding: utf-8
__all__ = '__version__', 'Fanotify', 'FanotifyClient', 'FanoRule'
import array
import atexit
import datetime
import errno
import hashlib
import logging
import multiprocessing as mp
import os
import socket
import struct
import sys
from typing import Any, Callable, Iterable, Optional, Tuple, Union
from . import ext
FanoRule = ext.FanoRule
__version__ = '0.3.0'
# events
FAN_ACCESS = ext.FAN_ACCESS
FAN_MODIFY = ext.FAN_MODIFY
FAN_ATTRIB = ext.FAN_ATTRIB # since Linux 5.1
FAN_CLOSE_WRITE = ext.FAN_CLOSE_WRITE
FAN_CLOSE_NOWRITE = ext.FAN_CLOSE_NOWRITE
FAN_OPEN = ext.FAN_OPEN
FAN_MOVED_FROM = ext.FAN_MOVED_FROM # since Linux 5.1
FAN_MOVED_TO = ext.FAN_MOVED_TO # since Linux 5.1
FAN_CREATE = ext.FAN_CREATE # since Linux 5.1
FAN_DELETE = ext.FAN_DELETE # since Linux 5.1
FAN_DELETE_SELF = ext.FAN_DELETE_SELF # since Linux 5.1
FAN_MOVE_SELF = ext.FAN_MOVE_SELF # since Linux 5.1
FAN_OPEN_EXEC = ext.FAN_OPEN_EXEC # since Linux 5.0
FAN_Q_OVERFLOW = ext.FAN_Q_OVERFLOW
FAN_OPEN_PERM = ext.FAN_OPEN_PERM
FAN_ACCESS_PERM = ext.FAN_ACCESS_PERM
FAN_OPEN_EXEC_PERM = ext.FAN_OPEN_EXEC_PERM # since Linux 5.0
FAN_RENAME = ext.FAN_RENAME # since Linux 5.17
FAN_ONDIR = ext.FAN_ONDIR
FAN_EVENT_ON_CHILD = ext.FAN_EVENT_ON_CHILD
FAN_CLOSE = FAN_CLOSE_WRITE | FAN_CLOSE_NOWRITE
FAN_MOVE = FAN_MOVED_FROM | FAN_MOVED_TO
FAN_ALL_EVENTS = FAN_ACCESS | FAN_MODIFY | FAN_OPEN | FAN_OPEN_EXEC | FAN_CLOSE | FAN_ONDIR | FAN_EVENT_ON_CHILD
FAN_ALL_PERM_EVENTS = FAN_OPEN_PERM | FAN_ACCESS_PERM | FAN_OPEN_EXEC_PERM
FAN_ALL_FID_EVENTS = (FAN_ATTRIB | FAN_MOVE | FAN_CREATE | FAN_DELETE | FAN_DELETE_SELF
| FAN_MOVE_SELF | FAN_RENAME | FAN_ONDIR | FAN_EVENT_ON_CHILD)
# flags for init()
FAN_CLOEXEC = ext.FAN_CLOEXEC
FAN_NONBLOCK = ext.FAN_NONBLOCK
# NOT bitwise flags
FAN_CLASS_NOTIF = ext.FAN_CLASS_NOTIF
FAN_CLASS_CONTENT = ext.FAN_CLASS_CONTENT
FAN_CLASS_PRE_CONTENT = ext.FAN_CLASS_PRE_CONTENT
FAN_UNLIMITED_QUEUE = ext.FAN_UNLIMITED_QUEUE
FAN_UNLIMITED_MARKS = ext.FAN_UNLIMITED_MARKS
FAN_ENABLE_AUDIT = ext.FAN_ENABLE_AUDIT # since Linux 4.15
# Flags to determine fanotify event format
FAN_REPORT_TID = ext.FAN_REPORT_TID # since Linux 4.20
FAN_REPORT_FID = ext.FAN_REPORT_FID # since Linux 5.1
FAN_REPORT_DIR_FID = ext.FAN_REPORT_DIR_FID # since Linux 5.9
FAN_REPORT_NAME = ext.FAN_REPORT_NAME # since Linux 5.9
FAN_REPORT_DFID_NAME = ext.FAN_REPORT_DFID_NAME # since Linux 5.9
# flags for mark()
FAN_MARK_ADD = ext.FAN_MARK_ADD
FAN_MARK_REMOVE = ext.FAN_MARK_REMOVE
FAN_MARK_DONT_FOLLOW = ext.FAN_MARK_DONT_FOLLOW
FAN_MARK_ONLYDIR = ext.FAN_MARK_ONLYDIR
FAN_MARK_IGNORED_MASK = ext.FAN_MARK_IGNORED_MASK
FAN_MARK_IGNORED_SURV_MODIFY = ext.FAN_MARK_IGNORED_SURV_MODIFY
FAN_MARK_FLUSH = ext.FAN_MARK_FLUSH
# NOT bitwise flags
FAN_MARK_INODE = ext.FAN_MARK_INODE
FAN_MARK_MOUNT = ext.FAN_MARK_MOUNT
FAN_MARK_FILESYSTEM = ext.FAN_MARK_FILESYSTEM # since Linux 4.20
FANOTIFY_METADATA_VERSION = ext.FANOTIFY_METADATA_VERSION
# Legit userspace responses to a _PERM event
FAN_ALLOW = ext.FAN_ALLOW
FAN_DENY = ext.FAN_DENY
FAN_AUDIT = ext.FAN_AUDIT
FAN_NOFD = ext.FAN_NOFD # No fd set in event
AT_FDCWD = ext.AT_FDCWD
_INIT_FLAGS = FAN_CLOEXEC | FAN_NONBLOCK | FAN_UNLIMITED_QUEUE | FAN_UNLIMITED_MARKS | FAN_CLASS_CONTENT
_INIT_FID_FLAGS = FAN_REPORT_FID | FAN_REPORT_DFID_NAME
_INIT_O_FLAGS = os.O_RDONLY | os.O_LARGEFILE | ext.O_CLOEXEC | os.O_NOATIME
_CMD_STOP = ext.CMD_STOP
_CMD_CONNECT = ext.CMD_CONNECT
_CMD_DISCONNECT = ext.CMD_DISCONNECT
_EVT_MASKS = {
FAN_ACCESS: 'access',
FAN_MODIFY: 'modify',
FAN_ATTRIB: 'attrib',
FAN_CLOSE_WRITE: 'close_write',
FAN_CLOSE_NOWRITE: 'close_nowrite',
FAN_OPEN: 'open',
FAN_MOVED_FROM: 'moved_from',
FAN_MOVED_TO: 'moved_to',
FAN_CREATE: 'create',
FAN_DELETE: 'delete',
FAN_DELETE_SELF: 'delete_self',
FAN_MOVE_SELF: 'move_self',
FAN_OPEN_EXEC: 'open_exec',
FAN_OPEN_PERM: 'open_perm',
FAN_ACCESS_PERM: 'access_perm',
FAN_OPEN_EXEC_PERM: 'open_exec_perm',
FAN_RENAME: 'rename',
FAN_ONDIR: 'ondir',
FAN_EVENT_ON_CHILD: 'on_child',
0: ''
}
[docs]class Fanotify(mp.Process):
"""
Wrapper for Linux fanotify. Runs in a new process.
"""
[docs] def __init__(self, init_fid: bool = False, log: logging.Logger = None,
fn: Callable = None, fn_args: Tuple = (), fn_timeout: int = 0):
"""
:param init_fid: Enable filesystem events to watch (FAN_CREATE, FAN_DELETE, FAN_MOVE, FAN_ATTRIB).
See **man fanotify_init** for FAN_REPORT_FID
and FAN_REPORT_DIR_FID
:param log: Logger
:param fn: Function that will be called after the specified `fn_timeout`
:param fn_args: Arguments for `fn`
:param fn_timeout: Timeout for `fn`
:raises OSError: if fanotify is not set in kernel or other fanotify
error (see man fanotify_init)
:raises TypeError: if `fn` is not callable or `fn_args` is not tuple
"""
super().__init__(name='Fanotify')
self._with_fid = init_fid
self._log = log
self._ctx = ext.create()
try:
ext.mark(self._ctx, FAN_MARK_ADD, FAN_MODIFY | FAN_CLOSE_WRITE | FAN_EVENT_ON_CHILD, AT_FDCWD, '')
except OSError as e:
if e.errno != errno.EBADF:
if e.errno == errno.ENOSYS:
self._exception('No fanotify!')
e.strerror += ': No fanotify!'
else:
self._exception('Fanotify init: %s', e)
raise
flags = _INIT_FLAGS
if init_fid:
flags &= ~FAN_CLASS_CONTENT
flags |= _INIT_FID_FLAGS
try:
self._fd = ext.init(self._ctx, flags, _INIT_O_FLAGS)
except OSError as e:
e.strerror = 'Fanotify init: %s' % e.strerror
self._exception(str(e))
raise
self._rd, self._wr = mp.Pipe(False)
self._pid = os.getpid()
self._is_ready = True
if fn and not callable(fn):
raise TypeError("'fn' is not callable")
self._fn = fn
if not isinstance(fn_args, tuple):
raise TypeError("'fn_args': expected 'tuple', got '%s' instead." % type(fn_args))
self._fn_args = fn_args
self._fn_timeout = int(fn_timeout)
@property
def with_fid(self) -> bool:
return self._with_fid
[docs] def start(self) -> None:
"""
Start Fanotify process
"""
if self._is_ready:
self._is_ready = False
super(Fanotify, self).start()
atexit.register(lambda x: (x.stop(), x.join()), self)
self._rd.close()
self._rd = None
def run(self) -> None:
self._debug('start')
try:
self._action()
except BaseException as e:
self._exception('Exception occured: %s', e)
finally:
self._close()
self._debug('finish')
[docs] def stop(self) -> None:
"""
Stop Fanotify process
"""
if self._wr:
wr = self._wr
self._wr = None
wr.send((_CMD_STOP,))
wr.close()
self.join()
if self._fd != FAN_NOFD:
os.close(self._fd)
self._fd = FAN_NOFD
[docs] def connect(self, rule: FanoRule) -> None:
"""
Add :class:`FanoRule` to receive events on it
"""
if not isinstance(rule, FanoRule):
raise TypeError('Got %s, FanoRule expected' % type(rule).__name__)
self._wr.send((_CMD_CONNECT, rule))
[docs] def disconnect(self, rule: FanoRule) -> None:
"""
Delete the :class:`FanoRule` so as not to receive events for it
"""
if not isinstance(rule, FanoRule):
raise TypeError('Got %s, FanoRule expected' % type(rule).__name__)
self._wr.send((_CMD_DISCONNECT, rule))
[docs] def mark(self, path: Union[str, Iterable], ev_types: int = FAN_ALL_EVENTS,
is_type: str = '', dont_follow: bool = False,
as_ignore: bool = False, remove: bool = False,
dirfd: int = AT_FDCWD) -> None:
"""
To detail see **man fanotify_mark**
Adds, removes, or modifies an fanotify mark on a
filesystem object. The caller must have read permission on the
filesystem object that is to be marked.
`ev_types` must be nonempty
:param path: path to be marked
:param ev_types: defines which events shall be listened for (or which
shall be ignored). It is a bit mask composed values. See man
:param is_type: type of `path`. It can be:
- ``'mp'`` - is mount point
- ``'fs'`` - is filesystem
- ``'dir'`` - is directory
:param dont_follow: if `path` is a symbolic link, mark the link itself,
rather than the file to which it refers.
:param as_ignore: if `True` add/remove to/from ignore mask.
:param remove: if `True`, events in `ev_types` will be removed from
the mark mask (or from ignore mask);
else events will be added to the mark mask (or to ignore mask).
:param dirfd: used with `path`. see man
"""
if ev_types & (FAN_OPEN_PERM | FAN_ACCESS_PERM | FAN_OPEN_EXEC_PERM):
msg = 'PERM events are not supported yet'
self._error(msg)
raise NotImplementedError(msg)
if isinstance(path, str):
flags = (remove and FAN_MARK_REMOVE) or FAN_MARK_ADD
if is_type == 'mp':
flags |= FAN_MARK_MOUNT
elif is_type == 'fs':
flags |= FAN_MARK_FILESYSTEM
elif is_type == 'dir':
flags |= FAN_MARK_ONLYDIR
if dont_follow:
flags |= FAN_MARK_DONT_FOLLOW
if as_ignore:
flags |= FAN_MARK_IGNORED_MASK | FAN_MARK_IGNORED_SURV_MODIFY
try:
ext.mark(self._ctx, flags, ev_types, dirfd, path)
# self._debug('%s is marked', path)
except OSError as e:
msg = 'mark(): %s' % e
if e.errno == errno.EBADF:
msg += ': fd=%s is not an fanotify descriptor' % self._fd
elif e.errno == errno.EINVAL:
if self.with_fid and (ev_types & (FAN_OPEN_PERM | FAN_ACCESS_PERM | FAN_OPEN_EXEC_PERM)):
msg += ': PERM events are not allowed with FID report'
elif ev_types & FAN_ALL_FID_EVENTS:
if not self.with_fid:
msg += ': Filesystem events required an fanotify FID group (init_fid=True when init)'
elif flags & FAN_MARK_MOUNT:
msg += ': Filesystem events not supported with mount point'
else:
msg += ': invalid evt_types or fd=%s is not an fanotify descriptor' % self._fd
self._error(msg)
elif isinstance(path, Iterable):
for p in path:
self.mark(p, ev_types, is_type, dont_follow)
[docs] def flush(self, do_non_mounts=True, do_mounts=True, do_fs=True) -> None:
"""
To detail see **man fanotify_mark** for FAN_MARK_FLUSH
Remove either all marks for filesystems, all marks for
mounts, or all marks for directories and files from the
fanotify group.
:param do_non_mounts: Remove all marks for directories and files
:param do_mounts: Remove all marks for mounts
:param do_fs: Remove all marks for filesystems (since Linux 4.20)
"""
try:
if do_non_mounts:
ext.mark(self._ctx, FAN_MARK_FLUSH, 0, AT_FDCWD)
if do_mounts:
ext.mark(self._ctx, FAN_MARK_FLUSH | FAN_MARK_MOUNT, 0, AT_FDCWD)
if do_fs and FAN_MARK_FILESYSTEM: # Linux 4.20 and above
ext.mark(self._ctx, FAN_MARK_FLUSH | FAN_MARK_FILESYSTEM, 0, AT_FDCWD)
except OSError as e:
msg = 'flush(): %s' % e
if e.errno == errno.EBADF:
msg += ': fd=%s is not an fanotify descriptor' % self._fd
self._exception(msg)
def _close(self) -> None:
self._rd.close()
self._rd = None
def _action(self) -> None:
self._wr.close()
self._wr = None
ext.run(self._ctx, self._rd, sys.stdout.fileno(), self._fn, self._fn_args, self._fn_timeout)
def _debug(self, *args, **kwargs) -> None:
if self._log:
self._log.debug(*args, **kwargs)
else:
self._do_log('DEBUG', *args, **kwargs)
def _info(self, *args, **kwargs) -> None:
if self._log:
self._log.info(*args, **kwargs)
else:
self._do_log('INFO', *args, **kwargs)
def _warning(self, *args, **kwargs) -> None:
if self._log:
self._log.warning(*args, **kwargs)
else:
self._do_log('WARNING', *args, **kwargs)
def _error(self, *args, **kwargs) -> None:
if self._log:
self._log.error(*args, **kwargs)
else:
self._do_log('ERROR', *args, **kwargs)
def _critical(self, *args, **kwargs) -> None:
if self._log:
self._log.critical(*args, **kwargs)
else:
self._do_log('CRITICAL', *args, **kwargs)
def _exception(self, *args, **kwargs) -> None:
if self._log:
self._log.exception(*args, **kwargs)
else:
self._do_log('EXCEPTION', *args, **kwargs)
def _do_log(self, lvl: str, msg: str, *args, **kwargs) -> None:
x = sys.stdout if (lvl in ('DEBUG', 'INFO')) else sys.stderr
x.write('%s %s: Fanotify: %s\n' % (datetime.datetime.now(), lvl, msg % args))
x.flush()
[docs]class FanotifyData(dict):
"""
Contains fanotify event information
"""
[docs] def __init__(self, fd: int = -1, pid: int = 0, ev_types: int = 0,
exe: str = None, cwd: str = None, path: Tuple[str] = ()):
"""
:param fd: File descriptor if passed
:param pid: PID of caused process
:param ev_types: Event types of fanotify event
:param exe: EXE of the event caused process if passed
:param cwd: CWD of the event caused process if passed
:param path: tuple of PATH of the event caused file if passed
"""
super().__init__(fd=fd, pid=pid, ev_types=ev_types, exe=exe, cwd=cwd, path=path)
def __getattr__(self, name: str) -> Any:
return self[name]
def __setattr__(self, name: str, value: Any) -> None:
self[name] = value
def __delattr__(self, name: str) -> None:
del self[name]
[docs]class FanotifyClient:
"""
Client for easy use and getting data via Fanotify.
"""
_PID_EVT_S = struct.Struct('=qQ')
_P_SZ_S = struct.Struct('=I')
[docs] def __init__(self, fanotify: Fanotify, **rkw) -> None:
"""
:param fanotify: :class:`Fanotify` object to associate with it.
:param dict rkw: Keyword arguments for :class:`FanoRule`, excluding
:py:attr:`FanoRule.name` - this will be auto-generated and stored
to :py:attr:`FanotifyClient.rname`
"""
self.fanotify = fanotify
self.rname = hashlib.sha256(rkw.pop('name', None) or os.urandom(32)).hexdigest().encode()
self.rule = FanoRule(name=self.rname, **rkw)
s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
s.setblocking(False)
s.bind(b'\0' + self.rname)
self.sock = s
self.fanotify.connect(self.rule)
[docs] def close(self) -> None:
"""
Close the connection to the Fanotify object. The data will no
longer be received.
"""
self.fanotify.disconnect(self.rule)
self.sock.close()
[docs] def get_events(self) -> FanotifyData:
"""
Receive fanotify events according to the established rules
for the current client.
"""
while 1:
try:
data = self._recv_data()
except (socket.error, OSError):
break
if not data:
break
yield data
def _recv_data(self) -> Optional[FanotifyData]:
msg, anc, flags, addr = self.sock.recvmsg(8192, 4096, socket.MSG_DONTWAIT)
if not msg:
return
res = FanotifyData()
for level, ty, fd in anc:
if level == socket.SOL_SOCKET and ty == socket.SCM_RIGHTS:
fds = array.array('i')
fds.frombytes(fd)
res.fd = fds[0]
res.pid, res.ev_types = self._PID_EVT_S.unpack_from(msg, 0)
off = self._PID_EVT_S.size
for i in 'exe', 'cwd':
sz, = self._P_SZ_S.unpack_from(msg, off)
off += self._P_SZ_S.size
res[i] = msg[off:off + sz]
off += sz
p = []
while len(msg) - off:
sz, = self._P_SZ_S.unpack_from(msg, off)
off += self._P_SZ_S.size
p.append(msg[off:off + sz])
off += sz
res['path'] = tuple(p)
return res
def evt_to_str(evt: int):
return '|'.join(v for k, v in _EVT_MASKS.items() if k & evt)