RabbitMQを用いた長時間処理の進捗表示

システム概要

長時間処理の進捗をリアルタイム表示するシステムを実装します。フロントエンドからリクエスト受信後、バックグラウンドで処理を実行し、RabbitMQとWebSocketで進捗情報を送信します。

依存関係

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>

設定ファイル

spring.rabbitmq.host=rabbitmq-server
spring.rabbitmq.port=5672
server.port=8080

RabbitMQ設定

@Configuration
public class TaskQueueConfig {
    public static final String TASK_EXCHANGE = "task.progress.exchange";
    public static final String TASK_QUEUE = "task.progress.queue";
    
    @Bean
    public DirectExchange taskExchange() {
        return new DirectExchange(TASK_EXCHANGE);
    }
    
    @Bean
    public Queue progressQueue() {
        return new Queue(TASK_QUEUE);
    }
    
    @Bean
    public Binding taskBinding() {
        return BindingBuilder.bind(progressQueue())
               .to(taskExchange()).with("progress.key");
    }
}

WebSocket設定

@Configuration
@EnableWebSocketMessageBroker
public class SocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/progress-endpoint")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}

メッセージ処理

@Component
@RabbitListener(queues = TaskQueueConfig.TASK_QUEUE)
public class ProgressForwarder {
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    
    @RabbitHandler
    public void forwardProgress(String percentage) {
        messagingTemplate.convertAndSend("/topic/progress-updates", percentage);
    }
}

タスク処理コントローラ

@RestController
public class TaskController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    private final ExecutorService taskExecutor = 
        Executors.newFixedThreadPool(5);
    
    @PostMapping("/start-task")
    public ResponseEntity<String> startLongTask() {
        taskExecutor.execute(this::executeTask);
        return ResponseEntity.ok("処理を開始しました");
    }
    
    private void executeTask() {
        for (int i = 1; i <= 100; i++) {
            rabbitTemplate.convertAndSend(
                TaskQueueConfig.TASK_EXCHANGE,
                "progress.key",
                String.valueOf(i)
            );
            
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

フロントエンド実装

<script>
const socket = new SockJS('/progress-endpoint');
const client = Stomp.over(socket);

client.connect({}, () => {
    client.subscribe('/topic/progress-updates', (message) => {
        document.getElementById('progress').innerText = 
            `${message.body}% 完了`;
    });
});

function startTask() {
    fetch('/start-task', { method: 'POST' });
}
</script>

<button onclick="startTask()">処理開始</button>
<div id="progress">0% 完了</div>

タグ: RabbitMQ SpringBoot websocket 非同期処理 進捗管理

5月26日 11:31 投稿