Use AWS Glue and/or Databricks’ Spark-xml to process XML data

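Throughout this walkthrough I refer to a nested XML dataset of items. For orientation, it is shaped roughly like the snippet below. Note that this is a hypothetical reconstruction from the field names used in this post (item, batters.batter, fillings/filling), not the actual downloadable file:

```xml
<!-- Hypothetical reconstruction; element names follow the post, not the real file -->
<items>
  <item id="0001" name="Cake">
    <batters>
      <batter id="1001">Regular</batter>
      <batter id="1002">Chocolate</batter>
    </batters>
    <fillings>
      <filling id="7001">None</filling>
    </fillings>
  </item>
  <item id="0002" name="Raised">
    <!-- a single batter child: schema inference may type batters.batter as a struct here -->
    <batters>
      <batter id="1001">Regular</batter>
    </batters>
  </item>
</items>
```

The key trouble spot is batters.batter: some items have one child element, others several, which is what later confuses the schema inference.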
As XML data is usually multilevel nested, the crawled metadata table will contain complex data types such as structs and arrays of structs, and you won't be able to query the XML with Athena, since it is not supported. So it is necessary to convert the XML into a flat format. To flatten the XML you can either take the easy route and use Glue's magic(!), a simple trick of converting it to CSV, or you can use Glue transforms to flatten the data, which I will elaborate on shortly. You need to be careful with the flattening: it might produce null values even though the data is present in the original structure. I will give examples of the alternative approaches, and it is up to you to choose according to your use case:

1. Crawl XML
2. Convert to CSV with a Glue Job
3. Use Glue PySpark Transforms to flatten the data
4. An alternative: use Databricks spark-xml

Dataset

Upload the dataset to S3:

- Download the file from the given link and go to the S3 service on the AWS console.
- Create a bucket with the "aws-glue-" prefix (I am leaving the settings as default for now).
- Click on the bucket name and click on Upload. (This is the easiest way to do this; you can also set up the AWS CLI to interact with AWS services from your local machine, which would require a bit more work incl.
installing the AWS CLI, configuration, etc.)
- Click on Add files, choose the file you would like to upload, and click Upload. You can set up security/lifecycle configurations if you click Next.

1. Crawl XML Metadata

First of all, if you know which tag in the XML data to choose as the base level for schema exploration, you can create a custom classifier in Glue. Without the custom classifier, Glue will infer the schema from the top level. For the example XML dataset above, I create a classifier with "item" as the row tag, as easily as follows:

- Go to the Glue UI and click on the Classifiers tab under the Data Catalog section. "item" will be the root level for the schema exploration.

I then create the crawler with the classifier:

- Give the crawler a name and select the classifier from the list.
- Leave everything as default for now and browse for the sample data location ('Include path').
- Add another data store: No.
- You can use your own IAM role with the relevant read/write permissions on the S3 bucket, or you can create a new one.
- Frequency: Run on demand.
- Choose the default database (or create a new one) and leave the settings as default.
- Review and click Finish.

Now we are ready to run the crawler: select the crawler and click on Run Crawler. Once the status is 'Ready', visit the Database section and see the tables in the database. ("Tables added: 1" means that our metadata table has been created.) Go to Tables, filter your DB, and click on the table name to see the output schema. Now we have an idea of the schema, but we have complex data types and need to flatten the data.

2. Convert to CSV

This will be simple, and we will use the script provided by Glue:

- Go to the Jobs section in the ETL menu and click Add Job.
- Name the job and choose the IAM role we created earlier (make sure that this role has permissions to read/write from/to the source and target locations).
- Tick the option above.
- Choose the target data store as S3, format CSV, and set the target path.

Now the magic step: (If we selected Parquet as the format, we would do the flattening
ourselves, as Parquet can have complex types, but the mapping is revealed easily for CSV.) You can rename columns, change the data types, and remove or add columns in the target. I want to point out that the array fields are mapped to string, which is not desirable from my point of view. I leave everything as default, review, save, and continue with editing the script.

Glue's proposed script: we can run the job immediately or edit the script in any way. Since it is fundamentally Python code, you have the option to convert the dynamic frame into a Spark dataframe, apply UDFs, etc., and convert back to a dynamic frame to save the output. (You can stick to Glue transforms if you wish; they can be quite useful, since the Glue context provides extended Spark transformations.) I have added some lines to the proposed script to generate a single CSV output; otherwise the output would be multiple small CSV files based on partitions.

Save and click on Run Job. This brings up a configuration review, so you can set the DPU to 2 (the lowest it can be) and set the timeout. Let's run it and see the output. You can monitor the status in the Glue UI; once the run status is Succeeded, go to your target S3 location, click on the file name, and go to the Select From tab. If you scroll down, you can preview and query small files easily by clicking Show File Preview / Run SQL (Athena in the background). The struct fields were propagated, but the array fields remained; to explode array-type columns, we will use pyspark.sql explode in the coming stages.

3. Glue PySpark Transforms for Unnesting

There are two PySpark transforms provided by Glue:

- Relationalize: unnests the nested columns, pivots array columns, generates join keys for relational operations (joins, etc.), and produces a list of frames.
- UnnestFrame: unnests the frame and generates join keys for array-type columns, producing a single frame with all fields, including the join key columns.

We will use a Glue DevEndpoint to visualize these transformations. A Glue DevEndpoint is the connection point
to data stores, allowing you to debug your scripts and do exploratory analysis on the data using the Glue context with a SageMaker or Zeppelin notebook. Moreover, you can also access this endpoint from Cloud9, the cloud-based IDE for writing, running, and debugging your code. You just need to generate an SSH key on the Cloud9 instance and add the public SSH key while creating the endpoint. To connect to the endpoint, use the "SSH to Python REPL" command in the endpoint details (click on the endpoint name in the Glue UI), replacing the private key parameter with the location of your key on your Cloud9 instance.

Create a Glue DevEndpoint and a SageMaker notebook:

- I will use this endpoint also for the Databricks spark-xml example, so download the jar file to your PC from the given link, upload the jar to S3, and set "Dependent jars path" accordingly.
- Name the endpoint and choose the IAM role we used before. If you have a codebase you want to use, you can add its path to the Python library path.
- Leave every other configuration as default and click Finish. It takes approximately 6 minutes for the endpoint to become Ready.

Once the endpoint is ready, we can create a notebook to connect to it:

- Choose your endpoint and click Create SageMaker Notebook from the Actions drop-down list. It will take a couple of minutes for the notebook to be ready once created.
- Name it, leave the default settings, name the new IAM role, and click Create Notebook.

Open the notebook and create a new PySpark notebook. You can copy and paste the boilerplate from the CSV job we created previously, change the glueContext line as below, and comment out the job-related libraries and snippets. You can either create the dynamic frame from the catalog, or use "from options", with which you can point to a specific S3 location to read the data; by just setting the format options, you can read the data without creating a classifier as we did before. You can find more about format options in the Glue documentation. I used the frame created by from_options for the following steps. (The outputs will be the same even if you use
the catalog option; the catalog does not persist a static schema for the data.)

You can see that the Relationalize transform returns a list of frames; each has an id column and an index column for the join keys and the array elements, respectively. It becomes clearer if you look at the root table: for example, the fillings field holds only an integer value in the root table, and this value matches the id column in the root_fillings_filling frame above. An important thing to note is that the "batters.batter" field was propagated into multiple columns: for item 2 the "batters.batter" column is identified as a struct, while for item 3 this field is an array! Here comes the difficulty of working with Glue: if you have a complicated, multilevel nested structure, this behavior can cause a lack of maintenance and control over the outputs, and problems such as data loss, so alternative solutions should be considered.

UnnestFrame: let's see how this transform gives us a different output. This time everything is in one frame, but again "batters.batter" resulted in multiple columns, which also brings uncertainty around the number of columns. Considering an ETL pipeline, each time a new file comes in, this structure will probably change. UnnestFrame can spread out the upper-level structs, but it is not effective at flattening arrays of structs. Since we cannot apply UDFs on dynamic frames, we need to convert the dynamic frame into a Spark dataframe and apply explode on the array-type columns to spread them into multiple rows; I will leave this part for your own investigation. Moreover, I would not expect two different spreads of "batters.batter": in my humble opinion there should be a single "array of structs" column for this field, with item 2 simply having an array of length 1 holding its one struct.

4. An Alternative: Databricks spark-xml

It may not be the best solution, but this package is very useful in terms of control and accuracy.
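To make the struct-vs-array ambiguity described above concrete, here is a tiny standalone illustration using Python's built-in ElementTree. The XML and item names are toy stand-ins, not the actual dataset:

```python
# Why schema inference flips between "struct" and "array" for batters.batter:
# one item has a single <batter> child, another has several.
import xml.etree.ElementTree as ET

SAMPLE = """
<items>
  <item name="item2"><batters><batter>Regular</batter></batters></item>
  <item name="item3">
    <batters>
      <batter>Regular</batter><batter>Chocolate</batter><batter>Blueberry</batter>
    </batters>
  </item>
</items>
"""

def batter_counts(xml_text):
    """Return {item name: number of <batter> children} for each item."""
    root = ET.fromstring(xml_text)
    return {
        item.get("name"): len(item.findall("./batters/batter"))
        for item in root.findall("item")
    }

print(batter_counts(SAMPLE))  # item2 has 1 batter element, item3 has 3
```

A record-at-a-time inferencer that sees only the first item has no reason to emit an array type, which is exactly the inconsistency observed in the Glue output; spark-xml instead settles on an array of structs for such fields, as we will see next.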
A good feature is that unparseable records are also detected, and a _corrupt_record column is added with the relevant information. And here is the difference I expected 🙂: you can see that "batters.batter" is an array of structs. For more reading options, you can have a look at the spark-xml documentation. This time there are no problems with nulls, so you don't need to consider whether a column is a struct or an array; you can write a generic function for exploding array columns by making use of the extracted schema.

We saw that even though Glue provides one-line transforms for dealing with semi-structured and unstructured data, if we have complex data types, we need to work with samples and see what fits our purpose. Hope you enjoyed it!
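As a closing sketch, the "generic function for exploding array columns" mentioned above can be modeled in pure Python, without a Spark cluster: dicts stand in for structs (flattened into dotted column names) and lists stand in for arrays (exploded into one row per element). All names and the helper itself are illustrative, a toy model of the select/explode logic you would drive from a Spark dataframe's schema:

```python
# Toy model of schema-driven flattening: dicts behave like structs
# ("parent.child" columns), lists behave like arrays (one row per element).
def flatten_record(record, prefix=""):
    """Recursively flatten one nested record into a list of flat rows."""
    rows = [{}]
    for key, value in record.items():
        col = f"{prefix}{key}"
        if isinstance(value, dict):
            # struct: flatten in place with a dotted prefix
            sub_rows = flatten_record(value, prefix=f"{col}.")
            rows = [dict(r, **s) for r in rows for s in sub_rows]
        elif isinstance(value, list):
            # array: explode each element into its own row
            exploded = []
            for element in value:
                if isinstance(element, dict):
                    exploded.extend(flatten_record(element, prefix=f"{col}."))
                else:
                    exploded.append({col: element})
            rows = [dict(r, **e) for r in rows for e in exploded]
        else:
            rows = [dict(r, **{col: value}) for r in rows]
    return rows

item = {
    "name": "Cake",
    "batters": {"batter": [{"id": "1001", "type": "Regular"},
                           {"id": "1002", "type": "Chocolate"}]},
}
for row in flatten_record(item):
    print(row)
# {'name': 'Cake', 'batters.batter.id': '1001', 'batters.batter.type': 'Regular'}
# {'name': 'Cake', 'batters.batter.id': '1002', 'batters.batter.type': 'Chocolate'}
```

Note that a field arriving as a single struct (the "item 2" case) and the same field arriving as an array of structs flatten to the same columns here, which is exactly the consistency we wanted from the Glue transforms.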
