Amazon 数据工程师面试实录 2026:Leadership Principles + AWS 数据服务深度面
Amazon Data Engineer 面试真实经历:Leadership Principles、AWS 数据服务、SQL、Spark、System Design 完整复盘。第一人称真实面经,含面试官对话与解题思路。
公司:Amazon 岗位:Data Engineer II (L5) 面试形式:Online Assessment + Virtual Onsite (4 轮) 结果:Pass → Offer
2026 年 5 月,我参加了 Amazon 的 Data Engineer 面试。Amazon 的面试流程非常独特——Leadership Principles (LPs) 贯穿整个面试过程,从 OA 到 VO 再到 Behavioral,LPs 无处不在。加上对 AWS 数据服务的深度考察,整个面试过程既考验技术实力,也考验文化契合度。
Online Assessment:SQL + Python
OA 在 HackerRank 上进行,90 分钟,包括 3 道 SQL 题和 2 道 Python 题。
SQL 题目:订单分析
给定以下表结构:
orders: order_id, user_id, order_date, status, total_amountorder_items: order_id, product_id, quantity, unit_priceproducts: product_id, category, brand请完成以下查询:
- 计算每个类别的月销售额
- 找出复购率最高的前 10 个用户
- 计算每个用户的 RFM 分数
我的解答:
-- 1. 每个类别的月销售额
SELECT
p.category,
DATE_TRUNC('month', o.order_date) AS month,
SUM(oi.quantity * oi.unit_price) AS monthly_revenue
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE o.status = 'completed'
GROUP BY p.category, DATE_TRUNC('month', o.order_date)
ORDER BY month DESC, monthly_revenue DESC;
-- 2. 复购率最高的前 10 个用户
WITH user_orders AS (
SELECT
user_id,
COUNT(DISTINCT order_id) AS order_count,
COUNT(DISTINCT DATE_TRUNC('month', order_date)) AS active_months
FROM orders
WHERE status = 'completed'
GROUP BY user_id
),
repeat_buyers AS (
SELECT
user_id,
CASE WHEN active_months > 0
THEN CAST(order_count AS FLOAT) / active_months
ELSE 0
END AS orders_per_month
FROM user_orders
WHERE order_count >= 2
)
SELECT user_id, orders_per_month
FROM repeat_buyers
ORDER BY orders_per_month DESC
LIMIT 10;
-- 3. RFM 分数计算
WITH customer_metrics AS (
SELECT
user_id,
MAX(order_date) AS last_purchase,
COUNT(DISTINCT order_id) AS frequency,
SUM(total_amount) AS monetary
FROM orders
WHERE status = 'completed'
GROUP BY user_id
),
rfm_scores AS (
SELECT
user_id,
last_purchase,
frequency,
monetary,
NTILE(5) OVER (ORDER BY last_purchase DESC) AS r_score,
NTILE(5) OVER (ORDER BY frequency ASC) AS f_score,
NTILE(5) OVER (ORDER BY monetary ASC) AS m_score
FROM customer_metrics
)
SELECT
user_id,
r_score,
f_score,
m_score,
r_score + f_score + m_score AS rfm_total
FROM rfm_scores
ORDER BY rfm_total DESC;
Python 题目:数据分区设计
设计一个数据分区策略,支持以下需求:
- 每天 10 亿条记录
- 按时间范围和用户 ID 查询
- 支持数据回溯 90 天
我的解答:
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime, timedelta
@dataclass
class PartitionConfig:
"""分区配置"""
partition_key: str # 分区键
partition_type: str # 分区类型: range, hash, list
num_partitions: int # 分区数量
retention_days: int # 保留天数
class DataPartitioner:
"""数据分区器"""
def __init__(self, config: PartitionConfig):
self.config = config
def get_partition(self, record: dict) -> str:
"""计算记录应该写入哪个分区"""
key_value = record.get(self.config.partition_key)
if self.config.partition_type == 'range':
return self._range_partition(key_value)
elif self.config.partition_type == 'hash':
return self._hash_partition(key_value)
elif self.config.partition_type == 'list':
return self._list_partition(key_value)
def _range_partition(self, value) -> str:
"""范围分区(按时间)"""
if isinstance(value, datetime):
return value.strftime('%Y/%m/%d')
return str(value)
def _hash_partition(self, value) -> str:
"""哈希分区(按用户 ID)"""
hash_value = hash(str(value)) % self.config.num_partitions
return f"partition_{hash_value:04d}"
def _list_partition(self, value) -> str:
"""列表分区(按类别)"""
return str(value).lower().replace(' ', '_')
def get_query_partitions(self, filters: dict) -> List[str]:
"""根据查询条件返回需要扫描的分区"""
partitions = []
if self.config.partition_type == 'range':
start_date = filters.get('start_date')
end_date = filters.get('end_date')
if start_date and end_date:
current = start_date
while current <= end_date:
partitions.append(current.strftime('%Y/%m/%d'))
current += timedelta(days=1)
return partitions
# 使用示例
config = PartitionConfig(
partition_key='event_time',
partition_type='range',
num_partitions=365,
retention_days=90
)
partitioner = DataPartitioner(config)
record = {'event_time': datetime(2026, 9, 6), 'user_id': '12345'}
print(partitioner.get_partition(record)) # "2026/09/06"
VO Round 1:Python + 数据管道设计
这一轮由一位 Senior DE 进行,60 分钟。
题目:设计一个弹性数据管道
设计一个数据管道,支持以下需求:
- 从多个数据源(RDS、S3、Kafka)读取数据
- 支持数据清洗和转换
- 写入 Redshift 和 S3
- 支持失败重试和告警
我的解答:
import boto3
import json
import logging
from typing import Dict, List, Callable, Optional
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
logger = logging.getLogger(__name__)
class DataSource(Enum):
RDS = 'rds'
S3 = 's3'
KAFKA = 'kafka'
class DataSink(Enum):
REDSHIFT = 'redshift'
S3 = 's3'
@dataclass
class PipelineConfig:
"""管道配置"""
name: str
source_type: DataSource
sink_type: DataSink
retry_max: int = 3
retry_delay: int = 60 # seconds
alert_on_failure: bool = True
batch_size: int = 10000
@dataclass
class PipelineResult:
"""管道执行结果"""
success: bool
records_processed: int
records_failed: int
duration_seconds: float
error_message: Optional[str] = None
class DataExtractor:
"""数据抽取器"""
def extract_from_rds(self, connection: dict, query: str) -> List[dict]:
"""从 RDS 抽取数据"""
client = boto3.client('rds-data')
response = client.execute_statement(
resourceArn=connection['arn'],
secretArn=connection['secret'],
database=connection['database'],
sql=query
)
return self._parse_rds_response(response)
def extract_from_s3(self, bucket: str, key: str) -> List[dict]:
"""从 S3 读取数据"""
s3 = boto3.client('s3')
response = s3.get_object(Bucket=bucket, Key=key)
content = response['Body'].read().decode('utf-8')
return json.loads(content)
def extract_from_kafka(self, topic: str, group_id: str) -> List[dict]:
"""从 Kafka 读取数据(使用 MSK)"""
# 实际实现使用 kafka-python 或 confluent-kafka
records = []
# ... Kafka consumer logic
return records
class DataTransformer:
"""数据转换器"""
def __init__(self, transformations: List[Callable]):
self.transformations = transformations
def transform(self, records: List[dict]) -> List[dict]:
"""应用转换"""
result = records
for transform in self.transformations:
result = transform(result)
return result
class DataLoader:
"""数据加载器"""
def load_to_redshift(self, connection: dict, table: str, records: List[dict]):
"""加载到 Redshift"""
# 使用 COPY 命令从 S3 加载
s3 = boto3.client('s3')
temp_key = f"staging/{table}/{datetime.now().isoformat()}.json"
# 先上传到 S3
json_data = json.dumps(records).encode('utf-8')
s3.put_object(Bucket=connection['staging_bucket'], Key=temp_key, Body=json_data)
# 然后使用 COPY 命令
redshift = boto3.client('redshift-data')
redshift.execute_statement(
clusterIdentifier=connection['cluster_id'],
database=connection['database'],
secretArn=connection['secret'],
sql=f"""
COPY {table}
FROM 's3://{connection['staging_bucket']}/{temp_key}'
IAM_ROLE '{connection['iam_role']}'
FORMAT AS JSON
"""
)
def load_to_s3(self, bucket: str, key: str, records: List[dict]):
"""加载到 S3"""
s3 = boto3.client('s3')
json_data = json.dumps(records).encode('utf-8')
s3.put_object(Bucket=bucket, Key=key, Body=json_data)
class DataPipeline:
"""数据管道"""
def __init__(self, config: PipelineConfig):
self.config = config
self.extractor = DataExtractor()
self.loader = DataLoader()
def run(self, source_config: dict, sink_config: dict,
transformations: Optional[List[Callable]] = None) -> PipelineResult:
"""执行管道"""
start_time = datetime.now()
for attempt in range(1, self.config.retry_max + 1):
try:
logger.info(f"Attempt {attempt}/{self.config.retry_max}")
# 1. 抽取
records = self._extract(source_config)
logger.info(f"Extracted {len(records)} records")
# 2. 转换
if transformations:
transformer = DataTransformer(transformations)
records = transformer.transform(records)
# 3. 加载
self._load(sink_config, records)
logger.info(f"Loaded {len(records)} records")
duration = (datetime.now() - start_time).total_seconds()
return PipelineResult(
success=True,
records_processed=len(records),
records_failed=0,
duration_seconds=duration
)
except Exception as e:
logger.error(f"Attempt {attempt} failed: {str(e)}")
if attempt < self.config.retry_max:
import time
time.sleep(self.config.retry_delay * attempt) # 指数退避
else:
duration = (datetime.now() - start_time).total_seconds()
if self.config.alert_on_failure:
self._send_alert(str(e))
return PipelineResult(
success=False,
records_processed=0,
records_failed=0,
duration_seconds=duration,
error_message=str(e)
)
def _extract(self, config: dict) -> List[dict]:
"""根据配置抽取数据"""
if self.config.source_type == DataSource.RDS:
return self.extractor.extract_from_rds(
config['connection'], config['query'])
elif self.config.source_type == DataSource.S3:
return self.extractor.extract_from_s3(
config['bucket'], config['key'])
elif self.config.source_type == DataSource.KAFKA:
return self.extractor.extract_from_kafka(
config['topic'], config['group_id'])
def _load(self, config: dict, records: List[dict]):
"""根据配置加载数据"""
if self.config.sink_type == DataSink.REDSHIFT:
self.loader.load_to_redshift(config, config['table'], records)
elif self.config.sink_type == DataSink.S3:
self.loader.load_to_s3(config['bucket'], config['key'], records)
def _send_alert(self, error_message: str):
"""发送告警(SNS)"""
sns = boto3.client('sns')
sns.publish(
TopicArn='arn:aws:sns:us-east-1:123456789:PipelineAlerts',
Subject=f'Pipeline Failed: {self.config.name}',
Message=error_message
)
# 使用示例
config = PipelineConfig(
name='user_events_pipeline',
source_type=DataSource.S3,
sink_type=DataSink.REDSHIFT,
retry_max=3,
alert_on_failure=True
)
pipeline = DataPipeline(config)
result = pipeline.run(
source_config={
'bucket': 'my-data-lake',
'key': 'raw/events/2026-09-06.json'
},
sink_config={
'cluster_id': 'my-cluster',
'database': 'analytics',
'table': 'user_events',
'staging_bucket': 'my-staging',
'iam_role': 'arn:aws:iam::123456789:role/RedshiftRole'
}
)
print(f"Pipeline result: {result}")
面试官追问:
“如果数据量很大,一次加载会超时怎么办?”
我回答可以使用Glue + Partitioned Loading:
# 使用 AWS Glue 进行大规模数据加载
import boto3
from awsglue.context import GlueContext
from awsglue.job import Job
def run_glue_job():
glue = boto3.client('glue')
response = glue.start_job_run(
JobName='etl-user-events',
Arguments={
'--source_date': '2026-09-06',
'--target_table': 'user_events'
}
)
return response['JobRunId']
VO Round 2:SQL 深度 + Redshift 优化
这一轮由一位 Data Warehouse 团队的 DE 进行,60 分钟。
题目:Redshift 查询优化
以下查询在 Redshift 上执行很慢,请分析原因并优化:
-- 原始查询
SELECT
o.user_id,
u.user_name,
COUNT(*) as order_count,
SUM(oi.quantity * oi.unit_price) as total_spent
FROM orders o
JOIN users u ON o.user_id = u.user_id
JOIN order_items oi ON o.order_id = oi.order_id
WHERE o.order_date >= '2026-01-01'
AND o.status = 'completed'
GROUP BY o.user_id, u.user_name
HAVING SUM(oi.quantity * oi.unit_price) > 1000
ORDER BY total_spent DESC;
我分析的性能问题:
- 没有分区键:
orders表没有按order_date分区 - 没有 Distribution Key:JOIN 列没有设置合适的分布键
- 没有 Sort Key:查询条件列没有设置排序键
- 中间结果过大:JOIN 后过滤效率低
优化方案:
-- 1. 表结构优化
CREATE TABLE orders (
order_id INT,
user_id INT,
order_date DATE,
status VARCHAR(20),
total_amount DECIMAL(10,2)
)
DISTKEY(user_id) -- 按用户分布,优化 JOIN
SORTKEY(order_date, status); -- 按日期和状态排序,优化过滤
CREATE TABLE order_items (
order_id INT DISTKEY, -- 与 orders 相同的分布键
product_id INT,
quantity INT,
unit_price DECIMAL(10,2)
);
CREATE TABLE users (
user_id INT DISTKEY, -- 与 orders 相同的分布键
user_name VARCHAR(100),
email VARCHAR(255)
);
-- 2. 使用 Materialized View
CREATE MATERIALIZED VIEW mv_user_order_summary
BACKUP NO
AS
SELECT
o.user_id,
u.user_name,
COUNT(*) as order_count,
SUM(oi.quantity * oi.unit_price) as total_spent
FROM orders o
JOIN users u ON o.user_id = u.user_id
JOIN order_items oi ON o.order_id = oi.order_id
WHERE o.status = 'completed'
GROUP BY o.user_id, u.user_name;
-- 3. 优化查询
SELECT *
FROM mv_user_order_summary
WHERE total_spent > 1000
ORDER BY total_spent DESC;
-- 4. 使用 Approximate 聚合(如果需要)
SELECT
APPROXIMATE PERCENTILE (total_spent) WITHIN GROUP
(ORDER BY total_spent DESC) AS p95_spent
FROM mv_user_order_summary;
面试官追问:
“Redshift 的 WAREHOUSE 大小如何选择?”
我回答:
| Warehouse Size | 适用场景 | 并发查询 | 成本 |
|---|---|---|---|
| XS/S/M | 开发测试 | 1-5 | 低 |
| L/XL | 生产环境 | 5-20 | 中 |
| 2XL-8XL | 大规模分析 | 20-100 | 高 |
| RA3 | 弹性扩展 | 动态 | 按需 |
推荐使用 RA3(Raptor Accelerated 3):
- 计算和存储分离
- 自动缩放
- 按秒计费
VO Round 3:System Design — AWS Data Lake
这一轮由一位 Principal DE 进行,60 分钟。
题目:设计 AWS 数据湖架构
设计一个数据湖,支持以下需求:
- 存储 PB 级数据
- 支持多种分析引擎(Athena, Redshift, EMR)
- 数据治理和安全
- 成本优化
我的架构设计:
┌─────────────────────────────────────────────────────────────────┐
│ Data Sources │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ RDS │ │ DynamoDB │ │ Kafka │ │ External APIs│ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └──────┬───────┘ │
└───────┼──────────────┼──────────────┼──────────────┼────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Ingestion Layer │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ AWS Glue / MSK / Kinesis │ │
│ │ - Batch: Glue Crawlers + ETL Jobs │ │
│ │ - Streaming: MSK / Kinesis Data Streams │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Data Lake Storage (S3) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ raw/ │ │ staged/ │ │ curated/ │ │ consumption/ │ │
│ │ (原始) │ │ (清洗) │ │ (治理) │ │ (应用) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────────┘ │
│ │
│ Format: Parquet/ORC + Partitioned by date/event_type │
└─────────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Processing Layer │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ Athena │ │ Redshift │ │ EMR │ │ Glue Studio │ │
│ │ (即席) │ │ (OLAP) │ │ (Spark) │ │ (可视化ETL) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Governance & Security │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ AWS Lake Formation + IAM + KMS │ │
│ │ - Data Catalog: Glue Data Catalog │ │
│ │ - Access Control: Lake Formation Permissions │ │
│ │ - Encryption: KMS (SSE-KMS) │ │
│ │ - Audit: CloudTrail + Macie │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
关键技术决策:
| 组件 | 选择 | 原因 |
|---|---|---|
| 存储 | S3 + Parquet | 成本低,列式存储压缩率高 |
| 元数据 | Glue Data Catalog | 统一元数据管理 |
| 即席查询 | Athena | 无需管理,按查询付费 |
| OLAP | Redshift RA3 | 高性能,弹性扩展 |
| 批处理 | EMR/Spark | 灵活,支持复杂 ETL |
| 流处理 | MSK/Kinesis | 托管 Kafka/实时流 |
| 治理 | Lake Formation | 统一权限管理 |
S3 分区策略:
s3://my-data-lake/
├── raw/
│ ├── events/
│ │ ├── year=2026/
│ │ │ ├── month=09/
│ │ │ │ ├── day=06/
│ │ │ │ │ ├── hour=00/part-000.parquet
│ │ │ │ │ ├── hour=01/part-000.parquet
│ │ │ │ │ └── ...
│ │ │ │ └── ...
│ │ │ └── ...
│ │ └── ...
│ └── ...
├── staged/
├── curated/
│ ├── user_events/
│ │ ├── year=2026/
│ │ │ ├── month=09/
│ │ │ │ ├── day=06/
│ │ │ │ │ └── part-000.parquet
│ │ │ │ └── ...
│ │ │ └── ...
│ │ └── ...
│ └── ...
└── consumption/
├── dashboard/
├── ml_features/
└── reports/
Lake Formation 权限管理:
import boto3
lakeformation = boto3.client('lakeformation')
# 授予用户对表的权限
lakeformation.grant_permissions(
Principal={
'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789:role/AnalyticsRole'
},
Resource={
'TableResource': {
'CatalogId': '123456789',
'DatabaseName': 'curated',
'Name': 'user_events'
}
},
Permissions=[
'SELECT',
'DESCRIBE'
]
)
# 列级权限
lakeformation.grant_permissions(
Principal={
'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789:role/BIRole'
},
Resource={
'TableWithColumnsResource': {
'CatalogId': '123456789',
'DatabaseName': 'curated',
'Name': 'user_events',
'ColumnNames': ['user_id', 'event_type', 'event_time'] # 排除敏感列
}
},
Permissions=['SELECT']
)
VO Round 4:Leadership Principles Deep Dive
最后一轮由 Hiring Manager 进行,60 分钟,全程围绕 Amazon 的 16 条 Leadership Principles。
LP1: Customer Obsession
Q: Tell me about a time you went above and beyond for a customer.
STAR 回答:
- Situation: 客户投诉数据延迟,导致业务决策受影响
- Task: 需要在 24 小时内解决延迟问题
- Action:
- 立即排查数据管道,发现是 Spark job 数据倾斜
- 连夜优化 Shuffle 逻辑,加盐值分散热点 key
- 建立实时监控,设置延迟告警
- Result: 延迟从 4 小时降到 15 分钟,客户满意度提升 90%
LP5: Dive Deep
Q: Describe a time you had to dig deep to find the root cause of a problem.
STAR 回答:
- Situation: 数据管道偶尔失败,错误信息不明确
- Task: 找到根本原因并修复
- Action:
- 收集 30 天的日志,分析失败模式
- 发现失败集中在特定时段(高峰期)
- 深入 Redshift logs,发现是 WLM queue 耗尽
- 调整 WLM 配置,增加 CONCURRENT SCALE 槽位
- Result: 失败率从 5% 降到 0.1%
LP6: Hire and Develop the Best
Q: How have you helped develop someone on your team?
STAR 回答:
- Situation: 新入职的 DE 不熟悉 AWS 数据服务
- Task: 帮助他在 3 个月内独立负责项目
- Action:
- 制定学习路径:Glue → Athena → Redshift → EMR
- 每周 1-on-1 code review
- 让他负责一个小项目,我作为 mentor 指导
- Result: 3 个月后他独立负责了一个关键数据管道
LP16: Insist on the Highest Standards
Q: Tell me about a time you had to push for higher standards.
STAR 回答:
- Situation: 团队的数据质量监控不完善
- Task: 建立全面的数据质量框架
- Action:
- 引入 Great Expectations 框架
- 定义 10+ 数据质量规则(完整性、准确性、一致性)
- 建立每日数据质量报告
- Result: 数据质量问题减少 80%,业务信任度提升
面试总结
成功经验
- LPs 是核心:Amazon 的面试围绕 Leadership Principles 展开,每个技术问题都可能转向 LP 考察
- AWS 服务要熟悉:Glue、Athena、Redshift、EMR、MSK 是必考题
- SQL 优化能力:Redshift 的 Distribution Key、Sort Key、分区优化是重点
- STAR 框架:Behavioral 问题使用 STAR 框架回答,结构清晰
面试注意事项
LP 准备:准备 10-15 个故事,覆盖所有 16 条 LPs,特别是 Customer Obsession、Ownership、Dive Deep、Insist on Highest Standards。
技术深度:Amazon 对 AWS 数据服务的理解要求很深,不仅要会用,还要理解底层原理。
系统思维:System Design 考察的是完整的数据平台思维,从 Ingestion 到 Governance。
推荐阅读
- Amazon Redshift 优化最佳实践 — Distribution Key、Sort Key、WLM 配置
- AWS Glue ETL 完全指南 — Crawler、Job、Trigger 配置
- Data Lake 架构设计 — 分层设计、治理、安全
💡 需要面试辅导?
如果你对准备技术面试感到迷茫,或者想要个性化的面试指导和简历优化,欢迎联系 Interview Coach Pro 获取一对一辅导服务。
👉 联系我们 获取专属面试准备方案