golang使用redis queue来实现一个发布者插入消息,另一个消费者一旦有数据就读取的程序

在 Go 语言中使用 Redis 作为消息队列来实现发布者-订阅者模式,可以利用 Redis 的 LPUSHBRPOP 命令。LPUSH 用于向列表左侧插入元素,而 BRPOP 则是从列表右侧阻塞地弹出一个元素。如果列表为空,则 BRPOP 会等待直到有新的元素被添加到列表中。

下面是一个简单的例子,展示如何用 Go 语言结合 Redis 实现这样的功能。首先确保你的环境中已经安装了 Go 语言环境和 Redis 服务,并且安装了 Go 语言的 Redis 客户端库 go-redis/redis

  1. 使用 go get 命令安装 go-redis/redis 库:
   go get -u github.com/go-redis/redis/v8
  1. 创建发布者(producer)程序:
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

var ctx = context.Background()

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379", // Redis服务器地址
    })

    for i := 0; i < 10; i++ {
        msg := fmt.Sprintf("Message %d", i)
        err := rdb.LPush(ctx, "myqueue", msg).Err()
        if err != nil {
            panic(err)
        }
        fmt.Println("Published:", msg)
        time.Sleep(500 * time.Millisecond) // 模拟延时
    }
}
  1. 创建消费者(consumer)程序:
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

var ctx = context.Background()

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379", // Redis服务器地址
    })

    for {
        val, err := rdb.BRPop(ctx, 0, "myqueue").Result()
        if err != nil {
            fmt.Println("Error:", err)
            time.Sleep(1 * time.Second) // 如果发生错误,稍后再试
            continue
        }
        fmt.Printf("Consumed: %s -> %s\n", val[0], val[1])
    }
}

在这两个示例中,发布者将一系列的消息推送到名为 myqueue 的 Redis 列表中,而消费者则从这个列表中取出消息。注意这里的 BRPop 方法有一个超时参数设置为 0,意味着如果没有消息的话它会一直阻塞直到有新消息到达。

确保你运行这两个程序之前启动了 Redis 服务器,并且它们能够连接到该服务器。你可以分别在不同的终端窗口运行发布者和消费者程序,以观察消息是如何通过 Redis 队列进行传递的。

BRPOP 命令返回的是一个数组,因为它的设计是为了能够从多个列表中取出元素。当你调用 BRPOP 时,你可以指定多个键(即多个列表),它会阻塞并等待这些列表中的任意一个有数据可取。一旦某个列表中有数据可用,BRPOP 就会移除并返回这个列表的最后一个元素。

返回值是一个包含两个元素的数组:

  1. 第一个元素是被弹出元素所属的列表的键名。
  2. 第二个元素是从该列表中弹出的实际元素值。

这样的设计允许你同时监控多个队列,并且知道是从哪个队列获取的数据。即使你只提供了一个键名给 BRPOP,返回值仍然会以这种形式给出,这是为了保持接口的一致性。

在你的例子中,因为你只提供了单个键 “myqueue” 给 BRPOP,所以返回的数组的第一个元素总是 "myqueue",而第二个元素是你实际关心的消息内容。

例如,如果 myqueue 列表中有元素 "Hello, world!",那么 BRPOP 返回的结果将是 ["myqueue", "Hello, world!"]

如果你确定只会使用一个队列,可以忽略第一个元素,直接处理第二个元素即可。代码示例中就是这么做的:

val, err := rdb.BRPop(ctx, 0, "myqueue").Result()
if err != nil {
    fmt.Println("Error:", err)
    time.Sleep(1 * time.Second) // 如果发生错误,稍后再试
    continue
}
// val[0] 是 "myqueue"
// val[1] 是实际的消息内容
fmt.Printf("Consumed: %s -> %s\n", val[0], val[1])

这里 val[1] 就是你想要消费的消息内容。

Was this helpful?

0 / 0

发表回复 0