精华内容
下载资源
问答
  • 系统可以方便地从专家、监督员库中的一组或几组中随机抽取指定数量的专家、监督员,支持一个项目中多个采购类别(如专业类、经济类、法律类、监督员、其他类别)的反复多次抽取。具有补抽专家等功能,因各种原因抽选...
  • DBA开放各个系统的备库,在业务低峰期(比如夜间),使用方各自抽取所需数据。由于抽取时间不同,各个数据使用方数据不一致,数据发生冲突,而且重复抽取,相信不少DBA很头疼这个事情。公司统一的大数据平台,通过...
  • 日志系统之Flume日志收集

    千次阅读 2015-06-06 21:17:49
    本文介绍在日志系统中如何使用flume agent做日志收集以及为了满足我们的需求对flume agent进行了哪些扩展。

    最近接手维护一个日志系统,它用于对应用服务器上的日志进行收集然后提供实时分析、处理并最后将日志存储到目标存储引擎。针对这三个环节,业界已经有一套组件来应对各自的需求需求,它们是flume+kafka+hdfs/hbase。我们在实时分析、存储这两个环节,选择跟业界的实践相同,但agent是团队自己写的,出于对多种数据源的扩展需求以及原来收集日志的方式存在的一些不足,于是调研了一下flume的agent。结果是flume非常契合我们的实际需求,并且拥有良好的扩展性与稳定性。于是打算采用flume的agent替换我们原先的实现。

    本文介绍我们如何使用flume agent以及为了满足我们的需求进行了哪些扩展。备注:全文所指的flume均指flume-ng,版本基于1.6.0。

    flume简介

    flume 通过Agent对各个服务器上的日志进行收集,它依赖三大核心组件,它们分别是:source,channel,sink。它们之间的串联关系如下图:


    之间的关系也比较简单:source负责应对各种数据源进行日志收集;channel负责日志的中间暂存,将日志收集跟日志发送解耦;sink负责日志的发送,将日志发送到目的地。更详细的讲解,请移步官网。下面谈谈,我们对flume的使用与扩展。

    Source的扩展

    Flume提供了一个基于跟踪文件夹内“文件个数”变动的source称之为Spool Directory Source。它跟踪目标日志文件夹,当有新的日志文件产生时就会触发对新日志文件的收集,但它不支持日志文件的追加。也就是说一旦它开始收集某个日志文件,那么这个日志文件就不能再被编辑,如果在读取日志文件的时候,日志文件产生了变动那么它将会抛出异常。也就是说,当收集到当日日志文件时,同时又有新的日志在往里面写入时,该source是不适合这种需求的。

    如果你的需求是接近“准实时”的日志收集并且你非要用这个souce,应对的方案是:你只能选择将应用程序的日志框架(比如常用的log4j)的appender的“滚动机制”设置为按分钟滚动(也就是每分钟产生一个新日志文件)。这种机制不是不可行,但有些不足的地方,比如日志文件过多:当日志除了要被日志系统收集,还需要本地保留时,这种机制将非常难以接受。

    我们希望日志文件按天滚动产生新的日志文件,当天的日志以追加的方式写入当天的日志文件并且Agent还要能够以接近实时的速度收集新产生的日志(追加)的。如果agent挂掉或者服务器宕机,日志文件不能丢失,agent能够自动跨日期收集。其实,spooling directory source已经为我们的实现提供了模板,但要进行一些改造,主要是以下几点:

    (1)原先的Spooling Directory Source不支持对收集的日志文件的内容进行追加:


    如果文件有任何改动,将以异常的形式抛出。此处需要移除异常

    (2)对当日日志文件进行持续监控

    原先的实现,当获取不到event直接删除或者重命名当前文件,并自动混动到下一个文件:

    /* It's possible that the last read took us just up to a file boundary.
         * If so, try to roll to the next file, if there is one. */
        if (events.isEmpty()) {
          retireCurrentFile();
          currentFile = getNextFile();
          if (!currentFile.isPresent()) {
            return Collections.emptyList();
          }
          events = currentFile.get().getDeserializer().readEvents(numEvents);
        }

    修改后的实现,当当前文件不是当天的日志文件时才处理当前文件并自动滚动到下一个文件,如果是当日文件,则继续跟踪:

    if(!isTargetFile(currentFile) 		//	Only CurrentFile is no longer the target, at the meanwhile, next file exists.
    	    && (isExistNextFile()) ){	//	Then deal with the history file(ever target file)
      logger.info("File:{} is no longer a TARGET File, which will no longer be monitored.", currentFile.get().getFile().getName());
      retireCurrentFile();
      currentFile = getNextFile();
    }

    flume 该source的源码见: github

    另外此处,我们判断是否是目标文件(当日日志文件)的处理方式是比对服务器日期跟文件名中包含的日期是否一致:

    private boolean isTargetFile(Optional<FileInfo> currentFile2) {
    		
      String inputFilename = currentFile2.get().getFile().getName();
      SimpleDateFormat dateFormat = new SimpleDateFormat(targetFilename);
      String substringOfTargetFile = dateFormat.format(new Date());
    		
      if(inputFilename.toLowerCase().contains(substringOfTargetFile.toLowerCase())){
        return true;
      }
    		
      return false;
    }

    所以在新的配置里还需要加入日期格式的配置,通常是:yyyy-MM-dd。

    Sink 的扩展

    Sink在Flume的agent组件中充当数据输出的作用。在flume之前的版本(1.5.2)中已经对多个数据持久化系统提供了内置支持(比如hdfs/HBase等),但默认是没有kafka的。如果我们想将日志消息发送到kafka,就需要自己扩展一个kafkaSink。后来通过搜索发现在最新的stable release版本:1.6.0中,官方已经集成了kafkaSink。不过1.6.0是5月20号刚刚发布,官方的Download页面以及User Guide还没有进行更新,所以请在版本列表页面下载1.6.0版本。在下载到的安装包内有最新的KafkaSink介绍。

    核心的配置有:brokerList(为了高可用性,flume建议至少填写两个broker配置)、topic。详见列表:


    出于好奇心,在github上大概浏览了官方实现kafkaSink的源码,发现Event的Header部分并没有被打包进消息发送走:

            byte[] eventBody = event.getBody();
            Map<String, String> headers = event.getHeaders();
    
            if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
              eventTopic = topic;
            }
    
            eventKey = headers.get(KEY_HDR);
    
            if (logger.isDebugEnabled()) {
              logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
                + new String(eventBody, "UTF-8"));
              logger.debug("event #{}", processedEvents);
            }
    
            // create a message and add to buffer
            KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
              (eventTopic, eventKey, eventBody);
            messageList.add(data);

    这一点,可能并不满足我们的需求:我们需要消息头里的信息成为消息的一部分,然后在storm里针对header信息进行一些处理。比如:

    (1)我们会默认在头里加入产生日志的服务器的Host,以便对日志进行分流或对没有存储host的日志进行“补偿”

    (2)我们会默认在头里加入日志类型的标识,以便区分不同的日志并分流到不同的解析器进行解析

    因为日志的来源以及形式是多样的,所以header里这些携带的信息是必要的。而flume官方的KafkaSink却过滤掉了header中的信息。因此,我们选择对其进行简单的扩张,将Event的header跟body打包成一个完整的json对象。具体的实现:

        private byte[] generateCompleteMsg(Map<String, String> header, byte[] body) {
            LogMsg msg = new LogMsg();
            msg.setHeader(header);
            msg.setBody(new String(body, Charset.forName("UTF-8")));
    
            String tmp = gson.toJson(msg, LogMsg.class);
            logger.info(" complete message is : " + tmp);
            return tmp.getBytes(Charset.forName("UTF-8"));
        }

                    // create a message and add to buffer
                    KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
                        (eventTopic, eventKey, generateCompleteMsg(headers, eventBody));
                    messageList.add(data);

    Interceptor使用

    上面提到日志的源以及格式多种多样,我们不可能将所有工具、组件的日志格式按照我们想要的方式作格式化,特别是一些封闭的组件或线上的系统。很显然source跟sink只负责日志的收集和发送,并不会区分日志内容。而flume提供的Interceptor这一功能,给flume提供了更强大的扩展性。而我们拦击日志,并给其添加特定的header就是通过flume内置的几个interceptor实现的。我们应用了这么几个interceptor:

    (1)host:往header中设置当前主机的Host信息;

    (2)static:往header中设置一个预先配好的key-value对,我们用它来鉴别不同的日志源

    (3)regex:通过将Event的body转换成一个UTF-8的字符串,然后匹配正则表达式,如果匹配成功,则可以选择放行或者选择删除

    前两个interceptor我们之前已经提及过它的用途,而第三个我们用它来匹配日志中是否存在“DEGUG”字样的tag,如此存在,则删除该日志(这个是可选的)。

    Selector 的使用

    目前没有使用Selector的需求,不过它的用途也很常见:它可以用来选择Channel,如果你有多个Channel,并且是有条件得选择性发送的情况下,可以使用Selector来提高日志收集的灵活性。比如:如果你需要将不同不同日志源的日志发往不同的目的地可以建立多个channel然后按一定的规则来匹配,这里主要用到Multiplexing Channel Selector

    展开全文
  • 凯撒加密解密程序 1个目标文件 1、程序结构化,用函数分别实现 2、对文件的加密,解密输出到文件 利用随机函数抽取幸运数字 简单 EJB的真实世界模型(源代码) 15个目标文件 摘要:Java源码,初学实例,基于EJB的真实...
  • Mysql日志抽取与解析

    万次阅读 2014-10-08 17:44:14
    Mysql日志抽取与解析正如名字所将的那样,分抽取和解析两个部分。这里Mysql日志主要是指binlog日志。二进制日志由配置文件的log-bin选项负责启用,Mysql服务器将在数据根目录创建两个新文件XXX-bin.001和XXX-bin....
    1. 摘要:
    
    Mysql日志抽取与解析正如名字所将的那样,分抽取和解析两个部分。这里Mysql日志主要是指binlog日志。二进制日志由配置文件的log-bin选项负责启用,Mysql服务器将在数据根目录创建两个新文件XXX-bin.001和XXX-bin.index,若配置选项没有给出文件名,Mysql将使用主机名称命名这两个文件,其中.index文件包含一份全体日志文件的清单。Mysql会把用户对所有数据库的内容和结构的修改情况记入XXX-bin.n文件,而不会记录SELECT和没有实际操作意义的语句。
    2. 设计概要:
    本项目主要包括两个独立的模块:1、日志抽取(mysql-tracker);2、日志解析(mysql-parser)。日志抽取主要负责与mysql进行交互,通过socket连接以及基于mysql的开源协议数据报文,来进行从mysql主库上dump相应的日志数据下来。而日志解析主要负责与日志抽取模块交互,通过将dump下来的bytes类型的数据,根据mysql协议,对bytes数据进行解析,并封装成易读的event对象。
    2.1  流程概要:
    mysql-tracker 总体流程设计:
    tracker与mysql交互:
    1. 建立socket连接
    2. 加载上次退出时的位点信息(从checkpoint表中加载)
    3. 利用socket连接发送基于mysql协议的数据包+checkpoint表中的位点信息,创建mysql主库的binlog dump线程
    4. 利用socket接受(监听)mysql主库传过来的数据包
    5. 解析数据包(有多种形式,OK包,EOF包,ERROR包,EVENT包等等)
    6. 如果有EVENT包,将基于byte的数据包解析成event对象
    7. 将对象存入List或Queue里面
    tracker与hbase交互:
    1. 从queue中接收固定量数据(上限:防止内存溢出,下限:防止频繁I/O),或固定时间数据(防止内存溢出)。
    2. 这里的数据就是event对象
    3. 将event对象序列化,存入hbase(protobuf 和 entry)
    4. 存入过程中,保证位点确认机制,如果有关于mysql binlog 的标志性位点,则将该event存入hbase后(注意这里有对特殊xid位点的确认机制,而parser是没有的,直接确认即可),然后再将该event的位点信息存入checkpoint表(维护各种位点信息:包括mysql binlog位点,event表(存如序列化后的event)位点,entry表(存入反序列化后的event)位点)
    5. 也就是说只要是存入hbase实体数据,都要伴随位点确认机制。这里tracker确认两个方面的位点:mysql binlog 位点(xid:binlog file name + next position) + event表位点(tracker写位点:row key)

    tracker 每分钟记录位点:

    1. 每分中固定时间记录确认的checkpoint位点(可能有重复,长时间没有数据fetch重复最多)

    mysql-parser 总体流程设计:(设计思路非常类似,只不过是mysql binlog变成了event表,parser fetch数据从这里fetch)
    parser与event表交互:
    1. 建立hbase连接
    2. 加载上次退出的位点信息(从checkpoint表中加载)
    3. 通过hbase连接+checkpoint表中的位点信息,不断监听event表一旦event表有更新,就从event表中把序列化的event fetch下来
    4. 得到的序列化event(bytes) 存入List或Queue里面。
    parser与hbase交互:
    1. 从queue中接收固定量数据(上限:防止内存溢出,下限:防止频繁I/O),或固定时间数据(防止内存溢出)。
    2. 将序列化的event(bytes)反序列化成entry(其实就是event对象)
    3. 将entry存入hbase。
    4. 存入过程中,伴随位点确认机制(直接确认位点,不需要特殊位点确认机制)(存入位点信息到checkpoint表中去:parser 读 event表的位点(row key) + parser 写entry表的位点(row key))
    parser每分钟确认位点:
    1. 每分钟固定时间记录确认的checkpoint位点(可能有重复,长时间没有数据fetch重复最多)

    2.2 架构


    tracker 与 parser都有同样的3个线程结构 取数据线程、消费数据线程、每分钟记录线程。并且每个消费数据线程必定伴随着位点确认的机制,就如前面2.1流程概要所说:
    1、tracker的位点确认机制需要有xid的特殊event作为mysql主库的位点确认点。
    2、而hbase里面的位点确认除了tracker写位点也需要是xid的event作为位点确认点,其他确认点没有特殊位点的要求。
    3. mysql 相关
    主要涉及到mysql的通讯协议和mysql 日志协议。mysql通讯协议,这里主要是利用到与mysql交互中的收发数据包的解析;mysql日志协议,这里主要是利用受到数据包后得到event事件的数据包,然后解析event数据包会用到相关的日志协议。即前者主要用在mysql交互、数据收发上面;后者主要用于日志解析、数据封装上面。
    3.1 mysql 通讯协议
    mysql通讯协议主要用于mysql客户端与mysql服务端的交互,通讯协议通过SSL加密通讯、数据包压缩通讯、连接阶段的强交互性。
    3.1.1 mysql数据包

    如果客户端要和服务端交互,他们会把数据打包成数据包的形式然后通过发送数据包的形式,实现信息的传递。数据包的具体格式如下:

    例如一个COM_BINLOG_DUMP类型的数据包的payload(数据包体)是这样的:



    3.2 mysql 日志协议
    binlog日志是一个对于mysql记录各种变化的日志集合,开启日志功能可以通--log-bin 选项来开启。MySQL的二进制日志可以说或是MySQL最重要的日志了,它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是失误安全型的.
    MySQL的二进制日志的作用是显而易见的,可以方便的备份这些日志以便做数据恢复,也可以作为主从复制的同步文件。
    3.2.1 event事件
    mysql通过C++的类来描述事件的基本类型 log event,在这里我们可以通过mysql源码的log_event.cc来详细了解 各种各样的event事件类型。log event是一个描述事件的基本类型,更加细致的log event 组成了基本的log event,即log event是可派生的,并派生处了一些描述事件信息更详细的子事件类型。比如row event就是一个母事件类型。在mysql源码中是通过一系列枚举整数值来描述各个事件的,如下所示:
    enum Log_event_type { 
      UNKNOWN_EVENT= 0, 
      START_EVENT_V3= 1, 
      QUERY_EVENT= 2, 
      STOP_EVENT= 3, 
      ROTATE_EVENT= 4, 
      INTVAR_EVENT= 5, 
      LOAD_EVENT= 6, 
      SLAVE_EVENT= 7, 
      CREATE_FILE_EVENT= 8, 
      APPEND_BLOCK_EVENT= 9, 
      EXEC_LOAD_EVENT= 10, 
      DELETE_FILE_EVENT= 11, 
      NEW_LOAD_EVENT= 12, 
      RAND_EVENT= 13, 
      USER_VAR_EVENT= 14, 
      FORMAT_DESCRIPTION_EVENT= 15, 
      XID_EVENT= 16, 
      BEGIN_LOAD_QUERY_EVENT= 17, 
      EXECUTE_LOAD_QUERY_EVENT= 18, 
      TABLE_MAP_EVENT = 19, 
      PRE_GA_WRITE_ROWS_EVENT = 20, 
      PRE_GA_UPDATE_ROWS_EVENT = 21, 
      PRE_GA_DELETE_ROWS_EVENT = 22, 
      WRITE_ROWS_EVENT = 23, 
      UPDATE_ROWS_EVENT = 24, 
      DELETE_ROWS_EVENT = 25, 
      INCIDENT_EVENT= 26, 
      HEARTBEAT_LOG_EVENT= 27, 
      ENUM_END_EVENT 
      /* end marker */ 
    };
    具体各种事件含义的详细说明可以参照mysql官方说明文档: http://dev.mysql.com/doc/internals/en/event-meanings.html
    3.2.2 event 事件结构
    接下来我们来看看一个通用事件的具体结构(参照mysql packet 数据包)
    所有的event 都含有如下通用的事件结构:
    +===================+
    | event header      |
    +===================+
    | event data        |
    +===================+
    分别由时间头和时间体组成。
    而事件的内部结构随mysql的版本不同而变化着,这里取出3个代表性的版本结构:
    v1 :用于mysql 3.23
    v3 :用于mysql 4.01
    v4 :用于mysql 5.0 及 以上
    v1 的event 结构:
    +=====================================+
    | event     | timestamp         0 : 4    |
    | header  +----------------------------+
    |               | type_code         4 : 1    |
    |              +----------------------------+
    |               | server_id         5 : 4    |
    |              +----------------------------+
    |               | event_length      9 : 4    |
    +=====================================+
    | event      | fixed part       13 : y    |
    | data       +----------------------------+
    |                | variable part              |
    +=====================================+
    v3 的 event 结构 :
    +=====================================+
    | event     | timestamp         0 : 4    |
    | header  +----------------------------+
    |              | type_code         4 : 1    |
    |             +----------------------------+
    |              | server_id         5 : 4    |
    |             +----------------------------+
    |              | event_length      9 : 4    |
    |             +----------------------------+
    |              | next_position    13 : 4    |
    |             +----------------------------+
    |              | flags            17 : 2    |
    +=====================================+
    | event    | fixed part       19 : y    |
    | data     +----------------------------+
    |             | variable part              |
    +=====================================+
    v4 的event结构:
    +=====================================+
    | event     | timestamp         0 : 4    |
    | header  +----------------------------+
    |              | type_code         4 : 1    |
    |             +----------------------------+
    |              | server_id         5 : 4    |
    |             +----------------------------+
    |              | event_length      9 : 4    |
    |             +----------------------------+
    |              | next_position    13 : 4    |
    |             +----------------------------+
    |              | flags            17 : 2    |
    |             +----------------------------+
    |              | extra_headers    19 : x-19 |
    +=====================================+
    | event     | fixed part        x : y    |
    | data      +----------------------------+
    |              | variable part              |
    +=====================================+
    更详细的事件包数据可见: http://dev.mysql.com/doc/internals/en/event-header-fields.html相关页面


    4. 位点确认机制
    在于mysql的交互过程中发现,xid event通常是作为一个事务的结尾(DML,DDL的话是Query作为结尾),现将DML和DDL的事件组成展示出来(过滤掉一些对解析日志无意义的事件):
    DML:
    1. QUERY EVENT
    2. TABLE MAP EVENT
    3. ROWS EVENT
    4. XID EVENT
    DDL:
    1. QUERY EVENT
    这里我们可以通过一定的辨识机制将DDL的QUERY EVENT 和 DML的XID EVENT归为一类,所以我们把这种结束事务的时间统称为特殊xid 事件。从调试中可以得到这样一个推论:
    在与mysql交互中,binlog dump线程的起始位点一定要是特殊xid事件的next position的。即特殊xid一定要作为mysql的结束标识,读时候一定要确认这里的位点机制。
    所以在tracker重启,重新抓取数据时一定要从xid开始fetch数据,这样就是位点确认机制的由来。
    目前的位点确认机制有:
    1. mysql的位点确认,必须是以xid位点来确认的,所以checpoint表存储mysql位点信息的数据必须要是特殊xid事件
    2. 写event表的checkpoint位点确认,受mysql位点特殊xid的影响,这里checkpoint表中tracker写event表的位点信息也必须是特殊xid的位点信息。(考虑这样一种场景,大事务里面有很多个event,如果tracker在写event表是crash掉了,这样我们可以把大事务的第一个时间a[0] 到发生crash的时间a[i]成为脏数据,为什么呢??,因为如果重启tracker,他与mysql的交互特性是必须要以xid作为起始位点才开始fetch event数据,所以我们tracker会又从这个大事务的a[0]开始fetch,如果hbase event不以xid作为位点确认,那么这次event表就变成a[0]......a[i] a[0] …..a [j] ,这样a[0]......a[i]成了明确的脏数据,如果是以xid作为tracker写event的位点确认,实际上就是重写了一段a[0]......a[i]的数据,当然你可已在tracker fetch a[0]到a[i]这一段,先不写hbase,到crash的位点再开始写hbase也是可以的。注意这里有无限循环的bug漏洞)
    3. 除以上两个的位点,其他位点的确认均采取直接确认,不需要考虑特殊xid事件。
    4.1 确认位点分类:
    大致有以下几类位点需要确认:



    5. tracker设计
    依照2.1的流程概要设计,其流程图如下所示:



      这部分可以结合源代码理解(Handler1.java)
    1. prepare方法:
    1. 建立mysql的两个连接,其中一个连接用于fetch event数据,另外一个连接用于fetch表结构元数据。这里如果创建连接不成功,将一直处于创建连接流程。保证程序的存活不依赖与mysql的存活
    2. 建立hbase连接,这里如果hbase没有启动会处于阻塞和重连的状态。
    3. 加载起始位点,既有mysql的起始位点也有event表的其实位点,注意这两个位点都是受xid影响的位点,如果hbase没有相关信息,这里我们用show master status的mysql命令,让mysql位点处于本库的最末端,而让event表位点置0,即相当于清除所有数据,从0开始。
    4. 启动fetch 线程,开始从mysql主库上fetch event数据。
    5. 启动per minute 线程,开始每分中记录相应位点
    6. 启动persistence 线程,开始接受fetch到的event数据,并且序列化,然后存入hbase event表中,并且伴随位点确认机制。(注意:这个线程其实就是Handler.run()方法,实际上run()方法也是一个线程的机制,只不过对Handler是不可见的而已)
    2. fetch 线程:
    1. preRun方法做一些初始化工作,包括设置binlog dump线程参数、send binlog dump让fetch指针置为到起始位点(start position)、初始化数据抓取器fetcher
    2. fetch方法,抓取一条event数据。
    3. 加入queue多线程队列中
    4. kafka监控相关
    5. 这里fetch是一个循环重连的机制,入股fetch方法失败跳出第一层循环,通过外层循环和checkMysqlConn()方法时间fetch线程重连mysql。即如果fetch中途mysql crash掉,fetch线程会等待mysql有效后重连mysql。

    3. Per minute 线程:
    1. 每分钟执行一次run方法
    2. 具体是将得到的存储位点的全局变量存入checkpoint中去。注意row key的设计
    4. Persistence 线程 Handler1.run()方法:
    1. 接受多线成queue的数据到list中,以此位一批数据,
    2. 以数量的上限,下限和时间的阀值来判断时候执行一批数据的持久化
    3. 进行持久化。
    4. 将一批event数据 序列化 然后 tracker写入到event表中去。
    5. 伴随位点确认机制:当真正写入event表数据成功后,看这一批数据是否有特殊xid事件,如果有则作为位点确认,写入checkpoint表中去(tracker整体重启,启动的时候加载这个位点信息)






    6. parser 设计
    与tracker设计思路基本相同,不过是fetch的目的mysql换成hbase event表,以及位点信息的直接确认,不需要考虑特殊xid。这里不再详述。
    注意:所有的位点确认一定要是在持久化成功之后才开始位点确认。


    7. 重连机制
    tracker中的mysql connector建立过程加入重连与等待机制。
    fetch线程中加入了正在fetch数据,mysql突然断掉的重连机制。
    hbase的断掉的自身重连机制
    8. 性能评测
    目前尚未进行系统性的,正式的性能测试。
    仅以单机作测试有 每1-2秒 tracker能fetch 1万条数据,parser 每1万条数据 需要耗时4~5秒左右。
    本单机测试尚不能作为评测标准,其性能以机器的硬件性能的不同而不同,不能以此作为性能标准。
    9. bug与优化
    1. 对于巨大事务的海量事件的场景,可能存在潜在的无限循环bug,即到事件a[i] crash掉,然后重启,重新fetch时 到 时间 a[i]再一次crash,然后再重启,这样一直不停地循环,永远扫描不完着一个巨量的大事务。
    2. tracker与parser的数据交接目前仍是单线程的模式,可以考虑大规模分布式并行的模式,使tracker与parser在数据交接上能够提升效率(与mysql的交接,与hbase的交接)。
    10. 结论

    基于单机的,传统的,mysql解析就到这里,主要是利用了mysql的协议进行数据传递与解析,后面组件考虑基于分布式的,基于大规模并行化的,基于高HA的模式。

    11. 项目地址

    https://github.com/hackerwin7/mysql-tracker

    https://github.com/hackerwin7/mysql-parser

    展开全文
  • Java版水果管理系统源码 行为分析系统 用户访问session分析模块 用户访问session介绍: 模块的目标:对用户访问session进行分析 1、可以根据使用者指定的某些条件,筛选出指定的一些用户(有特定年龄、职业、城市)...
  • 业务系统日志记录规范总结

    千次阅读 2019-08-28 16:01:09
    业务系统日志记录规范 注意 应用中应该充满了日志记录信息,日志甚至比逻辑代码还要多; 集成 seluth ,开启消息链路;不开启日志上传,不集成 zipkin; 应该避免日志记录过程中出现异常,比如 log.debug(requst...

    业务系统日志记录规范

    注意

    1. 应用中应该充满了日志记录信息,日志甚至比逻辑代码还要多;
    2. 集成 seluth ,开启消息链路;不开启日志上传,不集成 zipkin;
    3. 应该避免日志记录过程中出现异常,比如 log.debug(requst.getid) ,这条日志记录之前一定要判断 request 是否为空;日志记录中使用的信息一定时稳定的,提前准备好的,最好不是专门为此次日志记录专门准备的;
    4. 日志应只记录标识性信息,具体信息从具体存储里取;
    5. 日志应该同时支持人和计算机都可以读;人可读的意思的,要成句子;计算机可读的意思是要有明确的分割符,可以支持正则等工具抽取其中有意义的信息;
    6. service 或 manager 层内有 if…else 或者 switch 这样的分支时,要在分支的首行打印日志,用来确定进入了哪个分支;
    7. 对于 trace / debug / info 级别的日志输出,必须进行日志级别的开关判断。warn 和 error 不用开关判断;

    日志级别 trace 和 debug

    测试环境要实现的目标是,不需要重新完整的调试程序,可以直接定位到出问题的 service 内的逻辑分支,最多在进行一次小范围的测试;
    要实现这个目标,需要记录 service 入参出参,debug级别的日志,只在非生产环境中使用;

    日志级别 info

    1. info级别的日志只在 service 层和 manager 层存在,体现业务逻辑运行的路径,工具类等的不要记录;
    2. Service 方法中对于系统/业务状态的变更处记录
    3. 主要逻辑中的分步骤记录

    日志级别 warn

    1. 有容错机制的时候出现的错误情况,有异常,但是你有 Plan B;
    2. 业务异常的记录,比如:当接口抛出业务异常时,应该记录此异常;
    3. warn 日志级别来记录用户输入参数错误的情况,避免用户投诉时,可以追溯信息;

    日志级别 error

    error 级别只记录系统逻辑出错、异常或者重要的错误信息。
    异常在程序中,一般会沿着调用链路,层层上抛,直到service层,在最终处理异常的地方记录log.error(e);log.error和往外抛异常,不应该同时出现;

    错误异常

    错误异常分为程序异常(系统异常)和业务异常;

    程序异常会导致程序不能正常执行;业务异常不会,业务异常的处理属于业务逻辑的一部分;
    ERROR 级别的日志,一出,意味着开发运维人员要介入了,要操作确认一下东西,要维修一些东西了;

    业务系统开发过程中,不需要 log.error 记录异常,让框架和容器(Tomcat等)来做;
    业务异常的需要开发者开发对应的异常处理逻辑,业务异常不是程序异常;比如用户登陆失败;
    业务异常的处理属于正常的业务逻辑,不应该log.error,不重要的可以不log,重要的可以使用log.warn记录,避免用户投诉时,可以追溯信息;

    数据库或者kafka在应用启动时连不上是如何处理的,在应用运行过程中是如何处理的?启动时,依赖的插件有问题,应用直接启动不起来;如果应用已经启动起来了,触发到跟其交互时,抛出异常,程序还是正常执行;

    ExceptionHandler的默认的处理逻辑

    ExceptionHandler的默认的处理逻辑不要吃掉所有的异常,log.error打印出来,这个逻辑主要用来处理程序异常,业务异常都应有对应的处理逻辑;

    @ExceptionHandler(Throwable.class)
    @ResponseBody
    public String handle(Throwable e) {
    	log.error(e.getMessage(),e);
    	//构造返回信息
    	return e.getMessage();
    }
    

    程序异常

    各个组件的异常信息可以各自处理;比如SQLExcption,需要如果SQL 出异常,异常的信息中会有SQL相关的信息,如果直接返回给前端,会造成安全问题,前端对此异常也不关心,而后端开发者关心的信息应该都记录在日志中,以方便分析问题;

    @ExceptionHandler(SQLException.class)
    @ResponseBody
    public String handle(SQLException e) {
    	log.error(e.getMessage(),e);
    	//构造返回信息,记得脱敏
    	return e.getMessage();
    }
    

    各个组件的SDK一般都会有自己的异常体系;可以根据情况直接对顶级异常类,或者典型的异常子类进行处理;
    接入外部组件时,首先分析其SDK的异常体系,编写对应的ExceptionHandler

    org.apache.kafka.common.KafkaException
    org.springframework.kafka.KafkaException
    java.sql.SQLException

    链路追踪

    多个进程间的日志联动
    集中式日志存储系统的存在,让在一个入口处理业务系统的日志成为了可能,产生了高级的用法,链路追踪;
    用户的一个动作触发的在各个系统的所有的执行逻辑,使用一个标识将其联系起来,开发人员分析的时候,可以根据此标识查询所有相关的日志,哪里出问题,一目了然;

    远程调用

    远程调用的plan b,就是熔断降级里面的Plan B;
    外部接口部分,客户端请求参数(REST/WS),调用第三方时的调用参数和调用结果使用info

    如果出异常,调用过程异常或者返回错误码,根据情况选择抛出异常或者启用Plan B;
    抛出异常意味着程序正常流程执行结束,需要处理这个异常,warn记录此异常,然后返回用户结果;
    Plan B意味着程序还可以正常执行下去,warn记录发生了此事件,出现异常转入Plan B;

    远程调用过程中出异常,意味着需要开发人员介入,应该记录 error 级别的异常;
    如果是返回的错误码是非成功执行的错误码,这时候应该根据错误码的级别抛出不同的异常,处理异常的地方根据远程调用的接口的重要程度评估使用不同的日志级别。

    验证日志

    提交代码前,确定通过日志可以看到一个功能的整个执行流程,可以通过日志进行问题定位程序执行的路径;

    参考

    用JAVA日志来写诗
    JAVA - 优雅的记录日志(log4j实战篇)
    Alibaba Java Coding Guidelines
    小白学习如何打日志
    正确的打日志姿势
    Java常用日志框架介绍

    展开全文
  • 通过系统日志采集大数据

    千次阅读 2019-06-25 21:59:58
    处理这些日志需要特定的日志系统,这些系统需要具有以下特征。 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦。 支持近实时的在线分析系统和分布式并发的离线分析系统。 具有高可扩展性,也就是说,当...

    许多公司的平台每天都会产生大量的日志,并且一般为流式数据,如搜索引擎的 pv 和查询等。处理这些日志需要特定的日志系统,这些系统需要具有以下特征。

    • 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦。
    • 支持近实时的在线分析系统和分布式并发的离线分析系统。
    • 具有高可扩展性,也就是说,当数据量增加时,可以通过增加结点进行水平扩展。

    目前使用最广泛的、用于系统日志采集的海量数据采集工具有 Hadoop 的 Chukwa、ApacheFlumeAFacebook 的 Scribe 和 LinkedIn 的 Kafka 等。

    以上工具均采用分布式架构,能满足每秒数百 MB 的日志数据采集和传输需求。本节我们以 Flume 系统为例对系统日志采集方法进行介绍。

    Flume 的基本概念

    Flume 是一个高可用的、高可靠的、分布式的海量日志采集、聚合和传输系统。

    Flume 支持在日志系统中定制各类数据发送方,用于收集数据,同时,Flume 提供对数据进行简单处理,并写到各种数据接收方(如文本、HDFS、HBase 等)的能力。

    Flume 的核心是把数据从数据源(Source)收集过来,再将收集到的数据送到指定的目的地(Smk)。

    为了保证输送的过程一定成功,在送到目的地之前,会先缓存数据到管道(Channel),待数据真正到达目的地后,Flume 再删除缓存的数据,如图 1 所示。

    Flume的基本概念
    图 1  Flume 的基本概念

    Flume 的数据流由事件(Event)贯穿始终,事件是将传输的数据进行封装而得到的,是 Flume 传输数据的基本单位。

    如果是文本文件,事件通常是一行记录。事件携带日志数据并且携带头信息,这些事件由 Agent 外部的数据源生成,当 Source 捕获事件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个) Channel 中。

    Channel 可以看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或者把事件推向另一个 Source。

    Flume 使用方法

    Flume 的用法很简单,主要是编写一个用户配置文件。在配置文件当中描述 Source、Channel 与 Sink 的具体实现,而后运行一个 Agent 实例。

    在运行 Agent 实例的过程中会读取配置文件的内容,这样 Flume 就会采集到数据。

    Flume 提供了大量内置的 Source、Channel 和 Sink 类型,而且不同类型的Source、Channel 和 Sink 可以进行灵活组合。

    配置文件的编写原则如下。

    1)从整体上描述 Agent 中 Sources、Sinks、Channels 所涉及的组件。

    #Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    2)详细描述 Agent 中每一个 Source、Sink 与 Channel 的具体实现,即需要指定 Source 到底是什么类型的,是接收文件的、接收 HTTP 的,还是接收 Thrift 的。

    对于 Sink,需要指定结果是输出到 HDFS 中,还是 HBase 中等。

    对于Channel,需要指定格式是内存、数据库,还是文件等。

    #Describe/configure the source
    al.sources.r1.type = netcat
    al.sources.r1.bind = localhost
    al.sources.r1.port = 44444

    #Describe the sink
    a1.sinks.k1.type = logger

    #Use a channel which buffers events in memory.
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactioncapacity = 100

    3)通过 Channel 将 Source 与 Sink 连接起来。

    #Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    4)启动 Agent 的 shell 操作。

    flume-ng agent -n a1 -c ../conf -f ../conf/example.file
    -Dflume.root.logger = DEBUG,console

    参数说明如下。

    • “-n”指定 Agent 的名称(与配置文件中代理的名字相同)。
    • “-c”指定 Flume 中配置文件的目录。
    • “-f”指定配置文件。
    • “-Dflume.root.logger = DEBUG,console”设置日志等级。

    Flume 应用案例

    NetCat Source 应用可监听一个指定的网络端口,即只要应用程序向这个端口写数据,这个 Source 组件就可以获取到信息。其中,Sink 使用 logger 类型,Channel 使用内存(Memory)格式。

    1)编写配置文件

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 192.168.80.80
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    该配置文件定义了一个名字为 a1 的 Agent,—个 Source 在 port 44444 监听数据,一个 Channel 使用内存缓存事件,一个 Sink 把事件记录在控制台。

    2)启动 FlumeAgental 服务端。

    $ flume-ng agent -n al -c ../conf -f ../conf/neteat.conf
    -Dflume.root.logger=DEBUG,console

    3)使用 Telnet 发送数据。

    以下代码为从另一个终端,使用 Telnet 通过 port 44444 给 Flume 发送数据。

    $ telnet local host 44444
    Trying 127.0.0.1…
    Connected to localhost.localdomain(127.0.0.1).
    Escape character is ‘^]’.
    Hello world! <ENTER>
    OK

    4)在控制台上查看 Flume 收集到的日志数据。

    17/6/19 15:32:19 INFO source.NetcatSource: Sources tarting
    17/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.
    ServerSocketChannelImpl[/127.0.0.1:44444] 17/06/19 15:32:34 INFO sink.LoggerSink: Event:{ headers:{} body:48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Helloworld! .}

    转自:http://www.ryxxff.com/9275.html

    展开全文
  • 最终我们选用了阿里的canal做位日志抽取方。 Canal最早被用于阿里中美机房同步, canal原理相对比较简单: Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Slave发送dump协议 MySQL ...
  • 1.实时日志处理系统架构及整体思路   整个系统分为三层:收集(Agent)层,汇总(Collector)层和处理层。 其中Agent层采用flume收集日志,每个机器部署一个进程,负责对单机的日志收集工作; Collector层flume...
  • MyBatis面试题(2020最新版)

    万次阅读 多人点赞 2019-09-24 16:40:33
    基础支撑层:负责最基础的功能支撑,包括连接管理、事务管理、配置加载和缓存处理,这些都是共用的东西,将他们抽取出来作为最基础的组件。为上层的数据处理层提供最基础的支撑。 MyBatis的框架架构设计是怎么样的 ...
  • PI日志按XML字段抽取查找(多图) 运行T-code:SXMS_LMS_CONF 填写“接口名”、“发送方系统”和“接收方系统” 填写“XPATH”并选中下面两个框 填写对应的“Namespace” 填写“XPATH”和“Namespace”可以参考ID中的 ...
  • 数据抽取系统,就是从不同的数据源里抽取数据,并将其输入到ETL流程中。数据的抽取方式有两种:基于流的抽取和基于文件的抽取。这两种方式,从本质上讲,都是数据流。唯一有区别的地方,基于静态的文件时进行抽取时...
  • ...nginx是web服务器,不是收集日志系统,擅长在做web服务,或者写日志,每小时切断是为了日志轮换而已。上面说的只是将不同地方日志收拢接受过来进行而已,不负责写日志,也不负责日志解析
  • 【知识图谱】知识图谱的基础概念与构建流程

    千次阅读 多人点赞 2019-11-09 18:46:49
    文献[16]首次实现了一套能够抽取公司名称的实体抽取系统,其中主要用到了启发式算法与规则模板相结合的方法。然而,基于规则模板的方法不仅需要依靠大量的专家来编写规则或模板,覆盖的领域范围有限,而且很难适应...
  • 手机app日志分析系统(一)

    千次阅读 2018-11-19 13:23:08
    抽取共性,创建日志基本父类AppBaseLog package com.test.app.common; import java.io.Serializable; /** * AppBaseLog */ public class AppBaseLog implements Serializable { private Long ...
  • 系统日志分析demo

    2013-08-27 16:17:44
    最近完成了一个日志分析项目,执行效率也还行。顺手抽取了思路写了一个demo,分享给大家。 eclipse直接导入Java Project。build path需要自己修改一下。lib全有。 项目使用了Quartz、Jaxb和spring多线程等技术。
  • ELK日志系统+redis解决数据丢失问题

    千次阅读 2019-07-07 11:40:26
    将server2端的日志采集存入部署在server3端的redis数据库 [root@server2 ~]# cd /etc/logstash/conf.d/ [root@server2 conf.d]# vim apache.conf input { file { path => "/var/log/httpd/access_log" start_...
  • Spring

    千次阅读 多人点赞 2020-02-04 17:41:35
    AOP面向切面编程,可以将通用的任务抽取出来,复用性更高; Spring对于其余主流框架都提供了很好的支持,代码的侵入性很低。 1.1搭建 Spring 运行时环境 加入 JAR 包 在 Spring Tool Suite 工具中通过如下步骤创建 ...
  • 这个log是从开源的chromium工程中抽取出来的。 支持的特性: 支持输出log到文件,系统调试器 支持输出不同等级log 支持错误回调函数 支持惰性输出,支持条件输出,支持仅在debug模式生效 支持同时输出当前的...
  • 4. 基于日志的CDC 三、使用Sqoop抽取数据 1. Sqoop简介 2. 使用Sqoop抽取数据 3. Sqoop优化 (1)调整Sqoop命令行参数 (2)调整数据库 四、小结 本篇介绍如何利用Kettle提供的转换步骤和作业项实现Hadoop...
  • ETL详细讲解

    千次阅读 2019-08-27 23:04:35
    ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据。ETL是BI项目重要的一个环节。 通常情况下,在BI项目中ETL...
  • logstash采集日志

    千次阅读 2019-07-19 10:41:35
    logstash主要用来采集分布式及微服务系统日志,从而对日志进行统一管理分析检索。 下载https://www.elastic.co/cn/downloads/logstash,根据自己系统对应下载(linux建议使用TAR.GZ,win系使用zip)笔者使用windows ...
  • JAVA上百实例源码以及开源项目

    千次下载 热门讨论 2016-01-03 17:37:40
    利用随机函数抽取幸运数字 简单 EJB的真实世界模型(源代码) 15个目标文件 摘要:Java源码,初学实例,基于EJB的真实世界模型  基于EJB的真实世界模型,附源代码,部分功能需JSP配合完成。 J2ME优化压缩PNG文件 4个...
  • 日志处理方法及系统

    千次阅读 2016-07-16 14:02:38
    申请人: 深圳市世纪光速信息技术有限公司 ... A ...针对不同格式的脚本分别建立相应的日志处理脚本从而构成动态脚本库,在该日志处理脚本中各种格式化操作由上述的函数完成;载入具有一定格式的
  • 用户操作日志对于每一个系统来说是不可或缺的,并且操作日志应该单独抽取为一个模块业务,不应该与主业务系统之间耦合在一起。 故而我们需要将其单独抽出并以异步的方式与主模块进行异步通信交互数据。 要求:采用...
  • 同大多数关系型数据库一样,日志文件是MySQL数据库的重要组成部分。MySQL有几种不同的日志文件,通常包括错误日志文件,... 1、MySQL日志文件系统的组成 a、错误日志:记录启动、运行或停止mysqld时出现的问题。 ...
  • 什么是数据抽取 --全量抽取、增量抽取

    万次阅读 多人点赞 2016-06-21 10:27:09
     数据抽取是指从源数据源系统抽取目的数据源系统需要的数据。实际应用中,数据源较多采用的是关系数据库。 [编辑] 数据抽取的方式  (一) 全量抽取  全量抽取类似于数据迁移或数据复制,它将数据源中的表或视图的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 38,520
精华内容 15,408
关键字:

日志抽取系统