kyo-flow

Durable workflow engine for Kyo. Workflows are defined as composable, type-safe plans that the engine persists, coordinates across multiple executors, and recovers automatically after crashes.

A Flow is a plan, not an execution. You describe what should happen (values to compute, inputs to wait for, side effects to perform, branches to take) and the engine handles the rest. Every step is checkpointed to a store before the next begins. If the process crashes, another executor claims the work and replays from the last checkpoint, skipping steps that already completed. Side effects in steps must be idempotent because they may re-execute on recovery.

The engine coordinates multiple executors via time-limited claim leases, supports compensation handlers for saga-style rollback, provides retry and timeout per step, emits a full event audit trail, and exposes an auto-generated REST API. Workflow structure can be rendered as Mermaid, DOT, BPMN, ELK, or JSON diagrams. The module compiles across JVM, JavaScript, and Scala Native.

Getting Started

Add the dependency to your build.sbt:

libraryDependencies += "io.getkyo" %% "kyo-flow" % "<latest version>"

The simplest workflow computes a value and stores it:

val flow = Flow.init("hello")
    .output("greeting")(_ => "Hello, World!")

Flow.init creates a named workflow. .output("greeting") computes a value, persists it under the name "greeting", and makes it available to subsequent steps.

Run a workflow locally for testing:

val result = Flow.runLocal(flow, Record.empty)

Outputs

The function passed to .output receives a context record containing all fields produced so far. In the hello example above there are none, so the function ignores the context with _. When there are prior fields, each one is accessible by name with full type safety:

val flow = Flow.init("pricing")
    .output("price")(_ => 100)
    .output("tax")(ctx => ctx.price * 0.08)
    .output("total")(ctx => ctx.price + ctx.tax)

ctx.price is statically typed as Int, and accessing a field that doesn't exist is a compile error. This works through Kyo's Record type, which tracks fields as an intersection of Name ~ Value pairs. Each .output call adds a field to the record type:

// After .output("price"), the context type includes "price" ~ Int
// After .output("tax"), it includes "price" ~ Int & "tax" ~ Double
// After .output("total"), it includes all three fields

The three type parameters on Flow[In, Out, S] track this automatically:

  • In: required inputs (accumulated via .input)
  • Out: produced outputs (accumulated via .output, .loop, .dispatch, etc.)
  • S: pending effect types from step computations

Inputs

An input declares a value the workflow needs from the outside world. The execution suspends at the input node until the value is delivered externally via a signal:

val flow = Flow.init("order")
    .input[Order]("order")
    .output("total")(ctx => ctx.order.qty * 100)
    .output("receipt")(ctx => s"${ctx.order.item} x${ctx.order.qty} = ${ctx.total}")

Input types must have a Schema instance for serialization. Like outputs, each input adds a typed field to the context.

For testing, pre-populate inputs with runLocal:

val result = Flow.runLocal(flow, "order" ~ Order("Widget", 3))

The ~ operator creates a typed record field: "order" ~ Order("Widget", 3) is a Record["order" ~ Order]. Multiple fields combine with &: "x" ~ 1 & "y" ~ "hello".

In production, inputs arrive via the engine's signal API or the HTTP endpoint POST /api/v1/executions/:eid/signal/order.

Steps

A step performs a side effect without producing a named value (HTTP calls, database writes, sending notifications):

val flow = Flow.init("notify")
    .input[String]("email")
    .output("message")(ctx => s"Welcome, ${ctx.email}!")
    .step("send")(ctx => sendEmail(ctx.email, ctx.message))

On replay after a crash, completed steps are skipped. Steps are tracked by their completion event in the audit trail, not by a stored value.
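
The skip-on-replay behavior can be pictured with a small model in plain Scala. This is only a sketch under stated assumptions, not kyo-flow's engine: ReplayModel, runStep, execute, and the in-memory map standing in for the durable store are all hypothetical names introduced for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of checkpoint-and-replay; not kyo-flow's actual engine.
object ReplayModel:
    // Stand-in for a durable store: step name -> checkpointed result.
    val store = mutable.LinkedHashMap.empty[String, Int]
    // Counts how many times each step body really ran.
    val runs = mutable.Map.empty[String, Int].withDefaultValue(0)

    // Run a step only if no checkpoint exists, then checkpoint its result.
    def runStep(name: String)(body: => Int): Int =
        store.getOrElse(name, {
            runs(name) += 1
            val value = body
            store(name) = value
            value
        })

    // A three-step plan; `crashAfterTax` simulates a process crash mid-run.
    def execute(crashAfterTax: Boolean): Int =
        val price = runStep("price")(100)
        val tax   = runStep("tax")(price * 8 / 100)
        if crashAfterTax then throw new RuntimeException("simulated crash")
        runStep("total")(price + tax)
end ReplayModel
```

Calling ReplayModel.execute(crashAfterTax = true) and then ReplayModel.execute(crashAfterTax = false) leaves every step with a run count of 1: the second call finds the "price" and "tax" checkpoints and only runs "total". A step that crashed before its checkpoint was written would run again, which is why side effects need to be idempotent.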

Sleep

Sleep pauses the execution for a duration. The pause is durable: if the process restarts, the engine calculates the remaining time and resumes when it expires.

val flow = Flow.init("delayed")
    .input[String]("orderId")
    .step("process")(ctx => processOrder(ctx.orderId))
    .sleep("cooldown", 1.hour)
    .step("followUp")(ctx => sendFollowUp(ctx.orderId))

Note: Parking (sleep, waiting for an input, or a lost claim) is not a failure. The engine signals it with a non-Throwable value (FlowSuspension), so a parked execution releases its in-memory state, never fires compensation handlers, and is resumed later from the store rather than being marked failed.
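
The remaining-time calculation behind a durable sleep can be sketched as follows. This assumes the engine persists an absolute wake-up time, which the text above does not confirm. SleepCheckpoint, startSleep, and remaining are hypothetical names, and the sketch uses java.time rather than Kyo's own Duration.

```scala
import java.time.{Duration, Instant}

// Hypothetical model of a durable sleep; not kyo-flow's actual implementation.
// Only the absolute wake-up time is persisted, so it survives a restart.
final case class SleepCheckpoint(name: String, wakeAt: Instant)

def startSleep(name: String, now: Instant, duration: Duration): SleepCheckpoint =
    SleepCheckpoint(name, now.plus(duration))

// Time still left to wait when an executor picks the execution up again.
// A wake-up time in the past yields zero, meaning "resume immediately".
def remaining(checkpoint: SleepCheckpoint, now: Instant): Duration =
    val left = Duration.between(now, checkpoint.wakeAt)
    if left.isNegative then Duration.ZERO else left
```

With a one-hour sleep started at time t0, a restart 45 minutes later leaves 15 minutes to wait, and a restart two hours later resumes at once.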

Branching

Dispatch evaluates conditions in order and executes the first matching branch. It follows a builder pattern: start with .dispatch[V] specifying the result type, chain .when branches, and close with .otherwise:

val flow = Flow.init("approval")
    .input[Int]("amount")
    .dispatch[String]("decision")
    .when(ctx => ctx.amount > 1000, name = "review")(ctx => "needs review")
    .when(ctx => ctx.amount > 100, name = "auto")(ctx => "auto-approved")
    .otherwise(ctx => "instant", name = "default")
    .step("notify")(ctx => notifyResult(ctx.decision))

The type parameter [String] is the type of the value all branches must produce. The result is persisted under the dispatch name ("decision") and accessible downstream as ctx.decision. Calling .otherwise is required, and the compiler enforces this by making further chaining methods unavailable until the dispatch is closed.
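
Because conditions are evaluated in order, an amount of 5000 satisfies both conditions in the example but only the first branch runs. The first-match rule can be modeled in plain Scala as below. Branch, dispatch, and decide are hypothetical names for illustration and are not part of the kyo-flow API.

```scala
// Hypothetical model of first-match dispatch; names are illustrative only.
final case class Branch[C, V](name: String, cond: C => Boolean, body: C => V)

// Returns the name of the branch taken together with the value it produced.
def dispatch[C, V](ctx: C, branches: List[Branch[C, V]], otherwise: C => V): (String, V) =
    branches.find(_.cond(ctx)) match
        case Some(b) => (b.name, b.body(ctx))
        case None    => ("default", otherwise(ctx))

// Same conditions and order as the approval flow above.
val approvalBranches = List(
    Branch[Int, String]("review", _ > 1000, _ => "needs review"),
    Branch[Int, String]("auto", _ > 100, _ => "auto-approved")
)

def decide(amount: Int): (String, String) =
    dispatch(amount, approvalBranches, (_: Int) => "instant")
```

Here decide(5000) takes "review", decide(500) takes "auto", and decide(50) falls through to the default. Swapping the two branches would make "review" unreachable, so more specific conditions belong first.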

Loops

A loop runs its body repeatedly. Each iteration returns either Loop.continue to keep going or Loop.done(result) to finish, and the final value is stored as a named output:

val flow = Flow.init("poll")
    .input[String]("url")
    .loop("result") { ctx =>
        checkStatus(ctx.url).map {
            case "ready" => Loop.done("complete")
            case _       => Loop.continue
        }
    }

Loops can carry state between iterations. The second argument to .loop is the initial state, and the body receives (state, ctx):

val flow = Flow.init("accumulate")
    .input[Int]("target")
    .loop("sum", 0) { (acc, ctx) =>
        if acc >= ctx.target then Loop.done(acc)
        else Loop.continue(acc + 1)
    }

Scheduled loops (loopOn) insert durable sleeps between iterations using a Schedule. Each iteration is checkpointed independently, so recovery resumes from the last completed iteration:

val flow = Flow.init("monitor")
    .input[String]("endpoint")
    .loopOn("check", Schedule.fixed(5.minutes)) { ctx =>
        probe(ctx.endpoint).map {
            case "healthy" => Loop.continue
            case status    => Loop.done(s"alert: $status")
        }
    }

ForEach

ForEach processes each element of a collection and stores all results as a Chunk:

val flow = Flow.init("batch")
    .input[Seq[String]]("urls")
    .foreach("results")(ctx => ctx.urls)(url => fetch(url))

Composition

Sequential

andThen sequences two flows: the first completes, then the second starts with access to all prior outputs.

val combined = validateFlow.andThen(processFlow)

Parallel

zip runs two flows in parallel and merges their outputs. Both must complete:

val parallel = pricingFlow.zip(inventoryFlow)

For more than two, use gather:

val all = Flow.gather(pricingFlow, inventoryFlow, shippingFlow)

Racing

race runs two flows in parallel. The first to complete wins and the other is abandoned.

val fastest = Flow.race(primaryFlow, fallbackFlow)

Subflows

subflow embeds a child flow within a parent. The input mapper transforms the parent's context into the child's expected inputs:

val childFlow = Flow.init("payment-child")
    .input[Int]("amount")
    .output("confirmation")(ctx => s"paid:${ctx.amount}")

val parent = Flow.init("parent")
    .input[Order]("order")
    .subflow("payment", childFlow)(ctx =>
        "amount" ~ (ctx.order.qty * ctx.order.price)
    )
    .step("ship")(_ => ())

Error Handling

Retry and Timeout

Any output or step can specify a per-attempt timeout and a retry schedule:

val flow = Flow.init("resilient")
    .input[String]("url")
    .output(
        "data",
        timeout = 10.seconds,
        retry = Maybe(Schedule.exponentialBackoff(1.second, 2.0, 1.minute))
    )(ctx => fetchData(ctx.url))

Each attempt is independently timed. When the schedule is exhausted, the last error propagates.
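
To get a feel for the delays such a schedule produces, here is a small calculation in plain Scala. It assumes the three arguments to Schedule.exponentialBackoff mean initial delay, growth factor, and maximum delay. backoffDelaysMillis is a hypothetical helper and does not call Kyo's Schedule.

```scala
// Delay before each retry in milliseconds, assuming the schedule arguments
// are (initial delay, growth factor, maximum delay). Illustrative only.
def backoffDelaysMillis(initialMs: Long, factor: Double, capMs: Long, retries: Int): List[Long] =
    List
        .iterate(initialMs.toDouble, retries)(_ * factor) // initial, initial*factor, ...
        .map(delay => math.min(delay, capMs.toDouble).toLong) // clamp to the cap

// For (1.second, 2.0, 1.minute) and eight retries:
// List(1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000)
val delays = backoffDelaysMillis(1000L, 2.0, 60000L, 8)
```

Under that assumption the delay doubles until the seventh retry, where it reaches the one-minute cap. Since the 10-second timeout applies per attempt, the worst-case wall-clock time for a step is the sum of the delays plus one timeout per attempt.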

Compensation

Outputs and steps can register compensation handlers that fire in reverse order when a later step fails. This implements the saga pattern for distributed transactions:

val flow = Flow.init("saga")
    .input[Order]("order")
    .outputCompensated("reservation")(ctx =>
        reserveInventory(ctx.order)
    )(ctx =>
        cancelReservation(ctx.reservation)
    )
    .outputCompensated("charge")(ctx =>
        chargeCard(ctx.order)
    )(ctx =>
        refundCard(ctx.charge)
    )
    .step("ship")(ctx => ship(ctx.order))

If ship fails, compensations run in reverse: first refundCard, then cancelReservation. Compensations must be idempotent.
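
The reverse-order rule can be illustrated with a minimal saga model in plain Scala. This is a sketch, not how kyo-flow implements compensation: the Saga class, its step and run methods, and the string log are hypothetical names used only to make the ordering visible.

```scala
import scala.collection.mutable
import scala.util.{Failure, Try}

// Hypothetical saga model; not kyo-flow's implementation.
final class Saga:
    // Compensations for steps that completed, most recent first.
    private var undo: List[() => Unit] = Nil
    // Records actions and compensations in the order they actually ran.
    val log = mutable.ListBuffer.empty[String]

    // Run an action; register its compensation only after it succeeds.
    def step(name: String, compensation: Option[String] = None)(action: => Unit): Unit =
        action
        log += name
        compensation.foreach { c =>
            undo = (() => { log += c; () }) :: undo
        }

    // Run the whole saga; on failure, fire compensations newest-first.
    def run(body: Saga => Unit): Try[Unit] =
        Try(body(this)) match
            case failure @ Failure(_) =>
                undo.foreach(compensate => compensate())
                failure
            case success => success
end Saga

val saga = Saga()
val outcome = saga.run { s =>
    s.step("reserveInventory", Some("cancelReservation"))(())
    s.step("chargeCard", Some("refundCard"))(())
    s.step("ship")(throw new RuntimeException("carrier unavailable"))
}
// saga.log: reserveInventory, chargeCard, refundCard, cancelReservation
```

Registering a compensation only after its action succeeds means the failing step itself is never compensated, only the work that completed before it.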

For error recovery within a step body, use Kyo's Abort.recover:

Flow.init("recover")
    .input[String]("input")
    .output("result")(ctx =>
        Abort.recover[Throwable](_ => "fallback")(riskyOperation(ctx.input))
    )

Exception Types

Engine operations fail with specific FlowException subtypes organized into sealed groups for pattern matching:

| Group | Exception | Meaning |
|---|---|---|
| FlowWorkflowException | FlowWorkflowNotFoundException | Workflow not in store |
| FlowWorkflowException | FlowWorkflowNotRegisteredException | Workflow not registered with engine |
| FlowExecutionStateException | FlowExecutionNotFoundException | Execution not found |
| FlowExecutionStateException | FlowExecutionTerminalException | Cannot signal terminal execution |
| FlowExecutionStateException | FlowDuplicateExecutionException | Execution ID already exists |
| FlowSignalException | FlowSignalNotFoundException | Input name doesn't exist |
| FlowSignalException | FlowSignalTypeMismatchException | Signal type doesn't match |
| FlowSignalException | FlowInputAlreadyDeliveredException | Input already delivered |

API methods use precise Abort union types, so you can handle exactly the errors each method can produce.

Running Workflows

Local

Flow.runLocal runs a flow in-memory, blocking until completion. Useful for tests:

val simpleFlow = Flow.init("demo").input[Int]("x").output("doubled")(ctx => ctx.x * 2)
val result     = Flow.runLocal(simpleFlow, "x" ~ 42)

Server

Flow.runServer starts an HTTP server with REST endpoints for all registered workflows:

// In-memory store (development)
val serverDev: HttpServer < (Async & Scope) = Flow.runServer(orderFlow, shippingFlow)

// Explicit store (initMemory shown here; pass a durable FlowStore in production)
val serverProd: HttpServer < (Async & Scope) =
    FlowStore.initMemory.map(store => Flow.runServer(store, orderFlow, shippingFlow))

The server exposes:

| Method | Path | Description |
|---|---|---|
| GET | /api/v1/workflows | List workflows |
| GET | /api/v1/workflows/:id | Workflow metadata |
| GET | /api/v1/workflows/:id/diagram | Workflow diagram |
| POST | /api/v1/workflows/:id/executions | Start execution |
| GET | /api/v1/executions/:eid | Execution status |
| POST | /api/v1/executions/:eid/signal/:name | Deliver input |
| GET | /api/v1/executions/:eid/inputs | Input delivery status |
| GET | /api/v1/executions/:eid/history | Event history |
| GET | /api/v1/executions/:eid/diagram | Diagram with progress |
| POST | /api/v1/executions/:eid/cancel | Cancel execution |
| POST | /api/v1/executions/search | Search executions |
| POST | /api/v1/executions/cancel | Cancel matching executions |

To compose with your own endpoints, use Flow.runHandlers:

FlowStore.initMemory.map { store =>
    Flow.runHandlers(store, orderFlow).map { handlers =>
        HttpServer.init(handlers.toSeq*)
    }
}

Engine

FlowEngine provides the full programmatic API without HTTP:

val engineEffect: FlowEngine < (Async & Scope) =
    FlowStore.initMemory.map(store => FlowEngine.init(store, orderFlow, shippingFlow))
engineEffect.map { engine =>
    for
        handle <- engine.workflows.start(Flow.Id.Workflow("order"))
        _      <- handle.signal("order", Order("Widget", 3))
        status <- handle.status
    yield status
}

The engine runs worker fibers that poll the store, claim executions via time-limited leases, and interpret the flow step by step. Configuration:

FlowStore.initMemory.map { store =>
    FlowEngine.init(
        store,
        workerCount = 4,
        lease = 30.seconds,
        renewEvery = 10.seconds,
        batchSize = 8,
        pollTimeout = 30.seconds,
        flows = Seq(orderFlow, shippingFlow)
    )
}

Monitoring

Status

Executions transition through a status machine:

Running ──→ Completed
Running ──→ Failed (compensations run first if registered)
Running ──→ WaitingForInput ──→ Running (on signal)
Running ──→ Sleeping ──→ Running (on expiry)
Running ──→ Compensating ──→ Failed
Any non-terminal ──→ Cancelled

Inspect an execution's current status, progress, and inputs with engine.executions.describe:

val eid: Flow.Id.Execution = Flow.Id.Execution("exec-123")
val monitorEffect =
    FlowStore.initMemory.map { store =>
        FlowEngine.init(store, orderFlow).map { engine =>
            engine.executions.describe(eid).map { detail =>
                val _status   = detail.status   // Flow.Status
                val _progress = detail.progress // step-by-step node progress
                val _inputs   = detail.inputs   // which inputs are delivered
            }
        }
    }

Events

Every state change is recorded as a Flow.Event:

FlowStore.initMemory.map { store =>
    FlowEngine.init(store, orderFlow).map { engine =>
        engine.executions.history(eid).map { page =>
            val _events  = page.events  // Chunk[Flow.Event]
            val _hasMore = page.hasMore // pagination
        }
    }
}

Event kinds: Created, StepStarted, StepCompleted, StepRetried, StepTimedOut, InputWaiting, InputReceived, SleepStarted, SleepCompleted, ExecutionResumed, ExecutionClaimed, ExecutionReleased, Completed, Failed, CompensationStarted, CompensationCompleted, CompensationFailed, Cancelled.

Diagrams

Render workflow structure or execution progress:

val wfId: Flow.Id.Workflow = Flow.Id.Workflow("order")
FlowStore.initMemory.map { store =>
    FlowEngine.init(store, orderFlow).map { engine =>
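        // Sequence both calls so neither result is discarded
        for
            structure <- engine.workflows.diagram(wfId, Flow.DiagramFormat.Mermaid)
            progress  <- engine.executions.diagram(eid, Flow.DiagramFormat.Dot)
        yield (structure, progress)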
    }
}

Supported formats: Mermaid, Dot, Bpmn, Elk, Json. Also available directly on a flow definition:

Flow.renderMermaid(orderFlow)

Custom Store

The in-memory store (FlowStore.initMemory) is for development and testing. For production, implement FlowStore against a durable database:

class PostgresFlowStore(pool: ConnectionPool) extends FlowStore:
    def claimReady(): Unit   = ??? // SELECT ... FOR UPDATE SKIP LOCKED
    def updateStatus(): Unit = ??? // UPDATE + INSERT in one transaction
    // ... 15 abstract methods total
end PostgresFlowStore

Key invariants:

  • claimReady never returns the same execution to two concurrent callers
  • updateStatus writes event + status atomically
  • putFieldIfAbsent is an atomic check-and-write (exactly-once)
  • renewClaim returns false if the claim was taken by another executor
  • Terminal status cannot revert to non-terminal
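
Two of these invariants, the atomic check-and-write and the no-revert rule for terminal statuses, can be illustrated with an in-memory model. The signatures below are assumptions made for the sketch and do not mirror the real FlowStore trait. Status, InvariantModel, and field are hypothetical names.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical illustration of two store invariants; the signatures are
// assumptions and do not mirror the real FlowStore trait.
enum Status:
    case Running, Sleeping, Completed, Failed, Cancelled
    def isTerminal: Boolean = this match
        case Completed | Failed | Cancelled => true
        case _                              => false

final class InvariantModel:
    private val fields   = new ConcurrentHashMap[String, String]()
    private val statuses = new ConcurrentHashMap[String, Status]()

    // Atomic check-and-write: returns true only for the caller that wrote.
    def putFieldIfAbsent(key: String, value: String): Boolean =
        fields.putIfAbsent(key, value) == null

    def field(key: String): Option[String] = Option(fields.get(key))

    // Reject any update once the execution has reached a terminal status.
    def updateStatus(eid: String, next: Status): Boolean = synchronized {
        Option(statuses.get(eid)) match
            case Some(current) if current.isTerminal => false
            case _ =>
                statuses.put(eid, next)
                true
    }
end InvariantModel
```

In a database-backed store the same guarantees would typically come from a unique constraint or conditional insert for the field write, and a conditional update for the status change. The exact mechanism depends on the database and is left open here.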

Multi-Executor Coordination

Multiple engine instances on the same store coordinate automatically via claim leases:

FlowStore.initMemory.map { store =>
    for
        // Instance A
        engineA <- FlowEngine.init(store, workerCount = 2, lease = 30.seconds, flows = Seq(orderFlow))
        // Instance B (same store; a separate process in a real deployment)
        engineB <- FlowEngine.init(store, workerCount = 2, lease = 30.seconds, flows = Seq(orderFlow))
    yield (engineA, engineB)
}

If an executor crashes, its lease expires and another executor picks up the work.

Coordination rests on claimReady handing each ready execution to exactly one executor under a renewable lease. A step that was in flight when an executor died may run again on the executor that reclaims the lease, but a step already recorded as completed is skipped on replay. This is why step side effects must be idempotent.
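
The lease mechanics can be sketched with a small in-memory table. This is a model for intuition only, assuming a claim is a pair of owner and expiry time. Claim, LeaseTable, claim, and renew are hypothetical names and are not the FlowStore API.

```scala
// Hypothetical lease model; not the real FlowStore API.
final case class Claim(owner: String, expiresAt: Long)

final class LeaseTable(leaseMillis: Long):
    private var claims = Map.empty[String, Claim]

    // A claim succeeds if the execution is unclaimed, the lease has expired,
    // or the caller already owns it. Otherwise another executor holds it.
    def claim(eid: String, executor: String, now: Long): Boolean = synchronized {
        claims.get(eid) match
            case Some(c) if c.expiresAt > now && c.owner != executor => false
            case _ =>
                claims = claims.updated(eid, Claim(executor, now + leaseMillis))
                true
    }

    // Renewal extends the lease; it fails if the lease has expired or
    // another executor now holds the claim.
    def renew(eid: String, executor: String, now: Long): Boolean = synchronized {
        claims.get(eid) match
            case Some(c) if c.owner == executor && c.expiresAt > now =>
                claims = claims.updated(eid, c.copy(expiresAt = now + leaseMillis))
                true
            case _ => false
    }
end LeaseTable
```

With a 30-second lease, executor A claims at time 0 and renews at 10 seconds, so the lease runs until 40 seconds. If A then crashes, B's claim is refused until the lease expires and succeeds afterwards, and any late renewal from A returns false. The renew interval needs to be comfortably shorter than the lease, as in the lease = 30.seconds, renewEvery = 10.seconds configuration shown earlier.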