Brand Safety with Structured Streaming, Delta Lake, and Databricks

The original post, Brand Safety with Spark Streaming and Delta Lake, is from Eyeview Engineering’s blog and is reproduced here with permission.

Eyeview serves data-driven online video ads for its clients.

Brands know the importance of ad placement and ensuring their ads are not placed next to unfavorable content.

The Internet Advertising Bureau (IAB) defines brand safety as keeping a brand’s reputation safe when it advertises online.

In practice, this means avoiding ad placement next to inappropriate content.

The content of a webpage defines the segments of that page’s URL (e.g., if CNN.com is carrying news about the presidential election, then politics can be one of that page’s segments).

Brand Safety at Eyeview

The diagram below shows how Eyeview implemented the concept of brand safety and what challenges were faced.

Eyeview’s cluster of real-time bidders needs to know whether a URL is brand safe in order to decide whether to serve the ad.

It gets this information from Aerospike (our real-time database, with latency close to 10 ms), but to persist this information in Aerospike we defined an offline solution that loads the segment information.
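The bidder-side lookup itself is not shown in the post; as a minimal sketch using the Aerospike Java client (the host, namespace, set, and bin names below are assumptions), it might look like this:

import com.aerospike.client.{AerospikeClient, Key}

// Hypothetical sketch of a bidder-side lookup: fetch the brand-safety segments
// for a URL from Aerospike before deciding whether to bid.
val client   = new AerospikeClient("aerospike-host", 3000)                          // assumed host/port
val key      = new Key("brand_safety", "url_segments", "cnn.com/politics/article")  // assumed namespace/set/key
val record   = client.get(null, key)                                                // default read policy
val segments = Option(record).map(_.getString("segments"))                          // assumed bin name
// A missing record means the URL has no segment information yet, so the bidder
// defaults to making no bid.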

Once Eyeview’s cluster of bidders gets a bid request, the URL of that bid request is dumped into S3. The cluster receives close to 500k requests per second. Having that many URLs dumped into S3 every second, even after a deduping step (so that an HTTP call is not made for the same URL multiple times in the same timeframe), creates a massive problem: a large number of small files that must be processed simultaneously.
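The dedup step is not shown in the post; a minimal batch-Spark sketch of the idea (the S3 path is hypothetical) could be:

// Hypothetical sketch of the dedup step in the original batch flow: read the small
// URL files dumped for a time window and keep each URL only once, so the
// brand-safety API is not called repeatedly for the same URL.
val urls = spark.read
  .text("s3://example-bucket/bid-urls/dt=2019-06-01/hour=12/") // hypothetical path
  .withColumnRenamed("value", "url")
  .dropDuplicates("url")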

Big Data Challenges

Many developers in the big data world face this problem in Spark or Hadoop environments. Below is a list of the challenges that can arise while batch processing a large number of small files.

Challenge #1: Underutilized Resources

Reading a small file (~KB in size) with a 1 TB cluster underutilizes the cluster’s resources; it would be like cutting bread with a sword instead of a butter knife.

Challenge #2: Manual Checkpointing

With batch jobs, it is important to perform manual checkpointing to track which files have been processed and which have not. This can be extremely tedious in cases involving reprocessing of files or failures, and it may not scale as the data size grows very large.
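For contrast with the streaming approach later in the post, a minimal sketch of what this manual bookkeeping might look like (the manifest path and the candidateKeys listing are hypothetical, not Eyeview’s actual code):

// Hypothetical sketch of manual checkpointing: keep a manifest of already-processed
// S3 keys, skip them on the next run, and append newly processed keys afterwards.
val processedKeys = spark.read
  .textFile("s3://example-bucket/manifests/processed-keys.txt") // hypothetical path
  .collect().toSet
val pendingKeys = candidateKeys.filterNot(processedKeys) // candidateKeys: listing of the small URL files
// ...process pendingKeys, then write them back to the manifest...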

Challenge #3: Parquet Table Issues

Let’s assume we somehow manage to process this large number of small files without caching/persisting data on the cluster, writing directly to a Parquet table via Spark; we would then end up writing too many small files to the table. The problem with Parquet is that continuous appends to the table are too slow. We leveraged overwrite mode to save the data, which ended up creating millisecond-level partitions in the table.
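As an illustrative sketch of that pattern (the DataFrame name and path are hypothetical, not Eyeview’s actual code), partitioning by a millisecond-granularity timestamp produces one tiny partition per record:

// Hypothetical sketch of the problematic write: partitioning a Parquet table by the
// epoch-millisecond ts column creates an enormous number of tiny partitions and files.
urlSegmentsDf.write
  .mode("overwrite")
  .partitionBy("ts")
  .parquet("s3://example-bucket/parquet/brand-safety") // hypothetical path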

Challenge #4: No Concurrent Reads on Parquet Tables

The latency of the offline jobs became so high that they were continuously writing data to the Parquet tables, which meant no other job could query those tables. Parquet also does not work well with a very large number of partitions.

Databricks Spark Streaming and Delta Lake

To solve the above challenges we introduced two new technologies: Databricks Spark Streaming and Delta Lake. In most cases, the source that produces a large number of small files can be converted from batch processing to stream processing.

Databricks Spark Streaming helped solve the first two challenges. Instead of the cluster of bidders writing files containing the URLs to S3, we started sending the URLs directly to a Kinesis stream. This way we no longer had a large number of small files; all the data is in the stream, which lets us utilize our Spark resources efficiently.

By connecting Spark Streaming to the Kinesis stream, we no longer need to do manual checkpointing. Since Spark Streaming is inherently fault-tolerant, we don’t have to worry about failures or reprocessing files.
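Checkpointing is configured on the write side of the stream, which the read snippet below does not show; as a minimal sketch (the sink and checkpoint paths are hypothetical, and urlStreamDf stands in for the streaming DataFrame built below), a single option replaces the manual bookkeeping:

// Hypothetical sketch: Structured Streaming tracks the processed Kinesis shard
// positions under the checkpoint location automatically, so no manual manifest is needed.
val query = urlStreamDf.writeStream
  .format("delta")
  .option("checkpointLocation", "s3://example-bucket/checkpoints/url-stream") // hypothetical path
  .start("s3://example-bucket/delta/url-stream")                              // hypothetical path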

The code snippet below reads the data from the Kinesis stream.

import java.nio.ByteBuffer
import java.nio.charset.Charset
import org.apache.commons.codec.binary.Base64
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// Schema of the JSON payload sent by the bidders.
val jsonSchema = new StructType()
  .add("entry", StringType)
  .add("ts", LongType)

// Read the raw records from the Kinesis stream.
val kinesisDF = spark.readStream
  .format("kinesis")
  .option("streamName", "kinesis Stream Name")
  .option("initialPosition", "LATEST")
  .option("region", "aws-region")
  .load()

// Base64-decode each record into its JSON string, parse the JSON, and split the
// comma-separated entry into device type, URL, and OS columns.
val queryDf = kinesisDF
  .map(row => Charset.forName("UTF-8").newDecoder()
    .decode(ByteBuffer.wrap(new Base64().decode(row.get(1)).asInstanceOf[Array[Byte]]))
    .toString)
  .selectExpr("cast (value as STRING) jsonData")
  .select(from_json(col("jsonData"), jsonSchema).as("bc"))
  .withColumn("entry", lit($"bc.entry"))
  .withColumn("_tmp", split($"entry", ","))
  .select(
    $"_tmp".getItem(0).as("device_type"),
    $"_tmp".getItem(1).as("url"),
    $"_tmp".getItem(2).as("os"),
    $"bc.ts".as("ts"))
  .drop("_tmp")

The other two challenges, both involving the Parquet table, are solved by introducing a new table format: Delta Lake.

Delta Lake supports ACID transactions, which means we can concurrently and reliably read from and write to the table. Delta Lake tables are also very efficient with continuous appends. A table in Delta Lake is both a batch table and a streaming source and sink.
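For illustration (the table path is hypothetical), the same Delta location can be used either way:

// Minimal sketch: one Delta path can be queried as a batch table or read as a stream.
val batchDf  = spark.read.format("delta").load("s3://example-bucket/delta/brand-safety")
val streamDf = spark.readStream.format("delta").load("s3://example-bucket/delta/brand-safety")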

The data is persisted into Delta Lake, which also helped us remove the millisecond partitions: the Delta table is partitioned only down to the hour level.
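The Delta write itself is not reproduced in the snippet that follows; as a minimal sketch (the table and checkpoint paths are hypothetical, enrichedDf stands in for the streaming DataFrame produced by the pipeline, and ts is assumed to be epoch milliseconds), an hour-partitioned Delta sink might look like this:

import org.apache.spark.sql.functions._

// Hypothetical sketch: persist the enriched stream into a Delta table partitioned
// only down to the hour level, instead of millisecond-level partitions.
val deltaQuery = enrichedDf
  .withColumn("event_time", from_unixtime(col("ts") / 1000).cast("timestamp")) // assumes ts in ms
  .withColumn("date", to_date(col("event_time")))
  .withColumn("hour", hour(col("event_time")))
  .writeStream
  .format("delta")
  .partitionBy("date", "hour")
  .option("checkpointLocation", "s3://example-bucket/checkpoints/brand-safety") // hypothetical path
  .start("s3://example-bucket/delta/brand-safety")                              // hypothetical path

The snippet below, from the original pipeline, makes the brand-safety API call for each record and writes the result to Aerospike: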

import java.util.concurrent.TimeUnit

// HttpClient, ApiCallingUtility, and aerospikeWrite are Eyeview's own utilities
// (not shown in this post). Each partition reuses a single HTTP client, calls the
// brand-safety provider's API per record, and writes the segments to Aerospike.
val sparkStreaming = queryDf
  .as[(String, String, String, Long)]
  .mapPartitions { partition =>
    val http_client = new HttpClient
    http_client.start
    val partitionsResult = partition.map { record =>
      try {
        // record is (device_type, url, os, ts)
        val api_url = ApiCallingUtility.createAPIUrl(record._2, record._1, record._3)
        val result = ApiCallingUtility.apiCall(
          http_client.newRequest(api_url).timeout(500, TimeUnit.MILLISECONDS).send(),
          record._2,
          record._1)
        aerospikeWrite(api_url, result)
        result
      } catch {
        case e: Throwable => println(e)
      }
    }
    partitionsResult
  }

Conclusion and Results

By default, our cluster of bidders makes a no-bid decision if it does not have segment information for a URL.

This would result in less advertising traffic, which would have a substantial monetary impact. The latency of the old architecture was so high that the result was filtered-out URLs, i.e. no ads.

By switching the process to Databricks Spark Streaming and Delta Lake, we decreased the number of filtered bid calls by 50%! Once we had moved the architecture from batch processing to a streaming solution, we were also able to reduce the cluster size of the Spark jobs, thus significantly reducing the cost of the solution.

More impressively, we now require only one job to take care of all the brand-safety providers, which has further reduced costs.
