Using SQL to query Kafka, MongoDB, MySQL, PostgreSQL and Redis with Presto

$ sudo mkdir -p /opt/presto/etc/kafka
$ sudo vi /opt/presto/etc/kafka/trips.json

{
    "tableName": "trips",
    "schemaName": "default",
    "topicName": "trips",
    "message": {
        "dataFormat": "csv",
        "fields": [
            {"type": "VARCHAR", "mapping": 0, "name": "trip_id"},
            {"type": "VARCHAR", "mapping": 1, "name": "vendor_id"},
            {"type": "VARCHAR", "mapping": 2, "name": "pickup_datetime"},
            {"type": "VARCHAR", "mapping": 3, "name": "dropoff_datetime"},
            {"type": "VARCHAR", "mapping": 4, "name": "store_and_fwd_flag"},
            {"type": "VARCHAR", "mapping": 5, "name": "rate_code_id"},
            {"type": "VARCHAR", "mapping": 6, "name": "pickup_longitude"},
            {"type": "VARCHAR", "mapping": 7, "name": "pickup_latitude"},
            {"type": "VARCHAR", "mapping": 8, "name": "dropoff_longitude"},
            {"type": "VARCHAR", "mapping": 9, "name": "dropoff_latitude"},
            {"type": "VARCHAR", "mapping": 10, "name": "passenger_count"},
            {"type": "VARCHAR", "mapping": 11, "name": "trip_distance"},
            {"type": "VARCHAR", "mapping": 12, "name": "fare_amount"},
            {"type": "VARCHAR", "mapping": 13, "name": "extra"},
            {"type": "VARCHAR", "mapping": 14, "name": "mta_tax"},
            {"type": "VARCHAR", "mapping": 15, "name": "tip_amount"},
            {"type": "VARCHAR", "mapping": 16, "name": "tolls_amount"},
            {"type": "VARCHAR", "mapping": 17, "name": "ehail_fee"},
            {"type": "VARCHAR", "mapping": 18, "name": "improvement_surcharge"},
            {"type": "VARCHAR", "mapping": 19, "name": "total_amount"},
            {"type": "VARCHAR", "mapping": 20, "name": "payment_type"},
            {"type": "VARCHAR", "mapping": 21, "name": "trip_type"},
            {"type": "VARCHAR", "mapping": 22, "name": "pickup"},
            {"type": "VARCHAR", "mapping": 23, "name": "dropoff"},
            {"type": "VARCHAR", "mapping": 24, "name": "cab_type"},
            {"type": "VARCHAR", "mapping": 25, "name": "precipitation"},
            {"type": "VARCHAR", "mapping": 26, "name": "snow_depth"},
            {"type": "VARCHAR", "mapping": 27, "name": "snowfall"},
            {"type": "VARCHAR", "mapping": 28, "name": "max_temperature"},
            {"type": "VARCHAR", "mapping": 29, "name": "min_temperature"},
            {"type": "VARCHAR", "mapping": 30, "name": "average_wind_speed"},
            {"type": "VARCHAR", "mapping": 31, "name": "pickup_nyct2010_gid"},
            {"type": "VARCHAR", "mapping": 32, "name": "pickup_ctlabel"},
            {"type": "VARCHAR", "mapping": 33, "name": "pickup_borocode"},
            {"type": "VARCHAR", "mapping": 34, "name": "pickup_boroname"},
            {"type": "VARCHAR", "mapping": 35, "name": "pickup_ct2010"},
            {"type": "VARCHAR", "mapping": 36, "name": "pickup_boroct2010"},
            {"type": "VARCHAR", "mapping": 37, "name": "pickup_cdeligibil"},
            {"type": "VARCHAR", "mapping": 38, "name": "pickup_ntacode"},
            {"type": "VARCHAR", "mapping": 39, "name": "pickup_ntaname"},
            {"type": "VARCHAR", "mapping": 40, "name": "pickup_puma"},
            {"type": "VARCHAR", "mapping": 41, "name": "dropoff_nyct2010_gid"},
            {"type": "VARCHAR", "mapping": 42, "name": "dropoff_ctlabel"},
            {"type": "VARCHAR", "mapping": 43, "name": "dropoff_borocode"},
            {"type": "VARCHAR", "mapping": 44, "name": "dropoff_boroname"},
            {"type": "VARCHAR", "mapping": 45, "name": "dropoff_ct2010"},
            {"type": "VARCHAR", "mapping": 46, "name": "dropoff_boroct2010"},
            {"type": "VARCHAR", "mapping": 47, "name": "dropoff_cdeligibil"},
            {"type": "VARCHAR", "mapping": 48, "name": "dropoff_ntacode"},
            {"type": "VARCHAR", "mapping": 49, "name": "dropoff_ntaname"},
            {"type": "VARCHAR", "mapping": 50, "name": "dropoff_puma"}
        ]
    }
}

MongoDB already named each of the fields in each record so only the connector details are needed for Presto to communicate with it.

$ sudo vi /opt/presto/etc/catalog/mongodb.properties

connector.name=mongodb
mongodb.seeds=127.0.0.1

The following will give Presto the credentials and connection details for MySQL.

$ sudo vi /opt/presto/etc/catalog/mysql.properties

connector.name=mysql
connection-url=jdbc:mysql://127.0.0.1:3306
connection-user=mark
connection-password=test

The following will give Presto the credentials and connection details for PostgreSQL.

$ sudo vi /opt/presto/etc/catalog/postgresql.properties

connector.name=postgresql
connection-url=jdbc:postgresql://127.0.0.1:5432/taxi
connection-user=mark
connection-password=test

The following will give Presto the connection details for Redis. The schema and table name being used are also declared in this file.

$ sudo vi /opt/presto/etc/catalog/redis.properties

connector.name=redis
redis.table-names=schema1.trips
redis.nodes=localhost:6379

Redis stores each record in raw CSV format. The following will define a schema that will name and type cast each CSV field so they're usable in a tabular form in Presto.

$ sudo mkdir -p /opt/presto/etc/redis
$ sudo vi /opt/presto/etc/redis/trips.json

{
    "tableName": "trips",
    "schemaName": "schema1",
    "value": {
        "dataFormat": "csv",
        "fields": [
            {"type": "VARCHAR", "mapping": 0, "name": "trip_id"},
            {"type": "VARCHAR", "mapping": 1, "name": "vendor_id"},
            {"type": "VARCHAR", "mapping": 2, "name": "pickup_datetime"},
            {"type": "VARCHAR", "mapping": 3, "name": "dropoff_datetime"},
            {"type": "VARCHAR", "mapping": 4, "name": "store_and_fwd_flag"},
            {"type": "VARCHAR", "mapping": 5, "name": "rate_code_id"},
            {"type": "VARCHAR", "mapping": 6, "name": "pickup_longitude"},
            {"type": "VARCHAR", "mapping": 7, "name": "pickup_latitude"},
            {"type": "VARCHAR", "mapping": 8, "name": "dropoff_longitude"},
            {"type": "VARCHAR", "mapping": 9, "name": "dropoff_latitude"},
            {"type": "VARCHAR", "mapping": 10, "name": "passenger_count"},
            {"type": "VARCHAR", "mapping": 11, "name": "trip_distance"},
            {"type": "VARCHAR", "mapping": 12, "name": "fare_amount"},
            {"type": "VARCHAR", "mapping": 13, "name": "extra"},
            {"type": "VARCHAR", "mapping": 14, "name": "mta_tax"},
            {"type": "VARCHAR", "mapping": 15, "name": "tip_amount"},
            {"type": "VARCHAR", "mapping": 16, "name": "tolls_amount"},
            {"type": "VARCHAR", "mapping": 17, "name": "ehail_fee"},
            {"type": "VARCHAR", "mapping": 18, "name": "improvement_surcharge"},
            {"type": "VARCHAR", "mapping": 19, "name": "total_amount"},
            {"type": "VARCHAR", "mapping": 20, "name": "payment_type"},
            {"type": "VARCHAR", "mapping": 21, "name": "trip_type"},
            {"type": "VARCHAR", "mapping": 22, "name": "pickup"},
            {"type": "VARCHAR", "mapping": 23, "name": "dropoff"},
            {"type": "VARCHAR", "mapping": 24, "name": "cab_type"},
            {"type": "VARCHAR", "mapping": 25, "name": "precipitation"},
            {"type": "VARCHAR", "mapping": 26, "name": "snow_depth"},
            {"type": "VARCHAR", "mapping": 27, "name": "snowfall"},
            {"type": "VARCHAR", "mapping": 28, "name": "max_temperature"},
            {"type": "VARCHAR", "mapping": 29, "name": "min_temperature"},
            {"type": "VARCHAR", "mapping": 30, "name": "average_wind_speed"},
            {"type": "VARCHAR", "mapping": 31, "name": "pickup_nyct2010_gid"},
            {"type": "VARCHAR", "mapping": 32, "name": "pickup_ctlabel"},
            {"type": "VARCHAR", "mapping": 33, "name": "pickup_borocode"},
            {"type": "VARCHAR", "mapping": 34, "name": "pickup_boroname"},
            {"type": "VARCHAR", "mapping": 35, "name": "pickup_ct2010"},
            {"type": "VARCHAR", "mapping": 36, "name": "pickup_boroct2010"},
            {"type": "VARCHAR", "mapping": 37, "name": "pickup_cdeligibil"},
            {"type": "VARCHAR", "mapping": 38, "name": "pickup_ntacode"},
            {"type": "VARCHAR", "mapping": 39, "name": "pickup_ntaname"},
            {"type": "VARCHAR", "mapping": 40, "name": "pickup_puma"},
            {"type": "VARCHAR", "mapping": 41, "name": "dropoff_nyct2010_gid"},
            {"type": "VARCHAR", "mapping": 42, "name": "dropoff_ctlabel"},
            {"type": "VARCHAR", "mapping": 43, "name": "dropoff_borocode"},
            {"type": "VARCHAR", "mapping": 44, "name": "dropoff_boroname"},
            {"type": "VARCHAR", "mapping": 45, "name": "dropoff_ct2010"},
            {"type": "VARCHAR", "mapping": 46, "name": "dropoff_boroct2010"},
            {"type": "VARCHAR", "mapping": 47, "name": "dropoff_cdeligibil"},
            {"type": "VARCHAR", "mapping": 48, "name": "dropoff_ntacode"},
            {"type": "VARCHAR", "mapping": 49, "name": "dropoff_ntaname"},
            {"type": "VARCHAR", "mapping": 50, "name": "dropoff_puma"}
        ]
    }
}

With all those configuration changes in place I'll restart the Presto server process.

$ sudo /opt/presto/bin/launcher restart

Querying Multiple Databases with Presto

The following will launch Presto's CLI and then query the five databases in a single query. Note the naming convention used here is catalogue.schema.table.

$ presto

SELECT (
    SELECT COUNT(*) FROM kafka.default.trips
), (
    SELECT COUNT(*) FROM mysql.taxi.trips
), (
    SELECT COUNT(*) FROM redis.schema1.trips
), (
    SELECT COUNT(*) FROM mongodb.taxi.trips
), (
    SELECT COUNT(*) FROM postgresql.public.trips
);

 _col0 | _col1 | _col2 | _col3 | _col4
-------+-------+-------+-------+-------
  1000 |  1000 |  1000 |  1000 |  1000

The same query can also be executed via Python. The following will install Python and PyHive among a few other dependencies.
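
With PyHive installed, issuing the same cross-database count from Python might look roughly like the sketch below. This is an illustration rather than the original code from this post: the helper names build_count_query and run_counts are made up here, and the coordinator address of localhost:8080 is an assumption that needs to match your Presto setup.

```python
# Fully-qualified catalog.schema.table names from the CLI query above.
TABLES = [
    "kafka.default.trips",
    "mysql.taxi.trips",
    "redis.schema1.trips",
    "mongodb.taxi.trips",
    "postgresql.public.trips",
]


def build_count_query(tables):
    """Return one SELECT made of a scalar COUNT(*) sub-query per table."""
    subqueries = ["(SELECT COUNT(*) FROM {})".format(name) for name in tables]
    return "SELECT " + ", ".join(subqueries)


def run_counts(host="localhost", port=8080):
    """Run the count query through PyHive and return the fetched rows.

    host and port are assumed values for the Presto coordinator.
    """
    # Imported lazily so the query builder can be used without PyHive.
    from pyhive import presto

    cursor = presto.connect(host=host, port=port).cursor()
    cursor.execute(build_count_query(TABLES))
    return cursor.fetchall()
```

Calling run_counts() against the configuration above should hand back a single row holding the five counts. The statement is built without a trailing semicolon, since that terminator belongs to the CLI and not to the SQL sent over the client.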
