Enterprise-wide orchestration using multiple data factories

Introduction

Building simple data engineering pipelines with a single Azure Data Factory (ADF) is easy, and multiple activities can be orchestrated within a single pipeline. More sophisticated data engineering patterns require flexibility and reusability through pipeline orchestration. A common approach is to orchestrate pipelines using the built-in Execute Pipeline Activity; however, this does not support invoking pipelines outside of the current data factory. This blog will review how to approach cross-factory pipeline orchestration, covering the options for both asynchronous and synchronous control.

 

Background

Robust and flexible pipeline orchestration is an important tool in the data development toolkit, as it allows data processing operations to be segregated into separate execution planes, for example Extract, Transform and Load. Most large data pipelines consist of repetitive operations and require encapsulation of logic into pipelines which can be orchestrated by a “master pipeline”, enabling automation of custom workflows.

In ADF we can achieve pipeline orchestration within a single factory by using the Execute Pipeline Activity. This activity allows us to select a pipeline within the current Data Factory and to choose whether or not to wait for the invoked pipeline to complete, via the Wait on Completion property. However, this activity is limited to the scope of the current data factory only.
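For context, here is roughly what an Execute Pipeline activity looks like in a pipeline’s JSON definition (visible via the Code view); the names here are hypothetical:

{
    "name": "Run child pipeline",
    "type": "ExecutePipeline",
    "typeProperties": {
        "pipeline": { "referenceName": "ChildPipeline", "type": "PipelineReference" },
        "waitOnCompletion": true
    }
}

Note that the pipeline reference can only point to a pipeline in the same factory, which is the limitation this blog works around.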

To achieve the same orchestration for pipelines residing in other data factories (figure 1), we can call the Data Factory REST API Pipelines – Create Run using either the Web Activity (does not wait for completion) or the Webhook Activity (waits for completion).

Figure 1 – Orchestrating Pipelines across subscriptions

 

Conventions Used

Master Factory: The data factory which will invoke and orchestrate pipelines in other (worker) data factories.

Master Pipeline: A pipeline in the master factory which invokes pipelines in other (worker) data factories. This pipeline may often contain complex orchestration of worker pipelines.

Worker Factory: The data factory containing pipelines which will be invoked by the master.

Invoked Pipeline: The pipeline which is invoked by the master pipeline. Often this is where a workload will run such as a copy activity or a transformation using mapping data flows.

 

Setting up the Environment

To complete the demo below, you will need access to at least one Azure Subscription and two Azure Data Factories within a resource group in which you have contributor access. Note, the cost of running this entire demo will be less than a few pence, as there is no charge to run the master pipeline, and you are only billed for the worker pipeline execution. Setup guidance for this is provided below, but first let’s examine authentication and authorisation between the factories.

 

Security between Data Factories

To allow an API call between data factories, we will be using the ADF Managed Identity (MI): a managed application, registered in Azure Active Directory, that represents a specific data factory. MI authentication is supported by both the Web and Webhook activities and has the benefit that no passwords or certificates need to be stored or managed.

The Data Factory Contributor role will authorise the Master Data Factory to invoke pipelines in the Worker Factory; we need to assign this role to the Master on the Worker. Custom RBAC roles can be defined if more granular permissions are needed (e.g. if only pipeline execution is required).

Figure 2 – Cross Data Factory Authentication using MI

Note: Managed Identities require objects to be held within the same Azure Active Directory Tenant. Therefore, using MI for authentication is only supported within the same Tenant.

 

Build Master and Worker Data Factories and configure security

  1. Create an Azure Data Factory using the Portal or API. We recommend suffixing the factory name with -Master, as this will become the Master Factory. Make a note of this name, as it will be required later in step 3.
  2. Create a second Azure Data Factory using the Portal or API. We recommend suffixing the factory name with -Worker, as this will become our Worker Factory.
  3. Once the Worker Factory is built, select Access Control (IAM) within the Azure Portal (see figure 3).
    1. Within the Access Control blade, select Role Assignments.
    2. Click Add Role Assignment.
    3. In the Add Role Assignment panel, select:
      1. Role: Data Factory Contributor
      2. Assign Access To: Azure AD user, group or service principal
      3. Select: enter the name of the Master Data Factory created in step 1. It should be found in, and can be selected from, the results.
      4. Check the details and click Save.

Figure 3 – Setting Permissions for the Master Data Factory

At this point you should have two Data Factories, Master and Worker, and have configured security to allow the Master Data Factory to invoke pipelines in the Worker Data Factory.
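If you prefer to script this role assignment rather than use the portal steps above, the following Python sketch shows the equivalent call to the ARM role assignments REST API. This is illustrative only: it assumes the requests and azure-identity packages, all angle-bracketed values are placeholders, and the GUID is the built-in Data Factory Contributor role definition (worth verifying in your tenant before relying on it).

import uuid
import requests
from azure.identity import DefaultAzureCredential

subscription_id = "<worker-subscription-id>"    # placeholder
resource_group = "<worker-resource-group>"      # placeholder
worker_factory = "<worker-factory-name>"        # placeholder
# Object ID of the Master factory's managed identity (placeholder)
master_principal_id = "<master-factory-managed-identity-object-id>"

# Scope the assignment to the Worker Data Factory only
scope = (f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
         f"/providers/Microsoft.DataFactory/factories/{worker_factory}")

# Built-in Data Factory Contributor role definition
role_definition_id = (f"/subscriptions/{subscription_id}/providers"
                      "/Microsoft.Authorization/roleDefinitions"
                      "/673868aa-7521-48a0-acc6-0f60742d39f5")

token = DefaultAzureCredential().get_token("https://management.azure.com/.default")

response = requests.put(
    f"https://management.azure.com{scope}/providers/Microsoft.Authorization"
    f"/roleAssignments/{uuid.uuid4()}?api-version=2015-07-01",
    headers={"Authorization": f"Bearer {token.token}"},
    json={"properties": {"roleDefinitionId": role_definition_id,
                         "principalId": master_principal_id}},
)
response.raise_for_status()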

 

Orchestration Using Web Activity (Does not wait for completion)

Now that we have the two Data Factories, we can begin to build a simple pipeline orchestration between them using the Web Activity, for a fire-and-forget workflow.

Figure 4 – Sample of using Web Activity to Invoke Pipeline

With this activity, the Master Pipeline only reports the success of the pipeline starting; it does not wait for feedback on the status or results of the Invoked Pipeline. This can be useful when we do not wish the Master Pipeline to fail if the underlying Invoked Pipeline has failed.

This guide comprises three main stages:

  1. Create a Pipeline on the Worker Data Factory. This will be invoked by the Master Data Factory.
  2. Create a Master Pipeline with the Web Activity to invoke the pipeline on the Worker Data Factory through the REST API Pipelines – Create Run.
  3. Test and run the Master Pipeline and verify the results.

 

Stage 1: Create a pipeline in the Worker Data Factory

To begin testing cross-Data Factory orchestration, we first need to create a test pipeline in the Worker Data Factory. To do this, follow the steps below:

  1. Within the Azure Portal, select the Worker Data Factory and, from the Overview blade, select Author and Monitor.
  2. On the Let’s get started page, switch to the Author tab in the left panel.
  3. Select Pipelines from the Factory Resources.
  4. Create a new pipeline and, in the Name field, call this InvokedPipeline1.
  5. Within Activities, select General and add a Wait activity to the pipeline. This Wait activity will simulate some type of data movement or transformation activity; you are free to replace it with any type of activity you wish.
  6. Select the Wait activity created in the pipeline and, within the General settings, name this Sample Wait activity.
  7. Within the Settings of the Sample Wait activity, set the Wait time in seconds to 30. This should provide enough time to demonstrate that the Web Activity is working asynchronously when invoked from the Master Pipeline.
  8. Select Publish All to save the pipeline to the data factory.

Note: Only published pipelines can be invoked on the Worker Data Factory through the API; they cannot be invoked in Debug mode. Be sure to save changes to your pipelines on the Worker Data Factory by publishing them.

At this point we now have a sample pipeline in our Worker Data Factory, called InvokedPipeline1, which will be used in stages 2 and 3 to test cross-factory orchestration.
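For reference, the JSON behind this pipeline (visible via the Code button in the authoring canvas) should look roughly like the following:

{
    "name": "InvokedPipeline1",
    "properties": {
        "activities": [
            {
                "name": "Sample Wait activity",
                "type": "Wait",
                "typeProperties": { "waitTimeInSeconds": 30 }
            }
        ]
    }
}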

 

Stage 2: Create a Pipeline in the Master Data Factory

Next we need to build a pipeline in the Master Data Factory, which will be used to asynchronously invoke the InvokedPipeline1 pipeline in the Worker Data Factory. To do this, we will use the Web Activity to invoke the pipeline via the REST API call Pipelines – Create Run.

The REST API call has a set of parameters that must be constructed correctly; some are contextual, based on the name and resource group of your Worker Data Factory. We will run through these parameters in the following table. It is recommended you use a text editor to prepare them before continuing.

 

Property: Name
Description: Name of the Web Activity.
Value: Run_Pipeline1

Property: Method
Description: REST API method for the target endpoint.
Value: POST

Property: URL
Description: The target endpoint path, constructed from the following properties of the Worker Data Factory:

·       {SubscriptionId} – the subscription ID of the Worker Data Factory.

·       {ResourceGroupName} – the resource group name of the Worker Data Factory.

·       {FactoryName} – the name of the Worker Data Factory.

·       {PipelineName} – the name of the pipeline to be invoked, i.e. InvokedPipeline1 (created in Stage 1).

Value: these parameters can be used in the following URL, replacing each <parameter> with your value:

https://management.azure.com/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.DataFactory/factories/<factory-name>/pipelines/<pipeline-name>/createRun?api-version=2018-06-01

Property: Headers
Description: Headers that are sent with the request.
Value: Name: Content-Type; Value: application/json

Property: Body
Description: The payload that is sent to the endpoint. This can be used to pass parameters between pipelines; see the section Body Parameters between Pipelines below.
Value: {} (InvokedPipeline1 takes no parameters; a body such as {"myvar":"test"} would pass a parameter named myvar)

Property: Authentication (Advanced)
Description: Authentication method used for calling the endpoint.
Value: MSI, with Resource set to https://management.core.windows.net/
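To make the REST call concrete, the following Python sketch performs the same Pipelines – Create Run request that the Web Activity will make. It is illustrative only: it assumes the requests and azure-identity packages and authenticates as the signed-in user rather than via the factory’s managed identity; the angle-bracketed values are placeholders.

import requests
from azure.identity import DefaultAzureCredential

subscription_id = "<subscription-id>"   # Worker factory's subscription (placeholder)
resource_group = "<resource-group>"     # Worker factory's resource group (placeholder)
factory_name = "<factory-name>"         # Worker Data Factory name (placeholder)
pipeline_name = "InvokedPipeline1"      # the pipeline created in Stage 1

# Acquire an ARM token; inside ADF, the Web Activity does this implicitly via MSI
token = DefaultAzureCredential().get_token("https://management.azure.com/.default")

url = (f"https://management.azure.com/subscriptions/{subscription_id}"
       f"/resourceGroups/{resource_group}/providers/Microsoft.DataFactory"
       f"/factories/{factory_name}/pipelines/{pipeline_name}"
       "/createRun?api-version=2018-06-01")

response = requests.post(
    url,
    headers={"Authorization": f"Bearer {token.token}",
             "Content-Type": "application/json"},
    json={},  # pipeline run parameters would go here
)
response.raise_for_status()
print(response.json()["runId"])  # the ID of the new run in the Worker factory

A successful call returns the runId of the new pipeline run, which is also surfaced in the Web Activity’s output.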

 

Now using these parameters, configure the Web Activity:

  1. Within the Azure Portal, select the Master Data Factory and, from the Overview blade, select Author and Monitor.
  2. On the Let’s get started page, switch to the Author tab in the left panel.
  3. Select Pipelines from the Factory Resources.
  4. Create a new pipeline and, in the Name field, call this MasterPipeline1.
  5. Within Activities, select General and add a Web activity to the pipeline. This Web activity will be used to invoke the pipeline in the Worker Data Factory.
  6. Select the Web activity created in step 5 and, in the General tab, replace the Name with Run_Pipeline1.
  7. Select the Settings tab and replace these settings with the corresponding parameters from the table above.
  8. Select Publish All to save the pipeline to the Data Factory.

At this point we now have pipelines in both our Worker and Master Data Factories, and we can begin testing the orchestration between them in Stage 3.

 

Stage 3: Testing the orchestration pipeline

Now that the factories are set up and configured, it is time to test our pipelines and observe that they behave as expected, asynchronously. To do this we will initiate MasterPipeline1 in the Master Data Factory, which will then call InvokedPipeline1 in the Worker Data Factory. Finally, we will use the Data Factory Monitor to review the results.

  1. Within the Master Data Factory, open MasterPipeline1.
  2. Select Add Trigger and select Trigger Now. This will run the pipeline MasterPipeline1.
  3. To view the results of the pipeline, select the Monitor tab. The pipeline should have completed successfully, taking just a few seconds to run.
  4. Within the Worker Data Factory, select the Monitor tab. The InvokedPipeline1 pipeline should have successfully started on the Worker Data Factory and will either still be running or have completed.

InvokedPipeline1 takes 30 seconds to complete, whereas MasterPipeline1 completes within 3 to 5 seconds. This demonstrates that, once invoked, MasterPipeline1 does not wait for completion of InvokedPipeline1.

In the next section we will build a new master pipeline, this time using the Webhook activity, which will wait for completion of the worker pipeline.

 

Orchestration Using Webhooks (wait for completion)

Now that we understand how to build orchestration pipelines using the Web Activity to run pipelines without waiting for completion, we will next use the same methodology to build an orchestration pipeline with feedback, using the Webhook Activity.

This ensures the Master Data Factory will wait for the invoked pipeline to complete before proceeding to the next step in its workflow. The Webhook also has the benefit of reporting the status of the invoked pipeline run, failing the activity should the invoked pipeline fail. The Webhook can also pass custom parameters between the Master Pipeline and the Invoked Pipeline, giving more extensibility than the Web Activity.

Figure 5 – Sample of using Webhooks to invoke a Pipeline

The implementation of a Webhook differs from the Web Activity in that the invoked pipeline needs to “acknowledge” the completion of its pipeline through a callback step. The callback URI used by the Worker Factory is automatically supplied by the Master Factory in the body of the request.
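To make the callback mechanics concrete, the acknowledgement is simply an HTTP POST to the supplied URI. The Python sketch below is illustrative only; in this guide the callback is made by a Web activity inside the Worker pipeline rather than by external code.

import requests

# The callback URI arrives in the request body sent by the Master's
# Webhook activity and is surfaced as a pipeline parameter (placeholder shown)
callback_uri = "<callBackUri received from the Master Factory>"

# An empty body simply acknowledges completion
requests.post(callback_uri, json={})

# Alternatively, outputs and a status code can be reported back, as
# demonstrated later in this guide:
# requests.post(callback_uri,
#               json={"Output": {"testProp": "testPropValue"},
#                     "StatusCode": "200"})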

This guide comprises three main stages:

  1. Create a pipeline in the Worker Factory. This will be invoked by the Master Factory.
  2. Create a Master Pipeline with the Webhook activity to invoke the pipeline on the Worker Factory through the REST API Pipelines – Create Run.
  3. Test and run the Master Pipeline and verify the results.

 

Stage 1: Create a Pipeline in the Worker Data Factory

  1. Within the Azure Portal, select the Worker Data Factory and, from the Overview blade, select Author and Monitor.
  2. On the Let’s get started page, switch to the Author tab in the left panel.
  3. Select Pipelines from the Factory Resources.
  4. Create a new pipeline and, in the Name field, call this InvokedPipeline2.
  5. Create a new parameter in InvokedPipeline2 called callBackUri, and set the Type to String.
  6. Within Activities, select General and add a Wait activity to the pipeline. This Wait activity will simulate some type of data movement or transformation activity; you are free to replace it with any type of activity you wish.
  7. Select the Wait activity created in the pipeline and, within the General settings, name this Sample Wait activity.
  8. Within the Settings of the Sample Wait activity, set the Wait time in seconds to 30. This should provide enough time to demonstrate that the webhook is waiting for this activity to complete when invoking this pipeline from the Master Factory.
  9. Within Activities, select General and add a Web activity to the pipeline, connected to run after the Wait activity. The Web activity will be used to make the callback to the Master Factory.
  10. Within the Web activity, set the URL to the following dynamic content:
@pipeline().parameters.callBackUri
  11. Within the Web activity, set the Method to POST and the Body to {}, as we are not returning any parameters at the moment.
  12. Select Publish All to save the pipeline to the Data Factory.

Note: Only published pipelines can be invoked on the Worker Data Factory through the API; they cannot be invoked in Debug mode. Be sure to save changes to your pipelines in the Worker Data Factory by publishing them.

 

Stage 2: Create a Pipeline in the Master Data Factory

As in the Web Activity section, the parameters used for the Webhook are based upon environmental values such as the Data Factory and resource group names. We recommend building your configuration first, using the table below as a guide:

Property: Name
Description: Name of the Webhook activity.
Value: Run_Pipeline2

Property: Method
Description: REST API method for the target endpoint.
Value: POST

Property: URL
Description: The target endpoint path, constructed from the following properties of the Worker Data Factory:

·       {SubscriptionId} – the subscription ID of the Worker Data Factory.

·       {ResourceGroupName} – the resource group name of the Worker Data Factory.

·       {FactoryName} – the name of the Worker Data Factory.

·       {PipelineName} – the name of the pipeline to be invoked, i.e. InvokedPipeline2 (created in Stage 1).

Value: these parameters can be used in the following URL, replacing each <parameter> with your value:

https://management.azure.com/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.DataFactory/factories/<factory-name>/pipelines/<pipeline-name>/createRun?api-version=2018-06-01

Property: Headers
Description: Headers that are sent with the request.
Value: Name: Content-Type; Value: application/json

Property: Body
Description: The payload that is sent to the endpoint. This can be used to pass parameters between pipelines; see the section Body Parameters between Pipelines below.
Value: {}

Property: Timeout
Description: The time within which the webhook must be called back. The activity fails if the callback has not occurred within this timeframe.
Value: 00:10:00 (10 minutes)

Property: Authentication (Advanced)
Description: Authentication method used for calling the endpoint.
Value: MSI, with Resource set to https://management.core.windows.net/

Property: Report status on callback
Description: When enabled, the invoked pipeline can pass back a status code and error message in the callback body to report a success or fail status.
Value: False (or see the note below)

 

Note: Should you wish to control the status code (success/fail) returned by the invoked pipeline, enable this flag. Additional parameters can also be returned from the Invoked Pipeline to the Master Pipeline – to demonstrate this, enter the following JSON into the Web activity body payload at step 11 of Stage 1 above:

{"Output":{"testProp":"testPropValue"},"StatusCode":"200"}

Later in the section entitled Body Parameters between Pipelines of this blog, we’ll discuss how to consume this parameter from the Master Pipeline.

Now using these parameters, configure the Webhook activity:

  1. Within the Azure Portal, select the Master Data Factory and, from the Overview blade, select Author and Monitor.
  2. On the Let’s get started page, switch to the Author tab in the left panel.
  3. Select Pipelines from the Factory Resources.
  4. Create a new pipeline and, in the Name field, call this MasterPipeline2.
  5. Within Activities, select General and add a Webhook activity to the pipeline. This Webhook activity will be used to invoke the pipeline in the Worker Data Factory.
  6. Select the Webhook activity created in step 5 and, in the General tab, replace the Name with Run_Pipeline2.
  7. Select the Settings tab and replace these settings with the corresponding parameters from the table above.
  8. Select Publish All to save the pipeline to the Data Factory.

Body Parameters between Pipelines

The body parameter facilitates the passing of additional parameters, such as filters or control flags, from the Master to the Worker; these can be read by the Worker pipeline as parameters. Note that the callBackUri is generated automatically and appended to the body payload upon execution.

For instance, if {"myvar":"test"} is added to the body value in the Master and myvar is added as a parameter to the Worker pipeline, we would observe it passed through as execution parameters in the Monitor section.
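In other words, the Worker run receives a body along the following lines (illustrative; the callBackUri value is generated per run):

{"myvar":"test","callBackUri":"https://dpwesteurope.svc.datafactory.azure.com/dataplane/workflow/callback/<guid>..."}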

{"myvar":"test"} added to the body variable in the Master

Similarly, from the Worker pipeline we can pass data back to the Master pipeline and access it using a dynamic content snippet, e.g.:

@activity('WebHook').output.testProp

where ‘WebHook’ represents the name of the activity that initiated the call-back, which in our example is Run_Pipeline2.

 

Stage 3: Testing the orchestration Pipelines

We are now ready to test the webhook method of orchestration. To do this we will be initiating the MasterPipeline2 in the Master Data Factory, which will then call the InvokedPipeline2 in the Worker Data Factory, using the Data Factory Monitor to review the results.

  1. Within the Master Factory, open MasterPipeline2.
  2. Select Add Trigger and select Trigger Now. This will run the pipeline MasterPipeline2.
  3. To view the results of the pipeline, select the Monitor tab. The pipeline should have completed successfully after roughly 30 seconds, which was the wait time specified in the invoked pipeline.
  4. Within the Worker Factory, select the Monitor tab. The InvokedPipeline2 pipeline should have successfully started in the Worker Data Factory and completed in just over 30 seconds. This demonstrates that, once invoked, MasterPipeline2 waits for completion of InvokedPipeline2.

Additionally, we may also want to check the callBackUri parameter, which is dynamically generated at run time. If we inspect the execution parameters of the Worker Factory run in the monitoring pane, we will see the callBackUri provided by the Master Factory, for example:

https://dpwesteurope.svc.datafactory.azure.com/dataplane/workflow/callback/<guid>&callbackUrl=<longrandomstring>&activityRunId=<guid>&shouldReportToMonitoring=True&activityType=WebHook

 

Next Steps: Handling more complex scenarios

If you have followed this guide, you will have successfully demonstrated cross data factory orchestration using both the Web and Webhook activities. So far we have demonstrated this using very simple examples, but organisations will often have more complex orchestration requirements whereby:

  • Multiple pipelines need to run in a set sequence
  • Each pipeline will run for a different duration of time
  • Downstream pipelines depend on the completion of the upstream pipelines.

For example a more complex flow may look like the following:

Figure 6 – More complex orchestration – “the diamond problem”

Using the Webhook activity, the Master Factory will wait for the successful execution of each invoked pipeline before moving on to the next step, so the control flow is still honoured.

To simulate this scenario, make four clones of the Worker pipeline, suffix them 1 to 4, and then Publish. Edit the Wait activity of pipeline 3 to run longer than the wait time of pipeline 2. This will allow us to verify that step 4 in the Master waits for both 2 and 3 to complete.

Then, in the Master, create a clone of the existing pipeline and create three additional copies of the Webhook activity, renaming them accordingly. Ensure each activity’s URL invokes its associated Worker pipeline.
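As a rough sketch, the resulting dependency wiring in the Master pipeline’s JSON (Code view) would look like the following, with activity bodies trimmed and names hypothetical:

"activities": [
    { "name": "Run_Worker1", "type": "WebHook" },
    { "name": "Run_Worker2", "type": "WebHook",
      "dependsOn": [ { "activity": "Run_Worker1", "dependencyConditions": [ "Succeeded" ] } ] },
    { "name": "Run_Worker3", "type": "WebHook",
      "dependsOn": [ { "activity": "Run_Worker1", "dependencyConditions": [ "Succeeded" ] } ] },
    { "name": "Run_Worker4", "type": "WebHook",
      "dependsOn": [ { "activity": "Run_Worker2", "dependencyConditions": [ "Succeeded" ] },
                     { "activity": "Run_Worker3", "dependencyConditions": [ "Succeeded" ] } ] }
]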

Run this and notice how activity 4 only runs after activities 2 and 3 complete.