在现代软件开发中,用户期望应用能够快速响应,同时处理多个任务。想象一下,如果你的网页在加载图片时整个界面都卡住了,或者聊天应用在发送消息时无法接收新消息,用户体验会多么糟糕。这就是为什么我们需要并发编程。
🎯 什么是并发编程?
并发编程是一种编程范式,它允许程序同时执行多个任务或处理多个事件。这就像一个优秀的服务员,能够同时为多桌客人点菜、上菜、结账,而不是一次只服务一桌客人。
并发 vs 并行
在深入学习之前,我们需要理解两个重要概念:
- 并发(Concurrency):多个任务在同一时间段内交替执行,看起来像是同时进行
- 并行(Parallelism):多个任务真正同时执行,需要多核处理器支持
// 串行执行:一个接一个
function serialTasks() {
console.log('任务1开始');
// 模拟耗时操作
for(let i = 0; i < 1000000000; i++) {}
console.log('任务1完成');
console.log('任务2开始');
for(let i = 0; i < 1000000000; i++) {}
console.log('任务2完成');
}
// 并发执行:交替进行
function concurrentTasks() {
setTimeout(() => console.log('任务1完成'), 1000);
setTimeout(() => console.log('任务2完成'), 500);
console.log('两个任务都已启动');
}
🌟 JavaScript中的并发编程
1. 事件循环(Event Loop)
JavaScript是单线程语言,但通过事件循环机制实现了并发处理:
console.log('1. 同步代码开始');
setTimeout(() => {
console.log('3. 宏任务:setTimeout');
}, 0);
Promise.resolve().then(() => {
console.log('2. 微任务:Promise');
});
console.log('1. 同步代码结束');
// 输出顺序:
// 1. 同步代码开始
// 1. 同步代码结束
// 2. 微任务:Promise
// 3. 宏任务:setTimeout
2. 回调函数(Callbacks)
最早的异步编程方式:
// 简单回调
function fetchUserData(userId, callback) {
// 模拟网络请求
setTimeout(() => {
const userData = { id: userId, name: '张三', email: 'zhang@example.com' };
callback(null, userData);
}, 1000);
}
fetchUserData(123, (error, user) => {
if (error) {
console.error('获取用户数据失败:', error);
} else {
console.log('用户数据:', user);
}
});
// 回调地狱问题
function fetchUserProfile(userId) {
fetchUserData(userId, (error, user) => {
if (error) return console.error(error);
fetchUserPosts(user.id, (error, posts) => {
if (error) return console.error(error);
fetchPostComments(posts[0].id, (error, comments) => {
if (error) return console.error(error);
console.log('用户资料完整数据:', { user, posts, comments });
});
});
});
}
3. Promise:更优雅的异步处理
// 创建Promise
function fetchUserData(userId) {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (userId > 0) {
resolve({ id: userId, name: '张三', email: 'zhang@example.com' });
} else {
reject(new Error('无效的用户ID'));
}
}, 1000);
});
}
// 使用Promise
fetchUserData(123)
.then(user => {
console.log('用户数据:', user);
return fetchUserPosts(user.id);
})
.then(posts => {
console.log('用户文章:', posts);
return fetchPostComments(posts[0].id);
})
.then(comments => {
console.log('文章评论:', comments);
})
.catch(error => {
console.error('发生错误:', error);
});
// Promise.all:并行执行多个异步任务
Promise.all([
fetchUserData(123),
fetchUserData(456),
fetchUserData(789)
])
.then(users => {
console.log('所有用户数据:', users);
})
.catch(error => {
console.error('至少有一个请求失败:', error);
});
// Promise.race:竞速执行
Promise.race([
fetchUserData(123),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('请求超时')), 2000)
)
])
.then(result => console.log('最快的结果:', result))
.catch(error => console.error('错误或超时:', error));
4. async/await:同步风格的异步代码
// 使用async/await重写上面的例子
async function fetchUserProfile(userId) {
try {
const user = await fetchUserData(userId);
console.log('用户数据:', user);
const posts = await fetchUserPosts(user.id);
console.log('用户文章:', posts);
const comments = await fetchPostComments(posts[0].id);
console.log('文章评论:', comments);
return { user, posts, comments };
} catch (error) {
console.error('发生错误:', error);
throw error;
}
}
// 并行执行多个异步任务
async function fetchMultipleUsers() {
try {
const users = await Promise.all([
fetchUserData(123),
fetchUserData(456),
fetchUserData(789)
]);
console.log('所有用户数据:', users);
return users;
} catch (error) {
console.error('获取用户数据失败:', error);
}
}
// 错误处理和超时控制
async function fetchWithTimeout(userId, timeout = 5000) {
try {
const result = await Promise.race([
fetchUserData(userId),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('请求超时')), timeout)
)
]);
return result;
} catch (error) {
if (error.message === '请求超时') {
console.log('请求超时,使用缓存数据');
return getCachedUserData(userId);
}
throw error;
}
}
🚀 高级并发模式
1. 生成器(Generators)和异步迭代
// 异步生成器
async function* fetchUsersGenerator(userIds) {
for (const userId of userIds) {
try {
const user = await fetchUserData(userId);
yield user;
} catch (error) {
console.error(`获取用户${userId}失败:`, error);
}
}
}
// 使用异步生成器
async function processUsers() {
const userIds = [123, 456, 789, 101, 202];
for await (const user of fetchUsersGenerator(userIds)) {
console.log('处理用户:', user);
// 可以在这里对每个用户进行处理
}
}
// 控制并发数量的生成器
async function* batchFetchUsers(userIds, batchSize = 3) {
for (let i = 0; i < userIds.length; i += batchSize) {
const batch = userIds.slice(i, i + batchSize);
const users = await Promise.all(
batch.map(id => fetchUserData(id).catch(err => ({ error: err, id })))
);
for (const user of users) {
yield user;
}
}
}
2. Web Workers:真正的并行处理
// main.js - 主线程
class WorkerPool {
constructor(workerScript, poolSize = 4) {
this.workers = [];
this.taskQueue = [];
this.busyWorkers = new Set();
// 创建工作线程池
for (let i = 0; i < poolSize; i++) {
const worker = new Worker(workerScript);
worker.onmessage = (e) => this.handleWorkerMessage(worker, e);
this.workers.push(worker);
}
}
async executeTask(data) {
return new Promise((resolve, reject) => {
const task = { data, resolve, reject, id: Date.now() + Math.random() };
const availableWorker = this.workers.find(w => !this.busyWorkers.has(w));
if (availableWorker) {
this.assignTask(availableWorker, task);
} else {
this.taskQueue.push(task);
}
});
}
assignTask(worker, task) {
this.busyWorkers.add(worker);
worker.currentTask = task;
worker.postMessage({ id: task.id, data: task.data });
}
handleWorkerMessage(worker, event) {
const { id, result, error } = event.data;
const task = worker.currentTask;
if (task && task.id === id) {
this.busyWorkers.delete(worker);
worker.currentTask = null;
if (error) {
task.reject(new Error(error));
} else {
task.resolve(result);
}
// 处理队列中的下一个任务
if (this.taskQueue.length > 0) {
const nextTask = this.taskQueue.shift();
this.assignTask(worker, nextTask);
}
}
}
}
// 使用工作线程池
const workerPool = new WorkerPool('math-worker.js', 4);
async function processLargeDataset() {
const data = Array.from({ length: 1000 }, (_, i) => i);
const chunkSize = 100;
const chunks = [];
// 将数据分块
for (let i = 0; i < data.length; i += chunkSize) {
chunks.push(data.slice(i, i + chunkSize));
}
try {
// 并行处理所有数据块
const results = await Promise.all(
chunks.map(chunk => workerPool.executeTask(chunk))
);
// 合并结果
const finalResult = results.flat();
console.log('处理完成:', finalResult);
} catch (error) {
console.error('处理失败:', error);
}
}
// math-worker.js - 工作线程
self.onmessage = function(e) {
const { id, data } = e.data;
try {
// 执行计算密集型任务
const result = data.map(num => {
// 模拟复杂计算
let sum = 0;
for (let i = 0; i < 1000000; i++) {
sum += Math.sqrt(num * i);
}
return sum;
});
// 返回结果
self.postMessage({ id, result });
} catch (error) {
self.postMessage({ id, error: error.message });
}
};
3. 响应式编程(Reactive Programming)
// 简单的Observable实现
class Observable {
constructor(subscribe) {
this.subscribe = subscribe;
}
static fromEvent(element, eventName) {
return new Observable(observer => {
const handler = event => observer.next(event);
element.addEventListener(eventName, handler);
return () => element.removeEventListener(eventName, handler);
});
}
static interval(ms) {
return new Observable(observer => {
const id = setInterval(() => observer.next(Date.now()), ms);
return () => clearInterval(id);
});
}
map(fn) {
return new Observable(observer => {
return this.subscribe({
next: value => observer.next(fn(value)),
error: err => observer.error(err),
complete: () => observer.complete()
});
});
}
filter(predicate) {
return new Observable(observer => {
return this.subscribe({
next: value => predicate(value) && observer.next(value),
error: err => observer.error(err),
complete: () => observer.complete()
});
});
}
debounce(ms) {
return new Observable(observer => {
let timeoutId;
return this.subscribe({
next: value => {
clearTimeout(timeoutId);
timeoutId = setTimeout(() => observer.next(value), ms);
},
error: err => observer.error(err),
complete: () => observer.complete()
});
});
}
}
// 使用响应式编程处理用户输入
const searchInput = document.getElementById('search');
const searchResults = document.getElementById('results');
const search$ = Observable.fromEvent(searchInput, 'input')
.map(event => event.target.value)
.filter(query => query.length > 2)
.debounce(300)
.map(query => fetch(`/api/search?q=${query}`).then(r => r.json()));
search$.subscribe({
next: async (searchPromise) => {
try {
const results = await searchPromise;
displayResults(results);
} catch (error) {
console.error('搜索失败:', error);
}
},
error: err => console.error('Observable错误:', err)
});
function displayResults(results) {
searchResults.innerHTML = results
.map(item => `<div class="result">${item.title}</div>`)
.join('');
}
🛠️ 实战案例:构建一个并发文件上传器
class ConcurrentFileUploader {
constructor(options = {}) {
this.maxConcurrent = options.maxConcurrent || 3;
this.retryAttempts = options.retryAttempts || 3;
this.chunkSize = options.chunkSize || 1024 * 1024; // 1MB
this.uploadQueue = [];
this.activeUploads = new Map();
this.completedUploads = [];
this.failedUploads = [];
}
async uploadFiles(files) {
// 将文件添加到队列
for (const file of files) {
this.uploadQueue.push({
file,
id: this.generateId(),
status: 'pending',
progress: 0,
attempts: 0
});
}
// 开始并发上传
const uploadPromises = [];
for (let i = 0; i < Math.min(this.maxConcurrent, this.uploadQueue.length); i++) {
uploadPromises.push(this.processUploadQueue());
}
await Promise.all(uploadPromises);
return {
completed: this.completedUploads,
failed: this.failedUploads
};
}
async processUploadQueue() {
while (this.uploadQueue.length > 0) {
const uploadTask = this.uploadQueue.shift();
if (!uploadTask) break;
try {
await this.uploadFile(uploadTask);
this.completedUploads.push(uploadTask);
} catch (error) {
uploadTask.error = error;
this.failedUploads.push(uploadTask);
}
}
}
async uploadFile(uploadTask) {
const { file, id } = uploadTask;
this.activeUploads.set(id, uploadTask);
try {
// 大文件分块上传
if (file.size > this.chunkSize) {
await this.uploadFileInChunks(uploadTask);
} else {
await this.uploadSingleFile(uploadTask);
}
uploadTask.status = 'completed';
uploadTask.progress = 100;
} catch (error) {
uploadTask.attempts++;
if (uploadTask.attempts < this.retryAttempts) {
console.log(`重试上传文件 ${file.name}, 第${uploadTask.attempts}次尝试`);
await this.delay(1000 * uploadTask.attempts); // 指数退避
return this.uploadFile(uploadTask);
}
uploadTask.status = 'failed';
throw error;
} finally {
this.activeUploads.delete(id);
}
}
async uploadFileInChunks(uploadTask) {
const { file, id } = uploadTask;
const totalChunks = Math.ceil(file.size / this.chunkSize);
// 初始化分块上传
const uploadId = await this.initializeChunkedUpload(file);
const uploadedChunks = [];
try {
// 并发上传分块
const chunkPromises = [];
for (let i = 0; i < totalChunks; i++) {
const start = i * this.chunkSize;
const end = Math.min(start + this.chunkSize, file.size);
const chunk = file.slice(start, end);
chunkPromises.push(
this.uploadChunk(uploadId, i, chunk, uploadTask)
);
// 限制并发分块数量
if (chunkPromises.length >= this.maxConcurrent) {
const results = await Promise.all(chunkPromises);
uploadedChunks.push(...results);
chunkPromises.length = 0;
// 更新进度
uploadTask.progress = (uploadedChunks.length / totalChunks) * 100;
this.onProgress?.(uploadTask);
}
}
// 上传剩余分块
if (chunkPromises.length > 0) {
const results = await Promise.all(chunkPromises);
uploadedChunks.push(...results);
}
// 完成分块上传
await this.completeChunkedUpload(uploadId, uploadedChunks);
} catch (error) {
await this.abortChunkedUpload(uploadId);
throw error;
}
}
async uploadSingleFile(uploadTask) {
const { file } = uploadTask;
const formData = new FormData();
formData.append('file', file);
const response = await fetch('/api/upload', {
method: 'POST',
body: formData,
onUploadProgress: (event) => {
uploadTask.progress = (event.loaded / event.total) * 100;
this.onProgress?.(uploadTask);
}
});
if (!response.ok) {
throw new Error(`上传失败: ${response.statusText}`);
}
return response.json();
}
async uploadChunk(uploadId, chunkIndex, chunk, uploadTask) {
const formData = new FormData();
formData.append('uploadId', uploadId);
formData.append('chunkIndex', chunkIndex);
formData.append('chunk', chunk);
const response = await fetch('/api/upload/chunk', {
method: 'POST',
body: formData
});
if (!response.ok) {
throw new Error(`分块${chunkIndex}上传失败`);
}
return { chunkIndex, etag: response.headers.get('ETag') };
}
async initializeChunkedUpload(file) {
const response = await fetch('/api/upload/init', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
fileName: file.name,
fileSize: file.size,
contentType: file.type
})
});
const { uploadId } = await response.json();
return uploadId;
}
async completeChunkedUpload(uploadId, chunks) {
const response = await fetch('/api/upload/complete', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ uploadId, chunks })
});
return response.json();
}
async abortChunkedUpload(uploadId) {
await fetch(`/api/upload/abort/${uploadId}`, { method: 'DELETE' });
}
generateId() {
return Date.now().toString(36) + Math.random().toString(36).substr(2);
}
delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// 获取上传状态
getUploadStatus() {
return {
pending: this.uploadQueue.length,
active: this.activeUploads.size,
completed: this.completedUploads.length,
failed: this.failedUploads.length
};
}
}
// 使用文件上传器
const uploader = new ConcurrentFileUploader({
maxConcurrent: 3,
retryAttempts: 3,
chunkSize: 2 * 1024 * 1024 // 2MB
});
// 监听进度
uploader.onProgress = (uploadTask) => {
console.log(`${uploadTask.file.name}: ${uploadTask.progress.toFixed(1)}%`);
};
// 处理文件选择
document.getElementById('fileInput').addEventListener('change', async (event) => {
const files = Array.from(event.target.files);
try {
const result = await uploader.uploadFiles(files);
console.log('上传完成:', result);
} catch (error) {
console.error('上传失败:', error);
}
});
⚖️ 并发编程的最佳实践
✅ 优势
- 提高性能:充分利用系统资源
- 改善用户体验:避免界面阻塞
- 增强可扩展性:能处理更多并发请求
- 提高吞吐量:同时处理多个任务
⚠️ 注意事项
- 竞态条件:多个任务访问共享资源
- 内存泄漏:未正确清理异步任务
- 错误处理:异步错误的传播和处理
- 调试困难:异步代码的调试复杂性
🎯 最佳实践
// 1. 合理控制并发数量
class ConcurrencyController {
constructor(limit = 5) {
this.limit = limit;
this.running = 0;
this.queue = [];
}
async execute(task) {
return new Promise((resolve, reject) => {
this.queue.push({ task, resolve, reject });
this.process();
});
}
async process() {
if (this.running >= this.limit || this.queue.length === 0) {
return;
}
this.running++;
const { task, resolve, reject } = this.queue.shift();
try {
const result = await task();
resolve(result);
} catch (error) {
reject(error);
} finally {
this.running--;
this.process();
}
}
}
// 2. 超时和取消机制
class CancellablePromise {
constructor(executor) {
this.cancelled = false;
this.promise = new Promise((resolve, reject) => {
this.reject = reject;
executor(
(value) => {
if (!this.cancelled) resolve(value);
},
(reason) => {
if (!this.cancelled) reject(reason);
}
);
});
}
cancel() {
this.cancelled = true;
this.reject(new Error('操作已取消'));
}
static withTimeout(promise, timeout) {
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => reject(new Error('操作超时')), timeout);
});
return Promise.race([promise, timeoutPromise]);
}
}
// 3. 错误处理和重试机制
async function withRetry(fn, maxAttempts = 3, delay = 1000) {
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await fn();
} catch (error) {
if (attempt === maxAttempts) throw error;
console.log(`第${attempt}次尝试失败,${delay}ms后重试`);
await new Promise(resolve => setTimeout(resolve, delay));
delay *= 2; // 指数退避
}
}
}
📚 总结
并发编程是现代应用开发的核心技能,它让我们能够构建响应迅速、性能优异的应用程序。JavaScript提供了丰富的并发编程工具,从基础的Promise到高级的Web Workers。
核心要点:
- 理解事件循环机制
- 掌握Promise和async/await
- 合理控制并发数量
- 处理错误和超时
- 使用Web Workers处理CPU密集型任务
🔮 下节预告
下一课,我们将探索现代编程范式,包括响应式编程、微服务架构、函数式响应式编程等前沿概念,了解编程范式的最新发展趋势。
💭 思考题
- 什么情况下应该使用Web Workers而不是async/await?
- 如何设计一个既支持并发又能避免竞态条件的系统?
- 在你的项目中,哪些场景可以应用并发编程来提升性能?
并发编程让程序变得更加高效和响应迅速,但也带来了复杂性。掌握好并发编程的原理和最佳实践,是成为优秀开发者的必经之路!