A Billion Taxi Rides on Amazon EMR running Spark

The following creates external tables in Hive for the ORC-formatted data and for the Parquet copy that will be created next.

$ hive

CREATE EXTERNAL TABLE trips_orc (
    trip_id INT, vendor_id STRING,
    pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP,
    store_and_fwd_flag STRING, rate_code_id SMALLINT,
    pickup_longitude DOUBLE, pickup_latitude DOUBLE,
    dropoff_longitude DOUBLE, dropoff_latitude DOUBLE,
    passenger_count SMALLINT, trip_distance DOUBLE,
    fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE,
    tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE,
    improvement_surcharge DOUBLE, total_amount DOUBLE,
    payment_type STRING, trip_type SMALLINT,
    pickup STRING, dropoff STRING, cab_type STRING,
    precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT,
    max_temperature SMALLINT, min_temperature SMALLINT,
    average_wind_speed SMALLINT,
    pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING,
    pickup_borocode SMALLINT, pickup_boroname STRING,
    pickup_ct2010 STRING, pickup_boroct2010 STRING,
    pickup_cdeligibil STRING, pickup_ntacode STRING,
    pickup_ntaname STRING, pickup_puma STRING,
    dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING,
    dropoff_borocode SMALLINT, dropoff_boroname STRING,
    dropoff_ct2010 STRING, dropoff_boroct2010 STRING,
    dropoff_cdeligibil STRING, dropoff_ntacode STRING,
    dropoff_ntaname STRING, dropoff_puma STRING
) STORED AS orc
  LOCATION 's3://<s3_bucket>/orc/';

CREATE EXTERNAL TABLE trips_parquet (
    trip_id INT, vendor_id STRING,
    pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP,
    store_and_fwd_flag STRING, rate_code_id SMALLINT,
    pickup_longitude DOUBLE, pickup_latitude DOUBLE,
    dropoff_longitude DOUBLE, dropoff_latitude DOUBLE,
    passenger_count SMALLINT, trip_distance DOUBLE,
    fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE,
    tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE,
    improvement_surcharge DOUBLE, total_amount DOUBLE,
    payment_type STRING, trip_type SMALLINT,
    pickup STRING, dropoff STRING, cab_type STRING,
    precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT,
    max_temperature SMALLINT, min_temperature SMALLINT,
    average_wind_speed SMALLINT,
    pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING,
    pickup_borocode SMALLINT, pickup_boroname STRING,
    pickup_ct2010 STRING, pickup_boroct2010 STRING,
    pickup_cdeligibil STRING, pickup_ntacode STRING,
    pickup_ntaname STRING, pickup_puma STRING,
    dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING,
    dropoff_borocode SMALLINT, dropoff_boroname STRING,
    dropoff_ct2010 STRING, dropoff_boroct2010 STRING,
    dropoff_cdeligibil STRING, dropoff_ntacode STRING,
    dropoff_ntaname STRING, dropoff_puma STRING
) STORED AS parquet
  LOCATION 's3://<s3_bucket>/parquet/';

1.1 Billion Trips in Parquet Format

The following will copy all the data in the trips_orc table into the trips_parquet table. This process should take around 2 hours but could be quicker if you add more task nodes to your cluster.

$ screen
$ echo "INSERT INTO TABLE trips_parquet SELECT * FROM trips_orc;" | hive
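Once the INSERT has finished, a quick sanity check (not part of the original walkthrough) is to confirm that both tables report the same number of rows. Hive will run each count as its own job, so expect this to take a few minutes.

$ echo "SELECT COUNT(*) FROM trips_orc; SELECT COUNT(*) FROM trips_parquet;" | hive

Both counts should come back identical, at roughly 1.1 billion rows.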
Benchmarking Queries in Spark SQL

Make sure Spark is using at least 8 GB of memory when you launch it. The fourth query in the benchmark will run out of heap space with the memory settings in the stock EMR 4.3.0 configuration.

$ SPARK_MEM=${SPARK_MEM:-8196m}
$ export SPARK_MEM
$ JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

The following will launch the Spark SQL CLI.

$ spark-sql

I was surprised by how quickly Spark SQL could query the data. Unlike Presto, which could use both core and task nodes to complete queries, Spark used only the task nodes to do the heavy lifting. Though Spark can do analytics, its main strengths seem to be in running machine learning code, graph analysis and other iterative, memory-hungry workloads. So it might not be entirely fair to compare these results with the Presto benchmarks I ran in the Presto on EMR blog post. Nonetheless, it's interesting to see such differences in performance.

The following completed in 4 minutes and 24 seconds (3.5x slower than Presto).

SELECT cab_type,
       count(*)
FROM trips_parquet
GROUP BY cab_type;

The second query completed in 5 minutes and 13 seconds (5x slower than Presto).
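For repeatable runs it can also be handy to time a query from the shell rather than from inside the interactive CLI. The sketch below is not part of the original benchmark; it reuses the cab_type query from above and passes it to the Spark SQL CLI with the -e flag.

$ time spark-sql -e "SELECT cab_type, count(*) FROM trips_parquet GROUP BY cab_type;"

Note that the elapsed time reported by time includes session start-up, so it will come out slightly longer than the query time reported inside the CLI.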
