精华内容
下载资源
问答
  • SpringBatch批处理框架

    2019-07-22 15:41:40
    资源名称:Spring Batch 批处理框架内容简介:《Spring Batch 批处理框架》全面、系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的...
  • Spring Batch批处理框架

    2017-11-16 15:30:45
    Spring Batch批处理框架Spring Batch批处理框架Spring Batch批处理框架
  • Spring batch批处理框架

    2020-08-30 22:36:41
    本文主要介绍了Spring batch批处理框架的相关知识。具有很好的参考价值,下面跟着小编一起来看下吧
  • Spring Batch 批处理框架

    2019-07-07 02:39:33
    Spring Batch 批处理框架》基本信息作者: 刘相 出版社:电子工业出版社ISBN:9787121252419上架时间:2015-1-24出版日期:2015 年2月开本:16开页码:404版次:1-1 内容...

    Spring Batch 批处理框架
    基本信息
    作者: 刘相
    出版社:电子工业出版社
    ISBN:9787121252419
    上架时间:2015-1-24
    出版日期:2015 年2月
    开本:16开
    页码:404
    版次:1-1

     


    内容简介
    Spring Batch 批处理框架
    本书全面、系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的架构设计、源码做了特定的剖析;在帮助读者掌握Spring Batch框架基本功能、高级功能的同时,深入剖析了Spring Batch框架的设计原理,帮助读者可以游刃有余地掌握Spring Batch框架。
    本书分为入门篇、基本篇和高级篇三部分。入门篇介绍了批处理、Spring Batch的基本特性和新特性,快速入门的Hello World等内容引领读者入门,从而进入数据批处理的世界。基本篇重点讲述了数据批处理的核心概念、典型的作业配置、作业步配置,以及Spring Batch框架中经典的三步走策略:数据读、数据处理和数据写,详尽地介绍了如何对CVS格式文件、JSON格式文件、XML文件、数据库和JMS消息队列中的数据进行读操作、处理和写操作,对于数据库的操作详细介绍了使用JDBC、Hibernate、存储过程、JPA、Ibatis等处理。高级篇提供了高性能、高可靠性、并行处理的能力,分别向读者展示了如何实现作业流的控制,包括顺序流、条件流、并行流,如何实现健壮的作业,包括跳过、重试和重启等,如何实现扩展作业及并行作业,包括多线程作业、并行作业、远程作业和分区作业等,从而实现分布式、高性能、高扩展性的数据批处理作业。
    目录
    第1篇 入门篇

    第1章 Spring Batch简介 2
    1.1 什么是批处理 2
    1.2 Spring Batch 3
    1.2.1 典型场景 3
    1.2.2 Spring Batch架构 4
    1.3 Spring Batch优势 4
    1.3.1 丰富的开箱即用组件 5
    1.3.2 面向Chunk的处理 5
    1.3.3 事务管理能力 5
    1.3.4 元数据管理 5
    1.3.5 易监控的批处理应用 5
    1.3.6 丰富的流程定义 5
    1.3.7 健壮的批处理应用 6
    1.3.8 易扩展的批处理应用 6
    1.3.9 复用企业现有IT资产 6
    1.4 Spring Batch 2.0新特性 6
    1.4.1 支持Java5 7
    1.4.2 支持非顺序的Step 7
    1.4.3 面向Chunk处理 7
    1.4.4 元数据访问 11
    1.4.5 扩展性 11
    1.4.6 可配置性 12
    1.5 Spring Batch 2.2新特性 13
    1.5.1 Spring Data集成 13
    1.5.2 支持Java配置 13
    1.5.3 Spring Retry 14
    1.5.4 Job Parameters 14
    1.6 开发环境搭建 15
    第2章 Spring Batch之Hello World 16
    2.1 场景说明 16
    2.2 项目准备 16
    2.2.1 项目结构 16
    2.2.2 准备对账单文件 17
    2.2.3 定义领域对象 18
    2.3 定义job基础设施 18
    2.4 定义对账Job 19
    2.4.1 配置ItemReader 19
    2.4.2 配置ItemProcessor 21
    2.4.3 配置ItemWriter 22
    2.5 执行Job 23
    2.5.1 Java调用 23
    2.5.2 JUnit单元测试 24
    2.6 概念预览 26

    第2篇 基本篇

    第3章 Spring Batch基本概念 28
    3.1 命名空间 29
    3.2 Job 30
    3.2.1 Job Instance 31
    3.2.2 Job Parameters 33
    3.2.3 Job Execution 34
    3.3 Step 35
    3.3.1 Step Execution 37
    3.4 Execution Context 38
    3.5 Job Repository 39
    3.5.1 Job Repository Schema 39
    3.5.2 配置Memory Job
    Repository 40
    3.5.3 配置DB Job Repository 41
    3.5.4 数据库Schema 42
    3.6 Job Launcher 48
    3.7 ItemReader 49
    3.8 ItemProcessor 50
    3.9 ItemWriter 50
    第4章 配置作业Job 52
    4.1 基本配置 52
    4.1.1 重启Job 54
    4.1.2 Job拦截器 55
    4.1.3 Job Parameters校验 58
    4.1.4 Job抽象与继承 59
    4.2 高级特性 61
    4.2.1 Step Scope 61
    4.2.2 属性Late Binding 62
    4.3 运行Job 63
    4.3.1 调度作业 65
    4.3.2 命令行执行 68
    4.3.3 与定时任务集成 71
    4.3.4 与Web应用集成 73
    4.3.5 停止Job 77
    第5章 配置作业步Step 85
    5.1 配置Step 86
    5.1.1 Step抽象与继承 87
    5.1.2 Step执行拦截器 89
    5.2 配置Tasklet 92
    5.2.1 重启Step 93
    5.2.2 事务 94
    5.2.3 事务回滚 96
    5.2.4 多线程Step 97
    5.2.5 自定义Tasklet 97
    5.3 配置Chunk 99
    5.3.1 提交间隔 102
    5.3.2 异常跳过 103
    5.3.3 Step重试 105
    5.3.4 Chunk完成策略 107
    5.3.5 读、处理事务 110
    5.4 拦截器 112
    5.4.1 ChunkListener 115
    5.4.2 ItemReadListener 116
    5.4.3 ItemProcessListener 116
    5.4.4 ItemWriteListener 117
    5.4.5 SkipListener 117
    5.4.6 RetryListener 118
    第6章 读数据ItemReader 120
    6.1 ItemReader 120
    6.1.1 ItemReader 120
    6.1.2 ItemStream 121
    6.1.3 系统读组件 122
    6.2 Flat格式文件 122
    6.2.1 Flat文件格式 123
    6.2.2 FlatFileItemReader 125
    6.2.3 RecordSeparatorPolicy 129
    6.2.4 LineMapper 130
    6.2.5 DefaultLineMapper 131
    6.2.6 LineCallbackHandler 138
    6.2.7 读分隔符文件 139
    6.2.8 读定长文件 141
    6.2.9 读JSON文件 143
    6.2.10 读记录跨多行文件 145
    6.2.11 读混合记录文件 147
    6.3 XML格式文件 150
    6.3.1 XML解析 150
    6.3.2 Spring OXM 151
    6.3.3 StaxEventItemReader 153
    6.4 读多文件 156
    6.5 读数据库 159
    6.5.1 JdbcCursorItemReader 160
    6.5.2 HibernateCursorItem
    Reader 167
    6.5.3 StoredProcedureItem
    Reader 171
    6.5.4 JdbcPagingItemReader 174
    6.5.5 HibernatePagingItem
    Reader 179
    6.5.6 JpaPagingItemReader 183
    6.5.7 IbatisPagingItemReader 186
    6.6 读JMS队列 190
    6.6.1 JmsItemReader 190
    6.7 服务复用 194
    6.8 自定义ItemReader 197
    6.8.1 不可重启ItemReader 197
    6.8.2 可重启ItemReader 199
    6.9 拦截器 202
    6.9.1 拦截器接口 202
    6.9.2 拦截器异常 203
    6.9.3 执行顺序 204
    6.9.4 Annotation 204
    6.9.5 属性Merge 205
    第7章 写数据ItemWriter 207
    7.1 ItemWrite 207
    7.1.1 ItemWriter 208
    7.1.2 ItemStream 208
    7.1.3 系统写组件 209
    7.2 Flat格式文件 210
    7.2.1 FlatFileItemWriter 210
    7.2.2 LineAggregator 214
    7.2.3 FieldExtractor 217
    7.2.4 回调操作 219
    7.3 XML格式文件 222
    7.3.1 StaxEventItemWriter 222
    7.3.2 回调操作 226
    7.4 写多文件 230
    7.4.1 MultiResourceItemWriter 230
    7.4.2 扩展MultiResourceItem
    Writer 233
    7.5 写数据库 234
    7.5.1 JdbcBatchItemWriter 235
    7.5.2 HibernateItemWriter 239
    7.5.3 IbatisBatchItemWriter 242
    7.5.4 JpaItemWriter 245
    7.6 写JMS队列 248
    7.6.1 JmsItemWriter 248
    7.7 组合写 252
    7.8 Item路由Writer 254
    7.9 发送邮件 258
    7.9.1 SimpleMailMessageItem
    Writer 258
    7.10 服务复用 262
    7.10.1 ItemWriterAdapter 262
    7.10.2 PropertyExtracting
    DelegatingItemWriter 264
    7.11 自定义ItemWrite 267
    7.11.1 不可重启ItemWriter 267
    7.11.2 可重启ItemWriter 268
    7.12 拦截器 271
    7.12.1 拦截器接口 271
    7.12.2 拦截器异常 273
    7.12.3 执行顺序 274
    7.12.4 Annotation 274
    7.12.5 属性Merge 275
    第8章 处理数据ItemProcessor 277
    8.1 ItemProcessor 277
    8.1.1 ItemProcessor 277
    8.1.2 系统处理组件 278
    8.2 数据转换 279
    8.2.1 部分数据转换 279
    8.2.2 数据类型转换 281
    8.3 数据过滤 282
    8.3.1 数据Filter 282
    8.3.2 数据过滤统计 283
    8.4 数据校验 285
    8.4.1 Validator 285
    8.4.2 ValidatingItemProcessor 286
    8.5 组合处理器 288
    8.6 服务复用 291
    8.6.1 ItemProcessorAdapter 291
    8.7 拦截器 293
    8.7.1 拦截器接口 293
    8.7.2 拦截器异常 295
    8.7.3 执行顺序 295
    8.7.4 Annotation 296
    8.7.5 属性Merge 297

    第3篇 高级篇

    第9章 作业流Step Flow 300
    9.1 顺序Flow 300
    9.2 条件Flow 302
    9.2.1 next 303
    9.2.2 ExitStatus VS
    BatchStatus 306
    9.2.3 decision条件 308
    9.3 并行Flow 311
    9.4 外部Flow定义 314
    9.4.1 Flow 314
    9.4.2 FlowStep 317
    9.4.3 JobStep 319
    9.5 Step数据共享 321
    9.6 终止Job 323
    9.6.1 end 324
    9.6.2 stop 326
    9.6.3 fail 327
    第10章 健壮Job 330
    10.1 跳过Skip 331
    10.1.1 配置Skip 331
    10.1.2 跳过策略SkipPolicy 333
    10.1.3 跳过拦截器 335
    10.2 重试Retry 338
    10.2.1 配置Retry 339
    10.2.2 重试策略RetryPolicy 341
    10.2.3 重试拦截器 343
    10.2.4 重试模板 345
    10.3 重启Restart 353
    10.3.1 重启Job 353
    10.3.2 启动次数限制 355
    10.3.3 重启已完成的任务 355
    第11章 扩展Job、并行处理 357
    11.1 可扩展性 357
    11.2 多线程Step 358
    11.2.1 配置多线程Step 359
    11.2.2 线程安全性 360
    11.2.3 线程安全Step 361
    11.2.4 可重启的线程
    安全Step 363
    11.3 并行Step 365
    11.4 远程Step 366
    11.4.1 远程Step框架 366
    11.4.2 基于SI实现远程Step 368
    11.5 分区Step 373
    11.5.1 关键接口 374
    11.5.2 基本配置 376
    11.5.3 文件分区 378
    11.5.4 数据库分区 382
    11.5.5 远程分区Step 387
    后记 392

    转载于:https://www.cnblogs.com/china-pub/p/4255141.html

    展开全文
  • 主要介绍了Spring Batch批处理框架使用解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • springbatch 批处理框架的介绍

    千次阅读 2018-10-11 20:34:31
    springbatch 批处理框架的介绍 (还在整理中。。。。。。。。有点乱,待更新) Spring Batch 是什么? 官网中介绍 Spring Batch is a lightweight, comprehensive batch framework designed to enable the ...

    springbatch 批处理框架整理

    (还在整理中。。。。。。。。有点乱,待更新)

    Spring Batch 是什么? 官网中介绍 Spring Batch is a lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。)相对于他的特点定义我们肯定更倾向于他的使用的业务场景以及他是如何运作的。下面的篇幅将介绍整个springbatch的使用业务场景和它的结构原理以及如何去使用它们(最后会通过一个demo来演示)。
    springbatch结合springboot 的demo:https://github.com/kellypipe/springbatch-springboot-demo

    1、使用场景

    对于没有相关经验的初学者,下面是需要批处理的一些场景,并且如果使用Spring Batch 很可能会节省你很多宝贵的时间:

    • 接收的文件缺少了一部分需要的信息,你需要读取并解析整个文件,调用某个服务来获得缺少的那部分信息,然后写入到某个输出文件,供其他批处理程序使用。
    • 如果执行环境中发生了一个错误,则将失败信息写入数据库。 有专门的程序每隔15分钟来遍历一次失败信息,如果标记为可以重试,那就再执行一次。
    • 在工作流中,你希望其他系统在收到事件消息时,来调用某个特定服务。 如果其他系统没有调用这个服务,那么一段时间后需要自动清理过期数据,以避免影响到正常的业务流程。
    • 每天收到员工信息更新的文件,你需要为新员工建立相关档案和账号(artifacts)。
    • 有些定制订单的服务。 你需要在每天晚上执行批处理程序来生成清单文件,并将它们发送到相应的供应商手上。

    典型的批处理程序通常是从数据库、文件或队列中读取大量数据,然后通过某些方法处理数据,最后将处理好格式的数据写回库。对于批处理经验少的开发者来说,编写批处理程序来处理GB级别数据量无疑是种海啸般难以面对的任务,但我们可以用Spring Batch将其拆解为小块小块的(chunk)。 Spring Batch 是Spring框架的一个模块,专门设计来对各种类型的文件进行批量处理。

    1. 定期提交批处理任务
    2. 并发批处理:并行执行任务
    3. 分阶段,企业消息驱动处理
    4. 高并发批处理任务
    5. 失败后手动或定时重启
    6. 按顺序处理任务依赖(使用工作流驱动的批处理插件)
    7. 局部处理:跳过记录(例如在回滚时)
    8. 完整的批处理事务:因为可能有小数据量的批处理或存在存储过程/脚本

    总的来说,springbatch 封装了一些细节操作(比如批处理数据的时候不需要我们自己去考虑如何去读取数据,如何去操作数据,如何去写入数据,这些框架都封装了),我们需要关注整个批处理任务的流程就可以了;

    2、框架结构

    在这里插入图片描述
    上面图是已经使用了几十年的批处理参考体系结构的简化版本。它概述了组成批处理领域的组件.Spring Batch 在系统中提供了健壮的、可维护的常见的层、组件和技术服务的物理实现,这些系统用于创建简单到复杂的批处理应用程序,其基础结构和扩展可以满足非常复杂的处理需求。

    上图能明显看到有4个主要角色:

    1. JobLauncher:是任务启动器,通过它来启动任务,可以看做是程序的入口。
    2. Job代表着一个具体的任务。
    3. Step代表着一个具体的步骤,一个Job可以包含多个Step.在实际业务场景中,可能一个任务很复杂,这个时候可以将任务 拆分成多个step,分别对这些step 进行管理(将一个复杂任务简单化)。(这些step 默认是串行执行,也可以并行执行,根据具体的业务场景来使用)。每一个都有一个ItemReader(读取数据),一个ItemProcessor(处理数据)和一个ItemWriter(写入数据)
    4. JobRepository:批处理框架执行过程中的上下文(元数据)–这个有两种实现一种是通过内存来管理,一种是进行持久化到数据库。
    2.1 JobLauncher

    JobLauncher是任务启动器,该接口只有一个run方法

    public interface JobLauncher {
     
        public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;
     
    }
    

    除了传入Job对象之外,还需要传入JobParameters对象,后续讲到Job再解释为什么要多传一个JobParameters。通过JobLauncher可以在Java程序中调用批处理任务,也可以通过命令行或者其他框架(如定时调度框架Quartz、Web后台框架Spring MVC)中调用批处理任务。Spring Batch框架提供了一个JobLauncher的实现类SimpleJobLauncher。

    2.2、Job

    在Spring批处理中,作业只是步骤实例的容器。它将逻辑上属于流中的多个步骤组合在一起,并允许对所有步骤进行属性全局配置。

    • 简单的工作名称。
    • 步骤实例的定义和排序。
    • 工作是否可以重新开始

    考虑到任务可能不是只执行一次就再也不执行了,更多的情况可能是定时任务,如每天执行一次,每个星期执行一次等等,那么为了区分每次执行的任务,框架使用了JobInstance。如上图所示,Job是一个EndOfDay(每天最后时刻执行的任务),那么其中一个JobInstance就代表着2007年5月5日那天执行的任务实例。框架通过在执行JobLauncher.run(Job, JobParameters)方法时传入的JobParameters来区分是哪一天的任务。

    由于2007年5月5日那天执行的任务可能不会一次就执行完成,比如中途被停止,或者出现异常导致中断,需要多执行几次才能完成,所以框架使用了JobExecution来表示每次执行的任务。

    SimpleJob 是Spring Batch默认简单实现
    类,它在Job之上创建一些标准功能。在使用基于java的配置时,可以使用一组构建器来实例化作业,如下面的示例所示。

    @Bean
    public Job footballJob() {
      return this.jobBuilderFactory.get("footballJob")
      .start(playerLoad())
      .next(gameLoad())
      .next(playerSummarization())
      .end()
      .build();
    }
    
    
    2.2.1、JobInstance

    JobInstance指的是 逻辑作业运行单元的概念。考虑一个批作业,可能不仅仅执行一次。例如前面图中的“EndOfDay”作业,批作业在每一天结束时运行一次。所以对每个作业的运行必须单独逻辑的JobInstance跟踪。比如1月1日的跑步,1月2日的跑步,等等。如果1月1日的第一次运行失败,第二天再次运行,它仍然是
    1月1日的上下文。因此,每个JobInstance可以有多个执行(本章后面将更详细地讨论JobExecution),并且在给定时间内,相同参数的同一个JobInstance只能有一个能运行。

    JobInstance的定义与要加载的数据完全没有关系。如何加载数据完全取决于ItemReader实现在EndOfDay场景中,数据上可能有一列表示“有效日期”或
    数据所属的“调度日期”。因此,1月1日的运行将只加载第1次的数据,而1月2日的运行将只使用第2次的数据。因为这个决定可能是一个业务决策,所以由ItemReader决定。但是,使用相同的JobInstance决定是否使用以前执行的“state”(即ExecutionContext,将在本章后面讨论)。使用新的JobInstance意思是“从头开始”,而使用现有实例通常表示“从头开始”

    2.2.1、JobParameters

    在讨论了JobInstance与Job的区别之后,很自然地要问:“一个JobInstance与另一个区别在哪里?”答案是:JobParameters。JobParameters对象持有一组用于启动批作业的参数。在运行过程中,它们可以用于区别不同的jobinstance,甚至作为实例运行的数据,如下图所示
    [外链图片转存失败(img-Fn9VOCSp-1564901336408)
    在这里插入图片描述

    在前面的示例中,有两个jobInstance,一个用于1月1日,另一个用于1月1日
    1月2日,实际上只有一个Job,但是它有两个JobParameter对象:一个以作业参数01-01-2017开始,另一个以参数01-02-2017开始。
    因此,jobinstance 可抽象为为:JobInstance =Job + identifying JobParameters. 。这允许开发人员通过控制传入的参数有效定义JobInstance。

    Not all job parameters are required to contribute to the identification of a
    JobInstance. By default, they do so. However, the framework also allows the
    submission of a Job with parameters that do not contribute to the identity of a
    JobInstance
    
    2.2.1、JobExecution ######。

    JobExecution作为一个job 一次执行任务的上下文。因为job 的一个instanceJob 有可能执行失败而多次执行,这样就需要一个上下文来管理同一个instanceJob 的多次执行。一次执行可能以失败或成功结束,只有JobExecution 执行成功了JobInstance才被认为是完成的。以前面描述的EndOfDay作业为例,考虑一个01-01-2017的JobInstance,它在第一次运行时失败。
    如果再次使用与第一次运行(01-01-2017)相同的作业参数运行(01-01-2017),则新的
    JobExecution被创建。然而,仍然只有一个JobInstance。JobInstance则纯粹是一个组织对象,他将多个JobExecution 组合一起。而实际运行期间的主要存储机制是JobExecution.

    以上面EndOfDayJob 为例 在01-01-2017 9:00 开始执行任务,到9:30 任务失败。可以看到相关表的记录。
    在这里插入图片描述

    由于第一次执行失败后,任务将停止等待第二次重启(第二次重启将从失败的位置开始)。到01-02-2017 9:00 时候,第一次执行失败的任务将从失败的位置重新开始执行,而10-02-2017的任务也将开始执行,JobInstance被一个接一个地启动,除非两个作业因为同时访问相同的数据造,从而导致在数据库级别的锁定而阻塞。否则何时运行作业完全取决于调度程序。因为他们是分开的工作,spring
    Batch 框架不会阻止它们并发地运行。(当试图运行相同的程序
    当另一个已经在运行时,JobInstance会导致抛出一个JobExecutionAlreadyRunningException)。现观察相关表将有新的记录:
    在这里插入图片描述

    2.3、Step

    Step是一个领域对象,它体现了批处理作业的独立的、连续的阶段。
    因此,每个工作都完全由一个或多个步骤组成。步骤包含定义和控制实际批处理所需的所有信息。step 不是固定的,因为任何给定步骤的内容都由开发人员决定
    。一个步骤可以是简单的,也可以是复杂的。一个简单的步骤可能会将数据从文件加载到数据库中,只需要很少或根本不需要代码(取决于所使用的实现)。更复杂的步骤可能有作为处理一部分应用的复杂业务规则。与作业一样(JobExecution),Step 也有这独立 StepExecution 存储这每一个step 的执行信息。如下图:

    2.3.1、StepExecution

    一个 StepExecution 表示执行step的一次执行。
    每次运行一个step时都会创建一个新的StepExecution,类似于JobExecution。但是,如果一个步骤因为之前的步骤失败而没有执行,则不会为它持久化执行。只有当它的step真正开始时,才会创建StepExecution

    StepExecution 用来表示每一个step 的执行。每个StepExecution都包含对其相应step和与JobExecution以及事务相关数据的引用,比如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个
    ExecutionContext,它包含开发人员在批处理运行期间需要持久化的任何数据,例如重新启动所需的统计信息或状态信息。下表列出了用于的属性.

    在这里插入图片描述

    在这里插入图片描述

    2.3.1、ExecutionContext

    ExecutionContext 存储这 框架需要持久化和控制的键值对集合,以便开发人员存储作用域为StepExecution or a JobExecution的持久状态。对于那些熟悉Quartz的人来说,它与Quartz JobDataMap非常相似。他们的最好作用是在发生异常时为后续的重启做数据基础。

    以读取文件为例,在处理单行时,框架定期在提交点持久化ExecutionContext。这样做允许ItemReader存储它的状态,以防在运行过程中发生致命错误,甚至电源中断。所需要做的就是将当前读取的行数放入上下文中,如下例所示,框架将完成其余持久化的工作。

    executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
    

    以上面的EndOfDay 为示例,假设有一个步骤,‘loadData’,加载一个文件到数据库。第一次运行失败后,元数据表的变化如下:

    在这里插入图片描述

    在前面的示例中,该步骤运行了30分钟,并处理了40,321个“片段”,这将表示此场景中文件中的行。这个值在框架每次提交之前更新,并且可以包含多个行,这些行对应于ExecutionContext中的条目。
    为了在提交之前事件通知,需要实现StepListener(或ItemStream)
    详细内容将在后面描述。还以上面EndOfDay示例,假定任务在第二天重新启动。当它重新启动时
    上次运行的ExecutionContext从数据库中重新构造。打开ItemReader时,它可以检查上下文中是否有任何存储状态,并从那里初始化自己。

    if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
      log.debug("Initializing for restart. Restart data is: " + executionContext);
      long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
      LineReader reader = getReader();
      Object record = "";
      while (reader.getPosition() < lineCount && record != null) {
      record = readLine();
      }
    }
    
    

    在这种情况下,在上面的代码运行之后,当前行是40,322,允许从它停止的地方重新开始。例如,如果一个文件包含处理订单,一个订单包含多个行,可能需要存储多个订单处理(这是不同于读取行数),因此可以将电子邮件发送的最后一步,订单处理的总数。该框架为开发人员处理存储这些内容,以便正确地使用一个JobInstance对其进行调整。很难知道是否应该使用现有的ExecutionContext。例如,使用
    从上面的‘EndOfDay’示例中可以看出,当01-01再次运行时,框架意识到它是相同的JobInstance,并在单个步骤的基础上拉出
    将ExecutionContext从数据库中取出,并将其(作为步骤执行的一部分)交给步骤本身。相反,对于01-02运行,框架认识到它是一个不同的实例,因此必须向步骤传递一个空上下文。框架为开发人员提供了许多这种类型的决定,以确保在正确的时间给他们状态。同样重要的是要注意,在任何给定时间,每一步执行只存在一个ExecutionContext。ExecutionContext的客户端应该小心,因为这会创建一个共享密钥空间。因此,在输入值时应该小心,以确保没有覆盖数据。然而,
    Step在上下文中完全不存储数据,因此没有办法对框架产生负面影响

    同样重要的是,每个JobExecution至少有一个执行上下文,每一步执行一个上下文。例如,考虑下面的代码片段

    ExecutionContext ecStep = stepExecution.getExecutionContext();
    ExecutionContext ecJob = jobExecution.getExecutionContext();
    

    ecStep不等于ecJob。它们是两个不同的执行上下文。
    作用域为step的元素将在该step的每个提交点保存,而作用域为Job则保存在
    存在每一步step执行之间.

    关于springbatch 数据库表可以在maven 引入

             <dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-batch</artifactId>
    		</dependency>
    
    引入后就可以在下面路径下可以找个支持很多种数据库的sql
      org/springframework/batch/core/
    

    在这里插入图片描述
    待更新。。。。。。。。。。。

    参考:

    https://docs.spring.io/spring-batch/4.1.x/reference/pdf/spring-batch-reference.pdf

    http://www.importnew.com/26177.html

    https://kimmking.gitbooks.io/springbatchreference/content/01_introduction/12.html

    展开全文
  • SpringBoot集成Spring Batch批处理框架入门案例实战

    Spring Batch 简介

    spring batch是spring提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。

    Spring Batch是一个轻量级,全面的批处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序。 Spring Batch构建了人们期望的Spring Framework特性(生产力,基于POJO的开发方法和一般易用性),同时使开发人员可以在必要时轻松访问和利用更高级的企业服务。 Spring Batch不是一个schuedling的框架。

    spring batch的一个总体的架构如下

    Figure 2.1: Batch Stereotypes
    在spring batch中一个job可以定义很多的步骤step,在每一个step里面可以定义其专属的ItemReader用于读取数据,ItemProcesseor用于处理数据,ItemWriter用于写数据,而每一个定义的job则都在JobRepository里面,我们可以通过JobLauncher来启动某一个job。

    step数据流

    数据输入源(来源于文件,数据库等) --> ItemReader --> ItemProcessor --> ItemWriter --> 数据输出到文件、数据库等

     

    实战案例

    场景描述: 现在需要将一个存储了几百万条用户信息的 data.csv 文件导入到系统的用户表sys_user中, data.csv文件中每一行代表一条用户信息,用户信息字段之间用制表符\t分隔

    数据源文件

    data.csv文件内容

    姓名	性别	年龄	地址
    张三	男	12	深圳
    李四	男	32	广州
    王雪	女	21	上海
    孙云	女	23	北京
    赵柳	女	42	成都
    孙雪	女	15	武汉
    

    文件存放路径: resources/data.csv

    maven jar包依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    

    spring batch配置

    spring:
      batch:
        # 在项目启动时进行执行建表sql
        initialize-schema: always
        job:
          # 禁止Spring Batch自动执行,既需要用户触发才能执行
          enabled: false
          names: parentjob
        # spring batch相关表前缀, 默认为 batch_
        table-prefix: batch_
    
      datasource:
        # 项目启动时的建表sql脚本,该脚本由Spring Batch提供
        schema: classpath:/org/springframework/batch/core/schema-mysql.sql
    

    项目启动后,会在系统对应的数据库中新建以batch开头的Spring Batch 相关表

    • batch_job_execution: 表示Job执行的句柄(一次执行)
    • batch_job_execution_params: 通过Job参数区分不同的Job实例,实际使用hashMap存储参数(仅4种数据类型)
    • batch_job_instance: 作业实例,一个运行期概念(一次执行关联一个实例)
    • batch_job_execution_seq
    • batch_job_seq
    • batch_step_execution: 执行上下文,在job/Step执行时保存需要进行持久化的状态信息。
    • batch_job_execution_context: 执行上下文,在job/Step执行时保存需要进行持久化的状态信息。
    • batch_step_execution_context
    • batch_step_execution_seq

    用户实体对象

    @Data
    public class User {
        private String userName;
        private String sex;
        private Integer age;
        private String address;
        private Byte status;
        private Date createTime;
    }
    

    Spring Batch配置类

    import org.springframework.batch.core.*;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.batch.item.validator.Validator;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    
    import javax.sql.DataSource;
    import java.io.FileNotFoundException;
    
    @Configuration
    @EnableBatchProcessing
    public class CsvBatchJobConfig {
    
        // 用来读取数据
        @Bean
        public ItemReader<User> reader() {
            // FlatFileItemReader是一个用来加载文件的itemReader
            FlatFileItemReader<User> reader = new FlatFileItemReader<>();
            // 跳过第一行的标题
            reader.setLinesToSkip(1);
            // 设置csv的位置
            reader.setResource(new ClassPathResource("data.csv"));
            // 设置每一行的数据信息
            reader.setLineMapper(new DefaultLineMapper<User>(){{
                setLineTokenizer(new DelimitedLineTokenizer(){{
                    // 配置了四行文件
                    setNames(new String[]{"userName","sex","age", "address"});
                    // 配置列于列之间的间隔符,会通过间隔符对每一行进行切分
                    setDelimiter("\t");
                }});
    
                // 设置要映射的实体类属性
                setFieldSetMapper(new BeanWrapperFieldSetMapper<User>(){{
                    setTargetType(User.class);
                }});
            }});
            return reader;
        }
    
        // 用来处理数据
        @Bean
        public ItemProcessor<User,User> processor(){
            // 使用我们自定义的ItemProcessor的实现CsvItemProcessor
            CsvItemProcessor processor = new CsvItemProcessor();
            // 为processor指定校验器为CsvBeanValidator()
            processor.setValidator(csvBeanValidator());
            return processor;
        }
    
    
        // 用来输出数据
        @Bean
        public ItemWriter<User> writer(@Qualifier("dataSource") DataSource dataSource) {
            // 通过Jdbc写入到数据库中
            JdbcBatchItemWriter writer = new JdbcBatchItemWriter();
            writer.setDataSource(dataSource);
            // setItemSqlParameterSourceProvider 表示将实体类中的属性和占位符一一映射
            writer.setItemSqlParameterSourceProvider(
                    new BeanPropertyItemSqlParameterSourceProvider<>());
    
            // 设置要执行批处理的SQL语句。其中占位符的写法是 `:属性名`
            writer.setSql("insert into sys_user(user_name, sex, age, address, status, create_time) " +
                    "values(:userName, :sex, :age, :address, :status, :createTime)");
            return writer;
        }
    
    
        // 配置一个Step
        @Bean
        public Step csvStep(
                StepBuilderFactory stepBuilderFactory,
                ItemReader<User> reader,
                ItemProcessor<User,User> processor,
                ItemWriter<User> writer) {
    
            return stepBuilderFactory.get("csvStep")
                    // 批处理每次提交5条数据
                    .<User, User>chunk(5)
                    // 给step绑定 reader
                    .reader(reader)
                    // 给step绑定 processor
                    .processor(processor)
                    // 给step绑定 writer
                    .writer(writer)
                    .faultTolerant()
                    // 设定一个我们允许的这个step可以跳过的异常数量,假如我们设定为3,则当这个step运行时,只要出现的异常数目不超过3,整个step都不会fail。注意,若不设定skipLimit,则其默认值是0
                    .skipLimit(3)
                    // 指定我们可以跳过的异常,因为有些异常的出现,我们是可以忽略的
                    .skip(Exception.class)
                    // 出现这个异常我们不想跳过,因此这种异常出现一次时,计数器就会加一,直到达到上限
                    .noSkip(FileNotFoundException.class)
                    .build();
        }
    
        /**
         * 配置一个要执行的Job任务, 包含一个或多个Step
         */
        @Bean
        public Job csvJob(JobBuilderFactory jobBuilderFactory, Step step) {
            // 为 job 起名为 csvJob
            return jobBuilderFactory.get("csvJob")
                    .start(step)
    //                .next(step)
    				.listener(listener())
                    .build();
        }
    
        @Bean
        public Validator<User> csvBeanValidator(){
            return new CsvBeanValidator<>();
        }
    
    	@Bean
        public JobExecutionListener listener() {
            return new JobCompletionListener();
        }
    }
    

    自定义校验器

    import org.springframework.batch.item.validator.ValidatingItemProcessor;
    import java.util.Date;
    
    public class CsvItemProcessor extends ValidatingItemProcessor<User> {
        @Override
        public User process(User item) {
            super.process(item);
    
            // 对数据进行简单的处理,若性别为男,则数据转换为1,其余转换为2
            if (item.getSex().equals("男")) {
                item.setSex("1");
            } else {
                item.setSex("2");
            }
    
            // 设置默认值
            item.setStatus((byte) 1);
            item.setCreateTime(new Date());
            return item;
        }
    }
    

    数据校验类

    import org.springframework.batch.item.validator.ValidationException;
    import org.springframework.batch.item.validator.Validator;
    import org.springframework.beans.factory.InitializingBean;
    
    import javax.validation.ConstraintViolation;
    import javax.validation.Validation;
    import javax.validation.ValidatorFactory;
    import java.util.Set;
    
    public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {
    
        private javax.validation.Validator validator;
    
        @Override
        public void validate(T value) throws ValidationException {
            // 使用Validator的validate方法校验数据
            Set<ConstraintViolation<T>> constraintViolations =
                    validator.validate(value);
    
            if (constraintViolations.size() > 0) {
                StringBuilder message = new StringBuilder();
                for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                    message.append(constraintViolation.getMessage() + "\n");
                }
                throw new ValidationException(message.toString());
            }
        }
    
        /**
         * 使用JSR-303的Validator来校验我们的数据,在此进行JSR-303的Validator的初始化
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            ValidatorFactory validatorFactory =
                    Validation.buildDefaultValidatorFactory();
            validator = validatorFactory.usingContext().getValidator();
        }
    }
    

    批处理监听类

    public class JobCompletionListener extends JobExecutionListenerSupport {
    	// 用于批处理开始前执行
        @Override
        public void beforeJob(JobExecution jobExecution) {
            System.out.println(String.format("任务id=%s开始于%s", jobExecution.getJobId(), jobExecution.getStartTime()));
        }
    
    	// 用于批处理开始后执行
        @Override
        public void afterJob(JobExecution jobExecution) {
            if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
                System.out.println(String.format("任务id=%s结束于%s", jobExecution.getJobId(), jobExecution.getEndTime()));
            } else {
                System.out.println(String.format("任务id=%s执行异常状态=%s", jobExecution.getJobId(),  jobExecution.getStatus()));
            }
        }
    
    }
    

    执行批处理接口

    @RestController
    public class JobController {
        @Autowired
        private JobLauncher jobLauncher;
        @Autowired
        private Job job;
    
        @GetMapping("/doJob")
        public void doJob() {
            try {
                // 同一个job执行多次, 由于job定义一样, 则无法区分jobInstance, 所以增加jobParameter用于区分
                JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
                jobParametersBuilder.addDate("jobDate", new Date());
    
                // 执行一个批处理任务
                jobLauncher.run(job, jobParametersBuilder.toJobParameters());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    调用接口执行批处理: http://localhost:8080/doJob

    数据库建表语句

    CREATE TABLE `sys_user` (
      `id` bigint(18) NOT NULL AUTO_INCREMENT COMMENT '主键id',
      `user_name` varchar(55) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '姓名',
      `sex` tinyint(1) DEFAULT NULL COMMENT '性别',
      `age` int(5) DEFAULT NULL COMMENT '年龄',
      `address` varchar(255) DEFAULT NULL COMMENT '地址',
      `status` tinyint(4) DEFAULT NULL COMMENT '状态',
      `create_time` datetime DEFAULT NULL COMMENT '创建时间',
      `update_time` datetime DEFAULT NULL COMMENT '更新时间',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;
    

     

    Reference

    展开全文
  • 内如主要来自以下链接: http://www.importnew.com/26177.html ... spring batch批处理框架(书名)非常推荐!!!写得非常好!!! 一.spring batch介绍 什么是批...

    内如主要来自以下链接:

    1. http://www.importnew.com/26177.html
    2. http://www.infoq.com/cn/articles/analysis-of-large-data-batch-framework-spring-batch
    3. spring batch批处理框架(书名)非常推荐!!!写得非常好!!!

    一.spring batch介绍

    什么是批处理:

    在信息系统中,联机和批处理是计算机处理的两种基本模式。同联机模式汗牛充栋的著作、框架相比,批处理抽象模式的抽象不多,著名的MapReduce就是其中之一。spring batch将批处理程序分解为job和job step两个部分,将处理环节定义为数据读、数据处理和数据写三个步骤,将异常处理机制归结为跳过、重试和重启三种类型,将作业方式区分为多线程、并行、远程和分区四大特征。下图是一个典型的批处理应用:

    调用spring batch的几种方式:可以通过web、命令行和调度框架,方式为同步(等待批处理完成)和异步调用。

    二.spring batch三层架构:应用层、核心层和基础架构层

    其中几个关键词的描述如下:

    三.spring batch批处理框架

    spring batch框架如上图,框架一共有4个主要角色:JobLauncher是任务启动器,通过它来启动任务,可以看做是程序的入口。Job代表着一个具体的任务。Step代表着一个具体的步骤,一个Job可以包含多个Step(想象把大象放进冰箱这个任务需要多少个步骤你就明白了)。JobRepository是存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等等信息。

    • JobLauncher

    JobLauncher是任务启动器,该接口只有一个run方法。除了传入Job对象之外,还需要传入JobParameters对象,后续讲到Job再解释为什么要多传一个JobParameters。通过JobLauncher可以在Java程序中调用批处理任务,也可以通过命令行或者其他框架(如定时调度框架Quartz、Web后台框架Spring MVC)中调用批处理任务。Spring Batch框架提供了一个JobLauncher的实现类SimpleJobLauncher。

    • Job

    Job代表着一个任务,一个Job与一个或者多个Job Instance相关联,而一个Job Instance又与一个或者多个Job Execution相关联:

    考虑到任务可能不是只执行一次就再也不执行了,更多的情况可能是定时任务,如每天执行一次,每个星期执行一次等等,那么为了区分每次执行的任务,框架使用了JobInstance。如上图所示,Job是一个EndOfDay(每天最后时刻执行的任务),那么其中一个JobInstance就代表着2007年5月5日那天执行的任务实例。框架通过在执行JobLauncher.run(Job, JobParameters)方法时传入的JobParameters来区分是哪一天的任务。由于2007年5月5日那天执行的任务可能不会一次就执行完成,比如中途被停止,或者出现异常导致中断,需要多执行几次才能完成,所以框架使用了JobExecution来表示每次执行的任务。

    • Step

    一个Job任务可以分为几个Step步骤,与JobExection相同,每次执行Step的时候使用StepExecution来表示执行的步骤。每一个Step还包含着一个ItemReader、ItemProcessor、ItemWriter,下面分别介绍这三者。

    1. ItemReader:ItemReader代表着读操作,框架已经提供了多种ItemReader接口的实现类,包括对文本文件、XML文件、数据库、JMS消息等读的处理,当然我们也可以自己实现该接口。下图:FlatFileItemReader将Flat文件中的记录转换为java对象。对应写的话,就是将java对象转化为记录。

    1. ItemProcessor:ItemReader代表着处理操作,process方法的形参传入I类型的对象,通过处理后返回O型的对象。开发者可以实现自己的业务代码来对数据进行处理。
    2. ItemWriter:ItemReader代表着写操作,框架已经提供了多种ItemWriter接口的实现类,包括对文本文件、XML文件、数据库、JMS消息等写的处理,当然我们也可以自己实现该接口。

    FlatFileItemReader的组件如下图:

    lines to skip:需要跳过多少行,一般文件的一开始,第一行为标题,可以设置跳过第一行。

    line mapper:将一行转换为一个java对象,主要依赖两个组件:linetokenizer-将一行拆分成多个字段;fieldsetmapper-将字段值构造成一个java对象。

    StaxEventItemReader 提供了从XML输入流进行记录处理的典型设置。

    按照面向chunk的操作,如果提交间隔(commit-interval)是3次,那么读操作被调用3次,写操作被调用1次。

    支持非顺序的step,如下图:

     

    • JobRepository

    JobRepository用于存储任务执行的状态信息,比如什么时间点执行了什么任务、任务执行结果如何等等。框架提供了2种实现,一种是通过Map形式保存在内存中,当Java程序重启后任务信息也就丢失了,并且在分布式下无法获取其他节点的任务执行情况;另一种是保存在数据库中,并且将数据保存在下面6张表里,包括Job、Step的实例,上下文,执行器信息,为后续的监控、重启、重试、状态恢复等提供了可能:

    1. BATCH_JOB_INSTANCE:作业实例表,用于存放Job的实例信息
    2. BATCH_JOB_EXECUTION_PARAMS:作业参数表,用于存放每个Job执行时候的参数信息,该参数实际对应Job实例的。
    3. BATCH_JOB_EXECUTION:作业执行器表,用于存放当前作业的执行信息,比如创建时间,执行开始时间,执行结束时间,执行的那个Job实例,执行状态等。
    4. BATCH_STEP_EXECUTION:作业步执行器表,用于存放每个Step执行器的信息,比如作业步开始执行时间,执行完成时间,执行状态,读写次数,跳过次数等信息。
    5. BATCH_JOB_EXECUTION_CONTEXT:作业执行上下文表,用于存放作业执行器上下文的信息。
    6. BATCH_STEP_EXECUTION_CONTEXT:作业步执行上下文表,用于存放每个作业步上下文的信息。

    这6张表的创建脚本可以从以下图中得到:

    因为我的数据库是oracle,所以选下图红框中的,打开就可以看到数据库的创建脚本

    四.作业的健壮性和扩展性

    批处理要求Job必须有较强的健壮性,通常Job是批量处理数据、无人值守的,这要求在Job执行期间能够应对各种发生的异常、错误,并对Job执行进行有效的跟踪。一个健壮的Job通常需要具备如下的几个特性:

    1. 容错性:在Job执行期间非致命的异常,Job执行框架应能够进行有效的容错处理,而不是让整个Job执行失败;通常只有致命的、导致业务不正确的异常才可以终止Job的执行。
    2. 可追踪性:Job执行期间任何发生错误的地方都需要进行有效的记录,方便后期对错误点进行有效的处理。例如在Job执行期间任何被忽略处理的记录行需要被有效的记录下来,应用程序维护人员可以针对被忽略的记录后续做有效的处理。
    3. 可重启性:Job执行期间如果因为异常导致失败,应该能够在失败的点重新启动Job;而不是从头开始重新执行Job。

    框架提供了支持上面所有能力的特性,包括Skip(跳过记录处理)、Retry(重试给定的操作)、Restart(从错误点开始重新启动失败的Job):

    1. Skip:在对数据处理期间,如果数据的某几条的格式不能满足要求,可以通过Skip跳过该行记录的处理,让Processor能够顺利的处理其余的记录行。
    2. Retry:将给定的操作进行多次重试,在某些情况下操作因为短暂的异常导致执行失败,如网络连接异常、并发处理异常等,可以通过重试的方式避免单次的失败,下次执行操作时候网络恢复正常,不再有并发的异常,这样通过重试的能力可以有效的避免这类短暂的异常。
    3. Restart:在Job执行失败后,可以通过重启功能来继续完成Job的执行。在重启时候,批处理框架允许在上次执行失败的点重新启动Job,而不是从头开始执行,这样可以大幅提高Job执行的效率。但仍需要保证job instance的状态不能为completed。

    Spring Batch Admin中有一系列工具JobService,用以取消正在进行执行的任务。

     五.线程安全性

     

    转载于:https://www.cnblogs.com/fxl-njfu/p/8405686.html

    展开全文
  • Spring Batch 批处理框架》全面、系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的架构设计、源码做了特定的剖析;在帮助读者掌握...
  • springbatch书籍批处理加课程源码,很全,可以给大家学习
  • spring batch 批处理框架(一) 一、spring batch是什么: SpringBatch 是一个大数据量的并行处理框架。通常用于数据的离线迁移,和数据处理,⽀持事务、并发、流程、监控、纵向和横向扩展,提供统⼀的接⼝管理和...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 4,172
精华内容 1,668
关键字:

springbatch批处理框架

spring 订阅