目录

1. 基于 ExecutorService 自定义线程池(Java 5中引入的)

2. 基于 ThreadPoolTaskExecutor 线程池的使用 (Spring提供,以及监听线程池)

3. 自定义 ThreadPoolTaskExecutor 线程池


用于大数据量的导出报表、远程请求处理数据量同步等等

日常项目中可以定义多个线程池,如:报表导出使用的线程池 或 大数据量操作时使用(在配合webSocket通知前端,再或者大文件上传的线程池)

导出大数据量Excel文件时,单个xlsx文件最大只能存储一百多万的数据行,假设有2000万的数据我们日常清空必定导出多个Excel文件,此时就可以使用本文章的案例。

1. 基于 ExecutorService 自定义线程池(Java 5中引入的)

Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下。

自定义线程池

    private Logger logger = LoggerFactory.getLogger(InitBeanConfig.class);

    @Bean
    public ExecutorService callbackThreadPool() {
        ThreadFactory factory = new ThreadFactoryBuilder()
                .setUncaughtExceptionHandler((t, e) -> logger.error(t.getName() + " excute error:", e))
                .setNameFormat("callback-pool-%d").build();
        int corePoolSize = 3;
        int maxPoolSize = 4;
        long keepAliveTime = 5;
        ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(100000), factory, new ThreadPoolExecutor.CallerRunsPolicy());
        return pool;
    }

使用案例

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;

@RestController
@RequestMapping("thread")
public class ThreadPoolController {

    @Resource
    private ExecutorService callbackThreadPool;

    @GetMapping
    public Object thread(){
        
        // 不等待该线程执行结果
        callbackThreadPool.execute(() -> {
            // 代码块
        });


        // 等待所有线程都执行完,再执行后续操作
        List<String> dataList = Arrays.asList("123","456");

        CompletableFuture<?>[] cfs = new CompletableFuture[laoUserPartition.size()];
        int index = 0;
        for (String data : dataList) {
            // 每次循环创建一个线程
            CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
                // 线程内执行的操作

            }, callbackThreadPool);
            cfs[index++] = cf;
        }
        CompletableFuture.allOf(cfs).join();  // 此处等待监听的几个线程全部执行完      


        return 1;
    }



}

2. 基于 ThreadPoolTaskExecutor 线程池的使用 (Spring提供,以及监听线程池)

在springboot项目启动,如下查询加载的Bean

使用方法:

@RestController
public class PageController {

    @Autowired
    ThreadPoolTaskExecutor applicationTaskExecutor;  // applicationTaskExecutor 为spring注册时定义得 beanName


    // 开辟两个线程,后等待两个线程 都执行完的案例
    @GetMapping("/thread")
    public Object thread() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
            for(int i = 0 ; i < 100000 ; i++){
                System.out.println("a-"+i);
            }
        }, applicationTaskExecutor);

        CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
            for(int i = 0 ; i < 100000 ; i++){
                System.out.println("w-"+i);
            }
        }, applicationTaskExecutor);
        
        // 等待这两个线程都执行完
        CompletableFuture.allOf(completableFuture1, completableFuture2).get();
        return "success";
    }
}

3. 自定义 ThreadPoolTaskExecutor 线程池

自定义设置线程的最大线程数等参数。

自定义Bean

    @Bean
    public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //此方法返回可用处理器的虚拟机的最大数量; 不小于1
        //int core = Runtime.getRuntime().availableProcessors();
        int core = 1;
        //设置核心线程数
        executor.setCorePoolSize(core);
        //设置最大线程数
        executor.setMaxPoolSize(core * 2 + 1);
        //除核心线程外的线程存活时间
        executor.setKeepAliveSeconds(3);
        //如果传入值大于0,底层队列使用的是LinkedBlockingQueue,否则默认使用SynchronousQueue
        executor.setQueueCapacity(40);
        //线程名称前缀
        executor.setThreadNamePrefix("thread-execute");
        //设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

使用方法


    @Autowired
    ThreadPoolTaskExecutor myThreadPoolTaskExecutor;  // myThreadPoolTaskExecutor 为beanName

    @GetMapping("/thread")
    public Object thread() throws ExecutionException, InterruptedException {

        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
            for(int i = 0 ; i < 100000 ; i++){
                System.out.println("a-"+i);
            }
        }, myThreadPoolTaskExecutor);

        CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
            for(int i = 0 ; i < 100000 ; i++){
                System.out.println("w-"+i);
            }
        }, myThreadPoolTaskExecutor);

        // 等待两个线程执行完
        CompletableFuture.allOf(completableFuture1, completableFuture2).get();
        return "success";
    }

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐