通过调用HTTP接口实现短信监控
更多

我公司在某项目组维护的OGG(Oracle Goldengate,一款数据同步软件),由于局方没有成熟的监控平台,一直没有短信告警,导致我方维护人员每天都需要频繁登录每个配置过OGG的主机进行巡检(目前已达20多台),工作重复量较大,但一直没有很好的解决办法。虽然可通过写shell脚本,并配置crontab定时任务的方式生成巡检结果,但还是需要人为地查看。前不久,局方给了一个HTTP接口,说是通过调用这个接口便可实现短信监控,但是没给调用该接口的具体实现代码。 正好那段时间我所在的项目组离那边比较近,接到同事的反馈后,就想如何去实现这个监控的需求。

 

虽然用shell脚本也可以实现http接口的调用(使用curl命令、GET/POST的方式),但是比较复杂,而且后期不好维护。既然已经放弃了使用shell脚本实现(主要还是自己懒。。。),那么就得用一种编程语言来做了。既要比shell编写起来简单直观,还要便于后期维护,最好还能有时间、相关运行日志的记录,那么好像非Python莫属了,哈哈!话不多说,下面就大概说一下实现的具体过程。

 

HTTP接口的调用方式

 

HTTP接口的调用格式如下:

 

http://xxx.xxx.0.2/sms/sendsms.php?Action=MSG_ALERT&DestTermID=手机号码&MsgContent=告警内容

 

HTTP接口的好处就是,测试的时候可以在浏览器内直接调用。我们先拼凑一条告警信息,并直接在浏览器中输入:

 

http://xxx.xxx.0.2/sms/sendsms.php?Action=MSG_ALERT&DestTermID=180xxxx3808&MsgContent=OGG_ALERT!

 

很快就能收到返回的告警信息,说明请求的格式没问题。

 

 

初步监控目标

 

OGG进程状态共有RUNNING, STOPPED, ABENDED三种,其中RUNNING为正常运行状态,STOPPED一般是人为停止的状态,ABENDED是由于数据同步等原因导致的异常中断状态。我们初步的监控目标就是能够发现异常ABENDED的进程,并定时发送告警短信给相关维护人员(即监控图中的Status列)。

 

对于监控频率的设置,通过询问局方人员得知,目前需要监控的系统有本地和异地容灾两部分系统。异地容灾系统由于有集团考核,及时性要求较高,出现问题必须马上处理,所以需要配置成每5分钟检查一次;而本地系统配置成每小时检查一次即可。

 

收集告警信息

 

根据上面的接口调用方式,那么我们现在要做的可以分以下几步:

 

1) 收集OGG的运行状态,并汇总到一起;

 

2) 过滤出包含ABENDED的进程及对应的业务系统名称;

 

3) 发送包含该业务系统名称的告警短信给相关人员(注:短信接收人员可能有多个)。

 

其中的第一条,我们的同事已经通过shell脚本实现好了(还加入了对目录使用率的告警),并统一ftp到了一台Windows监控主机的目录下,帮了我的一个大忙。脚本内容如下:

 

oggcheck.sh (主要收集OGG进程状态、目录使用率情况及超过阈值后的关键字over_load)

 

 
#!/usr/bin/sh
# 设置OGG目录使用率阈值
OGG_DIR_THRESHOLD=60
 
# 获取并输出操作系统类型
OS_TYPE=`uname -a| awk '{print $1}'`
echo "OS TYPE: $OS_TYPE"
 
## 获取OGG安装目录
# 使用查找mgr进程的命令查找OGG安装目录
VAR1=`ps -ef|grep mgr.prm | grep -v grep`
# 从左边开始,删除PARAMFILE左边所有的字符
VAR2=${VAR1#*PARAMFILE}
# 从右边开始,删除dirprm右边所有的字符
OGG_HOME=${VAR2%dirprm*}
 
# 获取OGG运行状态
cd $OGG_HOME
echo "info all" | ./ggsci
 
# 获取OGG目录使用率(当前维护系统主要有LINUX, AIX和HP-UX三类)
if [ $OS_TYPE  = 'Linux' ];then
    VAR3=`df -h $OGG_HOME | sed -n '$p'|cut -d '%' -f 1|awk '{print $NF}'`
elif [ $OS_TYPE = 'AIX' ];then
    VAR3=`df -g $OGG_HOME | sed -n '$p'|cut -d '%' -f 1|awk '{print $NF}'`
elif [ $OS_TYPE = 'HP-UX' ];then
    VAR3=`bdf $OGG_HOME | sed -n '$p'|cut -d '%' -f 1|awk '{print $NF}'`
else
    OGG_DIR_USAGE=100    # 如果操作系统无法识别,则输出100
    echo "`hostname`'s OS TYPE IS UNKNOWN!"
fi
OGG_DIR_USAGE=${VAR3#* }
 
# 输出OGG目录的实际使用率及关键字over_load(关键字用于给后面程序判断是否发目录告警时用)
if [ "$OGG_DIR_USAGE" -ge "$OGG_DIR_THRESHOLD" ];then
    echo -e "\n`hostname`'s OGG directory usage is: $OGG_DIR_USAGE%, over_load"
fi

 

put_alert_file.sh(上传监控文件)

 

 
#!/usr/bin/sh
# 获取IP地址
IP_ADD=$(awk '{print $1,$2}' /etc/hosts|grep "$(hostname)""$"|awk '{print $1}')
# 设置监控文件绝对路径
FILE_NAME=/home/oracle/ogg_mon/"$IP_ADD"_`hostname`.txt
# 设置监控文件名称
FILE_NAME_SHORT="$IP_ADD"_`hostname`.txt
# 将oggcheck.sh的执行结果输出到$FILE_NAME监控文件中
/bin/sh oggcheck.sh>$FILE_NAME 2>&1
# 设置目录名称
dirdate=`date "+%Y%m%d"`
# 使用ftp创建当日目录,并上传监控文件
ftp -n <<!
open xxx.2.0.11 50191
user admin admin
pwd
binary
cd ogg
# 按当天日期创建目录
mkdir ${dirdate}
cd ${dirdate}
lcd /home/oracle/ogg_mon
prompt
put ${FILE_NAME_SHORT}
bye
!
rm -f /home/oracle/ogg_mon/*.txt
echo "finish"

 

创建配置文件

 

第二步就主要是对告警信息的处理了。在处理之前,我们需要先定义一个配置文件,里面存放例如需要监控的系统信息、告警接收人的手机号码、告警内容等。如果以后这些内容有变动,直接在配置文件中修改即可,便于维护。

 

代码如下:

 

settings.py

 

 
class Settings():
    def __init__(self):
        """ 所有告警系统的配置(包括异地灾备系统)
        配置方法:采用键值对的方式,键为具体的告警文件名称(带扩展名),值为对应的系统名称(可自定义).
                 告警内容将使用自定义系统名称,建议将IP也一并配置进系统名称。
                 例如:"xxx.0.0.1_xxxdb1.txt":"xxx.0.0.1_xxxdb1" 表示告警文件为 xxx.0.0.1_xxxdb1.txt,
                 自定义的系统名称为 xxx.0.0.1_xxxdb1 ,如果有告警,该系统名称会显示在告警短信中。
        注:修改配置信息后需重启程序方可生效。
        重启方式:任务管理器中kill pyw.exe进程,再双击程序目录中的alert.pyw文件即可。可查看本地日志确认是否启动进程。 """
        self.all_systems = {"xxx.0.2.1_erpdb1.txt":"xxx.0.0.1_erpdb1","xxx.0.2.32_pmpdb1.txt":"xxx.0.2.32_pmpdb1",
                           "xxx.1.20.131_xnyx.txt": "xxx.1.20.131_xnyx", "xxx.1.18.79_ods.txt": "xxx.1.18.79_remote"
                           }
 
        # 异地灾备告警系统配置(灾备系统告警阈值为5分钟,需另单独配置一份,只配置系统名称即可,必须跟上面配置的系统名称一致)
        self.remote_systems = ['xnyx', 'xxx.1.18.79_remote']
 
        # 配置接收告警的手机号码
        self.acc_nbr = [181xxxx9607, 152xxxx8230, 152xxxx3315, 133xxxx8640, 177xxxx213, 136xxxx6297]
        # 设置本地日志路径
        self.logfile_name = 'D:\\ogg_alert\\log\\info.log'
        # 设置告警文件路径
        self.alert_file_path = 'D:\\系统软件\\oggxj\\'
        # 设置告警相关的字符串,用于拼接告警信息
        self.url1 = "http://xxx.xxx.0.2/sms/sendsms.php?Action=MSG_ALERT&DestTermID="
        self.url2 = "&MsgContent="
        self.abend_warning = "_Goldengate:ABENDED"
        # 为方便起见,这里只提示目录使用率大于60%。实际值可以从监控文件中提取
        self.dict_warning = "_Goldengate_directory_usage_>_60%!"

 

按照时间生成日志路径

 

按照上面shell脚本的内容,每天都会生成一个YYYYMMDD格式的文件夹,然后再将当日的监控文件放入该文件夹中(会覆盖上次的文件)。所以我们在程序中也要按照日期动态生成路径:

 

functions.py

 

 
import time
import os
 
 
def time_path(sys_settings):
    # 生成固定格式的日期字符串并拼接
    time_format = time.strftime('%Y%m%d', time.localtime())
    alert_path = sys_settings.alert_file_path + time_format
    # 改变当前工作目录至alert_path
    os.chdir(alert_path)

 

按照要求,监控每隔5分钟就会扫描一次,所以在每次循环中都调用此函数即可。

 

处理告警信息

 

我们采用的方法是遍历每个告警文件,并处理成列表,如果存在ABENDED或者over_load(OGG目录告警关键字),就使用相关信息生成告警内容,并记录日志。

 

functions.py

 

 
def get_alert_list(sys_settings):
    info_list = []
    try:
        # 遍历当前目录下的文件
        for filename in os.listdir():
            with open(filename) as file:
                lines = file.read()
                # 使用空格分割成列表
                line_list = lines.split()
                if 'ABENDED' in line_list:
                    # 使用settings.py中配置的系统名称及告警信息进行拼接
                    info = sys_settings.all_systems[filename] + sys_settings.abend_warning
                    info_list.append(info)
                    # 将处理好的告警内容写入日志
                    log_write("WARNING: " + info + "\n", sys_settings)
                if 'over_load' in line_list:
                    info = sys_settings.all_systems[filename] + sys_settings.dict_warning
                    info_list.append(info)
                    log_write("WARNING: " + info + "\n", sys_settings)
    except PermissionError as e:
        log_write("Alert file not found!\n")
    """ 返回告警内容 """
    return info_list
 
 
def log_write(log_info, settings):
    # 以固定格式写入日志内容
    with open(settings.logfile_name, 'a') as logfile:
        logfile.write("[" + time.strftime('%Y/%m/%d %H:%M:%S', time.localtime()) + "] " + log_info)

 

发送告警

 

收集并记录好告警信息后,我们就可以使用这些信息给指定人员发送短信了。能够成功发送请求的关键是对指定的url调用request.get()方法,从而获得相应的返回码(一般的接口在处理完成后都会给请求方返回一个字符或数字作为返回码,可以进一步写一个处理返回码的函数,记录或者发送返回码对应的信息,这里为了简便就省略了)。

 

因为我们要把每条告警信息都发送给所有的告警接收人员,所以在遍历告警信息的同时,也需要遍历电话号码:

 

functions.py

 

 
import requests
from urllib.error import URLError
 
 
def send_info(info_list, sys_settings):
    for info in info_list:
        for phone_num in sys_settings.acc_nbr:
            # 按照http接口请求格式拼接字符串
            url = sys_settings.url1 + str(phone_num) + sys_settings.url2 + info
            try:
                # get_code 为对应的返回码,这里为了简便不做处理
                get_code = requests.get(url)
                # 在日志中记录将告警发送给哪个电话号码
                log_write(info + " send to: " + str(phone_num) + "\n", sys_settings)
            except URLError as e:
                # 如果调用失败,则记录连接错误的日志
                log_write("connect to the server failed!\n", sys_settings)

 

针对不同的业务系统发送告警

 

有了上面的send_info()函数后,我们只要传入包含告警信息的列表info_list及配置信息sys_settings,就可以向指定人员发送告警了。但是在正式发送前,针对本地系统和异地容灾系统告警频率的差异,需要分别写两个函数分别进行处理:

 

functions.py

 

 
def send_remote_only(alert_list, sys_settings):
    # 只发送异地容灾系统的告警
    if len(alert_list) == 0:
        log_write("There is no alert, process finished!\n\t"
                  "Remote process will restart after 5 min ... ...\n", sys_settings)
    else:
        for remote_system in sys_settings.remote_systems:
            abend_warning = remote_system + sys_settings.abend_warning
            dict_warning = remote_system + sys_settings.dict_warning
            if abend_warning in alert_list:
                send_info([abend_warning], sys_settings)
                log_write("Remote process will restart after 5 min ... ...\n", sys_settings)
            if dict_warning in alert_list:
                send_info([dict_warning], sys_settings)
 
 
def send_all(alert_list, sys_settings):
    # 发送所有系统的告警
    if len(alert_list) == 0:
        log_write("There is no alert, process finished!\n\t"
                  "Local process will restart after 60 min ... ...\n", sys_settings)
    else:
        send_info(alert_list, sys_settings)
        log_write("Local process will restart after 60 min ... ...\n", sys_settings)

 

使用主程序文件

 

目前,我们已经基本上完成了所有的功能,只差一个不断调用这些函数的循环结构以及控制定时发送的策略了。循环结构使用while循环即可,对时间的控制可以使用time.sleep()方法。所以我们需要一个主程序文件来统一控制这些动作。

 

alert.pyw

 

 
from settings import Settings
from timepath import TimePath, time
from functions import send_all, send_remote_only, log_write, get_alert_list, time_path
 
 
# 初始化配置信息
sys_settings = Settings()
# 初始化用于计数的变量n
n = 0
 
# 主循环
while True:
    try:
        # 调用time_path生成日志路径
        time_path(sys_settings)
    except FileNotFoundError as e:
        # 如果发生FileNotFoundError,说明当日目录未生成,记录到日志中
        log_write("Alert Directory not exists!\n", sys_settings)
 
    alert_list = get_alert_list(sys_settings)
 
    """ 如果n == 12,就调用send_all()函数,检查所有系统
    否则就调用send_remote_only()函数,只检查异地容灾系统 """
    n += 1
    if n == 12:
        send_all(alert_list, sys_settings)
        n = 0
        time.sleep(300)
    else:
        send_remote_only(alert_list, sys_settings)
        # 使用time.sleep()方法,每5分钟进行一次循环
        time.sleep(300)

 

注意到,我们的主程序文件alert.pyw的扩展名是.pyw,这是因为在Windows环境下,.pyw文件可以通过直接双击在后台运行,不用再打开编辑器执行了。

 

功能测试

 

程序运行一段时间后查看记录的日志:

 

正常运行时的日志:

 

 

目录使用率超过设定的60%时的日志:

 

本地系统(到达一小时后统一发送告警):

 

异地容灾系统(5分钟扫描并发送告警):

 

OGG进程异常ABENDED时的日志:

 

本地系统:

 

异地容灾系统:

 

可以看到,所有功能都测试正常。

 

监控延迟

 

之前实现的是最基本的对进程状态的监控。在有些对数据实时性要求较高的系统中,只监控这些还不够,还需要对延迟时间进行监控。

 

图中标出的Lag at Chkpt列即在事务能够正常提交情况下的延迟总时间,如果有延迟,说明这个进程里同步的表或数据过多,需要做拆分;Time Since Chkpt列是指距离上次事务提交的时间,如果该列出现延迟,说明有大事务,或者有索引缺失导致DML操作执行较慢等情况,也需要关注。所以在监控延迟时,只需关注这两列即可。

 

假设我们的目标是当延迟大于5分钟时输出告警信息,则需要提取这两列表示分钟的项并做判断。这要比上面的只对Status列做判断要稍微复杂一些,除了同时取多个列之外(进程名称、两个延迟列),还涉及分割、数据类型转换等操作。所以我们计划使用numpy数组存放多个列的值,方便后续在时间上的判断(numpy数组可以进行矩阵运算,比循环的效率要高)。写日志也换一种方式,使用csv.reader()输出延迟信息。

 

为了便于阅读,我们将延迟信息单独生成一个文件进行判断。

 

oggcheck_new.sh

 

 
## 获取OGG安装目录
# 使用查找mgr进程的命令查找OGG安装目录
VAR1=`ps -ef|grep mgr.prm | grep -v grep`
# 从左边开始,删除PARAMFILE左边所有的字符
VAR2=${VAR1#*PARAMFILE}
# 从右边开始,删除dirprm右边所有的字符
OGG_HOME=${VAR2%dirprm*}
 
# 获取OGG运行状态
cd $OGG_HOME
# 每次覆盖写当时的监控信息
date > /acfs/dump/ogginfo.tmp
echo "info all" | /ogg/ggsci |grep -E "MANAGER|EXTRACT|REPLICAT" >> /acfs/dump/ogginfo.tmp
# 将监控信息同时写入备份监控文件
date >> /oradump/ogg_tmp/ogginfo.bak
echo "info all" | /ogg/ggsci |grep -E "MANAGER|EXTRACT|REPLICAT" >> /acfs/dump/ogginfo.bak

 

使用crontab将整个脚本设置成5分钟执行一次:

 

*/5 * * * */home/ogg/chk_ogg_status.sh >> /home/ogg/chk_oggstatus.log 2>&1

 

我们先写主程序文件,取进程名称和两个延迟列的数据,并将其放到numpy数组中。

 

ogg_lag_mon.py

 

 
import sys 
sys.path.append('/ogg/dirsql/py_mon')    # 进入延迟监控脚本所在目录 
import csv 
import numpy as np 
  
  
alert_file = '/acfs/dump/ogginfo.tmp'           # 设置监控文件路径 
alert_his_file = '/acfs/dump/ogginfo.bak'       # 设置备份文件路径 
lag_min = 5                                     # 设置延迟时间 
error_log = '/ogg/dirsql/py_mon/lag.log'        # 设置本地错误日志路径 
  
lines = list(csv.reader(open(alert_file)))    # 将监控文件内容转换成列表 
values = lines[2:]        # 取实际的进程信息(前两个元素为时间和MANAGER进程的状态) 
group_name = []           # 进程名称(GROUP) 
lag_time = []             # 延迟时间(Lag at Chkpt) 
ckpt_time = []            # 事务提交延迟时间(Time Since Chkpt) 
  
for value in values: 
    group_name.append(value[0].split()[2]) 
    lag_time.append(value[0].split(':')[1]) 
    ckpt_time.append(value[0].split(':')[-2]) 
result_arr = np.array((group_name, lag_time, ckpt_time)) 

 

注:将监控文件内容转成lines列表时,我们按惯例就应该查看一下lines列表里到底装了些什么(为保持代码的连贯性,在这里给出分析步骤):

 

可以看到,这是一个列表中嵌套列表的结构,而且前两项不是我们要分析的数据(本例中暂不考虑MANAGER进程的状态),我们需要的元素的格式为:

 

           [‘EXTRACT     RUNNING    EXT01    00:00:00      00:00:01    ‘]

 

这个列表元素只有一项,取进程名称EXT01时可以按照空格分割,取后两列时间的分钟数值时可直接按照分号分割,分别添加到指定的列表中。最后将三个列表作为元素转换成numpy数组result_arr。

 

查看result_arr的内容:

 

 
>>> result_arr
array([['EXT01', 'EXT02', 'EXT03'],
       ['00', '08', '07'],
       ['00', '00', '00']],
      dtype='|S8')

 

这实际上是一个3*3的数组,第0行为进程名称,第1行为延迟时间,第2行为事务提交延迟时间。可以看到,正好已经有超过lag_min的进程了(第1行超过5的元素值)。我们先定义一个记录日志的函数alert_writer(),然后再定义一个写延迟信息的函数write_lag(),write_lag()函数可以调用alert_writer()函数记录延迟信息:

 

alert_writer.py

 

 
import csv
 
 
def alert_writer(fileName, *contents):
    with open(fileName, 'a') as file_obj:
        writer = csv.writer(file_obj)
        writer.writerow(contents)

 

注:因为使用csv.writer()写数据时,每个元素之间都以逗号分隔,所以通过使用任意数量的实参*contents,来记录你想记录的任何信息。

 

write_lag.py

 

 
import numpy as np
from datetime import datetime
from pyfunc.alert_writer import alert_writer
 
 
def write_lag(result_arr, rows, lag_min, alert_his_file):
    # 使用np.where()获得对应rows行中大于lag_min的元素的索引
    # rows只需取1, 2行,第0行为进程名称
    index_lag = np.where(result_arr[rows].astype(int) > lag_min)
    if len(index_lag) != 0:
        for indexes in index_lag:
            for index in indexes:
                # 将延迟信息写入备份文件
                alert_writer(alert_his_file, 'LAG', result_arr[0][index], result_arr[rows][index] + ' minutes', datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

 

最后,我们再完善主程序ogg_lag_mon.py中的代码,将收集到的result_arr数组带入write_lag()函数中进行判断,并记录延迟信息:

 

ogg_lag_mon.py

 

 
import sys
sys.path.append('/ogg/dirsql/py_mon')
from pyfunc.alert_writer import alert_writer
from pyfunc.write_lag import write_lag
import csv
from datetime import datetime
import numpy as np
 
... ...
 
try:
    for value in values:
        group_name.append(value[0].split()[2])
        lag_time.append(value[0].split(':')[1])
        ckpt_time.append(value[0].split(':')[-2])
    result_arr = np.array((group_name, lag_time, ckpt_time))
    # row1为Lag at Chkpt列
    write_lag(result_arr, rows=1, lag_min, alert_his_file)
    # row2为Time Since Chkpt
    write_lag(result_arr, rows=2, lag_min, alert_his_file)
except IndexError as e:
    alert_writer(error_log, 'Lag exists, please try later!', datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

 

最后对存在延迟的OGG进行测试,日志中记录的信息为:

 

以上只是将进程的相关信息输出到了日志,同样也可以组织成统一的格式,利用HTTP接口发送延迟的告警短信。具体方法与上面介绍的相似,不再赘述。有相同环境的同学可以自行实现。实际上,像这种比较成熟的应用程序,输出的日志格式基本上都是固定的,可以针对平时常用的几个产品,对不同的格式先分别进行格式化处理,然后统一生成相同格式的内容,无论是发送告警,还是做进一步的数据分析,都是非常方便的。

 

总结

 

在我们工程师日常运维的过程中,除了做好每天份内的工作外,有时候也经常会接到客户提出的一些超出我们职责范围的需求。这时候,如果直接拒绝客户,肯定是不明智的。作为技术服务人员,一定要以满足客户的要求为目标。这时候,如果我们自己刚好有这样的能力,或者借助同事、公司二线技术人员的力量来完成这些需求,肯定会给客户留下更好的印象。


关于巨象| 短信群发| 彩信群发| 短信群发软件| 资费标准| 付款方式| 代理加盟| 人才招聘| 联系我们

版权所有 广州巨象计算机科技发展有限公司 粤ICP备05007238号
服务电话:020-85272100 传真:020-85272100
总部地址:广州市天河区黄埔大道西876号跑马地凯怡阁29层
Copyright © 2004-2016 Hechina.com.All rights reserved.
短信群发 彩信群发 短信群发软件 巨象科技短信群发,彩信群发,短信群发软件,广州巨象计算机科技发展有限公司是一家致力于为企业提供互联网、通讯技术应用服务和解决方案的高科技公司,具有良好的国内外资金和技术背景;是国内最早投入研发企业短信应用和企业网络电视台系统的公司之一,业已成为广东地区最大的移动商务产品与解决方案的提供商和优秀的电讯服务品牌企业。其主要业务有:短信群发平台软件-巨象企信通,微信营销平台-巨象微信通,网络传真群发平台-Fax66网络传真,网络视频系统-巨象网视