TikTok Data Engineer Interview Experience | Recommendation Data Pipelines + SQL Analytics + Real-Time Stream Processing
A complete TikTok Data Engineer interview writeup: recommendation data pipeline design, short-video metrics SQL, and Flink real-time stream processing, with Python/SQL/PySpark code and architecture diagrams.
Company: TikTok
Role: Data Engineer (Recommendation Systems)
Format: OA (1h, 4 problems) → Phone Screen → VO (4 rounds)
Location: Los Angeles / Remote
Result: Offer ✅
Interview Process
TikTok's DE loop is one of the fastest among the big North American tech companies: less than two weeks from OA to offer. The style resembles Meta's, but puts more weight on data pipeline design and recommendation-system business scenarios.
Process breakdown:
- OA (HackerRank, 1h, 4 problems): algorithm questions, medium difficulty
- Phone Screen (45 min): 1 coding problem + 1 SQL problem
- VO Round 1: SQL + Data Analysis (45 min): short-video metrics analysis
- VO Round 2: Data Pipeline Design (45 min): recommendation feature pipeline
- VO Round 3: System Design (60 min): real-time recommendation data platform
- VO Round 4: BQ + HM (30 min): behavioral + team match
OA: Four Problems in One Hour
TikTok's OA is the standard HackerRank format: 4 problems in 1 hour. Unlike most companies, TikTok's OA skews toward algorithms rather than SQL, but the problems aren't especially hard.
Problem 1: Watch-Session Analysis (arrays + sliding window)
Given a user's video watch time series
[watch_duration_1, watch_duration_2, ...], find the length of the shortest contiguous run of videos whose total watch time reaches a threshold K.
def shortest_watch_session(durations: list[int], threshold: int) -> int:
"""
Find the shortest contiguous subarray with sum >= threshold.
    Uses a sliding window for an O(n) solution; valid because durations are non-negative.
"""
if not durations:
return -1
left = 0
current_sum = 0
min_length = float('inf')
for right in range(len(durations)):
current_sum += durations[right]
while current_sum >= threshold:
min_length = min(min_length, right - left + 1)
current_sum -= durations[left]
left += 1
return min_length if min_length != float('inf') else -1
# Example: Find shortest session with total watch time >= 60 seconds
durations = [15, 25, 10, 30, 20, 40, 5]
print(shortest_watch_session(durations, 60))  # Output: 2 (20+40=60)
Problem 2: A/B Test Assignment (deterministic hash bucketing)
Design a function that assigns users to A/B test groups by user ID, guaranteeing the same user always lands in the same group.
import hashlib
from typing import Dict
class ABTestAssigner:
"""
Consistent hash-based A/B test assignment.
Guarantees same user always gets same group.
"""
def __init__(self, experiment_name: str, groups: Dict[str, float]):
"""
groups: {"control": 0.5, "variant_a": 0.3, "variant_b": 0.2}
Values represent allocation ratios.
"""
self.experiment_name = experiment_name
self.groups = groups
# Build cumulative thresholds
self.thresholds = []
cumulative = 0
for group, ratio in groups.items():
cumulative += ratio
self.thresholds.append((group, cumulative))
def assign(self, user_id: str) -> str:
# Hash user_id + experiment_name for consistency
hash_key = f"{self.experiment_name}:{user_id}"
hash_value = int(hashlib.md5(hash_key.encode()).hexdigest(), 16)
# Map to [0, 1) range
normalized = (hash_value % 10000) / 10000.0
# Find group based on cumulative thresholds
for group, threshold in self.thresholds:
if normalized < threshold:
return group
        return self.thresholds[-1][0]  # float-rounding guard: fall back to the last group
# Example
assigner = ABTestAssigner("video_recs_v2", {
"control": 0.5,
"variant_a": 0.3,
"variant_b": 0.2
})
for uid in range(1, 11):
print(f"User {uid}: {assigner.assign(str(uid))}")
Problem 3: Recommendation List Dedup (sets + priority)
Merge video recommendations from multiple sources (search, follow, trending), drop duplicate videos, and order the result by source priority.
from typing import Dict, List
def merge_recommendations(
search_recs: List[Dict],
follow_recs: List[Dict],
trending_recs: List[Dict],
max_results: int = 20
) -> List[Dict]:
"""
Merge recommendations from multiple sources, deduplicate, and rank.
Priority: search > follow > trending
"""
source_priority = {
"search": 0,
"follow": 1,
"trending": 2
}
seen_videos = set()
merged = []
for source, recs in [
("search", search_recs),
("follow", follow_recs),
("trending", trending_recs)
]:
for video in recs:
video_id = video["video_id"]
if video_id not in seen_videos:
seen_videos.add(video_id)
merged.append({
**video,
"source": source,
"source_priority": source_priority[source]
})
# Sort by source priority, then by video score within same source
merged.sort(key=lambda x: (x["source_priority"], -x.get("score", 0)))
return merged[:max_results]
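A quick usage check with hypothetical inputs (only video_id and score matter here); note that v2 appears under both search and follow but is kept from the higher-priority source:
search = [{"video_id": "v1", "score": 0.9}, {"video_id": "v2", "score": 0.8}]
follow = [{"video_id": "v2", "score": 0.95}, {"video_id": "v3", "score": 0.7}]
trending = [{"video_id": "v4", "score": 0.99}]
print([r["video_id"] for r in merge_recommendations(search, follow, trending)])
# ['v1', 'v2', 'v3', 'v4'] -- v2 deduplicated, attributed to search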
Problem 4: Tag Co-occurrence Matrix (graph theory + matrices)
Given each video's tag list, count how often every tag pair co-occurs and return the combinations that most often appear together.
from collections import Counter
from itertools import combinations
from typing import Dict, List
def tag_cooccurrence(videos: List[Dict], top_k: int = 10) -> List[tuple]:
"""
videos: [{"video_id": 1, "tags": ["cooking", "recipe", "pasta"]}, ...]
Returns top_k most common tag pairs.
"""
pair_counter = Counter()
for video in videos:
tags = sorted(video["tags"]) # Sort to avoid (A,B) vs (B,A)
for pair in combinations(tags, 2):
pair_counter[pair] += 1
return pair_counter.most_common(top_k)
# Example
videos = [
{"video_id": 1, "tags": ["cooking", "recipe", "pasta"]},
{"video_id": 2, "tags": ["cooking", "recipe", "sushi"]},
{"video_id": 3, "tags": ["recipe", "pasta", "italian"]},
{"video_id": 4, "tags": ["cooking", "italian", "pasta"]},
]
print(tag_cooccurrence(videos))
# Output: [(('cooking', 'pasta'), 2), (('cooking', 'recipe'), 2), (('pasta', 'recipe'), 2), ...]
Phone Screen: SQL + Coding
The phone screen had two problems: one SQL and one Python.
SQL Problem: DAU and Next-Day Retention
Given a table of user watch logs, compute daily DAU and next-day retention rate.
-- User watch log table
-- user_watch_logs: user_id, video_id, watch_duration, watched_at
WITH daily_active AS (
SELECT
DATE(watched_at) AS activity_date,
user_id
FROM user_watch_logs
GROUP BY DATE(watched_at), user_id
),
next_day AS (
SELECT
d1.activity_date,
d1.user_id,
d2.activity_date AS next_day_activity
FROM daily_active d1
LEFT JOIN daily_active d2
ON d1.user_id = d2.user_id
AND d2.activity_date = d1.activity_date + INTERVAL '1 day'
),
daily_metrics AS (
SELECT
activity_date,
COUNT(DISTINCT user_id) AS dau,
ROUND(
100.0 * COUNT(DISTINCT CASE WHEN next_day_activity IS NOT NULL THEN user_id END)
/ COUNT(DISTINCT user_id),
2
) AS retention_rate_pct
FROM next_day
GROUP BY activity_date
)
SELECT * FROM daily_metrics
ORDER BY activity_date DESC
LIMIT 30;
Python Problem: Watch-Duration Bucketing
from collections import defaultdict
from typing import List, Dict
def bucket_watch_duration(videos: List[Dict], bucket_boundaries: List[int] = [15, 30, 60, 120, 300]) -> Dict[str, int]:
"""
Bucket video watch durations into ranges.
bucket_boundaries: [15, 30, 60, 120, 300] creates buckets:
0-15s, 15-30s, 30-60s, 60-120s, 120-300s, 300s+
"""
buckets = defaultdict(int)
for video in videos:
duration = video["watch_duration_seconds"]
for i, boundary in enumerate(bucket_boundaries):
if duration <= boundary:
if i == 0:
bucket_name = f"0-{boundary}s"
else:
bucket_name = f"{bucket_boundaries[i-1]}-{boundary}s"
buckets[bucket_name] += 1
break
else:
bucket_name = f"{bucket_boundaries[-1]}s+"
buckets[bucket_name] += 1
return dict(buckets)
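A quick sanity check with made-up view records:
views = [
    {"watch_duration_seconds": 12},
    {"watch_duration_seconds": 45},
    {"watch_duration_seconds": 45},
    {"watch_duration_seconds": 400},
]
print(bucket_watch_duration(views))
# {'0-15s': 1, '30-60s': 2, '300s+': 1}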
VO Round 1: SQL + Data Analysis
This round tested understanding of the short-video business plus SQL analysis skills.
Problem: Recommendation Effectiveness Analysis
Measure the recommendation system's effectiveness: compare watch metrics for recommended videos against videos users reached via search.
-- Tables:
-- video_views: view_id, user_id, video_id, source (recommended/search/follow/trending),
-- watch_duration, completed_watch, liked, shared, commented, viewed_at
-- videos: video_id, creator_id, duration_seconds, category, created_at
WITH source_metrics AS (
SELECT
source,
COUNT(*) AS total_views,
ROUND(AVG(watch_duration), 2) AS avg_watch_duration,
ROUND(100.0 * SUM(CASE WHEN completed_watch THEN 1 ELSE 0 END) / COUNT(*), 2) AS completion_rate,
ROUND(100.0 * SUM(CASE WHEN liked THEN 1 ELSE 0 END) / COUNT(*), 2) AS like_rate,
ROUND(100.0 * SUM(CASE WHEN shared THEN 1 ELSE 0 END) / COUNT(*), 2) AS share_rate,
ROUND(100.0 * SUM(CASE WHEN commented THEN 1 ELSE 0 END) / COUNT(*), 2) AS comment_rate,
-- Engagement score: weighted combination
ROUND(
100.0 * (
0.3 * SUM(CASE WHEN completed_watch THEN 1 ELSE 0 END) +
0.25 * SUM(CASE WHEN liked THEN 1 ELSE 0 END) +
0.25 * SUM(CASE WHEN shared THEN 1 ELSE 0 END) +
0.2 * SUM(CASE WHEN commented THEN 1 ELSE 0 END)
) / COUNT(*),
2
) AS engagement_score
FROM video_views
WHERE viewed_at >= NOW() - INTERVAL '7 days'
GROUP BY source
)
SELECT * FROM source_metrics
ORDER BY engagement_score DESC;
Interviewer follow-up:
Q: If recommended videos show a noticeably lower completion rate than searched videos, what could explain it?
I offered several directions (the pandas sketch after this list shows how I would test the cold-start one):
- Recommendation quality: the model may over-optimize click-through rate (CTR) and neglect content quality
- Intent mismatch: search is active intent while recommendation is passive consumption, so expectations differ
- Cold start: new videos in the recommendation pool lack enough engagement signal
- Filter bubbles: recommendations may over-concentrate on a few content types and fatigue the user
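To make the cold-start hypothesis testable, here is a minimal pandas sketch (data and column names are hypothetical): if the completion-rate gap between sources shrinks once video age is controlled for, cold start is the likely driver.
import pandas as pd

# Hypothetical per-view records; in practice these would come from video_views
views = pd.DataFrame({
    "source": ["recommended", "recommended", "search", "search"],
    "video_age_days": [1, 45, 30, 60],
    "completed_watch": [False, True, True, True],
})

# Bucket by video age to isolate the cold-start effect
views["age_bucket"] = pd.cut(
    views["video_age_days"],
    bins=[0, 7, 30, 365],
    labels=["0-7d", "7-30d", "30d+"],
)

# Completion rate per source within each age bucket
print(
    views.groupby(["source", "age_bucket"], observed=True)["completed_watch"]
    .mean()
    .unstack()
)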
Problem: Creator Growth Analysis
-- Analyze creator growth and content performance
WITH creator_weekly AS (
SELECT
v.creator_id,
DATE_TRUNC('week', vv.viewed_at) AS week,
COUNT(DISTINCT vv.view_id) AS total_views,
COUNT(DISTINCT vv.user_id) AS unique_viewers,
        COUNT(DISTINCT v.video_id) AS videos_published, -- strictly: videos that received views that week
AVG(vv.watch_duration) AS avg_watch_duration
FROM videos v
JOIN video_views vv ON v.video_id = vv.video_id
WHERE vv.viewed_at >= NOW() - INTERVAL '90 days'
GROUP BY v.creator_id, DATE_TRUNC('week', vv.viewed_at)
),
growth_metrics AS (
SELECT
creator_id,
week,
total_views,
unique_viewers,
videos_published,
avg_watch_duration,
-- Week-over-week growth
LAG(total_views) OVER (PARTITION BY creator_id ORDER BY week) AS prev_week_views,
CASE
WHEN LAG(total_views) OVER (PARTITION BY creator_id ORDER BY week) > 0
THEN ROUND(100.0 * (total_views - LAG(total_views) OVER (PARTITION BY creator_id ORDER BY week))
/ LAG(total_views) OVER (PARTITION BY creator_id ORDER BY week), 2)
ELSE NULL
END AS wow_growth_pct
FROM creator_weekly
)
SELECT * FROM growth_metrics
WHERE wow_growth_pct > 50
ORDER BY wow_growth_pct DESC
LIMIT 50;
VO Round 2: Data Pipeline Design
The scenario for this round: design the recommendation system's feature pipeline, serving user features and video features for real-time recommendation.
"""
Recommendation Feature Pipeline Architecture
Offline (batch, daily):
Raw Data → Spark ETL → Feature Store (HBase) → Model Training
Online (real-time):
User Events → Kafka → Flink → Feature Store (Redis) → Model Serving
"""
from dataclasses import asdict, dataclass
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import json
@dataclass
class UserFeature:
user_id: str
watch_history: List[str] # Recent video IDs
preferred_categories: Dict[str, float] # category -> preference score
active_hours: List[int] # Hours user is most active
avg_session_duration: float # Average watch session in seconds
device_type: str # mobile/desktop
location_country: str
@dataclass
class VideoFeature:
video_id: str
creator_id: str
category: str
duration_seconds: float
tags: List[str]
embedding: List[float] # 512-dim video embedding
virality_score: float # How quickly video gains views
completion_rate: float # Historical completion rate
created_at: datetime
class RecommendationFeaturePipeline:
"""
Two-tier feature pipeline:
1. Offline: Spark batch jobs compute historical features
2. Online: Flink stream processing updates real-time features
"""
def __init__(self, config: Dict):
self.config = config
# In production: Redis for online, HBase for offline
self.online_features = {} # Redis cache
self.offline_features = {} # HBase/HDFS
def compute_offline_features(self, user_id: str, history_days: int = 30) -> UserFeature:
"""
Batch computation of user features using historical data.
Runs daily via Airflow scheduler.
"""
cutoff = datetime.now() - timedelta(days=history_days)
# Simulate Spark SQL queries
watch_history = self._query_watch_history(user_id, cutoff)
categories = self._query_category_preferences(user_id, cutoff)
active_hours = self._query_active_hours(user_id, cutoff)
session_duration = self._query_avg_session_duration(user_id, cutoff)
device = self._query_device_type(user_id)
location = self._query_location(user_id)
feature = UserFeature(
user_id=user_id,
watch_history=watch_history[-100:], # Last 100 videos
preferred_categories=categories,
active_hours=active_hours,
avg_session_duration=session_duration,
device_type=device,
location_country=location
)
        # Store as a plain dict so the online path and the merge logic can
        # treat offline and online features uniformly
        self.offline_features[user_id] = asdict(feature)
return feature
def update_online_features(self, event: Dict):
"""
Real-time feature update from Kafka stream.
Each user event (watch, like, share) triggers a feature update.
"""
user_id = event["user_id"]
event_type = event["event_type"] # watch/like/share/comment
video_id = event["video_id"]
# Get or initialize user online features
        if user_id not in self.online_features:
            # Seed from the offline store (copy, so the batch features stay untouched)
            self.online_features[user_id] = dict(self.offline_features.get(user_id, {}))
features = self.online_features[user_id]
if event_type == "watch":
# Update recent watch history (sliding window)
history = features.get("recent_watches", [])
history.append({
"video_id": video_id,
"timestamp": event["timestamp"],
"watch_duration": event.get("watch_duration", 0)
})
# Keep only last N events (TTL-based eviction)
features["recent_watches"] = history[-50:]
# Update category preference (decay-based)
category = event.get("video_category", "unknown")
current_prefs = features.get("category_prefs", {})
# Decay existing preferences and boost current category
for cat in current_prefs:
current_prefs[cat] *= 0.99 # Decay factor
current_prefs[category] = current_prefs.get(category, 0) + 1.0
features["category_prefs"] = current_prefs
elif event_type == "like":
# Boost video's category preference
category = event.get("video_category", "unknown")
current_prefs = features.get("category_prefs", {})
current_prefs[category] = current_prefs.get(category, 0) + 2.0 # Higher weight for likes
features["category_prefs"] = current_prefs
# Update in online feature store (Redis)
self._write_to_redis(user_id, features)
def get_user_features(self, user_id: str) -> Dict:
"""
Merge online + offline features for model inference.
Online features override offline for recent activity.
"""
offline = self.offline_features.get(user_id, {})
online = self.online_features.get(user_id, {})
# Merge: online takes precedence for time-sensitive features
merged = {**offline, **online}
return merged
# Simulated data access methods
def _query_watch_history(self, user_id, cutoff) -> List[str]:
return [f"video_{i}" for i in range(100)]
def _query_category_preferences(self, user_id, cutoff) -> Dict[str, float]:
return {"cooking": 0.3, "comedy": 0.25, "music": 0.2, "tech": 0.15, "other": 0.1}
def _query_active_hours(self, user_id, cutoff) -> List[int]:
return [9, 12, 18, 20, 21, 22]
def _query_avg_session_duration(self, user_id, cutoff) -> float:
return 450.0 # 7.5 minutes
def _query_device_type(self, user_id) -> str:
return "mobile"
def _query_location(self, user_id) -> str:
return "US"
def _write_to_redis(self, user_id: str, features: Dict):
# In production: redis.set(f"user:{user_id}:features", json.dumps(features), ex=3600)
pass
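A quick driver (the user ID and event payload are hypothetical) showing how the batch tier seeds the online tier and how the merged view is served:
pipeline = RecommendationFeaturePipeline(config={})
pipeline.compute_offline_features("user_42")  # simulates the daily batch job
pipeline.update_online_features({             # simulates one Kafka event
    "user_id": "user_42",
    "event_type": "watch",
    "video_id": "video_7",
    "timestamp": "2024-01-01T12:00:00Z",
    "watch_duration": 42,
    "video_category": "cooking",
})
print(pipeline.get_user_features("user_42")["category_prefs"])  # {'cooking': 1.0}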
VO Round 3: System Design (Real-Time Recommendation Data Platform)
Requirements
Design TikTok's real-time recommendation data platform with the following requirements (a rough capacity sketch follows the list):
- Handle millions of user events per second (watch, like, share)
- Feature update latency under 1 second
- Recommendation model inference latency under 50 ms
- Support for A/B testing and multi-version model management
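As a back-of-envelope check on the first requirement, the event rate alone pins down the Kafka sizing. All figures below are assumptions for illustration, not TikTok numbers:
events_per_sec = 2_000_000        # "millions of events per second"
avg_event_bytes = 500             # assumed serialized event size
throughput_mb_s = events_per_sec * avg_event_bytes / 1e6   # ~1000 MB/s
per_partition_mb_s = 10           # conservative per-partition write budget
min_partitions = throughput_mb_s / per_partition_mb_s      # ~100 partitions
print(f"{throughput_mb_s:.0f} MB/s -> at least {min_partitions:.0f} partitions")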
Architecture
┌──────────────────────────────────────┐
│       Client Apps (iOS/Android)      │
│ Events: watch, like, share, comment  │
└──────────────────┬───────────────────┘
                   │ HTTPS / gRPC
┌──────────────────▼───────────────────┐
│        Event Ingestion Layer         │
│ - API Gateway (Envoy)                │
│ - Rate Limiting                      │
│ - Event Validation                   │
└──────────────────┬───────────────────┘
                   │
┌──────────────────▼─────────────────────────────────────────┐
│                   Message Queue (Kafka)                     │
│ Topics:                                                     │
│ - user_events (watch/like/share)                            │
│ - video_events (publish/update)                             │
│ - model_events (predictions)                                │
└────────┬───────────────────┬───────────────────┬───────────┘
         │                   │                   │
┌────────▼─────────┐ ┌───────▼────────────────┐ ┌▼─────────────────┐
│ Feature Updates  │ │ Analytics Pipeline     │ │ Event Archiving  │
│ (Flink)          │ │ (Spark Streaming)      │ │ (HDFS / S3)      │
│ - User features  │ │ - Real-time dashboards │ │ ┌──────────────┐ │
│ - Video features │ │ - KPI tracking         │ │ │  Data Lake   │ │
│ - Category prefs │ │ - Anomaly detection    │ │ │  (Parquet)   │ │
└────────┬─────────┘ └────────────────────────┘ │ └──────────────┘ │
         │                                      └──────────────────┘
┌────────▼────────────┐
│    Feature Store    │
│ ┌─────────────────┐ │
│ │ Online (Redis)  │ │ <--- Real-time lookup (<1 ms)
│ │ TTL: 1 hour     │ │
│ └─────────────────┘ │
│ ┌─────────────────┐ │
│ │ Offline         │ │ <--- Batch features (daily)
│ │ (HBase/HDFS)    │ │
│ └─────────────────┘ │
└────────┬────────────┘
         │
┌────────▼────────────┐
│    Model Serving    │
│ ┌─────────────────┐ │
│ │ Online Serving  │ │ <--- TensorFlow Serving / Ray
│ │ (TensorFlow)    │ │
│ └─────────────────┘ │
└────────┬────────────┘
         │
┌────────▼─────────────────┐
│    Recommendation API    │
│ - Fetch candidate videos │
│ - Rank by model score    │
│ - Apply business rules   │
│   (filtering)            │
└──────────────────────────┘
Key Design Discussion
Q: How do you guarantee feature consistency? (Offline and online features can diverge.)
I proposed the two-tier Feature Store architecture above, plus (a point-in-time join sketch follows this list):
- Point-in-time correctness: train on feature snapshots as of each example's timestamp to avoid data leakage
- Feature versioning: every feature carries a version number, and each model records which feature versions it consumed
- Feature drift monitoring: compare offline and online feature distributions and alert when divergence crosses a threshold
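To make point-in-time correctness concrete, here is a minimal sketch using pandas merge_asof; the tables and column names are hypothetical. Each training label joins the latest feature snapshot at or before its event time, never a later one, which is exactly what prevents feature leakage:
import pandas as pd

# Hypothetical label and feature-snapshot tables; merge_asof requires both
# frames to be sorted on the time key.
labels = pd.DataFrame({
    "user_id": ["u1", "u1"],
    "event_time": pd.to_datetime(["2024-01-02", "2024-01-05"]),
    "label": [1, 0],
}).sort_values("event_time")

feature_snapshots = pd.DataFrame({
    "user_id": ["u1", "u1", "u1"],
    "snapshot_time": pd.to_datetime(["2024-01-01", "2024-01-03", "2024-01-04"]),
    "avg_session_duration": [300.0, 420.0, 450.0],
}).sort_values("snapshot_time")

training_set = pd.merge_asof(
    labels,
    feature_snapshots,
    left_on="event_time",
    right_on="snapshot_time",
    by="user_id",
    direction="backward",   # only snapshots at or before the label's event time
)
print(training_set[["user_id", "event_time", "avg_session_duration"]])
# 2024-01-02 -> 300.0 (Jan 1 snapshot); 2024-01-05 -> 450.0 (Jan 4 snapshot)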
Q: What do you do when Kafka consumer lag becomes severe?
My answer (a lag-check sketch follows this list):
- Auto-scaling: scale Flink consumer parallelism up based on observed lag
- Priority queues: real-time feature updates take precedence over offline analytics
- Backpressure: when downstream operators fall behind, Flink propagates backpressure upstream and throttles ingestion, with Kafka absorbing the buffer
- Watermarks: Flink watermarks handle out-of-order events and preserve event-time semantics
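For the auto-scaling point, the trigger signal is consumer lag per partition. A minimal sketch using the kafka-python client (the broker address, group, and topic names are placeholders; production setups usually read this from consumer-group metadata or a monitoring service instead):
from kafka import KafkaConsumer, TopicPartition

def consumer_lag(bootstrap_servers: str, group_id: str, topic: str) -> dict:
    """Per-partition lag = log end offset minus this group's current position."""
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False,
    )
    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    consumer.assign(partitions)
    end_offsets = consumer.end_offsets(partitions)
    lag = {tp.partition: end_offsets[tp] - consumer.position(tp) for tp in partitions}
    consumer.close()
    return lag

# Hypothetical usage: feed this into the scaling decision for the Flink job
# consumer_lag("kafka:9092", "feature-updater", "user_events")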
VO Round 4: Behavioral + HM
The HM round focused on cross-team collaboration and technical judgment.
Q: Tell me about a time you disagreed with a data scientist on feature design.
My story: a data scientist wanted to add a real-time user-sentiment feature (NLP over comments), but it was slow and expensive to compute. I suggested approximating it with cheap proxy signals, the like/comment ratio plus the share rate, which performed almost identically while cutting cost by roughly 90%. A sketch of the proxy is below.
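The proxy itself is tiny; a sketch with illustrative weights (the real weights were fit offline against the NLP feature):
def proxy_engagement_score(likes: int, comments: int, shares: int, views: int) -> float:
    """Cheap stand-in for the expensive NLP sentiment feature."""
    if views == 0:
        return 0.0
    like_comment_ratio = (likes + comments) / views
    share_rate = shares / views
    # Weights are illustrative, not the tuned production values
    return 0.7 * like_comment_ratio + 0.3 * share_rate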
Interview Summary
What Worked
- TikTok's OA leans algorithmic: 4 problems in 1 hour, focused on arrays, sliding windows, and hashing, so drill those fundamentals beforehand
- The SQL problems mirror the business: recommendation effectiveness, creator growth, and retention are all real scenarios
- System design centers on data flow: the recommendation feature pipeline is the core, so know the Kafka + Flink + Feature Store stack well
- Code quality matters: TikTok interviewers value clean code and modular design
Things to Watch
- You don't have to finish all four OA problems, but the first 2-3 must be correct
- Budget 15-20 minutes per VO SQL problem; practice complex window functions in advance
- For system design, prepare the tradeoff discussion around feature stores and real-time streaming