精华内容
下载资源
问答
  • I've searched for some time now and none of the solutions seem to work for me.Pretty straightforward - I want to upload data from my local file system to HDFS using the Java API. The Java program wil....

    I've searched for some time now and none of the solutions seem to work for me.

    Pretty straightforward - I want to upload data from my local file system to HDFS using the Java API. The Java program will be run on a host that has been configured to talk to a remote Hadoop cluster through shell (i.e. hdfs dfs -ls, etc.).

    I have included the below dependencies in my project:

    hadoop-core:1.2.1

    hadoop-common:2.7.1

    hadoop-hdfs:2.7.1

    I have code that looks like the following:

    File localDir = ...;

    File hdfsDir = ...;

    Path localPath = new Path(localDir.getCanonicalPath());

    Path hdfsPath = new Path(hdfsDir.getCanonicalPath());

    Configuration conf = new Configuration();

    conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());

    conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

    Filesystem fs = FileSystem.get(configuration);

    fs.getFromLocalFile(localPath, hdfsPath);

    The local data is not being copied to the Hadoop cluster, but no errors are reported and no exceptions are thrown. I've enabled TRACE logging for the org.apache.hadoop package. I see the following outputs:

    DEBUG Groups:139 - Creating new Groups object

    DEBUG Groups:139 - Creating new Groups object

    DEBUG Groups:59 - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000

    DEBUG Groups:59 - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000

    DEBUG UserGroupInformation:147 - hadoop login

    DEBUG UserGroupInformation:147 - hadoop login

    DEBUG UserGroupInformation:96 - hadoop login commit

    DEBUG UserGroupInformation:96 - hadoop login commit

    DEBUG UserGroupInformation:126 - using local user:UnixPrincipal: willra05

    DEBUG UserGroupInformation:126 - using local user:UnixPrincipal: willra05

    DEBUG UserGroupInformation:558 - UGI loginUser:

    DEBUG UserGroupInformation:558 - UGI loginUser:

    DEBUG FileSystem:1441 - Creating filesystem for file:///

    DEBUG FileSystem:1441 - Creating filesystem for file:///

    DEBUG FileSystem:1290 - Removing filesystem for file:///

    DEBUG FileSystem:1290 - Removing filesystem for file:///

    DEBUG FileSystem:1290 - Removing filesystem for file:///

    DEBUG FileSystem:1290 - Removing filesystem for file:///

    Can anyone assist in helping me resolve this issue?

    EDIT 1: (09/15/2015)

    I've removed 2 of the Hadoop dependencies - I'm only using one now:

    hadoop-core:1.2.1

    My code is now the following:

    File localDir = ...;

    File hdfsDir = ...;

    Path localPath = new Path(localDir.getCanonicalPath());

    Path hdfsPath = new Path(hdfsDir.getCanonicalPath());

    Configuration conf = new Configuration();

    fs.getFromLocalFile(localPath, hdfsPath);

    I was previously executing my application with the following command:

    $ java -jar .jar ...

    Now I'm executing it with this command:

    $ hadoop jar .jar ...

    With these changes, my application now interacts with HDFS as intended. To my knowledge, the hadoop jar command is meant only for Map Reduce jobs packaged as an executable jar, but these changes did the trick for me.

    解决方案

    i am not sure about the approach you are following, but below is one way data can be uploaded to hdfs using java libs :

    //imports required

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.FileSystem;

    //some class here .....

    Configuration conf = new Configuration();

    conf.set("fs.defaultFS", );

    FileSystem fs = FileSystem.get(conf);

    fs.copyFromLocalFile(, );

    Also if you have hadoop conf xmls locally, you can include them in you class path. Then hadoop fs details will automatically be picked up at runtime, and you will not need to set "fs.defaultFS" . Also if you are running in old hdfs version you might need to use "fs.default.name" instead of "fs.defaultFS". If you are not sure of the hdfs endpoint, it is usually the hdfs namenode url . Here is example from previous similar question copying directory from local system to hdfs java code

    展开全文
  • 然后添加上传日志的功能到crontab,内容如下: 55 23 * * * /usr/bin/python /usr/local/python/source/hdfsput.py >> /dev/null 2>&1截止目前,数据的分析源已经准备就绪,接下来的工作便是分析了。

    一、场景描述

    比如我们的网站共有5台Web设备,日志文件存放在/data/logs/日期(20180114)/access.log。日志为默认的Nginx定义格式,如下所示:

    10.2.2.234 - - [12/Jan/2018:08:36:23 +0800] "POST /statistics/count/collection?company=6F79078F79D77550739EF61CD0DC2A83&nonce_str=nbmmn8bds4j84spu0c50ed0tm2ihfk3e&timeStamp=1515717383974&sign=91803CDB91BD598F29643F899E529D4A&client=ios&server=statistics HTTP/1.1" 200 287 "-" "CareHeart/2.2.7 (iPhone; iOS 10.3.1; Scale/2.00)"共有12列数据(空格分隔)分别为:

    1)客户端IP

    2)空白(远程登录名称)

    3)空白(认证的远程用户)

    4)请求时间

    5)UTF时差

    6)方法

    7)资源(访问的路径)

    8)协议

    9)状态码

    10)发送的字节数

    11)访问来源

    12)客户端信息(不具体拆分)

    二、编写部署HDFS的客户端

    这里,我们新建脚本文件hdfsput.py,具体通过subprocess.Popen()方法调用HDFS相关外部命令,实现创建HDFS目录及客户端文件上传,具体代码如下:

    【/usr/local/python/source/hdfsput.py】

    # -*- coding:UTF-8 -*-

    '''

    Created on 2018年1月14日

    @author: liuyazhuang

    '''

    import subprocess

    import sys

    import datetime

    webid = "web1" #HDFS存储日志的标志,其他Web服务器分别为web2、web3、web4、web5

    currdate = datetime.datetime.now().strftime('%Y%m%d')

    #日志本地存储路径

    logspath = "/data/logs/" + currdate + "/access.log"

    #HDFS存储日志名

    logname = "access.log." + webid

    try:

    #创建HDFS目录,格式为website.com/20180114

    subprocess.Popen(["/usr/local/hadoop-2.5.2/bin/hadoop", "fs", "-mkdir", "hdfs://liuyazhuang121:9000/user/root/website.com/" + currdate], stdout = subprocess.PIPE)

    except Exception,e:

    pass

    #上传本地日志到HDFS

    putinfo = subprocess.Popen(["/usr/local/hadoop-2.5.2/bin/hadoop", "fs", "-put", logspath, "hdfs://liuyazhuang121:9000/user/root/website.com/" + currdate+ "/" + logname], stdout = subprocess.PIPE)

    for line in putinfo.stdout:

    print line

    三、部署HDFS的客户端

    我们需要在5台Web服务器部署HDFS的客户端,以便定期上传Web日志到HDFS存储平台,最终实现分布式计算。需要安装JDK(配置环境变量)、Hadoop(配置环境变量)。然后添加上传日志的功能到crontab,内容如下:

    55 23 * * * /usr/bin/python /usr/local/python/source/hdfsput.py >> /dev/null 2>&1截止目前,数据的分析源已经准备就绪,接下来的工作便是分析了。

    展开全文
  • 1.1 新建表,并将本地csv文件上传到该表中(Linux命令行模式下) 准备本地文件,一般都是csv文件。 1、注意记下该文件的分隔符; 2、记下该文件字段名称和字段类型,以及字段顺序!!; 3、注意该文件是否有表头,...

    一般建表的时候要建外部表,这样一不小心删除的话,还能够恢复。

    1、自己设计表结构,然后上传文件到该表中

    1.1 新建表,并将本地csv文件上传到该表中(Linux命令行模式下)

    准备本地文件,一般都是csv文件。

    1、注意记下该文件的分隔符;
    2、记下该文件字段名称和字段类型,以及字段顺序!!;
    3、注意该文件是否有表头,如果有,在下面进行载入空表的时候,表头也会被当做数据传入表中。如果不需要,就把表头去掉,但是注意记下字段顺序;

    建表
    写好建表的sql文件,假设命名为create_table.sql,内容如下;

    --create_table.sql内容如下
    CREATE EXTERNAL TABLE IF NOT EXISTS algo_tinyv.sample_data (
    user_id STRING COMMENT '用户唯一id',
    order_id STRING COMMENT '订单编号'
    )--注意字段顺序与上述文件字段的顺序要保持一致!!!!
    --PARTITIONED BY(dt STRING) --分区
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' --以逗号作为分隔符
    STORED AS TEXTFILE;
    

    运行 hive -f create_table.sql

    -f的含义是运行一个sql文件,若运行单个sql语句则是hive -e “select * from table”;

    载入本地数据

    运行 hive -e "load data local inpath 'sample_data.csv' overwrite into table 库名.表名 PARTITION(dt='字段名')"

    1、overwrite的意思是覆盖表algo_tinyv.sample_data中的数据!
    2、PARTITION:建一个分区表,如果没有可省略

    1.2 新建表,并将hdfs文件上传到该表中(hive-sql模式下)

    准备hdfs文件

    1、注意记下该文件的分隔符;
    2、记下该文件字段名称和字段类型,以及字段顺序!!;
    3、注意该文件是否有表头,如果有,在下面进行载入空表的时候,表头也会被当做数据传入表中。如果不需要,就把表头去掉,但是注意记下字段顺序;

    建空表并传入数据
    hive-sql模式下,运行

    use 库名; # 选择数据库
    drop table 库名.表名;
    create external table if not exists 库名.表名 (
    appl_no string COMMENT '订单id',
    score string COMMENT '分数'
    )--注意字段顺序与上述文件字段的顺序要保持一致!!!!
    row format delimited fields terminated by '\t' --设置分隔符
    lines terminated by '\n'
    stored as textfile
    location '/home/workdir/yourHDFSfile; --hdfs文件位置
    
    展开全文
  • public class PutMergeCompression { //本地多压缩文件上传hdfs public static void putMergeFunc(String LocalDir, String fsFile) throws IOException { Configuration conf = new Configuration(); ...

    代码如下:

    package net.maichuang.log;

    import java.io.File;

    import java.io.FileOutputStream;

    import java.io.IOException;

    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.FSDataInputStream;

    import org.apache.hadoop.fs.FSDataOutputStream;

    import org.apache.hadoop.fs.FileStatus;

    import org.apache.hadoop.fs.FileSystem;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.IOUtils;

    import org.apache.hadoop.io.compress.CompressionCodec;

    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    import org.apache.hadoop.io.compress.CompressionOutputStream;

    import org.apache.hadoop.io.compress.GzipCodec;

    import org.apache.hadoop.util.ReflectionUtils;

    public class PutMergeCompression {

    //本地多压缩文件上传hdfs

    public static void putMergeFunc(String LocalDir, String fsFile) throws IOException

    {

    Configuration  conf = new Configuration();

    CompressionCodecFactory factory = new CompressionCodecFactory(conf);

    FileSystem fs = FileSystem.get(conf);       //fs是HDFS文件系统

    FileSystem local = FileSystem.getLocal(conf);   //本地文件系统

    Path localDir = new Path(LocalDir);

    Path HDFSFile = new Path(fsFile);

    FileStatus[] status =  local.listStatus(localDir);  //得到输入目录

    FSDataOutputStream out = fs.create(HDFSFile);       //在HDFS上创建输出文件

    for(FileStatus st: status)

    {

    Path temp = st.getPath();

    CompressionCodec codec = factory.getCodec(temp);

    InputStream in = codec.createInputStream(local.open(temp));

    IOUtils.copyBytes(in, out, 4096, false);    //读取in流中的内容放入out

    in.close(); //完成后,关闭当前文件输入流

    }

    out.close();

    }

    //hdfs上多文件合并压缩到本地

    public static void getMergeFunc(String fsFile,String LocalDir) throws IOException

    {

    Configuration  conf = new Configuration();

    CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(GzipCodec.class, conf);

    FileSystem fs = FileSystem.get(conf);       //fs是HDFS文件系统

    Path HDFSFile = new Path(fsFile);

    FileStatus[] status =  fs.listStatus(HDFSFile); //得到输入目录

    FileOutputStream outFile = new FileOutputStream(LocalDir);

    CompressionOutputStream outGzip = codec.createOutputStream(outFile);

    for(FileStatus st: status)

    {

    Path temp = st.getPath();

    FSDataInputStream in = fs.open(temp);

    IOUtils.copyBytes(in, outGzip, 4096, false);    //读取in流中的内容放入out

    in.close(); //完成后,关闭当前文件输入流

    }

    outGzip.close();

    outFile.close();

    }

    //从hdfs 上拷贝并压缩到本地

    public static void GzipFile() throws IOException {

    String inputFile = “hdfs://master:9000/user/xie/input/2013-11-01/”;

    String outputFile = “/home/xie/data/2013-11-01/”;

    Configuration  conf = new Configuration();

    FileSystem fs = FileSystem.get(conf);       //fs是HDFS文件系统

    Path HDFSFile = new Path(inputFile);

    FileStatus[] status =  fs.listStatus(HDFSFile); //得到输入目录

    File dir = new File(outputFile);

    if(!dir.exists())

    dir.mkdirs();

    for(FileStatus st: status)

    {

    String[] path = st.getPath().toString().split(“/”);

    String filename = path[path.length -1];

    getMergeFunc(inputFile+filename,outputFile+filename+”.gz”);

    }

    }

    //从本地解压并拷贝到hdfs上

    public static void UnzipFile() throws IOException {

    String local = “/home/xie/data/2013-11-01”;

    String fs = “hdfs://master:9000/user/xie/test/2013-11-01”;

    putMergeFunc(local,fs);

    }

    public static void main(String [] args) throws IOException

    {

    UnzipFile();

    //        GzipFile();

    //        String l = “/home/xie/data/xx”;

    //        String f = “hdfs://master:9000/user/xie/test1/PutMergeTest”;

    //        putMergeFunc(l,f);

    //        String x = “/home/xie/data/xx/wo.gz”;

    //        String y = “hdfs://master:9000/user/xie/test”;

    //        getMergeFunc(y,x);

    }

    }

    展开全文
  • 记录如何将本地文件上传HDFS中前提是已经启动了hadoop成功(nodedate都成功启动)①先切换到HDFS用户②创建一个input文件夹zhangsf@hadoop1:~$ hdfs dfs -mkdir /input查看创建的文件夹在 hadoop1:50070中查看(我...
  • HDFS文件上传下载步骤

    2020-12-23 17:52:14
    先来了解下分布式文件系统(Distributed FileSystem),它是跨越多台...1.当客户端要向HDFS文件系统上传文件时,第一步通过分布式文件系统先与NameNode建立连接,上传自己的元数据。(a)通过调用客户端的对象Distribute...
  • HDFS操作、数据上传与下载原理解析、高级特性及底层原理1 HDFS操作1.1 Web Console网页工具1.2 命令行1.2.1 普通的操作命令1.2.2 管理员命令1.3 Java API2 HDFS的原理解析2.1 数据上传的过程2.2 数据下载的过程2.3 ...
  • 1.实时读取本地文件到HDFS案例 需求分析 实现步骤 1.Flume要想将数据输出到HDFS,必须持有Hadoop相关jar包 将commons-configuration-1.6.jar、 hadoop-auth-2.7.2.jar、 hadoop-common-2.7.2.jar、 ...
  • 这里使用的是CDH 5.13.0分布式系统在网上搜集了很久,找到一套能上传文件的java代码package com.data.sysatem.crawler;import java.io.BufferedInputStream;import java.io.File;import java.io.FileInputStream;...
  • 2.将本地的test.csv文件上传hdfs文件系统中 方法一:-put命令,其中/user/tmp/为hdfs中的路径 hdfs dfs -put test.csv /user/tmp/ 方法二:-moveFromLocal命令 hdfs dfs -moveFromLocal test.csv /user/tmp/ .....
  • 本地文件上传HDFS

    2021-06-30 20:59:01
    需求:按照配置文件的内容解析要上传文件的路径,HDFS的目标路径,文件格式匹配 具体代码实现 hdfsETL.json文件 { "hdfsProjectList":[ { "projectName":"data_collection", "localSrcPath":"E:/hadoop/...
  • 在做Hadoop数据挖掘项目的时候,我们第一步是源数据的获取,即把相应的数据放置到HDFS中,以便Hadoop进行计算,手动将文件上传HDFS中,未免太费时费力,所以我们可以采取像Flume一样的框架,或者采用Shell脚本进行...
  • 但是整体来看还是没有python简洁.1.1开发环境Hadoop 2.7.4Python3.7在Windows 下写Python1.2准备环境Python环境这里大手子就不用看啦,浪费时间啊这两条命令一个不行换另一个pip install hdfspip3 install hdfs我直接...
  • 一、什么是HDFS文件系统 文件系统是对文件存储设备的空间进行组织和分配,负责文件存储并对存入的文件进行保护和检索的系统。即:为用户建立文件、存入、读出、修改、转储文件,控制文件的存取,当用户不再使用时...
  • 当使用 hadoop fs -put localfile /user/xxx 时提示:put: Permission denied: user=root, access=WRITE, inode="/user/shijin":hdfs:supergroup:...一个是本地文件系统中localfile 文件的权限,一个是HDFS上 /user...
  • HDFS上传文件,如果是从某个datanode开始上传文件,会导致上传数据优先写满当前datanode的磁盘,这对于运行分布式程序是非常不利的。解决的办法:1、从其他非datanode节点上传可以将Hadoop的安装目录复制一份到...
  • How can I read a file from HDFS using Scala (not using Spark)?When I googled it I only found writing option to HDFS.import org.apache.hadoop.conf.Configuration;import org.apache.had...
  • 最近在搞数据传输的东西,涉及到hdfs的操作,主要有两种方法: 1、hive -e方法 2、hdfs dfs -get方法 二、详解: 两者都可以将集群的数据写入到本地,但是所存储的格式存在差别: 方法一: hive -e将集群的...
  • 本地选择需要上传的csv文件,如下图,一起瞅瞅吧! 将刚刚上传到服务器的文件,放到hdfs集群(hive目录下):hdfs dfs -put 文件名称 指定路径(需要放到hive目录下) 比如我上传到hive的目录如下:hdfs dfs -put...
  • 019-07-01 16:45:24,933 INFO org.apache.hadoop.ipc.Server: IPC Server handler 2 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock from 58.211.111.42:63048 Call#3 Retry#0java.io.IOEx...
  • HDFS上传文件,如果是从某个datanode开始上传文件,会导致上传数据优先写满当前datanode的磁盘,这对于运行分布式程序是非常不利的。解决的办法:1、从其他非datanode节点上传可以将Hadoop的安装目录复制一份到...
  • 项目场景:利用flume监控文件并上传hdfs上面出现的问题 问题描述: 在: @Override public void run() { bytes = mmInStream.read(buffer); mHandler.obtainMessage(READ_DATA, bytes, -1, buffer)....
  • hdfs数据流,的写入过程,也就是,文件从本地上传hdfs的过程,我们来看原理. 1.首先第一步我们的客户端去向,我们的完全分布式集群,像namenode节点,发起一个文件上传请求,并且把要上传的文件的路径带上。 2....
  • 服务器远程上传hdfs文件大小 内容精选换一换首先需要准备一台Linux系统的物理服务器作为镜像制作服务器(即宿主机),以及一台用于登录宿主机的本地Windows跳板机(物理服务器或虚拟机)。宿主机必须安装:vsftpd服务...
  • FS Shell调用文件系统(FS)Shell命令应使用bin/hadoop...对HDFS文件系统,scheme是hdfs,对本地文件系统,scheme是file。其中scheme和authority参数都是可选的,如果未加指定,就会使用配置中指定的默认scheme。一个H...
  • Given a Parquet dataset distributed on HDFS (metadata file + may .parquet parts), how to correctly merge parts and collect the data onto local file system? dfs -getmerge ... doesn't work - it merges m...
  • } //上传本地文件 @ResponseBody @RequestMapping("/uploadFile") @ApiOperation(value = "上传本地文件") public Result uploadFile(String src,String dst) throws IOException{ try{ init(); FileSystem fs = ...
  • window 系统 linux 系统 hdfs 文件系统 相互转移eclipse 安装插件成功后: 会出现如下 可以user-》右键 upload window 本地一个文件, 但是像这种要执行的jar 包 直接在hdfs 文件系统上 使用 hadoop jar 命令是无法...
  • 装载数据:LOAD移动数据 LOCAL:指定文件位于本地文件系统 ;OVERWRITE表示覆盖现有数据 ...-- 方式一:LOCAL表示文件位于本地,OVERWRITE表示覆盖现有数据 LOAD DATA LOCAL INPATH '/home/dayongd/Downloads/empl

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 29,532
精华内容 11,812
关键字:

本地数据上传hdfs