Aggregation Pipelines
These examples show how to use Tnuctipun's type-safe filters and projections to build the $match and $project stages of MongoDB aggregation pipelines, alongside hand-written stages such as $group and $lookup.
Basic Pipeline with $match and $project
use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};
use bson::doc;
use futures::stream::TryStreamExt;

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub created_at: bson::DateTime,
    pub items: Vec<OrderItem>,
}

#[derive(Debug, Serialize, Deserialize)]
struct OrderItem {
    pub product_id: String,
    pub quantity: i32,
    pub price: f64,
}

async fn basic_aggregation_example(
    collection: &mongodb::Collection<Order>,
) -> mongodb::error::Result<Vec<bson::Document>> {
    // Type-safe $match stage: completed orders worth at least 100.0
    let match_filter = empty::<Order>()
        .eq::<order_fields::Status, _>("completed".to_string())
        .gte::<order_fields::TotalAmount, _>(100.0)
        .and();

    // Type-safe $project stage. This is an inclusion projection, so unlisted
    // fields such as the large `items` array are dropped automatically;
    // mixing inclusion and exclusion of non-_id fields would be rejected by MongoDB.
    let project_doc = projection::empty::<Order>()
        .includes::<order_fields::Id>()
        .includes::<order_fields::UserId>()
        .includes::<order_fields::TotalAmount>()
        .includes::<order_fields::CreatedAt>()
        .build();

    let pipeline = vec![
        doc! { "$match": match_filter },
        doc! { "$project": project_doc },
        doc! { "$sort": { "total_amount": -1 } },
        doc! { "$limit": 50 },
    ];

    let cursor = collection.aggregate(pipeline, None).await?;

    // Collect the cursor into a Vec; try_collect comes from futures' TryStreamExt trait
    let results: Vec<bson::Document> = cursor.try_collect().await?;

    Ok(results)
}
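
For context, here is a minimal sketch of how this function might be wired up. It assumes the tokio runtime and hypothetical database and collection names (shop, orders):

use mongodb::Client;

#[tokio::main]
async fn main() -> mongodb::error::Result<()> {
    // Hypothetical connection string and names; adjust to your deployment
    let client = Client::with_uri_str("mongodb://localhost:27017").await?;
    let collection = client.database("shop").collection::<Order>("orders");

    let top_orders = basic_aggregation_example(&collection).await?;
    println!("Fetched {} matching orders", top_orders.len());

    Ok(())
}
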
Complex Aggregation with Multiple Stages
use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};
use bson::doc;
use futures::stream::TryStreamExt;

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub created_at: bson::DateTime,
}

async fn complex_sales_analysis(
    order_collection: &mongodb::Collection<Order>,
) -> mongodb::error::Result<Vec<bson::Document>> {
    // Filter for completed orders. A date cutoff on `created_at` can be added
    // with a `gte` filter, as shown in the time-based example below.
    let match_filter = empty::<Order>()
        .eq::<order_fields::Status, _>("completed".to_string())
        .and();

    // Project only the fields needed for the analysis
    let project_doc = projection::empty::<Order>()
        .includes::<order_fields::UserId>()
        .includes::<order_fields::TotalAmount>()
        .includes::<order_fields::CreatedAt>()
        .build();

    let pipeline = vec![
        doc! { "$match": match_filter },
        doc! { "$project": project_doc },
        // Group by user to calculate per-user metrics
        doc! { "$group": {
            "_id": "$user_id",
            "total_orders": { "$sum": 1 },
            "total_spent": { "$sum": "$total_amount" },
            "avg_order_value": { "$avg": "$total_amount" },
            "first_order": { "$min": "$created_at" },
            "last_order": { "$max": "$created_at" }
        }},
        // Keep only high-value customers
        doc! { "$match": {
            "total_spent": { "$gte": 500.0 },
            "total_orders": { "$gte": 3 }
        }},
        // Add a customer segment classification
        doc! { "$addFields": {
            "customer_segment": {
                "$switch": {
                    "branches": [
                        {
                            "case": { "$gte": ["$total_spent", 2000.0] },
                            "then": "premium"
                        },
                        {
                            "case": { "$gte": ["$total_spent", 1000.0] },
                            "then": "gold"
                        }
                    ],
                    "default": "silver"
                }
            }
        }},
        // Sort by total spent, highest first
        doc! { "$sort": { "total_spent": -1 } },
        // Limit results
        doc! { "$limit": 100 }
    ];

    let cursor = order_collection.aggregate(pipeline, None).await?;

    // Collect the cursor into a Vec of documents
    let results: Vec<bson::Document> = cursor.try_collect().await?;

    Ok(results)
}
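
The $group output is plain BSON, so it can be mapped onto a typed struct when field access is needed downstream. A minimal sketch, assuming a hypothetical CustomerMetrics type that mirrors the grouped fields:

use serde::Deserialize;

// Hypothetical shape of one $group result document
#[derive(Debug, Deserialize)]
struct CustomerMetrics {
    #[serde(rename = "_id")]
    user_id: String,
    total_orders: i64,
    total_spent: f64,
    avg_order_value: f64,
    customer_segment: String,
}

fn parse_metrics(docs: Vec<bson::Document>) -> Vec<CustomerMetrics> {
    docs.into_iter()
        // Skip any document that does not match the expected shape
        .filter_map(|doc| bson::from_document(doc).ok())
        .collect()
}
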
Lookup and Join Operations
use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};
use bson::doc;
use futures::stream::TryStreamExt;

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct User {
    pub id: String,
    pub name: String,
    pub email: String,
    pub registration_date: bson::DateTime,
}

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub created_at: bson::DateTime,
}

async fn user_order_analytics(
    user_collection: &mongodb::Collection<User>,
    // The orders are joined server-side via $lookup, so this handle is not used directly
    _order_collection: &mongodb::Collection<Order>,
) -> mongodb::error::Result<Vec<bson::Document>> {
    // Start with users that have a non-empty email address
    let user_filter = empty::<User>()
        .ne::<user_fields::Email, _>("".to_string())
        .and();

    let user_projection = projection::empty::<User>()
        .includes::<user_fields::Id>()
        .includes::<user_fields::Name>()
        .includes::<user_fields::Email>()
        .includes::<user_fields::RegistrationDate>()
        .build();

    let pipeline = vec![
        doc! { "$match": user_filter },
        doc! { "$project": user_projection },
        // Look up the orders belonging to each user
        doc! { "$lookup": {
            "from": "orders",
            "localField": "id",
            "foreignField": "user_id",
            "as": "orders"
        }},
        // Keep only users that have at least one order
        doc! { "$match": {
            "orders": { "$ne": [] }
        }},
        // Add computed fields
        doc! { "$addFields": {
            "total_orders": { "$size": "$orders" },
            "completed_orders": {
                "$size": {
                    "$filter": {
                        "input": "$orders",
                        "cond": { "$eq": ["$$this.status", "completed"] }
                    }
                }
            },
            "total_spent": {
                "$sum": {
                    "$map": {
                        "input": {
                            "$filter": {
                                "input": "$orders",
                                "cond": { "$eq": ["$$this.status", "completed"] }
                            }
                        },
                        "in": "$$this.total_amount"
                    }
                }
            }
        }},
        // Keep only customers with meaningful activity
        doc! { "$match": {
            "completed_orders": { "$gte": 1 },
            "total_spent": { "$gte": 50.0 }
        }},
        // Sort by total spent, highest first
        doc! { "$sort": { "total_spent": -1 } },
        // Project the final result
        doc! { "$project": {
            "name": 1,
            "email": 1,
            "registration_date": 1,
            "total_orders": 1,
            "completed_orders": 1,
            "total_spent": 1,
            "avg_order_value": { "$divide": ["$total_spent", "$completed_orders"] }
        }}
    ];

    let cursor = user_collection.aggregate(pipeline, None).await?;

    // Collect the cursor into a Vec of documents
    let results: Vec<bson::Document> = cursor.try_collect().await?;

    Ok(results)
}
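
$lookup performs an equality match on user_id for every user document, so it is usually worth having an index on that field in the orders collection. A sketch using the driver's index API, with the same assumed collection name as above:

use bson::doc;
use mongodb::{Database, IndexModel};

async fn ensure_order_indexes(db: &Database) -> mongodb::error::Result<()> {
    let orders = db.collection::<Order>("orders");

    // Index on the foreign-field side of the $lookup
    let index = IndexModel::builder()
        .keys(doc! { "user_id": 1 })
        .build();

    orders.create_index(index, None).await?;

    Ok(())
}
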
Time-based Analytics
use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};
use bson::doc;
use futures::stream::TryStreamExt;

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub created_at: bson::DateTime,
}

async fn monthly_sales_trends(
    order_collection: &mongodb::Collection<Order>,
) -> mongodb::error::Result<Vec<bson::Document>> {
    // Only look at the last 365 days
    let one_year_ago = bson::DateTime::from_millis(
        bson::DateTime::now().timestamp_millis() - (365 * 24 * 60 * 60 * 1000),
    );

    let time_filter = empty::<Order>()
        .eq::<order_fields::Status, _>("completed".to_string())
        .gte::<order_fields::CreatedAt, _>(one_year_ago)
        .and();

    let project_doc = projection::empty::<Order>()
        .includes::<order_fields::TotalAmount>()
        .includes::<order_fields::CreatedAt>()
        .build();

    let pipeline = vec![
        doc! { "$match": time_filter },
        doc! { "$project": project_doc },
        // Group by year and month
        doc! { "$group": {
            "_id": {
                "year": { "$year": "$created_at" },
                "month": { "$month": "$created_at" }
            },
            "total_revenue": { "$sum": "$total_amount" },
            "order_count": { "$sum": 1 },
            "avg_order_value": { "$avg": "$total_amount" },
            "max_order_value": { "$max": "$total_amount" },
            "min_order_value": { "$min": "$total_amount" }
        }},
        // Sort chronologically
        doc! { "$sort": { "_id.year": 1, "_id.month": 1 } },
        // Add a month name and a year-month label for readability
        doc! { "$addFields": {
            "month_name": {
                "$arrayElemAt": [
                    ["", "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"],
                    "$_id.month"
                ]
            },
            "year_month": {
                "$concat": [
                    { "$toString": "$_id.year" },
                    "-",
                    { "$toString": "$_id.month" }
                ]
            }
        }}
    ];

    let cursor = order_collection.aggregate(pipeline, None).await?;

    // Collect the cursor into a Vec of documents
    let results: Vec<bson::Document> = cursor.try_collect().await?;

    Ok(results)
}
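
On MongoDB 5.0 and newer, the year/month bucketing can also be expressed with $dateTrunc, which keeps an actual date value in the group key instead of separate year and month numbers. A sketch of that alternative $group stage, using the same field names as above:

use bson::{doc, Document};

// Alternative $group stage using $dateTrunc (requires MongoDB 5.0+)
fn monthly_group_stage() -> Document {
    doc! { "$group": {
        "_id": { "$dateTrunc": { "date": "$created_at", "unit": "month" } },
        "total_revenue": { "$sum": "$total_amount" },
        "order_count": { "$sum": 1 },
        "avg_order_value": { "$avg": "$total_amount" }
    }}
}
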
Geographic Analysis
use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};
use bson::doc;
use futures::stream::TryStreamExt;

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub shipping_address: Address,
    pub created_at: bson::DateTime,
}

#[derive(Debug, Serialize, Deserialize)]
struct Address {
    pub country: String,
    pub state: Option<String>,
    pub city: String,
}

async fn geographic_sales_analysis(
    order_collection: &mongodb::Collection<Order>,
) -> mongodb::error::Result<Vec<bson::Document>> {
    let completed_filter = empty::<Order>()
        .eq::<order_fields::Status, _>("completed".to_string())
        .and();

    let pipeline = vec![
        doc! { "$match": completed_filter },
        // Group by country
        doc! { "$group": {
            "_id": "$shipping_address.country",
            "total_orders": { "$sum": 1 },
            "total_revenue": { "$sum": "$total_amount" },
            "avg_order_value": { "$avg": "$total_amount" },
            "unique_customers": { "$addToSet": "$user_id" }
        }},
        // Add the customer count
        doc! { "$addFields": {
            "customer_count": { "$size": "$unique_customers" }
        }},
        // Drop the unique_customers array (only the count is needed)
        doc! { "$project": {
            "unique_customers": 0
        }},
        // Keep only countries with meaningful volume
        doc! { "$match": {
            "total_orders": { "$gte": 10 }
        }},
        // Sort by revenue, highest first
        doc! { "$sort": { "total_revenue": -1 } },
        // Add a 1-based rank by collecting into an array and unwinding it
        doc! { "$group": {
            "_id": null,
            "countries": { "$push": "$$ROOT" }
        }},
        doc! { "$unwind": {
            "path": "$countries",
            "includeArrayIndex": "rank"
        }},
        doc! { "$replaceRoot": {
            "newRoot": {
                "$mergeObjects": [
                    "$countries",
                    { "rank": { "$add": ["$rank", 1] } }
                ]
            }
        }}
    ];

    let cursor = order_collection.aggregate(pipeline, None).await?;

    // Collect the cursor into a Vec of documents
    let results: Vec<bson::Document> = cursor.try_collect().await?;

    Ok(results)
}
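
On MongoDB 5.0 and newer, the group/unwind/replaceRoot ranking trick at the end of this pipeline can be replaced by a single $setWindowFields stage. A sketch of that alternative stage, using the same field names as above:

use bson::{doc, Document};

// Alternative ranking stage using $setWindowFields (requires MongoDB 5.0+)
fn revenue_rank_stage() -> Document {
    doc! { "$setWindowFields": {
        "sortBy": { "total_revenue": -1 },
        "output": {
            "rank": { "$rank": {} }
        }
    }}
}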