using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Codecs;
using Team.Communicate.EventArg;
using Team.Communicate.State;
using Team.Utility;

namespace Team.Communicate.Server
{
    /// <summary>
    /// TCP server that accepts clients with the asynchronous <see cref="TcpListener"/> API,
    /// tracks one <see cref="TcpClientState"/> per connection and reports connection,
    /// data and send activity through events.
    /// </summary>
    public sealed class TeamTcpServer : ITeamTcpServer
    {
        #region Fields

        // Runtime text kept in one place so every call site emits exactly the same message.
        private const string ServerErrorPrefix = "server 发生异常:";
        private const string NotStartedMessage = "This TCP Socket server has not been started.";

        /// <summary>
        /// Listener used to accept incoming connections; set to null by <see cref="Dispose"/>.
        /// </summary>
        private TcpListener _listener;

        /// <summary>
        /// Connected client sessions. Every access is made under <c>lock (_clients)</c>
        /// because accept, receive and send callbacks run on different threads.
        /// </summary>
        private readonly List<TcpClientState> _clients;

        // Volatile: written by Start/Stop and read by I/O completion callbacks.
        private volatile bool _isRunning;

        private bool _disposed;

        #endregion

        #region Properties

        /// <summary>
        /// Frame delimiters, initialised to DotNetty's line delimiters.
        /// The property is only stored here; this class does not read it.
        /// </summary>
        public IByteBuffer[] EndWith { get; set; } = Delimiters.LineDelimiter();

        /// <summary>
        /// Whether the server is currently running.
        /// </summary>
        public bool IsRunning
        {
            get => _isRunning;
            private set => _isRunning = value;
        }

        /// <summary>
        /// IP address the listener is bound to.
        /// </summary>
        public IPAddress Address { get; private set; }

        /// <summary>
        /// Port the listener is bound to.
        /// </summary>
        public int Port { get; }

        /// <summary>
        /// Encoding used to turn broadcast text into bytes. Defaults to UTF-8.
        /// </summary>
        public Encoding Encoding { get; set; } = Encoding.UTF8;

        #endregion

        #region Constructors

        /// <summary>
        /// Creates a server listening on <paramref name="listenPort"/> on all interfaces.
        /// </summary>
        /// <param name="listenPort">Port to listen on.</param>
        public TeamTcpServer(int listenPort)
            : this(IPAddress.Any, listenPort)
        {
        }

        /// <summary>
        /// Creates a server listening on the given end point.
        /// </summary>
        /// <param name="localEp">Local end point to listen on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="localEp"/> is null.</exception>
        public TeamTcpServer(IPEndPoint localEp)
            : this((localEp ?? throw new ArgumentNullException(nameof(localEp))).Address, localEp.Port)
        {
        }

        /// <summary>
        /// Creates the listener for the given address and port without starting it.
        /// </summary>
        /// <param name="localIpAddress">IP address to listen on.</param>
        /// <param name="listenPort">Port to listen on.</param>
        private TeamTcpServer(IPAddress localIpAddress, int listenPort)
        {
            Address = localIpAddress;
            Port = listenPort;
            _clients = new List<TcpClientState>();
            _listener = new TcpListener(Address, Port);

            try
            {
                _listener.AllowNatTraversal(true);
            }
            catch (PlatformNotSupportedException)
            {
                // NAT traversal is documented as a Windows-only option; on platforms that
                // reject it the server is still usable, so construction must not fail.
            }
        }

        #endregion

        #region Methods

        /// <summary>
        /// Starts listening and accepting clients. Does nothing if already running.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The server has been disposed.</exception>
        public void Start()
        {
            ThrowIfDisposed();
            if (IsRunning)
            {
                return;
            }

            _listener.Start();
            BeginAccepting();
        }

        /// <summary>
        /// Starts listening with the given backlog and accepts clients through the same
        /// pipeline as <see cref="Start()"/>. Does nothing if already running.
        /// </summary>
        /// <param name="backlog">Maximum length of the pending connections queue.</param>
        /// <exception cref="ObjectDisposedException">The server has been disposed.</exception>
        public void Start(int backlog)
        {
            ThrowIfDisposed();
            if (IsRunning)
            {
                return;
            }

            _listener.Start(backlog);
            BeginAccepting();
        }

        /// <summary>
        /// Marks the server as running and posts the first asynchronous accept.
        /// Called only after the listener started successfully, so a failed bind
        /// never leaves <see cref="IsRunning"/> set.
        /// </summary>
        private void BeginAccepting()
        {
            IsRunning = true;
            _listener.BeginAcceptTcpClient(HandleTcpClientAccepted, _listener);
        }

        /// <summary>
        /// Stops listening and closes every client session. Does nothing if not running.
        /// </summary>
        public void Stop()
        {
            if (!IsRunning)
            {
                return;
            }

            // Cleared first so in-flight callbacks see the flag and bail out.
            IsRunning = false;
            _listener.Stop();
            CloseAllClient();
        }

        /// <summary>
        /// Accept callback: registers the new client, raises <see cref="ClientConnected"/>,
        /// starts reading from it and then posts the next accept.
        /// </summary>
        /// <param name="ar">Async result whose state is the listener that accepted.</param>
        private void HandleTcpClientAccepted(IAsyncResult ar)
        {
            if (!IsRunning)
            {
                return;
            }

            var listener = (TcpListener)ar.AsyncState;
            try
            {
                var client = listener.EndAcceptTcpClient(ar);
                var state = new TcpClientState(client, new byte[client.ReceiveBufferSize]);

                bool registered;
                lock (_clients)
                {
                    // Stop() clears the flag before it takes this lock, so checking here
                    // guarantees a client is never added after the list was emptied.
                    registered = IsRunning;
                    if (registered)
                    {
                        _clients.Add(state);
                    }
                }

                if (!registered)
                {
                    state.Close();
                    return;
                }

                // Raised outside the lock so a handler cannot block or deadlock other callbacks.
                RaiseClientConnected(state);
                BeginReceive(state);
            }
            catch (ObjectDisposedException)
            {
                // The listener was stopped between the IsRunning check and EndAccept.
            }
            catch (SocketException e)
            {
                LogUtil.WriteInfo(ServerErrorPrefix + e);
            }
            finally
            {
                // Always re-arm: a failure while setting up one client must not end accepting.
                ContinueAccepting(listener);
            }
        }

        /// <summary>
        /// Posts the next asynchronous accept while the server is still running.
        /// </summary>
        /// <param name="listener">Listener to accept on.</param>
        private void ContinueAccepting(TcpListener listener)
        {
            if (!IsRunning)
            {
                return;
            }

            try
            {
                listener.BeginAcceptTcpClient(HandleTcpClientAccepted, listener);
            }
            catch (Exception e) when (e is SocketException || e is InvalidOperationException)
            {
                // InvalidOperationException (and its subclass ObjectDisposedException) is what
                // a stopped listener reports; only log when the server is meant to be running.
                if (IsRunning)
                {
                    LogUtil.WriteInfo(ServerErrorPrefix + e);
                }
            }
        }

        /// <summary>
        /// Posts an asynchronous read into the session buffer; drops the client if the
        /// read cannot be started.
        /// </summary>
        /// <param name="state">Client session to read from.</param>
        private void BeginReceive(TcpClientState state)
        {
            try
            {
                state.NetworkStream.BeginRead(state.Buffer, 0, state.Buffer.Length, HandleDataReceived, state);
            }
            catch (Exception e) when (e is IOException || e is InvalidOperationException)
            {
                LogUtil.WriteInfo(ServerErrorPrefix + e);
                DropClient(state);
            }
        }

        /// <summary>
        /// Read callback: copies the received bytes into a fresh <see cref="MemoryStream"/>
        /// on the session, raises <see cref="DataReceived"/> and posts the next read.
        /// A zero-length read, a read error or a throwing handler drops the client.
        /// </summary>
        /// <param name="ar">Async result whose state is the client session.</param>
        private void HandleDataReceived(IAsyncResult ar)
        {
            if (!IsRunning)
            {
                return;
            }

            var state = (TcpClientState)ar.AsyncState;

            int received;
            try
            {
                received = state.NetworkStream.EndRead(ar);
            }
            catch (Exception e)
            {
                LogUtil.WriteInfo(ServerErrorPrefix + e);
                received = 0;
            }

            if (received == 0)
            {
                // Zero bytes means the connection has been closed.
                DropClient(state);
                return;
            }

            state.Memory = new MemoryStream();
            state.Memory.Write(state.Buffer, 0, received);

            try
            {
                RaiseDataReceived(state);
            }
            catch (Exception e)
            {
                LogUtil.WriteInfo(ServerErrorPrefix + e);
                DropClient(state);
                return;
            }

            // Keep listening for further data from this client.
            BeginReceive(state);
        }

        /// <summary>
        /// Removes a session from the client list, closes it if this call removed it,
        /// and raises <see cref="ClientDisconnected"/>.
        /// </summary>
        /// <param name="state">Client session to retire.</param>
        private void DropClient(TcpClientState state)
        {
            bool removed;
            lock (_clients)
            {
                removed = _clients.Remove(state);
            }

            if (removed)
            {
                // Only the caller that removed the session closes it; Close(state) and
                // CloseAllClient() close the sessions they remove themselves.
                try
                {
                    state.Close();
                }
                catch (Exception e)
                {
                    LogUtil.WriteInfo(ServerErrorPrefix + e);
                }
            }

            RaiseClientDisconnected();
        }

        /// <summary>
        /// Raises <see cref="PrepareSend"/> and starts an asynchronous write to the session.
        /// </summary>
        /// <param name="state">Client session that receives the data.</param>
        /// <param name="data">Payload to send.</param>
        /// <exception cref="ArgumentNullException"><paramref name="state"/> or <paramref name="data"/> is null.</exception>
        /// <exception cref="InvalidProgramException">The server is not running.</exception>
        public void Send(TcpClientState state, byte[] data)
        {
            if (state == null)
            {
                throw new ArgumentNullException(nameof(state));
            }

            RaisePrepareSend(state);
            Send(state.TcpClient, data);
        }

        /// <summary>
        /// Raises <see cref="PrepareSend"/> and writes the data to the session asynchronously.
        /// </summary>
        /// <param name="state">Client session that receives the data.</param>
        /// <param name="data">Payload to send.</param>
        /// <returns>The task returned by the stream write.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="state"/> or <paramref name="data"/> is null.</exception>
        /// <exception cref="InvalidProgramException">The server is not running.</exception>
        public Task SendAsync(TcpClientState state, byte[] data)
        {
            if (state == null)
            {
                throw new ArgumentNullException(nameof(state));
            }

            RaisePrepareSend(state);
            return SendAsync(state.TcpClient, data);
        }

        /// <summary>
        /// Sends data to the client whose remote end point string equals <paramref name="ip"/>.
        /// Does nothing when no such client is connected.
        /// </summary>
        /// <param name="ip">
        /// Value compared with <c>RemoteEndPoint.ToString()</c> of each client.
        /// NOTE(review): that string normally includes the port — confirm what callers pass.
        /// </param>
        /// <param name="data">Payload to send.</param>
        /// <exception cref="ArgumentNullException"><paramref name="ip"/> or <paramref name="data"/> is null.</exception>
        /// <exception cref="InvalidProgramException">A matching client exists but the server is not running.</exception>
        public void Send(string ip, byte[] data)
        {
            if (ip == null)
            {
                throw new ArgumentNullException(nameof(ip));
            }

            if (data == null)
            {
                throw new ArgumentNullException(nameof(data));
            }

            var target = FindByEndPoint(ip);
            if (target == null)
            {
                // Same outcome as SendAsync(string, byte[]): an unknown end point is ignored.
                return;
            }

            Send(target, data);
        }

        /// <summary>
        /// Asynchronously sends data to the client whose remote end point string equals
        /// <paramref name="ip"/>. Completes immediately when no such client is connected.
        /// </summary>
        /// <param name="ip">
        /// Value compared with <c>RemoteEndPoint.ToString()</c> of each client.
        /// NOTE(review): that string normally includes the port — confirm what callers pass.
        /// </param>
        /// <param name="data">Payload to send.</param>
        /// <returns>The write task, or a completed task when no client matched.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="ip"/> or <paramref name="data"/> is null.</exception>
        /// <exception cref="InvalidProgramException">A matching client exists but the server is not running.</exception>
        public Task SendAsync(string ip, byte[] data)
        {
            if (ip == null)
            {
                throw new ArgumentNullException(nameof(ip));
            }

            if (data == null)
            {
                throw new ArgumentNullException(nameof(data));
            }

            var target = FindByEndPoint(ip);
            return target == null ? Task.CompletedTask : SendAsync(target, data);
        }

        /// <summary>
        /// Starts an asynchronous write to the given client; completion is reported through
        /// <see cref="CompletedSend"/> or, on failure, <see cref="NetError"/>.
        /// </summary>
        /// <param name="client">Client to write to.</param>
        /// <param name="data">Payload to send.</param>
        /// <exception cref="InvalidProgramException">The server is not running.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="client"/> or <paramref name="data"/> is null.</exception>
        public void Send(TcpClient client, byte[] data)
        {
            if (!IsRunning)
            {
                throw new InvalidProgramException(NotStartedMessage);
            }

            if (client == null)
            {
                throw new ArgumentNullException(nameof(client));
            }

            if (data == null)
            {
                throw new ArgumentNullException(nameof(data));
            }

            client.GetStream().BeginWrite(data, 0, data.Length, SendDataEnd, client);
        }

        /// <summary>
        /// Writes the data to the given client asynchronously.
        /// </summary>
        /// <param name="client">Client to write to.</param>
        /// <param name="data">Payload to send.</param>
        /// <returns>The task returned by the stream write.</returns>
        /// <exception cref="InvalidProgramException">The server is not running.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="client"/> or <paramref name="data"/> is null.</exception>
        public Task SendAsync(TcpClient client, byte[] data)
        {
            if (!IsRunning)
            {
                throw new InvalidProgramException(NotStartedMessage);
            }

            if (client == null)
            {
                throw new ArgumentNullException(nameof(client));
            }

            if (data == null)
            {
                throw new ArgumentNullException(nameof(data));
            }

            return client.GetStream().WriteAsync(data, 0, data.Length);
        }

        /// <summary>
        /// Write callback: completes the write and raises <see cref="CompletedSend"/>,
        /// or logs and raises <see cref="NetError"/> when the write failed.
        /// </summary>
        /// <param name="ar">Async result whose state is the target <see cref="TcpClient"/>.</param>
        private void SendDataEnd(IAsyncResult ar)
        {
            var client = (TcpClient)ar.AsyncState;

            // Null when the client is not (or no longer) tracked by this server.
            var state = FindState(client);

            try
            {
                client.GetStream().EndWrite(ar);
            }
            catch (Exception e) when (e is IOException || e is InvalidOperationException)
            {
                // Runs on a pool thread: an escaping exception here would be unhandled.
                LogUtil.WriteInfo(ServerErrorPrefix + e);
                RaiseNetError(state, e.ToString());
                return;
            }

            RaiseCompletedSend(state);
        }

        /// <summary>
        /// Encodes the message with <see cref="Encoding"/> and sends it to every client
        /// connected at the time of the call.
        /// </summary>
        /// <param name="msg">Message text.</param>
        /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
        public void BroadToClient(string msg)
        {
            if (msg == null)
            {
                throw new ArgumentNullException(nameof(msg));
            }

            var payload = Encoding.GetBytes(msg);
            foreach (var client in SnapshotClients())
            {
                Send(client, payload);
            }
        }

        /// <summary>
        /// Encodes the message with <see cref="Encoding"/> and sends it, one client after
        /// another, to every client connected at the time of the call.
        /// </summary>
        /// <param name="msg">Message text.</param>
        /// <returns>A task that completes when the last write has completed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
        public async Task BroadToClientAsync(string msg)
        {
            if (msg == null)
            {
                throw new ArgumentNullException(nameof(msg));
            }

            var payload = Encoding.GetBytes(msg);
            foreach (var client in SnapshotClients())
            {
                await SendAsync(client, payload);
            }
        }

        /// <summary>
        /// Copies the client list under the lock so callers can iterate while
        /// connections come and go on other threads.
        /// </summary>
        private TcpClientState[] SnapshotClients()
        {
            lock (_clients)
            {
                return _clients.ToArray();
            }
        }

        /// <summary>
        /// Finds the session whose remote end point string equals <paramref name="endPoint"/>.
        /// </summary>
        /// <returns>The first matching session, or null.</returns>
        private TcpClientState FindByEndPoint(string endPoint)
        {
            lock (_clients)
            {
                return _clients.Find(candidate => EndPointMatches(candidate, endPoint));
            }
        }

        /// <summary>
        /// Compares a session's remote end point string with <paramref name="endPoint"/>;
        /// a session whose socket is already closed never matches.
        /// </summary>
        private static bool EndPointMatches(TcpClientState state, string endPoint)
        {
            try
            {
                return state?.TcpClient?.Client?.RemoteEndPoint?.ToString() == endPoint;
            }
            catch (Exception e) when (e is SocketException || e is ObjectDisposedException)
            {
                return false;
            }
        }

        /// <summary>
        /// Finds the tracked session that wraps the given client instance.
        /// </summary>
        /// <returns>The session, or null when the client is not tracked.</returns>
        private TcpClientState FindState(TcpClient client)
        {
            lock (_clients)
            {
                return _clients.Find(candidate => ReferenceEquals(candidate?.TcpClient, client));
            }
        }

        /// <summary>
        /// Throws when the server has been disposed.
        /// </summary>
        private void ThrowIfDisposed()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(TeamTcpServer));
            }
        }

        #endregion

        #region Events

        /// <summary>
        /// Raised after a client connection has been accepted.
        /// </summary>
        public event EventHandler<ServerEventArgs> ClientConnected;

        /// <summary>
        /// Raised after a client connection has been dropped.
        /// </summary>
        public event EventHandler<ServerEventArgs> ClientDisconnected;

        /// <summary>
        /// Raises <see cref="ClientConnected"/> for the given session.
        /// </summary>
        private void RaiseClientConnected(TcpClientState state)
        {
            ClientConnected?.Invoke(this, new ServerEventArgs(state));
        }

        /// <summary>
        /// Raises <see cref="ClientDisconnected"/>.
        /// </summary>
        private void RaiseClientDisconnected()
        {
            ClientDisconnected?.Invoke(this, new ServerEventArgs("连接断开"));
        }

        /// <summary>
        /// Raised when data has been received from a client.
        /// </summary>
        public event EventHandler<ServerEventArgs> DataReceived;

        /// <summary>
        /// Raises <see cref="DataReceived"/> synchronously on the receiving thread.
        /// Delegate.BeginInvoke is avoided: it is not supported on .NET Core and later,
        /// and the next read is only posted after the handlers return, so they see the
        /// session data before it is replaced.
        /// </summary>
        private void RaiseDataReceived(TcpClientState state)
        {
            DataReceived?.Invoke(this, new ServerEventArgs(state));
        }

        /// <summary>
        /// Raised before data is sent to a client session.
        /// </summary>
        public event EventHandler<ServerEventArgs> PrepareSend;

        /// <summary>
        /// Raises <see cref="PrepareSend"/> for the given session.
        /// </summary>
        private void RaisePrepareSend(TcpClientState state)
        {
            PrepareSend?.Invoke(this, new ServerEventArgs(state));
        }

        /// <summary>
        /// Raised when a callback-based send has completed.
        /// </summary>
        public event EventHandler<ServerEventArgs> CompletedSend;

        /// <summary>
        /// Raises <see cref="CompletedSend"/>; <paramref name="state"/> may be null when the
        /// target client is not tracked by this server.
        /// </summary>
        private void RaiseCompletedSend(TcpClientState state)
        {
            CompletedSend?.Invoke(this, new ServerEventArgs(state));
        }

        /// <summary>
        /// Raised when a callback-based send fails.
        /// </summary>
        public event EventHandler<ServerEventArgs> NetError;

        /// <summary>
        /// Raises <see cref="NetError"/> with a description of the failure.
        /// </summary>
        private void RaiseNetError(TcpClientState state, string description)
        {
            NetError?.Invoke(this, new ServerEventArgs(description, state));
        }

        /// <summary>
        /// Raised for exceptions reported while disposing the server.
        /// </summary>
        public event EventHandler<ServerEventArgs> OtherException;

        /// <summary>
        /// Raises <see cref="OtherException"/>.
        /// </summary>
        /// <param name="state">Session involved, or null.</param>
        /// <param name="description">Description of the exception.</param>
        private void RaiseOtherException(TcpClientState state, string description = "unknown exception")
        {
            OtherException?.Invoke(this, new ServerEventArgs(description, state));
        }

        #endregion

        #region Close

        /// <summary>
        /// Closes one client session and stops tracking it. A null session is ignored.
        /// </summary>
        /// <param name="state">Client session to close.</param>
        public void Close(TcpClientState state)
        {
            if (state == null)
            {
                return;
            }

            lock (_clients)
            {
                _clients.Remove(state);
            }

            state.Close();
        }

        /// <summary>
        /// Closes every client session and empties the client list.
        /// </summary>
        public void CloseAllClient()
        {
            TcpClientState[] sessions;
            lock (_clients)
            {
                sessions = _clients.ToArray();
                _clients.Clear();
            }

            // Closed outside the lock so pending I/O callbacks are never blocked on it.
            foreach (var session in sessions)
            {
                session?.Close();
            }
        }

        #endregion

        #region Dispose

        /// <summary>
        /// Stops the server and releases the listener. Safe to call more than once.
        /// The class is sealed and holds only managed resources, so no finalizer is declared.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            try
            {
                Stop();
            }
            catch (SocketException e)
            {
                RaiseOtherException(null, e.ToString());
            }

            _listener = null;
        }

        /// <summary>
        /// Currently a no-op.
        /// </summary>
        public void UseUnRegisterMessage(string key)
        {
        }

        /// <summary>
        /// Currently a no-op.
        /// </summary>
        // NOTE(review): the generic type arguments of Func are missing from the text this
        // file was recovered from; restore them to match ITeamTcpServer.Register.
        public void Register(string id, Func func)
        {
        }

        /// <summary>
        /// Currently a no-op.
        /// </summary>
        public void UnRegister(string id)
        {
        }

        #endregion
    }
}