Python 面试高频题全解析(2026 更新):Data Engineer 必考的 15 道题
标签:Python · data-engineer · coding · interview · algorithm · data-structures

Python 面试高频题全解析(2026 更新):Data Engineer 必考的 15 道题

Python面试高频题全解析2026:精选15道Data Engineer必考的Python编程题,涵盖数据结构、算法实现与系统设计。每题附完整代码实现与复杂度分析。

Sam · 16 分钟阅读

Python 是 Data Engineer 面试中最常考的语言。Meta、Google、TikTok、Amazon 的 DE 面试都会用 Python 考编程题。


题型一:字符串处理

题目一:反转字符串中的单词

def reverse_words(s: str) -> str:
    """Return the words of ``s`` in reverse order, joined by single spaces.

    Leading, trailing and repeated whitespace is discarded.
    """
    # str.split() without arguments splits on runs of whitespace and never
    # yields empty strings, so no explicit trimming is required.
    tokens = s.split()
    tokens.reverse()
    return ' '.join(tokens)

# 测试
print(reverse_words("  hello   world  "))  # "world hello"

题目二:验证括号序列

def is_valid_parentheses(s: str) -> bool:
    """Return True when every bracket in ``s`` is closed in the right order.

    Only ``()``, ``[]`` and ``{}`` are considered; any other character is
    ignored.
    """
    closer_to_opener = {')': '(', ']': '[', '}': '{'}
    openers = set(closer_to_opener.values())
    pending = []  # openers still waiting for their closer, innermost last

    for ch in s:
        if ch in openers:
            pending.append(ch)
            continue

        expected = closer_to_opener.get(ch)
        if expected is None:
            # Not a bracket at all: skip it.
            continue

        # A closer must match the most recently opened bracket.
        if not pending or pending.pop() != expected:
            return False

    return len(pending) == 0

# 测试
print(is_valid_parentheses("()[]{}"))   # True
print(is_valid_parentheses("([)]"))     # False

题目三:最长无重复子串

def length_of_longest_substring(s: str) -> int:
    """Return the length of the longest substring of ``s`` with no repeats.

    Sliding window: ``window_start`` only ever moves forward, jumping past
    the previous occurrence of a character when that occurrence lies inside
    the current window.
    """
    last_seen = {}  # character -> index of its most recent occurrence
    window_start = 0
    best = 0

    for pos, ch in enumerate(s):
        previous = last_seen.get(ch, -1)
        if previous >= window_start:
            # The repeat is inside the window: restart just after it.
            window_start = previous + 1

        last_seen[ch] = pos

        window_len = pos - window_start + 1
        if window_len > best:
            best = window_len

    return best

# 测试
print(length_of_longest_substring("abcabcbb"))  # 3
print(length_of_longest_substring("bbbbb"))      # 1

题型二:数组与列表

题目四:合并区间

from typing import List

def merge_intervals(intervals: List[List[int]]) -> List[List[int]]:
    """Merge overlapping intervals and return them sorted by start.

    Intervals that merely touch (``[1, 4]`` and ``[4, 5]``) are merged too.
    The input is left untouched: neither the outer list nor any of the
    inner intervals is modified, and the returned intervals are new lists.

    Args:
        intervals: ``[start, end]`` pairs in any order.

    Returns:
        A new list of non-overlapping ``[start, end]`` lists ordered by
        start; empty when ``intervals`` is empty.
    """
    merged: List[List[int]] = []

    # sorted() instead of list.sort() so the caller's list keeps its order.
    for interval in sorted(intervals, key=lambda item: item[0]):
        if merged and interval[0] <= merged[-1][1]:
            # Overlap with the last merged interval: extend its end.
            merged[-1][1] = max(merged[-1][1], interval[1])
        else:
            # Store a copy so later extensions never write into the
            # caller's interval objects.
            merged.append(list(interval))

    return merged

# 测试
print(merge_intervals([[1,3],[2,6],[8,10],[15,18]]))
# [[1,6], [8,10], [15,18]]

题目五:数组去重并保持顺序

def remove_duplicates_keep_order(nums: list[int]) -> list[int]:
    """Return ``nums`` without duplicates, keeping first-occurrence order."""
    # dict keys are unique and remember insertion order, so building a dict
    # from the items keeps exactly the first occurrence of each value.
    return list(dict.fromkeys(nums))

# 测试
print(remove_duplicates_keep_order([1, 2, 2, 3, 1, 4]))
# [1, 2, 3, 4]

题型三:字典与哈希表

题目六:两数之和

def two_sum(nums: list[int], target: int) -> list[int]:
    """Return the indices of two entries of ``nums`` that sum to ``target``.

    The result is ``[earlier_index, later_index]`` for the first pair found
    while scanning left to right, or ``[]`` when no pair exists.
    """
    index_of = {}  # value -> index where it was seen

    for position, value in enumerate(nums):
        # Stored indices are ints, so None reliably means "not seen yet".
        partner = index_of.get(target - value)
        if partner is not None:
            return [partner, position]

        index_of[value] = position

    return []

# 测试
print(two_sum([2, 7, 11, 15], 9))  # [0, 1]

题目七:分组字母异位词

from collections import defaultdict

def group_anagrams(strs: list[str]) -> list[list[str]]:
    """Group the strings of ``strs`` that are anagrams of each other.

    Groups appear in order of their first member, and members keep their
    input order within a group.
    """
    groups = {}

    for word in strs:
        # Anagrams share the same sorted character sequence.
        signature = tuple(sorted(word))
        groups.setdefault(signature, []).append(word)

    return list(groups.values())

# 测试
print(group_anagrams(["eat", "tea", "tan", "ate", "nat", "bat"]))
# [["eat", "tea", "ate"], ["tan", "nat"], ["bat"]]

题型四:链表操作

题目八:反转链表

class ListNode:
    """Node of a singly linked list: a value plus a link to the next node."""

    def __init__(self, val=0, next=None):
        # ``next`` shadows the builtin, but it is part of the constructor's
        # keyword interface and therefore keeps its name.
        self.val, self.next = val, next

def reverse_linked_list(head: ListNode) -> ListNode:
    """Reverse the list starting at ``head`` in place and return its new head.

    Every node's ``next`` link is rewired; no nodes are created. An empty
    list (``head`` is ``None``) yields ``None``.
    """
    reversed_head = None
    node = head

    while node:
        # The right-hand side is evaluated first, so ``node.next`` is read
        # before the link is redirected; targets are then assigned left to
        # right: relink the node, advance the reversed prefix, step forward.
        node.next, reversed_head, node = reversed_head, node, node.next

    return reversed_head

题目九:检测链表环

def has_cycle(head: ListNode) -> bool:
    """Report whether the linked list starting at ``head`` contains a cycle.

    Uses Floyd's cycle detection: ``slow`` advances one node per step and
    ``fast`` two. If the list has a cycle the two pointers end up on the
    same node; otherwise ``fast`` runs off the end of the list.

    Args:
        head: First node of the list, or ``None`` for an empty list.

    Returns:
        True if following ``next`` links from ``head`` never terminates.
    """
    slow = fast = head

    while fast and fast.next:
        slow = slow.next
        fast = fast.next.next

        # Identity, not equality: the question is whether both pointers sit
        # on the very same node. ``==`` would give false positives for node
        # types that define a value-based ``__eq__``.
        if slow is fast:
            return True

    return False

题型五:树与图

题目十:二叉树层序遍历

from collections import deque
from typing import Optional, List

class TreeNode:
    """Binary tree node: a value plus optional left and right children."""

    def __init__(self, val=0, left=None, right=None):
        self.val = val
        self.left = left
        self.right = right


def level_order(root: Optional[TreeNode]) -> List[List[int]]:
    """Return the node values of a binary tree level by level.

    Args:
        root: Root of the tree, or ``None`` for an empty tree. Any object
            exposing ``val``, ``left`` and ``right`` attributes works.

    Returns:
        One list per depth, top to bottom, each holding that level's values
        from left to right; ``[]`` for an empty tree.
    """
    if not root:
        return []

    result = []
    queue = deque([root])

    while queue:
        # Everything queued at this point belongs to the current level;
        # children appended below are handled in the next outer iteration.
        level_size = len(queue)
        level = []

        for _ in range(level_size):
            node = queue.popleft()
            level.append(node.val)

            if node.left:
                queue.append(node.left)
            if node.right:
                queue.append(node.right)

        result.append(level)

    return result

题型六:算法题

题目十一:LRU Cache

class LRUCache:
    """Fixed-capacity cache that evicts the least recently used key.

    Entries live in an ``OrderedDict`` ordered from least to most recently
    used; both ``get`` and ``put`` move the touched key to the end.
    """

    def __init__(self, capacity: int):
        from collections import OrderedDict
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key: int) -> int:
        """Return the value stored for ``key``, or -1 when it is absent."""
        try:
            # Marks the key as most recently used; raises KeyError when the
            # key is not cached.
            self.cache.move_to_end(key)
        except KeyError:
            return -1
        return self.cache[key]

    def put(self, key: int, value: int) -> None:
        """Insert or update ``key`` and evict the oldest entry if over capacity."""
        entries = self.cache
        entries[key] = value
        # Updating an existing key keeps its old position, so move it to
        # the end explicitly; for a new key this is a no-op.
        entries.move_to_end(key)

        if len(entries) > self.capacity:
            entries.popitem(last=False)  # drop the least recently used entry

题目十二:Top K 频繁元素

import heapq
from collections import Counter

def top_k_frequent(nums: list[int], k: int) -> list[int]:
    """Return the ``k`` most frequent values of ``nums``, most frequent first.

    Values with equal frequency are ordered by value, larger first. Fewer
    than ``k`` values are returned when ``nums`` has fewer distinct values,
    and ``[]`` when ``k`` is not positive.

    Args:
        nums: Values to count.
        k: Number of values to return.

    Returns:
        Up to ``k`` distinct values in descending order of frequency.
    """
    count = Counter(nums)

    # nlargest selects the k largest (frequency, value) pairs and returns
    # them in descending order, so the result has a defined order instead
    # of exposing the internal layout of a heap.
    ranked = heapq.nlargest(k, ((freq, num) for num, freq in count.items()))

    return [num for _, num in ranked]

# 测试
print(top_k_frequent([1,1,1,2,2,3], 2))  # [1, 2]

题型七:系统设计

题目十三:日志解析器

import re
from dataclasses import dataclass
from typing import List

@dataclass
class LogEntry:
    """One parsed log line, split into its three textual parts."""

    # Date and time of the entry, kept as the raw text from the log line.
    timestamp: str
    # Severity tag of the entry, e.g. the text found between the brackets.
    level: str
    # Remaining free-form text of the entry.
    message: str

def parse_logs(raw_logs: List[str]) -> List[LogEntry]:
    """Parse lines shaped like ``YYYY-MM-DD HH:MM:SS [LEVEL] message``.

    Lines that do not start with that shape are skipped silently; the
    remaining ones are returned as ``LogEntry`` objects in input order.
    """
    line_re = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+)')
    parsed = []

    for raw in raw_logs:
        hit = line_re.match(raw)
        if hit is None:
            continue

        # The three capture groups are, in order: timestamp, level, message.
        stamp, severity, text = hit.groups()
        parsed.append(LogEntry(timestamp=stamp, level=severity, message=text))

    return parsed

题目十四:数据管道

from typing import Callable, Iterable

def create_pipeline(*functions: Callable) -> Callable:
    """Compose ``functions`` into a single callable applied left to right.

    The returned callable feeds its argument to the first function, that
    result to the second, and so on. With no functions it returns its
    argument unchanged.
    """
    def pipeline(data: Iterable) -> Iterable:
        # Rebinding ``data`` only changes the local name; the caller's
        # object is touched solely by whatever the stages themselves do.
        for stage in functions:
            data = stage(data)
        return data

    return pipeline

# 使用示例
def filter_even(x):
    """Return a list of the even items of ``x``, in their original order."""
    evens = []
    for item in x:
        if item % 2 == 0:
            evens.append(item)
    return evens
def double(x):
    """Return a list holding every item of ``x`` multiplied by two."""
    doubled = []
    for item in x:
        doubled.append(item * 2)
    return doubled

pipe = create_pipeline(filter_even, double)
print(pipe([1, 2, 3, 4, 5, 6]))  # [4, 8, 12]

题目十五:异步数据加载

import asyncio
from typing import List

async def fetch_data(url: str, delay: float = 1.0) -> str:
    """Simulate fetching ``url`` asynchronously.

    No network access happens: the coroutine only sleeps and then returns a
    placeholder payload.

    Args:
        url: Identifier echoed back in the returned payload.
        delay: Seconds to sleep to imitate network latency. The default
            keeps the original one-second behaviour; pass a smaller value
            (e.g. ``0``) in tests.

    Returns:
        The string ``"Data from <url>"``.
    """
    await asyncio.sleep(delay)  # stand-in for the network round trip
    return f"Data from {url}"

async def fetch_all(urls: List[str]) -> List[str]:
    """Fetch every URL concurrently and return the payloads in input order."""
    # gather() schedules all coroutines together and reports their results
    # in the order the coroutines were passed in.
    return await asyncio.gather(*map(fetch_data, urls))

# 使用
# asyncio.run(fetch_all(["url1", "url2", "url3"]))

Python 面试技巧

1. 熟悉内置数据结构

| 数据结构 | 操作 | 时间复杂度 |
| --- | --- | --- |
| list | 索引访问 | O(1) |
| list | 插入/删除 | O(n) |
| set | 成员检查 | O(1) |
| dict | 键查找 | O(1) |
| deque | 两端操作 | O(1) |
| heap | 插入/弹出 | O(log n) |

2. 常用内置函数

# Sorting: the key negates each value, so the result is descending.
sorted([3, 1, 2], key=lambda x: -x)

# Enumerating: yields (index, value) pairs, with the index starting at 0.
for i, val in enumerate([1, 2, 3]):
    print(i, val)

# Zipping: pairs up the items of both lists position by position.
list(zip([1, 2], ['a', 'b']))  # [(1, 'a'), (2, 'b')]

# Unpacking: spreads the de-duplicated items into a new list.
# NOTE(review): sets are unordered, so the order shown is what CPython
# typically produces for small ints rather than a language guarantee.
[*set([1, 2, 2, 3])]  # [1, 2, 3]

3. 代码风格

  • 用类型注解
  • 写 docstring
  • 变量名要有意义
  • 遵循 PEP 8

FAQ

Python 面试可以用 IDE 吗?

通常不行,面试官希望你手写代码。

需要记住所有语法吗?

不需要,面试官主要看思路。


💡 需要面试辅导?

如果你对准备技术面试感到迷茫,或者想要个性化的面试指导和简历优化,欢迎联系 Interview Coach Pro 获取一对一辅导服务。

👉 联系我们 获取专属面试准备方案

准备好拿下下一次面试了吗?

获取针对你的目标岗位和公司的个性化辅导方案。

联系我们