Sending messages to the future

Szymon Pobiega

@SzymonPobiega

Event Source all the things!

  • Command → Event
  • Event → Command
  • Rinse & repeat

Become reactive or die trying!

  • Publish and subscribe to streams of records
  • Store streams of records in a fault-tolerant durable way
  • Process streams of records as they occur

Software system is defined by

data stored in it

messages flowing through it

Time-dependent event processors disrupt the flow

Examples?

Conventions


// Convention-based stateful handler: Correlate() names, per message type, the
// property used for correlation, and there is one Handle overload per message type.
class StatefulMessageHandler {
    // Every message type registered here has a matching Handle overload below.
    // NOTE(review): Matching<T> is defined elsewhere; presumably it registers
    // the selected property as the correlation key — confirm.
    public void Correlate()
    {
        Matching<SomeMessage>(m => m.OrderId);
        Matching<OtherMessage>(m => m.TransactionId);
    }
    public void Handle(SomeMessage message) {
        /*...*/
    }
    public void Handle(OtherMessage message) {
        /*...*/
    }
}
                

Moving average


// Price samples currently held for the moving average.
List<DataPoint> dataPoints;

// Records the incoming price, then drops every sample whose date is older
// than the current time minus WindowSize.
void Handle(NotifyPrice message) {
   var sample = new DataPoint(message.Price, message.Date);
   dataPoints.Add(sample);

   var oldestAllowed = DateTime.Now - WindowSize;
   dataPoints.RemoveAll(point => point.Date < oldestAllowed);
}

// Returns the mean price of the samples stored at the time of the call.
decimal Handle(GetAverage message) {
   var average = dataPoints.Average(point => point.Price);
   return average;
}
               


// Price samples currently held for the moving average.
List<DataPoint> dataPoints;

// Stores the sample and sends a RemoveDataPoint for the same id to the future,
// deferred by WindowSize.
void Handle(NotifyPrice message) {
   dataPoints.Add(new DataPoint(message.Id, message.Price));

   SendToFuture(WindowSize, new RemoveDataPoint(message.Id));
}

// Removes exactly the sample this message was scheduled for; any other
// sample has its own RemoveDataPoint message pending.
void Handle(RemoveDataPoint message) {
   dataPoints.RemoveAll(d => d.Id == message.Id);
}

// Returns the mean price of the samples stored at the time of the call.
decimal Handle(GetAverage message) {
   return dataPoints.Average(d => d.Price);
}
                  

Coordinating transactions


// Starts the transfer: requests the debit and the credit, then records the
// deadline that the polling query compares against GETUTCDATE().
void Handle(TransactionSubmitted message) {
   Send(message.FromAccount, new Debit(message.Amount));
   Send(message.ToAccount, new Credit(message.Amount));
   // UTC, because the timeout query compares TimeoutAt with GETUTCDATE().
   TimeoutAt = DateTime.UtcNow + TransactionTimeout;
}

// Marks the debit leg as confirmed; completes once both legs are confirmed.
void Handle(DebitConfirmed message) {
   Debitted = true;
   if (Credited) Complete();
}

// Marks the credit leg as confirmed; completes once both legs are confirmed.
void Handle(CreditConfirmed message) {
   Credited = true;
   if (Debitted) Complete();
}
                  


-- Polling query: selects transactions where Debitted or Credited is still 0
-- and whose TimeoutAt is earlier than the current UTC time.
SELECT * FROM Transactions 
   WHERE (Debitted = 0 OR Credited = 0) 
   AND TimeoutAt < GETUTCDATE()
                  


// Sends CancelDebit to the source account and CancelCredit to the target account.
// NOTE(review): `message` is not declared in this method — presumably the
// account and amount data must come from stored transaction state; confirm.
void Timeout() {
   Send(message.FromAccount, new CancelDebit(message.Amount));
   Send(message.ToAccount, new CancelCredit(message.Amount));
}
                  


// Starts the transfer: requests the debit and the credit, and sends a
// CancelTransaction to the future, deferred by TransactionTimeout.
void Handle(TransactionSubmitted message) {
   Send(message.FromAccount, new Debit(message.Amount));
   Send(message.ToAccount, new Credit(message.Amount));
   // The cancel message carries everything its handler reads.
   SendToFuture(TransactionTimeout,
      new CancelTransaction(message.FromAccount, message.ToAccount, message.Amount));
}

// Marks the debit leg as confirmed; completes once both legs are confirmed.
void Handle(DebitConfirmed message) {
   Debitted = true;
   if (Credited) Complete();
}

// Marks the credit leg as confirmed; completes once both legs are confirmed.
void Handle(CreditConfirmed message) {
   Credited = true;
   if (Debitted) Complete();
}

// Timeout handler. The deferred message is sent for every transaction, so it
// must be ignored when both legs were confirmed in the meantime.
void Handle(CancelTransaction message) {
   if (Debitted && Credited) return;

   Send(message.FromAccount, new CancelDebit(message.Amount));
   Send(message.ToAccount, new CancelCredit(message.Amount));
}
                  

Auction


// Quotes received so far, keyed by bidder.
Dictionary<string, decimal> bids;

// Asks every bidder for a quote and records when the auction closes (UTC).
void Handle(StartBidding message) {
   foreach (var bidder in message.Bidders) {
      Send(bidder.Name, new GetQuote());
   }
   EndTime = DateTime.UtcNow + WaitTime;
}

// Stores the bidder's quote in bids, overwriting any earlier quote from the same bidder.
void Handle(Quote message)
{
   bids[message.Bidder] = message.Quote;
}
                  


-- Polling query: selects auctions whose EndTime is earlier than the current UTC time.
SELECT * FROM Auctions 
    WHERE EndTime < GETUTCDATE()
                  


// Replies to the originator with the bidder whose quote is the lowest.
// Throws InvalidOperationException when no quotes were received, as Min() did.
void SendReply() {
   // Select the whole entry, not just the minimum value, so the bidder key is available.
   var bestBid = bids.OrderBy(b => b.Value).First();
   ReplyToOriginator(bestBid.Key);
}
                  


// Quotes received so far, keyed by bidder.
Dictionary<string, decimal> bids;

// Asks every bidder for a quote and sends a BiddingClosed message to the
// future, deferred by WaitTime.
void Handle(StartBidding message) {
   foreach (var bidder in message.Bidders) {
      Send(bidder.Name, new GetQuote());
   }
   SendToFuture(WaitTime, new BiddingClosed());
}

// Stores the bidder's quote in bids, overwriting any earlier quote from the same bidder.
void Handle(Quote message) {
   bids[message.Bidder] = message.Quote;
}

// Replies to the originator with the bidder whose quote is the lowest.
// Throws InvalidOperationException when no quotes were received, as Min() did.
void Handle(BiddingClosed message) {
   // Select the whole entry, not just the minimum value, so the bidder key is available.
   var bestBid = bids.OrderBy(b => b.Value).First();
   ReplyToOriginator(bestBid.Key);
}
                  

Discount policy

20% if customer made two purchases in last two years


// Remembers the date of every accepted order.
void Handle(OrderAccepted message) {
   var purchase = new Purchase(message.Date);
   purchases.Add(purchase);
}

// Applies a factor of 0.8 when at least two stored purchases are dated after
// the current time minus PromoTime; otherwise the value is unchanged.
void Handle(CalculateDiscount message) {
   var recentPurchaseCount = purchases
      .Count(p => p.Date > DateTime.Now - PromoTime);

   var factor = 1.0;
   if (recentPurchaseCount >= 2) factor = 0.8;

   Reply(new CalculateDiscountResponse(message.Value * factor));
}
                  


// Purchases that currently count towards the discount.
List<Purchase> purchases;

// Stores the purchase and sends a ForgetPurchase for the same id to the
// future, deferred by PromoTime.
void Handle(OrderAccepted message) {
   var purchase = new Purchase(message.Id);
   purchases.Add(purchase);

   var expiry = new ForgetPurchase(message.Id);
   SendToFuture(PromoTime, expiry);
}

// Drops the purchase this message was scheduled for.
void Handle(ForgetPurchase message) {
   purchases.RemoveAll(purchase => purchase.Id == message.Id);
}

// Applies a factor of 0.8 when at least two purchases are still stored.
void Handle(CalculateDiscount message) {
   var qualifies = purchases.Count >= 2;
   var factor = qualifies ? 0.8 : 1.0;
   Reply(new CalculateDiscountResponse(message.Value * factor));
}
                  

Swamps are evil

  • Require polling or state tracking
  • Time-based logic is hard to DRY
  • Unit testing is problematic
  • Can hide important business concepts

Warping spacetime

Case study

RabbitMQ

Binding

Time-to-deliver

Defer by a fixed time span

Cascade

Recap

  • Command → Event → Command is not enough
  • Swamps disrupt the message flow
  • Rabbit holes warp spacetime

Thank you!

Szymon Pobiega

@SzymonPobiega