把复杂的协议改成json

This commit is contained in:
alyenc 2025-04-01 01:55:41 +08:00
parent ca6a44bbed
commit 086979ef33
55 changed files with 358 additions and 665 deletions

View File

@ -1,21 +1,19 @@
package org.codenil.comm; package dev.xiushen.andes.comm;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import dev.xiushen.andes.comm.callback.ConnectCallback;
import org.codenil.comm.callback.ConnectCallback; import dev.xiushen.andes.comm.callback.DisconnectCallback;
import org.codenil.comm.callback.DisconnectCallback; import dev.xiushen.andes.comm.callback.MessageCallback;
import org.codenil.comm.callback.MessageCallback; import dev.xiushen.andes.comm.connections.ConnectionInitializer;
import org.codenil.comm.connections.ConnectionInitializer; import dev.xiushen.andes.comm.connections.PeerConnection;
import org.codenil.comm.connections.PeerConnection; import dev.xiushen.andes.comm.connections.PeerConnectionEvents;
import org.codenil.comm.connections.PeerConnectionEvents; import dev.xiushen.andes.comm.message.DisconnectReason;
import org.codenil.comm.message.DisconnectReason; import dev.xiushen.andes.comm.message.Message;
import org.codenil.comm.message.RawMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -57,17 +55,17 @@ public class Communication {
new IllegalStateException("Unable to start an already started " + getClass().getSimpleName())); new IllegalStateException("Unable to start an already started " + getClass().getSimpleName()));
} }
//注册回调监听 //注册连接成功回调
connectionEvents.subscribeConnect(this::dispatchConnect); connectionEvents.subscribeConnect(this::dispatchConnect);
//启动连接初始化 //启动连接初始化
return connectionInitializer return connectionInitializer
.start() .start()
.thenApply((socketAddress) -> { .thenApply((socketAddress) -> {
logger.info("P2P RLPx agent started and listening on {}.", socketAddress); logger.info("Communication started and listening on {}.", socketAddress);
return socketAddress.getPort(); return socketAddress.getPort();
}) })
.whenComplete((_, err) -> { .whenComplete((integer, err) -> {
if (err != null) { if (err != null) {
logger.error("Failed to start Communication. Check for port conflicts."); logger.error("Failed to start Communication. Check for port conflicts.");
} }
@ -179,14 +177,14 @@ public class Communication {
/** /**
* 收到消息后调用注册的回调 * 收到消息后调用注册的回调
*/ */
private void dispatchMessage(final PeerConnection connection, final RawMessage message) { private void dispatchMessage(final PeerConnection connection, final String message) {
connectionEvents.dispatchMessage(connection, message); connectionEvents.dispatchMessage(connection, message);
} }
/** /**
* 收到指定code消息后调用指定回调 * 收到指定code消息后调用指定回调
*/ */
private void dispatchMessageByCode(final int code, final PeerConnection connection, final RawMessage message) { private void dispatchMessageByCode(final int code, final PeerConnection connection, final String message) {
connectionEvents.dispatchMessageByCode(code, connection, message); connectionEvents.dispatchMessageByCode(code, connection, message);
} }
} }

View File

@ -1,4 +1,4 @@
package org.codenil.comm; package dev.xiushen.andes.comm;
public class NetworkConfig { public class NetworkConfig {

View File

@ -1,17 +1,22 @@
package org.codenil.comm; package dev.xiushen.andes.comm;
import org.codenil.comm.callback.ConnectCallback; import dev.xiushen.andes.comm.callback.ConnectCallback;
import org.codenil.comm.callback.DisconnectCallback; import dev.xiushen.andes.comm.callback.DisconnectCallback;
import org.codenil.comm.callback.MessageCallback; import dev.xiushen.andes.comm.callback.MessageCallback;
import org.codenil.comm.connections.*; import dev.xiushen.andes.comm.connections.ConnectionInitializer;
import org.codenil.comm.message.DisconnectReason; import dev.xiushen.andes.comm.connections.PeerConnection;
import org.codenil.comm.message.Message; import dev.xiushen.andes.comm.connections.PeerConnectionEvents;
import org.codenil.comm.netty.NettyConnectionInitializer; import dev.xiushen.andes.comm.message.DisconnectReason;
import dev.xiushen.andes.comm.message.Message;
import dev.xiushen.andes.comm.netty.NettyConnectionInitializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.time.Clock; import java.time.Clock;
import java.util.*; import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -58,7 +63,7 @@ public class NetworkService {
if (stopped.compareAndSet(false, true)) { if (stopped.compareAndSet(false, true)) {
logger.info("Stopping Network."); logger.info("Stopping Network.");
CompletableFuture<Void> stop = communication.stop(); CompletableFuture<Void> stop = communication.stop();
return stop.whenComplete((result, throwable) -> { return stop.whenComplete((Void, throwable) -> {
shutdown.countDown(); shutdown.countDown();
}); });
} else { } else {
@ -77,8 +82,8 @@ public class NetworkService {
/** /**
* 断开连接 * 断开连接
*/ */
public void disconnect(final String pkiId, final DisconnectReason reason) { public void disconnect(final RemotePeer remotePeer, final DisconnectReason reason) {
PeerConnection connection = aliveConnections.get(pkiId); PeerConnection connection = aliveConnections.get(remotePeer.identifier());
if(Objects.nonNull(connection)) { if(Objects.nonNull(connection)) {
try { try {
connection.disconnect(reason); connection.disconnect(reason);
@ -90,12 +95,12 @@ public class NetworkService {
/** /**
* 发送消息 * 发送消息
* @param pkiId 远程节点的PKIID * @param remotePeer 远程节点RemotePeer
* @param message 发送的信息 * @param message 发送的信息
*/ */
public void send(final String pkiId, final Message message) { public void send(final RemotePeer remotePeer, final Message message) {
try { try {
PeerConnection connection = aliveConnections.get(pkiId); PeerConnection connection = aliveConnections.get(remotePeer.identifier());
if (Objects.nonNull(connection)) { if (Objects.nonNull(connection)) {
connection.send(message); connection.send(message);
} }
@ -124,8 +129,8 @@ public class NetworkService {
communication.subscribeConnect(newConnection -> { communication.subscribeConnect(newConnection -> {
synchronized (this) { synchronized (this) {
callback.onConnect(newConnection); callback.onConnect(newConnection);
aliveConnections.put(newConnection.pkiId(), newConnection); aliveConnections.put(newConnection.remoteIdentifier(), newConnection);
deadConnections.remove(newConnection.pkiId(), newConnection); deadConnections.remove(newConnection.remoteIdentifier(), newConnection);
} }
}); });
} }
@ -138,8 +143,8 @@ public class NetworkService {
communication.subscribeDisconnect(connection -> { communication.subscribeDisconnect(connection -> {
synchronized (this) { synchronized (this) {
callback.onDisconnect(connection); callback.onDisconnect(connection);
aliveConnections.remove(connection.pkiId(), connection); aliveConnections.remove(connection.remoteIdentifier(), connection);
deadConnections.put(connection.pkiId(), connection); deadConnections.put(connection.remoteIdentifier(), connection);
} }
}); });
} }
@ -161,5 +166,4 @@ public class NetworkService {
public List<PeerConnection> aliveConnections() { public List<PeerConnection> aliveConnections() {
return new ArrayList<>(aliveConnections.values()); return new ArrayList<>(aliveConnections.values());
} }
} }

View File

@ -1,8 +1,8 @@
package org.codenil.comm; package dev.xiushen.andes.comm;
public class RemotePeer { public class RemotePeer {
private String pkiId; private String identifier;
private final String endpoint; private final String endpoint;
@ -11,8 +11,8 @@ public class RemotePeer {
this.endpoint = endpoint; this.endpoint = endpoint;
} }
public String pkiId() { public String identifier() {
return pkiId; return identifier;
} }
public String endpoint() { public String endpoint() {
@ -20,11 +20,11 @@ public class RemotePeer {
} }
public void setPkiId(String pkiId) { public void setPkiId(String pkiId) {
this.pkiId = pkiId; this.identifier = pkiId;
} }
public String toString() { public String toString() {
return String.format("%s, pkiId:%s", this.endpoint(), this.pkiId); return String.format("%s, identifier:%s", this.endpoint(), this.identifier);
} }
} }

View File

@ -1,8 +1,7 @@
package org.codenil.comm; package dev.xiushen.andes.comm;
import org.codenil.comm.connections.PeerConnection; import dev.xiushen.andes.comm.connections.PeerConnection;
import org.codenil.comm.message.MessageCodes; import dev.xiushen.andes.comm.message.MessageCodes;
import org.codenil.comm.message.RawMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -22,7 +21,7 @@ public class Test {
private static void server() throws Exception { private static void server() throws Exception {
NetworkConfig config = new NetworkConfig(); NetworkConfig config = new NetworkConfig();
config.setBindHost("192.168.8.30"); config.setBindHost("192.168.0.26");
config.setBindPort(8080); config.setBindPort(8080);
NetworkService service = new NetworkService(config, ""); NetworkService service = new NetworkService(config, "");
@ -31,29 +30,27 @@ public class Test {
start.whenComplete((res, err) -> { start.whenComplete((res, err) -> {
service.subscribeMessageByCode(MessageCodes.GOSSIP, message -> { service.subscribeMessageByCode(MessageCodes.GOSSIP, message -> {
logger.info("接收到消息:" + message.message().code()); logger.info("接收到消息:{}", message.message());
}); });
}); });
} }
private static void client() throws Exception { private static void client() throws Exception {
NetworkConfig config = new NetworkConfig(); NetworkConfig config = new NetworkConfig();
config.setBindHost("192.168.8.30"); config.setBindHost("192.168.31.58");
config.setBindPort(8090); config.setBindPort(9091);
NetworkService service = new NetworkService(config, ""); NetworkService service = new NetworkService(config, "");
CompletableFuture<Integer> start = service.start(); CompletableFuture<Integer> start = service.start();
start.whenComplete((res, err) -> { start.whenComplete((res, err) -> {
RemotePeer remotePeer = new RemotePeer("192.168.8.30:8080"); RemotePeer remotePeer = new RemotePeer("192.168.31.58:9090");
CompletableFuture<PeerConnection> conn = service.connect(remotePeer); CompletableFuture<PeerConnection> conn = service.connect(remotePeer);
conn.whenComplete((cres, cerr) -> { conn.whenComplete((cres, cerr) -> {
if (cerr == null) { if (cerr == null) {
try { try {
RawMessage rawMessage = RawMessage.create(MessageCodes.GOSSIP);
rawMessage.setData("test".getBytes(StandardCharsets.UTF_8));
service.send(remotePeer.pkiId(), rawMessage);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -1,6 +1,6 @@
package org.codenil.comm.callback; package dev.xiushen.andes.comm.callback;
import org.codenil.comm.connections.PeerConnection; import dev.xiushen.andes.comm.connections.PeerConnection;
/** /**
* 连接回调 * 连接回调

View File

@ -1,6 +1,6 @@
package org.codenil.comm.callback; package dev.xiushen.andes.comm.callback;
import org.codenil.comm.connections.PeerConnection; import dev.xiushen.andes.comm.connections.PeerConnection;
@FunctionalInterface @FunctionalInterface
public interface DisconnectCallback { public interface DisconnectCallback {

View File

@ -1,9 +1,8 @@
package org.codenil.comm.callback; package dev.xiushen.andes.comm.callback;
import org.codenil.comm.message.DefaultMessage; import dev.xiushen.andes.comm.message.DefaultMessage;
@FunctionalInterface @FunctionalInterface
public interface MessageCallback { public interface MessageCallback {
void onMessage(final DefaultMessage message); void onMessage(final DefaultMessage message);
} }

View File

@ -0,0 +1,9 @@
package dev.xiushen.andes.comm.callback;
import dev.xiushen.andes.comm.message.DefaultMessage;
import dev.xiushen.andes.comm.message.Message;
@FunctionalInterface
public interface ResponseCallback {
Message response(DefaultMessage message);
}

View File

@ -1,12 +1,11 @@
package org.codenil.comm.connections; package dev.xiushen.andes.comm.connections;
import dev.xiushen.andes.comm.RemotePeer;
import dev.xiushen.andes.comm.message.Message;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import org.codenil.comm.RemotePeer;
import org.codenil.comm.message.Message;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public abstract class AbstractPeerConnection implements PeerConnection { public abstract class AbstractPeerConnection implements PeerConnection {
@ -26,14 +25,6 @@ public abstract class AbstractPeerConnection implements PeerConnection {
this.remoteIdentifier = remoteIdentifier; this.remoteIdentifier = remoteIdentifier;
} }
@Override
public String pkiId() {
if(Objects.isNull(remotePeer)) {
throw new IllegalStateException("connection not complated yet");
}
return remotePeer.pkiId();
}
@Override @Override
public void send(final Message message) { public void send(final Message message) {
doSendMessage(message); doSendMessage(message);

View File

@ -1,6 +1,6 @@
package org.codenil.comm.connections; package dev.xiushen.andes.comm.connections;
import org.codenil.comm.RemotePeer; import dev.xiushen.andes.comm.RemotePeer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;

View File

@ -1,12 +1,11 @@
package org.codenil.comm.connections; package dev.xiushen.andes.comm.connections;
import dev.xiushen.andes.comm.message.DisconnectReason;
import dev.xiushen.andes.comm.message.PingMessage;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import org.codenil.comm.message.DisconnectReason;
import org.codenil.comm.message.PingMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -46,7 +45,7 @@ public class KeepAlive extends ChannelDuplexHandler {
try { try {
logger.debug("Idle connection detected, sending Wire PING to peer."); logger.debug("Idle connection detected, sending Wire PING to peer.");
connection.send(PingMessage.get()); connection.send(new PingMessage());
waitingForPong.set(true); waitingForPong.set(true);
} catch (Exception e) { } catch (Exception e) {
logger.trace("PING not sent because peer is already disconnected"); logger.trace("PING not sent because peer is already disconnected");

View File

@ -1,14 +1,12 @@
package org.codenil.comm.connections; package dev.xiushen.andes.comm.connections;
import dev.xiushen.andes.comm.RemotePeer;
import dev.xiushen.andes.comm.message.DisconnectReason;
import dev.xiushen.andes.comm.message.Message;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import org.codenil.comm.RemotePeer;
import org.codenil.comm.message.DisconnectReason;
import org.codenil.comm.message.Message;
public interface PeerConnection { public interface PeerConnection {
String pkiId();
String remoteIdentifier(); String remoteIdentifier();
boolean disconnected(); boolean disconnected();

View File

@ -1,10 +1,9 @@
package org.codenil.comm.connections; package dev.xiushen.andes.comm.connections;
import org.codenil.comm.callback.ConnectCallback; import dev.xiushen.andes.comm.callback.ConnectCallback;
import org.codenil.comm.callback.DisconnectCallback; import dev.xiushen.andes.comm.callback.DisconnectCallback;
import org.codenil.comm.callback.MessageCallback; import dev.xiushen.andes.comm.callback.MessageCallback;
import org.codenil.comm.message.DefaultMessage; import dev.xiushen.andes.comm.message.DefaultMessage;
import org.codenil.comm.message.RawMessage;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -36,7 +35,7 @@ public class PeerConnectionEvents {
public void subscribeByCode(final int messageCode, final MessageCallback callback) { public void subscribeByCode(final int messageCode, final MessageCallback callback) {
subscribersByCode subscribersByCode
.computeIfAbsent(messageCode, _ -> Subscribers.create()) .computeIfAbsent(messageCode, integer -> Subscribers.create())
.subscribe(callback); .subscribe(callback);
} }
@ -50,12 +49,12 @@ public class PeerConnectionEvents {
disconnectSubscribers.forEach(s -> s.onDisconnect(connection)); disconnectSubscribers.forEach(s -> s.onDisconnect(connection));
} }
public void dispatchMessage(final PeerConnection connection, final RawMessage message) { public void dispatchMessage(final PeerConnection connection, final String message) {
final DefaultMessage msg = new DefaultMessage(connection, message); final DefaultMessage msg = new DefaultMessage(connection, message);
messageSubscribers.forEach(s -> s.onMessage(msg)); messageSubscribers.forEach(s -> s.onMessage(msg));
} }
public void dispatchMessageByCode(final int code, final PeerConnection connection, final RawMessage message) { public void dispatchMessageByCode(final int code, final PeerConnection connection, final String message) {
final DefaultMessage msg = new DefaultMessage(connection, message); final DefaultMessage msg = new DefaultMessage(connection, message);
subscribersByCode.get(code).forEach(s -> s.onMessage(msg)); subscribersByCode.get(code).forEach(s -> s.onMessage(msg));
} }

View File

@ -1,4 +1,4 @@
package org.codenil.comm.connections; package dev.xiushen.andes.comm.connections;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;

View File

@ -1,19 +1,19 @@
package org.codenil.comm.handler; package dev.xiushen.andes.comm.handler;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import dev.xiushen.andes.comm.connections.PeerConnection;
import dev.xiushen.andes.comm.connections.PeerConnectionEvents;
import dev.xiushen.andes.comm.message.MessageCodes;
import dev.xiushen.andes.comm.message.PongMessage;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import org.codenil.comm.connections.PeerConnection;
import org.codenil.comm.connections.PeerConnectionEvents;
import org.codenil.comm.message.MessageCodes;
import org.codenil.comm.message.PongMessage;
import org.codenil.comm.message.RawMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class CommonHandler extends SimpleChannelInboundHandler<RawMessage> { public class CommonHandler extends SimpleChannelInboundHandler<String> {
private static final Logger logger = LoggerFactory.getLogger(CommonHandler.class); private static final Logger logger = LoggerFactory.getLogger(CommonHandler.class);
@ -31,13 +31,15 @@ public class CommonHandler extends SimpleChannelInboundHandler<RawMessage> {
} }
@Override @Override
protected void channelRead0(final ChannelHandlerContext ctx, final RawMessage originalMessage) { protected void channelRead0(final ChannelHandlerContext ctx, final String message) {
logger.debug("Received a message from {}", originalMessage.code()); JsonObject jsonObject = new Gson().fromJson(message, JsonObject.class);
switch (originalMessage.code()) { int code = jsonObject.getAsJsonPrimitive("code").getAsInt();
logger.debug("Received a message from {}", code);
switch (code) {
case MessageCodes.PING: case MessageCodes.PING:
logger.trace("Received Wire PING"); logger.trace("Received Wire PING");
try { try {
connection.send(PongMessage.get()); connection.send(new PongMessage());
} catch (Exception e) { } catch (Exception e) {
// Nothing to do // Nothing to do
} }
@ -55,7 +57,7 @@ public class CommonHandler extends SimpleChannelInboundHandler<RawMessage> {
connection.terminateConnection(); connection.terminateConnection();
} }
connectionEvents.dispatchMessage(connection, originalMessage); connectionEvents.dispatchMessage(connection, message);
} }
@Override @Override

View File

@ -1,32 +1,31 @@
package org.codenil.comm.handler; package dev.xiushen.andes.comm.handler;
import io.netty.buffer.ByteBuf; import com.google.gson.Gson;
import com.google.gson.JsonObject;
import dev.xiushen.andes.comm.connections.KeepAlive;
import dev.xiushen.andes.comm.connections.PeerConnection;
import dev.xiushen.andes.comm.connections.PeerConnectionEvents;
import dev.xiushen.andes.comm.message.DisconnectMessage;
import dev.xiushen.andes.comm.message.DisconnectReason;
import dev.xiushen.andes.comm.message.HelloMessage;
import dev.xiushen.andes.comm.message.MessageCodes;
import dev.xiushen.andes.comm.netty.NettyPeerConnection;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import org.codenil.comm.connections.KeepAlive;
import org.codenil.comm.connections.PeerConnection;
import org.codenil.comm.connections.PeerConnectionEvents;
import org.codenil.comm.message.DisconnectMessage;
import org.codenil.comm.message.DisconnectReason;
import org.codenil.comm.message.MessageCodes;
import org.codenil.comm.message.RawMessage;
import org.codenil.comm.netty.NettyPeerConnection;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class MessageFrameDecoder extends ByteToMessageDecoder { public class MessageFrameDecoder extends MessageToMessageDecoder<String> {
private static final Logger logger = LoggerFactory.getLogger(MessageFrameDecoder.class); private static final Logger logger = LoggerFactory.getLogger(MessageFrameDecoder.class);
private static final int LENGTH_FIELD_LENGTH = 4; // 长度字段占4字节
private final CompletableFuture<PeerConnection> connectionFuture; private final CompletableFuture<PeerConnection> connectionFuture;
private final PeerConnectionEvents connectionEvents; private final PeerConnectionEvents connectionEvents;
@ -41,46 +40,17 @@ public class MessageFrameDecoder extends ByteToMessageDecoder {
} }
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, String message, List<Object> out) {
if (byteBuf.readableBytes() < 4) {
return; // 不足4字节长度字段等待更多数据
}
byteBuf.readerIndex(0);
// 读取协议头消息总长度
int totalLength = byteBuf.readInt();
if (byteBuf.readableBytes() < totalLength - 4) {
return; // 不足消息总长度等待更多数据
}
// 读取payload
// 读取id
int idLength = byteBuf.readInt();
byte[] idBytes = new byte[idLength];
byteBuf.readBytes(idBytes);
String id = new String(idBytes, StandardCharsets.UTF_8);
// 读取code
int code = byteBuf.readInt();
// 读取data
int dataLength = byteBuf.readInt();;
byte[] data = new byte[dataLength];
byteBuf.readBytes(data);
// 创建消息对象 // 创建消息对象
RawMessage message = RawMessage.create(code); JsonObject jsonObject = new Gson().fromJson(message, JsonObject.class);
message.setRequestId(id); int code = jsonObject.getAsJsonPrimitive("code").getAsInt();
message.setData(data);
if (hellosExchanged) { if (hellosExchanged) {
out.add(message); out.add(message);
} else if (message.code() == MessageCodes.HELLO) { } else if (code == MessageCodes.HELLO) {
hellosExchanged = true; hellosExchanged = true;
String remoteIdentifier = new String(message.data()); HelloMessage helloMessage = new Gson().fromJson(message, HelloMessage.class);
String remoteIdentifier = helloMessage.getData();
final PeerConnection connection = new NettyPeerConnection(ctx, remoteIdentifier, connectionEvents); final PeerConnection connection = new NettyPeerConnection(ctx, remoteIdentifier, connectionEvents);
/* /*
@ -98,24 +68,22 @@ public class MessageFrameDecoder extends ByteToMessageDecoder {
.addLast("Common", new CommonHandler(connection, connectionEvents, waitingForPong)) .addLast("Common", new CommonHandler(connection, connectionEvents, waitingForPong))
.addLast("FrameEncoder", new MessageFrameEncoder()); .addLast("FrameEncoder", new MessageFrameEncoder());
connectionFuture.complete(connection); connectionFuture.complete(connection);
} else if (message.code() == MessageCodes.DISCONNECT) { } else if (code == MessageCodes.DISCONNECT) {
logger.debug("Disconnected before sending HELLO."); logger.debug("Disconnected before sending HELLO.");
ctx.close(); ctx.close();
connectionFuture.completeExceptionally(new RuntimeException("Disconnect")); connectionFuture.completeExceptionally(new RuntimeException("Disconnect"));
} else { } else {
if(code != MessageCodes.PONG) {
logger.debug( logger.debug(
"Message received before HELLO's exchanged, disconnecting. Code: {}, Data: {}", "Message received before HELLO's exchanged, disconnecting. Code: {}",
message.code(), Arrays.toString(message.data())); code);
DisconnectMessage disconnectMessage = DisconnectMessage.create(DisconnectReason.UNKNOWN); DisconnectMessage disconnectMessage = new DisconnectMessage(DisconnectReason.UNKNOWN.message());
ctx.writeAndFlush(disconnectMessage).addListener(Void -> ctx.close());
RawMessage rawMessage = RawMessage.create(disconnectMessage.code());
rawMessage.setData(disconnectMessage.data());
ctx.writeAndFlush(rawMessage)
.addListener(_ -> ctx.close());
connectionFuture.completeExceptionally(new RuntimeException("Message received before HELLO's exchanged")); connectionFuture.completeExceptionally(new RuntimeException("Message received before HELLO's exchanged"));
} }
} }
}
@Override @Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable throwable) public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable throwable)

View File

@ -0,0 +1,18 @@
package dev.xiushen.andes.comm.handler;
import com.google.gson.Gson;
import dev.xiushen.andes.comm.message.Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
public class MessageFrameEncoder extends MessageToMessageEncoder<Message> {
public MessageFrameEncoder() {}
@Override
protected void encode(ChannelHandlerContext ctx, Message message, List<Object> list) {
list.add(new Gson().toJson(message));
}
}

View File

@ -1,4 +1,4 @@
package org.codenil.comm.handler; package dev.xiushen.andes.comm.handler;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
@ -21,7 +21,7 @@ public class TimeoutHandler<C extends Channel> extends ChannelInitializer<C> {
} }
@Override @Override
protected void initChannel(final C ch) throws Exception { protected void initChannel(final C ch) {
ch.eventLoop().schedule(() -> { ch.eventLoop().schedule(() -> {
if (!condition.get()) { if (!condition.get()) {
callback.invoke(); callback.invoke();

View File

@ -1,25 +1,23 @@
package org.codenil.comm.handshake; package dev.xiushen.andes.comm.handshake;
import io.netty.buffer.ByteBuf; import com.google.gson.Gson;
import dev.xiushen.andes.comm.connections.PeerConnection;
import dev.xiushen.andes.comm.connections.PeerConnectionEvents;
import dev.xiushen.andes.comm.handler.MessageFrameDecoder;
import dev.xiushen.andes.comm.message.HelloMessage;
import dev.xiushen.andes.comm.message.Message;
import dev.xiushen.andes.comm.message.MessageCodes;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToMessageEncoder;
import org.codenil.comm.connections.PeerConnection;
import org.codenil.comm.connections.PeerConnectionEvents;
import org.codenil.comm.handler.MessageFrameDecoder;
import org.codenil.comm.message.HelloMessage;
import org.codenil.comm.message.MessageCodes;
import org.codenil.comm.message.RawMessage;
import org.codenil.comm.serialize.SerializeHelper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> { public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandler<String> {
private static final Logger logger = LoggerFactory.getLogger(AbstractHandshakeHandler.class); private static final Logger logger = LoggerFactory.getLogger(AbstractHandshakeHandler.class);
@ -43,8 +41,8 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl
} }
@Override @Override
protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) { protected void channelRead0(final ChannelHandlerContext ctx, final String msg) {
final Optional<ByteBuf> nextMsg = nextHandshakeMessage(msg); final Optional<String> nextMsg = nextHandshakeMessage(msg);
if (nextMsg.isPresent()) { if (nextMsg.isPresent()) {
ctx.writeAndFlush(nextMsg.get()); ctx.writeAndFlush(nextMsg.get());
@ -67,18 +65,13 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl
* 替换完编解码器后发送Hello消息 * 替换完编解码器后发送Hello消息
* hello消息需要带一些数据 * hello消息需要带一些数据
*/ */
HelloMessage helloMessage = HelloMessage.create(selfIdentifier.getBytes(StandardCharsets.UTF_8)); HelloMessage helloMessage = new HelloMessage(selfIdentifier);
RawMessage rawMessage = RawMessage.create(helloMessage.code()); ctx.writeAndFlush(helloMessage)
rawMessage.setData(helloMessage.data());
rawMessage.setRequestId(helloMessage.requestId());
ctx.writeAndFlush(rawMessage)
.addListener(ff -> { .addListener(ff -> {
if (ff.isSuccess()) { if (ff.isSuccess()) {
logger.trace("Successfully wrote hello message"); logger.trace("Successfully wrote hello message");
} }
}); });
msg.retain();
ctx.fireChannelRead(msg); ctx.fireChannelRead(msg);
} }
} }
@ -90,31 +83,22 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl
ctx.close(); ctx.close();
} }
protected abstract Optional<ByteBuf> nextHandshakeMessage(ByteBuf msg); protected abstract Optional<String> nextHandshakeMessage(String msg);
/** Ensures that wire hello message is the first message written. */ /** Ensures that wire hello message is the first message written. */
private static class FirstMessageFrameEncoder extends MessageToByteEncoder<RawMessage> { private static class FirstMessageFrameEncoder extends MessageToMessageEncoder<Message> {
private FirstMessageFrameEncoder() {} private FirstMessageFrameEncoder() {}
@Override @Override
protected void encode( protected void encode(
final ChannelHandlerContext context, final ChannelHandlerContext context,
final RawMessage msg, final Message msg,
final ByteBuf out) { final List<Object> list) {
if (msg.code() != MessageCodes.HELLO) { if (msg.getCode() != MessageCodes.HELLO) {
throw new IllegalStateException("First wire message sent wasn't a HELLO."); throw new IllegalStateException("First wire message sent wasn't a HELLO.");
} }
byte[] idBytes = Optional.ofNullable(msg.requestId()).orElse("").getBytes(StandardCharsets.UTF_8); list.add(new Gson().toJson(msg));
SerializeHelper builder = new SerializeHelper();
ByteBuf buf = builder.writeBytes(idBytes)
.writeInt(msg.code())
.writeBytes(msg.data())
.build();
out.writeBytes(buf);
buf.release();
context.pipeline().remove(this); context.pipeline().remove(this);
} }
} }

View File

@ -1,8 +1,7 @@
package org.codenil.comm.handshake; package dev.xiushen.andes.comm.handshake;
import io.netty.buffer.ByteBuf; import dev.xiushen.andes.comm.connections.PeerConnection;
import org.codenil.comm.connections.PeerConnection; import dev.xiushen.andes.comm.connections.PeerConnectionEvents;
import org.codenil.comm.connections.PeerConnectionEvents;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -22,8 +21,8 @@ public class HandshakeHandlerInbound extends AbstractHandshakeHandler {
} }
@Override @Override
protected Optional<ByteBuf> nextHandshakeMessage(ByteBuf msg) { protected Optional<String> nextHandshakeMessage(String msg) {
final Optional<ByteBuf> nextMsg; final Optional<String> nextMsg;
if (handshaker.getStatus() == HandshakeStatus.IN_PROGRESS) { if (handshaker.getStatus() == HandshakeStatus.IN_PROGRESS) {
nextMsg = handshaker.handleMessage(msg); nextMsg = handshaker.handleMessage(msg);
} else { } else {

View File

@ -1,10 +1,8 @@
package org.codenil.comm.handshake; package dev.xiushen.andes.comm.handshake;
import io.netty.buffer.ByteBuf; import dev.xiushen.andes.comm.connections.PeerConnection;
import dev.xiushen.andes.comm.connections.PeerConnectionEvents;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.codenil.comm.connections.PeerConnection;
import org.codenil.comm.connections.PeerConnectionEvents;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -16,9 +14,9 @@ import java.util.concurrent.CompletableFuture;
*/ */
public class HandshakeHandlerOutbound extends AbstractHandshakeHandler { public class HandshakeHandlerOutbound extends AbstractHandshakeHandler {
private static final Logger logger = LoggerFactory.getLogger(AbstractHandshakeHandler.class); private static final Logger logger = LoggerFactory.getLogger(HandshakeHandlerOutbound.class);
private final ByteBuf first; private final String first;
public HandshakeHandlerOutbound( public HandshakeHandlerOutbound(
final String selfIdentifier, final String selfIdentifier,
@ -32,8 +30,8 @@ public class HandshakeHandlerOutbound extends AbstractHandshakeHandler {
} }
@Override @Override
protected Optional<ByteBuf> nextHandshakeMessage(ByteBuf msg) { protected Optional<String> nextHandshakeMessage(String msg) {
final Optional<ByteBuf> nextMsg; final Optional<String> nextMsg;
if (handshaker.getStatus() == HandshakeStatus.IN_PROGRESS) { if (handshaker.getStatus() == HandshakeStatus.IN_PROGRESS) {
nextMsg = handshaker.handleMessage(msg); nextMsg = handshaker.handleMessage(msg);
} else { } else {

View File

@ -1,4 +1,4 @@
package org.codenil.comm.handshake; package dev.xiushen.andes.comm.handshake;
public enum HandshakeStatus { public enum HandshakeStatus {
UNINITIALIZED, UNINITIALIZED,

View File

@ -1,6 +1,4 @@
package org.codenil.comm.handshake; package dev.xiushen.andes.comm.handshake;
import io.netty.buffer.ByteBuf;
import java.util.Optional; import java.util.Optional;
@ -12,7 +10,7 @@ public interface Handshaker {
HandshakeStatus getStatus(); HandshakeStatus getStatus();
ByteBuf firstMessage(); String firstMessage();
Optional<ByteBuf> handleMessage(ByteBuf buf); Optional<String> handleMessage(String buf);
} }

View File

@ -1,10 +1,10 @@
package org.codenil.comm.handshake; package dev.xiushen.andes.comm.handshake;
import io.netty.buffer.ByteBuf; import com.google.gson.Gson;
import io.netty.buffer.Unpooled; import com.google.gson.JsonObject;
import dev.xiushen.andes.comm.message.MessageCodes;
import org.codenil.comm.handler.MessageHandler; import dev.xiushen.andes.comm.message.PingMessage;
import org.codenil.comm.message.MessageType; import dev.xiushen.andes.comm.message.PongMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -15,14 +15,14 @@ import static com.google.common.base.Preconditions.checkState;
public class PlainHandshaker implements Handshaker { public class PlainHandshaker implements Handshaker {
private static final Logger logger = LoggerFactory.getLogger(AbstractHandshakeHandler.class); private static final Logger logger = LoggerFactory.getLogger(PlainHandshaker.class);
private final AtomicReference<HandshakeStatus> status = private final AtomicReference<HandshakeStatus> status =
new AtomicReference<>(HandshakeStatus.UNINITIALIZED); new AtomicReference<>(HandshakeStatus.UNINITIALIZED);
private boolean initiator; private boolean initiator;
private byte[] initiatorMsg; private String initiatorMsg;
private byte[] responderMsg; private String responderMsg;
@Override @Override
public void prepareInitiator() { public void prepareInitiator() {
@ -46,42 +46,41 @@ public class PlainHandshaker implements Handshaker {
} }
@Override @Override
public ByteBuf firstMessage() { public String firstMessage() {
checkState(initiator, "illegal invocation of firstMessage on non-initiator end of handshake"); checkState(initiator, "illegal invocation of firstMessage on non-initiator end of handshake");
checkState(status.compareAndSet(HandshakeStatus.PREPARED, HandshakeStatus.IN_PROGRESS), checkState(status.compareAndSet(HandshakeStatus.PREPARED, HandshakeStatus.IN_PROGRESS),
"illegal invocation of firstMessage, handshake had already started"); "illegal invocation of firstMessage, handshake had already started");
initiatorMsg = MessageHandler.buildMessage(MessageType.PING, MessageType.PING.getValue(), new byte[0]);
logger.trace("First plain handshake message under INITIATOR role"); logger.trace("First plain handshake message under INITIATOR role");
return Unpooled.wrappedBuffer(initiatorMsg); return new Gson().toJson(new PingMessage());
} }
@Override @Override
public Optional<ByteBuf> handleMessage(ByteBuf buf) { public Optional<String> handleMessage(String message) {
checkState(status.get() == HandshakeStatus.IN_PROGRESS, checkState(status.get() == HandshakeStatus.IN_PROGRESS,
"illegal invocation of onMessage on handshake that is not in progress"); "illegal invocation of onMessage on handshake that is not in progress");
PlainMessage message = MessageHandler.parseMessage(buf); JsonObject jsonObject = new Gson().fromJson(message, JsonObject.class);
Optional<byte[]> nextMsg = Optional.empty(); int code = jsonObject.getAsJsonPrimitive("code").getAsInt();
Optional<String> nextMsg = Optional.empty();
if (initiator) { if (initiator) {
checkState(responderMsg == null, checkState(responderMsg == null,
"unexpected message: responder message had " + "already been received"); "unexpected message: responder message had " + "already been received");
checkState(message.messageType().equals(MessageType.PONG), checkState(code == MessageCodes.PONG,
"unexpected message: needs to be a pong"); "unexpected message: needs to be a pong");
responderMsg = message.data(); responderMsg = message;
} else { } else {
checkState(initiatorMsg == null, checkState(initiatorMsg == null,
"unexpected message: initiator message " + "had already been received"); "unexpected message: initiator message " + "had already been received");
checkState(message.messageType().equals(MessageType.PING), checkState(code == MessageCodes.PING,
"unexpected message: needs to be a ping"); "unexpected message: needs to be a ping");
initiatorMsg = message.data(); initiatorMsg = message;
responderMsg = MessageHandler.buildMessage(MessageType.PONG, MessageType.PONG.getValue(), new byte[0]); responderMsg = new Gson().toJson(new PongMessage());
nextMsg = Optional.of(responderMsg); nextMsg = Optional.of(responderMsg);
} }
status.set(HandshakeStatus.SUCCESS); status.set(HandshakeStatus.SUCCESS);
logger.trace("Handshake status set to {}", status.get()); logger.trace("Handshake status set to {}", status.get());
return nextMsg.map(Unpooled::wrappedBuffer); return nextMsg;
} }
} }

View File

@ -0,0 +1,14 @@
package dev.xiushen.andes.comm.message;
public abstract class AbstractMessage implements Message {
private final int code;
public AbstractMessage(final int code) {
this.code = code;
}
public int getCode() {
return code;
}
}

View File

@ -0,0 +1,15 @@
package dev.xiushen.andes.comm.message;
public abstract class DataMessage extends AbstractMessage {
private final String data;
public DataMessage(final int code, final String data) {
super(code);
this.data = data;
}
public String getData() {
return data;
}
}

View File

@ -1,21 +1,21 @@
package org.codenil.comm.message; package dev.xiushen.andes.comm.message;
import org.codenil.comm.connections.PeerConnection; import dev.xiushen.andes.comm.connections.PeerConnection;
public class DefaultMessage { public class DefaultMessage {
private final RawMessage message; private final String message;
private final PeerConnection connection; private final PeerConnection connection;
public DefaultMessage( public DefaultMessage(
final PeerConnection connection, final PeerConnection connection,
final RawMessage message) { final String message) {
this.message = message; this.message = message;
this.connection = connection; this.connection = connection;
} }
public RawMessage message() { public String message() {
return message; return message;
} }

View File

@ -1,6 +1,6 @@
package org.codenil.comm.message; package dev.xiushen.andes.comm.message;
import org.codenil.comm.connections.PeerConnection; import dev.xiushen.andes.comm.connections.PeerConnection;
@FunctionalInterface @FunctionalInterface
public interface DisconnectCallback { public interface DisconnectCallback {

View File

@ -0,0 +1,8 @@
package dev.xiushen.andes.comm.message;
public class DisconnectMessage extends DataMessage {
public DisconnectMessage(final String data) {
super(MessageCodes.DISCONNECT, data);
}
}

View File

@ -1,4 +1,4 @@
package org.codenil.comm.message; package dev.xiushen.andes.comm.message;
public enum DisconnectReason { public enum DisconnectReason {

View File

@ -0,0 +1,8 @@
package dev.xiushen.andes.comm.message;
public abstract class EmptyMessage extends AbstractMessage {
public EmptyMessage(int code) {
super(code);
}
}

View File

@ -0,0 +1,8 @@
package dev.xiushen.andes.comm.message;
public class HelloMessage extends DataMessage {
public HelloMessage(final String data) {
super(MessageCodes.HELLO, data);
}
}

View File

@ -0,0 +1,6 @@
package dev.xiushen.andes.comm.message;
public interface Message {
int getCode();
}

View File

@ -1,4 +1,4 @@
package org.codenil.comm.message; package dev.xiushen.andes.comm.message;
@FunctionalInterface @FunctionalInterface
public interface MessageCallback { public interface MessageCallback {

View File

@ -1,4 +1,4 @@
package org.codenil.comm.message; package dev.xiushen.andes.comm.message;
public class MessageCodes { public class MessageCodes {
public static final int HELLO = 0x00; public static final int HELLO = 0x00;

View File

@ -0,0 +1,13 @@
package dev.xiushen.andes.comm.message;
public class PingMessage extends EmptyMessage {
public PingMessage() {
super(MessageCodes.PING);
}
@Override
public String toString() {
return "PingMessage{data=''}";
}
}

View File

@ -0,0 +1,8 @@
package dev.xiushen.andes.comm.message;
public class PongMessage extends EmptyMessage {
public PongMessage() {
super(MessageCodes.PONG);
}
}

View File

@ -1,5 +1,16 @@
package org.codenil.comm.netty; package dev.xiushen.andes.comm.netty;
import dev.xiushen.andes.comm.NetworkConfig;
import dev.xiushen.andes.comm.RemotePeer;
import dev.xiushen.andes.comm.callback.ConnectCallback;
import dev.xiushen.andes.comm.connections.ConnectionInitializer;
import dev.xiushen.andes.comm.connections.PeerConnection;
import dev.xiushen.andes.comm.connections.PeerConnectionEvents;
import dev.xiushen.andes.comm.connections.Subscribers;
import dev.xiushen.andes.comm.handler.TimeoutHandler;
import dev.xiushen.andes.comm.handshake.HandshakeHandlerInbound;
import dev.xiushen.andes.comm.handshake.HandshakeHandlerOutbound;
import dev.xiushen.andes.comm.handshake.PlainHandshaker;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*; import io.netty.channel.*;
@ -7,22 +18,15 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import org.codenil.comm.NetworkConfig; import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.codenil.comm.RemotePeer; import io.netty.handler.codec.LengthFieldPrepender;
import org.codenil.comm.callback.ConnectCallback; import io.netty.handler.codec.string.StringDecoder;
import org.codenil.comm.connections.ConnectionInitializer; import io.netty.handler.codec.string.StringEncoder;
import org.codenil.comm.connections.PeerConnection;
import org.codenil.comm.connections.PeerConnectionEvents;
import org.codenil.comm.connections.Subscribers;
import org.codenil.comm.handler.TimeoutHandler;
import org.codenil.comm.handshake.HandshakeHandlerInbound;
import org.codenil.comm.handshake.HandshakeHandlerOutbound;
import org.codenil.comm.handshake.PlainHandshaker;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -34,6 +38,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class NettyConnectionInitializer implements ConnectionInitializer { public class NettyConnectionInitializer implements ConnectionInitializer {
private static final int TIMEOUT_SECONDS = 10; private static final int TIMEOUT_SECONDS = 10;
private static final int MAX_FRAME_LENGTH = 1024 * 1024; // 最大消息长度
private static final int LENGTH_FIELD_OFFSET = 0; // 长度字段起始位置
private static final int LENGTH_FIELD_LENGTH = 4; // 长度字段占4字节
private static final int LENGTH_ADJUSTMENT = 0; // 长度字段后数据的偏移量
private static final int INITIAL_BYTES_TO_STRIP = 4; // 跳过长度字段
private final Subscribers<ConnectCallback> connectSubscribers = Subscribers.create(); private final Subscribers<ConnectCallback> connectSubscribers = Subscribers.create();
private final PeerConnectionEvents eventDispatcher; private final PeerConnectionEvents eventDispatcher;
@ -76,9 +85,8 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.childHandler(inboundChannelInitializer()) .childHandler(inboundChannelInitializer())
.bind(config.bindHost(), config.bindPort()); .bind(config.bindHost(), config.bindPort());
server.addListener(future -> { this.server.addListener(future -> {
final InetSocketAddress socketAddress = final InetSocketAddress socketAddress = (InetSocketAddress) server.channel().localAddress();
(InetSocketAddress) server.channel().localAddress();
if (!future.isSuccess() || socketAddress == null) { if (!future.isSuccess() || socketAddress == null) {
final String message = final String message =
String.format("Unable to start listening on %s:%s. Check for port conflicts.", String.format("Unable to start listening on %s:%s. Check for port conflicts.",
@ -154,9 +162,17 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
protected void initChannel(final SocketChannel ch) throws Exception { protected void initChannel(final SocketChannel ch) throws Exception {
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>(); final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();
connectionFuture.thenAccept(connection -> connectSubscribers.forEach(c -> c.onConnect(connection))); connectionFuture.thenAccept(connection -> connectSubscribers.forEach(c -> c.onConnect(connection)));
//连接处理器 //连接超时处理器
ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to fully establish incoming connection")); ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to fully establish incoming connection"));
//其他处理器TLS之类的
ch.pipeline().addLast(new LengthFieldPrepender(4)); // 编码器添加4字节长度头
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(
MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
// JSON 编解码基于String
ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); // 解码ByteBuf -> String
ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8)); // 编码String -> ByteBuf
addAdditionalInboundHandlers(ch); addAdditionalInboundHandlers(ch);
//握手消息处理器 //握手消息处理器
ch.pipeline().addLast(inboundHandler(selfIdentifier, connectionFuture)); ch.pipeline().addLast(inboundHandler(selfIdentifier, connectionFuture));
@ -172,8 +188,18 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
protected void initChannel(final SocketChannel ch) throws Exception { protected void initChannel(final SocketChannel ch) throws Exception {
//连接处理器 //连接处理器
ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to establish connection with peer: " + remotePeer.toString())); ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to establish connection with peer: " + remotePeer.toString()));
ch.pipeline().addLast(new LengthFieldPrepender(4)); // 编码器添加4字节长度头
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(
MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
// JSON 编解码基于String
ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); // 解码ByteBuf -> String
ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8)); // 编码String -> ByteBuf
//其他处理器 //其他处理器
addAdditionalOutboundHandlers(ch, remotePeer); addAdditionalOutboundHandlers(ch, remotePeer);
//握手消息处理器 //握手消息处理器
ch.pipeline().addLast(outboundHandler(selfIdentifier, remotePeer, connectionFuture)); ch.pipeline().addLast(outboundHandler(selfIdentifier, remotePeer, connectionFuture));
} }

View File

@ -1,15 +1,13 @@
package org.codenil.comm.netty; package dev.xiushen.andes.comm.netty;
import dev.xiushen.andes.comm.connections.AbstractPeerConnection;
import dev.xiushen.andes.comm.connections.PeerConnectionEvents;
import dev.xiushen.andes.comm.message.DisconnectMessage;
import dev.xiushen.andes.comm.message.DisconnectReason;
import dev.xiushen.andes.comm.message.Message;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.codenil.comm.connections.AbstractPeerConnection;
import org.codenil.comm.connections.PeerConnectionEvents;
import org.codenil.comm.message.DisconnectMessage;
import org.codenil.comm.message.DisconnectReason;
import org.codenil.comm.message.Message;
import org.codenil.comm.message.RawMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -19,7 +17,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
public class NettyPeerConnection extends AbstractPeerConnection { public class NettyPeerConnection extends AbstractPeerConnection {
private static final Logger logger = LoggerFactory.getLogger(AbstractPeerConnection.class); private static final Logger logger = LoggerFactory.getLogger(NettyPeerConnection.class);
private final ChannelHandlerContext ctx; private final ChannelHandlerContext ctx;
@ -53,16 +51,14 @@ public class NettyPeerConnection extends AbstractPeerConnection {
public void disconnect(DisconnectReason reason) { public void disconnect(DisconnectReason reason) {
if (disconnected.compareAndSet(false, true)) { if (disconnected.compareAndSet(false, true)) {
connectionEvents.dispatchDisconnect(this); connectionEvents.dispatchDisconnect(this);
doSendMessage(DisconnectMessage.create(reason)); doSendMessage(new DisconnectMessage(reason.message()));
closeConnection(); closeConnection();
} }
} }
@Override @Override
protected void doSendMessage(final Message message) { protected void doSendMessage(final Message message) {
RawMessage rawMessage = RawMessage.create(message.code()); ctx.channel().writeAndFlush(message);
rawMessage.setData(message.data());
ctx.channel().writeAndFlush(rawMessage);
} }
@Override @Override

View File

@ -1,4 +1,4 @@
package org.codenil.comm.serialize; package dev.xiushen.andes.comm.serialize;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;

View File

@ -1,4 +1,4 @@
package org.codenil.comm.serialize; package dev.xiushen.andes.comm.serialize;
public class Version { public class Version {

View File

@ -1,9 +0,0 @@
package org.codenil.comm.callback;
import org.codenil.comm.message.DefaultMessage;
import org.codenil.comm.message.Message;
@FunctionalInterface
public interface ResponseCallback {
Message response(DefaultMessage message);
}

View File

@ -1,31 +0,0 @@
package org.codenil.comm.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.codenil.comm.message.RawMessage;
import org.codenil.comm.serialize.SerializeHelper;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class MessageFrameEncoder extends MessageToByteEncoder<RawMessage> {
public MessageFrameEncoder() {}
@Override
protected void encode(
final ChannelHandlerContext ctx,
final RawMessage msg,
final ByteBuf out) {
byte[] idBytes = Optional.ofNullable(msg.requestId()).orElse("").getBytes(StandardCharsets.UTF_8);
SerializeHelper builder = new SerializeHelper();
ByteBuf buf = builder.writeBytes(idBytes)
.writeInt(msg.code())
.writeBytes(msg.data())
.build();
out.writeBytes(buf);
buf.release();
}
}

View File

@ -1,51 +0,0 @@
package org.codenil.comm.handler;
import io.netty.buffer.ByteBuf;
import org.codenil.comm.handshake.PlainMessage;
import org.codenil.comm.message.MessageType;
import org.codenil.comm.serialize.SerializeHelper;
public class MessageHandler {
public static byte[] buildMessage(final PlainMessage message) {
SerializeHelper builder = new SerializeHelper();
ByteBuf buf = builder.writeInt(message.messageType().getValue())
.writeInt(message.code())
.writeBytes(message.data()).build();
byte[] result = new byte[buf.readableBytes()];
buf.readBytes(result);
buf.release();
return result;
}
public static byte[] buildMessage(
final MessageType messageType,
final int code,
final byte[] data) {
return buildMessage(new PlainMessage(messageType, code, data));
}
public static PlainMessage parseMessage(final ByteBuf buf) {
PlainMessage ret = null;
buf.readerIndex(0);
//跳过版本
int versionLength = buf.readInt();
buf.skipBytes(versionLength);
int payloadLength = buf.readInt();
if(payloadLength < 8) {
return ret;
}
int messageType = buf.readInt();
int code = buf.readInt();
int dataLength = buf.readInt();
byte[] data = new byte[dataLength];
buf.readBytes(data);
ret = new PlainMessage(MessageType.forNumber(messageType), code, data);
return ret;
}
}

View File

@ -1,31 +0,0 @@
package org.codenil.comm.handshake;
import org.codenil.comm.message.MessageType;
public class PlainMessage {
private final MessageType messageType;
private final int code;
private final byte[] data;
public PlainMessage(final MessageType messageType, final byte[] data) {
this(messageType, -1, data);
}
public PlainMessage(final MessageType messageType, final int code, final byte[] data) {
this.messageType = messageType;
this.code = code;
this.data = data;
}
public MessageType messageType() {
return messageType;
}
public byte[] data() {
return data;
}
public int code() {
return code;
}
}

View File

@ -1,31 +0,0 @@
package org.codenil.comm.message;
public abstract class AbstractMessage implements Message {
private String requestId;
private byte[] data;
@Override
public String requestId() {
return requestId;
}
@Override
public final int size() {
return data.length;
}
@Override
public byte[] data() {
return data;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public void setData(byte[] data) {
this.data = data;
}
}

View File

@ -1,29 +0,0 @@
package org.codenil.comm.message;
public class DisconnectMessage extends AbstractMessage {
private DisconnectMessage(final byte[] data) {
super.setData(data);
}
public static DisconnectMessage create(final DisconnectReason reason) {
return new DisconnectMessage(new byte[]{reason.code()});
}
public static DisconnectMessage readFrom(final Message message) {
if (message instanceof DisconnectMessage) {
return (DisconnectMessage) message;
}
final int code = message.code();
if (code != MessageCodes.DISCONNECT) {
throw new IllegalArgumentException(
String.format("Message has code %d and thus is not a DisconnectMessage.", code));
}
return new DisconnectMessage(message.data());
}
@Override
public int code() {
return MessageCodes.DISCONNECT;
}
}

View File

@ -1,19 +0,0 @@
package org.codenil.comm.message;
public abstract class EmptyMessage implements Message {
@Override
public final int size() {
return 0;
}
@Override
public byte[] data() {
return new byte[]{};
}
@Override
public String toString() {
return getClass().getSimpleName() + "{ code=" + code() + ", size=0}";
}
}

View File

@ -1,17 +0,0 @@
package org.codenil.comm.message;
public class HelloMessage extends AbstractMessage {
public HelloMessage(final byte[] data) {
super.setData(data);
}
public static HelloMessage create(byte[] bytes) {
return new HelloMessage(new byte[0]);
}
@Override
public int code() {
return MessageCodes.HELLO;
}
}

View File

@ -1,19 +0,0 @@
package org.codenil.comm.message;
public interface Message {
String requestId();
int size();
int code();
byte[] data();
default RawMessage wrapMessage(final String requestId) {
RawMessage rawMessage = RawMessage.create(code());
rawMessage.setRequestId(requestId);
rawMessage.setData(data());
return rawMessage;
}
}

View File

@ -1,29 +0,0 @@
package org.codenil.comm.message;
public enum MessageType {
PING(0),
PONG(1),
DATA(2),
UNRECOGNIZED(-1),
;
private final int value;
MessageType(final int value) {
this.value = value;
}
public int getValue() {
return value;
}
public static MessageType forNumber(final int value) {
return switch (value) {
case 0 -> PING;
case 1 -> PONG;
case 2 -> DATA;
case -1 -> UNRECOGNIZED;
default -> null;
};
}
}

View File

@ -1,26 +0,0 @@
package org.codenil.comm.message;
public class PingMessage extends EmptyMessage {
private static final PingMessage INSTANCE = new PingMessage();
public static PingMessage get() {
return INSTANCE;
}
private PingMessage() {}
@Override
public String requestId() {
return "";
}
@Override
public int code() {
return MessageCodes.PING;
}
@Override
public String toString() {
return "PingMessage{data=''}";
}
}

View File

@ -1,21 +0,0 @@
package org.codenil.comm.message;
public class PongMessage extends EmptyMessage {
private static final PongMessage INSTANCE = new PongMessage();
public static PongMessage get() {
return INSTANCE;
}
private PongMessage() {}
@Override
public String requestId() {
return "";
}
@Override
public int code() {
return MessageCodes.PONG;
}
}

View File

@ -1,56 +0,0 @@
package org.codenil.comm.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.codenil.comm.serialize.SerializeHelper;
public class RawMessage extends AbstractMessage {
private final int code;
protected RawMessage(final int code) {
this.code = code;
}
public static RawMessage create(
final int code) {
return new RawMessage(code);
}
public static RawMessage decode(byte[] messageBytes) {
ByteBuf buf = Unpooled.wrappedBuffer(messageBytes);
buf.readerIndex(0);
int code = buf.readInt();
int requestIdLength = buf.readInt();
byte[] requestIdBytes = new byte[requestIdLength];
buf.readBytes(requestIdBytes);
int dataLength = buf.readInt();
byte[] dataBytes = new byte[dataLength];
buf.readBytes(dataBytes);
RawMessage rawMessage = create(code);
rawMessage.setRequestId(new String(requestIdBytes));
rawMessage.setData(dataBytes);
return rawMessage;
}
public static byte[] encode(RawMessage rawMessage) {
SerializeHelper converter = new SerializeHelper();
ByteBuf byteBuf = converter
.writeVersion("1.0")
.writeInt(rawMessage.code())
.writeString(rawMessage.requestId())
.writeBytes(rawMessage.data())
.build();
return converter.readPayload(byteBuf);
}
@Override
public int code() {
return code;
}
}