From ca6a44bbed32e784815e581c0d784c10ff737ecc Mon Sep 17 00:00:00 2001 From: alyenc Date: Tue, 13 Aug 2024 15:39:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E4=BA=86=E4=B8=80=E6=B3=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/codenil/comm/Communication.java | 119 +++++---- .../org/codenil/comm/ConnectionStore.java | 25 -- .../org/codenil/comm/DataUpdateMessage.java | 29 --- .../java/org/codenil/comm/NetworkService.java | 229 +++++++----------- .../java/org/codenil/comm/PeerReputation.java | 114 --------- .../java/org/codenil/comm/RemotePeer.java | 32 ++- src/main/java/org/codenil/comm/Test.java | 64 +++++ .../ConnectCallback.java | 4 +- .../comm/callback/DisconnectCallback.java | 8 + .../comm/callback/MessageCallback.java | 9 + .../comm/callback/ResponseCallback.java | 9 + .../connections/AbstractPeerConnection.java | 68 ++++-- .../connections/ConnectionInitializer.java | 2 - .../codenil/comm/connections/KeepAlive.java | 5 +- .../comm/connections/PeerConnection.java | 14 +- .../connections/PeerConnectionEvents.java | 46 +++- .../{netty => }/handler/CommonHandler.java | 8 +- .../handler/MessageFrameDecoder.java | 53 ++-- .../handler/MessageFrameEncoder.java | 9 +- .../{netty => }/handler/MessageHandler.java | 2 +- .../{netty => }/handler/TimeoutHandler.java | 2 +- .../handshake/AbstractHandshakeHandler.java | 71 +++--- .../handshake/HandshakeHandlerInbound.java | 3 +- .../handshake/HandshakeHandlerOutbound.java | 4 +- .../comm/handshake/HandshakeSecrets.java | 113 --------- .../codenil/comm/handshake/Handshaker.java | 2 - .../comm/handshake/PlainHandshaker.java | 9 +- .../codenil/comm/message/AbstractMessage.java | 17 +- .../comm/message/DisconnectMessage.java | 12 +- .../comm/message/DisconnectReason.java | 26 +- .../codenil/comm/message/EmptyMessage.java | 6 +- .../codenil/comm/message/HelloMessage.java | 6 +- .../org/codenil/comm/message/Message.java | 11 +- .../codenil/comm/message/MessageCodes.java | 3 +- .../org/codenil/comm/message/PingMessage.java | 4 +- .../org/codenil/comm/message/PongMessage.java | 4 +- .../org/codenil/comm/message/RawMessage.java | 48 +++- .../netty/NettyConnectionInitializer.java | 42 ++-- .../comm/netty/NettyPeerConnection.java | 46 +++- .../codenil/comm/netty/OutboundMessage.java | 17 -- 40 files changed, 595 insertions(+), 700 deletions(-) delete mode 100644 src/main/java/org/codenil/comm/ConnectionStore.java delete mode 100644 src/main/java/org/codenil/comm/DataUpdateMessage.java delete mode 100644 src/main/java/org/codenil/comm/PeerReputation.java create mode 100644 src/main/java/org/codenil/comm/Test.java rename src/main/java/org/codenil/comm/{connections => callback}/ConnectCallback.java (59%) create mode 100644 src/main/java/org/codenil/comm/callback/DisconnectCallback.java create mode 100644 src/main/java/org/codenil/comm/callback/MessageCallback.java create mode 100644 src/main/java/org/codenil/comm/callback/ResponseCallback.java rename src/main/java/org/codenil/comm/{netty => }/handler/CommonHandler.java (94%) rename src/main/java/org/codenil/comm/{netty => }/handler/MessageFrameDecoder.java (69%) rename src/main/java/org/codenil/comm/{netty => }/handler/MessageFrameEncoder.java (76%) rename src/main/java/org/codenil/comm/{netty => }/handler/MessageHandler.java (97%) rename src/main/java/org/codenil/comm/{netty => }/handler/TimeoutHandler.java (96%) delete mode 100644 src/main/java/org/codenil/comm/handshake/HandshakeSecrets.java delete mode 100644 src/main/java/org/codenil/comm/netty/OutboundMessage.java diff --git a/src/main/java/org/codenil/comm/Communication.java b/src/main/java/org/codenil/comm/Communication.java index 02e8adb..8db55a3 100644 --- a/src/main/java/org/codenil/comm/Communication.java +++ b/src/main/java/org/codenil/comm/Communication.java @@ -2,13 +2,20 @@ package org.codenil.comm; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import org.codenil.comm.connections.*; + +import org.codenil.comm.callback.ConnectCallback; +import org.codenil.comm.callback.DisconnectCallback; +import org.codenil.comm.callback.MessageCallback; +import org.codenil.comm.connections.ConnectionInitializer; +import org.codenil.comm.connections.PeerConnection; +import org.codenil.comm.connections.PeerConnectionEvents; import org.codenil.comm.message.DisconnectReason; -import org.codenil.comm.message.MessageCallback; +import org.codenil.comm.message.RawMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -21,22 +28,14 @@ public class Communication { private static final Logger logger = LoggerFactory.getLogger(Communication.class); - /** - * 连接初始化 - */ + /** 连接初始化 */ private final ConnectionInitializer connectionInitializer; - /** - * 消息订阅 - */ + + /** 消息订阅 */ private final PeerConnectionEvents connectionEvents; - /** - * 连接回调订阅 - */ - private final Subscribers connectSubscribers = Subscribers.create(); - /** - * 连接缓存 - */ - private final Cache> peersConnectingCache = CacheBuilder.newBuilder() + + /** 连接缓存 */ + private final Cache> connectingCache = CacheBuilder.newBuilder() .expireAfterWrite(Duration.ofSeconds(30L)).concurrencyLevel(1).build(); private final AtomicBoolean started = new AtomicBoolean(false); @@ -59,7 +58,7 @@ public class Communication { } //注册回调监听 - setupListeners(); + connectionEvents.subscribeConnect(this::dispatchConnect); //启动连接初始化 return connectionInitializer @@ -81,7 +80,7 @@ public class Communication { new IllegalStateException("Illegal attempt to stop " + getClass().getSimpleName())); } - peersConnectingCache.asMap() + connectingCache.asMap() .values() .forEach((conn) -> { try { @@ -97,17 +96,17 @@ public class Communication { * 连接到远程节点 */ public CompletableFuture connect(final RemotePeer remotePeer) { - final CompletableFuture peerConnectionCompletableFuture; + final CompletableFuture completableFuture; try { synchronized (this) { //尝试从缓存获取链接,获取不到就创建一个 - peerConnectionCompletableFuture = peersConnectingCache.get( - remotePeer.ip(), () -> createConnection(remotePeer)); + completableFuture = connectingCache.get(remotePeer.endpoint(), + () -> createConnection(remotePeer)); } } catch (final ExecutionException e) { throw new RuntimeException(e); } - return peerConnectionCompletableFuture; + return completableFuture; } /** @@ -121,7 +120,21 @@ public class Communication { * 订阅连接 */ public void subscribeConnect(final ConnectCallback callback) { - connectSubscribers.subscribe(callback); + connectionEvents.subscribeConnect(callback); + } + + /** + * 订阅断开 + */ + public void subscribeDisconnect(final DisconnectCallback callback) { + connectionEvents.subscribeDisconnect(callback); + } + + /** + * 订阅指定code的消息 + */ + public void subscribeByCode(final int code, final MessageCallback callback) { + connectionEvents.subscribeByCode(code, callback); } /** @@ -129,45 +142,51 @@ public class Communication { */ @Nonnull private CompletableFuture createConnection(final RemotePeer remotePeer) { - CompletableFuture completableFuture = initiateOutboundConnection(remotePeer); + CompletableFuture completableFuture = connectionInitializer + .connect(remotePeer) + .whenComplete((conn, err) -> { + if (err != null) { + logger.debug("Failed to connect to peer {}", remotePeer.toString()); + } else { + logger.debug("Outbound connection established to peer: {}", remotePeer.toString()); + } + });; completableFuture.whenComplete((peerConnection, throwable) -> { if (throwable == null) { + remotePeer.setPkiId(peerConnection.remoteIdentifier()); + peerConnection.setRemotePeer(remotePeer); dispatchConnect(peerConnection); } }); return completableFuture; } - /** - * 初始化远程连接 - */ - private CompletableFuture initiateOutboundConnection(final RemotePeer remotePeer) { - logger.trace("Initiating connection to peer: {}:{}", remotePeer.ip(), remotePeer.listeningPort()); - - return connectionInitializer - .connect(remotePeer) - .whenComplete((conn, err) -> { - if (err != null) { - logger.debug("Failed to connect to peer {}: {}", remotePeer.ip(), err.getMessage()); - } else { - logger.debug("Outbound connection established to peer: {}", remotePeer.ip()); - } - }); - } - - private void setupListeners() { - connectionInitializer.subscribeIncomingConnect(this::handleIncomingConnection); - } - - private void handleIncomingConnection(final PeerConnection peerConnection) { - dispatchConnect(peerConnection); - } - /** * 连接完成后调用注册的回调 */ private void dispatchConnect(final PeerConnection connection) { - connectSubscribers.forEach(c -> c.onConnect(connection)); + connectionEvents.dispatchConnect(connection); + } + + /** + * 连接断开后调用注册的回调 + */ + private void dispatchDisconnect(final PeerConnection connection) { + connectionEvents.dispatchDisconnect(connection); + } + + /** + * 收到消息后调用注册的回调 + */ + private void dispatchMessage(final PeerConnection connection, final RawMessage message) { + connectionEvents.dispatchMessage(connection, message); + } + + /** + * 收到指定code消息后调用指定回调 + */ + private void dispatchMessageByCode(final int code, final PeerConnection connection, final RawMessage message) { + connectionEvents.dispatchMessageByCode(code, connection, message); } } diff --git a/src/main/java/org/codenil/comm/ConnectionStore.java b/src/main/java/org/codenil/comm/ConnectionStore.java deleted file mode 100644 index 06ef13d..0000000 --- a/src/main/java/org/codenil/comm/ConnectionStore.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.codenil.comm; - -import org.codenil.comm.connections.PeerConnection; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class ConnectionStore { - - private final Map pki2Conn = new ConcurrentHashMap<>(); - - public void registerConnection(PeerConnection peerConnection) { - pki2Conn.put(peerConnection.peerIdentity(), peerConnection); - System.out.println(peerConnection.peerIdentity()); - } - - public boolean unRegisterConnection(PeerConnection peerConnection) { - return pki2Conn.remove(peerConnection.peerIdentity(), peerConnection); - } - - public PeerConnection getConnection(String peerIdentity) { - return pki2Conn.get(peerIdentity); - } -} - diff --git a/src/main/java/org/codenil/comm/DataUpdateMessage.java b/src/main/java/org/codenil/comm/DataUpdateMessage.java deleted file mode 100644 index 966597a..0000000 --- a/src/main/java/org/codenil/comm/DataUpdateMessage.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.codenil.comm; - -import org.codenil.comm.message.AbstractMessage; -import org.codenil.comm.message.Message; -import org.codenil.comm.message.MessageCodes; - -public class DataUpdateMessage extends AbstractMessage { - - public DataUpdateMessage(byte[] data) { - super(data); - } - - public static DataUpdateMessage readFrom(final Message message) { - if (message instanceof DataUpdateMessage) { - return (DataUpdateMessage) message; - } - return new DataUpdateMessage(message.getData()); - } - - public static DataUpdateMessage create(final String data) { - return new DataUpdateMessage(data.getBytes()); - } - - @Override - public int getCode() { - return MessageCodes.DATA_UPDATE; - } - -} diff --git a/src/main/java/org/codenil/comm/NetworkService.java b/src/main/java/org/codenil/comm/NetworkService.java index 30a78b8..983799e 100644 --- a/src/main/java/org/codenil/comm/NetworkService.java +++ b/src/main/java/org/codenil/comm/NetworkService.java @@ -1,26 +1,21 @@ package org.codenil.comm; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.tuweni.bytes.Bytes; +import org.codenil.comm.callback.ConnectCallback; +import org.codenil.comm.callback.DisconnectCallback; +import org.codenil.comm.callback.MessageCallback; import org.codenil.comm.connections.*; -import org.codenil.comm.message.DefaultMessage; import org.codenil.comm.message.DisconnectReason; import org.codenil.comm.message.Message; -import org.codenil.comm.message.RawMessage; import org.codenil.comm.netty.NettyConnectionInitializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigInteger; import java.time.Clock; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; public class NetworkService { @@ -29,30 +24,26 @@ public class NetworkService { private final CountDownLatch shutdown = new CountDownLatch(1); private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false); - - private final Map> listenersByCode = new ConcurrentHashMap<>(); - private final Map messageResponseByCode = new ConcurrentHashMap<>(); - - private final AtomicInteger outstandingRequests = new AtomicInteger(0); - private final AtomicLong requestIdCounter = new AtomicLong(1); + private final Map aliveConnections = new ConcurrentHashMap<>(); + private final Map deadConnections = new ConcurrentHashMap<>(); private final Clock clock = Clock.systemUTC(); - private final PeerReputation reputation = new PeerReputation(); - private final ConnectionStore connectionStore = new ConnectionStore(); private final Communication communication; - private volatile long lastRequestTimestamp = 0; - - public NetworkService(final NetworkConfig networkConfig) { + public NetworkService( + final NetworkConfig networkConfig, + final String selfIdentifier) { PeerConnectionEvents connectionEvents = new PeerConnectionEvents(); - ConnectionInitializer connectionInitializer = new NettyConnectionInitializer(networkConfig, connectionEvents); + ConnectionInitializer connectionInitializer = new NettyConnectionInitializer(networkConfig, selfIdentifier, connectionEvents); this.communication = new Communication(connectionEvents, connectionInitializer); } + /** + * 启动网络服务 + */ public CompletableFuture start() { if (started.compareAndSet(false, true)) { logger.info("Starting Network."); - setupHandlers(); return communication.start(); } else { logger.error("Attempted to start already running network."); @@ -60,6 +51,9 @@ public class NetworkService { } } + /** + * 停止网络服务 + */ public CompletableFuture stop() { if (stopped.compareAndSet(false, true)) { logger.info("Stopping Network."); @@ -73,107 +67,18 @@ public class NetworkService { } } + /** + * 连接到远程节点 + */ public CompletableFuture connect(final RemotePeer remotePeer) { return communication.connect(remotePeer); } - public void sendRequest(final String peerIdentity, final Message message) { - lastRequestTimestamp = clock.millis(); - this.dispatchRequest( - msg -> { - try { - PeerConnection connection = connectionStore.getConnection(peerIdentity); - if(Objects.nonNull(connection)) { - connection.send(msg); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - }, - message); - } - - public long subscribe(final int messageCode, final MessageCallback callback) { - return listenersByCode - .computeIfAbsent(messageCode, _ -> Subscribers.create()) - .subscribe(callback); - } - - public void unsubscribe(final long subscriptionId, final int messageCode) { - if (listenersByCode.containsKey(messageCode)) { - listenersByCode.get(messageCode).unsubscribe(subscriptionId); - if (listenersByCode.get(messageCode).getSubscriberCount() < 1) { - listenersByCode.remove(messageCode); - } - } - } - - public void registerResponse( - final int messageCode, final MessageResponse messageResponse) { - messageResponseByCode.put(messageCode, messageResponse); - } - - private void setupHandlers() { - communication.subscribeConnect(this::registerNewConnection); - communication.subscribeMessage(message -> { - try { - this.receiveMessage(message); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } - }); - } - /** - * 注册新连接 + * 断开连接 */ - private void registerNewConnection(final PeerConnection newConnection) { - synchronized (this) { - connectionStore.registerConnection(newConnection); - } - } - - private boolean registerDisconnect(final PeerConnection connection) { - return connectionStore.unRegisterConnection(connection); - } - - private void dispatchRequest(final RequestSender sender, final Message message) { - outstandingRequests.incrementAndGet(); - sender.send(message.wrapMessage(requestIdCounter.getAndIncrement() + "")); - } - - private void receiveMessage(final DefaultMessage message) throws Exception { - - //处理自定义回复处理器 - Optional maybeResponse = Optional.empty(); - try { - final int code = message.message().getCode(); - Optional.ofNullable(listenersByCode.get(code)) - .ifPresent(listeners -> listeners.forEach(messageCallback -> messageCallback.exec(message))); - - Message requestIdAndEthMessage = message.message(); - maybeResponse = Optional.ofNullable(messageResponseByCode.get(code)) - .map(messageResponse -> messageResponse.response(message)) - .map(responseData -> responseData.wrapMessage(requestIdAndEthMessage.getRequestId())); - } catch (Exception e) { - logger.atDebug() - .setMessage("Received malformed message {}, {}") - .addArgument(message) - .addArgument(e::toString) - .log(); - this.disconnect(message.connection().peerIdentity(), DisconnectReason.UNKNOWN); - } - - maybeResponse.ifPresent( - responseData -> { - try { - sendRequest(message.connection().peerIdentity(), responseData); - } catch (Exception __) {} - }); - } - - private void disconnect(final String peerIdentity, final DisconnectReason reason) { - PeerConnection connection = connectionStore.getConnection(peerIdentity); + public void disconnect(final String pkiId, final DisconnectReason reason) { + PeerConnection connection = aliveConnections.get(pkiId); if(Objects.nonNull(connection)) { try { connection.disconnect(reason); @@ -183,40 +88,78 @@ public class NetworkService { } } - private void recordRequestTimeout(final PeerConnection connection, final int requestCode) { - logger.atDebug() - .setMessage("Timed out while waiting for response") - .log(); - logger.trace("Timed out while waiting for response from peer {}", this); - reputation.recordRequestTimeout(requestCode) - .ifPresent(reason -> { - this.disconnect(connection.peerIdentity(), reason); - }); + /** + * 发送消息 + * @param pkiId 远程节点的PKIID + * @param message 发送的信息 + */ + public void send(final String pkiId, final Message message) { + try { + PeerConnection connection = aliveConnections.get(pkiId); + if (Objects.nonNull(connection)) { + connection.send(message); + } + } catch (Exception e) { + throw new RuntimeException(e); + } } - private void recordUselessResponse(final PeerConnection connection, final String requestType) { - logger.atTrace() - .setMessage("Received useless response for request type {}") - .addArgument(requestType) - .log(); - reputation.recordUselessResponse(System.currentTimeMillis()) - .ifPresent(reason -> { - this.disconnect(connection.peerIdentity(), reason); - }); + /** + * 订阅消息 + */ + public void subscribeMessage(final MessageCallback callback) { + //订阅消息,全量订阅 + communication.subscribeMessage(message -> { + synchronized (this) { + callback.onMessage(message); + } + }); } - @FunctionalInterface - public interface RequestSender { - void send(final RawMessage message); + /** + * 订阅连接 + */ + public void subscribeConnect(final ConnectCallback callback) { + //订阅连接成功 + communication.subscribeConnect(newConnection -> { + synchronized (this) { + callback.onConnect(newConnection); + aliveConnections.put(newConnection.pkiId(), newConnection); + deadConnections.remove(newConnection.pkiId(), newConnection); + } + }); } - @FunctionalInterface - public interface MessageCallback { - void exec(DefaultMessage message); + /** + * 订阅断开 + */ + public void subscribeDisconnect(final DisconnectCallback callback) { + //订阅断开连接 + communication.subscribeDisconnect(connection -> { + synchronized (this) { + callback.onDisconnect(connection); + aliveConnections.remove(connection.pkiId(), connection); + deadConnections.put(connection.pkiId(), connection); + } + }); } - @FunctionalInterface - public interface MessageResponse { - Message response(DefaultMessage message); + /** + * 订阅指定code的消息 + */ + public void subscribeMessageByCode(final int code, final MessageCallback callback) { + communication.subscribeByCode(code, message -> { + synchronized (this) { + callback.onMessage(message); + } + }); } + + /** + * 获取所有存活的连接 + */ + public List aliveConnections() { + return new ArrayList<>(aliveConnections.values()); + } + } diff --git a/src/main/java/org/codenil/comm/PeerReputation.java b/src/main/java/org/codenil/comm/PeerReputation.java deleted file mode 100644 index 2798329..0000000 --- a/src/main/java/org/codenil/comm/PeerReputation.java +++ /dev/null @@ -1,114 +0,0 @@ -package org.codenil.comm; - -import org.codenil.comm.message.DisconnectReason; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import java.util.Map; -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.google.common.base.Preconditions.checkArgument; - -public class PeerReputation implements Comparable { - private static final long USELESS_RESPONSE_WINDOW_IN_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); - private static final int DEFAULT_MAX_SCORE = 150; - private static final int DEFAULT_INITIAL_SCORE = 100; - private static final Logger LOG = LoggerFactory.getLogger(PeerReputation.class); - private static final int TIMEOUT_THRESHOLD = 5; - private static final int USELESS_RESPONSE_THRESHOLD = 5; - private static final int SMALL_ADJUSTMENT = 1; - private static final int LARGE_ADJUSTMENT = 10; - - private final ConcurrentMap timeoutCountByRequestType = new ConcurrentHashMap<>(); - private final Queue uselessResponseTimes = new ConcurrentLinkedQueue<>(); - - private int score; - - private final int maxScore; - - public PeerReputation() { - this(DEFAULT_INITIAL_SCORE, DEFAULT_MAX_SCORE); - } - - public PeerReputation(final int initialScore, final int maxScore) { - checkArgument( - initialScore <= maxScore, "Initial score must be less than or equal to max score"); - this.maxScore = maxScore; - this.score = initialScore; - } - - public Optional recordRequestTimeout(final int requestCode) { - final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet(); - if (newTimeoutCount >= TIMEOUT_THRESHOLD) { - LOG.debug( - "Disconnection triggered by {} repeated timeouts for requestCode {}", - newTimeoutCount, - requestCode); - score -= LARGE_ADJUSTMENT; - return Optional.of(DisconnectReason.TIMEOUT); - } else { - score -= SMALL_ADJUSTMENT; - return Optional.empty(); - } - } - - public void resetTimeoutCount(final int requestCode) { - timeoutCountByRequestType.remove(requestCode); - } - - private AtomicInteger getOrCreateTimeoutCount(final int requestCode) { - return timeoutCountByRequestType.computeIfAbsent(requestCode, code -> new AtomicInteger()); - } - - public Map timeoutCounts() { - return timeoutCountByRequestType; - } - - public Optional recordUselessResponse(final long timestamp) { - uselessResponseTimes.add(timestamp); - while (shouldRemove(uselessResponseTimes.peek(), timestamp)) { - uselessResponseTimes.poll(); - } - if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) { - score -= LARGE_ADJUSTMENT; - LOG.debug("Disconnection triggered by exceeding useless response threshold"); - return Optional.of(DisconnectReason.UNKNOWN); - } else { - score -= SMALL_ADJUSTMENT; - return Optional.empty(); - } - } - - public void recordUsefulResponse() { - if (score < maxScore) { - score = Math.min(maxScore, score + SMALL_ADJUSTMENT); - } - } - - private boolean shouldRemove(final Long timestamp, final long currentTimestamp) { - return timestamp != null && timestamp + USELESS_RESPONSE_WINDOW_IN_MILLIS < currentTimestamp; - } - - @Override - public String toString() { - return String.format( - "PeerReputation score: %d, timeouts: %s, useless: %s", - score, timeoutCounts(), uselessResponseTimes.size()); - } - - @Override - public int compareTo(final @Nonnull PeerReputation otherReputation) { - return Integer.compare(this.score, otherReputation.score); - } - - public int getScore() { - return score; - } -} \ No newline at end of file diff --git a/src/main/java/org/codenil/comm/RemotePeer.java b/src/main/java/org/codenil/comm/RemotePeer.java index 83d2be8..1b2218d 100644 --- a/src/main/java/org/codenil/comm/RemotePeer.java +++ b/src/main/java/org/codenil/comm/RemotePeer.java @@ -1,32 +1,30 @@ package org.codenil.comm; -import java.net.InetSocketAddress; - public class RemotePeer { - private final String peerIdentity; + private String pkiId; - private final String ip; - - private final int listeningPort; + private final String endpoint; public RemotePeer( - final String ip, - final int listeningPort) { - this.ip = ip; - this.listeningPort = listeningPort; - this.peerIdentity = new InetSocketAddress(ip, listeningPort).toString(); + final String endpoint) { + this.endpoint = endpoint; } - public String peerIdentity() { - return peerIdentity; + public String pkiId() { + return pkiId; } - public String ip() { - return ip; + public String endpoint() { + return endpoint; } - public int listeningPort() { - return listeningPort; + public void setPkiId(String pkiId) { + this.pkiId = pkiId; } + + public String toString() { + return String.format("%s, pkiId:%s", this.endpoint(), this.pkiId); + } + } diff --git a/src/main/java/org/codenil/comm/Test.java b/src/main/java/org/codenil/comm/Test.java new file mode 100644 index 0000000..310922d --- /dev/null +++ b/src/main/java/org/codenil/comm/Test.java @@ -0,0 +1,64 @@ +package org.codenil.comm; + +import org.codenil.comm.connections.PeerConnection; +import org.codenil.comm.message.MessageCodes; +import org.codenil.comm.message.RawMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; + +public class Test { + + private static final Logger logger = LoggerFactory.getLogger(Communication.class); + + public static void main(String[] args) throws Exception { + + //server(); + + client(); + } + + private static void server() throws Exception { + NetworkConfig config = new NetworkConfig(); + config.setBindHost("192.168.8.30"); + config.setBindPort(8080); + + NetworkService service = new NetworkService(config, ""); + service.start(); + CompletableFuture start = service.start(); + + start.whenComplete((res, err) -> { + service.subscribeMessageByCode(MessageCodes.GOSSIP, message -> { + logger.info("接收到消息:" + message.message().code()); + }); + }); + } + + private static void client() throws Exception { + NetworkConfig config = new NetworkConfig(); + config.setBindHost("192.168.8.30"); + config.setBindPort(8090); + + NetworkService service = new NetworkService(config, ""); + CompletableFuture start = service.start(); + + start.whenComplete((res, err) -> { + RemotePeer remotePeer = new RemotePeer("192.168.8.30:8080"); + CompletableFuture conn = service.connect(remotePeer); + + conn.whenComplete((cres, cerr) -> { + if (cerr == null) { + try { + RawMessage rawMessage = RawMessage.create(MessageCodes.GOSSIP); + rawMessage.setData("test".getBytes(StandardCharsets.UTF_8)); + service.send(remotePeer.pkiId(), rawMessage); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + }); + } +} diff --git a/src/main/java/org/codenil/comm/connections/ConnectCallback.java b/src/main/java/org/codenil/comm/callback/ConnectCallback.java similarity index 59% rename from src/main/java/org/codenil/comm/connections/ConnectCallback.java rename to src/main/java/org/codenil/comm/callback/ConnectCallback.java index 463c560..3f8ba46 100644 --- a/src/main/java/org/codenil/comm/connections/ConnectCallback.java +++ b/src/main/java/org/codenil/comm/callback/ConnectCallback.java @@ -1,4 +1,6 @@ -package org.codenil.comm.connections; +package org.codenil.comm.callback; + +import org.codenil.comm.connections.PeerConnection; /** * 连接回调 diff --git a/src/main/java/org/codenil/comm/callback/DisconnectCallback.java b/src/main/java/org/codenil/comm/callback/DisconnectCallback.java new file mode 100644 index 0000000..661f6d6 --- /dev/null +++ b/src/main/java/org/codenil/comm/callback/DisconnectCallback.java @@ -0,0 +1,8 @@ +package org.codenil.comm.callback; + +import org.codenil.comm.connections.PeerConnection; + +@FunctionalInterface +public interface DisconnectCallback { + void onDisconnect(final PeerConnection connection); +} diff --git a/src/main/java/org/codenil/comm/callback/MessageCallback.java b/src/main/java/org/codenil/comm/callback/MessageCallback.java new file mode 100644 index 0000000..432ebaf --- /dev/null +++ b/src/main/java/org/codenil/comm/callback/MessageCallback.java @@ -0,0 +1,9 @@ +package org.codenil.comm.callback; + +import org.codenil.comm.message.DefaultMessage; + +@FunctionalInterface +public interface MessageCallback { + + void onMessage(final DefaultMessage message); +} diff --git a/src/main/java/org/codenil/comm/callback/ResponseCallback.java b/src/main/java/org/codenil/comm/callback/ResponseCallback.java new file mode 100644 index 0000000..b9b27ef --- /dev/null +++ b/src/main/java/org/codenil/comm/callback/ResponseCallback.java @@ -0,0 +1,9 @@ +package org.codenil.comm.callback; + +import org.codenil.comm.message.DefaultMessage; +import org.codenil.comm.message.Message; + +@FunctionalInterface +public interface ResponseCallback { + Message response(DefaultMessage message); +} diff --git a/src/main/java/org/codenil/comm/connections/AbstractPeerConnection.java b/src/main/java/org/codenil/comm/connections/AbstractPeerConnection.java index 935636a..833a93c 100644 --- a/src/main/java/org/codenil/comm/connections/AbstractPeerConnection.java +++ b/src/main/java/org/codenil/comm/connections/AbstractPeerConnection.java @@ -1,23 +1,37 @@ package org.codenil.comm.connections; -import org.codenil.comm.message.DisconnectMessage; -import org.codenil.comm.message.DisconnectReason; +import io.netty.channel.ChannelHandler; +import org.codenil.comm.RemotePeer; import org.codenil.comm.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; public abstract class AbstractPeerConnection implements PeerConnection { private static final Logger logger = LoggerFactory.getLogger(AbstractPeerConnection.class); - protected final PeerConnectionEvents connectionEvents; - private final AtomicBoolean disconnected = new AtomicBoolean(false); - private final AtomicBoolean terminatedImmediately = new AtomicBoolean(false); + private final String remoteIdentifier; - protected AbstractPeerConnection(final PeerConnectionEvents connectionEvents) { - this.connectionEvents = connectionEvents; + protected final AtomicBoolean disconnected = new AtomicBoolean(false); + + protected final AtomicBoolean terminatedImmediately = new AtomicBoolean(false); + + private RemotePeer remotePeer; + + protected AbstractPeerConnection( + final String remoteIdentifier) { + this.remoteIdentifier = remoteIdentifier; + } + + @Override + public String pkiId() { + if(Objects.isNull(remotePeer)) { + throw new IllegalStateException("connection not complated yet"); + } + return remotePeer.pkiId(); } @Override @@ -26,32 +40,34 @@ public abstract class AbstractPeerConnection implements PeerConnection { } @Override - public void terminateConnection() { - if (terminatedImmediately.compareAndSet(false, true)) { - if (disconnected.compareAndSet(false, true)) { - connectionEvents.dispatchDisconnect(this); - } - // Always ensure the context gets closed immediately even if we previously sent a disconnect - // message and are waiting to close. - closeConnectionImmediately(); - logger.atTrace() - .setMessage("Terminating connection, reason {}") - .addArgument(this) - .log(); - } + public String remoteIdentifier() { + return remoteIdentifier; } @Override - public void disconnect(DisconnectReason reason) { - if (disconnected.compareAndSet(false, true)) { - connectionEvents.dispatchDisconnect(this); - doSendMessage(DisconnectMessage.create(reason)); - closeConnection(); - } + public boolean disconnected() { + return disconnected.get() || terminatedImmediately.get(); + } + + @Override + public RemotePeer remotePeer() { + return remotePeer; + } + + @Override + public void setRemotePeer(RemotePeer remotePeer) { + this.remotePeer = remotePeer; + } + + @Override + public void replaceHandler(String name, ChannelHandler newHandler) { + doReplaceHandler(name, newHandler); } protected abstract void doSendMessage(final Message message); + protected abstract void doReplaceHandler(String name, ChannelHandler newHandler); + protected abstract void closeConnection(); protected abstract void closeConnectionImmediately(); diff --git a/src/main/java/org/codenil/comm/connections/ConnectionInitializer.java b/src/main/java/org/codenil/comm/connections/ConnectionInitializer.java index 9e21ea4..7f41995 100644 --- a/src/main/java/org/codenil/comm/connections/ConnectionInitializer.java +++ b/src/main/java/org/codenil/comm/connections/ConnectionInitializer.java @@ -11,8 +11,6 @@ public interface ConnectionInitializer { CompletableFuture stop(); - void subscribeIncomingConnect(final ConnectCallback callback); - CompletableFuture connect(RemotePeer remotePeer); } diff --git a/src/main/java/org/codenil/comm/connections/KeepAlive.java b/src/main/java/org/codenil/comm/connections/KeepAlive.java index b8d880d..9d72e39 100644 --- a/src/main/java/org/codenil/comm/connections/KeepAlive.java +++ b/src/main/java/org/codenil/comm/connections/KeepAlive.java @@ -4,12 +4,12 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; + import org.codenil.comm.message.DisconnectReason; import org.codenil.comm.message.PingMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; public class KeepAlive extends ChannelDuplexHandler { @@ -28,8 +28,7 @@ public class KeepAlive extends ChannelDuplexHandler { } @Override - public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) - throws IOException { + public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) { if (!(evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE)) { return; diff --git a/src/main/java/org/codenil/comm/connections/PeerConnection.java b/src/main/java/org/codenil/comm/connections/PeerConnection.java index b1fff5d..83b65df 100644 --- a/src/main/java/org/codenil/comm/connections/PeerConnection.java +++ b/src/main/java/org/codenil/comm/connections/PeerConnection.java @@ -1,14 +1,26 @@ package org.codenil.comm.connections; +import io.netty.channel.ChannelHandler; +import org.codenil.comm.RemotePeer; import org.codenil.comm.message.DisconnectReason; import org.codenil.comm.message.Message; public interface PeerConnection { - String peerIdentity(); + String pkiId(); + + String remoteIdentifier(); + + boolean disconnected(); + + RemotePeer remotePeer(); + + void setRemotePeer(RemotePeer remotePeer); void send(final Message message) throws Exception; + void replaceHandler(String name, ChannelHandler newHandler); + void disconnect(DisconnectReason reason) throws Exception; void terminateConnection(); diff --git a/src/main/java/org/codenil/comm/connections/PeerConnectionEvents.java b/src/main/java/org/codenil/comm/connections/PeerConnectionEvents.java index fb93f46..dd2cf2e 100644 --- a/src/main/java/org/codenil/comm/connections/PeerConnectionEvents.java +++ b/src/main/java/org/codenil/comm/connections/PeerConnectionEvents.java @@ -1,15 +1,50 @@ package org.codenil.comm.connections; -import org.codenil.comm.message.*; +import org.codenil.comm.callback.ConnectCallback; +import org.codenil.comm.callback.DisconnectCallback; +import org.codenil.comm.callback.MessageCallback; +import org.codenil.comm.message.DefaultMessage; +import org.codenil.comm.message.RawMessage; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class PeerConnectionEvents { + /** 连接回调订阅 */ + private final Subscribers connectSubscribers = Subscribers.create(true); + private final Subscribers disconnectSubscribers = Subscribers.create(true); private final Subscribers messageSubscribers = Subscribers.create(true); + private final Map> subscribersByCode = new ConcurrentHashMap<>(); + public PeerConnectionEvents() {} + public void subscribeConnect(final ConnectCallback callback) { + connectSubscribers.subscribe(callback); + } + + public void subscribeDisconnect(final DisconnectCallback callback) { + disconnectSubscribers.subscribe(callback); + } + + public void subscribeMessage(final MessageCallback callback) { + messageSubscribers.subscribe(callback); + } + + public void subscribeByCode(final int messageCode, final MessageCallback callback) { + subscribersByCode + .computeIfAbsent(messageCode, _ -> Subscribers.create()) + .subscribe(callback); + } + + public void dispatchConnect( + final PeerConnection connection) { + connectSubscribers.forEach(s -> s.onConnect(connection)); + } + public void dispatchDisconnect( final PeerConnection connection) { disconnectSubscribers.forEach(s -> s.onDisconnect(connection)); @@ -20,11 +55,8 @@ public class PeerConnectionEvents { messageSubscribers.forEach(s -> s.onMessage(msg)); } - public void subscribeDisconnect(final DisconnectCallback callback) { - disconnectSubscribers.subscribe(callback); - } - - public void subscribeMessage(final MessageCallback callback) { - messageSubscribers.subscribe(callback); + public void dispatchMessageByCode(final int code, final PeerConnection connection, final RawMessage message) { + final DefaultMessage msg = new DefaultMessage(connection, message); + subscribersByCode.get(code).forEach(s -> s.onMessage(msg)); } } diff --git a/src/main/java/org/codenil/comm/netty/handler/CommonHandler.java b/src/main/java/org/codenil/comm/handler/CommonHandler.java similarity index 94% rename from src/main/java/org/codenil/comm/netty/handler/CommonHandler.java rename to src/main/java/org/codenil/comm/handler/CommonHandler.java index dee1b54..32d4691 100644 --- a/src/main/java/org/codenil/comm/netty/handler/CommonHandler.java +++ b/src/main/java/org/codenil/comm/handler/CommonHandler.java @@ -1,10 +1,10 @@ -package org.codenil.comm.netty.handler; +package org.codenil.comm.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; + import org.codenil.comm.connections.PeerConnection; import org.codenil.comm.connections.PeerConnectionEvents; -import org.codenil.comm.message.Message; import org.codenil.comm.message.MessageCodes; import org.codenil.comm.message.PongMessage; import org.codenil.comm.message.RawMessage; @@ -32,8 +32,8 @@ public class CommonHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(final ChannelHandlerContext ctx, final RawMessage originalMessage) { - logger.debug("Received a message from {}", originalMessage.getCode()); - switch (originalMessage.getCode()) { + logger.debug("Received a message from {}", originalMessage.code()); + switch (originalMessage.code()) { case MessageCodes.PING: logger.trace("Received Wire PING"); try { diff --git a/src/main/java/org/codenil/comm/netty/handler/MessageFrameDecoder.java b/src/main/java/org/codenil/comm/handler/MessageFrameDecoder.java similarity index 69% rename from src/main/java/org/codenil/comm/netty/handler/MessageFrameDecoder.java rename to src/main/java/org/codenil/comm/handler/MessageFrameDecoder.java index 0759ff6..42ab7cc 100644 --- a/src/main/java/org/codenil/comm/netty/handler/MessageFrameDecoder.java +++ b/src/main/java/org/codenil/comm/handler/MessageFrameDecoder.java @@ -1,10 +1,11 @@ -package org.codenil.comm.netty.handler; +package org.codenil.comm.handler; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.DecoderException; import io.netty.handler.timeout.IdleStateHandler; + import org.codenil.comm.connections.KeepAlive; import org.codenil.comm.connections.PeerConnection; import org.codenil.comm.connections.PeerConnectionEvents; @@ -27,16 +28,16 @@ public class MessageFrameDecoder extends ByteToMessageDecoder { private static final Logger logger = LoggerFactory.getLogger(MessageFrameDecoder.class); - private final CompletableFuture connectFuture; + private final CompletableFuture connectionFuture; private final PeerConnectionEvents connectionEvents; private boolean hellosExchanged; public MessageFrameDecoder( final PeerConnectionEvents connectionEvents, - final CompletableFuture connectFuture) { + final CompletableFuture connectionFuture) { this.connectionEvents = connectionEvents; - this.connectFuture = connectFuture; + this.connectionFuture = connectionFuture; } @Override @@ -70,14 +71,17 @@ public class MessageFrameDecoder extends ByteToMessageDecoder { byteBuf.readBytes(data); // 创建消息对象 - RawMessage message = new RawMessage(code, data); + RawMessage message = RawMessage.create(code); message.setRequestId(id); + message.setData(data); if (hellosExchanged) { out.add(message); - } else if (message.getCode() == MessageCodes.HELLO) { + } else if (message.code() == MessageCodes.HELLO) { hellosExchanged = true; - final PeerConnection connection = new NettyPeerConnection(ctx, connectionEvents); + + String remoteIdentifier = new String(message.data()); + final PeerConnection connection = new NettyPeerConnection(ctx, remoteIdentifier, connectionEvents); /* * 如果收到的消息是Hello消息 @@ -89,24 +93,27 @@ public class MessageFrameDecoder extends ByteToMessageDecoder { final AtomicBoolean waitingForPong = new AtomicBoolean(false); ctx.channel() .pipeline() - .addLast(new IdleStateHandler(15, 0, 0), - new KeepAlive(connection, waitingForPong), - new CommonHandler(connection, connectionEvents, waitingForPong), - new MessageFrameEncoder()); - connectFuture.complete(connection); - } else if (message.getCode() == MessageCodes.DISCONNECT) { + .addLast("IdleState", new IdleStateHandler(15, 0, 0)) + .addLast("KeepAlive", new KeepAlive(connection, waitingForPong)) + .addLast("Common", new CommonHandler(connection, connectionEvents, waitingForPong)) + .addLast("FrameEncoder", new MessageFrameEncoder()); + connectionFuture.complete(connection); + } else if (message.code() == MessageCodes.DISCONNECT) { logger.debug("Disconnected before sending HELLO."); ctx.close(); - connectFuture.completeExceptionally(new RuntimeException("Disconnect")); + connectionFuture.completeExceptionally(new RuntimeException("Disconnect")); } else { logger.debug( "Message received before HELLO's exchanged, disconnecting. Code: {}, Data: {}", - message.getCode(), Arrays.toString(message.getData())); + message.code(), Arrays.toString(message.data())); DisconnectMessage disconnectMessage = DisconnectMessage.create(DisconnectReason.UNKNOWN); - ctx.writeAndFlush(new RawMessage(disconnectMessage.getCode(), disconnectMessage.getData())) - .addListener((f) -> ctx.close()); - connectFuture.completeExceptionally(new RuntimeException("Message received before HELLO's exchanged")); + + RawMessage rawMessage = RawMessage.create(disconnectMessage.code()); + rawMessage.setData(disconnectMessage.data()); + ctx.writeAndFlush(rawMessage) + .addListener(_ -> ctx.close()); + connectionFuture.completeExceptionally(new RuntimeException("Message received before HELLO's exchanged")); } } @@ -119,8 +126,8 @@ public class MessageFrameDecoder extends ByteToMessageDecoder { : throwable; if (cause instanceof IllegalArgumentException) { logger.debug("Invalid incoming message ", throwable); - if (connectFuture.isDone() && !connectFuture.isCompletedExceptionally()) { - connectFuture.get().disconnect(DisconnectReason.INVALID_MESSAGE_RECEIVED); + if (connectionFuture.isDone() && !connectionFuture.isCompletedExceptionally()) { + connectionFuture.get().disconnect(DisconnectReason.INVALID_MESSAGE_RECEIVED); return; } } else if (cause instanceof IOException) { @@ -129,10 +136,10 @@ public class MessageFrameDecoder extends ByteToMessageDecoder { } else { logger.error("Exception while processing incoming message", throwable); } - if (connectFuture.isDone() && !connectFuture.isCompletedExceptionally()) { - connectFuture.get().terminateConnection(); + if (connectionFuture.isDone() && !connectionFuture.isCompletedExceptionally()) { + connectionFuture.get().terminateConnection(); } else { - connectFuture.completeExceptionally(throwable); + connectionFuture.completeExceptionally(throwable); ctx.close(); } } diff --git a/src/main/java/org/codenil/comm/netty/handler/MessageFrameEncoder.java b/src/main/java/org/codenil/comm/handler/MessageFrameEncoder.java similarity index 76% rename from src/main/java/org/codenil/comm/netty/handler/MessageFrameEncoder.java rename to src/main/java/org/codenil/comm/handler/MessageFrameEncoder.java index adc8173..2a20fb1 100644 --- a/src/main/java/org/codenil/comm/netty/handler/MessageFrameEncoder.java +++ b/src/main/java/org/codenil/comm/handler/MessageFrameEncoder.java @@ -1,4 +1,4 @@ -package org.codenil.comm.netty.handler; +package org.codenil.comm.handler; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -18,12 +18,11 @@ public class MessageFrameEncoder extends MessageToByteEncoder { final ChannelHandlerContext ctx, final RawMessage msg, final ByteBuf out) { - byte[] idBytes = Optional.ofNullable(msg.getRequestId()).orElse("").getBytes(StandardCharsets.UTF_8); - + byte[] idBytes = Optional.ofNullable(msg.requestId()).orElse("").getBytes(StandardCharsets.UTF_8); SerializeHelper builder = new SerializeHelper(); ByteBuf buf = builder.writeBytes(idBytes) - .writeInt(msg.getCode()) - .writeBytes(msg.getData()) + .writeInt(msg.code()) + .writeBytes(msg.data()) .build(); out.writeBytes(buf); diff --git a/src/main/java/org/codenil/comm/netty/handler/MessageHandler.java b/src/main/java/org/codenil/comm/handler/MessageHandler.java similarity index 97% rename from src/main/java/org/codenil/comm/netty/handler/MessageHandler.java rename to src/main/java/org/codenil/comm/handler/MessageHandler.java index 779fb2f..a72bc20 100644 --- a/src/main/java/org/codenil/comm/netty/handler/MessageHandler.java +++ b/src/main/java/org/codenil/comm/handler/MessageHandler.java @@ -1,4 +1,4 @@ -package org.codenil.comm.netty.handler; +package org.codenil.comm.handler; import io.netty.buffer.ByteBuf; import org.codenil.comm.handshake.PlainMessage; diff --git a/src/main/java/org/codenil/comm/netty/handler/TimeoutHandler.java b/src/main/java/org/codenil/comm/handler/TimeoutHandler.java similarity index 96% rename from src/main/java/org/codenil/comm/netty/handler/TimeoutHandler.java rename to src/main/java/org/codenil/comm/handler/TimeoutHandler.java index 5442458..a081d5f 100644 --- a/src/main/java/org/codenil/comm/netty/handler/TimeoutHandler.java +++ b/src/main/java/org/codenil/comm/handler/TimeoutHandler.java @@ -1,4 +1,4 @@ -package org.codenil.comm.netty.handler; +package org.codenil.comm.handler; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; diff --git a/src/main/java/org/codenil/comm/handshake/AbstractHandshakeHandler.java b/src/main/java/org/codenil/comm/handshake/AbstractHandshakeHandler.java index 0322395..f57d4a8 100644 --- a/src/main/java/org/codenil/comm/handshake/AbstractHandshakeHandler.java +++ b/src/main/java/org/codenil/comm/handshake/AbstractHandshakeHandler.java @@ -4,10 +4,14 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.MessageToByteEncoder; -import org.codenil.comm.connections.PeerConnectionEvents; -import org.codenil.comm.message.*; + import org.codenil.comm.connections.PeerConnection; -import org.codenil.comm.netty.handler.MessageFrameDecoder; +import org.codenil.comm.connections.PeerConnectionEvents; +import org.codenil.comm.handler.MessageFrameDecoder; +import org.codenil.comm.message.HelloMessage; +import org.codenil.comm.message.MessageCodes; +import org.codenil.comm.message.RawMessage; +import org.codenil.comm.serialize.SerializeHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,12 +25,18 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl private final CompletableFuture connectionFuture; private final PeerConnectionEvents connectionEvents; + + /** 本机的标识 */ + private final String selfIdentifier; + protected final Handshaker handshaker; protected AbstractHandshakeHandler( + final String selfIdentifier, final CompletableFuture connectionFuture, final PeerConnectionEvents connectionEvents, final Handshaker handshaker) { + this.selfIdentifier = selfIdentifier; this.connectionFuture = connectionFuture; this.connectionEvents = connectionEvents; this.handshaker = handshaker; @@ -50,15 +60,17 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl */ ctx.channel() .pipeline() - .replace(this, "DeFramer", new MessageFrameDecoder(connectionEvents, connectionFuture)) - .addBefore("DeFramer", "validate", new FirstMessageFrameEncoder()); + .replace(this, "FrameDecoder", new MessageFrameDecoder(connectionEvents, connectionFuture)) + .addBefore("FrameDecoder", "validate", new FirstMessageFrameEncoder()); /* * 替换完编解码器后发送Hello消息 + * hello消息需要带一些数据 */ - HelloMessage helloMessage = HelloMessage.create(); - RawMessage rawMessage = new RawMessage(helloMessage.getCode(), helloMessage.getData()); - rawMessage.setRequestId(helloMessage.getRequestId()); + HelloMessage helloMessage = HelloMessage.create(selfIdentifier.getBytes(StandardCharsets.UTF_8)); + RawMessage rawMessage = RawMessage.create(helloMessage.code()); + rawMessage.setData(helloMessage.data()); + rawMessage.setRequestId(helloMessage.requestId()); ctx.writeAndFlush(rawMessage) .addListener(ff -> { if (ff.isSuccess()) { @@ -90,42 +102,19 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl final ChannelHandlerContext context, final RawMessage msg, final ByteBuf out) { - if (msg.getCode() != MessageCodes.HELLO) { - throw new IllegalStateException("First message sent wasn't a HELLO."); + if (msg.code() != MessageCodes.HELLO) { + throw new IllegalStateException("First wire message sent wasn't a HELLO."); } - byte[] idBytes = Optional.ofNullable(msg.getRequestId()).orElse("").getBytes(StandardCharsets.UTF_8); - int channelsSize = msg.getChannels().size(); + byte[] idBytes = Optional.ofNullable(msg.requestId()).orElse("").getBytes(StandardCharsets.UTF_8); - int channelBytesLength = 0; - for (String channel : msg.getChannels()) { - byte[] channelBytes = channel.getBytes(StandardCharsets.UTF_8); - channelBytesLength = channelBytesLength + 4 + channelBytes.length; - } + SerializeHelper builder = new SerializeHelper(); + ByteBuf buf = builder.writeBytes(idBytes) + .writeInt(msg.code()) + .writeBytes(msg.data()) + .build(); - int payloadLength = 4 + 4 + idBytes.length + 4 + channelBytesLength + 4 + msg.getData().length; - - // 写入协议头:消息总长度 - out.writeInt(payloadLength + 4); - - // 写入payload - // 写入code - out.writeInt(msg.getCode()); - - // 写入id - out.writeInt(idBytes.length); - out.writeBytes(idBytes); - - // 写入channels - out.writeInt(channelsSize); - for (String channel : msg.getChannels()) { - byte[] channelBytes = channel.getBytes(StandardCharsets.UTF_8); - out.writeInt(channelBytes.length); - out.writeBytes(channelBytes); - } - - // 写入data - out.writeInt(msg.getData().length); - out.writeBytes(msg.getData()); + out.writeBytes(buf); + buf.release(); context.pipeline().remove(this); } } diff --git a/src/main/java/org/codenil/comm/handshake/HandshakeHandlerInbound.java b/src/main/java/org/codenil/comm/handshake/HandshakeHandlerInbound.java index 5cc2db9..9912cb4 100644 --- a/src/main/java/org/codenil/comm/handshake/HandshakeHandlerInbound.java +++ b/src/main/java/org/codenil/comm/handshake/HandshakeHandlerInbound.java @@ -13,10 +13,11 @@ import java.util.concurrent.CompletableFuture; public class HandshakeHandlerInbound extends AbstractHandshakeHandler { public HandshakeHandlerInbound( + final String selfIdentifier, final CompletableFuture connectionFuture, final PeerConnectionEvents connectionEvent, final Handshaker handshaker) { - super(connectionFuture, connectionEvent, handshaker); + super(selfIdentifier, connectionFuture, connectionEvent, handshaker); handshaker.prepareResponder(); } diff --git a/src/main/java/org/codenil/comm/handshake/HandshakeHandlerOutbound.java b/src/main/java/org/codenil/comm/handshake/HandshakeHandlerOutbound.java index 6e2a88b..8df6d78 100644 --- a/src/main/java/org/codenil/comm/handshake/HandshakeHandlerOutbound.java +++ b/src/main/java/org/codenil/comm/handshake/HandshakeHandlerOutbound.java @@ -2,6 +2,7 @@ package org.codenil.comm.handshake; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; + import org.codenil.comm.connections.PeerConnection; import org.codenil.comm.connections.PeerConnectionEvents; import org.slf4j.Logger; @@ -20,10 +21,11 @@ public class HandshakeHandlerOutbound extends AbstractHandshakeHandler { private final ByteBuf first; public HandshakeHandlerOutbound( + final String selfIdentifier, final CompletableFuture connectionFuture, final PeerConnectionEvents connectionEvent, final Handshaker handshaker) { - super(connectionFuture, connectionEvent, handshaker); + super(selfIdentifier, connectionFuture, connectionEvent, handshaker); handshaker.prepareInitiator(); this.first = handshaker.firstMessage(); diff --git a/src/main/java/org/codenil/comm/handshake/HandshakeSecrets.java b/src/main/java/org/codenil/comm/handshake/HandshakeSecrets.java deleted file mode 100644 index 4292b47..0000000 --- a/src/main/java/org/codenil/comm/handshake/HandshakeSecrets.java +++ /dev/null @@ -1,113 +0,0 @@ -package org.codenil.comm.handshake; - -import org.apache.tuweni.bytes.Bytes; -import org.apache.tuweni.bytes.Bytes32; -import org.bouncycastle.crypto.digests.KeccakDigest; - -import java.util.Arrays; -import java.util.Objects; - -import static com.google.common.base.Preconditions.checkArgument; - -public class HandshakeSecrets { - private final byte[] aesSecret; - private final byte[] macSecret; - private final byte[] token; - private final KeccakDigest egressMac = new KeccakDigest(Bytes32.SIZE * 8); - private final KeccakDigest ingressMac = new KeccakDigest(Bytes32.SIZE * 8); - - public HandshakeSecrets(final byte[] aesSecret, final byte[] macSecret, final byte[] token) { - checkArgument(aesSecret.length == Bytes32.SIZE, "aes secret must be exactly 32 bytes long"); - checkArgument(macSecret.length == Bytes32.SIZE, "mac secret must be exactly 32 bytes long"); - checkArgument(token.length == Bytes32.SIZE, "token must be exactly 32 bytes long"); - - this.aesSecret = aesSecret; - this.macSecret = macSecret; - this.token = token; - } - - public HandshakeSecrets updateEgress(final byte[] bytes) { - egressMac.update(bytes, 0, bytes.length); - return this; - } - - public HandshakeSecrets updateIngress(final byte[] bytes) { - ingressMac.update(bytes, 0, bytes.length); - return this; - } - - @Override - public String toString() { - return "HandshakeSecrets{" - + "aesSecret=" - + Bytes.wrap(aesSecret) - + ", macSecret=" - + Bytes.wrap(macSecret) - + ", token=" - + Bytes.wrap(token) - + ", egressMac=" - + Bytes.wrap(snapshot(egressMac)) - + ", ingressMac=" - + Bytes.wrap(snapshot(ingressMac)) - + '}'; - } - - public byte[] getAesSecret() { - return aesSecret; - } - - public byte[] getMacSecret() { - return macSecret; - } - - public byte[] getToken() { - return token; - } - - public byte[] getEgressMac() { - return snapshot(egressMac); - } - - public byte[] getIngressMac() { - return snapshot(ingressMac); - } - - private static byte[] snapshot(final KeccakDigest digest) { - final byte[] out = new byte[Bytes32.SIZE]; - new KeccakDigest(digest).doFinal(out, 0); - return out; - } - - @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") // checked in delegated method - @Override - public boolean equals(final Object obj) { - return equals(obj, false); - } - - public boolean equals(final Object o, final boolean flipMacs) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final HandshakeSecrets that = (HandshakeSecrets) o; - final KeccakDigest vsEgress = flipMacs ? that.ingressMac : that.egressMac; - final KeccakDigest vsIngress = flipMacs ? that.egressMac : that.ingressMac; - return Arrays.equals(aesSecret, that.aesSecret) - && Arrays.equals(macSecret, that.macSecret) - && Arrays.equals(token, that.token) - && Arrays.equals(snapshot(egressMac), snapshot(vsEgress)) - && Arrays.equals(snapshot(ingressMac), snapshot(vsIngress)); - } - - @Override - public int hashCode() { - return Objects.hash( - Arrays.hashCode(aesSecret), - Arrays.hashCode(macSecret), - Arrays.hashCode(token), - egressMac, - ingressMac); - } -} diff --git a/src/main/java/org/codenil/comm/handshake/Handshaker.java b/src/main/java/org/codenil/comm/handshake/Handshaker.java index aac4ac3..00be08a 100644 --- a/src/main/java/org/codenil/comm/handshake/Handshaker.java +++ b/src/main/java/org/codenil/comm/handshake/Handshaker.java @@ -14,7 +14,5 @@ public interface Handshaker { ByteBuf firstMessage(); - HandshakeSecrets secrets(); - Optional handleMessage(ByteBuf buf); } diff --git a/src/main/java/org/codenil/comm/handshake/PlainHandshaker.java b/src/main/java/org/codenil/comm/handshake/PlainHandshaker.java index 5871ba4..88dffa5 100644 --- a/src/main/java/org/codenil/comm/handshake/PlainHandshaker.java +++ b/src/main/java/org/codenil/comm/handshake/PlainHandshaker.java @@ -2,8 +2,9 @@ package org.codenil.comm.handshake; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; + +import org.codenil.comm.handler.MessageHandler; import org.codenil.comm.message.MessageType; -import org.codenil.comm.netty.handler.MessageHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +61,6 @@ public class PlainHandshaker implements Handshaker { "illegal invocation of onMessage on handshake that is not in progress"); PlainMessage message = MessageHandler.parseMessage(buf); - Optional nextMsg = Optional.empty(); if (initiator) { checkState(responderMsg == null, @@ -84,9 +84,4 @@ public class PlainHandshaker implements Handshaker { logger.trace("Handshake status set to {}", status.get()); return nextMsg.map(Unpooled::wrappedBuffer); } - - @Override - public HandshakeSecrets secrets() { - return null; - } } diff --git a/src/main/java/org/codenil/comm/message/AbstractMessage.java b/src/main/java/org/codenil/comm/message/AbstractMessage.java index 18d416f..7682fcc 100644 --- a/src/main/java/org/codenil/comm/message/AbstractMessage.java +++ b/src/main/java/org/codenil/comm/message/AbstractMessage.java @@ -4,29 +4,28 @@ public abstract class AbstractMessage implements Message { private String requestId; - private byte[] data = new byte[]{}; - - public AbstractMessage( - final byte[] data) { - this.data = data; - } + private byte[] data; @Override - public String getRequestId() { + public String requestId() { return requestId; } @Override - public final int getSize() { + public final int size() { return data.length; } @Override - public byte[] getData() { + public byte[] data() { return data; } public void setRequestId(String requestId) { this.requestId = requestId; } + + public void setData(byte[] data) { + this.data = data; + } } diff --git a/src/main/java/org/codenil/comm/message/DisconnectMessage.java b/src/main/java/org/codenil/comm/message/DisconnectMessage.java index 02aef0b..2639e3a 100644 --- a/src/main/java/org/codenil/comm/message/DisconnectMessage.java +++ b/src/main/java/org/codenil/comm/message/DisconnectMessage.java @@ -1,31 +1,29 @@ package org.codenil.comm.message; -import com.google.gson.Gson; - public class DisconnectMessage extends AbstractMessage { private DisconnectMessage(final byte[] data) { - super(data); + super.setData(data); } public static DisconnectMessage create(final DisconnectReason reason) { - return new DisconnectMessage(new Gson().toJson(reason).getBytes()); + return new DisconnectMessage(new byte[]{reason.code()}); } public static DisconnectMessage readFrom(final Message message) { if (message instanceof DisconnectMessage) { return (DisconnectMessage) message; } - final int code = message.getCode(); + final int code = message.code(); if (code != MessageCodes.DISCONNECT) { throw new IllegalArgumentException( String.format("Message has code %d and thus is not a DisconnectMessage.", code)); } - return new DisconnectMessage(message.getData()); + return new DisconnectMessage(message.data()); } @Override - public int getCode() { + public int code() { return MessageCodes.DISCONNECT; } } diff --git a/src/main/java/org/codenil/comm/message/DisconnectReason.java b/src/main/java/org/codenil/comm/message/DisconnectReason.java index 213a5ee..d74aa4d 100644 --- a/src/main/java/org/codenil/comm/message/DisconnectReason.java +++ b/src/main/java/org/codenil/comm/message/DisconnectReason.java @@ -1,27 +1,33 @@ package org.codenil.comm.message; -import java.util.Optional; - public enum DisconnectReason { - UNKNOWN(null), + UNKNOWN((byte) 0x00, ""), - TIMEOUT((byte) 0x0b), + TIMEOUT((byte) 0x0b, ""), INVALID_MESSAGE_RECEIVED((byte) 0x02, "An exception was caught decoding message"), + ; - private final Optional code; - private final Optional message; + private final Byte code; + private final String message; DisconnectReason(final Byte code) { - this.code = Optional.ofNullable(code); - this.message = Optional.empty(); + this.code = code; + this.message = ""; } DisconnectReason(final Byte code, final String message) { - this.code = Optional.ofNullable(code); - this.message = Optional.of(message); + this.code = code; + this.message = message; } + public Byte code() { + return code; + } + + public String message() { + return message; + } } diff --git a/src/main/java/org/codenil/comm/message/EmptyMessage.java b/src/main/java/org/codenil/comm/message/EmptyMessage.java index bfd9a54..e8c50fd 100644 --- a/src/main/java/org/codenil/comm/message/EmptyMessage.java +++ b/src/main/java/org/codenil/comm/message/EmptyMessage.java @@ -3,17 +3,17 @@ package org.codenil.comm.message; public abstract class EmptyMessage implements Message { @Override - public final int getSize() { + public final int size() { return 0; } @Override - public byte[] getData() { + public byte[] data() { return new byte[]{}; } @Override public String toString() { - return getClass().getSimpleName() + "{ code=" + getCode() + ", size=0}"; + return getClass().getSimpleName() + "{ code=" + code() + ", size=0}"; } } diff --git a/src/main/java/org/codenil/comm/message/HelloMessage.java b/src/main/java/org/codenil/comm/message/HelloMessage.java index 401fb70..f642ab3 100644 --- a/src/main/java/org/codenil/comm/message/HelloMessage.java +++ b/src/main/java/org/codenil/comm/message/HelloMessage.java @@ -3,15 +3,15 @@ package org.codenil.comm.message; public class HelloMessage extends AbstractMessage { public HelloMessage(final byte[] data) { - super(data); + super.setData(data); } - public static HelloMessage create() { + public static HelloMessage create(byte[] bytes) { return new HelloMessage(new byte[0]); } @Override - public int getCode() { + public int code() { return MessageCodes.HELLO; } } diff --git a/src/main/java/org/codenil/comm/message/Message.java b/src/main/java/org/codenil/comm/message/Message.java index 3ee6f3d..c4cc6e8 100644 --- a/src/main/java/org/codenil/comm/message/Message.java +++ b/src/main/java/org/codenil/comm/message/Message.java @@ -2,17 +2,18 @@ package org.codenil.comm.message; public interface Message { - String getRequestId(); + String requestId(); - int getSize(); + int size(); - int getCode(); + int code(); - byte[] getData(); + byte[] data(); default RawMessage wrapMessage(final String requestId) { - RawMessage rawMessage = new RawMessage(getCode(), getData()); + RawMessage rawMessage = RawMessage.create(code()); rawMessage.setRequestId(requestId); + rawMessage.setData(data()); return rawMessage; } } diff --git a/src/main/java/org/codenil/comm/message/MessageCodes.java b/src/main/java/org/codenil/comm/message/MessageCodes.java index 6943556..ce7919c 100644 --- a/src/main/java/org/codenil/comm/message/MessageCodes.java +++ b/src/main/java/org/codenil/comm/message/MessageCodes.java @@ -6,7 +6,8 @@ public class MessageCodes { public static final int PING = 0x02; public static final int PONG = 0x03; - public static final int DATA_UPDATE = 0x04; + public static final int GOSSIP = 0x04; + private MessageCodes() {} public static String messageName(final int code) { diff --git a/src/main/java/org/codenil/comm/message/PingMessage.java b/src/main/java/org/codenil/comm/message/PingMessage.java index 579dd51..b214724 100644 --- a/src/main/java/org/codenil/comm/message/PingMessage.java +++ b/src/main/java/org/codenil/comm/message/PingMessage.java @@ -10,12 +10,12 @@ public class PingMessage extends EmptyMessage { private PingMessage() {} @Override - public String getRequestId() { + public String requestId() { return ""; } @Override - public int getCode() { + public int code() { return MessageCodes.PING; } diff --git a/src/main/java/org/codenil/comm/message/PongMessage.java b/src/main/java/org/codenil/comm/message/PongMessage.java index 715ebf6..e56a193 100644 --- a/src/main/java/org/codenil/comm/message/PongMessage.java +++ b/src/main/java/org/codenil/comm/message/PongMessage.java @@ -10,12 +10,12 @@ public class PongMessage extends EmptyMessage { private PongMessage() {} @Override - public String getRequestId() { + public String requestId() { return ""; } @Override - public int getCode() { + public int code() { return MessageCodes.PONG; } } diff --git a/src/main/java/org/codenil/comm/message/RawMessage.java b/src/main/java/org/codenil/comm/message/RawMessage.java index 2594a36..bd1f798 100644 --- a/src/main/java/org/codenil/comm/message/RawMessage.java +++ b/src/main/java/org/codenil/comm/message/RawMessage.java @@ -1,18 +1,56 @@ package org.codenil.comm.message; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.codenil.comm.serialize.SerializeHelper; + public class RawMessage extends AbstractMessage { private final int code; - public RawMessage( - final int code, - final byte[] data) { - super(data); + protected RawMessage(final int code) { this.code = code; } + public static RawMessage create( + final int code) { + return new RawMessage(code); + } + + public static RawMessage decode(byte[] messageBytes) { + ByteBuf buf = Unpooled.wrappedBuffer(messageBytes); + buf.readerIndex(0); + + int code = buf.readInt(); + + int requestIdLength = buf.readInt(); + byte[] requestIdBytes = new byte[requestIdLength]; + buf.readBytes(requestIdBytes); + + int dataLength = buf.readInt(); + byte[] dataBytes = new byte[dataLength]; + buf.readBytes(dataBytes); + + RawMessage rawMessage = create(code); + rawMessage.setRequestId(new String(requestIdBytes)); + rawMessage.setData(dataBytes); + return rawMessage; + } + + public static byte[] encode(RawMessage rawMessage) { + SerializeHelper converter = new SerializeHelper(); + + ByteBuf byteBuf = converter + .writeVersion("1.0") + .writeInt(rawMessage.code()) + .writeString(rawMessage.requestId()) + .writeBytes(rawMessage.data()) + .build(); + return converter.readPayload(byteBuf); + } + @Override - public int getCode() { + public int code() { return code; } } diff --git a/src/main/java/org/codenil/comm/netty/NettyConnectionInitializer.java b/src/main/java/org/codenil/comm/netty/NettyConnectionInitializer.java index d581cc3..ecead03 100644 --- a/src/main/java/org/codenil/comm/netty/NettyConnectionInitializer.java +++ b/src/main/java/org/codenil/comm/netty/NettyConnectionInitializer.java @@ -7,13 +7,18 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import org.codenil.comm.RemotePeer; -import org.codenil.comm.connections.*; import org.codenil.comm.NetworkConfig; +import org.codenil.comm.RemotePeer; +import org.codenil.comm.callback.ConnectCallback; +import org.codenil.comm.connections.ConnectionInitializer; +import org.codenil.comm.connections.PeerConnection; +import org.codenil.comm.connections.PeerConnectionEvents; +import org.codenil.comm.connections.Subscribers; +import org.codenil.comm.handler.TimeoutHandler; import org.codenil.comm.handshake.HandshakeHandlerInbound; import org.codenil.comm.handshake.HandshakeHandlerOutbound; import org.codenil.comm.handshake.PlainHandshaker; -import org.codenil.comm.netty.handler.TimeoutHandler; + import javax.annotation.Nonnull; import java.io.IOException; @@ -39,13 +44,17 @@ public class NettyConnectionInitializer implements ConnectionInitializer { private final AtomicBoolean stopped = new AtomicBoolean(false); private final NetworkConfig config; + private final String selfIdentifier; private ChannelFuture server; + public NettyConnectionInitializer( final NetworkConfig config, + final String selfIdentifier, final PeerConnectionEvents eventDispatcher) { this.config = config; + this.selfIdentifier = selfIdentifier; this.eventDispatcher = eventDispatcher; } @@ -111,11 +120,6 @@ public class NettyConnectionInitializer implements ConnectionInitializer { return stoppedFuture; } - @Override - public void subscribeIncomingConnect(ConnectCallback callback) { - connectSubscribers.subscribe(callback); - } - /** * 连接到远程 */ @@ -123,10 +127,12 @@ public class NettyConnectionInitializer implements ConnectionInitializer { public CompletableFuture connect(RemotePeer remotePeer) { final CompletableFuture connectionFuture = new CompletableFuture<>(); + String[] parts = remotePeer.endpoint().split(":"); + new Bootstrap() .group(workers) .channel(NioSocketChannel.class) - .remoteAddress(new InetSocketAddress(remotePeer.ip(), remotePeer.listeningPort())) + .remoteAddress(new InetSocketAddress(parts[0], Integer.parseInt(parts[1]))) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIMEOUT_SECONDS * 1000) .handler(outboundChannelInitializer(remotePeer, connectionFuture)) @@ -141,6 +147,7 @@ public class NettyConnectionInitializer implements ConnectionInitializer { return connectionFuture; } + @Nonnull private ChannelInitializer inboundChannelInitializer() { return new ChannelInitializer<>() { @Override @@ -152,7 +159,7 @@ public class NettyConnectionInitializer implements ConnectionInitializer { //其他处理器,TLS之类的 addAdditionalInboundHandlers(ch); //握手消息处理器 - ch.pipeline().addLast(inboundHandler(connectionFuture)); + ch.pipeline().addLast(inboundHandler(selfIdentifier, connectionFuture)); } }; } @@ -164,11 +171,11 @@ public class NettyConnectionInitializer implements ConnectionInitializer { @Override protected void initChannel(final SocketChannel ch) throws Exception { //连接处理器 - ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to establish connection with peer: " + remotePeer.ip())); + ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to establish connection with peer: " + remotePeer.toString())); //其他处理器 addAdditionalOutboundHandlers(ch, remotePeer); //握手消息处理器 - ch.pipeline().addLast(outboundHandler(remotePeer, connectionFuture)); + ch.pipeline().addLast(outboundHandler(selfIdentifier, remotePeer, connectionFuture)); } }; } @@ -182,14 +189,19 @@ public class NettyConnectionInitializer implements ConnectionInitializer { @Nonnull private HandshakeHandlerInbound inboundHandler( + final String selfIdentifier, final CompletableFuture connectionFuture) { - return new HandshakeHandlerInbound(connectionFuture, eventDispatcher, new PlainHandshaker()); + return new HandshakeHandlerInbound(selfIdentifier, connectionFuture, + eventDispatcher, new PlainHandshaker()); } @Nonnull private HandshakeHandlerOutbound outboundHandler( - final RemotePeer remotePeer, final CompletableFuture connectionFuture) { - return new HandshakeHandlerOutbound(connectionFuture, eventDispatcher, new PlainHandshaker()); + final String selfIdentifier, + final RemotePeer remotePeer, + final CompletableFuture connectionFuture) { + return new HandshakeHandlerOutbound(selfIdentifier, connectionFuture, + eventDispatcher, new PlainHandshaker()); } void addAdditionalOutboundHandlers(final Channel channel, final RemotePeer remotePeer) diff --git a/src/main/java/org/codenil/comm/netty/NettyPeerConnection.java b/src/main/java/org/codenil/comm/netty/NettyPeerConnection.java index b10a40c..9ddc768 100644 --- a/src/main/java/org/codenil/comm/netty/NettyPeerConnection.java +++ b/src/main/java/org/codenil/comm/netty/NettyPeerConnection.java @@ -1,11 +1,17 @@ package org.codenil.comm.netty; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; + import org.codenil.comm.connections.AbstractPeerConnection; import org.codenil.comm.connections.PeerConnectionEvents; +import org.codenil.comm.message.DisconnectMessage; +import org.codenil.comm.message.DisconnectReason; import org.codenil.comm.message.Message; import org.codenil.comm.message.RawMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; @@ -13,24 +19,56 @@ import static java.util.concurrent.TimeUnit.SECONDS; public class NettyPeerConnection extends AbstractPeerConnection { + private static final Logger logger = LoggerFactory.getLogger(AbstractPeerConnection.class); + private final ChannelHandlerContext ctx; + private final PeerConnectionEvents connectionEvents; + public NettyPeerConnection( final ChannelHandlerContext ctx, + final String remoteIdentifier, final PeerConnectionEvents connectionEvents) { - super(connectionEvents); + super(remoteIdentifier); this.ctx = ctx; + this.connectionEvents = connectionEvents; } + @Override + public void terminateConnection() { + if (terminatedImmediately.compareAndSet(false, true)) { + if (disconnected.compareAndSet(false, true)) { + connectionEvents.dispatchDisconnect(this); + } + + closeConnectionImmediately(); + logger.atTrace() + .setMessage("Terminating connection, reason {}") + .addArgument(this) + .log(); + } + } @Override - public String peerIdentity() { - return ctx.channel().remoteAddress().toString(); + public void disconnect(DisconnectReason reason) { + if (disconnected.compareAndSet(false, true)) { + connectionEvents.dispatchDisconnect(this); + doSendMessage(DisconnectMessage.create(reason)); + closeConnection(); + } } @Override protected void doSendMessage(final Message message) { - ctx.channel().writeAndFlush(new RawMessage(message.getCode(), message.getData())); + RawMessage rawMessage = RawMessage.create(message.code()); + rawMessage.setData(message.data()); + ctx.channel().writeAndFlush(rawMessage); + } + + @Override + protected void doReplaceHandler(String name, ChannelHandler newHandler) { + ctx.channel().pipeline() + .replace(name, name, newHandler); } @Override diff --git a/src/main/java/org/codenil/comm/netty/OutboundMessage.java b/src/main/java/org/codenil/comm/netty/OutboundMessage.java deleted file mode 100644 index 3101426..0000000 --- a/src/main/java/org/codenil/comm/netty/OutboundMessage.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.codenil.comm.netty; - -import org.codenil.comm.message.Message; - -public class OutboundMessage { - - private final Message message; - - public OutboundMessage( - final Message message) { - this.message = message; - } - - public Message message() { - return message; - } -}