서론
NDC 2.2.0에서는 NEB 서비스가 추가되면서 NEB와의 연동을 메인으로 개발하게 되었습니다. 연동 기능 중 사용자에 대한 동기화가 필요하여 많은 논의를 통해 Message Queue를 활용하는 방향으로 결정됩니다.
NDC에서 사용하지 않던 모듈 이었으므로 자료 조사를 통한 결과와 기본적인 사용 방법에 대해 설명합니다.
짧은 시간 조사하고 적용하였고 실제로 기능에 필요했던 것도 아주 기본적인 부분이라, 부족하지만 메시지 큐를 처음 접하는 누군가에게는 도움이 되는 내용이었으면 합니다.
관련 기술 조사
Kafka
•
높은 처리량을 요구하는 실시간 데이터 피드 처리나 대기 시간이 짧은 플랫폼을 제공하는 것을 목표로 하며 TCP 기반 프로토콜을 사용합니다.
•
클러스터를 중심으로 Producer와 Consumer가 데이터를 Push하고 Pull하는 구조를 가집니다.
ActiveMQ(JMS)
•
MOM을 지원하는 표준 API입니다. ActiveMQ의 JMS 라이브러리를 사용한 자바 Application들끼리 통신이 가능합니다.
•
효율적이고 사용하기 쉬운 오픈소스 입니다.
•
다른 자바 Application(non ActiveMQ)의 JMS와는 통신할 수 없습니다.
RabbitMQ
•
AMQP(Advanced MEssage Queuing Protocol)를 구현한 오픈소스 메시지 브로커이다.
•
프로토콜만 맞으면 다른 AMQP를 사용한 Application끼리 통신이 가능하다.
•
플러그인을 통해서 SMTP, STOMP 프로토콜과의 확장이 가능하다.
•
Exchange가 Producer로부터 메시지를 받고 Queue에 전달한다. Queue는 Consumer에게 메시지를 전달한다.
•
NDC 2.2 버전의 연동할 NEB에서 이미 사용하고 있으며, Spring 프레임워크에서 적용하기 원활하다고 판단하여 RabbitMQ 적용을 하기로 정했습니다.
적용 방법
•
전체 구조 예시
타입 | 서비스 |
메시지 브로커 | RabbitMQ 브로커 |
Producer | NDC (ndc-portal-api) |
Consumer | NEB |
•
Broker
◦
Producer로부터 메시지를 전달 받고 Routing Key에 해당하는 Queue에 바인딩하고 Queue는 Consumer에게 전달하는 작업을 수행합니다.
◦
Consumer에서 작업을 정상적으로 처리하지 못할 경우 설정된 횟수만큼 재시도 하거나 Dead Letter Queue로 전달합니다.
◦
모든 메시지는 Queue로 직접 전달되지 않고 반드시 Exchange에서 먼저 받습니다. 그리고 Exchang 타입과 바인딩 규칙에 따라 적절한 Queue로 전달됩니다. NDC에서는 기본 Exchange 는 Topic 타입이며, Dead Letter 는 Fanout 타입을 사용합니다.
◦
서비스 별로 ndc.{serviceType} 이름으로 Queue 가 생성되고, Routing Key는 Queue의 이름과 동일합니다.
◦
Spring 적용 예시
# AMQP 및 jackson dependecy 추가
dependencies {
...
// rabbitmq
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'com.fasterxml.jackson.core:jackson-databind'
...
}
Plain Text
복사
# application.yml
spring:
rabbitmq:
host: {{ host }}
port: {{ port }}
username: {{ username }}
password: {{ password }}
template:
exchange: ndc.exchange
Plain Text
복사
# Configuration
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.template.exchange:ndc.exchange}")
private String NDC_EXCHANGE_NAME;
@Bean
TopicExchange ndcExchange() {
return ExchangeBuilder
.topicExchange(NDC_EXCHANGE_NAME)
.build();
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Plain Text
복사
•
Producer
◦
Spring에서 제공하는 RabbitTemplate를 통해 전달할 메시지를 컨버팅하여 메시지 브로커의 Exchange로 전달합니다.
◦
Spring 적용 예시
String serviceQueueName = RabbitMQNames.QUEUE_NAME_PREFIX + roleNameSplit[1];
QueueDto queueDto = QueueDto.builder()
.event(event)
.payload(payload)
.build();
rabbitTemplate.convertAndSend(serviceQueueName, queueDto);
Plain Text
복사
•
Consumer
◦
Spring에서 제공하는 @RabbitListener 어노테이션을 통해 메시지를 받을 Queue 정보를 바인딩하고, 에러 발생 시 이동 할 Dead Letter Queue 정보를 Arguments 로 추가할 수도 있습니다.
◦
Spring 적용 예시
@RabbitListener(queues = { RabbitMQNames.BATCH_TABLE_DEPARTMENT_QUEUE })
public void modifyDepartmentTable(final Message message) {
...
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = RabbitMQNames.INIT_DATA_RUNNER_QUEUE, durable = "true", autoDelete = "true",
arguments = {
@Argument(name = "x-queue-type", value = RabbitMQNames.TYPE_QUORUM),
@Argument(name = "x-dead-letter-exchange", value = RabbitMQNames.NDC_DEAD_LETTER_EXCHANGE),
@Argument(name = "x-dead-letter-routing-key", value = RabbitMQNames.INIT_DATA_RUNNER_QUEUE)
}
),
exchange = @Exchange(value = "${spring.rabbitmq.template.exchange:ndc.exchange}", type = ExchangeTypes.TOPIC),
key = RabbitMQNames.INIT_DATA_RUNNER_QUEUE)
)
public void initData(final Message message) {
...
}
Plain Text
복사
•
Retry 및 Dead Letter
◦
Consumer에서 메시지를 정상적으로 처리하지 못한 경우 아래 설정된 옵션에 따라 재시도 합니다.
◦
최대 처리 횟수를 초과한 경우 Dead Letter Queue로 이동합니다.
◦
Spring 적용 예시
# application.yml
spring:
rabbitmq:
...
listener:
simple:
retry:
enabled:true# retry 사용 여부
initial-interval: 3s# 처음 메세지 처리 실패 시 N초 후에 다시 해당 메세지를 처리
max-interval: 10s# 최대 N초 후에 실패한 메세지를 처리
max-attempts: 3# 최대 N 번까지 메세지 처리 시도. N을 넘어가면 해당 메세지는 dlx exchange 로 넘김
multiplier: 2# 동일 메세지에 대한 처리 시도 횟수가 증가 할수록 interval 시간에 N을 곱함. initial-interval * multiplier 계산되며 max-interval 값보다 높으면 max-interval 값을 사용
Plain Text
복사
# Configuration
@Configuration
@ConditionalOnProperty(value = "spring.rabbitmq.init", matchIfMissing = false, havingValue = "true")
public class RabbitMQConfig {
...
@Bean
FanoutExchange deadLetterExchange() {
return ExchangeBuilder
.fanoutExchange(RabbitMQNames.NDC_DEAD_LETTER_EXCHANGE)
.build();
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder
.durable(RabbitMQNames.DEAD_LETTER_QUEUE)
.quorum()
.build();
}
@Bean
Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue deadLetterQueue, @Qualifier("deadLetterExchange") FanoutExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange);
}
...
}
Plain Text
복사
•
User와 Permission
◦
Exchange 및 Queue의 권한을 정할 수 있습니다.
AMQP 0-9-1 Operation | configure | write | read | |
exchange.declare | (passive=false) | exchange | ||
exchange.declare | (passive=true) | |||
exchange.declare | exchange | exchange (AE) | exchange | |
exchange.delete | exchange | |||
queue.declare | (passive=false) | queue | ||
queue.declare | (passive=true) | |||
queue.declare | queue | exchange (DLX) | queue | |
queue.delete | queue | |||
exchange.bind | exchange (destination) | exchange (source) | ||
exchange.unbind | exchange (destination) | exchange (source) | ||
queue.bind | queue | exchange | ||
queue.unbind | queue | exchange | ||
basic.publish | exchange | |||
basic.get | queue | |||
basic.consume | queue | |||
queue.purge | queue |
적용 결과
RabbitMQ 관리 화면에서 Spring을 통해 설정된 Exchange, Queue, Message 등의 정보를 확인할 수 있습니다.
•
Exchange
◦
관리 화면 접속 > Exchanges > ndc.exchange 선택 시 아래와 같이 상세 정보를 보여줍니다. 해당 Exchange 에 바인딩 된 정보 목록 확인이 가능합니다.
•
Queue
◦
관리 화면 접속 > Queues 선택 시 아래와 같이 Queue 목록 및 현황에 대한 정보 확인이 가능합니다. 특정 Queue 를 선택하여 상세한 정보를 볼 수 있습니다.
•
Message
◦
Queue 상세 화면에서 Producer로부터 전달 받은 메시지 정보를 확인 할 수 있습니다.
결론
메시지 큐를 처음 접하면서 정말 많은 방법으로 많은 곳에 유용하게 활용할 수 있겠구나 생각 했습니다. 비록 지금은 아주 기본적인 부분만 사용하고 있지만, 조사 할수록 생각이 확장되어 이것저것 적용 해보고 싶은 욕심도 무럭무럭 자라 났습니다.
후속연구/차후계획
NEB 연동 외 NDC에서도 여러 기능에 활용할 수 있는 방안을 논의하고 추가 할 예정입니다.
본 기술블로그에 게재되는 모든 컨텐츠의 저작권은 케이티넥스알(kt NexR)에서 가지고 있으며, 동의 없는 컨텐츠 수정 및 무단 복제는 금하고 있습니다. 컨텐츠(글/사진/영상 등)를 공유하실 경우 반드시 출처를 밝혀주시기 바랍니다. Copyright(c) kt NexR, Inc. All Rights Reserved. |