// Package mq opens the application's AMQP connections and provides a queue
// consumer that re-subscribes after its channel is closed.
package mq

import (
	"fmt"
	"log"
	"runtime"
	"time"

	"recook/configs"

	"github.com/streadway/amqp"
)

// rabbitMq wraps an AMQP connection so consumer helpers can be attached to it.
type rabbitMq struct {
	*amqp.Connection
}

var (
	// Conn is the connection created by SetUpMq.
	Conn *rabbitMq
	// Conn1 is the connection created by SetUpShaMa.
	Conn1 *rabbitMq
)

// SetUpMq dials the broker described by the ConfigJCookMq* settings and
// stores the result in Conn. A dial failure is logged, not returned.
func SetUpMq() {
	Conn = dialRabbitMq(fmt.Sprintf("amqp://%s:%s@%s:%s",
		configs.ConfigJCookMqUser,
		configs.ConfigJCookMqPwd,
		configs.ConfigJCookMqHost,
		configs.ConfigJCookMqPort,
	))
}

// SetUpShaMa dials the broker described by the ConfigShaMaMq* settings and
// stores the result in Conn1. A dial failure is logged, not returned.
func SetUpShaMa() {
	Conn1 = dialRabbitMq(fmt.Sprintf("amqp://%s:%s@%s:%s",
		configs.ConfigShaMaMqUser,
		configs.ConfigShaMaMqPwd,
		configs.ConfigShaMaMqHost,
		configs.ConfigShaMaMqPort,
	))
}

// dialRabbitMq connects to uri and wraps the resulting connection. On failure
// the error is logged and the wrapper is still returned (holding whatever
// amqp.Dial handed back), so the package-level variables are never nil;
// ListenRetry refuses to run on a wrapper without a connection.
func dialRabbitMq(uri string) *rabbitMq {
	conn, err := amqp.Dial(uri)
	if err != nil {
		log.Println(err)
	}
	return &rabbitMq{conn}
}

// ListenRetry consumes queue on a new channel and passes every message body
// to handler. A message is acked when handler returns nil and nacked with
// requeue otherwise. The call blocks until the channel or the delivery stream
// ends; it then waits and starts a replacement listener in a new goroutine.
//
// count is the number of restarts so far. It is incremented on every restart
// and only used for logging; nothing here bounds the number of restarts. The
// chain of restarts ends when a channel can no longer be opened or the
// consumer cannot be registered.
//
// A panic raised while listening (including one from handler) is recovered
// and printed, and ends this listener without a restart.
func (o rabbitMq) ListenRetry(queue string, count int, handler func([]byte) error) {
	const prefetchCount = 30

	log.Printf("start listening ... %s\n", queue)

	// A failed dial leaves the embedded connection nil; calling Channel on it
	// would panic, so bail out with a log line instead.
	if o.Connection == nil {
		log.Printf("%s:%s\n", queue, "amqp connection is not established")
		return
	}

	defer func() {
		r := recover()
		if r == nil {
			return
		}
		fmt.Println(r)
		switch r.(type) {
		case runtime.Error, string:
			fmt.Printf("recovered (runtime.Error) panic:%s\n", r)
		}
	}()

	ch, err := o.Channel()
	if err != nil {
		log.Printf("%s:%s\n", queue, err.Error())
		return
	}
	defer ch.Close()

	// The amqp library owns this channel from here on: it sends at most one
	// error and then closes it itself, so it must never be closed locally.
	notifyClose := ch.NotifyClose(make(chan *amqp.Error, 1))

	// Set the prefetch limit before subscribing so it is already in force
	// when the first delivery arrives. A failure is logged; if it closed the
	// channel, Consume below reports that.
	if err := ch.Qos(prefetchCount, 0, true); err != nil {
		log.Printf("%s:%s\n", queue, err.Error())
	}

	deliveries, err := ch.Consume(queue, queue, false, false, false, false, amqp.Table{})
	if err != nil {
		log.Printf("%s:%s\n", queue, err.Error())
		return
	}

	for {
		select {
		case e := <-notifyClose:
			// e is nil when the channel was closed without an error.
			o.restartListener(queue, count, e, handler)
			return

		case msg, ok := <-deliveries:
			if !ok {
				// The delivery stream ended. Pick up the close error if one
				// is already queued, then re-subscribe on a new channel.
				var e *amqp.Error
				select {
				case e = <-notifyClose:
				default:
				}
				o.restartListener(queue, count, e, handler)
				return
			}

			if err := handler(msg.Body); err != nil {
				log.Printf("%s:%s\n", queue, err.Error())
				if nackErr := msg.Nack(false, true); nackErr != nil {
					log.Printf("%s:%s\n", queue, nackErr.Error())
				}
				continue
			}
			if ackErr := msg.Ack(false); ackErr != nil {
				log.Printf("%s:%s\n", queue, ackErr.Error())
			}
		}
	}
}

// restartListener logs why the current listener stopped, waits, and starts a
// replacement ListenRetry goroutine with the retry counter incremented.
// e may be nil when the channel went away without an error.
func (o rabbitMq) restartListener(queue string, count int, e *amqp.Error, handler func([]byte) error) {
	const (
		logDelay     = 5 * time.Second
		restartDelay = 10 * time.Second
	)

	reason := "channel closed without error"
	if e != nil {
		reason = e.Error()
	}

	time.Sleep(logDelay)
	count++
	// Message (Chinese): "channel error, e: <reason>. retry number <count>".
	log.Printf("chan通道错误, e: %s. 还有第%d次重试\n", reason, count)

	time.Sleep(restartDelay)
	go o.ListenRetry(queue, count, handler)
}