版权声明:原创内容,转载请标明作者和出处。
在计算机科学中,竞争条件(Race Condition)是指系统的输出依赖于事件或进程的时序或顺序。当多个进程访问和操作共享数据,并且最终的结果取决于进程运行的精确时序时,就会发生竞争条件。常见的解决方案有:互斥锁(Mutexes)、信号量(Semaphores)、监视器(Monitors)、条件变量(Condition Variables)、原子操作(Atomic Operations)、锁定和事务内存(Transactional Memory)、消息传递(Message Passing)、顺序一致性(Sequential Consistency)、避免共享状态、软件事务内存(Software Transactional Memory, STM)。
本文仅阐述在前端领域中常见的竞争场景,整理一些常见的解决方案。
如图现在有这么一个搜索框,我们希望用户停止输入后自动发出请求获取搜索结果。我们很容易想到使用 debounce 来进行防抖。但 debounce 在于中间等待的时间可能会很久,使用 throttle 能够给用户更加即时的反馈。
在这种场景中,无论你是使用 throttle 还是 debounce ,多次发出的请求,返回成功后会竞争更新搜索结果面板,那么到底该以哪个结果为准呢?如何保证用户输入的关键词和结果是匹配上的?这就是前端最典型的竞争态场景。
// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
/** fetch data */
};
const updateSearchResult = async (inputText) => {
const result = await searchReq(inputText);
console.log('result is', result);
};
const searchReqThrottled = throttle(updateSearchResult, 300, {
leading: true,
});
async function test() {
await delay(0);
searchReqThrottled('fafa');
await delay(310);
searchReqThrottled('kaixin');
// 请问会输出2遍 result is 吗?
}
// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
/** fetch data */
};
const updateSearchResult = async (inputText) => {
const result = await searchReq(inputText);
console.log('result is', result);
};
const searchReqDebounced = debounce(updateSearchResult, 300, {
leading: true,
});
async function test() {
await delay(0);
searchReqDebounced('fafa');
await delay(310);
searchReqDebounced('kaixin');
// 请问会输出2遍 result is 吗?
}
按如上实现,throttle 和 debounce 保证了不频繁触发请求,但是无法保证用户看到的结果是当前搜索词的最新结果。
问题来了,如何解决先请求后返回的问题?
let latestTimestamp = 0;
const searchReq = (inputText, timestamp) => {
/** fetch data */
};
const updateSearchResult = async (inputText) => {
const currentTimestamp = Date.now(); // 获取当前时间戳
latestTimestamp = currentTimestamp; // 更新最新时间戳
const result = await searchReq(inputText, currentTimestamp);
// 检查返回的结果是否匹配最新时间戳
if (result.timestamp === latestTimestamp) {
console.log('result is', result.inputText);
// 更新UI等操作
} else {
// 忽略不是最新请求的结果
console.log('Ignoring outdated result for', result.inputText);
}
};
// throttle 或 debounce 函数的实现可以保持不变
如果是单个请求,我们可以抽象一种名为 TakeLastePromise 函数来包装真正的请求函数。这样可以做到在第二次调用 updateSearchResult 时,前一次的请求被取消,保证数据一致性。TakeLatestPromise 的代码实现可见我的另一篇博文
// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
/** fetch data */
};
const updateSearchResult = takeLatestPromise(async (inputText) => {
const result = await searchReq(inputText);
console.log('result is', result);
});
const searchReqDebounced = updateSearchResult;
async function test() {
await delay(0);
searchReqDebounced('fafa');
await delay(310);
searchReqDebounced('kaixin');
// 会输出几次 result is ?
}
当然,我们也可以结合 debounce/throttle 来组合使用 👇🏻
// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
/** fetch data */
};
const updateSearchResult = takeLatestPromise(async (inputText) => {
const result = await searchReq(inputText);
console.log('result is', result);
});
const searchReqDebounced = debounce(updateSearchResult, 300, {
leading: true,
});
async function test() {
await delay(0);
searchReqDebounced('fafa');
await delay(310);
searchReqDebounced('kaixin');
// 会输出1遍 result is
}
Rx.js 的 switchMap 函数可以做到第二次信号触发时使用调用最新的数据,丢弃前一次的结果。这里不展开 Rx.js 的使用方式,直接上代码:
import { debounceTime, switchMap, takeUntil } from 'rxjs/operators';
import { Subject } from 'rxjs';
// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
/** fetch data */
};
const search$ = new Subject();
search$
.pipe(
// 这里可以用 throttleTime(300) 来实现 throttle 的限流
debounceTime(300),
switchMap((inputText) => searchReq(inputText))
)
.subscribe((result) => console.log('result is', result));
async function test() {
search$.next('fafa');
setTimeout(() => search$.next('kaixin'), 310);
}
test();
2 个不同位置的按钮 A B,AB 点击后分别触发请求,A 请求发出后展示 loading,loading 过程中 B 点击需要展示 loading 态并等待 A 发出的请求完成,2 个按钮要共享 loading 状态,只要请求成功后,AB 按钮 loading 态立刻消失。
const aLoading = false;
const fetchDataA = async () => {
aLoading = true;
await fetchReq();
aLoading = false;
};
const bLoading = false;
const fetchDataB = async () => {
bLoading = true;
await fetchReq();
bLoading = false;
};
// fetchReq 该如何实现?
const fetchReq = reusePromise(fetchReqFn, (...args) => {
return '';
});
async function fetchReqFn(...args: unknown) {
return await fetch('/api/xxx');
}
export function reusePromise<Params, Result>(
api: (params: Params) => Promise<Result>,
generateHash: (params: Params) => string
): (params: Params) => Promise<Result> {
/**
* 基于 hash 来做请求状态的复用
*/
const duringAPIPromiseHashMappings: Record<
string,
Promise<Result> | undefined
> = {};
return (params) => {
const currentHash = generateHash(params);
const existPromise = duringAPIPromiseHashMappings[currentHash];
// 如果存在请求中的数据结构
if (existPromise) {
// 等待 Promise 结束
return existPromise;
}
// 等待这次流程处理结束,将 hash 的缓存删除
const res = (duringAPIPromiseHashMappings[currentHash] = api(
params
).finally(() => {
delete duringAPIPromiseHashMappings[currentHash];
}));
return res;
};
}
当然,类似的问题我们都可以通过 Rx.js 来实现:
import { from, Observable, throwError } from 'rxjs';
import { shareReplay, catchError } from 'rxjs/operators';
const fetchReqFn = (params: unknown): Observable<Response> => {
const fetch$ = from(fetch(`https://api.example.com/data?params=${params}`));
return fetch$.pipe(
catchError((err) => {
console.error('Error occurred:', err);
return throwError(err); // 返回一个发出错误信息的 Observable
}),
shareReplay(1)
);
};
const fetchReq = async (params: unknown) => {
try {
return await fetchReqFn(params).toPromise();
} catch (err) {
console.error('Failed to fetch data:', err);
throw err; // 在这里,我们可以决定如何处理错误
}
};
搜索场景 + 过滤器
const filterResult = async ({ searchText, order, day }) => {
const params = {
searchText,
order,
dateRange: {
start: startOfDay(day),
end: endOfDay(day),
},
};
return Axios.get('/api/search', { params });
};
用户操作时,如何保证用户看到的结果是最后一次请求返回的数据。
function onSearchTextChange(newSearchText: string) {
filterResult({
searchText: newSearchText,
});
}
function onOrderChange(newOrder: 'desc' | 'sec') {
filterResult({
order: newOrder,
});
}
function dayChange(newDay: Date) {
filterResult({
day: newDay,
});
}
let currentReqId = genUUID();
const filterResultFn = async ({ searchText, order, day, reqId }) => {
const params = {
searchText,
order,
dateRange: {
start: startOfDay(day),
end: endOfDay(day),
},
};
const result = await Axios.get('/api/search', { params });
if (reqId !== currentReqId) {
return;
} else {
return result;
}
};
const filterResult = (params) => {
const newReqId = genUUID();
currentReqId = newReqId;
return filterResultFn({ ...params, reqId: newReqId });
};
import { Subject, combineLatest } from 'rxjs';
import { switchMap, startWith } from 'rxjs/operators';
import axios from 'axios';
// 为每个可能改变的值创建一个 Subject
const searchText$ = new Subject<string>();
const order$ = new Subject<'desc' | 'asc'>();
const day$ = new Subject<Date>();
// 当任何一个 Subject 发出新的值时,都会触发这个函数
const fetchResults = ([searchText, order, day]: [
string,
'desc' | 'asc',
Date
]) => {
const params = {
searchText,
order,
dateRange: {
start: startOfDay(day),
end: endOfDay(day),
},
};
// 返回一个 Observable,这个 Observable 会发出请求的结果
return from(axios.get('/api/search', { params }));
};
// 使用 combineLatest 来组合这些 Subject
const results$ = combineLatest([
searchText$.pipe(startWith('')), // 使用 startWith 来设置一个初始值
order$.pipe(startWith('desc')),
day$.pipe(startWith(new Date())),
]).pipe(
switchMap(fetchResults) // 使用 switchMap 来确保总是获取最后一次请求的结果
);
// 订阅 results$,以便在请求的结果到达时进行处理
results$.subscribe((results) => {
// 在这里处理结果
console.log(results);
});
// 触发新的请求
function onSearchTextChange(newSearchText: string) {
searchText$.next(newSearchText);
}
function onOrderChange(newOrder: 'desc' | 'asc') {
order$.next(newOrder);
}
function dayChange(newDay: Date) {
day$.next(newDay);
}
如果此时再升级一下场景的复杂度:搜索场景 + 过滤器 + 推拉结合。
// 增加一个定时器定时刷新,又该如何保证最终展示的结果是最新一次请求的返回呢?
setInterval(() => {
fetchResults(currentParams);
});
可以通过将定时器的逻辑也使用 RxJS 来实现,并且将它和过滤条件组合在一起。这样,无论是由于过滤条件的改变,还是由于定时器的触发,都会发送一个新的请求。
import { interval, Subject, combineLatest } from 'rxjs';
import { startWith, map, switchMap } from 'rxjs/operators';
import axios from 'axios';
// 为每个可能改变的值创建一个 Subject
const searchText$ = new Subject<string>();
const order$ = new Subject<'desc' | 'asc'>();
const day$ = new Subject<Date>();
// 创建一个 interval Observable,每隔一定时间就发出一个值
const timer$ = interval(10000); // 每隔 10 秒刷新一次
// 当任何一个 Subject 或者 timer$ 发出新的值时,都会触发这个函数
const fetchResults = ([searchText, order, day]: [
string,
'desc' | 'asc',
Date
]) => {
const params = {
searchText,
order,
dateRange: {
start: startOfDay(day),
end: endOfDay(day),
},
};
// 返回一个 Observable,这个 Observable 会发出请求的结果
return from(axios.get('/api/search', { params }));
};
// 使用 combineLatest 来组合这些 Subject 和 timer$
// timer$ 发出的值实际上我们并不关心,但它的变化会触发新的请求
const results$ = combineLatest([
searchText$.pipe(startWith('')), // 使用 startWith 来设置一个初始值
order$.pipe(startWith('desc')),
day$.pipe(startWith(new Date())),
timer$,
]).pipe(
map(([searchText, order, day]) => [searchText, order, day]), // 不关心 timer$ 的值,所以在这里过滤掉
switchMap(fetchResults) // 使用 switchMap 来确保总是获取最后一次请求的结果
);
// 订阅 results$,以便在请求的结果到达时进行处理
results$.subscribe((results) => {
// 在这里处理结果
console.log(results);
});
// 触发新的请求
function onSearchTextChange(newSearchText: string) {
searchText$.next(newSearchText);
}
function onOrderChange(newOrder: 'desc' | 'asc') {
order$.next(newOrder);
}
function dayChange(newDay: Date) {
day$.next(newDay);
}
但是这种处理方式,我们没法复用。如何抽象一种通用的处理方式呢?
我们假设有这么一种复用的函数:
const filterResultFn = async ({ searchText, order, day }) => {
const params = {
searchText,
order,
dateRange: {
start: startOfDay(day),
end: endOfDay(day),
},
};
const result = await Axios.get('/api/search', { params });
return result;
};
const filterResultReq = makeTakeLatestSubject(filterResultFn);
filterResultReq.next(params);
filterResultReq.subscribe((result) => {
// handle result
console.log('result is:', result);
});
在 makeTakeLatestSubject 中自动封装好多次请求的处理,只处理最后一次请求的结果。对应的实现如下:
import {
Subscription,
Subject,
Observable,
from,
switchMap,
catchError,
of,
} from 'rxjs';
type MakeTakeLatestSubjectReturn<FnParams, FnResult> = {
subscribe: (
cb: (data: FnResult) => void,
err?: (error: any) => void
) => Subscription;
next: (params: FnParams) => void;
unsubscribe: () => void;
};
/**
* 用于处理请求函数的竞争问题,当请求函数被调用时,如果上一次的请求没有返回的情况下会取消上一次的请求
*/
export function makeTakeLatestSubject<FnParams, FnResult>(
fn: (p: FnParams) => Promise<FnResult>
): MakeTakeLatestSubjectReturn<FnParams, FnResult> {
const request$ = new Subject<FnParams>();
const err$ = new Subject<unknown>();
const resolve$: Observable<FnResult> = request$.pipe(
switchMap((data) => {
return from<PromiseLike<FnResult>>(fn(data));
}),
catchError((err, caught) => {
err$.next(err);
return caught;
})
);
return {
subscribe: (cb, err) => {
const cbSubscription = resolve$.subscribe(cb);
const errSubscription = err$.subscribe(err);
cbSubscription.add(errSubscription);
return cbSubscription;
},
unsubscribe: () => {
request$.unsubscribe();
},
next: (params) => {
request$.next(params);
},
};
}
但是这个函数也不够好用。存在以下问题:
import {
Subscription,
Subject,
Observable,
from,
switchMap,
catchError,
of,
} from 'rxjs';
type MakeTakeLatestSubjectReturn<FnParams extends any[], FnResult> = {
subscribe: (
cb: (
data:
| { params: FnParams; result: FnResult }
| { params: FnParams; error: Error }
) => void,
err?: (error: any) => void
) => Subscription;
next: (...params: FnParams) => void;
unsubscribe: () => void;
};
/**
* 用于处理请求函数的竞争问题,当请求函数被调用时,如果上一次的请求没有返回的情况下会取消上一次的请求
*/
export function makeTakeLatestSubject<FnParams extends any[], FnResult>(
fn: (...params: FnParams) => Promise<FnResult>
): MakeTakeLatestSubjectReturn<FnParams, FnResult> {
const request$ = new Subject<FnParams>();
const err$ = new Subject<unknown>();
const resolve$: Observable<
{ params: FnParams; result: FnResult } | { params: FnParams; error: Error }
> = request$.pipe(
switchMap((data) => {
return from<
PromiseLike<
| { params: FnParams; result: FnResult }
| { params: FnParams; error: Error }
>
>(
fn(...data).then(
(result) => {
return {
params: data,
result: result,
};
},
(error) => {
return {
params: data,
error,
};
}
)
);
}),
catchError((err, caught) => {
err$.next(err);
return caught;
})
);
return {
subscribe: (cb, err) => {
const cbSubscription = resolve$.subscribe(cb);
const errSubscription = err$.subscribe(err);
cbSubscription.add(errSubscription);
return cbSubscription;
},
unsubscribe: () => {
request$.unsubscribe();
},
next: (...params) => {
request$.next(params);
},
};
}
到此为止,我们还有一些场景没有考虑:
这些问题可以在我的 常用 Promise 解决方案 中找到答案。