Understanding Customer Churn with Big Data Analytics

I have come up with the following 7 features.

1. Representative User Interactions

We would reasonably expect that some user interactions will eventually lead to a user churning.

We can use box plots to perform a first level of filtering.

Box plots help us visualize the minimum, 25th percentile, median, 75th percentile, and maximum of a particular data distribution.

By plotting box plots for both churned and non-churned users, we can clearly see how the two groups differ for a particular interaction.
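As an illustration, a box plot for a single interaction (say, the Thumbs Up page) could be produced along the following lines. This is only a minimal sketch, not the exact code used in this analysis: it assumes the raw event log is loaded as events with userId and page columns, and that the page values 'Thumbs Up' and 'Cancellation Confirmation' match the dataset.

import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

# flag each user as churned (1) or not (0) based on the Cancellation Confirmation page
churn_flag = (events.withColumn('churn', (events.page == 'Cancellation Confirmation').cast('int'))
                    .groupby('userId')
                    .agg(F.max('churn').alias('churn')))

# count Thumbs Up interactions per user
thumbs_up = (events.filter(events.page == 'Thumbs Up')
                   .groupby('userId')
                   .count()
                   .withColumnRenamed('count', 'num_thumbs_up'))

# the per-user aggregate is small, so bring it to pandas and draw the box plot
plot_df = churn_flag.join(thumbs_up, on='userId', how='left').fillna(0).toPandas()
sns.boxplot(x='churn', y='num_thumbs_up', data=plot_df)
plt.title('Thumbs Up counts: churned vs. non-churned users')
plt.show()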

The following interactions showed significant distribution differences between the two groups:

- Add Friends — churned users are less likely to add friends
- Add to Playlist — churned users are less likely to add to playlists
- Upgrade — churned users have a wide range of upgrade activities
- NextSong — churned users are less likely to play the next song
- ThumbsUp — churned users are less likely to press thumbs up
- Roll Advert — churned users have a wider spread on roll adverts
- Settings — churned users are less likely to visit the settings page
- Log out — churned users are less likely to log out (due to fewer logins)
- Help — non-churned users are more likely to ask for help
- Home — churned users are less likely to visit the home page

Notice that all of the user interactions live in a single column, so we will need to pivot and aggregate the total count of each interaction for every customer.

Based on the above filtering, we will drop some of the less significant interactions.

events = events.drop('firstName', 'lastName', 'auth', 'gender', 'song', 'artist',
                     'status', 'method', 'location', 'registration', 'itemInSession')

events_pivot = (events.groupby(["userId"])
                      .pivot("page")
                      .count()
                      .fillna(0))

events_pivot = events_pivot.drop('About', 'Cancel', 'Login', 'Submit Registration',
                                 'Register', 'Save Settings')

2. Average Music Play Time

Speaking for myself, if I were losing interest in the service, I would probably spend far less time playing music than a regular user.

Therefore, the length of time a user spends playing music should be an important factor.

A simple visualization of the play-time distributions confirmed this intuition.

We will add this feature to our events_pivot table:

# filter the events log to contain only NextSong events
events_songs = events.filter(events.page == 'NextSong')

# total song length played per user
total_length = (events_songs.groupby(events_songs.userId)
                            .agg(sum('length')))

# join to the events pivot; this also renames the Cancellation Confirmation
# count to Churn, which will serve as our label later on
events_pivot = (events_pivot.join(total_length, on='userId', how='left')
                            .withColumnRenamed("Cancellation Confirmation", "Churn")
                            .withColumnRenamed("sum(length)", "total_length"))

3. Number of active days

We would also expect a difference in the number of active days between the churned and non-churned groups.

Since the timestamp column is in milliseconds, we will take the minimum and maximum timestamps for each customer and convert the difference to days (1000 * 60 * 60 * 24 = 86,400,000 milliseconds per day). We will then add this feature to events_pivot.

convert = 1000*60*60*24  # conversion factor from milliseconds to days

# find the minimum/maximum timestamp of each user
min_timestmp = events.select(["userId", "ts"]).groupby("userId").min("ts")
max_timestmp = events.select(["userId", "ts"]).groupby("userId").max("ts")

# find the number of days each user was active
daysActive = min_timestmp.join(max_timestmp, on="userId")
daysActive = daysActive.withColumn("days_active", (col("max(ts)") - col("min(ts)")) / convert)
daysActive = daysActive.select(["userId", "days_active"])

# join to the events pivot
events_pivot = events_pivot.join(daysActive, on='userId', how='left')

4. Number of days as paid users

Similarly, we can calculate the number of days spent as a paid user with the same approach; we only need to filter the events down to those where the user's level is 'paid'.

# find the minimum/maximum timestamp of each user as a paid user
paid_min_ts = events.filter(events.level == 'paid').groupby("userId").min("ts")
paid_max_ts = events.filter(events.level == 'paid').groupby("userId").max("ts")

# find the number of days each user spent as a paid user
daysPaid = paid_min_ts.join(paid_max_ts, on="userId")
daysPaid = daysPaid.withColumn("days_paid", (col("max(ts)") - col("min(ts)")) / convert)
daysPaid = daysPaid.select(["userId", "days_paid"])

# join to the events pivot
events_pivot = events_pivot.join(daysPaid, on='userId', how='left')

5. Number of days as a free user

Now, using the free-level filter instead, we can find the number of days each customer spent as a free user:

# find the minimum/maximum timestamp of each user as a free user
free_min_ts = events.filter(events.level == 'free').groupby("userId").min("ts")
free_max_ts = events.filter(events.level == 'free').groupby("userId").max("ts")

# find the number of days each user spent as a free user
daysFree = free_min_ts.join(free_max_ts, on="userId")
daysFree = daysFree.withColumn("days_free", (col("max(ts)") - col("min(ts)")) / convert)
daysFree = daysFree.select(["userId", "days_free"])

# join to the events pivot
events_pivot = events_pivot.join(daysFree, on='userId', how='left')

6. Number of sessions

The number of music-playing sessions could also be a contributing factor. Since sessionId is available in this dataset, we can directly count the number of unique session ids for each user with a groupby:

# count the number of distinct sessions per user
numSessions = (events.select(["userId", "sessionId"])
                     .distinct()
                     .groupby("userId")
                     .count()
                     .withColumnRenamed("count", "num_sessions"))

# join to the events pivot
events_pivot = events_pivot.join(numSessions, on='userId', how='left')

7. User access agent

The streaming service might perform differently on different user agents, so we will add this factor to the model. Since there are 56 distinct user agents in the data, we will use Spark's one-hot encoder to turn them into a vector.

# find user access agents and one-hot encode them
userAgents = events.select(['userId', 'userAgent']).distinct()
userAgents = userAgents.fillna('Unknown')

# build a string indexer for the userAgent column
stringIndexer = StringIndexer(inputCol="userAgent", outputCol="userAgentIndex")
model = stringIndexer.fit(userAgents)
userAgents = model.transform(userAgents)

# one-hot encode the indexed userAgent column
encoder = OneHotEncoder(inputCol="userAgentIndex", outputCol="userAgentVec")
userAgents = (encoder.transform(userAgents)
                     .select(['userId', 'userAgentVec']))

# join to the events pivot
events_pivot = events_pivot.join(userAgents, on='userId', how='left')
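Note that the snippet above follows the Spark 2.x API, where OneHotEncoder is a plain transformer. On Spark 3.x, OneHotEncoder is an estimator and has to be fitted before it can transform; a minimal sketch of the equivalent step under that assumption:

# Spark 3.x variant: fit the encoder first, then transform
encoder = OneHotEncoder(inputCols=["userAgentIndex"], outputCols=["userAgentVec"])
userAgents = (encoder.fit(userAgents)
                     .transform(userAgents)
                     .select(['userId', 'userAgentVec']))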

Model Building

After we have engineered the appropriate features, we will build three models: logistic regression, random forest, and gradient boosting trees.

To avoid writing redundant code, we will build the shared stage objects once and construct three pipelines that differ only in the classifier at the end.

# split the data into train and test sets
events_pivot = events_pivot.withColumnRenamed('Churn', 'label')
training, test = events_pivot.randomSplit([0.8, 0.2])

# create a feature vector from the feature columns
feature_names = events_pivot.drop('label', 'userId').schema.names
vec_asembler = VectorAssembler(inputCols=feature_names, outputCol="Features")

# scale each column
scalar = MinMaxScaler(inputCol="Features", outputCol="ScaledFeatures")

# build the classifiers
rf = RandomForestClassifier(featuresCol="ScaledFeatures", labelCol="label",
                            numTrees=50, featureSubsetStrategy='sqrt')
lr = LogisticRegression(featuresCol="ScaledFeatures", labelCol="label",
                        maxIter=10, regParam=0.01)
gbt = GBTClassifier(featuresCol="ScaledFeatures", labelCol="label")

# construct the three pipelines
pipeline_rf = Pipeline(stages=[vec_asembler, scalar, rf])
pipeline_lr = Pipeline(stages=[vec_asembler, scalar, lr])
pipeline_gbt = Pipeline(stages=[vec_asembler, scalar, gbt])

# fit the models
rf_model = pipeline_rf.fit(training)
lr_model = pipeline_lr.fit(training)
gbt_model = pipeline_gbt.fit(training)

The three objects rf_model, lr_model, and gbt_model now represent the three fitted models.

Model Evaluation

We will test the fitted models' performance and select the best-performing one as the final model.

We will start by building a helper function for this purpose:

def modelEvaluations(model, metric, data):
    """
    Evaluate a machine learning model's performance.
    Input:
        model  - pipeline object
        metric - the metric used for the evaluation
        data   - the data being evaluated
    Output:
        [score, confusion matrix]
    """
    # generate predictions
    evaluator = MulticlassClassificationEvaluator(metricName=metric)
    predictions = model.transform(data)

    # calculate the score
    score = evaluator.evaluate(predictions)

    # build the confusion matrix as a pandas DataFrame
    confusion_matrix = (predictions.groupby("label")
                                   .pivot("prediction")
                                   .count()
                                   .toPandas())
    return [score, confusion_matrix]

We will call the above function to evaluate the three models:

f1_rf, conf_mtx_rf = modelEvaluations(rf_model, 'f1', test)
f1_lr, conf_mtx_lr = modelEvaluations(lr_model, 'f1', test)
f1_gbt, conf_mtx_gbt = modelEvaluations(gbt_model, 'f1', test)

The gradient boosting model showed the best performance (F1 score) on the test set.
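For reference, the scores and confusion matrices can be printed side by side; a minimal sketch using the values returned above:

# print the F1 score and confusion matrix of each fitted model
for name, score, matrix in [('Random Forest', f1_rf, conf_mtx_rf),
                            ('Logistic Regression', f1_lr, conf_mtx_lr),
                            ('Gradient Boosting', f1_gbt, conf_mtx_gbt)]:
    print(f'{name}: F1 = {score:.4f}')
    print(matrix)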

Feature Importance

We will use the fitted model's feature importance attribute to visualize the relative importance of each feature we built.

Because the last feature, userAgentVec, is in fact a one-hot encoded vector, we will treat it as a single feature. The code below sums the importance values of all the sub-features that came from the one-hot encoded vector.

# feature importances from the fitted gradient boosting model, as a dense array
feature_importances = np.array(gbt_model.stages[-1].featureImportances.toArray())

# userAgentVec is the last entry of feature_names; its one-hot sub-features occupy the
# trailing slots of the assembled vector, so sum them into a single importance value
userAgentVec = feature_importances[len(feature_names) - 1:].sum()
feature_importances = np.append(feature_importances[:len(feature_names) - 1], userAgentVec)

Now we plot the feature importance for the gradient boosting trees.
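As a minimal matplotlib sketch, assuming the aggregated feature_importances from the step above (which line up with feature_names, since userAgentVec is its last entry), the plot could be produced like this:

import numpy as np
import matplotlib.pyplot as plt

# sort the features by importance and draw a horizontal bar chart
order = np.argsort(feature_importances)
plt.figure(figsize=(8, 10))
plt.barh(np.array(feature_names)[order], feature_importances[order])
plt.xlabel('Relative importance')
plt.title('Gradient boosting feature importances')
plt.tight_layout()
plt.show()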

Most of the features we constructed are important contributors to user churn, with days_active being the most important factor.

Full Dataset Run

Now that we have constructed the appropriate framework, we are ready to follow the same steps on the full 12 GB dataset using AWS's EMR service.

We will initialize the Spark session in the following way:

# create a Spark session
spark = (SparkSession.builder
                     .appName("Sparkify")
                     .getOrCreate())

# read in the full Sparkify dataset
event_data = "s3n://dsnd-sparkify/sparkify_event_data.json"
events = spark.read.json(event_data)

We will not repeat the rest of the steps here; I have attached the full script on the nbviewer site.

In the end, the gradient boosting model produced an F1 score of 0.8896 on the full dataset, which is a great result.

+-----+----+---+
|label| 0.0|1.0|
+-----+----+---+
|    0|1612| 70|
|    1| 163|344|
+-----+----+---+

Business Strategy

What a journey we have been through, but we have not finished our mission yet.

In the data science world, there is a business intention behind every model.

With the feature importances we produced, we can come up with business strategies to counter customer churn.

We will briefly discuss two possible strategies that could drive real value for the service provider.

Since the number of active days is the most significant factor, we could advise upper management to build a reward system that encourages low-activity users to stay engaged for longer periods of time.

In addition, since the agent users use to access the service is also quite significant, we could identify the poorly performing agents and have our engineering team work specifically on fixing those issues.

I hope you have enjoyed this very long post.

If there is one thing to take away from this piece, it comes back to the point we mentioned at the beginning of the story:

Our model needs to solve business problems for it to be valuable.
