오늘은 간단하게 Kafka와 Websocket을 이용하여 멀티서버에 대응할 수 있는 구조를 만들어 보려고한다.
심플하게 구동이 잘 되는지 용도이기때문에, 보안관련 Interceptor던, Kafka를 정밀하게 사용하는 코드는 없다...!
먼저 Zookeeper와 Kafka가 켜져있어야 app을 실행시킬때 오류가 나지 않는다...
궁금한것들은 하나씩 찾아보기로 하고... 우선 샘플 코드를 실행시키는데에 집중해보자..!
우선 websocket 서버를 간단하게 구축하고...
1). webSocketConfig
websocket을 사용할때 필요한 설정들을 넣어주자..!
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private CustomWebSocketHandler customWebSocketHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(customWebSocketHandler, "/ws/chat")
.setAllowedOriginPatterns("*")
.withSockJS();
// .addInterceptors(new WebSocketHandShakeInterceptor())
}
}
2). SocketHandler
Websocket을 설명하는 내용은아니니.. 자세한 설명은 생략한다.
@Slf4j
@Component
public class CustomWebSocketHandler extends TextWebSocketHandler {
HashMap<String, WebSocketSession> sessionMap = new HashMap<>(); //웹소켓 세션을 담아둘 맵
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
//메시지 발송
String msg = message.getPayload();
System.out.println("=====================handleTextMessage :"+msg+"=====================");
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
//소켓 연결
System.out.println("=====================Connection Socket Success=====================");
sessionMap.put(session.getId(), session);
log.info("@@@@"+sessionMap);
super.afterConnectionEstablished(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
System.out.println("=====================Disconnection Socket Success=====================");
//소켓 종료
sessionMap.remove(session.getId());
super.afterConnectionClosed(session, status);
}
private void sendMessage(WebSocketSession session, String payload){
log.info("@@@@@@SendMessage");
TextMessage message = new TextMessage(payload);
try {
session.sendMessage(message);
}catch (IOException ie){
ie.printStackTrace();
}
}
/**
* @desc 전체 세션에 메세지 보내기.
* @param payload
*/
public void sendMessageToAll(String payload) {
sessionMap.forEach((s, webSocketSession) -> this.sendMessage(webSocketSession,payload));
}
}
Kafka 설정
1). dependency 설정
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
2). application.properties 설정
#Kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id= consumerGroup1
# 데이터를 어디까지 읽었는지 offset을 주기적으로 저장할지 여부
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
#Serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Consumer가 한번에 가져오는 message수
spring.kafka.consumer.max-poll-records=1000
#기본 설정 topic
spring.kafka.template.default-topic=kafkaTest
3). ProducerConfig 작성
Producer는 별도로 KafkaTemplate에 ProducerFactory를 적용해서 사용했다.
방법은 여러가지로 많은것 같은데, 차차 하나씩 알아가면되겠지!
우선 producerConfig에 Serializer를 다 설정시켜준다.
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServer;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
4). Producer 생성
Kafka Template에 담아놓은 설정으로 Producer를 생성해서 사용한다고 보면된다.
kafkaTemplate.send로 kafka에 이벤트를 발행한다.
@Slf4j
@Component
public class Producer {
private final KafkaTemplate<String, String> kafkaTemplate;
public Producer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String payload) {
log.info("Producer TOPIC : " + topic);
log.info("Producer PAYLOAD : " + payload);
ListenableFuture<SendResult<String, String>> listenable = kafkaTemplate.send(topic, payload);
}
}
5). Consumer 생성
여러가지 설정이 있는데, 하나씩 알아보는건 다음시간에....
우선 기본으로 설정해놓은 Topic을 기준으로 Consumer를 만들었다.
Consumer가 이벤트를 감지해서 가져온후 가져온 이벤트를 소켓으로 발송하는것까지 테스트하기위해 SocketHandler를 가져왔다.
@Slf4j
@Component
public class Consumer {
private KafkaConsumer<String, String> kafkaConsumer = null;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServer;
@Value("${spring.kafka.consumer.group-id}")
private String groupID;
@Value("${spring.kafka.consumer.value-deserializer}")
private String keyDeSerializer;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeSerializer;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offsetReset;
@Value("${spring.kafka.template.default-topic}")
private String topicName;
@Value("${spring.kafka.consumer.max-poll-records}")
private String maxPollRecords;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
@Autowired
private CustomWebSocketHandler customWebSocketHandler;
@PostConstruct
public void build() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeSerializer);
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeSerializer);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
kafkaConsumer = new KafkaConsumer<>(properties);
}
/**
* @desc 로직 구현
* @param headers
* @param payload
*/
@KafkaListener(topics="${spring.kafka.template.default-topic}")
public void consume(@Headers MessageHeaders headers, @Payload String payload) {
log.info("CONSUME HEADERS : " + headers.toString());
log.info("CONSUME PAYLOAD : " + payload);
customWebSocketHandler.sendMessageToAll(payload);
}
}
이렇게 하면 테스트 준비는 끝났다.
크롬 플러그인 중에 Simple WebSocket Client가 있는데, 해당 플러그인으로 소켓 테스트를 진행하였다.
우선 app을 먼저 실행 시켜서 Wesocket 서버에 Simple Websocket client가 붙을 수 있도록 만들어준다.
App이 올라간 뒤 Client를 통해 Socket에 접속한다.
Open을 클릭하여 서버에 접속시킨후
Producer로 이벤트를 발행해보자...!
class ProducerTest {
@Autowired
Producer producer;
@Test
void test() {
producer.sendMessage("kafkaTest","오늘은 Kafka 샘플 코드를 작성해봅시다.");
}
}
Producer를 사용해서 Default Topic인 KafkaTest에 "오늘은 Kafka 샘플 코드를 작성해봅시다." 이벤트를 발행해보자!!
결과 !!!
내가 발행한 이벤트 메세지를 소켓으로 발송하여 Client에서 확인 할 수 있다.
순서는 Producer 이벤트 발행 -> Consumer 이벤트 소비 -> Websocket 메세지 발송! -> Websocket Client 메세지 확인 이다.
원하는데로 잘 되는것을 확인 할 수 있다.
간단하게 막 적은 샘플 코드에 설명인데...
시간내서 조금 더 자세하게 수정 할 예정이다...!!!!
Good Luck!
'Kafka' 카테고리의 다른 글
[Kafka] Windows에 Kafka 설치 및 간단한 예제 (0) | 2022.06.23 |
---|---|
[Kafka] Kafka란? Kafka의 기본 개념 (0) | 2022.06.21 |
[Kafka] 설치시 오류 대응 (0) | 2022.05.21 |