Uber Data Engineer Interview Experience 2026: A Deep Dive into Kafka + Spark + Real-Time Dispatch Data Pipelines
uber · data-engineer · interview · kafka · spark · streaming · python · system-design


A real Uber Data Engineer interview experience: a complete debrief of Kafka stream processing, Spark Streaming, real-time dispatch data pipelines, and System Design. Written in the first person, with interviewer dialogue and solution walkthroughs.

Sam · · 16 min read

Company: Uber Technologies · Role: Data Engineer (L5) · Format: Phone Screen + Virtual Onsite (4 rounds) · Outcome: Pass → Offer

In April 2026 I applied for Uber's Data Engineer role through an internal referral. The whole process took about 5 weeks: one phone screen and four Virtual Onsite rounds.

What sets Uber's DE interviews apart from other big tech companies is the heavy emphasis on real-time data and stream processing. Uber's core businesses (ridesharing, delivery, freight) are all real-time systems, and the data pipelines have to support decisions made within milliseconds, such as dynamic pricing, driver dispatch, and ETA prediction. Kafka, Spark Streaming, and Flink came up again and again throughout the interviews.

Below is a complete, chronological debrief of every round.


Phone Screen: Kafka + Python Coding

The phone screen was conducted by a Senior DE at Uber, 45 minutes, focused on Kafka and Python data processing.

Problem: real-time order data pipeline

Uber generates hundreds of millions of order events per day: driver accepts, rider pickup, in-trip location updates, rider drop-off, payment completion, and so on.

  1. Design the Kafka topic architecture to carry these events
  2. Write Python code that consumes the Kafka events and computes each city's number of in-progress orders in real time
  3. Discuss how to handle data loss and duplicate consumption

My solution:

Kafka topic design:

┌─────────────────────────────────────────────────────┐
│              Uber Order Event Topics                 │
├─────────────────────────────────────────────────────┤
│                                                     │
│  Topic: orders.created                              │
│  Partition key: city_id                             │
│  Schema:                                            │
│    - order_id: string                                │
│    - rider_id: string                                │
│    - city_id: string                                 │
│    - pickup_location: {lat, lng}                     │
│    - created_at: timestamp                           │
│                                                     │
│  Topic: orders.accepted                             │
│  Partition key: order_id                            │
│  Schema:                                            │
│    - order_id: string                                │
│    - driver_id: string                               │
│    - accepted_at: timestamp                          │
│                                                     │
│  Topic: orders.location_updates                     │
│  Partition key: order_id                            │
│  Schema:                                            │
│    - order_id: string                                │
│    - lat: float                                      │
│    - lng: float                                      │
│    - speed: float                                    │
│    - timestamp: timestamp                            │
│                                                     │
│  Topic: orders.completed                            │
│  Partition key: order_id                            │
│  Schema:                                            │
│    - order_id: string                                │
│    - city_id: string                                 │
│    - distance: float                                 │
│    - duration: int                                   │
│    - fare: float                                     │
│    - completed_at: timestamp                         │
│                                                     │
│  Topic: orders.payment                              │
│  Partition key: order_id                            │
│  Schema:                                            │
│    - order_id: string                                │
│    - payment_method: string                          │
│    - amount: float                                   │
│    - status: string                                  │
│    - paid_at: timestamp                              │
│                                                     │
└─────────────────────────────────────────────────────┘
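
To make the partition-key choice concrete, the producer can key each event so that all of a city's orders.created events land on the same partition and stay ordered. A minimal sketch with kafka-python; the broker address matches the consumer code below and the sample event is illustrative:

from kafka import KafkaProducer
import json

# Keyed producer: all orders.created events for a given city_id go to one partition
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',   # wait for all in-sync replicas before acking
    retries=3
)

event = {
    "order_id": "o-1001",
    "rider_id": "r-2002",
    "city_id": "sf",
    "pickup_location": {"lat": 37.7749, "lng": -122.4194},
    "created_at": "2026-04-01T08:00:00Z"
}
producer.send('orders.created', key=event['city_id'], value=event)
producer.flush()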

Python consumer code (real-time per-city active order counts):

from kafka import KafkaConsumer
from collections import defaultdict
import json
import threading
import time

class RealtimeOrderTracker:
    """实时订单跟踪器"""

    def __init__(self, bootstrap_servers='kafka:9092'):
        self.consumer = KafkaConsumer(
            'orders.created',
            'orders.completed',
            bootstrap_servers=bootstrap_servers,
            group_id='realtime_tracker',
            auto_offset_reset='latest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            enable_auto_commit=True,
            auto_commit_interval_ms=5000
        )
        # In-progress order set per city
        self.active_orders = defaultdict(set)  # city_id -> set(order_id)
        self.lock = threading.Lock()
        self.last_update = time.time()

    def process_message(self, message):
        """处理 Kafka 消息"""
        topic = message.topic
        event = message.value

        with self.lock:
            if topic == 'orders.created':
                city_id = event['city_id']
                order_id = event['order_id']
                self.active_orders[city_id].add(order_id)

            elif topic == 'orders.completed':
                city_id = event['city_id']
                order_id = event['order_id']
                self.active_orders[city_id].discard(order_id)

    def get_active_orders(self) -> dict:
        """获取当前各城市的进行中订单数"""
        with self.lock:
            return {city: len(orders) for city, orders in self.active_orders.items()}

    def run(self):
        """启动消费循环"""
        print("Starting realtime order tracker...")

        for message in self.consumer:
            try:
                self.process_message(message)
            except Exception as e:
                print(f"Error processing message: {e}")

            # Print stats at most once per second
            if time.time() - self.last_update >= 1:
                active = self.get_active_orders()
                print(f"[{time.strftime('%H:%M:%S')}] Active orders by city:")
                for city, count in sorted(active.items(), key=lambda x: -x[1])[:10]:
                    print(f"  {city}: {count}")
                self.last_update = time.time()


if __name__ == '__main__':
    tracker = RealtimeOrderTracker(bootstrap_servers='kafka:9092')
    tracker.run()

Interviewer follow-up:

"How do you handle data loss and duplicate consumption?"

I covered Kafka's core mechanisms:

  1. Exactly-once semantics: enable.idempotence=true plus a transactional producer
  2. Consumer offset management: commit offsets only after a message has been fully processed (manual commits; a sketch follows the idempotent-consumer example below)
  3. Idempotent design: downstream consumers must be idempotent, so processing the same order event multiple times yields the same result
# Idempotent consumer example
class IdempotentConsumer:
    """Idempotent consumer: de-duplicates on order_id."""

    def __init__(self):
        self.processed_ids = set()
        self.max_cache_size = 1_000_000

    def should_process(self, order_id: str) -> bool:
        """判断是否应该处理这条消息"""
        if order_id in self.processed_ids:
            return False

        self.processed_ids.add(order_id)

        # Bound memory: past the cap, keep only part of the cache (a set is unordered, so the kept subset is arbitrary; we rely on downstream idempotency)
        if len(self.processed_ids) > self.max_cache_size:
            self.processed_ids = set(list(self.processed_ids)[-500_000:])

        return True

    def process(self, event: dict) -> bool:
        """处理事件,返回是否成功"""
        if not self.should_process(event['order_id']):
            return False

        # Actual processing logic
        # ... write to the DB / update aggregates / send notifications
        return True
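
For point 2 above, a minimal sketch of manual offset commits with kafka-python; handle() is a hypothetical stand-in for the real processing logic, and topic/group names reuse the ones from the design above:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders.created',
    bootstrap_servers='kafka:9092',
    group_id='realtime_tracker',
    enable_auto_commit=False,   # take over offset management
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    try:
        handle(message.value)   # hypothetical processing function
        consumer.commit()       # commit only after successful processing
    except Exception as e:
        # No commit: the message will be redelivered after a restart or rebalance,
        # so the downstream handler must stay idempotent (see above)
        print(f"Processing failed, offset not committed: {e}")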

VO Round 1: Spark Streaming + Data Pipelines

This round was conducted by a Staff DE, 60 minutes.

Problem 1: a real-time ETA pipeline with Spark Structured Streaming

Uber needs to track each city's average ETA (estimated time of arrival) in real time to feed dynamic pricing and driver dispatch. Design a Spark Structured Streaming pipeline for it.

My solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, window, to_timestamp, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

spark = SparkSession.builder \
    .appName("RealtimeETAPipeline") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .getOrCreate()

# Define the schema
eta_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("driver_id", StringType(), True),
    StructField("city_id", StringType(), True),
    StructField("eta_seconds", IntegerType(), True),
    StructField("distance_km", DoubleType(), True),
    StructField("timestamp", StringType(), True)
])

# Read from Kafka
eta_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \n    .option("subscribe", "orders.eta_updates") \
    .option("startingOffsets", "latest") \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), eta_schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time", to_timestamp(col("timestamp")))

# 5-minute window sliding every 1 minute; the query triggers every 10 seconds
windowed_eta = eta_stream \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes", "1 minute"),
        col("city_id")
    ) \
    .agg(
        avg("eta_seconds").alias("avg_eta"),
        avg("distance_km").alias("avg_distance"),
        count("*").alias("sample_count")
    )

# Sink: Parquet on S3 (file sinks only support append mode, so finalized windows are emitted once the watermark passes; a warehouse such as BigQuery / Redshift can be loaded downstream)
query = windowed_eta \
    .writeStream \
    .format("parquet") \
    .option("path", "s3://uber-data/eta-streaming/output/") \
    .option("checkpointLocation", "s3://uber-data/eta-streaming/checkpoint/") \
    .trigger(processingTime="10 seconds") \
    .outputMode("update") \
    .start()

query.awaitTermination()
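
The sink above writes Parquet to S3. If the aggregates need to land in a warehouse with continuously updated windows instead, one option is a foreachBatch sink that writes each micro-batch over JDBC; a sketch with placeholder connection details:

def write_to_warehouse(batch_df, batch_id):
    # Each micro-batch of windowed aggregates is appended to a warehouse table
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://warehouse:5432/analytics")  # placeholder
        .option("dbtable", "rt_city_eta")                             # placeholder
        .option("user", "etl_user")
        .option("password", "***")
        .mode("append")
        .save())

query = windowed_eta \
    .writeStream \
    .foreachBatch(write_to_warehouse) \
    .option("checkpointLocation", "s3://uber-data/eta-streaming/checkpoint-jdbc/") \
    .trigger(processingTime="10 seconds") \
    .outputMode("update") \
    .start()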

Interviewer follow-up:

"If a city's ETA suddenly spikes, how do you tell whether it's real or a data problem?"

I walked through a data quality monitoring approach:

# Data quality monitoring: ETA anomaly detection
from pyspark.sql.functions import col, when

# Compute the historical per-city baseline (30-day mean and standard deviation)
baselines = spark.sql("""
    SELECT
        city_id,
        AVG(eta_seconds) AS baseline_avg,
        STDDEV(eta_seconds) AS baseline_stddev
    FROM historical_eta_30d
    GROUP BY city_id
""").collect()
baseline_map = {row['city_id']: (row['baseline_avg'], row['baseline_stddev']) for row in baselines}

# Real-time anomaly check
def check_eta_anomaly(current_avg, city_id):
    if city_id not in baseline_map:
        return False

    base_avg, base_std = baseline_map[city_id]
    # Alert when the current average exceeds the baseline by more than 3 standard deviations
    threshold = base_avg + 3 * base_std
    return current_avg > threshold

# Add a simplified anomaly flag inside the streaming query
windowed_eta = windowed_eta.withColumn(
    "is_anomaly",
    when(col("avg_eta") > 300, True)  # crude static threshold: flag ETA > 5 minutes
    .otherwise(False)
)
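
A fuller check would compare against the per-city baseline rather than a fixed threshold. A sketch using a stream-static join, building the baseline from the same historical_eta_30d table used above:

from pyspark.sql import functions as F

# Static per-city baseline from the 30-day history table
baseline_df = spark.table("historical_eta_30d") \
    .groupBy("city_id") \
    .agg(F.avg("eta_seconds").alias("baseline_avg"),
         F.stddev("eta_seconds").alias("baseline_stddev"))

flagged_eta = windowed_eta \
    .join(baseline_df, on="city_id", how="left") \
    .withColumn(
        "is_anomaly",
        F.col("avg_eta") > F.col("baseline_avg") + 3 * F.col("baseline_stddev")
    )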

Problem 2: Python, processing trip GPS trajectories

Given a driver's GPS trajectory, compute each trip's actual distance traveled and average speed.

from dataclasses import dataclass
from typing import List
import math

@dataclass
class GPSPoint:
    lat: float
    lng: float
    timestamp: float  # Unix timestamp

def haversine_distance(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """计算两点之间的 Haversine 距离(公里)"""
    R = 6371  # 地球半径(公里)

    lat1_r, lat2_r = math.radians(lat1), math.radians(lat2)
    dlat = math.radians(lat2 - lat1)
    dlng = math.radians(lng2 - lng1)

    a = math.sin(dlat / 2) ** 2 + \
        math.cos(lat1_r) * math.cos(lat2_r) * math.sin(dlng / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return R * c

def calculate_trip_stats(points: List[GPSPoint]) -> dict:
    """计算行程统计信息"""
    if len(points) < 2:
        return {
            'total_distance_km': 0.0,
            'duration_seconds': 0.0,
            'avg_speed_kmh': 0.0,
            'max_speed_kmh': 0.0,
            'point_count': len(points)
        }

    total_distance = 0.0
    max_speed = 0.0

    for i in range(1, len(points)):
        dist = haversine_distance(
            points[i-1].lat, points[i-1].lng,
            points[i].lat, points[i].lng
        )
        total_distance += dist

        dt = (points[i].timestamp - points[i-1].timestamp) / 3600  # hours
        if dt > 0:
            speed = dist / dt
            max_speed = max(max_speed, speed)

    duration = points[-1].timestamp - points[0].timestamp
    avg_speed = total_distance / (duration / 3600) if duration > 0 else 0.0

    return {
        'total_distance_km': round(total_distance, 2),
        'duration_seconds': round(duration, 1),
        'avg_speed_kmh': round(avg_speed, 1),
        'max_speed_kmh': round(max_speed, 1),
        'point_count': len(points)
    }


# Usage example
sample_points = [
    GPSPoint(37.7749, -122.4194, 1000),
    GPSPoint(37.7751, -122.4190, 1060),
    GPSPoint(37.7755, -122.4185, 1120),
    GPSPoint(37.7760, -122.4180, 1180),
]

stats = calculate_trip_stats(sample_points)
print(f"距离: {stats['total_distance_km']}km")
print(f"平均速度: {stats['avg_speed_kmh']}km/h")
print(f"最大速度: {stats['max_speed_kmh']}km/h")

Interviewer follow-up:

"What if the GPS data is noisy (drift points, missing points)? How do you handle that?"

I described a few common GPS cleaning techniques:

def clean_gps_points(points: List[GPSPoint], max_speed_kmh: float = 200) -> List[GPSPoint]:
    """Clean a GPS trajectory: drop drift points, fill large gaps."""
    if len(points) < 3:
        return points

    cleaned = [points[0]]

    for i in range(1, len(points)):
        prev = cleaned[-1]
        curr = points[i]

        # 1. Speed check: if the implied speed between two points exceeds the
        #    threshold, treat the new point as GPS drift
        dist = haversine_distance(prev.lat, prev.lng, curr.lat, curr.lng)
        dt = (curr.timestamp - prev.timestamp) / 3600
        speed = dist / dt if dt > 0 else 0

        if speed > max_speed_kmh:
            # Skip the drifted point
            continue

        # 2. Gap check: a long time gap suggests missing data
        if dt > 0.5:  # more than 30 minutes
            # Simple linear interpolation: up to 10 evenly spaced synthetic points
            num_fill = min(int(dt * 3600), 10)
            for j in range(1, num_fill + 1):
                ratio = j / (num_fill + 1)
                interpolated = GPSPoint(
                    lat=prev.lat + (curr.lat - prev.lat) * ratio,
                    lng=prev.lng + (curr.lng - prev.lng) * ratio,
                    timestamp=prev.timestamp + (curr.timestamp - prev.timestamp) * ratio
                )
                cleaned.append(interpolated)

        cleaned.append(curr)

    return cleaned
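
Putting the two pieces together, a drifted reading is dropped by the speed check before the stats are computed (the points are made up for illustration):

noisy_points = [
    GPSPoint(37.7749, -122.4194, 1000),
    GPSPoint(37.9000, -122.1000, 1005),   # ~30 km away within 5 s: GPS drift, will be dropped
    GPSPoint(37.7751, -122.4190, 1060),
    GPSPoint(37.7755, -122.4185, 1120),
]

stats = calculate_trip_stats(clean_gps_points(noisy_points))
print(stats)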

VO Round 2: Data Modeling + SQL

This round was conducted by a DE from the Data Platform team, 60 minutes.

Problem: design a data warehouse model for Uber drivers

Uber needs to analyze driver performance, earnings, and retention. Design a warehouse model that supports the following analyses:

  1. Daily / weekly / monthly driver earnings
  2. Driver acceptance rate, cancellation rate, and ratings
  3. Driver retention (new-driver retention at 30/60/90 days)
  4. Driver heat map (distribution across active cities / areas)

My data model:

-- ============================================
-- Dimension tables
-- ============================================

-- Driver dimension
CREATE TABLE dim_drivers (
    driver_id         STRING,
    signup_date       DATE,
    city_id           STRING,
    vehicle_type      STRING,    -- sedan / suv / van
    vehicle_year      INT,
    driver_rating     FLOAT,     -- current rating
    is_active         BOOLEAN,
    onboard_date      DATE,      -- onboarding (training) completion date
    first_ride_date   DATE       -- date of first completed ride
);

-- 时间维度表
CREATE TABLE dim_date (
    date_key          DATE,
    day_of_week       INT,
    day_name          STRING,
    month             INT,
    quarter           INT,
    year              INT,
    is_weekend        BOOLEAN,
    is_holiday        BOOLEAN
);

-- City dimension
CREATE TABLE dim_city (
    city_id           STRING,
    city_name         STRING,
    country           STRING,
    metro_area        STRING,
    timezone          STRING,
    population        INT
);

-- ============================================
-- Fact tables
-- ============================================

-- Ride fact table (grain: one row per ride)
CREATE TABLE fact_rides (
    ride_id           STRING,
    driver_id         STRING,
    rider_id          STRING,
    city_id           STRING,
    ride_date         DATE,
    ride_time         TIMESTAMP,
    pickup_lat        FLOAT,
    pickup_lng        FLOAT,
    dropoff_lat       FLOAT,
    dropoff_lng       FLOAT,
    distance_km       FLOAT,
    duration_seconds  INT,
    base_fare         FLOAT,
    surge_multiplier  FLOAT,
    total_fare        FLOAT,
    driver_earnings   FLOAT,
    rider_tip         FLOAT,
    cancellation_by   STRING,    -- null / driver / rider / system
    rider_rating      INT,       -- 1-5
    driver_rating     INT,       -- 1-5
    payment_method    STRING,
    ride_type         STRING     -- uberX / comfort / XL
);

-- Driver session fact table (grain: one online/offline session)
CREATE TABLE fact_driver_sessions (
    session_id        STRING,
    driver_id         STRING,
    city_id           STRING,
    online_time       TIMESTAMP,
    offline_time      TIMESTAMP,
    duration_seconds  INT,
    rides_completed   INT,
    earnings_total    FLOAT
);

Core analysis SQL:

-- 1. Monthly earnings per driver
SELECT
    d.driver_id,
    d.city_id,
    DATE_TRUNC('month', r.ride_date) AS month,
    COUNT(r.ride_id) AS total_rides,
    SUM(r.driver_earnings) AS monthly_earnings,
    SUM(r.rider_tip) AS monthly_tips,
    ROUND(AVG(r.driver_rating), 2) AS avg_driver_rating,
    ROUND(AVG(r.distance_km), 2) AS avg_ride_distance,
    ROUND(AVG(r.duration_seconds) / 60, 1) AS avg_ride_duration_min
FROM fact_rides r
INNER JOIN dim_drivers d ON r.driver_id = d.driver_id
WHERE r.ride_date >= '2025-01-01'
GROUP BY d.driver_id, d.city_id, month
ORDER BY month, monthly_earnings DESC;

-- 2. Driver retention (still active 30/60/90 days after the first ride)
--    (DATEDIFF syntax varies by engine; Spark-style shown here)
WITH driver_cohort AS (
    SELECT
        driver_id,
        MIN(ride_date) AS first_ride_date
    FROM fact_rides
    GROUP BY driver_id
),
driver_activity AS (
    SELECT DISTINCT
        r.driver_id,
        DATE_TRUNC('month', c.first_ride_date) AS cohort_month,
        DATEDIFF(r.ride_date, c.first_ride_date) AS days_since_first
    FROM fact_rides r
    INNER JOIN driver_cohort c ON r.driver_id = c.driver_id
)
SELECT
    cohort_month,
    COUNT(DISTINCT driver_id) AS cohort_size,
    COUNT(DISTINCT CASE WHEN days_since_first >= 30 THEN driver_id END) AS retained_30d,
    COUNT(DISTINCT CASE WHEN days_since_first >= 60 THEN driver_id END) AS retained_60d,
    COUNT(DISTINCT CASE WHEN days_since_first >= 90 THEN driver_id END) AS retained_90d
FROM driver_activity
GROUP BY cohort_month
ORDER BY cohort_month;

-- 3. Driver heat map (activity by metro area / city)
SELECT
    c.metro_area,
    c.city_id,
    COUNT(DISTINCT r.driver_id) AS active_drivers,
    COUNT(r.ride_id) AS total_rides,
    SUM(r.driver_earnings) AS total_driver_earnings,
    ROUND(AVG(r.driver_earnings), 2) AS avg_earnings_per_ride
FROM fact_rides r
INNER JOIN dim_city c ON r.city_id = c.city_id
WHERE r.ride_date >= '2026-01-01'
GROUP BY c.metro_area, c.city_id
ORDER BY total_rides DESC;

-- 4. Driver cancellation rate analysis
SELECT
    DATE(r.ride_date) AS stat_date,
    d.city_id,
    COUNT(*) AS total_rides,
    COUNT(CASE WHEN r.cancellation_by = 'driver' THEN 1 END) AS driver_cancellations,
    ROUND(
        COUNT(CASE WHEN r.cancellation_by = 'driver' THEN 1 END) * 100.0
        / COUNT(*), 2
    ) AS driver_cancellation_rate,
    COUNT(CASE WHEN r.cancellation_by = 'rider' THEN 1 END) AS rider_cancellations
FROM fact_rides r
INNER JOIN dim_drivers d ON r.driver_id = d.driver_id
WHERE r.ride_date >= '2026-01-01'
GROUP BY stat_date, d.city_id
ORDER BY stat_date, driver_cancellation_rate DESC;

Interviewer follow-up:

"How big is the data in this model? How do you optimize query performance?"

My answer:

  • Data volume: roughly 20-30 million trips per day, so on the order of 600-900 million fact rows per month
  • Partitioning: partition fact_rides by ride_date and fact_driver_sessions by session date
  • Clustering: cluster / sort on the hot query dimensions (driver_id, city_id)
  • Pre-aggregation: maintain daily/weekly/monthly rollup tables for the common metrics (see the sketch below)
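
As a sketch of the pre-aggregation point, a daily driver rollup could be rebuilt (or incrementally merged) from the fact table with PySpark; the rollup table name is illustrative:

from pyspark.sql import functions as F

daily_driver_agg = (
    spark.table("fact_rides")
    .groupBy("driver_id", "city_id", "ride_date")
    .agg(
        F.count("ride_id").alias("rides"),
        F.sum("driver_earnings").alias("earnings"),
        F.sum("rider_tip").alias("tips"),
        F.avg("driver_rating").alias("avg_rating")
    )
)

(daily_driver_agg.write
    .mode("overwrite")
    .partitionBy("ride_date")
    .saveAsTable("agg_driver_daily"))  # weekly/monthly rollups can be derived from this table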

VO Round 3: System Design, a Real-Time Dispatch Data Pipeline

This round was conducted by a Staff Engineer, 60 minutes.

Problem: design Uber's real-time dispatch data pipeline

Uber needs to dispatch drivers dynamically based on real-time supply and demand. Design a data pipeline system that supports:

  1. Real-time tracking of driver supply and rider demand per area
  2. Forecasting the supply-demand gap for the next 15 minutes
  3. Triggering driver incentives (surge pricing / driver incentives)
  4. End-to-end latency < 10 seconds

My architecture:

┌──────────────────────────────────────────────────────────────┐
│                      Data Sources                             │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐              │
│  │ Driver GPS  │  │ Rider App  │  │ Order      │              │
│  │ Updates     │  │ Requests   │  │ Events     │              │
│  └─────┬──────┘  └─────┬──────┘  └─────┬──────┘              │
│        │               │               │                      │
└────────┼───────────────┼───────────────┼──────────────────────┘
         │               │               │
         ▼               ▼               ▼
┌──────────────────────────────────────────────────────────────┐
│                    Kafka (Event Bus)                          │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐   │
│  │ drivers.geo  │  │ riders       │  │ orders           │   │
│  │ _updates     │  │ _requests    │  │ _events          │   │
│  └──────────────┘  └──────────────┘  └──────────────────┘   │
└──────────────────────────────────────────────────────────────┘


┌──────────────────────────────────────────────────────────────┐
│                 Flink / Spark Streaming                       │
│                                                               │
│  ┌──────────────────┐  ┌──────────────────┐                 │
│  │ Geo Aggregation  │  │ Demand Prediction│                 │
│  │ (5-min window)   │  │ (ML Model)       │                 │
│  └────────┬─────────┘  └────────┬─────────┘                 │
│           │                     │                            │
│           ▼                     ▼                            │
│  ┌──────────────────────────────────────────┐               │
│  │  Supply-Demand Matching Engine           │               │
│  │  (supply/demand ratio -> surge/incentive) │               │
│  └──────────────────────────────────────────┘               │
└──────────────────────────────────────────────────────────────┘
         │                              │
         ▼                              ▼
┌──────────────────┐        ┌──────────────────┐
│  Redis (Hot Data)│        │  Kafka           │
│ (live grid state)│        │  (surge alerts)  │
└──────────────────┘        └──────────────────┘

Key design decisions:

  1. Geo grid: divide each city into 500m x 500m cells and compute the supply/demand ratio per cell independently (a grid-id sketch follows the Flink example below)
  2. Flink vs Spark Streaming: choose Flink; its event-time semantics and state management are a better fit for low-latency workloads
  3. Redis cache: keep each cell's live state in Redis so that API lookups return in under 1 ms
# Flink example: real-time supply/demand aggregation.
# Schematic sketch: KafkaSource / RedisSink / KafkaSink and SupplyDemandProcessFunction
# stand in for the real connector and ProcessFunction setup, and the window/watermark
# arguments are simplified rather than exact PyFlink signatures.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.watermark_strategy import WatermarkStrategy
from datetime import timedelta

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# Event time is the default time characteristic in recent Flink versions

# Driver location stream
driver_stream = (
    env
    .add_source(KafkaSource("drivers.geo_updates"))
    .assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(timedelta(seconds=5))
    )
    .map(lambda x: (x['grid_id'], x['driver_id']))
    .key_by(lambda x: x[0])
    .window(TumblingEventTimeWindows.of(timedelta(seconds=30)))
    .apply(lambda grid_id, elements: {
        'grid_id': grid_id,
        'driver_count': len(set(e[1] for e in elements)),
        'driver_ids': list(set(e[1] for e in elements))
    })
)

# Rider demand stream
demand_stream = (
    env
    .add_source(KafkaSource("riders_requests"))
    .assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(timedelta(seconds=5))
    )
    .map(lambda x: (x['grid_id'], x['request_id']))
    .key_by(lambda x: x[0])
    .window(TumblingEventTimeWindows.of(timedelta(seconds=30)))
    .apply(lambda grid_id, elements: {
        'grid_id': grid_id,
        'demand_count': len(elements)
    })
)

# Supply-demand matching
supply_demand = driver_stream.connect(demand_stream).process(
    SupplyDemandProcessFunction()
)

# Sink to Redis (hot state) and Kafka (surge alerts)
supply_demand.add_sink(RedisSink("grid_status"))
supply_demand.add_sink(KafkaSink("surge_pricing"))

env.execute("Uber Supply-Demand Pipeline")
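
The streams above assume each event already carries a grid_id. In production Uber's open-source H3 library is the natural fit for this kind of spatial index; a naive square-grid sketch of the idea, assuming the 500m cells from the design above:

import math

def grid_id(lat: float, lng: float, cell_m: float = 500.0) -> str:
    """Map a lat/lng to a ~cell_m x cell_m square cell id (equirectangular approximation)."""
    meters_per_deg_lat = 111_320.0
    meters_per_deg_lng = 111_320.0 * max(math.cos(math.radians(lat)), 1e-6)
    lat_step = cell_m / meters_per_deg_lat
    lng_step = cell_m / meters_per_deg_lng
    return f"{int(lat // lat_step)}_{int(lng // lng_step)}"

# Example usage
print(grid_id(37.7749, -122.4194), grid_id(37.7751, -122.4190))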

Interviewer follow-up:

"If a large crowd of riders suddenly shows up in one area (say, a concert lets out), how does the system respond?"

My answer:

  1. Surge pricing: automatically raise prices once the demand/supply ratio crosses a threshold, attracting more drivers (a toy sketch of the thresholding idea follows this list)
  2. Driver incentives: push "drive to this area for an extra bonus" notifications to nearby idle drivers
  3. Cross-area dispatch: look at driver distribution in the surrounding areas and steer idle drivers toward the gap
  4. Predictive dispatch: use historical data to predict when the concert ends and start repositioning drivers 30 minutes ahead of time
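
To illustrate point 1, a toy thresholding function; the numbers are purely illustrative and not Uber's actual pricing logic:

def surge_multiplier(demand_count: int, driver_count: int,
                     threshold: float = 1.5, cap: float = 3.0) -> float:
    """Toy example: map a demand/supply ratio to a capped surge multiplier."""
    if driver_count == 0:
        return cap
    ratio = demand_count / driver_count
    if ratio <= threshold:
        return 1.0
    # Grow linearly past the threshold, capped at `cap`
    return round(min(cap, 1.0 + 0.5 * (ratio - threshold)), 2)

print(surge_multiplier(demand_count=120, driver_count=40))  # ratio 3.0 -> 1.75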

VO Round 4: Behavioral

The interviewer asked:

"Tell me about a time a data pipeline of yours failed badly. How did you diagnose and fix it?"

My answer (STAR framework):

Situation: I owned an ETL pipeline that kicked off at 3 a.m. daily and was expected to finish in 4 hours. One day the business found the T+1 reports completely empty; the pipeline had failed at 4 a.m.

Task: within one hour, diagnose the issue, fix the pipeline, and rerun the data, without delaying other teams' T+1 outputs.

Action:

  1. Check the logs first: Spark had hit an OOM (out of memory), with a single shuffle task handling 50 GB of data
  2. Root cause: a newly onboarded data source emitted huge numbers of duplicate user_ids, causing data skew
  3. Quick fix: add dropDuplicates() to the ETL and salt the hot keys to spread them out
  4. Long-term fix: add data quality monitoring that checks the data distribution before the ETL starts
# Data skew fix
from pyspark.sql import functions as F
from pyspark.sql.functions import col, concat, lit, rand

# Option 1: de-duplicate (when the duplicates are a data quality problem)
df = df.dropDuplicates(["user_id", "event_time"])

# Option 2: salt hot keys to spread them across partitions
hot_key_threshold = 1_000_000  # keys above this count are treated as hot

# Find the hot keys
hot_keys = df.groupBy("user_id").count() \
    .filter(col("count") > hot_key_threshold) \
    .select("user_id") \
    .rdd.map(lambda r: r[0]).collect()

# Append a random salt to hot keys only, then aggregate on the salted key
# (a second aggregation on the original user_id recombines the partial results)
df = df.withColumn(
    "user_id_salted",
    F.when(F.col("user_id").isin(hot_keys),
           concat(F.col("user_id"), lit("_"), (rand() * 100).cast("int").cast("string")))
    .otherwise(F.col("user_id"))
) \
.groupBy("user_id_salted") \
.agg(F.count("*").alias("count"))

Result: the pipeline went from 4 hours down to 1.5 hours, we added 10+ data quality rules, and the issue never recurred.


Interview Summary

What worked

  1. Kafka stream processing is the core topic for Uber DE: topic design, partitioning strategy, consumer group management, exactly-once semantics, and de-duplication come up constantly.

  2. Spark Structured Streaming: be able to explain and code the windowing model, watermarks, stateful processing, and checkpointing fluently.

  3. Data modeling: star schema, fact/dimension table design, partitioning strategy, pre-aggregated tables. Uber's data volumes are huge, so performance optimization is always asked.

  4. System Design emphasizes low latency: Uber's core business needs real-time decisions, so real-time concepts like end-to-end latency, state management, and geo-grid computation come up again and again.

Things to note

What to prepare

  • Kafka: topic architecture, partitioning strategy, consumer groups, exactly-once, de-duplication
  • Spark Streaming: windows, watermarks, checkpoints, state management
  • SQL: window functions, partition optimization, aggregate table design
  • System Design: real-time data pipelines, geo-grid computation, supply-demand matching

Interview tips

  • Uber interviewers care a lot about engineering practice: don't just talk theory, show concrete code and implementations
  • For stream processing questions, state the windowing model and watermark strategy first, then write the code
  • For System Design questions, clarify the latency and throughput requirements before designing
