并发队列模式允许一组 Goroutines 计算多个不同的任务。实现它有几种方法。其中一些方法甚至不需要使用队列数据结构。
本文介绍了如何在 Golang 中编写一个线程安全的队列并使用它来并发计算不同的任务。
让我们首先说明什么是队列。队列是一个可以插入或从队列中移除对象的集合。这是通过使用先进先出的方法完成的:
每次我们把东西放入队列时,都会在队列的末尾添加一个新的元素(入队)。另一方面,每次我们移除一个元素时,我们移除的是最早放入队列的那个元素(出队)。请注意,我说的“元素”而不是像图中的“任务”,队列是非常通用的。我们可以在其中存储我们想要的任何东西,重要的是它在插入和移除这些元素时是高效的,并且具有O(1)的时间复杂度。
在我们的案例中,我们将使用队列来存储“任务”。任务可以是任何进程、线程或程序应该做的事情。可以是计算平方根、发起和处理一个HTTP请求、在数据库中搜索和编辑某些东西,等等。我们可以使用任何数据结构来表示一个任务。
现在我们来看一个具体的问题,即如何使用并发计算1到10之间所有数字的n*(n+1)/2的值。如果这是情况,任务可以简单地定义为:
type Task int
这里的整数n是我们需要计算结果值的输入,即,“任务”只是输入值,点。现在我们可以定义我们的队列来存储我们需要执行的所有任务:
type Queue struct {
tasks []Task
}
func (queue *Queue) Dequeue() Task {
// -1 will tell us that the queue is empty when we try to dequeue it
if len(queue.tasks) == 0 {
return -1
}
item := queue.tasks[0]
queue.tasks = queue.tasks[1:]
return item
}
func (queue *Queue) Enqueue(task Task) {
queue.tasks = append(queue.tasks, task)
}
我们来这样使用它:
queue := Queue{}
queue.Enqueue(2)
val := queue.Dequeue() // val == 2
现在我们需要启动多个 Goroutines 来并发地解决这些任务。我预计,这里的最大问题将是如何处理并发的入队和出队操作。这意味着,如果两个或更多的 Goroutines 同时尝试出队,那么它们中的任何一个都应该得到相同的任务作为结果。数据应该是完全一致的。
我们可以这样启动 Goroutines。它们将只是一直在循环,等待队列中有值:
func main() {
queue := Queue{}
// we launch 3 goroutines to be ready to perform or computations
workers := 3
for i := 0; i < workers; i++ {
go func(workerId int) {
for {
task := queue.Dequeue()
if task < 0 {
break
}
result := int(task * (task + 1) / 2) // n*(n+1)/2
}
}(i)
}
}
希望我们用一些大的任务填充队列,以便让 Goroutines 高效地处理这些“密集型”任务。让我们使用 “main”Goroutine 向队列中入队一些任务。
func main() {
// launch goroutines
// ...
// enqueue tasks from the main goroutine
for i := 0; i < 10; i++ {
queue.Enqueue(Task(i))
}
}
现在我们需要向队列添加一些同步机制,以便在多个 Goroutines 读取和修改队列的并发执行中线程安全。为了解决这个问题,我们将向 Queue 结构体添加一个来自 sync Go 包的 Mutex。这将允许 Goroutines 调用 Lock 和 Unlock 方法,以便在没有竞态条件的情况下并发访问队列。
type Queue struct {
tasks []Task
mu sync.Mutex
}
func (queue *Queue) Dequeue() Task {
queue.mu.Lock()
defer queue.mu.Unlock()
if len(queue.tasks) == 0 {
return -1
}
item := queue.tasks[0]
queue.tasks = queue.tasks[1:]
return item
}
func (queue *Queue) Enqueue(task Task) {
queue.mu.Lock()
defer queue.mu.Unlock()
queue.tasks = append(queue.tasks, task)
}
锁定队列结构体。这意味着一旦一个 Goroutine 调用了这个函数,其他 Goroutines 将会等待,直到Unlock()函数被调用。这由第一个 Goroutine 在函数调用的末尾执行,注意关键字 defer,它表示“在函数作用域结束时执行”。
现在我们需要额外地在一个位置存储结果,为了更好地说明正在发生的事情以及哪个 Goroutine 计算了每个输出,我们可以创建这个结构体来存储结果:
type Result struct {
workerId int // who computed the result
input Task // which input did it receive
output int // which output did it compute
}
为了获取结果并打印,我们将使用 Go 推荐的方法是:通道(channels)。我们首先定义一个通道,然后每个 Goroutine 将结果写入该通道。同样地,在主函数中,我们创建了一个带有“select”语句的无限循环,以便在结果准备好时立即从通道中捕获结果。我们在3秒后停止执行。
func main() {
fmt.Println("Launching goroutines")
queue := Queue{}
results := make(chan Result)
workers := 3
for i := 0; i < workers; i++ {
go func(workerId int) {
for {
task := queue.Dequeue()
if task < 0 {
break
}
result := int(task * (task + 1) / 2) // n*(n+1)/2
results <- result // the goroutine write the result in the channel
}
}(i)
}
for i := 0; i < 10; i++ {
queue.Enqueue(Task(i))
}
for {
select {
case result := <-results: // the main goroutine reads the channel
fmt.Printf("Result worker: %d input: %d output: %d\n", result.workerId, result.input, result.output)
case <- time.After(time.Duration(3 * time.Second)):
fmt.Println("Finishing execution")
return
}
}
}
上面程序的执行结果是:
$ go run main.go
Launching goroutines
Result worker: 1 input: 2 output: 3
Result worker: 2 input: 0 output: 0
Result worker: 0 input: 1 output: 1
Result worker: 1 input: 3 output: 6
Result worker: 2 input: 5 output: 15
Result worker: 0 input: 4 output: 10
Result worker: 0 input: 8 output: 36
Result worker: 2 input: 7 output: 28
Result worker: 1 input: 6 output: 21
Result worker: 0 input: 9 output: 45
Finishing execution
Was this helpful?
0 / 0