Meetup on Airflow and Cloud Composer

Later, we will load it into our data warehouse cluster on the cloud, then process it with some ETL operations to get a final report. For this purpose, we will adopt Google Cloud Platform solutions for workflow scheduling and data analysis, respectively Cloud Composer and BigQuery.

1 — Data Retrieval

As intended, we will use the Meetup API to get the raw data for our experiment; note that you can retrieve this data using the API calls or the SDK. The code below describes how we get the daily events related to the 'Tech' field in France:

Getting Tech groups in France
Fetching the daily events of those groups

How-to: we look for all the groups labeled as Tech groups in France, then we inspect every elapsed event (yesterday's events in this example) and store that information. However, before saving the data, it would be great if we could retrieve a set of keywords related to each event.

2 — Data Storage

For this case, we will opt for BigQuery as our data warehouse. First, you should already have created a dataset named 'meetup'. Then, knowing that all the exported data is formatted as JSON entities, use the following command to create a table without worrying about the schema ('events' here is an example table name):

bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON meetup.events ./output_events_*.txt

So now you can see what an event entry looks like in your table preview.

3 — Google Cloud Composer and Airflow

After getting some data for our experiment, it's time to turn this process into a periodically scheduled workflow. We will create a Cloud Composer environment which follows this architecture:

Google Cloud Composer Architecture 2018

Thus, instantiating it spares us any dev-ops work to build a workflow orchestration service. In this case, we will use the beta version of Composer, which gives us the opportunity to specify the most recent Airflow version (1.10.0) and Python 3.7 for our SDK. Type the following command in the GCP CLI or in your terminal to create your environment:

gcloud beta composer environments create meetup-events-workflow --project ${PROJECT_ID} --location europe-west1 --zone europe-west1-b --image-version=composer-1.3.0-airflow-1.10.0 --machine-type=n1-standard-1 --labels env=beta --node-count=3 --disk-size=20 --python-version=3

To be sure that everything is OK, under the Composer product in the GCP console, you should get this kind of rendering:

You can see that a new Google Cloud Storage bucket was created for this project, named '${region}-meetup-events-*'.

Your application uses the service account to call the Google API of a service, so that the users aren't directly involved. Use this command to attach the created service account for the Google Language API to the Composer workers ('$SERVICE_ACCOUNT_FILE.JSON' is the name of your service account JSON file, and it should be located in the dags/resources folder):

gcloud beta composer environments run meetup-events-workflow --location=europe-west1 connections -- -a --conn_id="airflow_service_account_conn" --conn_type=google_cloud_platform --conn_extra='{ "extra__google_cloud_platform__project": "'"$project_id"'", "extra__google_cloud_platform__key_path": "'"/home/airflow/gcs/dags/resources/$SERVICE_ACCOUNT_FILE.JSON"'", "extra__google_cloud_platform__scope": ""}'

Straightaway, our Composer environment is ready to use.

4 — Create DAG

If you are not familiar with Airflow concepts, please refer here.

Directed Acyclic Graph (DAG): in Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Right now, let's describe a daily running DAG named 'meetup_daily_events'.

Get data from the API

Next, to execute the script which consumes the Meetup data from the API, we use a simple PythonOperator.

Tip: we connected the task to a single-slot pool so we don't hammer the Meetup API with calls from concurrent tasks. To create the pool, use this command line:

gcloud composer environments run meetup-events-workflow --location=europe-west1 pool -- -s meetup_api_pool 1 'meetup api task usage'

Load Data into BigQuery

The second DAG task will check whether the previous one exported an event file to GCS.
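To make the data-retrieval step above concrete, here is a minimal sketch in Python, assuming the classic Meetup REST base URL and a `/find/groups` endpoint; the parameter names (`key`, `country`, `category`) and the `"past"` status filter are illustrative placeholders, not taken from the original scripts:

```python
import json
import urllib.parse
import urllib.request

API_BASE = "https://api.meetup.com"  # assumed Meetup REST base URL


def build_groups_url(api_key: str, country: str = "FR", category: str = "tech") -> str:
    """Build the request used to list Tech groups in France.

    Parameter names are illustrative; check the Meetup API docs for the
    exact category identifier expected by your API version.
    """
    params = {
        "key": api_key,
        "country": country,
        "category": category,
        "page": 200,  # page size
    }
    return f"{API_BASE}/find/groups?{urllib.parse.urlencode(params)}"


def fetch_json(url: str):
    """GET a URL and decode the JSON body (network call, not exercised here)."""
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def yesterday_events(groups, events_by_group):
    """Keep only the elapsed events of the retrieved groups."""
    out = []
    for group in groups:
        for event in events_by_group.get(group["urlname"], []):
            if event.get("status") == "past":  # elapsed (e.g. yesterday's) events
                out.append(event)
    return out
```

In a daily run, the script would call `fetch_json` once per group (which is why the DAG task is bound to a single-slot pool) and hand the filtered events to the export step.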

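Since `bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON` expects one JSON object per line, the export step that produces the `output_events_*.txt` files can be sketched like this (the concrete file name and the event fields are illustrative):

```python
import json


def write_ndjson(events, path):
    """Write one JSON object per line: the newline-delimited JSON
    format that bq load consumes with --source_format=NEWLINE_DELIMITED_JSON."""
    with open(path, "w", encoding="utf-8") as f:
        for event in events:
            f.write(json.dumps(event, ensure_ascii=False) + "\n")


# Illustrative event entries; the real fields come from the Meetup API.
events = [
    {"name": "Airflow meetup", "city": "Paris", "keywords": ["airflow", "gcp"]},
    {"name": "BigQuery night", "city": "Lyon", "keywords": ["bigquery"]},
]
write_ndjson(events, "output_events_20181101.txt")
```

Because every line is a self-contained object, `--autodetect` can infer the table schema from the file without a manual schema definition.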