前端的竞争态请求处理

版权声明:原创内容,转载请标明作者和出处。

在计算机科学中,竞争条件(Race Condition)是指系统的输出依赖于事件或进程的时序或顺序。当多个进程访问和操作共享数据,并且最终的结果取决于进程运行的精确时序时,就会发生竞争条件。常见的解决方案有:互斥锁(Mutexes)、信号量(Semaphores)、监视器(Monitors)、条件变量(Condition Variables)、原子操作(Atomic Operations)、锁定和事务内存(Transactional Memory)、消息传递(Message Passing)、顺序一致性(Sequential Consistency)、避免共享状态、软件事务内存(Software Transactional Memory, STM)。

本文仅阐述在前端领域中常见的竞争场景,整理一些常见的解决方案。

问题来源

如图现在有这么一个搜索框,我们希望用户停止输入后自动发出请求获取搜索结果。我们很容易想到使用 debounce 来进行防抖。但 debounce 在于中间等待的时间可能会很久,使用 throttle 能够给用户更加即时的反馈。
在这种场景中,无论你是使用 throttle 还是 debounce ,多次发出的请求,返回成功后会竞争更新搜索结果面板,那么到底该以哪个结果为准呢?如何保证用户输入的关键词和结果是匹配上的?这就是前端最典型的竞争态场景。

节流防抖:throttle、debounce

// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
  /** fetch data */
};
const updateSearchResult = async (inputText) => { 
  const result = await searchReq(inputText); 
  console.log('result is', result);
};

const searchReqThrottled = throttle(updateSearchResult, 300, {
  leading: true,
});

async function test() {
  await delay(0);
  searchReqThrottled('fafa');
  await delay(310);
  searchReqThrottled('kaixin');
  // 请问会输出2遍 result is 吗?
}
// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
  /** fetch data */
};
const updateSearchResult = async (inputText) => {
  const result = await searchReq(inputText);
  console.log('result is', result);
};

const searchReqDebounced = debounce(updateSearchResult, 300, {
  leading: true,
});

async function test() {
  await delay(0);
  searchReqDebounced('fafa');
  await delay(310);
  searchReqDebounced('kaixin');
  // 请问会输出2遍 result is 吗?
}

按如上实现,throttle 和 debounce 保证了不频繁触发请求,但是无法保证用户看到的结果是当前搜索词的最新结果

问题来了,如何解决先请求后返回的问题?

解法一:时间戳

let latestTimestamp = 0;

const searchReq = (inputText, timestamp) => {
  /** fetch data */
};

const updateSearchResult = async (inputText) => {
  const currentTimestamp = Date.now(); // 获取当前时间戳
  latestTimestamp = currentTimestamp; // 更新最新时间戳

  const result = await searchReq(inputText, currentTimestamp);

  // 检查返回的结果是否匹配最新时间戳
  if (result.timestamp === latestTimestamp) {
    console.log('result is', result.inputText);
    // 更新UI等操作
  } else {
    // 忽略不是最新请求的结果
    console.log('Ignoring outdated result for', result.inputText);
  }
};

// throttle  debounce 函数的实现可以保持不变

解法二:TakeLatestPromise

如果是单个请求,我们可以抽象一种名为 TakeLastePromise 函数来包装真正的请求函数。这样可以做到在第二次调用 updateSearchResult 时,前一次的请求被取消,保证数据一致性。TakeLatestPromise 的代码实现可见我的另一篇博文

// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
  /** fetch data */
};
const updateSearchResult = takeLatestPromise(async (inputText) => {
  const result = await searchReq(inputText);
  console.log('result is', result);
});

const searchReqDebounced = updateSearchResult; 

async function test() {
  await delay(0);
  searchReqDebounced('fafa');
  await delay(310);
  searchReqDebounced('kaixin');
  // 会输出几次 result is 
}

当然,我们也可以结合 debounce/throttle 来组合使用 👇🏻

// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
  /** fetch data */
};
const updateSearchResult = takeLatestPromise(async (inputText) => {
  const result = await searchReq(inputText);
  console.log('result is', result);
});

const searchReqDebounced = debounce(updateSearchResult, 300, {
  leading: true,
});

async function test() {
  await delay(0);
  searchReqDebounced('fafa');
  await delay(310);
  searchReqDebounced('kaixin');
  // 会输出1遍 result is
}

解法三:Rx.js

Rx.js 的 switchMap 函数可以做到第二次信号触发时使用调用最新的数据,丢弃前一次的结果。这里不展开 Rx.js 的使用方式,直接上代码:

import { debounceTime, switchMap, takeUntil } from 'rxjs/operators';
import { Subject } from 'rxjs';

// searchReq 第一次请求 900ms返回,第二次 300ms返回。
const searchReq = () => {
  /** fetch data */
};

const search$ = new Subject();
search$
  .pipe(
    // 这里可以用 throttleTime(300) 来实现 throttle 的限流
    debounceTime(300),
    switchMap((inputText) => searchReq(inputText))
  )
  .subscribe((result) => console.log('result is', result));

async function test() {
  search$.next('fafa');
  setTimeout(() => search$.next('kaixin'), 310);
}

test();

问题变种:

2 个不同位置的按钮 A B,AB 点击后分别触发请求,A 请求发出后展示 loading,loading 过程中 B 点击需要展示 loading 态并等待 A 发出的请求完成,2 个按钮要共享 loading 状态,只要请求成功后,AB 按钮 loading 态立刻消失。

const aLoading = false;
const fetchDataA = async () => {
  aLoading = true;
  await fetchReq();
  aLoading = false;
};
const bLoading = false;
const fetchDataB = async () => {
  bLoading = true;
  await fetchReq();
  bLoading = false;
};
// fetchReq 该如何实现?

解法:ReusePromise

const fetchReq = reusePromise(fetchReqFn, (...args) => {
  return '';
});

async function fetchReqFn(...args: unknown) {
  return await fetch('/api/xxx');
}

export function reusePromise<Params, Result>(
  api: (params: Params) => Promise<Result>,
  generateHash: (params: Params) => string
): (params: Params) => Promise<Result> {
  /**
   * 基于 hash 来做请求状态的复用
   */
  const duringAPIPromiseHashMappings: Record<
    string,
    Promise<Result> | undefined
  > = {};

  return (params) => {
    const currentHash = generateHash(params);

    const existPromise = duringAPIPromiseHashMappings[currentHash];
    // 如果存在请求中的数据结构
    if (existPromise) {
      // 等待 Promise 结束
      return existPromise;
    }

    // 等待这次流程处理结束,将 hash 的缓存删除
    const res = (duringAPIPromiseHashMappings[currentHash] = api(
      params
    ).finally(() => {
      delete duringAPIPromiseHashMappings[currentHash];
    }));

    return res;
  };
}

当然,类似的问题我们都可以通过 Rx.js 来实现:

import { from, Observable, throwError } from 'rxjs';
import { shareReplay, catchError } from 'rxjs/operators';

const fetchReqFn = (params: unknown): Observable<Response> => {
  const fetch$ = from(fetch(`https://api.example.com/data?params=${params}`));
  return fetch$.pipe(
    catchError((err) => {
      console.error('Error occurred:', err);
      return throwError(err); // 返回一个发出错误信息的 Observable
    }),
    shareReplay(1)
  );
};

const fetchReq = async (params: unknown) => {
  try {
    return await fetchReqFn(params).toPromise();
  } catch (err) {
    console.error('Failed to fetch data:', err);
    throw err; // 在这里,我们可以决定如何处理错误
  }
};

问题升级:

搜索场景 + 过滤器

const filterResult = async ({ searchText, order, day }) => {
  const params = {
    searchText,
    order,
    dateRange: {
      start: startOfDay(day),
      end: endOfDay(day),
    },
  };
  return Axios.get('/api/search', { params });
};

用户操作时,如何保证用户看到的结果是最后一次请求返回的数据。

function onSearchTextChange(newSearchText: string) {
  filterResult({
    searchText: newSearchText,
  });
}

function onOrderChange(newOrder: 'desc' | 'sec') {
  filterResult({
    order: newOrder,
  });
}

function dayChange(newDay: Date) {
  filterResult({
    day: newDay,
  });
}

方案一:通过请求 id 来保证

let currentReqId = genUUID();

const filterResultFn = async ({ searchText, order, day, reqId }) => {
  const params = {
    searchText,
    order,
    dateRange: {
      start: startOfDay(day),
      end: endOfDay(day),
    },
  };
  const result = await Axios.get('/api/search', { params });

  if (reqId !== currentReqId) {
    return;
  } else {
    return result;
  }
};

const filterResult = (params) => {
  const newReqId = genUUID();
  currentReqId = newReqId;
  return filterResultFn({ ...params, reqId: newReqId });
};

另一种方案是 Rx.js

import { Subject, combineLatest } from 'rxjs';
import { switchMap, startWith } from 'rxjs/operators';
import axios from 'axios';

// 为每个可能改变的值创建一个 Subject
const searchText$ = new Subject<string>();
const order$ = new Subject<'desc' | 'asc'>();
const day$ = new Subject<Date>();

// 当任何一个 Subject 发出新的值时,都会触发这个函数
const fetchResults = ([searchText, order, day]: [
  string,
  'desc' | 'asc',
  Date
]) => {
  const params = {
    searchText,
    order,
    dateRange: {
      start: startOfDay(day),
      end: endOfDay(day),
    },
  };
  // 返回一个 Observable,这个 Observable 会发出请求的结果
  return from(axios.get('/api/search', { params }));
};

// 使用 combineLatest 来组合这些 Subject
const results$ = combineLatest([
  searchText$.pipe(startWith('')), // 使用 startWith 来设置一个初始值
  order$.pipe(startWith('desc')),
  day$.pipe(startWith(new Date())),
]).pipe(
  switchMap(fetchResults) // 使用 switchMap 来确保总是获取最后一次请求的结果
);

// 订阅 results$,以便在请求的结果到达时进行处理
results$.subscribe((results) => {
  // 在这里处理结果
  console.log(results);
});

// 触发新的请求
function onSearchTextChange(newSearchText: string) {
  searchText$.next(newSearchText);
}

function onOrderChange(newOrder: 'desc' | 'asc') {
  order$.next(newOrder);
}

function dayChange(newDay: Date) {
  day$.next(newDay);
}

问题再升级:

如果此时再升级一下场景的复杂度:搜索场景 + 过滤器 + 推拉结合。

// 增加一个定时器定时刷新,又该如何保证最终展示的结果是最新一次请求的返回呢?
setInterval(() => {
  fetchResults(currentParams);
});

可以通过将定时器的逻辑也使用 RxJS 来实现,并且将它和过滤条件组合在一起。这样,无论是由于过滤条件的改变,还是由于定时器的触发,都会发送一个新的请求。

import { interval, Subject, combineLatest } from 'rxjs';
import { startWith, map, switchMap } from 'rxjs/operators';
import axios from 'axios';

// 为每个可能改变的值创建一个 Subject
const searchText$ = new Subject<string>();
const order$ = new Subject<'desc' | 'asc'>();
const day$ = new Subject<Date>();

// 创建一个 interval Observable,每隔一定时间就发出一个值
const timer$ = interval(10000); // 每隔 10 秒刷新一次

// 当任何一个 Subject 或者 timer$ 发出新的值时,都会触发这个函数
const fetchResults = ([searchText, order, day]: [
  string,
  'desc' | 'asc',
  Date
]) => {
  const params = {
    searchText,
    order,
    dateRange: {
      start: startOfDay(day),
      end: endOfDay(day),
    },
  };
  // 返回一个 Observable,这个 Observable 会发出请求的结果
  return from(axios.get('/api/search', { params }));
};

// 使用 combineLatest 来组合这些 Subject  timer$
// timer$ 发出的值实际上我们并不关心,但它的变化会触发新的请求
const results$ = combineLatest([
  searchText$.pipe(startWith('')), // 使用 startWith 来设置一个初始值
  order$.pipe(startWith('desc')),
  day$.pipe(startWith(new Date())),
  timer$,
]).pipe(
  map(([searchText, order, day]) => [searchText, order, day]), // 不关心 timer$ 的值,所以在这里过滤掉
  switchMap(fetchResults) // 使用 switchMap 来确保总是获取最后一次请求的结果
);

// 订阅 results$,以便在请求的结果到达时进行处理
results$.subscribe((results) => {
  // 在这里处理结果
  console.log(results);
});

// 触发新的请求
function onSearchTextChange(newSearchText: string) {
  searchText$.next(newSearchText);
}

function onOrderChange(newOrder: 'desc' | 'asc') {
  order$.next(newOrder);
}

function dayChange(newDay: Date) {
  day$.next(newDay);
}

但是这种处理方式,我们没法复用。如何抽象一种通用的处理方式呢?

抽象 & 复用

我们假设有这么一种复用的函数:

const filterResultFn = async ({ searchText, order, day }) => {
  const params = {
    searchText,
    order,
    dateRange: {
      start: startOfDay(day),
      end: endOfDay(day),
    },
  };
  const result = await Axios.get('/api/search', { params });
  return result;
};

const filterResultReq = makeTakeLatestSubject(filterResultFn);
filterResultReq.next(params);
filterResultReq.subscribe((result) => {
  // handle result
  console.log('result is:', result);
});

在 makeTakeLatestSubject 中自动封装好多次请求的处理,只处理最后一次请求的结果。对应的实现如下:

makeTakeLatestSubject

import {
  Subscription,
  Subject,
  Observable,
  from,
  switchMap,
  catchError,
  of,
} from 'rxjs';

type MakeTakeLatestSubjectReturn<FnParams, FnResult> = {
  subscribe: (
    cb: (data: FnResult) => void,
    err?: (error: any) => void
  ) => Subscription;
  next: (params: FnParams) => void;
  unsubscribe: () => void;
};

/**
 * 用于处理请求函数的竞争问题,当请求函数被调用时,如果上一次的请求没有返回的情况下会取消上一次的请求
 */
export function makeTakeLatestSubject<FnParams, FnResult>(
  fn: (p: FnParams) => Promise<FnResult>
): MakeTakeLatestSubjectReturn<FnParams, FnResult> {
  const request$ = new Subject<FnParams>();
  const err$ = new Subject<unknown>();

  const resolve$: Observable<FnResult> = request$.pipe(
    switchMap((data) => {
      return from<PromiseLike<FnResult>>(fn(data));
    }),
    catchError((err, caught) => {
      err$.next(err);
      return caught;
    })
  );

  return {
    subscribe: (cb, err) => {
      const cbSubscription = resolve$.subscribe(cb);
      const errSubscription = err$.subscribe(err);
      cbSubscription.add(errSubscription);
      return cbSubscription;
    },
    unsubscribe: () => {
      request$.unsubscribe();
    },
    next: (params) => {
      request$.next(params);
    },
  };
}

但是这个函数也不够好用。存在以下问题:

  1. 传入的函数 fn 只能传入一个参数
  2. 很多时候我们需要知道当前取到的请求结果对应的是哪一次请求,请求时间是什么。这时候我们需要再进行封装一下。

进一步封装

import {
  Subscription,
  Subject,
  Observable,
  from,
  switchMap,
  catchError,
  of,
} from 'rxjs';

type MakeTakeLatestSubjectReturn<FnParams extends any[], FnResult> = {
  subscribe: (
    cb: (
      data:
        | { params: FnParams; result: FnResult }
        | { params: FnParams; error: Error }
    ) => void,
    err?: (error: any) => void
  ) => Subscription;
  next: (...params: FnParams) => void;
  unsubscribe: () => void;
};

/**
 * 用于处理请求函数的竞争问题,当请求函数被调用时,如果上一次的请求没有返回的情况下会取消上一次的请求
 */
export function makeTakeLatestSubject<FnParams extends any[], FnResult>(
  fn: (...params: FnParams) => Promise<FnResult>
): MakeTakeLatestSubjectReturn<FnParams, FnResult> {
  const request$ = new Subject<FnParams>();
  const err$ = new Subject<unknown>();

  const resolve$: Observable<
    { params: FnParams; result: FnResult } | { params: FnParams; error: Error }
  > = request$.pipe(
    switchMap((data) => {
      return from<
        PromiseLike<
          | { params: FnParams; result: FnResult }
          | { params: FnParams; error: Error }
        >
      >(
        fn(...data).then(
          (result) => {
            return {
              params: data,
              result: result,
            };
          },
          (error) => {
            return {
              params: data,
              error,
            };
          }
        )
      );
    }),
    catchError((err, caught) => {
      err$.next(err);
      return caught;
    })
  );

  return {
    subscribe: (cb, err) => {
      const cbSubscription = resolve$.subscribe(cb);
      const errSubscription = err$.subscribe(err);
      cbSubscription.add(errSubscription);
      return cbSubscription;
    },
    unsubscribe: () => {
      request$.unsubscribe();
    },
    next: (...params) => {
      request$.next(params);
    },
  };
}

进阶问题

到此为止,我们还有一些场景没有考虑:

  1. 如果需要给请求增加重试机制该怎么办?
  2. 如果需要给请求增加手动取消该怎么办?

这些问题可以在我的 常用 Promise 解决方案 中找到答案。