Overcoming Event-Driven Limitations: Batching Triggered Pipeline Runs in Azure Synapse
Azure Synapse Pipelines and Azure Data Factory Pipelines provide a flexible way to orchestrate data movement and transformation. Event-driven triggers are a useful feature, allowing pipelines to start automatically as soon as data lands in a storage account. This "just-in-time" processing is often ideal for responsive data ingestion and near-real-time analytics - something that's heavily pushed via the various "Synapse Link for..." integrations.
However, there are scenarios where a purely event-driven approach isn't the optimal fit. What if you have a high volume of small, frequent events, and you'd prefer to process them in larger, more efficient batches? Or perhaps you need to ensure that a pipeline, once triggered, doesn't immediately re-trigger when subsequent events arrive within a very short window, leading to an unwanted cascade of runs. This is where the limitations of the out-of-the-box event-based triggers become apparent: they don't natively support batching.
Frequent, bursty pipeline triggers can cause numerous problems. When multiple external events (file arrivals, API calls, whatever your trigger source) try to kick off the same pipeline in short succession, you can end up with a queue of pipeline runs all doing the same thing, maybe with slight variations of the data being processed. Depending on the pipeline's complexity and the resources it consumes, this can lead to:
- Long Delays: If five updates arrive within a short period and each pipeline run takes 10 minutes, the runs queue up and you'll be waiting a long time to see the latest outputs
- Race Conditions: If your pipeline isn't designed to be idempotent, concurrent runs can lead to data inconsistencies
- Resource Contention: Multiple runs fighting for the same compute, storage, or API limits
- Performance Degradation: Everything slows down because the system is busy processing largely redundant work
- Unnecessary Costs: Paying for multiple parallel runs when a single, batched run would suffice
In this post, I'm going to describe a reusable pattern that solves the problems described above, using out-of-the-box pipeline activities to achieve "batched triggered" pipeline runs in Azure Synapse Pipelines; the same approach applies equally in Azure Data Factory. In a subsequent post, I'll describe how the pattern can be adapted for Microsoft Fabric Data Pipelines.
The Batched Trigger Orchestrator Pattern
The core idea behind this pattern is to introduce an "orchestrator" pipeline that is called from a parent "workload" pipeline. When the orchestrator runs, it checks for any other pending, queued runs of the same parent workload pipeline. If it finds any, it signals to the parent pipeline to defer its own execution, handing over to the next triggered run in the queue. By specifying a configurable delay window, the net effect is that we control how frequently an accumulated "batch" of triggered workload pipeline runs is actually executed.
For the approach to work, the parent workload pipeline must be restricted to a single run at a time using the pipeline concurrency setting. This forces a queue of triggered runs to build up, so that the orchestrator can decide which runs to execute and which to skip. The parent workload pipeline receives a response from the orchestrator telling it whether to proceed or exit (quickly), and reacts accordingly.
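As a reminder of where that setting lives, `concurrency` is a top-level property of the pipeline definition (it's also exposed via the pipeline's settings in Synapse Studio). A minimal fragment, with an illustrative pipeline name:
{
  "name": "My Workload Pipeline",
  "properties": {
    "concurrency": 1,
    "activities": []
  }
}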
Before we take a look at the pipeline definition, let's start with some scenarios.
Scenario 1: Single triggered pipeline (No Queued Runs)
In the simplest case, a single event triggers the pipeline and no other events arrive within the batching window, so the pipeline executes as expected.
Scenario 2: Multiple triggered events (Effective Batching)
In this scenario, several events arrive in quick succession. The orchestrator allows only the most recent instance of the workload pipeline to proceed with the work, effectively batching it.
Scenario 3: Events spread out (Multiple Batches)
This scenario shows what happens when multiple triggered events arrive with sufficient gaps between them, leading to multiple successful runs of the workload pipeline.
Pipeline Definition: Batched Orchestrator
Here's the JSON definition for the "Batched Trigger Orchestrator" pipeline that can be used by any workload pipeline that needs to implement this pattern. This pipeline is entirely reusable and generic:
{
"name": "Batched Trigger Orchestrator",
"properties": {
"activities": [
{
"name": "Initial pipeline queue check",
"type": "WebActivity",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"method": "POST",
"url": {
"value": "https://@{pipeline().DataFactory}.dev.azuresynapse.net//queryPipelineRuns?api-version=2020-12-01",
"type": "Expression"
},
"connectVia": {
"referenceName": "AutoResolveIntegrationRuntime",
"type": "IntegrationRuntimeReference"
},
"body": {
"value": "{\n \"lastUpdatedAfter\": \"@{pipeline().parameters.CallingPipelineTriggerTime}\",\n \"lastUpdatedBefore\": \"@{utcNow()}\",\n \"filters\": [\n {\n \"operand\": \"PipelineName\",\n \"operator\": \"Equals\",\n \"values\": [\n \"@{pipeline()?.TriggeredByPipelineName}\"\n ]\n },\n {\n \"operand\": \"Status\",\n \"operator\": \"Equals\",\n \"values\": [\n \"Queued\"\n ]\n }\n ]\n}\n",
"type": "Expression"
},
"authentication": {
"type": "MSI",
"resource": "https://dev.azuresynapse.net/"
}
}
},
{
"name": "If no queued runs then delay",
"type": "IfCondition",
"dependsOn": [
{
"activity": "Update queued run count",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@greater(length(activity('Initial pipeline queue check').output.value), 0) ",
"type": "Expression"
},
"ifFalseActivities": [
{
"name": "Delay",
"type": "Wait",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"waitTimeInSeconds": {
"value": "@pipeline().parameters.DelayInSeconds",
"type": "Expression"
}
}
},
{
"name": "Delayed pipeline queue check",
"type": "WebActivity",
"dependsOn": [
{
"activity": "Delay",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"method": "POST",
"url": {
"value": "https://@{pipeline().DataFactory}.dev.azuresynapse.net//queryPipelineRuns?api-version=2020-12-01",
"type": "Expression"
},
"connectVia": {
"referenceName": "AutoResolveIntegrationRuntime",
"type": "IntegrationRuntimeReference"
},
"body": {
"value": "{\n \"lastUpdatedAfter\": \"@{pipeline().parameters.CallingPipelineTriggerTime}\",\n \"lastUpdatedBefore\": \"@{utcNow()}\",\n \"filters\": [\n {\n \"operand\": \"PipelineName\",\n \"operator\": \"Equals\",\n \"values\": [\n \"@{pipeline()?.TriggeredByPipelineName}\"\n ]\n },\n {\n \"operand\": \"Status\",\n \"operator\": \"Equals\",\n \"values\": [\n \"Queued\"\n ]\n }\n ]\n}\n",
"type": "Expression"
},
"authentication": {
"type": "MSI",
"resource": "https://dev.azuresynapse.net/"
}
}
},
{
"name": "Update queued run count again",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Delayed pipeline queue check",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "QueuedRunCount",
"value": {
"value": "@length(activity('Delayed pipeline queue check').output.value) ",
"type": "Expression"
}
}
}
]
}
},
{
"name": "Update queued run count",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Initial pipeline queue check",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "QueuedRunCount",
"value": {
"value": "@length(activity('Initial pipeline queue check').output.value) ",
"type": "Expression"
}
}
},
{
"name": "If any queued runs then cancel execution",
"type": "IfCondition",
"dependsOn": [
{
"activity": "If no queued runs then delay",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@greater(variables('QueuedRunCount'), 0)",
"type": "Expression"
},
"ifFalseActivities": [
{
"name": "Continue execution",
"type": "SetVariable",
"dependsOn": [],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "pipelineReturnValue",
"value": [
{
"key": "ContinueExecution",
"value": {
"type": "Boolean",
"content": true
}
}
],
"setSystemVariable": true
}
}
],
"ifTrueActivities": [
{
"name": "Cancel execution",
"type": "SetVariable",
"dependsOn": [],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "pipelineReturnValue",
"value": [
{
"key": "ContinueExecution",
"value": {
"type": "Boolean",
"content": false
}
}
],
"setSystemVariable": true
}
}
]
}
}
],
"parameters": {
"DelayInSeconds": {
"type": "int",
"defaultValue": 1
},
"CallingPipelineTriggerTime": {
"type": "string"
}
},
"variables": {
"QueuedRunCount": {
"type": "Integer",
"defaultValue": 0
}
},
"annotations": []
}
}
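The escaped `body` expression above is hard to read in its raw form, so here's what the rendered queryPipelineRuns request payload looks like at runtime (the timestamps and pipeline name are illustrative):
{
  "lastUpdatedAfter": "2025-01-01T10:00:00Z",
  "lastUpdatedBefore": "2025-01-01T10:00:30Z",
  "filters": [
    {
      "operand": "PipelineName",
      "operator": "Equals",
      "values": [ "Example Batched Trigger Pipeline" ]
    },
    {
      "operand": "Status",
      "operator": "Equals",
      "values": [ "Queued" ]
    }
  ]
}
The API responds with a `value` array of matching pipeline runs, which is why the orchestrator's expressions count queued runs with `length(activity('...').output.value)`. An abridged response might look like this (real responses carry additional run metadata):
{
  "value": [
    {
      "runId": "00000000-0000-0000-0000-000000000000",
      "pipelineName": "Example Batched Trigger Pipeline",
      "status": "Queued"
    }
  ]
}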
Pipeline Definition: Example workload pipeline, using the Batched Orchestrator pattern
Here's the JSON definition of an example pipeline that uses the batched orchestrator. Note the following important points:
- Concurrency is set to 1
- The return value from executing the "Batched Trigger Orchestrator" pipeline is checked to determine whether to proceed with the main pipeline workload or to exit processing.
{
"name": "Example Batched Trigger Pipeline",
"properties": {
"activities": [
{
"name": "Orchestrate Batched Triggers",
"type": "ExecutePipeline",
"dependsOn": [],
"policy": {
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"pipeline": {
"referenceName": "Batched Trigger Orchestrator",
"type": "PipelineReference"
},
"waitOnCompletion": true,
"parameters": {
"DelayInSeconds": 20,
"CallingPipelineTriggerTime": {
"value": "@pipeline().TriggerTime",
"type": "Expression"
}
}
}
},
{
"name": "Check if should continue execution",
"type": "IfCondition",
"dependsOn": [
{
"activity": "Orchestrate Batched Triggers",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@equals(activity('Orchestrate Batched Triggers').output.pipelineReturnValue.ContinueExecution, true)",
"type": "Expression"
},
"ifFalseActivities": [
{
"name": "Cancel execution",
"type": "SetVariable",
"dependsOn": [],
"policy": {
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"variableName": "pipelineReturnValue",
"value": [
{
"key": "Result",
"value": {
"type": "String",
"content": "Cancelling execution due to batched trigger orchestration"
}
}
],
"setSystemVariable": true
}
}
],
"ifTrueActivities": [
{
"name": "Do work",
"type": "Wait",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"waitTimeInSeconds": 30
}
}
]
}
}
],
"concurrency": 1,
"variables": {
"QueuedRunCount": {
"type": "Integer",
"defaultValue": 0
}
},
"annotations": []
}
}
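For completeness, here's a sketch of what a storage event trigger wired up to this workload pipeline could look like; the trigger name, blob path, and storage account scope are illustrative placeholders rather than part of the pattern:
{
  "name": "On File Arrival",
  "properties": {
    "type": "BlobEventsTrigger",
    "typeProperties": {
      "blobPathBeginsWith": "/landing/blobs/incoming/",
      "ignoreEmptyBlobs": true,
      "scope": "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/<storage-account>",
      "events": [
        "Microsoft.Storage.BlobCreated"
      ]
    },
    "pipelines": [
      {
        "pipelineReference": {
          "referenceName": "Example Batched Trigger Pipeline",
          "type": "PipelineReference"
        }
      }
    ]
  }
}
Because the pipeline's `concurrency` is 1, any burst of blob-created events simply queues runs behind the active one, which is exactly the condition the orchestrator exploits.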
How it Works: A Step-by-Step Breakdown
- Initial Queue Check: The "Initial pipeline queue check" `WebActivity` queries the Synapse REST API, looking for any "Queued" runs of the specific calling pipeline (identified by `TriggeredByPipelineName`) that were last updated after the calling pipeline was triggered.
- Count Queued Runs: The "Update queued run count" `SetVariable` activity takes the results from the initial API response and populates a pipeline variable, `QueuedRunCount`, with the number of queued instances found.
- Conditional Delay (If No Initial Queue): The "If no queued runs then delay" `IfCondition` activity evaluates whether `QueuedRunCount` is zero. If no queued runs are found:
  - It introduces a delay (a `Wait` activity) for a configurable `DelayInSeconds` (defaulting to 1 second). This is how we achieve the micro-batching strategy.
  - After the delay, the "Delayed pipeline queue check" (another `WebActivity`) re-queries the Synapse API for queued runs, to see whether any other pipeline runs were triggered (and queued) during the delay.
  - The `QueuedRunCount` variable is then updated with this fresh count via "Update queued run count again".
- Continue or Cancel Execution: The final "If any queued runs then cancel execution" `IfCondition` activity makes its decision based on the latest `QueuedRunCount`:
  - If `QueuedRunCount` is greater than zero, the "Cancel execution" `SetVariable` activity sets the pipeline return value `ContinueExecution` to `false`. This signals to the parent pipeline that it should not proceed, effectively deferring the work to the most recently queued run.
  - If `QueuedRunCount` is zero, the "Continue execution" `SetVariable` activity sets `ContinueExecution` to `true`, indicating that the calling pipeline can safely continue its execution, as there are no other instances currently waiting.
To see how this plays out, suppose `DelayInSeconds` is 20 and five triggers fire within 10 seconds: the first four runs each find a queued successor and exit almost immediately, while the fifth finds an empty queue, waits 20 seconds, confirms nothing new has arrived, and proceeds to process the batch.
Considerations
The pattern described in this post allows you to decouple event-driven triggering from the actual workload processing. The `DelayInSeconds` parameter is key to defining your batching window: experiment with this value to find the right balance between responsiveness and batching efficiency for your specific use case.
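For example, to widen the batching window to five minutes, override the parameter where the workload pipeline invokes the orchestrator. This is the `typeProperties` fragment of the "Orchestrate Batched Triggers" activity shown earlier, with only the delay value changed:
{
  "pipeline": {
    "referenceName": "Batched Trigger Orchestrator",
    "type": "PipelineReference"
  },
  "waitOnCompletion": true,
  "parameters": {
    "DelayInSeconds": 300,
    "CallingPipelineTriggerTime": {
      "value": "@pipeline().TriggerTime",
      "type": "Expression"
    }
  }
}
Bear in mind that the orchestrator is executed with `waitOnCompletion` set to `true`, so the workload run holds its single concurrency slot for the whole delay window; a longer window therefore means a longer minimum latency before any batch is processed.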
Remember, the "Batched Trigger Orchestrator" pipeline orchestrates the running of the parent pipeline, which is where your actual data transformation and workload logic would reside, only executing based on the instruction from the orchestrator.
Full code samples and pipeline template can be found at: https://github.com/endjin/data-pipeline-patterns