Discovering CQRS and Event Sourcing with Axon Framework

Photo by Dmitri Popov on Unsplash

George, Mar 5

This post is a companion to the Axon Labs Chat Application exercise, which is intended as an introduction to Axon Framework.

Axon is a CQRS/Event sourcing framework and server implemented in Java with extensive use of Spring and Spring Boot.

Axon is a wonderful framework to learn and to experiment with CQRS and Event Sourcing concepts.

So let’s get started.

The exercise itself (with full solution) is available on the original GitHub repository from AxonIQ.

Here I will just show some interesting observations I have stumbled upon while completing the exercise and learning Axon in general.

Getting ready

The coolest thing about Axon is that everything is ready to go thanks to Spring Boot configuration.

Get the code for the chat application from the link above.

Once the repository is pulled in it should just compile as is.

The tests do not, of course, pass just yet.

Looking through the POM file, we notice that there is a Kotlin Maven plugin for compiling Kotlin classes.

Kotlin is used for succinctly describing all the messages: commands, events and queries.

But I don’t like to mix languages, so I decided to rewrite the Kotlin classes in Java using Lombok.

It is easily installed via a Maven dependency and can be used in IntelliJ with a handy plugin.

This makes it possible to declare Value classes for all the immutable messages without much more code.

Then there is no need for a special Kotlin compiler.
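To make the idea of an immutable message class concrete, here is a hand-written plain Java sketch of roughly what such a class boils down to: final fields, a constructor, getters, and value-based equals, hashCode and toString. The field names (roomId, name) are assumptions for illustration, not taken from the exercise; with Lombok most of this boilerplate would be generated rather than typed.

```java
import java.util.Objects;

// Hand-written sketch of an immutable message class.
// The fields (roomId, name) are assumed for illustration only.
final class CreateRoomCommand {
    private final String roomId;
    private final String name;

    CreateRoomCommand(String roomId, String name) {
        this.roomId = roomId;
        this.name = name;
    }

    String getRoomId() { return roomId; }

    String getName() { return name; }

    // Two commands are equal when all their fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CreateRoomCommand)) return false;
        CreateRoomCommand other = (CreateRoomCommand) o;
        return Objects.equals(roomId, other.roomId) && Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() { return Objects.hash(roomId, name); }

    @Override
    public String toString() {
        return "CreateRoomCommand(roomId=" + roomId + ", name=" + name + ")";
    }
}
```

There are no setters, so an instance cannot change after it has been created, which is what we want for commands, events and queries.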

As the tutorial states, there is Axon server which needs to be available for the application to run.

The easiest thing is to use Docker to run an instance of Axon server in the background.

Following the documentation, run the server from the Docker image:

$ docker run -d --name axon -p 8024:8024 -p 8124:8124 axoniq/axonserver

I left the default port mappings.

However, when running the application, you need to set an application property so that the application can connect to Axon server.

# in application.properties
axon.axonserver.servers=localhost

Command model or Aggregate

The first thing to do is to set up our aggregate.

It’s in the ChatRoom.java class.

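The Axon documentation mentions both the Aggregate and AggregateRoot annotations.

However, the AggregateRoot annotation on its own is not enough: it will not set up the aggregate. Use the Aggregate annotation, which is the Spring stereotype picked up by the Spring Boot configuration.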

So now we are ready to implement and test our first command.

Implement (convert from Kotlin) CreateRoomCommand.

Add a constructor to the aggregate taking this command as an argument. This will direct Axon to create an instance of the aggregate when handling this command.

Dispatch the command from CommandController, using the wired CommandGateway, upon a POST to the /rooms endpoint.

This is where the connection to Axon server will take place.

If you run the Spring Boot application at this point, you should be able to see it in the overview of the Axon Dashboard at localhost:8024/#overview

You should also see CreateRoomCommand listed under the “Commands” tab.

At this point you can issue requests to the command controller using Swagger UI at localhost:8080/swagger-ui.html

The first question I asked myself at this point was: are the aggregates persisted in the default event store across restarts of the application?

Trying to create several rooms with the same roomId (i.e. the aggregate identifier) results, as expected, in an error.

2019-02-23 17:01:24.435 WARN 7 --- [-worker-ELG-3-1] i.a.a.localstorage.LocalEventStore : Error while storing events
java.util.concurrent.CompletionException: io.axoniq.axonserver.exception.MessagingPlatformException: [AXONIQ-2000] Invalid sequence number 0 for aggregate r1, expected 7

Axon server keeps the events of each aggregate in its own event store (the expected sequence number in the error shows that it already holds events for r1), so you cannot create two aggregates with the same ID, even after a restart of the application (as long as Axon server is not restarted).

Event Sourcing

The command side of the application is responsible for updating the state, i.e. creating and mutating the aggregates.

This implies a two-stage process: first, a command is dispatched by the command gateway to a particular aggregate instance, using the value of the command field annotated with TargetAggregateIdentifier.

The command handler method checks whether the intent of the command (the intended action) should result in a domain event, which is then published via the AggregateLifecycle.apply() method.

Then a method annotated with EventSourcingHandler (in the same aggregate instance as the one which handled the command) actually updates the state of the instance (creating a chat room, adding a participant to the list of participants in the chat room, etc.).

A couple of things to notice here:

An event handling method which mutates the state of the aggregate instance should be annotated with EventSourcingHandler (not EventHandler). Basically, EventSourcingHandler handles the events generated by the aggregate instance itself during the command handling phase. EventHandler is for handling events outside of the aggregate: in query handling components and sagas.

Business logic, such as checking preconditions, should be done in the command handler (not in the event handler), with the outcome of the check being either the application of some event or an exception. The event sourcing handler should just change the state of the aggregate instance according to the information in the event’s payload. Remember that each mutation will be replayed for each instance of the aggregate whenever this instance is loaded.
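The split between command handling and event sourcing handling can be shown without the framework. The following is a plain Java sketch under my own assumptions: the class, event and method names are invented for illustration, and Axon's annotations and AggregateLifecycle.apply() are replaced by ordinary method calls.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Events are plain immutable values (names are assumptions for illustration).
final class RoomCreated {
    final String roomId;
    RoomCreated(String roomId) { this.roomId = roomId; }
}

final class ParticipantJoined {
    final String participant;
    ParticipantJoined(String participant) { this.participant = participant; }
}

// Framework-free stand-in for an event-sourced aggregate.
final class ChatRoomSketch {
    private String roomId;
    private final Set<String> participants = new HashSet<>();
    private final List<Object> uncommitted = new ArrayList<>();

    // "Command handler" for creation: nothing to check, just apply the event.
    static ChatRoomSketch create(String roomId) {
        ChatRoomSketch room = new ChatRoomSketch();
        room.apply(new RoomCreated(roomId));
        return room;
    }

    // "Command handler": checks the precondition, then applies an event or throws.
    void join(String participant) {
        if (participants.contains(participant)) {
            throw new IllegalStateException(participant + " already joined");
        }
        apply(new ParticipantJoined(participant));
    }

    // Records the new event and routes it to the state-changing handler.
    private void apply(Object event) {
        uncommitted.add(event);
        on(event);
    }

    // "Event sourcing handler": no validation, only state changes.
    private void on(Object event) {
        if (event instanceof RoomCreated) {
            roomId = ((RoomCreated) event).roomId;
        } else if (event instanceof ParticipantJoined) {
            participants.add(((ParticipantJoined) event).participant);
        }
    }

    // Rebuilds an instance by replaying its stored events through the same handler.
    static ChatRoomSketch replay(List<Object> history) {
        ChatRoomSketch room = new ChatRoomSketch();
        for (Object event : history) {
            room.on(event);
        }
        return room;
    }

    List<Object> uncommittedEvents() { return new ArrayList<>(uncommitted); }

    Set<String> participants() { return new HashSet<>(participants); }

    String roomId() { return roomId; }
}
```

Because replay() goes through the same state-changing method but skips the checks, a precondition placed there would be re-evaluated against history on every load, which is why the checks belong in the command handlers.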

Projections and query handling

The query side of the application is responsible for maintaining a queryable (usually denormalized) model of the application according to various requirements determined by the application’s UI logic, for example.

This is the read side (or query side) of CQRS.

First the projection has to be constructed and persisted in a repository as a result of handling of one or several events in which the projection is interested.

This is done in the methods annotated with EventHandler annotation.

Then a projection can be queried using a method annotated with QueryHandler annotation which usually queries the underlying repository.
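As a framework-free illustration of these two steps, here is a plain Java sketch of a projection: event handling methods keep a small read model up to date, and a query handling method reads from it. The event and field names are assumptions (only AllRoomsQuery and RoomSummary appear in the exercise as far as this post goes), and a map stands in for the repository.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Events and query as plain values; names and fields are assumed for illustration.
final class RoomCreatedEvent {
    final String roomId;
    final String name;
    RoomCreatedEvent(String roomId, String name) { this.roomId = roomId; this.name = name; }
}

final class ParticipantJoinedRoomEvent {
    final String roomId;
    final String participant;
    ParticipantJoinedRoomEvent(String roomId, String participant) {
        this.roomId = roomId;
        this.participant = participant;
    }
}

final class AllRoomsQuery { }

// Denormalized read model entry: exactly what a room list screen would need.
final class RoomSummary {
    final String roomId;
    final String name;
    final int participants;
    RoomSummary(String roomId, String name, int participants) {
        this.roomId = roomId;
        this.name = name;
        this.participants = participants;
    }
}

final class RoomSummaryProjectionSketch {
    // Stand-in for the repository that persists the projection.
    private final Map<String, RoomSummary> repository = new LinkedHashMap<>();

    // "Event handler": a new room appears in the read model.
    void on(RoomCreatedEvent event) {
        repository.put(event.roomId, new RoomSummary(event.roomId, event.name, 0));
    }

    // "Event handler": bump the participant count of an existing room.
    void on(ParticipantJoinedRoomEvent event) {
        repository.computeIfPresent(event.roomId,
                (id, summary) -> new RoomSummary(id, summary.name, summary.participants + 1));
    }

    // "Query handler": answers the query straight from the read model.
    List<RoomSummary> handle(AllRoomsQuery query) {
        return new ArrayList<>(repository.values());
    }
}
```

The projection never touches the aggregate: it only reacts to events, so it can be shaped freely around what the UI needs.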

There is an interesting point when calling the query from the query controller.

The query will be matched using the type of the query (Java class of the query instance) and the exact match for the response type of the query.

When the query returns a generic collection, as is the case with AllRoomsQuery in the exercise, one needs to specify the response type together with its element type (RoomSummary in our case).

This is exactly what the MultipleInstancesResponseType class is for; an instance can be obtained with ResponseTypes.multipleInstancesOf(RoomSummary.class).

Inspecting the embedded H2 database, we can see how the projection is actually stored.

There is also a token in TOKEN_ENTRY generated for each projection.

Subscription type queries are pretty cool.

In the exercise, there is ChatMessageProjection, which can update any subscription for a RoomMessagesQuery with a new message for a matching room.

The subscription to the query returns an instance of SubscriptionQueryResult, which contains an instance of Mono with the list of initial query results and an instance of Flux which will be updated for each new message posted (with a matching room ID, which is why the subscription takes a filter).

This is a nice use of the reactive API.
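The shape of a subscription query (an initial result plus a filtered stream of updates) can be sketched in plain Java. This is only an analogy under my own assumptions: a returned list stands in for the Mono, a callback stands in for the Flux, and all class, field and method names other than RoomMessagesQuery are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain value types; the fields are assumed for illustration.
final class ChatMessage {
    final String roomId;
    final String text;
    ChatMessage(String roomId, String text) { this.roomId = roomId; this.text = text; }
}

final class RoomMessagesQuery {
    final String roomId;
    RoomMessagesQuery(String roomId) { this.roomId = roomId; }
}

final class ChatMessageProjectionSketch {
    private final List<ChatMessage> messages = new ArrayList<>();
    private final List<Subscription> subscriptions = new ArrayList<>();

    // A registered subscription: the query it belongs to and where to push updates.
    private static final class Subscription {
        final RoomMessagesQuery query;
        final Consumer<ChatMessage> onUpdate;
        Subscription(RoomMessagesQuery query, Consumer<ChatMessage> onUpdate) {
            this.query = query;
            this.onUpdate = onUpdate;
        }
    }

    // Returns the initial result and registers the listener for later updates.
    List<ChatMessage> subscribe(RoomMessagesQuery query, Consumer<ChatMessage> onUpdate) {
        subscriptions.add(new Subscription(query, onUpdate));
        List<ChatMessage> initial = new ArrayList<>();
        for (ChatMessage message : messages) {
            if (message.roomId.equals(query.roomId)) {
                initial.add(message);
            }
        }
        return initial;
    }

    // "Event handler" for a posted message: store it, then push it
    // only to subscriptions whose query matches the room (the filter).
    void onMessagePosted(ChatMessage message) {
        messages.add(message);
        for (Subscription subscription : subscriptions) {
            if (subscription.query.roomId.equals(message.roomId)) {
                subscription.onUpdate.accept(message);
            }
        }
    }
}
```

A subscriber for room r1 receives the messages already stored for r1 as the initial result, then only new r1 messages through the callback; messages for other rooms are filtered out.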

Some general points.

Axon Framework is a very interesting way to get introduced to CQRS and ES.

It’s well put together and easy to get started with.

The provided dashboard is very handy.

Here is an example (GitHub) of how one can use Axon framework but without an Axon server and without event sourcing.
