Skip to content

线程池的配置与使用

配置

  • corePoolSize : 核心线程数,一旦创建将不会再释放。
  • maximumPoolSize : 最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。
  • keepAliveSeconds : 也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。
  • handler : 当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序

示例:

sql
package com.greatonce.tuya.biz.impl;

@EnableCaching
@Configuration
public class BizConfiguration {

  @Value("${tuya.biz.task.corePoolSize:10}")
  private int corePoolSize;
  @Value("${tuya.biz.task.maxPoolSize:20}")
  private int maxPoolSize;
  @Value("${tuya.biz.task.queueCapacity:50}")
  private int queueCapacity;
  @Value("${tuya.biz.task.keepAliveSeconds:60}")
  private int keepAliveSeconds;

 @Bean
public ThreadPoolExecutorFactoryBean bizExecutor() {
  ThreadPoolExecutorFactoryBean factoryBean = new ThreadPoolExecutorFactoryBean();
  // 核心线程数,一直存活,图鸦中设置为1
  factoryBean.setCorePoolSize(corePoolSize);
  // 当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。
  // 如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
  // 图鸦中设置为10
  factoryBean.setMaxPoolSize(maxPoolSize);
  // 任务队列容量,图鸦中设置为20
  factoryBean.setQueueCapacity(queueCapacity);
  // 当线程空闲时间达到setKeepAliveSeconds,该线程会退出,直到线程数量等于corePoolSize。
  // 图鸦中设置为60
  factoryBean.setKeepAliveSeconds(keepAliveSeconds);
  factoryBean.setThreadNamePrefix("biz-task");
  //(1) 默认的ThreadPoolExecutor.AbortPolicy   处理程序遭到拒绝将抛出运行时RejectedExecutionException;
  //(2) ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
  //(3) ThreadPoolExecutor.DiscardPolicy  不能执行的任务将被删除;
  //(4) ThreadPoolExecutor.DiscardOldestPolicy  如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
  factoryBean.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  return factoryBean;
}

使用

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

示例:

java
@Autowired
private ExecutorService executorService;

/**
 * 立即投放
 */
@Override
public void goPublish(PublishProduct publishProduct) {
  Store store = storeService.getByKey(publishProduct.getStoreId());
  String publishSetting = publishProduct.getPublishSetting();
  List<PublishProductImage> publishProductImages = publishProduct.getPublishProductImages();
  CountDownLatch countDownLatch = new CountDownLatch(publishProductImages.size());
  publishProductImages.forEach(publishProductImage -> {
    executorService.execute(() -> {
      try {
        publishProductImageService.publish(store, publishProductImage, publishSetting);
      } catch (Exception e) {
        publishLOGGER.error("投放图片失败", e);
      } finally {
        countDownLatch.countDown();
      }
    });
  });
  try {
    countDownLatch.await();
    publishAfter(publishProduct);
  } catch (InterruptedException e) {
    publishLOGGER.error("服务异常中断", e);
  }
}

@Async

  1. 启用异步, 在启动类上标注@EnableAsync
  2. 配置对应的业务线程池, 示例代码如下
java
/**
   * 默认的业务线程池.
   */
  @Bean
  public AsyncTaskExecutor bizExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(bizCorePoolSize);
    executor.setMaxPoolSize(bizMaxPoolSize);
    executor.setQueueCapacity(bizQueueCapacity);
    executor.setThreadNamePrefix("oms-biz-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setKeepAliveSeconds(bizKeepAliveSeconds);
    return executor;
  }
  1. 使用注解并指定线程池, 示例代码如下
java
@Override
  @Async("bizExecutor")
  public void asyncDownload(ProductMallMappingDownloadBO downloadBO) {
    BizContext.setNickname(downloadBO.getOperator());
    Store store = storeService.getByKey(downloadBO.getStoreId());
    downloadBO.setStore(store);
    download(downloadBO);
    BIZ_LOGGER.log("铺货下载", "参数:{}", JsonUtil.toJson(downloadBO));
  }