■ NATS 서버를 사용해 메시지를 송수신하는 방법을 보여준다.
[TestCommon 프로젝트]
▶ MessageReceivedEventArgs.cs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
using System; using NATS.Client; namespace TestCommon { /// <summary> /// 메시지 수신시 이벤트 인자 /// </summary> public class MessageReceivedEventArgs : EventArgs { //////////////////////////////////////////////////////////////////////////////////////////////////// Field ////////////////////////////////////////////////////////////////////////////////////////// Private #region Field /// <summary> /// 메시지 /// </summary> private Msg message; /// <summary> /// 응답 데이터 /// </summary> private byte[] replyData; #endregion //////////////////////////////////////////////////////////////////////////////////////////////////// Property ////////////////////////////////////////////////////////////////////////////////////////// Public #region 주제 - Subject /// <summary> /// 주제 /// </summary> public string Subject { get { return this.message.Subject; } } #endregion #region 응답 - Reply /// <summary> /// 응답 /// </summary> public string Reply { get { return this.message.Reply; } } #endregion #region 데이터 - Data /// <summary> /// 데이터 /// </summary> public byte[] Data { get { return this.message.Data; } } #endregion #region 응답 데이터 - ReplyData /// <summary> /// 응답 데이터 /// </summary> public byte[] ReplyData { get { return this.replyData; } set { this.replyData = value; } } #endregion //////////////////////////////////////////////////////////////////////////////////////////////////// Constructor ////////////////////////////////////////////////////////////////////////////////////////// Public #region 생성자 - MessageReceivedEventArgs(message) /// <summary> /// 생성자 /// </summary> /// <param name="message">메시지</param> public MessageReceivedEventArgs(Msg message) { this.message = message; this.replyData = null; } #endregion } } |
▶ Receiver.cs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
using System; using NATS.Client; namespace TestCommon { /// <summary> /// 수신기 /// </summary> public class Receiver { //////////////////////////////////////////////////////////////////////////////////////////////////// Event ////////////////////////////////////////////////////////////////////////////////////////// Public #region 메시지 수신시 - MessageReceived /// <summary> /// 메시지 수신시 /// </summary> public event EventHandler<MessageReceivedEventArgs> MessageReceived; #endregion //////////////////////////////////////////////////////////////////////////////////////////////////// Field ////////////////////////////////////////////////////////////////////////////////////////// Private #region Field /// <summary> /// 메시지 서버 URL /// </summary> private string messageServerURL = null; /// <summary> /// 메시지 주제 /// </summary> private string messageSubject = null; /// <summary> /// 메시지 큐 /// </summary> private string messageQueue = null; /// <summary> /// 연결 /// </summary> private IConnection connection = null; /// <summary> /// 메시지 핸들러 /// </summary> private EventHandler<MsgHandlerEventArgs> messageHandler = null; /// <summary> /// 비동기 구독 인터페이스 /// </summary> private IAsyncSubscription subscription = null; /// <summary> /// 잠금 객체 /// </summary> private object lockObject = new object(); /// <summary> /// 작업 여부 /// </summary> private bool isWorking = false; #endregion //////////////////////////////////////////////////////////////////////////////////////////////////// Constructor ////////////////////////////////////////////////////////////////////////////////////////// Public #region 생성자 - Receiver() /// <summary> /// 생성자 /// </summary> public Receiver() { } #endregion //////////////////////////////////////////////////////////////////////////////////////////////////// Method ////////////////////////////////////////////////////////////////////////////////////////// Public #region 초기화 하기 - Initialize(messageServerURL, messageSubject, messageQueue) /// <summary> /// 초기화 하기 /// </summary> /// <param name="messageServerURL">메시지 서버 URL</param> /// <param name="messageSubject">메시지 주제</param> /// <param name="messageQueue">메시지 큐</param> public void Initialize(string messageServerURL, string messageSubject, string messageQueue) { this.messageServerURL = messageServerURL; this.messageSubject = messageSubject; this.messageQueue = messageQueue; Options options = ConnectionFactory.GetDefaultOptions(); options.Url = this.messageServerURL; this.connection = new ConnectionFactory().CreateConnection(options); this.messageHandler = (sender, e) => { lock(this.lockObject) { FireMessageReceivedEvent(e.Message); } }; } #endregion #region 서버 시작하기 - Start() /// <summary> /// 서버 시작하기 /// </summary> public void Start() { if(this.isWorking) { return; } this.isWorking = true; try { this.subscription = this.connection.SubscribeAsync(this.messageSubject, this.messageQueue, this.messageHandler); } catch(Exception exception) { this.isWorking = false; throw exception; } } #endregion #region 서버 중단하기 - Stop() /// <summary> /// 서버 중단하기 /// </summary> public void Stop() { if(!this.isWorking) { return; } this.isWorking = false; lock(this.lockObject) { if(this.subscription != null) { this.subscription.Dispose(); this.connection.Dispose(); } } } #endregion ////////////////////////////////////////////////////////////////////////////////////////// Protected #region 메시지 수신시 이벤트 발생시키기 - FireMessageReceivedEvent(message) /// <summary> /// 메시지 수신시 이벤트 발생시키기 /// </summary> /// <param name="message">메시지</param> protected void FireMessageReceivedEvent(Msg message) { MessageReceivedEventArgs e = new MessageReceivedEventArgs(message); MessageReceived?.Invoke(this, e); if(e.ReplyData != null) { message.ArrivalSubcription.Connection.Publish(message.Reply, e.ReplyData); } } #endregion } } |
▶ Sender.cs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
using System; using System.Text; using NATS.Client; namespace TestCommon { /// <summary> /// 송신기 /// </summary> public class Sender : IDisposable { //////////////////////////////////////////////////////////////////////////////////////////////////// Field ////////////////////////////////////////////////////////////////////////////////////////// Private #region Field /// <summary> /// 메시지 서버 URL /// </summary> private string messageServerURL = null; /// <summary> /// 메시지 주제 /// </summary> private string messageSubject = null; /// <summary> /// 연결 /// </summary> private IConnection connection = null; #endregion //////////////////////////////////////////////////////////////////////////////////////////////////// Constructor ////////////////////////////////////////////////////////////////////////////////////////// Public #region 생성자 - Sender() /// <summary> /// 생성자 /// </summary> public Sender() { } #endregion //////////////////////////////////////////////////////////////////////////////////////////////////// Method ////////////////////////////////////////////////////////////////////////////////////////// Public #region 초기화 하기 - Initialize(messageServerURL, messageSubject) /// <summary> /// 초기화 하기 /// </summary> /// <param name="messageServerURL">메시지 서버 URL</param> /// <param name="messageSubject">메시지 주제</param> public void Initialize(string messageServerURL, string messageSubject) { this.messageServerURL = messageServerURL; this.messageSubject = messageSubject; Options options = ConnectionFactory.GetDefaultOptions(); options.Url = this.messageServerURL; this.connection = new ConnectionFactory().CreateConnection(options); } #endregion #region 메시지 송신하기 - SendMessage(source) /// <summary> /// 메시지 송신하기 /// </summary> /// <param name="source">소스 문자열</param> public void SendMessage(string source) { if(this.connection != null) { byte[] sourceByteArray = Encoding.UTF8.GetBytes(source); this.connection.Publish(this.messageSubject, sourceByteArray); this.connection.Flush(); } } #endregion #region 메시지 요청하기 - RequestMessage(source, timeout) /// <summary> /// 메시지 요청하기 /// </summary> /// <param name="source">소스 문자열</param> /// <param name="timeout">타임아웃</param> /// <returns>결과 문자열</returns> public string RequestMessage(string source, int timeout) { if(this.connection != null) { byte[] sourceByteArray = Encoding.UTF8.GetBytes(source); Msg replyMessage = this.connection.Request(this.messageSubject, sourceByteArray, timeout); string reply = Encoding.UTF8.GetString(replyMessage.Data); return reply; } else { return null; } } #endregion #region (IDisposable) 리소스 해제하기 - Dispose() /// <summary> /// 리소스 해제하기 /// </summary> public void Dispose() { if(this.connection != null) { this.connection.Dispose(); } } #endregion } } |
[TestReceiver 프로젝트]
▶ Program.cs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
using System; using System.Text; using TestCommon; namespace TestReceiver { /// <summary> /// 프로그램 /// </summary> class Program { //////////////////////////////////////////////////////////////////////////////////////////////////// Method ////////////////////////////////////////////////////////////////////////////////////////// Static //////////////////////////////////////////////////////////////////////////////// Private #region 프로그램 시작하기 - Main() /// <summary> /// 프로그램 시작하기 /// </summary> private static void Main() { Receiver receiver = new Receiver(); receiver.Initialize("127.0.0.1:4222", "foo", "aaa"); receiver.MessageReceived += receiver_MessageReceived; Console.WriteLine("수신기를 초기화 했습니다."); receiver.Start(); Console.WriteLine("수신기를 시작했습니다."); Console.ReadKey(); receiver.Stop(); Console.WriteLine("수신기를 중단했습니다."); } #endregion #region 수신기 메시지 수신시 처리하기 - receiver_MessageReceived(sender, e) /// <summary> /// 수신기 메시지 수신시 처리하기 /// </summary> /// <param name="sender">이벤트 발생자</param> /// <param name="e">이벤트 인자</param> private static void receiver_MessageReceived(object sender, MessageReceivedEventArgs e) { Console.WriteLine("SUBJECT : {0}", e.Subject); Console.WriteLine("REPLY : {0}", e.Reply ); string source = Encoding.UTF8.GetString(e.Data); Console.WriteLine("메시지 수신 : {0}", source); if(!string.IsNullOrEmpty(e.Reply)) { e.ReplyData = Encoding.UTF8.GetBytes("reply : " + DateTime.Now.ToString()); } } #endregion } } |
[TestSender 프로젝트]
▶ Program.cs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
using System; using TestCommon; namespace TestSender { /// <summary> /// 프로그램 /// </summary> class Program { //////////////////////////////////////////////////////////////////////////////////////////////////// Method ////////////////////////////////////////////////////////////////////////////////////////// Static //////////////////////////////////////////////////////////////////////////////// Private #region 프로그램 시작하기 - Main() /// <summary> /// 프로그램 시작하기 /// </summary> private static void Main() { Console.WriteLine("메시지 전송을 위해 아무 키나 눌러 주시기 바랍니다."); Console.ReadKey(); //using(Sender sender = new Sender()) //{ // sender.Initialize("127.0.0.1:4222", "foo"); // // for(int i = 0; i < 10; i++) // { // sender.SendMessage(string.Format("테스트 메시지 : {0}", i + 1)); // } //} using(Sender sender = new Sender()) { sender.Initialize("127.0.0.1:4222", "foo"); for(int i = 0; i < 10; i++) { string reply = sender.RequestMessage(string.Format("테스트 메시지 : {0}", i + 1), 5000); Console.WriteLine(reply); } } } #endregion } } |