diff --git a/src/main/java/org/codenil/comm/Communication.java b/src/main/java/dev/xiushen/andes/comm/Communication.java similarity index 88% rename from src/main/java/org/codenil/comm/Communication.java rename to src/main/java/dev/xiushen/andes/comm/Communication.java index 8db55a3..1443494 100644 --- a/src/main/java/org/codenil/comm/Communication.java +++ b/src/main/java/dev/xiushen/andes/comm/Communication.java @@ -1,21 +1,19 @@ -package org.codenil.comm; +package dev.xiushen.andes.comm; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; - -import org.codenil.comm.callback.ConnectCallback; -import org.codenil.comm.callback.DisconnectCallback; -import org.codenil.comm.callback.MessageCallback; -import org.codenil.comm.connections.ConnectionInitializer; -import org.codenil.comm.connections.PeerConnection; -import org.codenil.comm.connections.PeerConnectionEvents; -import org.codenil.comm.message.DisconnectReason; -import org.codenil.comm.message.RawMessage; +import dev.xiushen.andes.comm.callback.ConnectCallback; +import dev.xiushen.andes.comm.callback.DisconnectCallback; +import dev.xiushen.andes.comm.callback.MessageCallback; +import dev.xiushen.andes.comm.connections.ConnectionInitializer; +import dev.xiushen.andes.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnectionEvents; +import dev.xiushen.andes.comm.message.DisconnectReason; +import dev.xiushen.andes.comm.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -57,17 +55,17 @@ public class Communication { new IllegalStateException("Unable to start an already started " + getClass().getSimpleName())); } - //注册回调监听 + //注册连接成功回调 connectionEvents.subscribeConnect(this::dispatchConnect); //启动连接初始化 return connectionInitializer .start() .thenApply((socketAddress) -> { - logger.info("P2P RLPx agent started and listening on {}.", socketAddress); + logger.info("Communication started and listening on {}.", socketAddress); return socketAddress.getPort(); }) - .whenComplete((_, err) -> { + .whenComplete((integer, err) -> { if (err != null) { logger.error("Failed to start Communication. Check for port conflicts."); } @@ -179,14 +177,14 @@ public class Communication { /** * 收到消息后调用注册的回调 */ - private void dispatchMessage(final PeerConnection connection, final RawMessage message) { + private void dispatchMessage(final PeerConnection connection, final String message) { connectionEvents.dispatchMessage(connection, message); } /** * 收到指定code消息后调用指定回调 */ - private void dispatchMessageByCode(final int code, final PeerConnection connection, final RawMessage message) { + private void dispatchMessageByCode(final int code, final PeerConnection connection, final String message) { connectionEvents.dispatchMessageByCode(code, connection, message); } } diff --git a/src/main/java/org/codenil/comm/NetworkConfig.java b/src/main/java/dev/xiushen/andes/comm/NetworkConfig.java similarity index 92% rename from src/main/java/org/codenil/comm/NetworkConfig.java rename to src/main/java/dev/xiushen/andes/comm/NetworkConfig.java index 32f41ce..5a1d449 100644 --- a/src/main/java/org/codenil/comm/NetworkConfig.java +++ b/src/main/java/dev/xiushen/andes/comm/NetworkConfig.java @@ -1,4 +1,4 @@ -package org.codenil.comm; +package dev.xiushen.andes.comm; public class NetworkConfig { diff --git a/src/main/java/org/codenil/comm/NetworkService.java b/src/main/java/dev/xiushen/andes/comm/NetworkService.java similarity index 75% rename from src/main/java/org/codenil/comm/NetworkService.java rename to src/main/java/dev/xiushen/andes/comm/NetworkService.java index 983799e..aa56de8 100644 --- a/src/main/java/org/codenil/comm/NetworkService.java +++ b/src/main/java/dev/xiushen/andes/comm/NetworkService.java @@ -1,17 +1,22 @@ -package org.codenil.comm; +package dev.xiushen.andes.comm; -import org.codenil.comm.callback.ConnectCallback; -import org.codenil.comm.callback.DisconnectCallback; -import org.codenil.comm.callback.MessageCallback; -import org.codenil.comm.connections.*; -import org.codenil.comm.message.DisconnectReason; -import org.codenil.comm.message.Message; -import org.codenil.comm.netty.NettyConnectionInitializer; +import dev.xiushen.andes.comm.callback.ConnectCallback; +import dev.xiushen.andes.comm.callback.DisconnectCallback; +import dev.xiushen.andes.comm.callback.MessageCallback; +import dev.xiushen.andes.comm.connections.ConnectionInitializer; +import dev.xiushen.andes.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnectionEvents; +import dev.xiushen.andes.comm.message.DisconnectReason; +import dev.xiushen.andes.comm.message.Message; +import dev.xiushen.andes.comm.netty.NettyConnectionInitializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Clock; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -58,7 +63,7 @@ public class NetworkService { if (stopped.compareAndSet(false, true)) { logger.info("Stopping Network."); CompletableFuture stop = communication.stop(); - return stop.whenComplete((result, throwable) -> { + return stop.whenComplete((Void, throwable) -> { shutdown.countDown(); }); } else { @@ -77,8 +82,8 @@ public class NetworkService { /** * 断开连接 */ - public void disconnect(final String pkiId, final DisconnectReason reason) { - PeerConnection connection = aliveConnections.get(pkiId); + public void disconnect(final RemotePeer remotePeer, final DisconnectReason reason) { + PeerConnection connection = aliveConnections.get(remotePeer.identifier()); if(Objects.nonNull(connection)) { try { connection.disconnect(reason); @@ -90,12 +95,12 @@ public class NetworkService { /** * 发送消息 - * @param pkiId 远程节点的PKIID + * @param remotePeer 远程节点RemotePeer * @param message 发送的信息 */ - public void send(final String pkiId, final Message message) { + public void send(final RemotePeer remotePeer, final Message message) { try { - PeerConnection connection = aliveConnections.get(pkiId); + PeerConnection connection = aliveConnections.get(remotePeer.identifier()); if (Objects.nonNull(connection)) { connection.send(message); } @@ -124,8 +129,8 @@ public class NetworkService { communication.subscribeConnect(newConnection -> { synchronized (this) { callback.onConnect(newConnection); - aliveConnections.put(newConnection.pkiId(), newConnection); - deadConnections.remove(newConnection.pkiId(), newConnection); + aliveConnections.put(newConnection.remoteIdentifier(), newConnection); + deadConnections.remove(newConnection.remoteIdentifier(), newConnection); } }); } @@ -138,8 +143,8 @@ public class NetworkService { communication.subscribeDisconnect(connection -> { synchronized (this) { callback.onDisconnect(connection); - aliveConnections.remove(connection.pkiId(), connection); - deadConnections.put(connection.pkiId(), connection); + aliveConnections.remove(connection.remoteIdentifier(), connection); + deadConnections.put(connection.remoteIdentifier(), connection); } }); } @@ -161,5 +166,4 @@ public class NetworkService { public List aliveConnections() { return new ArrayList<>(aliveConnections.values()); } - } diff --git a/src/main/java/org/codenil/comm/RemotePeer.java b/src/main/java/dev/xiushen/andes/comm/RemotePeer.java similarity index 57% rename from src/main/java/org/codenil/comm/RemotePeer.java rename to src/main/java/dev/xiushen/andes/comm/RemotePeer.java index 1b2218d..21e126d 100644 --- a/src/main/java/org/codenil/comm/RemotePeer.java +++ b/src/main/java/dev/xiushen/andes/comm/RemotePeer.java @@ -1,8 +1,8 @@ -package org.codenil.comm; +package dev.xiushen.andes.comm; public class RemotePeer { - private String pkiId; + private String identifier; private final String endpoint; @@ -11,8 +11,8 @@ public class RemotePeer { this.endpoint = endpoint; } - public String pkiId() { - return pkiId; + public String identifier() { + return identifier; } public String endpoint() { @@ -20,11 +20,11 @@ public class RemotePeer { } public void setPkiId(String pkiId) { - this.pkiId = pkiId; + this.identifier = pkiId; } public String toString() { - return String.format("%s, pkiId:%s", this.endpoint(), this.pkiId); + return String.format("%s, identifier:%s", this.endpoint(), this.identifier); } } diff --git a/src/main/java/org/codenil/comm/Test.java b/src/main/java/dev/xiushen/andes/comm/Test.java similarity index 68% rename from src/main/java/org/codenil/comm/Test.java rename to src/main/java/dev/xiushen/andes/comm/Test.java index 310922d..c4a0da9 100644 --- a/src/main/java/org/codenil/comm/Test.java +++ b/src/main/java/dev/xiushen/andes/comm/Test.java @@ -1,8 +1,7 @@ -package org.codenil.comm; +package dev.xiushen.andes.comm; -import org.codenil.comm.connections.PeerConnection; -import org.codenil.comm.message.MessageCodes; -import org.codenil.comm.message.RawMessage; +import dev.xiushen.andes.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.message.MessageCodes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +21,7 @@ public class Test { private static void server() throws Exception { NetworkConfig config = new NetworkConfig(); - config.setBindHost("192.168.8.30"); + config.setBindHost("192.168.0.26"); config.setBindPort(8080); NetworkService service = new NetworkService(config, ""); @@ -31,29 +30,27 @@ public class Test { start.whenComplete((res, err) -> { service.subscribeMessageByCode(MessageCodes.GOSSIP, message -> { - logger.info("接收到消息:" + message.message().code()); + logger.info("接收到消息:{}", message.message()); }); }); } private static void client() throws Exception { NetworkConfig config = new NetworkConfig(); - config.setBindHost("192.168.8.30"); - config.setBindPort(8090); + config.setBindHost("192.168.31.58"); + config.setBindPort(9091); NetworkService service = new NetworkService(config, ""); CompletableFuture start = service.start(); start.whenComplete((res, err) -> { - RemotePeer remotePeer = new RemotePeer("192.168.8.30:8080"); + RemotePeer remotePeer = new RemotePeer("192.168.31.58:9090"); CompletableFuture conn = service.connect(remotePeer); conn.whenComplete((cres, cerr) -> { if (cerr == null) { try { - RawMessage rawMessage = RawMessage.create(MessageCodes.GOSSIP); - rawMessage.setData("test".getBytes(StandardCharsets.UTF_8)); - service.send(remotePeer.pkiId(), rawMessage); + } catch (Exception e) { throw new RuntimeException(e); } diff --git a/src/main/java/org/codenil/comm/callback/ConnectCallback.java b/src/main/java/dev/xiushen/andes/comm/callback/ConnectCallback.java similarity index 56% rename from src/main/java/org/codenil/comm/callback/ConnectCallback.java rename to src/main/java/dev/xiushen/andes/comm/callback/ConnectCallback.java index 3f8ba46..531f1a9 100644 --- a/src/main/java/org/codenil/comm/callback/ConnectCallback.java +++ b/src/main/java/dev/xiushen/andes/comm/callback/ConnectCallback.java @@ -1,6 +1,6 @@ -package org.codenil.comm.callback; +package dev.xiushen.andes.comm.callback; -import org.codenil.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnection; /** * 连接回调 diff --git a/src/main/java/org/codenil/comm/callback/DisconnectCallback.java b/src/main/java/dev/xiushen/andes/comm/callback/DisconnectCallback.java similarity index 54% rename from src/main/java/org/codenil/comm/callback/DisconnectCallback.java rename to src/main/java/dev/xiushen/andes/comm/callback/DisconnectCallback.java index 661f6d6..6baaf67 100644 --- a/src/main/java/org/codenil/comm/callback/DisconnectCallback.java +++ b/src/main/java/dev/xiushen/andes/comm/callback/DisconnectCallback.java @@ -1,6 +1,6 @@ -package org.codenil.comm.callback; +package dev.xiushen.andes.comm.callback; -import org.codenil.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnection; @FunctionalInterface public interface DisconnectCallback { diff --git a/src/main/java/org/codenil/comm/callback/MessageCallback.java b/src/main/java/dev/xiushen/andes/comm/callback/MessageCallback.java similarity index 53% rename from src/main/java/org/codenil/comm/callback/MessageCallback.java rename to src/main/java/dev/xiushen/andes/comm/callback/MessageCallback.java index 432ebaf..fb5f090 100644 --- a/src/main/java/org/codenil/comm/callback/MessageCallback.java +++ b/src/main/java/dev/xiushen/andes/comm/callback/MessageCallback.java @@ -1,9 +1,8 @@ -package org.codenil.comm.callback; +package dev.xiushen.andes.comm.callback; -import org.codenil.comm.message.DefaultMessage; +import dev.xiushen.andes.comm.message.DefaultMessage; @FunctionalInterface public interface MessageCallback { - void onMessage(final DefaultMessage message); } diff --git a/src/main/java/dev/xiushen/andes/comm/callback/ResponseCallback.java b/src/main/java/dev/xiushen/andes/comm/callback/ResponseCallback.java new file mode 100644 index 0000000..2fa65e5 --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/callback/ResponseCallback.java @@ -0,0 +1,9 @@ +package dev.xiushen.andes.comm.callback; + +import dev.xiushen.andes.comm.message.DefaultMessage; +import dev.xiushen.andes.comm.message.Message; + +@FunctionalInterface +public interface ResponseCallback { + Message response(DefaultMessage message); +} diff --git a/src/main/java/org/codenil/comm/connections/AbstractPeerConnection.java b/src/main/java/dev/xiushen/andes/comm/connections/AbstractPeerConnection.java similarity index 82% rename from src/main/java/org/codenil/comm/connections/AbstractPeerConnection.java rename to src/main/java/dev/xiushen/andes/comm/connections/AbstractPeerConnection.java index 833a93c..d9c9f44 100644 --- a/src/main/java/org/codenil/comm/connections/AbstractPeerConnection.java +++ b/src/main/java/dev/xiushen/andes/comm/connections/AbstractPeerConnection.java @@ -1,12 +1,11 @@ -package org.codenil.comm.connections; +package dev.xiushen.andes.comm.connections; +import dev.xiushen.andes.comm.RemotePeer; +import dev.xiushen.andes.comm.message.Message; import io.netty.channel.ChannelHandler; -import org.codenil.comm.RemotePeer; -import org.codenil.comm.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; public abstract class AbstractPeerConnection implements PeerConnection { @@ -26,14 +25,6 @@ public abstract class AbstractPeerConnection implements PeerConnection { this.remoteIdentifier = remoteIdentifier; } - @Override - public String pkiId() { - if(Objects.isNull(remotePeer)) { - throw new IllegalStateException("connection not complated yet"); - } - return remotePeer.pkiId(); - } - @Override public void send(final Message message) { doSendMessage(message); diff --git a/src/main/java/org/codenil/comm/connections/ConnectionInitializer.java b/src/main/java/dev/xiushen/andes/comm/connections/ConnectionInitializer.java similarity index 77% rename from src/main/java/org/codenil/comm/connections/ConnectionInitializer.java rename to src/main/java/dev/xiushen/andes/comm/connections/ConnectionInitializer.java index 7f41995..c1535c8 100644 --- a/src/main/java/org/codenil/comm/connections/ConnectionInitializer.java +++ b/src/main/java/dev/xiushen/andes/comm/connections/ConnectionInitializer.java @@ -1,6 +1,6 @@ -package org.codenil.comm.connections; +package dev.xiushen.andes.comm.connections; -import org.codenil.comm.RemotePeer; +import dev.xiushen.andes.comm.RemotePeer; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; diff --git a/src/main/java/org/codenil/comm/connections/KeepAlive.java b/src/main/java/dev/xiushen/andes/comm/connections/KeepAlive.java similarity index 88% rename from src/main/java/org/codenil/comm/connections/KeepAlive.java rename to src/main/java/dev/xiushen/andes/comm/connections/KeepAlive.java index 9d72e39..3e068f0 100644 --- a/src/main/java/org/codenil/comm/connections/KeepAlive.java +++ b/src/main/java/dev/xiushen/andes/comm/connections/KeepAlive.java @@ -1,12 +1,11 @@ -package org.codenil.comm.connections; +package dev.xiushen.andes.comm.connections; +import dev.xiushen.andes.comm.message.DisconnectReason; +import dev.xiushen.andes.comm.message.PingMessage; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; - -import org.codenil.comm.message.DisconnectReason; -import org.codenil.comm.message.PingMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +45,7 @@ public class KeepAlive extends ChannelDuplexHandler { try { logger.debug("Idle connection detected, sending Wire PING to peer."); - connection.send(PingMessage.get()); + connection.send(new PingMessage()); waitingForPong.set(true); } catch (Exception e) { logger.trace("PING not sent because peer is already disconnected"); diff --git a/src/main/java/org/codenil/comm/connections/PeerConnection.java b/src/main/java/dev/xiushen/andes/comm/connections/PeerConnection.java similarity index 69% rename from src/main/java/org/codenil/comm/connections/PeerConnection.java rename to src/main/java/dev/xiushen/andes/comm/connections/PeerConnection.java index 83b65df..e6011fd 100644 --- a/src/main/java/org/codenil/comm/connections/PeerConnection.java +++ b/src/main/java/dev/xiushen/andes/comm/connections/PeerConnection.java @@ -1,14 +1,12 @@ -package org.codenil.comm.connections; +package dev.xiushen.andes.comm.connections; +import dev.xiushen.andes.comm.RemotePeer; +import dev.xiushen.andes.comm.message.DisconnectReason; +import dev.xiushen.andes.comm.message.Message; import io.netty.channel.ChannelHandler; -import org.codenil.comm.RemotePeer; -import org.codenil.comm.message.DisconnectReason; -import org.codenil.comm.message.Message; public interface PeerConnection { - String pkiId(); - String remoteIdentifier(); boolean disconnected(); diff --git a/src/main/java/org/codenil/comm/connections/PeerConnectionEvents.java b/src/main/java/dev/xiushen/andes/comm/connections/PeerConnectionEvents.java similarity index 80% rename from src/main/java/org/codenil/comm/connections/PeerConnectionEvents.java rename to src/main/java/dev/xiushen/andes/comm/connections/PeerConnectionEvents.java index dd2cf2e..44e8255 100644 --- a/src/main/java/org/codenil/comm/connections/PeerConnectionEvents.java +++ b/src/main/java/dev/xiushen/andes/comm/connections/PeerConnectionEvents.java @@ -1,10 +1,9 @@ -package org.codenil.comm.connections; +package dev.xiushen.andes.comm.connections; -import org.codenil.comm.callback.ConnectCallback; -import org.codenil.comm.callback.DisconnectCallback; -import org.codenil.comm.callback.MessageCallback; -import org.codenil.comm.message.DefaultMessage; -import org.codenil.comm.message.RawMessage; +import dev.xiushen.andes.comm.callback.ConnectCallback; +import dev.xiushen.andes.comm.callback.DisconnectCallback; +import dev.xiushen.andes.comm.callback.MessageCallback; +import dev.xiushen.andes.comm.message.DefaultMessage; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -36,7 +35,7 @@ public class PeerConnectionEvents { public void subscribeByCode(final int messageCode, final MessageCallback callback) { subscribersByCode - .computeIfAbsent(messageCode, _ -> Subscribers.create()) + .computeIfAbsent(messageCode, integer -> Subscribers.create()) .subscribe(callback); } @@ -50,12 +49,12 @@ public class PeerConnectionEvents { disconnectSubscribers.forEach(s -> s.onDisconnect(connection)); } - public void dispatchMessage(final PeerConnection connection, final RawMessage message) { + public void dispatchMessage(final PeerConnection connection, final String message) { final DefaultMessage msg = new DefaultMessage(connection, message); messageSubscribers.forEach(s -> s.onMessage(msg)); } - public void dispatchMessageByCode(final int code, final PeerConnection connection, final RawMessage message) { + public void dispatchMessageByCode(final int code, final PeerConnection connection, final String message) { final DefaultMessage msg = new DefaultMessage(connection, message); subscribersByCode.get(code).forEach(s -> s.onMessage(msg)); } diff --git a/src/main/java/org/codenil/comm/connections/Subscribers.java b/src/main/java/dev/xiushen/andes/comm/connections/Subscribers.java similarity index 98% rename from src/main/java/org/codenil/comm/connections/Subscribers.java rename to src/main/java/dev/xiushen/andes/comm/connections/Subscribers.java index f212461..cfcbc3f 100644 --- a/src/main/java/org/codenil/comm/connections/Subscribers.java +++ b/src/main/java/dev/xiushen/andes/comm/connections/Subscribers.java @@ -1,4 +1,4 @@ -package org.codenil.comm.connections; +package dev.xiushen.andes.comm.connections; import com.google.common.collect.ImmutableSet; diff --git a/src/main/java/org/codenil/comm/handler/CommonHandler.java b/src/main/java/dev/xiushen/andes/comm/handler/CommonHandler.java similarity index 70% rename from src/main/java/org/codenil/comm/handler/CommonHandler.java rename to src/main/java/dev/xiushen/andes/comm/handler/CommonHandler.java index 32d4691..1ec8339 100644 --- a/src/main/java/org/codenil/comm/handler/CommonHandler.java +++ b/src/main/java/dev/xiushen/andes/comm/handler/CommonHandler.java @@ -1,19 +1,19 @@ -package org.codenil.comm.handler; +package dev.xiushen.andes.comm.handler; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import dev.xiushen.andes.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnectionEvents; +import dev.xiushen.andes.comm.message.MessageCodes; +import dev.xiushen.andes.comm.message.PongMessage; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; - -import org.codenil.comm.connections.PeerConnection; -import org.codenil.comm.connections.PeerConnectionEvents; -import org.codenil.comm.message.MessageCodes; -import org.codenil.comm.message.PongMessage; -import org.codenil.comm.message.RawMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.atomic.AtomicBoolean; -public class CommonHandler extends SimpleChannelInboundHandler { +public class CommonHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(CommonHandler.class); @@ -31,13 +31,15 @@ public class CommonHandler extends SimpleChannelInboundHandler { } @Override - protected void channelRead0(final ChannelHandlerContext ctx, final RawMessage originalMessage) { - logger.debug("Received a message from {}", originalMessage.code()); - switch (originalMessage.code()) { + protected void channelRead0(final ChannelHandlerContext ctx, final String message) { + JsonObject jsonObject = new Gson().fromJson(message, JsonObject.class); + int code = jsonObject.getAsJsonPrimitive("code").getAsInt(); + logger.debug("Received a message from {}", code); + switch (code) { case MessageCodes.PING: logger.trace("Received Wire PING"); try { - connection.send(PongMessage.get()); + connection.send(new PongMessage()); } catch (Exception e) { // Nothing to do } @@ -55,7 +57,7 @@ public class CommonHandler extends SimpleChannelInboundHandler { connection.terminateConnection(); } - connectionEvents.dispatchMessage(connection, originalMessage); + connectionEvents.dispatchMessage(connection, message); } @Override diff --git a/src/main/java/org/codenil/comm/handler/MessageFrameDecoder.java b/src/main/java/dev/xiushen/andes/comm/handler/MessageFrameDecoder.java similarity index 58% rename from src/main/java/org/codenil/comm/handler/MessageFrameDecoder.java rename to src/main/java/dev/xiushen/andes/comm/handler/MessageFrameDecoder.java index 42ab7cc..aa8b269 100644 --- a/src/main/java/org/codenil/comm/handler/MessageFrameDecoder.java +++ b/src/main/java/dev/xiushen/andes/comm/handler/MessageFrameDecoder.java @@ -1,32 +1,31 @@ -package org.codenil.comm.handler; +package dev.xiushen.andes.comm.handler; -import io.netty.buffer.ByteBuf; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import dev.xiushen.andes.comm.connections.KeepAlive; +import dev.xiushen.andes.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnectionEvents; +import dev.xiushen.andes.comm.message.DisconnectMessage; +import dev.xiushen.andes.comm.message.DisconnectReason; +import dev.xiushen.andes.comm.message.HelloMessage; +import dev.xiushen.andes.comm.message.MessageCodes; +import dev.xiushen.andes.comm.netty.NettyPeerConnection; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.timeout.IdleStateHandler; - -import org.codenil.comm.connections.KeepAlive; -import org.codenil.comm.connections.PeerConnection; -import org.codenil.comm.connections.PeerConnectionEvents; -import org.codenil.comm.message.DisconnectMessage; -import org.codenil.comm.message.DisconnectReason; -import org.codenil.comm.message.MessageCodes; -import org.codenil.comm.message.RawMessage; -import org.codenil.comm.netty.NettyPeerConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; -public class MessageFrameDecoder extends ByteToMessageDecoder { +public class MessageFrameDecoder extends MessageToMessageDecoder { private static final Logger logger = LoggerFactory.getLogger(MessageFrameDecoder.class); + private static final int LENGTH_FIELD_LENGTH = 4; // 长度字段占4字节 private final CompletableFuture connectionFuture; private final PeerConnectionEvents connectionEvents; @@ -41,46 +40,17 @@ public class MessageFrameDecoder extends ByteToMessageDecoder { } @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List out) throws Exception { - if (byteBuf.readableBytes() < 4) { - return; // 不足4字节长度字段,等待更多数据 - } - byteBuf.readerIndex(0); - - // 读取协议头:消息总长度 - int totalLength = byteBuf.readInt(); - - if (byteBuf.readableBytes() < totalLength - 4) { - return; // 不足消息总长度,等待更多数据 - } - - // 读取payload - - // 读取id - int idLength = byteBuf.readInt(); - byte[] idBytes = new byte[idLength]; - byteBuf.readBytes(idBytes); - String id = new String(idBytes, StandardCharsets.UTF_8); - - // 读取code - int code = byteBuf.readInt(); - - // 读取data - int dataLength = byteBuf.readInt();; - byte[] data = new byte[dataLength]; - byteBuf.readBytes(data); - + protected void decode(ChannelHandlerContext ctx, String message, List out) { // 创建消息对象 - RawMessage message = RawMessage.create(code); - message.setRequestId(id); - message.setData(data); - + JsonObject jsonObject = new Gson().fromJson(message, JsonObject.class); + int code = jsonObject.getAsJsonPrimitive("code").getAsInt(); if (hellosExchanged) { out.add(message); - } else if (message.code() == MessageCodes.HELLO) { + } else if (code == MessageCodes.HELLO) { hellosExchanged = true; - String remoteIdentifier = new String(message.data()); + HelloMessage helloMessage = new Gson().fromJson(message, HelloMessage.class); + String remoteIdentifier = helloMessage.getData(); final PeerConnection connection = new NettyPeerConnection(ctx, remoteIdentifier, connectionEvents); /* @@ -98,22 +68,20 @@ public class MessageFrameDecoder extends ByteToMessageDecoder { .addLast("Common", new CommonHandler(connection, connectionEvents, waitingForPong)) .addLast("FrameEncoder", new MessageFrameEncoder()); connectionFuture.complete(connection); - } else if (message.code() == MessageCodes.DISCONNECT) { + } else if (code == MessageCodes.DISCONNECT) { logger.debug("Disconnected before sending HELLO."); ctx.close(); connectionFuture.completeExceptionally(new RuntimeException("Disconnect")); } else { - logger.debug( - "Message received before HELLO's exchanged, disconnecting. Code: {}, Data: {}", - message.code(), Arrays.toString(message.data())); + if(code != MessageCodes.PONG) { + logger.debug( + "Message received before HELLO's exchanged, disconnecting. Code: {}", + code); - DisconnectMessage disconnectMessage = DisconnectMessage.create(DisconnectReason.UNKNOWN); - - RawMessage rawMessage = RawMessage.create(disconnectMessage.code()); - rawMessage.setData(disconnectMessage.data()); - ctx.writeAndFlush(rawMessage) - .addListener(_ -> ctx.close()); - connectionFuture.completeExceptionally(new RuntimeException("Message received before HELLO's exchanged")); + DisconnectMessage disconnectMessage = new DisconnectMessage(DisconnectReason.UNKNOWN.message()); + ctx.writeAndFlush(disconnectMessage).addListener(Void -> ctx.close()); + connectionFuture.completeExceptionally(new RuntimeException("Message received before HELLO's exchanged")); + } } } diff --git a/src/main/java/dev/xiushen/andes/comm/handler/MessageFrameEncoder.java b/src/main/java/dev/xiushen/andes/comm/handler/MessageFrameEncoder.java new file mode 100644 index 0000000..936b930 --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/handler/MessageFrameEncoder.java @@ -0,0 +1,18 @@ +package dev.xiushen.andes.comm.handler; + +import com.google.gson.Gson; +import dev.xiushen.andes.comm.message.Message; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; + +import java.util.List; + +public class MessageFrameEncoder extends MessageToMessageEncoder { + + public MessageFrameEncoder() {} + + @Override + protected void encode(ChannelHandlerContext ctx, Message message, List list) { + list.add(new Gson().toJson(message)); + } +} \ No newline at end of file diff --git a/src/main/java/org/codenil/comm/handler/TimeoutHandler.java b/src/main/java/dev/xiushen/andes/comm/handler/TimeoutHandler.java similarity index 91% rename from src/main/java/org/codenil/comm/handler/TimeoutHandler.java rename to src/main/java/dev/xiushen/andes/comm/handler/TimeoutHandler.java index a081d5f..8df7820 100644 --- a/src/main/java/org/codenil/comm/handler/TimeoutHandler.java +++ b/src/main/java/dev/xiushen/andes/comm/handler/TimeoutHandler.java @@ -1,4 +1,4 @@ -package org.codenil.comm.handler; +package dev.xiushen.andes.comm.handler; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -21,7 +21,7 @@ public class TimeoutHandler extends ChannelInitializer { } @Override - protected void initChannel(final C ch) throws Exception { + protected void initChannel(final C ch) { ch.eventLoop().schedule(() -> { if (!condition.get()) { callback.invoke(); diff --git a/src/main/java/org/codenil/comm/handshake/AbstractHandshakeHandler.java b/src/main/java/dev/xiushen/andes/comm/handshake/AbstractHandshakeHandler.java similarity index 65% rename from src/main/java/org/codenil/comm/handshake/AbstractHandshakeHandler.java rename to src/main/java/dev/xiushen/andes/comm/handshake/AbstractHandshakeHandler.java index f57d4a8..cde70cd 100644 --- a/src/main/java/org/codenil/comm/handshake/AbstractHandshakeHandler.java +++ b/src/main/java/dev/xiushen/andes/comm/handshake/AbstractHandshakeHandler.java @@ -1,25 +1,23 @@ -package org.codenil.comm.handshake; +package dev.xiushen.andes.comm.handshake; -import io.netty.buffer.ByteBuf; +import com.google.gson.Gson; +import dev.xiushen.andes.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnectionEvents; +import dev.xiushen.andes.comm.handler.MessageFrameDecoder; +import dev.xiushen.andes.comm.message.HelloMessage; +import dev.xiushen.andes.comm.message.Message; +import dev.xiushen.andes.comm.message.MessageCodes; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.MessageToByteEncoder; - -import org.codenil.comm.connections.PeerConnection; -import org.codenil.comm.connections.PeerConnectionEvents; -import org.codenil.comm.handler.MessageFrameDecoder; -import org.codenil.comm.message.HelloMessage; -import org.codenil.comm.message.MessageCodes; -import org.codenil.comm.message.RawMessage; -import org.codenil.comm.serialize.SerializeHelper; +import io.netty.handler.codec.MessageToMessageEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler { +public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(AbstractHandshakeHandler.class); @@ -43,8 +41,8 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl } @Override - protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) { - final Optional nextMsg = nextHandshakeMessage(msg); + protected void channelRead0(final ChannelHandlerContext ctx, final String msg) { + final Optional nextMsg = nextHandshakeMessage(msg); if (nextMsg.isPresent()) { ctx.writeAndFlush(nextMsg.get()); @@ -67,18 +65,13 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl * 替换完编解码器后发送Hello消息 * hello消息需要带一些数据 */ - HelloMessage helloMessage = HelloMessage.create(selfIdentifier.getBytes(StandardCharsets.UTF_8)); - RawMessage rawMessage = RawMessage.create(helloMessage.code()); - rawMessage.setData(helloMessage.data()); - rawMessage.setRequestId(helloMessage.requestId()); - ctx.writeAndFlush(rawMessage) + HelloMessage helloMessage = new HelloMessage(selfIdentifier); + ctx.writeAndFlush(helloMessage) .addListener(ff -> { if (ff.isSuccess()) { logger.trace("Successfully wrote hello message"); } }); - - msg.retain(); ctx.fireChannelRead(msg); } } @@ -90,31 +83,22 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl ctx.close(); } - protected abstract Optional nextHandshakeMessage(ByteBuf msg); + protected abstract Optional nextHandshakeMessage(String msg); /** Ensures that wire hello message is the first message written. */ - private static class FirstMessageFrameEncoder extends MessageToByteEncoder { + private static class FirstMessageFrameEncoder extends MessageToMessageEncoder { private FirstMessageFrameEncoder() {} @Override protected void encode( final ChannelHandlerContext context, - final RawMessage msg, - final ByteBuf out) { - if (msg.code() != MessageCodes.HELLO) { + final Message msg, + final List list) { + if (msg.getCode() != MessageCodes.HELLO) { throw new IllegalStateException("First wire message sent wasn't a HELLO."); } - byte[] idBytes = Optional.ofNullable(msg.requestId()).orElse("").getBytes(StandardCharsets.UTF_8); - - SerializeHelper builder = new SerializeHelper(); - ByteBuf buf = builder.writeBytes(idBytes) - .writeInt(msg.code()) - .writeBytes(msg.data()) - .build(); - - out.writeBytes(buf); - buf.release(); + list.add(new Gson().toJson(msg)); context.pipeline().remove(this); } } diff --git a/src/main/java/org/codenil/comm/handshake/HandshakeHandlerInbound.java b/src/main/java/dev/xiushen/andes/comm/handshake/HandshakeHandlerInbound.java similarity index 73% rename from src/main/java/org/codenil/comm/handshake/HandshakeHandlerInbound.java rename to src/main/java/dev/xiushen/andes/comm/handshake/HandshakeHandlerInbound.java index 9912cb4..fa5aa80 100644 --- a/src/main/java/org/codenil/comm/handshake/HandshakeHandlerInbound.java +++ b/src/main/java/dev/xiushen/andes/comm/handshake/HandshakeHandlerInbound.java @@ -1,8 +1,7 @@ -package org.codenil.comm.handshake; +package dev.xiushen.andes.comm.handshake; -import io.netty.buffer.ByteBuf; -import org.codenil.comm.connections.PeerConnection; -import org.codenil.comm.connections.PeerConnectionEvents; +import dev.xiushen.andes.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnectionEvents; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -22,8 +21,8 @@ public class HandshakeHandlerInbound extends AbstractHandshakeHandler { } @Override - protected Optional nextHandshakeMessage(ByteBuf msg) { - final Optional nextMsg; + protected Optional nextHandshakeMessage(String msg) { + final Optional nextMsg; if (handshaker.getStatus() == HandshakeStatus.IN_PROGRESS) { nextMsg = handshaker.handleMessage(msg); } else { diff --git a/src/main/java/org/codenil/comm/handshake/HandshakeHandlerOutbound.java b/src/main/java/dev/xiushen/andes/comm/handshake/HandshakeHandlerOutbound.java similarity index 79% rename from src/main/java/org/codenil/comm/handshake/HandshakeHandlerOutbound.java rename to src/main/java/dev/xiushen/andes/comm/handshake/HandshakeHandlerOutbound.java index 8df6d78..7252f95 100644 --- a/src/main/java/org/codenil/comm/handshake/HandshakeHandlerOutbound.java +++ b/src/main/java/dev/xiushen/andes/comm/handshake/HandshakeHandlerOutbound.java @@ -1,10 +1,8 @@ -package org.codenil.comm.handshake; +package dev.xiushen.andes.comm.handshake; -import io.netty.buffer.ByteBuf; +import dev.xiushen.andes.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnectionEvents; import io.netty.channel.ChannelHandlerContext; - -import org.codenil.comm.connections.PeerConnection; -import org.codenil.comm.connections.PeerConnectionEvents; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,9 +14,9 @@ import java.util.concurrent.CompletableFuture; */ public class HandshakeHandlerOutbound extends AbstractHandshakeHandler { - private static final Logger logger = LoggerFactory.getLogger(AbstractHandshakeHandler.class); + private static final Logger logger = LoggerFactory.getLogger(HandshakeHandlerOutbound.class); - private final ByteBuf first; + private final String first; public HandshakeHandlerOutbound( final String selfIdentifier, @@ -32,8 +30,8 @@ public class HandshakeHandlerOutbound extends AbstractHandshakeHandler { } @Override - protected Optional nextHandshakeMessage(ByteBuf msg) { - final Optional nextMsg; + protected Optional nextHandshakeMessage(String msg) { + final Optional nextMsg; if (handshaker.getStatus() == HandshakeStatus.IN_PROGRESS) { nextMsg = handshaker.handleMessage(msg); } else { diff --git a/src/main/java/org/codenil/comm/handshake/HandshakeStatus.java b/src/main/java/dev/xiushen/andes/comm/handshake/HandshakeStatus.java similarity index 72% rename from src/main/java/org/codenil/comm/handshake/HandshakeStatus.java rename to src/main/java/dev/xiushen/andes/comm/handshake/HandshakeStatus.java index b0da579..f92ac6c 100644 --- a/src/main/java/org/codenil/comm/handshake/HandshakeStatus.java +++ b/src/main/java/dev/xiushen/andes/comm/handshake/HandshakeStatus.java @@ -1,4 +1,4 @@ -package org.codenil.comm.handshake; +package dev.xiushen.andes.comm.handshake; public enum HandshakeStatus { UNINITIALIZED, diff --git a/src/main/java/org/codenil/comm/handshake/Handshaker.java b/src/main/java/dev/xiushen/andes/comm/handshake/Handshaker.java similarity index 51% rename from src/main/java/org/codenil/comm/handshake/Handshaker.java rename to src/main/java/dev/xiushen/andes/comm/handshake/Handshaker.java index 00be08a..362ca81 100644 --- a/src/main/java/org/codenil/comm/handshake/Handshaker.java +++ b/src/main/java/dev/xiushen/andes/comm/handshake/Handshaker.java @@ -1,6 +1,4 @@ -package org.codenil.comm.handshake; - -import io.netty.buffer.ByteBuf; +package dev.xiushen.andes.comm.handshake; import java.util.Optional; @@ -12,7 +10,7 @@ public interface Handshaker { HandshakeStatus getStatus(); - ByteBuf firstMessage(); + String firstMessage(); - Optional handleMessage(ByteBuf buf); + Optional handleMessage(String buf); } diff --git a/src/main/java/org/codenil/comm/handshake/PlainHandshaker.java b/src/main/java/dev/xiushen/andes/comm/handshake/PlainHandshaker.java similarity index 67% rename from src/main/java/org/codenil/comm/handshake/PlainHandshaker.java rename to src/main/java/dev/xiushen/andes/comm/handshake/PlainHandshaker.java index 88dffa5..f562c7e 100644 --- a/src/main/java/org/codenil/comm/handshake/PlainHandshaker.java +++ b/src/main/java/dev/xiushen/andes/comm/handshake/PlainHandshaker.java @@ -1,10 +1,10 @@ -package org.codenil.comm.handshake; +package dev.xiushen.andes.comm.handshake; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - -import org.codenil.comm.handler.MessageHandler; -import org.codenil.comm.message.MessageType; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import dev.xiushen.andes.comm.message.MessageCodes; +import dev.xiushen.andes.comm.message.PingMessage; +import dev.xiushen.andes.comm.message.PongMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,14 +15,14 @@ import static com.google.common.base.Preconditions.checkState; public class PlainHandshaker implements Handshaker { - private static final Logger logger = LoggerFactory.getLogger(AbstractHandshakeHandler.class); + private static final Logger logger = LoggerFactory.getLogger(PlainHandshaker.class); private final AtomicReference status = new AtomicReference<>(HandshakeStatus.UNINITIALIZED); private boolean initiator; - private byte[] initiatorMsg; - private byte[] responderMsg; + private String initiatorMsg; + private String responderMsg; @Override public void prepareInitiator() { @@ -46,42 +46,41 @@ public class PlainHandshaker implements Handshaker { } @Override - public ByteBuf firstMessage() { + public String firstMessage() { checkState(initiator, "illegal invocation of firstMessage on non-initiator end of handshake"); checkState(status.compareAndSet(HandshakeStatus.PREPARED, HandshakeStatus.IN_PROGRESS), "illegal invocation of firstMessage, handshake had already started"); - initiatorMsg = MessageHandler.buildMessage(MessageType.PING, MessageType.PING.getValue(), new byte[0]); logger.trace("First plain handshake message under INITIATOR role"); - return Unpooled.wrappedBuffer(initiatorMsg); + return new Gson().toJson(new PingMessage()); } @Override - public Optional handleMessage(ByteBuf buf) { + public Optional handleMessage(String message) { checkState(status.get() == HandshakeStatus.IN_PROGRESS, "illegal invocation of onMessage on handshake that is not in progress"); - PlainMessage message = MessageHandler.parseMessage(buf); - Optional nextMsg = Optional.empty(); + JsonObject jsonObject = new Gson().fromJson(message, JsonObject.class); + int code = jsonObject.getAsJsonPrimitive("code").getAsInt(); + Optional nextMsg = Optional.empty(); if (initiator) { checkState(responderMsg == null, "unexpected message: responder message had " + "already been received"); - checkState(message.messageType().equals(MessageType.PONG), + checkState(code == MessageCodes.PONG, "unexpected message: needs to be a pong"); - responderMsg = message.data(); - + responderMsg = message; } else { checkState(initiatorMsg == null, "unexpected message: initiator message " + "had already been received"); - checkState(message.messageType().equals(MessageType.PING), + checkState(code == MessageCodes.PING, "unexpected message: needs to be a ping"); - initiatorMsg = message.data(); - responderMsg = MessageHandler.buildMessage(MessageType.PONG, MessageType.PONG.getValue(), new byte[0]); + initiatorMsg = message; + responderMsg = new Gson().toJson(new PongMessage()); nextMsg = Optional.of(responderMsg); } status.set(HandshakeStatus.SUCCESS); logger.trace("Handshake status set to {}", status.get()); - return nextMsg.map(Unpooled::wrappedBuffer); + return nextMsg; } } diff --git a/src/main/java/dev/xiushen/andes/comm/message/AbstractMessage.java b/src/main/java/dev/xiushen/andes/comm/message/AbstractMessage.java new file mode 100644 index 0000000..82ab964 --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/message/AbstractMessage.java @@ -0,0 +1,14 @@ +package dev.xiushen.andes.comm.message; + +public abstract class AbstractMessage implements Message { + + private final int code; + + public AbstractMessage(final int code) { + this.code = code; + } + + public int getCode() { + return code; + } +} diff --git a/src/main/java/dev/xiushen/andes/comm/message/DataMessage.java b/src/main/java/dev/xiushen/andes/comm/message/DataMessage.java new file mode 100644 index 0000000..12641a9 --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/message/DataMessage.java @@ -0,0 +1,15 @@ +package dev.xiushen.andes.comm.message; + +public abstract class DataMessage extends AbstractMessage { + + private final String data; + + public DataMessage(final int code, final String data) { + super(code); + this.data = data; + } + + public String getData() { + return data; + } +} diff --git a/src/main/java/org/codenil/comm/message/DefaultMessage.java b/src/main/java/dev/xiushen/andes/comm/message/DefaultMessage.java similarity index 62% rename from src/main/java/org/codenil/comm/message/DefaultMessage.java rename to src/main/java/dev/xiushen/andes/comm/message/DefaultMessage.java index aecb4ed..8378828 100644 --- a/src/main/java/org/codenil/comm/message/DefaultMessage.java +++ b/src/main/java/dev/xiushen/andes/comm/message/DefaultMessage.java @@ -1,21 +1,21 @@ -package org.codenil.comm.message; +package dev.xiushen.andes.comm.message; -import org.codenil.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnection; public class DefaultMessage { - private final RawMessage message; + private final String message; private final PeerConnection connection; public DefaultMessage( final PeerConnection connection, - final RawMessage message) { + final String message) { this.message = message; this.connection = connection; } - public RawMessage message() { + public String message() { return message; } diff --git a/src/main/java/org/codenil/comm/message/DisconnectCallback.java b/src/main/java/dev/xiushen/andes/comm/message/DisconnectCallback.java similarity index 54% rename from src/main/java/org/codenil/comm/message/DisconnectCallback.java rename to src/main/java/dev/xiushen/andes/comm/message/DisconnectCallback.java index b6597ef..5d7e4f8 100644 --- a/src/main/java/org/codenil/comm/message/DisconnectCallback.java +++ b/src/main/java/dev/xiushen/andes/comm/message/DisconnectCallback.java @@ -1,6 +1,6 @@ -package org.codenil.comm.message; +package dev.xiushen.andes.comm.message; -import org.codenil.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnection; @FunctionalInterface public interface DisconnectCallback { diff --git a/src/main/java/dev/xiushen/andes/comm/message/DisconnectMessage.java b/src/main/java/dev/xiushen/andes/comm/message/DisconnectMessage.java new file mode 100644 index 0000000..e57afb8 --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/message/DisconnectMessage.java @@ -0,0 +1,8 @@ +package dev.xiushen.andes.comm.message; + +public class DisconnectMessage extends DataMessage { + + public DisconnectMessage(final String data) { + super(MessageCodes.DISCONNECT, data); + } +} diff --git a/src/main/java/org/codenil/comm/message/DisconnectReason.java b/src/main/java/dev/xiushen/andes/comm/message/DisconnectReason.java similarity index 93% rename from src/main/java/org/codenil/comm/message/DisconnectReason.java rename to src/main/java/dev/xiushen/andes/comm/message/DisconnectReason.java index d74aa4d..ee12d64 100644 --- a/src/main/java/org/codenil/comm/message/DisconnectReason.java +++ b/src/main/java/dev/xiushen/andes/comm/message/DisconnectReason.java @@ -1,4 +1,4 @@ -package org.codenil.comm.message; +package dev.xiushen.andes.comm.message; public enum DisconnectReason { diff --git a/src/main/java/dev/xiushen/andes/comm/message/EmptyMessage.java b/src/main/java/dev/xiushen/andes/comm/message/EmptyMessage.java new file mode 100644 index 0000000..a90a0c5 --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/message/EmptyMessage.java @@ -0,0 +1,8 @@ +package dev.xiushen.andes.comm.message; + +public abstract class EmptyMessage extends AbstractMessage { + + public EmptyMessage(int code) { + super(code); + } +} diff --git a/src/main/java/dev/xiushen/andes/comm/message/HelloMessage.java b/src/main/java/dev/xiushen/andes/comm/message/HelloMessage.java new file mode 100644 index 0000000..7acef0d --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/message/HelloMessage.java @@ -0,0 +1,8 @@ +package dev.xiushen.andes.comm.message; + +public class HelloMessage extends DataMessage { + + public HelloMessage(final String data) { + super(MessageCodes.HELLO, data); + } +} diff --git a/src/main/java/dev/xiushen/andes/comm/message/Message.java b/src/main/java/dev/xiushen/andes/comm/message/Message.java new file mode 100644 index 0000000..20015c9 --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/message/Message.java @@ -0,0 +1,6 @@ +package dev.xiushen.andes.comm.message; + +public interface Message { + + int getCode(); +} diff --git a/src/main/java/org/codenil/comm/message/MessageCallback.java b/src/main/java/dev/xiushen/andes/comm/message/MessageCallback.java similarity index 73% rename from src/main/java/org/codenil/comm/message/MessageCallback.java rename to src/main/java/dev/xiushen/andes/comm/message/MessageCallback.java index cd2d368..8e34100 100644 --- a/src/main/java/org/codenil/comm/message/MessageCallback.java +++ b/src/main/java/dev/xiushen/andes/comm/message/MessageCallback.java @@ -1,4 +1,4 @@ -package org.codenil.comm.message; +package dev.xiushen.andes.comm.message; @FunctionalInterface public interface MessageCallback { diff --git a/src/main/java/org/codenil/comm/message/MessageCodes.java b/src/main/java/dev/xiushen/andes/comm/message/MessageCodes.java similarity index 93% rename from src/main/java/org/codenil/comm/message/MessageCodes.java rename to src/main/java/dev/xiushen/andes/comm/message/MessageCodes.java index ce7919c..84d3813 100644 --- a/src/main/java/org/codenil/comm/message/MessageCodes.java +++ b/src/main/java/dev/xiushen/andes/comm/message/MessageCodes.java @@ -1,4 +1,4 @@ -package org.codenil.comm.message; +package dev.xiushen.andes.comm.message; public class MessageCodes { public static final int HELLO = 0x00; diff --git a/src/main/java/dev/xiushen/andes/comm/message/PingMessage.java b/src/main/java/dev/xiushen/andes/comm/message/PingMessage.java new file mode 100644 index 0000000..2e31872 --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/message/PingMessage.java @@ -0,0 +1,13 @@ +package dev.xiushen.andes.comm.message; + +public class PingMessage extends EmptyMessage { + + public PingMessage() { + super(MessageCodes.PING); + } + + @Override + public String toString() { + return "PingMessage{data=''}"; + } +} diff --git a/src/main/java/dev/xiushen/andes/comm/message/PongMessage.java b/src/main/java/dev/xiushen/andes/comm/message/PongMessage.java new file mode 100644 index 0000000..b727616 --- /dev/null +++ b/src/main/java/dev/xiushen/andes/comm/message/PongMessage.java @@ -0,0 +1,8 @@ +package dev.xiushen.andes.comm.message; + +public class PongMessage extends EmptyMessage { + + public PongMessage() { + super(MessageCodes.PONG); + } +} diff --git a/src/main/java/org/codenil/comm/netty/NettyConnectionInitializer.java b/src/main/java/dev/xiushen/andes/comm/netty/NettyConnectionInitializer.java similarity index 73% rename from src/main/java/org/codenil/comm/netty/NettyConnectionInitializer.java rename to src/main/java/dev/xiushen/andes/comm/netty/NettyConnectionInitializer.java index ecead03..66758dd 100644 --- a/src/main/java/org/codenil/comm/netty/NettyConnectionInitializer.java +++ b/src/main/java/dev/xiushen/andes/comm/netty/NettyConnectionInitializer.java @@ -1,5 +1,16 @@ -package org.codenil.comm.netty; +package dev.xiushen.andes.comm.netty; +import dev.xiushen.andes.comm.NetworkConfig; +import dev.xiushen.andes.comm.RemotePeer; +import dev.xiushen.andes.comm.callback.ConnectCallback; +import dev.xiushen.andes.comm.connections.ConnectionInitializer; +import dev.xiushen.andes.comm.connections.PeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnectionEvents; +import dev.xiushen.andes.comm.connections.Subscribers; +import dev.xiushen.andes.comm.handler.TimeoutHandler; +import dev.xiushen.andes.comm.handshake.HandshakeHandlerInbound; +import dev.xiushen.andes.comm.handshake.HandshakeHandlerOutbound; +import dev.xiushen.andes.comm.handshake.PlainHandshaker; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; @@ -7,22 +18,15 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import org.codenil.comm.NetworkConfig; -import org.codenil.comm.RemotePeer; -import org.codenil.comm.callback.ConnectCallback; -import org.codenil.comm.connections.ConnectionInitializer; -import org.codenil.comm.connections.PeerConnection; -import org.codenil.comm.connections.PeerConnectionEvents; -import org.codenil.comm.connections.Subscribers; -import org.codenil.comm.handler.TimeoutHandler; -import org.codenil.comm.handshake.HandshakeHandlerInbound; -import org.codenil.comm.handshake.HandshakeHandlerOutbound; -import org.codenil.comm.handshake.PlainHandshaker; - +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; import javax.annotation.Nonnull; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; @@ -34,6 +38,11 @@ import java.util.concurrent.atomic.AtomicBoolean; public class NettyConnectionInitializer implements ConnectionInitializer { private static final int TIMEOUT_SECONDS = 10; + private static final int MAX_FRAME_LENGTH = 1024 * 1024; // 最大消息长度 + private static final int LENGTH_FIELD_OFFSET = 0; // 长度字段起始位置 + private static final int LENGTH_FIELD_LENGTH = 4; // 长度字段占4字节 + private static final int LENGTH_ADJUSTMENT = 0; // 长度字段后数据的偏移量 + private static final int INITIAL_BYTES_TO_STRIP = 4; // 跳过长度字段 private final Subscribers connectSubscribers = Subscribers.create(); private final PeerConnectionEvents eventDispatcher; @@ -76,9 +85,8 @@ public class NettyConnectionInitializer implements ConnectionInitializer { .channel(NioServerSocketChannel.class) .childHandler(inboundChannelInitializer()) .bind(config.bindHost(), config.bindPort()); - server.addListener(future -> { - final InetSocketAddress socketAddress = - (InetSocketAddress) server.channel().localAddress(); + this.server.addListener(future -> { + final InetSocketAddress socketAddress = (InetSocketAddress) server.channel().localAddress(); if (!future.isSuccess() || socketAddress == null) { final String message = String.format("Unable to start listening on %s:%s. Check for port conflicts.", @@ -154,9 +162,17 @@ public class NettyConnectionInitializer implements ConnectionInitializer { protected void initChannel(final SocketChannel ch) throws Exception { final CompletableFuture connectionFuture = new CompletableFuture<>(); connectionFuture.thenAccept(connection -> connectSubscribers.forEach(c -> c.onConnect(connection))); - //连接处理器 + //连接超时处理器 ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to fully establish incoming connection")); - //其他处理器,TLS之类的 + + ch.pipeline().addLast(new LengthFieldPrepender(4)); // 编码器:添加4字节长度头 + ch.pipeline().addLast(new LengthFieldBasedFrameDecoder( + MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, + LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP)); + // JSON 编解码(基于String) + ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); // 解码ByteBuf -> String + ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8)); // 编码String -> ByteBuf + addAdditionalInboundHandlers(ch); //握手消息处理器 ch.pipeline().addLast(inboundHandler(selfIdentifier, connectionFuture)); @@ -172,8 +188,18 @@ public class NettyConnectionInitializer implements ConnectionInitializer { protected void initChannel(final SocketChannel ch) throws Exception { //连接处理器 ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to establish connection with peer: " + remotePeer.toString())); + + ch.pipeline().addLast(new LengthFieldPrepender(4)); // 编码器:添加4字节长度头 + ch.pipeline().addLast(new LengthFieldBasedFrameDecoder( + MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, + LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP)); + // JSON 编解码(基于String) + ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); // 解码ByteBuf -> String + ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8)); // 编码String -> ByteBuf + //其他处理器 addAdditionalOutboundHandlers(ch, remotePeer); + //握手消息处理器 ch.pipeline().addLast(outboundHandler(selfIdentifier, remotePeer, connectionFuture)); } diff --git a/src/main/java/org/codenil/comm/netty/NettyPeerConnection.java b/src/main/java/dev/xiushen/andes/comm/netty/NettyPeerConnection.java similarity index 77% rename from src/main/java/org/codenil/comm/netty/NettyPeerConnection.java rename to src/main/java/dev/xiushen/andes/comm/netty/NettyPeerConnection.java index 9ddc768..eddf503 100644 --- a/src/main/java/org/codenil/comm/netty/NettyPeerConnection.java +++ b/src/main/java/dev/xiushen/andes/comm/netty/NettyPeerConnection.java @@ -1,15 +1,13 @@ -package org.codenil.comm.netty; +package dev.xiushen.andes.comm.netty; +import dev.xiushen.andes.comm.connections.AbstractPeerConnection; +import dev.xiushen.andes.comm.connections.PeerConnectionEvents; +import dev.xiushen.andes.comm.message.DisconnectMessage; +import dev.xiushen.andes.comm.message.DisconnectReason; +import dev.xiushen.andes.comm.message.Message; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; - -import org.codenil.comm.connections.AbstractPeerConnection; -import org.codenil.comm.connections.PeerConnectionEvents; -import org.codenil.comm.message.DisconnectMessage; -import org.codenil.comm.message.DisconnectReason; -import org.codenil.comm.message.Message; -import org.codenil.comm.message.RawMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,7 +17,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; public class NettyPeerConnection extends AbstractPeerConnection { - private static final Logger logger = LoggerFactory.getLogger(AbstractPeerConnection.class); + private static final Logger logger = LoggerFactory.getLogger(NettyPeerConnection.class); private final ChannelHandlerContext ctx; @@ -53,16 +51,14 @@ public class NettyPeerConnection extends AbstractPeerConnection { public void disconnect(DisconnectReason reason) { if (disconnected.compareAndSet(false, true)) { connectionEvents.dispatchDisconnect(this); - doSendMessage(DisconnectMessage.create(reason)); + doSendMessage(new DisconnectMessage(reason.message())); closeConnection(); } } @Override protected void doSendMessage(final Message message) { - RawMessage rawMessage = RawMessage.create(message.code()); - rawMessage.setData(message.data()); - ctx.channel().writeAndFlush(rawMessage); + ctx.channel().writeAndFlush(message); } @Override diff --git a/src/main/java/org/codenil/comm/serialize/SerializeHelper.java b/src/main/java/dev/xiushen/andes/comm/serialize/SerializeHelper.java similarity index 99% rename from src/main/java/org/codenil/comm/serialize/SerializeHelper.java rename to src/main/java/dev/xiushen/andes/comm/serialize/SerializeHelper.java index 5eaba23..bc29633 100644 --- a/src/main/java/org/codenil/comm/serialize/SerializeHelper.java +++ b/src/main/java/dev/xiushen/andes/comm/serialize/SerializeHelper.java @@ -1,4 +1,4 @@ -package org.codenil.comm.serialize; +package dev.xiushen.andes.comm.serialize; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; diff --git a/src/main/java/org/codenil/comm/serialize/Version.java b/src/main/java/dev/xiushen/andes/comm/serialize/Version.java similarity index 86% rename from src/main/java/org/codenil/comm/serialize/Version.java rename to src/main/java/dev/xiushen/andes/comm/serialize/Version.java index 4f56d0f..9ff1cd0 100644 --- a/src/main/java/org/codenil/comm/serialize/Version.java +++ b/src/main/java/dev/xiushen/andes/comm/serialize/Version.java @@ -1,4 +1,4 @@ -package org.codenil.comm.serialize; +package dev.xiushen.andes.comm.serialize; public class Version { diff --git a/src/main/java/org/codenil/comm/callback/ResponseCallback.java b/src/main/java/org/codenil/comm/callback/ResponseCallback.java deleted file mode 100644 index b9b27ef..0000000 --- a/src/main/java/org/codenil/comm/callback/ResponseCallback.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.codenil.comm.callback; - -import org.codenil.comm.message.DefaultMessage; -import org.codenil.comm.message.Message; - -@FunctionalInterface -public interface ResponseCallback { - Message response(DefaultMessage message); -} diff --git a/src/main/java/org/codenil/comm/handler/MessageFrameEncoder.java b/src/main/java/org/codenil/comm/handler/MessageFrameEncoder.java deleted file mode 100644 index 2a20fb1..0000000 --- a/src/main/java/org/codenil/comm/handler/MessageFrameEncoder.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.codenil.comm.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; -import org.codenil.comm.message.RawMessage; -import org.codenil.comm.serialize.SerializeHelper; - -import java.nio.charset.StandardCharsets; -import java.util.Optional; - -public class MessageFrameEncoder extends MessageToByteEncoder { - - public MessageFrameEncoder() {} - - @Override - protected void encode( - final ChannelHandlerContext ctx, - final RawMessage msg, - final ByteBuf out) { - byte[] idBytes = Optional.ofNullable(msg.requestId()).orElse("").getBytes(StandardCharsets.UTF_8); - SerializeHelper builder = new SerializeHelper(); - ByteBuf buf = builder.writeBytes(idBytes) - .writeInt(msg.code()) - .writeBytes(msg.data()) - .build(); - - out.writeBytes(buf); - buf.release(); - } -} \ No newline at end of file diff --git a/src/main/java/org/codenil/comm/handler/MessageHandler.java b/src/main/java/org/codenil/comm/handler/MessageHandler.java deleted file mode 100644 index a72bc20..0000000 --- a/src/main/java/org/codenil/comm/handler/MessageHandler.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.codenil.comm.handler; - -import io.netty.buffer.ByteBuf; -import org.codenil.comm.handshake.PlainMessage; -import org.codenil.comm.message.MessageType; -import org.codenil.comm.serialize.SerializeHelper; - -public class MessageHandler { - public static byte[] buildMessage(final PlainMessage message) { - SerializeHelper builder = new SerializeHelper(); - ByteBuf buf = builder.writeInt(message.messageType().getValue()) - .writeInt(message.code()) - .writeBytes(message.data()).build(); - - byte[] result = new byte[buf.readableBytes()]; - buf.readBytes(result); - buf.release(); - return result; - } - - public static byte[] buildMessage( - final MessageType messageType, - final int code, - final byte[] data) { - return buildMessage(new PlainMessage(messageType, code, data)); - } - - public static PlainMessage parseMessage(final ByteBuf buf) { - PlainMessage ret = null; - - buf.readerIndex(0); - - //跳过版本 - int versionLength = buf.readInt(); - buf.skipBytes(versionLength); - - int payloadLength = buf.readInt(); - if(payloadLength < 8) { - return ret; - } - - int messageType = buf.readInt(); - int code = buf.readInt(); - int dataLength = buf.readInt(); - byte[] data = new byte[dataLength]; - buf.readBytes(data); - - ret = new PlainMessage(MessageType.forNumber(messageType), code, data); - return ret; - } -} diff --git a/src/main/java/org/codenil/comm/handshake/PlainMessage.java b/src/main/java/org/codenil/comm/handshake/PlainMessage.java deleted file mode 100644 index aa376e1..0000000 --- a/src/main/java/org/codenil/comm/handshake/PlainMessage.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.codenil.comm.handshake; - -import org.codenil.comm.message.MessageType; - -public class PlainMessage { - private final MessageType messageType; - private final int code; - private final byte[] data; - - public PlainMessage(final MessageType messageType, final byte[] data) { - this(messageType, -1, data); - } - - public PlainMessage(final MessageType messageType, final int code, final byte[] data) { - this.messageType = messageType; - this.code = code; - this.data = data; - } - - public MessageType messageType() { - return messageType; - } - - public byte[] data() { - return data; - } - - public int code() { - return code; - } -} diff --git a/src/main/java/org/codenil/comm/message/AbstractMessage.java b/src/main/java/org/codenil/comm/message/AbstractMessage.java deleted file mode 100644 index 7682fcc..0000000 --- a/src/main/java/org/codenil/comm/message/AbstractMessage.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.codenil.comm.message; - -public abstract class AbstractMessage implements Message { - - private String requestId; - - private byte[] data; - - @Override - public String requestId() { - return requestId; - } - - @Override - public final int size() { - return data.length; - } - - @Override - public byte[] data() { - return data; - } - - public void setRequestId(String requestId) { - this.requestId = requestId; - } - - public void setData(byte[] data) { - this.data = data; - } -} diff --git a/src/main/java/org/codenil/comm/message/DisconnectMessage.java b/src/main/java/org/codenil/comm/message/DisconnectMessage.java deleted file mode 100644 index 2639e3a..0000000 --- a/src/main/java/org/codenil/comm/message/DisconnectMessage.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.codenil.comm.message; - -public class DisconnectMessage extends AbstractMessage { - - private DisconnectMessage(final byte[] data) { - super.setData(data); - } - - public static DisconnectMessage create(final DisconnectReason reason) { - return new DisconnectMessage(new byte[]{reason.code()}); - } - - public static DisconnectMessage readFrom(final Message message) { - if (message instanceof DisconnectMessage) { - return (DisconnectMessage) message; - } - final int code = message.code(); - if (code != MessageCodes.DISCONNECT) { - throw new IllegalArgumentException( - String.format("Message has code %d and thus is not a DisconnectMessage.", code)); - } - return new DisconnectMessage(message.data()); - } - - @Override - public int code() { - return MessageCodes.DISCONNECT; - } -} diff --git a/src/main/java/org/codenil/comm/message/EmptyMessage.java b/src/main/java/org/codenil/comm/message/EmptyMessage.java deleted file mode 100644 index e8c50fd..0000000 --- a/src/main/java/org/codenil/comm/message/EmptyMessage.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.codenil.comm.message; - -public abstract class EmptyMessage implements Message { - - @Override - public final int size() { - return 0; - } - - @Override - public byte[] data() { - return new byte[]{}; - } - - @Override - public String toString() { - return getClass().getSimpleName() + "{ code=" + code() + ", size=0}"; - } -} diff --git a/src/main/java/org/codenil/comm/message/HelloMessage.java b/src/main/java/org/codenil/comm/message/HelloMessage.java deleted file mode 100644 index f642ab3..0000000 --- a/src/main/java/org/codenil/comm/message/HelloMessage.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.codenil.comm.message; - -public class HelloMessage extends AbstractMessage { - - public HelloMessage(final byte[] data) { - super.setData(data); - } - - public static HelloMessage create(byte[] bytes) { - return new HelloMessage(new byte[0]); - } - - @Override - public int code() { - return MessageCodes.HELLO; - } -} diff --git a/src/main/java/org/codenil/comm/message/Message.java b/src/main/java/org/codenil/comm/message/Message.java deleted file mode 100644 index c4cc6e8..0000000 --- a/src/main/java/org/codenil/comm/message/Message.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.codenil.comm.message; - -public interface Message { - - String requestId(); - - int size(); - - int code(); - - byte[] data(); - - default RawMessage wrapMessage(final String requestId) { - RawMessage rawMessage = RawMessage.create(code()); - rawMessage.setRequestId(requestId); - rawMessage.setData(data()); - return rawMessage; - } -} diff --git a/src/main/java/org/codenil/comm/message/MessageType.java b/src/main/java/org/codenil/comm/message/MessageType.java deleted file mode 100644 index 6a048c3..0000000 --- a/src/main/java/org/codenil/comm/message/MessageType.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.codenil.comm.message; - -public enum MessageType { - PING(0), - PONG(1), - DATA(2), - UNRECOGNIZED(-1), - ; - - private final int value; - - MessageType(final int value) { - this.value = value; - } - - public int getValue() { - return value; - } - - public static MessageType forNumber(final int value) { - return switch (value) { - case 0 -> PING; - case 1 -> PONG; - case 2 -> DATA; - case -1 -> UNRECOGNIZED; - default -> null; - }; - } -} diff --git a/src/main/java/org/codenil/comm/message/PingMessage.java b/src/main/java/org/codenil/comm/message/PingMessage.java deleted file mode 100644 index b214724..0000000 --- a/src/main/java/org/codenil/comm/message/PingMessage.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.codenil.comm.message; - -public class PingMessage extends EmptyMessage { - private static final PingMessage INSTANCE = new PingMessage(); - - public static PingMessage get() { - return INSTANCE; - } - - private PingMessage() {} - - @Override - public String requestId() { - return ""; - } - - @Override - public int code() { - return MessageCodes.PING; - } - - @Override - public String toString() { - return "PingMessage{data=''}"; - } -} diff --git a/src/main/java/org/codenil/comm/message/PongMessage.java b/src/main/java/org/codenil/comm/message/PongMessage.java deleted file mode 100644 index e56a193..0000000 --- a/src/main/java/org/codenil/comm/message/PongMessage.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.codenil.comm.message; - -public class PongMessage extends EmptyMessage { - private static final PongMessage INSTANCE = new PongMessage(); - - public static PongMessage get() { - return INSTANCE; - } - - private PongMessage() {} - - @Override - public String requestId() { - return ""; - } - - @Override - public int code() { - return MessageCodes.PONG; - } -} diff --git a/src/main/java/org/codenil/comm/message/RawMessage.java b/src/main/java/org/codenil/comm/message/RawMessage.java deleted file mode 100644 index bd1f798..0000000 --- a/src/main/java/org/codenil/comm/message/RawMessage.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.codenil.comm.message; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.codenil.comm.serialize.SerializeHelper; - -public class RawMessage extends AbstractMessage { - - private final int code; - - protected RawMessage(final int code) { - this.code = code; - } - - public static RawMessage create( - final int code) { - return new RawMessage(code); - } - - public static RawMessage decode(byte[] messageBytes) { - ByteBuf buf = Unpooled.wrappedBuffer(messageBytes); - buf.readerIndex(0); - - int code = buf.readInt(); - - int requestIdLength = buf.readInt(); - byte[] requestIdBytes = new byte[requestIdLength]; - buf.readBytes(requestIdBytes); - - int dataLength = buf.readInt(); - byte[] dataBytes = new byte[dataLength]; - buf.readBytes(dataBytes); - - RawMessage rawMessage = create(code); - rawMessage.setRequestId(new String(requestIdBytes)); - rawMessage.setData(dataBytes); - return rawMessage; - } - - public static byte[] encode(RawMessage rawMessage) { - SerializeHelper converter = new SerializeHelper(); - - ByteBuf byteBuf = converter - .writeVersion("1.0") - .writeInt(rawMessage.code()) - .writeString(rawMessage.requestId()) - .writeBytes(rawMessage.data()) - .build(); - return converter.readPayload(byteBuf); - } - - @Override - public int code() { - return code; - } -}