1.1 Billion Taxi Rides on AWS EMR 5.3.0 & Spark 2.1.0

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 85.942 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Next, I'll query the same tables on the same cluster using Presto.

$ presto-cli --schema default --catalog hive

The following completed in 16 seconds.

SELECT cab_type,
       count(*)
FROM trips_parquet
GROUP BY cab_type;

The following completed in 22 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_parquet
GROUP BY passenger_count;

The following completed in 30 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 37 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;
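If you want to script these timings rather than paste each query into the interactive prompt, presto-cli can run a single statement and exit. The following is a minimal sketch, assuming the --execute flag on EMR 5.3.0's Presto build behaves as it does upstream; the redirect discards the result rows so the wall-clock time reflects the query itself.

$ # time one benchmark query non-interactively
$ time presto-cli \
      --catalog hive \
      --schema default \
      --execute "SELECT cab_type, count(*) FROM trips_parquet GROUP BY cab_type" \
      > /dev/null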
1.1B Trips in ORC Format

There isn't enough HDFS capacity to store both the Parquet and ORC datasets so I'll remove the Parquet table and files first.

$ hdfs dfs -rm -r /user/hive/warehouse/trips_parquet
$ hive

DROP TABLE trips_parquet;

I'll then check that there are a few hundred gigs of free space on HDFS.

$ hdfs dfsadmin -report | head

Configured Capacity: 370168258560 (344.75 GB)
Present Capacity: 367607626838 (342.36 GB)
DFS Remaining: 367171158016 (341.95 GB)
DFS Used: 436468822 (416.25 MB)
DFS Used%: 0.12%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

I'll then launch Hive and create an ORC-formatted table using compression, stripe and stride settings that have suited this dataset well in the past.

$ hive

CREATE EXTERNAL TABLE trips_orc (
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,
    cab_type                STRING,
    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,
    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,
    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS orc
  TBLPROPERTIES ("orc.compress"="SNAPPY",
                 "orc.stripe.size"="67108864",
                 "orc.row.index.stride"="50000");

The following will copy and convert the CSV data into ORC format.

INSERT INTO trips_orc
SELECT * FROM trips_csv;

The above completed in 1 hour, 21 minutes and 49 seconds.

Query ID = hadoop_20170129122340_868b5b63-6783-4382-9618-8aa5f8a16cbf
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1485684315400_0003)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 ..........
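Before benchmarking the new table, it's worth a quick sanity check that the conversion kept every record. This query isn't part of the original run, just a sketch; it should return the same ~1.1 billion row count that trips_csv holds.

SELECT count(*)
FROM trips_orc;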
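To confirm the SNAPPY compression, 64 MB stripe size and 50,000-row index stride actually made it into the files, Hive ships an ORC file dump utility. The invocation below is an assumption based on stock Hive 2.x tooling rather than output from this run, and 000000_0 is a placeholder for whichever file names the INSERT actually produced under the table's warehouse directory.

$ # list the files the INSERT produced, then dump one file's metadata
$ hdfs dfs -ls /user/hive/warehouse/trips_orc
$ hive --orcfiledump /user/hive/warehouse/trips_orc/000000_0 | head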
