stream源码阅读 流计算 vs 装饰设计模式


stream源码阅读 流计算 vs 装饰设计模式

地址: github.com/ghemawat/stream

装饰设计模式

https://github.com/senghoo/golang-design-pattern/tree/master/20_decorator

类似于gin的中间件。如果用普通的函数嵌套来写,调用形式是这样的: IN(INA(INB(INC(arg))))

stream的实现

很巧妙地实现了:上一个处理函数的结果,能被下一个处理函数接收到

/*
AOP设计模式: IN(INA(INB(INC(arg))))

this: IN(f1, f2, f3)

1. 先整个字符串
2. Step1函数在尾部加上step1
3. 依次类推
*/

package main

import (
	"errors"
	"fmt"
)

// Arg bundles the channels handed to a Filter when it is run.
type Arg struct {
	// In is the receive-only channel the filter reads its input from.
	In    <-chan string
	// Out is the send-only channel the filter writes its results to.
	Out   chan<- string
	dummy bool // To allow later expansion
}

// Filter is one processing stage of a pipeline.
type Filter interface {
	// RunFilter runs the stage with the channels carried by the Arg and
	// returns an error value describing the outcome.
	RunFilter(Arg) error
}

// FilterFunc is an adapter that lets an ordinary function with the right
// signature be used as a Filter.
type FilterFunc func(Arg) error

// RunFilter calls f(arg) and returns its result.
func (f FilterFunc) RunFilter(arg Arg) error { return f(arg) }

// Sequence returns a Filter that chains filters into a pipeline: the first
// filter reads arg.In, the output of each filter becomes the input of the
// next one, and the output of the last filter is forwarded to arg.Out.
//
// A single filter is returned unchanged. With no filters the returned Filter
// copies arg.In to arg.Out.
//
// The returned Filter reports the first non-nil error produced by a stage
// that has finished by the time the last stage closes its output, or nil when
// all of those stages succeeded. (It used to return a fabricated "seq" error
// on every call and silently dropped the real ones.)
func Sequence(filters ...Filter) Filter {
	if len(filters) == 1 {
		return filters[0]
	}
	return FilterFunc(func(arg Arg) error {
		// Buffered for one result per stage so reporting never blocks.
		errc := make(chan error, len(filters))

		in := arg.In
		for _, f := range filters {
			c := make(chan string, 1000)
			// f and the stage channels are passed as arguments so every
			// goroutine works on its own copies.
			go func(f Filter, stage Arg) {
				// Report the result before closing the output: whoever
				// observes the close is then guaranteed to find the error.
				errc <- f.RunFilter(stage)
				close(stage.Out)
				// Discard unread input so the upstream stage never blocks
				// on a send after this stage stopped reading.
				for range stage.In {
				}
			}(f, Arg{In: in, Out: c})
			in = c
		}

		// Forward whatever the last stage emits to the caller.
		for s := range in {
			fmt.Println("seq调试: ", s) // debug trace of forwarded items
			arg.Out <- s
		}

		// Collect the results reported so far without waiting for stages
		// that are still running (e.g. a never-ending generator).
		var first error
		for {
			select {
			case err := <-errc:
				if first == nil {
					first = err
				}
			default:
				return first
			}
		}
	})
}

// ForEach runs filter with an empty, already-closed input and calls fn, in
// order, on every string the filter emits. It returns once the filter has
// finished.
//
// The result is the error returned by filter, or nil on success. (It used to
// return a fabricated "for each" error on every call.) A nil filter or nil fn
// is rejected with an error instead of panicking.
func ForEach(filter Filter, fn func(s string)) error {
	if filter == nil || fn == nil {
		return errors.New("for each: nil filter or callback")
	}

	// The outermost filter has no upstream, so its input is closed at once.
	in := make(chan string)
	close(in)

	out := make(chan string, 1000)
	errc := make(chan error, 1)
	go func() {
		// Report the result before closing out, so the receive after the
		// loop below never blocks.
		errc <- filter.RunFilter(Arg{In: in, Out: out})
		close(out)
	}()

	for s := range out {
		fn(s)
	}
	return <-errc
}

// runFilter executes f with arg, closes arg.Out once f has returned and then
// consumes whatever is still arriving on arg.In until that channel is closed.
// The error returned by f is dropped.
func runFilter(f Filter, arg Arg) {
	_ = f.RunFilter(arg)
	close(arg.Out)
	// Throw away input the filter did not read.
	for {
		if _, more := <-arg.In; !more {
			return
		}
	}
}

// Run executes the pipeline formed by filters, discarding every string it
// produces as well as the error returned by ForEach.
func Run(filters ...Filter) {
	pipeline := Sequence(filters...)
	discard := func(string) {}
	_ = ForEach(pipeline, discard)
}


// ---------------- 自定义部分-------------------

// InitString is a Filter that emits one fixed string and ignores its input.
type InitString struct{}

// RunFilter sends the string "init string" on arg.Out and returns nil.
func (InitString) RunFilter(arg Arg) error {
	const seed = "init string"
	arg.Out <- seed
	return nil
}


// Step1 returns a Filter that appends "step1" to every string received on
// arg.In and sends the result on arg.Out. It returns nil once arg.In is closed.
func Step1() Filter {
	appendSuffix := func(arg Arg) error {
		for {
			s, more := <-arg.In
			if !more {
				return nil
			}
			arg.Out <- s + "step1"
		}
	}
	return FilterFunc(appendSuffix)
}

// Step2 returns a Filter that appends "step2" to every string received on
// arg.In and sends the result on arg.Out. It returns nil once arg.In is closed.
func Step2() Filter {
	appendSuffix := func(arg Arg) error {
		for {
			s, more := <-arg.In
			if !more {
				return nil
			}
			arg.Out <- s + "step2"
		}
	}
	return FilterFunc(appendSuffix)
}

// Console returns a Filter that prints every string received on arg.In to
// standard output, one per line. It sends nothing on arg.Out and returns nil
// once arg.In is closed.
func Console() Filter {
	printAll := func(arg Arg) error {
		for {
			line, more := <-arg.In
			if !more {
				return nil
			}
			fmt.Println(line)
		}
	}
	return FilterFunc(printAll)
}


func main() {
	Run(InitString{}, Step1(), Step2(), Console())
}

精简函数

/*
AOP设计模式: IN(INA(INB(INC(arg))))

this: IN(f1, f2, f3)

1. 先整个字符串
2. Step1函数在尾部加上step1
3. 依次类推
*/

package main

import (
	"errors"
	"fmt"
	"time"
)

// Arg bundles the channels handed to a Filter when it is run.
type Arg struct {
	// In is the receive-only channel the filter reads its input from.
	In    <-chan string
	// Out is the send-only channel the filter writes its results to.
	Out   chan<- string
	dummy bool // To allow later expansion
}

// Filter is one processing stage of a pipeline.
type Filter interface {
	// RunFilter runs the stage with the channels carried by the Arg and
	// returns an error value describing the outcome.
	RunFilter(Arg) error
}

// FilterFunc is an adapter that lets an ordinary function with the right
// signature be used as a Filter.
type FilterFunc func(Arg) error

// RunFilter calls f(arg) and returns its result.
func (f FilterFunc) RunFilter(arg Arg) error { return f(arg) }

// Sequence connects the Out of each filter to the In of the next one and
// returns the resulting pipeline as a single Filter: the first filter reads
// arg.In and the output of the last filter is forwarded to arg.Out.
//
// A single filter is returned unchanged. With no filters the returned Filter
// copies arg.In to arg.Out. Sequence prints "seq" each time it is called.
//
// The returned Filter reports the first non-nil error produced by a stage
// that has finished by the time the last stage closes its output, or nil when
// all of those stages succeeded. (It used to return a fabricated "seq" error
// immediately, without forwarding the last stage's output to arg.Out.)
func Sequence(filters ...Filter) Filter {
	fmt.Println("seq")
	if len(filters) == 1 {
		return filters[0]
	}

	return FilterFunc(func(arg Arg) error {
		// Buffered for one result per stage so reporting never blocks.
		errc := make(chan error, len(filters))

		in := arg.In
		for _, f := range filters {
			c := make(chan string, 1000)
			// f and the stage channels are passed as arguments so every
			// goroutine works on its own copies.
			go func(f Filter, stage Arg) {
				// Report the result before closing the output: whoever
				// observes the close is then guaranteed to find the error.
				errc <- f.RunFilter(stage)
				close(stage.Out)
				// Discard unread input so the upstream stage never blocks
				// on a send after this stage stopped reading.
				for range stage.In {
				}
			}(f, Arg{In: in, Out: c})
			in = c
		}

		// Forward whatever the last stage emits; this also keeps the
		// Filter running until the pipeline has finished.
		for s := range in {
			arg.Out <- s
		}

		// Collect the results reported so far without waiting for stages
		// that are still running (e.g. a never-ending generator).
		var first error
		for {
			select {
			case err := <-errc:
				if first == nil {
					first = err
				}
			default:
				return first
			}
		}
	})
}

// ForEach prints "for each", runs filter with an empty, already-closed input
// and calls fn, in order, on every string the filter emits. It returns once
// the filter has finished.
//
// The result is the error returned by filter, or nil on success. (It used to
// return a fabricated "for each" error right after starting the filter,
// never called fn, and handed the filter a nil input channel, on which a
// receive blocks forever.) A nil filter or nil fn is rejected with an error
// instead of panicking.
func ForEach(filter Filter, fn func(s string)) error {
	fmt.Println("for each")
	if filter == nil || fn == nil {
		return errors.New("for each: nil filter or callback")
	}

	// The outermost filter has no upstream, so its input is closed at once.
	in := make(chan string)
	close(in)

	out := make(chan string, 1000)
	errc := make(chan error, 1)
	go func() {
		// Report the result before closing out, so the receive after the
		// loop below never blocks.
		errc <- filter.RunFilter(Arg{In: in, Out: out})
		close(out)
	}()

	for s := range out {
		fn(s)
	}
	return <-errc
}

// runFilter executes f with arg, closes arg.Out once f has returned and then
// consumes whatever is still arriving on arg.In until that channel is closed.
// The error returned by f is dropped.
func runFilter(f Filter, arg Arg) {
	_ = f.RunFilter(arg)
	close(arg.Out)
	// Throw away input the filter did not read.
	for {
		if _, more := <-arg.In; !more {
			return
		}
	}
}

// Run executes the pipeline formed by filters, discarding every string it
// produces as well as the error returned by ForEach.
func Run(filters ...Filter) {
	pipeline := Sequence(filters...)
	discard := func(string) {}
	_ = ForEach(pipeline, discard)
}


// ---------------- 自定义部分-------------------

// InitString is a Filter that emits one fixed string and ignores its input.
type InitString struct{}

// RunFilter sends the string "init string" on arg.Out and returns nil.
func (InitString) RunFilter(arg Arg) error {
	const seed = "init string"
	arg.Out <- seed
	return nil
}


// Step1 returns a Filter that appends "step1" to every string received on
// arg.In and sends the result on arg.Out. It returns nil once arg.In is closed.
func Step1() Filter {
	appendSuffix := func(arg Arg) error {
		for {
			s, more := <-arg.In
			if !more {
				return nil
			}
			arg.Out <- s + "step1"
		}
	}
	return FilterFunc(appendSuffix)
}

// Step2 returns a Filter that appends "step2" to every string received on
// arg.In and sends the result on arg.Out. It returns nil once arg.In is closed.
func Step2() Filter {
	appendSuffix := func(arg Arg) error {
		for {
			s, more := <-arg.In
			if !more {
				return nil
			}
			arg.Out <- s + "step2"
		}
	}
	return FilterFunc(appendSuffix)
}

// Console returns a Filter that prints every string received on arg.In to
// standard output, one per line. It sends nothing on arg.Out and returns nil
// once arg.In is closed.
func Console() Filter {
	printAll := func(arg Arg) error {
		for {
			line, more := <-arg.In
			if !more {
				return nil
			}
			fmt.Println(line)
		}
	}
	return FilterFunc(printAll)
}


func main() {
	Run(InitString{}, Step1(), Step2(), Console())
	time.Sleep(time.Second)
}