Golang 并发 队列模式

并发队列模式允许一组 Goroutines 计算多个不同的任务。实现它有几种方法。其中一些方法甚至不需要使用队列数据结构。

本文介绍了如何在 Golang 中编写一个线程安全的队列并使用它来并发计算不同的任务。

让我们首先说明什么是队列。队列是一个可以插入或从队列中移除对象的集合。这是通过使用先进先出的方法完成的:

图片

每次我们把东西放入队列时,都会在队列的末尾添加一个新的元素(入队)。另一方面,每次我们移除一个元素时,我们移除的是最早放入队列的那个元素(出队)。请注意,我说的“元素”而不是像图中的“任务”,队列是非常通用的。我们可以在其中存储我们想要的任何东西,重要的是它在插入和移除这些元素时是高效的,并且具有O(1)的时间复杂度。

在我们的案例中,我们将使用队列来存储“任务”。任务可以是任何进程、线程或程序应该做的事情。可以是计算平方根、发起和处理一个HTTP请求、在数据库中搜索和编辑某些东西,等等。我们可以使用任何数据结构来表示一个任务。

现在我们来看一个具体的问题,即如何使用并发计算1到10之间所有数字的n*(n+1)/2的值。如果这是情况,任务可以简单地定义为:

type Task int

这里的整数n是我们需要计算结果值的输入,即,“任务”只是输入值,点。现在我们可以定义我们的队列来存储我们需要执行的所有任务:


type Queue struct {
  tasks []Task
}

func (queue *Queue) Dequeue() Task {
  // -1 will tell us that the queue is empty when we try to dequeue it 
  if len(queue.tasks) == 0 {
   return -1
  }

  item := queue.tasks[0]
  queue.tasks = queue.tasks[1:]
  return item
}

func (queue *Queue) Enqueue(task Task) {
  queue.tasks = append(queue.tasks, task)
}

我们来这样使用它:

queue := Queue{}
queue.Enqueue(2)
val := queue.Dequeue() // val == 2

现在我们需要启动多个 Goroutines 来并发地解决这些任务。我预计,这里的最大问题将是如何处理并发的入队和出队操作。这意味着,如果两个或更多的 Goroutines 同时尝试出队,那么它们中的任何一个都应该得到相同的任务作为结果。数据应该是完全一致的。

我们可以这样启动 Goroutines。它们将只是一直在循环,等待队列中有值:


func main() {
  queue := Queue{}

  // we launch 3 goroutines to be ready to perform or computations
  workers := 3
  for i := 0; i < workers; i++ {
    go func(workerId int) {
      for {
        task := queue.Dequeue()
        if task < 0 {
          break
        }
        result := int(task * (task + 1) / 2) // n*(n+1)/2
      }
    }(i)
  }
}

希望我们用一些大的任务填充队列,以便让 Goroutines 高效地处理这些“密集型”任务。让我们使用 “main”Goroutine 向队列中入队一些任务。

func main() {
  // launch goroutines
  // ...

  // enqueue tasks from the main goroutine
  for i := 0; i < 10; i++ {
    queue.Enqueue(Task(i))
  }
}

现在我们需要向队列添加一些同步机制,以便在多个 Goroutines 读取和修改队列的并发执行中线程安全。为了解决这个问题,我们将向 Queue 结构体添加一个来自 sync Go 包的 Mutex。这将允许 Goroutines 调用 Lock 和 Unlock 方法,以便在没有竞态条件的情况下并发访问队列。


type Queue struct {
  tasks []Task
  mu sync.Mutex
}

func (queue *Queue) Dequeue() Task {
  queue.mu.Lock()
  defer queue.mu.Unlock()
  if len(queue.tasks) == 0 {
   return -1
  }

  item := queue.tasks[0]
  queue.tasks = queue.tasks[1:]
  return item
}

func (queue *Queue) Enqueue(task Task) {
  queue.mu.Lock()
  defer queue.mu.Unlock()
  queue.tasks = append(queue.tasks, task)
}

锁定队列结构体。这意味着一旦一个 Goroutine 调用了这个函数,其他 Goroutines 将会等待,直到Unlock()函数被调用。这由第一个 Goroutine 在函数调用的末尾执行,注意关键字 defer,它表示“在函数作用域结束时执行”。

现在我们需要额外地在一个位置存储结果,为了更好地说明正在发生的事情以及哪个 Goroutine 计算了每个输出,我们可以创建这个结构体来存储结果:

type Result struct {
  workerId int // who computed the result
  input Task   // which input did it receive
  output int   // which output did it compute
}

为了获取结果并打印,我们将使用 Go 推荐的方法是:通道(channels)。我们首先定义一个通道,然后每个 Goroutine 将结果写入该通道。同样地,在主函数中,我们创建了一个带有“select”语句的无限循环,以便在结果准备好时立即从通道中捕获结果。我们在3秒后停止执行。


func main() {
  fmt.Println("Launching goroutines")

  queue := Queue{}
  results := make(chan Result)

  workers := 3
  for i := 0; i < workers; i++ {
    go func(workerId int) {
      for {
        task := queue.Dequeue()
          if task < 0 {
            break
          }
          result := int(task * (task + 1) / 2) // n*(n+1)/2
          results <- result // the goroutine write the result in the channel
       }
     }(i)
  }

  for i := 0; i < 10; i++ {
    queue.Enqueue(Task(i))
  }

  for {
    select {
      case result := <-results: // the main goroutine reads the channel
        fmt.Printf("Result worker: %d input: %d output: %d\n", result.workerId, result.input, result.output)
      case <- time.After(time.Duration(3 * time.Second)):
        fmt.Println("Finishing execution")
        return
    }
  }
}

上面程序的执行结果是:

$ go run main.go 
Launching goroutines
Result worker: 1 input: 2 output: 3
Result worker: 2 input: 0 output: 0
Result worker: 0 input: 1 output: 1
Result worker: 1 input: 3 output: 6
Result worker: 2 input: 5 output: 15
Result worker: 0 input: 4 output: 10
Result worker: 0 input: 8 output: 36
Result worker: 2 input: 7 output: 28
Result worker: 1 input: 6 output: 21
Result worker: 0 input: 9 output: 45
Finishing execution

Was this helpful?

0 / 0

发表回复 0