并发模式
runner
runner 包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用 runner 包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。这个程序可能会作为 cron 作业执行,或者在基于定时任务的云环境(如 iron.io)里执行。
go
// Gabriel Aszalos 协助完成了这个示例
// runner 包管理处理任务的运行和生命周期
package runner
import (
"errors"
"os"
"os/signal"
"time"
)
// Runner 在给定的超时时间内执行一组任务,
// 并且在操作系统发送中断信号时结束这些任务
type Runner struct {
// interrupt 通道报告从操作系统
// 发送的信号
interrupt chan os.Signal
// complete 通道报告处理任务已经完成
complete chan error
// timeout 报告处理任务已经超时
timeout <-chan time.Time
// tasks 持有一组以索引顺序依次执行的
// 函数
tasks []func(int)
}
// ErrTimeout 会在任务执行超时时返回
var ErrTimeout = errors.New("received timeout")
// ErrInterrupt 会在接收到操作系统的事件时返回
var ErrInterrupt = errors.New("received interrupt")
// New 返回一个新的准备使用的 Runner
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}
// Add 将一个任务附加到 Runner 上。这个任务是一个
// 接收一个 int 类型的 ID 作为参数的函数
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
// Start 执行所有任务,并监视通道事件
func (r *Runner) Start() error {
// 我们希望接收所有中断信号
signal.Notify(r.interrupt, os.Interrupt)
// 用不同的 goroutine 执行不同的任务
go func() {
r.complete <- r.run()
}()
select {
// 当任务处理完成时发出的信号
case err := <-r.complete:
return err
// 当任务处理程序运行超时时发出的信号
case <-r.timeout:
return ErrTimeout
}
}
// run 执行每一个已注册的任务
func (r *Runner) run() error {
for id, task := range r.tasks {
// 检测操作系统的中断信号
if r.gotInterrupt() {
return ErrInterrupt
}
// 执行已注册的任务
task(id)
}
return nil
}
// gotInterrupt 验证是否接收到了中断信号
func (r *Runner) gotInterrupt() bool {
select {
// 当中断事件被触发时发出的信号
case <-r.interrupt:
// 停止接收后续的任何信号
signal.Stop(r.interrupt)
return true
// 继续正常运行
default:
return false
}
}
代码中的程序展示了依据调度运行的无人值守的面向任务的程序,及其所使用的并发模式。在设计上,可支持以下终止点:
- 程序可以在分配的时间内完成工作,正常终止;
- 程序没有及时完成工作,“自杀”;
- 接收到操作系统发送的中断事件,程序立刻试图清理状态并停止工作。
pool
pool这个包用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况(如共享数据库连接或者内存缓冲区)下非 常有用。如果goroutine需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。
go
// Fatih Arslan 和 Gabriel Aszalos 协助完成了这个示例
// 包 pool 管理用户定义的一组资源
package pool
import (
"errors"
"log"
"io"
"sync"
)
// Pool 管理一组可以安全地在多个 goroutine 间
// 共享的资源。被管理的资源必须
// 实现 io.Closer 接口
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
// ErrPoolClosed 表示请求(Acquire)了一个
// 已经关闭的池
var ErrPoolClosed = errors.New("Pool has been closed.")
// New 创建一个用来管理资源的池。
// 这个池需要一个可以分配新资源的函数,
// 并规定池的大小
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("Size value too small.")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
// Acquire 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
// 检查是否有空闲的资源
case r, ok := <-p.resources:
log.Println("Acquire:", "Shared Resource")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
// 因为没有空闲资源可用,所以提供一个新资源
default:
log.Println("Acquire:", "New Resource")
return p.factory()
}
}
// Release 将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
// 保证本操作和 Close 操作的安全
p.m.Lock()
defer p.m.Unlock()
// 如果池已经被关闭,销毁这个资源
if p.closed {
r.Close()
return
}
select {
// 试图将这个资源放入队列
case p.resources <- r:
log.Println("Release:", "In Queue")
// 如果队列已满,则关闭这个资源
default:
log.Println("Release:", "Closing")
r.Close()
}
}
// Close 会让资源池停止工作,并关闭所有现有的资源
func (p *Pool) Close() {
// 保证本操作与 Release 操作的安全
p.m.Lock()
defer p.m.Unlock()
// 如果 pool 已经被关闭,什么也不做
if p.closed {
return
}
// 将池关闭
p.closed = true
// 在清空通道里的资源之前,将通道关闭
// 如果不这样做,会发生死锁
close(p.resources)
// 关闭资源
for r := range p.resources {
r.Close()
}
}
现在看过了pool的代码,了解了pool是如何工作的,让我们看一下 main.go 代码文件里的测试程 序
go
// 这个示例程序展示如何使用 pool 包
// 来共享一组模拟的数据库连接
package main
import (
"log"
"io"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/goinaction/code/chapter7/patterns/pool"
)
const (
maxGoroutines = 25 // 要使用的 goroutine 的数量
pooledResources = 2 // 池中的资源的数量
)
// dbConnection 模拟要共享的资源
type dbConnection struct {
ID int32
}
// Close 实现了 io.Closer 接口,以便 dbConnection
// 可以被池管理。Close 用来完成任意资源的
// 释放管理
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
// idCounter 用来给每个连接分配一个独一无二的 id
var idCounter int32
// createConnection 是一个工厂函数,
// 当需要一个新连接时,资源池会调用这个函数
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
// main 是所有 Go 程序的入口
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
// 创建用来管理连接的池
p, err := pool.New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
// 使用池里的连接来完成查询
for query := 0; query < maxGoroutines; query++ {
// 每个 goroutine 需要自己复制一份要
// 查询值的副本,不然所有的查询会共享
// 同一个查询变量
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
// 等待 goroutine 结束
wg.Wait()
// 关闭池
log.Println("Shutdown Program.")
p.Close()
}
// performQueries 用来测试连接的资源池
func performQueries(query int, p *pool.Pool) {
// 从池里请求一个连接
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
// 将该连接释放回池里
defer p.Release(conn)
// 用等待来模拟查询响应
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}
work
work 包的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行 并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大 小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组 goroutine 配合执行。无缓冲的通道保证两个 goroutine 之间的数据交换。这种使用无缓冲的通道的方法允许使用 者知道什么时候 goroutine 池正在执行工作,而且如果池里的所有 goroutine 都忙,无法接受新的 工作的时候,也能及时通过通道来通知调用者。使用无缓冲的通道不会有工作在队列里丢失或者 卡住,所有工作都会被处理。
go
// Jason Waldrip 协助完成了这个示例
// work 包管理一个 goroutine 池来完成工作
package work
import "sync"
// Worker 必须满足接口类型,
// 才能使用工作池
type Worker interface {
Task()
}
// Pool 提供一个 goroutine 池,这个池可以完成
// 任何已提交的 Worker 任务
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
// New 创建一个新工作池
func New(maxGoroutines int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
return &p
}
// Run 提交工作到工作池
func (p *Pool) Run(w Worker) {
p.work <- w
}
// Shutdown 等待所有 goroutine 停止工作
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
接下来让我们看一下 main.go 源代码文件中的测试程序
go
// 这个示例程序展示如何使用 work 包
// 创建一个 goroutine 池并完成工作
package main
import (
"log"
"sync"
"time"
"github.com/goinaction/code/chapter7/patterns/work"
)
// names 提供了一组用来显示的名字
var names = []string{
"steve",
"bob",
"mary",
"therese",
"jason",
}
// namePrinter 使用特定方式打印名字
type namePrinter struct {
name string
}
// Task 实现 Worker 接口
func (m *namePrinter) Task() {
log.Println(m.name)
time.Sleep(time.Second)
}
// main 是所有 Go 程序的入口
func main() {
// 使用两个 goroutine 来创建工作池
p := work.New(2)
var wg sync.WaitGroup
wg.Add(100 * len(names))
for i := 0; i < 100; i++ {
// 迭代 names 切片
for _, name := range names {
// 创建一个 namePrinter 并提供
// 指定的名字
np := namePrinter{
name: name,
}
go func() {
// 将任务提交执行。当 Run 返回时
// 我们就知道任务已经处理完成
p.Run(&np)
wg.Done()
}()
}
}
wg.Wait()
// 让工作池停止工作,等待所有现有的
// 工作完成
p.Shutdown()
}