2017-12-14 16:33:13 leen0304 阅读数 1363
  • 玩转Linux:常用命令实例指南

    本课程以简洁易懂的语言手把手教你系统掌握日常所需的Linux知识,每个知识点都会配合案例实战让你融汇贯通 。课程通俗易懂,简洁流畅,适合0基础以及对Linux掌握不熟练的人学习; 注意: 1、本课程本月原价99元,截止2月22日前仅需29元!购课就送5门价值300元的编程课! 2、课程内容不仅是大纲上这些,2月底前老师会继续增加10余节课程,届时会恢复原件!现在购买最划算! 3、购课后登陆csdn学院官网,在课程目录页面即可下载课件。 学完即可轻松应对工作中 85% 以上的 Linux 使用场景 【限时福利】 1)购课后按提示添加小助手,进答疑群,还可获得价值300元的编程大礼包! 2)本课程【现在享受秒杀价29元】 3)本月购买此套餐加入老师答疑交流群,可参加老师的免费分享活动,学习最新技术项目经验。 注意: 1)现在购买至少享受70元优惠; 2)购课后添加微信itxy06,发送订单截图领取300元编程礼包。 --------------------------------------------------------------- 这门课程,绝对不会让你觉得亏! 29元=掌握Linux必修知识+社群答疑+讲师社群分享会+300元编程礼包。   人工智能、物联网、大数据时代,Linux正有着一统天下的趋势,几乎每个程序员岗位,都要求掌握Linux。本课程零基础也能轻松入门。   在这门课中,我们保证你能收获到这些 1)快速掌握 Linux 常用命令及配置 2)Linux核心知识点 3) 文件、进程、磁盘、用户管理,以及网络工具命令、文件传输等 4)Vi/Vim编辑器用法  

    5417 人正在学习 去看看 严宇

sortBy是对标准的RDD进行排序。[在scala语言中,RDD与PairRDD没有太严格的界限]。
sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。


sortBy

源码

  /**
   * RDD.scala
   * Return this RDD sorted by the given key function.
   */
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values
  }

该函数最多可以传三个参数:
第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。

从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的元素,也就变成了Key-Value类型的RDD了,它的实现如下:

def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
  val cleanedF = sc.clean(f)
  map(x => (cleanedF(x), x))
}

Scala版本案例1

针对scala语言的pairRDD操作

def sortBy(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("sortBy")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(List(("a",0),("d",5),("d",2),("c",1),("b",2),("b",0)),4)

  // 按照tuple2的第二个元素降序排列
  val res =rdd.sortBy(_._2,false,2)
  res.foreach(x=> print(x+ " "))
}

结果:(d,5) (d,2) (b,2) (c,1) (a,0) (b,0)


Scala版本案例2

针对scala的rdd操作

def sortBy(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("sortBy")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(List(5,1,9,12),2)

  val res =rdd.sortBy(x=>x,false,1) //其中的x=>x相当于是函数,不能用_代替
  res.foreach(x=> print(x+ " "))
}

结果:12 9 5 1

上面的实例对rdd中的元素进行降序排序。并对排序后的RDD的分区个数进行了修改,上面的res就是排序后的RDD,默认的分区个数是2,而我们对它进行了修改,所以最后变成了1。


Java版本案例

org.apache.spark.api.java.JavaRDD源码如下:

/**
 * Return this RDD sorted by the given key function.
 */
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T] = {
  def fn: (T) => S = (x: T) => f.call(x)
  import com.google.common.collect.Ordering  // shadows scala.math.Ordering
  implicit val ordering = Ordering.natural().asInstanceOf[Ordering[S]]
  implicit val ctag: ClassTag[S] = fakeClassTag
  wrapRDD(rdd.sortBy(fn, ascending, numPartitions))
}

需要注意的是,此处和scala不同,必须是三个参数!

private static void sortBy() {
    SparkConf conf = new SparkConf().setAppName("sortBy").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numList = Arrays.asList(1, 5, 3, 5, 6, 7, 0, 1, 10);
    JavaRDD<Integer> numRdd = sc.parallelize(numList);

    Function<Integer, Integer> fun1 = new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer v1) throws Exception {
            return v1;
        }
    };

    JavaRDD<Integer> res = numRdd.sortBy(fun1, false, 1);
    res.foreach(x -> System.out.println(x));

    sc.close();
}

结果:10 7 6 5 5 3 1 1 0


sortByKey

sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。
它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的。
 
源码

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。


Scala版本案例

def sortByKey(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("sortBy")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(List((11,0),(10,5),(4,2),(6,1),(20,2),(8,0)))

  val res =rdd.sortByKey(true,1)
  res.foreach(x => print(x + " "))
}

结果:(4,2) (6,1) (8,0) (10,5) (11,0) (20,2)


上面对Key进行了排序。细心的读者可能会问,sortBy函数中的第一个参数可以对排序方式进行重写。为什么sortByKey没有呢?难道只能用默认的排序规则。不是,是有的。其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:
private val ordering = implicitly[Ordering[K]]
他就是默认的排序规则,我们可以对它进行重写,如下:

def sortByKey(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("sortBy")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(List((11,0),(10,5),(4,2),(6,1),(20,2),(8,0)))
  //重写ordering
  implicit val sortIntegersByString = new Ordering[Int]{
    override def compare(x: Int, y: Int): Int = x.toString.compareTo(y.toString)
  }
  val res =rdd.sortByKey(true,1)
  res.foreach(x => print(x + " "))
}

结果:(10,5) (11,0) (20,2) (4,2) (6,1) (8,0)


Java版本案例

源码

def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
  implicit val ordering = comp // 允许比较器隐式转换为排序。
  fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
}

def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(): JavaPairRDD[K, V]


public static void sortByKey() {
    SparkConf conf = new SparkConf().setAppName("sortBy").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Tuple2<String, Integer>> scoreList = Arrays.asList(
            new Tuple2<String, Integer>("1", 90),
            new Tuple2<String, Integer>("2", 60),
            new Tuple2<String, Integer>("21", 60),
            new Tuple2<String, Integer>("3", 50)
    );

    JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(scoreList);

    // 自定义比较器:直接这样在内部定义是不正确的
    // 抛出异常Task not serializable: java.io .NotSerializableException:
    // 解决办法,另外定义一个小class ,implements Comparator<String>,Serializable

/*      
    Comparator<String> comparator = new Comparator<String>() {
        @Override
        public int compare(String o1, String o2) {
            return Integer.valueOf(o1).compareTo(Integer.valueOf(o2));
        }
    };
*/
    JavaPairRDD<String, Integer> res = pairRDD.sortByKey();
    //JavaPairRDD<String, Integer> res = pairRDD.sortByKey(new MyComparator());

    res.foreach(x -> System.out.print(x + " "));
    sc.close();
}

自定义比较器:

public class MyComparator implements Comparator<String>,Serializable {
    @Override
    public int compare(String o1, String o2) {
        return Integer.valueOf(o1).compareTo(Integer.valueOf(o2));
    }
}

不使用比较器结果:(1,90) (2,60) (21,60) (3,50)
使用比较器结果:(1,90) (2,60) (21,60) (3,50)


二次排序

Spark Java 二次排序

Spark Scala 二次排序

2020-01-10 13:30:55 haoxuan06 阅读数 12
  • 玩转Linux:常用命令实例指南

    本课程以简洁易懂的语言手把手教你系统掌握日常所需的Linux知识,每个知识点都会配合案例实战让你融汇贯通 。课程通俗易懂,简洁流畅,适合0基础以及对Linux掌握不熟练的人学习; 注意: 1、本课程本月原价99元,截止2月22日前仅需29元!购课就送5门价值300元的编程课! 2、课程内容不仅是大纲上这些,2月底前老师会继续增加10余节课程,届时会恢复原件!现在购买最划算! 3、购课后登陆csdn学院官网,在课程目录页面即可下载课件。 学完即可轻松应对工作中 85% 以上的 Linux 使用场景 【限时福利】 1)购课后按提示添加小助手,进答疑群,还可获得价值300元的编程大礼包! 2)本课程【现在享受秒杀价29元】 3)本月购买此套餐加入老师答疑交流群,可参加老师的免费分享活动,学习最新技术项目经验。 注意: 1)现在购买至少享受70元优惠; 2)购课后添加微信itxy06,发送订单截图领取300元编程礼包。 --------------------------------------------------------------- 这门课程,绝对不会让你觉得亏! 29元=掌握Linux必修知识+社群答疑+讲师社群分享会+300元编程礼包。   人工智能、物联网、大数据时代,Linux正有着一统天下的趋势,几乎每个程序员岗位,都要求掌握Linux。本课程零基础也能轻松入门。   在这门课中,我们保证你能收获到这些 1)快速掌握 Linux 常用命令及配置 2)Linux核心知识点 3) 文件、进程、磁盘、用户管理,以及网络工具命令、文件传输等 4)Vi/Vim编辑器用法  

    5417 人正在学习 去看看 严宇

一、前置知识详解
Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以创建DataFrame,
Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。
二、Spark SQL读写数据代码实战

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
 
import java.util.ArrayList;
import java.util.List;
 
public class SparkSQLLoadSaveOps {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
  JavaSparkContext sc = new JavaSparkContext(conf);
  SQLContext = new SQLContext(sc);
  /**
   * read()是DataFrameReader类型,load可以将数据读取出来
   */
  DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");
 
  /**
   * 直接对DataFrame进行操作
   * Json: 是一种自解释的格式,读取Json的时候怎么判断其是什么格式?
   * 通过扫描整个Json。扫描之后才会知道元数据
   */
  //通过mode来指定输出文件的是append。创建新文件来追加文件
 peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
 }
}

读取过程源码分析如下:

  1. read方法返回DataFrameReader,用于读取数据。
/**
 * :: Experimental ::
 * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
 * {{{
 *  sqlContext.read.parquet("/path/to/file.parquet")
 *  sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 *
 * @group genericdata
 * @since 1.4.0
 */
@Experimental
//创建DataFrameReader实例,获得了DataFrameReader引用
def read: DataFrameReader = new DataFrameReader(this)
  1. 然后再调用DataFrameReader类中的format,指出读取文件的格式。
 /**
 * Specifies the input data source format.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
 this.source = source
 this
}
  1. 通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame
 /**
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
 option("path", path).load()
}

至此,数据的读取工作就完成了,下面就对DataFrame进行操作。
下面就是写操作!!!

  1. 调用DataFrame中select函数进行对列筛选
 /**
 * Selects a set of columns. This is a variant of `select` that can only select
 * existing columns using column names (i.e. cannot construct expressions).
 *
 * {{{
 *  // The following two are equivalent:
 *  df.select("colA", "colB")
 *  df.select($"colA", $"colB")
 * }}}
 * @group dfops
 * @since 1.3.0
 */
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
  1. 然后通过write将结果写入到外部存储系统中。
 /**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
  1. 在保持文件的时候mode指定追加文件的方式
/**
 * Specifies the behavior when data or table already exists. Options include:
// Overwrite是覆盖
 *  - `SaveMode.Overwrite`: overwrite the existing data.
//创建新的文件,然后追加
 *  - `SaveMode.Append`: append the data.
 *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
 this.mode = saveMode
 this
}
  1. 最后,save()方法触发action,将文件输出到指定文件中。
/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}

三、Spark SQL读写整个流程图如下在这里插入图片描述
四、对于流程中部分函数源码详解

DataFrameReader.Load()

  1. Load()返回DataFrame类型的数据集合,使用的数据是从默认的路径读取
/**
 * Returns the dataset stored at path as a DataFrame,
 * using the default data source configured by spark.sql.sources.default.
 *
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0.
 */
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
//此时的read就是DataFrameReader
 read.load(path)
}
  1. 追踪load源码进去,源码如下:
    在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。
/** 
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
 option("path", path).load()
}
  1. 追踪load源码如下:
/**
 * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
 * key-value stores).
 *
 * @since 1.4.0
 */
def load(): DataFrame = {
//对传入的Source进行解析
 val resolved = ResolvedDataSource(
  sqlContext,
  userSpecifiedSchema = userSpecifiedSchema,
  partitionColumns = Array.empty[String],
  provider = source,
  options = extraOptions.toMap)
 DataFrame(sqlContext, LogicalRelation(resolved.relation))
}

DataFrameReader.format()

  1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。
    Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.
/**
 * Specifies the input data source format.Built-in options include “parquet”,”json”,etc.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
 this.source = source //FileType
 this
}

DataFrame.write()

  1. 创建DataFrameWriter实例
/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
1
  1. 追踪DataFrameWriter源码如下:
    以DataFrame的方式向外部存储系统中写入数据。
/**
 * :: Experimental ::
 * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use [[DataFrame.write]] to access this.
 *
 * @since 1.4.0
 */
@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {

DataFrameWriter.mode()

  1. Overwrite是覆盖,之前写的数据全都被覆盖了。
    Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。
/**
 * Specifies the behavior when data or table already exists. Options include:
 *  - `SaveMode.Overwrite`: overwrite the existing data.
 *  - `SaveMode.Append`: append the data.
 *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
//默认操作
 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
 this.mode = saveMode
 this
}
  1. 通过模式匹配接收外部参数
/**
 * Specifies the behavior when data or table already exists. Options include:
 *  - `overwrite`: overwrite the existing data.
 *  - `append`: append the data.
 *  - `ignore`: ignore the operation (i.e. no-op).
 *  - `error`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: String): DataFrameWriter = {
 this.mode = saveMode.toLowerCase match {
  case "overwrite" => SaveMode.Overwrite
  case "append" => SaveMode.Append
  case "ignore" => SaveMode.Ignore
  case "error" | "default" => SaveMode.ErrorIfExists
  case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
   "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
 }
 this
}

DataFrameWriter.save()

  1. save将结果保存传入的路径。
/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}
  1. 追踪save方法。
/**
 * Saves the content of the [[DataFrame]] as the specified table.
 *
 * @since 1.4.0
 */
def save(): Unit = {
 ResolvedDataSource(
  df.sqlContext,
  source,
  partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
  mode,
  extraOptions.toMap,
  df)
}
  1. 其中source是SQLConf的defaultDataSourceName
    private var source: String = df.sqlContext.conf.defaultDataSourceName
    其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
 defaultValue = Some("org.apache.spark.sql.parquet"),
 doc = "The default data source to use in input/output.")

DataFrame.scala中部分函数详解:

  1. toDF函数是将RDD转换成DataFrame
/**
 * Returns the object itself.
 * @group basic
 * @since 1.3.0
 */
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this
  1. show()方法:将结果显示出来
/**
 * Displays the [[DataFrame]] in a tabular form. For example:
 * {{{
 *  year month AVG('Adj Close) MAX('Adj Close)
 *  1980 12  0.503218    0.595103
 *  1981 01  0.523289    0.570307
 *  1982 02  0.436504    0.475256
 *  1983 03  0.410516    0.442194
 *  1984 04  0.450090    0.483521
 * }}}
 * @param numRows Number of rows to show
 * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
 *       be truncated and all cells will be aligned right
 *
 * @group action
 * @since 1.5.0
 */
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println

追踪showString源码如下:showString中触发action收集数据。

/**
 * Compose the string representing rows for output
 * @param _numRows Number of rows to show
 * @param truncate Whether truncate long strings and align cells right
 */
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
 val numRows = _numRows.max(0)
 val sb = new StringBuilder
 val takeResult = take(numRows + 1)
 val hasMoreData = takeResult.length > numRows
 val data = takeResult.take(numRows)
 val numCols = schema.fieldNames.length

以上就是本文的全部内容,希望对大家的学习有所帮助

2019-12-16 18:14:17 weixin_43124279 阅读数 30
  • 玩转Linux:常用命令实例指南

    本课程以简洁易懂的语言手把手教你系统掌握日常所需的Linux知识,每个知识点都会配合案例实战让你融汇贯通 。课程通俗易懂,简洁流畅,适合0基础以及对Linux掌握不熟练的人学习; 注意: 1、本课程本月原价99元,截止2月22日前仅需29元!购课就送5门价值300元的编程课! 2、课程内容不仅是大纲上这些,2月底前老师会继续增加10余节课程,届时会恢复原件!现在购买最划算! 3、购课后登陆csdn学院官网,在课程目录页面即可下载课件。 学完即可轻松应对工作中 85% 以上的 Linux 使用场景 【限时福利】 1)购课后按提示添加小助手,进答疑群,还可获得价值300元的编程大礼包! 2)本课程【现在享受秒杀价29元】 3)本月购买此套餐加入老师答疑交流群,可参加老师的免费分享活动,学习最新技术项目经验。 注意: 1)现在购买至少享受70元优惠; 2)购课后添加微信itxy06,发送订单截图领取300元编程礼包。 --------------------------------------------------------------- 这门课程,绝对不会让你觉得亏! 29元=掌握Linux必修知识+社群答疑+讲师社群分享会+300元编程礼包。   人工智能、物联网、大数据时代,Linux正有着一统天下的趋势,几乎每个程序员岗位,都要求掌握Linux。本课程零基础也能轻松入门。   在这门课中,我们保证你能收获到这些 1)快速掌握 Linux 常用命令及配置 2)Linux核心知识点 3) 文件、进程、磁盘、用户管理,以及网络工具命令、文件传输等 4)Vi/Vim编辑器用法  

    5417 人正在学习 去看看 严宇

1、Spark的三种运行模式

1.1、Local模式

单机运行,通常用于测试。

1.2、Standalone模式

独立运行在一个spark的集群中。

1.3、Spark on Yarn/Mesos模式

Spark程序运行在资源管理器上,例如YARN/Mesos
Spark on Yarn存在两种模式
• yarn-client
• yarn-cluster

2.安装spark,并启动spark-shell;分别用local/standalone/yarn模式运行workcount。

1)截取spark-UI执行进度。
2)截取执行成功后输出的结果。
3)Spark on yarn模式,截取8088端口页面的截图。

Local模式:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

standalone模式:

在这里插入图片描述
在这里插入图片描述在这里插入图片描述
在这里插入图片描述
yarn模式

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.请对Spark的RDD做简要的概述。

RDD 是 Spark 提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。
通俗点来讲,可以将 RDD 理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。

4.请对Saprk的Transformation和Action做简要描述,以及spark的懒执行是什么?

1、spark的transformation
Transformation用于对RDD的创建,RDD只能使用Transformation创建,同时还提供大量操作方法,包括map,filter,groupBy,join等,RDD利用这些操作生成新的RDD,但是需要注意,无论多少次Transformation,在RDD中真正数据计算Action之前都不可能真正运行。
2、Spark的action
Action是数据执行部分,其通过执行count,reduce,collect等方法真正执行数据的计算部分。实际上,RDD中所有的操作都是Lazy模式进行,运行在编译中不会立即计算最终结果,而是记住所有操作步骤和方法,只有显示的遇到启动命令才执行。这样做的好处在于大部分前期工作在Transformation时已经完成,当Action工作时,只需要利用全部自由完成业务的核心工作。
3、懒执行
Spark中,Transformation方法都是懒操作方法,比如map,flatMap,reduceByKey等。当触发某个Action操作时才真正执行。
①不运行job就触发计算,避免了大量的无意义的计算,即避免了大量的无意义的中间结果的产生,即避免产生无意义的磁盘I/O及网络传输
②更深层次的意义在于,执行运算时,看到之前的计算操作越多,执行优化的可能性就越高

2019-10-03 11:58:49 weixin_43930865 阅读数 32
  • 玩转Linux:常用命令实例指南

    本课程以简洁易懂的语言手把手教你系统掌握日常所需的Linux知识,每个知识点都会配合案例实战让你融汇贯通 。课程通俗易懂,简洁流畅,适合0基础以及对Linux掌握不熟练的人学习; 注意: 1、本课程本月原价99元,截止2月22日前仅需29元!购课就送5门价值300元的编程课! 2、课程内容不仅是大纲上这些,2月底前老师会继续增加10余节课程,届时会恢复原件!现在购买最划算! 3、购课后登陆csdn学院官网,在课程目录页面即可下载课件。 学完即可轻松应对工作中 85% 以上的 Linux 使用场景 【限时福利】 1)购课后按提示添加小助手,进答疑群,还可获得价值300元的编程大礼包! 2)本课程【现在享受秒杀价29元】 3)本月购买此套餐加入老师答疑交流群,可参加老师的免费分享活动,学习最新技术项目经验。 注意: 1)现在购买至少享受70元优惠; 2)购课后添加微信itxy06,发送订单截图领取300元编程礼包。 --------------------------------------------------------------- 这门课程,绝对不会让你觉得亏! 29元=掌握Linux必修知识+社群答疑+讲师社群分享会+300元编程礼包。   人工智能、物联网、大数据时代,Linux正有着一统天下的趋势,几乎每个程序员岗位,都要求掌握Linux。本课程零基础也能轻松入门。   在这门课中,我们保证你能收获到这些 1)快速掌握 Linux 常用命令及配置 2)Linux核心知识点 3) 文件、进程、磁盘、用户管理,以及网络工具命令、文件传输等 4)Vi/Vim编辑器用法  

    5417 人正在学习 去看看 严宇

spark开发实例

hadoop开发以及配置

1:开发准备

java,hadoop,scala,maven的windows环境都已配置并验证完毕
在这里插入图片描述

2:spark核心组件编程介绍

  * 在Spark中,所有的编程入口都是各种各样的Context
  *  **2.1  在SparkCore中的入口:SparkContext**
  *       有java和scala之分,java中是JavaSparkContext,scala是SparkContext
  *  **2.2在SparkSQL,
  *     spark2.0以前使用SQLContext,或者HiveContext
  *     spark2.0以后统一使用SparkSession的api
  *  **2.3 在SparkStreaming中,使用StreamingContext作为入口

3:在SparkCore的wordcount代码详解

先把集群的hdfs-site,xml,core-site.xml都放到resources下面
原始数据:data1.txt
1 Java bigdata
2 Java bigdata

/**

  *
  *   推荐大家的编程方式:倒推+填空(大体的逻辑架构)
  *
  * 编程步骤:
  *     1、构建SparkContext
  *         SparkContext需要一个SparkConf的依赖
  *             conf中必须要指定master url
  *             conf中必须要指定应用名称
  *     2、加载外部数据,形成一个RDD
  *     3、根据业务需要对RDD进行转换操作
  *     4、提交作业
  *     5、释放资源
  */
object SparkScalaWordCount {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)
        /**Master URL配置
          *   说明:Master URL代表的含义是Spark作业运行的方式
          *   本地模式(local) --Spark作业在本地运行(Spark Driver和Executor在本地运行)
          *     local       : 为当前的Spark作业分配一个工作线程
          *     local[M]    :为当前的Spark作业分配M个工作线程
          *     local[*]    :为当前的Spark作业分配当前计算集中所有可用的工作线程
          *     local[M, N] :为当前的Spark作业分配M个工作线程,如果Spark作业提交失败,会进行最多N的重试
          *   基于spark自身集群(standalone)
          *     格式:spark://<masterIp>:<masterPort>
          *         比如:spark://bigdata01:7077
          *     HA格式: spark://<masterIp1>:<masterPort1>,<masterIp2>:<masterPort2>,...
          *         比如:spark://bigdata01:7077,bigdata02:7077
          *   基于yarn(国内主流)
          *        cluster: SparkContext或者Driver在yarn集群中进行创建
          *        client:  SparkContext或者Driver在本地创建(所谓本地就是提交作业的那一台机器)
          *   基于mesos(欧美主流):
          *         master
          *         slave
          */
        val conf = new SparkConf()
      //conf指定
        conf.setAppName(s"${SparkScalaWordCount.getClass.getSimpleName}")
        conf.setMaster("local[*]")
        val sc = new SparkContext(conf)
        //加载本地的文件
        val lines:RDD[String] = sc.textFile("file:///C:data1.txt")
//        val lines:RDD[String] = sc.textFile("hdfs://ns1/data/spark/streaming/test/hello.txt")
        println("partition: " + lines.getNumPartitions)
//        lines.foreach(line => println(line))
        val words:RDD[String] = lines.flatMap(line => line.split("\\s+"))
//        words.foreach(word => println(word))
        val pairs:RDD[(String, Int)] = words.map(word => (word, 1))
//      pairs.foreach{case (word, count) => println(word + "--->" + count)}
        //select key, count(*) from t group by key
        val ret:RDD[(String, Int)] = pairs.reduceByKey((v1, v2) => {
//            val i = 1 / 0
            v1 + v2
        })

        ret.foreach{case (word, count) => println(word + "--->" + count)}
        sc.stop()
    }
}

结果:

Java--->2
1--->1
2--->1
bigdata--->2

4:spark sql编程

SparkSQL整合Hive的操作准备

/**
  * SparkSQL整合Hive的操作
  * 有两张hive表:
  *     teacher_info
  *         name,height
  *     teacher_basic
  *         name,
  *         age
  *         married
  *         children
  *     进行关联查询,查询出所有的teacher信息并且保存在hive的表中
  *     select
  *         i.name,
  *         b.age,
  *         b.married,
  *         b.children,
  *         i.height
  *     from teacher_info i
  *     left join teacher_basic b on i.name = b.name
  * SparkSQL和Hive进行整合需要做如下的配置:
  *     1、需要将hive-site.xml添加到classpath下面
  *         将hive-site.xml放到项目的resources目录下面
  *     2、将mysql的驱动jar包拷贝到spark的classpath下面
  *         直接将jar扔到spark的jars目录下面
  *         cp ~/app/hive/lib/mysql-connector-java-5.1.39.jar ~/app/spark/jars/
  *         scp ~/app/spark/jars/mysql-connector-java-5.1.39.jar bigdata@bigdata02:/home/bigdata/app/spark/jars/
  *         scp ~/app/spark/jars/mysql-connector-java-5.1.39.jar bigdata@bigdata03:/home/bigdata/app/spark/jars/
  */

代码开发

 def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)
        val spark = SparkSession.builder()
            .appName("SparkSQLLoadAndSave")
            .enableHiveSupport()//有他可以提供hive的相关操作
            .getOrCreate()
        //创建一个数据库
        spark.sql("create database db_1810")

        val createInfoSQL =
            """
              |create table `db_1810`.`teacher_info` (
              |   name string,
              |   height int
              |) row format delimited
              |fields terminated by ','
            """.stripMargin
        spark.sql(createInfoSQL)

        val createBasicSQL =
            """
              |create table `db_1810`.`teacher_basic` (
              |   name string,
              |   age int,
              |   married boolean,
              |   children int
              |) row format delimited
              |fields terminated by ','
            """.stripMargin
        spark.sql(createBasicSQL)
        //加载数据
        val loadInfoSQL = "load data inpath 'hdfs://ns1/input/spark/teacher_info.txt' into table `db_1810`.`teacher_info`"
        val loadBasicSQL = "load data inpath 'hdfs://ns1/input/spark/teacher_basic.txt' into table `db_1810`.`teacher_basic`"
        spark.sql(loadInfoSQL)
        spark.sql(loadBasicSQL)
        //执行关联查询
        val sql =
            """
              |select
              |  i.name,
              |  b.age,
              |  b.married,
              |  b.children,
              |  i.height
              |from `db_1810`.`teacher_info` i
              |left join `db_1810`.`teacher_basic` b on i.name = b.name
            """.stripMargin
        val joinedDF = spark.sql(sql)

        //将结果落地到Hive中的表teacher中
        joinedDF.write.saveAsTable("`db_1810`.`teacher`")
        spark.stop()
    }

5 :sparkstreaming

sparkstreaming多结合kafka一起使用
本次基于kafka的direct方式进行读取

  def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.spark-project").setLevel(Level.WARN)
        if(args == null || args.length < 3) {
            println(
                """Parameter Errors ! Usage: <batchInterval> <groupId> <topicList>
                  |batchInterval    :  作业提交的间隔时间
                  |groupId          :  分组id
                  |topicList        :  要消费的topic列表
                """.stripMargin)
            System.exit(-1)
        }
        val Array(batchInterval, groupId, topicList) = args

        val conf = new SparkConf()
                    .setAppName("SparkStreamingWithDirectKafkaOps")
                    .setMaster("local[*]")

        val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))
        val kafkaParams = Map[String, String](
            "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
            "group.id" -> groupId,
            //largest从偏移量最新的位置开始读取数据
            //smallest从偏移量最早的位置开始读取
            "auto.offset.reset" -> "smallest"
        )
        val topics = topicList.split(",").toSet
        //基于Direct的方式读取数据
        val kafkaDStream:InputDStream[(String, String)] = KafkaUtils
            .createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topics)

        kafkaDStream.foreachRDD((rdd, bTime) => {
            if(!rdd.isEmpty()) {
                println("-------------------------------------------")
                println(s"Time: $bTime")
                println("-------------------------------------------")
                rdd.foreach{case (key, value) => {
                    println(value)
                }}
                //查看rdd的范围
                println("偏移量范围:")
                val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
                for (offsetRange <- offsetRanges) {
                    val topic = offsetRange.topic
                    val parition = offsetRange.partition
                    val fromOffset = offsetRange.fromOffset
                    val untilOffset = offsetRange.untilOffset
                    val count = offsetRange.count()
                    println(s"topic:${topic}, partition:${parition}, " +
                        s"fromOffset:${fromOffset}, untilOffset:${untilOffset}, count:${count}")
                }
            }
        })
        ssc.start()
        //保证持续运行
        ssc.awaitTermination()
    }
2016-11-21 17:46:31 zhaojw_420 阅读数 5440
  • 玩转Linux:常用命令实例指南

    本课程以简洁易懂的语言手把手教你系统掌握日常所需的Linux知识,每个知识点都会配合案例实战让你融汇贯通 。课程通俗易懂,简洁流畅,适合0基础以及对Linux掌握不熟练的人学习; 注意: 1、本课程本月原价99元,截止2月22日前仅需29元!购课就送5门价值300元的编程课! 2、课程内容不仅是大纲上这些,2月底前老师会继续增加10余节课程,届时会恢复原件!现在购买最划算! 3、购课后登陆csdn学院官网,在课程目录页面即可下载课件。 学完即可轻松应对工作中 85% 以上的 Linux 使用场景 【限时福利】 1)购课后按提示添加小助手,进答疑群,还可获得价值300元的编程大礼包! 2)本课程【现在享受秒杀价29元】 3)本月购买此套餐加入老师答疑交流群,可参加老师的免费分享活动,学习最新技术项目经验。 注意: 1)现在购买至少享受70元优惠; 2)购课后添加微信itxy06,发送订单截图领取300元编程礼包。 --------------------------------------------------------------- 这门课程,绝对不会让你觉得亏! 29元=掌握Linux必修知识+社群答疑+讲师社群分享会+300元编程礼包。   人工智能、物联网、大数据时代,Linux正有着一统天下的趋势,几乎每个程序员岗位,都要求掌握Linux。本课程零基础也能轻松入门。   在这门课中,我们保证你能收获到这些 1)快速掌握 Linux 常用命令及配置 2)Linux核心知识点 3) 文件、进程、磁盘、用户管理,以及网络工具命令、文件传输等 4)Vi/Vim编辑器用法  

    5417 人正在学习 去看看 严宇
import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {

    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }

    /**
     * 对于所有的spark程序所言,要进行所有的操作,首先要创建一个spark上下文。
     * 在创建上下文的过程中,程序会向集群申请资源及构建相应的运行环境。
     * 设置spark应用程序名称
     * 创建的 sarpkContext 唯一需要的参数就是 sparkConf,它是一组 K-V 属性对。
     */
    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    /**
     * 利用textFile接口从文件系统中读入指定的文件,返回一个RDD实例对象。
     * RDD的初始创建都是由SparkContext来负责的,将内存中的集合或者外部文件系统作为输入源。
     * RDD:弹性分布式数据集,即一个 RDD 代表一个被分区的只读数据集。一个 RDD 的生成只有两种途径,
     * 一是来自于内存集合和外部存储系统,另一种是通过转换操作来自于其他 RDD,比如 Map、Filter、Join,等等。
     * textFile()方法可将本地文件或HDFS文件转换成RDD,读取本地文件需要各节点上都存在,或者通过网络共享该文件
     *读取一行
     */
    JavaRDD<String> lines = ctx.textFile(args[0], 1);
    /**
     *
     * new FlatMapFunction<String, String>两个string分别代表输入和输出类型
     * Override的call方法需要自己实现一个转换的方法,并返回一个Iterable的结构
     *
     * flatmap属于一类非常常用的spark函数,简单的说作用就是将一条rdd数据使用你定义的函数给分解成多条rdd数据
     * 例如,当前状态下,lines这个rdd类型的变量中,每一条数据都是一行String,我们现在想把他拆分成1个个的词的话,
     * 可以这样写 :
     */ 
    //flatMap与map的区别是,对每个输入,flatMap会生成一个或多个的输出,而map只是生成单一的输出
    //用空格分割各个单词,输入一行,输出多个对象,所以用flatMap
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String s) {
        return Arrays.asList(SPACE.split(s)).iterator();
      }
    });
    /**
     * map 键值对 ,类似于MR的map方法
     * pairFunction<T,K,V>: T:输入类型;K,V:输出键值对
     * 表示输入类型为T,生成的key-value对中的key类型为k,value类型为v,对本例,T=String, K=String, V=Integer(计数)
     * 需要重写call方法实现转换
     */
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
        //scala.Tuple2<K,V> call(T t)
        //Tuple2为scala中的一个对象,call方法的输入参数为T,即输入一个单词s,新的Tuple2对象的key为这个单词,计数为1
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });
    //A two-argument function that takes arguments 
    // of type T1 and T2 and returns an R. 
    /**
     * 调用reduceByKey方法,按key值进行reduce
     *  reduceByKey方法,类似于MR的reduce
     *  要求被操作的数据(即下面实例中的ones)是KV键值对形式,该方法会按照key相同的进行聚合,在两两运算
     *  /若ones有<"one", 1>, <"one", 1>,会根据"one"将相同的pair单词个数进行统计,输入为Integer,输出也为Integer
     *输出<"one", 2>
     */ 
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
        //reduce阶段,key相同的value怎么处理的问题 
        @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });
    //备注:spark也有reduce方法,输入数据是RDD类型就可以,不需要键值对, 
    // reduce方法会对输入进来的所有数据进行两两运算 

    /**
     * collect方法其实之前已经出现了多次,该方法用于将spark的RDD类型转化为我们熟知的java常见类型
     */ 
    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    ctx.stop();
  }
}

Spark RDD操作总结

阅读数 762

没有更多推荐了,返回首页