Return 504 on concurenncy limit reached
Instead of queuing new webhooks we return now 503 errors when the queue is full.
This commit is contained in:
parent
fd9bb94c99
commit
09222fa898
|
@ -5,6 +5,7 @@ import (
|
|||
"io/ioutil"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
@ -19,12 +20,12 @@ type ConfigSettings struct {
|
|||
}
|
||||
|
||||
type Hook struct {
|
||||
Name string `yaml:"name"` // name of the hook
|
||||
Route string `yaml:"route"` // http route
|
||||
Command string `yaml:"command"` // Actual command to execute
|
||||
Background bool `yaml:"background"` // Run in background
|
||||
Concurrency int `yaml:"concurrency"` // Number of allowed concurrent runs
|
||||
concurrency chan struct{} // Concurrency guard
|
||||
Name string `yaml:"name"` // name of the hook
|
||||
Route string `yaml:"route"` // http route
|
||||
Command string `yaml:"command"` // Actual command to execute
|
||||
Background bool `yaml:"background"` // Run in background
|
||||
Concurrency int `yaml:"concurrency"` // Number of allowed concurrent runs
|
||||
concurrentRuns int32 // Number of current concurrent runs
|
||||
}
|
||||
|
||||
func (cf *Config) SetDefaults() {
|
||||
|
@ -65,16 +66,22 @@ func (cf *Config) LoadYAML(filename string) error {
|
|||
return cf.Check()
|
||||
}
|
||||
|
||||
func (hook *Hook) Init() {
|
||||
hook.concurrency = make(chan struct{}, hook.Concurrency)
|
||||
// Tries to lock a spot. Returns false, if the max. number of concurrent runs has been reached
|
||||
func (hook *Hook) TryLock() bool {
|
||||
res := int(atomic.AddInt32(&hook.concurrentRuns, 1))
|
||||
if res > hook.Concurrency {
|
||||
atomic.AddInt32(&hook.concurrentRuns, -1)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (hook *Hook) Unlock() {
|
||||
atomic.AddInt32(&hook.concurrentRuns, -1)
|
||||
}
|
||||
|
||||
// Run executes the given command and return it's return code. It also respects the given concurrency number and will block until resources are free
|
||||
func (hook *Hook) Run() error {
|
||||
// Respect concurrency
|
||||
hook.concurrency <- struct{}{}
|
||||
defer func() { <-hook.concurrency }()
|
||||
|
||||
split := strings.Split(hook.Command, " ")
|
||||
args := make([]string, 0)
|
||||
if len(split) > 1 {
|
||||
|
|
|
@ -58,10 +58,12 @@ func main() {
|
|||
|
||||
// Register hooks
|
||||
for i, hook := range cf.Hooks {
|
||||
hook.Init()
|
||||
if hook.Route == "" {
|
||||
fmt.Fprintf(os.Stderr, "Invalid hook %s: No route defined\n", hook.Name)
|
||||
}
|
||||
if hook.Concurrency < 1 {
|
||||
hook.Concurrency = 1
|
||||
}
|
||||
log.Printf("Webhook %d: '%s' [%s] \"%s\"\n", i, hook.Name, hook.Route, hook.Command)
|
||||
http.HandleFunc(hook.Route, createHandler(hook))
|
||||
}
|
||||
|
@ -75,23 +77,35 @@ func main() {
|
|||
// create a http handler function from the given hook
|
||||
func createHandler(hook Hook) Handler {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
log.Printf("GET %s %s", r.RemoteAddr, hook.Name)
|
||||
|
||||
// Check for available slots
|
||||
if hook.TryLock() == false {
|
||||
log.Printf("ERR: \"%s\" max concurrency reached", hook.Name)
|
||||
// 503 - Service Unavailable
|
||||
w.Header().Add("Retry-After", "120") // Suggest to retry after 2 minutes
|
||||
w.WriteHeader(503)
|
||||
fmt.Fprintf(w, "{\"status\":\"fail\",\"reason\":\"max concurrency reached\"}")
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("%s %s", r.RemoteAddr, hook.Name)
|
||||
if hook.Background { // Execute command in background
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprintf(w, "{\"status\":\"ok\"}")
|
||||
go func() {
|
||||
defer hook.Unlock()
|
||||
if err := hook.Run(); err != nil {
|
||||
log.Printf("Hook %s failed: %s", hook.Name, err)
|
||||
log.Printf("Hook \"%s\" failed: %s", hook.Name, err)
|
||||
} else {
|
||||
log.Printf("Hook %s completed", hook.Name)
|
||||
log.Printf("Hook \"%s\" completed", hook.Name)
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
defer hook.Unlock()
|
||||
if err := hook.Run(); err != nil {
|
||||
log.Printf("Hook %s failed: %s", hook.Name, err)
|
||||
log.Printf("ERR: \"%s\" exec failure: %s", hook.Name, err)
|
||||
w.WriteHeader(500)
|
||||
fmt.Fprintf(w, "{\"status\":\"fail\"}")
|
||||
fmt.Fprintf(w, "{\"status\":\"fail\",\"reason\":\"program error\"}")
|
||||
} else {
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprintf(w, "{\"status\":\"ok\"}")
|
||||
|
|
|
@ -11,12 +11,12 @@ hooks:
|
|||
route: "/webhooks/1"
|
||||
command: "sleep 5"
|
||||
background: True # Terminate http request immediately
|
||||
concurrency: 3 # At most 3 parallel processes are allowed
|
||||
concurrency: 2 # At most 2 parallel processes are allowed
|
||||
- name: 'hook two'
|
||||
route: "/webhooks/2"
|
||||
command: "sleep 5"
|
||||
concurrency: 2 # At most 2 parallel processes are allowed
|
||||
concurrency: 5 # At most 5 parallel processes are allowed
|
||||
- name: 'hook 3'
|
||||
route: "/webhooks/data/3"
|
||||
command: "/srv/fetch-new-data.sh"
|
||||
concurrency: 1 # No concurrency is allowed. Request blocks until resources are free
|
||||
concurrency: 1 # No concurrency is allowed. Returns
|
||||
|
|
Loading…
Reference in a new issue