Skip to content

服务发现

CoSky 的服务发现层使消费者能够在运行时定位可用的服务实例。它提供了标准的 Redis 支持的发现路径和基于 Redis PubSub 维护本地状态的高性能一致性缓存层,延迟几乎为零。一致性层在基准测试中 getInstances 达到 76M+ ops/s,getServices 达到 455M+ ops/s。

方面详情
接口ServiceDiscovery
标准实现RedisServiceDiscovery
一致性层ConsistencyRedisServiceDiscovery
统计RedisServiceStatistic
一致性机制本地缓存 + Redis PubSub 事件驱动更新
并发模型响应式(Flux<ServiceInstance>Mono<ServiceInstance>

ServiceDiscovery 接口

ServiceDiscovery 接口定义了四个用于定位服务和实例的核心操作。

方法返回类型描述源码
getServicesFlux<String>列出命名空间中所有已注册的服务 IDServiceDiscovery.kt:25
getInstancesFlux<ServiceInstance>列出指定服务 ID 的所有实例ServiceDiscovery.kt:26
getInstanceMono<ServiceInstance>通过服务 ID 和实例 ID 获取特定实例ServiceDiscovery.kt:27
getInstanceTtlMono<Long>返回特定实例的 TTL 过期时间戳ServiceDiscovery.kt:33

RedisServiceDiscovery

RedisServiceDiscovery 是标准实现,每次请求都直接从 Redis 读取实例数据:

ConsistencyRedisServiceDiscovery

ConsistencyRedisServiceDiscovery 包装任意 ServiceDiscovery 委托者,并添加通过 Redis PubSub 事件通知保持一致的本地缓存层。这是推荐的生产环境实现。

架构

一致性层维护两个 ConcurrentHashMap 缓存:

事件驱动缓存更新

当实例发生变更(注册、注销、续约、设置元数据、过期)时,InstanceChangedEvent 处理器将变更应用到本地缓存:

事件缓存操作
REGISTER从委托者获取完整实例,添加到缓存集合
RENEW从委托者获取 TTL,更新缓存实例的 ttlAt
SET_METADATA从委托者获取完整实例,替换缓存集合中的条目
DEREGISTER从缓存集合中移除
EXPIRED从缓存集合中移除

这种增量方式避免了每次变更时的全量缓存重建 (ConsistencyRedisServiceDiscovery.kt:138)。

性能

一致性层通过从内存提供读取,大幅提升了吞吐量:

操作标准 Redis一致性层提升
getInstances~2M ops/s76M+ ops/s~38 倍
getServices~10M ops/s455M+ ops/s~45 倍
延迟 (p99)可变(受网络限制)亚微秒级确定性

ServiceStatistic

ServiceStatistic 接口提供服务级别统计:

方法返回类型描述
statService(namespace)Mono<Void>触发所有服务的统计重新计算
statService(namespace, serviceId)Mono<Void>触发特定服务的统计重新计算
getServiceStats(namespace)Flux<ServiceStat>返回所有服务统计(服务 ID + 实例数)
getInstanceCount(namespace)Mono<Long>返回所有服务的实例总数

RedisServiceStatistic 通过执行 Lua 脚本并监听实例变更事件来实现。它过滤掉 RENEW 事件,因为续约不会改变实例数 (RedisServiceStatistic.kt:56)。

每个 ServiceStat 包含:

  • serviceId: String -- 服务标识符
  • instanceCount: Int -- 已注册实例数

时序图

服务查询流程(标准模式)

mermaid
sequenceDiagram
    autonumber
    participant Client as 消费者
    participant RSD as RedisServiceDiscovery
    participant Redis as Redis (Lua 脚本)
    participant Codec as ServiceInstanceCodec

    Client->>RSD: getInstances(namespace, serviceId)
    RSD->>Redis: EVAL discovery_get_instances.lua KEYS=[namespace] ARGV=[serviceId]
    Redis->>Redis: SMEMBERS svc_itc_idx:{serviceId}
    Redis->>Redis: 对每个成员执行 HGETALL svc_itc:{instanceId}
    Redis-->>RSD: 字段-值数组列表
    RSD->>Codec: 对每个实例执行 decode(instanceData)
    Codec-->>RSD: ServiceInstance
    RSD-->>Client: Flux~ServiceInstance~

    Client->>RSD: getServices(namespace)
    RSD->>Redis: SMEMBERS {namespace}:svc_idx
    Redis-->>RSD: 服务 ID 集合
    RSD-->>Client: Flux~String~

一致性缓存更新流程

mermaid
sequenceDiagram
    autonumber
    participant Client as 消费者
    participant CRSD as ConsistencyRedisServiceDiscovery
    participant Cache as 本地缓存 (CMap)
    participant PubSub as Redis PubSub
    participant Delegate as RedisServiceDiscovery

    Note over Client,Delegate: 初始加载
    Client->>CRSD: getInstances(namespace, serviceId)
    CRSD->>Cache: computeIfAbsent(NamespacedServiceId)
    CRSD->>PubSub: 订阅实例变更事件
    CRSD->>Delegate: getInstances(namespace, serviceId)
    Delegate-->>CRSD: ServiceInstance 列表
    CRSD->>Cache: 存储为 CopyOnWriteArraySet(缓存 Mono)
    CRSD-->>Client: Flux~ServiceInstance~

    Note over Client,Delegate: 事件驱动缓存更新
    PubSub->>CRSD: InstanceChangedEvent (REGISTER)
    CRSD->>Cache: 查找服务的缓存实例
    CRSD->>Delegate: getInstance(namespace, serviceId, instanceId)
    Delegate-->>CRSD: ServiceInstance
    CRSD->>Cache: 在 CopyOnWriteArraySet 中添加/替换实例

    Note over Client,Delegate: 后续读取(缓存命中)
    Client->>CRSD: getInstances(namespace, serviceId)
    CRSD->>Cache: 获取缓存的 CopyOnWriteArraySet
    CRSD-->>Client: Flux~ServiceInstance~(来自缓存,过滤 !isExpired)

统计收集流程

mermaid
sequenceDiagram
    autonumber
    participant Event as InstanceEventListenerContainer
    participant RSS as RedisServiceStatistic
    participant Redis as Redis (Lua 脚本)
    participant Data as ServiceStat 哈希

    Event->>RSS: InstanceChangedEvent(非 RENEW)
    RSS->>Redis: EVAL service_stat.lua KEYS=[namespace] ARGV=[serviceId]
    Redis->>Redis: 从 svc_itc_idx:{serviceId} 计算实例数
    Redis->>Redis: HSET svc_stat {serviceId} {count}
    Redis-->>RSS: Boolean

    Note over Event,Data: 消费者读取统计
    RSS->>Redis: HGETALL svc_stat
    Redis-->>RSS: serviceId 到 instanceCount 的映射
    RSS-->>Event: Flux~ServiceStat~

架构图

mermaid
flowchart TB
    subgraph Consumers["消费者"]
        C1[消费者 A]
        C2[消费者 B]
    end

    subgraph ConsistencyRedisServiceDiscovery
        CS["服务缓存<br>ConcurrentHashMap"]
        CI["实例缓存<br>CopyOnWriteArraySet"]
    end

    subgraph RedisServiceDiscovery
        RSD["Redis 发现"]
    end

    subgraph Redis
        IDX[(svc_idx SET)]
        ITC[(svc_itc_idx SET)]
        HASH[(svc_itc HASH)]
        STAT[(svc_stat HASH)]
        PUB[PubSub 通道]
    end

    C1 --> CS
    C2 --> CI
    CS --> RSD
    CI --> RSD
    RSD --> IDX
    RSD --> ITC
    RSD --> HASH
    PUB -->|服务变更| CS
    PUB -->|实例变更| CI

    style Consumers fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style ConsistencyRedisServiceDiscovery fill:#161b22,stroke:#30363d,color:#e6edf3
    style RedisServiceDiscovery fill:#161b22,stroke:#30363d,color:#e6edf3
    style Redis fill:#161b22,stroke:#30363d,color:#e6edf3

相关页面

参考文献

基于 Apache License 2.0 许可发布。