The meaningfulness of Events via standardization ( Part 2 )

In this part of the Event standardization series, I follow on from Part 1 ( here ) and show how we can better use standards to guarantee the promises we established.

In Part 1 we talked about:

  • Event Envelope

  • Event Metadata

  • Event Data

  • Promises

These are the basic building blocks for better enterprise-level governance and inter-component communication.

But asking why and how we can use them to communicate better and govern the software better brings us to this part ( Part 2 ).

Documentation:

The documentation for an event-driven design can be produced with one or a mix of the following tools:

  • AsyncApi: documents the streaming and messaging contracts: channels, protocols, etc.

  • Event Catalog: visualizes the event flow and communication between components in one or more bounded contexts.

  • Markdown Docs: simple, text-based docs that provide a good starting point when discovering new services.

  • Mermaid: a useful tool for diagramming any kind of component relationship or flow. A simple syntax lets you draw collaboration, sequence, or flow diagrams.

AsyncApi:

An AsyncApi document is a JSON / YAML standard for describing APIs, similar to OpenAPI but focused on asynchronous communication. An AsyncApi document has the following parts:

  • Info

  • Servers

  • Channels

  • Components

Example:

---
asyncapi: 2.6.0
info:
  version: 1.0.0
  title: Event
servers:
  eventTopic:
    url: source-event-sns-topic
    protocol: HTTP
channels:
  notificationEvents:
    servers:
      - eventTopic
    subscribe:
      summary: Subscribe to receive notification events.
      message:
        $ref: '#/components/messages/NotificationMessage'
components:
  messages:
    NotificationMessage:
      name: notification event
      payload:
        $ref: '#/components/schemas/Event'
  schemas:
    Event:
      type: object
      required:
      - specVersion
      - source
      - type
      - time
      - idempotencyKey
      - id
      properties:
        specVersion:
          type: string
        source:
          type: string
        time:
          type: string
          format: date-time
        type:
          type: string
          enum:
          - order.created
          - order.updated
          - order.rejected
        category:
          type: string
          enum:
          - INTEGRATION
          - NOTIFICATION
          - DELTA
          - CARRIED-STATE
        idempotencyKey:
          type: string
        contentType:
          type: string
          enum:
          - application/json
          - application/*+avro
        data:
          type: object
          properties:
            orderId:
              type: string
            state:
              type: string
              enum:
              - OrderCreated
              - OrderCanceled
              - OrderRefused
              - OrderPaymentRejected
              - OrderConfirmed

You can use AsyncAPI Studio or the VS Code extension to render your definition and visually validate the contract as you write it.

Event Catalog:

The Event Catalog is a great tool for visualizing communication across multiple contexts. You can define the following parts just by creating the folder structure:

  • Domains

  • Services

  • Events

This is a sample from the article code: we define the different domains and the related services and events inside each one. Each domain, event, or service is represented by an index.md file.

An example of a domain:

---
name: Order Management
summary: |
  Domain for Order Management
owners:
    - omid.eidivandi

---
<Admonition>Domain for everything to do with Shopping at our business. Before adding any events or services to this domain make sure you contact the domain owners and verify it's the correct place.</Admonition>
### Details
This domain encapsulates everything in our business that has to do with shopping and users. This might be new items added to our online shop or online cart management.

<NodeGraph title="Domain Graph" />

The result can be generated and presented with a simple configuration file and the Event Catalog command line interface ( find more here )

Find out more by looking at the online demo: https://app.eventcatalog.dev/events/

Mermaid:

Mermaid is a great tool for basic diagrams as code at any project level, and its advantage is that it integrates easily into most tools.

sequenceDiagram
    Order->>+Payment: NewOrderCreated!
    Payment->>+Order: PaymentConfirmed!

The documentation represents a contract when designing distributed systems, and event-driven design is no exception. But this kind of documentation only accelerates the development phase and eases evolution across the SDLC. What about run-time needs? What about promise protection?

To cover the run-time needs, contract validation is the first step.

Validation:

Validation helps to confirm that an incoming piece of data in transit respects the promises established by a given contract.

For a successful validation process, one way of guaranteeing the promise is versioning, which is why we defined different versions for the different parts of an event.

Reminder: in part one we introduced the following version items

  • SpecVersion: The Spec version represents the event standard at the enterprise level, this is related to the format of top-level elements of an event.

  • MetadataVersion: The Metadata version represents the version of the Metadata item and its content at the service level for All types of events.

  • DataVersion: The data version has a similar lifecycle as the metadata but its contract and promises are harder to define, this is mostly related to Domain, Aggregate, View and Delta Events.
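One way to put these versions to work is to keep one contract definition per spec version and pick the file from the event itself. The sketch below assumes a folder-per-major-version layout ( v1, v2, and so on ); the definitionPath helper and that layout are illustrative assumptions, not something Part 1 prescribes.

```javascript
const path = require('path');

// Hypothetical convention: specVersion "1" or "1.2.0" lives in folder "v1".
function definitionPath(basePath, specVersion, file = 'asyncapi.yaml') {
  const major = String(specVersion).split('.')[0];
  if (!/^\d+$/.test(major)) {
    throw new Error(`Unsupported specVersion: ${specVersion}`);
  }
  // posix.join keeps forward slashes regardless of the host OS.
  return path.posix.join(basePath, `v${major}`, file);
}

console.log(definitionPath('shared/streams/order', '1'));
```

Rejecting an unknown version early keeps a malformed specVersion from silently falling back to the wrong contract.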

Who is responsible for event validation? The producer must guarantee and respect the promised contract, but validation should preferably be done on both the producer side and the consumer side, particularly when using the Event-Carried State Transfer pattern or the data Aggregation pattern.

Who must know the validation details? The validation details are best mastered by the producer, as the producer knows the state changes in the system, and the proper validation for each state change, better than anyone.

The documentation must help us establish a validation process; in other words, the validation must be based on the documentation.
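The split of responsibilities above can be sketched as a thin wrapper that runs the same validator before the producer publishes and before the consumer handles an event. Everything here ( withValidation, the stand-in validate function, the in-memory sent array ) is a hypothetical illustration, not part of any library.

```javascript
// Hypothetical helper: run the shared validator before handing the event on.
// `validate` is expected to throw (or reject) when the contract is broken.
function withValidation(validate, next) {
  return async (event) => {
    await validate(event);
    return next(event);
  };
}

// Stand-in validator for the sketch; a real one would be derived from the contract.
const validate = (event) => {
  if (!event.idempotencyKey) throw new Error('idempotencyKey is required');
};

const sent = []; // stands in for the real transport
const publish = withValidation(validate, (event) => sent.push(event)); // producer side
const consume = withValidation(validate, (event) => `handled ${event.id}`); // consumer side

publish({ id: '1', idempotencyKey: 'k' }).then(() => console.log(sent.length));
```

Because both sides share one validator, the consumer's double-check costs no extra design effort; it only reuses what the producer already ships.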

Validate definition:

The first step in the validation process is to validate the AsyncApi contract definition itself. Here we use the AsyncAPI command line interface to validate the definition.

$ npm i -g @asyncapi/cli
$ asyncapi validate ./docs/streams/asyncapi.yaml

Validate Event Spec:

Until now our main goal was to introduce a contract and validate it as a well-defined document. Now we will use that contract ( the promises ) to validate our events at run time.

In the validation section, we began with the question of who is responsible for event validation, and the response was:

The producer must guarantee and respect the promised contract, but validation should preferably be done on both the producer side and the consumer side, particularly when using the Event-Carried State Transfer pattern or the data Aggregation pattern.

So here we are going to prepare and share an event validation process and let the consumers double-check the event as well.

To validate the event against the definition, we first need to access those contract definitions.

const { Parser, fromFile } = require('@asyncapi/parser');
const validator = require('./ajv-validator');

class SchemaValidator {
    schema; 
    asyncapiParser;
    defaultFile = 'asyncapi.yaml';
    defaultversion = 'v1';
    ajvValidator = validator;
    constructor(version, path, file) {
        this.file = file ?? this.defaultFile ;
        this.version = version ?? this.defaultversion;
        this.path = path;
        this.asyncapiParser = new Parser();
    }

    getParser = async (path) => {
        return await fromFile(this.asyncapiParser, path).parse();
    }       

    initSchema = async (specVersion) => {
        const ver = specVersion ?? this.version;
        if( !this.schema ) {
            const data = await this.getParser(`${this.path}/${ver}/${this.file}`);
            this.schema = data.document.json();
        }
    }
}

module.exports = {
    SchemaValidator
}

In the above snippet, we extract the definition schema as a JSON object; this object represents the AsyncApi definition above.

Now we need an example of event validation based on the schema. Following the Serverless Advocate article ( here ), ajv seems a nice start. There are equivalents, but for simplicity we continue with ajv.

The Ajv validator used by SchemaValidator

const Ajv = require('ajv');
const addFormats = require('ajv-formats');

const ajvOptions = {  allErrors: true };

function validate(
  obj,
  schema,
  key,
  ref
) {
  const ajv = new Ajv(ajvOptions);
  addFormats(ajv);

  ajv.addVocabulary([
    'asyncapi',
    'info',
    'servers',
    'channels',
    'components',
    'x-parser-api-version',
    'x-parser-spec-parsed',
    'x-parser-schema-id']);

  ajv.addSchema(schema, key);
  const valid = ajv.validate({ $ref: ref }, obj);
  if (!valid) {
    const errorMessage = JSON.stringify(ajv.errors);
    throw new Error(errorMessage);
  }
}

module.exports = { validate };

Now we need the complete SchemaValidator, shown below. It is the event validator that uses the Ajv validator above together with @asyncapi/parser to parse the definition.

const { Parser, fromFile } = require('@asyncapi/parser');
const validator = require('./ajv-validator');

class SchemaValidator {
    schema; 
    asyncapiParser;
    defaultFile = 'asyncapi.yaml';
    defaultversion = 'v1';
    ajvValidator = validator;
    constructor(version, path, file) {
        this.file = file ?? this.defaultFile ;
        this.version = version ?? this.defaultversion;
        this.path = path;
        this.asyncapiParser = new Parser();
    }

    getParser = async (path) => {
        return await fromFile(this.asyncapiParser, path).parse();
    }       

    initSchema = async (specVersion) => {
        const ver = specVersion ?? this.version;
        if( !this.schema ) {
            const data = await this.getParser(`${this.path}/${ver}/${this.file}`);
            this.schema = data.document.json();
        }
    }

    validate = async (event, schemaPath) => {
        await this.initSchema(this.version);
        return this.ajvValidator.validate(event, this.schema, schemaPath.split('#')[0], schemaPath);
    }
}

module.exports = {
    SchemaValidator
}

The code adds a simple validate method that validates the received event against the Event schema.
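The schemaPath argument packs two things into one string: the key the definition is registered under ( before the # ) and a JSON Pointer into it ( after the # ). The sketch below splits such a path and resolves the pointer by hand, which is a cheap way to check that the referenced schema exists before asking Ajv to validate. The helper names and the trimmed definition object are assumptions for illustration.

```javascript
// Split "Event#/components/schemas/Event" into registration key and JSON Pointer.
function splitSchemaPath(schemaPath) {
  const hash = schemaPath.indexOf('#');
  if (hash === -1) throw new Error(`Missing "#" in schema path: ${schemaPath}`);
  return { key: schemaPath.slice(0, hash), pointer: schemaPath.slice(hash + 1) };
}

// Resolve a JSON Pointer (RFC 6901) against a plain object; undefined when absent.
function resolvePointer(doc, pointer) {
  if (pointer === '') return doc;
  return pointer
    .split('/')
    .slice(1)
    .map((token) => token.replace(/~1/g, '/').replace(/~0/g, '~'))
    .reduce(
      (node, token) => (node !== null && typeof node === 'object' ? node[token] : undefined),
      doc
    );
}

// Trimmed stand-in for the parsed AsyncApi definition.
const definition = { components: { schemas: { Event: { type: 'object' } } } };

const { key, pointer } = splitSchemaPath('Event#/components/schemas/Event');
console.log(key, resolvePointer(definition, pointer));
// undefined: the definition has no schema with that name
console.log(resolvePointer(definition, '/components/schemas/IntegrationEvent'));
```

A path that resolves to undefined means the event names a schema the contract does not define, which is worth reporting separately from an ordinary validation failure.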

Let’s Use it

Here the provided event is validated against the definition via the validate method.

const { SchemaValidator } = require('../../../asyncapi/schema-validator');
const { Version1  } = require('../versions');

const validator = new SchemaValidator(Version1, './shared/streams/order', 'asyncapi.yaml');
const body = {
    specVersion: "1",
    id: "jkhqskdjhqskdhqskdh",
    idempotencyKey: 'AAAAAAAAAAAAAAAAAAAAAAAA',
    source: 'ordermanagement:order',
    time: "2023-01-01T12:54:00.000Z",
    type: 'order.created',
    category: 'INTEGRATION',
    datacontentType: 'application/json',
    dataVersion: "1.0.0",
    dataSchema: 'IntegrationEvent#/components/schemas/IntegrationEvent',
    data:{ 
    }
};

// The class exposes validate(event, schemaPath); the schema path travels in dataSchema.
Promise.resolve(validator.validate(
      body,
      body.dataSchema
    ))
    .then(() => 
      console.log("Event respects the enterprise specification")
    ).catch(
        (error) => { 
            const msg = { Errors: JSON.parse(error.message) };
            console.log(msg)
        }
    );

The validator throws an exception carrying the error details if validation fails, and resolves normally if it passes.
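Because the validator serialises Ajv's error list into the exception message, callers have to parse it back to report something readable. A minimal sketch of that step follows; the sample error is hand-written in the shape Ajv v8 uses for its error objects ( instancePath, keyword, params, message ), and the describeErrors helper is hypothetical.

```javascript
// Turn the JSON-encoded error list back into one readable line per violation.
function describeErrors(error) {
  let details;
  try {
    details = JSON.parse(error.message);
  } catch {
    // Not a validation error produced by the validator; report it as-is.
    return [error.message];
  }
  if (!Array.isArray(details)) return [error.message];
  return details.map((d) => `${d.instancePath || '(root)'} ${d.message}`);
}

// Hand-written sample, not captured output.
const sample = new Error(JSON.stringify([
  {
    instancePath: '/data',
    keyword: 'required',
    params: { missingProperty: 'state' },
    message: "must have required property 'state'",
  },
]));

console.log(describeErrors(sample));
console.log(describeErrors(new TypeError('schemaPath is undefined')));
```

Falling back to the raw message matters because the catch handler also receives non-validation errors, whose message is not JSON.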

Validate Data:

To validate the metadata, we need to separate the metadata schema from the event spec schema by changing the AsyncApi definition.

Let’s enrich the definition. First, let’s remove the metadata details from the Event schema:

schemas:
    Event:
      type: object
      required:
      - specVersion
      - scope
      - eventType
      - eventName
      - idempotencyKey
      - contentType
      - metadata
      properties:
        specVersion:
          type: string
        scope:
          type: string
        eventType:
          type: string
          enum:
          - INSERT
          - UPDATE
          - DELETE
        eventName:
          type: string
          enum:
          - INTEGRATION
          - DOMAIN
          - NOTIFICATION
          - DELTA
          - CARRIED-STATE
        idempotencyKey:
          type: string
        contentType:
          type: string
          enum:
          - JSON
          - AVRO
          - PROTOBUF
        ref:
          type: string
        metadata:
          type: object

Now we add the OrderEventMetadata and OrderCreatedNotification schemas below:

OrderEventMetadata:
  type: object
  properties:
    metadata:
      type: object
      required:
        - orderId
        - state
      properties:
        orderId:
          type: string
        state:
          type: string
          enum:
          - OrderCreated
          - OrderCanceled
          - OrderRefused
          - OrderPaymentRejected
          - OrderConfirmed
OrderCreatedNotification:
  allOf:
    - $ref: '#/components/schemas/Event'
    - $ref: '#/components/schemas/OrderEventMetadata'

Now let’s validate an event against the OrderCreatedNotification

const { SchemaValidator } = require('../../../asyncapi/schema-validator');
const { Version1  } = require('../versions');

const validator = new SchemaValidator(Version1, './shared/streams/order', 'asyncapi.yaml');

const bodyWithMetadata = {
    specVersion: "1",
    id: "jkhqskdjhqskdhqskdh",
    idempotencyKey: 'AAAAAAAAAAAAAAAAAAAAAAAA',
    source: 'ordermanagement:order',
    time: "2023-01-01T12:54:00.000Z",
    type: 'order.created',
    category: 'INTEGRATION',
    datacontentType: 'application/json',
    dataVersion: "1.0.0",
    // Registration key before the '#', JSON pointer to the schema after it
    dataSchema: 'OrderCreatedNotification#/components/schemas/OrderCreatedNotification',
    data:{ 
       orderId: 'lj0OVlHFdHz9hHfgXUSoW',
       state: 'OrderCreated'
    }
};


Promise.resolve(validator.validate(
      bodyWithMetadata,
      bodyWithMetadata.dataSchema
  ))
  .then(() => 
      console.log("Event respects the Order Notification Specifications")
  ).catch(
      (error) => { 
          const msg = { Errors: JSON.parse(error.message) };
          console.log(msg)
      }
  );

Here is the output

Let’s validate an invalid event

const body = {
    specVersion: "1",
    id: "jkhqskdjhqskdhqskdh",
    idempotencyKey: 'AAAAAAAAAAAAAAAAAAAAAAAA',
    source: 'ordermanagement:order',
    time: "2023-01-01T12:54:00.000Z",
    type: 'order.created',
    category: 'INTEGRATION',
    datacontentType: 'application/json',
    dataVersion: "1.0.0",
    dataSchema: 'OrderCreatedNotification#/components/schemas/OrderCreatedNotification',
    data:{ 
      orderId: 'lj0OVlHFdHz9hHfgXUSoW'
    }
};

And here is the output

Conclusion:

Distributed communication is an important part of distributed design, and in EDA we need to guarantee the promises. This validation is best done on both the producer and consumer sides, but keep in mind that double validation must not add overhead and effort for consumers. A better approach is for the producer to share the promise together with APIs in the programming languages used across the enterprise.

You can share the promises alongside:

  • AsyncApi Definition

  • Validators

  • Typed models ( like typescript )

  • schema path refs and automated discovery