DB监控-Riak集群监控

  一、采集的指标  1、吞吐量指标  1.1 单机采集方法:
/usr/sbin/riak-admin status
指标功能 node_gets 某节点前一分钟处理的 GET 请求数量,包括该节点上非本地虚拟节点处理的 GET 请求 node_puts 某节点前一分钟处理的 PUT 请求数量,包括该节点上非本地虚拟节点处理的 PUT 请求      1.2 集群 指标功能统计方法 node_gets_total 集群前一分钟处理的 GET 请求数量 SUM(node_gets) node_puts_total 集群前一分钟处理的 PUT 请求数量 SUM(node_puts)      2、延迟指标  2.1 单机采集方法:
/usr/sbin/riak-admin status
指标功能 node_get_fsm_time_mean 客户端发起 GET 请求到收到响应时间间隔的均值 node_get_fsm_time_median 客户端发起 GET 请求到收到响应时间间隔的中值 node_get_fsm_time_95 客户端发起 GET 请求到收到响应时间间隔的 95 百分位值 node_get_fsm_time_100 客户端发起 GET 请求到收到响应时间间隔的 100 百分位值 node_put_fsm_time_mean 客户端发起 PUT 请求到收到响应时间间隔的均值 node_put_fsm_time_median 客户端发起 PUT 请求到收到响应时间间隔的中值 node_put_fsm_time_95 客户端发起 PUT 请求到收到响应时间间隔的 95 百分位值 node_put_fsm_time_100 客户端发起 PUT 请求到收到响应时间间隔的 100 百分位值            2.2 集群 指标功能统计方法 node_get_fsm_time_mean_avg 客户端发起 GET 请求到收到响应时间间隔的均值 AVG(node_get_fsm_time_mean) node_put_fsm_time_mean_avg 客户端发起 PUT 请求到收到响应时间间隔的均值 AVG(node_put_fsm_time_mean)      3、Erlang 资源使用情况指标(单机)采集方法:
/usr/sbin/riak-admin status
指标功能 sys_process_count Erlang 进程的数量 memory_processes 分配给 Erlang 进程的内存总量(单位 bytes) memory_processes_used Erlang 进程使用的内存总量(单位 bytes)        4、Riak 负荷/健康指标   4.1 单机采集方法:
/usr/sbin/riak-admin status
指标功能 read_repairs 某节点前一分钟处理的读取修复操作数量 node_get_fsm_siblings_mean 某节点前一分钟所有 GET 操作处理的兄弟数据数量均值 node_get_fsm_siblings_median 某节点前一分钟所有 GET 操作处理的兄弟数据数量中值 node_get_fsm_siblings_95 某节点前一分钟所有 GET 操作处理的兄弟数据数量 95 百分位值 node_get_fsm_siblings_100 某节点前一分钟所有 GET 操作处理的兄弟数据数量 100 百分位值 node_get_fsm_objsize_mean 某节点前一分钟流经 GET_FSM 的对象大小均值 node_get_fsm_objsize_median 某节点前一分钟流经 GET_FSM 的对象大小中值 node_get_fsm_objsize_95 某节点前一分钟流经 GET_FSM 的对象大小 95 百分位值 node_get_fsm_objsize_100 某节点前一分钟流经 GET_FSM 的对象大小 100 百分位值             4.2 集群 指标功能统计方法 read_repairs_total 集群前一分钟处理的读取修复操作数量 SUM(read_repairs) node_get_fsm_siblings_mean_avg 集群前一分钟所有 GET 操作处理的兄弟数据数量均值 AVG(node_get_fsm_siblings_mean) node_get_fsm_objsize_mean_avg 集群前一分钟流经 GET_FSM 的对象大小均值 AVG(node_get_fsm_objsize_mean)        5、其他  5.1 LevelDB合并错误(单机)采集方法:
find /data1/riak/data/leveldb -name "LOG" -exec grep -l 'Compaction error' {} ; | wc -l
  5.2 LevelDB读取块操作错误(单机)采集方法:
/usr/sbin/riak-admin status
指标功能 leveldb_read_block_error LevelDB 读取块操作错误数量     5.3 节点存活状态(单机)采集方法:
/usr/sbin/riak-admin member-status | grep `ifconfig | grep "inet addr:10" | awk -F':' '{print $2}' | awk '{print $1}'`
输出如下,valid表示节点正常
valid       9.0%      --      'riak@10.1.80.114'
  5.4 Riak Error Log(单机)
Riak 日志路径:/data1/riak/logs 
采集文件:/data1/riak/logs/* 
采集时间段:最近一分钟 
采集内容:最近一分钟发生的错误数 
采集示例:grep error -rn /data1/riak/logs | wc -l 
说明:这个采集需要程序处理下逻辑,在此不给出完整的采集方法
   二、采集程序  1、Riak监控系统设计 DBA通过前台页面根据CMDB三级业务添加/卸载Riak集群监控,根据CMDB的ip添加Riak单机监控(单机属于集群,不能单独存在,可增量添加单机监控),填写ip和端口,配置阈值、负责人等信息1)数据库设计
mysql> use riakMonitor
show tabReading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+---------------------------+
| Tables_in_riakMonitor     |
+---------------------------+
| riakClusterConf           | 
| riakClusterDisplay        | 
| riakClusterStatus         | 
| riakClusterStatusTemplate | 
| riakSingleConf            | 
| riakSingleDisplay         | 
| riakSingleStatus          | 
| riakSingleStatusTemplate  | 
+---------------------------+
8 rows in set (0.00 sec)
Template表作为历史库表模板,历史库按月分库,按ip分表2) 单机Agent设计
  • Agent会通过自动调度平台下发到目标机器,Crond周期是1分钟,直接上报到mysql数据库。运行时间超过45s 会被调度平台kill
  • 如果检测不到riak或者命令出错则会发送rtx告警给admins + dba, 系统错误会发送给admins
  • 3) 集群汇聚设计
  • 集群数据根据节点agent上报数据在50s的时候select出当前一分钟的数据计算汇聚入库
  • 程序每分钟都会清除clusterStatus的数据,如果agent在本分钟上报心跳异常或者上报时间不在集群程序运行前(50s),cluster则不会统计该ip数据,但平均值计算时的除数会算上该ip(+1)
  • 集群计算同时会写进历史库,并创建历史表
  • 4) CGI接口设计(NodeJs)
  • 异步接收agent上报的数据,根据redis的ip列表转换成ip1
  • 如果redis获取的ip1不存在singleConf表中则会拒绝上报,返回3003错误
  • 上报成功会入singleStatus和历史库,并创建历史表
  • 5) 代码列表
    CGI :  /data/riakMonitor   # daemon
    agent: /home/opd/script/riakMonitor  #  crond
    analyzer: /opdData/opdOnline/script/kmc/riakMonitor/analyzer  # crond
    1、从CMDB更新single/cluster conf数据  
    2、同步conf和display
    3、解析status数据到display
    4、异常数据写入
    5、告警
    riakTool: /opdData/opdOnline/script/kmc/riakMonitor/riakTool  # daemon
    每分钟第50s运行一次
    1、获取监控集群和集群的ip,计算结果并汇聚
    2、操作redis,将集群数据入历史库
      2、采集程序部分代码 (单机,python2.4)1) 采集指标函数
    def getRiakMeta():
        thisFuncName = str(sys._getframe().f_code.co_name)
        cmdStr = "/usr/sbin/riak-admin status"
        cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
        if 0 != cmdCode:
            msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
            logger.error(msgTxt)
            sendRtx(MYCONF.riakAdmins, thisFuncName+" %s Fail:" % cmdStr)
            return 1
    
        data["node_gets"] = data["node_puts"] = data["node_get_fsm_time_mean"] = data["node_get_fsm_time_median"] = 0
        data["node_get_fsm_time_95"] = data["node_get_fsm_time_100"] = data["node_put_fsm_time_mean"] = 0
        data["node_put_fsm_time_median"] = data["node_put_fsm_time_95"] = data["node_put_fsm_time_100"] = 0
        data["sys_process_count"] = data["memory_processes"] = data["memory_processes_used"] = 0
        data["read_repairs"] = data["node_get_fsm_siblings_mean"] = data["node_get_fsm_siblings_median"] = 0
        data["node_get_fsm_siblings_95"] = data["node_get_fsm_siblings_100"] = data["node_get_fsm_objsize_mean"] = 0
        data["node_get_fsm_objsize_median"] = data["node_get_fsm_objsize_95"] = data["node_get_fsm_objsize_100"] = 0
        data["leveldb_read_block_error"] = 0
    
        riakItemInfo = cmdStdout.split('n')
        for each in riakItemInfo:
            eachInfo = each.split(" : ")
            if 2 == len(eachInfo):
                itemKey = eachInfo[0]
                itemValue = eachInfo[1].replace('<<"', '').replace('">>', '')
                if itemKey in data: 
                    logger.debug("%s:%s" % (itemKey, itemValue))
                    try:
                        data[itemKey] = str(round(float(itemValue), 2))
                    except ValueError:
                        data[itemKey] = itemValue
                    except:
                        raise
        
        cmdStr = """ find /data1/riak/data/leveldb -name "LOG" -exec grep -l 'Compaction error' {} ; | wc -l """
        cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
        if 0 != cmdCode:
            msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
            logger.error(msgTxt)
            sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
            return 1
        data["leveldb_compaction_error"] = cmdStdout #不用转int
        
        cmdStr = "/usr/sbin/riak-admin member-status | grep %s" % data["mainIp"]
        cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
        logger.debug(cmdStdout)
        if 0 != cmdCode:
            msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
            logger.error(msgTxt)
            sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
            return 1
        if cmdStdout.strip().startswith('valid'):
            data["is_active"] = 1
        else:
            data["is_active"] = 0
    
        data["riak_error_log"] = 0
        riakLogPath = "/data1/riak/logs/"
        if not os.path.isdir(riakLogPath):
            msgTxt = "[%s] %s not exists" % (thisFuncName, riakLogPath)
            logger.error(msgTxt)
            sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
            return 1
        riakLogInfo = os.listdir(riakLogPath)
        reportTimeSec = time.mktime(time.strptime(data["report_time"], "%Y-%m-%d %H:%M:%S"))
        for each in riakLogInfo:
            logger.debug("fileName: "+each)
            eachFile = os.path.join(riakLogPath, each)
            if os.path.isfile(eachFile):
                try:
                    eachFd = open(eachFile, 'r')
                except IOError, e:
                    msgTxt = "I/O error({}): {}".format(e.errno, e.strerror)
                    logger.error(msgTxt)
                    sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
                    return 1
                else:
                    for eachLine in eachFd: #从头读,怕文件太大撑爆内存
                        if "error" in eachLine: #2016-03-20 04:57:09.704 [info] <0.19012.49>@riak_kv_index_h
                            eachInfo = eachLine.split(' ')
                            try:
                                eachTimeStr = "%s %s" % (eachInfo[0], eachInfo[1][:-4])
                                eachTimeSec = time.mktime(time.strptime(eachTimeStr, "%Y-%m-%d %H:%M:%S"))
                                if reportTimeSec - 60 <= eachTimeSec < reportTimeSec:
                                    logger.debug(eachLine)
                                    data["riak_error_log"] += 1
                                elif eachTimeSec >= reportTimeSec:
                                    break
                            except:
                                msgTxt = "file(%s) format wrong " % eachFile
                                logger.error(msgTxt)
                                break
                                #sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
                                #eachFile.close()
                                #return 1
                    eachFd.close()
        return 0
    2)  上报和失败重传函数
    def report2server(content, retry):
        '''上报到入库程序,根据ip求余获取优先的server,如果上报失败会遍历server列表'''
        thisFuncName = ""
        try:
            thisFuncName = str(sys._getframe().f_code.co_name)
            pos = data["ip"] % len(MYCONF.reportServer)
            serverKeys = MYCONF.reportServer.keys()
            serverKeys.sort()
            serverKeys = serverKeys[pos:] + serverKeys[:pos]
            for serverId in serverKeys:
                cmdStr = "/usr/bin/curl -s --connect-timeout %d -m %d -d '%s&reTry=%d' %s" %(
                        MYCONF.curlConnectTimeout, MYCONF.curlMaxTimeout, content, retry, MYCONF.reportServer[serverId])
                cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
                logger.info(cmdStr + "ncmdCode:" + str(cmdCode) + "n" + cmdStdout + cmdStderr)
                if 0 == cmdCode:
                    return 0
            return 1
        except:
            exceptmsg = StringIO.StringIO()
            traceback.print_exc(file=exceptmsg)
            msgTxt = exceptmsg.getvalue()
            sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
            return 1
    
    def reportScheduler(reportRecord=0):
        '''reportRecord = 0 表示上报data中采集的新数据, 
        reportRecord = 1 表示从reportFailFile里面获取最新的一条数据上报到server,然后需要处理reportFailFile'''
        thisFuncName = ""
        try:
            thisFuncName = str(sys._getframe().f_code.co_name)
            if 1 == reportRecord: # 从上报失败文件中获取最后一条数据,上报之
                if not reportFail.has_section("index"): #这里不要去add_section("index") 该谁add谁add去
                    return 0
                if not reportFail.has_option("index", "index") or "" == reportFail.get("index", "index").strip():
                    return 0
                indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip())
                index = indexVec[-1]
                if "" == index:
                    msgTxt = reportFail.get("index", "index").strip()
                    sendRtx(MYCONF.admins, thisFuncName + "[系统错误] index.index 末尾有多余的逗号 " + msgTxt)
                    return 1
                if not reportFail.has_option("content", index + "_c") or not reportFail.has_option("content", index + "_t"): 
                    # _c 是内容  _t 是重试次数
                    msgTxt = "content sector 缺少 %s_c 或 %s_t" %(index, index)
                    sendRtx(MYCONF.admins, thisFuncName + "[系统错误] " + msgTxt)
                    return 1
                content = reportFail.get("content", index + "_c")
                retry = reportFail.getint("content", index + "_t")
                retry += 1
                code = report2server(content, retry)
                if 0 == code: # 发送成功
                    indexVec.remove(index)
                    if indexVec:
                        reportFail.set("index", "index", ",".join(indexVec))
                    else:
                        reportFail.set("index", "index", "")
                    reportFail.remove_option("content", index + "_c")
                    reportFail.remove_option("content", index + "_t")
                elif retry > MYCONF.maxRetry: # 重发失败,且超过最大重试次数
                    indexVec.remove(index)
                    if indexVec:
                        reportFail.set("index", "index", ",".join(indexVec))
                    else:
                        reportFail.set("index", "index", "")
                    reportFail.remove_option("content", index + "_c")
                    reportFail.remove_option("content", index + "_t")
                else: # 重发失败, 更新 _t (retry) 字段
                    reportFail.set("content", index + "_t", retry)
            else: # 发送新数据
                index = data["report_time"].replace(" ", "").replace("-", "").replace(":", "")
                content = urllib.urlencode(data)
                retry = 0
                code = report2server(content, retry)
                if 0 == code:
                    return 0
                if not reportFail.has_section("index"):
                    reportFail.add_section("index")
                    reportFail.set("index", "index", index)
                    reportFail.add_section("content")
                    reportFail.set("content", index + "_c", content)
                    reportFail.set("content", index + "_t", retry)
                else:
                    indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip())
                    indexVec.append(index)
                    if len(indexVec) > MYCONF.maxFailRecord: # 超过最大 fail record 数
                        reportFail.set("index", "index", ",".join(indexVec[len(indexVec) - MYCONF.maxFailRecord:]))
                        reportFail.set("content", index + "_c", content)
                        reportFail.set("content", index + "_t", retry)
                        for i in range(0, len(indexVec) - MYCONF.maxFailRecord):
                            delIndex = indexVec[i]
                            reportFail.remove_option("content", delIndex + "_c")
                            reportFail.remove_option("content", delIndex + "_t")
                    else:
                        reportFail.set("index", "index", ",".join(indexVec))
                        reportFail.set("content", index + "_c", content)
                        reportFail.set("content", index + "_t", retry)
            return 0
        except:
            exceptmsg = StringIO.StringIO()
            traceback.print_exc(file=exceptmsg)
            msgTxt = exceptmsg.getvalue()
            sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
            return 1
    3) 获取shell命令输出函数
    def getCmdResult(cmdStr):
        '''获取shell命令的返回码,标准输出,标准错误'''
        #child = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
        #cmdStdout, cmdStderr = child.communicate()
        #cmdCode = child.wait() 
        #return (cmdCode, cmdStdout, cmdStderr)
        thisFuncName = str(sys._getframe().f_code.co_name)
        nowTime = int(time.time())
        tmpstdout = os.path.join(MYCONF.basePath, "cmd.stdout.%d.tmp" % nowTime)
        tmpstderr = os.path.join(MYCONF.basePath, "cmd.stderr.%d.tmp" % nowTime)
        if "debug" == MYCONF.role:
            msgTxt = "[%d]Run Cmd: %s" % (nowTime, cmdStr)
            logger.debug(msgTxt)
    
        cmdStr = "(%s) 1>%s 2>%s" %(cmdStr, tmpstdout, tmpstderr)
        cmdCode = os.system(cmdStr) >> 8
        cdmStdout = cmdStderr = ""
        try:
            outfd = open(tmpstdout)
            cmdStdout = outfd.read()
            errfd = open(tmpstderr)
            cmdStderr = errfd.read()
        except:
            exceptmsg = StringIO.StringIO()
            traceback.print_exc(file=exceptmsg)
            msgTxt = exceptmsg.getvalue()
            sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
            cmdCode = 110
        else:
            outfd.close()
            errfd.close()
            os.remove(tmpstderr)
            os.remove(tmpstdout)
        return (cmdCode, cmdStdout, cmdStderr)
    4) 读/写Cache函数
    def readLastCache():
        global lastCache
        lastCache = ConfigParser.ConfigParser()
        if not os.path.isfile(MYCONF.lastCacheFile):
            try:
                fd = open(MYCONF.lastCacheFile, "w")
            except IOError, e:
                logger.error("I/O error({}): {}".format(e.errno, e.strerror))
                return 1
            else:
                fd.close()
        lastCache.readfp(open(MYCONF.lastCacheFile), "rb")
        return 0
    
    def writeCache():
        thisFuncName = ""
        try:
            thisFuncName = str(sys._getframe().f_code.co_name)
            lastCache.write(open(MYCONF.lastCacheFile, 'w'))
            return 0
        except:
            exceptmsg = StringIO.StringIO()
            traceback.print_exc(file=exceptmsg)
            msgTxt = exceptmsg.getvalue()
            logger.error(msgTxt)
        return 1
    5)  读/写失败记录
    def readFailRecord():
        global reportFail
        reportFail = ConfigParser.ConfigParser()
        if not os.path.isfile(MYCONF.lastReportFailFile):
            try:
                fd = open(MYCONF.lastReportFailFile, "w")
            except IOError, e:
                logger.error("I/O error({}): {}".format(e.errno, e.strerror))
                return 1
            else:
                fd.close()
        reportFail.readfp(open(MYCONF.lastReportFailFile), "rb")
        return 0
    
    def writeFailRecord():
        thisFuncName = ""
        try:
            thisFuncName = str(sys._getframe().f_code.co_name)
            reportFail.write(open(MYCONF.lastReportFailFile, 'w'))
            return 0
        except:
            exceptmsg = StringIO.StringIO()
            traceback.print_exc(file=exceptmsg)
            msgTxt = exceptmsg.getvalue()
            logger.error(msgTxt)
        return 1
    6) main函数
    def main():
        data["osType"] = 0 # 0表示 linux
        data["version"] = MYCONF.version # 当前程序的自定义版本号
        data["report_time"] = time.strftime("%Y-%m-%d %H:%M:00") #上报时间,由于目前基础监控是分钟级监控粒度,因此秒取 00
        
        initLog()
        logger.info('='*80)
        if 0 == checkLastPid() and 0 == readLastCache() and 0 == getLoginIp():
            readFailRecord() # 读取早迁采集周期上报失败,需要重传的数据
            reportScheduler(reportRecord=1) #从 fail record 中选取最近的一条信息上报给服务器
            if 0 == getRiakMeta():
                reportScheduler(reportRecord=0)
            writeFailRecord()
            writeCache()
        logger.info('='*80)
        logging.shutdown()
        return 
      3、添加/卸载监控1) 添加监控
    添加监控需要先添加集群(不支持先添加IP),添加集群会默认把所有IP都添加监控(前台将在clusterConf新增记录,并在singleConf增加对应的ip记录,然后调用调度平台,检测ip是否已经安装)如果该集群在CMDB里面新增Ip,则需要手动添加监控(前台提供新增监控节点,插入singleConf)
    2) 卸载监控
    (1) 卸载监控可以卸载整个集群的监控(将clusterConf needMonitor置0,同步将singleConf的needMonitor都置0,然后调用
    调度平台
    卸载集群下的所有机器,如果该ip存在其他集群并且需要监控,则不用调用
    调度平台
    卸载)也可以卸载单个节点的监控(前台将singleConf的needMonitor置0,调用
    调度平台
    ,同样判断ip是否存在其他集群) (2) 添加卸载监控部由前台调用
    调度平台
    接口,并修改数据库(插入数据或者更新need_monitor) (3) Single/cluster dislplay表会同步conf表的数据,只保留need_monitor=1的数据
      4、CMDB数据同步
    后台一直同步CMDB的数据和conf表的数据,如果不在CMDB的则需要删掉conf里面的数据,不管needMonitor的值为多少。删除三级业务的话只需要删除clusterConf表对应的记录,single会自动同步外键(尝试调用
    调度平台
    卸载接口,卸载掉被删除的三级业务ID下面的所有已安装监控的IP)
      5、前台展示1) 集群状态展示 2) 单机节点状态展示 

    相关内容推荐