MediatR & CQRS: The Professional's Guide
🎯 What You'll Master:
Build enterprise-grade .NET applications with clear command/query separation, reusable pipeline behaviors, and caching and projection patterns that keep reads fast.
📋 Executive Summary
MediatR is an in-process mediator library for .NET, and CQRS (Command Query Responsibility Segregation) is the practice of modeling writes (commands) and reads (queries) as separate operations. Combined, they give every use case its own request type and handler, so the read and write paths can be optimized, tested, and evolved independently. For enterprise .NET professionals, these patterns help a system stay maintainable and testable as its traffic, codebase, and team grow.
💡 Why MediatR + CQRS is Critical for Enterprise .NET
Architectural Benefits
- • Decoupling: Controllers don't know about business logic
- • Single Responsibility: Each handler serves exactly one request type
- • Testability: Independent, mockable components
- • Cross-cutting Concerns: Pipeline behaviors for common functionality
Enterprise Advantages
- • Scalability: Separate read/write optimization
- • Team Productivity: Parallel development streams
- • Maintainability: Clear separation of concerns
- • Performance: Optimized query and command handling
🎯 CQRS Foundation & MediatR Integration
Command Query Separation
Commands change state; queries return data. Never mix the two in one request.
Commands
- • Modify application state
- • Return success/failure status
- • Can trigger business rules
- • Usually return Unit or Result<T>
- • Validated and authorized
Queries
- • Read data without side effects
- • Return specific DTOs/ViewModels
- • Optimized for performance
- • Can bypass domain layer
- • Cacheable and scalable
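To make the split concrete, here is a minimal sketch of one command and one query written as MediatR requests. The `ProductNames` in-memory store and both request names are hypothetical stand-ins used only for illustration; a real handler would go through a repository or read model.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using MediatR;

// Hypothetical in-memory store standing in for a real database.
public static class ProductNames
{
    public static readonly ConcurrentDictionary<Guid, string> Store = new();
}

// Command: changes state and reports only whether it succeeded.
public record RenameProductCommand(Guid ProductId, string NewName) : IRequest<bool>;

public class RenameProductHandler : IRequestHandler<RenameProductCommand, bool>
{
    public Task<bool> Handle(RenameProductCommand command, CancellationToken cancellationToken)
    {
        if (!ProductNames.Store.ContainsKey(command.ProductId))
            return Task.FromResult(false);

        ProductNames.Store[command.ProductId] = command.NewName;
        return Task.FromResult(true);
    }
}

// Query: reads state and has no side effects.
public record GetProductNameQuery(Guid ProductId) : IRequest<string?>;

public class GetProductNameHandler : IRequestHandler<GetProductNameQuery, string?>
{
    public Task<string?> Handle(GetProductNameQuery query, CancellationToken cancellationToken)
        => Task.FromResult(
            ProductNames.Store.TryGetValue(query.ProductId, out var name) ? name : null);
}
```

Because each handler is a plain class, it can be unit tested by calling `Handle` directly, without MediatR in the loop.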
🚀 MediatR: The Perfect CQRS Companion
In-Process Messaging
- • Request/Response pattern
- • Loose coupling via interfaces
- • Single entry point per operation
Pipeline Behaviors
- • Cross-cutting concerns
- • Validation, logging, caching
- • Composable middleware pattern
Notifications
- • Publish/Subscribe pattern
- • Domain event handling
- • Decoupled side effects
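The request/response flow can be sketched end to end in a few lines. This assumes MediatR 12.x (where `AddMediatR` takes a configuration lambda) and the Microsoft.Extensions.DependencyInjection package; `PingQuery`, `PingHandler`, and `MediatRQuickStart` are hypothetical names.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using Microsoft.Extensions.DependencyInjection;

public record PingQuery(string Message) : IRequest<string>;

public class PingHandler : IRequestHandler<PingQuery, string>
{
    public Task<string> Handle(PingQuery query, CancellationToken cancellationToken)
        => Task.FromResult($"pong: {query.Message}");
}

public static class MediatRQuickStart
{
    public static async Task<string> RunAsync()
    {
        var services = new ServiceCollection();

        // Scans the assembly for handlers and registers ISender/IPublisher/IMediator.
        services.AddMediatR(cfg =>
            cfg.RegisterServicesFromAssembly(typeof(PingQuery).Assembly));

        await using var provider = services.BuildServiceProvider();

        // The caller depends only on ISender and the request type,
        // never on the handler class itself.
        var sender = provider.GetRequiredService<ISender>();
        return await sender.Send(new PingQuery("hello"));
    }
}
```

In an ASP.NET Core application the same `AddMediatR` call would typically sit in `Program.cs`, and controllers would take `ISender` through constructor injection.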
⚙️ Professional Command Implementation
🎯 Advanced Command Patterns
// ⚙️ Professional Command with Result Pattern
using FluentValidation;
using MediatR;

public record CreateProductCommand(
    string Name,
    string Description,
    decimal Price,
    string CategoryId,
    List<string> Tags,
    ProductSpecification Specifications
) : IRequest<Result<ProductResponseDto>>
{
    // 🎯 Command-specific validation can be added here
    public class Validator : AbstractValidator<CreateProductCommand>
    {
        public Validator()
        {
            RuleFor(x => x.Name)
                .NotEmpty().WithMessage("Product name is required")
                .MaximumLength(100).WithMessage("Name must not exceed 100 characters")
                .Must(BeUniqueProductName).WithMessage("Product name must be unique");

            RuleFor(x => x.Price)
                .GreaterThan(0).WithMessage("Price must be greater than zero")
                .LessThanOrEqualTo(1000000).WithMessage("Price exceeds maximum allowed");

            RuleFor(x => x.CategoryId)
                .NotEmpty().WithMessage("Category is required")
                .Must(BeValidCategory).WithMessage("Invalid category specified");

            RuleFor(x => x.Tags)
                .Must(tags => tags.Count <= 10).WithMessage("Maximum 10 tags allowed")
                .ForEach(tag => tag.MaximumLength(50));
        }

        private bool BeUniqueProductName(string name)
        {
            // Inject repository or use custom validator
            return true; // Simplified for example
        }

        private bool BeValidCategory(string categoryId)
        {
            return Guid.TryParse(categoryId, out _);
        }
    }
}
// 🎯 Command Handler with Enterprise Patterns
using System.Diagnostics;
using MediatR;
using Microsoft.Extensions.Logging;

public class CreateProductCommandHandler
    : IRequestHandler<CreateProductCommand, Result<ProductResponseDto>>
{
    private readonly IProductRepository _productRepository;
    private readonly ICategoryRepository _categoryRepository;
    private readonly IUnitOfWork _unitOfWork;
    private readonly ILogger<CreateProductCommandHandler> _logger;
    private readonly IProductDomainService _productDomainService;
    private readonly IPublisher _publisher;

    public CreateProductCommandHandler(
        IProductRepository productRepository,
        ICategoryRepository categoryRepository,
        IUnitOfWork unitOfWork,
        ILogger<CreateProductCommandHandler> logger,
        IProductDomainService productDomainService,
        IPublisher publisher)
    {
        _productRepository = productRepository;
        _categoryRepository = categoryRepository;
        _unitOfWork = unitOfWork;
        _logger = logger;
        _productDomainService = productDomainService;
        _publisher = publisher;
    }

    public async Task<Result<ProductResponseDto>> Handle(
        CreateProductCommand command,
        CancellationToken cancellationToken)
    {
        using var activity = ProductTelemetry.StartActivity("CreateProduct");
        activity?.SetTag("product.name", command.Name);

        try
        {
            // 🔍 Business validation
            var category = await _categoryRepository.GetByIdAsync(
                new CategoryId(command.CategoryId), cancellationToken);

            if (category == null)
                return Result.Failure<ProductResponseDto>("Category not found");

            if (!category.AcceptsNewProducts)
                return Result.Failure<ProductResponseDto>("Category is closed for new products");

            // 🏭 Domain object creation with business rules
            var productResult = await _productDomainService.CreateProductAsync(
                command.Name,
                command.Description,
                Money.FromDecimal(command.Price, Currency.USD),
                category,
                command.Tags.Select(Tag.Create).ToList(),
                command.Specifications,
                cancellationToken);

            if (!productResult.IsSuccess)
                return Result.Failure<ProductResponseDto>(productResult.Error);

            var product = productResult.Value;

            // 💾 Persistence
            await _productRepository.AddAsync(product, cancellationToken);
            await _unitOfWork.SaveChangesAsync(cancellationToken);

            // 📢 Domain events
            // The arguments match the five-parameter ProductCreatedNotification
            // record defined in the notifications section (Guid ids, price, timestamp).
            await _publisher.Publish(
                new ProductCreatedNotification(
                    product.Id.Value,
                    product.Name,
                    product.CategoryId.Value,
                    product.Price.Amount,
                    product.CreatedAt),
                cancellationToken);

            _logger.LogInformation(
                "Product {ProductId} '{ProductName}' created successfully",
                product.Id, product.Name);

            // 📤 Response mapping
            var response = new ProductResponseDto
            {
                Id = product.Id.Value,
                Name = product.Name,
                Description = product.Description,
                Price = product.Price.Amount,
                Currency = product.Price.Currency.Code,
                CategoryId = product.CategoryId.Value,
                Tags = product.Tags.Select(t => t.Value).ToList(),
                CreatedAt = product.CreatedAt,
                Status = product.Status.ToString()
            };

            return Result.Success(response);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to create product '{ProductName}'", command.Name);
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            return Result.Failure<ProductResponseDto>(
                "An error occurred while creating the product");
        }
    }
}
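The handler above relies on a `Result` type that this guide never defines. One possible shape is sketched below; it is an assumption that fits the calls used in the handler (`Result.Success`, `Result.Failure<T>`, `IsSuccess`, `Error`, `Value`), not the definition from any particular library.

```csharp
using System;

// Non-generic base: carries the outcome and hosts the factory methods.
public class Result
{
    protected Result(bool isSuccess, string? error)
    {
        IsSuccess = isSuccess;
        Error = error;
    }

    public bool IsSuccess { get; }
    public string? Error { get; }

    public static Result<T> Success<T>(T value) => new(value, true, null);
    public static Result<T> Failure<T>(string error) => new(default, false, error);
}

// Generic result: exposes the value only when the operation succeeded.
public sealed class Result<T> : Result
{
    private readonly T? _value;

    internal Result(T? value, bool isSuccess, string? error)
        : base(isSuccess, error) => _value = value;

    public T Value => IsSuccess
        ? _value!
        : throw new InvalidOperationException("A failed result has no value.");
}
```

Making `Value` throw on failure forces callers to check `IsSuccess` first, which is the point of modeling errors explicitly instead of with exceptions.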
🏆 Enterprise Command Patterns
Validation Strategy
- • FluentValidation for command validation
- • Domain service validation for business rules
- • Cross-field validation in validators
- • Async validation for database checks
Error Handling
- • Result pattern for explicit error handling
- • Structured logging with correlation IDs
- • Activity tracing for observability
- • Exception translation to user-friendly messages
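For the "async validation for database checks" item, FluentValidation offers `MustAsync`. The sketch below shows a uniqueness rule under the assumption of a hypothetical `IProductNameLookup` abstraction; the in-memory implementation and the `RegisterProductName` request exist only to keep the example runnable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using FluentValidation;

// Hypothetical lookup; a real implementation would query the database.
public interface IProductNameLookup
{
    Task<bool> ExistsAsync(string name, CancellationToken cancellationToken);
}

public class InMemoryProductNameLookup : IProductNameLookup
{
    private readonly HashSet<string> _names;

    public InMemoryProductNameLookup(IEnumerable<string> names)
        => _names = new HashSet<string>(names, StringComparer.OrdinalIgnoreCase);

    public Task<bool> ExistsAsync(string name, CancellationToken cancellationToken)
        => Task.FromResult(_names.Contains(name));
}

// Hypothetical request used only for this sketch.
public record RegisterProductName(string Name);

public class RegisterProductNameValidator : AbstractValidator<RegisterProductName>
{
    public RegisterProductNameValidator(IProductNameLookup lookup)
    {
        RuleFor(x => x.Name)
            .NotEmpty().WithMessage("Product name is required")
            // Async rules must be run through ValidateAsync, not Validate.
            .MustAsync(async (name, ct) => !await lookup.ExistsAsync(name, ct))
            .WithMessage("Product name must be unique");
    }
}
```

A uniqueness check in a validator is still subject to races between validation and insert, so a unique index in the database remains the final safeguard.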
📊 High-Performance Query Implementation
🚀 Optimized Query Patterns
// 📊 High-Performance Query with Advanced Filtering
using FluentValidation;
using MediatR;

public record GetProductsQuery(
    int Page = 1,
    int PageSize = 20,
    string? SearchTerm = null,
    string? CategoryId = null,
    decimal? MinPrice = null,
    decimal? MaxPrice = null,
    ProductStatus? Status = null,
    string? SortBy = "CreatedAt",
    SortDirection SortDirection = SortDirection.Descending,
    bool IncludeInactive = false
) : IRequest<Result<PagedResponse<ProductListItemDto>>>
{
    // 🎯 Query validation
    public class Validator : AbstractValidator<GetProductsQuery>
    {
        public Validator()
        {
            RuleFor(x => x.Page)
                .GreaterThan(0).WithMessage("Page must be greater than 0");

            RuleFor(x => x.PageSize)
                .InclusiveBetween(1, 100).WithMessage("Page size must be between 1 and 100");

            RuleFor(x => x.SearchTerm)
                .MaximumLength(100).When(x => !string.IsNullOrEmpty(x.SearchTerm));

            RuleFor(x => x.SortBy)
                .Must(BeValidSortField).WithMessage("Invalid sort field");
        }

        private bool BeValidSortField(string? sortBy)
        {
            var validFields = new[] { "Name", "Price", "CreatedAt", "UpdatedAt" };
            return string.IsNullOrEmpty(sortBy) || validFields.Contains(sortBy);
        }
    }
}
// 🚀 Optimized Query Handler with Caching
using System.Diagnostics;
using System.Text;
using MediatR;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;

public class GetProductsQueryHandler
    : IRequestHandler<GetProductsQuery, Result<PagedResponse<ProductListItemDto>>>
{
    private readonly IProductReadRepository _productReadRepository;
    private readonly IMemoryCache _cache;
    private readonly ILogger<GetProductsQueryHandler> _logger;
    private const string CacheKeyPrefix = "products_query";

    public GetProductsQueryHandler(
        IProductReadRepository productReadRepository,
        IMemoryCache cache,
        ILogger<GetProductsQueryHandler> logger)
    {
        _productReadRepository = productReadRepository;
        _cache = cache;
        _logger = logger;
    }

    public async Task<Result<PagedResponse<ProductListItemDto>>> Handle(
        GetProductsQuery query,
        CancellationToken cancellationToken)
    {
        using var activity = ProductTelemetry.StartActivity("GetProducts");

        // 🔑 Cache key generation
        var cacheKey = GenerateCacheKey(query);

        // 💨 Try cache first for read-heavy scenarios
        if (_cache.TryGetValue(cacheKey, out PagedResponse<ProductListItemDto>? cachedResult))
        {
            _logger.LogDebug("Cache hit for products query: {CacheKey}", cacheKey);
            activity?.SetTag("cache.hit", true);
            return Result.Success(cachedResult!);
        }

        try
        {
            // 🏗️ Query specification pattern
            var specification = new ProductSearchSpecification(query);

            // 📊 Execute query with projection
            var (products, totalCount) = await _productReadRepository
                .GetPagedAsync<ProductListItemDto>(
                    specification, query.Page, query.PageSize, cancellationToken);

            // 📦 Build response
            var response = new PagedResponse<ProductListItemDto>
            {
                Data = products,
                Page = query.Page,
                PageSize = query.PageSize,
                TotalCount = totalCount,
                TotalPages = (int)Math.Ceiling((double)totalCount / query.PageSize)
            };

            // 💾 Cache the result: evicted after 2 idle minutes, and after 5 minutes at the latest
            var cacheOptions = new MemoryCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(5),
                SlidingExpiration = TimeSpan.FromMinutes(2),
                Priority = CacheItemPriority.Normal
            };
            _cache.Set(cacheKey, response, cacheOptions);

            _logger.LogDebug(
                "Products query executed: {TotalCount} total, {ReturnedCount} returned",
                totalCount, products.Count());

            activity?.SetTag("query.total_count", totalCount);
            activity?.SetTag("cache.hit", false);

            return Result.Success(response);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to execute products query");
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            return Result.Failure<PagedResponse<ProductListItemDto>>(
                "An error occurred while retrieving products");
        }
    }

    private static string GenerateCacheKey(GetProductsQuery query)
    {
        var keyBuilder = new StringBuilder(CacheKeyPrefix);
        keyBuilder.Append($"_p{query.Page}_s{query.PageSize}");

        // Use the search term itself: string.GetHashCode() can collide,
        // which would serve one search's cached page for another search.
        if (!string.IsNullOrEmpty(query.SearchTerm))
            keyBuilder.Append($"_q{query.SearchTerm}");
        if (!string.IsNullOrEmpty(query.CategoryId))
            keyBuilder.Append($"_c{query.CategoryId}");
        if (query.MinPrice.HasValue)
            keyBuilder.Append($"_minp{query.MinPrice.Value}");
        if (query.MaxPrice.HasValue)
            keyBuilder.Append($"_maxp{query.MaxPrice.Value}");
        // Distinct segment prefix so status never shares one with the search term.
        if (query.Status.HasValue)
            keyBuilder.Append($"_status{query.Status.Value}");

        keyBuilder.Append($"_sb{query.SortBy}_sd{query.SortDirection}");
        keyBuilder.Append($"_ia{query.IncludeInactive}");
        return keyBuilder.ToString();
    }
}
⚡ Query Optimization Strategies
Performance Patterns
- • Specification pattern for dynamic queries
- • Projection to DTOs to reduce data transfer
- • Memory caching for frequently accessed data
- • Database query optimization with proper indexing
Scalability Features
- • Read replicas for query distribution
- • Redis caching for distributed scenarios
- • ElasticSearch integration for complex searches
- • CDN caching for static query results
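The specification and projection items can be illustrated with a small filter object that adds only the predicates the caller asked for and then projects to a DTO. This is a simplified sketch over `IQueryable<T>`; `ProductRow`, `ProductListItem`, and `ProductFilter` are hypothetical names, not the `ProductSearchSpecification` used by the handler above.

```csharp
using System;
using System.Linq;

// Hypothetical storage row and list DTO for the sketch.
public record ProductRow(string Name, decimal Price, string CategoryId);
public record ProductListItem(string Name, decimal Price);

public class ProductFilter
{
    public string? SearchTerm { get; init; }
    public decimal? MinPrice { get; init; }
    public decimal? MaxPrice { get; init; }

    // Composes the query step by step; nothing executes until it is enumerated.
    public IQueryable<ProductListItem> Apply(IQueryable<ProductRow> source)
    {
        if (!string.IsNullOrWhiteSpace(SearchTerm))
        {
            var term = SearchTerm;
            source = source.Where(p => p.Name.Contains(term));
        }

        if (MinPrice is { } min)
            source = source.Where(p => p.Price >= min);

        if (MaxPrice is { } max)
            source = source.Where(p => p.Price <= max);

        // Projection: select only the columns the list view needs.
        return source
            .OrderBy(p => p.Name)
            .Select(p => new ProductListItem(p.Name, p.Price));
    }
}
```

Against an ORM such as EF Core the same composition would normally be translated into a single SQL statement, so unused filters cost nothing and unneeded columns are never loaded.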
🔧 Advanced Pipeline Behaviors
⚙️ Cross-Cutting Concerns Implementation
// 🔧 Validation Pipeline Behavior
using FluentValidation;
using MediatR;
using Microsoft.Extensions.Logging;

public class ValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;
    private readonly ILogger<ValidationBehavior<TRequest, TResponse>> _logger;

    public ValidationBehavior(
        IEnumerable<IValidator<TRequest>> validators,
        ILogger<ValidationBehavior<TRequest, TResponse>> logger)
    {
        _validators = validators;
        _logger = logger;
    }

    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        if (!_validators.Any())
            return await next();

        var context = new ValidationContext<TRequest>(request);

        // 🔍 Execute all validators concurrently
        var validationTasks = _validators
            .Select(v => v.ValidateAsync(context, cancellationToken));
        var validationResults = await Task.WhenAll(validationTasks);

        // 📋 Collect all validation failures, grouped by property
        var failures = validationResults
            .Where(r => !r.IsValid)
            .SelectMany(r => r.Errors)
            .Where(f => f != null)
            .GroupBy(
                x => x.PropertyName,
                x => x.ErrorMessage,
                (propertyName, errorMessages) => new
                {
                    Key = propertyName,
                    Values = errorMessages.Distinct().ToArray()
                })
            .ToDictionary(x => x.Key, x => x.Values);

        if (failures.Any())
        {
            _logger.LogWarning(
                "Validation failed for {RequestType}: {ValidationErrors}",
                typeof(TRequest).Name,
                string.Join(", ", failures.Select(f => $"{f.Key}: [{string.Join(", ", f.Value)}]")));

            // ValidationException here is an application-defined exception that accepts
            // the property-to-messages dictionary. FluentValidation's own
            // ValidationException takes IEnumerable<ValidationFailure> instead.
            throw new ValidationException(failures);
        }

        return await next();
    }
}
// 📊 Performance Monitoring Behavior
using System.Diagnostics;
using MediatR;
using Microsoft.Extensions.Logging;

public class PerformanceBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    // StartActivity is an instance method, so the behavior needs an ActivitySource
    // instance. The source name "MediatR.Pipeline" is an illustrative choice.
    private static readonly ActivitySource ActivitySource = new("MediatR.Pipeline");

    private readonly ILogger<PerformanceBehavior<TRequest, TResponse>> _logger;

    // IMetrics and MetricTags are treated here as an application-level metrics
    // abstraction with name-based overloads. If you use App.Metrics directly, its
    // calls take CounterOptions/HistogramOptions objects, so adapt accordingly.
    private readonly IMetrics _metrics;
    private const int SlowRequestThresholdMs = 500;

    public PerformanceBehavior(
        ILogger<PerformanceBehavior<TRequest, TResponse>> logger,
        IMetrics metrics)
    {
        _logger = logger;
        _metrics = metrics;
    }

    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        var requestName = typeof(TRequest).Name;
        var stopwatch = Stopwatch.StartNew();

        using var activity = ActivitySource.StartActivity($"MediatR.{requestName}");
        activity?.SetTag("request.type", requestName);

        try
        {
            var response = await next();
            stopwatch.Stop();
            var elapsedMs = stopwatch.ElapsedMilliseconds;

            // 📈 Record metrics
            _metrics.Measure.Counter.Increment(
                "mediatr_requests_total",
                new MetricTags("request_type", requestName, "status", "success"));
            _metrics.Measure.Histogram.Update(
                "mediatr_request_duration_ms",
                elapsedMs,
                new MetricTags("request_type", requestName));

            // 🐌 Log slow requests
            if (elapsedMs > SlowRequestThresholdMs)
            {
                _logger.LogWarning(
                    "Slow request detected: {RequestType} took {ElapsedMs}ms to complete. Request: {@Request}",
                    requestName, elapsedMs, request);
            }
            else
            {
                _logger.LogDebug(
                    "Request {RequestType} completed in {ElapsedMs}ms",
                    requestName, elapsedMs);
            }

            activity?.SetTag("request.duration_ms", elapsedMs);
            activity?.SetStatus(ActivityStatusCode.Ok);
            return response;
        }
        catch (Exception ex)
        {
            stopwatch.Stop();
            _metrics.Measure.Counter.Increment(
                "mediatr_requests_total",
                new MetricTags("request_type", requestName, "status", "error"));

            _logger.LogError(ex,
                "Request {RequestType} failed after {ElapsedMs}ms. Request: {@Request}",
                requestName, stopwatch.ElapsedMilliseconds, request);

            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }
}
// 🔄 Caching Behavior for Queries
using MediatR;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Logging;

public class CachingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>, ICacheableQuery
{
    private readonly IDistributedCache _cache;
    private readonly ILogger<CachingBehavior<TRequest, TResponse>> _logger;
    private readonly IJsonSerializer _serializer;

    public CachingBehavior(
        IDistributedCache cache,
        ILogger<CachingBehavior<TRequest, TResponse>> logger,
        IJsonSerializer serializer)
    {
        _cache = cache;
        _logger = logger;
        _serializer = serializer;
    }

    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        var cacheKey = request.CacheKey;

        // 🔍 Try to get from cache
        var cachedResponse = await _cache.GetStringAsync(cacheKey, cancellationToken);
        if (!string.IsNullOrEmpty(cachedResponse))
        {
            _logger.LogDebug("Cache hit for key: {CacheKey}", cacheKey);
            return _serializer.Deserialize<TResponse>(cachedResponse)!;
        }

        _logger.LogDebug("Cache miss for key: {CacheKey}", cacheKey);

        // 📤 Execute the actual handler
        var response = await next();

        // 💾 Cache the response
        var serializedResponse = _serializer.Serialize(response);
        var cacheOptions = new DistributedCacheEntryOptions
        {
            AbsoluteExpirationRelativeToNow = request.CacheDuration
        };
        await _cache.SetStringAsync(
            cacheKey, serializedResponse, cacheOptions, cancellationToken);

        _logger.LogDebug("Response cached for key: {CacheKey}", cacheKey);
        return response;
    }
}
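The caching behavior reads `CacheKey` and `CacheDuration` from the request, so `ICacheableQuery` needs at least those two members. A minimal sketch of the interface and one query that opts in follows; the member types and the `GetProductNameByIdQuery` request are assumptions inferred from how the behavior uses them.

```csharp
using System;
using MediatR;

// Assumed contract: only what CachingBehavior reads from the request.
public interface ICacheableQuery
{
    string CacheKey { get; }
    TimeSpan CacheDuration { get; }
}

// Hypothetical query that opts in to caching by implementing the interface.
public record GetProductNameByIdQuery(Guid ProductId) : IRequest<string?>, ICacheableQuery
{
    // Every input that affects the result must be part of the key.
    public string CacheKey => $"product-name:{ProductId:N}";

    public TimeSpan CacheDuration => TimeSpan.FromMinutes(5);
}
```

Requests that do not implement `ICacheableQuery` fail the generic constraint on the behavior and should simply bypass it.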
🚀 Pipeline Behavior Benefits
Separation of Concerns
- • Validation logic separated from business logic
- • Logging and metrics automatically applied
- • Caching transparent to handlers
- • Transaction management centralized
Composability
- • Behaviors can be combined and ordered
- • Conditional behavior application
- • Easy to add new cross-cutting concerns
- • Testable behavior isolation
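Ordering matters when behaviors are combined: the first behavior registered wraps all the others. The sketch below records the call order with two trivial behaviors, assuming MediatR 12.x (`AddOpenBehavior`) and Microsoft.Extensions.DependencyInjection; all type names are hypothetical.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using Microsoft.Extensions.DependencyInjection;

public static class PipelineTrace
{
    public static readonly List<string> Steps = new();
}

public record TracedRequest : IRequest<string>;

public class TracedRequestHandler : IRequestHandler<TracedRequest, string>
{
    public Task<string> Handle(TracedRequest request, CancellationToken cancellationToken)
    {
        PipelineTrace.Steps.Add("handler");
        return Task.FromResult("done");
    }
}

public class OuterBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    public async Task<TResponse> Handle(
        TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
    {
        PipelineTrace.Steps.Add("outer:before");
        var response = await next();
        PipelineTrace.Steps.Add("outer:after");
        return response;
    }
}

public class InnerBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    public async Task<TResponse> Handle(
        TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
    {
        PipelineTrace.Steps.Add("inner:before");
        var response = await next();
        PipelineTrace.Steps.Add("inner:after");
        return response;
    }
}

public static class PipelineOrderDemo
{
    public static async Task<IReadOnlyList<string>> RunAsync()
    {
        PipelineTrace.Steps.Clear();
        var services = new ServiceCollection();
        services.AddMediatR(cfg =>
        {
            cfg.RegisterServicesFromAssembly(typeof(TracedRequest).Assembly);
            cfg.AddOpenBehavior(typeof(OuterBehavior<,>)); // registered first: outermost
            cfg.AddOpenBehavior(typeof(InnerBehavior<,>)); // registered last: closest to the handler
        });

        await using var provider = services.BuildServiceProvider();
        await provider.GetRequiredService<ISender>().Send(new TracedRequest());
        return PipelineTrace.Steps.ToArray();
    }
}
```

Applied to this guide's behaviors, a common arrangement is performance monitoring first, then validation, then caching, so that timings include validation and invalid requests never reach the cache.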
📢 Domain Events & Notifications
🔔 Event-Driven Architecture Integration
// 📢 Domain Event Notification
using MediatR;
using Microsoft.Extensions.Logging;

public record ProductCreatedNotification(
    Guid ProductId,
    string ProductName,
    Guid CategoryId,
    decimal Price,
    DateTime CreatedAt
) : INotification;

// 🎯 Multiple handlers for single event
public class ProductCreatedEmailHandler : INotificationHandler<ProductCreatedNotification>
{
    private readonly IEmailService _emailService;
    private readonly IUserRepository _userRepository;
    private readonly ILogger<ProductCreatedEmailHandler> _logger;

    public ProductCreatedEmailHandler(
        IEmailService emailService,
        IUserRepository userRepository,
        ILogger<ProductCreatedEmailHandler> logger)
    {
        _emailService = emailService;
        _userRepository = userRepository;
        _logger = logger;
    }

    public async Task Handle(
        ProductCreatedNotification notification,
        CancellationToken cancellationToken)
    {
        try
        {
            // 📧 Send notification emails to interested users
            var subscribedUsers = await _userRepository
                .GetUsersSubscribedToCategoryAsync(
                    notification.CategoryId, cancellationToken);

            var emailTasks = subscribedUsers.Select(user =>
                _emailService.SendProductNotificationAsync(
                    user.Email,
                    notification.ProductName,
                    notification.Price,
                    cancellationToken));

            await Task.WhenAll(emailTasks);

            _logger.LogInformation(
                "Product creation emails sent to {UserCount} users for product {ProductId}",
                subscribedUsers.Count(), notification.ProductId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to send product creation emails for product {ProductId}",
                notification.ProductId);
            // Don't rethrow - this is a side effect, not critical to main operation
        }
    }
}
public class ProductCreatedAnalyticsHandler : INotificationHandler<ProductCreatedNotification>
{
    private readonly IAnalyticsService _analyticsService;
    private readonly ILogger<ProductCreatedAnalyticsHandler> _logger;

    public ProductCreatedAnalyticsHandler(
        IAnalyticsService analyticsService,
        ILogger<ProductCreatedAnalyticsHandler> logger)
    {
        _analyticsService = analyticsService;
        _logger = logger;
    }

    public async Task Handle(
        ProductCreatedNotification notification,
        CancellationToken cancellationToken)
    {
        try
        {
            // 📊 Track product creation analytics
            await _analyticsService.TrackEventAsync("product_created", new
            {
                product_id = notification.ProductId,
                category_id = notification.CategoryId,
                price = notification.Price,
                created_at = notification.CreatedAt
            }, cancellationToken);

            _logger.LogDebug(
                "Analytics tracked for product creation: {ProductId}",
                notification.ProductId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to track analytics for product {ProductId}",
                notification.ProductId);
        }
    }
}
public class ProductCreatedSearchIndexHandler : INotificationHandler<ProductCreatedNotification>
{
    private readonly ISearchIndexService _searchIndexService;
    private readonly IProductRepository _productRepository;
    private readonly ILogger<ProductCreatedSearchIndexHandler> _logger;

    public ProductCreatedSearchIndexHandler(
        ISearchIndexService searchIndexService,
        IProductRepository productRepository,
        ILogger<ProductCreatedSearchIndexHandler> logger)
    {
        _searchIndexService = searchIndexService;
        _productRepository = productRepository;
        _logger = logger;
    }

    public async Task Handle(
        ProductCreatedNotification notification,
        CancellationToken cancellationToken)
    {
        try
        {
            // 🔍 Add product to search index
            var product = await _productRepository.GetByIdAsync(
                new ProductId(notification.ProductId), cancellationToken);

            if (product != null)
            {
                var searchDocument = new ProductSearchDocument
                {
                    Id = product.Id.Value,
                    Name = product.Name,
                    Description = product.Description,
                    Price = product.Price.Amount,
                    CategoryId = product.CategoryId.Value,
                    Tags = product.Tags.Select(t => t.Value).ToArray(),
                    CreatedAt = product.CreatedAt
                };

                await _searchIndexService.IndexProductAsync(
                    searchDocument, cancellationToken);

                _logger.LogInformation(
                    "Product {ProductId} indexed for search",
                    notification.ProductId);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to index product {ProductId} for search",
                notification.ProductId);
        }
    }
}
⚡ Event-Driven Benefits
Loose Coupling
- • Core business logic unaware of side effects
- • Easy to add new event handlers
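- • Handlers can be developed and tested independently (they still run in the same process as the publisher)
- • Failure isolation between handlers, provided each handler catches its own exceptions as shown above
Scalability
- • Side effects stay out of the command handler (by default MediatR awaits notification handlers one after another, in-process)
- • Handlers can forward events to a message broker when the work must run on other servers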
- • Event sourcing integration possible
- • Message queue integration for reliability
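How handlers are invoked is configurable. The sketch below assumes MediatR 12.x, which ships a `TaskWhenAllPublisher` alongside the default sequential publisher, and uses one failing and one healthy handler to show what the caller observes. `StockChanged` and the handler names are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using MediatR.NotificationPublishers;
using Microsoft.Extensions.DependencyInjection;

public record StockChanged(Guid ProductId) : INotification;

public static class StockChangedLog
{
    private static int _handled;
    public static int Handled => Volatile.Read(ref _handled);
    public static void Mark() => Interlocked.Increment(ref _handled);
}

public class FailingStockHandler : INotificationHandler<StockChanged>
{
    public async Task Handle(StockChanged notification, CancellationToken cancellationToken)
    {
        await Task.Yield();
        throw new InvalidOperationException("Simulated side-effect failure");
    }
}

public class HealthyStockHandler : INotificationHandler<StockChanged>
{
    public async Task Handle(StockChanged notification, CancellationToken cancellationToken)
    {
        await Task.Yield();
        StockChangedLog.Mark();
    }
}

public static class PublisherDemo
{
    // Returns true when Publish completed without an exception reaching the caller.
    public static async Task<bool> PublishAsync()
    {
        var services = new ServiceCollection();
        services.AddMediatR(cfg =>
        {
            cfg.RegisterServicesFromAssembly(typeof(StockChanged).Assembly);
            // Start every handler, then await them all together.
            cfg.NotificationPublisher = new TaskWhenAllPublisher();
        });

        await using var provider = services.BuildServiceProvider();
        try
        {
            await provider.GetRequiredService<IPublisher>()
                .Publish(new StockChanged(Guid.NewGuid()));
            return true;
        }
        catch (InvalidOperationException)
        {
            return false;
        }
    }
}
```

Both handlers run, but the failure still surfaces at the `Publish` call. That is why the handlers in this guide catch and log their own exceptions, and why work that must survive a crash belongs on a message queue rather than in an in-process notification.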
📋 Enterprise Implementation Checklist
⚙️ Commands Setup
📊 Queries Setup
🔧 Pipeline Behaviors
📢 Events & Notifications
📊 Performance & Monitoring
📈 Metrics & Observability
Key Metrics to Track:
- • Request duration per handler type
- • Request success/failure rates
- • Validation failure frequencies
- • Cache hit/miss ratios
- • Event processing latencies
Monitoring Tools:
- • Application Insights for Azure
- • OpenTelemetry for cross-platform
- • Prometheus + Grafana for metrics
- • ELK Stack for log analysis
- • Custom dashboard creation
⚡ Performance Optimization
Command Optimization:
- • Bulk operations for multiple items
- • Background processing for heavy operations
- • Database transaction optimization
- • Minimal data loading strategies
Query Optimization:
- • Multi-level caching strategies
- • Database query optimization
- • Projection to minimize data transfer
- • Pagination for large result sets
🔧 Production Considerations
Reliability Patterns:
- • Circuit breaker for external dependencies
- • Retry policies with exponential backoff
- • Timeout configuration for all operations
- • Health checks for critical dependencies
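As an illustration of retrying with exponential backoff, here is a small dependency-free helper. It is a sketch only: the `Retry` class, its defaults, and the doubling schedule are assumptions, and production code would more commonly use a resilience library and add jitter.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Retry
{
    // Runs the operation up to maxAttempts times, doubling the delay after each failure.
    public static async Task<T> WithBackoffAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxAttempts = 3,
        TimeSpan? baseDelay = null,
        CancellationToken cancellationToken = default)
    {
        if (maxAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        var delay = baseDelay ?? TimeSpan.FromMilliseconds(200);

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation(cancellationToken);
            }
            // On the last attempt the filter is false, so the exception propagates.
            // Cancellation is never retried.
            catch (Exception ex) when (attempt < maxAttempts && ex is not OperationCanceledException)
            {
                await Task.Delay(delay, cancellationToken);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}
```

Retries are only safe for idempotent operations, so this fits queries and calls to external services better than commands that are not idempotent.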
Security Considerations:
- • Authorization in pipeline behaviors
- • Input sanitization and validation
- • Audit logging for sensitive operations
- • Rate limiting to prevent abuse
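Authorization in the pipeline can be sketched as one more behavior that inspects the request before the handler runs. `ICurrentUser`, `IRequireRole`, `FixedRoleUser`, and `DeleteProductCommand` are hypothetical types introduced for this example; a real application would map them to its own identity model.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MediatR;

// Hypothetical abstraction over the authenticated user.
public interface ICurrentUser
{
    bool IsInRole(string role);
}

// Hypothetical marker: requests that implement it declare the role they need.
public interface IRequireRole
{
    string RequiredRole { get; }
}

public record FixedRoleUser(string Role) : ICurrentUser
{
    public bool IsInRole(string role) => role == Role;
}

public record DeleteProductCommand(Guid ProductId) : IRequest<bool>, IRequireRole
{
    public string RequiredRole => "admin";
}

public class AuthorizationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly ICurrentUser _currentUser;

    public AuthorizationBehavior(ICurrentUser currentUser) => _currentUser = currentUser;

    public Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        // Reject before the handler runs; requests without the marker pass through.
        if (request is IRequireRole secured && !_currentUser.IsInRole(secured.RequiredRole))
            throw new UnauthorizedAccessException(
                $"{typeof(TRequest).Name} requires role '{secured.RequiredRole}'.");

        return next();
    }
}
```

Registering this behavior ahead of validation and caching keeps unauthorized callers from probing validation messages or reading cached responses.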
Master MediatR & CQRS, Build Scalable Excellence
MediatR and CQRS give a .NET application a consistent structure: one request type and one handler per use case, with cross-cutting concerns handled in pipeline behaviors. By mastering these techniques, you'll build systems that can meet enterprise-scale demands while remaining maintainable and testable.
Advanced Topics
- • Event sourcing with MediatR
- • Distributed CQRS patterns
- • Performance optimization techniques
- • Integration testing strategies
Essential Libraries
- • MediatR (from version 12 it includes the DI registration previously shipped as MediatR.Extensions.Microsoft.DependencyInjection)
- • FluentValidation.DependencyInjectionExtensions
- • Microsoft.Extensions.Caching.Memory
- • System.Diagnostics.DiagnosticSource (provides Activity and ActivitySource)
Best Practices
- • Single responsibility per handler
- • Immutable request/response objects
- • Pipeline behaviors for cross-cutting
- • Result pattern for error handling