Add concurrency setting
Allow concurrency limit on webhooks
This commit is contained in:
parent
f9706929d8
commit
fd9bb94c99
|
@ -1,7 +1,10 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os/exec"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
@ -16,20 +19,67 @@ type ConfigSettings struct {
|
|||
}
|
||||
|
||||
type Hook struct {
|
||||
Name string
|
||||
Route string
|
||||
Command string
|
||||
Background bool
|
||||
Name string `yaml:"name"` // name of the hook
|
||||
Route string `yaml:"route"` // http route
|
||||
Command string `yaml:"command"` // Actual command to execute
|
||||
Background bool `yaml:"background"` // Run in background
|
||||
Concurrency int `yaml:"concurrency"` // Number of allowed concurrent runs
|
||||
concurrency chan struct{} // Concurrency guard
|
||||
}
|
||||
|
||||
func (cf *Config) SetDefaults() {
|
||||
cf.Settings.BindAddress = ":2088"
|
||||
}
|
||||
|
||||
// Check performs sanity checks on the config
|
||||
func (cf *Config) Check() error {
|
||||
if cf.Settings.BindAddress == "" {
|
||||
return fmt.Errorf("no bind address configured")
|
||||
}
|
||||
for _, hook := range cf.Hooks {
|
||||
if hook.Name == "" {
|
||||
return fmt.Errorf("hook without name")
|
||||
}
|
||||
if hook.Route == "" {
|
||||
return fmt.Errorf("hook %s with no route", hook.Name)
|
||||
}
|
||||
if hook.Command == "" {
|
||||
return fmt.Errorf("hook %s with no command", hook.Name)
|
||||
}
|
||||
if hook.Concurrency < 1 {
|
||||
hook.Concurrency = 1
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cf *Config) LoadYAML(filename string) error {
|
||||
content, err := ioutil.ReadFile(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return yaml.Unmarshal(content, cf)
|
||||
if err := yaml.Unmarshal(content, cf); err != nil {
|
||||
return err
|
||||
}
|
||||
return cf.Check()
|
||||
}
|
||||
|
||||
func (hook *Hook) Init() {
|
||||
hook.concurrency = make(chan struct{}, hook.Concurrency)
|
||||
}
|
||||
|
||||
// Run executes the given command and return it's return code. It also respects the given concurrency number and will block until resources are free
|
||||
func (hook *Hook) Run() error {
|
||||
// Respect concurrency
|
||||
hook.concurrency <- struct{}{}
|
||||
defer func() { <-hook.concurrency }()
|
||||
|
||||
split := strings.Split(hook.Command, " ")
|
||||
args := make([]string, 0)
|
||||
if len(split) > 1 {
|
||||
args = split[1:]
|
||||
}
|
||||
cmd := exec.Command(split[0], args...)
|
||||
return cmd.Run()
|
||||
}
|
||||
|
|
|
@ -8,8 +8,6 @@ import (
|
|||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var cf Config
|
||||
|
@ -60,6 +58,7 @@ func main() {
|
|||
|
||||
// Register hooks
|
||||
for i, hook := range cf.Hooks {
|
||||
hook.Init()
|
||||
if hook.Route == "" {
|
||||
fmt.Fprintf(os.Stderr, "Invalid hook %s: No route defined\n", hook.Name)
|
||||
}
|
||||
|
@ -73,33 +72,23 @@ func main() {
|
|||
log.Fatal(http.ListenAndServe(cf.Settings.BindAddress, nil))
|
||||
}
|
||||
|
||||
// Execute the given command and return it's return code
|
||||
func execute(command string) error {
|
||||
split := strings.Split(command, " ")
|
||||
args := make([]string, 0)
|
||||
if len(split) > 1 {
|
||||
args = split[1:]
|
||||
}
|
||||
cmd := exec.Command(split[0], args...)
|
||||
return cmd.Run()
|
||||
}
|
||||
|
||||
// create a http handler function from the given hook
|
||||
func createHandler(hook Hook) Handler {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
log.Printf("%s %s", r.RemoteAddr, hook.Name)
|
||||
if hook.Background { // Execute command in background
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprintf(w, "{\"status\":\"ok\"}")
|
||||
go func() {
|
||||
if err := execute(hook.Command); err != nil {
|
||||
if err := hook.Run(); err != nil {
|
||||
log.Printf("Hook %s failed: %s", hook.Name, err)
|
||||
} else {
|
||||
log.Printf("Hook %s completed", hook.Name)
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
if err := execute(hook.Command); err != nil {
|
||||
if err := hook.Run(); err != nil {
|
||||
log.Printf("Hook %s failed: %s", hook.Name, err)
|
||||
w.WriteHeader(500)
|
||||
fmt.Fprintf(w, "{\"status\":\"fail\"}")
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
## Weblug example config
|
||||
|
||||
settings:
|
||||
bind: ":2088"
|
||||
bind: ":2088" # bind address for webserver
|
||||
|
||||
# hook definition. A hook needs to define the HTTP endpoint ("route") and the
|
||||
# command that will be executed, once this route is executed
|
||||
|
@ -11,9 +11,12 @@ hooks:
|
|||
route: "/webhooks/1"
|
||||
command: "sleep 5"
|
||||
background: True # Terminate http request immediately
|
||||
concurrency: 3 # At most 3 parallel processes are allowed
|
||||
- name: 'hook two'
|
||||
route: "/webhooks/2"
|
||||
command: "sleep 2"
|
||||
command: "sleep 5"
|
||||
concurrency: 2 # At most 2 parallel processes are allowed
|
||||
- name: 'hook 3'
|
||||
route: "/webhooks/data/3"
|
||||
command: "/srv/fetch-new-data.sh"
|
||||
concurrency: 1 # No concurrency is allowed. Request blocks until resources are free
|
||||
|
|
Loading…
Reference in a new issue