Building a Data Pipeline with Airflow

In PostgreSQL I'll create an enum type of the currency codes the API reports against USD, along with a rates table to store the exchange rates:

$ psql rates

CREATE TYPE USD_PAIR AS ENUM (
    'AED', 'AFN', 'ALL', 'AMD', 'ANG', 'AOA', 'ARS', 'AUD', 'AWG', 'AZN',
    'BAM', 'BBD', 'BDT', 'BGN', 'BHD', 'BIF', 'BMD', 'BND', 'BOB', 'BRL',
    'BSD', 'BTC', 'BTN', 'BWP', 'BYN', 'BYR', 'BZD', 'CAD', 'CDF', 'CHF',
    'CLF', 'CLP', 'CNY', 'COP', 'CRC', 'CUC', 'CUP', 'CVE', 'CZK', 'DJF',
    'DKK', 'DOP', 'DZD', 'EEK', 'EGP', 'ERN', 'ETB', 'EUR', 'FJD', 'FKP',
    'GBP', 'GEL', 'GGP', 'GHS', 'GIP', 'GMD', 'GNF', 'GTQ', 'GYD', 'HKD',
    'HNL', 'HRK', 'HTG', 'HUF', 'IDR', 'ILS', 'IMP', 'INR', 'IQD', 'IRR',
    'ISK', 'JEP', 'JMD', 'JOD', 'JPY', 'KES', 'KGS', 'KHR', 'KMF', 'KPW',
    'KRW', 'KWD', 'KYD', 'KZT', 'LAK', 'LBP', 'LKR', 'LRD', 'LSL', 'LTL',
    'LVL', 'LYD', 'MAD', 'MDL', 'MGA', 'MKD', 'MMK', 'MNT', 'MOP', 'MRO',
    'MTL', 'MUR', 'MVR', 'MWK', 'MXN', 'MYR', 'MZN', 'NAD', 'NGN', 'NIO',
    'NOK', 'NPR', 'NZD', 'OMR', 'PAB', 'PEN', 'PGK', 'PHP', 'PKR', 'PLN',
    'PYG', 'QAR', 'RON', 'RSD', 'RUB', 'RWF', 'SAR', 'SBD', 'SCR', 'SDG',
    'SEK', 'SGD', 'SHP', 'SLL', 'SOS', 'SRD', 'STD', 'SVC', 'SYP', 'SZL',
    'THB', 'TJS', 'TMT', 'TND', 'TOP', 'TRY', 'TTD', 'TWD', 'TZS', 'UAH',
    'UGX', 'USD', 'UYU', 'UZS', 'VEF', 'VND', 'VUV', 'WST', 'XAF', 'XAG',
    'XAU', 'XCD', 'XDR', 'XOF', 'XPD', 'XPF', 'XPT', 'YER', 'ZAR', 'ZMK',
    'ZMW', 'ZWL');

CREATE TABLE rates (
    pk SERIAL,
    pair USD_PAIR,
    valid_until TIMESTAMP WITH TIME ZONE,
    rate DECIMAL,
    CONSTRAINT pk PRIMARY KEY (pk, pair, valid_until)
);

Installing Airflow

I'll create a virtual environment, activate it and install the Python modules. As of this writing, Airflow 1.7.1.3 is the latest version available via PyPI. When you include [postgres] alongside airflow, pip will install psycopg2 automatically.

$ virtualenv .pipeline
$ source .pipeline/bin/activate
$ pip install airflow[postgres] celery cryptography MySQL-python redis

I'll then initialise Airflow's database and workspace.

$ airflow initdb

By default, example DAGs are loaded by Airflow, so if you haven't installed any Hive dependencies you'll see this annoying error message a lot:

ERROR [airflow.models.DagBag] Failed to import: /home/mark/.pipeline/local/lib/python2.7/site-packages/airflow/example_dags/example_twitter_dag.py
Traceback (most recent call last):
  File "/home/mark/.pipeline/local/lib/python2.7/site-packages/airflow/models.py", line 247, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/home/mark/.pipeline/local/lib/python2.7/site-packages/airflow/example_dags/example_twitter_dag.py", line 26, in <module>
    from airflow.operators import BashOperator, HiveOperator, PythonOperator
ImportError: cannot import name HiveOperator

To avoid loading these examples, run the following to patch Airflow's configuration:

$ sed -i 's/load_examples = True/load_examples = False/' airflow/airflow.cfg

Setting up Database Connections

Airflow has an inventory system for database and API connections. The easiest way to add these is via the web interface. The following will launch the web server on port 8080 and bind it to the 0.0.0.0 network interface.

$ airflow webserver

If you then load the following URL, it'll take you to the connections page:

$ open http://127.0.0.1:8080/admin/connection/

I'll create two connections. The first will have:

- A connection type of HTTP.
- A connection identifier of openexchangerates.
- A host string of the full API endpoint: https://openexchangerates.org/api/latest.json?app_id=…

The second connection will have:

- A connection type of Postgres.
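If you'd rather not click through the web interface, connections can also be registered against Airflow's metadata database in a few lines of Python. This is a minimal sketch, not from the original setup: the Postgres connection's identifier and settings (rates_db, localhost and so on) are hypothetical placeholders, and the app_id remains elided as it is in the post.

from airflow import settings
from airflow.models import Connection

session = settings.Session()

# HTTP connection whose host field holds the full API endpoint.
session.add(Connection(
    conn_id='openexchangerates',
    conn_type='http',
    host='https://openexchangerates.org/api/latest.json?app_id=...'))

# Postgres connection for the rates table; every value here is a
# hypothetical placeholder for your own database settings.
session.add(Connection(
    conn_id='rates_db',
    conn_type='postgres',
    host='localhost',
    schema='rates',
    port=5432))

session.commit()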
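To show where these connections end up being used, here is a sketch of a minimal DAG that fetches the latest rates over the HTTP connection and inserts them into the rates table. It's illustrative rather than the finished pipeline: the rates_db connection identifier, the one-hour validity window and the hourly schedule are all assumptions, while the imports follow the Airflow 1.7-style paths seen in the traceback above.

from datetime import datetime, timedelta

import requests

from airflow.hooks import PostgresHook
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG
from airflow.operators import PythonOperator


def fetch_rates():
    # The HTTP connection's host field holds the full API endpoint.
    endpoint = BaseHook.get_connection('openexchangerates').host
    rates = requests.get(endpoint).json()['rates']

    # 'rates_db' is the hypothetical Postgres connection defined above.
    pg = PostgresHook(postgres_conn_id='rates_db')
    valid_until = datetime.utcnow() + timedelta(hours=1)  # assumed window

    for pair, rate in rates.items():
        pg.run("""INSERT INTO rates (pair, valid_until, rate)
                  VALUES (%s, %s, %s)""",
               parameters=(pair, valid_until, rate))


dag = DAG('rates_pipeline',
          start_date=datetime(2016, 1, 1),
          schedule_interval=timedelta(hours=1))  # assumed hourly schedule

PythonOperator(task_id='fetch_rates',
               python_callable=fetch_rates,
               dag=dag)

Dropping this file into Airflow's dags folder should make the rates_pipeline DAG appear in the web interface, where it can be enabled and backfilled.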
