TcpServer.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Runtime.InteropServices;
  7. using System.Text;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using DotNetty.Buffers;
  11. using DotNetty.Codecs;
  12. using Team.Communicate.EventArg;
  13. using Team.Communicate.State;
  14. using Team.Utility;
  15. namespace Team.Communicate.Server
  16. {
  17. /// <summary>
  18. /// TcpServer
  19. /// </summary>
  20. public sealed class TeamTcpServer : ITeamTcpServer
  21. {
  22. #region Fields
  23. /// <summary>
  24. /// 服务器使用的异步TcpListener
  25. /// </summary>
  26. private TcpListener _listener;
  27. /// <summary>
  28. /// 客户端会话列表
  29. /// </summary>
  30. private readonly List<TcpClientState> _clients;
  31. private bool _disposed;
  32. #endregion
  33. #region Properties
  34. public IByteBuffer[] EndWith { get; set; } = Delimiters.LineDelimiter();
  35. /// <summary>
  36. /// 服务器是否正在运行
  37. /// </summary>
  38. public bool IsRunning { get; private set; }
  39. /// <summary>
  40. /// 监听的IP地址
  41. /// </summary>
  42. public IPAddress Address { get; private set; }
  43. /// <summary>
  44. /// 监听的端口
  45. /// </summary>
  46. public int Port { get; }
  47. public Encoding Encoding { get;set; } = Encoding.UTF8;
  48. #endregion
  49. #region 构造函数
  50. /// <inheritdoc />
  51. /// <summary>
  52. /// 异步TCP服务器
  53. /// </summary>
  54. /// <param name="listenPort">监听的端口</param>
  55. public TeamTcpServer(int listenPort)
  56. : this(IPAddress.Any, listenPort)
  57. {
  58. }
  59. /// <inheritdoc />
  60. /// <summary>
  61. /// 异步TCP服务器
  62. /// </summary>
  63. /// <param name="localEp">监听的终结点</param>
  64. public TeamTcpServer(IPEndPoint localEp)
  65. : this(localEp.Address, localEp.Port)
  66. {
  67. }
  68. /// <summary>
  69. /// 异步TCP服务器
  70. /// </summary>
  71. /// <param name="localIpAddress">监听的IP地址</param>
  72. /// <param name="listenPort">监听的端口</param>
  73. private TeamTcpServer(IPAddress localIpAddress, int listenPort)
  74. {
  75. Address = localIpAddress;
  76. Port = listenPort;
  77. Encoding = Encoding.UTF8;
  78. _clients = new List<TcpClientState>();
  79. _listener = new TcpListener(Address, Port);
  80. _listener.AllowNatTraversal(true);
  81. }
  82. #endregion
  83. #region Method
  84. /// <inheritdoc />
  85. /// <summary>
  86. /// 启动服务器
  87. /// </summary>
  88. public void Start()
  89. {
  90. if (IsRunning) return;
  91. IsRunning = true;
  92. _listener.Start();
  93. _listener.BeginAcceptTcpClient(
  94. HandleTcpClientAccepted, _listener);
  95. }
  96. /// <inheritdoc />
  97. /// <summary>
  98. /// 启动服务器
  99. /// </summary>
  100. /// <param name="backlog">
  101. /// 服务器所允许的挂起连接序列的最大长度
  102. /// </param>
  103. public void Start(int backlog)
  104. {
  105. if (IsRunning) return;
  106. IsRunning = true;
  107. _listener.Start(backlog);
  108. //_listener.BeginAcceptTcpClient(
  109. // HandleTcpClientAccepted, _listener);
  110. new Thread(async () =>
  111. {
  112. while (true)
  113. {
  114. var client = await _listener.AcceptTcpClientAsync();
  115. var buffer = new byte[client.ReceiveBufferSize];
  116. var state
  117. = new TcpClientState(client, buffer);
  118. _clients.Add(state);
  119. var thread = new Thread(NetWorkMethod);
  120. thread.Start(state);
  121. if (!IsRunning)
  122. {
  123. break;
  124. }
  125. }
  126. })
  127. {
  128. IsBackground = true
  129. }.Start();
  130. }
  131. private async void NetWorkMethod(object client)
  132. {
  133. if (client is TcpClientState tcpClientState)
  134. {
  135. var buffer = new byte[1024];
  136. var length= await tcpClientState.NetworkStream.ReadAsync(buffer,0,buffer.Length);
  137. }
  138. }
  139. /// <inheritdoc />
  140. /// <summary>
  141. /// 停止服务器
  142. /// </summary>
  143. public void Stop()
  144. {
  145. if (!IsRunning) return;
  146. IsRunning = false;
  147. _listener.Stop();
  148. lock (_clients)
  149. {
  150. //关闭所有客户端连接
  151. CloseAllClient();
  152. }
  153. }
  154. /// <summary>
  155. /// 处理客户端连接的函数
  156. /// </summary>
  157. /// <param name="ar"></param>
  158. private void HandleTcpClientAccepted(IAsyncResult ar)
  159. {
  160. if (!IsRunning) return;
  161. var client = _listener.EndAcceptTcpClient(ar);
  162. var buffer = new byte[client.ReceiveBufferSize];
  163. var state
  164. = new TcpClientState(client, buffer);
  165. lock (_clients)
  166. {
  167. _clients.Add(state);
  168. RaiseClientConnected(state);
  169. }
  170. var stream = state.NetworkStream;
  171. //开始异步读取数据
  172. stream.BeginRead(state.Buffer, 0, state.Buffer.Length, HandleDataReceived, state);
  173. _listener.BeginAcceptTcpClient(
  174. HandleTcpClientAccepted, ar.AsyncState);
  175. }
  176. /// <summary>
  177. /// 数据接受回调函数
  178. /// </summary>
  179. /// <param name="ar"></param>
  180. private void HandleDataReceived(IAsyncResult ar)
  181. {
  182. if (!IsRunning) return;
  183. var state = (TcpClientState) ar.AsyncState;
  184. var stream = state.NetworkStream;
  185. int rec;
  186. try
  187. {
  188. rec = stream.EndRead(ar);
  189. }
  190. catch (Exception e)
  191. {
  192. LogUtil.WriteInfo("server 发生异常:" + e);
  193. rec = 0;
  194. }
  195. if (rec == 0)
  196. {
  197. // connection has been closed
  198. lock (_clients)
  199. {
  200. _clients.Remove(state);
  201. //触发客户端连接断开事件
  202. RaiseClientDisconnected();
  203. return;
  204. }
  205. }
  206. // received byte and trigger event notification
  207. var buff = new byte[rec];
  208. Buffer.BlockCopy(state.Buffer, 0, buff, 0, rec);
  209. state.Memory = new MemoryStream();
  210. state.Memory.Write(buff, 0, buff.Length);
  211. //state.Buffer = buff;
  212. //触发数据收到事件
  213. try
  214. {
  215. RaiseDataReceived(state);
  216. }
  217. catch (Exception)
  218. {
  219. lock (_clients)
  220. {
  221. _clients.Remove(state);
  222. //触发客户端连接断开事件
  223. RaiseClientDisconnected();
  224. return;
  225. }
  226. }
  227. // continue listening for tcp diagram packets
  228. stream.BeginRead(state.Buffer, 0, state.Buffer.Length, HandleDataReceived, state);
  229. }
  230. /// <summary>
  231. /// 发送数据
  232. /// </summary>
  233. /// <param name="state">接收数据的客户端会话</param>
  234. /// <param name="data">数据报文</param>
  235. public void Send(TcpClientState state, byte[] data)
  236. {
  237. RaisePrepareSend(state);
  238. Send(state.TcpClient, data);
  239. }
  240. /// <summary>
  241. /// 异步发送数据
  242. /// </summary>
  243. /// <param name="state">接收数据的客户端会话</param>
  244. /// <param name="data">数据报文</param>
  245. public Task SendAsync(TcpClientState state, byte[] data)
  246. {
  247. RaisePrepareSend(state);
  248. return SendAsync(state.TcpClient, data);
  249. }
  250. /// <summary>
  251. /// 指定IP地址发送数据
  252. /// </summary>
  253. /// <param name="ip"></param>
  254. /// <param name="data"></param>
  255. public void Send(string ip, byte[] data)
  256. {
  257. var tcpClient = _clients.Find(p => p.TcpClient.Client.RemoteEndPoint.ToString() == ip);
  258. RaisePrepareSend(tcpClient);
  259. if (!IsRunning)
  260. throw new InvalidProgramException("This TCP Socket server has not been started.");
  261. if (ip == null)
  262. throw new ArgumentNullException(nameof(ip));
  263. if (data == null)
  264. throw new ArgumentNullException(nameof(data));
  265. tcpClient.TcpClient.GetStream().BeginWrite(data, 0, data.Length, SendDataEnd, tcpClient.TcpClient);
  266. }
  267. /// <summary>
  268. /// 指定IP地址异步发送数据
  269. /// </summary>
  270. /// <param name="ip"></param>
  271. /// <param name="data"></param>
  272. public Task SendAsync(string ip, byte[] data)
  273. {
  274. var tcpClient = _clients.Find(p => p.TcpClient.Client.RemoteEndPoint.ToString() == ip);
  275. if (tcpClient==null)
  276. {
  277. return Task.CompletedTask;
  278. }
  279. RaisePrepareSend(tcpClient);
  280. if (!IsRunning)
  281. throw new InvalidProgramException("This TCP Socket server has not been started.");
  282. if (ip == null)
  283. throw new ArgumentNullException(nameof(ip));
  284. if (data == null)
  285. throw new ArgumentNullException(nameof(data));
  286. var stream = tcpClient.TcpClient.GetStream();
  287. return stream.WriteAsync(data, 0, data.Length);
  288. }
  289. /// <summary>
  290. /// 异步发送数据至指定的客户端
  291. /// </summary>
  292. /// <param name="client">客户端</param>
  293. /// <param name="data">报文</param>
  294. public void Send(TcpClient client, byte[] data)
  295. {
  296. if (!IsRunning)
  297. throw new InvalidProgramException("This TCP Socket server has not been started.");
  298. if (client == null)
  299. throw new ArgumentNullException(nameof(client));
  300. if (data == null)
  301. throw new ArgumentNullException(nameof(data));
  302. client.GetStream().BeginWrite(data, 0, data.Length, SendDataEnd, client);
  303. }
  304. /// <summary>
  305. /// 异步发送数据至指定的客户端
  306. /// </summary>
  307. /// <param name="client">客户端</param>
  308. /// <param name="data">报文</param>
  309. public Task SendAsync(TcpClient client, byte[] data)
  310. {
  311. if (!IsRunning)
  312. throw new InvalidProgramException("This TCP Socket server has not been started.");
  313. if (client == null)
  314. throw new ArgumentNullException(nameof(client));
  315. if (data == null)
  316. throw new ArgumentNullException(nameof(data));
  317. var stream = client.GetStream();
  318. return stream.WriteAsync(data, 0, data.Length);
  319. }
  320. /// <summary>
  321. /// 发送数据完成处理函数
  322. /// </summary>
  323. /// <param name="ar">目标客户端Socket</param>
  324. private void SendDataEnd(IAsyncResult ar)
  325. {
  326. ((TcpClient) ar.AsyncState).GetStream().EndWrite(ar);
  327. RaiseCompletedSend(null);
  328. }
  329. /// <summary>
  330. /// 广播发送数据
  331. /// </summary>
  332. /// <param name="msg">消息内容</param>
  333. public void BroadToClient(string msg)
  334. {
  335. foreach (var client in _clients)
  336. {
  337. var bytes = Encoding.GetBytes(msg);
  338. Send(client, bytes);
  339. }
  340. }
  341. public async Task BroadToClientAsync(string msg)
  342. {
  343. foreach (var client in _clients)
  344. {
  345. var bytes = Encoding.GetBytes(msg);
  346. await SendAsync(client, bytes);
  347. }
  348. }
  349. #endregion
  350. #region 事件
  351. /// <summary>
  352. /// 与客户端的连接已建立事件
  353. /// </summary>
  354. public event EventHandler<ServerEventArgs> ClientConnected;
  355. /// <summary>
  356. /// 与客户端的连接已断开事件
  357. /// </summary>
  358. public event EventHandler<ServerEventArgs> ClientDisconnected;
  359. /// <summary>
  360. /// 触发客户端连接事件
  361. /// </summary>
  362. /// <param name="state"></param>
  363. private void RaiseClientConnected(TcpClientState state)
  364. {
  365. ClientConnected?.Invoke(this, new ServerEventArgs(state));
  366. }
  367. /// <summary>
  368. /// 触发客户端连接断开事件
  369. /// </summary>
  370. private void RaiseClientDisconnected()
  371. {
  372. ClientDisconnected?.Invoke(this, new ServerEventArgs("连接断开"));
  373. }
  374. /// <inheritdoc />
  375. /// <summary>
  376. /// 接收到数据事件
  377. /// </summary>
  378. public event EventHandler<ServerEventArgs> DataReceived;
  379. private void RaiseDataReceived(TcpClientState state)
  380. {
  381. DataReceived?.BeginInvoke(this, new ServerEventArgs(state), _ =>
  382. {
  383. } ,null);
  384. }
  385. /// <summary>
  386. /// 发送数据前的事件
  387. /// </summary>
  388. public event EventHandler<ServerEventArgs> PrepareSend;
  389. /// <summary>
  390. /// 触发发送数据前的事件
  391. /// </summary>
  392. /// <param name="state"></param>
  393. private void RaisePrepareSend(TcpClientState state)
  394. {
  395. PrepareSend?.Invoke(this, new ServerEventArgs(state));
  396. }
  397. /// <summary>
  398. /// 数据发送完毕事件
  399. /// </summary>
  400. public event EventHandler<ServerEventArgs> CompletedSend;
  401. /// <summary>
  402. /// 触发数据发送完毕的事件
  403. /// </summary>
  404. /// <param name="state"></param>
  405. private void RaiseCompletedSend(TcpClientState state)
  406. {
  407. CompletedSend?.Invoke(this, new ServerEventArgs(state));
  408. }
  409. /// <summary>
  410. /// 网络错误事件
  411. /// </summary>
  412. public event EventHandler<ServerEventArgs> NetError;
  413. /// <summary>
  414. /// 异常事件
  415. /// </summary>
  416. public event EventHandler<ServerEventArgs> OtherException;
  417. /// <summary>
  418. /// 触发异常事件
  419. /// </summary>
  420. /// <param name="state"></param>
  421. /// <param name="description"></param>
  422. private void RaiseOtherException(TcpClientState state, string description = "unknown exception")
  423. {
  424. OtherException?.Invoke(this, new ServerEventArgs(description, state));
  425. }
  426. #endregion
  427. #region Close
  428. /// <inheritdoc />
  429. /// <summary>
  430. /// 关闭一个与客户端之间的会话
  431. /// </summary>
  432. /// <param name="state">需要关闭的客户端会话对象</param>
  433. public void Close(TcpClientState state)
  434. {
  435. if (state == null) return;
  436. state.Close();
  437. _clients.Remove(state);
  438. }
  439. /// <inheritdoc />
  440. /// <summary>
  441. /// 关闭所有的客户端会话,与所有的客户端连接会断开
  442. /// </summary>
  443. public void CloseAllClient()
  444. {
  445. foreach (var client in _clients)
  446. {
  447. client?.Close();
  448. }
  449. _clients.Clear();
  450. }
  451. #endregion
  452. #region 释放
  453. /// <inheritdoc>
  454. /// <cref></cref>
  455. /// </inheritdoc>
  456. /// <summary>
  457. /// Performs application-defined tasks associated with freeing,
  458. /// releasing, or resetting unmanaged resources.
  459. /// </summary>
  460. public void Dispose()
  461. {
  462. Dispose(true);
  463. GC.SuppressFinalize(this);
  464. }
  465. ~TeamTcpServer()
  466. {
  467. Dispose();
  468. }
  469. /// <summary>
  470. /// Releases unmanaged and - optionally - managed resources
  471. /// </summary>
  472. /// <param name="disposing"><c>true</c> to release
  473. /// both managed and unmanaged resources; <c>false</c>
  474. /// to release only unmanaged resources.</param>
  475. private void Dispose(bool disposing)
  476. {
  477. if (_disposed) return;
  478. if (disposing)
  479. {
  480. try
  481. {
  482. Stop();
  483. // ReSharper disable once RedundantCheckBeforeAssignment
  484. if (_listener != null)
  485. {
  486. _listener = null;
  487. }
  488. }
  489. catch (SocketException e)
  490. {
  491. RaiseOtherException(null, e.ToString());
  492. }
  493. }
  494. _disposed = true;
  495. }
  496. public void UseUnRegisterMessage(string key)
  497. {
  498. }
  499. public void Register(string id, Func<string, NetworkStream, string> func)
  500. {
  501. }
  502. public void UnRegister(string id)
  503. {
  504. }
  505. #endregion
  506. }
  507. }