Optimus: dynamic dependency management

January 23, 2024
Summary: Dynamic dependency management is the process of managing the execution of all tasks with the help of an external coordination service. It brings key benefits to task execution, such as idempotent execution and ease of operability.
Team: Data & AI
Author(s): Jatin Kumar
About the Author(s): Jatin Kumar is a Staff Software Engineer at Careem, working in the Data & AI team.

Introduction

Careem is growing day by day, and so is the data generated by its various services and sources. To get the most out of this data, we need a tool that creates a holistic view of it. One of the major challenges for such a tool was to let users onboard view creation logic with minimal effort while also handling massive amounts of data. Optimus, our in-house ETL solution, solves both of these problems and more, such as dynamic dependency management and keeping tables available while data is being written or updated. Open source solutions such as DBT, Marmaray, and Gobblin exist, but they didn’t fit our requirements.

Architecture

Dynamic dependency management

In its current state, Optimus supports internal dependency management for a given batch configuration, which works for small loads (50+ ETL jobs in a single batch). With the growth in data and the introduction of new KPIs, running everything as a single batch is no longer efficient. For this reason, the team has distributed the load across more than 10 clusters, and this number is expected to grow when we start offering Optimus as a service. We use the EMR service of AWS to run Optimus in create-and-destroy mode. Pipelines have to run at different times of the day, so a long-running cluster would be costlier and poorly utilized (except for real-time pipelines). Instead, we create a smaller cluster sized to each pipeline’s needs and destroy it after the pipeline finishes, which is cheaper and supports different execution times.

Features supported in the current state

  • Users can submit a list of ETL jobs by adding a single config file.
  • Users can define dependencies within a batch.
  • Users can specify concurrency levels.
  • Users can run a specific set of ETL jobs.
  • Users can retry ETL jobs as many times as they desire.

Challenges in the current state

  • There is no dependency management between applications running on different clusters.
  • Cross-dependencies between batches executing on different clusters are not honored.
  • Users cannot run multiple interdependent batches concurrently on separate clusters. Inter-batch dependencies are not supported, and results will not be consistent.
  • Users cannot specify the freshness criteria of an upstream ETL dependency; the default is T0, the start of the current day.

Proposed state

We propose a mechanism that allows global dependency management while maintaining scalability and resilience.

Implementation design

On critically examining the above-stated issues, the problem can be solved using two levels of state management while maintaining consistent coordination between sets of clusters, as stated below.

  • Internal state: User-provided configuration is converted to an in-memory graph; no task shall be executed unless its parent task is completed.
  • External state: Each task exposes its state to an external system using some coordination service; this information is used to fetch the run-time status of tasks that are part of other clusters. Zookeeper is used to maintain this state in the form of ephemeral nodes.
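
The internal state can be pictured with a small sketch. This is an illustration only, not the Optimus implementation: the `Graph` type, `runnable`, and `executionWaves` are hypothetical names, and the graph is assumed to map each task id to the ids of its parents.

```scala
object InternalStateSketch {
  // Hypothetical shape: task id -> ids of its parent tasks, as parsed from a batch config.
  type Graph = Map[String, Set[String]]

  // Tasks that are not completed yet and whose parents have all completed.
  def runnable(graph: Graph, completed: Set[String]): Set[String] =
    graph.collect {
      case (task, parents) if !completed(task) && parents.subsetOf(completed) => task
    }.toSet

  // Walks the graph wave by wave and returns the waves in execution order.
  // A task whose parent never completes (for example, a parent that lives on
  // another cluster) is never returned by this purely internal view.
  def executionWaves(graph: Graph): List[Set[String]] = {
    @annotation.tailrec
    def loop(completed: Set[String], acc: List[Set[String]]): List[Set[String]] = {
      val next = runnable(graph, completed)
      if (next.isEmpty) acc.reverse
      else loop(completed ++ next, next :: acc)
    }
    loop(Set.empty, Nil)
  }
}
```

A parent that is missing from the local graph blocks its children indefinitely in this sketch, which is the gap the external state is meant to close.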

Detailed flow

The steps involved in the proposed data flow are listed below.

  • Check if the data has already been processed. If yes, then mark this as finished.
  • Before processing, try creating a corresponding zkNode (ephemeral) path.
    • If a node is already present, then it means some other task is already running on some other cluster. Enqueue this task to an in-memory queue, where a consumer thread will periodically check for its completion.
    • If there is no node, create one and schedule this task.
  • After the task is completed, update the final status in the in-memory graph, which will trigger execution of the downstream path.

Note: All of the communication described above happens asynchronously.

def exec(task: Executable): Future[Unit] =
  // Skip the task if it was blacklisted before this job started.
  if (isTaskBlacklisted(task.id)) skipTask(task)
  else {
    task.getDependencies() match {
      case deps if deps.isEmpty => execAndUpdate(task).map(_ => ())
      case parentConfigs =>
        if (isETLFresh(task)) execAndUpdate(task).map(_ => ())
        else {
          // Wrap every parent listed in getDependencies as a dummy executable.
          val parentTasks = parentConfigs.map { case (id, taskConfig) =>
            ExecutablePlaceHolder(appConfig, id, taskConfig)
          }.toList
          // Filter out parents that are already fresh.
          val compactedParentTasks = parentTasks.filterNot(isETLFresh)
          // Ids of the parents that are not fresh yet.
          val compactedParentIds = compactedParentTasks.map(_.id)
          compactedParentIds match {
            // Every parent is fresh, so there is no need to check
            // whether the parents were skipped in the last run.
            case Nil => execAndUpdate(task).map(_ => ())
            case _ =>
              compactedParentIds
                .filterNot(execMap.contains) // parents not tracked in this batch
                .map(id => ExternalExecutablePlaceHolder(
                  appConfig, s"${task.id}#$id", id, parentConfigs.getOrElse(id, Map.empty)))
                .map(x => x.id -> (x, Promise[Unit]()))
                .collect { case (id, (t, p)) =>
                  // Insert the external dependency into the in-memory queue.
                  externalETLDependencyInMemory.offer(ExternalTaskDependencyMetaInfo(t))
                  execMap = execMap.updated(id, ExecTuple(t, p, p.future))
                  sendToKafka(t.id, NodeStatus.ENQUEUED, "external task enqueued")
                }
          }
        }
    }
  }
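
The claim-or-wait step from the detailed flow can also be sketched in isolation. The example below is a simplified illustration under stated assumptions: `Coordinator` is an in-memory stand-in for the coordination service (a local map instead of ephemeral zkNodes), and `ClusterWorker`, `Decision`, and the `/optimus/tasks/` path are hypothetical names, not part of Optimus.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

object ClaimOrWaitSketch {
  sealed trait Decision
  case object AlreadyProcessed extends Decision
  case object Scheduled extends Decision
  case object WaitingOnOtherCluster extends Decision

  // Stand-in for the coordination service: node path -> owning cluster.
  // In the real system this role is played by ephemeral zkNodes, not a local map.
  final class Coordinator {
    private val nodes = new ConcurrentHashMap[String, String]()
    // Atomic create-if-absent; returns true when this caller created the node.
    def tryCreate(path: String, owner: String): Boolean =
      nodes.putIfAbsent(path, owner) == null
    // Models the node disappearing once the owning task finishes.
    def release(path: String): Unit = { nodes.remove(path); () }
  }

  final class ClusterWorker(name: String, zk: Coordinator, isProcessed: String => Boolean) {
    // Tasks claimed by another cluster; a consumer thread would poll these.
    val waiting = new ConcurrentLinkedQueue[String]()

    def path(taskId: String): String = s"/optimus/tasks/$taskId" // hypothetical layout

    def submit(taskId: String): Decision =
      if (isProcessed(taskId)) AlreadyProcessed
      else if (zk.tryCreate(path(taskId), name)) Scheduled
      else { waiting.offer(taskId); WaitingOnOtherCluster }
  }
}
```

The important property is that node creation is a single atomic operation, so two clusters submitting the same task at the same moment cannot both end up in the `Scheduled` branch.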

Orchestration (Airflow):

We generate the Airflow DAG dynamically and run tasks in parallel on different clusters.

Key benefits

  • Idempotent execution: The user can submit the same task as many times as they desire. A new instance will only be launched if the previous execution was unsuccessful. This pre-condition requires a notion of self-aware freshness criteria; by default, it is assumed to be today’s date. As a result, if a user submits the same task multiple times, only one execution is performed and the others are skipped. Note: The user can define custom freshness criteria for the task as well as its parent task, which means the user can force run the job by defining the freshness criteria as the current time.
  • Optimus on-demand: Users can submit tasks on demand without worrying about the upstream task execution status.
  • Ease of operability: The support team no longer needs to manually identify execution graphs and start tasks. We run our whole pipeline through Airflow, in which each DAG runs one JSON that consists of more than 30 jobs. A complete restart of the Airflow DAG is sufficient.
  • Cost Benefits: Using multiple EMR clusters with a create and destroy mechanism provides cost optimization by paying only for utilized compute resources, elastic scaling based on workload demands, resource isolation for efficient performance, and improved resource utilization by avoiding idle clusters.
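
To make the idempotency argument concrete, here is a minimal sketch of a freshness-guarded submit. It is not the Optimus code: `RunLog`, `submit`, and `freshSince` are hypothetical names, the run history is kept in memory, and the start of the day is assumed to be computed in UTC.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object IdempotentSubmitSketch {
  // Hypothetical in-memory record of the last successful run per task.
  final class RunLog {
    private var lastSuccess = Map.empty[String, Instant]
    def record(taskId: String, at: Instant): Unit = {
      lastSuccess = lastSuccess.updated(taskId, at)
    }
    def last(taskId: String): Option[Instant] = lastSuccess.get(taskId)
  }

  // Default cutoff: start of the current day (UTC is an assumption here).
  def startOfDay(now: Instant): Instant = now.truncatedTo(ChronoUnit.DAYS)

  // Returns true when the task actually ran and false when the submission was skipped.
  // Passing freshSince = Some(now) treats any earlier run as stale, i.e. a forced run.
  def submit(log: RunLog, taskId: String, now: Instant, freshSince: Option[Instant] = None)(
      run: () => Unit
  ): Boolean = {
    val cutoff = freshSince.getOrElse(startOfDay(now))
    val fresh = log.last(taskId).exists(t => !t.isBefore(cutoff))
    if (fresh) false
    else { run(); log.record(taskId, now); true }
  }
}
```

With the default cutoff, a second submission on the same day is skipped, while a submission whose custom cutoff is the current time runs again.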

Updating the dependencies

We have introduced a dependenciesWithMeta property in the Optimus config that helps manage dependencies across jobs that may run on different clusters.
Sample dependenciesWithMeta:

"dependenciesWithMeta": {
    "Task_1": {
      "freshnessCriteriaInHours": 24
    },
    "Task_2": {
      "freshnessCriteriaInHours": 168
    },
    "Task_3": {
      "freshnessCriteriaInHours": 96
    },
    "Task_4": {
      "freshnessCriteriaInHours": 72
    }
}

If any of a task’s parents is already fresh (according to its freshnessCriteriaInHours in dependenciesWithMeta), it is not considered a parent in the execution flow.
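
The pruning of fresh parents can be sketched as follows. This is an illustration under an assumption the post does not spell out: a parent is treated as fresh when its last successful run falls within the last `freshnessCriteriaInHours` hours. `DependencyMeta`, `isFresh`, and `staleParents` are hypothetical names.

```scala
import java.time.{Duration, Instant}

object ParentPruningSketch {
  // Mirrors one entry of dependenciesWithMeta.
  final case class DependencyMeta(freshnessCriteriaInHours: Long)

  // Assumption: fresh means "succeeded within the last N hours".
  def isFresh(lastSuccess: Option[Instant], meta: DependencyMeta, now: Instant): Boolean =
    lastSuccess.exists { t =>
      !t.isBefore(now.minus(Duration.ofHours(meta.freshnessCriteriaInHours)))
    }

  // Parents that still have to be waited for; fresh parents are dropped.
  def staleParents(
      deps: Map[String, DependencyMeta],
      lastSuccess: Map[String, Instant],
      now: Instant
  ): Set[String] =
    deps.collect {
      case (id, meta) if !isFresh(lastSuccess.get(id), meta, now) => id
    }.toSet
}
```

For example, a parent with a 168-hour window that last succeeded 30 hours ago is dropped from the execution flow, while a parent with a 24-hour window and the same last run is kept.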

Task lifecycle

After we have skipped the fresh tasks and decided which parents to consider for the current execution of a task, the following flow has to be followed to run the task.

  1. First, we determine whether the task is fresh, that is, whether it has already run in accordance with its refresh schedule; if so, we mark it as successful.
  2. Check if it is marked as skipped; if yes, it will be skipped, and a notification will be sent to different channels.
  3. The job is checked for parents. If parents exist, we check the status of all of them; if all of them are successful, the job will be executed.
  4. If any of the parents is not yet successful, the parents’ status is re-checked every 2 minutes for up to 30 minutes.
  5. If a parent is still not fresh after 30 minutes, we mark the job as failed.
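
The waiting behaviour in the last steps of the lifecycle can be sketched as a bounded polling loop. This is a simplified, blocking illustration rather than the asynchronous Optimus implementation; `awaitParents`, `Outcome`, and the injected `sleep` function are hypothetical.

```scala
import scala.concurrent.duration._

object ParentWaitSketch {
  sealed trait Outcome
  case object ParentsReady extends Outcome
  case object TimedOut extends Outcome

  // Polls `allParentsSuccessful` every `interval` until it returns true
  // or `timeout` has elapsed. `sleep` is injected so the loop can be
  // exercised without real waiting.
  def awaitParents(
      allParentsSuccessful: () => Boolean,
      interval: FiniteDuration = 2.minutes,
      timeout: FiniteDuration = 30.minutes,
      sleep: FiniteDuration => Unit = d => Thread.sleep(d.toMillis)
  ): Outcome = {
    @annotation.tailrec
    def loop(waited: FiniteDuration): Outcome =
      if (allParentsSuccessful()) ParentsReady
      else if (waited >= timeout) TimedOut
      else { sleep(interval); loop(waited + interval) }
    loop(Duration.Zero)
  }
}
```

A caller would run the job on `ParentsReady` and mark it as failed on `TimedOut`.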

Conclusion

Dynamic dependency management, coordinated through Zookeeper, gives us an edge in running our pipelines. We can now run tasks on any Spark cluster with the guarantee that each task runs only once. The current solution has one limitation: dynamic dependency management is tightly coupled with Zookeeper, so if the entire Zookeeper cluster fails, it becomes a single point of failure.