package cron

import (
	"encoding/json"
	"log"
	"time"

	"git.oa00.com/go/rabbitmq"
	"github.com/panjf2000/ants/v2"
	amqp "github.com/rabbitmq/amqp091-go"

	"recook/internal/v2/lib/supply"
	"recook/internal/v2/logic/third"
)

const (
	// supplyRestartDelay is the pause between a consumer stopping (or
	// panicking) and the next attempt to start it.
	supplyRestartDelay = 3 * time.Second
	// supplySkuWorkers is both the worker-pool size and the prefetch count
	// used for the SKU price/data queues.
	supplySkuWorkers = 30
	// supplyOrderWorkers is both the worker-pool size and the prefetch count
	// used for the order queues.
	supplyOrderWorkers = 5
)

// skuMessage is the JSON payload carried by SKU change notifications.
type skuMessage struct {
	SkuId uint `json:"skuId"`
}

// Parse decodes the JSON in data into s.
//
// The method has no return value, so a decoding failure is logged and s keeps
// whatever fields were decoded before the failure.
func (s *skuMessage) Parse(data []byte) {
	if err := json.Unmarshal(data, s); err != nil {
		log.Println("消息解析失败,body:", string(data), ",err:", err)
	}
}

// Consume subscribes to queueName through rabbitmq.Rabbitmq.Consume and, for
// every delivery, decodes the body into a skuMessage and passes both the
// message and the raw delivery to callback.
//
// A panic raised while decoding or inside callback is recovered and logged so
// that a single delivery cannot take the consumer down. The returned error is
// whatever rabbitmq.Rabbitmq.Consume returns.
func (s skuMessage) Consume(queueName string, autoAck bool, prefetchCount int, callback func(skuMessage, amqp.Delivery)) error {
	return rabbitmq.Rabbitmq.Consume(queueName, autoAck, prefetchCount, func(delivery amqp.Delivery) {
		defer func() {
			if recoverErr := recover(); recoverErr != nil {
				log.Println("消息队列调用错误,err:", recoverErr)
			}
		}()
		var msg skuMessage
		msg.Parse(delivery.Body)
		callback(msg, delivery)
	})
}

// SupplyTask is the shared instance used to start the supply-chain queue
// consumers.
var SupplyTask = &supplyTask{}

// supplyTask groups the supply-chain message queue consumers.
type supplyTask struct {
}

// run starts every supply-chain consumer in its own goroutine. The goroutines
// never return: each consumer is restarted whenever it stops.
func (s *supplyTask) run() {
	// Price updates.
	go s.price()
	// Product information updates.
	go s.data()
	// Order splitting.
	go s.orderSplit()
	// Order cancellation.
	go s.orderCancel()
	// Order stock-out.
	go s.orderStockOut()
	// Order completion.
	go s.orderFinish()
}

// supervise runs consume in an endless loop, waiting supplyRestartDelay
// between attempts. A panic escaping consume is recovered and logged with
// panicMsg as prefix.
//
// A loop is used instead of re-invoking the consumer from a deferred function
// so that the stack does not grow with every restart.
//
// NOTE(review): consume is assumed to block for as long as the consumer is
// alive; confirm against the rabbitmq wrapper.
func (s *supplyTask) supervise(panicMsg string, consume func()) {
	for {
		func() {
			defer func() {
				if recoverErr := recover(); recoverErr != nil {
					log.Println(panicMsg, recoverErr)
				}
			}()
			consume()
		}()
		time.Sleep(supplyRestartDelay)
	}
}

// decode unmarshals a delivery body into v and logs a decoding failure.
// Processing continues with whatever was decoded, as it did before the error
// was checked.
func (s *supplyTask) decode(body []byte, v interface{}) {
	if err := json.Unmarshal(body, v); err != nil {
		log.Println("消息解析失败,body:", string(body), ",err:", err)
	}
}

// consumeSku consumes SKU notifications from queueName and hands each SKU id
// to handle on a worker pool of supplySkuWorkers goroutines.
//
// Every delivery is acknowledged whether or not handle succeeds; failures are
// only logged, prefixed with failMsg.
func (s *supplyTask) consumeSku(queueName, failMsg string, handle func(skuId uint) error) {
	pool, err := ants.NewPool(supplySkuWorkers)
	if err != nil {
		log.Println("创建协程池失败,err:", err)
		return
	}
	// Release the pool when the consumer stops so that restarts do not
	// accumulate pools.
	defer pool.Release()

	err = skuMessage{}.Consume(queueName, false, supplySkuWorkers, func(message skuMessage, delivery amqp.Delivery) {
		submitErr := pool.Submit(func() {
			defer func() {
				if ackErr := delivery.Ack(false); ackErr != nil {
					log.Println("消息确认失败,err:", ackErr)
				}
			}()
			if err := handle(message.SkuId); err != nil {
				log.Println(failMsg, message.SkuId, ",err:", err)
			}
		})
		if submitErr != nil {
			log.Println("提交任务失败,err:", submitErr)
		}
	})
	if err != nil {
		log.Println(err)
	}
}

// consumeOrder consumes order notifications from queueName and passes each
// raw body to handle on a worker pool of supplyOrderWorkers goroutines.
//
// A delivery is acknowledged when handle returns nil and rejected with
// requeue when it returns an error (logged with failMsg as prefix). A panic
// inside handle is recovered and logged; in that case the delivery is neither
// acknowledged nor rejected.
func (s *supplyTask) consumeOrder(queueName, failMsg string, handle func(body []byte) error) {
	pool, err := ants.NewPool(supplyOrderWorkers)
	if err != nil {
		log.Println("创建协程池失败,err:", err)
		return
	}
	// Release the pool when the consumer stops so that restarts do not
	// accumulate pools.
	defer pool.Release()

	err = rabbitmq.Rabbitmq.Consume(queueName, false, supplyOrderWorkers, func(delivery amqp.Delivery) {
		submitErr := pool.Submit(func() {
			defer func() {
				if recoverErr := recover(); recoverErr != nil {
					log.Println("消息队列调用错误,err:", recoverErr)
				}
			}()
			if err := handle(delivery.Body); err != nil {
				log.Println(failMsg, string(delivery.Body), ",err:", err)
				if rejectErr := delivery.Reject(true); rejectErr != nil {
					log.Println("消息拒绝失败,err:", rejectErr)
				}
				return
			}
			if ackErr := delivery.Ack(false); ackErr != nil {
				log.Println("消息确认失败,err:", ackErr)
			}
		})
		if submitErr != nil {
			log.Println("提交任务失败,err:", submitErr)
		}
	})
	if err != nil {
		log.Println(err)
	}
}

// price consumes SKU price-change notifications and synchronises the price of
// each notified SKU.
func (s *supplyTask) price() {
	s.supervise("供应链价格变动mq错误", func() {
		s.consumeSku(supply.Api.Mq.SkuPriceChangeQueue(), "价格更新失败,skuId:", func(skuId uint) error {
			return third.SupplyLogic.SyncPrice([]uint{skuId})
		})
	})
}

// data consumes SKU change notifications and synchronises the product data of
// each notified SKU.
func (s *supplyTask) data() {
	s.supervise("供应链商品变动mq错误", func() {
		s.consumeSku(supply.Api.Mq.SkuChangeQueue(), "商品信息更新失败,skuId:", func(skuId uint) error {
			return third.SupplyLogic.SyncData([]uint{skuId})
		})
	})
}

// orderSplit consumes order-split notifications.
func (s *supplyTask) orderSplit() {
	s.supervise("供应链拆单mq错误", func() {
		s.consumeOrder(supply.Api.Mq.OrderSplit(), "拆单处理失败,skuId:", func(body []byte) error {
			var orderSplit supply.OrderSplit
			s.decode(body, &orderSplit)
			return third.SupplyLogic.OrderSplit(orderSplit)
		})
	})
}

// orderCancel consumes order-cancel notifications.
func (s *supplyTask) orderCancel() {
	// The panic message previously reused the order-split text.
	s.supervise("供应链取消订单mq错误", func() {
		s.consumeOrder(supply.Api.Mq.OrderCancel(), "取消订单处理失败,skuId:", func(body []byte) error {
			var orderCancel supply.OrderCancel
			s.decode(body, &orderCancel)
			return third.SupplyLogic.OrderCancel(orderCancel)
		})
	})
}

// orderStockOut consumes order stock-out notifications.
//
// The previous implementation restarted orderCancel instead of itself, so the
// stock-out consumer never came back after stopping.
func (s *supplyTask) orderStockOut() {
	s.supervise("订单出库mq错误", func() {
		s.consumeOrder(supply.Api.Mq.OrderStockOut(), "订单出库处理失败,skuId:", func(body []byte) error {
			var orderStockOut supply.OrderStockOut
			s.decode(body, &orderStockOut)
			return third.SupplyLogic.OrderStockOut(orderStockOut)
		})
	})
}

// orderFinish consumes order-finish notifications.
func (s *supplyTask) orderFinish() {
	s.supervise("订单完成mq错误", func() {
		// The failure message previously reused the stock-out text.
		s.consumeOrder(supply.Api.Mq.OrderFinish(), "订单完成处理失败,skuId:", func(body []byte) error {
			var orderFinish supply.OrderFinish
			s.decode(body, &orderFinish)
			return third.SupplyLogic.OrderFinish(orderFinish)
		})
	})
}