zabbix触发器禁用后未启用审计

zabbix触发器禁用后未启用审计

场景

目前的例行发布会在发布前自动屏蔽触发器,发布后再恢复。但有时发布过程中某一步失败,人工介入处理之后整个发布流程没有继续走完,就会导致触发器未恢复。

代码

代码分两块:一个是main.py,另一个是GitHub上的开源脚本zhtrigfinder.py(来自仓库q1x/zabbix-gnomes,感谢原作者)。该仓库年久失修,为了防止失联,本人fork了一份,也在下面把源码直接贴了出来。请把两个脚本放到同一级目录。

整个项目依赖python2,主要是这个七八年前的上古开源脚本zhtrigfinder.py也依赖python2。

脚本依赖安装如下:

python2 -m pip install pyzabbix==1.0.0 requests==2.26.0

这两个脚本共用一个配置文件,格式如下:

[Zabbix API]
     username=johndoe
     password=verysecretpassword
     api=https://zabbix.mycompany.com/path/to/zabbix/frontend/
     no_verify=true

所以主代码main.py的运行方式如下:

python main.py -c zbx_prod.conf

另外,主代码中你只需要修改企业微信的群组chat_id和机器人公网请求地址,再把operation_dict这个字典中的zabbix agent的ip和触发器关键字改成自己的就行。

main.py代码如下:

# -*- encoding: utf-8 -*-
import subprocess
import argparse
import ConfigParser
import os
import os.path
import sys
import distutils.util
import requests
import json
from pyzabbix import ZabbixAPI
import logging
import time


def logger_getter():
    """Return the root logger, attaching a dated file handler on first use.

    The handler is only added while the root logger has no handlers, so
    calling this repeatedly does not duplicate log lines. When it is added,
    the root logger level is set to INFO and records are written to
    ``/data/logs/zabbix_trigger_disabled_notify/debug.log.<YYYY-MM-DD>``,
    using the local date at the time of the call.
    """
    root = logging.getLogger()
    if root.handlers:
        # Already configured; hand back the logger untouched.
        return root

    # strftime() without a time tuple formats the current local time.
    log_path = ('/data/logs/zabbix_trigger_disabled_notify/debug.log.'
                + time.strftime("%Y-%m-%d"))
    record_format = logging.Formatter(
        "%(asctime)s ||| %(levelname)s ||| %(lineno)d ||| %(funcName)s ||| %(message)s",
        datefmt='%Y-%m-%d %H:%M:%S')

    root.setLevel(logging.INFO)
    handler = logging.FileHandler(log_path)
    handler.setLevel(logging.INFO)
    handler.setFormatter(record_format)
    root.addHandler(handler)
    return root

def check_kw(keywords, host_ip):
    """Return the numeric ids of the triggers on a host that match a keyword.

    Runs ``zhtrigfinder.py`` in numeric mode (``-n``) with the configuration
    file ``zbx_prod.conf``; both are looked up relative to the current
    working directory.

    Args:
        keywords: Search string passed to ``zhtrigfinder.py -s``.
        host_ip: Host passed to ``zhtrigfinder.py`` as its positional
            ``hostname`` argument.

    Returns:
        The helper's standard output with surrounding whitespace stripped,
        or ``''`` when the helper exits with a non-zero status or cannot be
        started.
    """
    # An argument list (no shell) hands the keyword and host to the helper
    # verbatim, so quotes, spaces or shell metacharacters in them can neither
    # break nor alter the command.
    cmd = ['python', 'zhtrigfinder.py', '-c', 'zbx_prod.conf',
           '-s', keywords, '-n', host_ip]
    try:
        trigger_id_list = subprocess.check_output(cmd)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: the helper exited with a non-zero status.
        # OSError: the interpreter could not be launched; when the command
        # went through a shell this also surfaced as a non-zero exit.
        return ''
    return trigger_id_list.strip()

def send_msg_2_wework(trigger_notify_list):
    """Post the inspection result to a WeCom (WeChat Work) group chat.

    Args:
        trigger_notify_list: Sequence of ``[description, ip]`` pairs for the
            triggers that are still disabled. An empty sequence sends the
            "everything is normal" message instead of the warning.

    The message is sent as a ``markdown`` payload to the chat identified by
    ``chat_id``. The HTTP response is not inspected.
    """
    if trigger_notify_list:
        content = """# **zabbix触发器巡检通知**
<font color='warning'>**巡检状态:**</font>异常
<font color='warning'>**巡检详情:**</font>以下触发器未恢复,请检查核实!
{0}
<@xxxx> <@xxxx>
""".format('\n'.join('IP: ' + t[1] + ' 触发器名称:' + t[0] for t in trigger_notify_list))
    else:
        content = """# **zabbix触发器巡检通知**
<font color='info'>**巡检状态:**</font>正常
<font color='info'>**巡检详情:**</font>容器触发器状态一切正常
<@xxxx> <@xxx>
"""

    # prod
    chat_id = 'xxxxx'
    payload = json.dumps({"chatid": chat_id,
                          "msgtype": "markdown",
                          "markdown": {"content": content}})
    headers = {"Content-Type": "application/json"}

    # NOTE: the URL is a placeholder to be replaced with the real robot
    # endpoint; it must include the scheme (e.g. https://), otherwise
    # requests rejects it before sending anything.
    # The timeout keeps a stalled endpoint from hanging the job forever.
    requests.post('weixin.qq.com/cgi-bin/xxxxxx',
                  data=payload, headers=headers, timeout=10)

def trigger_id_to_txt(zapi, t_id):
    """Look up one trigger by id and return its description as UTF-8 bytes.

    Args:
        zapi: Object exposing ``trigger.get`` (the logged-in Zabbix API
            client in this script).
        t_id: Trigger id used as the ``triggerid`` filter.

    Returns:
        The ``description`` field of the first trigger returned by the API,
        encoded as UTF-8. The query passes ``expandDescription=1``.
    """
    matches = zapi.trigger.get(
        filter={'triggerid': t_id},
        output=['description', 'status'],
        expandDescription=1,
    )
    first = matches[0]
    description = first['description']
    return description.encode('utf-8')

def get_trigger_list(operation_dict):
    """Collect the triggers that are not enabled on the configured hosts.

    Args:
        operation_dict: Maps a host IP, or several IPs joined by commas, to
            the list of keywords to search for in the trigger descriptions
            of those hosts.

    Returns:
        A list of ``[description, ip]`` pairs, one per matched trigger whose
        ``status`` is anything other than ``'0'``. ``description`` is the
        trigger description (queried with ``expandDescription=1``) encoded
        as UTF-8.

    Uses the module-level ``zapi`` connection and the ``check_kw`` helper.
    """
    trigger_desc_list = []
    for ip_group, keywords_list in operation_dict.items():
        for ip in ip_group.split(','):
            # A host may have several keywords; each lookup yields a
            # newline-separated block of trigger ids.
            trigger_ids = []
            for kw in keywords_list:
                id_block = check_kw(kw, ip)
                # check_kw returns '' when the lookup fails; dropping blank
                # lines keeps an empty id from being sent to the API.
                trigger_ids.extend(
                    line.strip() for line in id_block.splitlines() if line.strip())

            for id_ in trigger_ids:
                check = zapi.trigger.get(filter={'triggerid': id_},
                                         output=['description', 'status'],
                                         expandDescription=1)
                if not check:
                    # Nothing came back for this id; there is no status to
                    # judge, so skip it instead of failing on check[0].
                    continue
                trigger = check[0]
                # Only status '0' is treated as healthy; '1' and any
                # unexpected value are reported.
                if trigger['status'] != '0':
                    # The description is already in this response, so no
                    # second API round trip is needed to fetch it.
                    trigger_desc_list.append(
                        [trigger['description'].encode('utf-8'), ip])
    return trigger_desc_list


if __name__ == '__main__':
    # Script entry point: read the Zabbix API settings from an .ini file and
    # the command line, log in, look for triggers that are still disabled on
    # the configured hosts and report the result to the group chat.

    # define config helper function
    def ConfigSectionMap(section):
        """Return all options of ``section`` from the loaded config as a dict.

        An option whose value cannot be read is reported on stdout and
        mapped to ``None``.
        """
        values = {}
        for option in Config.options(section):
            try:
                values[option] = Config.get(section, option)
            except Exception:
                print("exception on %s!" % option)
                values[option] = None
        return values

    # set default vars
    # expanduser() still yields a path when HOME is unset, whereas
    # os.getenv("HOME") + "..." would raise a TypeError.
    defconf = os.path.join(os.path.expanduser("~"), ".zbx.conf")
    username = ""
    password = ""
    api = ""
    noverify = ""

    # Define commandline arguments
    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,description='Checks the configured hosts for Zabbix triggers that are still disabled and reports the result to a WeCom group chat.', epilog="""
    This program can use .ini style configuration files to retrieve the needed API connection information.
    To use this type of storage, create a conf file (the default is $HOME/.zbx.conf) that contains at least the [Zabbix API] section and any of the other parameters:
           
     [Zabbix API]
     username=johndoe
     password=verysecretpassword
     api=https://zabbix.mycompany.com/path/to/zabbix/frontend/
     no_verify=true
    
    """)

    parser.add_argument('-u', '--username', help='User for the Zabbix api')
    parser.add_argument('-p', '--password', help='Password for the Zabbix api user')
    parser.add_argument('-a', '--api', help='Zabbix API URL')
    parser.add_argument('--no-verify', help='Disables certificate validation when using a secure connection', action='store_true')
    parser.add_argument('-c', '--config', help='Config file location (defaults to $HOME/.zbx.conf)')
    args = parser.parse_args()

    # load config module
    Config = ConfigParser.ConfigParser()

    # Use the config file given on the command line, otherwise the default
    # one; an unreadable or missing file is silently skipped.
    conf_path = args.config if args.config else defconf
    if os.path.isfile(conf_path) and os.access(conf_path, os.R_OK):
        Config.read(conf_path)

    # try to load available settings from config file
    try:
        settings = ConfigSectionMap("Zabbix API")
        username = settings['username']
        password = settings['password']
        api = settings['api']
        noverify = bool(distutils.util.strtobool(settings["no_verify"]))
    except (ConfigParser.Error, KeyError, ValueError, AttributeError):
        # Best effort: a missing section/option or an invalid no_verify
        # value is acceptable here, because the command-line overrides and
        # the required-parameter checks below still apply.
        pass

    # override settings if they are provided as arguments
    if args.username:
        username = args.username

    if args.password:
        password = args.password

    if args.api:
        api = args.api

    if args.no_verify:
        noverify = args.no_verify

    # test for needed params
    if not username:
        sys.exit("Error: API User not set")

    if not password:
        sys.exit("Error: API Password not set")

    if not api:
        sys.exit("Error: API URL is not set")

    # Setup Zabbix API connection
    zapi = ZabbixAPI(api)

    if noverify is True:
        zapi.session.verify = False

    # Login to the Zabbix API
    zapi.login(username, password)
    # prod
    # Each key is a host IP (or several IPs joined by commas) and each value
    # is the list of trigger keywords for it. A key may appear only once in
    # a dict literal -- a repeated key silently replaces the earlier entry --
    # so all keywords of one host go into a single list.
    operation_dict = {'127.0.0.1': ['网站', '接口']}
    docker_trigger_list = get_trigger_list(operation_dict)
    send_msg_2_wework(docker_trigger_list)

开源脚本zhtrigfinder.py如下,再次感谢:

#!/usr/bin/env python
#
# import needed modules.
# pyzabbix is needed, see https://github.com/lukecyca/pyzabbix
#
import argparse
import ConfigParser
import os
import os.path
import sys
import distutils.util
from pyzabbix import ZabbixAPI

# define config helper function
def ConfigSectionMap(section):
    """Return all options of ``section`` from the global ``Config`` as a dict.

    Args:
        section: Name of the section to read.

    Returns:
        A dict mapping each option name of the section to its value. An
        option whose value cannot be read is reported on stdout and mapped
        to ``None``.
    """
    dict1 = {}
    for option in Config.options(section):
        try:
            dict1[option] = Config.get(section, option)
        except Exception:
            print("exception on %s!" % option)
            dict1[option] = None
    return dict1


# Read the Zabbix API settings from an .ini file and the command line, then
# open and authenticate the API connection used by the lookup code below.

# set default vars
# expanduser() still yields a path when HOME is unset, whereas
# os.getenv("HOME") + "..." would raise a TypeError.
defconf = os.path.join(os.path.expanduser("~"), ".zbx.conf")
username = ""
password = ""
api = ""
noverify = ""

# Define commandline arguments
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,description='Tries to find triggers configured for the specified Zabbix host.', epilog="""
This program can use .ini style configuration files to retrieve the needed API connection information.
To use this type of storage, create a conf file (the default is $HOME/.zbx.conf) that contains at least the [Zabbix API] section and any of the other parameters:
       
 [Zabbix API]
 username=johndoe
 password=verysecretpassword
 api=https://zabbix.mycompany.com/path/to/zabbix/frontend/
 no_verify=true

""")
# Output format options (-n / -e) exclude each other, as do the two
# trigger selection options (-s / -A).
group = parser.add_mutually_exclusive_group(required=False)
group2 = parser.add_mutually_exclusive_group(required=False)
parser.add_argument('hostname', help='Hostname to find the configured triggers on')
parser.add_argument('-u', '--username', help='User for the Zabbix api')
parser.add_argument('-p', '--password', help='Password for the Zabbix api user')
parser.add_argument('-a', '--api', help='Zabbix API URL')
parser.add_argument('--no-verify', help='Disables certificate validation when using a secure connection', action='store_true')
parser.add_argument('-c', '--config', help='Config file location (defaults to $HOME/.zbx.conf)')
group.add_argument('-n', '--numeric', help='Return numeric triggerids instead of descriptions', action='store_true')
group.add_argument('-e', '--extended', help='Returns trigger id, value, status, state, severity, description and expression separated by ":". See https://www.zabbix.com/documentation/2.2/manual/api/reference/trigger/object for more information.', action='store_true')
group2.add_argument('-s', '--search', help='Show only triggers with a description containing this search string')
group2.add_argument('-A', '--active', help='Show only active triggers', action='store_true')
args = parser.parse_args()

# load config module
Config = ConfigParser.ConfigParser()

# Use the config file given on the command line, otherwise the default one;
# an unreadable or missing file is silently skipped.
conf_path = args.config if args.config else defconf
if os.path.isfile(conf_path) and os.access(conf_path, os.R_OK):
    Config.read(conf_path)

# try to load available settings from config file
try:
    settings = ConfigSectionMap("Zabbix API")
    username = settings['username']
    password = settings['password']
    api = settings['api']
    noverify = bool(distutils.util.strtobool(settings["no_verify"]))
except (ConfigParser.Error, KeyError, ValueError, AttributeError):
    # Best effort: a missing section/option or an invalid no_verify value is
    # acceptable here, because the command-line overrides and the
    # required-parameter checks below still apply.
    pass

# override settings if they are provided as arguments
if args.username:
    username = args.username

if args.password:
    password = args.password

if args.api:
    api = args.api

if args.no_verify:
    noverify = args.no_verify

# test for needed params
if not username:
    sys.exit("Error: API User not set")

if not password:
    sys.exit("Error: API Password not set")

if not api:
    sys.exit("Error: API URL is not set")

# Setup Zabbix API connection
zapi = ZabbixAPI(api)

if noverify is True:
    zapi.session.verify = False

# Login to the Zabbix API
zapi.login(username, password)

##################################
# Start actual API logic
##################################

# Look up the triggers of the requested host and print one line per trigger.
# Exits with an error message when the host or the triggers cannot be found.

# set the hostname we are looking for
host_name = args.hostname

hosts = zapi.host.get(output="extend", filter={"host": host_name})

if not hosts:
    sys.exit("Error: Could not find host " + host_name)

# Find triggers: start from the query shared by all modes and add the
# restrictions of the selected mode (-s and -A are mutually exclusive).
query = {
    'filter': {'host': host_name},
    'output': 'extend',
    'expandExpression': 1,
    'expandDescription': 1,
}
if args.search:
    query['search'] = {'description': args.search}
elif args.active:
    query['filter']['value'] = 1
    query['monitored'] = 1
    query['active'] = 1
triggers = zapi.trigger.get(**query)

if not triggers:
    sys.exit("Error: No matching triggers found on " + host_name)

if args.extended:
    # print ids and descriptions, fields joined by ":"
    extended_fields = ("triggerid", "value", "status", "state",
                       "priority", "description", "expression")
    for trigger in triggers:
        print(":".join(format(trigger[field]) for field in extended_fields))
elif args.numeric:
    # print ids
    for trigger in triggers:
        print(format(trigger["triggerid"]))
else:
    # print descriptions
    for trigger in triggers:
        print(format(trigger["description"]))

# And we're done...

 

0 评论 在 “zabbix触发器禁用后未启用审计”

发表评论

电子邮件地址不会被公开。 必填项已用*标注

captcha