Databricks 数据工程师面试实录 2026:Spark + Delta Lake + Lakehouse 深度面
Databricks Data Engineer 面试真实经历:PySpark、Delta Lake、Lakehouse 架构、System Design 完整复盘。第一人称真实面经,含面试官对话与解题思路。
公司:Databricks 岗位:Data Engineer II 面试形式:Phone Screen + Virtual Onsite (4 轮) 结果:Pass → Offer
2026 年 7 月,我参加了 Databricks 的 Data Engineer 面试。作为 Lakehouse 架构的缔造者,Databricks 的面试自然围绕 Spark、Delta Lake 和 Lakehouse 架构展开。面试官都是 Spark 核心开发团队或资深数据工程师,对分布式计算和 Delta Lake internals 的理解要求非常高。
Phone Screen:Spark SQL + PySpark
电话面由一位 Staff Engineer 进行,45 分钟,纯 Spark/PySpark 编码。
题目:DataFrame 操作优化
给定以下 PySpark 代码,请指出问题并优化:
# 原始代码
df = spark.read.parquet("dbfs:/data/events/")
result = df \
.filter(df.event_type == "click") \
.withColumn("hour", F.hour(df.event_time)) \
.groupBy(df.user_id, "hour") \
.agg(
F.count("*").alias("click_count"),
F.avg(df.duration).alias("avg_duration")
) \
.join(
spark.read.parquet("dbfs:/data/users/"),
on="user_id",
how="left"
) \
.orderBy(F.desc("click_count"))
我指出的问题:
- 重复读取:
users表每次执行都会重新读取 - Shuffle 过多:
groupBy和join都会触发 shuffle - 全局排序:
orderBy触发单点排序,容易 OOM - 没有缓存:如果
df被多次使用,没有缓存
优化后的代码:
from pyspark.sql import SparkSession, functions as F
from delta import configure_spark_with_delta_pip
# 构建优化后的 Spark Session
builder = (SparkSession.builder
.appName("ClickAnalytics")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.adaptive.enabled", "true") # AQE
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.sql.adaptive.skewJoin.enabled", "true")
.config("spark.sql.shuffle.partitions", "200")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# 1. 读取并缓存
events_df = spark.read.format("delta").load("dbfs:/data/events/").cache()
users_df = spark.read.format("delta").load("dbfs:/data/users/").cache()
# 触发缓存
events_df.count()
users_df.count()
# 2. 优化查询
result = (events_df
.filter(F.col("event_type") == "click")
.withColumn("hour", F.hour(F.col("event_time")))
.groupBy("user_id", "hour")
.agg(
F.count("*").alias("click_count"),
F.avg("duration").alias("avg_duration")
)
.join(users_df, on="user_id", how="left")
.sortWithinPartitions(F.desc("click_count")) # 局部排序
)
# 如果需要全局 Top N,先局部排序再全局聚合
top_100 = (result
.withColumn("rank", F.row_number().over(
F.Window.orderBy(F.desc("click_count"))
))
.filter(F.col("rank") <= 100))
# 3. 写入 Delta 表
result.write.format("delta") \
.mode("overwrite") \
.option("mergeSchema", "true") \
.save("dbfs:/data/outputs/click_summary/")
面试官追问:
“AQE(Adaptive Query Execution)具体做了什么优化?”
我回答:
- 动态合并分区:小文件自动合并,减少小文件开销
- 动态优化 Shuffle:根据实际数据大小调整 shuffle partition 数量
- 动态处理倾斜:自动检测倾斜 key,使用加盐策略分散
# AQE 配置详解
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", 4)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5.0)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
VO Round 1:PySpark 深度 + 性能调优
这一轮由一位 Spark 核心团队成员进行,60 分钟。
题目 1:处理数据倾斜
以下代码在运行时报 OOM,请分析原因并修复:
# 问题代码
user_events = spark.read.parquet("dbfs:/data/events/")
user_profiles = spark.read.parquet("dbfs:/data/profiles/")
result = user_events.join(user_profiles, on="user_id", how="left")
原因分析:
某些 user_id(如匿名用户、测试用户)的数据量极大,导致单个 partition 数据倾斜。
修复方案:
# 方案 1: 广播小表(如果 profiles 表较小)
from pyspark.sql.functions import broadcast
result = user_events.join(
broadcast(user_profiles),
on="user_id",
how="left"
)
# 方案 2: 加盐值分散热点 key
def add_salt(df, salt_col, num_salts=100):
"""为 DataFrame 添加盐值列"""
return df.withColumn(salt_col, (F.hash(F.col("user_id")) % num_salts))
def remove_salt(df, salt_col):
"""移除盐值列"""
return df.drop(salt_col)
# 为两个表都加盐
salted_events = add_salt(user_events, "event_salt", 100)
salted_profiles = add_salt(user_profiles, "profile_salt", 100)
# 在 salt 列上也做 join
result = (salted_events
.join(
salted_profiles,
on=["user_id", F.col("event_salt") == F.col("profile_salt")],
how="left"
)
.drop("profile_salt")
.groupBy("user_id") # 重新聚合
.agg(F.first(F.col("column_name")).alias("column_name")) # 取第一个非空值
)
# 方案 3: 使用 AQE 自动处理倾斜
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
result = user_events.join(user_profiles, on="user_id", how="left")
题目 2:UDF 性能优化
以下 UDF 运行很慢,请优化:
# 原始 UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def parse_user_agent(ua_string):
"""解析 User-Agent 字符串"""
if "Chrome" in ua_string:
return "Chrome"
elif "Firefox" in ua_string:
return "Firefox"
elif "Safari" in ua_string:
return "Safari"
else:
return "Other"
parse_ua_udf = udf(parse_user_agent, StringType())
df.withColumn("browser", parse_ua_udf(df.user_agent))
优化方案:
# 方案 1: 使用内置函数替代 UDF(推荐)
df.withColumn(
"browser",
F.when(F.col("user_agent").contains("Chrome"), "Chrome")
.when(F.col("user_agent").contains("Firefox"), "Firefox")
.when(F.col("user_agent").contains("Safari"), "Safari")
.otherwise("Other")
)
# 方案 2: 使用 Pandas UDF(如果逻辑复杂)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def parse_ua_pandas(ua_series: pd.Series) -> pd.Series:
"""Pandas UDF - 向量化操作"""
result = ua_series.apply(lambda x: _parse_ua_fast(x))
return result
def _parse_ua_fast(ua):
if "Chrome" in ua:
return "Chrome"
elif "Firefox" in ua:
return "Firefox"
elif "Safari" in ua:
return "Safari"
return "Other"
# 方案 3: 使用 Regex Extract
df.withColumn(
"browser",
F.regexp_extract(F.col("user_agent"), r"(Chrome|Firefox|Safari|Edge)", 1)
)
性能对比:
| 方法 | 性能 | 原因 |
|---|---|---|
| Python UDF | 1x (基准) | JVM ↔ Python 序列化开销 |
| Pandas UDF | 5-10x | 向量化操作,减少序列化 |
| 内置函数 | 10-100x | Catalyst 优化器优化 |
VO Round 2:Delta Lake 深度
这一轮由一位 Delta Lake 团队成员进行,60 分钟。
题目:Delta Lake 特性实战
- 解释 Delta Lake 的 ACID 保证如何实现
- 演示 Time Travel 和 MERGE 操作
- 解释 Z-Ordering 和 OPTIMIZE
我的回答:
Delta Lake 事务日志
┌─────────────────────────────────────────────────────────┐
│ Delta Transaction Log │
│ │
│ _delta_log/ │
│ ├── 00000000000000000000.json (version 0) │
│ ├── 00000000000000000001.json (version 1) │
│ ├── 00000000000000000002.json (version 2) │
│ └── ... │
│ │
│ Each JSON contains: │
│ - add/remove file operations │
│ - metadata changes │
│ - commit timestamp │
│ │
│ Checkpoint: │
│ ├── 00000000000000000010.checkpoint.parquet │
│ └── (compacted state for faster recovery) │
└─────────────────────────────────────────────────────────┘
Time Travel 示例
# 写入 Delta 表
df.write.format("delta").mode("append").save("dbfs:/data/events/")
# 查询当前版本
spark.read.format("delta").load("dbfs:/data/events/").show()
# 查询历史版本(按版本号)
spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("dbfs:/data/events/") \
.show()
# 查询历史版本(按时间点)
spark.read.format("delta") \
.option("timestampAsOf", "2026-09-07 12:00:00") \
.load("dbfs:/data/events/") \
.show()
# 查看 Delta 表历史
df_history = spark.read.format("delta").load("dbfs:/data/events/")
df_history.history().show()
# 恢复到指定版本
spark.sql("RESTORE TABLE events TO VERSION AS OF 5")
MERGE (UPSERT) 操作
from delta.tables import DeltaTable
# 加载 Delta 表
delta_table = DeltaTable.forPath(spark, "dbfs:/data/users/")
# 准备更新数据
updates_df = spark.read.json("dbfs:/data/user_updates/")
# MERGE 操作
(delta_table
.alias("target")
.merge(
updates_df.alias("source"),
"target.user_id = source.user_id"
)
.whenMatchedUpdate(set={
"user_name": "source.user_name",
"email": "source.email",
"updated_at": F.current_timestamp()
})
.whenNotMatchedInsert(values={
"user_id": "source.user_id",
"user_name": "source.user_name",
"email": "source.email",
"created_at": F.current_timestamp(),
"updated_at": F.current_timestamp()
})
.execute()
)
# 条件 MERGE(只更新特定条件)
(delta_table
.alias("target")
.merge(
updates_df.alias("source"),
"target.user_id = source.user_id AND target.status = 'active'"
)
.whenMatchedUpdate(set={
"last_login": "source.last_login"
})
.whenNotMatchedInsert(values={
"user_id": "source.user_id",
"status": "active",
"last_login": "source.last_login"
})
.execute()
)
OPTIMIZE 和 Z-Ordering
# OPTIMIZE: 合并小文件
(delta_table
.optimize()
.executeCompaction()
)
# OPTIMIZE + Z-Ordering: 对常用查询列进行 z-order
(delta_table
.optimize()
.zOrderBy("user_id", "event_type")
.executeCompaction()
)
# 查看优化效果
spark.sql("DESCRIBE DETAIL events").show()
Z-Ordering 原理:
传统分区: Z-Ordering:
┌─────────┐ ┌─────────┐
│ user=1 │ │ user=1 │
│ user=1 │ → │ user=1 │
│ user=2 │ │ user=2 │
│ user=2 │ │ user=2 │
└─────────┘ └─────────┘
查询 user=1 时:
- 传统: 扫描整个文件
- Z-Order: 只扫描相关文件块
Schema Evolution
# 自动 schema evolution
df_new_schema.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save("dbfs:/data/events/")
# 手动 schema evolution
delta_table = DeltaTable.forPath(spark, "dbfs:/data/events/")
delta_table.alias("target") \
.merge(new_df.alias("source"), "1=1") \
.whenNotMatchedInsertAll() \
.execute()
# 查看 schema 变更历史
spark.sql("DESCRIBE HISTORY events").show()
VO Round 3:System Design — Lakehouse 架构
这一轮由一位 Principal Engineer 进行,60 分钟。
题目:设计 Lakehouse 数据平台
设计一个 Lakehouse 平台,支持:
- 批流统一处理
- 数据科学和 BI 分析
- ML 特征存储
- 数据治理
我的架构设计:
┌─────────────────────────────────────────────────────────────────┐
│ Data Sources │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ Databases│ │ SaaS │ │ IoT │ │ Social Media │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └──────┬───────┘ │
└───────┼──────────────┼──────────────┼──────────────┼────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Ingestion Layer │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Databricks Auto Loader (cloudFiles) │ │
│ │ - Incremental loading from cloud storage │ │
│ │ - Schema inference and evolution │ │
│ │ - Support for JSON, Parquet, CSV, AVRO │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Bronze Layer (Raw) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Delta Tables: raw_events, raw_users, raw_products │ │
│ │ - Append-only │ │
│ │ - Minimal transformation │ │
│ │ - Partitioned by date │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Silver Layer (Cleaned) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Delta Tables: cleaned_events, dim_users, dim_products │ │
│ │ - Deduplicated │ │
│ │ - Validated and cleaned │ │
│ │ - SCD Type 2 for dimensions │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Gold Layer (Aggregated) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Delta Tables: daily_stats, user_metrics, product_metrics │ │
│ │ - Aggregated for BI dashboards │ │
│ │ - Optimized with Z-Ordering │ │
│ │ - Materialized views for common queries │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Consumption Layer │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ BI Tools │ │ ML │ │ Real-time│ │ Data Apps │ │
│ │ (Tableau)│ │ (MLflow) │ │ (Delta )│ │ (SQL Ware- │ │
│ │ │ │ │ │ Stream )│ │ house) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
关键技术决策:
| 组件 | 选择 | 原因 |
|---|---|---|
| 存储 | Delta Lake | ACID、Time Travel、Schema Evolution |
| 批处理 | Spark Batch | 成熟稳定,生态丰富 |
| 流处理 | Structured Streaming | 与批处理统一 API |
| ML | MLflow | 实验追踪、模型注册 |
| 调度 | Databricks Workflows | 原生集成,自动缩放 |
| 治理 | Unity Catalog | 统一元数据、权限管理 |
Auto Loader 示例:
# 增量加载
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "dbfs:/schemas/events/")
.option("cloudFiles.inferColumnTypes", "true")
.load("abfss://container@storage.dfs.core.windows.net/raw/events/")
)
# 写入 Bronze 层
(df.writeStream
.format("delta")
.option("checkpointLocation", "dbfs:/checkpoints/events/")
.trigger(processingTime="1 minute")
.start("dbfs:/lakehouse/bronze/events/")
)
Unity Catalog 权限管理:
-- 创建 Catalog 和 Schema
CREATE CATALOG IF NOT EXISTS prod;
USE CATALOG prod;
CREATE SCHEMA IF NOT EXISTS lakehouse;
-- 创建表
CREATE TABLE IF NOT EXISTS lakehouse.events (
event_id STRING,
user_id STRING,
event_type STRING,
event_time TIMESTAMP
);
-- 授予权限
GRANT SELECT ON TABLE lakehouse.events TO `data_analysts`;
GRANT MODIFY ON TABLE lakehouse.events TO `data_engineers`;
GRANT ALL PRIVILEGES ON SCHEMA lakehouse TO `admin`;
-- 列级权限
GRANT SELECT ON COLUMN lakehouse.events.user_id TO `data_analysts`;
-- 不授予敏感列的权限
VO Round 4:Behavioral / Culture Fit
最后一轮由 Hiring Manager 进行,60 分钟。
典型问题
Q1: Why Databricks? Why Lakehouse?
我回答:
- Lakehouse 是未来:结合了 Data Warehouse 的管理性和 Data Lake 的灵活性
- 开源生态:Delta Lake 是开源的,社区活跃
- 技术挑战:在 Spark 核心团队工作,能接触到最前沿的技术
Q2: Describe a time you had to learn a new technology quickly.
我分享了学习 Delta Lake 的经历:
- 官方文档:通读 Delta Lake 文档,理解核心概念
- 动手实践:在个人项目中替换 Parquet 为 Delta
- 深入源码:阅读 Delta 事务日志的实现
- 社区参与:参加 Databricks Community 活动
面试总结
成功经验
- Spark 底层原理:理解 Catalyst Optimizer、Tungsten Execution、AQE
- Delta Lake 特性:ACID、Time Travel、MERGE、Optimize、Z-Ordering
- Lakehouse 架构:Bronze/Silver/Gold 分层设计
- 性能调优:数据倾斜、UDF 优化、分区策略
面试注意事项
技术深度:Databricks 面试官对 Spark 和 Delta Lake 的理解非常深,要准备底层原理。
实战经验:多分享实际项目经验,特别是性能优化和故障排查的经历。
架构思维:System Design 考察的是完整的 Lakehouse 平台设计能力。
推荐阅读
- Spark 性能优化完全指南 — AQE、数据倾斜、UDF 优化
- Delta Lake 最佳实践 — Time Travel、MERGE、Optimize
- Lakehouse 架构设计 — 分层设计、治理、ML 集成
💡 需要面试辅导?
如果你对准备技术面试感到迷茫,或者想要个性化的面试指导和简历优化,欢迎联系 Interview Coach Pro 获取一对一辅导服务。
👉 联系我们 获取专属面试准备方案