A Contract Pattern for Schemaless DataStores

At Jet we do a lot of event sourcing. Our event types are typically modelled using discriminated unions:

type CartEvent =
    | CartCreated
    | ItemAdded of ItemAdded
    | ItemRemoved of ItemRemoved
    | CartCheckedOut of CartCheckedOut

and ItemAdded = { skuId : string ; quantity : int }
and ItemRemoved = { skuId : string }
and CartCheckedOut = { paymentDetails : string }

Since DUs carry no canonical representation in most popular serialization formats, encoding and decoding typically involves a bit of manual work:

let encode = function
    | CartCreated -> "cartCreated", "null"
    | ItemAdded e -> "itemAdded", serialize e
    | ItemRemoved e -> "itemRemoved", serialize e
    | CartCheckedOut e -> "cartCheckedOut", serialize e

let decode = function
    | "cartCreated", _ -> CartCreated
    | "itemAdded", json -> ItemAdded(deserialize json)
    | "itemRemoved", json -> ItemAdded(deserialize json)
    | "cartCheckedout", json -> CartCheckedOut(deserialize json)
    | t,_ -> failwithf "unrecognized event type '%s'" t

where the functions serialize : 'T -> string and deserialize : string -> 'T stand for some serialization library such as Json.NET. Together, encode and decode map domain events to and from a tupled representation: the first element is a string identifier for the event type, and the second contains the pickled payload data, if any. Sending encoded events over the wire is then a relatively straightforward undertaking.

While this approach works fine with small examples, the boilerplate it introduces tends to become unwieldy as domain complexity grows. The keen-eyed reader might have noticed that I’ve inserted a couple of bugs in my decode function.

While one could argue that techniques such as property-based testing might be sufficient to catch this class of bugs, the issue remains that changing event types generally carries a high maintenance cost. What if there existed a way to represent union types canonically, in a manner that is properly generic and agnostic of the serialization format?
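To make the testing argument concrete, here is a minimal round-trip check: every sample event is encoded, decoded again, and must come back unchanged. It is a sketch under a few assumptions: System.Text.Json stands in for Json.NET so that the script needs no packages, the sample values are made up, and decode has the two bugs fixed (itemRemoved now maps to ItemRemoved, and the cartCheckedOut tag matches the one produced by encode).

```fsharp
open System.Text.Json

type CartEvent =
    | CartCreated
    | ItemAdded of ItemAdded
    | ItemRemoved of ItemRemoved
    | CartCheckedOut of CartCheckedOut

and ItemAdded = { skuId : string ; quantity : int }
and ItemRemoved = { skuId : string }
and CartCheckedOut = { paymentDetails : string }

// Stand-ins for the serializer: System.Text.Json instead of Json.NET (assumption).
let serialize (value : 'T) : string = JsonSerializer.Serialize(value)
let deserialize (json : string) : 'T = JsonSerializer.Deserialize<'T>(json)

let encode = function
    | CartCreated -> "cartCreated", "null"
    | ItemAdded e -> "itemAdded", serialize e
    | ItemRemoved e -> "itemRemoved", serialize e
    | CartCheckedOut e -> "cartCheckedOut", serialize e

// Corrected decode: every tag matches encode exactly and maps back to its own case.
let decode = function
    | "cartCreated", _ -> CartCreated
    | "itemAdded", json -> ItemAdded(deserialize json)
    | "itemRemoved", json -> ItemRemoved(deserialize json)
    | "cartCheckedOut", json -> CartCheckedOut(deserialize json)
    | t, _ -> failwithf "unrecognized event type '%s'" t

// Made-up sample events; a property-based test would generate these instead.
let samples =
    [ CartCreated
      ItemAdded { skuId = "a" ; quantity = 2 }
      ItemRemoved { skuId = "a" }
      CartCheckedOut { paymentDetails = "card" } ]

// Round-trip check: fails on the first event that does not survive encode >> decode.
for e in samples do
    let roundTripped = decode (encode e)
    if roundTripped <> e then failwithf "round trip failed: %A became %A" e roundTripped
```

Run against the original decode, the check fails on the itemRemoved sample and, once that is fixed, on the cartCheckedOut one. It only catches mistakes that the samples happen to exercise, and it does nothing to reduce the cost of keeping encode and decode in sync by hand.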

The UnionContract Module

The TypeShape library that I maintain ships with a small module called UnionContract. Its surface API is tiny, exposing only a single function and a couple of interfaces. At its core it defines a datatype generic program that accepts a serializer, represented as an instance of the interface:

type IEncoder<'Format> =
    abstract Encode<'T> : value:'T -> 'Format
    abstract Decode<'T> : fmt:'Format -> 'T
    abstract Empty : 'Format

and generates functions capable of encoding and decoding union values in the sense described previously. The encoders above can equivalently be expressed as follows:

open System.Runtime.Serialization
open TypeShape.UnionContract
open Newtonsoft.Json

type CartEvent =
    | [<DataMember(Name = "cartCreated")>] CartCreated
    | [<DataMember(Name = "itemAdded")>] ItemAdded of ItemAdded
    | [<DataMember(Name = "itemRemoved")>] ItemRemoved of ItemRemoved
    | [<DataMember(Name = "cartCheckedOut")>] CartCheckedOut of CartCheckedOut
with
    interface IUnionContract // marker interface

// object -> JSON string encoder used for union case payloads
let jsonEncoder =
    { new IEncoder<string> with
        member __.Empty = "null" // payload to be inserted in nullary cases
        member __.Encode(t : 'T) = JsonConvert.SerializeObject(t)
        member __.Decode(json : string) = JsonConvert.DeserializeObject<'T>(json) }

// creates a union contract encoder
let unionEncoder = UnionContractEncoder.Create<CartEvent, string>(jsonEncoder)

The UnionContractEncoder.Create method accepts a datatype generic encoder that pickles union case payloads into a fixed format type (in this case string), and generates an encoder that acts on the union level.

We can use the newly created encoder to encode and decode union instances:

unionEncoder.Encode(CartCreated)
// val it : EncodedUnion<string> = {CaseName = "cartCreated";
//                                  Payload = "null";}

unionEncoder.Encode(ItemAdded { skuId = "a" ; quantity = 2 })
// val it : EncodedUnion<string> = {CaseName = "itemAdded";
//                                  Payload = "{"skuId":"a","quantity":2}";}

Union instances can be reconstructed by applying the inverse function to the encoded unions:

unionEncoder.Decode { CaseName = "cartCreated" ; Payload = null }
// val it : CartEvent = CartCreated

So how does it all work? Under the hood, the implementation uses constructs from the TypeShape library to duplicate the logic of the original encode and decode functions in a datatype generic manner.
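The gist of "datatype generic" can be conveyed with the standard FSharp.Reflection API instead of TypeShape. The sketch below is not how UnionContract is implemented; it only illustrates the idea under simplifying assumptions: the payload encoder is a hypothetical non-generic IPayloadEncoder that is handed the payload's System.Type, every case carries at most one field, and the Encoded record merely mirrors the CaseName/Payload shape of EncodedUnion.

```fsharp
open System
open System.Collections.Generic
open System.Runtime.Serialization
open Microsoft.FSharp.Reflection

// Hypothetical non-generic counterpart of IEncoder<'Format>: reflection only
// knows payload types at runtime, so they are passed in as System.Type.
type IPayloadEncoder<'Format> =
    abstract Encode : Type * obj -> 'Format
    abstract Decode : Type * 'Format -> obj
    abstract Empty : 'Format

// Mirrors the CaseName/Payload shape of EncodedUnion.
type Encoded<'Format> = { CaseName : string ; Payload : 'Format }

// Contract name of a case: its DataMember name if present, else the case name.
let contractName (unionCase : UnionCaseInfo) =
    unionCase.GetCustomAttributes(typeof<DataMemberAttribute>)
    |> Array.tryPick (function
        | :? DataMemberAttribute as dm when not (isNull dm.Name) -> Some dm.Name
        | _ -> None)
    |> Option.defaultValue unionCase.Name

/// Builds an encode/decode pair for union 'U, assuming each case has zero or one field.
let createUnionEncoder<'U, 'Format> (enc : IPayloadEncoder<'Format>) =
    let cases = FSharpType.GetUnionCases(typeof<'U>)
    let byName = Dictionary<string, UnionCaseInfo>()
    for unionCase in cases do
        byName.Add(contractName unionCase, unionCase) // throws on duplicate contract names

    let encode (value : 'U) : Encoded<'Format> =
        let unionCase, fields = FSharpValue.GetUnionFields(box value, typeof<'U>)
        match fields with
        | [||] -> { CaseName = contractName unionCase ; Payload = enc.Empty }
        | [| payload |] ->
            let payloadType = unionCase.GetFields().[0].PropertyType
            { CaseName = contractName unionCase ; Payload = enc.Encode(payloadType, payload) }
        | _ -> failwithf "case '%s' has more than one field" unionCase.Name

    let decode (encoded : Encoded<'Format>) : 'U =
        match byName.TryGetValue(encoded.CaseName) with
        | true, unionCase ->
            let args =
                match unionCase.GetFields() with
                | [||] -> [||]
                | [| field |] -> [| enc.Decode(field.PropertyType, encoded.Payload) |]
                | _ -> failwithf "case '%s' has more than one field" unionCase.Name
            unbox<'U> (FSharpValue.MakeUnion(unionCase, args))
        | _ -> failwithf "unrecognized case name '%s'" encoded.CaseName

    encode, decode

// Usage with a hypothetical pass-through encoder that keeps payloads boxed in memory.
type ItemAdded = { skuId : string ; quantity : int }

type CartEvent =
    | [<DataMember(Name = "cartCreated")>] CartCreated
    | [<DataMember(Name = "itemAdded")>] ItemAdded of ItemAdded

let boxed =
    { new IPayloadEncoder<obj> with
        member __.Encode(_, value) = value
        member __.Decode(_, payload) = payload
        member __.Empty = null }

let encode, decode = createUnionEncoder<CartEvent, obj> boxed
```

Swapping boxed for an encoder that produces strings or JToken values would change the storage format without touching createUnionEncoder. In this sketch, duplicate contract names fail when the encoder is created rather than at decode time; the real module may make different choices.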

Plugging Serialization Formats

Serialization formats can be controlled by plugging different IEncoder implementations. For instance:

open Newtonsoft.Json.Linq

let jtokenEncoder =
    { new IEncoder<JToken> with
        member __.Empty = JValue.CreateNull() :> _
        member __.Encode(t : 'T) = JToken.FromObject t
        member __.Decode(t : JToken) = t.ToObject<'T>() }        

let jtokenUnionEncoder = UnionContractEncoder.Create<CartEvent,JToken>(jtokenEncoder)

produces an encoder that pickles event payloads as Newtonsoft JToken values.
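Any implementation of the interface will do, including one that performs no serialization at all. As a minimal sketch, the hypothetical boxedEncoder below keeps payloads as boxed .NET objects, which could be useful in unit tests that should not depend on a wire format. The IEncoder declaration is repeated from above so that the snippet stands alone.

```fsharp
// Re-declared from the post so that the snippet is self-contained.
type IEncoder<'Format> =
    abstract Encode<'T> : value:'T -> 'Format
    abstract Decode<'T> : fmt:'Format -> 'T
    abstract Empty : 'Format

// Hypothetical in-memory "format": payloads stay as boxed objects.
let boxedEncoder =
    { new IEncoder<obj> with
        member __.Empty = null // payload used for nullary cases
        member __.Encode(t : 'T) = box t
        member __.Decode(o : obj) = unbox<'T> o } // throws InvalidCastException on a type mismatch
```

Passing it to UnionContractEncoder.Create<CartEvent, obj> would presumably yield an in-memory union encoder, though this sketch does not exercise the TypeShape library itself.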

Versioning Events

In the original example, suppose we want to alter the ItemRemoved event so that it additionally includes a quantityRemoved field like so:

type ItemRemoved = { skuId : string ; quantityRemoved : int}

Assuming that events using the original shape have already been persisted to the database, it is evident that this introduces a breaking change to the schematisation. Many databases typically used for event sourcing are of the “schemaless” variety; however, as the author of Designing Data-Intensive Applications puts it:

Document databases are sometimes called schemaless, but that’s misleading, as the code that reads the data usually assumes some kind of structure—i.e., there is an implicit schema, but it is not enforced by the database. A more accurate term is schema-on-read (the structure of the data is implicit, and only interpreted when the data is read), in contrast with schema-on-write (the traditional approach of relational databases, where the schema is explicit and the database ensures all written data conforms to it).

-Chapter 2, Data Models and Query Languages
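The implicit schema is easy to demonstrate. In the following sketch, a payload stored with the original ItemRemoved shape is read back with the new one. System.Text.Json stands in for Json.NET and the stored payload is made up; with System.Text.Json's default settings the missing quantityRemoved field is filled with 0 rather than rejected, so the breaking change surfaces as silently wrong data instead of an error.

```fsharp
open System.Text.Json

// The new shape of the payload, after quantityRemoved was added.
type ItemRemoved = { skuId : string ; quantityRemoved : int }

// A payload persisted before the change (made-up sample): it has no quantityRemoved field.
let storedPayload = """{"skuId":"a"}"""

// Schema-on-read: the reading code's type decides how the stored JSON is interpreted.
let readBack = JsonSerializer.Deserialize<ItemRemoved>(storedPayload)

printfn "%A" readBack
```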

So how do we ensure that union contracts enforce a version-tolerant schema-on-read? I propose an approach adhering to the following set of principles:

  1. Separate domain types from DTOs.
  2. Model DTOs using the UnionContract construct.
  3. Make DTO union contracts append-only.
  4. Freely evolve domain types as schema changes.
  5. Push versioning and migration responsibility to the DTO layer.

Applying this to the above example would result in the following arrangement for domain events:

namespace Domain

type CartEvent =
    | CartCreated
    | ItemAdded of ItemAdded
    | ItemRemoved of ItemRemoved
    | CartCheckedOut of CartCheckedOut

and ItemRemoved = { skuId : string ; quantityRemoved : int}
...

whereas the contract type would look as follows:

namespace Contracts

type CartEventDto =
    | [<DataMember(Name = "cartCreated")>] CartCreated
    | [<DataMember(Name = "itemAdded")>] ItemAdded of ItemAdded
    | [<DataMember(Name = "itemRemoved")>] ItemRemovedV1 of ItemRemovedV1
    | [<DataMember(Name = "itemRemoved/v2")>] ItemRemovedV2 of ItemRemovedV2
    | [<DataMember(Name = "cartCheckedOut")>] CartCheckedOut of CartCheckedOut
with
    interface IUnionContract

and ItemRemovedV1 = { skuId : string }
and ItemRemovedV2 = { skuId : string ; quantityRemoved : int}
...

Notice how the two versions of ItemRemoved are defined as separate, explicitly qualified DTO union cases. On the other hand, the Domain event type only contains a single, canonicalized ItemRemoved case.

We need a fromDomain function to map domain events to DTOs:

let fromDomain = function
    | CartEvent.CartCreated -> CartEventDto.CartCreated
    | CartEvent.ItemAdded e -> CartEventDto.ItemAdded { skuId = e.skuId ; quantity = e.quantity }
    | CartEvent.ItemRemoved e -> CartEventDto.ItemRemovedV2 { skuId = e.skuId ; quantityRemoved = e.quantityRemoved }
    | CartEvent.CartCheckedOut e -> CartEventDto.CartCheckedOut { paymentDetails = e.paymentDetails }

The opposite toDomain function is more interesting, as it contains migration logic:

let toDomain = function
    | CartEventDto.CartCreated -> CartEvent.CartCreated
    | CartEventDto.ItemAdded e -> CartEvent.ItemAdded { skuId = e.skuId ; quantity = e.quantity }
    | CartEventDto.ItemRemovedV1 e -> CartEvent.ItemRemoved { skuId = e.skuId ; quantityRemoved = 1 }
    | CartEventDto.ItemRemovedV2 e -> CartEvent.ItemRemoved { skuId = e.skuId ; quantityRemoved = e.quantityRemoved }
    | CartEventDto.CartCheckedOut e -> CartEvent.CartCheckedOut { paymentDetails = e.paymentDetails }

In this function, we choose to interpret “v1” ItemRemoved events as having a removed quantity of 1. By using the two functions in conjunction with the union encoder, we obtain serialization/schema-on-read for the domain types as follows:

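let unionEncoder = UnionContractEncoder.Create<CartEventDto, string>(jsonEncoder) // encoder for the DTO contract
let serialize (e : CartEvent) = fromDomain e |> unionEncoder.Encode
let deserialize (c : EncodedUnion<string>) = unionEncoder.Decode c |> toDomain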

A nice property of this approach is that we are forcing any versioning and migration concerns outside of the domain, making business logic cleaner to work with. We have been using this pattern with success in a few of our systems.
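Since fromDomain and toDomain are pure functions, the migration can be tested without any serializer. The following is a cut-down, self-contained sketch containing only the CartCreated and ItemRemoved cases; the Domain and Contracts namespaces become modules so that everything fits in one script, and the DataMember attributes are omitted because nothing is encoded here.

```fsharp
module Domain =
    type ItemRemoved = { skuId : string ; quantityRemoved : int }

    type CartEvent =
        | CartCreated
        | ItemRemoved of ItemRemoved

module Contracts =
    type ItemRemovedV1 = { skuId : string }
    type ItemRemovedV2 = { skuId : string ; quantityRemoved : int }

    // Append-only: existing cases are never removed or changed.
    type CartEventDto =
        | CartCreated
        | ItemRemovedV1 of ItemRemovedV1
        | ItemRemovedV2 of ItemRemovedV2

open Domain
open Contracts

// Writes always produce the latest contract version.
let fromDomain = function
    | CartEvent.CartCreated -> CartEventDto.CartCreated
    | CartEvent.ItemRemoved e -> CartEventDto.ItemRemovedV2 { skuId = e.skuId ; quantityRemoved = e.quantityRemoved }

// Reads accept every version ever written and upgrade the old ones.
let toDomain = function
    | CartEventDto.CartCreated -> CartEvent.CartCreated
    | CartEventDto.ItemRemovedV1 e -> CartEvent.ItemRemoved { skuId = e.skuId ; quantityRemoved = 1 }
    | CartEventDto.ItemRemovedV2 e -> CartEvent.ItemRemoved { skuId = e.skuId ; quantityRemoved = e.quantityRemoved }

// A stored v1 event is upgraded using the chosen default quantity of 1.
let upgraded = toDomain (CartEventDto.ItemRemovedV1 { skuId = "a" })
```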

Versioning Snapshots

This pattern is not limited to event serialization. We can also use it for versioning snapshot schemata:

type SnapshotV1 = { ... }
type SnapshotV2 = { ... }
type SnapshotV3 = { ... }

type SnapshotDto =
    | [<DataMember(Name = "snapshot/v1")>] SnapshotV1 of SnapshotV1
    | [<DataMember(Name = "snapshot/v2")>] SnapshotV2 of SnapshotV2
    | [<DataMember(Name = "snapshot/v3")>] SnapshotV3 of SnapshotV3 // current
with
    interface IUnionContract

let unionEncoder = UnionContractEncoder.Create<SnapshotDto,_>(jtokenEncoder)

We can then define our conversion functions, with a slight variation:

let fromDomain snapshot = SnapshotV3 snapshot

let toDomain = function
    | SnapshotV1 snap -> migrateFromV1 snap
    | SnapshotV2 snap -> migrateFromV2 snap
    | SnapshotV3 snap -> snap // current snapshot, no migration needed

Then, as before, we obtain the serialization/migration functions by composition:

let serialize (e : SnapshotV3) = fromDomain e |> unionEncoder.Encode
let deserialize (c : EncodedUnion<JToken>) = unionEncoder.Decode c |> toDomain

Again, we have pushed all versioning and migration concerns to the DTO layer of our codebase.
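The functions migrateFromV1 and migrateFromV2 are left unspecified above. One possible way to write them, sketched below with invented snapshot fields since the real SnapshotV1 to SnapshotV3 shapes are elided, is as a chain of single-step upgrades: each historical version gets exactly one conversion to its successor, and older versions reach the current shape by composition. The upgradeV1 function is a hypothetical name, and the DataMember attributes are omitted since nothing is encoded here.

```fsharp
// Hypothetical snapshot shapes; only the versioning pattern matters here.
type SnapshotV1 = { skus : string list }                        // one entry per unit
type SnapshotV2 = { items : (string * int) list }               // (sku, quantity) pairs
type SnapshotV3 = { items : Map<string, int> ; checkedOut : bool }

type SnapshotDto =
    | SnapshotV1 of SnapshotV1
    | SnapshotV2 of SnapshotV2
    | SnapshotV3 of SnapshotV3 // current

// Single-step upgrades: each one only knows about its immediate successor.
let upgradeV1 (s : SnapshotV1) : SnapshotV2 =
    { items = s.skus |> List.countBy id }

let migrateFromV2 (s : SnapshotV2) : SnapshotV3 =
    { items = Map.ofList s.items ; checkedOut = false }

// Older versions reuse the newer upgrade steps by composition.
let migrateFromV1 : SnapshotV1 -> SnapshotV3 = upgradeV1 >> migrateFromV2

let toDomain = function
    | SnapshotV1 snap -> migrateFromV1 snap
    | SnapshotV2 snap -> migrateFromV2 snap
    | SnapshotV3 snap -> snap // current snapshot, no migration needed
```

With this layout, introducing a SnapshotV4 means writing one upgrade from SnapshotV3 and extending the compositions, rather than rewriting every existing migration.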

Conclusions

I have attempted to illustrate the utility of using F# Discriminated Unions as a means to encode code-first schematisations for “schemaless” (schema-on-read) datastores. The use of the TypeShape UnionContract module in conjunction with separating domain types from DTOs provides us with an effective pattern to isolate versioning and data migration concerns from core domain logic. The versioning logic itself is pure and as such highly amenable to testing. We have used this pattern effectively in multiple production systems across at least two different teams within Jet.
