You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

93 lines
2.3 KiB

3 years ago
package cron
import (
"encoding/json"
"git.oa00.com/go/rabbitmq"
"github.com/panjf2000/ants/v2"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"recook/internal/v2/lib/supply"
"recook/internal/v2/logic/third"
)
// skuMessage is the JSON payload carried by a SKU notification delivery.
type skuMessage struct {
	SkuId uint `json:"skuId"` // populated from the "skuId" JSON key
}

// Parse decodes the JSON document in data into s.
//
// The method has no error result, so a decoding failure is reported through
// the standard logger instead of being silently discarded. On failure the
// fields that could not be decoded keep the values they had before the call.
func (s *skuMessage) Parse(data []byte) {
	if err := json.Unmarshal(data, s); err != nil {
		// %q keeps empty or binary payloads readable in the log line.
		log.Printf("skuMessage: cannot parse payload %q: %v", data, err)
	}
}
// Consume registers a consumer on queueName through rabbitmq.Rabbitmq.Consume
// and hands every delivery to callback.
//
// For each delivery a fresh skuMessage is filled from the delivery body with
// Parse and passed to callback together with the raw delivery. A panic raised
// while a delivery is being handled is recovered and logged so that it does
// not propagate out of the handler. autoAck and prefetchCount are forwarded
// unchanged, and the error returned by rabbitmq.Rabbitmq.Consume is returned
// as is.
func (s skuMessage) Consume(queueName string, autoAck bool, prefetchCount int, callback func(skuMessage, amqp.Delivery)) error {
	handle := func(delivery amqp.Delivery) {
		defer func() {
			if r := recover(); r != nil {
				log.Println("消息队列调用错误,err:", r)
			}
		}()

		msg := skuMessage{}
		msg.Parse(delivery.Body)
		callback(msg, delivery)
	}

	return rabbitmq.Rabbitmq.Consume(queueName, autoAck, prefetchCount, handle)
}
// supplyTask groups the supply-chain queue consumers; it carries no state.
type supplyTask struct{}

// SupplyTask is the package-level supplyTask instance.
var SupplyTask = new(supplyTask)
// run starts the supply-chain background tasks.
//
// The price-update consumer and the product-information-update consumer are
// each launched in their own goroutine, in that order. run returns without
// waiting for either of them.
func (s *supplyTask) run() {
	tasks := [...]func(){
		s.price, // price update task
		s.data,  // product information update task
	}
	for _, task := range tasks {
		go task()
	}
}
// price consumes SKU price-change notifications and synchronises the price of
// every notified SKU through third.SupplyLogic.SyncPrice.
//
// The method does not return on its own: whenever the consumer stops, either
// because Consume returned (its error, if any, is logged) or because a panic
// was recovered and logged, consumption is started again. Restarts are driven
// by a loop rather than by re-entering price from a deferred call, so repeated
// failures cannot grow the stack. The worker pool is created once and reused
// across restarts instead of a new, never-released pool being allocated each
// time.
//
// NOTE(review): restarts are immediate, as they were before; a short back-off
// would keep a permanently failing broker from busy-looping — confirm and add
// if desired.
func (s *supplyTask) price() {
	// Pool capacity 5 mirrors the prefetch count of 5 passed to Consume below.
	pool, err := ants.NewPool(5)
	if err != nil {
		// Without a pool no delivery could ever be processed or acknowledged.
		log.Println("供应链价格变动mq错误", err)
		return
	}

	// consumeOnce runs a single consumer session and contains any panic it raises.
	consumeOnce := func() {
		defer func() {
			if recoverErr := recover(); recoverErr != nil {
				log.Println("供应链价格变动mq错误", recoverErr)
			}
		}()

		consumeErr := skuMessage{}.Consume(supply.Api.Mq.SkuPriceChangeQueue(), false, 5, func(message skuMessage, delivery amqp.Delivery) {
			submitErr := pool.Submit(func() {
				// The delivery is acknowledged whether or not the sync
				// succeeds: a failed sync is logged, not retried.
				defer delivery.Ack(false)
				if err := third.SupplyLogic.SyncPrice([]uint{message.SkuId}); err != nil {
					log.Println("价格更新失败,skuId:", message.SkuId, ",err:", err)
				}
			})
			if submitErr == nil {
				return
			}

			// The job never ran, so its deferred Ack will not fire; return the
			// message to the queue instead of leaving it unacknowledged.
			log.Println("价格更新失败,skuId:", message.SkuId, ",err:", submitErr)
			if nackErr := delivery.Nack(false, true); nackErr != nil {
				log.Println("价格更新失败,skuId:", message.SkuId, ",err:", nackErr)
			}
		})
		if consumeErr != nil {
			log.Println(consumeErr)
		}
	}

	for {
		consumeOnce()
	}
}
// data consumes SKU change notifications and synchronises the product
// information of every notified SKU through third.SupplyLogic.SyncData.
//
// The method does not return on its own: whenever the consumer stops, either
// because Consume returned (its error, if any, is logged) or because a panic
// was recovered and logged, consumption is started again. Restarts are driven
// by a loop rather than by re-entering data from a deferred call, so repeated
// failures cannot grow the stack. The worker pool is created once and reused
// across restarts instead of a new, never-released pool being allocated each
// time.
//
// NOTE(review): restarts are immediate, as they were before; a short back-off
// would keep a permanently failing broker from busy-looping — confirm and add
// if desired.
func (s *supplyTask) data() {
	// Pool capacity 5 mirrors the prefetch count of 5 passed to Consume below.
	pool, err := ants.NewPool(5)
	if err != nil {
		// Without a pool no delivery could ever be processed or acknowledged.
		log.Println("供应链商品变动mq错误", err)
		return
	}

	// consumeOnce runs a single consumer session and contains any panic it raises.
	consumeOnce := func() {
		defer func() {
			if recoverErr := recover(); recoverErr != nil {
				log.Println("供应链商品变动mq错误", recoverErr)
			}
		}()

		consumeErr := skuMessage{}.Consume(supply.Api.Mq.SkuChangeQueue(), false, 5, func(message skuMessage, delivery amqp.Delivery) {
			submitErr := pool.Submit(func() {
				// The delivery is acknowledged whether or not the sync
				// succeeds: a failed sync is logged, not retried.
				defer delivery.Ack(false)
				if err := third.SupplyLogic.SyncData([]uint{message.SkuId}); err != nil {
					log.Println("商品信息更新失败,skuId:", message.SkuId, ",err:", err)
				}
			})
			if submitErr == nil {
				return
			}

			// The job never ran, so its deferred Ack will not fire; return the
			// message to the queue instead of leaving it unacknowledged.
			log.Println("商品信息更新失败,skuId:", message.SkuId, ",err:", submitErr)
			if nackErr := delivery.Nack(false, true); nackErr != nil {
				log.Println("商品信息更新失败,skuId:", message.SkuId, ",err:", nackErr)
			}
		})
		if consumeErr != nil {
			log.Println(consumeErr)
		}
	}

	for {
		consumeOnce()
	}
}