jdk批量异步任务最强工具completionservice-星辰平台

发表于 2022/04/25 23:34:56 2022/04/25
【摘要】 如何优化一个查询各个价格接口的代码?若使用“threadpoolexecutor future”,可能优化如下:三个线程异步执行查询价格,通过三次调用future的get()方法获取结果,之后将查询结果保存在mysql。 若获取price1耗时很长,那么即便获取price2耗时短,也无法让保存price2的操作先执行,因为主线程都阻塞在 f1.get()。这种问题如何解决呢? 加个阻塞队列!...

如何优化一个查询各个价格接口的代码?若使用“threadpoolexecutor future”,可能优化如下:

三个线程异步执行查询价格,通过三次调用future的get()方法获取结果,之后将查询结果保存在mysql。

若获取price1耗时很长,那么即便获取price2耗时短,也无法让保存price2的操作先执行,因为主线程都阻塞在 f1.get()。这种问题如何解决呢?

加个阻塞队列!获取到price1、2、3都进入阻塞队列,然后在主线程消费阻塞队列,就能保证先获取到的价格先保存:

实际开发推荐completionservice,不但能帮你解决先获取到的价格先保存,还能精简代码。

completionservice内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果入队,但completionservice是把任务执行结果的future对象入队,而上面demo是把任务最终执行结果入队。

completionservice接口的实现类是executorcompletionservice,这个实现类的构造方法有两个,分别是:

  • executorcompletionservice(executor executor);

  • executorcompletionservice(executor executor, blockingqueue> completionqueue)

这俩构造器都需要传入一个线程池,若不指定completionqueue,默认使用无界linkedblockingqueue。任务执行结果的future对象就是加入到completionqueue中。

学长~你直接给我写段代码解释清楚点呗

让我们试着利用completionservice实现高性能的查询房价系统。之后通过completionservice#submit()提交三个询价操作,这三个询价操作将会被completionservice异步执行。

最后completionservice#take()获取一个future对象(加入到阻塞队列的是任务执行结果的future对象),调用future#get()就能返回执行结果。

completionservice接口提供的方法submit()相关的方法有两个:

  • 一个方法参数是callable task

  • 一个方法有两个参数,分别是runnable task和v result,该方法类似于threadpoolexecutor的 future submit(runnable task, t result)

dubbo中有一种叫做forking的集群模式,这种集群模式下,支持并行调用多个查询服务,只要有一个成功返回结果,整个服务即可返回。例如你需要提供一个地址转坐标的服务,为了保证该服务的高可用和性能,可并行调用3个地图服务商的api,然后只要有1个正确返回了结果r,那么地址转坐标这个服务就可以直接返回r了。这种集群模式可以容忍2个地图服务商服务异常,但缺点是消耗的资源偏多。

 geocoder(addr) {
   // 并行执行以下3个查询服务, 
   r1=geocoderbys1(addr);
   r2=geocoderbys2(addr);
   r3=geocoderbys3(addr);
   // 只要r1,r2,r3有一个返回
   // 则返回
   return r1|r2|r3;
 }

利用completionservice可快速实现 forking 这种集群模式,比如下面示例代码。首先创建一个线程池executor 、一个completionservice对象cs和一个future类型的列表 futures,每次通过调用completionservice的submit()方法提交一个异步任务,会返回一个future对象,把这些future对象保存在列表futures中。通过调用 cs.take().get(),我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。

 // 创建线程池
 executorservice executor =
   executors.newfixedthreadpool(3);
 // 创建completionservice
 completionservice cs =
   new executorcompletionservice<>(executor);
 // 用于保存future对象
 list> futures =
   new arraylist<>(3);
 // 提交异步任务,并保存future到futures 
 futures.add(
   cs.submit(()->geocoderbys1()));
 futures.add(
   cs.submit(()->geocoderbys2()));
 futures.add(
   cs.submit(()->geocoderbys3()));
 // 获取最快返回的任务执行结果
 integer r = 0;
 try {
   // 只要有一个成功返回,则break
   for (int i = 0; i < 3;   i) {
     r = cs.take().get();
     // 简单地通过判空来检查是否成功返回
     if (r != null) {
       break;
     }
   }
 } finally {
   // 取消所有任务
   for(future f : futures)
     f.cancel(true);
 }
 // 返回结果
 return r;

当需要批量提交异步任务,推荐completionservice。completionservice将线程池executor和阻塞队列融合,让批量异步任务管理更简单。

completionservice能让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用该特性,可以轻松实现后续处理的有序性,避免无谓等待,同时还可以快速实现诸如forking cluster这样的需求。

completionservice的实现类executorcompletionservice,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个executorcompletionservice的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

【星辰平台的版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。