TikTok Data Engineer Interview Experience | Recommendation Data Pipelines + SQL Analytics + Real-Time Stream Processing
Tags: tiktok, data-engineer, data engineering, recommendation systems, SQL, Spark, Flink, real-time stream processing, interview experience


A complete TikTok Data Engineer interview experience: recommendation feature pipeline design, SQL for short-video metrics analysis, and Flink real-time stream processing, with Python/SQL/PySpark code and an architecture diagram.

Sam · 16 min read

Company: TikTok

Role: Data Engineer (Recommendation Systems)

Process: OA (1h, 4 questions) → Phone Screen → VO (4 rounds)

Location: Los Angeles / Remote

Outcome: Offer ✅


Interview Process

TikTok's DE loop is about the fastest-paced among the big North American tech companies: under two weeks from OA to offer. The style is similar to Meta's, but it leans harder on data pipeline design and recommendation-system business scenarios.

Process breakdown:

  1. OA (HackerRank, 1h, 4 questions): algorithm problems of medium difficulty
  2. Phone Screen (45 min): one coding question + one SQL question
  3. VO Round 1 (45 min): SQL + data analysis on short-video metrics
  4. VO Round 2 (45 min): data pipeline design for recommendation features
  5. VO Round 3 (60 min): system design of a real-time recommendation data platform
  6. VO Round 4 (30 min): behavioral + hiring manager, team matching

OA: Four Questions in One Hour

TikTok's OA is the standard HackerRank format: 4 questions in 1 hour. Unlike most companies' DE screens, it leans toward algorithms rather than SQL, though the problems aren't especially hard.

Question 1: Watch-session analysis (arrays + sliding window)

Given a user's sequence of video watch durations [watch_duration_1, watch_duration_2, ...], find the length of the shortest contiguous subsequence whose total watch time reaches a threshold K.

def shortest_watch_session(durations: list[int], threshold: int) -> int:
    """
    Find the shortest contiguous subarray with sum >= threshold.
    Uses sliding window approach for O(n) solution.
    """
    if not durations:
        return -1

    left = 0
    current_sum = 0
    min_length = float('inf')

    for right in range(len(durations)):
        current_sum += durations[right]

        while current_sum >= threshold:
            min_length = min(min_length, right - left + 1)
            current_sum -= durations[left]
            left += 1

    return min_length if min_length != float('inf') else -1

# Example: Find shortest session with total watch time >= 60 seconds
# (the sliding window is valid here because durations are non-negative)
durations = [15, 25, 10, 30, 20, 40, 5]
print(shortest_watch_session(durations, 60))  # Output: 2 (20+40=60)

Question 2: A/B test assignment (deterministic hashing)

Design a function that assigns users to A/B test groups based on user ID, guaranteeing that the same user always lands in the same group.

import hashlib
from typing import Dict, List

class ABTestAssigner:
    """
    Deterministic hash-based A/B test assignment.
    Guarantees same user always gets same group.
    """

    def __init__(self, experiment_name: str, groups: Dict[str, float]):
        """
        groups: {"control": 0.5, "variant_a": 0.3, "variant_b": 0.2}
        Values represent allocation ratios.
        """
        self.experiment_name = experiment_name
        self.groups = groups
        # Build cumulative thresholds
        self.thresholds = []
        cumulative = 0
        for group, ratio in groups.items():
            cumulative += ratio
            self.thresholds.append((group, cumulative))

    def assign(self, user_id: str) -> str:
        # Hash user_id + experiment_name for consistency
        hash_key = f"{self.experiment_name}:{user_id}"
        hash_value = int(hashlib.md5(hash_key.encode()).hexdigest(), 16)

        # Map to [0, 1) range
        normalized = (hash_value % 10000) / 10000.0

        # Find group based on cumulative thresholds
        for group, threshold in self.thresholds:
            if normalized < threshold:
                return group
        # Fallback for float rounding at the boundary: the last group
        # (the groups dict itself can't be indexed with -1)
        return self.thresholds[-1][0]

# Example
assigner = ABTestAssigner("video_recs_v2", {
    "control": 0.5,
    "variant_a": 0.3,
    "variant_b": 0.2
})

for uid in range(1, 11):
    print(f"User {uid}: {assigner.assign(str(uid))}")
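
A quick sanity check worth running in the OA: hash a large batch of synthetic IDs and confirm the empirical split converges to the configured ratios (illustrative, not required by the question):

from collections import Counter

counts = Counter(assigner.assign(f"user_{i}") for i in range(100_000))
print({group: round(n / 100_000, 3) for group, n in counts.items()})
# Roughly {'control': 0.5, 'variant_a': 0.3, 'variant_b': 0.2}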

Question 3: Recommendation deduplication (sets + priority)

Merge video recommendations from multiple sources (search, follow, trending), remove duplicate videos, and order by source priority.

from typing import Dict, List

def merge_recommendations(
    search_recs: List[Dict],
    follow_recs: List[Dict],
    trending_recs: List[Dict],
    max_results: int = 20
) -> List[Dict]:
    """
    Merge recommendations from multiple sources, deduplicate, and rank.
    Priority: search > follow > trending
    """
    source_priority = {
        "search": 0,
        "follow": 1,
        "trending": 2
    }

    seen_videos = set()
    merged = []

    for source, recs in [
        ("search", search_recs),
        ("follow", follow_recs),
        ("trending", trending_recs)
    ]:
        for video in recs:
            video_id = video["video_id"]
            if video_id not in seen_videos:
                seen_videos.add(video_id)
                merged.append({
                    **video,
                    "source": source,
                    "source_priority": source_priority[source]
                })

    # Sort by source priority, then by video score within same source
    merged.sort(key=lambda x: (x["source_priority"], -x.get("score", 0)))
    return merged[:max_results]

Question 4: Video tag co-occurrence matrix (graphs + matrices)

Given each video's tag list, count co-occurrences for every tag pair and find the combinations that most often appear together.

from collections import Counter
from itertools import combinations
from typing import Dict, List

def tag_cooccurrence(videos: List[Dict], top_k: int = 10) -> List[tuple]:
    """
    videos: [{"video_id": 1, "tags": ["cooking", "recipe", "pasta"]}, ...]
    Returns top_k most common tag pairs.
    """
    pair_counter = Counter()

    for video in videos:
        tags = sorted(video["tags"])  # Sort to avoid (A,B) vs (B,A)
        for pair in combinations(tags, 2):
            pair_counter[pair] += 1

    return pair_counter.most_common(top_k)

# Example
videos = [
    {"video_id": 1, "tags": ["cooking", "recipe", "pasta"]},
    {"video_id": 2, "tags": ["cooking", "recipe", "sushi"]},
    {"video_id": 3, "tags": ["recipe", "pasta", "italian"]},
    {"video_id": 4, "tags": ["cooking", "italian", "pasta"]},
]
print(tag_cooccurrence(videos))
# Output: [(('cooking', 'pasta'), 2), (('cooking', 'recipe'), 2), ...]

Phone Screen: SQL + Coding

The phone screen had two questions: one SQL, one Python.

SQL question: DAU and next-day retention

Given a user watch-log table, compute daily DAU and next-day retention rate.

-- User watch log table
-- user_watch_logs: user_id, video_id, watch_duration, watched_at

WITH daily_active AS (
    SELECT
        DATE(watched_at) AS activity_date,
        user_id
    FROM user_watch_logs
    GROUP BY DATE(watched_at), user_id
),
next_day AS (
    SELECT
        d1.activity_date,
        d1.user_id,
        d2.activity_date AS next_day_activity
    FROM daily_active d1
    LEFT JOIN daily_active d2
        ON d1.user_id = d2.user_id
        AND d2.activity_date = d1.activity_date + INTERVAL '1 day'
),
daily_metrics AS (
    SELECT
        activity_date,
        COUNT(DISTINCT user_id) AS dau,
        ROUND(
            100.0 * COUNT(DISTINCT CASE WHEN next_day_activity IS NOT NULL THEN user_id END)
            / COUNT(DISTINCT user_id),
            2
        ) AS retention_rate_pct
    FROM next_day
    GROUP BY activity_date
)
SELECT * FROM daily_metrics
ORDER BY activity_date DESC
LIMIT 30;

Python question: Bucketing watch durations

from collections import defaultdict
from typing import List, Dict

def bucket_watch_duration(
    videos: List[Dict],
    bucket_boundaries: List[int] = [15, 30, 60, 120, 300]
) -> Dict[str, int]:
    """
    Bucket video watch durations into ranges.
    bucket_boundaries: [15, 30, 60, 120, 300] creates buckets:
    0-15s, 15-30s, 30-60s, 60-120s, 120-300s, 300s+
    """
    buckets = defaultdict(int)

    for video in videos:
        duration = video["watch_duration_seconds"]

        for i, boundary in enumerate(bucket_boundaries):
            if duration <= boundary:
                if i == 0:
                    bucket_name = f"0-{boundary}s"
                else:
                    bucket_name = f"{bucket_boundaries[i-1]}-{boundary}s"
                buckets[bucket_name] += 1
                break
        else:
            bucket_name = f"{bucket_boundaries[-1]}s+"
            buckets[bucket_name] += 1

    return dict(buckets)
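
A quick check with made-up events (the expected output assumes the default boundaries above):

events = [
    {"watch_duration_seconds": 12},
    {"watch_duration_seconds": 45},
    {"watch_duration_seconds": 500},
]
print(bucket_watch_duration(events))
# {'0-15s': 1, '30-60s': 1, '300s+': 1}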

VO Round 1: SQL + Data Analysis

This round focused on understanding the short-video business and on SQL analysis skills.

Question: Recommendation effectiveness analysis

Analyze how well the recommendation system performs: compare viewing metrics between recommended videos and videos users found via search.

-- Tables:
-- video_views: view_id, user_id, video_id, source (recommended/search/follow/trending),
--              watch_duration, completed_watch, liked, shared, commented, viewed_at
-- videos: video_id, creator_id, duration_seconds, category, created_at

WITH source_metrics AS (
    SELECT
        source,
        COUNT(*) AS total_views,
        ROUND(AVG(watch_duration), 2) AS avg_watch_duration,
        ROUND(100.0 * SUM(CASE WHEN completed_watch THEN 1 ELSE 0 END) / COUNT(*), 2) AS completion_rate,
        ROUND(100.0 * SUM(CASE WHEN liked THEN 1 ELSE 0 END) / COUNT(*), 2) AS like_rate,
        ROUND(100.0 * SUM(CASE WHEN shared THEN 1 ELSE 0 END) / COUNT(*), 2) AS share_rate,
        ROUND(100.0 * SUM(CASE WHEN commented THEN 1 ELSE 0 END) / COUNT(*), 2) AS comment_rate,
        -- Engagement score: weighted combination
        ROUND(
            100.0 * (
                0.3 * SUM(CASE WHEN completed_watch THEN 1 ELSE 0 END) +
                0.25 * SUM(CASE WHEN liked THEN 1 ELSE 0 END) +
                0.25 * SUM(CASE WHEN shared THEN 1 ELSE 0 END) +
                0.2 * SUM(CASE WHEN commented THEN 1 ELSE 0 END)
            ) / COUNT(*),
            2
        ) AS engagement_score
    FROM video_views
    WHERE viewed_at >= NOW() - INTERVAL '7 days'
    GROUP BY source
)
SELECT * FROM source_metrics
ORDER BY engagement_score DESC;
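
When presenting source-level aggregates like these, it helps to confirm a completion-rate gap is statistically meaningful. A minimal two-proportion z-test sketch (the counts are made up):

import math

def two_proportion_ztest(x1: int, n1: int, x2: int, n2: int) -> float:
    """Z statistic for the difference between two completion rates."""
    p1, p2 = x1 / n1, x2 / n2
    pooled = (x1 + x2) / (n1 + n2)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n1 + 1 / n2))
    return (p1 - p2) / se

# Hypothetical: search 42k completions / 100k views vs. recommended 380k / 1M
print(round(two_proportion_ztest(42_000, 100_000, 380_000, 1_000_000), 1))
# ≈ 24.8, so the gap is clearly not noise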

Interviewer follow-up:

Q: If recommended videos have a noticeably lower completion rate than searched videos, what could explain it?

I offered several directions:

  1. Recommendation quality: the ranking model may over-optimize for click-through rate (CTR) while neglecting content quality
  2. Intent mismatch: search reflects active intent while recommendations are passive consumption, so user expectations differ
  3. Cold start: new videos in the recommendation pool lack enough engagement signal (probed in the sketch after this list)
  4. Filter bubbles: recommendations may over-concentrate on a few content types, causing viewer fatigue
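
To probe the cold-start hypothesis (item 3), one option is to slice completion rate by video age at view time and by traffic source. A hedged PySpark sketch reusing the video_views / videos schema above; the SparkSession setup and table registration are assumptions:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("cold_start_check").getOrCreate()

# Join views to video metadata so we can compute each video's age at view time
views = spark.table("video_views").join(spark.table("videos"), "video_id")
age_days = F.datediff(F.col("viewed_at"), F.col("created_at"))

(views
 .withColumn("age_bucket",
             F.when(age_days <= 1, "0-1d")
              .when(age_days <= 7, "2-7d")
              .otherwise("8d+"))
 .groupBy("source", "age_bucket")
 .agg(F.avg(F.col("completed_watch").cast("double")).alias("completion_rate"),
      F.count("*").alias("views"))
 .orderBy("source", "age_bucket")
 .show())

If recommended traffic skews toward the "0-1d" bucket and that bucket's completion rate is low, cold start is implicated.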

Question: Creator growth analysis

-- Analyze creator growth and content performance
WITH creator_weekly AS (
    SELECT
        v.creator_id,
        DATE_TRUNC('week', vv.viewed_at) AS week,
        COUNT(DISTINCT vv.view_id) AS total_views,
        COUNT(DISTINCT vv.user_id) AS unique_viewers,
        COUNT(DISTINCT v.video_id) AS videos_published,
        AVG(vv.watch_duration) AS avg_watch_duration
    FROM videos v
    JOIN video_views vv ON v.video_id = vv.video_id
    WHERE vv.viewed_at >= NOW() - INTERVAL '90 days'
    GROUP BY v.creator_id, DATE_TRUNC('week', vv.viewed_at)
),
growth_metrics AS (
    SELECT
        creator_id,
        week,
        total_views,
        unique_viewers,
        videos_published,
        avg_watch_duration,
        -- Week-over-week growth
        LAG(total_views) OVER (PARTITION BY creator_id ORDER BY week) AS prev_week_views,
        CASE
            WHEN LAG(total_views) OVER (PARTITION BY creator_id ORDER BY week) > 0
            THEN ROUND(100.0 * (total_views - LAG(total_views) OVER (PARTITION BY creator_id ORDER BY week))
                     / LAG(total_views) OVER (PARTITION BY creator_id ORDER BY week), 2)
            ELSE NULL
        END AS wow_growth_pct
    FROM creator_weekly
)
SELECT * FROM growth_metrics
WHERE wow_growth_pct > 50
ORDER BY wow_growth_pct DESC
LIMIT 50;

VO Round 2: Data Pipeline Design

The scenario for this round: design a feature pipeline for the recommendation system that serves user features and video features to real-time inference.

"""
Recommendation Feature Pipeline Architecture

Offline (batch, daily):
    Raw Data → Spark ETL → Feature Store (HBase) → Model Training

Online (real-time):
    User Events → Kafka → Flink → Feature Store (Redis) → Model Serving
"""

from dataclasses import asdict, dataclass
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import json

@dataclass
class UserFeature:
    user_id: str
    watch_history: List[str]          # Recent video IDs
    preferred_categories: Dict[str, float]  # category -> preference score
    active_hours: List[int]           # Hours user is most active
    avg_session_duration: float       # Average watch session in seconds
    device_type: str                  # mobile/desktop
    location_country: str

@dataclass
class VideoFeature:
    video_id: str
    creator_id: str
    category: str
    duration_seconds: float
    tags: List[str]
    embedding: List[float]            # 512-dim video embedding
    virality_score: float             # How quickly video gains views
    completion_rate: float            # Historical completion rate
    created_at: datetime

class RecommendationFeaturePipeline:
    """
    Two-tier feature pipeline:
    1. Offline: Spark batch jobs compute historical features
    2. Online: Flink stream processing updates real-time features
    """

    def __init__(self, config: Dict):
        self.config = config
        # In production: Redis for online, HBase for offline
        self.online_features = {}     # Redis cache
        self.offline_features = {}    # HBase/HDFS

    def compute_offline_features(self, user_id: str, history_days: int = 30) -> UserFeature:
        """
        Batch computation of user features using historical data.
        Runs daily via Airflow scheduler.
        """
        cutoff = datetime.now() - timedelta(days=history_days)

        # Simulate Spark SQL queries
        watch_history = self._query_watch_history(user_id, cutoff)
        categories = self._query_category_preferences(user_id, cutoff)
        active_hours = self._query_active_hours(user_id, cutoff)
        session_duration = self._query_avg_session_duration(user_id, cutoff)
        device = self._query_device_type(user_id)
        location = self._query_location(user_id)

        feature = UserFeature(
            user_id=user_id,
            watch_history=watch_history[-100:],  # Last 100 videos
            preferred_categories=categories,
            active_hours=active_hours,
            avg_session_duration=session_duration,
            device_type=device,
            location_country=location
        )

        # Store in offline feature store
        self.offline_features[user_id] = feature
        return feature

    def update_online_features(self, event: Dict):
        """
        Real-time feature update from Kafka stream.
        Each user event (watch, like, share) triggers a feature update.
        """
        user_id = event["user_id"]
        event_type = event["event_type"]  # watch/like/share/comment
        video_id = event["video_id"]

        # Get or initialize user online features
        if user_id not in self.online_features:
            # Load base features from the offline store; convert the
            # dataclass to a plain dict so streaming updates can merge in
            base = self.offline_features.get(user_id)
            self.online_features[user_id] = asdict(base) if base else {}

        features = self.online_features[user_id]

        if event_type == "watch":
            # Update recent watch history (sliding window)
            history = features.get("recent_watches", [])
            history.append({
                "video_id": video_id,
                "timestamp": event["timestamp"],
                "watch_duration": event.get("watch_duration", 0)
            })
            # Keep only last N events (TTL-based eviction)
            features["recent_watches"] = history[-50:]

            # Update category preference (decay-based)
            category = event.get("video_category", "unknown")
            current_prefs = features.get("category_prefs", {})
            # Decay existing preferences and boost current category
            for cat in current_prefs:
                current_prefs[cat] *= 0.99  # Decay factor
            current_prefs[category] = current_prefs.get(category, 0) + 1.0
            features["category_prefs"] = current_prefs

        elif event_type == "like":
            # Boost video's category preference
            category = event.get("video_category", "unknown")
            current_prefs = features.get("category_prefs", {})
            current_prefs[category] = current_prefs.get(category, 0) + 2.0  # Higher weight for likes
            features["category_prefs"] = current_prefs

        # Update in online feature store (Redis)
        self._write_to_redis(user_id, features)

    def get_user_features(self, user_id: str) -> Dict:
        """
        Merge online + offline features for model inference.
        Online features override offline for recent activity.
        """
        offline = self.offline_features.get(user_id)
        offline_dict = asdict(offline) if offline else {}
        online = self.online_features.get(user_id, {})

        # Merge: online takes precedence for time-sensitive features
        merged = {**offline_dict, **online}
        return merged

    # Simulated data access methods
    def _query_watch_history(self, user_id, cutoff) -> List[str]:
        return [f"video_{i}" for i in range(100)]

    def _query_category_preferences(self, user_id, cutoff) -> Dict[str, float]:
        return {"cooking": 0.3, "comedy": 0.25, "music": 0.2, "tech": 0.15, "other": 0.1}

    def _query_active_hours(self, user_id, cutoff) -> List[int]:
        return [9, 12, 18, 20, 21, 22]

    def _query_avg_session_duration(self, user_id, cutoff) -> float:
        return 450.0  # 7.5 minutes

    def _query_device_type(self, user_id) -> str:
        return "mobile"

    def _query_location(self, user_id) -> str:
        return "US"

    def _write_to_redis(self, user_id: str, features: Dict):
        # In production: redis.set(f"user:{user_id}:features", json.dumps(features), ex=3600)
        pass
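
A smoke test of the sketch above; the event payload is illustrative:

pipeline = RecommendationFeaturePipeline(config={})
pipeline.compute_offline_features("user_123")
pipeline.update_online_features({
    "user_id": "user_123",
    "event_type": "watch",
    "video_id": "video_42",
    "timestamp": "2025-01-01T00:00:00Z",
    "watch_duration": 32,
    "video_category": "cooking",
})
print(pipeline.get_user_features("user_123")["category_prefs"])
# {'cooking': 1.0}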

VO Round 3: System Design (Real-Time Recommendation Data Platform)

Requirements

Design TikTok's real-time recommendation data platform, with the following requirements:

  • Handle on the order of a million user events per second (watch, like, share); see the sizing sketch below
  • Feature update latency under 1 second
  • Recommendation model inference latency under 50 ms
  • Support A/B testing and multi-version model management
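
A back-of-envelope sizing pass helps anchor the Kafka layer before drawing boxes. The event size and per-partition throughput below are assumptions, not numbers from the interview:

events_per_sec = 1_000_000       # stated requirement
avg_event_bytes = 500            # assumed serialized event size
ingest_mb_per_sec = events_per_sec * avg_event_bytes / 1e6

per_partition_mb = 10            # conservative per-partition write throughput
partitions = ingest_mb_per_sec / per_partition_mb
print(f"~{ingest_mb_per_sec:.0f} MB/s ingest -> ~{partitions:.0f} Kafka partitions, plus headroom")
# ~500 MB/s ingest -> ~50 Kafka partitions, plus headroom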

Architecture

                ┌───────────────────────────────────────┐
                │     Client Apps (iOS / Android)       │
                │  Events: watch, like, share, comment  │
                └──────────────────┬────────────────────┘
                                   │ HTTPS / gRPC
                ┌──────────────────▼────────────────────┐
                │        Event Ingestion Layer          │
                │  - API Gateway (Envoy)                │
                │  - Rate Limiting                      │
                │  - Event Validation                   │
                └──────────────────┬────────────────────┘
                                   │
                ┌──────────────────▼────────────────────┐
                │        Message Queue (Kafka)          │
                │  Topics:                              │
                │    - user_events (watch/like/share)   │
                │    - video_events (publish/update)    │
                │    - model_events (predictions)       │
                └─────┬───────────────┬───────────────┬─┘
                      │               │               │
   ┌──────────────────▼─┐  ┌──────────▼─────────────┐  ┌▼──────────────────┐
   │ Feature Updates    │  │ Analytics Pipeline     │  │ Event Archiving   │
   │ (Flink)            │  │ (Spark Streaming)      │  │ (HDFS / S3)       │
   │ - User features    │  │ - Real-time dashboards │  │ - Data Lake       │
   │ - Video features   │  │ - KPI tracking         │  │   (Parquet)       │
   │ - Category prefs   │  │ - Anomaly detection    │  └───────────────────┘
   └─────────┬──────────┘  └────────────────────────┘
             │
   ┌─────────▼────────────────┐
   │ Feature Store            │
   │ - Online: Redis          │  <-- real-time lookup (<1ms), TTL: 1 hour
   │ - Offline: HBase / HDFS  │  <-- batch features (daily)
   └─────────┬────────────────┘
             │
   ┌─────────▼────────────────┐
   │ Model Serving            │
   │ (TensorFlow Serving /    │  <-- online inference
   │  Ray)                    │
   └─────────┬────────────────┘
             │
   ┌─────────▼────────────────┐
   │ Recommendation API       │
   │ - Fetch N videos         │
   │ - Rank by model score    │
   │ - Apply business rules   │
   │   (filtering)            │
   └──────────────────────────┘

Key Design Discussion

Q: How do you guarantee feature consistency? (Offline and online features can diverge.)

Building on the two-tier Feature Store architecture, I proposed three safeguards:

  1. Point-in-time correctness: training uses feature snapshots as of the training timestamp, avoiding data leakage
  2. Feature versioning: every feature carries a version number, and each model records the feature versions it was trained on
  3. Feature drift monitoring: compare offline vs. online feature distributions and alert when the divergence exceeds a threshold (a minimal sketch follows the list)
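
For the drift check in item 3, one lightweight option is the Population Stability Index (PSI) between the offline and online feature distributions, with ~0.2 as a common alerting rule of thumb. A minimal sketch over made-up category distributions:

import math

def psi(expected: dict, actual: dict, eps: float = 1e-6) -> float:
    """Population Stability Index between two feature distributions."""
    keys = set(expected) | set(actual)
    e_total = sum(expected.values()) or 1.0
    a_total = sum(actual.values()) or 1.0
    score = 0.0
    for k in keys:
        p = expected.get(k, 0) / e_total + eps
        q = actual.get(k, 0) / a_total + eps
        score += (q - p) * math.log(q / p)
    return score

offline = {"cooking": 0.3, "comedy": 0.25, "music": 0.2, "tech": 0.15, "other": 0.1}
online  = {"cooking": 0.45, "comedy": 0.2, "music": 0.15, "tech": 0.1, "other": 0.1}
print(round(psi(offline, online), 4))
# ≈ 0.1066: a visible shift, but below a 0.2 alert threshold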

Q: What if Kafka consumer lag becomes severe?

My approach:

  1. Auto-scaling: Flink consumers increase parallelism based on observed lag
  2. Priority queues: real-time feature updates take precedence over offline analytics
  3. Backpressure: when downstream capacity is exhausted, backpressure propagates upstream and producers are throttled
  4. Watermarks: Flink uses watermarks to handle out-of-order events and preserve event-time semantics (toy example below)
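
To make item 4 concrete, here is a pure-Python toy of bounded-out-of-orderness watermarks over tumbling 10-second event-time windows. It mimics Flink's event-time semantics but is not Flink API code:

from collections import defaultdict

def tumbling_counts(event_times, out_of_order_bound=5, window_size=10):
    """Count events per tumbling event-time window; a window fires once the
    watermark (max event time seen minus the bound) passes its end."""
    watermark = float("-inf")
    open_windows = defaultdict(int)   # window start -> event count
    fired = {}

    for ts in event_times:            # event-time seconds, possibly out of order
        watermark = max(watermark, ts - out_of_order_bound)
        start = ts - ts % window_size
        if start + window_size <= watermark:
            continue                  # late event: window already fired (side output in Flink)
        open_windows[start] += 1
        for w in [w for w in open_windows if w + window_size <= watermark]:
            fired[w] = open_windows.pop(w)
    return fired, dict(open_windows)

print(tumbling_counts([1, 3, 12, 7, 15, 11, 30, 14]))
# ({0: 3, 10: 3}, {30: 1}) -- the event at t=14 arrives too late and is dropped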

VO Round 4: Behavioral + HM

The HM round emphasized cross-team collaboration and technical judgment.

Q: Tell me about a time you disagreed with a data scientist on feature design.

I told a story: a data scientist wanted to add a real-time user-sentiment feature (NLP analysis over comments), but it was slow and expensive to compute. I suggested approximating it with cheaper signals (like/comment ratio plus share rate); the final result was nearly identical while cutting the cost by 90%.


Interview Takeaways

What worked

  1. TikTok's OA leans algorithmic: 4 questions in 1 hour, focused on arrays, sliding windows, and hashing, so drill the fundamentals beforehand
  2. The SQL questions mirror the business: recommendation effectiveness, creator growth, and retention analysis are all real product scenarios
  3. System design centers on data flow: the recommendation feature pipeline is the core, so know the Kafka + Flink + Feature Store architecture well
  4. Code quality matters: TikTok interviewers care about clean code and modular design

Things to watch

  • You don't need to finish all four OA questions, but the first 2-3 must be correct
  • VO SQL questions should be done in 15-20 minutes, so practice complex window functions ahead of time
  • For system design, prepare tradeoff discussions around feature stores and real-time streaming


💡 Need interview coaching?

If you feel lost preparing for technical interviews, or want personalized interview guidance and resume review, reach out to Interview Coach Pro for one-on-one coaching.

👉 Contact us for a tailored interview-prep plan
