大营销平台系统笔记

前言

Lottery学着还是有些抽象,我还是需要视频,还是all in这个大营销吧
跟着前面的序号学习,就按顺序来,加油
跟着敲代码刚敲到了抽奖中置规则过滤,太慢了,还容易出错,之后直接还是先看文章加对应分支的diff吧,日后再来自己跟敲

基础知识点

父子pom文件

根目录的pom文件

  • parent标签是所有模块共用的依赖
  • dependencyManagement中声明所有依赖的版本,用来统一管理,并不实际引入,子模块会在自己的pom文件里面引入自己需要的依赖,但不用声明版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
</parent>

<!-- 根目录pom文件中的dependencyManagement是统一管理子模块的依赖包版本,不做引用-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.4</version>
</dependency>
...
...
...
</dependencies>
</dependencyManagement>

Mybatis的xml中的映射

小傅哥是直接建个了映射map,每次都用resultMap指定这个map就行
我之前是直接自动生成sql标签,用的全限定类名,但是有时候会无法绑定,映射失败就全为null了,还是显示地建立一个映射关系好

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.bugstack.infrastructure.persistent.dao.IStrategyDao">

<resultMap id="dataMap" type="cn.bugstack.infrastructure.persistent.po.Strategy">
<id column="id" property="id"/>
<result column="strategy_id" property="strategyId"/>
<result column="strategy_desc" property="strategyDesc"/>
<result column="rule_models" property="ruleModels"/>
<result column="create_time" property="createTime"/>
<result column="update_time" property="updateTime"/>
</resultMap>

<select id="queryStrategyByStrategyId" parameterType="java.lang.Long" resultMap="dataMap">
select strategy_id, strategy_desc, rule_models
from strategy
where strategy_id = #{strategyId}
</select>

</mapper>

Redis操作

小傅哥是有一个脚手架,项目建立时就用的这个架子
它那里面的redis连接实现就是在infrastructure.persistence.redis包下,这里面有两个类
一个IRedisService接口,一个RedissonService实现类,他们通过RedissonClient封装了redis操作
之后如果要操作redis,注入这个接口实例就行,spring会自动注入实现类
【业务代码只需要依赖 IRedisService 接口,不需要直接依赖 Redisson,降低了代码耦合度】

抽奖策略

  1. 每个选项概率相加的总和为1
  2. 抽奖为免费抽奖次数+用户消耗积分抽奖
  3. 抽奖活动包含总库存,控制运营成本
  4. 部分奖品需要抽取一定次数才可以解锁
  5. 抽奖完成可以累计运气值,满了可以拿保底
  6. 奖品对接,比如给用户赋予别的模型的权限
  7. 抽完奖可以获取一个随机积分

空间换时间

用概率总和除以最小概率来得出最小概率的份数
这个份数也要存在redis里(range_key),每次抽奖就知道从范围中生成随机数
然后建立一个概率表(table_key),循环填充对应份数的奖品,存在redis里
最后要抽奖的时候就用随机数和概率表就能直接获取奖品id

表中的award_rate字段总和应为1

一开始的建表语句错了,总和设成了100,按第4节的文章中的sql重建吧

最小概率计算最大份数

假如最小概率为0.009则需要1000份,最小为0.18就需要100份

获取最大份数的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 转换计算,只根据小数位来计算。如【0.01返回100】、【0.009返回1000】、【0.0018返回10000】
*/
private double convert(double min) {
if (0 == min) return 1D;

double current = min;
double max = 1;
while (current < 1) {
current = current * 10;
max = max * 10;
}
return max;
}

前置规则过滤(黑名单)

  1. 每次要抽奖的时候,先构造一个抽奖因子实体(RaffleFactorEntity),包含一次抽奖启动的信息,userId和strategyId
  2. 然后给DefaultRaffleStrategy这个抽奖策略的实现类传入抽奖因子来执行抽奖
  3. 先根据stategyId查询出这个策略都有什么规则(rule_model:rule_blacklist, rule_weight)
  4. 然后按顺序进行规则过滤,抽象类有一个protected的执行前置规则过滤的方法,接收一个抽奖因子,所以构建一个新的抽奖因子,把上面查到的规则类型和一开始的因子的属性传入
  5. 进行规则过滤,先判断是否在黑名单,就是解析rule_model的值,判断是否包含rule_blacklist
  6. 优先过滤黑名单规则,因为如果命中黑名单就要直接返回结果了,所以先判断。构建一个ruleMatterEntity,存储抽奖因子属性,同时setRuleModel值为rule_blacklist
  7. 然后调用ILogicFilter接口,它目前有两个过滤器实现类,RuleBlackListLogicFilter和RuleWeightLogicFilter,分别处理黑名单和权重规则的过滤,他们的类上都有一个自定义的LogicStrategy注解,用来标记处理对应的rule_model。过滤器类都由DefaultLogicFactory工厂统一管理,内部有这个注解和对应的过滤器类的map,获取对应过滤器也是通过这个工厂的方法获取工厂的map属性
  8. 调用过滤接口的时候就根据map中对应的rule_model,获取对应的过滤器,然后执行过滤,查询rule_model对应的rule_value,也就是当前哪些用户在黑名单中(100:user001, user002),以及黑名单用户的奖品id,就是前面的100
  9. 如果当前用户在黑名单中,就构建一个RuleActionEntity规则动作实体,存储rule_model,接管的标记,data(RuleActionEntity.RaffleBeforeEntity,他的内部静态类,存储策略id,奖品id)
  10. 然后DefaultRaffleStrategy就获得了,这个规则动作实体,因为标记为接管,走接管的逻辑,构建一个RaffleAwardEntity存储奖品id,返回

第5点的时候,如果不在黑名单里,就继续下一步

  1. 收集所有不是黑名单的rule_model存储进一个ruleList
  2. 依次遍历ruleList,也是每次构建ruleMatterEntity,进入过滤器,不过目前也就一个权重规则,走一个权重过滤器,根据当前用户积分,抽对应领域的奖品,查出来后标记为接管
  3. 返回到DefaultRaffleStrategy,构建一个RaffleAwardEntity存储奖品id,返回

中置规则过滤

一个奖品如果有抽奖次数限制,抽到该奖品的时候没有到达这个抽奖次数限制,就返回一个幸运奖

前期过滤,一个是把黑名单的用户过滤,过滤就是直接截胡了;另一个权重是将奖品的范围过滤在一定的区间内,后续在这个范围里抽
中期过滤,是检查这个奖品的抽奖次数限制,如果没达到,就截胡返回一个幸运奖

加入这个中期过滤的操作:抽象类中抽完奖品后加一个doCheckRaffleCenterLogic,对是否满足当前奖品的次数进行判断返回中期动作实体RuleActionEntity<RuleActionEntity.RaffleCenterEntity>

责任链模式处理抽奖规则

前面用策略、工厂、模版模式来完成的抽奖的前中后规则的过滤处理
【思考】抽奖的前置规则在抽奖中是一个什么行为。其实它可以被抽象为一种策略行为,比如;黑名单抽奖策略、权重抽奖策略、白名单抽奖策略等。而这些策略规则是一种互斥行为,比如走了黑名单规则,就不应该在继续走权重规则了。那么对于这样的情况,责任链的设计就更加合适了。

默认工厂实现就该改成责任链工厂了

原来的策略装配就改成责任链装配

1
2
3
4
5
6
7
public interface ILogicChainArmory {

ILogicChain next();

ILogicChain appendNext(ILogicChain next);

}
  • 此时的黑名单过滤时,如果不在黑名单,就next()放行到下一个节点,在就接管

  • 权重规则过滤时也是一样,接管了就按照消耗的积分制去按对应积分范围抽奖,返回一个奖品id,没接管就放行到下一个节点(一个默认节点兜底)

  • 默认节点就是最普通的抽奖,得到一个随机数,返回一个奖品id

  • 原来的随机数抽奖前置规则过滤是在抽象类中的doCheckRaffleBeforeLogic中的实现,手动每个策略的顺序,然后对每个策略过滤结果进行if,else判断,过滤完再抽奖,返回奖品id,这次每个类都实现一个ILogicChain接口,内部定义了logic方法,方法内部就是对应策略的抽奖规则,每次调用这个接口方法来抽奖,获取一个奖品id,装配的时候也统一装配这个接口类型就行

  • 工厂装配的时候没有规则就直接返回一个默认节点,有规则就按照规则顺序填装,最后填装一个默认节点

  • 工厂内部有一个Map<String, ILogicChain> logicChainGroup组,用于构建责任链存储所有实现了ILogicChain接口的类,接口实现类上有标识,如@Component("rule_blacklist"),它是通过构造器自动注入,Spring支持将相同类型的多个Bean自动装配到一个 Map 中,根据标识作为key。如果一个类只有一个构造函数,Spring会自动将其视为注入点,无需显式添加 @Autowired

  • 规则的装填顺序由数据库中strategy表中的rule_model值的顺序决定

  • 有了责任链之后添加删除策略就变得非常灵活了,原来的这个顺序是完全耦合在抽象类中的,修改得去改那个doCheckRaffleBeforeLogic方法,现在只需要根据数据库的rule_model构建责任链就行

  • 每个节点的过滤逻辑内,如果放行就最后调用next().logic(userId, strategyId)让下一个节点做过滤,拦截就直接返回一个奖品id

规则树(规则引擎)

让过滤节点可以满足一颗二叉树的结构,自由的组合和多分支链路的方式完成流程的处理。此时就不再是一条链了,而是一个树结构
前置规则过滤,比如黑名单规则是命中了就直接返回,这个是适合责任链的。但是中置规则和后置规则就不一定是这样的链状结构了,所以引入规则树(这个时候之前的Filter就不需要了)

实现规则树的模型:

  • RuleTreeVO 决策树的树根信息,标记出最开始从哪个节点执行「treeRootRuleNode」。

  • RuleTreeNodeVO 决策树的节点,这些节点可以组合出任意需要的规则树。

  • RuleTreeNodeLineVO 决策树节点连线,用于标识出怎么从一个节点到下一个节点。这是NodeVO的一个属性

  • 所有规则数节点都实现ILogicTreeNode接口,暂时实现三个节点

    • rule_lock:次数锁,判断当前是否满足奖品的前置抽奖次数
    • rule_stock:库存锁,判断当前奖品的库存是否充足
    • rule_luck_award:兜底幸运奖,如果当前抽到的奖品条件不满足,就不给这个奖品,用兜底幸运奖替代
  • 其中lock指向luck_award和stock,stock指向luck_award,luck_award指向null,这个调用关系暂时在测试类中手动赋值

  • 然后调用IDecisionTreeEngine treeEngine = defaultTreeFactory.openLogicTree(ruleTreeVO);将构造号的规则树传入,返回一个决策树引擎

  • 然后调用这个决策树的proce方法,以用户id,策略id,奖品id作为参数就会开始决策

  • 首先获取tree的根节点,从这开始进入while循环,进行当前节点的逻辑,获取结果,结果包含是否放行和奖品数据

  • 然后依据当前的状态(是否放行等)遍历当前节点的TreeNodeLineVOList属性,通过逻辑选择获取符合条件的TreeNodeVO,也就是下一个指向的节点,再次循环。如果没有符合条件的节点则会抛出规则树无法找到节点的异常

  • 最终下一个节点为null时终止循环,返回奖品数据

目前中置规则的调用关系

  1. 先进入rule_lock根节点(如果这个商品有抽奖次数限制的话,这个就会是树的根节点),判断是否满足次数限制,不满足就TAKE_OVER,然后根据节点指向关系表中的当前节点的标志位去获取下一个节点,走到rule_luck_award;如果满足就返回一个ALLOW,根据节点指向关系表走到rule_stock
  2. 进入到rule_stock节点,对库存做判断,不足就返回ALLOW,放行到rule_luck_award,充足就进入下一个节点,此时关系表中没有满足的下一个节点,就进入到null,结束循环,成功抽到这个奖品,返回奖品数据
  3. 假如不满足,放行到了rule_luck_award,就将奖品数据更改为随机积分,然后走到空节点,结束循环,返回奖品数据

前置规则与中置后置规则

  • 前置规则是这个抽奖策略本身的规则,比如黑名单用户抽奖,是否使用积分来改变抽奖范围
  • 中置后置规则就是是这个奖品的规则,比如当前奖品有抽奖次数限制,当前奖品库存不足,所以每个商品都应该有一个规则树,因为都有扣减库存和兜底奖励
  • 所以前置规则在strategy_rule表中,中置后置规则在strategy_award表中(但奖品的次数限制也会写在strategy_rule表中,但这个次数也会写在tree_node里面,直接按节点来就行)

模板模式串联抽奖规则

抽象类内使用责任链前置过滤,规则树作为中置后置过滤,执行这一系列的总的逻辑都在抽象类中进行,这个抽象类中定义的就是模版方法

  • 之前的doCheckRaffleBeforeLogic方法和doCheckRaffleCenterLogic方法就都可以替换成责任链和规则树的调用了raffleLogicChain和raffleLogicTree
  • 数据库中新加入了三个表,就是规则树,树节点策略,和树节点连线,之前这个关系是在测试类中手动赋值的
  • 被前置过滤之后会标记为不是默认规则,直接返回奖品,不会走规则树进行中置过滤
  • 如果是默认规则,就会去strategy_award表下查找当前抽奖策略的奖品的rule_model字段,看是否是
  • 修改Navicat的数据库要点左下角的应用更改,这节的sql有问题,tree_id都是tree_lock,但是却是按照rule_model字段去查找的,全是空指针,应该是策略奖品表的rule_model就填tree_id,tree_id就对应了他掌管的中置规则,所以要走中置规则的奖品,在rule_model字段中加上tree_id就行。我重整理一下,但是100003策略的rule_model中设置了blacklist,但是rule表中却没有记录,这个肯定错了,然后award表中的rule_models字段应该从三个规则改成一个tree_lock
  • 修完了,可以继续了,然后查出来要用的规则树、节点、指向关系后,将节点和指向关系转换为map,构建一个tree存map,再将这个tree存入redis
  • 用这个完整的规则树ruleTreeVO构建一个决策树引擎,然后调用引擎的process方法开始处理
  • 先从VO中获取根节点,作为nextNode,然后以nextNode开启一个while循环,每个节点都做逻辑处理,处理完打上对应的type,根据type从当前节点的LineVO(指向关系)中获取下一个节点,开启下一轮循环
  • 如果条件不合法,比如库存不足,限制次数未达到,就会改变这个奖品id,并且打上返回的type,合法就保留原来的奖品id。循环完毕后就返回策略奖品数据,完成抽奖

不超卖库存规则实现

实现那三个规则树节点功能,并增加对库存的扣减和消费延迟队列

这种众多用户抽奖的秒杀场景肯定还是要用redis来处理,来确保不超卖

  • 装配策略的时候,新增将商品的库存数量缓存到redis中的功能
  • IStrategyDispatch接口新增加了一个subtractionAwardStock方法(扣减奖品库存),实现类还是StrategyArmoryDispatch
  • Stock树节点就会先调用这个方法,来扣减库存,如果成功则返回写入延迟队列,延迟更新数据库,失败就返回放行,去走兜底节点(放行接管这样的标志位,具体对应走哪个节点都是和数据库中rule_tree_node_line(节点指向关系表)中的限制字段决定的)。不超卖的保证,使用redis的decr进行原子性自减,如果减完的库存值小于0,将库存重置回0,并返回false(然后用setnx设置分布式锁,用来兜底,避免后续有库存的恢复,导致库存从96消耗后又变回了98重复消费)
  • 策略仓储层添加一个将要扣减的奖品数据放入延迟队列方法,之后让定时任务去消费队列,这个阻塞队列也是存在redis中的
  • RuleLock次数锁树节点完善对用户当前抽奖次数进行判断的逻辑,满足就放行,不满足就拦截,然后走到兜底奖品去
  • RuleLuckAward兜底奖励节点根举当前策略奖品设置的rule_value(101:1,100)进行解析,并返回这个兜底奖品的id和奖品的配置
  • 新增一个IRaffleStock接口,内部有两个方法,获取奖品库存消耗队列,更新奖品库存消耗记录,抽象类实现接口,交由子类实现方法
  • 任务消费队列,放在trigger.job.UpdateAwardStockJob中,是一个组件,注入IRaffleStock接口,然后设置一个定时任务,每5秒从延迟队列中获取奖品数据,更新库存消耗记录

前端引入大转盘

用的组件库的转盘组件
然后对前端进行mock(模拟/虚假的意思,模拟真实数据,测试接口,api测试软件里面填参数就是一种mock,自己构建一个数据对象作为参数也是mock)

抽奖API接口实现

类似于MVC的Controller接口,但是在这个架构中有一个trigger模块,专门提供触发操作,这里我们把 HTTP 调用、RPC(Dubbo)调用、定时任务、MQ监听等动作,都称为触发操作。触发表示通过一种调用方式,调用到领域的服务上。

  • 新增一个IRaffleAward接口用来查询奖品数据,同时抽象类实现的IRaffleStock和这个接口都交由子类实现,这个子类当前就是继承一个抽象类,然后实现两个接口,这样一眼就知道这个类在干什么,易于维护
  • 新增api模块,内部有一个IRaffleService接口,里面定义了策略装配接口,查询抽奖奖品列表配置,随机抽奖接口。这个接口的包名是cn.bugstack.trigger.api
  • trigger模块中的http包下新增一个RaffleController类,实现IRaffleService接口,实现这三个方法,然后就可以访问这些接口了。之前都是setup()单元测试的时候手动调用的

app启动的时候报错java: 程序包cn.bugstack.trigger.api不存在,maven的缓存问题,去生命周期里面清理一下

然后端口是8091,同时类Controller里面还有个注解@CrossOrigin("${app.config.cross-origin}")这个是应对跨域的,在配置文件中指定,暂时不需要,设置为*
url地址中带有指定版本号/api/${app.config.api-version}/raffle/和上面的跨域在yml文件中的同一位置,当前设置为v1。RequestMapping上面的${}会直接替换为yml文件中的值,但是apifox下面的实际请求,只会替换路径参数也就是{}的值,所以实际请求为$v1这就是请求404的原因。所以现在还是手动的把调试时候的${}换成v1吧

前后端接口对接

目前的项目还是要通过地址栏路径参数手动指定装配策略id,然后点击装配按钮才能抽奖
比如http://localhost:3000/?strategyId=100006

引入新表

  • 抽奖活动表,配置了用户参与一个活动的时候,需要进行的必要信息判断。时间、库存、状态等。
  • 参与次数表,单独分离出来。这样更方便后续基于不同的次数编号,做扩展。比如兑换一个新的抽奖次数。
  • 活动下单记录表,用户参与活动,则需要先创建一笔订单记录。如果用户抽奖中有失败流程,也可以基于订单的状态,用户重新发起抽奖,也不会额外占用库存记录。
  • 活动次数账户表,记录着一个用户在一个活动的可参与次数数据,也就是个人活动账户。
  • 账户次数流水表,每一笔对账户变动的记录,无论是任何的方式的变动,都要有一条流水。

分库分表

大营销项目有一个配置库和两个分库,需要对两个分库进行配置路由的操作。
配置库是一个单库单表存储活动等配置类信息

分库分表就是,假设设置dbCount: 2,tbCount: 4,list: db01,db02,routerKey: userId。此时用这个key的dao层操作增删改一条user记录,这个操作就会通过哈希值计算,操作对应哈希值对应的库表。配置了两个分库,每个分库有4个分表,原来存储在一个库表的记录就可以分开存储了。类似于redis集群的分片。
但是这肯定会带来问题,跨多个库表的范围搜索就很困难了,之后这个就可以交给搜索引擎ES来完成

  • 以用户对数据库的操作为视角,发生用户类的行为操作时【账户、下单、流水】,则会根据用户ID(userId)进行路由,把数据分配到x库y表中。
  • 路由计算的处理,是以配置了 @DBRouter注解的 DAO 方法进行路由切面开始。通过获取用户ID(userId)值进行哈希索引计算。哈希值 & 2从n次幂数量的库表 - 1 得到一个值,在根据这个值计算应该分配到哪个库表上去。比如这个是6,分库分表是2库4表,共计8个,那么6就分配到了1库4+2库2个等于6,也就得到了2库2表。
  • 对于计算得到的分库分表值,存入到 ThreaLocal 中,这个东西的目的是可以在一个线程的调用中,可以随时获取值,而不需要通过方法传递。
  • 最后 Spring 在执行数据库操作前,会获取路由。而路由组件则实现了动态路由,从 ThreadLocal 中获取。此外注意,因为还有分表的操作,比如 table 需要为 table_01 这个动作是由 MyBatis Plugin 插件开发实现的。
  • 此外 sharding-jdbc 也可以做分库分表,但直接使用小伙伴们会错过理解分库分表的核心设计,所以我们这里选择使用星球「码农会锁」里的 DB-Router 进行分库分表。

配置数据源

在application.yml中配置分库分表路由组件,以前就连一个MySQL其中一个数据库就行
这次在mini-db-router:下分别配置db0、db1、db2三个数据源

  • dbCount 分几个库,tbCount 分几个表,两个数的乘积为2的次幂。
  • default 为默认不走分库分表时候路由到哪个库,这里是我们需要的配置库。
  • routerKey 默认走的路由 Key,一个数据路由,是需要有一个键的,这里选择的是用户ID作为路由计算键。
  • list: db01,db02 表示分库分表,走那套库。
  • db0、db1、db2 就是配置的数据库信息了。这里给每个数据库都配置了对应的连接池信息。

抽奖活动订单流程设计

新增加sku表(stock keeping unit,库存单位),并去掉分库分表中的 flow 流水表,而是直接由 order 订单表提供。
之后想获得更多的抽奖次数,就直接对 sku 下单即可。无论是通过赠送、签到、打卡、积分兑换等任何方式,都是可以的。这样也就增强了营销活动的扩展性。

  • domain模块下新建了activity领域对象,与之前的strategy对象并列
  • 在当前领域的service包下创建IRaffleOrder接口,内部有一个方法,以sku创建抽奖活动订单,获得可参与抽奖的资格(就是可消耗的次数)参数为活动sku实体,返回一个活动参与记录实体
  • 然后定义AbstractRaffleActivity抽象类实现这个接口,抽象类的作用是定义一个执行下单的标准流程。后续逐步完成这部分的功能
  • DDD中通常把具有唯一ID标识,影响数据库变动的操作,定义为实体对象。用于描述对象属性的值,如枚举值,没有生命周期对象,定义为VO对象(valobj包下,意思是值对象,没有id,意味只要属性值相同就是一个对象)

抽奖活动流水入库

以 domain 下的 activity 领域模块进行功能实现;

  • IRaffleOrder 是抽奖下单的入口,也就是给用户在当前的这个活动,个人的账户上充值。比如这次是允许抽奖1次。
  • AbstractRaffleActivity 是抽象类,定义出抽奖下单的流程。
  • RaffleActivitySupport 是支撑类,类似 Spring 源码中也会有 XxxSupport 来提供数据支撑。这样可以简化抽象类(AbstractRaffleActivity)里的代码量。(抽象类会继承这个支撑类)
  • RaffleActivityService 是抽象类定义的抽象接口由此类实现。
  • rule 模块下是责任链的规则实现部分。

对sku理解

购买的最小单位,比如raffle_activity_sku表
这个表属性:id, sku, activity_id, stock_count, stock_count_surplus, 创建时间, 更新时间
其中sku是类似于id的同种商品类的唯一标识
这里面一个活动中的抽奖次数就能作为最小商品单位,也就是sku
每个sku是都可以有自己的库存是吗,这同种商品的库存共用一个sku

建立这个表的好处:
独立控制每种商品的库存,解耦活动与商品

当前这个类的sku

当前这个抽象类是在处理用户通过充值某个 SKU(库存单位)来生成配额订单的业务逻辑。具体解释如下:

  1. 用户充值 SKU
    用户向系统支付一定金额或资源,购买某个特定的 SKU(例如某种虚拟商品、服务或权益)。
  2. 生成配额订单
    系统根据用户充值的 SKU,为其分配相应的配额(例如参与活动的次数、抽奖机会等),并生成一个订单记录。
  3. 业务目标
    这个过程的核心是将用户的充值行为转化为系统内的资源配额,并通过订单的形式进行管理和追踪。

就是一个用¥给账户充钱买抽奖次数
一个消耗抽奖次数去抽奖

创建订单流程

  1. 先通过sku在sku表查询到这个活动id,以及其他信息,用变量接收
  2. 然后通过活动id在活动表查询到这个活动的次数id,以及其他信息,用变量接收
  3. 然后通过次数id在次数表中查询次数信息(用户在活动上可参与的次数),以及其他信息,用变量接收
  4. 抽象类这次查询出用户在活动上可参与的次数后,获取责任链并进行规则校验,暂时不实现,只做声明,也不处理责任链的结果
  5. 然后声明两个protected的抽象方法,构建订单聚合对象,保存订单
  6. 构建订单聚合对象要把当前,活动商品充值实体对象(SkuRechargeEntity)和上面那三次查询获得的三个实体作为参数传入,内部构建一个活动订单实体,在用这个实体和其他参数的信息构建聚合对象,返回这个对象,这个对象属性是userid,活动id,三个次数信息,还有一个刚构建的活动订单实体
  7. 然后保存这个订单,保存订单方法内部先创建一个订单对象,再创建一个账户对象,填充好信息后以用户ID作为切分键进行分库分表(后续的事务前基本都要先用这个分库分表),开始编程式事务,先写入订单,再更新账户,如果账户不存在就新建一个账户,如果已经存在这个订单号就返回失败,唯一索引冲突(幂等业务单号,SkuRechargeEntity的属性)
  8. 最后返回单号

重新介绍责任链:
第三点就是,介绍的非常清楚

  • 责任链需要一个规则接口,一个组装规则的接口,一个抽象类来填充责任链。
  • 之后是实现责任链接口的各个具体要处理的规则操作,比如;活动基础信息校验,活动库存处理。
  • 此外还有一个责任链的处理工厂,负责将各部分责任链对象注入进来后加工组装出一个责任链的链条⛓。就像我们讲的,这个责任链是一个固定结构的链接,所以在工厂中提供个统一的链就可以了。
  • 活动下单动作的责任链是固定的,不像策略规则要根据每个字段添加,所以直接在构造函数中组装就行

引入MQ处理活动SKU库存一致性

RabbitMQ使用

  1. 先下载erlang环境,然后下载RabbitMQ的windos安装器
  2. 安装完了之后开始菜单会出现RabbitMQ的启动、停止、重新安装、还有管理工具
  3. 打开管理工具后net start RabbitMQ启动,net stop RabbitMQ停止
  4. 然后启动rabbitmq_management插件,管理工具(或sbin目录下的cmd)输入rabbitmq-plugins enable rabbitmq_management
  5. 然后输入启动命令之后就可以通过http://localhost:15672/访问管理工具了`,默认账号和密码都是 guest
  6. 在上面的admin页下添加一个账号密码都是admin的账号
  7. 新添加的admin进入,直接点set permissions 给他设置/的所有权限
  8. 然后就去pom文件里面导入依赖,去yml文件中设置配置,配置用户名密码和端口号就能连接了,5672和15672都是他的端口号,还可以设置topic的值

最终一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public boolean subtractionActivitySkuStock(Long sku, String cacheKey, Date endDateTime) {
long surplus = redisService.decr(cacheKey);
if (surplus == 0) {
// 库存消耗没了以后,发送MQ消息,更新数据库库存
eventPublisher.publish(activitySkuStockZeroMessageEvent.topic(), activitySkuStockZeroMessageEvent.buildEventMessage(sku));
return false;
} else if (surplus < 0) {
// 库存小于0,恢复为0个
redisService.setAtomicLong(cacheKey, 0);
return false;
}
// 1. 按照cacheKey decr 后的值,如 99、98、97 和 key 组成为库存锁的key进行使用。
// 2. 加锁为了兜底,如果后续有恢复库存,手动处理等【运营是人来操作,会有这种情况发放,系统要做防护】,也不会超卖。因为所有的可用库存key,都被加锁了。
// 3. 设置加锁时间为活动到期 + 延迟1天
String lockKey = cacheKey + Constants.UNDERLINE + surplus;
long expireMillis = endDateTime.getTime() - System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1);
Boolean lock = redisService.setNx(lockKey, expireMillis, TimeUnit.MILLISECONDS);
if (!lock) {
log.info("活动sku库存加锁失败 {}", lockKey);
}
return lock;
}

这个123点的加锁的流程(加锁是为了防止超卖):

  1. 首先是有一个库存总量的keystock:product_1001: -> "3"当前1001商品的库存为3
  2. 然后每次尝试扣减时,先用decr减库存,得到减库存之后的一个数量也就是2,然后通过这个数量获取这个库存槽位锁lock:preduct_1001:2 -> "任意值",再设置一个ttl
  3. 获取锁成功(设置这个key成功),就代表当前线程成功抢占了这个库存槽位,正常放行,如果失败就代表没抢到,抛出异常
  • decr 扣减库存完成后,如果为 0 则发送 MQ 消息,由消费端处理更新库存。
  • lock 是一个兜底的设计,类似于【账户和流水】一笔消费,对应一个锁的记录。这样的兜底,可以确保极端情况下,如运营误操作的时候,恢复库存为错误的情况下,有流水的锁记录,也不会超卖。
  • 另外锁的时间为到活动结束时间 + 延迟1天,锁就被删掉了。这个时候活动也完结了。

假如库存有100此时A购买了50次,库存剩50,期间对99到50都加锁,然后后台此时进行补货将库存又设置回了100,那此时B再来购买,前50次购买都会失败,直到51次购买,获取到了49的锁,这就是防止人工补货造成的超卖,虽然库存被设回 100,但实际可售的只有 新增的 50 个槽位(49~0)。

发送MQ消息

  • 如果库存扣减完为0,就发送MQ消息,由消费端处理更新库存。
    eventPublisher.publish(activitySkuStockZeroMessageEvent.topic(), activitySkuStockZeroMessageEvent.buildEventMessage(sku));
    通过工具类向这个主题发送消息
  • 然后在domain.event包下创建一个活动库存为0的消息类,加上@Componext,继承BaseEvent(用于构建事件消息和指定主题)topic用@Value(“${activity.sku.stock.zero.topic}”)指定为配置文件中的主题
  • 在trigger.listener包下创建一个这个事件的监听器作为消费者类,也是用topic指定这个主题,监听方法上加上 @RabbitListener(queuesToDeclare = @Queue(value = "activity_sku_stock_zero")),方法内部将这个消息转换为EventMessage类,然后获取数据更新库存,清空队列

编程式事务写法

1
2
3
4
5
6
7
8
9
10
11
12
transactionTemplate.execute(status -> {
try {
// 事务逻辑
orderDao.insert(order);
accountDao.update(account);
} catch (Exception e) {
log.error("失败", e);
status.setRollbackOnly() // 捕获异常时回滚事务
return -1;
}
return 1;
});

事务中的断点调试

直接在方法内部打断点,然后让恢复程序自己走到事务里
一直步进过是不进去的,因为事务是走的代理

总流程梳理

就是在完善上面的创建订单流程

  1. 首先还是先查出来活动信息,活动sku信息,活动次数信息
  2. 然后获取责任链,进入责任链校验环节,这次的责任链是固定流程,所以DefaultActivityChainFactory构造函数中直接写死了
  3. 先进行活动的库存、时间校验,然后再进行库存的校验,库存校验都成功后,就写入延迟消费队列,延迟更新库存记录
  4. 如果扣减为0会发送MQ消息,更新库存,然后将这个延迟消费队列清空,因为此时就不需要更新了,trigger.listener包下有这个事件的监听器,由它来执行这些逻辑

用户领取活动库表设计

通用的是用户id、活动id、创建时间、更新时间

  • 分库中新加入用户活动账户表:用户id、活动id、总次数、总剩余次数、日次数、日剩余次数、月次数、月剩余次数、创建时间,更新时间。记录每日和每月的参与次数
  • 然后是表:中间的次数只保留月次数、月剩余次数、和对应月份。每月一条记录
  • 然后是表:中间的次数只保留日次数、日剩余次数、和对应日期。每日一条记录
  • 用户抽奖订单表:活动名称、策略id、订单id、下单时间。每次参与抽奖生成

下面两个表应该在一个事务中

  • 用户中奖记录表:策略id、订单id、奖品id、奖品名称、中奖时间、状态。参与抽奖后获得具体奖品的记录表
  • 任务表:消息主题、主题实体、状态。用于发送MQ消息,通过任务扫描发送,兜底

领取活动扣减账户额度

actity领域下要划分多个子领域,避免activity内部平铺过多service

用户抽奖的业务流程分为;给自己的活动账户添加额度(购买、兑换、打卡),领取活动(扣减互动账户额度)、执行抽奖策略、抽奖结果落库。本节实现到领取活动部分。

在用户参与活动中,需要扣减;总额度、月额度、日额度,以及写入一笔参与活动的订单流水。

先给actity的service包下新增quota额度子领域包,然后迁移原来的的额度账户功能,原来的那个下单校验总的抽象模版类,还有责任链相关类都放在quota包下了

  • 再给actity的service包下新增partake(参与)子领域包,内部定义一个抽象接口IRaffleActivityPartakeService,内部是创建抽奖单的方法:用户参与抽奖活动,扣减活动账户库存,产生抽奖单。如果错在未被使用的抽奖单就直接返回已存在的抽奖单
  • 然后定义一个抽奖活动参与抽奖类,属于活动参与领域,主要处理用户参与抽奖活动的订单创建逻辑。(之前那个和sku有关的是账户配额领域,主要处理用户充值 SKU 并生成配额订单的逻辑。)
  • 在抽象类中定义出领取活动的订单的执行流程,包括活动的校验、查询未消费订单、账户过滤和构建对象、创建订单、组装聚合对象和保存订单。

抽奖开始流程

  1. 先进行活动参数进行校验,对账户额度进行过滤,都成功就构建一个聚合对象,包含总-月-日的账户等
  2. 然后用随机生成的订单号,创建一个状态为Create的抽奖单(上面过滤的时候会先在数据库查找有没有未被使用的抽奖单,也就是状态为Create的)
  3. set这个订单为聚合对象的属性值
  4. 用这个聚合对象进行扣减账户次数的业务逻辑,然后保存订单到数据库(全在一个事务里),每次进行抽奖都会同时扣除一次总-月-日账户里的剩余次数
  5. 返回订单信息
  6. todo发放奖品,将抽奖单状态改为成功等,是之后要做的

写入中奖记录和任务补偿发送MQ

使用 task表机制,在写入奖品记录时,一个事务下完成 task mq 消息的写入,并可以通过补偿的方式推送 MQ 消息。

  • 在domain领域层添加奖品领域、任务领域两个模块,一个处理奖品写入记录,一个处理任务扫描补偿
  • trigger层一个是任务扫描,一个是监听奖品记录后发送的MQ消息

奖品领域:

  1. 先定义一个IAwardService接口,内部有一个保存用户奖品记录方法
  2. 然后实现类实现方法,方法内部来构建消息对象、任务对象、聚合对象
  3. 最后进行保存聚合对象的逻辑:建立一条记录和task,写入记录和task(一个事务中)
  4. 发送任务消息,更新任务表中的task为完成(这个不保证成功,所以job那里还有一个任务会不断扫描库中的task表,来发MQ消息)

任务领域:

  1. 定义一个ITaskService接口,内部有查询发送失败和未发送的MQ,发送信息,更新task为完成,更新task为失败的方法
  2. 然后实现这些crud方法

任务补偿:

  1. 定义一个SendMessageTaskJob组件类,注入这上面的任务类
  2. 内部有一个定时任务,每五秒执行一次逐个扫描每个库的task表,获取失败或未发送的MQ,重新发送这些MQ消息(用线程池来多线程发送,提高效率)
  3. 只要确定MQ消息发送成功就可以将task更新成已完成,由消息队列完成后续奖品的发放
  4. 所有操作都是有唯一id来保证幂等性的,所以不怕重复发送

消息监听:

  1. 新建一个SendAwardCustomer组件类,监听spring.rabbitmq.topic.send_award这个topic
  2. todo 监听方法的内部进行奖品发放的逻辑(保存完task的时候会发送消息)

抽奖活动流程串联

api模块下新增IRaffleActivityService接口,实现活动装配和抽奖(实现类在trigger.http下)
之前的IRaffleService接口改名为IRaffleStrategyService用于区分

下面这都是Controller对外提供接口的方法了

活动装配实现:

  1. 活动装配,由活动ID发起,需要把活动id对应sku记录一起查询出来进行装配,预热到缓存
  2. 装配活动的策略,并预热到缓存,活动与策略是一对一绑定的
  3. 可以直接用之前的查询语句作为预热

抽奖流程串联:

  1. 校验参数
  2. 创建参与记录订单(扣除总-月-日次数,将参与表存储在user_raffle_order表中)
  3. 开始抽奖,执行抽奖策略。(就是调用之前抽奖策略的逻辑,获取一个奖品,校验还有库存扣减)
  4. 存放结果,写入中奖记录。(中奖记录存入user_award_record表中,然后写入任务,之后会用任务表发送mq消息,同时将上面的参与表记录状态更新为used)
  5. 返回前端一个结果

活动信息API迭代和功能完善

扩展查询奖品信息接口【queryRaffleAwardList】,以用户活动为视角进行查询增加返回信息

  • 之前抽奖时奖品的次数锁在前端应该显示还有几次抽奖才解锁,以及是否解锁,这个信息要返回给前端

  • 以活动开始的抽奖,对抽奖策略加锁key设置过期时间为活动结束时间

  • 添加Redisson序列化设置

  • 建立一个前端奖品列表响应DTO作为返回对象

  • 查询抽奖列表的Controller接口内部,在开头加上参数校验,查询奖品配置,获取规则配置,获取奖品的解锁限制,然后再遍历填充每个奖品数据,这次DTO填充要带上奖品次数规则,奖品是否解锁,等待解锁次数

  • 这次扣减库存decr后加的库存锁设置ttl为活动结束时间

  • 目前所有活动都共用一个延迟队列,之前库存清空逻辑就先不能用了

序列化处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Bean("redissonClient")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext, RedisClientConfigProperties properties) {
Config config = new Config();
// 根据需要可以设定编解码器;https://github.com/redisson/redisson/wiki/4.-%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96
config.setCodec(JsonJacksonCodec.INSTANCE); // 就是这一行设置的序列化器
config.useSingleServer() // 剩下的就是配置文件里面的内容
.setAddress("redis://" + properties.getHost() + ":" + properties.getPort())
.setPassword(properties.getPassword())
.setConnectionPoolSize(properties.getPoolSize())
.setConnectionMinimumIdleSize(properties.getMinIdleSize())
.setIdleConnectionTimeout(properties.getIdleTimeout())
.setConnectTimeout(properties.getConnectTimeout())
.setRetryAttempts(properties.getRetryAttempts())
.setRetryInterval(properties.getRetryInterval())
.setPingConnectionInterval(properties.getPingInterval())
.setKeepAlive(properties.isKeepAlive())
;
return Redisson.create(config);
}

这里设定 config.setCodec(JsonJacksonCodec.INSTANCE); 为序列化方式,否则会乱码。
哎呀我,终于能看懂redis写的是啥了,这个序列化器怎么现在才设置

用户行为返利入账

开发rebate返利领域,提供返利订单创建接口。并在写入订单后发送MQ消息,后续处理奖励入账

返利是根据用户行为触发包括:打卡、签到、支付、开户、交易、拉新等

  • 一个用户行为可能会给多种奖励,所以要根据配置组装聚合对象。聚合是为了做统一的事务
  • 一个聚合对象中包含了返利的订单实体对象,写入task的实体对象。他们是一个事务入库
  • 另外是发送MQ消息,在完成入口动作后会直接发送MQ消息,如果发送失败会有任务兜底【这就是MQ发送失败时候的处理方式】

新增加库表

big_market配置库新增一个daily_behavior_rebate表,存储日常行为返利
big_market01/02两个分库新增分表,user_behavior_rebate_order_000 ~ 004,存储所有用户返利的记录

返利领域创建订单流程

  1. 先查询返利配置
  2. 构建聚合对象(返利订单实体、task对象(MQ消息对象))
  3. 然后存储到数据库,在一个事务中,同步发送MQ消息

用户行为返利结算

接收异步MQ消息进行结算处理,并提供出日历签到接口,用于后续对接

监听消息:

trigger.listener包下新增一个RebateMessageCustomer类,监听返利信息的消息队列
有消息就取出来,转换为消息格式,然后获取数据,根据奖品类型做对应处理,sku奖励就直接入账(调用quota接口),其他类型暂不处理

账户入账(放在同一事务中):

  1. 将订单插入到活动订单表
  2. 更新总账户,如果不存在就插入一个新账户
  3. 更新月账户、日账户

规则完善和应用接口实现

api模块下新增接口:

  • IRaffleActivityService增加3个接口:签到 - calendarSignRebate、是否已签到 - isCalendarSignRebate、查询账户额度 - queryUserActivityAccount
  • IRaffleStrategyService增加一个接口:查询权重配置 - queryRaffleStrategyRuleWeight 这里需要封装下权重配置信息,方便前端渲染使用,具体可以参考代码对象结构定义
  • RuleWeightLogicChain 权重抽奖,增加查询用户在当前活动下中抽奖次数,完成权重规则的过滤。

api模块接口的实现类就是trigger模块下的Controller类,提供对外接口
实现的内容就和之前一样,就是Controller里面调用之前的service接口,然后返回结果

查询权重配置实现:

  • 查询用户抽奖总次数,在raffle_activity_account表中记录了一个用户在一个活动下的总次数,剩余次数,相减就是用户抽奖次数
  • 然后通过当前活动的策略查找权重规则配置,并按照配置中的奖品ID查询出奖品名称,Controller层里面返回的数据都是后面处理好了的RuleWeightVO数据,这个实体内部有四个属性,规则配置的字符串,解析后的积分值,一个list存储范围内所有奖品id,一个list存储奖品id和名称
  • 是否签到可以通过查询用户返利行为表中是否存在签到行为,通过userId和outBusinessNo字段(日期)查找
  • 点击签到就增加用户当前活动下的抽奖次数,更改总-月-日账户下的剩余抽奖次数

积分发奖服务实现

创建用户积分表,并开发用户抽到积分奖品后,给用户发积分的流程

主要在domain模块中的award领域下开发:

  1. service包下新加入一个分发奖品子领域IDistributeAward接口

  2. 分发积分作为实现类,用@Component("user_credit_random")修饰,之后在AwardService类中的属性private final Map<String, IDistributeAward> distributeAwardMap;,Spring会自动收集所有IDistributeAward接口的实现类,然后用bean的名字作为key将这个实现类放入map中

  3. 传入一个分发奖品实体作为参数,查询奖品id(优先走透传的随机积分奖品配置award_config,比如被黑名单过滤的时候就会被黑名单奖品配置中写死的0.01,1直接覆盖),获取这个配置然后校验

  4. 依据这个配置,在这个区间内生成随机值,然后构建聚合对象

  5. 依据聚合对象进行存储,更新奖品记录,更新用户积分

  6. 给IAwardService新开发一个分发奖品的方法

  7. 实现类中实现这个方法,接收分发奖品实体,然后获取奖品key,再用key通过map获取这个奖品的分发方式,这里就是随机积分(user_credit_random),如果没有对应服务就抛异常

  8. 调用上面子领域开发的方法,传入奖品实体,分发奖品

异步MQ消息实现奖品发放:

  1. 完善原来的SendAwardCustomer消费者类,之前只是打印了日志
  2. 这次在内部取出队列中的消息,然后取出data,构建发放奖品实体,调用分发awardService的分发奖品方法

积分领域调额服务

新增积分领域,开发积分调额接口。串联行为发奖动作,发放用户积分奖励。
分库新增用户积分订单表

接口:ICreditAdjustService(内部方法,创建积分额度订单)
实现:创建积分实体,创建账户实体,构建交易聚合对象,保存积分交易订单

行为返利中新增tradeEntity类型返利奖励处理,之前只有sku类型,现在用switch case根据奖励类型处理这两种奖励
这个表中out_business_no字段是唯一索引,插入已经存在的业务单号会出异常,通过这个保持幂等性

积分支付兑换商品

  • 首先,对 sku 商品库增加积分金额,用于积分支付,而签到兑换类则无需关注金额。
  • 之后,商品下单需要提供出交易策略;无支付交易和有支付交易。
  • 最后,下单完成则进行积分抵扣,以及接收到支付成功消息,进行充值入账

activity的子领域quota下新增policy包,负责用积分支付和无支付的策略
积分兑换:有支付,新建一个状态为待支付的聚合订单,并保存
签到返利:无支付,直接修改订单金额为0,状态为完成,并给用户账户充值

然后activity父领域领域的接口新增订单出货的方法,更新总-月-日账户

用积分支付,保存用户积分订单时,到了Repository层,编程式事务完成保存账户订单,并写入task表后
发送MQ消息,然后更新task任务表位完成,代表发货成功

新增一个积分调整成功消息监听类,收到消息后进行异步发货,发货就是新建一个deliverOrder实体,然后调用Quota领域的updateOrder方法

回顾这几个分库表

都包含用户id、活动id、订单id

分表:

  • raffle_activity_order:抽奖活动订单,主要包含商品sku、总-月-日次数、支付金额(积分)、订单状态(待支付/已完成)、业务防重id(外部透传)
  • user_award_record:抽奖订单,主要包含、奖品id、奖品发奖状态(创建/发奖)
  • user_raffle_order:用户抽奖订单,主要包含订单状态(创建/已使用)
  • user_credit_order:用户积分订单,主要包含交易名称(行为返利/抽奖兑换)、交易类型(加/减)、交易金额(积分数量)、业务防重id
  • user_behavior_rebate_order:用户行为返利订单,主要包含行为类型(sign)、返利类型(sku商品/Integer积分)、返利配置(和返利配型搭配,sku值或积分值)、业务防重id、业务id(拼接的唯一值)

单一表:

  • raffle_activity_account:抽奖活动账户,用户id、活动id、总-月-日次数、总-月-日剩余次数
  • task:任务表,消息主题、消息编号、消息主题(JSON)、任务状态(create/completed)
  • user_credit_account:用户积分账户,用户id、总积分、可用积分、账户状态

抽奖流程

  • 阶段一:获取资格 (充值/签到/兑换)
    • 动作:用户购买SKU或完成签到。
    • 涉及表:raffle_activity_order
    • 状态流转:
      1. 创建记录,状态为 completed (或根据业务定义为已入账)。
      2. 联动更新:raffle_activity_account (账户表) 中的 剩余次数 +N。
    • 结果:用户拥有了抽奖次数,但尚未产生具体的抽奖流水。
  • 阶段二:发起抽奖 (扣减次数)
    • 动作:用户点击页面上的“抽奖”按钮。
    • 涉及表:user_raffle_order
    • 前置校验:检查 raffle_activity_account 剩余次数 > 0。
    • 事务操作:
      1. 扣减账户:raffle_activity_account 剩余次数 -1。
      2. 创建参与单:插入 user_raffle_order。
      • order_id: 生成唯一单号。
      • status: create (表示已扣费,等待开奖结果)。
    • 结果:次数已扣,生成了一个状态为 create 的待处理抽奖单。
  • 阶段三:执行策略与落库 (开奖)
    • 动作:后端执行抽奖策略(黑名单/权重/随机/规则树)。
    • 涉及表:user_raffle_order, user_award_record (中奖记录), task (消息任务)
    • 事务操作:
      1. 更新参与单:将 user_raffle_order 状态由 create 改为 used (表示该次机会已消耗完毕)。
      2. 写入中奖记录:插入 user_award_record (记录奖品ID),状态通常为 create (待发奖)。
      3. 写入任务表:插入 task 表,用于异步发送MQ消息进行发奖。
    • 结果:抽奖流程结束,用户获得奖品记录,等待异步发奖。
  • 阶段四:异步发奖 (后续)
    • 动作:定时任务扫描 task 表 -> 发送MQ -> 消费者监听。
    • 涉及表:user_award_record, user_credit_account (若是积分) 等。
    • 状态流转:
      发奖成功后,更新 user_award_record 状态为 completed (已发奖)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
graph TD
A[用户充值/签到] -->|创建 | B(raffle_activity_order: completed)
B -->|更新 | C[raffle_activity_account: 次数+N]

C -->|用户点击抽奖 | D{次数>0?}
D -- 是 --> E[raffle_activity_account: 次数-1]
E -->|创建 | F(user_raffle_order: create)

F -->|执行抽奖策略 | G[计算奖品]
G -->|事务提交 | H[user_raffle_order: used]
G -->|事务提交 | I[user_award_record: create]
G -->|事务提交 | J[task: create]

J -->|定时任务/MQ | K[执行发奖逻辑]
K -->|更新 | L[user_award_record: completed]

关键点提示

  • 幂等性:user_raffle_order 的创建通常依赖唯一的 order_id 或业务防重ID,防止重复扣减次数。
  • 事务边界:user_raffle_order 的创建(create)与 状态更新(used)以及 user_award_record 的写入必须在同一个本地事务中,确保“扣了次数必有结果”。
  • 最终一致性:发奖环节(user_award_record 变为 completed)通过 MQ 和 task 表保证最终一致性,不阻塞主抽奖流程

积分应用场景接口实现

以支持积分使用「商品、账户、兑换」对接前端诉求,在本节开发所需的接口。

  • 查询sku商品:

activity领域下新建一个skuProduct接口,内部方法为查询当前活动下创建的sku商品
新建一个子领域product,内部新建一个实现类实现这个接口

  • 创建、返回未支付订单:

activity领域quota接口新增一个方法,创建sku账户充值订单,给用户增加抽奖次数
实现类用map注入ITradePolicy不同交易策略的实现,然后实现新增接口方法
先参数校验,然后查询未支付订单,查询基础信息(sku、活动、次数)、活动动作规则校验、构建订单聚合对象,调用交易策略,返回订单信息

  • 串联功能,实现控制器接口:
    • creditPayExchangeSku积分兑换商品:
      创建兑换商品sku订单,返回订单号和业务单号
      调用creditAdjustService的创建订单,实现支付兑换商品功能,就是把上面创建完的订单记录状态更新为完成,同时写入任务表,返回orderId
    • querySkuProductListByActivityId商品查询:
      参数校验,查询商品&封装数据
    • queryUserCreditAccount查询积分数据

引入Nacos+Dubbo框架

引入分布式技术栈框架 Nacos + Dubbo,用于微服务间调用

注册中心Nacos:

  • 注册服务:多个大营销系统实例注册为服务
  • 获取服务(服务发现):博客应用、点评应用获取服务
  • 调用服务:各种引用获取完服务后,通过RPC调用服务(Dubbo作为RPC框架)

nacos使用

  1. 先去下载nacos应用nacos官网

  2. 给nacos配置三个鉴权属性,主端口和控制台端口,还有一个专门的数据库写在安装的配置文件夹中

  3. 默认是集群模式,右键那个启动cmd类,修改成下面这样

    1
    2
    rem set MODE="cluster"
    set MODE="standalone"
  4. 之后浏览器访问控制台端口就能看到nacos页面http://127.0.0.1:8847/,默认用户名密码都是nacos

  5. 启动java应用后,登录进控制台就能看到所有注册好的服务

框架引入

  1. 引入Nacos和Dubbo依赖
  2. yml文件中加入dubbo配置,指定注册中心为nacos,ip还是设为127.0.0.1
  3. 启动类上加上@EnableDubbo注解
  4. 为所有接口出入参对象加上Serializable序列化标识,这是Dubbo需要的(就是Controller的出入参)
  5. 为接口添加注解 @DubboService(version = “1.0”) 注解,这样就会被 Dubbo 管理了。

服务使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 这个属性就会变成dubbo生成的代理对象,就像注入本地接口实现类一样
@DubboReference(interfaceClass = IRaffleActivityService.class, version = "1.0")
private IRaffleActivityService raffleActivityService;

public void test_rpc() {
ActivityDrawRequestDTO request = new ActivityDrawRequestDTO();
request.setActivityId(100301L);
request.setUserId("xiaofuge");
// 调用这个远程服务的代理对象
Response<ActivityDrawResponseDTO> response = raffleActivityService.draw(request);

log.info("请求参数:{}", JSON.toJSONString(request));
log.info("测试结果:{}", JSON.toJSONString(response));
}

分布式动态配置动态降级

基于Zookeeper实现分布式动态配置中心服务,用于分布式应用节点系统中的环境属性值变更。这样我们可以让所有分布式系统中,类下的属性值做动态的调整,及时的对系统进行;切量、熔断、降级、黑白名单等用途。

Zookeeper作用(配置中心)

  1. 配置存储,集中存储所有配置,不用一个应用一个yml文件了
  2. 配置监听,实时感知配置变化
  3. 配置同步,保证所有节点配置一致
  4. 高可用,集群配置保证服务不中断

ZooKeeper 是这个配置中心的底层存储和通知引擎,配置中心是建立在 ZK 之上的应用层抽象,让开发者通过 @DCCValue 注解就能实现配置的动态注入和实时刷新
实现这个配置的动态调整就是下面Zookeeper引入在做的事

Zookeeper使用

windos上Zookeeper安装
跟着配置就行,windos配置要改的点还挺多
配置完之后用命令就能启动
服务端命令:zkServer
客户端命令:zkCli

Zookeeper引入

  1. 加依赖

  2. 建立配置类

  3. 添加yml配置,还是配置成本地,等以后用docker再用远程ip

  4. 自定义注解,目的是用于对需要变更字段属性的标记作用,所有使用了注解的字段,都会被动态配置管理。
    DCC = Dynamic Configuration Center(动态配置中心)
    这是基于 ZooKeeper 实现的项目级配置中心。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Retention(RetentionPolicy.RUNTIME)// 运行时保留
    @Target({ElementType.FIELD})// 注解使用在字段上
    @Documented// 注解会被包含在文档中
    public @interface DCCValue {

    // 一个value属性,默认值为空字符串
    // 虽然有()但是是字段,不是方法,这是注解的规范,为什么这么写可以问ai
    String value() default "";
    }

    特殊规则:当注解只有一个 value 属性时,可以省略属性名。

    1
    2
    3
    4
    5
    6
    // 以下两种写法等价
    @DCCValue("username")
    @DCCValue(value = "username")

    // 使用默认值
    @DCCValue // 相当于 @DCCValue("")

    使用示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class User {
    @DCCValue("user_name")
    private String name;

    @DCCValue // 使用默认值
    private String email;
    }

    // 通过反射获取注解属性
    Class<?> clazz = obj.getClass();
    Field[] fields = clazz.getDeclaredFields();

    for (Field field : fields) {
    if (field.isAnnotationPresent(DCCValue.class)) {
    DCCValue annotation = field.getAnnotation(DCCValue.class);
    String configKey = annotation.value();

    System.out.println("字段: " + field.getName());
    System.out.println("配置键: " + configKey);
    System.out.println("---");
    }
    }
  5. 监听配置类

    • 新建一个DCCValueBeanFactory配置处理工厂类,实现了 BeanPostProcessor 接口,重写了postProcessAfterInitialization方法,对所有bean实例都进行后置处理,反射查看这个bean是否有DCC字段
    • DCCValueBeanFactory 的构造函数获取 Zookeeper 连接对象,并判断所需要监听的节点是否存在「不存在则创建」。之后是对节点的监听,当有值变化时会对对应的属性复制。
    • postProcessAfterInitialization 的用途是获取扫描的 Bean 对象类,对这些类的属性判断是否存在添加了自定义注解的属性,存在则进行管理。这里面有一些小的逻辑,当一个值已经存在于配置中心,则在启动的时候把配置中心中的值赋值给应用中类的属性。另外如果是首次启动,则会把自定义注解中默认配置的值,赋值给属性。如 @DCCValue(“degradeSwitch:open”) 赋值 open 给属性。
  6. 配置使用
    在 RaffleActivityController 类中,添加一个属性用于做抽奖的降级处理。
    degradeSwitch 降级开关,默认配置了一个 open 值。之后在应用方法中判断这个值是否做降级处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @DCCValue("degradeSwitch:open")
    private String degradeSwitch;

    @RequestMapping(value = "draw", method = RequestMethod.POST)
    @Override
    public Response<ActivityDrawResponseDTO> draw(@RequestBody ActivityDrawRequestDTO request) {
    try {
    log.info("活动抽奖开始 userId:{} activityId:{}", request.getUserId(), request.getActivityId());
    // 判断开关是否是open
    if (!"open".equals(degradeSwitch)) {
    return Response.<ActivityDrawResponseDTO>builder()
    .code(ResponseCode.DEGRADE_SWITCH.getCode())
    .info(ResponseCode.DEGRADE_SWITCH.getInfo())
    .build();
    }

    // ...

    }
  7. 动态变更
    增加DCCController动态配置类,传入key和value来更改配置中心中的值,这样就能可以通过curl命令动态配置类的值
    像是这样的一个配置,在公司里会有单独的专门的服务和方法,就不用每个系统里提供了。
    在变更操作中,会进行节点的判断和创建,也可以判断不存在则直接返回即可。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    # 访问接口 - 不降级
    curl --request POST \
    --url http://localhost:8091/api/v1/raffle/activity/draw \
    --header 'content-type: application/json' \
    --data '{
    "userId":"xiaofuge",
    "activityId": 100301
    }'

    # 动态设置开关的value为close
    curl "http://localhost:8091/api/v1/raffle/dcc/update_config?key=degradeSwitch&value=close"

    # 访问接口 - 已降级
    # 和上面的访问是同一个
    # 这个curl在powerShell叫Invoke-WebRequest,在cmd中叫curl
    Invoke-WebRequest -Uri "http://localhost:8091/api/v1/raffle/activity/draw" `
    -Method POST `
    -ContentType "application/json" `
    -Body '{"userId":"xiaofuge","activityId":100301}'

我理解的降级流程

  1. 那个工厂配置类是可以往zk中写入配置的,他会监听所有bean创建,如果这个bean的字段有ddc注解,就对这个字段进行反射赋值,zk中不存在就把这个注解的属性按默认值写到zk中,存在就读取zk中的值。
  2. 然后工厂配置类还会将这个bean加入到一个dccObjGroup(map),key为zk路径,然后创建一个CuratorCache缓存监听器监听zk路径,每当有修改,就同步反射修改map中所有bean的字段
  3. 抽奖类新增一个降级开关字段,读取zk中的值,执行抽奖的时候会先根据这个字段的值来决定是否降级
  4. 动态调整,可以新建一个DCCController,内部写一个更新的接口,之后就用curl命令动态将zk中开关字段的值改为close
  5. 目前抽奖如果降级直接返回一个默认值,不执行抽奖逻辑

分布式动态限流和熔断

增加动态黑名单限流组件,通过访问动态限制将限流用户24H存入本地缓存,通过统一 DCC(动态配置中心) 全局配置控制使用。并在方法上引入接口超时熔断组件。

  • 首先,因为我们引入切面了,那么上一节 @DCC 直接获取类操作属性的梳理就要考虑代理类的存在了。因为这时候被切面管理的类,在 Spring 中是一个代理对象,而不是原始对象,所以本节还涉及了 DCC 代码的处理。
  • 之后,本节要增加的是 RateLimiter 限流,当一个用户频繁访问超过N次后,则会将这个用户加入黑名单列表,不允许在访问当前服务。直至过了超时时间从黑名单列表移走后才允许访问。
  • 另外,本节还引入了接口超时熔断组件。降级、熔断、限流,这也是一套分布式微服务非常重要的手段。

黑名单限流功能实现

  1. 引入hystrix包,做超时熔断处理

  2. types模块annotations包下新建一个自定义注解类RateLimiterAccessInterceptor,运行时方法注解
    拥有4个属性:拦截标识、限制频次、黑名单拦截、拦截后执行方法

  3. app模块下新增aop包,新增RateLimiterAOP切面类,拥有限流开关属性@DCCValue("rateLimiterSwitch:close")
    这个类是切面入口,管理所有被添加了@RateLimiterAccessInterceptor注解的方法,被AOP管理的类会成为代理类
    因为是代理类,所以要操作属性的值时,就需要获得原始类的TargeClass。这部分在DCCValueBeanFactory类中做了处理

    • 首先,rateLimiterSwitch 是通过 DCC 控制的限流开关,因为实际生产中并不是所有时候都需要开启限流,一般在大促、活动等场景时候才会启动。所以可以用开关控制。
    • 限流的过程为检测单个用户访问频次是否达到限流配置值,达到后则进行限流黑名单记录,记录方式使用的是 Guava 本地缓存。
    • fallbackMethodResult 则是被限流拦截后,则通过 fallbackMethodResult 返回配置的拦截结果值。
  4. 指定代理类,启动类Application中开启代理操作@EnableAspectJAutoProxy(proxyTargetClass = true),启动Spring AOP的自动代理功能,并指定使用CGLIB来生成代理类,CGLIB是基于继承实现的代理,没有实现接口的类也能代理,比JDK代理更全面

  5. 配置使用
    抽奖接口上面配置注解
    第一个注解是自定义的限流注解,

    1
    2
    3
    4
    5
    @RateLimiterAccessInterceptor(key = "userId", fallbackMethod = "drawRateLimiterError", permitsPerSecond = 1.0d, blacklistCount = 1)// 限流配置,以用户ID为key,1秒内最多访问1次,超过则加入黑名单,调用drawRateLimiterError进行限流拦截
    @HystrixCommand(commandProperties = {
    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "150")
    }, fallbackMethod = "drawHystrixError"
    )// 熔断配置,设置150ms超时时间,超时或异常时跳过原方法,改为调用drawHystrixError方法进行熔断拦截
    • 限流配置 RateLimiterAccessInterceptor

      • key: 以用户ID作为拦截,这个用户访问次数限制
        • fallbackMethod:失败后的回调方法,方法出入参保持一样
        • permitsPerSecond:每秒的访问频次限制
        • blacklistCount:超过多少次都被限制了,还访问的,扔到黑名单里24小时
    • 熔断配置 HystrixCommand

    • 这些参数值你可以自行调整验证,还可以给其他方法添加。也可以自定义一个方法,时候里面写修改5000毫秒,之后验证超时熔断。

我理解的限流流程

  1. 首先这个黑名单限流也有自己的开关rateLimiterSwitch@DCCValue("rateLimiterSwitch:close")开启后才会进行黑名单判断
  2. 然后在Controller接口上面加上自定义注解@RateLimiterAccessInterceptor指定限流参数
  3. 当第一次访问这个接口的时候,方法被aop代理,先进入到aop切面类进行限流判断
    1. 首先判断限流开关,关闭则直接放行进行原方法,开启才继续逻辑
    2. 获取限流标识,从注解获取key字段(userId),检查这个userId是否在黑名单中(这个类有两个Guava本地缓存,一个1分钟存RateLimiter,一个24小时存userId,分布式可以存在redis)
    3. 检查这个userId在24h黑名单中的拦截次数,超过设定阈值则执行注解中的降级方法,没有就继续
    4. 在一分钟缓存中获取RateLimiter(Guava库用令牌桶算法实现的限流器,用于流量控制),先在本地缓存中获取,没有就依据注解配置创建一个并存入本地缓存
    5. 尝试用RateLimiter获取令牌,有令牌就消耗令牌并放行,没令牌就拦截
    6. 拦截后处理,24小时黑名单的计数+1,执行注解中的降级方法

RateLimiter原理:

1
2
3
4
5
6
7
8
令牌桶算法
├── 核心机制
│ ├── 令牌以固定速率添加(如每秒1个)
│ └── 最多存储N个令牌(桶容量)
└── 执行流程
├── 请求到来时尝试获取令牌
├── 有令牌:允许执行,消耗令牌
└── 无令牌:拒绝请求,触发降级

测试命令:

1
2
3
4
5
6
7
# 开启限流开关,记得先关闭上一节的degradeSwitch开关
curl.exe --request GET --url "http://localhost:8091/api/v1/raffle/dcc/update_config?key=rateLimiterSwitch&value=open"
# 测试限流,一秒内执行两次就会触发限流
curl.exe --request POST `
--url http://localhost:8091/api/v1/raffle/activity/draw `
--header 'content-type: application/json' `
--data '{\"userId\":\"xiaofuge\",\"activityId\": 100301}'

分库分表数据同步ES

分库分表后的数据都已经散列到各个库表了,对于聚合查询就变得复杂,所以我们还需要另外一套方案。就是把分散在各个数据库表的数据,通过使用canal组件,基于 binlog 日志,把数据同步到ElasticSearch文件服务中再提供使用。

ES的安装使用

  1. 先下载ES解压,bin里面的bat直接启动ES下载地址

  2. 安装ES可视化工具并解压,然后在解压目录打开cmd,运行npm installnpm run start启动界面(install的时候可能要挂魔法)

  3. 修改es的conf目录下的yml文件,增加跨域选项

    1
    2
    http.cors.enabled: true
    http.cors.allow-origin: "*"

    es是一个java进程,默认占16gb内存,可以去es的jvm.options文件修改参数

  4. 设置es的密码

    1. 先启动es
    2. 然后访问es的bin目录,执行命令,随机生成密码.\elasticsearch-reset-password.bat -u elastic -a或者指定输入密码.\elasticsearch-reset-password.bat -u elastic -i
    3. 测试es密码连接curl.exe -k -u elastic:123456 https://localhost:9201
    4. 之后记住这个密码,之后kibana会用到
  5. start之后就能通过网址访问了http://localhost:9100/,里面连接连es的端口,我设置为9201

kibana安装

这个是ES官方的可视化工具,下载地址

kibana的yml中配置es的账号密码,连接地址(https)

1
2
3
4
5
6
7
# ================== Elasticsearch 连接 ==================
# ⚠️ 注意是 HTTPS
elasticsearch.hosts: ["https://localhost:9201"]

# ================== 认证信息 ==================
elasticsearch.username: "elastic"
elasticsearch.password: "123456"

下载解压完后,在bin目录下的bat文件启动,默认端口是5601
浏览器访问http://localhost:5601/
汉化:yml文件里面修改为i18n.locale: zh-CN

es和kibana的密码和tls

我不明白了,调了半天,kibana就是连不上es,我用es命令测试也都是可以的curl -k -u elastic:123456 http://127.0.0.1:9201
这个kibana到底是怎么回事啊,不就设个账号密码的事吗,怎么这么费劲,我还是先用

卧槽,好像是不能用默认用户elastic,用这个就闪退

  1. 先启动es
  2. 还有一个专门的用户kibana_system,我修改他的密码curl.exe -X POST "http://localhost:9201/_security/user/kibana_system/_password" -H "Content-Type: application/json" -u elastic:123456 -d "{\"password\":\"123456\"}"
  3. 成功之后会返回一个{},用elastic最高权限用户去查询是否设置成功curl.exe -X GET "http://localhost:9201/_security/user/kibana_system" -u elastic:123456
  4. 设置kibana的yml文件,添加这个用户名和密码,然后启动访问http://localhost:5601/,然后用elastic用户登录

最后的yml文件

es.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 绑定到本地回环地址
network.host: 127.0.0.1

# HTTP 端口
http.port: 9201

xpack.security.enabled: true

# 如果启用了TLS需要禁用
xpack.security.http.ssl.enabled: false
xpack.security.transport.ssl.enabled: false

# 其他设置
cluster.initial_master_nodes: ["SOLAIRE"]
http.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"

kibana.yml

1
2
3
4
5
elasticsearch.hosts: ["http://localhost:9201"]
i18n.locale: "zh-CN"

elasticsearch.username: "kibana_system"
elasticsearch.password: "123456"

Canal安装使用

canal使用博客
mysql 数据同步需要创建一个 canal 的账户,之后还需要开启 binlog 日志
canal下载地址

  • canal-admin:canal控制台,可以统一管理canal服务
  • canal-deployer:也是canal-server:canal的一个节点服务
  • canal-instance:canal-server中的一个处理实例,可以处理不同的业务逻辑
  • canal-adaper:canal适配器,canal 1.1.1之后,提供了适配器功能,可将canal server的数据直接输出到目的地,不需要用户编写客户端(个性化需求还需要用户编写客户端实现)

canal-admin配置

  1. 开启MySQL的binlog功能,用这个命令检验show variables like 'log_bin';
    MySQL的my.ini文件下增加这些参数,然后重启mysql

    1
    2
    3
    4
    5
    [mysql]
    no-beep
    log-bin=mysql-bin #添加这一行就 ok
    binlog-format=ROW #选择 row 模式
    server_id=1 #不能和canal的slaveId重复
  2. MySQL新建一个canal用户,然后赋予权限

    1
    2
    3
    CREATE USER 'canal' IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
  3. 解压canal.admin.tar.gz文件

  4. canal.admin\conf里面的sql文件建立一个canal_manager数据库,还要canal用户赋予权限

  5. 修改配置文件conf/application.yml文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
     spring.datasource:
    address: 127.0.0.1:3306
    database: canal_manager
    username: canal
    password: canal
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai


    #canal admin账号和密码
    canal:
    adminUser: admin
    adminPasswd: 123456
  6. 新建一个集群节点,zk地址为127.0.0.1:2181

canal-deployer配置

  1. 下载解压deployer

  2. 在conf目录下,备份canal.properties,将canal_local.properties重命名为canal.properties。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    # 当前节点ip
    canal.register.ip = 127.0.0.1

    # admin的地址和账号密码
    canal.admin.manager = 127.0.0.1:8089
    canal.admin.port = 11110
    canal.admin.user = admin
    # admin密码,使用了mysql的password加密后的密码,与admin的conf/applicaiton.yml中设置的密码对应
    canal.admin.passwd = 123456

    # 开启自动注册模式
    canal.admin.register.auto = true

    # 指定注册的集群名
    canal.admin.register.cluster = cluster-one

    # 注册到admin上的服务名,默认为当前ip
    canal.admin.register.name = server-01
  3. 替换mysql connector
    canal-admin中默认提供的驱动器是mysql5.0的,因此要替换一下(canal-deployer解压目录的lib文件夹)Connector下载地址选Platform Independent

  4. startup.bat启动,如果发现双击之后界面一闪而过,并没有保留在屏幕上。这时候你需要编辑这个bat文件,在最后一行回车输入:pause

将分库分表同步到ES

同步raffle_activity_order表和user_raffle_order表

ES-ORM多数据源配置使用

接下来我们就需要让应用程序可以从 ES 查询数据。也就是如何处理一个应用中多数据源的使用,同时要简化使用。

多数据源配置流程

  1. 更新DB-Router版本,创建MySQL数据源@Bean("mysqlDataSource")

  2. 引入ES插件,yml中配置es数据源

  3. mapper中分为两个文件夹,一个文件夹是mysql,一个文件夹是es,为两套查询对应的Mybatis配置对应数据源,需要在config类内部配置dataSource和SqlSessionFactory的bean

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Configuration
    public class DataSourceConfig {

    @Configuration
    @MapperScan(basePackages = "cn.bugstack.infrastructure.elasticsearch", sqlSessionFactoryRef = "elasticsearchSqlSessionFactory")
    static class ElasticsearchMyBatisConfig {

    @Bean("elasticsearchDataSource")
    @ConfigurationProperties(prefix = "spring.elasticsearch.datasource")
    public DataSource igniteDataSource(Environment environment) {
    return new EsDataSource();
    }

    @Bean("elasticsearchSqlSessionFactory")
    public SqlSessionFactory elasticsearchSqlSessionFactory(@Qualifier("elasticsearchDataSource") DataSource elasticsearchDataSource) throws Exception {
    SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
    factoryBean.setDataSource(elasticsearchDataSource);
    factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/elasticsearch/*.xml"));
    return factoryBean.getObject();
    }
    }

    @Configuration
    @MapperScan(basePackages = "cn.bugstack.infrastructure.dao", sqlSessionFactoryRef = "mysqlSqlSessionFactory")
    static class MysqlMyBatisConfig {

    @Bean("mysqlSqlSessionFactory")
    public SqlSessionFactory mysqlSqlSessionFactory(DataSource mysqlDataSource, Interceptor dbRouterDynamicMybatisPlugin) throws Exception {
    SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
    factoryBean.setDataSource(mysqlDataSource);
    factoryBean.setPlugins(dbRouterDynamicMybatisPlugin);
    factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/mysql/*.xml"));
    return factoryBean.getObject();
    }
    }
    }

xxl-job分布式任务调度

增加 xxl-job 分布式任务调度服务,处理大营销中;发送MQ消息任务队列、更新活动sku库存任务、更新奖品库存任务定时任务。

大营销是分布式系统,所以之前的定时任务如果每个节点都执行就会重复,这时候就要加抢占式锁,避免重复

通过xxl-job管理分布式应用任务:
XXL-JOB 是一个轻量级分布式任务调度平台

XXL-JOB安装使用

  1. docker拉取镜像
  2. 准备一套MySQL库,idea里面配置docker里面的Mysql连接,端口是13306本机是3306,然后执行sql文件
  3. pom文件引入配置,根目录 pom、app 层 pom、trigger 层 pom 都需要引入下 xxl-job
  4. 配置yml文件,启动docker后就能访问了http://127.0.0.1:9090/xxl-job-admin,账号密码admin/123456
  5. 原先发送MQ的job类将@Scheduled(cron = "0/5 * * * * ?")任务更换为 @XxlJob("SendMessageTaskJob_DB")任务
  6. 然后在执行逻辑前加分布式锁,分布式应用N台机器部署互备,任务调度会有N个同时执行,那么这里需要增加抢占机制,谁抢占到谁就执行。完毕后,下一轮继续抢占。
  7. 更新活动sku库存任务和更新奖品库存任务也按照上面处理
  • 之前是每个节点都有个5秒任务,现在交给了xxl-job实现任务标度,调度中心根据路由策略每5秒挑选一个节点执行任务,然后为了避免可能的重复(网络超时、重试等)加上分布式锁
  • 执行任务的间隔(cron也在xxl-job的管理页面调整)

增强抽奖算法策略

  • 算法1:就是原来的算法,将所有概率缓存在map,然后用随机数get直接获取抽到的奖品,时间复杂度O(1)
  • 算法2:对于概率所占格子非常大的时候,可以用Map<Map<Integer,Integer>,Integer>缓存奖品的区间,每次查找的时候生成随机数二分查找,然后获取商品,时间复杂度O(logn)

抽奖算法策略实现

  • 新增strategy.service.armory.algorithm包(抽奖算法)
  • 内部新建一个抽奖算法类,一个接口,一个impl包,impl包内为O1算法和OLogN算法
  • 装配抽象类先查出来策略之后调用装配抽象方法
  • 装配实现类实现抽象方法,用map注入接收算法接口,装配时先计算rateRange(概率份数),如果小于设定值就用O1算法装配,否则用OLogN算法
  • 抽奖算法实现类实现装配方法和调用算法方法
    • O1装配就是和之前一样,先填充一个ArrayList,然后乱序,存到Map结构,存到redis。调度算法就是获取随机数,然后从redis中直接get奖品id
    • OLogN装配时新建一个Map<Map<Integer, Integer>, Integer>key就是概率区间,value还是奖品id,存到redis,调用算法是先获取map的大小,如果小于等于8就挨个比较范围,如果(8,16]就用二分查找,如果大于16就多线程二分查找。查找就是先生成一个随机数,然后逐个比较区间,符合就返回奖品id

后台运营页面

就是建立一个前端页面可以查询需要的信息,方便管理

后台管理-前端页面

前端用Ant Design Pro脚手架直接创建项目

  1. 先安装脚手架
    npm i @ant-design/pro-cli -g
  2. 然后初始化项目,用simple模式
    pro create big-market-erp
  3. 然后安装需要依赖
    cd big-market-erp && npm install
  • 搭建完之后就写代码,react也是面对对象的,先定义展示的数据类型,类似PO
  • 可以然后先mock一些初始数据用来展示
  • 然后配置页面路由:path和component对应
  • 都准备好了就开发页面,接收数据并展示

访问地址http://localhost:8000/user_raffle_order_list

后台管理-后端接口

trigger层的http包新建一个ErpOperateController类对接前端的查询接口,这个查询接口调用的是mapper包下的es查询语句,因为之前配置了esDataRsource,所以es.xml里面的sql查询语句能直接转换成es的DSL语句
在前端services包下的api.ts文件调用后端接口,然后在所需页面引入

RPC接口对接支付返利

用DubboRPC通信来提供当前大营销的返利Service的RPC接口流程:

  1. 先在api模块里面建立一个rpc接口IRebateService,内部是一个返利方法
  2. 在trigger模块中rpc包下新建一个类实现这个接口,类上加@DubboService
  3. 类内部会注入一个Map<String, String> appTokenMap属性,这个是事先在app模块的spring-config-token.xml里面配置的,相当于一个白名单
  4. 类内部实现返利方法,先将入参里面的信息和白名单比较合法就继续,调用返利的Service,最后返回一个响应
  5. 其他微服务组件用@DubboReference注入这个rpc接口,调用它内部的方法就能实现通信

活动上架发布预热对接

后台管理页面增加活动上架服务,通过运营后台管理上架,前端工程根据渠道sc值查询上架活动。

  1. 新增加活动上架表:raffle_activity_stage
  2. 新增查询上架列表的crud接口,查询es,查出来所用活动给运营人员进行审批
  3. 新增上架活动接口,更新上架活动为生效,运营人员确认后将活动更新为生效
  4. 新增查询有效活动接口
  5. 前端新增一个上架页面,展示所有上架活动

总结

抽奖流程梳理

  1. 先调用功能活动领域的参与活动方法,扣减用户账户抽奖次数,然后生成一个用户参与单
  2. 然后进行抽奖逻辑,获得奖品ID,将用户参与单改为已使用,新增未发奖中奖记录,中奖奖品写入task。之后发送MQ消息
  3. 发送奖品消费者类接受MQ消息来发奖,调用奖品领域的发奖方法,将奖品记录的状态改为已完成

防超卖

使用decr + setnx的无锁化减库存,无竞争

  • 正常只需要decr这个原子操作就能做到,decr后返回值小于0就返回false,然后设置回0
  • 在这基础上在加上一个库存锁,加锁为了兜底,如果后续有暴力重置库存数等,也不会超卖。因为所有的可用库存key,都被加锁了。
    1. 用每次decr后的返回值建一个锁代表当前这个库存数量,给他用setnx加锁,只有获取成功才能继续执行
    2. 之后如果库存消耗完了,有人直接将库存数量暴力重设为100,decr会进行,但是setnx会失败,因为之前的用户已经把99到0的库存位都加了锁

【为什么要加库存锁】

因为在分布式系统中,“库存数量”只是一个缓存值,而“锁”代表了已发生的交易事实。

  • 库存数量 (Value):易变的,可以被任何人修改,甚至被网络抖动、主从切换导致的数据不一致所影响。
  • 库存位锁 (Lock Keys):是交易凭证。每一个 Lock Key 的存在,都代表着“这个位置的库存已经被合法消费了”。
  • 归根结底就是保证每个库存只能被消费一次

【数据库更新】

  • decr成功并且获取到库存锁,将当前库存写入延迟队列(减缓扣减速率),sku库存job类会定期获取延迟队列队首元素(就是先扣减的库存数),按这个更新数据库内库存数
  • decr后库存为0,会向发送一个sku库存为0的MQ消息,然后监听库存为0的消费者job类执行清空redis延迟队列和将数据库sku库存设置为0的逻辑
  • T+1日数据校验:第二天的库存是可以根据第一天的库存数和订单数进行计算的

【恢复库存】

  • 目前记录库存数,每个库存数量加锁,如果恢复库存原来的锁都在,其实加的库存就没用还是消费不了(但一般也就在库存为0的时候恢复库存,这时候就可以把所有库存key清除,再添加库存)
  • 库存扣减的方式改成incr然后将返回值和总量相比,判断库存是否空,这样恢复库存的时候直接加总量,很方便,之前锁的key也都放在之前不用管

【如果要获取实时数据怎么办】

  • 当前数据库更新是用定时任务和延迟队列实现最终一致性,有短期不一致
  • 所以要获取最新数据,就同时查缓存和数据库就行

【抽奖扣减了,但是写入订单失败怎么办】

  • 抽奖的时候会先生成一个抽奖单,然后开始抽奖,抽完了标记为已使用并同步写入订单记录
  • 如果抽奖失败,但是Redis库存扣减成功并写入Redis延迟队列,更新数据库次数,下一次抽奖就可以重新用之前的抽奖单,保证抽奖次数不多用
  • 本质是redis和mysql没办法放在一个事务里,所以用抽奖单来确保中奖

MQ消息

【怎么保证发送的发奖消息不丢失,一定被消费?】

配置文件中启动手动ack模式,然后在消费者类完成完发奖的逻辑后,手动channel.basicAck(deliveryTag, false);,MQ收到ack后才会删除消息,就能保证消息一定被消费一次。
用try catch包裹,出异常时先从消息头获取重试次数,小于3回就重新发送,大于就进入死信队列,由后续人工干预

跨域攻击

  • 域 (Domain / Host):通常指 URL 中的主机名部分,例如 api.bank.com 或 bank.com。
  • 源 (Origin):由 协议 + 域名 + 端口 三部分组成。只有这三者完全一致,才算“同源

【CSRF】跨站请求伪造

  • 假设用户登录了bank.com,浏览器存储了用户的cookie,这个应该是绝密的
  • 但是用户登录了一个恶意网站evil.com,这里面假设图片里面有一个隐藏代码,加载的时候就自动用bank.com的域名发送请求,浏览器就根据请求的域名会自动赋上他的cookie,cookie就被盗走了
1
2
3
4
5
6
<!-- 黑客在 evil.com 埋下的代码,直接用用户的cookie去给自己转钱 -->
<img
src="https://bank.com/transfer?to=黑客账户&money=10000"
width="0"
height="0"
/>

为什么叫跨域?
因为请求是从 evil.com(恶意域)发起的,但作用在了 bank.com(目标域)上。浏览器默认允许这种“写操作”(如加载图片、提交表单),这就是 CSRF 利用的漏洞。

【CORS配置错误】(跨域数据读取)

正常情况:
如果evil.com写了一段 JS 代码去请求 api.bank.com/balance,银行返回数据并附上跨域设置,但浏览器发现来源是evil.com,目标是bank.com,返回的跨域设置不允许,就拦截数据,除非银行配置允许evil.com访问

错误情况:
bank.com配置中还用的生产环境的域,“允许任何网站读取我的数据”(HTTP 头: Access-Control-Allow-Origin: *)

为什么这是跨域攻击?
因为浏览器的同源策略本来是为了阻止 evil.com 读取 bank.com 的数据,但因为 bank.com 的错误配置(CORS 配置不当),导致这个安全防线失效了。

【怎么防御】

  • 防 CSRF:银行在转账时要求你必须输入一个只有你自己知道的“动态验证码”(Token),坏人猜不到,就没法伪造请求。或者设置 Cookie 为 SameSite,禁止跨站携带。
  • 防 CORS 泄露:银行严格规定,只允许 bank.com 自己读取数据,绝不设置 *

其他小知识点

  • biz_id(业务号):这个是由外部透传的,orderId每次都是随机生成(无法保证幂等性),只有外部透传的biz_id能在重复发奖的时候冲突,保证幂等性

扩展点

规则拓展

可以扩展
身份属性、频率与周期、资产与任务、环境与风控、库存状态

抽奖前:
对VIP用户做判断,提高中奖概率等
页面浏览时间、前置任务是否完成、道具消耗、抽奖次数是否充足、单日是否到上限

抽奖后:
抽奖后是否满足了什么条件主动发额外奖励


大营销平台系统笔记
http://www.981928.xyz/2026/02/05/大营销平台系统笔记/
作者
981928
发布于
2026年2月5日
许可协议