  • Run every .py file under a path with os.system — the snippet below checks whether the path is a single file or a directory, then shells out `python <file>` for each .py it finds:
    
    import os
    
    # Run every .py file under a path (file or directory), e.g.
    def func(path):
        # first decide whether path is a file or a directory: isfile / isdir
        # a plain file: must end with .py
        if os.path.isfile(path) and path.endswith(".py"):
            # run it  **** remember how to execute a file ****
            os.system("python %s" % path)  # same as typing the command in a cmd window
        # a directory
        elif os.path.isdir(path):
            # list every name inside the directory
            name_list = os.listdir(path)
            for name in name_list:
                f_name = os.path.join(path, name)
                # only names ending with .py
                if f_name.endswith(".py"):
                    # run it
                    os.system("python %s" % f_name)
    func('/xx/home/')

    Source: https://www.cnblogs.com/kenD/p/9519829.html

    Notes:

    Relative paths: if a .py file executed this way uses relative paths, those paths resolve against the launcher's working directory, not against the executed file's own location.
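    Given that note, one way to sidestep the relative-path surprise is to pin each child's working directory to the script's own folder. A minimal sketch with subprocess.run (the name run_all is made up here, not from the original):

```python
import os
import subprocess
import sys

def run_all(path):
    """Run every .py file under path (a file or a directory), pinning each
    child's working directory to the script's own folder so that relative
    paths inside the script resolve against the script, not the launcher."""
    if os.path.isfile(path) and path.endswith(".py"):
        targets = [path]
    elif os.path.isdir(path):
        targets = [os.path.join(path, n) for n in os.listdir(path) if n.endswith(".py")]
    else:
        targets = []
    for target in targets:
        target = os.path.abspath(target)
        # cwd= pins the child's working directory to the script's folder
        subprocess.run([sys.executable, os.path.basename(target)],
                       cwd=os.path.dirname(target))
```

    Using sys.executable instead of the bare string "python" also guarantees the children run under the same interpreter as the launcher.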


    一、 Motivation

    The previous article examined the datax.py file, which left two questions:

    1. What should happen when the user is not on a py2 environment (datax's default requirement)?
    2. Can one script detect the user's Python environment automatically and then run the datax job?

    二、 Result

    Run the following command under either py2 or py3:
    >python datax.py ../job/job.json
    

    Same recipe, same taste: nothing looks different on the surface, yet a lot is now happening behind the scenes.
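    The core of "what happens behind the scenes" can be sketched as a thin entry script that checks sys.version_info and delegates to the matching implementation. The file names datax_py2.py / datax_py3.py below are illustrative placeholders, not the article's actual layout:

```python
import os
import subprocess
import sys

# Hypothetical dispatcher sketch: datax_py2.py / datax_py3.py are
# placeholder names, not DataX's real file layout.
def pick_script():
    # choose the implementation matching the interpreter running right now
    return "datax_py3.py" if sys.version_info[0] >= 3 else "datax_py2.py"

def run(argv):
    here = os.path.dirname(os.path.abspath(__file__))
    # re-launch with the same interpreter and pass the job arguments through
    return subprocess.call([sys.executable, os.path.join(here, pick_script())] + argv)
```

    The user keeps typing `python datax.py ../job/job.json`; the dispatcher picks the right body for their interpreter.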


    三、 The adaptation

    1. Writing the py3 datax scripts

    Three files in total:

    ===datax.py===
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import sys
    import os
    import signal
    import subprocess
    import time
    import re
    import socket
    import json
    from optparse import OptionParser
    from optparse import OptionGroup
    from string import Template
    import codecs
    import platform
    
    
    def isWindows():
        return platform.system() == 'Windows'
    
    
    DATAX_HOME = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
    
    # print("DATAX_HOME==="+DATAX_HOME)
    
    DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
    if isWindows():
        codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
        CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
    else:
        CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
    
    print("CLASS_PATH===" + CLASS_PATH)
    
    LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
    print("LOGBACK_FILE===" + LOGBACK_FILE)
    
    DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
    print("DEFAULT_JVM===" + DEFAULT_JVM)
    
    DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
        DATAX_HOME, LOGBACK_FILE)
    print("DEFAULT_PROPERTY_CONF===" + DEFAULT_PROPERTY_CONF)
    
    ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
        DEFAULT_PROPERTY_CONF, CLASS_PATH)
    print("ENGINE_COMMAND===" + ENGINE_COMMAND)
    
    REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
    print("REMOTE_DEBUG_CONFIG===" + REMOTE_DEBUG_CONFIG)
    
    RET_STATE = {
        "KILL": 143,
        "FAIL": -1,
        "OK": 0, #无错误退出 不传参数时,默认传0
        "RUN": 1, #有错误退出
        "RETRY": 2
    }
    
    
    # Get the local IP address
    def getLocalIp():
        try:
            return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
        except:
            return "Unknown"
    
    
    # Kill the child process and exit (used as a signal handler; handlers
    # receive (signum, frame), so the one-argument version would TypeError)
    def suicide(signum, frame):
        global child_process
        print("[Error] DataX receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)
    
        if child_process:
            child_process.send_signal(signal.SIGQUIT)
            time.sleep(1)
            child_process.kill()
        print >> sys.stderr, "DataX Process was killed ! you did ?"
        sys.exit(RET_STATE["KILL"])
    
    
    def register_signal():
        if not isWindows():
            global child_process
            signal.signal(2, suicide)
            signal.signal(3, suicide)
            signal.signal(15, suicide)
    
    
    # Build the command-line option parser
    def getOptionParser():
        usage = "usage: %prog [options] job-url-or-path"
        parser = OptionParser(usage)  # 定义解析器
    
        # production-environment option group
        prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                         "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                         "Make sure these options can be used in Product Env.")
        prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                      default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
        prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                      help="Set job unique id when running by Distribute/Local Mode.")
        prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                      action="store", default="standalone",
                                      help="Set job runtime mode such as: standalone, local, distribute. "
                                           "Default mode is standalone.")
        prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                      action="store", dest="params",
                                      help='Set job parameter, eg: the source tableName you want to set it by command, '
                                           'then you can use like this: -p"-DtableName=your-table-name", '
                                           'if you have multiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name". '
                                           'Note: you should config in your job tableName with ${tableName}.')
        prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                      action="store", dest="reader", type="string",
                                      help='View job config[reader] template, eg: mysqlreader,streamreader')
        prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                      action="store", dest="writer", type="string",
                                      help='View job config[writer] template, eg: mysqlwriter,streamwriter')
        # add the production option group to the parser
        parser.add_option_group(prodEnvOptionGroup)
    
        # develop/debug option group
        devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                        "Developer use these options to trace more details of DataX.")
        devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                     help="Set to remote debug mode.")
        devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                     default="info", help="Set log level such as: debug, info, all etc.")
        # add the develop/debug option group to the parser
        parser.add_option_group(devEnvOptionGroup)
        return parser
    
    
    # Generate a job config template from the reader/writer names
    def generateJobConfigTemplate(reader, writer):
        readerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (
            reader, reader, reader)
        writerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (
            writer, writer, writer)
        print("readerRef" + readerRef)
        print("writerRef" + writerRef)
        jobGuid = 'Please save the following configuration as a json file and  use\n     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
        print("jobGuid" + jobGuid)
        jobTemplate = {
            "job": {
                "setting": {
                    "speed": {
                        "channel": ""
                    }
                },
                "content": [
                    {
                        "reader": {},
                        "writer": {}
                    }
                ]
            }
        }
        readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
        print("readerTemplatePath===" + readerTemplatePath)
        writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
        print("writerTemplatePath===" + writerTemplatePath)
        # default to empty templates so a missing file does not leave the
        # variables undefined in the assignments below
        readerPar = {}
        writerPar = {}
        try:
            readerPar = readPluginTemplate(readerTemplatePath)
        except Exception:
            print("Read reader[%s] template error: can't find file %s" % (reader, readerTemplatePath))
        try:
            writerPar = readPluginTemplate(writerTemplatePath)
        except Exception:
            print("Read writer[%s] template error: can't find file %s" % (writer, writerTemplatePath))
        jobTemplate['job']['content'][0]['reader'] = readerPar
        jobTemplate['job']['content'][0]['writer'] = writerPar
        print(json.dumps(jobTemplate, indent=4, sort_keys=True))
    
    
    # Read a plugin's job template json
    def readPluginTemplate(plugin):
        with open(plugin, 'r') as f:
            return json.load(f)
    
    
    # Is the given path a URL?
    def isUrl(path):
        if not path:
            return False
    
        assert (isinstance(path, str))
        m = re.match(r"^http[s]?://\S+\w*", path.lower())
        if m:
            return True
        else:
            return False
    
    
    # Build the java launch command line
    def buildStartCommand(options, args):
        commandMap = {}
        tempJVMCommand = DEFAULT_JVM
        if options.jvmParameters:
            tempJVMCommand = tempJVMCommand + " " + options.jvmParameters
    
        if options.remoteDebug:
            tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
            print('local ip: ', getLocalIp())
    
        if options.loglevel:
            tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))
    
        if options.mode:
            commandMap["mode"] = options.mode
    
        # jobResource may be a URL or a local file path (relative or absolute)
        jobResource = args[0]
        if not isUrl(jobResource):
            jobResource = os.path.abspath(jobResource)
            if jobResource.lower().startswith("file://"):
                jobResource = jobResource[len("file://"):]
    
        jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
        if options.params:
            jobParams = jobParams + " " + options.params
    
        if options.jobid:
            commandMap["jobid"] = options.jobid
    
        commandMap["jvm"] = tempJVMCommand
        commandMap["params"] = jobParams
        commandMap["job"] = jobResource
    
        return Template(ENGINE_COMMAND).substitute(**commandMap)
    
    
    def printCopyright():
        print('''
    DataX (%s), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    ''' % DATAX_VERSION)
        sys.stdout.flush()
    
    
    if __name__ == "__main__":
        printCopyright()  # print the banner
        parser = getOptionParser()  # parser for the datax launch options
        options, args = parser.parse_args(sys.argv[1:])
        if options.reader is not None and options.writer is not None:
            # with both -r and -w present, just print a job json template and exit
            generateJobConfigTemplate(options.reader, options.writer)
            sys.exit(RET_STATE['OK'])
        if len(args) != 1:
            parser.print_help()
            sys.exit(RET_STATE['FAIL'])
    
        startCommand = buildStartCommand(options, args)
        print("startCommand===" + startCommand )
    
        child_process = subprocess.Popen(startCommand, True)
        register_signal()
        (stdout, stderr) = child_process.communicate()
    
        sys.exit(child_process.returncode)
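    buildStartCommand above fills the ${...} placeholders in ENGINE_COMMAND via string.Template. A minimal standalone sketch of that mechanism (the values here are shortened stand-ins, not the real defaults):

```python
from string import Template

# Same mechanism as ENGINE_COMMAND: ${...} placeholders filled by
# Template.substitute with a dict of command fragments.
engine = ("java -server ${jvm} -classpath lib/* ${params} "
          "com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}")
commandMap = {
    "jvm": "-Xms1g -Xmx1g",
    "params": "-Dlog.file.name=job_json",
    "mode": "standalone",
    "jobid": "-1",
    "job": "/tmp/job.json",
}
startCommand = Template(engine).substitute(**commandMap)
print(startCommand)
```

    substitute() raises KeyError if any placeholder is missing from the dict, which is why buildStartCommand fills every key before substituting.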
    
    
    ===dxprof.py===
    #! /usr/bin/env python
    # vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:
    
    import re
    import sys
    import time
    
    REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
    REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
    REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
    REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
    REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
    REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
    REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
    REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)
    
    # {{{ function parse_timestamp() #
    def parse_timestamp(line):
        try:
            ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
        except:
            ts = 0
    
        return ts
    
    # }}} #
    
    # {{{ function parse_query_host() #
    def parse_query_host(line):
        ori = REG_SQL_JDBC.search(line)
        if (not ori):
            return ''
    
        ori = ori.group(1).split('?')[0]
        off = ori.find('@')
        if (off > -1):
            ori = ori[off+1:len(ori)]
        else:
            off = ori.find('//')
            if (off > -1):
                ori = ori[off+2:len(ori)]
    
        return ori.lower()
    # }}} #
    
    # {{{ function parse_query_table() #
    def parse_query_table(line):
        ori = REG_SQL_PATH.search(line)
        return (ori and ori.group(1).lower()) or ''
    # }}} #
    
    # {{{ function parse_reader_task() #
    def parse_task(fname):
        global LAST_SQL_UUID
        global LAST_COMMIT_UUID
        global DATAX_JOBDICT
        global DATAX_JOBDICT_COMMIT
        global UNIXTIME
        LAST_SQL_UUID = ''
        DATAX_JOBDICT = {}
        LAST_COMMIT_UUID = ''
        DATAX_JOBDICT_COMMIT = {}
    
        UNIXTIME = int(time.time())
        with open(fname, 'r') as f:
            for line in f.readlines():
                line = line.strip()
    
                if (LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT)):
                    DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
                    LAST_SQL_UUID = ''
    
                if line.find('CommonRdbmsReader$Task') > 0:
                    parse_read_task(line)
                elif line.find('commit blocks') > 0:
                    parse_write_task(line)
                else:
                    continue
    # }}} #
    
    # {{{ function parse_read_task() #
    def parse_read_task(line):
        ser = REG_SQL_UUID.search(line)
        if not ser:
            return
    
        LAST_SQL_UUID = ser.group()
        if REG_SQL_WAKE.search(line):
            DATAX_JOBDICT[LAST_SQL_UUID] = {
                'stat' : 'R',
                'wake' : parse_timestamp(line),
                'done' : UNIXTIME,
                'host' : parse_query_host(line),
                'path' : parse_query_table(line)
            }
        elif ((LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line)):
            DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
            DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
    # }}} #
    
    # {{{ function parse_write_task() #
    def parse_write_task(line):
        global LAST_COMMIT_UUID  # needed so the module-level uuid is updated
        ser = REG_COMMIT_UUID.search(line)
        if not ser:
            return

        LAST_COMMIT_UUID = ser.group()
        if REG_COMMIT_WAKE.search(line):
            DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
                'stat' : 'R',
                'wake' : parse_timestamp(line),
                'done' : UNIXTIME,
            }
        elif ((LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line)):
            DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
            DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
    # }}} #
    
    # {{{ function result_analyse() #
    def result_analyse():
        def compare(a, b):  # py2-style cmp helper (py3's list.sort no longer accepts cmp)
            return b['cost'] - a['cost']
    
        tasklist = []
        hostsmap = {}
        statvars = {'sum' : 0, 'cnt' : 0, 'svr' : 0, 'max' : 0, 'min' : int(time.time())}
        tasklist_commit = []
        statvars_commit = {'sum' : 0, 'cnt' : 0}
    
        for idx in DATAX_JOBDICT:
            item = DATAX_JOBDICT[idx]
            item['uuid'] = idx
            item['cost'] = item['done'] - item['wake']
            tasklist.append(item)
    
            if (not (item['host'] in hostsmap)):
                hostsmap[item['host']] = 1
                statvars['svr'] += 1
    
            if (item['cost'] > -1 and item['cost'] < 864000):
                statvars['sum'] += item['cost']
                statvars['cnt'] += 1
                statvars['max'] = max(statvars['max'], item['done'])
                statvars['min'] = min(statvars['min'], item['wake'])
    
        for idx in DATAX_JOBDICT_COMMIT:
            itemc = DATAX_JOBDICT_COMMIT[idx]
            itemc['uuid'] = idx
            itemc['cost'] = itemc['done'] - itemc['wake']
            tasklist_commit.append(itemc)
    
            if (itemc['cost'] > -1 and itemc['cost'] < 864000):
                statvars_commit['sum'] += itemc['cost']
                statvars_commit['cnt'] += 1
    
        ttl = (statvars['max'] - statvars['min']) or 1
        idx = float(statvars['cnt']) / (statvars['sum'] or ttl)
    
        tasklist.sort(key=lambda t: t['cost'], reverse=True)  # py3: sort takes key=, not cmp
        for item in tasklist:
            print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' %(item['stat'], item['host'], item['path'],
                                                               time.strftime('%H:%M:%S', time.localtime(item['wake'])),
                                                               (('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
                                                               item['cost'], 100 * item['cost'] / ttl, idx * item['cost']))
    
        if (not len(tasklist) or not statvars['cnt']):
            return
    
        print('\n--- DataX Profiling Statistics ---')
        print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' %(statvars['cnt'],
                                                                                                             statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
        print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' %(ttl,
                                                                                                 time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
                                                                                                 time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
                                                                                                 float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))
    
        idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
        tasklist_commit.sort(key=lambda t: t['cost'], reverse=True)
        print('%d task(s) done odps commit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
            statvars_commit['cnt'],
            statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
            idx_commit * tasklist_commit[0]['cost']))
    
    # }}} #
    
    if (len(sys.argv) < 2):
        print("Usage: %s filename" %(sys.argv[0]))
        quit(1)
    else:
        parse_task(sys.argv[1])
        result_analyse()
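    The compare helper in dxprof.py is a py2-style cmp function, which py3's list.sort no longer accepts. A small sketch of the two equivalent migrations (wrap the old cmp with functools.cmp_to_key, or express the order directly as a key):

```python
import functools

def compare(a, b):
    # py2-style cmp: positive when b should sort before a (descending cost)
    return b['cost'] - a['cost']

tasks = [{'cost': 3}, {'cost': 9}, {'cost': 1}]

# option 1: wrap the old cmp function for py3
by_cmp = sorted(tasks, key=functools.cmp_to_key(compare))
# option 2: express the ordering directly as a key
by_key = sorted(tasks, key=lambda t: t['cost'], reverse=True)

print([t['cost'] for t in by_key])
```

    Both produce the same descending-cost order; the key= form is the idiomatic py3 choice.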
    
    
    ===perftrace.py===
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    
    """
       Life's short, Python more.
    """
    
    import re
    import os
    import sys
    import json
    import uuid
    import signal
    import time
    import subprocess
    from optparse import OptionParser
    # the py2-only reload(sys) / sys.setdefaultencoding('utf8') hack is gone:
    # py3 strings are unicode already
    
    ##begin cli & help logic
    def getOptionParser():
        usage = getUsage()
        parser = OptionParser(usage = usage)
        #rdbms reader and writer
        parser.add_option('-r', '--reader', action='store', dest='reader', help='trace datasource read performance with specified !json! string')
        parser.add_option('-w', '--writer', action='store', dest='writer', help='trace datasource write performance with specified !json! string')
    
        parser.add_option('-c', '--channel',  action='store', dest='channel', default='1', help='the number of concurrent sync thread, the default is 1')
        parser.add_option('-f', '--file',   action='store', help='existing datax configuration file, include reader and writer params')
        parser.add_option('-t', '--type',   action='store', default='reader', help='trace which side\'s performance, cooperate with -f --file params, need to be reader or writer')
        parser.add_option('-d', '--delete',   action='store', default='true', help='delete temporary files, the default value is true')
        #parser.add_option('-h', '--help',   action='store', default='true', help='print usage information')
        return parser
    
    def getUsage():
        return '''
    The following params are available for -r --reader:
    [these params are for the rdbms reader, used to trace rdbms read performance; they mirror datax's keys]
        *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
        *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
        *username:       username for datasource
        *password:       password for datasource
        *table:          table name for read data
        column:          column to be read, the default value is ['*']
        splitPk:         the splitPk column of rdbms table
        where:           limit the scope of the performance data set
        fetchSize:       how many rows to be fetched at each communicate
    
    [these params are for the stream reader, used to trace rdbms write performance]
    reader-sliceRecordCount:  how many test records to mock (each channel), the default value is 10000
    reader-column          :  columns the stream reader generates as test data (type supports: string|long|date|double|bool|bytes; constant values and random functions are supported), demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]
    
    The following params are available for -w --writer:
    [these params are for the rdbms writer, used to trace rdbms write performance; they mirror datax's keys]
    datasourceType:  datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
        *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
        *username:       username for datasource
        *password:       password for datasource
        *table:          table name for write data
    column:          column to be written, the default value is ['*']
    batchSize:       how many rows to be stored at each communicate, the default value is 512
    preSql:          prepare sql to be executed before writing data, the default value is ''
    postSql:         post sql to be executed at the end of writing data, the default value is ''
        url:             required for ads, pattern is ip:port
        schme:           required for ads, ads database name
    
    [these params are for the stream writer, used to trace rdbms read performance]
    writer-print:           true means print the data read from the source datasource, the default value is false
    
    The following params are available global control:
        -c --channel:    the number of concurrent tasks, the default value is 1
    -f --file:       path of an existing complete dataX configuration file
    -t --type:       test read or write performance for a datasource, could be reader or writer, in collaboration with -f --file
        -h --help:       print help message
    
    some demo:
    perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
    perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
    perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
    perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
    
    some example jdbc url pattern, may help:
    jdbc:oracle:thin:@ip:port:database
    jdbc:mysql://ip:port/database
    jdbc:sqlserver://ip:port;DatabaseName=database
    jdbc:postgresql://ip:port/database
    warn: ads url pattern is ip:port
    warn: test write performance will write data into your table, you can use a temporary table just for test.
    '''
    
    def printCopyright():
        DATAX_VERSION = 'UNKNOWN_DATAX_VERSION'
        print('''
    DataX Util Tools (%s), From Alibaba !
    Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.''' % DATAX_VERSION)
        sys.stdout.flush()
    
    
    def yesNoChoice():
        yes = set(['yes', 'y', 'ye', ''])
        no = set(['no', 'n'])
        choice = input().lower()  # py3: raw_input was renamed to input
        if choice in yes:
            return True
        elif choice in no:
            return False
        else:
            sys.stdout.write("Please respond with 'yes' or 'no'")
    ##end cli & help logic
    
    
    ##begin process logic
    def suicide(signum, e):
        global childProcess
        print("[Error] Receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)
        if childProcess:
            childProcess.send_signal(signal.SIGQUIT)
            time.sleep(1)
            childProcess.kill()
        print("DataX Process was killed ! you did ?", file=sys.stderr)
        sys.exit(-1)
    
    
    def registerSignal():
        global childProcess
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)
    
    
    def fork(command, isShell=False):
        global childProcess
        childProcess = subprocess.Popen(command, shell = isShell)
        registerSignal()
        (stdout, stderr) = childProcess.communicate()
        # communicate() already blocks until the child exits; wait() only
        # re-fetches the return code
        childProcess.wait()
        return childProcess.returncode
    ##end process logic
    
    
    ##begin datax json generate logic
    #warn: if not '': -> true;   if not None: -> true
    def notNone(obj, context):
        if not obj:
            raise Exception("Configuration property [%s] could not be blank!" % (context))
    
    def attributeNotNone(obj, attributes):
        for key in attributes:
            notNone(obj.get(key), key)
    
    def isBlank(value):
        if value is None or len(value.strip()) == 0:
            return True
        return False
    
    def parsePluginName(jdbcUrl, pluginType):
        import re
        #warn: drds
        name = 'pluginName'
        mysqlRegex = re.compile('jdbc:(mysql)://.*')
        if (mysqlRegex.match(jdbcUrl)):
            name = 'mysql'
        postgresqlRegex = re.compile('jdbc:(postgresql)://.*')
        if (postgresqlRegex.match(jdbcUrl)):
            name = 'postgresql'
        oracleRegex = re.compile('jdbc:(oracle):.*')
        if (oracleRegex.match(jdbcUrl)):
            name = 'oracle'
        sqlserverRegex = re.compile('jdbc:(sqlserver)://.*')
        if (sqlserverRegex.match(jdbcUrl)):
            name = 'sqlserver'
        db2Regex = re.compile('jdbc:(db2)://.*')
        if (db2Regex.match(jdbcUrl)):
            name = 'db2'
        return "%s%s" % (name, pluginType)
    
    def renderDataXJson(paramsDict, readerOrWriter = 'reader', channel = 1):
        dataxTemplate = {
            "job": {
                "setting": {
                    "speed": {
                        "channel": 1
                    }
                },
                "content": [
                    {
                        "reader": {
                            "name": "",
                            "parameter": {
                                "username": "",
                                "password": "",
                                "sliceRecordCount": "10000",
                                "column": [
                                    "*"
                                ],
                                "connection": [
                                    {
                                        "table": [],
                                        "jdbcUrl": []
                                    }
                                ]
                            }
                        },
                        "writer": {
                            "name": "",
                            "parameter": {
                                "print": "false",
                                "connection": [
                                    {
                                        "table": [],
                                        "jdbcUrl": ''
                                    }
                                ]
                            }
                        }
                    }
                ]
            }
        }
        dataxTemplate['job']['setting']['speed']['channel'] = channel
        dataxTemplateContent = dataxTemplate['job']['content'][0]
    
        pluginName = ''
        if paramsDict.get('datasourceType'):
            pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
        elif paramsDict.get('jdbcUrl'):
            pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
        elif paramsDict.get('url'):
            pluginName = 'adswriter'
    
        theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
        dataxPluginParamsContent = dataxTemplateContent.get(readerOrWriter).get('parameter')
        dataxPluginParamsContent.update(paramsDict)
    
        dataxPluginParamsContentOtherSide = dataxTemplateContent.get(theOtherSide).get('parameter')
    
        if readerOrWriter == 'reader':
            dataxTemplateContent.get('reader')['name'] = pluginName
            dataxTemplateContent.get('writer')['name'] = 'streamwriter'
            if paramsDict.get('writer-print'):
                dataxPluginParamsContentOtherSide['print'] = paramsDict['writer-print']
                del dataxPluginParamsContent['writer-print']
            del dataxPluginParamsContentOtherSide['connection']
        if readerOrWriter == 'writer':
            dataxTemplateContent.get('reader')['name'] = 'streamreader'
            dataxTemplateContent.get('writer')['name'] = pluginName
            if paramsDict.get('reader-column'):
                dataxPluginParamsContentOtherSide['column'] = paramsDict['reader-column']
                del dataxPluginParamsContent['reader-column']
            if paramsDict.get('reader-sliceRecordCount'):
                dataxPluginParamsContentOtherSide['sliceRecordCount'] = paramsDict['reader-sliceRecordCount']
                del dataxPluginParamsContent['reader-sliceRecordCount']
            del dataxPluginParamsContentOtherSide['connection']
    
        if paramsDict.get('jdbcUrl'):
            if readerOrWriter == 'reader':
                dataxPluginParamsContent['connection'][0]['jdbcUrl'].append(paramsDict['jdbcUrl'])
            else:
                dataxPluginParamsContent['connection'][0]['jdbcUrl'] = paramsDict['jdbcUrl']
        if paramsDict.get('table'):
            dataxPluginParamsContent['connection'][0]['table'].append(paramsDict['table'])
    
    
        traceJobJson = json.dumps(dataxTemplate, indent = 4)
        return traceJobJson
    
    def isUrl(path):
        if not path:
            return False
        if not isinstance(path, str):
            raise Exception('Configuration file path required for the string, you configure is:%s' % path)
        m = re.match(r"^http[s]?://\S+\w*", path.lower())
        if m:
            return True
        else:
            return False
    
    
    def readJobJsonFromLocal(jobConfigPath):
        jobConfigContent = None
        jobConfigPath = os.path.abspath(jobConfigPath)
        file = open(jobConfigPath)
        try:
            jobConfigContent = file.read()
        finally:
            file.close()
        if not jobConfigContent:
            raise Exception("Your job configuration file read the result is empty, please check the configuration is legal, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
        return jobConfigContent
    
    
    def readJobJsonFromRemote(jobConfigPath):
        # Python 3 moved urlopen into urllib.request, and read() returns bytes
        import urllib.request
        conn = urllib.request.urlopen(jobConfigPath)
        jobJson = conn.read().decode('utf-8')
        return jobJson
    
    def parseJson(strConfig, context):
        try:
            return json.loads(strConfig)
        except Exception as e:
            import traceback
            traceback.print_exc()
            sys.stdout.flush()
        print('%s %s need in line with json syntax' % (context, strConfig), file=sys.stderr)
            sys.exit(-1)
    
    def convert(options, args):
        traceJobJson = ''
        if options.file:
            if isUrl(options.file):
                traceJobJson = readJobJsonFromRemote(options.file)
            else:
                traceJobJson = readJobJsonFromLocal(options.file)
            traceJobDict = parseJson(traceJobJson, '%s content' % options.file)
            attributeNotNone(traceJobDict, ['job'])
            attributeNotNone(traceJobDict['job'], ['content'])
            attributeNotNone(traceJobDict['job']['content'][0], ['reader', 'writer'])
            attributeNotNone(traceJobDict['job']['content'][0]['reader'], ['name', 'parameter'])
            attributeNotNone(traceJobDict['job']['content'][0]['writer'], ['name', 'parameter'])
            if options.type == 'reader':
                traceJobDict['job']['content'][0]['writer']['name'] = 'streamwriter'
                if options.reader:
                    traceReaderDict = parseJson(options.reader, 'reader config')
                    if traceReaderDict.get('writer-print') is not None:
                        traceJobDict['job']['content'][0]['writer']['parameter']['print'] = traceReaderDict.get('writer-print')
                    else:
                        traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
                else:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
            elif options.type == 'writer':
                traceJobDict['job']['content'][0]['reader']['name'] = 'streamreader'
                if options.writer:
                    traceWriterDict = parseJson(options.writer, 'writer config')
                    if traceWriterDict.get('reader-column'):
                        traceJobDict['job']['content'][0]['reader']['parameter']['column'] = traceWriterDict['reader-column']
                    if traceWriterDict.get('reader-sliceRecordCount'):
                        traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = traceWriterDict['reader-sliceRecordCount']
                else:
                    columnSize = len(traceJobDict['job']['content'][0]['writer']['parameter']['column'])
                    streamReaderColumn = []
                    for i in range(columnSize):
                        streamReaderColumn.append({"type": "long", "random": "2,10"})
                    traceJobDict['job']['content'][0]['reader']['parameter']['column'] = streamReaderColumn
                    traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = 10000
            else:
            pass  # do nothing
            return json.dumps(traceJobDict, indent = 4)
        elif options.reader:
            traceReaderDict = parseJson(options.reader, 'reader config')
            return renderDataXJson(traceReaderDict, 'reader', options.channel)
        elif options.writer:
            traceWriterDict = parseJson(options.writer, 'writer config')
            return renderDataXJson(traceWriterDict, 'writer', options.channel)
        else:
            print(getUsage())
            sys.exit(-1)
        #dataxParams = {}
        #for opt, value in options.__dict__.items():
        #    dataxParams[opt] = value
    ##end datax json generate logic
    
    
    if __name__ == "__main__":
        printCopyright()
        parser = getOptionParser()
    
        options, args = parser.parse_args(sys.argv[1:])
        #print options, args
        dataxTraceJobJson = convert(options, args)
    
        # Generated from the MAC address, the current timestamp and a random number, so it is globally unique
        dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
        jobConfigOk = True
        if os.path.exists(dataxJobPath):
            print("file already exists, truncate and rewrite it? %s" % dataxJobPath)
            if yesNoChoice():
                jobConfigOk = True
            else:
                print("exit failed, because of file conflict")
                sys.exit(-1)
        fileWriter = open(dataxJobPath, 'w')
        fileWriter.write(dataxTraceJobJson)
        fileWriter.close()
    
    
        print("trace environments:")
        print("dataxJobPath:  %s" % dataxJobPath)
        dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        print("dataxHomePath: %s" % dataxHomePath)
    
        dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
        print("dataxCommand:  %s" % dataxCommand)
    
        returncode = fork(dataxCommand, True)
        if options.delete == 'true':
            os.remove(dataxJobPath)
        sys.exit(returncode)
    
    

    2 Adapting the package.xml file

    File location: xx\DataX\core\src\main\assembly\package.xml. The main job of this file is to "copy" the scripts from the source tree into target, i.e. it implements the packaging step. package.xml is referenced by xx\DataX\core\pom.xml.

    ===package.xml file===
    <assembly
            xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
        <id></id>
        <formats>
            <format>dir</format>
        </formats>
        <includeBaseDirectory>false</includeBaseDirectory>
        <fileSets>
            <!-- for bin -->
    <!--        <fileSet>-->
    <!--            <directory>src/main/bin</directory>-->
    <!--            <includes>-->
    <!--                <include>*.*</include>-->
    <!--            </includes>-->
    <!--            <excludes>-->
    <!--                <exclude>*.pyc</exclude>-->
    <!--            </excludes>-->
    <!--            <directoryMode>775</directoryMode>-->
    <!--            <outputDirectory>/bin</outputDirectory>-->
    <!--        </fileSet>-->
        
            <fileSet>
                <directory>src/main/bin</directory>
                <includes>
                    <include>*</include>
                </includes>
                <excludes>
                    <exclude>*.pyc</exclude>
                </excludes>
                <directoryMode>775</directoryMode>
                <outputDirectory>/bin</outputDirectory>
            </fileSet>
            <fileSet>
                <directory>src/main/bin/py/py2</directory>
                <includes>
                    <include>*</include>
                </includes>
                <excludes>
                    <exclude>*.pyc</exclude>
                </excludes>
                <directoryMode>775</directoryMode>
                <outputDirectory>/bin/py/py2</outputDirectory>
            </fileSet>
            
            <fileSet>
                <directory>src/main/bin/py/py3</directory>
                <includes>
                    <include>*</include>
                </includes>
                <excludes>
                    <exclude>*.pyc</exclude>
                </excludes>
                <directoryMode>775</directoryMode>
                <outputDirectory>/bin/py/py3</outputDirectory>
            </fileSet>
            <!-- for scripts -->
            <fileSet>
                <directory>src/main/script</directory>
                <includes>
                    <include>*.*</include>
                </includes>
                <directoryMode>775</directoryMode>
                <outputDirectory>/script</outputDirectory>
            </fileSet>
            <!-- for configs -->
            <fileSet>
                <directory>src/main/conf</directory>
                <includes>
                    <include>*.*</include>
                </includes>
                <outputDirectory>/conf</outputDirectory>
            </fileSet>
            <!-- for engine -->
            <fileSet>
                <directory>target/</directory>
                <includes>
                    <include>datax-core-0.0.1-SNAPSHOT.jar</include>
                </includes>
                <outputDirectory>/lib</outputDirectory>
            </fileSet>
    
            <fileSet>
                <directory>src/main/job/</directory>
                <includes>
                    <include>*.json</include>
                </includes>
                <outputDirectory>/job</outputDirectory>
            </fileSet>
    
            <fileSet>
                <directory>src/main/tools/</directory>
                <includes>
                    <include>*.*</include>
                </includes>
                <outputDirectory>/tools</outputDirectory>
            </fileSet>
    
            <fileSet>
                <fileMode>777</fileMode>
                <directory>src/main/tmp</directory>
                <includes>
                    <include>*.*</include>
                </includes>
                <outputDirectory>/tmp</outputDirectory>
            </fileSet>
        </fileSets>
    
        <dependencySets>
            <dependencySet>
                <useProjectArtifact>false</useProjectArtifact>
                <outputDirectory>/lib</outputDirectory>
                <scope>runtime</scope>
            </dependencySet>
        </dependencySets>
    </assembly>
    

    3 Adding a new datax.py

    This datax.py is not the one above; its only job is to detect the user's Python environment and then invoke the matching script.

    ===datax.py (a better implementation is welcome)===
    #!/usr/bin/python
    # -*- coding: UTF-8 -*-
    # Automatically choose the py2 or py3 variant based on the machine's Python version
    
    import sys
    import os
    currentFilePath = os.path.dirname(os.path.abspath(__file__))
    print("currentFilePath===" + currentFilePath)
    
    argvs=sys.argv[1:]
    params = " ".join(argvs)
    print("params===" + params)
    
    if sys.version_info[0] >= 3:
        pyPath = "py3"
    else:
        pyPath = "py2"
    
    execPy = "python " + currentFilePath +os.sep +"py" +os.sep + pyPath + os.sep + "datax.py " + params
    print("execPy===" + execPy)
    os.system(execPy)
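The wrapper above shells out with os.system, which returns a raw wait status rather than the child's exit code. A minimal alternative sketch using subprocess (assuming the same bin/py/py2 and bin/py/py3 layout; pick_py_dir and run_datax are illustrative names, not part of DataX):

```python
import os
import subprocess
import sys

def pick_py_dir(version_info=sys.version_info):
    # Same branch as the wrapper above: "py3" for Python 3+, else "py2"
    return "py3" if version_info[0] >= 3 else "py2"

def run_datax(script_dir, args):
    # Build .../py/py{2,3}/datax.py and run it with the current interpreter
    script = os.path.join(script_dir, "py", pick_py_dir(), "datax.py")
    # subprocess.call returns the child's exit code directly
    return subprocess.call([sys.executable, script] + list(args))
```

Calling sys.exit(run_datax(currentFilePath, sys.argv[1:])) would then propagate DataX's exit code to the caller.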
    

    4 Putting the files in place

    Create the directories in the core module as shown below and move the corresponding files into them. (Ignore datax.bat; it is a work in progress intended to let Windows machines without a Python environment call DataX directly.)
    (screenshot: directory layout in the core module)
    After the project is built and packaged, the result looks like this:

    (screenshot: packaged bin/py output)

    Finally, go off and play; py2 or py3, just running~~~



    Notes:

    1. The source code was lightly modified, mainly for (a) issues flagged by the Alibaba coding-guidelines scanner and (b) clean code;

    2. All the code has been uploaded to GitHub (master and dev branches), free for the taking.


    A few days ago I wrote a few simple Python crawler programs, and wanted to put a window on one to see the effect.


    First, the window: I had not done much GUI work before, so I decided to build a simple UI with Qt. Using the earlier sinanews crawler script as the example, let's build a window that fetches today's Sina headline news.

    After generating the py file from the ui, run it. I just dragged a few widgets in; the main one, a text browser, is used to display the fetched sinanews.

    First, my setup (official downloads): Python 3.3.3, and PyQt5-5.2.1 for Py3.3 (once Python 3.3 is installed, install the matching PyQt; it finds the Python install directory on its own, no need to change it).

    Python 3.3 does not ship with pip; download get-pip.py and run it, and it reports a successful install.

    Next we need to install some required packages. For convenience, add pip to the PATH first; then we can install everything with the pip command.
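A quick way to confirm from Python that pip is actually reachable on the PATH (shutil.which is available from Python 3.3 on):

```python
import shutil

# Prints the full path of the pip executable, or None if it is not on PATH
print(shutil.which("pip"))
```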

    First, here is sina_news.py, so we can see which packages it needs.

    import requests
    from bs4 import BeautifulSoup
    res = requests.get('http://news.sina.com.cn/china/')
    res.encoding = 'utf-8'
    soup = BeautifulSoup(res.text,'html.parser')
    
    for news in soup.select('.news-item'):
        if len(news.select('h2')) > 0:
            h2 = news.select('h2')[0].text
            a = news.select('a')[0]['href']
            time = news.select('.time')[0].text
            print(time,h2,a)

    It imports requests and BeautifulSoup, so install those packages first:

    pip install requests
    
    pip install BeautifulSoup4

    After pasting this code into the window code:

    x.py

    # -*- coding: utf-8 -*-
    
    # Form implementation generated from reading ui file 'x.ui'
    #
    # Created by: PyQt5 UI code generator 5.8.1
    #
    # WARNING! All changes made in this file will be lost!
    import sys
    import requests
    from PyQt5 import QtCore, QtGui, QtWidgets
    from bs4 import BeautifulSoup
    
    class Ui_x(object):
        def getNews():
            res = requests.get('http://news.sina.com.cn/china/')
            res.encoding = 'utf-8'
            soup = BeautifulSoup(res.text,'html.parser')
            title = []
            for news in soup.select('.news-item'):
                if len(news.select('h2')) > 0:
                    h2 = news.select('h2')[0].text
                    title.append(h2)
                    a = news.select('a')[0]['href']
                    time = news.select('.time')[0].text
            return '\n'.join(title)
    
        
        def setupUi(self, x):
            x.setObjectName("x")
            x.resize(841, 749)
            self.timeEdit = QtWidgets.QTimeEdit(x)
            self.timeEdit.setGeometry(QtCore.QRect(310, 10, 141, 31))
            self.timeEdit.setObjectName("timeEdit")
            self.dateEdit = QtWidgets.QDateEdit(x)
            self.dateEdit.setGeometry(QtCore.QRect(100, 10, 191, 31))
            self.dateEdit.setObjectName("dateEdit")
            self.textBrowser = QtWidgets.QTextBrowser(x)
            self.textBrowser.setGeometry(QtCore.QRect(60, 80, 701, 641))
            self.textBrowser.setObjectName("textBrowser")
            self.retranslateUi(x)
            QtCore.QMetaObject.connectSlotsByName(x)
    
        def retranslateUi(self, x):
            _translate = QtCore.QCoreApplication.translate
            x.setWindowTitle(_translate("x", "x"))
    
    if __name__ == '__main__':   
        app = QtWidgets.QApplication(sys.argv)
        Form = QtWidgets.QWidget()
        ui = Ui_x()
        ui.setupUi(Form)
        Form.show()
        ui.textBrowser.setText(Ui_x.getNews())
        sys.exit(app.exec_())

     

    If everything above went smoothly, running x.py with python should now display the window.

    Now for the packaging. I used PyInstaller here; install it first if you haven't:

    pip install pyinstaller

    Once installed, cd in cmd to the directory containing x.py, then run the packaging command:

    pyinstaller -w x.py

    This generates a dist folder next to x.py; the packaged x.exe is inside it. Double-click x.exe to see the result:

    There is plenty of room for improvement, of course, such as selecting a date at the top to fetch that day's headlines.

    This post mainly covers the process of packaging a py file.

    Possible problems

    The packaged program fails to run,
    showing:
    ImportError: No module named 'queue'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
    File "test.py", line 2, in <module>
    File "c:\users\hasee\appdata\local\programs\python\python35-32\lib\site-packages\PyInstaller\loader\pyimod03_importers.py", line 389, in load_module
    exec(bytecode, module.__dict__)
    File "site-packages\requests\__init__.py", line 63, in <module>
    File "c:\users\hasee\appdata\local\programs\python\python35-32\lib\site-packages\PyInstaller\loader\pyimod03_importers.py", line 389, in load_module
    exec(bytecode, module.__dict__)
    File "site-packages\requests\utils.py", line 24, in <module>
    File "c:\users\hasee\appdata\local\programs\python\python35-32\lib\site-packages\PyInstaller\loader\pyimod03_importers.py", line 389, in load_module
    exec(bytecode, module.__dict__)
    File "site-packages\requests\_internal_utils.py", line 11, in <module>
    File "c:\users\hasee\appdata\local\programs\python\python35-32\lib\site-packages\PyInstaller\loader\pyimod03_importers.py", line 389, in load_module
    exec(bytecode, module.__dict__)
    File "site-packages\requests\compat.py", line 11, in <module>
    File "c:\users\hasee\appdata\local\programs\python\python35-32\lib\site-packages\PyInstaller\loader\pyimod03_importers.py", line 389, in load_module
    exec(bytecode, module.__dict__)
    File "site-packages\requests\packages\__init__.py", line 29, in <module>
    ImportError: No module named 'urllib3'
    Failed to execute script test

     

    I did not keep the exact error output at the time; it was caused by a version mismatch:

    My PyInstaller version was 3.2.

    You need to downgrade requests: requests 2.10 packages successfully, while 2.11 does not. Here is the requests 2.10 I used to solve this; I do not know whether the issue will be fixed later. I even dreamed about this bug last night, solved it this morning, and could not be happier. I hope the problems I ran into along the way are of some help to you.
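The 2.10/2.11 boundary above can be encoded in a small guard run before packaging; a hedged sketch (requests_too_new is an illustrative helper, and the threshold comes only from the experience described above):

```python
def requests_too_new(version, limit=(2, 10)):
    # True when the given requests version is newer than the last one
    # observed to package cleanly with PyInstaller 3.2 (see text above)
    parts = tuple(int(p) for p in version.split(".")[:2])
    return parts > limit

# Before running pyinstaller, one could check requests.__version__ and,
# if this returns True, downgrade with: pip install requests==2.10.0
```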

    Reposted from: https://www.cnblogs.com/dearvee/p/6580370.html


    The all_test.py file invokes all the .py files under test_case and automatically runs every case in test_case whose name starts with test, in ASCII order.

    Here is how that looks:

    Generating the test report relies on a third-party module, so next set up the HTMLTestRunner.py file by placing it under C:\Program Files\Python35\Lib.

    First download HTMLTestRunner.py from the netdisk:

    Link: https://pan.baidu.com/s/1foTU9UdAZiV1HBpWbgXF-A
    Extraction code: ky0w

    Then put that file under C:\Program Files\Python35\Lib.

    Then create a Python file and name it all_test.py.


    Below is the code of all_test.py:

    from HTMLTestRunner import HTMLTestRunner
    import unittest,time,os
    test_case_ = os.path.dirname(os.path.realpath(__file__))
    print(test_case_)
    test_case = os.path.join(test_case_,"test_case")
    print(test_case)
    test_report = os.path.join(test_case_,"report")
    print(test_report)
    
    test_list = unittest.defaultTestLoader.discover(test_case,pattern="test*.py")
    
    if __name__ == '__main__':
        now = time.strftime("%Y_%m_%d %H_%M_%S")
        filename = test_report + "\\" + now + "_result.html"
        fp = open(filename,'wb')
        runner = HTMLTestRunner(stream=fp,
                                title=u"your project name here",
                                description="Windows 10, Chrome browser 62.0 (your test environment here)")
        runner.run(test_list)
        fp.close()
    

     A small slip here: to get a successful run like the screenshot below, the test1.py file needs to be renamed to follow the test_ pattern; as shown, I renamed it to test_web.
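For discover(..., pattern="test*.py") in all_test.py to pick a file up, both the file name and its test methods must follow unittest naming conventions. A minimal hypothetical test_web.py:

```python
import unittest

class TestWeb(unittest.TestCase):
    # Method names must start with "test" to be collected by unittest
    def test_homepage_title(self):
        # Placeholder assertion standing in for a real browser check
        self.assertTrue("welcome page".startswith("w"))

if __name__ == '__main__':
    unittest.main(argv=['test_web'], exit=False)
```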
