Go 常见并发模式实现(二):通过缓冲通道实现共享资源池
今天这篇教程我们继续演示常见并发模式的 Go 语言实现 —— 通过缓冲通道(channel)实现共享资源池。
注:如果你不了解什么是通道和缓冲通道,参考这篇教程。
该资源池可用于管理任意数量的协程(goroutine)之间共享的资源(比如数据库连接),如果某个协程需要从资源池获取资源(比如从数据库连接池获取数据库连接),可以从共享资源池申请(如果没有的话需要新建),并且在使用完成后将其归还到共享资源池。
遵循这个思路,我们来编写对应的实现代码。
创建一个 pool
包,在其中新建一个 pool.go
文件,基于 Go 语言编写共享资源池实现代码如下:
package pool
import (
"errors"
"io"
"log"
"sync"
)
// 定义资源池结构体
type Pool struct {
// 通过锁机制确保资源池的并发安全
m sync.Mutex
// 通过缓冲通道管理资源池,资源池大小即缓冲值
resources chan io.Closer
// 在资源池中注册新的资源
factory func() (io.Closer, error)
// 标识资源池是否关闭
closed bool
}
var ErrPoolClosed = errors.New("资源池已关闭")
// 初始化资源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("资源池容量需要大于0")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
// 从资源池申请资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.resources:
// 资源池不为空则从中获取资源
log.Println("Acquire:", "共享资源")
if !ok {
return nil, ErrPoolClosed
}
return r, nil
default:
// 资源池为空则调用 p.factory() 方法注册新资源
log.Println("Acquire:", "新增资源")
return p.factory()
}
}
// 资源使用完成后释放
func (p *Pool) Release(r io.Closer) {
p.m.Lock()
defer p.m.Unlock()
// 资源池已关闭则支持释放资源
if p.closed {
r.Close()
return
}
// 否则将资源归还到资源池
select {
case p.resources <- r:
log.Println("Release:", "In Queue")
default:
log.Println("Release:", "Closing")
r.Close()
}
}
// 关闭资源池
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
// 释放资源池和资源池中所有资源
p.closed = true
close(p.resources)
for r := range p.resources {
r.Close()
}
}
上述代码定义了一个 Pool
结构体,用来创建共享资源池,每个资源池都可以管理任意类型的资源,只要对应的资源类型实现了 io.Closer
接口即可。其中 factory
属性是一个函数类型,需要调用方定义并传入,用于定义如何注册新资源到资源池。另外,资源池通常有容量(资源池可容纳的资源数量),这个容量也需要调用方初始化资源池时传入(我们可以通过 New
方法看到这一点),由于资源池 resources
是通道类型,因此通道的缓冲值大小即资源池容量。最后,我们还在 Pool
中定义了一个 sync.Mutex 锁,用于在对资源池进行操作时保证并发安全(同时可能有多个协程对资源池进行操作)。
接下来,在 Pool
中定义了初始化资源池、从资源池获取资源、释放资源以及关闭资源池四个方法,具体细节已经通过注释进行说明了,这里不再一一阐释。
至此,我们已经完成了通过缓冲通道实现共享资源池的代码编写,可以编写一段业务代码 db_pool.go
对其进行调用:
package main
import (
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"test/pool"
"time"
)
const (
maxGoroutines = 5
pooledResources = 2
)
type dbConnection struct {
ID int32
}
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
var idCounter int32
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(maxGoroutines)
p, err := pool.New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
for query := 0; query < maxGoroutines; query++ {
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
wg.Wait()
log.Println("Shutdown Program.")
p.Close()
}
func performQueries(query int, p *pool.Pool) {
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
defer p.Release(conn)
// 模拟 SQL 查询
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("QID[%d] CID[%d]n", query, conn.(*dbConnection).ID)
}
在这段调用代码中(主要关注 main
方法),我们演示的是一个数据库连接池,通过 sync.WaitGroup 将最大协程数设置为 5,在初始化共享资源池时,将资源池的大小设置为 2,Pool.factory
属性对应的是这里的创建数据库连接方法 createConnection
,该方法会返回一个数据库连接对象作为资源注册到资源池(数据库连接对象 dbConnection
是一个模拟的伪数据库连接,通过定义 Close
方法实现了 io.Closer
接口)。
接下来,我们通过多个协程(goroutine)并发调用 performQueries
方法执行数据库查询(依然是伪实现),在这个方法中,包含了从资源池申请资源,以及查询完成后将对应资源归还
给资源池的操作。这样一来,我们就可以模拟这篇教程开头设想的场景:多个协程共享资源池中的资源。
执行这段代码,输出结果如下:
由于这里协程数量较少,尚未等到资源被释放回资源池,就已经完成所有资源获取工作,所以所有资源都是通过调用 pool.factory()
对应方法新建的,如果调大协程数量(maxGoroutines
常量值),降低 SQL 查询时间数量级(10ms-100ms),则输出日志里就会出现通过资源池获取共享资源的情况(Acquire: Shared Resource
),感兴趣的同学可以去试试看。
附:本篇教程示例代码结构和上篇教程类似:
--go (项目根目录 ~/Development/go)
|--src
|--test
|--pool
|--pool.go
|--db_pool.go
|--go.mod
(全文完)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 动态在线扩容root根分区大小的方法详解
- centos7使用rpm安装mysql5.7的教程图解
- 关于Linux命令行下的数学运算示例详解
- Apache访问日志的配置与使用
- linux启动和重启nginx方法
- 简单谈谈centos7中配置php
- Linux动态链接库的使用
- Linux下部署springboot项目的方法步骤
- Linux、CentOS下安装zip与unzip指令功能(服务器)
- Linux上查看用户创建日期的几种方法总结
- 详解基于Linux的LVM无缝磁盘水平扩容
- CentOS平台实现搭建rsync远程同步服务器的方法
- CentOS 8设置自动更新的完整步骤
- linux 网络编程 socket选项的实现
- Ubuntu16.04安装Jenkins的方法图文详解