Parallel runners with teardown in Go
Tagged: golang, dev
From time to time in Go I have to start multiple small services acting in
parallel. For example, an SSH server tied to an administration console
(one running on port 80, the other on port 22), or a Kafka consumer pushing
to a database alongside a website serving that data. You can decouple this
into multiple programs, or run them through some kind of manager and handle
everything in one place. Both solutions have trade-offs, but we will look
at the latter today, because we will write a Go wrapper for it.
What do we want to achieve
The basis is running multiple services in parallel, handling the potential
errors, and bubbling them up to stop the program if needed. Furthermore, we'd
like to have support for teardown and interrupts.
In Go there are multiple ways to do this: one of them is channels, the other
is the sync package. During this development I found it easier to deal with
sync (you can still go with channels).
We will also take a deeper look at Go interfaces and design patterns.
If you still haven't seen this excellent talk, just watch it;
we will use some of its points.
I will also publish the code in
my snippet repo.
First draft: build the base interface
Let’s define what we will provide to the outside world as a simple interface.
import (
    "context"
)

type (
    // This interface is simple enough: it gets a context, which will be used
    // to propagate events down the pipe. We also want to handle potential
    // errors.
    Runner interface {
        Run(ctx context.Context) error
    }
    // This is more boilerplate; it will help us write runners without needing
    // structs. This is related to Tomas Senart's talk.
    RunnerFunc func(context.Context) error
)

// Run makes RunnerFunc satisfy the Runner interface by calling the function itself.
func (rf RunnerFunc) Run(ctx context.Context) error { return rf(ctx) }
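For comparison, nothing prevents you from satisfying the same interface with a plain struct; here is a minimal sketch (the noopRunner name is made up for the example):

// noopRunner is a hypothetical struct-based implementation of the Runner interface.
type noopRunner struct{}

// Run does nothing and reports no error.
func (noopRunner) Run(ctx context.Context) error { return nil }

The RunnerFunc adapter simply saves us this ceremony for one-off runners.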
Okay, let’s check what a simple implementation can look like. An example is
worth a thousand words.
import (
    "context"
    "fmt"
    "time"
)

// Miscellaneous runner, because why not.
func Misc() Runner {
    // Here we use a bit of functional programming to return a function
    // implementing our interface.
    return RunnerFunc(func(ctx context.Context) error {
        fmt.Println("Starting doing something...")
        // We now block on two different events: either our work finishes
        // normally, or we receive a done event from the context.
        select {
        // The normal workflow needs to use a channel to allow event-based
        // handling; here is a good example: https://gobyexample.com/select.
        // We just use the time package to wait a fixed duration.
        case <-time.After(20 * time.Second):
            fmt.Println("Done doing something...")
            return nil // nothing went wrong, so no need to return an error
        // Here is the event received from the context; it asks us to stop the
        // current work. We perform a teardown operation which takes a given
        // time (just sleeping here).
        case <-ctx.Done():
            fmt.Println("Tearing down...")
            time.Sleep(5 * time.Second)
            fmt.Println("Tearing down, finished...")
            // Everything went OK, so no need to return an error; if teardown
            // fails we can propagate the error.
            return nil
        }
    })
}
We can now implement the interrupt logic:
import (
    "context"
    "os"
    "os/signal"
)

func Interrupt() Runner {
    // Plug the interrupt signal into a channel, so we can easily get the event.
    // The channel is buffered because signal.Notify does not block on send:
    // with an unbuffered channel we could miss the signal.
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)
    // Build our runner.
    return RunnerFunc(func(ctx context.Context) error {
        // Block and return as soon as either event fires.
        select {
        case <-interrupt:
            return nil
        case <-ctx.Done():
            return nil
        }
    })
}
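As a side note, if you are on Go 1.16 or later, the standard library offers signal.NotifyContext, which ties a signal directly to a context. A sketch of the same runner built on it (the InterruptCtx name is just for illustration):

import (
    "context"
    "os"
    "os/signal"
)

// InterruptCtx behaves like Interrupt, but lets the standard library do the plumbing.
func InterruptCtx() Runner {
    return RunnerFunc(func(ctx context.Context) error {
        // The returned context is cancelled on the first os.Interrupt,
        // or when the parent context is cancelled.
        ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
        defer stop() // release the signal registration
        <-ctx.Done()
        return nil
    })
}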
And finally, the service consuming these runners and handling the logic.
As I said, we will use the sync package here and rely on the go keyword for
the parallelization.
import (
    "context"
    "sync"
)

type (
    // Our manager is just a slice of tasks which we want to run.
    RunnerManager []Runner
)

// Only one function is required; we block the program until all execution is done.
// We use a context as argument, so the user can provide more constraints, like
// timeouts or external cancellation
// (see: https://golang.org/pkg/context/#WithCancel or https://golang.org/pkg/context/#WithTimeout for ideas).
// We also return an error; for the moment it will be the first caught one, if any.
// We will improve this later.
func (rm RunnerManager) Wait(ctx context.Context) error {
    // Required variables:
    wg := sync.WaitGroup{}                 // WaitGroup keeps track of running goroutines.
    ctx, cancel := context.WithCancel(ctx) // Enhance the base context with a cancel function.
    mu := sync.Mutex{}                     // Protects the errors slice from concurrent appends.
    errors := []error{}                    // We need to store errors, as multiple can come.
    // We loop over all the tasks we want to run.
    for _, runner := range rm {
        // Add it to our group. This has to be done before going parallel, as
        // goroutines may not have started before the end of this function
        // and of main, causing the program to exit before the work starts.
        wg.Add(1)
        // Start a goroutine, collect errors, decrement the group when the task is done.
        go func(runner Runner) {
            if err := runner.Run(ctx); err != nil {
                // Guard the append with the mutex: several goroutines may fail
                // at the same time, and appending concurrently is a data race.
                mu.Lock()
                errors = append(errors, err)
                mu.Unlock()
            }
            // Task ended, decrement the WaitGroup counter.
            wg.Done()
            // Once a task is done, we want to cancel all the remaining ones.
            // This is a design choice; nothing stops you from continuing
            // execution and only cancelling when an error is caught.
            cancel()
        }(runner)
    }
    // Use the WaitGroup counter to block execution until all tasks have finished.
    wg.Wait()
    // Return the first caught error, if any.
    if len(errors) != 0 {
        return errors[0]
    }
    return nil
}
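As the comments above hint, the caller controls the whole group through the context it passes in. For instance, a minimal sketch with a timeout, reusing the runners from this post:

// Cancel every runner after 30 seconds, whatever happens first.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := (RunnerManager{Interrupt(), Misc()}).Wait(ctx); err != nil {
    fmt.Println(err)
}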
How to use it
Now, let's write our main function to see how all of this performs.
import (
    "context"
    "fmt"
)

func main() {
    // We queue our two tasks, and use a default context.
    if err := (RunnerManager{Interrupt(), Misc()}).Wait(context.Background()); err != nil {
        fmt.Println(err)
    }
}
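To see the error bubbling in action, you can throw in a runner that fails on purpose (a sketch; the Failing name is made up for the example):

import "errors"

// Failing returns a runner that errors out immediately, cancelling the others.
func Failing() Runner {
    return RunnerFunc(func(ctx context.Context) error {
        return errors.New("something went wrong")
    })
}

Queue it with RunnerManager{Interrupt(), Misc(), Failing()} and Wait should return the error, while Misc still gets a chance to tear down.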
Just play around: try to interrupt it or let it run to completion; things
should work as expected (I hope so ;)).
Going further
Better error handling
We may need to record all errors and return them.
To do so, we will define a new kind of error based on a slice.
type (
    // Simple type, just a list of errors.
    Errors []error
)

// We only print the first one, yet another choice; feel free to customize
// according to your needs. We let it panic on an empty slice, since we should
// have at least one error here.
func (e Errors) Error() string { return e[0].Error() }
// Adapt the function accordingly.
func (rm RunnerManager) Wait(ctx context.Context) error {
    wg := sync.WaitGroup{}
    ctx, cancel := context.WithCancel(ctx)
    mu := sync.Mutex{} // Still needed to guard concurrent appends.
    errors := []error{}
    for _, runner := range rm {
        wg.Add(1)
        go func(runner Runner) {
            if err := runner.Run(ctx); err != nil {
                mu.Lock()
                errors = append(errors, err)
                mu.Unlock()
            }
            wg.Done()
            cancel()
        }(runner)
    }
    wg.Wait()
    // Here is our change: we check whether we gathered any errors.
    if len(errors) != 0 {
        // We convert our slice to the new type.
        return Errors(errors)
    }
    // Otherwise we just return nil, as no error occurred.
    return nil
}
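If you would rather surface every error at once, a variant of the Error method could join all the messages instead; a sketch using only the standard library:

import "strings"

// Variant: report every collected error, separated by semicolons.
func (e Errors) Error() string {
    msgs := make([]string, len(e))
    for i, err := range e {
        msgs[i] = err.Error()
    }
    return strings.Join(msgs, "; ")
}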
Better API
We can improve the API by providing a way to deal with runners without using
complex context usage. Basically, our API allows you to run stuff in a blocking
way, and then to teardown resources. Let’s create a new interface with those
attributes, and a function in case someone does not need a struct
.
import (
    "context"
)

// Make a synchronous function asynchronous using a channel and a goroutine.
// We will use this in our next function. The channel is buffered so the
// goroutine can exit even if nobody reads the result (e.g. when the context
// fires first); otherwise it would leak.
func Async(cb func() error) <-chan error {
    out := make(chan error, 1)
    go func() { out <- cb() }()
    return out
}

// We use two functions here to create a runner. Those functions do not rely
// on any context logic, and are both blocking.
func TeardownRunnerFunc(run func() error, teardown func() error) Runner {
    return RunnerFunc(func(ctx context.Context) error {
        // We make the run function asynchronous and wait on both its return
        // and the context.
        select {
        case err := <-Async(run):
            return err
        case <-ctx.Done():
            return teardown()
        }
    })
}
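For instance, still as a sketch, you could wrap two closures directly without any struct:

runner := TeardownRunnerFunc(
    func() error { return nil /* blocking work goes here */ },
    func() error { return nil /* release resources here */ },
)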
And here the interface with a function to cast it to a runner:
// Define an interface with the two function needed.
type TeardownRunner interface {
Run() error
Teardown() error
}
// Define a function to use our new interface as a runner.
func RunnerWithTeardown(tr TeardownRunner) Runner {
return TeardownRunnerFunc(tr.Run, tr.Teardown)
}
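To make this concrete, here is a sketch of how a real service could plug into it: a hypothetical wrapper around net/http's graceful shutdown (the httpRunner name and the port below are made up for the example).

import (
    "context"
    "net/http"
)

// httpRunner adapts an *http.Server to the TeardownRunner interface.
type httpRunner struct {
    srv *http.Server
}

// Run blocks until the server stops serving. ErrServerClosed is returned
// after a graceful Shutdown, so we do not treat it as a failure.
func (h *httpRunner) Run() error {
    if err := h.srv.ListenAndServe(); err != http.ErrServerClosed {
        return err
    }
    return nil
}

// Teardown gracefully shuts the server down, draining in-flight requests.
func (h *httpRunner) Teardown() error {
    return h.srv.Shutdown(context.Background())
}

It can then run alongside the other runners: RunnerManager{Interrupt(), RunnerWithTeardown(&httpRunner{srv: &http.Server{Addr: ":8080"}})}.Wait(context.Background()).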
That's it. I tried to present the code in the simplest way I could, and to
show some ways to extend it. Do not hesitate to check out the
repo
in order to see it complete with tests.