Service Discovery

CoSky's Service Discovery layer enables consumers to locate available service instances at runtime. It provides both a standard Redis-backed discovery path and a high-performance consistency cache layer that keeps local state current through Redis PubSub, so reads are served from memory. In benchmark testing, the consistency layer reaches 76M+ ops/s for getInstances and 455M+ ops/s for getServices.

| Aspect | Detail |
| --- | --- |
| Interface | ServiceDiscovery |
| Standard Implementation | RedisServiceDiscovery |
| Consistency Layer | ConsistencyRedisServiceDiscovery |
| Statistics | RedisServiceStatistic |
| Consistency Mechanism | Local cache + Redis PubSub event-driven updates |
| Concurrency Model | Reactive (`Flux<ServiceInstance>`, `Mono<ServiceInstance>`) |

ServiceDiscovery Interface

The ServiceDiscovery interface defines four core operations for locating services and instances.

| Method | Return Type | Description | Source |
| --- | --- | --- | --- |
| getServices | `Flux<String>` | Lists all registered service IDs in a namespace | ServiceDiscovery.kt:25 |
| getInstances | `Flux<ServiceInstance>` | Lists all instances for a given service ID | ServiceDiscovery.kt:26 |
| getInstance | `Mono<ServiceInstance>` | Retrieves a specific instance by service ID and instance ID | ServiceDiscovery.kt:27 |
| getInstanceTtl | `Mono<Long>` | Returns the TTL expiry timestamp for a specific instance | ServiceDiscovery.kt:33 |
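To make the shape of these four operations concrete, here is a minimal sketch using only the Kotlin standard library. It is not the project's interface: the real methods return `Flux`/`Mono`, whereas this stand-in returns `List` and nullable values so it runs without Reactor. The names `ServiceInstanceSketch`, `ServiceDiscoverySketch`, and `InMemoryServiceDiscovery` are hypothetical, and the parameter list of `getInstanceTtl` and the unit of `ttlAt` are assumptions.

```kotlin
// Hypothetical, simplified instance model; the real ServiceInstance has more fields.
data class ServiceInstanceSketch(
    val serviceId: String,
    val instanceId: String,
    val ttlAt: Long, // assumed: expiry timestamp, unit not confirmed by the source
)

// Synchronous stand-in for the reactive interface described in the table above.
interface ServiceDiscoverySketch {
    fun getServices(namespace: String): List<String>
    fun getInstances(namespace: String, serviceId: String): List<ServiceInstanceSketch>
    fun getInstance(namespace: String, serviceId: String, instanceId: String): ServiceInstanceSketch?
    fun getInstanceTtl(namespace: String, serviceId: String, instanceId: String): Long?
}

// In-memory implementation used only to exercise the contract.
class InMemoryServiceDiscovery(
    private val data: Map<String, List<ServiceInstanceSketch>>, // namespace -> instances
) : ServiceDiscoverySketch {
    override fun getServices(namespace: String): List<String> =
        data[namespace].orEmpty().map { it.serviceId }.distinct()

    override fun getInstances(namespace: String, serviceId: String): List<ServiceInstanceSketch> =
        data[namespace].orEmpty().filter { it.serviceId == serviceId }

    override fun getInstance(namespace: String, serviceId: String, instanceId: String): ServiceInstanceSketch? =
        getInstances(namespace, serviceId).firstOrNull { it.instanceId == instanceId }

    override fun getInstanceTtl(namespace: String, serviceId: String, instanceId: String): Long? =
        getInstance(namespace, serviceId, instanceId)?.ttlAt
}
```

In this sketch a missing instance yields `null`; in the reactive interface the corresponding outcome would presumably be an empty `Mono`.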

RedisServiceDiscovery

RedisServiceDiscovery is the standard implementation. It reads instance data directly from Redis on every request.

ConsistencyRedisServiceDiscovery

ConsistencyRedisServiceDiscovery wraps any ServiceDiscovery delegate and adds a local cache layer that stays consistent through Redis PubSub event notifications. This is the recommended production implementation.

Architecture

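The consistency layer maintains two ConcurrentHashMap caches: a service cache and an instance cache. Instance entries are keyed by NamespacedServiceId, and each entry holds the service's instances in a CopyOnWriteArraySet.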

Event-Driven Cache Updates

When an instance change occurs (register, deregister, renew, set_metadata, expired), the InstanceChangedEvent handler applies the change to the local cache:

| Event | Cache Action |
| --- | --- |
| REGISTER | Fetch full instance from delegate, add to cache set |
| RENEW | Fetch TTL from delegate, update ttlAt in cached instance |
| SET_METADATA | Fetch full instance from delegate, replace in cache set |
| DEREGISTER | Remove from cache set |
| EXPIRED | Remove from cache set |

This incremental approach avoids full cache rebuilds on every change (ConsistencyRedisServiceDiscovery.kt:138).
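The event table above can be sketched as a small handler. This is an illustration under assumptions, not the code in ConsistencyRedisServiceDiscovery.kt: `ChangeEvent`, `CachedInstance`, `InstanceSource`, and `InstanceCache` are hypothetical names, the delegate is synchronous rather than reactive, and the cache is keyed by a plain service ID string instead of NamespacedServiceId.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArraySet

enum class ChangeEvent { REGISTER, RENEW, SET_METADATA, DEREGISTER, EXPIRED }

// Hypothetical, reduced instance model.
data class CachedInstance(
    val instanceId: String,
    val metadata: Map<String, String>,
    val ttlAt: Long,
)

// Hypothetical stand-in for the wrapped delegate: only the two lookups the handler needs.
interface InstanceSource {
    fun getInstance(serviceId: String, instanceId: String): CachedInstance?
    fun getInstanceTtl(serviceId: String, instanceId: String): Long?
}

class InstanceCache(private val delegate: InstanceSource) {
    private val cache = ConcurrentHashMap<String, CopyOnWriteArraySet<CachedInstance>>()

    // Seeds the cache for one service, as the initial load would.
    fun load(serviceId: String, instances: Collection<CachedInstance>) {
        cache[serviceId] = CopyOnWriteArraySet(instances)
    }

    fun instances(serviceId: String): Set<CachedInstance> = cache[serviceId] ?: emptySet()

    fun onEvent(serviceId: String, instanceId: String, event: ChangeEvent) {
        // A service that was never read has no cache entry, so there is nothing to update.
        val cached = cache[serviceId] ?: return
        when (event) {
            ChangeEvent.REGISTER, ChangeEvent.SET_METADATA -> {
                // Fetch the full instance and add or replace it in the set.
                val fresh = delegate.getInstance(serviceId, instanceId) ?: return
                cached.removeIf { it.instanceId == instanceId }
                cached.add(fresh)
            }
            ChangeEvent.RENEW -> {
                // Fetch only the TTL and swap in a copy with the new ttlAt.
                val ttlAt = delegate.getInstanceTtl(serviceId, instanceId) ?: return
                val current = cached.firstOrNull { it.instanceId == instanceId } ?: return
                cached.remove(current)
                cached.add(current.copy(ttlAt = ttlAt))
            }
            ChangeEvent.DEREGISTER, ChangeEvent.EXPIRED -> {
                cached.removeIf { it.instanceId == instanceId }
            }
        }
    }
}
```

The sketch uses `removeIf` rather than iterator-based removal because iterators over a CopyOnWriteArraySet do not support `remove`. Each event touches a single instance, which is what keeps the update incremental.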

Performance

The consistency layer delivers dramatically higher throughput by serving reads from memory:

| Operation | Standard Redis | Consistency Layer | Improvement |
| --- | --- | --- | --- |
| getInstances | ~2M ops/s | 76M+ ops/s | ~38x |
| getServices | ~10M ops/s | 455M+ ops/s | ~45x |
| Latency (p99) | Variable (network-bound) | Sub-microsecond | Deterministic |

ServiceStatistic

The ServiceStatistic interface provides service-level statistics:

| Method | Return Type | Description |
| --- | --- | --- |
| statService(namespace) | `Mono<Void>` | Triggers statistics recalculation for all services |
| statService(namespace, serviceId) | `Mono<Void>` | Triggers statistics recalculation for a specific service |
| getServiceStats(namespace) | `Flux<ServiceStat>` | Returns all service statistics (service ID + instance count) |
| getInstanceCount(namespace) | `Mono<Long>` | Returns the total instance count across all services |

RedisServiceStatistic implements this by executing Lua scripts and listening to instance change events. It filters out RENEW events since renewals do not change instance counts (RedisServiceStatistic.kt:56).

Each ServiceStat holds:

  • serviceId: String -- the service identifier
  • instanceCount: Int -- number of registered instances
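As a rough illustration of the behaviour described above, the following sketch recomputes a per-service count on every instance change except RENEW. It is an in-memory stand-in, not RedisServiceStatistic: the real implementation keeps the counts in Redis through Lua scripts and exposes reactive return types. `StatEvent`, `InMemoryServiceStatistic`, and the `recalculations` counter are hypothetical, and the namespace parameter is omitted for brevity.

```kotlin
// Mirrors the two fields listed above.
data class ServiceStat(val serviceId: String, val instanceCount: Int)

enum class StatEvent { REGISTER, RENEW, SET_METADATA, DEREGISTER, EXPIRED }

class InMemoryServiceStatistic(
    private val instanceIndex: Map<String, Set<String>>, // serviceId -> instance IDs
) {
    private val stats = mutableMapOf<String, Int>()

    // Exposed only so the effect of the RENEW filter can be observed.
    var recalculations = 0
        private set

    // Recomputes the instance count for one service.
    fun statService(serviceId: String) {
        stats[serviceId] = instanceIndex[serviceId].orEmpty().size
        recalculations++
    }

    fun onInstanceChanged(serviceId: String, event: StatEvent) {
        if (event == StatEvent.RENEW) return // renewals never change the instance count
        statService(serviceId)
    }

    fun getServiceStats(): List<ServiceStat> =
        stats.map { (id, count) -> ServiceStat(id, count) }

    fun getInstanceCount(): Long = stats.values.sumOf { it.toLong() }
}
```

Skipping RENEW matters because renewals are typically the most frequent event; recomputing counts on each one would do work that cannot change the result.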

Sequence Diagrams

Service Lookup Flow (Standard)

```mermaid
sequenceDiagram
    autonumber
    participant Client as Consumer
    participant RSD as RedisServiceDiscovery
    participant Redis as Redis (Lua Script)
    participant Codec as ServiceInstanceCodec

    Client->>RSD: getInstances(namespace, serviceId)
    RSD->>Redis: EVAL discovery_get_instances.lua KEYS=[namespace] ARGV=[serviceId]
    Redis->>Redis: SMEMBERS svc_itc_idx:{serviceId}
    Redis->>Redis: HGETALL svc_itc:{instanceId} for each member
    Redis-->>RSD: List of field-value arrays
    RSD->>Codec: decode(instanceData) for each instance
    Codec-->>RSD: ServiceInstance
    RSD-->>Client: Flux~ServiceInstance~

    Client->>RSD: getServices(namespace)
    RSD->>Redis: SMEMBERS {namespace}:svc_idx
    Redis-->>RSD: Set of service IDs
    RSD-->>Client: Flux~String~
```
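The decode step in the flow above receives each instance as a flat field-value array, the shape HGETALL produces. A minimal sketch of turning such an array into a map follows. It does not reproduce ServiceInstanceCodec: `decodeFieldValues` is a hypothetical helper, and the field names in the usage note are invented for illustration.

```kotlin
// Converts a flat [field1, value1, field2, value2, ...] list into a map.
// Rejects odd-length input, since every field must be followed by its value.
fun decodeFieldValues(flat: List<String>): Map<String, String> {
    require(flat.size % 2 == 0) { "expected alternating field/value entries" }
    return flat.chunked(2).associate { (field, value) -> field to value }
}
```

For example, `decodeFieldValues(listOf("host", "10.0.0.1", "port", "8080"))` yields a map with the two entries `host` and `port`. A real codec would then map such fields onto a ServiceInstance.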

Consistency Cache Update Flow

```mermaid
sequenceDiagram
    autonumber
    participant Client as Consumer
    participant CRSD as ConsistencyRedisServiceDiscovery
    participant Cache as Local Cache (CMap)
    participant PubSub as Redis PubSub
    participant Delegate as RedisServiceDiscovery

    Note over Client,Delegate: Initial Load
    Client->>CRSD: getInstances(namespace, serviceId)
    CRSD->>Cache: computeIfAbsent(NamespacedServiceId)
    CRSD->>PubSub: subscribe to instance change events
    CRSD->>Delegate: getInstances(namespace, serviceId)
    Delegate-->>CRSD: List of ServiceInstance
    CRSD->>Cache: store as CopyOnWriteArraySet (cached Mono)
    CRSD-->>Client: Flux~ServiceInstance~

    Note over Client,Delegate: Event-Driven Cache Update
    PubSub->>CRSD: InstanceChangedEvent (REGISTER)
    CRSD->>Cache: find cached instances for service
    CRSD->>Delegate: getInstance(namespace, serviceId, instanceId)
    Delegate-->>CRSD: ServiceInstance
    CRSD->>Cache: add/replace instance in CopyOnWriteArraySet

    Note over Client,Delegate: Subsequent Read (cache hit)
    Client->>CRSD: getInstances(namespace, serviceId)
    CRSD->>Cache: retrieve cached CopyOnWriteArraySet
    CRSD-->>Client: Flux~ServiceInstance~ (from cache, filtered by !isExpired)
```
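The initial-load and cache-hit paths in this flow can be sketched as a read-through cache. This is a simplified, synchronous illustration, not the project's implementation: `Instance`, `ReadThroughCache`, the `loader` function, and the `delegateCalls` counter are hypothetical, the PubSub subscription is omitted, and the comparison used in `isExpired` is an assumption about how `ttlAt` is interpreted.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.atomic.AtomicInteger

data class Instance(val instanceId: String, val ttlAt: Long) {
    // Assumption: an instance is expired once its ttlAt lies before the current time.
    fun isExpired(now: Long): Boolean = ttlAt < now
}

class ReadThroughCache(private val loader: (String) -> List<Instance>) {
    private val cache = ConcurrentHashMap<String, CopyOnWriteArraySet<Instance>>()
    private val loads = AtomicInteger()

    // Number of times the delegate was consulted; exposed for illustration only.
    val delegateCalls: Int get() = loads.get()

    fun getInstances(serviceId: String, now: Long): List<Instance> {
        // First read for a service loads from the delegate; later reads hit the cache.
        val cached = cache.computeIfAbsent(serviceId) { id ->
            loads.incrementAndGet()
            CopyOnWriteArraySet(loader(id))
        }
        // Expired entries are filtered on read rather than removed here.
        return cached.filter { !it.isExpired(now) }
    }
}
```

Filtering on read means a consumer does not see an expired instance even if the corresponding EXPIRED event has not yet arrived.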

Statistics Collection Flow

```mermaid
sequenceDiagram
    autonumber
    participant Event as InstanceEventListenerContainer
    participant RSS as RedisServiceStatistic
    participant Redis as Redis (Lua Script)
    participant Data as ServiceStat Hash

    Event->>RSS: InstanceChangedEvent (non-RENEW)
    RSS->>Redis: EVAL service_stat.lua KEYS=[namespace] ARGV=[serviceId]
    Redis->>Redis: compute instance count from svc_itc_idx:{serviceId}
    Redis->>Redis: HSET svc_stat {serviceId} {count}
    Redis-->>RSS: Boolean

    Note over Event,Data: Consumer reads stats
    RSS->>Redis: HGETALL svc_stat
    Redis-->>RSS: Map of serviceId -> instanceCount
    RSS-->>Event: Flux~ServiceStat~
```

Architecture Diagram

```mermaid
flowchart TB
    subgraph Consumers
        C1[Consumer A]
        C2[Consumer B]
    end

    subgraph ConsistencyRedisServiceDiscovery
        CS[Service Cache<br>ConcurrentHashMap]
        CI[Instance Cache<br>CopyOnWriteArraySet]
    end

    subgraph RedisServiceDiscovery
        RSD[Redis Discovery]
    end

    subgraph Redis
        IDX[(svc_idx SET)]
        ITC[(svc_itc_idx SET)]
        HASH[(svc_itc HASH)]
        STAT[(svc_stat HASH)]
        PUB[PubSub Channels]
    end

    C1 --> CS
    C2 --> CI
    CS --> RSD
    CI --> RSD
    RSD --> IDX
    RSD --> ITC
    RSD --> HASH
    PUB -->|service changed| CS
    PUB -->|instance changed| CI

    style Consumers fill:#2d333b,stroke:#6d5dfc,color:#e6edf3
    style ConsistencyRedisServiceDiscovery fill:#161b22,stroke:#30363d,color:#e6edf3
    style RedisServiceDiscovery fill:#161b22,stroke:#30363d,color:#e6edf3
    style Redis fill:#161b22,stroke:#30363d,color:#e6edf3
```

Released under the Apache License 2.0.