Domain-Driven Design patterns for a distributed system

exactly-once.github.io

twitter.com/SzymonPobiega

linkedin.com/in/SzymonPobiega

Domain-Driven Design

Event Sourcing

Pony images by AndroAnimalia

Shopping cart

When do queues help?

Higher-than-capacity load for short periods of time (bursts)

Further action does not depend on the outcome


// Submits an order: loads it, calls Submit(), then saves it and publishes
// OrderSubmitted before committing the transaction.
[HttpPost]
public async Task<ActionResult> 
	SubmitOrder(string orderId)
{
	var order = await repo.Find(orderId);
	order.Submit();

	var tran = await BeginTransaction();
	// Only Save is given the transaction; Publish is called without it.
	await repo.Save(order, tran);
	await bus.Publish(new OrderSubmitted());
	await tran.Commit();
}
					


// Publish runs before Commit and is not given the transaction.
await bus.Publish(new OrderSubmitted());
await tran.Commit();
					


-- Dequeue one message: delete the first row of Msg (ordered by Seq)
-- and return its Payload to the caller.
-- updlock/readpast are table hints: readpast skips rows locked by other
-- transactions, updlock takes an update lock on the row that is read.
with msg as (
	select top (1) * from Msg 
		with (
			updlock, 
			readpast) 
		order by Seq
) delete from msg output deleted.Payload
					


// Submits an order: loads it, calls Submit(), then saves it and publishes
// OrderSubmitted, passing the same transaction to both calls, and commits.
[HttpPost]
public async Task<ActionResult> 
	SubmitOrder(string orderId)
{
	var order = await repo.Find(orderId);
	order.Submit();

	var tran = await BeginTransaction();
	await repo.Save(order, tran);
	// Publish receives the transaction that Save uses.
	await bus.Publish(
		new OrderSubmitted(), tran);
	await tran.Commit();
}
					

8 vCore SQL Azure

$2 per hour

800 messages/s

2 880 000 messages/h

$0.0007 per 1000 messages

Storage Queues: $0.0012 per 1000 messages

Polyglot messaging

Prepare shipment and bill customer

Aggregates and entities encapsulate behavior and data


// Loads the order, confirms its payment and saves it.
public async Task Confirm(string orderId)
{
	var order = await repo.Find(orderId);
	order.ConfirmPayment();
	// Await the save so the returned task completes only after Save
	// has finished and any failure reaches the caller.
	await repo.Save(order);
}
					

Distributed business process


// Handles OrderAccepted: confirms payment on the order
// and publishes OrderConfirmed.
public class ConfirmPaymentHandler 
	: IHandleMessages<OrderAccepted>
{
	public async Task Handle(OrderAccepted msg, 
		IMessageHandlingContext context)
	{
		var order = await repo.Find(msg.OrderId);
		//When an order is accepted, 
		//confirm payment
		order.ConfirmPayment();
		await context.Publish(
			new OrderConfirmed());
	}
}
					


// Handles the ConfirmPayment command: confirms payment on the order
// and sends ScheduleShipment.
public class ConfirmPaymentHandler 
	: IHandleMessages<ConfirmPayment>
{
	public async Task Handle(ConfirmPayment msg, 
		IMessageHandlingContext context)
	{
		var order = await repo.Find(msg.OrderId);
		order.ConfirmPayment();
		//When a payment is confirmed, 
		//schedule shipment
		await context.Send(
			new ScheduleShipment());
	}
}
					

Process logic implicit in the message handler code

Events only? Commands only? Mixed?


// The handler decides the next step itself: which command is sent
// depends on order.Confirmed after ConfirmPayment() has run.
var order = repo.Find(msg.OrderId);
order.ConfirmPayment();
if (order.Confirmed)
{
	context.Send(new PrepareShipment());
}
else
{ 
	context.Send(new CancelShipment());
}
					

udidahan.com/2008/08/25/domain-events-take-2/


var order = repo.Find(msg.OrderId);
// Register the handler's reaction to each domain event
// before invoking the aggregate.
var e = new Events();
e.Subscribe<PaymentConfirmed>(
	_ => ctx.Send(new PrepareShipment())
);
e.Subscribe<PaymentRefused>(
	_ => ctx.Send(new CancelShipment())
);

order.ConfirmPayment(e);
					


// Confirms the payment and reports the outcome through the supplied
// Events object: PaymentConfirmed when `success` is true,
// PaymentRefused otherwise.
public void ConfirmPayment(Events e)
{ 
	//...
	if (success)
	{
		e.Publish(new PaymentConfirmed());
	}
	else
	{ 
		e.Publish(new PaymentRefused());
	}
}
//Callbacks registered by the handler
ctx.Send(new PrepareShipment());
ctx.Send(new CancelShipment());
					

Messages Are Addressed to Entities


// Payment aggregate that handles the ConfirmPayment message itself:
// marks itself confirmed and publishes PaymentConfirmed.
public class Payment : Aggregate,
	IHandleMessages<ConfirmPayment>
{
	public async Task Handle(ConfirmPayment msg, 
		IMessageHandlingContext ctx)
	{
		this.Confirmed = true;
		/* more business logic*/

		await ctx.Publish(new PaymentConfirmed());
	}
}
					


// Saga that records PaymentConfirmed and sends PrepareShipment
// once AddressChecked is also set.
public class OrderProcess : Saga,
	IHandleMessages<PaymentConfirmed>
{
	public async Task Handle(
		PaymentConfirmed msg, 
		IMessageHandlingContext ctx)
	{
		this.PaymentConfirmed = true;

		if (AddressChecked)
		{
			await ctx.Send(new PrepareShipment());
		}
	}
}
					


// Shipment aggregate that handles the PrepareShipment message itself
// and publishes ShipmentPrepared.
public class Shipment : Aggregate,
	IHandleMessages<PrepareShipment>
{
	public async Task Handle(PrepareShipment msg, 
		IMessageHandlingContext ctx)
	{
		/* business logic*/

		await ctx.Publish(new ShipmentPrepared());
	}
}
					

Message-driven state machines


// Outbox-based handling: if no outbox record exists for this message,
// process it and store the record in one transaction; then dispatch the
// record's messages if they have not been marked as dispatched yet.
var record = TryLoadOutbox(messageId);
if (record == null)
{ 
	var transaction = BeginTransaction();
	record = ProcessMessage(transaction);
	Store(record, transaction);
	transaction.Commit();
}
if (!record.Dispatched)
{ 
	Dispatch(record.Messages);
	MarkAsDispatched(record);🔥🔥🔥
}🔥🔥🔥
					


// Outbox kept inside the entity: the message is handled only when the
// entity's outbox has no entry for messageId yet.
var entity = LoadOrCreate(entityId);
if (!entity.Outbox.Contains(messageId))
{
	entity.Handle(message);
	await Save(entity);
}
// Dispatch whatever is still pending for this message, then clear the
// outbox entry and save the entity again.
var toSend = entity.Outbox[messageId];
if (toSend != null)
{ 
	Dispatch(toSend);
	entity.Outbox[messageId] = null;
	await Save(entity);🔥🔥🔥
}🔥🔥🔥
					


// Partitioned outbox: the partition id derived from the message scopes
// both the outbox lookup and the transaction.
var partitionId = DeriveFromMessage();
var record = 
	TryLoadOutbox(messageId, partitionId);
if (record == null)
{ 
	var transaction 
		= BeginTransaction(partitionId);
	record = ProcessMessage(transaction);
	// NOTE(review): Store is not given the transaction here — confirm intended.
	Store(record);
	transaction.Commit();
}
					

Building reliable event-driven microservices

March 11-12

Thank you!

NDC Workshops

github.com/SzymonPobiega/NServiceBus.Router

github.com/exactly-once

exactly-once.github.io

twitter.com/SzymonPobiega

linkedin.com/in/SzymonPobiega