using System; using System.Collections.Generic; using System.IO; using System.Net; using System.Net.Sockets; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; using DotNetty.Buffers; using DotNetty.Codecs; using Team.Communicate.EventArg; using Team.Communicate.State; using Team.Utility; namespace Team.Communicate.Server { /// <summary> /// TcpServer /// </summary> public sealed class TeamTcpServer : ITeamTcpServer { #region Fields /// <summary> /// 服务器使用的异步TcpListener /// </summary> private TcpListener _listener; /// <summary> /// 客户端会话列表 /// </summary> private readonly List<TcpClientState> _clients; private bool _disposed; #endregion #region Properties public IByteBuffer[] EndWith { get; set; } = Delimiters.LineDelimiter(); /// <summary> /// 服务器是否正在运行 /// </summary> public bool IsRunning { get; private set; } /// <summary> /// 监听的IP地址 /// </summary> public IPAddress Address { get; private set; } /// <summary> /// 监听的端口 /// </summary> public int Port { get; } public Encoding Encoding { get;set; } = Encoding.UTF8; #endregion #region 构造函数 /// <inheritdoc /> /// <summary> /// 异步TCP服务器 /// </summary> /// <param name="listenPort">监听的端口</param> public TeamTcpServer(int listenPort) : this(IPAddress.Any, listenPort) { } /// <inheritdoc /> /// <summary> /// 异步TCP服务器 /// </summary> /// <param name="localEp">监听的终结点</param> public TeamTcpServer(IPEndPoint localEp) : this(localEp.Address, localEp.Port) { } /// <summary> /// 异步TCP服务器 /// </summary> /// <param name="localIpAddress">监听的IP地址</param> /// <param name="listenPort">监听的端口</param> private TeamTcpServer(IPAddress localIpAddress, int listenPort) { Address = localIpAddress; Port = listenPort; Encoding = Encoding.UTF8; _clients = new List<TcpClientState>(); _listener = new TcpListener(Address, Port); _listener.AllowNatTraversal(true); } #endregion #region Method /// <inheritdoc /> /// <summary> /// 启动服务器 /// </summary> public void Start() { if (IsRunning) return; IsRunning = true; _listener.Start(); _listener.BeginAcceptTcpClient( HandleTcpClientAccepted, _listener); } /// <inheritdoc /> /// <summary> /// 启动服务器 /// </summary> /// <param name="backlog"> /// 服务器所允许的挂起连接序列的最大长度 /// </param> public void Start(int backlog) { if (IsRunning) return; IsRunning = true; _listener.Start(backlog); //_listener.BeginAcceptTcpClient( // HandleTcpClientAccepted, _listener); new Thread(async () => { while (true) { var client = await _listener.AcceptTcpClientAsync(); var buffer = new byte[client.ReceiveBufferSize]; var state = new TcpClientState(client, buffer); _clients.Add(state); var thread = new Thread(NetWorkMethod); thread.Start(state); if (!IsRunning) { break; } } }) { IsBackground = true }.Start(); } private async void NetWorkMethod(object client) { if (client is TcpClientState tcpClientState) { var buffer = new byte[1024]; var length= await tcpClientState.NetworkStream.ReadAsync(buffer,0,buffer.Length); } } /// <inheritdoc /> /// <summary> /// 停止服务器 /// </summary> public void Stop() { if (!IsRunning) return; IsRunning = false; _listener.Stop(); lock (_clients) { //关闭所有客户端连接 CloseAllClient(); } } /// <summary> /// 处理客户端连接的函数 /// </summary> /// <param name="ar"></param> private void HandleTcpClientAccepted(IAsyncResult ar) { if (!IsRunning) return; var client = _listener.EndAcceptTcpClient(ar); var buffer = new byte[client.ReceiveBufferSize]; var state = new TcpClientState(client, buffer); lock (_clients) { _clients.Add(state); RaiseClientConnected(state); } var stream = state.NetworkStream; //开始异步读取数据 stream.BeginRead(state.Buffer, 0, state.Buffer.Length, HandleDataReceived, state); _listener.BeginAcceptTcpClient( HandleTcpClientAccepted, ar.AsyncState); } /// <summary> /// 数据接受回调函数 /// </summary> /// <param name="ar"></param> private void HandleDataReceived(IAsyncResult ar) { if (!IsRunning) return; var state = (TcpClientState) ar.AsyncState; var stream = state.NetworkStream; int rec; try { rec = stream.EndRead(ar); } catch (Exception e) { LogUtil.WriteInfo("server 发生异常:" + e); rec = 0; } if (rec == 0) { // connection has been closed lock (_clients) { _clients.Remove(state); //触发客户端连接断开事件 RaiseClientDisconnected(); return; } } // received byte and trigger event notification var buff = new byte[rec]; Buffer.BlockCopy(state.Buffer, 0, buff, 0, rec); state.Memory = new MemoryStream(); state.Memory.Write(buff, 0, buff.Length); //state.Buffer = buff; //触发数据收到事件 try { RaiseDataReceived(state); } catch (Exception) { lock (_clients) { _clients.Remove(state); //触发客户端连接断开事件 RaiseClientDisconnected(); return; } } // continue listening for tcp diagram packets stream.BeginRead(state.Buffer, 0, state.Buffer.Length, HandleDataReceived, state); } /// <summary> /// 发送数据 /// </summary> /// <param name="state">接收数据的客户端会话</param> /// <param name="data">数据报文</param> public void Send(TcpClientState state, byte[] data) { RaisePrepareSend(state); Send(state.TcpClient, data); } /// <summary> /// 异步发送数据 /// </summary> /// <param name="state">接收数据的客户端会话</param> /// <param name="data">数据报文</param> public Task SendAsync(TcpClientState state, byte[] data) { RaisePrepareSend(state); return SendAsync(state.TcpClient, data); } /// <summary> /// 指定IP地址发送数据 /// </summary> /// <param name="ip"></param> /// <param name="data"></param> public void Send(string ip, byte[] data) { var tcpClient = _clients.Find(p => p.TcpClient.Client.RemoteEndPoint.ToString() == ip); RaisePrepareSend(tcpClient); if (!IsRunning) throw new InvalidProgramException("This TCP Socket server has not been started."); if (ip == null) throw new ArgumentNullException(nameof(ip)); if (data == null) throw new ArgumentNullException(nameof(data)); tcpClient.TcpClient.GetStream().BeginWrite(data, 0, data.Length, SendDataEnd, tcpClient.TcpClient); } /// <summary> /// 指定IP地址异步发送数据 /// </summary> /// <param name="ip"></param> /// <param name="data"></param> public Task SendAsync(string ip, byte[] data) { var tcpClient = _clients.Find(p => p.TcpClient.Client.RemoteEndPoint.ToString() == ip); if (tcpClient==null) { return Task.CompletedTask; } RaisePrepareSend(tcpClient); if (!IsRunning) throw new InvalidProgramException("This TCP Socket server has not been started."); if (ip == null) throw new ArgumentNullException(nameof(ip)); if (data == null) throw new ArgumentNullException(nameof(data)); var stream = tcpClient.TcpClient.GetStream(); return stream.WriteAsync(data, 0, data.Length); } /// <summary> /// 异步发送数据至指定的客户端 /// </summary> /// <param name="client">客户端</param> /// <param name="data">报文</param> public void Send(TcpClient client, byte[] data) { if (!IsRunning) throw new InvalidProgramException("This TCP Socket server has not been started."); if (client == null) throw new ArgumentNullException(nameof(client)); if (data == null) throw new ArgumentNullException(nameof(data)); client.GetStream().BeginWrite(data, 0, data.Length, SendDataEnd, client); } /// <summary> /// 异步发送数据至指定的客户端 /// </summary> /// <param name="client">客户端</param> /// <param name="data">报文</param> public Task SendAsync(TcpClient client, byte[] data) { if (!IsRunning) throw new InvalidProgramException("This TCP Socket server has not been started."); if (client == null) throw new ArgumentNullException(nameof(client)); if (data == null) throw new ArgumentNullException(nameof(data)); var stream = client.GetStream(); return stream.WriteAsync(data, 0, data.Length); } /// <summary> /// 发送数据完成处理函数 /// </summary> /// <param name="ar">目标客户端Socket</param> private void SendDataEnd(IAsyncResult ar) { ((TcpClient) ar.AsyncState).GetStream().EndWrite(ar); RaiseCompletedSend(null); } /// <summary> /// 广播发送数据 /// </summary> /// <param name="msg">消息内容</param> public void BroadToClient(string msg) { foreach (var client in _clients) { var bytes = Encoding.GetBytes(msg); Send(client, bytes); } } public async Task BroadToClientAsync(string msg) { foreach (var client in _clients) { var bytes = Encoding.GetBytes(msg); await SendAsync(client, bytes); } } #endregion #region 事件 /// <summary> /// 与客户端的连接已建立事件 /// </summary> public event EventHandler<ServerEventArgs> ClientConnected; /// <summary> /// 与客户端的连接已断开事件 /// </summary> public event EventHandler<ServerEventArgs> ClientDisconnected; /// <summary> /// 触发客户端连接事件 /// </summary> /// <param name="state"></param> private void RaiseClientConnected(TcpClientState state) { ClientConnected?.Invoke(this, new ServerEventArgs(state)); } /// <summary> /// 触发客户端连接断开事件 /// </summary> private void RaiseClientDisconnected() { ClientDisconnected?.Invoke(this, new ServerEventArgs("连接断开")); } /// <inheritdoc /> /// <summary> /// 接收到数据事件 /// </summary> public event EventHandler<ServerEventArgs> DataReceived; private void RaiseDataReceived(TcpClientState state) { DataReceived?.BeginInvoke(this, new ServerEventArgs(state), _ => { } ,null); } /// <summary> /// 发送数据前的事件 /// </summary> public event EventHandler<ServerEventArgs> PrepareSend; /// <summary> /// 触发发送数据前的事件 /// </summary> /// <param name="state"></param> private void RaisePrepareSend(TcpClientState state) { PrepareSend?.Invoke(this, new ServerEventArgs(state)); } /// <summary> /// 数据发送完毕事件 /// </summary> public event EventHandler<ServerEventArgs> CompletedSend; /// <summary> /// 触发数据发送完毕的事件 /// </summary> /// <param name="state"></param> private void RaiseCompletedSend(TcpClientState state) { CompletedSend?.Invoke(this, new ServerEventArgs(state)); } /// <summary> /// 网络错误事件 /// </summary> public event EventHandler<ServerEventArgs> NetError; /// <summary> /// 异常事件 /// </summary> public event EventHandler<ServerEventArgs> OtherException; /// <summary> /// 触发异常事件 /// </summary> /// <param name="state"></param> /// <param name="description"></param> private void RaiseOtherException(TcpClientState state, string description = "unknown exception") { OtherException?.Invoke(this, new ServerEventArgs(description, state)); } #endregion #region Close /// <inheritdoc /> /// <summary> /// 关闭一个与客户端之间的会话 /// </summary> /// <param name="state">需要关闭的客户端会话对象</param> public void Close(TcpClientState state) { if (state == null) return; state.Close(); _clients.Remove(state); } /// <inheritdoc /> /// <summary> /// 关闭所有的客户端会话,与所有的客户端连接会断开 /// </summary> public void CloseAllClient() { foreach (var client in _clients) { client?.Close(); } _clients.Clear(); } #endregion #region 释放 /// <inheritdoc> /// <cref></cref> /// </inheritdoc> /// <summary> /// Performs application-defined tasks associated with freeing, /// releasing, or resetting unmanaged resources. /// </summary> public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } ~TeamTcpServer() { Dispose(); } /// <summary> /// Releases unmanaged and - optionally - managed resources /// </summary> /// <param name="disposing"><c>true</c> to release /// both managed and unmanaged resources; <c>false</c> /// to release only unmanaged resources.</param> private void Dispose(bool disposing) { if (_disposed) return; if (disposing) { try { Stop(); // ReSharper disable once RedundantCheckBeforeAssignment if (_listener != null) { _listener = null; } } catch (SocketException e) { RaiseOtherException(null, e.ToString()); } } _disposed = true; } public void UseUnRegisterMessage(string key) { } public void Register(string id, Func<string, NetworkStream, string> func) { } public void UnRegister(string id) { } #endregion } }