Examples showing how to use Tnuctipun’s type-safe filters and projections in MongoDB aggregation pipelines.

Basic Pipeline with $match and $project

use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};
use bson::doc;

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub created_at: bson::DateTime,
    pub items: Vec<OrderItem>,
}

#[derive(Debug, Serialize, Deserialize)]
struct OrderItem {
    pub product_id: String,
    pub quantity: i32,
    pub price: f64,
}

async fn basic_aggregation_example(
    collection: &mongodb::Collection<Order>
) -> mongodb::error::Result<Vec<bson::Document>> {
    
    // Type-safe $match stage
    let match_filter = empty::<Order>()
        .eq::<order_fields::Status, _>("completed".to_string())
        .gte::<order_fields::TotalAmount, _>(100.0)
        .and();
    
    // Type-safe $project stage
    let project_doc = projection::empty::<Order>()
        .includes::<order_fields::Id>()
        .includes::<order_fields::UserId>()
        .includes::<order_fields::TotalAmount>()
        .includes::<order_fields::CreatedAt>()
        .excludes::<order_fields::Items>()  // Exclude large array
        .build();
    
    let pipeline = vec![
        doc! { "$match": match_filter },
        doc! { "$project": project_doc },
        doc! { "$sort": { "total_amount": -1 } },
        doc! { "$limit": 50 }
    ];
    
    let cursor = collection.aggregate(pipeline, None).await?;
    // Note: try_collect requires futures TryStreamExt trait
    // Using simplified approach for this example
    let mut results = Vec::new();
    // cursor iteration would be implemented here
    
    Ok(results)
}

Complex Aggregation with Multiple Stages

use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub created_at: bson::DateTime,
}

async fn complex_sales_analysis(
    order_collection: &mongodb::Collection<Order>
) -> mongodb::error::Result<Vec<bson::Document>> {
    
    // Filter for recent completed orders
    // Note: For time-based filtering, would need proper date handling
    let match_filter = empty::<Order>()
        .eq::<order_fields::Status, _>("completed".to_string())
        // .gte::<order_fields::CreatedAt, _>(recent_date)  // Commented for API compatibility
        .and();
    
    // Project relevant fields for analysis
    let project_doc = projection::empty::<Order>()
        .includes::<order_fields::UserId>()
        .includes::<order_fields::TotalAmount>()
        .includes::<order_fields::CreatedAt>()
        .build();
    
    let pipeline = vec![
        doc! { "$match": match_filter },
        doc! { "$project": project_doc },
        
        // Group by user to calculate user metrics
        doc! { "$group": {
            "_id": "$user_id",
            "total_orders": { "$sum": 1 },
            "total_spent": { "$sum": "$total_amount" },
            "avg_order_value": { "$avg": "$total_amount" },
            "first_order": { "$min": "$created_at" },
            "last_order": { "$max": "$created_at" }
        }},
        
        // Filter for high-value customers
        doc! { "$match": {
            "total_spent": { "$gte": 500.0 },
            "total_orders": { "$gte": 3 }
        }},
        
        // Add customer segment classification
        doc! { "$addFields": {
            "customer_segment": {
                "$switch": {
                    "branches": [
                        {
                            "case": { "$gte": ["$total_spent", 2000.0] },
                            "then": "premium"
                        },
                        {
                            "case": { "$gte": ["$total_spent", 1000.0] },
                            "then": "gold"
                        }
                    ],
                    "default": "silver"
                }
            }
        }},
        
        // Sort by total spent
        doc! { "$sort": { "total_spent": -1 } },
        
        // Limit results
        doc! { "$limit": 100 }
    ];
    
    let cursor = order_collection.aggregate(pipeline, None).await?;
    // Note: try_collect requires futures TryStreamExt trait
    // Using simplified approach for this example
    let mut results = Vec::new();
    // cursor iteration would be implemented here
    
    Ok(results)
}

Lookup and Join Operations

use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct User {
    pub id: String,
    pub name: String,
    pub email: String,
    pub registration_date: bson::DateTime,
}

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub created_at: bson::DateTime,
}

async fn user_order_analytics(
    user_collection: &mongodb::Collection<User>,
    order_collection: &mongodb::Collection<Order>
) -> mongodb::error::Result<Vec<bson::Document>> {
    
    // Start with active users
    let user_filter = empty::<User>()
        .ne::<user_fields::Email, _>("".to_string())  // Valid email
        .and();
    
    let user_projection = projection::empty::<User>()
        .includes::<user_fields::Id>()
        .includes::<user_fields::Name>()
        .includes::<user_fields::Email>()
        .includes::<user_fields::RegistrationDate>()
        .build();
    
    let pipeline = vec![
        doc! { "$match": user_filter },
        doc! { "$project": user_projection },
        
        // Lookup orders for each user
        doc! { "$lookup": {
            "from": "orders",
            "localField": "id",
            "foreignField": "user_id",
            "as": "orders"
        }},
        
        // Filter users who have completed orders
        doc! { "$match": {
            "orders": { "$ne": [] }
        }},
        
        // Add computed fields
        doc! { "$addFields": {
            "total_orders": { "$size": "$orders" },
            "completed_orders": {
                "$size": {
                    "$filter": {
                        "input": "$orders",
                        "cond": { "$eq": ["$$this.status", "completed"] }
                    }
                }
            },
            "total_spent": {
                "$sum": {
                    "$map": {
                        "input": {
                            "$filter": {
                                "input": "$orders",
                                "cond": { "$eq": ["$$this.status", "completed"] }
                            }
                        },
                        "in": "$$this.total_amount"
                    }
                }
            }
        }},
        
        // Filter for customers with meaningful activity
        doc! { "$match": {
            "completed_orders": { "$gte": 1 },
            "total_spent": { "$gte": 50.0 }
        }},
        
        // Sort by total spent
        doc! { "$sort": { "total_spent": -1 } },
        
        // Project final result
        doc! { "$project": {
            "name": 1,
            "email": 1,
            "registration_date": 1,
            "total_orders": 1,
            "completed_orders": 1,
            "total_spent": 1,
            "avg_order_value": { "$divide": ["$total_spent", "$completed_orders"] }
        }}
    ];
    
    let cursor = user_collection.aggregate(pipeline, None).await?;
    // Note: try_collect requires futures TryStreamExt trait
    // Using simplified approach for this example
    let results = Vec::new();
    
    Ok(results)
}

Time-based Analytics

use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub created_at: bson::DateTime,
}

async fn monthly_sales_trends(
    order_collection: &mongodb::Collection<Order>
) -> mongodb::error::Result<Vec<bson::Document>> {
    
    // Filter for the last year
    let one_year_ago = bson::DateTime::from_millis(
        bson::DateTime::now().timestamp_millis() - (365 * 24 * 60 * 60 * 1000)
    );
    let time_filter = empty::<Order>()
        .eq::<order_fields::Status, _>("completed".to_string())
        .gte::<order_fields::CreatedAt, _>(one_year_ago)
        .and();
    
    let project_doc = projection::empty::<Order>()
        .includes::<order_fields::TotalAmount>()
        .includes::<order_fields::CreatedAt>()
        .build();
    
    let pipeline = vec![
        doc! { "$match": time_filter },
        doc! { "$project": project_doc },
        
        // Group by year-month
        doc! { "$group": {
            "_id": {
                "year": { "$year": "$created_at" },
                "month": { "$month": "$created_at" }
            },
            "total_revenue": { "$sum": "$total_amount" },
            "order_count": { "$sum": 1 },
            "avg_order_value": { "$avg": "$total_amount" },
            "max_order_value": { "$max": "$total_amount" },
            "min_order_value": { "$min": "$total_amount" }
        }},
        
        // Sort by year-month
        doc! { "$sort": { "_id.year": 1, "_id.month": 1 } },
        
        // Add month name for readability
        doc! { "$addFields": {
            "month_name": {
                "$arrayElemAt": [
                    ["", "Jan", "Feb", "Mar", "Apr", "May", "Jun",
                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"],
                    "$_id.month"
                ]
            },
            "year_month": {
                "$concat": [
                    { "$toString": "$_id.year" },
                    "-",
                    { "$toString": "$_id.month" }
                ]
            }
        }}
    ];
    
    let cursor = order_collection.aggregate(pipeline, None).await?;
    // Note: try_collect requires futures TryStreamExt trait
    // Using simplified approach for this example
    let results = Vec::new();
    
    Ok(results)
}

Geographic Analysis

use tnuctipun::{FieldWitnesses, MongoComparable, filters::empty, projection};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, FieldWitnesses, MongoComparable)]
struct Order {
    pub id: String,
    pub user_id: String,
    pub status: String,
    pub total_amount: f64,
    pub shipping_address: Address,
    pub created_at: bson::DateTime,
}

#[derive(Debug, Serialize, Deserialize)]
struct Address {
    pub country: String,
    pub state: Option<String>,
    pub city: String,
}

async fn geographic_sales_analysis(
    order_collection: &mongodb::Collection<Order>
) -> mongodb::error::Result<Vec<bson::Document>> {
    
    let completed_filter = empty::<Order>()
        .eq::<order_fields::Status, _>("completed".to_string())
        .and();
    
    let pipeline = vec![
        doc! { "$match": completed_filter },
        
        // Group by country
        doc! { "$group": {
            "_id": "$shipping_address.country",
            "total_orders": { "$sum": 1 },
            "total_revenue": { "$sum": "$total_amount" },
            "avg_order_value": { "$avg": "$total_amount" },
            "unique_customers": { "$addToSet": "$user_id" }
        }},
        
        // Add customer count
        doc! { "$addFields": {
            "customer_count": { "$size": "$unique_customers" }
        }},
        
        // Remove the unique_customers array (we only needed the count)
        doc! { "$project": {
            "unique_customers": 0
        }},
        
        // Filter for countries with meaningful volume
        doc! { "$match": {
            "total_orders": { "$gte": 10 }
        }},
        
        // Sort by revenue
        doc! { "$sort": { "total_revenue": -1 } },
        
        // Add ranking
        doc! { "$group": {
            "_id": null,
            "countries": { "$push": "$$ROOT" }
        }},
        
        doc! { "$unwind": {
            "path": "$countries",
            "includeArrayIndex": "rank"
        }},
        
        doc! { "$replaceRoot": {
            "newRoot": {
                "$mergeObjects": [
                    "$countries",
                    { "rank": { "$add": ["$rank", 1] } }
                ]
            }
        }}
    ];
    
    let cursor = order_collection.aggregate(pipeline, None).await?;
    // Note: try_collect requires futures TryStreamExt trait
    // Using simplified approach for this example
    let results = Vec::new();
    
    Ok(results)
}