SDK for KRT v1

SDK specifications for runners compatible with KRT v1

Init function

This function receives a context object that provides methods for logging and storing data that users can later use on the handler function. It doesn’t return any value.

init function is mainly used to initialize data, for example, loading models. Thus, the init function will only be called once upon node startup.

Python init example

import joblib

async def init(ctx):
  ctx.logger.info("[worker init]")
  ctx.set('model', joblib.load(ctx.path('models/model.joblib')))

Golang init example

package main

import (
  "github.com/konstellation-io/kre/runners/kre-go"
)

func handlerInit(ctx *kre.HandlerContext) {
  ctx.Logger.Info("init handler")
    
  // Saves a value in the context internal registry
  ctx.Set("greeting", "Hello")   
}

Handler function

The handler function receives a context (the same as the init function does), and an incoming payload as raw data.
The context object is shared between different executions.

handler functions are used to process data, users can implement their service logic here. Handlers must return a proto valid structure that will be used as payload in the message sent to the following node. It is compulsory that one request generates one answer.

Python handler example for runner compatible with KRT V1

import pandas as pd

async def handler(ctx, data):
  req = ModelOutput()
  data.Unpack(req)

  market_price, label = get_market_price_and_label(req.price_category, ctx.get('currency'))
  ctx.logger.info(f"Estimated market price[{market_price}] with label[{label}]")

  # Stores input fields and prediction to measurements
  measurement_fields = MessageToDict(req.request, preserving_proto_field_name=True,
                                       including_default_value_fields=True)
  measurement_fields['prediction'] = label
  ctx.measurement.save(MEASUREMENT, measurement_fields, MEASUREMENT_TAGS)

  res = Response()
  res.success = True
  res.message = 'Predicted market price'
  res.market_price = market_price
  res.category = label

  return res

Golang handler example for runner compatible with KRT V1

func handler(ctx *kre.HandlerContext, data *any.Any) (proto.Message, error) {
  ctx.Logger.Info("[handler invoked]")

  req := &Request{}
  res := &EtlOutput{}

  err := anypb.UnmarshalTo(data, req, proto2.UnmarshalOptions{})
  if err != nil {
    return res, fmt.Errorf("invalid request: %s", err)
  }

  res.Component = req.Component

  saveEtlMetrics(ctx, req.Component)

  return res, nil
}

How to provide functions to the runners

Once the init and handler functions have been declared and implemented, they have to be served in a certain way, so runners recognize them and execute them automatically.

For runners compatible with KRT V1

Python

Just by having two functions named init and handler. Python runners will interpret what source code is given.

async def init(ctx):
  ...

async def handler(ctx, data):
  ...

Golang

Golang’s nodes use the Golang runners as a package. To load the runners, you need to execute the function start implemented by the runners’ package passing down as its variables the functions that serve as init and handler.
We recommend doing this inside the main function in the main.go file. Then, implement said init and handler functions in a different package.

func main() {
  kre.Start(handlerInit, handler)
}