|
|
|
package cron
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"git.oa00.com/go/rabbitmq"
|
|
|
|
"github.com/panjf2000/ants/v2"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
|
|
"log"
|
|
|
|
"recook/internal/v2/lib/supply"
|
|
|
|
"recook/internal/v2/logic/third"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
type skuMessage struct {
|
|
|
|
SkuId uint `json:"skuId"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// Parse @Title 解析数据
|
|
|
|
func (s *skuMessage) Parse(data []byte) {
|
|
|
|
json.Unmarshal(data, s)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Consume @Title 消费队列
|
|
|
|
func (s skuMessage) Consume(queueName string, autoAck bool, prefetchCount int, callback func(skuMessage, amqp.Delivery)) error {
|
|
|
|
return rabbitmq.Rabbitmq.Consume(queueName, autoAck, prefetchCount, func(delivery amqp.Delivery) {
|
|
|
|
defer func() {
|
|
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
|
|
log.Println("消息队列调用错误,err:", recoverErr)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
var skuMessage skuMessage
|
|
|
|
skuMessage.Parse(delivery.Body)
|
|
|
|
callback(skuMessage, delivery)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
var SupplyTask = &supplyTask{}
|
|
|
|
|
|
|
|
type supplyTask struct {
|
|
|
|
}
|
|
|
|
|
|
|
|
// @Title 启动任务
|
|
|
|
func (s *supplyTask) run() {
|
|
|
|
// 价格更新任务
|
|
|
|
go s.price()
|
|
|
|
// 商品信息更新任务
|
|
|
|
go s.data()
|
|
|
|
// 拆单
|
|
|
|
go s.orderSplit()
|
|
|
|
}
|
|
|
|
|
|
|
|
// @Title 价格更新通知
|
|
|
|
func (s *supplyTask) price() {
|
|
|
|
defer func() {
|
|
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
|
|
log.Println("供应链价格变动mq错误", recoverErr)
|
|
|
|
}
|
|
|
|
time.Sleep(time.Second * 3)
|
|
|
|
s.price()
|
|
|
|
}()
|
|
|
|
pool, _ := ants.NewPool(5)
|
|
|
|
if err := (skuMessage{}.Consume(supply.Api.Mq.SkuPriceChangeQueue(), false, 5, func(message skuMessage, delivery amqp.Delivery) {
|
|
|
|
pool.Submit(func() {
|
|
|
|
defer delivery.Ack(false)
|
|
|
|
if err := third.SupplyLogic.SyncPrice([]uint{message.SkuId}); err != nil {
|
|
|
|
log.Println("价格更新失败,skuId:", message.SkuId, ",err:", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
})
|
|
|
|
})); err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// @Title 商品信息更新通知
|
|
|
|
func (s *supplyTask) data() {
|
|
|
|
defer func() {
|
|
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
|
|
log.Println("供应链商品变动mq错误", recoverErr)
|
|
|
|
}
|
|
|
|
time.Sleep(time.Second * 3)
|
|
|
|
s.data()
|
|
|
|
}()
|
|
|
|
pool, _ := ants.NewPool(5)
|
|
|
|
if err := (skuMessage{}.Consume(supply.Api.Mq.SkuChangeQueue(), false, 5, func(message skuMessage, delivery amqp.Delivery) {
|
|
|
|
pool.Submit(func() {
|
|
|
|
defer delivery.Ack(false)
|
|
|
|
if err := third.SupplyLogic.SyncData([]uint{message.SkuId}); err != nil {
|
|
|
|
log.Println("商品信息更新失败,skuId:", message.SkuId, ",err:", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
})
|
|
|
|
})); err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// @Title 拆单通知
|
|
|
|
func (s *supplyTask) orderSplit() {
|
|
|
|
defer func() {
|
|
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
|
|
log.Println("供应链拆单mq错误", recoverErr)
|
|
|
|
}
|
|
|
|
time.Sleep(time.Second * 3)
|
|
|
|
s.orderSplit()
|
|
|
|
}()
|
|
|
|
pool, _ := ants.NewPool(5)
|
|
|
|
rabbitmq.Rabbitmq.Consume(supply.Api.Mq.OrderSplit(), false, 5, func(delivery amqp.Delivery) {
|
|
|
|
pool.Submit(func() {
|
|
|
|
defer func() {
|
|
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
|
|
log.Println("消息队列调用错误,err:", recoverErr)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
var orderSplit supply.OrderSplit
|
|
|
|
json.Unmarshal(delivery.Body, &orderSplit)
|
|
|
|
|
|
|
|
if err := third.SupplyLogic.OrderSplit(orderSplit); err != nil {
|
|
|
|
log.Println("拆单处理失败,skuId:", string(delivery.Body), ",err:", err)
|
|
|
|
delivery.Reject(true)
|
|
|
|
return
|
|
|
|
} else {
|
|
|
|
defer delivery.Ack(false)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|