Let’s Build a Streaming Data Pipeline

Luckily, there was a way to transfer this data to an environment where I could access tools like Python and Google Cloud Platform (GCP).

This was, however, going to be a long process, so I needed a way to keep developing while I waited for the data transfer.

The solution I arrived at was to create some fake data using the Faker library in Python.

I had never used the library before but quickly realized how useful it was.

Taking this approach allowed me to start writing code and testing the pipeline without having the actual data.

With that said, in this post I will walk through how I built the pipeline described above using some of the technologies available on GCP.

In particular, I will be using Apache Beam (Python version), Dataflow, Pub/Sub, and BigQuery to collect user logs, transform the data, and feed it into a database for further analysis.

For my use case, I only needed the batch functionality of Beam since my data was not coming in real time, so Pub/Sub was not required.

I will, however, focus on the streaming version since this is what you might commonly come across in practice.

Introduction to GCP and Apache Beam

Google Cloud Platform provides a bunch of really useful tools for big data processing.

Some of the tools I will be using include:

Pub/Sub is a messaging service that uses a Publisher-Subscriber model allowing us to ingest data in real time.

Dataflow is a service that simplifies creating data pipelines and automatically handles things like scaling up the infrastructure, which means we can just concentrate on writing the code for our pipeline.

BigQuery is a cloud data warehouse.

If you are familiar with other SQL style databases then BigQuery should be pretty straightforward.

Finally, we will be using Apache Beam and in particular, we will focus on the Python version to create our pipeline.

This tool will allow us to create a pipeline for streaming or batch processing that integrates with GCP.

It is particularly useful for parallel processing and is suited to Extract, Transform, and Load (ETL) type tasks, so if we need to move data from one place to another while performing transformations or calculations, Beam is a good choice.

There is a wide variety of tools available on GCP so it can be difficult to keep track of them all and what their purpose is but here is a summary of them for reference.

Visualizing our Pipeline

Let’s visualize the components of our pipeline using Figure 1.

At a high level, what we want to do is collect the user-generated data in real time, process it and feed it into BigQuery.

The logs are generated when users interact with the product: each request sent to the server is logged.

This data can be particularly useful in understanding how users engage with our product and whether things are working correctly.

In general, the pipeline will have the following steps:

Our user log data is published to a Pub/Sub topic.

We will connect to Pub/Sub and transform the data into the appropriate format using Python and Beam (steps 3 and 4 in Figure 1).

After transforming the data, Beam will then connect to BigQuery and append the data to our table (steps 4 and 5 in Figure 1).

To carry out analysis we can connect to BigQuery using a variety of tools such as Tableau and Python.

Beam makes this process very easy to do whether we have a streaming data source or if we have a CSV file and want to do a batch job.

You will see later that there are only minimal changes to the code required to switch between the two.

This is one of the advantages of using Beam.

Figure 1: General Data Pipeline: Source:

Creating Pseudo data using Faker

As I mentioned before, due to limited access to the data I decided to create fake data in the same format as the actual data.

This was a really useful exercise as I could develop the code and test the pipeline while I waited for the data.

I suggest taking a look at the Faker documentation if you want to see what else the library has to offer.

Our user data will in general look similar to the example below.

Based on this format we can generate data line by line to simulate real-time data.

These logs give us information such as the date, the type of request, the response from the server, the IP address, etc.




…161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.…0 Safari/5312"

Based on the line above we want to create our LINE variable using the variables in the curly brackets below.

We will also use these as column names in our table schema a little later.

LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""

If we were doing a batch job the code would be quite similar, although we would need to create a bunch of samples over some time range.

To use Faker we just create an object and call the methods we need.

In particular, Faker was useful for generating IP addresses as well as websites.

I used the following methods:fake.
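Here is a minimal sketch of what such a generator could look like. It assumes the Faker methods ipv4, uri_path, uri and user_agent, and the request types and status codes are illustrative choices rather than values taken from my script. The Faker object is passed in as an argument, so the function itself has no hard dependency on the library.

```python
import random
from datetime import datetime

# Same fields as the LINE template, written as one literal.
LINE = (
    '{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" '
    '[{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"'
)


def generate_log_line(fake, now=None):
    """Build one fake log line; `fake` is expected to be a faker.Faker() object."""
    now = now or datetime.now()
    return LINE.format(
        remote_addr=fake.ipv4(),
        time_local=now.strftime("%d/%b/%Y:%H:%M:%S"),
        request_type=random.choice(["GET", "POST", "PUT"]),  # illustrative values
        request_path="/" + fake.uri_path(),
        status=random.choice([200, 401, 404]),  # illustrative values
        body_bytes_sent=random.randint(1, 5000),
        http_referer=fake.uri(),
        http_user_agent=fake.user_agent(),
    )

# Usage (needs `pip install Faker`):
#   from faker import Faker
#   print(generate_log_line(Faker()))
```

Calling the function in a loop with a short sleep between calls is enough to simulate a stream of log lines.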





Setting up Google Cloud

Note: To run the pipeline and publish the user log data I used the Google Cloud Shell as I was having problems running the pipeline using Python 3.

Google Cloud Shell uses Python 2, which plays a bit nicer with Apache Beam.

To be able to run the pipeline we need to do a bit of setup.

For those of you who haven't used GCP before you will need to go through the 6 steps outlined on this page.

After this, we will need to upload our scripts to Google Cloud Storage and copy them over to our Google Cloud Shell.

Uploading to cloud storage is pretty straightforward and explained here.

To copy our files, we can open up the Google Cloud shell in the toolbar by clicking the first icon on the left in Figure 2 below.

Figure 2

The commands we need to copy over the files and install the necessary libraries are listed below.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/* .
sudo pip install apache-beam[gcp] oauth2client==3.…0
sudo pip install -U pip
sudo pip install Faker==1.…2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

Creating our database and table

After we have completed the set-up steps, the next thing we need to do is create a dataset and a table in BigQuery.

There are a few different ways to do this, but the easiest is to just use the Google Cloud console and first create a dataset.

You can follow the steps in the following link to create a table and a schema.

Our table will have 7 columns corresponding to the components of each user log.

For ease, we will define all columns as strings apart from the timelocal variable and name them according to the variables we generated previously.

Our table schema should look like figure 3.
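For reference, Beam's BigQuery sink also accepts a schema written as a comma-separated name:TYPE string. The sketch below builds one for seven columns. The exact column list (leaving out request_path) and the TIMESTAMP type for timelocal are assumptions, since all the text fixes is that timelocal is the one column that is not a string.

```python
# Column names are assumed from the log variables; request_path is left out
# here only to arrive at seven columns.
STRING_COLUMNS = [
    "remote_addr", "request_type", "status",
    "body_bytes_sent", "http_referer", "http_user_agent",
]


def table_schema(time_type="TIMESTAMP"):
    """Return a 'name:TYPE,...' schema string; time_type is an assumption."""
    fields = ["{}:STRING".format(name) for name in STRING_COLUMNS]
    # timelocal sits in second position, mirroring the order of the log line.
    fields.insert(1, "timelocal:{}".format(time_type))
    return ",".join(fields)


print(table_schema())
```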

Figure 3: Table Schema

Publishing our user-log data

Pub/Sub is a vital component of our pipeline as it allows multiple independent applications to interact with each other.

In particular, it acts as a middle man allowing us to send and receive messages between applications.

The first thing we need to do is create a topic.

This is pretty simple to do by going to Pub/Sub in the console and clicking CREATE TOPIC.

The code below calls our script to generate log data defined above and then connects to and sends the logs to Pub/Sub.

The only things we need to do are create a PublisherClient object, add the path to the topic using the topic_path method and call the publish function while passing the topic_path and data.

Notice that we are importing generate_log_line from our stream_logs script so make sure these files are in the same folder or you will get an import error.
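As a rough sketch of the publishing loop described above: the function below takes the publisher, the topic path and a line generator as arguments, so it can be tried with a stand-in object before pointing it at a real topic. The function name publish_logs, the count and delay parameters, and the <YOUR-TOPIC> placeholder are my own illustrative choices.

```python
import time


def publish_logs(publisher, topic_path, make_line, count=None, delay=1.0):
    """Publish lines from make_line() to topic_path; count=None runs until CTRL+C."""
    sent = 0
    while count is None or sent < count:
        line = make_line()
        print(line)
        # Pub/Sub message payloads must be bytes.
        publisher.publish(topic_path, data=line.encode("utf-8"))
        sent += 1
        time.sleep(delay)
    return sent

# Usage (needs `pip install google-cloud-pubsub`; project and topic are placeholders):
#   from google.cloud import pubsub_v1
#   from stream_logs import generate_log_line
#   publisher = pubsub_v1.PublisherClient()
#   topic_path = publisher.topic_path("<YOUR-PROJECT>", "<YOUR-TOPIC>")
#   publish_logs(publisher, topic_path, generate_log_line)
```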

We can then run this in our Google console using:

python publish.py

Once the file is running we should be able to see log data printing to the console like the figure below.

This script will keep running until we use CTRL+C to kill it.

Figure 4: publish_logs.py output

Coding up our Pipeline

Now that we have the initial set up out of the way we can get to the fun stuff and code up our pipeline using Beam and Python.

To create a Beam pipeline we need to create a pipeline object (p).

Once we have created the pipeline object we can apply multiple functions one after the other using the pipe (|) operator.
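If the pipe syntax looks unusual, the toy model below may help. It is not Beam code, just a small stand-in (the Values and Step classes are made up for illustration) showing how overloading | in Python lets steps be chained so that each one receives the output of the previous one.

```python
class Step(object):
    """Wraps a function so it can be chained with | (toy stand-in for a transform)."""

    def __init__(self, fn):
        self.fn = fn


class Values(object):
    """Toy stand-in for a PCollection: just holds a list of elements."""

    def __init__(self, items):
        self.items = list(items)

    def __or__(self, step):
        # `values | step` applies the step to every element and
        # returns a new collection, so further steps can be chained.
        return Values(step.fn(x) for x in self.items)


result = Values(["a", "b"]) | Step(str.upper) | Step(lambda s: s + "!")
print(result.items)  # ['A!', 'B!']
```

Beam's real objects do much more (the work is deferred and distributed), but the chaining reads the same way.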

In general, the workflow looks like the image below.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform] | [Second Transform] | [Third Transform])

In our code, we create two custom functions.

The first is the regex_clean function, which searches the data and extracts the appropriate strings based on the PATTERNS list using the re.search function.

The function returns a comma-separated string.

If you are not a regex expert I recommend looking at this tutorial and playing around in a notebook to test the code.
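To make the idea concrete, here is a minimal sketch of a regex_clean style function. The patterns are illustrative ones written for the log format shown earlier, not the PATTERNS list from my script, and the sample line uses a made-up address and referer. Only the leading product name of the user agent is kept, because the full string contains commas that would break a comma-separated output.

```python
import re

# One pattern per output field, each with a single capture group.
# Illustrative patterns; request_path is deliberately not extracted.
PATTERNS = [
    r'^(\S+)',                                     # remote_addr
    r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})\]',  # time_local
    r'"([A-Z]+) \S+ HTTP',                         # request_type
    r'\[(\d{3})\]',                                # status
    r'\] (\d+) "',                                 # body_bytes_sent
    r'"(https?://[^"]+)"',                         # http_referer
    r'"([A-Za-z]+)/[^"]*"$',                       # user agent product name only
]


def regex_clean(line):
    """Extract each field with re.search and join the results with commas."""
    fields = []
    for pattern in PATTERNS:
        match = re.search(pattern, line)
        # Keep a blank placeholder so the column positions stay stable.
        fields.append(match.group(1) if match else " ")
    return ",".join(fields)


sample = (
    '192.0.2.10 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" '
    '[401] 155 "https://example.com/categories/about/" '
    '"Mozilla/5.0 (Macintosh) AppleWebKit/5312 (KHTML, like Gecko) Safari/5312"'
)
print(regex_clean(sample))
# 192.0.2.10,30/Apr/2019:21:11:42,PUT,401,155,https://example.com/categories/about/,Mozilla
```

A referer that itself contains a comma would still shift the columns, so a different separator may be a safer choice for real data.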

After this, we define a custom function called Split for use with ParDo, which is a type of Beam transform for doing parallel processing.

There is a specific way of doing this in Python where we have to create a class which inherits from the DoFn Beam class.

The Split function takes the parsed string from the previous function and returns a list of dictionaries with keys equal to the column names in our BigQuery table.

The one thing to note about this function is that I had to import datetime within the function for it to work.

I was getting an error when I imported at the top of the file which was odd.

This list then gets passed to the WriteToBigQuery function which just appends our data to the table.
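A minimal sketch of such a Split class is shown below. The column list is an assumption based on the seven-column table, and the fallback base class is only there so the snippet can be tried on a machine without Beam installed.

```python
try:
    import apache_beam as beam
    DoFnBase = beam.DoFn
except ImportError:  # lets the sketch run where Beam is not installed
    DoFnBase = object

# Assumed column order, matching the comma-separated string from the regex step.
COLUMNS = ["remote_addr", "timelocal", "request_type", "status",
           "body_bytes_sent", "http_referer", "http_user_agent"]


class Split(DoFnBase):
    """Turn one comma-separated string into a one-element list of row dicts."""

    def process(self, element):
        # Imported inside the method, as described in the text.
        from datetime import datetime

        values = element.split(",")
        # Convert the log timestamp into a format BigQuery can parse.
        parsed = datetime.strptime(values[1], "%d/%b/%Y:%H:%M:%S")
        values[1] = parsed.strftime("%Y-%m-%d %H:%M:%S")
        return [dict(zip(COLUMNS, values))]
```

In the pipeline this would be applied as beam.ParDo(Split()). A likely explanation for the import issue is that names imported at the top of the main script are not automatically available on the Dataflow workers; importing inside the function, or setting Beam's save_main_session pipeline option, are the usual ways around it.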

The code for both the batch Dataflow job and the streaming Dataflow job is provided below.

The only difference between the batch and streaming code is that in the batch job we are reading a CSV from src_path using the ReadFromText function in Beam.
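The sketch below shows one way that switch could be expressed. It is an illustration under assumptions rather than my actual pipeline files: read_step and build are hypothetical helper names, parse stands for any function that turns one log line into a row dictionary, and table and schema are whatever you created in BigQuery. Beam is imported lazily, so the argument check can be tried even without it installed.

```python
def read_step(input_topic=None, src_path=None):
    """Return the read transform: Pub/Sub for streaming, a text file for batch."""
    if bool(input_topic) == bool(src_path):
        raise ValueError("pass exactly one of input_topic or src_path")
    import apache_beam as beam  # lazy import; needs apache-beam[gcp] installed
    if input_topic:
        return beam.io.ReadFromPubSub(topic=input_topic)
    return beam.io.ReadFromText(src_path)


def build(p, parse, table, schema, input_topic=None, src_path=None):
    """Attach read -> parse -> write steps to an existing pipeline object p."""
    import apache_beam as beam
    lines = p | "Read" >> read_step(input_topic, src_path)
    if input_topic:
        # Pub/Sub delivers bytes, while ReadFromText already yields strings.
        lines = lines | "Decode" >> beam.Map(lambda raw: raw.decode("utf-8"))
    return (
        lines
        | "Parse" >> beam.Map(parse)
        | "Write" >> beam.io.WriteToBigQuery(
            table,
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )
```

Everything after the read step is shared, which is why moving between batch and streaming needs so few changes.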

Batch DataFlow Job

main_pipeline_batch.py

Streaming DataFlow Job

main_pipeline_streaming.py

Running the Pipeline

We can execute the pipeline a few different ways.

If we wanted to we could just run it locally from the terminal provided we have remotely logged in to GCP.

python main_pipeline_stream.py --input_topic "projects/user-logs-237110/topics/userlogs" --streaming

We are going to be running it using Dataflow, however.

We can do this using the command below while also setting the following mandatory options.

project – The ID of your GCP project.

runner – The pipeline runner that will parse your program and construct your pipeline.

For cloud execution, this must be DataflowRunner.

staging_location – A Cloud Storage path for Cloud Dataflow to stage code packages needed by workers executing the job.

temp_location – A Cloud Storage path for Cloud Dataflow to stage temporary job files created during the execution of the pipeline.

streaming

python main_pipeline_stream.py --runner DataflowRunner --project $PROJECT --temp_location $BUCKET/tmp --staging_location $BUCKET/staging --streaming

While this command is running we can head over to the Dataflow tab in the Google console and view our pipeline.

When we click into the pipeline we should see something like Figure 4.

For debugging purposes, it can be quite helpful to go into the logs and then Stackdriver to view detailed logs.

This has helped me figure out issues with the pipeline on a number of occasions.
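The Dataflow options listed above can also be assembled in Python instead of being typed on the command line. The helper below is a small sketch; the name dataflow_args is hypothetical, and it assumes bucket is a bare bucket name without the gs:// prefix.

```python
def dataflow_args(project, bucket, streaming=True):
    """Build command-line style flags for a Dataflow run."""
    args = [
        "--runner=DataflowRunner",
        "--project={}".format(project),
        # Assumes `bucket` is a bare bucket name, so the gs:// prefix is added here.
        "--temp_location=gs://{}/tmp".format(bucket),
        "--staging_location=gs://{}/staging".format(bucket),
    ]
    if streaming:
        args.append("--streaming")
    return args


print(dataflow_args("<YOUR-PROJECT>", "<YOUR-BUCKET>"))

# With Beam installed, the list can be handed to PipelineOptions:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(dataflow_args(project, bucket))
```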

Figure 4: Beam Pipeline

Accessing our data in BigQuery

Right, we should have our pipeline up and running with data flowing into our table.

To confirm this, we can go over to BigQuery and view the data.

After using the command below you should see the first few rows of the dataset.

Now that we have our data stored in BigQuery we can do further analysis as well as share the data with colleagues and start answering and addressing business questions.

SELECT * FROM `user-logs-237110.…logdata` LIMIT 10;

Figure 5: BigQuery

Takeaways

Hopefully, this provides a useful example of creating a streaming data pipeline and also of finding ways of making data more accessible.

Having the data in this format provides many benefits to us.

We can now start answering useful questions like:

How many people use our product?

Is the user base growing over time?

What aspects of the product are people interacting with the most?

Are there any errors happening when there shouldn't be?

These are the types of questions that an organization will be interested in, and based on these insights we can drive improvements to the product and improve user engagement.

Beam is really useful for this type of exercise and there are a number of other interesting use cases as well.

For example, you may want to analyze stock tick data in real time and make trades based on the analysis, or maybe you have sensor data coming in from vehicles and you want to calculate the level of traffic.

You could also, for example, be a games company collecting data on users and using this to create dashboards to track key metrics.

Ok guys, so that’s it for another post, thanks for reading and for those who want to see the full code, below is a link to my GitHub.

DFoly/User_log_pipeline: Creating a Streaming Pipeline for user log data in Google Cloud Platform (github.com)
