Browse Source

pipeline example late commit, need to add tests

Aaron Torres 2 years ago
parent
commit
19cb706fab

+ 21 - 0
chapter9/pipeline/encode.go

@@ -0,0 +1,21 @@
+package pipeline
+
+import (
+	"context"
+	"encoding/base64"
+	"fmt"
+)
+
+// Encode takes plain text as int
+// and returns "string => <base64 string encoding>
+// as out
+func (w *Worker) Encode(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case val := <-w.in:
+			w.out <- fmt.Sprintf("%s => %s", val, base64.StdEncoding.EncodeToString([]byte(val)))
+		}
+	}
+}

+ 26 - 0
chapter9/pipeline/example/main.go

@@ -0,0 +1,26 @@
+package main
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/agtorre/go-cookbook/chapter9/pipeline"
+)
+
+func main() {
+	ctx := context.Background()
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	in, out := pipeline.NewPipeline(ctx, 10, 2)
+
+	go func() {
+		for i := 0; i < 20; i++ {
+			in <- fmt.Sprint("Message", i)
+		}
+	}()
+
+	for i := 0; i < 20; i++ {
+		<-out
+	}
+}

+ 28 - 0
chapter9/pipeline/pipeline.go

@@ -0,0 +1,28 @@
+package pipeline
+
+import "context"
+
+// NewPipeline initializes the workers and
+// connects them, it returns the input of the pipeline
+// and the final output
+func NewPipeline(ctx context.Context, numEncoders, numPrinters int) (chan string, chan string) {
+	inEncode := make(chan string, numEncoders)
+	inPrint := make(chan string, numPrinters)
+	outPrint := make(chan string, numPrinters)
+	for i := 0; i < numEncoders; i++ {
+		w := Worker{
+			in:  inEncode,
+			out: inPrint,
+		}
+		go w.Work(ctx, Encode)
+	}
+
+	for i := 0; i < numPrinters; i++ {
+		w := Worker{
+			in:  inPrint,
+			out: outPrint,
+		}
+		go w.Work(ctx, Print)
+	}
+	return inEncode, outPrint
+}

+ 20 - 0
chapter9/pipeline/print.go

@@ -0,0 +1,20 @@
+package pipeline
+
+import (
+	"context"
+	"fmt"
+)
+
+// Print prints w.in and repalys it
+// on w.out
+func (w *Worker) Print(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case val := <-w.in:
+			fmt.Println(val)
+			w.out <- val
+		}
+	}
+}

+ 35 - 0
chapter9/pipeline/worker.go

@@ -0,0 +1,35 @@
+package pipeline
+
+import "context"
+
+// Worker have one role
+// that is determined when
+// Work is called
+type Worker struct {
+	in  chan string
+	out chan string
+}
+
+// Job is a job a worker can do
+type Job string
+
+const (
+	// Print echo's all input to
+	// stdout
+	Print Job = "print"
+	// Encode base64 encodes input
+	Encode Job = "encode"
+)
+
+// Work is how to dispatch a worker, they are assigned
+// a job here
+func (w *Worker) Work(ctx context.Context, j Job) {
+	switch j {
+	case Print:
+		w.Print(ctx)
+	case Encode:
+		w.Encode(ctx)
+	default:
+		return
+	}
+}