// Package mq holds the shared RabbitMQ connection and a retrying queue
// consumer built on top of it.
package mq

import (
	"fmt"
	"log"
	"recook/configs"
	"time"

	"github.com/streadway/amqp"
)

// rabbitMq embeds an AMQP connection so consumer helpers can be attached to it.
type rabbitMq struct {
	*amqp.Connection
}

var (
	// Conn is the package-wide connection wrapper assigned by SetUpMq.
	Conn *rabbitMq
)

// SetUpMq dials the broker using the JCook MQ settings from the configs
// package and stores the result in Conn.
//
// A dial failure is only logged. Conn is still assigned (wrapping a nil
// connection) so that method calls on Conn remain valid; ListenRetry checks
// for the missing connection before using it.
func SetUpMq() {
	uri := fmt.Sprintf("amqp://%s:%s@%s:%s",
		configs.ConfigJCookMqUser,
		configs.ConfigJCookMqPwd,
		configs.ConfigJCookMqHost,
		configs.ConfigJCookMqPort,
	)
	conn, err := amqp.Dial(uri)
	if err != nil {
		log.Println(err)
	}
	Conn = &rabbitMq{conn}
}

// ListenRetry consumes messages from queue on a fresh channel of the global
// Conn and passes each message body to handler. It blocks until the channel
// is closed or setup fails.
//
// A message is acked when handler returns nil and nacked with requeue when it
// returns an error. When the channel is closed with an error, the method waits
// 10 seconds and starts a new listener in its own goroutine with count-1
// attempts left; it stops once count-1 reaches zero. A graceful close (no
// error reported) stops the listener without retrying.
//
// The channel is opened on the package-level Conn rather than on the receiver,
// so a retry uses whatever connection Conn holds at that time.
func (o rabbitMq) ListenRetry(queue string, count int, handler func([]byte) error) {
	log.Printf("start listening ... %s\n", queue)

	// SetUpMq stores a wrapper even when dialing failed; opening a channel on
	// a nil connection would panic.
	if Conn == nil || Conn.Connection == nil {
		log.Printf("%s:%s\n", queue, "mq connection is not established")
		return
	}

	ch, err := Conn.Channel()
	if err != nil {
		log.Printf("%s:%s\n", queue, err.Error())
		return
	}
	defer ch.Close()

	// The library owns this channel and closes it on shutdown, so it must not
	// be closed here. The buffer of one keeps the library's send from blocking.
	notifyClose := ch.NotifyClose(make(chan *amqp.Error, 1))

	// Set the prefetch limit before consuming so it is already in force when
	// the first deliveries arrive. A failure is logged but not fatal.
	if err = ch.Qos(30, 0, true); err != nil {
		log.Printf("%s:%s\n", queue, err.Error())
	}

	d, err := ch.Consume(queue, queue, false, false, false, false, amqp.Table{})
	if err != nil {
		log.Printf("%s:%s\n", queue, err.Error())
		return
	}

	for {
		select {
		case e, ok := <-notifyClose:
			if !ok || e == nil {
				// Closed without an error: a deliberate shutdown, so stop.
				log.Printf("%s:%s\n", queue, "channel closed gracefully, stop listening")
				return
			}
			num := count - 1
			if num <= 0 {
				return
			}
			log.Printf("chan通道错误, e: %s. 还有%d重试机会\n", e.Error(), num)
			time.Sleep(10 * time.Second)
			go o.ListenRetry(queue, num, handler)
			return
		case v, ok := <-d:
			if !ok {
				// The delivery channel is closed together with the AMQP
				// channel. A nil channel blocks forever in select, which
				// leaves only the close notification to wait for.
				d = nil
				continue
			}
			if err = handler(v.Body); err != nil {
				log.Printf("%s:%s\n", queue, err.Error())
				if nackErr := v.Nack(false, true); nackErr != nil {
					log.Printf("%s:%s\n", queue, nackErr.Error())
				}
				continue
			}
			if ackErr := v.Ack(false); ackErr != nil {
				log.Printf("%s:%s\n", queue, ackErr.Error())
			}
		}
	}
}