Rendezvous Architecture for Data Science in Production

Additionally, your challenger model might not work so well after all.

Unfortunately, you would have to expose 10% of users to an inferior experience just to find out.

This clearly needs a better approach.

But we are still not done.

In a successful data science endeavour, the boundaries of development and production are increasingly blurry.

A single business objective will see many different incumbent and challenger models, which need to be retrained in a constantly changing environment alongside continuous benchmarking, monitoring and production hand-over.

The actual real-world scenario will quickly look much more involved than that. These are requirements that give any IT department a good run for its money.

The challenge becomes to address the increasing heterogeneity of the data science toolkits, the required elasticity and flexibility of the data infrastructure, and the end-to-end automation of the complex data science model life cycle.

Part 2: The Solution

Meet the rendezvous architecture

Summarising the previous introduction to the problem statement, we are looking for an architecture that can:

- Evaluate a big number of incumbent and challenger models in parallel
- Manage the model life cycle
- Handle an increasing heterogeneity of data science toolkits
- Allow experimentation in production without impacting the user experience, and decouple the business objectives from the data science objectives
- Decouple enterprise requirements like SLAs and GDPR from the data science models
- Scale this to peaks of 50+k page loads per minute without hiring armies of DevOps engineers

The solution to this very real data science problem came from Ted Dunning and Ellen Friedman, who named it the Rendezvous Architecture in their book Machine Learning Logistics (O'Reilly 2017).

At the core of the architecture stands Input Data as a Stream and models subscribing to the data using a pub-sub messaging pattern.

Receiving scoring requests via a stream allows us to easily distribute a request and evaluate a big number of models in parallel.
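To make the pattern concrete, here is a minimal sketch of the pub-sub flow using the Apache Pulsar Python client (the messaging system used later in this post); the topic name, payload and subscription name are purely illustrative:

import json
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# Every model (incumbent or challenger) subscribes to the input topic with its own
# subscription, so each of them receives a copy of every scoring request.
consumer = client.subscribe('scoring-requests-test', subscription_name='challenger-model-a')

# The business objective publishes each scoring request once to that topic.
producer = client.create_producer('scoring-requests-test')
producer.send(json.dumps({'user_id': 42, 'page': '/for-sale/london'}).encode('utf-8'))

msg = consumer.receive()
print(json.loads(msg.data()))
consumer.acknowledge(msg)

client.close()

Because every model uses its own subscription on the same topic, adding another challenger is just another subscription rather than a change to the request path.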

But it does not solve how to return a response robustly from a big number of models while upholding enterprise requirements.

To return a response, the architecture also treats model scores as a stream, in combination with the Rendezvous service that lends the architecture its name.

Conceptually, the rendezvous service subscribes to the original scoring requests to combine them with model scores from the model response stream.

Subscribing to scoring requests in the rendezvous services decouples the ability to return a response from the models successfully returning scoring outputs.

It also allows the rendezvous service to implement an SLA: in a scenario where the models are failing or are delayed and no scoring output reaches the rendezvous service within the SLA the rendezvous service can, for example, send a meaningful default response.

This is possible because the rendezvous service was aware of the scoring request in the first place.

The rendezvous service implements a race condition of model scoring outputs versus configurable SLAs and timeout policies.

For a much more detailed introduction to the rendezvous architecture you should read Ted Dunning's and Ellen Friedman's great book: https://mapr.com/ebooks/machine-learning-logistics/ch03.html

Taking this conceptual architecture and applying it to the requirements we set out, we get the following data science platform: we split the architecture, as desired, into a hierarchy of a business objective, which implements enterprise requirements, and the modelling objectives of the data science team.

The entire platform is built with containerised stateless services, which provide the desired decoupling and scalability.

Even stateful models can be implemented on the platform with the help of the Model Data Enricher as discussed below in step 3.

When a scoring request arrives the ingress controller assigns the request to the Score Evaluation Service of a Rendezvous Service instance which instantiates an async evaluation policy.

This sets the race condition for the model scoring outputs.

Each policy instance acts like a mailbox collecting the model scoring outputs of its corresponding scoring request.

The Score Evaluation Service attaches the policy instance ID and the rendezvous service ID to the scoring request and publishes it to the input data stream for that business objective.
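As a rough sketch of what that attachment could look like (the field names are illustrative, not the platform's actual schema):

import os
import uuid

def extend_scoring_request(scoring_request: dict) -> dict:
    # Attach routing metadata so the scores can find their way back:
    # the rendezvous service ID selects the response stream, while the policy
    # instance ID identifies the waiting "mailbox" (and, later, its Unix socket).
    return {
        **scoring_request,
        'rendezvous_service_id': os.environ['THIS_RENDEZVOUS_SERVICE_ID'],
        'policy_instance_id': str(uuid.uuid4()),
    }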

A Model Data Enricher subscribes to the input data stream with scoring requests.

This service attaches additional data points like metadata or external state for stateful models.

For example, a recommendation model might take a user's previous actions into account to create a recommendation.

The data enricher therefore might cross-reference the user ID with a cache of previous user interactions.

The Model Data Enricher then publishes the enriched scoring request to a modelling objective message stream.
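A minimal sketch of such an enricher, assuming illustrative topic names (scoring-requests-test, modelling-objective-test-0) and a placeholder cache lookup:

import json
import pulsar

client = pulsar.Client('pulsar://pulsar:6650')
consumer = client.subscribe('scoring-requests-test', subscription_name='model-data-enricher')
producer = client.create_producer('modelling-objective-test-0')

def lookup_user_interactions(user_id):
    # Placeholder for a lookup against a cache/state store of previous user actions.
    return []

while True:
    msg = consumer.receive()
    scoring_request = json.loads(msg.data())
    # Enrich the request with additional data points before the models see it.
    scoring_request['previous_interactions'] = lookup_user_interactions(scoring_request.get('user_id'))
    producer.send(json.dumps(scoring_request).encode('utf-8'))
    consumer.acknowledge(msg)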

A message consumer per model subscribes to the modelling objective input stream and sends the scoring requests to the corresponding model instance.

Each model objective can host a range of different models as long as they share the input data schema.

Each model objective also hosts a Decoy Model which logs scoring requests to a Log stream and a Canary Model which is used to detect drift in the input data and to provide a consistent baseline to compare model performance with.
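The decoy model in particular is trivial; a sketch of it, with the same illustrative topic names, is little more than a consumer that copies every enriched request to a log topic:

import pulsar

client = pulsar.Client('pulsar://pulsar:6650')
consumer = client.subscribe('modelling-objective-test-0', subscription_name='decoy-model')
log_producer = client.create_producer('modelling-objective-test-0-log')

while True:
    msg = consumer.receive()
    # Archive exactly what the real models saw, for debugging, replay and auditing.
    log_producer.send(msg.data())
    consumer.acknowledge(msg)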

A Message Producer sends model scoring outputs to the corresponding Rendezvous stream based on the rendezvous service ID which had been attached to the message by the Score Evaluation Service.

A Score Collection Service reads all incoming model scoring outputs from its corresponding rendezvous stream.

Each message has a policy instance ID which had been attached to the message by the Score Evaluation Service.

The message's policy instance ID is the name of a Unix socket on a shared volume between the Score Collection Service and the Score Evaluation Service, which allows the collection service to send the model scoring output to the awaiting policy instance of the corresponding scoring request.

The awaiting policy instance receives a model score via the Unix socket server and evaluates its policy logic to decide whether to respond to the originating scoring request with a particular score.

Implementing the Rendezvous Service

Most of the platform is fairly straightforward as it is composed of stateless containerised services and a simple pub/sub messaging pattern.

In production this can be deployed easily on Kubernetes using helm charts and the horizontal pod autoscaler and round-robin load balancing.

In fact, every grouped entity within the business and modelling objectives in the platform diagram is a pod which is horizontally scaled.

The main challenge is the implementation of the Rendezvous Service which is stateful.

Conceptually, it’s best to describe the rendezvous service as a mailbox.

Each active scoring request has an active mailbox waiting for model scoring responses.

These mailboxes receive the model scores for their corresponding scoring request and execute the attached score evaluation policies.

As model scores are published to a stream, each mailbox could subscribe to that stream and filter out their relevant model scores for evaluation (fan-out pattern).

This would duplicate the messages for each subscribed mailbox.

At the scale of 50+k requests per minute and a big number of incumbent and challenger models returning scores in parallel this would be impractical.

To achieve scale we have to keep each running policy instance as lightweight as possible.

We implemented the Score Evaluation Service in Python using the asyncio library to implement an async event driven policy evaluation.

The following code is a simplified version of the service (SomeMessageDeserialisation, SomeMessageSerialisation, SomePolicy and publish stand in for the real implementations):

import asyncio
import os

from flask import Flask, request, jsonify

app = Flask(__name__)


@app.route('/topic/test', methods=['POST'])
def incoming_scoring_request():
    scoring_request = SomeMessageDeserialisation(
        request.get_json(),
        os.environ['THIS_RENDEZVOUS_SERVICE_ID']
    )
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    scoring_response = loop.run_until_complete(
        async_score_evaluation_service(scoring_request, loop)
    )
    return jsonify(scoring_response)


async def async_score_evaluation_service(scoring_request, loop):
    # Each scoring request gets its own policy instance ("mailbox"), which
    # listens for model scores on a Unix socket named after its instance ID.
    policy = SomePolicy(loop.create_future())
    policy_coro = asyncio.start_unix_server(
        policy.handle_connection,
        policy.socket_path,
        loop=loop
    )
    policy_task = asyncio.ensure_future(policy_coro)
    # Attach the socket path (policy instance ID) to the request and publish it
    # to the input data stream of the business objective.
    extended_scoring_request = SomeMessageSerialisation(
        scoring_request,
        policy.socket_path
    )
    message_task = asyncio.ensure_future(
        publish(extended_scoring_request)
    )
    try:
        # Race the model scoring outputs against the SLA timeout.
        scoring_response = await asyncio.wait_for(
            policy.result_future,
            timeout=policy.sla
        )
    except asyncio.TimeoutError:
        # No model responded within the SLA: return a meaningful default instead.
        scoring_response = policy.default_value()
    return scoring_response

As shown in the code snippet, the model score evaluation policy starts an async Unix socket server.

The path of the socket is given by the UUID of the policy instance.

This is all that's needed to receive model scores.
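The policy object itself is not shown in the snippet above; a minimal sketch of what SomePolicy could look like, assuming a simple "first score wins" evaluation and a shared /sockets volume, might be:

import json
import os
import uuid

class SomePolicy:
    def __init__(self, result_future, sla_seconds=0.2):
        self.result_future = result_future
        self.sla = sla_seconds
        # The UUID doubles as the policy instance ID and the Unix socket name
        # on the volume shared with the Score Collection Service sidecar.
        self.socket_path = os.path.join('/sockets', str(uuid.uuid4()))

    async def handle_connection(self, reader, writer):
        # Called by asyncio.start_unix_server for every model score the
        # Score Collection Service delivers to this policy's socket.
        data = await reader.read()
        writer.close()
        score = json.loads(data)
        if not self.result_future.done():
            # "First score wins" in this sketch; a real policy could wait for
            # the incumbent, collect n scores, or weigh challengers differently.
            self.result_future.set_result(score)

    def default_value(self):
        # Meaningful fallback returned when no model responds within the SLA.
        return {'score': None, 'reason': 'sla_timeout'}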

Alongside the evaluation service a sidecar Score Collection Service subscribes to the model response stream.

import os
import socket

import pulsar

# The sidecar consumes from the rendezvous stream of this Rendezvous Service instance.
pulsar_consumer = get_message_consumer(os.environ['THIS_RENDEZVOUS_SERVICE_ID'])

while True:
    msg = pulsar_consumer.receive()
    scoring_response = SomeMessageDeserialisation(
        msg.data()
    )
    # The policy instance ID doubles as the Unix socket path on the shared volume.
    policy_socket = scoring_response.policy_socket_path
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect(policy_socket)
    sock.sendall(SomeMessageSerialisation(scoring_response))
    sock.close()
    pulsar_consumer.acknowledge(msg)

It sends the messages to the Unix socket of the corresponding score evaluation policy instance based on the policy instance ID.

We attached that ID to the extended scoring request before publishing it to the data stream for that business objective in the Score Evaluation Service.

To create connectivity between the Score Collection sidecar and the Score Evaluation Service I use a shared volume between the two containers for the unix sockets.

This implementation pattern scales much better than the naive fan-out and a single Rendezvous Service instance can host a big number of concurrent score evaluation policies.

I use Apache Pulsar as a distributed pub-sub messaging system.

Pulsar is not just a very performant messaging system, it scales easily to a very big number of topics and was built with highly scalable persistent storage of messages as a primary objective.

Topic Compaction and Tiered Persistent Storage allow the rendezvous platform in the future to persist messages indefinitely and replay scoring requests, e.g. to test and validate a new challenger model or warm up a model which maintains an internal state.
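A sketch of such a replay using the Pulsar reader interface (the topic name and the replay hook are illustrative):

import pulsar

def replay(payload: bytes):
    # Hypothetical hook: feed the historic scoring request to the model under test.
    print(payload)

client = pulsar.Client('pulsar://pulsar:6650')
# A reader starts from the earliest retained message, independently of any consumer subscription.
reader = client.create_reader('scoring-requests-test', pulsar.MessageId.earliest)

while reader.has_message_available():
    msg = reader.read_next()
    replay(msg.data())

client.close()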

The following picture shows an example from the minikube development environment: the request runs through all elements of the Rendezvous platform with a Total Time of 51ms.

The test topic fetches a random number via the model data enricher and the model objective test-0 runs a simplistic model which returns the sum of all numeric values from the scoring request.
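That test model can be as simple as a few lines (a sketch, not the exact implementation):

def test_model(scoring_request: dict) -> dict:
    # Sum all numeric values in the enriched scoring request and return that as the score.
    score = sum(v for v in scoring_request.values() if isinstance(v, (int, float)))
    return {'model': 'test-0', 'score': score}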

I use that setup to test the elements of the rendezvous platform.

Actual models are containerised and decoupled which allows us to test them independently.

This means that the overhead for the rendezvous architecture is between 40–50ms.

That is a pretty small cost for the wide range of benefits the architecture provides, and it leaves ample time to fetch external data points and evaluate complex models.

The cost of doing it right is 40ms.

A load test of the minikube development environment shows that 100 requests per second are no big sweat either.

The reference implementation demonstrated the benefits of the rendezvous architecture and showed very promising performance.

I am currently working on the Kubernetes productionisation looking at helm and operators.

I will follow up this blog post with the results of our production load testing as soon as we have deployed our platform on the production Kubernetes cluster.

Credit

We are standing on the shoulders of giants.

Credit where credit is due:

- Ted Dunning and Ellen Friedman and their inspirational book “Machine Learning Logistics” (O’Reilly 2017), which introduced the conceptual rendezvous architecture
- Terry McCann from www.advancinganalytics.co.uk and Christopher Conroy, who were early believers in, contributors to and supporters of my effort

Bio

https://www.linkedin.com/in/janteichmann/

I’m a highly skilled data scientist, data engineer and solution architect.

I hold a PhD in Mathematics from City University London and offer a strong background in machine learning, statistical modelling and programming.

I have extensive experience in big data, full stack web development and interactive data visualisations which helps me to deliver engaging and comprehensive data science products.

I previously co-founded Cambridge Energy Data Lab, where we celebrated a successful exit with Enechange, a utility comparison platform which is now the market leader in Japan.

I now use my skills as a Senior Data Scientist at Zoopla to lead the data science team.

We build models on Zoopla’s vast amounts of property market data, behavioural data, geo data, property images and text data sets.
