精华内容
下载资源
问答
  • I am writing a spark/scala program to read in ZIP files, unzip them and write the contents to a set of new files. I can get this to work for writing to the local file system but wondered if there was ...

    I am writing a spark/scala program to read in ZIP files, unzip them and write the contents to a set of new files. I can get this to work for writing to the local file system but wondered if there was a way to to write the output files to a distributed file system such as HDFS. Code is shown below`

    import java.util.zip.ZipInputStream

    import org.apache.spark.input.PortableDataStream

    import java.io._

    var i =1

    sc.binaryFiles("file:///d/tmp/zips/").flatMap((file:(String, PortableDataStream)) => {

    val zipStream = new ZipInputStream(file._2.open)

    val entry = zipStream.getNextEntry

    val iter = scala.io.Source.fromInputStream(zipStream).getLines

    val fname = f"/d/tmp/myfile$i.txt"

    i = i + 1

    val xx = iter.mkString

    val writer = new PrintWriter(new File(fname))

    writer.write(xx)

    writer.close()

    iter

    }).collect()

    `

    解决方案

    You can easy write data to HDFS using hadoop-common library (if you are using sbt as dependency manangement tool, add thath library to your dependency). With that you can create a FileSystem object :

    private val fs = {

    val conf = new Configuration()

    FileSystem.get(conf)

    }

    Be sure to configure the FileSystem with your hadoop cluster information (core-site.xml, etc)

    Then you can write, for example a String to path (in your case you should deal with streams), on HDFS as following:

    @throws[IOException]

    def writeAsString(hdfsPath: String, content: String) {

    val path: Path = new Path(hdfsPath)

    if (fs.exists(path)) {

    fs.delete(path, true)

    }

    val dataOutputStream: FSDataOutputStream = fs.create(path)

    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))

    bw.write(content)

    bw.close

    }

    展开全文
  • Spark HadoopRDD读取HDFS文件

    万次阅读 2018-12-10 00:05:00
    Spark HadoopRDD读取HDFS文件 更多资源 SPARK 源码分析技术分享(bilibilid视频汇总套装视频): https://www.bilibili.com/video/av37442139/ github: https://github.com/opensourceteams/spark-scala-maven csdn...

    Spark HadoopRDD读取HDFS文件

    更多资源

    bilibili 视频说明

    width="800" height="500" src="//player.bilibili.com/player.html?aid=37442139&cid=66303785&page=28" scrolling="no" border="0" allowfullscreen="true">

    前置条件

    • Hadoop版本: Hadoop 2.6.0-cdh5.15.0
    • Spark版本: SPARK 1.6.0-cdh5.15.0

    概述

    • 源码分析Spark HadoopRDD是如何读取HDFS上的文件
    • 分析HadoopRDD预分区的计算方式,非首个分区的开始位置计算
    • 来三种情况分析,不同情部下HadoopRDD的分区计算方式

    HDFS数据文件

    a b k l j
    c a n m o
    

    HDFS 数据文件图解

    在这里插入图片描述

    HDFS 数据文件图解(对比)

    图一

    在这里插入图片描述

    图二

    在这里插入图片描述

    断点位置

    • org.apache.hadoop.mapred.LineRecordReader 241行, 246行, 248行,136行

    HadoopRDD partition预划分方式(实际会有小的调整)

    • 每个partition的长度= 文件的总长度 / 最小的分区数(默认分区数为2) //注意,是除,结果会取整, 即 goalSize = totalSize / numSplits
    • 示例中每个partition的长度 = 20 / 2 =10 // 即为10个byte
    • 然后依次从0开始划分10个byte长度为一个partition,最后一个小于等于10个byte的为最后一个partition
    • 所以 parition(0) = hdfs文件(0 + 10) //即从文件偏移量为0开始,共10byte,0 <= 值 < 10
    • 所以 parition(1) = hdfs文件(10 + 10) //即从文件偏移量为10开始,共10byte,10 <= 值 < 20
    • 即 partition(i) = hdfs文件( i * goalSize + 10 )

    HadoopRDD partition划分原理

    • 由于需要考虑,每个partition谁先执行是不确定的,所以每个partition执行时,都需要可明确计算当前partition的数据范围
    • 由于直接按partition预划分方式,会把有的一行数据拆分,有些场景不适合(如钱金额,词组一般都不希望被拆分,所以一般按行拆分)
    • 所以需要按行做为最小的数据划分单元,来进行partition的数据范围划分
    • HadoopRDD是这样划分的partition,还是按partition预划分方式进行预先划分,不过在计算时会进行调整
    • 对于首个partition,也就是partition(0),分区数据范围的开始位置就是从0开始(0 + goalSize )
    • 对于非首个partition,的开始位置需要从新计算,从预划分的当前partition的开始位置开始找第一个换行符位置(indexNewLine),当前partition的开始位置为= indexNewLine + 1,长度还是goalSize
    • 对于首个partition一定能分到数据(只要HDFS文件有数据)
    • 非首个partition,有可能分不到数据的情况,分不到数据的情况,就是数据被上一个partition划分完了

    partition分不到数据(以下情况同时满足)

    • 是非首个partition,也就是不是partition为索引为0
    • partition从预分区开始位置往后读到的第一个换行符大于等于预分区的结束位置
      (或者该partition就没有一个换行符)

    源码分析

     override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
        val iter = new NextIterator[(K, V)] {
    
          val split = theSplit.asInstanceOf[HadoopPartition]
          logInfo("Input split: " + split.inputSplit)
          val jobConf = getJobConf()
    
          val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
    
          // Sets the thread local variable for the file's name
          split.inputSplit.value match {
            case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
            case _ => SqlNewHadoopRDDState.unsetInputFileName()
          }
    
          // Find a function that will return the FileSystem bytes read by this thread. Do this before
          // creating RecordReader, because RecordReader's constructor might read some bytes
          val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
            split.inputSplit.value match {
              case _: FileSplit | _: CombineFileSplit =>
                SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
              case _ => None
            }
          }
          inputMetrics.setBytesReadCallback(bytesReadCallback)
    
          var reader: RecordReader[K, V] = null
          //返回TextInputFormat对象
          val inputFormat = getInputFormat(jobConf)
          HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
            context.stageId, theSplit.index, context.attemptNumber, jobConf)
          //实例化对象 org.apache.hadoop.mapred.LineRecordReader
          //new LineRecordReader()实例方法中, 并且会重新计算当前partition的开始位置(与预分区的会有出入)
          reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
    
          // Register an on-task-completion callback to close the input stream.
          context.addTaskCompletionListener{ context => closeIfNeeded() }
          val key: K = reader.createKey()
          val value: V = reader.createValue()
    
          override def getNext(): (K, V) = {
            try {
              //调用 org.apache.hadoop.mapred.LineRecordReader.next()方法
              finished = !reader.next(key, value)
            } catch {
              case _: EOFException if ignoreCorruptFiles => finished = true
            }
            if (!finished) {
              inputMetrics.incRecordsRead(1)
            }
            //返回当前一对(key,value)对应的值
            (key, value)
          }
    
          override def close() {
            if (reader != null) {
              SqlNewHadoopRDDState.unsetInputFileName()
              // Close the reader and release it. Note: it's very important that we don't close the
              // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
              // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
              // corruption issues when reading compressed input.
              try {
                reader.close()
              } catch {
                case e: Exception =>
                  if (!ShutdownHookManager.inShutdown()) {
                    logWarning("Exception in RecordReader.close()", e)
                  }
              } finally {
                reader = null
              }
              if (bytesReadCallback.isDefined) {
                inputMetrics.updateBytesRead()
              } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
                         split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
                // If we can't get the bytes read from the FS stats, fall back to the split size,
                // which may be inaccurate.
                try {
                  inputMetrics.incBytesRead(split.inputSplit.value.getLength)
                } catch {
                  case e: java.io.IOException =>
                    logWarning("Unable to get input size to set InputMetrics for task", e)
                }
              }
            }
          }
        }
        new InterruptibleIterator[(K, V)](context, iter)
      }
    
    • TextInputFormat
    • 返回LineRecordReader
      public RecordReader<LongWritable, Text> getRecordReader(
                                              InputSplit genericSplit, JobConf job,
                                              Reporter reporter)
        throws IOException {
        
        reporter.setStatus(genericSplit.toString());
        String delimiter = job.get("textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
          recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }
        return new LineRecordReader(job, (FileSplit) genericSplit,
            recordDelimiterBytes);
      }
    
    • LineRecordReader
    • 实例方法中,重新定位当前partition的开始位置
    • 如果是partition(0),开始位置是0
    • 如果不是partition(0),开始位置重新计算
    • 调用 in.readLine()方法,等于调用 UncompressedSplitLineReader.readLine(),注意此时传的maxLineLength参数为0
    public LineRecordReader(Configuration job, FileSplit split,
          byte[] recordDelimiter) throws IOException {
        this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
          LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        codec = compressionCodecs.getCodec(file);
    
        // open the file and seek to the start of the split
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        if (isCompressedInput()) {
          decompressor = CodecPool.getDecompressor(codec);
          if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn =
              ((SplittableCompressionCodec)codec).createInputStream(
                fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
            in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn; // take pos from compressed stream
          } else {
            in = new SplitLineReader(codec.createInputStream(fileIn,
                decompressor), job, recordDelimiter);
            filePosition = fileIn;
          }
        } else {
          fileIn.seek(start);
    	  //读取文件,定位的文件偏移量为,当前partition预分区的开始位置
          in = new UncompressedSplitLineReader(
              fileIn, job, recordDelimiter, split.getLength());
          filePosition = fileIn;
        }
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
    	//调用 in.readLine()方法,等于调用 UncompressedSplitLineReader.readLine(),
    	//注意此时传的maxLineLength参数为0
    	 //定位当前分区的开始位置,等于预分区的位置 + 读到的第一个换行符的长度
    	 //也就是从当前partition开始位置计算,到读到的第一次换行符,属于上一个partition,在向后位置偏移位置+1,就是当前分区的实时开始位置
          start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
      }
    
    • HadoopRDD.compute() 重写迭代器getNext()方法
    • 计算下一个(key,value)的值
    • 具体reader.next()方法为 LineRecordReader.next() 方法
          override def getNext(): (K, V) = {
            try {
              finished = !reader.next(key, value)
            } catch {
              case _: EOFException if ignoreCorruptFiles => finished = true
            }
            if (!finished) {
              inputMetrics.incRecordsRead(1)
            }
            (key, value)
          }
    
    • LineRecordReader.next()
    • 遍历当前分区的(key,value)值,就是去计算每个key,对应的值,每计算完一个(key,value)的值后,会把下一个key的索引位置进行更新
    /** Read a line. */
      public synchronized boolean next(LongWritable key, Text value)
        throws IOException {
    
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
    	// getFilePosition() 等于 pos位置
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
          key.set(pos);//调置本次的偏移位置
    
          int newSize = 0;
          if (pos == 0) { //第一个partition(0)
            newSize = skipUtfByteOrderMark(value);
          } else {
            newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
            pos += newSize;
          }
    
          if (newSize == 0) {
            return false;
          }
          if (newSize < maxLineLength) {
            return true;
          }
    
          // line too long. try again
          LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
    
        return false;
      }
    


    • UncompressedSplitLineReader.readLine()
    • 调用LineReader.readLine()方法
    @Override
      public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
          throws IOException {
        int bytesRead = 0;
        if (!finished) {
          // only allow at most one more record to be read after the stream
          // reports the split ended
          if (totalBytesRead > splitLength) {
            finished = true;
          }
    
          bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
        }
        return bytesRead;
      }
    
    • LineReader.readLine()方法
    • 调用 LineReader.readDefaultLine()方法
    /**
       * Read one line from the InputStream into the given Text.
       *
       * @param str the object to store the given line (without newline)
       * @param maxLineLength the maximum number of bytes to store into str;
       *  the rest of the line is silently discarded.
       * @param maxBytesToConsume the maximum number of bytes to consume
       *  in this call.  This is only a hint, because if the line cross
       *  this threshold, we allow it to happen.  It can overshoot
       *  potentially by as much as one buffer length.
       *
       * @return the number of bytes read including the (longest) newline
       * found.
       *
       * @throws IOException if the underlying stream throws
       */
      public int readLine(Text str, int maxLineLength,
                          int maxBytesToConsume) throws IOException {
        if (this.recordDelimiterBytes != null) {
          return readCustomLine(str, maxLineLength, maxBytesToConsume);
        } else {
          return readDefaultLine(str, maxLineLength, maxBytesToConsume);
        }
      }
    
    
    • LineReader.readDefaultLine()方法
    • 具体计算partition的开始位置的方法
    • 注意,此时传过来的maxLineLength参数值为0,也就是先不实际读取数据放到(key,value)的value中
    • 调用 UncompressedSplitLineReader.fillBuffer()方法,实际读取HDFS上的文件
    /**
       * Read a line terminated by one of CR, LF, or CRLF.
       * 当maxLineLength=0时,也就是partition不为0时,定位开始位置的时候,该方法会读取到
       */
      private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
        /* We're reading data from in, but the head of the stream may be
         * already buffered in buffer, so we have several cases:
         * 1. No newline characters are in the buffer, so we need to copy
         *    everything and read another buffer from the stream.
         * 2. An unambiguously terminated line is in buffer, so we just
         *    copy to str.
         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
         *    in CR.  In this case we copy everything up to CR to str, but
         *    we also need to see what follows CR: if it's LF, then we
         *    need consume LF as well, so next call to readLine will read
         *    from after that.
         * We use a flag prevCharCR to signal if previous character was CR
         * and, if it happens to be at the end of the buffer, delay
         * consuming it until we have a chance to look at the char that
         * follows.
         */
        str.clear();
        int txtLength = 0; //tracks str.getLength(), as an optimization
        int newlineLength = 0; //length of terminating newline
        boolean prevCharCR = false; //true of prev char was CR
        long bytesConsumed = 0;
        do {
          int startPosn = bufferPosn; //starting from where we left off the last time
          if (bufferPosn >= bufferLength) {
            startPosn = bufferPosn = 0;
            if (prevCharCR) {
    		//bytesConsumed:总计读取的数据长度(包括换行符)
              ++bytesConsumed; //account for CR from previous read
            }
    	    /**
    		 * 实际读取HDFS文件的方法
    		 * buffer:缓冲区
    		 * bufferLength : 这一次读到的数据长度
    		   
    		 */
            bufferLength = fillBuffer(in, buffer, prevCharCR);
            if (bufferLength <= 0) {
              break; // EOF
            }
          }
    	  //对读到的buffer数组数据进行遍历,找找第一个换行符
    	  // bufferPosn: 读到换行符时的位置(索引),同一个分区中这个值是会保存的
          for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
            if (buffer[bufferPosn] == LF) {
    		//调试时prevCharCR = false, 当找到换行符\n时,newlineLength=1
              newlineLength = (prevCharCR) ? 2 : 1;
              ++bufferPosn; // at next invocation proceed from following byte
              break;
            }
            if (prevCharCR) { //CR + notLF, we are at notLF
              newlineLength = 1;
              break;
            }
    		//在linux平台测试数据中没看到等于\r的,也就是调试prevCharCR一直等于false
            prevCharCR = (buffer[bufferPosn] == CR);
          }
          int readLength = bufferPosn - startPosn;//这一次读取的数据长度(包括换行符)
          if (prevCharCR && newlineLength == 0) {
            --readLength; //CR at the end of the buffer
          }
    	  //总计读取的数据长度(包括换行符)
          bytesConsumed += readLength;
    	  //这一次读取的数据长度(不包括换行符)
          int appendLength = readLength - newlineLength;
          if (appendLength > maxLineLength - txtLength) {
    	  //如果读到的数据长度,大于最大长度限制,做个控制
    	  //如果maxLineLength=0, txtLength =0 时,此时是不需要读数据的,就给appendLength赋值为0
            appendLength = maxLineLength - txtLength;
          }
          if (appendLength > 0) {
    	     //如果计算appendLength >0 时,把值赋值给str,也就是我们读到的值
            str.append(buffer, startPosn, appendLength);
    		//txtLength变量累加每次实际读到的长度(不包括换行符)
            txtLength += appendLength;
          }
    	  //循环条件,是没有读到换行符,并且
        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
    
        if (bytesConsumed > Integer.MAX_VALUE) {
          throw new IOException("Too many bytes before newline: " + bytesConsumed);
        }
        return (int)bytesConsumed;
      }
    
    • UncompressedSplitLineReader.fillBuffer()方法
    protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
          throws IOException {
        int maxBytesToRead = buffer.length; //缓冲的大小,默认为64KB
    	//splitLength 当前partition的预分区大小(长度)
    	// totalBytesRead 当前partitition总共读取了的数据长度
        if (totalBytesRead < splitLength) {
    	   //说明当前partition预分区长度还没有读完,还需要继续读取剩下的长度
          long leftBytesForSplit = splitLength - totalBytesRead;
          // check if leftBytesForSplit exceed Integer.MAX_VALUE
          if (leftBytesForSplit <= Integer.MAX_VALUE) {
    	    //做个比较,当前分区剩余的长度小于等于Integer.MAX_VALUE),取64KB默认长度和实际长度的一个小的值
            maxBytesToRead = Math.min(maxBytesToRead, (int)leftBytesForSplit);
          }
        }
    	//实际读取的数据长度
        int bytesRead = in.read(buffer, 0, maxBytesToRead);
    
        // If the split ended in the middle of a record delimiter then we need
        // to read one additional record, as the consumer of the next split will
        // not recognize the partial delimiter as a record.
        // However if using the default delimiter and the next character is a
        // linefeed then next split will treat it as a delimiter all by itself
        // and the additional record read should not be performed.
        if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
          if (usingCRLF) {
            needAdditionalRecord = (buffer[0] != '\n');
          } else {
            needAdditionalRecord = true;
          }
        }
        if (bytesRead > 0) {
    	//读到了数据,当前partitition读到的总数据长度做个累加
          totalBytesRead += bytesRead;
        }
        return bytesRead;
      }
    
    展开全文
  • 这里展示的文件读写方式,都是针对dataFrame数据结构的,也就是文件读进来之后,是一个spark dataFrame。 1、读写hdfs上的文件 1.1 读写hdfs上的文件 ——> 按照指定文件格式读取与保存 SparkSession在读取文件时...

    spark有3种数据结构——RDD、DataFrame、DataSet。这里展示的文件读写方式,都是针对dataFrame数据结构的,也就是文件读进来之后,是一个spark dataFrame。

    0、首先连接spark

    from pyspark.sql import SparkSession # SparkConf、SparkContext 和 SQLContext 都已经被封装在 SparkSession
    spark=SparkSession \
            .builder \
            .appName('my spark task') \
            .getOrCreate()
    

    1、读写hdfs上的文件

    SparkSession在读取【hdfs上文件】的时候,可以指定读取文件的格式,支持的文件格式有json, parquet, jdbc, orc, libsvm, csv, text。

    1.1 读写hdfs上的文件 ——> 读写csv、json、text文件

    【读取】:按照csv文件格式,读取文件

    其余的文件格式只需将csv变成相应的文件格式名称即可

    from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField
    # 文件在hdfs上的位置
    file_path = r"/user/lanyue/data.csv"  
    # 指定文件的schema
    schema = StructType([
            StructField("column_1", StringType(), True), # nullable=True代表此字段的值不为空
            StructField("column_2", StringType(), True),
            StructField("column_3", StringType(), True),
            ])
    
    # 方法一
    # 推荐这种,指定什么文件格式都可以,只需要修改参数format即可
    # 不同的格式其load函数会有不同,用的时候请自行搜索。
    df = spark.read.format("csv").load(file_path, header=True, inferSchema=True, encoding="utf-8", sep=',') 
    # sep=',',表示指定分隔符为逗号,同参数delimiter。
    # header=TRUE,表示数据的第一行为列名
    # inferSchema,表示是否对字段类型进行推测。=False,默认读取后都按照文本字符处理。=True表示自动推断schema。
    
    # 或者下面这种形式。这两种形式都可以
    df = spark.read.format("csv").option("encoding","utf-8").option("header",True).load(file_path, schema=schema)  # 使用指定的schema
    
    
    
    # 方法二
    df = spark.read.csv(file_path, encoding='utf-8', header=True, inferSchema=True) 
    df = spark.read.csv(file_path, encoding='utf-8', header=True, schema=schema) 
    # 如果想指定文件格式是json,那就是spark.read.json,其他类似
    
    

    【保存】:以csv文件的格式保存,注意:是保存在hdfs上,不是保存在本地

    其余的文件格式只需将csv变成相应的文件格式名称即可

    # 保存在【hdfs上】,以csv文件的格式。指定什么文件格式都可以,只需要修改参数format即可
    df.write.mode('append').format("csv").repartition(1).option("encoding","utf-8").option("header",True).save("/lanyue/data.csv") 
    # mode,保存模式:ovewriter重写、append文件末尾追加、error如果文件存在抛出异常、ignore如果文件存在忽略不更新
    # repartition, 在yarn模式下,Spark会根据hdfs文件的块数据大小来划分默认的分区数目,但是我们也可以自己设置分区数目,使用参数repartition。=1表示只保存成一个数据块
    
    # 或者
    df.write.csv("/lanyue/data.csv", sep="\t", encoding="utf-8", mode='overwrite') 
    # 如果想指定文件格式是json,那就是df.write.json,其他类似
    
    

    【注意】该保存方法有一个需要注意的地方:传输的文件名,是一个在hdfs上的文件路径,而不是单独一个文件(Spark会根据hdfs文件的块数据大小来划分默认的分区数目,然后保存成多个数据块。除非设置repartition=1,保存成一个数据块)。比如存为csv文件格式,在Linux shell中查看其在hdfs上的情况的时候,需要执行hdfs dfs -ls /lanyue/data.csv/*

    1.2、读写hdfs上的文件 ——> 按照指定分隔符来读取文件

    有的时候,文件中的数据,其分隔符是自定义的,如何指定分隔符来读取文件呢?

    可以使用读取csv的格式来读,虽然文件不是csv格式,但是通过spark.read.format(“csv”)的api指定分隔符,从而实现指定分隔符来读取文件的目的。

    df = spark.read.format("csv").load(file_path, header=True, inferSchema=True, encoding="utf-8", sep=',') 
    # 通过指定参数sep,来指定分隔符,可以是",", "\t","\x01"等。同参数delimiter。
    

    1.3、读写hdfs上的文件 —— 读写hive中的表

    hive中的表,其本质上,也是保存在hdfs上的文件,其文件格式是orc或者parquet

    【读】:从hive表中读取数据

    df = spark.sql("select * from hive_table_name")
    df.show()
    

    【写】:保存成hive表

    # spark-dataframe 保存成hive表
    df.write.format("orc").mode("append").partitionBy("dt").saveAsTable(table_name)
    # partitionBy,设置分区。如果没有分区,直接去掉该参数即可
    # mode="append",以追加的形式写入
    
    # 直接表to表 
    spark.sql = """
    	insert overwrite table databases.hive_table_name_2
        partition (saledate) 
        select *  
        from databases.hive_table_name_1
    """
    

    1.4 读写hdfs上的文件 —— 读写mysql中的表

    MySQL中的表,其文件存储格式是jdbc

    【读】:读取MySQL中的表

    df = spark.read.format("jdbc")
          .option("url",url) # database地址,格式为jdbc:mysql://主机:端口/数据库
          .option("dbtable","table_name") # 表名
          .option("user",user)
          .option("password",password)
          .option("driver",driver)
          .load()
          
    # 或者以下形式
    df = spark.read.format('jdbc').options(url="jdbc:mysql://host:port/database", # database地址
    									 driver="com.mysql.jdbc.Driver",
                                         dbtable="table_name", 
                                         user="XXX",
                                         password="XXX").load()
    
    # 或者以下形式
    # mysql的相关配置
    prop = {'user': 'xxx', 
            'password': 'xxx', 
            'driver': 'com.mysql.jdbc.Driver'}
    url = 'jdbc:mysql://host:port/database' # database地址
    df = spark.read.jdbc(url=url, table='mysql_table_name', properties=prop)
    

    【写】:保存成MySQL表

    prop = {'user': 'xxx', 
            'password': 'xxx', 
            'driver': 'com.mysql.jdbc.Driver'}
    url = 'jdbc:mysql://host:port/database' # database地址
    df.write.jdbc(url=url, table='table_name', mode='append', properties=prop)
    
    # 或者以下形式
    df.write.format("jdbc")
      .option("url","jdbc:mysql://host:port/database") # database地址
      .option("dbtable","table_name")
      .option("user",user)
      .option("password",password)
      .option("driver",driver).mode("overwrite")
      .save()
    
    展开全文
  • spark读取hdfs文件是怎么分区的,读取代码如下: val df = sc.textFile("data/wc.txt",3) 一.分析 spark读取hdfs文件分区跟hadoop的分区完全相同,因为底层使用的就是Hadoop的TextInputFormat,考虑两内容: ...

    spark读取hdfs的文件是怎么分区的,读取代码如下:

    val df = sc.textFile("data/wc.txt",3)

    一.分析

    spark读取hdfs的文件分区跟hadoop的分区完全相同,因为底层使用的就是Hadoop的TextInputFormat,考虑两内容:

    1)关于文件分区数量计算:

    指定的预分区数量是最小分区数量,如:代码中的参数3。

    真正的分区计算: 每个分区字节数 = 文件字节数/预分区数量

    如果没有整除,判断余数是否大于分区字节数 * 0.1,如果大于则会新增一个分区,剩余的放在这个分区。否则不会新加分区,把余数放到最后一个分区里。

    2)分区数据如何读取:

    分区读取数据是按行读取,但是会考虑文件的偏移量(offset)的设置。虽然第一个分区字节数不包含一整行,但是会读取一整行。当某个分区的偏移量全被之前的分读走了,这个分区就是空的。

    注意:

    1.当位移量读取了回撤换行,会把下一行的数据也会读取。

    2.当读取多个文件时,会把所有文件字节加起来计算分区,但是读取的时候不会夸文件读取。

     

    二.代码分析

    1)读取文件数据时,数据是按照Hadoop文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体Spark核心源码如下,FileInputFormat类中的方法:

    public InputSplit[] getSplits(JobConf job, int numSplits)

        throws IOException {

     

        long totalSize = 0; // compute total size

        for (FileStatus file: files) {// check we have valid files

          if (file.isDirectory()) {

            throw new IOException("Not a file: "+ file.getPath());

          }

          totalSize += file.getLen();

        }

     

        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

        long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.

          FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

          

        ...

        

        for (FileStatus file: files) {

        

            ...

        

        if (isSplitable(fs, path)) {

              long blockSize = file.getBlockSize();

              long splitSize = computeSplitSize(goalSize, minSize, blockSize);

     

              ...

     

      }

      protected long computeSplitSize(long goalSize, long minSize,long blockSize) {

        return Math.max(minSize, Math.min(goalSize, blockSize));

     

     
    2) 分区数据读取的代码LineReader
    public LineRecordReader(Configuration job, FileSplit split,
        byte[] recordDelimiter) throws IOException {
      this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
      start = split.getStart();
      end = start + split.getLength();
      final Path file = split.getPath();
      compressionCodecs = new CompressionCodecFactory(job);
      codec = compressionCodecs.getCodec(file);
    
      // open the file and seek to the start of the split
      final FileSystem fs = file.getFileSystem(job);
      fileIn = fs.open(file);
      if (isCompressedInput()) {
        decompressor = CodecPool.getDecompressor(codec);
        if (codec instanceof SplittableCompressionCodec) {
          final SplitCompressionInputStream cIn =
            ((SplittableCompressionCodec)codec).createInputStream(
              fileIn, decompressor, start, end,
              SplittableCompressionCodec.READ_MODE.BYBLOCK);
          in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
          start = cIn.getAdjustedStart();
          end = cIn.getAdjustedEnd();
          filePosition = cIn; // take pos from compressed stream
        } else {
          in = new SplitLineReader(codec.createInputStream(fileIn,
              decompressor), job, recordDelimiter);
          filePosition = fileIn;
        }
      } else {
        fileIn.seek(start);
        in = new UncompressedSplitLineReader(
            fileIn, job, recordDelimiter, split.getLength());
        filePosition = fileIn;
      }
    

    三案例分析

    案例:读取文件重新分区,再写入到文件
    代码:
    val conf = new Configuration()
    val fs = FileSystem.getLocal(conf)
    
    fs.delete(new Path("data/wc.out2"),true)
    
    val df = sc.textFile("data/wc.txt",3)
    
    df.saveAsTextFile("data/wc.out2")
     
     文件内容:
       d
       a
       y
       u
       e
       e
       t
     1)计算分区:
     总字节数:14;指定预分区为3.
     目标分区大小: goalSize = 14/3 =4, 余2; 2/4>0.1,所以会生成4个分区,每个分区大小是4个字节。
    每个分区读取偏移量入下:
       0,4
       4,8
       8,12
       12, 13
    2)分区读取内容
     按行读取
     文件内容的offset为:
     0 1
     2 3
     4 5
     6 7
     8 9
     10 11
     12
    
      第1个分区读取:0到4,读取前3行 ;注意:当位移量偏过换行符时,会把下一行的数据也会读取了
      第2个分区读取:4到8,因为4,5被读去了,从来6开始读到9。(因为按行读取所以读到9)
      第3个分区读取:8到12,因为8,9被读去了,从来10开始读到12。
      第4个分区读取:12到13 所以为
    

     

    四.总结

    spark读取hdfs文件分区比较复杂,需要仔细研究研究。

    展开全文
  • spark读取hdfs文件的时候,.tmp文件不是我们需要的,需要进行过滤 import org.apache.hadoop.fs.{Path, PathFilter} /** * filter temp files in hdfs dirs * eg: 1234556.tmp */ class FilterTmpPath ...
  • 使用Spark读取HDFS文件时没有什么问题,只要把读取的路径设置为HDFS文件系统中文件的位置即可,可是在读取本地文件时(比如读取E:/file/user.json,E盘中的user.json文件时却总是报错)。 先展示正确的操作方法, ...
  • package spark.SparkStreaming.file import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Description:统计...
  • 利用python将数据保存到hadoop的hdfs目录下,利用spark读取hdfs目录下内容,然后做sql统计。
  • 在Hadoop集群上配置的Spark如果直接读取文件默认的是读取HDFS上的文件,那么如果想要读取本地的文件就不能直接写还需要说明如下: from pyspark.sql import SparkSession def CreateSparkContext(): # 构建...
  • Spark读取HDFS文件夹时, 其中的小文件过多会降低程序性能,也给HDFS带来压力。 当Spark读取文件时, 会为每个文件的数据分配一个Partition, 可以使用coalesce算子减少rdd中的分区。 首先, 编写函数实现根据...
  • 一直以来都没搞懂spark读hdfs文件到底是怎么确定分区数的,分区数如果超过了spark的并行度怎么办。...例如:如果读取的一个hdfs文件大小为1280MB,可能是存储为10块,那么spark读取这个文件的分区数就是.
  • sparkspark SQL 读取HDFS中的文件 处理数据 在IDEA中: 导入 spark-core_2.11的依赖包 版本和安装spark的版本相同 Object ConnTest{ def main(args: Array[String]): Unit = { val conf = SparkConf()....
  • 解决了数据本地性问题,Locality Level 显示为NODE_LOCAL,但是通过spark读取hdfs文件存入hbase还是很慢,不知道为什么
  • spark-shell命令读取HDFS文件文件不存在,读取本地文件是存在的
  • Spark 读取HDFS文件

    千次阅读 2014-06-19 18:00:05
    scala> val distFile = sc.textFile("hdfs://localhost:54310/data/in/log")
  • Spark读取HDFS文件

    千次阅读 2016-05-17 22:02:32
    import org.apache.spark._ import SparkContext._object WordCount { def main(args: Array[String]){ if(args.length != 3){ println("usage: com.qiurc.test.WordCount <master> <input> <outpu
  • SPARK读取HDFS文件

    2021-03-25 16:20:30
    sc.textFile("hdfs://polo:9000/sparktest/data.txt");//polo是你的主机名
  • Spark读取HDFS文件代码示例如下 package demo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object _00wcHDFS { def main(args: Array[String]): Unit = { val conf: ...
  • Spark读取HDFS或者AFS等文件系统文件

    千次阅读 2018-12-30 23:49:21
    Spark读取HDFS或者AFS等文件系统文件 Spark读取文件有很多方法,我这里主要介绍一下读取非结构化的文件的两种方式,针对多文件读取,单文件读取也是一样的。 方案一:spark的textFile方法,也是最简单的方案,支持...
  • file:///home/learning-spark/files/ham.txt" In [11]: input = hiveCtx.read.json(inputFile) 19/01/23 17:12:51 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 21...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 35,225
精华内容 14,090
关键字:

hdfsspark文件读取