123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615 |
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.IO;
- using System.Linq;
- using System.Net;
- using System.Net.Sockets;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using Team.Communicate.Data;
- using Team.Communicate.EventArg;
- using Team.Communicate.Exceptions;
- using Team.Communicate.Interfaces;
- using Team.Utility;
- namespace Team.Communicate.Client
- {
- public class TeamTcpClient: IAsyncTcpClient
- {
- public event EventHandler<TcpClientStatusEventArgs> StatusChanged;
- public event EventHandler<ReceivedEventArgs> Received;
- public event EventHandler<SentEventArgs> Sent;
- private ConcurrentQueue<TaskCompletionSource<byte[]>> RequestQueue { get; }
- private TcpClient _tcpClient;
- private string _remoteIp;
- private bool _disposed;
- public bool IsAutoConnect { get; set; }
- public Encoding Encoding { get; set; }
- public IPAddress RemoteIpAddress { get; set; }
-
- public int Port { get; private set; }
- public bool IsConnected { get; private set; }
- #region 构造函数
- /// <summary>
- /// 异步TCP服务器
- /// </summary>
- public TeamTcpClient(string remoteIp, int remotePort, IPAddress localIpAddress) : this()
- {
- try
- {
- _remoteIp = remoteIp;
- Port = remotePort;
- RemoteIpAddress = IPAddress.Parse(remoteIp);
- _tcpClient.Client.Bind(new IPEndPoint(localIpAddress, 0));
- }
- catch (Exception e)
- {
- Console.WriteLine(e);
- }
- }
- public TeamTcpClient(IPAddress remoteIpAddress, int remotePort) : this()
- {
- RemoteIpAddress = remoteIpAddress;
- Port = remotePort;
- }
- public TeamTcpClient(string remoteIp, int remotePort) : this()
- {
- RemoteIpAddress = IPAddress.Parse(remoteIp);
- Port = remotePort;
- }
-
- public TimeSpan TimeoutTime { get; set; } = TimeSpan.FromSeconds(15);
- public bool Connecting => _isConnecting;
- private bool _isConnecting;
- private readonly object _lockObj = new object();
- private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource();
- protected TeamTcpClient()
- {
- _tcpClient = new TcpClient();
- Encoding = Encoding.UTF8;
- RequestQueue = new ConcurrentQueue<TaskCompletionSource<byte[]>>();
- _cacheReceived = new List<ReceivedEventArgs>();
- }
-
-
- public void Reconnect()
- {
- lock (_lockObj)
- {
- if (_isConnecting)
- {
- return;
- }
- _isConnecting = true;
- }
- Task.Factory.StartNew(async () =>
- {
- while (true)
- {
- if (_tokenSource.Token.IsCancellationRequested)
- {
- return;
- }
- if (!IsConnected && IsAutoConnect)
- {
- try
- {
- await ConnectAsync();
-
- lock (_lockObj)
- {
- _isConnecting = false;
- }
- break;
- }
- catch (NullReferenceException) when (_tokenSource.Token.IsCancellationRequested)
- {
- return;
- }
- catch (Exception)
- {
-
- }
- }
- if (_tokenSource.Token.IsCancellationRequested)
- return;
- await Task.Delay(TimeoutTime);
- }
- }, _tokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
- // ReSharper disable once FunctionNeverReturns
- }
- #endregion
- #region Method
- /// <summary>
- ///
- /// </summary>
- /// <param name="ip"></param>
- /// <param name="port"></param>
- public void SetRemoteIpOrPort(string ip, int port)
- {
- if (ip == _remoteIp || port == Port)
- {
- return;
- }
- Port = port;
- _remoteIp = ip;
- try
- {
- DisConnect();
- }
- catch (Exception e)
- {
- Console.WriteLine(e);
- }
- IsConnected = false;
- RemoteIpAddress = IPAddress.Parse(ip);
- }
- /// <inheritdoc />
- /// <summary>
- /// 连接成功为true
- /// </summary>
- /// 1.exception ArgumentOutOfRangeException
- /// 2.exception ArgumentNullException
- /// <returns></returns>
- public bool Connect()
- {
- if (IsConnected)
- {
- return true;
- }
- _tcpClient = new TcpClient();
- _tcpClient.Connect(RemoteIpAddress, Port);
- lock (_lockObj)
- {
- _isConnecting = false;
- }
- IsConnected = true;
- OnConnectStatusChanged(ConnectStatus.Connected);
- StartReceive();
- return IsConnected;
- }
- /// <inheritdoc />
- /// <summary>
- /// 连接成功为true
- /// </summary>
- /// 1.exception ArgumentOutOfRangeException
- /// 2.exception ArgumentNullException
- /// <returns></returns>
- public async Task<bool> ConnectAsync()
- {
- if (IsConnected)
- {
- return true;
- }
- try
- {
- _tcpClient?.Close();
- _tcpClient?.Dispose();
- }
- catch (Exception)
- {
- }
- _tcpClient = new TcpClient();
- if (_tcpClient.Client == null)
- {
- return false;
- }
- await _tcpClient.ConnectAsync(RemoteIpAddress, Port);
- IsConnected = true;
- OnConnectStatusChanged(ConnectStatus.Connected);
- StartReceive();
- return IsConnected;
- }
- private string _endWith=string.Empty;
- public void SetEndWith(string endWith)
- {
- if (string.IsNullOrEmpty(endWith))
- {
- return;
- }
- _endWith = endWith;
- }
- public void DisConnect()
- {
- try
- {
- CancelReceive();
- _tcpClient.Close();
- }
- catch (Exception e)
- {
- Console.WriteLine(e);
- }
- Reconnect();
- OnConnectStatusChanged(ConnectStatus.Disconnected);
- }
- public async Task<TransmissionResult> SendAsync(Transmission msg)
- {
- if (!_tcpClient.Connected)
- {
- return null;
- }
- var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
- var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
- await _tcpClient.GetStream().WriteAsync(msg.Data, 0, msg.Data.Length);
- var transmission = new Transmission(msg.Data, Transmission.EType.Sent)
- {
- Origin = from,
- Destination = to
- };
- OnSent(new SentEventArgs(transmission));
- return new TransmissionResult(from, to);
- }
- public TransmissionResult SendString(string msg)
- {
- if (!_tcpClient.Connected)
- {
- throw new NotConnectException("未连接到服务器异常");
- }
- var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
- var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
- msg += _endWith;
- var buffer = Encoding.GetBytes(msg);
- _tcpClient.GetStream().Write(buffer, 0, buffer.Length);
- var transmission = new Transmission(buffer, Transmission.EType.Sent)
- {
- Origin = from,
- Destination = to
- };
- OnSent(new SentEventArgs(transmission));
- return new TransmissionResult(from, to);
- }
- public async Task<TransmissionResult> SendStringAsync(string msg)
- {
- if (!_tcpClient.Connected)
- {
- throw new NotConnectException("未连接到服务器异常");
- }
- var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
- var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
- msg += _endWith;
- var buffer = Encoding.GetBytes(msg);
- await _tcpClient.GetStream().WriteAsync(buffer, 0, buffer.Length);
- var transmission = new Transmission(buffer, Transmission.EType.Sent)
- {
- Origin = from,
- Destination = to
- };
- OnSent(new SentEventArgs(transmission));
- return new TransmissionResult(from,to);
- }
- public TransmissionResult SendBytes(byte[] data)
- {
- if (!_tcpClient.Connected)
- {
- throw new NotConnectException("Not Connected");
- }
- var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
- var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
- _tcpClient.GetStream().Write(data, 0, data.Length);
- OnSent(new SentEventArgs(new Transmission(data, Transmission.EType.Sent)
- {
- Origin = from,
- Destination = to
- }));
- return new TransmissionResult(from, to);
- }
-
- public async Task<TransmissionResult> SendBytesAsync(byte[] data)
- {
- if (!_tcpClient.Connected)
- {
- return null;
- }
- var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
- var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
- await _tcpClient.GetStream().WriteAsync(data, 0, data.Length);
- return new TransmissionResult(from, to);
- }
- public async Task<string> SendAndReceiveStringAsync(string sendMessage)
- {
- if (!IsConnected)
- {
- throw new NotConnectException($"未连接到服务器异常:{RemoteIpAddress}:{Port}");
- }
- if (RequestQueue.Count > 0)
- {
- throw new BusyException("已经在执行一个任务");
- }
- var resultTask = new TaskCompletionSource<byte[]>();
- RequestQueue.Enqueue(resultTask);
- var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
- var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
- sendMessage += _endWith;
- var buffer = Encoding.GetBytes(sendMessage);
- await _tcpClient.GetStream().WriteAsync(buffer, 0, buffer.Length);
- OnSent(new SentEventArgs(new Transmission(buffer, Transmission.EType.Sent)
- {
- Origin = from,
- Destination = to
- }));
-
- var bytes= await resultTask.Task;
- var result = Encoding.GetString(bytes);
- return result.TrimEnd();
- }
- public string SendAndReceiveString(string sendMessage)
- {
- if (!IsConnected)
- {
- throw new NotConnectException("未连接到服务器异常");
- }
- if (RequestQueue.Count > 0)
- {
- throw new BusyException("已经在执行一个任务");
- }
- var resultTask = new TaskCompletionSource<byte[]>();
- RequestQueue.Enqueue(resultTask);
- var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
- var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
- sendMessage += _endWith;
- var buffer = Encoding.GetBytes(sendMessage);
- _tcpClient.GetStream().Write(buffer, 0, buffer.Length);
- OnSent(new SentEventArgs(new Transmission(buffer, Transmission.EType.Sent)
- {
- Origin = from, Destination = to
- }));
- var bytes = resultTask.Task.GetAwaiter().GetResult();
- var result = Encoding.GetString(bytes);
- return result.TrimEnd();
- }
- #endregion
- public void Dispose()
- {
- Dispose(true);
- //.NET Framework 类库
- // GC..::.SuppressFinalize 方法
- //请求系统不要调用指定对象的终结器。
- GC.SuppressFinalize(this);
-
- }
- protected void Dispose(bool disposing)
- {
- if (_disposed) return;
- if (disposing)
- {
- // Release managed resources
- IsAutoConnect = false;
- CancelReceive();
- _tokenSource.Cancel();
- }
- if (_tcpClient != null)
- {
- //CloseClientSocket();
- #if NET40 || NET45 || NET451 || NET452
- _socketClient?.Close();
- #else
- _tcpClient?.Dispose();
- #endif
- LogUtil.WriteInfo($"Tcp client Disposed");
- }
- _disposed = true;
- }
- private void CloseClientSocket()
- {
- try
- {
- var stream = _tcpClient.GetStream();
- stream.Dispose();
- _tcpClient.Client.Shutdown(SocketShutdown.Both);
- _tcpClient.Client.Dispose();
- }
- catch (Exception ex)
- {
- LogUtil.WriteInfo($"Tcp client client close exception:{ex}");
- }
- }
-
- private void StartReceive()
- {
- Task.Factory.StartNew(async () =>
- {
- while (_tcpClient != null)
- {
- try
- {
- var buffer = new byte[1024];
- var netStream = _tcpClient.GetStream();
- //int read = await _tcpClient.GetStream().ReadAsync(buffer, 0, buffer.Length);
- using (var memoryStream = new MemoryStream())
- {
- int length;
- do
- {
- length = await netStream.ReadAsync(buffer, 0, buffer.Length, _tokenSource.Token);
- if (length > 0)
- {
- //Array.Copy(buffer, data, length);
- await memoryStream.WriteAsync(buffer, 0, length);
- }
- else
- {
- LogUtil.WriteInfo($"服务端:{RemoteIpAddress}已经断开了连接");
- DisConnect();
- return;
- }
- } while (length == buffer.Length && netStream.DataAvailable);
- var msg = new Transmission(memoryStream.ToArray(), Transmission.EType.Received)
- {
- Destination = _tcpClient.Client.LocalEndPoint as IPEndPoint,
- Origin = _tcpClient.Client.RemoteEndPoint as IPEndPoint
- };
- OnReceived(new ReceivedEventArgs(msg));
- }
- }
- catch (Exception e)
- when (e is ObjectDisposedException || e is IOException ||e is InvalidOperationException ||e is TaskCanceledException)
- {
- LogUtil.WriteInfo($"读取服务端发生异常:{e}");
- DisConnect();
- break;
- }
- }
- },TaskCreationOptions.LongRunning);
- }
-
- public async Task<byte[]> ReceiveBytesAsync()
- {
- if (!IsConnected)
- {
- throw new NotConnectException("client未连接到服务器");
- }
- if (RequestQueue.Count > 0)
- {
- throw new BusyException("已经在执行一个任务");
- }
- if (_cacheReceived.Count > 0)
- {
- var first = _cacheReceived.First();
- _cacheReceived.RemoveAt(0);
- return await Task.FromResult(first.Message.Data);
- }
- var resultTask = new TaskCompletionSource<byte[]>();
- RequestQueue.Enqueue(resultTask);
- return await resultTask.Task;
- }
- public byte[] ReceiveBytes()
- {
- if (!IsConnected)
- {
- throw new NotConnectException("client未连接到服务器");
- }
- if (RequestQueue.Count > 0)
- {
- throw new BusyException("已经在执行一个任务");
- }
- if (_cacheReceived.Count>0)
- {
- var first= _cacheReceived.First();
- _cacheReceived.RemoveAt(0);
- return first.Message.Data;
- }
- var resultTask = new TaskCompletionSource<byte[]>();
- RequestQueue.Enqueue(resultTask);
- return resultTask.Task.GetAwaiter().GetResult();
- }
- public async Task<string> ReceiveStringAsync()
- {
- if (!IsConnected)
- {
- throw new NotConnectException("client未连接到服务器");
- }
- if (RequestQueue.Count>0)
- {
- throw new BusyException("已经在执行一个任务");
- }
- if (_cacheReceived.Count > 0)
- {
- var first = _cacheReceived.First();
- _cacheReceived.RemoveAt(0);
- var result1 = Encoding.GetString(first.Message.Data).TrimEnd();
- return await Task.FromResult(result1);
- }
- var resultTask = new TaskCompletionSource<byte[]>();
- RequestQueue.Enqueue(resultTask);
- var content= await resultTask.Task;
- var result = Encoding.GetString(content);
- return result.TrimEnd();
- }
- public string ReceiveString()
- {
- if (!IsConnected)
- {
- throw new NotConnectException("client未连接到服务器");
- }
- if (RequestQueue.Count > 0)
- {
- throw new BusyException("已经在执行一个任务");
- }
- var resultTask = new TaskCompletionSource<byte[]>();
- RequestQueue.Enqueue(resultTask);
- var content = resultTask.Task.GetAwaiter().GetResult();
- var result = Encoding.GetString(content);
- return result.TrimEnd();
- }
- public void CancelReceive()
- {
- for (var i = 0; i < RequestQueue.Count; i++)
- {
- RequestQueue.TryDequeue(out var taskCompletionSource);
- taskCompletionSource.TrySetCanceled();
- }
- _cacheReceived.Clear();
- }
- /// <summary>
- /// recommended async method because this method maybe cause deadlock
- /// </summary>
- /// <returns></returns>
-
- private void OnConnectStatusChanged(ConnectStatus status)
- {
- var ep = status == ConnectStatus.Connected ?
- _tcpClient.Client.RemoteEndPoint as IPEndPoint : new IPEndPoint(RemoteIpAddress, Port);
- if (IsConnected == (status == ConnectStatus.Connected)) return;
- StatusChanged?.Invoke(this, new TcpClientStatusEventArgs(status, ep));
- IsConnected = status == ConnectStatus.Connected;
- }
- private readonly List<ReceivedEventArgs> _cacheReceived;
- private void OnReceived(ReceivedEventArgs receivedEventArgs)
- {
- if (RequestQueue.TryDequeue(out var taskCompletionSource))
- {
- taskCompletionSource.SetResult(receivedEventArgs.Message.Data);
- }
- else
- {
- _cacheReceived.Add(receivedEventArgs);
- }
- Received?.Invoke(this, receivedEventArgs);
- LogUtil.WriteDebug($"{RemoteIpAddress}:{Port}收到消息:"+Encoding.GetString(receivedEventArgs.Message.Data));
- }
- private void OnSent(SentEventArgs sentEventArgs)
- {
-
- Sent?.Invoke(this, sentEventArgs);
- }
- }
- }
|