🍏任务开发
# 任务单元
我们在一些列开发过程中,总会有数据处理的service开发。从数据库获取数据、从远程获取数据使用 HTTP、RPC 又获取数据从本次磁盘File中获取 不管使用那种获取数据的方式,都会涉及到本次磁盘IO和网络IO的问题。但是这种也正式需要消耗系统资源的问题所在。
在开始之前建议看一下 Gobrs-Async (opens new window) 异步编排框架,对具有依赖关系的任务进行异步编排,提高系统并发。 因为本人是 Gobrs-Async的作者,所以也直接使用了该框架作为底层异步任务编排基础框架。 关于Gobrs-Async的介绍在这里不做过多介绍,大家有兴趣可以 去官网查看详细文档介绍。
# 开发异步任务
以下以优惠券RPC、价格、库存RPC为例开发三个异步任务
开发一个异步任务只需要以下两步走
- 注解
@CubeAsync
- 继承
CubeTask
抽象类
# 优惠券任务
@CubeAsync
public class CouponRpcService extends CubeTask<BizParam, Result> {
@Override
public Result task(BizParam param, TaskSupport support) {
Map product = getResult(support, ProductRpcService.class, Map.class);
try {
Assert.notNull(product);
}catch (Exception ex){
System.out.println(ex);
}
System.out.println("商品数据" + JSONObject.toJSONString(product));
Result result = Result.builder().name("优惠券-会员专项").content("-会员专项优惠99 减 5").build();
return result;
}
}
# 价格任务
@CubeAsync
public class PriceRpcService extends CubeTask<BizParam, Object> {
@Override
public Object task(BizParam bizParam, TaskSupport support) {
Result coupon = getResult(support, CouponRpcService.class, Result.class);
try {
Assert.notNull(coupon);
} catch (Exception ex) {
System.out.println(ex);
}
System.out.println("优惠券结果" + JSONObject.toJSONString(coupon));
Map stockResult = getResult(support, StockRpcService.class, Map.class);
try {
Assert.notNull(stockResult);
} catch (Exception ex) {
System.out.println(ex);
}
System.out.println("库存结果" + JSONObject.toJSONString(stockResult));
String price = "价格数据";
return price;
}
}
# 库存任务
@CubeAsync
public class StockRpcService extends CubeTask<BizParam, Object> {
@Override
public Object task(BizParam transfer, TaskSupport support) {
TaskResult aResult = support.getResultMap().get(ProductRpcService.class);
try {
Assert.notNull(aResult);
} catch (Exception ex) {
System.out.println(ex);
}
System.out.println("商品结果" + JSONObject.toJSONString(aResult));
Map<String, String> stock = new HashMap<>();
stock.put("stock", "库存不足");
return stock;
}
}
# 如何使用异步任务API
关于如何开发一个异步任务,其实很简单以下就是开发一个异步任务。那有朋友会问到,如何获取自身所依赖的任务返回的数据呢?
答案是: 使用getResult
方法
该方法需要传入三个参数
TaskSupport support
通过task方法透传即可Class<? extends Task> clazz
所要获取结果的依赖服务类Class<Result> type
所依赖的数据类型
String coupon = getResult(support, CouponRpcService.class, String.class);
# 如何获取到前端传递过来的参数
关于这个问题有两种方式
# 第一种方式-应用上下文
开发者可以将需要传递给(RPC层、服务层、Service层)的数据统一放入BizParam
中(BizParam
可以理解成应用上下文参数,应用中主接口的所有参数和结果都封装在这一对象中进行统一的数据流转)
# 设置参数
在设置参数时需要以下操作:
Transfer transfer = TransferFactory.create((trans) -> {
trans.setContext(bizParam);
trans.setTemplateCode(bizParam.getTemplateCode());
trans.setArrangementRule("test");
return trans;
});
那么所有的RPC service 层都会通过泛型的方式接收到这个参数(BizParam
)
CubeTask
泛型中的参数介绍:
- 第一个参数为当前传入
task
的参数类型 - 第二个参数为
task
的返回结果
如下:
@CubeAsync
public class CouponNewPersonRpcService extends CubeTask<BizParam, Result> {
@Override
public Result task(BizParam o, TaskSupport support) {
TaskResult aResult = support.getResultMap().get(BigoumaRpcService.class);
Map bigResult = getResult(support, BigoumaRpcService.class, Map.class);
try {
Assert.notNull(aResult);
} catch (Exception ex) {
System.out.println(ex);
}
System.out.println("必购码结果" + JSONObject.toJSONString(aResult));
Result result = Result.builder().name("优惠券新人券").content("新人立减首购立减100").build();
return result;
}
}
# 第二种方式-指定参数
和第一种不同的仅仅是, 指定参数的方式需要传递一个Map<Class,Object>
给Transfer
,如下:
# 设置参数
Transfer transfer = TransferFactory.create((trans) -> {
Map<Class, Object> params = new HashMap<>();
params.put(CouponNewPersonRpcService.class, "价格参数"); // key 为具体service的类类型, value 为参数内容
trans.setContext(bizParam);
trans.setTemplateCode(bizParam.getTemplateCode());
trans.setArrangementRule("test");
return trans;
});
# 获取参数
获取参数这边只需要改变以下CubeTask的第一个泛型类型即可。
@CubeAsync
public class CouponNewPersonRpcService extends CubeTask<String, Result> {
@Override
public Result task(String o, TaskSupport support) {
TaskResult aResult = support.getResultMap().get(BigoumaRpcService.class);
Map bigResult = getResult(support, BigoumaRpcService.class, Map.class);
try {
Assert.notNull(aResult);
} catch (Exception ex) {
System.out.println(ex);
}
System.out.println("必购码结果" + JSONObject.toJSONString(aResult));
Result result = Result.builder().name("优惠券新人券").content("新人立减首购立减100").build();
return result;
}
}
# 扩展
如果想对异步编排任务做一些特殊处理,例如如下操作:
- 根据条件判断当前任务是否执行
- 任务执行成功todo
- 任务执行失败todo
- 任务统一拦截前置处理、后置处理
- 任务异常拦截
- 其他扩展
如果你想做这些操作。因Cube在做动态任务编排底层使用的是Gobrs-Async
,所以Gobrs-Async
所具备的所有能力同样适用于Cube框架。
异步任务详情请查阅Gobrs-Async (opens new window)