Skip to content
Ian Griffiths By Ian Griffiths Technical Fellow I
Spark dev containers: writing tests

In this third post in my series on improving feedback loops for Spark development I'll show how we enable repeatable, local automated testing of Spark code that will ultimately run in a hosted environment such as Microsoft Fabric, Databricks, or Azure Synapse.

The gap

The normal way to use Spark from Python in a hosted system such as Databricks or Microsoft Fabric is via a notebook. When you create a new Python notebook in these systems, they make a spark variable available that you can use to access the Spark cluster that these environments supply. But they typically don't provide an obvious way to write and run automated tests.

When writing Python locally in a development environment, we can use a test framework such as behave to write tests. If we're using VS Code, it offers testing functionality that lets us run tests and view the results (or you can just run tests from the command line like in the olden days). This requires the code under test to be in .py files on our local machines.

But you can't just load up a .py file into a notebook. These are two different ways of writing Python code. If I write a local .py file so I can test it locally, how am I then going to run the same code in a hosted environment such as Fabric or Synapse, which expects my Python code to live in a notebook?

In fact there are a couple of ways we can deploy .py files to a hosted environment. The approach I'm going to show in this series is to put the code into a .whl package. We can then load that into Synapse, Databricks, or Fabric, after which you can invoke the code it contains from a notebook. (Another option is to create a Spark Job Definition. This can be more complex to set up, but it can be faster. That's because it avoids the package deployment step, described in the next blog, which can be quite slow.)

So our hosted notebooks do not contain any critical logic. They just invoke methods in these packages. During development, we run those same .py files locally, typically talking to a local Spark instance as discussed in an earlier post.

In this post, I'll show how we go about creating and running the tests locally, and in the next post I'll show how to deploy this tested code to a hosted environment.

A local Spark test

The simple example I'm about to walk through uses the Spark devcontainer set up last time as the starting point. (Not only will you need the devcontainer.json and Dockerfile, you will also need to have run the pip install pyspark==3.5.0 command.)

First, here's the code I will be testing—this is the actual application logic, and it will go into a heights.py file. For now I'll put this in my workspace's root folder, although when I get to package creation in this next blog in this series, I'll add some more directory structure, and this will move. But for now, putting this in the root folder will work:

from pyspark.sql import DataFrame, functions as F

def get_average_height(df:DataFrame):
    return df.agg(F.mean('HeightInCm')).collect()[0][0]

def get_max_height(df:DataFrame):
    # Note the deliberate error
    return df.agg(F.mean('HeightInCm')).collect()[0][0]

These methods perform some very simple analysis over a table containing height information. The crucial feature is that this code gets Spark to do all the work. (I've also left a mistake in there, so I can check that the tests I'm about to write do actually work.)

I'll need a testing framework. At endjin, we are fans of BDD, so I'll install the behave package, which supports this style of testing:

pip install behave

There's a Behave VSC extension for VS code that we find useful for writing and running BDD tests, so I've also installed that. (Really, I should be updating my .devcontainer/devcontainer.json file so that this gets installed automatically, but that's out of scope for this post.)

By default, the Behave VSC extension expects us to put tests in a folder called features. So I'll create that and then create a file called features/heights.feature with this content:

Feature: statistical analysis of heights

    Background: Load height data
        Given I have loaded height data from the local CSV file 'data.csv'
    
    Scenario: mean height
        When I calculate the mean height
        Then the calculated height is 106.0cm

    Scenario: maximum height
        When I calculate the maximum height
        Then the calculated height is 183.0cm

In case you've not seen it before, this is a Gherkin file. Barry's blog describes some of the benefits in detail, but here, the most important point is that this .feature file describes the tests I'd like to perform.

The lines beginning with Given, When, and Then are steps, and the Behave test framework expects us to supply code that implements these steps. Conventionally these go in a folder called steps, which I'll create under features. I can then supply an implementation for these steps with this features/steps/height_steps.py file:

from behave import given, when, then
from behave.runner import Context
from heights import get_average_height, get_max_height

@given(
    "I have loaded height data from the local CSV file '{csv_file}'"
)
def setup_loaded_height_csv(context: Context, csv_file: str):
    context.height_df = context.spark.read.option("header", True).option("inferSchema", True).csv(csv_file)

@when("I calculate the mean height")
def calculate_mean_height(context: Context):
    context.calculated_height = get_average_height(context.height_df)

@when("I calculate the maximum height")
def calculate_mean_height(context: Context):
    context.calculated_height = get_max_height(context.height_df)

@then("the calculated height is {height_cm}cm")
def the_calculated_height_is(context: Context, height_cm: str):
    assert context.calculated_height == float(height_cm)

Note that each of the methods here has been annotated with either @given, @when, or @then, which is how Behave knows which method to execute for each step.

The setup_loaded_height_csv method here loads data from a CSV file—it is invoked by this part of the .feature file:

    Background: Load height data
        Given I have loaded height data from the local CSV file 'data.csv'

Because I've described this as Background, it gets execute before each Scenario. In other words, both of the tests that this particular .feature file defines will be begin by executing this step that loads the data.csv file.

The data.csv file it refers to is in my root project folder, and it looks like this:

Id,Name,Hobby,HeightInCm
1,Ian,Lindy Hop,183.0
2,Buzz,Falling with Style,29.0

Notice that all of the methods in my height_steps.py file refer to context.spark. Behave supplies this context object. It passes it to each step, enabling a step to put things in the context that can be used by subsequent steps. The step that loads data puts it in context.height_df. The steps that invoke the actual methods under test retrieve that context.height_df, and then store their results in context.calculated_height, which in turn is inspected by the final @when step.

But where did that context.spark object used by my initial @given step come from? Behave doesn't know anything about Spark, so it won't put that there. I need to add some code to populate that with a Spark Context.

Inside my features folder I can create a file called environment.py, which gives me a place to put code that can set up my test environment. Here's my features/environment.py:

from behave.runner import Context
from pyspark.sql import SparkSession

def before_feature(context: Context, feature):
    context.spark = SparkSession.builder.getOrCreate()

def after_feature(context, feature):
    context.spark.stop()

That before_feature method name is special: Behave looks for a method with this name, and if it finds it, it will execute it before running the steps in each .feature file. Likewise, after_feature gives us a chance clean up after all scenarios in the feature file have been executed. In this case I shut down the Spark context, because you're not meant to create multiple contexts in a single process. (Behave also recognizes before/after_scenario and before/after_all so you can run code once for each scenario, and once for the entire test run.)

I now have all that I require. Because I installed the Behave VSC extension, VS code will be showing this flask icon on the column of buttons at the left of the window to indicate that there are tests:

The flask icon in VS Code, providing access to the Testing panel

If I click that, the Testing panel shows me my two test scenarios:

VS Code's Testing panel, showing tests as a hierarchical view. At the root is a single item, 'statistical analysis of heights', and underneath that are 'mean height' and 'maximum height'.

At the top, the third button (which looks like a double 'play' icon) will run all the tests. I had already done that by the time I took this screenshot, which is why you can see that the first test succeeded (it has a green tick by it) but the second failed (shown by a red cross). If I click on the failing test, it goes to the corresponding Scenario in my .feature file, and shows the details of the failure:

The 'maximum height' scenario in the feature file. VS Code has added an 'Assertion error' box beneath this line showing a Python traceback. The traceback ends with the failing assert statement highlighted.

We can see the Python assert statement that has failed. In an ideal world I'd be able to double click that to be taken to the source file in question, but that doesn't happen with the current version of Behave VSC. It has told me that the failure is at line 19 of features/steps/height_steps.py.

I expected this error. It's because of the deliberate mistake I made in the code under test, repeated here:

def get_max_height(df:DataFrame):
    # Note the deliberate error
    return df.agg(F.mean('HeightInCm')).collect()[0][0]

I can fix this by using the correct Spark function in this aggregation (max instead of mean):

def get_max_height(df:DataFrame):
    return df.agg(F.max('HeightInCm')).collect()[0][0]

When I run the tests again after this change, they all pass.

Wait, who started Spark?

I discussed this briefly last time, but it's worth re-iterating, because access to a Spark instance from tests is very much the point here, and yet it's not completely obvious how the Spark instance starts up.

It turns out that when you create a new PySpark context with SparkSession.builder.getOrCreate(), if you don't tell it exactly how to connect to Spark it choose 'local' mode, meaning it will actually launch a local Spark instance for you.

This works, although it isn't always the fastest way to run tests. My colleague Liam and I have recently been systematically investigating all of the different ways of creating Spark contexts for testing, to work out how to optimize the dev inner loop. We will be reporting our findings at some point in the near future.

Conclusion

I now have a test suite that checks the behaviour of code that relies on Spark to implement its logic. I'm able to run this locally, meaning I don't need to get access to a Spark cluster just to run my tests.

Next time, I'll show how to package up the application logic code (the get_average_height and get_max_height methods) so that we can take the code we just tested locally and run the exact same code up in our hosted Spark environment.

Ian Griffiths

Technical Fellow I

Ian Griffiths

Ian has worked in various aspects of computing, including computer networking, embedded real-time systems, broadcast television systems, medical imaging, and all forms of cloud computing. Ian is a Technical Fellow at endjin, and 17 times Microsoft MVP in Developer Technologies. He is the author of O'Reilly's Programming C# 12.0, and has written Pluralsight courses on WPF fundamentals (WPF advanced topics WPF v4) and the TPL. He's a maintainer of Reactive Extensions for .NET, Reaqtor, and endjin's 50+ open source projects. Ian has given over 20 talks while at endjin. Technology brings him joy.