目次
メッセージミドルウェア概要
同期通信と非同期通信の比較
結合の緩和:
非同期による結合緩和
トラフィックスムージング:
従来のHTTPリクエストの課題:
具体例:
コードデモンストレーション:
解決策:
1.マルチスレッドによるビジネスロジック処理(非同期操作の実現):
2.MQによるビジネスロジック処理(非同期操作の実現):
MQの二つの実装バージョン:
1.ネットワークなしでのMQ実装:マルチスレッドによるプロデューサーとコンシューマーの作成
2.ネットワーク通信ベースのMQ実装:Nettyを使用
MQとマルチスレッドの違い:
MQメッセージミドルウェアの専門用語
メッセージミドルウェア
メッセージミドルウェアはキューモデルに基づいて非同期/同期データ転送を実現します。
役割:高並発対応、非同期結合緩和、トラフィックスムージング、結合度の低減を実現できます。
同期通信と非同期通信
通常、データ送信(生産)と消費(処理)の方法を指します。
結合の緩和:
結合の緩和とは、本来密接に関連または相互依存しているコンポーネント、機能、システムを分離し、それらが他の部分に影響を与えずに独立して実行、変更、拡張できるようにすることを意味します。
プロセスまたはスレッドにおいて、結合の緩和により、複数ステップの操作を逐次実行するのではなく並列実行でき、効率が向上します。例えば、ダウンロードと解析のシナリオでは、ダウンロード関数と解析関数をプロセスプールまたはスレッドプールで並列実行し、ダウンロードが完了したものから解析することで結合を緩和できます。
非同期による結合緩和
並列プログラミングまたはシステム設計において、非同期処理の方式を通じてシステムコンポーネントまたは機能間の結合を緩和し、それらが並列実行、独立した拡張ができ、相互にブロックまたは依存しないようにすることと理解できます。
トラフィックスムージング:
トラフィックスムージングとは、一時的なリクエストピークを技術的手法で削減し、システムのスループットがピーク時のリクエスト下でも制御可能な状態を保つことを指します。
従来のHTTPリクエストには多くの欠点があります:
1.高並発状況下で、大量のリクエストがサーバー側に送られ、サーバー側のリクエスト処理が蓄積します。
2.Tomcatサーバーは各リクエストを独立したスレッドで処理し、最大スレッド数を超えるとそのリクエストをキューにキャッシュします。リクエストが過剰に蓄積すると、Tomcatサーバーがクラッシュする可能性があります。
そのため、通常はnginxの入口でリクエスト制限を実装し、サービス保護フレームワークを統合します。
3.HTTPリクエストのビジネスロジック処理は比較的時間がかかり、クライアントが待機し続ける原因となり、ブロック待機中にクライアントがタイムアウトして再送信し、べき等性の問題を引き起こす可能性があります。
注意事項:インターフェースがHTTPプロトコルの場合、時間のかかるビジネスロジックの処理は避けるべきです。時間のかかるビジネスロジックは、マルチスレッドまたはMQに任せるべきです。
具体例:
クライアントがリクエストをサーバー側に送信し、サーバー側で会員登録のビジネスロジックを実装します。
1.insertMember() -- 会員データの挿入 1秒
2.sendSms() -- ログインSMS通知の送信 3秒
3.sendCoupons() -- 新人クーポンの送信 3秒
合計で7秒の応答時間が必要となり、クライアントが7秒間ブロックされる可能性があり、ユーザーエクスペリエンスに悪影響を与えます。
コードデモンストレーション:
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MemberController {
@RequestMapping("/registerMember")
public String registerMember(){
//1.データベースにデータを挿入
System.out.println(">データ挿入開始<");
sendNotification();
System.out.println(">通知処理完了<");
return "ユーザー登録完了!!";
}
private String sendNotification(){
System.out.println(">通知処理開始<");
try{
System.out.println(">通知を送信中<");
Thread.sleep(3000);
}catch(Exception e){
e.printStackTrace();
}
System.out.println(">通知処理完了<");
return "通知送信完了!";
}
}
解決策:
マルチスレッドとMQによる非同期実現
小規模プロジェクトではマルチスレッドで非同期を実現できますが、大規模プロジェクトではMQを使用する必要があります。なぜなら、マルチスレッドは高並発時にCPUに負荷をかけるためです。
1.マルチスレッドによるビジネスロジック処理(非同期操作の実現):
ユーザーがデータベースにデータを挿入した後、別のスレッドを開いてSMSとクーポン操作を非同期で送信します。クライアントは1秒待つだけで済みます。
利点:小規模プロジェクトに適し、非同期を実現できます。
欠点:サーバーのCPUリソースを消費する可能性があります。
コードデモンストレーション:
非同期クラス:
メイン設定クラスにアノテーションを追加:
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
//非同期クラスで通知メソッドを実装
@Component
public class NotificationService
{
@Async
public String sendNotification(){
System.out.println(">通知処理開始<");
System.out.println(">通知を送信中<");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(">通知処理完了<");
return "通知送信完了!!";
}
}
解説:@Asyncの役割
これはメソッドが非同期であることを宣言するために使用されます。
@Asyncアノテーションをメソッドに使用し、関連する非同期サポートを正しく構成すると、そのメソッドはメインスレッドで同期的に実行されるのではなく、Springのタススケジューラ(通常はスレッドプール)によって非同期的に実行されます。
非同期サポートを構成するには、設定クラスに@EnableAsyncアノテーションを追加するか、XML設定にを追加することで実現します。
2.MQによるビジネスロジック処理(非同期操作の実現):
プロデューサーがデータベースにデータを保存すると、メッセージミドルウェアにmsgの通知メッセージを送信します。コンシューマーが存在する場合、メッセージミドルウェアは最初にメッセージキューからメッセージをプルします。その後、コンシューマーが存在する限り、サーバー(メッセージミドルウェア)はメッセージをコンシューマー側にプッシュします。
MQの二つの実装バージョン:
1.ネットワークなしでのMQ実装:マルチスレッドによるプロデューサーとコンシューマーの作成
コード実装:
package com.example.async;
import org.json.JSONObject;
import java.util.concurrent.LinkedBlockingDeque;
/**
* ネットワークなしでのMQ実装
* マルチスレッドを使用してプロデューサーとコンシューマーを作成
*/
public class SimpleThreadMQ {
//MQサーバー メッセージキューの初期化
private static LinkedBlockingDeque<JSONObject> messageQueue = new LinkedBlockingDeque<>();
//メイン関数 プログラムのエントリーポイント
public static void main(String[] args) {
//プロデューサー 生成スレッド
Thread producerThread = new Thread(() -> {
try {
while(true){
Thread.sleep(1000);
JSONObject userData = new JSONObject();
userData.put("userId", "67890");
//メッセージキューに保存
messageQueue.offer(userData);
}
} catch (Exception e) {
e.printStackTrace();
}
}, "プロデューサー");
producerThread.start();
//コンシューマー 消費スレッド
Thread consumerThread = new Thread(() -> {
try {
while(true){
JSONObject userData = messageQueue.poll();
if(userData != null){
System.out.println(Thread.currentThread().getName() + ",データ取得:" + userData);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}, "コンシューマー");
consumerThread.start();
}
}
2.ネットワーク通信ベースのMQ実装:Nettyを使用
説明:
コンシューマーのNettyクライアントはNettyサーバー側のMQサーバーとの間に長期接続を維持し、MQサーバーはコンシューマーの接続を保持します。
プロデューサーのNettyクライアントはリクエストをNettyサーバー側のMQサーバーに送信し、MQサーバーはそのメッセージ内容をコンシューマーに送信します。
プロデューサーがメッセージをMQサーバーに送信すると、MQサーバーはそのメッセージをキャッシュする必要があります。
MQサーバーがダウンした後、メッセージが失われないことをどのように保証しますか?
答え:永続化メカニズムにより、ディスクに保存されます。
MQがプロデューサーからメッセージを受信したが、コンシューマーが存在しない場合、そのメッセージは失われますか?
答え:失われません。なぜなら、メッセージ確認メカニズムがあり、コンシューマーがメッセージを正常に消費した後、MQサーバーにそのメッセージを削除することを通知する必要があるからです。
(MQとコンシューマー間の情報伝達:まずプルし、長期接続を確立し、その後プッシュする)
複数のコンシューマーが、トピックがない場合(または同じトピック内で)同じメッセージを消費しますか?
答え:しません。各コンシューマーが消費すると、MQに「消費完了、削除可」と通知します。メッセージ確認メカニズムにより、重複消費を回避します。同じグループのコンシューマー間では重複消費が発生しません。
MQが高並発をどのように処理しますか?
答え:MQコンシューマーは自身の能力に基づいて、MQサーバーからメッセージを消費します。デフォルトでは1つのメッセージを取得します。
欠点:1.遅延の問題がある 2.MQコンシューマーの速度向上を考慮する必要がある
コンシューマーの速度をどのように向上させますか?
答え:コンシューマーのクラスター化、コンシューマーによるメッセージのバッチ取得を実行します。
MQとマルチスレッドの違い:
MQは非同期/結合緩和/トラフィックスムージングの問題を実現できます。
マルチスレッドも非同期を実現できますが、CPUリソースを消費し、結合を緩和していません。
MQメッセージミドルウェアの専門用語
Producer プロデューサー:メッセージをMQサーバーに送信します。
Consumer コンシューマー:MQサーバーからメッセージを取得してビジネスロジックを処理します。
Broker MQサーバー:
Topic トピック:ビジネスロジックの分類(SMS送信トピック、クーポン送信トピックなど)
Queue メッセージモデルのキュー:FIFO(先入れ先出し)原則 配列/連結リスト
Message プロデューサーが送信するメッセージ本文:JSON例: body:{"msg":{"userId":"654321","age":"30"},"type":"producer","topic":"notification"}