KAI Server Runner SDK

Code your nodes on top of the KAI Server Runner SDK

The Runner SDK

KAI Server provides an SDK that allows users to receive, process and send messages between nodes. As well as store metrics, predictions or other kinds of data desired by the user.

Currently, KAI Server only supports Python and Golang code.

Thanks to the SDK, users can implement handler and init functions, needed for the correct functionality of nodes. These functions are implemented in different manners depending on the KRT version being used. Refer to KRT v1 and KRT v2 for more information.

KAI Server version, KRT version and runner version are dependent of each other but only compatible in certain manners.
Runner features and implementation differ between the KRT version you are aiming for.
Please refer to SDK KRT v1 and SDK KRT v2 for more information.

However, the context object is common to all versions and will be explained here.

Here is a table depicting compatibility between versions:

Compatibility table

KRT VersionKRE versionEntrypoint versionGolang runner versionPython runner version
V1v1 to v71.x.x and 2.x.x1.x.x and 2.x.x1.x.x and 2.x.x
V2v83.x.x3.x.x and 4.x.x3.x.x

It is recommended to use the latest available version when developing your codebase. You can find published versions at the Konstellation registry.

Context

The context object provides a set of utilities that can be used for different purposes on init and handler functions:

  • In-Memory Storage: A volatile variable storage that will be available from start until the version is stopped or restarted. You can set a value with a key name and retrieve it later by that key.

  • Logging: Functions to register any message or error with different log levels: debug, info, warning, error.

  • Data Persistence: A way to persist and/or retrieve data from a DB.

  • Measurements: An easy way to save any amount of arbitrary measurements you need to use later on an Influx graph.

  • Metrics: Allows you to save predicted and real data in order to feed the pre-generated charts on the “Metrics” menu for each version.

Database

During any part of your workflows you may need to persist data or recover previously saved data from a database. In those cases you can use the following functions of the context’s DB attribute:

    func Find(colName string, query QueryData, res interface{}) error 
    func Save(colName string, data interface{}) error
    
    // QueryData is the following type:
    type QueryData map[string]interface{} 

This functions will store on query data on a MongoDB database, so the data inserted should have a BSON compatible structure. Each runtime has its respective database whose name is <runtime-id>-data.

Logger

You can use four log levels that will be shown on the Admin UI page for the Version it belongs. The SDK will take care of saving the timestamp and tag each log with metadata to help locate the source of the error.

These are the available levels and their variants for multiple parameters:

    ctx.Logger.
    
        func Debug(msg string)
        func Info(msg string)
        func Warn(msg string)
        func Error(msg string)
    
        func Debugf(format string, a ...interface{})
        func Infof(format string, a ...interface{}) 
        func Warnf(format string, a ...interface{}) 
        func Errorf(format string, a ...interface{}) 

Measurements

Metrics can be stored in the runtime’s influx bucket anytime.
In order to do so you can use the function save from the Measurement. To use this function you will have to declare previously tags and fields for your desired point, as if you were creating an influx point.
The save function will take as parameters first the collection name you desire to write to, then a dictionary of fields and a dictionary of tags.

By default, this function will attach to the saved tags to every metric the following tags:

  • “version”: the version name to which this node belongs
  • “workflow”: the workflow name to which this node belongs
  • “node”: the node’s name

It is advised to document yourself about influxDB metrics before using this function.

Here is an example in Go:

func saveMetrics(ctx *kre.HandlerContext, component string) {
  tags := map[string]string{}

  fields := map[string]interface{}{
    "requested_component": component,
  }

  // influx is accessible via the ctx.Measurement variable
  ctx.Measurement.Save("requests", fields, tags)

  fields = map[string]interface{}{
    "called_node": "etl",
  }

  ctx.Measurement.Save("number_of_calls", fields, tags)
}

Predictions

To help to analyze and visualize the reliability of a model, prediction’s data can be stored through the save function given by the prediction attribute of the context. This function receives a timestamp, the predicted value as string, and the real value as string. This data is stored in a MongoDB database named <runtime-id>-data.

    ctx.Prediction.Save(time.Now(), "test-predicted-value", "test-true-value")

SDK for KRT v1

SDK specifications for runners compatible with KRT v1

Runner features

Features available to runners compatible with KRT V1