There's something wrong with the Pandas API on Spark
So, over the past week or so I have been investigating some Synapse pipeline errors that were being caused by a serialization exception - we were going over a memory limit when converting between the world of Spark and the world of pandas.
In order to explain this, I need to define the three different APIs that we are currently using in Spark notebooks:
- Pandas (Let's call this PD)
- The pandas API on Spark (Let's call this PS)
- And, plain old Spark
And, for context, the notebook in question was (sketched in code just after this list):
- Reading something in from CSV via PD
- Using pandas for schema validation
- Converting from PD to PS via ps.from_pandas()
- Reducing the number of columns and some other data processing
- Converting back to PD via .to_pandas() on the PS DataFrame
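To make that shape concrete, here is a minimal sketch of the flow, assuming placeholder file paths and column names (the validation step is elided, and none of the names below are the real notebook code):

import pandas as pd
import pyspark.pandas as ps

# 1. Read the raw file with plain pandas (PD)
pdf = pd.read_csv("/data/input.csv")  # placeholder path

# 2. Schema validation happens here in PD (details elided)

# 3. Cross into the pandas API on Spark (PS)
psdf = ps.from_pandas(pdf)

# 4. Reduce columns and do other processing in PS
psdf = psdf[["col_a", "col_b"]]  # placeholder column names

# 5. Convert back to PD - this is where the error was thrown
pdf_out = psdf.to_pandas()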
The errors occurred when doing the final conversion back to PD. And, annoyingly, they only happened in the pipeline, not when running the notebook manually.
And, the error in question looks like this:
Job aborted due to stage failure: Serialized task 8:0 was 482765623 bytes,
which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes).
Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
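For reference, the limit the error mentions is just a Spark configuration value, specified in MB. As a rough illustration (not the fix we went with, since raising the limit only hides the underlying data volume problem), it can be bumped when the session is created - in Synapse you would typically set it via the pool or session configuration instead:

from pyspark.sql import SparkSession

# Illustrative only: raise the RPC message size limit from the 128 MB default to 512 MB
spark = (
    SparkSession.builder
    .config("spark.rpc.message.maxSize", "512")  # value is in MB
    .getOrCreate()
)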
Hypothesis
The hypothesis was that the column reduction in step 4 wasn't working as expected.
The reasoning was that, once the columns had been reduced, the data shouldn't have exceeded the limit quoted in the to_pandas()
error - we roughly worked out how much data would be there at that point, and it shouldn't have been anywhere near those quantities.
Experimental Approach
In order to investigate these errors and isolate the issue, I needed to:
- Reproduce the error in a controlled environment
- Prove / disprove the hypothesis
- Produce possible solutions for fixing the issue
Reproducing the error
The first step in the experimental process is to reproduce the thing you are investigating in a more controlled environment. This will allow you to better isolate and test various parts of a system.
To do this, I created a large randomised dataset (of mixed types):
import numpy as np
import pandas as pd

rows = 30000
cols_int = 2000
cols_str = 2000

# Half the columns are random ints, half are the same kind of ints cast to strings
random_ints = np.random.randint(low=0, high=100, size=(rows, cols_int))
random_df = pd.DataFrame(random_ints)
random_strs = np.random.randint(low=0, high=100, size=(rows, cols_str))
random_str_df = pd.DataFrame(random_strs).astype(str)

# Join the two halves on the index to get one wide, mixed-type dataset
combined_df = pd.merge(random_df, random_str_df, left_index=True, right_index=True)
I found that I had to make my dataset bigger than the original one (increasing the number of rows to 60000 and the total number of columns to 8000). I could probably have reproduced the error with a dataset somewhere between these sizes - but that wasn't the focus of the investigation, so I didn't experiment further to find the exact point at which the errors appeared.
Given the differences in types etc., I think the fact that I had to increase the size probably just means the original data is close to the limit. This is also supported by the fact that the notebook was originally failing in the pipeline but not when run manually - once I made my dataset bigger, I started seeing the same error when running the notebook manually too.
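With the enlarged combined_df in hand, reproducing the failure was just a matter of pushing it through the same PD-to-PS-and-back round trip as the original notebook - a minimal sketch:

import pyspark.pandas as ps

# PD -> PS: mirrors step 3 of the original notebook
psdf = ps.from_pandas(combined_df)

# PS -> PD: this is the call that throws the spark.rpc.message.maxSize error
pdf_back = psdf.to_pandas()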
However, at this point the feedback loop for the experimentation was pretty painful. There was one step in the process that was taking almost an hour:
The conversion from PD to PS using ps.from_pandas()
It wasn't obvious why this was taking so long: the data was already in memory, and the conversion was taking almost 60x as long as it originally took to read the data out of CSV. I started experimenting with different methods of doing the conversion to PS, and made the first significant discovery:
When converting to PS using ps.from_pandas(), the conversion took over an hour. However, if we instead used spark.createDataFrame() and then df.pandas_api() to expose the pandas API on Spark, it took less than 30 seconds.
So, if you are converting from PD to PS, especially with large datasets, I would strongly advise using this approach.
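As a concrete comparison, this is roughly what the slow and fast paths look like side by side (spark here is the notebook's existing SparkSession, and the timings are the ones observed on my test dataset):

import pyspark.pandas as ps

# Slow path: ps.from_pandas() took over an hour on the test dataset
psdf_slow = ps.from_pandas(combined_df)

# Fast path: the same conversion took under 30 seconds
sdf = spark.createDataFrame(combined_df)  # plain Spark DataFrame
psdf_fast = sdf.pandas_api()              # expose the pandas API on Spark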
Now, this wasn't necessarily related to the cause of the error (though, as we'll see, it is probably also indicative of an underlying issue with the pandas API on Spark). But it meant that I could continue the experiment with fast feedback loops and get to the root of the problem much faster.
Proving / Disproving the Hypothesis
Once I could reproduce the error reasonably quickly, I started experimenting to see if I could prove that there was indeed more data than there should be, once the columns had been reduced.
The first thing I tried was to write the data back out to CSV once the columns had been reduced, to see how big the CSV was. This would allow us to see how much data we were trying to process at this point, compared to how much we started with.
I first tried to write it back out using PS, after the columns had been reduced but before converting back to PD (where the error was happening). However, this produced the same serialization error. At first glance that might indicate there is too much data at this point, but I wasn't sure whether that was actually true, or whether the PS API was effectively calling .to_pandas() under the hood when writing out a CSV, so that the error was just being produced in exactly the same way.
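The PS write attempt looked roughly like this (the DataFrame name and output path are placeholders); it failed with the same spark.rpc.message.maxSize error as the conversion did:

# Write the reduced PS DataFrame out as a single CSV to measure its size
# (num_files=1 asks the pandas API on Spark to produce one output file)
reduced_columns_ps.to_csv("/reduced_columns_ps.csv", num_files=1)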
So, I then tried writing out to CSV using Spark:
reduced_columns_spark.repartition(1).write.mode('overwrite').csv("/reduced_columns_new.csv", sep=',')
And this also produced the same error - my hypothesis being that it collects all of the data onto one node as part of .repartition(1), so it again exceeds the size limit.
But the fact that this is happening in plain Spark, completely outside the world of pandas, clearly shows that there really is too much data at this point - which indicates that the column reduction hasn't worked as expected.
The final thing I needed to do to prove this was to reduce the columns in a different way and show that doing so fixed the error (and therefore that, if the columns were reduced as expected, there definitely wasn't too much data for the system).
So, the next part of the experiment was:
- Read in the big data via Spark
- Reduce the columns via Spark
- Write the data back out via Spark
- Repeat the processing with the reduced data:
- Read it in via PD
- Convert to PS
- Convert back to PD
And, this worked. With this, I had successfully proved the hypothesis, by showing that if the columns were reduced in Spark then the data was small enough to complete the rest of the processing - but when they were reduced via the pandas API on Spark, this wasn't the case.
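A minimal sketch of that Spark-only reduction step, assuming placeholder paths and a placeholder list of columns to keep:

# Read the full dataset with plain Spark
sdf = spark.read.csv("/big_input.csv", header=True, inferSchema=True)

# Reduce the columns in Spark, before any pandas conversion
kept_columns = ["col_a", "col_b", "col_c"]  # placeholder list
reduced_sdf = sdf.select(*kept_columns)

# Write the reduced data back out for the rest of the (pandas) processing
reduced_sdf.write.mode("overwrite").csv("/reduced_input.csv", sep=",", header=True)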
Solutions
So, the first solution to the issue would be to complete the processing as above - by reducing the columns in Spark before continuing with the pandas processing. However, this would be reasonably disruptive, as it would involve a large-scale change to how we read in the data.
So, I also tried:
- Read in via PD
- Reduce columns via PD
- Convert to PS
- Convert back to PD
And that also works, and was therefore my recommendation for how to fix our original notebook without any large-scale restructuring.
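In notebook terms, that recommendation looks something like the sketch below (paths and column names are placeholders again, and spark is the existing SparkSession):

import pandas as pd

# Read and reduce entirely in PD, so the data is already small before Spark sees it
pdf = pd.read_csv("/data/input.csv")  # placeholder path
pdf = pdf[["col_a", "col_b"]]         # placeholder column names

# Only then cross into PS, using the faster conversion from earlier
psdf = spark.createDataFrame(pdf).pandas_api()

# ...PS processing...

# The round trip back to PD now stays under the RPC size limit
pdf_out = psdf.to_pandas()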
Conclusion
Overall, I would say that the takeaways are:
We should not be converting from Spark to pandas (using any API) with any large dataset, or any dataset that could in theory become large, because it will run into these errors
Proposed fix: Do any data reduction prior to conversion.
And,
Something isn't quite right with the PS API and I'm not sure we should really be using it at all
We observed this in two places:
- The conversion between PD and PS is much slower than it should be
Proposed fix: Use spark.createDataFrame() and then df.pandas_api() if you do need to expose the PS API
- When reducing the number of columns using PS, the amount of data doesn't actually seem to be reduced
Proposed fix: Do column reduction using either Spark or PD
More investigation is needed to fully understand what is happening under the covers. But, the above is useful in understanding how Spark and pandas interact, and highlights some of the things we should be aware of when processing data in notebooks via these APIs.