Making sense of Apache Flink state migration with Scala and Avro

Marc Rooding, Apr 24

Introduction

All stateful applications need a state storage mechanism.

In traditional applications, this often is a relational or a non-relational database.

When using Apache Flink, state storage and management is offered out of the box.

On one hand, this makes it easier to get started with Flink; on the other hand, it forces you to learn how to leverage the built-in state management that Flink offers.

One of the aspects that is vital for any production-grade application is state migration.

You don’t build software in one go; you start small and iterate continuously to improve it.

This is no different when writing a Flink job.

With software relying on databases, you will most likely resort to migration tools that update your database schema and data as your software and data model evolve.

With Apache Flink, this has long been one of the harder issues to get right.

This article will first go into how state schema evolution in Flink has evolved over time.

It then describes our trial-and-error experience with the state schema migration offered out-of-the-box in version 1.8.

To conclude, it shows how we eventually used custom serializers to support Apache Avro-backed schema evolution for our Scala-based jobs.

TL;DR Sample project

Throughout this article, I’ll use snippets which all come from the sample project that I’ve put together on GitHub.

Going through all the code in this article would distract from the main topic.

Thus, if you find yourself wondering where a reference or variable in one of the snippets comes from, please take a look at the sample code.

The past and the present

Before version 1.7, Flink relied on the Java Kryo serialization framework to serialize and deserialize your data objects.

When taking a savepoint of your job’s state, Flink would use Kryo serializers to serialize your data models.

Both the serializer itself and the serialized data would be persisted in the savepoint.

The problem lies in trying to restore from a savepoint with a job that contains a change to your data object (e.g. adding an additional field).

Any change to your object would result in a different hash code for the class than the one calculated at the time of creating the checkpoint or savepoint.

Subsequently, Flink wouldn’t be able to match any of the persisted serializers to the updated data object on the classpath.

To support data schema evolution in Flink before 1.7, the only solution was to create custom serializers.

In a previous article, my colleague Niels Denissen explained how that was achieved using Apache Avro.

The release notes for Flink 1.7, however, proved to be very promising in offering an out-of-the-box solution to state migration:

With Flink 1.7.0, the community added state evolution which allows you to flexibly adapt a long-running application’s user states schema while maintaining compatibility with previous savepoints. State schema evolution now works out-of-the-box when using Avro’s generated classes as user state, meaning that the schema of the state can be evolved according to Avro’s specifications.

However, in version 1.7, the POJO serialization support did not yet cover composite types like Scala case classes.

With the release of Flink 1.8.0, all built-in serializers have been upgraded to use the new abstraction. All composite types supported by Flink, like Either or Scala case classes, are now generally evolvable as well when they have a nested evolvable type, such as a POJO.

Generating case classes from Avro schemas

The Flink documentation was very specific about state schema evolution working out-of-the-box when using Avro’s generated classes.

We decided to give this a try first.

In our pre-1.8 solution, we didn’t generate case classes from Avro schemas yet, so this is the first step to take.

In Scala, it’s incredibly easy to generate case classes based on Avro schemas using sbt-avrohugger.

Avrohugger can generate 3 different formats: Standard (for use with Avro’s GenericRecord), SpecificRecord and Scavro.

Our previous solution used to serialize and deserialize GenericRecord, so we thought we’d give the Standard format a go.
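For reference, the sbt wiring for this is tiny. The snippet below is a sketch rather than the sample project’s literal build definition: the plugin version is a placeholder and the task names (avroScalaGenerate, avroScalaGenerateSpecific) should be double-checked against the sbt-avrohugger documentation.

// project/plugins.sbt
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC18") // version is a placeholder

// build.sbt: generate case classes from the .avsc files under src/main/avro
Compile / sourceGenerators += (Compile / avroScalaGenerate).taskValue            // Standard format (GenericRecord)
// Compile / sourceGenerators += (Compile / avroScalaGenerateSpecific).taskValue // SpecificRecord format (used later on)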

However, we quickly ran into problems with this approach: Flink expects each data object to have a no-argument constructor.

Scala case classes are inherently immutable, offering only one constructor, which takes all of the case class’s fields as arguments.

By switching to the SpecificRecord format we would achieve what we needed to adhere to the POJO rules as stated in the documentation:

- The class is public and standalone (no non-static inner class)
- The class has a public no-argument constructor
- All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter and setter method that follows the Java beans naming conventions for getters and setters

As an example, this is what a generated case class would look like for a simple object:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
package nl.mrooding.data

import scala.annotation.switch

case class ProductDescription(var id: String, var description: String, var updatedAt: java.time.Instant) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this("", "", java.time.Instant.now)

  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => { id }.asInstanceOf[AnyRef]
      case 1 => { description }.asInstanceOf[AnyRef]
      case 2 => { updatedAt.toEpochMilli }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }

  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.id = { value.toString }.asInstanceOf[String]
      case 1 => this.description = { value.toString }.asInstanceOf[String]
      case 2 => this.updatedAt = { value match { case (l: Long) => java.time.Instant.ofEpochMilli(l) } }.asInstanceOf[java.time.Instant]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }

  def getSchema: org.apache.avro.Schema = ProductDescription.SCHEMA$
}

object ProductDescription {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ProductDescription\",\"namespace\":\"nl.mrooding.data\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"description\",\"type\":\"string\"},{\"name\":\"updatedAt\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\",\"default\":0}}]}")
}

You can see that it uses var for all the fields to make it mutable.

It also adds the no-arg constructor required for initialization.

It also adds the entire Avro schema as a val SCHEMA$ into the companion object.

TypeInformation inference

At the core of Flink’s typing system is the TypeInformation class.

It contains information about the data types that you use as state.

You can either declare a specific implementation of TypeInformation yourself or let Flink infer it automatically for you.
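To make the distinction concrete, here’s a small sketch of both options for the generated ProductDescription case class (illustrative only; the variable names are mine):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._ // brings the createTypeInformation macro and implicit TypeInformation instances into scope

// Declaring the TypeInformation yourself:
val explicitInfo: TypeInformation[ProductDescription] = TypeInformation.of(classOf[ProductDescription])

// Letting Flink infer it through the Scala API:
val inferredInfo: TypeInformation[ProductDescription] = createTypeInformation[ProductDescription]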

The documentation clearly states that state schema evolution is only supported if you let Flink infer the types.

By using the following import, we were able to let Flink infer the TypeInformation for our generated Avro case classes.

import org.apache.flink.api.scala._

Unfortunately, even with the use of case classes adhering to the Flink POJO rules, we weren’t able to get it working and received the following error:

Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class nl.mrooding.data.Product
  at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
  at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
  at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
  at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
  at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
  at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
  at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
  ... 29 more

If we look at the source code of SpecificData, we can see that it tries to resolve the Avro schema by looking for a static property called SCHEMA$ on the Java class using Java reflection.

Our Scala case classes have a companion object which contains the SCHEMA$ property, but Java reflection techniques are unable to read Scala properties (#111).
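To illustrate what goes wrong, here’s a rough sketch of the lookup that SpecificData performs and why it fails for a Scala companion object (the snippet is illustrative, not code from the sample project):

// Avro's SpecificData looks for a static SCHEMA$ field on the class itself, roughly:
classOf[nl.mrooding.data.Product].getDeclaredField("SCHEMA$") // throws NoSuchFieldException
// ...which Avro wraps in the "Not a Specific class" error shown above.

// The schema actually lives on the companion object, which compiles to a separate class
// that Avro never inspects:
Class.forName("nl.mrooding.data.Product$")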

The only solution we came up with for this issue was to switch to Java-generated Avro classes.

We decided to not go down this road for now.

Instead, we decided to see how far we’d get with plain old case classes.

Back to plain old case classes

Given a basic Scala case class, like ProductDescription:

import java.time.Instant

case class ProductDescription(id: String, description: String, updatedAt: Instant)

We’ll start with the most basic example containing state that you can think of.

Namely, consuming a DataStream with only a keyBy.

val productDescriptionStream: DataStream[ProductDescription] = productDescriptions
  .keyBy(_.id)

In this example, the keyBy is the function that creates state.

For this type of state, you need the native POJO serialization and deserialization with state schema migration support.

In Flink 1.7, state schema evolution would only work for this example if the ProductDescription case class did not contain any composite types.

With POJO state schema migration, there are a few rules to what you can and can’t do:

- Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints.
- New fields can be added. The new field will be initialized to the default value for its type, as defined by Java.
- Declared field types cannot change.
- The class name of the POJO type cannot change, including the namespace of the class.
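As a concrete, hypothetical example of an evolution these rules allow: adding a field to the state type between two versions of the job (the category field below is made up for illustration):

// Version 1 of the job
case class ProductDescription(id: String, description: String, updatedAt: Instant)

// Version 2 of the job: category is new; when restoring state written by version 1,
// Flink initializes it to the Java default for its type (null for a reference type)
case class ProductDescription(id: String, description: String, updatedAt: Instant, category: String)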

So far so good.

We can use the out-of-the-box state schema evolution to manage DataStream state.

Let’s now take a look at state kept within a processor, like ValueState.

In our sample application, we have a CoProcessFunction[ProductDescription, ProductStock, Product] which joins two data models and outputs Product.

As we said before, Flink should be able to manage state schema evolution if you let Flink infer the TypeInformation.

Thus, we define our ValueState in the processor as follows:

import org.apache.flink.api.scala._

private[this] lazy val stateDescriptor: ValueStateDescriptor[Product] =
  new ValueStateDescriptor[Product]("product-join", classOf[Product])

private[this] lazy val state: ValueState[Product] =
  getRuntimeContext.getState(stateDescriptor)

Again, we import org.apache.flink.api.scala._ so that Flink uses implicit conversions to infer the TypeInformation for Product.

If we run our app with this setup, it works fine.

Restarting the job with a savepoint also works, as long as we do not modify the Product case class.

The problems start as soon as we try to modify Product.

After adding an additional field and restarting from a savepoint, the following error is presented:

Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class:
Serialization trace:
extra (nl.mrooding.data.Product)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
  at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
  at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
  at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
  at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
  at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
  at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)
  ... 11 more
Caused by: java.lang.ClassNotFoundException:
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
  ... 23 more

It’s abundantly clear from the stack trace that Flink is falling back to Kryo to (de)serialize our data model, which is not what we would’ve expected.

At this point, we decided that we had spent enough time trying to get it working out-of-the-box and resorted to custom serializers for now.

Custom serialization: what changed?

As we explained earlier, before version 1.7, Flink not only wrote the serialized data into savepoints; it also wrote the serializers themselves.

In 1.7 and beyond, the restore serializer is determined at compile time. Flink achieves this by only saving the serializer configuration in state and reconstructing the serializer itself, based on this configuration, at runtime.

This is the exact opposite of how it used to work: the serializer would first be reconstructed from the state.

The reconstructed serializer would then try to instantiate a TypeSerializerConfigSnapshot.

Since our previous solution always converted the actual data models (case classes) into Avro GenericRecord objects before serializing, we only required one serializer.

Besides that, we also didn’t need to keep any configuration related to our serializer.

This is because by adhering to the Avro schema evolution compatibility rules, we would always be able to deserialize from a previous version of our data models.
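Avro also ships a compatibility checker that makes it easy to verify such a change before deploying it. A quick sketch with trimmed-down schemas (the added category field and its default are hypothetical):

import org.apache.avro.{Schema, SchemaCompatibility}

val writerSchema = new Schema.Parser().parse(
  """{"type":"record","name":"ProductDescription","namespace":"nl.mrooding.data","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"description","type":"string"}
    |]}""".stripMargin)

val readerSchema = new Schema.Parser().parse(
  """{"type":"record","name":"ProductDescription","namespace":"nl.mrooding.data","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"description","type":"string"},
    |  {"name":"category","type":"string","default":"unknown"}
    |]}""".stripMargin)

// COMPATIBLE: data written with the old schema can still be read with the new one,
// because the new field declares a default value
SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema).getType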

A downside which Niels Denissen already pointed out in the previous article is that by only using GenericRecord we cannot guarantee that each data object will get serialized when taking a savepoint.

This means that, for each data object, we needed to keep the entire history of Avro schema updates.

Why? Because we lacked the guarantee that all data stored in a savepoint was serialized using the latest schema version.

The AvroSchemaTranscoder we built at that time merged all available historical schemas and tried to deserialize each object using one of the available schemas.

Working with the custom serialization APIs

In the next sections, we’ll talk about the most important methods of the two Flink APIs that we needed to implement. The gist of what we’ll be implementing is:

- the serializer, implementing TypeSerializer, which takes care of serializing and deserializing our data model
- the serializer snapshot, implementing TypeSerializerSnapshot, which is used to write and read information about the state saved in the savepoint. It also instantiates the serializer based on the snapshot data upon restoring from a snapshot.

In our solution, we’ll still be converting all our data models to GenericRecord before storing them in the savepoint.

Because of this, we could build our solution around two traits. Every data model that we want to be able to serialize would have a specific extension of both traits, as sketched below.
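The two traits roughly take the following shape. This is my sketch based on the snippets in the rest of this article, not the literal code from the sample project, so names and members may differ:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}

// Generic serializer: serialize/deserialize are implemented once in terms of these members
trait AvroSerializer[T] extends TypeSerializer[T] {
  def stateSchema: Option[Schema]                 // the schema read back from the savepoint, if any
  def getCurrentSchema: Schema                    // the schema of the data model at runtime
  def fromGenericRecord(record: GenericRecord): T // conversion back to the data model
}

// Generic snapshot: writeSnapshot/readSnapshot/restoreSerializer are implemented in terms of these members
trait AvroSerializerSnapshot[T] extends TypeSerializerSnapshot[T] {
  var stateSchema: Option[Schema] = None          // filled in by readSnapshot when restoring
  def getCurrentSchema: Schema
}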

Again, the snippets may contain references which aren’t entirely clear on their own.

Please have a look at the sample project if you find yourself struggling.

Note: this implementation has only been tested using a memory-state backend.

The TypeSerializer implementation

There are quite a lot of methods that you’ll need to implement, but we’ll only cover the most important ones:

public abstract void serialize(T record, DataOutputView target) throws IOException;
public abstract T deserialize(DataInputView source) throws IOException;
public abstract TypeSerializerSnapshot<T> snapshotConfiguration();

Our serializer constructor will take one argument, stateSchema: Option[Schema].

We want to have the ability to instantiate it in two different scenarios:

- Starting the job without restoring from a savepoint. In this case, we’ll provide a None value to the constructor. The only schema we care about is the runtime schema.
- Starting the job by restoring from a savepoint. In this case, we’ll retrieve the schema saved in the savepoint and provide it as an argument to the constructor.
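In other words, for the Product model the two scenarios boil down to something like this (a sketch; schemaFromSavepoint is a placeholder for the schema read back by the snapshot):

// Fresh start, no savepoint: only the runtime schema matters
val freshSerializer = new ProductSerializer(None)

// Restoring from a savepoint: pass along the schema that was stored in it
val restoredSerializer = new ProductSerializer(Some(schemaFromSavepoint))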

Why we would want to have both the state schema and the runtime schema in the serializer will make sense when we take a look at the deserialize implementation:

override def deserialize(source: DataInputView): T = {
  val blobSize = source.readInt()
  val blob = new Array[Byte](blobSize)
  source.read(blob)

  val decoder = DecoderFactory.get().binaryDecoder(blob, null)
  val reader = stateSchema match {
    case Some(previousSchema) => new GenericDatumReader[GenericRecord](previousSchema, getCurrentSchema)
    case None                 => new GenericDatumReader[GenericRecord](getCurrentSchema)
  }
  val genericRecord = reader.read(null, decoder)

  fromGenericRecord(genericRecord)
}

We first read the binary data from the DataInputView and then decide with which schemas to construct the GenericDatumReader.

The GenericDatumReader has multiple constructors:

- one that takes both a writer and reader schema
- one that only takes a single schema

If our serializer implementation has a stateSchema defined, we construct the GenericDatumReader with both the writer and reader schema. If stateSchema is None, we’ll construct it using only the runtime schema.

Last but not least, we decode the binary data into an Avro GenericRecord and then call fromGenericRecord which converts the GenericRecord into an instance of the data model.
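For context, such a conversion pair for ProductDescription could look roughly like the sketch below. In the sample project these conversions live on the data model and its companion object, and the schema value would be parsed from the corresponding Avro schema definition; the free-standing functions here are just for illustration:

import java.time.Instant
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

// Assumed to be parsed once from the ProductDescription Avro schema definition
val productDescriptionSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"ProductDescription","namespace":"nl.mrooding.data","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"description","type":"string"},
    |  {"name":"updatedAt","type":{"type":"long","logicalType":"timestamp-millis"}}
    |]}""".stripMargin)

def toGenericRecord(p: ProductDescription): GenericRecord = {
  val record = new GenericData.Record(productDescriptionSchema)
  record.put("id", p.id)
  record.put("description", p.description)
  record.put("updatedAt", p.updatedAt.toEpochMilli)
  record
}

def fromGenericRecord(record: GenericRecord): ProductDescription =
  ProductDescription(
    id = record.get("id").toString,
    description = record.get("description").toString,
    updatedAt = Instant.ofEpochMilli(record.get("updatedAt").asInstanceOf[Long])
  )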

The implementation of serialize basically converts the data model instance into a GenericRecord, encodes it into binary data and eventually writes it to the DataOutputView:

override def serialize(instance: T, target: DataOutputView): Unit = {
  val genericRecord = instance.toGenericRecord
  val builder = ByteString.newBuilder
  val avroEncoder = EncoderFactory.get().binaryEncoder(builder.asOutputStream, null)

  new GenericDatumWriter[GenericRecord](genericRecord.getSchema).write(genericRecord, avroEncoder)
  avroEncoder.flush()

  val blob = builder.result().toArray
  target.writeInt(blob.length)
  target.write(blob)
}

The last method that we’ll cover is the implementation of snapshotConfiguration.

The implementation of snapshotConfiguration is left to each data-model-specific serializer: it returns that model’s specific SerializerSnapshot. We do not provide the stateSchema argument during instantiation; we rely on the no-argument constructor, which sets the stateSchema value to None.

override def snapshotConfiguration(): TypeSerializerSnapshot[Product] = new ProductSerializerSnapshot()

The TypeSerializerSnapshot implementation

There are two methods which you need to implement to read and write configuration data to the snapshot:

void writeSnapshot(DataOutputView out) throws IOException;
void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;

Besides that, you also need to implement the method that does the actual instantiation of the serializer:

TypeSerializer<T> restoreSerializer();

As mentioned earlier, every data model will have an implementation of TypeSerializerSnapshot.

By doing this, we would resolve the issue of having to maintain all Avro schema history indefinitely.

It would also greatly improve our code by using proper typing.

Instead of writing a state descriptor using GenericRecord objects and afterwards mapping it to our own data model, we would be able to define a descriptor directly with our data model.

This ensures that the data always gets serialized when creating a savepoint.

Previously:

val descriptor: ValueStateDescriptor[GenericRecord] = new ValueStateDescriptor[GenericRecord](...)
val bufferState: ValueState[GenericRecord] = getRuntimeContext.getState(descriptor)
val state: Option[Product] = Option(bufferState.value()).map(Product.fromGenericRecord)

After:

val descriptor: ValueStateDescriptor[Product] = new ValueStateDescriptor[Product](...)
val bufferState: ValueState[Product] = getRuntimeContext.getState(descriptor)
val state: Option[Product] = Option(bufferState.value())

Back to our implementation for readSnapshot and writeSnapshot.

Let’s say we have a data model which requires a new field.

What would we actually need to persist, besides the data itself, to be able to deserialize it again? Obviously, the Avro schema that belongs to that data.

Thus, the implementation for writeSnapshot is nothing more than writing the Avro schema as a String:

override def writeSnapshot(out: DataOutputView): Unit =
  out.writeUTF(getCurrentSchema.toString(false))

For the readSnapshot implementation, we can read the stored schema and parse it as an Avro Schema object.

Finally, we assign it to the stateSchema property.

override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
  val previousSchemaDefinition = in.readUTF
  this.stateSchema = Some(parseAvroSchema(previousSchemaDefinition))
}

With the ability to read and write snapshots, the last step of the puzzle is to write the implementation of restoreSerializer:

override def restoreSerializer(): TypeSerializer[Product] = new ProductSerializer(stateSchema)

This time we use the stateSchema retrieved by readSnapshot to instantiate the ProductSerializer.
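To actually use the custom serializer in a processor, the state descriptor can be constructed with a serializer instance instead of a class, using the ValueStateDescriptor constructor that accepts a TypeSerializer. A sketch (the sample project may wire this up differently):

private[this] lazy val stateDescriptor: ValueStateDescriptor[Product] =
  new ValueStateDescriptor[Product]("product-join", new ProductSerializer(None))

private[this] lazy val state: ValueState[Product] =
  getRuntimeContext.getState(stateDescriptor)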

Closing up

One further improvement that we were considering is replacing the custom toGenericRecord and fromGenericRecord methods that each of our data models needs to implement.

For this, we looked at using avro4s.

It allows you to convert from and to GenericRecord based on the Avro schema.

Unfortunately, due to #250, we weren’t able to get this working right away.

In this article, we’ve covered the current state of schema migration in Apache Flink.

We also tried to get the out-of-the-box support working for our use case, and eventually showed you how you can implement custom serialization.

We’re very interested to hear from others how they’ve solved schema migration in their Flink jobs.

As mentioned before, you can find an example project on GitHub.
