# -*- coding: utf-8 -*-
'''
Watch files and translate the changes into salt events
:depends: - pyinotify Python module >= 0.9.5
:Caution: Using generic mask options like open, access, ignored, and
closed_nowrite with reactors can easily cause the reactor
to loop on itself. To mitigate this behavior, consider
setting the `disable_during_state_run` flag to `True` in
the beacon configuration.
'''
# Import Python libs
from __future__ import absolute_import
import types
import base64
import collections
import fnmatch
import os
import re
import yaml
import time
from salt.exceptions import CommandExecutionError
# Import salt libs
import salt.ext.six
import salt.loader
import salt.utils.platform
# Import third party libs
try:
import pyinotify
HAS_PYINOTIFY = True
DEFAULT_MASK = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY
MASKS = {}
for var in dir(pyinotify):
if var.startswith('IN_'):
key = var[3:].lower()
MASKS[key] = getattr(pyinotify, var)
except ImportError:
HAS_PYINOTIFY = False
DEFAULT_MASK = None
class pyinotify:
WatchManager = object
__virtualname__ = 'pulsar'
SPAM_TIME = 0 # track spammy status message times
TOP = None
TOP_STALENESS = 0
import logging
log = logging.getLogger(__name__)
def __virtual__():
if salt.utils.platform.is_windows():
return False, 'This module only works on Linux'
return True
def _get_mask(mask):
'''
Return the int that represents the mask
'''
return MASKS.get(mask, 0)
def _enqueue(revent):
'''
Enqueue the event
'''
__context__['pulsar.queue'].append(revent)
class ConfigManager(object):
_config = {}
_last_update = 0
@property
def config(self):
if self.stale():
self.update()
return self.nc_config
@property
def nc_config(self):
return self.__class__._config
@nc_config.setter
def nc_config(self, v):
self.__class__._config = v
@config.setter
def config(self, v):
return self.nc_config.update(v)
@property
def last_update(self):
return self.__class__._last_update
@last_update.setter
def last_update(self, v):
self.__class__._last_update = v
def freshness(self, freshness_limit=2):
t = time.time()
return (t - self.last_update <= freshness_limit)
def stale(self):
if (time.time() - self.last_update) >= self.nc_config.get('refresh_interval', 300):
return True
return False
def format_path(self, path):
path = os.path.abspath(path)
fname = os.path.basename(path)
cpath = self.path_of_config(path)
dname = path if os.path.isdir(path) else os.path.dirname(path)
return cpath, path, dname, fname
def path_config(self, path, falsifyable=False):
config = self.nc_config
if falsifyable and path not in config:
return False
c = collections.defaultdict(lambda: False)
c.update( config.get(path, {}) )
return c
def path_of_config(self, path):
ncc = self.nc_config
while len(path)>1 and path not in ncc:
path = os.path.dirname(path)
return path
def _abspathify(self):
c = self.nc_config
for k in tuple(c):
if k.startswith('/'):
l = os.path.abspath(k)
if k != l:
c[l] = c.pop(k)
def update(self):
config = self.nc_config
to_set = __opts__.get('pulsar', {})
if isinstance(config.get('paths'), (list,tuple)):
for path in config['paths']:
if 'salt://' in path:
path = __salt__['cp.cache_file'](path)
if path and os.path.isfile(path):
with open(path, 'r') as f:
to_set = _dict_update(to_set, yaml.safe_load(f),
recursive_update=True, merge_lists=True)
else:
log.error('Path {0} does not exist or is not a file'.format(path))
else:
log.error('Pulsar beacon \'paths\' data improperly formatted. Should be list of paths')
self.nc_config = to_set
self._abspathify()
if config.get('verbose'):
log.debug('Pulsar config updated')
self.last_update = time.time()
def __init__(self, configfile=None, verbose=False):
if configfile is not None:
if isinstance(configfile, (list,tuple)):
self.nc_config['paths'] = configfile
else:
self.nc_config['paths'] = [configfile]
else:
self.nc_config['paths'] = []
config = self.config
config['verbose'] = verbose
self._abspathify()
class PulsarWatchManager(pyinotify.WatchManager):
''' Subclass of pyinotify.WatchManager for the purposes:
* adding dict() based watch_db (for faster lookups)
* adding file watches (to notice changes to hardlinks outside the watched locations)
* adding various convenience functions
pyinotify.WatchManager tracks watches internally, but only for directories
and only in a list format. Such that many lookups take on a list-within-list
O(n^2) complexity (eg):
.. code-block:: python
for path in path_list:
wd = wm.get_wd(i) # search watch-list in an internal for loop
'''
def __init__(self, *a, **kw):
# because the salt loader periodically reloads everything,
# it becomes necessary to store the super class. Arguably, we
# could instead use pyinotify.WatchManager.__init__(self, ...)
# but super() lets us work with MRO later iff necessary
self.__super = super(PulsarWatchManager, self)
self.__super.__init__(*a, **kw)
self.watch_db = dict()
self.parent_db = dict()
self._last_config_update = 0
self.update_config()
@classmethod
def _iterate_anything(cls, x, discard_none=True):
''' iterate any amount of list/tuple nesting
'''
if isinstance(x, (types.GeneratorType,list,tuple,set,dict)):
# ∀ item ∈ x: listify(item)
for list_or_item in x:
for i in cls._listify_anything(list_or_item):
if i is None and discard_none:
continue
yield i # always a scalar
elif x is None and discard_none:
pass
else:
yield x # always a scalar
@classmethod
def _listify_anything(cls, x, discard_none=True):
''' _iterate_anything, then uniquify and force a list return; because,
pyinotify's __format_param, checks only isinstance(item,list)
'''
s = set( cls._iterate_anything(x, discard_none=discard_none) )
return list(s)
def _add_db(self, parent, items):
if parent and not items:
return
todo = {}
for i in items:
if items[i] > 0:
todo[i] = items[i]
self.watch_db.update(todo)
if parent in todo:
del todo[parent]
if todo:
if parent not in self.parent_db:
self.parent_db[parent] = set()
self.parent_db[parent].update(todo)
def _get_wdl(self, *pathlist):
''' inverse pathlist and return a flat list of wd's for the paths and their child paths
probably O( (N+M)^2 ); use sparingly
'''
super_list = self._listify_anything(pathlist,
[ x if isinstance(x,int) else self.parent_db.get(x) for x in self._iterate_anything(pathlist) ])
return self._listify_anything([ x if isinstance(x,int) else self.watch_db.get(x) for x in super_list ])
def _get_paths(self, *wdl):
wdl = self._listify_anything(wdl)
return self._listify_anything([ k for k,v in salt.ext.six.iteritems(self.watch_db) if v in wdl ])
def update_config(self):
''' (re)check the config files for inotify_limits:
* inotify_limits:update - whether we should try to manage fs.inotify.max_user_watches
* inotify_limits:highwater - the highest we should set MUW (default: 1000000)
* inotify_limits:increment - the amount we should increase MUW when applicable
* inotify_limits:initial - if given, and if MUW is initially lower at startup: set MUW to this
'''
if not hasattr(self, 'cm'):
self.cm = ConfigManager()
else:
self.cm.update()
config = self.cm.config.get('inotify_limits', {})
self.update_muw = config.get('update', False)
self.update_muw_highwater = config.get('highwater', 1000000)
self.update_muw_bump = config.get('increment', 1000)
initial = config.get('initial', 0)
if initial > 0:
muw = self.max_user_watches
if muw < initial:
self.max_user_watches = initial
@property
def max_user_watches(self):
''' getter/setter for fs.inotify.max_user_watches
'''
with open('/proc/sys/fs/inotify/max_user_watches', 'r') as fh:
l = fh.readline()
muw = int(l.strip())
return muw
@max_user_watches.setter
def max_user_watches(self,muwb):
log.splunk("Setting fs.inotify.max_user_watches={0}".format(muwb))
try:
with open('/proc/sys/fs/inotify/max_user_watches', 'w') as fh:
fh.write('{0}\n'.format(muwb))
except IOError as e:
log.error("Error updating sys.fs.inotify.max_user_watches: %s", e)
def _add_recursed_file_watch(self, path, mask=None, **kw):
if mask is None:
# don't simply set this as default above
# (it seems to get messed up by the namespace reload during grains refresh)
mask = pyinotify.IN_MODIFY
if os.path.isdir(path):
# this used to be if not os.path.isfile(); turns out socket files aren't isfile()s
raise Exception("use add_watch() or watch() for directories like path={0}".format(path))
if os.path.islink(path):
return {}
path = os.path.abspath(path)
up_path = kw.pop('parent', False)
if not up_path:
up_path = os.path.dirname(path)
while len(up_path) > 1 and up_path not in self.watch_db:
up_path = os.path.dirname(up_path)
if up_path and up_path in self.watch_db:
# we already did many of the lookups add_watch would do
# so we say no_db=True and manually add the (up_path,**res)
res = self.add_watch(path, mask, no_db=True)
self._add_db(up_path, res)
return res
else:
raise Exception("_add_recursed_file_watch('{0}') must be located in a watched directory".format(path))
def watch(self, path, mask=None, **kw):
''' Automatically select add_watch()/update_watch() and try to do the right thing.
Also add 'new_file' argument: add an IN_MODIFY watch for the named filepath and track it
'''
path = os.path.abspath(path)
new_file = kw.pop('new_file', False)
if not os.path.exists(path):
log.debug("watch({0}): NOENT (skipping)".format(path))
return
if mask is None:
mask = DEFAULT_MASK
pconf = self.cm.path_config(path)
wd = self.watch_db.get(path)
if wd:
update = False
if self.watches[wd].mask != mask:
update = True
if self.watches[wd].auto_add != kw.get('auto_add'):
update = True
if update:
kw['mask'] = mask
kw.pop('exclude_filter',None)
self.update_watch(wd,**kw)
log.debug('update-watch wd={0} path={1} watch_files={2} recurse={3}'.format(
wd, path, pconf['watch_files'], pconf['recurse']))
else:
if 'recurse' in kw:
kw['rec'] = kw.pop('recurse')
kw['rec'] = kw.get('rec')
if kw['rec'] is None:
kw['rec'] = pconf['recurse']
self.add_watch(path,mask,**kw)
log.debug('add-watch wd={0} path={1} watch_files={2} recurse={3}'.format(
self.watch_db.get(path), path, pconf['watch_files'], kw['rec']))
if new_file: # process() says this is a new file
self._add_recursed_file_watch(path)
else: # watch_files if configured to do so
if pconf['watch_files']:
rec = kw.get('rec')
excludes = kw.get('exclude_filter', lambda x: False)
if isinstance(excludes, (list,tuple)):
pfft = excludes
excludes = lambda x: x in pfft
file_track = self.parent_db.get(path, {})
pre_count = len(self.watch_db)
for wpath,wdirs,wfiles in os.walk(path):
if rec or wpath == path:
for f in wfiles:
wpathname = os.path.join(wpath,f)
if excludes(wpathname):
continue
if not os.path.isfile(wpathname):
continue
if wpathname in file_track: # checking file_track isn't strictly necessary
continue # but gives a slight speedup
self._add_recursed_file_watch( wpathname, parent=path )
ft_count = len(self.watch_db) - pre_count
if ft_count > 0:
log.debug('recursive file-watch totals for path={0} new-this-loop: {1}'.format(path, ft_count))
def add_watch(self, path, mask, **kw):
''' Curry of pyinotify.WatchManager.add_notify
* override - quiet = False
* automatic absolute path
* implicit retries
'''
no_db = kw.pop('no_db', False)
path = os.path.abspath(path)
res = {}
kw['quiet'] = False
retries = 5
while retries > 0:
retries -= 1
try:
_res = self.__super.add_watch(path, mask, **kw)
if isinstance(_res, dict):
res.update(_res)
break
else:
raise Exception("pyinotify.WatchManager.add_watch() failed to return a dict")
except pyinotify.WatchManagerError as wme:
self.update_config() # make sure we have the latest settings
if isinstance(wme.wmd, dict):
res.update(wme.wmd) # copy over what did work before it broke
if self.update_muw:
muw = self.max_user_watches
muwb = muw + self.update_muw_bump
if muwb <= self.update_muw_highwater:
self.max_user_watches = muwb
continue
else:
log.error("during add_watch({0}): max watches reached ({1}). consider "
"increasing the inotify_limits:highwater mark".format(path, muw))
break
else:
log.error("during add_watch({0}): max watches reached. "
"consider setting the inotify_limits:udpate".format(path))
break
except Exception as e:
log.error("exception during add_watch({0}): {1}".format(path, repr(e)))
break
if not no_db: # (heh)
self._add_db(path, res)
return res
def _prune_paths_to_stop_watching(self):
inverse_parent_db = {}
for k,v in salt.ext.six.iteritems(self.parent_db):
for i in v:
inverse_parent_db[i] = k
for dirpath in self.watch_db:
pc = self.cm.path_config(dirpath, falsifyable=True)
if pc is False:
if dirpath in self.parent_db:
# there's no config for this dir, but it had child watches at one point
# probably this is just nolonger configured
for item in self.parent_db[dirpath]:
yield item
yield dirpath
elif dirpath not in inverse_parent_db:
# this doesn't seem to be in parent_db or the reverse
# probably nolonger configured
yield dirpath
elif dirpath in self.parent_db:
for item in self.parent_db[dirpath]:
if os.path.isdir(item):
if not pc['recurse']:
# there's config for this dir, but it nolonger recurses
yield item
elif not pc['watch_files'] and not pc['watch_new_files']:
# there's config for this dir, but it nolonger watches files
yield item
def prune(self):
def _wd(l):
for item in l:
yield self.watch_db[item]
to_stop = self._prune_paths_to_stop_watching()
to_rm = self._listify_anything( _wd(to_stop) )
self.rm_watch(to_rm)
def _rm_db(self, wd):
plist = set( self._get_paths(wd) )
for dirpath in plist:
if dirpath in self.watch_db:
del self.watch_db[dirpath]
if dirpath in self.parent_db:
del self.parent_db[dirpath]
# in the interests of being really really thourough make sure none of
# the parent_db sets contain any of the removed dirpaths
# and then make sure there's no empty sets in the parent_db
to_fully_delete = set()
for d,s in salt.ext.six.iteritems(self.parent_db):
s -= plist
if not s:
to_fully_delete.add(d)
for item in to_fully_delete:
del self.parent_db[item]
def del_watch(self, wd):
''' remove a watch from the watchmanager database
'''
if not isinstance(wd, int):
wd = self._get_wdl(wd)[0]
self.__super.del_watch(wd)
self._rm_db(wd)
def rm_watch(self, *wd, **kw):
''' recursively unwatch things
'''
wdl = self._get_wdl(wd)
res = self.__super.rm_watch(wdl, **kw)
self._rm_db(wdl)
return res
def _get_notifier():
'''
Check the context for the notifier and construct it if not present
'''
if 'pulsar.notifier' not in __context__:
__context__['pulsar.queue'] = collections.deque()
log.info("creating new watch manager")
wm = PulsarWatchManager()
__context__['pulsar.notifier'] = pyinotify.Notifier(wm, _enqueue)
return __context__['pulsar.notifier']
def _preprocess_excludes(excludes):
'''
Wrap excludes in simple decision curry functions.
'''
# silently discard non-list excludes
if not isinstance(excludes, (list,tuple)) or not excludes:
return lambda x: False
# wrap whatever in a decision problem
def re_wrapper(robj):
# log.debug('wrapping re {0}'.format(robj.pattern))
def _wrapped(val):
return bool(robj.search(val))
return _wrapped
def fn_wrapper(rpat):
# log.debug('wrapping fnmatch {0}'.format(rpat))
def _wrapped(val):
return bool(fnmatch.fnmatch(rpat, val))
return _wrapped
def str_wrapper(rstr):
# log.debug('wrapping strmatch {0}'.format(rstr))
def _wrapped(val):
return bool( val.startswith(rstr) )
return _wrapped
# figure out what to wrap things with
the_list = []
for e in excludes:
if isinstance(e,dict):
if e.values()[0].get('regex'):
r = e.keys()[0]
try:
c = re.compile(r)
the_list.append(re_wrapper(c))
except:
log.warn('Failed to compile regex: {0}'.format(r))
continue
else:
e = e.keys()[0]
if '*' in e:
the_list.append(fn_wrapper(e))
else:
the_list.append(str_wrapper(e))
# finally, wrap the whole decision set in a decision wrapper
def _final(val):
for i in the_list:
if i( val ):
return True
return False
return _final
class delta_t(object):
def __init__(self):
self.marks = {}
self.fins = {}
self.mark('top')
def __repr__(self):
return "delta_t({0})".format(self)
def __str__(self):
ret = ["delta_t={0:0.2f}".format(self.get())]
for i in sorted(self.marks):
if i in ('top',):
continue
ret.append("{0}={1:0.2f}".format(i, self.get(i)))
return '; '.join(ret)
def fin(self,name=None):
if name is None:
name = self.last_mark
if name == 'top':
return # top doesn't finish
self.fins[name] = time.time()
def get(self,name=None):
if name is None:
name = 'top'
begin = self.marks[name]
end = self.fins.get(name, time.time())
return end - begin
def mark(self,name):
self.last_mark = name
self.marks[name] = time.time()
[docs]def process(configfile='salt://hubblestack_pulsar/hubblestack_pulsar_config.yaml',
verbose=False):
'''
Watch the configured files
Example pillar config
.. code-block:: yaml
beacons:
pulsar:
paths:
- /var/cache/salt/minion/files/base/hubblestack_pulsar/hubblestack_pulsar_config.yaml
refresh_interval: 300
verbose: False
Example yaml config on fileserver (targeted by pillar)
.. code-block:: yaml
/path/to/file/or/dir:
mask:
- open
- create
- close_write
recurse: True
auto_add: True
exclude:
- /path/to/file/or/dir/exclude1
- /path/to/file/or/dir/exclude2
- /path/to/file/or/dir/regex[\d]*$:
regex: True
return:
splunk:
batch: True
slack:
batch: False # overrides the global setting
checksum: sha256
stats: True
batch: True
contents_size: 20480
checksum_size: 104857600
Note that if `batch: True`, the configured returner must support receiving
a list of events, rather than single one-off events.
The mask list can contain the following events (the default mask is create,
delete, and modify):
* access - File accessed
* attrib - File metadata changed
* close_nowrite - Unwritable file closed
* close_write - Writable file closed
* create - File created in watched directory
* delete - File deleted from watched directory
* delete_self - Watched file or directory deleted
* modify - File modified
* moved_from - File moved out of watched directory
* moved_to - File moved into watched directory
* move_self - Watched file moved
* open - File opened
The mask can also contain the following options:
* dont_follow - Don't dereference symbolic links
* excl_unlink - Omit events for children after they have been unlinked
* oneshot - Remove watch after one event
* onlydir - Operate only if name is directory
recurse:
Recursively watch directories under the named directory
auto_add:
Python inotify option, meaning: automatically start watching new
directories that are created in a watched directory
watch_new_files:
when a new file is created in a watched dir, add a watch on the file
(implied by watch_files below)
watch_files:
add explicit watches on all files (except excluded) under the named directory
exclude:
Exclude directories or files from triggering events in the watched directory.
Can use regex if regex is set to True
contents:
Retrieve the contents of changed files based on checksums (which must be enabled)
If pillar/grains/minion config key `hubblestack:pulsar:maintenance` is set to
True, then changes will be discarded.
'''
dt = delta_t()
dt.mark('read_config')
if not HAS_PYINOTIFY:
log.debug('Not running beacon pulsar. No python-inotify installed.')
return []
cm = ConfigManager(configfile=configfile, verbose=verbose)
config = cm.config
if config.get('verbose'):
log.debug('Pulsar beacon called.')
log.debug('Pulsar beacon config from pillar:\n{0}'.format(config))
ret = []
notifier = _get_notifier()
wm = notifier._watch_manager
update_watches = cm.freshness(2)
initial_count = len(wm.watch_db)
recent = set()
dt.fin()
# Read in existing events
if notifier.check_events(1):
dt.mark('check_events')
notifier.read_events()
notifier.process_events()
queue = __context__['pulsar.queue']
if config.get('verbose'):
log.debug('Pulsar found {0} inotify events.'.format(len(queue)))
while queue:
event = queue.popleft()
if event.maskname == 'IN_Q_OVERFLOW':
log.warn('Your inotify queue is overflowing.')
log.warn('Fix by increasing /proc/sys/fs/inotify/max_queued_events')
continue
log.debug("queue {0}".format(event)) # shows mask/name/pathname/wd and other things
k = "{0.pathname} {0.maskname}".format(event)
if k in recent:
log.debug("skipping event")
continue
recent.add(k)
pathname = event.pathname
cpath, abspath, dirname, basename = cm.format_path(pathname)
# cpath : the path under which the config is specified
# abspath : os.path.abspath() reformatted path
# dirname : the directory of the pathname, or the pathname if
# : it's a directory
# basename : the os.path.basename() of the path
# wpath = event.path : the path of the watch that triggered (not actually populated
# : in wpath)
excludes = _preprocess_excludes( config[cpath].get('exclude') )
_append = not excludes(pathname)
if _append:
config_path = config['paths'][0]
pulsar_config = config_path[config_path.rfind('/') + 1:len(config_path)]
sub = { 'change': event.maskname,
'path': abspath, # goes to object_path in splunk
'tag': dirname, # goes to file_path in splunk
'name': basename, # goes to file_name in splunk
'pulsar_config': pulsar_config}
if config.get('checksum', False) and os.path.isfile(pathname):
if 'pulsar_checksums' not in __context__:
__context__['pulsar_checksums'] = {}
# Don't checksum any file over 100MB
if os.path.getsize(pathname) < config.get('checksum_size', 104857600):
sum_type = config['checksum']
if not isinstance(sum_type, salt.ext.six.string_types):
sum_type = 'sha256'
old_checksum = __context__['pulsar_checksums'].get(pathname)
new_checksum = __salt__['file.get_hash'](pathname, sum_type)
__context__['pulsar_checksums'][pathname] = new_checksum
sub['checksum'] = __context__['pulsar_checksums'][pathname]
sub['checksum_type'] = sum_type
# File contents? Don't fetch contents for any file over
# 20KB or where the checksum is unchanged
if (pathname in config[cpath].get('contents', []) or
os.path.dirname(pathname) in config[cpath].get('contents', [])) \
and os.path.getsize(pathname) < config.get('contents_size', 20480) \
and old_checksum != new_checksum:
try:
with open(pathname, 'r') as f:
sub['contents'] = base64.b64encode(f.read())
except Exception as e:
log.debug('Could not get file contents for {0}: {1}'
.format(pathname, e))
if cm.config.get('stats', False):
if os.path.exists(pathname):
sub['stats'] = __salt__['file.stats'](pathname)
else:
sub['stats'] = {}
if os.path.isfile(pathname):
sub['size'] = os.path.getsize(pathname)
ret.append(sub)
if not event.mask & pyinotify.IN_ISDIR:
if event.mask & pyinotify.IN_CREATE:
watch_this = config[cpath].get('watch_new_files', False) \
or config[cpath].get('watch_files', False)
if watch_this:
if not excludes(pathname):
log.debug("add file-watch path={0}".format(pathname))
wm.watch(pathname, pyinotify.IN_MODIFY, new_file=True)
elif event.mask & pyinotify.IN_DELETE:
wm.rm_watch(pathname)
else:
log.debug('Excluding {0} from event for {1}'.format(pathname, cpath))
dt.fin()
if update_watches:
dt.mark('update_watches')
log.debug("update watches")
# Update existing watches and add new ones
# TODO: make the config handle more options
for path in config:
excludes = lambda x: False
if path in ['return', 'checksum', 'stats', 'batch', 'verbose',
'paths', 'refresh_interval', 'contents_size',
'checksum_size']:
continue
if isinstance(config[path], dict):
mask = config[path].get('mask', DEFAULT_MASK)
watch_files = config[path].get('watch_files', DEFAULT_MASK)
# if watch_files:
# # we're going to get dup modify events if watch_files is set
# # and we still monitor modify for the dir
# a = mask & pyinotify.IN_MODIFY
# if a:
# log.debug("mask={0} -= mask & pyinotify.IN_MODIFY={1} ==> {2}".format(
# mask, a, mask-a))
# mask -= mask & pyinotify.IN_MODIFY
excludes = _preprocess_excludes( config[path].get('exclude') )
if isinstance(mask, list):
r_mask = 0
for sub in mask:
r_mask |= _get_mask(sub)
elif isinstance(mask, salt.ext.six.binary_type):
r_mask = _get_mask(mask)
else:
r_mask = mask
mask = r_mask
rec = config[path].get('recurse', False)
auto_add = config[path].get('auto_add', False)
else:
mask = DEFAULT_MASK
rec = False
auto_add = False
wm.watch(path, mask, rec=rec, auto_add=auto_add, exclude_filter=excludes)
dt.fin()
dt.mark('prune_watches')
wm.prune()
dt.fin()
if __salt__['config.get']('hubblestack:pulsar:maintenance', False):
# We're in maintenance mode, throw away findings
ret = []
global SPAM_TIME
now_t = time.time()
spam_dt = now_t - SPAM_TIME
current_count = len(wm.watch_db)
delta_c = current_count - initial_count
if dt.get() >= 0.1 or abs(delta_c)>0 or spam_dt >= 60:
SPAM_TIME = now_t
log.info("process() sweep {0}; watch count: {1} (delta: {2})".format(dt, current_count, delta_c))
if 'DUMP_WATCH_DB' in os.environ:
import json
f = os.path.basename(os.environ['DUMP_WATCH_DB'])
if f.lower() in ('1', 'true', 'yes'):
f = 'pulsar-watch.db'
f = '/tmp/{}'.format(f)
with open(f, 'w') as fh:
json.dump(wm.watch_db, fh)
log.debug("wrote watch_db to {}".format(f))
return ret
def canary(change_file=None):
'''
Simple module to change a file to trigger a FIM event (daily, etc)
THE SPECIFIED FILE WILL BE CREATED AND DELETED
Defaults to CONF_DIR/fim_canary.tmp, i.e. /etc/hubble/fim_canary.tmp
'''
if change_file is None:
conf_dir = os.path.dirname(__opts__['conf_file'])
change_file = os.path.join(conf_dir, 'fim_canary.tmp')
__salt__['file.touch'](change_file)
__salt__['file.remove'](change_file)
def _dict_update(dest, upd, recursive_update=True, merge_lists=False):
'''
Recursive version of the default dict.update
Merges upd recursively into dest
If recursive_update=False, will use the classic dict.update, or fall back
on a manual merge (helpful for non-dict types like FunctionWrapper)
If merge_lists=True, will aggregate list object types instead of replace.
This behavior is only activated when recursive_update=True. By default
merge_lists=False.
'''
if (not isinstance(dest, collections.Mapping)) \
or (not isinstance(upd, collections.Mapping)):
raise TypeError('Cannot update using non-dict types in dictupdate.update()')
updkeys = list(upd.keys())
if not set(list(dest.keys())) & set(updkeys):
recursive_update = False
if recursive_update:
for key in updkeys:
val = upd[key]
try:
dest_subkey = dest.get(key, None)
except AttributeError:
dest_subkey = None
if isinstance(dest_subkey, collections.Mapping) \
and isinstance(val, collections.Mapping):
ret = _dict_update(dest_subkey, val, merge_lists=merge_lists)
dest[key] = ret
elif isinstance(dest_subkey, list) \
and isinstance(val, list):
if merge_lists:
dest[key] = dest.get(key, []) + val
else:
dest[key] = upd[key]
else:
dest[key] = upd[key]
return dest
else:
try:
for k in upd.keys():
dest[k] = upd[k]
except AttributeError:
# this mapping is not a dict
for k in upd:
dest[k] = upd[k]
return dest
[docs]def top(topfile='salt://hubblestack_pulsar/top.pulsar',
verbose=False):
'''
Execute pulsar using a top.pulsar file to decide which configs to use for
this host.
The topfile should be formatted like this:
.. code-block:: yaml
pulsar:
'<salt compound match identifying host(s)>':
- list.of.paths
- using.dots.as.directory.separators
Paths in the topfile should be relative to `salt://hubblestack_pulsar`, and
the .yaml should not be included.
'''
configs = get_top_data(topfile)
configs = ['salt://hubblestack_pulsar/' + config.replace('.', '/') + '.yaml'
for config in configs]
return process(configs, verbose=verbose)
def get_top_data(topfile):
'''
Cache the topfile and process the list of configs this host should use.
'''
# Get topdata from filesystem if we don't have them already
global TOP
global TOP_STALENESS
if TOP and TOP_STALENESS < 60:
TOP_STALENESS += 1
topdata = TOP
else:
log.debug('Missing/stale cached topdata found for pulsar, retrieving fresh from fileserver.')
topfile = __salt__['cp.cache_file'](topfile)
try:
with open(topfile) as handle:
topdata = yaml.safe_load(handle)
except Exception as e:
raise CommandExecutionError('Could not load topfile: {0}'.format(e))
if not isinstance(topdata, dict) or 'pulsar' not in topdata or \
not(isinstance(topdata['pulsar'], dict)):
raise CommandExecutionError('Pulsar topfile not formatted correctly')
topdata = topdata['pulsar']
TOP = topdata
TOP_STALENESS = 0
ret = []
for match, data in topdata.iteritems():
if __salt__['match.compound'](match):
ret.extend(data)
return ret