Implementing the Outbox Pattern for Reliable Messaging in .NET Modular Monoliths Architecture
We’re going to explore how to implement the Outbox Pattern for reliable messaging in a Modular Monoliths architecture, focusing specifically on the BasketCheckout use case in our EShop application.
By the end of this tutorial, you’ll understand how to ensure reliable communication between modules, avoid message loss, and maintain consistent states across your application. Let’s get started! 🚀
Why Use the Outbox Pattern?
In a distributed system or even within a modular monolith, ensuring reliable communication between different modules or services is crucial.
Traditional messaging approaches can suffer from issues like message loss, duplication, and inconsistent states, especially when trying to update a database and send a message as part of the same transaction which we called Dual Write Problem. This is where the Outbox Pattern comes in.
What is the Outbox Pattern?
The Outbox Pattern ensures that messages are stored in a persistent database before being sent to a message broker. This guarantees that messages are not lost, even if the system crashes, providing a more reliable messaging solution.
Key Benefits:
- Reliability: Messages are safely stored before sending, avoiding data loss.
- Consistency: Maintains consistency between database operations and message sending.
- Decoupling: Modules can operate independently, reducing dependencies.
Our Main Use Case: Basket Checkout
To demonstrate the Outbox Pattern, we’ll focus on the BasketCheckout process in our EShop Modular Monolith Application (MMA).
Understanding the BasketCheckout Workflow
Here’s a high-level overview of how the BasketCheckout process works using the Outbox Pattern:
1- BasketCheckout Command: The client application sends a BasketCheckout command.
2- Database Transaction: The Basket module performs its operations in a single database transaction. This includes:
- Removing the basket from the Redis database.
- Saving the BasketCheckout event into an Outbox table.
3- Outbox Processing: A background service reads the Outbox table and publishes the BasketCheckout integration event to RabbitMQ using MassTransit.
4- Subscriber Module: The Ordering module subscribes to this event, processes it, and creates an order in the PostgreSQL database under the ordering schema.
Step-by-Step Implementation Guide
Let’s go through the steps required to implement the Outbox Pattern in our BasketCheckoutHandler:
Step 1: Creating the Outbox Entity
First, we’ll define an OutboxMessage
entity to store messages that need to be sent.
namespace Basket.Basket.Models;
public class OutboxMessage : Entity<Guid>
{
public string Type { get; set; } = default!;
public string Content { get; set; } = default!;
public DateTime OccuredOn { get; set; } = default!;
public DateTime? ProcessedOn { get; set; } = default!;
}
- Type: Represents the type of the event being stored.
- Content: The JSON payload of the event.
- OccuredOn: Timestamp of when the event occurred.
- ProcessedOn: Timestamp of when the message was processed.
Step 2: Modifying the DbContext
Next, we need to update our BasketDbContext
to include the OutboxMessage
DbSet.
namespace Basket.Data;
public class BasketDbContext : DbContext
{
public BasketDbContext(DbContextOptions<BasketDbContext> options)
: base(options) { }
public DbSet<ShoppingCart> ShoppingCarts => Set<ShoppingCart>();
public DbSet<ShoppingCartItem> ShoppingCartItems => Set<ShoppingCartItem>();
public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();
protected override void OnModelCreating(ModelBuilder builder)
{
builder.HasDefaultSchema("basket");
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
base.OnModelCreating(builder);
}
}
- DbSet<OutboxMessage>: Allows CRUD operations on the OutboxMessages table.
- Schema and Configurations: Ensures that the database schema for the Basket module is correctly set and configurations are applied.
Step 3: Update CheckoutBasketHandler
Now, let’s modify the CheckoutBasketHandler
to save the message to the Outbox table instead of publishing it directly. We will ensure that saving the Outbox message and deleting the basket are part of the same atomic transaction.
using Shared.Messaging.Events;
using System.Text.Json;
namespace Basket.Basket.Features.CheckoutBasket;
public record CheckoutBasketCommand(BasketCheckoutDto BasketCheckout)
: ICommand<CheckoutBasketResult>;
public record CheckoutBasketResult(bool IsSuccess);
internal class CheckoutBasketHandler(BasketDbContext dbContext)
: ICommandHandler<CheckoutBasketCommand, CheckoutBasketResult>
{
public async Task<CheckoutBasketResult> Handle(CheckoutBasketCommand command, CancellationToken cancellationToken)
{
await using var transaction =
await dbContext.Database.BeginTransactionAsync(cancellationToken);
try
{
// Retrieve basket
var basket = await dbContext.ShoppingCarts
.Include(x => x.Items)
.SingleOrDefaultAsync(x => x.UserName == command.BasketCheckout.UserName, cancellationToken);
if (basket == null)
{
throw new BasketNotFoundException(command.BasketCheckout.UserName);
}
// Create event message
var eventMessage = command.BasketCheckout.Adapt<BasketCheckoutIntegrationEvent>();
eventMessage.TotalPrice = basket.TotalPrice;
// Save to outbox
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
Type = typeof(BasketCheckoutIntegrationEvent).AssemblyQualifiedName!,
Content = JsonSerializer.Serialize(eventMessage),
OccuredOn = DateTime.UtcNow
};
dbContext.OutboxMessages.Add(outboxMessage);
// Delete basket
dbContext.ShoppingCarts.Remove(basket);
await dbContext.SaveChangesAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
return new CheckoutBasketResult(true);
}
catch
{
await transaction.RollbackAsync(cancellationToken);
return new CheckoutBasketResult(false);
}
}
}
- DbContext Injection: We inject
BasketDbContext
directly into the handler to manage transactions. - Begin Transaction: We start a new database transaction using
dbContext.Database.BeginTransactionAsync
. - Retrieve Basket: Fetches the existing basket from the database.
- Create Event Message: Creates a
BasketCheckoutIntegrationEvent
from the DTO and sets the total price. - Save to Outbox: Creates a new
OutboxMessage
and adds it to the database context. - Delete Basket: Removes the basket from the database context.
- Commit Transaction: Saves changes and commits the transaction to ensure atomicity.
- Rollback Transaction: In case of any exception, we roll back the transaction to maintain consistency.
Step 4: Create a Background Service to Process Outbox Messages
Finally, we need to create a background service that will periodically check the Outbox table, process unprocessed messages, and publish them to RabbitMQ. Creating the OutboxProcessor Class:
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;
namespace Basket.Data.Processors;
public class OutboxProcessor
(IServiceProvider serviceProvider, IBus bus, ILogger<OutboxProcessor> logger)
: BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
using var scope = serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<BasketDbContext>();
var outboxMessages = await dbContext.OutboxMessages
.Where(m => m.ProcessedOn == null)
.ToListAsync(stoppingToken);
foreach (var message in outboxMessages)
{
var eventType = Type.GetType(message.Type);
if (eventType == null)
{
logger.LogWarning("Could not resolve type: {Type}", message.Type);
continue;
}
var eventMessage = JsonSerializer.Deserialize(message.Content, eventType);
if (eventMessage == null)
{
logger.LogWarning("Could not deserialize message: {Content}", message.Content);
continue;
}
await bus.Publish(eventMessage, stoppingToken);
message.ProcessedOn = DateTime.UtcNow;
logger.LogInformation("Successfully processed outbox message with ID: {Id}", message.Id);
}
await dbContext.SaveChangesAsync(stoppingToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing outbox messages");
}
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); // Adjust the delay as needed
}
}
}
- The service periodically checks the
OutboxMessages
table for messages that have not yet been processed. This is determined by checking if theProcessedOn
column isnull
. - It uses Entity Framework Core (
BasketDbContext
) to query the outbox table and retrieve all unprocessed messages. - For each unprocessed message, the service determines the event type using the
Type
stored in theOutboxMessage
entity. - It deserializes the message content into the appropriate event type and then publishes this event to RabbitMQ using MassTransit.
- After successfully publishing a message, the service sets the
ProcessedOn
column to the current UTC time. This marks the message as processed, ensuring it won't be sent again. - The service includes error handling to manage any exceptions that occur during message processing.
- The service waits for a specified delay (10 seconds in this example) before checking for new messages again. This allows for continuous, periodic processing of outbox messages.
Registering the OutboxProcessor
Don’t forget to register OutboxProcessor
as a hosted service in your application startup:
// In BasketModule.cs
services.AddHostedService<OutboxProcessor>();
Conclusion
In this tutorial, we implemented the Outbox Pattern for reliable messaging in our modular monolith architecture:
- Created the Outbox Entity: We defined the
OutboxMessage
entity to store messages that need to be sent. - Modified the DbContext: Updated the
BasketDbContext
to include theOutboxMessage
DbSet and created the necessary migrations. - Updated the CheckoutBasketHandler: Ensured that the basket checkout operation and message saving were part of the same atomic transaction.
- Created a Background Service: Implemented an
OutboxProcessor
to process outbox messages and publish them to RabbitMQ.
This is step-by-step development of reference Modular Monoltihs Architecture on .NET used ASP.NET Web API, Docker, PostgreSQL, Redis, RabbitMQ, Keycloak, Seq, MassTransit, Entity Framework Core, CQRS, MediatR, DDD, Vertical Slice Architecture and Outbox Pattern implementation with using latest features of .NET 8 and C# 12.