In other words, how exactly is the transform_script used?It is actually quite straightforward.
When the file gets downloaded from S3, it is saved in a temporary file on the local filesystem (say /tmp/tmpekzxuzb4).
The S3FileTransformOperator then crates another temporary file (for instance /tmp/tmpib45wk81) and executes a command in which both of these are passed as arguments to the transform_script.
In our case the executed command would look as follows:/bin/cp /tmp/tmpekzxuzb4 /tmp/tmpib45wk81It may not seem like much, but this is actually the core of the robust framework we mentioned above — since transform_script can be literally any program that can be executed in the same environment as Airflow, S3FileTransformOperator can cover all sorts of transformation needs of data on S3.
Returning to our previous examples with AWSAthenaOperator, what we would really like to do is a bit more involved than the sample script above.
We would like to pick up the UUID of the query we executed, construct the path on S3 where we should be able to find the results once the query finishes, and then use this path as source_s3_key in S3FileTransformOperator .
To do so, we will need a way of passing information from one Airflow task to another.
Luckily, Airflow has us covered once again with a feature called XComs.
Passing information between Airflow tasksAs an abbreviation of “cross-communication”, XComs can be viewed as messages Airflow tasks are passing down the DAG.
Each task can xcom_push a message which can then be xcom_pulled by another task — the only necessary parameter is the task_id of the task that xcom_pushed the message in the first place.
It is also possible to use XComs in a much more advanced way.
That is out of scope of this article — please consult the documentation for more info on that.
When it comes to XComs and Airflow Operators, they are very neatly integrated — the return value of Operator’s execute() function is automatically xcom_pushed, so that subsequent tasks can xcom_pull the return value by just specifying the task_id they are interested in.
On top of all this, xcom_pull can be used directly in templated strings we already mentioned.
This means that with very small changes, execution of Airflow’s tasks can be conditioned on previous tasks return values — we just need to specify these parameters in templated strings.
Here is a quick example of what such a string may look like:SELECT * FROM some_tableWHERE id={{ task_instance.
xcom_pull(task_ids='generate_id') }}Sadly, not all Airflow Operators return values that may be useful to subsequent tasks in the DAG.
Although it may change in the future, the AWSAthenaOperator is one such example.
It does not return anything, even though we could really use the UUID of the executed query: that is how we locate the file with results.
Thankfully, this problem has a pretty straightforward solution.
Since AWSAthenaOperator is just a Python class, we can inherit it and ensure that this new class of ours works just the same, except for returning the query ID in its execute function.
Here is what it may look like:With an operator like this, we should have all pieces of the puzzle ready:A way of executing Athena queriesA way of finding the results on S3A way of copying a file on S3 to a different locationLet’s finally put it all together.
The grand finale: putting it all togetherAfter all this preparation, we can finally do what the title promised: execute an Athena query and move the results to a specific S3 location.
We will do so by using a slightly altered AWSAthenaOperator described above which will allow us to piece together the S3 path with the results of the Athena query.
We’ll then download this file from S3 and reupload it to a different location using the S3FileTransformOperator .
As Linus Torvalds says, “Talk is cheap.
Show me the code”.
We agree, so here it is:As we can see, it took only about 30 lines to do something like this: a tribute to Airflow, its operators and a community of contributors who build and maintain them.
Testing this DAG out should also be fairly straightforward — if you still have all the environment variables set up correctly, the following command can be used to execute the first task:airflow test athena_query_and_move run_query 2019–06–07A quick note on XCom messages: since they are stored in the Airflow’s DB, they persist between task runs.
This is of great advantage to us in this case, as we can just test the move_results task knowing full well that xcom_pull will be able to retrieve the query UUID from the previous run:airflow test athena_query_and_move move_results 2019–06–07If all went well, at this point you should be able to locate the results of your query at s3://mybucket/otherpath/myresults.
csv!BONUS: Transforming query results to different data formatsAs we mentioned above, the transformation script did not do anything particularly interesting: just an exact 1:1 copy.
With a little bit of code, however, it can be updated to convert the downloaded data to a different format: say from CSV to Apache Parquet, which can then be read back by AWS Athena (and the circle will be complete).
To do so, we will need to install pandas along with the pyarrow module Pandas requires for working with the Parquet data format.
Our transformation script will basically be a oneliner: we will just use Pandas to read the CSV file (the first argument of the script) in and save it to a Parquet file (the second argument).
The updated DAG, along with the small transformation script, may look something like this:Just as before, we can test this updated DAG via Airflow’s test command:airflow test athena_query_and_move run_query 2019–06–07airflow test athena_query_and_move move_results 2019–06–07and if all went well, we should be able to find the query results stored in the Apache Parquet format at s3://mybucket/otherpath/myresults.
parquet!ConclusionAlthough it took over 2000 words to describe, executing Athena queries and moving the results around S3 is not that difficult in the end — it could even be argued that it is actually quite easy!As we saw, this is mostly due to the well thought-out architecture of Airflow’s tasks, Operators and other concepts that allow us to neatly tie them together.
With all of these pieces of the puzzle at our disposal, executing Athena queries, moving the results around S3, and even applying transformations on top of it is pretty much a solved problem.
.. More details