Build a Pipeline for Harvesting Medium Top Author Data

How to Use Luigi and Docker to Build a Simple Data Engineering Pipeline for Medium

Luc Russell · Jan 29

Introduction

The idea for this project started when some Python scripts I wrote for a data engineering pipeline got out of control. The pipeline required thousands of API calls to a REST service, each downloading a JSON file.

Some parts of the pipeline took a long time to run and the process would sometimes fail.

My process looked like this, and the cumulative API calls added up:

- fetch parents (1 request)
- for each parent, fetch children (150 requests)
- for each child, fetch info (5,000 requests)
- for each info, fetch configuration items (14,000 requests… that's a lot!)

This was a quirk of the API I was using; there was no way to make bulk requests.

There were many issues:

- The sequence took hours to run
- If something failed, it was difficult to work out where to restart from
- There was no way to check progress; if the scripts had been running for an hour, were they nearly done yet?

Consulting Google, I found that my design was spookily similar to this example of how not to do things. This thing had grown into a monster.

Luigi To The Rescue

I looked around for a lightweight data engineering framework which could support:

- Workflow orchestration
- Parallel processing
- Simple deployment
- A quick learning curve

With those requirements in mind, Spotify's Luigi looked just right:

Luigi is a Python module that helps you build complex pipelines of batch jobs.

It handles dependency resolution, workflow management, visualization etc.

Spotify use Luigi for their data engineering batch jobs which recommend music, for example the Discover Weekly playlists (and I’m generally quite pleased with Discover Weekly’s choices for me, despite a recent disco epidemic).

Luigi was surprisingly satisfying to work with, and I wondered how I could use it again outside of my day job.

I thought of reusing the approach in a pipeline to harvest data about top Medium authors, and the text below describes a simple framework for a data engineering pipeline which could be reused in other domains.

This is geared towards data engineering vs data science, although the modular architecture of Luigi should make the addition of analytics components straightforward.

Put another way, this is a system design for extracting data, rather than an approach for analyzing and deriving insights from the data (maybe a topic for a future story).

Harvesting Interesting Top Author Data from Medium

Our pipeline will harvest data about the top authors in a publication:

- The list of authors each top story author follows
- The total number of stories they've written
- How many claps each story received
- The average headline length of their stories
- The average word count of their stories
- Publications they've written for

A quick note about the Medium API.

The API itself is fairly limited.

However, you can append ?format=json to many standard Medium URLs, which will return a JSON representation of the data usually shown on the page, along with some extra metadata.

For example, the front page of Towards Data Science renders like this with the ?format=json parameter:

])}while(1);</x>{"success":true,"payload":{"collection":{"id":"7f60cf5620c9","name":"Towards Data Science","slug":"towards-data-science","tags":["DATA SCIENCE","MACHINE LEARNING","ARTIFICIAL INTELLIGENCE","BIG DATA","ANALYTICS"],"creatorId":"895063a310f4","description":"Sharing concepts, ideas, and codes.","shortDescription":"Sharing concepts, ideas, and codes." … etc.

Credit to Radu Raicea's excellent article, How I used Python to find interesting people to follow on Medium, for pointing me to this feature.
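That `])}while(1);</x>` prefix is a deliberate guard against JSON hijacking, so it has to be stripped before parsing. As a sketch (the helper name here is mine, not the project's), everything before the first `{` can simply be dropped:

```python
import json

def clean_medium_json(raw: str) -> dict:
    """Drop everything before the first '{' (the anti-hijacking guard),
    then parse the remaining JSON. Helper name is illustrative."""
    return json.loads(raw[raw.index("{"):])

# A shortened version of the response shown above:
sample = '])}while(1);</x>{"success":true,"payload":{"collection":{"name":"Towards Data Science"}}}'
data = clean_medium_json(sample)
print(data["payload"]["collection"]["name"])  # -> Towards Data Science
```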

The JSON result needs a little bit of cleaning up: the invalid characters at the start of the response must be removed before it can be parsed.

Nuts and Bolts

One key requirement was to make deployment of my Luigi workflow very simple.

I wanted to assume only one thing about the deployment environment: that the Docker daemon would be available.

With Docker, I wouldn’t need to be concerned with Python version mismatches or other environmental discrepancies.

It took me a little while to work out how to run Luigi inside Docker, though.

The first step is to start the central scheduler in its own container.

You can do this with an existing image like this one.

You can submit jobs to the scheduler like this:

PYTHONPATH='.' luigi --module top_artists Top10Artists --date-interval 2012-07

That's using an example from the Luigi docs.

So for deployment, this is the approach I took: a docker-compose file which includes:

- The Luigi central scheduler
- A separate Python-based container for the Luigi tasks
- An nginx container for exposing the reports

The entry point of the tasks container sleeps for some period of time, then starts the Luigi pipeline; this sleep takes the place of a cron job.
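A minimal sketch of what such a docker-compose file might look like (the service names, ports and paths here are illustrative assumptions, not the project's actual file):

```yaml
version: "3"
services:
  luigi:
    image: luigi-scheduler   # placeholder: any image that runs `luigid`
    ports:
      - "8082:8082"          # Luigi central scheduler web UI
  tasks:
    build: .                 # the Python container with the pipeline tasks
    depends_on:
      - luigi
    volumes:
      - ./output:/output     # reports are written here
  nginx:
    image: nginx:alpine
    ports:
      - "8080:80"
    volumes:
      - ./output:/usr/share/nginx/html:ro   # serve the reports read-only
```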

Parallel Processing With Luigi

My script was very slow, and I needed a way to run multiple URL fetching processes in parallel, which led to this design:

- A WrapperTask wraps all the components of the pipeline
- The first URL fetch gets a list of many items which need to be queried individually (this is a single request)
- The list is divided into chunks between workers, with the result of their work being placed into a file named after each worker id (e.g. 1.json)
- From this point, the files are used by downstream workers

The approach is adapted from this article.

Data Harvesting Tasks

For the Medium data gathering pipeline, everything hangs off a WrapperTask. Its requires() method is doing most of the work, building up a list of the tasks which need to complete before PipelineTask is considered complete.

The first task in the pipeline is FetchUserList.

This gets a list of the Medium authors from the front page of a Medium publication.

The list of authors is placed into a file which will be used by downstream tasks.

Running it on the Towards Data Science publication will give us a list of the authors mentioned on the page. You might notice that the list of authors returned in this result doesn't match what's displayed on the page; what's up with that? It turns out that a page submits a series of requests to the Medium API when it loads, and each returned JSON result includes a pointer to the next set of results in the series.

We need to handle this paging behavior when obtaining the data. The implementation for this is largely borrowed from Radu Raicea's article.
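With the HTTP call factored out into a `fetch` callable, the paging loop can be sketched like this (the `users` and `next` field names are simplifications of mine; the real Medium payload nests them deeper):

```python
def fetch_all_users(first_url, fetch):
    """Follow Medium-style paging: keep requesting until there is no
    'next' pointer. `fetch` is any callable mapping a URL to a parsed
    JSON dict, which makes the loop easy to test without a network."""
    users, url = [], first_url
    while url:
        page = fetch(url)
        users.extend(page["users"])
        url = page.get("next")  # None once we reach the last page
    return users

# Stub fetcher standing in for real HTTP requests:
pages = {
    "/page1": {"users": ["alice", "bob"], "next": "/page2"},
    "/page2": {"users": ["carol"]},
}
print(fetch_all_users("/page1", pages.get))  # -> ['alice', 'bob', 'carol']
```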

Note that only one FetchUserList task will execute in a given pipeline, because we haven't included a file_number parameter.

The next task is FetchUserFollowings.

This task will be executed concurrently by a number of workers, with parallelization being governed by the workers configuration parameter.

When the task begins executing, it determines which section of the JSON file it is responsible for processing.
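As a sketch of that slicing (a guess at the behavior; the project's actual get_part_of_list() may handle remainders differently):

```python
def get_part_of_list(full_list, worker_count, worker_id):
    """Return the slice of full_list that worker `worker_id` (0-based)
    should process; the last worker absorbs any remainder."""
    chunk_size = len(full_list) // worker_count
    start = worker_id * chunk_size
    # The last worker takes everything that's left.
    end = len(full_list) if worker_id == worker_count - 1 else start + chunk_size
    return full_list[start:end]

authors = ["a", "b", "c", "d", "e", "f", "g"]
print(get_part_of_list(authors, 3, 0))  # -> ['a', 'b']
print(get_part_of_list(authors, 3, 2))  # -> ['e', 'f', 'g']
```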

The logic for determining the section is controlled by get_part_of_list(), which splits the whole list so that we can delegate a chunk to each worker.

The ExtractUserMetrics task picks out some interesting data points from the post data, for example the total clap count for each article. We can then derive a few averages from the extracted story data.

Finally, this line starts the process:

luigi.build([PipelineTask()], workers=worker_count)

Exposing the Results

You might want to expose the reports to end users, for example to data scientists on your team.

One simple way is to add an nginx web server and list the contents of the output directory.

This will allow anyone to hit a URL and download reports, with a report directory for each day.
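A minimal nginx server block for this (the root path is an assumption) just turns on directory listing over the output directory:

```nginx
server {
    listen 80;
    location / {
        root /usr/share/nginx/html;   # mount the pipeline's output dir here
        autoindex on;                 # list the per-day report directories
    }
}
```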

Trying It Out

Ok, let's kick the tires on this thing. First of all, specify a starting point for the URL crawl by setting a collection_id in the luigi.conf file.
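For example, pointing the crawl at Towards Data Science using the collection id seen in the JSON earlier (the section and key names are my assumption about the file's layout):

```ini
[medium]
collection_id=7f60cf5620c9
```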

Once configured, there are a couple of ways to run the code. At development time, you can run __main__.py directly; you'll need to start Luigi first if you want to do this.

You can also run the whole application stack with docker-compose:

docker-compose up -d

This will start Luigi, nginx and the tasks container, which will trigger __main__.py.


Inspecting the summary report, we can get some information about some of the top authors currently writing for Towards Data Science.

For example, we can get some summary stats for one of the current top authors, Will Koehrsen. Nice work, Will!

Unit Testing

I chose pytest as a testing framework.

I like pytest for two main reasons:

- It requires less boilerplate code
- You can use regular assert statements, instead of needing to remember a special syntax

To test using a different configuration to the production version, you can use luigi.add_config_path() in your tests. This will load config from a specific location.

I used the MockTarget class so I could run the tests in a repeatable way.

I didn't want one test depending on data created by a previous test, or tests leaving behind result files after they finished running.

MockTarget simulates a Luigi target which will store results in memory instead of writing them out to the file system.

Finally, to allow testing of URL requests without requiring the endpoint to be available, I used the requests_mock library.

The full source code for this sample pipeline is available on Github: lucrussell/docker-luigi.

Conclusions

This project is a Luigi pipeline for harvesting top author data from Medium.

The project started life as a simple framework for orchestrating a problematic data engineering process.

The pipeline extracts information about authors and their stories by crawling URLs of a publication front page, then gathers some basic metrics about the top authors.

It uses Luigi to coordinate the Python logic, and Docker to simplify deployment.

