Skip to content

Add pd disaggregated inference#3558

Open
Bihan wants to merge 9 commits intodstackai:masterfrom
Bihan:add_pd_disaggregated_inference
Open

Add pd disaggregated inference#3558
Bihan wants to merge 9 commits intodstackai:masterfrom
Bihan:add_pd_disaggregated_inference

Conversation

@Bihan
Copy link
Collaborator

@Bihan Bihan commented Feb 10, 2026

Testing Steps

  1. Create (CPU node) in K8s cluster

  2. Create gateway in the CPU node using below config

type: gateway
name: bihan-gateway

backend: kubernetes
region: any

domain: bihan-gateway.dstack.ai
router: sglang
  1. Create GPU-node with 3 instances (1 Prefill, 1 Decode and 1 for testing scaling) in the same K8s cluster where gateway node exists.
    Note: See design doc for details on why the gateway and workers are required to be on the same network.

  2. Apply below prefill-decode service configuration

type: service
name: prefill-decode
image: lmsysorg/sglang:latest

env:
  - HF_TOKEN
  - MODEL_ID=meta-llama/Llama-3.2-1B-Instruct

replicas:
  - count: 1..2
    scaling:
      metric: rps
      target: 3
    commands:
      - |
          python -m sglang.launch_server \
            --model-path $MODEL_ID \
            --disaggregation-mode prefill \
            --disaggregation-transfer-backend mooncake \
            --host 0.0.0.0 \
            --port 8000 \
            --disaggregation-bootstrap-port 8998 \
            --log-level debug \
            > worker-server.log 2>&1
    resources:
      gpu: 1

  - count: 1
    commands:
      - |
          python -m sglang.launch_server \
            --model-path $MODEL_ID \
            --disaggregation-mode decode \
            --disaggregation-transfer-backend mooncake \
            --host 0.0.0.0 \
            --port 8000 \
            --log-level debug \
            > worker-server.log 2>&1
    resources:
      gpu: 1

port: 8000
model: meta-llama/Llama-3.2-1B-Instruct

probes:
  - type: http
    url: /health_generate
    interval: 15s

router:
  type: sglang
  policy: round_robin
  pd_disaggregation: true
  1. When rps>=3 prefill replica scales to 2.

Note: For testing you need to assign wheel to https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.1-py3-none-any.whl

Bihan Rana added 2 commits February 10, 2026 12:04
Test2

Internal IP Test

Add worker with internal_ip

Check status and register

Add Status Ready Log

Add Prefill-Decode

Add PD to dstack

Test register worker without poll

Add router config in service config

Update remove worker

Clean Up router code

Clean Up

Further Cleanup
description="The routing policy. Options: `random`, `round_robin`, `cache_aware`, `power_of_two`"
),
] = "cache_aware"
pd_disaggregation: Annotated[
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds pd_disaggregation to both service configurations and gateway configurations. I'd advocate for adding it to service configurations only.

  • Whether or not the service is configured to use PD disaggregation is clearly a service property, because it depends on the replica groups configuration. I don't think many users would want to configure that property at the gateway level, making assumptions about what services are going to run on that gateway in the future.

  • Having two places for the same property complicates the interface — you'd have to explain in the docs how these places are related to each other, when and how one setting overrides the other, etc.

  • Having the property at the gateway level can potentially complicate further development — that way, you can only tell whether a service is using PD disaggregation if the service is already registered, and you need to fetch the GatewayModel object from the database to do so. For example, this would complicate adding the default probe for services with PD disaggregation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

logger.info("Registered %s domain %s", conf.type, conf.domain)

async def unregister(self, domain: str) -> None:
async def unregister(self, domain: str, service: models.Service) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) If it now accepts service, it no longer has to accept domain. The domain is available as service.domain_safe

def _uses_pd_disaggregation(service: models.Service) -> bool:
"""PD disaggregation: router talks to replicas via internal_ip, no SSH tunnels needed."""
return (
service.router is not None and getattr(service.router, "pd_disaggregation", False) is True
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Attribute access via getattr is not indexed by the IDE. I think it's almost never a good idea to use it, unless there's no other option.

You could do service.router.pd_disaggregation like in other places.

If you want to make it extra clear that the attribute exists, you could also do isinstance(service.router, SGLangRouterConfig), although it seems to be redundant until we have other router types.

router: Annotated[
Optional[AnyRouterConfig],
Field(description="The router configuration"),
Optional[RouterType],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change isn't backward compatible — if I have a previously created gateway with router: type: sglang, dstack fails to start new services:

[09:10:10] INFO     dstack._internal.server.services.events:205 Emitting event: Run submitted. Status: SUBMITTED. Event targets:                        
                    run(99e05f)pretty-insect-1. Actor: user(aeafc8)admin                                                                                
ERROR:    Exception in ASGI application
  + Exception Group Traceback (most recent call last):
[...]
  File "pydantic/main.py", line 364, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for GatewayConfigurationResponse
router
  value is not a valid enumeration member; permitted: 'sglang' (type=type_error.enum; enum_values=[<RouterType.SGLANG: 'sglang'>])

And to delete the gateway:

$ dstack gateway delete -y sglang-gateway
Unexpected error: status code 500 when requesting http://localhost:3000/api/project/main/gateways/get. Check the server logs for backend issues, and the
CLI logs at (~/.dstack/logs/cli/latest.log) local CLI output

Apart from backward compatibility, having router: type: sglang is useful in case we ever support specifying any gateway-level router properties, such as the SGLang package version.

router:
  type: sglang
  version: 0.3.2

I can suggest to introduce two different pydantic models for service-level (type, policy, pd_disaggregation) and gateway-level (type, policy) router configuration. Before we can drop the gateway-level router.policy in a future version, we mention that it is deprecated in its description

)
service_https = _get_service_https(run_spec, gateway_configuration)
router = gateway_configuration.router
router = run_spec.configuration.router
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean the SGLang router will not be enabled if the service config does not specify router: type: sglang? I'm not sure if this is expected or not.

  • On one hand, I like these semantics more:
    • To use the SGLang router, you explicitly request it in the service configuration. This prevents misconfiguration cases when a non-SGLang service gets assigned to a gateway with SGLang enabled.
    • This allows to use the same gateway for both SGLang and non-SGLang services, without the need to create two separate gateways.
  • On the other hand, this change of semantics is not backward-compatible — if someone is already using SGLang-enabled gateways, they expect SGLang to be enabled automatically.

Maybe the safest option for now is to keep the old semantics (automatically enable SGLang if specified in the gateway configuration). Later, we can discuss the change of semantics with the team (cc @peterschmidt85) and decide if it's worth it

Copy link
Collaborator Author

@Bihan Bihan Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jvstme
cc @peterschmidt85

Maybe we keep GatewayConfiguration's router to Optional[AnyRouterConfig] with properties type and policy without pd_disaggregation as before. This will eliminate backward compatibility issue.

Create new GatewayConfig class as

class GatewayConfig(CoreModel):
    name: Annotated[
        Optional[str],
        Field(description="The name of the gateway. Omit for the default gateway."),
    ] = None
    pd_disaggregation: Annotated[
        bool,
        Field(
            description=(
                "Enable PD disaggregation mode. Requires a gateway with SGLang router. "
                "Uses internal IP instead of SSH tunnels for replica connections."
            ),
        ),
    ] = False

Update ServiceConfigurationParams's gateway field to

gateway: Annotated[
    Optional[Union[bool, str, GatewayConfig]],
    Field(
        description=(
            "Gateway configuration. Use `false` to run without a gateway. "
            "Use `true` to run with the default gateway. "
            "Use a string for the gateway name. "
            "Use an object for name + options (e.g. pd_disaggregation). "
            "Omit to use the default gateway if one exists, otherwise no gateway."
        ),
    ),
    ] = None

YAML configs with gateway..

# Boolean shortcuts (unchanged)
gateway: true   # default gateway
gateway: false  # no gateway

# String shortcut
gateway: "my-gateway"

# Object form
gateway:
  name: "my-gateway"
  pd_disaggregation: true

# Default gateway + pd_disaggregation
gateway:
  pd_disaggregation: true

Validation in _register_service_in_server would be

if isinstance(run_spec.configuration.gateway, GatewayConfig):
    raise ServerClientError(
        "Service is configured with gateway options but no gateway is available. "
        "Please configure a gateway or set gateway to false to run without a gateway."
    )

Validation in _register_service_in_gateway

gateway_configuration = get_gateway_configuration(gateway)
service_gateway = run_spec.configuration.gateway
if (
    isinstance(service_gateway, GatewayConfig)
    and service_gateway.pd_disaggregation
):
    if gateway_configuration.router != RouterType.SGLANG:
        raise ServerClientError(
            f"Service requires a SGLang gateway (pd_disaggregation) but gateway '{gateway.name}' "
            "does not have the SGLang router configured."
        )

Finally router construction in _register_service_in_gateway becomes

router = None
if gateway_configuration.router == RouterType.SGLANG:
    service_gateway = run_spec.configuration.gateway
    pd_disaggregation = (
        isinstance(service_gateway, GatewayConfig) and service_gateway.pd_disaggregation
    )
    policy = gateway_configuration.router_policy  # or gateway_configuration.router_config.policy
    router = ServiceRouterConfig(
        type=RouterType.SGLANG.value,
        policy=policy,
        pd_disaggregation=pd_disaggregation,
    )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants