找回密码
 立即注册
首页 业界区 业界 订单初版—2.生单链路中的技术问题说明文档 ...

订单初版—2.生单链路中的技术问题说明文档

挫莉虻 7 天前
大纲
1.生单链路的业务代码
2.生单链路中可能会出现数据不一致的问题
3.Seata AT模式下的分布式事务的原理
4.Seata AT模式下的分布式事务的读写隔离原理
5.Seata AT模式下的死锁问题以及超时机制
6.Seata AT模式下的读写隔离机制的影响
7.生单链路使用Seata AT模式的具体步骤
8.生单链路使用Seata AT模式时的原理流程
9.生单链路使用Seata AT模式时的并发问题
10.生单链路如何解决库存全局锁争用问题
 
1.生单链路的业务代码
(1)生成订单流程
(2)入参检查与风控检查
(3)获取商品信息与计算订单价格及验证价格
(4)锁定优惠券与商品库存
(5)新增订单到数据库
(6)发送延迟消息到MQ
 
(1)生成订单流程
1.png
  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3.     ...
  4.     //提交订单/生成订单接口
  5.     @GlobalTransactional(rollbackFor = Exception.class)
  6.     @Override
  7.     public CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) {
  8.         //1.入参检查
  9.         checkCreateOrderRequestParam(createOrderRequest);
  10.         //2.风控检查
  11.         checkRisk(createOrderRequest);
  12.         //3.获取商品信息
  13.         List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest);
  14.         //4.计算订单价格
  15.         CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList);
  16.         //5.验证订单实付金额
  17.         checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO);
  18.         //6.锁定优惠券
  19.         lockUserCoupon(createOrderRequest);
  20.         //7.锁定商品库存
  21.         lockProductStock(createOrderRequest);
  22.         //8.生成订单到数据库
  23.         addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);
  24.         //9.发送订单延迟消息用于支付超时自动关单
  25.         sendPayOrderTimeoutDelayMessage(createOrderRequest);
  26.         //返回订单信息
  27.         CreateOrderDTO createOrderDTO = new CreateOrderDTO();
  28.         createOrderDTO.setOrderId(createOrderRequest.getOrderId());
  29.         return createOrderDTO;
  30.     }
  31.     ...
  32. }
复制代码
(2)入参检查与风控检查
  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3.     ...
  4.     //检查创建订单请求参数
  5.     private void checkCreateOrderRequestParam(CreateOrderRequest createOrderRequest) {
  6.         ParamCheckUtil.checkObjectNonNull(createOrderRequest);
  7.         //订单ID检查
  8.         String orderId = createOrderRequest.getOrderId();
  9.         ParamCheckUtil.checkStringNonEmpty(orderId, OrderErrorCodeEnum.ORDER_ID_IS_NULL);
  10.         //业务线标识检查
  11.         Integer businessIdentifier = createOrderRequest.getBusinessIdentifier();
  12.         ParamCheckUtil.checkObjectNonNull(businessIdentifier, OrderErrorCodeEnum.BUSINESS_IDENTIFIER_IS_NULL);
  13.         if (BusinessIdentifierEnum.getByCode(businessIdentifier) == null) {
  14.             throw new OrderBizException(OrderErrorCodeEnum.BUSINESS_IDENTIFIER_ERROR);
  15.         }
  16.         //用户ID检查
  17.         String userId = createOrderRequest.getUserId();
  18.         ParamCheckUtil.checkStringNonEmpty(userId, OrderErrorCodeEnum.USER_ID_IS_NULL);
  19.         //订单类型检查
  20.         Integer orderType = createOrderRequest.getOrderType();
  21.         ParamCheckUtil.checkObjectNonNull(businessIdentifier, OrderErrorCodeEnum.ORDER_TYPE_IS_NULL);
  22.         if (OrderTypeEnum.getByCode(orderType) == null) {
  23.             throw new OrderBizException(OrderErrorCodeEnum.ORDER_TYPE_ERROR);
  24.         }
  25.         //卖家ID检查
  26.         String sellerId = createOrderRequest.getSellerId();
  27.         ParamCheckUtil.checkStringNonEmpty(sellerId, OrderErrorCodeEnum.SELLER_ID_IS_NULL);
  28.         //配送类型检查
  29.         Integer deliveryType = createOrderRequest.getDeliveryType();
  30.         ParamCheckUtil.checkObjectNonNull(deliveryType, OrderErrorCodeEnum.USER_ADDRESS_ERROR);
  31.         if (DeliveryTypeEnum.getByCode(deliveryType) == null) {
  32.             throw new OrderBizException(OrderErrorCodeEnum.DELIVERY_TYPE_ERROR);
  33.         }
  34.         //地址信息检查
  35.         String province = createOrderRequest.getProvince();
  36.         String city = createOrderRequest.getCity();
  37.         String area = createOrderRequest.getArea();
  38.         String streetAddress = createOrderRequest.getStreet();
  39.         ParamCheckUtil.checkStringNonEmpty(province, OrderErrorCodeEnum.USER_ADDRESS_ERROR);
  40.         ParamCheckUtil.checkStringNonEmpty(city, OrderErrorCodeEnum.USER_ADDRESS_ERROR);
  41.         ParamCheckUtil.checkStringNonEmpty(area, OrderErrorCodeEnum.USER_ADDRESS_ERROR);
  42.         ParamCheckUtil.checkStringNonEmpty(streetAddress, OrderErrorCodeEnum.USER_ADDRESS_ERROR);
  43.         //区域ID检查
  44.         String regionId = createOrderRequest.getRegionId();
  45.         ParamCheckUtil.checkStringNonEmpty(regionId, OrderErrorCodeEnum.REGION_ID_IS_NULL);
  46.         //经纬度检查
  47.         BigDecimal lon = createOrderRequest.getLon();
  48.         BigDecimal lat = createOrderRequest.getLat();
  49.         ParamCheckUtil.checkObjectNonNull(lon, OrderErrorCodeEnum.USER_LOCATION_IS_NULL);
  50.         ParamCheckUtil.checkObjectNonNull(lat, OrderErrorCodeEnum.USER_LOCATION_IS_NULL);
  51.         //收货人信息检查
  52.         String receiverName = createOrderRequest.getReceiverName();
  53.         String receiverPhone = createOrderRequest.getReceiverPhone();
  54.         ParamCheckUtil.checkStringNonEmpty(receiverName, OrderErrorCodeEnum.ORDER_RECEIVER_IS_NULL);
  55.         ParamCheckUtil.checkStringNonEmpty(receiverPhone, OrderErrorCodeEnum.ORDER_RECEIVER_IS_NULL);
  56.         //客户端设备信息检查
  57.         String clientIp = createOrderRequest.getClientIp();
  58.         ParamCheckUtil.checkStringNonEmpty(clientIp, OrderErrorCodeEnum.CLIENT_IP_IS_NULL);
  59.         //商品条目信息检查
  60.         List<CreateOrderRequest.OrderItemRequest> orderItemRequestList = createOrderRequest.getOrderItemRequestList();
  61.         ParamCheckUtil.checkCollectionNonEmpty(orderItemRequestList, OrderErrorCodeEnum.ORDER_ITEM_IS_NULL);
  62.         for (CreateOrderRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {
  63.             Integer productType = orderItemRequest.getProductType();
  64.             Integer saleQuantity = orderItemRequest.getSaleQuantity();
  65.             String skuCode = orderItemRequest.getSkuCode();
  66.             ParamCheckUtil.checkObjectNonNull(productType, OrderErrorCodeEnum.ORDER_ITEM_PARAM_ERROR);
  67.             ParamCheckUtil.checkObjectNonNull(saleQuantity, OrderErrorCodeEnum.ORDER_ITEM_PARAM_ERROR);
  68.             ParamCheckUtil.checkStringNonEmpty(skuCode, OrderErrorCodeEnum.ORDER_ITEM_PARAM_ERROR);
  69.         }
  70.         //订单费用信息检查
  71.         List<CreateOrderRequest.OrderAmountRequest> orderAmountRequestList = createOrderRequest.getOrderAmountRequestList();
  72.         ParamCheckUtil.checkCollectionNonEmpty(orderAmountRequestList, OrderErrorCodeEnum.ORDER_AMOUNT_IS_NULL);
  73.         for (CreateOrderRequest.OrderAmountRequest orderAmountRequest : orderAmountRequestList) {
  74.             Integer amountType = orderAmountRequest.getAmountType();
  75.             ParamCheckUtil.checkObjectNonNull(amountType, OrderErrorCodeEnum.ORDER_AMOUNT_TYPE_IS_NULL);
  76.             if (AmountTypeEnum.getByCode(amountType) == null) {
  77.                 throw new OrderBizException(OrderErrorCodeEnum.ORDER_AMOUNT_TYPE_PARAM_ERROR);
  78.             }
  79.         }
  80.         Map<Integer, Integer> orderAmountMap = orderAmountRequestList.stream()
  81.             .collect(Collectors.toMap(CreateOrderRequest.OrderAmountRequest::getAmountType, CreateOrderRequest.OrderAmountRequest::getAmount));
  82.         //订单支付原价不能为空
  83.         if (orderAmountMap.get(AmountTypeEnum.ORIGIN_PAY_AMOUNT.getCode()) == null) {
  84.             throw new OrderBizException(OrderErrorCodeEnum.ORDER_ORIGIN_PAY_AMOUNT_IS_NULL);
  85.         }
  86.         //订单运费不能为空
  87.         if (orderAmountMap.get(AmountTypeEnum.SHIPPING_AMOUNT.getCode()) == null) {
  88.             throw new OrderBizException(OrderErrorCodeEnum.ORDER_SHIPPING_AMOUNT_IS_NULL);
  89.         }
  90.         //订单实付金额不能为空
  91.         if (orderAmountMap.get(AmountTypeEnum.REAL_PAY_AMOUNT.getCode()) == null) {
  92.             throw new OrderBizException(OrderErrorCodeEnum.ORDER_REAL_PAY_AMOUNT_IS_NULL);
  93.         }
  94.         if (StringUtils.isNotEmpty(createOrderRequest.getCouponId())) {
  95.             //订单优惠券抵扣金额不能为空
  96.             if (orderAmountMap.get(AmountTypeEnum.COUPON_DISCOUNT_AMOUNT.getCode()) == null) {
  97.                 throw new OrderBizException(OrderErrorCodeEnum.ORDER_DISCOUNT_AMOUNT_IS_NULL);
  98.             }
  99.         }
  100.         //订单支付信息检查
  101.         List<CreateOrderRequest.PaymentRequest> paymentRequestList = createOrderRequest.getPaymentRequestList();
  102.         ParamCheckUtil.checkCollectionNonEmpty(paymentRequestList, OrderErrorCodeEnum.ORDER_PAYMENT_IS_NULL);
  103.         for (CreateOrderRequest.PaymentRequest paymentRequest : paymentRequestList) {
  104.             Integer payType = paymentRequest.getPayType();
  105.             Integer accountType = paymentRequest.getAccountType();
  106.             if (payType == null || PayTypeEnum.getByCode(payType) == null) {
  107.                 throw new OrderBizException(OrderErrorCodeEnum.PAY_TYPE_PARAM_ERROR);
  108.             }
  109.             if (accountType == null || AccountTypeEnum.getByCode(accountType) == null) {
  110.                 throw new OrderBizException(OrderErrorCodeEnum.ACCOUNT_TYPE_PARAM_ERROR);
  111.             }
  112.         }
  113.     }
  114.     //风控检查
  115.     private void checkRisk(CreateOrderRequest createOrderRequest) {
  116.         //调用风控服务进行风控检查
  117.         CheckOrderRiskRequest checkOrderRiskRequest = createOrderRequest.clone(CheckOrderRiskRequest.class);
  118.         JsonResult<CheckOrderRiskDTO> jsonResult = riskApi.checkOrderRisk(checkOrderRiskRequest);
  119.         if (!jsonResult.getSuccess()) {
  120.             throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());
  121.         }
  122.     }
  123.     ...
  124. }
复制代码
(3)获取商品信息与计算订单价格及验证价格
  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3.     ...
  4.     //获取订单条目商品信息
  5.     private List<ProductSkuDTO> listProductSkus(CreateOrderRequest createOrderRequest) {
  6.         List<CreateOrderRequest.OrderItemRequest> orderItemRequestList = createOrderRequest.getOrderItemRequestList();
  7.         List<ProductSkuDTO> productSkuList = new ArrayList<>();
  8.         for (CreateOrderRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {
  9.             String skuCode = orderItemRequest.getSkuCode();
  10.             ProductSkuQuery productSkuQuery = new ProductSkuQuery();
  11.             productSkuQuery.setSkuCode(skuCode);
  12.             productSkuQuery.setSellerId(createOrderRequest.getSellerId());
  13.             JsonResult<ProductSkuDTO> jsonResult = productApi.getProductSku(productSkuQuery);
  14.             if (!jsonResult.getSuccess()) {
  15.                 throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());
  16.             }
  17.             ProductSkuDTO productSkuDTO = jsonResult.getData();
  18.             //sku不存在
  19.             if (productSkuDTO == null) {
  20.                 throw new OrderBizException(OrderErrorCodeEnum.PRODUCT_SKU_CODE_ERROR, skuCode);
  21.             }
  22.             productSkuList.add(productSkuDTO);
  23.         }
  24.         return productSkuList;
  25.     }
  26.     //计算订单价格,如果使用了优惠券、红包、积分等,会一并进行扣减
  27.     //@param createOrderRequest 订单信息
  28.     //@param productSkuList     商品信息
  29.     private CalculateOrderAmountDTO calculateOrderAmount(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList) {
  30.         CalculateOrderAmountRequest calculateOrderPriceRequest = createOrderRequest.clone(CalculateOrderAmountRequest.class, CloneDirection.FORWARD);
  31.         //订单条目补充商品信息
  32.         Map<String, ProductSkuDTO> productSkuDTOMap = productSkuList.stream().collect(Collectors.toMap(ProductSkuDTO::getSkuCode, Function.identity()));
  33.         calculateOrderPriceRequest.getOrderItemRequestList().forEach(item -> {
  34.             String skuCode = item.getSkuCode();
  35.             ProductSkuDTO productSkuDTO = productSkuDTOMap.get(skuCode);
  36.             item.setProductId(productSkuDTO.getProductId());
  37.             item.setSalePrice(productSkuDTO.getSalePrice());
  38.         });
  39.         //调用营销服务计算订单价格
  40.         JsonResult<CalculateOrderAmountDTO> jsonResult = marketApi.calculateOrderAmount(calculateOrderPriceRequest);
  41.         //检查价格计算结果
  42.         if (!jsonResult.getSuccess()) {
  43.             throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());
  44.         }
  45.         CalculateOrderAmountDTO calculateOrderAmountDTO = jsonResult.getData();
  46.         if (calculateOrderAmountDTO == null) {
  47.             throw new OrderBizException(OrderErrorCodeEnum.CALCULATE_ORDER_AMOUNT_ERROR);
  48.         }
  49.         //订单费用信息
  50.         List<OrderAmountDTO> orderAmountList = ObjectUtil.convertList(calculateOrderAmountDTO.getOrderAmountList(), OrderAmountDTO.class);
  51.         if (orderAmountList == null || orderAmountList.isEmpty()) {
  52.             throw new OrderBizException(OrderErrorCodeEnum.CALCULATE_ORDER_AMOUNT_ERROR);
  53.         }
  54.         //订单条目费用明细
  55.         List<OrderAmountDetailDTO> orderItemAmountList = ObjectUtil.convertList(calculateOrderAmountDTO.getOrderAmountDetail(), OrderAmountDetailDTO.class);
  56.         if (orderItemAmountList == null || orderItemAmountList.isEmpty()) {
  57.             throw new OrderBizException(OrderErrorCodeEnum.CALCULATE_ORDER_AMOUNT_ERROR);
  58.         }
  59.         return calculateOrderAmountDTO;
  60.     }
  61.     //验证订单实付金额
  62.     private void checkRealPayAmount(CreateOrderRequest createOrderRequest, CalculateOrderAmountDTO calculateOrderAmountDTO) {
  63.         List<CreateOrderRequest.OrderAmountRequest> originOrderAmountRequestList = createOrderRequest.getOrderAmountRequestList();
  64.         Map<Integer, CreateOrderRequest.OrderAmountRequest> originOrderAmountMap =
  65.             originOrderAmountRequestList.stream().collect(Collectors.toMap(CreateOrderRequest.OrderAmountRequest::getAmountType, Function.identity()));
  66.         //前端给的实付金额
  67.         Integer originRealPayAmount = originOrderAmountMap.get(AmountTypeEnum.REAL_PAY_AMOUNT.getCode()).getAmount();
  68.         List<CalculateOrderAmountDTO.OrderAmountDTO> orderAmountDTOList = calculateOrderAmountDTO.getOrderAmountList();
  69.         Map<Integer, CalculateOrderAmountDTO.OrderAmountDTO> orderAmountMap =
  70.             orderAmountDTOList.stream().collect(Collectors.toMap(CalculateOrderAmountDTO.OrderAmountDTO::getAmountType, Function.identity()));
  71.         //营销计算出来的实付金额
  72.         Integer realPayAmount = orderAmountMap.get(AmountTypeEnum.REAL_PAY_AMOUNT.getCode()).getAmount();
  73.         if (!originRealPayAmount.equals(realPayAmount)) {
  74.             //订单验价失败
  75.             throw new OrderBizException(OrderErrorCodeEnum.ORDER_CHECK_REAL_PAY_AMOUNT_FAIL);
  76.         }
  77.     }
  78.     ...
  79. }
复制代码
(4)锁定优惠券与商品库存
  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3.     ...
  4.     //锁定用户优惠券
  5.     private void lockUserCoupon(CreateOrderRequest createOrderRequest) {
  6.         String couponId = createOrderRequest.getCouponId();
  7.         if (StringUtils.isEmpty(couponId)) {
  8.             return;
  9.         }
  10.         LockUserCouponRequest lockUserCouponRequest = createOrderRequest.clone(LockUserCouponRequest.class);
  11.         //调用营销服务锁定用户优惠券
  12.         JsonResult<Boolean> jsonResult = marketApi.lockUserCoupon(lockUserCouponRequest);
  13.         //检查锁定用户优惠券结果
  14.         if (!jsonResult.getSuccess()) {
  15.             throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());
  16.         }
  17.     }
  18.     //锁定商品库存
  19.     private void lockProductStock(CreateOrderRequest createOrderRequest) {
  20.         String orderId = createOrderRequest.getOrderId();
  21.         List<LockProductStockRequest.OrderItemRequest> orderItemRequestList = ObjectUtil.convertList(
  22.             createOrderRequest.getOrderItemRequestList(), LockProductStockRequest.OrderItemRequest.class);
  23.         LockProductStockRequest lockProductStockRequest = new LockProductStockRequest();
  24.         lockProductStockRequest.setOrderId(orderId);
  25.         lockProductStockRequest.setOrderItemRequestList(orderItemRequestList);
  26.         JsonResult<Boolean> jsonResult = inventoryApi.lockProductStock(lockProductStockRequest);
  27.         //检查锁定商品库存结果
  28.         if (!jsonResult.getSuccess()) {
  29.             throw new OrderBizException(jsonResult.getErrorCode(), jsonResult.getErrorMessage());
  30.         }
  31.     }
  32.     ...
  33. }
  34. @DubboService(version = "1.0.0", interfaceClass = MarketApi.class, retries = 0)
  35. public class MarketApiImpl implements MarketApi {
  36.     ...
  37.     //锁定用户优惠券记录
  38.     @Override
  39.     public JsonResult<Boolean> lockUserCoupon(LockUserCouponRequest lockUserCouponRequest) {
  40.         try {
  41.             Boolean result = couponService.lockUserCoupon(lockUserCouponRequest);
  42.             return JsonResult.buildSuccess(result);
  43.         } catch (MarketBizException e) {
  44.             log.error("biz error", e);
  45.             return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
  46.         } catch (Exception e) {
  47.             log.error("system error", e);
  48.             return JsonResult.buildError(e.getMessage());
  49.         }
  50.     }
  51.     ...
  52. }
  53. @Service
  54. public class CouponServiceImpl implements CouponService {
  55.     ...
  56.     //锁定用户优惠券
  57.     @Transactional(rollbackFor = Exception.class)
  58.     @Override
  59.     public Boolean lockUserCoupon(LockUserCouponRequest lockUserCouponRequest) {
  60.         //检查入参
  61.         checkLockUserCouponRequest(lockUserCouponRequest);
  62.         String userId = lockUserCouponRequest.getUserId();
  63.         String couponId = lockUserCouponRequest.getCouponId();
  64.         CouponDO couponDO = couponDAO.getUserCoupon(userId, couponId);
  65.         if (couponDO == null) {
  66.             throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_NULL);
  67.         }
  68.         //判断优惠券是否已经使用了
  69.         if (CouponUsedStatusEnum.USED.getCode().equals(couponDO.getUsed())) {
  70.             throw new MarketBizException(MarketErrorCodeEnum.USER_COUPON_IS_USED);
  71.         }
  72.         couponDO.setUsed(CouponUsedStatusEnum.USED.getCode());
  73.         couponDO.setUsedTime(new Date());
  74.         couponDAO.updateById(couponDO);
  75.         return true;
  76.     }
  77.     ...
  78. }
  79. @DubboService(version = "1.0.0", interfaceClass = InventoryApi.class, retries = 0)
  80. public class InventoryApiImpl implements InventoryApi {
  81.     @Autowired
  82.     private InventoryService inventoryService;
  83.     //锁定商品库存
  84.     @Override
  85.     public JsonResult<Boolean> lockProductStock(LockProductStockRequest lockProductStockRequest) {
  86.         try {
  87.             Boolean result = inventoryService.lockProductStock(lockProductStockRequest);
  88.             return JsonResult.buildSuccess(result);
  89.         } catch (InventoryBizException e) {
  90.             log.error("biz error", e);
  91.             return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
  92.         } catch (Exception e) {
  93.             log.error("system error", e);
  94.             return JsonResult.buildError(e.getMessage());
  95.         }
  96.     }
  97.     ...
  98. }
  99. @Service
  100. public class InventoryServiceImpl implements InventoryService {
  101.     ...
  102.     //锁定商品库存
  103.     @Transactional(rollbackFor = Exception.class)
  104.     @Override
  105.     public Boolean lockProductStock(LockProductStockRequest lockProductStockRequest) {
  106.         //检查入参
  107.         checkLockProductStockRequest(lockProductStockRequest);
  108.         List<LockProductStockRequest.OrderItemRequest> orderItemRequestList = lockProductStockRequest.getOrderItemRequestList();
  109.         for (LockProductStockRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {
  110.             String skuCode = orderItemRequest.getSkuCode();
  111.             ProductStockDO productStockDO = productStockDAO.getBySkuCode(skuCode);
  112.             if (productStockDO == null) {
  113.                 throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_ERROR);
  114.             }
  115.             Integer saleQuantity = orderItemRequest.getSaleQuantity();
  116.             //执行库存扣减,并需要解决防止超卖的问题
  117.             int nums = productStockDAO.lockProductStock(skuCode, saleQuantity);
  118.             if (nums <= 0) {
  119.                 throw new InventoryBizException(InventoryErrorCodeEnum.LOCK_PRODUCT_SKU_STOCK_ERROR);
  120.             }
  121.         }
  122.         return true;
  123.     }
  124.     ...
  125. }
复制代码
(6)发送延迟消息到MQ
  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3.     ...
  4.     //新增订单数据到数据库
  5.     private void addNewOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) {
  6.         //封装新订单数据
  7.         NewOrderDataHolder newOrderDataHolder = new NewOrderDataHolder();
  8.         //生成主订单
  9.         FullOrderData fullMasterOrderData = addNewMasterOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);
  10.         //封装主订单数据到NewOrderData对象中
  11.         newOrderDataHolder.appendOrderData(fullMasterOrderData);
  12.         //如果存在多种商品类型,需要按商品类型进行拆单
  13.         Map<Integer, List<ProductSkuDTO>> productTypeMap = productSkuList.stream().collect(Collectors.groupingBy(ProductSkuDTO::getProductType));
  14.         if (productTypeMap.keySet().size() > 1) {
  15.             for (Integer productType : productTypeMap.keySet()) {
  16.                 //生成子订单
  17.                 FullOrderData fullSubOrderData = addNewSubOrder(fullMasterOrderData, productType);
  18.                 //封装子订单数据到NewOrderData对象中
  19.                 newOrderDataHolder.appendOrderData(fullSubOrderData);
  20.             }
  21.         }
  22.         //保存订单到数据库
  23.         //订单信息
  24.         List<OrderInfoDO> orderInfoDOList = newOrderDataHolder.getOrderInfoDOList();
  25.         if (!orderInfoDOList.isEmpty()) {
  26.             orderInfoDAO.saveBatch(orderInfoDOList);
  27.         }
  28.         //订单条目
  29.         List<OrderItemDO> orderItemDOList = newOrderDataHolder.getOrderItemDOList();
  30.         if (!orderItemDOList.isEmpty()) {
  31.             orderItemDAO.saveBatch(orderItemDOList);
  32.         }
  33.         //订单配送信息
  34.         List<OrderDeliveryDetailDO> orderDeliveryDetailDOList = newOrderDataHolder.getOrderDeliveryDetailDOList();
  35.         if (!orderDeliveryDetailDOList.isEmpty()) {
  36.             orderDeliveryDetailDAO.saveBatch(orderDeliveryDetailDOList);
  37.         }
  38.         //订单支付信息
  39.         List<OrderPaymentDetailDO> orderPaymentDetailDOList = newOrderDataHolder.getOrderPaymentDetailDOList();
  40.         if (!orderPaymentDetailDOList.isEmpty()) {
  41.             orderPaymentDetailDAO.saveBatch(orderPaymentDetailDOList);
  42.         }
  43.         //订单费用信息
  44.         List<OrderAmountDO> orderAmountDOList = newOrderDataHolder.getOrderAmountDOList();
  45.         if (!orderAmountDOList.isEmpty()) {
  46.             orderAmountDAO.saveBatch(orderAmountDOList);
  47.         }
  48.         //订单费用明细
  49.         List<OrderAmountDetailDO> orderAmountDetailDOList = newOrderDataHolder.getOrderAmountDetailDOList();
  50.         if (!orderAmountDetailDOList.isEmpty()) {
  51.             orderAmountDetailDAO.saveBatch(orderAmountDetailDOList);
  52.         }
  53.         //订单状态变更日志信息
  54.         List<OrderOperateLogDO> orderOperateLogDOList = newOrderDataHolder.getOrderOperateLogDOList();
  55.         if (!orderOperateLogDOList.isEmpty()) {
  56.             orderOperateLogDAO.saveBatch(orderOperateLogDOList);
  57.         }
  58.         //订单快照数据
  59.         List<OrderSnapshotDO> orderSnapshotDOList = newOrderDataHolder.getOrderSnapshotDOList();
  60.         if (!orderSnapshotDOList.isEmpty()) {
  61.             orderSnapshotDAO.saveBatch(orderSnapshotDOList);
  62.         }
  63.     }
  64.     //新增主订单信息订单
  65.     private FullOrderData addNewMasterOrder(CreateOrderRequest createOrderRequest, List<ProductSkuDTO> productSkuList, CalculateOrderAmountDTO calculateOrderAmountDTO) {
  66.         NewOrderBuilder newOrderBuilder = new NewOrderBuilder(createOrderRequest, productSkuList, calculateOrderAmountDTO, orderProperties);
  67.         FullOrderData fullOrderData = newOrderBuilder.buildOrder()
  68.             .buildOrderItems()
  69.             .buildOrderDeliveryDetail()
  70.             .buildOrderPaymentDetail()
  71.             .buildOrderAmount()
  72.             .buildOrderAmountDetail()
  73.             .buildOperateLog()
  74.             .buildOrderSnapshot()
  75.             .build();
  76.         //订单信息
  77.         OrderInfoDO orderInfoDO = fullOrderData.getOrderInfoDO();
  78.         //订单条目信息
  79.         List<OrderItemDO> orderItemDOList = fullOrderData.getOrderItemDOList();
  80.         //订单费用信息
  81.         List<OrderAmountDO> orderAmountDOList = fullOrderData.getOrderAmountDOList();
  82.         //补全地址信息
  83.         OrderDeliveryDetailDO orderDeliveryDetailDO = fullOrderData.getOrderDeliveryDetailDO();
  84.         String detailAddress = getDetailAddress(orderDeliveryDetailDO);
  85.         orderDeliveryDetailDO.setDetailAddress(detailAddress);
  86.         //补全订单状态变更日志
  87.         OrderOperateLogDO orderOperateLogDO = fullOrderData.getOrderOperateLogDO();
  88.         String remark = "创建订单操作0-10";
  89.         orderOperateLogDO.setRemark(remark);
  90.         //补全订单商品快照信息
  91.         List<OrderSnapshotDO> orderSnapshotDOList = fullOrderData.getOrderSnapshotDOList();
  92.         for (OrderSnapshotDO orderSnapshotDO : orderSnapshotDOList) {
  93.             //优惠券信息
  94.             if (orderSnapshotDO.getSnapshotType().equals(SnapshotTypeEnum.ORDER_COUPON.getCode())) {
  95.                 ...
  96.             }
  97.             //订单费用信息
  98.             else if (orderSnapshotDO.getSnapshotType().equals(SnapshotTypeEnum.ORDER_AMOUNT.getCode())) {
  99.                 orderSnapshotDO.setSnapshotJson(JsonUtil.object2Json(orderAmountDOList));
  100.             }
  101.             //订单条目信息
  102.             else if (orderSnapshotDO.getSnapshotType().equals(SnapshotTypeEnum.ORDER_ITEM.getCode())) {
  103.                 orderSnapshotDO.setSnapshotJson(JsonUtil.object2Json(orderItemDOList));
  104.             }
  105.         }
  106.         return fullOrderData;
  107.     }
  108.     ...
  109. }
复制代码
 
2.生单链路中可能会出现数据不一致的问题
在更新优惠券本地事务、更新库存本地事务、插入订单数据本地事务中,可能会出现优惠券和库存都已经更新成功了,但订单数据却插入失败,此时就会出现数据不一致的问题。
2.png
 
3.Seata AT模式下的分布式事务的原理
说明一:需要部署一个Seata Server服务器。
 
说明二:在各个服务的分支事务的数据库中,需要新增一张undo_log表,用来记录各个服务的分支事务失败时可以执行的回滚SQL。
 
说明三:当入口服务开启一个分布式事务时,需要向Seata Server开启一个全局事务。
 
说明四:各个服务对其分支事务的执行情况会同步给Seata Server服务器。
 
说明五:当Seata Server发现某分支事务失败时,便会通知各服务进行事务回滚。
 
说明六:当各个服务进行事务回滚时,会从undo_log表查出对应SQL去执行。
3.png
 
4.Seata AT模式下的分布式事务的读写隔离原理
为了避免有其他线程修改某数据后又进行回滚,就一定要加本地锁 + 全局锁。本地锁是为了避免当前机器的其他线程对数据进行修改并回滚,全局锁是为了避免分布式机器的线程对数据进行修改并回滚。
 
服务A在更新某数据之前,需要先获取本地锁。服务A在成功获取本地锁之后,需要插入undo log数据。接着,服务A需要向Seata Server服务器获取全局锁。服务A在成功获取全局锁之后,会提交本地事务并释放本地锁。
 
如果服务A对服务B进行RPC调用并提交其本地事务,则继续按前面的步骤处理服务B的数据更新。当服务B的本地事务也提交完成后,不需要继续执行其他分支事务了,服务A便可以提交分布式事务,并释放全局锁。
 
分布式事务的全链路在执行完毕前,对应数据的全局锁是不会释放的。
4.png
 
5.Seata AT模式下的死锁问题以及超时机制
Seata的一条事务链路里,每一个事务都会按如下顺序执行:首先获取本地锁更新本地数据,然后插入undo log记录,接着获取本地数据对应的全局锁,最后提交本地事务并释放本地锁。
 
Seata的一条事务链路里,一个事务执行完就会继续执行下一个事务。如果事务链路里的所有事务都执行完成了,那么就提交事务,并释放全局锁。如果某个事务需要回滚,那么就需要获取该事务本地数据的本地锁,然后获取undo log记录生成逆向操作的SQL语句来进行补偿和更新,补偿完毕才能释放本地数据的全局锁。
 
由于Seata AT模式的写隔离是通过本地数据的全局锁来实现的,所以写隔离的过程中,就涉及到了本地数据的本地锁和全局锁两把锁,这时候就很容易导致出现死锁的情况。
 
比如当事务1的分支事务提交数据1的本地事务后,会释放数据1的本地锁。此时事务2的分支事务就可以获取数据1的本地锁,但要等待获取事务1释放数据1的全局锁后,才能释放数据1的本地锁。如果事务1的后续分支事务出现异常需要进行回滚,那么事务1就需要获取数据1的本地锁,执行回滚补偿处理。事务1执行完分支事务的回滚补偿处理后,才能释放数据1的全局锁。
 
于是就出现了这样的死锁场景:事务1对数据1的回滚,占用了数据1的全局锁,需等待获取数据1的本地锁。事务2对数据1的更新,占用了数据1的本地锁,需等待获取数据1的全局锁。
 
Seata为了解决这个问题,会引入等待全局锁的超时机制。如果事务2在等待数据1的全局锁时出现超时,就会释放其占用的本地锁。从而让事务1能获取到数据1的本地锁,完成其事务操作,而不用一直等待。
5.png
 
6.Seata AT模式下的读写隔离机制的影响
由于全局锁的存在,会严重影响Seata AT分布式事务的并发吞吐量。所以除非是金融级别的系统,才会使用像Seata AT模式这么严格的事务来保证数据的强一致性。
 
当然,通常情况下分布式事务基本都是更新不同的数据。只要更新不同的数据,那么Seata AT分布式事务也不会出现全局锁等待。只有一些特殊情况下,才可能会出现大量分布式事务更新同一条数据。当使用Seata AT分布式事务时,特别注意尽量不要让全局锁等待。
 
如果不使用全局锁,那么Seata AT模式的分布式事务就会出现写未提交。就可能出现分支事务更新失败时无法回滚,因为回滚的数据已被覆盖。
 
Seata AT模式的分布式事务默认是读未提交的,即分布式事务在未提交前,分支事务更新的数据是可被其他事务读到的。
 
此外,很多公司都是使用基于RocketMQ的柔性事务来实现分布式事务。
 
7.生单链路使用Seata AT模式的具体步骤
(1)生单链路中分布式事务的主要分支事务
(2)订单系统 + 优惠券系统 + 商品库存系统都需要在pom.xml文件中引入Seata
(3)订单系统的生单接口作为分布式事务入口需添加@GlobalTransactional注解开启全局事务
(4)优惠券系统的锁定优惠券接口需添加Spring的事务注解@Transactional
(5)商品库存系统的锁定库存接口需添加Spring的事务注解@Transactional
(6)各分支事务操作的数据库需要添加undo log表
 
(1)生单链路中分布式事务的主要分支事务
分布式事务入口:订单系统的生单接口
分支事务1:优惠券系统锁定优惠券
分支事务2:商品库存系统锁定商品库存
分支事务3:订单系统创建订单数据
 
(2)订单系统 + 优惠券系统 + 商品库存系统都需要在pom.xml文件中引入Seata
  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3.     ...
  4.     //发送支付订单超时延迟消息,用于支付超时自动关单
  5.     private void sendPayOrderTimeoutDelayMessage(CreateOrderRequest createOrderRequest) {
  6.         PayOrderTimeoutDelayMessage message = new PayOrderTimeoutDelayMessage();
  7.         message.setOrderId(createOrderRequest.getOrderId());
  8.         message.setBusinessIdentifier(createOrderRequest.getBusinessIdentifier());
  9.         message.setCancelType(OrderCancelTypeEnum.TIMEOUT_CANCELED.getCode());
  10.         message.setUserId(createOrderRequest.getUserId());
  11.         message.setOrderType(createOrderRequest.getOrderType());
  12.         message.setOrderStatus(OrderStatusEnum.CREATED.getCode());
  13.         String msgJson = JsonUtil.object2Json(message);
  14.         defaultProducer.sendMessage(
  15.             RocketMqConstant.PAY_ORDER_TIMEOUT_DELAY_TOPIC,
  16.             msgJson,
  17.             RocketDelayedLevel.DELAYED_30m,
  18.             "支付订单超时延迟消息"
  19.         );
  20.     }
  21.     ...
  22. }
  23. @Component
  24. public class DefaultProducer {
  25.     private final DefaultMQProducer producer;
  26.     @Autowired
  27.     public DefaultProducer(RocketMQProperties rocketMQProperties) {
  28.         producer = new DefaultMQProducer(RocketMqConstant.ORDER_DEFAULT_PRODUCER_GROUP);
  29.         producer.setNamesrvAddr(rocketMQProperties.getNameServer());
  30.         start();
  31.     }
  32.     //对象在使用之前必须要调用一次,只能初始化一次
  33.     public void start() {
  34.         try {
  35.             this.producer.start();
  36.         } catch (MQClientException e) {
  37.             log.error("producer start error", e);
  38.         }
  39.     }
  40.     ...
  41.     //发送消息
  42.     public void sendMessage(String topic, String message, Integer delayTimeLevel, String type) {
  43.         Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
  44.         try {
  45.             if (delayTimeLevel > 0) {
  46.                 msg.setDelayTimeLevel(delayTimeLevel);
  47.             }
  48.             SendResult send = producer.send(msg);
  49.             if (SendStatus.SEND_OK == send.getSendStatus()) {
  50.                 log.info("发送MQ消息成功, type:{}, message:{}", type, message);
  51.             } else {
  52.                 throw new OrderBizException(send.getSendStatus().toString());
  53.             }
  54.         } catch (Exception e) {
  55.             log.error("发送MQ消息失败:", e);
  56.             throw new OrderBizException(OrderErrorCodeEnum.SEND_MQ_FAILED);
  57.         }
  58.     }
  59.     ...
  60. }
复制代码
(3)订单系统的生单接口作为分布式事务入口需添加@GlobalTransactional注解开启全局事务
通过添加Seata提供的注解@GlobalTransactional来开启全局事务。
  1. <dependency>
  2.     <groupId>com.alibaba.cloud</groupId>
  3.     spring-cloud-starter-alibaba-seata</artifactId>
  4.     <exclusions>
  5.         <exclusion>
  6.             <groupId>io.seata</groupId>
  7.             seata-spring-boot-starter</artifactId>
  8.         </exclusion>
  9.     </exclusions>
  10. </dependency>
  11. <dependency>
  12.     <groupId>io.seata</groupId>
  13.     seata-spring-boot-starter</artifactId>
  14.     <version>1.3.0</version>
  15. </dependency>
复制代码
(4)优惠券系统的锁定优惠券接口需添加Spring的事务注解@Transactional
通过添加Spring提供的@Transactional注解来开启本地事务。Seata会代理Spring的事务,进行本地锁申请 + undo log写入 + 全局锁请求 + 提交/回滚本地事务等操作。
  1. @Service
  2. public class OrderServiceImpl implements OrderService {
  3.     ...
  4.     //提交订单/生成订单接口
  5.     //@param createOrderRequest 提交订单请求入参
  6.     //@return 订单号
  7.     @GlobalTransactional(rollbackFor = Exception.class)
  8.     @Override
  9.     public CreateOrderDTO createOrder(CreateOrderRequest createOrderRequest) {
  10.         //1.入参检查
  11.         checkCreateOrderRequestParam(createOrderRequest);
  12.         //2.风控检查
  13.         checkRisk(createOrderRequest);
  14.         //3.获取商品信息
  15.         List<ProductSkuDTO> productSkuList = listProductSkus(createOrderRequest);
  16.         //4.计算订单价格
  17.         CalculateOrderAmountDTO calculateOrderAmountDTO = calculateOrderAmount(createOrderRequest, productSkuList);
  18.         //5.验证订单实付金额
  19.         checkRealPayAmount(createOrderRequest, calculateOrderAmountDTO);
  20.         //6.锁定优惠券
  21.         lockUserCoupon(createOrderRequest);
  22.         //7.锁定商品库存
  23.         lockProductStock(createOrderRequest);
  24.         //8.生成订单到数据库
  25.         addNewOrder(createOrderRequest, productSkuList, calculateOrderAmountDTO);
  26.         //9.发送订单延迟消息用于支付超时自动关单
  27.         sendPayOrderTimeoutDelayMessage(createOrderRequest);
  28.         //返回订单信息
  29.         CreateOrderDTO createOrderDTO = new CreateOrderDTO();
  30.         createOrderDTO.setOrderId(createOrderRequest.getOrderId());
  31.         return createOrderDTO;
  32.     }
  33.     ...
  34. }
复制代码
(5)商品库存系统的锁定库存接口需添加Spring的事务注解@Transactional
通过添加Spring提供的@Transactional注解来开启本地事务。Seata会代理Spring的事务,进行本地锁申请 + undo log写入 + 全局锁请求 + 提交/回滚本地事务等操作。
[code]@Servicepublic class InventoryServiceImpl implements InventoryService {    ...    //锁定商品库存    @Transactional(rollbackFor = Exception.class)    @Override    public Boolean lockProductStock(LockProductStockRequest lockProductStockRequest) {        //检查入参        checkLockProductStockRequest(lockProductStockRequest);        List orderItemRequestList = lockProductStockRequest.getOrderItemRequestList();        for (LockProductStockRequest.OrderItemRequest orderItemRequest : orderItemRequestList) {            String skuCode = orderItemRequest.getSkuCode();            ProductStockDO productStockDO = productStockDAO.getBySkuCode(skuCode);            if (productStockDO == null) {                throw new InventoryBizException(InventoryErrorCodeEnum.PRODUCT_SKU_STOCK_ERROR);            }            Integer saleQuantity = orderItemRequest.getSaleQuantity();            //执行库存扣减,并需要解决防止超卖的问题            int nums = productStockDAO.lockProductStock(skuCode, saleQuantity);            if (nums

相关推荐

您需要登录后才可以回帖 登录 | 立即注册