Apps using Tez can determine the number of tasks that read the initial external data for a job (the number of mappers in MapReduce parlance). Here is a short description of how that works.
First, Tez tries to find out the resources available in the cluster for these tasks. For that, YARN provides a headroom value (other attributes may be used in the future). Let's say this value is T.
    // T: total resource (memory) currently available to this vertex
    int totalResource = getContext().getTotalAvailableResource().getMemory();
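Next, Tez divides T by the resource required per task (say M) to find how many tasks can run in parallel in a single wave: W = T/M. W is then multiplied by a wave factor (from configuration: tez.grouping.split-waves) to determine the number of tasks to use. Let's say this value is N.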
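    // M: resource (memory) requested by each task of the vertex
    int taskResource = getContext().getVertexTaskResource().getMemory();
    // wave factor from tez.grouping.split-waves
    float waves = conf.getFloat(
        TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES,
        TezSplitGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
    // N = (T * waves) / M, i.e. W * waves
    int numTasks = (int)((totalResource * waves)/taskResource);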
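To make the arithmetic concrete, here is a minimal standalone sketch of the N = (T × waves) / M estimate in plain Java. It does not call Tez. The class `WaveEstimate`, the method `estimateNumTasks` and the example numbers are hypothetical, and memory values are assumed to be in MB.

```java
// Hypothetical, standalone re-statement of the wave-based estimate.
// Not a Tez API: names and example values are illustrative only.
final class WaveEstimate {

    /**
     * Estimates the number of tasks N from the headroom T (totalResource),
     * the per-task resource M (taskResource) and the wave factor.
     * W = T / M tasks fit in one wave; N = W * waves, truncated to an int
     * exactly like the (int) cast in the Tez snippet.
     */
    static int estimateNumTasks(int totalResource, int taskResource, float waves) {
        if (taskResource <= 0) {
            throw new IllegalArgumentException("taskResource must be positive");
        }
        return (int) ((totalResource * waves) / taskResource);
    }

    public static void main(String[] args) {
        // Assumed cluster: 40 GB of headroom, 1 GB per task, 1.7 waves.
        int n = estimateNumTasks(40 * 1024, 1024, 1.7f);
        System.out.println("N = " + n);
    }
}
```

With 40 GB of headroom and 1 GB per task, W = 40 tasks fit in one wave, and a wave factor of 1.7 gives N = 68. A wave factor above 1 deliberately asks for more tasks than fit in one wave, and the integer cast truncates any fractional task.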
Tez then looks at the amount of data each task would get (the total input length divided by N). If this value is between tez.grouping.min-size and tez.grouping.max-size, then N is accepted as the number of tasks. If not, N is adjusted to bring the data per task in line with the min or max, depending on which threshold was crossed.
    if (lengthPerGroup > maxLengthPerGroup) {
      // splits too big to work. Need to override with max size.
      int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
      ...
    } else if (lengthPerGroup < minLengthPerGroup) {
      // splits too small to work. Need to override with min size.
      int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
      ...
    }
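Here is a standalone sketch of the min/max check. It assumes the data per task is the total input length divided by N and mirrors the `+ 1` from the snippet. `GroupSizeAdjust`, `adjustNumGroups` and the 50 MB / 1 GB bounds in the example are illustrative assumptions, not Tez's API or its defaults.

```java
// Hypothetical sketch of the min/max adjustment; sizes are in bytes.
// Names and bounds are illustrative, not taken from Tez.
final class GroupSizeAdjust {

    static int adjustNumGroups(long totalLength, int desiredNumGroups,
                               long minLengthPerGroup, long maxLengthPerGroup) {
        if (desiredNumGroups <= 0) {
            throw new IllegalArgumentException("desiredNumGroups must be positive");
        }
        long lengthPerGroup = totalLength / desiredNumGroups;
        if (lengthPerGroup > maxLengthPerGroup) {
            // Groups would be too big: raise the count so each group
            // holds less than maxLengthPerGroup bytes.
            return (int) (totalLength / maxLengthPerGroup) + 1;
        } else if (lengthPerGroup < minLengthPerGroup) {
            // Groups would be too small: lower the count to roughly
            // totalLength / minLengthPerGroup (the +1 mirrors the snippet).
            return (int) (totalLength / minLengthPerGroup) + 1;
        }
        // Data per group is within bounds: keep the wave-based estimate.
        return desiredNumGroups;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 100 GB of input, N = 68, assumed bounds of 50 MB and 1 GB per group.
        System.out.println(adjustNumGroups(100L * 1024 * mb, 68, 50 * mb, 1024 * mb));
    }
}
```

For 100 GB of input and N = 68, each group would hold about 1.5 GB. That exceeds the 1 GB bound, so the count is raised to 100 GB / 1 GB + 1 = 101. For 1 GB of input, groups of about 15 MB fall below 50 MB, so the count drops to 1024 / 50 + 1 = 21 (integer division).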
For experimental purposes, tez.grouping.split-count can be set in the configuration to specify the desired number of groups. If this setting is specified, the above logic is ignored and Tez tries to group the splits into the specified number of groups. This is best effort.
    int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0);
    if (configNumSplits > 0) {
      // always use config override if specified
      desiredNumSplits = configNumSplits;
      ...
    }
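The precedence rule can be sketched as follows. To keep the example self-contained, a plain `Map` stands in for the Tez/Hadoop configuration object. Only the property name `tez.grouping.split-count` comes from the text. `SplitCountOverride` and `desiredNumGroups` are hypothetical.

```java
// Hypothetical sketch of the split-count override; a Map stands in for
// the real configuration object. Only the property name is from the text.
import java.util.Map;

final class SplitCountOverride {

    static final String SPLIT_COUNT_KEY = "tez.grouping.split-count";

    /**
     * Returns the configured split count if it is positive; otherwise
     * returns the count produced by the wave and min/max logic.
     */
    static int desiredNumGroups(Map<String, String> conf, int computedNumGroups) {
        // An absent key behaves like the default of 0 (no override).
        int configNumSplits = Integer.parseInt(conf.getOrDefault(SPLIT_COUNT_KEY, "0"));
        if (configNumSplits > 0) {
            // Override wins: the wave and min/max logic is bypassed.
            return configNumSplits;
        }
        return computedNumGroups;
    }

    public static void main(String[] args) {
        System.out.println(desiredNumGroups(Map.of(), 101));                      // no override
        System.out.println(desiredNumGroups(Map.of(SPLIT_COUNT_KEY, "10"), 101)); // override
    }
}
```

Because 0 means "not set", only a positive value changes the outcome. The returned number is a target for grouping, and Tez treats it as best effort.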
Here is a detailed explanation of the grouping algorithm (TezSplitGrouper.getGroupedSplits).