Motivation

In a multi-framework environment where frameworks have different SLA requirements (production vs. development, service vs. batch), Mesos should be able to protect the throughput of high-SLA frameworks by throttling the QPS of other frameworks.

Mesos Framework API Rate Limiting allows operators to specify the rate (i.e., queries per second) at which the Mesos Master processes messages from a particular framework (which can be a group of frameworks; see the details about principals below) so that individual frameworks cannot overwhelm the Master.

Requirements

  • Should provide a way for operators to monitor the framework QPS.
    • This gives operators an idea of what values to use when configuring the limiter, how much traffic a framework puts on the Master, and how frameworks react to the throttling.
  • (Stage 2, not in current scope) Should support online tuning of rate limits (add/delete/update).
  • (Stage 3, not in current scope) The rate limit configuration should survive Master failover.

Usage

The usage notes are published here.

Design for the Current Scope

Protobuf Format

The user-specified configuration in JSON format is converted into a protobuf object which is used to initialize the RateLimiters. The ProtoBuf format is published here.
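
For illustration, the conversion could be done with stout's JSON and protobuf helpers as in the sketch below; the helper function is an assumption rather than the actual Master code, and the authoritative field layout is the published ProtoBuf format.

// Hedged sketch: convert operator-supplied JSON into the RateLimits protobuf.
// 'parseRateLimits' is illustrative; in the Master the conversion happens
// when the --rate_limits flag is loaded.
#include <string>

#include <mesos/mesos.pb.h>

#include <stout/error.hpp>
#include <stout/json.hpp>
#include <stout/protobuf.hpp>
#include <stout/try.hpp>

Try<mesos::RateLimits> parseRateLimits(const std::string& config)
{
  // Parse the raw JSON text first.
  Try<JSON::Object> json = JSON::parse<JSON::Object>(config);
  if (json.isError()) {
    return Error("Invalid rate limits JSON: " + json.error());
  }

  // Convert the JSON object into the RateLimits protobuf message.
  return protobuf::parse<mesos::RateLimits>(json.get());
}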

Rate Limiting in Master

Mesos already has a RateLimiter implementation that queues up incoming requests via a Future-based interface and services them at a configured rate. The majority of the work for MESOS-1306 is to have the limiters correctly configured for each framework and used in message handling, plus a few improvements to RateLimiter.
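
For reference, a minimal sketch of how the Future-based interface is used; this assumes libprocess's process::RateLimiter with an acquire() method, and the surrounding helper is illustrative only.

// Each acquire() returns a Future that is satisfied once a permit becomes
// available, so the deferred work runs at (at most) the configured rate.
#include <functional>

#include <process/future.hpp>
#include <process/limiter.hpp>

#include <stout/duration.hpp>
#include <stout/nothing.hpp>

// A RateLimiter granting 10 permits per second.
process::RateLimiter limiter(10, Seconds(1));

void runThrottled(const std::function<void()>& work)
{
  // acquire() queues this request; the Future is satisfied when this
  // request's permit becomes available.
  limiter.acquire()
    .onReady([work](const Nothing&) { work(); });
}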

Setting up rate limiters and counters

Two-level mapping: setting up mappings from principal to RateLimiter and Counters

  • Rate Limiters are created statically at startup.
    • Master loads the protobuf RateLimits object Master::rates via flags.
    • RateLimiters are created according to the flags when Master initializes and stored in the <principal, RateLimiter> limits map. They are kept around until Master terminates.
  • Counters are added when frameworks are added and removed when frameworks are removed.
    • A framework can be added as the result of either registration or reregistration, and removed as the result of scheduler-initiated unregistration or failover timeout.
    • Counters are added to the <principal, Metrics::Frameworks> frameworks map when the first framework with a given principal registers with the Master, and removed from it when the last framework with that principal is removed.

Two-level mapping: setting up mappings from framework UPID to principal

  • When a framework registers
    • A <UPID, principal> pair is added to frameworks.principals.
    • This mapping is used by the counters and, if one is configured for this principal, by the RateLimiter.
  • When a framework fails over, the <UPID, principal> entry is updated from the old UPID to the new UPID (see the sketch after this list).
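
Taken together, the two-level mappings above amount to a few hash maps on the Master. The following is a hedged sketch of plausible declarations; the names follow this document, while the exact types inside the Master are assumptions.

#include <string>

#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>

#include <stout/hashmap.hpp>
#include <stout/option.hpp>

// principal -> RateLimiter: created at startup from --rate_limits and kept
// until the Master terminates. A None() value records a principal that is
// listed in the config but has no 'qps', i.e. explicitly unthrottled.
hashmap<std::string, Option<process::Owned<process::RateLimiter>>> limits;

// framework UPID -> principal: added at (re)registration, updated when the
// framework fails over, removed at unregistration or failover timeout.
// Optional because a framework may register without a principal.
hashmap<process::UPID, Option<std::string>> principals;

// A third map, <principal, Metrics::Frameworks>, holds the per-principal
// counters; it is sketched under "Export framework QPS" below.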

Throttling incoming messages

  • In Master::visit() the Master has the message's 'from' UPID; it looks up the limiter and the counters by querying the <UPID, principal>, <principal, RateLimiter> and <principal, Metrics::Frameworks> maps.

  • The framework is throttled by the default RateLimiter if:

    1. the default RateLimiter is configured (and)

    2. the framework doesn't have a principal or its principal is not specified in 'flags.rate_limits'.

  • The framework is not throttled if:
    1. the default RateLimiter is not configured (so frameworks falling under case 2 above are simply not throttled), (or)
    2. the framework's principal exists in RateLimits but its 'qps' is not set.
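
A hedged sketch of this decision, reusing the maps from the earlier sketch; the helper name and signature are illustrative, not the actual Master::visit() code.

#include <string>

#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>

#include <stout/hashmap.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>

// Returns the limiter to throttle this sender with, or None() meaning
// "do not throttle".
Option<process::Owned<process::RateLimiter>> selectLimiter(
    const process::UPID& from,
    const hashmap<process::UPID, Option<std::string>>& principals,
    const hashmap<std::string,
                  Option<process::Owned<process::RateLimiter>>>& limits,
    const Option<process::Owned<process::RateLimiter>>& defaultLimiter)
{
  if (!principals.contains(from)) {
    // Not a registered scheduler; framework throttling doesn't apply.
    return None();
  }

  const Option<std::string>& principal = principals.at(from);

  if (principal.isSome() && limits.contains(principal.get())) {
    // The principal is listed in 'flags.rate_limits': use its own limiter.
    // If the entry has no 'qps' this is None(), i.e. explicitly unthrottled;
    // it does not fall back to the default limiter.
    return limits.at(principal.get());
  }

  // No principal, or a principal not listed in 'flags.rate_limits': use the
  // default limiter if 'aggregate_default_qps' is configured.
  return defaultLimiter;
}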

Export framework QPS

(Stage 0)

  • There is currently no built-in way to export "rates".
    • MESOS-1338 adds counters for all Message types on the Master, not only framework messages but also slave messages.
  • We should add Message counters on a per-framework basis and the observability client can derive the framework API rate from the counters.
    • Master::Metrics maintains a hashmap<string, Metrics::Frameworks> frameworks map.
    • When a framework is added to Master, Master exports "frameworks/<principal>/messages_received" and "frameworks/<principal>/messages_processed" counters to MetricsProcess.
    • The messages_received counter is incremented (in Master::visit()) before the delay due to throttling, if the message is to be throttled
      • and, of course, before the processing of this message.
    • The messages_processed counter is incremented (in Master::_visit()) after the processing of the message
      • and, of course, after the delay due to throttling.
      • Also, due to Master's asynchronous nature, this doesn't necessarily mean the work requested by this message has finished.

    • These counters only count messages from active scheduler instances while they are registered.
      • Messages prior to the completion of (re)registration (AuthenticateMessage and (Re)RegisterFrameworkMessage) and messages from an inactive scheduler instance (after the framework has failed over) are not counted.
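
A hedged sketch of the per-principal counters and their registration with the MetricsProcess; the struct below stands in for what this document calls Metrics::Frameworks and is not necessarily identical to it.

#include <string>

#include <process/metrics/counter.hpp>
#include <process/metrics/metrics.hpp>

struct FrameworkMetrics
{
  explicit FrameworkMetrics(const std::string& principal)
    : messages_received("frameworks/" + principal + "/messages_received"),
      messages_processed("frameworks/" + principal + "/messages_processed")
  {
    // Registering the counters makes them visible via the metrics endpoint.
    process::metrics::add(messages_received);
    process::metrics::add(messages_processed);
  }

  ~FrameworkMetrics()
  {
    process::metrics::remove(messages_received);
    process::metrics::remove(messages_processed);
  }

  process::metrics::Counter messages_received;
  process::metrics::Counter messages_processed;
};

Master::visit() would then do ++metrics.messages_received for the sender's principal, and Master::_visit() would do ++metrics.messages_processed.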

Implementation Notes

Classes

This diagram shows only the changes to these classes.

Framework Rate Limiter

Expose the authenticated principal through Authenticator::authenticate() result

  • The only way to reliably detect the framework's principal is to have Authenticator expose that information.
  • See ticket: MESOS-1383.

Augment FrameworkInfo

The changes to be made to messages.proto.

message FrameworkInfo {
+  optional string principal;
+  optional string label; // Not in the initial implementation.
}
  • With MESOS-1373, a (UPID, principal) pair is added to Master::authenticated after authentication.
  • The framework can then set FrameworkInfo::principal and FrameworkInfo::label in registration.
  • When framework registers:
    • If Master requires authentication:
      • If FrameworkInfo::principal is set and doesn't match Master::authenticated, Master denies its registration request.
      • If FrameworkInfo::principal is not set, we can use the principal in Master::authenticated to initialize the limiter and the counter.
        • A <UPID, counter> pair is added to Master::Metrics::framework_messages.
        • If the principal matches an entry in RateLimit config object rates, a limiter is added to (UPID, RateLimiter) limiters.
      • If FrameworkInfo::principal is set and matches Master::authenticated, we use that value to initialize the limiter and the counter.
    • If Master doesn't require authentication:
      • If FrameworkInfo::principal is set, Master takes FrameworkInfo::principal without verification and adds counter and limiter as such.
      • If FrameworkInfo::principal is not set, no counter or limiter is added.
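
A hedged sketch of the principal checks above; the helper name and signature are illustrative, not the actual registration code.

#include <string>

#include <stout/error.hpp>
#include <stout/option.hpp>
#include <stout/try.hpp>

// Returns the principal to key the limiter and counters on, or an Error if
// the registration should be denied.
Try<Option<std::string>> resolvePrincipal(
    const Option<std::string>& frameworkInfoPrincipal, // FrameworkInfo::principal
    const Option<std::string>& authenticatedPrincipal, // from Master::authenticated
    bool authenticationRequired)
{
  if (authenticationRequired) {
    // If set, FrameworkInfo::principal must match the authenticated one;
    // otherwise the registration request is denied.
    if (frameworkInfoPrincipal.isSome() &&
        frameworkInfoPrincipal != authenticatedPrincipal) {
      return Error("FrameworkInfo.principal does not match the principal "
                   "the framework authenticated with");
    }

    // Fall back to the authenticated principal when FrameworkInfo::principal
    // is not set.
    return frameworkInfoPrincipal.isSome()
      ? frameworkInfoPrincipal
      : authenticatedPrincipal;
  }

  // No authentication: take FrameworkInfo::principal at face value; if it is
  // unset, no counter or limiter is added for this framework.
  return frameworkInfoPrincipal;
}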

Throttling MessageEvents

With the current libprocess, MessageEvent is not copyable. It and its member Message are both created on the heap and destroyed after ProcessManager::resume() has serviced the event.

To use a limiter to throttle the event, MessageEvent needs to be made copyable, and its copy constructor needs to copy the Message member so that the original event can be deleted safely and the new copy of the event can be bound to a Deferred.
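
A heavily simplified sketch of the idea (not the actual libprocess definitions):

#include <string>

struct Message
{
  std::string name;
  std::string body;
  // The 'from'/'to' UPIDs are omitted in this sketch.
};

struct MessageEvent
{
  explicit MessageEvent(Message* _message) : message(_message) {}

  // The copy constructor deep-copies the heap-allocated Message so the
  // original event can be deleted safely while the copy, bound to a
  // Deferred, lives on.
  MessageEvent(const MessageEvent& that)
    : message(that.message == nullptr ? nullptr : new Message(*that.message)) {}

  ~MessageEvent() { delete message; }

  // Assignment remains disallowed; only copy construction is needed here.
  MessageEvent& operator=(const MessageEvent&) = delete;

  const Message* const message;
};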

Issues

Dropped Requests

  • If RateLimiters drop messages, clients should rely on Master responses to establish the causal relationship between actions.
  • This can cause problems if framework developers don't realize this.
  • In the initial implementation messages are not dropped: the queue in a RateLimiter can grow unbounded until the Master process is OOMed. This is acceptable behavior for now; the Master will simply fail over to another instance.

Out of Order Requests

  • If MessageEvents are throttled but ExitedEvents are not, a message sent by a scheduler prior to the exit of its process may be handled by the Master after the corresponding ExitedEvent, i.e., out of order.

Exclude a framework from throttling

Frameworks are not throttled if:

  • They are not defined explicitly in the config file, (and)
  • there is no 'aggregate_default_qps' specified.

How to Identify a Framework?

Candidates:

  • Existing FrameworkInfo::name: Easiest but maybe unsuitable because users may expect it to be free-form.
  • Add a FrameworkInfo::source (a la ExecutorInfo::source): the client needs to specify it.
  • (Chosen) FrameworkInfo::principal and FrameworkInfo::label combination. FrameworkInfo::principal is verified against the principal the framework authenticates itself with, if Master requires framework authentication. We can fine-tune the configuration: for the same principal, different labels can have different rate limits.

Implementation Stages

  • Stage 0: Export framework API call counts. MESOS-1305
  • Stage 1: Initial framework rate limiting implementation. Set rates via flags, no storage of rates, no online tuning. MESOS-1350

Our current scope is limited to stage 0+1.

  • Stage 2: Set rates via HTTP endpoints and store them in the replicated log. Support online tuning. After a Master failover the new Master reads the config from the replicated log but still uses the flag values.
  • Stage 3: After Master failover, read configuration from log and use that value.

Testing

Code Tests

LoadGeneratorScheduler

  • LoadGeneratorScheduler sends many empty ReconcileTasksMessages to the Master at a configured rate (which needs to be higher than the rate the Master throttles it at); left running long enough, it will eventually OOM the Master and cause a failover. (A sketch of the load-generating loop follows this list.)
  • If the scheduler keeps running, after the Master failover it keeps bombarding the new Master; the failovers continue as Masters take leadership in a round-robin manner.
  • To run this test continuously without disruption, we can
    • Configure the scheduler to run for a duration which doesn't OOM the Master with the given (load generator rate, Master throttling rate) combination.
    • At the end of the test, send a message that expects a response from the Master; a response indicates that previous messages have been processed.
    • Monitor the message received/processed rate on Master and ensure it's in the expected range.
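
A hedged sketch of the load-generating loop; the function name and parameters are illustrative, not the actual test code.

#include <vector>

#include <mesos/scheduler.hpp>

#include <stout/duration.hpp>
#include <stout/os.hpp>
#include <stout/stopwatch.hpp>

void generateLoad(
    mesos::SchedulerDriver* driver,
    double qps,               // Should exceed the Master's throttling rate.
    const Duration& duration) // Keep short enough not to OOM the Master.
{
  const Duration interval =
    Microseconds(static_cast<int64_t>(1000000.0 / qps));

  Stopwatch watch;
  watch.start();

  while (watch.elapsed() < duration) {
    // An empty status list results in a ReconcileTasksMessage being sent.
    driver->reconcileTasks(std::vector<mesos::TaskStatus>());
    os::sleep(interval);
  }
}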

Appendix: Design and Implementation Notes for Out of Scope Features

Online Tuning of Rate Limits

Configuration via Authenticated HTTP endpoint

(Stage 2-3)

POST http://<host:port>/ratelimits with the body data in the same format as rates.json.

  • When both mechanisms are used, the HTTP endpoint mechanism takes precedence over the flags.
    • This supports the following use case:
      • All Masters have the initial rates set via flags
      • Operators set new rates
      • After the failover the new Master's flags have the old configs but will read the new configs from the replicated log.
    • When the HTTP endpoint is implemented the flag is no longer "required" but is perhaps worth keeping around.
  • Depends on MESOS-1131.

Configuration Persistence and Recovery for Master Failover

Store the RateLimits protobuf configuration in the replicated log.

  • (Stage 2) We can pass the State instance created in master/main.cpp into Master and use it to store the object whenever it is updated.
    • The object is wrapped as Option<Variable<RateLimits> > Master::rates.
    • There is a TODO for implementing "a generic abstraction for applying batches of operations against State Variables" which could be leveraged, but I don't think it's strictly necessary.
    • As a result, both state and registrar (which also takes the state instance) are passed into Master.
  • (Stage 3) When Master fails over to a new instance it recovers the "ratelimits" variable from the state similar to (and alongside) Registrar's recovery process.
  • It then updates the data structures in Master to set up the rate limiters. See "Setting up rate limiters".
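
A hedged sketch of the store path, assuming the same protobuf State/Variable interface the Registrar uses; the helper name and the exact types are assumptions.

#include <string>

#include <mesos/mesos.pb.h>

#include <process/future.hpp>

#include <stout/option.hpp>

#include "state/protobuf.hpp"

using mesos::internal::state::protobuf::State;
using mesos::internal::state::protobuf::Variable;

// Fetch (or create) the "ratelimits" variable, mutate it with the new
// config, and store it back in the replicated log.
process::Future<Option<Variable<mesos::RateLimits>>> persist(
    State* state,
    const mesos::RateLimits& rates)
{
  return state->fetch<mesos::RateLimits>("ratelimits")
    .then([=](const Variable<mesos::RateLimits>& variable) {
      return state->store(variable.mutate(rates));
    });
}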

Add "label" to Configuration

  • As a followup to the current scope: a label can optionally be used to fine-tune the limiting criteria so that only frameworks with such a label (in addition to matching the principal; both are specified in FrameworkInfo) are throttled by this rate.

Online Tuning of Rate Limits

(Stage 2-3)

  • This is done through the HTTP /ratelimits JSON endpoint, the same way initial setup is done.
    • That is, you cannot tune/add the rate limit for an individual framework; rather, you update the entire configuration.
  • Master first stores the new config object to the replicated log.
  • If storage is successful, Master then goes through the limiters map and calls limiter.set(rate) on each of them.
  • In this process, new limiters are added and old limiters are deactivated (drained) and then deleted. (Such functionality needs to be added to RateLimiter.)
    • Pending events still exist in the limiters!

Limiter Improvements

(Stage 2-3, changes unnecessary for Stage 1)

  • Add queue capacity: when a framework's requests exceed the limiter capacity, the limiter returns a failure so that the Master can discard the incoming request.

    • Need to be cautious about dropping messages. See "Out of Order or Dropped Requests".

  • Mutability: For online rate tuning. Add method set(int permits, const Duration& duration).

    • Not making capacity tunable at this time.

  • Drain limiter: if we want to stop limiting a framework, we need to drain the existing messages in the limiter.

    • If the limiter is terminated without draining, all the pending requests are discarded.
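
A sketch of what the proposed RateLimiter interface could look like with these improvements; this is a proposal, not the current libprocess API.

#include <cstddef>

#include <process/future.hpp>

#include <stout/duration.hpp>
#include <stout/nothing.hpp>

class RateLimiter
{
public:
  // Existing: 'permits' per 'duration'.
  RateLimiter(int permits, const Duration& duration);

  // Proposed: bounded queue; acquire() fails once 'capacity' is exceeded so
  // the Master can discard the incoming request.
  RateLimiter(int permits, const Duration& duration, size_t capacity);

  // Existing: returns a Future satisfied when a permit becomes available.
  process::Future<Nothing> acquire();

  // Proposed: online tuning of the rate (capacity is not tunable for now).
  void set(int permits, const Duration& duration);

  // Proposed: stop admitting new requests and satisfy the pending ones, so
  // the limiter can be deleted without discarding requests.
  process::Future<Nothing> drain();
};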

