Presto rate limiting

June 24, 2022
Summary: As a distributed query engine for big data that is shared across multiple stakeholders within Careem, Presto required a rate limiting solution to ensure reliability and fair allocation of resources. The combination of two rate limiting filters restricts misuse of the service by a single user, helps us optimize our capacity planning, and ensures greater governance and proper federation of resources.
Team: Data & AI
Author(s): Salman Patel, Omkar Thombre, Mufaddal Jamal

Introduction

At Careem we encourage everyone to “talk data”. Our tools are built to make data accessible to most colleagues across the company. Our founders have written SQL queries themselves to get insights from data. With such broad usage of data systems and very open access, ensuring fair usage of resources has been a challenging task.

Presto, the distributed query engine for big data that uses the SQL query language, became the tool of choice for ad hoc data analysis at Careem. Paired with dashboarding tools, Presto is a valuable platform to quickly analyze data. Many data analysts and data scientists use Jupyter Notebooks to write automated reports and schedule queries. This makes Presto one of our most critical services, serving as the backbone of data access for the majority of our colleagues.

Why Rate Limiting?

The tussle between bots and humans is not new. Apart from hoarding GPUs and PS5s, unrestricted bots (scripts) can also starve critical resources, in our case Presto. Some teams make heavy use of scripts and connect directly to our Presto clusters. At the start of the week, a flood of scripted queries would essentially choke our clusters. The scripts would keep retrying, but it was the ad hoc use cases and real users whose experience ended up suffering a great deal. The power of scripts needed to be… limited.

The Presto service became a casualty of the power of scripts, and we urgently needed to implement a solution at scale for all colleagues. Presto is used by multiple stakeholders within Careem to get various data points from a combination of data sources, and it is considered a critical resource for everyone at Careem. As Presto is shared across multiple teams and users, the objective was to implement the right allocation of resources and proper governance. We have had instances in the past, as depicted in the graph above, where a single user unintentionally choked a complete cluster by submitting many requests in bursts. Eventually all those repeated requests ended up in a queue, and all users, including the user in question, faced considerable delays and did not get results in time. In order to ensure Presto service availability and reliability for all users, we introduced rate limiting on top of the Presto service. The rate limiting feature intercepts each request and makes a binary decision, allowing or rejecting it based on the defined limits.

Overview of Presto Gateway 

Presto is used for query execution by many Careem colleagues in multiple ways. The introduction of Presto Gateway helps us cater to higher load by running multiple Presto clusters behind a single gateway. This ensures that, while we have multiple clusters, users have a single endpoint to access Presto and do not need to worry about which cluster a query lands on.

We have multiple Presto Gateway instances, and requests can land on any of them in a round robin fashion.

  • One of the topmost functions of a gateway is to work as a proxy, receiving a query from the user and passing it on to the appropriate Presto cluster.
  • Because Presto Gateway sits in the middle, it opens the door to doing many things with a request before we pass it to any Presto cluster.
  • We can make decisions regarding the selection of Presto clusters.
  • We can also employ rate limiting features on the gateway and block requests at the gateway itself rather than allowing them to engage the Presto queues, as sketched below.
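To make this flow concrete, here is a minimal sketch in Python with hypothetical class and method names (the actual gateway is a separate service); it only illustrates where the filters and the proxy sit in the request path:

```python
# A minimal sketch of the gateway request path. Class and method names are
# hypothetical; this only illustrates where the filters and the proxy sit.

class PrestoGateway:
    def __init__(self, filters, cluster_selector, proxy):
        # e.g. filters = [api_rate_limiter, concurrent_queries_filter]
        self.filters = filters
        self.cluster_selector = cluster_selector
        self.proxy = proxy

    def handle(self, request):
        # Each filter returns (allowed, error_message)
        for rate_filter in self.filters:
            allowed, message = rate_filter.check(request)
            if not allowed:
                # Rejected at the gateway itself, never reaching a Presto queue
                return {"status": 429, "body": message}

        # All filters passed: pick a cluster and forward the query to it
        target = self.cluster_selector.next_cluster()
        return self.proxy.forward(request, target_host=target)
```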

Rate Limiting Service 

Every request submitted for execution on a Presto cluster passes through the Presto Gateway, so we decided to place the rate limiting service as part of the Presto Gateway.

For every passing request, the rate limiting service makes one of the two decisions below:

  1. Allows a request to be submitted for execution through gateway proxy
  2. Rejects a request and sends an appropriate error message to user

We decided to implement the rate limiting service as two filters, which are described in detail in the sections below.

Traditional rate limiting is time-interval based and is also called API rate limiting. Rules for such rate limiting would be, for instance, n1 requests in t1 seconds and n2 requests in t2 minutes. API rate limiting makes sure that Presto is not subjected to bursts of queries in a very small time window.

Given Presto’s behavior, each submitted query can have a very different runtime and consume very different amounts of Presto resources. This means that API rate limiting alone would be unfair to users who submit fast queries that consume fewer Presto resources.

Our perspective is to allow more queries from a user who submits fast queries than from a user whose slow queries consume resources for a longer duration in a given time frame. We therefore implemented an additional filter which specifically monitors already submitted queries and ensures that a user has no more than k concurrent queries present in a Presto cluster.

API Rate Limiting

The API Rate Limiter is the first filter applied on each incoming request. The request has to pass through this filter before it gets submitted to Presto. The API rate limiter ensures that users do not submit queries in bursts. The rule for this filter looks like “n1 requests in t1 seconds AND n2 requests in t2 seconds AND…”. This filter ensures that Presto resources are available to all users fairly.

Concurrent Running Queries 

Since individual requests or queries to Presto can have very different execution times, we need to apply another filter on top of the API rate limiter. This filter ensures that even a user submitting a small number of requests that qualify as slow is not allowed to execute more than k queries at the same time. This provides proper governance: users submitting fast queries can execute more queries overall than those submitting slow ones.

Proxy

Once requests pass through both the filters, the proxy takes care of dispatching them to the appropriate Presto cluster. The current implementation dispatches on a round robin basis, but we have the flexibility to plug in custom logic to pick an appropriate Presto cluster. Once the selection is made, the target host is modified in order to submit the request to the chosen Presto cluster.
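As an illustration, a simple round-robin selector could look like the sketch below; the class, method, and host names are hypothetical and only show the idea, not our actual dispatch code.

```python
import itertools

class RoundRobinClusterSelector:
    """Cycles through the configured Presto clusters. Custom logic (for example,
    load-aware selection) could replace this without changing the callers."""

    def __init__(self, cluster_hosts):
        self._cycle = itertools.cycle(cluster_hosts)

    def next_cluster(self):
        return next(self._cycle)

# Hypothetical host names; the chosen value becomes the request's target host.
selector = RoundRobinClusterSelector([
    "presto-cluster-1.internal:8080",
    "presto-cluster-2.internal:8080",
])
target_host = selector.next_cluster()
```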

Cache

The cache is where we keep the data required by both filters. For the API rate limiter we need to store the timestamps of submitted requests, while for the second filter we need the number of queries currently running on Presto for each user. Since we have more than one Presto Gateway machine, this information needs to be shared across all machines; AWS ElastiCache is the choice we made for our implementation.

Active Monitor 

The concurrent running queries filter needs the number of queries currently executing on Presto for a given user to be available in the cache in order to evaluate its rule. This information lives with the Presto cluster, so we need a mechanism to extract it and make it available in the cache. The active monitor is a component which runs on all Presto Gateway machines at fixed time intervals; it invokes the Presto queries API and stores the information in the cache.
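A minimal sketch of such a monitor loop is shown below. It assumes the Presto coordinator’s /v1/query REST endpoint and the JSON fields shown, and uses illustrative host names and cache key formats; the real component and its key construction may differ.

```python
import time
from collections import Counter

import redis
import requests

# Illustrative cache endpoint (e.g. an AWS ElastiCache Redis node).
cache = redis.Redis(host="rate-limit-cache.internal", port=6379)

def refresh_running_query_counts(coordinator_url, interval_seconds=10):
    """Periodically pull the query list from Presto and cache per-user counts."""
    while True:
        resp = requests.get(f"{coordinator_url}/v1/query", timeout=5)
        resp.raise_for_status()

        counts = Counter()
        for query in resp.json():
            # Only queries that are currently executing matter for this filter.
            if query.get("state") == "RUNNING":
                user = query.get("session", {}).get("user", "unknown")
                counts[user] += 1

        for user, count in counts.items():
            # One cache entry per user: <user-key> -> number of running queries.
            # A short TTL keeps stale entries from lingering.
            cache.set(f"running_queries:{user}", count, ex=60)

        time.sleep(interval_seconds)
```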

Presto Cluster

Presto cluster is the final destination for any submitted request where actual execution happens. 

Sequence diagram depicting interaction among various objects. It shows what happens with a request once it is submitted by a user.

Filter: API Rate Limiting

The API Rate Limiting filter ensures that users do not submit queries in bursts and that their queries are instead spread uniformly across time frames. The filter also helps reduce the overall load on the Presto gateway controller, which can be a single point of failure.

Filtering Rules 

  • We have defined a set of rules, and all requests must obey these rules in order to pass on to the next stage. A rule is a combination of one or more conditions such as n1 requests in t1 time duration AND n2 requests in t2 time and so on.
  • For example – 1 request in 1 sec AND 3 requests in 5 sec. In that case both conditions have to be satisfied. These rules are configurable at runtime and can even be applied at different granularities; a possible representation is sketched below.
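For illustration, such a rule could be represented as a list of (window, max-requests) conditions; the structure below is only an assumed shape, not the exact format stored in our configuration.

```python
# Hypothetical representation of the rule "1 request in 1 sec AND 3 requests in 5 sec".
# Each condition is (window_seconds, max_requests); a request must satisfy all of them.
RATE_LIMIT_RULES = [
    (1, 1),   # at most 1 request in any 1-second window
    (5, 3),   # at most 3 requests in any 5-second window
]
```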

Implementation

  • Implementation of the API Rate Limiting filter requires us to keep track of historical requests with their respective timestamps in a shared cache. 
  • Whenever a request is submitted, this information will help us to evaluate rules defined for this filter across multiple time frames.

Cache

  • Entry – [user-key, sorted-set-of-timestamps]
  • Key – a string combining the Presto user + Redash user + a suffix
  • Value – sorted set of timestamps of recently allowed requests
  • Example:
presto_global_redash:[email protected] -> [1642068002, 1642068020, 1642068025]
  • We make sure that keys are deleted after a fixed expiration time.
  • We also periodically delete old timestamps within each entry. This ensures we are using the cache optimally (see the maintenance sketch below).
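A minimal sketch of this bookkeeping, assuming a Redis (ElastiCache) sorted set per user and illustrative key names, TTLs, and function names, could look like this:

```python
import time

import redis

# Illustrative cache endpoint and constants; the actual values are configurable.
cache = redis.Redis(host="rate-limit-cache.internal", port=6379)
KEY_TTL_SECONDS = 600            # keys are deleted after a fixed expiration time
LONGEST_WINDOW_SECONDS = 300     # timestamps older than the longest rule window are useless

def record_allowed_request(user_key):
    """Append the current timestamp to the user's sorted set of allowed requests."""
    now = time.time()
    cache.zadd(user_key, {str(now): now})   # member and score are both the timestamp
    cache.expire(user_key, KEY_TTL_SECONDS)

def prune_old_timestamps(user_key):
    """Periodically drop timestamps that no rule window can reference any more."""
    cutoff = time.time() - LONGEST_WINDOW_SECONDS
    cache.zremrangebyscore(user_key, 0, cutoff)
```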

Filter Algorithm

  • Each incoming request is evaluated against the algorithm below, which returns either true or false.
  • In the algorithm, we loop over all the rules defined in the config.
  • For a given rule, e.g. ‘n requests in t secs’: we look at a window of the last t seconds, count the requests submitted in that window, and evaluate that count against n to make the decision. A sketch of this evaluation follows below.
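Under the same assumptions as the cache sketch above (Redis sorted sets and rules expressed as (window, max-requests) pairs), the evaluation loop could look like this:

```python
import time

def is_request_allowed(cache, user_key, rules):
    """Evaluate every configured rule; reject as soon as one would be violated.

    `rules` is the assumed list of (window_seconds, max_requests) pairs shown
    earlier, and `cache` is the shared Redis client holding the sorted sets.
    """
    now = time.time()
    for window_seconds, max_requests in rules:
        # Count requests already allowed for this user in the last window.
        recent = cache.zcount(user_key, now - window_seconds, now)
        if recent + 1 > max_requests:   # this request would exceed the limit
            return False
    return True
```

Together with the pruning shown earlier, this behaves like a per-user sliding window: old timestamps age out of every rule’s window and the cache stays small.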

Error Message

  • Users are given an appropriate message whenever the rate limit service rejects a request. The message includes the limits that the user has been granted.

Filter: Concurrent Running Queries 

While API rate limiting is efficient and handles large bursts of requests, it is based entirely on the number of requests in a given time frame. It only considers request count and blocks users from submitting queries in bursts. But given the behavior of our system, where each request or query can have a very different execution time, deploying API rate limiting alone is not enough. We also want to consider the duration of queries. If a user runs heavy queries, we apply another filter that restricts the user to a maximum of k concurrently executing queries.

Filtering Rules 

  • On each request submission, the filter checks the number of queries the given user is already running in order to decide whether to allow or deny the request.

Implementation

  • On each request we need to know the number of running queries for that user at that point in time.
  • We achieve this by deploying an independent component called the active query monitor.
  • The active monitor periodically queries a Presto API and updates a distributed cache.
  • Each entry in the distributed cache gives the number of concurrently running queries for a given user.
  • When a request comes in, we check the current value against the limit we have set, as sketched below.
  • If a new request would cross that limit then we reject it, otherwise we allow it to pass through.
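A minimal sketch of this check, reusing the illustrative cache key written by the active monitor sketch earlier, might look like this:

```python
def is_concurrency_allowed(cache, user_key, max_concurrent):
    """Allow the request only if the user's running queries stay under the limit.

    The count is the value written by the active monitor; the key format and
    the limit are illustrative, not the actual configuration.
    """
    raw = cache.get(f"running_queries:{user_key}")
    running = int(raw) if raw is not None else 0
    return running < max_concurrent   # e.g. max_concurrent = 3
```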

Cache Entry

  • Entry – < user-key : # of running queries >
  • Key – a string combining the Presto user + Redash user
  • Value – a string representing the number of queries the given user is currently running on Presto

Example: 

presto_global_redash:[email protected] -> 3

Error Message

  • Users are given an appropriate message whenever the rate limit service rejects a request. The message includes the limits that the user has been granted.

Rules Overrides

Filtering rules are provided to the service as configuration parameters. Rules can be set either by invoking an API or by inserting directly into a MySQL database. Configuration can be altered at runtime and is reflected in the gateway application every minute.

Considering that different users or teams have different Presto usage, we have implemented a mechanism to define rate limiting rules at different granularities. We have also defined a priority among them so that a user level rule (if available) is prioritized over the global rule.

Priority is defined as below:

  1. User level
  2. User type level
  3. Global level

Config Formats

User level
Format: <name-of-property>:<presto-username>:<redash-username>
Examples:
  • allowed_concurrent_executing_queries:presto_global_redash:[email protected] – limit override for the allowed_concurrent_executing_queries property for the Redash user identified by the presto_global_redash Presto username and the [email protected] Redash username
  • allowed_concurrent_executing_queries:presto_customer_growth_team:NA – limit override for the allowed_concurrent_executing_queries property for the ETL user identified by presto_customer_growth_team
  • rate_limit_rules:presto_global_redash:Scheduled – limit override for the rate_limit_rules property for Redash scheduled users identified by the presto_global_redash Presto username

User Type level
Format: <name-of-property>:<user-type>
Examples:
  • allowed_concurrent_executing_queries:Redash – limit override for the allowed_concurrent_executing_queries property on all Redash ad hoc queries
  • rate_limit_rules:Scheduled – limit override for the rate_limit_rules property on all Redash scheduled queries
  • rate_limit_rules:ETL – limit override for the rate_limit_rules property on all ETL user queries

Global level
Format: <name-of-property>
Examples:
  • allowed_concurrent_executing_queries
  • rate_limit_rules
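As an illustration, resolving the most specific override for a request could look like the sketch below; the dictionary shape, function name, and values are assumptions, not our actual configuration code.

```python
def resolve_override(overrides, property_name, presto_user, redash_user, user_type):
    """Pick the most specific configured value for a property.

    `overrides` is assumed to be a dict of config-key -> value loaded from the
    rules store; key formats follow the formats above. Returns the first match
    in priority order: user level, then user type level, then global level.
    """
    candidate_keys = [
        f"{property_name}:{presto_user}:{redash_user}",   # user level
        f"{property_name}:{user_type}",                   # user type level
        property_name,                                    # global level
    ]
    for key in candidate_keys:
        if key in overrides:
            return overrides[key]
    return None

# Example lookup (values are illustrative):
overrides = {
    "allowed_concurrent_executing_queries": 3,
    "allowed_concurrent_executing_queries:Redash": 5,
    "allowed_concurrent_executing_queries:presto_global_redash:[email protected]": 10,
}
limit = resolve_override(
    overrides,
    "allowed_concurrent_executing_queries",
    presto_user="presto_global_redash",
    redash_user="[email protected]",
    user_type="Redash",
)  # -> 10, because the user level override takes priority
```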

Monitoring

We are collecting a variety of metrics to visualize the performance of rate limiting features through dashboards.

A few examples of metrics that we collect are:

  • Submitted/allowed/rejected request counts per filter
  • Distribution of allowed requests by type (allowed by rule, allowed because the Presto username was missing, allowed because the user has an exception, etc.)
  • Cache access latency
  • Active monitor update frequency

Snapshot of dashboard

Through this monitoring setup, we detected that a single user was misusing our API and blocked them accordingly.

Conclusions

By deploying rate limiting features, we were able to ensure optimal resource allocation and reliability of Presto resources. Rate limiting also played a crucial role in helping us define better capacity planning along with finer governance of Presto. The rate limiting solution also encourages Presto users to plan their usage.