...

The key data structure used in fetching shuffle data is the “results” queue in ShuffleBlockFetcherIterator, which buffers data that we have in serialized (and maybe compressed) form, but haven’t yet deserialized / processed.  The results queue is filled by many threads fetching data over the network (the number of concurrent threads fetching data is equal to the number of remote executors we’re currently fetching data from) [0], and is consumed by a single thread that deserializes the data and computes some function over it (e.g., if you’re doing rdd.count(), the thread deserializes the data and counts the number of items).  As we fetch data over the network, we track bytesInFlight, which is data that has been requested (and possibly received) from a remote executor, but that hasn’t yet been deserialized / processed by the consumer thread.  So, this includes all of the data in the “results” queue, and possibly more data that’s currently outstanding over the network.  We always issue as many requests as we can, with the constraint that bytesInFlight remains less than a specified maximum [1].
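For concreteness, here's a rough sketch (in Scala, but not the actual ShuffleBlockFetcherIterator code — names like FetchResult and the 48MB cap are just illustrative stand-ins) of the pieces described above: a blocking results queue filled by fetch callbacks and drained by one consumer thread, plus the bytesInFlight counter used to cap outstanding requests.

```scala
import java.util.concurrent.LinkedBlockingQueue

object ResultsQueueSketch {
  // A fetched block, still in serialized (and maybe compressed) form.
  case class FetchResult(blockId: String, size: Long, data: Array[Byte])

  // Filled by many fetch callbacks; drained by the single consumer thread
  // that deserializes each block and computes some function over it.
  val results = new LinkedBlockingQueue[FetchResult]()

  // Data requested from remote executors but not yet deserialized/processed:
  // everything sitting in `results` plus whatever is still on the wire.
  @volatile var bytesInFlight = 0L

  // Placeholder cap; the real value comes from configuration.
  val maxBytesInFlight = 48L * 1024 * 1024

  // Issue as many requests as we can without exceeding the cap.
  def canIssueRequest(requestSize: Long): Boolean =
    bytesInFlight + requestSize <= maxBytesInFlight
}
```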

In a little more detail, here’s exactly what happens when a task begins reading shuffled data:

(1) Issue requests [2] to fetch up to maxBytesInFlight bytes of data [1] over the network (this happens here).

These requests are all executed asynchronously using a ShuffleClient [3] via the shuffleClient.fetchBlocks call [4].  We pass in a callback that, once a block has been successfully fetched, sticks it on the “results” queue.
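As a rough illustration of that callback pattern (a made-up, self-contained sketch, not the real fetchBlocks API — fetchBlockAsync here is a stand-in for the Netty-based fetch), the success path just enqueues the still-serialized bytes:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ExecutionContext, Future}

object AsyncFetchSketch {
  case class FetchResult(blockId: String, data: Array[Byte])

  // Filled by the fetch callbacks, drained by the single consumer thread.
  val results = new LinkedBlockingQueue[FetchResult]()

  // Stand-in for one block's network fetch; in Spark this is an asynchronous
  // request issued through the ShuffleClient with success/failure callbacks.
  def fetchBlockAsync(blockId: String)(implicit ec: ExecutionContext): Future[Array[Byte]] =
    Future(Array.fill(1024)(0.toByte))

  // Issue all fetches without waiting; each success callback simply sticks
  // the (still serialized) block onto the results queue.
  def sendRequest(blockIds: Seq[String])(implicit ec: ExecutionContext): Unit =
    blockIds.foreach { id =>
      fetchBlockAsync(id).foreach(bytes => results.put(FetchResult(id, bytes)))
    }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    sendRequest(Seq("shuffle_0_0_0", "shuffle_0_1_0"))
    println(results.take().blockId)  // blocks until the first fetch completes
  }
}
```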

(2) Begin processing the local data.  One by one, we request the local data from the local block manager (which memory maps the file) and then stick the result onto the results queue.  Because we memory map the files, which is speedy, the local data typically all ends up on the results queue in front of the remote data.
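The memory-mapping itself looks roughly like the following (a generic java.nio sketch with made-up path/offset/length arguments, rather than the actual block manager code); mapping a file segment returns almost immediately because no bytes are copied:

```scala
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

object LocalBlockSketch {
  // Map one block's segment of a local shuffle file without copying its bytes.
  def mapLocalBlock(path: String, offset: Long, length: Long): MappedByteBuffer = {
    val channel = new RandomAccessFile(path, "r").getChannel
    try channel.map(FileChannel.MapMode.READ_ONLY, offset, length)
    finally channel.close()  // the mapping remains valid after close()
  }
}
```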

(3) Once the async network requests have been issued (note — issued, but not finished!) and we've “read” (memory-mapped) the local data (i.e., (1) and (2) have happened), ShuffleBlockFetcherIterator returns an iterator that gets wrapped too many times to count [5] and eventually gets unrolled [6].  Each time next() is called on the iterator, it pulls an item off the results queue; this may return right away, or, if the queue is empty, it will block waiting on new data from the network [6].  Before returning from next(), we update our accounting for the bytes in flight: the chunk of data we’re returning is no longer considered in-flight, because it’s about to be processed, so we subtract its size from bytesInFlight and then issue additional requests for data, as long as they won’t push bytesInFlight over maxBytesInFlight.
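Putting that together, a simplified sketch of next()'s accounting (again with hypothetical names, and a pendingRequests queue standing in for the real bookkeeping) might look like:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

class NextSketch(maxBytesInFlight: Long) {
  case class FetchResult(blockId: String, size: Long, data: Array[Byte])
  case class FetchRequest(blockIds: Seq[String], size: Long)

  // Filled by the fetch callbacks; in this sketch every block counts as remote.
  private val results = new LinkedBlockingQueue[FetchResult]()
  // Requests we haven't issued yet because of the maxBytesInFlight cap.
  private val pendingRequests = mutable.Queue.empty[FetchRequest]
  private var bytesInFlight = 0L

  private def sendRequest(req: FetchRequest): Unit = {
    bytesInFlight += req.size
    // ... issue the async fetch; its callbacks will fill `results` ...
  }

  def next(): FetchResult = {
    val result = results.take()    // blocks if nothing has arrived yet
    bytesInFlight -= result.size   // this chunk is about to be processed
    // Top up outstanding requests while staying under the cap.
    while (pendingRequests.nonEmpty &&
           bytesInFlight + pendingRequests.head.size <= maxBytesInFlight) {
      sendRequest(pendingRequests.dequeue())
    }
    result
  }
}
```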

————————————————

...