Domain-Driven Design
Event Sourcing
When do queues help?
Higher-than-capacity load for short periods of time (bursts)
Further action does not depend on the outcome
// Handles the order-submission POST: loads the order, submits it, saves it
// using a transaction and publishes OrderSubmitted.
// NOTE(review): bus.Publish is called without `tran` and before tran.Commit(),
// so the publish is presumably not covered by the transaction — confirm.
[HttpPost]
public async Task<ActionResult>
SubmitOrder(string orderId)
{
var order = await repo.Find(orderId);
order.Submit();
var tran = await BeginTransaction();
await repo.Save(order, tran);
// Published before the commit below and without the transaction argument.
await bus.Publish(new OrderSubmitted());
await tran.Commit();
}
// The publish runs before the commit and is not passed the transaction.
await bus.Publish(new OrderSubmitted());
await tran.Commit();
-- Dequeue one message: select the first row of Msg ordered by Seq, using the
-- updlock and readpast table hints, then delete that row and return its
-- Payload. In a DELETE, OUTPUT columns must be qualified with DELETED.
with msg as (
    select top (1) * from Msg
    with (
        updlock,
        readpast)
    order by Seq
) delete from msg output deleted.Payload;
// Submits the order, then passes one transaction to both the save and the
// publish of OrderSubmitted before committing it.
[HttpPost]
public async Task<ActionResult>
    SubmitOrder(string orderId)
{
    var submittedOrder = await repo.Find(orderId);
    submittedOrder.Submit();

    var transaction = await BeginTransaction();
    await repo.Save(submittedOrder, transaction);

    var orderSubmitted = new OrderSubmitted();
    await bus.Publish(orderSubmitted, transaction);

    await transaction.Commit();
}
8 vCore SQL Azure
$2 per hour
800 messages/s
2 880 000 messages/h
$0.0007 per 1000 messages
Storage Queues: $0.0012 per 1000 messages
Prepare shipment and bill customer
Aggregates and entities encapsulate behavior and data
/// <summary>
/// Loads the order, confirms its payment and saves it.
/// </summary>
/// <param name="orderId">Identifier used to look the order up in the repository.</param>
public async Task Confirm(string orderId)
{
    var order = await repo.Find(orderId);
    order.ConfirmPayment();
    // Await the save so the returned task does not complete before the save
    // finishes and a failed save is not silently dropped.
    await repo.Save(order);
}
Distributed business process
/// <summary>
/// Handles <c>OrderAccepted</c>: confirms the payment on the order and
/// publishes <c>OrderConfirmed</c>.
/// </summary>
public class ConfirmPaymentHandler
    : IHandleMessages<OrderAccepted>
{
    /// <summary>Processes one <c>OrderAccepted</c> message.</summary>
    /// <param name="msg">The message; its <c>OrderId</c> selects the order.</param>
    /// <param name="context">Context used to publish the resulting event.</param>
    public async Task Handle(OrderAccepted msg,
        IMessageHandlingContext context)
    {
        // repo.Find is awaited here as it is in the controller snippets.
        var order = await repo.Find(msg.OrderId);
        //When an order is accepted,
        //confirm payment
        order.ConfirmPayment();
        // Awaited so the handler's task only completes after the publish.
        await context.Publish(
            new OrderConfirmed());
    }
}
/// <summary>
/// Handles the <c>ConfirmPayment</c> command: confirms the payment on the
/// order and sends <c>ScheduleShipment</c>.
/// </summary>
public class ConfirmPaymentHandler
    : IHandleMessages<ConfirmPayment>
{
    /// <summary>Processes one <c>ConfirmPayment</c> message.</summary>
    /// <param name="msg">The message; its <c>OrderId</c> selects the order.</param>
    /// <param name="context">Context used to send the follow-up command.</param>
    public async Task Handle(ConfirmPayment msg,
        IMessageHandlingContext context)
    {
        // repo.Find is awaited here as it is in the controller snippets.
        var order = await repo.Find(msg.OrderId);
        order.ConfirmPayment();
        //When a payment is confirmed,
        //schedule shipment
        // Awaited so the handler's task only completes after the send.
        await context.Send(
            new ScheduleShipment());
    }
}
Process logic implicit in the message handler code
Events only? Commands only? Mixed?
var order = repo.Find(msg.OrderId);
order.ConfirmPayment();

// Choose the follow-up command from the order's state after ConfirmPayment().
if (!order.Confirmed)
{
    context.Send(new CancelShipment());
}
else
{
    context.Send(new PrepareShipment());
}
var order = repo.Find(msg.OrderId);
var e = new Events();
// Register one callback per event type before calling the domain logic;
// each callback sends the matching follow-up command.
e.Subscribe<PaymentConfirmed>(
    _ => ctx.Send(new PrepareShipment()));
e.Subscribe<PaymentRefused>(
    _ => ctx.Send(new CancelShipment()));
order.ConfirmPayment(e);
/// <summary>
/// Publishes <c>PaymentConfirmed</c> on <paramref name="e"/> when
/// <c>success</c> is true, otherwise <c>PaymentRefused</c>.
/// </summary>
/// <param name="e">Event sink that receives the outcome event.</param>
public void ConfirmPayment(Events e)
{
    //...
    if (!success)
    {
        e.Publish(new PaymentRefused());
        return;
    }

    e.Publish(new PaymentConfirmed());
}
//Callbacks registered by the handler:
//one sends PrepareShipment, the other sends CancelShipment
ctx.Send(new PrepareShipment());
ctx.Send(new CancelShipment());
Messages Are Addressed to Entities
/// <summary>
/// Aggregate that handles <c>ConfirmPayment</c> itself: it sets its
/// <c>Confirmed</c> flag and publishes <c>PaymentConfirmed</c>.
/// </summary>
public class Payment : Aggregate,
    IHandleMessages<ConfirmPayment>
{
    /// <summary>Processes one <c>ConfirmPayment</c> message.</summary>
    /// <param name="msg">The incoming command.</param>
    /// <param name="ctx">Context used to publish the resulting event.</param>
    public async Task Handle(ConfirmPayment msg,
        IMessageHandlingContext ctx)
    {
        this.Confirmed = true;
        /* more business logic*/
        // Awaited so the Task-returning method has a result on every path
        // and only completes after the publish.
        await ctx.Publish(new PaymentConfirmed());
    }
}
/// <summary>
/// Saga that records a confirmed payment and sends <c>PrepareShipment</c>
/// when <c>AddressChecked</c> is also set.
/// </summary>
public class OrderProcess : Saga,
    IHandleMessages<PaymentConfirmed>
{
    /// <summary>Processes one <c>PaymentConfirmed</c> message.</summary>
    /// <param name="msg">The incoming event.</param>
    /// <param name="ctx">Context used to send the follow-up command.</param>
    public async Task Handle(
        PaymentConfirmed msg,
        IMessageHandlingContext ctx)
    {
        this.PaymentConfirmed = true;
        if (AddressChecked)
        {
            // Awaited so the handler's task only completes after the send.
            await ctx.Send(new PrepareShipment());
        }
    }
}
/// <summary>
/// Aggregate that handles <c>PrepareShipment</c> and publishes
/// <c>ShipmentPrepared</c>.
/// </summary>
public class Shipment : Aggregate,
    IHandleMessages<PrepareShipment>
{
    /// <summary>Processes one <c>PrepareShipment</c> message.</summary>
    /// <param name="msg">The incoming command.</param>
    /// <param name="ctx">Context used to publish the resulting event.</param>
    public async Task Handle(PrepareShipment msg,
        IMessageHandlingContext ctx)
    {
        /* business logic*/
        // Awaited so the Task-returning method has a result on every path
        // and only completes after the publish.
        await ctx.Publish(new ShipmentPrepared());
    }
}
Message-driven state machines
// Outbox-based processing: the message is processed only when no outbox
// record exists for it yet; the record is stored using the same transaction
// that ProcessMessage received.
var record = TryLoadOutbox(messageId);
if (record == null)
{
    var transaction = BeginTransaction();
    record = ProcessMessage(transaction);
    Store(record, transaction);
    transaction.Commit();
}
// Dispatch the recorded outgoing messages unless already marked as dispatched.
if (!record.Dispatched)
{
    Dispatch(record.Messages);
    MarkAsDispatched(record); // 🔥🔥🔥 slide marker — presumably a point where a failure may occur; confirm
} // 🔥🔥🔥 slide marker — presumably a point where a failure may occur; confirm
// Variant where the outbox is part of the entity: the message is handled and
// the entity saved only when the entity's Outbox has no entry for messageId.
var entity = LoadOrCreate(entityId);
if (!entity.Outbox.Contains(messageId))
{
    entity.Handle(message);
    await Save(entity);
}
// A non-null entry is dispatched, then cleared and the entity saved again.
var toSend = entity.Outbox[messageId];
if (toSend != null)
{
    Dispatch(toSend);
    entity.Outbox[messageId] = null;
    await Save(entity); // 🔥🔥🔥 slide marker — presumably a point where a failure may occur; confirm
} // 🔥🔥🔥 slide marker — presumably a point where a failure may occur; confirm
// Partitioned variant: the partition id is derived from the message and is
// passed both to the outbox lookup and to BeginTransaction.
var partitionId = DeriveFromMessage();
var record =
    TryLoadOutbox(messageId, partitionId);
if (record == null)
{
    var transaction
        = BeginTransaction(partitionId);
    record = ProcessMessage(transaction);
    // Store with the transaction, as the non-partitioned version does.
    Store(record, transaction);
    transaction.Commit();
}
Building reliable event-driven microservices