Coinbase 系统设计面试实录 2026:真实面经完整复盘
Coinbase面试系统设计面试VO面试真实面经算法题SystemDesign

Coinbase 系统设计面试实录 2026:真实面经完整复盘

Coinbase面试第一人称完整复盘:涵盖算法Coding、系统设计、Behavioral面试。还原真实面试对话、高频题目与解题思路,附准备策略与注意事项,助你高效备战Coinbase技术面试。

Sam · · 15 分钟阅读

公司:Coinbase 岗位:系统设计 (System Design) 面试形式:Virtual Onsite 结果:Pass → Offer

大家好,我是 Sam。这次分享 Coinbase SDE 面试的完整经历。Coinbase 的面试风格非常独特:OA 不像常规算法题而是工程实现题,Coding 面注重接口抽象和问题建模,System Design 强调异步编程思维。如果你习惯了传统 FAANG 的面试套路,Coinbase 会让你感觉焕然一新。

Online Assessment:工程实现能力优先

Coinbase 的 OA 更像工程代码实现题而非常规算法。题目是构建一个 Banking Account System,有四问,逐步增加复杂度。

第一问:基本账户操作

题目:实现一个银行账户系统,支持开户、存款、取款、转账、查余额。

from datetime import datetime
from dataclasses import dataclass, field
from typing import Optional

class InsufficientFundsError(Exception):
    pass

class AccountNotFoundError(Exception):
    pass

@dataclass
class Transaction:
    """交易记录"""
    tx_id: str
    from_account: Optional[str]
    to_account: Optional[str]
    amount: float
    timestamp: str
    tx_type: str  # deposit, withdraw, transfer

@dataclass
class Account:
    """银行账户"""
    account_id: str
    balance: float = 0.0
    transactions: list[Transaction] = field(default_factory=list)
    created_at: str = ""
    is_active: bool = True
    
    def __post_init__(self):
        if not self.created_at:
            self.created_at = datetime.now().isoformat()

class BankSystem:
    """
    银行账户系统 — 基础版。
    
    支持:开户、存款、取款、转账、查余额、交易历史。
    """
    
    def __init__(self):
        self.accounts: dict[str, Account] = {}
        self.tx_counter = 0
    
    def create_account(self, account_id: str) -> Account:
        """开户"""
        if account_id in self.accounts:
            raise ValueError(f"Account {account_id} already exists")
        account = Account(account_id=account_id)
        self.accounts[account_id] = account
        return account
    
    def deposit(self, account_id: str, amount: float) -> Transaction:
        """存款"""
        self._validate_account(account_id)
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")
        
        self.tx_counter += 1
        tx = Transaction(
            tx_id=f"tx_{self.tx_counter}",
            from_account=None,
            to_account=account_id,
            amount=amount,
            timestamp=datetime.now().isoformat(),
            tx_type="deposit"
        )
        
        self.accounts[account_id].balance += amount
        self.accounts[account_id].transactions.append(tx)
        return tx
    
    def withdraw(self, account_id: str, amount: float) -> Transaction:
        """取款"""
        self._validate_account(account_id)
        if amount <= 0:
            raise ValueError("Withdraw amount must be positive")
        if self.accounts[account_id].balance < amount:
            raise InsufficientFundsError(
                f"Insufficient funds. Balance: {self.accounts[account_id].balance}"
            )
        
        self.tx_counter += 1
        tx = Transaction(
            tx_id=f"tx_{self.tx_counter}",
            from_account=account_id,
            to_account=None,
            amount=amount,
            timestamp=datetime.now().isoformat(),
            tx_type="withdraw"
        )
        
        self.accounts[account_id].balance -= amount
        self.accounts[account_id].transactions.append(tx)
        return tx
    
    def transfer(self, from_id: str, to_id: str, amount: float) -> Transaction:
        """转账"""
        self._validate_account(from_id)
        self._validate_account(to_id)
        if amount <= 0:
            raise ValueError("Transfer amount must be positive")
        if self.accounts[from_id].balance < amount:
            raise InsufficientFundsError(
                f"Insufficient funds. Balance: {self.accounts[from_id].balance}"
            )
        
        self.tx_counter += 1
        tx = Transaction(
            tx_id=f"tx_{self.tx_counter}",
            from_account=from_id,
            to_account=to_id,
            amount=amount,
            timestamp=datetime.now().isoformat(),
            tx_type="transfer"
        )
        
        # 原子转账:先扣后加
        self.accounts[from_id].balance -= amount
        self.accounts[to_id].balance += amount
        
        # 双方都记录交易
        self.accounts[from_id].transactions.append(tx)
        tx_to = Transaction(**tx.__dict__)  # 复制一份给收款方
        self.accounts[to_id].transactions.append(tx_to)
        
        return tx
    
    def get_balance(self, account_id: str) -> float:
        return self.accounts[account_id].balance
    
    def _validate_account(self, account_id: str) -> None:
        if account_id not in self.accounts:
            raise AccountNotFoundError(f"Account {account_id} not found")

第二问:CAS 操作(Compare-And-Set)

题目:增加 CompareAndSet 操作。只有当账户当前余额等于预期值时,才设置为新值。

class BankSystemCAS(BankSystem):
    """
    扩展版本:支持 CAS 操作和条件删除。
    """
    
    def compare_and_set(self, account_id: str, expected: float, new_balance: float) -> bool:
        """
        CAS 操作:原子地比较并设置余额。
        
        只有当当前余额 == expected 时,才设置为 new_balance。
        返回是否成功。
        """
        self._validate_account(account_id)
        account = self.accounts[account_id]
        
        if account.balance != expected:
            return False
        
        account.balance = new_balance
        return True
    
    def conditional_delete(self, account_id: str, expected_balance: float) -> bool:
        """
        条件删除:只有当余额等于预期值时才删除账户。
        """
        self._validate_account(account_id)
        if self.accounts[account_id].balance != expected_balance:
            return False
        
        del self.accounts[account_id]
        return True

第三问:Time Travel(时间回溯)

题目:支持查询某个时间点的余额。

@dataclass
class BalanceSnapshot:
    """余额快照"""
    timestamp: str
    balance: float
    tx_id: str

class BankSystemTimeTravel(BankSystemCAS):
    """
    扩展版本:支持 time travel 查询。
    
    维护一个有序快照列表,支持按时间二分查找。
    """
    
    def __init__(self):
        super().__init__()
        self.snapshots: dict[str, list[BalanceSnapshot]] = {}
        # 每个账户在创建时有一个初始快照
        # 初始快照在 create_account 中创建
    
    def create_account(self, account_id: str) -> Account:
        account = super().create_account(account_id)
        self.snapshots[account_id] = [
            BalanceSnapshot(
                timestamp=account.created_at,
                balance=0.0,
                tx_id="init"
            )
        ]
        return account
    
    def _add_snapshot(self, account_id: str, tx: Transaction) -> None:
        """交易后记录快照"""
        if account_id not in self.snapshots:
            return
        
        balance = self.accounts[account_id].balance
        snapshot = BalanceSnapshot(
            timestamp=tx.timestamp,
            balance=balance,
            tx_id=tx.tx_id
        )
        self.snapshots[account_id].append(snapshot)
    
    def get_balance_at(self, account_id: str, timestamp: str) -> float:
        """
        查询某个时间点的余额。
        
        在快照列表中找最后一个 timestamp <= 查询时间 的快照。
        使用二分查找:O(log n)
        """
        from bisect import bisect_right
        
        if account_id not in self.snapshots:
            raise AccountNotFoundError(f"Account {account_id} not found")
        
        snapshots = self.snapshots[account_id]
        # 二分查找
        idx = bisect_right(snapshots, timestamp, key=lambda s: s.timestamp)
        
        if idx == 0:
            raise ValueError(f"No data before {timestamp}")
        
        return snapshots[idx - 1].balance

第四问:Sliding Window 聚合

题目:查询某个账户在滑动窗口内的交易聚合(总存款、总取款、交易次数)。

from datetime import datetime, timedelta
from collections import deque

class AggregationResult:
    def __init__(self):
        self.total_deposits = 0.0
        self.total_withdrawals = 0.0
        self.transaction_count = 0
        self.transactions: list[Transaction] = []
    
    def __repr__(self):
        return (f"AggregationResult(deposits={self.total_deposits:.2f}, "
                f"withdrawals={self.total_withdrawals:.2f}, "
                f"count={self.transaction_count})")

class SlidingWindowAggregator:
    """
    滑动窗口聚合器。
    
    处理乱序事件流,按时间窗口聚合交易数据。
    """
    
    def __init__(self, window_seconds: int = 3600):
        self.window = timedelta(seconds=window_seconds)
        # 按账户维护有序事件队列
        self.events: dict[str, deque[tuple[str, Transaction]]] = defaultdict(deque)
    
    def add_event(self, account_id: str, tx: Transaction) -> None:
        """添加事件(可能乱序)"""
        ts = datetime.fromisoformat(tx.timestamp)
        self.events[account_id].append((tx.timestamp, tx))
    
    def get_aggregation(self, account_id: str, 
                        end_time: datetime = None) -> AggregationResult:
        """获取当前滑动窗口内的聚合结果"""
        if end_time is None:
            end_time = datetime.now()
        
        start_time = end_time - self.window
        result = AggregationResult()
        
        for tx_ts_str, tx in self.events.get(account_id, []):
            tx_time = datetime.fromisoformat(tx_ts_str)
            if start_time <= tx_time <= end_time:
                result.transaction_count += 1
                if tx.tx_type == "deposit":
                    result.total_deposits += tx.amount
                elif tx.tx_type == "withdraw":
                    result.total_withdrawals += tx.amount
                result.transactions.append(tx)
        
        return result

Coding 面试:数据结构只是基础,问题建模才是关键

Interleave Iterator 题目

面试官:实现一个 Interleave Iterator,接受多个列表,依次从每个列表中轮流输出元素。

from typing import Iterator, List

class InterleaveIterator:
    """
    交错迭代器。
    
    接受多个列表,按轮询顺序输出元素。
    [1,2,3], [4,5], [6] -> 1, 4, 6, 2, 5, 3
    """
    
    def __init__(self, lists: list[list]):
        self.lists = lists
        self.indices = [0] * len(lists)
        self.total_elements = sum(len(lst) for lst in lists)
        self.current = 0
    
    def __iter__(self):
        return self
    
    def __next__(self):
        if self.current >= self.total_elements:
            raise StopIteration
        
        self.current += 1
        
        # 找到下一个有元素的列表
        for i in range(len(self.lists)):
            if self.indices[i] < len(self.lists[i]):
                value = self.lists[i][self.indices[i]]
                self.indices[i] += 1
                return value
        
        raise StopIteration

# 扩展:StepIterator — 步长控制
class StepIterator(InterleaveIterator):
    """每步取 N 个元素后再换列表"""
    
    def __init__(self, lists: list[list], step: int = 1):
        super().__init__(lists)
        self.step = step
        self.step_counter = 0
    
    def __next__(self):
        if self.current >= self.total_elements:
            raise StopIteration
        
        # 在当前列表上取 step 个元素
        for _ in range(self.step):
            for i in range(len(self.lists)):
                if self.indices[i] < len(self.lists[i]):
                    self.current += 1
                    self.step_counter += 1
                    value = self.lists[i][self.indices[i]]
                    self.indices[i] += 1
                    return value
        
        # 如果当前列表取完了 step 个,切换到下一个
        # 简化实现
        raise StopIteration

面试官:如果列表是无限流呢? :这时候不能预先计算 total_elements。可以用 Round-Robin 方式,每次遍历所有列表,检查每个列表是否有下一个元素。没有就跳过。这样可以处理无限流。

系统设计:真实场景 + 异步模型 + 可扩展接口

加密货币买入系统

面试官:设计一个买入加密货币的系统。从多个 broker 获取报价并选择最低价下单。所有 broker API 都是 async 的。

系统架构

┌──────────────────────────────────────────────────────┐
│                    Client (Web/Mobile)                │
└──────────────────────┬───────────────────────────────┘
                       │ async REST

┌──────────────────────────────────────────────────────┐
│              API Gateway (Koa / Go)                   │
│         async/await 异步路由层                         │
└──────────┬───────────────────────────────┬────────────┘
           │                               │
           ▼                               ▼
┌─────────────────────┐      ┌──────────────────────────┐
│  Buy Service        │      │  Order Management        │
│                     │      │                          │
│  1. Fetch quotes    │      │  - Order creation        │
│  2. Select lowest   │      │  - Order tracking        │
│  3. Place order     │─────►│  - Status updates        │
│  4. Confirm         │      │  - WebSocket push        │
└─────────┬───────────┘      └──────────┬───────────────┘
          │                             │
          ▼                             ▼
┌─────────────────────┐      ┌──────────────────────────┐
│  Broker Adapters    │      │   Persistence Layer       │
│                     │      │                          │
│  ┌──────┐ ┌──────┐ │      │  PostgreSQL (Orders)      │
│  │B1    │ │B2    │ │      │  Redis (Quotes cache)     │
│  │Async │ │Async │ │      │  Kafka (Order events)     │
│  └──────┘ └──────┘ │      └──────────────────────────┘
└─────────────────────┘
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional

@dataclass
class Quote:
    broker_id: str
    price: float
    available_amount: float
    latency_ms: int

class BrokerAdapter(ABC):
    """Broker 适配器接口"""
    
    @abstractmethod
    async def get_quote(self, symbol: str, amount: float) -> Optional[Quote]:
        pass
    
    @abstractmethod
    async def place_order(self, symbol: str, amount: float, price: float) -> str:
        pass

class BrokerClient(BrokerAdapter):
    """具体的 Broker 客户端实现"""
    
    def __init__(self, broker_id: str, base_url: str, timeout: float = 5.0):
        self.broker_id = broker_id
        self.base_url = base_url
        self.timeout = timeout
    
    async def get_quote(self, symbol: str, amount: float) -> Optional[Quote]:
        """异步获取报价,带超时和错误处理"""
        try:
            start = asyncio.get_event_loop().time()
            # 模拟异步请求
            response = await asyncio.wait_for(
                self._fetch_quote(symbol, amount),
                timeout=self.timeout
            )
            latency = (asyncio.get_event_loop().time() - start) * 1000
            return Quote(
                broker_id=self.broker_id,
                price=response["price"],
                available_amount=response["available"],
                latency_ms=int(latency)
            )
        except asyncio.TimeoutError:
            print(f"Broker {self.broker_id} timed out")
            return None
        except Exception as e:
            print(f"Broker {self.broker_id} error: {e}")
            return None
    
    async def place_order(self, symbol: str, amount: float, price: float) -> str:
        """异步下单"""
        # 模拟下单
        return f"order_{self.broker_id}_{symbol}"
    
    async def _fetch_quote(self, symbol: str, amount: float) -> dict:
        """模拟 API 请求"""
        await asyncio.sleep(0.1)  # 模拟网络延迟
        return {"price": 100.0 + hash(symbol) % 10, "available": 1000.0}

class CryptoBuyService:
    """
    加密货币买入服务。
    
    核心逻辑:并发获取所有 broker 报价,选择最优价格下单。
    """
    
    def __init__(self, brokers: list[BrokerAdapter]):
        self.brokers = brokers
    
    async def buy(self, symbol: str, amount: float) -> dict:
        """
        买入加密货币。
        
        1. 并发获取所有 broker 报价
        2. 选择价格最低的可用 broker
        3. 下单
        4. 返回结果
        """
        # 步骤 1:并发获取报价
        quote_tasks = [
            broker.get_quote(symbol, amount) 
            for broker in self.brokers
        ]
        quotes = await asyncio.gather(*quote_tasks, return_exceptions=True)
        
        # 过滤有效报价
        valid_quotes = []
        for q in quotes:
            if isinstance(q, Exception):
                continue
            if q is not None and q.available_amount >= amount:
                valid_quotes.append(q)
        
        if not valid_quotes:
            return {
                "status": "failed",
                "reason": "No broker available with sufficient funds"
            }
        
        # 步骤 2:选择最优报价(价格最低 + 延迟最低)
        best_quote = min(valid_quotes, 
                        key=lambda q: (q.price, q.latency_ms))
        
        # 步骤 3:下单
        order_id = await self._place_with_fallback(
            symbol, amount, best_quote
        )
        
        return {
            "status": "success",
            "order_id": order_id,
            "broker": best_quote.broker_id,
            "price": best_quote.price,
            "amount": amount
        }
    
    async def _place_with_fallback(self, symbol: str, amount: float,
                                    best_quote: Quote) -> str:
        """下单,带 fallback 到第二低价"""
        broker = next(b for b in self.brokers 
                     if b.broker_id == best_quote.broker_id)
        
        try:
            order_id = await asyncio.wait_for(
                broker.place_order(symbol, amount, best_quote.price),
                timeout=10.0
            )
            return order_id
        except (asyncio.TimeoutError, Exception):
            # Fallback: 找第二低价的 broker
            print(f"Primary broker failed, trying fallback...")
            # 实际实现中需要重新获取报价
            return "fallback_order_id"

面试官:如果某个 broker 频繁超时,怎么办? :可以用熔断器模式(Circuit Breaker)。连续失败 N 次后,暂时跳过这个 broker,过一段时间再试。同时记录每个 broker 的成功率和延迟,动态调整优先级。

BQ 面试:没有标准套路,重视动机与技术深度

面试官:Why Coinbase? :(结合 Coinbase 在 Web3 基建、合规风控等领域的实际业务方向,讲述了为什么感兴趣)

面试官:讲讲你做的最有影响力的项目。 :(用 STAR 框架,重点讲技术挑战和量化影响)

面试总结

成功经验

  1. OA 要体现工程思维:不是纯算法,而是完整的系统设计实现。
  2. Coding 要关注接口抽象:Coinbase 喜欢问 iterator、stream 这类设计题,考察 abstraction 能力。
  3. System Design 要 async-first:从设计开始就要按异步模型思考。
  4. BQ 要结合业务:Why Coinbase 要具体到公司的实际业务方向。

面试注意事项

时间管理:OA 时间紧,第一问 15 分钟,后续每问 12-15 分钟。

技术深度:Coinbase 特别看重异步编程能力和系统边界的思考。


推荐阅读


💡 需要面试辅导?

如果你对准备技术面试感到迷茫,或者想要个性化的面试指导和简历优化,欢迎联系 Interview Coach Pro 获取一对一辅导服务。

👉 联系我们 获取专属面试准备方案


准备好拿下下一次面试了吗?

获取针对你的目标岗位和公司的个性化辅导方案。

联系我们