精华内容
下载资源
问答
  • 068. 缓存冷启动问题新系统...系统第一次上线启动,系统在 redis 故障的情况下重新启动,在高并发的场景下就会出现所有的流量 都会打到 mysql(原始数据库) 上去。可能导致 mysql 崩溃 以下是图示: 「冷启.

     缓存冷启动问题:新系统上线 redis 彻底崩溃导致数据无法恢复

    什么是缓存冷启动?简单说就是缓存中没有数据,考虑下面两个场景

    1. 新系统第一次上线,此时在缓存里可能是没有数据的

    2. 系统在线上稳定运行着,但是突然间重要的 redis 缓存全盘崩溃了,而且不幸的是,数据全都无法找回来

    系统第一次上线启动,系统在 redis 故障的情况下重新启动,在高并发的场景下就会出现所有的流量 都会打到 mysql(原始数据库) 上去。可能导致 mysql 崩溃

    以下是图示:

     

    「冷启动」,是说缓存中没有数据但是缓存短时间又恢复正常后的流量被大量打到 mysql。

    那么还有一种情况是「缓存雪崩」,可能是缓存失效、redis 挂了等,流量被大量打到 mysql 中

    注意这两个场景的关注点是不同的。

    基于 storm 实时热点统计的分布式并行缓存预热

    缓存预热基本思路

    由于缓存冷启动问题,redis 启动后,一点数据都没有,直接就对外提供服务了,

    • 提前给 redis 中灌入部分数据,再提供服务

    • 数据量太大的话,无法将所有数据放入 redis 中

      • 耗费时间过长

      • 或 redis 根本无法容纳下所有的数据

    • 需要根据当天的具体访问情况,实时统计出访问频率较高的热数据

      然后将访问频率较高的热数据写入 redis 中,肯定数据也比较多, 我们也得多个服务并行读取数据去写,并行的分布式缓存预热

    • 都准备好后,在对外服务,就不至于冷启动

    缓存预热具体实现思路

    1. nginx +lua 将访问流量上报到 kafka 中

      要统计出当前最新的实时的热数据是那些,我们就得将商品详情页访问的请求对应的流量 日志实时上报到 kafka 中

    2. storm 从 kafka 中消费数据,实时统计访问次数

      访问次数基于 LRU 内存数据结构的存储方案;

      为什么要基于 LRU 内存方案?

      1. storm 中读写数据频繁

      2. 数据量大

      所以不适合依赖 redis 或者 mysql:

      • redis 可能出现故障,会导致 storm 的稳定性

      • mysql:扛不住高并发读写

      • hbase:hadoop 生态组合还是不错的,但是对于非专业大数据方向来说,维护太重了

      我之前做过的一些项目,一些广告计费类的系统也是用这种方案,有人就直接往 mysql 中去写, 流量上来之后 mysql 直接被打死了

      其实我们的需求就是:统计出最近一段时间访问最频繁的商品,进行访问计数, 同时维护出一个前 N 个访问最多的商品 list 即可

      也就是热数据:最近一段时间(如最近 1 小时、5 分钟),1 万个商品请求, 统计这段时间内每个商品的访问次数,排序后做出一个 top n 列表

      计算好每个 task 大致要存放的商品访问次数的数量,计算出大小, 然后构建一个 LRU MAP,它能够给你一个剩下访问次数最多的商品列表,访问高的才能存活

      LRU MAP 有开源的实现,apach commons collections 中有提供,设置好 map 的最大大小, 就会自动根据 LRU 算法去剔除多余的数据,保证内存使用限制, 即时有部分数据被干掉了,下次会从 0 开始统计,也没有关系,因为被 LRU 算法干掉了, 就表示它不是热数据,说明最近一段时间都很少访问了,热度下降了

    3. 每个 Storm task 启动时,基于 zk 分布式锁,将自己的 ID 写入 zk 同一个节点中

      这个 id 写到一个固定节点中,形成一个 task id 列表, 后续可以通过这个 id 列表去拿到对于 task 存储在 zk node 上的 topn 列表

    4. 每个 Storm task 负责完成自己这里的热数据统计

      比如每隔一段时间,就遍历下这个 map,维护并更新一个前 n 个商品的 list

    5. 定时同步到 zk 中去

      写一个后台线程,每隔一段时间,比如 1 分钟,将这个 task 所有的商品排名算一次 将排名前 n 的热数据 list 同步到 zk 中去

    6. 需要一个服务,根据 top n 列表在 mysql 中获取数据往 redis 中存

      这个服务有会部署多个实例,在启动时会拉取 storm task id 列表, 然后通过 zk 分布式锁,基于 id 去加锁,获取到这个 task id 节点中存储的 topn 列表, 然后读取 mysql 中的数据,存储在 redis 中

      这个服务可以是单独的服务,本课程为了方便会放在缓存服务中

    这整个方案就是分布式并行缓存预热

    小结

    1. 使用 stom 实时计算出最近一段时间内的 n 个 topn 列表,并存储在 zk task id 节点上

    2. 多服务通过 task id 进行分布式锁,获取 topn 列表,去 mysql 拉取数据放入 redis 中

    由于对 storm 不熟悉,这里的思路看来,只是利用了 storm 能创建大量并行的 task 和数据分组策略, 来让大量的访问日志分发到 n 个 task 中,让 storm 这种抗住大量并发访问量的计算能力, 注意这里是计算出 n 个 topn 列表,也就是大量的热数据。而不是唯一的一份 topn 列表, 而且是最近一段时间内的(之前一直想不通 storm 怎么能达到实时计算?原来是通过这种分而治之方式 + 分段时间来重复计算自己负责的部分结果数据实现的)

    如果想维护一个全局的排行榜名单的话,用 storm 应该怎么做?这个数据量就很大了, 比如淘宝的双 11 的秒级统计成交金额

    基于 nginx+lua 完成商品详情页访问流量实时上报 kafka 的开发

    本章节要实现的就是:在 nginx 这一层,接收到访问请求的时候,就把请求的流量上报发送给 kafka, 这样的话,storm 才能去消费 kafka 中的实时的访问日志,然后去进行缓存热数据的统计

    使用到的 lua 工具包:lua-resty-kafka

    安装 lua-resty-kafka

    nginx 三台的作用

    • eshop-01:应用层

    • eshop-02:应用层

    • eshop-03:分发层

    我们需要在 01 和 02 应用层上装上该依赖,并编写上报脚本

    cd /usr/local/
    wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
    yum install -y unzip
    unzip master.zip
    # resty 目录下是 kafka 目录,其实就是讲 kafka 目录放到 lualib 中去
    cp -rf /usr/local/lua-resty-kafka-master/lib/resty/ /usr/hello/lualib
    # 加载依赖包,其实后续写完脚本之后也需要 reload 的
    /usr/servers/nginx/sbin/nginx -s reload

    脚本编写

    /usr/hello/lua/product.lua 中增加这段逻辑

    提示:这种工具类的核心写法,在该工具官网 github 中有示例

    该段逻辑由于比较独立,可以放在 product.lua 顶部。

    local cjson = require("cjson")
    -- 引用之前安装的工具包
    local producer = require("resty.kafka.producer")
    ​
    local broker_list = {
      { host = "192.168.99.170", port = 9092 },  
      { host = "192.168.99.171", port = 9092 },  
      { host = "192.168.99.172", port = 9092 }
    }
    ​
    -- 定义日志信息
    local log_json = {}
    log_json["headers"] = ngx.req.get_headers()  
    log_json["uri_args"] = ngx.req.get_uri_args()  
    log_json["body"] = ngx.req.read_body()  
    log_json["http_version"] = ngx.req.http_version()  
    log_json["method"] =ngx.req.get_method()
    log_json["raw_reader"] = ngx.req.raw_header()  
    log_json["body_data"] = ngx.req.get_body_data()
    ​
    -- 序列化为一个字符串
    local message = cjson.encode(log_json);  
    ​
    -- local offset, err = p:send("test", key, message)
    -- 这里的 key 只是作为消息路由分区使用,kafka 中的概念
    local productId = ngx.req.get_uri_args()["productId"]
    -- 异步发送信息
    local async_producer = producer:new(broker_list, { producer_type = "async" })   
    local ok, err = async_producer:send("access-log", productId, message)  
    ​
    if not ok then  
        ngx.log(ngx.ERR, "kafka send err:", err)  
        return  
    end

    记得需要 /usr/servers/nginx/sbin/nginx -s reload

    kafka topic 创建与消费显示

    详细内容可参考之前的内容

    cd /usr/local/kafka_2.9.2-0.8.1.1
    # 创建测试的 topic,名称为 access-log
    bin/kafka-topics.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic access-log --replication-factor 1 --partitions 1 --create
    # 创建一个消费者
    bin/kafka-console-consumer.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic access-log --from-beginning

    测试脚本是否达到正常效果

    记得后端缓存服务需要启动,nginx 本地缓存是有过期时间的,过期后就会去请求后端服务了

    访问地址:http://eshop-cache03/product?method=product&productId=1&shopId=1

    页面能正常看到商品信息,但是 kafka consumer 无信息

    # 查看 nginx 的错误日志发现
    tail -f /usr/servers/nginx/logs/error.log
    ​
    2019/05/07 20:14:49 [error] 9888#0: [lua] producer.lua:258: buffered messages send to kafka err: no resolver defined to resolve "eshop-cache01", retryable: true, topic: access-log, partition_id: 0, length: 1, context: ngx.timer, client: 192.168.99.172, server: 0.0.0.0:80
    ​

     经过实战排错,resolver 8.8.8.8; 可以不配置,只需要修改 kafka 配置文件配置项 advertised.host.name = 对应机器 ip 即可 :::

    解决方法:

    vi /usr/servers/nginx/conf/nginx.conf
    ​
    在 http 部分,加入 resolver 8.8.8.8;

    再次尝试发现日志变更了

    2019/05/07 20:20:55 [error] 9891#0: [lua] producer.lua:258: buffered messages send to kafka err: eshop-cache01 could not be resolved (3: Host not found), retryable: true, topic: access-log, partition_id: 0, length: 1, context: ngx.timer, client: 192.168.99.172, server: 0.0.0.0:80

    可以看到日志,的确是去解析了,但是这个是我们本地自定义的肯定解析不到,那么这个问题是哪里的问题呢?

    我懒一点,视频中说到,需要更改 kafka 的配置文件,让用本机 ip 而不是 hostName

    vi /usr/local/kafka_2.9.2-0.8.1.1/config/server.properties
    ​
    # 默认是 hostname,更改为自己机器的 ip 地址
    #advertised.host.name=<hostname routable by clients>
    advertised.host.name = 192.168.99.170

    再重启 kafka

    [root@eshop-cache01 lua]# jps
    12698 Jps
    12310 logviewer
    1576 Kafka
    ​
    kill -9 1576
    ​
    cd /usr/local/kafka_2.9.2-0.8.1.1
    nohup bin/kafka-server-start.sh config/server.properties &
    # 查看是否启动是否报错
    cat nohup.out

    再次访问,发现能接受到信息了

    [root@eshop-cache01 kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper 192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181 --topic access-log --from-beginning
    {"method":"GET","http_version":1.1,"raw_reader":"GET \/product?productId=1&shopId=1 HTTP\/1.1\r\nHost: 192.168.99.171\r\nUser-Agent: lua-resty-http\/0.13 (Lua) ngx_lua\/9014\r\n\r\n","uri_args":{"productId":"1","shopId":"1"},"headers":{"host":"192.168.99.171","user-agent":"lua-resty-http\/0.13 (Lua) ngx_lua\/9014"}}

     

     基于 storm+kafka 完成商品访问次数实时统计拓扑的开发

    本节的代码思路如下:

    之前已经完成过 storm hellowd 了,在这模板基础上添加业务代码。

    1. 编写消费 kafka 的 spout
    2. 编写解析日志的 bolt,获取到商品 id
    3. 编写统计商品次数的 bolt

     

    需要重新写一个项目,因为是业务代码了。需要依赖 kafka 等库

    ```

    ## HotProductTopology 编码

    ```java
     

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.LinkedTransferQueue;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    /**
     * 消费 kafka 数据的 spout
     *

     */
    public class AccessLogConsumerSpout extends BaseRichSpout {
        private LinkedTransferQueue<String> queue;
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            queue = new LinkedTransferQueue();
            this.collector = collector;
            startKafka();
        }

        @Override
        public void nextTuple() {
            try {
                // 使用 LinkedTransferQueue 的目的是:
                // kafka put 会一直阻塞,直到有一个 take 执行,才会返回
                // 这里能真实的反应客户端消费 kafka 的能力
                // 而不是无限消费,存在内存中
                String message = queue.take();
                collector.emit(new Values(message));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }

        private ConsumerConnector consumer;
        private String topic;

        private void startKafka() {
            consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(
                    "192.168.99.170:2181," +
                            "192.168.99.171:2181," +
                            "192.168.99.172:2181",
                    "eshop-cache-group"));
            this.topic = "access-log";
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Map<String, Integer> topicCountMap = new HashMap<>();
                    topicCountMap.put(topic, 1);
                    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
                    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

                    for (final KafkaStream stream : streams) {
                        ConsumerIterator<byte[], byte[]> it = stream.iterator();
                        while (it.hasNext()) {
                            MessageAndMetadata<byte[], byte[]> next = it.next();
                            String message = new String(next.message());
                            try {
                                queue.transfer(message);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }).start();
        }

        private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
            props.put("zookeeper.session.timeout.ms", "40000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    }

    ```

    ```java
     

    import com.alibaba.fastjson.JSONObject;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    import java.util.Map;

    /**

     */
    public class LogParseBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String message = input.getStringByField("message");
            JSONObject jsonObject = JSONObject.parseObject(message);
            // "uri_args":{"productId":"1","shopId":"1"}
            JSONObject uri_args = jsonObject.getJSONObject("uri_args");
            Long productId = uri_args.getLong("productId");
            if (productId != null) {
                collector.emit(new Values(productId));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("productId"));
        }
    }

    ```

    ```java
     

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.trident.util.LRUMap;
    import org.apache.storm.tuple.Tuple;

    import java.util.Map;

    /**

     */
    public class ProductCountBolt extends BaseRichBolt {
        private LRUMap<Long, Long> countMap = new LRUMap(100);

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        @Override
        public void execute(Tuple input) {
            Long productId = input.getLongByField("productId");
            Long count = countMap.get(productId);
            if (count == null) {
                count = 0L;
            }
            countMap.put(productId, ++count);
            System.out.println("商品 " + productId + ",次数 " + countMap.get(productId));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    ```

    ```java
     

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    import java.util.concurrent.TimeUnit;

    /**

     */
    public class HotProductTopology {
        public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException, InterruptedException {
            // 构建拓扑,也就是手动定义业务流程
            // 其他的提交到 storm 集群后,由 storm 去调度在哪些机器上启动你所定义的 拓扑
            TopologyBuilder builder = new TopologyBuilder();
            // id、spout、并发数量
            builder.setSpout(AccessLogConsumerSpout.class.getSimpleName(),
                    new AccessLogConsumerSpout(), 2);
            builder.setBolt(LogParseBolt.class.getSimpleName(),
                    new LogParseBolt(), 5)
                    .setNumTasks(5)
                    .shuffleGrouping(AccessLogConsumerSpout.class.getSimpleName());
            builder.setBolt(ProductCountBolt.class.getSimpleName(),
                    new ProductCountBolt(), 5)
                    .setNumTasks(5)
                    .fieldsGrouping(LogParseBolt.class.getSimpleName(), new Fields("productId"));

            Config conf = new Config();
            conf.setDebug(false);
            if (args != null && args.length > 0) {
                // 表示在命令行中运行的,需要提交的 storm 集群中去
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            } else {
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("HotProductTopology", conf, builder.createTopology());
                TimeUnit.SECONDS.sleep(60);
                cluster.shutdown();
            }
        }
    }

    ```

    ## 代码测试

    1. 先本地运行 HotProductTopology
    2. 访问地址:http://eshop-cache03/product?method=product&productId=1&shopId=1

    这个时候回分发到两个 应用层 ningx 上,就会上报到 kafka。

    观察打印的日志信息,访问一次就会打印一次

    ```
    商品 1,次数 1
    商品 1,次数 2
    商品 1,次数 3
    ```

     

    基于 storm 完成 LRUMap 中 topn 热门商品列表的算法讲解与编写

    ## top n 简易算法

    ```java
    public static void main(String[] args) {
        /**
         * top n 简易算法:手写思路
         * top 3 列表: 5、3、1
         * 比如来一个 6,那么比 5 大,把 5 所在位置往后移位。最后把 6 放到 第一位,
         * 变成:6、5、3
         */

        int n = 10;
        int[] topn = new int[n];

        // 循环 n 次,模拟有这么多数据需要计算
        for (int i = 0; i < 100; i++) {
            int randomNum = RandomUtils.nextInt(100);
    //            int randomNum = i;
            // 每次都从第一个开始比较
            for (int j = 0; j < topn.length; j++) {
                int target = topn[j];
                if (randomNum > target) {
                    // 从当前位置往后移动一位
                    System.arraycopy(topn, j, topn, j + 1, n - (j + 1));
                    topn[j] = randomNum;
                    break;
                }
            }
        }
        // 某一次的输出结果 [99, 99, 99, 99, 99, 96, 93, 93, 91, 91]
        System.out.println(Arrays.toString(topn));
    }
    ```

    ## 商品列表计算 top n

    ```java
    public class ProductCountBolt extends BaseRichBolt {
        private LRUMap<Long, Long> countMap = new LRUMap(100);

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            // 启动一个线程,1 分钟计算一次
            new Thread(new Runnable() {
                @Override
                public void run() {
                    int n = 3;

                    Map.Entry<Long, Long>[] top = new Map.Entry[n];
                    while (true) {
                        Arrays.fill(top, null);
                        Utils.sleep(6000);
                        for (Map.Entry<Long, Long> entry : countMap.entrySet()) {
                            long value = entry.getValue();
                            for (int i = 0; i < top.length; i++) {
                                Map.Entry<Long, Long> targetObj = top[i];
                                if (targetObj == null) {
                                    top[i] = entry;
                                    break;
                                }
                                long target = targetObj.getValue();
                                if (value > target) {
                                    // 使用数组 + 系统原生 copy ,性能很棒
                                    // 而且 top n 不大的话,更快
                                    System.arraycopy(top, i, top, i + 1, n - (i + 1));
                                    top[i] = entry;
                                    break;
                                }
                            }
                        }
                        System.out.println(Thread.currentThread().getName() + ":" + Arrays.toString(top));
                    }
                }
            }).start();
        }
      }
    ```

    启动 storm 后,来测试统计是否正确

    访问:`http://eshop-cache03/product?method=product&productId=11&shopId=1`

    这里为了能让商品 id 能落到同一个 task 上,选择了商品 id:2、5、8、11 总共 4个进行访问次数测试

    统计输出如下

    ```
    商品 5,次数 1
    商品 2,次数 1
    Thread-41:[null, null, null]
    Thread-40:[null, null, null]
    Thread-39:[5=1, 2=1, null]
    // 后面的非 39 线程的我就删除了,为了看着清晰一点
    商品 2,次数 2
    Thread-39:[2=2, 5=1, null]
    Thread-39:[2=2, 5=1, null]
    Thread-39:[2=2, 5=1, null]
    Thread-39:[2=2, 5=1, null]
    Thread-39:[2=2, 5=1, null]
    Thread-39:[2=2, 5=1, null]
    商品 8,次数 1
    Thread-39:[2=2, 5=1, 8=1]   // top 列表中 3 个都满了
    Thread-39:[2=2, 5=1, 8=1]
    商品 11,次数 1               // 再来一个 11 ,看看效果
    Thread-39:[2=2, 5=1, 8=1]  // 结果没有看到 11,这个很正常,访问次数都不比现在的大,所以没有入围
    商品 11,次数 2
    Thread-39:[2=2, 11=2, 5=1] // 当访问两次后,商品 8 被挤掉了
    商品 11,次数 3
    Thread-39:[11=3, 2=2, 5=1]
    Thread-39:[11=3, 2=2, 5=1]
    ```

    从日志测试来看,这个算法是没有问题的

    ## 另外一种简单的 top n 算法

    这种 topn 算法没有那么麻烦,思路也很清晰

    ```java
    public static void topn() {
        int n = 10;
        int[] topn = new int[n];

        // 循环 n 次,模拟有这么多数据需要计算
        for (int i = 0; i < 100; i++) {
            int randomNum = RandomUtils.nextInt(100);
            // int randomNum = i;
            for (int j = 0; j < topn.length; j++) {
                int target = topn[j];
                if (randomNum >= target) {
                    topn[j] = randomNum;
                    randomNum = target;
                }
            }
        }
        System.out.println(Arrays.toString(topn));
    }
    ```

     

     基于 storm+zookeeper 完成热门商品列表的分段存储

    分段存储的思路:

    1. 每个 task 启动时,将自己的 task id 存储至 zk 中的 hot-product-task-list 节点
    2. 每个 task 在计算完一次 top n 时,将自己的 列表存储在 hot-product-task-task id 节点中

    ## 改造 zk 工具类

    - 改造了 分布式锁的获取与释放,path 传递,而不再写死代码中了
    - 新增了获取/写入节点数据

    ```java
    public class ZooKeeperSession {
        private ZooKeeper zookeeper;
        private CountDownLatch connectedSemaphore = new CountDownLatch(1);

        private ZooKeeperSession() {
            String connectString = "192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181";
            int sessionTimeout = 5000;
            try {
                // 异步连接,所以需要一个  org.apache.zookeeper.Watcher 来通知
                // 由于是异步,利用 CountDownLatch 来让构造函数等待
                zookeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        Watcher.Event.KeeperState state = event.getState();
                        System.out.println("watch event:" + state);
                        if (state == Watcher.Event.KeeperState.SyncConnected) {
                            System.out.println("zookeeper 已连接");
                            connectedSemaphore.countDown();
                        }
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                connectedSemaphore.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("zookeeper 初始化成功");
        }

        /**
         * 获取分布式锁
         */
        public void acquireDistributedLock(String path) {
            byte[] data = "".getBytes();
            try {
                // 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
                // EPHEMERAL:客户端被断开时,该节点自动被删除
                zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("获取锁成功 [path=" + path + "]");
            } catch (Exception e) {
                e.printStackTrace();
                // 如果锁已经被创建,那么将异常
                // 循环等待锁的释放
                int count = 0;
                while (true) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(20);
                        // 休眠 20 毫秒后再次尝试创建
                        zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    } catch (Exception e1) {
    //                    e1.printStackTrace();
                        count++;
                        continue;
                    }
                    System.out.println("获取锁成功 [path=" + path + "] 尝试了 " + count + " 次.");
                    break;
                }
            }
        }

        /**
         * 释放分布式锁
         */
        public void releaseDistributedLock(String path) {
            try {
                zookeeper.delete(path, -1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }

        /**
         * 写节点数据
         */
        public void setNodeData(String path, String data) {
            try {
              Stat exists = zookeeper.exists(path, false);
                if (exists == null) {
                    // 节点不存在,先创建 PERSISTENT 持久连接
                    zookeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    return;
                }
                zookeeper.setData(path, data.getBytes(), -1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public String getNodeData(String path) {
            try {
                return new String(zookeeper.getData(path, false, new Stat()));
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }

        private static ZooKeeperSession instance = new ZooKeeperSession();

        public static ZooKeeperSession getInstance() {
            return instance;
        }
    }
    ```

    ## 实现分段存储后的 ProductCountBolt

    ```java
    package cn.mrcode.cachepdp.eshop.storm;

    import com.alibaba.fastjson.JSON;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.trident.util.LRUMap;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.utils.Utils;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    public class ProductCountBolt extends BaseRichBolt {
        private LRUMap<Long, Long> countMap = new LRUMap(100);
        private ZooKeeperSession zooKeeperSession;
        private int taskId = -1;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            taskId = context.getThisTaskId();
            zooKeeperSession = ZooKeeperSession.getInstance();
            // 启动一个线程,1 分钟计算一次
            topnStart();
            // 上报自己的节点 id 到列表中
            writeTaskPathToZk();
        }

        private void topnStart() {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    int n = 3;
                    Map.Entry<Long, Long>[] top = new Map.Entry[n];
                    while (true) {
                        Arrays.fill(top, null);
                        Utils.sleep(6000);
                        for (Map.Entry<Long, Long> entry : countMap.entrySet()) {
                            long value = entry.getValue();
                            for (int i = 0; i < top.length; i++) {
                                Map.Entry<Long, Long> targetObj = top[i];
                                if (targetObj == null) {
                                    top[i] = entry;
                                    break;
                                }
                                long target = targetObj.getValue();
                                if (value > target) {
                                    System.arraycopy(top, i, top, i + 1, n - (i + 1));
                                    top[i] = entry;
                                    break;
                                }
                            }
                        }
                        System.out.println(Thread.currentThread().getName() + ":" + Arrays.toString(top));
                        // 把结果接入到 zk 上
                        writeTopnToZk(top);
                    }
                }
            }).start();
        }

        private void writeTaskPathToZk() {
            // 由于该操作是并发操作,需要通过分布式锁来写入
            final String lockPath = "/hot-product-task-list-lock";
            final String taskListNode = "/hot-product-task-list";
            zooKeeperSession.acquireDistributedLock(lockPath);
            String nodeData = zooKeeperSession.getNodeData(taskListNode);
            // 已经存在数据的话,把自己的 task id 追加到尾部
            if (nodeData != null && !"".equals(nodeData)) {
                nodeData += "," + taskId;
            } else {
                nodeData = taskId + "";
            }
            zooKeeperSession.setNodeData(taskListNode, nodeData);
            zooKeeperSession.releaseDistributedLock(lockPath);
        }

        private void writeTopnToZk(Map.Entry<Long, Long>[] topn) {
            List<Long> proudcts = new ArrayList<>();
            for (Map.Entry<Long, Long> t : topn) {
              if (t == null) {
                   continue;
               }
                proudcts.add(t.getKey());
            }
            final String taskNodePath = "/hot-product-task-" + taskId;
            zooKeeperSession.setNodeData(taskNodePath, JSON.toJSONString(proudcts));
        }

        @Override
        public void execute(Tuple input) {
            Long productId = input.getLongByField("productId");
            Long count = countMap.get(productId);
            if (count == null) {
                count = 0L;
            }
            countMap.put(productId, ++count);
            System.out.println("商品 " + productId + ",次数 " + countMap.get(productId));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    ```

    基于双重 zookeeper 分布式锁完成分布式并行缓存预热的代码开发

    并行缓存预热思路

    1. 服务启动的时候,进行缓存预热

    2. 从 zk 中读取 taskid 列表

    3. 依次遍历每个 taskid,尝试获取分布式锁,如果获取不到,快速报错,不要等待,因为说明已经有其他服务实例在预热了

    4. 直接尝试获取下一个 taskid 的分布式锁

    5. 即使获取到了分布式锁,也要检查一下这个 taskid 的预热状态,如果已经被预热过了,就不再预热了

      预热状态,也是一个 node path 来存储的,每个 task 一个状态节点

    6. 执行预热操作,遍历 productid 列表,查询数据,然后写 ehcache 和 redis

    7. 预热完成后,设置 taskid 对应的预热状态

    服务启动启动预热与 spring 实例工具类封装

    由于需要在缓存预热的线程中使用缓存服务进行存储,这里需要封装一个 spring bean 获取工具类

    package cn.mrcode.cachepdp.eshop.cache;
    ​
    import org.springframework.web.context.WebApplicationContext;
    import org.springframework.web.context.support.WebApplicationContextUtils;
    ​
    import javax.servlet.ServletContext;
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    ​
    import cn.mrcode.cachepdp.eshop.cache.prewarm.CachePrewarmThread;
    ​
    
     @Component
    public class InitListener implements ServletContextListener {
        @Override
        public void contextInitialized(ServletContextEvent sce) {
            ServletContext servletContext = sce.getServletContext();
            WebApplicationContext webApplicationContext = WebApplicationContextUtils.getWebApplicationContext(servletContext);
            SpringContextUtil.setWebApplicationContext(webApplicationContext);
    ​
            new CachePrewarmThread().start();
        }
    }
    public class SpringContextUtil {
        private static WebApplicationContext context;
    ​
        public static WebApplicationContext getWebApplicationContext() {
            return context;
        }
    ​
        public static void setWebApplicationContext(WebApplicationContext webApplicationContext) {
            context = webApplicationContext;
        }
    }

    zk 工具类的改造

    在思路里面提到了,需要快速失败的一个加锁方式,还有写入/获取数据的方法。 在缓存服务里面的 zk 工具类还没有这样的功能,对这个进行改造

    由于之前写过,这些代码都是体力活了,不想贴上来了

    public class ZooKeeperSession {
        private ZooKeeper zookeeper;
        private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    ​
        private ZooKeeperSession() {
            String connectString = "192.168.99.170:2181,192.168.99.171:2181,192.168.99.172:2181";
            int sessionTimeout = 5000;
            try {
                // 异步连接,所以需要一个  org.apache.zookeeper.Watcher 来通知
                // 由于是异步,利用 CountDownLatch 来让构造函数等待
                zookeeper = new ZooKeeper(connectString, sessionTimeout, event -> {
                    Watcher.Event.KeeperState state = event.getState();
                    System.out.println("watch event:" + state);
                    if (state == Watcher.Event.KeeperState.SyncConnected) {
                        System.out.println("zookeeper 已连接");
                        connectedSemaphore.countDown();
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                connectedSemaphore.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("zookeeper 初始化成功");
        }
    ​
        /**
         * 获取分布式锁
         */
        public void acquireDistributedLock(Long productId) {
            String path = "/product-lock-" + productId;
            byte[] data = "".getBytes();
            try {
                // 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
                // EPHEMERAL:客户端被断开时,该节点自动被删除
                zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("获取锁成功 product[id=" + productId + "]");
            } catch (Exception e) {
                e.printStackTrace();
                // 如果锁已经被创建,那么将异常
                // 循环等待锁的释放
                int count = 0;
                while (true) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(20);
                        // 休眠 20 毫秒后再次尝试创建
                        zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    } catch (Exception e1) {
    //                    e1.printStackTrace();
                        count++;
                        continue;
                    }
                    System.out.println("获取锁成功 product[id=" + productId + "] 尝试了 " + count + " 次.");
                    break;
                }
            }
        }
    ​
        /**
         * 释放分布式锁
         */
        public void releaseDistributedLock(Long productId) {
            try {
                String path = "/product-lock-" + productId;
                zookeeper.delete(path, -1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    ​
        /**
         * 获取分布式锁
         */
        public void acquireDistributedLock(String path) {
            byte[] data = "".getBytes();
            try {
                // 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
                // EPHEMERAL:客户端被断开时,该节点自动被删除
                zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("获取锁成功 [path=" + path + "]");
            } catch (Exception e) {
                e.printStackTrace();
                // 如果锁已经被创建,那么将异常
                // 循环等待锁的释放
                int count = 0;
                while (true) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(20);
                        // 休眠 20 毫秒后再次尝试创建
                        zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    } catch (Exception e1) {
    //                    e1.printStackTrace();
                        count++;
                        continue;
                    }
                    System.out.println("获取锁成功 [path=" + path + "] 尝试了 " + count + " 次.");
                    break;
                }
            }
        }
    ​
        /**
         * 获取分布式锁;快速失败,不等待
         */
        public boolean acquireFastFailDistributedLock(String path) {
            byte[] data = "".getBytes();
            try {
                // 创建一个临时节点,后面两个参数一个安全策略,一个临时节点类型
                // EPHEMERAL:客户端被断开时,该节点自动被删除
                zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                System.out.println("获取锁成功 [path=" + path + "]");
                return true;
            } catch (Exception e) {
                System.out.println("获取锁失败 [path=" + path + "]");
                return false;
            }
        }
    ​
        /**
         * 释放分布式锁
         */
        public void releaseDistributedLock(String path) {
            try {
                zookeeper.delete(path, -1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    ​
        /**
         * 写节点数据
         */
        public void setNodeData(String path, String data) {
            try {
               Stat exists = zookeeper.exists(path, false);
                if (exists == null) {
                    // 节点不存在,先创建 PERSISTENT 持久连接
                    zookeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    return;
                }
                zookeeper.setData(path, data.getBytes(), -1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    ​
        public String getNodeData(String path) {
            try {
                return new String(zookeeper.getData(path, false, new Stat()));
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    ​
        private static ZooKeeperSession instance = new ZooKeeperSession();
    ​
        public static ZooKeeperSession getInstance() {
            return instance;
        }
    }

    缓存预热逻辑编写

    public class CachePrewarmThread extends Thread {
        @Override
        public void run() {
            // 1. 获取 task id 列表
            ZooKeeperSession zk = ZooKeeperSession.getInstance();
            final String taskListNode = "/hot-product-task-list";
            String taskListNdeData = zk.getNodeData(taskListNode);
            if (taskListNode == null || "".equals(taskListNdeData)) {
                System.err.println("task list 为空");
                return;
            }
    ​
            CacheService cacheService = SpringContextUtil.getWebApplicationContext().getBean(CacheService.class);
    ​
            String[] taskList = taskListNdeData.split(",");
            for (String taskId : taskList) {
                final String taskNodeLockPath = "/hot-product-task-lock-" + taskId;
                // 尝试获取该节点的锁,如果获取失败,说明被其他服务预热了
                if (!zk.acquireFastFailDistributedLock(taskNodeLockPath)) {
                    continue;
                }
                // 疑问:为什么需要在这里对 预热数据节点加锁?
                // 获取 检查预热状态
                final String taskNodePrewarmStatePath = "/hot-product-task-prewarm-state" + taskId;
                String taskNodePrewarmState = zk.getNodeData(taskNodePrewarmStatePath);
                // 已经被预热过了
                if (taskNodePrewarmState != null && !"".equals(taskNodePrewarmState)) {
                    zk.releaseDistributedLock(taskNodeLockPath);
                    continue;
                }
    ​
                // 还未被预热过,读取 topn 列表,并从数据库中获取商品信息,存入缓存中
                final String taskNodePath = "/hot-product-task-" + taskId;
                String nodeData = zk.getNodeData(taskNodePath);
                if (nodeData == null && "".equals(nodeData)) {
                    // 如果没有数据则不处理
                    zk.releaseDistributedLock(taskNodeLockPath);
                    continue;
                }
    ​
                List<Long> pids = JSON.parseArray(nodeData, Long.class);
    ​
                // 假设这里是从数据库中获取的数据
                pids.forEach(pid -> {
                    ProductInfo productInfo = getProduct(pid);
                    System.out.println("预热缓存信息:" + productInfo);
                    cacheService.saveProductInfo2LocalCache(productInfo);
                    cacheService.saveProductInfo2ReidsCache(productInfo);
                });
    ​
                // 修改预热状态
                zk.setNodeData(taskNodePrewarmStatePath, "success");
                // 释放该 task 节点的锁
                zk.releaseDistributedLock(taskNodeLockPath);
            }
        }
    ​
        private ProductInfo getProduct(Long pid) {
            String productInfoJSON = "{\"id\": " + pid + ", \"name\": \"iphone7手机\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的规格\", \"service\": \"iphone7的售后服务\", \"color\": \"红色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1," +
                    "\"modifyTime\":\"2019-05-13 22:00:00\"}";
            return JSON.parseObject(productInfoJSON, ProductInfo.class);
        }
    }

    热数据 -> 热数据的统计 -> redis 中缓存的预热要解决的场景是:避免新系统刚上线,或者是 redis 崩溃数据丢失后重启, redis 中没有数据,redis 冷启动 -> 大量流量直接到数据库;

    redis 启动前,必须确保其中是有部分热数据的缓存的?什么意思?不是缓存预热就是为了存入热数据到 redis 中吗?

     

    现在这个热点缓存问题,是瞬间的缓存热点,如秒杀,简单说就是负载均衡的特点问题,导致大量访问瞬间被被路由到同一台机器

    在 storm 拓扑中加入热点缓存实时自动识别和感知的代码逻辑

    思路:

    1. 对商品访问次数 LRUMap 中的所有数据进行排序

    2. 计算后 95% 商品平均访值

    3. 对前 5% 的商品进行热点阈值评估,大于 n 倍的视为热点商品,存储在热点商品列表中

    下面用代码来实现这个逻辑

    /**
     * 热点商品感知
     */
    private static class HotProductFindThread extends Thread {
        private Logger logger = LoggerFactory.getLogger(getClass());
        private LRUMap<Long, Long> countMap;
    ​
        public HotProductFindThread(LRUMap<Long, Long> countMap) {
            this.countMap = countMap;
        }
    ​
        @Override
        public void run() {
            List<Map.Entry<Long, Long>> countList = new ArrayList<>();
            List<Long> hotPidList = new ArrayList<>();
    ​
            while (true) {
                Utils.sleep(5000);
                if (countMap.size() < 2) {
                  // 至少有 2 个商品
                  continue;
                }
                // 1. 全局排序
                countList.clear();
                hotPidList.clear();
                for (Map.Entry<Long, Long> entry : countMap.entrySet()) {
                    countList.add(entry);
                }
                Collections.sort(countList, new Comparator<Map.Entry<Long, Long>>() {
                    @Override
                    public int compare(Map.Entry<Long, Long> o1, Map.Entry<Long, Long> o2) {
                        // 取反结果,是降序排列
                        return ~Long.compare(o1.getValue(), o2.getValue());
                    }
                });
    ​
                // 2.计算后 95% 商品平均访值
                int avg95Count = (int) (countList.size() * 0.95);
                int avg95Total = 0;
                // 从列表尾部开始循环 avg95Count 次
                for (int i = countList.size() - 1; i >= countList.size() - avg95Count; i--) {
                    avg95Total += countList.get(i).getValue();
                }
                // 后百分之 95 商品的平均访问值
                int avg95Avg = avg95Total / avg95Count;
                int threshold = 5; // 阈值
    ​
                // 3. 计算热点商品
                for (int i = 0; i < avg95Count; i++) {
                    Map.Entry<Long, Long> entry = countList.get(i);
                    if (entry.getValue() > avg95Avg * threshold) {
                        logger.info("热点商品:" + entry);
                        hotPidList.add(entry.getKey());
                    }
                }
                logger.info("热点商品列表:" + hotPidList);
            }
        }
    ​
        public static void main(String[] args) {
            LRUMap<Long, Long> countMap = new LRUMap<>(5);
            countMap.put(1L, 2L);
            countMap.put(2L, 1L);
            countMap.put(3L, 3L);
            countMap.put(4L, 30L);
            // 最后打印 4L 是热点商品,这个结果是对的
            new HotProductFindThread(countMap).run();
        }
    }

     在 storm 拓扑中加入 nginx 反向推送缓存热点与缓存数据的代码逻辑

    httpClient 工具封装

    这个工具应该都会的

    package cn.mrcode.cachepdp.eshop.storm.http;
    ​
    import org.apache.http.HttpEntity;
    import org.apache.http.HttpResponse;
    import org.apache.http.NameValuePair;
    import org.apache.http.client.HttpClient;
    import org.apache.http.client.entity.UrlEncodedFormEntity;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.http.message.BasicNameValuePair;
    import org.apache.http.util.EntityUtils;
    ​
    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    ​
    /**
     * HttpClient工具类
     *
     * @author lixuerui
     */
    @SuppressWarnings("deprecation")
    public class HttpClientUtils {
    ​
        /**
         * 发送GET请求
         *
         * @param url 请求URL
         * @return 响应结果
         */
        @SuppressWarnings("resource")
        public static String sendGetRequest(String url) {
            String httpResponse = null;
    ​
            HttpClient httpclient = null;
            InputStream is = null;
            BufferedReader br = null;
    ​
            try {
                // 发送GET请求
                httpclient = new DefaultHttpClient();
                HttpGet httpget = new HttpGet(url);
                HttpResponse response = httpclient.execute(httpget);
    ​
                // 处理响应
                HttpEntity entity = response.getEntity();
                if (entity != null) {
                    is = entity.getContent();
                    br = new BufferedReader(new InputStreamReader(is));
    ​
                    StringBuffer buffer = new StringBuffer("");
                    String line = null;
    ​
                    while ((line = br.readLine()) != null) {
                        buffer.append(line + "\n");
                    }
    ​
                    httpResponse = buffer.toString();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (br != null) {
                        br.close();
                    }
                    if (is != null) {
                        is.close();
                    }
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
    ​
            return httpResponse;
        }
    ​
        /**
         * 发送post请求
         *
         * @param url URL
         * @param map 参数Map
         */
        @SuppressWarnings({"rawtypes", "unchecked", "resource"})
        public static String sendPostRequest(String url, Map<String, String> map) {
            HttpClient httpClient = null;
            HttpPost httpPost = null;
            String result = null;
    ​
            try {
                httpClient = new DefaultHttpClient();
                httpPost = new HttpPost(url);
    ​
                //设置参数
                List<NameValuePair> list = new ArrayList<NameValuePair>();
                Iterator iterator = map.entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry<String, String> elem = (Map.Entry<String, String>) iterator.next();
                    list.add(new BasicNameValuePair(elem.getKey(), elem.getValue()));
                }
                if (list.size() > 0) {
                    UrlEncodedFormEntity entity = new UrlEncodedFormEntity(list, "utf-8");
                    httpPost.setEntity(entity);
                }
    ​
                HttpResponse response = httpClient.execute(httpPost);
                if (response != null) {
                    HttpEntity resEntity = response.getEntity();
                    if (resEntity != null) {
                        result = EntityUtils.toString(resEntity, "utf-8");
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
    ​
            }
    ​
            return result;
        }
    ​
    }
    ​
    ​

    热点商品推送 nginx 代码逻辑

    private void pushHotToNginx(Long pid) {
        // 降级策略推送到分发层 nginx
        String distributeNginxURL = "http://eshop-cache03/hot?productId=" + pid;
        HttpClientUtils.sendGetRequest(distributeNginxURL);
    ​
        // 获取商品信息
        String cacheServiceURL = "http://192.168.0.99:6002/getProductInfo?productId=" + pid;
        String response = HttpClientUtils.sendGetRequest(cacheServiceURL);
        try {
            response = URLEncoder.encode(response, "utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        // 推送到应用层 nginx
        String[] appNginxURLs = new String[]{
                "http://eshop-cache01/hot?productId=" + pid + "&productInfo=" + response,
                "http://eshop-cache02/hot?productId=" + pid + "&productInfo=" + response
        };
        for (String appNginxURL : appNginxURLs) {
            HttpClientUtils.sendGetRequest(appNginxURL);
        }
    }
    ​
    public static void main(String[] args) throws UnsupportedEncodingException {
        // 获取商品信息
        String cacheServiceURL = "http://192.168.0.99:6002/getProductInfo?productId=" + 1;
        String response = HttpClientUtils.sendGetRequest(cacheServiceURL);
    ​
        String url = "http://192.168.0.99:6002/test?productId=" + 1 + "&productInfo=" + URLEncoder.encode(response, "UTF-8");
        HttpClientUtils.sendGetRequest(url);
    }

     在流量分发+后端应用双层 nginx 中加入接收热点缓存数据的接口

    分发层接收热点缓存逻辑

    添加 hot 接口映射

    vi /usr/hello/hello.conf

    server {
        listen       80;
        server_name  _;
        location /lua {
          default_type 'text/html';
          content_by_lua_file /usr/hello/lua/hello.lua;
        }
    ​
        location /product {
          default_type 'text/html';
          content_by_lua_file /usr/hello/lua/distribute.lua;
        }
        # 建立接口映射
        location /hot {
          default_type 'text/html';
          content_by_lua_file /usr/hello/lua/hot.lua;
        }                                               
    }
    ​

    编写缓存逻辑 /usr/hello/lua/hot.lua

    local uri_args = ngx.req.get_uri_args()
    local product_id = uri_args["productId"]
    ​
    local cache_ngx = ngx.shared.my_cache
    local hot_product_cache_key = "hot_product_"..product_id
    -- 存入缓存,时间可以设置长一点,1 小时
    cache_ngx:set(hot_product_cache_key,"true",60 * 60)

     

    应用层接收热点缓存逻辑

    应用层都是一样的,也需要先建立接口映射,这里就不贴了

    编写缓存逻辑 /usr/hello/lua/hot.lua

    local uri_args = ngx.req.get_uri_args()
    local product_id = uri_args["productId"]
    local product_info = uri_args["productInfo"]
    ​
    local product_cache_key = "product_info_"..product_id
    ​
    local cache_ngx = ngx.shared.my_cache
    ​
    cache_ngx:set(product_cache_key,product_info,60 * 60)

    ::: tip product_info 发送的时候,编码的,这里不知道不解码是否有问题 :::

     

    在 nginx+lua 中实现热点缓存自动降级为负载均衡流量分发策略的逻辑

    之前思路已经有了:通过 storm 反推过来的

    /usr/hello/lua/distribute.lua

    local uri_args = ngx.req.get_uri_args()
    -- 获取参数
    local productId = uri_args["productId"]
    local shopId = uri_args["shopId"]
    ​
    -- 定义后端应用 ip
    local host = {"192.168.99.170", "192.168.99.171"}
    ​
    local hot_product_key = "hot_product_"..productId
    local cache_ngx = ngx.shared.my_cache
    local hot_product_flag = cache_ngx:get(hot_product_key)
    ​
    local backend = ""
    if hot_product_flag == "true" then
        -- 设置随机数种子
        math.randomseed(tostring(os.time()):reverse():sub(1, 7))
        local index = math.random(1, 2)
        backend = "http://"..host[index]
    else
        -- 对商品 id 取模并计算 hash 值
        local hash = ngx.crc32_long(productId)
        hash = (hash % 2) + 1
        -- 拼接 http 前缀
        backend = "http://"..host[hash]
    end
    ​
    -- 获取到参数中的路径,比如你要访问 /hello,这个例子中是需要传递访问路径的
    local method = uri_args["method"]
    -- 拼接具体的访问地址不带 host,如:/hello?productId=1
    local requestBody = "/"..method.."?productId="..productId.."&shopId="..shopId
    ​
    -- 获取 http 包
    local http = require("resty.http")
    local httpc = http.new()
    ​
    -- 访问,这里有疑问:万一有 cooke 这些脚本支持吗?会很麻烦吗?
    local resp, err = httpc:request_uri(backend, {
        method = "GET",
        path = requestBody,
        keepalive=false
    })
    ​
    -- 如果没有响应则输出一个 err 信息
    if not resp then
        ngx.say("request error :", err)
        return
    end
    ​
    -- 有响应测输出响应信息
    ngx.say(resp.body)  
    ​
    -- 关闭 http 客户端实例
    httpc:close()
    ​

    在 storm 拓扑中加入热点缓存消失的实时自动识别和感知的代码逻辑

    storm 中增加热点消失感知逻辑

    cn.mrcode.cachepdp.eshop.storm.ProductCountBolt.HotProductFindThread#run 中补充该逻辑

    // 4. 热点商品消失,通知 nginx 取消热点缓存
    if (lastTimeHotPids.size() > 0) {
        // 上一次有热点商品
        for (long lastTimeHotPid : lastTimeHotPids) {
            // 但是不在这一次的热点中了,说明热点消失了
            if (!hotPidList.contains(lastTimeHotPid)) {
                // 发送到分发层
                String url = "http://eshop-03/cancel_hot?productId=" + lastTimeHotPid;
                HttpClientUtils.sendGetRequest(url);
            }
        }
    }
    lastTimeHotPids.clear();
    for (Long pid : hotPidList) {
        lastTimeHotPids.add(pid);
    }

    nginx 增加取消热点逻辑

    /usr/hello/hello.conf 中增加接口映射

    location /cancel_hot {
      default_type 'text/html';
      content_by_lua_file /usr/hello/lua/cancel_hot.lua;
    }

    /usr/hello/lua/cancel_hot.lua 逻辑

    local uri_args = ngx.req.get_uri_args()
    local product_id = uri_args["productId"]
    ​
    local cache_ngx = ngx.shared.my_cache
    local hot_product_cache_key = "hot_product_"..product_id
    -- 设置标志,并且过期时间为 60 秒,过期之后条件也不成立
    cache_ngx:set(hot_product_cache_key,"false",60)

    /usr/hello/hello.conf 中声明下 nginx 的缓存变量 my_cache

    lua_shared_dict my_cache 128m;

    将热点缓存自动降级解决方案的代码运行后观察效果以及调试和修复 bug

    热点感知测试

    测试:

    1. nginx 需要 /usr/servers/nginx/sbin/nginx -s reload

    2. 在一些关键处增加日志打印,方便查看调试结果

    3. 后面都在本地运行 storm 了,由于 gradle 打包太麻烦了。 HotProductTopology

    4. 修改应用层 nginx 上的模板 html

      因为需要观察是否被降级为随机路由了,在模板上写上自己本机的 hostanme 即可

    调试日志如下:

    商品 1,次数 1
    Thread-35:[1=1, null, null]
    Thread-36:[null, null, null]
    商品 2,次数 1
    Thread-36:[2=1, null, null]
    Thread-35:[1=1, null, null]
    商品 3,次数 1
    商品 4,次数 1
    51677 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 1 个,平均访问值为 1
    51677 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[]
    51688 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 1 个,平均访问值为 1
    51688 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[]
    Thread-36:[2=1, 4=1, null]
    Thread-35:[1=1, 3=1, null]
    商品 5,次数 1
    56678 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 2 个,平均访问值为 1
    56678 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[]
    56689 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 1 个,平均访问值为 1
    56689 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[]
    商品 6,次数 1
    Thread-36:[2=1, 4=1, 6=1]
    Thread-35:[1=1, 3=1, 5=1]
    商品 6,次数 2
    商品 6,次数 3
    商品 6,次数 4
    商品 6,次数 5
    商品 6,次数 6
    商品 6,次数 7
    商品 6,次数 8
    商品 6,次数 9
    61679 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 2 个,平均访问值为 1
    61679 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[]
    61690 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 2 个,平均访问值为 1
    61690 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 发现一个热点商品:6=9  // 倍数设置的 5 。9 满足条件称为热点了
    商品 6,次数 10
    商品 6,次数 11
    商品 6,次数 12
    商品 6,次数 13
    商品 6,次数 14
    商品 6,次数 15
    商品 6,次数 16
    62209 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[6]
    商品 6,次数 17
    商品 6,次数 18
    Thread-36:[6=18, 2=1, 4=1]
    Thread-35:[1=1, 3=1, 5=1]
    66680 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 2 个,平均访问值为 1
    66681 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[]
    商品 6,次数 19
    商品 6,次数 20
    商品 6,次数 21
    商品 6,次数 22
    商品 6,次数 23
    67211 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 2 个,平均访问值为 1
    67211 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 发现一个热点商品:6=23
    67301 [Thread-38] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[6]

    通过日志观察到该商品,当成为热点的时候,触发了往 nginx 上推送标志, 进行策略降级(这个可以通过)不停的访问 http://eshop-cache03/product?method=product&productId=6&shopId=1 商品 id 为 6 的这个商品,当不是热点商品的时候,只会被路由到指定机器上,当成为热点之后,就会随机路由了

    热点消失感知测试

    怎么测试热点消失呢?看上面的日志有一条很重要的信息

    Thread-36:[6=18, 2=1, 4=1]
    66680 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 2 个,平均访问值为 1

    商品 id 为 6 能被计算为瞬时热点商品是因为,后两个商品平均访问次数为 1,大于 5 倍的阈值, 那么让 商品 id 为 6 的取消热点的方案就出来了:选择商品 id 为 2 的狂刷,把 6 的顶下来

    测试日志如下

    Thread-36:[6=13, 2=2, 4=1]
    85287 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 2 个,平均访问值为 1
    85287 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 发现一个热点商品:6=13
    85364 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[6]
    ...
    Thread-36:[2=25, 6=13, 4=1]
    90364 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 后 95% 商品数量 2 个,平均访问值为 7
    90364 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 热点商品列表:[]
    90364 [Thread-37] INFO  c.m.c.e.s.ProductCountBolt$HotProductFindThread - 一个热点商品消失了:6

    这个时候再访问 id=6 的商品,发现一直被路由到 eshop-eache01 上了

    优化

    根据日志来看,当商品 id=6 被定为热点的时候,如果没有其他热点商品进来,那么按照现在休眠 5 秒的时间, 每 5 秒就会获取一次缓存并推送到 nginx 上。可以针对这一点进行优化

    // 3. 计算热点商品
    for (int i = 0; i < avg95Count; i++) {
       Map.Entry<Long, Long> entry = countList.get(i);
       if (entry.getValue() > avg95Avg * threshold) {
           logger.info("发现一个热点商品:" + entry);
           hotPidList.add(entry.getKey());
           if (!lastTimeHotPids.contains(entry.getKey())) {
               // 如果该商品已经是热点商品了,则不推送,新热点商品才推送
               // 这里根据具体业务要求进行定制
               // 推送热点商品信息到 所有 nginx 上
               pushHotToNginx(entry.getKey());
           }
       }
    }

    小结

    本小结是为了解决:热点商品在路由 hash 策略下,大流量打到同一台机器上扛不住 方案思路如下:

    1. 通过 storm 实时统计访问次数

    2. 热点商品感知:对每个 task 中的访问列表排序,前 5% 的商品与后 95% 商品平均访问值进行阀值比较,达到到具体设定倍数即认为是热点商品

    3. 热点商品消失感知:记录上一次的热点商品,当它跌出前 5% 时,被感知到,通过两次热点列表比较能得到

    4. 感知到热点商品时通知流量分发层 nginx 改变路由策略

      分发到更多的 nginx 上去,同时 storm 需要反推该商品详情到更多的 nginx 上去, 本列是所有 nginx,随机分发策略

    5. 感知到热点消失时,通知流量分发层取消降级策略

      之前推送到 nginx 上的缓存可以不用理睬,因为设置了缓存过期时间。 只需要再流量分发层上取消掉随机分发策略即可

    展开全文
  • 在沙盒游戏我的世界中,村民与掠夺的首个正式版本1.14已经...众所周知,每次版本更新都会为玩家们带来很多的事物,与此同时也带来不足之处,而那些还没有被官方修复的系统漏洞就被玩家们称之为“特性”,只...

    在沙盒游戏我的世界中,村民与掠夺的首个正式版本1.14已经正式上线了,许多玩家早就迫不及待地进入正版开启全新的冒险旅程!只是玩家们发现全新版本1.14存在大量的系统漏洞,比如村民的“分头行动”、材质丢失或者区块丢失等,这也让玩家们的游戏体验感有所降低,你说呢?

    705e2aa7a90e8e01dbfb67453ae72517.png

    众所周知,每次版本更新都会为玩家们带来很多的新事物,与此同时也会带来不足之处,而那些还没有被官方修复的系统漏洞就会被玩家们称之为“特性”,只有等到它们被修复之后才会改口说是系统漏洞!如今,1.14版本也面临着同样的问题,那就是出现诸多问题,其中就包括闪退!

    536716be8d8540998cf3eee8112060d0.png

    不少玩家纷纷表示自己好不容易安装正版的1.14版本,然而它却频频出现闪退问题,玩家们说纯净原版1.14即使在没有开启其他软件和游戏模组的情况下也会让史蒂夫走着走着就突然闪退,偶尔还会回档几分钟,这到底是怎么回事呢?其实,小编“我尚的大世界”觉得有两个解决方法!

    ba358bf54fabb24d9e6615e1e0db88e9.png

    第一个解决方法是调高游戏启动器里面的内存!说句实在话,如果玩家们在1.14版本中放置红石器件的话,那么系统可能会因为卡顿而造成闪退,因此可以试试把内存调到“1024”或者“2048”看看!不过,有的玩家表示调到“512”都卡得动不了,而调到“2048”可能打不开游戏!

    57705a4b6dea215ee93c677e24174778.png

    第二个解决方法是安装64位的Java并且将内存设置为“2048+”!目前来看,1.14版本出现闪退或许是因为错误地安装Java,所以调整为合理的数值应该就没事了!总的来说,闪退的原因多种多样,而玩家们只能具体问题具体分析,使用前面提到的两个方法一般都能解决!你们觉得呢?

    我的世界:你们是否见过会产蛋的鸡骑士呢?四个移除的神奇特性!

    我的世界:1.14版本的刷铁机为何频频出现问题?这个细节别忘了!

    我的世界:国服的质量为何总是受到质疑?这种玩家才是罪魁祸首!

    我的世界:因玩腻原版生存而找不到目标?这几个内容够你玩很久!

    我的世界:玩家发现骷髅射箭的秘密顺序?前三支箭的方向是这样!

    展开全文
  • 这个现象给开发功能带来很大困扰,当代码上线时候,大家去特别留意有没有的error log出现,但是结果却是看到一屏一屏的不相干的垃圾干扰分清业务日志和系统错误日志,采用日志分析工具,日志分类 最好的...

    多人多个组件的服务器端软件,error logs里面满屏的垃圾或者日志文件混乱,有什么好的方式解决该类问题?
     这个现象给开发新功能带来很大困扰,当新代码上线时候,大家会去特别留意有没有新的error log出现,但是结果却是看到一屏一屏的不相干的垃圾干扰
    分清业务日志和系统错误日志,采用日志分析工具,日志分类
    最好的办法还是code review,对于监控,log这都是定位问题最直接的办法,review的时候重点来看看
    1:统一的错误日志处理模块,并且强制开发人员使用 2:错误信息分级,可参考The BSD syslog Protocol
    1. 规范日志格式, 2. 日志分级, 3. 日志分区
     编码规范, 监控日志, 天天给他发短信!
    上线前,清除或降级 调试信息 是用log4j 控制日志的类型
    在一个整体项目中, 规范日志格式和日志分级是需要先行的标准, 这样用tail grep就能比较方便了, 当然格式规范的话辅佐以awk, 那就更能事倍功半了.
    最终方案:
    规范日志格式,日志分级、日志分区
    统一日志的输出口径,采用remoting后台服务监控日志的写入,异步队列 的插入日志  
    存入数据库、文件、nosql,建立可视化的界面查看日志。
    监控日志,某些日志到达一定数量发送监控短信通知。

     

    转载于:https://www.cnblogs.com/visionwang/archive/2012/09/12/2682641.html

    展开全文
  • 双机冷备一不小心出了问题

    千次阅读 2012-12-08 14:19:38
    上线已经一年的系统,昨天突然甲方的项目经理打电话说系统问题了,一个小弟过去查了半天愣是没有找到怎么回事。 好吧!下午我就过去了。系统出现问题很是诡异: 本来我们系统每天都会从ODS系统获取数据文件并...

    上线已经一年的系统,昨天突然甲方的项目经理打电话说系统出问题了,一个小弟过去查了半天愣是没有找到怎么回事。

    好吧!下午我就过去了。系统出现的问题很是诡异:

    本来我们系统每天都会从ODS系统获取数据文件并导入到我们系统中,导入完成后会在某个表中记录导入成功,

    程序运行的情况很是奇怪:表中的数据导入记录是最新的而本地要导入的文件却是很早以前的。

    通过这些现象分析问题:

    问题一:为什么本地文件是很早以前的?

    分析:本地程序肯定没有执行;

    验证:我就将crontab中要自动执行的脚本手动执行了一下发现要执行的shell没有执行权限。

    修改:给shell脚本赋执行权限后OK;

    问题二、为什么数据表中的导入记录怎么是新的?

    分析:数据肯定有其他来源;

    验证:去其他用户下找会不会有相同功能执行的脚本——发现没有,最后看看备机是不是启动的ping了一下果然是通的,然后登录到备机上发现果然有指向当前机器的脚本被运行,修改后OK;

     

    总结一下:问题出在机器切换是没有对Crontab进行整体全面的梳理!

    展开全文
  • 什么我们放弃使用MongoDB

    千次阅读 2011-06-03 06:31:09
    公司开始项目时部分数据服务选择...直到几天前,在考虑系统上线发布和运营时,一些问题出现了。   1.MongoDB存储文件急剧增大,如果使用32位操作系统很容易达到单个文件体积上限。所以对于MongoDB必须使用6...
  • 并通过基于 Node.js 实现的命令行工具将构建结果发布上线。前端开发开始变得规范化、标准化、工程化。 <p><a name="19b496a8"></a></p> 基于 Node.js 的全栈开发 <p>Node.js 对前端的重要意义还有,以往只能...
  • 版本管理的基本含义: 版本管理是为满足不同需求,对同一产品或系统进行局部的改进和改型所产生的产品或系统系列的变更情况进行记录、跟踪、维护...你说简单,直接把5000行代码去掉就行了,但是现在会出现一个问题...
  • 启用环境后前台输入需要检索的关键词,后台总会报“164_571_use_949a8355491898090149189809d11171/0.fnm (no such file or directory)“,而且此问题间歇出现,开始非常纳闷为什么程序在索引文件夹后面莫名其妙...
  •  但无巧不成书,昨天主管请假了,我只能和组内的另一个人上,生怕会出现什么问题,可怕什么来什么。中间还是出事了,造成线上的系统访问时页面直接出来了错误堆栈,我当时都傻眼了,经查是发布的模板与原来的模板...
  • Gh0st源码+去硬盘锁

    热门讨论 2011-12-12 12:16:53
    4:为什么有时会出现重复上线问题 答:服务端发生异常,就会重新连接创建第二个连接,但是第一个连接已经断开了,控制端在其心跳超时的情况下,会删除 心跳超时为1分钟,也就是说,重复上线情况会持续一分钟,自然就没有...
  • 所 实话这个介面还真不多见,但跟这个相似的就是前后端连接超时也会出现一个提示页面,但内容不太一样。这下我就急了,难道被人黑了?因为最近老有朋友说他的 网站被挂马了,所以我一个冲上来的念头就是被人黑了。...
  • 6、为什么打开控制端连接一个IP的80端口? 答:查找升级,找到提示升级,如不想要升级提示,‘设置’下拉在‘升级提示’选项去勾! ----------------------------------------------------------------------...
  • 如果用人工检查的方式,这些bug可能很难才被发现,或许永远也无法发现,直到运行时发作…当除掉了这些典型的(classic) bug后,可以确信的是,我们的系统稳定度将会上一个的台阶。 以目前遇到的状况来看,...
  • 6、为什么打开控制端连接一个IP的80端口? 答:查找升级,找到提示升级,如不想要升级提示,‘设置’下拉在‘升级提示’选项去勾! -----------------------------------------------------------------------...
  • 6、为什么打开控制端连接一个IP的80端口? 答:查找升级,找到提示升级,如不想要升级提示,‘设置’下拉在‘升级提示’选项去勾! -----------------------------------------------------------------------...
  • 2019数据运营思维导图

    2019-03-29 21:34:09
    渠道是否存在刷量 什么渠道/用户启动次数多 日均使用时长 定义 活跃用户每日平均在线时长 解决问题 游戏的参与度怎么样 产品质量把控指标,游戏粘度如何 渠道质量如何 与单次使用时长结合分析留存和流失问题 用户...
  • 数据运营思维导图

    2018-04-26 14:24:22
    运营活动、服务器问题、版本更新(bug、版本用户不接受) app生态 用户成长体系是否健康 用户调研 用户留存分析流程 第一步:分组 按照不同的(时间/渠道/行为等)维度进行用户分组 时间分组 通常用于看...

空空如也

空空如也

1 2 3 4 5 6
收藏数 109
精华内容 43
关键字:

新上线系统会出现什么问题