From 628efd6e293d27984a3f5ba33522f8edd19d69d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B8rn=20Erik=20Pedersen?= Date: Thu, 21 Nov 2019 18:38:14 +0100 Subject: [PATCH] common/para: Add parallel task executor helper Usage of this will come later. --- common/para/para.go | 73 ++++++++++++++++++++++++++++++++++ common/para/para_test.go | 85 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+) create mode 100644 common/para/para.go create mode 100644 common/para/para_test.go diff --git a/common/para/para.go b/common/para/para.go new file mode 100644 index 000000000..319bdb78f --- /dev/null +++ b/common/para/para.go @@ -0,0 +1,73 @@ +// Copyright 2019 The Hugo Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package para implements parallel execution helpers. +package para + +import ( + "context" + + "golang.org/x/sync/errgroup" +) + +// Workers configures a task executor with the most number of tasks to be executed in parallel. +type Workers struct { + sem chan struct{} +} + +// Runner wraps the lifecycle methods of a new task set. +// +// Run wil block until a worker is available or the context is cancelled, +// and then run the given func in a new goroutine. +// Wait will wait for all the running goroutines to finish. +type Runner interface { + Run(func() error) + Wait() error +} + +type errGroupRunner struct { + *errgroup.Group + w *Workers + ctx context.Context +} + +func (g *errGroupRunner) Run(fn func() error) { + select { + case g.w.sem <- struct{}{}: + case <-g.ctx.Done(): + return + } + + g.Go(func() error { + err := fn() + <-g.w.sem + return err + }) +} + +// New creates a new Workers with the given number of workers. +func New(numWorkers int) *Workers { + return &Workers{ + sem: make(chan struct{}, numWorkers), + } +} + +// Start starts a new Runner. +func (w *Workers) Start(ctx context.Context) (Runner, context.Context) { + g, ctx := errgroup.WithContext(ctx) + return &errGroupRunner{ + Group: g, + ctx: ctx, + w: w, + }, ctx +} diff --git a/common/para/para_test.go b/common/para/para_test.go new file mode 100644 index 000000000..9f33a234c --- /dev/null +++ b/common/para/para_test.go @@ -0,0 +1,85 @@ +// Copyright 2019 The Hugo Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package para + +import ( + "context" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + qt "github.com/frankban/quicktest" +) + +func TestPara(t *testing.T) { + + c := qt.New(t) + + c.Run("Order", func(c *qt.C) { + n := 500 + ints := make([]int, n) + for i := 0; i < n; i++ { + ints[i] = i + } + + p := New(4) + r, _ := p.Start(context.Background()) + + var result []int + var mu sync.Mutex + for i := 0; i < n; i++ { + i := i + r.Run(func() error { + mu.Lock() + defer mu.Unlock() + result = append(result, i) + return nil + }) + } + + c.Assert(r.Wait(), qt.IsNil) + c.Assert(result, qt.HasLen, len(ints)) + c.Assert(sort.IntsAreSorted(result), qt.Equals, false, qt.Commentf("Para does not seem to be parallel")) + sort.Ints(result) + c.Assert(result, qt.DeepEquals, ints) + + }) + + c.Run("Time", func(c *qt.C) { + const n = 100 + + p := New(5) + r, _ := p.Start(context.Background()) + + start := time.Now() + + var counter int64 + + for i := 0; i < n; i++ { + r.Run(func() error { + atomic.AddInt64(&counter, 1) + time.Sleep(1 * time.Millisecond) + return nil + }) + } + + c.Assert(r.Wait(), qt.IsNil) + c.Assert(counter, qt.Equals, int64(n)) + c.Assert(time.Since(start) < n/2*time.Millisecond, qt.Equals, true) + + }) + +}