rdd保存到文件 spark

2018-04-12 16:48:45 yeqingyun2012 阅读数 1326


JavaRDD<String> product = ....;

product.repartition(1).saveAsTextFile(outPutPath1);

2018-06-05 19:22:58 kwame211 阅读数 2172

Spark是当前最流行的分布式数据处理框架之一,相比于Hadoop,Spark在数据的处理方面更加灵活方便。然而在最近的使用中遇到了一点小麻烦:Spark保存文件的的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目录下分块保存文件。如果我们想在原有的目录下增加一个文件(而不是增加一个目录),Spark就无能为力了。

        有网友给出建议,用

rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")
  • 1
  • 2

把数据合并到一个分区中,然而得到的结果是这样的:

$ ./bin/hadoop fs -du -h test/test.txt
0        test/test.txt/_SUCCESS
499.9 M  test/test.txt/part-00000
  • 1
  • 2
  • 3
  • 4

Spark仍然是新建了一个目录test.txt,然后在这个目录下把数据都保存在了part-00000文件中。

       Spark的保存模式的设定注定了在保存数据的时候只能新建目录,如果想把数据增加到原有的目录中,单独作为一个文件,就只能借助于Hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem实现在已有目录下用一个文件保存Spark数据:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.URI;
/**
 * 使用Hadoop的FileSystem把数据写入到HDFS
 */
public class HdfsOperate implements Serializable{

    private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class);
    private static Configuration conf = new Configuration();
    private static BufferedWriter writer = null;

    //在hdfs的目标位置新建一个文件,得到一个输出流
    public static void openHdfsFile(String path) throws Exception {
        FileSystem fs = FileSystem.get(URI.create(path),conf);
        writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path))));
        if(null!=writer){
            logger.info("[HdfsOperate]>> initialize writer succeed!");
        }
    }

    //往hdfs文件中写入数据
    public static void writeString(String line) {
        try {
            writer.write(line + "\n");
        }catch(Exception e){
            logger.error("[HdfsOperate]>> writer a line error:"  ,  e);
        }
    }

    //关闭hdfs输出流
    public static void closeHdfsFile() {
        try {
            if (null != writer) {
                writer.close();
                logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!");
            }
            else{
                logger.error("[HdfsOperate]>> closeHdfsFile writer is null");
            }
        }catch(Exception e){
            logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

在Spark中处理并保存数据:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import hdfsoperate.HdfsOperate;
import org.apache.spark.Partition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.NlpModuleWrapper;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * 调用HdfsOperate类的方法把RDD数据保存到Hdfs上
 */
public class FeatureExtractor implements Serializable {
    private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);

    public void extractFeature(JavaSparkContext sc, int repartitionNum) throws Exception {
        String hdfsPath = "test/corpus/2016-09-02"; //存放原始数据的文件
        //Spark可以读取单独的一个文件或整个目录
        JavaRDD<String> rddx = sc.textFile(hdfsPath).repartition(repartitionNum); 
        rddx = rddx.map(new ExtractFeatureMap());

        //写入hdfs文件位置
        String destinationPath = "test/result/2016-09-02" ;
        //创建Hdfs文件,打开Hdfs输出流
        HdfsOperate.openHdfsFile(destinationPath);

        //分块读取RDD数据并保存到hdfs
        //如果直接用collect()函数获取List<String>,可能因数据量过大超过内存空间而失败
        for (int i = 0; i < repartitionNum; i++) {
            int[] index = new int[1];
            index[0] = i;
            List<String>[] featureList = rddx.collectPartitions(index);
            if (featureList.length != 1) {
                logger.error("[FeatureExtractor]>> featureList.length is not 1!");
            }
            for (String str : featureList[0]) {
                //写一行到Hdfs文件
                HdfsOperate.writeString(str);
            }
        }
        //关闭Hdfs输出流
        HdfsOperate.closeHdfsFile();

    }



    class ExtractFeatureMap implements Function<String, String> {
        @Override
        public String call(String line) throws Exception {
            try {
                //TODO:你自己的操作,返回String类型
            } catch (Exception e) {
                logger.error("[FeatureExtractor]>>GetTokenAndKeywordFeature error:", e);
            }
            return null;
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

(PS:目前还没有看到过单用Spark接口能实现该功能,有知道的大神欢迎指点)

2019-09-09 16:42:09 Zsigner 阅读数 1108

在该语句之前加上repartition(1),即写作以下形式:
rdd.repartition(1).saveAsTextFile("out.txt")即,之前每个partition保存成一个txt文件,现repartition成一个分区,然后再保存。

注意(未验证):这个操作对于大文件来说,repartition(1)可能会导致某一个节点内存不足,超出的部分会临时存放在硬盘中,影响saveAsTextFile存储速度。

验证结果:

合并生成一个part文件,最后的文件还不是txt

2017-06-28 17:47:00 weixin_34162695 阅读数 789

相比于Hadoop,Spark在数据的处理方面更加灵活方便。然而在最近的使用中遇到了一点小麻烦:Spark保存文件的的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目录下分块保存文件。如果我们想在原有的目录下增加一个文件(而不是增加一个目录)

 

rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")

 

把分区设置成1个 结果是Spark仍然是新建了一个目录test.txt,然后在这个目录下把数据都保存在了part-00000文件中

问题:如何让spark将Rdd结果输出到一个文件而不是目录中呢?

Spark的保存模式的设定注定了在保存数据的时候只能新建目录,如果想把数据增加到原有的目录中,单独作为一个文件,就只能借助于hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem实现在已有目录下用一个文件保存Spark数据:

 

package com.ys.penspark.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;

/**
 * @ClassName: HdfsOperate
 * @Description:
 * @Author: Administrator
 * @Date: 2017/6/28
 */
public class HdfsOperate implements Serializable {
    private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class);
    private static Configuration conf = new Configuration();
    private static BufferedWriter writer = null;

    //在hdfs的目标位置新建一个文件,得到一个输出流
    public static void openHdfsFile(String path) throws Exception {
        FileSystem fs = FileSystem.get(URI.create(path),conf);
        writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path))));
        if(null!=writer){
            logger.info("[HdfsOperate]>> initialize writer succeed!");
        }
    }

    //往hdfs文件中写入数据
    public static void writeString(String line) {
        try {
            writer.write(line + "\n");
        }catch(Exception e){
            logger.error("[HdfsOperate]>> writer a line error:"  ,  e);
        }
    }

    //关闭hdfs输出流
    public static void closeHdfsFile() {
        try {
            if (null != writer) {
                writer.close();
                logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!");
            }
            else{
                logger.error("[HdfsOperate]>> closeHdfsFile writer is null");
            }
        }catch(Exception e){
            logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e);
        }
    }

}

 

  先将spark的Rdd重新分区,再将每个分区的数据collectPartitions按行写入hdfs文件中

package com.ys.penspark.util;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * @ClassName: FeatureExtractor
 * @Description:
 * @Author: mashiwei
 * @Date: 2017/6/28
 */
public class FeatureExtractor implements Serializable{
    private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);

    public void extractFeature(Dataset<Row> s, int repartitionNum,String out) throws Exception {

        StringBuffer sb = new StringBuffer();
        for (int i = 0; i<= s.schema().fieldNames().length-1;i++) {
            sb.append(s.schema().fieldNames()[i]);
            if (i == s.schema().fieldNames().length-1){
               break;
            }
            sb.append(",");
        }

        s.show();

        JavaRDD<String> rddx = s.toJavaRDD().map(new ExtractFeatureMap()).repartition(repartitionNum);

        //写入hdfs文件位置
//        String destinationPath = "/kettle/penspark/data.txt" ;
        //创建Hdfs文件,打开Hdfs输出流
        HdfsOperate.openHdfsFile(out);
        HdfsOperate.writeString(sb.toString());
        //分块读取RDD数据并保存到hdfs
        //如果直接用collect()函数获取List<String>,可能因数据量过大超过内存空间而失败
        for (int i = 0; i < repartitionNum; i++) {
            int[] index = new int[1];
            index[0] = i;
//            List<String>[] featureList = rddx.collectPartitions(index);
//            List<String> strs = rddx.collect();
            List<String>[] featureList = rddx.collectPartitions(index);
            if (featureList.length != 1) {
                logger.error("[FeatureExtractor]>> featureList.length is not 1!");
            }
            for (String str : featureList[0]) {
                //写一行到Hdfs文件
                logger.info("-----"+str);
                HdfsOperate.writeString(str);
            }
        }
        //关闭Hdfs输出流
        HdfsOperate.closeHdfsFile();

    }
    class ExtractFeatureMap implements Function<Row, String> {


        @Override
        public String call(Row line) throws Exception {
            try {
                StringBuffer sb = new StringBuffer();
                int len = line.length();
                for (int i = 0; i<= len-1; i++){
                    sb.append(line.get(i).toString());
                    if (i == len-1){
                        break;
                    }
                    sb.append(",");
                }
                return sb.toString();

            } catch (Exception e) {
                logger.error("[FeatureExtractor]>>GetTokenAndKeywordFeature error:", e);
            }

            return null;
        }
    }


    public static void main(String[] args) {

//        SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
//        JavaSparkContext sc= new JavaSparkContext(conf);

        StructType Schemafinal = new StructType();
        Map<String,String> options = new HashMap<String,String>();
        LinkedList<StructField> obj = new LinkedList<StructField>();
        StructField structField = new StructField("name", DataTypes.StringType, true, Metadata.empty());
        StructField structField1 = new StructField("age", DataTypes.StringType, true, Metadata.empty());
//        StructField structField2 = new StructField("字段2", DataTypes.StringType, true, Metadata.empty());
//        StructField structField3 = new StructField("字段3", DataTypes.StringType, true, Metadata.empty());
        obj.add(structField);
        obj.add(structField1);
//        obj.add(structField2);
//        obj.add(structField3);

        Schemafinal = new StructType(obj.toArray(new StructField[obj.size()]));
        SparkConf conf = new SparkConf().setAppName("Example App").setMaster("local[*]");
        options.put("delimiter",",");
        options.put("header","true");
        JavaSparkContext sc = new JavaSparkContext(conf);
        @SuppressWarnings("deprecation")
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession
                .builder()
                .appName("Pentaho Logic as Spark")
                .config("spark.some.config.option", "some-value")
                .config("spark.sql.warehouse.dir", "file:///C:/tmp/")
                .getOrCreate();


        Dataset<Row> tempdf = spark.read()
                .format("com.databricks.spark.csv")
                .options(options)
                .schema(Schemafinal)
                .option("header", true)
                .load("file:///"+"C:\\Users\\Administrator\\Desktop\\测试\\功能开发\\excel.txt");
        tempdf.show();
        FeatureExtractor fx = new FeatureExtractor();
        try {
//            fx.extractFeature(sc,5);
            fx.extractFeature(tempdf,2,"/kettle/tempData.txt");
        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}

  数据

name,age
zs, 44
li, 22
ww, 18

  

 

转载于:https://www.cnblogs.com/xiaoma0529/p/7090912.html

2019-09-03 11:10:00 weixin_30381793 阅读数 49

  写随笔大概也是做笔记记录下自己思考的意思吧,之前有些事情觉得做随笔还是比较有用的,mark一下一个有用的网址

  关于rdd的操作,网上有很多很多的教程,当初全部顺一遍,除了对rdd这个类型有了点概念,剩下具体的方法以及方法的写法已经快忘记了,所以具体还是记一下对某些事情的思考吧。

  关于将rdd保存为文件,我使用的是

import org.apache.spark.{SparkConf, SparkContext}

object Wordcount {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("wordcount").setMaster("local")
    val sc=new SparkContext(conf)

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
    //求jion
    val rdd3 = rdd1.union(rdd2)
    //按key进行聚合
    val rdd4 = rdd3.reduceByKey(_ + _)
    rdd4.collect().foreach({println})
    //按value的降序排序
    val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
    rdd5.collect().foreach({println})
    rdd5.saveAsTextFile("aaaaa1")
  }

}

  以上代码,rdd是我通过将两个rdd合并后得到,查看的时候发现rdd5是有两个分区的,输出结果是:

 

  而保存的文件aaaa1则是一个文件夹,所以如果已存在会报文件已存在的错误无法运行(源代码里应该没有做相关的判断和处理),最终结果生成在文件夹中,如下:

 

  具体的数据保存在后两个文件夹中:

  part-00000:

(jerry,5)

 

  part-00001:

(tom,2)
(shuke,2)
(kitty,2)

 

  这是按rdd5里面本身的分区数,各分区内容来生成的,而crc指的是循环冗余,.crc文件是指文件摘要,好像并没有什么特殊的,就是进行crc校验的文件。我本来以为crc和几个分区文件的联系有关系,后来想想spark处理大量数据的时候各分区数据的联系也并不算紧密,基本不会去区分前后顺序,并不需要保存彼此的联系。

  大概发现了两件事情:

1、是小文件产生的大量冗余,有时候这些是没必要的,给读写它带来了一些麻烦,但是这个是有方法解决的,暂时还没有涉猎。

2、分区间不平衡,虽然只是很少条数据,但是之前也有做过某些RDD练习也说明rdd里分区并不会实时帮你平衡各分区的数据数量,但是spark有提供相应的方法,其他的,比如hive可能就不一定了。

 

转载于:https://www.cnblogs.com/liwxmyself/p/11451723.html

Spark中RDD的理解

阅读数 1590