ThreadPoolExecutor是一個 ExecutorService,它使用可能的幾個池執行緒之一執行每個提交的任務,通常使用 Executors 工廠方法配置。
基本介紹
- 外文名:ThreadPoolExecutor
- 本質:一個 ExecutorService
- 適用: Executors 工廠方法配置
- 作用:執行每個提交的任務
定義,設定,按需構造,創建新執行緒,保持活動時間,排隊,使用方法,佇列維護,
定義
執行緒池可以解決兩個不同問題:由於減少了每個任務調用的開銷,它們通常可以在執行大量異步任務時提供增強的性能,並且還可以提供綁定和管理資源(包括執行任務集時使用的執行緒)的方法。每個 ThreadPoolExecutor 還維護著一些基本的統計數據,如完成的任務數。
為了便於跨大量上下文使用,此類提供了很多可調整的參數和擴展鉤子 (hook)。但是,強烈建議程式設計師使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界執行緒池,可以進行自動執行緒回收)、Executors.newFixedThreadPool(int)(固定大小執行緒池)和 Executors.newSingleThreadExecutor()(單個後台執行緒),它們均為大多數使用場景預定義了設定。否則,在手動配置和調整此類時,使用以下指導:
核心和最大池大小
ThreadPoolExecutor 將根據 corePoolSize(參見 getCorePoolSize())和 maximumPoolSize(參見 getMaximumPoolSize())設定的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,如果運行的執行緒少於 corePoolSize,則創建新執行緒來處理請求,即使其他輔助執行緒是空閒的。如果運行的執行緒多於 corePoolSize 而少於 maximumPoolSize,則僅當佇列滿時才創建新執行緒。如果設定的 corePoolSize 和 maximumPoolSize 相同,則創建了固定大小的執行緒池。如果將 maximumPoolSize 設定為基本的無界值(如 Integer.MAX_VALUE),則允許池適應任意數量的並發任務。在大多數情況下,核心和最大池大小僅基於構造來設定,不過也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。
設定
按需構造
默認情況下,即使核心執行緒最初只是在新任務到達時才創建和啟動的,也可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 對其進行動態重寫。如果構造帶有非空佇列的池,則可能希望預先啟動執行緒。
創建新執行緒
使用 ThreadFactory 創建新執行緒。如果沒有另外說明,則在同一個 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 創建執行緒,並且這些執行緒具有相同的 NORM_PRIORITY 優先權和非守護進程狀態。通過提供不同的 ThreadFactory,可以改變執行緒的名稱、執行緒組、優先權、守護進程狀態,等等。如果從 newThread 返回 null 時 ThreadFactory 未能創建執行緒,則執行程式將繼續運行,但不能執行任何任務。
保持活動時間
如果池中當前有多於 corePoolSize 的執行緒,則這些多出的執行緒在空閒時間超過 keepAliveTime 時將會終止(參見 getKeepAliveTime(java.util.concurrent.TimeUnit))。這提供了當池處於非活動狀態時減少資源消耗的方法。如果池後來變得更為活動,則可以創建新的執行緒。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態地更改此參數。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在關閉前有效地從以前的終止狀態禁用空閒執行緒。默認情況下,保持活動策略只在有多於 corePoolSizeThreads 的執行緒時套用。但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可將此逾時策略套用於核心執行緒。
排隊
所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此佇列與池大小進行互動:
如果運行的執行緒少於 corePoolSize,則 Executor 始終首選添加新的執行緒,而不進行排隊。
如果運行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不添加新的執行緒。
如果無法將請求加入佇列,則創建新的執行緒,除非創建此執行緒超出 maximumPoolSize,在這種情況下,任務將被拒絕。
排隊有三種通用策略:
直接提交。工作佇列的默認選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即運行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
無界佇列。使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,創建的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web 頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。
被拒絕的任務
當 Executor 已經關閉,並且 Executor 將有限邊界用於最大執行緒和工作佇列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種情況下,execute 方法都將調用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預定義的處理程式策略:
在默認的 ThreadPoolExecutor.AbortPolicy 中,處理程式遭到拒絕將拋出運行時 RejectedExecutionException。
在 ThreadPoolExecutor.CallerRunsPolicy 中,執行緒調用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。
在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。
定義和使用其他種類的 RejectedExecutionHandler 類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。
使用方法
鉤子 (hook) 方法
此類提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執行每個任務之前和之後調用。它們可用於操縱執行環境;例如,重新初始化 ThreadLocal、蒐集統計信息或添加日誌條目。此外,還可以重寫方法 terminated() 來執行 Executor 完全終止後需要完成的所有特殊處理。
如果鉤子 (hook) 或回調方法拋出異常,則內部輔助執行緒將依次失敗並突然終止。
佇列維護
方法 getQueue() 允許出於監控和調試目的而訪問工作佇列。強烈反對出於其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行存儲回收。
終止
程式 AND 不再引用的池沒有剩餘執行緒會自動 shutdown。如果希望確保回收取消引用的池(即使用戶忘記調用 shutdown()),則必須安排未使用的執行緒最終終止:設定適當保持活動時間,使用 0 核心執行緒的下邊界和/或設定 allowCoreThreadTimeOut(boolean)。
擴展示例。此類的大多數擴展可以重寫一個或多個受保護的鉤子 (hook) 方法。例如,下面是一個添加了簡單的暫停/恢復功能的子類:
class PausableThreadPoolExecutor extends ThreadPoolExecutor { private boolean isPaused; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unpaused = pauseLock.newCondition(); public PausableThreadPoolExecutor(...) { super(...); } protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try { while (isPaused) unpaused.await(); } catch(InterruptedException ie) { t.interrupt(); } finally { pauseLock.unlock(); } } public void pause() { pauseLock.lock(); try { isPaused = true; } finally { pauseLock.unlock(); } } public void resume() { pauseLock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { pauseLock.unlock(); } } }