c# Redis 기반 경량 메시지 구성 요소를 구현하려면

최근에 경량급 ASP를 개발하고 있습니다.NET MVC 개발 프레임워크는 로그 기록, 메일 발송, 문자 발송 등 기능을 추가해야 한다. 모듈의 독립성을 유지하기 위해 메시지 통신 방식을 통해 처리해야 한다. 프레임워크를 배치하고 사용하기 위해 2차 개발 과정에서의 간편하고 편리성을 유지해야 하기 때문에 전통적인 MQ를 선택하지 않고 Redis의 구독 발표를 바탕으로 한 시스템 내부 메시지 구성 요소를 실현한다. 말이 많지 않고 코드를 올린다!

데이터 구조 정의


메시지 실체는 몇 가지 부분을 포함한다. 구독 채널 이름, 정보 헤더, 정보체, 정보 차별화 추가 정보 사전, 정보 헤더는 주로 메시지 표지, 메시지 날짜, 정보체 포함 정보 내용, 정보 실체 유형 등을 포함한다.

public class Message
 {
     public string MessageChannel { set; get; }
     public MessageHead @MessageHead { set; get; }
     public MessageBody @MessageBody { set; get; }
 
     [JsonExtensionData]
     public Dictionary<string,Object> @MessageExtra { set; get; }
 
     public Message()
     {
 
     }
 
     public void AddExtra(string Name, string Value)
     {
         if (@MessageExtra == null)
         {
             @MessageExtra = new Dictionary<string, object>();
         }
         @MessageExtra.Add(Name, Value);
     }
 
     public Object GetExtra(string Name)
     {
         return @MessageExtra[Name];
     }
 }
 
 public class MessageHead
 {
     public string MessageID { set; get; }
     public DateTime MessageDate { set; get; }
 
     public MessageHead()
     {
         MessageID = CommonUtil.CreateCommonGuid();
         MessageDate = DateTime.Now;
     }
 }
 
 public class MessageBody
 {
     public string MessageJsonContent { set; get; }
     public Type MessageMapperType { set; get; }
 }
주: 메시지 구독 발표 전달 과정에서 저는 Json을 통해 서열화되어 전송되었기 때문에 사용 과정에서 정보에 대한 추가 키 값이 필요할 수 있습니다. 여기서 대상에 정의된 것은 Dictinary 대상이지만 Dictinary 자체는 서열화를 지원하지 않기 때문에 Json ExtensionData를 추가해야 합니다.

구독 채널 선언


우리가 달성해야 할 효과는 시스템이 시작될 때 모든 메시지 채널은 시스템의 응용에 따라 자동으로 구독할 수 있다는 것이다. 여기에 우리의 구독 채널이 메시지를 수신하는 실현 클래스를 표시하는 주석이 필요하다.

[AttributeUsage(AttributeTargets.Class)]
    public class MessageChanelAttribute : Attribute
    {
        private string _ChannleName;
        public string ChannelName
        {
            get
            {
                return this._ChannleName;
            }
            set
            {
                this._ChannleName = value;
            }
 
        }
    }

메시지의 개성화된 정책 처리


Redis의 타사 라이브러리 StackExchange를 사용합니다.Redis.dll, 메시지 구독을 할 때 채널에 메시지를 받을 때의 처리 의뢰를 지정해야 합니다. 우리는 자동 구독을 하는 과정에서 각종 메시지 처리 클래스를 수집하고 채널과 일일이 대응해야 합니다. 이때 우리는 기본 클래스인 FastDefault MessageHandler가 필요합니다. 우리의 구체적인 메시지 처리 클래스는 FastDefault MessageHandler에서 계승하고 처리 방법을 다시 쓰면 됩니다.

[Component]
   [MessageChanelAttribute(ChannelName = "DefaultMessage")]
   public class FastDefaultMessageHandler : IFastMessageHandle
   {
       [AutoWired]
       public DBUtil @DBUtil;
 
       public void HandleMessage(RedisChannel ChannelName, RedisValue Message)
       {
           FastExecutor.Message.Design.Message Entity = JsonConvert.DeserializeObject<FastExecutor.Message.Design.Message>(Message);
           try
           {
               if (!CheckMessageIsConsume(Entity))
               {
                   this.CustomHandle(Entity);
               }
           }
           catch (Exception e)
           {
               StringBuilder ExceptionLog = new StringBuilder();
               ExceptionLog.AppendFormat(" Message Channel:{0}", Entity.MessageChannel + Environment.NewLine);
               ExceptionLog.AppendFormat(" Message :{0}", Entity.MessageHead.MessageDate.ToString() + Environment.NewLine);
               ExceptionLog.AppendFormat(" Message :{0}", Message + Environment.NewLine);
               ExceptionLog.AppendFormat(" :{0}", e.Message + Environment.NewLine);
               LogUtil.WriteLog("Logs/MessageErrorLog", "log_", ExceptionLog.ToString() + Environment.NewLine);
               ExceptionLog.AppendFormat("========================================================================================================================================================================" + Environment.NewLine);
               MessageACK.MoveMessageToExceptionChannel(Entity.MessageChannel, Entity);
           }
           finally
           {
               MessageACK.ConfirmMessageFinish(Entity.MessageChannel, Entity.MessageHead.MessageID);
           }
 
       }
 
       public virtual void CustomHandle(FastExecutor.Message.Design.Message @Message)
       {
 
       }
 
       public virtual bool CheckMessageIsConsume(FastExecutor.Message.Design.Message @Message)
       {
           return false;
       }
   }
그 중에서 Handle Message 방법은 우리가 채널을 구독할 때 대응하는 의뢰입니다. 클래스에 있는Custom Handle의 허위 방법을 호출합니다. 하위 클래스는 이 방법을 다시 쓰는 것을 계승합니다. 이 방법은 다중 모드를 바탕으로 정책 호출을 합니다. CheckMessage IsConsume 방법은 메시지가 중복 소비되었는지 확인하는 데 사용되고 다시 쓸 수도 있습니다. 다음은 로그 클래스에 접근하는 실례를 보십시오.MessageChanelAttribute를 사용하여 이 구현 클래스가 구독하고 발표해야 하는 채널의 이름은 Visit이고 CustomHandle 방법에서 데이터베이스 삽입 작업이 이루어지며 CheckMessageIsConsume 방법으로 이 로그 데이터가 소비되었는지 판단합니다(데이터베이스에 이미 존재함).

[MessageChanelAttribute(ChannelName = "Visit")]
public class VisitLog : FastDefaultMessageHandler
{
    public override void CustomHandle(Message.Design.Message Message)
    {
        Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
        @DBUtil.Insert(LogEntity);
        base.CustomHandle(Message);
    }
 
    public override bool CheckMessageIsConsume(Message.Design.Message Message)
    {
        Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
        DBRow Row = new DBRow("Frame_VisitLog", "RowGuid", LogEntity.RowGuid);
        if (Row.IsExist())
        {
            return true;
        }
        else
        {
            return false;
        }
    }
}

메시지 자동 구독


우리는 시스템이 시작할 때 채널과 구현 클래스를 정의하여 자동으로 구독을 실현하기를 희망합니다. 여기는 IOC 용기에 사용해야 합니다. 시스템을 시작할 때 모든 메시지 처리 클래스를 용기에 넣고 자동 구독할 때 모두 꺼내서 메시지 처리 클래스에서 설명한 채널 이름에 따라 자동 구독을 해야 합니다.

public void Init()
      {
          List<Type> HandlerTypeList = InjectUtil.Container.GetRegistType(typeof(IFastMessageHandle));
          foreach (Type HandlerType in HandlerTypeList)
          {
              MessageChanelAttribute Channel = Attribute.GetCustomAttribute(HandlerType, typeof(MessageChanelAttribute)) as MessageChanelAttribute;
              RedisUtil.Subscribe(Channel.ChannelName, ((FastDefaultMessageHandler)InjectUtil.Container.Resolve(HandlerType)).HandleMessage);
          }
      }
주:
1. 이곳의 IOC 용기는 제가 직접 구현한 것입니다. 주소: https://gitee.com/grassprogramming/FastIOC, 여러분은 AutoFac로 대체할 수 있습니다.
2. Redisutil은 StackExchange에 대한 것입니다.Redis.dll 봉인 처리 클래스, 주소: https://gitee.com/grassprogramming/FastUtil

메시지 보내기


메시지는 Redis의 발표 방법을 호출하면 됩니다. 채널 이름과 정의된 데이터 실체 클래스를 Json으로 서열화합니다.

public void SendMessage<T>(string ChannleName, T CustomMessageEntity, Dictionary<string, string> ExtraData = null)
   {
       FastExecutor.Message.Design.Message MessageEntity = new Design.Message();
       MessageEntity.MessageChannel = ChannleName;
       MessageHead Head = new MessageHead();
       MessageBody Body = new MessageBody();
       Body.MessageMapperType = typeof(T);
       Body.MessageJsonContent = JsonConvert.SerializeObject(CustomMessageEntity);
       MessageEntity.MessageHead = Head;
       MessageEntity.MessageBody = Body;
       if (ExtraData != null)
       {
           foreach (var item in ExtraData)
           {
               MessageEntity.AddExtra(item.Key, item.Value);
           }
       }
       RedisUtil.Publish(ChannleName, MessageEntity);
       MessageACK.CopyMessageToACKList(ChannleName, MessageEntity);
   }

메시지 확인 및 저장


Redis가 구독 게시 모드를 메시지 구성 요소로 하는 문제는 두 가지가 있습니다.

문제: 메시지 소비 완료 확인 메커니즘 없음


솔루션
Redis의 Hash 저장 방식을 바탕으로 메시지 저장 필드를 만들어서 메시지를 보낼 때 메시지 Hash 사전으로 복사하고 소비가 끝난 후에 삭제합니다. SendMessage의 메시지 ACK에 대응합니다.CopyMessageToACKList 방법 및 FastDefaultMessageHandler의 MessageACK.ConfirmMessageFinish 방법, 본질은 Hash 사전의 추가 및 삭제 기능

문제: 메시지 처리단이 끊겼습니다. 다시 시작하면 메시지가 사라집니다.


솔루션
확인 메커니즘은 메시지가 소비되지 않아도 처리단의 다운타임이 없어지지 않도록 보장합니다. 주의해야 할 것은 메시지가 분실되지 않은 것은 Hash 사전에만 저장되어 있지만 메시지 채널에 존재하지 않기 때문에 시스템이 시작될 때마다 이 Hash 사전을 스캔하여 채널을 다시 발표합니다. 그러면 중복 소비를 초래할 수 있습니다.따라서 FastDefault MessageHandler의 CheckMessage IsConsume 방법으로 판단해야 합니다. 또한 메시지 처리자 자체의 처리 이상을 기록해야 합니다. 예를 들어 문자 공급업체의 인터페이스에 문제가 있습니다. 메시지 처리 이상은 Redis의 Channel Exception 채널에 들어가기 때문에 우리는 수요에 따라 가시화 인터페이스를 통해 수동 복구 여부를 결정할 수 있습니다.

마지막


Message 구성 요소 관련 코드 주소: https://gitee.com/grassprogramming/FastExecutor/tree/master/code/FastExecutor/FastExecutor.Message
부족 문제: 메시지가 단순 기록 로그 문제라면 메시지가 소비되었는지 확인할 방법이 없습니다
만약 여러분에게 좋은 건의가 있다면, 메시지를 남겨 함께 교류하고 공부하며 함께 진보할 수 있습니다
이상은 c#Redis를 바탕으로 경량급 메시지 구성 요소를 실현하는 절차에 대한 상세한 내용입니다. c#Redis를 바탕으로 메시지 구성 요소를 실현하는 것에 대한 더 많은 자료는 저희 다른 관련 글을 주목해 주십시오!

좋은 웹페이지 즐겨찾기