SpringBoot 中使用 @Async 实现异步调用

lz 1年前 ⋅ 980 阅读

在 Java 中,当我们需要执行异步操作时,往往会去创建一个新线程去执行,如下:

public class App {
    public static void main( String[] args ) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + ":异步任务");
        }).start();
    }
}

Spring 3.0 之后提供了一个 @Async注解,使用 @Async 注解进行优雅的异步调用。

其实,@Async注解本质上还是通过线程池创建线程去异步执行任务

在启动类开启启用异步调用,同时注入 ApplicationRunner 对象在启动类进行调用测试(没有使用单元测试类)。

ApplicationRunner 可以实现系统启动完后做一些系统初始化的操作。

1. @Async 注解的基本使用

使用 @Async 注解步骤:

  1. 添加 @EnableAsync 注解。在主类上或者 某个类上,否则,异步方法不会生效
  2. 添加 @Async 注解。在异步方法上添加此注解。异步方法不能被 static 修饰
  3. (可选)如果需要自定义线程池的话,则可以配置线程池(下文有)

App: 启动类(测试类)

@MapperScan("com.zzc.mapper")
@SpringBootApplication
public class App {

    @Autowired
    private AsyncService asyncService;

    public static void main( String[] args ) {
        SpringApplication.run(App.class, args);
    }

    @Bean
    public ApplicationRunner applicationRunner() {
        return applicationArguments -> {
            long startTime = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
			
			// 无返回值
            asyncService.asyncTask();
			
			// 有返回值--主线程未使用到
            //Future<String> asyncTask = asyncService.asyncTask("666");
            // 有返回值--主线程使用到
            //System.out.println(Thread.currentThread().getName() + ":返回值:" + asyncTask.get());
			
			// 模拟事务异常回滚
            //asyncService.asyncTaskForTransaction(true);

            long endTime = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
        };
    }

}

AsyncServiceImpl:业务处理类

@EnableAsync
@Service
public class AsyncServiceImpl implements AsyncService {

    @Autowired
    private UserMapper userMapper;

    @Async
    @Override
    public void asyncTask() {
        long startTime = System.currentTimeMillis();
        try {
            //模拟耗时
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":void asyncTask(),耗时:" + (endTime - startTime));
    }

    @Async
    @Override
    public Future<String> asyncTask(String s) {
        long startTime = System.currentTimeMillis();
        try {
            //模拟耗时
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":Future<String> asyncTask(String s),耗时:" + (endTime - startTime));
        return AsyncResult.forValue(s);
    }

    @Async
    @Override
    @Transactional
    public void asyncTaskForTransaction(Boolean exFlg) {
        TabUser tabUser = new TabUser();
        tabUser.setId("111111");
        tabUser.setUserName("zzc");
        tabUser.setPhoto("12");
        userMapper.addUser(tabUser);
        if (exFlg) {
            throw new RuntimeException("模拟异常");
        }
    }

}

1.1 无返回值

@MapperScan("com.zzc.mapper")
@SpringBootApplication
public class App {

    @Autowired
    private AsyncService asyncService;

    public static void main( String[] args ) {
        SpringApplication.run(App.class, args);
    }

    @Bean
    public ApplicationRunner applicationRunner() {
        return applicationArguments -> {
            long startTime = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
            asyncService.asyncTask();
            long endTime = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
        };
    }

}

运行代码后:

由打印结果知:主线程并未因为子线程阻塞而阻塞,子线程在睡眠过程中,主线程就已经执行完了。由此可知,确实是实现了异步方法调用!

1.2 有返回值–主线程未使用到

@Bean
public ApplicationRunner applicationRunner() {
    return applicationArguments -> {
        long startTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
        Future<String> asyncTask = asyncService.asyncTask("666");
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
    };
}

运行结果如下:

同案例1,也是实现了异步方法的调用

1.3 有返回值–主线程使用到

@Bean
public ApplicationRunner applicationRunner() {
    return applicationArguments -> {
        long startTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
        Future<String> asyncTask = asyncService.asyncTask("666");
        System.out.println(Thread.currentThread().getName() + ":返回值:" + asyncTask.get());
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
    };
}

运行结果如下:

结果:主线程会因子线程的阻塞而阻塞。

有返回值的情况下,虽然异步业务逻辑是由子线程执行,但如果在主线程操作返回值对象,主线程会等待,还是顺序执行

1.4 模拟事务异常回滚

为了方便观察、测试,我们在配置文件中将日志级别设置成debug

logging:
  level:
    root: debug

事务正常提交是没有问题的,数据可以添加到数据库(通过 Mybatis),所以,这里只演示事务异常情况。

@Bean
public ApplicationRunner applicationRunner() {
    return applicationArguments -> {
        long startTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
        asyncService.asyncTaskForTransaction(true);
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
    };
}

运行结果如下:

在发生异常后,会进行事务回滚,所以,数据不会添加到数据库里面去。

2. @Async 注解使用自定义的线程池

@Async 底层原理:就是通过线程池创建一个线程,然后去执行业务逻辑。

@Async 注解会应用默认线程池 SimpleAsyncTaskExecutor

默认线程池的弊端

在线程池应用中,参考阿里巴巴 Java 开发规范:程池不允许使用 Executors 去创建,不允许使用系统默认的线程池,推荐通过 ThreadPoolExecutor 的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。

Executors 各个方法的弊端:

  • newFixedThreadPool和newSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM
  • newCachedThreadPool和newScheduledThreadPool:要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM

@Async 默认异步配置使用的是 SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发 OutOfMemoryError 错误。

针对线程创建问题,SimpleAsyncTaskExecutor 提供了限流机制,通过 concurrencyLimit 属性来控制开关,当concurrencyLimit>=0 时开启限流机制,默认关闭限流机制,即 concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor 并不是严格意义的线程池,达不到线程复用的功能 。

自定义线程池

新建一个配置类:

@Configuration
public class AsyncThreadPoolConfig {

    private static final int MAX_POOL_SIZE = 50;
    private static final int CORE_POOL_SIZE = 20;
    private static final int TASK_NUM = 200;
    private static final int ACTIVE_TIME = 60;

    @Bean("asyncTaskExecutor")
    public AsyncTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
        asyncTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        asyncTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        asyncTaskExecutor.setQueueCapacity(TASK_NUM);
        asyncTaskExecutor.setKeepAliveSeconds(ACTIVE_TIME);
        asyncTaskExecutor.setThreadNamePrefix("async-task-thread-pool");
        asyncTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        asyncTaskExecutor.initialize();
        return asyncTaskExecutor;
    }

}

上面我们通过使用 ThreadPoolTaskExecutor 创建了一个线程池,同时设置了以下这些参数:

  • 最大线程数 50:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
  • 核心线程数 20:线程池创建时候初始化的线程数
  • 缓冲队列 200:用来缓冲执行任务的队列
  • 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
  • 线程池对拒绝任务的处理策略:这里采用了 CallerRunsPolicy 策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务

使用线程池

在定义了线程池之后,我们如何让异步调用的执行任务使用这个线程池中的资源来运行呢?方法非常简单,我们只需要在 @Async 注解中指定线程池名即可。当然,也可以不用指定,比如:

@Async("asyncTaskExecutor")  // @Async
@Override
public void asyncTask() {
    long startTime = System.currentTimeMillis();
    try {
        //模拟耗时
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    long endTime = System.currentTimeMillis();
    System.out.println(Thread.currentThread().getName() + ":void asyncTask(),耗时:" + (endTime - startTime));
}

测试线程池

@Bean
public ApplicationRunner applicationRunner() {
    return applicationArguments -> {
        long startTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":开始调用异步业务");
        asyncService.asyncTask();
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName() + ":调用异步业务结束,耗时:" + (endTime - startTime));
    };
}

运行结果如下:

我们可以在控制台中看到所有输出的线程名前都是之前我们定义的线程池前缀名开始的,说明我们使用线程池来执行异步任务的试验成功了!

 

 

 

 

版权 本文为TIMO社区原创文章,转载无需和我联系,但请注明来自TIMO社区 http://timo.aikanmv.cn