Faster ClickHouse Imports

The above took 20 minutes and 23 seconds to complete.

I'll launch ClickHouse's client, create a database in ClickHouse that points to the trips database in MySQL and then import that data into a ClickHouse table which will store the records using the "Log" engine.

$ clickhouse-client

CREATE DATABASE mysql_db
ENGINE = MySQL('localhost:3306', 'trips', 'mark', 'test');

INSERT INTO trips
SELECT * FROM mysql_db.trips;

The above took 10 minutes and 57 seconds.

MySQL's internal format needed 42 GB of space to store the dataset.

The dataset is 9.9 GB when kept in ClickHouse's internal Log engine format.

During the import I could see ClickHouse using 50% of a CPU core and MySQL needing 2.75 CPU cores of capacity.

The disk throughput required never exceeded 20% of what the SSD is capable of.
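As a sanity check on those figures, and not something timed in this post, the row count and on-disk size of the Log-engine table can be inspected afterwards. A minimal sketch, assuming ClickHouse's default data directory and the default database:

$ clickhouse-client --query="SELECT count(*) FROM trips"
$ sudo du -sh /var/lib/clickhouse/data/default/trips  # assumes the default data path and database name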

Importing From JSON

I'll dump the 80 million rows in ClickHouse to JSON.

$ clickhouse-client --query='SELECT * FROM trips FORMAT JSONEachRow' > out.json

The resulting JSON file is 100 GB in size and took 9 minutes and 22 seconds to produce.

Here is a sampling of the first three rows.

$ head -n3 out.json

{"trip_id":471162830,"vendor_id":"CMT","pickup_datetime":"2011-09-27 10:19:11","dropoff_datetime":"2011-09-27 10:24:19","store_and_fwd_flag":"N","rate_code_id":1,"pickup_longitude":-73.994426,"pickup_latitude":40.745873,"dropoff_longitude":-74.005886,"dropoff_latitude":40.745395,"passenger_count":1,"trip_distance":0.8,"fare_amount":4.9,"extra":0,"mta_tax":0.5,"tip_amount":0,"tolls_amount":0,"ehail_fee":0,"improvement_surcharge":0,"total_amount":5.4,"payment_type":"CSH","trip_type":0,"pickup":"1156","dropoff":"2098","cab_type":"yellow","precipitation":0,"snow_depth":0,"snowfall":0,"max_temperature":-6,"min_temperature":-50,"average_wind_speed":16,"pickup_nyct2010_gid":-124,"pickup_ctlabel":"91","pickup_borocode":1,"pickup_boroname":"Manhattan","pickup_ct2010":"009100","pickup_boroct2010":"1009100","pickup_cdeligibil":"I","pickup_ntacode":"MN13","pickup_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","pickup_puma":"3807","dropoff_nyct2010_gid":50,"dropoff_ctlabel":"99","dropoff_borocode":1,"dropoff_boroname":"Manhattan","dropoff_ct2010":"009900","dropoff_boroct2010":"1009900","dropoff_cdeligibil":"I","dropoff_ntacode":"MN13","dropoff_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","dropoff_puma":"3807"}
{"trip_id":471162831,"vendor_id":"VTS","pickup_datetime":"2011-09-15 17:14:00","dropoff_datetime":"2011-09-15 17:48:00","store_and_fwd_flag":"\u0000","rate_code_id":1,"pickup_longitude":-73.968207,"pickup_latitude":40.752457,"dropoff_longitude":-74.008213,"dropoff_latitude":40.74876,"passenger_count":1,"trip_distance":3.15,"fare_amount":17.7,"extra":1,"mta_tax":0.5,"tip_amount":0,"tolls_amount":0,"ehail_fee":0,"improvement_surcharge":0,"total_amount":19.2,"payment_type":"CSH","trip_type":0,"pickup":"1432","dropoff":"2098","cab_type":"yellow","precipitation":18,"snow_depth":0,"snowfall":0,"max_temperature":-12,"min_temperature":117,"average_wind_speed":21,"pickup_nyct2010_gid":-104,"pickup_ctlabel":"90","pickup_borocode":1,"pickup_boroname":"Manhattan","pickup_ct2010":"009000","pickup_boroct2010":"1009000","pickup_cdeligibil":"I","pickup_ntacode":"MN19","pickup_ntaname":"Turtle Bay-East Midtown","pickup_puma":"3808","dropoff_nyct2010_gid":50,"dropoff_ctlabel":"99","dropoff_borocode":1,"dropoff_boroname":"Manhattan","dropoff_ct2010":"009900","dropoff_boroct2010":"1009900","dropoff_cdeligibil":"I","dropoff_ntacode":"MN13","dropoff_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","dropoff_puma":"3807"}
{"trip_id":471162832,"vendor_id":"VTS","pickup_datetime":"2011-09-25 12:55:00","dropoff_datetime":"2011-09-25 13:05:00","store_and_fwd_flag":"\u0000","rate_code_id":1,"pickup_longitude":-73.99651,"pickup_latitude":40.72563,"dropoff_longitude":-74.007203,"dropoff_latitude":40.75135,"passenger_count":1,"trip_distance":2.48,"fare_amount":8.5,"extra":0,"mta_tax":0.5,"tip_amount":1,"tolls_amount":0,"ehail_fee":0,"improvement_surcharge":0,"total_amount":10,"payment_type":"CRD","trip_type":0,"pickup":"1471","dropoff":"2098","cab_type":"yellow","precipitation":0,"snow_depth":0,"snowfall":0,"max_temperature":11,"min_temperature":-45,"average_wind_speed":4,"pickup_nyct2010_gid":-65,"pickup_ctlabel":"55.02","pickup_borocode":1,"pickup_boroname":"Manhattan","pickup_ct2010":"005502","pickup_boroct2010":"1005502","pickup_cdeligibil":"I","pickup_ntacode":"MN23","pickup_ntaname":"West Village","pickup_puma":"3810","dropoff_nyct2010_gid":50,"dropoff_ctlabel":"99","dropoff_borocode":1,"dropoff_boroname":"Manhattan","dropoff_ct2010":"009900","dropoff_boroct2010":"1009900","dropoff_cdeligibil":"I","dropoff_ntacode":"MN13","dropoff_ntaname":"Hudson Yards-Chelsea-Flatiron-Union Square","dropoff_puma":"3807"}

I'll split the JSON file into four files so that I can attempt parallel imports of the JSON data.

$ split --lines=20000000 out.json

The above operation produced four files of 25 GB each.

$ ls -alht xa*

-rw-rw-r-- 1 mark mark 25G Oct 15 04:13 xad
-rw-rw-r-- 1 mark mark 25G Oct 15 04:09 xac
-rw-rw-r-- 1 mark mark 25G Oct 15 04:06 xab
-rw-rw-r-- 1 mark mark 25G Oct 15 04:02 xaa

I'll truncate the trips table in ClickHouse and then import the above four JSON files sequentially.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ for FILENAME in xa*; do
      clickhouse-client --query="INSERT INTO trips FORMAT JSONEachRow" < $FILENAME
  done

The above took 10 minutes and 11 seconds.

The CPU usage during the import never exceeded 50%.

The SSD achieved read speeds of 225 MB/s and writes of 40 MB/s.
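The utilisation figures quoted throughout this post can be watched live with stock Linux tooling. A minimal sketch, assuming the sysstat package is installed for iostat:

$ iostat -mx 5   # per-device throughput in MB/s and utilisation, refreshed every 5 seconds
$ top            # per-process CPU usage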

I'll truncate the trips table and attempt a parallel import to see if I can reduce the import time.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ vi jobs

clickhouse-client --query="INSERT INTO trips FORMAT JSONEachRow" < xaa
clickhouse-client --query="INSERT INTO trips FORMAT JSONEachRow" < xab
clickhouse-client --query="INSERT INTO trips FORMAT JSONEachRow" < xac
clickhouse-client --query="INSERT INTO trips FORMAT JSONEachRow" < xad

$ cat jobs | xargs -n1 -P4 -I% bash -c "%"

The above attempt with four parallel processes failed with the following error.

Code: 209. DB::NetException: Timeout exceeded while reading from socket ([::1]:9000): while receiving packet from localhost:9000

I'll attempt a parallel import with two processes instead.

$ cat jobs | xargs -n1 -P2 -I% bash -c "%"

The above took 8 minutes and 43 seconds.
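The four-process failure above looks like a client-side socket timeout rather than a hard server limit. One thing that might be worth trying, though I haven't verified it avoids the error, is raising the client's send and receive timeouts, which clickhouse-client accepts as settings flags. A sketch of one of the job lines:

clickhouse-client --receive_timeout=3600 --send_timeout=3600 \
                  --query="INSERT INTO trips FORMAT JSONEachRow" < xaa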

Since the SSD is showing such a high read rate, I'll compress the JSON files and attempt a sequential import.

$ pigz --keep xa*

9.2 GB of GZIP-compressed JSON files were produced by the above operation.

I'll then truncate the trips table and import the GZIP-compressed data.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ for FILENAME in xa*.gz; do
      gunzip -c $FILENAME | clickhouse-client --query="INSERT INTO trips FORMAT JSONEachRow"
  done

The above took 8 minutes and 9 seconds.

Importing From PostgreSQL

I'll create credentials in PostgreSQL, then a database and a table which will store the 80 million trips.

$ sudo -u postgres bash -c "psql -c \"CREATE USER mark WITH PASSWORD 'test' SUPERUSER;\""
$ createdb trips
$ psql trips

CREATE TABLE trips (
    trip_id INT,
    vendor_id VARCHAR(3),
    pickup_datetime TIMESTAMP,
    dropoff_datetime TIMESTAMP,
    store_and_fwd_flag VARCHAR(1),
    rate_code_id SMALLINT,
    pickup_longitude DECIMAL(18,14),
    pickup_latitude DECIMAL(18,14),
    dropoff_longitude DECIMAL(18,14),
    dropoff_latitude DECIMAL(18,14),
    passenger_count SMALLINT,
    trip_distance DECIMAL(18,6),
    fare_amount DECIMAL(18,6),
    extra DECIMAL(18,6),
    mta_tax DECIMAL(18,6),
    tip_amount DECIMAL(18,6),
    tolls_amount DECIMAL(18,6),
    ehail_fee DECIMAL(18,6),
    improvement_surcharge DECIMAL(18,6),
    total_amount DECIMAL(18,6),
    payment_type VARCHAR(3),
    trip_type SMALLINT,
    pickup VARCHAR(50),
    dropoff VARCHAR(50),
    cab_type VARCHAR(6),
    precipitation SMALLINT,
    snow_depth SMALLINT,
    snowfall SMALLINT,
    max_temperature SMALLINT,
    min_temperature SMALLINT,
    average_wind_speed SMALLINT,
    pickup_nyct2010_gid SMALLINT,
    pickup_ctlabel VARCHAR(10),
    pickup_borocode SMALLINT,
    pickup_boroname VARCHAR(13),
    pickup_ct2010 VARCHAR(6),
    pickup_boroct2010 VARCHAR(7),
    pickup_cdeligibil VARCHAR(1),
    pickup_ntacode VARCHAR(4),
    pickup_ntaname VARCHAR(56),
    pickup_puma VARCHAR(4),
    dropoff_nyct2010_gid SMALLINT,
    dropoff_ctlabel VARCHAR(10),
    dropoff_borocode SMALLINT,
    dropoff_boroname VARCHAR(13),
    dropoff_ct2010 VARCHAR(6),
    dropoff_boroct2010 VARCHAR(7),
    dropoff_cdeligibil VARCHAR(1),
    dropoff_ntacode VARCHAR(4),
    dropoff_ntaname VARCHAR(56),
    dropoff_puma VARCHAR(4)
);

I will then import the 80 million records from four CSV files sequentially.

\copy trips FROM '000000_0.csv' DELIMITER ',' CSV
\copy trips FROM '000000_1.csv' DELIMITER ',' CSV
\copy trips FROM '000000_2.csv' DELIMITER ',' CSV
\copy trips FROM '000000_3.csv' DELIMITER ',' CSV

The above completed in 11 minutes and 32 seconds.

Neither the SSD nor the CPU was at capacity during the above operation.

It's worth looking at tuning PostgreSQL, examining alternative import methods and seeing if parallelism could have a strong effect on import times.
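As a rough sketch of the parallelism idea, the four files could be loaded by four psql processes at once, mirroring the xargs pattern used elsewhere in this post; the pg_jobs file name is made up and I haven't benchmarked this.

$ vi pg_jobs

psql trips -c "\copy trips FROM '000000_0.csv' DELIMITER ',' CSV"
psql trips -c "\copy trips FROM '000000_1.csv' DELIMITER ',' CSV"
psql trips -c "\copy trips FROM '000000_2.csv' DELIMITER ',' CSV"
psql trips -c "\copy trips FROM '000000_3.csv' DELIMITER ',' CSV"

$ cat pg_jobs | xargs -n1 -P4 -I% bash -c "%"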

The resulting dataset is 31 GB in PostgreSQL's internal format.

I'll truncate the trips table in ClickHouse and import the dataset from PostgreSQL using UNIX pipes to deliver CSV data to ClickHouse.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ psql trips -c "COPY trips TO STDOUT WITH CSV" \
    | clickhouse-client --query="INSERT INTO trips FORMAT CSV"

The above took 9 minutes and 39 seconds.

The CPU showed 70% of capacity being utilised while the SSD showed peaks of 60 MB/s being read and 120 MB/s being written at any one time.

Importing From Kafka

I'll install Kafka manually using the binary package distributed by one of Apache's mirrors.

$ sudo mkdir -p /opt/kafka
$ wget -c -O kafka.tgz http://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
$ sudo tar xzvf kafka.tgz --directory=/opt/kafka --strip 1

I'll then create a log file for Kafka which will be owned by my UNIX account.

$ sudo touch /var/log/kafka.log
$ sudo chown mark /var/log/kafka.log

I'll then launch Kafka's server process.

$ sudo nohup /opt/kafka/bin/kafka-server-start.sh \
      /opt/kafka/config/server.properties \
      > /var/log/kafka.log 2>&1 &

I'll create a trips topic in Kafka.

$ /opt/kafka/bin/kafka-topics.sh \
      --create \
      --zookeeper localhost:2181 \
      --replication-factor 1 \
      --partitions 1 \
      --topic trips

I'll then import the CSV data into the trips topic in Kafka.

$ cat 000000_[0-3].csv \
    | /opt/kafka/bin/kafka-console-producer.sh \
      --topic trips \
      --broker-list localhost:9092 \
      1>/dev/null

The above took 8 minutes and 30 seconds.

During that time the CPU was at 90% of capacity and the SSD achieved read and write speeds of between 70 and 100 MB/s at any one time.

The resulting dataset is 36 GB in Kafka's internal format.

I'll truncate the trips table in ClickHouse, then create a table in ClickHouse which points to the trips Kafka topic and then import its contents into the trips table in ClickHouse.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ clickhouse-client

CREATE TABLE trips_kafka_csv (
    trip_id UInt32,
    vendor_id String,
    pickup_datetime DateTime,
    dropoff_datetime Nullable(DateTime),
    store_and_fwd_flag Nullable(FixedString(1)),
    rate_code_id Nullable(UInt8),
    pickup_longitude Nullable(Float64),
    pickup_latitude Nullable(Float64),
    dropoff_longitude Nullable(Float64),
    dropoff_latitude Nullable(Float64),
    passenger_count Nullable(UInt8),
    trip_distance Nullable(Float64),
    fare_amount Nullable(Float32),
    extra Nullable(Float32),
    mta_tax Nullable(Float32),
    tip_amount Nullable(Float32),
    tolls_amount Nullable(Float32),
    ehail_fee Nullable(Float32),
    improvement_surcharge Nullable(Float32),
    total_amount Nullable(Float32),
    payment_type Nullable(String),
    trip_type Nullable(UInt8),
    pickup Nullable(String),
    dropoff Nullable(String),
    cab_type Nullable(String),
    precipitation Nullable(Int8),
    snow_depth Nullable(Int8),
    snowfall Nullable(Int8),
    max_temperature Nullable(Int8),
    min_temperature Nullable(Int8),
    average_wind_speed Nullable(Int8),
    pickup_nyct2010_gid Nullable(Int8),
    pickup_ctlabel Nullable(String),
    pickup_borocode Nullable(Int8),
    pickup_boroname Nullable(String),
    pickup_ct2010 Nullable(String),
    pickup_boroct2010 Nullable(String),
    pickup_cdeligibil Nullable(FixedString(1)),
    pickup_ntacode Nullable(String),
    pickup_ntaname Nullable(String),
    pickup_puma Nullable(String),
    dropoff_nyct2010_gid Nullable(UInt8),
    dropoff_ctlabel Nullable(String),
    dropoff_borocode Nullable(UInt8),
    dropoff_boroname Nullable(String),
    dropoff_ct2010 Nullable(String),
    dropoff_boroct2010 Nullable(String),
    dropoff_cdeligibil Nullable(String),
    dropoff_ntacode Nullable(String),
    dropoff_ntaname Nullable(String),
    dropoff_puma Nullable(String)
) ENGINE = Kafka
  SETTINGS kafka_broker_list = 'localhost:9092',
           kafka_topic_list = 'trips',
           kafka_group_name = 'group1',
           kafka_format = 'CSV',
           kafka_num_consumers = 4;

INSERT INTO trips
SELECT * FROM trips_kafka_csv;

During the above operation the CPU was at 50% capacity and there was almost no disk activity to speak of.

The progress indicator showed 3,180 rows per second being imported into ClickHouse.

→ Progress: 5.67 million rows, 3.33 GB (3.17 thousand rows/s., 1.86 MB/s.)

This is a capture from top during the above operation.

top - 02:53:24 up  1:30,  3 users,  load average: 0.99, 2.14, 1.75
Tasks: 206 total,   2 running, 204 sleeping,   0 stopped,   0 zombie
%Cpu0  :  0.4 us,  0.0 sy,  0.0 ni, 99.6 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu1  :  1.6 us,  0.4 sy,  0.0 ni, 98.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu2  :  8.3 us, 91.7 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu3  :  0.4 us,  0.4 sy,  0.0 ni, 99.3 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem : 24.3/8157556  [|||||||||||||||||||||||                    ]
KiB Swap:  1.2/8385532  [|                                          ]

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
11422 clickho+  20   0 5544596 417140  29588 S 150.2  5.1   3:24.86 /usr/bin/clickhouse-server
30574 root      20   0 5730472 998060  10408 S   0.7 12.2   8:58.01 java -Xmx1G -Xms1G -server

Had I allowed the operation to continue it would have taken seven hours to complete.

I haven't come across any obvious parameter tuning in the ClickHouse manual at this point so I'll have to see if this can be addressed in a later post.
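One approach I haven't benchmarked here is the materialized-view pattern described in ClickHouse's Kafka engine documentation, where a materialized view continuously drains the Kafka table into the destination table instead of a one-off INSERT ... SELECT; the view name below is made up. Whether that pattern, or Kafka engine settings such as kafka_max_block_size, would move the 3,180 rows per second figure is something I'd have to test.

CREATE MATERIALIZED VIEW trips_kafka_consumer TO trips AS
SELECT * FROM trips_kafka_csv;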

Traditionally I'd use Flume to land Kafka streams onto HDFS and then run periodic jobs via Airflow to import time-partitioned datasets into OLAP systems similar to ClickHouse.

At some point, when I've uncovered the bottleneck above, I'll aim to publish a comparison of these methods.

Importing CSV into ClickHouse Directly

I'll import four GZIP-compressed CSV files sequentially, using gunzip to decompress the contents before presenting them to ClickHouse via a UNIX pipe.

Each file is slightly less than 1,929 MB in size when compressed.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ for FILENAME in *.csv.gz; do
      gunzip -c $FILENAME | clickhouse-client --query="INSERT INTO trips FORMAT CSV"
  done

The above completed in 4 minutes and 45 seconds.

I'll perform the same import but use four parallel processes.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ vi jobs

gunzip -c 000000_0.csv.gz | clickhouse-client --query="INSERT INTO trips FORMAT CSV"
gunzip -c 000000_1.csv.gz | clickhouse-client --query="INSERT INTO trips FORMAT CSV"
gunzip -c 000000_2.csv.gz | clickhouse-client --query="INSERT INTO trips FORMAT CSV"
gunzip -c 000000_3.csv.gz | clickhouse-client --query="INSERT INTO trips FORMAT CSV"

$ cat jobs | xargs -n1 -P4 -I% bash -c "%"

The above completed in 4 minutes and 39 seconds.

Barely any improvement at all.

I'll see if using data that is already decompressed speeds up the import process.

Below I'll import four decompressed files sequentially.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ for FILENAME in *.csv; do
      clickhouse-client --query="INSERT INTO trips FORMAT CSV" < $FILENAME
  done

The above completed in 5 minutes and 59 seconds.

I'll try the same operation with four parallel processes.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ vi jobs

clickhouse-client --query="INSERT INTO trips FORMAT CSV" < 000000_0.csv
clickhouse-client --query="INSERT INTO trips FORMAT CSV" < 000000_1.csv
clickhouse-client --query="INSERT INTO trips FORMAT CSV" < 000000_2.csv
clickhouse-client --query="INSERT INTO trips FORMAT CSV" < 000000_3.csv

$ cat jobs | xargs -n1 -P4 -I% bash -c "%"

The above completed in 6 minutes and 6 seconds.

It appears that parallel imports, regardless of file compression, have little or even a negative impact on import times.

Reading the larger, decompressed files off of disk appears to hurt import times compared to reading compressed data off of disk and decompressing it just prior to import.
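A crude way to test the read side of that theory is to time streaming a compressed file through gunzip against reading its uncompressed counterpart, dropping the page cache first so the reads actually hit the SSD; this only measures reads, not the whole import.

$ sudo sh -c 'sync; echo 3 > /proc/sys/vm/drop_caches'
$ time gunzip -c 000000_0.csv.gz > /dev/null

$ sudo sh -c 'sync; echo 3 > /proc/sys/vm/drop_caches'
$ time cat 000000_0.csv > /dev/null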

Importing Parquet into ClickHouse

The Parquet dataset is Snappy-compressed and is made up of four files that are ~1,983 MB each in size.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ for FILENAME in *.pq; do
      cat $FILENAME | clickhouse-client --query="INSERT INTO trips FORMAT Parquet"
  done

The above completed in 2 minutes and 38 seconds.

I'll attempt the same operation again but use four parallel processes to import the data.

$ clickhouse-client --query="TRUNCATE TABLE trips"

$ vi jobs

cat 000000_0.pq | clickhouse-client --query="INSERT INTO trips FORMAT Parquet"
cat 000000_1.pq | clickhouse-client --query="INSERT INTO trips FORMAT Parquet"
cat 000000_2.pq | clickhouse-client --query="INSERT INTO trips FORMAT Parquet"
cat 000000_3.pq | clickhouse-client --query="INSERT INTO trips FORMAT Parquet"

$ cat jobs | xargs -n1 -P4 -I% bash -c "%"

The above completed in 2 minutes and 48 seconds.

Much like what was seen with the CSV data, parallel imports didn't have a positive impact on the total import time.

Importing From HDFS into ClickHouse

I've installed HDFS via the Hadoop distribution provided by Apache.

The installation steps I took can be found in my Hadoop 3 Installation Guide.

Note that I didn't install Hive, Presto or Spark for this exercise as they're not needed.

I'll copy the decompressed CSV files onto HDFS.

$ hdfs dfs -mkdir -p /csv
$ hdfs dfs -copyFromLocal ~/*.csv /csv/

I'll then connect to ClickHouse and create a table representing the CSV data on HDFS.

$ clickhouse-client

CREATE TABLE trips_hdfs_csv (
    trip_id UInt32,
    vendor_id String,
    pickup_datetime DateTime,
    dropoff_datetime Nullable(DateTime),
    store_and_fwd_flag Nullable(FixedString(1)),
    rate_code_id Nullable(UInt8),
    pickup_longitude Nullable(Float64),
    pickup_latitude Nullable(Float64),
    dropoff_longitude Nullable(Float64),
    dropoff_latitude Nullable(Float64),
    passenger_count Nullable(UInt8),
    trip_distance Nullable(Float64),
    fare_amount Nullable(Float32),
    extra Nullable(Float32),
    mta_tax Nullable(Float32),
    tip_amount Nullable(Float32),
    tolls_amount Nullable(Float32),
    ehail_fee Nullable(Float32),
    improvement_surcharge Nullable(Float32),
    total_amount Nullable(Float32),
    payment_type Nullable(String),
    trip_type Nullable(UInt8),
    pickup Nullable(String),
    dropoff Nullable(String),
    cab_type Nullable(String),
    precipitation Nullable(Int8),
    snow_depth Nullable(Int8),
    snowfall Nullable(Int8),
    max_temperature Nullable(Int8),
    min_temperature Nullable(Int8),
    average_wind_speed Nullable(Int8),
    pickup_nyct2010_gid Nullable(Int8),
    pickup_ctlabel Nullable(String),
    pickup_borocode Nullable(Int8),
    pickup_boroname Nullable(String),
    pickup_ct2010 Nullable(String),
    pickup_boroct2010 Nullable(String),
    pickup_cdeligibil Nullable(FixedString(1)),
    pickup_ntacode Nullable(String),
    pickup_ntaname Nullable(String),
    pickup_puma Nullable(String),
    dropoff_nyct2010_gid Nullable(UInt8),
    dropoff_ctlabel Nullable(String),
    dropoff_borocode Nullable(UInt8),
    dropoff_boroname Nullable(String),
    dropoff_ct2010 Nullable(String),
    dropoff_boroct2010 Nullable(String),
    dropoff_cdeligibil Nullable(String),
    dropoff_ntacode Nullable(String),
    dropoff_ntaname Nullable(String),
    dropoff_puma Nullable(String)
) ENGINE = HDFS('hdfs://localhost:9000/csv/*', 'CSV');

I'll then truncate the trips table and import the data off HDFS into a native ClickHouse table.

TRUNCATE TABLE trips;

INSERT INTO trips
SELECT * FROM trips_hdfs_csv;

The above completed in 3 minutes and 11 seconds.

During the import the CPU was almost completely utilised.

ClickHouse had almost three cores saturated while the HDFS DataNode took half a core to itself.

top - 07:13:06 up 30 min,  2 users,  load average: 6.72, 3.33, 1.76
Tasks: 201 total,   3 running, 198 sleeping,   0 stopped,   0 zombie
%Cpu0  : 78.0 us, 15.7 sy,  0.0 ni,  1.0 id,  3.5 wa,  0.0 hi,  1.7 si,  0.0 st
%Cpu1  : 40.0 us, 29.1 sy,  0.0 ni,  1.1 id,  8.7 wa,  0.0 hi, 21.1 si,  0.0 st
%Cpu2  : 57.6 us, 20.9 sy,  0.0 ni, 10.4 id,  9.7 wa,  0.0 hi,  1.4 si,  0.0 st
%Cpu3  : 70.3 us, 15.9 sy,  0.0 ni,  5.1 id,  5.8 wa,  0.0 hi,  2.9 si,  0.0 st
KiB Mem :  8157556 total,   149076 free,  2358936 used,  5649544 buff/cache
KiB Swap:  8386556 total,  8298996 free,    87560 used.  5465288 avail Mem

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
 1102 clickho+  20   0 5583684 1.446g  74952 S 289.0 18.6   4:39.64 /usr/bin/clickhouse-server
 1894 root      20   0 3836828 276156  24244 S   0.3  3.4   0:09.45 /usr/bin/java -Dproc_namenode
 2059 root      20   0 3823000 268328  24388 S  47.2  3.3   3:28.72 /usr/bin/java -Dproc_datanode

Given the large amount of memory allocated towards the buffer / cache, I suspect HDFS has done a good job of getting Linux to keep as many HDFS blocks as possible in the page cache.
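A rough way to check that suspicion is to watch the buff/cache figure while the import runs; the commands below only show aggregate page cache usage, not which files it holds.

$ free -h                          # the buff/cache column
$ grep -i '^Cached' /proc/meminfo  # page cache size in KiB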

I'll perform the same import off of HDFS but this time I'll use a Parquet-formatted, Snappy-compressed copy of the dataset.

Below I'll copy the Parquet files onto HDFS.

$ hdfs dfs -mkdir -p /pq
$ hdfs dfs -copyFromLocal ~/*.pq /pq/

I'll then create a table in ClickHouse to represent that dataset.

$ clickhouse-client

CREATE TABLE trips_hdfs_pq (
    trip_id UInt32,
    vendor_id String,
    pickup_datetime DateTime,
    dropoff_datetime Nullable(DateTime),
    store_and_fwd_flag Nullable(FixedString(1)),
    rate_code_id Nullable(UInt8),
    pickup_longitude Nullable(Float64),
    pickup_latitude Nullable(Float64),
    dropoff_longitude Nullable(Float64),
    dropoff_latitude Nullable(Float64),
    passenger_count Nullable(UInt8),
    trip_distance Nullable(Float64),
    fare_amount Nullable(Float32),
    extra Nullable(Float32),
    mta_tax Nullable(Float32),
    tip_amount Nullable(Float32),
    tolls_amount Nullable(Float32),
    ehail_fee Nullable(Float32),
    improvement_surcharge Nullable(Float32),
    total_amount Nullable(Float32),
    payment_type Nullable(String),
    trip_type Nullable(UInt8),
    pickup Nullable(String),
    dropoff Nullable(String),
    cab_type Nullable(String),
    precipitation Nullable(Int8),
    snow_depth Nullable(Int8),
    snowfall Nullable(Int8),
    max_temperature Nullable(Int8),
    min_temperature Nullable(Int8),
    average_wind_speed Nullable(Int8),
    pickup_nyct2010_gid Nullable(Int8),
    pickup_ctlabel Nullable(String),
    pickup_borocode Nullable(Int8),
    pickup_boroname Nullable(String),
    pickup_ct2010 Nullable(String),
    pickup_boroct2010 Nullable(String),
    pickup_cdeligibil Nullable(FixedString(1)),
    pickup_ntacode Nullable(String),
    pickup_ntaname Nullable(String),
    pickup_puma Nullable(String),
    dropoff_nyct2010_gid Nullable(UInt8),
    dropoff_ctlabel Nullable(String),
    dropoff_borocode Nullable(UInt8),
    dropoff_boroname Nullable(String),
    dropoff_ct2010 Nullable(String),
    dropoff_boroct2010 Nullable(String),
    dropoff_cdeligibil Nullable(String),
    dropoff_ntacode Nullable(String),
    dropoff_ntaname Nullable(String),
    dropoff_puma Nullable(String)
) ENGINE = HDFS('hdfs://localhost:9000/pq/*', 'Parquet');

I'll then truncate the trips table and import the data off HDFS into a native ClickHouse table.

TRUNCATE TABLE trips;

INSERT INTO trips
SELECT * FROM trips_hdfs_pq;

The above operation failed stating I'd hit ClickHouse's single-query memory limit.

Received exception from server (version 19.15.3):
Code: 241. DB::Exception: Received from localhost:9001. DB::Exception: Memory limit (for query) exceeded: would use 10.63 GiB (attempt to allocate chunk of 2684354560 bytes), maximum: 9.31 GiB.

I had to upgrade the RAM on the system from 8 GB to 20 GB and set the maximum memory limit to an arbitrarily high number.
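The per-query limit that was hit, along with the value in effect after changing it, can be read back from system.settings; this is just a check, not part of the fix itself.

SELECT name, value
FROM system.settings
WHERE name = 'max_memory_usage';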

TRUNCATE TABLE trips;

SET max_memory_usage = 20000000000;

INSERT INTO trips
SELECT * FROM trips_hdfs_pq;

The above completed in 1 minute and 56 seconds, the fastest of any method I've attempted.

During the above operation the CPU was almost fully utilised, the SSD was nearing its throughput limits and almost all of the RAM available to the system was in use.

Conclusion

The format in which data is presented to ClickHouse, and the storage system it is read from, can have a huge impact on import times.

Importing compressed JSON off of disk was 7.3x slower than loading Parquet off of HDFS.

Parallelism doesn't seem to offer any benefit either.

It was great to see how quickly Parquet data loads off of HDFS, but it's a shame how RAM-intensive that operation is.

Every other import ran fine with no more than 8 GB of system memory.

A ratio of 8 GB of RAM per CPU core appears to be a sweet spot for this particular workload.

Below is a summary, sorted from fastest to slowest, of the above imports.

Seconds | Method
--------|--------------------------------------------------
    116 | Snappy-compressed Parquet off HDFS (sequential)
    158 | Snappy-compressed Parquet off disk (sequential)
    168 | Snappy-compressed Parquet off disk (parallel x 4)
    191 | Uncompressed CSV off HDFS (sequential)
    278 | GZIP-compressed CSV off disk (parallel x 4)
    285 | GZIP-compressed CSV off disk (sequential)
    358 | Uncompressed CSV off disk (sequential)
    366 | Uncompressed CSV off disk (parallel x 4)
    522 | Uncompressed JSON off disk (parallel x 2)
    579 | PostgreSQL
    610 | Uncompressed JSON off disk (sequential)
    657 | MySQL
    849 | GZIP-compressed JSON off disk (sequential)
