You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
1.9 KiB

package mq
import (
"fmt"
"log"
"recook/configs"
"runtime"
"time"
"github.com/streadway/amqp"
)
type rabbitMq struct {
*amqp.Connection
}
var (
Conn *rabbitMq
Conn1 *rabbitMq
)
func SetUpMq() {
uri := fmt.Sprintf("amqp://%s:%s@%s:%s",
configs.ConfigJCookMqUser,
configs.ConfigJCookMqPwd,
configs.ConfigJCookMqHost,
configs.ConfigJCookMqPort,
)
conn, err := amqp.Dial(uri)
if err != nil {
log.Println(err)
}
Conn = &rabbitMq{conn}
}
func SetUpShaMa() {
uri1 := fmt.Sprintf("amqp://%s:%s@%s:%s",
configs.ConfigShaMaMqUser,
configs.ConfigShaMaMqPwd,
configs.ConfigShaMaMqHost,
configs.ConfigShaMaMqPort,
)
conn1, err := amqp.Dial(uri1)
if err != nil {
log.Println(err)
}
Conn1 = &rabbitMq{conn1}
}
func (o rabbitMq) ListenRetry(queue string, count int, handler func([]byte) error) {
log.Printf("start listening ... %s\n", queue)
ch, err := o.Channel()
if err != nil {
log.Printf("%s:%s\n", queue, err.Error())
return
}
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
switch err.(type) {
case runtime.Error, string:
str := fmt.Sprintf("recovered (runtime.Error) panic:%s", err)
fmt.Println(str)
}
}
}()
defer ch.Close()
closeChan := make(chan *amqp.Error, 1)
notifyClose := ch.NotifyClose(closeChan)
d, err := ch.Consume(queue, queue, false, false, false, false, amqp.Table{})
if err != nil {
log.Printf("%s:%s\n", queue, err.Error())
return
}
ch.Qos(30, 0, true)
for {
select {
case e := <-notifyClose:
time.Sleep(5 * time.Second)
count += 1
log.Printf("chan通道错误, e: %s. 还有第%d次重试\n", e.Error(), count)
close(closeChan)
time.Sleep(10 * time.Second)
go o.ListenRetry(queue, count, handler)
return
case v := <-d:
if err = handler(v.Body); err != nil {
log.Printf("%s:%s\n", queue, err.Error())
_ = v.Nack(false, true)
} else {
_ = v.Ack(false)
}
//time.Sleep(time.Second)
}
}
}