您当前的位置:首页 > 电脑百科 > 程序开发 > 框架

使用SpringBatch读取csv文件

时间:2022-08-31 15:53:45  来源:  作者:huan1993

1、需求

系统每日从某个固定的目录中读取csv文件,并在控制台上打印。

2、解决方案

要解决上述需求,可以使用的方法有很多,此处选择使用Spring Batch来实现。

3、注意事项

1、文件路径的获取

此处简单处理,读取 JobParameters 中的日期,然后构建一个文件路径,并将文件路径放入到 ExecutionContext中。此处为了简单,文件路径会在程序中写死,但是同时也会将文件路径存入到 ExecutionContext 中,并且在具体的某个Step中从ExecutionContext中获取路径。

注意:
ExecutionContext中存入的数据虽然在各个Step中都可以获取到,但是不推荐存入比较大的数据到ExecutionContext中,因为这个对象的数据需要存入到数据库中。

2、各个Step如果获取到ExecutionContext中的值

  1. 类上加入 @StepScope 注解
  2. 通过 @Value("#{jobExecutionContext['importPath']}") 来获取

eg:

@Bean
@StepScope
public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
    // 读取数据
    return new FlatFileItemReaderBuilder<Person>()
            .name("read-csv-file")
            .resource(new ClassPathResource(importPath))
            .delimited().delimiter(",")
            .names("username", "age", "sex")
            .fieldSetMApper(new RecordFieldSetMapper<>(Person.class))
            .build();
}

解释:在程序实例化FlatFileItemReader的时候,此时是没有jobExecutionContext的,那么就会报错,如果加上@StepScope,此时就没有问题了。@StepScope表示到达Step阶段才实例化这个Bean

3、FlatFileItemReader使用注意

当我们使用FlatFileItemReader来读取我们的csv文件时,此处需要返回 FlatFileItemReader类型,而不能直接返回ItemReader,否则可能出现如下错误 Reader must be open before it can be read

4、实现步骤

1、导入依赖,配置

1、导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
</dependencies>

2、初始化SpringBatch数据库

spring.datasource.username=root
spring.datasource.password=root@1993
spring.datasource.url=jdbc:MySQL://127.0.0.1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=false
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# 程序启动时,默认不执行job
spring.batch.job.enabled=false
spring.batch.jdbc.initialize-schema=always
# 初始化spring-batch数据库脚本
spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mysql.sql

2、构建文件读取路径

此处我的想法是,在JobExecutionListener中完成文件路径的获取,并将之放入到ExecutionContext,然后在各个Step中就可以获取到文件路径的值了。

/**
 * 在此监听器中,获取到具体的需要读取的文件路径,并保存到 ExecutionContext
 *
 * @author huan.fu
 * @date 2022/8/30 - 22:22
 */
@Slf4j
public class AssemblyReadCsvPathListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        ExecutionContext executionContext = jobExecution.getExecutionContext();
        JobParameters jobParameters = jobExecution.getJobParameters();
        String importDate = jobParameters.getString("importDate");
        log.info("从 job parameter 中获取的 importDate 参数的值为:[{}]", importDate);
        String readCsvPath = "data/person.csv";
        log.info("根据日期组装需要读取的csv路径为:[{}],此处排除日期,直接写一个死的路径", readCsvPath);
        executionContext.putString("importPath", readCsvPath);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {

    }
}

3、构建Tasklet,输出文件路径

@Slf4j
@Component
@StepScope
public class PrintImportFilePathTaskLet implements Tasklet {

    @Value("#{jobExecutionContext['importPath']}")
    private String importFilePath;

    @Value("#{jobParameters['importDate']}")
    private String importDate;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        log.info("从job parameter 中获取到的 importDate:[{}],从 jobExecutionContext 中获取的 importPath:[{}]",
                importDate, importFilePath);

        return RepeatStatus.FINISHED;
    }
}

需要注意的是,此类上加入了 @StepScope注解

4、编写实体类

@AllArgsConstructor
@Getter
@ToString
public class Person {
    /**
     * 用户名
     */
    private String username;
    /**
     * 年龄
     */
    private Integer age;
    /**
     * 性别
     */
    private String sex;
}

5、编写Job配置

@Configuration
@AllArgsConstructor
@Slf4j
public class ImportPersonJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final PrintImportFilePathTaskLet printImportFilePathTaskLet;
    private final ItemReader<Person> readCsvItemReader;

    @Bean
    public Job importPersonJob() {
        // 获取一个job builder, jobName可以是不存在的
        return jobBuilderFactory.get("import-person-job")
                // 添加job execution 监听器
                .listener(new AssemblyReadCsvPathListener())
                // 打印 job parameters 和 ExecutionContext 中的值
                .start(printParametersAndContextVariables())
                // 读取csv的数据并处理
                .next(handleCsvFileStep())
                .build();
    }

    /**
     * 读取数据
     * 注意:此处需要返回 FlatFileItemReader类型,而不要返回ItemReader
     * 否则可能报如下异常 Reader must be open before it can be read
     *
     * @param importPath 文件路径
     * @return reader
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
        // 读取数据
        return new FlatFileItemReaderBuilder<Person>()
                .name("read-csv-file")
                .resource(new ClassPathResource(importPath))
                .delimited().delimiter(",")
                .names("username", "age", "sex")
                .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
                .build();
    }

    @Bean
    public Step handleCsvFileStep() {

        // 每读取一条数据,交给这个处理
        ItemProcessor<Person, Person> processor = item -> {
            if (item.getAge() > 25) {
                log.info("用户[{}]的年龄:[{}>25]不处理", item.getUsername(), item.getAge());
                return null;
            }
            return item;
        };

        // 读取到了 chunk 大小的数据后,开始执行写入
        ItemWriter<Person> itemWriter = items -> {
            log.info("开始写入数据");
            for (Person item : items) {
                log.info("{}", item);
            }
        };

        return stepBuilderFactory.get("handle-csv-file")
                // 每读取2条数据,执行一次write,当每read一条数据后,都会执行process
                .<Person, Person>chunk(2)
                // 读取数据
                .reader(readCsvItemReader)
                // 读取一条数据就开始处理
                .processor(processor)
                // 当读取的数据的数量到达 chunk 时,调用该方法进行处理
                .writer(itemWriter)
                .build();
    }

    /**
     * 打印 job parameters 和 ExecutionContext 中的值
     * <p>
     * TaskletStep是一个非常简单的接口,仅有一个方法——execute。
     * TaskletStep会反复的调用这个方法直到获取一个RepeatStatus.FINISHED返回或者抛出一个异常。
     * 所有的Tasklet调用都会包装在一个事物中。
     *
     * @return Step
     */
    private Step printParametersAndContextVariables() {
        return stepBuilderFactory.get("print-context-params")
                .tasklet(printImportFilePathTaskLet)
                // 当job重启时,如果达到了3此,则该step不在执行
                .startLimit(3)
                // 当job重启时,如果该step的是已经处理完成即COMPLETED状态时,下方给false表示该step不在重启,即不在执行
                .allowStartIfComplete(false)
                // 添加 step 监听
                .listener(new CustomStepExecutionListener())
                .build();
    }
}

6、编写Job启动类

@Component
@Slf4j
public class StartImportPersonJob {

    @Autowired
    private Job importPersonJob;
    @Autowired
    private JobLauncher jobLauncher;

    @PostConstruct
    public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("importDate", LocalDate.of(2022, 08, 31).format(DateTimeFormatter.ofPattern("yyyyMMdd")))
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importPersonJob, jobParameters);
        log.info("job invoked");
    }
}

7、自动配置SpringBatch

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchReadCsvApplication {

    public static void mAIn(String[] args) {
        SpringApplication.run(SpringBatchReadCsvApplication.class, args);
    }
}

主要是 @EnableBatchProcessing 注解

5、执行结果

 

执行结果

6、完整代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv



Tags:SpringBatch   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
使用SpringBatch读取csv文件
1、需求系统每日从某个固定的目录中读取csv文件,并在控制台上打印。2、解决方案要解决上述需求,可以使用的方法有很多,此处选择使用Spring Batch来实现。3、注意事项1、文件路...【详细内容】
2022-08-31  Search: SpringBatch  点击:(484)  评论:(0)  加入收藏
▌简易百科推荐
Qt与Flutter:在跨平台UI框架中哪个更受欢迎?
在跨平台UI框架领域,Qt和Flutter是两个备受瞩目的选择。它们各自具有独特的优势,也各自有着广泛的应用场景。本文将对Qt和Flutter进行详细的比较,以探讨在跨平台UI框架中哪个更...【详细内容】
2024-04-12  刘长伟    Tags:UI框架   点击:(1)  评论:(0)  加入收藏
Web Components实践:如何搭建一个框架无关的AI组件库
一、让人又爱又恨的Web ComponentsWeb Components是一种用于构建可重用的Web元素的技术。它允许开发者创建自定义的HTML元素,这些元素可以在不同的Web应用程序中重复使用,并且...【详细内容】
2024-04-03  京东云开发者    Tags:Web Components   点击:(8)  评论:(0)  加入收藏
Kubernetes 集群 CPU 使用率只有 13% :这下大家该知道如何省钱了
作者 | THE STACK译者 | 刘雅梦策划 | Tina根据 CAST AI 对 4000 个 Kubernetes 集群的分析,Kubernetes 集群通常只使用 13% 的 CPU 和平均 20% 的内存,这表明存在严重的过度...【详细内容】
2024-03-08  InfoQ    Tags:Kubernetes   点击:(17)  评论:(0)  加入收藏
Spring Security:保障应用安全的利器
SpringSecurity作为一个功能强大的安全框架,为Java应用程序提供了全面的安全保障,包括认证、授权、防护和集成等方面。本文将介绍SpringSecurity在这些方面的特性和优势,以及它...【详细内容】
2024-02-27  风舞凋零叶    Tags:Spring Security   点击:(54)  评论:(0)  加入收藏
五大跨平台桌面应用开发框架:Electron、Tauri、Flutter等
一、什么是跨平台桌面应用开发框架跨平台桌面应用开发框架是一种工具或框架,它允许开发者使用一种统一的代码库或语言来创建能够在多个操作系统上运行的桌面应用程序。传统上...【详细内容】
2024-02-26  贝格前端工场    Tags:框架   点击:(47)  评论:(0)  加入收藏
Spring Security权限控制框架使用指南
在常用的后台管理系统中,通常都会有访问权限控制的需求,用于限制不同人员对于接口的访问能力,如果用户不具备指定的权限,则不能访问某些接口。本文将用 waynboot-mall 项目举例...【详细内容】
2024-02-19  程序员wayn  微信公众号  Tags:Spring   点击:(39)  评论:(0)  加入收藏
开发者的Kubernetes懒人指南
你可以将本文作为开发者快速了解 Kubernetes 的指南。从基础知识到更高级的主题,如 Helm Chart,以及所有这些如何影响你作为开发者。译自Kubernetes for Lazy Developers。作...【详细内容】
2024-02-01  云云众生s  微信公众号  Tags:Kubernetes   点击:(51)  评论:(0)  加入收藏
链世界:一种简单而有效的人类行为Agent模型强化学习框架
强化学习是一种机器学习的方法,它通过让智能体(Agent)与环境交互,从而学习如何选择最优的行动来最大化累积的奖励。强化学习在许多领域都有广泛的应用,例如游戏、机器人、自动驾...【详细内容】
2024-01-30  大噬元兽  微信公众号  Tags:框架   点击:(68)  评论:(0)  加入收藏
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  HELLO程序员  微信公众号  Tags:Spring   点击:(88)  评论:(0)  加入收藏
SpringBoot如何实现缓存预热?
缓存预热是指在 Spring Boot 项目启动时,预先将数据加载到缓存系统(如 Redis)中的一种机制。那么问题来了,在 Spring Boot 项目启动之后,在什么时候?在哪里可以将数据加载到缓存系...【详细内容】
2024-01-19   Java中文社群  微信公众号  Tags:SpringBoot   点击:(86)  评论:(0)  加入收藏
相关文章
    无相关信息
站内最新
站内热门
站内头条