• 当数据膨胀到一定规模时,基于机器学习对海量复杂数据的分析更能产生较好的价值,而深度学习大数据场景下更能揭示数据内部的逻辑关系。本文就以大数据作为场景,通过自底向上的教程详述在大数据架构体系中如何应用...

    近几年,信息时代的快速发展产生了海量数据,诞生了无数前沿的大数据技术与应用。在当今大数据时代的产业界,商业决策日益基于数据的分析作出。当数据膨胀到一定规模时,基于机器学习对海量复杂数据的分析更能产生较好的价值,而深度学习在大数据场景下更能揭示数据内部的逻辑关系。本文就以大数据作为场景,通过自底向上的教程详述在大数据架构体系中如何应用深度学习这一技术。大数据架构中采用的是hadoop系统以及Kerberos安全认证,深度学习采用的是分布式的Tensorflow架构,hadoop解决了大数据的存储问题,而分布式Tensorflow解决了大数据训练的问题。本教程是我们团队在开发基于深度学习的实时欺诈预警服务时,部署深度学习这一模块时总结出的经验,感兴趣的欢迎深入交流。

    安装Tensorflow

    我们安装Tensorflow选择的是Centos7,因为Tensorflow需要使用GNU发布的1.5版本的libc库,Centos6系统并不适用该版本库而被抛弃。对于如何联网在线安装Tensorflow,官网有比较详尽的教程。本教程着重讲一下网上资料较少的离线安装方式,系统的安装更需要在意的是各软件版本的一致性,下面教程也是解决了很多版本不一致的问题后给出的一个方案。首先我们先将整个系统搭建起来吧。

    1.安装编程语言Python3.5:在官网下载软件并解压后执行如下安装命令:

    ./configure
     make
     make test
    sudo make install

    2.安装基于Python的科学计算包python-numpy:在官网下载软件并解压后执行如下安装命令:

    python setup.py install

    3.安装Python模块管理的工具wheel:在官网下载软件后执行如下安装命令:

    pip install wheel-0.30.0a0-py2.py3-none-any.whl

    4.安装自动下载、构建、安装和管理 python 模块的工具setuptools:在官网下载软件并解压后执行如下安装命令:

    python setup.py install

    5.安装Python开发包python-devel:在官网下载软件后执行如下安装命令:

    sudo rpm -i --nodeps python3-devel-3.5.2-4.fc25.x86_64.rpm

    6.安装Python包安装管理工具six:在官网下载软件后执行如下安装命令:

    sudo pip install six-1.10.0-py2.py3-none-any.whl

    7.安装Java 开发环境JDK8:在官网下载软件并解压后执行如下移动命令:

    mv java1.8 /usr/local/software/jdk

    设置JDK的环境变量,编辑文件 .bashrc,加入下面内容

    export JAVA_HOME=/usr/local/software/jdk
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=$CLASSPATH:${JAVA_HOME}/lib:${JRE_HOME}/lib
    export PATH=$PATH:${JAVA_HOME}/bin

    进行Java版本的切换,选择对应的版本

    sudo update-alternatives --config java
    sudo update-alternatives --config javac

    8.安装Bazel:Bazel是一个类似于Make的工具,是Google为其内部软件开发的特点量身定制的工具,构建Tensorflow项目。在官网下载后执行如下安装命令:

    chmod +x bazel-0.4.3-installer-linux-x86_64.sh
    ./bazel-0.4.3-installer-linux-x86_64.sh –user

    9.安装Tensorflow:在官网下载软件后执行如下安装命令:

    pip install --upgrade tensorflow-0.12.1-cp35-cp35m-linux_x86_64.whl

    Tensorflow访问HDFS的部署

    1.首先安装Hadoop客户端,在官网下载后执行下面解压移动命令:

    tar zxvf hadoop-2.6.0.tar.gz
    mv hadoop-2.6.0.tar.gz /usr/local/software/Hadoop

    进行环境变量的配置/etc/profile,加入如下内容

    export PATH=$PATH:/usr/local/software/hadoop/bin
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/server
    export HADOOP_HOME=/usr/local/software/hadoop
    export HADOOP_HDFS_HOME=/usr/local/software/hadoop

    配置完后进行配置更新source /etc/profile

    2.其次,安装完客户端后,配置自己的hadoop集群环境文件。

    Tensorflow与Kerberos验证的部署

    在Tesorflow0.12版本中已经支持了Kerberos验证,本机只要配置好Kerberos文件即可使用。该文中不详述Kerberos的配置内容,罗列一下相关的配置流程。

    • 首先在/etc/krb5.conf文件中进行服务器跟验证策略的配置;
    • 然后在Kerberos服务端生成一个用户文件传至本机;
    • 最后进行Kerberos客户端的权限认证并设置定时任务。

    大数据场景下基于分布式Tensorflow的深度学习示例

    一、进行数据格式的转换

    本文的示例是做的MNIST数据的识别模型,为了更好的读取数据更好的利用内存,我们将本地GZ文件转换成Tensorflow的内定标准格式TFRecord,然后再将转换后的文件上传到HDFS存储。在实际应用中,我们实际利用Spark做了大规模格式转换的处理程序。我们对本地数据处理的相应的转换代码为:

    from __future__ import absolute_import
    from __future__ import division
    from __future__ import print_function
    import argparse
    import os
    import tensorflow as tf
    from tensorflow.contrib.learn.python.learn.datasets import mnist
    SOURCE_URL = 'http://yann.lecun.com/exdb/mnist/'
    TRAIN_IMAGES = 'train-images-idx3-ubyte.gz'  # MNIST filenames
    TRAIN_LABELS = 'train-labels-idx1-ubyte.gz'
    TEST_IMAGES = 't10k-images-idx3-ubyte.gz'
    TEST_LABELS = 't10k-labels-idx1-ubyte.gz'
    FLAGS = None
    def _int64_feature(value):
      return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
    
    def _bytes_feature(value):
      return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
    
    def convert_to(data_set, name):
      images = data_set.images
      labels = data_set.labels
      num_examples = data_set.num_examples
      if images.shape[0] != num_examples:
        raise ValueError('Images size %d does not match label size %d.' %
                         (images.shape[0], num_examples))
      rows = images.shape[1]
      cols = images.shape[2]
      depth = images.shape[3]
      filename = os.path.join(FLAGS.directory, name + '.tfrecords')
      print('Writing', filename)
      writer = tf.python_io.TFRecordWriter(filename)
      for index in range(num_examples):
        image_raw = images[index].tostring()
        example = tf.train.Example(features=tf.train.Features(feature={
            'height': _int64_feature(rows),
            'width': _int64_feature(cols),
            'depth': _int64_feature(depth),
            'label': _int64_feature(int(labels[index])),
            'image_raw': _bytes_feature(image_raw)}))
        writer.write(example.SerializeToString())
      writer.close()
    
    def main(argv):
      # Get the data.
      data_sets = mnist.read_data_sets(FLAGS.directory,
                                       dtype=tf.uint8,
                                       reshape=False,
                                       validation_size=FLAGS.validation_size)
      # Convert to Examples and write the result to TFRecords.
      convert_to(data_sets.train, 'train')
      convert_to(data_sets.validation, 'validation')
      convert_to(data_sets.test, 'test')
    if __name__ == '__main__':
      parser = argparse.ArgumentParser()
      parser.add_argument(
          '--directory',
          type=str,
          default='/tmp/data',
          help='Directory to download data files and write the converted result'
      )
      parser.add_argument(
          '--validation_size',
          type=int,
          default=5000,
          help="""\
          Number of examples to separate from the training data for the validation
          set.\
          """
      )
      FLAGS = parser.parse_args()
      tf.app.run()

    二、Tensorflow读取HDFS数据的设置

    文中前面内容介绍了HDFS的配置以及将数据转换后存储到HDFS,Tensorflow读取HDFS时只需要简单的两步,首先执行项目时需要加入环境前缀:

    CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python example.py

    其次读取数据时,需要在数据的路径前面加入HDFS前缀,比如:

    hdfs://default/user/data/example.txt

    三、分布式模型的示例代码

    该示例代码是读取HDFS上的MNIST数据,建立相应的server与work集群构建出一个三层的深度网络,包含两层卷积层以及一层SoftMax层。代码如下:

    from __future__ import print_function
    import math
    import os
    import tensorflow as tf
    flags = tf.app.flags
    # Flags for configuring the task
    flags.DEFINE_string("job_name", None, "job name: worker or ps")
    flags.DEFINE_integer("task_index", 0,
                         "Worker task index, should be >= 0. task_index=0 is "
                         "the chief worker task the performs the variable "
                         "initialization")
    flags.DEFINE_string("ps_hosts", "",
                        "Comma-separated list of hostname:port pairs")
    flags.DEFINE_string("worker_hosts", "",
                        "Comma-separated list of hostname:port pairs")
    # Training related flags
    flags.DEFINE_string("data_dir", None,
                        "Directory where the mnist data is stored")
    flags.DEFINE_string("train_dir", None,
                        "Directory for storing the checkpoints")
    flags.DEFINE_integer("hidden1", 128,
                         "Number of units in the 1st hidden layer of the NN")
    flags.DEFINE_integer("hidden2", 128,
                         "Number of units in the 2nd hidden layer of the NN")
    flags.DEFINE_integer("batch_size", 100, "Training batch size")
    flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
    FLAGS = flags.FLAGS
    TRAIN_FILE = "train.tfrecords"
    NUM_CLASSES = 10
    IMAGE_SIZE = 28
    IMAGE_PIXELS = IMAGE_SIZE * IMAGE_SIZE
    
    def inference(images, hidden1_units, hidden2_units):
      with tf.name_scope('hidden1'):
        weights = tf.Variable(
            tf.truncated_normal([IMAGE_PIXELS, hidden1_units],
                                stddev=1.0 / math.sqrt(float(IMAGE_PIXELS))),name='weights')
        biases = tf.Variable(tf.zeros([hidden1_units]),name='biases')
        hidden1 = tf.nn.relu(tf.matmul(images, weights) + biases)
      with tf.name_scope('hidden2'):
        weights = tf.Variable(
            tf.truncated_normal([hidden1_units, hidden2_units],
                                stddev=1.0 / math.sqrt(float(hidden1_units))),
            name='weights')
        biases = tf.Variable(tf.zeros([hidden2_units]),
                             name='biases')
        hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
      with tf.name_scope('softmax_linear'):
        weights = tf.Variable(
            tf.truncated_normal([hidden2_units, NUM_CLASSES],
                                stddev=1.0 / math.sqrt(float(hidden2_units))),name='weights')
        biases = tf.Variable(tf.zeros([NUM_CLASSES]),name='biases')
        logits = tf.matmul(hidden2, weights) + biases
      return logits
    
    def lossFunction(logits, labels):
      labels = tf.to_int64(labels)
      cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
          logits, labels, name='xentropy')
      loss = tf.reduce_mean(cross_entropy, name='xentropy_mean')
      return loss
    
    def training(loss, learning_rate):
      tf.summary.scalar(loss.op.name, loss)
      optimizer = tf.train.GradientDescentOptimizer(learning_rate)
      global_step = tf.Variable(0, name='global_step', trainable=False)
      train_op = optimizer.minimize(loss, global_step=global_step)
      return train_op
    
    def read_and_decode(filename_queue):
      reader = tf.TFRecordReader()
      _, serialized_example = reader.read(filename_queue)
      features = tf.parse_single_example(
          serialized_example,
          # Defaults are not specified since both keys are required.
          features={
              'image_raw': tf.FixedLenFeature([], tf.string),
              'label': tf.FixedLenFeature([], tf.int64),
          })
    
      # Convert from a scalar string tensor (whose single string has
      # length mnist.IMAGE_PIXELS) to a uint8 tensor with shape
      # [mnist.IMAGE_PIXELS].
      image = tf.decode_raw(features['image_raw'], tf.uint8)
      image.set_shape([IMAGE_PIXELS])
      image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
      # Convert label from a scalar uint8 tensor to an int32 scalar.
      label = tf.cast(features['label'], tf.int32)
      return image, label
    
    def inputs(batch_size):
      """Reads input data.
    
      Args:
        batch_size: Number of examples per returned batch.
      Returns:
        A tuple (images, labels), where:
        * images is a float tensor with shape [batch_size, mnist.IMAGE_PIXELS]
          in the range [-0.5, 0.5].
        * labels is an int32 tensor with shape [batch_size] with the true label,
          a number in the range [0, mnist.NUM_CLASSES).
      """
      filename = os.path.join(FLAGS.data_dir, TRAIN_FILE)
    
      with tf.name_scope('input'):
        filename_queue = tf.train.string_input_producer([filename])
        # Even when reading in multiple threads, share the filename
        # queue.
        image, label = read_and_decode(filename_queue)
        # Shuffle the examples and collect them into batch_size batches.
        # (Internally uses a RandomShuffleQueue.)
        # We run this in two threads to avoid being a bottleneck.
        images, sparse_labels = tf.train.shuffle_batch(
            [image, label], batch_size=batch_size, num_threads=2,
            capacity=1000 + 3 * batch_size,
            # Ensures a minimum amount of shuffling of examples.
            min_after_dequeue=1000)
        return images, sparse_labels
    
    def device_and_target():
      # If FLAGS.job_name is not set, we're running single-machine TensorFlow.
      # Don't set a device.
      if FLAGS.job_name is None:
        raise ValueError("Must specify an explicit `job_name`")
      # Otherwise we're running distributed TensorFlow.
      print("Running distributed training")
      if FLAGS.task_index is None or FLAGS.task_index == "":
        raise ValueError("Must specify an explicit `task_index`")
      if FLAGS.ps_hosts is None or FLAGS.ps_hosts == "":
        raise ValueError("Must specify an explicit `ps_hosts`")
      if FLAGS.worker_hosts is None or FLAGS.worker_hosts == "":
        raise ValueError("Must specify an explicit `worker_hosts`")
      cluster_spec = tf.train.ClusterSpec({
          "ps": FLAGS.ps_hosts.split(","),
          "worker": FLAGS.worker_hosts.split(","),
      })
      server = tf.train.Server(
          cluster_spec, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
      return (
          cluster_spec,
          server,
      )
    
    def main(unused_argv):
      if FLAGS.data_dir is None or FLAGS.data_dir == "":
        raise ValueError("Must specify an explicit `data_dir`")
      if FLAGS.train_dir is None or FLAGS.train_dir == "":
        raise ValueError("Must specify an explicit `train_dir`")
      cluster_spec, server = device_and_target()
      if FLAGS.job_name == "ps":
          server.join()
      elif FLAGS.job_name == "worker":
          with tf.device(tf.train.replica_device_setter(worker_device = "/job:worker/task:{}".format(FLAGS.task_index), cluster=cluster_spec)):
            images, labels = inputs(FLAGS.batch_size)
            logits = inference(images, FLAGS.hidden1, FLAGS.hidden2)
            loss = lossFunction(logits, labels)
            train_op = training(loss, FLAGS.learning_rate)
          with tf.train.MonitoredTrainingSession(
              master=server.target,
              is_chief=(FLAGS.task_index == 0),
              checkpoint_dir=FLAGS.train_dir) as sess:
            while not sess.should_stop():
              sess.run(train_op)
    
    if __name__ == "__main__":
      tf.app.run()

    四、分布式模型的启动

    首先关闭防火墙

    sudo iptable –F

    然后在不同的机器上面启动服务

    #在246.1机器上面运行参数服务器,命令:
    CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=ps --task_index=0
    
    
    #在78.41机器上面运行worker0,命令:
    CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=worker --task_index=0
    
    #在78.45机器上面运行worker1,命令:
    CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py--ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=worker --task_index=1
    
    #在78.41机器上面运行监控,命令:
    tensorboard --logdir=/home/bdusr01/checkpoint/

    五、模型监控

    我们在刚刚的41机器上面启动了TensorBoard,可以通过地址http://10.142.78.41:6006/进行模型的监控。模型训练过程中参数可以动态的进行观测,示例如下:

    图片描述

    模型的网络结构可以详细的参看每个细节,示例如下:

    图片描述

    当我们利用分布式的Tensorflow对大数据进行训练完成后,可以利用Bazel构建一个灵活高可用的服务–TensorFlow Serving,能够很方便的将深度学习生产化,解决了模型无法提供服务的弊端。到此为止,本文就将自己项目中的一个基础模块的示例介绍完了,本项目更有含金量的是模型建立、工程开发、业务逻辑部分,如有机会再进行更详细的交流作者:丁廷鹤,硕士期间在复旦大学计算机学院上海市智能信息重点实验室从事数据挖掘学习,目前在上海一家央企总部工作,从事大数据领域spark全栈模块、机器学习、深度学习方面的开发和研究。

    转载:http://geek.csdn.net/news/detail/136629

    展开全文
  • 深度学习大数据 2019-09-13 14:59:14
    在数据呈指数增长的这个数字世界中,深度学习大数据是最为热门的两个技术趋势。深度学习大数据是数据科学领域相互关联的两个话题,而在技术发展方面,两者紧密关联且同样重要。 数字数据和云存储遵循名为摩尔...
  • 大数据学习框架及指南 2018-07-28 14:42:49
    Hadoop生态圈 一 ,采集,数据从哪里来?主要包括flume等; 一 ,存储,海量的数据怎样有效的存储?主要包括hdfs、Kafka; 二,计算,海量的数据怎样快速计算?主要包括MapReduce、Spark、storm等;...
  • 大数据深度学习 2018-06-01 15:03:33
    大数据是我们现在经常听到的一个词,在互联网时代迅速发展的今天,大数据的应用范围越来越广,但是深度学习这个词对于很多人来说是比较陌生的,深度学习是什么,是一种要求还是一种技术,这种技术与我们日常可能听到...
  • 大数据几个框架整理 2018-03-07 09:55:27
    大数据整理:一、大数据框架: Impala:hadoop的sql平台、支持hbase/hdfs、支持超大数据、支持多并发、sql支持好、对内存依赖比较严重。需要自己优化,并且有的语句超过内存会报错。 Spark:各种格式、各种计算...
  • BigDL:用于大数据的分布式深度学习框架 摘要 在本文中,作者提出了用于大数据平台和大数据工作流的分布式深度学习框架BigDL。它是基于Apache Spark实现的,允许用户将其深度学习应用程序编写为标准的Spark程序(以...
  • Hadoop、Spark等5种大数据框架对比,你的项目该用哪种?  本文将介绍并对比5种主流大数据框架,助你更深层次了解这些框架,从而在项目中更好地使用它们。  本文首发于InfoQ垂直号「大数据杂谈」,转载...
  • Hadoop、Spark等5种大数据框架对比,你的项目该用哪种?  本文将介绍并对比5种主流大数据框架,助你更深层次了解这些框架,从而在项目中更好地使用它们。  本文首发于InfoQ垂直号「大数据杂谈」,转载已获授权。...
  • 进入学习群,获取唐宇迪...本课程从Tensorflow安装开始讲起,从基本计算结构到深度学习各大神经网络,全程案例代码实战,一步步带大家入门如何使用深度学习框架Tensorflow,玩转Tensorflow模型训练、等所有知识点。
  • 大数据开源框架技术汇总 2019-08-13 11:11:41
    主要基于对现阶段一些常用的大数据开源框架技术的整理,只是一些简单的介绍,并不是详细技术梳理。可能会有疏漏,发现再整理。参考的太多,就不一一列出来了。这只是作为一个梳理,对以后选型或者扩展的做个参考。 ...
  • 深度学习框架caffe入门,详解网络配置中每一个层的结构和参数项,对于超参数配置文件详解每一个参数的含义以及选择策略。对于数据源实例演示了两种常用的数据源LMDB和HDF5格式。课程涉及许多caffe框架的小技巧如...
  • 这是深度学习500问系列笔记之一,帮助我深入记忆知识,如有不足,随时欢迎交流和探讨。 5.大数据深度学习的关系? 大数据通常被定义为“超出常用软件工具捕获,管理和处理能力”的数据集。 机器学习关心的问题...
  • 简介 大数据是收集、整理、处理大...本文将介绍大数据系统一个最基本的组件:处理框架。处理框架负责对系统中的数据进行计算,例如处理从非易失存储中读取的数据,或处理刚刚摄入到系统中的数据。数据的计算则
  • 主流深度学习框架对比 2017-02-20 20:20:35
    深度学习研究的热潮持续高涨,各种开源深度学习框架也层出不穷,其中包括TensorFlow、Caffe、Keras、CNTK、Torch7、MXNet、Leaf、Theano、DeepLearning4、Lasagne、Neon,等等。然而TensorFlow却杀出重围,在关注度...
  • 大数据框架 2016-01-13 10:37:20
    2、Hadoop在大数据、云计算中的位置和关系 3、国内外Hadoop应用案例介绍 4、国内Hadoop的就业情况分析及课程大纲介绍 5、分布式系统概述 6、Hadoop生态圈以及各组成部分的简介 7、Hadoop核心MapReduce例子说明 ...
  • 技术型的高科技创业公司都喜欢闪闪发光的新东西,而...2015年数据世界中时尚年轻人喜欢转移到AI的相关概念,他们口味变成:机器智能,深度学习等。 在这里还是要推荐下我自己建的大数据学习交流群:199427210...
  • Apsara Clouder大数据专项技能认证:使用深度学习TensorFlow框架进行图片识别本认证系统的介绍了深度学习的一些基础知识,以及Tensorflow的工作原理。通过阿里云机器学习PAI基于经典的CIFAR-10数据集实现图片识别。...
  • 深度学习与计算机视觉 算法原理、框架应用》全书共13章,分为2篇,第1篇基础知识,第2篇实例精讲。用通俗易懂的文字表达公式背后的原理,实例部分提供了一些工具,很实用。 《大数据架构详解:从数据获取到深度...
1 2 3 4 5 ... 20
收藏数 27,944
精华内容 11,177