/** * 一个有缺陷的线程池构造器 * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param blockingQueueSize 拥塞队列尺寸 * @param tag 线程标签 * @return 有缺陷的线程池 */ publicstatic ThreadPoolExecutor arrayBlockingExecutor(int corePoolSize, int maximumPoolSize , long keepAliveTime, TimeUnit unit, int blockingQueueSize, final String tag) { log.info("create thread pool with blocking array queue, " + "tag : {}, corePoolSize : {}, maximumPoolSize : {}, " + "keepAliveTime : {}, unit : {}, blockingQueueSize : {}", tag, corePoolSize, maximumPoolSize, keepAliveTime, unit, blockingQueueSize ); returnnewThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, newArrayBlockingQueue<>(blockingQueueSize), r -> { Threadthread=newThread(r); thread.setName(Joiner.on("_").join(tag, "thread", Thread.NORM_PRIORITY)); thread.setPriority(Thread.NORM_PRIORITY); return thread; }, (r, executor) -> log.error( "{}_thread_full:queue_size={}, ActiveCount={}, CorePoolSize={}, CompletedTaskCount={}", executor.getQueue().size(), executor.getActiveCount(), executor.getCorePoolSize(), executor.getCompletedTaskCount() ) ); }
// 生成一系列批量任务 final List<Callable<Integer>> tasks = IntStream.range(1, 10).mapToObj((i) -> { returnnewCallable<Integer>() { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ @Override public Integer call()throws Exception { // 模仿长时间操作,防止线程一提交即立即执行完 Thread.sleep(1000L); return i; } }; }).collect(Collectors.toList()); try { testPool.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } // 永远打不出这个语句出来 log.info("this executor can run through all tasks"); testPool.shutdownNow(); // 永远打不出这个语句出来 log.info("good bye"); }
publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); /* * 简单翻译下 Doug Lea 的设计思想: * 1. 如果线程总数小于 corePoolSize 先加线程 * 2. 不行就把线程入队 * 3. 不行就增加线程 * 4. 不行就想办法拒绝这个线程 * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ intc= ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { intrecheck= ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); elseif (workerCountOf(recheck) == 0) addWorker(null, false); } elseif (!addWorker(command, false)) reject(command); }
/**
 * Returns the outcome produced by {@code report} for this task's state, first calling
 * {@code awaitDone(false, 0L)} when the state is still at or below {@code COMPLETING}.
 *
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING) {
        // Untimed wait: timed == false, nanos == 0.
        s = awaitDone(false, 0L);
    }
    return report(s);
}
/** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
/** * A handler for rejected tasks that silently discards the * rejected task. */ publicstaticclassDiscardPolicyimplementsRejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ publicDiscardPolicy() { }
/** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e) { // 这个方法的问题在于,它什么都没有做 } }
publicstaticvoidmain(String[] args) { AtomicIntegercounter=newAtomicInteger(0); // a very small thread pool which is easy to fulfilled ExecutorServicetestPool=newThreadPoolExecutor( 1, 1, 1, TimeUnit.SECONDS, newArrayBlockingQueue<>(1), r -> { Threadthread=newThread(r); thread.setName("test-thread" + counter.getAndIncrement()); return thread; }, newThreadPoolExecutor.DiscardPolicy() );
// gengerate several jobs here final List<Callable<Integer>> tasks = IntStream.range(1, 10).mapToObj((i) -> { returnnewCallable<Integer>() { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ @Override public Integer call()throws Exception { // sleep to simulate long-run job Thread.sleep(1000L); return i; } }; }).collect(Collectors.toList()); try { testPool.invokeAll(tasks); } catch (InterruptedException e) { e.printStackTrace(); } // you will never see this log.info("this executor can run through all tasks"); testPool.shutdownNow(); // you will never see this log.info("good bye"); }
Unfortunately shutdownNow returns a List<Runnable>, so it’s not clear (to me at least) how the caller of shutdownNow can cancel those tasks. Perhaps a note in the documentation could recommend something, though I’m not sure what the recommendation would be.
这个问题的 workaround 非常丑陋:
1 2 3 4 5 6 7 8 9
Stuart makes a good point. There is a mismatch between the Runnable-oriented Executor interface and the Future-oriented ExecutorService interface. With ThreadPoolExecutor, if you execute() a Runnable, that Runnable is enqueued and returned by shutdownNow. But if you submit() a Runnable, a FutureTask is enqueued and returned by shutdownNow.
So in fact users can in practice do
// Cancel every still-queued task that shutdownNow() hands back and that is a Future.
for (Runnable r : pool.shutdownNow()) {
    if (r instanceof Future) {
        ((Future<?>) r).cancel(false);
    }
}
but I don't think we guarantee that they can do this. Is there some way we can improve the docs?