精华内容
下载资源
问答
  • Python 编写的一个Monkey脚本例子
  • Python写Linux脚本实例(二)

    千次阅读 2020-04-03 16:16:55
    kafka报警脚本 写在前面的话 本人萌新,发文章一是为了...1.python如何获取shell命令返回的结果 之前提到过os.system可以直接执行shell命令,但是无法获取到结果。 方法一 result = os.popen('ps -aux') res = res...

    kafka报警脚本

    写在前面的话

    本人萌新,发文章一是为了记录自己写的东西,二是为了学习,同时希望能够帮助别人。语法什么的自己觉得也很low,大佬们不喜勿喷。

    遇到的问题

    1.python如何获取shell命令返回的结果

    之前提到过os.system可以直接执行shell命令,但是无法获取到结果。

    方法一
    result = os.popen('ps -aux')  
          res = result.read()  
          for line in res.splitlines():  
                  print line
    

    使用os.popen可以获取到结果,我使用的是这种方法。

    方法二

    使用commands,不过这种方法在python3中被舍弃了,使用subprocess替代

    import commands
    
    result = commands.getoutput('cmd')   #只返回执行的结果, 忽略返回值.
    result = commands.getstatus('cmd')   #返回ls -ld file执行的结果.
    result = commands.getstatusoutput('cmd') 
    #用os.popen()执行命令cmd, 然后返回两个元素的元组(status, result). cmd执行的方式是{ cmd ; }2>&1, 这样返回结果里面就会包含标准输出和标准错误.
    
    方法三

    使用内置模块subprocess
    详细的解释点这里

    # /usr/bin/python
    # -*- coding: UTF-8 -*-
    import subprocess
    
    group_list = subprocess.Popen("ip a", shell=True)
    print(group_list.stdout.read())
    
    

    在这里插入图片描述
    在使用subprocess.run和subprocess.call的时候提示我这个模块没有这个元素。。。求大佬帮助
    在这里插入图片描述

    2.python怎么进行队列相加(没解决)求大佬帮助

    考虑到了分片,要将所有的加在一起。shell的返回结果是这样的,我需要将第六列相加。由于不会(尴尬,不会你写什么文章)。我直接使用shell的awk获取的最后的值。求大佬指点。。。怎么用python写。

    awk '{#print$6}' | awk '{number+=$1}END{#print number}'
    

    在这里插入图片描述

    3.拼接字符串

    累死了,自己试吧(嘿嘿)

    最终脚本

    # /usr/bin/python
    # -*- coding: UTF-8 -*-
    import os
    
    CURL_ADDRESS = 'https://qyapi.weixin.qq.com/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    THRESHOLD = 100
    list_1 = []
    list_2 = []
    
    def main():
        res = result()
        get_result_number(res)
        
    def result():
        result = os.popen("/opt/kafka/default/bin/kafka-consumer-groups-scram.sh --list --bootstrap-server a02.firedata.hadoop.datanode:9092 --command-config /opt/kafka/default/config/consumer-scram.properties | grep -v '^anonymous'")
        res = result.read()
        return res
    
    def get_group_number(number):
        group_number = os.popen(
            "/opt/kafka/default/bin/kafka-consumer-groups-scram.sh  --describe --bootstrap-server a02.firedata.hadoop.datanode:9092 --group" + " " + number + " " + "--command-config /opt/kafka/default/config/consumer-scram.properties | awk '{#print$6}' | awk '{number+=$1}END{#print number}'")
        return group_number
    
    def get_markdown_content_1(number_1):
        markdown_content_1 = ">服务名称:""<font color=\\\"info\\\">" + str(number_1) + "</font>\\n>未处理消息数:<font color=\\\"warning\\\">" + str(x) + "</font>\\n"
        return markdown_content_1
    
    def curl_(markdown_content):
        os.system("curl" " " + CURL_ADDRESS + " ""-H 'Content-Type: application/json' -d '{\"msgtype\": \"markdown\",\"markdown\": {\"content\": \"kafka未处理队列超出预定值,请注意。\n" + markdown_content + "\"'")
    
    def get_result_number(res):
        markdown_content = ''
        for i in res.splitlines():
            list_1.append(i)
        j = len(list_1)
    
        for k in range(0, j):
            number = list_1[k]
            group_number = get_group_number(number)
            group_number_all = group_number.read()
    
            for x in group_number_all.splitlines():
                x = int(x)
    
            if x >= threshold:
                list_2.append(x)
                number_1 = list_1[k]
                markdown_content_1 = get_markdown_content_1(number_1)
                markdown_content = str(markdown_content) + str(markdown_content_1)
    
            if len(list_2) > 9:
                curl_(markdown_content)
                markdown_content = ''
                list_2 = []
        if len(list_2) > 0:
            curl_(markdown_content)
    
    if __name__ == '__main__':
        main()
    

    脚本实现效果

    在这里插入图片描述

    展开全文
  • 11 个非常实用的 PythonShell 拿来就用脚本实例! 转载请联系授权(微信ID:Hc220088) 关注公众号:「杰哥的IT之旅」后台回复:「脚本合集」可获取本文全部脚本实例文件 关注公众号:「杰哥的IT之旅」后台回复...

    作者:养乐多 编辑:JackTian
    来源:公众号「杰哥的IT之旅」
    ID:Jake_Internet
    原文链接:超硬核!11 个非常实用的 Python 和 Shell 拿来就用脚本实例!
    转载请联系授权(微信ID:Hc220088)

    关注公众号:「杰哥的IT之旅」后台回复:「脚本合集」可获取本文全部脚本实例文件
    关注公众号:「杰哥的IT之旅」后台回复:「wx」 可邀请你加入读者交流群

    大家好,我是JackTian。

    在上一篇分享的原创文章《7 个非常实用的 Shell 拿来就用脚本实例!》中,从这篇文章的阅读、点赞、在看、留言的数据来看,非常受读者欢迎。不得不说,脚本在我们的日常工作中可以提高很大的工作效率,的确很香!

    这次再来给大家分享一波我工作中用到的几个脚本,主要分为:PythonShell两个部分。

    Python 脚本部分实例: 企业微信告警、FTP 客户端、SSH 客户端、Saltstack 客户端、vCenter 客户端、获取域名 ssl 证书过期时间、发送今天的天气预报以及未来的天气趋势图;

    Shell 脚本部分实例: SVN 完整备份、Zabbix 监控用户密码过期、构建本地 YUM 以及上篇文章中有读者的需求(负载高时,查出占用比较高的进程脚本并存储或推送通知);

    篇幅有些长,还请大家耐心翻到文末,毕竟有彩蛋。

    Python 脚本部分

    企业微信告警

    此脚本通过企业微信应用,进行微信告警,可用于 Zabbix 监控。

    # -*- coding: utf-8 -*-
    
    
    import requests
    import json
    
    
    class DLF:
        def __init__(self, corpid, corpsecret):
            self.url = "https://qyapi.weixin.qq.com/cgi-bin"
            self.corpid = corpid
            self.corpsecret = corpsecret
            self._token = self._get_token()
    
        def _get_token(self):
            '''
            获取企业微信API接口的access_token
            :return:
            '''
            token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
            try:
                res = requests.get(token_url).json()
                token = res['access_token']
                return token
            except Exception as e:
                return str(e)
    
        def _get_media_id(self, file_obj):
            get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
            data = {"media": file_obj}
    
            try:
                res = requests.post(url=get_media_url, files=data)
                media_id = res.json()['media_id']
                return media_id
            except Exception as e:
                return str(e)
    
        def send_text(self, agentid, content, touser=None, toparty=None):
            send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
            send_data = {
                "touser": touser,
                "toparty": toparty,
                "msgtype": "text",
                "agentid": agentid,
                "text": {
                    "content": content
                }
            }
    
            try:
                res = requests.post(send_msg_url, data=json.dumps(send_data))
            except Exception as e:
                return str(e)
    
        def send_image(self, agentid, file_obj, touser=None, toparty=None):
            media_id = self._get_media_id(file_obj)
            send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
            send_data = {
                "touser": touser,
                "toparty": toparty,
                "msgtype": "image",
                "agentid": agentid,
                "image": {
                    "media_id": media_id
               }
            }
    
            try:
                res = requests.post(send_msg_url, data=json.dumps(send_data))
            except Exception as e:
                return str(e)
    

    FTP 客户端

    通过 ftplib 模块操作 ftp 服务器,进行上传下载等操作。

    # -*- coding: utf-8 -*-
    
    from ftplib import FTP
    from os import path
    import copy
    
    
    class FTPClient:
        def __init__(self, host, user, passwd, port=21):
            self.host = host
            self.user = user
            self.passwd = passwd
            self.port = port
            self.res = {'status': True, 'msg': None}
            self._ftp = None
            self._login()
    
        def _login(self):
            '''
            登录FTP服务器
            :return: 连接或登录出现异常时返回错误信息
            '''
            try:
                self._ftp = FTP()
                self._ftp.connect(self.host, self.port, timeout=30)
                self._ftp.login(self.user, self.passwd)
            except Exception as e:
                return e
    
        def upload(self, localpath, remotepath=None):
            '''
            上传ftp文件
            :param localpath: local file path
            :param remotepath: remote file path
            :return:
            '''
            if not localpath: return 'Please select a local file. '
            # 读取本地文件
            # fp = open(localpath, 'rb')
    
            # 如果未传递远程文件路径,则上传到当前目录,文件名称同本地文件
            if not remotepath:
                remotepath = path.basename(localpath)
    
            # 上传文件
            self._ftp.storbinary('STOR ' + remotepath, localpath)
            # fp.close()
    
        def download(self, remotepath, localpath=None):
            '''
            localpath
            :param localpath: local file path
            :param remotepath: remote file path
            :return:
            '''
    
            if not remotepath: return 'Please select a remote file. '
            # 如果未传递本地文件路径,则下载到当前目录,文件名称同远程文件
            if not localpath:
                localpath = path.basename(remotepath)
            # 如果localpath是目录的话就和remotepath的basename拼接
            if path.isdir(localpath):
                localpath = path.join(localpath, path.basename(remotepath))
    
            # 写入本地文件
            fp = open(localpath, 'wb')
    
            # 下载文件
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)
            fp.close()
    
        def nlst(self, dir='/'):
            '''
            查看目录下的内容
            :return: 以列表形式返回目录下的所有内容
            '''
            files_list = self._ftp.nlst(dir)
            return files_list
    
        def rmd(self, dir=None):
            '''
            删除目录
            :param dir: 目录名称
            :return: 执行结果
            '''
            if not dir: return 'Please input dirname'
            res = copy.deepcopy(self.res)
            try:
                del_d = self._ftp.rmd(dir)
                res['msg'] = del_d
            except Exception as e:
                res['status'] = False
                res['msg'] = str(e)
    
            return res
    
        def mkd(self, dir=None):
            '''
            创建目录
            :param dir: 目录名称
            :return: 执行结果
            '''
            if not dir: return 'Please input dirname'
            res = copy.deepcopy(self.res)
            try:
                mkd_d = self._ftp.mkd(dir)
                res['msg'] = mkd_d
            except Exception as e:
                res['status'] = False
                res['msg'] = str(e)
    
            return res
    
        def del_file(self, filename=None):
            '''
            删除文件
            :param filename: 文件名称
            :return: 执行结果
            '''
            if not filename: return 'Please input filename'
            res = copy.deepcopy(self.res)
            try:
                del_f = self._ftp.delete(filename)
                res['msg'] = del_f
            except Exception as e:
                res['status'] = False
                res['msg'] = str(e)
    
            return res
    
        def get_file_size(self, filenames=[]):
            '''
            获取文件大小,单位是字节
            判断文件类型
            :param filename: 文件名称
            :return: 执行结果
            '''
            if not filenames: return {'msg': 'This is an empty directory'}
            res_l = []
            for file in filenames:
                res_d = {}
                # 如果是目录或者文件不存在就会报错
                try:
                    size = self._ftp.size(file)
                    type = 'f'
                except:
                    # 如果是路径的话size显示 - , file末尾加/ (/dir/)
                    size = '-'
                    type = 'd'
                    file = file + '/'
    
                res_d['filename'] = file
                res_d['size'] = size
                res_d['type'] = type
                res_l.append(res_d)
    
            return res_l
    
        def rename(self, old_name=None, new_name=None):
            '''
            重命名
            :param old_name: 旧的文件或者目录名称
            :param new_name: 新的文件或者目录名称
            :return: 执行结果
            '''
            if not old_name or not new_name: return 'Please input old_name and new_name'
            res = copy.deepcopy(self.res)
            try:
                rename_f = self._ftp.rename(old_name, new_name)
                res['msg'] = rename_f
            except Exception as e:
                res['status'] = False
                res['msg'] = str(e)
    
            return res
    
        def close(self):
            '''
            退出ftp连接
            :return:
            '''
            try:
                # 向服务器发送quit命令
                self._ftp.quit()
            except Exception:
                return 'No response from server'
            finally:
                # 客户端单方面关闭连接
                self._ftp.close()
    

    SSH 客户端

    此脚本仅用于通过 key 连接,如需要密码连接,简单修改下即可。

    # -*- coding: utf-8 -*-
    
    import paramiko
    
    class SSHClient:
        def __init__(self, host, port, user, pkey):
            self.ssh_host = host
            self.ssh_port = port
            self.ssh_user = user
            self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
            self.ssh = None
            self._connect()
    
        def _connect(self):
            self.ssh = paramiko.SSHClient()
            self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            try:
                self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
            except:
                return 'ssh connect fail'
    
        def execute_command(self, command):
            stdin, stdout, stderr = self.ssh.exec_command(command)
            out = stdout.read()
            err = stderr.read()
            return out, err
    
        def close(self):
            self.ssh.close()
    

    Saltstack 客户端

    通过 api 对 Saltstack 服务端进行操作,执行命令。

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    
    import requests
    import json
    import copy
    
    
    class SaltApi:
        """
        定义salt api接口的类
        初始化获得token
        """
        def __init__(self):
            self.url = "http://172.85.10.21:8000/"
            self.username = "saltapi"
            self.password = "saltapi"
            self.headers = {"Content-type": "application/json"}
            self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
            self.login_url = self.url + "login"
            self.login_params = {'username': self.username, 'password': self.password, 'eauth': 'pam'}
            self.token = self.get_data(self.login_url, self.login_params)['token']
            self.headers['X-Auth-Token'] = self.token
    
        def get_data(self, url, params):
            '''
            请求url获取数据
            :param url: 请求的url地址
            :param params: 传递给url的参数
            :return: 请求的结果
            '''
            send_data = json.dumps(params)
            request = requests.post(url, data=send_data, headers=self.headers)
            response = request.json()
            result = dict(response)
            return result['return'][0]
    
        def get_auth_keys(self):
            '''
            获取所有已经认证的key
            :return:
            '''
            data = copy.deepcopy(self.params)
            data['client'] = 'wheel'
            data['fun'] = 'key.list_all'
            result = self.get_data(self.url, data)
            try:
                return result['data']['return']['minions']
            except Exception as e:
                return str(e)
    
        def get_grains(self, tgt, arg='id'):
            """
            获取系统基础信息
            :tgt: 目标主机
            :return:
            """
            data = copy.deepcopy(self.params)
            if tgt:
                data['tgt'] = tgt
            else:
                data['tgt'] = '*'
            data['fun'] = 'grains.item'
            data['arg'] = arg
            result = self.get_data(self.url, data)
    
            return result
    
    
        def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
            """
            执行saltstack 模块命令,类似于salt '*' cmd.run 'command'
            :param tgt: 目标主机
            :param fun: 模块方法 可为空
            :param arg: 传递参数 可为空
            :return: 执行结果
            """
            data = copy.deepcopy(self.params)
    
            if not tgt: return {'status': False, 'msg': 'target host not exist'}
            if not arg:
                data.pop('arg')
            else:
                data['arg'] = arg
            if tgt != '*':
                data['tgt_type'] = tgt_type
            if salt_async: data['client'] = 'local_async'
            data['fun'] = fun
            data['tgt'] = tgt
            result = self.get_data(self.url, data)
    
            return result
    
    
        def jobs(self, fun='detail', jid=None):
            """
            任务
            :param fun: active, detail
            :param jod: Job ID
            :return: 任务执行结果
            """
    
            data = {'client': 'runner'}
            data['fun'] = fun
            if fun == 'detail':
                if not jid: return {'success': False, 'msg': 'job id is none'}
                data['fun'] = 'jobs.lookup_jid'
                data['jid'] = jid
            else:
                return {'success': False, 'msg': 'fun is active or detail'}
            result = self.get_data(self.url, data)
    
            return result
    

    更多关于 Saltstack 相关的原创文章可参考:

    vCenter 客户端

    通过官方 SDK 对 vCenter 进行日常操作,此脚本是我用于 cmdb 平台的,自动获取主机信息,存入数据库。

    from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
    from pyVmomi import vim
    from asset import models
    import atexit
    
    
    class Vmware:
        def __init__(self, ip, user, password, port, idc, vcenter_id):
            self.ip = ip
            self.user = user
            self.password = password
            self.port = port
            self.idc_id = idc
            self.vcenter_id = vcenter_id
    
        def get_obj(self, content, vimtype, name=None):
            '''
            列表返回,name 可以指定匹配的对象
            '''
            container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
            obj = [ view for view in container.view ]
            return obj
    
        def get_esxi_info(self):
            # 宿主机信息
            esxi_host = {}
            res = {"connect_status": True, "msg": None}
    
            try:
                # connect this thing
                si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
            except Exception as e:
                res['connect_status'] = False
                try:
                    res['msg'] = ("%s Caught vmodl fault : " + e.msg) % (self.ip)
                except Exception as e:
                    res['msg'] = '%s: connection error' % (self.ip)
                return res
            # disconnect this thing
            atexit.register(Disconnect, si)
            content = si.RetrieveContent()
            esxi_obj = self.get_obj(content, [vim.HostSystem])
    
            for esxi in esxi_obj:
                esxi_host[esxi.name] = {}
                esxi_host[esxi.name]['idc_id'] = self.idc_id
                esxi_host[esxi.name]['vcenter_id'] = self.vcenter_id
                esxi_host[esxi.name]['server_ip'] = esxi.name
                esxi_host[esxi.name]['manufacturer'] = esxi.summary.hardware.vendor
                esxi_host[esxi.name]['server_model'] = esxi.summary.hardware.model
    
                for i in esxi.summary.hardware.otherIdentifyingInfo:
                    if isinstance(i, vim.host.SystemIdentificationInfo):
                        esxi_host[esxi.name]['server_sn'] = i.identifierValue
    
                # 系统名称
                esxi_host[esxi.name]['system_name'] = esxi.summary.config.product.fullName
                # cpu总核数
                esxi_cpu_total = esxi.summary.hardware.numCpuThreads
                # 内存总量 GB
                esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024
    
                # 获取硬盘总量 GB
                esxi_disk_total = 0
                for ds in esxi.datastore:
                    esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024
    
                # 默认配置4核8G100G,根据这个配置计算剩余可分配虚拟机
                default_configure = {
                    'cpu': 4,
                    'memory': 8,
                    'disk': 100
                }
    
                esxi_host[esxi.name]['vm_host'] = []
                vm_usage_total_cpu = 0
                vm_usage_total_memory = 0
                vm_usage_total_disk = 0
    
                # 虚拟机信息
                for vm in esxi.vm:
                    host_info = {}
                    host_info['vm_name'] = vm.name
                    host_info['power_status'] = vm.runtime.powerState
                    host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + '核'
                    host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB'
                    host_info['system_info'] = vm.config.guestFullName
    
                    disk_info = ''
                    disk_total = 0
                    for d in vm.config.hardware.device:
                        if isinstance(d, vim.vm.device.VirtualDisk):
                            disk_total += d.capacityInKB / 1024 / 1024
                            disk_info += d.deviceInfo.label + ": " +  str((d.capacityInKB) / 1024 / 1024) + ' GB' + ','
    
                    host_info['disk_info'] = disk_info
                    esxi_host[esxi.name]['vm_host'].append(host_info)
    
                    # 计算当前宿主机可用容量:总量 - 已分配的
                    if host_info['power_status'] == 'poweredOn':
                        vm_usage_total_cpu += vm.config.hardware.numCPU
                        vm_usage_total_disk += disk_total
                        vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)
    
                esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
                esxi_memory_free = esxi_memory_total - vm_usage_total_memory
                esxi_disk_free = esxi_disk_total - vm_usage_total_disk
    
                esxi_host[esxi.name]['cpu_info'] = 'Total: %d核, Free: %d核' % (esxi_cpu_total, esxi_cpu_free)
                esxi_host[esxi.name]['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free)
                esxi_host[esxi.name]['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free)
    
                # 计算cpu 内存 磁盘按照默认资源分配的最小值,即为当前可分配资源
                if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100:
                    free_allocation_vm_host = 0
                else:
                    free_allocation_vm_host = int(min(
                        [
                            esxi_cpu_free / default_configure['cpu'],
                            esxi_memory_free / default_configure['memory'],
                            esxi_disk_free / default_configure['disk']
                        ]
                    ))
                esxi_host[esxi.name]['free_allocation_vm_host'] = free_allocation_vm_host
            esxi_host['connect_status'] = True
            return esxi_host
    
        def write_to_db(self):
            esxi_host = self.get_esxi_info()
            # 连接失败
            if not esxi_host['connect_status']:
                return esxi_host
    
            del esxi_host['connect_status']
    
            for machine_ip in esxi_host:
                # 物理机信息
                esxi_host_dict = esxi_host[machine_ip]
                # 虚拟机信息
                virtual_host = esxi_host[machine_ip]['vm_host']
                del esxi_host[machine_ip]['vm_host']
    
                obj = models.EsxiHost.objects.create(**esxi_host_dict)
                obj.save()
    
                for host_info in virtual_host:
                    host_info['management_host_id'] = obj.id
                    obj2 = models.virtualHost.objects.create(**host_info)
                    obj2.save()
    

    获取域名 ssl 证书过期时间

    用于 zabbix 告警

    import re
    import sys
    import time
    import subprocess
    from datetime import datetime
    from io import StringIO
    
    def main(domain):
        f = StringIO()
        comm = f"curl -Ivs https://{domain} --connect-timeout 10"
    
        result = subprocess.getstatusoutput(comm)
        f.write(result[1])
    
        try:
            m = re.search('start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n', f.getvalue(), re.S)
            start_date = m.group(1)
            expire_date = m.group(2)
            common_name = m.group(3)
            issuer = m.group(4)
        except Exception as e:
            return 999999999
    
        # time 字符串转时间数组
        start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
        start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
        # datetime 字符串转时间数组
        expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
        expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S")
    
        # 剩余天数
        remaining = (expire_date-datetime.now()).days
    
        return remaining 
    
    if __name__ == "__main__":
        domain = sys.argv[1] 
        remaining_days = main(domain)
        print(remaining_days)
    

    发送今天的天气预报以及未来的天气趋势图


    此脚本用于给老婆大人发送今天的天气预报以及未来的天气趋势图,现在微信把网页端禁止了,没法发送到微信了,我是通过企业微信进行通知的,需要把你老婆大人拉到企业微信,无兴趣的小伙伴跳过即可。

    # -*- coding: utf-8 -*-
    
    
        import requests
        import json
        import datetime
    
        def weather(city):
            url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city
    
            try:
                data = requests.get(url).json()['data']
                city = data['city']
                ganmao = data['ganmao']
    
                today_weather = data['forecast'][0]
                res = "老婆今天是{}\n今天天气概况\n城市: {:<10}\n时间: {:<10}\n高温: {:<10}\n低温: {:<10}\n风力: {:<10}\n风向: {:<10}\n天气: {:<10}\n\n稍后会发送近期温度趋势图,请注意查看。\
                ".format(
                    ganmao,
                    city,
                    datetime.datetime.now().strftime('%Y-%m-%d'),
                    today_weather['high'].split()[1],
                    today_weather['low'].split()[1],
                    today_weather['fengli'].split('[')[2].split(']')[0],
                    today_weather['fengxiang'],today_weather['type'],
                )
    
                return {"source_data": data, "res": res}
            except Exception as e:
                return str(e)
        ```
        + 获取天气预报趋势图
        ```python
        # -*- coding: utf-8 -*-
    
    
        import matplotlib.pyplot as plt
        import re
        import datetime
    
    
        def Future_weather_states(forecast, save_path, day_num=5):
            '''
            展示未来的天气预报趋势图
            :param forecast: 天气预报预测的数据
            :param day_num: 未来几天
            :return: 趋势图
            '''
            future_forecast = forecast
            dict={}
    
            for i in range(day_num):
                data = []
                date = future_forecast[i]["date"]
                date = int(re.findall("\d+",date)[0])
                data.append(int(re.findall("\d+", future_forecast[i]["high"])[0]))
                data.append(int(re.findall("\d+", future_forecast[i]["low"])[0]))
                data.append(future_forecast[i]["type"])
                dict[date] = data
    
            data_list = sorted(dict.items())
            date=[]
            high_temperature = []
            low_temperature = []
            for each in data_list:
                date.append(each[0])
                high_temperature.append(each[1][0])
                low_temperature.append(each[1][1])
                fig = plt.plot(date,high_temperature,"r",date,low_temperature,"b")
    
            current_date = datetime.datetime.now().strftime('%Y-%m')
            plt.rcParams['font.sans-serif'] = ['SimHei']
            plt.rcParams['axes.unicode_minus'] = False
            plt.xlabel(current_date)
            plt.ylabel("℃")
            plt.legend(["高温", "低温"])
            plt.xticks(date)
            plt.title("最近几天温度变化趋势")
            plt.savefig(save_path)
        ```
        + 发送到企业微信
        ```python
        # -*- coding: utf-8 -*-
    
    
        import requests
        import json
    
    
        class DLF:
            def __init__(self, corpid, corpsecret):
                self.url = "https://qyapi.weixin.qq.com/cgi-bin"
                self.corpid = corpid
                self.corpsecret = corpsecret
                self._token = self._get_token()
    
            def _get_token(self):
                '''
                获取企业微信API接口的access_token
                :return:
                '''
                token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
                try:
                    res = requests.get(token_url).json()
                    token = res['access_token']
                    return token
                except Exception as e:
                    return str(e)
    
            def _get_media_id(self, file_obj):
                get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
                data = {"media": file_obj}
    
                try:
                    res = requests.post(url=get_media_url, files=data)
                    media_id = res.json()['media_id']
                    return media_id
                except Exception as e:
                    return str(e)
    
            def send_text(self, agentid, content, touser=None, toparty=None):
                send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
                send_data = {
                    "touser": touser,
                    "toparty": toparty,
                    "msgtype": "text",
                    "agentid": agentid,
                    "text": {
                        "content": content
                    }
                }
    
                try:
                    res = requests.post(send_msg_url, data=json.dumps(send_data))
                except Exception as e:
                    return str(e)
    
            def send_image(self, agentid, file_obj, touser=None, toparty=None):
                media_id = self._get_media_id(file_obj)
                send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
                send_data = {
                    "touser": touser,
                    "toparty": toparty,
                    "msgtype": "image",
                    "agentid": agentid,
                    "image": {
                        "media_id": media_id
                   }
                }
    
                try:
                    res = requests.post(send_msg_url, data=json.dumps(send_data))
                except Exception as e:
                    return str(e)
    + main脚本
    
    # -*- coding: utf-8 -*-
    
    
    from plugins.weather_forecast import weather
    from plugins.trend_chart import Future_weather_states
    from plugins.send_wechat import DLF
    import os
    
    
    # 企业微信相关信息
    corpid = "xxx"
    corpsecret = "xxx"
    agentid = "xxx"
    # 天气预报趋势图保存路径
    _path = os.path.dirname(os.path.abspath(__file__))
    save_path = os.path.join(_path ,'./tmp/weather_forecast.jpg')
    
    # 获取天气预报信息
    content = weather("大兴")
    
    # 发送文字消息
    dlf = DLF(corpid, corpsecret)
    dlf.send_text(agentid=agentid, content=content['res'], toparty='1')
    
    # 生成天气预报趋势图
    Future_weather_states(content['source_data']['forecast'], save_path)
    # 发送图片消息
    file_obj = open(save_path, 'rb')
    dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)
    

    Shell 脚本部分

    SVN 完整备份

    通过 hotcopy 进行 SVN 完整备份,备份保留 7 天。

    #!/bin/bash
    # Filename   :  svn_backup_repos.sh
    # Date       :  2020/12/14
    # Author     :  JakeTian      
    # Email      :  JakeTian@***.com
    # Crontab    :  59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1
    # Notes      :  将脚本加入crontab中,每天定时执行
    # Description:  SVN完全备份
    
    
    set -e
    
    SRC_PATH="/opt/svndata"
    DST_PATH="/data/svnbackup"
    LOG_FILE="$DST_PATH/logs/svn_backup.log"
    SVN_BACKUP_C="/bin/svnadmin hotcopy"
    SVN_LOOK_C="/bin/svnlook youngest"
    TODAY=$(date +'%F')
    cd $SRC_PATH
    ALL_REPOS=$(find ./ -maxdepth 1 -type d ! -name 'httpd' -a ! -name 'bak' | tr -d './')
    
    # 创建备份目录,备份脚本日志目录
    test -d $DST_PATH || mkdir -p $DST_PATH
    test -d $DST_PATH/logs || mkdir $DST_PATH/logs
    test -d $DST_PATH/$TODAY || mkdir $DST_PATH/$TODAY
    
    # 备份repos文件
    for repo in $ALL_REPOS
    do
        $SVN_BACKUP_C $SRC_PATH/$repo $DST_PATH/$TODAY/$repo
    
        # 判断备份是否完成
        if $SVN_LOOK_C $DST_PATH/$TODAY/$repo;then
            echo "$TODAY: $repo Backup Success" >> $LOG_FILE 
        else
            echo "$TODAY: $repo Backup Fail" >> $LOG_FILE
        fi
    done
    
    # # 备份用户密码文件和权限文件
    cp -p authz access.conf $DST_PATH/$TODAY
    
    # 日志文件转储
    mv $LOG_FILE $LOG_FILE-$TODAY
    
    # 删除七天前的备份
    seven_days_ago=$(date -d "7 days ago" +'%F')
    rm -rf $DST_PATH/$seven_days_ago
    

    zabbix 监控用户密码过期

    用于 Zabbix 监控 Linux 系统用户(shell 为 /bin/bash 和 /bin/sh)密码过期,密码有效期剩余 7 天触发加自动发现用户。

    #!/bin/bash
    
    
    diskarray=(`awk -F':' '$NF ~ /\/bin\/bash/||/\/bin\/sh/{print $1}' /etc/passwd`)
    length=${#diskarray[@]}
    
    printf "{\n"
    printf  '\t'"\"data\":["
    for ((i=0;i<$length;i++))
    do
        printf '\n\t\t{'
        printf "\"{#USER_NAME}\":\"${diskarray[$i]}\"}"
        if [ $i -lt $[$length-1] ];then
                printf ','
        fi
    done
    printf  "\n\t]\n"
    printf "}\n"
    
    检查用户密码过期
    
    #!/bin/bash
    
    export LANG=en_US.UTF-8
    
    SEVEN_DAYS_AGO=$(date -d '-7 day' +'%s')
    user="$1"
    
    # 将Sep 09, 2018格式的时间转换成unix时间
    expires_date=$(sudo chage -l $user | awk -F':' '/Password expires/{print $NF}' | sed -n 's/^ //p')
    if [[ "$expires_date" != "never" ]];then
        expires_date=$(date -d "$expires_date" +'%s')
    
        if [ "$expires_date" -le "$SEVEN_DAYS_AGO" ];then
            echo "1"
        else
            echo "0"
        fi
    else
        echo "0"
    fi
    

    构建本地YUM

    通过 rsync 的方式同步 yum,通过 nginx 只做 http yum 站点;

    但是 centos6 的镜像最近都不能用了,国内貌似都禁用了,如果找到合适的自行更换地址。

    #!/bin/bash
    # 更新yum镜像
    
    
    RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
    DIR="/app/yumData"
    LogDir="$DIR/logs"
    Centos6Base="$DIR/Centos6/x86_64/Base"
    Centos7Base="$DIR/Centos7/x86_64/Base"
    Centos6Epel="$DIR/Centos6/x86_64/Epel"
    Centos7Epel="$DIR/Centos7/x86_64/Epel"
    Centos6Salt="$DIR/Centos6/x86_64/Salt"
    Centos7Salt="$DIR/Centos7/x86_64/Salt"
    Centos6Update="$DIR/Centos6/x86_64/Update"
    Centos7Update="$DIR/Centos7/x86_64/Update"
    Centos6Docker="$DIR/Centos6/x86_64/Docker"
    Centos7Docker="$DIR/Centos7/x86_64/Docker"
    Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7"
    Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7"
    Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0"
    Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0"
    MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"
    
    # 目录不存在就创建
    check_dir(){
        for dir in $*
        do
            test -d $dir || mkdir -p $dir
        done
    }
    
    # 检查rsync同步结果
    check_rsync_status(){
        if [ $? -eq 0 ];then
            echo "rsync success" >> $1
        else
            echo "rsync fail" >> $1
        fi
    }
    
    
    check_dir $DIR $LogDir $Centos6Base $Centos7Base $Centos6Epel $Centos7Epel $Centos6Salt $Centos7Salt $Centos6Update $Centos7Update $Centos6Docker $Centos7Docker $Centos6Mysql5_7 $Centos7Mysql5_7 $Centos6Mysql8_0 $Centos7Mysql8_0
    
    
    # Base yumrepo
    #$RsyncCommand "$MirrorDomain"/repo/centos/6/os/x86_64/ $Centos6Base >> "$LogDir/centos6Base.log" 2>&1
    # check_rsync_status "$LogDir/centos6Base.log"
    $RsyncCommand "$MirrorDomain"/repo/centos/7/os/x86_64/ $Centos7Base >> "$LogDir/centos7Base.log" 2>&1
    check_rsync_status "$LogDir/centos7Base.log"
    
    # Epel yumrepo
    # $RsyncCommand "$MirrorDomain"/repo/epel/6/x86_64/ $Centos6Epel >> "$LogDir/centos6Epel.log" 2>&1
    # check_rsync_status "$LogDir/centos6Epel.log"
    $RsyncCommand "$MirrorDomain"/repo/epel/7/x86_64/ $Centos7Epel >> "$LogDir/centos7Epel.log" 2>&1
    check_rsync_status "$LogDir/centos7Epel.log"
    
    # SaltStack yumrepo
    # $RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/6/x86_64/ $Centos6Salt >> "$LogDir/centos6Salt.log" 2>&1
    # ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
    # check_rsync_status "$LogDir/centos6Salt.log"
    $RsyncComman "$MirrorDomain"/repo/salt/yum/redhat/7/x86_64/ $Centos7Salt >> "$LogDir/centos7Salt.log" 2>&1
    check_rsync_status "$LogDir/centos7Salt.log"
    # ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest
    
    # Docker yumrepo
    $RsyncCommand "$MirrorDomain"/repo/docker-ce/linux/centos/7/x86_64/stable/ $Centos7Docker >> "$LogDir/centos7Docker.log" 2>&1
    check_rsync_status "$LogDir/centos7Docker.log"
    
    # centos update yumrepo
    # $RsyncCommand "$MirrorDomain"/repo/centos/6/updates/x86_64/ $Centos6Update >> "$LogDir/centos6Update.log" 2>&1
    # check_rsync_status "$LogDir/centos6Update.log"
    $RsyncCommand "$MirrorDomain"/repo/centos/7/updates/x86_64/ $Centos7Update >> "$LogDir/centos7Update.log" 2>&1
    check_rsync_status "$LogDir/centos7Update.log"
    
    # mysql 5.7 yumrepo
    # $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" >> "$LogDir/centos6Mysql5.7.log" 2>&1
    # check_rsync_status "$LogDir/centos6Mysql5.7.log"
    $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" >> "$LogDir/centos7Mysql5.7.log" 2>&1
    check_rsync_status "$LogDir/centos7Mysql5.7.log"
    
    # mysql 8.0 yumrepo
    # $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" >> "$LogDir/centos6Mysql8.0.log" 2>&1
    # check_rsync_status "$LogDir/centos6Mysql8.0.log"
    $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" >> "$LogDir/centos7Mysql8.0.log" 2>&1
    check_rsync_status "$LogDir/centos7Mysql8.0.log"
    

    读者需求解答

    负载高时,查出占用比较高的进程脚本并存储或推送通知

    这部分内容是上篇 7 个非常实用的 Shell 拿来就用脚本实例!中底部读者留言的需求,如下:

    #!/bin/bash
    
    # 物理cpu个数
    physical_cpu_count=$(egrep 'physical id' /proc/cpuinfo | sort | uniq | wc -l)
    # 单个物理cpu核数
    physical_cpu_cores=$(egrep 'cpu cores' /proc/cpuinfo | uniq | awk '{print $NF}')
    # 总核数
    total_cpu_cores=$((physical_cpu_count*physical_cpu_cores))
    
    # 分别是一分钟、五分钟、十五分钟负载的阈值,其中有一项超过阈值才会触发
    one_min_load_threshold="$total_cpu_cores"
    five_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores"' * "0.8"}')
    fifteen_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores"' * "0.7"}')
    
    # 分别是分钟、五分钟、十五分钟负载平均值
    one_min_load=$(uptime | awk '{print $(NF-2)}' | tr -d ',')
    five_min_load=$(uptime | awk '{print $(NF-1)}' | tr -d ',')
    fifteen_min_load=$(uptime | awk '{print $NF}' | tr -d ',')
    
    # 获取当前cpu 内存 磁盘io信息,并写入日志文件
    # 如果需要发送消息或者调用其他,请自行编写函数即可
    get_info(){
        log_dir="cpu_high_script_log"
        test -d "$log_dir" || mkdir "$log_dir"
        ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log
        ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log
        iostat -dx 1 10 > "$log_dir"/disk_io_10.log
    }
    
    
    export -f get_info
    
    echo "$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold" | \
    awk '{ if ($1>=$2 || $3>=$4 || $5>=$6) system("get_info") }'
    

    以上,就是今天分享的全部内容了。

    希望大家通过这些案例能够学以致用,结合自身的实际场景进行运用,从而提高自己的工作效率。

    如果你有更多脚本实例,也欢迎大家分享或通过本文留言区进行留言说说你具体的脚本实例需求,如果实例过多的话,下次杰哥在整一篇合集脚本文章实例来跟大家分享。

    对了,如果你觉得上述这些脚本实例能够用到自己工作中的话,也希望能给予小小的支持(点赞、留言、转发分享随你便)。


    欢迎各位走过路过的 CSDN 朋友们,关注我的微信公众号:杰哥的IT之旅关注后回复:wx,备注:地区-职业方向-昵称,也加入百人读者交流群,与更多的 IT 大佬学习交流。


    原创不易,码字不易。 觉得这篇文章对你有点用的话,麻烦你为本文点个赞留言转发一下,因为这将是我输出更多优质文章的动力,感谢!

    ⬇⬇⬇⬇⬇⬇⬇⬇

    展开全文
  • python脚本案例-python脚本范例

    千次阅读 2020-10-30 00:43:22
    【伯乐在线导读】:有网友在 quora 上提问,「你用 python 写过最牛逼的程序脚本是什么? 」。本文摘编了 3 个国外程序员的多个小项目,含代码。 manoj memana jayakumar, 3000+ 顶更新:凭借这些脚本,我找到了工作...

    flbm1hkjk6.jpg广告关闭

    2017年12月,云+社区对外发布,从最开始的技术博客到现在拥有多个社区产品。未来,我们一起乘风破浪,创造无限可能。

    【伯乐在线导读】:有网友在 quora 上提问,「你用 python 写过最牛逼的程序脚本是什么? 」。本文摘编了 3 个国外程序员的多个小项目,含代码。 manoj memana jayakumar, 3000+ 顶更新:凭借这些脚本,我找到了工作! 可看我在这个帖子中的回复,《has anyone got a job through quora? or somehow made lots of ...

    ul49t4yt5i.jpeg

    来源:python开发者id:pythoncoder【导读】:有网友在 quora 上提问,「你用 python 写过最牛逼的程序脚本是什么? 」。本文摘编了 3 个国外程序员的多个小项目,含代码。 manoj memana jayakumar, 3000+ 顶更新:凭借这些脚本,我找到了工作! 可看我在这个帖子中的回复,《has anyone got a job through quora?...

    ife4bdpjwd.jpeg

    【导读】:有网友在 quora 上提问,「你用 python 写过最牛逼的程序脚本是什么? 」。本文摘编了 3 个国外程序员的多个小项目,含代码。 manoj memana jayakumar, 3000+ 顶更新:凭借这些脚本,我找到了工作! 可看我在这个帖子中的回复,《has anyone got a job through quora? or somehow made lots of money ...

    编译:python开发者 - jake_on 英文:quorahttp:python.jobbole.com85986有网友在 quora 上提问,「你用 python 写过最牛逼的程序脚本是什么? 」。本文摘编了 3 个国外程序员的多个小项目,含代码。 manoj memana jayakumar, 3000+ 顶更新:凭借这些脚本,我找到了工作! 可看我在这个帖子中的回复,《has anyone ...

    为此写了修改json文件的python脚本供工程后续调用。 代码如下:# coding=utf-8 设置文本格式import os,sysimport jsondef get_new_json(filepath,key,value):key_ = key.split(.) key_length = len(key_) with open(filepath, rb) as f:json_data = json.load(f) i = 0 a = json_data while i< key_length : if i+1 =...

    原创文章,转载请注明: 转载自url-team本文链接地址:python远程部署利器fabric详解-转载related posts:学习—用 python 和 opencv 检测和跟踪运动对象使用pyaiml机器人模块快速做个和你智能对话的大脑 让树莓派开机运行python脚本阿里云学生主机压力测试与优化防御脚本 linux查看实时带宽流量情况以及查看端口信息...

    python是最受欢迎和随需应变的通用编程语言之一。 它是一种解释性的高级编程语言,支持多种编程范例,包括过程式,面向对象和函数式编程。 由于其全面的标准库,它通常被描述为"py自带着电池”。 它被广泛使用,并且是一种非常通用的编程语言,因为它被初学者和科学家用于不同类型的活动。 它用于系统编程和脚本编写...

    python的一些小函数很能提高效率,平时在工作中经常忽视这些内容,而使用很原始粗暴的方法写代码; 写了一段时间以后,发现自己的提高很少,要写个小脚本也要纠结半天,跟那些大拿们相差太大; 所以要检讨自己,看看自己可以从那方面提高自己的技术能力; 今天首先学习下python的实用小函数:lamda() 返回一个函数...

    mz3pinnz2w.png

    在下今天写了一个小小的python程序,可以在完全不看源代码的情况下,分析a如果调用b.so的时候,会引用b.so的哪些函数,它的用法如下:symbol-dep.py -s a -d b.so把a作为-s参数,把b.so作为-d参数。 它的原理如下:用nm -d --undefined-only命令可以列出一个程序依赖的需要动态链接的库函数,譬如:? 用nm -d ...

    初学python,装饰器是什么玩意儿? 1:装饰器是函数,只不过该函数可以具有特殊的含义,装饰器用来装饰函数或类,使用装饰器可以在函数执行前和执行后添加...范例:模块paramikoparamiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作。 fabric和ansible内部的远程管理就是使用param...

    第二步:编写python脚本本教程将使用python 3.x以及一些模块来进行简化。 使用pip模块安装程序,需运行一下命令:现在,在一个新的目录中,创建一个名为...你可能还想进一步挖掘元数据,例如范例,设计者或打字规律。 收集链接我们还需要一个函数–该函数读入给定语言的table对象,输出一个包含其他编程语言的...

    目前而言一些顶级公司在其技术堆栈中使用python(例如instagram)facebook旗下的一个图片社交网站python适合从简单到复杂的各种web项目。 它广泛用于旅行,医疗保健,交通运输,金融等不同领域,用于web开发和软件测试,脚本编写和生成。 python的受欢迎程度与其提供的各种好处有关,例如简单性和优雅性吸引了这些大...

    owrdhn1xq4.jpeg

    你瞧,我们就是这么干的,不是吗? 高级脚本语言非常适合人工智能和机器学习,因为我们可以快速更新并重试。 我们创建的代码大部分代表实际的数学和数据结构,而不是模板。 像python这样的脚本语言就很好,因为它是严格和一致的,每个人都可以更好地理解对方的python代码,相反的,其他语言则会有混淆和不一致的编程...

    背景:游戏公司,服务器上有充值服,世界服,经分服务器等,和前端的game有链接通信,为防止链接通信故障导致线上业务中断,需要一个小脚本时刻监控线上链接状况。 涉及:shell、python2.6、126免费邮箱配置:vimusrlightserverserveroperationanalysisserverconfig.xml-->环境不同,这里只做范例 shell脚本:#!...

    o67u5naos4.png

    文章赞赏所得归作者所有,文章将同步至python中文社区微信公众号、知乎专栏、简书等各大网络平台 投稿请寄:sinoandywong@gmail.com,大家共同学习,共同进步。 本期文章由@黑白授权发布,版权所有,感谢作者分享。 一个js动态数据抓取范例作者:python中文社区网友@黑白 抓取站点:球网,一个足球比分统计网站http...

    每个 python 使用者都可以更好地理解对方的 python 代码,而其他语言的语法有可能会导致混淆和不一致的编程范例,这就是 python 较其他编程语言的优势所在...▌编程语言对比在 2017 年世界脚本语言排行榜中,python 强势登顶; 在 tiobe 编程语言排行榜中,python 地位也在逐月攀升,越来越受广大开发者的欢迎...

    wn57ntrwui.jpeg

    每个 python 使用者都可以更好地理解对方的 python 代码,而其他语言的语法有可能会导致混淆和不一致的编程范例,这就是 python 较其他编程语言的优势所在...▌编程语言对比在 2017 年世界脚本语言排行榜中,python 强势登顶; 在 tiobe 编程语言排行榜中,python 地位也在逐月攀升,越来越受广大开发者的欢迎...

    您需要打开终端并输入python --version。 您应该可以看到python的版本为2.7.x。 对于windows用户而言,请由官方网站安装python。 下一步,我们需要利用pip...元(meta)和脚本(script)声明包含在和标签之间4. 网站上可见的部分包含在和标签之间5. 和标签之间的部分为网站标题6. 标签用于定义段落其他有用的标签...

    w6fjfkdzr4.jpeg

    像 python 这样的脚本语言更适合 ai 机器学习的工作,因为它严格而一致的语法风格。 每个 python 使用者都可以更好地理解对方的 python 代码,而其他语言的语法有可能会导致混淆和不一致的编程范例,这就是 python 较其他编程语言的优势所在。 此外,ipython notebook等工具的可用性使得我们可以在全新的平台上重复并...

    展开全文
  • liuyi@l:/media/liuyi/数据/编程学习/源码/python/for$ more list.txt qq.com 80 g.cm 90 baidu.com 443 现在我需要把文本跟端口进行切割,然后再打印切割后的数据并进行变量赋值,源码如下: from os import path...

    1、实验环境

    1.1、场景设计

    我在一个文本中存放了一些IP跟端口的信息,文本内容如下:

    liuyi@l:/media/liuyi/数据/编程学习/源码/python/for$ more list.txt 
    qq.com 80
    g.cm 90
    baidu.com 443
    

    现在我需要把文本跟端口进行切割,然后再打印切割后的数据并进行变量赋值,源码如下:

    from os import path
    #设置文件path变量
    url_file = "./list.txt"
    #判断文件是否存在
    if path.exists(url_file):
        #打开文件
        file = open(url_file, "r", encoding='utf-8')
        #遍历文件行内容
        for (num, li) in enumerate(file, 1):
            print("正在检测第[ %s ]行内容: [ %s ] " %(num, li))
            xi = li.split(' ', 2)
            xi2 = [li]
            for i in xi2:
                domain_port = i.split(' ')
                domain = domain_port[0]
                port = domain_port[1]
                print("第[ %s ]行获取的域名:[ %s ] " %(num, domain))
                print("第[ %s ] 行获取的端口:[ %s ] " %(num, port))
                print(domain)
                print(port)
        file.close()
    else:
        exit("找不到文件[%s]" %url_file)
    

    可是在打印的过程中出现了预期之外的现象,如下:
    在这里插入图片描述

    可以看出,下面这个符合出现了不该有的断层

    []

    作为强迫症患者,这怎么能忍受呢!

    2、查找问题

    程序出了问题,最好的解决办法就是一点一点的还原最初的信息,所以我们可以先看一下获取到的整行内容有什么,所以代码修改成这样:

    from os import path
    #设置文件path变量
    url_file = "./list.txt"
    #判断文件是否存在
    if path.exists(url_file):
        #打开文件
        file = open(url_file, "r", encoding='utf-8')
        #遍历文件行内容
        for (num, li) in enumerate(file, 1):
            print("正在检测第[ %s ]行内容: [ %s ] " ,(num, li))
            xi = li.split(' ', 2)
            xi2 = [li]
            for i in xi2:
                domain_port = i.split(' ')
                domain = domain_port[0]
                port = domain_port[1]
                print("第[ %s ]行获取的域名:[ %s ] " ,(num, domain))
                print("第[ %s ] 行获取的端口:[ %s ] " ,(num, port))
                print(domain)
                print(port)
        file.close()
    else:
        exit("找不到文件[%s]" %url_file)
    

    也就是把打印的变量独立出来,不作为

    %s

    的引用,然后运行,得出的结果如下:

    liuyi@l:/media/liuyi/数据/编程学习/源码/python/for$ python3 read_line.py 
    正在检测第[ %s ]行内容: [ %s ]  (1, 'qq.com 80\n')[ %s ]行获取的域名:[ %s ]  (1, 'qq.com')[ %s ] 行获取的端口:[ %s ]  (1, '80\n')
    qq.com
    80
    
    正在检测第[ %s ]行内容: [ %s ]  (2, 'g.cm 90\n')[ %s ]行获取的域名:[ %s ]  (2, 'g.cm')[ %s ] 行获取的端口:[ %s ]  (2, '90\n')
    g.cm
    90
    
    正在检测第[ %s ]行内容: [ %s ]  (3, 'baidu.com 443\n')[ %s ]行获取的域名:[ %s ]  (3, 'baidu.com')[ %s ] 行获取的端口:[ %s ]  (3, '443\n')
    baidu.com
    443
    
    liuyi@l:/media/liuyi/数据/编程学习/源码/python/for$ 
    

    此时可以看出,在每一行后面都有一个转义,也就是换行符,所以这个换行符就把符号的后一部分换行到下一行了,所以出现了符合不对称的问题,既然问题找到了,那么就解决一下!

    3、解决问题

    3.1、解决思路

    对于此类问题,我在编写shell脚本的过程中也会遇到,所以如果是换到shell语言的话,一个sed就解决了,也就是通过替换,那么在Python也是一样的,虽然工具有点不一样,但是解决的思路是一致的。

    3.2、解决工具

    在Python中,处理字符串替换的工具是str.replace,菜鸟教程解释链接如下:https://www.runoob.com/python/att-string-replace.html
    通过菜鸟教程的案例,可以看出语法是非常的简洁的,但是在这个案例中使用的只是其中一个语法,另一个语法如下:

    str.replace("str", "old", "new")
    

    举个例子:

    liuyi@l:~$ more d.py 
    li = 'baidu'
    lis = str.replace(li, "du", "da")
    print(lis)
    

    执行结果如下:

    liuyi@l:~$ python3 d.py 
    baida
    liuyi@l:~$ 
    

    3.2.1、增加str.replace语句

    代码改造如下:

    from os import path
    #设置文件path变量
    url_file = "./list.txt"
    #判断文件是否存在
    if path.exists(url_file):
        #打开文件
        file = open(url_file, "r", encoding='utf-8')
        #遍历文件行内容
        for (num, li) in enumerate(file, 1):
            li = replace(li, "\n", "")
            print("正在检测第[ %s ]行内容: [ %s ] " %(num, li))
            xi = li.split(' ', 2)
            xi2 = [li]
            for i in xi2:
                domain_port = i.split(' ')
                domain = domain_port[0]
                port = domain_port[1]
                print("第[ %s ]行获取的域名:[ %s ] " %(num, domain))
                print("第[ %s ] 行获取的端口:[ %s ] " %(num, port))
                print(domain)
                print(port)
        file.close()
    else:
        exit("找不到文件[%s]" %url_file)
    

    运行结果如下:

    liuyi@l:/media/liuyi/数据/编程学习/源码/python/for$ python3 read_line.py 
    正在检测第[ 1 ]行内容: [ qq.com 80 ][ 1 ]行获取的域名:[ qq.com ][ 1 ] 行获取的端口:[ 80 ] 
    qq.com
    80
    正在检测第[ 2 ]行内容: [ g.cm 90 ][ 2 ]行获取的域名:[ g.cm ][ 2 ] 行获取的端口:[ 90 ] 
    g.cm
    90
    正在检测第[ 3 ]行内容: [ baidu.com 443 ][ 3 ]行获取的域名:[ baidu.com ][ 3 ] 行获取的端口:[ 443 ] 
    baidu.com
    443
    liuyi@l:/media/liuyi/数据/编程学习/源码/python/for$ 
    

    完美!

    展开全文
  • Shell脚本综合实战案例 前言 实验环境 VMware Workstation 15 centos7虚拟机若干台 1.案例一: 1.需求描述 1.1编写名为system.sh的小脚本,记录局域网中各主机的MAC地址,保存到/etc/ethers文件中;若是文件已存在,...
  • shell脚本案例(一)

    2021-04-28 18:51:14
    1、写一个脚本,执行后,打印一行提示“Please input a number:",要求用户输入数值,然后打印出该数值,然后再次要求用户输入数值。直到用户输入"end"停止。 #!/bin/bash while : do read -p "Please input a ...
  • 通过shell脚本自动初始化python环境

    千次阅读 2014-08-18 10:02:37
    在项目周期内需要经历三个过程,分别是开发、测试、部署。在这三个过程中,一般需要有三个环境分别与之对应,分别是:开发...这时通过编写shell脚本可以满足这一需求。  像python这种脚本语言,有着强大的第三方...
  • Python编写测试案例

    2020-03-05 20:19:40
    因为工作需要,需要用Python编写测试案例。接下来会记录在这期间遇到的一些问题和解决方法。
  • 很多人讨厌bash脚本。每当我要做最简单的事情时,我都必须查阅文档。如何将函数的参数转发给子命令?如何将字符串分配给变量,然后作为命令调用该字符串?如何检查两个字符串变量是否相等?如何分割字符串并获得后半...
  • 1. 先说答案 对于像我这种没有耐心的同学,我这里先直接给出答案。这里先说明几个前提: 假设要运行的脚本名称为 handle.py,通常该脚本里面是一个循环运行的程序,...脚本直接运行的命令为:python3 action....
  • 编写shell脚本程序一次安装多个软件 编写shell脚本程序一次安装多个软件,主要用于一些软件依赖环境配置。 1、shell脚本程序必须以下面的行开始(必须方在文件的第一行):  #!/bin/sh   符号#!用来告诉系统它...
  • 如何用python写游戏脚本

    千次阅读 多人点赞 2020-08-17 13:43:08
    很多人学习python,掌握了基本语法过后,不知道在哪里寻找案例上手。 很多已经做案例的人,却不知道如何去学习更加高深的知识。 那么针对这三类人,我给大家提供一个好的学习平台,免费领取视频教程,电子书籍,以及...
  • 3 案例3:创建用户 3.1 问题 创建adduser.py文件,实现以下目标: 编写一个程序,实现创建用户的功能 提示用户输入用户名 随机生成8位密码 创建用户并设置密码 将用户相关信息写入指定文件 3.2 方案 创建add_user()...
  • shell脚本学习笔记

    千次阅读 2019-10-03 12:58:27
    shell脚本学习笔记 1.Shell入门简介 Shell是操作系统的最外层, Shell可以合并编程语言以控制进程和文件,以及启动和控制其它程序。shell通过提示您输入,向操作系统解释该输入, 然后处理来自操作系统的任何结果输...
  • 1、获取随机字符串或数字 2、定义一个颜色输出字符串函数 3、批量创建用户 4、检查软件包是否安装 5、检查服务状态 6、检查主机存活状态 7、监控CPU、内存和硬盘利用率 ...14、编写shell脚本,输入一个数字n并计算1~n的
  • Shell脚本编程-总

    2020-11-09 17:06:27
    Shell脚本编程一. SHELL入门1.1 变量1.1.1 变量名规范1.1.2 位置变量1.1.3 环境变量1.2 管道1.3 退出状态码1.3.1 退出状态码描述1.3.2 改变退出状态码的exit命令二. 判断与控制2.1 if-then 语句2.2 if-then-else ...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • 点击下方公众号「关注」和「星标」回复“1024”获取独家整理的学习资料!shell脚本是帮助程序员和系统管理员完成费时费力的枯燥工作的利器,是与计算机交互并管理文件和系统操作的有效方式。区...
  • Dos shell脚本运行shell脚本方法 在实际开发中,经常会需要通过Windows操作系统下的*.bat文件运行*.sh shell脚本。下面将通过案例讲解该方法: rem this is a dos shell script used to run a shell script @echo...
  • Shell脚本实战案例

    2018-03-30 20:27:41
    企业Shell面试题1:批量生成随机字符文件名案例使用for循环在/oldboy目录下批量创建10个html文件,其中每个文件需要包含10个随机小写字母加固定字符串oldboy,名称示例如下: 解答: 【文本如下】[ -d /oldboy ] || ...
  • Linux运维常用shell脚本实例

    千次阅读 2019-05-10 15:03:02
    1、用shell脚本批量建立Linux用户 实现要求:创建用户student1到student50,指定组为student组!而且每个用户需要设定一个不同的密码! 脚本实现如下: 说明: Linux下 Passwd有参数--stdin This option ...
  • 运维自动化,已经成为运维必不可少的一部分,下面附上自己写的监控nginx_status脚本,大神轻喷#! usrbinpython#coding:utf-8importurllib.requestimportsocket#自动获取主机ip地址myname=socket.getfqdn(...
  • Shell脚本基础知识及案例

    千次阅读 2016-11-03 03:01:54
    Shell 脚本(即 Shell Script),Shell 脚本类似于 Windows/Dos 下的批处理,也就是将所需的各类命令预先放入到一个文件中,方便一次性执行的一个程序文件,主要应用在 Linux 系统运维上,方便管理员进行设置或者...
  • 01.Shell脚本编写入门

    2020-03-21 23:33:50
    一、什么是Shell 工作中Linux内核与用户之间的解释程序 相当于操作系统的“外壳” 向Linux内核传达用户指令的“翻译官” 通常指BASH(/bin/bash) Windows下的Shell解释器 C:\Windows\System32\cmd.exe 二、什么是...
  • Shell脚本运行时,它会先查找系统环境变量ENV,该变量指定了环境文件(加载顺序通常是/etc/profile、~/.bash_profile、~/.bashrc、/etc/bashrc等),在加载了上述环境变量文件后,Shell就开始执行Shell脚本中的内容...
  • SHELL脚本编程之文本处理 文件处理三剑客之grep grep命令 语法格式 grep参数 grep和egrep 文件处理三剑客之sed sed工作模式 sed选项 sed中的pattern详解 sed中的编辑命令详解 反向引用 利用sed查找文件内容 利用sed...
  • 概述 shell概念 shell又称命令解释器,...在UNIX或者localhost中,Shell既是用户交互的界面,也是控制系统的脚本语言 shell的种类 CentOS liunx系统默认的shell为bash shell 相关 Bourne ...
  • Shell脚本基础架构

    2019-09-07 19:18:17
    介绍了从0开始编写一个完整的shell脚本,必须了解的基础知识,还列举了自动监控磁盘、批量创建文件、测试网络连通性、预防DDos攻击等几个shell脚本的小案例
  • 会不会有那么一天,生活可以简单到每天清早踏上一辆载着鲜花的脚踏车,...在Linux服务器的自动化维护工作中,除了计划任务的设置以外,Shell脚本的应用也是非常重要的一部分。本章将主要学习Shell脚本基础,变量使...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,403
精华内容 2,161
关键字:

python编写shell脚本案例

python 订阅