Vert.x

[Vert.x] Vert.x의 EventBus 사용하기

Jeong Jeon
반응형

Vert.x는 기본적으로 Single Thread이지만 내부적으로는 Multi Thread로 구동된다.

Verticle 간에는 메세지만 주고 받을 수 있고, 메모리는 공유하지 않아 다른 Verticle에 접근할 수 없다고 보면 된다.

 

그래서 Verticle 1에서 Verticle 2의 기능을 쓰고싶다면, Vert.x 가 지원하는 EventBus를 사용하여 원하는 Verticle에 접근할 수 있다

 

필자는 Vert.x를 사용하여 Socket Client를 구현해야 했는데,

Deploy되는  Verticle의 숫자를 늘리지 않고, handler Verticle을 별도로 만들어 handler Verticle을 Worker Verticle로 만들었다.

Client Connection은 하난데, 내부적으로 요청이 들어왔을때 Main Verticle이 handlerVerticle(worker)에게 해야할일을 위임시킨다. 이로써 얻게 되는 이점은 Main Verticle은 원래 자기가 해야할 일을 다른사람한테 맡겨두고,

자기는 다시 요청을 받을 준비를 할 수 있다.

 

순서는 Main Verticle이 socket Message를 받았으면 -> Event Bus를 사용하여 HandlerVerticle(worker)에게 일을 넘긴다.

-> HandlerVerticle은 Main Vertilce이 요청한 일을 수행한다.

 

HandlerVerticle을 worker로 만든이유는 동시에 MainVerticle에서 위임받은 일을 할수있게 하기 위해 사용했다..

 

코드를 한번 보자.

 

1).Main Verticle

@Verticle(instance = 1, isWorker = false, poolSize = 100)
public class SocketVerticle extends AbstractVerticle {

	private NetClient client;
	private NetSocket socket;

	@Override
	public void start() throws Exception {
		doConnection();
	}

	@Override
	public void stop() throws Exception {
		System.out.println("[ daemon process " + Thread.currentThread().getName() + " is daed now : " + new Date() + "]");
		// TODO Auto-generated method stub
		super.stop();
	}

	/**
	 * Socket 연결 메서드
	 */
	public void doConnection() {
		System.out.println("[ Trying to Connection ]");
		NetClientOptions options = new NetClientOptions();
		options.setTcpNoDelay(true);
		options.setTcpKeepAlive(true);
		// 재연결 시도할 횟수
//		options.setReconnectAttempts(10);
		options.setReconnectInterval(500);

		// client 해제 후 재연결
		if (client != null) {
			client.close();
		}
		if (socket != null) {
			socket.close();
		}

		client = vertx.createNetClient(options);
		client.connect(port, host, asyncResult -> {
			// isConnected
			if (asyncResult.succeeded()) {
				socket = asyncResult.result();
				System.out.println("[ socket is connected ]");
				socket.handler(recvBuffer -> {
					System.out.println("socket -> daemon : " + new Date().getTime() + " : " + recvBuffer);
					
					try {
						// HandlerVerticle로 위임
						EventBus eb = vertx.eventBus();
						eb.request("RtoA", recvBuffer, reply -> {
                        // 위임한 로직을 수행하고 reply받은 메세지를 socket으로 다시 전송해줬다.
							socket.write(reply.result().body().toString());
						});
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				});

				socket.closeHandler(closerst -> {
					System.out.println("[ socket is closed ]");
					doConnection();
				});
			} else {
				// fail
				System.out.println("[ connectFail ]");
				// 뭔가 하면 됨 재접속 한다 던지 등
				doConnection();
			}
		});
	}

}

 

 

2). HandlerVerticle

@Verticle(instance = 1, isWorker = true, poolSize = 100)
public class SocketHandlerVerticle  extends AbstractVerticle{

	@Override
	public void start() throws Exception {
		EventBus eb = vertx.eventBus();
		eb.consumer("RtoA", message -> {
		    //로직
            ...
            
            //로직후 결과 reply 
			message.reply(reply);
		});
	}
}

 

만약 handlerVerticle에서 로직을 수행하고 reply 해줄 메세지가 없다면

eventBus의 send();를 사용하면 된다.

본인은 받은 결과를 다시 Socket으로 쏴주기 위해 request();를 사용하였다.

 

엄청 자세하게 파보고싶은데... 현실이 받쳐주질 않는다 ㅠㅠ

 

설명중 잘못된 내용이 있을수 있으니 수정가능 성 有

반응형

'Vert.x' 카테고리의 다른 글

[Vert.x] Vertx Websocket Client 만들기  (0) 2021.07.26
[Vert.x] Vert.x란? Vert.x 개념 잡기  (0) 2021.04.26