Notes:

[0] Note that these threads consume almost no CPU resources, because they just receive data from the OS and then execute a callback that sticks the data on the results queue.

[1] We limit the data outstanding on the network to avoid using too much memory to hold the data we’ve fetched over the network but haven’t yet processed.
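As a rough illustration of the limit described in [1], the sketch below sends queued fetch requests only while the bytes outstanding stay within a budget. The function name and the deque of request sizes are hypothetical and are not Spark's code; letting a single oversized request through when nothing is in flight is an assumption of this sketch, made so that such a request cannot stall forever.

```python
from collections import deque


def send_allowed_requests(pending, bytes_in_flight, max_bytes_in_flight):
    """Send queued requests while the in-flight byte budget allows it.

    pending: deque of request sizes in bytes (hypothetical representation).
    Returns (sizes_sent, updated_bytes_in_flight).
    """
    sent = []
    while pending and (
        bytes_in_flight == 0  # assumption: never block a lone oversized request
        or bytes_in_flight + pending[0] <= max_bytes_in_flight
    ):
        size = pending.popleft()
        bytes_in_flight += size
        sent.append(size)
    return sent, bytes_in_flight
```

Once a fetched result has been processed, the caller would subtract its size from the in-flight count and call the function again to release more requests.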

[2] Each request may include multiple shuffle blocks, where a "block" is the data output for this reduce task by a particular map task.  All of the reduce tasks for a shuffle together read a total of (# map tasks) * (# reduce tasks) shuffle blocks; each reduce task reads (# map tasks) blocks.  We do some hacks to try to size these requests in a "good" way: we limit each request to about maxBytesInFlight / 5, so that we can fetch from roughly 5 machines concurrently without exceeding maxBytesInFlight.  The 5 is purely a magic number that was probably guessed by someone long ago, and it seems to work OK.
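The sizing heuristic in [2] can be sketched as follows, assuming all of the blocks come from a single remote executor. The function name, the (block id, size) tuples, and the `parallelism` parameter are hypothetical names chosen for illustration; only the maxBytesInFlight / 5 target comes from the note above. A request is closed as soon as it reaches the target, so requests land at "about" the target size rather than exactly on it.

```python
def group_blocks_into_requests(block_sizes, max_bytes_in_flight, parallelism=5):
    """Group (block_id, size_in_bytes) pairs into requests of roughly
    max_bytes_in_flight / parallelism bytes each."""
    target = max(max_bytes_in_flight // parallelism, 1)
    requests = []
    current, current_size = [], 0
    for block_id, size in block_sizes:
        current.append(block_id)
        current_size += size
        if current_size >= target:
            # This request has reached the target size; start a new one.
            requests.append(current)
            current, current_size = [], 0
    if current:
        # Whatever is left over becomes a final, smaller request.
        requests.append(current)
    return requests
```

For example, with a budget of 500 bytes the target is 100 bytes per request, so blocks of 40, 70, 10, 100 and 5 bytes would be grouped into three requests of 110, 110 and 5 bytes.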

[3] The default configuration uses NettyBlockTransferService as the ShuffleClient implementation (note that this extends BlockTransferService, which extends ShuffleClient).

[4] If you’re curious how the shuffle client fetches data, the default Spark configuration results in exactly one TCP connection from an executor to each other executor.  If executor A is getting shuffle data from executor B, we start by sending an OpenBlocks message from A to B.  The OpenBlocks message includes the list of blocks that A wants to fetch, and causes the remote executor, B, to start to pull the corresponding data into memory from disk (we typically memory map the files, so this may not actually result in the data being read yet), and also to store some state associated with this “stream” of data.  The remote executor, B, responds with a stream ID that helps B identify the stream in subsequent requests.  Next, A requests blocks one at a time from B using a ChunkFetchRequest message (this happens in OneForOneBlockFetcher, which calls into TransportClient; currently, we have a one-to-one mapping from a chunk to a particular block).  It’s possible that there are many sets of shuffle data being fetched concurrently between A and B (e.g., because many tasks are run concurrently).  These requests are handled serially, so one block is sent at a time from B, and the blocks are sent in the order that the requests were issued on A.
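The message exchange in [4] can be mimicked with a toy, in-process model. This is only a sketch of the sequence (open a stream, receive a stream ID, then fetch one chunk per block in order); the class and method names are hypothetical stand-ins and do not correspond to Spark's actual classes or wire format.

```python
import itertools


class RemoteExecutor:
    """Toy stand-in for executor B; not Spark's server code."""

    def __init__(self, blocks):
        self._blocks = blocks              # block id -> bytes
        self._streams = {}                 # stream id -> ordered block ids
        self._next_stream_id = itertools.count()

    def open_blocks(self, block_ids):
        """Register stream state for the requested blocks; return a stream ID."""
        stream_id = next(self._next_stream_id)
        self._streams[stream_id] = list(block_ids)
        return stream_id

    def fetch_chunk(self, stream_id, chunk_index):
        """Return one chunk; here each chunk maps to exactly one block."""
        block_id = self._streams[stream_id][chunk_index]
        return self._blocks[block_id]


def fetch_blocks(remote, block_ids):
    """Executor A's side: open a stream, then request chunks one at a time."""
    stream_id = remote.open_blocks(block_ids)
    return [remote.fetch_chunk(stream_id, i) for i in range(len(block_ids))]
```

Because the chunks are requested by index within the stream, the blocks come back in the order in which A listed them.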

[5] In BlockStoreShuffleFetcher, which handles failures; then in HashShuffleReader, which helps aggregate some of the data; etc.

[6] This happens in BlockManager.putIterator, if the RDD is going to be cached; in the function passed in to ResultTask, if this is the last stage in a job; or via the writer.write() call in ShuffleMapTask, if this is a stage that generates intermediate shuffle data.

[7] We time how long we spend blocking on data from the network; this is what’s shown as “fetch wait time” in Spark’s UI.
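To make [0] and [7] concrete, here is a minimal sketch in which a background thread plays the role of the network callback that puts fetched data on a results queue, while the task thread times how long it blocks waiting for that data. The function names, the metrics dictionary, and the simulated 50 ms delay are all hypothetical; Spark's own bookkeeping may differ.

```python
import queue
import threading
import time


def network_callback(results, block, delay_s):
    """Simulate a network thread: wait for data, then enqueue it."""
    time.sleep(delay_s)
    results.put(block)


def next_block(results, metrics):
    """Block until a result is available, recording the time spent waiting."""
    start = time.monotonic()
    block = results.get()
    metrics["fetch_wait_time_s"] += time.monotonic() - start
    return block


results = queue.Queue()
metrics = {"fetch_wait_time_s": 0.0}
fetcher = threading.Thread(target=network_callback, args=(results, b"data", 0.05))
fetcher.start()
block = next_block(results, metrics)
fetcher.join()
```

Only the time spent inside the blocking `get` call is counted, so time the task spends processing a block after it arrives does not add to the wait total.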