using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; using Team.Communicate.Data; using Team.Communicate.EventArg; using Team.Communicate.Exceptions; using Team.Communicate.Interfaces; using Team.Utility; namespace Team.Communicate.Client { public class TeamTcpClient: IAsyncTcpClient { public event EventHandler StatusChanged; public event EventHandler Received; public event EventHandler Sent; private ConcurrentQueue> RequestQueue { get; } private TcpClient _tcpClient; private string _remoteIp; private bool _disposed; public bool IsAutoConnect { get; set; } public Encoding Encoding { get; set; } public IPAddress RemoteIpAddress { get; set; } public int Port { get; private set; } public bool IsConnected { get; private set; } #region 构造函数 /// /// 异步TCP服务器 /// public TeamTcpClient(string remoteIp, int remotePort, IPAddress localIpAddress) : this() { try { _remoteIp = remoteIp; Port = remotePort; RemoteIpAddress = IPAddress.Parse(remoteIp); _tcpClient.Client.Bind(new IPEndPoint(localIpAddress, 0)); } catch (Exception e) { Console.WriteLine(e); } } public TeamTcpClient(IPAddress remoteIpAddress, int remotePort) : this() { RemoteIpAddress = remoteIpAddress; Port = remotePort; } public TeamTcpClient(string remoteIp, int remotePort) : this() { RemoteIpAddress = IPAddress.Parse(remoteIp); Port = remotePort; } public TimeSpan TimeoutTime { get; set; } = TimeSpan.FromSeconds(15); public bool Connecting => _isConnecting; private bool _isConnecting; private readonly object _lockObj = new object(); private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource(); protected TeamTcpClient() { _tcpClient = new TcpClient(); Encoding = Encoding.UTF8; RequestQueue = new ConcurrentQueue>(); _cacheReceived = new List(); } public void Reconnect() { lock (_lockObj) { if (_isConnecting) { return; } _isConnecting = true; } Task.Factory.StartNew(async () => { while (true) { if (_tokenSource.Token.IsCancellationRequested) { return; } if (!IsConnected && IsAutoConnect) { try { await ConnectAsync(); lock (_lockObj) { _isConnecting = false; } break; } catch (NullReferenceException) when (_tokenSource.Token.IsCancellationRequested) { return; } catch (Exception) { } } if (_tokenSource.Token.IsCancellationRequested) return; await Task.Delay(TimeoutTime); } }, _tokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); // ReSharper disable once FunctionNeverReturns } #endregion #region Method /// /// /// /// /// public void SetRemoteIpOrPort(string ip, int port) { if (ip == _remoteIp || port == Port) { return; } Port = port; _remoteIp = ip; try { DisConnect(); } catch (Exception e) { Console.WriteLine(e); } IsConnected = false; RemoteIpAddress = IPAddress.Parse(ip); } /// /// /// 连接成功为true /// /// 1.exception ArgumentOutOfRangeException /// 2.exception ArgumentNullException /// public bool Connect() { if (IsConnected) { return true; } _tcpClient = new TcpClient(); _tcpClient.Connect(RemoteIpAddress, Port); lock (_lockObj) { _isConnecting = false; } IsConnected = true; OnConnectStatusChanged(ConnectStatus.Connected); StartReceive(); return IsConnected; } /// /// /// 连接成功为true /// /// 1.exception ArgumentOutOfRangeException /// 2.exception ArgumentNullException /// public async Task ConnectAsync() { if (IsConnected) { return true; } try { _tcpClient?.Close(); _tcpClient?.Dispose(); } catch (Exception) { } _tcpClient = new TcpClient(); if (_tcpClient.Client == null) { return false; } await _tcpClient.ConnectAsync(RemoteIpAddress, Port); IsConnected = true; OnConnectStatusChanged(ConnectStatus.Connected); StartReceive(); return IsConnected; } private string _endWith=string.Empty; public void SetEndWith(string endWith) { if (string.IsNullOrEmpty(endWith)) { return; } _endWith = endWith; } public void DisConnect() { try { CancelReceive(); _tcpClient.Close(); } catch (Exception e) { Console.WriteLine(e); } Reconnect(); OnConnectStatusChanged(ConnectStatus.Disconnected); } public async Task SendAsync(Transmission msg) { if (!_tcpClient.Connected) { return null; } var from = _tcpClient.Client.LocalEndPoint as IPEndPoint; var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint; await _tcpClient.GetStream().WriteAsync(msg.Data, 0, msg.Data.Length); var transmission = new Transmission(msg.Data, Transmission.EType.Sent) { Origin = from, Destination = to }; OnSent(new SentEventArgs(transmission)); return new TransmissionResult(from, to); } public TransmissionResult SendString(string msg) { if (!_tcpClient.Connected) { throw new NotConnectException("未连接到服务器异常"); } var from = _tcpClient.Client.LocalEndPoint as IPEndPoint; var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint; msg += _endWith; var buffer = Encoding.GetBytes(msg); _tcpClient.GetStream().Write(buffer, 0, buffer.Length); var transmission = new Transmission(buffer, Transmission.EType.Sent) { Origin = from, Destination = to }; OnSent(new SentEventArgs(transmission)); return new TransmissionResult(from, to); } public async Task SendStringAsync(string msg) { if (!_tcpClient.Connected) { throw new NotConnectException("未连接到服务器异常"); } var from = _tcpClient.Client.LocalEndPoint as IPEndPoint; var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint; msg += _endWith; var buffer = Encoding.GetBytes(msg); await _tcpClient.GetStream().WriteAsync(buffer, 0, buffer.Length); var transmission = new Transmission(buffer, Transmission.EType.Sent) { Origin = from, Destination = to }; OnSent(new SentEventArgs(transmission)); return new TransmissionResult(from,to); } public TransmissionResult SendBytes(byte[] data) { if (!_tcpClient.Connected) { throw new NotConnectException("Not Connected"); } var from = _tcpClient.Client.LocalEndPoint as IPEndPoint; var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint; _tcpClient.GetStream().Write(data, 0, data.Length); OnSent(new SentEventArgs(new Transmission(data, Transmission.EType.Sent) { Origin = from, Destination = to })); return new TransmissionResult(from, to); } public async Task SendBytesAsync(byte[] data) { if (!_tcpClient.Connected) { return null; } var from = _tcpClient.Client.LocalEndPoint as IPEndPoint; var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint; await _tcpClient.GetStream().WriteAsync(data, 0, data.Length); return new TransmissionResult(from, to); } public async Task SendAndReceiveStringAsync(string sendMessage) { if (!IsConnected) { throw new NotConnectException($"未连接到服务器异常:{RemoteIpAddress}:{Port}"); } if (RequestQueue.Count > 0) { throw new BusyException("已经在执行一个任务"); } var resultTask = new TaskCompletionSource(); RequestQueue.Enqueue(resultTask); var from = _tcpClient.Client.LocalEndPoint as IPEndPoint; var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint; sendMessage += _endWith; var buffer = Encoding.GetBytes(sendMessage); await _tcpClient.GetStream().WriteAsync(buffer, 0, buffer.Length); OnSent(new SentEventArgs(new Transmission(buffer, Transmission.EType.Sent) { Origin = from, Destination = to })); var bytes= await resultTask.Task; var result = Encoding.GetString(bytes); return result.TrimEnd(); } public string SendAndReceiveString(string sendMessage) { if (!IsConnected) { throw new NotConnectException("未连接到服务器异常"); } if (RequestQueue.Count > 0) { throw new BusyException("已经在执行一个任务"); } var resultTask = new TaskCompletionSource(); RequestQueue.Enqueue(resultTask); var from = _tcpClient.Client.LocalEndPoint as IPEndPoint; var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint; sendMessage += _endWith; var buffer = Encoding.GetBytes(sendMessage); _tcpClient.GetStream().Write(buffer, 0, buffer.Length); OnSent(new SentEventArgs(new Transmission(buffer, Transmission.EType.Sent) { Origin = from, Destination = to })); var bytes = resultTask.Task.GetAwaiter().GetResult(); var result = Encoding.GetString(bytes); return result.TrimEnd(); } #endregion public void Dispose() { Dispose(true); //.NET Framework 类库 // GC..::.SuppressFinalize 方法 //请求系统不要调用指定对象的终结器。 GC.SuppressFinalize(this); } protected void Dispose(bool disposing) { if (_disposed) return; if (disposing) { // Release managed resources IsAutoConnect = false; CancelReceive(); _tokenSource.Cancel(); } if (_tcpClient != null) { //CloseClientSocket(); #if NET40 || NET45 || NET451 || NET452 _socketClient?.Close(); #else _tcpClient?.Dispose(); #endif LogUtil.WriteInfo($"Tcp client Disposed"); } _disposed = true; } private void CloseClientSocket() { try { var stream = _tcpClient.GetStream(); stream.Dispose(); _tcpClient.Client.Shutdown(SocketShutdown.Both); _tcpClient.Client.Dispose(); } catch (Exception ex) { LogUtil.WriteInfo($"Tcp client client close exception:{ex}"); } } private void StartReceive() { Task.Factory.StartNew(async () => { while (_tcpClient != null) { try { var buffer = new byte[1024]; var netStream = _tcpClient.GetStream(); //int read = await _tcpClient.GetStream().ReadAsync(buffer, 0, buffer.Length); using (var memoryStream = new MemoryStream()) { int length; do { length = await netStream.ReadAsync(buffer, 0, buffer.Length, _tokenSource.Token); if (length > 0) { //Array.Copy(buffer, data, length); await memoryStream.WriteAsync(buffer, 0, length); } else { LogUtil.WriteInfo($"服务端:{RemoteIpAddress}已经断开了连接"); DisConnect(); return; } } while (length == buffer.Length && netStream.DataAvailable); var msg = new Transmission(memoryStream.ToArray(), Transmission.EType.Received) { Destination = _tcpClient.Client.LocalEndPoint as IPEndPoint, Origin = _tcpClient.Client.RemoteEndPoint as IPEndPoint }; OnReceived(new ReceivedEventArgs(msg)); } } catch (Exception e) when (e is ObjectDisposedException || e is IOException ||e is InvalidOperationException ||e is TaskCanceledException) { LogUtil.WriteInfo($"读取服务端发生异常:{e}"); DisConnect(); break; } } },TaskCreationOptions.LongRunning); } public async Task ReceiveBytesAsync() { if (!IsConnected) { throw new NotConnectException("client未连接到服务器"); } if (RequestQueue.Count > 0) { throw new BusyException("已经在执行一个任务"); } if (_cacheReceived.Count > 0) { var first = _cacheReceived.First(); _cacheReceived.RemoveAt(0); return await Task.FromResult(first.Message.Data); } var resultTask = new TaskCompletionSource(); RequestQueue.Enqueue(resultTask); return await resultTask.Task; } public byte[] ReceiveBytes() { if (!IsConnected) { throw new NotConnectException("client未连接到服务器"); } if (RequestQueue.Count > 0) { throw new BusyException("已经在执行一个任务"); } if (_cacheReceived.Count>0) { var first= _cacheReceived.First(); _cacheReceived.RemoveAt(0); return first.Message.Data; } var resultTask = new TaskCompletionSource(); RequestQueue.Enqueue(resultTask); return resultTask.Task.GetAwaiter().GetResult(); } public async Task ReceiveStringAsync() { if (!IsConnected) { throw new NotConnectException("client未连接到服务器"); } if (RequestQueue.Count>0) { throw new BusyException("已经在执行一个任务"); } if (_cacheReceived.Count > 0) { var first = _cacheReceived.First(); _cacheReceived.RemoveAt(0); var result1 = Encoding.GetString(first.Message.Data).TrimEnd(); return await Task.FromResult(result1); } var resultTask = new TaskCompletionSource(); RequestQueue.Enqueue(resultTask); var content= await resultTask.Task; var result = Encoding.GetString(content); return result.TrimEnd(); } public string ReceiveString() { if (!IsConnected) { throw new NotConnectException("client未连接到服务器"); } if (RequestQueue.Count > 0) { throw new BusyException("已经在执行一个任务"); } var resultTask = new TaskCompletionSource(); RequestQueue.Enqueue(resultTask); var content = resultTask.Task.GetAwaiter().GetResult(); var result = Encoding.GetString(content); return result.TrimEnd(); } public void CancelReceive() { for (var i = 0; i < RequestQueue.Count; i++) { RequestQueue.TryDequeue(out var taskCompletionSource); taskCompletionSource.TrySetCanceled(); } _cacheReceived.Clear(); } /// /// recommended async method because this method maybe cause deadlock /// /// private void OnConnectStatusChanged(ConnectStatus status) { var ep = status == ConnectStatus.Connected ? _tcpClient.Client.RemoteEndPoint as IPEndPoint : new IPEndPoint(RemoteIpAddress, Port); if (IsConnected == (status == ConnectStatus.Connected)) return; StatusChanged?.Invoke(this, new TcpClientStatusEventArgs(status, ep)); IsConnected = status == ConnectStatus.Connected; } private readonly List _cacheReceived; private void OnReceived(ReceivedEventArgs receivedEventArgs) { if (RequestQueue.TryDequeue(out var taskCompletionSource)) { taskCompletionSource.SetResult(receivedEventArgs.Message.Data); } else { _cacheReceived.Add(receivedEventArgs); } Received?.Invoke(this, receivedEventArgs); LogUtil.WriteDebug($"{RemoteIpAddress}:{Port}收到消息:"+Encoding.GetString(receivedEventArgs.Message.Data)); } private void OnSent(SentEventArgs sentEventArgs) { Sent?.Invoke(this, sentEventArgs); } } }