/* * MIT License * * Copyright (c) 2023 OrdinaryRoad * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ package tech.ordinaryroad.live.chat.client.servers.netty.client.base; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import tech.ordinaryroad.live.chat.client.commons.base.exception.BaseException; import tech.ordinaryroad.live.chat.client.commons.base.listener.IBaseConnectionListener; import tech.ordinaryroad.live.chat.client.commons.base.listener.IBaseMsgListener; import tech.ordinaryroad.live.chat.client.commons.base.msg.IMsg; import tech.ordinaryroad.live.chat.client.commons.client.BaseLiveChatClient; import tech.ordinaryroad.live.chat.client.commons.client.enums.ClientStatusEnums; import tech.ordinaryroad.live.chat.client.servers.netty.client.config.BaseNettyClientConfig; import tech.ordinaryroad.live.chat.client.servers.netty.handler.base.BaseBinaryFrameHandler; import tech.ordinaryroad.live.chat.client.servers.netty.handler.base.BaseConnectionHandler; import javax.net.ssl.SSLException; import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** * @author mjz * @date 2023/8/26 */ @Slf4j public abstract class BaseNettyClient , Msg extends IMsg, MsgListener extends IBaseMsgListener, ConnectionHandler extends BaseConnectionHandler, BinaryFrameHandler extends BaseBinaryFrameHandler > extends BaseLiveChatClient { @Getter private final EventLoopGroup workerGroup; @Getter private final Bootstrap bootstrap = new Bootstrap(); private BinaryFrameHandler binaryFrameHandler; private ConnectionHandler connectionHandler; private IBaseConnectionListener connectionListener; private Channel channel; @Getter private URI websocketUri; protected IBaseConnectionListener clientConnectionListener; /** * 控制弹幕发送频率 */ private volatile long lastSendDanmuTimeInMillis; public abstract ConnectionHandler initConnectionHandler(IBaseConnectionListener clientConnectionListener); public abstract BinaryFrameHandler initBinaryFrameHandler(); protected BaseNettyClient(Config config, EventLoopGroup workerGroup, IBaseConnectionListener connectionListener) { super(config); this.workerGroup = workerGroup; this.connectionListener = connectionListener; } public void onConnected(ConnectionHandler connectionHandler) { this.setStatus(ClientStatusEnums.CONNECTED); if (this.connectionListener != null) { this.connectionListener.onConnected(connectionHandler); } } public void onConnectFailed(ConnectionHandler connectionHandler) { this.setStatus(ClientStatusEnums.CONNECT_FAILED); tryReconnect(); if (this.connectionListener != null) { this.connectionListener.onConnectFailed(connectionHandler); } } public void onDisconnected(ConnectionHandler connectionHandler) { this.setStatus(ClientStatusEnums.DISCONNECTED); tryReconnect(); if (this.connectionListener != null) { this.connectionListener.onDisconnected(connectionHandler); } } @Override public void init() { if (checkStatus(ClientStatusEnums.INITIALIZED)) { return; } try { this.websocketUri = new URI(getWebSocketUriString()); SslContext sslCtx = SslContextBuilder.forClient().build(); this.clientConnectionListener = new IBaseConnectionListener() { @Override public void onConnected(ConnectionHandler connectionHandler) { BaseNettyClient.this.onConnected(connectionHandler); } @Override public void onConnectFailed(ConnectionHandler connectionHandler) { BaseNettyClient.this.onConnectFailed(connectionHandler); } @Override public void onDisconnected(ConnectionHandler connectionHandler) { BaseNettyClient.this.onDisconnected(connectionHandler); } }; this.binaryFrameHandler = this.initBinaryFrameHandler(); this.connectionHandler = this.initConnectionHandler(this.clientConnectionListener); this.bootstrap.group(this.workerGroup) // 创建Channel .channel(NioSocketChannel.class) .remoteAddress(this.websocketUri.getHost(), getInetPort()) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) // Channel配置 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { // 责任链 ChannelPipeline pipeline = ch.pipeline(); // 放到第一位 addFirst 支持wss链接服务端 pipeline.addFirst(sslCtx.newHandler(ch.alloc(), BaseNettyClient.this.websocketUri.getHost(), getInetPort())); // 添加一个http的编解码器 pipeline.addLast(new HttpClientCodec()); // 添加一个用于支持大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response pipeline.addLast(new HttpObjectAggregator(BaseNettyClient.this.getConfig().getAggregatorMaxContentLength())); // 连接处理器 pipeline.addLast(BaseNettyClient.this.connectionHandler); // 弹幕处理器 pipeline.addLast(BaseNettyClient.this.binaryFrameHandler); } }); this.setStatus(ClientStatusEnums.INITIALIZED); } catch (URISyntaxException e) { throw new BaseException(e); } catch (SSLException e) { throw new BaseException(e); } } private int getInetPort() { int port = this.websocketUri.getPort(); return port == -1 ? "wss".equalsIgnoreCase(websocketUri.getScheme()) ? 443 : 80 : port; } @Override public void connect(Runnable success, Consumer failed) { if (this.cancelReconnect) { this.cancelReconnect = false; } if (!checkStatus(ClientStatusEnums.INITIALIZED)) { return; } if (getStatus() == ClientStatusEnums.CONNECTED) { return; } if (getStatus() != ClientStatusEnums.RECONNECTING) { this.setStatus(ClientStatusEnums.CONNECTING); } this.bootstrap.connect().addListener((ChannelFutureListener) connectFuture -> { if (connectFuture.isSuccess()) { if (log.isDebugEnabled()) { log.debug("连接建立成功!"); } this.channel = connectFuture.channel(); // 监听是否握手成功 this.connectionHandler.getHandshakeFuture().addListener((ChannelFutureListener) handshakeFuture -> { try { connectionHandler.sendAuthRequest(channel); if (success != null) { success.run(); } } catch (Exception e) { log.error("认证包发送失败,断开连接", e); this.disconnect(); } }); } else { log.error("连接建立失败", connectFuture.cause()); this.onConnectFailed(this.connectionHandler); if (failed != null) { failed.accept(connectFuture.cause()); } } }); } @Override public void disconnect() { if (this.channel == null) { return; } this.channel.close(); } @Override protected void tryReconnect() { if (this.cancelReconnect) { this.cancelReconnect = false; return; } if (!getConfig().isAutoReconnect()) { return; } if (log.isWarnEnabled()) { log.warn("{}s后将重新连接 {}", getConfig().getReconnectDelay(), getConfig().getRoomId()); } workerGroup.schedule(() -> { this.setStatus(ClientStatusEnums.RECONNECTING); this.connect(); }, getConfig().getReconnectDelay(), TimeUnit.SECONDS); } @Override public void send(Object msg, Runnable success, Consumer failed) { ChannelFuture future = this.channel.writeAndFlush(msg); if (success != null || failed != null) { future.addListener((ChannelFutureListener) channelFuture -> { if (channelFuture.isSuccess()) { if (success != null) { success.run(); } } else { if (failed != null) { failed.accept(channelFuture.cause()); } } }); } } @Override public void destroy() { super.destroy(); // 销毁时不需要重连 this.cancelReconnect = true; workerGroup.shutdownGracefully().addListener(future -> { if (future.isSuccess()) { this.setStatus(ClientStatusEnums.DESTROYED); } else { throw new BaseException("client销毁失败", future.cause()); } }); } @Override protected String getWebSocketUriString() { return getConfig().getWebsocketUri(); } @Override protected void setStatus(ClientStatusEnums status) { if (log.isDebugEnabled()) { if (getStatus() != status) { log.debug("{} 状态变化 {} => {}\n", getClass().getSimpleName(), getStatus(), status); } } super.setStatus(status); } @Override public void sendDanmu(Object danmu, Runnable success, Consumer failed) { throw new BaseException("暂未支持该功能"); } @Override public void clickLike(int count, Runnable success, Consumer failed) { throw new BaseException("暂未支持该功能"); } /** * 发送弹幕前判断是否可以发送 * * @param checkConnected 是否检查Client连接状态 */ protected boolean checkCanSendDanmu(boolean checkConnected) { if (checkConnected && getStatus() != ClientStatusEnums.CONNECTED) { throw new BaseException("连接未建立,无法发送弹幕"); } if (System.currentTimeMillis() - this.lastSendDanmuTimeInMillis <= getConfig().getMinSendDanmuPeriod()) { if (log.isWarnEnabled()) { log.warn("发送弹幕频率过快,忽略该次发送"); } return false; } return true; } protected boolean checkCanSendDanmu() { return checkCanSendDanmu(true); } /** * 发送弹幕后调用该方法 */ protected void finishSendDanmu() { this.lastSendDanmuTimeInMillis = System.currentTimeMillis(); if (log.isDebugEnabled()) { log.debug("弹幕发送完成"); } } public void iteratorMsgListeners(Consumer consumer) { binaryFrameHandler.iteratorMsgListeners(consumer); } }