Runner features

Features available to runners compatible with KRT V2

Send Output

The ‘SendOutput’ functionality is the bread and butter of all node. Nodes are able to publish from 0 to n messages to their own NATS subject. In order to do so, the ‘SendOutput’ function is called using a protobuf structure as argument.
This function is used when users want to publish a known protobuf structure as the payload.

Here is an example in Python:

async def default_handler(ctx, _):
    ctx.logger.info("[executing default handler]")
    emails = ctx.get("emails")

    res = Response()
    res.message = f"Processing of {len(emails)} emails in progress"
    await ctx.send_early_reply(res)

    for email in emails:
        etl_output = EtlOutput()
        etl_output.email.CopyFrom(email)
        await ctx.send_output(etl_output)
    return

Send Any

The ‘SendAny’ functionality is very similar to the ‘SendOut’. Nodes are able to publish from 0 to n messages to their own NATS subject using this function as well. What differs is that the protobuf structure used as payload has not been declared previously by the handler.
This function is used when users want to publish an unknown protobuf structure as the payload. As for example redirecting incoming messages.

Here is an example in Python:

async def default_handler(ctx, data):
  ctx.logger.info("[executing default handler]")

  if os.environ["REDIRECT_MESSAGES"] == "true":
    ctx.send_any(data) # we don't know which proto is data
    return
  
  ...

Message Types

When publishing a payload to a subject the payload is encapsulated in a kre message structure, this message has a type assigned to it. There are 4 different message types:

  • OK: Message has been processed correctly.
  • Error: This message encountered errors during its process, there is no payload attached.
  • Early reply: For early reply operations (see description below).
  • Early exit: For early exit operations (see description below).

‘SendOutput’ and ‘SendAny’ functions will send OK typed messages. Error typed messages will be sent by the runners if an error occurred.
The context implements a function to check for each message type:

  • ‘IsMessageOK’
  • ‘IsMessageError’
  • ‘IsMessageEarlyReply’
  • ‘IsMessageEarlyExit’

Early Reply

‘SendEarlyReply’ function will publish any desired payload with an EarlyReply type attached to it. Users can later on check on this by using the function ‘IsMessageEarlyReply’.

Users may use this functionality to quickly reply synchronous GRPC requests then handle the execution on a second plane.

It is advised that the exitpoint is subscribed to nodes capable of emitting early reply messages, then handle the early reply and reply to the entrypoint.
Nodes subscribed to early reply emitting nodes should be capable of dealing with this type of message as well.
Also, to send a payload expected by the entrypoint. This can be done by using the SendAny function in the exitpoint and just redirect the payload to the entrypoint.

Here is an example in Go:

func firstNodeHandler(ctx *kre.HandlerContext, data *any.Any) error {
  ctx.Logger.Info("[first node handler invoked]")

  // don't keep the entrypoint waiting
  finalRes := &proto.Response{}
  finalRes.result = "processing messages..."
  ctx.SendEarlyReply(finalRes)

  req := &Request{}
  res := &Output{}

  ...

  return ctx.SendOutput(res)
}

func secondNodeHandler(ctx *kre.HandlerContext, data *any.Any) error {
  ctx.Logger.Info("[second node handler invoked]")

  // early replies are to be ignored
  if ctx.IsEarlyReply(){
    return
  }

  ...

}

func exitpointHandler(ctx *kre.HandlerContext, data *any.Any) error {
  ctx.Logger.Info("[exitpoint handler invoked]")

  // early replies are to be reported to the entrypoint
  if ctx.IsEarlyReply(){
    ctx.SendAny(data)
    return
  }

  req := &ExitpointRequest{}

  // process result if not an early reply
  err := anypb.UnmarshalTo(data, req, protobuf.UnmarshalOptions{})
  if err != nil {
    return fmt.Errorf("invalid request: %s", err)
  }
  
  ctx.DB.Save("results", req)

  ...

}

Early Exit

‘SendEarlyExit’ function will publish any desired payload with an EarlyExit type attached to it. Users can later on check on this by using the function ‘IsMessageEarlyExit’.

Users may use this functionality to halt execution of a workflow without necessarily throwing an exception or an error.

It is advised that the exitpoint is subscribed to nodes capable of emitting early exit messages, then handle the early exit and reply to the entrypoint.
Nodes subscribed to early reply emitting nodes should be capable of dealing with this type of messages as well.
Also, to send a payload expected by the entrypoint. This can be done by using the SendAny function in the exitpoint and just redirect the payload to the entrypoint.

Here is an example in Go:

func firstNodeHandler(ctx *kre.HandlerContext, data *any.Any) error {
  ctx.Logger.Info("[handler invoked]")

  req := &Request{}
  err := anypb.UnmarshalTo(data, req, protobuf.UnmarshalOptions{})
  if err != nil {
    return fmt.Errorf("invalid request: %s", err)
  }

  // we are not processing tests samples minor than 10 emails
  if len(req.emails) < 10 {
    finalRes := &FinalRes{}
    finalRes.result = "email length minor than 10, stopping execution"
    ctx.SendEarlyReply(finalRes)
    return
  }

  res := &Output{}

  ...

  return ctx.SendOutput(res)
}

func secondNodeHandler(ctx *kre.HandlerContext, data *any.Any) error {
  ctx.Logger.Info("[second node handler invoked]")

  // early exits are to be ignored
  if ctx.IsEarlyExit(){
    return
  }

  ...

}

func exitpointHandler(ctx *kre.HandlerContext, data *any.Any) error {
  ctx.Logger.Info("[exitpoint handler invoked]")

  // early exits are to be reported to the entrypoint
  if ctx.IsEarlyExit(){
    ctx.SendAny(data)
    return
  }

  ...

}