Lingmoumou's Blog

きっといつかって愿うまま

0%

SpringBoot 异步处理

同步处理与异步处理请求流程

在Servlet 3.0之前,Servlet采用Thread-Per-Request的方式处理请求,即每一次Http请求都由某一个线程从头到尾负责处理,也就是同步处理请求。如果一个请求需要进行IO操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待IO操作完成,而IO操作是非常慢的,所以此时的线程并不能及时地释放回线程池以供后续使用,在并发量越来越大的情况下,这将带来严重的性能问题。

同步处理请求流程

而在Servlet3.0发布后,提供了一个新特性:异步处理请求。可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放了容器所分配线程的请求,其响应将被延后,可以在耗时处理完成(例如长时间的运算)时再对客户端进行响应。其请求流程为:

异步处理请求流程

通常开发过程中,一般上我们都是同步调用,即:程序按定义的顺序依次执行的过程,每一行代码执行过程必须等待上一行代码执行完毕后才执行。而异步调用指:程序在执行时,无需等待执行的返回值可继续执行后面的代码。显而易见,同步有依赖相关性,而异步没有,所以异步可并发执行,可提高执行效率,在相同的时间做更多的事情。

异步请求的实现

  • 获取AsyncContext:根据HttpServletRequest对象获取。
  • 设置监听器: 可设置其开始、完成、异常、超时等事件的回调处理.
  • 设置超时:通过setTimeout方法设置,单位:毫秒。

监听器的接口代码

说明:

  • onStartAsync:异步线程开始时调用
  • onError:异步线程出错时调用
  • onTimeout:异步线程执行超时调用
  • onComplete:异步执行完毕时调用

一般上,我们在超时或者异常时,会返回给前端相应的提示,比如说超时了,请再次请求等等,根据各业务进行自定义返回。同时,在异步调用完成时,一般需要执行一些清理工作或者其他相关操作。

需要注意的是只有在调用request.startAsync前将监听器添加到AsyncContext,监听器的onStartAsync方法才会起作用,而调用startAsync前AsyncContext还不存在,所以第一次调用startAsync是不会被监听器中的onStartAsync方法捕获的,只有在超时后又重新开始的情况下onStartAsync方法才会起作用。

Servlet方式实现异步请求

通过HttpServletRequest对象中获得一个AsyncContext对象,该对象构成了异步处理的上下文。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// ServletController.java

@Slf4j
@RestController
@RequestMapping("/servlet")
public class AsyncController {

@RequestMapping("/origin")
public void origin(HttpServletRequest request,
HttpServletResponse response) throws Exception {

Thread.sleep(100);
response.getWriter().println("这是【正常】的请求返回");
}

@RequestMapping("/async")
public void todoAsync(HttpServletRequest request,HttpServletResponse response) {
AsyncContext asyncContext = request.startAsync();
asyncContext.addListener(new AsyncListener() {

@Override
public void onTimeout(AsyncEvent event) throws IOException {
log.info("超时了:");
//做一些超时后的相关操作
}

@Override
public void onStartAsync(AsyncEvent event) throws IOException {
log.info("线程开始");
}

@Override
public void onError(AsyncEvent event) throws IOException {
log.info("发生错误:",event.getThrowable());
}

@Override
public void onComplete(AsyncEvent event) throws IOException {
log.info("执行完成");
//这里可以做一些清理资源的操作
}
});

//设置超时时间
asyncContext.setTimeout(200);

asyncContext.start(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
log.info("内部线程:" + Thread.currentThread().getName());
asyncContext.getResponse().setCharacterEncoding("utf-8");
asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
asyncContext.getResponse().getWriter().println("这是【异步】的请求返回");
} catch (Exception e) {
log.error("异常:",e);
}
//异步请求完成通知,此时整个请求才完成
//其实可以利用此特性 进行多条消息的推送把连接挂起。。
asyncContext.complete();
}
});
//此时之类 request的线程连接已经释放了
log.info("线程:" + Thread.currentThread().getName());
}
}

Spring方式实现异步请求

在Spring中,有多种方式实现异步请求,比如callable、DeferredResult或者WebAsyncTask。每个的用法略有不同,可根据不同的业务场景选择不同的方式。

使用Callable异步处理请求

使用很简单,直接返回的参数包裹一层callable即可,可以继承WebMvcConfigurerAdapter类来设置默认线程池和超时处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// AsyncController.java

@Slf4j
@RestController
public class AsyncController {

@RequestMapping("/order")
public Callable<String> order() {
log.info("主线程开始");
Callable<String> result = new Callable<String>() {
@Override
public String call() throws Exception {
log.info("副线程开始");
Thread.sleep(1000);
log.info("副线程返回");
return "success";
}
};
log.info("主线程返回");
return result;
}
}

使用DeferredResult异步处理请求

相比于Callable,DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// DeferredResultController.java

@Slf4j
@RestController
public class DeferredResultController {

public static ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(30); // 线程池

@RequestMapping("/deferredresult")
public DeferredResult<String> deferredResult(){
log.info("外部线程:" + Thread.currentThread().getName());
//设置超时时间
DeferredResult<String> result = new DeferredResult<String>(60*1000L);
//处理超时事件 采用委托机制
result.onTimeout(() -> {
log.error("DeferredResult超时");
result.setResult("超时了!");
});

result.onCompletion(() -> log.info("调用完成"));

FIXED_THREAD_POOL.execute(() -> {
log.info("内部线程:" + Thread.currentThread().getName());
//返回结果
result.setResult("DeferredResult!!");
});
return result;
}
}

下面将通过DeferredResult方式,模拟以下流程:

同步处理与异步处理请求流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// AsyncController.java

@Slf4j
@RestController
public class AsyncController {

@Autowired
private MockQueue mockQueue;

@Autowired
private DeferredResultHolder deferredResultHolder;

@RequestMapping("/order")
public DeferredResult<String> order() throws InterruptedException {
log.info("主线程开始");
String orderNumber= RandomStringUtils.randomNumeric(8);
mockQueue.setPlaceOrder(orderNumber);

DeferredResult<String> result=new DeferredResult<>();

deferredResultHolder.getMap().put(orderNumber,result);
log.info("主线程返回");
return result;
}
}

// MockQueue.java

@Slf4j
@Data
@Component
public class MockQueue {
private String placeOrder;
private String completeOrder;

public void setPlaceOrder(String placeOrder) throws InterruptedException {
new Thread(()->{
log.info("接到下单请求,"+placeOrder);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.completeOrder=placeOrder;
log.info("下单请求处理完毕,"+placeOrder);
}).start();
}
}

// QueueListener.java

@Slf4j
@Component
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {

@Autowired
private MockQueue mockQueue;

@Autowired
private DeferredResultHolder deferredResultHolder;

@Override
public void onApplicationEvent(ContextRefreshedEvent event){
new Thread(()->{
while (true){
if(StringUtils.isNotBlank(mockQueue.getCompleteOrder())){
String orderNumber=mockQueue.getCompleteOrder();
log.info("返回订单处理结果:"+orderNumber);
deferredResultHolder.getMap().get(orderNumber).setResult("place order");
mockQueue.setCompleteOrder(null);
}else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}

// DeferredResultHolder.java

@Component
public class DeferredResultHolder {
private Map<String, DeferredResult<String>> map=new HashMap<String,DeferredResult<String>>();

public Map<String,DeferredResult<String>> getMap(){
return map;
}

public void setMap(Map<String,DeferredResult<String>> map){
this.map=map;
}
}

使用WebAsyncTask 异步处理请求

和Callable差不多,在Callable外包一层,给WebAsyncTask设置一个超时回调,即可实现超时处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// AsyncController.java

@Slf4j
@RestController
public class AsyncController {

@RequestMapping("/webAsyncTask")
public WebAsyncTask<String> asyncTaskTimeout() {
// 打印处理线程名
log.info(format("请求处理线程:"+ currentThread().getName()));

// 模拟开启一个异步任务,超时时间为10s
WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> {
log.info(format("异步工作线程:"+ currentThread().getName()));
// 任务处理时间5s,不超时
sleep(15 * 1000L);
return TIME_MESSAGE;
});

// 任务执行完成时调用该方法
asyncTask.onCompletion(() -> log.info("任务执行完成"));

asyncTask.onTimeout(() -> {
log.info("任务执行超时");
return TIME_MESSAGE;
});

log.info("继续处理其他事情");
return asyncTask;
}
}

异步处理配置

1
2
3
4
5
6
7
8
@Configuration
public class WebConfig extends WebMvcConfigurationSupport {

@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer){
configurer.setDefaultTimeout(1000);
}
}

CallableProcessingInterceptor与CallableInterceptor中的内容大致相似,但多了一些设置超时时间等方法。可以WebConfig中设置异步处理的超时时间,也可以setTaskExecutor设置相关线程池。
CallableProcessingInterceptor.java

异步调用的实现

在SpringBoot中使用异步调用是很简单的,只需要使用@Async注解即可实现方法的异步调用。

需要在启动类加入@EnableAsync使异步调用@Async注解生效。

1
2
3
4
5
6
7
8
9
10
@SpringBootApplication
@EnableAsync
@Slf4j
public class Chapter21Application {

public static void main(String[] args) {
SpringApplication.run(Chapter21Application.class, args);
log.info("Chapter21启动!");
}
}

@Async异步调用

使用@Async很简单,只需要在需要异步执行的方法上加入此注解即可。这里创建一个控制层和一个服务层,进行简单示例下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// SyncService.java

@Component
public class SyncService {

@Async
public void asyncEvent() throws InterruptedException {
//休眠1s
Thread.sleep(1000);
//log.info("异步方法输出:{}!", System.currentTimeMillis());
}

public void syncEvent() throws InterruptedException {
Thread.sleep(1000);
//log.info("同步方法输出:{}!", System.currentTimeMillis());
}
}

// AsyncController.java

@RestController
@Slf4j
public class AsyncController {

@Autowired
SyncService syncService;

@GetMapping("/async")
public String doAsync() throws InterruptedException {
long start = System.currentTimeMillis();
log.info("方法执行开始:{}", start);
//调用同步方法
syncService.syncEvent();
long syncTime = System.currentTimeMillis();
log.info("同步方法用时:{}", syncTime - start);
//调用异步方法
syncService.asyncEvent();
long asyncTime = System.currentTimeMillis();
log.info("异步方法用时:{}", asyncTime - syncTime);
log.info("方法执行完成:{}!",asyncTime);
return "async!!!";
}
}

自定义线程池

在默认情况下,系统使用的是默认的SimpleAsyncTaskExecutor进行线程创建。所以一般上我们会自定义线程池来进行线程的复用。

关于ThreadPoolTaskExecutor参数说明:

  • corePoolSize:线程池维护线程的最少数量
  • keepAliveSeconds:允许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
  • maxPoolSize:线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
  • queueCapacity:缓存队列
  • rejectedExecutionHandler:线程池对拒绝任务(无线程可用)的处理策略。这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。还有一个是AbortPolicy策略:处理程序遭到拒绝将抛出运行时RejectedExecutionException。

而在一些场景下,若需要在关闭线程池时等待当前调度任务完成后才开始关闭,可以通过简单的配置,进行优雅的停机策略配置。关键就是通过setWaitForTasksToCompleteOnShutdown(true)和setAwaitTerminationSeconds方法。

  • setWaitForTasksToCompleteOnShutdown: 表明等待所有线程执行完,默认为false。
  • setAwaitTerminationSeconds: 等待的时间,因为不能无限的等待下去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Config.java

@Configuration
public class Config {

// 配置线程池
@Bean(name = "asyncPoolTaskExecutor")
public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(20);
taskExecutor.setMaxPoolSize(200);
taskExecutor.setQueueCapacity(25);
taskExecutor.setKeepAliveSeconds(200);
taskExecutor.setThreadNamePrefix("oKong-");
// 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
}
}

// SyncService.java

@Component
@Slf4j
public class SyncService {
@Async("asyncPoolTaskExecutor")
public void asyncEvent() throws InterruptedException {
//休眠1s
Thread.sleep(1000);
log.info("异步方法内部线程名称:{}!", Thread.currentThread().getName());
}
}

异步回调及超时处理

对于一些业务场景下,需要异步回调的返回值时,就需要使用异步回调来完成了。主要就是通过Future进行异步回调。

异步回调

  1. 修改下异步方法的返回类型,加入Future。
  2. 其中AsyncResult是Spring提供的一个Future接口的子类。
  3. 然后通过isDone方法,判断是否已经执行完毕。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// SyncService.java

@Component
@Slf4j
public class SyncService {
@Async("asyncPoolTaskExecutor")
public Future<String> asyncEvent() throws InterruptedException {
//休眠1s
Thread.sleep(1000);
log.info("异步方法内部线程名称:{}!", Thread.currentThread().getName());
return new AsyncResult<>("异步方法返回值");
}
}

// AsyncController.java

@RestController
@Slf4j
public class AsyncController {
@GetMapping("/async")
public String doAsync() throws InterruptedException {
long start = System.currentTimeMillis();
log.info("方法执行开始:{}", start);
//调用同步方法
syncService.syncEvent();
long syncTime = System.currentTimeMillis();
log.info("同步方法用时:{}", syncTime - start);
//调用异步方法
Future<String> doFutrue = syncService.asyncEvent();
while(true) {
//判断异步任务是否完成
if(doFutrue.isDone()) {
break;
}
Thread.sleep(100);
}
long asyncTime = System.currentTimeMillis();
log.info("异步方法用时:{}", asyncTime - syncTime);
log.info("方法执行完成:{}!",asyncTime);
return "async!!!";
}
}

超时处理

对于一些需要异步回调的函数,不能无期限的等待下去,所以一般上需要设置超时时间,超时后可将线程释放,而不至于一直堵塞而占用资源。

  1. 对于Future配置超时,很简单,通过get方法即可;
  2. 超时后,会抛出异常TimeoutException类,此时可进行统一异常捕获即可。
1
2
3
//get方法会一直堵塞,直到等待执行完成才返回
//get(long timeout, TimeUnit unit) 在设置时间类未返回结果,会直接排除异常TimeoutException,messages为null
String result = doFutrue.get(60, TimeUnit.SECONDS);//60s

异步请求与异步调用的区别

两者的使用场景不同,异步请求用来解决并发请求对服务器造成的压力,从而提高对请求的吞吐量;而异步调用是用来做一些非主线流程且不需要实时计算和响应的任务,比如同步日志到kafka中做日志分析等。
异步请求是会一直等待response相应的,需要返回结果给客户端的;而异步调用我们往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心。

参考文献