Skip to content

并发模式

runner

runner 包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用 runner 包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。这个程序可能会作为 cron 作业执行,或者在基于定时任务的云环境(如 iron.io)里执行。

go
 // Gabriel Aszalos 协助完成了这个示例
 // runner 包管理处理任务的运行和生命周期
package runner

 import ( 
 "errors"
 "os"
 "os/signal"
 "time"
 ) 


 // Runner 在给定的超时时间内执行一组任务,
 // 并且在操作系统发送中断信号时结束这些任务
 type Runner struct { 
 // interrupt 通道报告从操作系统
 // 发送的信号
 interrupt chan os.Signal

 // complete 通道报告处理任务已经完成
 complete chan error

 // timeout 报告处理任务已经超时
 timeout <-chan time.Time

 // tasks 持有一组以索引顺序依次执行的
 // 函数
 tasks []func(int)
 } 

 // ErrTimeout 会在任务执行超时时返回
 var ErrTimeout = errors.New("received timeout")

 // ErrInterrupt 会在接收到操作系统的事件时返回
 var ErrInterrupt = errors.New("received interrupt")

 // New 返回一个新的准备使用的 Runner
 func New(d time.Duration) *Runner { 
 return &Runner{
 interrupt: make(chan os.Signal, 1),
 complete: make(chan error),
 timeout: time.After(d),
 } 
 } 

 // Add 将一个任务附加到 Runner 上。这个任务是一个
 // 接收一个 int 类型的 ID 作为参数的函数
 func (r *Runner) Add(tasks ...func(int)) { 
 r.tasks = append(r.tasks, tasks...)
 } 

 // Start 执行所有任务,并监视通道事件
 func (r *Runner) Start() error { 
 // 我们希望接收所有中断信号
 signal.Notify(r.interrupt, os.Interrupt)

 // 用不同的 goroutine 执行不同的任务
 go func() { 
 r.complete <- r.run()
 }()

 select { 
 // 当任务处理完成时发出的信号
 case err := <-r.complete:
 return err

 // 当任务处理程序运行超时时发出的信号
 case <-r.timeout:
 return ErrTimeout
 } 
 } 

 // run 执行每一个已注册的任务
 func (r *Runner) run() error { 
 for id, task := range r.tasks { 
 // 检测操作系统的中断信号
 if r.gotInterrupt() { 
 return ErrInterrupt
 } 

 // 执行已注册的任务
 task(id)
 } 

  return nil
 } 

 // gotInterrupt 验证是否接收到了中断信号
 func (r *Runner) gotInterrupt() bool { 
 select { 
 // 当中断事件被触发时发出的信号
 case <-r.interrupt:
 // 停止接收后续的任何信号
 signal.Stop(r.interrupt)
 return true

 // 继续正常运行
 default:
 return false
 } 
 }

代码中的程序展示了依据调度运行的无人值守的面向任务的程序,及其所使用的并发模式。在设计上,可支持以下终止点:

  • 程序可以在分配的时间内完成工作,正常终止;
  • 程序没有及时完成工作,“自杀”;
  • 接收到操作系统发送的中断事件,程序立刻试图清理状态并停止工作。

pool

pool这个包用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况(如共享数据库连接或者内存缓冲区)下非 常有用。如果goroutine需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。

go
// Fatih Arslan 和 Gabriel Aszalos 协助完成了这个示例
 // 包 pool 管理用户定义的一组资源
 package pool

 import ( 
 "errors"
 "log"
 "io"
 "sync"
 ) 

 // Pool 管理一组可以安全地在多个 goroutine 间
 // 共享的资源。被管理的资源必须
 // 实现 io.Closer 接口
 type Pool struct { 
 m sync.Mutex
 resources chan io.Closer
 factory func() (io.Closer, error)
 closed bool
 } 

 // ErrPoolClosed 表示请求(Acquire)了一个
 // 已经关闭的池
 var ErrPoolClosed = errors.New("Pool has been closed.")

 // New 创建一个用来管理资源的池。
 // 这个池需要一个可以分配新资源的函数, 
 // 并规定池的大小
 func New(fn func() (io.Closer, error), size uint) (*Pool, error) { 
 if size <= 0 { 
 return nil, errors.New("Size value too small.")
 } 

 return &Pool{
 factory: fn,
 resources: make(chan io.Closer, size),
 }, nil
 } 

 // Acquire 从池中获取一个资源
 func (p *Pool) Acquire() (io.Closer, error) { 
 select { 
 // 检查是否有空闲的资源
 case r, ok := <-p.resources:
 log.Println("Acquire:", "Shared Resource")
 if !ok { 
 return nil, ErrPoolClosed
 } 
 return r, nil

 // 因为没有空闲资源可用,所以提供一个新资源
 default:
 log.Println("Acquire:", "New Resource")
 return p.factory()
 } 
 } 

 // Release 将一个使用后的资源放回池里
 func (p *Pool) Release(r io.Closer) { 
 // 保证本操作和 Close 操作的安全
 p.m.Lock()
 defer p.m.Unlock()

 // 如果池已经被关闭,销毁这个资源
 if p.closed { 
 r.Close()
 return
 } 

 select { 
 // 试图将这个资源放入队列
 case p.resources <- r:
 log.Println("Release:", "In Queue")

 // 如果队列已满,则关闭这个资源
 default:
 log.Println("Release:", "Closing")
 r.Close()
 } 
 } 

 // Close 会让资源池停止工作,并关闭所有现有的资源
 func (p *Pool) Close() { 
 // 保证本操作与 Release 操作的安全
 p.m.Lock()
 defer p.m.Unlock()

 // 如果 pool 已经被关闭,什么也不做
 if p.closed { 
 return
 } 

 // 将池关闭
 p.closed = true

 // 在清空通道里的资源之前,将通道关闭
 // 如果不这样做,会发生死锁
 close(p.resources)

 // 关闭资源
 for r := range p.resources { 
 r.Close()
 } 
 }

现在看过了pool的代码,了解了pool是如何工作的,让我们看一下 main.go 代码文件里的测试程 序

go
 // 这个示例程序展示如何使用 pool 包
 // 来共享一组模拟的数据库连接
 package main

 import ( 
 "log"
 "io"
 "math/rand"
 "sync"
 "sync/atomic"
 "time"

 "github.com/goinaction/code/chapter7/patterns/pool"
 ) 

 const ( 
 maxGoroutines = 25 // 要使用的 goroutine 的数量
 pooledResources = 2 // 池中的资源的数量
 ) 

 // dbConnection 模拟要共享的资源
 type dbConnection struct { 
 ID int32
 } 

 // Close 实现了 io.Closer 接口,以便 dbConnection
 // 可以被池管理。Close 用来完成任意资源的
 // 释放管理
 func (dbConn *dbConnection) Close() error { 
 log.Println("Close: Connection", dbConn.ID)
 return nil
 } 

 // idCounter 用来给每个连接分配一个独一无二的 id
 var idCounter int32

 // createConnection 是一个工厂函数, 
 // 当需要一个新连接时,资源池会调用这个函数
 func createConnection() (io.Closer, error) { 
 id := atomic.AddInt32(&idCounter, 1)
 log.Println("Create: New Connection", id)

 return &dbConnection{id}, nil
 } 

 // main 是所有 Go 程序的入口
 func main() { 
 var wg sync.WaitGroup
 wg.Add(maxGoroutines)

 // 创建用来管理连接的池
 p, err := pool.New(createConnection, pooledResources)
 if err != nil { 
 log.Println(err)
 } 

 // 使用池里的连接来完成查询
 for query := 0; query < maxGoroutines; query++ { 
 // 每个 goroutine 需要自己复制一份要
 // 查询值的副本,不然所有的查询会共享
 // 同一个查询变量
 go func(q int) { 
 performQueries(q, p)
 wg.Done()
 }(query)
 } 

 // 等待 goroutine 结束
 wg.Wait()

 // 关闭池
 log.Println("Shutdown Program.")
 p.Close()
 } 

 // performQueries 用来测试连接的资源池
 func performQueries(query int, p *pool.Pool) { 
 // 从池里请求一个连接
 conn, err := p.Acquire()
 if err != nil { 
 log.Println(err)
 return
 } 

 // 将该连接释放回池里
 defer p.Release(conn) 

 // 用等待来模拟查询响应
 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
 log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
 }

work

work 包的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行 并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大 小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组 goroutine 配合执行。无缓冲的通道保证两个 goroutine 之间的数据交换。这种使用无缓冲的通道的方法允许使用 者知道什么时候 goroutine 池正在执行工作,而且如果池里的所有 goroutine 都忙,无法接受新的 工作的时候,也能及时通过通道来通知调用者。使用无缓冲的通道不会有工作在队列里丢失或者 卡住,所有工作都会被处理。

go
 // Jason Waldrip 协助完成了这个示例
 // work 包管理一个 goroutine 池来完成工作
 package work

 import "sync"

 // Worker 必须满足接口类型,
 // 才能使用工作池
 type Worker interface { 
 Task()
 } 

 // Pool 提供一个 goroutine 池,这个池可以完成
 // 任何已提交的 Worker 任务
 type Pool struct { 
 work chan Worker
 wg sync.WaitGroup
 } 

 // New 创建一个新工作池
 func New(maxGoroutines int) *Pool { 
 p := Pool{
 work: make(chan Worker),
 } 

 p.wg.Add(maxGoroutines)
 for i := 0; i < maxGoroutines; i++ { 
 go func() { 
 for w := range p.work { 
 w.Task()
 } 
 p.wg.Done()
 }()
 } 

 return &p
 } 

 // Run 提交工作到工作池
 func (p *Pool) Run(w Worker) { 
 p.work <- w
 } 


 // Shutdown 等待所有 goroutine 停止工作
 func (p *Pool) Shutdown() { 
 close(p.work) 
 p.wg.Wait()
 }

接下来让我们看一下 main.go 源代码文件中的测试程序

go
 // 这个示例程序展示如何使用 work 包
 // 创建一个 goroutine 池并完成工作
 package main

 import ( 
 "log"
 "sync"
 "time"

 "github.com/goinaction/code/chapter7/patterns/work"
 ) 

 // names 提供了一组用来显示的名字
 var names = []string{
 "steve",
 "bob",
 "mary",
 "therese",
 "jason",
 } 

 // namePrinter 使用特定方式打印名字
 type namePrinter struct { 
 name string
 } 

 // Task 实现 Worker 接口
 func (m *namePrinter) Task() { 
 log.Println(m.name)
 time.Sleep(time.Second)
 } 

 // main 是所有 Go 程序的入口
 func main() { 
 // 使用两个 goroutine 来创建工作池
 p := work.New(2)

 var wg sync.WaitGroup
 wg.Add(100 * len(names))

 for i := 0; i < 100; i++ { 
 // 迭代 names 切片
 for _, name := range names { 
 // 创建一个 namePrinter 并提供
 // 指定的名字
 np := namePrinter{
 name: name,
 } 

 go func() { 
 // 将任务提交执行。当 Run 返回时
 // 我们就知道任务已经处理完成
 p.Run(&np)
 wg.Done()
 }()
 } 
 } 

 wg.Wait()

 // 让工作池停止工作,等待所有现有的
 // 工作完成
 p.Shutdown()
 }

Released under the MIT License.