// TeamTcpClient.cs
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.IO;
  5. using System.Linq;
  6. using System.Net;
  7. using System.Net.Sockets;
  8. using System.Text;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using Team.Communicate.Data;
  12. using Team.Communicate.EventArg;
  13. using Team.Communicate.Exceptions;
  14. using Team.Communicate.Interfaces;
  15. using Team.Utility;
  16. namespace Team.Communicate.Client
  17. {
  18. public class TeamTcpClient: IAsyncTcpClient
  19. {
  /// <summary>Raised from OnConnectStatusChanged when the connection state flips.</summary>
  public event EventHandler<TcpClientStatusEventArgs> StatusChanged;

  /// <summary>Raised from OnReceived for every message read by the receive loop.</summary>
  public event EventHandler<ReceivedEventArgs> Received;

  /// <summary>Raised from OnSent by the send methods.</summary>
  public event EventHandler<SentEventArgs> Sent;

  // Framework client currently in use; Connect and ConnectAsync assign a fresh instance.
  private TcpClient _tcpClient;

  // Textual remote IP as supplied to the three-argument constructor or SetRemoteIpOrPort.
  private string _remoteIp;

  // Set at the end of Dispose(bool) so that later calls return immediately.
  private bool _disposed;

  // Requests waiting for the next incoming message; OnReceived completes the oldest one.
  private ConcurrentQueue<TaskCompletionSource<byte[]>> RequestQueue { get; }

  /// <summary>When true, the Reconnect loop is allowed to call ConnectAsync.</summary>
  public bool IsAutoConnect { get; set; }

  /// <summary>Text encoding used to convert strings to and from the bytes on the wire.</summary>
  public Encoding Encoding { get; set; }

  /// <summary>Address of the remote endpoint used by Connect and ConnectAsync.</summary>
  public IPAddress RemoteIpAddress { get; set; }

  /// <summary>Remote TCP port used by Connect and ConnectAsync.</summary>
  public int Port { get; private set; }

  /// <summary>Connection state as tracked by this class.</summary>
  public bool IsConnected { get; private set; }
  32. #region 构造函数
  /// <summary>
  /// Creates a TCP client for the given remote endpoint and binds the underlying
  /// socket to a local address (the port passed to Bind is 0).
  /// </summary>
  /// <param name="remoteIp">Remote IP address in textual form; parsed with IPAddress.Parse.</param>
  /// <param name="remotePort">Remote TCP port.</param>
  /// <param name="localIpAddress">Local address the socket is bound to.</param>
  /// <remarks>
  /// Any exception raised while parsing or binding is written to the console and not
  /// rethrown, so construction always succeeds.
  /// NOTE(review): after a parse failure RemoteIpAddress stays unset - confirm callers cope.
  /// NOTE(review): Connect and ConnectAsync assign a new TcpClient to _tcpClient, so the
  /// binding made here does not appear to carry over to the connected socket - confirm.
  /// </remarks>
  public TeamTcpClient(string remoteIp, int remotePort, IPAddress localIpAddress) : this()
  {
      try
      {
          Port = remotePort;
          _remoteIp = remoteIp;
          RemoteIpAddress = IPAddress.Parse(remoteIp);

          var localEndPoint = new IPEndPoint(localIpAddress, 0);
          _tcpClient.Client.Bind(localEndPoint);
      }
      catch (Exception ex)
      {
          Console.WriteLine(ex);
      }
  }
  /// <summary>
  /// Creates a TCP client for the given remote address and port. No connection is made here.
  /// </summary>
  /// <param name="remoteIpAddress">Address of the remote endpoint.</param>
  /// <param name="remotePort">Remote TCP port.</param>
  public TeamTcpClient(IPAddress remoteIpAddress, int remotePort) : this()
  {
      Port = remotePort;
      RemoteIpAddress = remoteIpAddress;
  }
  /// <summary>
  /// Creates a TCP client for a remote endpoint given as a textual IP address.
  /// </summary>
  /// <param name="remoteIp">Remote IP address in textual form.</param>
  /// <param name="remotePort">Remote TCP port.</param>
  /// <exception cref="ArgumentNullException"><paramref name="remoteIp"/> is null.</exception>
  /// <exception cref="FormatException"><paramref name="remoteIp"/> is not a valid IP address.</exception>
  /// <remarks>
  /// Delegates to the IPAddress overload. The address is parsed before any base
  /// initialisation runs, so an invalid address no longer leaves an allocated,
  /// undisposed TcpClient behind.
  /// </remarks>
  public TeamTcpClient(string remoteIp, int remotePort) : this(IPAddress.Parse(remoteIp), remotePort)
  {
  }
  // Serialises access to _isConnecting.
  private readonly object _lockObj = new object();

  // Cancelled by Dispose(bool); observed by the reconnect loop and the receive loop.
  private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource();

  // True while a Reconnect loop owns the job of re-establishing the connection.
  private bool _isConnecting;

  /// <summary>Pause between two attempts of the Reconnect loop. Defaults to 15 seconds.</summary>
  public TimeSpan TimeoutTime { get; set; } = TimeSpan.FromSeconds(15);

  /// <summary>True while a reconnect loop is considered active.</summary>
  public bool Connecting
  {
      get { return _isConnecting; }
  }
  /// <summary>
  /// Shared initialisation for all public constructors: allocates the request queue,
  /// the received-message cache and an unconnected TcpClient, and selects UTF-8.
  /// </summary>
  protected TeamTcpClient()
  {
      RequestQueue = new ConcurrentQueue<TaskCompletionSource<byte[]>>();
      _cacheReceived = new List<ReceivedEventArgs>();
      Encoding = Encoding.UTF8;
      _tcpClient = new TcpClient();
  }
  /// <summary>
  /// Starts a background loop that retries <see cref="ConnectAsync"/> every
  /// <see cref="TimeoutTime"/> while the client is disconnected and
  /// <see cref="IsAutoConnect"/> is true.
  /// </summary>
  /// <remarks>
  /// At most one loop runs at a time; a call made while a loop is active returns
  /// immediately. The loop ends when a connection exists or when the client is disposed,
  /// and always clears the "connecting" flag on the way out so a later call can start
  /// a new loop.
  /// </remarks>
  public void Reconnect()
  {
      lock (_lockObj)
      {
          if (_isConnecting)
          {
              return;
          }
          _isConnecting = true;
      }

      var token = _tokenSource.Token;

      // Task.Run understands async delegates. Task.Factory.StartNew with LongRunning would
      // dedicate a thread that is released again at the first await.
      _ = Task.Run(async () =>
      {
          try
          {
              while (!token.IsCancellationRequested)
              {
                  if (!IsConnected && IsAutoConnect)
                  {
                      try
                      {
                          if (await ConnectAsync())
                          {
                              return;
                          }
                      }
                      catch (Exception ex)
                      {
                          if (token.IsCancellationRequested)
                          {
                              return;
                          }
                          LogUtil.WriteInfo($"Reconnect to {RemoteIpAddress}:{Port} failed: {ex.Message}");
                      }
                  }

                  // Cancellable wait so that Dispose does not have to sit out a full interval.
                  await Task.Delay(TimeoutTime, token);

                  // Checked after the wait on purpose: DisConnect calls Reconnect before it
                  // flips IsConnected, so the flag may still be stale when the loop starts.
                  // If it is true here, a connection was established elsewhere (for example
                  // by Connect) and this loop has nothing left to do.
                  if (IsConnected)
                  {
                      return;
                  }
              }
          }
          catch (OperationCanceledException)
          {
              // The client was disposed while waiting; stop quietly.
          }
          finally
          {
              lock (_lockObj)
              {
                  _isConnecting = false;
              }
          }
      });
  }
  116. #endregion
  117. #region Method
  /// <summary>
  /// Points the client at a different remote endpoint and drops the current connection.
  /// </summary>
  /// <param name="ip">New remote IP address in textual form.</param>
  /// <param name="port">New remote TCP port.</param>
  /// <exception cref="ArgumentNullException"><paramref name="ip"/> is null.</exception>
  /// <exception cref="FormatException"><paramref name="ip"/> is not a valid IP address.</exception>
  /// <remarks>
  /// Nothing happens when both the address and the port are unchanged. The address is
  /// validated before any state is modified, and the new endpoint is stored before
  /// disconnecting so that the reconnect loop started by DisConnect targets it.
  /// </remarks>
  public void SetRemoteIpOrPort(string ip, int port)
  {
      var address = IPAddress.Parse(ip);

      // Skip only when BOTH parts are unchanged. The previous "either unchanged" test
      // ignored requests that changed just the address or just the port. Comparing the
      // parsed address also works for instances whose textual IP was never recorded.
      if (address.Equals(RemoteIpAddress) && port == Port)
      {
          return;
      }

      Port = port;
      _remoteIp = ip;
      RemoteIpAddress = address;

      try
      {
          DisConnect();
      }
      catch (Exception e)
      {
          LogUtil.WriteInfo($"Tcp client disconnect exception:{e}");
      }

      IsConnected = false;
  }
  /// <summary>
  /// Connects synchronously to <see cref="RemoteIpAddress"/>:<see cref="Port"/> and starts
  /// the receive loop.
  /// </summary>
  /// <returns>True when the client is connected (also when it already was).</returns>
  /// <exception cref="ArgumentNullException">RemoteIpAddress is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException">Port is outside the valid port range.</exception>
  /// <exception cref="SocketException">The connection attempt failed.</exception>
  public bool Connect()
  {
      if (IsConnected)
      {
          return true;
      }

      // Release the previous client instead of just dropping the reference to it.
      try
      {
          _tcpClient?.Close();
      }
      catch (Exception ex)
      {
          LogUtil.WriteInfo($"Tcp client close before connect exception:{ex}");
      }

      _tcpClient = new TcpClient();
      _tcpClient.Connect(RemoteIpAddress, Port);

      lock (_lockObj)
      {
          _isConnecting = false;
      }

      // IsConnected is deliberately NOT set here: OnConnectStatusChanged only raises
      // StatusChanged when the state actually changes, and it updates the flag itself.
      // Setting the flag first suppressed the Connected notification.
      OnConnectStatusChanged(ConnectStatus.Connected);
      StartReceive();
      return IsConnected;
  }
  /// <summary>
  /// Connects asynchronously to <see cref="RemoteIpAddress"/>:<see cref="Port"/> and starts
  /// the receive loop.
  /// </summary>
  /// <returns>
  /// True when the client is connected (also when it already was); false when the newly
  /// created TcpClient exposes no socket.
  /// </returns>
  /// <exception cref="ArgumentNullException">RemoteIpAddress is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException">Port is outside the valid port range.</exception>
  /// <exception cref="SocketException">The connection attempt failed.</exception>
  public async Task<bool> ConnectAsync()
  {
      if (IsConnected)
      {
          return true;
      }

      // Best effort: a failure to close the old client must not block the new attempt,
      // but it is logged instead of being silently discarded. Close also disposes.
      try
      {
          _tcpClient?.Close();
      }
      catch (Exception ex)
      {
          LogUtil.WriteInfo($"Tcp client close before connect exception:{ex}");
      }

      _tcpClient = new TcpClient();
      if (_tcpClient.Client == null)
      {
          return false;
      }

      await _tcpClient.ConnectAsync(RemoteIpAddress, Port);

      // IsConnected is deliberately NOT set here: OnConnectStatusChanged only raises
      // StatusChanged when the state actually changes, and it updates the flag itself.
      // Setting the flag first suppressed the Connected notification.
      OnConnectStatusChanged(ConnectStatus.Connected);
      StartReceive();
      return IsConnected;
  }
  // Terminator appended to every outgoing string message; empty by default.
  private string _endWith = string.Empty;

  /// <summary>
  /// Sets the terminator appended to outgoing string messages.
  /// A null or empty value is ignored and the current terminator is kept.
  /// </summary>
  /// <param name="endWith">Terminator text, for example a line break.</param>
  public void SetEndWith(string endWith)
  {
      if (!string.IsNullOrEmpty(endWith))
      {
          _endWith = endWith;
      }
  }
  /// <summary>
  /// Cancels pending receive requests, closes the current connection, publishes the
  /// Disconnected state and then hands over to <see cref="Reconnect"/>.
  /// </summary>
  /// <remarks>
  /// Failures while closing are logged and do not stop the rest of the sequence.
  /// </remarks>
  public void DisConnect()
  {
      try
      {
          CancelReceive();
          _tcpClient?.Close();
      }
      catch (Exception e)
      {
          LogUtil.WriteInfo($"Tcp client disconnect exception:{e}");
      }

      try
      {
          // Publish the new state first, so the reconnect loop started below sees
          // IsConnected == false straight away instead of on its next poll.
          OnConnectStatusChanged(ConnectStatus.Disconnected);
      }
      finally
      {
          // Always hand over to the reconnect loop, even if a StatusChanged handler threw.
          Reconnect();
      }
  }
  /// <summary>
  /// Writes the payload of <paramref name="msg"/> to the network stream and raises Sent.
  /// </summary>
  /// <param name="msg">Message whose Data bytes are sent as-is.</param>
  /// <returns>
  /// The local and remote endpoints used for the transfer, or null when the underlying
  /// client is not connected.
  /// </returns>
  public async Task<TransmissionResult> SendAsync(Transmission msg)
  {
      if (!_tcpClient.Connected)
      {
          return null;
      }

      var socket = _tcpClient.Client;
      var localEndPoint = socket.LocalEndPoint as IPEndPoint;
      var remoteEndPoint = socket.RemoteEndPoint as IPEndPoint;

      var stream = _tcpClient.GetStream();
      await stream.WriteAsync(msg.Data, 0, msg.Data.Length);

      var sent = new Transmission(msg.Data, Transmission.EType.Sent);
      sent.Origin = localEndPoint;
      sent.Destination = remoteEndPoint;
      OnSent(new SentEventArgs(sent));

      return new TransmissionResult(localEndPoint, remoteEndPoint);
  }
  /// <summary>
  /// Appends the configured terminator to <paramref name="msg"/>, encodes it with
  /// <see cref="Encoding"/>, writes it synchronously and raises Sent.
  /// </summary>
  /// <param name="msg">Text to send.</param>
  /// <returns>The local and remote endpoints used for the transfer.</returns>
  /// <exception cref="NotConnectException">The underlying client is not connected.</exception>
  public TransmissionResult SendString(string msg)
  {
      if (!_tcpClient.Connected)
      {
          throw new NotConnectException("未连接到服务器异常");
      }

      var socket = _tcpClient.Client;
      var localEndPoint = socket.LocalEndPoint as IPEndPoint;
      var remoteEndPoint = socket.RemoteEndPoint as IPEndPoint;

      var payload = Encoding.GetBytes(msg + _endWith);
      var stream = _tcpClient.GetStream();
      stream.Write(payload, 0, payload.Length);

      var sent = new Transmission(payload, Transmission.EType.Sent);
      sent.Origin = localEndPoint;
      sent.Destination = remoteEndPoint;
      OnSent(new SentEventArgs(sent));

      return new TransmissionResult(localEndPoint, remoteEndPoint);
  }
  /// <summary>
  /// Appends the configured terminator to <paramref name="msg"/>, encodes it with
  /// <see cref="Encoding"/>, writes it asynchronously and raises Sent.
  /// </summary>
  /// <param name="msg">Text to send.</param>
  /// <returns>The local and remote endpoints used for the transfer.</returns>
  /// <exception cref="NotConnectException">The underlying client is not connected.</exception>
  public async Task<TransmissionResult> SendStringAsync(string msg)
  {
      if (!_tcpClient.Connected)
      {
          throw new NotConnectException("未连接到服务器异常");
      }

      var socket = _tcpClient.Client;
      var localEndPoint = socket.LocalEndPoint as IPEndPoint;
      var remoteEndPoint = socket.RemoteEndPoint as IPEndPoint;

      var payload = Encoding.GetBytes(msg + _endWith);
      var stream = _tcpClient.GetStream();
      await stream.WriteAsync(payload, 0, payload.Length);

      var sent = new Transmission(payload, Transmission.EType.Sent);
      sent.Origin = localEndPoint;
      sent.Destination = remoteEndPoint;
      OnSent(new SentEventArgs(sent));

      return new TransmissionResult(localEndPoint, remoteEndPoint);
  }
  /// <summary>
  /// Writes <paramref name="data"/> synchronously to the network stream and raises Sent.
  /// </summary>
  /// <param name="data">Bytes to send, unmodified.</param>
  /// <returns>The local and remote endpoints used for the transfer.</returns>
  /// <exception cref="NotConnectException">The underlying client is not connected.</exception>
  public TransmissionResult SendBytes(byte[] data)
  {
      if (!_tcpClient.Connected)
      {
          throw new NotConnectException("Not Connected");
      }

      var socket = _tcpClient.Client;
      var localEndPoint = socket.LocalEndPoint as IPEndPoint;
      var remoteEndPoint = socket.RemoteEndPoint as IPEndPoint;

      var stream = _tcpClient.GetStream();
      stream.Write(data, 0, data.Length);

      var sent = new Transmission(data, Transmission.EType.Sent);
      sent.Origin = localEndPoint;
      sent.Destination = remoteEndPoint;
      OnSent(new SentEventArgs(sent));

      return new TransmissionResult(localEndPoint, remoteEndPoint);
  }
  /// <summary>
  /// Writes <paramref name="data"/> asynchronously to the network stream and raises Sent.
  /// </summary>
  /// <param name="data">Bytes to send, unmodified.</param>
  /// <returns>
  /// The local and remote endpoints used for the transfer, or null when the underlying
  /// client is not connected.
  /// </returns>
  public async Task<TransmissionResult> SendBytesAsync(byte[] data)
  {
      if (!_tcpClient.Connected)
      {
          return null;
      }

      var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
      var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
      await _tcpClient.GetStream().WriteAsync(data, 0, data.Length);

      // Every other send method notifies Sent subscribers; this one used to skip it,
      // so asynchronous byte transfers were invisible to them.
      OnSent(new SentEventArgs(new Transmission(data, Transmission.EType.Sent)
      {
          Origin = from,
          Destination = to
      }));

      return new TransmissionResult(from, to);
  }
  /// <summary>
  /// Sends <paramref name="sendMessage"/> (plus the configured terminator) and waits
  /// asynchronously for the next message delivered by the receive loop.
  /// </summary>
  /// <param name="sendMessage">Text to send.</param>
  /// <returns>The reply decoded with <see cref="Encoding"/>, trailing whitespace removed.</returns>
  /// <exception cref="NotConnectException">The client is not connected.</exception>
  /// <exception cref="BusyException">Another request is already waiting for a reply.</exception>
  /// <remarks>
  /// The returned task is cancelled when the pending request is cancelled through
  /// CancelReceive (which DisConnect calls).
  /// </remarks>
  public async Task<string> SendAndReceiveStringAsync(string sendMessage)
  {
      if (!IsConnected)
      {
          throw new NotConnectException($"未连接到服务器异常:{RemoteIpAddress}:{Port}");
      }
      if (RequestQueue.Count > 0)
      {
          throw new BusyException("已经在执行一个任务");
      }

      // Register the waiter before sending so that a fast reply cannot be missed.
      var resultTask = new TaskCompletionSource<byte[]>();
      RequestQueue.Enqueue(resultTask);

      try
      {
          var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
          var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
          sendMessage += _endWith;
          var buffer = Encoding.GetBytes(sendMessage);
          await _tcpClient.GetStream().WriteAsync(buffer, 0, buffer.Length);
          OnSent(new SentEventArgs(new Transmission(buffer, Transmission.EType.Sent)
          {
              Origin = from,
              Destination = to
          }));
      }
      catch
      {
          // The caller will never await the reply. Without this clean-up the orphaned
          // entry stays queued and every later request fails with BusyException.
          if (RequestQueue.TryDequeue(out var orphan))
          {
              orphan.TrySetCanceled();
          }
          throw;
      }

      var bytes = await resultTask.Task;
      var result = Encoding.GetString(bytes);
      return result.TrimEnd();
  }
  /// <summary>
  /// Sends <paramref name="sendMessage"/> (plus the configured terminator) and blocks
  /// the calling thread until the receive loop delivers the next message.
  /// </summary>
  /// <param name="sendMessage">Text to send.</param>
  /// <returns>The reply decoded with <see cref="Encoding"/>, trailing whitespace removed.</returns>
  /// <exception cref="NotConnectException">The client is not connected.</exception>
  /// <exception cref="BusyException">Another request is already waiting for a reply.</exception>
  /// <remarks>
  /// Blocks without a timeout; prefer SendAndReceiveStringAsync where the caller can await.
  /// </remarks>
  public string SendAndReceiveString(string sendMessage)
  {
      if (!IsConnected)
      {
          throw new NotConnectException("未连接到服务器异常");
      }
      if (RequestQueue.Count > 0)
      {
          throw new BusyException("已经在执行一个任务");
      }

      // Register the waiter before sending so that a fast reply cannot be missed.
      var resultTask = new TaskCompletionSource<byte[]>();
      RequestQueue.Enqueue(resultTask);

      try
      {
          var from = _tcpClient.Client.LocalEndPoint as IPEndPoint;
          var to = _tcpClient.Client.RemoteEndPoint as IPEndPoint;
          sendMessage += _endWith;
          var buffer = Encoding.GetBytes(sendMessage);
          _tcpClient.GetStream().Write(buffer, 0, buffer.Length);
          OnSent(new SentEventArgs(new Transmission(buffer, Transmission.EType.Sent)
          {
              Origin = from,
              Destination = to
          }));
      }
      catch
      {
          // The caller will never wait for the reply. Without this clean-up the orphaned
          // entry stays queued and every later request fails with BusyException.
          if (RequestQueue.TryDequeue(out var orphan))
          {
              orphan.TrySetCanceled();
          }
          throw;
      }

      var bytes = resultTask.Task.GetAwaiter().GetResult();
      var result = Encoding.GetString(bytes);
      return result.TrimEnd();
  }
  354. #endregion
  /// <summary>
  /// Releases the client's resources and asks the runtime not to run this
  /// object's finalizer.
  /// </summary>
  public void Dispose()
  {
      Dispose(disposing: true);

      // GC.SuppressFinalize requests that the system not call the finalizer
      // for the specified object.
      GC.SuppressFinalize(this);
  }
  /// <summary>
  /// Releases the resources held by this client. Calls after the first one return immediately.
  /// </summary>
  /// <param name="disposing">
  /// True when called from <see cref="Dispose()"/>: auto-reconnect is switched off, pending
  /// receive requests are cancelled and the background loops are signalled to stop.
  /// </param>
  protected void Dispose(bool disposing)
  {
      if (_disposed)
      {
          return;
      }

      if (disposing)
      {
          // Release managed resources.
          IsAutoConnect = false;
          CancelReceive();
          _tokenSource.Cancel();
      }

      if (_tcpClient != null)
      {
          // Close is what the rest of this class already calls unconditionally, and it
          // also disposes the client. The former framework-specific branch referenced a
          // field named _socketClient that this class does not declare, which broke the
          // build for those targets.
          _tcpClient.Close();
          LogUtil.WriteInfo("Tcp client Disposed");
      }

      _disposed = true;
  }
  /// <summary>
  /// Disposes the network stream, then shuts down and disposes the underlying socket.
  /// Any exception along the way is logged and not rethrown.
  /// </summary>
  private void CloseClientSocket()
  {
      try
      {
          _tcpClient.GetStream().Dispose();

          var socket = _tcpClient.Client;
          socket.Shutdown(SocketShutdown.Both);
          socket.Dispose();
      }
      catch (Exception failure)
      {
          LogUtil.WriteInfo($"Tcp client client close exception:{failure}");
      }
  }
  /// <summary>
  /// Starts the background receive loop for the client that is current at the time of the call.
  /// </summary>
  /// <remarks>
  /// Each pass reads from the stream in 1024-byte chunks; it keeps reading while a chunk
  /// came back full and more data is available, then publishes the accumulated bytes
  /// through OnReceived as one message. A zero-length read (remote side closed) or a
  /// stream/socket failure ends the loop and triggers DisConnect.
  /// </remarks>
  private void StartReceive()
  {
      // Pin the loop to this connection. Re-reading the _tcpClient field would let a
      // stale loop read from, or disconnect, a client created by a later Connect.
      var client = _tcpClient;
      var token = _tokenSource.Token;

      // Task.Run understands async delegates. Task.Factory.StartNew with LongRunning would
      // dedicate a thread that is released again at the first await.
      _ = Task.Run(async () =>
      {
          try
          {
              var netStream = client.GetStream();
              // Safe to reuse across messages: ToArray below copies the accumulated bytes.
              var buffer = new byte[1024];

              while (true)
              {
                  using (var memoryStream = new MemoryStream())
                  {
                      int length;
                      do
                      {
                          length = await netStream.ReadAsync(buffer, 0, buffer.Length, token);
                          if (length <= 0)
                          {
                              LogUtil.WriteInfo($"服务端:{RemoteIpAddress}已经断开了连接");
                              if (ReferenceEquals(client, _tcpClient))
                              {
                                  DisConnect();
                              }
                              return;
                          }

                          memoryStream.Write(buffer, 0, length);
                      } while (length == buffer.Length && netStream.DataAvailable);

                      var msg = new Transmission(memoryStream.ToArray(), Transmission.EType.Received)
                      {
                          Destination = client.Client.LocalEndPoint as IPEndPoint,
                          Origin = client.Client.RemoteEndPoint as IPEndPoint
                      };
                      OnReceived(new ReceivedEventArgs(msg));
                  }
              }
          }
          catch (Exception e)
              when (e is ObjectDisposedException || e is IOException ||
                    e is InvalidOperationException || e is OperationCanceledException)
          {
              // OperationCanceledException is the base type of TaskCanceledException, so
              // both forms of cancellation end the loop here.
              LogUtil.WriteInfo($"读取服务端发生异常:{e}");
              if (ReferenceEquals(client, _tcpClient))
              {
                  DisConnect();
              }
          }
      });
  }
  /// <summary>
  /// Returns the oldest cached message if there is one; otherwise waits asynchronously
  /// for the next message delivered by the receive loop.
  /// </summary>
  /// <returns>The raw bytes of the message.</returns>
  /// <exception cref="NotConnectException">The client is not connected.</exception>
  /// <exception cref="BusyException">Another request is already waiting for a message.</exception>
  /// <remarks>
  /// NOTE(review): the cache is a plain List that OnReceived also writes to, with no
  /// lock visible in this class - confirm callers never race with the receive loop.
  /// </remarks>
  public async Task<byte[]> ReceiveBytesAsync()
  {
      if (!IsConnected)
      {
          throw new NotConnectException("client未连接到服务器");
      }

      if (!RequestQueue.IsEmpty)
      {
          throw new BusyException("已经在执行一个任务");
      }

      if (_cacheReceived.Count > 0)
      {
          var cached = _cacheReceived[0];
          _cacheReceived.RemoveAt(0);
          return cached.Message.Data;
      }

      var pending = new TaskCompletionSource<byte[]>();
      RequestQueue.Enqueue(pending);
      return await pending.Task;
  }
  /// <summary>
  /// Returns the oldest cached message if there is one; otherwise blocks the calling
  /// thread until the receive loop delivers the next message.
  /// </summary>
  /// <returns>The raw bytes of the message.</returns>
  /// <exception cref="NotConnectException">The client is not connected.</exception>
  /// <exception cref="BusyException">Another request is already waiting for a message.</exception>
  /// <remarks>Blocks without a timeout; prefer ReceiveBytesAsync where the caller can await.</remarks>
  public byte[] ReceiveBytes()
  {
      if (!IsConnected)
      {
          throw new NotConnectException("client未连接到服务器");
      }

      if (!RequestQueue.IsEmpty)
      {
          throw new BusyException("已经在执行一个任务");
      }

      if (_cacheReceived.Count > 0)
      {
          var cached = _cacheReceived[0];
          _cacheReceived.RemoveAt(0);
          return cached.Message.Data;
      }

      var pending = new TaskCompletionSource<byte[]>();
      RequestQueue.Enqueue(pending);
      return pending.Task.GetAwaiter().GetResult();
  }
  /// <summary>
  /// Returns the oldest cached message if there is one; otherwise waits asynchronously
  /// for the next message delivered by the receive loop. The bytes are decoded with
  /// <see cref="Encoding"/> and trailing whitespace is removed.
  /// </summary>
  /// <returns>The decoded message text.</returns>
  /// <exception cref="NotConnectException">The client is not connected.</exception>
  /// <exception cref="BusyException">Another request is already waiting for a message.</exception>
  public async Task<string> ReceiveStringAsync()
  {
      if (!IsConnected)
      {
          throw new NotConnectException("client未连接到服务器");
      }

      if (!RequestQueue.IsEmpty)
      {
          throw new BusyException("已经在执行一个任务");
      }

      if (_cacheReceived.Count > 0)
      {
          var cached = _cacheReceived[0];
          _cacheReceived.RemoveAt(0);
          return Encoding.GetString(cached.Message.Data).TrimEnd();
      }

      var pending = new TaskCompletionSource<byte[]>();
      RequestQueue.Enqueue(pending);
      var payload = await pending.Task;
      return Encoding.GetString(payload).TrimEnd();
  }
  /// <summary>
  /// Returns the oldest cached message if there is one; otherwise blocks the calling
  /// thread until the receive loop delivers the next message. The bytes are decoded with
  /// <see cref="Encoding"/> and trailing whitespace is removed.
  /// </summary>
  /// <returns>The decoded message text.</returns>
  /// <exception cref="NotConnectException">The client is not connected.</exception>
  /// <exception cref="BusyException">Another request is already waiting for a message.</exception>
  /// <remarks>Blocks without a timeout; prefer ReceiveStringAsync where the caller can await.</remarks>
  public string ReceiveString()
  {
      if (!IsConnected)
      {
          throw new NotConnectException("client未连接到服务器");
      }
      if (RequestQueue.Count > 0)
      {
          throw new BusyException("已经在执行一个任务");
      }

      // Serve already-received messages first, like the other three receive methods do.
      // Skipping the cache made this method wait for a new message while older ones
      // were sitting unread.
      if (_cacheReceived.Count > 0)
      {
          var first = _cacheReceived[0];
          _cacheReceived.RemoveAt(0);
          return Encoding.GetString(first.Message.Data).TrimEnd();
      }

      var resultTask = new TaskCompletionSource<byte[]>();
      RequestQueue.Enqueue(resultTask);
      var content = resultTask.Task.GetAwaiter().GetResult();
      var result = Encoding.GetString(content);
      return result.TrimEnd();
  }
  /// <summary>
  /// Cancels every request that is waiting for a message and discards all cached messages.
  /// </summary>
  public void CancelReceive()
  {
      // Drain until the queue reports empty. The previous loop compared an increasing
      // index against a shrinking Count, so it stopped about half-way, and it
      // dereferenced the out value even when TryDequeue had failed.
      while (RequestQueue.TryDequeue(out var taskCompletionSource))
      {
          taskCompletionSource.TrySetCanceled();
      }

      _cacheReceived.Clear();
  }
  /// <summary>
  /// Records a connection state change and raises <see cref="StatusChanged"/>.
  /// Does nothing when <see cref="IsConnected"/> already matches <paramref name="status"/>.
  /// </summary>
  /// <param name="status">The state the client has just entered.</param>
  /// <remarks>
  /// The event carries the socket's remote endpoint for Connected, and an endpoint built
  /// from <see cref="RemoteIpAddress"/> and <see cref="Port"/> otherwise.
  /// </remarks>
  private void OnConnectStatusChanged(ConnectStatus status)
  {
      var connected = status == ConnectStatus.Connected;
      if (IsConnected == connected)
      {
          return;
      }

      // Update the flag before doing anything that can throw and before notifying, so
      // that handlers observe the new state and a failing handler cannot leave it stale.
      IsConnected = connected;

      // Resolved only when an event will actually be raised: touching the socket of a
      // closed client for a no-op call could throw for no reason.
      var ep = connected
          ? _tcpClient.Client.RemoteEndPoint as IPEndPoint
          : new IPEndPoint(RemoteIpAddress, Port);

      StatusChanged?.Invoke(this, new TcpClientStatusEventArgs(status, ep));
  }
  // Messages that arrived while no request was waiting; consumed by the Receive* methods
  // and emptied by CancelReceive.
  // NOTE(review): nothing visible here bounds this list - confirm it cannot grow without
  // limit for consumers that only use the Received event.
  private readonly List<ReceivedEventArgs> _cacheReceived;

  /// <summary>
  /// Dispatches an incoming message: completes the oldest waiting request with its bytes,
  /// or caches the message when nobody is waiting; then raises Received and writes a debug log line.
  /// </summary>
  /// <param name="receivedEventArgs">Event data wrapping the received message.</param>
  private void OnReceived(ReceivedEventArgs receivedEventArgs)
  {
      var hasWaiter = RequestQueue.TryDequeue(out var waiter);
      if (!hasWaiter)
      {
          _cacheReceived.Add(receivedEventArgs);
      }
      else
      {
          waiter.SetResult(receivedEventArgs.Message.Data);
      }

      var handler = Received;
      if (handler != null)
      {
          handler(this, receivedEventArgs);
      }

      LogUtil.WriteDebug($"{RemoteIpAddress}:{Port}收到消息:{Encoding.GetString(receivedEventArgs.Message.Data)}");
  }
  /// <summary>
  /// Raises the Sent event when at least one handler is subscribed.
  /// </summary>
  /// <param name="sentEventArgs">Event data describing what was sent.</param>
  private void OnSent(SentEventArgs sentEventArgs)
  {
      var handler = Sent;
      if (handler != null)
      {
          handler(this, sentEventArgs);
      }
  }
  564. }
  565. }