选择语言
durumis AI 总结的文章
- 說明如何使用 Spring @Async 實作 Java 異步處理,以及其優點和注意事項。
- 使用 Spring @Async,您可以透過新增 @EnableAsync 注解,並在需要進行異步處理的方法上附加 @Async 注解,來輕鬆實作異步處理。
- 您可以透過執行緒池設定來有效管理執行緒,並使用 Future、ListenableFuture、CompletableFuture 等回傳類型來回傳異步處理結果。
Java 異步處理
在深入了解 Spring @Async 之前,同步、異步和多線程的概念是必不可少的。假設您已經了解這些概念, 讓我們通過代碼來了解純 Java 異步處理的方式。如果您熟悉 Java 線程,則可以跳過本章節。
public class MessageService {
public void print(String message) {
System.out.println(message);
}
}
public class Main {
public static void main(String[] args) {
MessageService messageService = new MessageService();
for (int i = 1; i <= 100; i++) {
messageService.print(i + "");
}
}
如果要以同步方式創建接收消息並簡單打印消息的功能,則可以像上面一樣編寫代碼。如果將其更改為多線程 異步方式,則可以按如下方式編寫源代碼。
public class MessageService {
public void print(String message) {
new Thread(() -> System.out.println(message))
.start();
}
}
public class Main {
public static void main(String[] args) {
MessageService messageService = new MessageService();
for (int i = 1; i <= 100; i++) {
messageService.print(i + "");
}
}
但是這種方法無法管理 Thread,因此非常危險。例如,如果同時進行 10,000 次調用,則需要在很短的時間內 創建 10,000 個 Thread。創建 Thread 的成本並不低,因此會對程序的性能產生負面影響,甚至可能導致 OOM 錯誤。 因此,需要實現 Thread Pool 來管理 Thread,Java 提供了 ExecutorService 類。
public class MessageService {
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void print(String message) {
executorService.submit(() -> System.out.println(message));
}
}
public class Main {
public static void main(String[] args) {
MessageService messageService = new MessageService();
for (int i = 1; i <= 100; i++) {
messageService.print(i + "");
}
}
將所有線程的數量限制為 10 個,並且我們能夠正確地執行所需的基於多線程的異步處理。 但是,對於每個需要異步處理的方法,上述方法都需要應用 ExecutorService 的 submit() 方法,因此需要進行重複的修改工作。 也就是說,如果最初要將以同步邏輯編寫的方法更改為異步方法,則必須不可避免地更改方法本身的邏輯。
Spring @Async
簡單方法
@EnableAsync
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Service
public class MessageService {
@Async
public void print(String message) {
System.out.println(message);
}
}
@RequiredArgsConstructor
@RestController
public class MessageController {
private final MessageService messageService;
@GetMapping("/messages")
@ResponseStatus(code = HttpStatus.OK)
public void printMessage() {
for (int i = 1; i <= 100; i++) {
messageService.print(i + "");
}
}
將 @EnableAsync 註解添加到 Application 類之上,並將 @Async 註解添加到要異步處理的同步邏輯方法之上即可。 但是,這種方法存在不管理線程的問題。這是因為 @Async 的默認設置是使用 SimpleAsyncTaskExecutor, 它不是線程池,而只是創建線程。
使用線程池的方法
首先,從 Application 類中刪除 @EnableAsync。如果 Application 類已設置 @EnableAutoConfiguration 或 @SpringBootApplication,則運行時將讀取 SpringAsyncConfig 類(稍後將創建)中定義的 @Configuration threadPoolTaskExecutor bean 信息。
@Configuration
@EnableAsync // 應附加到 Async 設置類,而不是 Application
public class SpringAsyncConfig {
@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3); // 默認線程數
taskExecutor.setMaxPoolSize(30); // 最大線程數
taskExecutor.setQueueCapacity(100); // 隊列大小
taskExecutor.setThreadNamePrefix("Executor-");
return taskExecutor;
}
可以設置 core 和 max 大小。在這種情況下,可以預期最初僅使用 core 大小的線程,如果無法處理超過此數量的任務, 則將增加到 max 大小的線程,但實際上並非如此。
內部將創建大小為 Integer.MAX_VALUE 的 LinkedBlockingQueue,如果 core 大小的線程無法處理任務,則將等待在隊列中。 如果隊列已滿,則將創建 max 大小的線程來處理任務。
如果設置 queueCapacity 為 Integer.MAX_VALUE 的大小過於繁重,則可以設置 queueCapacity。 如果像上面一樣設置,則將使用最初的 3 個線程進行處理,如果處理速度變慢,則將任務放入大小為 100 的隊列中, 如果收到更多請求,則將創建最多 30 個線程來處理任務。
線程池設置完成後,只需在添加了 @Async 註解的方法中添加 bean 的名稱即可。
@Service
public class MessageService {
@Async("threadPoolTaskExecutor")
public void print(String message) {
System.out.println(message);
}
如果要設置多種類型的線程池,則可以創建多個 bean 生成方法,如 threadPoolTaskExecutor(), 並在設置 @Async 時添加所需的線程池 bean。
不同返回類型對應的返回形式
無返回值的情況
在需要異步處理的方法不需要傳遞處理結果的情況下。在這種情況下,可以將 @Async 註解的返回類型設置為 void。
返回值的情況
可以將 Future、ListenableFuture 和 CompletableFuture 類型用作返回類型。可以將異步 方法的返回形式包裝在 new AsyncResult() 中。
[Future]
@Service
public class MessageService {
@Async
public Future print(String message) throws InterruptedException {
System.out.println("Task Start - " + message);
Thread.sleep(3000);
return new AsyncResult<>("jayon-" + message);
}
}
@RequiredArgsConstructor
@RestController
public class MessageController {
private final MessageService messageService;
@GetMapping("/messages")
@ResponseStatus(code = HttpStatus.OK)
public void printMessage() throws ExecutionException, InterruptedException {
for (int i = 1; i <= 5; i++) {
Future future = messageService.print(i + "");
System.out.println(future.get());
}
}
}
// 執行結果
Task Start - 1
jayon-1
Task Start - 2
jayon-2
Task Start - 3
jayon-3
Task Start - 4
jayon-4
Task Start - 5
future.get() 會阻塞,直到收到請求結果。因此,它變成了異步阻塞方式,性能不佳。通常不使用 Future。
[ListenableFuture]
@Service
public class MessageService {
@Async
public ListenableFuture print(String message) throws InterruptedException {
System.out.println("Task Start - " + message);
Thread.sleep(3000);
return new AsyncResult<>("jayon-" + message);
}
}
@RequiredArgsConstructor
@RestController
public class MessageController {
private final MessageService messageService;
@GetMapping("/messages")
@ResponseStatus(code = HttpStatus.OK)
public void printMessage() throws InterruptedException {
for (int i = 1; i <= 5; i++) {
ListenableFuture listenableFuture = messageService.print(i + "");
listenableFuture.addCallback(System.out::println, error -> System.out.println(error.getMessage()));
}
}
}
// 執行結果
Task Start - 1
Task Start - 3
Task Start - 2
jayon-1
jayon-2
Task Start - 5
jayon-3
Task Start - 4
jayon-4
ListenableFuture 可以通過回調以非阻塞的方式處理任務。addCallback() 方法的第一個參數是任務完成回調方法, 第二個參數是任務失敗回調方法。值得注意的是,由於線程池的 core 線程設置為 3 個,因此可以確認“Task Start”消息 最初出現了 3 次。
[CompletableFuture]
雖然 ListenableFuture 可以實現非阻塞邏輯,但如果需要在回調中使用回調,則會導致非常複雜的代碼, 稱為“回調地獄”。
當然,本次時間不會詳細介紹 CompletableFuture,因此,如果您想知道如何處理複雜的回調,請參考以下 鏈接。
@Service
public class MessageService {
@Async
public CompletableFuture print(String message) throws InterruptedException {
System.out.println("Task Start - " + message);
Thread.sleep(3000);
return new AsyncResult<>("jayon-" + message).completable();
}
}
@RequiredArgsConstructor
@RestController
public class MessageController {
private final MessageService messageService;
@GetMapping("/messages")
@ResponseStatus(code = HttpStatus.OK)
public void printMessage() throws InterruptedException {
for (int i = 1; i <= 5; i++) {
CompletableFuture completableFuture = messageService.print(i + "");
completableFuture
.thenAccept(System.out::println)
.exceptionally(error -> {
System.out.println(error.getMessage());
return null;
});
}
}
可讀性比 ListenableFuture 的回調定義更好,並完全執行非阻塞功能。因此,建議在使用 @Async 時, 如果需要返回值,則使用 CompletableFuture。
@Async 的優點
開發人員可以以同步方式編寫方法,如果需要異步方式,只需將 @Async 註解添加到方法之上即可。 因此,可以創建易於維護的同步和異步代碼。
@Async 的注意事項
為了使用 @Async 功能,需要聲明 @EnableAsync 註解,如果沒有單獨設置,它將以代理模式運行。 也就是說,使用 @Async 註解運行的異步方法將遵循 Spring AOP 的所有限制。 有關詳細原因,請參考該帖子。
- 即使將 @Async 添加到私有方法中,AOP 也不會生效。
- 在同一個對象內的方法之間調用時,AOP 也不會生效。