批处理的概念是将当前执行的工作卸载到后台的过程。这允许执行大量的异步流程引擎命令而不会阻塞实例。它还对单独命令调用进行了分离。

例如,流程实例迁移 命令可以使用批处理执行。 这样可以异步迁移流程实例。 在同步流程实例迁移中,所有迁移都在单个事务中执行。 首先,这需要成功提交事务。 对于大量流程实例,事务可能变得太大,甚至无法提交到数据库。 随着使用批处理迁移,这情况发生了变化。 批处理以较小的块执行迁移,每个块使用单个事务。

优点:

  • 异步(非阻塞)执行
  • 可以使用多线程和Job执行器处理执行
  • 执行解耦,即每个批处理执行Job使用自己的事务

缺点:

  • 手动轮询以完成批处理
  • 与流程引擎执行的其他Job的争用
  • 当部分子集已经执行时,批处理可能会出现部分失败的情况,例如,某些流程实例被迁移而其他流程实例迁移失败

从技术上讲,批处理表示在流程引擎的上下文中执行命令的一组Job。

批处理利用流程引擎的Job执行器来执行批处理Job。单个批处理包含三种Job类型:

  • 种子Job(Seed job):创建所有批处理Job的Job
  • 执行Job(Execution jobs):批处理命令的实际执行,例如流程实例迁移
  • 监控Job(Monitor job):种子Job完成后,监控批处理执行和完成的进度

API

下面概述了用于批处理的 Java API。

创建一个批处理

批处理是通过异步执行流程引擎命令创建的。

当前支持的批处理类型:

Java API 可用于创建批处理命令,具体使用示例请参考具体命令。

查询批处理

你可以通过 id 和类型查询正在运行的批处理,例如查询所有正在运行的流程实例迁移批处理:

List<Batch> migrationBatches = processEngine.getManagementService()
  .createBatchQuery()
  .type(Batch.TYPE_PROCESS_INSTANCE_MIGRATION)
  .list();

批处理统计数据

你可以通过管理服务(ManagementService)查询批处理统计信息。 批处理统计信息将包含有关剩余的、已完成的和失败的批处理执行Job的信息。

List<BatchStatistics> migrationBatches = processEngine.getManagementService()
  .createBatchStatisticsQuery()
  .type(Batch.TYPE_PROCESS_INSTANCE_MIGRATION)
  .list();

批处理的历史

历史等级FULL 时,会创建一个历史批处理条目。你可以使用历史服务(HistoryService)查询它。

HistoricBatch historicBatch = processEngine.getHistoryService()
  .createHistoricBatchQuery()
  .batchId(batch.getId())
  .singleResult();

历史记录还包含种子、监控和执行Job的Job日志条目。 你可以通过某个Job定义 id 查询相应的Job日志条目。

HistoricBatch historicBatch = ...

List<HistoricJobLog> batchExecutionJobLogs = processEngine.getHistoryService()
  .createHistoricJobLogQuery()
  .jobDefinitionId(historicBatch.getBatchJobDefinitionId())
  .orderByTimestamp()
  .list();

你可以设置对已完成的历史批处理操作的历史清理配置。

暂停批处理

要暂停批处理和所有相应Job的执行,可以使用管理服务。

processEngine.getManagementService()
  .suspendBatchById("myBatch");

然后可以再次激活暂停的批处理,同样使用管理服务。

processEngine.getManagementService()
  .activateBatchById("myBatch");

删除批处理

可以使用管理服务删除正在运行的批处理。

// 删除批处理,保留批处理的历史记录
processEngine.getManagementService()
  .deleteBatch("myBatch", false);

// 删除批处理包括批处理的历史记录
processEngine.getManagementService()
  .deleteBatch("myBatch", true);

可以使用历史服务删除历史批处理。

processEngine.getHistoryService()
  .deleteHistoricBatch("myBatch");

对于仍在执行Job的正在运行的批处理,建议在删除之前暂停该批处理。 有关详细信息,请参阅 暂停批处理 部分。

批处理的优先级

由于所有批处理Job都是使用Job执行器执行的,因此可以使用 Job优先级 功能来调整批处理Job的优先级。 默认批处理Job优先级由流程引擎配置项 batchJobPriority 设置。

可以使用管理服务调整特定批处理的 Job定义 甚至单个批处理 Job 的优先级。

Batch batch = ...;

String batchJobDefinitionId = batch.getBatchJobDefinitionId();

processEngine.getManagementService()
  .setOverridingJobPriorityForJobDefinition(batchJobDefinitionId, 100, true);

操作日志

请注意,用户操作日志仅针对 Batch 创建本身而写入,种子Job的执行以及执行操作的单个Job均由 Job执行器 执行,因此不被视为用户操作。

Job 定义

种子Job

批处理最初会创建一个种子Job。 该种子将被重复执行以创建所有批处理执行Job。 例如,如果用户为 1000 个流程实例启动 流程实例迁移批处理。 使用默认流程引擎配置时,种子Job将在每次调用时创建 10 个批处理执行Job。 每个执行Job将迁移 1 个流程实例。 总之,种子Job将被调用 100 次,直到它创建了完成批处理所需的 1000 个执行Job。

可以使用 Java API 获取批处理种子Job的Job定义:

Batch batch = ...;

JobDefinition seedJobDefinition = processEngine.getManagementService()
  .createJobDefinitionQuery()
  .jobDefinitionId(batch.getSeedJobDefinitionId())
  .singleResult();

要暂停创建更多批处理执行Job,可以使用管理服务暂停种子Job定义:

processEngine.getManagementService()
  .suspendJobByJobDefinitionId(seedJobDefinition.getId());

执行Job

一个批处理的执行被分成几个执行Job。 具体Job数取决于批处理的总Job数和流程引擎配置(参见种子Job)。 每个执行Job针对给定数量的调用执行实际的批处理命令,例如,迁移多个流程实例。执行Job将由Job执行器执行。 它们的行为与其他Job一样,这意味着它们可能会失败并且Job执行器将 重试 失败的批处理执行Job。 此外,对于没有重试的失败的批处理执行Job,将产生 事件

Java API 可用于获取批处理的执行Job的Job定义,例如,对于 流程实例迁移批处理

Batch batch = ...;

JobDefinition executionJobDefinition = processEngine.getManagementService()
  .createJobDefinitionQuery()
  .jobDefinitionId(batch.getBatchJobDefinitionId())
  .singleResult();

要批量暂停批处理Job的执行,可以使用管理服务暂停批处理Job定义:

processEngine.getManagementService()
  .suspendJobByJobDefinitionId(executionJobDefinition.getId());

监控Job

种子job 创建所有批处理执行Job后,将为批处理创建一个监控Job。 此Job定期轮询批处理是否已完成,即所有批处理执行Job是否已完成。 轮询间隔可以通过流程引擎配置batchPollTime(默认:30秒)属性进行配置。

Java API 可用于获取批处理的监控Job的Job定义:

Batch batch = ...;

JobDefinition monitorJobDefinition = processEngine.getManagementService()
  .createJobDefinitionQuery()
  .jobDefinitionId(batch.getMonitorJobDefinitionId())
  .singleResult();

为防止批处理完成,即删除运行时数据,可以使用管理服务暂停监视Job定义:

processEngine.getManagementService()
  .suspendJobByJobDefinitionId(monitorJobDefinition.getId());

配置

You can configure the number of jobs created by every seed job invocation batchJobsPerSeed (default: 100) and the number of invocations per batch execution job invocationsPerBatchJob (default: 1) in the process engine configuration. 你可以在流程引擎配置中配置每个种子Job的调用创建Job数量batchJobsPerSeed(默认:100)和每个批量执行Job的调用数量invocationsPerBatchJob(默认:1)

通过使用流程引擎配置项 invocationsPerBatchJobByBatchType,可以为每个批处理操作类型单独更改每个批处理执行Job的调用次数。 如果你没有按类型指定每个批处理Job的调用,配置将回退到通过 invocationsPerBatchJob 指定的全局配置。

你可以通过三种方式配置该属性:

  1. 通过 流程引擎插件 编程
  2. 在 Spring-based 环境中通过 Spring XML 配置文件

    <bean id="processEngineConfiguration" 
          class="org.camunda.bpm.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">
        
      <!-- ... -->
        
      <property name="invocationsPerBatchJobByBatchType">
        <map>
          <entry key="process-set-removal-time" value="10" />
          <entry key="historic-instance-deletion" value="3" />
            
          <!-- 在自定义批处理操作的情况下 -->
          <entry key="my-custom-operation" value="7" />
        </map>
      </property>
    </bean>
    
  3. 在 Spring Boot 环境中通过 application.yaml 文件

    camunda.bpm.generic-properties.properties:
      invocations-per-batch-job-by-batch-type:
        process-set-removal-time:     10
        historic-instance-deletion:   3
        my-custom-operation:          7  # 在自定义批处理操作的情况下
    

目录: