Convert CSVs to ORC Faster

Every analytical database I've used converts imported data into a form that is quicker to read.

Often this means storing data in column form instead of row form.

The taxi trip dataset I benchmark with is around 100 GB when gzip-compressed in row form, but the five columns that are queried can be stored in around 3.5 GB of space in columnar form when compressed using a mixture of dictionary encoding, run-length encoding and Snappy compression.
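
As an aside, the two encodings mentioned above are simple to sketch outside of any of these engines. The following toy Python snippet (the cab_type values are made up for illustration) shows how dictionary encoding swaps repeated strings for small integer codes and how run-length encoding then collapses runs of identical codes; ORC writers apply these ideas per column, per stripe, before a general-purpose codec like Snappy is applied.

# A toy illustration of dictionary and run-length encoding on one column.
from itertools import groupby

cab_types = ["yellow", "yellow", "yellow", "green", "green", "yellow"]

# Dictionary encoding: store each distinct string once and replace the
# column's values with small integer codes.
dictionary = {value: code for code, value in enumerate(sorted(set(cab_types)))}
codes = [dictionary[value] for value in cab_types]  # [1, 1, 1, 0, 0, 1]

# Run-length encoding: collapse consecutive repeats into (code, run_length).
runs = [(code, len(list(group))) for code, group in groupby(codes)]

print(dictionary)  # {'green': 0, 'yellow': 1}
print(runs)        # [(1, 3), (0, 2), (1, 1)]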

The process of converting rows into columns is time-consuming and compute-intensive.

Most systems can take the better part of an hour to finish this conversion, even when using a cluster of machines.

I once believed that compression was causing most of the overhead, but in researching this post I found Spark 2.4.0 had a ~7% difference in conversion time between using Snappy, zlib, lzo and not using any compression at all.
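
I no longer have the exact harness from that comparison, but the shape of it can be sketched with PySpark's DataFrame writer, which takes the ORC codec as a write option. The paths and generated sample data below are placeholders, and the lzo codec may need native libraries present on the cluster.

# Sketch: time ORC writes of the same DataFrame under different codecs.
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("orc-codec-comparison").getOrCreate()

# Placeholder data; on EMR this would be the real taxi trips DataFrame.
df = spark.range(0, 10000000).withColumnRenamed("id", "trip_id")

for codec in ["none", "snappy", "zlib", "lzo"]:
    started = time.time()
    (df.write
       .mode("overwrite")
       .option("compression", codec)
       .orc("/tmp/orc_codec_test_%s" % codec))
    print(codec, round(time.time() - started, 1), "seconds")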

I've historically used Hive for this conversion process, but there are ways to mix and match different Hadoop tools, including Spark and Presto, to get the same outcome, often with very different processing times.

Spark, Hive and Presto are all very different code bases.

Spark is made up of 500K lines of Scala, 110K lines of Java and 40K lines of Python.

Presto is made up of 600K lines of Java.

Hive is made up of over one million lines of Java and 100K lines of C++ code.

Any libraries they share are outweighed by the unique approaches they've taken in the architecture surrounding their SQL parsers, query planners, optimizers, code generators and execution engines when it comes to tabular-form conversion.

I recently benchmarked Spark 2.4.0 and Presto 0.214 and found that Spark outperformed Presto when it comes to ORC-based queries.

In this post I'm going to examine the ORC-writing performance of these two engines plus Hive and see which can convert CSV files into ORC files the fastest.

AWS EMR Up & Running The following will launch an EMR cluster with a single master node and 20 core nodes.

The cluster runs version 2.

8.

5 of Amazons Hadoop distribution, Hive 2.

3.

4, Presto 0.

214 and Spark 2.

4.

0.

All nodes are spot instances to keep the cost down.

$ aws emr create-cluster \
    --applications Name=Hadoop Name=Hive Name=Presto Name=Spark \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ebs-root-volume-size 10 \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "EmrManagedSlaveSecurityGroup": "sg-...",
        "EmrManagedMasterSecurityGroup": "sg-..."
    }' \
    --enable-debugging \
    --instance-groups '[
        {
            "Name": "Core - 2",
            "InstanceCount": 20,
            "BidPrice": "OnDemandPrice",
            "InstanceType": "m3.xlarge",
            "InstanceGroupType": "CORE"
        },
        {
            "InstanceCount": 1,
            "Name": "Master - 1",
            "InstanceGroupType": "MASTER",
            "EbsConfiguration": {
                "EbsOptimized": false,
                "EbsBlockDeviceConfigs": [
                    {
                        "VolumeSpecification": {
                            "VolumeType": "standard",
                            "SizeInGB": 400
                        },
                        "VolumesPerInstance": 1
                    }
                ]
            },
            "BidPrice": "OnDemandPrice",
            "InstanceType": "m3.xlarge"
        }
    ]' \
    --log-uri 's3n://aws-logs-...-eu-west-1/elasticmapreduce/' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.20.0 \
    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
    --service-role EMR_DefaultRole \
    --termination-protected

With the cluster provisioned and bootstrapped I was able to SSH in.

$ ssh -i ~/.ssh/emr.pem hadoop@54.155.139.6

       __|  __|_  )
       _|  (     /   Amazon Linux AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2018.03-release-notes/
3 package(s) needed for security, out of 6 available
Run "sudo yum update" to apply all updates.

                                E M R

The CSV dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six-year period.

The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together.

They're stored on AWS S3 so I'll configure the aws CLI with my access and secret keys and retrieve them.

$ aws configure

I'll then set the client's concurrent requests limit to 100 so the files download quicker than they would with stock settings.

$ aws configure set default.s3.max_concurrent_requests 100

I've requested an EBS storage volume on the master node so that I can download the dataset before loading it onto HDFS.

$ df -h

Filesystem      Size  Used Avail Use% Mounted on
devtmpfs        7.4G   80K  7.4G   1% /dev
tmpfs           7.4G     0  7.4G   0% /dev/shm
/dev/xvda1      9.8G  4.6G  5.1G  48% /
/dev/xvdb1      5.0G   33M  5.0G   1% /emr
/dev/xvdb2       33G  289M   33G   1% /mnt
/dev/xvdc        38G   33M   38G   1% /mnt1
/dev/xvdd       400G   33M  400G   1% /mnt2

I ran the following to pull the dataset off of S3.

$ mkdir -p /mnt2/csv/
$ aws s3 sync s3://<bucket>/csv/ /mnt2/csv/

I then ran the following to push the data onto HDFS.

$ hdfs dfs -mkdir /trips_csv/
$ hdfs dfs -copyFromLocal /mnt2/csv/*.csv.gz /trips_csv/

Converting CSVs to ORC using Hive

I'll use Hive to create a schema catalogue for the various datasets that will be produced in this benchmark.

The following will create the table for the CSV-formatted dataset.

$ hive

CREATE TABLE trips_csv (
    trip_id INT, vendor_id VARCHAR(3), pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, store_and_fwd_flag VARCHAR(1), rate_code_id SMALLINT, pickup_longitude DECIMAL(18,14), pickup_latitude DECIMAL(18,14), dropoff_longitude DECIMAL(18,14), dropoff_latitude DECIMAL(18,14), passenger_count SMALLINT, trip_distance DECIMAL(6,3), fare_amount DECIMAL(6,2), extra DECIMAL(6,2), mta_tax DECIMAL(6,2), tip_amount DECIMAL(6,2), tolls_amount DECIMAL(6,2), ehail_fee DECIMAL(6,2), improvement_surcharge DECIMAL(6,2), total_amount DECIMAL(6,2), payment_type VARCHAR(3), trip_type SMALLINT, pickup VARCHAR(50), dropoff VARCHAR(50), cab_type VARCHAR(6), precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT, max_temperature SMALLINT, min_temperature SMALLINT, average_wind_speed SMALLINT, pickup_nyct2010_gid SMALLINT, pickup_ctlabel VARCHAR(10), pickup_borocode SMALLINT, pickup_boroname VARCHAR(13), pickup_ct2010 VARCHAR(6), pickup_boroct2010 VARCHAR(7), pickup_cdeligibil VARCHAR(1), pickup_ntacode VARCHAR(4), pickup_ntaname VARCHAR(56), pickup_puma VARCHAR(4), dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel VARCHAR(10), dropoff_borocode SMALLINT, dropoff_boroname VARCHAR(13), dropoff_ct2010 VARCHAR(6), dropoff_boroct2010 VARCHAR(7), dropoff_cdeligibil VARCHAR(1), dropoff_ntacode VARCHAR(4), dropoff_ntaname VARCHAR(56), dropoff_puma VARCHAR(4)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/trips_csv/';

I'll create a table that will store the dataset as an ORC-formatted, Snappy-compressed dataset that will be produced by Hive.

CREATE TABLE trips_orc_snappy_hive (
    trip_id INT, vendor_id STRING, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, store_and_fwd_flag STRING, rate_code_id SMALLINT, pickup_longitude DOUBLE, pickup_latitude DOUBLE, dropoff_longitude DOUBLE, dropoff_latitude DOUBLE, passenger_count SMALLINT, trip_distance DOUBLE, fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE, tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE, improvement_surcharge DOUBLE, total_amount DOUBLE, payment_type STRING, trip_type SMALLINT, pickup STRING, dropoff STRING, cab_type STRING, precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT, max_temperature SMALLINT, min_temperature SMALLINT, average_wind_speed SMALLINT, pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING, pickup_borocode SMALLINT, pickup_boroname STRING, pickup_ct2010 STRING, pickup_boroct2010 STRING, pickup_cdeligibil STRING, pickup_ntacode STRING, pickup_ntaname STRING, pickup_puma STRING, dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING, dropoff_borocode SMALLINT, dropoff_boroname STRING, dropoff_ct2010 STRING, dropoff_boroct2010 STRING, dropoff_cdeligibil STRING, dropoff_ntacode STRING, dropoff_ntaname STRING, dropoff_puma STRING
) STORED AS orc
  LOCATION '/trips_orc_snappy_hive/'
  TBLPROPERTIES ("orc.compress"="snappy");

Below I'll convert the CSV dataset into ORC using Hive alone.

The following took 55 mins and 5 seconds.

INSERT INTO trips_orc_snappy_hive
SELECT * FROM trips_csv;

Converting CSVs to ORC using Spark

I'll create a table for Spark to store its ORC-formatted, Snappy-compressed data.

$ hive

CREATE TABLE trips_orc_snappy_spark (
    trip_id INT, vendor_id STRING, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, store_and_fwd_flag STRING, rate_code_id SMALLINT, pickup_longitude DOUBLE, pickup_latitude DOUBLE, dropoff_longitude DOUBLE, dropoff_latitude DOUBLE, passenger_count SMALLINT, trip_distance DOUBLE, fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE, tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE, improvement_surcharge DOUBLE, total_amount DOUBLE, payment_type STRING, trip_type SMALLINT, pickup STRING, dropoff STRING, cab_type STRING, precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT, max_temperature SMALLINT, min_temperature SMALLINT, average_wind_speed SMALLINT, pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING, pickup_borocode SMALLINT, pickup_boroname STRING, pickup_ct2010 STRING, pickup_boroct2010 STRING, pickup_cdeligibil STRING, pickup_ntacode STRING, pickup_ntaname STRING, pickup_puma STRING, dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING, dropoff_borocode SMALLINT, dropoff_boroname STRING, dropoff_ct2010 STRING, dropoff_boroct2010 STRING, dropoff_cdeligibil STRING, dropoff_ntacode STRING, dropoff_ntaname STRING, dropoff_puma STRING
) STORED AS orc
  LOCATION '/trips_orc_snappy_spark/'
  TBLPROPERTIES ("orc.compress"="snappy");

I'll then launch Spark and convert the CSV data into ORC using its engine.

$ spark-sql

The following took 1 hour, 43 mins and 7 seconds.

INSERT INTO TABLE trips_orc_snappy_spark
SELECT * FROM trips_csv;
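
For what it's worth, the same conversion can also be expressed through Spark's DataFrame API rather than spark-sql. The sketch below isn't what produced the timing above; it assumes a session with Hive support enabled so the trips_csv and trips_orc_snappy_spark tables from the metastore are visible.

# Sketch: the CSV-to-ORC conversion via the DataFrame API instead of spark-sql.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("csv-to-orc")
         .enableHiveSupport()
         .getOrCreate())

# Read the CSV-backed Hive table created earlier.
trips = spark.table("trips_csv")

# insertInto writes into the existing Hive table, so the ORC format and the
# orc.compress=snappy table property declared in the DDL still apply.
trips.write.mode("append").insertInto("trips_orc_snappy_spark")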

To show that Parquet isn't the more optimised format, I'll create a table to store Snappy-compressed data in Parquet format and run the same CSV conversion.

CREATE TABLE trips_parquet_snappy_spark (
    trip_id INT, vendor_id STRING, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, store_and_fwd_flag STRING, rate_code_id SMALLINT, pickup_longitude DOUBLE, pickup_latitude DOUBLE, dropoff_longitude DOUBLE, dropoff_latitude DOUBLE, passenger_count SMALLINT, trip_distance DOUBLE, fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE, tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE, improvement_surcharge DOUBLE, total_amount DOUBLE, payment_type STRING, trip_type SMALLINT, pickup STRING, dropoff STRING, cab_type STRING, precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT, max_temperature SMALLINT, min_temperature SMALLINT, average_wind_speed SMALLINT, pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING, pickup_borocode SMALLINT, pickup_boroname STRING, pickup_ct2010 STRING, pickup_boroct2010 STRING, pickup_cdeligibil STRING, pickup_ntacode STRING, pickup_ntaname STRING, pickup_puma STRING, dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING, dropoff_borocode SMALLINT, dropoff_boroname STRING, dropoff_ct2010 STRING, dropoff_boroct2010 STRING, dropoff_cdeligibil STRING, dropoff_ntacode STRING, dropoff_ntaname STRING, dropoff_puma STRING
) STORED AS parquet
  LOCATION '/trips_parquet_snappy_spark/'
  TBLPROPERTIES ("parquet.compress"="snappy");

The following took 1 hour, 56 minutes and 29 seconds.

INSERT INTO TABLE trips_parquet_snappy_spark
SELECT * FROM trips_csv;

The HDFS cluster only has 1.35 TB of capacity and 3x replication is being used, so I'll clear out these datasets before continuing.

$ hdfs dfs -rm -r -skipTrash /trips_orc_snappy_hive/
$ hdfs dfs -rm -r -skipTrash /trips_orc_snappy_spark/
$ hdfs dfs -rm -r -skipTrash /trips_parquet_snappy_spark/

Converting CSVs to ORC using Presto

Below I'll create a table for Presto to store a Snappy-compressed ORC dataset.

$ hive

CREATE TABLE trips_orc_snappy_presto (
    trip_id INT, vendor_id STRING, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, store_and_fwd_flag STRING, rate_code_id SMALLINT, pickup_longitude DOUBLE, pickup_latitude DOUBLE, dropoff_longitude DOUBLE, dropoff_latitude DOUBLE, passenger_count SMALLINT, trip_distance DOUBLE, fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE, tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE, improvement_surcharge DOUBLE, total_amount DOUBLE, payment_type STRING, trip_type SMALLINT, pickup STRING, dropoff STRING, cab_type STRING, precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT, max_temperature SMALLINT, min_temperature SMALLINT, average_wind_speed SMALLINT, pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING, pickup_borocode SMALLINT, pickup_boroname STRING, pickup_ct2010 STRING, pickup_boroct2010 STRING, pickup_cdeligibil STRING, pickup_ntacode STRING, pickup_ntaname STRING, pickup_puma STRING, dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING, dropoff_borocode SMALLINT, dropoff_boroname STRING, dropoff_ct2010 STRING, dropoff_boroct2010 STRING, dropoff_cdeligibil STRING, dropoff_ntacode STRING, dropoff_ntaname STRING, dropoff_puma STRING
) STORED AS orc
  LOCATION '/trips_orc_snappy_presto/'
  TBLPROPERTIES ("orc.compress"="snappy");

I'll run the CSV to ORC conversion in Presto's CLI.

$ presto-cli --schema default --catalog hive

The following took 37 mins and 35 seconds.

INSERT INTO trips_orc_snappy_presto
SELECT * FROM trips_csv;

The above generated a dataset 118.6 GB in size (excluding HDFS replication).

$ hdfs dfs -du -s -h /trips_orc_snappy_presto/

Facebook have been working on implementing ZStandard, a lossless data compression algorithm, and integrating it into various tools in the Hadoop ecosystem.

Spark 2.4.0 on EMR with stock settings isn't able to read this compression scheme but Presto can both read and write it.

I'll create a ZStandard-compressed table for Presto below using Hive.

$ hive

CREATE TABLE trips_orc_zstd_presto (
    trip_id INT, vendor_id STRING, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, store_and_fwd_flag STRING, rate_code_id SMALLINT, pickup_longitude DOUBLE, pickup_latitude DOUBLE, dropoff_longitude DOUBLE, dropoff_latitude DOUBLE, passenger_count SMALLINT, trip_distance DOUBLE, fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE, tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE, improvement_surcharge DOUBLE, total_amount DOUBLE, payment_type STRING, trip_type SMALLINT, pickup STRING, dropoff STRING, cab_type STRING, precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT, max_temperature SMALLINT, min_temperature SMALLINT, average_wind_speed SMALLINT, pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING, pickup_borocode SMALLINT, pickup_boroname STRING, pickup_ct2010 STRING, pickup_boroct2010 STRING, pickup_cdeligibil STRING, pickup_ntacode STRING, pickup_ntaname STRING, pickup_puma STRING, dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING, dropoff_borocode SMALLINT, dropoff_boroname STRING, dropoff_ct2010 STRING, dropoff_boroct2010 STRING, dropoff_cdeligibil STRING, dropoff_ntacode STRING, dropoff_ntaname STRING, dropoff_puma STRING
) STORED AS orc
  LOCATION '/trips_orc_zstd_presto/'
  TBLPROPERTIES ("orc.compress"="zstd");

I'll then use Presto to convert the CSV data into ZStandard-compressed ORC files.

$ presto-cli --schema default --catalog hive

The following took 37 mins and 44 seconds.

INSERT INTO trips_orc_zstd_presto
SELECT * FROM trips_csv;

The above generated 79 GB of data (excluding HDFS replication).

$ hdfs dfs -du -s -h /trips_orc_zstd_presto/

Presto ORC Benchmark: Snappy versus ZStandard

ZStandard did a good job of saving space on HDFS while still converting the data in a very short amount of time.

Below I'll look at the impact of the two compression schemes on query performance.

The following were the fastest times I saw after running each query multiple times.
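
I ran the queries interactively in presto-cli, but the repeated runs could be automated with something like the sketch below. It shells out to presto-cli's --execute flag and keeps the fastest wall-clock time per query; the query list and repeat count are illustrative, and the timings include the CLI's start-up overhead.

# Sketch: repeat each benchmark query via presto-cli and keep the fastest time.
import subprocess
import time

QUERIES = [
    "SELECT cab_type, count(*) FROM trips_orc_snappy_presto GROUP BY cab_type",
    "SELECT passenger_count, avg(total_amount) FROM trips_orc_snappy_presto GROUP BY passenger_count",
]

for sql in QUERIES:
    timings = []
    for _ in range(5):
        started = time.time()
        subprocess.run(
            ["presto-cli", "--catalog", "hive", "--schema", "default",
             "--execute", sql],
            check=True,
            stdout=subprocess.DEVNULL)
        timings.append(time.time() - started)
    print(round(min(timings), 2), "seconds:", sql)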

$ presto-cli --schema default --catalog hive

These four queries were run on the Snappy-compressed, ORC-formatted dataset.

The following completed in 5.48 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc_snappy_presto
GROUP BY cab_type;

The following completed in 7.85 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_snappy_presto
GROUP BY passenger_count;

The following completed in 8.55 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_snappy_presto
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 11.92 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_snappy_presto
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

These four queries were run on the ZStandard-compressed, ORC-formatted dataset.

The following completed in 4.21 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc_zstd_presto
GROUP BY cab_type;

The following completed in 5.97 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc_zstd_presto
GROUP BY passenger_count;

The following completed in 7.3 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_orc_zstd_presto
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 11.68 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_orc_zstd_presto
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips desc;

Thoughts on the Results

Hive being twice as fast as Spark at converting CSVs to ORC files (55 minutes versus 1 hour and 43 minutes) took me by surprise, as Spark has a younger code base. That being said, Presto being ~1.5x faster than Hive (37 minutes versus 55 minutes) was another shocker.

I'm hoping that in publishing this post the community becomes more aware of these performance differences and we can find improvements in future releases of all three packages.

Hive being a central store of catalogue data for so many tools in the Hadoop ecosystem cannot be ignored, and I expect I will continue to use it for a long time to come.

Where I'm put in an awkward position is in recommending that converting CSVs into ORC files should be done in Presto for significant time savings, but that querying the data afterwards is quicker in Spark.

There really is no one tool that rules them all.

ZStandard offers a lot in terms of disk space savings while not impacting query performance significantly.

To take the raw data and make it 1.5x smaller (118.6 GB with Snappy versus 79 GB with ZStandard) while not impacting conversion time is fantastic.

When Hadoop 3's erasure coding is mixed into the equation it'll bring space requirements down by two-thirds on HDFS.

Being able to buy a petabyte of storage capacity and store three times as much data on it is huge.

I feel that my research into converting CSVs into ORC files is just getting started.

Beyond fundamental architectural differences between these three pieces of software, I suspect stock settings on EMR could be improved to provide faster conversion times.

This isn't something I can prove at this point in time and it will be a subject of further research.

Spark did receive support for ZStandard in 2.3.0, so I suspect it's nothing more than a configuration change to add support into Spark on EMR.

I'm going to keep an eye on future releases of AWS EMR to see if this is added to the stock settings.
