Python 脚本部分实例:企业微信告警、超硬FTP 客户端、核个n和SSH 客户端、非常Saltstack 客户端、实用vCenter 客户端、用脚获取域名 ssl 证书过期时间、本实发送今天的超硬天气预报以及未来的天气趋势图; Shell 脚本部分实例:SVN 完整备份、Zabbix 监控用户密码过期、核个n和构建本地 YUM 以及上篇文章中有读者的非常需求(负载高时,查出占用比较高的实用进程脚本并存储或推送通知); 篇幅有些长,还请大家耐心翻到文末,用脚毕竟有彩蛋。本实 此脚本通过企业微信应用,超硬进行微信告警,核个n和可用于 Zabbix 监控。非常 # -*- coding: utf-8 -*- import requests import json class DLF: def __init__(self, corpid, corpsecret): self.url = "https://qyapi.weixin.qq.com/cgi-bin" self.corpid = corpid self.corpsecret = corpsecret self._token = self._get_token() def _get_token(self): 获取企业微信API接口的access_token :return: token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret) try: res = requests.get(token_url).json() token = res[access_token] return token except Exception as e: return str(e) def _get_media_id(self, file_obj): get_media_url = self.url + "/media/upload?access_token={ }&type=file".format(self._token) data = { "media": file_obj} try: res = requests.post(url=get_media_url, files=data) media_id = res.json()[media_id] return media_id except Exception as e: return str(e) def send_text(self, agentid, content, touser=None, toparty=None): send_msg_url = self.url + "/message/send?access_token=%s" % (self._token) send_data = { "touser": touser, "toparty": toparty, "msgtype": "text", "agentid": agentid, "text": { "content": content } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) def send_image(self, agentid, file_obj, touser=None, toparty=None): media_id = self._get_media_id(file_obj) send_msg_url = self.url + "/message/send?access_token=%s" % (self._token) send_data = { "touser": touser, "toparty": toparty, "msgtype": "image", "agentid": agentid, "image": { "media_id": media_id } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: 通过 ftplib 模块操作 ftp 服务器,进行上传下载等操作。 # -*- coding: utf-8 -*- from ftplib import FTP from os import path import copy class FTPClient: def __init__(self, host, user, passwd, port=21): self.host = host self.user = user self.passwd = passwd self.port = port self.res = { status: True, msg: None} self._ftp = None self._login() def _login(self): 登录FTP服务器 :return: 连接或登录出现异常时返回错误信息 try: self._ftp = FTP() self._ftp.connect(self.host, self.port, timeout=30) self._ftp.login(self.user, self.passwd) except Exception as e: return e def upload(self, localpath, remotepath=None): 上传ftp文件 :param localpath: local file path :param remotepath: remote file path :return: if not localpath: return Please select a local file. # 读取本地文件 # fp = open(localpath, rb) # 如果未传递远程文件路径,则上传到当前目录,文件名称同本地文件 if not remotepath: remotepath = path.basename(localpath) # 上传文件 self._ftp.storbinary(STOR + remotepath, localpath) # fp.close() def download(self, remotepath, localpath=None): localpath :param localpath: local file path :param remotepath: remote file path :return: if not remotepath: return Please select a remote file. # 如果未传递本地文件路径,则下载到当前目录,文件名称同远程文件 if not localpath: localpath = path.basename(remotepath) # 如果localpath是目录的话就和remotepath的basename拼接 if path.isdir(localpath): localpath = path.join(localpath, path.basename(remotepath)) # 写入本地文件 fp = open(localpath, wb) # 下载文件 self._ftp.retrbinary(RETR + remotepath, fp.write) fp.close() def nlst(self, dir=/): 查看目录下的内容 :return: 以列表形式返回目录下的所有内容 files_list = self._ftp.nlst(dir) return files_list def rmd(self, dir=None): 删除目录 :param dir: 目录名称 :return: 执行结果 if not dir: return Please input dirname res = copy.deepcopy(self.res) try: del_d = self._ftp.rmd(dir) res[msg] = del_d except Exception as e: res[status] = False res[msg] = str(e) return res def mkd(self, dir=None): 创建目录 :param dir: 目录名称 :return: 执行结果 if not dir: return Please input dirname res = copy.deepcopy(self.res) try: mkd_d = self._ftp.mkd(dir) res[msg] = mkd_d except Exception as e: res[status] = False res[msg] = str(e) return res def del_file(self, filename=None): 删除文件 :param filename: 文件名称 :return: 执行结果 if not filename: return Please input filename res = copy.deepcopy(self.res) try: del_f = self._ftp.delete(filename) res[msg] = del_f except Exception as e: res[status] = False res[msg] = str(e) return res def get_file_size(self, filenames=[]): 获取文件大小,单位是字节 判断文件类型 :param filename: 文件名称 :return: 执行结果 if not filenames: return { msg: This is an empty directory} res_l = [] for file in filenames: res_d = { } # 如果是目录或者文件不存在就会报错 try: size = self._ftp.size(file) type = f except: # 如果是云服务器提供商路径的话size显示 - , file末尾加/ (/dir/) size = - type = d file = file + / res_d[filename] = file res_d[size] = size res_d[type] = type res_l.append(res_d) return res_l def rename(self, old_name=None, new_name=None): 重命名 :param old_name: 旧的文件或者目录名称 :param new_name: 新的文件或者目录名称 :return: 执行结果 if not old_name or not new_name: return Please input old_name and new_name res = copy.deepcopy(self.res) try: rename_f = self._ftp.rename(old_name, new_name) res[msg] = rename_f except Exception as e: res[status] = False res[msg] = str(e) return res def close(self): 退出ftp连接 :return: try: # 向服务器发送quit命令 self._ftp.quit() except Exception: return No response from server finally: # 客户端单方面关闭连接 此脚本仅用于通过 key 连接,如需要密码连接,简单修改下即可。 # -*- coding: utf-8 -*- import paramiko class SSHClient: def __init__(self, host, port, user, pkey): self.ssh_host = host self.ssh_port = port self.ssh_user = user self.private_key = paramiko.RSAKey.from_private_key_file(pkey) self.ssh = None self._connect() def _connect(self): self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10) except: return ssh connect fail def execute_command(self, command): stdin, stdout, stderr = self.ssh.exec_command(command) out = stdout.read() err = stderr.read() return out, err def close(self): 通过 api 对 Saltstack 服务端进行操作,执行命令。 #!/usr/bin/env python # -*- coding:utf-8 -*- import requests import json import copy class SaltApi: """ 定义salt api接口的类 初始化获得token """ def __init__(self): self.url = "http://172.85.10.21:8000/" self.username = "saltapi" self.password = "saltapi" self.headers = { "Content-type": "application/json"} self.params = { client: local, fun: None, tgt: None, arg: None} self.login_url = self.url + "login" self.login_params = { username: self.username, password: self.password, eauth: pam} self.token = self.get_data(self.login_url, self.login_params)[token] self.headers[X-Auth-Token] = self.token def get_data(self, url, params): 请求url获取数据 :param url: 请求的url地址 :param params: 传递给url的参数 :return: 请求的结果 send_data = json.dumps(params) request = requests.post(url, data=send_data, headers=self.headers) response = request.json() result = dict(response) return result[return][0] def get_auth_keys(self): 获取所有已经认证的key :return: data = copy.deepcopy(self.params) data[client] = wheel data[fun] = key.list_all result = self.get_data(self.url, data) try: return result[data][return][minions] except Exception as e: return str(e) def get_grains(self, tgt, arg=id): """ 获取系统基础信息 :tgt: 目标主机 :return: """ data = copy.deepcopy(self.params) if tgt: data[tgt] = tgt else: data[tgt] = data[fun] = grains.item data[arg] = arg result = self.get_data(self.url, data) return result def execute_command(self, tgt, fun=cmd.run, arg=None, tgt_type=list, salt_async=False): """ 执行saltstack 模块命令,类似于salt * cmd.run command :param tgt: 目标主机 :param fun: 模块方法 可为空 :param arg: 传递参数 可为空 :return: 执行结果 """ data = copy.deepcopy(self.params) if not tgt: return { status: False, msg: target host not exist} if not arg: data.pop(arg) else: data[arg] = arg if tgt != *: data[tgt_type] = tgt_type if salt_async: data[client] = local_async data[fun] = fun data[tgt] = tgt result = self.get_data(self.url, data) return result def jobs(self, fun=detail, jid=None): """ 任务 :param fun: active, detail :param jod: Job ID :return: 任务执行结果 """ data = { client: runner} data[fun] = fun if fun == detail: if not jid: return { success: False, msg: job id is none} data[fun] = jobs.lookup_jid data[jid] = jid else: return { success: False, msg: fun is active or detail} result = self.get_data(self.url, data) 通过官方 SDK 对 vCenter 进行日常操作,此脚本是我用于 cmdb 平台的,自动获取主机信息,存入数据库。 from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL from pyVmomi import vim from asset import models import atexit class Vmware: def __init__(self, ip, user, password, port, idc, vcenter_id): self.ip = ip self.user = user self.password = password self.port = port self.idc_id = idc self.vcenter_id = vcenter_id def get_obj(self, content, vimtype, name=None): 列表返回,name 可以指定匹配的对象 container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True) obj = [ view for view in container.view ] return obj def get_esxi_info(self): # 宿主机信息 esxi_host = { } res = { "connect_status": True, "msg": None} try: # connect this thing si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60) except Exception as e: res[connect_status] = False try: res[msg] = ("%s Caught vmodl fault : " + e.msg) % (self.ip) except Exception as e: res[msg] = %s: connection error % (self.ip) return res # disconnect this thing atexit.register(Disconnect, si) content = si.RetrieveContent() esxi_obj = self.get_obj(content, [vim.HostSystem]) for esxi in esxi_obj: esxi_host[esxi.name] = { } esxi_host[esxi.name][idc_id] = self.idc_id esxi_host[esxi.name][vcenter_id] = self.vcenter_id esxi_host[esxi.name][server_ip] = esxi.name esxi_host[esxi.name][manufacturer] = esxi.summary.hardware.vendor esxi_host[esxi.name][server_model] = esxi.summary.hardware.model for i in esxi.summary.hardware.otherIdentifyingInfo: if isinstance(i, vim.host.SystemIdentificationInfo): esxi_host[esxi.name][server_sn] = i.identifierValue # 系统名称 esxi_host[esxi.name][system_name] = esxi.summary.config.product.fullName # cpu总核数 esxi_cpu_total = esxi.summary.hardware.numCpuThreads # 内存总量 GB esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024 # 获取硬盘总量 GB esxi_disk_total = 0 for ds in esxi.datastore: esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024 # 默认配置4核8G100G,根据这个配置计算剩余可分配虚拟机 default_configure = { cpu: 4, memory: 8, disk: 100 } esxi_host[esxi.name][vm_host] = [] vm_usage_total_cpu = 0 vm_usage_total_memory = 0 vm_usage_total_disk = 0 # 虚拟机信息 for vm in esxi.vm: host_info = { } host_info[vm_name] = vm.name host_info[power_status] = vm.runtime.powerState host_info[cpu_total_kernel] = str(vm.config.hardware.numCPU) + 核 host_info[memory_total] = str(vm.config.hardware.memoryMB) + MB host_info[system_info] = vm.config.guestFullName disk_info = disk_total = 0 for d in vm.config.hardware.device: if isinstance(d, vim.vm.device.VirtualDisk): disk_total += d.capacityInKB / 1024 / 1024 disk_info += d.deviceInfo.label + ": " + str((d.capacityInKB) / 1024 / 1024) + GB + , host_info[disk_info] = disk_info esxi_host[esxi.name][vm_host].append(host_info) # 计算当前宿主机可用容量:总量 - 已分配的 if host_info[power_status] == poweredOn: vm_usage_total_cpu += vm.config.hardware.numCPU vm_usage_total_disk += disk_total vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024) esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu esxi_memory_free = esxi_memory_total - vm_usage_total_memory esxi_disk_free = esxi_disk_total - vm_usage_total_disk esxi_host[esxi.name][cpu_info] = Total: %d核, Free: %d核 % (esxi_cpu_total, esxi_cpu_free) esxi_host[esxi.name][memory_info] = Total: %dGB, Free: %dGB % (esxi_memory_total, esxi_memory_free) esxi_host[esxi.name][disk_info] = Total: %dGB, Free: %dGB % (esxi_disk_total, esxi_disk_free) # 计算cpu 内存 磁盘按照默认资源分配的最小值,即为当前可分配资源 if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100: free_allocation_vm_host = 0 else: free_allocation_vm_host = int(min( [ esxi_cpu_free / default_configure[cpu], esxi_memory_free / default_configure[memory], esxi_disk_free / default_configure[disk] ] )) esxi_host[esxi.name][free_allocation_vm_host] = free_allocation_vm_host esxi_host[connect_status] = True return esxi_host def write_to_db(self): esxi_host = self.get_esxi_info() # 连接失败 if not esxi_host[connect_status]: return esxi_host del esxi_host[connect_status] for machine_ip in esxi_host: # 物理机信息 esxi_host_dict = esxi_host[machine_ip] # 虚拟机信息 virtual_host = esxi_host[machine_ip][vm_host] del esxi_host[machine_ip][vm_host] obj = models.EsxiHost.objects.create(**esxi_host_dict) obj.save() for host_info in virtual_host: host_info[management_host_id] = obj.id obj2 = models.virtualHost.objects.create(**host_info) 用于 zabbix 告警 import re import sys import time import subprocess from datetime import datetime from io import StringIO def main(domain): f = StringIO() comm = f"curl -Ivs https://{ domain} --connect-timeout 10" result = subprocess.getstatusoutput(comm) f.write(result[1]) try: m = re.search(start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n, f.getvalue(), re.S) start_date = m.group(1) expire_date = m.group(2) common_name = m.group(3) issuer = m.group(4) except Exception as e: return 999999999 # time 字符串转时间数组 start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT") start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date) # datetime 字符串转时间数组 expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT") expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S") # 剩余天数 remaining = (expire_date-datetime.now()).days return remaining if __name__ == "__main__": domain = sys.argv[1] remaining_days = main(domain) 此脚本用于给老婆大人发送今天的天气预报以及未来的天气趋势图,现在微信把网页端禁止了,没法发送到微信了,我是服务器托管通过企业微信进行通知的,需要把你老婆大人拉到企业微信,无兴趣的小伙伴跳过即可。 # -*- coding: utf-8 -*- import requests import json import datetime def weather(city): url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city try: data = requests.get(url).json()[data] city = data[city] ganmao = data[ganmao] today_weather = data[forecast][0] res = "老婆今天是{ }\n今天天气概况\n城市: { :<10}\n时间: { :<10}\n高温: { :<10}\n低温: { :<10}\n风力: { :<10}\n风向: { :<10}\n天气: { :<10}\n\n稍后会发送近期温度趋势图,请注意查看。\ ".format( ganmao, city, datetime.datetime.now().strftime(%Y-%m-%d), today_weather[high].split()[1], today_weather[low].split()[1], today_weather[fengli].split([)[2].split(])[0], today_weather[fengxiang],today_weather[type], ) return { "source_data": data, "res": res} except Exception as e: return str(e) ``` + 获取天气预报趋势图 ```python # -*- coding: utf-8 -*- import matplotlib.pyplot as plt import re import datetime def Future_weather_states(forecast, save_path, day_num=5): 展示未来的天气预报趋势图 :param forecast: 天气预报预测的数据 :param day_num: 未来几天 :return: 趋势图 future_forecast = forecast dict={ } for i in range(day_num): data = [] date = future_forecast[i]["date"] date = int(re.findall("\d+",date)[0]) data.append(int(re.findall("\d+", future_forecast[i]["high"])[0])) data.append(int(re.findall("\d+", future_forecast[i]["low"])[0])) data.append(future_forecast[i]["type"]) dict[date] = data data_list = sorted(dict.items()) date=[] high_temperature = [] low_temperature = [] for each in data_list: date.append(each[0]) high_temperature.append(each[1][0]) low_temperature.append(each[1][1]) fig = plt.plot(date,high_temperature,"r",date,low_temperature,"b") current_date = datetime.datetime.now().strftime(%Y-%m) plt.rcParams[font.sans-serif] = [SimHei] plt.rcParams[axes.unicode_minus] = False plt.xlabel(current_date) plt.ylabel("℃") plt.legend(["高温", "低温"]) plt.xticks(date) plt.title("最近几天温度变化趋势") plt.savefig(save_path) ``` + 发送到企业微信 ```python # -*- coding: utf-8 -*- import requests import json class DLF: def __init__(self, corpid, corpsecret): self.url = "https://qyapi.weixin.qq.com/cgi-bin" self.corpid = corpid self.corpsecret = corpsecret self._token = self._get_token() def _get_token(self): 获取企业微信API接口的access_token :return: token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret) try: res = requests.get(token_url).json() token = res[access_token] return token except Exception as e: return str(e) def _get_media_id(self, file_obj): get_media_url = self.url + "/media/upload?access_token={ }&type=file".format(self._token) data = { "media": file_obj} try: res = requests.post(url=get_media_url, files=data) media_id = res.json()[media_id] return media_id except Exception as e: return str(e) def send_text(self, agentid, content, touser=None, toparty=None): send_msg_url = self.url + "/message/send?access_token=%s" % (self._token) send_data = { "touser": touser, "toparty": toparty, "msgtype": "text", "agentid": agentid, "text": { "content": content } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) def send_image(self, agentid, file_obj, touser=None, toparty=None): media_id = self._get_media_id(file_obj) send_msg_url = self.url + "/message/send?access_token=%s" % (self._token) send_data = { "touser": touser, "toparty": toparty, "msgtype": "image", "agentid": agentid, "image": { "media_id": media_id } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) + main脚本 # -*- coding: utf-8 -*- from plugins.weather_forecast import weather from plugins.trend_chart import Future_weather_states from plugins.send_wechat import DLF import os # 企业微信相关信息 corpid = "xxx" corpsecret = "xxx" agentid = "xxx" # 天气预报趋势图保存路径 _path = os.path.dirname(os.path.abspath(__file__)) save_path = os.path.join(_path ,./tmp/weather_forecast.jpg) # 获取天气预报信息 content = weather("大兴") # 发送文字消息 dlf = DLF(corpid, corpsecret) dlf.send_text(agentid=agentid, content=content[res], toparty=1) # 生成天气预报趋势图 Future_weather_states(content[source_data][forecast], save_path) # 发送图片消息 file_obj = open(save_path, rb) 通过 hotcopy 进行 SVN 完整备份,备份保留 7 天。 #!/bin/bash # Filename : svn_backup_repos.sh # Date : 2020/12/14 # Author : JakeTian # Email : JakeTian@***.com # Crontab : 59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1 # Notes : 将脚本加入crontab中,每天定时执行 # Description: SVN完全备份 set -e SRC_PATH="/opt/svndata" DST_PATH="/data/svnbackup" LOG_FILE="$DST_PATH/logs/svn_backup.log" SVN_BACKUP_C="/bin/svnadmin hotcopy" SVN_LOOK_C="/bin/svnlook youngest" TODAY=$(date +%F) cd $SRC_PATH ALL_REPOS=$(find ./ -maxdepth 1 -type d ! -name httpd -a ! -name bak | tr -d ./) # 创建备份目录,备份脚本日志目录 test -d $DST_PATH || mkdir -p $DST_PATH test -d $DST_PATH/logs || mkdir $DST_PATH/logs test -d $DST_PATH/$TODAY || mkdir $DST_PATH/$TODAY # 备份repos文件 for repo in $ALL_REPOS do $SVN_BACKUP_C $SRC_PATH/$repo $DST_PATH/$TODAY/$repo # 判断备份是否完成 if $SVN_LOOK_C $DST_PATH/$TODAY/$repo;then echo "$TODAY: $repo Backup Success" >> $LOG_FILE else echo "$TODAY: $repo Backup Fail" >> $LOG_FILE fi done # # 备份用户密码文件和权限文件 cp -p authz access.conf $DST_PATH/$TODAY # 日志文件转储 mv $LOG_FILE $LOG_FILE-$TODAY # 删除七天前的备份 seven_days_ago=$(date -d "7 days ago" +%F) 用于 Zabbix 监控 Linux 系统用户(shell 为 /bin/bash 和 /bin/sh)密码过期,密码有效期剩余 7 天触发加自动发现用户。 #!/bin/bash diskarray=(`awk -F: $NF ~ /\/bin\/bash/||/\/bin\/sh/{ print $1} /etc/passwd`) length=${ #diskarray[@]} printf "{ \n" printf \t"\"data\":[" for ((i=0;i<$length;i++)) do printf \n\t\t{ printf "\"{ #USER_NAME}\":\"${ diskarray[$i]}\"}" if [ $i -lt $[$length-1] ];then printf , fi done printf "\n\t]\n" printf "}\n" 检查用户密码过期 #!/bin/bash export LANG=en_US.UTF-8 SEVEN_DAYS_AGO=$(date -d -7 day +%s) user="$1" # 将Sep 09, 2018格式的时间转换成unix时间 expires_date=$(sudo chage -l $user | awk -F: /Password expires/{ print $NF} | sed -n s/^ //p) if [[ "$expires_date" != "never" ]];then expires_date=$(date -d "$expires_date" +%s) if [ "$expires_date" -le "$SEVEN_DAYS_AGO" ];then echo "1" else echo "0" fi else echo "0" 通过 rsync 的方式同步 yum,通过 nginx 只做 http yum 站点; 但是 centos6 的镜像最近都不能用了,国内貌似都禁用了,如果找到合适的自行更换地址。 #!/bin/bash # 更新yum镜像 RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000" DIR="/app/yumData" LogDir="$DIR/logs" Centos6Base="$DIR/Centos6/x86_64/Base" Centos7Base="$DIR/Centos7/x86_64/Base" Centos6Epel="$DIR/Centos6/x86_64/Epel" Centos7Epel="$DIR/Centos7/x86_64/Epel" Centos6Salt="$DIR/Centos6/x86_64/Salt" Centos7Salt="$DIR/Centos7/x86_64/Salt" Centos6Update="$DIR/Centos6/x86_64/Update" Centos7Update="$DIR/Centos7/x86_64/Update" Centos6Docker="$DIR/Centos6/x86_64/Docker" Centos7Docker="$DIR/Centos7/x86_64/Docker" Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7" Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7" Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0" Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0" MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn" # 目录不存在就创建 check_dir(){ for dir in $ do test -d $dir || mkdir -p $dir done } # 检查rsync同步结果 check_rsync_status(){ if [ $? -eq 0 ];then echo "rsync success" >> $1 else echo "rsync fail" >> $1 fi } check_dir $DIR $LogDir $Centos6Base $Centos7Base $Centos6Epel $Centos7Epel $Centos6Salt $Centos7Salt $Centos6Update $Centos7Update $Centos6Docker $Centos7Docker $Centos6Mysql5_7 $Centos7Mysql5_7 $Centos6Mysql8_0 $Centos7Mysql8_0 # Base yumrepo #$RsyncCommand "$MirrorDomain"/repo/centos/6/os/x86_64/ $Centos6Base >> "$LogDir/centos6Base.log" 2>&1 # check_rsync_status "$LogDir/centos6Base.log" $RsyncCommand "$MirrorDomain"/repo/centos/7/os/x86_64/ $Centos7Base >> "$LogDir/centos7Base.log" 2>&1 check_rsync_status "$LogDir/centos7Base.log" # Epel yumrepo # $RsyncCommand "$MirrorDomain"/repo/epel/6/x86_64/ $Centos6Epel >> "$LogDir/centos6Epel.log" 2>&1 # check_rsync_status "$LogDir/centos6Epel.log" $RsyncCommand "$MirrorDomain"/repo/epel/7/x86_64/ $Centos7Epel >> "$LogDir/centos7Epel.log" 2>&1 check_rsync_status "$LogDir/centos7Epel.log" # SaltStack yumrepo # $RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/6/x86_64/ $Centos6Salt >> "$LogDir/centos6Salt.log" 2>&1 # ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest # check_rsync_status "$LogDir/centos6Salt.log" $RsyncComman "$MirrorDomain"/repo/salt/yum/redhat/7/x86_64/ $Centos7Salt >> "$LogDir/centos7Salt.log" 2>&1 check_rsync_status "$LogDir/centos7Salt.log" # ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest # Docker yumrepo $RsyncCommand "$MirrorDomain"/repo/docker-ce/linux/centos/7/x86_64/stable/ $Centos7Docker >> "$LogDir/centos7Docker.log" 2>&1 check_rsync_status "$LogDir/centos7Docker.log" # centos update yumrepo # $RsyncCommand "$MirrorDomain"/repo/centos/6/updates/x86_64/ $Centos6Update >> "$LogDir/centos6Update.log" 2>&1 # check_rsync_status "$LogDir/centos6Update.log" $RsyncCommand "$MirrorDomain"/repo/centos/7/updates/x86_64/ $Centos7Update >> "$LogDir/centos7Update.log" 2>&1 check_rsync_status "$LogDir/centos7Update.log" # mysql 5.7 yumrepo # $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" >> "$LogDir/centos6Mysql5.7.log" 2>&1 # check_rsync_status "$LogDir/centos6Mysql5.7.log" $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" >> "$LogDir/centos7Mysql5.7.log" 2>&1 check_rsync_status "$LogDir/centos7Mysql5.7.log" # mysql 8.0 yumrepo # $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" >> "$LogDir/centos6Mysql8.0.log" 2>&1 # check_rsync_status "$LogDir/centos6Mysql8.0.log" $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" >> "$LogDir/centos7Mysql8.0.log" 2>&1 负载高时,查出占用比较高的进程脚本并存储或推送通知 这部分内容是上篇 Shell 脚本实例中底部读者留言的需求,站群服务器如下: #!/bin/bash # 物理cpu个数 physical_cpu_count=$(egrep physical id /proc/cpuinfo | sort | uniq | wc -l) # 单个物理cpu核数 physical_cpu_cores=$(egrep cpu cores /proc/cpuinfo | uniq | awk { print $NF}) # 总核数 total_cpu_cores=$((physical_cpu_count*physical_cpu_cores)) # 分别是一分钟、五分钟、十五分钟负载的阈值,其中有一项超过阈值才会触发 one_min_load_threshold="$total_cpu_cores" five_min_load_threshold=$(awk BEGIN { print "$total_cpu_cores" * "0.8"}) fifteen_min_load_threshold=$(awk BEGIN { print "$total_cpu_cores" * "0.7"}) # 分别是分钟、五分钟、十五分钟负载平均值 one_min_load=$(uptime | awk { print $(NF-2)} | tr -d ,) five_min_load=$(uptime | awk { print $(NF-1)} | tr -d ,) fifteen_min_load=$(uptime | awk { print $NF} | tr -d ,) # 获取当前cpu 内存 磁盘io信息,并写入日志文件 # 如果需要发送消息或者调用其他,请自行编写函数即可 get_info(){ log_dir="cpu_high_script_log" test -d "$log_dir" || mkdir "$log_dir" ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log iostat -dx 1 10 > "$log_dir"/disk_io_10.log } export -f get_info echo "$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold" | \ 以上,就是今天分享的全部内容了。 希望大家通过这些案例能够学以致用,结合自身的实际场景进行运用,从而提高自己的工作效率。Python 脚本部分
企业微信告警Shell 脚本部分
SVN 完整备份读者需求解答