改了一波
This commit is contained in:
parent
ad7a7beac4
commit
ca6a44bbed
@ -2,13 +2,20 @@ package org.codenil.comm;
|
|||||||
|
|
||||||
import com.google.common.cache.Cache;
|
import com.google.common.cache.Cache;
|
||||||
import com.google.common.cache.CacheBuilder;
|
import com.google.common.cache.CacheBuilder;
|
||||||
import org.codenil.comm.connections.*;
|
|
||||||
|
import org.codenil.comm.callback.ConnectCallback;
|
||||||
|
import org.codenil.comm.callback.DisconnectCallback;
|
||||||
|
import org.codenil.comm.callback.MessageCallback;
|
||||||
|
import org.codenil.comm.connections.ConnectionInitializer;
|
||||||
|
import org.codenil.comm.connections.PeerConnection;
|
||||||
|
import org.codenil.comm.connections.PeerConnectionEvents;
|
||||||
import org.codenil.comm.message.DisconnectReason;
|
import org.codenil.comm.message.DisconnectReason;
|
||||||
import org.codenil.comm.message.MessageCallback;
|
import org.codenil.comm.message.RawMessage;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
@ -21,22 +28,14 @@ public class Communication {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(Communication.class);
|
private static final Logger logger = LoggerFactory.getLogger(Communication.class);
|
||||||
|
|
||||||
/**
|
/** 连接初始化 */
|
||||||
* 连接初始化
|
|
||||||
*/
|
|
||||||
private final ConnectionInitializer connectionInitializer;
|
private final ConnectionInitializer connectionInitializer;
|
||||||
/**
|
|
||||||
* 消息订阅
|
/** 消息订阅 */
|
||||||
*/
|
|
||||||
private final PeerConnectionEvents connectionEvents;
|
private final PeerConnectionEvents connectionEvents;
|
||||||
/**
|
|
||||||
* 连接回调订阅
|
/** 连接缓存 */
|
||||||
*/
|
private final Cache<String, CompletableFuture<PeerConnection>> connectingCache = CacheBuilder.newBuilder()
|
||||||
private final Subscribers<ConnectCallback> connectSubscribers = Subscribers.create();
|
|
||||||
/**
|
|
||||||
* 连接缓存
|
|
||||||
*/
|
|
||||||
private final Cache<String, CompletableFuture<PeerConnection>> peersConnectingCache = CacheBuilder.newBuilder()
|
|
||||||
.expireAfterWrite(Duration.ofSeconds(30L)).concurrencyLevel(1).build();
|
.expireAfterWrite(Duration.ofSeconds(30L)).concurrencyLevel(1).build();
|
||||||
|
|
||||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
@ -59,7 +58,7 @@ public class Communication {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//注册回调监听
|
//注册回调监听
|
||||||
setupListeners();
|
connectionEvents.subscribeConnect(this::dispatchConnect);
|
||||||
|
|
||||||
//启动连接初始化
|
//启动连接初始化
|
||||||
return connectionInitializer
|
return connectionInitializer
|
||||||
@ -81,7 +80,7 @@ public class Communication {
|
|||||||
new IllegalStateException("Illegal attempt to stop " + getClass().getSimpleName()));
|
new IllegalStateException("Illegal attempt to stop " + getClass().getSimpleName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
peersConnectingCache.asMap()
|
connectingCache.asMap()
|
||||||
.values()
|
.values()
|
||||||
.forEach((conn) -> {
|
.forEach((conn) -> {
|
||||||
try {
|
try {
|
||||||
@ -97,17 +96,17 @@ public class Communication {
|
|||||||
* 连接到远程节点
|
* 连接到远程节点
|
||||||
*/
|
*/
|
||||||
public CompletableFuture<PeerConnection> connect(final RemotePeer remotePeer) {
|
public CompletableFuture<PeerConnection> connect(final RemotePeer remotePeer) {
|
||||||
final CompletableFuture<PeerConnection> peerConnectionCompletableFuture;
|
final CompletableFuture<PeerConnection> completableFuture;
|
||||||
try {
|
try {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
//尝试从缓存获取链接,获取不到就创建一个
|
//尝试从缓存获取链接,获取不到就创建一个
|
||||||
peerConnectionCompletableFuture = peersConnectingCache.get(
|
completableFuture = connectingCache.get(remotePeer.endpoint(),
|
||||||
remotePeer.ip(), () -> createConnection(remotePeer));
|
() -> createConnection(remotePeer));
|
||||||
}
|
}
|
||||||
} catch (final ExecutionException e) {
|
} catch (final ExecutionException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
return peerConnectionCompletableFuture;
|
return completableFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -121,7 +120,21 @@ public class Communication {
|
|||||||
* 订阅连接
|
* 订阅连接
|
||||||
*/
|
*/
|
||||||
public void subscribeConnect(final ConnectCallback callback) {
|
public void subscribeConnect(final ConnectCallback callback) {
|
||||||
connectSubscribers.subscribe(callback);
|
connectionEvents.subscribeConnect(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 订阅断开
|
||||||
|
*/
|
||||||
|
public void subscribeDisconnect(final DisconnectCallback callback) {
|
||||||
|
connectionEvents.subscribeDisconnect(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 订阅指定code的消息
|
||||||
|
*/
|
||||||
|
public void subscribeByCode(final int code, final MessageCallback callback) {
|
||||||
|
connectionEvents.subscribeByCode(code, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -129,45 +142,51 @@ public class Communication {
|
|||||||
*/
|
*/
|
||||||
@Nonnull
|
@Nonnull
|
||||||
private CompletableFuture<PeerConnection> createConnection(final RemotePeer remotePeer) {
|
private CompletableFuture<PeerConnection> createConnection(final RemotePeer remotePeer) {
|
||||||
CompletableFuture<PeerConnection> completableFuture = initiateOutboundConnection(remotePeer);
|
CompletableFuture<PeerConnection> completableFuture = connectionInitializer
|
||||||
|
.connect(remotePeer)
|
||||||
|
.whenComplete((conn, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
logger.debug("Failed to connect to peer {}", remotePeer.toString());
|
||||||
|
} else {
|
||||||
|
logger.debug("Outbound connection established to peer: {}", remotePeer.toString());
|
||||||
|
}
|
||||||
|
});;
|
||||||
|
|
||||||
completableFuture.whenComplete((peerConnection, throwable) -> {
|
completableFuture.whenComplete((peerConnection, throwable) -> {
|
||||||
if (throwable == null) {
|
if (throwable == null) {
|
||||||
|
remotePeer.setPkiId(peerConnection.remoteIdentifier());
|
||||||
|
peerConnection.setRemotePeer(remotePeer);
|
||||||
dispatchConnect(peerConnection);
|
dispatchConnect(peerConnection);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return completableFuture;
|
return completableFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 初始化远程连接
|
|
||||||
*/
|
|
||||||
private CompletableFuture<PeerConnection> initiateOutboundConnection(final RemotePeer remotePeer) {
|
|
||||||
logger.trace("Initiating connection to peer: {}:{}", remotePeer.ip(), remotePeer.listeningPort());
|
|
||||||
|
|
||||||
return connectionInitializer
|
|
||||||
.connect(remotePeer)
|
|
||||||
.whenComplete((conn, err) -> {
|
|
||||||
if (err != null) {
|
|
||||||
logger.debug("Failed to connect to peer {}: {}", remotePeer.ip(), err.getMessage());
|
|
||||||
} else {
|
|
||||||
logger.debug("Outbound connection established to peer: {}", remotePeer.ip());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setupListeners() {
|
|
||||||
connectionInitializer.subscribeIncomingConnect(this::handleIncomingConnection);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleIncomingConnection(final PeerConnection peerConnection) {
|
|
||||||
dispatchConnect(peerConnection);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 连接完成后调用注册的回调
|
* 连接完成后调用注册的回调
|
||||||
*/
|
*/
|
||||||
private void dispatchConnect(final PeerConnection connection) {
|
private void dispatchConnect(final PeerConnection connection) {
|
||||||
connectSubscribers.forEach(c -> c.onConnect(connection));
|
connectionEvents.dispatchConnect(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接断开后调用注册的回调
|
||||||
|
*/
|
||||||
|
private void dispatchDisconnect(final PeerConnection connection) {
|
||||||
|
connectionEvents.dispatchDisconnect(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 收到消息后调用注册的回调
|
||||||
|
*/
|
||||||
|
private void dispatchMessage(final PeerConnection connection, final RawMessage message) {
|
||||||
|
connectionEvents.dispatchMessage(connection, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 收到指定code消息后调用指定回调
|
||||||
|
*/
|
||||||
|
private void dispatchMessageByCode(final int code, final PeerConnection connection, final RawMessage message) {
|
||||||
|
connectionEvents.dispatchMessageByCode(code, connection, message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,25 +0,0 @@
|
|||||||
package org.codenil.comm;
|
|
||||||
|
|
||||||
import org.codenil.comm.connections.PeerConnection;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
public class ConnectionStore {
|
|
||||||
|
|
||||||
private final Map<String, PeerConnection> pki2Conn = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
public void registerConnection(PeerConnection peerConnection) {
|
|
||||||
pki2Conn.put(peerConnection.peerIdentity(), peerConnection);
|
|
||||||
System.out.println(peerConnection.peerIdentity());
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean unRegisterConnection(PeerConnection peerConnection) {
|
|
||||||
return pki2Conn.remove(peerConnection.peerIdentity(), peerConnection);
|
|
||||||
}
|
|
||||||
|
|
||||||
public PeerConnection getConnection(String peerIdentity) {
|
|
||||||
return pki2Conn.get(peerIdentity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,29 +0,0 @@
|
|||||||
package org.codenil.comm;
|
|
||||||
|
|
||||||
import org.codenil.comm.message.AbstractMessage;
|
|
||||||
import org.codenil.comm.message.Message;
|
|
||||||
import org.codenil.comm.message.MessageCodes;
|
|
||||||
|
|
||||||
public class DataUpdateMessage extends AbstractMessage {
|
|
||||||
|
|
||||||
public DataUpdateMessage(byte[] data) {
|
|
||||||
super(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static DataUpdateMessage readFrom(final Message message) {
|
|
||||||
if (message instanceof DataUpdateMessage) {
|
|
||||||
return (DataUpdateMessage) message;
|
|
||||||
}
|
|
||||||
return new DataUpdateMessage(message.getData());
|
|
||||||
}
|
|
||||||
|
|
||||||
public static DataUpdateMessage create(final String data) {
|
|
||||||
return new DataUpdateMessage(data.getBytes());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getCode() {
|
|
||||||
return MessageCodes.DATA_UPDATE;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -1,26 +1,21 @@
|
|||||||
package org.codenil.comm;
|
package org.codenil.comm;
|
||||||
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.codenil.comm.callback.ConnectCallback;
|
||||||
import org.apache.tuweni.bytes.Bytes;
|
import org.codenil.comm.callback.DisconnectCallback;
|
||||||
|
import org.codenil.comm.callback.MessageCallback;
|
||||||
import org.codenil.comm.connections.*;
|
import org.codenil.comm.connections.*;
|
||||||
import org.codenil.comm.message.DefaultMessage;
|
|
||||||
import org.codenil.comm.message.DisconnectReason;
|
import org.codenil.comm.message.DisconnectReason;
|
||||||
import org.codenil.comm.message.Message;
|
import org.codenil.comm.message.Message;
|
||||||
import org.codenil.comm.message.RawMessage;
|
|
||||||
import org.codenil.comm.netty.NettyConnectionInitializer;
|
import org.codenil.comm.netty.NettyConnectionInitializer;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.math.BigInteger;
|
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
public class NetworkService {
|
public class NetworkService {
|
||||||
|
|
||||||
@ -29,30 +24,26 @@ public class NetworkService {
|
|||||||
private final CountDownLatch shutdown = new CountDownLatch(1);
|
private final CountDownLatch shutdown = new CountDownLatch(1);
|
||||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||||
private final AtomicBoolean stopped = new AtomicBoolean(false);
|
private final AtomicBoolean stopped = new AtomicBoolean(false);
|
||||||
|
private final Map<String, PeerConnection> aliveConnections = new ConcurrentHashMap<>();
|
||||||
private final Map<Integer, Subscribers<MessageCallback>> listenersByCode = new ConcurrentHashMap<>();
|
private final Map<String, PeerConnection> deadConnections = new ConcurrentHashMap<>();
|
||||||
private final Map<Integer, MessageResponse> messageResponseByCode = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
private final AtomicInteger outstandingRequests = new AtomicInteger(0);
|
|
||||||
private final AtomicLong requestIdCounter = new AtomicLong(1);
|
|
||||||
private final Clock clock = Clock.systemUTC();
|
private final Clock clock = Clock.systemUTC();
|
||||||
private final PeerReputation reputation = new PeerReputation();
|
|
||||||
private final ConnectionStore connectionStore = new ConnectionStore();
|
|
||||||
|
|
||||||
private final Communication communication;
|
private final Communication communication;
|
||||||
|
|
||||||
private volatile long lastRequestTimestamp = 0;
|
public NetworkService(
|
||||||
|
final NetworkConfig networkConfig,
|
||||||
public NetworkService(final NetworkConfig networkConfig) {
|
final String selfIdentifier) {
|
||||||
PeerConnectionEvents connectionEvents = new PeerConnectionEvents();
|
PeerConnectionEvents connectionEvents = new PeerConnectionEvents();
|
||||||
ConnectionInitializer connectionInitializer = new NettyConnectionInitializer(networkConfig, connectionEvents);
|
ConnectionInitializer connectionInitializer = new NettyConnectionInitializer(networkConfig, selfIdentifier, connectionEvents);
|
||||||
this.communication = new Communication(connectionEvents, connectionInitializer);
|
this.communication = new Communication(connectionEvents, connectionInitializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 启动网络服务
|
||||||
|
*/
|
||||||
public CompletableFuture<Integer> start() {
|
public CompletableFuture<Integer> start() {
|
||||||
if (started.compareAndSet(false, true)) {
|
if (started.compareAndSet(false, true)) {
|
||||||
logger.info("Starting Network.");
|
logger.info("Starting Network.");
|
||||||
setupHandlers();
|
|
||||||
return communication.start();
|
return communication.start();
|
||||||
} else {
|
} else {
|
||||||
logger.error("Attempted to start already running network.");
|
logger.error("Attempted to start already running network.");
|
||||||
@ -60,6 +51,9 @@ public class NetworkService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 停止网络服务
|
||||||
|
*/
|
||||||
public CompletableFuture<Void> stop() {
|
public CompletableFuture<Void> stop() {
|
||||||
if (stopped.compareAndSet(false, true)) {
|
if (stopped.compareAndSet(false, true)) {
|
||||||
logger.info("Stopping Network.");
|
logger.info("Stopping Network.");
|
||||||
@ -73,107 +67,18 @@ public class NetworkService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 连接到远程节点
|
||||||
|
*/
|
||||||
public CompletableFuture<PeerConnection> connect(final RemotePeer remotePeer) {
|
public CompletableFuture<PeerConnection> connect(final RemotePeer remotePeer) {
|
||||||
return communication.connect(remotePeer);
|
return communication.connect(remotePeer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendRequest(final String peerIdentity, final Message message) {
|
|
||||||
lastRequestTimestamp = clock.millis();
|
|
||||||
this.dispatchRequest(
|
|
||||||
msg -> {
|
|
||||||
try {
|
|
||||||
PeerConnection connection = connectionStore.getConnection(peerIdentity);
|
|
||||||
if(Objects.nonNull(connection)) {
|
|
||||||
connection.send(msg);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
message);
|
|
||||||
}
|
|
||||||
|
|
||||||
public long subscribe(final int messageCode, final MessageCallback callback) {
|
|
||||||
return listenersByCode
|
|
||||||
.computeIfAbsent(messageCode, _ -> Subscribers.create())
|
|
||||||
.subscribe(callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void unsubscribe(final long subscriptionId, final int messageCode) {
|
|
||||||
if (listenersByCode.containsKey(messageCode)) {
|
|
||||||
listenersByCode.get(messageCode).unsubscribe(subscriptionId);
|
|
||||||
if (listenersByCode.get(messageCode).getSubscriberCount() < 1) {
|
|
||||||
listenersByCode.remove(messageCode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void registerResponse(
|
|
||||||
final int messageCode, final MessageResponse messageResponse) {
|
|
||||||
messageResponseByCode.put(messageCode, messageResponse);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setupHandlers() {
|
|
||||||
communication.subscribeConnect(this::registerNewConnection);
|
|
||||||
communication.subscribeMessage(message -> {
|
|
||||||
try {
|
|
||||||
this.receiveMessage(message);
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error(e.getMessage(), e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 注册新连接
|
* 断开连接
|
||||||
*/
|
*/
|
||||||
private void registerNewConnection(final PeerConnection newConnection) {
|
public void disconnect(final String pkiId, final DisconnectReason reason) {
|
||||||
synchronized (this) {
|
PeerConnection connection = aliveConnections.get(pkiId);
|
||||||
connectionStore.registerConnection(newConnection);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean registerDisconnect(final PeerConnection connection) {
|
|
||||||
return connectionStore.unRegisterConnection(connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void dispatchRequest(final RequestSender sender, final Message message) {
|
|
||||||
outstandingRequests.incrementAndGet();
|
|
||||||
sender.send(message.wrapMessage(requestIdCounter.getAndIncrement() + ""));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void receiveMessage(final DefaultMessage message) throws Exception {
|
|
||||||
|
|
||||||
//处理自定义回复处理器
|
|
||||||
Optional<Message> maybeResponse = Optional.empty();
|
|
||||||
try {
|
|
||||||
final int code = message.message().getCode();
|
|
||||||
Optional.ofNullable(listenersByCode.get(code))
|
|
||||||
.ifPresent(listeners -> listeners.forEach(messageCallback -> messageCallback.exec(message)));
|
|
||||||
|
|
||||||
Message requestIdAndEthMessage = message.message();
|
|
||||||
maybeResponse = Optional.ofNullable(messageResponseByCode.get(code))
|
|
||||||
.map(messageResponse -> messageResponse.response(message))
|
|
||||||
.map(responseData -> responseData.wrapMessage(requestIdAndEthMessage.getRequestId()));
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.atDebug()
|
|
||||||
.setMessage("Received malformed message {}, {}")
|
|
||||||
.addArgument(message)
|
|
||||||
.addArgument(e::toString)
|
|
||||||
.log();
|
|
||||||
this.disconnect(message.connection().peerIdentity(), DisconnectReason.UNKNOWN);
|
|
||||||
}
|
|
||||||
|
|
||||||
maybeResponse.ifPresent(
|
|
||||||
responseData -> {
|
|
||||||
try {
|
|
||||||
sendRequest(message.connection().peerIdentity(), responseData);
|
|
||||||
} catch (Exception __) {}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void disconnect(final String peerIdentity, final DisconnectReason reason) {
|
|
||||||
PeerConnection connection = connectionStore.getConnection(peerIdentity);
|
|
||||||
if(Objects.nonNull(connection)) {
|
if(Objects.nonNull(connection)) {
|
||||||
try {
|
try {
|
||||||
connection.disconnect(reason);
|
connection.disconnect(reason);
|
||||||
@ -183,40 +88,78 @@ public class NetworkService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void recordRequestTimeout(final PeerConnection connection, final int requestCode) {
|
/**
|
||||||
logger.atDebug()
|
* 发送消息
|
||||||
.setMessage("Timed out while waiting for response")
|
* @param pkiId 远程节点的PKIID
|
||||||
.log();
|
* @param message 发送的信息
|
||||||
logger.trace("Timed out while waiting for response from peer {}", this);
|
*/
|
||||||
reputation.recordRequestTimeout(requestCode)
|
public void send(final String pkiId, final Message message) {
|
||||||
.ifPresent(reason -> {
|
try {
|
||||||
this.disconnect(connection.peerIdentity(), reason);
|
PeerConnection connection = aliveConnections.get(pkiId);
|
||||||
|
if (Objects.nonNull(connection)) {
|
||||||
|
connection.send(message);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 订阅消息
|
||||||
|
*/
|
||||||
|
public void subscribeMessage(final MessageCallback callback) {
|
||||||
|
//订阅消息,全量订阅
|
||||||
|
communication.subscribeMessage(message -> {
|
||||||
|
synchronized (this) {
|
||||||
|
callback.onMessage(message);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void recordUselessResponse(final PeerConnection connection, final String requestType) {
|
/**
|
||||||
logger.atTrace()
|
* 订阅连接
|
||||||
.setMessage("Received useless response for request type {}")
|
*/
|
||||||
.addArgument(requestType)
|
public void subscribeConnect(final ConnectCallback callback) {
|
||||||
.log();
|
//订阅连接成功
|
||||||
reputation.recordUselessResponse(System.currentTimeMillis())
|
communication.subscribeConnect(newConnection -> {
|
||||||
.ifPresent(reason -> {
|
synchronized (this) {
|
||||||
this.disconnect(connection.peerIdentity(), reason);
|
callback.onConnect(newConnection);
|
||||||
|
aliveConnections.put(newConnection.pkiId(), newConnection);
|
||||||
|
deadConnections.remove(newConnection.pkiId(), newConnection);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
/**
|
||||||
public interface RequestSender {
|
* 订阅断开
|
||||||
void send(final RawMessage message);
|
*/
|
||||||
|
public void subscribeDisconnect(final DisconnectCallback callback) {
|
||||||
|
//订阅断开连接
|
||||||
|
communication.subscribeDisconnect(connection -> {
|
||||||
|
synchronized (this) {
|
||||||
|
callback.onDisconnect(connection);
|
||||||
|
aliveConnections.remove(connection.pkiId(), connection);
|
||||||
|
deadConnections.put(connection.pkiId(), connection);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
/**
|
||||||
public interface MessageCallback {
|
* 订阅指定code的消息
|
||||||
void exec(DefaultMessage message);
|
*/
|
||||||
|
public void subscribeMessageByCode(final int code, final MessageCallback callback) {
|
||||||
|
communication.subscribeByCode(code, message -> {
|
||||||
|
synchronized (this) {
|
||||||
|
callback.onMessage(message);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
/**
|
||||||
public interface MessageResponse {
|
* 获取所有存活的连接
|
||||||
Message response(DefaultMessage message);
|
*/
|
||||||
|
public List<PeerConnection> aliveConnections() {
|
||||||
|
return new ArrayList<>(aliveConnections.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,114 +0,0 @@
|
|||||||
package org.codenil.comm;
|
|
||||||
|
|
||||||
import org.codenil.comm.message.DisconnectReason;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
|
||||||
|
|
||||||
public class PeerReputation implements Comparable<PeerReputation> {
|
|
||||||
private static final long USELESS_RESPONSE_WINDOW_IN_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
|
|
||||||
private static final int DEFAULT_MAX_SCORE = 150;
|
|
||||||
private static final int DEFAULT_INITIAL_SCORE = 100;
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(PeerReputation.class);
|
|
||||||
private static final int TIMEOUT_THRESHOLD = 5;
|
|
||||||
private static final int USELESS_RESPONSE_THRESHOLD = 5;
|
|
||||||
private static final int SMALL_ADJUSTMENT = 1;
|
|
||||||
private static final int LARGE_ADJUSTMENT = 10;
|
|
||||||
|
|
||||||
private final ConcurrentMap<Integer, AtomicInteger> timeoutCountByRequestType = new ConcurrentHashMap<>();
|
|
||||||
private final Queue<Long> uselessResponseTimes = new ConcurrentLinkedQueue<>();
|
|
||||||
|
|
||||||
private int score;
|
|
||||||
|
|
||||||
private final int maxScore;
|
|
||||||
|
|
||||||
public PeerReputation() {
|
|
||||||
this(DEFAULT_INITIAL_SCORE, DEFAULT_MAX_SCORE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public PeerReputation(final int initialScore, final int maxScore) {
|
|
||||||
checkArgument(
|
|
||||||
initialScore <= maxScore, "Initial score must be less than or equal to max score");
|
|
||||||
this.maxScore = maxScore;
|
|
||||||
this.score = initialScore;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Optional<DisconnectReason> recordRequestTimeout(final int requestCode) {
|
|
||||||
final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet();
|
|
||||||
if (newTimeoutCount >= TIMEOUT_THRESHOLD) {
|
|
||||||
LOG.debug(
|
|
||||||
"Disconnection triggered by {} repeated timeouts for requestCode {}",
|
|
||||||
newTimeoutCount,
|
|
||||||
requestCode);
|
|
||||||
score -= LARGE_ADJUSTMENT;
|
|
||||||
return Optional.of(DisconnectReason.TIMEOUT);
|
|
||||||
} else {
|
|
||||||
score -= SMALL_ADJUSTMENT;
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void resetTimeoutCount(final int requestCode) {
|
|
||||||
timeoutCountByRequestType.remove(requestCode);
|
|
||||||
}
|
|
||||||
|
|
||||||
private AtomicInteger getOrCreateTimeoutCount(final int requestCode) {
|
|
||||||
return timeoutCountByRequestType.computeIfAbsent(requestCode, code -> new AtomicInteger());
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<Integer, AtomicInteger> timeoutCounts() {
|
|
||||||
return timeoutCountByRequestType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Optional<DisconnectReason> recordUselessResponse(final long timestamp) {
|
|
||||||
uselessResponseTimes.add(timestamp);
|
|
||||||
while (shouldRemove(uselessResponseTimes.peek(), timestamp)) {
|
|
||||||
uselessResponseTimes.poll();
|
|
||||||
}
|
|
||||||
if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) {
|
|
||||||
score -= LARGE_ADJUSTMENT;
|
|
||||||
LOG.debug("Disconnection triggered by exceeding useless response threshold");
|
|
||||||
return Optional.of(DisconnectReason.UNKNOWN);
|
|
||||||
} else {
|
|
||||||
score -= SMALL_ADJUSTMENT;
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void recordUsefulResponse() {
|
|
||||||
if (score < maxScore) {
|
|
||||||
score = Math.min(maxScore, score + SMALL_ADJUSTMENT);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean shouldRemove(final Long timestamp, final long currentTimestamp) {
|
|
||||||
return timestamp != null && timestamp + USELESS_RESPONSE_WINDOW_IN_MILLIS < currentTimestamp;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return String.format(
|
|
||||||
"PeerReputation score: %d, timeouts: %s, useless: %s",
|
|
||||||
score, timeoutCounts(), uselessResponseTimes.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compareTo(final @Nonnull PeerReputation otherReputation) {
|
|
||||||
return Integer.compare(this.score, otherReputation.score);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getScore() {
|
|
||||||
return score;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,32 +1,30 @@
|
|||||||
package org.codenil.comm;
|
package org.codenil.comm;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
|
|
||||||
public class RemotePeer {
|
public class RemotePeer {
|
||||||
|
|
||||||
private final String peerIdentity;
|
private String pkiId;
|
||||||
|
|
||||||
private final String ip;
|
private final String endpoint;
|
||||||
|
|
||||||
private final int listeningPort;
|
|
||||||
|
|
||||||
public RemotePeer(
|
public RemotePeer(
|
||||||
final String ip,
|
final String endpoint) {
|
||||||
final int listeningPort) {
|
this.endpoint = endpoint;
|
||||||
this.ip = ip;
|
|
||||||
this.listeningPort = listeningPort;
|
|
||||||
this.peerIdentity = new InetSocketAddress(ip, listeningPort).toString();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String peerIdentity() {
|
public String pkiId() {
|
||||||
return peerIdentity;
|
return pkiId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String ip() {
|
public String endpoint() {
|
||||||
return ip;
|
return endpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int listeningPort() {
|
public void setPkiId(String pkiId) {
|
||||||
return listeningPort;
|
this.pkiId = pkiId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s, pkiId:%s", this.endpoint(), this.pkiId);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
64
src/main/java/org/codenil/comm/Test.java
Normal file
64
src/main/java/org/codenil/comm/Test.java
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
package org.codenil.comm;
|
||||||
|
|
||||||
|
import org.codenil.comm.connections.PeerConnection;
|
||||||
|
import org.codenil.comm.message.MessageCodes;
|
||||||
|
import org.codenil.comm.message.RawMessage;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
public class Test {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(Communication.class);
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
//server();
|
||||||
|
|
||||||
|
client();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void server() throws Exception {
|
||||||
|
NetworkConfig config = new NetworkConfig();
|
||||||
|
config.setBindHost("192.168.8.30");
|
||||||
|
config.setBindPort(8080);
|
||||||
|
|
||||||
|
NetworkService service = new NetworkService(config, "");
|
||||||
|
service.start();
|
||||||
|
CompletableFuture<Integer> start = service.start();
|
||||||
|
|
||||||
|
start.whenComplete((res, err) -> {
|
||||||
|
service.subscribeMessageByCode(MessageCodes.GOSSIP, message -> {
|
||||||
|
logger.info("接收到消息:" + message.message().code());
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void client() throws Exception {
|
||||||
|
NetworkConfig config = new NetworkConfig();
|
||||||
|
config.setBindHost("192.168.8.30");
|
||||||
|
config.setBindPort(8090);
|
||||||
|
|
||||||
|
NetworkService service = new NetworkService(config, "");
|
||||||
|
CompletableFuture<Integer> start = service.start();
|
||||||
|
|
||||||
|
start.whenComplete((res, err) -> {
|
||||||
|
RemotePeer remotePeer = new RemotePeer("192.168.8.30:8080");
|
||||||
|
CompletableFuture<PeerConnection> conn = service.connect(remotePeer);
|
||||||
|
|
||||||
|
conn.whenComplete((cres, cerr) -> {
|
||||||
|
if (cerr == null) {
|
||||||
|
try {
|
||||||
|
RawMessage rawMessage = RawMessage.create(MessageCodes.GOSSIP);
|
||||||
|
rawMessage.setData("test".getBytes(StandardCharsets.UTF_8));
|
||||||
|
service.send(remotePeer.pkiId(), rawMessage);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,6 @@
|
|||||||
package org.codenil.comm.connections;
|
package org.codenil.comm.callback;
|
||||||
|
|
||||||
|
import org.codenil.comm.connections.PeerConnection;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 连接回调
|
* 连接回调
|
@ -0,0 +1,8 @@
|
|||||||
|
package org.codenil.comm.callback;
|
||||||
|
|
||||||
|
import org.codenil.comm.connections.PeerConnection;
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface DisconnectCallback {
|
||||||
|
void onDisconnect(final PeerConnection connection);
|
||||||
|
}
|
@ -0,0 +1,9 @@
|
|||||||
|
package org.codenil.comm.callback;
|
||||||
|
|
||||||
|
import org.codenil.comm.message.DefaultMessage;
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface MessageCallback {
|
||||||
|
|
||||||
|
void onMessage(final DefaultMessage message);
|
||||||
|
}
|
@ -0,0 +1,9 @@
|
|||||||
|
package org.codenil.comm.callback;
|
||||||
|
|
||||||
|
import org.codenil.comm.message.DefaultMessage;
|
||||||
|
import org.codenil.comm.message.Message;
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface ResponseCallback {
|
||||||
|
Message response(DefaultMessage message);
|
||||||
|
}
|
@ -1,23 +1,37 @@
|
|||||||
package org.codenil.comm.connections;
|
package org.codenil.comm.connections;
|
||||||
|
|
||||||
import org.codenil.comm.message.DisconnectMessage;
|
import io.netty.channel.ChannelHandler;
|
||||||
import org.codenil.comm.message.DisconnectReason;
|
import org.codenil.comm.RemotePeer;
|
||||||
import org.codenil.comm.message.Message;
|
import org.codenil.comm.message.Message;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
public abstract class AbstractPeerConnection implements PeerConnection {
|
public abstract class AbstractPeerConnection implements PeerConnection {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(AbstractPeerConnection.class);
|
private static final Logger logger = LoggerFactory.getLogger(AbstractPeerConnection.class);
|
||||||
|
|
||||||
protected final PeerConnectionEvents connectionEvents;
|
private final String remoteIdentifier;
|
||||||
private final AtomicBoolean disconnected = new AtomicBoolean(false);
|
|
||||||
private final AtomicBoolean terminatedImmediately = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
protected AbstractPeerConnection(final PeerConnectionEvents connectionEvents) {
|
protected final AtomicBoolean disconnected = new AtomicBoolean(false);
|
||||||
this.connectionEvents = connectionEvents;
|
|
||||||
|
protected final AtomicBoolean terminatedImmediately = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private RemotePeer remotePeer;
|
||||||
|
|
||||||
|
protected AbstractPeerConnection(
|
||||||
|
final String remoteIdentifier) {
|
||||||
|
this.remoteIdentifier = remoteIdentifier;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String pkiId() {
|
||||||
|
if(Objects.isNull(remotePeer)) {
|
||||||
|
throw new IllegalStateException("connection not complated yet");
|
||||||
|
}
|
||||||
|
return remotePeer.pkiId();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -26,32 +40,34 @@ public abstract class AbstractPeerConnection implements PeerConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void terminateConnection() {
|
public String remoteIdentifier() {
|
||||||
if (terminatedImmediately.compareAndSet(false, true)) {
|
return remoteIdentifier;
|
||||||
if (disconnected.compareAndSet(false, true)) {
|
|
||||||
connectionEvents.dispatchDisconnect(this);
|
|
||||||
}
|
|
||||||
// Always ensure the context gets closed immediately even if we previously sent a disconnect
|
|
||||||
// message and are waiting to close.
|
|
||||||
closeConnectionImmediately();
|
|
||||||
logger.atTrace()
|
|
||||||
.setMessage("Terminating connection, reason {}")
|
|
||||||
.addArgument(this)
|
|
||||||
.log();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disconnect(DisconnectReason reason) {
|
public boolean disconnected() {
|
||||||
if (disconnected.compareAndSet(false, true)) {
|
return disconnected.get() || terminatedImmediately.get();
|
||||||
connectionEvents.dispatchDisconnect(this);
|
|
||||||
doSendMessage(DisconnectMessage.create(reason));
|
|
||||||
closeConnection();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RemotePeer remotePeer() {
|
||||||
|
return remotePeer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRemotePeer(RemotePeer remotePeer) {
|
||||||
|
this.remotePeer = remotePeer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void replaceHandler(String name, ChannelHandler newHandler) {
|
||||||
|
doReplaceHandler(name, newHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void doSendMessage(final Message message);
|
protected abstract void doSendMessage(final Message message);
|
||||||
|
|
||||||
|
protected abstract void doReplaceHandler(String name, ChannelHandler newHandler);
|
||||||
|
|
||||||
protected abstract void closeConnection();
|
protected abstract void closeConnection();
|
||||||
|
|
||||||
protected abstract void closeConnectionImmediately();
|
protected abstract void closeConnectionImmediately();
|
||||||
|
@ -11,8 +11,6 @@ public interface ConnectionInitializer {
|
|||||||
|
|
||||||
CompletableFuture<Void> stop();
|
CompletableFuture<Void> stop();
|
||||||
|
|
||||||
void subscribeIncomingConnect(final ConnectCallback callback);
|
|
||||||
|
|
||||||
CompletableFuture<PeerConnection> connect(RemotePeer remotePeer);
|
CompletableFuture<PeerConnection> connect(RemotePeer remotePeer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,12 +4,12 @@ import io.netty.channel.ChannelDuplexHandler;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.handler.timeout.IdleState;
|
import io.netty.handler.timeout.IdleState;
|
||||||
import io.netty.handler.timeout.IdleStateEvent;
|
import io.netty.handler.timeout.IdleStateEvent;
|
||||||
|
|
||||||
import org.codenil.comm.message.DisconnectReason;
|
import org.codenil.comm.message.DisconnectReason;
|
||||||
import org.codenil.comm.message.PingMessage;
|
import org.codenil.comm.message.PingMessage;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
public class KeepAlive extends ChannelDuplexHandler {
|
public class KeepAlive extends ChannelDuplexHandler {
|
||||||
@ -28,8 +28,7 @@ public class KeepAlive extends ChannelDuplexHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt)
|
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
|
||||||
throws IOException {
|
|
||||||
if (!(evt instanceof IdleStateEvent
|
if (!(evt instanceof IdleStateEvent
|
||||||
&& ((IdleStateEvent) evt).state() == IdleState.READER_IDLE)) {
|
&& ((IdleStateEvent) evt).state() == IdleState.READER_IDLE)) {
|
||||||
return;
|
return;
|
||||||
|
@ -1,14 +1,26 @@
|
|||||||
package org.codenil.comm.connections;
|
package org.codenil.comm.connections;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelHandler;
|
||||||
|
import org.codenil.comm.RemotePeer;
|
||||||
import org.codenil.comm.message.DisconnectReason;
|
import org.codenil.comm.message.DisconnectReason;
|
||||||
import org.codenil.comm.message.Message;
|
import org.codenil.comm.message.Message;
|
||||||
|
|
||||||
public interface PeerConnection {
|
public interface PeerConnection {
|
||||||
|
|
||||||
String peerIdentity();
|
String pkiId();
|
||||||
|
|
||||||
|
String remoteIdentifier();
|
||||||
|
|
||||||
|
boolean disconnected();
|
||||||
|
|
||||||
|
RemotePeer remotePeer();
|
||||||
|
|
||||||
|
void setRemotePeer(RemotePeer remotePeer);
|
||||||
|
|
||||||
void send(final Message message) throws Exception;
|
void send(final Message message) throws Exception;
|
||||||
|
|
||||||
|
void replaceHandler(String name, ChannelHandler newHandler);
|
||||||
|
|
||||||
void disconnect(DisconnectReason reason) throws Exception;
|
void disconnect(DisconnectReason reason) throws Exception;
|
||||||
|
|
||||||
void terminateConnection();
|
void terminateConnection();
|
||||||
|
@ -1,15 +1,50 @@
|
|||||||
package org.codenil.comm.connections;
|
package org.codenil.comm.connections;
|
||||||
|
|
||||||
import org.codenil.comm.message.*;
|
import org.codenil.comm.callback.ConnectCallback;
|
||||||
|
import org.codenil.comm.callback.DisconnectCallback;
|
||||||
|
import org.codenil.comm.callback.MessageCallback;
|
||||||
|
import org.codenil.comm.message.DefaultMessage;
|
||||||
|
import org.codenil.comm.message.RawMessage;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class PeerConnectionEvents {
|
public class PeerConnectionEvents {
|
||||||
|
|
||||||
|
/** 连接回调订阅 */
|
||||||
|
private final Subscribers<ConnectCallback> connectSubscribers = Subscribers.create(true);
|
||||||
|
|
||||||
private final Subscribers<DisconnectCallback> disconnectSubscribers = Subscribers.create(true);
|
private final Subscribers<DisconnectCallback> disconnectSubscribers = Subscribers.create(true);
|
||||||
|
|
||||||
private final Subscribers<MessageCallback> messageSubscribers = Subscribers.create(true);
|
private final Subscribers<MessageCallback> messageSubscribers = Subscribers.create(true);
|
||||||
|
|
||||||
|
private final Map<Integer, Subscribers<MessageCallback>> subscribersByCode = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public PeerConnectionEvents() {}
|
public PeerConnectionEvents() {}
|
||||||
|
|
||||||
|
public void subscribeConnect(final ConnectCallback callback) {
|
||||||
|
connectSubscribers.subscribe(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void subscribeDisconnect(final DisconnectCallback callback) {
|
||||||
|
disconnectSubscribers.subscribe(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void subscribeMessage(final MessageCallback callback) {
|
||||||
|
messageSubscribers.subscribe(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void subscribeByCode(final int messageCode, final MessageCallback callback) {
|
||||||
|
subscribersByCode
|
||||||
|
.computeIfAbsent(messageCode, _ -> Subscribers.create())
|
||||||
|
.subscribe(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void dispatchConnect(
|
||||||
|
final PeerConnection connection) {
|
||||||
|
connectSubscribers.forEach(s -> s.onConnect(connection));
|
||||||
|
}
|
||||||
|
|
||||||
public void dispatchDisconnect(
|
public void dispatchDisconnect(
|
||||||
final PeerConnection connection) {
|
final PeerConnection connection) {
|
||||||
disconnectSubscribers.forEach(s -> s.onDisconnect(connection));
|
disconnectSubscribers.forEach(s -> s.onDisconnect(connection));
|
||||||
@ -20,11 +55,8 @@ public class PeerConnectionEvents {
|
|||||||
messageSubscribers.forEach(s -> s.onMessage(msg));
|
messageSubscribers.forEach(s -> s.onMessage(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void subscribeDisconnect(final DisconnectCallback callback) {
|
public void dispatchMessageByCode(final int code, final PeerConnection connection, final RawMessage message) {
|
||||||
disconnectSubscribers.subscribe(callback);
|
final DefaultMessage msg = new DefaultMessage(connection, message);
|
||||||
}
|
subscribersByCode.get(code).forEach(s -> s.onMessage(msg));
|
||||||
|
|
||||||
public void subscribeMessage(final MessageCallback callback) {
|
|
||||||
messageSubscribers.subscribe(callback);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
package org.codenil.comm.netty.handler;
|
package org.codenil.comm.handler;
|
||||||
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
|
|
||||||
import org.codenil.comm.connections.PeerConnection;
|
import org.codenil.comm.connections.PeerConnection;
|
||||||
import org.codenil.comm.connections.PeerConnectionEvents;
|
import org.codenil.comm.connections.PeerConnectionEvents;
|
||||||
import org.codenil.comm.message.Message;
|
|
||||||
import org.codenil.comm.message.MessageCodes;
|
import org.codenil.comm.message.MessageCodes;
|
||||||
import org.codenil.comm.message.PongMessage;
|
import org.codenil.comm.message.PongMessage;
|
||||||
import org.codenil.comm.message.RawMessage;
|
import org.codenil.comm.message.RawMessage;
|
||||||
@ -32,8 +32,8 @@ public class CommonHandler extends SimpleChannelInboundHandler<RawMessage> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void channelRead0(final ChannelHandlerContext ctx, final RawMessage originalMessage) {
|
protected void channelRead0(final ChannelHandlerContext ctx, final RawMessage originalMessage) {
|
||||||
logger.debug("Received a message from {}", originalMessage.getCode());
|
logger.debug("Received a message from {}", originalMessage.code());
|
||||||
switch (originalMessage.getCode()) {
|
switch (originalMessage.code()) {
|
||||||
case MessageCodes.PING:
|
case MessageCodes.PING:
|
||||||
logger.trace("Received Wire PING");
|
logger.trace("Received Wire PING");
|
||||||
try {
|
try {
|
@ -1,10 +1,11 @@
|
|||||||
package org.codenil.comm.netty.handler;
|
package org.codenil.comm.handler;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||||
import io.netty.handler.codec.DecoderException;
|
import io.netty.handler.codec.DecoderException;
|
||||||
import io.netty.handler.timeout.IdleStateHandler;
|
import io.netty.handler.timeout.IdleStateHandler;
|
||||||
|
|
||||||
import org.codenil.comm.connections.KeepAlive;
|
import org.codenil.comm.connections.KeepAlive;
|
||||||
import org.codenil.comm.connections.PeerConnection;
|
import org.codenil.comm.connections.PeerConnection;
|
||||||
import org.codenil.comm.connections.PeerConnectionEvents;
|
import org.codenil.comm.connections.PeerConnectionEvents;
|
||||||
@ -27,16 +28,16 @@ public class MessageFrameDecoder extends ByteToMessageDecoder {
|
|||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(MessageFrameDecoder.class);
|
private static final Logger logger = LoggerFactory.getLogger(MessageFrameDecoder.class);
|
||||||
|
|
||||||
private final CompletableFuture<PeerConnection> connectFuture;
|
private final CompletableFuture<PeerConnection> connectionFuture;
|
||||||
private final PeerConnectionEvents connectionEvents;
|
private final PeerConnectionEvents connectionEvents;
|
||||||
|
|
||||||
private boolean hellosExchanged;
|
private boolean hellosExchanged;
|
||||||
|
|
||||||
public MessageFrameDecoder(
|
public MessageFrameDecoder(
|
||||||
final PeerConnectionEvents connectionEvents,
|
final PeerConnectionEvents connectionEvents,
|
||||||
final CompletableFuture<PeerConnection> connectFuture) {
|
final CompletableFuture<PeerConnection> connectionFuture) {
|
||||||
this.connectionEvents = connectionEvents;
|
this.connectionEvents = connectionEvents;
|
||||||
this.connectFuture = connectFuture;
|
this.connectionFuture = connectionFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -70,14 +71,17 @@ public class MessageFrameDecoder extends ByteToMessageDecoder {
|
|||||||
byteBuf.readBytes(data);
|
byteBuf.readBytes(data);
|
||||||
|
|
||||||
// 创建消息对象
|
// 创建消息对象
|
||||||
RawMessage message = new RawMessage(code, data);
|
RawMessage message = RawMessage.create(code);
|
||||||
message.setRequestId(id);
|
message.setRequestId(id);
|
||||||
|
message.setData(data);
|
||||||
|
|
||||||
if (hellosExchanged) {
|
if (hellosExchanged) {
|
||||||
out.add(message);
|
out.add(message);
|
||||||
} else if (message.getCode() == MessageCodes.HELLO) {
|
} else if (message.code() == MessageCodes.HELLO) {
|
||||||
hellosExchanged = true;
|
hellosExchanged = true;
|
||||||
final PeerConnection connection = new NettyPeerConnection(ctx, connectionEvents);
|
|
||||||
|
String remoteIdentifier = new String(message.data());
|
||||||
|
final PeerConnection connection = new NettyPeerConnection(ctx, remoteIdentifier, connectionEvents);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* 如果收到的消息是Hello消息
|
* 如果收到的消息是Hello消息
|
||||||
@ -89,24 +93,27 @@ public class MessageFrameDecoder extends ByteToMessageDecoder {
|
|||||||
final AtomicBoolean waitingForPong = new AtomicBoolean(false);
|
final AtomicBoolean waitingForPong = new AtomicBoolean(false);
|
||||||
ctx.channel()
|
ctx.channel()
|
||||||
.pipeline()
|
.pipeline()
|
||||||
.addLast(new IdleStateHandler(15, 0, 0),
|
.addLast("IdleState", new IdleStateHandler(15, 0, 0))
|
||||||
new KeepAlive(connection, waitingForPong),
|
.addLast("KeepAlive", new KeepAlive(connection, waitingForPong))
|
||||||
new CommonHandler(connection, connectionEvents, waitingForPong),
|
.addLast("Common", new CommonHandler(connection, connectionEvents, waitingForPong))
|
||||||
new MessageFrameEncoder());
|
.addLast("FrameEncoder", new MessageFrameEncoder());
|
||||||
connectFuture.complete(connection);
|
connectionFuture.complete(connection);
|
||||||
} else if (message.getCode() == MessageCodes.DISCONNECT) {
|
} else if (message.code() == MessageCodes.DISCONNECT) {
|
||||||
logger.debug("Disconnected before sending HELLO.");
|
logger.debug("Disconnected before sending HELLO.");
|
||||||
ctx.close();
|
ctx.close();
|
||||||
connectFuture.completeExceptionally(new RuntimeException("Disconnect"));
|
connectionFuture.completeExceptionally(new RuntimeException("Disconnect"));
|
||||||
} else {
|
} else {
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Message received before HELLO's exchanged, disconnecting. Code: {}, Data: {}",
|
"Message received before HELLO's exchanged, disconnecting. Code: {}, Data: {}",
|
||||||
message.getCode(), Arrays.toString(message.getData()));
|
message.code(), Arrays.toString(message.data()));
|
||||||
|
|
||||||
DisconnectMessage disconnectMessage = DisconnectMessage.create(DisconnectReason.UNKNOWN);
|
DisconnectMessage disconnectMessage = DisconnectMessage.create(DisconnectReason.UNKNOWN);
|
||||||
ctx.writeAndFlush(new RawMessage(disconnectMessage.getCode(), disconnectMessage.getData()))
|
|
||||||
.addListener((f) -> ctx.close());
|
RawMessage rawMessage = RawMessage.create(disconnectMessage.code());
|
||||||
connectFuture.completeExceptionally(new RuntimeException("Message received before HELLO's exchanged"));
|
rawMessage.setData(disconnectMessage.data());
|
||||||
|
ctx.writeAndFlush(rawMessage)
|
||||||
|
.addListener(_ -> ctx.close());
|
||||||
|
connectionFuture.completeExceptionally(new RuntimeException("Message received before HELLO's exchanged"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,8 +126,8 @@ public class MessageFrameDecoder extends ByteToMessageDecoder {
|
|||||||
: throwable;
|
: throwable;
|
||||||
if (cause instanceof IllegalArgumentException) {
|
if (cause instanceof IllegalArgumentException) {
|
||||||
logger.debug("Invalid incoming message ", throwable);
|
logger.debug("Invalid incoming message ", throwable);
|
||||||
if (connectFuture.isDone() && !connectFuture.isCompletedExceptionally()) {
|
if (connectionFuture.isDone() && !connectionFuture.isCompletedExceptionally()) {
|
||||||
connectFuture.get().disconnect(DisconnectReason.INVALID_MESSAGE_RECEIVED);
|
connectionFuture.get().disconnect(DisconnectReason.INVALID_MESSAGE_RECEIVED);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else if (cause instanceof IOException) {
|
} else if (cause instanceof IOException) {
|
||||||
@ -129,10 +136,10 @@ public class MessageFrameDecoder extends ByteToMessageDecoder {
|
|||||||
} else {
|
} else {
|
||||||
logger.error("Exception while processing incoming message", throwable);
|
logger.error("Exception while processing incoming message", throwable);
|
||||||
}
|
}
|
||||||
if (connectFuture.isDone() && !connectFuture.isCompletedExceptionally()) {
|
if (connectionFuture.isDone() && !connectionFuture.isCompletedExceptionally()) {
|
||||||
connectFuture.get().terminateConnection();
|
connectionFuture.get().terminateConnection();
|
||||||
} else {
|
} else {
|
||||||
connectFuture.completeExceptionally(throwable);
|
connectionFuture.completeExceptionally(throwable);
|
||||||
ctx.close();
|
ctx.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package org.codenil.comm.netty.handler;
|
package org.codenil.comm.handler;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
@ -18,12 +18,11 @@ public class MessageFrameEncoder extends MessageToByteEncoder<RawMessage> {
|
|||||||
final ChannelHandlerContext ctx,
|
final ChannelHandlerContext ctx,
|
||||||
final RawMessage msg,
|
final RawMessage msg,
|
||||||
final ByteBuf out) {
|
final ByteBuf out) {
|
||||||
byte[] idBytes = Optional.ofNullable(msg.getRequestId()).orElse("").getBytes(StandardCharsets.UTF_8);
|
byte[] idBytes = Optional.ofNullable(msg.requestId()).orElse("").getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
SerializeHelper builder = new SerializeHelper();
|
SerializeHelper builder = new SerializeHelper();
|
||||||
ByteBuf buf = builder.writeBytes(idBytes)
|
ByteBuf buf = builder.writeBytes(idBytes)
|
||||||
.writeInt(msg.getCode())
|
.writeInt(msg.code())
|
||||||
.writeBytes(msg.getData())
|
.writeBytes(msg.data())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
out.writeBytes(buf);
|
out.writeBytes(buf);
|
@ -1,4 +1,4 @@
|
|||||||
package org.codenil.comm.netty.handler;
|
package org.codenil.comm.handler;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import org.codenil.comm.handshake.PlainMessage;
|
import org.codenil.comm.handshake.PlainMessage;
|
@ -1,4 +1,4 @@
|
|||||||
package org.codenil.comm.netty.handler;
|
package org.codenil.comm.handler;
|
||||||
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
@ -4,10 +4,14 @@ import io.netty.buffer.ByteBuf;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import io.netty.handler.codec.MessageToByteEncoder;
|
import io.netty.handler.codec.MessageToByteEncoder;
|
||||||
import org.codenil.comm.connections.PeerConnectionEvents;
|
|
||||||
import org.codenil.comm.message.*;
|
|
||||||
import org.codenil.comm.connections.PeerConnection;
|
import org.codenil.comm.connections.PeerConnection;
|
||||||
import org.codenil.comm.netty.handler.MessageFrameDecoder;
|
import org.codenil.comm.connections.PeerConnectionEvents;
|
||||||
|
import org.codenil.comm.handler.MessageFrameDecoder;
|
||||||
|
import org.codenil.comm.message.HelloMessage;
|
||||||
|
import org.codenil.comm.message.MessageCodes;
|
||||||
|
import org.codenil.comm.message.RawMessage;
|
||||||
|
import org.codenil.comm.serialize.SerializeHelper;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -21,12 +25,18 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl
|
|||||||
|
|
||||||
private final CompletableFuture<PeerConnection> connectionFuture;
|
private final CompletableFuture<PeerConnection> connectionFuture;
|
||||||
private final PeerConnectionEvents connectionEvents;
|
private final PeerConnectionEvents connectionEvents;
|
||||||
|
|
||||||
|
/** 本机的标识 */
|
||||||
|
private final String selfIdentifier;
|
||||||
|
|
||||||
protected final Handshaker handshaker;
|
protected final Handshaker handshaker;
|
||||||
|
|
||||||
protected AbstractHandshakeHandler(
|
protected AbstractHandshakeHandler(
|
||||||
|
final String selfIdentifier,
|
||||||
final CompletableFuture<PeerConnection> connectionFuture,
|
final CompletableFuture<PeerConnection> connectionFuture,
|
||||||
final PeerConnectionEvents connectionEvents,
|
final PeerConnectionEvents connectionEvents,
|
||||||
final Handshaker handshaker) {
|
final Handshaker handshaker) {
|
||||||
|
this.selfIdentifier = selfIdentifier;
|
||||||
this.connectionFuture = connectionFuture;
|
this.connectionFuture = connectionFuture;
|
||||||
this.connectionEvents = connectionEvents;
|
this.connectionEvents = connectionEvents;
|
||||||
this.handshaker = handshaker;
|
this.handshaker = handshaker;
|
||||||
@ -50,15 +60,17 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl
|
|||||||
*/
|
*/
|
||||||
ctx.channel()
|
ctx.channel()
|
||||||
.pipeline()
|
.pipeline()
|
||||||
.replace(this, "DeFramer", new MessageFrameDecoder(connectionEvents, connectionFuture))
|
.replace(this, "FrameDecoder", new MessageFrameDecoder(connectionEvents, connectionFuture))
|
||||||
.addBefore("DeFramer", "validate", new FirstMessageFrameEncoder());
|
.addBefore("FrameDecoder", "validate", new FirstMessageFrameEncoder());
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* 替换完编解码器后发送Hello消息
|
* 替换完编解码器后发送Hello消息
|
||||||
|
* hello消息需要带一些数据
|
||||||
*/
|
*/
|
||||||
HelloMessage helloMessage = HelloMessage.create();
|
HelloMessage helloMessage = HelloMessage.create(selfIdentifier.getBytes(StandardCharsets.UTF_8));
|
||||||
RawMessage rawMessage = new RawMessage(helloMessage.getCode(), helloMessage.getData());
|
RawMessage rawMessage = RawMessage.create(helloMessage.code());
|
||||||
rawMessage.setRequestId(helloMessage.getRequestId());
|
rawMessage.setData(helloMessage.data());
|
||||||
|
rawMessage.setRequestId(helloMessage.requestId());
|
||||||
ctx.writeAndFlush(rawMessage)
|
ctx.writeAndFlush(rawMessage)
|
||||||
.addListener(ff -> {
|
.addListener(ff -> {
|
||||||
if (ff.isSuccess()) {
|
if (ff.isSuccess()) {
|
||||||
@ -90,42 +102,19 @@ public abstract class AbstractHandshakeHandler extends SimpleChannelInboundHandl
|
|||||||
final ChannelHandlerContext context,
|
final ChannelHandlerContext context,
|
||||||
final RawMessage msg,
|
final RawMessage msg,
|
||||||
final ByteBuf out) {
|
final ByteBuf out) {
|
||||||
if (msg.getCode() != MessageCodes.HELLO) {
|
if (msg.code() != MessageCodes.HELLO) {
|
||||||
throw new IllegalStateException("First message sent wasn't a HELLO.");
|
throw new IllegalStateException("First wire message sent wasn't a HELLO.");
|
||||||
}
|
}
|
||||||
byte[] idBytes = Optional.ofNullable(msg.getRequestId()).orElse("").getBytes(StandardCharsets.UTF_8);
|
byte[] idBytes = Optional.ofNullable(msg.requestId()).orElse("").getBytes(StandardCharsets.UTF_8);
|
||||||
int channelsSize = msg.getChannels().size();
|
|
||||||
|
|
||||||
int channelBytesLength = 0;
|
SerializeHelper builder = new SerializeHelper();
|
||||||
for (String channel : msg.getChannels()) {
|
ByteBuf buf = builder.writeBytes(idBytes)
|
||||||
byte[] channelBytes = channel.getBytes(StandardCharsets.UTF_8);
|
.writeInt(msg.code())
|
||||||
channelBytesLength = channelBytesLength + 4 + channelBytes.length;
|
.writeBytes(msg.data())
|
||||||
}
|
.build();
|
||||||
|
|
||||||
int payloadLength = 4 + 4 + idBytes.length + 4 + channelBytesLength + 4 + msg.getData().length;
|
out.writeBytes(buf);
|
||||||
|
buf.release();
|
||||||
// 写入协议头:消息总长度
|
|
||||||
out.writeInt(payloadLength + 4);
|
|
||||||
|
|
||||||
// 写入payload
|
|
||||||
// 写入code
|
|
||||||
out.writeInt(msg.getCode());
|
|
||||||
|
|
||||||
// 写入id
|
|
||||||
out.writeInt(idBytes.length);
|
|
||||||
out.writeBytes(idBytes);
|
|
||||||
|
|
||||||
// 写入channels
|
|
||||||
out.writeInt(channelsSize);
|
|
||||||
for (String channel : msg.getChannels()) {
|
|
||||||
byte[] channelBytes = channel.getBytes(StandardCharsets.UTF_8);
|
|
||||||
out.writeInt(channelBytes.length);
|
|
||||||
out.writeBytes(channelBytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 写入data
|
|
||||||
out.writeInt(msg.getData().length);
|
|
||||||
out.writeBytes(msg.getData());
|
|
||||||
context.pipeline().remove(this);
|
context.pipeline().remove(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,10 +13,11 @@ import java.util.concurrent.CompletableFuture;
|
|||||||
public class HandshakeHandlerInbound extends AbstractHandshakeHandler {
|
public class HandshakeHandlerInbound extends AbstractHandshakeHandler {
|
||||||
|
|
||||||
public HandshakeHandlerInbound(
|
public HandshakeHandlerInbound(
|
||||||
|
final String selfIdentifier,
|
||||||
final CompletableFuture<PeerConnection> connectionFuture,
|
final CompletableFuture<PeerConnection> connectionFuture,
|
||||||
final PeerConnectionEvents connectionEvent,
|
final PeerConnectionEvents connectionEvent,
|
||||||
final Handshaker handshaker) {
|
final Handshaker handshaker) {
|
||||||
super(connectionFuture, connectionEvent, handshaker);
|
super(selfIdentifier, connectionFuture, connectionEvent, handshaker);
|
||||||
handshaker.prepareResponder();
|
handshaker.prepareResponder();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package org.codenil.comm.handshake;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
|
||||||
import org.codenil.comm.connections.PeerConnection;
|
import org.codenil.comm.connections.PeerConnection;
|
||||||
import org.codenil.comm.connections.PeerConnectionEvents;
|
import org.codenil.comm.connections.PeerConnectionEvents;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -20,10 +21,11 @@ public class HandshakeHandlerOutbound extends AbstractHandshakeHandler {
|
|||||||
private final ByteBuf first;
|
private final ByteBuf first;
|
||||||
|
|
||||||
public HandshakeHandlerOutbound(
|
public HandshakeHandlerOutbound(
|
||||||
|
final String selfIdentifier,
|
||||||
final CompletableFuture<PeerConnection> connectionFuture,
|
final CompletableFuture<PeerConnection> connectionFuture,
|
||||||
final PeerConnectionEvents connectionEvent,
|
final PeerConnectionEvents connectionEvent,
|
||||||
final Handshaker handshaker) {
|
final Handshaker handshaker) {
|
||||||
super(connectionFuture, connectionEvent, handshaker);
|
super(selfIdentifier, connectionFuture, connectionEvent, handshaker);
|
||||||
|
|
||||||
handshaker.prepareInitiator();
|
handshaker.prepareInitiator();
|
||||||
this.first = handshaker.firstMessage();
|
this.first = handshaker.firstMessage();
|
||||||
|
@ -1,113 +0,0 @@
|
|||||||
package org.codenil.comm.handshake;
|
|
||||||
|
|
||||||
import org.apache.tuweni.bytes.Bytes;
|
|
||||||
import org.apache.tuweni.bytes.Bytes32;
|
|
||||||
import org.bouncycastle.crypto.digests.KeccakDigest;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Objects;
|
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
|
||||||
|
|
||||||
public class HandshakeSecrets {
|
|
||||||
private final byte[] aesSecret;
|
|
||||||
private final byte[] macSecret;
|
|
||||||
private final byte[] token;
|
|
||||||
private final KeccakDigest egressMac = new KeccakDigest(Bytes32.SIZE * 8);
|
|
||||||
private final KeccakDigest ingressMac = new KeccakDigest(Bytes32.SIZE * 8);
|
|
||||||
|
|
||||||
public HandshakeSecrets(final byte[] aesSecret, final byte[] macSecret, final byte[] token) {
|
|
||||||
checkArgument(aesSecret.length == Bytes32.SIZE, "aes secret must be exactly 32 bytes long");
|
|
||||||
checkArgument(macSecret.length == Bytes32.SIZE, "mac secret must be exactly 32 bytes long");
|
|
||||||
checkArgument(token.length == Bytes32.SIZE, "token must be exactly 32 bytes long");
|
|
||||||
|
|
||||||
this.aesSecret = aesSecret;
|
|
||||||
this.macSecret = macSecret;
|
|
||||||
this.token = token;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HandshakeSecrets updateEgress(final byte[] bytes) {
|
|
||||||
egressMac.update(bytes, 0, bytes.length);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HandshakeSecrets updateIngress(final byte[] bytes) {
|
|
||||||
ingressMac.update(bytes, 0, bytes.length);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "HandshakeSecrets{"
|
|
||||||
+ "aesSecret="
|
|
||||||
+ Bytes.wrap(aesSecret)
|
|
||||||
+ ", macSecret="
|
|
||||||
+ Bytes.wrap(macSecret)
|
|
||||||
+ ", token="
|
|
||||||
+ Bytes.wrap(token)
|
|
||||||
+ ", egressMac="
|
|
||||||
+ Bytes.wrap(snapshot(egressMac))
|
|
||||||
+ ", ingressMac="
|
|
||||||
+ Bytes.wrap(snapshot(ingressMac))
|
|
||||||
+ '}';
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] getAesSecret() {
|
|
||||||
return aesSecret;
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] getMacSecret() {
|
|
||||||
return macSecret;
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] getToken() {
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] getEgressMac() {
|
|
||||||
return snapshot(egressMac);
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] getIngressMac() {
|
|
||||||
return snapshot(ingressMac);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static byte[] snapshot(final KeccakDigest digest) {
|
|
||||||
final byte[] out = new byte[Bytes32.SIZE];
|
|
||||||
new KeccakDigest(digest).doFinal(out, 0);
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("EqualsWhichDoesntCheckParameterClass") // checked in delegated method
|
|
||||||
@Override
|
|
||||||
public boolean equals(final Object obj) {
|
|
||||||
return equals(obj, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean equals(final Object o, final boolean flipMacs) {
|
|
||||||
if (this == o) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (o == null || getClass() != o.getClass()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
final HandshakeSecrets that = (HandshakeSecrets) o;
|
|
||||||
final KeccakDigest vsEgress = flipMacs ? that.ingressMac : that.egressMac;
|
|
||||||
final KeccakDigest vsIngress = flipMacs ? that.egressMac : that.ingressMac;
|
|
||||||
return Arrays.equals(aesSecret, that.aesSecret)
|
|
||||||
&& Arrays.equals(macSecret, that.macSecret)
|
|
||||||
&& Arrays.equals(token, that.token)
|
|
||||||
&& Arrays.equals(snapshot(egressMac), snapshot(vsEgress))
|
|
||||||
&& Arrays.equals(snapshot(ingressMac), snapshot(vsIngress));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
return Objects.hash(
|
|
||||||
Arrays.hashCode(aesSecret),
|
|
||||||
Arrays.hashCode(macSecret),
|
|
||||||
Arrays.hashCode(token),
|
|
||||||
egressMac,
|
|
||||||
ingressMac);
|
|
||||||
}
|
|
||||||
}
|
|
@ -14,7 +14,5 @@ public interface Handshaker {
|
|||||||
|
|
||||||
ByteBuf firstMessage();
|
ByteBuf firstMessage();
|
||||||
|
|
||||||
HandshakeSecrets secrets();
|
|
||||||
|
|
||||||
Optional<ByteBuf> handleMessage(ByteBuf buf);
|
Optional<ByteBuf> handleMessage(ByteBuf buf);
|
||||||
}
|
}
|
||||||
|
@ -2,8 +2,9 @@ package org.codenil.comm.handshake;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
|
|
||||||
|
import org.codenil.comm.handler.MessageHandler;
|
||||||
import org.codenil.comm.message.MessageType;
|
import org.codenil.comm.message.MessageType;
|
||||||
import org.codenil.comm.netty.handler.MessageHandler;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -60,7 +61,6 @@ public class PlainHandshaker implements Handshaker {
|
|||||||
"illegal invocation of onMessage on handshake that is not in progress");
|
"illegal invocation of onMessage on handshake that is not in progress");
|
||||||
|
|
||||||
PlainMessage message = MessageHandler.parseMessage(buf);
|
PlainMessage message = MessageHandler.parseMessage(buf);
|
||||||
|
|
||||||
Optional<byte[]> nextMsg = Optional.empty();
|
Optional<byte[]> nextMsg = Optional.empty();
|
||||||
if (initiator) {
|
if (initiator) {
|
||||||
checkState(responderMsg == null,
|
checkState(responderMsg == null,
|
||||||
@ -84,9 +84,4 @@ public class PlainHandshaker implements Handshaker {
|
|||||||
logger.trace("Handshake status set to {}", status.get());
|
logger.trace("Handshake status set to {}", status.get());
|
||||||
return nextMsg.map(Unpooled::wrappedBuffer);
|
return nextMsg.map(Unpooled::wrappedBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public HandshakeSecrets secrets() {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -4,29 +4,28 @@ public abstract class AbstractMessage implements Message {
|
|||||||
|
|
||||||
private String requestId;
|
private String requestId;
|
||||||
|
|
||||||
private byte[] data = new byte[]{};
|
private byte[] data;
|
||||||
|
|
||||||
public AbstractMessage(
|
|
||||||
final byte[] data) {
|
|
||||||
this.data = data;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getRequestId() {
|
public String requestId() {
|
||||||
return requestId;
|
return requestId;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final int getSize() {
|
public final int size() {
|
||||||
return data.length;
|
return data.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] getData() {
|
public byte[] data() {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setRequestId(String requestId) {
|
public void setRequestId(String requestId) {
|
||||||
this.requestId = requestId;
|
this.requestId = requestId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setData(byte[] data) {
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,31 +1,29 @@
|
|||||||
package org.codenil.comm.message;
|
package org.codenil.comm.message;
|
||||||
|
|
||||||
import com.google.gson.Gson;
|
|
||||||
|
|
||||||
public class DisconnectMessage extends AbstractMessage {
|
public class DisconnectMessage extends AbstractMessage {
|
||||||
|
|
||||||
private DisconnectMessage(final byte[] data) {
|
private DisconnectMessage(final byte[] data) {
|
||||||
super(data);
|
super.setData(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DisconnectMessage create(final DisconnectReason reason) {
|
public static DisconnectMessage create(final DisconnectReason reason) {
|
||||||
return new DisconnectMessage(new Gson().toJson(reason).getBytes());
|
return new DisconnectMessage(new byte[]{reason.code()});
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DisconnectMessage readFrom(final Message message) {
|
public static DisconnectMessage readFrom(final Message message) {
|
||||||
if (message instanceof DisconnectMessage) {
|
if (message instanceof DisconnectMessage) {
|
||||||
return (DisconnectMessage) message;
|
return (DisconnectMessage) message;
|
||||||
}
|
}
|
||||||
final int code = message.getCode();
|
final int code = message.code();
|
||||||
if (code != MessageCodes.DISCONNECT) {
|
if (code != MessageCodes.DISCONNECT) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
String.format("Message has code %d and thus is not a DisconnectMessage.", code));
|
String.format("Message has code %d and thus is not a DisconnectMessage.", code));
|
||||||
}
|
}
|
||||||
return new DisconnectMessage(message.getData());
|
return new DisconnectMessage(message.data());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getCode() {
|
public int code() {
|
||||||
return MessageCodes.DISCONNECT;
|
return MessageCodes.DISCONNECT;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,27 +1,33 @@
|
|||||||
package org.codenil.comm.message;
|
package org.codenil.comm.message;
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
public enum DisconnectReason {
|
public enum DisconnectReason {
|
||||||
|
|
||||||
UNKNOWN(null),
|
UNKNOWN((byte) 0x00, ""),
|
||||||
|
|
||||||
TIMEOUT((byte) 0x0b),
|
TIMEOUT((byte) 0x0b, ""),
|
||||||
|
|
||||||
INVALID_MESSAGE_RECEIVED((byte) 0x02, "An exception was caught decoding message"),
|
INVALID_MESSAGE_RECEIVED((byte) 0x02, "An exception was caught decoding message"),
|
||||||
|
|
||||||
;
|
;
|
||||||
|
|
||||||
private final Optional<Byte> code;
|
private final Byte code;
|
||||||
private final Optional<String> message;
|
private final String message;
|
||||||
|
|
||||||
DisconnectReason(final Byte code) {
|
DisconnectReason(final Byte code) {
|
||||||
this.code = Optional.ofNullable(code);
|
this.code = code;
|
||||||
this.message = Optional.empty();
|
this.message = "";
|
||||||
}
|
}
|
||||||
|
|
||||||
DisconnectReason(final Byte code, final String message) {
|
DisconnectReason(final Byte code, final String message) {
|
||||||
this.code = Optional.ofNullable(code);
|
this.code = code;
|
||||||
this.message = Optional.of(message);
|
this.message = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Byte code() {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String message() {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,17 +3,17 @@ package org.codenil.comm.message;
|
|||||||
public abstract class EmptyMessage implements Message {
|
public abstract class EmptyMessage implements Message {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final int getSize() {
|
public final int size() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] getData() {
|
public byte[] data() {
|
||||||
return new byte[]{};
|
return new byte[]{};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return getClass().getSimpleName() + "{ code=" + getCode() + ", size=0}";
|
return getClass().getSimpleName() + "{ code=" + code() + ", size=0}";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,15 +3,15 @@ package org.codenil.comm.message;
|
|||||||
public class HelloMessage extends AbstractMessage {
|
public class HelloMessage extends AbstractMessage {
|
||||||
|
|
||||||
public HelloMessage(final byte[] data) {
|
public HelloMessage(final byte[] data) {
|
||||||
super(data);
|
super.setData(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HelloMessage create() {
|
public static HelloMessage create(byte[] bytes) {
|
||||||
return new HelloMessage(new byte[0]);
|
return new HelloMessage(new byte[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getCode() {
|
public int code() {
|
||||||
return MessageCodes.HELLO;
|
return MessageCodes.HELLO;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,17 +2,18 @@ package org.codenil.comm.message;
|
|||||||
|
|
||||||
public interface Message {
|
public interface Message {
|
||||||
|
|
||||||
String getRequestId();
|
String requestId();
|
||||||
|
|
||||||
int getSize();
|
int size();
|
||||||
|
|
||||||
int getCode();
|
int code();
|
||||||
|
|
||||||
byte[] getData();
|
byte[] data();
|
||||||
|
|
||||||
default RawMessage wrapMessage(final String requestId) {
|
default RawMessage wrapMessage(final String requestId) {
|
||||||
RawMessage rawMessage = new RawMessage(getCode(), getData());
|
RawMessage rawMessage = RawMessage.create(code());
|
||||||
rawMessage.setRequestId(requestId);
|
rawMessage.setRequestId(requestId);
|
||||||
|
rawMessage.setData(data());
|
||||||
return rawMessage;
|
return rawMessage;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,8 @@ public class MessageCodes {
|
|||||||
public static final int PING = 0x02;
|
public static final int PING = 0x02;
|
||||||
public static final int PONG = 0x03;
|
public static final int PONG = 0x03;
|
||||||
|
|
||||||
public static final int DATA_UPDATE = 0x04;
|
public static final int GOSSIP = 0x04;
|
||||||
|
|
||||||
private MessageCodes() {}
|
private MessageCodes() {}
|
||||||
|
|
||||||
public static String messageName(final int code) {
|
public static String messageName(final int code) {
|
||||||
|
@ -10,12 +10,12 @@ public class PingMessage extends EmptyMessage {
|
|||||||
private PingMessage() {}
|
private PingMessage() {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getRequestId() {
|
public String requestId() {
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getCode() {
|
public int code() {
|
||||||
return MessageCodes.PING;
|
return MessageCodes.PING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,12 +10,12 @@ public class PongMessage extends EmptyMessage {
|
|||||||
private PongMessage() {}
|
private PongMessage() {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getRequestId() {
|
public String requestId() {
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getCode() {
|
public int code() {
|
||||||
return MessageCodes.PONG;
|
return MessageCodes.PONG;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,18 +1,56 @@
|
|||||||
package org.codenil.comm.message;
|
package org.codenil.comm.message;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
|
import org.codenil.comm.serialize.SerializeHelper;
|
||||||
|
|
||||||
public class RawMessage extends AbstractMessage {
|
public class RawMessage extends AbstractMessage {
|
||||||
|
|
||||||
private final int code;
|
private final int code;
|
||||||
|
|
||||||
public RawMessage(
|
protected RawMessage(final int code) {
|
||||||
final int code,
|
|
||||||
final byte[] data) {
|
|
||||||
super(data);
|
|
||||||
this.code = code;
|
this.code = code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static RawMessage create(
|
||||||
|
final int code) {
|
||||||
|
return new RawMessage(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RawMessage decode(byte[] messageBytes) {
|
||||||
|
ByteBuf buf = Unpooled.wrappedBuffer(messageBytes);
|
||||||
|
buf.readerIndex(0);
|
||||||
|
|
||||||
|
int code = buf.readInt();
|
||||||
|
|
||||||
|
int requestIdLength = buf.readInt();
|
||||||
|
byte[] requestIdBytes = new byte[requestIdLength];
|
||||||
|
buf.readBytes(requestIdBytes);
|
||||||
|
|
||||||
|
int dataLength = buf.readInt();
|
||||||
|
byte[] dataBytes = new byte[dataLength];
|
||||||
|
buf.readBytes(dataBytes);
|
||||||
|
|
||||||
|
RawMessage rawMessage = create(code);
|
||||||
|
rawMessage.setRequestId(new String(requestIdBytes));
|
||||||
|
rawMessage.setData(dataBytes);
|
||||||
|
return rawMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static byte[] encode(RawMessage rawMessage) {
|
||||||
|
SerializeHelper converter = new SerializeHelper();
|
||||||
|
|
||||||
|
ByteBuf byteBuf = converter
|
||||||
|
.writeVersion("1.0")
|
||||||
|
.writeInt(rawMessage.code())
|
||||||
|
.writeString(rawMessage.requestId())
|
||||||
|
.writeBytes(rawMessage.data())
|
||||||
|
.build();
|
||||||
|
return converter.readPayload(byteBuf);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getCode() {
|
public int code() {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,13 +7,18 @@ import io.netty.channel.nio.NioEventLoopGroup;
|
|||||||
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.SocketChannel;
|
||||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
import org.codenil.comm.RemotePeer;
|
|
||||||
import org.codenil.comm.connections.*;
|
|
||||||
import org.codenil.comm.NetworkConfig;
|
import org.codenil.comm.NetworkConfig;
|
||||||
|
import org.codenil.comm.RemotePeer;
|
||||||
|
import org.codenil.comm.callback.ConnectCallback;
|
||||||
|
import org.codenil.comm.connections.ConnectionInitializer;
|
||||||
|
import org.codenil.comm.connections.PeerConnection;
|
||||||
|
import org.codenil.comm.connections.PeerConnectionEvents;
|
||||||
|
import org.codenil.comm.connections.Subscribers;
|
||||||
|
import org.codenil.comm.handler.TimeoutHandler;
|
||||||
import org.codenil.comm.handshake.HandshakeHandlerInbound;
|
import org.codenil.comm.handshake.HandshakeHandlerInbound;
|
||||||
import org.codenil.comm.handshake.HandshakeHandlerOutbound;
|
import org.codenil.comm.handshake.HandshakeHandlerOutbound;
|
||||||
import org.codenil.comm.handshake.PlainHandshaker;
|
import org.codenil.comm.handshake.PlainHandshaker;
|
||||||
import org.codenil.comm.netty.handler.TimeoutHandler;
|
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -39,13 +44,17 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
|
|||||||
private final AtomicBoolean stopped = new AtomicBoolean(false);
|
private final AtomicBoolean stopped = new AtomicBoolean(false);
|
||||||
|
|
||||||
private final NetworkConfig config;
|
private final NetworkConfig config;
|
||||||
|
private final String selfIdentifier;
|
||||||
|
|
||||||
private ChannelFuture server;
|
private ChannelFuture server;
|
||||||
|
|
||||||
|
|
||||||
public NettyConnectionInitializer(
|
public NettyConnectionInitializer(
|
||||||
final NetworkConfig config,
|
final NetworkConfig config,
|
||||||
|
final String selfIdentifier,
|
||||||
final PeerConnectionEvents eventDispatcher) {
|
final PeerConnectionEvents eventDispatcher) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
this.selfIdentifier = selfIdentifier;
|
||||||
this.eventDispatcher = eventDispatcher;
|
this.eventDispatcher = eventDispatcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,11 +120,6 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
|
|||||||
return stoppedFuture;
|
return stoppedFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void subscribeIncomingConnect(ConnectCallback callback) {
|
|
||||||
connectSubscribers.subscribe(callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 连接到远程
|
* 连接到远程
|
||||||
*/
|
*/
|
||||||
@ -123,10 +127,12 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
|
|||||||
public CompletableFuture<PeerConnection> connect(RemotePeer remotePeer) {
|
public CompletableFuture<PeerConnection> connect(RemotePeer remotePeer) {
|
||||||
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();
|
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();
|
||||||
|
|
||||||
|
String[] parts = remotePeer.endpoint().split(":");
|
||||||
|
|
||||||
new Bootstrap()
|
new Bootstrap()
|
||||||
.group(workers)
|
.group(workers)
|
||||||
.channel(NioSocketChannel.class)
|
.channel(NioSocketChannel.class)
|
||||||
.remoteAddress(new InetSocketAddress(remotePeer.ip(), remotePeer.listeningPort()))
|
.remoteAddress(new InetSocketAddress(parts[0], Integer.parseInt(parts[1])))
|
||||||
.option(ChannelOption.TCP_NODELAY, true)
|
.option(ChannelOption.TCP_NODELAY, true)
|
||||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIMEOUT_SECONDS * 1000)
|
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, TIMEOUT_SECONDS * 1000)
|
||||||
.handler(outboundChannelInitializer(remotePeer, connectionFuture))
|
.handler(outboundChannelInitializer(remotePeer, connectionFuture))
|
||||||
@ -141,6 +147,7 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
|
|||||||
return connectionFuture;
|
return connectionFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nonnull
|
||||||
private ChannelInitializer<SocketChannel> inboundChannelInitializer() {
|
private ChannelInitializer<SocketChannel> inboundChannelInitializer() {
|
||||||
return new ChannelInitializer<>() {
|
return new ChannelInitializer<>() {
|
||||||
@Override
|
@Override
|
||||||
@ -152,7 +159,7 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
|
|||||||
//其他处理器,TLS之类的
|
//其他处理器,TLS之类的
|
||||||
addAdditionalInboundHandlers(ch);
|
addAdditionalInboundHandlers(ch);
|
||||||
//握手消息处理器
|
//握手消息处理器
|
||||||
ch.pipeline().addLast(inboundHandler(connectionFuture));
|
ch.pipeline().addLast(inboundHandler(selfIdentifier, connectionFuture));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -164,11 +171,11 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
|
|||||||
@Override
|
@Override
|
||||||
protected void initChannel(final SocketChannel ch) throws Exception {
|
protected void initChannel(final SocketChannel ch) throws Exception {
|
||||||
//连接处理器
|
//连接处理器
|
||||||
ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to establish connection with peer: " + remotePeer.ip()));
|
ch.pipeline().addLast(timeoutHandler(connectionFuture, "Timed out waiting to establish connection with peer: " + remotePeer.toString()));
|
||||||
//其他处理器
|
//其他处理器
|
||||||
addAdditionalOutboundHandlers(ch, remotePeer);
|
addAdditionalOutboundHandlers(ch, remotePeer);
|
||||||
//握手消息处理器
|
//握手消息处理器
|
||||||
ch.pipeline().addLast(outboundHandler(remotePeer, connectionFuture));
|
ch.pipeline().addLast(outboundHandler(selfIdentifier, remotePeer, connectionFuture));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -182,14 +189,19 @@ public class NettyConnectionInitializer implements ConnectionInitializer {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
private HandshakeHandlerInbound inboundHandler(
|
private HandshakeHandlerInbound inboundHandler(
|
||||||
|
final String selfIdentifier,
|
||||||
final CompletableFuture<PeerConnection> connectionFuture) {
|
final CompletableFuture<PeerConnection> connectionFuture) {
|
||||||
return new HandshakeHandlerInbound(connectionFuture, eventDispatcher, new PlainHandshaker());
|
return new HandshakeHandlerInbound(selfIdentifier, connectionFuture,
|
||||||
|
eventDispatcher, new PlainHandshaker());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
private HandshakeHandlerOutbound outboundHandler(
|
private HandshakeHandlerOutbound outboundHandler(
|
||||||
final RemotePeer remotePeer, final CompletableFuture<PeerConnection> connectionFuture) {
|
final String selfIdentifier,
|
||||||
return new HandshakeHandlerOutbound(connectionFuture, eventDispatcher, new PlainHandshaker());
|
final RemotePeer remotePeer,
|
||||||
|
final CompletableFuture<PeerConnection> connectionFuture) {
|
||||||
|
return new HandshakeHandlerOutbound(selfIdentifier, connectionFuture,
|
||||||
|
eventDispatcher, new PlainHandshaker());
|
||||||
}
|
}
|
||||||
|
|
||||||
void addAdditionalOutboundHandlers(final Channel channel, final RemotePeer remotePeer)
|
void addAdditionalOutboundHandlers(final Channel channel, final RemotePeer remotePeer)
|
||||||
|
@ -1,11 +1,17 @@
|
|||||||
package org.codenil.comm.netty;
|
package org.codenil.comm.netty;
|
||||||
|
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
|
||||||
import org.codenil.comm.connections.AbstractPeerConnection;
|
import org.codenil.comm.connections.AbstractPeerConnection;
|
||||||
import org.codenil.comm.connections.PeerConnectionEvents;
|
import org.codenil.comm.connections.PeerConnectionEvents;
|
||||||
|
import org.codenil.comm.message.DisconnectMessage;
|
||||||
|
import org.codenil.comm.message.DisconnectReason;
|
||||||
import org.codenil.comm.message.Message;
|
import org.codenil.comm.message.Message;
|
||||||
import org.codenil.comm.message.RawMessage;
|
import org.codenil.comm.message.RawMessage;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
@ -13,24 +19,56 @@ import static java.util.concurrent.TimeUnit.SECONDS;
|
|||||||
|
|
||||||
public class NettyPeerConnection extends AbstractPeerConnection {
|
public class NettyPeerConnection extends AbstractPeerConnection {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(AbstractPeerConnection.class);
|
||||||
|
|
||||||
private final ChannelHandlerContext ctx;
|
private final ChannelHandlerContext ctx;
|
||||||
|
|
||||||
|
private final PeerConnectionEvents connectionEvents;
|
||||||
|
|
||||||
public NettyPeerConnection(
|
public NettyPeerConnection(
|
||||||
final ChannelHandlerContext ctx,
|
final ChannelHandlerContext ctx,
|
||||||
|
final String remoteIdentifier,
|
||||||
final PeerConnectionEvents connectionEvents) {
|
final PeerConnectionEvents connectionEvents) {
|
||||||
super(connectionEvents);
|
super(remoteIdentifier);
|
||||||
this.ctx = ctx;
|
this.ctx = ctx;
|
||||||
|
this.connectionEvents = connectionEvents;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void terminateConnection() {
|
||||||
|
if (terminatedImmediately.compareAndSet(false, true)) {
|
||||||
|
if (disconnected.compareAndSet(false, true)) {
|
||||||
|
connectionEvents.dispatchDisconnect(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
closeConnectionImmediately();
|
||||||
|
logger.atTrace()
|
||||||
|
.setMessage("Terminating connection, reason {}")
|
||||||
|
.addArgument(this)
|
||||||
|
.log();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String peerIdentity() {
|
public void disconnect(DisconnectReason reason) {
|
||||||
return ctx.channel().remoteAddress().toString();
|
if (disconnected.compareAndSet(false, true)) {
|
||||||
|
connectionEvents.dispatchDisconnect(this);
|
||||||
|
doSendMessage(DisconnectMessage.create(reason));
|
||||||
|
closeConnection();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doSendMessage(final Message message) {
|
protected void doSendMessage(final Message message) {
|
||||||
ctx.channel().writeAndFlush(new RawMessage(message.getCode(), message.getData()));
|
RawMessage rawMessage = RawMessage.create(message.code());
|
||||||
|
rawMessage.setData(message.data());
|
||||||
|
ctx.channel().writeAndFlush(rawMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doReplaceHandler(String name, ChannelHandler newHandler) {
|
||||||
|
ctx.channel().pipeline()
|
||||||
|
.replace(name, name, newHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1,17 +0,0 @@
|
|||||||
package org.codenil.comm.netty;
|
|
||||||
|
|
||||||
import org.codenil.comm.message.Message;
|
|
||||||
|
|
||||||
public class OutboundMessage {
|
|
||||||
|
|
||||||
private final Message message;
|
|
||||||
|
|
||||||
public OutboundMessage(
|
|
||||||
final Message message) {
|
|
||||||
this.message = message;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Message message() {
|
|
||||||
return message;
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user