Movie recommendation using Apache Spark

Varun Abhi, Feb 25

Apache Spark, one of the biggest players in big data analysis and processing, has many advantages over other platforms such as Apache Flink and Apache Storm, and it is used by many of the world's top companies.

A list of some companies that report using Spark in their infrastructure is maintained on the Apache Spark website: Powered By Spark | Apache Spark.

We all love to watch Netflix, and it is amazing how Netflix recommends the top N movies in a particular genre, or movies related to one we have already watched.

So let us see how Spark can do the same thing for us.

Because Spark handles large datasets well, we will work with a large set of movie ratings and find the top 10 movies related to a selected movie.

Some prior knowledge of Spark, ideally with some hands-on experience, is recommended, along with a basic understanding of data science algorithms. We will use Scala as the programming language for this use case, but Python or R will also work.

The GitHub repo link is at the end of this post. Let's start.

Step 1: We need a dataset to operate on. The following link contains the data we will use for our application: Movie recommendation – Google Drive (tinyurl.com).

The u.data file downloaded from the above link contains records in the following format:

    UserId -> MovieId -> Rating -> Timestamp

and the u.item file maps each movie ID to its name, in the format:

    MovieId -> Name -> Release date -> …
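To make the u.data format concrete, here is a small plain-Scala sketch (no Spark needed) that parses one line into a record. It assumes the four fields are separated by whitespace, which is what the `split("\\s+")` in Step 2 expects. The `Rating` case class, the `RatingParser` object and the sample line are hypothetical, for illustration only.

```scala
// Hypothetical record type for one line of u.data: UserId, MovieId, Rating, Timestamp
final case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Long)

object RatingParser {
  // Splits on any run of whitespace (tabs or spaces), as Step 2 does with "\\s+"
  def parse(line: String): Rating = {
    val fields = line.trim.split("\\s+")
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble, fields(3).toLong)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative sample line, not taken from the actual data file
    val sample = "196\t242\t3\t881250949"
    println(parse(sample)) // Rating(196,242,3.0,881250949)
  }
}
```

Only the first three fields are needed for the recommendation itself; the timestamp is parsed here just to show the full layout.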

Step 2: We read the u.data file and map it to an RDD of the form (UserId, (MovieId, Rating)). Here `spark` is assumed to be the application's SparkSession.

    def mapUserIdAndMovieRatings(): RDD[(Int, (Int, Double))] = {
      // The directory part of the path is elided in the original post; point this at your copy of u.data
      val dataFile: RDD[String] = spark.sparkContext.textFile("…/u.data")
      val userIdMappedWithMovieIdAndRating: RDD[(Int, (Int, Double))] = dataFile.map(line => {
        val fields = line.split("\\s+") // regex: split on any run of whitespace
        (fields(0).toInt, (fields(1).toInt, fields(2).toDouble)) // (UserId, (MovieId, Rating))
      })
      userIdMappedWithMovieIdAndRating
    }

Step 3: Once every userId is mapped to the movieIds they watched and the ratings they gave, we self join the data from Step 2 to map each userId to pairs of movies watched by that user.

After the self join we obtain an RDD of the form:

    UserId -> ((MovieId, Rating), (MovieId, Rating))

This RDD represents every pair of movies watched by the same user, together with the ratings that user gave them.
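The shape of the self-join output is easier to see on a tiny local example. This sketch mimics `RDD.join` with plain Scala collections through a hypothetical `selfJoin` helper (not a Spark API): for every user, every rated movie is paired with every rated movie, including itself, so a user with n ratings contributes n × n pairs. The ratings used are made up.

```scala
object SelfJoinSketch {
  type MovieRating = (Int, Double) // (MovieId, Rating)

  // Hypothetical helper: emulates rdd.join(rdd) on a local Seq keyed by UserId
  def selfJoin(data: Seq[(Int, MovieRating)]): Seq[(Int, (MovieRating, MovieRating))] =
    for {
      (user1, left) <- data
      (user2, right) <- data
      if user1 == user2
    } yield (user1, (left, right))

  def main(args: Array[String]): Unit = {
    // Illustrative ratings: user 1 rated three movies, user 2 rated one
    val ratings = Seq((1, (10, 5.0)), (1, (20, 4.0)), (1, (30, 3.0)), (2, (10, 2.0)))
    // 3 x 3 pairs for user 1 plus 1 x 1 pair for user 2
    selfJoin(ratings).foreach(println)
  }
}
```

Note that both (10, 20) and (20, 10) appear for user 1, along with same-movie pairs such as (10, 10); these are the duplicates Step 4 removes.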

    // Finding movie pairs along with the ratings given by each particular user
    val userIdMappedWithMovieIdAndRatings: RDD[(Int, (Int, Double))] = mapUserIdAndMovieRatings()
    val pairOfMoviesWatchedBySameUser: RDD[(Int, ((Int, Double), (Int, Double)))] =
      userIdMappedWithMovieIdAndRatings.join(userIdMappedWithMovieIdAndRatings)

Step 4: Because we did a self join, we have to remove duplicates from the data obtained in Step 3.

Duplicates arise in the following cases:

1. Both movies in the pair are the same.
2. (Movie A, Rating A), (Movie B, Rating B) is the same as (Movie B, Rating B), (Movie A, Rating A).

There are several ways to remove them; one is to keep only the pairs with MovieId1 < MovieId2. This single condition discards pairs where MovieId1 = MovieId2 (case 1) as well as the reversed copy of every pair (case 2), so all duplicate records are eliminated.
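The following local sketch checks that the single condition MovieId1 < MovieId2 is enough: out of the n × n ordered pairs one user produces, it keeps exactly n(n − 1)/2, one per distinct movie pair. The `DeduplicateSketch` object and its input are hypothetical; `keep` applies the same rule as `filterDuplicateMovieData`.

```scala
object DeduplicateSketch {
  // (UserId, ((MovieId1, Rating1), (MovieId2, Rating2)))
  type UserPair = (Int, ((Int, Double), (Int, Double)))

  // Same rule as filterDuplicateMovieData: keep a pair only when MovieId1 < MovieId2
  def keep(p: UserPair): Boolean = p._2._1._1 < p._2._2._1

  def main(args: Array[String]): Unit = {
    val ratings = Seq((10, 5.0), (20, 4.0), (30, 3.0)) // one user's (MovieId, Rating) list
    // All 9 ordered pairs a self join would produce for user 1
    val joined: Seq[UserPair] = for (a <- ratings; b <- ratings) yield (1, (a, b))
    // Prints the 3 pairs for movies (10, 20), (10, 30) and (20, 30)
    joined.filter(keep).foreach(println)
  }
}
```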

    // Filtering duplicate movie data
    val pairOfMoviesWithoutDuplicates: RDD[(Int, ((Int, Double), (Int, Double)))] =
      pairOfMoviesWatchedBySameUser.filter(filterDuplicateMovieData)

    def filterDuplicateMovieData(userIdAndPairOfMovies: (Int, ((Int, Double), (Int, Double)))): Boolean = {
      val movieId1: Int = userIdAndPairOfMovies._2._1._1
      val movieId2: Int = userIdAndPairOfMovies._2._2._1
      // Drops MovieId1 == MovieId2 (same movie) and MovieId2 < MovieId1 (repeated pair)
      movieId1 < movieId2
    }

Step 5: From here on we no longer need the userId, so we map our data to the form (MovieId1, MovieId2) -> (Rating1, Rating2), giving a rating pair for every movie pair.
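Positional accessors such as `._2._1._1` are easy to get wrong. The mapping described above can also be written as a pattern-matching anonymous function that names each field; this is a stylistic alternative, not the author's code. It is shown on a plain Scala `Seq`, but the same function value could be passed to `RDD.map`. The `MoviePairMapping` object is hypothetical.

```scala
object MoviePairMapping {
  // Destructures (UserId, ((MovieId1, Rating1), (MovieId2, Rating2))) and drops the UserId
  val toMoviePairAndRatings: ((Int, ((Int, Double), (Int, Double)))) => ((Int, Int), (Double, Double)) = {
    case (_, ((movieId1, rating1), (movieId2, rating2))) => ((movieId1, movieId2), (rating1, rating2))
  }

  def main(args: Array[String]): Unit = {
    val input = Seq((1, ((10, 5.0), (20, 4.0))))
    println(input.map(toMoviePairAndRatings)) // List(((10,20),(5.0,4.0)))
  }
}
```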

    // Mapping userMovieData to the form (movie1, movie2) => (rating1, rating2)
    val moviePairAndRatings: RDD[((Int, Int), (Double, Double))] =
      pairOfMoviesWithoutDuplicates.map(mapMoviePairsWithRatings)

    def mapMoviePairsWithRatings(userIdAndMovieData: (Int, ((Int, Double), (Int, Double)))): ((Int, Int), (Double, Double)) = {
      val movieId1 = userIdAndMovieData._2._1._1
      val movieId2 = userIdAndMovieData._2._2._1
      val rating1 = userIdAndMovieData._2._1._2
      val rating2 = userIdAndMovieData._2._2._2
      ((movieId1, movieId2), (rating1, rating2))
    }

Step 6: With each movie pair and its rating pair ready, we perform a groupByKey operation on this RDD.

groupByKey gathers all the rating pairs for the same movie pair into one group. For example, the movie pair (Toy Story, The Incredibles) will point to the collection of rating pairs given to those two movies by the users in our dataset (the u.data file).
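On a local collection, `groupBy` on the key followed by keeping only the values does what `groupByKey` does on an RDD. The sketch below uses hypothetical movie IDs and ratings: two users rated the pair (10, 20) and one user rated (10, 30). `GroupRatingsSketch.groupRatings` is an illustrative name, not a Spark API.

```scala
object GroupRatingsSketch {
  // Local stand-in for RDD.groupByKey(): movie pair -> all rating pairs for that movie pair
  def groupRatings(data: Seq[((Int, Int), (Double, Double))]): Map[(Int, Int), Seq[(Double, Double)]] =
    data.groupBy(_._1).map { case (moviePair, entries) => moviePair -> entries.map(_._2) }

  def main(args: Array[String]): Unit = {
    val moviePairAndRatings = Seq(
      ((10, 20), (5.0, 4.0)),
      ((10, 20), (3.0, 3.0)),
      ((10, 30), (4.0, 1.0))
    )
    groupRatings(moviePairAndRatings).foreach(println)
  }
}
```

In Spark, `groupByKey` shuffles every rating pair across the cluster; that is acceptable here because the next step needs all pairs for a movie pair at once.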

    // Combining all identical movie pairs with their ratings
    val groupOfRatingPairsForSameMoviePair: RDD[((Int, Int), Iterable[(Double, Double)])] =
      moviePairAndRatings.groupByKey()

Step 7: Now for the data science part. Once every movie pair is mapped to its group of rating pairs, we apply the cosine similarity algorithm to measure how similar the two movies are.

Cosine similarity is used in data science to measure the similarity between two vectors. Here the two vectors are the ratings that the co-rating users gave the first movie and the ratings the same users gave the second movie.
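For rating vectors x (first movie) and y (second movie), cosine similarity is sum(x * y) / (sqrt(sum(x * x)) * sqrt(sum(y * y))), which is what `computeCosineSimilarity` accumulates with its three running sums. As a hedged alternative, the same computation can be written with a single `foldLeft`; the `cosineSimilarity` name is hypothetical, and the zero-denominator guard is an addition not present in the original.

```scala
object CosineSimilaritySketch {
  // Returns (cosine similarity, number of rating pairs), like computeCosineSimilarity
  def cosineSimilarity(ratingPairs: Iterable[(Double, Double)]): (Double, Int) = {
    // Accumulate sum(x*x), sum(y*y), sum(x*y) and the pair count in one pass
    val (sumXX, sumYY, sumXY, n) = ratingPairs.foldLeft((0.0, 0.0, 0.0, 0)) {
      case ((xx, yy, xy, count), (x, y)) => (xx + x * x, yy + y * y, xy + x * y, count + 1)
    }
    val denominator = math.sqrt(sumXX) * math.sqrt(sumYY)
    // Guard against empty input (0 / 0); positive ratings otherwise keep the denominator > 0
    val score = if (denominator == 0.0) 0.0 else sumXY / denominator
    (score, n)
  }

  def main(args: Array[String]): Unit = {
    // Two hypothetical users rated both movies: (5, 5) and (3, 4)
    println(cosineSimilarity(Seq((5.0, 5.0), (3.0, 4.0))))
  }
}
```

For the sample pairs, the sums are 37, 34 and 41, so the score is 37 / (sqrt(34) * sqrt(41)), roughly 0.991, with 2 pairs.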

    // Computing similarity among rating pairs using the cosine similarity algorithm
    val moviePairsAndSimilarityScore: RDD[((Int, Int), (Double, Int))] =
      groupOfRatingPairsForSameMoviePair.mapValues(computeCosineSimilarity)

    def computeCosineSimilarity(ratingPairs: Iterable[(Double, Double)]): (Double, Int) = {
      var numOfPairs: Int = 0
      var sumXX: Double = 0.0
      var sumYY: Double = 0.0
      var sumXY: Double = 0.0
      for (ratingPair: (Double, Double) <- ratingPairs) {
        val ratingX: Double = ratingPair._1
        val ratingY: Double = ratingPair._2
        sumXX += ratingX * ratingX
        sumYY += ratingY * ratingY
        sumXY += ratingX * ratingY
        numOfPairs += 1
      }
      val numerator: Double = sumXY
      val denominator: Double = Math.sqrt(sumXX) * Math.sqrt(sumYY)
      val result: Double = numerator / denominator
      (result, numOfPairs)
    }

After applying the cosine similarity algorithm, our dataset has the form:

    (MovieId1, MovieId2) -> (Similarity score, Number of pairs)

so every movie pair is mapped to its similarity score and to the number of rating pairs collected for it from our data file (u.data).

Step 8: The two thresholds used in this example were tuned by hand for this particular data set:

Score threshold -> the cosine similarity must be greater than this value for two movies to match.
Co-occurrence threshold -> the number of rating pairs, i.e. the number of users who rated both movies, must be greater than this value.

For this use case we use a score threshold of 0.97. A cosine similarity of 1 (cos 0 = 1, an angle of 0 between the two rating vectors) would mean every user who rated both movies rated them in exactly the same proportion, so 0.97 keeps only very close matches. For the co-occurrence threshold we use a value of 50, chosen by analyzing our data set. Both values depend entirely on the data file used.
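A score of 1 does not require the same movie: any two proportional rating vectors give a cosine similarity of 1. With only a handful of shared raters such near-perfect scores are easy to reach by chance, which is why the co-occurrence threshold matters alongside the score threshold. The sketch below checks this and wraps both thresholds in a hypothetical `passesThresholds` predicate; the input ratings are made up.

```scala
object ThresholdSketch {
  val scoreThreshold: Double = 0.97    // value chosen in this post for its data set
  val coOccurrenceThreshold: Int = 50  // value chosen in this post for its data set

  // Plain cosine similarity over (rating of movie 1, rating of movie 2) pairs
  def cosine(pairs: Seq[(Double, Double)]): Double = {
    val xy = pairs.map { case (x, y) => x * y }.sum
    val xx = pairs.map { case (x, _) => x * x }.sum
    val yy = pairs.map { case (_, y) => y * y }.sum
    xy / (math.sqrt(xx) * math.sqrt(yy))
  }

  // Hypothetical predicate combining both thresholds
  def passesThresholds(score: Double, numPairs: Int): Boolean =
    score > scoreThreshold && numPairs > coOccurrenceThreshold

  def main(args: Array[String]): Unit = {
    // Two users whose ratings for the second movie are double those for the first
    val proportional = Seq((1.0, 2.0), (2.0, 4.0))
    val score = cosine(proportional)
    println(score) // 1.0 up to floating-point rounding
    println(passesThresholds(score, proportional.size)) // false: only 2 co-raters
  }
}
```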

Step 9: Now we filter the dataset obtained in Step 7 using the thresholds from Step 8, keeping only pairs that contain the selected movie. Movie pairs with a similarity score greater than the score threshold (0.97) and a number of pairs greater than the co-occurrence threshold (50) are considered similar.

    val scoreThreshold: Double = 0.97       // chosen according to our data set
    val coOccurenceThreshold: Double = 50.0 // chosen according to our data set
    val movieId: Int = args(0).toInt        // ID of the selected movie, read from the program arguments

    val moviePairsFilteredAccordingToThreshold: RDD[((Int, Int), (Double, Int))] =
      moviePairsAndSimilarityScore.filter((moviePairAndScore: ((Int, Int), (Double, Int))) => {
        val moviePair: (Int, Int) = moviePairAndScore._1
        val ratingAndNumOfPairs: (Double, Int) = moviePairAndScore._2
        (moviePair._1 == movieId || moviePair._2 == movieId) &&
          ratingAndNumOfPairs._1 > scoreThreshold &&
          ratingAndNumOfPairs._2 > coOccurenceThreshold
      })

Step 10: Once we have the RDD from Step 9, we can use Spark's take API to fetch n records from it. To find the top 10 movie pairs related to the selected movie, we take 10 records from this dataset.
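Note that `take(n)` returns the first n records in partition order, not the n highest-scoring ones, so the pairs need to be ordered by score before taking 10 (on the RDD, for example with `sortBy(..., ascending = false)` or `takeOrdered`). The local sketch below shows the difference on a hypothetical list of (movie pair, (score, number of pairs)) entries; `TopNSketch.topN` is an illustrative name.

```scala
object TopNSketch {
  type Scored = ((Int, Int), (Double, Int)) // ((MovieId1, MovieId2), (score, numPairs))

  // Highest similarity score first, then keep n
  def topN(entries: Seq[Scored], n: Int): Seq[Scored] =
    entries.sortWith(_._2._1 > _._2._1).take(n)

  def main(args: Array[String]): Unit = {
    val entries: Seq[Scored] = Seq(
      ((50, 10), (0.971, 60)),
      ((50, 20), (0.990, 75)),
      ((50, 30), (0.980, 55))
    )
    println(entries.take(2))  // first two in input order, not the best two
    println(topN(entries, 2)) // best two scores: (50,20) then (50,30)
  }
}
```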

    // take() returns records in partition order, so sort by similarity score (highest first) before taking 10
    val first10MoviesAndTheirScores: Array[((Int, Int), (Double, Int))] =
      moviePairsFilteredAccordingToThreshold.sortBy(_._2._1, ascending = false).take(10)

Step 11: We now have our top 10 movie pairs; in each pair one movie is the selected movie and the other is a recommendation. But we want movie names, not IDs, so we use the u.item file to map each movie ID to its name.
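The u.item fields are separated by `|`. The parsing in this step uses the `Char` overload `split('|')`, which treats the pipe literally; the `String` overload `split("|")` would instead interpret it as a regular expression (an empty alternation) and split between every character. A small sketch, using a hypothetical sample line in the MovieId|Name|Release date|… layout and a hypothetical `parseItem` helper:

```scala
object ItemLineSketch {
  // Parses "MovieId|Name|..." into (MovieId, Name); split('|') splits on a literal pipe
  def parseItem(line: String): (Int, String) = {
    val fields = line.split('|')
    (fields(0).toInt, fields(1))
  }

  def main(args: Array[String]): Unit = {
    val sample = "1|Toy Story (1995)|01-Jan-1995" // hypothetical sample line
    println(parseItem(sample)) // (1,Toy Story (1995))
    // Regex overload: "|" matches the empty string, so this splits between every character
    println(sample.split("|").take(4).mkString(","))
  }
}
```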

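    import java.nio.charset.CodingErrorAction
    import scala.io.{Codec, Source}

    def mapMovieIdAndName(): Map[Int, String] = {
      // Handle character encoding issues: replace malformed or unmappable bytes instead of failing
      implicit val codec: Codec = Codec("UTF-8")
      codec.onMalformedInput(CodingErrorAction.REPLACE)
      codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
      // The directory part of the path is elided in the original post; point this at your copy of u.item
      val idAndNameMapped: Map[Int, String] = Source.fromFile("…/u.item").getLines().map(line => {
        val lineArr = line.split('|')
        (lineArr(0).toInt, lineArr(1)) // (MovieId, Name)
      }).toMap[Int, String]
      idAndNameMapped
    }

Once we have mapped IDs to names, we look up the name of every movie ID in our top 10 list. We iterate over the 10 movie pairs and print the name of the movie paired with the selected movie:

    val idAndMovieNames: Map[Int, String] = mapMovieIdAndName()
    first10MoviesAndTheirScores.foreach(moviePairAndScore => {
      val movie1: Int = moviePairAndScore._1._1
      val movie2: Int = moviePairAndScore._1._2
      // The recommendation is whichever movie in the pair is not the selected one
      var suggestedMovie: Int = movie2
      if (movie2 == movieId) {
        suggestedMovie = movie1
      }
      println(idAndMovieNames(suggestedMovie))
    })

The output will look something like this:

[Sample output screenshot]

That's it.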

This is the process for recommending movies with Apache Spark. The whole codebase can be found here: https://github.com/varunthk7/Spark-use-cases

I would love to hear your comments and suggestions for further enhancements. Happy coding!
