  • Reading a MySQL binlog file with mysql-binlog-connector-java

    1. Add the dependency

    <dependency>
        <groupId>com.github.shyiko</groupId>
        <artifactId>mysql-binlog-connector-java</artifactId>
        <version>0.18.1</version>
    </dependency>
    

    2. Read the binlog file as a file stream and print every event:

    public static void main(String[] args) throws IOException {
        String filePath = "D:\\mysql\\mysql-8.0.13-winx64\\log\\mysql-bin.000001";
        File binlogFile = new File(filePath);
        EventDeserializer eventDeserializer = new EventDeserializer();
        // Deserialize date/time columns as long and char/binary columns as byte[]
        // to avoid type-conversion problems across server versions
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
        try {
            for (Event event; (event = reader.readEvent()) != null; ) {
                System.out.println(event);
            }
        } finally {
            reader.close();
        }
    }
    

    Test output:

    "C:\Program Files\Java\jdk1.8.0_202\bin\java.exe" "-javaagent:D:\IntelliJ IDEA 2018.3.5\lib\idea_rt.jar=58499:D:\IntelliJ IDEA 2018.3.5\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_202\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_202\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_202\jre\lib\rt.jar;E:\驰星教育培训\项目\spring-esdome\target\classes;D:\Maven\repository\org\springframework\boot\spring-boot-starter-web\2.1.8.RELEASE\spring-boot-starter-web-2.1.8.RELEASE.jar;D:\Maven\repository\org\springframework\boot\spring-boot-starter\2.1.8.RELEASE\spring-boot-starter-2.1.8.RELEASE.jar;D:\Maven\repository\org\springframework\boot\spring-boot\2.1.8.RELEASE\spring-boot-2.1.8.RELEASE.jar;D:\Maven\repository\org\springframework\boot\spring-boot-autoconfigure\2.1.8.RELEASE\spring-boot-autoconfigure-2.1.8.RELEASE.jar;D:\Maven\repository\org\springframework\boot\spring-boot-starter-logging\2.1.8.RELEASE\spring-boot-starter-logging-2.1.8.RELEASE.jar;D:\Maven\repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;D:\Maven\repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;D:\Maven\repository\org\apache\logging\log4j\log4j-to-slf4j\2.11.2\log4j-to-slf4j-2.11.2.jar;D:\Maven\repository\org\apache\logging\log4j\log4j-api\2.11.2\log4j-api-2.11.2.jar;D:\Maven\repository\org\slf4j\jul-to-slf4j\1.7.28\jul-to-slf4j-1.7.28.jar;D:\Maven\repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;D:\Maven\repository\org\yaml\snakeyaml\1.23\snakeyaml-1.23.jar;D:\Maven\repository\org\springframework\boot\spring-boot-starter-json\2.1.8.RELEASE\spring-boot-starter-json-2.1.8.RELEASE.jar;D:\Maven\repository\com\fasterxml\jackson\core\jackson-databind\2.9.9.3\jackson-databind-2.9.9.3.jar;D:\Maven\repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;D:\Maven\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.9\jackson-datatype-jdk8-2.9.9.jar;D:\Maven\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.9\jackson-datatype-jsr310-2.9.9.jar;D:\Maven\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.9\jackson-module-parameter-names-2.9.9.jar;D:\Maven\repository\org\springframework\boot\spring-boot-start
er-tomcat\2.1.8.RELEASE\spring-boot-starter-tomcat-2.1.8.RELEASE.jar;D:\Maven\repository\org\apache\tomcat\embed\tomcat-embed-core\9.0.24\tomcat-embed-core-9.0.24.jar;D:\Maven\repository\org\apache\tomcat\embed\tomcat-embed-el\9.0.24\tomcat-embed-el-9.0.24.jar;D:\Maven\repository\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.24\tomcat-embed-websocket-9.0.24.jar;D:\Maven\repository\org\hibernate\validator\hibernate-validator\6.0.17.Final\hibernate-validator-6.0.17.Final.jar;D:\Maven\repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;D:\Maven\repository\org\jboss\logging\jboss-logging\3.3.3.Final\jboss-logging-3.3.3.Final.jar;D:\Maven\repository\com\fasterxml\classmate\1.4.0\classmate-1.4.0.jar;D:\Maven\repository\org\springframework\spring-web\5.1.9.RELEASE\spring-web-5.1.9.RELEASE.jar;D:\Maven\repository\org\springframework\spring-beans\5.1.9.RELEASE\spring-beans-5.1.9.RELEASE.jar;D:\Maven\repository\org\springframework\spring-webmvc\5.1.9.RELEASE\spring-webmvc-5.1.9.RELEASE.jar;D:\Maven\repository\org\springframework\spring-aop\5.1.9.RELEASE\spring-aop-5.1.9.RELEASE.jar;D:\Maven\repository\org\springframework\spring-context\5.1.9.RELEASE\spring-context-5.1.9.RELEASE.jar;D:\Maven\repository\org\springframework\spring-expression\5.1.9.RELEASE\spring-expression-5.1.9.RELEASE.jar;D:\Maven\repository\org\springframework\boot\spring-boot-starter-data-elasticsearch\2.1.8.RELEASE\spring-boot-starter-data-elasticsearch-2.1.8.RELEASE.jar;D:\Maven\repository\org\springframework\data\spring-data-elasticsearch\3.1.10.RELEASE\spring-data-elasticsearch-3.1.10.RELEASE.jar;D:\Maven\repository\org\springframework\spring-tx\5.1.9.RELEASE\spring-tx-5.1.9.RELEASE.jar;D:\Maven\repository\org\springframework\data\spring-data-commons\2.1.10.RELEASE\spring-data-commons-2.1.10.RELEASE.jar;D:\Maven\repository\joda-time\joda-time\2.10.3\joda-time-2.10.3.jar;D:\Maven\repository\org\elasticsearch\client\transport\6.4.3\transport-6.4.3.jar;D:\Maven\reposit
ory\org\elasticsearch\elasticsearch\6.4.3\elasticsearch-6.4.3.jar;D:\Maven\repository\org\elasticsearch\elasticsearch-core\6.4.3\elasticsearch-core-6.4.3.jar;D:\Maven\repository\org\elasticsearch\elasticsearch-secure-sm\6.4.3\elasticsearch-secure-sm-6.4.3.jar;D:\Maven\repository\org\elasticsearch\elasticsearch-x-content\6.4.3\elasticsearch-x-content-6.4.3.jar;D:\Maven\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-smile\2.9.9\jackson-dataformat-smile-2.9.9.jar;D:\Maven\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-yaml\2.9.9\jackson-dataformat-yaml-2.9.9.jar;D:\Maven\repository\com\fasterxml\jackson\dataformat\jackson-dataformat-cbor\2.9.9\jackson-dataformat-cbor-2.9.9.jar;D:\Maven\repository\org\apache\lucene\lucene-core\7.4.0\lucene-core-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-analyzers-common\7.4.0\lucene-analyzers-common-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-backward-codecs\7.4.0\lucene-backward-codecs-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-grouping\7.4.0\lucene-grouping-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-highlighter\7.4.0\lucene-highlighter-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-join\7.4.0\lucene-join-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-memory\7.4.0\lucene-memory-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-misc\7.4.0\lucene-misc-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-queries\7.4.0\lucene-queries-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-queryparser\7.4.0\lucene-queryparser-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-sandbox\7.4.0\lucene-sandbox-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-spatial\7.4.0\lucene-spatial-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-spatial-extras\7.4.0\lucene-spatial-extras-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-spatial3d\7.4.0\lucene-spatial3d-7.4.0.jar;D:\Maven\repository\org\apache\lucene\lucene-suggest\7.4.0\
lucene-suggest-7.4.0.jar;D:\Maven\repository\org\elasticsearch\elasticsearch-cli\6.4.3\elasticsearch-cli-6.4.3.jar;D:\Maven\repository\net\sf\jopt-simple\jopt-simple\5.0.2\jopt-simple-5.0.2.jar;D:\Maven\repository\com\carrotsearch\hppc\0.7.1\hppc-0.7.1.jar;D:\Maven\repository\com\tdunning\t-digest\3.2\t-digest-3.2.jar;D:\Maven\repository\org\hdrhistogram\HdrHistogram\2.1.9\HdrHistogram-2.1.9.jar;D:\Maven\repository\org\elasticsearch\jna\4.5.1\jna-4.5.1.jar;D:\Maven\repository\org\elasticsearch\plugin\reindex-client\6.4.3\reindex-client-6.4.3.jar;D:\Maven\repository\org\elasticsearch\client\elasticsearch-rest-client\6.4.3\elasticsearch-rest-client-6.4.3.jar;D:\Maven\repository\org\apache\httpcomponents\httpclient\4.5.9\httpclient-4.5.9.jar;D:\Maven\repository\org\apache\httpcomponents\httpcore\4.4.12\httpcore-4.4.12.jar;D:\Maven\repository\org\apache\httpcomponents\httpasyncclient\4.1.4\httpasyncclient-4.1.4.jar;D:\Maven\repository\org\apache\httpcomponents\httpcore-nio\4.4.12\httpcore-nio-4.4.12.jar;D:\Maven\repository\commons-codec\commons-codec\1.11\commons-codec-1.11.jar;D:\Maven\repository\org\elasticsearch\plugin\lang-mustache-client\6.4.3\lang-mustache-client-6.4.3.jar;D:\Maven\repository\com\github\spullara\mustache\java\compiler\0.9.3\compiler-0.9.3.jar;D:\Maven\repository\org\elasticsearch\plugin\percolator-client\6.4.3\percolator-client-6.4.3.jar;D:\Maven\repository\org\elasticsearch\plugin\parent-join-client\6.4.3\parent-join-client-6.4.3.jar;D:\Maven\repository\org\elasticsearch\plugin\rank-eval-client\6.4.3\rank-eval-client-6.4.3.jar;D:\Maven\repository\com\fasterxml\jackson\core\jackson-core\2.9.9\jackson-core-2.9.9.jar;D:\Maven\repository\org\elasticsearch\plugin\transport-netty4-client\6.4.3\transport-netty4-client-6.4.3.jar;D:\Maven\repository\io\netty\netty-buffer\4.1.39.Final\netty-buffer-4.1.39.Final.jar;D:\Maven\repository\io\netty\netty-codec\4.1.39.Final\netty-codec-4.1.39.Final.jar;D:\Maven\repository\io\netty\netty-codec-http\4.1.39.Final\ne
tty-codec-http-4.1.39.Final.jar;D:\Maven\repository\io\netty\netty-common\4.1.39.Final\netty-common-4.1.39.Final.jar;D:\Maven\repository\io\netty\netty-handler\4.1.39.Final\netty-handler-4.1.39.Final.jar;D:\Maven\repository\io\netty\netty-resolver\4.1.39.Final\netty-resolver-4.1.39.Final.jar;D:\Maven\repository\io\netty\netty-transport\4.1.39.Final\netty-transport-4.1.39.Final.jar;D:\Maven\repository\org\slf4j\slf4j-api\1.7.28\slf4j-api-1.7.28.jar;D:\Maven\repository\org\springframework\spring-core\5.1.9.RELEASE\spring-core-5.1.9.RELEASE.jar;D:\Maven\repository\org\springframework\spring-jcl\5.1.9.RELEASE\spring-jcl-5.1.9.RELEASE.jar;D:\Maven\repository\com\github\shyiko\mysql-binlog-connector-java\0.18.1\mysql-binlog-connector-java-0.18.1.jar" com.czxy.binlogutil.Sample
    Event{header=EventHeaderV4{timestamp=1570352811000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=84, nextPosition=107, flags=1}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.5.28-log', headerLength=19, dataLength=84, checksumType=NONE}}
    Event{header=EventHeaderV4{timestamp=1570352823000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=179, flags=8}, data=QueryEventData{threadId=2, executionTime=0, errorCode=0, database='seasonal', sql='BEGIN'}}
    Event{header=EventHeaderV4{timestamp=1570352823000, eventType=QUERY, serverId=1, headerLength=19, dataLength=121, nextPosition=319, flags=0}, data=QueryEventData{threadId=2, executionTime=0, errorCode=0, database='seasonal', sql='UPDATE `compose_good` SET `compose_good_name`='abc' WHERE (`id`='100136')'}}
    Event{header=EventHeaderV4{timestamp=1570352823000, eventType=XID, serverId=1, headerLength=19, dataLength=8, nextPosition=346, flags=0}, data=XidEventData{xid=7}}
    Event{header=EventHeaderV4{timestamp=1570352867000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=418, flags=8}, data=QueryEventData{threadId=3, executionTime=0, errorCode=0, database='seasonal', sql='BEGIN'}}
    Event{header=EventHeaderV4{timestamp=1570352867000, eventType=QUERY, serverId=1, headerLength=19, dataLength=109, nextPosition=546, flags=0}, data=QueryEventData{threadId=3, executionTime=0, errorCode=0, database='seasonal', sql='UPDATE `good` SET `create_time`='2019-09-19' WHERE (`id`='5')'}}
    Event{header=EventHeaderV4{timestamp=1570352867000, eventType=XID, serverId=1, headerLength=19, dataLength=8, nextPosition=573, flags=0}, data=XidEventData{xid=18}}
    Event{header=EventHeaderV4{timestamp=1570353445000, eventType=QUERY, serverId=1, headerLength=19, dataLength=53, nextPosition=645, flags=8}, data=QueryEventData{threadId=5, executionTime=0, errorCode=0, database='seasonal', sql='BEGIN'}}
    Event{header=EventHeaderV4{timestamp=1570353445000, eventType=INTVAR, serverId=1, headerLength=19, dataLength=9, nextPosition=673, flags=0}, data=IntVarEventData{type=2, value=100137}}
    Event{header=EventHeaderV4{timestamp=1570353445000, eventType=QUERY, serverId=1, headerLength=19, dataLength=390, nextPosition=1082, flags=0}, data=QueryEventData{threadId=5, executionTime=0, errorCode=0, database='seasonal', sql='INSERT INTO `compose_good` (`compose_good_name`, `compose_good_price`, `compose_good_type`, `compose_good_status`, `compose_good_weight`, `compose_good_icon`, `compose_good_sales`, `comment_number`, `compose_good_describe`, `create_time`, `update_time`) VALUES ('陆旭', '0', '0', '0', '3000', '0', '0', '0', '0', '2019-11-01', '2019-10-06')'}}
    Event{header=EventHeaderV4{timestamp=1570353445000, eventType=XID, serverId=1, headerLength=19, dataLength=8, nextPosition=1109, flags=0}, data=XidEventData{xid=27}}
    
    

    From here you can extract whatever data you need from the events and apply your own logic to it.
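    For instance, instead of printing every event, you can branch on the event type and pull out just the executed SQL. A minimal sketch against the same mysql-binlog-connector-java 0.18.1 API used above (the file path is the one from the example; `QueryEventDump` is a hypothetical class name):

```java
import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

import java.io.File;
import java.io.IOException;

public class QueryEventDump {
    public static void main(String[] args) throws IOException {
        File binlogFile = new File("D:\\mysql\\mysql-8.0.13-winx64\\log\\mysql-bin.000001");
        BinaryLogFileReader reader =
                new BinaryLogFileReader(binlogFile, new EventDeserializer());
        try {
            for (Event event; (event = reader.readEvent()) != null; ) {
                // With statement-based logging, QUERY events carry the executed SQL
                if (event.getHeader().getEventType() == EventType.QUERY) {
                    QueryEventData data = event.getData();
                    System.out.println(data.getDatabase() + " -> " + data.getSql());
                }
            }
        } finally {
            reader.close();
        }
    }
}
```

    Against the binlog shown above, this would print only the BEGIN/UPDATE/INSERT statements, one per line, without the surrounding event headers.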

  • MySQL binlog series (2): parsing binlog with Java

    2015-12-22 23:34:37

    Before getting into the main topic, we need to know the binlog event types. Let's first look at which events our own binlog file contains (the original post shows a screenshot here; the highlighted column is the event_type).

    There are many binlog event types; see the MySQL internals documentation for details: http://dev.mysql.com/doc/internals/en/event-meanings.html

    (1) Event classes and interfaces in Open Replicator

    Open Replicator is a MySQL binlog parser written in Java. It first connects to MySQL (just like an ordinary MySQL slave), then receives and parses the binlog, and finally delivers the parsed binlog events to the application via callbacks.


    Every event implements the BinlogEventV4 interface.

    The BinlogEventV4 interface looks like this:

    /**
     * +=====================================+
     * | event  | timestamp         0 : 4    |
     * | header +----------------------------+
     * |        | type_code         4 : 1    |
     * |        +----------------------------+
     * |        | server_id         5 : 4    |
     * |        +----------------------------+
     * |        | event_length      9 : 4    |
     * |        +----------------------------+
     * |        | next_position    13 : 4    |
     * |        +----------------------------+
     * |        | flags            17 : 2    |
     * +=====================================+
     * | event  | fixed part       19 : y    |
     * | data   +----------------------------+
     * |        | variable part              |
     * +=====================================+
     * @author Jingqi Xu
     */
    public interface BinlogEventV4 {
    	
    	BinlogEventV4Header getHeader();
    }
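    The byte layout in the header comment above can be decoded with plain java.nio. A minimal stand-alone sketch (`EventHeaderV4Parser` is a hypothetical helper, not part of Open Replicator; all binlog integers are little-endian and unsigned):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Decodes the fixed 19-byte v4 binlog event header described above:
// timestamp 0:4, type_code 4:1, server_id 5:4, event_length 9:4,
// next_position 13:4, flags 17:2.
public class EventHeaderV4Parser {
    public long timestamp, serverId, eventLength, nextPosition;
    public int typeCode, flags;

    public static EventHeaderV4Parser parse(byte[] raw) {
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
        EventHeaderV4Parser h = new EventHeaderV4Parser();
        h.timestamp    = buf.getInt() & 0xFFFFFFFFL; // seconds since epoch
        h.typeCode     = buf.get() & 0xFF;           // see event-type table below
        h.serverId     = buf.getInt() & 0xFFFFFFFFL;
        h.eventLength  = buf.getInt() & 0xFFFFFFFFL; // header + data, in bytes
        h.nextPosition = buf.getInt() & 0xFFFFFFFFL; // offset of the next event
        h.flags        = buf.getShort() & 0xFFFF;
        return h;
    }
}
```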
    

    (2) Parsing binlog with Open Replicator

    First, my test environment: MySQL 5.1.61 with binlog_format set to ROW. This walkthrough only handles the insert, update, and delete event types. We start by wrapping these three event types in a new event class of our own, as shown below:

    public class LogEvent implements Serializable{
    
    	/**
    	 * Only covers delete, insert and update events
    	 */
    	private static final long serialVersionUID = 5503152746318421290L;
    	
    	private String eventId = null;
    	private String databaseName = null;
    	private String tableName = null;
    	private String  eventType  = null;
        private Long    timestamp  = null;
        private Long timestampRecepite = null;
        private String binlogName = null;
        private Long position = null;
        private Long nextPosition = null;
        private Long serverId = null;
        private Map<String, String> before =null;
        private Map<String, String> after = null;
        
        public LogEvent(){
        	
        }
        
        public LogEvent(final QueryEvent qe,String databaseName,String tableName){
        	this.init(qe);
        	this.databaseName=databaseName;
        	this.tableName=tableName;	
        }
        
        public LogEvent(final AbstractRowEvent re){
        	this.init(re);
        	TableMapEvent tableMapEvent =re.getTme();
        	this.databaseName=tableMapEvent.getDatabaseName().toString();
        	this.tableName=tableMapEvent.getTableName().toString();	
        }
        
        private void init(final BinlogEventV4 be){
        	this.eventId=UUID.randomUUID().toString();
        	BinlogEventV4Header header = be.getHeader();
        	this.binlogName = header.getBinlogName();
        	this.position = header.getPosition();
        	this.nextPosition = header.getNextPosition();
        	this.timestamp = header.getTimestamp();
        	this.timestampRecepite = header.getTimestampOfReceipt();
        	this.serverId=header.getServerId();
        	this.eventType=MySqlEventTypeIdToString.getInstance().get(header.getEventType());
        }
        
        
    
    	@Override
    	public String toString() {
    		StringBuilder builder = new StringBuilder();
    		builder.append("{ eventId:").append(eventId);
    		builder.append(",databaseName:").append(databaseName);
    		builder.append(",tableName:").append(tableName);
    		builder.append(",eventType:").append(eventType);
    		builder.append(",timestamp:").append(timestamp);
    		builder.append(",timestampRecepite:").append(timestampRecepite);
    		builder.append(",binlogName:").append(binlogName);
    		builder.append(",position:").append(position);
    		builder.append(",nextPosition:").append(nextPosition);
    		builder.append(",serverId:").append(serverId);
    		builder.append(",before:").append(before);
    		builder.append(",after:").append(after).append(" }");
    		return builder.toString();
    	}
    
    
    
    	public String getEventId() {
    		return eventId;
    	}
    
    	public void setEventId(String eventId) {
    		this.eventId = eventId;
    	}
    
    	public String getDatabaseName() {
    		return databaseName;
    	}
    
    	public void setDatabaseName(String databaseName) {
    		this.databaseName = databaseName;
    	}
    
    	public String getTableName() {
    		return tableName;
    	}
    
    	public void setTableName(String tableName) {
    		this.tableName = tableName;
    	}
    
    	public String getEventType() {
    		return eventType;
    	}
    
    	public void setEventType(String eventType) {
    		this.eventType = eventType;
    	}
    
    	public Long getTimestamp() {
    		return timestamp;
    	}
    
    	public void setTimestamp(Long timestamp) {
    		this.timestamp = timestamp;
    	}
    
    	public Long getTimestampRecepite() {
    		return timestampRecepite;
    	}
    
    	public void setTimestampRecepite(Long timestampRecepite) {
    		this.timestampRecepite = timestampRecepite;
    	}
    
    	public String getBinlogName() {
    		return binlogName;
    	}
    
    	public void setBinlogName(String binlogName) {
    		this.binlogName = binlogName;
    	}
    
    	public Long getPosition() {
    		return position;
    	}
    
    	public void setPosition(Long position) {
    		this.position = position;
    	}
    
    	public Long getNextPosition() {
    		return nextPosition;
    	}
    
    	public void setNextPosition(Long nextPosition) {
    		this.nextPosition = nextPosition;
    	}
    
    	public Long getServerId() {
    		return serverId;
    	}
    
    	public void setServerId(Long serverId) {
    		this.serverId = serverId;
    	}
    
    	public Map<String, String> getBefore() {
    		return before;
    	}
    
    	public void setBefore(Map<String, String> before) {
    		this.before = before;
    	}
    
    	public Map<String, String> getAfter() {
    		return after;
    	}
    
    	public void setAfter(Map<String, String> after) {
    		this.after = after;
    	}
    }
    Here before and after are maps holding every column of the affected row before and after the change (columnName → columnValue).
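    Building such a before/after map amounts to zipping the table's column names with the row's column values. A minimal stand-alone sketch (`RowMapper` is a hypothetical helper; in the real listener the names come from a schema lookup and the values from the row event's Column objects via toString):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowMapper {
    // Zip column names with column values into the columnName -> columnValue
    // shape used by LogEvent; null values become empty strings.
    public static Map<String, String> toMap(List<String> names, List<Object> values) {
        if (names == null || values == null || names.size() != values.size()) {
            return null; // schema and row are out of sync
        }
        Map<String, String> map = new LinkedHashMap<String, String>();
        for (int i = 0; i < names.size(); i++) {
            Object v = values.get(i);
            map.put(names.get(i), v == null ? "" : v.toString());
        }
        return map;
    }
}
```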


    Now for the main program:

    public class OpenReplicatorTest {
    	public static void main(String args[]) throws Exception {	
    		final OpenReplicator or = new OpenReplicator();
    		or.setUser("root");
    		or.setPassword("root");
    		or.setHost("xx.xxx.xx.xx");
    		or.setPort(3306);
    		or.setServerId(23);
    		or.setBinlogPosition(106);
    		or.setBinlogFileName("mysql-bin.000001");
    		
    		or.setBinlogEventListener(new NotificationListener());
    		or.start();
    	}
    }

    Set the listener: NotificationListener must implement the BinlogEventListener interface:

    public class NotificationListener implements BinlogEventListener{
    	
    	private static Logger logger = LoggerFactory.getLogger(NotificationListener.class);
        
    	private String host="xx.xx.xx.xx";
    	private Integer port=3306;
    	private String username="root";
    	private String password="root";
    	
    	public void onEvents(BinlogEventV4 event) {
    		if(event==null){
    			logger.error("binlog event is null");
    			return;
    		}
    
    		if(event instanceof UpdateRowsEvent){
    			UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent)event;
    			LogEvent logEvent = new LogEvent(updateRowsEvent);
    			
    			List<Pair<Row>> rows = updateRowsEvent.getRows();
    			List<Column> cols_after = null;
    			List<Column> cols_before = null;
    			for(Pair<Row> p : rows){
    				 Row after = p.getAfter();
    				 Row before = p.getBefore();
    				 cols_after = after.getColumns();
    				 cols_before = before.getColumns();
    				 break;
    			}
    			logEvent.setBefore(getMap(cols_before, updateRowsEvent.getTme().getDatabaseName().toString(), updateRowsEvent.getTme().getTableName().toString()));
    			logEvent.setAfter(getMap(cols_after, updateRowsEvent.getTme().getDatabaseName().toString(), updateRowsEvent.getTme().getTableName().toString()));
    			logger.info("update event is:"+logEvent);
    		}else if(event instanceof DeleteRowsEvent){
    			DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent)event;
    			LogEvent logEvent = new LogEvent(deleteRowsEvent);
    			List<Row> rows = deleteRowsEvent.getRows();
    			List<Column> before = null;
    			for(Row row:rows){
    				before = row.getColumns();
    				break;
    			}
    			logEvent.setBefore(getMap(before, deleteRowsEvent.getTme().getDatabaseName().toString(), deleteRowsEvent.getTme().getTableName().toString()));
    			logger.info("delete event is:"+logEvent);
    			
    		}else if(event instanceof WriteRowsEvent){
    			WriteRowsEvent writeRowsEvent = (WriteRowsEvent)event;
    			LogEvent logEvent = new LogEvent(writeRowsEvent);
    			List<Row> rows = writeRowsEvent.getRows();
    			List<Column> after = null;
    			for(Row row:rows){
    				after = row.getColumns();
    				break;
    			}
    			logEvent.setAfter(getMap(after, writeRowsEvent.getTme().getDatabaseName().toString(), writeRowsEvent.getTme().getTableName().toString()));
    			logger.info("write event is:"+logEvent);
    			
    		}	
    	}
    	
    	private Map<String, String> getMap(List<Column> cols,String databaseName,String tableName){
    		if(cols==null||cols.size()==0){
    			return null;
    		}
    		List<String> columnNames = new TableInfo(host,username,password, port).getColumns(databaseName, tableName);
    	    if(columnNames==null){
    	    	return null;
    	    }
    	    if(columnNames.size()!=cols.size()){
    	    	logger.error("the size does not match...");
    	    	return null;
    	    }
    	    Map<String, String> map = new HashMap<String, String>();
    	    for(int i=0;i<columnNames.size();i++){
    	    	if(cols.get(i).getValue()==null){
    	    		map.put(columnNames.get(i).toString(),"");
    	    	}else{
    	    		map.put(columnNames.get(i).toString(),cols.get(i).toString());
    	    	}
    	    	
    	    }
    	    return map;
    	}
    }

    The events provided by Open Replicator do not carry the table's column names: DeleteRowsEvent, UpdateRowsEvent, and WriteRowsEvent only contain the column values before and/or after the change. Since we want to assemble them into the before and after maps, we need a way to fetch the column names ourselves:

    public class TableInfo {
    	private static Logger logger = LoggerFactory.getLogger(TableInfo.class);
    	
    	/**
    	 * key:   databaseName + "." + tableName
    	 * value: column names
    	 */
    	private static Map<String, List<String>> columnsMap = new HashMap<String, List<String>>();
    	private String host;
    	private Integer port;
    	private String username;
    	private String password;
    	
    	public TableInfo(String host,String username,String password,Integer port){
    		this.host=host;
    		this.username=username;
    		this.password=password;
    		this.port = port;
    		if(columnsMap==null||columnsMap.size()==0){
    			MysqlConnection.setConnection(this.host,this.port,this.username,this.password);
    		    columnsMap = MysqlConnection.getColumns();
    		}
    	}
    	
    	public Map<String, List<String>> getMap(){
    		return columnsMap;
    	}
    	
    	public List<String> getColumns(String databaseName,String tableName){
    		if(StringUtils.isNullOrEmpty(databaseName)||StringUtils.isNullOrEmpty(tableName)){
    			return null;
    		}
    		String key = databaseName + "."+tableName;
    		List<String> list =null;
    		if(columnsMap.size()==0){
    			MysqlConnection.setConnection(this.host,this.port,this.username,this.password);
    		    columnsMap = MysqlConnection.getColumns();
    			list = columnsMap.get(key);
    		}else{
    			list=columnsMap.get(key);
    			if(list==null||list.size()==0){
    				MysqlConnection.setConnection(this.host,this.port,this.username,this.password);
    				columnsMap = MysqlConnection.getColumns();
    				list = columnsMap.get(key);
    			}
    			
    		}
    		return list;
    	}
    }

    The MysqlConnection implementation:

    public class MysqlConnection {
    
    	private static Connection conn;
    
    	private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
    	private static String host;
    	private static Integer port;
    	private static String user;
    	private static String password;
    
    	public static void setConnection(String mySQLHost, Integer mySQLPort, String mySQLUser,
    			String mySQLPassword) {
    		try {
    			if (conn == null || conn.isClosed()) {
    				Class.forName("com.mysql.jdbc.Driver");
    
    				conn = DriverManager.getConnection("jdbc:mysql://" + mySQLHost + ":" + mySQLPort
    						+ "/", mySQLUser, mySQLPassword);
    				logger.info("connected to mysql:{} : {}", mySQLHost, mySQLPort);
    				host = mySQLHost;
    				port = mySQLPort;
    				user = mySQLUser;
    				password = mySQLPassword;
    			}
    		} catch (Exception e) {
    			logger.error(e.getMessage(), e);
    		}
    	}
    
    	public static Connection getConnection() {
    		try {
    			if (conn == null || conn.isClosed()) {
    				setConnection(host, port, user, password);
    			}
    		} catch (Exception e) {
    			logger.error(e.getMessage(), e);
    		}
    		return conn;
    	}
    
    	public static Map<String, List<String>> getColumns(){
    		Map<String, List<String>> cols = new HashMap<String, List<String>>();
    		Connection conn = getConnection();
    		try {
    			DatabaseMetaData metaData = conn.getMetaData();
    			ResultSet r = metaData.getCatalogs();
    			String tableType[] = { "TABLE" };
    			while (r.next()) {
    				String databaseName = r.getString("TABLE_CAT");
    				ResultSet result = metaData.getTables(databaseName, null, null, tableType);
    				while (result.next()) {
    					String tableName = result.getString("TABLE_NAME");
    					String key = databaseName + "." + tableName;
    					ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);
    					cols.put(key, new ArrayList<String>());
    					while (colSet.next()) {
    						String column = colSet.getString("COLUMN_NAME");
    						cols.get(key).add(column);
    					}
    				}
    			}
    
    		} catch (SQLException e) {
    			logger.error(e.getMessage(), e);
    			return null;
    		}
    		return cols;
    	}
    }
    

    A helper class that maps an event type id to the event type name:

    public class MySqlEventTypeIdToString {
        private static Map<Integer, String> idToString = new HashMap<Integer, String>();
        // Singleton instance (missing in the original listing; without it
        // getInstance() would not compile)
        private static final MySqlEventTypeIdToString m = new MySqlEventTypeIdToString();
        private MySqlEventTypeIdToString() {
            Init();
        }
        public static MySqlEventTypeIdToString getInstance() {
            return m;
        }
        private void Init() {
            idToString.put(0,"UNKNOWN_EVENT");
            idToString.put(1,"START_EVENT_V3");
            idToString.put(2,"QUERY_EVENT");
            idToString.put(3,"STOP_EVENT");
            idToString.put(4,"ROTATE_EVENT");
            idToString.put(5,"INTVAR_EVENT");
            idToString.put(6,"LOAD_EVENT");
            idToString.put(7,"SLAVE_EVENT");
            idToString.put(8,"CREATE_FILE_EVENT");
            idToString.put(9,"APPEND_BLOCK_EVENT");
            idToString.put(10,"EXEC_LOAD_EVENT");
            idToString.put(11,"DELETE_FILE_EVENT");
            idToString.put(12,"NEW_LOAD_EVENT");
            idToString.put(13,"RAND_EVENT");
            idToString.put(14,"USER_VAR_EVENT");
            idToString.put(15,"FORMAT_DESCRIPTION_EVENT");
            idToString.put(16,"XID_EVENT");
            idToString.put(17,"BEGIN_LOAD_QUERY_EVENT");
            idToString.put(18,"EXECUTE_LOAD_QUERY_EVENT");
            idToString.put(19,"TABLE_MAP_EVENT");
            idToString.put(20,"PRE_GA_WRITE_ROWS_EVENT");
            idToString.put(21,"PRE_GA_UPDATE_ROWS_EVENT");
            idToString.put(22,"PRE_GA_DELETE_ROWS_EVENT");
            idToString.put(23,"WRITE_ROWS_EVENT");
            idToString.put(24,"UPDATE_ROWS_EVENT");
            idToString.put(25,"DELETE_ROWS_EVENT");
            idToString.put(26,"INCIDENT_EVENT");
            idToString.put(27,"HEARTBEAT_LOG_EVENT");
            idToString.put(28,"IGNORABLE_LOG_EVENT");
            idToString.put(29,"ROWS_QUERY_LOG_EVENT");
            idToString.put(30,"WRITE_ROWS_EVENT_V2");
            idToString.put(31,"UPDATE_ROWS_EVENT_V2");
            idToString.put(32,"DELETE_ROWS_EVENT_V2");
            idToString.put(33,"GTID_LOG_EVENT");
            idToString.put(34,"ANONYMOUS_GTID_LOG_EVENT");
            idToString.put(35,"PREVIOUS_GTIDS_LOG_EVENT");
        }
        public String get(Integer eventId) {
            return idToString.get(eventId);
        }
    }

    Running it produces:

    update event is: {
        eventId: a7acc3d0-7721-4ffe-84d4-4c2b7db5423a,
        databaseName: test,
        tableName: task,
        eventType: UPDATE_ROWS_EVENT,
        timestamp: 1450753740000,
        timestampRecepite: 1450887259271,
        binlogName: mysql-bin.000001,
        position: 248,
        nextPosition: 358,
        serverId: 23,
        before: {
            id=791,
            user_name=123,
            topology_path=,
            update_time=2015-08-05 10:53:57.0,
            status=1,
            department=,
            name=user01,
            create_time=2015-12-21 19:30:36.0,
            user_id=-1
        },
        after: {
            id=791,
            user_name=123,
            topology_path=,
            update_time=2015-08-05 10:53:57.0,
            status=2,
            department=,
            name=user02,
            create_time=2015-12-22 11:09:00.0,
            user_id=-1
        }
    }
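The id-to-name table above can be exercised standalone. The sketch below is illustrative (the class name `EventTypeLookup` and the `getOrDefault` fallback are not from the original project); it shows the lookup pattern with a few of the ids listed:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of an event-type id -> name lookup, using a subset of
// the ids from the table above. Unknown ids fall back to a marker string
// instead of returning null.
public class EventTypeLookup {
    private static final Map<Integer, String> ID_TO_STRING = new HashMap<>();
    static {
        ID_TO_STRING.put(19, "TABLE_MAP_EVENT");
        ID_TO_STRING.put(30, "WRITE_ROWS_EVENT_V2");
        ID_TO_STRING.put(31, "UPDATE_ROWS_EVENT_V2");
        ID_TO_STRING.put(32, "DELETE_ROWS_EVENT_V2");
    }

    public static String name(int eventId) {
        return ID_TO_STRING.getOrDefault(eventId, "UNKNOWN_EVENT(" + eventId + ")");
    }

    public static void main(String[] args) {
        System.out.println(name(31)); // UPDATE_ROWS_EVENT_V2
        System.out.println(name(99)); // UNKNOWN_EVENT(99)
    }
}
```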







  • Parsing TiDB binlog in Java: TiDB serializes its binlog with protobuf and pushes it to Kafka; copy the proto definitions into a binlog.proto file and add the Maven dependencies below.

    1. Find the binlog section of the official TiDB documentation:
    https://docs.pingcap.com/zh/tidb/stable/binlog-slave-client#binlog-slave-client-%E7%94%A8%E6%88%B7%E6%96%87%E6%A1%A3

    The documentation explains that TiDB serializes its binlog with protobuf and sends it to Kafka.

    2. Copy the protobuf message definitions from the documentation into a binlog.proto file.
    3. Add the following dependencies to the Maven project's pom:

            <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>3.9.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util -->
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java-util</artifactId>
                <version>3.9.1</version>
            </dependency>
    
          <build>
            <extensions>
                <extension>
                    <groupId>kr.motd.maven</groupId>
                    <artifactId>os-maven-plugin</artifactId>
                    <version>1.5.0.Final</version>
                </extension>
            </extensions>
    
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
    
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.xolstice.maven.plugins</groupId>
                    <artifactId>protobuf-maven-plugin</artifactId>
                    <version>0.5.1</version>
                    <extensions>true</extensions>
                    <configuration>
                        <protoSourceRoot>${project.basedir}/src/main/java/com/zhangna/tidb/binlog/proto</protoSourceRoot>
                        <outputDirectory>${project.basedir}/src/main/java/com/zhangna/tidb/binlog/proto</outputDirectory>
                        <!-- Whether to empty outputDirectory before generating the Java files;
                             defaults to true. When false, files with the same name are still overwritten. -->
                        <clearOutputDirectory>false</clearOutputDirectory>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>test-compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

            </plugins>
        </build>
    
    

    4. Place the binlog.proto, descriptor.proto, and gogo.proto files in the project module.

    5. After mvn clean and mvn compile, the Java classes generated from binlog.proto appear in the configured output directory.

    6. Write code to parse further. Example below.

    Note: in the Kafka consumer, both the key and value deserializers must be ByteArrayDeserializer; otherwise the payload cannot be parsed.
    
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "binlog_consumer");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
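Why bytes and not strings: protobuf payloads are arbitrary binary, and decoding them as UTF-8 (which a String deserializer would do) replaces invalid sequences irreversibly. A stdlib-only sketch (the sample bytes are made up to contain invalid UTF-8):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Demonstrates why a String deserializer corrupts protobuf payloads:
// round-tripping arbitrary bytes through a UTF-8 String is lossy.
public class WhyByteArray {
    static boolean survivesStringRoundTrip(byte[] payload) {
        String asString = new String(payload, StandardCharsets.UTF_8);
        return Arrays.equals(payload, asString.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // 0x92 / 0xff are not valid UTF-8; they get replaced with U+FFFD.
        byte[] protobufLike = {0x0a, (byte) 0x92, (byte) 0xff, 0x10, 0x01};
        System.out.println(survivesStringRoundTrip(protobufLike)); // false
        byte[] asciiOnly = {0x61, 0x62, 0x63};
        System.out.println(survivesStringRoundTrip(asciiOnly)); // true
    }
}
```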
    
    
    package com.zhangna.tidb.binlog.service;
    
    import com.zhangna.tidb.binlog.util.BaseException;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     * @Author zhangna
     * @Date 2019-12-03 20:30
     * @Description
     */
    public interface ParseBinLogService {
    
    	List<Map< String, Object >> parseBinLog2Data(byte[] record) throws BaseException;
    
    }
    
    
    package com.zhangna.tidb.binlog.service;
    
    import com.google.common.collect.Maps;
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.zhangna.tidb.binlog.enums.TableEnums;
    import com.zhangna.tidb.binlog.proto.BinLogInfo;
    import com.zhangna.tidb.binlog.util.BaseException;
    import com.zhangna.tidb.binlog.util.ServiceUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.collections4.CollectionUtils;
    import org.springframework.stereotype.Service;
    
    import java.util.*;
    
    /**
     * @Author zhangna
     * @Date 2019-12-03 20:30
     * @Description
     */
    @Service
    @Slf4j
    public class ParseBinLogServiceImpl implements ParseBinLogService {
    
    	@Override
    	public List<Map< String, Object >> parseBinLog2Data(byte[] record) throws BaseException {
    
    		return ServiceUtil.execute(() -> parseTable(record),
    				throwable -> log.error("parse binlog failed. t:{}", throwable));
    
    	}
    
    
    	private List<Map< String, Object >> parseTable(byte[] record){
    
    		if(Objects.isNull(record)){
    			return null;
    		}
    
    
    		BinLogInfo.Binlog binlog = null;
    
    		try {
    			binlog = BinLogInfo.Binlog.parseFrom(record);
    		} catch (InvalidProtocolBufferException e) {
    			throw new BaseException("parse binlog is failed");
    		}
    		if(Objects.isNull(binlog)){
    			return null;
    		}
    		BinLogInfo.DMLData dmlData = binlog.getDmlData();
    		if(Objects.isNull(dmlData)){
    			return null;
    		}
    
    		List< BinLogInfo.Table > tablesList = dmlData.getTablesList();
    
    		List< String > tableNames = TableEnums.getTableNames();
    
    		List<Map< String, Object >> rs= new ArrayList<>();
    		for( BinLogInfo.Table table : tablesList){
    			String tableName = table.getTableName();
    			log.info("tableName:{}",tableName);
    			if(!tableNames.contains(tableName)){
    				continue;
    			}
    
    			List< Map< String, Object > > result = this.getResult(table);
    			rs.addAll(result);
    		}
    
    		return rs;
    
    	}
    
    	private List<String> parseColumn(BinLogInfo.Table table){
    
    		List< BinLogInfo.ColumnInfo > columnInfoList = table.getColumnInfoList();
    		if(CollectionUtils.isEmpty(columnInfoList)){
    			return Collections.EMPTY_LIST;
    		}
    		List<String> columnKeys = new ArrayList<>();
    		columnInfoList.forEach(columnInfo -> {
    			columnKeys.add(columnInfo.getName());
    		});
    
    		return columnKeys;
    
    	}
    
    	private List< BinLogInfo.TableMutation > getTableMutation(BinLogInfo.Table table){
    
    		if(Objects.isNull(table)){
    			return null;
    		}
    		return table.getMutationsList();
    	}
    
    	private List<List<Object>> parseColumnValue(BinLogInfo.Table table) {
    
    		List<List<Object>> rs = new ArrayList<>();
    
    		List< BinLogInfo.TableMutation > tableMutations = getTableMutation(table);
    		if (CollectionUtils.isEmpty(tableMutations)) {
    			return Collections.EMPTY_LIST;
    		}
    
    		for (BinLogInfo.TableMutation tableMutation : tableMutations) {
    
    			BinLogInfo.Row rowList = tableMutation.getRow();
    			if (Objects.isNull(rowList)) {
    				continue;
    			}
    
    
    			List< BinLogInfo.Column > columnsList = rowList.getColumnsList();
    			if (CollectionUtils.isEmpty(columnsList)) {
    				continue;
    			}
    
    			List< Object > columnValues = new ArrayList<>();
    			// Heuristic: a non-zero int64 wins, otherwise the string value is
    			// used. An int column whose value is 0 therefore falls into the
    			// string branch, which can misalign columns; prefer the generated
    			// has-accessors if the proto provides them.
    			columnsList.forEach(column -> {
    				String stringValue = column.getStringValue();
    				long int64Value = column.getInt64Value();
    				if (int64Value != 0) {
    					columnValues.add(int64Value);
    				}
    				if (null != stringValue && int64Value == 0) {
    					columnValues.add(stringValue);
    				}
    			});
    
    			rs.add(columnValues);
    
    		}
    		return rs;
    	}
    
    
    	private String getOperatorType(BinLogInfo.Table table){
    		List< BinLogInfo.TableMutation > tableMutations = getTableMutation(table);
    
    		if (CollectionUtils.isEmpty(tableMutations)){
    			return null;
    		}
    
    		BinLogInfo.TableMutation tableMutation = tableMutations.get(0);
    		BinLogInfo.MutationType type = tableMutation.getType();
    		if (Objects.isNull(type)){
    			return null;
    		}
    
    		return type.toString();
    	}
    
    	private List<Map< String, Object >> getResult(BinLogInfo.Table table) {
    
    		List<Map< String, Object >> rs = new ArrayList<>();
    
    		List< String > columnKeys = parseColumn(table);
    		List< List< Object > > columnValues = parseColumnValue(table);
    
    
    		if (CollectionUtils.isEmpty(columnKeys) || CollectionUtils.isEmpty(columnValues) ) {
    			return null;
    		}
    
    		Map< Integer, String > tempFieldMap = Maps.newHashMapWithExpectedSize(columnKeys.size());

    		String operatorType = getOperatorType(table);

    		// Map ordinal position -> column name. An indexed loop avoids the
    		// O(n^2) indexOf lookup and stays correct when column names repeat.
    		for (int i = 0; i < columnKeys.size(); i++) {
    			tempFieldMap.put(i, columnKeys.get(i));
    		}

    		for (List< Object > objects : columnValues) {
    			// Create a fresh map per row; reusing a single map would leave
    			// every entry of rs pointing at the same (last) row.
    			Map< String, Object > resultMap = new HashMap<>();
    			for (int i = 0; i < objects.size(); i++) {
    				resultMap.put(tempFieldMap.get(i), objects.get(i));
    			}
    			resultMap.put("operatorType", operatorType);
    			rs.add(resultMap);
    		}

    		return rs;
    	}
    
    
    
    }
    
    

    github代码示例:https://github.com/zhangna-java/sunshine.git

  • Reading a binlog file offline with mysql-binlog-connector-java: Maven configuration, a sample reader, and FAQ notes from the library's README.

    Maven configuration:

        <dependency>
            <groupId>com.github.shyiko</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.13.0</version>
        </dependency>

    Java file:

    import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
    import com.github.shyiko.mysql.binlog.event.Event;
    import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
    import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

    import java.io.File;
    import java.io.IOException;

    class Sample {

        public static void main(String[] args) throws IOException {
            String filePath = "D:\\DATA\\mysql-bin.000987";
            File binlogFile = new File(filePath);
            EventDeserializer eventDeserializer = new EventDeserializer();
            eventDeserializer.setChecksumType(ChecksumType.CRC32);
            BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
            try {
                for (Event event; (event = reader.readEvent()) != null; ) {
                    System.out.println(event.toString());
                }
            } finally {
                reader.close();
            }
        }
    }

    Execution log:

    Event{header=EventHeaderV4{timestamp=1501376071000, eventType=TABLE_MAP, serverId=3366725048, headerLength=19, dataLength=43, nextPosition=494953, flags=0}, data=TableMapEventData{tableId=72, database='mysql', table='ha_health_check', columnTypes=8, -2, columnMetadata=0, 65027, columnNullability={0}}}

    Event{header=EventHeaderV4{timestamp=1501376071000, eventType=UPDATE_ROWS, serverId=3366725048, headerLength=19, dataLength=37, nextPosition=495009, flags=0}, data=UpdateRowsEventData{tableId=72, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[

    {before=[1501376059857, m], after=[1501376071917, m]}

    ]}}
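The two log entries above show the tableId indirection: the UPDATE_ROWS event carries only tableId=72, and the preceding TABLE_MAP event maps 72 to mysql.ha_health_check. A minimal sketch of the cache a consumer would keep (class and method names here are illustrative, not part of the library):

```java
import java.util.HashMap;
import java.util.Map;

// Remembers the latest TABLE_MAP per tableId so that later row events,
// which carry only the numeric tableId, can be resolved to schema.table.
public class TableMapCache {
    private final Map<Long, String> tableById = new HashMap<>();

    public void onTableMap(long tableId, String database, String table) {
        tableById.put(tableId, database + "." + table);
    }

    public String resolve(long tableId) {
        return tableById.getOrDefault(tableId, "<unknown:" + tableId + ">");
    }

    public static void main(String[] args) {
        TableMapCache cache = new TableMapCache();
        cache.onTableMap(72, "mysql", "ha_health_check");
        System.out.println(cache.resolve(72)); // mysql.ha_health_check
    }
}
```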

    Frequently Asked Questions

    Q. How does a typical transaction look like?

    A. GTID event (if gtid_mode=ON) -> QUERY event with "BEGIN" as sql -> ... -> XID event | QUERY event with "COMMIT" or "ROLLBACK" as sql.
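That sequence can be used to group a raw event stream into transactions. A simplified sketch, with events modeled as plain strings rather than the library's Event type:

```java
import java.util.ArrayList;
import java.util.List;

// Groups a binlog event stream into transactions, following the FAQ's
// sequence: QUERY "BEGIN" opens a transaction, XID or QUERY "COMMIT"
// closes it; events in between belong to the current transaction.
public class TxnGrouper {
    public static List<List<String>> group(List<String> events) {
        List<List<String>> txns = new ArrayList<>();
        List<String> current = null;
        for (String e : events) {
            if (e.equals("QUERY:BEGIN")) {
                current = new ArrayList<>();
            } else if (e.equals("XID") || e.equals("QUERY:COMMIT")) {
                if (current != null) {
                    txns.add(current);
                    current = null;
                }
            } else if (current != null) {
                current.add(e);
            }
        }
        return txns;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("GTID", "QUERY:BEGIN", "TABLE_MAP", "UPDATE_ROWS", "XID");
        System.out.println(group(stream)); // [[TABLE_MAP, UPDATE_ROWS]]
    }
}
```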

    Q. EventData for inserted/updated/deleted rows has no information about table (except for some weird id). How do I make sense out of it?

    A. Each WriteRowsEventData/UpdateRowsEventData/DeleteRowsEventData event is preceded by a TableMapEventData event which contains the schema and table name. If for some reason you need to know column names (types, etc.), the easiest way is to

    select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, COLUMN_DEFAULT, IS_NULLABLE,
           DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, CHARACTER_OCTET_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE,
           CHARACTER_SET_NAME, COLLATION_NAME from INFORMATION_SCHEMA.COLUMNS;
    # see https://dev.mysql.com/doc/refman/5.6/en/columns-table.html for more information

    (yes, binary log DOES NOT include that piece of information).

    You can find JDBC snippet here.

    Connection connection = ...
    DatabaseMetaData metaData = connection.getMetaData();
    ResultSet tableResultSet = metaData.getTables(null, "public", null, new String[]{"TABLE"});
    try {
        while (tableResultSet.next()) {
            String tableName = tableResultSet.getString("TABLE_NAME");
            ResultSet columnResultSet = metaData.getColumns(null, "public", tableName, null);
            try {
                while (columnResultSet.next()) {
                    String columnName = columnResultSet.getString("COLUMN_NAME");
                    ...
                }
            } finally {
                columnResultSet.close();
            }
        }
    } finally {
        tableResultSet.close();
    }
