🍫资源同步设计
# 说明
提示
本篇主要介绍框架是如何让开发者在 Dashboard 端实现灵活配置的,以及数据源(可配置资源)从何而来。
这里所说的可配置资源,即应用中的领域能力、处理器、参数、远程调用参数等资源数据。
# 注解 Scanner 扫描
框架中提供多套注解,开发者只需要在代码上标注对应的注解,即可完成可选配置项的上报。
上报注解
- @CubeDomainAbility:领域能力标识
- @CubeCooperateFloorProcessor:共建楼层处理器标识
- @IndependentParam:独立楼层应用上下文参数扫描上报注解
- @IndependentDataProcessor:独立楼层扩展处理器标识
# 监听事件
框架会自动监听 Spring 的 ApplicationReadyEvent,并在事件触发时进行资源上报。
/**
 * Spring application listener that starts resource reporting as soon as an
 * {@link ApplicationReadyEvent} is delivered.
 */
public class SupportLoader implements ApplicationListener<ApplicationReadyEvent> {

    /** Reporter whose {@code report()} is invoked from the event callback. */
    private final ResourcesReporter reporter;

    /**
     * Creates the listener.
     *
     * @param actReporter reporter to trigger when the application is ready
     */
    public SupportLoader(ResourcesReporter actReporter) {
        reporter = actReporter;
    }

    /**
     * Delegates to the reporter; the event payload itself is not inspected.
     *
     * @param event the ready event published by Spring
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        reporter.report();
    }
}
# CAS 排重
使用 CAS 排除极端情况下的重复上报问题,降低资源消耗,避免无端浪费请求。
@Slf4j
public abstract class AbstractReporter implements Reporter {
AtomicBoolean switcher = new AtomicBoolean(false);
private List<ResourcesProcessorScanner> scanners;
public AbstractReporter(List<ResourcesProcessorScanner> scanners) {
this.scanners = scanners;
}
@Override
public void report() {
if (switcher.compareAndSet(false, true)) {
scanners.forEach(x -> {
try {
List<ActResource> actResources = x.scanResource();
if (CollUtil.isNotEmpty(actResources)) {
doReport(x.getType(), actResources);
}
} catch (Exception exception) {
log.error("ResourcesProcessorScanner{} errror{} ", JSONObject.toJSONString(x.isvAbilityProperties), exception);
}
});
}
}
/**
* Do report boolean.
*
* @param sourceType the source type
* @param resourceList the resource list
* @return the boolean
*/
abstract boolean doReport(String sourceType, List<ActResource> resourceList);
}
# 策略扫描器
使用策略模式:所有继承 ResourcesProcessorScanner 的 bean(扫描器)都会被依次执行扫描。
以下以领域能力扫描器为例进行介绍:
package com.jd.cube.push.scanner;
import cn.hutool.core.util.StrUtil;
import com.jd.cube.common.anno.CubeAsync;
import com.jd.cube.common.anno.CubeDomainAbility;
import com.jd.cube.common.exception.CubeException;
import com.jd.cube.common.log.RecordLog;
import com.jd.cube.common.util.PlatformUtil;
import com.jd.cube.common.util.SpringUtil;
import com.jd.cube.domain.context.TemplateChannel;
import com.jd.cube.domain.properties.CubeProperties;
import com.jd.cube.domain.report.ActResource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.StopWatch;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.*;
import java.util.stream.Collectors;
import static com.jd.cube.common.constant.SourceType.SOURCE_TYPE_ABILITY;
import static com.jd.cube.common.enums.ResourceTypeEnum.UNIVERSAL_NO;
import static com.jd.cube.common.enums.ResourceTypeEnum.UNIVERSAL_YES;
/**
 * Scanner that walks all bean definitions of the application context, picks the beans
 * annotated with {@link CubeDomainAbility} and converts each of them into an
 * {@link ActResource} reported under {@code SOURCE_TYPE_ABILITY}.
 *
 * @author sizegang
 */
@Slf4j
public class AbilityResourcesScanner extends ResourcesProcessorScanner {

    /**
     * Simple class name accepted in the annotation's {@code source} attribute without a
     * {@link CubeAsync} annotation; a source list consisting of only this entry is reported
     * as empty. The field shares its name with {@code java.lang.Object}, so it is always
     * referenced in qualified form inside this class.
     */
    public static final String Object = "Object";

    /**
     * Stop-watch task name, also used as the prefix of the timing log line.
     */
    public static final String TASK_NAME = "Ability and Cooperation resources scanner cost";

    /**
     * Instantiates a new scanner.
     *
     * @param isvAbilityProperties the framework properties (source of the access token)
     */
    public AbilityResourcesScanner(CubeProperties isvAbilityProperties) {
        super(isvAbilityProperties);
    }

    /**
     * Collects one {@link ActResource} per bean carrying {@link CubeDomainAbility}.
     *
     * @return the scanned resources, possibly empty
     * @throws CubeException if an ability has a blank or duplicate code, or an invalid source
     */
    @Override
    public List<ActResource> doInit() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start(TASK_NAME);
        String[] allBeanNames = applicationContext.getBeanDefinitionNames();
        List<ActResource> isvResources = new ArrayList<>();
        String platformKey = PlatformUtil.key(isvAbilityProperties.getAccessToken());
        // Codes seen so far in this scan, used to reject duplicates.
        Set<String> abilityCodes = new HashSet<>();
        for (String beanName : allBeanNames) {
            Object bean = SpringUtil.getBean(beanName);
            CubeDomainAbility abilityAnnotation =
                    AnnotationUtils.findAnnotation(bean.getClass(), CubeDomainAbility.class);
            if (abilityAnnotation == null) {
                continue;
            }
            // Hand over the instance already fetched instead of looking the bean up again.
            isvResources.add(dealAbilityAnnotation(abilityAnnotation, abilityCodes, platformKey, beanName, bean));
        }
        stopWatch.stop();
        RecordLog.warn(String.format("%s {}", TASK_NAME), stopWatch.getTotalTimeMillis());
        return isvResources;
    }

    @Override
    public String getType() {
        return SOURCE_TYPE_ABILITY;
    }

    /**
     * Validates one annotated bean and builds the resource to report for it.
     *
     * @param abilityAnnotation the annotation found on the bean class
     * @param codes             ability codes already seen; the bean's code is added to it
     * @param platformKey       platform key derived from the access token
     * @param beanName          the Spring bean name, reported as the service name
     * @param bean              the bean instance
     * @return the resource describing the ability
     * @throws CubeException if the code is blank or duplicated, or a source class is neither
     *                       annotated with {@link CubeAsync} nor named {@code Object}
     */
    private ActResource dealAbilityAnnotation(CubeDomainAbility abilityAnnotation, Set<String> codes,
                                              String platformKey, String beanName, Object bean) {
        String code = abilityAnnotation.code();
        if (StrUtil.isBlank(code)) {
            throw new CubeException(String.format("%s 未设置领域能力code ", beanName));
        }
        // Set.add returns false when the code was already present.
        if (!codes.add(code)) {
            throw new CubeException(String.format("%s 存在重复的领域能力code ", beanName));
        }

        Class<?>[] source = abilityAnnotation.source();
        for (Class<?> aClass : source) {
            boolean async = AnnotationUtils.findAnnotation(aClass, CubeAsync.class) != null;
            if (!async && !AbilityResourcesScanner.Object.equals(aClass.getSimpleName())) {
                throw new CubeException(String.format("%s 的ISVDomainAbility 注解中的source属性异常 ", beanName));
            }
        }
        List<String> sources = Arrays.stream(source).map(Class::getSimpleName).collect(Collectors.toList());
        clearSource(sources);

        String universal = isUniversal(bean.getClass()) ? UNIVERSAL_YES.getType() : UNIVERSAL_NO.getType();
        String description = abilityAnnotation.description();
        return new ActResource()
                .setCode(code)
                .setService(beanName)
                .setSourceType(universal)
                .setPlatformKey(platformKey)
                .setPlatformCode(PlatformUtil.getPlatform(isvAbilityProperties.getAccessToken()))
                .setDesc(StrUtil.isBlank(description) ? beanName : description)
                .setSource(sources);
    }

    /**
     * Tells whether the first type argument of the nearest parameterized superclass is
     * {@link TemplateChannel}.
     *
     * <p>Uses the public {@link ParameterizedType} interface rather than a JDK-internal
     * implementation class, compares names with {@code equals} instead of reference identity,
     * and walks up the hierarchy so that a class whose direct superclass is not parameterized
     * (for example a generated proxy subclass) no longer fails with a
     * {@code ClassCastException}.
     *
     * @param beanClass the runtime class of the bean
     * @return {@code true} if the ability is bound to {@link TemplateChannel}
     */
    private static boolean isUniversal(Class<?> beanClass) {
        for (Class<?> current = beanClass; current != null; current = current.getSuperclass()) {
            Type genericSuper = current.getGenericSuperclass();
            if (genericSuper instanceof ParameterizedType) {
                Type[] typeArguments = ((ParameterizedType) genericSuper).getActualTypeArguments();
                return typeArguments.length > 0
                        && TemplateChannel.class.getName().equals(typeArguments[0].getTypeName());
            }
        }
        return false;
    }

    /**
     * Empties the list when its only entry is the {@code Object} placeholder.
     *
     * @param sources simple names of the declared source classes (modified in place)
     */
    private void clearSource(List<String> sources) {
        if (sources.size() == 1 && AbilityResourcesScanner.Object.equals(sources.get(0))) {
            sources.clear();
        }
    }
}
# 数据上报
# 资源上报器
/**
* The type Act reporter.
*
* @program: cube
* @ClassName ActReporter
* @description:
* @author: sizegang
* @create: 2022 -07-19
* @date 2022-08-26 18:30
* @author sizegang1
*/
public class ResourcesReporter extends AbstractReporter {
private CubeProperties isvAbilityProperties;
private AtomicBoolean success = new AtomicBoolean(false);
public static final String CODE = "code";
/**
* Instantiates a new Act reporter.
*
* @param scanners the scanners
* @param isvAbilityProperties the isv ability properties
*/
public ResourcesReporter(List<ResourcesProcessorScanner> scanners, CubeProperties isvAbilityProperties) {
super(scanners);
this.isvAbilityProperties = isvAbilityProperties;
}
/**
* Instantiates a new Act reporter.
*
* @param actScanner the act scanner
*/
public ResourcesReporter(List<ResourcesProcessorScanner> actScanner) {
super(actScanner);
}
/**
* @param resourceList the resource list
* @return
*/
@Override
boolean doReport(String sourceType, List<ActResource> resourceList) {
try {
String res = sendResources(sourceType, resourceList);
RecordLog.info("ActReporter report success {} res {}", JSONObject.toJSONString(resourceList), JSON.toJSONString(res));
return true;
} catch (Exception exception) {
RecordLog.error("【ActReporter report exception】 {}", exception);
retryReport(sourceType, resourceList);
return false;
}
}
/**
* send resources
*
* @param resourceList
* @return
*/
private String sendResources(String sourceType, List<ActResource> resourceList) {
CollectActResource collectActResource = new CollectActResource();
collectActResource.setResourceList(resourceList);
collectActResource.setType(sourceType);
String res = HttpRequest.post(getAddress())
.header(Header.CONTENT_TYPE, APPLICATION_JSON).body(JSONObject.toJSONString(collectActResource), APPLICATION_JSON).execute().body();
if (JSONObject.parseObject(res).get(CODE).equals(HttpStatus.HTTP_OK)) {
success.compareAndSet(false, true);
}
return res;
}
/**
* High availability retry mechanism
*
* @param resourceList
*/
private void retryReport(String sourceType, List<ActResource> resourceList) {
ScheduledFuture<?> scheduledFuture = ThreadExecutor.availabilityCheckRetry(() -> {
try {
sendResources(sourceType, resourceList);
} catch (Exception exception) {
RecordLog.error("【retryReport】 exception ", exception);
}
}, 10, 15000, TimeUnit.MILLISECONDS);
AsyncPool.asyncDo(() -> {
for (; ; ) {
if (success.compareAndSet(true, false)) {
scheduledFuture.cancel(true);
RecordLog.info("Report Resources success congratulate!!!");
break;
}
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
}
}
});
}
/**
* @return
* @method getAddress
*/
private String getAddress() {
String serverAddr = isvAbilityProperties.getServerInfo().getServerAddr();
Random random = new Random();
String[] address = serverAddr.split(",");
return address[random.nextInt(address.length)] + COLLECT_ABILITY;
}
}
# 双队列入库
# 事件机制区分不同资源
具体可以移步源码 cube-dashoboard-controller 中的 CollectController 入口进行查看。
# 上报权限校验
@Slf4j
@Component
public abstract class AbstractReportDataProcessor implements ReportDataProcessor{
@Resource
private RedisTemplate redisTemplate;
@Resource
private IsvPlatformService isvPlatformService;
/**
*
* @param resources
* @param builder
* @return
*/
protected ResourceReportCheckDto platformAccessCheck(CollectActResource resources,
ResourceReportCheckDto.ResourceReportCheckDtoBuilder builder) {
List<ActResource> resourceList = resources.getResourceList();
if (Objects.isNull(resourceList) || resourceList.isEmpty()) {
return builder.pass(false).build();
}
ActResource actResource = resourceList.get(0);
String platform = PlatformUtil.getPlatformByPlaintext(actResource.getPlatformKey());
Object platCache = redisTemplate.opsForHash().get(PLATFORM_KEY, platform);
if (Objects.isNull(platCache)) {
IsvPlatform isvPlatform = new IsvPlatform();
isvPlatform.setCode(platform);
IsvPlatform persistedPlatform = isvPlatformService.get(isvPlatform);
if (persistedPlatform == null) {
log.error("领域能力数据上报失败 原因: 权限不足 platformKey: {}", actResource.getPlatformKey());
return builder.pass(false).build();
} else {
redisTemplate.opsForHash().put(PLATFORM_KEY, platform, persistedPlatform);
return builder.pass(true).build();
}
}
return builder.pass(true).build();
}
}
# 资源入队列
框架通过 NotifyCenter.publishEvent 发布不同资源类型的入队列事件,再通过监听不同的入队事件来区分不同的资源类型。以下以领域能力为例:
/**
 * Report data processor registered under {@code SourceType.SOURCE_TYPE_ABILITY}: after a
 * successful permission check it publishes the reported resources as an
 * {@link ActResourceChangeEvent}.
 */
@Slf4j
@Service(SourceType.SOURCE_TYPE_ABILITY)
public class AbilityReportDataProcessor extends AbstractReportDataProcessor {

    /**
     * Checks the reporter's platform and, if it passes, publishes the resources.
     *
     * @param resources the reported payload
     * @return the check result with message {@code "success"} when accepted, otherwise a
     *         failed result with message {@code "权限不足"}
     */
    @Override
    public ResourceReportCheckDto process(CollectActResource resources) {
        ResourceReportCheckDto checked = platformAccessCheck(resources, ResourceReportCheckDto.builder());
        if (!checked.isPass()) {
            return ResourceReportCheckDto.builder()
                    .pass(false)
                    .message("权限不足")
                    .build();
        }
        NotifyCenter.publishEvent(new ActResourceChangeEvent(resources.getResourceList()));
        checked.setMessage("success");
        return checked;
    }
}
# 双队列缓冲
为什么要使用双队列?为什么不能将到来的请求直接入库?还要再搞一个队列入队,是不是太麻烦了?
至于为什么这样做,可以参考 Nacos 在实现配置上报时是如何完成数据持久化操作的。
# 抽象资源入库类执行器
public abstract class AbstractEventProcessor<T> implements Processor<List<T>>, InitializingBean {
/**
*
*/
private AtomicBoolean down = new AtomicBoolean(false);
/**
* The constant CACHE_ABILITY.
*/
protected Map<String, Map<String, T>> cache = new ConcurrentHashMap<>();
/**
* The Persistent act.
*/
protected LinkedBlockingQueue<T> actQueue = new LinkedBlockingQueue<>();
/**
* The Insert queue.
*/
protected LinkedBlockingQueue<T> insertQueue = new LinkedBlockingQueue<>();
@Override
public void execute(List<T> t) {
if (Objects.isNull(t)) {
return;
}
this.actQueue.addAll(t);
}
@Override
public void afterPropertiesSet() throws Exception {
if (!down.compareAndSet(false, true)) {
return;
}
List<T> actResources = new ArrayList<>();
List<T> insertedResources = new ArrayList<>();
ThreadExecutorFactory.initResource(() -> {
try {
actQueue.drainTo(actResources);
handle(actResources);
actResources.clear();
} catch (Exception exception) {
RecordLog.error("【AbilityEventProcessor comsumer】 exception", exception);
}
}, 100, 1000, TimeUnit.MILLISECONDS);
ThreadExecutorFactory.checkResources(() -> {
try {
check();
} catch (Exception exception) {
RecordLog.error("【check abilityDomain process】exception {}", exception);
}
}, 1, 60, TimeUnit.SECONDS);
ThreadExecutorFactory.resourcesPersistent(() -> {
insertQueue.drainTo(insertedResources);
if(CollUtil.isEmpty(insertedResources)){
return;
}
try {
persistent(insertedResources);
} catch (Exception exception) {
RecordLog.error("【AbilityEventProcessor persistant】 exception", exception);
persistentCallback(insertedResources);
} finally {
insertedResources.clear();
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
}
/**
* Handle.
*
* @param resources the resources
*/
abstract void handle(List<T> resources);
/**
* Check.
*/
abstract void check();
/**
* Persistent.
*
* @param resources the resources
*/
abstract void persistent(List<T> resources);
/**
* Persistent callback.
*
* @param resources the resources
*/
abstract void persistentCallback(List<T> resources);
protected String cacheKey(String type, String platformKey) {
return String.format("%s_%s", type, platformKey);
}
}
# 领域能力资源事件执行器
/**
 * Event processor for domain-ability resources: compares reported abilities with the cached
 * copy of what is stored and queues only new or changed ones for persistence.
 */
@Component
public class AbilityEventProcessor extends AbstractEventProcessor<ActResource> {

    @Resource
    private CubeConfigDomainAbilityService cubeConfigDomainAbilityService;

    /**
     * Queues every reported resource that is unknown to the cache or differs from the cached
     * copy.
     *
     * @param resources the reported resources
     */
    @Override
    void handle(List<ActResource> resources) {
        for (ActResource reported : resources) {
            Map<String, ActResource> resourceMap =
                    cache.get(cacheKey(SOURCE_TYPE_ABILITY, reported.getPlatformKey()));
            String code = reported.getCode();
            // ConcurrentHashMap rejects null keys, so a resource without code is treated as
            // "not cached" instead of aborting the whole batch with an exception.
            ActResource cached = (resourceMap == null || code == null) ? null : resourceMap.get(code);
            if (cached == null || !sameContent(cached, reported)) {
                insertQueue.add(reported);
            }
        }
    }

    /**
     * Null-safe comparison of the fields that decide whether a resource must be persisted
     * again: description, source, platform code, platform key and source type.
     *
     * @param cached   the cached resource
     * @param reported the newly reported resource
     * @return {@code true} if all compared fields are equal
     */
    private static boolean sameContent(ActResource cached, ActResource reported) {
        return Objects.equals(cached.getDesc(), reported.getDesc())
                && Objects.equals(cached.getSource(), reported.getSource())
                && Objects.equals(cached.getPlatformCode(), reported.getPlatformCode())
                && Objects.equals(cached.getPlatformKey(), reported.getPlatformKey())
                && Objects.equals(cached.getSourceType(), reported.getSourceType());
    }

    /**
     * Persists the batch in one call.
     *
     * @param resources the batch to persist
     */
    @Override
    void persistent(List<ActResource> resources) {
        cubeConfigDomainAbilityService.abilityPersistent(resources);
    }

    /**
     * Fallback after a failed batch persist: stores the resources one by one, logging and
     * skipping each one that fails.
     *
     * @param resources the batch whose persistence failed
     */
    @Override
    void persistentCallback(List<ActResource> resources) {
        for (ActResource insertedResource : resources) {
            try {
                cubeConfigDomainAbilityService.insertAbilityProcess(insertedResource);
            } catch (Exception exc) {
                RecordLog.error("【AbilityEventProcessor every persistant】 exception", exc);
            }
        }
    }

    /**
     * Reloads the stored abilities into the cache.
     *
     * <p>Existing entries are overwritten as well. If only missing entries were added, an
     * ability whose stored data changed would keep its old cached copy, and
     * {@link #handle(List)} would queue it for persistence on every report.
     */
    @Override
    void check() {
        IsvConfigDomainAbility isvConfigDomainAbility = new IsvConfigDomainAbility();
        List<IsvConfigDomainAbility> abilityServiceList = cubeConfigDomainAbilityService.findList(isvConfigDomainAbility);
        for (IsvConfigDomainAbility x : abilityServiceList) {
            String key = cacheKey(SOURCE_TYPE_ABILITY, x.getPlatformKey());
            Map<String, ActResource> resourceMap = cache.get(key);
            if (Objects.isNull(resourceMap)) {
                // Fill the new map before publishing it so readers never see it empty.
                resourceMap = new ConcurrentHashMap<>();
                fillMap(x, resourceMap);
                cache.put(key, resourceMap);
                continue;
            }
            fillMap(x, resourceMap);
        }
    }

    /**
     * Converts a stored ability into an {@link ActResource} and puts it into the map under
     * its code.
     *
     * @param x           the stored ability
     * @param resourceMap the per-platform cache map to update
     */
    private void fillMap(IsvConfigDomainAbility x, Map<String, ActResource> resourceMap) {
        ActResource actResource = new ActResource();
        actResource.setCode(x.getCode());
        actResource.setDesc(x.getDesc());
        actResource.setService(x.getService());
        actResource.setPlatformKey(x.getPlatformKey());
        actResource.setPlatformCode(x.getPlatformCode());
        actResource.setSourceType(x.getUniversal());
        // The stored source is a comma-separated string; split it back into a list.
        actResource.setSource(Arrays.asList(StrUtil.split(x.getSource(), ",")));
        resourceMap.put(x.getCode(), actResource);
    }
}
帮助我们改善此文档 (opens new window)