重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
本篇文章给大家分享的是有关java中怎么实现异步处理,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
创新互联主要从事网站制作、成都网站制作、网页设计、企业做网站、公司建网站等业务。立足成都服务清水,十载网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:028-86922220
1.DeferredResult 加线程池 (DeferredResult 提供了超时、错误处理,功能非常完善,再加上多线程处理请求效果很不错)
2.新开个定时任务线程池 定时轮询当前任务列表 超时就停止(需要自己维护任务列表)Hystrix就是这种方案
3.JDK9 可以采用CompletableFuture orTimeout、completeOnTimeout 方法处理 前者抛出异常后者返回默认值
总结,其实线程池统一设置超时这个需求本身就是伪需求,线程执行任务时间本身就是参差不齐的,而且这个控制权应该交给Runable或Callable内部业务处理,不同的业务处理超时、异常、报警等各不相同。CompletableFuture、ListenableFuture 、DeferredResult 的功能相当丰富,建议在多线程处理的场景多使用这些api。
具体实现:
DeferredResult 先建个工具类。调用方使用execute方法,传入new的DeferredResultDTO(DeferredResultDTO只有msgId,也可以自定义一些成员变量方便后期业务扩展使用)
然后在其他线程业务处理完设置结果,调用setResult方法,传入msgId相同的DeferredResultDTO和result对象
/** * DeferredResult 工具类 * * @author tiancong * @date 2020/10/14 19:23 */ @UtilityClass @Slf4j public class DeferredResultUtil { private Map>> taskMap = new ConcurrentHashMap<>(16); public DeferredResult > execute(DeferredResultDTO dto) { return execute(dto, 5000L); } public DeferredResult > execute(DeferredResultDTO dto, Long time) { if (taskMap.containsKey(dto)) { throw new BusinessException(String.format("msgId=%s 已经存在,请勿重发消息", dto.getMsgId())); } DeferredResult > deferredResult = new DeferredResult<>(time); deferredResult.onError((e) -> { taskMap.remove(dto); log.info("处理失败 ", e); deferredResult.setResult(ResultVoUtil.fail("处理失败")); }); deferredResult.onTimeout(() -> { taskMap.remove(dto); if (dto.getType().equals(DeferredResultTypeEnum.CLOTHES_DETECTION)) { ExamController.getCURRENT_STUDENT().remove(dto.getMsgId()); } deferredResult.setResult(ResultVoUtil.fail("请求超时,请联系工作人员!")); }); taskMap.putIfAbsent(dto, deferredResult); return deferredResult; } public void setResult(DeferredResultDTO dto, ResultVO
2. 新开个定时任务线程池 定时轮询当前任务列表
/** * @author tiancong * @date 2021/4/10 11:06 */ @Slf4j public class T { private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 2, r -> { Thread thread = new Thread(r); thread.setName("failAfter-%d"); thread.setDaemon(true); return thread; }); private static int timeCount; public static void main(String[] args) throws InterruptedException { ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor(); executorService.setCorePoolSize(4); executorService.setQueueCapacity(10); executorService.setMaxPoolSize(100); executorService.initialize(); // executorService.setAwaitTerminationSeconds(5); // executorService.getThreadPoolExecutor().awaitTermination(3, TimeUnit.SECONDS); executorService.setWaitForTasksToCompleteOnShutdown(true); Random random = new Random(); long start = System.currentTimeMillis(); List> asyncResultList = new ArrayList<>(); for (int i = 0; i < 100; i++) { ListenableFuture asyncResult = executorService.submitListenable(() -> { int r = random.nextInt(10); log.info("{} 开始睡{}s", Thread.currentThread().getName(), r); TimeUnit.SECONDS.sleep(r); log.info("{} 干完了 {}s", Thread.currentThread().getName(), r); //throw new RuntimeException("出现异常"); return true; }); asyncResult.addCallback(data -> { try { // 休息3毫秒模拟获取到执行结果后的操作 TimeUnit.MILLISECONDS.sleep(3); log.info("{} 收到结果:{}", Thread.currentThread().getName(), data); } catch (Exception e) { e.printStackTrace(); } }, ex -> log.info("**异常信息**", ex)); asyncResultList.add(asyncResult); } System.out.println(String.format("总结耗时:%s ms", System.currentTimeMillis() - start)); // 守护进程 定时轮询 终止超时的任务 scheduler.scheduleAtFixedRate(() -> { // 模拟守护进程 终止超过6s的任务 timeCount++; if (timeCount > 6) { for (ListenableFuture future : asyncResultList) { if (!future.isDone()) { log.error("future 因超时终止任务,{}", future); future.cancel(true); } } } }, 0, 1000, TimeUnit.MILLISECONDS); } }
额外补充:
CompletableFuture实现了CompletionStage接口,里面很多丰富的异步编程接口。
applyToEither方法是哪个先完成,就apply哪一个结果(但是两个任务都会最终走完)
/** * @author tiancong * @date 2021/4/10 11:06 */ @Slf4j public class T { public static void main(String[] args) throws InterruptedException { // CompletableFutureresponseFuture = within( // createTaskSupplier("5"), 3000, TimeUnit.MILLISECONDS); // responseFuture // .thenAccept(T::send) // .exceptionally(throwable -> { // log.error("Unrecoverable error", throwable); // return null; // }); // // 注意 exceptionally是new 的CompletableFuture CompletableFuture
以上就是java中怎么实现异步处理,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。