using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Codecs;
using Team.Communicate.EventArg;
using Team.Communicate.State;
using Team.Utility;
namespace Team.Communicate.Server
{
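/// <summary>
/// TCP server built on <see cref="TcpListener"/>: accepts client connections, keeps a list of
/// client sessions and reports connection, receive and send activity through events.
/// </summary>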
public sealed class TeamTcpServer : ITeamTcpServer
{
#region Fields
/// <summary>
/// Listener used to accept incoming TCP connections.
/// </summary>
private TcpListener _listener;

/// <summary>
/// Sessions of the connected clients. The list instance is also the object that the
/// class locks on when it adds or removes sessions.
/// </summary>
// The element type is spelled out: a bare `List` does not exist in the imported
// namespaces, and every use of this list stores or reads TcpClientState instances.
private readonly List<TcpClientState> _clients;

/// <summary>
/// Set to <c>true</c> once the server has been disposed.
/// </summary>
private bool _disposed;
#endregion
#region Properties
/// <summary>
/// Delimiter buffers, initialised to <c>Delimiters.LineDelimiter()</c>.
/// NOTE(review): presumably the message terminators used for framing; nothing in this
/// class reads the property, so confirm where it is consumed.
/// </summary>
public IByteBuffer[] EndWith { get; set; } = Delimiters.LineDelimiter();
/// <summary>
/// Whether the server is currently running.
/// </summary>
public bool IsRunning { get; private set; }
/// <summary>
/// IP address the server listens on.
/// </summary>
public IPAddress Address { get; private set; }
/// <summary>
/// Port the server listens on.
/// </summary>
public int Port { get; }
/// <summary>
/// Text encoding used to convert broadcast messages to bytes; defaults to UTF-8.
/// </summary>
public Encoding Encoding { get;set; } = Encoding.UTF8;
#endregion
#region 构造函数
/// <summary>
/// Creates an asynchronous TCP server that listens on <see cref="IPAddress.Any"/>
/// at the given port.
/// </summary>
/// <param name="listenPort">Port to listen on.</param>
public TeamTcpServer(int listenPort)
: this(IPAddress.Any, listenPort)
{
}
/// <summary>
/// Creates an asynchronous TCP server that listens on the given local endpoint.
/// </summary>
/// <param name="localEp">Local endpoint (address and port) to listen on.</param>
/// <exception cref="ArgumentNullException"><paramref name="localEp"/> is <c>null</c>.</exception>
public TeamTcpServer(IPEndPoint localEp)
    // Validate inside the initializer: it runs before any constructor body, so this is the
    // only place a null endpoint can be rejected with a meaningful exception.
    : this((localEp ?? throw new ArgumentNullException(nameof(localEp))).Address, localEp.Port)
{
}
/// <summary>
/// Creates an asynchronous TCP server bound to the given address and port.
/// The listener is created here but not started.
/// </summary>
/// <param name="localIpAddress">IP address to listen on.</param>
/// <param name="listenPort">Port to listen on.</param>
private TeamTcpServer(IPAddress localIpAddress, int listenPort)
{
    Address = localIpAddress;
    Port = listenPort;
    // Encoding is not assigned here: its property initializer already sets UTF-8.
    _clients = new List<TcpClientState>();
    _listener = new TcpListener(Address, Port);
    try
    {
        _listener.AllowNatTraversal(true);
    }
    catch (PlatformNotSupportedException)
    {
        // NAT traversal is documented as a Windows-only feature. It is an optional
        // tweak, so an unsupported platform must not prevent the server from being built.
    }
}
#endregion
#region Method
/// <summary>
/// Starts the server: begins listening and accepting client connections.
/// Does nothing when the server is already running.
/// </summary>
/// <exception cref="ObjectDisposedException">The server has been disposed.</exception>
/// <exception cref="SocketException">The listener could not be started.</exception>
public void Start()
{
    if (_disposed)
        throw new ObjectDisposedException(nameof(TeamTcpServer));
    if (IsRunning) return;
    // Mark the server as running only after the listener really started; otherwise a
    // failed bind would leave IsRunning stuck at true and block every later Start().
    _listener.Start();
    IsRunning = true;
    _listener.BeginAcceptTcpClient(
        HandleTcpClientAccepted, _listener);
}
/// <summary>
/// Starts the server with an explicit pending-connection queue length.
/// Does nothing when the server is already running.
/// </summary>
/// <param name="backlog">Maximum length of the pending connections queue.</param>
/// <exception cref="ObjectDisposedException">The server has been disposed.</exception>
/// <exception cref="SocketException">The listener could not be started.</exception>
public void Start(int backlog)
{
    if (_disposed)
        throw new ObjectDisposedException(nameof(TeamTcpServer));
    if (IsRunning) return;
    // Mark the server as running only after the listener really started.
    _listener.Start(backlog);
    IsRunning = true;
    // Use the same accept pipeline as the parameterless Start(). The previous hand-rolled
    // loop ran as an async lambda on a dedicated thread (effectively async void, so a
    // failed accept after Stop() went unhandled), added sessions without taking the
    // client-list lock, never raised ClientConnected and discarded whatever it read.
    _listener.BeginAcceptTcpClient(
        HandleTcpClientAccepted, _listener);
}
/// <summary>
/// Thread entry point for one client session: performs a single asynchronous read of
/// up to 1024 bytes from the session's network stream. The bytes read are not used.
/// </summary>
/// <param name="client">Expected to be a <see cref="TcpClientState"/>; anything else is ignored.</param>
// Stays `async void` because it is used as a thread-start delegate. For that reason every
// expected failure is handled here: an exception escaping an async void method cannot be
// observed by any caller.
private async void NetWorkMethod(object client)
{
    if (!(client is TcpClientState tcpClientState)) return;

    var buffer = new byte[1024];
    try
    {
        await tcpClientState.NetworkStream.ReadAsync(buffer, 0, buffer.Length);
    }
    catch (Exception e) when (e is IOException || e is ObjectDisposedException)
    {
        // The connection was reset or the session was closed while the read was pending.
        LogUtil.WriteInfo("server read failed: " + e);
    }
}
/// <summary>
/// Stops the server: stops the listener and closes every client session.
/// Does nothing when the server is not running.
/// </summary>
public void Stop()
{
    if (!IsRunning) return;
    IsRunning = false;
    try
    {
        _listener.Stop();
    }
    finally
    {
        // Close the client connections even when stopping the listener fails, so the
        // sessions are not left open behind a server that reports itself as stopped.
        lock (_clients)
        {
            CloseAllClient();
        }
    }
}
/// <summary>
/// Completion callback for a pending accept: registers the new client, starts reading
/// from it and re-arms the listener for the next connection.
/// </summary>
/// <param name="ar">Result of the asynchronous accept; its state is the listener.</param>
private void HandleTcpClientAccepted(IAsyncResult ar)
{
    if (!IsRunning) return;

    // The listener is passed as the async state when the accept is started; fall back to
    // the field, which may already be null if the server was disposed in the meantime.
    var listener = ar.AsyncState as TcpListener ?? _listener;
    if (listener == null) return;

    TcpClient client = null;
    try
    {
        client = listener.EndAcceptTcpClient(ar);
    }
    catch (ObjectDisposedException)
    {
        // The listener was stopped while this accept was pending.
        return;
    }
    catch (SocketException e)
    {
        // One failed handshake must not end the accept loop; log it and keep listening.
        LogUtil.WriteInfo("server accept failed: " + e);
    }

    if (client != null)
    {
        TrackAcceptedClient(client);
    }

    if (!IsRunning) return;
    try
    {
        listener.BeginAcceptTcpClient(HandleTcpClientAccepted, listener);
    }
    catch (InvalidOperationException)
    {
        // Listener stopped or disposed concurrently (ObjectDisposedException derives from
        // InvalidOperationException); there is nothing left to accept.
    }
    catch (SocketException e)
    {
        LogUtil.WriteInfo("server accept failed: " + e);
    }
}

/// <summary>
/// Adds a freshly accepted client to the session list, announces it and starts the first read.
/// </summary>
/// <param name="client">The accepted client connection.</param>
private void TrackAcceptedClient(TcpClient client)
{
    var state = new TcpClientState(client, new byte[client.ReceiveBufferSize]);
    lock (_clients)
    {
        _clients.Add(state);
    }

    // Raised outside the lock so subscriber code never runs while the list is locked.
    RaiseClientConnected(state);

    try
    {
        state.NetworkStream.BeginRead(state.Buffer, 0, state.Buffer.Length, HandleDataReceived, state);
    }
    catch (Exception e) when (e is IOException || e is ObjectDisposedException)
    {
        // The client went away before the first read could be queued.
        LogUtil.WriteInfo("server read failed: " + e);
        lock (_clients)
        {
            _clients.Remove(state);
        }
        RaiseClientDisconnected();
    }
}
/// <summary>
/// Completion callback for a pending read: copies the received bytes into the session's
/// memory stream, raises the data-received notification and queues the next read.
/// A read of zero bytes, or any failure, is treated as a disconnect.
/// </summary>
/// <param name="ar">Result of the asynchronous read; its state is the client session.</param>
private void HandleDataReceived(IAsyncResult ar)
{
    if (!IsRunning) return;

    var state = (TcpClientState) ar.AsyncState;
    var stream = state.NetworkStream;

    int rec;
    try
    {
        rec = stream.EndRead(ar);
    }
    catch (Exception e)
    {
        LogUtil.WriteInfo("server 发生异常:" + e);
        rec = 0;
    }

    if (rec == 0)
    {
        // Zero bytes means the connection has been closed.
        DropClient(state);
        return;
    }

    // Hand the received bytes over in a fresh stream; written straight from the receive
    // buffer, which yields the same stream contents as copying through a temporary array.
    state.Memory = new MemoryStream();
    state.Memory.Write(state.Buffer, 0, rec);

    try
    {
        RaiseDataReceived(state);
        // Continue listening for the next chunk of data.
        stream.BeginRead(state.Buffer, 0, state.Buffer.Length, HandleDataReceived, state);
    }
    catch (Exception e)
    {
        // This runs on a callback thread with no caller to report to, so a failed
        // dispatch or a failed re-arm is logged and the client is dropped.
        LogUtil.WriteInfo("server 发生异常:" + e);
        DropClient(state);
    }
}

/// <summary>
/// Removes a session from the client list, closes it and raises the disconnected event.
/// </summary>
/// <param name="state">The session to drop.</param>
private void DropClient(TcpClientState state)
{
    lock (_clients)
    {
        _clients.Remove(state);
    }

    try
    {
        // Release the connection now instead of leaving the session open after removal.
        state.Close();
    }
    catch (Exception e)
    {
        LogUtil.WriteInfo("server close failed: " + e);
    }

    // Raised outside the lock so subscriber code never runs while the list is locked.
    RaiseClientDisconnected();
}
/// <summary>
/// Sends data to the client of the given session. <see cref="PrepareSend"/> is raised first.
/// </summary>
/// <param name="state">Session of the client that receives the data.</param>
/// <param name="data">Bytes to send.</param>
/// <exception cref="ArgumentNullException"><paramref name="state"/> is <c>null</c>.</exception>
public void Send(TcpClientState state, byte[] data)
{
    // Reject a missing session before any event is raised for it.
    if (state == null)
        throw new ArgumentNullException(nameof(state));
    RaisePrepareSend(state);
    Send(state.TcpClient, data);
}
/// <summary>
/// Asynchronously sends data to the client of the given session.
/// <see cref="PrepareSend"/> is raised first.
/// </summary>
/// <param name="state">Session of the client that receives the data.</param>
/// <param name="data">Bytes to send.</param>
/// <returns>A task that completes when the write has finished.</returns>
/// <exception cref="ArgumentNullException"><paramref name="state"/> is <c>null</c>.</exception>
public Task SendAsync(TcpClientState state, byte[] data)
{
    // Reject a missing session before any event is raised for it.
    if (state == null)
        throw new ArgumentNullException(nameof(state));
    RaisePrepareSend(state);
    return SendAsync(state.TcpClient, data);
}
/// <summary>
/// Sends data to the connected client whose remote endpoint string equals <paramref name="ip"/>.
/// Nothing is sent when no such client is connected.
/// </summary>
/// <param name="ip">
/// Text compared against <c>RemoteEndPoint.ToString()</c> of each client socket.
/// </param>
/// <param name="data">Bytes to send.</param>
/// <exception cref="InvalidProgramException">The server is not running.</exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="ip"/> or <paramref name="data"/> is <c>null</c>.
/// </exception>
public void Send(string ip, byte[] data)
{
    // Validate before looking anything up or raising events.
    if (!IsRunning)
        throw new InvalidProgramException("This TCP Socket server has not been started.");
    if (ip == null)
        throw new ArgumentNullException(nameof(ip));
    if (data == null)
        throw new ArgumentNullException(nameof(data));

    TcpClientState target;
    lock (_clients)
    {
        // Sessions are added and removed on callback threads under this same lock.
        target = _clients.Find(p => p.TcpClient.Client.RemoteEndPoint.ToString() == ip);
    }

    // An unknown address is a no-op, matching the asynchronous overload.
    if (target == null) return;

    RaisePrepareSend(target);
    target.TcpClient.GetStream().BeginWrite(data, 0, data.Length, SendDataEnd, target.TcpClient);
}
/// <summary>
/// Asynchronously sends data to the connected client whose remote endpoint string equals
/// <paramref name="ip"/>. Returns a completed task when no such client is connected.
/// </summary>
/// <param name="ip">
/// Text compared against <c>RemoteEndPoint.ToString()</c> of each client socket.
/// </param>
/// <param name="data">Bytes to send.</param>
/// <returns>A task that completes when the write has finished.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="ip"/> or <paramref name="data"/> is <c>null</c>.
/// </exception>
/// <exception cref="InvalidProgramException">
/// A matching client exists but the server is not running.
/// </exception>
public Task SendAsync(string ip, byte[] data)
{
    // Argument checks come first; previously the null check on ip could never fire
    // because ip had already been used for the lookup.
    if (ip == null)
        throw new ArgumentNullException(nameof(ip));
    if (data == null)
        throw new ArgumentNullException(nameof(data));

    TcpClientState target;
    lock (_clients)
    {
        // Sessions are added and removed on callback threads under this same lock.
        target = _clients.Find(p => p.TcpClient.Client.RemoteEndPoint.ToString() == ip);
    }

    if (target == null)
    {
        return Task.CompletedTask;
    }

    // Checked after the lookup so that an unknown address stays a silent no-op.
    if (!IsRunning)
        throw new InvalidProgramException("This TCP Socket server has not been started.");

    RaisePrepareSend(target);
    var stream = target.TcpClient.GetStream();
    return stream.WriteAsync(data, 0, data.Length);
}
/// <summary>
/// Queues an asynchronous write of <paramref name="data"/> to the given client.
/// Completion is handled by <c>SendDataEnd</c>.
/// </summary>
/// <param name="client">Client that receives the data.</param>
/// <param name="data">Bytes to send.</param>
/// <exception cref="InvalidProgramException">The server is not running.</exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="client"/> or <paramref name="data"/> is <c>null</c>.
/// </exception>
public void Send(TcpClient client, byte[] data)
{
    if (!IsRunning)
    {
        throw new InvalidProgramException("This TCP Socket server has not been started.");
    }

    if (client == null)
    {
        throw new ArgumentNullException(nameof(client));
    }

    if (data == null)
    {
        throw new ArgumentNullException(nameof(data));
    }

    var target = client.GetStream();
    target.BeginWrite(data, 0, data.Length, SendDataEnd, client);
}
/// <summary>
/// Asynchronously writes <paramref name="data"/> to the given client's network stream.
/// </summary>
/// <param name="client">Client that receives the data.</param>
/// <param name="data">Bytes to send.</param>
/// <returns>The task returned by the stream write.</returns>
/// <exception cref="InvalidProgramException">The server is not running.</exception>
/// <exception cref="ArgumentNullException">
/// <paramref name="client"/> or <paramref name="data"/> is <c>null</c>.
/// </exception>
public Task SendAsync(TcpClient client, byte[] data)
{
    if (!IsRunning)
    {
        throw new InvalidProgramException("This TCP Socket server has not been started.");
    }

    if (client == null)
    {
        throw new ArgumentNullException(nameof(client));
    }

    if (data == null)
    {
        throw new ArgumentNullException(nameof(data));
    }

    return client.GetStream().WriteAsync(data, 0, data.Length);
}
/// <summary>
/// Completion callback for an asynchronous send: finishes the write and raises
/// <see cref="CompletedSend"/>. A failed write is reported through
/// <see cref="OtherException"/> instead.
/// </summary>
/// <param name="ar">Result of the asynchronous write; its state is the target client.</param>
private void SendDataEnd(IAsyncResult ar)
{
    var client = (TcpClient) ar.AsyncState;
    try
    {
        client.GetStream().EndWrite(ar);
    }
    catch (Exception e) when (e is IOException || e is InvalidOperationException)
    {
        // This runs on a callback thread with no caller to report to, so a broken or
        // already-closed connection (ObjectDisposedException is an
        // InvalidOperationException) is surfaced as an event rather than thrown.
        RaiseOtherException(null, e.ToString());
        return;
    }

    // NOTE(review): no session is passed to the completion event — confirm subscribers
    // do not need to know which client the send belonged to.
    RaiseCompletedSend(null);
}
/// <summary>
/// Sends a text message, encoded with <see cref="Encoding"/>, to every connected client.
/// </summary>
/// <param name="msg">Message text.</param>
public void BroadToClient(string msg)
{
    // Work on a snapshot: sessions are added and removed on callback threads, and
    // enumerating the live list while it changes throws.
    TcpClientState[] targets;
    lock (_clients)
    {
        targets = _clients.ToArray();
    }

    if (targets.Length == 0) return;

    // Encode once; every client receives the same bytes.
    var bytes = Encoding.GetBytes(msg);
    foreach (var client in targets)
    {
        Send(client, bytes);
    }
}
/// <summary>
/// Asynchronously sends a text message, encoded with <see cref="Encoding"/>, to every
/// connected client, one client after another.
/// </summary>
/// <param name="msg">Message text.</param>
/// <returns>A task that completes when all writes have finished.</returns>
public async Task BroadToClientAsync(string msg)
{
    // Work on a snapshot: the enumeration spans awaits, during which sessions can be
    // added or removed on callback threads; enumerating the live list would then throw.
    TcpClientState[] targets;
    lock (_clients)
    {
        targets = _clients.ToArray();
    }

    if (targets.Length == 0) return;

    // Encode once; every client receives the same bytes.
    var bytes = Encoding.GetBytes(msg);
    foreach (var client in targets)
    {
        await SendAsync(client, bytes);
    }
}
#endregion
#region 事件
/// <summary>
/// Raised after an accepted client has been added to the client list.
/// </summary>
public event EventHandler ClientConnected;
/// <summary>
/// Raised when a client is dropped because a read reported the connection closed
/// or dispatching its received data failed.
/// </summary>
public event EventHandler ClientDisconnected;
/// <summary>
/// Raises <see cref="ClientConnected"/> for the given session, if anyone is subscribed.
/// </summary>
/// <param name="state">Session of the client that connected.</param>
private void RaiseClientConnected(TcpClientState state)
{
    var subscribers = ClientConnected;
    if (subscribers == null)
    {
        return;
    }

    subscribers(this, new ServerEventArgs(state));
}
/// <summary>
/// Raises <see cref="ClientDisconnected"/>, if anyone is subscribed. The event carries
/// a fixed description text and no session.
/// </summary>
private void RaiseClientDisconnected()
{
    var subscribers = ClientDisconnected;
    if (subscribers != null)
    {
        var args = new ServerEventArgs("连接断开");
        subscribers(this, args);
    }
}
/// <summary>
/// Raised when data has been received from a client. Handlers run on a thread-pool
/// thread, not on the thread that completed the read.
/// </summary>
public event EventHandler DataReceived;

/// <summary>
/// Raises <see cref="DataReceived"/> for the given session without blocking the caller.
/// </summary>
/// <param name="state">Session the data was received from.</param>
private void RaiseDataReceived(TcpClientState state)
{
    var handler = DataReceived;
    if (handler == null) return;

    var args = new ServerEventArgs(state);

    // Delegate.BeginInvoke throws as soon as the event has more than one subscriber and
    // is not supported on .NET Core / .NET 5+. Task.Run keeps the dispatch asynchronous
    // on every runtime and with any number of subscribers.
    Task.Run(() =>
    {
        try
        {
            handler(this, args);
        }
        catch (Exception e)
        {
            // Nothing awaits this task; log the failure so it does not vanish.
            LogUtil.WriteInfo("server DataReceived handler failed: " + e);
        }
    });
}
/// <summary>
/// Raised immediately before data is sent to a client session.
/// </summary>
public event EventHandler PrepareSend;

/// <summary>
/// Raises <see cref="PrepareSend"/> for the given session, if anyone is subscribed.
/// </summary>
/// <param name="state">Session the data is about to be sent to.</param>
private void RaisePrepareSend(TcpClientState state)
{
    var subscribers = PrepareSend;
    if (subscribers == null)
    {
        return;
    }

    var args = new ServerEventArgs(state);
    subscribers(this, args);
}
/// <summary>
/// Raised once an asynchronous send has completed.
/// </summary>
public event EventHandler CompletedSend;

/// <summary>
/// Raises <see cref="CompletedSend"/>, if anyone is subscribed.
/// </summary>
/// <param name="state">Session passed on to the event arguments.</param>
private void RaiseCompletedSend(TcpClientState state)
{
    var subscribers = CompletedSend;
    if (subscribers != null)
    {
        subscribers(this, new ServerEventArgs(state));
    }
}
/// <summary>
/// Network error event.
/// NOTE(review): declared but never raised in this class as shown — confirm whether it is still needed.
/// </summary>
public event EventHandler NetError;

/// <summary>
/// Raised when an exception is reported through <c>RaiseOtherException</c>.
/// </summary>
public event EventHandler OtherException;

/// <summary>
/// Raises <see cref="OtherException"/>, if anyone is subscribed.
/// </summary>
/// <param name="state">Session the exception relates to; callers may pass <c>null</c>.</param>
/// <param name="description">Text describing the exception.</param>
private void RaiseOtherException(TcpClientState state, string description = "unknown exception")
{
    var subscribers = OtherException;
    if (subscribers == null)
    {
        return;
    }

    var args = new ServerEventArgs(description, state);
    subscribers(this, args);
}
#endregion
#region Close
/// <summary>
/// Closes the session with one client and removes it from the client list.
/// Does nothing when <paramref name="state"/> is <c>null</c>.
/// </summary>
/// <param name="state">The client session to close.</param>
public void Close(TcpClientState state)
{
    if (state == null) return;
    try
    {
        state.Close();
    }
    finally
    {
        // Remove the session even when closing it fails, and take the same lock the
        // accept/receive callbacks use when they modify the list.
        lock (_clients)
        {
            _clients.Remove(state);
        }
    }
}
/// <summary>
/// Closes every client session and empties the client list; all clients are disconnected.
/// </summary>
public void CloseAllClient()
{
    // This method is public, so it cannot rely on the caller holding the lock. Monitor
    // locks are re-entrant, so callers that already hold it (such as Stop) are unaffected.
    lock (_clients)
    {
        foreach (var client in _clients)
        {
            client?.Close();
        }
        _clients.Clear();
    }
}
#endregion
#region 释放
/// <summary>
/// Stops the server and releases the listener.
/// </summary>
public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

/// <summary>
/// Finalizer: runs only the non-managed part of the cleanup.
/// </summary>
~TeamTcpServer()
{
    // Must not call Dispose(): that path stops the listener, closes client sessions and
    // raises events, none of which is safe on the finalizer thread, where the referenced
    // managed objects may already have been finalized.
    Dispose(false);
}

/// <summary>
/// Releases the resources held by the server.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <see cref="Dispose()"/>: the server is stopped and the
/// listener released. <c>false</c> when called from the finalizer: managed objects are
/// left untouched.
/// </param>
private void Dispose(bool disposing)
{
    if (_disposed) return;
    if (disposing)
    {
        try
        {
            Stop();
            _listener = null;
        }
        catch (SocketException e)
        {
            // Dispose must not throw; report the failure through the exception event.
            RaiseOtherException(null, e.ToString());
        }
    }
    _disposed = true;
}
/// <summary>
/// No implementation: the body is empty, so calling this has no effect.
/// NOTE(review): presumably required by <c>ITeamTcpServer</c> — confirm the intended behaviour.
/// </summary>
/// <param name="key">Not used.</param>
public void UseUnRegisterMessage(string key)
{
}
/// <summary>
/// No implementation: the body is empty, so nothing is registered.
/// NOTE(review): the type arguments of <c>Func</c> appear to be missing from the signature —
/// confirm against <c>ITeamTcpServer</c>.
/// </summary>
/// <param name="id">Not used.</param>
/// <param name="func">Not used.</param>
public void Register(string id, Func func)
{
}
/// <summary>
/// No implementation: the body is empty, so calling this has no effect.
/// </summary>
/// <param name="id">Not used.</param>
public void UnRegister(string id)
{
}
#endregion
}
}