在上一篇文章中,步后我们使用双异步后,何保从 191s 优化到 2s,证数有个小伙伴在评论区问我,使用双异如何保证插入后数据的步后一致性呢? 很简单,通过对比Excel文件行数和入库数量是何保否相等即可。 那么,证数如何获取异步线程的使用双异返回值呢? 我们可以通过给异步方法添加Future返回值的步后方式获取结果。 FutureTask 除了实现 Future 接口外,何保还实现了 Runnable 接口。证数因此,使用双异FutureTask 可以交给 Executor 执行,步后也可以由调用线程直接执行FutureTask.run()。何保 AbstractQueuedSynchronizer简称AQS,它是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及 维护被阻塞线程的队列。基于 AQS 实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。香港云服务器 基于 AQS实现的同步器包含两种操作: get()方法通过判断状态state观测异步线程是否已结束,如果结束直接将结果返回,否则会将等待节点扔进等待队列自旋,阻塞住线程。 自旋直至异步线程执行完毕,获取另一边的线程计算出结果或取消后,将等待队列里的所有节点依次唤醒并移除队列。 很多小朋友对读源码,嗤之以鼻,工作3年、5年,还是没认真读过任何源码,觉得读了也没啥用,或者读了也看不懂~ 其实,只要把源码的云服务器提供商执行流程通过画图的形式呈现出来,你就会幡然醒悟,原来是这样的~ 简而言之: 1. 如果异步线程还没执行完,则进入CAS自旋; 2. 其它线程获取结果或取消后,重新唤醒CAS队列中等待的线程; 3. 再通过get()判断状态state; 4. 直至返回结果或(取消、超时、异常)为止。 通过定义整形状态值,判断state大小,这个思想很有意思,值得学习。 { / * Sets this Future to the result of its computation * unless it has been cancelled. ; } { state; { s = state; (s <= COMPLETING) ); report(s); } { ; ; ; (;;) { (Thread.interrupted()) { removeWaiter(q); InterruptedException(); } s = state; (s > COMPLETING) { ) ; s; } Thread.yield(); ) WaitNode(); (!queued) , waitersOffset, q.next = waiters, q); (timed) { nanos = deadline - System.nanoTime(); ) { removeWaiter(q); state; } , nanos); } ); } } { Object x = outcome; (s == NORMAL) (V)x; (s >= CANCELLED) CancellationException(); ExecutionException((Throwable)x); } } ) { { ArrayList<>(); ; time < times; time++) { Future futureList.add(sumFuture); } (Exception e){ ,e); } } ) { { AsyncResult<>(sum); (Exception e){ ); } } { [futureList.size()]; ;i { Future ) { (future.isDone() && !future.isCancelled()) { Integer futureSum = future.get(); + future + futureSum); futureSumArr[i] += futureSum; ; { ); ); } } (Exception e) { , e); } } insertFlag = getInsertSum(futureSumArr, excelRow); +insertFlag); insertFlag; } 不过感觉多此一举了,就当练习Future异步取返回值了~ { ExecutorService service = Executors.newSingleThreadExecutor(); }; Runnable() { { { ] = getFutureResult(futureList, excelRow); (Exception e) { , e); ; } } }); service.shutdown(); ]); } 获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。 但Future会造成主线程的阻塞,这个就很不友好了,有没有更优解呢?来源:哪吒编程
unsetunset一、使用双异前情提要unsetunset
unsetunset二、通过Future获取异步返回值unsetunset
1、FutureTask 是基于 AbstractQueuedSynchronizer实现的
2、FutureTask执行流程
执行@Async异步方法;建立新线程async-executor-X,执行Runnable的run()方法,(FutureTask实现RunnableFuture,RunnableFuture实现Runnable);判断状态state;如果未新建或者不处于AQS,直接返回;否则进入COMPLETING状态,执行异步线程代码;如果执行cancel()方法改变AQS的状态时,会唤醒AQS等待队列中的第一个线程线程async-executor-1;线程async-executor-1被唤醒后将自己从AQS队列中移除;然后唤醒next线程async-executor-2;改变线程async-executor-1的state;等待get()线程取值。next等待线程被唤醒后,循环线程async-executor-1的步骤被唤醒从AQS队列中移除唤醒next线程改变异步线程状态新建线程async-executor-N,监听异步方法的state如果处于EXCEPTIONAL以上状态,抛出异常;如果处于COMPLETING状态,加入AQS队列等待;如果处于NORMAL状态,返回结果;
3、站群服务器get()方法执行流程
如果state小于等于COMPLETING,表示任务还在执行中;计算超时时间;如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state;阻塞队列nanos毫秒。如果已有等待节点WaitNode,将线程置空;返回当前状态;如果线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常;如果state大于COMPLETING;如果任务正在执行,让出时间片;如果还未构造等待节点,则new一个新的等待节点;如果未入队列,CAS尝试入队;如果有超时时间参数;否则阻塞队列;如果state大于COMPLETING;如果执行完毕,返回结果;如果大于等于取消状态,则抛出异常。
unsetunset三、FutureTask源码具体分析unsetunset
1、FutureTask源码
3、通过Future<Integer>.get()获取返回值:
public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) 4、这里也可以通过新线程+Future获取Future返回值