MongoDB Mastery: Advanced Features and Integration - Part 3
Explore MongoDB's advanced features including aggregation pipelines, transactions, change streams, and integration patterns with real-world applications using C# and .NET.
Introduction
Welcome to the final part of our MongoDB Mastery series! In Part 1, we covered setup and basics with C#, and in Part 2, we explored CRUD operations with a comprehensive e-commerce example. Now we'll dive into MongoDB's advanced features and integration patterns.
This part covers:
- Aggregation pipelines for complex data processing
- ACID transactions for multi-document operations
- Change streams for real-time data monitoring
- Advanced document design patterns and schema optimization
- Performance optimization and monitoring
- Integration patterns with ASP.NET Core applications
📚 Series Overview:
- Part 1: Setup and Getting Started
- Part 2: CRUD Operations with Real-World Examples
- Part 3: Advanced Features and Integration (This article)
Section 1: Aggregation Framework
Understanding Aggregation Pipelines
MongoDB's aggregation framework processes data through a pipeline of stages, similar to Unix pipes: each stage transforms the documents it receives and passes its output to the next stage.
// Basic aggregation structure
var pipeline = new[]
{
new BsonDocument("$match", new BsonDocument("status", "active")),
new BsonDocument("$group", new BsonDocument
{
{ "_id", "$category" },
{ "total", new BsonDocument("$sum", "$amount") }
}),
new BsonDocument("$sort", new BsonDocument("total", -1)),
new BsonDocument("$limit", 5)
};
var results = await collection.Aggregate<BsonDocument>(pipeline).ToListAsync();
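Because each stage consumes the previous stage's output, the pipeline above behaves much like a chain of LINQ operators. The sketch below is a rough in-memory analogue written with LINQ to Objects, purely to illustrate the data flow; the sample records and the `(Status, Category, Amount)` tuple shape are invented, and nothing here touches the MongoDB driver (the server runs the real pipeline).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sample documents: (status, category, amount)
var orders = new List<(string Status, string Category, decimal Amount)>
{
    ("active", "books", 20m),
    ("active", "games", 50m),
    ("inactive", "books", 999m),
    ("active", "books", 15m),
    ("active", "music", 10m),
};

var topCategories = orders
    .Where(o => o.Status == "active")                        // $match: { status: "active" }
    .GroupBy(o => o.Category)                                // $group: { _id: "$category", ... }
    .Select(g => (Id: g.Key, Total: g.Sum(o => o.Amount)))  // total: { $sum: "$amount" }
    .OrderByDescending(r => r.Total)                         // $sort: { total: -1 }
    .Take(5)                                                 // $limit: 5
    .ToList();

foreach (var row in topCategories)
    Console.WriteLine($"{row.Id}: {row.Total}");
```

The inactive order is filtered out before grouping, which is also why `$match` belongs as early as possible in a real pipeline: later stages then process fewer documents.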
Common Aggregation Stages
$match - Filter Documents
// Filter orders from last 30 days
var thirtyDaysAgo = DateTime.UtcNow.AddDays(-30);
var matchStage = new BsonDocument("$match", new BsonDocument
{
{ "createdAt", new BsonDocument("$gte", thirtyDaysAgo) },
{ "status", new BsonDocument("$in", new BsonArray { "completed", "shipped" }) }
});
$group - Group Documents
// Group sales by product and month
var groupStage = new BsonDocument("$group", new BsonDocument
{
{ "_id", new BsonDocument
{
{ "product", "$productId" },
{ "month", new BsonDocument("$month", "$createdAt") },
{ "year", new BsonDocument("$year", "$createdAt") }
}
},
{ "totalSales", new BsonDocument("$sum", "$amount") },
{ "orderCount", new BsonDocument("$sum", 1) },
{ "averageOrder", new BsonDocument("$avg", "$amount") }
});
$project - Reshape Documents
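// Reshape user documents
// Milliseconds in an average year; a double literal avoids the Int32 overflow of 365 * 24 * 60 * 60 * 1000
const double MillisecondsPerYear = 365.25 * 24 * 60 * 60 * 1000;
// Compute age in its own stage: other expressions in the same $project
// cannot see a field that the stage itself computes
var addAgeStage = new BsonDocument("$addFields", new BsonDocument
{
{ "age", new BsonDocument("$floor", new BsonDocument("$divide", new BsonArray
{
new BsonDocument("$subtract", new BsonArray { DateTime.UtcNow, "$birthDate" }), // date - date = milliseconds
MillisecondsPerYear
}))
}
});
var projectStage = new BsonDocument("$project", new BsonDocument
{
{ "_id", 0 },
{ "fullName", new BsonDocument("$concat", new BsonArray { "$firstName", " ", "$lastName" }) },
{ "age", 1 },
{ "isAdult", new BsonDocument("$gte", new BsonArray { "$age", 18 }) }
});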
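The divisor is easy to get wrong in C#: written as the integer expression `365 * 24 * 60 * 60 * 1000`, it evaluates to 31,536,000,000, which exceeds `int.MaxValue`, so the compiler rejects it as a constant overflow. The sketch below mirrors the stage's arithmetic in plain C# so it can be sanity-checked without a server. `AgeInYears` is a hypothetical helper, and dividing by a 365.25-day year is an approximation that can be off by a day or so around a birthday.

```csharp
using System;

// Milliseconds in an average (365.25-day) year, as a double to avoid Int32 overflow
const double MillisecondsPerYear = 365.25 * 24 * 60 * 60 * 1000;

// Mirrors { $floor: { $divide: [ { $subtract: [ now, "$birthDate" ] }, MillisecondsPerYear ] } }
int AgeInYears(DateTime birthDate, DateTime now)
{
    double elapsedMs = (now - birthDate).TotalMilliseconds; // $subtract on two dates yields milliseconds
    return (int)Math.Floor(elapsedMs / MillisecondsPerYear);
}

var now = new DateTime(2024, 6, 1, 0, 0, 0, DateTimeKind.Utc);
Console.WriteLine(AgeInYears(new DateTime(2000, 1, 15, 0, 0, 0, DateTimeKind.Utc), now)); // 24
Console.WriteLine(AgeInYears(new DateTime(2010, 1, 15, 0, 0, 0, DateTimeKind.Utc), now)); // 14
```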
$lookup - Join Collections
// Join orders with products
var lookupStage = new BsonDocument("$lookup", new BsonDocument
{
{ "from", "products" },
{ "localField", "productId" },
{ "foreignField", "_id" },
{ "as", "productDetails" }
});
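`$lookup` acts like a left outer join whose result is always an array: every input order is kept, and `productDetails` holds zero, one, or more matching products. A LINQ `GroupJoin` produces the same shape. The sketch below is only an in-memory analogy with made-up ids and names:

```csharp
using System;
using System.Linq;

// Hypothetical sample data
var orders = new[] { (Id: "o1", ProductId: "p1"), (Id: "o2", ProductId: "p404") };
var products = new[] { (Id: "p1", Name: "Keyboard"), (Id: "p2", Name: "Mouse") };

// localField: productId, foreignField: _id, as: productDetails
var joined = orders
    .GroupJoin(products,
        o => o.ProductId,
        p => p.Id,
        (o, matches) => (OrderId: o.Id, ProductDetails: matches.ToArray()))
    .ToList();

foreach (var row in joined)
    Console.WriteLine($"{row.OrderId}: {row.ProductDetails.Length} match(es)");
```

Order `o2` still appears, with an empty `ProductDetails` array. That is why the sales example below follows `$lookup` with `$unwind`: unwinding an empty array drops the document, which turns the left join into an inner join.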
$unwind - Deconstruct Arrays
// Flatten order items array
var unwindStage = new BsonDocument("$unwind", "$items");
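`$unwind` emits one document per array element and repeats the parent's other fields in each. By default, documents whose array is missing, null, or empty produce no output; the long form `{ path: "$items", preserveNullAndEmptyArrays: true }` keeps them. `SelectMany` behaves like the default form, as this in-memory sketch with invented orders shows:

```csharp
using System;
using System.Linq;

// Hypothetical orders with an items array
var orders = new[]
{
    (Id: "o1", Items: new[] { "pen", "ink" }),
    (Id: "o2", Items: Array.Empty<string>()), // empty array: dropped by a plain $unwind
    (Id: "o3", Items: new[] { "paper" }),
};

// Each output row pairs the parent's Id with one array element, like { $unwind: "$items" }
var unwound = orders
    .SelectMany(o => o.Items, (o, item) => (o.Id, Item: item))
    .ToList();

foreach (var row in unwound)
    Console.WriteLine($"{row.Id} -> {row.Item}");
```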
Real-World Aggregation Example - Sales Analytics
// Services/AnalyticsService.cs
using MongoDB.Driver;
using MongoDB.Bson;
using ECommerce.Models;
namespace ECommerce.Services
{
public class AnalyticsService
{
private readonly IMongoCollection<Order> _orders;
private readonly IMongoCollection<Product> _products;
public AnalyticsService(IMongoDatabase database)
{
_orders = database.GetCollection<Order>("orders");
_products = database.GetCollection<Product>("products");
}
public async Task<List<CategorySalesSummary>> GetSalesAnalyticsAsync(DateTime startDate, DateTime endDate)
{
var pipeline = new[]
{
// Filter by date range
new BsonDocument("$match", new BsonDocument
{
{ "createdAt", new BsonDocument
{
{ "$gte", startDate },
{ "$lte", endDate }
}
},
{ "status", "completed" }
}),
// Unwind items array
new BsonDocument("$unwind", "$items"),
// Lookup product details
new BsonDocument("$lookup", new BsonDocument
{
{ "from", "products" },
{ "localField", "items.productId" },
{ "foreignField", "_id" },
{ "as", "product" }
}),
// Unwind product array
new BsonDocument("$unwind", "$product"),
// Group by product and calculate metrics
new BsonDocument("$group", new BsonDocument
{
{ "_id", "$items.productId" },
{ "productName", new BsonDocument("$first", "$product.name") },
{ "category", new BsonDocument("$first", "$product.category") },
{ "totalQuantity", new BsonDocument("$sum", "$items.quantity") },
{ "totalRevenue", new BsonDocument("$sum", new BsonDocument("$multiply", new BsonArray { "$items.quantity", "$items.price" })) },
{ "orderCount", new BsonDocument("$addToSet", "$_id") }
}),
// Calculate additional metrics
new BsonDocument("$addFields", new BsonDocument
{
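{ "orderCount", new BsonDocument("$size", "$orderCount") },
// Use $size here too: within one stage, "$orderCount" still refers to the input array, not the count computed above
{ "averageOrderValue", new BsonDocument("$divide", new BsonArray { "$totalRevenue", new BsonDocument("$size", "$orderCount") }) }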
}),
// Group by category
new BsonDocument("$group", new BsonDocument
{
{ "_id", "$category" },
{ "products", new BsonDocument("$push", new BsonDocument
{
{ "productId", "$_id" },
{ "name", "$productName" },
{ "quantity", "$totalQuantity" },
{ "revenue", "$totalRevenue" },
{ "orders", "$orderCount" },
{ "avgOrderValue", "$averageOrderValue" }
})
},
{ "categoryTotalRevenue", new BsonDocument("$sum", "$totalRevenue") },
{ "categoryTotalOrders", new BsonDocument("$sum", "$orderCount") },
{ "productCount", new BsonDocument("$sum", 1) }
}),
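// Sort by revenue
new BsonDocument("$sort", new BsonDocument("categoryTotalRevenue", -1)),
// Rename _id to category so each result maps onto CategorySalesSummary
new BsonDocument("$addFields", new BsonDocument("category", "$_id")),
new BsonDocument("$project", new BsonDocument("_id", 0))
};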
return await _orders.Aggregate<CategorySalesSummary>(pipeline).ToListAsync();
}
public async Task<List<CustomerLifetimeValue>> GetCustomerLifetimeValueAsync()
{
var pipeline = new[]
{
new BsonDocument("$match", new BsonDocument("status", "completed")),
new BsonDocument("$group", new BsonDocument
{
{ "_id", "$customerId" },
{ "totalOrders", new BsonDocument("$sum", 1) },
{ "totalSpent", new BsonDocument("$sum", "$totalAmount") },
{ "firstOrder", new BsonDocument("$min", "$createdAt") },
{ "lastOrder", new BsonDocument("$max", "$createdAt") },
{ "averageOrderValue", new BsonDocument("$avg", "$totalAmount") }
}),
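new BsonDocument("$addFields", new BsonDocument
{
// Expose the grouping key under the name CustomerLifetimeValue expects
{ "customerId", "$_id" },
{ "customerLifetime", new BsonDocument("$divide", new BsonArray
{
new BsonDocument("$subtract", new BsonArray { "$lastOrder", "$firstOrder" }),
1000 * 60 * 60 * 24 // Milliseconds per day: converts the span to days
})
}
}),
new BsonDocument("$match", new BsonDocument("totalOrders", new BsonDocument("$gte", 2))),
new BsonDocument("$sort", new BsonDocument("totalSpent", -1)),
// Drop _id: CustomerLifetimeValue has no Id member to map it to
new BsonDocument("$project", new BsonDocument("_id", 0))
};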
return await _orders.Aggregate<CustomerLifetimeValue>(pipeline).ToListAsync();
}
public async Task<List<ProductRecommendation>> GetProductRecommendationsAsync(string productId)
{
var pipeline = new[]
{
// Find orders containing the product
new BsonDocument("$match", new BsonDocument
{
{ "items.productId", productId },
{ "status", "completed" }
}),
// Unwind items
new BsonDocument("$unwind", "$items"),
// Exclude the original product
new BsonDocument("$match", new BsonDocument("items.productId", new BsonDocument("$ne", productId))),
// Group by product and count frequency
new BsonDocument("$group", new BsonDocument
{
{ "_id", "$items.productId" },
{ "frequency", new BsonDocument("$sum", 1) },
{ "totalQuantity", new BsonDocument("$sum", "$items.quantity") }
}),
// Lookup product details
new BsonDocument("$lookup", new BsonDocument
{
{ "from", "products" },
{ "localField", "_id" },
{ "foreignField", "_id" },
{ "as", "product" }
}),
new BsonDocument("$unwind", "$product"),
// Sort by frequency
new BsonDocument("$sort", new BsonDocument("frequency", -1)),
// Limit results
new BsonDocument("$limit", 5),
new BsonDocument("$project", new BsonDocument
{
{ "_id", 0 },
{ "productId", "$_id" },
{ "name", "$product.name" },
{ "frequency", 1 },
{ "totalQuantity", 1 }
})
};
return await _orders.Aggregate<ProductRecommendation>(pipeline).ToListAsync();
}
}
// Supporting models
public class CategorySalesSummary
{
public string Category { get; set; }
public List<ProductSalesSummary> Products { get; set; }
public decimal CategoryTotalRevenue { get; set; }
public int CategoryTotalOrders { get; set; }
public int ProductCount { get; set; }
}
public class ProductSalesSummary
{
public string ProductId { get; set; }
public string Name { get; set; }
public int Quantity { get; set; }
public decimal Revenue { get; set; }
public int Orders { get; set; }
public decimal AvgOrderValue { get; set; }
}
public class CustomerLifetimeValue
{
public string CustomerId { get; set; }
public int TotalOrders { get; set; }
public decimal TotalSpent { get; set; }
public DateTime FirstOrder { get; set; }
public DateTime LastOrder { get; set; }
public decimal AverageOrderValue { get; set; }
public double CustomerLifetime { get; set; }
}
public class ProductRecommendation
{
public string ProductId { get; set; }
public string Name { get; set; }
public int Frequency { get; set; }
public int TotalQuantity { get; set; }
}
}
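The product-level `orderCount` collects order `_id`s with `$addToSet` and then takes `$size`, rather than using `{ $sum: 1 }`, because after `$unwind` a single order can contribute several rows for the same product (for example, two line items with the same `productId`). This LINQ sketch over invented post-unwind rows shows the difference between counting rows and counting distinct orders:

```csharp
using System;
using System.Linq;

// Rows as they look after $unwind: one per (order, line item). Sample data is made up.
var rows = new[]
{
    (OrderId: "o1", ProductId: "p1", Quantity: 1),
    (OrderId: "o1", ProductId: "p1", Quantity: 2), // same product twice in one order
    (OrderId: "o2", ProductId: "p1", Quantity: 1),
};

var summary = rows
    .GroupBy(r => r.ProductId)
    .Select(g => (
        ProductId: g.Key,
        RowCount: g.Count(),                                     // what { $sum: 1 } would give
        OrderCount: g.Select(r => r.OrderId).Distinct().Count(), // $addToSet + $size
        TotalQuantity: g.Sum(r => r.Quantity)))                 // { $sum: "$items.quantity" }
    .Single();

Console.WriteLine($"{summary.ProductId}: rows={summary.RowCount}, orders={summary.OrderCount}, qty={summary.TotalQuantity}");
```

Note that `categoryTotalOrders` then sums these per-product counts, so an order containing two different products from the same category is counted once per product, not once per order.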
Section 2: ACID Transactions
Multi-Document Transactions
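MongoDB supports multi-document ACID transactions that span documents and collections: on replica sets since version 4.0 and on sharded clusters since 4.2. They are not available on a standalone server, so even a local development instance must run as a (single-node) replica set. In the .NET driver, a transaction runs inside a client session, as in the transfer example below.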
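// Transfer money between accounts
// (_client, _accounts and _transactions are assumed fields of the enclosing service)
public async Task TransferMoneyAsync(string fromAccountId, string toAccountId, decimal amount)
{
if (amount <= 0)
{
throw new ArgumentOutOfRangeException(nameof(amount), "Amount must be positive");
}
using var session = await _client.StartSessionAsync();
session.StartTransaction();
try
{
// Check that the sender exists and can cover the amount
var sender = await _accounts.Find(session, a => a.Id == fromAccountId).FirstOrDefaultAsync();
if (sender == null)
{
throw new InvalidOperationException($"Account {fromAccountId} not found");
}
if (sender.Balance < amount)
{
throw new InvalidOperationException("Insufficient funds");
}
// Debit sender
var debitUpdate = Builders<Account>.Update.Inc(a => a.Balance, -amount);
await _accounts.UpdateOneAsync(session, a => a.Id == fromAccountId, debitUpdate);
// Credit receiver; abort if the account doesn't exist, otherwise the debited amount would vanish
var creditUpdate = Builders<Account>.Update.Inc(a => a.Balance, amount);
var creditResult = await _accounts.UpdateOneAsync(session, a => a.Id == toAccountId, creditUpdate);
if (creditResult.MatchedCount == 0)
{
throw new InvalidOperationException($"Account {toAccountId} not found");
}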
// Log transaction
var transaction = new Transaction
{
FromAccountId = fromAccountId,
ToAccountId = toAccountId,
Amount = amount,
Timestamp = DateTime.UtcNow
};
await _transactions.InsertOneAsync(session, transaction);
await session.CommitTransactionAsync();
Console.WriteLine("Transaction completed successfully");
}
catch (Exception ex)
{
await session.AbortTransactionAsync();
Console.WriteLine($"Transaction failed: {ex.Message}");
throw;
}
}
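A transaction can fail for transient reasons, such as a write conflict with a concurrent transaction, and the usual remedy is to run the whole transaction body again. The .NET driver's `IClientSessionHandle.WithTransactionAsync` does this for errors the server labels `TransientTransactionError`, so prefer it in production code. The sketch below only illustrates the shape of such a retry loop, without a database; `RunWithRetryAsync`, the `isTransient` predicate, the attempt limit, and the use of `TimeoutException` as a stand-in error are all assumptions.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: reruns the whole unit of work while isTransient(ex) says the failure is retryable.
// In driver code, an exception carrying the "TransientTransactionError" label plays that role.
async Task<T> RunWithRetryAsync<T>(Func<Task<T>> unitOfWork, Func<Exception, bool> isTransient, int maxAttempts = 3)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return await unitOfWork();
        }
        catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
        {
            // Brief, growing pause before retrying the entire transaction body
            await Task.Delay(TimeSpan.FromMilliseconds(10 * attempt));
        }
    }
}

// Simulated transfer that hits a "write conflict" twice, then succeeds
int calls = 0;
var result = await RunWithRetryAsync(
    () =>
    {
        calls++;
        if (calls < 3) throw new TimeoutException("simulated write conflict");
        return Task.FromResult("committed");
    },
    ex => ex is TimeoutException);

Console.WriteLine($"{result} after {calls} attempts");
```

Non-transient failures such as "Insufficient funds" are not retried: the exception filter lets them propagate on the first attempt.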
E-Commerce Order Processing with Transactions
// Services/OrderService.cs
using MongoDB.Bson;
using MongoDB.Driver;
using ECommerce.Models;
namespace ECommerce.Services
{
public class OrderService
{
private readonly IMongoClient _client;
private readonly IMongoCollection<Order> _orders;
private readonly IMongoCollection<Product> _products;
private readonly IMongoCollection<Payment> _payments;
public OrderService(IMongoDatabase database, IMongoClient client)
{
_client = client;
_orders = database.GetCollection<Order>("orders");
_products = database.GetCollection<Product>("products");
_payments = database.GetCollection<Payment>("payments");
}
public async Task<Order> CreateOrderAsync(Order order)
{
using var session = await _client.StartSessionAsync();
session.StartTransaction();
try
{
// Calculate total and validate stock
decimal totalAmount = 0;
var orderItems = new List<OrderItem>();
foreach (var item in order.Items)
{
var product = await _products.Find(session, p => p.Id == item.ProductId && p.IsActive).FirstOrDefaultAsync();
if (product == null)
{
throw new InvalidOperationException($"Product {item.ProductId} not found");
}
var availableStock = product.Inventory.Quantity - product.Inventory.Reserved;
if (availableStock < item.Quantity)
{
throw new InvalidOperationException($"Insufficient stock for {product.Name}");
}
// Reserve stock
var reserveUpdate = Builders<Product>.Update.Inc(p => p.Inventory.Reserved, item.Quantity);
await _products.UpdateOneAsync(session, p => p.Id == item.ProductId, reserveUpdate);
var itemTotal = product.Price * item.Quantity;
totalAmount += itemTotal;
orderItems.Add(new OrderItem
{
ProductId = item.ProductId,
Name = product.Name,
Price = product.Price,
Quantity = item.Quantity,
Total = itemTotal
});
}
// Create order
var newOrder = new Order
{
Id = ObjectId.GenerateNewId().ToString(),
CustomerId = order.CustomerId,
Items = orderItems,
TotalAmount = totalAmount,
Status = OrderStatus.Confirmed,
ShippingAddress = order.ShippingAddress,
PaymentMethod = order.PaymentMethod,
CreatedAt = DateTime.UtcNow
};
await _orders.InsertOneAsync(session, newOrder);
await session.CommitTransactionAsync();
return newOrder;
}
catch (Exception ex)
{
await session.AbortTransactionAsync();
throw new InvalidOperationException($"Order creation failed: {ex.Message}", ex);
}
}
public async Task ProcessPaymentAsync(string orderId, PaymentInfo paymentInfo)
{
using var session = await _client.StartSessionAsync();
session.StartTransaction();
try
{
// Get order
var order = await _orders.Find(session, o => o.Id == orderId).FirstOrDefaultAsync();
if (order == null || order.Status != OrderStatus.Confirmed)
{
throw new InvalidOperationException("Order cannot be paid");
}
// Process payment (simplified)
var payment = new Payment
{
Id = ObjectId.GenerateNewId().ToString(),
OrderId = orderId,
Amount = order.TotalAmount,
Method = paymentInfo.Method,
Status = PaymentStatus.Completed,
TransactionId = paymentInfo.TransactionId,
ProcessedAt = DateTime.UtcNow
};
await _payments.InsertOneAsync(session, payment);
// Update order status
var orderUpdate = Builders<Order>.Update.Combine(
Builders<Order>.Update.Set(o => o.Status, OrderStatus.Paid),
Builders<Order>.Update.Set(o => o.PaidAt, DateTime.UtcNow),
Builders<Order>.Update.Set(o => o.PaymentId, payment.Id)
);
await _orders.UpdateOneAsync(session, o => o.Id == orderId, orderUpdate);
await session.CommitTransactionAsync();
Console.WriteLine("Payment processed successfully");
}
catch (Exception ex)
{
await session.AbortTransactionAsync();
throw new InvalidOperationException($"Payment processing failed: {ex.Message}", ex);
}
}
public async Task ShipOrderAsync(string orderId, ShippingInfo shippingInfo)
{
using var session = await _client.StartSessionAsync();
session.StartTransaction();
try
{
// Get order
var order = await _orders.Find(session, o => o.Id == orderId).FirstOrDefaultAsync();
if (order == null || order.Status != OrderStatus.Paid)
{
throw new InvalidOperationException("Order cannot be shipped");
}
// Reduce inventory
foreach (var item in order.Items)
{
var inventoryUpdate = Builders<Product>.Update.Combine(
Builders<Product>.Update.Inc(p => p.Inventory.Quantity, -item.Quantity),
Builders<Product>.Update.Inc(p => p.Inventory.Reserved, -item.Quantity)
);
await _products.UpdateOneAsync(session, p => p.Id == item.ProductId, inventoryUpdate);
}
// Update order status
var orderUpdate = Builders<Order>.Update.Combine(
Builders<Order>.Update.Set(o => o.Status, OrderStatus.Shipped),
Builders<Order>.Update.Set(o => o.ShippedAt, DateTime.UtcNow),
Builders<Order>.Update.Set(o => o.TrackingNumber, shippingInfo.TrackingNumber),
Builders<Order>.Update.Set(o => o.Carrier, shippingInfo.Carrier)
);
await _orders.UpdateOneAsync(session, o => o.Id == orderId, orderUpdate);
await session.CommitTransactionAsync();
Console.WriteLine("Order shipped successfully");
}
catch (Exception ex)
{
await session.AbortTransactionAsync();
throw new InvalidOperationException($"Shipping failed: {ex.Message}", ex);
}
}
}
// Supporting models
public class Order
{
public string Id { get; set; }
public string CustomerId { get; set; }
public List<OrderItem> Items { get; set; }
public decimal TotalAmount { get; set; }
public OrderStatus Status { get; set; }
public Address ShippingAddress { get; set; }
public PaymentMethod PaymentMethod { get; set; }
public DateTime CreatedAt { get; set; }
public DateTime? PaidAt { get; set; }
public string PaymentId { get; set; }
public DateTime? ShippedAt { get; set; }
public string TrackingNumber { get; set; }
public string Carrier { get; set; }
}
public class OrderItem
{
public string ProductId { get; set; }
public string Name { get; set; }
public decimal Price { get; set; }
public int Quantity { get; set; }
public decimal Total { get; set; }
}
public enum OrderStatus
{
Confirmed,
Paid,
Shipped,
Delivered,
Cancelled
}
public class Payment
{
public string Id { get; set; }
public string OrderId { get; set; }
public decimal Amount { get; set; }
public PaymentMethod Method { get; set; }
public PaymentStatus Status { get; set; }
public string TransactionId { get; set; }
public DateTime ProcessedAt { get; set; }
}
public enum PaymentStatus
{
Pending,
Completed,
Failed,
Refunded
}
public class PaymentInfo
{
public PaymentMethod Method { get; set; }
public string TransactionId { get; set; }
}
public class ShippingInfo
{
public string TrackingNumber { get; set; }
public string Carrier { get; set; }
}
}
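Across these methods, a product's inventory counters follow a fixed sequence: `CreateOrderAsync` raises `Reserved`, and `ShipOrderAsync` lowers both `Quantity` and `Reserved`. Available stock (`Quantity - Reserved`) therefore drops when the order is created and stays unchanged when it ships. This in-memory trace with made-up numbers (no database; `TryReserve` and `Ship` are hypothetical helper names) makes the arithmetic explicit:

```csharp
using System;

// Hypothetical starting inventory for one product
int quantity = 10, reserved = 0;
int Available() => quantity - reserved; // the same formula CreateOrderAsync checks

// CreateOrderAsync: validate, then reserve (Inc Inventory.Reserved)
bool TryReserve(int qty)
{
    if (Available() < qty) return false; // "Insufficient stock"
    reserved += qty;
    return true;
}

// ShipOrderAsync: stock physically leaves (Inc Quantity and Reserved by -qty)
void Ship(int qty)
{
    quantity -= qty;
    reserved -= qty;
}

Console.WriteLine(TryReserve(4)); // True
Console.WriteLine(Available());   // 6
Console.WriteLine(TryReserve(7)); // False (only 6 available)
Ship(4);
Console.WriteLine($"{quantity} on hand, {reserved} reserved, {Available()} available"); // 6 on hand, 0 reserved, 6 available
```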
Section 3: Change Streams
Real-Time Data Monitoring
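Change streams let applications subscribe to inserts, updates, replacements, and deletes on a collection, a database, or a whole deployment as they happen, without polling. Like multi-document transactions, they are only available on replica sets and sharded clusters.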
// Services/NotificationService.cs
using MongoDB.Driver;
using ECommerce.Models;
namespace ECommerce.Services
{
public class NotificationService : IDisposable
{
private readonly IMongoCollection<Product> _products;
private readonly IMongoCollection<Order> _orders;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly List<Task> _watchTasks = new List<Task>();
public NotificationService(IMongoDatabase database)
{
_products = database.GetCollection<Product>("products");
_orders = database.GetCollection<Order>("orders");
}
// Completes (or faults) when the watch loops stop; await it to observe errors
public Task Completion => Task.WhenAll(_watchTasks);
public async Task StartWatchingAsync()
{
// Watch product updates for low inventory. An $inc on "inventory.quantity" appears in
// updatedFields as a literal dotted key that a dotted-path $match cannot reach, so match
// every update and let the server attach the current document (UpdateLookup)
var inventoryPipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Product>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var inventoryOptions = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var inventoryStream = await _products.WatchAsync(inventoryPipeline, inventoryOptions, _cts.Token);
// ForEachAsync loops until the token is cancelled; keep the task instead of discarding it
_watchTasks.Add(inventoryStream.ForEachAsync(async change =>
{
var product = change.FullDocument; // null if the product was deleted before the lookup
if (product != null && product.Inventory.Quantity <= product.Inventory.LowStockThreshold)
{
await SendLowStockAlertAsync(product);
}
}, _cts.Token));
// Watch for new orders; insert events always include the full document
var orderPipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Order>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Insert);
var orderStream = await _orders.WatchAsync(orderPipeline, cancellationToken: _cts.Token);
_watchTasks.Add(orderStream.ForEachAsync(change =>
{
ProcessNewOrder(change.FullDocument);
return Task.CompletedTask;
}, _cts.Token));
Console.WriteLine("Notification service started");
}
public async Task SendLowStockAlertAsync(Product product)
{
// Send email, SMS, or push notification
Console.WriteLine($"Low stock alert: {product.Name} ({product.Inventory.Quantity} remaining)");
// In a real app, integrate with email service like SendGrid
// await _emailService.SendAlert("admin@example.com", "Low Stock Alert", ...);
}
public void ProcessNewOrder(Order order)
{
Console.WriteLine($"Processing new order: {order.Id}");
// Trigger order fulfillment process
// await _fulfillmentService.ProcessOrder(order);
}
public void Dispose()
{
// Cancelling ends each ForEachAsync loop, which disposes its change stream cursor
_cts.Cancel();
}
}
}
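One detail behind the inventory filter: for an update such as `$inc: { "inventory.quantity": -1 }`, the change event's `updateDescription.updatedFields` holds a key that is literally the string `inventory.quantity`, not a nested `inventory` document, and a dotted-path `$match` cannot address such a key. If you inspect changed field names in application code instead, a helper along these lines handles both the dotted form and a replaced parent subdocument. `TouchesPath` is a hypothetical name, and the dictionaries merely stand in for `updatedFields`:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// True when any updated field name equals the path, lies under it, or is one of its parents.
// e.g. "inventory.quantity" (dotted key from $inc/$set) or "inventory" (whole subdocument replaced)
bool TouchesPath(IEnumerable<string> updatedFieldNames, string path) =>
    updatedFieldNames.Any(name =>
        name == path ||
        name.StartsWith(path + ".", StringComparison.Ordinal) ||
        path.StartsWith(name + ".", StringComparison.Ordinal));

// Made-up updatedFields payloads
var incUpdate = new Dictionary<string, object> { ["inventory.quantity"] = 3 };
var replacedSubdoc = new Dictionary<string, object> { ["inventory"] = new { quantity = 3, reserved = 1 } };
var priceUpdate = new Dictionary<string, object> { ["price"] = 19.99 };

Console.WriteLine(TouchesPath(incUpdate.Keys, "inventory.quantity"));      // True
Console.WriteLine(TouchesPath(replacedSubdoc.Keys, "inventory.quantity")); // True
Console.WriteLine(TouchesPath(priceUpdate.Keys, "inventory.quantity"));    // False
```

Matching on the `.` separator (rather than a bare prefix) keeps an unrelated field such as `inventoryNotes` from triggering the check.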
Section 4: Performance Optimization
Database Profiling
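// Enable profiling: level 1 records only operations slower than slowms (level 2 records every operation)
await _database.RunCommandAsync<BsonDocument>(new BsonDocument
{
{ "profile", 1 },
{ "slowms", 100 }
});
// View the five most recent slow operations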
var systemProfile = _database.GetCollection<BsonDocument>("system.profile");
var slowQueries = await systemProfile.Find(new BsonDocument())
.Sort(new BsonDocument("ts", -1))
.Limit(5)
.ToListAsync();
// Disable profiling
await _database.RunCommandAsync<BsonDocument>(new BsonDocument("profile", 0));
Query Optimization Techniques
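// Use a hint to force a specific index (the query fails if that index does not exist)
var hint = new BsonDocument { { "category", 1 }, { "price", -1 } };
var result = await _products.Find(p => p.Category == "electronics", new FindOptions { Hint = hint }).ToListAsync();
// Analyze query performance: run the explain command with executionStats verbosity
var explainCommand = new BsonDocument
{
{ "explain", new BsonDocument { { "find", "products" }, { "filter", new BsonDocument("price", new BsonDocument("$gt", 100)) } } },
{ "verbosity", "executionStats" }
};
var explanation = await _database.RunCommandAsync<BsonDocument>(explainCommand);
var stats = explanation["executionStats"].AsBsonDocument;
Console.WriteLine($"Execution time: {stats["executionTimeMillis"]}ms");
Console.WriteLine($"Documents examined: {stats["totalDocsExamined"]}");
Console.WriteLine($"Documents returned: {stats["nReturned"]}");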
Connection Pooling
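// Program.cs (enhanced)
// Driver defaults are MaxConnectionPoolSize = 100 and MinConnectionPoolSize = 0;
// the values below are illustrative, so size the pool for your expected concurrency
var mongoClientSettings = new MongoClientSettings
{
Server = new MongoServerAddress("localhost", 27017),
MaxConnectionPoolSize = 10,
MinConnectionPoolSize = 5,
MaxConnectionIdleTime = TimeSpan.FromSeconds(30),
MaxConnectionLifeTime = TimeSpan.FromMinutes(10),
WaitQueueTimeout = TimeSpan.FromSeconds(5)
};
// MongoClient is thread-safe and owns the connection pool: register a single instance
builder.Services.AddSingleton<IMongoClient>(sp => new MongoClient(mongoClientSettings));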
builder.Services.AddScoped(sp =>
{
var client = sp.GetRequiredService<IMongoClient>();
return client.GetDatabase("ECommerceDB");
});
Caching Strategies
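// Services/CacheService.cs
using System.Text.Json;
using System.Threading;
using Microsoft.Extensions.Caching.Memory;
namespace ECommerce.Services
{
public class CacheService
{
private readonly IMemoryCache _cache;
private readonly ProductService _productService;
// Version embedded in list-cache keys. IMemoryCache cannot enumerate or remove keys by prefix,
// so invalidation bumps the version and the old list entries simply expire.
// Static so the counter survives if CacheService is registered as a scoped service.
private static int _listVersion;
public CacheService(IMemoryCache cache, ProductService productService)
{
_cache = cache;
_productService = productService;
}
public async Task<Product> GetProductAsync(string productId)
{
var cacheKey = $"product_{productId}";
if (!_cache.TryGetValue(cacheKey, out Product product))
{
product = await _productService.GetProductByIdAsync(productId);
if (product != null)
{
_cache.Set(cacheKey, product, TimeSpan.FromMinutes(10));
}
}
return product;
}
public void InvalidateProduct(string productId)
{
_cache.Remove($"product_{productId}");
// Invalidate every cached product list at once
Interlocked.Increment(ref _listVersion);
}
public async Task<List<Product>> GetProductsAsync(ProductFilter filter = null, Pagination pagination = null)
{
// Derive the key from the filter and pagination values: GetHashCode() on a class that does not
// override it reflects object identity, so identical filters would never share a cache entry
var version = Volatile.Read(ref _listVersion);
var cacheKey = $"products_v{version}_{JsonSerializer.Serialize(filter)}_{JsonSerializer.Serialize(pagination)}";
if (!_cache.TryGetValue(cacheKey, out List<Product> products))
{
products = await _productService.GetAllProductsAsync(filter, null, pagination);
_cache.Set(cacheKey, products, TimeSpan.FromMinutes(5));
}
return products;
}
}
}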
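List caches need keys derived from the filter's values. `GetHashCode` on a class that does not override it reflects object identity, so two requests with identical filters usually get different keys and the cache fills up without ever hitting. And because the `IMemoryCache` interface cannot enumerate or remove entries by prefix, a version number embedded in the key is a simple way to invalidate every list at once. A minimal sketch of both ideas, assuming JSON serialization is an acceptable key format; `ListCacheKey`, `InvalidateLists`, and the anonymous-type filters (standing in for `ProductFilter`) are hypothetical:

```csharp
using System;
using System.Text.Json;
using System.Threading;

// Hypothetical version counter; in a service this would be a static field bumped on every product write
int listVersion = 0;

// Serializing the filter captures its property values, so equal filters produce equal keys
string ListCacheKey(object filter, int page, int pageSize) =>
    $"products_v{listVersion}_{JsonSerializer.Serialize(filter)}_{page}_{pageSize}";

// Bumping the version orphans every older list entry; those entries then expire normally
void InvalidateLists() => Interlocked.Increment(ref listVersion);

// Two separately constructed filters with the same values
var a = new { Category = "electronics", MinPrice = 100m };
var b = new { Category = "electronics", MinPrice = 100m };

string keyA = ListCacheKey(a, 1, 20);
string keyB = ListCacheKey(b, 1, 20);
Console.WriteLine(keyA == keyB); // True

InvalidateLists();
Console.WriteLine(ListCacheKey(a, 1, 20) == keyA); // False
```

Orphaned entries keep using memory until their expiration, so pair this approach with the short absolute expirations already used in the service.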
Section 5: Advanced Document Design Patterns
Understanding Document Size Limits
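MongoDB limits a single BSON document to 16 MB. Any design that embeds an array that grows without bound will eventually hit that limit, and large documents slow down reads and updates well before then: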
// ❌ Bad: Single document with unlimited reviews
public class ProductWithEmbeddedReviews
{
public string Id { get; set; }
public string Name { get; set; }
public List<Review> Reviews { get; set; } // Can grow infinitely!
}
// ✅ Good: Separate collections with references
public class Product
{
public string Id { get; set; }
public string Name { get; set; }
public int ReviewCount { get; set; }
public double AverageRating { get; set; }
}
public class Review
{
public string Id { get; set; }
public string ProductId { get; set; } // Reference to Product
public string UserId { get; set; }
public int Rating { get; set; }
public string Comment { get; set; }
public DateTime CreatedAt { get; set; }
}
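A back-of-the-envelope calculation shows why the embedded version breaks at scale. The limit is 16 MiB (16,777,216 bytes); if an average serialized review takes about 500 bytes (an assumed figure, so measure your own documents), one product document tops out at roughly 33,500 embedded reviews. `MaxEmbeddedItems` is a hypothetical helper:

```csharp
using System;

const long MaxBsonDocumentBytes = 16L * 1024 * 1024; // 16 MiB BSON document limit

// Upper bound on embedded array elements, given the bytes used by the rest of the document
long MaxEmbeddedItems(long bytesPerItem, long otherFieldBytes = 0) =>
    (MaxBsonDocumentBytes - otherFieldBytes) / bytesPerItem;

Console.WriteLine(MaxEmbeddedItems(500));        // 33554
Console.WriteLine(MaxEmbeddedItems(500, 2_048)); // 33550
```

A popular product can plausibly exceed that, and every read of the product would load all of those reviews long before the hard limit is reached.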
When to Embed vs Reference
Embed Documents When:
- Data is frequently accessed together
- Child documents are bounded and won't grow indefinitely
- You need atomic operations on parent-child data
// Good embedding: Product specifications (bounded, accessed together)
public class Product
{
public string Id { get; set; }
public string Name { get; set; }
public ProductSpecs Specs { get; set; } // Embed - always needed with product
}
public class ProductSpecs
{
public Dimensions Dimensions { get; set; }
public List<string> Materials { get; set; }
public WarrantyInfo Warranty { get; set; }
}
Use References When:
- Documents can grow unbounded (reviews, comments, logs)
- Data is accessed independently
- You need to query child documents across parents
// Good referencing: User orders (unbounded, independent queries)
// The child stores the parent's id (Order.UserId); a growing OrderIds array on User
// would reintroduce the unbounded-array problem
public class User
{
public string Id { get; set; }
public string Name { get; set; }
}
public class Order
{
public string Id { get; set; }
public string UserId { get; set; }
public List<OrderItem> Items { get; set; }
public decimal Total { get; set; }
public DateTime CreatedAt { get; set; }
}
Complex Scenarios: Hybrid Approaches
For complex relationships, combine embedding and referencing:
// Scenario: E-commerce with reviews and categories
public class Product
{
public string Id { get; set; }
public string Name { get; set; }
public string CategoryId { get; set; } // Reference (categories change independently)
public ProductSpecs Specs { get; set; } // Embed (always needed)
public List<string> TagIds { get; set; } // References (tags reusable)
public ReviewSummary ReviewSummary { get; set; } // Embed (computed data)
}
public class ReviewSummary
{
public int TotalReviews { get; set; }
public double AverageRating { get; set; }
// Star rating ("1" to "5") -> count. String keys, because the driver's default dictionary
// representation stores keys as BSON field names, which must be strings
public Dictionary<string, int> RatingDistribution { get; set; }
}
// Separate collections for unbounded data
public class Review
{
public string Id { get; set; }
public string ProductId { get; set; }
public string UserId { get; set; }
public int Rating { get; set; }
public string Comment { get; set; }
public List<string> Photos { get; set; } // URLs to avoid size limits
public DateTime CreatedAt { get; set; }
}
Best Practices for Large Datasets
public class DataService
{
private readonly IMongoClient _client;
private readonly IMongoCollection<Product> _products;
private readonly IMongoCollection<Review> _reviews;
public DataService(IMongoClient client, IMongoDatabase database)
{
_client = client;
_products = database.GetCollection<Product>("products");
_reviews = database.GetCollection<Review>("reviews");
}
public async Task<ProductWithReviews> GetProductWithReviewsAsync(string productId, int page = 1, int pageSize = 10)
{
// Get product with embedded summary
var product = await _products.Find(p => p.Id == productId).FirstOrDefaultAsync();
// Get paginated reviews separately (the server still walks past skipped entries, so very deep pages get slower)
var reviews = await _reviews
.Find(r => r.ProductId == productId)
.SortByDescending(r => r.CreatedAt)
.Skip((page - 1) * pageSize)
.Limit(pageSize)
.ToListAsync();
return new ProductWithReviews
{
Product = product,
Reviews = reviews,
CurrentPage = page,
PageSize = pageSize
};
}
public async Task AddReviewAsync(Review review)
{
using var session = await _client.StartSessionAsync();
session.StartTransaction();
try
{
// Read the current summary inside the transaction; a concurrent review of the same
// product causes a write conflict that aborts one of the transactions (retry it)
var product = await _products.Find(session, p => p.Id == review.ProductId).FirstOrDefaultAsync()
?? throw new InvalidOperationException($"Product {review.ProductId} not found");
var count = product.ReviewSummary?.TotalReviews ?? 0;
var average = product.ReviewSummary?.AverageRating ?? 0;
var newAverage = (average * count + review.Rating) / (count + 1);
// Add review
await _reviews.InsertOneAsync(session, review);
// Update the product summary (the string path assumes camelCase element names)
var ratingPath = $"reviewSummary.ratingDistribution.{review.Rating}";
var update = Builders<Product>.Update
.Inc(p => p.ReviewSummary.TotalReviews, 1)
.Inc(ratingPath, 1)
.Set(p => p.ReviewSummary.AverageRating, newAverage);
await _products.UpdateOneAsync(session, p => p.Id == review.ProductId, update);
await session.CommitTransactionAsync();
}
catch
{
await session.AbortTransactionAsync();
throw;
}
}
}
public class ProductWithReviews
{
public Product Product { get; set; }
public List<Review> Reviews { get; set; }
public int CurrentPage { get; set; }
public int PageSize { get; set; }
}
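The rating math in `AddReviewAsync` is the standard incremental mean: with `n` existing reviews averaging `avg`, adding rating `r` gives `(avg * n + r) / (n + 1)`. It is easy to check in isolation; `NextAverage` below is a hypothetical helper, not part of the service. An alternative worth considering is storing a running rating sum next to the count and computing the average on read, which avoids accumulating floating-point rounding over many updates.

```csharp
using System;

// Hypothetical helper: incremental mean, (avg * n + r) / (n + 1), as used in AddReviewAsync
double NextAverage(double average, int count, int newRating) =>
    (average * count + newRating) / (count + 1);

double avg = 0;
int n = 0;
foreach (var rating in new[] { 4, 5, 3 })
{
    avg = NextAverage(avg, n, rating);
    n++;
    Console.WriteLine($"after {n} review(s): average {avg}");
}
```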
Schema Design Anti-Patterns to Avoid
// ❌ Anti-pattern: Massive arrays
public class User
{
public string Id { get; set; }
public List<LogEntry> ActivityLog { get; set; } // Can grow to millions!
}
// ✅ Better: Separate collection with TTL index
public class UserActivityLog
{
public string Id { get; set; }
public string UserId { get; set; }
public string Action { get; set; }
public DateTime CreatedAt { get; set; }
}
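// Create TTL index to auto-delete logs 90 days after CreatedAt
// (the field must hold a BSON date; the TTL monitor runs about once a minute, so deletion is not instantaneous)
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<UserActivityLog>(
Builders<UserActivityLog>.IndexKeys.Ascending(x => x.CreatedAt),
new CreateIndexOptions { ExpireAfter = TimeSpan.FromDays(90) }
)
);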
Section 6: Integration Patterns
Repository Pattern
// Repositories/BaseRepository.cs
using MongoDB.Bson;
using MongoDB.Driver;
namespace ECommerce.Repositories
{
public abstract class BaseRepository<T> where T : class
{
protected readonly IMongoCollection<T> _collection;
protected BaseRepository(IMongoDatabase database, string collectionName)
{
_collection = database.GetCollection<T>(collectionName);
}
public virtual async Task<T> GetByIdAsync(string id)
{
return await _collection.Find(Builders<T>.Filter.Eq("_id", ObjectId.Parse(id))).FirstOrDefaultAsync();
}
public virtual async Task<List<T>> GetAllAsync(FilterDefinition<T> filter = null, SortDefinition<T> sort = null, int? limit = null)
{
filter ??= Builders<T>.Filter.Empty;
var query = _collection.Find(filter);
if (sort != null)
query = query.Sort(sort);
if (limit.HasValue)
query = query.Limit(limit);
return await query.ToListAsync();
}
public virtual async Task<T> CreateAsync(T entity)
{
await _collection.InsertOneAsync(entity);
return entity;
}
public virtual async Task<bool> UpdateAsync(string id, T entity)
{
var result = await _collection.ReplaceOneAsync(
Builders<T>.Filter.Eq("_id", ObjectId.Parse(id)),
entity
);
return result.MatchedCount > 0; // MatchedCount: true even when the replacement equals the stored document
}
public virtual async Task<bool> DeleteAsync(string id)
{
var result = await _collection.DeleteOneAsync(
Builders<T>.Filter.Eq("_id", ObjectId.Parse(id))
);
return result.DeletedCount > 0;
}
}
}
// Repositories/ProductRepository.cs
using MongoDB.Bson;
using MongoDB.Driver;
using ECommerce.Models;
namespace ECommerce.Repositories
{
public class ProductRepository : BaseRepository<Product>
{
public ProductRepository(IMongoDatabase database) : base(database, "products") { }
public async Task<List<Product>> GetByCategoryAsync(string category)
{
return await GetAllAsync(
Builders<Product>.Filter.And(
Builders<Product>.Filter.Eq(p => p.Category, category),
Builders<Product>.Filter.Eq(p => p.IsActive, true)
),
Builders<Product>.Sort.Descending(p => p.CreatedAt)
);
}
public async Task<List<Product>> GetLowStockAsync()
{
// Use aggregation for complex queries
var pipeline = new[]
{
new BsonDocument("$match", new BsonDocument("isActive", true)),
new BsonDocument("$addFields", new BsonDocument
{
{ "availableStock", new BsonDocument("$subtract", new BsonArray { "$inventory.quantity", "$inventory.reserved" }) }
}),
new BsonDocument("$match", new BsonDocument
{
{ "$expr", new BsonDocument("$lte", new BsonArray { "$availableStock", "$inventory.lowStockThreshold" }) }
}),
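new BsonDocument("$sort", new BsonDocument("availableStock", 1)),
// Remove the computed field so the results deserialize cleanly into Product
new BsonDocument("$project", new BsonDocument("availableStock", 0))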
};
return await _collection.Aggregate<Product>(pipeline).ToListAsync();
}
public async Task<List<Product>> SearchAsync(string searchTerm)
{
// $text queries require a text index on the products collection
return await GetAllAsync(
Builders<Product>.Filter.And(
Builders<Product>.Filter.Eq(p => p.IsActive, true),
Builders<Product>.Filter.Text(searchTerm)
),
Builders<Product>.Sort.MetaTextScore("textScore")
);
}
}
}
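`GetLowStockAsync` compares available stock, not raw quantity, against each product's own threshold and lists the most depleted products first. Expressing the same predicate in LINQ over made-up inventory tuples makes the rule easy to unit-test separately from the database; this is an illustrative analogue, not a replacement for the pipeline:

```csharp
using System;
using System.Linq;

// Hypothetical products: (name, isActive, quantity, reserved, lowStockThreshold)
var products = new[]
{
    (Name: "Cable", IsActive: true, Quantity: 50, Reserved: 45, LowStockThreshold: 10),  // 5 available
    (Name: "Dock", IsActive: true, Quantity: 20, Reserved: 0, LowStockThreshold: 5),     // 20 available
    (Name: "Mouse", IsActive: true, Quantity: 8, Reserved: 0, LowStockThreshold: 10),    // 8 available
    (Name: "Legacy", IsActive: false, Quantity: 0, Reserved: 0, LowStockThreshold: 3),   // inactive
};

var lowStock = products
    .Where(p => p.IsActive)                                                         // $match isActive
    .Select(p => (p.Name, Available: p.Quantity - p.Reserved, p.LowStockThreshold)) // $addFields availableStock
    .Where(p => p.Available <= p.LowStockThreshold)                                 // $expr $lte
    .OrderBy(p => p.Available)                                                      // $sort availableStock: 1
    .Select(p => p.Name)
    .ToList();

Console.WriteLine(string.Join(", ", lowStock)); // Cable, Mouse
```

"Cable" has plenty of units on hand but most are reserved, which is exactly the case a check on raw `quantity` would miss.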
Service Layer Pattern
// Services/BaseService.cs
using ECommerce.Repositories;
namespace ECommerce.Services
{
public abstract class BaseService<T> where T : class
{
protected readonly BaseRepository<T> _repository;
protected BaseService(BaseRepository<T> repository)
{
_repository = repository;
}
public virtual async Task<T> GetByIdAsync(string id)
{
var entity = await _repository.GetByIdAsync(id);
if (entity == null)
{
throw new KeyNotFoundException("Entity not found");
}
return entity;
}
public virtual async Task<List<T>> GetAllAsync()
{
return await _repository.GetAllAsync();
}
public virtual async Task<T> CreateAsync(T entity)
{
Validate(entity);
return await _repository.CreateAsync(entity);
}
public virtual async Task<bool> UpdateAsync(string id, T entity)
{
Validate(entity, isUpdate: true);
var updated = await _repository.UpdateAsync(id, entity);
if (!updated)
{
throw new KeyNotFoundException("Entity not found or no changes made");
}
return true;
}
public virtual async Task<bool> DeleteAsync(string id)
{
var deleted = await _repository.DeleteAsync(id);
if (!deleted)
{
throw new KeyNotFoundException("Entity not found");
}
return true;
}
protected virtual void Validate(T entity, bool isUpdate = false)
{
if (entity == null)
{
throw new ArgumentNullException(nameof(entity));
}
}
}
}
// Services/ProductService.cs (enhanced)
using ECommerce.Repositories;
using ECommerce.Models;
namespace ECommerce.Services
{
public class ProductService : BaseService<Product>
{
private readonly ProductRepository _productRepository;
public ProductService(ProductRepository repository) : base(repository)
{
_productRepository = repository;
}
protected override void Validate(Product product, bool isUpdate = false)
{
base.Validate(product, isUpdate);
// UpdateAsync replaces the whole document (ReplaceOneAsync), so the same rules apply to
// creates and updates; skipping checks on update would let an incomplete document
// overwrite stored fields with null or 0
if (string.IsNullOrWhiteSpace(product.Name))
{
throw new ArgumentException("Product name is required");
}
if (product.Price <= 0)
{
throw new ArgumentException("Valid price is required");
}
if (string.IsNullOrWhiteSpace(product.Sku))
{
throw new ArgumentException("SKU is required");
}
}
public async Task<List<Product>> GetByCategoryAsync(string category)
{
return await _productRepository.GetByCategoryAsync(category);
}
public async Task<List<Product>> GetLowStockAsync()
{
return await _productRepository.GetLowStockAsync();
}
public async Task<List<Product>> SearchAsync(string searchTerm)
{
if (string.IsNullOrWhiteSpace(searchTerm))
{
throw new ArgumentException("Search term is required");
}
return await _productRepository.SearchAsync(searchTerm.Trim());
}
}
}
Conclusion
Congratulations! You've completed the MongoDB Mastery series. In this final part, we covered:
- Aggregation Framework: Complex data processing with pipelines in C#
- ACID Transactions: Multi-document operations with consistency using sessions
- Change Streams: Real-time data monitoring with the MongoDB .NET driver
- Advanced Document Design: Schema optimization, embedding vs referencing, and size limits
- Performance Optimization: Profiling, indexing, and caching strategies
- Integration Patterns: Repository and service layer patterns for ASP.NET Core
These advanced features make MongoDB suitable for complex, production-ready applications.
🚀 What's Next?
- Build a complete application using these patterns
- Explore MongoDB Atlas for cloud deployment
- Learn about MongoDB's security features
- Consider integrating with other technologies like Redis for caching
MongoDB's flexibility and powerful features make it an excellent choice for modern applications. Keep experimenting and building!
This concludes our MongoDB Mastery series. Read Part 1: Setup and Getting Started | Read Part 2: CRUD Operations
Full Stack Insights
Software Engineer
Passionate about software development, architecture, and sharing knowledge.