Spring/Issue

STOMP와 RabbitMQ를 이용한 채팅 서비스

블로그 주인장 2024. 1. 30.

이전 포스팅에서 STOMP를 이용한 채팅서비스를 개발했었다.

 

https://miiro-under.tistory.com/273

 

Stomp를 활용한 웹소켓 구현

서론 Stomp를 이용하여 웹소켓을 구현하는 방법에 대해 알아보겠습니다. 이전 포스팅에서 Stomp에 관한 개념에 대해 설명했었습니다. https://miiro-under.tistory.com/272 Message Broker STOMP 개념 서론 Message를

miiro-under.tistory.com

 

여기서 간단히 정리를 하자면

 

STOMP 는 publish/subscribe(발행/구독) 구조로 간단하게 메시지를 선택해서 수신을 할 수 있다.

 

기본적으로 스프링에서는 내장 브로커를 제공하지만, 여러 문제점이 있다.

1. In Memory 형식으로 데이터의 유실 위험이 있다.

2. Spring boot 서버 내에서 함께 처리하기 때문에 서버의 부담도 커진다.

3. Message Queue를 모니터링하기 어렵다.

4. 추후, 서버를 여러 개를 사용한다면 메시지를 함께 처리할 수 없기에 확장성이 떨어진다는 점이다.

 

이를 기반으로 외부 메시지 브로커인 RabbitMQ를 이용해보려고 한다.

 

설치 방법

Windows에 RabbitMQ를 설치하는 방법은 해당 링크를 통해 확인하면 된다.

그 후 STOMP 관련 기능 설치는  공식사이트 링크 에서 제공하는 방법을 따라 진행하면 된다.

 

RabbitMQ 메시지 전달

  1. 송신자가 메시지를 보내면 브로커가 처리과정을 위임받는다.
  2. 일종의 우체통 역할을 하는 exchange로 먼저 전달해서 메시지를 분류한다.
  3. Topic은 라우팅 키를 패턴으로 검사하고 전달한다.
  4. exchange와 MessageQueue는 라우팅 키, 혹은 패턴으로 바인딩 되어있기에 일치하는 큐로 메시지를 전달한다.
  5. 수신자는 queue에서 메시지를 가져온다.

 

코드 구현

 

의존성(Dependencies) 추가

//RabbitMQ
implementation 'org.springframework.boot:spring-boot-starter-reactor-netty'
implementation 'org.springframework.boot:spring-boot-starter-amqp'

 

 

WebSocketConfig 수정

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
  registry.enableStompBrokerRelay("/exchange")
      .setRelayHost("localhost")
      .setRelayPort(61613)
      .setClientLogin("guest")
      .setClientPasscode("guest")
      .setSystemLogin("guest")
      .setSystemPasscode("guest");

  registry.setApplicationDestinationPrefixes("/app");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
  registry.addEndpoint("/ws")
      .setAllowedOriginPatterns("*");
}

 

RabbitMQ는 기본적은 Queue.exchange의 이름이나 라우팅 키, 패턴 등을 작성할 때  '.' 을 구분자로 사용한다.

여기서 Port 61613은 RabbitMQ의 기본 포트라고 보면 된다.

 

그리고 Client, System의 로그인, 로그아웃 내용은 초기 상태인 "guest"로 설정하면 된다.

 

아래 코드는 기존 STOMP 사용 시에 적용했던 설정이다.

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
  //클라이언트에게 메시지 전달
  registry.enableSimpleBroker("/topic");
  //클라이언트의 Send 요청 처리
  registry.setApplicationDestinationPrefixes("/app");
}

 

여기서 기존 STOMP 설정과 다른 부분은 enableStompBrokerRelay 이다.

기존 내장 메모리를 사용할 경우에는 SimpleBroker를 사용했지만

외부 메시지 브로커를 사용할 때는 StompBrokerRelay를 사용한다는 것이 다르다.

 

 

ChattingConstant

  • RabbitMQ를 사용할 때 필요한 상수(Constant)를 따로 클래스화 시킨 것이다.
public class ChattingConstant {
  public static final String CHAT_QUEUE_NAME = "chat.queue";
  public static final String CHAT_EXCHANGE_NAME = "chat.exchange";
  public static final String ROUTING_KEY = "room.*";
}

 

 

RabbitConfig

@EnableRabbit
@Configuration
@RequiredArgsConstructor
public class RabbitConfig {
  /**
   * Queue 등록
   */
  @Bean
  public Queue queue() {
    return new Queue(ChattingConstant.CHAT_QUEUE_NAME, true);
  }

  /**
   * exchange 등록
   */
  @Bean
  public TopicExchange topicExchange() {
    return new TopicExchange(ChattingConstant.CHAT_EXCHANGE_NAME, true, false);
  }

  /**
   * Exchange와 queue 바인딩
   */
  @Bean
  public Binding binding() {
    return BindingBuilder.bind(queue())
        .to(topicExchange())
        .with(ChattingConstant.ROUTING_KEY);
  }

  /**
   * RabbitMQ 연결을 관리하는 클래스
   */
  @Bean
  public ConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("guest");
    return factory;
  }

  /**
   * MessageConverter 커스터마이징 하기 위한 Bean 등록
   */
  @Bean
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(jsonMessageConverter());
    rabbitTemplate.setRoutingKey(ChattingConstant.ROUTING_KEY);
    return rabbitTemplate;
  }

  /**
   * LocalDateTime Serializable 설정 컨버팅
   */
  @Bean
  public MessageConverter jsonMessageConverter() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true);
    objectMapper.registerModule(new JavaTimeModule());
    return new Jackson2JsonMessageConverter(objectMapper);
  }

  /**
   * RabbitMQ MessageListener 설정
   */
  @Bean
  SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setMessageConverter(jsonMessageConverter());
    return factory;
  }

}

 

@EnableRabbit 어노테이션을 사용하여 RabbitMQ를 사용할 수 있는 상태로 구현한다.

 

해당 Config에서 주요적으로 살펴볼 내용은 connectionFactory() 메서드이다.

factory.setHost("localhost") 는 현재 실행 중인 로컬 시스템에 RabbitMQ 서버에 연결하려는 것을 의미한다.

factory.setVirtualHost("/")는 가상 호스트(Virtual Host)를 설정합니다.

 

RabbitMQ는 가상 호스트를 사용하여 여러 개의 독립적인 브로커 환경을 구성할 수 있습니다.

각 가상 호스트는 독립적인 메시지 브로커로 간주되며 queue, exchange, binding, 권한 등을 해당 가상 호스트 내에서 관리하게 됩니다.

 

 

컨트롤러 설정

public class ChatMessageController {

  private final RabbitTemplate rabbitTemplate;
  private final ChatMessageService chatMessageService;

  @MessageMapping("/chat/message/{roomId}")
  public void sendMessage(@Payload ChatMessageDto chatDto, @DestinationVariable("roomId") Long roomId) {
    rabbitTemplate.convertAndSend(CHAT_EXCHANGE_NAME, "room." + roomId,
        chatMessageService.chatMessage(chatDto, roomId));
  }

  //기본적으로 chat.queue가 exchange에 바인딩 되어있기 때문에 모든 메시지 처리
  @RabbitListener(queues = ChattingConstant.CHAT_QUEUE_NAME)
  public void receive(ChatMessageDto response) {
    log.info("ChatDto.getMessage() : {}", response.getMessage());
  }

}

 

RabbitConfig에서 미리 chat.queue를 만들어두고 room.* 을 라우팅 키로 사용하여 exchange에 연결해놓은 상태이다.

기존 Stomp와의 달리 '/' 를 사용하지 않고 '.' 을 이용하여 exchange 경로 + 라우팅 키 + payLoad 형식으로 사용한다.

또한, SimpMessagingTemplate -> RabbitTemplate을 이용하여 메시지를 컨버팅하여 발송한다.

 

 

여기서 @RabbitListener(queues = CHAT_QUEUE_NAME) 어노테이션은

'chat queue' 라는 Queue를 구독해 해당 큐에 들어온 메시지를 소비하는 소비자가 되겠다는 어노테이션이다.

이로 인해, exchange로 들어오는 모든 채팅방의 메시지를 receive()를 통해서 처리할 수 있다.

 

 

Test 진행하기

브라우저에 http://localhost:15672 를 검색하면 RabbitMQ 홈페이지를 확인할 수 있다.

기본적으로 ID와 Password는 guest 이다.

 

 

apic 을 이용해서 메시지를 발송해보겠습니다.

 

apic을 이용해서 메시지를 전달한 상태입니다.

message와 memberId를 Json 형태로 발송한 상태이고, 이를 바탕으로 프로그램 상 DTO 변환 시킨 내용이 나오는 것을 확인할 수 있습니다.

 

 

아래 로그를 통해 receive() 를 통해 처리된 내용의 메시지를 확인할 수 있다.

 

 

RabbitMQ의 Exchange 페이지의 일부를 첨부한 내용입니다.

위에서 메시지를 발송할 때 room.1 을 라우팅키로 exchange에 보낸 내용을 확인할 수 있습니다.

 

 

RabbitMQ의 Queues 페이지의 일부를 첨부한 내용입니다.

 

위의 이미지인 exchange 에서 나온 stomp-subscription 내용의 queue를 확인할 수 있다.

이를 통해 queue에 push된 메시지들은 해당 queue를 구독하고 있는 클라이언트(Consumer)들에게 소비된다.

 

마무리

채팅 서버를 구현하기 위해서 WebSocket + STOMP + RabbitMQ를 적용했습니다. 

이번 기회를 통해서 외부 메시지 브로커인 RabbitMQ에 대해 동작하는 방법이나, 설정 등에 대해

조금이나마 공부할 수 있었던 거 같다.

외부 메시지 브로커를 통해 채팅방에서 예외 발생으로 인한 메시지 유실이나, 대용량 트래픽 발생 시에

이점이 있다는 것을 알게 되었습니다.

 

 

References

https://dev-gorany.tistory.com/325

 

https://velog.io/@junsu1222/STOMP%EC%97%90-RabbitMQ%EB%A5%BC-%EC%B6%94%EA%B0%80%ED%95%B4%EB%B3%B4%EC%95%98%EB%8B%A4#stompwepsocketconfig

 

https://velog.io/@yyong3519/WebSocket-RabbitMQ

 

https://devsungwon.tistory.com/entry/Spring-%EC%86%8C%EC%BC%93%ED%86%B5%EC%8B%A0-%EC%9D%B4%EC%9A%A9%ED%95%9C-%EC%B1%84%ED%8C%85-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0-3-RabbitMQ-%EC%97%B0%EB%8F%99

 

 

 

본 포스트는 작성자가 공부한 내용을 바탕으로 작성한 글입니다.
잘못된 내용이 있을 시 언제든 댓글로 피드백 부탁드리겠습니다.
항상 정확한 내용을 포스팅하도록 노력하겠습니다.

반응형

댓글