精华内容
下载资源
问答
  • 埋点统计自定义上报

    千次阅读 2017-07-31 11:39:01
    /**程序切前台,后台 用户登录 页面载入载出 上报*/ - (void)reportActivePageID:(NSString *)pageId Status:(ReportType)status; /**app启动 */ /**app关闭 */ /**TEST */ - (void)test; @end #import "Report...
    #import <Foundation/Foundation.h>
    #import "SingtonDefine.h"
    
    typedef void (^SuccessResponseHandler)(id responseData);
    typedef void (^FailureResponseHandler)(NSError *error);
    
    typedef enum {
        
        /**启动APP */
        ReportTypeStart = 1,
        /**关闭APP */
        ReportTypeClose = 2,
        /**APP从后台切换到前台 */
        ReportTypeToForeground = 3,
        /**APP从前台切换到后台 */
        ReportTypeEnterBackground = 4,
        /**打开页面(新创建或者从后台切换到前台 */
        ReportTypeOpenPage = 5,
        /**关闭页面(释放或者从前台切换到后台) */
        ReportTypeClosePage = 6,
        /**页面从后台切换到前台 */
        ReportTypePageForeground = 7,
        /**页面从前台切换到后台 */
        ReportTypePageBackground = 8,
        /**页面从前台切换到后台 */
        ReportTypeOpenNewPage = 9,
        /**关闭页面(释放) */
        ReportTypeClosePageRelease = 10,
        /**用户触发(点击/触摸) */
        ReportTypeUserTouchOrClick = 11,
        /**系统触发 */
        ReportTypeSystemTrigger = 12
        
    } ReportType;
    
    @interface ReportEngine : NSObject
    
    /**开始 */
    - (void)start;
    /**结束 */
    - (void)stop;
    
    /**创建单例 */
    SHARED_INTERFACE(ReportEngine)
    
    /**捕获程序异常 */
    void UncaughtExceptionHandler(NSException *exception);
    
    /**获取用户基本信息 */
    - (NSString *)getUserBaseInformation;
    
    /**上传错误日志 */
    - (void)setupTrack;
    
    /**上传数据 */
    #pragma mark - Post
    - (void)post:(NSString *)uri params:(NSDictionary *)params success:(SuccessResponseHandler)successHandler failure:(FailureResponseHandler)failureHandler;
    
    /**程序切前台,后台 用户登录 页面载入载出 上报*/
    - (void)reportActivePageID:(NSString *)pageId Status:(ReportType)status;
    
    /**app启动 */
    
    /**app关闭 */
    
    /**TEST */
    - (void)test;
    
    @end
    
    #import "ReportEngine.h"
    #import <UIKit/UIKit.h>
    #include <ifaddrs.h>
    #include <arpa/inet.h>
    #import "AFNetworking.h"
    #import "ServerHostDefines.h"
    
    #define XcodeAppVersion        [[[NSBundle mainBundle] infoDictionary] objectForKey:@"CFBundleShortVersionString"]
    #define kTimeoutTimeinterval   10.0
    
    #define kPageLogRequestServer  RequestURL([EnvironmentManager sharedInstance].environmentModel.ReportRequestServer, @"/monitor/collectPageLog")
    #define kErrorLogRequestServer RequestURL([EnvironmentManager sharedInstance].environmentModel.ReportRequestServer, @"/monitor/collectPageLog")
    
    @interface ReportEngine()
    
    /**用户ID 如果没有获取到用户ID则为0 */
    @property (nonatomic, strong) NSString *userID;
    /**token 缺省:空字符串*/
    @property (nonatomic, copy) NSString *token;
    /**城市ID 缺省:0*/
    @property (nonatomic, assign) NSInteger cityID;
    /**经纬度 格式为:经度,纬度(中间用逗号分隔)*/
    @property (nonatomic, copy) NSString *accuracyandLatitude;
    /**应用ID 用户APPiOS版:200*/
    @property (nonatomic, assign) NSInteger applicationID;
    /**应用版本Code APP项目编译打包时配置的版本Code*/
    @property (nonatomic, assign) NSInteger versionID;
    /**渠道号 缺省:0 APP打包时配置的渠道号数值*/
    @property (nonatomic, assign) NSInteger channelID;
    /**最近升级时间 格式为:yyyyMMdd*/
    @property (nonatomic, assign) NSInteger updateRecentlyTime;
    /**手机品牌 调用系统API获取的手机品牌参数*/
    @property (nonatomic, copy) NSString  *mobileBrand;
    /**手机型号 调用系统API获取的手机型号参数*/
    @property (nonatomic, copy) NSString *mobileModel;
    /**操作系统 iOS:2*/
    @property (nonatomic, assign) NSInteger systermModel;
    /**系统版本Code 调用系统API获取的系统版本整数值*/
    @property (nonatomic, assign) NSInteger systermVersionCode;
    /**DeviceID 设备的唯一ID*/
    @property (nonatomic, copy) NSString *deviceID;
    /**调用系统API获取的网络接入点参数 */
    @property (nonatomic, copy) NSString *mobileAPN;
    
    @end
    
    @implementation ReportEngine
    
    SHARED_IMPLEMENTATION(ReportEngine)
    
    /**开始 */
    - (void)start {
        
        @synchronized (self)
        {
            
            [self runMonitoring];
            // [self threadForMonitoring];
            // [self report];
        }
    }
    
    /**结束 */
    - (void)stop {
        
    //    [[NSUserDefaults standardUserDefaults] setObject:self.reportURLArray forKey:kUserDefaultsReport];
    //    
    //#ifdef DEBUG
    //    [self saveReportLog];
    //#endif
    }
    
    /**获取日志崩溃信息 */
    void UncaughtExceptionHandler(NSException *exception) {
        /**
         *  获取异常崩溃信息
         */
        NSArray *callStack = [exception callStackSymbols];
        NSString *reason = [exception reason];
        NSString *name = [exception name];
        NSString *content = [NSString stringWithFormat:@"========异常错误报告========\nname:%@\nreason:\n%@\ncallStackSymbols:\n%@",name,reason,[callStack componentsJoinedByString:@"\n"]];
        
        // NSLog(@"--------%@---------",content);
        /**
         *  把异常崩溃信息写入文件
         */
        NSUserDefaults *errLog = [NSUserDefaults standardUserDefaults];
        
        // 首先判断之前是否有内容 如果有则插入
        NSString *errLogStr = [errLog objectForKey:@"ErrorLog"];
    
        if ([errLogStr isEqualToString:@""] || errLogStr == nil) {
            [errLog setObject:content forKey:@"ECEJErrorLog"];
            
        } else {
            
            NSMutableString *logStr = [NSMutableString stringWithString:errLogStr];
            [logStr appendString:content];
            [errLog setObject:logStr forKey:@"ECEJErrorLog"];
        }
    }
    
    - (void)runMonitoring {
        
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
            
            CFRunLoopSourceContext context = {0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL};
            
            // 和模式相关的源才会被监视并允许他们传递事件消息,源有 输入源 定时源
            CFRunLoopSourceRef source = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &context);
            
            NSTimer *timer = [NSTimer scheduledTimerWithTimeInterval:90.0
                                                              target:self
                                                            selector:@selector(report)
                                                            userInfo:nil
                                                             repeats:YES];
            
            CFRunLoopAddSource(CFRunLoopGetCurrent(), source, kCFRunLoopDefaultMode);
            
            // 输入源和runloop模式关联
            CFRunLoopAddTimer(CFRunLoopGetCurrent(), (__bridge CFRunLoopTimerRef)timer, kCFRunLoopDefaultMode);
            
            [timer fire];
            
            BOOL runAlways = YES;
            while (runAlways)
            {
                @autoreleasepool
                {
                    CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0e10, true);
                }
            }
            
            [timer invalidate];
            CFRunLoopRemoveTimer(CFRunLoopGetCurrent(), (__bridge CFRunLoopTimerRef)timer, kCFRunLoopDefaultMode);
            CFRunLoopRemoveSource(CFRunLoopGetCurrent(), source, kCFRunLoopDefaultMode);
            CFRelease(source);
        });
    }
    
    #pragma mark - Runloop
    - (NSThread *)threadForMonitoring
    {
        static NSThread* s_thread;
        
        if (!s_thread)
        {
            @synchronized (self)
            {
                if (!s_thread)
                {
                    s_thread = [[NSThread alloc] initWithTarget:self selector:@selector(runMonitoring) object:nil];
                    [s_thread setName:@"LTReportEngine"];
                    [s_thread start];
                }
            }
        }
        return s_thread;
    }
    
    #pragma mark - Post
    - (void)post:(NSString *)uri params:(NSDictionary *)params success:(SuccessResponseHandler)successHandler failure:(FailureResponseHandler)failureHandler
    {
        if (!uri)
        {
            NSLog(@"Error, requestWithCmd cmd is nil");
            return;
        }
        
        // 创建管理类
        AFHTTPSessionManager *manager = [self sharedHTTPSessionManager];
        
        // 设置请求体的Header 头部
        [manager.requestSerializer setValue:[self getUserBaseInformation] forHTTPHeaderField:@"baseMsg"];
        
        // 利用方法请求数据
        // NSLog(@"Request URL:\n%@\n\n%@", uri, [params description]);
        [manager POST:uri parameters:params progress:nil success:^(NSURLSessionDataTask * _Nonnull task, id  _Nullable responseObject) {
            if (successHandler)
            {
                [self handleRequest:task resesponseData:responseObject forSuccessHandler:successHandler];
            }
        } failure:^(NSURLSessionDataTask * _Nullable task, NSError * _Nonnull error) {
            if (error && failureHandler)
            {
                failureHandler(error);
                return;
            }
        }];
    }
    
    #pragma mark - Response Handler
    - (void)handleRequest:(NSURLSessionDataTask *)task resesponseData:(id)responseData forSuccessHandler:(SuccessResponseHandler)successHandler
    {
        // NSLog(@"response for %@", task.originalRequest.URL.absoluteURL);
        
        if (successHandler)
        {
            successHandler(responseData);
        }
    }
    
    #pragma mark - Config
    - (AFHTTPSessionManager *)sharedHTTPSessionManager
    {
        // 创建管理类
        AFHTTPSessionManager *manager = [AFHTTPSessionManager manager];
        
        // 设置超时时间
        [manager.requestSerializer willChangeValueForKey:@"timeoutInterval"];
        manager.requestSerializer.timeoutInterval = kTimeoutTimeinterval;
        [manager.requestSerializer didChangeValueForKey:@"timeoutInterval"];
        
        // 设置二进制数据,数据格式默认json
        manager.responseSerializer = [AFHTTPResponseSerializer serializer];
        
        return manager;
    }
    
    - (void)report {
        
        // 读取文件 每隔90秒上报一次 并且将上次数据清除
        NSError *error;
        NSArray *paths  = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory,NSUserDomainMask,YES);
        NSString *homePath = [paths objectAtIndex:0];
        
        NSString *filePath = [homePath stringByAppendingPathComponent:@"logMsg.txt"];
    
        NSString *contentString = [NSString stringWithContentsOfFile:filePath encoding:NSUTF8StringEncoding error:&error];
        
        if (contentString == nil) {
            return;
        } else {
            
            [self post:kPageLogRequestServer params:@{@"logMsg":contentString} success:^(id responseData) {
                
                    [self removeFile];
                    NSLog(@"YES");
            } failure:^(NSError *error) {
            
                NSLog(@"NO");
            }];
        }
        
    }
    
    /**删除本地文件 */
    - (void)removeFile {
        
        NSArray *paths = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, YES);
        NSString *documentsDirectory = [paths objectAtIndex:0];
        
        NSFileManager *fileMgr = [NSFileManager defaultManager];
        NSString *MapLayerDataPath = [documentsDirectory stringByAppendingPathComponent:@"logMsg.txt"];
        BOOL bRet = [fileMgr fileExistsAtPath:MapLayerDataPath];
        if (bRet) {
            // 删除
            NSError *err;
            [fileMgr removeItemAtPath:MapLayerDataPath error:&err];
        }
    }
    
    /**程序切前台,后台 用户登录 页面载入载出 上报*/
    - (void)reportActivePageID:(NSString *)pageId Status:(ReportType)status {
        
        // 用户行为数据格式如下:
        // 时间戳/PageID或事件ID/事件类型
        
        /**
         *  时间戳: 发生事件时的时间,时间戳格式,单位毫秒 比如:1469529583356
            PageID: 在APP里为每个页面定义一个Page ID,可以是英文、中文,也可以是类名 比如:home、个人中心
            事件ID: 为事件命名一个名称,比如:定位城市、点击首页广告页
            事件类型:
            常量值:
            1:启动APP
            2:关闭APP
            3:APP从后台切换到前台
            4:APP从前台切换到后台
            5:打开页面(新创建或者从后台切换到前台)
            6:关闭页面(释放或者从前台切换到后台)
            7:页面从后台切换到前台
            8:页面从前台切换到后台
            9:打开页面(新创建)
            10:关闭页面(释放)
            11:用户触发(点击/触摸)
            12:系统触发
         */
        
        //时间戳  格式化时间
        NSDate * date = [NSDate date];
        NSDateFormatter * dateFormatter = [[NSDateFormatter alloc] init];
        dateFormatter.dateFormat = @"yyyyMMddhhmmssSSS";
        NSString * dateStr = [dateFormatter stringFromDate:date];
        
        NSString *writeToFileString = [NSString stringWithFormat:@"%@/%@/%u",dateStr,pageId,status];
        /**将事件以条目形式写入文件 */
        [self writefile:writeToFileString];
    }
    
    /**获取用户基础数据 */
    - (NSString *)getUserBaseInformation {
        
        NSMutableString *getUserBaseStr = [[NSMutableString alloc] init];
        // 用户ID
        self.userID = PublicData.sharedInstance.loginUser.phone ? : @"0";
        [getUserBaseStr appendFormat:@"%@/",self.userID];
        // token
        self.token = [PublicData sharedInstance].loginUser.token;
        [getUserBaseStr appendFormat:@"%@/",self.self.token];
        // 城市ID
        self.cityID = PublicData.sharedInstance.cityModel.cityId.integerValue?:0;
        [getUserBaseStr appendFormat:@"%ld/",(long)self.self.cityID];
        // 经纬度
        self.accuracyandLatitude = [NSString stringWithFormat:@"%@,%@",PublicData.sharedInstance.cityModel.latitude,PublicData.sharedInstance.cityModel.longitude];
        [getUserBaseStr appendFormat:@"%@/",self.self.accuracyandLatitude];
        // 应用ID
        self.applicationID = 200;
        [getUserBaseStr appendFormat:@"%ld/",(long)self.applicationID];
        // 应用版本Code
        self.versionID = ((NSString *)XcodeAppVersion).integerValue;
        [getUserBaseStr appendFormat:@"%ld/",(long)self.versionID];
        // 渠道号
        self.channelID = 0;
        [getUserBaseStr appendFormat:@"%ld/",(long)self.channelID];
        // 最近升级时间
        self.updateRecentlyTime = 0;
        [getUserBaseStr appendFormat:@"%ld/",(long)self.updateRecentlyTime
         ];
        // 手机品牌 手机厂商
        self.mobileBrand = @"";
        [getUserBaseStr appendFormat:@"%@/",self.mobileBrand];
        // 手机型号
        self.mobileModel =  [[UIDevice currentDevice] model];
        [getUserBaseStr appendFormat:@"%@/",self.mobileModel];
        // 操作系统
        self.systermModel = 2;
        [getUserBaseStr appendFormat:@"%ld/",(long)self.systermModel];
        // 系统版本
        self.systermVersionCode = [[UIDevice currentDevice] systemVersion].integerValue;
        [getUserBaseStr appendFormat:@"%ld/",(long)self.systermVersionCode];
        self.deviceID = [[UIDevice currentDevice] identifierForVendor].stringObject;
        [getUserBaseStr appendFormat:@"%@/",self.deviceID];
        self.mobileAPN = [self getNetWorkStates];// [self deviceIPAdress];
        [getUserBaseStr appendFormat:@"%@/",self.mobileAPN];
        
        return getUserBaseStr;
    }
    
    /**上传错误日志 */
    - (void)setupECEJTrack {
        
        NSUserDefaults *errLog = [NSUserDefaults standardUserDefaults];
        NSString *errLogStr = [errLog objectForKey:@"errorLog"];
        
        if ([errLogStr isEqualToString:@""] || errLogStr == nil) {
            return;
        } else {
        
            [self post:kErrorLogRequestServer params:@{@"logMsg":errLogStr?:@""} success:^(id responseData) {
           
                NSLog(@"YES");
                [errLog setObject:@"" forKey:@"ECEJErrorLog"];
            } failure:^(NSError *error) {
            
                NSLog(@"NO");
            }];
        }
    }
    
    /**将用户触发事件插入文件 */
    - (void)writefile:(NSString *)string {
        
        NSArray *paths  = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory,NSUserDomainMask,YES);
        NSString *homePath = [paths objectAtIndex:0];
        
        NSString *filePath = [homePath stringByAppendingPathComponent:@"logMsg.txt"];
        
        NSFileManager *fileManager = [NSFileManager defaultManager];
        
        // 如果不存在
        if(![fileManager fileExistsAtPath:filePath]) {
            
         [@"" writeToFile:filePath atomically:YES encoding:NSUTF8StringEncoding error:nil];
            
        }
        
        NSFileHandle *fileHandle = [NSFileHandle fileHandleForUpdatingAtPath:filePath];
        
        // 将节点跳到文件的末尾
        [fileHandle seekToEndOfFile];
        // NSDateFormatter *dateFormatter = [[NSDateFormatter alloc] init];
        // [dateFormatter setDateFormat:@"yyyy-MM-dd HH:mm:ss"];
        //  NSString *datestr = [dateFormatter stringFromDate:[NSDate date]];
        
        NSString *str = [NSString stringWithFormat:@"%@\n",string];
        
        NSData* stringData  = [str dataUsingEncoding:NSUTF8StringEncoding];
        
        // 追加写入数据
        [fileHandle writeData:stringData];
        
        [fileHandle closeFile];
    }
    
    
    /**获取IP地址 */
    - (NSString *)deviceIPAdress {
        
        NSString *address = @"an error occurred when obtaining ip address";
        struct ifaddrs *interfaces = NULL;
        struct ifaddrs *temp_addr = NULL;
        int success = 0;
        
        success = getifaddrs(&interfaces);
        
        if (success == 0) { // 0 表示获取成功
            
            temp_addr = interfaces;
            while (temp_addr != NULL) {
                if( temp_addr->ifa_addr->sa_family == AF_INET) {
                    // Check if interface is en0 which is the wifi connection on the iPhone
                    if ([[NSString stringWithUTF8String:temp_addr->ifa_name] isEqualToString:@"en0"]) {
                        // Get NSString from C String
                        address = [NSString stringWithUTF8String:inet_ntoa(((struct sockaddr_in *)temp_addr->ifa_addr)->sin_addr)];
                    }
                }
                
                temp_addr = temp_addr->ifa_next;
            }
        }
        freeifaddrs(interfaces);
        // NSLog(@"手机的IP是:%@", address);
        return address;
    }
    
    /**获取当前网络设备连接网络状态 */
    - (NSString *)getNetWorkStates{
       
        UIApplication *app = [UIApplication sharedApplication];
        NSArray *children = [[[app valueForKeyPath:@"statusBar"]valueForKeyPath:@"foregroundView"]subviews];
        NSString *state = [[NSString alloc]init];
        int netType = 0;
        //获取到网络返回码
        for (id child in children) {
            if ([child isKindOfClass:NSClassFromString(@"UIStatusBarDataNetworkItemView")]) {
                //获取到状态栏
                netType = [[child valueForKeyPath:@"dataNetworkType"]intValue];
                
                switch (netType) {
                    case 0:
                        state = @"无网络";
                        //无网模式
                        break;
                    case 1:
                        state =  @"2G";
                        break;
                    case 2:
                        state =  @"3G";
                        break;
                    case 3:
                        state =   @"4G";
                        break;
                    case 5:
                    {
                        state =  @"wifi";
                        break;
                    default:
                        break;
                    }
                }
            }
            //根据状态选择
        }
        return state;
    }
    
    /**TEST */
    - (void)test {
        
    //    // 日志保存 设置成功 每一次启动将上次的崩溃日志发送
    //    NSUserDefaults *errLog = [NSUserDefaults standardUserDefaults];
    //    NSLog(@"%@", [errLog objectForKey:@"errorLog"]);
        
    }
    
    @end
    


    展开全文
  • 11 kafka 用户日志上报

    千次阅读 2018-05-01 19:55:10
    11 kafka 用户日志上报更多干货分布式实战(干货)spring cloud 实战(干货)mybatis 实战(干货)spring boot 实战(干货)React 入门实战(干货)构建中小型互联网企业架构(干货)python 学习持续更新Elastic...

    11 kafka 用户日志上报

    更多干货

    一、概述

    对网站产生的用户访问日志进行处理并分析出该网站在某天的PV、UV等数据,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作:

    • 1.如何一步步构建我们的实时处理系统(Flume+Kafka+Storm+Redis)
    • 2.实时处理网站的用户访问日志,并统计出该网站的PV、UV
    • 3.将实时分析出的PV、UV动态地展示在我们的前面页面上

    实时处理系统架构

    image

    即从上面的架构中我们可以看出,其由下面的几部分构成:

    • Flume集群
    • Kafka集群
    • Storm集群

    从构建实时处理系统的角度出发,我们需要做的是,如何让数据在各个不同的集群系统之间打通(从上面的图示中也能很好地说明这一点),即需要做各个系统之前的整合,包括Flume与Kafka的整合,Kafka与Storm的整合。当然,各个环境是否使用集群,依个人的实际需要而定,在我们的环境中,Flume、Kafka、Storm都使用集群。

    kafka概述

    image

    整体流程预览如下图所示

    image

    数据源生产

    image

    数据源消费 image

    二、Flume+Kafka整合

    image

    整合思路

    对于Flume而言,关键在于如何采集数据,并且将其发送到Kafka上,并且由于我们这里了使用Flume集群的方式,Flume集群的配置也是十分关键的。而对于Kafka,关键就是如何接收来自Flume的数据。从整体上讲,逻辑应该是比较简单的,即可以在Kafka中创建一个用于我们实时处理系统的topic,然后Flume将其采集到的数据发送到该topic上即可。

    整合过程:Flume集群配置与Kafka Topic创建

    Flume集群配置

    image

    在我们的场景中,两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,然后其数据的下沉方式都为发送到另外一个Flume Agent上,所以这里我们需要配置三个Flume Agent.

    1、Flume Agent01

    该Flume Agent部署在一台Web服务器上,用来采集产生的Web日志,然后发送到Flume Consolidation Agent上,创建一个新的配置文件flume-sink-avro.conf,其配置内容如下:

    #########################################################
    ##
    ##主要作用是监听文件中的新增数据,采集到数据之后,输出到avro
    ##    注意:Flume agent的运行,主要就是配置source channel sink
    ##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
    #########################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #对于source的配置描述 监听文件中的新增数据 exec
    a1.sources.r1.type = exec
    a1.sources.r1.command  = tail -F /home/uplooking/data/data-clean/data-access.log
    
    #对于sink的配置描述 使用avro日志做数据的消费
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = uplooking03
    a1.sinks.k1.port = 44444
    
    #对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
    a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
    
    #通过channel c1将source r1和sink k1关联起来
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    配置完成后, 启动Flume Agent,即可对日志文件进行监听:

    $ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &
    

    2、 Flume Agent02

    该Flume Agent部署在一台Web服务器上,用来采集产生的Web日志,然后发送到Flume Consolidation Agent上,创建一个新的配置文件flume-sink-avro.conf,其配置内容如下:

    #########################################################
    ##
    ##主要作用是监听文件中的新增数据,采集到数据之后,输出到avro
    ##    注意:Flume agent的运行,主要就是配置source channel sink
    ##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
    #########################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #对于source的配置描述 监听文件中的新增数据 exec
    a1.sources.r1.type = exec
    a1.sources.r1.command  = tail -F /home/uplooking/data/data-clean/data-access.log
    
    #对于sink的配置描述 使用avro日志做数据的消费
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = uplooking03
    a1.sinks.k1.port = 44444
    
    #对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
    a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
    
    #通过channel c1将source r1和sink k1关联起来
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    配置完成后, 启动Flume Agent,即可对日志文件进行监听:

    $ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &
    

    3、 Flume Consolidation Agent

    image

    该Flume Agent用于接收其它两个Agent发送过来的数据,然后将其发送到Kafka上,创建一个新的配置文件flume-source_avro-sink_kafka.conf,配置内容如下:

    #########################################################
    ##
    ##主要作用是监听目录中的新增文件,采集到数据之后,输出到kafka
    ##    注意:Flume agent的运行,主要就是配置source channel sink
    ##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
    #########################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #对于source的配置描述 监听avro
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 44444
    
    #对于sink的配置描述 使用kafka做数据的消费
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = f-k-s
    a1.sinks.k1.brokerList = uplooking01:9092,uplooking02:9092,uplooking03:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    
    #对于channel的配置描述 使用内存缓冲区域做数据的临时缓存
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    #通过channel c1将source r1和sink k1关联起来
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1   
    

    配置完成后, 启动Flume Agent,即可对avro的数据进行监听:

    $ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-source_avro-sink_kafka.conf >/dev/null 2>&1 &
    

    4、 Kafka配置

    在我们的Kafka中,先创建一个topic,用于后面接收Flume采集过来的数据

    kafka-topics.sh --create --topic f-k-s  --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3
    

    5、整合验证

    启动Kafka的消费脚本

    $ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
    

    如果在Web服务器上有新增的日志数据,就会被我们的Flume程序监听到,并且最终会传输到到Kafka的f-k-stopic中,这里作为验证,我们上面启动的是一个kafka终端消费的脚本,这时会在终端中看到数据的输出:

    $ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
    1003    221.8.9.6 80    0f57c8f5-13e2-428d-ab39-9e87f6e85417    10709   0       GET /index HTTP/1.1     null    null      Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1  1523107496164
    1002    220.194.55.244  fb953d87-d166-4cb4-8a64-de7ddde9054c    10201   0       GET /check/detail HTTP/1.1      null      null    Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko    1523107497165
    1003    211.167.248.22  9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48    10022   1       GET /user/add HTTP/1.1  null    null      Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0)       1523107496664
    1002    61.172.249.96   null    10608   0       POST /updateById?id=21 HTTP/1.1 null    null    Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko      1523107498166
    1000    202.98.11.101   aa7f62b3-a6a1-44ef-81f5-5e71b5c61368    20202   0       GET /getDataById HTTP/1.0       404       /check/init     Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666
    

    这样的话,我们的整合就没有问题,当然kafka中的数据应该是由我们的storm来进行消费的,这里只是作为整合的一个测试,下面就会来做kafka+storm的整合。

    三、Kafka+Storm整合

    1、整合思路

    在这次的大数据实时处理系统的构建中,Kafka相当于是作为消息队列(或者说是消息中间件)的角色,其产生的消息需要有消费者去消费,所以Kafka与Storm的整合,关键在于我们的Storm如何去消费Kafka消息topic中的消息(kafka消息topic中的消息正是由Flume采集而来,现在我们需要在Storm中对其进行消费)。

    在Storm中,topology是非常关键的概念。

    对比MapReduce,在MapReduce中,我们提交的作业称为一个job,在一个Job中,又包含若干个Mapper和Reducer,正是在Mapper和Reducer中有我们对数据的处理逻辑:

    image

    在Storm中,我们提交的一个作业称为topology,其又包含了spout和bolt,在Storm中,对数据的处理逻辑正是在spout和bolt中体现:

    image

    即在spout中,正是我们数据的来源,又因为其实时的特性,所以可以把它比作一个“水龙头”,表示其源源不断地产生数据:

    image

    所以,问题的关键是spout如何去获取来自kafka的数据?

    好在,storm-kafka的整合库中提供了这样的API来供我们进行操作。

    2、整合过程:KafkaSpout的应用

    在代码的逻辑中只需要创建一个由storm-kafkaAPI提供的KafkaSpout对象即可

    SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
    return new KafkaSpout(spoutConf);
    

    下面给出完整的整合代码

    package cn.xpleaf.bigdata.storm.statics;
    
    import kafka.api.OffsetRequest;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;
    
    /**
     * Kafka和storm的整合,用于统计实时流量对应的pv和uv
     */
    public class KafkaStormTopology {
    
        //    static class MyKafkaBolt extends BaseRichBolt {
        static class MyKafkaBolt extends BaseBasicBolt {
    
            /**
             * kafkaSpout发送的字段名为bytes
             */
            @Override
            public void execute(Tuple input, BasicOutputCollector collector) {
                byte[] binary = input.getBinary(0); // 跨jvm传输数据,接收到的是字节数据
    //            byte[] bytes = input.getBinaryByField("bytes");   // 这种方式也行
                String line = new String(binary);
                System.out.println(line);
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
            }
        }
    
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            /**
             * 设置spout和bolt的dag(有向无环图)
             */
            KafkaSpout kafkaSpout = createKafkaSpout();
            builder.setSpout("id_kafka_spout", kafkaSpout);
            builder.setBolt("id_kafka_bolt", new MyKafkaBolt())
                    .shuffleGrouping("id_kafka_spout"); // 通过不同的数据流转方式,来指定数据的上游组件
            // 使用builder构建topology
            StormTopology topology = builder.createTopology();
            String topologyName = KafkaStormTopology.class.getSimpleName();  // 拓扑的名称
            Config config = new Config();   // Config()对象继承自HashMap,但本身封装了一些基本的配置
    
            // 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter
            if (args == null || args.length < 1) {  // 没有参数时使用本地模式,有参数时使用集群模式
                LocalCluster localCluster = new LocalCluster(); // 本地开发模式,创建的对象为LocalCluster
                localCluster.submitTopology(topologyName, config, topology);
            } else {
                StormSubmitter.submitTopology(topologyName, config, topology);
            }
        }
    
        /**
         * BrokerHosts hosts  kafka集群列表
         * String topic       要消费的topic主题
         * String zkRoot      kafka在zk中的目录(会在该节点目录下记录读取kafka消息的偏移量)
         * String id          当前操作的标识id
         */
        private static KafkaSpout createKafkaSpout() {
            String brokerZkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
            BrokerHosts hosts = new ZkHosts(brokerZkStr);   // 通过zookeeper中的/brokers即可找到kafka的地址
            String topic = "f-k-s";
            String zkRoot = "/" + topic;
            String id = "consumer-id";
            SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
            // 本地环境设置之后,也可以在zk中建立/f-k-s节点,在集群环境中,不用配置也可以在zk中建立/f-k-s节点
            //spoutConf.zkServers = Arrays.asList(new String[]{"uplooking01", "uplooking02", "uplooking03"});
            //spoutConf.zkPort = 2181;
            spoutConf.startOffsetTime = OffsetRequest.LatestTime(); // 设置之后,刚启动时就不会把之前的消费也进行读取,会从最新的偏移量开始读取
            return new KafkaSpout(spoutConf);
        }
    }
    

    其实代码的逻辑非常简单,我们只创建了 一个由storm-kafka提供的KafkaSpout对象和一个包含我们处理逻辑的MyKafkaBolt对象,MyKafkaBolt的逻辑也很简单,就是把kafka的消息打印到控制台上。

    需要注意的是,后面我们分析网站PV、UV的工作,正是在上面这部分简单的代码中完成的,所以其是非常重要的基础。

    3、整合验证

    上面的整合代码,可以在本地环境中运行,也可以将其打包成jar包上传到我们的Storm集群中并提交业务来运行。如果Web服务器能够产生日志,并且前面Flume+Kafka的整合也没有问题的话,将会有下面的效果。

    如果是在本地环境中运行上面的代码,那么可以在控制台中看到日志数据的输出:

    ......
    45016548 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Refreshing partition manager connections
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{topic=f-k-s, partitionMap={0=uplooking02:9092, 1=uplooking03:9092, 2=uplooking01:9092}}
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.KafkaUtils - Task [1/1] assigned [Partition{host=uplooking02:9092, topic=f-k-s, partition=0}, Partition{host=uplooking03:9092, topic=f-k-s, partition=1}, Partition{host=uplooking01:9092, topic=f-k-s, partition=2}]
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Deleted partition managers: []
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] New partition managers: []
    45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing
    1003    221.8.9.6 80    0f57c8f5-13e2-428d-ab39-9e87f6e85417    10709   0   GET /index HTTP/1.1 null    null    Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1  1523107496164
    1000    202.98.11.101   aa7f62b3-a6a1-44ef-81f5-5e71b5c61368    20202   0   GET /getDataById HTTP/1.0   404 /check/init Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666
    1002    220.194.55.244  fb953d87-d166-4cb4-8a64-de7ddde9054c    10201   0   GET /check/detail HTTP/1.1  null    null    Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko    1523107497165
    1003    211.167.248.22  9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48    10022   1   GET /user/add HTTP/1.1  null    null    Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0)   1523107496664
    1002    61.172.249.96   null    10608   0   POST /updateById?id=21 HTTP/1.1 null    null    Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko    1523107498166
    ......
    

    如果是在Storm集群中提交的作业运行,那么也可以在Storm的日志中看到Web服务器产生的日志数据

    image

    这样的话就完成了Kafka+Storm的整合

    四、Storm+Redis整合

    image

    1、整合思路

    其实所谓Storm和Redis的整合,指的是在我们的实时处理系统中的数据的落地方式,即在Storm中包含了我们处理数据的逻辑,而数据处理完毕后,产生的数据处理结果该保存到什么地方呢?显然就有很多种方式了,关系型数据库、NoSQL、HDFS、HBase等,这应该取决于具体的业务和数据量,在这里,我们使用Redis来进行最后分析数据的存储。

    所以实际上做这一步的整合,其实就是开始写我们的业务处理代码了,因为通过前面Flume-Kafka-Storm的整合,已经打通了整个数据的流通路径,接下来关键要做的是,在Storm中,如何处理我们的数据并保存到Redis中。

    而在Storm中,spout已经不需要我们来写了(由storm-kafka的API提供了KafkaSpout对象),所以问题就变成,如何根据业务编写分析处理数据的bolt。

    整合过程:编写Storm业务处理Bolt

    日志分析

    我们实时获取的日志格式如下:

    1002    202.103.24.68   1976dc2e-f03a-44f0-892f-086d85105f7e    14549   1       GET /top HTTP/1.1       200     /tologin  Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13 1523806916373
    1000    221.8.9.6 80    542ccf0a-9b14-49a0-93cd-891d87ddabf3    12472   1       GET /index HTTP/1.1     500     /top      Mozilla/4.0 (compatible; MSIE 5.0; WindowsNT)   1523806916874
    1003    211.167.248.22  0e4c1875-116c-400e-a4f8-47a46ad04a42    12536   0       GET /tologin HTTP/1.1   200     /stat     Mozilla/5.0 (Windows; U; Windows NT 5.2) AppleWebKit/525.13 (KHTML,like Gecko) Chrome/0.2.149.27 Safari/525.13    1523806917375
    1000    219.147.198.230 07eebc1a-740b-4dac-b53f-bb242a45c901    11847   1       GET /userList HTTP/1.1  200     /top      Mozilla/4.0 (compatible; MSIE 6.0; Windows NT5.1)       1523806917876
    1001    222.172.200.68  4fb35ced-5b30-483b-9874-1d5917286675    13550   1       GET /getDataById HTTP/1.0       504       /tologin        Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13   1523806918377
    

    其中需要说明的是第二个字段和第三个字段,因为它对我们统计pv和uv非常有帮助,它们分别是ip字段和mid字段,说明如下:

    ip:用户的IP地址
    mid:唯一的id,此id第一次会种在浏览器的cookie里。如果存在则不再种。作为浏览器唯一标示。移动端或者pad直接取机器码。
    

    因此,根据IP地址,我们可以通过查询得到其所在的省份,并且创建一个属于该省份的变量,用于记录pv数,每来一条属于该省份的日志记录,则该省份的pv就加1,以此来完成pv的统计。

    而对于mid,我们则可以创建属于该省的一个set集合,每来一条属于该省份的日志记录,则可以将该mid添加到set集合中,因为set集合存放的是不重复的数据,这样就可以帮我们自动过滤掉重复的mid,根据set集合的大小,就可以统计出uv。

    在我们storm的业务处理代码中,我们需要编写两个bolt:

    • 第一个bolt用来对数据进行预处理,也就是提取我们需要的ip和mid,并且根据IP查询得到省份信息;
    • 第二个bolt用来统计pv、uv,并定时将pv、uv数据写入到Redis中;

    当然上面只是说明了整体的思路,实际上还有很多需要注意的细节问题和技巧问题,这都在我们的代码中进行体现,我在后面写的代码中都加了非常详细的注释进行说明。

    2、编写第一个Bolt:ConvertIPBolt

    根据上面的分析,编写用于数据预处理的bolt,代码如下:

    package cn.xpleaf.bigdata.storm.statistic;
    
    import cn.xpleaf.bigdata.storm.utils.JedisUtil;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import redis.clients.jedis.Jedis;
    
    /**
     * 日志数据预处理Bolt,实现功能:
     *     1.提取实现业务需求所需要的信息:ip地址、客户端唯一标识mid
     *     2.查询IP地址所属地,并发送到下一个Bolt
     */
    public class ConvertIPBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            byte[] binary = input.getBinary(0);
            String line = new String(binary);
            String[] fields = line.split("\t");
    
            if(fields == null || fields.length < 10) {
                return;
            }
    
            // 获取ip和mid
            String ip = fields[1];
            String mid = fields[2];
    
            // 根据ip获取其所属地(省份)
            String province = null;
            if (ip != null) {
                Jedis jedis = JedisUtil.getJedis();
                province = jedis.hget("ip_info_en", ip);
                // 需要释放jedis的资源,否则会报can not get resource from the pool
                JedisUtil.returnJedis(jedis);
            }
    
            // 发送数据到下一个bolt,只发送实现业务功能需要的province和mid
            collector.emit(new Values(province, mid));
    
        }
    
        /**
         * 定义了发送到下一个bolt的数据包含两个域:province和mid
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("province", "mid"));
        }
    }
    

    3、编写第二个Bolt:StatisticBolt

    这个bolt包含我们统计网站pv、uv的代码逻辑,因此非常重要,其代码如下

    package cn.xpleaf.bigdata.storm.statistic;
    
    import cn.xpleaf.bigdata.storm.utils.JedisUtil;
    import org.apache.storm.Config;
    import org.apache.storm.Constants;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;
    import redis.clients.jedis.Jedis;
    
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    /**
     * 日志数据统计Bolt,实现功能:
     * 1.统计各省份的PV、UV
     * 2.以天为单位,将省份对应的PV、UV信息写入Redis
     */
    public class StatisticBolt extends BaseBasicBolt {
    
        Map<String, Integer> pvMap = new HashMap<>();
        Map<String, HashSet<String>> midsMap = null;
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) {  // 如果收到非系统级别的tuple,统计信息到局部变量mids
                String province = input.getStringByField("province");
                String mid = input.getStringByField("mid");
                pvMap.put(province, pvMap.get(province) + 1);   // pv+1
                if(mid != null) {
                    midsMap.get(province).add(mid); // 将mid添加到该省份所对应的set中
                }
            } else {    // 如果收到系统级别的tuple,则将数据更新到Redis中,释放JVM堆内存空间
                /*
                 * 以 广东 为例,其在Redis中保存的数据格式如下:
                 * guangdong_pv(Redis数据结构为hash)
                 *         --20180415
                 *              --pv数
                 *         --20180416
                 *              --pv数
                 * guangdong_mids_20180415(Redis数据结构为set)
                 *         --mid
                 *         --mid
                 *         --mid
                 *         ......
                 * guangdong_mids_20180415(Redis数据结构为set)
                 *         --mid
                 *         --mid
                 *         --mid
                 *         ......
                 */
                Jedis jedis = JedisUtil.getJedis();
                String dateStr = sdf.format(new Date());
                // 更新pvMap数据到Redis中
                String pvKey = null;
                for(String province : pvMap.keySet()) {
                    int currentPv = pvMap.get(province);
                    if(currentPv > 0) { // 当前map中的pv大于0才更新,否则没有意义
                        pvKey = province + "_pv";
                        String oldPvStr = jedis.hget(pvKey, dateStr);
                        if(oldPvStr == null) {
                            oldPvStr = "0";
                        }
                        Long oldPv = Long.valueOf(oldPvStr);
                        jedis.hset(pvKey, dateStr, oldPv + currentPv + "");
                        pvMap.replace(province, 0); // 将该省的pv重新设置为0
                    }
                }
                // 更新midsMap到Redis中
                String midsKey = null;
                HashSet<String> midsSet = null;
                for(String province: midsMap.keySet()) {
                    midsSet = midsMap.get(province);
                    if(midsSet.size() > 0) {  // 当前省份的set的大小大于0才更新到,否则没有意义
                        midsKey = province + "_mids_" + dateStr;
                        jedis.sadd(midsKey, midsSet.toArray(new String[midsSet.size()]));
                        midsSet.clear();
                    }
                }
                // 释放jedis资源
                JedisUtil.returnJedis(jedis);
                System.out.println(System.currentTimeMillis() + "------->写入数据到Redis");
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
        }
    
        /**
         * 设置定时任务,只对当前bolt有效,系统会定时向StatisticBolt发送一个系统级别的tuple
         */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            Map<String, Object> config = new HashMap<>();
            config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return config;
        }
    
        /**
         * 初始化各个省份的pv和mids信息(用来临时存储统计pv和uv需要的数据)
         */
        public StatisticBolt() {
            pvMap = new HashMap<>();
            midsMap = new HashMap<String, HashSet<String>>();
            String[] provinceArray = {"shanxi", "jilin", "hunan", "hainan", "xinjiang", "hubei", "zhejiang", "tianjin", "shanghai",
                    "anhui", "guizhou", "fujian", "jiangsu", "heilongjiang", "aomen", "beijing", "shaanxi", "chongqing",
                    "jiangxi", "guangxi", "gansu", "guangdong", "yunnan", "sicuan", "qinghai", "xianggang", "taiwan",
                    "neimenggu", "henan", "shandong", "shanghai", "hebei", "liaoning", "xizang"};
            for(String province : provinceArray) {
                pvMap.put(province, 0);
                midsMap.put(province, new HashSet());
            }
        }
    }
    

    4、编写Topology

    我们需要编写一个topology用来组织前面编写的Bolt,代码如下:

    package cn.xpleaf.bigdata.storm.statistic;
    
    import kafka.api.OffsetRequest;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * 构建topology
     */
    public class StatisticTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            /**
             * 设置spout和bolt的dag(有向无环图)
             */
            KafkaSpout kafkaSpout = createKafkaSpout();
            builder.setSpout("id_kafka_spout", kafkaSpout);
            builder.setBolt("id_convertIp_bolt", new ConvertIPBolt()).shuffleGrouping("id_kafka_spout"); // 通过不同的数据流转方式,来指定数据的上游组件
            builder.setBolt("id_statistic_bolt", new StatisticBolt()).shuffleGrouping("id_convertIp_bolt"); // 通过不同的数据流转方式,来指定数据的上游组件
            // 使用builder构建topology
            StormTopology topology = builder.createTopology();
            String topologyName = KafkaStormTopology.class.getSimpleName();  // 拓扑的名称
            Config config = new Config();   // Config()对象继承自HashMap,但本身封装了一些基本的配置
    
            // 启动topology,本地启动使用LocalCluster,集群启动使用StormSubmitter
            if (args == null || args.length < 1) {  // 没有参数时使用本地模式,有参数时使用集群模式
                LocalCluster localCluster = new LocalCluster(); // 本地开发模式,创建的对象为LocalCluster
                localCluster.submitTopology(topologyName, config, topology);
            } else {
                StormSubmitter.submitTopology(topologyName, config, topology);
            }
        }
    
        /**
         * BrokerHosts hosts  kafka集群列表
         * String topic       要消费的topic主题
         * String zkRoot      kafka在zk中的目录(会在该节点目录下记录读取kafka消息的偏移量)
         * String id          当前操作的标识id
         */
        private static KafkaSpout createKafkaSpout() {
            String brokerZkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
            BrokerHosts hosts = new ZkHosts(brokerZkStr);   // 通过zookeeper中的/brokers即可找到kafka的地址
            String topic = "f-k-s";
            String zkRoot = "/" + topic;
            String id = "consumer-id";
            SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
            // 本地环境设置之后,也可以在zk中建立/f-k-s节点,在集群环境中,不用配置也可以在zk中建立/f-k-s节点
            //spoutConf.zkServers = Arrays.asList(new String[]{"uplooking01", "uplooking02", "uplooking03"});
            //spoutConf.zkPort = 2181;
            spoutConf.startOffsetTime = OffsetRequest.LatestTime(); // 设置之后,刚启动时就不会把之前的消费也进行读取,会从最新的偏移量开始读取
            return new KafkaSpout(spoutConf);
        }
    }
    

    5、整合验证

    将上面的程序打包成jar包,并上传到我们的集群提交业务后,如果前面的整合没有问题,并且Web服务也有Web日志产生,那么一段时间后,我们就可以在Redis数据库中看到数据的最终处理结果,即各个省份的uv和pv信息:

    image

    需要说明的是mid信息是一个set集合,只要求出该set集合的大小,也就可以求出uv值。

    至此,准确来说,我们的统计pv、uv的大数据实时处理系统是构建完成了,处理的数据结果的用途根据不同的业务需求而不同,但是对于网站的pv、uv数据来说,是非常适合用作可视化处理的,即用网页动态将数据展示出来,我们下一步正是要构建一个简单的Web应用将pv、uv数据动态展示出来。

    五、总结

    那么至此,从整个大数据实时处理系统的构建到最后的数据可视化处理工作,我们都已经完成了,可以看到整个过程下来涉及到的知识层面还是比较多的,不过我个人觉得,只要把核心的原理牢牢掌握了,对于大部分情况而言,环境的搭建以及基于业务的开发都能够很好地解决。

    写此文,一来是对自己实践中的一些总结,二来也是希望把一些比较不错的项目案例分享给大家,总之希望能够对大家有所帮助。

    例子二

    在用户上报日志中,每条日志记录代表用户的一次活动状态,示例数据如下:

    121.40.174.237 yx12345 [21/July/2015 13:25:45 +0000] chrome 
    appid_5 "http://www.***.cn/sort/channel/2085.html"
    
    

    image

    消费数据源统计的KPI指标,如下图所示

    image

    项目详细设计流程

    image

    配置数据消费模块

    image

    数据持久化

    image

    应用打包部署

    image

    提交 Topology 到 Storm 集群

    image


    https://github.com/csy512889371/learndemo/tree/master/kafka-storm-log


    展开全文
  • Timing window:定时窗口大小 Timing info mode: 周期上报还是事件上报 Timing info period:定期上报的周期 3.2.3 CONFIG.request 该消息用于VFN对PHY进行配置,消息格式如下: 可配置的参数如下所示: OAM配置的VNF...

    目录

    第2章 nFAPI接口详解

    2.3 nFAPI接口传输层协议栈

    2.3.1 传输层协议栈

    2.3.1.1 P5接口的协议栈

    2.3.1.2 P7接口的协议栈

    2.3.1.3 nFAPI接口的安全性

    2.3.2 nFAPI消息的通用格式

    2.3.2.1 nFAPI PDU头组成

    2.3.2.2 nFAPI 消息头组成

    2.3.2.3 nFAPI 消息类型

    2.3.2.4 nFAPI 消息体组成

    2.3.3 nFAPI消息网络字节序

    2.3.4 传输层消息的丢包

    第3章 P5接口的消息格式(C面)

    3.1 nFAPI接口的专有P5消息

    3.1.1 PNF_PARAM.request

    3.1.2 PNF_PARAM.response

    3.1.3 PNF_CONFIG.request

    3.1.4 PNF_CONFIG.response

    3.1.5 PNF_START.request

    3.1.6 PNF_START.response

    3.1.7 PNF_STOP.request

    3.1.8 PNF_STOP.response

    3.2 nFAPI接口的透传P5消息

    3.2.1 PARAM.request

    3.2.2 PARAM.response

    3.2.3 CONFIG.request

    3.2.4 CONFIG.response

    3.2.5 START.request

    3.2.6 ERROR.indication

    3.2.7 STOP.request

    3.2.8 STOP.indication

    3.2.9 nFAPI Configuration TLVs

    第4章 P7接口的消息格式(U面)

    4.1 nFAPI接口的专有P7消息

    4.1.1 DL Node Sync

    4.1.2 UL Node Sync

    4.1.3 Timing Info

    4.2 nFAPI接口的透传P7消息

    4.2.1 Handling of Pointer Option in FAPI Messages

    4.2.2 Extraction of FAPI Messages From an nFAPI Message


    第1章 简介

     

    第2章 nFAPI接口详解

    2.1 nFAPI接口的主要消息流程

    2.2 5G nFAPI接口与4G FAPI的共存

    2.3 nFAPI接口传输层协议栈

    2.3.1 传输层协议栈

    2.3.1.1 P5接口的协议栈

    采用SCTP协议作为传输层的好处是,SCTP是可靠的传输层,应用层不需要对传输进行定时、合法性检查、包的丢失检查等。

    对于5G nFAPI payload标识PPI=TBA.

    SCTP连接由PNF发起,根据OAM配置的VFN的IP地址和端口号,与VFN建立的SCTP流,每个PNF PHY与VFN都有一个独立的SCTP流,用于在PHY与VNF之间传送配置、命令消息。

    SCTP协议支持在两个端点之间建立冗余的连接。

    2.3.1.2 P7接口的协议栈

    nFAPI P7对传输延迟和传输抖动有严格的要求,则就限制了为了提高传输可靠性的支持传送的传输层协议的使用,如TCP。nFAPI P7业务数据面采用UDP作为传输层协议,这就需要应用层通过序列号确保数据包传输的完整性,确保传输过程中没有数据包的丢失。

    在VNF和PNF上使用的IP地址和端口,是通过P5接口交换PHY配置获取的。

    而P7消息头中的PHY ID,用于指向哪个PHY实例/VNF接口。

    这些配置选项允许具有多个PHY实例的PNF使用单个UDP流,或者每个PHY实例使用一个UDP流。

     

    2.3.1.3 nFAPI接口的安全性

    nFAPI接口允许在VNF与PNF之间建立IPSec隧道,P5和P7接口协议在IPSec隧道中传输。

     

    2.3.2 nFAPI消息的通用格式

    nFAPI PDU组成:nFAPI PDU header + n个消息组成

    而每个消息又由消息头Hdr + 消息体body组成。

    如下图所示:

    上图中:

    • 一个nFAPI的PDU可以承载多个消息。
    • nFAPI允许对一个大的数据进行分段。
    • 数据按照big Endian的方式组织。

     

    2.3.2.1 nFAPI PDU头组成

    如果传输机制需要,nFAPI头允许对消息进行分段。

    • Segment Length:本段的payload的长度
    • More:More标志定义是否后续有更多的段,用于把多个段连接成一个更大长度的段,用于承载更大的数据。
    • Segment number:段编号定义消息中的段顺序
    • Sequence number:序列号在整个消息中递增。

     

    2.3.2.2 nFAPI 消息头组成

    • PHY ID: 在一个PNF设备内部,可以创建出多个PHY实体,该ID用于标识PHY实体。
    • Message ID: nFAPI或FAPI消息类型

     

    2.3.2.3 nFAPI 消息类型

    nFAPI定义了三种大类型的消息

    • Transparent messages:透传消息,这些消息是FAPI协议定义的,nFAPI只用透传这些消息,不对消息进行修改。
    • Dedicated nFAPI messages:nFAPI专有的消息,相对于FAPI,这里消息是nFAPI协议新引入的,是nFAPI独有的,如PNF_XXX消息.
    • Combined messages: 组合/混合消息,这些消息类型本身是由FAPI定义的,nFAPI对这些消息的TLV进行了扩展。

    为了避免冲突,所有消息ID必须规划。

     

    2.3.2.3.1 Dedicated nFAPI messages:nFAPI专有的消息

    nFAPI新增加了VNF和PNF设备,上述消息是VNF与PNF设备之间的交互消息。一个PNF可以创建多个物理层PHY的实体。

     

    2.3.2.3.2 Combined messages: 组合/混合消息

     

    2.3.2.3.3 Transparent messages:透传消息

    相对于nFAPI,FAPI是嵌入式的,因此没有定义VNF和PNF, 只定义L2和L1的接口操作。因此上述消息MAC与物理层之间交互的消息。

     

    2.3.2.3.3 厂家可扩展的消息

     

     

    2.3.2.4 nFAPI 消息体组成

    nFAPI消息体由零个或多个TLV结构组成,TLV参数定义为在后续部分中描述。

     

    2.3.3 nFAPI消息网络字节序

    nFAPI消息包装器中嵌入的nFAPI消息(报头和TLV)以及FAPI消息(报头和TLV)中的字段将以大端顺序(网络字节和位顺序)格式排列。

     

    2.3.4 传输层消息的丢包

    2.3.4.1 P5信令消息

    PNF根据OAM接口对SCTP End point的配置,发起与VNF的SCTP连接,这个连接是双向的数据流通信,一旦SCTP连接建立成功,VNF与PNF PHY之间就可以进行双向通信。

    关于SCTP消息的通信,请参考SCTP协议。

     

    2.3.4.2 P7信令消息

    P7用户面接口采用UDP通信。UDP是一种无连接协议,它依赖于应用层来处理丢失和无序的消息。为此,P7协议头定义了每个PHY实例上的段号和序列号,以允许nFAPI P7应用程序根据此信息处理丢包和失序问题。当P7接口上的接收端识别到丢失或无序的消息时,接收实体负责处理此类情况。这意味着对于基于slot调度中的PNF,PNF需要缓冲和重排接收到的消息,处理任何的无序或分段的消息,这些消息需要一些额外的时间来处理。有关这些情况的处理,请参阅第2.1.3.4节和第2.1.3.5节。

     

    第3章 P5接口的消息格式(C面)

    3.1 nFAPI接口的专有P5消息

    3.1.1 PNF_PARAM.request

    该消息是由VNF发起,通过该消息,VNF获取PNF的能力参数。该消息没有消息体。

    3.1.2 PNF_PARAM.response

    该消息是VNF对PNF_PARAM.request的应答消息,消息格式如下

    消息的内容是通过TLV格式进行反馈的,TLV的好处是消息体的结构可以非常灵活和可伸缩性强。

     

    3.1.2.1 TLV的内容

    (1)nFAPI Sync Mode:该参数指明了PHY实例的同步方式

    • 0:表明PHY还没有同步
    • 1:表明PHY内部10ms帧同步
    • 2:表明PHY已经绝对时间同步

    (2)Location Mode:该参数指明了同步源

    • 0:没有同步源
    • 1:GPS同步源(俄罗斯全球导航卫星系统
    • 2:GLONASS同步源(俄罗斯区域导航卫星系统
    • 3:北斗同步源 (中国区域导航卫星系统
    • 4:NavIC同步源 (印度区域导航卫星系统

    • Location Coordinates:定位的位置信息
    • Maximum Number PHYs:支持的最大PHY实例数
    • OUI: 全球组织唯一标识符。

     

    3.1.2.2 PNF PARAM.response Errors

     

    3.1.3 PNF_CONFIG.request

    当VNF处于IDLE状态时,该消息用于VNF对PNF进行配置管理。

    当VNF处于CONFIGURED状态时,该消息用于VNF对PNF进行重新配置。

    消息体的内容是TLV格式,但本标准并没有定义具体的TLV。各个厂家可以采用vendor自定义的TLV对PHY进行配置。

     

    3.1.4 PNF_CONFIG.response

    3.1.5 PNF_START.request

    该消息用于VNF启动PNF, 该消息没有消息体。

     

    3.1.6 PNF_START.response

     

    3.1.7 PNF_STOP.request

    该消息用于VNF停止PNF的运行,使得PNF从RUNNING状态切换到CONFIGED状态。

    PNF在收到该消息后,需要reset所有的PHY实体。

    该消息没有消息体。

     

    3.1.8 PNF_STOP.response

     

    3.2 nFAPI接口的透传P5消息

    3.2.1 PARAM.request

    PNF处于RUNNING状态,PHY处于IDLE状态时,该消息用于VNF获取PHY的能力和配置。消息体的格式由FAPI规范定义。

     

    3.2.2 PARAM.response

    该消息用于PHY向VNF反馈自身的整体的能力和选项。

    消息体的格式如下:

    • PHY的IP地址和端口号,以便于后续VNF直接与PHY建立连接。
    • 而更多的配置信息,需要通过后续的CONFIG.request和CONFIG.response获取。

    PHY与VFN之间的同步参数,这些参数非常重要

    • DL_TTI Timing offset:下行调度TTI的定时偏移
    • UL_TTI Timing offset:上行调度TTI的定时偏移
    • UL_DCI Timing offset:DCI指示的定时偏移值
    • Tx_Data:Timing offset:下行数据发送的定时偏移值
    • Timing window:定时窗口大小
    • Timing info mode: 周期上报还是事件上报
    • Timing info period:定期上报的周期

    3.2.3 CONFIG.request

    该消息用于VFNPHY进行配置,消息格式如下:

     

    可配置的参数如下所示:

    OAM配置的VNF IP地址和端口号,用于PHY与VNF建立连接。

     

    3.2.4 CONFIG.response

    PHY对VNF的CONFIG.request的响应。

     

    3.2.5 START.request

    该消息用于VNF启动PHY, TLV消息体与FAPI一致,nFAPI没有新定义新的TLV.

     

    3.2.6 ERROR.indication

    该消息用于PHY向VNF指示内部错误信息, TLV消息体与FAPI一致,nFAPI没有新定义新的TLV.

     

    3.2.7 STOP.request

    该消息用于VNF停止PHY, TLV消息体与FAPI一致,nFAPI没有新定义新的TLV。

     

    3.2.8 STOP.indication

    有PHY主动向VNF上报STOP实践,TLV消息体与FAPI一致,nFAPI没有新定义新的TLV。

     

    3.2.9 nFAPI Configuration TLVs

    这些TLV参数,可以用于PARAM.xxx and CONFIG.xxx消息交互。

     

     

    第4章 P7接口的消息格式(U面)

    4.1 nFAPI接口的专有P7消息

    4.1.1 DL Node Sync

    • t1:不是绝对时间,而是一个时间偏移量,相对于VNF 10ms帧的起始点(SFN=0, Slot=0)时的时间偏移量。
    • Delta SFN/SL: 这是VNF期望PHY在下一个TTI时隙时,TTI的偏移跳转。该参数可以对PHYTTI时隙进行修正。

     

    4.1.2 UL Node Sync

     

    • T1:不是绝对时间,而是一个时间偏移量,相对于VNF 10ms帧的起始点(SFN=0, Slot=0)时的时间偏移量。
    • T2: 不是绝对时间,而是一个时间偏移量,相对于PNF 10ms帧的起始点(SFN=0, Slot=0)时的时间偏移量。该时间是PHY接收到DL Node Sync消息的时间。
    • T3: 不是绝对时间,而是一个时间偏移量,相对于PNF 10ms帧的起始点(SFN=0, Slot=0)时的时间偏移量。该时间是PHY发送到UL Node Sync消息的时间。
    • T4: 并没有在消息中传输,因此在消息中没有提现出来,该时间是VNF接收到PHY发送的 UL Node Sync消息的时间。该不是绝对时间,而是一个时间偏移量,相对于VNF 10ms帧的起始点(SFN=0, Slot=0)时的时间偏移量。

     

    一个DL Node SyncUL Node Sync消息交互完成后,VNF获取了t1, t2, t3, t44个时间戳。通过这个时间戳,可以计算出如下的信息:

    1. PNFVNF之间消息传输的Delay延时(假定上下行delay延时相同)。
    2. PNFVFN10ms帧的起始时隙的时间差。

     

    演算过程如下:

    (1)前提:

    • 假设VNF 10ms帧起始TTI的时间为0, (SFN=0, SL=0)
    • 假设PNF 10ms帧起始TTI相对于VNF 10ms帧的TTI的偏移为offset.
    • 假设网络的传输延时为delay,且上下行延时相同。
    • t1,t4的时间不是绝对时间,而是相对于VNF 10ms帧起始TTI的偏移。
    • t2,t3的时间不是绝对时间,而是相对于PNF 10ms帧起始TTI的偏移。

     

     

    (2)计算过程:

    t2 + offset = t1 + delay  =>delay – offset = t2 – t1

    t4 = t3 _+ offset + delay => delay + offset = t4 – t3

    delay = [ (t4-t3) + (t2-t1)]/2

    offset = [ (t4-t3) - (t2-t1)]/2

     

    通过公式,VNF很容易计算出VNF与PNF之间的传输延时delay以及PNF的10ms帧的起始TTI相对于VNF的10ms帧的起始TTI时间的偏移offset。

    然后通过就可以通过CONFIG.request对VNF的TTI进行修正。或者VNF修正自己的TTI, 确保VNF和PNF之间的TTI是对齐的(包含网络传输延时)。

     

    4.1.3 Timing Info

    该信息是由PNF向VNF提供TLV信息。

    • Last SFN:发送Time info的PHY TTI对应的SFN值
    • Last slot:发送Time info的PHY TTI对应的slot number值

     

     

    这些信息是PNF自身统计的各种数据的延时抖动。

    在PNF PHY与VNF的10m帧的起始TTI对齐后,由于网络传输延时的变化,导致

    PNF PHY收到的上下行调度消息、UL_DCI指示、VNF下行发送数据消息的时间,并不是稳定不变的,存在一个延时抖动,PNF PHY参照自己当前的10m帧的起始TTI, 对收到这些消息的时间的抖动性进行统计。

    影响PNF PHY接收到数据的时间的抖动性的因素有:

    • 网络传输延时的抖动
    • VNF和PNF晶体震荡器的稳定性

     

     

     

    4.2 nFAPI接口的透传P7消息

    nFAPI并没有定义新的透传 P7 TLV.

    4.2.1 Handling of Pointer Option in FAPI Messages

    FAPI接口是基于板内通信的系统,因此PHY和MAC层可以共享内存访问相同的内存空间,nFAPI针对的是跨网元的板间通信系统,因此通过FAPI 消息偿传递数据时,这两种接口是有差别的。如果透明FAPI消息包含用于指定消息的有效负载部分的指针选项,则通过nFAPI携带的FAPI消息不应使用指针选项。

     

    4.2.2 Extraction of FAPI Messages From an nFAPI Message

    在nFAPI协议中,需要对透明FAPI消息进行格式化,以便只需复制消息位即可从nFAPI消息中提取FAPI消息,如下图展示了一个nFAPI PDU中承载2个FAPI消息的情形。

     

     

     

    展开全文
  • 昨天,和大家讨论了无线APP时代如何进行DNS速度优化【回复“dns”阅读】,今天和大家一起讨论一下无线时代的日志上报流量优化。缘起:无线时代,APP流量敏感,为了统计A...

    昨天,和大家讨论了无线APP时代如何进行DNS速度优化【回复“dns”阅读】,今天和大家一起讨论一下无线时代的日志上报流量优化。

    缘起:无线时代,APP流量敏感,为了统计APP内用户行为,或者需要收集某些产品数据,往往需要进行日志上报,日志上报往往又非常费流量,有没有一些好的节省流量的优化方法呢,这是本文将要讨论的问题。


    一、APP可不可以不进行日志上报,而单纯从服务器日志统计用户的行为和产品数据?

    答:不行,有些用户行为是不会与服务器进行交互的(例如TAB的点击),从服务器日志无法完成所有统计。


    二、APP通常有一些什么方法来上报日志?

    答:常用方法有三种:

    1)利用类似于Google Analytics的第三方工具进行上报,优点是无需开发,缺点是不能做个性化统计

    2)自己制订私有协议进行上报(例如TCP二进制协议),优点是节省流量,缺点是开发成本高

    3)使用HTTP上报,例如通过GET参数传递需要上报的数据,这种方案使用最为广泛。


    三、APP上报日志协议细节是怎么样的?

    答:一般是在web-server下放置一个空文件,APP通过发起HTTP请求访问这个空文件,通过GET参数传递数据,通过分析access日志来得到想要的数据。GET协议一般又有两种方式,约定格式法 + KV法

    1)约定格式法:约定分隔符,约定占位符,约定每个字段的含义,例如:

    http://daojia.com/up?[bj][20151021][1939][1][login]

    APP和server约定好,空白文件是up,分隔符是[],第一个字段[bj]是城市,第二个字段[20151021]是日期,第三个字段[1939]是时间,第四个字段[1]是用户id,第五个字段[login]是行为

    这个方法的缺点是,扩展性较差,有时候某些字段没有值,也必须在相应的位置保留占位符(因为每个字段是什么含义都是事先约定好的),要想新增统计项,只能在GET后面新增[]

    2)KV法:通过自解释的kv方式来上报数据,上面的例子用KV法来上报,则上报形式为:

    http://daojia.com/up?city=bj&date=20151021&time=1939&uid=1&action=login

    这个方法的优点是扩展性好(好太多了),缺点是上报数据量比较大,KEY其实是冗余的字符

    笔者强烈建议使用第二种方法来上报数据,后文会简述一些流量的优化方法。


    四、APP上报日志,流量很大,主要矛盾是什么?

    答:笔者了解到的主要矛盾有:

    1)无效的流量较多,HTTP请求内有很多无效数据

    2)URL冗余,每次都要上报URL

    3)KEY冗余,每次都要上报KEY

    4)上报频度高,每当用户进行了一个操作都要日志上报的话,HTTP量还是很大的。


    五、有什么优化的方法?

    答:针对上述1)-4)的主要矛盾,逐一进行优化:

    1)手动构造HTTP请求,尽可能多的去除HTTP中的无效数据,只保留GET /up HTTP/1.1和GET传递的数据

    2)使用尽可能短的域名来接收上报的日志,例如s.daojia.cn/a

    3)使用尽可能短的KEY来标识数据,例如city=bj可以优化为c=bj,日志收集方注意规范好KEY

    4)批量非实时上报,先将数据保存到APP本地存储(例如sqlite中),定时上报,这类优化对于PV类,SUM类,AVG类统计尤为有效,例如,要统计登录按钮的点击次数,三次点击,传统统计可能需要上报三次

    http://daojia.com/up?city=bj&date=20151021&time=1939&uid=1&action=login

    http://daojia.com/up?city=bj&date=20151021&time=1939&uid=1&action=login

    http://daojia.com/up?city=bj&date=20151021&time=1939&uid=1&action=login

    优化后,只需要上报一次(注意加了一个count=3的参数)

    http://daojia.com/up?city=bj&date=20151021&time=1939&uid=1&action=login&count=3


    六、非实时上报,数据时效性怎么保证?在什么时机进行日志上报呢?

    答:数据的时效性会有一定的影响,但问题不大。为了优化,会在这样的一些时间点进行上报:

    1)特殊时间点:APP打开时,APP关闭时等

    2)按时间上报:例如每隔10分钟上报一次

    3)按数据量上报:例如每收集10条记录才上报一次

    一般来说上述三种优化方法会结合进行。


    七、还有其他什么优化方案?

    答:数据压缩也是一种常见的优化方案。

    ==【完】==

    回【赶集】赶集mysql军规

    回【twitter】twitter系统架构分析

    回【12306】12306系统架构优化

    回【youtube】youtube架构分析

    ==小游戏==

    回大于10的整数,返回随机好文(猜猜怎么实现的)

    原创不易,若有收获,感谢转发。

    展开全文
  • `period` datetime NOT NULL COMMENT '报表时间段', `type` tinyint(4) NOT NULL COMMENT '报表数据格式, 1/xml, 2/json, 默认1', `creation_date` datetime NOT NULL COMMENT '报表创建时间', PRIMARY KEY (`id`)...
  • 是个人开发xb文件(临时公告发布共16大类的,由于公司缺乏资料,本人从开始接到需求到上报成功耗时一个月,特此发布博文分享,如果你正在开发,可能会对你有帮助,毕竟网上的资料需要搜索)1.知识储备:看懂文档...
  • 则返回特定标识id url 当前事件触发页面的url eventTime 触发埋点的时间戳 localTime 触发埋点时的用户本地时间,使用标准YYYY-MM-DD HH:mm:ss格式表示,方便后期直接使用字符串查询 deviceType 当前用户使用的设备...
  • Cocos2dx中文乱码问题

    万次阅读 2014-05-25 20:10:18
    最开始在网上找的一个方法,结果在wp8上报错~ 在windows环境下使用visual studio 开发cocos2d-x,由于visual studio 默认编码为GBK 格式,而cocos2d-x引擎默认编码为UTF-8, 如果有用到中文,在游戏运行时有可能...
  • 总公司要求全国各省的分公司上报指定文件,例如河北省分公司2018年报、山东省分公司2018年报之类的,通过python脚本引入xlrd,首先读取excel中的分公司名册,然后遍历已上报文件的目录,来统计哪个分支机构未上报。...
  • CAN网络dbc格式

    2021-04-28 09:41:57
    dbc 格式 是vector公司定义的私有文件格式。因vector公司在汽车领域,用者甚众,dbc成为描述can matrix,can报文格式的一种重要形式。格式标准说明,见本人上传附件。 CAN协议领域,有很多CAN协议,通用的有J1939,...
  • 办理

    2019-10-05 03:28:04
    第三十条 收办理指对收到公文的办理过程,包括签收、登记、审核、拟办、批办、承办、催办等程序。 第三十一条 收到下级机关上报的需要办理的公文,文秘部门应当进行审核。审核的重点是:是否应由本机关办理;...
  • 报文格式-PCAP文件格式详解

    千次阅读 2019-12-17 20:06:13
    分析pcap文件格式,在wireshark中显示了pcap文件头的一些信息
  • [课堂学习]多线程课程学习(一)MFC - 学生健康体温上报系统垃圾话需求分析框架设计代码展示总结 MFC - 学生健康体温上报系统 关于MFC的学习请移步:http://www.jizhuomi.com/software/257.html 非常感谢鸡啄米团队...
  • 服务器系统用什么格式 内容精选换一换对比以往的C6云服务器,盘符是vd*格式,譬如vda、vdb,新购的C6云服务器盘符变成的sd*格式。本节操作介绍盘符变成sd*格式的原因,以及常见的磁盘操作场景下的sd*格式盘符的处理...
  • 点击“技术领导力”关注∆每天早上8:10推送来源:PM杨堃 作者:杨堃过年闲来无事,研究了几款低代码平台,选择了其中一家,做了个小DEMO。本文将基于我虚构的“幼儿园健康上报系统”...
  • 报错信息:Invalid classpath publish/export dependency c:/Users/maven/.gradle/caches/modules-2/files-2.1/jstl/jstl/1.2/74aca283cd4f4b4f3e425f5820cda58f44409547/jstl-1.2jar. the project contains another...
  • 属性数据,里面包含多个属性对象,每个属性对象包含上报时间(time)和上报的值(value)。 time long 属性上报时间。 value object 上报的属性值。 method string 固定取值  thing.event.property.post 那么,我们...
  • 日常生产环境中,对于埋点上报数据中有json格式的数据,需要解析json格式的字符串里面的字段和数值.对于这种需求,hive系统内置解析json格式的函数. 测试: select REGEXP_replace('[{"id":"001","name":"zhangsan",...
  • 计算机论文发表期刊投稿格式范文参考中国月期刊咨询网计算机论文发表期刊投稿格式范文参考随着时代的不断发展,计算机技术、通信技术、信息技术、网络技术不断的引入企业办公中,给企业的信息管理带来对OA系统的应用...
  • 此外,由于web配置的时候讲过tcp client 连接服务器支持服务器域名格式,那么如果服务器是部署在局域网内,那么为了更方便进行配置部署,在局域网内部署一个DNS服务器,我用的dnsmasq,然后将路由器的DNS解析指向该...
  • 部分配置做了中文翻译,可对照英文原文配置文档...# 单位说明:当需要设置内存大小时,可以用1K 5GB 4M等常用格式指定: # # 1k => 1000 bytes # 1kb => 1024 bytes # 1m => 1000000 bytes # 1mb => 1024*1
  • 目前上市公司的会计报表全部以PDF格式上报到交易所,并且为了安全,实施了加密、禁止修改、禁止打印、禁止文字抽取等各种保护措施,给证券公司带来了很大困难,统计上市公司的数据只好重新把数据用手工方式输入一遍...
  • xml中文乱码解决

    千次阅读 2020-06-03 21:32:32
    记录一下,希望其他人遇到相似问题有一个参考,或者通过此进行一次思路梳理。 要把这个问题搞清楚,你要明确知道一些东西: 1、unicode编码、utf-8编码,它们之间的关系? 2、GB2312编码,为什么有这个东西?GB
  • 工作中遇到两个系统之间通信的问题,需求是这样的:要求将信息上报给上级部门(这里的上级部门是两一个系统),这就是跨系统通信了 解决方案: 使用httpclient实现网络通信,传递数据。 关键问题: httpClient...
  • 应用笔记

    2020-07-05 08:51:14
    情况报告:将相关情况及时上报。 建议报告:侧重意见或建议,要具有科学性、可行性,多采用条文式。 答复报告:答复要有针对性,表明态度,提出意见。 四、通报 1、含义:表彰先进、批评错误、传达精神、告知...
  • 《中文新闻信息分类》标准经过历时一年8个月的研制及反复检测论证,在2005年9月2日专家评审会上,标准审查委员会一致通过对《中文新闻信息分类》国家标准(送审稿)的审查,建议标准起草组尽快形成报批稿,上报国家...
  • 最近用python编写一个上报模块,主要是通过jar包进行上传操作,由于涉及到传入参数中有“中文”,则会导致脚本运行在windows下失败。下面记录一下涉及的问题:核心代码如下:ss = os.popen('java -jar %s %s %s %s %...
  • Gerber 格式详解

    2021-03-30 19:52:07
    Gerber 格式详解 gerber中文 gerber,gerber 文件:590m.com/f/25127180-487459253-79168e(访问密码:551685) 以下内容无关: -------------------------------------------分割线--------------------------------...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 4,763
精华内容 1,905
关键字:

上报文格式