This function receives a context object that provides methods for logging and storing data that users can later use on the handler function. It doesn’t return any value.
init
function is mainly used to initialize data, for example, loading models. Thus, the init
function will only be called once upon node startup.
import joblib
async def init(ctx):
ctx.logger.info("[worker init]")
ctx.configuration.set('model', joblib.load(ctx.path('models/model.joblib')))
package main
import (
"github.com/konstellation-io/kre/runners/kre-go"
)
func handlerInit(ctx *kre.HandlerContext) {
ctx.Logger.Info("worker init")
address := fakeModelLoader(ctx.Path("models/model.joblib"))
ctx.Configuration.Set("model", address)
}
The handler
function receives a context (the same as the init
function does), and an incoming
payload as raw data.
The context object is shared between different executions.
handler
functions are used to process data, users can implement their service logic here.
Handlers in this version don’t need to reply nor generate any message per request and can generate
several responses to different subjects if desired as well.
To send a response, users need to make use of the function send_output
that comes within the
context object. The output sent must be a proto valid structure as it will be used as the
payload in the message sent to the following node.
Also, in this runner version users can decide which function implemented by the node will act as
the handler
depending on the source of the incoming payload.
async def default_handler(ctx, req):
if ctx.is_message_early_reply():
return
etl_output = EtlOutput()
req.Unpack(etl_output)
email = etl_output.email
category = classify_email(email)
res = ClassificatorOutput()
res.email.CopyFrom(email)
res.category = category
if category == CATEGORY_REPARATIONS:
await ctx.send_output(res, "repairs")
await ctx.send_output(res)
func handler(ctx *kre.HandlerContext, data *anypb.Any) error {
ctx.Logger.Info("[handler invoked]")
req := &proto.ClassificatorOutput{}
err := anypb.UnmarshalTo(data, req, protobuf.UnmarshalOptions{})
if err != nil {
return fmt.Errorf("invalid request: %s", err)
}
err = storeEmail(ctx, req.Email)
if err != nil {
ctx.Logger.Errorf("error storing email: %w", err)
}
return nil
}
Once the init
and handler
functions have been declared and implemented,
they have to be served in a certain way, so runners recognize them and execute them automatically.
The init
function will be a function named init.
async def init(ctx):
...
As for the handlers, the function named default_handler will act as the default handler. Custom handlers can be declared inside a dictionary named custom_handlers, the dictionary will have as key the expected source node name and value the function to execute.
import pandas as pd
async def default_handler(ctx, data):
ctx.logger.info("message received from non recognized node, executing default handler")
...
async def repairs_handler(ctx, data):
ctx.logger.info("message received from node repairs, executing repairs handler")
...
async def spam_handler(ctx, data):
ctx.logger.info("message received from node spam, executing spam handler")
...
custom_handlers = {
"repairs": repairs_handler,
"spam": spam_handler,
}
Golang’s nodes use the Golang runners as a package. To load the runners, you need to execute
the function start
implemented by the runners’ package passing down as its variables
the functions that serve as init
and handler
.
We recommend doing this inside the main
function in the main.go
file. Then, implement said
init
and handler
functions in a different package.
To load default and custom handlers, they have to be used in the function start
from the runner’s
package as their arguments. The first argument will be the init function, the second the
default handler and the third a map that will have as key the expected source node name and value
the function to execute.
func main() {
handlers := map[string]kre.Handler{
"entrypoint": entrypointHandler,
"nodeA": nodeAHandler,
}
kre.Start(handlerInit, defaultHandler, handlers)
}