This function receives a context object that provides methods for logging and for storing data that users can later use in the handler function. It doesn’t return any value.
The init function is mainly used to initialize data, for example, loading models. For this reason, it is called only once, at node startup.
import joblib

async def init(ctx):
    ctx.logger.info("[worker init]")
    ctx.set('model', joblib.load(ctx.path('models/model.joblib')))
package main

import (
	"github.com/konstellation-io/kre/runners/kre-go"
)

func handlerInit(ctx *kre.HandlerContext) {
	ctx.Logger.Info("init handler")
	// Saves a value in the context internal registry
	ctx.Set("greeting", "Hello")
}
The handler function receives a context (the same one the init function receives) and an incoming payload as raw data.
The context object is shared between different executions.
Handler functions are used to process data; users implement their service logic here.
Handlers must return a valid protobuf message, which is used as the payload of the message sent to the
following node. Each request must produce exactly one response.
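import pandas as pd
from google.protobuf.json_format import MessageToDict

# ModelOutput and Response are generated protobuf classes. They, along with
# get_market_price_and_label, MEASUREMENT and MEASUREMENT_TAGS, are assumed
# to be defined or imported elsewhere in the node's code.

async def handler(ctx, data):
    # Unpacks the incoming payload into the expected message type
    req = ModelOutput()
    data.Unpack(req)

    market_price, label = get_market_price_and_label(req.price_category, ctx.get('currency'))
    ctx.logger.info(f"Estimated market price[{market_price}] with label[{label}]")

    # Stores input fields and prediction to measurements
    measurement_fields = MessageToDict(req.request, preserving_proto_field_name=True,
                                       including_default_value_fields=True)
    measurement_fields['prediction'] = label
    ctx.measurement.save(MEASUREMENT, measurement_fields, MEASUREMENT_TAGS)

    # Builds the single response sent to the following node
    res = Response()
    res.success = True
    res.message = 'Predicted market price'
    res.market_price = market_price
    res.category = label
    return res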
func handler(ctx *kre.HandlerContext, data *any.Any) (proto.Message, error) {
	ctx.Logger.Info("[handler invoked]")

	req := &Request{}
	res := &EtlOutput{}

	err := anypb.UnmarshalTo(data, req, proto2.UnmarshalOptions{})
	if err != nil {
		return res, fmt.Errorf("invalid request: %s", err)
	}

	res.Component = req.Component
	saveEtlMetrics(ctx, req.Component)

	return res, nil
}
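To make the lifecycle described above easier to picture, here is a minimal sketch that can be run locally without any runner. It assumes a hypothetical FakeContext class that only mimics the set and get methods and the logger attribute used in the examples; it is not the real context implementation. For brevity, the handler returns a plain string instead of a protobuf message.

```python
import asyncio
import logging


class FakeContext:
    """Hypothetical stand-in for the runner's context; not the real KRE class."""

    def __init__(self):
        self.logger = logging.getLogger("fake-node")
        self._registry = {}

    def set(self, key, value):
        self._registry[key] = value

    def get(self, key):
        return self._registry[key]


async def init(ctx):
    # Runs once: store data that later handler executions will reuse.
    ctx.set("greeting", "Hello")
    ctx.set("calls", 0)


async def handler(ctx, data):
    # Runs once per request: read and update the shared context,
    # then return exactly one response.
    ctx.set("calls", ctx.get("calls") + 1)
    return f"{ctx.get('greeting')}, {data}! (request {ctx.get('calls')})"


async def simulate(requests):
    ctx = FakeContext()
    await init(ctx)  # called once, as at node startup
    # The same ctx is passed to every handler execution.
    return [await handler(ctx, request) for request in requests]


if __name__ == "__main__":
    for line in asyncio.run(simulate(["Ada", "Linus"])):
        print(line)
```

Because the same context instance is passed to every call, the counter written by one handler execution is visible to the next one, which is the sharing behaviour described above.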
Once the init and handler functions have been declared and implemented,
they must be exposed in a specific way so that the runners can recognize them and execute them automatically.
For Python nodes, it is enough to define two functions named init and handler. The Python runner finds them in the source code it is given.
async def init(ctx):
    ...

async def handler(ctx, data):
    ...
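As an illustration of how discovery by name could work, the sketch below looks up init and handler on a module and checks that both are coroutine functions. This is an assumption about the general technique, not the actual Python runner code: load_entrypoints, run_once, and the throwaway user_node module are hypothetical names, and a plain dict stands in for the real context object.

```python
import asyncio
import inspect
import types


def load_entrypoints(module):
    """Return the module's init and handler coroutine functions, or raise."""
    found = {}
    for name in ("init", "handler"):
        fn = getattr(module, name, None)
        if not inspect.iscoroutinefunction(fn):
            raise AttributeError(f"module must define 'async def {name}'")
        found[name] = fn
    return found["init"], found["handler"]


# Source code standing in for a user's node file.
USER_SOURCE = '''
async def init(ctx):
    ctx["ready"] = True

async def handler(ctx, data):
    return data.upper() if ctx.get("ready") else None
'''

# Build a throwaway module from that source.
user_module = types.ModuleType("user_node")
exec(USER_SOURCE, user_module.__dict__)


async def run_once(payload):
    init_fn, handler_fn = load_entrypoints(user_module)
    ctx = {}  # a plain dict stands in for the real context object
    await init_fn(ctx)
    return await handler_fn(ctx, payload)


if __name__ == "__main__":
    print(asyncio.run(run_once("ping")))
```

The lookup fails early with a clear error when either function is missing or is not declared with async def, which is one reasonable way to surface a misnamed entry point at startup rather than on the first request.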
Go nodes use the Go runner as a package. To load the runner, call the Start function
implemented by the runner package, passing as arguments
the functions that serve as init and handler.
We recommend doing this inside the main function in the main.go file. Then, implement the
init and handler functions in a different package.
func main() {
	kre.Start(handlerInit, handler)
}