You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
5.9 KiB

package cron
import (
"encoding/json"
"git.oa00.com/go/rabbitmq"
"github.com/panjf2000/ants/v2"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"recook/internal/v2/lib/supply"
"recook/internal/v2/logic/third"
"time"
)
type skuMessage struct {
SkuId uint `json:"skuId"`
}
// Parse @Title 解析数据
func (s *skuMessage) Parse(data []byte) {
json.Unmarshal(data, s)
return
}
// Consume @Title 消费队列
func (s skuMessage) Consume(queueName string, autoAck bool, prefetchCount int, callback func(skuMessage, amqp.Delivery)) error {
return rabbitmq.Rabbitmq.Consume(queueName, autoAck, prefetchCount, func(delivery amqp.Delivery) {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("消息队列调用错误,err:", recoverErr)
}
}()
var skuMessage skuMessage
skuMessage.Parse(delivery.Body)
callback(skuMessage, delivery)
})
}
var SupplyTask = &supplyTask{}
type supplyTask struct {
}
// @Title 启动任务
func (s *supplyTask) run() {
// 价格更新任务
go s.price()
// 商品信息更新任务
go s.data()
// 拆单
go s.orderSplit()
// 订单取消
go s.orderCancel()
// 订单出库
go s.orderStockOut()
// 订单完成
go s.orderFinish()
}
// @Title 价格更新通知
func (s *supplyTask) price() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("供应链价格变动mq错误", recoverErr)
}
time.Sleep(time.Second * 3)
s.price()
}()
pool, _ := ants.NewPool(30)
if err := (skuMessage{}.Consume(supply.Api.Mq.SkuPriceChangeQueue(), false, 30, func(message skuMessage, delivery amqp.Delivery) {
pool.Submit(func() {
defer delivery.Ack(false)
if err := third.SupplyLogic.SyncPrice([]uint{message.SkuId}); err != nil {
log.Println("价格更新失败,skuId:", message.SkuId, ",err:", err)
return
}
})
})); err != nil {
log.Println(err)
}
}
// @Title 商品信息更新通知
func (s *supplyTask) data() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("供应链商品变动mq错误", recoverErr)
}
time.Sleep(time.Second * 3)
s.data()
}()
pool, _ := ants.NewPool(30)
if err := (skuMessage{}.Consume(supply.Api.Mq.SkuChangeQueue(), false, 30, func(message skuMessage, delivery amqp.Delivery) {
pool.Submit(func() {
defer delivery.Ack(false)
if err := third.SupplyLogic.SyncData([]uint{message.SkuId}); err != nil {
log.Println("商品信息更新失败,skuId:", message.SkuId, ",err:", err)
return
}
})
})); err != nil {
log.Println(err)
}
}
// @Title 拆单通知
func (s *supplyTask) orderSplit() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("供应链拆单mq错误", recoverErr)
}
time.Sleep(time.Second * 3)
s.orderSplit()
}()
pool, _ := ants.NewPool(5)
rabbitmq.Rabbitmq.Consume(supply.Api.Mq.OrderSplit(), false, 5, func(delivery amqp.Delivery) {
pool.Submit(func() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("消息队列调用错误,err:", recoverErr)
}
}()
var orderSplit supply.OrderSplit
json.Unmarshal(delivery.Body, &orderSplit)
if err := third.SupplyLogic.OrderSplit(orderSplit); err != nil {
log.Println("拆单处理失败,skuId:", string(delivery.Body), ",err:", err)
delivery.Reject(true)
return
} else {
defer delivery.Ack(false)
}
})
})
}
// @Title 取消订单通知
func (s *supplyTask) orderCancel() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("供应链拆单mq错误", recoverErr)
}
time.Sleep(time.Second * 3)
s.orderCancel()
}()
pool, _ := ants.NewPool(5)
rabbitmq.Rabbitmq.Consume(supply.Api.Mq.OrderCancel(), false, 5, func(delivery amqp.Delivery) {
pool.Submit(func() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("消息队列调用错误,err:", recoverErr)
}
}()
var orderCancel supply.OrderCancel
json.Unmarshal(delivery.Body, &orderCancel)
if err := third.SupplyLogic.OrderCancel(orderCancel); err != nil {
log.Println("取消订单处理失败,skuId:", string(delivery.Body), ",err:", err)
delivery.Reject(true)
return
} else {
defer delivery.Ack(false)
}
})
})
}
// @Title 订单出库通知
func (s *supplyTask) orderStockOut() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("订单出库mq错误", recoverErr)
}
time.Sleep(time.Second * 3)
s.orderCancel()
}()
pool, _ := ants.NewPool(5)
rabbitmq.Rabbitmq.Consume(supply.Api.Mq.OrderStockOut(), false, 5, func(delivery amqp.Delivery) {
pool.Submit(func() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("消息队列调用错误,err:", recoverErr)
}
}()
var orderStockOut supply.OrderStockOut
json.Unmarshal(delivery.Body, &orderStockOut)
if err := third.SupplyLogic.OrderStockOut(orderStockOut); err != nil {
log.Println("订单出库处理失败,skuId:", string(delivery.Body), ",err:", err)
delivery.Reject(true)
return
} else {
defer delivery.Ack(false)
}
})
})
}
// @Title 订单完成通知
func (s *supplyTask) orderFinish() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("订单完成mq错误", recoverErr)
}
time.Sleep(time.Second * 3)
s.orderFinish()
}()
pool, _ := ants.NewPool(5)
rabbitmq.Rabbitmq.Consume(supply.Api.Mq.OrderFinish(), false, 5, func(delivery amqp.Delivery) {
pool.Submit(func() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
log.Println("消息队列调用错误,err:", recoverErr)
}
}()
var orderFinish supply.OrderFinish
json.Unmarshal(delivery.Body, &orderFinish)
if err := third.SupplyLogic.OrderFinish(orderFinish); err != nil {
log.Println("订单出库处理失败,skuId:", string(delivery.Body), ",err:", err)
delivery.Reject(true)
return
} else {
defer delivery.Ack(false)
}
})
})
}