  • compile 'org.mongodb.mongo-hadoop:mongo-hadoop-core:1.5.1' You can also download the jar files yourself from the Maven Central Repository. New releases are announced on the releases page. ...
  • hadoop-mongo map/reduce java

    2015-05-12 16:33:00
    Official docs ...mongo-hadoop project: https://github.com/mongodb/mongo-hadoop The code is hosted at https://github.com/cclient/mongo_hadoop_map-reduce The original analysis was written in n...

    Official documentation: http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/

    mongo-hadoop project: https://github.com/mongodb/mongo-hadoop

    The code is hosted at https://github.com/cclient/mongo_hadoop_map-reduce

     

    The original analysis was written in Node.js with async.

    It iterated over the MongoDB data with a cursor and analyzed it.

    Because the data volume is large, the analysis currently takes about 4 hours, and that is only roughly 1% of the full data volume.

    To speed this up, the hadoop-mongo approach was adopted.

    Advantage: a cursor-based mongo job runs single-threaded on a single machine (without sharding), whereas hadoop-mongo can process the data on a cluster.

     

    The code is finished.

    I have been writing scripting languages lately; coming back to Java feels painful and constraining.

    This first pass of the code is still rough.

    Main entry point

    package group.artifactid;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.ToolRunner;

    import com.mongodb.hadoop.MongoConfig;
    import com.mongodb.hadoop.MongoInputFormat;
    import com.mongodb.hadoop.MongoOutputFormat;
    import com.mongodb.hadoop.io.BSONWritable;
    import com.mongodb.hadoop.util.MongoConfigUtil;
    import com.mongodb.hadoop.util.MongoTool;

    public class MongoMaxTemperature extends MongoTool {
        public MongoMaxTemperature() {
            Configuration conf = new Configuration();
            MongoConfig config = new MongoConfig(conf);
            setConf(conf);
            MongoConfigUtil.setInputFormat(getConf(), MongoInputFormat.class);
            MongoConfigUtil.setOutputFormat(getConf(), MongoOutputFormat.class);
            config.setInputURI("mongodb://localhost:27017/db1.collection1");
            config.setMapper(MongoMaxTemperatureMapper.class);
            // Combiner
            config.setCombiner(MongoMaxTemperatureCombine.class);
            // config.setReducer(MongoMaxTemperatureReducer.class);
            config.setReducer(MongoMaxTemperatureReducerCombine.class);
            config.setMapperOutputKey(Text.class);
            config.setMapperOutputValue(Text.class);
            config.setOutputKey(Text.class);
            config.setOutputValue(BSONWritable.class);
            config.setOutputURI("mongodb://localhost:27017/db2.collection2");
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new MongoMaxTemperature(), args));
        }
    }

     

    Mapper code

    package group.artifactid;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.bson.BSONObject;

    public class MongoMaxTemperatureMapper extends
            Mapper<Object, BSONObject, Text, Text> {
        @Override
        public void map(final Object key, BSONObject val, Context context)
                throws IOException, InterruptedException {
            String apmac = (String) val.get("apMac");
            String clientmac = (String) val.get("clientMac");
            String url = (String) val.get("url");
            String proto = (String) val.get("proto");
            // Guard against missing fields to avoid a NullPointerException.
            if ("http".equals(proto) && url != null && !url.equals("")) {
                // Keep only the host part of the URL.
                if (url.indexOf("http://") == 0) {
                    url = url.substring(7);
                }
                int firstargindex = url.indexOf('/');
                if (firstargindex > -1) {
                    url = url.substring(0, firstargindex);
                }
                // MongoDB field names cannot contain '.', so temporarily replace it with '}'.
                url = url.replace('.', '}');
                context.write(new Text(apmac), new Text(clientmac + url));
            }
        }
    }
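
    To see the mapper's normalization in isolation (strip the scheme, keep only the host, and replace '.' with '}' because MongoDB field names cannot contain dots), here is a small standalone sketch; the sample URL is made up:

    public class UrlNormalizeDemo {
        // Replicates the host extraction done in MongoMaxTemperatureMapper, for illustration only.
        public static void main(String[] args) {
            String url = "http://c.tieba.baidu.com/p/123456"; // made-up example URL
            if (url.indexOf("http://") == 0) {
                url = url.substring(7);
            }
            int firstSlash = url.indexOf('/');
            if (firstSlash > -1) {
                url = url.substring(0, firstSlash);
            }
            url = url.replace('.', '}');
            System.out.println(url); // prints: c}tieba}baidu}com
        }
    }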

     

    Combine/reducer code (MongoMaxTemperatureReducerCombine)

    package group.artifactid;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import com.mongodb.hadoop.io.BSONWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.bson.BasicBSONObject;
    
    public class MongoMaxTemperatureReducerCombine extends
            Reducer<Text, Text, Text, BSONWritable> {
        public class UrlCount {
            public UrlCount(String url, int count) {
                this.Url = url;
                this.Count = count;
            }
            String Url;
            int Count;
        }
        public List<UrlCount> compresstopobj(BasicBSONObject topobj, int topnum) {
            List<UrlCount> studentList = new ArrayList<UrlCount>();
            for (Map.Entry<String, Object> entry : topobj.entrySet()) {
                String Url = entry.getKey();
                String scount = entry.getValue().toString();
                studentList.add(new UrlCount(Url, Integer.parseInt(scount)));
            }
            Collections.sort(studentList, new Comparator<UrlCount>() {
                @Override
                public int compare(UrlCount o1, UrlCount o2) {
                    if (o1.Count > o2.Count) {
                        return -1;
                    } else if (o1.Count < o2.Count) {
                        return 1;
                    } else {
                        return 0;
                    }
                }
            });
    //        System.out.print("-------- sorting works here, but mongo re-orders keys by name on insert; this sort is only used to pick the top 100\n");
    //        for (int i = 0; i < studentList.size(); i++) {
    //            System.out.print(studentList.get(i).Count + "\n");
    //        }
            if (studentList.size() > topnum) {
                studentList = studentList.subList(0, topnum);
            }
            return studentList;
        }
    
        @Override
        public void reduce(Text apmac, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            BasicBSONObject clientmacmap = new BasicBSONObject();
            int count = 0;
            for (Text value : values) {
                String subline = value.toString();
                String clientmac = subline.substring(0, 17);
                int indexcount = subline.indexOf("|");
                int maplastcount = 1;
                String url = null;
                if (indexcount > -1) {
                    indexcount++;
                    url = subline.substring(17, indexcount);
                    String mapcount = subline.substring(indexcount);
                    maplastcount = Integer.parseInt(mapcount);
    
                } else {
                    url = subline.substring(17);
                }
                BasicBSONObject urlmap = (BasicBSONObject) clientmacmap
                        .get(clientmac);
                if (urlmap == null) {
                    urlmap = new BasicBSONObject();
                    clientmacmap.put(clientmac, urlmap);
                }
                Object eveurl = urlmap.get(url);

                if (eveurl == null) {
                    // Only add non-empty urls; previously a null eveurl combined with a blank
                    // url fell into the else branch and caused a NullPointerException.
                    if (!url.equals(" ")) {
                        urlmap.put(url, maplastcount);
                    }
                } else {
                    urlmap.put(url, Integer.parseInt(eveurl.toString())
                            + maplastcount);
                }
                count++;
                if (count == 10000) {
                    // Periodically prune to the top 100 urls to bound memory, write the
                    // pruned map back, and reset the counter so pruning can run again.
                    List<UrlCount> arr = compresstopobj(urlmap, 100);
                    BasicBSONObject newurlcmap = new BasicBSONObject();
                    for (int i = 0; i < arr.size(); i++) {
                        UrlCount cuc = arr.get(i);
                        newurlcmap.put(cuc.Url, cuc.Count);
                    }
                    clientmacmap.put(clientmac, newurlcmap);
                    urlmap = newurlcmap;
                    count = 0;
                }
            }
            for (Map.Entry<String, Object> entry : clientmacmap.entrySet()) {
                BasicBSONObject urlmap = (BasicBSONObject) entry.getValue();
                List<UrlCount> arr = compresstopobj(urlmap, 100);
                BasicBSONObject newurlcmap = new BasicBSONObject();
                for (int i = 0; i < arr.size(); i++) {
                    UrlCount cuc = arr.get(i);
                    newurlcmap.put(cuc.Url, cuc.Count);
                }
                // Write the pruned map back; reassigning the local variable alone had no effect.
                entry.setValue(newurlcmap);
            }
            context.write(apmac, new BSONWritable(clientmacmap));
        }
    }

    Reducer code

    package group.artifactid;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.bson.BasicBSONObject;
    
    import com.mongodb.hadoop.io.BSONWritable;
    
    public class MongoMaxTemperatureReducer extends
            Reducer<Text, Text, Text, BSONWritable> {
        public class UrlCount {
            public UrlCount(String url, int count) {
                this.Url = url;
                this.Count = count;
            }
            String Url;
            int Count;
        }
        // Sort by count in descending order so that subList(0, topnum) keeps the most
        // frequent urls; the original comparator never returned -1 and did not sort correctly.
        class SortByCount implements Comparator<UrlCount> {
            public int compare(UrlCount s1, UrlCount s2) {
                if (s1.Count > s2.Count) {
                    return -1;
                } else if (s1.Count < s2.Count) {
                    return 1;
                }
                return 0;
            }
        }
        public List<UrlCount> compresstopobj(BasicBSONObject topobj, int topnum) {
            List<UrlCount> studentList = new ArrayList<UrlCount>();
            for (Map.Entry<String, Object> entry : topobj.entrySet()) {
                String Url = entry.getKey();
                String scount = entry.getValue().toString();
                System.out.print(scount + "\n");
                studentList.add(new UrlCount(Url, Integer.parseInt(scount)));
            }
            Collections.sort(studentList, new SortByCount());
            if (studentList.size() > topnum) {
                studentList = studentList.subList(0, topnum);
            }
            return studentList;
        }
        @Override
        public void reduce(Text apmac, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            BasicBSONObject clientmacmap = new BasicBSONObject();
            int count = 0;
            for (Text value : values) {
                String subline = value.toString();
                String clientmac = subline.substring(0, 17);
                String url = subline.substring(17);
                BasicBSONObject urlmap = (BasicBSONObject) clientmacmap
                        .get(clientmac);
                if (urlmap == null) {
                    urlmap = new BasicBSONObject();
                    clientmacmap.put(clientmac, urlmap);
                }
                Object eveurl = urlmap.get(url);
                if (eveurl == null) {
                    // Guard against a NullPointerException when the url is blank.
                    if (!url.equals(" ")) {
                        urlmap.put(url, 1);
                    }
                } else {
                    urlmap.put(url, Integer.parseInt(eveurl.toString()) + 1);
                }
                count++;
                if (count == 1000) {
                    // Prune to the top 100 urls, write the pruned map back, and reset
                    // the counter so the pruning is applied more than once.
                    List<UrlCount> arr = compresstopobj(urlmap, 100);
                    BasicBSONObject newurlcmap = new BasicBSONObject();
                    for (int i = 0; i < arr.size(); i++) {
                        UrlCount cuc = arr.get(i);
                        newurlcmap.put(cuc.Url, cuc.Count);
                    }
                    clientmacmap.put(clientmac, newurlcmap);
                    urlmap = newurlcmap;
                    count = 0;
                }
            }
            context.write(apmac, new BSONWritable(clientmacmap));
        }
    }

     

    Mongo collection data format

    {
        "_id" : ObjectId("54d83f3548c9bc218e056ce6"),"apMac" : "aa:bb:cc:dd:ee:ff","proto" : "http",
        "url" : "extshort.weixin.qq.comhttp",
        "clientMac" : "ff:ee:dd:cc:bb:aa"
    }

     

    clientMac and url are first concatenated, and later split apart again using the fixed MAC address length.

    Data flow

    origin -> map

    map:[{"aa:bb:cc:dd:ee:ff":[ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp]}]

    With multiple records it becomes:

    map:[{"aa:bb:cc:dd:ee:ff":["ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp","ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp1","ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp2"]}]

    map -> combine

    If the same client+url appears more than once, the occurrences are counted and appended after a "|" separator:

    combine:[{"aa:bb:cc:dd:ee:ff":[ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp|100]}]

    combine -> reducer

    In the reducer, clientMac and url are split apart by the MAC length, and the count is split off at the "|".

    Only the top 100 urls per clientMac are kept.

    reduce:

    {
        "_id": "00:21:26:00:0A:FF",
        "aa:bb:cc:1c:b9:8f": {
            "c}tieba}baidu}com|": 1,
            "short}weixin}qq}comhttp:|": 1,
            "get}sogou}com|": 1,
            "md}openapi}360}cn|": 1,
            "74}125}235}224|": 1,
            "mmbiz}qpic}cn|": 1,
            "tb}himg}baidu}com|": 1
        },
        "cc:bb:aa:d5:30:8a": {
            "captive}apple}com|": 2,
            "www}airport}us|": 1,
            "www}itools}info|": 2,
            "www}thinkdifferent}us|": 1,
            "www}ibook}info|": 1
        },
        "ee:ee:bb:78:31:74": {
            "www}itools}info|": 1,
            "www}ibook}info|": 1
        }
        
    }
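
    The combiner class MongoMaxTemperatureCombine that the driver registers is not included in the post. A minimal sketch, assuming it only pre-counts duplicate clientMac+url values per apMac and emits them in the same clientMac+url|count format that the reducer above parses, could look like this:

    package group.artifactid;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical sketch: pre-aggregate identical "clientMac + url" values per apMac
    // and emit "clientMac + url + | + count", the format the combined reducer parses.
    public class MongoMaxTemperatureCombine extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text apmac, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Map<String, Integer> counts = new HashMap<String, Integer>();
            for (Text value : values) {
                String line = value.toString();
                // A value may already carry a "|count" suffix from an earlier combine pass.
                int sep = line.indexOf('|');
                String key = sep > -1 ? line.substring(0, sep) : line;
                int n = sep > -1 ? Integer.parseInt(line.substring(sep + 1)) : 1;
                Integer old = counts.get(key);
                counts.put(key, old == null ? n : old + n);
            }
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                context.write(apmac, new Text(e.getKey() + "|" + e.getValue()));
            }
        }
    }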

     

    Reposted from: https://www.cnblogs.com/zihunqingxin/p/4497759.html

  • Using Mongo-Hadoop in EMR Mongo-Hadoop is a component released by MongoDB for connecting the Hadoop family of components to MongoDB. Its principle is similar to that of ES-Hadoop, covered in the previous article. EMR already integrates Mongo-Hadoop, so it can be used without any deployment or configuration, ...

    Using Mongo-Hadoop in EMR

    Mongo-Hadoop is a component released by MongoDB for connecting the Hadoop family of components to MongoDB. Its principle is similar to that of ES-Hadoop, which we introduced in the previous article. Mongo-Hadoop is already integrated into EMR, so you can use it without any deployment or configuration. Below we demonstrate its usage with a few examples.

    Preparation

    The following examples all use the same data model:

    {
      "id": long,
      "name": text,
      "age": integer,
      "birth": date
    }

    Since we will write data through Mongo-Hadoop into a specific MongoDB collection (think of it as a table in a database), we first need to make sure that this collection exists in MongoDB. To do so, run the mongo client on a machine that can reach MongoDB (you may need to install the client program, which can be downloaded from the MongoDB website). Here we take connecting to ApsaraDB for MongoDB (Alibaba Cloud) as an example:

    mongo --host dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com:3717 --authenticationDatabase admin -u root -p 123456

    Here dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com is the MongoDB host name, 3717 is the port (this depends on your MongoDB cluster; for a self-managed cluster the default is 27017), and -p gives the password (assumed to be 123456 here). In the interactive shell, run the following commands to create a collection named employees in the company database:

    > use company;
    > db.createCollection("employees")

    Prepare a file in which each line is a JSON object, as shown below,

    {"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
    {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
    {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}

    and save it to the designated HDFS location (e.g. "/mongo-hadoop/employees.txt").
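
    For example, one way to upload the file, sketched with the standard HDFS Java API (the local file name employees.txt and the class name are assumptions):

    package com.aliyun.emr;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Copies the prepared JSON-lines file into the HDFS directory used by the examples below.
    public class UploadEmployees {

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // picks up fs.defaultFS from the cluster configuration
        FileSystem fs = FileSystem.get(conf);
        fs.mkdirs(new Path("/mongo-hadoop"));
        fs.copyFromLocalFile(new Path("employees.txt"), new Path("/mongo-hadoop/employees.txt"));
        fs.close();
      }
    }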

    MapReduce

    In the example below, we read the JSON files under the /mongo-hadoop directory on HDFS and write every line of these files into MongoDB as a document.

    package com.aliyun.emr;
    
    import com.mongodb.BasicDBObject;
    import com.mongodb.hadoop.MongoOutputFormat;
    import com.mongodb.hadoop.io.BSONWritable;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class Test implements Tool {
    
      private Configuration conf;
    
      @Override
      public int run(String[] args) throws Exception {
    
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
        conf.set("mongo.output.uri", "mongodb://<your_username>:<your_password>@dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com:3717/company.employees?authSource=admin");
    
        Job job = Job.getInstance(conf);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BSONWritable.class);
    
        job.setJarByClass(Test.class);
        job.setMapperClass(MongoMapper.class);
    
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
    
        return job.waitForCompletion(true) ? 0 : 1;
      }
    
      @Override
      public Configuration getConf() {
        return conf;
      }
    
      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }
    
      public static class MongoMapper extends Mapper<Object, Text, Text, BSONWritable> {
    
        private BSONWritable doc = new BSONWritable();
        private int employeeNo = 1;
        private Text id;
    
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
          if (value.getLength() > 0) {
            doc.setDoc(BasicDBObject.parse(value.toString()));
            id = new Text("employee" + employeeNo++);
            context.write(id, doc);
          }
        }
      }
    
      public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Test(), args);
        System.exit(ret);
      }
    }

    Compile and package this code as mr-test.jar, then run

    hadoop jar mr-test.jar com.aliyun.emr.Test -Dmapreduce.job.reduces=0 -libjars mr-test.jar /mongo-hadoop

    After the job finishes, query the results with the MongoDB client:

    > db.employees.find();
    { "_id" : "employee1", "id" : 1, "name" : "zhangsan", "birth" : "1990-01-01", "addr" : "No.969, wenyixi Rd, yuhang, hangzhou" }
    { "_id" : "employee2", "id" : 2, "name" : "lisi", "birth" : "1991-01-01", "addr" : "No.556, xixi Rd, xihu, hangzhou" }
    { "_id" : "employee3", "id" : 3, "name" : "wangwu", "birth" : "1992-01-01", "addr" : "No.699 wangshang Rd, binjiang, hangzhou" }

    Spark

    This example writes data into MongoDB just like the MapReduce one, only it runs on Spark.

    package com.aliyun.emr;
    
    import com.mongodb.BasicDBObject;
    import com.mongodb.hadoop.MongoOutputFormat;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.bson.BSONObject;
    import scala.Tuple2;
    
    public class Test {
    
      public static void main(String[] args) {
    
        SparkSession ss = new SparkSession(new SparkContext());
    
        final AtomicInteger employeeNo = new AtomicInteger(0);
        JavaRDD<Tuple2<Object, BSONObject>> javaRDD =
            ss.read().text("hdfs://emr-header-1:9000/mongo-hadoop/employees.txt")
                .javaRDD().map((Function<Row, Tuple2<Object, BSONObject>>) row -> {
              BSONObject bson = BasicDBObject.parse(row.mkString());
              return new Tuple2<>("employee" + employeeNo.getAndAdd(1), bson);
            });
    
        JavaPairRDD<Object, BSONObject> documents = JavaPairRDD.fromJavaRDD(javaRDD);
    
        Configuration outputConfig = new Configuration();
        outputConfig.set("mongo.output.uri", "mongodb://<your_username>:<your_password>@dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com:3717/company.employees?authSource=admin");
    
        // Save it as a "Hadoop file"; in practice this writes to mongo through MongoOutputFormat.
        documents.saveAsNewAPIHadoopFile(
            "file:///this-is-completely-unused",
            Object.class,
            BSONObject.class,
            MongoOutputFormat.class,
            outputConfig
        );
      }
    }

    Package it as spark-test.jar and run the following command to perform the write:

    spark-submit --master yarn --class com.aliyun.emr.Test spark-test.jar

    After the job finishes, query the results with the MongoDB client.

    Hive

    This section shows how to read and write MongoDB from Hive using SQL.

    First run the hive command to enter the interactive shell and create a database:

    CREATE DATABASE IF NOT EXISTS company;

    Next, create an external table whose data is stored in MongoDB. Before creating the external table, note that, as described in the first section, the MongoDB collection employees must already exist.

    Back in the Hive interactive console, run the following SQL to create the external table; the MongoDB connection is configured through TBLPROPERTIES:

    CREATE EXTERNAL TABLE IF NOT EXISTS employees(
      id BIGINT,
      name STRING,
      birth STRING,
      addr STRING
    )
    STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
    WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id"}')
    TBLPROPERTIES('mongo.uri'='mongodb://<your_username>:<your_password>@dds-xxxxxxxxxxxxxxxxxxxxx.mongodb.rds.aliyuncs.com:3717/company.employees?authSource=admin');

    Note that SERDEPROPERTIES maps the Hive column "id" to the MongoDB field "_id" (you can choose which mappings, if any, to apply according to your needs).
    Also note that the birth column is declared as STRING in the Hive table. This is due to an inconsistency in how Hive and MongoDB handle the date format: Hive converts the original date before sending it to MongoDB, and querying it back from Hive may then return NULL.

    Insert some rows into the table:

    INSERT INTO TABLE employees VALUES (1, "zhangsan", "1990-01-01","No.969, wenyixi Rd, yuhang, hangzhou");
    INSERT INTO TABLE employees VALUES (2, "lisi", "1991-01-01", "No.556, xixi Rd, xihu, hangzhou");
    INSERT INTO TABLE employees VALUES (3, "wangwu", "1992-01-01", "No.699 wangshang Rd, binjiang, hangzhou");

    Run a query to see the results:

    SELECT * FROM employees LIMIT 100;
    OK
    1    zhangsan    1990-01-01    No.969, wenyixi Rd, yuhang, hangzhou
    2    lisi    1991-01-01    No.556, xixi Rd, xihu, hangzhou
    3    wangwu    1992-01-01    No.699 wangshang Rd, binjiang, hangzhou
  • Error message: ![](https://img-ask.csdn.net/upload/201910/09/1570610725_197608.png) ![screenshot](https://img-ask.csdn.net/upload/201910/09/1570610769_40828.png) Code!...it runs fine under eclipse
  • When writing a MapReduce job with mongo as the data source, it runs fine in eclipse but fails after being packaged as a jar and run on linux. The jar was built with eclipse's built-in export. [img=https://img-bbs.csdn.net/upload/201910/09/1570607467_320442.png][/img] [img=...
  • Mongo-Hadoop

    2015-02-10 10:55:03
    Unpack to /home/kevin/hadoop/hadoop/share/mongo-hadoop-2.2-1.2.0 (any location works; in a truly distributed setup pay attention to this jar)   Writing MapReduce results to MongoDB; code to modify (input is similar, ps: extends Ma

    Download

    https://github.com/mongodb/mongo-hadoop/releases

     

    Unpack to /home/kevin/hadoop/hadoop/share/mongo-hadoop-2.2-1.2.0

    (any location actually works; in a truly distributed setup, pay attention to how this jar is shipped to the nodes)

     

    Writing MapReduce results to MongoDB

    Code to modify (the input side is similar; note: extends Mapper<Object, BSONObject, IntWritable, DoubleWritable>):

    // FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/test.out");
    job.setOutputFormatClass(MongoOutputFormat.class);

     

    Note

    Do not import com.mongodb.hadoop.mapred.MongoInputFormat; use these imports instead:

    import com.mongodb.hadoop.MongoOutputFormat;
    import com.mongodb.hadoop.MongoInputFormat;
    import com.mongodb.hadoop.util.MongoConfigUtil;

     

    BasicBSONObject output = new BasicBSONObject();
    output.put("count", count);
    output.put("avg", avg);
    output.put("sum", sum);
    pContext.write(pKey, new BSONWritable(output));
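
    For context, a minimal sketch of a reducer in which this snippet could live; the class name and the IntWritable/DoubleWritable value types follow the mapper signature mentioned above, and everything else is an assumption:

    package example;

    import java.io.IOException;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.bson.BasicBSONObject;

    import com.mongodb.hadoop.io.BSONWritable;

    // Hypothetical reducer matching Mapper<Object, BSONObject, IntWritable, DoubleWritable>.
    public class AverageReducer extends Reducer<IntWritable, DoubleWritable, IntWritable, BSONWritable> {
        @Override
        protected void reduce(IntWritable pKey, Iterable<DoubleWritable> pValues, Context pContext)
                throws IOException, InterruptedException {
            int count = 0;
            double sum = 0.0;
            for (DoubleWritable v : pValues) {
                sum += v.get();
                count++;
            }
            double avg = count == 0 ? 0.0 : sum / count;
            BasicBSONObject output = new BasicBSONObject();
            output.put("count", count);
            output.put("avg", avg);
            output.put("sum", sum);
            // Each write becomes one document in the test.out collection configured above.
            pContext.write(pKey, new BSONWritable(output));
        }
    }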
  • mongo-hadoop setup problems

    2015-04-17 03:09:20
    I keep running into problems setting up the mongo-hadoop environment; there is little material online and my English is weak. Could someone share a Java example? Also, is this plugin unusable with hadoop 2.6?
  • The MongoDB Connector for Hadoop is a library which allows MongoDB (or backup files in its data format, BSON) to be used as an input source, or output destination, for Hadoop MapReduce tasks....
  • MongoDB Connector for Hadoop. Purpose: the MongoDB Connector for Hadoop is a library that allows MongoDB (or backup files in its data format, BSON) to be used as the input source or output destination of Hadoop MapReduce tasks. It aims to provide greater flexibility and performance and to make the data in MongoDB ...
  • mongo-hadoop-hive-2.0.0

    2017-10-12 15:00:21
    mongo-hadoop-hive-2.0.0.jar
  • mongo-hadoop-core-2.0.0

    2017-10-12 15:03:12
    mongo-hadoop-core-2.0.0.jar
  • Hadoop data ingestion options

    1,000+ views · 2016-09-30 11:11:14
    Data sources: RDBMS (Oracle, MySQL); NoSQL (MongoDB); files (log files, JSON, XML). Data storage: HDFS, HBase. Tools: Sqoop, Flume, Streamsets, Oracle GoldenGate for Big Data, MySQL Applier for Hadoop, mongo-hadoop

    Data sources

    • RDBMS
      • Oracle
      • MySQL
    • NOSQL
      • MongoDB
    • Files
      • Log files
      • JSON
      • XML

    Data storage

    • HDFS
    • HBase

    Tools

    • Sqoop
    • Flume
    • Streamsets
    • Oracle GoldenGate for Big Data
    • MySQL Applier for Hadoop
    • mongo-hadoop

  • mongo-hadoop-core-2.0.1.jar

    2018-01-26 10:39:20
    This jar is used for syncing mongo data into hive; it is one of the three jars required
  • The official flink example is fairly simple and has not been updated in a long time. A production mongos cluster requires authentication; configure the rules as the documentation describes. Here output...import com.mongodb.hadoop.MongoInputFormat; import com.mongodb.hadoop.MongoOutputFormat; impo...
  • mongo-hadoop-hive-2.0.2.jar

    2018-01-26 10:40:54
    This jar handles the table-column mapping when extracting Mongo data into hive; without it the mapping cannot be done
  • Since Spark 2.0.0 and above is compiled against Scala 2.11.x by default, as for which version to use, ...'mongo-spark-connector_2.11', version: '2.2.1' /* fastjson */ compile group: 'com.alibaba', name: 'fastjson', version: '1.2.44' }
  • // conf.set("mongo.output.uri","mongodb://localhost:27017/mongo-hadoop-test.appsout"); job.setInputFormatClass(MongoInputFormat.class); job.setOutputFormatClass(MongoOutputFormat.class); // 5 ...
  • When using the Mongodb Hadoop Connector in the Mongodb-based mode, i.e. hive querying mongo directly, one problem came up during testing: "=" cannot be used after WHERE in a query. Example: first insert into Mongodb...
  • PySpark to connect to MongoDB via mongo-hadoop. 2. Configuration steps (adjust versions accordingly: spark-2.4.3, hadoop2.7, Scala2.11) 1. # Get the MongoDB Java Driver #PROJECT_HOME is the custom project root directory, below...
  • MongoSpark is a Maven-based Java project that shows how to integrate Spark with MongoDb using the Mongodb Hadoop Connector. Initially it does not contain any bigData analysis, but it will in the near future. For now, the project only copies all documents from one collection to another...
  • 1. JDK installation: 1. Download jdk1.8.0_201.tar.gz from the official site; unpack jdk1.8.0_201.tar.gz to D:\soft\Java\jdk1.8.0_201 ...1. Download hadoop2.7.7.tar.gz from the Hadoop official site; download hadooponwindows-master.zip (needed later); unpack hadoop...
  • mongo-hadoop is the MongoDB Connector for Hadoop, a library that allows MongoDB (or backup files in its data format, BSON) to be used as the input source or output destination of Hadoop MapReduce tasks. It aims to provide greater flexibility and performance and to integrate the data in MongoDB with Hadoop...
  • Reading and writing mongodb with hadoop

    1,000+ views · 2016-11-07 11:58:49
    To read and write data in mongodb from hadoop, you first need the mongo-hadoop package. I let maven download it automatically; the artifact is org.mongodb.mongo-hadoop:mongo-hadoop-core, or you can get it from http://search.maven.org/ or other sites
  • hadoop distributed cluster setup

    2018-01-26 10:36:26
    Cluster setup: covers hadoop, hive, hbase, spark, mongo and other components
  • Awesome Hadoop

    1,000+ views · 2015-12-26 12:17:59
    A curated list of amazingly awesome Hadoop and Hadoop ecosystem resources. Inspired by Awesome PHP, Awesome Python and Awesome Sysadmin Awesome Hadoop Hadoop YARN NoSQL SQL on Hadoop Data Management ...
  • Connecting spark to mongodb

    1,000+ views · 2016-02-19 18:24:31
    org.mongodb.mongo-hadoop mongo-hadoop-core 1.4.2; the Java driver for mongodb: org.mongodb mongo-java-driver 2.13.0. 2. Usage example: import com.mongodb.hadoop.MongoOutputFormat; im
  • Mongo hadoop connector  https://github.com/mongodb/mongo-hadoop wiki https://github.com/mongodb/mongo-hadoop/wiki ppt http://www.slideshare.net/mongodb/hadoop-webinar?from_embed_lead_cta=t
  • The jars downloaded by maven are located as configured in the settings.xml file; find this file (follow the screenshots if you don't remember where it is) ... copy the directory and search for it in the file explorer: D:\apache-maven\apache-maven-3.6.1\conf\ open it and scroll down to find d:/jarstore ...mongo-hadoop