导航菜单

编程范式入门 | 第五课:并发编程 - 让程序同时处理多件事情

阅读约 1 分钟 编程范式入门

在现代软件开发中,用户期望应用能够快速响应,同时处理多个任务。想象一下,如果你的网页在加载图片时整个界面都卡住了,或者聊天应用在发送消息时无法接收新消息,用户体验会多么糟糕。这就是为什么我们需要并发编程

🎯 什么是并发编程?

并发编程是一种编程范式,它允许程序同时执行多个任务或处理多个事件。这就像一个优秀的服务员,能够同时为多桌客人点菜、上菜、结账,而不是一次只服务一桌客人。

并发 vs 并行

在深入学习之前,我们需要理解两个重要概念:

  • 并发(Concurrency):多个任务在同一时间段内交替执行,看起来像是同时进行
  • 并行(Parallelism):多个任务真正同时执行,需要多核处理器支持
// 串行执行:一个接一个
function serialTasks() {
  console.log('任务1开始');
  // 模拟耗时操作
  for(let i = 0; i < 1000000000; i++) {}
  console.log('任务1完成');
  
  console.log('任务2开始');
  for(let i = 0; i < 1000000000; i++) {}
  console.log('任务2完成');
}

// 并发执行:交替进行
function concurrentTasks() {
  setTimeout(() => console.log('任务1完成'), 1000);
  setTimeout(() => console.log('任务2完成'), 500);
  console.log('两个任务都已启动');
}

🌟 JavaScript中的并发编程

1. 事件循环(Event Loop)

JavaScript是单线程语言,但通过事件循环机制实现了并发处理:

console.log('1. 同步代码开始');

setTimeout(() => {
  console.log('3. 宏任务:setTimeout');
}, 0);

Promise.resolve().then(() => {
  console.log('2. 微任务:Promise');
});

console.log('1. 同步代码结束');

// 输出顺序:
// 1. 同步代码开始
// 1. 同步代码结束  
// 2. 微任务:Promise
// 3. 宏任务:setTimeout

2. 回调函数(Callbacks)

最早的异步编程方式:

// 简单回调
function fetchUserData(userId, callback) {
  // 模拟网络请求
  setTimeout(() => {
    const userData = { id: userId, name: '张三', email: 'zhang@example.com' };
    callback(null, userData);
  }, 1000);
}

fetchUserData(123, (error, user) => {
  if (error) {
    console.error('获取用户数据失败:', error);
  } else {
    console.log('用户数据:', user);
  }
});

// 回调地狱问题
function fetchUserProfile(userId) {
  fetchUserData(userId, (error, user) => {
    if (error) return console.error(error);
    
    fetchUserPosts(user.id, (error, posts) => {
      if (error) return console.error(error);
      
      fetchPostComments(posts[0].id, (error, comments) => {
        if (error) return console.error(error);
        
        console.log('用户资料完整数据:', { user, posts, comments });
      });
    });
  });
}

3. Promise:更优雅的异步处理

// 创建Promise
function fetchUserData(userId) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (userId > 0) {
        resolve({ id: userId, name: '张三', email: 'zhang@example.com' });
      } else {
        reject(new Error('无效的用户ID'));
      }
    }, 1000);
  });
}

// 使用Promise
fetchUserData(123)
  .then(user => {
    console.log('用户数据:', user);
    return fetchUserPosts(user.id);
  })
  .then(posts => {
    console.log('用户文章:', posts);
    return fetchPostComments(posts[0].id);
  })
  .then(comments => {
    console.log('文章评论:', comments);
  })
  .catch(error => {
    console.error('发生错误:', error);
  });

// Promise.all:并行执行多个异步任务
Promise.all([
  fetchUserData(123),
  fetchUserData(456),
  fetchUserData(789)
])
.then(users => {
  console.log('所有用户数据:', users);
})
.catch(error => {
  console.error('至少有一个请求失败:', error);
});

// Promise.race:竞速执行
Promise.race([
  fetchUserData(123),
  new Promise((_, reject) => 
    setTimeout(() => reject(new Error('请求超时')), 2000)
  )
])
.then(result => console.log('最快的结果:', result))
.catch(error => console.error('错误或超时:', error));

4. async/await:同步风格的异步代码

// 使用async/await重写上面的例子
async function fetchUserProfile(userId) {
  try {
    const user = await fetchUserData(userId);
    console.log('用户数据:', user);
    
    const posts = await fetchUserPosts(user.id);
    console.log('用户文章:', posts);
    
    const comments = await fetchPostComments(posts[0].id);
    console.log('文章评论:', comments);
    
    return { user, posts, comments };
  } catch (error) {
    console.error('发生错误:', error);
    throw error;
  }
}

// 并行执行多个异步任务
async function fetchMultipleUsers() {
  try {
    const users = await Promise.all([
      fetchUserData(123),
      fetchUserData(456),
      fetchUserData(789)
    ]);
    
    console.log('所有用户数据:', users);
    return users;
  } catch (error) {
    console.error('获取用户数据失败:', error);
  }
}

// 错误处理和超时控制
async function fetchWithTimeout(userId, timeout = 5000) {
  try {
    const result = await Promise.race([
      fetchUserData(userId),
      new Promise((_, reject) => 
        setTimeout(() => reject(new Error('请求超时')), timeout)
      )
    ]);
    
    return result;
  } catch (error) {
    if (error.message === '请求超时') {
      console.log('请求超时,使用缓存数据');
      return getCachedUserData(userId);
    }
    throw error;
  }
}

🚀 高级并发模式

1. 生成器(Generators)和异步迭代

// 异步生成器
async function* fetchUsersGenerator(userIds) {
  for (const userId of userIds) {
    try {
      const user = await fetchUserData(userId);
      yield user;
    } catch (error) {
      console.error(`获取用户${userId}失败:`, error);
    }
  }
}

// 使用异步生成器
async function processUsers() {
  const userIds = [123, 456, 789, 101, 202];
  
  for await (const user of fetchUsersGenerator(userIds)) {
    console.log('处理用户:', user);
    // 可以在这里对每个用户进行处理
  }
}

// 控制并发数量的生成器
async function* batchFetchUsers(userIds, batchSize = 3) {
  for (let i = 0; i < userIds.length; i += batchSize) {
    const batch = userIds.slice(i, i + batchSize);
    const users = await Promise.all(
      batch.map(id => fetchUserData(id).catch(err => ({ error: err, id })))
    );
    
    for (const user of users) {
      yield user;
    }
  }
}

2. Web Workers:真正的并行处理

// main.js - 主线程
class WorkerPool {
  constructor(workerScript, poolSize = 4) {
    this.workers = [];
    this.taskQueue = [];
    this.busyWorkers = new Set();
    
    // 创建工作线程池
    for (let i = 0; i < poolSize; i++) {
      const worker = new Worker(workerScript);
      worker.onmessage = (e) => this.handleWorkerMessage(worker, e);
      this.workers.push(worker);
    }
  }
  
  async executeTask(data) {
    return new Promise((resolve, reject) => {
      const task = { data, resolve, reject, id: Date.now() + Math.random() };
      
      const availableWorker = this.workers.find(w => !this.busyWorkers.has(w));
      
      if (availableWorker) {
        this.assignTask(availableWorker, task);
      } else {
        this.taskQueue.push(task);
      }
    });
  }
  
  assignTask(worker, task) {
    this.busyWorkers.add(worker);
    worker.currentTask = task;
    worker.postMessage({ id: task.id, data: task.data });
  }
  
  handleWorkerMessage(worker, event) {
    const { id, result, error } = event.data;
    const task = worker.currentTask;
    
    if (task && task.id === id) {
      this.busyWorkers.delete(worker);
      worker.currentTask = null;
      
      if (error) {
        task.reject(new Error(error));
      } else {
        task.resolve(result);
      }
      
      // 处理队列中的下一个任务
      if (this.taskQueue.length > 0) {
        const nextTask = this.taskQueue.shift();
        this.assignTask(worker, nextTask);
      }
    }
  }
}

// 使用工作线程池
const workerPool = new WorkerPool('math-worker.js', 4);

async function processLargeDataset() {
  const data = Array.from({ length: 1000 }, (_, i) => i);
  const chunkSize = 100;
  const chunks = [];
  
  // 将数据分块
  for (let i = 0; i < data.length; i += chunkSize) {
    chunks.push(data.slice(i, i + chunkSize));
  }
  
  try {
    // 并行处理所有数据块
    const results = await Promise.all(
      chunks.map(chunk => workerPool.executeTask(chunk))
    );
    
    // 合并结果
    const finalResult = results.flat();
    console.log('处理完成:', finalResult);
  } catch (error) {
    console.error('处理失败:', error);
  }
}
// math-worker.js - 工作线程
self.onmessage = function(e) {
  const { id, data } = e.data;
  
  try {
    // 执行计算密集型任务
    const result = data.map(num => {
      // 模拟复杂计算
      let sum = 0;
      for (let i = 0; i < 1000000; i++) {
        sum += Math.sqrt(num * i);
      }
      return sum;
    });
    
    // 返回结果
    self.postMessage({ id, result });
  } catch (error) {
    self.postMessage({ id, error: error.message });
  }
};

3. 响应式编程(Reactive Programming)

// 简单的Observable实现
class Observable {
  constructor(subscribe) {
    this.subscribe = subscribe;
  }
  
  static fromEvent(element, eventName) {
    return new Observable(observer => {
      const handler = event => observer.next(event);
      element.addEventListener(eventName, handler);
      
      return () => element.removeEventListener(eventName, handler);
    });
  }
  
  static interval(ms) {
    return new Observable(observer => {
      const id = setInterval(() => observer.next(Date.now()), ms);
      return () => clearInterval(id);
    });
  }
  
  map(fn) {
    return new Observable(observer => {
      return this.subscribe({
        next: value => observer.next(fn(value)),
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    });
  }
  
  filter(predicate) {
    return new Observable(observer => {
      return this.subscribe({
        next: value => predicate(value) && observer.next(value),
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    });
  }
  
  debounce(ms) {
    return new Observable(observer => {
      let timeoutId;
      
      return this.subscribe({
        next: value => {
          clearTimeout(timeoutId);
          timeoutId = setTimeout(() => observer.next(value), ms);
        },
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    });
  }
}

// 使用响应式编程处理用户输入
const searchInput = document.getElementById('search');
const searchResults = document.getElementById('results');

const search$ = Observable.fromEvent(searchInput, 'input')
  .map(event => event.target.value)
  .filter(query => query.length > 2)
  .debounce(300)
  .map(query => fetch(`/api/search?q=${query}`).then(r => r.json()));

search$.subscribe({
  next: async (searchPromise) => {
    try {
      const results = await searchPromise;
      displayResults(results);
    } catch (error) {
      console.error('搜索失败:', error);
    }
  },
  error: err => console.error('Observable错误:', err)
});

function displayResults(results) {
  searchResults.innerHTML = results
    .map(item => `<div class="result">${item.title}</div>`)
    .join('');
}

🛠️ 实战案例:构建一个并发文件上传器

class ConcurrentFileUploader {
  constructor(options = {}) {
    this.maxConcurrent = options.maxConcurrent || 3;
    this.retryAttempts = options.retryAttempts || 3;
    this.chunkSize = options.chunkSize || 1024 * 1024; // 1MB
    
    this.uploadQueue = [];
    this.activeUploads = new Map();
    this.completedUploads = [];
    this.failedUploads = [];
  }
  
  async uploadFiles(files) {
    // 将文件添加到队列
    for (const file of files) {
      this.uploadQueue.push({
        file,
        id: this.generateId(),
        status: 'pending',
        progress: 0,
        attempts: 0
      });
    }
    
    // 开始并发上传
    const uploadPromises = [];
    for (let i = 0; i < Math.min(this.maxConcurrent, this.uploadQueue.length); i++) {
      uploadPromises.push(this.processUploadQueue());
    }
    
    await Promise.all(uploadPromises);
    
    return {
      completed: this.completedUploads,
      failed: this.failedUploads
    };
  }
  
  async processUploadQueue() {
    while (this.uploadQueue.length > 0) {
      const uploadTask = this.uploadQueue.shift();
      if (!uploadTask) break;
      
      try {
        await this.uploadFile(uploadTask);
        this.completedUploads.push(uploadTask);
      } catch (error) {
        uploadTask.error = error;
        this.failedUploads.push(uploadTask);
      }
    }
  }
  
  async uploadFile(uploadTask) {
    const { file, id } = uploadTask;
    this.activeUploads.set(id, uploadTask);
    
    try {
      // 大文件分块上传
      if (file.size > this.chunkSize) {
        await this.uploadFileInChunks(uploadTask);
      } else {
        await this.uploadSingleFile(uploadTask);
      }
      
      uploadTask.status = 'completed';
      uploadTask.progress = 100;
    } catch (error) {
      uploadTask.attempts++;
      
      if (uploadTask.attempts < this.retryAttempts) {
        console.log(`重试上传文件 ${file.name}, 第${uploadTask.attempts}次尝试`);
        await this.delay(1000 * uploadTask.attempts); // 指数退避
        return this.uploadFile(uploadTask);
      }
      
      uploadTask.status = 'failed';
      throw error;
    } finally {
      this.activeUploads.delete(id);
    }
  }
  
  async uploadFileInChunks(uploadTask) {
    const { file, id } = uploadTask;
    const totalChunks = Math.ceil(file.size / this.chunkSize);
    
    // 初始化分块上传
    const uploadId = await this.initializeChunkedUpload(file);
    const uploadedChunks = [];
    
    try {
      // 并发上传分块
      const chunkPromises = [];
      
      for (let i = 0; i < totalChunks; i++) {
        const start = i * this.chunkSize;
        const end = Math.min(start + this.chunkSize, file.size);
        const chunk = file.slice(start, end);
        
        chunkPromises.push(
          this.uploadChunk(uploadId, i, chunk, uploadTask)
        );
        
        // 限制并发分块数量
        if (chunkPromises.length >= this.maxConcurrent) {
          const results = await Promise.all(chunkPromises);
          uploadedChunks.push(...results);
          chunkPromises.length = 0;
          
          // 更新进度
          uploadTask.progress = (uploadedChunks.length / totalChunks) * 100;
          this.onProgress?.(uploadTask);
        }
      }
      
      // 上传剩余分块
      if (chunkPromises.length > 0) {
        const results = await Promise.all(chunkPromises);
        uploadedChunks.push(...results);
      }
      
      // 完成分块上传
      await this.completeChunkedUpload(uploadId, uploadedChunks);
      
    } catch (error) {
      await this.abortChunkedUpload(uploadId);
      throw error;
    }
  }
  
  async uploadSingleFile(uploadTask) {
    const { file } = uploadTask;
    const formData = new FormData();
    formData.append('file', file);
    
    const response = await fetch('/api/upload', {
      method: 'POST',
      body: formData,
      onUploadProgress: (event) => {
        uploadTask.progress = (event.loaded / event.total) * 100;
        this.onProgress?.(uploadTask);
      }
    });
    
    if (!response.ok) {
      throw new Error(`上传失败: ${response.statusText}`);
    }
    
    return response.json();
  }
  
  async uploadChunk(uploadId, chunkIndex, chunk, uploadTask) {
    const formData = new FormData();
    formData.append('uploadId', uploadId);
    formData.append('chunkIndex', chunkIndex);
    formData.append('chunk', chunk);
    
    const response = await fetch('/api/upload/chunk', {
      method: 'POST',
      body: formData
    });
    
    if (!response.ok) {
      throw new Error(`分块${chunkIndex}上传失败`);
    }
    
    return { chunkIndex, etag: response.headers.get('ETag') };
  }
  
  async initializeChunkedUpload(file) {
    const response = await fetch('/api/upload/init', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        fileName: file.name,
        fileSize: file.size,
        contentType: file.type
      })
    });
    
    const { uploadId } = await response.json();
    return uploadId;
  }
  
  async completeChunkedUpload(uploadId, chunks) {
    const response = await fetch('/api/upload/complete', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ uploadId, chunks })
    });
    
    return response.json();
  }
  
  async abortChunkedUpload(uploadId) {
    await fetch(`/api/upload/abort/${uploadId}`, { method: 'DELETE' });
  }
  
  generateId() {
    return Date.now().toString(36) + Math.random().toString(36).substr(2);
  }
  
  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
  
  // 获取上传状态
  getUploadStatus() {
    return {
      pending: this.uploadQueue.length,
      active: this.activeUploads.size,
      completed: this.completedUploads.length,
      failed: this.failedUploads.length
    };
  }
}

// 使用文件上传器
const uploader = new ConcurrentFileUploader({
  maxConcurrent: 3,
  retryAttempts: 3,
  chunkSize: 2 * 1024 * 1024 // 2MB
});

// 监听进度
uploader.onProgress = (uploadTask) => {
  console.log(`${uploadTask.file.name}: ${uploadTask.progress.toFixed(1)}%`);
};

// 处理文件选择
document.getElementById('fileInput').addEventListener('change', async (event) => {
  const files = Array.from(event.target.files);
  
  try {
    const result = await uploader.uploadFiles(files);
    console.log('上传完成:', result);
  } catch (error) {
    console.error('上传失败:', error);
  }
});

⚖️ 并发编程的最佳实践

✅ 优势

  1. 提高性能:充分利用系统资源
  2. 改善用户体验:避免界面阻塞
  3. 增强可扩展性:能处理更多并发请求
  4. 提高吞吐量:同时处理多个任务

⚠️ 注意事项

  1. 竞态条件:多个任务访问共享资源
  2. 内存泄漏:未正确清理异步任务
  3. 错误处理:异步错误的传播和处理
  4. 调试困难:异步代码的调试复杂性

🎯 最佳实践

// 1. 合理控制并发数量
class ConcurrencyController {
  constructor(limit = 5) {
    this.limit = limit;
    this.running = 0;
    this.queue = [];
  }
  
  async execute(task) {
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this.process();
    });
  }
  
  async process() {
    if (this.running >= this.limit || this.queue.length === 0) {
      return;
    }
    
    this.running++;
    const { task, resolve, reject } = this.queue.shift();
    
    try {
      const result = await task();
      resolve(result);
    } catch (error) {
      reject(error);
    } finally {
      this.running--;
      this.process();
    }
  }
}

// 2. 超时和取消机制
class CancellablePromise {
  constructor(executor) {
    this.cancelled = false;
    
    this.promise = new Promise((resolve, reject) => {
      this.reject = reject;
      
      executor(
        (value) => {
          if (!this.cancelled) resolve(value);
        },
        (reason) => {
          if (!this.cancelled) reject(reason);
        }
      );
    });
  }
  
  cancel() {
    this.cancelled = true;
    this.reject(new Error('操作已取消'));
  }
  
  static withTimeout(promise, timeout) {
    const timeoutPromise = new Promise((_, reject) => {
      setTimeout(() => reject(new Error('操作超时')), timeout);
    });
    
    return Promise.race([promise, timeoutPromise]);
  }
}

// 3. 错误处理和重试机制
async function withRetry(fn, maxAttempts = 3, delay = 1000) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt === maxAttempts) throw error;
      
      console.log(`第${attempt}次尝试失败,${delay}ms后重试`);
      await new Promise(resolve => setTimeout(resolve, delay));
      delay *= 2; // 指数退避
    }
  }
}

📚 总结

并发编程是现代应用开发的核心技能,它让我们能够构建响应迅速、性能优异的应用程序。JavaScript提供了丰富的并发编程工具,从基础的Promise到高级的Web Workers。

核心要点:

  • 理解事件循环机制
  • 掌握Promise和async/await
  • 合理控制并发数量
  • 处理错误和超时
  • 使用Web Workers处理CPU密集型任务

🔮 下节预告

下一课,我们将探索现代编程范式,包括响应式编程、微服务架构、函数式响应式编程等前沿概念,了解编程范式的最新发展趋势。

💭 思考题

  1. 什么情况下应该使用Web Workers而不是async/await?
  2. 如何设计一个既支持并发又能避免竞态条件的系统?
  3. 在你的项目中,哪些场景可以应用并发编程来提升性能?

并发编程让程序变得更加高效和响应迅速,但也带来了复杂性。掌握好并发编程的原理和最佳实践,是成为优秀开发者的必经之路!