You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
195 lines
5.0 KiB
195 lines
5.0 KiB
package cron
|
|
|
|
import (
|
|
"encoding/json"
|
|
"git.oa00.com/go/rabbitmq"
|
|
"github.com/panjf2000/ants/v2"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"log"
|
|
"recook/internal/v2/lib/supply"
|
|
"recook/internal/v2/logic/third"
|
|
"time"
|
|
)
|
|
|
|
type skuMessage struct {
|
|
SkuId uint `json:"skuId"`
|
|
}
|
|
|
|
// Parse @Title 解析数据
|
|
func (s *skuMessage) Parse(data []byte) {
|
|
json.Unmarshal(data, s)
|
|
return
|
|
}
|
|
|
|
// Consume @Title 消费队列
|
|
func (s skuMessage) Consume(queueName string, autoAck bool, prefetchCount int, callback func(skuMessage, amqp.Delivery)) error {
|
|
return rabbitmq.Rabbitmq.Consume(queueName, autoAck, prefetchCount, func(delivery amqp.Delivery) {
|
|
defer func() {
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
log.Println("消息队列调用错误,err:", recoverErr)
|
|
}
|
|
}()
|
|
var skuMessage skuMessage
|
|
skuMessage.Parse(delivery.Body)
|
|
callback(skuMessage, delivery)
|
|
})
|
|
}
|
|
|
|
var SupplyTask = &supplyTask{}
|
|
|
|
type supplyTask struct {
|
|
}
|
|
|
|
// @Title 启动任务
|
|
func (s *supplyTask) run() {
|
|
// 价格更新任务
|
|
go s.price()
|
|
// 商品信息更新任务
|
|
go s.data()
|
|
// 拆单
|
|
go s.orderSplit()
|
|
// 订单取消
|
|
go s.orderCancel()
|
|
// 订单出库
|
|
go s.orderStockOut()
|
|
}
|
|
|
|
// @Title 价格更新通知
|
|
func (s *supplyTask) price() {
|
|
defer func() {
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
log.Println("供应链价格变动mq错误", recoverErr)
|
|
}
|
|
time.Sleep(time.Second * 3)
|
|
s.price()
|
|
}()
|
|
pool, _ := ants.NewPool(5)
|
|
if err := (skuMessage{}.Consume(supply.Api.Mq.SkuPriceChangeQueue(), false, 5, func(message skuMessage, delivery amqp.Delivery) {
|
|
pool.Submit(func() {
|
|
defer delivery.Ack(false)
|
|
if err := third.SupplyLogic.SyncPrice([]uint{message.SkuId}); err != nil {
|
|
log.Println("价格更新失败,skuId:", message.SkuId, ",err:", err)
|
|
return
|
|
}
|
|
})
|
|
})); err != nil {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
|
|
// @Title 商品信息更新通知
|
|
func (s *supplyTask) data() {
|
|
defer func() {
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
log.Println("供应链商品变动mq错误", recoverErr)
|
|
}
|
|
time.Sleep(time.Second * 3)
|
|
s.data()
|
|
}()
|
|
pool, _ := ants.NewPool(5)
|
|
if err := (skuMessage{}.Consume(supply.Api.Mq.SkuChangeQueue(), false, 5, func(message skuMessage, delivery amqp.Delivery) {
|
|
pool.Submit(func() {
|
|
defer delivery.Ack(false)
|
|
if err := third.SupplyLogic.SyncData([]uint{message.SkuId}); err != nil {
|
|
log.Println("商品信息更新失败,skuId:", message.SkuId, ",err:", err)
|
|
return
|
|
}
|
|
})
|
|
})); err != nil {
|
|
log.Println(err)
|
|
}
|
|
}
|
|
|
|
// @Title 拆单通知
|
|
func (s *supplyTask) orderSplit() {
|
|
defer func() {
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
log.Println("供应链拆单mq错误", recoverErr)
|
|
}
|
|
time.Sleep(time.Second * 3)
|
|
s.orderSplit()
|
|
}()
|
|
pool, _ := ants.NewPool(5)
|
|
rabbitmq.Rabbitmq.Consume(supply.Api.Mq.OrderSplit(), false, 5, func(delivery amqp.Delivery) {
|
|
pool.Submit(func() {
|
|
defer func() {
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
log.Println("消息队列调用错误,err:", recoverErr)
|
|
}
|
|
}()
|
|
var orderSplit supply.OrderSplit
|
|
json.Unmarshal(delivery.Body, &orderSplit)
|
|
|
|
if err := third.SupplyLogic.OrderSplit(orderSplit); err != nil {
|
|
log.Println("拆单处理失败,skuId:", string(delivery.Body), ",err:", err)
|
|
delivery.Reject(true)
|
|
return
|
|
} else {
|
|
defer delivery.Ack(false)
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
// @Title 取消订单通知
|
|
func (s *supplyTask) orderCancel() {
|
|
defer func() {
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
log.Println("供应链拆单mq错误", recoverErr)
|
|
}
|
|
time.Sleep(time.Second * 3)
|
|
s.orderCancel()
|
|
}()
|
|
pool, _ := ants.NewPool(5)
|
|
rabbitmq.Rabbitmq.Consume(supply.Api.Mq.OrderCancel(), false, 5, func(delivery amqp.Delivery) {
|
|
pool.Submit(func() {
|
|
defer func() {
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
log.Println("消息队列调用错误,err:", recoverErr)
|
|
}
|
|
}()
|
|
var orderCancel supply.OrderCancel
|
|
json.Unmarshal(delivery.Body, &orderCancel)
|
|
|
|
if err := third.SupplyLogic.OrderCancel(orderCancel); err != nil {
|
|
log.Println("取消订单处理失败,skuId:", string(delivery.Body), ",err:", err)
|
|
delivery.Reject(true)
|
|
return
|
|
} else {
|
|
defer delivery.Ack(false)
|
|
}
|
|
})
|
|
})
|
|
}
|
|
|
|
// @Title 订单出库通知
|
|
func (s *supplyTask) orderStockOut() {
|
|
defer func() {
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
log.Println("订单出库mq错误", recoverErr)
|
|
}
|
|
time.Sleep(time.Second * 3)
|
|
s.orderCancel()
|
|
}()
|
|
pool, _ := ants.NewPool(5)
|
|
rabbitmq.Rabbitmq.Consume(supply.Api.Mq.OrderStockOut(), false, 5, func(delivery amqp.Delivery) {
|
|
pool.Submit(func() {
|
|
defer func() {
|
|
if recoverErr := recover(); recoverErr != nil {
|
|
log.Println("消息队列调用错误,err:", recoverErr)
|
|
}
|
|
}()
|
|
var orderStockOut supply.OrderStockOut
|
|
json.Unmarshal(delivery.Body, &orderStockOut)
|
|
|
|
if err := third.SupplyLogic.OrderStockOut(orderStockOut); err != nil {
|
|
log.Println("订单出库处理失败,skuId:", string(delivery.Body), ",err:", err)
|
|
delivery.Reject(true)
|
|
return
|
|
} else {
|
|
defer delivery.Ack(false)
|
|
}
|
|
})
|
|
})
|
|
}
|