Javascript的细节(四): array#sort

发表于更新于阅读时长 15 分钟

不同浏览器和不同标准下的数组sort方法

1. 标准

很容易就可以在现行标准里找到对于sort方法的定义. 通读标准可以发现大部分内容都很符合直觉, 当然其中也有一些值的注意的部分.

一致比较函数是指满足以下条件的函数:

  1. 对于任意 a, b 两值, 总是返回相同结果 v, 其中 v 必须是 Number 类型且不是 NaN
  2. 调用它不会修改原数组
  3. 满足自反性和传递性

满足以下任意条件时排序结果是由实现定义的(与 C++ 标准不同, ECMAScript 标准中不会规定未定义行为):

  1. 传入了比较函数且它不是一致比较函数
  2. 未出入比较函数且数组内容使得默认比较表现为不一致
  3. 数组是稀疏的, 且存在 0<=j<数组长度, proto 为该数组的原型, proto 上存在键为j.toString()的属性
  4. Object.isExtensible(array)返回真, 且存在 0<=j<数组长度, array[j]的[[Configurable]]属性为假
  5. 数组是怪异对象(例如 proxy)且定义了[[GET]][[SET]]等方法
  6. 存在 0<=j<数组长度, array[j]的[[Writable]]属性为假
  7. 未传入比较函数且对数组中的任意一个值进行toString会修改该数组或其原型

在比较两个值 a, b 的大小时, 可以概括为如下步骤:

  1. 如果 a 是数组空位, 返回 1, b 是则返回-1, 都是则返回+0
  2. 如果 a 是 undefined, 返回 1, b 是则返回-1, 都是则返回+0
  3. 如果传入了比较函数compare, 则调用compare.call(undefined, a, b), 将其返回值转换为数字并返回, 若转换出 NaN 则返回+0(尽管任何返回非数字或 NaN 的函数都会由于不是一致比较函数而使得整个比较行为变成由实现定义的)
  4. 否则把 a 和 b 转换成字符串进行比较

因此, 无论传入怎样的比较函数, undefined 值总会被排到数组的尾部, 在其后还可能有数组空位.

对比一下ES6 版本可以发现变化并不大, 仅有在此PR merge 之后才有明显的变化. 可以很清楚地看到, 这个 PR 只修改了一句话, 把"排序不一定是稳定的"改成了"排序必须是稳定的". 那么为什么会这么修改呢?

2. 实现

先说结论, 因为 V8 团队把数组排序的实现改成了稳定的, 所以他们顺带就去要求 TC39 把标准也改了.

V8 团队的这篇博文很好地说明了 V8 里排序算法的变迁. 众所周知, 快速排序由于其缓存友好性, 是应用最广泛的排序算法. STL 中的排序算法也是快排. 当然各种语言在实现快排时都会采用一些通用的优化, 例如:

  • 为了缓存友好性, 显然要实现成原地的, 因此很多语言的排序算法是不稳定的
  • 在数组短于一定长度时转用插入排序(旧版 V8 中阈值为 10)
  • 随机选取 pivot 或者比较头中尾选取 pivot, 以避免复杂度退化到 O(n^2)

但一些人, 包括Tim Peters, 认为快排仍然不够好, 所以发明了TimSort, 它是归并排序的一种变体, 并将它应用于 Python. 此后, Timsort 成为了安卓, V8 以及 Swift 的默认排序算法. 尽管这种算法有很多优势, 但是它的实现非常复杂, 所以不妨让我们来看看其他引擎是如何稳定的排序的. 除了 Chrome 以外的其他主流浏览器使用的 JS 引擎都使用了归并排序.

2.1 EDGE

Edge 浏览器使用的 ChakraCore 引擎使用了 JavaScript 来实现sort方法, 其核心算法定义在这里. Edge 团队非常精巧地实现了一个原地的, 自底向上的归并排序.

function (array, length, compareFn) {
    const buffer = []
    buffer.__proto__ = null

首先开辟数组buffer当作临时存放数据的空间, 同时为了访问速度考虑将其原型置为空

    let bucketSize = 2,
        lastSize = 1,
        position = 0
    const doubleLength = length + length

bucketSize用于存储这一迭代中每个子数组的长度, position则是当前迭代的位置. 此外, 为了性能起见缓存了lastSizedoubleLength, 以免每次都要进行计算.

    while (bucketSize < doubleLength) {
        while (position < length) {
            const left = position
            const mid = left + lastSize

之后就是主要内容, 两层循环. 外层循环把bucketSize增大至数组长度, 共运行 O(log n) 次; 内层循环按照bucketSize把整个数组切分为子数组进行处理. 因此整个程序的时间复杂度为 O(n log n).

            // perform a merge but only if it's necessary
            if (mid < length && compareFn(array[mid], array[mid - 1]) < 0) {

由于子数组的左右两部分都在上一次循环中排序过了, 因此如果左半部分的最后一个小于右半部分的第一个元素, 则整个子数组就是有序的, 可以跳过合并的过程

                let right = position + bucketSize
                right = right < length ? right : length
                let i = mid - 1,
                    j = 0,
                    k = mid

                while (k < right) {
                    buffer[j++] = array[k++]
                }

把右半部分存到buffer里去, 因此整个程序所需要的额外空间为 O(n/2)

                let rightElement = buffer[--j]
                let leftElement = array[i]

                for (;;) {
                    if (compareFn(rightElement, leftElement) < 0) {
                        array[--k] = leftElement
                        if (i > left) {
                            leftElement = array[--i]
                        } else {
                            array[--k] = rightElement
                            break
                        }
                    } else {
                        array[--k] = rightElement
                        if (j > 0) {
                            rightElement = buffer[--j]
                        } else {
                            break
                        }
                    }
                }

                while (j > 0) {
                    array[--k] = buffer[--j]
                }
            }

右半部分的数据已经没用了, 因此从右到左地对子数组的左半部分和buffer归并, 结果直接存放到子数组中. 如果左半部分先用完, 就用buffer的剩余部分填充子数组; 否则直接跳出循环

            position += bucketSize
        }
        position = 0
        lastSize = bucketSize
        bucketSize *= 2
    }

}

增大循环变量, 进入下一迭代.

2.2 Safari

读到这里你可能会有疑问, 为什么这么常用的方法是使用 JS 而非原生代码编写的. 事实上, 在现代 JS 引擎, 即解释器 -> 简单 JIT -> 完全 JIT 的三级架构下, 除了一部分启动时间的损失, 内置方法使用 JS 编写并不会带来特别大的性能损失. Safari 浏览器使用的 JavaScriptCore 里也是如此.

    var sortFunction;
    if (typeof comparator == "function")
        sortFunction = comparatorSort;
    else if (comparator === @undefined)
        sortFunction = stringSort;
    else
        @throwTypeError("Array.prototype.sort requires the comparsion function be a function or undefined");

JSCore 做了一些额外的优化. 如果没传入比较函数, 那就说明最终比较的值一定是字符串, 因此调用stringSort函数, 它会把数组中的每个值都转换成字符串之后再进行桶排序.

    function bucketSort(array, dst, bucket, depth)
    {
        if (bucket.length < 32 || depth > 32) {
            mergeSort(bucket, bucket.length, stringComparator);
            for (var i = 0; i < bucket.length; ++i)
                array[dst++] = bucket[i].value;
            return dst;
        }

        var buckets = [ ];
        for (var i = 0; i < bucket.length; ++i) {
            var entry = bucket[i];
            var string = entry.string;
            if (string.length == depth) {
                array[dst++] = entry.value;
                continue;
            }

            var c = string.@charCodeAt(depth);
            if (!buckets[c])
                buckets[c] = [ ];
            buckets[c][buckets[c].length] = entry;
        }

        for (var i = 0; i < buckets.length; ++i) {
            if (!buckets[i])
                continue;
            dst = bucketSort(array, dst, buckets[i], depth + 1);
        }

        return dst;
    }

这是一个非常朴素的桶排序的实现. 另外, 当桶数量过小或是递归深度过深时会转而使用归并排序. 而归并排序的实现则如下

    function mergeSort(array, valueCount, comparator)
    {
        var buffer = [ ];
        buffer.length = valueCount;

        var dst = buffer;
        var src = array;
        for (var width = 1; width < valueCount; width *= 2) {
            for (var srcIndex = 0; srcIndex < valueCount; srcIndex += 2 * width)
                merge(dst, src, srcIndex, valueCount, width, comparator);

            var tmp = src;
            src = dst;
            dst = tmp;
        }

        if (src != array) {
            for(var i = 0; i < valueCount; i++)
                array[i] = src[i];
        }

这里和 CharkaCore 一样, 都是使用了自底向上的方式, 而merge函数的实现则是

    function merge(dst, src, srcIndex, srcEnd, width, comparator)
    {
        var left = srcIndex;
        var leftEnd = min(left + width, srcEnd);
        var right = leftEnd;
        var rightEnd = min(right + width, srcEnd);

        for (var dstIndex = left; dstIndex < rightEnd; ++dstIndex) {
            if (right < rightEnd) {
                if (left >= leftEnd) {
                    dst[dstIndex] = src[right++];
                    continue;
                }

                var comparisonResult = comparator(src[right], src[left]);
                if ((typeof comparisonResult === "boolean" && !comparisonResult) || comparisonResult < 0) {
                    dst[dstIndex] = src[right++];
                    continue;
                }

            }

            dst[dstIndex] = src[left++];
        }

它的实现非常朴实, 和教科书上的归并算法没什么区别, 都需要使用额外的 O(n) 空间. 此外这里针对比较函数返回布尔值做了额外的判断, 以与其他引擎表现一致.

2.3 Firefox

相比之下 Firefox 浏览器使用的 SpiderMonkey 的sort则是用 C++ 实现的, 源代码在这里

template <typename T, typename Comparator>
MOZ_MUST_USE bool MergeSort(T* array, size_t nelems, T* scratch, Comparator c) {
  const size_t INS_SORT_LIMIT = 3;

  if (nelems <= 1) {
    return true;
  }

  /*
   * Apply insertion sort to small chunks to reduce the number of merge
   * passes needed.
   */
  for (size_t lo = 0; lo < nelems; lo += INS_SORT_LIMIT) {
    size_t hi = lo + INS_SORT_LIMIT;
    if (hi >= nelems) {
      hi = nelems;
    }
    for (size_t i = lo + 1; i != hi; i++) {
      for (size_t j = i;;) {
        bool lessOrEqual;
        if (!c(array[j - 1], array[j], &lessOrEqual)) {
          return false;
        }
        if (lessOrEqual) {
          break;
        }
        T tmp = array[j - 1];
        array[j - 1] = array[j];
        array[j] = tmp;
        if (--j == lo) {
          break;
        }
      }
    }
  }

  T* vec1 = array;
  T* vec2 = scratch;
  for (size_t run = INS_SORT_LIMIT; run < nelems; run *= 2) {
    for (size_t lo = 0; lo < nelems; lo += 2 * run) {
      size_t hi = lo + run;
      if (hi >= nelems) {
        detail::CopyNonEmptyArray(vec2 + lo, vec1 + lo, nelems - lo);
        break;
      }
      size_t run2 = (run <= nelems - hi) ? run : nelems - hi;
      if (!detail::MergeArrayRuns(vec2 + lo, vec1 + lo, run, run2, c)) {
        return false;
      }
    }
    T* swap = vec1;
    vec1 = vec2;
    vec2 = swap;
  }
  if (vec1 == scratch) {
    detail::CopyNonEmptyArray(array, scratch, nelems);
  }
  return true;
}

可以看到 SpiderMonkey 实现的sort方法是先把各元素三个一组进行插入排序, 之后再进行自底向上的归并排序. 其中merge的操作和前面 JSCore 里的差不多, 因此也需要额外 O(n) 的空间

2.4 TimSort

终于到了实现最复杂的 V8 引擎使用的 Timsort, 理论上 Timsort 可以分解成以下三步:

  1. 计算出minrun
  2. 遍历数组, 找到数组中存在天然有序的部分, 称之为run, 如果和所需排序的顺序相反, 就把它倒过来; 如果一个run的长度不足minrun, 则加上后续元素补足到minrun之后进行插入排序
  3. 合并现有的run
  4. 重复 2 和 3 直到整个数组都有序

显然, Timsort 是归并排序的一种变体, 因此它是稳定的. 它充分利用了大部分数据都是部分有序的假设. 对比快排, 它的好处则是稳定以及不会出现退化到 O(n^2) 的状况.

作为得到最多投入因此也是性能最好的 V8 引擎, 自然不会吝啬人力来实现这一复杂的排序算法. 查看 V8 源码, 你会发现他们在sort方法上花费了接近一千五百行代码. 不过好在有很多和本文的话题无关的代码, 接下来笔者将对其进行逐步分析.

首先是一点背景知识, V8 团队撰写这段代码的语言是 torque, 它是为 V8 的 JIT 引擎 turbofan 专门设计的一种类 TypeScript 语言.

sort方法定义在 1390 行的ArrayPrototypeSort, 其中调用了位于 1371 行的ArrayTimSort, 而它调用了位于 1257 行的ArrayTimSortImpl, 它是这个方法的主体代码所在, 其实现如下:

  transitioning macro
  ArrayTimSortImpl(context: Context, sortState: SortState, length: Smi) {
    if (length < 2) return;
    let remaining: Smi = length;

    // March over the array once, left to right, finding natural runs,
    // and extending short natural runs to minrun elements.
    let low: Smi = 0;
    const minRunLength: Smi = ComputeMinRunLength(remaining);
    while (remaining != 0) {
      let currentRunLength: Smi = CountAndMakeRun(low, low + remaining);

      // If the run is short, extend it to min(minRunLength, remaining).
      if (currentRunLength < minRunLength) {
        const forcedRunLength: Smi = SmiMin(minRunLength, remaining);
        BinaryInsertionSort(low, low + currentRunLength, low + forcedRunLength);
        currentRunLength = forcedRunLength;
      }

      // Push run onto pending-runs stack, and maybe merge.
      PushRun(sortState, low, currentRunLength);

      MergeCollapse(context, sortState);

      // Advance to find next run.
      low = low + currentRunLength;
      remaining = remaining - currentRunLength;
    }

    MergeForceCollapse(context, sortState);
    assert(GetPendingRunsSize(sortState) == 1);
    assert(GetPendingRunLength(sortState.pendingRuns, 0) == length);
  }

可以看到这里的实现和前面提到的理论差不多. 首先判断数组长度, 小于 2 则直接返回. 此后先计算minrun, 代码在 1177 行

  // Compute a good value for the minimum run length; natural runs shorter
  // than this are boosted artificially via binary insertion sort.
  //
  // If n < 64, return n (it's too small to bother with fancy stuff).
  // Else if n is an exact power of 2, return 32.
  // Else return an int k, 32 <= k <= 64, such that n/k is close to, but
  // strictly less than, an exact power of 2.
  //
  // See listsort.txt for more info.
  macro ComputeMinRunLength(nArg: Smi): Smi {
    let n: Smi = nArg;
    let r: Smi = 0;  // Becomes 1 if any 1 bits are shifted off.

    assert(n >= 0);
    while (n >= 64) {
      r = r | (n & 1);
      n = n >> 1;
    }

    const minRunLength: Smi = n + r;
    assert(nArg < 64 || (32 <= minRunLength && minRunLength <= 64));
    return minRunLength;
  }

虽然这里利用了位运算, 好在逻辑很清楚地写在了注释里. 注意一下当数组长度小于 64 时, 直接返回数组长度, 此时会对整个数组进行插入排序.

在计算出minrun之后, 开始遍历整个数组并计算各run. 计算过程可以参见前面的注释, 如果不够长时加上后面的元素进行二分查找插入排序, 篇幅起见这里就略过了. 之后把run推到栈上. 之后就开始对run进行合并, 合并逻辑定义在 1213 行.

  // Examines the stack of runs waiting to be merged, merging adjacent runs
  // until the stack invariants are re-established:
  //
  //   1. run_length(i - 3) > run_length(i - 2) + run_length(i - 1)
  //   2. run_length(i - 2) > run_length(i - 1)
  //
  // TODO(szuend): Remove unnecessary loads. This macro was refactored to
  //               improve readability, introducing unnecessary loads in the
  //               process. Determine if all these extra loads are ok.
  transitioning macro MergeCollapse(context: Context, sortState: SortState) {
    const pendingRuns: FixedArray = sortState.pendingRuns;

    // Reload the stack size because MergeAt might change it.
    while (GetPendingRunsSize(sortState) > 1) {
      let n: Smi = GetPendingRunsSize(sortState) - 2;

      if (!RunInvariantEstablished(pendingRuns, n + 1) ||
          !RunInvariantEstablished(pendingRuns, n)) {
        if (GetPendingRunLength(pendingRuns, n - 1) <
            GetPendingRunLength(pendingRuns, n + 1)) {
          --n;
        }

        MergeAt(n);
      } else if (
          GetPendingRunLength(pendingRuns, n) <=
          GetPendingRunLength(pendingRuns, n + 1)) {
        MergeAt(n);
      } else {
        break;
      }
    }
  }

虽然逻辑也非常复杂, 但好在注释也有很好的说明, 总而言之, 为了让每次合并的数组长度尽量接近, 对于连续的三个runX, Y, Z, Timsort 希望满足 |X| > |Y| + |Z| 和 |Y| > |Z|, RunInvariantEstablished正是用于检查第一条. 如果任意一条不满足, 则把 Y 和 X 与 Z 中长度较短的一个合并, 这里是借助于调用定义于 625 行的MergeAt, 它会合并第 n 和 n+1 个run.

  // Merges the two runs at stack indices i and i + 1.
  // Returns kFailure if we need to bailout, kSuccess otherwise.
  transitioning builtin
  MergeAt(implicit context: Context, sortState: SortState)(i: Smi): Smi {
    const stackSize: Smi = GetPendingRunsSize(sortState);

    // We are only allowed to either merge the two top-most runs, or leave
    // the top most run alone and merge the two next runs.
    assert(stackSize >= 2);
    assert(i >= 0);
    assert(i == stackSize - 2 || i == stackSize - 3);

    const workArray = sortState.workArray;

    const pendingRuns: FixedArray = sortState.pendingRuns;
    let baseA: Smi = GetPendingRunBase(pendingRuns, i);
    let lengthA: Smi = GetPendingRunLength(pendingRuns, i);
    const baseB: Smi = GetPendingRunBase(pendingRuns, i + 1);
    let lengthB: Smi = GetPendingRunLength(pendingRuns, i + 1);
    assert(lengthA > 0 && lengthB > 0);
    assert(baseA + lengthA == baseB);

    // Record the length of the combined runs; if i is the 3rd-last run now,
    // also slide over the last run (which isn't involved in this merge).
    // The current run i + 1 goes away in any case.
    SetPendingRunLength(pendingRuns, i, lengthA + lengthB);
    if (i == stackSize - 3) {
      const base: Smi = GetPendingRunBase(pendingRuns, i + 2);
      const length: Smi = GetPendingRunLength(pendingRuns, i + 2);
      SetPendingRunBase(pendingRuns, i + 1, base);
      SetPendingRunLength(pendingRuns, i + 1, length);
    }
    sortState.pendingRunsSize = stackSize - 1;

    // Where does b start in a? Elements in a before that can be ignored,
    // because they are already in place.
    const keyRight = UnsafeCast<JSAny>(workArray.objects[baseB]);
    const k: Smi = GallopRight(workArray, keyRight, baseA, lengthA, 0);
    assert(k >= 0);

    baseA = baseA + k;
    lengthA = lengthA - k;
    if (lengthA == 0) return kSuccess;
    assert(lengthA > 0);

    // Where does a end in b? Elements in b after that can be ignored,
    // because they are already in place.
    const keyLeft = UnsafeCast<JSAny>(workArray.objects[baseA + lengthA - 1]);
    lengthB = GallopLeft(workArray, keyLeft, baseB, lengthB, lengthB - 1);
    assert(lengthB >= 0);
    if (lengthB == 0) return kSuccess;

    // Merge what remains of the runs, using a temp array with
    // min(lengthA, lengthB) elements.
    if (lengthA <= lengthB) {
      MergeLow(baseA, lengthA, baseB, lengthB);
    } else {
      MergeHigh(baseA, lengthA, baseB, lengthB);
    }
    return kSuccess;
  }

合并算法也不同于普通的普通的归并算法. 在合并 X, Y 两个数组时, 首先通过GallopRight找到 Y 中第一个元素在 X 中的位置. GallopRight实现了指数搜索, 共可分为两步:

  1. 确定元素属于[2^j, 2^(j+1)]的区间
  2. 在这个区间内进行二分查找

GallopRightGallopLeft的主要区别在于当遇到相同的元素(在已排序的数组中也必然是连续的)时, 返回这段数据的右边还是左边.

之后再使用GallopLeft找到 X 中最后一个元素在 Y 中的位置, 这样只要对这两个位置中间的数组进行归并即可. MergeLow实现较为复杂, 此处就不给出实现了. 不过其中还有一个优化是, 如果多次归并时连续地从 X 或 Y 中取, 这往往意味着数据存在一定的模式, 因此接下来很有可能还会从同一个数组中取, 因此归并会转换成前述的Gallop模式.

读到这里你可能意识到为什么 Timsort 这么快了: 它投入了如此多种类的优化, 以致于简单描述一下它的实现都要花费这么多篇幅. 如果你还想知道更多, 可以参看 Tim 本人的文章

© 2016 - 2022Austaras Devas