mongodbdata-engineerinterviewchange-streamsetldata-warehousesparkaggregation
MongoDB 数据工程师面试实录 2026:Change Streams + 数据湖 ETL 管道 完整复盘
MongoDB Data Engineer 面试真实经历:MongoDB Change Streams 实时 ETL、文档数据库到数据仓库迁移、聚合管道优化、数据湖架构设计完整复盘。第一人称面经,含面试官对话与解题思路。
Sam · · 16 分钟阅读
公司:MongoDB 岗位:Data Engineer (L4) 面试形式:Phone Screen + Virtual Onsite (4 轮) 结果:Pass → Offer
2026 年 4 月通过内推投递了 MongoDB 的 Data Engineer 岗位。整个流程大约 3 周。
MongoDB 的 DE 面试最独特的地方:从文档数据库到数据仓库的 ETL 是核心主题。 作为 NoSQL 数据库公司,MongoDB 需要帮客户解决”如何把 MongoDB 的数据同步到数据仓库/数据湖”这个问题。面试中大量考察 Change Streams、聚合管道和数据模型转换。
Phone Screen:MongoDB 聚合管道
题目:使用聚合管道分析用户行为
给定 MongoDB 中的用户行为集合,使用聚合管道分析用户行为模式。
// 用户行为集合
// {
// _id: ObjectId,
// userId: "user_123",
// eventType: "page_view" | "click" | "purchase",
// timestamp: ISODate,
// metadata: { page: "/products", product: "123", price: 99.99 }
// }
// 我的解答:分析每日用户行为统计
db.userEvents.aggregate([
// 步骤 1: 过滤最近 30 天的数据
{
$match: {
timestamp: {
$gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
}
}
},
// 步骤 2: 提取日期
{
$addFields: {
date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
}
},
// 步骤 3: 按日期和事件类型分组
{
$group: {
_id: {
date: "$date",
eventType: "$eventType"
},
count: { $sum: 1 },
uniqueUsers: { $addToSet: "$userId" },
avgPrice: {
$avg: { $ifNull: ["$metadata.price", 0] }
}
}
},
// 步骤 4: 计算唯一用户数
{
$addFields: {
uniqueUserCount: { $size: "$uniqueUsers" }
}
},
// 步骤 5: 排序
{
$sort: { "_id.date": -1, "_id.eventType": 1 }
},
// 步骤 6: 格式化输出
{
$project: {
_id: 0,
date: "$_id.date",
eventType: "$_id.eventType",
count: 1,
uniqueUserCount: 1,
avgPrice: { $round: ["$avgPrice", 2] }
}
}
])
面试官追问:
“如果这个聚合管道跑了 5 分钟还没结果,怎么优化?”
我回答:
// 优化 1: 确保有合适的索引
db.userEvents.createIndex({ timestamp: -1 })
db.userEvents.createIndex({ eventType: 1, timestamp: -1 })
// 优化 2: 使用 $facets 做多维度并行聚合
db.userEvents.aggregate([
{
$match: {
timestamp: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
}
},
{
$facets: [
// 维度 1: 按事件类型
{
eventType: [
{ $group: { _id: "$eventType", count: { $sum: 1 } } }
]
},
// 维度 2: 按日期
{
daily: [
{ $addFields: { date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } } } },
{ $group: { _id: "$date", count: { $sum: 1 } } }
]
}
]
}
])
// 优化 3: 使用物化视图
// 预先计算并存储聚合结果
db.dailyStats.insertOne({
date: "2026-05-27",
stats: {
page_view: { count: 150000, uniqueUsers: 45000 },
click: { count: 80000, uniqueUsers: 30000 },
purchase: { count: 5000, uniqueUsers: 4800 }
}
})
// 定时任务更新物化视图
// 每 5 分钟运行一次
VO Round 1:MongoDB Change Streams 实时 ETL
题目:使用 Change Streams 实现 MongoDB 到数据仓库的实时同步
MongoDB 的 Change Streams 可以监听集合的变更事件,设计一个实时 ETL 管道。
from pymongo import MongoClient
from pymongo.change_stream import ChangeStream
from kafka import KafkaProducer
import json
# ====== MongoDB Change Streams 实时 ETL ======
# 1. 连接 MongoDB
client = MongoClient('mongodb://user:pass@mongo-primary:27017,mongo-secondary:27017/?replicaSet=rs0')
db = client['ecommerce']
collection = db['orders']
# 2. 创建 Change Stream
pipeline = [
{
$match: {
$or: [
{ operationType: "insert" },
{ operationType: "update" },
{ operationType: "delete" }
]
}
}
]
change_stream = collection.watch(pipeline)
# 3. 连接 Kafka
producer = KafkaProducer(
bootstrap_servers='kafka:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# 4. 处理变更事件
for change in change_stream:
try:
operation = change['operationType']
document_key = change.get('documentKey', {})
full_document = change.get('fullDocument', {})
# 构造 ETL 消息
etl_message = {
'operation': operation,
'collection': 'orders',
'document_id': str(document_key.get('_id', '')),
'data': full_document if operation != 'delete' else None,
'timestamp': change['clusterTime'].isoformat()
}
# 发送到 Kafka
producer.send(
'mongodb.orders.changes',
key=str(document_key.get('_id', '')).encode(),
value=etl_message
)
print(f"Processed {operation} for {document_key}")
except Exception as e:
print(f"Error processing change: {e}")
# 写入死信队列
producer.send('mongodb.changes.dlq', value=str(change))
producer.flush()
VO Round 2:系统设计 — 文档数据库到数据仓库 ETL
题目:设计 MongoDB 到数据仓库的 ETL 管道
MongoDB 的数据是半结构化的文档,需要设计一个 ETL 管道将其转换为关系型数据仓库的格式。
我的架构设计:
┌──────────────────────────────────────────────────────────────┐
│ MongoDB ETL Pipeline │
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ MongoDB Cluster │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ orders │ │ users │ │ products │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌──────────────────────────────────────────────────┐ │ │
│ │ │ Change Streams (Real-time CDC) │ │ │
│ │ │ - insert │ │ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ │ - update │ │ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ │ - delete │ │ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ └──────────────────────┬───────────────────────────┘ │ │
│ └─────────────────────────┼──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Kafka (Event Buffer) │ │
│ │ ┌──────────────────────────────────────────────────┐ │ │
│ │ │ topic: mongodb.changes.orders │ │ │
│ │ │ topic: mongodb.changes.users │ │ │
│ │ │ topic: mongodb.changes.products │ │ │
│ │ └──────────────────────────────────────────────────┘ │ │
│ └──────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Spark Streaming (Transformation) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Flatten │ │ Type │ │ Schema │ │ │
│ │ │ (BSON → │ │ Cast │ │ Registry │ │ │
│ │ │ JSON) │ │ │ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────┬────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Data Warehouse (Snowflake/Redshift) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ orders │ │ users │ │ products │ │ │
│ │ │ (star │ │ (dim) │ │ (dim) │ │ │
│ │ │ schema) │ │ │ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
文档到关系型的转换:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, get_json_object
spark = SparkSession.builder \
.appName("MongoDB_to_DW_ETL") \
.getOrCreate()
# ====== MongoDB 到关系型的转换 ======
# 1. 从 MongoDB 读取数据
orders_df = spark.read \
.format("mongodb") \
.option("uri", "mongodb://user:pass@mongo:27017") \
.option("database", "ecommerce") \
.option("collection", "orders") \
.load()
# 2. 扁平化嵌套文档
# MongoDB 文档:
# {
# _id: ObjectId,
# userId: "user_123",
# items: [
# { productId: "prod_1", quantity: 2, price: 99.99 },
# { productId: "prod_2", quantity: 1, price: 49.99 }
# ],
# shipping: { address: "123 Main St", city: "San Francisco", state: "CA" },
# total: 249.97,
# status: "completed",
# createdAt: ISODate
# }
# 展平 shipping 对象
orders_flat = orders_df \
.withColumn("shipping_address", get_json_object(col("shipping"), "$.address")) \
.withColumn("shipping_city", get_json_object(col("shipping"), "$.city")) \
.withColumn("shipping_state", get_json_object(col("shipping"), "$.state")) \
.drop("shipping")
# 3. 展开 items 数组(一行变多行)
order_items = orders_flat \
.withColumn("item", explode(col("items"))) \
.withColumn("product_id", get_json_object(col("item"), "$.productId")) \
.withColumn("quantity", get_json_object(col("item"), "$.quantity")) \
.withColumn("item_price", get_json_object(col("item"), "$.price")) \
.drop("items", "item")
# 4. 类型转换
order_items_typed = order_items \
.withColumn("quantity", col("quantity").cast("int")) \
.withColumn("item_price", col("item_price").cast("double")) \
.withColumn("total", col("total").cast("double")) \
.withColumn("created_at", col("createdAt").cast("timestamp"))
# 5. 写入数据仓库
order_items_typed.write \
.format("jdbc") \
.option("url", "jdbc:snowflake://account.snowflakecomputing.com") \
.option("dbtable", "orders.order_items") \
.option("user", "user") \
.option("password", "pass") \
.mode("append") \
.save()
面试总结
成功经验
- Change Streams:理解 MongoDB Change Streams 的原理和使用场景
- 聚合管道优化:知道如何优化 MongoDB 聚合管道的性能
- 文档到关系型转换:理解如何把半结构化数据转换为关系型格式
注意事项
- MongoDB 深度:不是 CRUD 层面的使用,而是理解 Change Streams、聚合管道等高级功能
- ETL 设计:理解 CDC、Change Data Capture 的概念
- 数据模型转换:知道如何处理嵌套文档、数组、类型转换