在Servlet 3.0之前,Servlet采用Thread-Per-Request的方式处理请求,即每一次Http请求都由某一个线程从头到尾负责处理,也就是同步处理请求。如果一个请求需要进行IO操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待IO操作完成,而IO操作是非常慢的,所以此时的线程并不能及时地释放回线程池以供后续使用,在并发量越来越大的情况下,这将带来严重的性能问题。
而在Servlet3.0发布后,提供了一个新特性:异步处理请求。可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放了容器所分配线程的请求,其响应将被延后,可以在耗时处理完成(例如长时间的运算)时再对客户端进行响应。其请求流程为:
通常开发过程中,一般上我们都是同步调用,即:程序按定义的顺序依次执行的过程,每一行代码执行过程必须等待上一行代码执行完毕后才执行。而异步调用指:程序在执行时,无需等待执行的返回值可继续执行后面的代码。显而易见,同步有依赖相关性,而异步没有,所以异步可并发执行,可提高执行效率,在相同的时间做更多的事情。
异步请求的实现
- 获取AsyncContext:根据HttpServletRequest对象获取。
- 设置监听器: 可设置其开始、完成、异常、超时等事件的回调处理.
- 设置超时:通过setTimeout方法设置,单位:毫秒。
说明:
- onStartAsync:异步线程开始时调用
- onError:异步线程出错时调用
- onTimeout:异步线程执行超时调用
- onComplete:异步执行完毕时调用
一般上,我们在超时或者异常时,会返回给前端相应的提示,比如说超时了,请再次请求等等,根据各业务进行自定义返回。同时,在异步调用完成时,一般需要执行一些清理工作或者其他相关操作。
需要注意的是只有在调用request.startAsync前将监听器添加到AsyncContext,监听器的onStartAsync方法才会起作用,而调用startAsync前AsyncContext还不存在,所以第一次调用startAsync是不会被监听器中的onStartAsync方法捕获的,只有在超时后又重新开始的情况下onStartAsync方法才会起作用。
Servlet方式实现异步请求
通过HttpServletRequest对象中获得一个AsyncContext对象,该对象构成了异步处理的上下文。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
@Slf4j @RestController @RequestMapping("/servlet") public class AsyncController { @RequestMapping("/origin") public void origin(HttpServletRequest request, HttpServletResponse response) throws Exception {
Thread.sleep(100); response.getWriter().println("这是【正常】的请求返回"); } @RequestMapping("/async") public void todoAsync(HttpServletRequest request,HttpServletResponse response) { AsyncContext asyncContext = request.startAsync(); asyncContext.addListener(new AsyncListener() { @Override public void onTimeout(AsyncEvent event) throws IOException { log.info("超时了:"); } @Override public void onStartAsync(AsyncEvent event) throws IOException { log.info("线程开始"); } @Override public void onError(AsyncEvent event) throws IOException { log.info("发生错误:",event.getThrowable()); } @Override public void onComplete(AsyncEvent event) throws IOException { log.info("执行完成"); } });
asyncContext.setTimeout(200);
asyncContext.start(new Runnable() { @Override public void run() { try { Thread.sleep(100); log.info("内部线程:" + Thread.currentThread().getName()); asyncContext.getResponse().setCharacterEncoding("utf-8"); asyncContext.getResponse().setContentType("text/html;charset=UTF-8"); asyncContext.getResponse().getWriter().println("这是【异步】的请求返回"); } catch (Exception e) { log.error("异常:",e); } asyncContext.complete(); } }); log.info("线程:" + Thread.currentThread().getName()); } }
|
Spring方式实现异步请求
在Spring中,有多种方式实现异步请求,比如callable、DeferredResult或者WebAsyncTask。每个的用法略有不同,可根据不同的业务场景选择不同的方式。
使用Callable异步处理请求
使用很简单,直接返回的参数包裹一层callable即可,可以继承WebMvcConfigurerAdapter类来设置默认线程池和超时处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
@Slf4j @RestController public class AsyncController {
@RequestMapping("/order") public Callable<String> order() { log.info("主线程开始"); Callable<String> result = new Callable<String>() { @Override public String call() throws Exception { log.info("副线程开始"); Thread.sleep(1000); log.info("副线程返回"); return "success"; } }; log.info("主线程返回"); return result; } }
|
使用DeferredResult异步处理请求
相比于Callable,DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
@Slf4j @RestController public class DeferredResultController { public static ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(30); @RequestMapping("/deferredresult") public DeferredResult<String> deferredResult(){ log.info("外部线程:" + Thread.currentThread().getName()); DeferredResult<String> result = new DeferredResult<String>(60*1000L); result.onTimeout(() -> { log.error("DeferredResult超时"); result.setResult("超时了!"); });
result.onCompletion(() -> log.info("调用完成")); FIXED_THREAD_POOL.execute(() -> { log.info("内部线程:" + Thread.currentThread().getName()); result.setResult("DeferredResult!!"); }); return result; } }
|
下面将通过DeferredResult方式,模拟以下流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
@Slf4j @RestController public class AsyncController {
@Autowired private MockQueue mockQueue;
@Autowired private DeferredResultHolder deferredResultHolder;
@RequestMapping("/order") public DeferredResult<String> order() throws InterruptedException { log.info("主线程开始"); String orderNumber= RandomStringUtils.randomNumeric(8); mockQueue.setPlaceOrder(orderNumber);
DeferredResult<String> result=new DeferredResult<>();
deferredResultHolder.getMap().put(orderNumber,result); log.info("主线程返回"); return result; } }
@Slf4j @Data @Component public class MockQueue { private String placeOrder; private String completeOrder;
public void setPlaceOrder(String placeOrder) throws InterruptedException { new Thread(()->{ log.info("接到下单请求,"+placeOrder); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } this.completeOrder=placeOrder; log.info("下单请求处理完毕,"+placeOrder); }).start(); } }
@Slf4j @Component public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired private MockQueue mockQueue;
@Autowired private DeferredResultHolder deferredResultHolder;
@Override public void onApplicationEvent(ContextRefreshedEvent event){ new Thread(()->{ while (true){ if(StringUtils.isNotBlank(mockQueue.getCompleteOrder())){ String orderNumber=mockQueue.getCompleteOrder(); log.info("返回订单处理结果:"+orderNumber); deferredResultHolder.getMap().get(orderNumber).setResult("place order"); mockQueue.setCompleteOrder(null); }else { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
@Component public class DeferredResultHolder { private Map<String, DeferredResult<String>> map=new HashMap<String,DeferredResult<String>>();
public Map<String,DeferredResult<String>> getMap(){ return map; }
public void setMap(Map<String,DeferredResult<String>> map){ this.map=map; } }
|
使用WebAsyncTask 异步处理请求
和Callable差不多,在Callable外包一层,给WebAsyncTask设置一个超时回调,即可实现超时处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
@Slf4j @RestController public class AsyncController {
@RequestMapping("/webAsyncTask") public WebAsyncTask<String> asyncTaskTimeout() { log.info(format("请求处理线程:"+ currentThread().getName()));
WebAsyncTask<String> asyncTask = new WebAsyncTask<>(10 * 1000L, () -> { log.info(format("异步工作线程:"+ currentThread().getName())); sleep(15 * 1000L); return TIME_MESSAGE; });
asyncTask.onCompletion(() -> log.info("任务执行完成"));
asyncTask.onTimeout(() -> { log.info("任务执行超时"); return TIME_MESSAGE; });
log.info("继续处理其他事情"); return asyncTask; } }
|
异步处理配置
1 2 3 4 5 6 7 8
| @Configuration public class WebConfig extends WebMvcConfigurationSupport {
@Override public void configureAsyncSupport(AsyncSupportConfigurer configurer){ configurer.setDefaultTimeout(1000); } }
|
CallableProcessingInterceptor与CallableInterceptor中的内容大致相似,但多了一些设置超时时间等方法。可以WebConfig中设置异步处理的超时时间,也可以setTaskExecutor设置相关线程池。
异步调用的实现
在SpringBoot中使用异步调用是很简单的,只需要使用@Async
注解即可实现方法的异步调用。
需要在启动类加入@EnableAsync
使异步调用@Async
注解生效。
1 2 3 4 5 6 7 8 9 10
| @SpringBootApplication @EnableAsync @Slf4j public class Chapter21Application {
public static void main(String[] args) { SpringApplication.run(Chapter21Application.class, args); log.info("Chapter21启动!"); } }
|
@Async异步调用
使用@Async很简单,只需要在需要异步执行的方法上加入此注解即可。这里创建一个控制层和一个服务层,进行简单示例下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
@Component public class SyncService { @Async public void asyncEvent() throws InterruptedException { Thread.sleep(1000); } public void syncEvent() throws InterruptedException { Thread.sleep(1000); } }
@RestController @Slf4j public class AsyncController { @Autowired SyncService syncService; @GetMapping("/async") public String doAsync() throws InterruptedException { long start = System.currentTimeMillis(); log.info("方法执行开始:{}", start); syncService.syncEvent(); long syncTime = System.currentTimeMillis(); log.info("同步方法用时:{}", syncTime - start); syncService.asyncEvent(); long asyncTime = System.currentTimeMillis(); log.info("异步方法用时:{}", asyncTime - syncTime); log.info("方法执行完成:{}!",asyncTime); return "async!!!"; } }
|
自定义线程池
在默认情况下,系统使用的是默认的SimpleAsyncTaskExecutor进行线程创建。所以一般上我们会自定义线程池来进行线程的复用。
关于ThreadPoolTaskExecutor参数说明:
- corePoolSize:线程池维护线程的最少数量
- keepAliveSeconds:允许的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
- maxPoolSize:线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程
- queueCapacity:缓存队列
- rejectedExecutionHandler:线程池对拒绝任务(无线程可用)的处理策略。这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。还有一个是AbortPolicy策略:处理程序遭到拒绝将抛出运行时RejectedExecutionException。
而在一些场景下,若需要在关闭线程池时等待当前调度任务完成后才开始关闭,可以通过简单的配置,进行优雅的停机策略配置。关键就是通过setWaitForTasksToCompleteOnShutdown(true)和setAwaitTerminationSeconds方法。
- setWaitForTasksToCompleteOnShutdown: 表明等待所有线程执行完,默认为false。
- setAwaitTerminationSeconds: 等待的时间,因为不能无限的等待下去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
@Configuration public class Config {
@Bean(name = "asyncPoolTaskExecutor") public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(20); taskExecutor.setMaxPoolSize(200); taskExecutor.setQueueCapacity(25); taskExecutor.setKeepAliveSeconds(200); taskExecutor.setThreadNamePrefix("oKong-"); taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } }
@Component @Slf4j public class SyncService { @Async("asyncPoolTaskExecutor") public void asyncEvent() throws InterruptedException { Thread.sleep(1000); log.info("异步方法内部线程名称:{}!", Thread.currentThread().getName()); } }
|
异步回调及超时处理
对于一些业务场景下,需要异步回调的返回值时,就需要使用异步回调来完成了。主要就是通过Future进行异步回调。
异步回调
- 修改下异步方法的返回类型,加入Future。
- 其中AsyncResult是Spring提供的一个Future接口的子类。
- 然后通过isDone方法,判断是否已经执行完毕。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
@Component @Slf4j public class SyncService { @Async("asyncPoolTaskExecutor") public Future<String> asyncEvent() throws InterruptedException { Thread.sleep(1000); log.info("异步方法内部线程名称:{}!", Thread.currentThread().getName()); return new AsyncResult<>("异步方法返回值"); } }
@RestController @Slf4j public class AsyncController { @GetMapping("/async") public String doAsync() throws InterruptedException { long start = System.currentTimeMillis(); log.info("方法执行开始:{}", start); syncService.syncEvent(); long syncTime = System.currentTimeMillis(); log.info("同步方法用时:{}", syncTime - start); Future<String> doFutrue = syncService.asyncEvent(); while(true) { if(doFutrue.isDone()) { break; } Thread.sleep(100); } long asyncTime = System.currentTimeMillis(); log.info("异步方法用时:{}", asyncTime - syncTime); log.info("方法执行完成:{}!",asyncTime); return "async!!!"; } }
|
超时处理
对于一些需要异步回调的函数,不能无期限的等待下去,所以一般上需要设置超时时间,超时后可将线程释放,而不至于一直堵塞而占用资源。
- 对于Future配置超时,很简单,通过get方法即可;
- 超时后,会抛出异常TimeoutException类,此时可进行统一异常捕获即可。
1 2 3
|
String result = doFutrue.get(60, TimeUnit.SECONDS);
|
异步请求与异步调用的区别
两者的使用场景不同,异步请求用来解决并发请求对服务器造成的压力,从而提高对请求的吞吐量;而异步调用是用来做一些非主线流程且不需要实时计算和响应的任务,比如同步日志到kafka中做日志分析等。
异步请求是会一直等待response相应的,需要返回结果给客户端的;而异步调用我们往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心。
参考文献