SDK for KRT v2

SDK specifications for runners compatible with KRT v2

Init function

This function receives a context object that provides methods for logging and storing data that users can later use on the handler function. It doesn’t return any value.

init function is mainly used to initialize data, for example, loading models. Thus, the init function will only be called once upon node startup.

Python init example

import joblib

async def init(ctx):
  ctx.logger.info("[worker init]")
  
  ctx.configuration.set('model', joblib.load(ctx.path('models/model.joblib')))

Golang init example

package main

import (
  "github.com/konstellation-io/kre/runners/kre-go"
)

func handlerInit(ctx *kre.HandlerContext) {
  ctx.Logger.Info("worker init")

  address := fakeModelLoader(ctx.Path("models/model.joblib"))
  ctx.Configuration.Set("model", address)   
}

Handler function

The handler function receives a context (the same as the init function does), and an incoming payload as raw data.
The context object is shared between different executions.

handler functions are used to process data, users can implement their service logic here. Handlers in this version don’t need to reply nor generate any message per request and can generate several responses to different subjects if desired as well.
To send a response, users need to make use of the function send_output that comes within the context object. The output sent must be a proto valid structure as it will be used as the payload in the message sent to the following node.

Also, in this runner version users can decide which function implemented by the node will act as the handler depending on the source of the incoming payload.

Python handler example for runner compatible with KRT V2

async def default_handler(ctx, req):
    if ctx.is_message_early_reply():
        return

    etl_output = EtlOutput()
    req.Unpack(etl_output)
    email = etl_output.email
    category = classify_email(email)

    res = ClassificatorOutput()
    res.email.CopyFrom(email)
    res.category = category
    if category == CATEGORY_REPARATIONS:
        await ctx.send_output(res, "repairs")
    await ctx.send_output(res)

Golang handler example for runner compatible with KRT V2

func handler(ctx *kre.HandlerContext, data *anypb.Any) error {
 ctx.Logger.Info("[handler invoked]")

 req := &proto.ClassificatorOutput{}
 err := anypb.UnmarshalTo(data, req, protobuf.UnmarshalOptions{})
 if err != nil {
  return fmt.Errorf("invalid request: %s", err)
 }

 err = storeEmail(ctx, req.Email)
 if err != nil {
  ctx.Logger.Errorf("error storing email: %w", err)
 }

 return nil
}

How to provide functions to the runners

Once the init and handler functions have been declared and implemented, they have to be served in a certain way, so runners recognize them and execute them automatically.

For runners compatible with KRT V2

Python

The init function will be a function named init.

async def init(ctx):
  ...

As for the handlers, the function named default_handler will act as the default handler. Custom handlers can be declared inside a dictionary named custom_handlers, the dictionary will have as key the expected source node name and value the function to execute.

import pandas as pd

async def default_handler(ctx, data):
    ctx.logger.info("message received from non recognized node, executing default handler")
    ...

async def repairs_handler(ctx, data):
    ctx.logger.info("message received from node repairs, executing repairs handler")
    ...

async def spam_handler(ctx, data):
    ctx.logger.info("message received from node spam, executing spam handler")
    ...

custom_handlers = {
    "repairs": repairs_handler,
    "spam": spam_handler,
}

Golang

Golang’s nodes use the Golang runners as a package. To load the runners, you need to execute the function start implemented by the runners’ package passing down as its variables the functions that serve as init and handler.
We recommend doing this inside the main function in the main.go file. Then, implement said init and handler functions in a different package.

To load default and custom handlers, they have to be used in the function start from the runner’s package as their arguments. The first argument will be the init function, the second the default handler and the third a map that will have as key the expected source node name and value the function to execute.

func main() {
  handlers := map[string]kre.Handler{
    "entrypoint": entrypointHandler,
    "nodeA": nodeAHandler,
  }

  kre.Start(handlerInit, defaultHandler, handlers)
}