plaiddata-engineerinterviewpci-dsscompliancesparkkafkafin-techdata-pipeline

Plaid 数据工程师面试实录 2026:金融数据管道 + PCI-DSS 合规脱敏 完整复盘

Plaid Data Engineer 面试真实经历:银行数据连接管道、PCI-DSS 合规数据脱敏、金融数据质量监控、实时交易分析管道设计完整复盘。第一人称面经,含面试官对话与解题思路。

Sam · · 16 分钟阅读

公司:Plaid 岗位:Data Engineer (L4) 面试形式:Phone Screen + Virtual Onsite (4 轮) 结果:Pass → Offer

2026 年 3 月通过内推投递了 Plaid 的 Data Engineer 岗位。整个流程大约 3 周。

Plaid 的 DE 面试最独特的地方是:数据安全和合规贯穿每一轮。 作为金融数据连接平台,Plaid 处理的是用户的银行账号、交易记录、余额等高度敏感信息。面试中大量考察 PCI-DSS 合规、数据脱敏、加密传输和数据审计。


Phone Screen:银行连接成功率分析

题目:计算各银行的数据连接成功率

Plaid 连接到数百家银行获取账户数据,需要监控每个银行的连接成功率和延迟。

-- 连接事件表
-- connection_events: event_id, institution_id, account_id,
--   product_type ('auth'/'transactions'/'balances'),
--   status ('success'/'error'/'timeout'),
--   error_code, error_message,
--   request_time, response_time, latency_ms

-- 我的解答
WITH daily_institution_stats AS (
    SELECT
        DATE(request_time) AS stat_date,
        institution_id,
        product_type,
        COUNT(*) AS total_requests,
        SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
        SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) AS error_count,
        SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END) AS timeout_count,
        ROUND(AVG(latency_ms), 0) AS avg_latency_ms,
        PERCENTILE_APPROX(latency_ms, 0.95) AS p95_latency_ms
    FROM connection_events
    WHERE request_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
    GROUP BY DATE(request_time), institution_id, product_type
)
SELECT
    stat_date,
    institution_id,
    product_type,
    total_requests,
    success_count,
    ROUND(success_count * 100.0 / total_requests, 2) AS success_rate_pct,
    error_count,
    timeout_count,
    avg_latency_ms,
    p95_latency_ms
FROM daily_institution_stats
WHERE total_requests >= 50  -- 至少 50 次请求才有统计意义
ORDER BY stat_date, institution_id, success_rate_pct;

面试官追问:

“如果某家银行的连接成功率从 95% 突然降到 60%,你怎么排查?”

我回答:

-- 步骤 1: 看错误类型分布
SELECT
    error_code,
    error_message,
    COUNT(*) AS count,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS pct
FROM connection_events
WHERE institution_id = 'chase'
  AND status = 'error'
  AND request_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY error_code, error_message
ORDER BY count DESC;

-- 可能结果:
-- ERR_API_CHANGED | API endpoint returned 404 | 40%
-- ERR_TIMEOUT | Connection timed out after 30s | 35%
-- ERR_AUTH | Invalid credentials | 25%

-- 步骤 2: 看是否是特定 product 出问题
-- 如果 auth 成功但 transactions 失败 → 该银行的交易 API 变了
-- 如果全部 product 都失败 → 银行整体接口有问题

-- 步骤 3: 看时间线
SELECT
    HOUR(request_time) AS hour,
    COUNT(*) AS total,
    ROUND(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)
        * 100.0 / COUNT(*), 2) AS success_rate
FROM connection_events
WHERE institution_id = 'chase'
  AND request_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY HOUR(request_time)
ORDER BY hour;

VO Round 1:数据脱敏管道设计

题目:设计符合 PCI-DSS 的数据脱敏管道

Plaid 需要把银行交易数据提供给合作伙伴做分析,但必须确保敏感信息被脱敏。设计一个数据脱敏管道。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, sha2, substring, length, concat, lit

spark = SparkSession.builder \
    .appName("Plaid_Data_Masking_Pipeline") \
    .getOrCreate()

# ====== 数据脱敏管道 ======

# 原始交易数据(包含 PII)
# transactions: transaction_id, account_id, merchant_name,
#   amount, currency, transaction_date,
#   card_number, cardholder_name, routing_number, account_number

def mask_credit_card(card_number: str) -> str:
    """信用卡号脱敏:只显示后 4 位"""
    if not card_number or len(card_number) < 4:
        return "****"
    return "****-****-****-" + card_number[-4:]

def mask_name(name: str) -> str:
    """姓名脱敏:首字母 + ****"""
    if not name:
        return "****"
    return name[0] + "****"

def mask_account_number(account_number: str) -> str:
    """账号脱敏:只显示后 4 位"""
    if not account_number or len(account_number) < 4:
        return "****"
    return "****" + account_number[-4:]

# 注册 UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

mask_cc_udf = udf(mask_credit_card, StringType())
mask_name_udf = udf(mask_name, StringType())
mask_acct_udf = udf(mask_account_number, StringType())

# 脱敏后的数据
transactions_masked = transactions_df \
    .withColumn("card_number", mask_cc_udf(col("card_number"))) \
    .withColumn("cardholder_name", mask_name_udf(col("cardholder_name"))) \
    .withColumn("routing_number", mask_acct_udf(col("routing_number"))) \
    .withColumn("account_number", mask_acct_udf(col("account_number"))) \
    # 同时生成哈希用于关联(可逆性保护)
    .withColumn("account_hash", sha2(col("account_id"), 256))

# 写入脱敏后的数据湖
transactions_masked.write \
    .format("delta") \
    .mode("append") \
    .save("s3://plaid-data/masked/transactions/")

# ====== 数据审计日志 ======
# 记录谁访问了哪些数据
audit_log = """
CREATE TABLE IF NOT EXISTS data_access_audit (
    audit_id STRING,
    user_id STRING,
    accessed_table STRING,
    access_time TIMESTAMP,
    record_count INT,
    ip_address STRING
);
"""

# 每次查询脱敏数据时,自动记录审计日志
def log_data_access(user_id: str, table: str, record_count: int):
    spark.sql(f"""
        INSERT INTO data_access_audit
        VALUES ('{uuid4()}', '{user_id}', '{table}',
                CURRENT_TIMESTAMP(), {record_count},
                current_ip_address())
    """)

面试官追问:

“如果合作伙伴需要做多表 JOIN,但原始 account_id 已经被哈希了,怎么关联?”

我回答:

# 方案 1: 使用确定性哈希(相同输入产生相同输出)
# SHA-256(account_id) 在多张表中产生相同的 hash
# 用 account_hash 做多表 JOIN

transactions_joined = transactions_masked.join(
    accounts_masked,
    on="account_hash",  # 用哈希值关联
    how="inner"
)

# 方案 2: Tokenization(令牌化)
# 维护一个 token mapping 表(加密存储)
# 明文 → token → 加密存储
# 查询时用 token 关联,不暴露原始值

# 方案 3: k-anonymity(k-匿名化)
# 确保每个分组至少有 k 条记录
# 这样即使知道某些属性,也无法确定具体是哪个人

VO Round 2:系统设计 — 实时交易分析管道

题目:设计 Plaid 的实时交易监控管道

Plaid 需要实时监控异常交易模式(如大额转账、频繁交易),并触发风控告警。

我的架构设计:

┌──────────────────────────────────────────────────────────────┐
│                    Transaction Data Sources                    │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌────────────┐  │
│  │ Bank A   │  │ Bank B   │  │ Bank C   │  │  ... 500+  │  │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬───────┘  │
│       │              │              │               │        │
│       ▼              ▼              ▼               ▼       │
│  ┌────────────────────────────────────────────────────────┐ │
│  │              Kafka (Transaction Stream)                 │ │
│  │  ┌──────────────────────────────────────────────────┐  │ │
│  │  │ topic: raw_transactions                           │  │ │
│  │  │ partition key: account_id                         │  │ │
│  │  └──────────────────────────────────────────────────┘  │ │
│  └──────────────────────────┬─────────────────────────────┘ │
│                             │                               │
│                             ▼                               │
│  ┌────────────────────────────────────────────────────────┐ │
│  │              Spark Structured Streaming                  │ │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐             │ │
│  │  │ Enrich   │  │ Anomaly  │  │ Risk     │             │ │
│  │  │ (join    │  │ Detection│  │ Scoring  │             │ │
│  │  │  dim)    │  │          │  │          │             │ │
│  │  └──────────┘  └──────────┘  └──────────┘             │ │
│  └──────────┬────────────────────────────────────────────┘ │
│             │                                              │
│      ┌──────┴──────┐                                       │
│      ▼             ▼                                       │
│ ┌──────────┐  ┌──────────┐                                │
│ │ Delta    │  │ Alert    │                                │
│ │ Lake     │  │ Queue    │                                │
│ │ (History)│  │ (Kafka)  │                                │
│ └──────────┘  └──────────┘                                │
│                          │                                 │
│                          ▼                                 │
│                   ┌──────────┐                             │
│                   │ Risk     │                             │
│                   │ Console  │                             │
│                   │ (Dashboard)                            │
│                   └──────────┘                             │
└──────────────────────────────────────────────────────────────┘

异常检测逻辑:

from pyspark.sql.functions import col, avg, stddev, when, count, window

# ====== 异常检测规则 ======

# 规则 1: 大额交易(超过用户平均金额的 3 倍)
user_avg = spark.read.format("delta").load("s3://plaid-data/user_profiles/")

large_transaction = transactions_stream \
    .withWatermark("transaction_time", "10 minutes") \
    .join(
        user_avg,
        transactions_stream.account_id == user_avg.account_id,
        "left"
    ) \
    .filter(
        col("amount") > col("avg_monthly_amount") * 3
    ) \
    .withColumn("risk_score", lit(80)) \
    .withColumn("alert_type", lit("LARGE_TRANSACTION"))

# 规则 2: 频繁交易(1 小时内超过 10 笔)
frequent_transaction = transactions_stream \
    .withWatermark("transaction_time", "2 hours") \
    .groupBy(
        window(col("transaction_time"), "1 hour"),
        col("account_id")
    ) \
    .agg(count("*").alias("tx_count")) \
    .filter(col("tx_count") > 10) \
    .withColumn("risk_score", lit(70)) \
    .withColumn("alert_type", lit("FREQUENT_TRANSACTION"))

# 规则 3: 非常用地点交易(与用户常用地点不同)
unusual_location = transactions_stream \
    .join(
        user_locations,  # 用户常用交易地点
        transactions_stream.account_id == user_locations.account_id,
        "left"
    ) \
    .filter(
        (col("transaction_city") != col("usual_city")) &
        (col("transaction_state") != col("usual_state"))
    ) \
    .withColumn("risk_score", lit(60)) \
    .withColumn("alert_type", lit("UNUSUAL_LOCATION"))

# 合并所有异常
all_alerts = large_transaction.unionByName(frequent_transaction) \
    .unionByName(unusual_location)

# 写入告警队列
all_alerts.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "risk_alerts.topic") \
    .start()

面试官追问:

“如果误报率很高怎么办?比如很多大额交易其实是正常的(如购房、买车)”

我回答:

# 方案 1: 白名单机制
# 商户白名单(房产中介、汽车经销商等)
whitelist_merchants = spark.read \
    .format("delta") \
    .load("s3://plaid-data/config/whitelist_merchants/")

# 排除白名单商户
large_transaction = large_transaction \
    .join(whitelist_merchants, on="merchant_name", how="left_anti")

# 方案 2: 机器学习模型
# 训练一个分类模型,输入用户历史行为 + 当前交易特征
# 输出异常概率
from pyspark.ml.classification import RandomForestClassifier

# 特征工程
features = [
    "amount", "amount_vs_avg_ratio", "tx_count_last_hour",
    "location_distance_from_usual", "time_of_day",
    "day_of_week", "merchant_category"
]

# 训练模型
rf = RandomForestClassifier(featuresCol="features", labelCol="is_fraud")
model = rf.fit(training_data)

# 预测
predictions = model.transform(testing_data)

VO Round 3:行为面试

”讲一次你处理数据泄露或隐私事件的经历”

我分享了一个真实经历:之前公司的一个数据管道把客户的银行账号明文写到了日志文件中。

处理过程:

  1. 立即停止管道运行
  2. 评估影响范围——哪些日志文件包含敏感数据
  3. 紧急清理——删除包含 PII 的日志文件
  4. 根因分析——日志框架没有做字段过滤
  5. 长期修复——加了字段级的敏感信息检测和自动脱敏

面试总结

成功经验

  1. PCI-DSS 合规知识:理解数据加密、访问控制、审计日志等要求
  2. 数据脱敏技术:知道哈希、令牌化、k-匿名化等脱敏方法
  3. 风控思维:能从风险角度设计数据管道

注意事项

  1. 金融行业监管:了解 GDPR、CCPA、PCI-DSS 等合规要求
  2. 数据最小化原则:只收集和处理必要的字段
  3. 审计追踪:所有数据访问都要有完整的审计日志

推荐阅读


💡 需要面试辅导?

联系我们

准备好拿下下一次面试了吗?

获取针对你的目标岗位和公司的个性化辅导方案。

联系我们