精华内容
下载资源
问答
  • Flink如何做关联

    2021-01-17 19:58:53
    在实际生产中,我们经常会有这样的需求,需要以原始数据流作为基础,然后关联大量的外部表来补充一些属性。例如:在订单数据中,希望能得到订单收货人所在省的名称,一般来说订单中会记录一个省的ID,那么需要根据ID...

    声明:本系列博客为原创,最先发表在拉勾教育,其中一部分为免费阅读部分。被读者各种搬运至各大网站。所有其他的来源均为抄袭。

    《2021年最新版大数据面试题全面开启更新》

     

         在实际生产中,我们经常会有这样的需求,需要以原始数据流作为基础,然后关联大量的外部表来补充一些属性。例如:在订单数据中,希望能得到订单收货人所在省的名称,一般来说订单中会记录一个省的ID,那么需要根据ID去查询外部的维度表补充省名称属性。

         在Flink流式计算中,一些维度表属性一般存储在MySQL/HBase/Redis中,这些维度表数据存在定时更新,我们根据业务进行关联。根据业务对维表数据关联的时效性要求,有一下几种解决方案:

    • 实时查询维表
    • 预加载全量数据
    • LRU缓存
    • 其他

    实时查询维表

         实时查询维表是指用户在Flink算子中直接访问外部数据库,比如用MySQL来进行关联。这种方式是同步方式,数据保证是最新的。但是,当流式计算数据过大,会对外部系统带来巨大的访问压力,一旦出现比如连接失败、线程池满等情况,由于使用的同步调研,所以一般会导致现场阻塞、Task等待数据返回,影响整体任务的吞吐量。而且这种方案对外部系统的QPS要求较高,在大数据实时计算场景下,QPS远远高于普通的后台系统,整体作业瓶颈转移到外部系统。
         这种方式的核心是可以在Flink的Map算子中建立访问外部系统的连接。下面以订单数据为例,根据下单用户的城市ID,去关联城市名称,核心代码如下:

    public class Order {
    
        private Integer cityId;
        private String userName;
        private String items;
        public Integer getCityId() {
            return cityId;
        }
        public void setCityId(Integer cityId) {
            this.cityId = cityId;
        }
        public String getUserName() {
            return userName;
        }
        public void setUserName(String userName) {
            this.userName = userName;
        }
        public String getItems() {
            return items;
        }
        public void setItems(String items) {
            this.items = items;
        }
        @Override
        public String toString() {
            return "Order{" +
                    "cityId=" + cityId +
                    ", userName='" + userName + '\'' +
                    ", items='" + items + '\'' +
                    '}';
        }
    }
    
    public class DimSync extends RichMapFunction<String,Order> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
        private Connection conn = null;
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
        }
    
        public Order map(String in) throws Exception {
            JSONObject jsonObject = JSONObject.parseObject(in);
            Integer cityId = jsonObject.getInteger("city_id");
            String userName = jsonObject.getString("user_name");
            String items = jsonObject.getString("items");
    
            //根据city_id 查询 city_name
            PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
            pst.setInt(1,cityId);
            ResultSet resultSet = pst.executeQuery();
            String cityName = null;
            while (resultSet.next()){
                cityName = resultSet.getString(1);
            }
            pst.close();
            return new Order(cityId,userName,items,cityName);
        }
    
        public void close() throws Exception {
            super.close();
            conn.close();
        }
    }
    

         在上面这段代码中,RichMapFunction中封装了整个查询维表,然后进行关联这个过程。需要注意的是,一般我们在查询小数据量的维表情况下才使用这种方式,并且要妥善处理连接外部系统的线程,一般还会用到线程池。最后为了保证连接及时关闭和释放,一定要在最后的close方式释放连接,否则会将MySQL的连接数打满导致任务失败。

    预加载全量数据

         预加载全量数据是为了解决每条数据流经数据系统都会对外部系统发起访问,以及对外部系统频繁访问而导致的连接和性能问题。这种思路是,每当系统启动时,就会维度表数据全部加载到内存中,然后数据在内存中进行关联,不需要直接访问外部数据库。
         这种方式的优势是只需要一次性地访问外部数据库,大大提高了效率。但问题在于,一旦我们维表数据发送更新,那么Flink任务是无法感知的,可能会出现维表数据不一致,针对这种情况可以采取定时拉取维表数据。并且这种方式由于是将维表数据缓存在内存中,对计算节点的内存消耗很高,所以不能适用于数量很大的维度表。

         还是上面的例子,根据下单用户的城市ID去关联城市名称,核心代码实现如下:

    public class WholeLoad extends RichMapFunction<String,Order> {
        private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
        ScheduledExecutorService executor = null;
        private Map<String,String> cache;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            executor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        load();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            },5,5, TimeUnit.MINUTES);
        }
    
        @Override
        public Order map(String value) throws Exception {
            JSONObject jsonObject = JSONObject.parseObject(value);
            Integer cityId = jsonObject.getInteger("city_id");
            String userName = jsonObject.getString("user_name");
            String items = jsonObject.getString("items");
            String cityName = cache.get(cityId);
            return new Order(cityId,userName,items,cityName);
    
        }
    
        public void load() throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
            PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
            ResultSet rs = statement.executeQuery();
            while (rs.next()) {
                String cityId = rs.getString("city_id");
                String cityName = rs.getString("city_name");
                cache.put(cityId, cityName);
            }
            con.close();
        }
    }
    

         在上面例子中,使用ScheduledExecutorService每隔5分组拉去一次维表数据。这种方式适用于那么实时场景不是很高,维表数据较小的场景。

    LRU缓存

         LRU是一种缓存算法,意思是最近最少使用的数据则被淘汰。在这种策略中,维表数据天然被分为冷数据和热数据,所谓冷数据指的是那些不经常使用的数据,热数据是那些查询频率高的数据。
         对应到上面的场景,根据城市ID关联城市名称,北京、上海这些城市的订单远远高于偏远地区的一些城市,那么北京、上海就是热数据,偏远城市就是冷数据,这种方式存在一定的数据延迟,并且需要额外设置每天数据的失效时间。因为热点数据经常被使用,会常驻缓存中,一旦维表发生变更是无法感知变化的。这里使用Guava库提供的CacheBuilder来创建我们的缓存:

    CacheBuilder.newBuilder()
            //最多存储10000条
            .maximumSize(10000)
            //过期时间为1分钟
            .expireAfterWrite(60, TimeUnit.SECONDS)
            .build();
    

         整体的实现思路是:利用Flink的RichAsyncFunction读取Hbase的数据到缓存中,在关联维表时先去查询缓存,如果缓存中不存在这条数据,就利用客户端去查询HBase,然后插入到缓存中。
         首先需要一个Hbase的异步客户端:

    <dependency>
        <groupId>org.hbase</groupId>
        <artifactId>asynchbase</artifactId>
        <version>1.8.2</version>
    </dependency>
    

         核心代码实现如下:

    public class LRU extends RichAsyncFunction<String,Order> {
        private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
        String table = "info";
        Cache<String, String> cache = null;
        private HBaseClient client = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //创建hbase客户端
            client = new HBaseClient("127.0.0.1","7071");
            cache = CacheBuilder.newBuilder()
                    //最多存储10000条
                    .maximumSize(10000)
                    //过期时间为1分钟
                    .expireAfterWrite(60, TimeUnit.SECONDS)
                    .build();
        }
    
        @Override
        public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
            JSONObject jsonObject = JSONObject.parseObject(input);
            Integer cityId = jsonObject.getInteger("city_id");
            String userName = jsonObject.getString("user_name");
            String items = jsonObject.getString("items");
            //读缓存
            String cacheCityName = cache.getIfPresent(cityId);
            //如果缓存获取失败再从hbase获取维度数据
            if(cacheCityName != null){
                Order order = new Order();
                order.setCityId(cityId);
                order.setItems(items);
                order.setUserName(userName);
                order.setCityName(cacheCityName);
                resultFuture.complete(Collections.singleton(order));
            }else {
                client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                    for (KeyValue kv : arg) {
                        String value = new String(kv.value());
                        Order order = new Order();
                        order.setCityId(cityId);
                        order.setItems(items);
                        order.setUserName(userName);
                        order.setCityName(value);
                        resultFuture.complete(Collections.singleton(order));
                        cache.put(String.valueOf(cityId), value);
                    }
                    return null;
                });
            }
        }
    }
    

         这里需要注意的是用到了异步IO(RichAsyncFunction),这个功能的出现就是为了解决与外部系统交互时网络延迟成为系统瓶颈的问题。
         在流计算环境中,在查询外部维表时,加入访问是同步进行的,那么整体能力势必受限于外部系统。正是因为异步IO的出现使得访问外部系统可以并发的进行,并且不需要同步等待返回,大大减轻了因为网络等待时间等引起的系统吞吐和延迟问题。
         在使用异步IO时,一定要使用异步客户端,如果没有异步客户端可以自己创建线程池模拟异步请求。

    其他

         除了上述常见的处理方式,还可以通过将维表消息广播出去,或者自定义异步线程池访问维表,甚至可以自己扩展Flink SQL中关联维表的方式直接使用SQL Join的方法关联查询结果。

         我们应该从业务本身出发,不同业务场景下使用不同的方式。

    展开全文
  • 需求描述:二维关联数组转字符串,并且能过滤重复的关键字。下面分享一个具体的例子,供朋友们参考。1,php的二数组:复制代码 代码示例:$name = array("self" => "jbxue","student" => array("chenshan",...

    本节内容:

    php编程中,将二维关联数组为字符串的就去。

    需求描述:

    二维关联数组转字符串,并且能过滤重复的关键字。

    下面分享一个具体的例子,供朋友们参考。

    1,php的二维数组:

    复制代码 代码示例:

    $name = array(

    "self" => "jbxue",

    "student" => array(

    "chenshan",

    "xiaolingang"

    ),

    "unkmow" => "chaikun",

    "teacher" => array(

    "huangwei",

    "fanwenqing"

    )

    );

    最后要获得的string格式为:

    jbxue,chenshan,xiaolingang,chaikun,huangwei,fanwenqing

    2,思路分析,与实现方法。

    方法

    利用static关键字和递归的思想去遍历数组。

    复制代码 代码示例:

    function arrToStr ($array)

    {

    // 定义存储所有字符串的数组

    static $r_arr = array();

    if (is_array($array)) {

    foreach ($array as $key => $value) {

    if (is_array($value)) {

    // 递归遍历

    arrToStr($value);

    } else {  // www.jbxue.com

    $r_arr[] = $value;

    }

    }

    } else if (is_string($array)) {

    $r_arr[] = $array;

    }

    //数组去重

    $r_arr = array_unique($r_arr);

    $string = implode(",", $r_arr);

    return $string;

    }

    效果,如下图:

    1405435033_5239.png

    展开全文
  • 本节内容:php 判断数组维数例子:代码示例:/*** 返回数组的维度* @param [type] $arr [description]* @return [type] [description]* @site www.*/function arrayLevel($arr){$al = array(0);function aL($arr,&...

    本节内容:

    php 判断数组维数

    例子:

    代码示例:

    /**

    * 返回数组的维度

    * @param  [type] $arr [description]

    * @return [type]      [description]

    * @site www.

    */

    function arrayLevel($arr){

    $al = array(0);

    function aL($arr,&$al,$level=0){

    if(is_array($arr)){

    $level++;

    $al[] = $level;

    foreach($arr as $v){

    aL($v,$al,$level);

    }

    }

    }

    aL($arr,$al);

    return max($al);

    }

    ?>

    您可能感兴趣的文章:

    有关PHP数组的使用技巧

    PHP去除数组中空值的例子

    php 二维数组排序的两个例子

    php 二维数组转树状数组的实例代码

    php 数组排序的实例代码

    php数组入门教程之从数组尾删除元素

    php数组入门教程之从数组头删除元素值

    php数组入门教程之数组尾添加元素

    php数组入门教程之数组头添加元素

    php数组入门教程之array_keys()函数

    php数组入门教程之array_search()函数

    php数组入门教程之in_array()函数

    php数组入门教程之array_key_exists()函数

    php数组入门教程之获取当前数组键和值

    php数组入门教程之array_values()函数

    php数组入门教程之获取当前数组键

    php数组入门教程之获取当前数组值

    php数组入门教程之关联数组的交集

    php数组入门教程之求数组差集

    php数组入门教程之求关联数组的差集

    php数组实例之数组排序

    php数组入门教程之数组填充

    php数组入门教程之数组遍历

    php数组入门教程之数组的指针操作

    php数组入门教程之数组定义与创建数组

    展开全文
  • 数组是将一个或多个相似类型的值存储在单个值中的数据结构。...关联数组- 具有字符string作为索引的数组。这将元素值与键值相关联而不是严格的线性索引顺序存储。多维数组- 使用多个索引访问包含一个或多...

    数组是将一个或多个相似类型的值存储在单个值中的数据结构。例如,如果要存储100个数字,而不是定义100个变量,它很容易定义100个数组。

    有三种不同类型的数组,并且使用称为数组索引的ID c访问每个数组值。

    数字数组- 具有数字索引的数组。值以线性方式存储和访问。

    关联数组- 具有字符string作为索引的数组。这将元素值与键值相关联而不是严格的线性索引顺序存储。

    多维数组- 使用多个索引访问包含一个或多个数组和值的数组

    注- 内置数组函数在函数参考PHP Array函数中给出

    数字数组

    这些数组可以存储数字,字符string和任何对象,但它们的索引将由数字表示。默认情况下,数组索引从零开始。

    以下是显示如何创建和访问数字数组的示例。

    这里我们使用array()函数来创建数组。该功能在功能参考中说明。

    <?php /* First method to create array. */$numbers=array(1,2,3,4,5);foreach($numbersas$value){echo"Value is $value
    ";}/* Second method to create array. */$numbers[0]="one";$numbers[1]="two";$numbers[2]="three";$numbers[3]="four";$numbers[4]="five";foreach($numbersas$value){echo"Value is $value
    ";}?>

    输出结果如下 -

    Value is 1

    Value is 2

    Value is 3

    Value is 4

    Value is 5

    Value is one

    Value is two

    Value is three

    Value is four

    Value is five

    关联数组

    关联数组在功能方面非常类似于数字数组,但它们的索引方面是不同的。关联数组将其索引作为字符string,以便您可以在键和值之间建立强关联。

    要将员工的工资存储在数组中,数字索引数组将不是最佳选择。相反,我们可以使用员工姓名作为我们的关联数组中的关键字,并且该值将是他们各自的薪水。

    注意- 打印时不要将关联数组保留在双引号内,否则不会返回任何值。

    <?php /* First method to associate create array. */$salaries=array("mohammad"=>2000,"qadir"=>1000,"zara"=>500);echo"Salary of mohammad is ".$salaries["mohammad"]."
    ";echo"Salary of qadir is ".$salaries["qadir"]."
    ";echo"Salary of zara is ".$salaries["zara"]."
    ";/* Second method to create array. */$salaries["mohammad"]="high";$salaries["qadir"]="medium";$salaries["zara"]="low";echo"Salary of mohammad is ".$salaries["mohammad"]."
    ";echo"Salary of qadir is ".$salaries["qadir"]."
    ";echo"Salary of zara is ".$salaries["zara"]."
    ";?>

    输出结果如下 -

    Salary of mohammad is 2000

    Salary of qadir is 1000

    Salary of zara is 500

    Salary of mohammad is high

    Salary of qadir is medium

    Salary of zara is low

    多维数组

    一个多维数组,主数组中的每个元素也可以是一个数组。并且子数组中的每个元素都可以是数组,等等。使用多个索引访问多维数组中的值。

    在这个例子中,我们创建了一个二维数组来存储三个学生的三个学科的标记 -

    此示例是一个关联数组,您可以以相同的方式创建数组。

    <?php

    $marks=array("mohammad"=>array("physics"=>35,"maths"=>30,"chemistry"=>39),"qadir"=>array("physics"=>30,"maths"=>32,"chemistry"=>29),"zara"=>array("physics"=>31,"maths"=>22,"chemistry"=>39));/* Accessing multi-dimensional array values */echo"Marks for mohammad in physics : ";echo $marks["mohammad"]["physics"]."
    ";echo"Marks for qadir in maths : ";echo $marks["qadir"]["maths"]."
    ";echo"Marks for zara in chemistry : ";echo $marks["zara"]["chemistry"]."
    ";?>

    输出结果如下 -

    Marks for mohammad in physics : 35

    Marks for qadir in maths : 32

    Marks for zara in chemistry : 39

    展开全文
  • 以线性方式值存储和访问关联数组,数组与字符串索引。元素值与键值不是一个严格的线性索引顺序。多维数组,包含一个或多个数组和数组值是使用多个索引访问数字数组这些数组可以存储数字、字符串和任何对象,但他们将...
  • 一、三点线图、 1、plot3 函数、 2、plot3 绘图示例、 3、plot3 绘图示例 2、 4、plot3 绘图示例 3、 二、2D 与 3D 关联
  • 所以有2个简单的办法(1)先翻转一数组,然后进行合并,再翻转回来这里就不写详细代码,写下简单过程$array_a =array_flip($array_a);这样交换后的数组内容如下Array([DHL] =>carrier)Array([HKPT] =>carrier)...
  • --定义许可二数组类型 TYPE type_permission_array IS TABLE OF type_permission INDEX BY BINARY_INTEGER; --定义字符串一数组类型 TYPE type_str_array IS TABLE OF varchar(30) INDEX BY BINARY_INTEGER; --...
  • } } } 2、 热存储维表 这种方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下: 优点:维度数据量不受内存限制,可以存储很大的数据量。...
  • => move:movesubmit [12] => move2:ONLINE Submit [13] => thisstep:1 [14] => sid:36771 [15] => token:1234567890 ) 或者,对于我最后的例子: $fieldnames_original = explode('|', $_POST['fieldnames']);...
  • 第一种类型想用foreach()遍历整个二数组:$team = array('lk','ok');$book = array('linux服务器配置与管理',$team);foreach($book as $k=>$val) //for $book each $value( as )echo $k.'=>'.$val.'';输出...
  • 1 目标1.1 在 Laravel 项目的开发中,多态的需求很常见,按多态关联进行排序的需求也是必须的。1.2 请想像,我们有一个需求,荣誉栏目多态关联一个档案模型,要求在荣誉中按档案的推荐时间进行排序,以获取最近推荐...
  • 通过维数(此处是3)乘以每元素的个数(此处是2)就可以得出该多维数组的元素个是6了。但是这并不是保险的做法,因为多维数组中每一个维度的元素个是可以不一样的,如: var persons = [["zhangsan", 25], [...
  • Bash 的关联数组详解Bash 支持关联数组(associative arrays),可以使用任意的字符串、或者整数作为下标来访问数组元素。关联数组的下标和值称为键值对,它们是一一对应关系,键是唯一的,值可以不唯一。要使用关联...
  • 曲线越接近,相应序列之间的关联度就越大,反之就越小。 图像越接近则联系越紧密,权重也就越大 灰色关联分析与传统数理统计的比较 数理统计有以下不足之处 而灰色关联分析对样本量要求小,样本量小的时候...
  • 声明:本系列博客是根据SGG的视频整理而成,非常适合大家入门学习。 《2021年最新版大数据面试题全面开启更新》 常见的表Join方式有四种: 预加载表 热存储表 广播表 Temporal table function ...
  • 《二粒子群算法matlab源程序》由会员分享,可在线阅读,更多相关《二粒子群算法matlab源程序(11页珍藏版)》请在技术文库上搜索。1、9微博,快来“关注”我,了解我的最新动态吧。... %不可行解elsebool=1;...
  • php怎么把关联数组转成索引数组PHP和PERL、VB不同,其只有“数组”,没有单独的“关联数组”类型,实际上PHP的所有数组都是关联数组,当没有指定键值的时候,键值...PHP中一索引数组转二维关联数组$u_id = 2;$a_i...
  • 关联式规则(Association Rules, AR),又称关联规则,是数据挖掘的一个重要课题,用于从大量数据中挖掘出有价值的数据项之间的相关关系。关联规则解决的常见问题如:“如果一个消费者购买了产品A,那么他有多大机会...
  • 一、前言在学习The Apriori algorithm算法时,参考了多篇博客和一篇论文,尽管这些都是很...关联规则的目的在于在一个数据集中找出项之间的关系,也称之为购物蓝分析 (market basket analysis)。例如,购买鞋的顾客...
  • 我发现的python语言实现的包有两个:pymining:根据Apriori算法进行关联规则挖掘Orange3的关联规则库:根据FP-growth算法进行关联规则挖掘经过分析,我决定使用Oranges进行关联规则的实现,原因如下:FP-growth算法...
  • }更完整的例子:(还要注意我把类别而不是类别,如果需要改变)。 要从这种方法中剔除的关键事项: 检查密钥是否存在 如果没有,请创建 .push可用于javascript数组以将项目添加到数组 使用字符串作为对象的类型: var ...
  • 直接修改state中的的数据是不被允许的,会报错这个时候可以使用三种种方式处理第一种:使用拓展运算符,深拷贝一数组或对象var arrA = [1,2,3,4]var a = [...arr]||var ObjB = {a:1,b:2}var b = {...ObjB}如果只是...
  • php中数组遍历方法有很多种,如有:for,foreach,while(),list(),each()这些函数与方法都要我们在数组遍历中会用到的,下面... 数组必须是索引数组,而且下标还必须是连续的 索引数组下标还可以不连序,数组还有关联...
  • 关联规则挖掘 关联规则挖掘发现大量数据中项集之间有趣的关联或相关联系 关联规则挖掘的基本概念 关联规则挖掘(Association rule mining): 关联规则挖掘的主要对象是交易型数据库,一个交易一般会由交易处理时间,...
  • 关联规则挖掘算法——Apriori算法

    千次阅读 2021-03-19 08:34:45
    关联规则挖掘这一章中,Apriori算法是非常重要的~之后本章的实验中也要实现—— 理解&自己编程实现Apriori算法 (所以要好好学一下哈~) 为啥需要Apriori算法呢?—— 在关联规则挖掘中,要产生频繁项集,要...
  • 定义:集合C的仿射维数为其仿射包的维数。 通俗理解:仿射包包含集合C中任意点的线性组合。所有线性组合最终组成的空间是几维,仿射包就是几维。比如上的单位圆环,任意点的线性组合都在一个平面上,即单位圆环的...
  • 本文充分利用立体图像中的稀疏、密集、语义和几何信息,提出了一种用于自动驾驶的三目标检测方法。我们的方法,称为 Stereo R-CNN,扩展了 Faster R-CNN 用于立体输入,以同时检测和关联左右图像中的目标。我们在...
  • 最近需要使用PHP中的二数组,就用一个简单的例子来说明PHP中二数组是如何使用$a=array('a','b','c');$c=array('a1','b1','c1');$b=array('id'=>18,'count'=>27,$a,$c);echo $b['id']; // 18echo $b['count']...
  • 我们生产常有将实时数据流与 Hive 表 join 来丰富数据的需求,其中 Hive 表是分区表,业务上需要关联上 Hive 最新分区的数据。上周 Flink 1.12 发布了,刚好支撑了这种业务场景,我也将 1.12 版本部署后做了一个...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 46,521
精华内容 18,608
关键字:

关联维数例子