Data Engineer Case Study 面试:数据管道编排系统(Airflow/Prefect 真题)
data-engineersystem-designcase-studyinterviewarchitecture

Data Engineer Case Study 面试:数据管道编排系统(Airflow/Prefect 真题)

本文基于真实候选人面经整理Data面试全流程。还原面试题目、解题思路与技术考察重点,覆盖Airflow、系统设计、System Design、数据管道,附详细准备策略助你高效备战。

Sam · · 12 分钟阅读

面试真题来源:Airflow/Prefect Data Engineer 系统设计面试
难度:Hard | 考察领域:System Design / Architecture
核心考点:工作流编排、任务调度、依赖管理、容错机制

面试场景

这是 Airflow/Prefect DE 面试中非常经典的一道 Case Study 题:

题目:设计一个支持复杂依赖关系和数据管道编排的系统

面试官通常会给你一个真实的业务场景,要求你设计完整的数据处理架构。这道题考察的是你对工作流编排、任务调度、依赖管理、容错机制的全面理解。

业务需求分析

核心业务场景

在 Airflow/Prefect 这样的平台,数据管道编排需要支持:

  1. 复杂依赖:多任务之间的依赖关系
  2. 调度管理:定时触发、事件触发
  3. 容错机制:任务失败自动重试
  4. 监控告警:任务状态监控和异常告警

关键约束条件

  • 任务数量:数千到数万个任务
  • 调度频率:分钟级到小时级
  • 容错要求:自动重试和故障恢复
  • 可用性:99.99% SLA

整体架构设计

┌─────────────────────────────────────────────────────────────────┐
│                         Scheduler Layer                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│  │  Cron       │  │  Event      │  │  Manual     │             │
│  │  Scheduler  │  │  Trigger    │  │  Trigger    │             │
│  └─────────────┘  └─────────────┘  └─────────────┘             │
└─────────────────────────────────────────────────────────────────┘
          │                │                │
          ▼                ▼                ▼
┌─────────────────────────────────────────────────────────────────┐
│                     DAG/Workflow Layer                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│  │  DAG        │  │  Task       │  │  Dependency │             │
│  │  Definition │  │  Definition │  │  Graph      │             │
│  └─────────────┘  └─────────────┘  └─────────────┘             │
└─────────────────────────────────────────────────────────────────┘
          │                │                │
          ▼                ▼                ▼
┌─────────────────────────────────────────────────────────────────┐
│                     Execution Layer                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│  │  Worker     │  │  Retry      │  │  Failover   │             │
│  │  Pool       │  │  Logic      │  │  Logic      │             │
│  └─────────────┘  └─────────────┘  └─────────────┘             │
└─────────────────────────────────────────────────────────────────┘
          │                │                │
          ▼                ▼                ▼
┌─────────────────────────────────────────────────────────────────┐
│                     Monitoring & Alerting                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│  │  Metrics    │  │  Alerts     │  │  Dashboard  │             │
│  │  Collection │  │  Engine     │  │  (可视化)    │             │
│  └─────────────┘  └─────────────┘  └─────────────┘             │
└─────────────────────────────────────────────────────────────────┘

详细设计方案

1. DAG/Workflow 定义

DAG 定义

# Airflow DAG 定义示例
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2026, 8, 7),
    'email': ['data-team@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG('user_behavior_pipeline', 
         default_args=default_args, 
         schedule_interval='@hourly',
         catchup=False) as dag:
    
    # 任务 1: 数据提取
    extract_task = PythonOperator(
        task_id='extract_user_events',
        python_callable=extract_user_events,
        op_kwargs={'table': 'user_events'},
    )
    
    # 任务 2: 数据清洗
    clean_task = PythonOperator(
        task_id='clean_user_events',
        python_callable=clean_user_events,
        op_kwargs={'table': 'user_events'},
    )
    
    # 任务 3: 数据转换
    transform_task = PythonOperator(
        task_id='transform_user_events',
        python_callable=transform_user_events,
        op_kwargs={'table': 'user_events'},
    )
    
    # 任务 4: 数据加载
    load_task = PythonOperator(
        task_id='load_user_events',
        python_callable=load_user_events,
        op_kwargs={'table': 'user_events'},
    )
    
    # 任务 5: 数据质量检查
    quality_check_task = PythonOperator(
        task_id='quality_check',
        python_callable=check_data_quality,
        op_kwargs={'table': 'user_events'},
    )
    
    # 依赖关系
    extract_task >> clean_task >> transform_task >> load_task >> quality_check_task

2. 任务调度

调度策略

  • Cron 调度:定时触发
  • 事件触发:数据到达触发
  • 手动触发:人工触发

调度实现

# Prefect 任务调度示例
from prefect import flow, task
from prefect.schedules import CronSchedule
from datetime import timedelta

@task(retries=3, retry_delay=timedelta(minutes=5))
def extract_user_events(table: str):
    # 数据提取逻辑
    pass

@task(retries=3, retry_delay=timedelta(minutes=5))
def clean_user_events(table: str):
    # 数据清洗逻辑
    pass

@task(retries=3, retry_delay=timedelta(minutes=5))
def transform_user_events(table: str):
    # 数据转换逻辑
    pass

@task(retries=3, retry_delay=timedelta(minutes=5))
def load_user_events(table: str):
    # 数据加载逻辑
    pass

@flow
def user_behavior_pipeline(table: str = "user_events"):
    extract_result = extract_user_events(table)
    clean_result = clean_user_events(extract_result)
    transform_result = transform_user_events(clean_result)
    load_result = load_user_events(transform_result)
    return load_result

# 调度配置
user_behavior_pipeline.schedule = CronSchedule("0 * * * *")  # 每小时执行

3. 依赖管理

依赖类型

  • 上游依赖:任务之间的依赖关系
  • 数据依赖:数据文件/表的依赖
  • 外部依赖:外部系统/服务的依赖

依赖管理

# 依赖管理示例
from prefect import task
from prefect.tasks.shell import ShellOperation

@task
def check_data_dependency(table: str, date: str):
    # 检查数据是否就绪
    pass

@task
def check_external_dependency(service: str):
    # 检查外部服务是否可用
    pass

@flow
def dependent_pipeline(table: str = "user_events", date: str = "2026-08-07"):
    # 检查数据依赖
    data_ready = check_data_dependency(table, date)
    
    # 检查外部依赖
    service_available = check_external_dependency("api_service")
    
    # 如果依赖就绪,执行任务
    if data_ready and service_available:
        result = user_behavior_pipeline(table)
        return result
    else:
        raise Exception("Dependencies not met")

4. 容错机制

容错策略

  • 自动重试:任务失败自动重试
  • Checkpoint:任务状态保存
  • 故障转移:Worker 故障自动转移

容错实现

# 容错机制示例
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
from prefect.utilities import retry

@task(max_retries=3, retry_delay=timedelta(minutes=5))
def fault_tolerant_task(table: str):
    # 任务逻辑
    try:
        result = process_data(table)
        return result
    except Exception as e:
        # 记录错误日志
        log_error(e)
        # 抛出异常触发重试
        raise

@flow
def fault_tolerant_pipeline(table: str = "user_events"):
    return fault_tolerant_task(table)

关键技术决策

为什么选择这个方案?

  1. 复杂依赖:支持多任务之间的依赖关系
  2. 灵活调度:支持多种调度策略
  3. 容错机制:内置容错和重试机制
  4. 监控告警:完整的监控和告警系统

技术选型对比

方案优势劣势适用场景
Airflow生态丰富学习曲线陡复杂工作流
Prefect简单易用生态较小简单工作流
Luigi轻量级功能有限简单任务
Dagster现代化学习曲线陡数据管道

面试官追问

常见追问问题

  1. 如果任务数量增加 10 倍,架构如何调整?

    • 增加 Worker 节点
    • 优化调度策略
    • 增加资源配额
  2. 如果要求多租户隔离,如何实现?

    • 资源隔离:独立 Worker 池
    • 数据隔离:按 tenant_id 分区
    • 权限隔离:RBAC 权限模型
  3. 如果某个组件宕机,如何保证系统可用性?

    • 多节点部署
    • 故障转移机制
    • 数据备份和恢复

面试技巧

回答框架

  1. 澄清需求:明确业务场景和技术约束
  2. 架构设计:画出架构图,说明每个组件的职责
  3. 技术选型:解释为什么选择某个技术
  4. 权衡分析:讨论方案的优缺点

高分回答要点

  • 任务数量:数千到数万个任务
  • 调度频率:分钟级到小时级
  • 容错要求:自动重试和故障恢复
  • 监控告警:完整的监控和告警系统

本文整理自真实 Data Engineer 面试经验,架构设计经过实际验证。


💡 需要面试辅导?

如果你对准备技术面试感到迷茫,或者想要个性化的面试指导和简历优化,欢迎联系 Interview Coach Pro 获取一对一辅导服务。

👉 联系我们 获取专属面试准备方案

准备好拿下下一次面试了吗?

获取针对你的目标岗位和公司的个性化辅导方案。

联系我们