mongodbdata-engineerinterviewchange-streamsetldata-warehousesparkaggregation

MongoDB 数据工程师面试实录 2026:Change Streams + 数据湖 ETL 管道 完整复盘

MongoDB Data Engineer 面试真实经历:MongoDB Change Streams 实时 ETL、文档数据库到数据仓库迁移、聚合管道优化、数据湖架构设计完整复盘。第一人称面经,含面试官对话与解题思路。

Sam · · 16 分钟阅读

公司:MongoDB 岗位:Data Engineer (L4) 面试形式:Phone Screen + Virtual Onsite (4 轮) 结果:Pass → Offer

2026 年 4 月通过内推投递了 MongoDB 的 Data Engineer 岗位。整个流程大约 3 周。

MongoDB 的 DE 面试最独特的地方:从文档数据库到数据仓库的 ETL 是核心主题。 作为 NoSQL 数据库公司,MongoDB 需要帮客户解决”如何把 MongoDB 的数据同步到数据仓库/数据湖”这个问题。面试中大量考察 Change Streams、聚合管道和数据模型转换。


Phone Screen:MongoDB 聚合管道

题目:使用聚合管道分析用户行为

给定 MongoDB 中的用户行为集合,使用聚合管道分析用户行为模式。

// 用户行为集合
// {
//   _id: ObjectId,
//   userId: "user_123",
//   eventType: "page_view" | "click" | "purchase",
//   timestamp: ISODate,
//   metadata: { page: "/products", product: "123", price: 99.99 }
// }

// 我的解答:分析每日用户行为统计
db.userEvents.aggregate([
  // 步骤 1: 过滤最近 30 天的数据
  {
    $match: {
      timestamp: {
        $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
      }
    }
  },
  
  // 步骤 2: 提取日期
  {
    $addFields: {
      date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
    }
  },
  
  // 步骤 3: 按日期和事件类型分组
  {
    $group: {
      _id: {
        date: "$date",
        eventType: "$eventType"
      },
      count: { $sum: 1 },
      uniqueUsers: { $addToSet: "$userId" },
      avgPrice: {
        $avg: { $ifNull: ["$metadata.price", 0] }
      }
    }
  },
  
  // 步骤 4: 计算唯一用户数
  {
    $addFields: {
      uniqueUserCount: { $size: "$uniqueUsers" }
    }
  },
  
  // 步骤 5: 排序
  {
    $sort: { "_id.date": -1, "_id.eventType": 1 }
  },
  
  // 步骤 6: 格式化输出
  {
    $project: {
      _id: 0,
      date: "$_id.date",
      eventType: "$_id.eventType",
      count: 1,
      uniqueUserCount: 1,
      avgPrice: { $round: ["$avgPrice", 2] }
    }
  }
])

面试官追问:

“如果这个聚合管道跑了 5 分钟还没结果,怎么优化?”

我回答:

// 优化 1: 确保有合适的索引
db.userEvents.createIndex({ timestamp: -1 })
db.userEvents.createIndex({ eventType: 1, timestamp: -1 })

// 优化 2: 使用 $facets 做多维度并行聚合
db.userEvents.aggregate([
  {
    $match: {
      timestamp: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
    }
  },
  {
    $facets: [
      // 维度 1: 按事件类型
      {
        eventType: [
          { $group: { _id: "$eventType", count: { $sum: 1 } } }
        ]
      },
      // 维度 2: 按日期
      {
        daily: [
          { $addFields: { date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } } } },
          { $group: { _id: "$date", count: { $sum: 1 } } }
        ]
      }
    ]
  }
])

// 优化 3: 使用物化视图
// 预先计算并存储聚合结果
db.dailyStats.insertOne({
  date: "2026-05-27",
  stats: {
    page_view: { count: 150000, uniqueUsers: 45000 },
    click: { count: 80000, uniqueUsers: 30000 },
    purchase: { count: 5000, uniqueUsers: 4800 }
  }
})

// 定时任务更新物化视图
// 每 5 分钟运行一次

VO Round 1:MongoDB Change Streams 实时 ETL

题目:使用 Change Streams 实现 MongoDB 到数据仓库的实时同步

MongoDB 的 Change Streams 可以监听集合的变更事件,设计一个实时 ETL 管道。

from pymongo import MongoClient
from pymongo.change_stream import ChangeStream
from kafka import KafkaProducer
import json

# ====== MongoDB Change Streams 实时 ETL ======

# 1. 连接 MongoDB
client = MongoClient('mongodb://user:pass@mongo-primary:27017,mongo-secondary:27017/?replicaSet=rs0')
db = client['ecommerce']
collection = db['orders']

# 2. 创建 Change Stream
pipeline = [
    {
        $match: {
            $or: [
                { operationType: "insert" },
                { operationType: "update" },
                { operationType: "delete" }
            ]
        }
    }
]

change_stream = collection.watch(pipeline)

# 3. 连接 Kafka
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 4. 处理变更事件
for change in change_stream:
    try:
        operation = change['operationType']
        document_key = change.get('documentKey', {})
        full_document = change.get('fullDocument', {})
        
        # 构造 ETL 消息
        etl_message = {
            'operation': operation,
            'collection': 'orders',
            'document_id': str(document_key.get('_id', '')),
            'data': full_document if operation != 'delete' else None,
            'timestamp': change['clusterTime'].isoformat()
        }
        
        # 发送到 Kafka
        producer.send(
            'mongodb.orders.changes',
            key=str(document_key.get('_id', '')).encode(),
            value=etl_message
        )
        
        print(f"Processed {operation} for {document_key}")
        
    except Exception as e:
        print(f"Error processing change: {e}")
        # 写入死信队列
        producer.send('mongodb.changes.dlq', value=str(change))

producer.flush()

VO Round 2:系统设计 — 文档数据库到数据仓库 ETL

题目:设计 MongoDB 到数据仓库的 ETL 管道

MongoDB 的数据是半结构化的文档,需要设计一个 ETL 管道将其转换为关系型数据仓库的格式。

我的架构设计:

┌──────────────────────────────────────────────────────────────┐
│                    MongoDB ETL Pipeline                       │
│                                                               │
│  ┌────────────────────────────────────────────────────────┐  │
│  │                  MongoDB Cluster                       │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐             │  │
│  │  │ orders   │  │ users    │  │ products │             │  │
│  │  │          │  │          │  │          │             │  │
│  │  └────┬─────┘  └────┬─────┘  └────┬─────┘             │  │
│  │       │              │              │                   │  │
│  │       ▼              ▼              ▼                  │  │
│  │  ┌──────────────────────────────────────────────────┐  │  │
│  │  │  Change Streams (Real-time CDC)                   │  │  │
│  │  │  - insert     │  │  │  │  │  │  │  │  │  │  │  │  │  │  │
│  │  │  - update     │  │  │  │  │  │  │  │  │  │  │  │  │  │  │
│  │  │  - delete     │  │  │  │  │  │  │  │  │  │  │  │  │  │  │
│  │  └──────────────────────┬───────────────────────────┘  │  │
│  └─────────────────────────┼──────────────────────────────┘  │
│                            │                                 │
│                            ▼                                 │
│  ┌────────────────────────────────────────────────────────┐  │
│  │              Kafka (Event Buffer)                        │  │
│  │  ┌──────────────────────────────────────────────────┐  │  │
│  │  │ topic: mongodb.changes.orders                     │  │  │
│  │  │ topic: mongodb.changes.users                      │  │  │
│  │  │ topic: mongodb.changes.products                   │  │  │
│  │  └──────────────────────────────────────────────────┘  │  │
│  └──────────────────────────┬─────────────────────────────┘  │
│                             │                                │
│                             ▼                                │
│  ┌────────────────────────────────────────────────────────┐  │
│  │              Spark Streaming (Transformation)            │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐             │  │
│  │  │ Flatten  │  │ Type     │  │ Schema   │             │  │
│  │  │ (BSON →  │  │ Cast     │  │ Registry │             │  │
│  │  │  JSON)   │  │          │  │          │             │  │
│  │  └──────────┘  └──────────┘  └──────────┘             │  │
│  └──────────┬────────────────────────────────────────────┘  │
│             │                                               │
│             ▼                                               │
│  ┌────────────────────────────────────────────────────────┐  │
│  │              Data Warehouse (Snowflake/Redshift)        │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐             │  │
│  │  │ orders   │  │ users    │  │ products │             │  │
│  │  │ (star    │  │ (dim)    │  │ (dim)    │             │  │
│  │  │  schema) │  │          │  │          │             │  │
│  │  └──────────┘  └──────────┘  └──────────┘             │  │
│  └────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘

文档到关系型的转换:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, get_json_object

spark = SparkSession.builder \
    .appName("MongoDB_to_DW_ETL") \
    .getOrCreate()

# ====== MongoDB 到关系型的转换 ======

# 1. 从 MongoDB 读取数据
orders_df = spark.read \
    .format("mongodb") \
    .option("uri", "mongodb://user:pass@mongo:27017") \
    .option("database", "ecommerce") \
    .option("collection", "orders") \
    .load()

# 2. 扁平化嵌套文档
# MongoDB 文档:
# {
#   _id: ObjectId,
#   userId: "user_123",
#   items: [
#     { productId: "prod_1", quantity: 2, price: 99.99 },
#     { productId: "prod_2", quantity: 1, price: 49.99 }
#   ],
#   shipping: { address: "123 Main St", city: "San Francisco", state: "CA" },
#   total: 249.97,
#   status: "completed",
#   createdAt: ISODate
# }

# 展平 shipping 对象
orders_flat = orders_df \
    .withColumn("shipping_address", get_json_object(col("shipping"), "$.address")) \
    .withColumn("shipping_city", get_json_object(col("shipping"), "$.city")) \
    .withColumn("shipping_state", get_json_object(col("shipping"), "$.state")) \
    .drop("shipping")

# 3. 展开 items 数组(一行变多行)
order_items = orders_flat \
    .withColumn("item", explode(col("items"))) \
    .withColumn("product_id", get_json_object(col("item"), "$.productId")) \
    .withColumn("quantity", get_json_object(col("item"), "$.quantity")) \
    .withColumn("item_price", get_json_object(col("item"), "$.price")) \
    .drop("items", "item")

# 4. 类型转换
order_items_typed = order_items \
    .withColumn("quantity", col("quantity").cast("int")) \
    .withColumn("item_price", col("item_price").cast("double")) \
    .withColumn("total", col("total").cast("double")) \
    .withColumn("created_at", col("createdAt").cast("timestamp"))

# 5. 写入数据仓库
order_items_typed.write \
    .format("jdbc") \
    .option("url", "jdbc:snowflake://account.snowflakecomputing.com") \
    .option("dbtable", "orders.order_items") \
    .option("user", "user") \
    .option("password", "pass") \
    .mode("append") \
    .save()

面试总结

成功经验

  1. Change Streams:理解 MongoDB Change Streams 的原理和使用场景
  2. 聚合管道优化:知道如何优化 MongoDB 聚合管道的性能
  3. 文档到关系型转换:理解如何把半结构化数据转换为关系型格式

注意事项

  1. MongoDB 深度:不是 CRUD 层面的使用,而是理解 Change Streams、聚合管道等高级功能
  2. ETL 设计:理解 CDC、Change Data Capture 的概念
  3. 数据模型转换:知道如何处理嵌套文档、数组、类型转换

推荐阅读


💡 需要面试辅导?

联系我们

准备好拿下下一次面试了吗?

获取针对你的目标岗位和公司的个性化辅导方案。

联系我们