By James Broome, Director of Engineering
Batch Processing Triggered Pipeline Runs in Azure Synapse

Overcoming Event-Driven Limitations: Batching Triggered Pipeline Runs in Azure Synapse

Azure Synapse Pipelines and Azure Data Factory Pipelines provide a flexible way to orchestrate data movement and transformation. Event-driven triggers are a useful feature, allowing pipelines to start automatically as soon as data lands in a storage account. This "just-in-time" processing is often ideal for responsive data ingestion and near-real-time analytics - something that's heavily pushed via the various "Synapse Link for..." integrations.

However, there are scenarios where a purely event-driven approach isn't quite the optimal fit. What if you have a high volume of small, frequent events, and you'd prefer to process them in larger, more efficient batches? Or perhaps you need to ensure that a pipeline, once triggered, doesn't immediately re-trigger if subsequent events arrive within a very short window, leading to an unwanted cascade of runs. This is where the limitations of the out-the-box event-based triggers can become apparent – they don't natively support batching.


Frequent, bursty pipeline triggers can cause numerous problems. When multiple external events (file arrivals, API calls, whatever your trigger source) try to kick off the same pipeline in short succession, you can end up with a queue of pipeline runs all doing the same thing, maybe with slight variations of the data being processed. Depending on the pipeline's complexity and the resources it consumes, this can lead to:

  • Long Delays: If 5 updates happened within a short period, and your pipeline takes 10 mins to run, you'll be waiting a long time to see the latest outputs
  • Race Conditions: If your pipeline isn't designed to be idempotent, concurrent runs can lead to data inconsistencies
  • Resource Contention: Multiple runs fighting for the same compute, storage, or API limits
  • Performance Degradation: Everything slows down because the system is overloaded
  • Unnecessary Costs: Paying for multiple parallel runs when a single, batched run would suffice
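The "Long Delays" bullet is worth making concrete. A quick back-of-envelope calculation, using the post's example numbers and a deliberately simplified model (runs serialized by the concurrency limit, each processing essentially the same data):

```python
# Illustrative arithmetic only: 5 events arriving in a burst, each queueing
# its own full 10-minute run, versus one batched run covering all 5.
RUN_MINUTES = 10   # time for one pipeline run
EVENTS = 5         # events arriving in a short burst

naive_wait = EVENTS * RUN_MINUTES  # every event queues its own full run
batched_wait = RUN_MINUTES         # one batched run covers all five events

print(f"naive: {naive_wait} min to latest output, batched: {batched_wait} min")
```

With naive triggering you wait 50 minutes to see output reflecting the fifth event; with batching, 10.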

In this post, I'm going to describe a reusable pattern that solves the problems described above, using out-the-box pipeline activities to achieve "batched triggered" pipeline runs in Azure Synapse Pipelines (and can equally be applied in Azure Data Factory). In a subsequent post, I'll describe how the same pattern can be adapted for Microsoft Fabric Data Pipelines.

The Batched Trigger Orchestrator Pattern

The core idea behind this pattern is to introduce an "orchestrator" pipeline that is called from a parent "workload" pipeline. Upon the orchestrator being triggered, it checks for any other pending, queued runs of the same parent workload pipeline. If it finds them, it effectively signals to the parent pipeline to defer its own execution, passing on to the next triggered pipeline run in the queue. By specifying a configurable delay window, the net effect is that we can control how frequently a "batch" of triggered workload pipeline runs that have accumulated are actually executed.


For the approach to work, the parent workload pipeline needs to be set to only run once at a time using the pipeline concurrency settings. This forces the queue of trigger runs to build up, so that the orchestrator can decide which runs to execute and which runs to ignore. The parent workload pipeline receives the response from the orchestrator telling it whether to proceed or exit (quickly), and reacts accordingly.
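The decision flow above can be sketched in a few lines of plain Python. This is my own illustrative code, not part of the pipeline; `query_queued_runs` stands in for the Synapse API query that the real orchestrator performs:

```python
# Minimal sketch of the orchestrator's decision logic (illustrative only).
# query_queued_runs is an assumed helper returning the queued runs of the
# calling pipeline since its trigger time.
import time

def should_continue(delay_seconds, query_queued_runs) -> bool:
    """True if this run should do the work, False if it should defer."""
    # If runs are already queued behind this one, defer immediately.
    if query_queued_runs():
        return False
    # Otherwise wait out the batching window...
    time.sleep(delay_seconds)
    # ...and re-check: any run queued during the window supersedes this one.
    return not query_queued_runs()
```

With concurrency set to 1 on the workload pipeline, the last run in a burst is the only one for which both checks come back empty, so it alone proceeds.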

Before we take a look at the pipeline definition, let's start with some scenarios.

Scenario 1: Single triggered pipeline (No Queued Runs)

In the simplest case, where a single event triggers the pipeline, and no other events arrive within the batching window, then the pipeline executes as expected:

sequenceDiagram
    participant D1 as Data Source
    participant E1 as Storage Event Trigger
    participant P1 as Workload Pipeline
    participant O1 as Batched Trigger Orchestrator Pipeline
    participant SynapseAPI as Synapse API
    participant P1_Work as Pipeline activities
    D1->>E1: New event occurs (e.g., file arrival)
    E1->>P1: Workload Pipeline is triggered
    P1->>O1: Execute orchestration pipeline (20 second delay)
    O1->>SynapseAPI: Query Synapse API for queued Workload Pipeline runs since trigger time
    SynapseAPI-->>O1: Returns empty list
    O1->>O1: Delay for 20 seconds
    Note over O1: No other Workload Pipeline runs initiated during 20 seconds
    O1->>SynapseAPI: After 20s, query Synapse API for queued Workload Pipeline runs between trigger time and +20 seconds
    SynapseAPI-->>O1: Returns empty list
    O1-->>P1: Signals result: Continue Execution
    P1->>P1: Checks result
    P1->>P1_Work: Execute workload
    P1_Work-->>P1: Work completes
    P1-->>P1: Workload Pipeline completes

Scenario 2: Multiple triggered events (Effective Batching)

In this scenario, several events arrive in quick succession, demonstrating how the orchestrator allows only one of the workload pipeline runs (the most recent) to proceed, effectively batching the work.

sequenceDiagram
    participant D1 as Data Source
    participant E1 as Storage Event Trigger
    participant P1 as Workload Pipeline (Run 1)
    participant O1 as Batched Trigger Orchestrator Pipeline (Run 1)
    participant P2 as Workload Pipeline (Run 2)
    participant O2 as Batched Trigger Orchestrator Pipeline (Run 2)
    participant P3 as Workload Pipeline (Run 3)
    participant O3 as Batched Trigger Orchestrator Pipeline (Run 3)
    participant SynapseAPI as Synapse API
    participant P_Work as Pipeline activities
    D1->>E1: New event occurs (e.g., file arrival)
    E1->>P1: Workload Pipeline is triggered
    P1->>O1: Execute orchestration pipeline (20 second delay)
    O1->>SynapseAPI: Query Synapse API for queued Workload Pipeline runs
    SynapseAPI-->>O1: Returns empty list
    O1->>O1: Delay for 20 seconds
    D1->>E1: New event occurs (+5 seconds) (e.g., file arrival)
    E1->>P2: Workload Pipeline is queued
    D1->>E1: New event occurs (+10 seconds) (e.g., file arrival)
    E1->>P3: Workload Pipeline is queued
    O1->>SynapseAPI: After 20s, query Synapse API for queued Workload Pipeline runs between trigger time and +20 seconds
    SynapseAPI-->>O1: Returns list containing Run 2 and 3
    O1-->>P1: Signals result: Cancel Execution
    P1->>P1: Checks result
    P1-->>P1: Workload Pipeline exits
    P2->>O2: Execute orchestration pipeline (20 second delay)
    O2->>SynapseAPI: Query Synapse API for queued Workload Pipeline runs between trigger time and +20 seconds
    SynapseAPI-->>O2: Returns list containing Run 3
    O2-->>P2: Signals result: Cancel Execution
    P2->>P2: Checks result
    P2-->>P2: Workload Pipeline exits
    P3->>O3: Execute orchestration pipeline (20 second delay)
    O3->>SynapseAPI: Query Synapse API for queued Workload Pipeline runs between trigger time and +20 seconds
    SynapseAPI-->>O3: Returns empty list
    O3-->>P3: Signals result: Continue Execution
    P3->>P3: Checks result
    P3->>P_Work: Execute workload
    P_Work-->>P3: Work completes
    P3-->>P3: Workload Pipeline completes

Scenario 3: Events spread out (Multiple Batches)

This scenario shows what happens if multiple triggered events arrive with sufficient gaps, leading to multiple successful runs of the workload pipeline.

sequenceDiagram
    participant D1 as Data Source
    participant E1 as Event Trigger
    participant P1 as Workload Pipeline (Run 1)
    participant O1 as Batched Trigger Orchestrator Pipeline (Run 1)
    participant P2 as Workload Pipeline (Run 2)
    participant O2 as Batched Trigger Orchestrator Pipeline (Run 2)
    participant SynapseAPI as Synapse API
    participant P_Work as Pipeline activities
    D1->>E1: New event occurs (e.g., file arrival)
    E1->>P1: Workload Pipeline is triggered
    P1->>O1: Execute orchestration pipeline (20 second delay)
    O1->>SynapseAPI: Query Synapse API for queued Workload Pipeline runs
    SynapseAPI-->>O1: Returns empty list
    O1->>O1: Delay for 20 seconds
    O1->>SynapseAPI: After 20s, query Synapse API for queued Workload Pipeline runs between trigger time and +20 seconds
    SynapseAPI-->>O1: Returns empty list
    O1-->>P1: Signals result: Continue Execution
    P1->>P1: Checks result
    P1->>P_Work: Execute workload
    P_Work-->>P1: Work completes
    P1-->>P1: Workload Pipeline completes
    D1->>E1: New event occurs (+30 seconds) (e.g., file arrival)
    E1->>P2: Workload Pipeline is triggered
    P2->>O2: Execute orchestration pipeline (20 second delay)
    O2->>SynapseAPI: Query Synapse API for queued Workload Pipeline runs
    SynapseAPI-->>O2: Returns empty list
    O2->>O2: Delay for 20 seconds
    O2->>SynapseAPI: After 20s, query Synapse API for queued Workload Pipeline runs between trigger time and +20 seconds
    SynapseAPI-->>O2: Returns empty list
    O2-->>P2: Signals result: Continue Execution
    P2->>P2: Checks result
    P2->>P_Work: Execute workload
    P_Work-->>P2: Work completes
    P2-->>P2: Workload Pipeline completes

Pipeline Definition: Batched Orchestrator

Here's the JSON definition for the "Batched Trigger Orchestrator" pipeline that can be used by any workload pipeline that needs to implement this pattern. This pipeline is entirely reusable and generic:

{
    "name": "Batched Trigger Orchestrator",
    "properties": {
        "activities": [
            {
                "name": "Initial pipeline queue check",
                "type": "WebActivity",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "method": "POST",
                    "url": {
                        "value": "https://@{pipeline().DataFactory}.dev.azuresynapse.net/queryPipelineRuns?api-version=2020-12-01",
                        "type": "Expression"
                    },
                    "connectVia": {
                        "referenceName": "AutoResolveIntegrationRuntime",
                        "type": "IntegrationRuntimeReference"
                    },
                    "body": {
                        "value": "{\n  \"lastUpdatedAfter\": \"@{pipeline().parameters.CallingPipelineTriggerTime}\",\n  \"lastUpdatedBefore\": \"@{utcNow()}\",\n  \"filters\": [\n    {\n      \"operand\": \"PipelineName\",\n      \"operator\": \"Equals\",\n      \"values\": [\n        \"@{pipeline()?.TriggeredByPipelineName}\"\n      ]\n    },\n    {\n      \"operand\": \"Status\",\n      \"operator\": \"Equals\",\n      \"values\": [\n        \"Queued\"\n      ]\n    }\n  ]\n}\n",
                        "type": "Expression"
                    },
                    "authentication": {
                        "type": "MSI",
                        "resource": "https://dev.azuresynapse.net/"
                    }
                }
            },
            {
                "name": "If no queued runs then delay",
                "type": "IfCondition",
                "dependsOn": [
                    {
                        "activity": "Update queued run count",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@greater(length(activity('Initial pipeline queue check').output.value), 0) ",
                        "type": "Expression"
                    },
                    "ifFalseActivities": [
                        {
                            "name": "Delay",
                            "type": "Wait",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "waitTimeInSeconds": {
                                    "value": "@pipeline().parameters.DelayInSeconds",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Delayed pipeline queue check",
                            "type": "WebActivity",
                            "dependsOn": [
                                {
                                    "activity": "Delay",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "method": "POST",
                                "url": {
                                    "value": "https://@{pipeline().DataFactory}.dev.azuresynapse.net/queryPipelineRuns?api-version=2020-12-01",
                                    "type": "Expression"
                                },
                                "connectVia": {
                                    "referenceName": "AutoResolveIntegrationRuntime",
                                    "type": "IntegrationRuntimeReference"
                                },
                                "body": {
                                    "value": "{\n  \"lastUpdatedAfter\": \"@{pipeline().parameters.CallingPipelineTriggerTime}\",\n  \"lastUpdatedBefore\": \"@{utcNow()}\",\n  \"filters\": [\n    {\n      \"operand\": \"PipelineName\",\n      \"operator\": \"Equals\",\n      \"values\": [\n        \"@{pipeline()?.TriggeredByPipelineName}\"\n      ]\n    },\n    {\n      \"operand\": \"Status\",\n      \"operator\": \"Equals\",\n      \"values\": [\n        \"Queued\"\n      ]\n    }\n  ]\n}\n",
                                    "type": "Expression"
                                },
                                "authentication": {
                                    "type": "MSI",
                                    "resource": "https://dev.azuresynapse.net/"
                                }
                            }
                        },
                        {
                            "name": "Update queued run count again",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Delayed pipeline queue check",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "QueuedRunCount",
                                "value": {
                                    "value": "@length(activity('Delayed pipeline queue check').output.value) ",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "Update queued run count",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "Initial pipeline queue check",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "variableName": "QueuedRunCount",
                    "value": {
                        "value": "@length(activity('Initial pipeline queue check').output.value) ",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "If any queued runs then cancel execution",
                "type": "IfCondition",
                "dependsOn": [
                    {
                        "activity": "If no queued runs then delay",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@greater(variables('QueuedRunCount'), 0)",
                        "type": "Expression"
                    },
                    "ifFalseActivities": [
                        {
                            "name": "Continue execution",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "pipelineReturnValue",
                                "value": [
                                    {
                                        "key": "ContinueExecution",
                                        "value": {
                                            "type": "Boolean",
                                            "content": true
                                        }
                                    }
                                ],
                                "setSystemVariable": true
                            }
                        }
                    ],
                    "ifTrueActivities": [
                        {
                            "name": "Cancel execution",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "pipelineReturnValue",
                                "value": [
                                    {
                                        "key": "ContinueExecution",
                                        "value": {
                                            "type": "Boolean",
                                            "content": false
                                        }
                                    }
                                ],
                                "setSystemVariable": true
                            }
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "DelayInSeconds": {
                "type": "int",
                "defaultValue": 1
            },
            "CallingPipelineTriggerTime": {
                "type": "string"
            }
        },
        "variables": {
            "QueuedRunCount": {
                "type": "Integer",
                "defaultValue": 0
            }
        },
        "annotations": []
    }
}

Pipeline Definition: Example workload pipeline using the Batched Orchestrator pattern

Here's the JSON definition of an example pipeline that uses the batched orchestrator. Note the following important points:

  1. Concurrency is set to 1
  2. The return value from executing the Batched Orchestrator Pipeline is checked to determine whether to proceed with the main pipeline workload, or to exit processing.
{
    "name": "Example Batched Trigger Pipeline",
    "properties": {
        "activities": [
            {
                "name": "Orchestrate Batched Triggers",
                "type": "ExecutePipeline",
                "dependsOn": [],
                "policy": {
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "pipeline": {
                        "referenceName": "Batched Trigger Orchestrator",
                        "type": "PipelineReference"
                    },
                    "waitOnCompletion": true,
                    "parameters": {
                        "DelayInSeconds": 20,
                        "CallingPipelineTriggerTime": {
                            "value": "@pipeline().TriggerTime",
                            "type": "Expression"
                        }
                    }
                }
            },
            {
                "name": "Check if should continue execution",
                "type": "IfCondition",
                "dependsOn": [
                    {
                        "activity": "Orchestrate Batched Triggers",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@equals(activity('Orchestrate Batched Triggers').output.pipelineReturnValue.ContinueExecution, true)",
                        "type": "Expression"
                    },
                    "ifFalseActivities": [
                        {
                            "name": "Cancel execution",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "pipelineReturnValue",
                                "value": [
                                    {
                                        "key": "Result",
                                        "value": {
                                            "type": "String",
                                            "content": "Cancelling execution due to batched trigger orchestration"
                                        }
                                    }
                                ],
                                "setSystemVariable": true
                            }
                        }
                    ],
                    "ifTrueActivities": [
                        {
                            "name": "Do work",
                            "type": "Wait",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "waitTimeInSeconds": 30
                            }
                        }
                    ]
                }
            }
        ],
        "concurrency": 1,
        "variables": {
            "QueuedRunCount": {
                "type": "Integer",
                "defaultValue": 0
            }
        },
        "annotations": []
    }
}

How it Works: A Step-by-Step Breakdown

  1. Initial Queue Check:
  • The "Initial pipeline queue check" is a WebActivity that queries the Synapse API.
  • It looks for any "Queued" runs of the specific calling pipeline (identified by TriggeredByPipelineName) that started after the calling pipeline was triggered.
  2. Count Queued Runs:
  • The "Update queued run count" SetVariable activity takes the results from the initial API response.
  • It populates a pipeline variable, QueuedRunCount, with the number of queued instances found.
  3. Conditional Delay (If No Initial Queue):
  • The "If no queued runs then delay" IfCondition activity evaluates whether QueuedRunCount is zero.
  • If no queued runs are found:
    • It introduces a Delay (a Wait activity) for a configurable DelayInSeconds (defaulting to 1 second). This is how we achieve the micro-batching strategy.
    • After the delay, a "Delayed pipeline queue check" (another WebActivity) re-queries the Synapse API for queued runs, to see if any other pipeline runs were triggered (and queued) during this delay.
    • The QueuedRunCount variable is then updated with this fresh count via "Update queued run count again".
  4. Continue or Cancel Execution:
  • The final "If any queued runs then cancel execution" IfCondition activity makes the decision based on the latest QueuedRunCount.
  • If QueuedRunCount is greater than zero:
    • The "Cancel execution" SetVariable activity sets the pipeline return value ContinueExecution to false, signalling to the parent pipeline that it should not proceed and that the work is deferred to a later run in the queue.
  • If QueuedRunCount is zero:
    • The "Continue execution" SetVariable activity sets the pipeline return value ContinueExecution to true, indicating that the calling pipeline can safely continue its execution, as there are no other instances currently waiting.
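For reference, the request that both WebActivities send can be rendered as standalone Python. The helper name below is mine; only the URL shape and body mirror the pipeline definition above:

```python
# Build the queryPipelineRuns URL and JSON body used by both queue checks
# (a sketch for inspection, not a drop-in replacement for the WebActivity).
import json

def query_pipeline_runs_request(workspace, pipeline_name, trigger_time, now):
    """Return the (url, body) pair for the Synapse queryPipelineRuns call."""
    url = (f"https://{workspace}.dev.azuresynapse.net"
           f"/queryPipelineRuns?api-version=2020-12-01")
    body = {
        "lastUpdatedAfter": trigger_time,  # the calling pipeline's trigger time
        "lastUpdatedBefore": now,          # utcNow() in the pipeline expression
        "filters": [
            # Only runs of the calling pipeline...
            {"operand": "PipelineName", "operator": "Equals",
             "values": [pipeline_name]},
            # ...that are still waiting to execute.
            {"operand": "Status", "operator": "Equals",
             "values": ["Queued"]},
        ],
    }
    return url, json.dumps(body)
```

In the pipeline itself this call is authenticated with the workspace's managed identity ("MSI") against the `https://dev.azuresynapse.net/` resource, as shown in the activity's `authentication` block.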

Considerations

The pattern described in this post allows you to decouple event-driven triggering from the actual workload processing. The DelayInSeconds parameter is key to defining your batching window. Experiment with this value to find the right balance between responsiveness and batching efficiency for your specific use case.
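When experimenting with the delay window, a rough latency model helps. The assumptions below are mine, drawn from Scenario 2: the first run in a burst waits one full delay window before deferring, intermediate runs defer almost immediately, and the final run waits one more delay window before doing the work:

```python
# Back-of-envelope latency model for tuning DelayInSeconds (illustrative
# assumptions, not a guarantee of actual pipeline timings).
def approx_latency_after_burst(delay_s: int, workload_s: int) -> int:
    """Approximate seconds from the first event in a burst to finished output."""
    first_run_wait = delay_s   # run 1 delays, sees queued runs, defers
    final_run_wait = delay_s   # last run delays, sees nothing, proceeds
    return first_run_wait + final_run_wait + workload_s

# A 10-minute workload with the example 20-second batching window:
print(approx_latency_after_burst(20, 600))  # 640 seconds
```

A larger window batches more events per run but adds directly to end-to-end latency, so weigh it against how fresh your outputs need to be.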

Remember that the "Batched Trigger Orchestrator" pipeline only decides whether the parent pipeline should run. Your actual data transformation and workload logic resides in the parent pipeline, which executes it only when instructed to proceed by the orchestrator.

Full code samples and pipeline template can be found at: https://github.com/endjin/data-pipeline-patterns

FAQs

How can I prevent multiple Azure Synapse pipelines from running simultaneously when events trigger in rapid succession?
You can implement a batched trigger orchestrator pattern that queues triggered pipeline runs and only allows the most recent one to execute after a configurable delay window. This prevents resource contention and unnecessary parallel processing.

What's the best way to handle high-frequency file arrival events in Azure Synapse pipelines without overwhelming the system?
Instead of processing each file arrival immediately, you can use a batching pattern that collects multiple triggered events within a time window and processes them as a single batch. This approach reduces costs, prevents race conditions, and improves overall system performance.

How do I avoid pipeline cascades and long processing delays when multiple events trigger the same Azure Synapse pipeline quickly?
Implement a batched trigger orchestrator that uses the Azure Synapse API to check for queued pipeline runs and intelligently decides which runs should proceed versus which should be cancelled. This ensures only one optimized run processes all accumulated events rather than creating a backlog of redundant pipeline executions.

James Broome

Director of Engineering

James has spent 20+ years delivering high quality software solutions addressing global business problems, with teams and clients across 3 continents. As Director of Engineering at endjin, he leads the team in providing technology strategy, data insights and engineering support to organisations of all sizes - from disruptive B2C start-ups, to global financial institutions. He's responsible for the success of our customer-facing project delivery, as well as the capability and growth of our delivery team.