精华内容
下载资源
问答
  • 自定义数据源 Druid 引入数据源 切换 Druid 数据源 配置 Druid 数据源参数 配置 Druid 数据源监控 配置 Druid 后台管理 Servlet 配置 Druid web 监控 filter DRUID 简介 1、Druid 是阿里巴巴开源平台上一个...

    目录

    DRUID 数据源概述

    自定义数据源 Druid

    引入 Druid 数据源

    切换 Druid 数据源

    配置 Druid 数据源参数

    配置 Druid 数据源监控

    配置 Druid 后台管理 Servlet

    配置 Druid web 监控 filter


    DRUID 数据源概述

    1、Druid 是阿里巴巴开源平台上一个数据库连接池实现,结合了 C3P0、DBCP、PROXOOL 等 DB 池的优点,同时加入了日志监控

    2、Druid 可以很好的监控 DB 池连接和 SQL 的执行情况,天生就是针对监控而生的 DB 连接池

    3、《Spring Boot 默认数据源 HikariDataSource 与 JdbcTemplate》中已经介绍 Spring Boot 2.0 以上默认使用 Hikari 数据源,可以说 Hikari 与 Driud 都是当前 Java Web 上最优秀的数据源

    4、本文承接《Spring Boot 默认数据源 HikariDataSource 与 JdbcTemplate》,重点介绍 Spring Boot 如何集成 Druid 数据源,如何实现数据库监控

    com.alibaba.druid.pool.DruidDataSource 基本配置参数
    配置 缺省值 说明
    name   配置这个属性的意义在于,如果存在多个数据源,监控的时候可以通过名字来区分开来。 
    如果没有配置,将会生成一个名字,格式是:"DataSource-" + System.identityHashCode(this)
    jdbcUrl   连接数据库的url,不同数据库不一样。例如: 
    mysql : jdbc:mysql://10.20.153.104:3306/druid2 
    oracle : jdbc:oracle:thin:@10.20.149.85:1521:ocnauto
    username   连接数据库的用户名
    password   连接数据库的密码。如果你不希望密码直接写在配置文件中,可以使用ConfigFilter。详细看这里:https://github.com/alibaba/druid/wiki/%E4%BD%BF%E7%94%A8ConfigFilter
    driverClassName 根据url自动识别 这一项可配可不配,如果不配置druid会根据url自动识别dbType,然后选择相应的driverClassName(建议配置下)
    initialSize 0 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时
    maxActive 8 最大连接池数量
    maxIdle 8 已经不再使用,配置了也没效果
    minIdle   最小连接池数量
    maxWait   获取连接时最大等待时间,单位毫秒。配置了maxWait之后,缺省启用公平锁,并发效率会有所下降,如果需要可以通过配置useUnfairLock属性为true使用非公平锁。
    poolPreparedStatements false 是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大,比如说oracle。在mysql下建议关闭。
    maxOpenPreparedStatements -1 要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。在Druid中,不会存在Oracle下PSCache占用内存过多的问题,可以把这个数值配置大一些,比如说100
    validationQuery   用来检测连接是否有效的sql,要求是一个查询语句。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会其作用。
    testOnBorrow true 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。
    testOnReturn false 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能
    testWhileIdle false 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
    timeBetweenEvictionRunsMillis   有两个含义: 
    1) Destroy线程会检测连接的间隔时间2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明
    numTestsPerEvictionRun   不再使用,一个DruidDataSource只支持一个EvictionRun
    minEvictableIdleTimeMillis    
    connectionInitSqls   物理连接初始化的时候执行的sql
    exceptionSorter 根据dbType自动识别 当数据库抛出一些不可恢复的异常时,抛弃连接
    filters   属性类型是字符串,通过别名的方式配置扩展插件,常用的插件有: 
    监控统计用的filter:stat日志用的filter:log4j防御sql注入的filter:wall
    proxyFilters  

    类型是List<com.alibaba.druid.filter.Filter>,如果同时配置了filters和proxyFilters,是组合关系,并非替换关系

    自定义数据源 Druid

    引入 Druid 数据源

    1、第一步需要在应用的 pom.xml 文件中添加上 Druid 数据源依赖,可以从 Maven 仓库官网 Maven Repository 中获取.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>www.wmx.com.horse</groupId>
        <artifactId>horse</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>horse</name>
        <description>Demo project for Spring Boot</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.4.RELEASE</version>
            <relativePath/>
            <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <!-- 引入Spring封装的jdbc-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>
    
            <!-- 引入html模板引擎Thymeleaf-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-thymeleaf</artifactId>
            </dependency>
    
            <!-- 因为web项目启动模块-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- 引入mysql数据库连接驱动-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <scope>runtime</scope>
            </dependency>
    
            <!-- 引入 Druid 数据源依赖:https://mvnrepository.com/artifact/com.alibaba/druid -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.1.9</version>
            </dependency>
    
            <!-- 引入Spring Boot 测试模块-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <!-- Spring Boot 打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    切换 Druid 数据源

    1、《Spring Boot 默认数据源 HikariDataSource 与 JdbcTemplate中已经说过 Spring Boot 2.0 以上默认使用 com.zaxxer.hikari.HikariDataSource 数据源

    2、但可以 通过 spring.datasource.type 指定数据源,可以从 Spring Boot 官方文档 查看:

    spring.datasource.type= # Fully qualified name of the connection pool implementation to use. By default, it is auto-detected from the classpath.

    spring:
      datasource:
        username: root
        password: root
        url: jdbc:mysql://192.168.58.129:3307/horse?characterEncoding=UTF-8
        driver-class-name: com.mysql.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource

    高版本的 spring boot 搭配 mysql 驱动版本较高时,如 mysql-connector-java:8.0.16,此时 driver-class-name 的值要带 cj;url 的值要带时区 serverTimezone,如:
    url: jdbc:mysql://127.0.0.1:3306/test?characterEncoding=UTF-8&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver

    3、数据源切换之后,同理可以注入 DataSource,然后获取到它,输出一看便知是否成功切换:

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import javax.annotation.Resource;
    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.SQLException;
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class HorseApplicationTests {
        /**
         * Spring Boot 默认已经配置好了数据源,程序员可以直接 DI 注入然后使用即可
         */
        @Resource
        DataSource dataSource;
        @Test
        public void contextLoads() throws SQLException {
            System.out.println("数据源>>>>>>" + dataSource.getClass());
            Connection connection = dataSource.getConnection();
    
            System.out.println("连接>>>>>>>>>" + connection);
            System.out.println("连接地址>>>>>" + connection.getMetaData().getURL());
            connection.close();
        }
    }
    

    4、如下所示 数据源切换成功,Druid 数据源切换成功之后,便是要考虑设置它的参数,就如同以前 c3p0、dbcp 一样需要设置数据源连接初始化大小、最大连接数、等待时间、最小连接数、以及数据库监控 等等。

    2018-08-20 08:54:08.276  INFO 8128 --- [           main] com.lct.www.HorseApplicationTests        : Started HorseApplicationTests in 3.181 seconds (JVM running for 4.892)
    数据源>>>>>>class com.alibaba.druid.pool.DruidDataSource
    2018-08-20 08:54:08.523  INFO 8128 --- [           main] com.alibaba.druid.pool.DruidDataSource   : {dataSource-1} inited
    连接>>>>>>>>>com.mysql.jdbc.JDBC4Connection@7026b7ee
    连接地址>>>>>jdbc:mysql://192.168.58.129:3307/horse?characterEncoding=UTF-8
    2018-08-20 08:54:08.897  INFO 8128 --- [       Thread-2] o.s.w.c.s.GenericWebApplicationContext   : Closing org.springframework.web.context.support.GenericWebApplicationContext@7e990ed7: startup date [Mon Aug 20 08:54:05 CST 2018]; root of context hierarchy
    2018-08-20 08:54:08.904  INFO 8128 --- [       Thread-2] com.alibaba.druid.pool.DruidDataSource   : {dataSource-1} closed

    Process finished with exit code 0

    配置 Druid 数据源参数

    1、如同以前 c3p0、dbcp 数据源可以设置数据源连接初始化大小、最大连接数、等待时间、最小连接数 等一样,Druid 数据源同理可以进行设置

    2、Druid 数据源参数配置在全局配置文件中即可,如下所示:

    spring:
      datasource:
        username: root
        password: root
        url: jdbc:mysql://192.168.58.129:3307/horse?characterEncoding=UTF-8
        driver-class-name: com.mysql.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
    
    #上半区公共部分对应的是 org.springframework.boot.autoconfigure.jdbc.DataSourceProperties 中的属性
    
    #下半区属性对应的是 com.alibaba.druid.pool.DruidDataSource 中的属性,Spring Boot 默认是不注入不了这些属性值的,需要自己绑定
    #druid 数据源专有配置
        initialSize: 5
        minIdle: 5
        maxActive: 20
        maxWait: 60000
        timeBetweenEvictionRunsMillis: 60000
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1 FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        poolPreparedStatements: true
    #配置监控统计拦截的filters,stat:监控统计、log4j:日志记录、wall:防御sql注入
    #如果允许时报错  java.lang.ClassNotFoundException: org.apache.log4j.Priority
    #则导入 log4j 依赖即可,Maven 地址: https://mvnrepository.com/artifact/log4j/log4j
        filters: stat,wall,log4j
        maxPoolPreparedStatementPerConnectionSize: 20
        useGlobalDataSourceStat: true
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500

    3、下半区 Druid 数据源的专有属性对应的是 com.alibaba.druid.pool.DruidDataSource 中的属性,虽然切换为 Druid 数据源之后,Spring Boot 会自动生成 DruidDataSource 并放入容器中供程序员使用,但是它并不会自动绑定配置文件的参

    4、所以需要程序员自己为 com.alibaba.druid.pool.DruidDataSource 绑定全局配置文件中的参数,再添加到容器中,而不再使用 Spring Boot 的自动生成了

    5、如下所示,自己添加 DruidDataSource 组件到容器中,并绑定属性:

    import com.alibaba.druid.pool.DruidDataSource;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import javax.sql.DataSource;
    /**
     * Created by Administrator on 2018/8/20 0020.
     * Druid 数据源配置类
     */
    @Configuration
    public class DruidConfig {
        /**
         * 将自定义的 Druid 数据源添加到容器中,不再让 Spring Boot 自动创建
         * 这样做的目的是:绑定全局配置文件中的 druid 数据源属性到 com.alibaba.druid.pool.DruidDataSource
         * 从而让它们生效
         * @ConfigurationProperties(prefix = "spring.datasource"):作用就是将 全局配置文件中 前缀为 spring.datasource
         * 的属性值注入到 com.alibaba.druid.pool.DruidDataSource 的同名参数中
         *
         * @return
         */
        @ConfigurationProperties(prefix = "spring.datasource")
        @Bean
        public DataSource druidDataSource() {
            return new DruidDataSource();
        }
    }
    

    6、对于 @ConfigurationProperties 绑定配置文件参数不熟悉时,可以参考《Spring Boot 全局配置文件

    7、现在可以获取容器中的 DataSource 转为 DruidDataSource ,然后取值看配置文件中的参数是否已经生效,也可以直接 Debug。

    import com.alibaba.druid.pool.DruidDataSource;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import javax.annotation.Resource;
    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.SQLException;
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class HorseApplicationTests {
        /**
         * Spring Boot 默认已经配置好了数据源,程序员可以直接 DI 注入然后使用即可
         */
        @Resource
        DataSource dataSource;
        @Test
        public void contextLoads() throws SQLException {
            System.out.println("数据源>>>>>>" + dataSource.getClass());
            Connection connection = dataSource.getConnection();
    
            System.out.println("连接>>>>>>>>>" + connection);
            System.out.println("连接地址>>>>>" + connection.getMetaData().getURL());
    
            DruidDataSource druidDataSource = (DruidDataSource) dataSource;
            System.out.println("druidDataSource 数据源最大连接数:" + druidDataSource.getMaxActive());
            System.out.println("druidDataSource 数据源初始化连接数:" + druidDataSource.getInitialSize());
    
            connection.close();
        }
    }
    

    8、控制台输出如下,可见配置参数已经生效:

    2018-08-20 10:21:48.498  INFO 18284 --- [           main] com.lct.www.HorseApplicationTests        : Started HorseApplicationTests in 3.494 seconds (JVM running for 4.763)
    数据源>>>>>>class com.alibaba.druid.pool.DruidDataSource
    log4j:WARN No appenders could be found for logger (druid.sql.Connection).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    2018-08-20 10:21:48.921  INFO 18284 --- [           main] com.alibaba.druid.pool.DruidDataSource   : {dataSource-1} inited
    连接>>>>>>>>>com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@37d871c2
    连接地址>>>>>jdbc:mysql://192.168.58.129:3307/horse?characterEncoding=UTF-8
    druidDataSource 数据源最大连接数:20
    druidDataSource 数据源初始化连接数:5
    2018-08-20 10:21:48.936  INFO 18284 --- [       Thread-2] o.s.w.c.s.GenericWebApplicationContext   : Closing org.springframework.web.context.support.GenericWebApplicationContext@58e1d9d: startup date [Mon Aug 20 10:21:45 CST 2018]; root of context hierarchy
    2018-08-20 10:21:48.948  INFO 18284 --- [       Thread-2] com.alibaba.druid.pool.DruidDataSource   : {dataSource-1} closed

    Process finished with exit code 0

    配置 Druid 数据源监控

    配置 Druid 后台管理 Servlet

    1、Druid 数据源具有监控的功能,并提供了一个 web 界面方便用户查看,类似安装 路由器 时,人家也提供了一个默认的 web 页面。

    2、所以第一步需要设置 Druid 的后台管理页面,比如 登录账号、密码 等

    3、配置一个管理后台的 Servlet,因为使用的是内置 Servlet 容器,所以可以参考《 Web 项目 tiger 之12 注册 Servlet 三大组件之 Servlet

    import com.alibaba.druid.pool.DruidDataSource;
    import com.alibaba.druid.support.http.StatViewServlet;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.boot.web.servlet.ServletRegistrationBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import javax.sql.DataSource;
    import java.util.HashMap;
    import java.util.Map;
    /**
     * Created by Administrator on 2018/8/20 0020.
     * Druid 数据源配置类
     */
    @Configuration
    public class DruidConfig {
        /**
         * 将自定义的 Druid 数据源添加到容器中,不再让 Spring Boot 自动创建
         * 这样做的目的是:绑定全局配置文件中的 druid 数据源属性到 com.alibaba.druid.pool.DruidDataSource
         * 从而让它们生效
         *
         * @return
         * @ConfigurationProperties(prefix = "spring.datasource"):作用就是将 全局配置文件中 前缀为 spring.datasource
         * 的属性值注入到 com.alibaba.druid.pool.DruidDataSource 的同名参数中
         */
        @ConfigurationProperties(prefix = "spring.datasource")
        @Bean
        public DataSource druidDataSource() {
            return new DruidDataSource();
        }
        /**
         * 配置 Druid 监控 之  管理后台的 Servlet
         * 内置 Servler 容器时没有web.xml 文件,所以使用 Spring Boot 的注册 Servlet 方式
         */
        @Bean
        public ServletRegistrationBean statViewServlet() {
            ServletRegistrationBean bean = new ServletRegistrationBean(new StatViewServlet(),
                    "/druid/*");
    
            /**
             * loginUsername:Druid 后台管理界面的登录账号
             * loginPassword:Druid 后台管理界面的登录密码
             * allow:Druid 后台允许谁可以访问
             *      initParams.put("allow", "localhost"):表示只有本机可以访问
             *      initParams.put("allow", ""):为空或者为null时,表示允许所有访问
             * deny:Druid 后台拒绝谁访问
             *      initParams.put("deny", "192.168.1.20");表示禁止此ip访问
             */
            Map<String, String> initParams = new HashMap<>();
            initParams.put("loginUsername", "admin");
            initParams.put("loginPassword", "123456");
            initParams.put("allow", "");
            /*initParams.put("deny", "192.168.1.20");*/
    
            /** 设置初始化参数*/
            bean.setInitParameters(initParams);
            return bean;
        }
    }
    

    4、这些参数可以在 com.alibaba.druid.support.http.StatViewServlet 的父类 com.alibaba.druid.support.http.ResourceServlet 中找到

    5、运行应用,测试结果

    配置 Druid web 监控 filter

    1、这个过滤器的作用就是统计 web 应用请求中所有的数据库信息,比如 发出的 sql 语句,sql 执行的时间、请求次数、请求的 url 地址、以及seesion 监控、数据库表的访问次数 等等。

    import com.alibaba.druid.pool.DruidDataSource;
    import com.alibaba.druid.support.http.StatViewServlet;
    import com.alibaba.druid.support.http.WebStatFilter;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.boot.web.servlet.FilterRegistrationBean;
    import org.springframework.boot.web.servlet.ServletRegistrationBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import javax.sql.DataSource;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    /**
     * Created by Administrator on 2018/8/20 0020.
     * Druid 数据源配置类
     */
    @Configuration
    public class DruidConfig {
        /**
         * 将自定义的 Druid 数据源添加到容器中,不再让 Spring Boot 自动创建
         * 这样做的目的是:绑定全局配置文件中的 druid 数据源属性到 com.alibaba.druid.pool.DruidDataSource
         * 从而让它们生效
         *
         * @return
         * @ConfigurationProperties(prefix = "spring.datasource"):作用就是将 全局配置文件中 前缀为 spring.datasource
         * 的属性值注入到 com.alibaba.druid.pool.DruidDataSource 的同名参数中
         */
        @ConfigurationProperties(prefix = "spring.datasource")
        @Bean
        public DataSource druidDataSource() {
            return new DruidDataSource();
        }
        /**
         * 配置 Druid 监控 之  管理后台的 Servlet
         * 内置 Servler 容器时没有web.xml 文件,所以使用 Spring Boot 的注册 Servlet 方式
         */
        @Bean
        public ServletRegistrationBean statViewServlet() {
            ServletRegistrationBean bean = new ServletRegistrationBean(new StatViewServlet(),
                    "/druid/*");
    
            /**
             * loginUsername:Druid 后台管理界面的登录账号
             * loginPassword:Druid 后台管理界面的登录密码
             * allow:Druid 后台允许谁可以访问
             *      initParams.put("allow", "localhost"):表示只有本机可以访问
             *      initParams.put("allow", ""):为空或者为null时,表示允许所有访问
             * deny:Druid 后台拒绝谁访问
             *      initParams.put("deny", "192.168.1.20");表示禁止此ip访问
             */
            Map<String, String> initParams = new HashMap<>();
            initParams.put("loginUsername", "admin");
            initParams.put("loginPassword", "123456");
            initParams.put("allow", "");
            /*initParams.put("deny", "192.168.1.20");*/
    
            /** 设置初始化参数*/
            bean.setInitParameters(initParams);
            return bean;
        }
        /**
         * 配置 Druid 监控 之  web 监控的 filter
         * WebStatFilter:用于配置Web和Druid数据源之间的管理关联监控统计
         */
        @Bean
        public FilterRegistrationBean webStatFilter() {
            FilterRegistrationBean bean = new FilterRegistrationBean();
            bean.setFilter(new WebStatFilter());
    
            /** exclusions:设置哪些请求进行过滤排除掉,从而不进行统计*/
            Map<String, String> initParams = new HashMap<>();
            initParams.put("exclusions", "*.js,*.css,/druid/*");
            bean.setInitParameters(initParams);
            
            /** "/*" 表示过滤所有请求*/
            bean.setUrlPatterns(Arrays.asList("/*"));
            return bean;
        }
    }
    

    2、运行应用,可以往后台发一些请求,同时进行一些 数据库 sql 操作,测试结果效果如下:

     

     

    展开全文
  • JDBC之自定义数据源

    千次阅读 2018-08-29 20:53:25
    实现一个自定义的数据源需要实现javax.sql.DataSource接口,并重写获取连接的...以下代码采用了装饰和适配两种设计模式来完成自定义数据源。 说明:本文JDBCUtils工具类在https://blog.csdn.net/qq_15076569/arti...

    实现一个自定义的数据源需要实现javax.sql.DataSource接口,并重写获取连接的getConnection()方法,但是调用close方法关闭资源时不能去关闭资源,而是把资源归还给连接池,所以还需重写close方法。以下代码采用了装饰和适配两种设计模式来完成自定义数据源。

    说明:本文JDBCUtils工具类在https://blog.csdn.net/qq_15076569/article/details/82191167已经附上源码

    一:编写MyDataSource实现DataSource接口

    public class MyDataSource implements DataSource{
    
        private List<Connection> list = new ArrayList<>();
    
        public MyDataSource(){
            for (int i=0;i<10;i++){
                Connection conn = JDBCUtils.getConnection();
                MyConnection myConnection = new MyConnection(conn, list);
                list.add(myConnection);
            }
        }
    
        @Override
        public Connection getConnection() throws SQLException {
            if(list != null && !list.isEmpty()){
                return list.remove(0);
            }
            return null;
        }
    
        /**
         * 归还连接对象
         * @param conn:连接对象
         */
        public void putSource(Connection conn){
            list.add(conn);
        }
    
        @Override
        public Connection getConnection(String username, String password) throws SQLException {
            return null;
        }
    
        @Override
        public <T> T unwrap(Class<T> iface) throws SQLException {
            return null;
        }
    
        @Override
        public boolean isWrapperFor(Class<?> iface) throws SQLException {
            return false;
        }
    
        @Override
        public PrintWriter getLogWriter() throws SQLException {
            return null;
        }
    
        @Override
        public void setLogWriter(PrintWriter out) throws SQLException {
    
        }
    
        @Override
        public void setLoginTimeout(int seconds) throws SQLException {
    
        }
    
        @Override
        public int getLoginTimeout() throws SQLException {
            return 0;
        }
    
        @Override
        public Logger getParentLogger() throws SQLFeatureNotSupportedException {
            return null;
        }
    }

    二:创建装饰类MyConnection,该装饰类需要实现Connection接口,但该接口方法众多,可以采用适配器模式编写一个适配类ConnectionWrapper来实现Connection接口,该装饰类直接继承ConnectionWrapper,重写getConnection及close方法

    /**
     *适配类
     */
    public class ConnectionWrapper implements Connection {
        private Connection conn;
        public ConnectionWrapper(Connection conn){
            super();
            this.conn = conn;
        }
        @Override
        public Statement createStatement() throws SQLException {
            return conn.createStatement();
        }
    
        @Override
        public PreparedStatement prepareStatement(String sql) throws SQLException {
            return conn.prepareStatement(sql);
        }

             .....

    }

    /**
     * 装饰类
     */
    public class MyConnection extends ConnectionWrapper {
    
        private Connection conn = null;
        private List<Connection> list = null;
        public MyConnection(Connection conn, List<Connection> list){
            super(conn);
            this.conn = conn;
            this.list = list;
        }
    
        @Override
        public void close() throws SQLException {
            list.add(this);
            System.out.println("归还了...");
        }
    
    }

    三:使用自定义数据源

    Connection conn = null;
    Statement st = null;
    ResultSet rs = null;
    MyDataSource mc = new MyDataSource();
    try {
        conn = mc.getConnection();
        conn.setAutoCommit(false);
        st = conn.createStatement();
        st.addBatch("update users set name = 'wangwu3' where id = '3232423425sdsfs'");
        int[] ints = st.executeBatch();
        conn.commit();
        System.out.println(Arrays.toString(ints));
    } catch (SQLException e) {
        try {
            conn.rollback();
        } catch (SQLException e1) {
            e1.printStackTrace();
        }
        e.printStackTrace();
    }finally {
        JDBCUtils.closeResource(rs,st,conn);
    }

    展开全文
  • 实现自定义数据源 import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala._ import scala.util.Random //自定义数据类型 case class WaterSensor...

    实现自定义数据源

    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import scala.util.Random
    
    //自定义数据类型
    case class WaterSensor(id:String,ts:Long,vc:Double)
    //创建类并继承SourceFunction指定类型
    class MySource extends SourceFunction[WaterSensor]{
      val flag=true
      //重写run方法
      override def run(sourceContext: SourceFunction.SourceContext[WaterSensor]): Unit = {
        while(flag){
          sourceContext.collect(WaterSensor("sensor_"+new Random().nextInt(3),16092345345623L,new Random().nextInt(5)+40))
          //睡眠1秒
          Thread.sleep(1000)
        }
      }
      override def cancel(): Unit = ???
    }
    
    object SourceMy{
      def main(args: Array[String]): Unit = {
      //创建流处理执行环境
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        //获取自定义数据源中数据
        val mydefDStream: DataStream[WaterSensor] = env.addSource(new MySource)
        //输出获取的数据
        mydefDStream.print()
        env.execute("MySource"}
    }
    
    

    启动后输出数据
    在这里插入图片描述

    展开全文
  • StructuredStreaming 内置数据源及实现自定义数据源 版本说明: Spark:2.3/2.4 代码仓库:https://github.com/shirukai/spark-structured-datasource.git 1 Structured内置的输入源 Source 官网文档:...

    StructuredStreaming 内置数据源及实现自定义数据源

    版本说明:

    Spark:2.3/2.4

    代码仓库:https://github.com/shirukai/spark-structured-datasource.git

    1 Structured内置的输入源 Source

    官网文档:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources

    Source Options Fault-tolerant Notes
    File Source maxFilesPerTrigger:每个触发器中要考虑的最大新文件数(默认值:无最大值)latestFirst:是否先处理最新的新文件,当存在大量积压的文件时有用(默认值:false)
    fileNameOnly:是否基于以下方法检查新文件只有文件名而不是完整路径(默认值:false)。
    支持容错 支持glob路径,但不支持以口号分割的多个路径
    Socket Source host:要连接的主机,必须指定
    port:要连接的端口,必须指定
    不支持容错
    Rate Source rowsPerSecond(例如100,默认值:1):每秒应生成多少行。
    rampUpTime(例如5s,默认值:0s):在生成速度变为之前加速多长时间rowsPerSecond。使用比秒更精细的粒度将被截断为整数秒。numPartitions(例如10,默认值:Spark的默认并行性):生成的行的分区号
    支持容错
    Kafka Source http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 支持容错

    1.1 File Source

    将目录中写入的文件作为数据流读取。支持的文件格式为:text、csv、json、orc、parquet

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

    val source = spark
      .readStream
      // Schema must be specified when creating a streaming source DataFrame.
      .schema(schema)
      // 每个trigger最大文件数量
      .option("maxFilesPerTrigger",100)
      // 是否首先计算最新的文件,默认为false
      .option("latestFirst",value = true)
      // 是否值检查名字,如果名字相同,则不视为更新,默认为false
      .option("fileNameOnly",value = true)
      .csv("*.csv")
    

    1.2 Socket Source

    从Socket中读取UTF8文本数据。一般用于测试,使用nc -lc 端口号 向Socket监听的端口发送数据。

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9090)
      .load()
    

    1.3 Rate Source

    以每秒指定的行数生成数据,每个输出行包含一个timestampvalue。其中timestamp是一个Timestamp含有信息分配的时间类型,并且valueLong包含消息的计数从0开始作为第一行类型。此源用于测试和基准测试。

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

        val rate = spark.readStream
          .format("rate")
          // 每秒生成的行数,默认值为1
          .option("rowsPerSecond", 10)
          .option("numPartitions", 10)
          .option("rampUpTime",0)
          .option("rampUpTime",5)
          .load()
    

    1.4 Kafka Source

    官网文档:http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribePattern", "topic.*")
      .load()
    

    2 Structured 内置的输出源 Sink

    Sink Supported Output Modes Options Fault-tolerant Notes
    File Sink Append path:输出路径(必须指定) 支持容错(exactly-once) 支持分区写入
    Kafka Sink Append,Update,Complete See the Kafka Integration Guide 支持容错(at-least-once) Kafka Integration Guide
    Foreach Sink Append,Update,Complete None Foreach Guide
    ForeachBatch Sink Append,Update,Complete None Foreach Guide
    Console Sink Append,Update,Complete numRows:每次触发器打印的行数(默认值:20)
    truncate:是否过长时截断输出(默认值:true
    Memory Sink Append,Complete None 表名是查询的名字

    2.1 File Sink

    将结果输出到文件,支持格式parquet、csv、orc、json等

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

    val fileSink = source.writeStream
      .format("parquet")
      //.format("csv")
      //.format("orc")
     // .format("json")
      .option("path", "data/sink")
      .option("checkpointLocation", "/tmp/temporary-" + UUID.randomUUID.toString)
      .start()
    

    2.2 Console Sink

    将结果输出到控制台

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

        val consoleSink = source.writeStream
          .format("console")
          // 是否压缩显示
          .option("truncate", value = false)
          // 显示条数
          .option("numRows", 30)
          .option("checkpointLocation", "/tmp/temporary-" + UUID.randomUUID.toString)
          .start()
    
    

    2.3 Memory Sink

    将结果输出到内存,需要指定内存中的表名。可以使用sql进行查询

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

    
        val memorySink = source.writeStream
          .format("memory")
          .queryName("memorySinkTable")
          .option("checkpointLocation", "/tmp/temporary-" + UUID.randomUUID.toString)
          .start()
    
    
        new Thread(new Runnable {
          override def run(): Unit = {
            while (true) {
              spark.sql("select * from memorySinkTable").show(false)
              Thread.sleep(1000)
            }
          }
        }).start()
        memorySink.awaitTermination()
    

    2.4 Kafka Sink

    将结果输出到Kafka,需要将DataFrame转成key,value两列,或者topic、key、value三列

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

        import org.apache.spark.sql.functions._
        import spark.implicits._
        val kafkaSink = source.select(array(to_json(struct("*"))).as("value").cast(StringType),
          $"timestamp".as("key").cast(StringType)).writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("checkpointLocation", "/tmp/temporary-" + UUID.randomUUID.toString)
          .option("topic", "hiacloud-ts-dev")
          .start()
    

    2.5 ForeachBatch Sink(2.4)

    适用于对于一个批次来说应用相同的写入方式的场景。方法传入这个batch的DataFrame以及batchId。这个方法在2.3之后的版本才有而且仅支持微批模式。

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

        val foreachBatchSink = source.writeStream.foreachBatch((batchData: DataFrame, batchId) => {
          batchData.show(false)
        }).start()
    

    2.6 Foreach Sink

    Foreach 每一条记录,通过继承ForeachWriter[Row],实现open(),process(),close()方法。在open方法了我们可以获取一个资源连接,如MySQL的连接。在process里我们可以获取一条记录,并处理这条数据发送到刚才获取资源连接的MySQL中,在close里我们可以关闭资源连接。注意,foreach是对Partition来说的,同一个分区只会调用一次open、close方法,但对于每条记录来说,都会调用process方法。

    用例

    代码位置:org.apache.spark.sql.structured.datasource.example

        val foreachSink = source.writeStream
            .foreach(new ForeachWriter[Row] {
              override def open(partitionId: Long, version: Long): Boolean = {
                println(s"partitionId=$partitionId,version=$version")
                true
    
              }
    
              override def process(value: Row): Unit = {
                println(value)
              }
    
              override def close(errorOrNull: Throwable): Unit = {
                println("close")
              }
            })
          .start()
    
    

    3 自定义输入源

    某些应用场景下我们可能需要自定义数据源,如业务中,需要在获取KafkaSource的同时,动态从缓存中或者http请求中加载业务数据,或者是其它的数据源等都可以参考规范自定义。自定义输入源需要以下步骤:

    第一步:继承DataSourceRegister和StreamSourceProvider创建自定义Provider类

    第二步:重写DataSourceRegister类中的shotName和StreamSourceProvider中的createSource以及sourceSchema方法

    第三步:继承Source创建自定义Source类

    第四步:重写Source中的schema方法指定输入源的schema

    第五步:重写Source中的getOffest方法监听流数据

    第六步:重写Source中的getBatch方法获取数据

    第七步:重写Source中的stop方法用来关闭资源

    3.1 创建CustomDataSourceProvider类

    3.1.1 继承DataSourceRegister和StreamSourceProvider

    要创建自定义的DataSourceProvider必须要继承位于org.apache.spark.sql.sources包下的DataSourceRegister以及该包下的StreamSourceProvider。如下所示:

    class CustomDataSourceProvider extends DataSourceRegister
      with StreamSourceProvider
      with Logging {
          //Override some functions ……
      }
    

    3.1.2 重写DataSourceRegister的shotName方法

    该方法用来指定一个数据源的名字,用来想spark注册该数据源。如Spark内置的数据源的shotName:kafka

    、socket、rate等,该方法返回一个字符串,如下所示:

      /**
        * 数据源的描述名字,如:kafka、socket
        *
        * @return 字符串shotName
        */
      override def shortName(): String = "custom"
    

    3.1.3 重写StreamSourceProvider中的sourceSchema方法

    该方法是用来定义数据源的schema,可以使用用户传入的schema,也可以根据传入的参数进行动态创建。返回值是个二元组(shotName,scheam),代码如下所示:

      /**
        * 定义数据源的Schema
        *
        * @param sqlContext   Spark SQL 上下文
        * @param schema       通过.schema()方法传入的schema
        * @param providerName Provider的名称,包名+类名
        * @param parameters   通过.option()方法传入的参数
        * @return 元组,(shotName,schema)
        */
      override def sourceSchema(sqlContext: SQLContext,
                                schema: Option[StructType],
                                providerName: String,
                                parameters: Map[String, String]): (String, StructType) = (shortName(),schema.get)
    

    3.1.4 重写StreamSourceProvider中的createSource方法

    通过传入的参数,来实例化我们自定义的DataSource,是我们自定义Source的重要入口的地方

    /**
      * 创建输入源
      *
      * @param sqlContext   Spark SQL 上下文
      * @param metadataPath 元数据Path
      * @param schema       通过.schema()方法传入的schema
      * @param providerName Provider的名称,包名+类名
      * @param parameters   通过.option()方法传入的参数
      * @return 自定义source,需要继承Source接口实现
      **/
    
    override def createSource(sqlContext: SQLContext,
                              metadataPath: String,
                              schema: Option[StructType],
                              providerName: String,
                              parameters: Map[String, String]): Source = new CustomDataSource(sqlContext,parameters,schema)
    

    3.1.5 CustomDataSourceProvider.scala完整代码

    package org.apache.spark.sql.structured.datasource.custom
    
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.{Sink, Source}
    import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider, StreamSourceProvider}
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType
    
    /**
      * @author : shirukai
      * @date : 2019-01-25 17:49
      *       自定义Structured Streaming数据源
      *
      *       (1)继承DataSourceRegister类
      *       需要重写shortName方法,用来向Spark注册该组件
      *
      *       (2)继承StreamSourceProvider类
      *       需要重写createSource以及sourceSchema方法,用来创建数据输入源
      *
      *       (3)继承StreamSinkProvider类
      *       需要重写createSink方法,用来创建数据输出源
      *
      *
      */
    class CustomDataSourceProvider extends DataSourceRegister
      with StreamSourceProvider
      with StreamSinkProvider
      with Logging {
    
    
      /**
        * 数据源的描述名字,如:kafka、socket
        *
        * @return 字符串shotName
        */
      override def shortName(): String = "custom"
    
    
      /**
        * 定义数据源的Schema
        *
        * @param sqlContext   Spark SQL 上下文
        * @param schema       通过.schema()方法传入的schema
        * @param providerName Provider的名称,包名+类名
        * @param parameters   通过.option()方法传入的参数
        * @return 元组,(shotName,schema)
        */
      override def sourceSchema(sqlContext: SQLContext,
                                schema: Option[StructType],
                                providerName: String,
                                parameters: Map[String, String]): (String, StructType) = (shortName(),schema.get)
    
      /**
        * 创建输入源
        *
        * @param sqlContext   Spark SQL 上下文
        * @param metadataPath 元数据Path
        * @param schema       通过.schema()方法传入的schema
        * @param providerName Provider的名称,包名+类名
        * @param parameters   通过.option()方法传入的参数
        * @return 自定义source,需要继承Source接口实现
        **/
    
      override def createSource(sqlContext: SQLContext,
                                metadataPath: String,
                                schema: Option[StructType],
                                providerName: String,
                                parameters: Map[String, String]): Source = new CustomDataSource(sqlContext,parameters,schema)
    
    
      /**
        * 创建输出源
        *
        * @param sqlContext       Spark SQL 上下文
        * @param parameters       通过.option()方法传入的参数
        * @param partitionColumns 分区列名?
        * @param outputMode       输出模式
        * @return
        */
      override def createSink(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              partitionColumns: Seq[String],
                              outputMode: OutputMode): Sink = new CustomDataSink(sqlContext,parameters,outputMode)
    }
    
    

    3.2 创建CustomDataSource类

    3.2.1 继承Source创建CustomDataSource类

    要创建自定义的DataSource必须要继承位于org.apache.spark.sql.sources包下的Source。如下所示:

    class CustomDataSource(sqlContext: SQLContext,
                           parameters: Map[String, String],
                           schemaOption: Option[StructType]) extends Source
      with Logging {
      //Override some functions ……
    }
    

    3.2.2 重写Source的schema方法

    指定数据源的schema,需要与Provider中的sourceSchema指定的schema保持一致,否则会报异常

      /**
        * 指定数据源的schema,需要与Provider中sourceSchema中指定的schema保持一直,否则会报异常
        * 触发机制:当创建数据源的时候被触发执行
        *
        * @return schema
        */
      override def schema: StructType = schemaOption.get
    

    3.2.3 重写Source的getOffset方法

    此方法是Spark不断的轮询执行的,目的是用来监控流数据的变化情况,一旦数据发生变化,就会触发getBatch方法用来获取数据。

      /**
        * 获取offset,用来监控数据的变化情况
        * 触发机制:不断轮询调用
        * 实现要点:
        * (1)Offset的实现:
        * 由函数返回值可以看出,我们需要提供一个标准的返回值Option[Offset]
        * 我们可以通过继承 org.apache.spark.sql.sources.v2.reader.streaming.Offset实现,这里面其实就是保存了个json字符串
        *
        * (2) JSON转化
        * 因为Offset里实现的是一个json字符串,所以我们需要将我们存放offset的集合或者case class转化重json字符串
        * spark里是通过org.json4s.jackson这个包来实现case class 集合类(Map、List、Seq、Set等)与json字符串的相互转化
        *
        * @return Offset
        */
      override def getOffset: Option[Offset] = ???
    

    3.2.4 重写Source的getBatch方法

    此方法是Spark用来获取数据的,getOffset方法检测的数据发生变化的时候,会触发该方法, 传入上一次触发时的end Offset作为当前batch的start Offset,将新的offset作为end Offset。

      /**
        * 获取数据
        *
        * @param start 上一个批次的end offset
        * @param end   通过getOffset获取的新的offset
        *              触发机制:当不断轮询的getOffset方法,获取的offset发生改变时,会触发该方法
        *
        *              实现要点:
        *              (1)DataFrame的创建:
        *              可以通过生成RDD,然后使用RDD创建DataFrame
        *              RDD创建:sqlContext.sparkContext.parallelize(rows.toSeq)
        *              DataFrame创建:sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
        * @return DataFrame
        */
      override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???
    

    3.2.5 重写Source的stop方法

    用来关闭一些需要关闭或停止的资源及进程

      /**
        * 关闭资源
        * 将一些需要关闭的资源放到这里来关闭,如MySQL的数据库连接等
        */
      override def stop(): Unit = ???
    

    3.2.6 CustomDataSource.scala完整代码

    package org.apache.spark.sql.structured.datasource.custom
    
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.execution.streaming.{Offset, Source}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{DataFrame, SQLContext}
    
    /**
      * @author : shirukai
      * @date : 2019-01-25 18:03
      *       自定义数据输入源:需要继承Source接口
      *       实现思路:
      *       (1)通过重写schema方法来指定数据输入源的schema,这个schema需要与Provider中指定的schema保持一致
      *       (2)通过重写getOffset方法来获取数据的偏移量,这个方法会一直被轮询调用,不断的获取偏移量
      *       (3) 通过重写getBatch方法,来获取数据,这个方法是在偏移量发生改变后被触发
      *       (4)通过stop方法,来进行一下关闭资源的操作
      *
      */
    class CustomDataSource(sqlContext: SQLContext,
                           parameters: Map[String, String],
                           schemaOption: Option[StructType]) extends Source
      with Logging {
    
      /**
        * 指定数据源的schema,需要与Provider中sourceSchema中指定的schema保持一直,否则会报异常
        * 触发机制:当创建数据源的时候被触发执行
        *
        * @return schema
        */
      override def schema: StructType = schemaOption.get
    
      /**
        * 获取offset,用来监控数据的变化情况
        * 触发机制:不断轮询调用
        * 实现要点:
        * (1)Offset的实现:
        * 由函数返回值可以看出,我们需要提供一个标准的返回值Option[Offset]
        * 我们可以通过继承 org.apache.spark.sql.sources.v2.reader.streaming.Offset实现,这里面其实就是保存了个json字符串
        *
        * (2) JSON转化
        * 因为Offset里实现的是一个json字符串,所以我们需要将我们存放offset的集合或者case class转化重json字符串
        * spark里是通过org.json4s.jackson这个包来实现case class 集合类(Map、List、Seq、Set等)与json字符串的相互转化
        *
        * @return Offset
        */
      override def getOffset: Option[Offset] = ???
    
      /**
        * 获取数据
        *
        * @param start 上一个批次的end offset
        * @param end   通过getOffset获取的新的offset
        *              触发机制:当不断轮询的getOffset方法,获取的offset发生改变时,会触发该方法
        *
        *              实现要点:
        *              (1)DataFrame的创建:
        *              可以通过生成RDD,然后使用RDD创建DataFrame
        *              RDD创建:sqlContext.sparkContext.parallelize(rows.toSeq)
        *              DataFrame创建:sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
        * @return DataFrame
        */
      override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???
    
      /**
        * 关闭资源
        * 将一些需要关闭的资源放到这里来关闭,如MySQL的数据库连接等
        */
      override def stop(): Unit = ???
    }
    

    3.3 自定义DataSource的使用

    自定义DataSource的使用与内置DataSource一样,只需要在format里指定一下我们的Provider类路径即可。如

        val source = spark
          .readStream
          .format("org.apache.spark.sql.kafka010.CustomSourceProvider")
          .options(options)
          .schema(schema)
          .load()
    

    3.4 实现MySQL自定义数据源

    此例子仅仅是为了演示如何自定义数据源,与实际业务场景无关。

    3.4.1 创建MySQLSourceProvider.scala

    package org.apache.spark.sql.structured.datasource
    
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.{Sink, Source}
    import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider, StreamSourceProvider}
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType
    
    /**
      * @author : shirukai
      * @date : 2019-01-25 09:10
      *       自定义MySQL数据源
      */
    class MySQLSourceProvider extends DataSourceRegister
      with StreamSourceProvider
      with StreamSinkProvider
      with Logging {
      /**
        * 数据源的描述名字,如:kafka、socket
        *
        * @return 字符串shotName
        */
      override def shortName(): String = "mysql"
    
    
      /**
        * 定义数据源的Schema
        *
        * @param sqlContext   Spark SQL 上下文
        * @param schema       通过.schema()方法传入的schema
        * @param providerName Provider的名称,包名+类名
        * @param parameters   通过.option()方法传入的参数
        * @return 元组,(shotName,schema)
        */
      override def sourceSchema(
                                 sqlContext: SQLContext,
                                 schema: Option[StructType],
                                 providerName: String,
                                 parameters: Map[String, String]): (String, StructType) = {
        (providerName, schema.get)
      }
    
      /**
        * 创建输入源
        *
        * @param sqlContext   Spark SQL 上下文
        * @param metadataPath 元数据Path
        * @param schema       通过.schema()方法传入的schema
        * @param providerName Provider的名称,包名+类名
        * @param parameters   通过.option()方法传入的参数
        * @return 自定义source,需要继承Source接口实现
        */
      override def createSource(
                                 sqlContext: SQLContext,
                                 metadataPath: String, schema: Option[StructType],
                                 providerName: String, parameters: Map[String, String]): Source = new MySQLSource(sqlContext, parameters, schema)
    
      /**
        * 创建输出源
        *
        * @param sqlContext       Spark SQL 上下文
        * @param parameters       通过.option()方法传入的参数
        * @param partitionColumns 分区列名?
        * @param outputMode       输出模式
        * @return
        */
      override def createSink(
                               sqlContext: SQLContext,
                               parameters: Map[String, String],
                               partitionColumns: Seq[String], outputMode: OutputMode): Sink = new MySQLSink(sqlContext: SQLContext,parameters, outputMode)
    }
    

    3.4.2 创建MySQLSource.scala

    package org.apache.spark.sql.structured.datasource
    
    import java.sql.Connection
    
    import org.apache.spark.executor.InputMetrics
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
    import org.apache.spark.sql.execution.streaming.{Offset, Source}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.json4s.jackson.Serialization
    import org.json4s.{Formats, NoTypeHints}
    
    
    /**
      * @author : shirukai
      * @date : 2019-01-25 09:41
      */
    class MySQLSource(sqlContext: SQLContext,
                      options: Map[String, String],
                      schemaOption: Option[StructType]) extends Source with Logging {
    
      lazy val conn: Connection = C3p0Utils.getDataSource(options).getConnection
    
      val tableName: String = options("tableName")
    
      var currentOffset: Map[String, Long] = Map[String, Long](tableName -> 0)
    
      val maxOffsetPerBatch: Option[Long] = Option(100)
    
      val inputMetrics = new InputMetrics()
    
      override def schema: StructType = schemaOption.get
    
      /**
        * 获取Offset
        * 这里监控MySQL数据库表中条数变化情况
        * @return Option[Offset]
        */
      override def getOffset: Option[Offset] = {
        val latest = getLatestOffset
        val offsets = maxOffsetPerBatch match {
          case None => MySQLSourceOffset(latest)
          case Some(limit) =>
            MySQLSourceOffset(rateLimit(limit, currentOffset, latest))
        }
        Option(offsets)
      }
    
      /**
        * 获取数据
        * @param start 上一次的offset
        * @param end 最新的offset
        * @return df
        */
      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    
        var offset: Long = 0
        if (start.isDefined) {
          offset = offset2Map(start.get)(tableName)
        }
        val limit = offset2Map(end)(tableName) - offset
        val sql = s"SELECT * FROM $tableName limit $limit offset $offset"
    
        val st = conn.prepareStatement(sql)
        val rs = st.executeQuery()
        val rows: Iterator[InternalRow] = JdbcUtils.resultSetToSparkInternalRows(rs, schemaOption.get, inputMetrics) //todo 好用
        val rdd = sqlContext.sparkContext.parallelize(rows.toSeq)
    
        currentOffset = offset2Map(end)
    
        sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
      }
    
      override def stop(): Unit = {
        conn.close()
      }
    
      def rateLimit(limit: Long, currentOffset: Map[String, Long], latestOffset: Map[String, Long]): Map[String, Long] = {
        val co = currentOffset(tableName)
        val lo = latestOffset(tableName)
        if (co + limit > lo) {
          Map[String, Long](tableName -> lo)
        } else {
          Map[String, Long](tableName -> (co + limit))
        }
      }
    
      // 获取最新条数
      def getLatestOffset: Map[String, Long] = {
        var offset: Long = 0
        val sql = s"SELECT COUNT(1) FROM $tableName"
        val st = conn.prepareStatement(sql)
        val rs = st.executeQuery()
        while (rs.next()) {
          offset = rs.getLong(1)
        }
        Map[String, Long](tableName -> offset)
      }
    
      def offset2Map(offset: Offset): Map[String, Long] = {
        implicit val formats: AnyRef with Formats = Serialization.formats(NoTypeHints)
        Serialization.read[Map[String, Long]](offset.json())
      }
    }
    
    case class MySQLSourceOffset(offset: Map[String, Long]) extends Offset {
      implicit val formats: AnyRef with Formats = Serialization.formats(NoTypeHints)
    
      override def json(): String = Serialization.write(offset)
    }
    

    3.4.3 测试MySQLSource

    package org.apache.spark.sql.structured.datasource
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    
    /**
      * @author : shirukai
      * @date : 2019-01-25 15:12
      */
    object MySQLSourceTest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName(this.getClass.getSimpleName)
          .master("local[2]")
          .getOrCreate()
        val schema = StructType(List(
          StructField("name", StringType),
          StructField("creatTime", TimestampType),
          StructField("modifyTime", TimestampType)
        )
        )
        val options = Map[String, String](
          "driverClass" -> "com.mysql.cj.jdbc.Driver",
          "jdbcUrl" -> "jdbc:mysql://localhost:3306/spark-source?useSSL=false&characterEncoding=utf-8",
          "user" -> "root",
          "password" -> "hollysys",
          "tableName" -> "model")
        val source = spark
          .readStream
          .format("org.apache.spark.sql.structured.datasource.MySQLSourceProvider")
          .options(options)
          .schema(schema)
          .load()
    
        import org.apache.spark.sql.functions._
        val query = source.writeStream.format("console")
          // 是否压缩显示
          .option("truncate", value = false)
          // 显示条数
          .option("numRows", 30)
          .option("checkpointLocation", "/tmp/temporary-" + UUID.randomUUID.toString)
          .start()
        query.awaitTermination()
      }
    }
    
    

    4 自定义输出源

    相比较输入源的自定义性,输出源自定义的应用场景貌似更为常用。比如:数据写入关系型数据库、数据写入HBase、数据写入Redis等等。其实Structured提供的foreach以及2.4版本的foreachBatch方法已经可以实现绝大数的应用场景的,几乎是数据想写到什么地方都能实现。但是想要更优雅的实现,我们可以参考Spark SQL Sink规范,通过自定义的Sink的方式来实现。实现自定义Sink需要以下四个个步骤:

    第一步:继承DataSourceRegister和StreamSinkProvider创建自定义SinkProvider类

    第二步:重写DataSourceRegister类中的shotName和StreamSinkProvider中的createSink方法

    第三步:继承Sink创建自定义Sink类

    第四步:重写Sink中的addBatch方法

    4.1 改写CustomDataSourceProvider类

    4.1.1 新增继承StreamSinkProvider

    在上面创建自定义输入源的基础上,新增继承StreamSourceProvider。如下所示:

    class CustomDataSourceProvider extends DataSourceRegister
      with StreamSourceProvider
      with StreamSinkProvider
      with Logging {
          //Override some functions ……
      }
    

    4.1.2 重写StreamSinkProvider中的createSink方法

    通过传入的参数,来实例化我们自定义的DataSink,是我们自定义Sink的重要入口的地方

      /**
        * 创建输出源
        *
        * @param sqlContext       Spark SQL 上下文
        * @param parameters       通过.option()方法传入的参数
        * @param partitionColumns 分区列名?
        * @param outputMode       输出模式
        * @return
        */
      override def createSink(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              partitionColumns: Seq[String],
                              outputMode: OutputMode): Sink = new CustomDataSink(sqlContext,parameters,outputMode)
    

    4.2 创建CustomDataSink类

    4.2.1 继承Sink创建CustomDataSink类

    要创建自定义的DataSink必须要继承位于org.apache.spark.sql.sources包下的Sink。如下所示:

    class CustomDataSink(sqlContext: SQLContext,
                         parameters: Map[String, String],
                         outputMode: OutputMode) extends Sink with Logging {
        // Override some functions
    }
    

    4.2.2 重写Sink中的addBatch方法

    该方法是当发生计算时会被触发,传入的是一个batchId和dataFrame,拿到DataFrame之后,我们有三种写出方式,第一种是使用Spark SQL内置的Sink写出,如 JSON数据源、CSV数据源、Text数据源、Parquet数据源、JDBC数据源等。第二种是通过DataFrame的foreachPartition写出。第三种就是自定义SparkSQL的输出源然后写出。

    /**
      * 添加Batch,即数据写出
      *
      * @param batchId batchId
      * @param data    DataFrame
      *                触发机制:当发生计算时,会触发该方法,并且得到要输出的DataFrame
      *                实现摘要:
      *                1. 数据写入方式:
      *                (1)通过SparkSQL内置的数据源写出
      *                我们拿到DataFrame之后可以通过SparkSQL内置的数据源将数据写出,如:
      *                JSON数据源、CSV数据源、Text数据源、Parquet数据源、JDBC数据源等。
      *                (2)通过自定义SparkSQL的数据源进行写出
      *                (3)通过foreachPartition 将数据写出
      */
    override def addBatch(batchId: Long, data: DataFrame): Unit = ???
    

    注意

    当我们使用第一种方式的时候要注意,此时拿到的DataFrame是一个流式的DataFrame,即isStreaming=ture,通过查看KafkaSink,如下代码所示,先是通过DataFrame.queryExecution执行查询,然后在wite里转成rdd,通过rdd的foreachPartition实现。同样的思路,我们可以利用这个rdd和schema,利用sqlContext.internalCreateDataFrame(rdd, data.schema)重新生成DataFrame,这个在MySQLSink中使用过。

    override def addBatch(batchId: Long, data: DataFrame): Unit = {
      if (batchId <= latestBatchId) {
        logInfo(s"Skipping already committed batch $batchId")
      } else {
        KafkaWriter.write(sqlContext.sparkSession,
          data.queryExecution, executorKafkaParams, topic)
        latestBatchId = batchId
      }
    }
    
      def write(
          sparkSession: SparkSession,
          queryExecution: QueryExecution,
          kafkaParameters: ju.Map[String, Object],
          topic: Option[String] = None): Unit = {
        val schema = queryExecution.analyzed.output
        validateQuery(schema, kafkaParameters, topic)
        queryExecution.toRdd.foreachPartition { iter =>
          val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
          Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
            finallyBlock = writeTask.close())
        }
      }
    

    4.3 自定义DataSink的使用

    自定义DataSink的使用与自定义DataSource的使用相同,在format里指定一些类Provider的类路径即可。

    val query = source.groupBy("creatTime").agg(collect_list("name")).writeStream
          .outputMode("update")
          .format("org.apache.spark.sql.kafka010.CustomDataSourceProvider")
          .option(options)
          .start()
        query.awaitTermination()
    

    4.4 实现MySQL自定义输出源

    4.4.1 修改MySQLSourceProvider.scala

    上面我们实现MySQL自定义输入源的时候,已经创建了MySQLSourceProvider类,我们需要在这个基础上新增继承StreamSinkProvider,并重写createSink方法,如下所示:

    package org.apache.spark.sql.structured.datasource
    
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.{Sink, Source}
    import org.apache.spark.sql.kafka010.{MySQLSink, MySQLSource}
    import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider, StreamSourceProvider}
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType
    
    /**
      * @author : shirukai
      * @date : 2019-01-25 09:10
      *       自定义MySQL数据源
      */
    class MySQLSourceProvider extends DataSourceRegister
      with StreamSourceProvider
      with StreamSinkProvider
      with Logging {
          
      //……省略自定义输入源的方法
    
      /**
        * 创建输出源
        *
        * @param sqlContext       Spark SQL 上下文
        * @param parameters       通过.option()方法传入的参数
        * @param partitionColumns 分区列名?
        * @param outputMode       输出模式
        * @return
        */
      override def createSink(
                               sqlContext: SQLContext,
                               parameters: Map[String, String],
                               partitionColumns: Seq[String], outputMode: OutputMode): Sink = new MySQLSink(sqlContext: SQLContext,parameters, outputMode)
    }
    
    

    4.4.1 创建MySQLSink.scala

    package org.apache.spark.sql.structured.datasource
    
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
    import org.apache.spark.sql.execution.streaming.Sink
    import org.apache.spark.sql.streaming.OutputMode
    
    /**
      * @author : shirukai
      * @date : 2019-01-25 17:35
      */
    class MySQLSink(sqlContext: SQLContext,parameters: Map[String, String], outputMode: OutputMode) extends Sink with Logging {
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        val query = data.queryExecution
        val rdd = query.toRdd
        val df = sqlContext.internalCreateDataFrame(rdd, data.schema)
        df.show(false)
        df.write.format("jdbc").options(parameters).mode(SaveMode.Append).save()
      }
    }
    

    4.2.3 测试MySQLSink

    package org.apache.spark.sql.structured.datasource
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    
    /**
      * @author : shirukai
      * @date : 2019-01-29 09:57
      *       测试自定义MySQLSource
      */
    object MySQLSourceTest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName(this.getClass.getSimpleName)
          .master("local[2]")
          .getOrCreate()
        val schema = StructType(List(
          StructField("name", StringType),
          StructField("creatTime", TimestampType),
          StructField("modifyTime", TimestampType)
        )
        )
        val options = Map[String, String](
          "driverClass" -> "com.mysql.cj.jdbc.Driver",
          "jdbcUrl" -> "jdbc:mysql://localhost:3306/spark-source?useSSL=false&characterEncoding=utf-8",
          "user" -> "root",
          "password" -> "hollysys",
          "tableName" -> "model")
        val source = spark
          .readStream
          .format("org.apache.spark.sql.structured.datasource.MySQLSourceProvider")
          .options(options)
          .schema(schema)
          .load()
    
        import org.apache.spark.sql.functions._
        val query = source.groupBy("creatTime").agg(collect_list("name").cast(StringType).as("names")).writeStream
          .outputMode("update")
          .format("org.apache.spark.sql.structured.datasource.MySQLSourceProvider")
          .option("checkpointLocation", "/tmp/MySQLSourceProvider11")
          .option("user","root")
          .option("password","hollysys")
          .option("dbtable","test")
          .option("url","jdbc:mysql://localhost:3306/spark-source?useSSL=false&characterEncoding=utf-8")
          .start()
    
        query.awaitTermination()
      }
    }
    

    3 总结

    通过上面的笔记,参看官网文档,可以学习到Structured支持的几种输入源:File Source、Socket Source、Rate Source、Kafka Source,平时我们会用到KafkaSource以及FileSource,SocketSource、RateSource多用于测试场景。关于输入源没有什么优雅的操作,只能通过重写Source来实现。对于输出源来说,Spark Structured提供的foreach以及foreachBatch已经能适用于大多数场景,没有重写Sink的必要。关于Spark SQL 自定义输入源、Streaming自定义数据源后期会慢慢整理出来。

    展开全文
  • MyBatis-自定义数据源

    千次阅读 2016-12-11 17:11:19
    以DBCP为例实现自定义数据源 pomxml添加依赖 <!-- https://mvnrepository.com/artifact/commons-dbcp/commons-dbcp --> <groupId>commons-dbcp <artifactId>commons-dbcp <version>
  • Druid+自定义数据源

    2020-05-30 18:18:03
    1.若采用自定义数据源: spring: datasource: type: com.alibaba.druid.pool.DruidDataSource # 地址 http://localhost:8080/druid/sql.html url: jdbc:mysql://127.0.0.1:3306/demo?serverTimezone=UTC&...
  • 润乾 自定义数据源

    千次阅读 2013-01-11 14:35:11
    报表的数据来源大部分来自数据库,正常情况润乾报表都能自动处理,但是也会有特殊的情况,例如通过中间件连接数据库而非直连?...以下是尝试自定义数据源的方案: package test.runqian; import java.sql.Conne
  • flink自定义数据源emqtt

    千次阅读 2019-04-09 08:51:38
    flink自定义数据源emqtt 测试环境 : 单机服务器:8核12G, 设置并行度为2, 测试结果: 执行3分钟, 大概1秒4万的并发量, 未延迟, 只是简单测试, 并未达到极限。 Client11.java (主要用来处理emqtt的配置...
  • SparkSQL基于DataSourceV2自定义数据源

    千次阅读 2019-02-28 17:18:23
    SparkSQL基于DataSourceV2自定义数据源 版本说明:Spark 2.3 前言:之前在SparkSQL数据源操作文章中整理了一些SparkSQL内置数据源的使用,总的来说SparkSQL支持的数据源还是挺丰富的,但业务上可能不拘束于这几种...
  • CAS 自定义数据源

    千次阅读 2015-09-26 10:31:27
    自定义数据源 在deployConfigContext.xml文件中,找到org.jasig.cas.authentication.AcceptUsersAuthenticationHandler, 首先去掉默认的用户配置,添加上自定义的dataSource,并配置查询密码的sql,及自定义的...
  • Springboot和MyBatis整合(自定义数据源,可多数据源) ​ 由于我们项目中会用到根据条件不同操作不同的数据源,之前用的Springboot和JDBC整合的时候可以在JDBC里轻松选择数据源,写这个贴子则是记录在使用MyBatis时...
  • Spring Boot 自定义数据源 DruidDataSource Druid简介Druid功能配置参数表合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容...
  •  前面讲解了一些JasperReport给我们提供的一些实现好的数据源,当然如果我们有自己的特殊需要,还可以自定义数据源。   正题 跟之前的一样,我们要生成报表需要以下几个步骤: 1.引入jar包,请看《静态文本...
  • CityPicker 自定义数据源

    千次阅读 2019-04-02 07:15:57
    城市选择自定义数据 在使用 Citypicker 的时候,觉得其 UI 很不错,很多项目可以拿来即可使用。先来两张效果图镇楼! 问题是其数据源不支持自定义,因此使用起来很不方便。后来在看了源码之后,修改了其中几处代码...
  • 使用方便,简单,支持自定义数据源和使用该库提供的城市数据(该数据来源于国家统计局,比较准确),欢迎大家使用。 V5.0.0隆重发布!!! v5.0.0 支持自定义数据源 CityPicker城市选择器迎来史上最大更新!!!期待...
  • spring-boot入门(五)自定义数据源:druid 上一章讲了如何使用默认的tomcat数据源及用hibernate与数据库交互,这篇我们以阿里的druid数据源为例子,来说下如何使用自定义的数据源。 如何spring boot在启动的时候...
  • SimpleJson数据源简介 Grafana作为最火热的开源数据可视化工具,最大的特点就是支持多种数据源以及丰富的插件库了。官方提供了Elasictsearch、Prometheus、Mysql等常见数据库的数据源,社区也提供了支持Zabbix监控...
  • Sharpmap自定义数据源

    千次阅读 2014-01-06 22:17:57
    业务数据有位置信息,都可以在地图上显示...Sharpmap为我们提供了数据源接口,来完成各种各样数据的接入。 类图: 核心方法: /// ///返回查询到的数据集 /// /// 当前地图视野矩形 /// 填充后的返回的数据集
  • flink自定义数据源

    2020-03-20 11:20:55
    转载自flink从0到1学习个人收藏学习所用,
  • 实现自定义数据源函数读取,Flink为我们提供了两个方法: SourceFunction通过实现RichSourceFunction来定义非并行的数据源连接器。 ParllelSourceFunction通过试下RichParalleSourceFunction来定义能并行的数据源...
  • //添加自定义数据源 val source : DataStream [ MarketingUserBehavior ] = env . addSource ( new EventSourceFunction ( ) ) . assignAscendingTimestamps ( _ . timeStamp ) source . print ( "..." ...
  • 使用的是jasperreports-5.6.0版本的包。 需求: 报表由一个基本Basic对象和一个集合类CustomList...解决方法:自定义一个DataSource实现JRDataSource接口即可 仿照JRBeanCollectionDataSource写了一个ReportDataS
  • Spark Streaming 自定义数据源之 WordCount

    千次阅读 2020-02-12 16:30:19
    * 自定义数据源 */ object MyDefine { def main ( args : Array [ String ] ) : Unit = { val sparkConf = new SparkConf ( ) . setMaster ( "local[2]" ) . setAppName ( "MyDefine" ) val ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 443,279
精华内容 177,311
关键字:

自定义数据源