Cube Cube
首页
  • v1.0.0
👼问答
💖支持
🤾加入咚咚群
  • 项目介绍
  • 更新记录
  • 参与研发
GitHub (opens new window)
首页
  • v1.0.0
👼问答
💖支持
🤾加入咚咚群
  • 项目介绍
  • 更新记录
  • 参与研发
GitHub (opens new window)
  • 🍤Cube简介
  • 🍑框架特性
  • 🍄快速开始

    • 🍟说明
    • 🍅集成三步走

      • 🍉SDK集成
      • 🍒申请接入
      • 🍍环境配置
    • 🍊核心楼层

      • 🍋接口开发
      • 🍣服务开发
      • 🍏任务开发
      • 🍿领域能力
    • 🍔共建楼层

      • 🫕依赖引入
      • 🍠共建仓库
      • 🍱应用共建
    • 🏈独立楼层

      • 🍗使用说明
      • 🥯参数注解
      • 🥫集成开发
      • 🧇楼层扩展
  • 🍓设计文档

    • 🍫资源同步设计
      • 说明
      • 注解Sanner扫描
        • 监听事件
        • CAS 排重
        • 策略扫描器
      • 数据上报
        • 资源上报器
      • 双队列入库
        • 事件机制区分不同资源
        • 上报权限校验
        • 资源入队列
        • 双队列缓冲
        • 抽象资源入库类执行器
        • 领域能力资源事件执行器
    • 🫕配置并发设计
    • 🍬SPI扩展设计
    • 🥩架构设计
  • 指南
  • 🍓设计文档
sizegang
2022-09-03
目录

🍫资源同步设计

# 说明

提示

本篇主要介绍框架是如何做到可以让开发者在 DashBoard端实现灵活配置的,数据源(可配置资源)是如何而来的?

这里说到的可配置资源即为应用中的领域能力、处理器、参数、远程调用参数等资源数据

# 注解Sanner扫描

框架中提供多套注解,开发者只需要注解对应的注解即可完成可选的配置项上报

上报注解

  • @CubeDomainAbility 领域能力标识
  • @CubeCooperateFloorProcessor 共建楼层处理器标识
  • @IndependentParam 独立楼层应用上下文参数扫描上报注解
  • @IndependentDataProcessor 独立楼层扩展处理器标识

# 监听事件

框架会自动监听Spring的ApplicationReadyEvent触发资源上报

public class SupportLoader implements ApplicationListener<ApplicationReadyEvent> {

    private ResourcesReporter actReporter;

    public SupportLoader(ResourcesReporter actReporter) {
        this.actReporter = actReporter;
    }


    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        actReporter.report();
    }
}

# CAS 排重

使用CAS排除极端情况下的重复上报问题,降低资源消耗和无端浪费请求

@Slf4j
public abstract class AbstractReporter implements Reporter {

    AtomicBoolean switcher = new AtomicBoolean(false);

    private List<ResourcesProcessorScanner> scanners;

    public AbstractReporter(List<ResourcesProcessorScanner> scanners) {
        this.scanners = scanners;
    }

    @Override
    public void report() {
        if (switcher.compareAndSet(false, true)) {
            scanners.forEach(x -> {
                try {
                    List<ActResource> actResources = x.scanResource();
                    if (CollUtil.isNotEmpty(actResources)) {
                        doReport(x.getType(), actResources);
                    }
                } catch (Exception exception) {
                    log.error("ResourcesProcessorScanner{} errror{} ", JSONObject.toJSONString(x.isvAbilityProperties), exception);
                }
            });
        }
    }

    /**
     * Do report boolean.
     *
     * @param sourceType   the source type
     * @param resourceList the resource list
     * @return the boolean
     */
    abstract boolean doReport(String sourceType, List<ActResource> resourceList);
}

# 策略扫描器

使用策略模式继承@ResourcesProcessorScanner的所有bean(扫描器) 以下以领域能力扫描器进行介绍

package com.jd.cube.push.scanner;

import cn.hutool.core.util.StrUtil;
import com.jd.cube.common.anno.CubeAsync;
import com.jd.cube.common.anno.CubeDomainAbility;
import com.jd.cube.common.exception.CubeException;
import com.jd.cube.common.log.RecordLog;
import com.jd.cube.common.util.PlatformUtil;
import com.jd.cube.common.util.SpringUtil;
import com.jd.cube.domain.context.TemplateChannel;
import com.jd.cube.domain.properties.CubeProperties;
import com.jd.cube.domain.report.ActResource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.StopWatch;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;

import java.util.*;
import java.util.stream.Collectors;

import static com.jd.cube.common.constant.SourceType.SOURCE_TYPE_ABILITY;
import static com.jd.cube.common.enums.ResourceTypeEnum.UNIVERSAL_NO;
import static com.jd.cube.common.enums.ResourceTypeEnum.UNIVERSAL_YES;

/**
 * The type Ability resources scanner.
 *
 * @program: cube
 * @ClassName AbilityResourcesScanner
 * @description:
 * @author: sizegang
 * @create: 2022 -08-16
 */
@Slf4j
public class AbilityResourcesScanner extends ResourcesProcessorScanner {

    public static final String Object = "Object";
    /**
     * The constant TASK_NAME.
     */
    public static final String TASK_NAME = "Ability and Cooperation resources scanner cost";

    /**
     * Instantiates a new Act scanner.
     *
     * @param isvAbilityProperties the isv ability properties
     */
    public AbilityResourcesScanner(CubeProperties isvAbilityProperties) {
        super(isvAbilityProperties);
    }

    @Override
    public List<ActResource> doInit() {

        StopWatch stopWatch = new StopWatch();
        stopWatch.start(TASK_NAME);
        String[] allBeanNames = applicationContext.getBeanDefinitionNames();
        List<ActResource> isvResouces = new ArrayList<>();
        String platformKey = PlatformUtil.key(isvAbilityProperties.getAccessToken());

        Set<String> abilityCodes = new HashSet<>();

        for (String beanName : allBeanNames) {
            Object bean = SpringUtil.getBean(beanName);

            ActResource actResource = null;

            CubeDomainAbility abilityAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), CubeDomainAbility.class);

            if (!Objects.isNull(abilityAnnotation)) {
                actResource = dealAbilityAnnotation(abilityAnnotation, abilityCodes, platformKey, beanName);
            }

            if (Objects.isNull(actResource)) {
                continue;
            }

            isvResouces.add(actResource);
        }

        stopWatch.stop();
        RecordLog.warn(String.format("%s {}", TASK_NAME), stopWatch.getTotalTimeMillis());
        return isvResouces;
    }

    @Override
    public String getType() {
        return SOURCE_TYPE_ABILITY;
    }

    /**
     * ISV领域能力扫描上报
     *
     * @param abilityAnnotation
     * @param codes
     * @param platformKey
     * @param beanName
     * @return
     */
    private ActResource dealAbilityAnnotation(CubeDomainAbility abilityAnnotation, Set<String> codes, String platformKey, String beanName) {
        Object bean = SpringUtil.getBean(beanName);

        String universal = UNIVERSAL_NO.getType();

        if (((ParameterizedTypeImpl) bean.getClass().getGenericSuperclass()).getActualTypeArguments()[0].getTypeName()
                == TemplateChannel.class.getName()) {
            universal = UNIVERSAL_YES.getType();
        }

        if (StrUtil.isBlank(abilityAnnotation.code())) {
            throw new CubeException(String.format("%s 未设置领域能力code ", beanName));
        }

        if (codes.contains(abilityAnnotation.code())) {
            throw new CubeException(String.format("%s 存在重复的领域能力code ", beanName));
        }

        codes.add(abilityAnnotation.code());
        Class[] source = abilityAnnotation.source();

        for (Class aClass : source) {
            if (AnnotationUtils.findAnnotation(aClass, CubeAsync.class) == null) {
                if (!Object.equals(aClass.getSimpleName())) {
                    throw new CubeException(String.format("%s 的ISVDomainAbility 注解中的source属性异常  ", beanName));
                }
            }
        }
        List<String> sources = Arrays.stream(source).map((x) -> x.getSimpleName()).collect(Collectors.toList());
        clearSource(sources);
        ActResource actResource = new ActResource()
                .setCode(abilityAnnotation.code())
                .setService(beanName)
                .setSourceType(universal)
                .setPlatformKey(platformKey)
                .setPlatformCode(PlatformUtil.getPlatform(isvAbilityProperties.getAccessToken()))
                .setDesc(StrUtil.isBlank(abilityAnnotation.description()) ?
                        beanName : abilityAnnotation.description())
                .setSource(sources);

        return actResource;
    }

    private void clearSource(List<String> sources) {
        if (sources.size() == 1 && sources.get(0).equals(Object)) {
            sources.clear();
        }
    }
}

# 数据上报

# 资源上报器


/**
 * The type Act reporter.
 *
 * @program: cube
 * @ClassName ActReporter
 * @description:
 * @author: sizegang
 * @create: 2022 -07-19
 * @date 2022-08-26 18:30
 * @author sizegang1
 */
public class ResourcesReporter extends AbstractReporter {


    private CubeProperties isvAbilityProperties;

    private AtomicBoolean success = new AtomicBoolean(false);

    public static final String CODE = "code";

    /**
     * Instantiates a new Act reporter.
     *
     * @param scanners             the scanners
     * @param isvAbilityProperties the isv ability properties
     */
    public ResourcesReporter(List<ResourcesProcessorScanner> scanners, CubeProperties isvAbilityProperties) {
        super(scanners);
        this.isvAbilityProperties = isvAbilityProperties;
    }

    /**
     * Instantiates a new Act reporter.
     *
     * @param actScanner the act scanner
     */
    public ResourcesReporter(List<ResourcesProcessorScanner> actScanner) {
        super(actScanner);
    }

    /**
     * @param resourceList the resource list
     * @return
     */
    @Override
    boolean doReport(String sourceType, List<ActResource> resourceList) {
        try {
            String res = sendResources(sourceType, resourceList);
            RecordLog.info("ActReporter report  success {} res {}", JSONObject.toJSONString(resourceList), JSON.toJSONString(res));
            return true;
        } catch (Exception exception) {
            RecordLog.error("【ActReporter report exception】 {}", exception);
            retryReport(sourceType, resourceList);
            return false;
        }
    }

    /**
     * send resources
     *
     * @param resourceList
     * @return
     */
    private String sendResources(String sourceType, List<ActResource> resourceList) {
        CollectActResource collectActResource = new CollectActResource();
        collectActResource.setResourceList(resourceList);
        collectActResource.setType(sourceType);
        String res = HttpRequest.post(getAddress())
                .header(Header.CONTENT_TYPE, APPLICATION_JSON).body(JSONObject.toJSONString(collectActResource), APPLICATION_JSON).execute().body();
        if (JSONObject.parseObject(res).get(CODE).equals(HttpStatus.HTTP_OK)) {
            success.compareAndSet(false, true);
        }
        return res;
    }

    /**
     * High availability retry mechanism
     *
     * @param resourceList
     */
    private void retryReport(String sourceType, List<ActResource> resourceList) {
        ScheduledFuture<?> scheduledFuture = ThreadExecutor.availabilityCheckRetry(() -> {
            try {
                sendResources(sourceType, resourceList);
            } catch (Exception exception) {
                RecordLog.error("【retryReport】 exception ", exception);
            }
        }, 10, 15000, TimeUnit.MILLISECONDS);

        AsyncPool.asyncDo(() -> {
            for (; ; ) {
                if (success.compareAndSet(true, false)) {
                    scheduledFuture.cancel(true);
                    RecordLog.info("Report Resources success congratulate!!!");
                    break;
                }
                try {
                    Thread.sleep(30000);
                } catch (InterruptedException e) {
                }
            }
        });
    }

    /**
     * @return
     * @method getAddress
     */
    private String getAddress() {
        String serverAddr = isvAbilityProperties.getServerInfo().getServerAddr();
        Random random = new Random();
        String[] address = serverAddr.split(",");
        return address[random.nextInt(address.length)] + COLLECT_ABILITY;
    }
}

# 双队列入库

# 事件机制区分不同资源

具体可以移至源码中cube-dashoboard-controller (opens new window)的CollectController入口中进行查看

# 上报权限校验

@Slf4j
@Component
public abstract class AbstractReportDataProcessor implements ReportDataProcessor{

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private IsvPlatformService isvPlatformService;
    /**
     *
     * @param resources
     * @param builder
     * @return
     */
    protected ResourceReportCheckDto platformAccessCheck(CollectActResource resources,
                                                       ResourceReportCheckDto.ResourceReportCheckDtoBuilder builder) {
        List<ActResource> resourceList = resources.getResourceList();
        if (Objects.isNull(resourceList) || resourceList.isEmpty()) {
            return builder.pass(false).build();
        }
        ActResource actResource = resourceList.get(0);

        String platform = PlatformUtil.getPlatformByPlaintext(actResource.getPlatformKey());

        Object platCache = redisTemplate.opsForHash().get(PLATFORM_KEY, platform);

        if (Objects.isNull(platCache)) {
            IsvPlatform isvPlatform = new IsvPlatform();
            isvPlatform.setCode(platform);
            IsvPlatform persistedPlatform = isvPlatformService.get(isvPlatform);
            if (persistedPlatform == null) {
                log.error("领域能力数据上报失败 原因: 权限不足 platformKey: {}", actResource.getPlatformKey());
                return builder.pass(false).build();
            } else {
                redisTemplate.opsForHash().put(PLATFORM_KEY, platform, persistedPlatform);
                return builder.pass(true).build();
            }
        }
        return builder.pass(true).build();
    }
}

# 资源入队列

框架通过NotifyCenter.publishEvent发布不同资源类型的入对列事件。通过监听不同的入队事件进而区分不同类型的资源类型 以下以领域能力为例:

@Slf4j
@Service(SourceType.SOURCE_TYPE_ABILITY)
public class AbilityReportDataProcessor extends AbstractReportDataProcessor {

    @Override
    public ResourceReportCheckDto process(CollectActResource resources) {
        ResourceReportCheckDto.ResourceReportCheckDtoBuilder builder = ResourceReportCheckDto.builder();
        ResourceReportCheckDto resourceReportCheckDto = platformAccessCheck(resources, ResourceReportCheckDto.builder());
        if (resourceReportCheckDto.isPass()) {
            NotifyCenter.publishEvent(new ActResourceChangeEvent(resources.getResourceList()));
            resourceReportCheckDto.setMessage("success");
            return resourceReportCheckDto;
        }
        return builder.pass(false).message("权限不足").build();
    }
}

# 双队列缓冲

为什么要使用双队列? 为什么不能将来的请求直接入库。还要搞一个队列入队是不是太麻烦了?

至于为什么这样做可以参考Nacos在实现配置上报时是如何实现的数据持久化操作

# 抽象资源入库类执行器

public abstract class AbstractEventProcessor<T> implements Processor<List<T>>, InitializingBean {
    /**
     *
     */
    private AtomicBoolean down = new AtomicBoolean(false);

    /**
     * The constant CACHE_ABILITY.
     */
    protected Map<String, Map<String, T>> cache = new ConcurrentHashMap<>();


    /**
     * The Persistent act.
     */
    protected LinkedBlockingQueue<T> actQueue = new LinkedBlockingQueue<>();

    /**
     * The Insert queue.
     */
    protected LinkedBlockingQueue<T> insertQueue = new LinkedBlockingQueue<>();

    @Override
    public void execute(List<T> t) {
        if (Objects.isNull(t)) {
            return;
        }
        this.actQueue.addAll(t);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (!down.compareAndSet(false, true)) {
            return;
        }
        List<T> actResources = new ArrayList<>();

        List<T> insertedResources = new ArrayList<>();

        ThreadExecutorFactory.initResource(() -> {
            try {
                actQueue.drainTo(actResources);
                handle(actResources);
                actResources.clear();
            } catch (Exception exception) {
                RecordLog.error("【AbilityEventProcessor comsumer】 exception", exception);
            }
        }, 100, 1000, TimeUnit.MILLISECONDS);

        ThreadExecutorFactory.checkResources(() -> {
            try {
                check();
            } catch (Exception exception) {
                RecordLog.error("【check abilityDomain process】exception {}", exception);
            }

        }, 1, 60, TimeUnit.SECONDS);

        ThreadExecutorFactory.resourcesPersistent(() -> {
            insertQueue.drainTo(insertedResources);
            if(CollUtil.isEmpty(insertedResources)){
                return;
            }
            try {
                persistent(insertedResources);
            } catch (Exception exception) {
                RecordLog.error("【AbilityEventProcessor persistant】 exception", exception);
                persistentCallback(insertedResources);
            } finally {
                insertedResources.clear();
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);
    }

    /**
     * Handle.
     *
     * @param resources the resources
     */
    abstract void handle(List<T> resources);

    /**
     * Check.
     */
    abstract void check();

    /**
     * Persistent.
     *
     * @param resources the resources
     */
    abstract void persistent(List<T> resources);

    /**
     * Persistent callback.
     *
     * @param resources the resources
     */
    abstract void persistentCallback(List<T> resources);


    protected String cacheKey(String type, String platformKey) {
        return String.format("%s_%s", type, platformKey);
    }
}

# 领域能力资源事件执行器

@Component
public class AbilityEventProcessor extends AbstractEventProcessor<ActResource> {

    @Resource
    private CubeConfigDomainAbilityService cubeConfigDomainAbilityService;


    @Override
    void handle(List<ActResource> resources) {
        for (ActResource x : resources) {
            String platformKey = x.getPlatformKey();
            Map<String, ActResource> resourceMap = cache.get(cacheKey(SOURCE_TYPE_ABILITY, platformKey));
            if (Objects.isNull(resourceMap)) {
                insertQueue.add(x);
                continue;
            }
            ActResource actResource = resourceMap.get(x.getCode());
            try {
                if (Objects.nonNull(actResource) && Objects.nonNull(actResource.getDesc()) && actResource.getDesc().equals(x.getDesc())
                        && actResource.getSource().equals(x.getSource())
                        && x.getPlatformCode().equals(actResource.getPlatformCode()) && x.getPlatformKey().equals(actResource.getPlatformKey())
                        && x.getSourceType().equals(actResource.getSourceType())) {
                    continue;
                }
            } catch (Exception exception) {
                RecordLog.error("领域能力执行器上报检查异常{} actResource {}", exception, x);
            }
            insertQueue.add(x);
        }
    }


    @Override
    void persistent(List<ActResource> resources) {
        cubeConfigDomainAbilityService.abilityPersistent(resources);

    }

    @Override
    void persistentCallback(List<ActResource> resources) {
        for (ActResource insertedResource : resources) {
            try {
                cubeConfigDomainAbilityService.insertAbilityProcess(insertedResource);
            } catch (Exception exc) {
                RecordLog.error("【AbilityEventProcessor every persistant】 exception", exc);
            }
        }
    }

    @Override
    void check() {
        IsvConfigDomainAbility isvConfigDomainAbility = new IsvConfigDomainAbility();
        List<IsvConfigDomainAbility> abilityServiceList = cubeConfigDomainAbilityService.findList(isvConfigDomainAbility);
        for (IsvConfigDomainAbility x : abilityServiceList) {
            Map<String, ActResource> resourceMap = cache.get(cacheKey(SOURCE_TYPE_ABILITY, x.getPlatformKey()));
            if (Objects.isNull(resourceMap)) {
                resourceMap = new ConcurrentHashMap<>();
                fillMap(x, resourceMap);
                cache.put(cacheKey(SOURCE_TYPE_ABILITY, x.getPlatformKey()), resourceMap);
                continue;
            }

            ActResource actResource = resourceMap.get(x.getCode());
            if (Objects.isNull(actResource)) {
                fillMap(x, resourceMap);
            }
        }
    }

    /**
     * @param x
     * @param resourceMap
     */
    private void fillMap(IsvConfigDomainAbility x, Map<String, ActResource> resourceMap) {
        ActResource actResource = new ActResource();
        actResource.setCode(x.getCode());
        actResource.setDesc(x.getDesc());
        actResource.setService(x.getService());
        actResource.setPlatformKey(x.getPlatformKey());
        actResource.setPlatformCode(x.getPlatformCode());
        actResource.setSourceType(x.getUniversal());
        actResource.setSource(Arrays.asList(StrUtil.split(x.getSource(), ",")));
        resourceMap.put(x.getCode(), actResource);
    }

}

帮助我们改善此文档 (opens new window)
🧇楼层扩展
🫕配置并发设计

← 🧇楼层扩展 🫕配置并发设计→

Theme by Vdoing | Copyright © 2022-2022 Evan Xu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式