By Charlotte Gayton, Apprentice Engineer I
Working with JSON in PySpark

TL;DR A document-based format such as JSON may require a few extra steps to pivot into a tabular format. This blog talks through how explode() in PySpark can help to transform nested JSON data into a PySpark DataFrame, taking advantage of Spark clusters to increase processing speeds whilst managing your nested properties.

As part of rolling out the adoption of OpenChain across endjin's code base, we have been generating SBOM data formatted as JSON, along with metadata about CodeOps processes. We loaded the data into an endjin Synapse Azure Data Lake Store (Gen2). We then use an Azure Synapse Notebook to transform the data, and the notebook is hosted in an Azure Synapse Pipeline in order to productionise the process. We were working with deeply nested JSON, meaning it wasn't a case of a simple conversion. I explain the steps we took in this blog.

What is PySpark?

PySpark is the Python API for Apache Spark, which applies SQL-like analysis to large sets of data. The work runs on a cluster: a group of connected computing nodes that work together as a whole. The data is split into partitions so that those partitions can be processed in parallel. This makes processing much faster for large sets of data, because multiple processors are working on it simultaneously.

Example Test Data

Here I created a test C# project and imported Newtonsoft.Json as an example package.

I then used Covenant to generate an SBOM in its custom JSON format, rather than the standard CycloneDX/SPDX formats, which produces the following output:

{
  "Name": "PySparkExample",
  "Version": "0.0.0",
  "ToolVendor": "Covenant",
  "ToolVersion": "0.12.0+ec84391de1b7c13b5a4e39bfed49db7f199b0d86",
  "Components": [
    {
      "Data": "1.0.0",
      "UUID": "fd90c2b1-4450-4c72-b3f2-f395ba161749",
      "Purl": "pkg:covenant/dotnet/PySparkExample@1.0.0",
      "Name": "PySparkExample",
      "Version": "1.0.0",
      "Groups": [
        "PySparkExample.sln"
      ],
      "IsRoot": true
    },
    {
      "Data": "13.0.2",
      "UUID": "e2701915-6fcc-4a4c-aa5e-f068b43bebe4",
      "Purl": "pkg:nuget/Newtonsoft.Json@13.0.2",
      "Name": "Newtonsoft.Json",
      "Version": "13.0.2",
      "Kind": "Library",
      "Copyright": "Copyright © James Newton-King 2008",
      "Hash": {
        "Algorithm": "SHA512",
        "Content": "D743AE673BAC17FDBF53C05983DBA2FFDB99D7E6AF8CF5FE008D57AA30B6C6CA615D672C4140EEC516E529EB6AD5ACF29C20B5CC059C86F98C80865652ACDDE1"
      },
      "License": {
        "Id": "MIT",
        "Name": "MIT License",
        "Url": "https://licenses.nuget.org/MIT"
      }
    }
  ]
}

Importing the data into PySpark

Firstly we have to import the packages we will be using:

# Import only the functions used in this post, rather than everything.
from pyspark.sql.functions import col, explode

I import my data into the notebook using PySpark's spark.read:

df = spark.read.load(
    '[PATH_TO_FILE]', 
    format='json',
    multiLine=True, 
    schema=None)

df is a PySpark DataFrame, which is equivalent to a relational table in SQL. To view its contents you have to use display(); if you evaluate just df on its own, you get the schema of the table instead:

DataFrame[Components: array<struct<Copyright:string,Data:string,Groups:array<string>,Hash:struct<Algorithm:string,Content:string>,IsRoot:boolean,Kind:string,License:struct<Id:string,Name:string,Url:string>,Name:string,Purl:string,UUID:string,Version:string>>, Name: string, ToolVendor: string, ToolVersion: string, Version: string]

Displaying the DataFrame with display(df) gives this:

Table of initial results

As you can see, all of the component data sits in a single Components column. This is because the data is stored as an array, so it will need a few more steps to pull it out of this format.

Then we need to select out the metadata about the SBOM run, rather than actual information about the components.

Metadata = df.select(
    col('Name'),
    col('ToolVendor'),
    col('ToolVersion'),
    col('Version')
)

The table below shows the new Metadata DataFrame after we have selected the columns we need from the original df DataFrame.

| Name | ToolVendor | ToolVersion | Version |
|---|---|---|---|
| PySparkExample | Covenant | 0.12.0+ec84391de1b7c13b5a4e39bfed49db7f199b0d86 | 0.0.0 |

Now we want to work on getting the components data out of the original df table. We take the same approach with the select as we did with the metadata.

First we take the 'Components' column and, because all the data is stored in an array, we explode the values. explode() creates a new row for each element in the array and, unless you give it an alias, puts the result in a column named 'col'. Because we only had 2 separate components we are left with 2 rows:

Exploded Table

You can see within each row are individual key-value pairs and more embedded JSON. Because our data doesn't follow a set schema, is different for each component, and not each one needs exploding, we can just select out each part of our information again using select.

ExpandedComponents = Components.select(
    col('col').getItem('Name').alias('Name'),
    col('col').getItem('Purl').alias('Purl'),
    col('col').getItem('UUID').alias('UUID'),
    col('col').getItem('License').getItem('Id').alias('License_ID'),
    col('col').getItem('License').getItem('Name').alias('License_Name'),
    col('col').getItem('License').getItem('Url').alias('License_Url'),
    col('col').getItem('Version').alias('Version'),
    col('col').getItem('IsRoot').alias('isRoot'),  # key is 'IsRoot' in the JSON
    col('col').getItem('Data').alias('Data'),
    col('col').getItem('Kind').alias('Kind'),
    col('col').getItem('Hash').getItem('Algorithm').alias('Hash_Algorithm'),
    col('col').getItem('Hash').getItem('Content').alias('Hash_Content'),
    col('col').getItem('Copyright').alias('Copyright'),
)

Above you can see we used col('col') to reference the column: at this point all the data is in the single column of Components, called 'col'. To get each item within the column we use getItem() with the name of the key, followed by alias() to rename the resulting column. The reason we don't want to use a loop to do this is that we'd lose the advantage of being able to process the data quickly, in parallel, across the cluster's partitions. By using explode() and these getItem() calls, the work stays inside Spark when the notebook runs, so it can be distributed across the cluster.

| Name | Purl | UUID | License_ID | License_Name | License_Url | Version | isRoot | Data | Kind | Hash_Algorithm | Hash_Content | Copyright |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| PySparkExample | pkg:covenant/dotnet/PySparkExample@1.0.0 | fd90c2b1-4450-4c72-b3f2-f395ba161749 | | | | 1.0.0 | true | 1.0.0 | | | | |
| Newtonsoft.Json | pkg:nuget/Newtonsoft.Json@13.0.2 | e2701915-6fcc-4a4c-aa5e-f068b43bebe4 | MIT | MIT License | https://licenses.nuget.org/MIT | 13.0.2 | | 13.0.2 | Library | SHA512 | D743AE673BAC17FDBF53C05983DBA2FFDB99D7E6AF8CF5FE008D57AA30B6C6CA615D672C4140EEC516E529EB6AD5ACF29C20B5CC059C86F98C80865652ACDDE1 | Copyright © James Newton-King 2008 |

As you can see now, all of our data has been expanded out from its nested form, meaning it's much easier to analyse. From here you can save it back to the data lake as a Parquet file, or pass it into a pipeline as the next stage of a process.

To save to parquet:

ExpandedComponents.write.parquet(
    "[PATH_TO_FILE]", 
    mode="overwrite")

You can also partition the output by Name, which means each unique name is written into its own folder. Partitioning can increase performance whilst minimising query costs, because queries that filter on the partition column only need to read the matching folders. In this example it isn't that useful because we only have a small dataset, but with a bigger dataset this could be useful for splitting up your data.

Conclusion

When using PySpark, take care when writing your code to check whether you are making the most of the Spark cluster and its partitioned, parallel processing. Working with JSON in Python can be tricky when you have deeply nested values, but using the right tools can make it a lot easier.

Charlotte Gayton

Charlotte was seconded to endjin from September 2022 until August 2023 for her Industrial Placement year, as part of her BEng in Computer Science at the University of York.

She was part of the summer 2021 internship cohort, and worked on creating a synthetic customer data generation tool to create statistically accurate "fake" data which could be used to create large volumes of realistic data for testing scenarios for endjin's customers.

During her year-long Industrial Placement at endjin, Charlotte focused on the adoption and roll out of the OpenChain ISO 5230 international standard for open source compliance across endjin's open source estate.

For her final year project at University, Charlotte will build upon the work of her Industrial Placement and implement OpenChain ISO/IEC DIS 18974 - the industry standard for open source security assurance programs.