You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

87 lines
1.7 KiB

4 years ago
package mq
import (
"fmt"
"log"
"recook/configs"
"time"
4 years ago
"github.com/streadway/amqp"
4 years ago
)
type rabbitMq struct {
*amqp.Connection
}
var (
4 years ago
Conn *rabbitMq
4 years ago
Conn1 *rabbitMq
4 years ago
)
func SetUpMq() {
uri := fmt.Sprintf("amqp://%s:%s@%s:%s",
configs.ConfigJCookMqUser,
configs.ConfigJCookMqPwd,
configs.ConfigJCookMqHost,
configs.ConfigJCookMqPort,
)
conn, err := amqp.Dial(uri)
if err != nil {
log.Println(err)
}
Conn = &rabbitMq{conn}
4 years ago
}
4 years ago
4 years ago
func SetUpShaMa() {
4 years ago
uri1 := fmt.Sprintf("amqp://%s:%s@%s:%s",
configs.ConfigShaMaMqUser,
configs.ConfigShaMaMqPwd,
configs.ConfigShaMaMqHost,
configs.ConfigShaMaMqPort,
)
conn1, err := amqp.Dial(uri1)
if err != nil {
log.Println(err)
}
Conn1 = &rabbitMq{conn1}
4 years ago
}
func (o rabbitMq) ListenRetry(queue string, count int, handler func([]byte) error) {
log.Printf("start listening ... %s\n", queue)
4 years ago
ch, err := o.Channel()
4 years ago
if err != nil {
log.Printf("%s:%s\n", queue, err.Error())
return
}
defer ch.Close()
closeChan := make(chan *amqp.Error, 1)
notifyClose := ch.NotifyClose(closeChan)
d, err := ch.Consume(queue, queue, false, false, false, false, amqp.Table{})
if err != nil {
log.Printf("%s:%s\n", queue, err.Error())
return
}
4 years ago
ch.Qos(30, 0, true)
4 years ago
for {
select {
case e := <-notifyClose:
4 years ago
time.Sleep(5 * time.Second)
count += 1
log.Printf("chan通道错误, e: %s. 还有第%d次重试\n", e.Error(), count)
4 years ago
close(closeChan)
time.Sleep(10 * time.Second)
4 years ago
go o.ListenRetry(queue, count, handler)
4 years ago
return
4 years ago
4 years ago
case v := <-d:
if err = handler(v.Body); err != nil {
log.Printf("%s:%s\n", queue, err.Error())
_ = v.Nack(false, true)
} else {
_ = v.Ack(false)
}
//time.Sleep(time.Second)
}
}
}