Project

General

Profile

zephirservice.py

fichier patché pour 2.6.2 - Joël Cuissinat, 09/08/2022 09:56 AM

Download (22.2 KB)

 
1
# -*- coding: UTF-8 -*-
2
###########################################################################
3
# Eole NG - 2007
4
# Copyright Pole de Competence Eole  (Ministere Education - Academie Dijon)
5
# Licence CeCill  cf /root/LicenceEole.txt
6
# eole@ac-dijon.fr
7
###########################################################################
8

    
9
"""
10
Services Twisted de collection et de publication de données.
11
"""
12

    
13
import locale, gettext, os, pwd, shutil, random
14
from glob import glob
15
import cjson
16
import traceback
17

    
18
# install locales early
19
from zephir.monitor.agentmanager import ZEPHIRAGENTS_DATADIR
20
APP = 'zephir-agents'
21
DIR = os.path.join(ZEPHIRAGENTS_DATADIR, 'i18n')
22
gettext.install(APP, DIR, unicode=False)
23

    
24

    
25
from twisted.application import internet, service
26
from twisted.internet import utils, reactor
27
from twisted.web import resource, server, static, util, xmlrpc
28
from twisted.python import syslog
29

    
30
from zephir.monitor.agentmanager import config as cfg
31
from zephir.monitor.agentmanager.util import ensure_dirs, md5file, get_md5files, log
32
from zephir.monitor.agentmanager.web_resources import ZephirServerResource
33
from zephir.monitor.agentmanager.clientmanager import ClientManager
34

    
35
try:
36
    import zephir.zephir_conf.zephir_conf as conf_zeph
37
    from zephir.lib_zephir import zephir_proxy, convert, zephir_dir, update_sudoers, charset
38
    from zephir.lib_zephir import log as zeph_log
39
    registered = 1
40
except:
41
    # serveur non enregistré sur zephir
42
    registered = 0
43

    
44
class ZephirService(service.MultiService):
45
    """Main Twisted service for Zephir apps"""
46

    
47
    def __init__(self, config, root_resource=None, serve_static=False):
48
        """config will be completed by default values"""
49
        service.MultiService.__init__(self)
50
        self.config = cfg.DEFAULT_CONFIG.copy()
51
        self.config.update(config)
52
        self.updater = self.publisher = None
53
        # mise à jour des scripts clients dans sudoers
54
        if registered:
55
            update_sudoers()
56
        # parent web server
57
        if root_resource is None:
58
            self.root_resource = resource.Resource()
59
            webserver = internet.TCPServer(self.config['webserver_port'],
60
                                                server.Site(self.root_resource))
61
            webserver.setServiceParent(service.IServiceCollection(self))
62
        else:
63
            self.root_resource = root_resource
64
        # serve global static files
65
        if serve_static:
66
            self.root_resource.putChild('static',
67
                                        static.File(self.config['static_web_dir']))
68

    
69

    
70
    # subservices factory methods
71

    
72
    def with_updater(self):
73
        assert self.updater is None
74
        self.updater = UpdaterService(self.config, self, self.root_resource)
75
        return self
76

    
77
    def with_publisher(self):
78
        assert self.publisher is None
79
        self.publisher = PublisherService(self.config, self, self.root_resource)
80
        return self
81

    
82
    def with_updater_and_publisher(self):
83
        assert self.updater is None
84
        assert self.publisher is None
85
        self.updater = UpdaterService(self.config, self, self.root_resource)
86
        self.publisher = PublisherService(self.config, self, self.root_resource,
87
                                          show_clients_page = False,
88
                                          live_agents={self.config['host_ref']: self.updater.agents})
89
        return self
90

    
91

    
92

    
93

    
94
class UpdaterService(service.MultiService, xmlrpc.XMLRPC):
95
    """Schedules measures, data serialisation and upload."""
96

    
97
    def __init__(self, config, parent, root_resource):
98
        """config should be complete"""
99
        service.MultiService.__init__(self)
100
        xmlrpc.XMLRPC.__init__(self)
101
        self.old_obs = None
102
        self.config = config
103
        # updates site.cfg file
104
        self.update_static_data()
105
        # start subservices
106
        loc, enc = locale.getdefaultlocale()
107
        log.msg(_('default locale: %s encoding: %s') % (loc, enc))
108
        if enc == 'utf':
109
            log.msg(_('Warning: locale encoding %s broken in RRD graphs, set e.g: LC_ALL=fr_FR') % enc)
110
        self.agents = self.load_agents()
111
        # attach to parent service
112
        self.setServiceParent(service.IServiceCollection(parent))
113
        root_resource.putChild('xmlrpc', self)
114

    
115
    def startService(self):
116
        """initialize zephir services"""
117
        service.MultiService.startService(self)
118
        reactor.callLater(2,self.schedule_all)
119
        # mise à jour du préfixe de log (twisted par défaut)
120
        # FIX : on conserve la référence à l'ancien observer pour
121
        # éviter les pb à la fermeture du service
122
        self.old_obs = log.theLogPublisher.observers[0]
123
        try:
124
            from zephir.backend import config as conf_zeph
125
            log_prefix = 'zephir_backend'
126
        except:
127
            log_prefix = 'zephiragents'
128
        new_obs = syslog.SyslogObserver(log_prefix, options=syslog.DEFAULT_OPTIONS, facility=syslog.DEFAULT_FACILITY)
129
        log.addObserver(new_obs.emit)
130
        log.removeObserver(self.old_obs)
131
        if registered != 0:
132
            # on est enregistré sur zephir => initiation de
133
            # la création et l'envoi d'archives
134
            self.setup_uucp()
135
            # dans le cas ou un reboot a été demandé, on indique que le redémarrage est bon
136
            if os.path.isfile(os.path.join(zephir_dir,'reboot.lck')):
137
                try:
138
                    zeph_log('REBOOT',0,'redémarrage du serveur terminé')
139
                    os.unlink(os.path.join(zephir_dir,'reboot.lck'))
140
                except:
141
                    pass
142

    
143
    def stopService(self):
144
        """stops zephir services"""
145
        if self.old_obs:
146
            log.removeObserver(log.theLogPublisher.observers[0])
147
            log.addObserver(self.old_obs)
148
        service.MultiService.stopService(self)
149

    
150
    def load_agents(self):
151
        """Charge tous les agents du répertoire de configurations."""
152
        log.msg(_("Loading agents from %s...") % self.config['config_dir'])
153
        loaded_agents = {}
154
        list_agents = glob(os.path.join(self.config['config_dir'], "*.agent"))
155
        for f in list_agents:
156
            log.msg(_("  from %s:") % os.path.basename(f))
157
            h = { 'AGENTS': None }
158
            execfile(f, globals(), h)
159
            assert h.has_key('AGENTS')
160
            for a in h['AGENTS']:
161
                assert not loaded_agents.has_key(a.name)
162
                # init agent data and do a first archive
163
                a.init_data(os.path.join(self.config['state_dir'],
164
                                         self.config['host_ref'],
165
                                         a.name))
166
                a.manager = self
167
                a.archive()
168
                loaded_agents[a.name] = a # /!\ écrasement des clés
169
                log.msg(_("    %s, period %d") % (a.name, a.period))
170
        log.msg(_("Loaded."))
171
        return loaded_agents
172

    
173

    
174
    # scheduling measures
175

    
176
    def schedule(self, agent_name):
177
        """Planifie les mesures périodiques d'un agent."""
178
        assert self.agents.has_key(agent_name)
179
        if self.agents[agent_name].period > 0:
180
            timer = internet.TimerService(self.agents[agent_name].period,
181
                                          self.wakeup_for_measure, agent_name)
182
            timer.setName(agent_name)
183
            timer.setServiceParent(service.IServiceCollection(self))
184

    
185

    
186
    def wakeup_for_measure(self, agent_name):
187
        """Callback pour les mesures planifiées."""
188
        assert self.agents.has_key(agent_name)
189
        # log.debug("Doing scheduled measure on " + agent_name)
190
        self.agents[agent_name].scheduled_measure()
191

    
192

    
193
    def schedule_all(self):
194
        """Planifie tous les agents chargés.
195
        Démarre le cycle de mesures périodiques de chaque agent
196
        chargé. La première mesure est prise immédiatement.
197
        """
198
        for agent_name in self.agents.keys():
199
            # charge les actions disponibles (standard en premier, puis les actions locales)
200
            # les actions locales écrasent les actions standard si les 2 existent
201
            for action_dir in (os.path.join(self.config['action_dir'],'eole'), self.config['action_dir']):
202
                f_actions = os.path.join(action_dir, "%s.actions" % agent_name)
203
                if os.path.isfile(f_actions):
204
                    actions = {}
205
                    execfile(f_actions, globals(), actions)
206
                    for item in actions.keys():
207
                        if item.startswith('action_'):
208
                            setattr(self.agents[agent_name], item, actions[item])
209
            # self.wakeup_for_measure(agent_name) # first measure at launch
210
            self.schedule(agent_name)
211

    
212

    
213
    def timer_for_agent_named(self, agent_name):
214
        assert self.agents.has_key(agent_name)
215
        return self.getServiceNamed(agent_name)
216

    
217

    
218
    # data upload to zephir server
219

    
220
    def setup_uucp(self):
221
        ensure_dirs(self.config['uucp_dir'])
222
        self.update_static_data()
223
        # récupération du délai de connexion à zephir
224
        try:
225
            reload(conf_zeph)
226
            # supression des éventuels répertoires de stats invalides
227
            # par ex, en cas de désinscription zephir 'manuelle'.
228

    
229
            # sur zephir : on garde toujours 0 pour éviter les conflits avec les serveurs enregistrés
230
            if not os.path.isdir('/var/lib/zephir'):
231
                for st_dir in os.listdir(self.config['state_dir']):
232
                    if st_dir != str(conf_zeph.id_serveur):
233
                        shutil.rmtree(os.path.join(self.config['state_dir'],st_dir))
234
            # vérification sur zephir du délai de connexion
235
            period = convert(zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)[1])
236
        except:
237
            period = 0
238

    
239
        if period < 30:
240
            period = self.config['upload_period']
241
            log.msg(_('Using default period : %s seconds') % period)
242
        # on ajoute un décalage aléatoire (entre 30 secondes et period) au premier démarrage
243
        # (pour éviter trop de connexions simultanées si le service est relancé par crontab)
244
        delay = random.randrange(30,period)
245
        reactor.callLater(delay,self.wakeup_for_upload)
246

    
247
    def update_static_data(self):
248
        original = os.path.join(self.config['config_dir'], 'site.cfg')
249
        if os.path.isfile(original):
250
            destination = cfg.client_data_dir(self.config, self.config['host_ref'])
251
            ensure_dirs(destination)
252
            need_copy = False
253
            try:
254
                org_mtime = os.path.getmtime(original)
255
                dest_mtime = os.path.getmtime(os.path.join(destination, 'site.cfg'))
256
            except OSError:
257
                need_copy = True
258
            if need_copy or (org_mtime > dest_mtime):
259
                shutil.copy(original, destination)
260

    
261
    def wakeup_for_upload(self, recall=True):
262
        # relecture du délai de connexion sur zephir
263
        try:
264
            reload(conf_zeph)
265
            period = convert(zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)[1])
266
        except:
267
            period = 0
268
        # on relance la fonction dans le délai demandé
269
        if period < 30:
270
            period = self.config['upload_period']
271
            log.msg(_('Using default period : %s seconds') % period)
272
        # on ajoute un décalage au premier démarrage
273
        # (pour éviter trop de connexions simultanées si le service est relancé par crontab)
274
        if recall:
275
            reactor.callLater(period,self.wakeup_for_upload)
276

    
277
        # virer l'ancienne archive du rép. uucp
278
        for agent in self.agents.values():
279
            agent.archive()
280
            # agent.reset_max_status()
281
        self.update_static_data()
282
        # archiver dans rép. uucp, donner les droits en lecture sur l'archive
283
        try:
284
            assert conf_zeph.id_serveur != 0
285
            client_dir = os.path.join(self.config['tmp_data_dir'],str(conf_zeph.id_serveur))
286
        except:
287
            client_dir = os.path.join(self.config['tmp_data_dir'],self.config['host_ref'])
288
        try:
289
            # purge du répertoire temporaire
290
            if os.path.isdir(client_dir):
291
                shutil.rmtree(client_dir)
292
            os.makedirs(client_dir)
293
        except: # non existant
294
            pass
295
        args = ['-Rf',os.path.abspath(os.path.join(cfg.client_data_dir(self.config, self.config['host_ref']),'site.cfg'))]
296
        ignore_file = os.path.abspath(os.path.join(self.config['state_dir'],'ignore_list'))
297
        if os.path.exists(ignore_file):
298
            args.append(ignore_file)
299
        # on ne copie que les données des agents instanciés
300
        # cela évite de remonter par exemple les stats rvp si le service a été désactivé
301
        for agent_name in self.agents.keys():
302
            args.append(os.path.abspath(cfg.agent_data_dir(self.config, self.config['host_ref'],agent_name)))
303
        args.append(os.path.abspath(client_dir))
304
        res = utils.getProcessOutput('/bin/cp', args = args)
305
        res.addCallbacks(self._make_archive,
306
                         lambda x: log.msg(_("/!\ copy failed (%s)\n"
307
                                             "data: %s")
308
                                           % (x, self.config['state_dir'])))
309

    
310
    def _check_md5(self):
311
        def to_bytes(objet):
312
            """Transforme les objets unicode contenus dans un objet en bytes
313
            """
314
            if isinstance(objet, tuple):
315
                l = []
316
                for item in objet:
317
                    l.append(to_bytes(item))
318
                return '({})'.format(', '.join(l))
319
            if isinstance(objet, list):
320
                l = []
321
                for item in objet:
322
                    l.append(to_bytes(item))
323
                return '[{}]'.format(', '.join(l))
324
            if isinstance(objet, dict):
325
                dico={}
326
                for cle in objet:
327
                    dico[to_bytes(cle)] = to_bytes(objet[cle])
328
                return '{{{}}}'.format(', '.join(['{}: {}'.format(el[0], el[1]) for el in dico.items()]))
329
            if isinstance(objet, unicode):
330
                print(type(objet), objet)
331
                string =  objet.encode(charset)
332
                return "'{}'".format(string)
333
            if isinstance(objet, int):
334
                return str(objet)
335
            if isinstance(objet, float):
336
                return str(objet)
337
            if objet == None:
338
                return 'None'
339
            return objet
340

    
341
        # calcul de sommes md5 pour config.eol et les patchs
342
        rep_src = "/usr/share/eole/creole"
343
        rep_conf = "/etc/eole"
344
        data = []
345
        try:
346
            for src, dst, pattern in get_md5files(cfg.distrib_version):
347
                if src == 'variables.eol':
348
                    # cas particulier : variables.eol, on génère le fichier à chaque fois
349
                    orig_eol = os.path.join(rep_conf, 'config.eol')
350
                    if os.path.isfile(orig_eol):
351
                        var_eol = os.path.join(rep_src, 'variables.eol')
352
                        # on crée un fichier avec variable:valeur ordonné par nom de variable
353
                        conf = cjson.decode(file(orig_eol).read(), all_unicode=True)
354
                        var_names = conf.keys()
355
                        var_names.sort()
356
                        f_var = file(var_eol, 'w')
357
                        with open(var_eol, 'w') as f_var:
358
                            for key, value in sorted(conf.items()):
359
                                if key not in ['mode_zephir', '___version___'] and isinstance(value, dict) and 'val' in value:
360
                                    if type(value['val']) in [list, dict, tuple]:
361
                                        converted_value = to_bytes(value['val'])
362
                                    else:
363
                                        converted_value = value['val']
364
                                    f_var.write('{}:{}\n'.format(key, converted_value))
365
                if os.path.isdir(os.path.join(rep_src,src)):
366
                    fics = os.listdir(os.path.join(rep_src,src))
367
                    fics = [(os.path.join(src,fic),os.path.join(dst,fic)) for fic in fics]
368
                else:
369
                    fics = [(src,dst)]
370
                for fic, fic_dst in fics:
371
                    if os.path.isfile(os.path.join(rep_src,fic)):
372
                        if (pattern is None) or fic.endswith(pattern):
373
                            md5res = md5file(os.path.join(rep_src,fic))
374
                            data.append("%s  %s\n" % (md5res, fic_dst))
375
        except:
376
            # on n'empêche pas de continuer les opérations si le calcul du md5 n'est pas bon
377
            log.msg('!! Erreur rencontrée lors du calcul du md5 de config.eol !!')
378
            traceback.print_exc()
379
        try:
380
            assert conf_zeph.id_serveur != 0
381
            outf = file(os.path.join(self.config['tmp_data_dir'],"config%s.md5" % str(conf_zeph.id_serveur)), "w")
382
        except:
383
            outf = file(os.path.join(self.config['tmp_data_dir'],"config%s.md5" % self.config['host_ref']), "w")
384
        outf.writelines(data)
385
        outf.close()
386

    
387
    def _get_packages(self, *args):
388
        """génère une liste des paquets installés
389
        """
390
        try:
391
            assert conf_zeph.id_serveur != 0
392
            cmd_pkg = ("/usr/bin/dpkg-query -W >" + os.path.join(self.config['tmp_data_dir'],"packages%s.list" % str(conf_zeph.id_serveur)))
393
        except:
394
            cmd_pkg = ("/usr/bin/dpkg-query -W >" + os.path.join(self.config['tmp_data_dir'],"packages%s.list" % self.config['host_ref']))
395
        os.system(cmd_pkg)
396

    
397
    def _make_archive(self,*args):
398
        self._check_md5()
399
        self._get_packages()
400
        # compression des données à envoyer
401
        try:
402
            assert conf_zeph.id_serveur != 0
403
            tarball = os.path.join(self.config['uucp_dir'],'site%s.tar' % str(conf_zeph.id_serveur))
404
        except:
405
            tarball = os.path.join(self.config['uucp_dir'],'site%s.tar' % self.config['host_ref'])
406
        tar_cwd = os.path.dirname(os.path.abspath(self.config['tmp_data_dir']))
407
        tar_dir = os.path.basename(os.path.abspath(self.config['tmp_data_dir']))
408
        res = utils.getProcessOutput('/bin/tar',
409
                                     args = ('czf', tarball,
410
                                             '--exclude', 'private',
411
                                             '-C', tar_cwd,
412
                                             tar_dir))
413
        res.addCallbacks(self._try_chown,
414
                         lambda x: log.msg(_("/!\ archiving failed (%s)\n"
415
                                             "data: %s\narchive: %s")
416
                                           % (str(x), self.config['state_dir'], tarball)),
417
                         callbackArgs = [tarball])
418

    
419
    def _try_chown(self, tar_output, tarball):
420
        try:
421
            uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
422
            uid = os.getuid()
423
            os.chown(tarball, uucp_uid, uucp_gid) # only change group id so that uucp can read while we can still write
424
        except OSError, e:
425
            log.msg("/!\ chown error, check authorizations (%s)" % e)
426
        # upload uucp
427
        # on fait également un chown sur le fichier deffered_logs au cas ou il serait en root
428
        try:
429
            uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
430
            os.chown('/usr/share/zephir/deffered_logs', uucp_uid, uucp_gid)
431
        except:
432
            log.msg("/!\ chown error on deffered_logs")
433
        os.system('/usr/share/zephir/scripts/zephir_client call &> /dev/null')
434

    
435

    
436
    # xmlrpc methods
437

    
438
    def xmlrpc_list_agents(self):
439
        """@return: Liste des agents chargés"""
440
        return self.agents.keys()
441
    xmlrpc_list_agents.signature = [['array']]
442

    
443
    def xmlrpc_agents_menu(self):
444
        """@return: Liste des agents chargés et structure d'affichage"""
445
        try:
446
            menu = {}
447
            for name, agent in self.agents.items():
448
                if agent.section != None:
449
                    if not menu.has_key(agent.section):
450
                        menu[agent.section] = []
451
                    menu[agent.section].append((name, agent.description))
452
            return menu
453
        except Exception, e:
454
            log.msg(e)
455
    xmlrpc_agents_menu.signature = [['struct']]
456

    
457
    def xmlrpc_status_for_agents(self, agent_name_list = []):
458
        """
459
        @return: Les statuts des agents listés dans un dictionnaire
460
        C{{nom:status}}. Le status est lui-même un dictionnaire avec
461
        pour clés C{'level'} et C{'message'}. Seuls les noms d'agents
462
        effectivement chargés apparaîtront parmi les clés du
463
        dictionnaire.
464
        """
465
        result = {}
466
        if len(agent_name_list) == 0:
467
            agent_name_list = self.agents.keys()
468
        for agent_name in agent_name_list:
469
            if self.agents.has_key(agent_name):
470
                result[agent_name] = self.agents[agent_name].check_status().to_dict()
471
        return result
472
    xmlrpc_status_for_agents.signature = [['string', 'struct']]
473

    
474
    def xmlrpc_reset_max_status_for_agents(self, agent_name_list=[]):
475
            if len(agent_name_list) == 0:
476
                agent_name_list = self.agents.keys()
477
            for agent_name in agent_name_list:
478
                if self.agents.has_key(agent_name):
479
                    self.agents[agent_name].reset_max_status()
480
            return "ok"
481

    
482
    def xmlrpc_archive_for_upload(self):
483
        self.wakeup_for_upload(False)
484
        return "ok"
485

    
486

    
487
class PublisherService(service.MultiService):
488
    """Serves the web interface for current agent data"""
489

    
490
    def __init__(self, config, parent, root_resource,
491
                 live_agents=None,
492
                 show_clients_page=True):
493
        """config should be complete"""
494
        service.MultiService.__init__(self)
495
        self.config = config
496
        self.show_clients_page = show_clients_page
497
        self.manager = ClientManager(self.config, live_agents)
498
        # attach to parent service
499
        self.setServiceParent(service.IServiceCollection(parent))
500
        # run webserver
501
        rsrc = ZephirServerResource(self.config, self.manager)
502
        root_resource.putChild('agents', rsrc)
503
        default_page = './agents/'
504
        if not self.show_clients_page:
505
            default_page += self.config['host_ref'] + '/'
506
        root_resource.putChild('', util.Redirect(default_page))
507

    
508
#TODO
509
# update resources: loading host structures, manager -> agent dict
510
# connect publisher and updater to zephir service (web server, config...)
511

    
512
# client manager: liste des host_ref, {host_ref => agent_manager}
513
# agent manager: structure, {nom => agent_data}