言語を選択
durumis AIが要約した文章
- Java 非同期処理を Spring @Async を使用して実装する方法、利点、注意事項を説明します。
- Spring @Async を使用すると、@EnableAsync アノテーションを追加し、非同期処理を希望するメソッドに @Async アノテーションを 付けることで、簡単に非同期処理を実装できます。
- スレッドプール設定により、効率的なスレッド管理が可能になり、戻り値の型に応じて Future、ListenableFuture、 CompletableFuture などを使用して非同期処理結果を返すことができます。
Java 非同期処理
Spring @Async を見ていく前に、同期、非同期、マルチスレッドの概念は必須です。これらの概念を理解していることを前提とし、 純粋な Java 非同期処理方法をコードで見てみましょう。もし Java スレッドに慣れているなら、この章は飛ばしても構いません。
public class MessageService {
public void print(String message) {
System.out.println(message);
}
}
public class Main {
public static void main(String[] args) {
MessageService messageService = new MessageService();
for (int i = 1; i <= 100; i++) {
messageService.print(i + "");
}
}
もし message を受け取って単純に出力する機能を同期方式で作成するなら、上記の様にコードを作成できます。これをマルチスレッディング 非同期方式に変えるなら、以下の様にソースコードを作成できます。
public class MessageService {
public void print(String message) {
new Thread(() -> System.out.println(message))
.start();
}
}
public class Main {
public static void main(String[] args) {
MessageService messageService = new MessageService();
for (int i = 1; i <= 100; i++) {
messageService.print(i + "");
}
}
しかし、この方法は Thread を管理することができないため、非常に危険です。例えば、同時に 10,000 個の呼び出しが行われた場合、非常に短い 時間で Thread を 10,000 個生成する必要があります。Thread を生成するコストは少なくないため、プログラムのパフォーマンスに悪影響を 及ぼし、最悪の場合 OOM エラーが発生する可能性があります。そのため、Thread を管理するには Thread Pool を実装する必要があり、 Java では ExecutorService クラスを提供しています。
public class MessageService {
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void print(String message) {
executorService.submit(() -> System.out.println(message));
}
}
public class Main {
public static void main(String[] args) {
MessageService messageService = new MessageService();
for (int i = 1; i <= 100; i++) {
messageService.print(i + "");
}
}
全 Thread の数を 10 個に制限し、私たちが望むマルチスレッディング方式の非同期処理も正しく行えるようになりました。 しかし、上記の方法では、非同期方式で処理したいメソッドごとに ExecutorService の submit() メソッドを適用する必要があり、 反復的な修正作業を行う必要があります。つまり、最初は同期ロジックで作成したメソッドを非同期に変えたい場合、メソッド自体のロジック 変更は避けられないということです。
Spring @Async
簡単な方法
@EnableAsync
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Service
public class MessageService {
@Async
public void print(String message) {
System.out.println(message);
}
}
@RequiredArgsConstructor
@RestController
public class MessageController {
private final MessageService messageService;
@GetMapping("/messages")
@ResponseStatus(code = HttpStatus.OK)
public void printMessage() {
for (int i = 1; i <= 100; i++) {
messageService.print(i + "");
}
}
@EnableAsync アノテーションを Application クラスの上に付け、非同期方式で処理したい同期ロジックの メソッドの上に @Async アノテーションを付ければ完了です。しかし、上記の方法ではスレッドを管理していないという問題があります。なぜなら @Async のデフォルト設定は SimpleAsyncTaskExecutor を使用することになっているのですが、これはスレッドプールではなく、単に スレッドを作成する役割を果たすからです。
スレッドプールを使用する方法
まず、Application クラスから @EnableAsync を削除します。Application クラスに @EnableAutoConfiguration または @SpringBootApplication 設定がある場合、実行時に @Configuration が設定された SpringAsyncConfig クラス(後で作成する予定)の threadPoolTaskExecutor Bean 情報を読み込むためです。
@Configuration
@EnableAsync // Application ではなく、Async 設定クラスに付ける必要がある
public class SpringAsyncConfig {
@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3); // デフォルトのスレッド数
taskExecutor.setMaxPoolSize(30); // 最大スレッド数
taskExecutor.setQueueCapacity(100); // Queue サイズ
taskExecutor.setThreadNamePrefix("Executor-");
return taskExecutor;
}
core と max サイズを設定できます。この時、最初の core サイズで動作し、これ以上処理できない場合は、max サイズまでスレッドが増加すると予想されますが、そうではありません。
内部的には Integer.MAX_VALUE サイズの LinkedBlockingQueue を生成し、core サイズのスレッドで 処理できない場合は、Queue で待機します。Queue がいっぱいになると、その時に max サイズまでスレッドを生成して 処理されます。
この時、Queue サイズを Integer.MAX_VALUE に設定するのが負担な場合は、queueCapacity を設定できます。 上記の様に設定すると、最初の 3 つのスレッドで処理を行い、処理速度が遅くなると、作業を 100 個のサイズの Queue に入れておき、 それ以上のリクエストが入ると、最大 30 個のスレッドを作成して処理を行います。
スレッドプールの設定が完了したら、@Async アノテーションが付いているメソッドに、上記の Bean の名前を付ければOKです。
@Service
public class MessageService {
@Async("threadPoolTaskExecutor")
public void print(String message) {
System.out.println(message);
}
もしスレッドプールの種類を複数設定したい場合は、threadPoolTaskExecutor() のような Bean 生成メソッドを複数 作成し、@Async 設定時に希望のスレッドプール Bean を入れるだけです。
リターンタイプ別の返される形式
リターン値がない場合
非同期で処理する必要があるメソッドが処理結果を渡す必要がない場合です。この場合は、@Async アノテーションのリターンタイプを void に設定すればOKです。
リターン値がある場合
Future、ListenableFuture、CompletableFuture タイプをリターンタイプとして使用できます。非同期 メソッドの返り値形式を new AsyncResult() で囲みます。
[Future]
@Service
public class MessageService {
@Async
public Future print(String message) throws InterruptedException {
System.out.println("Task Start - " + message);
Thread.sleep(3000);
return new AsyncResult<>("jayon-" + message);
}
}
@RequiredArgsConstructor
@RestController
public class MessageController {
private final MessageService messageService;
@GetMapping("/messages")
@ResponseStatus(code = HttpStatus.OK)
public void printMessage() throws ExecutionException, InterruptedException {
for (int i = 1; i <= 5; i++) {
Future future = messageService.print(i + "");
System.out.println(future.get());
}
}
}
// 実行結果
Task Start - 1
jayon-1
Task Start - 2
jayon-2
Task Start - 3
jayon-3
Task Start - 4
jayon-4
Task Start - 5
future.get() は、ブロッキングによってリクエスト結果が来るまで待つ役割を果たします。そのため、非同期ブロッキング方式になってしまい、 パフォーマンスが良くありません。通常、Future は使用しません。
[ListenableFuture]
@Service
public class MessageService {
@Async
public ListenableFuture print(String message) throws InterruptedException {
System.out.println("Task Start - " + message);
Thread.sleep(3000);
return new AsyncResult<>("jayon-" + message);
}
}
@RequiredArgsConstructor
@RestController
public class MessageController {
private final MessageService messageService;
@GetMapping("/messages")
@ResponseStatus(code = HttpStatus.OK)
public void printMessage() throws InterruptedException {
for (int i = 1; i <= 5; i++) {
ListenableFuture listenableFuture = messageService.print(i + "");
listenableFuture.addCallback(System.out::println, error -> System.out.println(error.getMessage()));
}
}
}
// 実行結果
Task Start - 1
Task Start - 3
Task Start - 2
jayon-1
jayon-2
Task Start - 5
jayon-3
Task Start - 4
jayon-4
ListenableFuture は、コールバックを通じてノンブロッキング方式で作業を処理できます。addCallback() メソッドの最初の パラメータは作業完了コールバックメソッド、2 つ目のパラメータは作業失敗コールバックメソッドを定義すればOKです。ちなみに、スレッドプールの core スレッドを 3 つに設定したため、”Task Start” メッセージが最初に 3 つ表示されることがわかります。
[CompletableFuture]
ListenableFuture 単体でもノンブロッキングロジックを実装できますが、コールバックの中にコールバックが必要な場合、コールバック地獄と呼ばれる 非常に複雑なコードを発生させてしまいます。
もちろん、今回は CompletableFuture を詳しく解説することはありませんので、複雑なコールバックに対処するコードが気になる場合は、下記 ソースのリンクを参照してください。
@Service
public class MessageService {
@Async
public CompletableFuture print(String message) throws InterruptedException {
System.out.println("Task Start - " + message);
Thread.sleep(3000);
return new AsyncResult<>("jayon-" + message).completable();
}
}
@RequiredArgsConstructor
@RestController
public class MessageController {
private final MessageService messageService;
@GetMapping("/messages")
@ResponseStatus(code = HttpStatus.OK)
public void printMessage() throws InterruptedException {
for (int i = 1; i <= 5; i++) {
CompletableFuture completableFuture = messageService.print(i + "");
completableFuture
.thenAccept(System.out::println)
.exceptionally(error -> {
System.out.println(error.getMessage());
return null;
});
}
}
ListenableFuture のコールバック定義よりも可読性が向上し、ノンブロッキング機能を完璧に実行します。そのため、@Async を 使用する際に、リターン値が必要な場合は、CompletableFuture を使用することをお勧めします。
@Async のメリット
開発者は、メソッドを同期方式で作成し、非同期方式が必要になった場合は、単に @Async アノテーションをメソッドの上に付ければOKです。 そのため、同期、非同期についてメンテナンス性の良いコードを作成できます。
@Async の注意点
@Async 機能を使用するには、@EnableAsync アノテーションを宣言する必要がありますが、この時、別途設定しないとプロキシ モードで動作します。つまり、@Async アノテーションで動作する非同期メソッドはすべて、Spring AOP の制約をそのまま受け継ぐことになります。 詳しい理由はこの投稿を参照してください。
- private メソッドに @Async を付けても、AOP は動作しません。
- 同じオブジェクト内のメソッド間で呼び出しても、AOP は動作しません。