精华内容
下载资源
问答
  • spark screaming 模拟实战项目实例

    千次阅读 2018-05-03 23:15:43
    由于没有网络日志,我们这里用之前写的python脚本爬取新浪微博热搜模拟产生日志文件,通过kafka和flume整合 将日志定时抽取到 spark上进行处理,微博热搜是十分钟更新一次,我们这里也设置十分钟的定时任务,具体...

    由于没有网络日志,我们这里用之前写的python脚本爬取新浪微博热搜模拟产生日志文件,通过kafka和flume整合 将日志定时抽取到 spark上进行处理,微博热搜是十分钟更新一次,我们这里也设置十分钟的定时任务,具体步骤如下

    第一步

    编写python脚本获取微博热搜 实时排名,主题和url,然后运行测试,代码如下

    #!python2
    # -*- coding:utf-8 -*-
    import urllib,requests,re,sys
    
    #获取热搜源码
    
    weiboHotFile=requests.get('http://s.weibo.com/top/summary')
    weiboHotHtml=weiboHotFile.text
    #正则表达式匹配URL ,找到title
    hotKey=re.compile(r'td class=\\"td_05\\"><a href=\\"\\/weibo\\/(.*?)&Refer=top\\"')
    hotKeyListBe=hotKey.findall(weiboHotHtml)
    rank=1
    #遍历获取的title 列表
    for title in hotKeyListBe:
        #去除干扰数字
        title=title.replace('25','')
        url='http://s.weibo.com/weibo/'+title   
        print(str(rank)+'\t'+(str(urllib.unquote(title.encode('utf-8'))).decode('utf-8'))+'\t'+url+'\n')
        rank+=1    

    运行python脚本



    第二步

    在mysql创建表,包括当天最高排名,主题,url,和日期

    #spark screaming 将数据导入这张表,创建 主题索引,用来加快替换排名的速度
    Create  table weiboHotSearch(
    highest_rank int(4),
    title varchar(100) unique,
    url varchar(100),
    day_date date);
    #创建临时表,每天晚上数据导入表中的当天的数据加载到零时表
    Create table weiboHotSearch_temp(
    highest_rank int(4),
    title varchar(100),
    url varchar(100),
    day_date date);
    


    第三步

    编写代码,实现从kafka实时获取热搜榜,并存入数据库,然后打成jar包

    package com.stanley.sparktest.weibo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.kafka.KafkaUtils
    import kafka.serializer.StringDecoder
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.Row
    import java.util.Date
    import com.stanley.sparktest.sqlUtil.ConnectionPool
    
    object WeiBoHot {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
             .setAppName("Weibo HotSerach Application")
             .setMaster("local[2]")
         // Create SparkContext
         val sc = new SparkContext(sparkConf)
        //5秒获取一次
         val ssc = new StreamingContext(sc, Seconds(5))
        //设置checkPoint
        ssc.checkpoint(".")
        // Kafka Cluster
         val kafkaParam = Map("metadata.broker.list" -> "master:9092")
    
         // Kafka Topics
         val topics = Set("weiboTopic")
     
         // Step 1: Create DStream
         val lineDStream = KafkaUtils.createDirectStream[
           String, String, StringDecoder,StringDecoder](
             ssc, // StreamingContext
             kafkaParam, // kafkaParams: Map[String, String]
             topics // topics: Set[String]
         ).map(tuple => tuple._2)
         // Step 2: DStream Transformation
         val tupleDStream = lineDStream
               .map(line => line.split("\t"))
               .map(arr=>{
                 val rank=arr(0)
                 val title=arr(1)
                 val url=arr(2)
                 val date=arr(3)
                 (rank,title,url,date)
               })
           tupleDStream.foreachRDD(rdd => rdd.foreachPartition(partitionOfRecords =>{
           val connection = ConnectionPool.getConnection()
           partitionOfRecords.foreach(record => {
             println("input data is " + record._1 + "\t" +record._2+"\t"+record._3+"\t"+record._4)    
            
            val sql1="select * from weiboHotSearch where title='"+record._2+"'"
             println("sql:" + sql1)
             val stmt = connection.createStatement        
             val rs=stmt.executeQuery(sql1)
             if(rs.next()){
               //对比之前的排名,更新排名
               var highest_rank=rs.getInt("highest_rank")
               if(record._1.toInt<highest_rank){
                 highest_rank=record._1.toInt
               }
               val sql="update  weiboHotSearch set highest_rank="+highest_rank+" where title='"+record._2+"'"
               println("sql:" + sql)
               stmt.executeUpdate(sql)
             }else{
               val sql="insert into weiboHotSearch values("+record._1+",'"+record._2+"','"+record._3+"','"+record._4+"')"
              println("sql:" + sql)
              stmt.executeUpdate(sql)          
             }      
              
           })
           ConnectionPool.returnConnection(connection)
         }
         ))
         //封装事件
         tupleDStream.print()
         ssc.start()
         ssc.awaitTermination()
         // Stop Context
         ssc.stop()
         sc.stop()
      }
    } 
    
    package com.stanley.sparktest.sqlUtil;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.LinkedList;
    
    public class ConnectionPool {
        private static LinkedList<Connection> connectionQueue;
    
        static {
            try {
                Class.forName("com.mysql.jdbc.Driver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    
        public synchronized static Connection getConnection() {
            try {
                if (connectionQueue == null) {
                    connectionQueue = new LinkedList<Connection>();
                    for (int i = 0; i < 5; i++) {
                        Connection conn = DriverManager.getConnection(
                                "jdbc:mysql://master:3306/test",
                                "root",
                                "123456");
                        connectionQueue.push(conn);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return connectionQueue.poll();
        }
        public static void returnConnection(Connection conn){connectionQueue.push(conn);}
        
    }


    第四步

    整合flume kafka 通过执行tail-F抽取微博热搜榜

    # The configuration file needs to define the sources, 
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent, 
    # in this case called 'agent'
    
    a2.sources = r2
    a2.channels = c2
    a2.sinks = k2
    
    # define sources
    a2.sources.r2.type = exec
    ## 注意一定要执行flume命令的用户对该/var/log/httpd/access_log文件
    ## 具有可读的权限
    a2.sources.r2.command = tail -F /opt/project/weibo/data_`date +"%Y-%m-%d"`
    a2.sources.r2.shell = /bin/bash -c
    
    # define channels
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # define sinks
    #启用设置多级目录,这里按年/月/日/时 2级目录,每个小时生成一个文件夹
    a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
    a2.sinks.k2.brokerList = master:9092
    a2.sinks.k2.topic = weiboTopic
    
    # bind the sources and sinks to the channels
    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2


    第五步

    启动kafka 创建topic ,启动flume

    bin/kafka-server-start.sh config/server.properties
    bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic weiboTopic


    第六步

    创建hive分区表与mysql关联


    用来存放mysql数据
    create external table weiboHotSearch(
    highest_rank int,
    title string,
    url string,
    day_date string
    )partitioned by (year string,month string,day string)
    row format delimited fields terminated by '\t';
    
    临时表,用来导入mysql数据
    create table weiboHotSearch_temp(
    highest_rank int,
    title string,
    url string,
    day_date string
    )row format delimited fields terminated by '\t';


    第七步

    编写shell脚本

    #!/bin/sh
    PATH=/usr/local/bin
    export PATH
    python_dir=/opt/project/weibo/weiboHot.py
    python ${python_dir} 
    

    #!/bin/sh
    #操作脚本
    #使用环境变量
    . /etc/profile
    
    #HDFS数据源目录
    DATA_LOG=/user/hive/warehouse/work0403.db
    
    #当天日期
    TODAY=`date +%Y-%m-%d`
    
    #前一天日期
    YESTERDAY=$(date -d "yesterday" +%Y-%m-%d)
    
    #设置数据库变量
    HOSTNAME=master
    PORT=3306
    USERNAME=root
    PASSWORD=123456
    DATABASE=weibo
    #设置sql语句
    sql_truncate_temp="truncate table ${DATABASE}.weiboHotSearch_temp"
    sql_insert="insert into ${DATABASE}.weiboHotSearch_temp (select * from ${DATABASE}.weiboHotSearch where day_date=\""${TODAY}"\")"
    sql_truncate_main="truncate table ${DATABASE}.weiboHotSearch"
    #将前一天的临时表清空
    mysql -h${HOSTNAME}  -P${PORT}  -u${USERNAME} -p${PASSWORD} -e "${sql_truncate_temp}" --default-character-set=UTF8
    #将主表数据插入临时表
    mysql -h${HOSTNAME}  -P${PORT}  -u${USERNAME} -p${PASSWORD} -e "${sql_insert}" --default-character-set=UTF8
    #主表数据清空
    mysql -h${HOSTNAME}  -P${PORT}  -u${USERNAME} -p${PASSWORD} -e "${sql_truncate_main}" --default-character-set=UTF8
    #清空hive前一天临时表
    hive -e "
    use ${DATABASE};
    truncate table weiboHotSearch_temp;"
    #用sqoop 将临时表数据导入hive 临时表
    sqoop import \
    --connect jdbc:mysql://${HOSTNAME}:${PORT}/${DATABASE} \
    --username ${USERNAME} \
    --password ${PASSWORD} \
    --table weiboHotSearch_temp \
    --num-mappers 1 \
    --fields-terminated-by "\t" \
    --delete-target-dir \
    --hive-database ${DATABASE} \
    --hive-import \
    --hive-table weiboHotSearch_temp
    
    #将hive 临时表数据导入hive主表
    hive -e "
    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nostrick;
    insert into table ${DATABASE}.weiboHotSearch partition(year,month,day)
    select highest_rank,title,url,day_date,substr(day_date,0,4) year,substr(day_date,6,2) month, substr(day_date,9,2) day
    
    from ${DATABASE}.weiboHotSearch_temp;"
    
    #删除前一天数据文件
    rm -rf /opt/project/weibo/data_${YESTERDAY}

    第八步

    测试运行

    启动hdfs
    ${HADOOP_HOME}/sbin/start-dfs.sh
    ${HADOOP_HOME}/sbin/start-yarn.sh
    ${HADOOP_HOME}/sbin/mr-jobhistory-daemon.sh start historyserver 
    启动zookeeper
    ${ZOOKEEPER_HOME}/sbin/zkServer.sh start
    启动hive metastore
    ${HIVE_HOME}/bin/hive --service metastore &
    启动flume
    ${FLUME_HOME}/bin/flume-ng agent --conf conf/ --name a2 --conf-file /opt/project/weibo/flume-kafka_weibo.conf
    启动kafka
    ${KAFKA_HOME}/bin/kafka-server-start.sh config/server.properties
    运行 spark java 包
    ${SPARK_HOME}/bin/spark-submit \
    --class com.stanley.sparktest.weibo.WeiBoHot \
    /opt/project/weibo/weiboHot.jar
    执行python 脚本
    bash /opt/project/weibo/python_shell.sh
    python脚本执行完后执行操作脚本
    bash /opt/project/weibo/operation.sh

    爬取的内容抽取到kafka集群中

    查看数据库,数据已存入数据库

    执行操作脚本后

    数据库中接收数据表被清空,数据转入准备导入和hive 连接的表



    数据已经存入到hive 中



    数据在存入到hdfs 分区当中


    数据文件已经自动产生


    第九步

    设置定时任务

    可以用oozie来调度,

    Crontab调度如下

    #每十分钟爬取一次微博热搜数据
    2,12,22,32,42,52 * * * * bash /opt/project/weibo/python_shell.sh
    #每天23点55分执行操作脚本,把数据存入到hdfs
    55 23 * * * bash /opt/project/weibo/operation.sh
    

    使用hue 页面操作oozie

    创建两个workflow,分别对应python脚本,和操作脚本

    两个脚本分别创建coordinator,频率和上面的crontab一致



    创建bundle,将两个coordinator绑在一起

    最后提交bundle任务 



    展开全文
  • spark操作读取hbase实例

    千次阅读 2017-01-23 16:49:15
    博主项目实践中,经常需要用spark从hbase中读取数据。其中,spark的版本为1.6,hbase的版本为0.98。现在记录一下如何在spark中操作读取hbase中的数据。 对于这种操作型的需求,没有什么比直接上代码更简单明了的了...

    项目github地址:bitcarmanlee easy-algorithm-interview-and-practice
    欢迎大家star,留言,一起学习进步

    博主项目实践中,经常需要用spark从hbase中读取数据。其中,spark的版本为1.6,hbase的版本为0.98。现在记录一下如何在spark中操作读取hbase中的数据。
    对于这种操作型的需求,没有什么比直接上代码更简单明了的了。so,show me the code!

    object Demo extends Logging{
    
      val CF_FOR_FAMILY_USER = Bytes.toBytes("U");
      val CF_FOR_FAMILY_DEVICE = Bytes.toBytes("D")
      val QF_FOR_MODEL = Bytes.toBytes("model")
      val HBASE_CLUSTER = "hbase://xxx/"
      val TABLE_NAME = "xxx";
      val HBASE_TABLE = HBASE_CLUSTER + TABLE_NAME
    
      def genData(sc:SparkContext) = {
        //20161229的数据,rowkey的设计为9999-yyyyMMdd
        val filter_of_1229 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("79838770"))
        //得到qf为w:00-23的数据
        val filter_of_qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator("w"))
    
        val all_filters = new util.ArrayList[Filter]()
        all_filters.add(filter_of_1229)
        all_filters.add(filter_of_qf)
    
    	//hbase多个过滤器
        val filterList = new FilterList(all_filters)
    
        val scan = new Scan().addFamily(CF_FOR_FAMILY_USER)
        scan.setFilter(filterList)
        scan.setCaching(1000)
        scan.setCacheBlocks(false)
    
        val conf = HBaseConfiguration.create()
        conf.set(TableInputFormat.INPUT_TABLE,HBASE_TABLE )
        conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
           sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
        //后面是针对hbase查询结果的具体业务逻辑
        .map()
        ...
    
      def main(args: Array[String]): Unit = {
        val Array(output_path) = args
    
        val sparkConf = new SparkConf().setAppName("demo")
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val sc = new SparkContext(sparkConf)
    
    	genUuidWifi(sc).saveAsTextFile(output_path)
    	sc.stop()
      }
    }
    

    需要注意的一个小点就是如果hbase里有多个过滤器,注意需要使用FilterList。

    展开全文
  • 从Github上获取Spark源码编译Spark项目 源码导入与代码运行 导入源码到Intellij IDEA 16运行实例代码 1. 配置运行参数2. 添加缺失的flume sink源代码3. 添加运行依赖的jars4. 成功运行实例代码 单步调试源...

    工欲善其事,必先利其器,第一篇笔记介绍如何搭建源码研读和代码调试的开发环境。 一些必要的开发工具,请自行提前安装:

    • scala 2.11.8
    • sbt 0.13.12
    • maven 3.3.9
    • git 2.10.2
    • IntelliJ IDEA 2016.3 (scala plugin)

    本人使用macOS 10.12,所有笔记都基于这个系统,但是其他系统也可以很容易找到对应的解决方案,比如IDE的快捷键。

    源码获取与编译

    从Github上获取Spark源码

    可以直接从Spark官方Github仓库拉取。本系列笔记基于Spark 2.1.0这个版本,所以先checkout这个tag,再进行之后的步骤:

    $ git clone git@github.com:apache/spark.git
    $ cd spark
    $ git tag
    $ git checkout v2.1.0
    $ git checkout -b pin-tag-210

    如果想要push自己的commits,也可以fork到自己的Github账号下,再拉取到本地,可以参考我之前的文章:Reading Spark Souce Code in IntelliJ IDEA

    编译Spark项目

    参考官方文档,编译很简单,这里使用4个线程,跳过tests,以此加速编译。这个编译会产生一些必要的源代码,如Catalyst项目下的,所以是必要的一步:

    $ build/mvn -T 4 -DskipTests clean package
    # 编译完成后,测试一下
    $ ./bin/spark-shell

    packageAndRun

    源码导入与代码运行

    导入源码到Intellij IDEA 16

    现在IDEA对scala支持已经比较完善,导入Spark工程非常简单:

    Menu -> File -> Open -> {spark dir}/pom.xml -> Open as Project

    openIDE

    运行实例代码

    导入工程后,介绍一下如何运行Spark项目自带的实例代码,在{spark dir}/examples/目录下,这里以LogQuery为例:

    command + o -> 输入LogQuery打开

    LogQueryCode

    1. 配置运行参数:

    Menu -> Run -> Edit Configurations -> 选择 + -> Application

    参数配置如下: runConfigVM options: -Dspark.master=local代表使用本地模式运行Spark代码,也可以选择其他模式。 保存配置后,可以看到LogQuery在运行选项里了: runLogQuery

    2. 添加缺失的flume sink源代码

    首次运行LogQuery会报错,因为IDE找不到flume依赖的部分源码: flumeSinkError解决方案如下:

    Menu -> File -> Project Structure -> Modules -> spark-streaming-flume-sink_2.11 -> Sources

    1. 把 target目录加入Sources(点击蓝色Sources)
    2. 把子目录sink也加入Sources

    参考下图,注意右边的Source Folders列表: moduleFlumeSink

    3. 添加运行依赖的jars

    再次运行,这次会花费比较长的时间,因为已经可以成功编译LogQuery啦,但是还是没能运行成功,报错如下: missDepJars不要慌,这说明你的代码编译已经成功啦,运行出错的原因是,运行Spark App一般都是通过spark-submit命令,把你的jar运行到已经安装的Spark环境里,也就是所有的Spark依赖都已经有啦,现在你用IDE的方式,就会缺少依赖。

    解决方案如下:

    Menu -> File -> Project Structure -> Modules -> spark-examples_2.11 -> Dependencies 添加依赖 jars -> {spark dir}/spark/assembly/target/scala-2.11/jars/

    addDepJars

    有两点需要注意:

    • jars/*.jar: 这些依赖jars是在第一步编译打包Spark项目的时候产生的,如果这个目录是空的,或者修改了源代码想更新这些jars,可以用同样的命令再次编译Spark:
    $ build/mvn -T 4 -DskipTests clean package
    
    • 从上图中右侧的Scope一栏可以看到,基本上所有依赖jars都是Provided,也就是说默认都是提供的,因为默认都是用spark-submit方式运行Spark App的。

    4. 成功运行实例代码

    终于再次运行LogQuery的时候,可以看到输出啦: runLogQuerySuccessfully

    单步调试源代码

    千辛万苦地终于让实例代码在IDE里跑起来了,是不是很有成就感。其实做了那么多的铺垫工作,在IDE里面运行代码的最大福利是可以单步调试! 很简单,选择断点,然后Run -> Debug,可以看到中间变量值等等,其他的自行探索吧: debug

    展开全文
  • 从Github上获取Spark源码编译Spark项目 源码导入与代码运行 导入源码到Intellij IDEA 16运行实例代码 1. 配置运行参数2. 添加缺失的flume sink源代码3. 添加运行依赖的jars4. 成功运行实例代码 ...

    搭建Spark源码研读和代码调试的开发环境

    工欲善其事,必先利其器,第一篇笔记介绍如何搭建源码研读和代码调试的开发环境。 一些必要的开发工具,请自行提前安装:

    • scala 2.11.8
    • sbt 0.13.12
    • maven 3.3.9
    • git 2.10.2
    • IntelliJ IDEA 2016.3 (scala plugin)


    源码获取与编译

    从Github上获取Spark源码

    可以直接从Spark官方Github仓库拉取。本系列笔记基于Spark 2.0.2这个版本,所以先checkout这个tag,再进行之后的步骤:

    $ git clone git@github.com:apache/spark.git
    $ cd spark
    $ git tag
    $ git checkout v2.0.2 
    $ git checkout -b pin-tag-202

    如果想要push自己的commits,也可以fork到自己的Github账号下,再拉取到本地

    编译Spark项目

    参考官方文档,编译很简单,这里使用4个线程,跳过tests,以此加速编译。这个编译会产生一些必要的源代码,如Catalyst项目下的,所以是必要的一步:

    $ build/mvn -T 4 -DskipTests clean package
    # 编译完成后,测试一下
    $ ./bin/spark-shell

    packageAndRun

    源码导入与代码运行

    导入源码到Intellij IDEA 16

    现在IDEA对scala支持已经比较完善,导入Spark工程非常简单:

    Menu -> File -> Open -> {spark dir}/pom.xml -> Open as Project

    openIDE

    运行实例代码

    导入工程后,介绍一下如何运行Spark项目自带的实例代码,在{spark dir}/examples/目录下,这里以LogQuery为例:

    command + o -> 输入LogQuery打开

    LogQueryCode

    1. 配置运行参数:

    Menu -> Run -> Edit Configurations -> 选择 + -> Application

    参数配置如下: runConfigVM options: -Dspark.master=local代表使用本地模式运行Spark代码,也可以选择其他模式。 保存配置后,可以看到LogQuery在运行选项里了: runLogQuery

    2. 添加缺失的flume sink源代码

    首次运行LogQuery会报错,因为IDE找不到flume依赖的部分源码: flumeSinkError解决方案如下:

    Menu -> File -> Project Structure -> Modules -> spark-streaming-flume-sink_2.11 -> Sources 1. 把 target目录加入Sources(点击蓝色Sources) 2. 把子目录sink也加入Sources

    参考下图,注意右边的Source Folders列表: moduleFlumeSink

    3. 添加运行依赖的jars

    再次运行,这次会花费比较长的时间,因为已经可以成功编译LogQuery啦,但是还是没能运行成功,报错如下: missDepJars不要慌,这说明你的代码编译已经成功啦,运行出错的原因是,运行Spark App一般都是通过spark-submit命令,把你的jar运行到已经安装的Spark环境里,也就是所有的Spark依赖都已经有啦,现在你用IDE的方式,就会缺少依赖。

    解决方案如下:

    Menu -> File -> Project Structure -> Modules -> spark-examples_2.11 -> Dependencies 添加依赖 jars -> {spark dir}/spark/assembly/target/scala-2.11/jars/

    addDepJars

    有两点需要注意:

    • jars/*.jar: 这些依赖jars是在第一步编译打包Spark项目的时候产生的,如果这个目录是空的,或者修改了源代码想更新这些jars,可以用同样的命令再次编译Spark:

      $ build/mvn -T 4 -DskipTests clean package
      
    • 从上图中右侧的Scope一栏可以看到,基本上所有依赖jars都是Provided,也就是说默认都是提供的,因为默认都是用spark-submit方式运行Spark App的。

    4. 成功运行实例代码

    终于再次运行LogQuery的时候,可以看到输出啦: runLogQuerySuccessfully

    单步调试源代码

    千辛万苦地终于让实例代码在IDE里跑起来了,是不是很有成就感。其实做了那么多的铺垫工作,在IDE里面运行代码的最大福利是可以单步调试! 很简单,选择断点,然后Run -> Debug,可以看到中间变量值等等,其他的自行探索吧: debug

    展开全文
  • spark更新广播变量实例

    千次阅读 2017-10-11 00:27:48
    实际项目中需要使用更新广播变量的方式来满足需求,其代码具体如下: @NotProguard public class WbBroadcastWrapper { Broadcast> broadcastBlackWhiteList = null; private static WbBroadcastWrapper obj = ...
  • 本课程我们只针对以上图示的浅蓝色部分内容(即与机器学习相关的内容),通过用真实的智能客户项目系统作为案例(案例附带源码,可以直接做二次开发),主要根据项目实例穿讲机器学习以及相关知识,包括有:数据提取...
  • 目录 ...为每个线程创建独立的格式实例 共用一个格式,但是外部代码进行同步 解决问题:java.lang.NumberFormatException: multiple points 在实时广告流统计模块测试的时候,使用的是数据库连...
  • 使用Google Cloud Platform的Kubernetes引擎进行自然语言处理本文是一个较大的项目的一部分。 如果您还对可伸缩的Web... 不过,您可以在本地计算机上运行它,但是您需要更改代码并替换一些使用的资源。本文面向的是...
  • Spark大型电商项目-用户活跃度分析模块一共实现了五个计算功能,七个实例 1.指定时间内话同次数显多的10个户 2.指定时间即内购买商品全额最多的10个用户 3.量近周期力相对之第一个周啊访问次数增长最识的10个用...
  • 我们自己定义的类,其实默认情况下,都是可以让外界的代码随意创建任意多个实例的但是有些时候,我们不希望外界来随意创建实例,而只是希望一个类,在整个程序运行期间,只有一个实例 任何外界代码,都不能随意创建...
  • spark streaming 实时计算

    2016-10-12 18:37:00
    如何创建spark项目 编写streaming代码示例 如何调试 环境配置: spark 原生语言是scala,我用的是spark-1.4.1-bin-hadoop2.6,可以查阅官方说明,用的是scala-2.10.1。 网上下载 scala-2.10.1 安装包。解压...
  • SessionDetail表实例化 SessionDetail.java ISessionDetailDAO.java UserVisitSessionAnalyzeSpark.java UserVisitSessionAnalyzeSpark.java完整代码 本篇文章将记录用户访问session分析-session随机抽取之...
  • 上一篇文章写了如何通过Maven...一、打开IDE,并加入项目,按如下操作Menu -> File -> Open -> {spark dir}/pom.xml -> Open as Project二、导入spark工程后,实例代码在:在{spark dir}/examples/目录下 。双击“Shi
  • 高职高专大数据技术与应用专业系列教材本书摒弃了传统IT教材结构,采用更加符合学习者认知规律的项目化、任务驱动方式编写,包含7大项目、42个任务、200余个实例。本书提供了大量配套资源,包括搭建好运行环境的...
  • 目录 一、理论依据 1、说明 2、saveAsHadoopFile算子 (1)形式 (2)解析说明 3、MultipleOutputFormat 二、代码实例 1、SparkSaveAsHadoopFiles 2、自定义...在spark实际项目应用中,总会...
  • 基本告一段落,项目也接近尾声,那么整理下spark所有配置参数与优化策略,方便以后开发与配置: Spark安装配置与代码框架 spark-default.conf 配置 spark.executor.instance 参数,向Yarn申请创建的资源池实例数 ...
  • ALS ,ALS-WR算法1

    2016-12-20 17:04:33
    原理介绍:https://github.com/ceys/jdml/wiki/ALS针对wiki中的公式显示整理:http://www.fuqingchuan.com/2015/03/812.htmlspark mllib及代码实例:https://spark.apache.org/docs/latest/mllib-col
  • 在上节中我们学会了建立手机项目,在这节我们将来实践我们第一个实例,虽然只是在界面上显示一句话,但是意义不同证明了我们我们各方面已经到位,随时都可以进行战斗了呵呵。 2.1.1自我介绍 表误会!这里自我...
  • 实时项目统计 javaweb log -> flume -> kafka -> sparkStreaming -> Hbase -> Spring boot -> echart 参考博客 https://blog.csdn.net/qq_27384769/article/details/80220626 20 python/...
  • 基于spring-ladp的统一用户中心结构设计以及代码结构设计 基于ldap实现的统一用户代码 spring data jpa 使用技巧 ta jpa扩展支持动态sql jpa data 对象关联 springboot使用hibernate validator校验 ...
  • Airbnb JavaScript 代码规范(ES6).epub Ajax 专栏.epub Android Gradle 用户指南.epub Android SDK上手指南.epub android SDK开发.epub Android Studio新手完全指引.epub Android Studio详细教程.mobi Android ...
  • Airbnb JavaScript 代码规范(ES6).epub Ajax 专栏.epub Android Gradle 用户指南.epub Android SDK上手指南.epub android SDK开发.epub Android Studio新手完全指引.epub Android Studio详细教程.mobi Android ...
  • awesome-python 是 vinta 发起维护的 Python 资源列表,内容包括:Web 框架、网络爬虫、网络内容提取、模板引擎、数据库、数据可视化、图片处理、文本处理、自然语言处理、机器学习、日志、代码分析等。由「开源前哨...
  • <p>Based on the existing task execution method of DolphinScheduler, a more appropriate way is to use Spark as the execution engine for data quality tasks, pass specific execution SQL to the Spark job ...

空空如也

空空如也

1 2
收藏数 26
精华内容 10
关键字:

spark项目实例代码