AI智能
改变未来

.NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 详解)–学习笔记


2.6.7 RabbitMQ — Masstransit 详解

  • Consumer 消费者
  • Producer 生产者
  • Request-Response 请求-响应

Consumer 消费者

在 MassTransit 中,一个消费者可以消费一种或多种消息

消费者的类型包括:普通消费者,saga,saga 状态机,路由活动(分布式追踪),处理器 handlers,工作消费者 job comsumers

  • Consumer
  • Instance
  • Handler
  • Others

Consumer

public class Program{public static async Task Main(){var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>{cfg.ReceiveEndpoint(\"order-service\", e =>{e.Consumer<SubmitOrderConsumer>();});});}}

继承 IConsumer,实现 Consume 方法

class SubmitOrderConsumer :IConsumer<SubmitOrder>{public async Task Consume(ConsumeContext<SubmitOrder> context){await context.Publish<OrderSubmitted>(new{context.Message.OrderId});}}

三个原则:

  • 拥抱 The Hollywood Principle, which states, \”Dont\’t call us, we\’ll call you.\”
  • Consume 方法是一个被等待的方法,在执行中时其他消费者无法接收到这个消息,当这个方法完成的时候,消息被 ack,并且从队列中移除
  • Task 方法异常会导致消息触发 retry,如果没有配置重试,消息将被投递到失败队列

Instance

public class Program{public static async Task Main(){var submitOrderConsumer = new SubmitOrderConsumer();var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>{cfg.ReceiveEndpoint(\"order-service\", e =>{e.Instance(submitOrderConsumer);});});}}

所有接收到的消息都由一个消费者来实例来处理(请确保这个消费者类是线程安全)

Consumer 每次接收到消息都会 new 一个实例

Handler

public class Program{public static async Task Main(){var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>{cfg.ReceiveEndpoint(\"order-service\", e =>{e.Handler<SubmitOrder>(async context =>{await Console.Out.WriteLineAsync($\"Submit Order Received: {context.Message.OrderId}\");});});});}}

通过一个委托 Lambda 方法,来消费消息

Others

  • Saga<>
  • StateMachineSaga<>

Producer 生产者

消息的生产可以通过两种方式产生:发送和发布

发送的时候需要指定一个具体的地址 DestinationAddress,发布的时候消息会被广播给所有订阅了这个消息类型的消费者

基于这两种规则,消息被定义为:命令 command 和事件 event

  • send
  • publish

send

可以调用以下对象的 send 方法来发送 command:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • ISendEndpointProvider(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

ConsumeContext

public class SubmitOrderConsumer :IConsumer<SubmitOrder>{private readonly IOrderSubmitter _orderSubmitter;public SubmitOrderConsumer(IOrderSubmitter submitter)=> _orderSubmitter = submitter;public async Task Consume(IConsumeContext<SubmitOrder> context){await _orderSubmitter.Process(context.Message);await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));}}

ISendEndpointProvider

public async Task SendOrder(ISendEndpointProvider sendEndpointProvider){var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);await endpoint.Send(new SubmitOrder { OrderIdad8= \"123\" });}

publish

  • 发送地址
  • 短地址
  • Convention Map

发送地址

  • rabbitmq://localhost/input-queue
  • rabbitmq://localhost/input-queue?durable=false

短地址

  • GetSendEndpoint(new Uri(\”queue:input-queue\”))

Convention Map

在配置文件中指定 map 规则

EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings[\"deliveryServiceQueue\"]));

直接发送

public class SubmitOrderConsumer :IConsumer<SubmitOrder>{private readonly IOrderSubmitter _orderSubmitter;public SubmitOrderConsumer(IOrderSubmitter submitter)=> _orderSubmitter = submitter;public async Task Consume(IConsumeContext<SubmitOrder> context){await _orderSubmitter.Process(context.Message);await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));}}

可以调用以下对象的 publish 方法来发送 event:

  • ConsumeContext (在 Consumer 的 Consumer 方法参数中传递)
  • IPublishEndpoint(可以从 DI 中获取)
  • IBusControl(最顶层的控制对象,用来启动和停止 masstransit 的控制器)

IPublishEndpoint

public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint){await publishEndpoint.Publish<OrderSubmitted>(new{OrderId = \"27\",OrderDate = DateTime.UtcNow,});}

Request-Response 请求-响应

Request-Response 模式让应用程序之间解耦之后,依然采用同步的方式

  • Consumer
  • IClientFactory
  • IRequestClient
  • Send a request

Consumer

public async Task Consume(ConsumeContext<CheckOrderStatus> context){var order = await _orderRepository.Get(context.Message.OrderId);if (order == null)throw new InvalidOperationException(\"Order not found\");await context.RespondAsync<OrderStatusResult>(new{OrderId = order.Id,order.Timestamp,order.StatusCode,order.StatusText});}

需要处理返回类型 OrderStatusResult,异步方式模拟同步,实际上同样有消息队列,消费者处理过程

IClientFactory

public interface IClient56cFactory{IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);}

通过 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory

IRequestClient

public interface IRequestClient<TRequest>where TRequest : class{RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);}

RequestClient 可以创建请求,或者直接获得响应

Send a request

var serviceAddress = new Uri(\"rabbitmq://localhost/check-order-status\");var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);var response = await client.GetResponse&ltad0;OrderStatusResult>(new { OrderId = id});

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。

欢迎转载、使用、重新发布,但务必保留文章署名 郑子铭 (包含链接: http://www.cnblogs.com/MingsonZheng/ ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。

如有任何疑问,请与我联系 (MingsonZheng@outlook.com) 。

赞(0) 打赏
未经允许不得转载:爱站程序员基地 » .NET 云原生架构师训练营(模块二 基础巩固 RabbitMQ Masstransit 详解)–学习笔记