plaiddata-engineerinterviewpci-dsscompliancesparkkafkafin-techdata-pipeline
Plaid 数据工程师面试实录 2026:金融数据管道 + PCI-DSS 合规脱敏 完整复盘
Plaid Data Engineer 面试真实经历:银行数据连接管道、PCI-DSS 合规数据脱敏、金融数据质量监控、实时交易分析管道设计完整复盘。第一人称面经,含面试官对话与解题思路。
Sam · · 16 分钟阅读
公司:Plaid 岗位:Data Engineer (L4) 面试形式:Phone Screen + Virtual Onsite (4 轮) 结果:Pass → Offer
2026 年 3 月通过内推投递了 Plaid 的 Data Engineer 岗位。整个流程大约 3 周。
Plaid 的 DE 面试最独特的地方是:数据安全和合规贯穿每一轮。 作为金融数据连接平台,Plaid 处理的是用户的银行账号、交易记录、余额等高度敏感信息。面试中大量考察 PCI-DSS 合规、数据脱敏、加密传输和数据审计。
Phone Screen:银行连接成功率分析
题目:计算各银行的数据连接成功率
Plaid 连接到数百家银行获取账户数据,需要监控每个银行的连接成功率和延迟。
-- 连接事件表
-- connection_events: event_id, institution_id, account_id,
-- product_type ('auth'/'transactions'/'balances'),
-- status ('success'/'error'/'timeout'),
-- error_code, error_message,
-- request_time, response_time, latency_ms
-- 我的解答
WITH daily_institution_stats AS (
SELECT
DATE(request_time) AS stat_date,
institution_id,
product_type,
COUNT(*) AS total_requests,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) AS error_count,
SUM(CASE WHEN status = 'timeout' THEN 1 ELSE 0 END) AS timeout_count,
ROUND(AVG(latency_ms), 0) AS avg_latency_ms,
PERCENTILE_APPROX(latency_ms, 0.95) AS p95_latency_ms
FROM connection_events
WHERE request_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY DATE(request_time), institution_id, product_type
)
SELECT
stat_date,
institution_id,
product_type,
total_requests,
success_count,
ROUND(success_count * 100.0 / total_requests, 2) AS success_rate_pct,
error_count,
timeout_count,
avg_latency_ms,
p95_latency_ms
FROM daily_institution_stats
WHERE total_requests >= 50 -- 至少 50 次请求才有统计意义
ORDER BY stat_date, institution_id, success_rate_pct;
面试官追问:
“如果某家银行的连接成功率从 95% 突然降到 60%,你怎么排查?”
我回答:
-- 步骤 1: 看错误类型分布
SELECT
error_code,
error_message,
COUNT(*) AS count,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS pct
FROM connection_events
WHERE institution_id = 'chase'
AND status = 'error'
AND request_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY error_code, error_message
ORDER BY count DESC;
-- 可能结果:
-- ERR_API_CHANGED | API endpoint returned 404 | 40%
-- ERR_TIMEOUT | Connection timed out after 30s | 35%
-- ERR_AUTH | Invalid credentials | 25%
-- 步骤 2: 看是否是特定 product 出问题
-- 如果 auth 成功但 transactions 失败 → 该银行的交易 API 变了
-- 如果全部 product 都失败 → 银行整体接口有问题
-- 步骤 3: 看时间线
SELECT
HOUR(request_time) AS hour,
COUNT(*) AS total,
ROUND(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END)
* 100.0 / COUNT(*), 2) AS success_rate
FROM connection_events
WHERE institution_id = 'chase'
AND request_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY HOUR(request_time)
ORDER BY hour;
VO Round 1:数据脱敏管道设计
题目:设计符合 PCI-DSS 的数据脱敏管道
Plaid 需要把银行交易数据提供给合作伙伴做分析,但必须确保敏感信息被脱敏。设计一个数据脱敏管道。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, sha2, substring, length, concat, lit
spark = SparkSession.builder \
.appName("Plaid_Data_Masking_Pipeline") \
.getOrCreate()
# ====== 数据脱敏管道 ======
# 原始交易数据(包含 PII)
# transactions: transaction_id, account_id, merchant_name,
# amount, currency, transaction_date,
# card_number, cardholder_name, routing_number, account_number
def mask_credit_card(card_number: str) -> str:
"""信用卡号脱敏:只显示后 4 位"""
if not card_number or len(card_number) < 4:
return "****"
return "****-****-****-" + card_number[-4:]
def mask_name(name: str) -> str:
"""姓名脱敏:首字母 + ****"""
if not name:
return "****"
return name[0] + "****"
def mask_account_number(account_number: str) -> str:
"""账号脱敏:只显示后 4 位"""
if not account_number or len(account_number) < 4:
return "****"
return "****" + account_number[-4:]
# 注册 UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
mask_cc_udf = udf(mask_credit_card, StringType())
mask_name_udf = udf(mask_name, StringType())
mask_acct_udf = udf(mask_account_number, StringType())
# 脱敏后的数据
transactions_masked = transactions_df \
.withColumn("card_number", mask_cc_udf(col("card_number"))) \
.withColumn("cardholder_name", mask_name_udf(col("cardholder_name"))) \
.withColumn("routing_number", mask_acct_udf(col("routing_number"))) \
.withColumn("account_number", mask_acct_udf(col("account_number"))) \
# 同时生成哈希用于关联(可逆性保护)
.withColumn("account_hash", sha2(col("account_id"), 256))
# 写入脱敏后的数据湖
transactions_masked.write \
.format("delta") \
.mode("append") \
.save("s3://plaid-data/masked/transactions/")
# ====== 数据审计日志 ======
# 记录谁访问了哪些数据
audit_log = """
CREATE TABLE IF NOT EXISTS data_access_audit (
audit_id STRING,
user_id STRING,
accessed_table STRING,
access_time TIMESTAMP,
record_count INT,
ip_address STRING
);
"""
# 每次查询脱敏数据时,自动记录审计日志
def log_data_access(user_id: str, table: str, record_count: int):
spark.sql(f"""
INSERT INTO data_access_audit
VALUES ('{uuid4()}', '{user_id}', '{table}',
CURRENT_TIMESTAMP(), {record_count},
current_ip_address())
""")
面试官追问:
“如果合作伙伴需要做多表 JOIN,但原始 account_id 已经被哈希了,怎么关联?”
我回答:
# 方案 1: 使用确定性哈希(相同输入产生相同输出)
# SHA-256(account_id) 在多张表中产生相同的 hash
# 用 account_hash 做多表 JOIN
transactions_joined = transactions_masked.join(
accounts_masked,
on="account_hash", # 用哈希值关联
how="inner"
)
# 方案 2: Tokenization(令牌化)
# 维护一个 token mapping 表(加密存储)
# 明文 → token → 加密存储
# 查询时用 token 关联,不暴露原始值
# 方案 3: k-anonymity(k-匿名化)
# 确保每个分组至少有 k 条记录
# 这样即使知道某些属性,也无法确定具体是哪个人
VO Round 2:系统设计 — 实时交易分析管道
题目:设计 Plaid 的实时交易监控管道
Plaid 需要实时监控异常交易模式(如大额转账、频繁交易),并触发风控告警。
我的架构设计:
┌──────────────────────────────────────────────────────────────┐
│ Transaction Data Sources │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐ │
│ │ Bank A │ │ Bank B │ │ Bank C │ │ ... 500+ │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬───────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Kafka (Transaction Stream) │ │
│ │ ┌──────────────────────────────────────────────────┐ │ │
│ │ │ topic: raw_transactions │ │ │
│ │ │ partition key: account_id │ │ │
│ │ └──────────────────────────────────────────────────┘ │ │
│ └──────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Spark Structured Streaming │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Enrich │ │ Anomaly │ │ Risk │ │ │
│ │ │ (join │ │ Detection│ │ Scoring │ │ │
│ │ │ dim) │ │ │ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────┬────────────────────────────────────────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Delta │ │ Alert │ │
│ │ Lake │ │ Queue │ │
│ │ (History)│ │ (Kafka) │ │
│ └──────────┘ └──────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ Risk │ │
│ │ Console │ │
│ │ (Dashboard) │
│ └──────────┘ │
└──────────────────────────────────────────────────────────────┘
异常检测逻辑:
from pyspark.sql.functions import col, avg, stddev, when, count, window
# ====== 异常检测规则 ======
# 规则 1: 大额交易(超过用户平均金额的 3 倍)
user_avg = spark.read.format("delta").load("s3://plaid-data/user_profiles/")
large_transaction = transactions_stream \
.withWatermark("transaction_time", "10 minutes") \
.join(
user_avg,
transactions_stream.account_id == user_avg.account_id,
"left"
) \
.filter(
col("amount") > col("avg_monthly_amount") * 3
) \
.withColumn("risk_score", lit(80)) \
.withColumn("alert_type", lit("LARGE_TRANSACTION"))
# 规则 2: 频繁交易(1 小时内超过 10 笔)
frequent_transaction = transactions_stream \
.withWatermark("transaction_time", "2 hours") \
.groupBy(
window(col("transaction_time"), "1 hour"),
col("account_id")
) \
.agg(count("*").alias("tx_count")) \
.filter(col("tx_count") > 10) \
.withColumn("risk_score", lit(70)) \
.withColumn("alert_type", lit("FREQUENT_TRANSACTION"))
# 规则 3: 非常用地点交易(与用户常用地点不同)
unusual_location = transactions_stream \
.join(
user_locations, # 用户常用交易地点
transactions_stream.account_id == user_locations.account_id,
"left"
) \
.filter(
(col("transaction_city") != col("usual_city")) &
(col("transaction_state") != col("usual_state"))
) \
.withColumn("risk_score", lit(60)) \
.withColumn("alert_type", lit("UNUSUAL_LOCATION"))
# 合并所有异常
all_alerts = large_transaction.unionByName(frequent_transaction) \
.unionByName(unusual_location)
# 写入告警队列
all_alerts.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("topic", "risk_alerts.topic") \
.start()
面试官追问:
“如果误报率很高怎么办?比如很多大额交易其实是正常的(如购房、买车)”
我回答:
# 方案 1: 白名单机制
# 商户白名单(房产中介、汽车经销商等)
whitelist_merchants = spark.read \
.format("delta") \
.load("s3://plaid-data/config/whitelist_merchants/")
# 排除白名单商户
large_transaction = large_transaction \
.join(whitelist_merchants, on="merchant_name", how="left_anti")
# 方案 2: 机器学习模型
# 训练一个分类模型,输入用户历史行为 + 当前交易特征
# 输出异常概率
from pyspark.ml.classification import RandomForestClassifier
# 特征工程
features = [
"amount", "amount_vs_avg_ratio", "tx_count_last_hour",
"location_distance_from_usual", "time_of_day",
"day_of_week", "merchant_category"
]
# 训练模型
rf = RandomForestClassifier(featuresCol="features", labelCol="is_fraud")
model = rf.fit(training_data)
# 预测
predictions = model.transform(testing_data)
VO Round 3:行为面试
”讲一次你处理数据泄露或隐私事件的经历”
我分享了一个真实经历:之前公司的一个数据管道把客户的银行账号明文写到了日志文件中。
处理过程:
- 立即停止管道运行
- 评估影响范围——哪些日志文件包含敏感数据
- 紧急清理——删除包含 PII 的日志文件
- 根因分析——日志框架没有做字段过滤
- 长期修复——加了字段级的敏感信息检测和自动脱敏
面试总结
成功经验
- PCI-DSS 合规知识:理解数据加密、访问控制、审计日志等要求
- 数据脱敏技术:知道哈希、令牌化、k-匿名化等脱敏方法
- 风控思维:能从风险角度设计数据管道
注意事项
- 金融行业监管:了解 GDPR、CCPA、PCI-DSS 等合规要求
- 数据最小化原则:只收集和处理必要的字段
- 审计追踪:所有数据访问都要有完整的审计日志