Data Engineering — Basics of Apache Airflow — Build Your First Pipeline

Extracting Data from Multiple Data Sources

Nicholas Leong · Jun 20

If you’re in tech, chances are you have data to manage.

Data grows fast, gets more complex and harder to manage as your company scales.

Management wants to extract insights from the data they have, but they lack the technical skills to do so.

They then hire you, the friendly neighbourhood data scientist/engineer/analyst, or whatever title you want to call yourself nowadays (it doesn’t matter, you’ll be doing all the work anyway), to do exactly that.

You soon realise that to provide insights, you need some kind of visualisation to explain your findings and monitor them over time.

For this data to stay up to date, you need to extract, transform, and load it into your preferred database from multiple data sources at a fixed interval (hourly, daily, weekly, monthly).

Companies have workflow management systems for tasks like these.

They enable us to create and schedule jobs internally.

Everyone has their own preference, but I would say many of the older WMSs tend to be inefficient and hard to scale, lack a good UI, have no retry strategy, and produce terrible logs that make troubleshooting and debugging a nightmare.

This wastes effort and time unnecessarily.

A traditional crontab, for example, is often used to schedule jobs like these.

Fret no more.

Apache Airflow is here to save the day.

Apache Airflow can act as your company’s WMS, and then some.

Airflow was originally built by the folks at Airbnb and later open-sourced.

It is used by Airbnb for:

- Data warehousing: extract, transform and load into the data warehouse
- Growth analytics: compute metrics around guest and host engagement as well as growth accounting
- Experimentation: compute A/B testing experimentation framework logic and aggregates
- Email targeting: apply triggers to engage customers through email campaigns
- Sessionization: compute clickstream and time spent datasets
- Search: compute search ranking related metrics
- Data infrastructure maintenance: database scrapes, folder cleanup, applying data retention policies

(source: https://medium.com/airbnb-engineering/airflow-a-workflow-management-platform-46318b977fd8)

Airflow is also able to interact with popular technologies like Hive, Presto, MySQL, HDFS, Postgres and S3.

The base modules of Airflow are also designed to be extended easily, so if your stack is not covered (which is unlikely), modules can be rewritten to interact with your required technology.

I am not going into detail on how Airflow’s backend works.

You can read more about it here.

Airflow also has a shiny UI that allows you to manage and monitor your workflows.

Dependencies are built more easily, logs are easily accessible, the code behind each task can be read, the time spent on each task and the time to finish are visible, and workflows can be triggered or paused with the click of a button, among many other things.

Not sold yet? You can take Airflow to the cloud now.

Google has launched Google Cloud Composer, a hosted service of Apache Airflow on the cloud.

This saves you the hassle of running Airflow on a local server in your company.

You will never have to worry about Airflow crashing ever again.

As you can see, data pipelines are just scratching the surface.

The possibilities with airflow are endless.

But you’re not here for that small talk.

You want to get your hands dirty.

Well, let’s do exactly just that.

Photo by Amaury Salas on Unsplash

Getting Started

I am not going to go through the installation of Airflow because I am using Google Cloud Composer. Please research how to get it installed, as it is quite a process.

After that’s done, let’s run through on how to create your first workflow.

Airflow creates workflows through DAGs (Directed Acyclic Graphs).

A DAG is a high-level outline that defines tasks, along with their dependencies and ordering, so they can be scheduled.

Do not be frightened by this, as a DAG is simply a Python file that you write yourself.

Open your preferred text editor and start coding.

Imports

# airflow related
from airflow import models
from airflow import DAG

# other packages
from datetime import datetime, timedelta

Setting default_args

default_dag_args = {
    # Setting start_date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    # Set your start_date: Airflow will run the previous DAG runs since
    # start_date that have not yet run.
    # notify_email is a python function that sends a notification email upon failure.
    'start_date': datetime(2019, 5, 1, 7),
    'email_on_failure': True,
    'email_on_retry': True,
    'project_id': 'your_project_name',
    'retries': 1,
    'on_failure_callback': notify_email,
    'retry_delay': timedelta(minutes=5),
}

with models.DAG(
        dag_id='your_dag_name',
        # Continue to run DAG once per day
        schedule_interval=timedelta(days=1),
        catchup=True,
        default_args=default_dag_args) as dag:

In this block, we define our default_args parameters; you can read about all the parameters in the default arguments here.

We then instantiate a DAG object with the schedule_interval set for daily and the start_date set for May 1st 2019, 7am as given in the default_dag_args.

These are two important parameters to define the execution date/time of your DAG.

The DAG will run for the first time at start_date + schedule_interval.

In this case, the DAG will run for the first time on May 2nd 2019, 7am, with the execution date = May 1st 2019, 7am.
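To make the date arithmetic concrete, here is a tiny illustrative snippet (plain Python, not Airflow code) for the start_date and schedule_interval used above:

from datetime import datetime, timedelta

start_date = datetime(2019, 5, 1, 7)           # becomes the first run's execution date
schedule_interval = timedelta(days=1)

first_run_time = start_date + schedule_interval
print(first_run_time)  # 2019-05-02 07:00:00 -> first run fires here, with execution date 2019-05-01 07:00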

Here’s an example: this DAG’s execution date was 2019-06-12 17:00, so the DAG ran on 2019-06-13 17:00 (with this task running at 2019-06-13 18:02), because the schedule_interval of the DAG is one day.

This pretty much sets up the backbone of your DAG.

Next, we have to define the tasks to be executed and how to execute those tasks.

Defining our DAG, Tasks and Operators

Firstly, you need to have an idea of what kind of tasks to run in order to extract the data.

Let’s take an example from my pipeline.

It involves extracting data from a Postgres DB and loading it into our data warehouse (Google BigQuery).

From my experience, it goes something like this.

Tasks, in order:

1. Connecting to the Postgres DB slave and checking for any replication lag.
2. Writing data into a CSV file and storing it somewhere.
3. Loading the CSV data into the data warehouse (a rough sketch of this step follows the list).
4. Checking for any replication of the load in the data warehouse.
5. Writing logs.
6. Sending notification emails.

There are many ways to design your pipeline, but this is how I do it.
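As a side note, step 3 could be done in plain Python with the google-cloud-bigquery client, as in the rough sketch below; the project, dataset, table, and file path are placeholders, and a reasonably recent client library is assumed. In the pipeline itself, code like this would live inside a custom operator, as described next.

# Illustrative sketch of loading a CSV file into BigQuery (placeholders throughout).
from google.cloud import bigquery

client = bigquery.Client(project='your_project')

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1          # skip the CSV header row
job_config.autodetect = True              # infer the schema from the file
job_config.write_disposition = 'WRITE_APPEND'

with open('/path/to/extract.csv', 'rb') as source_file:
    load_job = client.load_table_from_file(
        source_file, 'your_dataset.your_table', job_config=job_config)

load_job.result()  # wait for the load job to complete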

I choose to create a new custom operator for each task, containing the code to run it.

I then import them into the DAG and set the order and dependencies between the operators.

There are a few advantages to this:

- The DAG’s code looks much cleaner and is highly readable (IMPORTANT).
- The operators can be reused across multiple DAGs; no retyping of code is required.
- If the tasks change, correcting the operator code corrects all the DAGs that use that operator.
- Less code manipulation and work: the team can just change a parameter in the DAG and the operator will behave differently.

The operators do all the real work.

Let’s write one of the operators from my pipeline: Task 2, writing data into CSV from the data source.

Open a second window in your text editor and start coding your operator.

Imports

# airflow related
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# other packages
from datetime import datetime, timedelta
from os import environ

Defining your Operator

All operators derive from BaseOperator, except sensor operators.

Sensor operators derive from BaseSensorOperator, which in turn derives from BaseOperator; we will not cover sensors here.

A BaseOperator has an execute function plus your self-defined functions.

A general format of an operator, which serves as the backbone of operators, can be found below.

class AirflowOperatorName(BaseOperator):
    @apply_defaults
    def __init__(
            self,
            param1,
            param2,
            *args, **kwargs):
        super(AirflowOperatorName, self).__init__(*args, **kwargs)
        self.param1 = param1
        self.param2 = param2

    def execute(self, context):
        # task to be executed

Applying that backbone to our Task 2, writing data into CSV from the data source:

class DataSourceToCsv(BaseOperator):
    """
    Extract data from the data source to a CSV file
    """

    @apply_defaults
    def __init__(
            self,
            bigquery_table_name,
            extract_query,
            connection = #connection,
            *args, **kwargs):
        super(DataSourceToCsv, self).__init__(*args, **kwargs)
        self.bigquery_table_name = bigquery_table_name
        self.extract_query = extract_query
        self.connection = connection
        self.file_path = #filepath_to_save_CSV

    def __datasource_to_csv(self, execution_date):
        # code to execute

    def execute(self, context):
        # pull the execution date from the task context
        self.__datasource_to_csv(context['execution_date'])

This basically covers the majority of your operator file.

In this block of code, we are:

- Defining the parameters to be passed into the operator from the DAG
- Designing the code that does the work
- Initiating an instance of this operator with specific parameter values

Design your code

There are many approaches to how you want the task to be executed. Here are the steps and code I use for writing CSV from the data source:

1. Run a SQL query (defined in the DAG) to extract results with minimal transformations.
2. Write the results into a CSV file with proper naming and save it somewhere accessible by the data warehouse.

Here’s the full code I use for the operator as a whole:
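What follows is a minimal sketch of how such an operator might look, not the author’s exact code: it assumes psycopg2 for the Postgres connection (with connection passed as a dict of connection kwargs) and Python’s built-in csv module, and the output file path is a placeholder.

# Minimal, illustrative version of the full operator (assumptions noted above).
import csv

import psycopg2
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class DataSourceToCsv(BaseOperator):
    """Extract data from the data source into a CSV file."""

    @apply_defaults
    def __init__(self, bigquery_table_name, extract_query, connection,
                 *args, **kwargs):
        super(DataSourceToCsv, self).__init__(*args, **kwargs)
        self.bigquery_table_name = bigquery_table_name
        self.extract_query = extract_query
        self.connection = connection  # assumed: dict of psycopg2 connection kwargs
        self.file_path = '/home/airflow/gcs/data/'  # placeholder, adjust to your setup

    def __datasource_to_csv(self, execution_date):
        # Run the extract query against the source database.
        conn = psycopg2.connect(**self.connection)
        try:
            cursor = conn.cursor()
            cursor.execute(self.extract_query)
            rows = cursor.fetchall()
            header = [column[0] for column in cursor.description]
        finally:
            conn.close()

        # Write the results to a dated CSV file named after the target table.
        file_name = '{}{}_{}.csv'.format(
            self.file_path, self.bigquery_table_name,
            execution_date.strftime('%Y%m%d'))
        with open(file_name, 'w') as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(header)
            writer.writerows(rows)
        self.log.info('Wrote %s rows to %s', len(rows), file_name)

    def execute(self, context):
        # The execution date comes from the task context, so each run
        # produces its own dated file.
        self.__datasource_to_csv(context['execution_date'])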

Congratulations! You have successfully written your first operator. Save it as ‘DataSourceToCsv’.

Now let’s import it into your DAG.

The way I import the operators in Google Cloud Composer is by storing the operator files in a folder called ‘operators’ in the DAG folder.
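For reference, a minimal layout might look like this (the file names are illustrative; an empty __init__.py makes the folder importable as a package on older Python versions):

dags/
    my_first_dag.py
    operators/
        __init__.py
        DataSourceToCsv.py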

Then import it into the DAG. Back to the DAG’s code:

Import Operator

# import operators from the 'operators' folder
from operators import DataSourceToCsv

Let’s call the operator in our DAG and pass in our parameters:

Calling Operator

task2 = DataSourceToCsv.DataSourceToCsv(
    task_id='task name',
    bigquery_table_name='tablename',
    extract_query=""" Select * from tablename """,
    connection = #my defined postgres db connection
)

After defining all your tasks, it’s time to set the sequence and dependencies to execute them.

To set the sequence of execution, it is as easy as putting ‘>>’ between your tasks. Example below:

task1 >> task2 >> task3 >> task4

To set the dependencies of tasks, it is as easy as calling ‘set_upstream()’ on your tasks. The example below shows that task1 and task2 execute in parallel, task3 depends on the completion of task2, and task4 executes after that.

task1
task2
task3.set_upstream([task2])
task3 >> task4

Congratulations! You are pretty much set.

Save your DAG file as ‘DAGNAME.py’ and upload it into the DAG folder in Airflow.

Wait for a couple of minutes and you should see a new DAG pop up in the UI.

How does it feel to build your first pipeline? I must say, I find satisfaction in building things, and pipelines are just one of them.

There is just an extremely satisfying feeling when the DAG works from end to end.

However, it does not always work the way you want it to.

After finishing your first DAG, you have to start debugging it as there will always be problems.

Once you gain experience, the errors will pop up less.

It is simply a part of the journey.

Here’s the full code of the DAG I have just described:
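Here is a compact sketch of how those pieces might fit together in one DAG file, not the author’s exact code: notify_email is a placeholder failure callback, and the connection dict and table names are illustrative.

# Illustrative DAG file combining the pieces above (placeholders throughout).
from datetime import datetime, timedelta

from airflow import models
from operators import DataSourceToCsv


def notify_email(context):
    # Placeholder failure callback; replace with your own email/alerting logic.
    pass


default_dag_args = {
    'start_date': datetime(2019, 5, 1, 7),
    'email_on_failure': True,
    'email_on_retry': True,
    'project_id': 'your_project_name',
    'retries': 1,
    'on_failure_callback': notify_email,
    'retry_delay': timedelta(minutes=5),
}

with models.DAG(
        dag_id='your_dag_name',
        schedule_interval=timedelta(days=1),
        catchup=True,
        default_args=default_dag_args) as dag:

    task2 = DataSourceToCsv.DataSourceToCsv(
        task_id='extract_to_csv',
        bigquery_table_name='tablename',
        extract_query="""select * from tablename""",
        # Placeholder connection details for the illustrative psycopg2-based operator.
        connection={'host': 'your_host', 'dbname': 'your_db',
                    'user': 'your_user', 'password': 'your_password'},
    )

    # With the remaining tasks defined, the ordering would look like:
    # task1 >> task2 >> task3 >> task4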

I hope I have at least sparked your interest in data engineering, if not assisted you in building your first pipeline.

If you have any questions, feel free to drop me an email at nickmydata@gmail.com or connect with me on LinkedIn.

To end, let me drop a quote.

Without big data, you are blind and deaf and in the middle of a freeway.

 — Geoffrey Moore.
