Uber Data Engineer Interview Experience 2026: Kafka + Spark + Real-Time Dispatch Data Pipelines, In Depth
A first-person account of a real Uber Data Engineer interview: Kafka stream processing, Spark Streaming, real-time dispatch data pipelines, and the System Design round, with interviewer dialogue and solution walkthroughs.
Company: Uber Technologies | Role: Data Engineer (L5) | Format: Phone Screen + Virtual Onsite (4 rounds) | Outcome: Pass → Offer
In April 2026 I applied for Uber's Data Engineer role through an internal referral. The whole process took about five weeks: one phone screen plus four virtual onsite rounds.
The biggest difference between Uber's DE interview and other big tech companies is the heavy focus on real-time data flows and stream processing. Uber's core businesses (rides, delivery, freight) are all real-time systems, and the data pipelines have to support millisecond-level decisions such as dynamic pricing, driver dispatch, and ETA prediction. Kafka, Spark Streaming, and Flink came up again and again.
Below is a complete, round-by-round recap in chronological order.
Phone Screen: Kafka + Python Coding
The phone screen was run by a Senior DE at Uber, 45 minutes, focused on Kafka and Python data processing.
Question: real-time order data pipeline
Uber generates hundreds of millions of order events per day: driver accepts a trip, rider pickup, in-trip location updates, rider drop-off, payment completed, and so on.
- Design the Kafka topic architecture that carries these events
- Write Python code that consumes the Kafka events and computes each city's count of in-progress orders in real time
- Discuss how to handle data loss and duplicate consumption
My solution:
Kafka topic design:
┌─────────────────────────────────────────────────────┐
│ Uber Order Event Topics │
├─────────────────────────────────────────────────────┤
│ │
│ Topic: orders.created │
│ Partition key: city_id │
│ Schema: │
│ - order_id: string │
│ - rider_id: string │
│ - city_id: string │
│ - pickup_location: {lat, lng} │
│ - created_at: timestamp │
│ │
│ Topic: orders.accepted │
│ Partition key: order_id │
│ Schema: │
│ - order_id: string │
│ - driver_id: string │
│ - accepted_at: timestamp │
│ │
│ Topic: orders.location_updates │
│ Partition key: order_id │
│ Schema: │
│ - order_id: string │
│ - lat: float │
│ - lng: float │
│ - speed: float │
│ - timestamp: timestamp │
│ │
│ Topic: orders.completed │
│ Partition key: order_id │
│ Schema: │
│ - order_id: string │
│ - city_id: string │
│ - distance: float │
│ - duration: int │
│ - fare: float │
│ - completed_at: timestamp │
│ │
│ Topic: orders.payment │
│ Partition key: order_id │
│ Schema: │
│ - order_id: string │
│ - payment_method: string │
│ - amount: float │
│ - status: string │
│ - paid_at: timestamp │
│ │
└─────────────────────────────────────────────────────┘
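As a side note (not part of my whiteboard answer), producing to orders.created keyed by city_id with kafka-python would look roughly like this; the payload values are made up:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# Events sharing a city_id hash to the same partition, preserving per-city ordering
event = {
    "order_id": "o-123",   # sample values
    "rider_id": "r-456",
    "city_id": "sf",
    "pickup_location": {"lat": 37.7749, "lng": -122.4194},
    "created_at": "2026-04-01T12:00:00Z",
}
producer.send('orders.created', key=event["city_id"], value=event)
producer.flush()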
Python consumer code (real-time per-city active-order counts):
from kafka import KafkaConsumer
from collections import defaultdict
import json
import threading
import time
class RealtimeOrderTracker:
"""实时订单跟踪器"""
def __init__(self, bootstrap_servers='kafka:9092'):
self.consumer = KafkaConsumer(
'orders.created',
'orders.completed',
bootstrap_servers=bootstrap_servers,
group_id='realtime_tracker',
auto_offset_reset='latest',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
enable_auto_commit=True,
auto_commit_interval_ms=5000
)
        # Set of in-progress orders per city
self.active_orders = defaultdict(set) # city_id -> set(order_id)
self.lock = threading.Lock()
self.last_update = time.time()
def process_message(self, message):
"""处理 Kafka 消息"""
topic = message.topic
event = message.value
with self.lock:
if topic == 'orders.created':
city_id = event['city_id']
order_id = event['order_id']
self.active_orders[city_id].add(order_id)
elif topic == 'orders.completed':
city_id = event['city_id']
order_id = event['order_id']
self.active_orders[city_id].discard(order_id)
def get_active_orders(self) -> dict:
"""获取当前各城市的进行中订单数"""
with self.lock:
return {city: len(orders) for city, orders in self.active_orders.items()}
def run(self):
"""启动消费循环"""
print("Starting realtime order tracker...")
for message in self.consumer:
try:
self.process_message(message)
except Exception as e:
print(f"Error processing message: {e}")
            # Print stats once per second
if time.time() - self.last_update >= 1:
active = self.get_active_orders()
print(f"[{time.strftime('%H:%M:%S')}] Active orders by city:")
for city, count in sorted(active.items(), key=lambda x: -x[1])[:10]:
print(f" {city}: {count}")
self.last_update = time.time()
if __name__ == '__main__':
tracker = RealtimeOrderTracker(bootstrap_servers='kafka:9092')
tracker.run()
Interviewer follow-up:
"How do you handle data loss and duplicate consumption?"
I covered Kafka's core mechanisms:
- Exactly-once semantics: enable.idempotence=true plus a transactional producer
- Consumer offset management: commit offsets manually, only after a message has been fully processed (a minimal manual-commit sketch follows this list)
- Idempotent design: downstream consumers must be idempotent, so processing the same order event multiple times gives the same result
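To make the offset-management point concrete, here is a minimal manual-commit sketch with kafka-python (the group id and handler are placeholders I made up):

from kafka import KafkaConsumer

def handle(payload: str) -> None:
    """Placeholder processing step (write to a DB, update an aggregate, etc.)."""
    print(payload)

consumer = KafkaConsumer(
    'orders.created',                  # topic from the design above
    bootstrap_servers='kafka:9092',
    group_id='manual_commit_example',  # placeholder group id
    enable_auto_commit=False,          # disable auto-commit
    value_deserializer=lambda m: m.decode('utf-8'),
)

for message in consumer:
    handle(message.value)
    consumer.commit()  # commit the offset only after processing succeeds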
# Idempotent consumer example
class IdempotentConsumer:
"""幂等消费者 - 基于 order_id 去重"""
def __init__(self):
self.processed_ids = set()
self.max_cache_size = 1_000_000
def should_process(self, order_id: str) -> bool:
"""判断是否应该处理这条消息"""
if order_id in self.processed_ids:
return False
self.processed_ids.add(order_id)
        # Guard against unbounded memory: once the cache hits the cap, drop an
        # arbitrary half of it (a set has no order), relying on downstream idempotency
        if len(self.processed_ids) > self.max_cache_size:
            self.processed_ids = set(list(self.processed_ids)[-500_000:])
return True
def process(self, event: dict) -> bool:
"""处理事件,返回是否成功"""
if not self.should_process(event['order_id']):
return False
        # Actual processing logic
        # ... write to a database / aggregate / send notifications
return True
VO Round 1: Spark Streaming + Data Pipelines
This round was run by a Staff DE, 60 minutes.
Question 1: Spark Structured Streaming — real-time ETA data pipeline
Uber needs to track each city's average ETA (estimated time of arrival) in real time to drive dynamic pricing and driver dispatch. Design a Spark Structured Streaming pipeline.
My solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, window, to_timestamp, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
spark = SparkSession.builder \
.appName("RealtimeETAPipeline") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.getOrCreate()
# Define the event schema
eta_schema = StructType([
StructField("order_id", StringType(), True),
StructField("driver_id", StringType(), True),
StructField("city_id", StringType(), True),
StructField("eta_seconds", IntegerType(), True),
StructField("distance_km", DoubleType(), True),
StructField("timestamp", StringType(), True)
])
# Read from Kafka
eta_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \n .option("subscribe", "orders.eta_updates") \
.option("startingOffsets", "latest") \
.load() \
.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), eta_schema).alias("data")) \
.select("data.*") \
.withColumn("event_time", to_timestamp(col("timestamp")))
# 5-minute windows sliding every 1 minute, results triggered every 10 seconds
windowed_eta = eta_stream \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "5 minutes", "1 minute"),
col("city_id")
) \
.agg(
avg("eta_seconds").alias("avg_eta"),
avg("distance_km").alias("avg_distance"),
count("*").alias("sample_count")
)
# Sink: Parquet on S3 (file sinks require append mode); loaded into BigQuery / Redshift downstream
query = windowed_eta \
.writeStream \
.format("parquet") \
.option("path", "s3://uber-data/eta-streaming/output/") \
.option("checkpointLocation", "s3://uber-data/eta-streaming/checkpoint/") \
.trigger(processingTime="10 seconds") \
.outputMode("update") \
.start()
query.awaitTermination()
Interviewer follow-up:
"If a city's ETA suddenly spikes, how do you tell whether it's a real surge or a data problem?"
I walked through a data-quality monitoring approach:
# Data-quality monitoring: ETA anomaly detection
from pyspark.sql.functions import stddev, mean, when
# Compute a per-city historical baseline
baselines = spark.sql("""
    SELECT
        city_id,
        AVG(eta_seconds)    AS baseline_avg,
        STDDEV(eta_seconds) AS baseline_stddev
    FROM historical_eta_30d
    GROUP BY city_id
""").collect()
baseline_map = {
    row['city_id']: (row['baseline_avg'], row['baseline_stddev'])
    for row in baselines
}
# Real-time anomaly check
def check_eta_anomaly(current_avg, city_id):
    if city_id not in baseline_map:
        return False
    base_avg, base_std = baseline_map[city_id]
    # Alert if the current average exceeds the baseline by 3 standard deviations
    threshold = base_avg + 3 * base_std
    return current_avg > threshold
# Add an anomaly flag to the streaming query
windowed_eta = windowed_eta.withColumn(
"is_anomaly",
when(col("avg_eta") > 300, True) # ETA > 5 分钟视为异常
.otherwise(False)
)
Question 2: Python — trip trajectory processing
Given a driver's GPS trajectory, compute the trip's actual distance traveled and average speed.
from dataclasses import dataclass
from typing import List
import math
@dataclass
class GPSPoint:
lat: float
lng: float
timestamp: float # Unix timestamp
def haversine_distance(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
"""计算两点之间的 Haversine 距离(公里)"""
R = 6371 # 地球半径(公里)
lat1_r, lat2_r = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlng = math.radians(lng2 - lng1)
a = math.sin(dlat / 2) ** 2 + \
math.cos(lat1_r) * math.cos(lat2_r) * math.sin(dlng / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return R * c
def calculate_trip_stats(points: List[GPSPoint]) -> dict:
"""计算行程统计信息"""
if len(points) < 2:
return {
'total_distance_km': 0.0,
'duration_seconds': 0.0,
'avg_speed_kmh': 0.0,
'max_speed_kmh': 0.0,
'point_count': len(points)
}
total_distance = 0.0
max_speed = 0.0
for i in range(1, len(points)):
dist = haversine_distance(
points[i-1].lat, points[i-1].lng,
points[i].lat, points[i].lng
)
total_distance += dist
        dt = (points[i].timestamp - points[i-1].timestamp) / 3600  # hours
if dt > 0:
speed = dist / dt
max_speed = max(max_speed, speed)
duration = points[-1].timestamp - points[0].timestamp
avg_speed = total_distance / (duration / 3600) if duration > 0 else 0.0
return {
'total_distance_km': round(total_distance, 2),
'duration_seconds': round(duration, 1),
'avg_speed_kmh': round(avg_speed, 1),
'max_speed_kmh': round(max_speed, 1),
'point_count': len(points)
}
# Usage example
sample_points = [
GPSPoint(37.7749, -122.4194, 1000),
GPSPoint(37.7751, -122.4190, 1060),
GPSPoint(37.7755, -122.4185, 1120),
GPSPoint(37.7760, -122.4180, 1180),
]
stats = calculate_trip_stats(sample_points)
print(f"距离: {stats['total_distance_km']}km")
print(f"平均速度: {stats['avg_speed_kmh']}km/h")
print(f"最大速度: {stats['max_speed_kmh']}km/h")
Interviewer follow-up:
"What if the GPS data is noisy (drifting points, missing points)? How do you handle that?"
I described a few common GPS-cleaning techniques:
def clean_gps_points(points: List[GPSPoint], max_speed_kmh: float = 200) -> List[GPSPoint]:
"""GPS 数据清洗"""
if len(points) < 3:
return points
cleaned = [points[0]]
for i in range(1, len(points)):
prev = cleaned[-1]
curr = points[i]
        # 1. Speed check: if the implied speed between two points exceeds the threshold, treat the point as drift
dist = haversine_distance(prev.lat, prev.lng, curr.lat, curr.lng)
dt = (curr.timestamp - prev.timestamp) / 3600
speed = dist / dt if dt > 0 else 0
if speed > max_speed_kmh:
            # Skip the drifting point
continue
        # 2. Gap check: a long time gap suggests missing data
        if dt > 0.5:  # more than 30 minutes
            # Fill with linear interpolation (simple approach)
            missing_points = int(dt / (1 / 3600))  # one point per second
            for j in range(1, min(missing_points, 10)):  # cap at 10 interpolated points
ratio = j / missing_points
interpolated = GPSPoint(
lat=prev.lat + (curr.lat - prev.lat) * ratio,
lng=prev.lng + (curr.lng - prev.lng) * ratio,
timestamp=prev.timestamp + (curr.timestamp - prev.timestamp) * ratio
)
cleaned.append(interpolated)
cleaned.append(curr)
return cleaned
VO Round 2: Data Modeling + SQL
This round was with a DE from the Data Platform team, 60 minutes.
Question: design a data warehouse model for Uber drivers
Uber needs to analyze driver performance, earnings, and retention. Design a warehouse model that supports:
- Daily / weekly / monthly driver earnings
- Driver acceptance rate, cancellation rate, and ratings
- Driver retention (30/60/90-day retention for new drivers)
- Driver heatmaps (distribution of activity across cities / areas)
My data model:
-- ============================================
-- Dimension tables
-- ============================================
-- Driver dimension
CREATE TABLE dim_drivers (
driver_id STRING,
signup_date DATE,
city_id STRING,
vehicle_type STRING, -- sedan / suv / van
vehicle_year INT,
    driver_rating FLOAT, -- current rating
is_active BOOLEAN,
    onboard_date DATE, -- onboarding/training completion date
    first_ride_date DATE -- date of first completed ride
);
-- Date dimension
CREATE TABLE dim_date (
date_key DATE,
day_of_week INT,
day_name STRING,
month INT,
quarter INT,
year INT,
is_weekend BOOLEAN,
is_holiday BOOLEAN
);
-- City dimension
CREATE TABLE dim_city (
city_id STRING,
city_name STRING,
country STRING,
metro_area STRING,
timezone STRING,
population INT
);
-- ============================================
-- Fact tables
-- ============================================
-- Ride fact table (grain: one row per ride)
CREATE TABLE fact_rides (
ride_id STRING,
driver_id STRING,
rider_id STRING,
city_id STRING,
ride_date DATE,
ride_time TIMESTAMP,
pickup_lat FLOAT,
pickup_lng FLOAT,
dropoff_lat FLOAT,
dropoff_lng FLOAT,
distance_km FLOAT,
duration_seconds INT,
base_fare FLOAT,
surge_multiplier FLOAT,
total_fare FLOAT,
driver_earnings FLOAT,
rider_tip FLOAT,
cancellation_by STRING, -- null / driver / rider / system
rider_rating INT, -- 1-5
driver_rating INT, -- 1-5
payment_method STRING,
ride_type STRING -- uberX / comfort / XL
);
-- Driver session fact table (grain: one online/offline session)
CREATE TABLE fact_driver_sessions (
session_id STRING,
driver_id STRING,
city_id STRING,
online_time TIMESTAMP,
offline_time TIMESTAMP,
duration_seconds INT,
rides_completed INT,
earnings_total FLOAT
);
Core analysis SQL:
-- 1. Monthly earnings per driver
SELECT
d.driver_id,
d.city_id,
DATE_TRUNC('month', r.ride_date) AS month,
COUNT(r.ride_id) AS total_rides,
SUM(r.driver_earnings) AS monthly_earnings,
SUM(r.rider_tip) AS monthly_tips,
ROUND(AVG(r.driver_rating), 2) AS avg_driver_rating,
ROUND(AVG(r.distance_km), 2) AS avg_ride_distance,
ROUND(AVG(r.duration_seconds) / 60, 1) AS avg_ride_duration_min
FROM fact_rides r
INNER JOIN dim_drivers d ON r.driver_id = d.driver_id
WHERE r.ride_date >= '2025-01-01'
GROUP BY d.driver_id, d.city_id, month
ORDER BY month, monthly_earnings DESC;
-- 2. Driver retention (30/60/90 days, approximated by month offsets)
WITH driver_cohort AS (
    SELECT
        driver_id,
        DATE_TRUNC('month', MIN(ride_date)) AS cohort_month
    FROM fact_rides
    GROUP BY driver_id
),
driver_activity AS (
    SELECT DISTINCT
        r.driver_id,
        c.cohort_month,
        -- month offset between the ride and the driver's first-ride month
        -- (DATEDIFF-style syntax; adjust to your warehouse dialect)
        DATEDIFF(month, c.cohort_month, r.ride_date) AS months_since_first_ride
    FROM fact_rides r
    INNER JOIN driver_cohort c ON r.driver_id = c.driver_id
)
SELECT
    cohort_month,
    COUNT(DISTINCT driver_id) AS cohort_size,
    COUNT(DISTINCT CASE WHEN months_since_first_ride = 0 THEN driver_id END) AS retained_0m,
    COUNT(DISTINCT CASE WHEN months_since_first_ride = 1 THEN driver_id END) AS retained_30d,
    COUNT(DISTINCT CASE WHEN months_since_first_ride = 2 THEN driver_id END) AS retained_60d,
    COUNT(DISTINCT CASE WHEN months_since_first_ride = 3 THEN driver_id END) AS retained_90d
FROM driver_activity
GROUP BY cohort_month
ORDER BY cohort_month;
-- 3. Driver heatmap (activity by metro area / city)
SELECT
c.metro_area,
    r.city_id,
COUNT(DISTINCT r.driver_id) AS active_drivers,
COUNT(r.ride_id) AS total_rides,
SUM(r.driver_earnings) AS total_driver_earnings,
ROUND(AVG(r.driver_earnings), 2) AS avg_earnings_per_ride
FROM fact_rides r
INNER JOIN dim_city c ON r.city_id = c.city_id
WHERE r.ride_date >= '2026-01-01'
GROUP BY c.metro_area, r.city_id
ORDER BY total_rides DESC;
-- 4. Driver cancellation rate analysis
SELECT
DATE(r.ride_date) AS stat_date,
d.city_id,
COUNT(*) AS total_rides,
COUNT(CASE WHEN r.cancellation_by = 'driver' THEN 1 END) AS driver_cancellations,
ROUND(
COUNT(CASE WHEN r.cancellation_by = 'driver' THEN 1 END) * 100.0
/ COUNT(*), 2
) AS driver_cancellation_rate,
COUNT(CASE WHEN r.cancellation_by = 'rider' THEN 1 END) AS rider_cancellations
FROM fact_rides r
INNER JOIN dim_drivers d ON r.driver_id = d.driver_id
WHERE r.ride_date >= '2026-01-01'
GROUP BY stat_date, d.city_id
ORDER BY stat_date, driver_cancellation_rate DESC;
Interviewer follow-up:
"How much data does this model hold, and how would you optimize query performance?"
My answer:
- Data volume: Uber handles roughly 20-30 million trips per day, so about 600-900 million rows per month
- Partitioning: partition fact_rides by ride_date, and fact_driver_sessions by date as well
- Clustering: cluster on the high-frequency query dimensions (driver_id, city_id)
- Pre-aggregation: build daily/weekly/monthly rollup tables for the most common metrics (a DDL sketch follows this list)
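For illustration only (my own sketch, not something I wrote in the interview), a BigQuery-style DDL expressing the partition + cluster strategy and a daily rollup could look like the following; the column list is abbreviated and the syntax differs on other warehouses:

-- Hypothetical BigQuery-style DDL for the partitioned, clustered fact table
CREATE TABLE fact_rides (
    ride_id         STRING,
    driver_id       STRING,
    city_id         STRING,
    ride_date       DATE,
    driver_earnings FLOAT64,
    total_fare      FLOAT64
)
PARTITION BY ride_date            -- prunes scans to the queried date range
CLUSTER BY city_id, driver_id;    -- co-locates rows on the hottest filter columns

-- Daily rollup so dashboards avoid scanning the raw fact table
CREATE TABLE agg_driver_daily AS
SELECT
    driver_id,
    city_id,
    ride_date,
    COUNT(ride_id)       AS rides,
    SUM(driver_earnings) AS earnings
FROM fact_rides
GROUP BY driver_id, city_id, ride_date;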
VO Round 3: System Design — Real-Time Dispatch Data Pipeline
This round was with a Staff Engineer, 60 minutes.
Question: design Uber's real-time dispatch data pipeline
Uber needs to dispatch drivers dynamically based on live supply and demand. Design a data pipeline system that supports:
- Tracking driver supply and rider demand per area in real time
- Predicting the supply-demand gap for the next 15 minutes
- Triggering driver incentives (surge pricing / driver incentives)
- End-to-end latency < 10 seconds
My architecture:
┌──────────────────────────────────────────────────────────────┐
│ Data Sources │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Driver GPS │ │ Rider App │ │ Order │ │
│ │ Updates │ │ Requests │ │ Events │ │
│ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │
│ │ │ │ │
└────────┼───────────────┼───────────────┼──────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────────────────┐
│ Kafka (Event Bus) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ drivers.geo │ │ riders │ │ orders │ │
│ │ _updates │ │ _requests │ │ _events │ │
│ └──────────────┘ └──────────────┘ └──────────────────┘ │
└──────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Flink / Spark Streaming │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Geo Aggregation │ │ Demand Prediction│ │
│ │ (5-min window) │ │ (ML Model) │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ Supply-Demand Matching Engine │ │
│  │  (compute supply/demand ratio, trigger surge / incentives)   │           │
│ └──────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Redis (Hot Data)│ │ Kafka │
│ (real-time grid state) │              │ (surge alerts)   │
└──────────────────┘ └──────────────────┘
Key design decisions:
- Geo grid: divide each city into 500 m x 500 m grid cells and compute the supply-demand ratio independently per cell (a rough cell-id sketch follows this list)
- Flink vs Spark Streaming: I chose Flink because its event-time semantics and state management are a better fit for low-latency scenarios
- Redis cache: each cell's live state sits in Redis, so API lookups return in < 1 ms
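As a rough illustration of the grid idea (my own sketch; the interview didn't go into this level of detail), mapping a GPS coordinate to an approximately 500 m x 500 m cell id can be done with simple degree math; the cosine correction for longitude is approximate:

import math

GRID_METERS = 500  # target cell size

def grid_id(lat: float, lng: float) -> str:
    """Map a lat/lng to an approximate 500 m x 500 m grid cell id."""
    # ~111,320 m per degree of latitude; longitude degrees shrink by cos(latitude)
    lat_step = GRID_METERS / 111_320
    lng_step = GRID_METERS / (111_320 * max(math.cos(math.radians(lat)), 1e-6))
    return f"{int(lat // lat_step)}_{int(lng // lng_step)}"

# Example: downtown San Francisco
print(grid_id(37.7749, -122.4194))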
# Flink example (simplified pseudocode): real-time supply-demand computation
# KafkaSource / RedisSink / KafkaSink / SupplyDemandProcessFunction below are
# stand-ins for the real connector and ProcessFunction classes
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.watermark_strategy import WatermarkStrategy
from datetime import timedelta
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
# Driver location stream
driver_stream = (
env
.add_source(KafkaSource("drivers.geo_updates"))
.assign_timestamps_and_watermarks(
WatermarkStrategy.for_bounded_out_of_orderness(timedelta(seconds=5))
)
.map(lambda x: (x['grid_id'], x['driver_id']))
.key_by(lambda x: x[0])
.window(TumblingEventTimeWindows.of(timedelta(seconds=30)))
.apply(lambda grid_id, elements: {
'grid_id': grid_id,
'driver_count': len(set(e[1] for e in elements)),
'driver_ids': list(set(e[1] for e in elements))
})
)
# Rider demand stream
demand_stream = (
env
.add_source(KafkaSource("riders_requests"))
.assign_timestamps_and_watermarks(
WatermarkStrategy.for_bounded_out_of_orderness(timedelta(seconds=5))
)
.map(lambda x: (x['grid_id'], x['request_id']))
.key_by(lambda x: x[0])
.window(TumblingEventTimeWindows.of(timedelta(seconds=30)))
.apply(lambda grid_id, elements: {
'grid_id': grid_id,
'demand_count': len(elements)
})
)
# Supply-demand matching
supply_demand = driver_stream.connect(demand_stream).process(
SupplyDemandProcessFunction()
)
# Sink to Redis + Kafka
supply_demand.add_sink(RedisSink("grid_status"))
supply_demand.add_sink(KafkaSink("surge_pricing"))
env.execute("Uber Supply-Demand Pipeline")
Interviewer follow-up:
"If one area is suddenly flooded with riders, say when a concert lets out, how does the system respond?"
My answer:
- Surge pricing: automatically raise prices once the demand-supply ratio crosses a threshold, which pulls in more drivers (a toy trigger rule is sketched after this list)
- Driver incentives: push "earn an extra bonus by heading to this area" notifications to nearby idle drivers
- Cross-area rebalancing: look at driver distribution in the surrounding areas and steer idle drivers toward the gap
- Predictive dispatch: use historical data to predict when the concert ends and start repositioning drivers 30 minutes ahead of time
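For illustration only (the threshold and multiplier curve here are made up, not Uber's actual pricing logic), a toy version of that surge trigger might look like:

def surge_multiplier(demand: int, supply: int,
                     trigger_ratio: float = 1.5, cap: float = 3.0) -> float:
    """Toy surge rule: no surge until demand/supply crosses trigger_ratio,
    then grow linearly with the ratio, capped at `cap`."""
    if supply == 0:
        return cap
    ratio = demand / supply
    if ratio <= trigger_ratio:
        return 1.0
    return min(cap, 1.0 + (ratio - trigger_ratio))

# Example: 120 waiting requests vs 40 idle drivers in one grid cell
print(surge_multiplier(120, 40))  # ratio 3.0 -> 2.5x multiplier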
VO Round 4: Behavioral
The interviewer asked:
"Tell me about a time a data pipeline you owned failed badly. How did you diagnose and fix it?"
My answer (STAR framework):
Situation: I owned an ETL pipeline that started at 3 a.m. every day and normally finished in about 4 hours. One day the business found the T+1 reports completely empty; the pipeline had failed around 4 a.m.
Task: locate the problem, fix the pipeline, and rerun the data within an hour, without impacting other teams' T+1 outputs.
Action:
- Checked the logs first: a Spark OOM (out of memory); a single shuffle task was handling 50 GB of data
- Root cause: a newly onboarded data source produced a huge number of duplicate user_id values, creating data skew
- Quick fix: added dropDuplicates() in the ETL and salted the hot keys to spread them out
- Long-term fix: set up data-quality monitoring that checks the data distribution before the ETL starts
# Data-skew fix
from pyspark.sql import functions as F

# Method 1: de-duplicate (when the duplicates are a data-quality problem)
df = df.dropDuplicates(["user_id", "event_time"])

# Method 2: salt hot keys to spread them across partitions
hot_key_threshold = 1_000_000  # keys with more rows than this are treated as hot

# Find the hot keys
hot_keys = [
    r["user_id"]
    for r in df.groupBy("user_id").count()
               .filter(F.col("count") > hot_key_threshold)
               .select("user_id")
               .collect()
]

# Salt only the hot keys, then aggregate on the salted key
df = df.withColumn(
    "user_id_salted",
    F.when(
        F.col("user_id").isin(hot_keys),
        F.concat(F.col("user_id"), F.lit("_"), (F.rand() * 100).cast("int").cast("string"))
    ).otherwise(F.col("user_id"))
).groupBy("user_id_salted").agg(F.count("*").alias("count"))
Result: the pipeline's runtime went from 4 hours down to 1.5 hours, we added 10+ data-quality rules, and the issue never recurred.
Interview Takeaways
What worked
- Kafka stream processing is the core topic for Uber DE: topic design, partitioning strategy, consumer-group management, exactly-once semantics, and deduplication are the highest-frequency questions.
- Spark Structured Streaming: the window model, watermarks, stateful processing, and checkpointing; be able to explain and code them fluently.
- Data modeling: star schema, fact/dimension table design, partitioning, and pre-aggregated tables. Uber's data volume is huge, so performance optimization is always asked.
- System design emphasizes low latency: Uber's core business runs on real-time decisions, so end-to-end latency, state management, and geo-grid computation come up over and over.
What to watch out for
Preparation focus:
- Kafka: topic architecture, partitioning strategy, consumer groups, exactly-once, deduplication
- Spark Streaming: windows, watermarks, checkpoints, state management
- SQL: window functions, partition optimization, aggregate-table design
- System Design: real-time data pipelines, geo-grid computation, supply-demand matching
Interview tips:
- Uber interviewers value engineering practice: don't stay theoretical, give concrete code and implementations
- For streaming questions, lay out the window model and watermark strategy first, then write the code
- For system design, clarify latency and throughput requirements before proposing an architecture
Further reading
- Uber interview process guide — the full loop, high-frequency questions, and prep strategy
- System Design interview guide — core principles of distributed system design and common questions
- Behavioral interview STAR story templates — answer frameworks for leadership, decision-making, and conflict questions
💡 Need interview coaching?
If you feel lost preparing for technical interviews, or want personalized coaching and resume feedback, reach out to Interview Coach Pro for 1-on-1 sessions.
👉 Contact us for a tailored interview prep plan