精华内容
下载资源
问答
  • 今天带给大家的是从新浪和腾讯爬取股票数据,...2、爬取所有沪深两市股票行情数据,可选择时分线、日k线、周K线、月K线、股票最新行情。3、根据指定的选股策略和指定的日期进行选股测试。4、计算选股测试实际结果(...

    今天带给大家的是从新浪和腾讯爬取股票数据,主要是因为新浪和腾讯的股票数据存储在js中,不需要再重新解析网页源码方便很多。

    今天我们要实现的股票爬取设计内容丰富,包括:

    1、一个股票数据(沪深)爬虫和选股策略测试框架,数据基于腾讯L和新浪财经。

    2、爬取所有沪深两市股票的行情数据,可选择时分线、日k线、周K线、月K线、股票最新行情。

    3、根据指定的选股策略和指定的日期进行选股测试。

    4、计算选股测试实际结果(包括与沪深300指数比较)。

    5、 保存数据到JSON文件、CSV文件。

    6、支持使用表达式定义选股策略。

    7、支持多线程处理。

    使用环境python3.6

    option.py

    首先是参数定义部分option.py,这一部分用于控制代码的运行,主要包括,是否下载数据,是否进行测试,文件保存格式,文件的字符编码、测试的时间范围、抓起的时间起点、抓取的时间终点、股票数据保存路径、线程数目、选股策略文件路径(UTF8编码)、选股策略的数据库名称(不用数据库可以不用关心)、策略表达式(也可以将策略表达式放在文件中)

    #coding:utf-8

    #爬虫参数配置

    import argparse

    import datetime

    #获取偏移指定天数的时间表达式

    def get_date_str(offset):

    if(offset is None):

    offset = 0

    date_str = (datetime.datetime.today() + datetime.timedelta(days=offset)).strftime("%Y%m%d")

    return date_str

    _default = dict(

    reload_data = 'Y', # --reload {Y,N} 是否重新抓取股票数据,默认值:Y

    gen_portfolio = 'Y', # --portfolio {Y,N} 是否生成选股测试结果,默认值:N

    output_type = 'json', # --output {json,csv,all} 输出文件格式,默认值:json

    charset = 'utf-8', # --charset {utf-8,gbk} 输出文件编码,默认值:utf-8

    test_date_range = 60, # --testrange NUM 测试日期范围天数,默认值:50

    start_date = get_date_str(-90), # --startdate yyyy-MM-dd 抓取数据的开始日期,默认值:当前系统日期-100天(例如2015-01-01)

    end_date = get_date_str(None), # --enddate yyyy-MM-dd 抓取数据的结束日期,默认值:当前系统日期

    target_date = get_date_str(None), # --targetdate yyyy-MM-dd 测试选股策略的目标日期,默认值:当前系统日期

    store_path = 'stockholm_export', # --storepath PATH 输出文件路径,默认值:stockholm_export

    thread = 10, # --thread NUM 线程数,默认值:10

    testfile_path = './portfolio_test.txt',# --testfile PATH 选股策略文件路径,默认值:./portfolio_test.txt

    db_name = 'stockholm', #选股策略数据库名称

    methods = '' #选股策略表达式

    )

    parser = argparse.ArgumentParser(description='A stock crawler and portfolio testing framework.')

    parser.add_argument('--reload', type=str, default=_default['reload_data'], dest='reload_data', help='Reload the stock data or not (Y/N), Default: %s' % _default['reload_data'])

    parser.add_argument('--portfolio', type=str, default=_default['gen_portfolio'], dest='gen_portfolio', help='Generate the portfolio or not (Y/N), Default: %s' % _default['gen_portfolio'])

    parser.add_argument('--output', type=str, default=_default['output_type'], dest='output_type', help='Data output type (json/csv/all), Default: %s' % _default['output_type'])

    parser.add_argument('--charset', type=str, default=_default['charset'], dest='charset', help='Data output charset (utf-8/gbk), Default: %s' % _default['charset'])

    parser.add_argument('--testrange', type=int, default=_default['test_date_range'], dest='test_date_range', help='Test date range(days): %s' % _default['test_date_range'])

    parser.add_argument('--startdate', type=str, default=_default['start_date'], dest='start_date', help='Data loading start date, Default: %s' % _default['start_date'])

    parser.add_argument('--enddate', type=str, default=_default['end_date'], dest='end_date', help='Data loading end date, Default: %s' % _default['end_date'])

    parser.add_argument('--targetdate', type=str, default=_default['target_date'], dest='target_date', help='Portfolio generating target date, Default: %s' % _default['target_date'])

    parser.add_argument('--storepath', type=str, default=_default['store_path'], dest='store_path', help='Data file store path, Default: %s' % _default['store_path'])

    parser.add_argument('--thread', type=int, default=_default['thread'], dest='thread', help='Thread number, Default: %s' % _default['thread'])

    parser.add_argument('--testfile', type=str, default=_default['testfile_path'], dest='testfile_path', help='Portfolio test file path, Default: %s' % _default['testfile_path'])

    parser.add_argument('--dbname', type=str, default=_default['db_name'], dest='db_name', help='MongoDB DB name, Default: %s' % _default['db_name'])

    parser.add_argument('--methods', type=str, default=_default['methods'], dest='methods', help='Target methods for back testing, Default: %s' % _default['methods'])

    def main():

    args = parser.parse_args()

    print(args)

    if __name__ == '__main__':

    main()

    股票数据爬取、测试,存储、读取等全部主要功能程序stockholm.py

    #coding:utf-8

    import requests

    import json

    import datetime

    import timeit

    import time

    import io

    import os

    import csv

    import re

    from pymongo import MongoClient

    from multiprocessing.dummy import Pool as ThreadPool

    from functools import partial

    class Stockholm(object):

    def __init__(self, args):

    ## --reload {Y,N} 是否重新抓取股票数据,默认值:Y

    self.reload_data = args.reload_data

    ## --portfolio {Y,N} 是否生成选股测试结果,默认值:N

    self.gen_portfolio = args.gen_portfolio

    ## --output {json,csv,all} 输出文件格式,默认值:json

    self.output_type = args.output_type

    ## --charset {utf-8,gbk} 输出文件编码,默认值:utf-8

    self.charset = args.charset

    ## --testrange NUM 测试日期范围天数,默认值:50

    self.test_date_range = args.test_date_range

    ## --startdate yyyy-MM-dd 抓取数据的开始日期,默认值:当前系统日期-100天(例如2015-01-01)

    self.start_date = args.start_date

    ## --enddate yyyy-MM-dd 抓取数据的结束日期,默认值:当前系统日期

    self.end_date = args.end_date

    ## --targetdate yyyy-MM-dd 测试选股策略的目标日期,默认值:当前系统日期

    self.target_date = args.target_date

    ## --thread NUM 线程数,默认值:10

    self.thread = args.thread

    ## --storepath PATH 输出文件路径,默认值:stockholm_export

    if(args.store_path == 'USER_HOME/tmp/stockholm_export'):

    self.export_folder = os.path.expanduser('~') + '/tmp/stockholm_export'

    else:

    self.export_folder = args.store_path

    ## --testfile PATH 测试文件路径,默认值:./portfolio_test.txt

    self.testfile_path = args.testfile_path

    ## 选股策略的字符串表达式

    self.methods = args.methods

    ## 导出的文件名称

    self.export_file_name = 'stockholm_export'

    #大盘指数

    self.index_array = ['sh000001', 'sz399001', 'sh000300']

    self.sh000001 = {'Symbol': 'sh000001', 'Name': '上证指数'}

    self.sz399001 = {'Symbol': 'sz399001', 'Name': '深证成指'}

    self.sh000300 = {'Symbol': 'sh000300', 'Name': '沪深300'}

    ## self.sz399005 = {'Symbol': '399005.SZ', 'Name': '中小板指'}

    ## self.sz399006 = {'Symbol': '399006.SZ', 'Name': '创业板指'}

    ## mongodb数据库的相关信息

    self.mongo_url = 'localhost'

    self.mongo_port = 27017

    self.database_name = args.db_name

    self.collection_name = 'testing_method'

    #获取csv文件存储时,第一行的列名

    def get_columns(self, quote):

    columns = []

    if(quote is not None):

    for key in quote.keys():

    if(key == 'Data'):

    for data_key in quote['Data'][-1]:

    columns.append("data." + data_key)

    else:

    columns.append(key)

    columns.sort()

    return columns

    #比较两个价格的变化率

    def get_profit_rate(self, price1, price2):

    if(price1 == 0):

    return None

    else:

    return round((price2-price1)/price1, 5)

    #获取均值

    def get_MA(self, number_array):

    total = 0

    n = 0

    for num in number_array:

    if num is not None and num != 0:

    n += 1

    total += num

    return round(total/n, 3)

    #将选股策略字符串表达式,转化为python函数表达式

    def convert_value_check(self, exp):

    val = exp.replace('day', 'quote[\'Data\']').replace('(0)', '(-0)')

    val = re.sub(r'\(((-)?\d+)\)', r'[target_idx\g<1>]', val)

    val = re.sub(r'\.\{((-)?\w+)\}', r"['\g<1>']", val)

    return val

    def convert_null_check(self, exp):

    p = re.compile('\((-)?\d+...\w+\}')

    iterator = p.finditer(exp.replace('(0)', '(-0)'))

    array = []

    for match in iterator:

    v = 'quote[\'Data\']' + match.group()

    v = re.sub(r'\(((-)?\d+)\)', r'[target_idx\g<1>]', v)

    v = re.sub(r'\.\{((-)?\w+)\}', r"['\g<1>']", v)

    v += ' is not None'

    array.append(v)

    val = ' and '.join(array)

    return val

    #KDJ指数类

    class KDJ():

    def _avg(self, array):

    length = len(array)

    return sum(array)/length

    def _getMA(self, values, window):

    array = []

    x = window

    while x <= len(values):

    curmb = 50

    if(x-window == 0):

    curmb = self._avg(values[x-window:x])

    else:

    curmb = (array[-1]*2+values[x-1])/3

    array.append(round(curmb,3))

    x += 1

    return array

    def _getRSV(self, arrays):

    rsv = []

    x = 9

    while x <= len(arrays):

    high = max(map(lambda x: x['High'], arrays[x-9:x]))

    low = min(map(lambda x: x['Low'], arrays[x-9:x]))

    close = arrays[x-1]['Close']

    rsv.append((close-low)/(high-low)*100)

    t = arrays[x-1]['Date']

    x += 1

    return rsv

    #根据股票数据,计算KDJ指数

    def getKDJ(self, quote_data):

    if(len(quote_data) > 12):

    rsv = self._getRSV(quote_data)

    k = self._getMA(rsv,3)

    d = self._getMA(k,3)

    j = list(map(lambda x: round(3*x[0]-2*x[1],3), zip(k[2:], d)))

    for idx, data in enumerate(quote_data[0:12]):

    data['KDJ_K'] = None

    data['KDJ_D'] = None

    data['KDJ_J'] = None

    for idx, data in enumerate(quote_data[12:]):

    data['KDJ_K'] = k[2:][idx]

    data['KDJ_D'] = d[idx]

    if(j[idx] > 100):

    data['KDJ_J'] = 100

    elif(j[idx] < 0):

    data['KDJ_J'] = 0

    else:

    data['KDJ_J'] = j[idx]

    return quote_data

    # 下载所有的股票编号,返回字典数组,每个元素都是股票编号字典

    def load_all_quote_symbol(self):

    print("开始下载所有的股票符号..." + "\n")

    start = timeit.default_timer()

    all_quotes = []

    all_quotes.append(self.sh000001) #将大盘添加到最前面

    all_quotes.append(self.sz399001) #将大盘添加到最前面

    all_quotes.append(self.sh000300) #将大盘添加到最前面

    ## all_quotes.append(self.sz399005) #将大盘添加到最前面

    ## all_quotes.append(self.sz399006) #将大盘添加到最前面

    try:

    count = 1

    while (count < 100):

    para_val = '[["hq","hs_a","",0,' + str(count) + ',500]]'

    r_params = {'__s': para_val}

    all_quotes_url = 'http://money.finance.sina.com.cn/d/api/openapi_proxy.php'

    r = requests.get(all_quotes_url, params=r_params) #根据网址下载所有的股票编号,这里使用的是新浪网址,有很多网址可以下载股票编号

    print("从 "+r.url+" 处下载所有的股票编号")

    if(len(r.json()[0]['items']) == 0): #响应返回的是一个数组,自己打开网址一看便懂

    break

    for item in r.json()[0]['items']:

    quote = {}

    code = item[0] #股票编号

    name = item[2] #股票名称

    # 获取股票编号后再进行每个股票的数据爬取,需要先对编号转换一下格式,后面我们使用腾讯股票接口,编号格式相同,所以这不需要进行转换

    # if(code.find('sh') > -1):

    # code = code[2:] + '.SS'

    # elif(code.find('sz') > -1):

    # code = code[2:] + '.SZ'

    ## 将股票基本信息记入字典

    quote['Symbol'] = code #股票编号

    quote['Name'] = name #股票名称

    all_quotes.append(quote)

    count += 1

    except Exception as e:

    print("Error: 下载股票编号失败..." + "\n")

    print(e)

    print("下载所有的股票编号完成... time cost: " + str(round(timeit.default_timer() - start)) + "s" + "\n")

    return all_quotes

    # 根据指定股票编号,下载此股票的数据,这里使用腾信接口

    def load_quote_data(self, quote, start_date, end_date, is_retry, counter):

    # print("开始下载指定股票的数据..." + "\n")

    start = timeit.default_timer()

    # 直接从腾讯的js接口中读取

    if (quote is not None and quote['Symbol'] is not None):

    try:

    url = 'http://data.gtimg.cn/flashdata/hushen/latest/daily/' + quote['Symbol'] + '.js' # 腾讯的日k线数据

    r = requests.get(url) # 向指定网址请求,下载股票数据

    alldaytemp = r.text.split("\\n\\")[2:] #根据返回的字符串进行处理提取出股票数据的数组形式

    quote_data=[]

    for day in alldaytemp:

    if(len(day)<10): #去掉一些不对的数据,这里去除方法比较笼统.

    continue

    oneday = day.strip().split(' ') #获取日K线的数据。strip为了去除首部的\n,' '来分割数组,分割出来的数据分别是日期、开盘、收盘、最高、最低、成交量

    onedayquote={}

    onedayquote['Date'] = "20"+oneday[0] #腾讯股票数据中时间没有20170513中的20,所以这里加上,方便后面比较

    onedayquote['Open'] = oneday[1] #开盘

    onedayquote['Close'] = oneday[2] #收盘

    onedayquote['High'] = oneday[3] #最高

    onedayquote['Low'] = oneday[4] #最低

    onedayquote['Volume'] = oneday[5] #成交量

    quote_data.append(onedayquote)

    quote['Data'] = quote_data #当前股票每天的数据

    # print(quote_data)

    if (not is_retry):

    counter.append(1)

    except:

    print("Error: 加载指定股票的数据失败... " + quote['Symbol'] + "/" + quote['Name'] + "\n")

    if (not is_retry):

    time.sleep(2)

    self.load_quote_data(quote, start_date, end_date, True,counter) ##这里重试,以免是因为网络问题而导致的下载失败

    print("下载指定股票 " + quote['Symbol'] + "/" + quote['Name'] + " 完成..." + "\n")

    ## print("time cost: " + str(round(timeit.default_timer() - start)) + "s." + "\n")

    ## print("total count: " + str(len(counter)) + "\n")

    return quote

    #根据股票符号,下载所有的股票数据

    def load_all_quote_data(self, all_quotes, start_date, end_date):

    print("开始下载所有股票数据..." + "\n")

    start = timeit.default_timer()

    counter = []

    mapfunc = partial(self.load_quote_data, start_date=start_date, end_date=end_date, is_retry=False, counter=counter) #创建映射函数

    pool = ThreadPool(self.thread) # 开辟包含指定数目线程的线程池

    pool.map(mapfunc, all_quotes) # 多线程执行下载工作

    pool.close()

    pool.join()

    print("下载所有的股票数据完成... time cost: " + str(round(timeit.default_timer() - start)) + "s" + "\n")

    return all_quotes

    #股票数据处理

    def data_process(self, all_quotes):

    print("开始处理所有的股票..." + "\n")

    kdj = self.KDJ()

    start = timeit.default_timer()

    for quote in all_quotes:

    #划分股票类别

    if(quote['Symbol'].startswith('300')):

    quote['Type'] = '创业板'

    elif(quote['Symbol'].startswith('002')):

    quote['Type'] = '中小板'

    else:

    quote['Type'] = '主板'

    if('Data' in quote): #Data字段中存储了股票所需要的所有数据

    try:

    temp_data = []

    for quote_data in quote['Data']: #遍历当前股票每一天的数据

    if(quote_data['Volume'] != '000' or quote_data['Symbol'] in self.index_array): #如果成交量不等于0或股票不是大盘指数才能正常处理

    d = {}

    d['Open'] = float(quote_data['Open']) #开盘价

    ## d['Adj_Close'] = float(quote_data['Adj_Close'])

    d['Close'] = float(quote_data['Close']) #收盘价

    d['High'] = float(quote_data['High']) #最高价

    d['Low'] = float(quote_data['Low']) #最低价

    d['Volume'] = int(quote_data['Volume']) #成交量

    d['Date'] = quote_data['Date'] #时间字符串

    temp_data.append(d)

    quote['Data'] = temp_data

    except KeyError as e:

    print("Data Process: Key Error")

    print(e)

    print(quote)

    ## 计算5天、10天、20天、30天均线

    for quote in all_quotes:

    if('Data' in quote):

    try:

    for i, quote_data in enumerate(quote['Data']):

    if(i > 0):

    quote_data['Change'] = self.get_profit_rate(quote['Data'][i-1]['Close'], quote_data['Close']) #计算股票变化率

    quote_data['Vol_Change'] = self.get_profit_rate(quote['Data'][i-1]['Volume'], quote_data['Volume']) #计算股票成交量变化率

    else:

    quote_data['Change'] = None

    quote_data['Vol_Change'] = None

    last_5_array = []

    last_10_array = []

    last_20_array = []

    last_30_array = []

    for i, quote_data in enumerate(quote['Data']):

    last_5_array.append(quote_data['Close']) #记录收盘价,用来计算股票k日均值

    last_10_array.append(quote_data['Close']) #记录收盘价,用来计算股票k日均值

    last_20_array.append(quote_data['Close']) #记录收盘价,用来计算股票k日均值

    last_30_array.append(quote_data['Close']) #记录收盘价,用来计算股票k日均值

    quote_data['MA_5'] = None

    quote_data['MA_10'] = None

    quote_data['MA_20'] = None

    quote_data['MA_30'] = None

    if(i < 4): #不到5日,不开始计算

    continue

    if(len(last_5_array) == 5):

    last_5_array.pop(0)

    quote_data['MA_5'] = self.get_MA(last_5_array) #计算股票5日均值

    if(i < 9): #不到10日不开始计算

    continue

    if(len(last_10_array) == 10):

    last_10_array.pop(0)

    quote_data['MA_10'] = self.get_MA(last_10_array) #计算股票10日均值

    if(i < 19): #不到20日不开始计算

    continue

    if(len(last_20_array) == 20):

    last_20_array.pop(0)

    quote_data['MA_20'] = self.get_MA(last_20_array) #计算股票20日均值

    if(i < 29): #不到30日不开始计算

    continue

    if(len(last_30_array) == 30):

    last_30_array.pop(0)

    quote_data['MA_30'] = self.get_MA(last_30_array) #计算股票30日均值

    except KeyError as e:

    print("Key Error")

    print(e)

    print(quote)

    ## 计算KDJ指数

    for quote in all_quotes:

    if('Data' in quote):

    try:

    kdj.getKDJ(quote['Data']) #计算股票的KDJ指数

    except KeyError as e:

    print("Key Error")

    print(e)

    print(quote)

    print("所有的股票处理结束... time cost: " + str(round(timeit.default_timer() - start)) + "s" + "\n")

    #数据导出

    def data_export(self, all_quotes, export_type_array, file_name):

    print("开始导出"+str(len(all_quotes))+"个股票数据")

    start = timeit.default_timer()

    directory = self.export_folder

    if(file_name is None):

    file_name = self.export_file_name

    if not os.path.exists(directory): #如果目录不存在

    os.makedirs(directory) #创建目录

    if(all_quotes is None or len(all_quotes) == 0):

    print("没有数据要导出...\n")

    if('json' in export_type_array): #如果导出类型中包含json,就导出json文件

    print("开始导出到json文件...\n")

    f = io.open(directory + '/' + file_name + '.json', 'w', encoding=self.charset) #使用指定的字符编码写入文件

    json.dump(all_quotes, f, ensure_ascii=False)

    if('csv' in export_type_array): #如果导出类型中包含csv,就导出csv文件

    print("开始导出到csv文件...\n")

    columns = []

    if(all_quotes is not None and len(all_quotes) > 0):

    columns = self.get_columns(all_quotes[0]) #获取csv文件第一行的列名

    writer = csv.writer(open(directory + '/' + file_name + '.csv', 'w', encoding=self.charset)) #使用指定的字符编码写入文件

    writer.writerow(columns) #写入列头作为第一行

    for quote in all_quotes: #将数据一次写入每一行

    if('Data' in quote):

    for quote_data in quote['Data']:

    try:

    line = []

    for column in columns:

    if(column.find('data.') > -1):

    if(column[5:] in quote_data):

    line.append(quote_data[column[5:]])

    else:

    line.append(quote[column])

    writer.writerow(line)

    except Exception as e:

    print(e)

    print("write csv error: " + quote)

    if('mongo' in export_type_array):

    print("开始导出到 MongoDB数据库...\n")

    print("导出数据完成.. time cost: " + str(round(timeit.default_timer() - start)) + "s" + "\n")

    def file_data_load(self):

    print("开始从文件中加载数据..." + "\n")

    start = timeit.default_timer()

    directory = self.export_folder

    file_name = self.export_file_name

    f = io.open(directory + '/' + file_name + '.json', 'r', encoding=self.charset)

    json_str = f.readline()

    all_quotes_data = json.loads(json_str)

    print("文件中数据加载"+str(len(all_quotes_data))+"个股票完成... time cost: " + str(round(timeit.default_timer() - start)) + "s" + "\n")

    return all_quotes_data

    #校验日期,检测date时间的股票数据是否已出爬出来了

    def check_date(self, all_quotes, date):

    is_date_valid = False

    for quote in all_quotes:

    if(quote['Symbol'] in self.index_array): #选取大盘数据,是因为大盘数据绝对会有,这就要求你爬虫时一定要把大盘数据获取下来

    for quote_data in quote['Data']: #获取一只股票每一天的数据行情

    if(quote_data['Date'] == date): #爬虫的股票数据中的时间和将要测试的时间要相同

    is_date_valid = True

    return is_date_valid

    if not is_date_valid:

    print(date + " 日期不存在数据...\n")

    return is_date_valid

    #根据选股策略,选择股票

    def quote_pick(self, all_quotes, target_date, methods):

    print(target_date+"选股启动..." + "\n")

    start = timeit.default_timer()

    results = []

    data_issue_count = 0 #有问题股票的数据个数

    for quote in all_quotes:

    try:

    if(quote['Symbol'] in self.index_array): #如果是大盘数据,自动选中

    results.append(quote) #将大盘数据加入到选中集合

    continue

    target_idx = None

    for idx, quote_data in enumerate(quote['Data']): #迭代遍历股票的数据

    if(quote_data['Date'] == target_date): #首先判断股票是否包含目标时间的数据

    target_idx = idx

    if(target_idx is None): #如果股票不包含目标时间数据,就错过该股票

    ## print(quote['Name'] + " data is not available at this date..." + "\n")

    data_issue_count+=1

    continue

    ## 选股策略 ##

    valid = False #默认是无效的

    for method in methods: #测试所有选股策略

    ## print(method['name'])

    ## null_check = eval(method['null_check'])

    try:

    value_check = eval(method['value_check']) #执行选股策略(选股策略就是python代码的字符串表达式)

    if(value_check):

    quote['Method'] = method['name'] #如果选股策略有效,这种将测试方法名加入到股票数据中,为了记录这只股票是通过什么策略选中的

    results.append(quote) #将选中的股票加入到结果集中

    valid = True

    break

    except:

    valid = False

    if(valid):

    continue

    ## 选股策略结束##

    except KeyError as e:

    ## print("KeyError: " + quote['Name'] + " data is not available..." + "\n")

    data_issue_count+=1

    print("选股完成... time cost: " + str(round(timeit.default_timer() - start)) + "s" + "\n")

    print(str(data_issue_count) + " 个股票时间数据有问题...\n")

    return results

    #把根据选股策略选择的股票进行测试

    def profit_test(self, selected_quotes, target_date):

    print("启动股票策略测试..." + "\n")

    start = timeit.default_timer()

    results = []

    INDEX = None #沪深300股票

    INDEX_idx = 0 #股票目标日期的数据在日k线中的索引

    for quote in selected_quotes: #遍历每只选中的股票,找到沪深300指定日期的股票数据

    if(quote['Symbol'] == self.sh000300['Symbol']): #如果是沪深300

    INDEX = quote

    for idx, quote_data in enumerate(quote['Data']): #迭代股票每一天的数据,为了找到指定日期的数据

    if(quote_data['Date'] == target_date):

    INDEX_idx = idx

    break

    for quote in selected_quotes:

    target_idx = None

    if(quote['Symbol'] in self.index_array): #去除大盘股票

    continue

    #获取指定时间在股票数据中的索引,去除不包含指定日期的股票,这么没必要,因为在选取的时候就是按照包含指定日期选取的

    for idx, quote_data in enumerate(quote['Data']): #迭代股票每一天的数据

    if(quote_data['Date'] == target_date):

    target_idx = idx

    if(target_idx is None):

    print(quote['Name'] + " 的股票数据不可处理..." + "\n")

    continue

    test = {}

    test['Name'] = quote['Name'] #股票名称

    test['Symbol'] = quote['Symbol'] #股票编号

    test['Method'] = quote['Method'] #选股策略

    test['Type'] = quote['Type'] #股票类型:创业板,中小板,主板

    if('KDJ_K' in quote['Data'][target_idx]):

    test['KDJ_K'] = quote['Data'][target_idx]['KDJ_K'] #kdj指数

    test['KDJ_D'] = quote['Data'][target_idx]['KDJ_D'] #kdj指数

    test['KDJ_J'] = quote['Data'][target_idx]['KDJ_J'] #kdj指数

    test['Close'] = quote['Data'][target_idx]['Close'] #收盘价

    test['Change'] = quote['Data'][target_idx]['Change'] #变化率

    test['Vol_Change'] = quote['Data'][target_idx]['Vol_Change'] #成交量变化率

    test['MA_5'] = quote['Data'][target_idx]['MA_5'] #5日均值

    test['MA_10'] = quote['Data'][target_idx]['MA_10'] #10日均值

    test['MA_20'] = quote['Data'][target_idx]['MA_20'] #20日均值

    test['MA_30'] = quote['Data'][target_idx]['MA_30'] #30日均值

    test['Data'] = [{}]

    for i in range(1,11): #这里为什么用11

    if(target_idx+i >= len(quote['Data'])): #如果预测日期超出数据范围,则结束

    print(quote['Name'] + " 的数据在 "+target_date+"后" + str(i) + " 天的测试存在问题..." + "\n")

    break

    day2day_profit = self.get_profit_rate(quote['Data'][target_idx]['Close'], quote['Data'][target_idx+i]['Close']) #获取多日股价变化率

    test['Data'][0]['Day_' + str(i) + '_Profit'] = day2day_profit #记录股价变化率数据

    if(INDEX_idx+i < len(INDEX['Data'])):

    day2day_INDEX_change = self.get_profit_rate(INDEX['Data'][INDEX_idx]['Close'], INDEX['Data'][INDEX_idx+i]['Close'])

    test['Data'][0]['Day_' + str(i) + '_INDEX_Change'] = day2day_INDEX_change

    test['Data'][0]['Day_' + str(i) + '_Differ'] = day2day_profit-day2day_INDEX_change

    results.append(test)

    print("选股测试完成... time cost: " + str(round(timeit.default_timer() - start)) + "s" + "\n")

    return results

    #下载数据

    def data_load(self, start_date, end_date, output_types):

    all_quotes = self.load_all_quote_symbol() #下载所有的股票符号

    print("共 " + str(len(all_quotes)) + " 股票符号下载完成..." + "\n")

    self.load_all_quote_data(all_quotes, start_date, end_date) #下载所有的股票数据

    self.data_process(all_quotes) #数据处理

    self.data_export(all_quotes, output_types, None) #数据导出

    #数据预测

    def data_test(self, target_date, date_range, output_types):

    ## 加载选股策略

    methods = []

    path = self.testfile_path #选股策略文件路径

    ## 从mongodb数据库加载选股策略

    if(path == 'mongodb'):

    print("从Mongodb加载测试方法...\n")

    client = MongoClient(self.mongo_url, self.mongo_port)

    db = client[self.database_name]

    col = db[self.collection_name]

    q = None

    if(len(self.methods) > 0):

    applied_methods = list(map(int, self.methods.split(',')))

    q = {"method_id": {"$in": applied_methods}}

    for doc in col.find(q, ['name','desc','method']):

    print(doc)

    m = {'name': doc['name'], 'value_check': self.convert_value_check(doc['method'])}

    methods.append(m)

    ## 从文件加载选股策略(文件要求utf8编码)

    else:

    if not os.path.exists(path):

    print("选股策略测试文件不存在,测试取消...\n")

    return

    f = io.open(path, 'r', encoding='utf-8') #在保存选股策略文件时,记得保存成utf8编码格式的

    for line in f: #每行一个选股策略

    if(line.startswith('##') or len(line.strip()) == 0): #去除注释行

    continue

    line = line.strip().strip('\n')

    name = line[line.find('[')+1:line.find(']:')] #提取本次测试的代号,也就是每行最前面的[]里面的内容

    value = line[line.find(']:')+2:] #提取[]:后的内容——选股策略

    m = {'name': name, 'value_check': self.convert_value_check(value)}

    methods.append(m)

    if(len(methods) == 0):

    print("没有发现测试方法,测试取消...\n")

    return

    ## 启动选股策略测试

    all_quotes = self.file_data_load() #从文件中加载股票数据

    target_date_time = datetime.datetime.strptime(target_date, "%Y%m%d") #将目标时间字符串转化为时间对象

    for i in range(date_range):

    date = (target_date_time - datetime.timedelta(days=i)).strftime("%Y%m%d") #将未来时间字符串转换为指定时间格式

    is_date_valid = self.check_date(all_quotes, date) #校验时间

    if is_date_valid:

    selected_quotes = self.quote_pick(all_quotes, date, methods) #根据选策略选取股票

    res = self.profit_test(selected_quotes, date) #测试股票

    self.data_export(res, output_types, 'result_' + date) #测试结果导出

    def run(self):

    ## 输出数据类型

    output_types = []

    if(self.output_type == "json"):

    output_types.append("json")

    elif(self.output_type == "csv"):

    output_types.append("csv")

    elif(self.output_type == "all"):

    output_types = ["json", "csv"]

    ## 根据选定的日期范围抓取所有沪深两市股票的行情数据。

    if(self.reload_data == 'Y'):

    print("开始下载股票数据...\n")

    self.data_load(self.start_date, self.end_date, output_types)

    # 测试和生成选择策略

    if(self.gen_portfolio == 'Y'):

    print("开始选股测试...\n")

    self.data_test(self.target_date, self.test_date_range, output_types)

    启动文件main.py

    #coding:utf-8

    from stockholm import Stockholm

    import option

    import os

    def checkFoldPermission(path):

    try:

    if not os.path.exists(path):

    os.makedirs(path)

    else:

    txt = open(path + os.sep + "test.txt","w")

    txt.write("test")

    txt.close()

    os.remove(path + os.sep + "test.txt")

    except Exception as e:

    print(e)

    return False

    return True

    def main():

    args = option.parser.parse_args()

    if not checkFoldPermission(args.store_path): #检测是否具有读写存储文件的权限

    print('\n没有文件读写权限: %s' % args.store_path)

    else:

    print('股票数据爬虫和预测启动...\n')

    stockh = Stockholm(args) #初始化参数

    stockh.run() #启动

    print('股票数据爬虫和预测完成...\n')

    if __name__ == '__main__':

    main()

    展开全文
  • <div><h1>问题反馈 当您安装/使用QUANTAXIS的时候如果遇到任何问题, 您可以在这里提出,我们会在24小时内给您答复 您使用的QUANTAXIS版本号是什么? <pre><code> </code></pre> 您的系统信息(包括系统版本,系统架构...
  • 本文介绍如何使用采集器的智能模式,实时采集东方财富网行情中心新三板股票数据采集工具简介:后羿采集器是一款基于人工智能技术的网页采集器,只需要输入网址就能够自动识别网页数据,无需配置即可完成数据采集,是...

    本文介绍如何使用采集器的智能模式,实时采集东方财富网行情中心新三板股票数据

    采集工具简介:

    后羿采集器是一款基于人工智能技术的网页采集器,只需要输入网址就能够自动识别网页数据,无需配置即可完成数据采集,是业内首家支持三种操作系统(包括Windows、Mac和Linux)的网络爬虫软件。

    该软件是一款真正免费的数据采集软件,对采集结果导出没有任何限制,没有编程基础的小白用户也可轻松实现数据采集要求。

    官方网址:http://www.houyicaiji.com/

    采集对象简介:

    东方财富网是中国访问量最大、影响力最大的财经证券门户网站之一。 东方财富网致力于打造专业、权威、为用户着想的财经媒。东方财富网始终坚持网站内容的权威性和专业性,打造中国财经航母。网站内容涉及财经、股票、基金、期货、债券、外汇、银行、保险等诸多金融资讯与财经信息,全面覆盖财经领域,每日更新上万条最新数据及资讯,为用户提供便利的查询。

    官网网址:http://www.eastmoney.com/

    采集字段:

    代码、标题链接、成交额、名称、昨收、成交量、涨跌幅、今开、最高、最低 委比、最新价、涨跌额、采集时间

    功能点目录:

    什么是定时采集

    什么是自动入库

    采集结果预览:

    东方财富网行情中心股票数据导出到Excel

    东方财富网行情中心股票数据导出到数据库

    下面我们来详细介绍一下如何采集东方财富网行情中心新三板的股票数据,具体步骤如下:

    步骤一:下载安装后羿采集器,并注册登录

    1、打开后羿采集器官网,下载并安装最新版的后羿采集器

    2、点击注册登录,注册新账号,登录后羿采集器

    【温馨提示】您可以直接使用此款爬虫软件,不需要进行注册,但是匿名账户下的任务在切换到注册用户时会丢失,因此建议您注册后使用。后羿采集器为神箭手旗下产品,神箭手用户可直接登录。

    步骤二:新建采集任务

    1、复制东方财富网网址(需要搜索结果页的网址,而不是首页的网址)

    点此了解关于如何正确地输入网址。

    2、新建智能模式采集任务

    您可以在软件上直接新建采集任务,也可以通过导入规则来创建任务。

    点此了解如何导入和导出采集规则。

    步骤三:配置采集规则

    1、设置提取数据字段

    在智能模式下,我们输入网址后软件即可自动识别出页面上的数据并生成采集结果,每一类数据对应一个采集字段,我们可以右击字段进行相关设置,包括修改字段名称、增减字段、处理数据等。

    点此了解如何对采集字段进行配置。

    2、增加特殊字段

    由于我们需要实时采集数据,可以在字段内增加一个“采集时间”的特殊字段。

    步骤四:设置并启动采集任务

    1、设置采集任务

    完成了采集字段的添加,我们可以开始启动采集任务了。在启动之前我们需要对采集任务进行一些设置,从而提高采集的稳定性和成功率。

    点击“设置”按钮,在弹出的运行设置页面中我们可以进行运行设置和防屏蔽设置,这里我们勾选“跳过继续采集”,设置“2”秒请求等待时间,勾选“不加载网页图片”,防屏蔽设置就按照系统默认设置,然后点击保存。

    点此深入了解如何对采集任务进行配置。

    2、启动采集任务

    (1)设置定时采集

    我们需要实时采集股票信息,个人专业版及以上用户可以设置定时采集功能实时采集,我们设置隔一个小时自动采集一次数据。

    普通用户可以设置一个固定时间点去运行采集任务。

    (2)自动入库

    个人专业版及以上用户在使用定时采集时搭配自动入库使用,可将采集到的数据实时发送到数据库,方便用户使用数据。

    点此了解更多自动入库的详细内容。

    3、运行任务提取数据

    系统开始自动采集数据,等待一会儿,便可看到采集到的东方财富网行情中心股票的数据。首次运行结束之后,任务会每隔一个小时自动运行一次,并且会将数据自动发布到数据库。

    注意:软件关闭任务就无法运行,若要实时采集数据请不要关闭后羿采集器。

    步骤五:导出并查看数据

    数据采集完成后,可以导出数据,选择导出的文件类型,点击“确认导出”。

    如果没有使用数据库的用户,需要查看新下载下来数据,可以“右击任务—查看数据”。

    展开全文
  • 股票软件都提供自定义公式的功能,但因为常规股票软件都是点播数据,自定义公式对分时行情完全无效,要想更细腻的分析,只能找股票实时行情接口 一般来说,有如下3种版本 屌丝版 去门户财经频道采集数据,...

    股票软件都提供自定义公式的功能,但因为常规股票软件都是点播数据,自定义公式对分时行情完全无效,要想更细腻的分析,只能找股票实时行情接口

    一般来说,有如下3种版本

    1. 屌丝版

      去门户财经频道采集数据,百度,sina,搜狐,网易,和讯都有。首推sina,页面结构良好,同时提供js接口和动态gif绘制

      • json数据:
        1. >>curl http://hq.sinajs.cn/list=sh600133
        2. >>var hq_str_sh600133="东湖高新,5.41,5.38,5.43,5.46,5.35,5.43,5.44,2596381,1401227
        3. 6,15200,5.43,27310,5.42,8900,5.41,17700,5.40,13400,5.39,73295,5.44,82100,5.45,40
        4. 000,5.46,18100,5.47,59700,5.48,2013-04-17,15:03:10,00";
      • 日k线图: http://image.sinajs.cn/newchart/daily/n/sh600133.gif
      • 东湖高新K线图

      sina提供js data url 有很多,到sina网站上仔细研究即可

      另外 http://www.webxml.com.cn/zh_cn/web_services.aspx 通过webservice的方法提供了行情数据,免费的限制了每天的请求次数

      这两种方法都很屌丝,费电费流量,sina提供js接口数据只是为了自家渲染页面用,处理能力有限。只适合用来练习编程

    2. 破解版

      各种股票软件都可以下载历史行情数据,目前他们是以自定义文件格式存放这些数据,有爱好者研究了这些格式,分析通达信的居多,相关资料如下

      这种方法也比较费电,淘宝上有excel格式的历史行情数据出售,不贵

    3. 官方版

      1)寻找合作券商,接入券商量化接口,但一般有资金门槛

      2)直接去证交所买,Level 1 实时行情会员价大概一年30多万 (以官方价格为准),Level 2行情还需要对终端收费,如果做一个正式的财经网站,这是最好的选择

    获取股票实时行情最佳方案

    对个人玩家来说,上面3种方法要么不稳定,要么太贵,其实还有一种性价比更高的方法

    互联网当中存在几家股票行情信息供应商,分别是

    它们历史悠久,非常低调,这些数据接口在淘宝上包年价 100-200元不等,提供sdk(.h头文件和dll),编程难度一般,希望有爱好者能够用ctype把这些接口包装一下,毕竟用python处理数据要容易很多

    同时,这股票实时行情数据源还能够以全推的方式推送行情数据,这样才可以实现 对分时行情编写公式进行筛选

    tips:什么是全推和点播

    交易所的实时行情由于历史原因,一直都是dbf文件格式,比如说上海交易所的实时行情就是著名的show2003.dbf

    行情数据传输以前是通过亚洲X号卫星组网传输,现在是光纤为主,卫星备网

    另外还有一种电视图文卡,也提供行情信息,插在电视上之后,可以在电视上播放行情信息,这种形式很快就会消失了

    参考:https://www.cnblogs.com/timxgb/p/4735436.html

    转载于:https://www.cnblogs.com/GavinSimons/p/10802221.html

    展开全文
  • 可以利用tushare这个库,这个库拥有丰富的数据内容,包括股票、基金、期货、数字货币等,完成了数据从采集、清洗到存储的全过程,能够为金融分析人员提供整洁、多样、便于分析的数据,下面我简单介绍一下这个库的...

    可以利用tushare这个库,这个库拥有丰富的数据内容,包括股票、基金、期货、数字货币等,完成了数据从采集、清洗到存储的全过程,能够为金融分析人员提供整洁、多样、便于分析的数据,下面我简单介绍一下这个库的安装和使用过程,实验环境win10+python3.6+pycharm5.0,主要内容如下:

    1.安装tushare,这个直接在cmd窗口pip install安装就行,如下:

    2.新版的tushare使用,需要到官网注册,获取token后,才能使用,注册的话,直接到官网注册就行,地址https://tushare.pro/,输入必要信息就行,如下:

    登陆成功后,进入“个人主页”,点击TOKEN,获取token,后面的程序中都要使用到这个token,如下:

    3.接着就是获取股票行情信息了,这里tushare官网提供了非常简单入门的示例,初学者很容易就能掌握,如下,这里简单介绍一下:获取股票日线行情数据,这里主要用到daily这个函数,输入参数为ts_code股票代码、trade_date交易日期、start_date开始日期、end_date结束日期,输出为开盘价、最高价、最低价、涨跌额、成交量等,代码如下:

    程序截图如下:

    获取股票复权因子数据,主要用到adj_factor这个函数,输入参数与daily函数一样,输出为股票代码、交易日期、复权因子等,代码如下:

    程序运行截图:

    获取股票停复牌信息,主要用到suspend这个函数,输入参数ts_code股票代码、suspend_date停牌日期、resume_date复牌日期,输出为股票代码、公告日期、停牌原因等,代码如下:

    程序运行截图如下:

    获取股票每日指标信息,主要用到daily_basic函数,输入参数与daily函数类似,输出为当日收盘价、换手率、市盈率、市销率、总股本、总市值、流通市值等,代码如下:

    程序运行截图:

    目前,就介绍这几个吧,更多示例,可以看tushare官网给出的教程,非常详细,地址https://tushare.pro/document/2,我这里就不详细做介绍了,老版的tushare接口,官方不再维护了,有些还能使用,有些不能正常使用,而且不稳定,建议还是使用新的接口能更好些,至于后期的数据保存,官网也有详细介绍,像存储到csv,excel,mysql等,感兴趣的可以看看,希望以上分享的内容能对你有所帮助吧。

    展开全文
  • 查看采集结果 二、详细过程 1.配置数据库信息 建表语句, 以其中部分字段为例: CREATE TABLE `stockmarket` ( `date` varchar(12) NOT NULL DEFAULT '' COMMENT '时间', `stockCode` ...
  • 编程:获取股票实时行情数据大全

    千次阅读 2015-08-17 01:57:00
    编程:获取股票实时行情数据大全 股票软件都提供自定义公式的功能,但因为常规股票软件都是点播数据,自定义公式对分时行情完全无效,要想更细腻的分析,只能找股票实时行情接口 一般来说,有如下3种版本 ...
  • Tushare简介Tushare是一个免费开源的python财经数据接口包,主要能够实现对股票、期货等金融数据从数据采集、清洗加工到数据存储的过程,能够为金融分析人员提供快速、整洁和多样的便于分析的数据,为他们在数据获取...
  • 主要实现对股票等金融数据从数据采集、清洗加工到数据存储的过程,能够为金融分析人员提供快速、整洁、和多样的便于分析的数据,为他们在数据获取方面极大地减轻工作量,使他们更加专注于策略和模型的研究与实现上。...
  • 《QQ群搜索采集工具V2》是智力软件旗下采集类产品(目前已经升级到V2),...例如:想搜索股票QQ群,可以把股票细分:股票交流、股票行情、股票软件、股票资讯等。如SEO的群,可以分为:SEO优化、网站优化等!(大家可
  • 使用Python下载A股行情的几种方法这几种...主要实现对股票等金融数据从数据采集、清洗加工 到 数据存储的过程,能够为金融分析人员提供快速、整洁、和多样的便于分析的数据,为他们在数据获取方面极大地减轻工作量...
  • 本源码是微交易点位盘/时间盘学习的最佳版本,包含期货/外汇/股票等接口行情数据采集,微盘的涨跌显示,模拟操盘等核心内容代码。
  • 网页可分为信息提供和业务操作类,信息提供如新闻、股票行情之类的网站。业务操作如网上营业厅、OA之类的。当然,也有很多网站同时具有这两种性质,像微博、豆瓣、淘宝这类网站,既提供信息,也实现某些业务。 普通...
  • 草动股票网-ASP.NET整站源码

    热门讨论 2010-08-06 13:28:36
    这不是一个纯粹的采集程序,股票信息是实时获取的。 首页是一个股票搜索引擎,可快速查找到某个股票的详情,无需注册会员, 用户的搜索记录和个股收藏自动保存在Cookie中,下次访问时自动调出。 使用VisualStudio ...
  • 网页可分为信息提供和业务操作类,信息提供如新闻、股票行情之类的网站。业务操作如网上营业厅、OA之类的。当然,也有很多网站同时具有这两种性质,像微博、豆瓣、淘宝这类网站,既提供信息,也实现某些业务。普通...
  • 主要实现对股票等金融数据从数据采集、清洗加工到数据存储的过程,能够为金融分析人员提供快速、整洁和多样的便于分析的数据,为他们在数据获取方面极大地减轻工作量,使他们更加专注于策略和模型的研究与实现上。...
  • 主要实现对股票等金融数据从数据采集、清洗加工 到 数据存储的过程,能够为金融分析人员提供快速、整洁、和多样的便于分析的数据,为他们在数据获取方面极大地减轻工作量,使他们更加专注于策略和模型的研究与实现上...
  • 修正了幻灯图片不能后台添加修改的问题修正了财经视频直播视频不能播放的问题修改滚动行情不能自动更新并改为全球主要股指自动实时更新增强后台木马在线检测系统修正了黑客利用后台或会员上传功能上传木马的BUG修正...
  • Windows API一日一练(53)CreateFile函数

    万次阅读 2007-10-18 22:22:00
    比如每天的股票行情数据需要保存起来,以便生成K线图。比如游戏客户端的LOG需要保存起,以便客户端出错时可以把LOG发送回来分析它出错的原因。比如银行每天进行交易时,也需要把所有交易的数据保存到文件备份起来,...
  • 本实验室主要进行金融数据分析 ,包括股票、基金、债券、...项目介绍: 将现有市场上的股票数据采集到Oracle数据库中,运用各种分析工具和手段,为金融市场的投资提供有效、及时、准确的投资建议。 梦想着我的系统
  • 新增了多达超过100种股票实时行情系统 超过数十个频道及200多个栏目 新增了众多标签及文章可视化管理,让您使用更简单管理更方便 新增了MSSQL数据库为你的网站添加无限动力,Access或MSSQL数据库任选 新增了后台...
  • 可以让用户更方便买卖股票,不担心因行情走势延时而亏损。 2、买入股票。用户可以买入任何一只正在交易的股票。资金即时结算。当天买入的股票次**卖出。权证实行T 0,买卖无限制 3、卖出股票。用户可以卖出持仓的...
  • CreateFile函数

    2016-06-22 06:10:20
    比如每天的股票行情数据需要保存起来,以便生成K线图。比如游戏客户端的LOG需要保存起,以便客户端出错时可以把LOG发送回来分析它出错的原因。比如银行每天进行交易时,也需要把所有交易的数据保存到文件备份起来,...
  • 比如每天的股票行情数据需要保存起来,以便生成K线图。比如游戏客户端的LOG需要保存起,以便客户端出错时可以把LOG发送回来分析它出错的原因。比如银行每天进行交易时,也需要把所有交易的数据保存到文件备份起来,...
  • 主要实现对股票等金融数据从数据采集、清洗加工 到 数据存储的过程,能够为金融分析人员提供快速、整洁、和多样的便于分析的数据,为他们在数据获取方面极大地减轻工作量,使他们更加专注于策略和模型的研究与实现上...

空空如也

空空如也

1 2 3
收藏数 53
精华内容 21
关键字:

股票行情采集