Coinbase 系统设计面试实录 2026:真实面经完整复盘
Coinbase面试第一人称完整复盘:涵盖算法Coding、系统设计、Behavioral面试。还原真实面试对话、高频题目与解题思路,附准备策略与注意事项,助你高效备战Coinbase技术面试。
公司:Coinbase 岗位:系统设计 (System Design) 面试形式:Virtual Onsite 结果:Pass → Offer
大家好,我是 Sam。这次分享 Coinbase SDE 面试的完整经历。Coinbase 的面试风格非常独特:OA 不像常规算法题而是工程实现题,Coding 面注重接口抽象和问题建模,System Design 强调异步编程思维。如果你习惯了传统 FAANG 的面试套路,Coinbase 会让你感觉焕然一新。
Online Assessment:工程实现能力优先
Coinbase 的 OA 更像工程代码实现题而非常规算法。题目是构建一个 Banking Account System,有四问,逐步增加复杂度。
第一问:基本账户操作
题目:实现一个银行账户系统,支持开户、存款、取款、转账、查余额。
from datetime import datetime
from dataclasses import dataclass, field
from typing import Optional
class InsufficientFundsError(Exception):
pass
class AccountNotFoundError(Exception):
pass
@dataclass
class Transaction:
"""交易记录"""
tx_id: str
from_account: Optional[str]
to_account: Optional[str]
amount: float
timestamp: str
tx_type: str # deposit, withdraw, transfer
@dataclass
class Account:
"""银行账户"""
account_id: str
balance: float = 0.0
transactions: list[Transaction] = field(default_factory=list)
created_at: str = ""
is_active: bool = True
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.now().isoformat()
class BankSystem:
"""
银行账户系统 — 基础版。
支持:开户、存款、取款、转账、查余额、交易历史。
"""
def __init__(self):
self.accounts: dict[str, Account] = {}
self.tx_counter = 0
def create_account(self, account_id: str) -> Account:
"""开户"""
if account_id in self.accounts:
raise ValueError(f"Account {account_id} already exists")
account = Account(account_id=account_id)
self.accounts[account_id] = account
return account
def deposit(self, account_id: str, amount: float) -> Transaction:
"""存款"""
self._validate_account(account_id)
if amount <= 0:
raise ValueError("Deposit amount must be positive")
self.tx_counter += 1
tx = Transaction(
tx_id=f"tx_{self.tx_counter}",
from_account=None,
to_account=account_id,
amount=amount,
timestamp=datetime.now().isoformat(),
tx_type="deposit"
)
self.accounts[account_id].balance += amount
self.accounts[account_id].transactions.append(tx)
return tx
def withdraw(self, account_id: str, amount: float) -> Transaction:
"""取款"""
self._validate_account(account_id)
if amount <= 0:
raise ValueError("Withdraw amount must be positive")
if self.accounts[account_id].balance < amount:
raise InsufficientFundsError(
f"Insufficient funds. Balance: {self.accounts[account_id].balance}"
)
self.tx_counter += 1
tx = Transaction(
tx_id=f"tx_{self.tx_counter}",
from_account=account_id,
to_account=None,
amount=amount,
timestamp=datetime.now().isoformat(),
tx_type="withdraw"
)
self.accounts[account_id].balance -= amount
self.accounts[account_id].transactions.append(tx)
return tx
def transfer(self, from_id: str, to_id: str, amount: float) -> Transaction:
"""转账"""
self._validate_account(from_id)
self._validate_account(to_id)
if amount <= 0:
raise ValueError("Transfer amount must be positive")
if self.accounts[from_id].balance < amount:
raise InsufficientFundsError(
f"Insufficient funds. Balance: {self.accounts[from_id].balance}"
)
self.tx_counter += 1
tx = Transaction(
tx_id=f"tx_{self.tx_counter}",
from_account=from_id,
to_account=to_id,
amount=amount,
timestamp=datetime.now().isoformat(),
tx_type="transfer"
)
# 原子转账:先扣后加
self.accounts[from_id].balance -= amount
self.accounts[to_id].balance += amount
# 双方都记录交易
self.accounts[from_id].transactions.append(tx)
tx_to = Transaction(**tx.__dict__) # 复制一份给收款方
self.accounts[to_id].transactions.append(tx_to)
return tx
def get_balance(self, account_id: str) -> float:
return self.accounts[account_id].balance
def _validate_account(self, account_id: str) -> None:
if account_id not in self.accounts:
raise AccountNotFoundError(f"Account {account_id} not found")
第二问:CAS 操作(Compare-And-Set)
题目:增加 CompareAndSet 操作。只有当账户当前余额等于预期值时,才设置为新值。
class BankSystemCAS(BankSystem):
"""
扩展版本:支持 CAS 操作和条件删除。
"""
def compare_and_set(self, account_id: str, expected: float, new_balance: float) -> bool:
"""
CAS 操作:原子地比较并设置余额。
只有当当前余额 == expected 时,才设置为 new_balance。
返回是否成功。
"""
self._validate_account(account_id)
account = self.accounts[account_id]
if account.balance != expected:
return False
account.balance = new_balance
return True
def conditional_delete(self, account_id: str, expected_balance: float) -> bool:
"""
条件删除:只有当余额等于预期值时才删除账户。
"""
self._validate_account(account_id)
if self.accounts[account_id].balance != expected_balance:
return False
del self.accounts[account_id]
return True
第三问:Time Travel(时间回溯)
题目:支持查询某个时间点的余额。
@dataclass
class BalanceSnapshot:
"""余额快照"""
timestamp: str
balance: float
tx_id: str
class BankSystemTimeTravel(BankSystemCAS):
"""
扩展版本:支持 time travel 查询。
维护一个有序快照列表,支持按时间二分查找。
"""
def __init__(self):
super().__init__()
self.snapshots: dict[str, list[BalanceSnapshot]] = {}
# 每个账户在创建时有一个初始快照
# 初始快照在 create_account 中创建
def create_account(self, account_id: str) -> Account:
account = super().create_account(account_id)
self.snapshots[account_id] = [
BalanceSnapshot(
timestamp=account.created_at,
balance=0.0,
tx_id="init"
)
]
return account
def _add_snapshot(self, account_id: str, tx: Transaction) -> None:
"""交易后记录快照"""
if account_id not in self.snapshots:
return
balance = self.accounts[account_id].balance
snapshot = BalanceSnapshot(
timestamp=tx.timestamp,
balance=balance,
tx_id=tx.tx_id
)
self.snapshots[account_id].append(snapshot)
def get_balance_at(self, account_id: str, timestamp: str) -> float:
"""
查询某个时间点的余额。
在快照列表中找最后一个 timestamp <= 查询时间 的快照。
使用二分查找:O(log n)
"""
from bisect import bisect_right
if account_id not in self.snapshots:
raise AccountNotFoundError(f"Account {account_id} not found")
snapshots = self.snapshots[account_id]
# 二分查找
idx = bisect_right(snapshots, timestamp, key=lambda s: s.timestamp)
if idx == 0:
raise ValueError(f"No data before {timestamp}")
return snapshots[idx - 1].balance
第四问:Sliding Window 聚合
题目:查询某个账户在滑动窗口内的交易聚合(总存款、总取款、交易次数)。
from datetime import datetime, timedelta
from collections import deque
class AggregationResult:
def __init__(self):
self.total_deposits = 0.0
self.total_withdrawals = 0.0
self.transaction_count = 0
self.transactions: list[Transaction] = []
def __repr__(self):
return (f"AggregationResult(deposits={self.total_deposits:.2f}, "
f"withdrawals={self.total_withdrawals:.2f}, "
f"count={self.transaction_count})")
class SlidingWindowAggregator:
"""
滑动窗口聚合器。
处理乱序事件流,按时间窗口聚合交易数据。
"""
def __init__(self, window_seconds: int = 3600):
self.window = timedelta(seconds=window_seconds)
# 按账户维护有序事件队列
self.events: dict[str, deque[tuple[str, Transaction]]] = defaultdict(deque)
def add_event(self, account_id: str, tx: Transaction) -> None:
"""添加事件(可能乱序)"""
ts = datetime.fromisoformat(tx.timestamp)
self.events[account_id].append((tx.timestamp, tx))
def get_aggregation(self, account_id: str,
end_time: datetime = None) -> AggregationResult:
"""获取当前滑动窗口内的聚合结果"""
if end_time is None:
end_time = datetime.now()
start_time = end_time - self.window
result = AggregationResult()
for tx_ts_str, tx in self.events.get(account_id, []):
tx_time = datetime.fromisoformat(tx_ts_str)
if start_time <= tx_time <= end_time:
result.transaction_count += 1
if tx.tx_type == "deposit":
result.total_deposits += tx.amount
elif tx.tx_type == "withdraw":
result.total_withdrawals += tx.amount
result.transactions.append(tx)
return result
Coding 面试:数据结构只是基础,问题建模才是关键
Interleave Iterator 题目
面试官:实现一个 Interleave Iterator,接受多个列表,依次从每个列表中轮流输出元素。
from typing import Iterator, List
class InterleaveIterator:
"""
交错迭代器。
接受多个列表,按轮询顺序输出元素。
[1,2,3], [4,5], [6] -> 1, 4, 6, 2, 5, 3
"""
def __init__(self, lists: list[list]):
self.lists = lists
self.indices = [0] * len(lists)
self.total_elements = sum(len(lst) for lst in lists)
self.current = 0
def __iter__(self):
return self
def __next__(self):
if self.current >= self.total_elements:
raise StopIteration
self.current += 1
# 找到下一个有元素的列表
for i in range(len(self.lists)):
if self.indices[i] < len(self.lists[i]):
value = self.lists[i][self.indices[i]]
self.indices[i] += 1
return value
raise StopIteration
# 扩展:StepIterator — 步长控制
class StepIterator(InterleaveIterator):
"""每步取 N 个元素后再换列表"""
def __init__(self, lists: list[list], step: int = 1):
super().__init__(lists)
self.step = step
self.step_counter = 0
def __next__(self):
if self.current >= self.total_elements:
raise StopIteration
# 在当前列表上取 step 个元素
for _ in range(self.step):
for i in range(len(self.lists)):
if self.indices[i] < len(self.lists[i]):
self.current += 1
self.step_counter += 1
value = self.lists[i][self.indices[i]]
self.indices[i] += 1
return value
# 如果当前列表取完了 step 个,切换到下一个
# 简化实现
raise StopIteration
面试官:如果列表是无限流呢? 我:这时候不能预先计算 total_elements。可以用 Round-Robin 方式,每次遍历所有列表,检查每个列表是否有下一个元素。没有就跳过。这样可以处理无限流。
系统设计:真实场景 + 异步模型 + 可扩展接口
加密货币买入系统
面试官:设计一个买入加密货币的系统。从多个 broker 获取报价并选择最低价下单。所有 broker API 都是 async 的。
系统架构:
┌──────────────────────────────────────────────────────┐
│ Client (Web/Mobile) │
└──────────────────────┬───────────────────────────────┘
│ async REST
▼
┌──────────────────────────────────────────────────────┐
│ API Gateway (Koa / Go) │
│ async/await 异步路由层 │
└──────────┬───────────────────────────────┬────────────┘
│ │
▼ ▼
┌─────────────────────┐ ┌──────────────────────────┐
│ Buy Service │ │ Order Management │
│ │ │ │
│ 1. Fetch quotes │ │ - Order creation │
│ 2. Select lowest │ │ - Order tracking │
│ 3. Place order │─────►│ - Status updates │
│ 4. Confirm │ │ - WebSocket push │
└─────────┬───────────┘ └──────────┬───────────────┘
│ │
▼ ▼
┌─────────────────────┐ ┌──────────────────────────┐
│ Broker Adapters │ │ Persistence Layer │
│ │ │ │
│ ┌──────┐ ┌──────┐ │ │ PostgreSQL (Orders) │
│ │B1 │ │B2 │ │ │ Redis (Quotes cache) │
│ │Async │ │Async │ │ │ Kafka (Order events) │
│ └──────┘ └──────┘ │ └──────────────────────────┘
└─────────────────────┘
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass
class Quote:
broker_id: str
price: float
available_amount: float
latency_ms: int
class BrokerAdapter(ABC):
"""Broker 适配器接口"""
@abstractmethod
async def get_quote(self, symbol: str, amount: float) -> Optional[Quote]:
pass
@abstractmethod
async def place_order(self, symbol: str, amount: float, price: float) -> str:
pass
class BrokerClient(BrokerAdapter):
"""具体的 Broker 客户端实现"""
def __init__(self, broker_id: str, base_url: str, timeout: float = 5.0):
self.broker_id = broker_id
self.base_url = base_url
self.timeout = timeout
async def get_quote(self, symbol: str, amount: float) -> Optional[Quote]:
"""异步获取报价,带超时和错误处理"""
try:
start = asyncio.get_event_loop().time()
# 模拟异步请求
response = await asyncio.wait_for(
self._fetch_quote(symbol, amount),
timeout=self.timeout
)
latency = (asyncio.get_event_loop().time() - start) * 1000
return Quote(
broker_id=self.broker_id,
price=response["price"],
available_amount=response["available"],
latency_ms=int(latency)
)
except asyncio.TimeoutError:
print(f"Broker {self.broker_id} timed out")
return None
except Exception as e:
print(f"Broker {self.broker_id} error: {e}")
return None
async def place_order(self, symbol: str, amount: float, price: float) -> str:
"""异步下单"""
# 模拟下单
return f"order_{self.broker_id}_{symbol}"
async def _fetch_quote(self, symbol: str, amount: float) -> dict:
"""模拟 API 请求"""
await asyncio.sleep(0.1) # 模拟网络延迟
return {"price": 100.0 + hash(symbol) % 10, "available": 1000.0}
class CryptoBuyService:
"""
加密货币买入服务。
核心逻辑:并发获取所有 broker 报价,选择最优价格下单。
"""
def __init__(self, brokers: list[BrokerAdapter]):
self.brokers = brokers
async def buy(self, symbol: str, amount: float) -> dict:
"""
买入加密货币。
1. 并发获取所有 broker 报价
2. 选择价格最低的可用 broker
3. 下单
4. 返回结果
"""
# 步骤 1:并发获取报价
quote_tasks = [
broker.get_quote(symbol, amount)
for broker in self.brokers
]
quotes = await asyncio.gather(*quote_tasks, return_exceptions=True)
# 过滤有效报价
valid_quotes = []
for q in quotes:
if isinstance(q, Exception):
continue
if q is not None and q.available_amount >= amount:
valid_quotes.append(q)
if not valid_quotes:
return {
"status": "failed",
"reason": "No broker available with sufficient funds"
}
# 步骤 2:选择最优报价(价格最低 + 延迟最低)
best_quote = min(valid_quotes,
key=lambda q: (q.price, q.latency_ms))
# 步骤 3:下单
order_id = await self._place_with_fallback(
symbol, amount, best_quote
)
return {
"status": "success",
"order_id": order_id,
"broker": best_quote.broker_id,
"price": best_quote.price,
"amount": amount
}
async def _place_with_fallback(self, symbol: str, amount: float,
best_quote: Quote) -> str:
"""下单,带 fallback 到第二低价"""
broker = next(b for b in self.brokers
if b.broker_id == best_quote.broker_id)
try:
order_id = await asyncio.wait_for(
broker.place_order(symbol, amount, best_quote.price),
timeout=10.0
)
return order_id
except (asyncio.TimeoutError, Exception):
# Fallback: 找第二低价的 broker
print(f"Primary broker failed, trying fallback...")
# 实际实现中需要重新获取报价
return "fallback_order_id"
面试官:如果某个 broker 频繁超时,怎么办? 我:可以用熔断器模式(Circuit Breaker)。连续失败 N 次后,暂时跳过这个 broker,过一段时间再试。同时记录每个 broker 的成功率和延迟,动态调整优先级。
BQ 面试:没有标准套路,重视动机与技术深度
面试官:Why Coinbase? 我:(结合 Coinbase 在 Web3 基建、合规风控等领域的实际业务方向,讲述了为什么感兴趣)
面试官:讲讲你做的最有影响力的项目。 我:(用 STAR 框架,重点讲技术挑战和量化影响)
面试总结
成功经验
- OA 要体现工程思维:不是纯算法,而是完整的系统设计实现。
- Coding 要关注接口抽象:Coinbase 喜欢问 iterator、stream 这类设计题,考察 abstraction 能力。
- System Design 要 async-first:从设计开始就要按异步模型思考。
- BQ 要结合业务:Why Coinbase 要具体到公司的实际业务方向。
面试注意事项
时间管理:OA 时间紧,第一问 15 分钟,后续每问 12-15 分钟。
技术深度:Coinbase 特别看重异步编程能力和系统边界的思考。
推荐阅读
- Coinbase 面试全流程指南 — Coinbase 面试流程、高频题目与准备策略
- System Design 面试完全攻略 — 分布式系统设计的核心原则与高频题目
- 行为面试 STAR 故事模板 — Leadership、决策、冲突解决等高频行为问题的回答框架
💡 需要面试辅导?
如果你对准备技术面试感到迷茫,或者想要个性化的面试指导和简历优化,欢迎联系 Interview Coach Pro 获取一对一辅导服务。
👉 联系我们 获取专属面试准备方案