精华内容
下载资源
问答
  • Druid flink kafka

    2020-06-19 15:05:30
    Druid实际应用 配合flink 接收处理kafka数据

    Druid实际应用 配合flink 接收处理kafka数据

    1.druid重要概念  roll-up聚合

      druid可以汇总原始数据。汇总是就是把选定的相同维度的数据进行聚合操作,可减少存储的大小。 druid要求数据具有如下要求,数据分为三部分:时间戳,维度列,指标列  以下数据

    {"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":20,"bytes":9024}
    {"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":255,"bytes":21133}
    {"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":11,"bytes":5780}
    {"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":38,"bytes":6289}
    {"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":377,"bytes":359971}
    {"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2","packets":49,"bytes":10204}
    {"timestamp":"2018-01-02T21:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":38,"bytes":6289}
    {"timestamp":"2018-01-02T21:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":123,"bytes":93999}
    {"timestamp":"2018-01-02T21:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8","packets":12,"bytes":2818}

    如果该数据被druid加载并且我们打开了聚合功能,聚合的要求是packets和bytes对应的数据进行累加,并且对条数进行计数(具体设置操作后续加载数据时讲解),聚合后的数据如下:

    ┌──────────────────────────┬────────┬───────┬─────────┬─────────┬─────────┐
    │ __time                   │ bytes  │ count │ dstIP   │ packets │ srcIP   │
    ├──────────────────────────┼────────┼───────┼─────────┼─────────┼─────────┤
    │ 2018-01-01T01:01:00.000Z │  35937 │     3 │ 2.2.2.2 │     286 │ 1.1.1.1 │
    │ 2018-01-01T01:02:00.000Z │ 366260 │     2 │ 2.2.2.2 │     415 │ 1.1.1.1 │
    │ 2018-01-01T01:03:00.000Z │  10204 │     1 │ 2.2.2.2 │      49 │ 1.1.1.1 │
    │ 2018-01-02T21:33:00.000Z │ 100288 │     2 │ 8.8.8.8 │     161 │ 7.7.7.7 │
    │ 2018-01-02T21:35:00.000Z │   2818 │     1 │ 8.8.8.8 │      12 │ 7.7.7.7 │
    └──────────────────────────┴────────┴───────┴─────────┴─────────┴─────────┘

    所以由于druid对摄入的数据有聚合的机制,并且也能对接实时流数据,我们可以使用druid对接kafka数据进行聚合。

    架构体系图

    数据到kafka后,flink只对数据进行预处理,然后再放回kafka,很大程度减少flink的计算量,druid拉去预处理的数据进行聚合,然后到库中,提供查询。

    2.创建kafka生产类生产广告数据

    1.模型类创建

    public class AdClickLog {
        //广告ID
        private long t_id;
        //广告主ID
        private long corpuin;
        //域名
        private String host;
        //设备类型
        private String device_type;
        //广告来源
        private String ad_source;
        //广告媒介
        private String ad_media;
        //广告系列
        private String ad_compaign;
        //城市
        private String city;
        //点击时间
        private String timeStamp;
        //用户的ID
        private String user_id;
        //点击发生时该字段会填入user_id
        private String click_user_id;}

    2:创建一份模拟生成日志的代码,注意修改最后的请求地址;

    import com.alibaba.fastjson.JSONObject;
    import com.itheima.report.bean.AdClickLog;
    import com.itheima.report.bean.ClickLog;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.http.HttpResponse;
    import org.apache.http.HttpStatus;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.entity.StringEntity;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClientBuilder;
    import org.apache.http.util.EntityUtils;
    
    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.Random;
    
    /**
     * 点击流日志模拟器
     */
    public class AdClickLogGenerator {
        private static Long[] t_ids = new Long[]{1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l, 9l, 10l, 11l, 12l, 13l, 14l, 15l, 16l, 17l, 18l, 19l, 20l};//频道id集合
        private static Long[] corpuins = new Long[]{1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l, 9l, 10l, 11l, 12l, 13l, 14l, 15l, 16l, 17l, 18l, 19l, 20l};//产品类别id集合
        private static Long[] user_ids = new Long[]{1l, 2l, 3l, 4l, 5l, 6l, 7l, 8l, 9l, 10l, 11l, 12l, 13l, 14l, 15l, 16l, 17l, 18l, 19l, 20l};//用户id集合
    
        /*
        host域名
         */
        private static String[] hosts = new String[]{"baidu.com", "google"};//地区-国家集合
        /**
         * 是否是新用户
         */
        private static int[] is_news = new int[]{0, 1};
        /*
        广告来源
         */
        private static String[] ad_sources = new String[]{"s1", "s2"};
        /*
        广告媒介
         */
        private static String[] ad_medias = new String[]{"m1", "m2"};
        /*
       广告系列
        */
        private static String[] ad_campagins = new String[]{"风系列", "人生","爱情"};
    
        /**
         * 设备类型
         */
        private static String[] device_types = new String[]{"pc", "mobile", "other"};
    
    /*
    城市
     */
    private static String[] citys = new String[]{"beijing", "shanghai", "guangzhou"};
        private static Long[] gettimes(String time) {
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss:SSS");
            try {
                Date date = dateFormat.parse(time);
                long timetemp = date.getTime();
                Random random = new Random();
                int randomint = random.nextInt(10);
                long starttime = timetemp - randomint * 3600 * 1000;
                long endtime = starttime + randomint * 3600 * 1000;
                return new Long[]{starttime, endtime};
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return new Long[]{0l, 0l};
        }
    
    
        /**
         * 模拟发送Http请求到上报服务系统
         *
         * @param url
         * @param json
         */
        public static void send(String url, String json) {
            try {
                CloseableHttpClient httpClient = HttpClientBuilder.create().build();
                HttpPost post = new HttpPost(url);
                JSONObject response = null;
                try {
                    StringEntity s = new StringEntity(json.toString(), "utf-8");
                    s.setContentEncoding("utf-8");
                    // 发送json数据需要设置contentType
                    s.setContentType("application/json");
                    post.setEntity(s);
    
                    HttpResponse res = httpClient.execute(post);
                    if (res.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                        // 返回json格式:
                        String result = EntityUtils.toString(res.getEntity());
                        System.out.println(result);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
    
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
            Random random = new Random();
            for (int i = 0; i < 10; i++) {
                //频道id 类别id 产品id 用户id 打开时间 离开时间 地区 网络方式 来源方式 浏览器
                AdClickLog clickLog = new AdClickLog();
                clickLog.setCity(citys[random.nextInt(citys.length)]);
                clickLog.setAd_compaign(ad_campagins[random.nextInt(ad_campagins.length)]);
                clickLog.setAd_media(ad_medias[random.nextInt(ad_medias.length)]);
                clickLog.setAd_source(ad_sources[random.nextInt(ad_sources.length)]);
                clickLog.setCorpuin(corpuins[random.nextInt(corpuins.length)]);
                clickLog.setDevice_type(device_types[random.nextInt(device_types.length)]);
                clickLog.setHost(hosts[random.nextInt(hosts.length)]);
                clickLog.setT_id(t_ids[random.nextInt(t_ids.length)]);
                Date date = new Date();
                clickLog.setTimeStamp(df.format(date));
                clickLog.setUser_id(user_ids[random.nextInt(user_ids.length)].toString());
                //设置点击事件的用户id
                if(i%2==0){
                  clickLog.setClick_user_id(clickLog.getUser_id());
                }
                String jonstr = JSONObject.toJSONString(clickLog);
                System.out.println(jonstr);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                send("http://localhost:8888/adReceive", jonstr);
            }
        }
    }
    

    3.创建Contorller接收请求并把日志数据发送到kafka中,注意修改kafka中的topic,使用kafka manager创建topic

    import com.alibaba.fastjson.JSON;
    import com.itheima.report.bean.Message;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @RestController
    @RequestMapping("/")
    public class AdReportController {
    
        @Value("adtest")
        private String topic;
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        @RequestMapping("/adReceive")
        public Map receive(@RequestBody String json) {
            // 保存返回给前端的结果
            Map<String, String> result = new HashMap<>();
            try {
                // 将所有的数据都封装在一个Message实体类中
                Message message = new Message();
                // 消息体(点击流消息)
                message.setMessage(json);
                // 点击的数量
                message.setCount(1L);
                // 事件时间
                message.setTimestamp(System.currentTimeMillis());
                // 将实体类转换为JSON字符串
                String messageJSON = JSON.toJSONString(message);
                System.out.println(messageJSON);
    
                // 发送消息到kafka的指定topic中
                kafkaTemplate.send(topic,messageJSON);
    
                result.put("result", "ok");
            } catch (Exception e) {
                e.printStackTrace();
                result.put("result", "failed");
            }
    
            return result;
        }
    }
    

    3.flink数据预处理

    1:创建样例类来封装数据

    import com.alibaba.fastjson.JSON
    import org.apache.commons.lang3.StringUtils
    
    case class AdClickLog(
                           city:String,
                           ad_compaign: String,
                           ad_media: String,
                           ad_source: String,
                           corpuin: String,
                           device_type: String,
                           host: String,
                           t_id: String,
                           user_id: String,
                           click_user_id: String,
                           timestamp: String
                         )
    
    object AdClickLog {
    
      def apply(json: String) = {
        //使用FastJSON的JSON.parseObject方法将JSON字符串构建一个ClickLog实例对象
        //{\"ad_compaign\":\"风系列\",\"ad_media\":\"m1\",\"ad_source\":\"s2\",\"corpuin\":18,
        // \"device_type\":\"mobile\",\"host\":\"google\",\"is_new\":1,\"t_id\":5,\"timestamp\":1559092939197}
        val jsonObject = JSON.parseObject(json)
        var click_user_id = jsonObject.getString("click_user_id")
        if (StringUtils.isBlank(click_user_id)) {
          click_user_id = "null"
        }
        new AdClickLog(
          jsonObject.getString("city"),
          jsonObject.getString("ad_compaign"),
          jsonObject.getString("ad_media"),
          jsonObject.getString("ad_source"),
          jsonObject.getString("corpuin"),
          jsonObject.getString("device_type"),
          jsonObject.getString("host"),
          jsonObject.getString("t_id"),
          jsonObject.getString("user_id"),
          click_user_id,
          jsonObject.getString("timestamp")
        )
      }
    }

    准备好修改GalobalUtil,获取消费的topic,添加flink消费的topic,以及生产消息的topic,在GlobalConfig中获取topic;

    先去配置文件中添加topic信息,然后在GlobalConfigUtils中添加获取方法

    flink的任务程序

    import java.text.SimpleDateFormat
    import java.util.Properties
    
    import com.alibaba.fastjson.JSON
    import com.itheima.realprocess.bean.AdClickLog
    import com.itheima.realprocess.task._
    import com.itheima.realprocess.util.GlobalConfigUtil
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.scala._
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.environment.CheckpointConfig
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
    import org.json4s.DefaultFormats
    import org.json4s.native.Serialization.write
    
    object AdApp {
      def main(args: Array[String]): Unit = {
        // 创建main方法,获取StreamExecutionEnvironment运行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // 设置流处理的时间为EventTime,使用数据发生的时间来进行数据处理
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // 将Flink默认的开发环境并行度设置为1
        env.setParallelism(1)
    
        // 保证程序长时间运行的安全性进行checkpoint操作
        //
        // 5秒启动一次checkpoint
        env.enableCheckpointing(5000)
        // 设置checkpoint只checkpoint一次
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        // 设置两次checkpoint的最小时间间隔
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
        // checkpoint超时的时长
        env.getCheckpointConfig.setCheckpointTimeout(60000)
        // 允许的最大checkpoint并行度
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
        // 当程序关闭的时,触发额外的checkpoint
        env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        // 设置checkpoint的地址
        env.setStateBackend(new FsStateBackend("hdfs://hp101:9000/flink-checkpoints/"))
    
        //
        // 整合Kafka
        //
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", GlobalConfigUtil.bootstrapServers)
        properties.setProperty("zookeeper.connect", GlobalConfigUtil.zookeeperConnect)
        properties.setProperty("group.id", GlobalConfigUtil.groupId)
        properties.setProperty("enable.auto.commit", GlobalConfigUtil.enableAutoCommit)
        properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.autoCommitIntervalMs)
    
        // 配置下次重新消费的话,从哪里开始消费
        // latest:从上一次提交的offset位置开始的
        // earlist:从头开始进行(重复消费数据)
        properties.setProperty("auto.offset.reset", GlobalConfigUtil.autoOffsetReset)
        // 配置序列化和反序列化
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    
        val consumer: FlinkKafkaConsumer09[String] = new FlinkKafkaConsumer09[String](
          GlobalConfigUtil.adTopic,
          new SimpleStringSchema(),
          properties
        )
    
        val kafkaDataStream: DataStream[String] = env.addSource(consumer)
        // 使用map算子,将kafka中消费到的数据
        val messageDataStream = kafkaDataStream.map {
          msgJson =>
            // 使用FastJSON转换为JSON对象
            val jsonObject = JSON.parseObject(msgJson)
            val count = jsonObject.getLong("count")
            val message = jsonObject.getString("message")
            val timestamp = jsonObject.getLong("timestamp")
            print(message)
            // 将JSON的数据解析封装到Message样例类中
            // 将数据封装到ClickLog样例类
            AdClickLog(message)
        }
        messageDataStream.print()
        // 添加flink的水印处理 , 允许得最大延迟时间是2S
        val watermarkDataStream: DataStream[AdClickLog] = messageDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[AdClickLog] {
          var currentTimestamp: Long = 0L
          val maxDelayTime = 2000L
          val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
    
          var watermark: Watermark = _
    
          // 获取当前的水印
          override def getCurrentWatermark = {
            watermark = new Watermark(currentTimestamp - maxDelayTime)
            watermark
          }
    
          // 时间戳抽取操作
          override def extractTimestamp(t: AdClickLog, l: Long) = {
            val timeStamp = t.timeStamp
            currentTimestamp = Math.max(df.parse(timeStamp).getTime, currentTimestamp)
            currentTimestamp
          }
        })
        // 在App中调用预处理任务的process方法,并打印测试
        val clicklogWideDataStream = PreprocessTask.processAd(watermarkDataStream)
        val value: DataStream[String] = clicklogWideDataStream.map {
          x =>
            implicit val formats = DefaultFormats
            val jsonString = write(x)
            jsonString
        }
        value.addSink(
          new FlinkKafkaProducer09[String](
            GlobalConfigUtil.bootstrapServers,
            GlobalConfigUtil.adProcessTopic,
            new SimpleStringSchema()
          )
        )
        //    clicklogWideDataStream.addSink()
    
        env.execute("RealProcessApp")
      }
    }

    添加process方法对数据进行预处理,主要是判断用户是否为新用户,以及添加点击次数字段

    import com.itheima.realprocess.bean.{AdClickLog, AdClickLogWide, ClickLogWide, Message}
    import com.itheima.realprocess.util.HBaseUtil
    import org.apache.commons.lang3.StringUtils
    import org.apache.commons.lang3.time.FastDateFormat
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.DataStream
    
    /**
      * 预处理的任务
      */
    object PreprocessTask {
    
      // 包装isNew字段的样例类
      case class IsNewWrapper(isNew: Int, isHourNew: Int, isDayNew: Int, isMonthNew: Int)
    
      //处理广告点击数据,增加isNew字段
      def processAd(watermarkDataStream: DataStream[AdClickLog]) = {
        watermarkDataStream.map {
          msg =>
            val isNew = analysisNew(msg)
            var click_cnt = 0
            if (!msg.click_user_id.equalsIgnoreCase("null")) {
              click_cnt = 1
            }
            AdClickLogWide(
              msg.city,
              msg.ad_compaign,
              msg.ad_media,
              msg.ad_source,
              msg.corpuin,
              msg.device_type,
              msg.host,
              msg.t_id,
              msg.user_id,
              msg.click_user_id,
              msg.timeStamp,
              isNew,
              click_cnt
            )
        }
      }
    
      /*
      判断用户是否为新用户
       */
      def analysisNew(adlog: AdClickLog) = {
        //先把要拓宽的字段isNew、isHourNew、isDayNew、isMonthNew都创建出来,初始化为0
        var isNew = 0
        // 封装操作hbase需要的字段
        val tableName = "user_history"
        val rowkey = adlog.user_id
        val cfName = "info"
        val userIdColName = "userId" // 用户ID
        //从hbase查询rowkey为userid:channlid查询user_history中userid列的数据
        val userIdInHBase = HBaseUtil.getData(tableName, rowkey, cfName, userIdColName)
        //判断userid列数据是否为空
        if (StringUtils.isBlank(userIdInHBase)) {
          //如果为空
          //设置isNew字段为1,表示是新用户,
          isNew = 1
          //将该用户数据添加到user_history表中
          HBaseUtil.putMapData(tableName, rowkey, cfName, Map(
            userIdColName -> adlog.user_id
          ))
        }
        isNew
      }
      
    }

    三 :druid实时摄入数据开发

    1:编写kafkadruidinex摄入文件

    {
        "type": "kafka", 
        "dataSchema": {
            "dataSource": "adclicklog", 
            "parser": {
                "type": "string", 
                "parseSpec": {
                    "format": "json", 
                    "timestampSpec": {
                        "column": "timestamp", 
                        "format": "auto"
                    }, 
                    "dimensionsSpec": {
                        "dimensions": [ ], 
                        "dimensionExclusions": [
                            "timestamp", 
                            "is_new", 
                            "click_cnt"
                        ]
                    }
                }
            }, 
            "metricsSpec": [
                {
                    "name": "count", 
                      "type": "count"
                 },
                {
                    "name": "click_cnt", 
                    "fieldName": "click_cnt", 
                    "type": "longSum"
                }, 
                {
                    "name": "new_cnt", 
                    "fieldName": "is_new", 
                    "type": "longSum"
                }, 
                {
                    "name": "uv", 
                    "fieldName": "user_id", 
                    "type": "thetaSketch", 
                    "isInputThetaSketch": "false", 
                    "size": "16384"
                }, 
                {
                    "name": "click_uv", 
                    "fieldName": "click_user_id", 
                    "type": "thetaSketch", 
                    "isInputThetaSketch": "false", 
                    "size": "16384"
                }
            ], 
            "granularitySpec": {
                "type": "uniform", 
                "segmentGranularity": "DAY", 
                "queryGranularity": "NONE", 
                "rollup": false
            }
        }, 
        "tuningConfig": {
            "type": "kafka", 
            "reportParseExceptions": false
        }, 
        "ioConfig": {
            "topic": "ad_click_process", 
            "replicas": 1, 
            "taskDuration": "PT10M", 
            "completionTimeout": "PT20M", 
            "consumerProperties": {
                "bootstrap.servers": "hp101:9092,hp102:9092"
            }
        }
    }

    2:提交kafka索引任务

    curl -X POST -H 'Content-Type: application/json' -d @kafka-index-adclicklog.json http://hp101:8090/druid/indexer/v1/supervisor

    3.查询数据

    {
        "queryType":"timeseries",
        "dataSource":"adclicklog",
        "granularity":{"type": "period", "period": "PT1H", "timeZone": "Asia/Shanghai"},
        "aggregations":[
            {
                "type":"longSum",
                "name":"click",
                "fieldName":"click_cnt"
            },{
                "type":"longSum",
                "name":"pv",
                "fieldName":"count"
            }
        ],
        "intervals":["2019-06-01/2019-06-30"]
    }

     

    展开全文
  • Flink+JDBCInputFormat+Druid

    2020-07-22 19:16:35
    如果想作为一个web项目实时的访问某个数据库,这样的方式就会有点不太合适,但是也有更好的方式结合Druid使用,可以自行去CSDN上下载https://www.iteye.com/resource/cx361006796-10638225 以下是两种方式去访问...

    Flink是适合流计算和批处理,不管是哪种方式,主要的重点是对数据的处理计算。如果想作为一个web项目实时的访问某个数据库,这样的方式就会有点不太合适,但是也有更好的方式结合Druid使用,可以自行去CSDN上下载 https://www.iteye.com/resource/cx361006796-10638225

    以下是两种方式去访问关系型数据库,一、自定义的Source,二、重写JDBCInputFormat然后在里面重写一些方法实现数据库连接池的修改。

    Flink的自定义Source的方式实现连接关系型数据库

    package com.connect.hbase
    
    
    
    import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    
    class MySourceOracle extends RichSourceFunction[Person]{
      var ps:PreparedStatement  = null
      var connection:Connection =null
      val driver = "oracle.jdbc.driver.OracleDriver"
      val url = "jdbc:oracle:thin:@192.168.10.10:1521:orcl"
      val username = "test"
      val password = "test"
    
      override def open(parameters:Configuration ) {
    
        super.open(parameters)
        // 获取连接
        connection = getConn
    
      }
    
      def getConn = {
        try {
    //      connection = dataSource.getConnection
          Class.forName(driver)
          connection = DriverManager.getConnection(url, username, password)
        } catch {
          case e: Exception =>
            e.printStackTrace()
        }
        connection
      }
    
      def run(ctx: SourceContext[Person]): Unit = {
        //执行查询
        ps = connection.prepareStatement("select YEAR_DESC AS name,DAY_ID AS age from TP_GL_DAY where DAY_ID = '20141007'")
        val resultSet:ResultSet  = ps.executeQuery
        while (resultSet.next) {
          val p = new Person(resultSet.getString("name"),resultSet.getString("age"))
          println("*****************year=>"+p.name+"*********age=>"+p.age)
          // 将数据发送出去
    //      ctx.collect(p)
        }
      }
    
      override def cancel(): Unit = {}
    
      @throws[Exception]
      override def close(): Unit = {
        super.close
        if (connection != null) connection.close
        if (ps != null) ps.close
      }
    
    }
    
    //测试类
    object TestSource {
      def main(args: Array[String]): Unit = {
        // 调用程序// 调用程序
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 添加自定义数据源
        val data = env.addSource(new MySourceOracle)
        data.print.setParallelism(1)
        // 提交执行任务
        val result = env.execute("TestSource")
        System.out.println(result.getJobID)
    
    
      }
    
    }
    
    class Person{
      var name = "lisi"
      var age="12"
      def this(name:String,age:String){
        this()
        this.name=name
        this.age=age
    
      }
    }

     Flink使用 JDBCInputFormat 的方式连接数据库

     var jdbc = JDBCInputFormat1.buildJDBCInputFormat()
          // 数据库连接驱动名称
          .setDrivername(parameter.get("oracleDriverClass").get.toString)
          // 数据库连接地址
          .setDBUrl(parameter.get("oracleDriverUrl").get.toString)
          // 数据库连接用户名
          .setUsername(parameter.get("oracleUsername").get.toString)
          // 数据库连接密码
          .setPassword(parameter.get("oraclePassword").get.toString)
          // 数据库连接查询SQL
          .setQuery(sql)
          //加入sql中的参数
          .setParametersProvider(paramProvider)
          // 字段类型,顺序个个数必须与SQL保持一致
          .setRowTypeInfo(rowTypeInfo)
          .finish()
    
        val inputOracle: DataSet[Row] = env.createInput(jdbc)

     JDBCInputFormat重构,增加Druid的连接池

    package com.connect.oracle;
    
    import com.alibaba.druid.pool.DruidDataSource;
    import org.apache.flink.annotation.VisibleForTesting;
    import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
    import org.apache.flink.api.common.io.RichInputFormat;
    import org.apache.flink.api.common.io.statistics.BaseStatistics;
    import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.io.GenericInputSplit;
    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.core.io.InputSplitAssigner;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Preconditions;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.math.BigDecimal;
    import java.sql.*;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class JDBCInputDataSource extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {
        private static JDBCInputDataSource instance = null;
        synchronized public static JDBCInputDataSource getInstance() {
            if (instance == null) {
                instance = new JDBCInputDataSource();
            }
            return instance;
        }
    
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(JDBCInputDataSource.class);
    
        private String username;
        private String password;
        private String drivername;
        private String dbURL;
        private String queryTemplate;
        private int resultSetType;
        private int resultSetConcurrency;
        private RowTypeInfo rowTypeInfo;
    
        private transient Connection dbConn;
        private transient PreparedStatement statement;
        private transient ResultSet resultSet;
        private int fetchSize;
    
        private boolean hasNext;
        private Object[][] parameterValues;
        private static DruidDataSource dataSource;
    
        public JDBCInputDataSource() {
        }
    
        @Override
        public RowTypeInfo getProducedType() {
            return rowTypeInfo;
        }
    
        @Override
        public void configure(Configuration parameters) {
            //do nothing here
        }
        //Druid连接池的加入
        public DruidDataSource getDataSource(){
            dataSource = new DruidDataSource();
    //        dataSource.setDriverClassName("oracle.jdbc.driver.OracleDriver");
    //        dataSource.setUrl("jdbc:oracle:thin:@192.168.100.130:1521/orcl");
    //        dataSource.setUsername("NEWRISK");
    //        dataSource.setPassword("NEWRISK");
            dataSource.setDriverClassName(drivername);
            dataSource.setUrl(dbURL);
            dataSource.setUsername(username);
            dataSource.setPassword(password);
            dataSource.setInitialSize(10);   //初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时
            dataSource.setMinIdle(10);  //最小连接池数量
            dataSource.setMaxActive(100);  //最大连接池数量
            dataSource.setMaxWait(1000 * 20); //获取连接时最大等待时间,单位毫秒。配置了maxWait之后,缺省启用公平锁,并发效率会有所下降,如果需要可以通过配置useUnfairLock属性为true使用非公平锁。
            dataSource.setTimeBetweenEvictionRunsMillis(1000 * 60);  //有两个含义:1) Destroy线程会检测连接的间隔时间2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明
            dataSource.setMaxEvictableIdleTimeMillis(1000 * 60 * 60 * 10);  //<!-- 配置一个连接在池中最大生存的时间,单位是毫秒 -->
            dataSource.setMinEvictableIdleTimeMillis(1000 * 60 * 60 * 9);  //<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
            dataSource.setTestWhileIdle(true);  // <!-- 这里建议配置为TRUE,防止取到的连接不可用 -->
            dataSource.setTestOnBorrow(true);
            dataSource.setTestOnReturn(false);
            Properties prop = new Properties();
            return dataSource;
        }
    
    
        @Override
        public void openInputFormat() {
            //called once per inputFormat (on open)
            try {
                Class.forName(drivername);
                if(dataSource==null){
                    getDataSource();
                }
                dbConn=dataSource.getConnection();
                statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
                if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
                    statement.setFetchSize(fetchSize);
                }
            } catch (SQLException se) {
                throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
            } catch (ClassNotFoundException cnfe) {
                throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
            }
        }
    
        @Override
        public void closeInputFormat() {
            //called once per inputFormat (on close)
            try {
                if (statement != null) {
                    statement.close();
                }
            } catch (SQLException se) {
                LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
            } finally {
                statement = null;
            }
    
            try {
                if (dbConn != null) {
                    dbConn.close();
                }
            } catch (SQLException se) {
                LOG.info("Inputformat couldn't be closed - " + se.getMessage());
            } finally {
                dbConn = null;
            }
    
            parameterValues = null;
        }
    
        @Override
        public void open(InputSplit inputSplit) throws IOException {
            try {
                if (inputSplit != null && parameterValues != null) {
                    for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
                        Object param = parameterValues[inputSplit.getSplitNumber()][i];
                        if (param instanceof String) {
                            statement.setString(i + 1, (String) param);
                        } else if (param instanceof Long) {
                            statement.setLong(i + 1, (Long) param);
                        } else if (param instanceof Integer) {
                            statement.setInt(i + 1, (Integer) param);
                        } else if (param instanceof Double) {
                            statement.setDouble(i + 1, (Double) param);
                        } else if (param instanceof Boolean) {
                            statement.setBoolean(i + 1, (Boolean) param);
                        } else if (param instanceof Float) {
                            statement.setFloat(i + 1, (Float) param);
                        } else if (param instanceof BigDecimal) {
                            statement.setBigDecimal(i + 1, (BigDecimal) param);
                        } else if (param instanceof Byte) {
                            statement.setByte(i + 1, (Byte) param);
                        } else if (param instanceof Short) {
                            statement.setShort(i + 1, (Short) param);
                        } else if (param instanceof Date) {
                            statement.setDate(i + 1, (Date) param);
                        } else if (param instanceof Time) {
                            statement.setTime(i + 1, (Time) param);
                        } else if (param instanceof Timestamp) {
                            statement.setTimestamp(i + 1, (Timestamp) param);
                        } else if (param instanceof Array) {
                            statement.setArray(i + 1, (Array) param);
                        } else {
                            //extends with other types if needed
                            throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
                        }
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
                    }
                }
                resultSet = statement.executeQuery();
                hasNext = resultSet.next();
            } catch (SQLException se) {
                throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
            }
        }
    
    
        @Override
        public void close() throws IOException {
            if (resultSet == null) {
                return;
            }
            try {
                resultSet.close();
            } catch (SQLException se) {
                LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
            }
        }
    
        @Override
        public boolean reachedEnd() throws IOException {
            return !hasNext;
        }
    
        @Override
        public Row nextRecord(Row row) throws IOException {
            try {
                if (!hasNext) {
                    return null;
                }
                for (int pos = 0; pos < row.getArity(); pos++) {
                    row.setField(pos, resultSet.getObject(pos + 1));
                }
                hasNext = resultSet.next();
                return row;
            } catch (SQLException se) {
                throw new IOException("Couldn't read data - " + se.getMessage(), se);
            } catch (NullPointerException npe) {
                throw new IOException("Couldn't access resultSet", npe);
            }
        }
    
        @Override
        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
            return cachedStatistics;
        }
    
        @Override
        public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
            if (parameterValues == null) {
                return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
            }
            GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
            for (int i = 0; i < ret.length; i++) {
                ret[i] = new GenericInputSplit(i, ret.length);
            }
            return ret;
        }
    
        @Override
        public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
            return new DefaultInputSplitAssigner(inputSplits);
        }
    
        @VisibleForTesting
        PreparedStatement getStatement() {
            return statement;
        }
    
        public static JDBCInputFormatBuilder buildJDBCInputFormat() {
            return new JDBCInputFormatBuilder();
        }
    
        public static class JDBCInputFormatBuilder {
            private final JDBCInputDataSource format;
    
            public JDBCInputFormatBuilder() {
                this.format = getInstance();
                this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
                this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
            }
    
            public JDBCInputFormatBuilder setUsername(String username) {
                format.username = username;
                return this;
            }
    
            public JDBCInputFormatBuilder setPassword(String password) {
                format.password = password;
                return this;
            }
    
            public JDBCInputFormatBuilder setDrivername(String drivername) {
                format.drivername = drivername;
                return this;
            }
    
            public JDBCInputFormatBuilder setDBUrl(String dbURL) {
                format.dbURL = dbURL;
                return this;
            }
    
            public JDBCInputFormatBuilder setQuery(String query) {
                format.queryTemplate = query;
                return this;
            }
    
            public JDBCInputFormatBuilder setResultSetType(int resultSetType) {
                format.resultSetType = resultSetType;
                return this;
            }
    
            public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) {
                format.resultSetConcurrency = resultSetConcurrency;
                return this;
            }
    
            public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
                format.parameterValues = parameterValuesProvider.getParameterValues();
                return this;
            }
    
            public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
                format.rowTypeInfo = rowTypeInfo;
                return this;
            }
    
            public JDBCInputFormatBuilder setFetchSize(int fetchSize) {
                Preconditions.checkArgument(fetchSize == Integer.MIN_VALUE || fetchSize > 0,
                        "Illegal value %s for fetchSize, has to be positive or Integer.MIN_VALUE.", fetchSize);
                format.fetchSize = fetchSize;
                return this;
            }
    
            public JDBCInputDataSource finish() {
                if (format.username == null) {
                    LOG.info("Username was not supplied separately.");
                }
                if (format.password == null) {
                    LOG.info("Password was not supplied separately.");
                }
                if (format.dbURL == null) {
                    throw new IllegalArgumentException("No database URL supplied");
                }
                if (format.queryTemplate == null) {
                    throw new IllegalArgumentException("No query supplied");
                }
                if (format.drivername == null) {
                    throw new IllegalArgumentException("No driver supplied");
                }
                if (format.rowTypeInfo == null) {
                    throw new IllegalArgumentException("No " + RowTypeInfo.class.getSimpleName() + " supplied");
                }
                if (format.parameterValues == null) {
                    LOG.debug("No input splitting configured (data will be read with parallelism 1).");
                }
                return format;
            }
    
        }
    
    }
    

     

    展开全文
  • 大数据实时计算系统实践Flink+Druid配套PPT
  • flink配合druid连接池的使用

    千次阅读 2020-05-28 16:14:49
    博主之前在flink集成连接池的使用,一直存在一个误区,就是为了减少程序对资源的创建,所有加载资源或者获取连接都放在了flink的open方法中,虽然这样可以减少资源的使用,也可以满足大多数场景,但是如:mysql的...

    博主之前在flink集成连接池的使用,一直存在一个误区,就是为了减少程序对资源的创建,所有加载资源或者获取连接都放在了flink的open方法中,虽然这样可以减少资源的使用,也可以满足大多数场景,但是如:mysql的connection长时间不用,该connection 会被mysql数据库本身给回收。

    先简单介绍一下Druid:
    DRUID是阿里巴巴开源平台上一个数据库连接池实现,它结合了C3P0、DBCP、PROXOOL等DB池的优点,同时加入了日志监控,可以很好的监控DB池连接和SQL的执行情况,可以说是针对监控而生的DB连接池(据说是目前最好的连接池,不知道速度有没有BoneCP快)。

    配置参数
    和其它连接池一样DRUID的DataSource类为:com.alibaba.druid.pool.DruidDataSource,基本配置参数如下:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    本人的使用:

     @Override
        public void open(Configuration parameters) throws Exception {
            dataSource = new DruidDataSource();
            dataSource.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource.setUrl("****");
            dataSource.setUsername("***");
            dataSource.setPassword("****");
            dataSource.setInitialSize(10);   //初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时
            dataSource.setMinIdle(10);  //最小连接池数量
            dataSource.setMaxActive(20);  //最大连接池数量
            dataSource.setMaxWait(1000 * 20); //获取连接时最大等待时间,单位毫秒。配置了maxWait之后,缺省启用公平锁,并发效率会有所下降,如果需要可以通过配置useUnfairLock属性为true使用非公平锁。
            dataSource.setTimeBetweenEvictionRunsMillis(1000 * 60);  //有两个含义:1) Destroy线程会检测连接的间隔时间2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明
            dataSource.setMaxEvictableIdleTimeMillis(1000 * 60 * 60 * 10);  //<!-- 配置一个连接在池中最大生存的时间,单位是毫秒 -->
            dataSource.setMinEvictableIdleTimeMillis(1000 * 60 * 60 * 9);  //<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
            dataSource.setTestWhileIdle(true);  // <!-- 这里建议配置为TRUE,防止取到的连接不可用 -->
            dataSource.setTestOnBorrow(true);
            dataSource.setTestOnReturn(false);
            dataSource.setValidationQuery("select 1");
        }
    

    在flink的open方法创建连接池,在使用的时候通过getConnection获取连接,使用完之后调用.close方法,将连接归还给连接池。

    展开全文
  • 主要涉及到几个组件,kafka,flink,redis,druid和es。相信大家对以上几个组件都比较熟悉了,这里就不细说了。我们从一个简单的需求,来说明各个组件是怎么协作的。 假如现在我们有一个电商平台,每天访问的流量...

    最近在从事实时方面的工作,主要涉及到数据处理、加工及可视化,在采坑的过程中总结出一套比较简单的实时计算方案,供大家参考。主要涉及到几个组件,kafka,flink,redis,druid和es。相信大家对以上几个组件都比较熟悉了,这里就不细说了。我们从一个简单的需求,来说明各个组件是怎么协作的。

         假如现在我们有一个电商平台,每天访问的流量巨大,主要访问流量都集中在衣服类、家电类页面,那么我们想实时看到这两类页面的访问量走势(十分钟出一个统计量),当做平台的重要指标,可视化的数据如下。

    时间段     页面类型     访问量
    07:00am     衣服类    100000
    07:00am     家电类    60000
    07:10am     衣服类    80000
    07:10am     家电类    70000
    数据采集    
    为了计算访问量,前提就是要进行数据采集,假设平台在每一次用户访问的时候,我们可以获取到信息包括:手机号,访问页面类型,访问时间。数据采集的方式有很多,一般我采用埋点的方式,将【手机号,访问页面类型,访问时间】,形成一条埋点计入日志,然后采用flume或其他组件,将埋点日志收集进入kafka(根据访问量,设置partition数量)

    数据处理
    数据处理是最关键的阶段,这里选用flink处理kafka的流,该过程包括过滤、格式转换、分组等等、去重、附加字段、聚合等。针对我们的需求,我们可以分为几个阶段:

    1.数据过滤
    数据过滤,就是为了过滤掉非法数据,针对我们的需求,比如过滤掉手机号为空的记录

    2.数据分组
    数据分组是一个比较重要的阶段,这涉及到我们数据统计的方式,在分组的时候我们一般按照数据最低维度来分组,增加数据灵活性,如果我们这里先按照页面类型分组,分组的结果就是我相同页面类型的数据会在同一个分组。

    3.数据window
    window选择
    数据window是实时处理中比较重要的特点,因为我们需要看到数据的统计结果,所以必须先给数据流划分批次,然后对批中的数据做聚合,flink的window比较丰富,包括time window,thumb window等等。在我们的需求中要看到10分钟内的访问量,所以这里选用time window,为了应对灵活多变的需求,我们需要选择合适的窗口时长,比如现在的需求要看到10分钟内的访问量,如果想要看到每分钟的访问量该怎么办呢,所以一般我们窗口时长会选择最小的粒度,这里我们选择1分钟的窗口时长。

    time 类型
    选用time window时需要注意,我们的时间标准,有两个概念需要注意,一个是event time指时间发生时间,另一个是process time指消息处理时间,这两个时间是有差别的,比如用户在7:01:23的时候访问了平台,但是埋点经过flume,kafka再到flink延迟至7:01:45的时候才到,那么event time是7:01:23,process time是7:01:45,我们如果想准确统计访问量,就需要选用event time,值得注意的是,如果以event time为时间标准的话,需要kafka中的消息,带有时间戳。

    4.数据聚合
    当分组和window都设定好以后,就可以对数据做聚合了,比如分组之后的数据,我们直接可以做reduce,或count,sum,max,min。这里我们做reduce,对记录做count。需要注意的是,如果需求变了,需要对手机号做去重,那么在去重的时候还要加入去重的逻辑,去重如果量小的话可以再flink中做,如果量大的话,可以依赖redis等中间kv存储,做去重。

    数据落地
    数据聚合完之后,就需要将数据落地,这是可以选择直接落入存储,或发送到下游topic用来进行更加复杂的计算,我一般为了灵活扩展会将数据sink到下游topic,然后由kafka直接接入druid或es。在落入druid的时候需要注意,因为druid特有的预聚合方式,你要指定维度,指标,聚合时间戳字段以及时间段长度,所以聚合结果中需要带上,event time的时间戳,同时决定预聚合时长。回到需要:10分钟统计一次,因此预聚合时长可以在1~10分钟内任意选择。需要说明的是,数据不需要做特殊加工(比如不需要去重、不需要关联、数据量没那么大)的时候可以跳过flink阶段,直接落入druid中,因为druid本身就带有多种预聚合功能。

    数据校验
    本需求可能比较简单,但是在实际需求可能复杂的多,为了确保数据的正确性,需要把明细数据备份下来,方便数据校验,一般备份的数据不需要实时性的时候,可以将数据落入hive中,而需要实时校对的时候,可以将数据落入es中。

          在实际场景中,数据多种多样,该方案仅使用于特定场景,如有不足之处欢迎大家,评论指正!!!
     

    展开全文
  • 点击上方蓝色字体,选择“设为星标”回复”资源“获取更多资源实时方案之数据湖探究调研笔记生产上的坑才是真的坑 | 盘一盘Flink那些经典线上问题FileSystem/JDBC/Kafka ...
  • 使用Flink+Druid方式实现,这个时间选择这个方案,简直是顺应潮流呀,Flink现在如日中天,各大厂都在使用,Druid是OLAP的新贵,关于它的文章也有很多,我也不赘述太多。有兴趣的可以看下 这篇文章 ,我的博客其它...
  • 主要涉及到几个组件,kafka,flink,redis,druid和es。相信大家对以上几个组件都比较熟悉了,这里就不细说了。我们从一个简单的需求,来说明各个组件是怎么协作的。 假如现在我们有一个电商平台,每天访问的流量...
  • 在以前的集群的搭建过程种发现flink 对hadoop 3.0支持的版本不好,经过调研cdh6 已经提供了对flink编译安装的支持 参考:https://blog.csdn.net/kwame211/article/details/104690955 注意:druid在调研过程中没有网上...
  • AbutionGraph是众多国产数据库中新兴的一员,且是唯一一款GraphHOLAP的实时知识图谱数据仓库,将在本文中介绍Abution如何结合Flink(无缝对接Flink-1.12.1)构建增强的流批一体实时数仓。 Abution结合Flink和...
  • Flink 在实时数据数仓建设中结合 Druid、Hive 的应用场景 实时任务监控 Streaming SQL 平台化 Streaming Job 管理 未来规划优化方向 建设背景 Shopee 是东南亚与台湾领航电商平台,覆盖新加坡、马来西亚、菲律宾、...
  • flink入门资料

    2019-10-12 21:07:49
    flink基本概念与部署,DataStreamAPI,Druid数据存储,Druid基本概念。Flink应用案例教程,Flink状态管理与恢复及容错设计与指标设计。
  • Flink学习资料

    2018-10-09 10:41:26
    附件:Flink-on-yarn部署指南 0、Flink基本原理与生产实践 1、Flink基本概念与部署 2、DataStreamAPI介绍与实战 3、Window与Time 4、Connector 5、Flink状态管理与恢复 6、Metrics与监控 7、Flink应用案例介绍 8、...
  • Druid 笔记一

    2021-01-24 16:40:37
    1Apache Druid简介及架构 1.1 Druid概述 ... 将数据源进行流式处理,对接流式计算框架(如Storm、Spark、Flink),结果保存到 RDBMS或NoSQL中 将数据源进行流式处理,对接分析数据库,例如:Dr...
  • Druid.io数据仓库实践

    2021-01-16 21:22:07
    flink维表join实践-druid.io前言一、flink读取druid.io数据流插件二、flinkdruid.io lookup功能依赖于JdbcTableSourceflink写入druid.io数据插件总结 前言 Druid.io做为目前正在使用的OLAP分析数据库,已在公司大...
  • 点击上方蓝色字体,选择“设为星标”回复”资源“获取更多资源数据湖在大数据典型场景下应用调研个人笔记基于FlinkDruid的实时多维分析系统在蔚来汽车的应用生产上的坑才是真的坑 | 盘一...
  • 接下来就得讲讲咱们阿里云 E-MapReduce (EMR) ,作为构建在阿里云云服务器 ECS 上的开源 Hadoop、Spark、HBase、Hive、Flink 生态大数据 PaaS 产品,它集成的 Druid 有什么优势呢,我们一起来看看 ( •̀ ω •́ )...
  • 2、Flink 在实时数据数仓建设中结合 Druid、Hive 有哪些应用场景? 3、实时任务监控如何实现? 4、Streaming SQL 如何平台化? 导读:本文讲述 Flink 在 Shopee 新加坡数据组 ( Shopee Singapore Data ...
  • MartinHub的学习笔记 关于我 MartinHub :smiling_face_with_sunglasses: ,,热爱生活!热爱技术! 微信公众号【MartinHub】 个人微信号【MartinHub】 项目介绍 大数据 Java ... :black_nib:本仓库有来源自己总结,网上...
  • 快手计算链路是从 DB/Binlog 以及 WebService Log 实时入到 Kafka 中,然后接入 Flink 做实时计算,其中包括实时 ETL、实时分析、Interval Join 以及实时训练,最后的结果存到 Druid、ES 或者...
  • 其中包括通用计算引擎(如Spark和Flink),交互式分析系统(如Presto,Druid和ClickHouse),数据湖框架(如Iceberg,Hudi和Delta Lake),和底层存储(如Ozone)。 Debezium 概念:Debezium是一个开源的分布式平台...
  • 一、Druid 简介 1.1 大数据分析平台架构分类 数据分析的基础架构可以分为以下几类: 使用Hadoop/Spark 进行分析 将Hadoop/Spark 的结果导入 RDBMS 中提供数据分析;补充说明:RDBMS一般指关系数据库管理系统 ...
  • Flink 在实时数据数仓建设中结合 Druid、Hive 的应用场景 实时任务监控 Streaming SQL 平台化 Streaming Job 管理 未来规划优化方向 建设背景 Shopee 是东南亚与台湾领航电商平台,...
  • 文章目录一、MySql Sink二、控制事务代码1、主线代码2、Druid 数据库连接池类 一、MySql Sink 要想使用TwoPhaseCommitSinkFunction,存储系统必须支持事务 Mysql Sink继承TwoPhaseCommitSinkFunction抽象类,分两个...
  • 目前的自主分析是使用的开源产品Superset做一部分的改造,接入Druid,ES,Impala,分析师们已经全部转到我们的平台,大部分的使用都是基于我们数仓的DWS,但是除此之外实时数据没有完全接入,这是目前的痛点,也是最...
  • 基于Druid实现动态切换数据源(场景:数据源经常变化) 报表数据可视化Demo SpringBoot整合集 SpringMVC Demo 流计算 Flink Streaming Flink基础教程介绍 Flink ParameterTool fromArgs源码分析 Flink程序的基本结构 ...
  • Redis分布式锁实现数据写入幂等性

    千次阅读 2018-12-19 17:52:17
    Flink时间窗口计算将计算结果写入Druid.io的拓补图结构如下: Flink从kafka消费数据,进行分别进行计量和及时的计算,最终将计算结果写入到kafka,最终druid.io使用kafka-inex-service从kafka中拉取数据,将结果写入到...
  • druid (imply-2.7.10) Azkaban-2.5.0 开始安装 Flink 一、解压缩 flink-1.7.2-bin-hadoop27-scala_2.11.tgz,进入conf目录中。 二、修改配置 1)修改 flink/conf/flink-conf.yaml 文件,里面开头位置有一个...

空空如也

空空如也

1 2 3 4 5
收藏数 81
精华内容 32
关键字:

druidflink