Introducing End-to-End Interpolation of Time Series Data in Apache PySpark

Jessica Walkenhorst

Anyone working with data knows that real-world data is often patchy and cleaning it takes up a considerable amount of your time (80/20 rule anyone?).

Having recently moved from Pandas to PySpark, I was used to the conveniences that Pandas offers and that PySpark sometimes lacks due to its distributed nature.

One of the features I have been particularly missing is a straightforward way of interpolating (or in-filling) time series data.

While the problem of in-filling missing values has been covered a few times (e.g. [here]), I was not able to find a source which detailed the end-to-end process of generating the underlying time grid and then subsequently filling in the missing values.

This post tries to close this gap.

Starting from a time series with missing entries, I will show how we can leverage PySpark to first generate the missing time stamps and then fill in the missing values using three different methods (forward filling, backward filling and interpolation).

This is demonstrated using the example of sensor read data collected in a set of houses.

The full code for this post can be found [here in my GitHub].

Preparing the Data and Visualizing the Problem

To demonstrate the procedure, first, we generate some test data.

The data set contains data for two houses and uses a sin() and a cos() function to generate some sensor read data for a set of dates.

To generate the missing values, we randomly drop half of the entries.
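The code gist from the original post is not reproduced in this export. A minimal sketch of how such a test set could be generated with Pandas and NumPy follows; the exact date range, random seed, house labels and the conversion of readtime to Unix nanoseconds are assumptions made for illustration:

import numpy as np
import pandas as pd

# hourly timestamps over a couple of days -- the exact date range is an assumption
rng = pd.date_range(start='2019-05-01', end='2019-05-03', freq='1H')

# one sin() and one cos() curve, one per house
df0 = pd.concat([
    pd.DataFrame({'house': 'house A',
                  'readtime': rng,
                  'readvalue': 0.5 * np.sin(np.linspace(0, 2 * np.pi, len(rng))) + 0.5}),
    pd.DataFrame({'house': 'house B',
                  'readtime': rng,
                  'readvalue': 0.5 * np.cos(np.linspace(0, 2 * np.pi, len(rng))) + 0.5}),
], ignore_index=True)

# randomly drop half of the entries to create the missing values
df0 = df0.sample(frac=0.5, random_state=42).sort_values(['house', 'readtime'])

# store readtime as Unix time in nanoseconds; the division by 10^9 further
# down assumes this representation (this preprocessing step is an assumption)
df0['readtime'] = df0['readtime'].values.astype(np.int64)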

The following graph shows the data with the missing values clearly visible.

Read data with missing entries

To parallelize the data set, we convert the Pandas data frame into a Spark data frame.

Note that we need to divide the datetime by 10⁹, since Pandas stores datetimes with nanosecond resolution while we want to work with Unix timestamps in seconds on the Spark side.

We also add the column ‘readtime_existent’ to keep track of which values are missing.

import pyspark.sql.functions as func
from pyspark.sql.functions import col

df = spark.createDataFrame(df0)

# convert readtime from nanoseconds to seconds and keep a copy of the
# existing readtimes so that we can later tell which rows were in-filled
df = df.withColumn("readtime", col('readtime')/1e9)\
       .withColumn("readtime_existent", col("readtime"))

Now our dataframe looks like this:

Read data prepared for interpolation with PySpark

Interpolation

Resampling the Read Datetime

The first step is to resample the readtime data.

If we were working with Pandas, this would be straightforward: we could just use the resample() method.

However, Spark works on distributed datasets and therefore does not provide an equivalent method.

Obtaining the same functionality in PySpark requires a three-step process.

In the first step, we group the data by ‘house’ and generate an array containing an equally spaced time grid for each house.

In the second step, we create one row for each element of the arrays by using the Spark SQL function explode().

In the third step, the resulting structure is used as a basis to which the existing read value information is joined using a left outer join.

The following code shows how this can be done.

Note that here we are using a Spark user-defined function (if you want to learn more about how to create UDFs, you can take a look here).

Starting from Spark 2.3, Spark provides a Pandas UDF, which leverages the performance of Apache Arrow to distribute calculations. If you use Spark 2.3 or later, I would recommend looking into this instead of using the (poorly performing) built-in UDFs.
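The corresponding code gist did not survive the export. A sketch of the three steps described above, using an ordinary Spark UDF as the text indicates, could look like this; the hourly grid step and the helper name generate_time_grid are assumptions, while df_all_dates matches the name used in the window-function code further down:

from pyspark.sql.types import ArrayType, DoubleType

interval = 60 * 60  # grid step in seconds (an hourly grid -- an assumption)

# UDF that builds an equally spaced time grid between the first and the
# last readtime of a house
@func.udf(returnType=ArrayType(DoubleType()))
def generate_time_grid(start, stop):
    return [float(t) for t in range(int(start), int(stop) + 1, interval)]

# step 1: one array of grid points per house
df_grid = df.groupBy('house')\
            .agg(func.min('readtime').alias('readtime_min'),
                 func.max('readtime').alias('readtime_max'))\
            .withColumn('readtime', generate_time_grid('readtime_min', 'readtime_max'))

# step 2: one row per grid point
df_grid = df_grid.withColumn('readtime', func.explode('readtime'))\
                 .select('house', 'readtime')

# step 3: join the existing read values onto the grid
df_all_dates = df_grid.join(df, on=['house', 'readtime'], how='leftouter')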

An extract of the resulting table looks like this:

As one can see, a null in the readtime_existent column indicates a missing read value.

Forward-filling and Backward-filling Using Window Functions

When using a forward-fill, we infill the missing data with the latest known value.

In contrast, when using a backward-fill, we infill the data with the next known value.

This can be achieved using an SQL window function in combination with last() and first().

To make sure that we don’t infill the missing values with another missing value, we use the ignorenulls=True argument.

We also need to make sure that we set the correct window ranges.

For the forward-fill, we restrict the window to values between minus infinity and now (we only look into the past, not into the future), for the backward-fill we restrict the window to values between now and infinity (we only look into the future, we don’t look into the past).

The code below shows how to implement this.

Note that if we want to use interpolation instead of a forward or backward fill, we need to know the time difference between the previous existing and the next existing read value.

Therefore, we need to keep the readtime_existent column.

from pyspark.sql import Window
import sys

window_ff = Window.partitionBy('house')\
                  .orderBy('readtime')\
                  .rowsBetween(-sys.maxsize, 0)

window_bf = Window.partitionBy('house')\
                  .orderBy('readtime')\
                  .rowsBetween(0, sys.maxsize)

# create series containing the filled values
read_last = func.last(df_all_dates['readvalue'], ignorenulls=True).over(window_ff)
readtime_last = func.last(df_all_dates['readtime_existent'], ignorenulls=True).over(window_ff)
read_next = func.first(df_all_dates['readvalue'], ignorenulls=True).over(window_bf)
readtime_next = func.first(df_all_dates['readtime_existent'], ignorenulls=True).over(window_bf)

# add columns to the dataframe
df_filled = df_all_dates.withColumn('readvalue_ff', read_last)\
                        .withColumn('readtime_ff', readtime_last)\
                        .withColumn('readvalue_bf', read_next)\
                        .withColumn('readtime_bf', readtime_next)

Interpolation

In the final step, we use the forward filled and backwards filled data to interpolate the read datetimes and the read values using a simple spline.

This can again be done using a user-defined function.
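The gist for this step is also missing from the export. Since we interpolate between exactly two known points (the previous and the next existing value), the simple spline amounts to the linear rule readvalue = readvalue_ff + (readvalue_bf - readvalue_ff) * (readtime - readtime_ff) / (readtime_bf - readtime_ff). A sketch of such a UDF could look as follows; the function name interpolate and the output column name readvalue_interpol are assumptions:

from pyspark.sql.types import DoubleType

# linearly interpolate between the previous and the next known value
@func.udf(returnType=DoubleType())
def interpolate(readtime, readtime_ff, readtime_bf, readvalue_ff, readvalue_bf):
    if readvalue_ff is None:          # no previous value: fall back to the backward fill
        return readvalue_bf
    if readvalue_bf is None:          # no next value: fall back to the forward fill
        return readvalue_ff
    if readtime_bf == readtime_ff:    # the value already existed, nothing to interpolate
        return readvalue_ff
    weight = (readtime - readtime_ff) / (readtime_bf - readtime_ff)
    return readvalue_ff + weight * (readvalue_bf - readvalue_ff)

df_filled = df_filled.withColumn(
    'readvalue_interpol',
    interpolate('readtime', 'readtime_ff', 'readtime_bf', 'readvalue_ff', 'readvalue_bf'))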

This leaves us with a single dataframe containing all of the interpolation methods.

This is what its structure looks like:

Interpolated read data

Visualization

Finally, we can visualize the results to observe the differences between the interpolation techniques.

The lighter dots show the interpolated values.

Original data (dark) and interpolated data (light), interpolated using (top) forward filling, (middle) backward filling and (bottom) interpolation.

We can clearly see how in the top figure the gaps have been filled with the last known value, in the middle figure with the next known value, and in the bottom figure the values have been interpolated between the previous and the next known value.
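For completeness, here is a sketch of how such a plot could be produced after collecting the (small) result set back into Pandas; the column name readvalue_interpol and the house label follow the earlier sketches and are assumptions:

import matplotlib.pyplot as plt

# collecting to Pandas is only advisable for small result sets like this one
pdf = df_filled.toPandas().sort_values(['house', 'readtime'])
pdf_house = pdf[pdf['house'] == 'house A']

fig, axes = plt.subplots(3, 1, sharex=True, figsize=(8, 8))
for ax, column, title in zip(axes,
                             ['readvalue_ff', 'readvalue_bf', 'readvalue_interpol'],
                             ['forward fill', 'backward fill', 'interpolation']):
    # interpolated data (light) underneath, original data (dark) on top
    ax.plot(pdf_house['readtime'], pdf_house[column], 'o', alpha=0.3)
    mask = pdf_house['readtime_existent'].notnull()
    ax.plot(pdf_house.loc[mask, 'readtime'], pdf_house.loc[mask, 'readvalue'], 'o')
    ax.set_title(title)
plt.show()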

Summary and Conclusion

In this post, we have seen how we can use PySpark to perform end-to-end interpolation of time series data.

We have demonstrated how we can resample time series data and how we can use window functions in combination with the first() and last() functions to fill in the generated missing values.

We have then seen how we can use a user-defined function to perform a simple spline interpolation.

I hope this post helps to plug the gap in the literature on end-to-end time series interpolation in PySpark.

Originally published at https://walkenho.github.io.
