原创

Dubbo注册中心

前言

本篇主要介绍一下Dubbo的注册中心的总体工作流程,以及不同类型注册中心的数据结构和实现原理,同时还会介绍一下注册中心支持的通用特性,如缓存机制、重试机制,最会会对整个注册中心中用到的设计模式做深度的解析,主要目的是为了深入理解Dubbo各种注册中心的实现原理,方便后续快速理解并拓展。

概述

注册中心是Dubbo体系中核心组件之一,通过注册中心实现了再分布式环境中各服务之间的注册发现,其主要作用如下:

  • 动态加入:一个服务提供者通过注册中心动态地把自己暴露给其他消费者,无需消费者挨个去更新配置文件。
  • 动态发现:一个消费者可以动态的感知新的配置,路由规则和新的服务提供者,无需重启服务就可以生效。
  • 动态调整:注册中心支持参数的动态调整,新参数自动更新到所有相关服务节点。
  • 统一配置:避免了本地配置导致各个服务的配置不一致的问题。

模块介绍

当前dubbo版本: 3.0.4

模块名称 模块介绍
dubbo-registry-api 包含注册中心所有API和抽象实现类
dubbo-registry-kubernetes 在kubernetes中注册中心的实现
dubbo-registry-dns 使用dns作注册中心的实现
dubbo-registry-multicast multicast模式的服务的注册和发现
dubbo-registry-multiple 多注册中心模式下服务的注册和发现
dubbo-registry-nacos 使用Nacos作为注册中心的实现
dubbo-registry-xds Service-mesh模式下服务的注册和发现(Service mesh)
dubbo-registry-zookeeper 使用ZooKeeper作为注册中心的实现

从dubbo-registry的模块中我们可以看到,Dubbo主要包含6种注册中心的实现,分别是:kubernetes、dns、multicast、nacos、xds、zookeeper。

下面我们来看下各种维度的对比

工作流程

注册红心的总体工作流程比较简单,总体流程如下图所示:

  • 服务提供者服务启动时,会向注册中心写入自己的元数据信息,同时会订阅配置元数据信息。
  • 消费者启动时,也会想注册中心写入自己的元数据信息,并订阅服务提供者、路由和配置元数据信息。
  • 服务治理中心(dubbo-admin)启动时,会同时订阅所有的消费者、服务提供者、路由和配置元数据信息。
  • 当有服务提供者下线或者有新的服务提供者加入时,注册中心服务提供者目录会发生变化,变化信息会动态给消费者、服务治理中心。
  • 当消费者发起服务调用时,会异步将调用、统计信息等上报给监控中心(dubbo-monitor-simple)。

数据结构

注册中心的总体流程大致类似,但是不同注册中心有不同的实现方式,其数据结构也不相同。ZooKeeper、Nacos等注册中心都实现了这个流程,又要有些注册中心并不常用,因此本篇只分析ZooKeeper、Nacos两种实现的数据结构。

ZooKeeper原理概述

ZooKeeper是树形结构的注册中心,每个节点的类型分为持久化节点、持久化顺序节点、临时节点、临时顺序节点。

  • 持久化节点:服务注册后保证节点不会丢失,注册中心重启也会存在。
  • 持久化顺序节点:在持久节点特性的基础上增加了节点先后顺序的能力。
  • 临时节点:服务注册后连接丢失或session超时,注册的节点会自动被移除。
  • 临时顺序节点:在临时节点的特性上增加了节点先后顺序的能力。

Dubbo使用zk作为注册中心时,只会创建持久化节点和临时节点两种,对创建的顺序并没有要求。

/dubbo/org.apache.dubbo.demo.DemoService/providers是服务提供者在ZooKeeper注册中心的路径示例,是一种树形结构,该结构分为四层:root(根节点,对应示例中的dubbo)、service(接口名称,对应示例中的org.apache.dubbo.demo.DemoService)、四种服务目录(对应示例中的providers,其他目录为consumers、routers、configurations)。树形结构如下:

++ /dubbo

++-- xxxservice

​ +-- providers

​ +-- consumers

​ +-- routers

​ +-- configurators

树形结构的关系:

  1. 树的节点是注册中心分组。下面有多个服务接口,分组值分别来自用户配置<dubbo:registry>中的group属性,默认是/dubbo。
  2. 服务接口下包含四类子目录,分别是providers,consumers,routers,configurations这个路径是持久化节点。
  3. 服务提供者目录(/dubbo/service/providers)下面包含的接口有多个服务者URL元数据信息。
  4. 服务消费者目录(/dubbo/service/consumers)下面包含的接口有多个消费者URL元数据信息。
  5. 路由配置目录(/dubbo/service/routers)下面包含多个用于消费者路由策略URL元数据信息。
  6. 动态配置目录(/dubbo/service/configurators)下面包含多个用于服务者动态配置URL元数据信息。

大致的存储结构为:

  1. 在Dubbo框架启动时,会根据用户配置的服务,在注册中心创建四个目录,在providers和consumers目录中分别存储服务提供方、消费方元数据信息,主要包括IP、端口、权重和应用名等数据。
  2. 在Dubbo框架进行服务调用时,用户可以通过服务治理平台(dubbo-admin)下发路由配置。如果要运行时改变服务参数,则用户可以通过服务治理平台(dubbo-admin)下发动态配置。服务端会通过订阅机制收到属性变更,并重新更新已经暴露的服务。

Nacos原理概述

不同的是在Nacos 中,服务注册时在服务端本地会通过轮询注册中心集群节点地址进行服务得注册,在注册中心上,即Nacos Server上采用了Map保存实例信息,当然配置了持久化的服务会被保存到数据库中,在服务的调用方,为了保证本地服务实例列表的动态感知,Nacos与其他注册中心不同的是,采用了 Pull/Push同时运作的方式。通过这些我们对Nacos注册中心的原理有了一定的了解。

订阅与发布

订阅/发布是整个注册中心的核心功能之一。与传统系统应用的差别:配置变化时,无需手动触发配置重新加载,自动化运维。

当我们使用注册中心,一个服务节点下线或者新增一个服务提供者节点,订阅对应接口的消费者和治理中心就能及时厚道注册中心的通知,并更新本地配置信息。整个过程都是自动完成,无需人工参与。

Dubbo在上层抽象类这样一个共走流程,但可以有不同实现。本篇主要讲ZooKeeper和Nacos的实现方式。

ZooKeeper的实现

发布的实现

服务提供者和消费者都需要把自己注册到注册中心,服务提供者的注册是为了让消费者感知服务的存在,从而发起远程调用,也让服务治理中心感知有新的服务提供者上线。消费者的发布是为了让服务治理中心也可以发现自己。ZooKeeper发布代码非常简单,只是调用ZooKeeper的客户端再注册中心创建了一个目录,代码如下:

创建目录

zkClient.create(toUrlPath(url));
url.getParameter(DYNAMIC_KEY, true)

删除路径

zkClient.delete(toUrlPath(url));

订阅的实现

订阅通常有pull和push两种方式,一种是客户端定时轮询注册中心拉取配置,另一种是注册中心主动推送数据给客户端。这两种方式各有利弊,目前dubbo采用的是第一次启动拉取方式,后续接收时间重新拉取数据。

在服务暴露时,服务端会订阅configurators用于监听动态配置,在消费者启动时,消费端会订阅providers、routers和configurators这三个目录,分别对一个服务提供者、路由和动态配置变化通知。

Dubbo中有哪些ZooKeeper客户端实现?

  • Apache Curator
  • zkClient

ZooKeeper注册中心采用的是 事件通知 + 客户端拉取的方式,客户端在第一次连接上注册中心时,会获取对应目录下全量的数据,并在订阅的节点上注册一个watcher,客户端与注册中心之间保持TCP长连接,后续每个节点有任何数据变化的时候,注册中心会根据watcher的回调通知客户端(事件通知),客户端接收到通知,会把对应节点的全量数据都拉取过来(客户端拉取),代码中再NotifyListener#notify(List<URL> urls)接口上就有约束的注释说明。

注意:全量拉取有局限性,当微服务节点较多会对网络造成很大压力。

ZooKeeper每个节点都有一个版本号,当节点数据发生变化(即事务操作)时,该节点的版本号就会发生变化,并触发watcher事件,推送数据给订阅方。版本号强调的是变更次数,即使该节点的值没有变化,只要有更新操作,依然会使版本号变化。

ZooKeeper全量订阅服务代码分析,核心代码来自ZookeeperRegistry#doSubscribe,代码如下:

if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                boolean check = url.getParameter(CHECK_KEY, false);
  //listeners为空说明缓存中没有就初始化一个空map
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
  //这里把listeners放入缓存中
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                  //内部方法 不会立即执行,只会在触发通知时执行
                    for (String child : currentChilds) {
                      //如果子节点有变化则会接到新通知,遍历所有的子节点
                        child = URL.decode(child);
                      //如果存在子节点还没有被订阅,说明是新节点,则订阅
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                Constants.CHECK_KEY, String.valueOf(check)), k);
                        }
                    }
                });
                              //创建持久化节点,接下来订阅持久化节点的直接子节点
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                  //遍历所有子节点进行订阅
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                          //增加当前节点的订阅,并且会返回该节点下所有子节点列表
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(check)), listener);
                    }
                }
            }

从上面可以看出,此处主要支持Dubbo服务治理平台(dubbo-admin),平台在启动时会订阅全量接口,它会感知每个服务的状态。

接下来看一下普通消费者的订阅逻辑,首先根据URL的类别得到一组需要订阅的路径。如果类别是*,则会订阅四种类型的路径(providers、routers、consumers、configurators),否则只订阅providers路径,代码如下:

    List<URL> urls = new ArrayList<>();
    //根据URL的类别,获取一组要订阅的路径
        for (String path : toCategoriesPath(url)) {
     //如果listeners缓存为空则创建缓存
      ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
      //如果zkListener缓存为空则创建缓存
      ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
      if (zkListener instanceof RegistryChildListenerImpl) {
        ((RegistryChildListenerImpl) zkListener).setLatch(latch);
      }
      zkClient.create(path, false);
      List<String> children = zkClient.addChildListener(path, zkListener);
      if (children != null) {
           //订阅,返回该节点下的子路径并缓存。
        urls.addAll(toUrlsWithEmpty(url, path, children));
      }
    }
        //回调NotifyListener,更新本地缓存信息
    notify(url, listener, urls);

注意:此处会根据URL中的category属性获取具体的类别(providers、routers、consumers、configurators),然后拉取直接子节点的数据进行通知,如果是providers类别的数据,则订阅方会更新本地Directory管理的Invoker服务列表,如果是roters分类,则订阅会更新本地路由规则列表,如果是configuators类别,则订阅方会更新或覆盖本地动态参数列表。

Nacos的实现

发布的实现

Nacos发布代码也不复杂,只是调用Nacos的客户端注册了一个实例,代码如下:

注册实例

 public void doRegister(URL url) {
        try {
            String serviceName = getServiceName(url);
            Instance instance = createInstance(url);
            /**
             *  namingService.registerInstance with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
             *  default {@link DEFAULT_GROUP}
             *
             * in https://github.com/apache/dubbo/issues/5978
             */
             //调用nacos客户端创建实例
            namingService.registerInstance(serviceName,
                getUrl().getGroup(Constants.DEFAULT_GROUP), instance);
        } catch (Throwable cause) {
            throw new RpcException("Failed to register " + url + " to nacos " + getUrl() + ", cause: " + cause.getMessage(), cause);
        }
    }

dubbo中,所以的服务都被封装成了URL,对应nacos中的服务实例Instance,所以服务注册时,只需要简单的将URL转换成Instance就可以注册到nacos中,针对具体细节可参考nacos官方文档

销毁实例

public void doUnregister(final URL url) {
    try {
        String serviceName = getServiceName(url);
        Instance instance = createInstance(url);
        //调用nacos客户端删除实例
        namingService.deregisterInstance(serviceName,
            getUrl().getGroup(Constants.DEFAULT_GROUP),
            instance.getIp()
            , instance.getPort());
    } catch (Throwable cause) {
        throw new RpcException("Failed to unregister " + url + " to nacos " + getUrl() + ", cause: " + cause.getMessage(), cause);
    }
}

订阅的实现

org.apache.dubbo.registry.nacos.NacosRegistry:518

    private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
        throws NacosException {
        EventListener eventListener = new RegistryChildListenerImpl(serviceName, url, listener);
        namingService.subscribe(serviceName,
            getUrl().getGroup(Constants.DEFAULT_GROUP),
            eventListener);
    }

nacos的服务监听是EventListener,所以dubbo的服务订阅只需要将NotifyListener的处理包装进onEvent中处理即可, 通过namingService.subscribe添加nacos的订阅。最终EventListener对象会被添加到事件调度器的监听器列表中,见如下代码

com.alibaba.nacos.client.naming.event.InstancesChangeNotifier:54

private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();

    private final Object lock = new Object();

    /**
     * register listener.
     *
     * @param groupName   group name
     * @param serviceName serviceName
     * @param clusters    clusters, concat by ','. such as 'xxx,yyy'
     * @param listener    custom listener
     */
    public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
        String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
        ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
        if (eventListeners == null) {
            synchronized (lock) {
                eventListeners = listenerMap.get(key);
                if (eventListeners == null) {
                    eventListeners = new ConcurrentHashSet<EventListener>();
                    listenerMap.put(key, eventListeners);
                }
            }
        }
        eventListeners.add(listener);
    }

当有instanceEvent变化时触发他的onEvent方法,代码如下:

@Override
public void onEvent(InstancesChangeEvent event) {
    String key = ServiceInfo
            .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
    ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
    if (CollectionUtils.isEmpty(eventListeners)) {
        return;
    }
    for (final EventListener listener : eventListeners) {
        final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
        if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
            ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
        } else {
            listener.onEvent(namingEvent);
        }
    }
}

缓存机制

缓存的意义就在于拿空间换时间,如果每次远程调用都要从注册中心获取一下可以调用的服务列表,注册中心要承受巨大的流量压力。此外,每次额外的网络开销也会让整个系统的性能下降,因此Dubbo的注册中心实现了通用的缓存机制,在抽象类AbstractRegistry中实现。AbstractRegistry类结构关系图如下:

消费者活服务中心获取注册信息后会做本地缓存,内存中会有一份,保存到Properties对象中,磁盘上也会持久化一份文件,通过file对象引用。在AbstractRegistry抽象类中有如下定义:

private final Properties properties = new Properties();
// Local disk cache file
private File file;
private final ConcurrentMap<URL, Map<String, List<URL>>> notified =
  new ConcurrentHashMap<>();

内存中的缓存notified是ConcurrentHashMap里面又嵌套了一个Map,外层Map的key是消费者的URL,内层Map的key是分类,包含providers,consumers,routers,configurations四种。value则是对应的服务列表,对于没有服务提供者提供服务的URL,会已特殊的empty://前缀开头。

缓存的加载

在服务初始化的时候,AbstractRegistry构造函数会从本地磁盘文件中把持久化的注册数据督导properties对象中,并接在到内存缓存中,代码如下:

    private void loadProperties() {
        if (file != null && file.exists()) {
            InputStream in = null;
            try {
                  //读取磁盘中的文件
                in = new FileInputStream(file);
                properties.load(in);
                if (logger.isInfoEnabled()) {
                    logger.info("Loaded registry cache file " + file);
                }
            } catch (Throwable e) {
                logger.warn("Failed to load registry cache file " + file, e);
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    }

Properties保存了所有服务提供者的URL,使用url#getServiceKey()作为key,提供者列表、路由规则列表、配置规则列表等作为value。由于value是列表,当存在多个的时候使用空格隔开。还有一个特殊的key.registies,保存所有的注册中心的地址。如果应用在启动过程中,注册中心无法连接活宕机,则Dubbo框架会自动通过本地缓存加载Invoker。

缓存的保存与更新

缓存的保存有同步和异步两种方式。异步会使用线程池异步保存,如果线程在执行过程中出现异常,则会再次调用线程池不断重试,代码如下所示。

同步与异步更新缓存

  if (syncSaveFile) {
  //同步保存
  doSaveProperties(version);
  } else {
  //异步保存,放入线程池,会传入一个AtomicLong的版本号,保证数据是最新的。
  registryCacheExecutor.execute(new SaveProperties(version));
  }

AbstractRegistry#notify方法中封装了更新内存缓存和更新文件缓存的逻辑。当客户端第一次订阅获取全量数据,或者后续由于订阅得到新数据时,都会调用该方法进行保存。

重试机制

由上面AbstractRegistry相关类关系图我们可以得知org.apache.dubbo.registry.support.FailbackRegistry继承了AbstractRegistry,并在此基础上增加了失败重试机制作为抽象能力。ZookeeperRegistryRedisRegistry继承该抽象方法后,直接使用即可。
FailbackRegistry抽象类中定义了一个ScheduledExecutorService,每经过固定间隔(默认
为5秒)调用FailbackRegistry#retry()方法。另外,该抽象类中还有五个比较重要的集合,如下表所示。

集合名称 集合介绍
Set failedRegistered 发起注册失败的URL集合
Set failedUnregistered 取消注册失败的URL集合
ConcurrentMap> failedSubscribed 发起订阅失败的监听器集合
ConcurrentMap> failedUnsubscribed 取消订阅失败的监听器集合
ConcurrentMap>> failedNotified 通知失败的URL集合

在定时器中调用retry方法的时候,会把这五个集合分别遍历和重试,重试成功则从集合中移除。FailbackRegistry实现了subscribe、 unsubscribe 等通用方法,里面调用了未实现的模板方法,会由子类实现。通用方法会调用这些模板方法,如果捕获到异常,则会把URL添加到对应的重试集合中,以供定时器去重试。

设计模式

Dubbo注册中心拥有良好的扩展性,用户可以在其基础上,快速开发出符合自己业务需求的注册中心。这种扩展性和Dubbo中使用的设计模式密不可分,下面介绍注册中心模块使用的设计模式。学完之后,能降低读者对注册中心源码阅读的门槛。

模板模式

整个注册中心的逻辑部分使用了模板模式,其类的关系图如图所示。

AbstractRegistry实现了Registry 接口中的注册、订阅、查询、通知等方法,还实现了磁盘文件持久化注册信息这一通用方法。 但是注册、订阅、查询、通知等方法只是简单地把URL加入对应的集合,没有具体的注册或订阅逻辑。

FailbackRegistry又继承了AbstractRegistry, 重写了父类的注册、订阅、查询和通知等方法,并且添加了重试机制。此外,还添加了四个未实现的抽象模板方法,代码如下。

未实现的抽象模板方法

         // ==== Template method ====

    public abstract void doRegister(URL url);

    public abstract void doUnregister(URL url);

    public abstract void doSubscribe(URL url, NotifyListener listener);

    public abstract void doUnsubscribe(URL url, NotifyListener listener);

以订阅为例,FailbackRegistry 重写了subscribe 方法,但只实现了订阅的大体逻辑及异常处理等通用性的东西。具体如何订阅,交给继承的子类实现。这就是模板模式的具体实现,代码如下所示。

模板模式调用

    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
                //此处调用了模板方法,由子类自行实现
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            ...
        }
    }

工厂模式

所有的注册中心实现,都是通过对应的工厂创建的。工厂类之间的关系如图所示。

AbstractRegistryFactory实现了RegistryFactory 接口的getRegistry(URL ur1)方法,是一个通用实现,主要完成了加锁,以及调用抽象模板方法createRegistry(URL ur1)创建具体实现等操作,并缓存在内存中。抽象模板方法会由具体子类继承并实现,代码如下所示。

getRegistry抽象实现

        // Lock the registry access process to ensure a single instance of the registry
        registryManager.getRegistryLock().lock();
        try {
            // double check
            // fix https://github.com/apache/dubbo/issues/7265.
            defaultNopRegistry = registryManager.getDefaultNopRegistryIfDestroyed();
            if (null != defaultNopRegistry) {
                return defaultNopRegistry;
            }
            registry = registryManager.getRegistry(key);
            if (registry != null) {
              //缓存中有 则直接返回
                return registry;
            }
            //create registry by spi/ioc
              //如果注册中心还没创建过,
              //则调用抽象方法createRegistry(ur1)重新创建一个createRegistry方法由具体的子类实现    
            registry = createRegistry(url);
        } catch (Exception e) {
            if (check) {
                throw new RuntimeException("Can not create registry " + url, e);
            } else {
                LOGGER.warn("Failed to obtain or create registry ", e);
            }
        } finally {
            // Release the lock
            registryManager.getRegistryLock().unlock();
        }
                if (registry != null) {
              //创建成功 缓存起来
            registryManager.putRegistry(key, registry);
        }

虽然每种注册中心都有自己具体的工厂类,但是在什么地方判断,应该调用哪个工厂类实现呢?代码中并没有看到显式的判断。答案就在RegistryFactory 接口中,该接口里有一个Registry getRegistry(URL url)方法,该方法上有@Adaptive({"protocol"})注解,代码如下所示。

RegistryFactory源码

@SPI(scope = APPLICATION)
public interface RegistryFactory {
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}

了解AOP的胖友就会很容易理解,这个注解会自动生成代码实现一些逻辑,它的value参数会从URL中获取protocol 键的值,并根据获取的值来调用不同的工厂类。例如,当url.protocol = zookeeper获得 ZookeeperRegistryFactory实现类。具体Adaptive注解的实现原理会在第4章Dubbo加载机制中讲解。

小结

本篇介绍了Dubbo中已经支持的注册中心。重点介绍了ZooKeeper和Nacos两种注册中心。讲解了两种注册中心的数据结构,以及订阅发布机制的具体实现。然后介绍了注册中心中一些通用的关键特性,如数据缓存、重试等机制。最后,在对各种机制已经了解的前提下,理解了整个注册中心源码的设计模式。下一篇,我们会详细探讨Dubbo SPI扩展点加载的原理。

关注我,获取免费学习资料,无套路。

正文到此结束