package main

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"net/http"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"reflect"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/gpuaas/platform/packages/shared/controllerretry"
	"golang.org/x/crypto/ssh"
)

// config holds the controller settings that loadConfig reads from the
// SLURM_REFERENCE_* environment variables.
//
// Authentication is one of: a static BearerToken, a service-account credential
// (ServiceAccountID, ServiceAccountKeyID, ServiceAccountSecret), or a
// shared-runtime operator credential (SharedRuntimeID, SharedRuntimeKeyID,
// SharedRuntimeSecret). A non-empty SharedRuntimeID selects shared runtime
// mode, which requires OrgID; otherwise ProjectID is required and either
// AppInstanceID or AppSlug selects the app instances to reconcile.
//
// A ReconcileInterval of zero makes main run a single reconcile pass and exit;
// a positive value enables the continuous reconcile loop.
type config struct {
	APIBaseURL           string
	BearerToken          string
	ServiceAccountID     string
	ServiceAccountKeyID  string
	ServiceAccountSecret string
	OrgID                string
	SharedRuntimeID      string
	SharedRuntimeKeyID   string
	SharedRuntimeSecret  string
	AccessCredentialID   string
	ProjectID            string
	AppInstanceID        string
	AppSlug              string
	SSHPrivateKeyPath    string
	BootstrapScriptPath  string
	HTTPHostOverride     string
	TLSServerName        string
	ReconcileInterval    time.Duration
}

// memberListResponse is the JSON envelope that listMembers decodes from the
// app-instance members endpoint.
type memberListResponse struct {
	Items []member `json:"items"`
}

// operationListResponse is the JSON envelope that listMemberOperations decodes
// from the app-instance member-operations endpoint.
type operationListResponse struct {
	Items []memberOperation `json:"items"`
}

// member is one entry of a memberListResponse.
//
// pickController selects the member whose ComponentKey is "controller", and
// controllerSSHTarget reads the "host", "hostname", "username_on_node", "port"
// and "allocation_id" keys of AdapterDetail to locate the node over SSH.
type member struct {
	ID                    string         `json:"id"`
	AppInstanceID         string         `json:"app_instance_id"`
	ComponentKey          string         `json:"component_key"`
	Status                string         `json:"status"`
	BoundNodeID           *string        `json:"bound_node_id"`
	BoundNodeResourceName *string        `json:"bound_node_resource_name"`
	AdapterDetail         map[string]any `json:"adapter_detail"`
}

// memberOperation is one entry of an operationListResponse.
//
// listMemberOperations only keeps operations whose Action is "add", "drain" or
// "remove" and whose Status is "accepted" or "in_progress".
type memberOperation struct {
	ID               string         `json:"id"`
	AppInstanceID    string         `json:"app_instance_id"`
	Action           string         `json:"action"`
	ComponentKey     *string        `json:"component_key"`
	TargetMemberID   *string        `json:"target_member_id"`
	RequestedCount   *int           `json:"requested_count"`
	AllocationIntent map[string]any `json:"allocation_intent"`
	Status           string         `json:"status"`
	CorrelationID    string         `json:"correlation_id"`
}

// appInstance is the JSON shape of an app instance as decoded by
// getAppInstance and carried in appInstanceListResponse.
//
// instanceHasStaleAdapterError inspects the "last_adapter_error" and
// "bootstrap_log_excerpt" keys of RuntimeState; AppArtifactID, when set,
// triggers issuing an artifact-pull runtime secret during bootstrap.
type appInstance struct {
	ID              string         `json:"id"`
	AppSlug         string         `json:"app_slug"`
	AppArtifactID   *string        `json:"app_artifact_id"`
	Status          string         `json:"status"`
	PlacementIntent map[string]any `json:"placement_intent"`
	RuntimeState    map[string]any `json:"runtime_state"`
}

// appInstanceListResponse is the JSON envelope that listAppInstances decodes
// from the project app-instances endpoint.
type appInstanceListResponse struct {
	Items []appInstance `json:"items"`
}

// sharedAppRuntime is the JSON shape of a shared app runtime.
//
// NOTE(review): presumably the value returned by getSharedRuntime (defined
// outside this view); the shared-runtime reconcile reads ID, RuntimeBackend,
// Status and RuntimeState from that result — confirm.
type sharedAppRuntime struct {
	ID              string         `json:"id"`
	AppSlug         string         `json:"app_slug"`
	RuntimeBackend  string         `json:"runtime_backend"`
	Status          string         `json:"status"`
	PlacementIntent map[string]any `json:"placement_intent"`
	RuntimeState    map[string]any `json:"runtime_state"`
}

// sharedWorker is the JSON shape of one worker entry carried in a
// sharedWorkerListResponse. Its consumers are outside the visible part of
// this file.
type sharedWorker struct {
	ID               string         `json:"id"`
	SharedRuntimeID  string         `json:"shared_runtime_id"`
	OrgID            string         `json:"org_id"`
	SourceProjectID  *string        `json:"source_project_id"`
	AttachmentID     *string        `json:"attachment_id"`
	ComponentKey     string         `json:"component_key"`
	ContributionMode string         `json:"contribution_mode"`
	AllocationID     string         `json:"allocation_id"`
	BoundNodeID      *string        `json:"bound_node_id"`
	Status           string         `json:"status"`
	RuntimeState     map[string]any `json:"runtime_state"`
}

// sharedWorkerListResponse is a JSON list envelope whose "items" array holds
// sharedWorker entries.
type sharedWorkerListResponse struct {
	Items []sharedWorker `json:"items"`
}

// sharedWorkerOperation is the JSON shape of one operation entry carried in a
// sharedWorkerOperationListResponse.
//
// NOTE(review): presumably processed by reconcileSharedWorkerOperations, which
// is defined outside this view — confirm.
type sharedWorkerOperation struct {
	ID                     string   `json:"id"`
	SharedRuntimeID        string   `json:"shared_runtime_id"`
	OrgID                  string   `json:"org_id"`
	Action                 string   `json:"action"`
	AttachmentID           *string  `json:"attachment_id"`
	SourceProjectID        *string  `json:"source_project_id"`
	RequestedAllocationIDs []string `json:"requested_allocation_ids"`
	Status                 string   `json:"status"`
	CorrelationID          string   `json:"correlation_id"`
}

// sharedWorkerOperationListResponse is a JSON list envelope whose "items"
// array holds sharedWorkerOperation entries.
type sharedWorkerOperationListResponse struct {
	Items []sharedWorkerOperation `json:"items"`
}

// allocation is the JSON shape of a node allocation, including the SSH keys
// referenced by it and the connection details (host or hostname, port, and
// the username on the node).
//
// NOTE(review): presumably the type returned by getProjectAllocation and
// getOrgAllocation and consumed by allocationSSHTarget, all defined outside
// this view — confirm.
type allocation struct {
	ID         string      `json:"id"`
	NodeID     *string     `json:"node_id"`
	SSHKeys    []sshKeyRef `json:"ssh_keys"`
	Connection struct {
		Host           *string `json:"host"`
		Hostname       *string `json:"hostname"`
		Port           *int    `json:"port"`
		UsernameOnNode *string `json:"username_on_node"`
	} `json:"connection"`
}

// sshKeyRef identifies an SSH key by ID inside an allocation's "ssh_keys" list.
type sshKeyRef struct {
	ID string `json:"id"`
}

// projectAccessCredentialListResponse is a JSON list envelope whose "items"
// array holds projectAccessCredential entries.
type projectAccessCredentialListResponse struct {
	Items []projectAccessCredential `json:"items"`
}

// projectAccessCredential is the JSON shape of one access credential entry:
// its kind and status, an optional directly attached resource
// (ResourceType/ResourceID), and a list of resource bindings.
//
// NOTE(review): presumably consulted by resolveAccessCredentialID, which is
// defined outside this view — confirm.
type projectAccessCredential struct {
	ID             string                           `json:"id"`
	ResourceType   *string                          `json:"resource_type"`
	ResourceID     *string                          `json:"resource_id"`
	Bindings       []projectAccessCredentialBinding `json:"bindings"`
	CredentialKind string                           `json:"credential_kind"`
	Status         string                           `json:"status"`
}

// projectAccessCredentialBinding is one element of a projectAccessCredential's
// "bindings" list: the bound resource (type and ID) and the binding purpose.
type projectAccessCredentialBinding struct {
	ResourceType string `json:"resource_type"`
	ResourceID   string `json:"resource_id"`
	Purpose      string `json:"purpose"`
}

// deliveredAccessCredentialResponse is the JSON shape of a credential delivery
// response: metadata about the credential plus the delivery details, which
// may carry a wrapped token and the URL to unwrap it.
//
// NOTE(review): presumably decoded while materialising the SSH private key in
// ensureSSHPrivateKeyPath (defined outside this view) — confirm.
type deliveredAccessCredentialResponse struct {
	Credential struct {
		ID             string `json:"id"`
		CredentialKind string `json:"credential_kind"`
		DeliveryMode   string `json:"delivery_mode"`
	} `json:"credential"`
	Delivery struct {
		Mode         string  `json:"mode"`
		WrappedToken *string `json:"wrapped_token"`
		UnwrapURL    *string `json:"unwrap_url"`
	} `json:"delivery"`
}

// runtimeSecretBundle is the JSON response that
// issueArtifactPullRuntimeSecretIfConfigured decodes from the app-instance
// runtime-secrets endpoint.
//
// WrappedToken must be non-empty and AppArtifactID must match the instance's
// artifact ID for the bundle to be accepted; the remaining fields are copied
// into the bootstrap event detail (SourceURI, ExpiresAt and UnwrapURL only
// when present and non-blank).
type runtimeSecretBundle struct {
	AppInstanceID  string  `json:"app_instance_id"`
	ProjectID      string  `json:"project_id"`
	AppArtifactID  string  `json:"app_artifact_id"`
	AppSlug        string  `json:"app_slug"`
	AppVersion     string  `json:"app_version"`
	ArtifactKind   string  `json:"artifact_kind"`
	SourceType     string  `json:"source_type"`
	Repository     string  `json:"repository"`
	SourceURI      *string `json:"source_uri"`
	Digest         string  `json:"digest"`
	RuntimeBackend string  `json:"runtime_backend"`
	Purpose        string  `json:"purpose"`
	DeliveryMode   string  `json:"delivery_mode"`
	WrappedToken   string  `json:"wrapped_token"`
	UnwrapURL      *string `json:"unwrap_url"`
	ExpiresAt      *string `json:"expires_at"`
}

// controllerToken is the JSON shape of a token response: the access token,
// its lifetime in seconds, and the token type.
//
// NOTE(review): presumably decoded when controllerTokenCache mints a token;
// the minting code is outside this view — confirm.
type controllerToken struct {
	AccessToken      string `json:"access_token"`
	ExpiresInSeconds int    `json:"expires_in_seconds"`
	TokenType        string `json:"token_type"`
}

// controllerTokenCache holds a minted bearer token together with its expiry.
// It is created by newControllerTokenCache and shared across iterations of
// the continuous reconcile loop; callers obtain tokens through its
// ServiceAccountToken and SharedRuntimeOperatorToken methods, which also
// report whether a new token was minted.
//
// NOTE(review): refreshBefore is presumably the safety margin before
// expiresAt at which a token is re-minted, and now an injectable clock — the
// methods are defined outside this view, confirm there.
type controllerTokenCache struct {
	token         string
	expiresAt     time.Time
	refreshBefore time.Duration
	now           func() time.Time
}

// Indirection points for the SSH-backed side effects. The reconcile code in
// this file calls runBootstrapFunc, runRemoteSudoCaptureFunc and
// reconcileBootstrapSSHTrustFunc through these variables rather than calling
// the underlying functions directly, so they can be replaced at runtime
// (presumably by tests — confirm).
var (
	runBootstrapFunc               = runBootstrap
	runRemoteSudoCaptureFunc       = runRemoteSudoCapture
	reconcileBootstrapSSHTrustFunc = reconcileBootstrapSSHTrust
	bootstrapSlurmWorkerNodeFunc   = bootstrapSlurmWorkerNode
)

// main wires up JSON logging, loads the configuration and then either runs a
// single reconcile pass (ReconcileInterval <= 0) or reconciles continuously
// until SIGINT/SIGTERM is received. Each pass is bounded by a five minute
// timeout; failed passes in continuous mode are retried with exponential
// backoff capped at five minutes.
func main() {
	const (
		passTimeout   = 5 * time.Minute
		maxRetryDelay = 5 * time.Minute
	)

	handler := slog.NewJSONHandler(os.Stdout, nil)
	log := slog.New(handler).With("service", "slurm-reference-controller")
	slog.SetDefault(log)

	cfg, err := loadConfig()
	if err != nil {
		log.Error("config error", "error", err)
		os.Exit(1)
	}

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Identifying attributes attached to every lifecycle log line below.
	identity := []any{
		"project_id", cfg.ProjectID,
		"org_id", cfg.OrgID,
		"app_instance_id", cfg.AppInstanceID,
		"shared_runtime_id", cfg.SharedRuntimeID,
		"app_slug", cfg.AppSlug,
	}

	// One-shot mode: a single bounded pass, exit code reflects the outcome.
	if cfg.ReconcileInterval <= 0 {
		runCtx, cancel := context.WithTimeout(ctx, passTimeout)
		defer cancel()
		if err := reconcile(runCtx, log, cfg); err != nil {
			log.Error("reconcile failed", "error", err)
			os.Exit(1)
		}
		log.Info("reconcile complete", identity...)
		return
	}

	startAttrs := make([]any, 0, len(identity)+2)
	startAttrs = append(startAttrs, identity...)
	startAttrs = append(startAttrs, "interval", cfg.ReconcileInterval.String())
	log.Info("starting continuous reconcile loop", startAttrs...)

	// The token cache outlives individual passes so tokens are reused.
	tokenCache := newControllerTokenCache()
	retryBackoff := controllerretry.NewExponentialBackoff(cfg.ReconcileInterval, maxRetryDelay)
	for {
		runCtx, cancel := context.WithTimeout(ctx, passTimeout)
		passErr := reconcileWithTokenCache(runCtx, log, cfg, tokenCache)
		cancel()

		nextDelay := cfg.ReconcileInterval
		if passErr == nil {
			retryBackoff.Reset()
			log.Info("reconcile iteration complete", identity...)
		} else {
			nextDelay = retryBackoff.RecordFailure()
			failAttrs := make([]any, 0, len(identity)+6)
			failAttrs = append(failAttrs, "error", passErr)
			failAttrs = append(failAttrs, identity...)
			failAttrs = append(failAttrs, "retry_failures", retryBackoff.Failures(), "next_retry_delay", nextDelay.String())
			log.Error("reconcile iteration failed", failAttrs...)
		}

		// Sleep until the next pass, waking early on shutdown.
		select {
		case <-time.After(nextDelay):
		case <-ctx.Done():
			log.Info("controller exiting", "reason", ctx.Err())
			return
		}
	}
}

// loadConfig builds the controller configuration from SLURM_REFERENCE_*
// environment variables (all values are whitespace-trimmed) and validates it.
//
// It returns an error when no credential of any kind is configured, when a
// service-account or shared-runtime credential is only partially set, when
// the identifiers required by the selected mode are missing, or when
// SLURM_REFERENCE_RECONCILE_INTERVAL_SECONDS is not a non-negative integer.
// On error the returned config is the zero value.
func loadConfig() (config, error) {
	env := func(key string) string {
		return strings.TrimSpace(os.Getenv(key))
	}
	envOr := func(key, fallback string) string {
		return strings.TrimSpace(getenvDefault(key, fallback))
	}

	cfg := config{
		APIBaseURL:           strings.TrimRight(envOr("SLURM_REFERENCE_API_BASE_URL", "https://api.gpuaas.localhost"), "/"),
		BearerToken:          env("SLURM_REFERENCE_BEARER_TOKEN"),
		ServiceAccountID:     env("SLURM_REFERENCE_SERVICE_ACCOUNT_ID"),
		ServiceAccountKeyID:  env("SLURM_REFERENCE_SERVICE_ACCOUNT_KEY_ID"),
		ServiceAccountSecret: env("SLURM_REFERENCE_SERVICE_ACCOUNT_SECRET"),
		OrgID:                env("SLURM_REFERENCE_ORG_ID"),
		SharedRuntimeID:      env("SLURM_REFERENCE_SHARED_RUNTIME_ID"),
		SharedRuntimeKeyID:   env("SLURM_REFERENCE_SHARED_RUNTIME_OPERATOR_KEY_ID"),
		SharedRuntimeSecret:  env("SLURM_REFERENCE_SHARED_RUNTIME_OPERATOR_SECRET"),
		AccessCredentialID:   env("SLURM_REFERENCE_ACCESS_CREDENTIAL_ID"),
		ProjectID:            env("SLURM_REFERENCE_PROJECT_ID"),
		AppInstanceID:        env("SLURM_REFERENCE_APP_INSTANCE_ID"),
		AppSlug:              envOr("SLURM_REFERENCE_APP_SLUG", "slurm-reference"),
		SSHPrivateKeyPath:    env("SLURM_REFERENCE_SSH_PRIVATE_KEY_PATH"),
		BootstrapScriptPath:  envOr("SLURM_REFERENCE_BOOTSTRAP_SCRIPT_PATH", "infra/env/dev-gpu/slurm-reference/bootstrap-single-node.sh"),
		HTTPHostOverride:     env("SLURM_REFERENCE_HTTP_HOST"),
		TLSServerName:        env("SLURM_REFERENCE_TLS_SERVER_NAME"),
	}

	// A credential group counts as "present" as soon as any of its parts is
	// set, and as "complete" only when all of them are.
	serviceAccountPresent := cfg.ServiceAccountID != "" || cfg.ServiceAccountKeyID != "" || cfg.ServiceAccountSecret != ""
	serviceAccountComplete := cfg.ServiceAccountID != "" && cfg.ServiceAccountKeyID != "" && cfg.ServiceAccountSecret != ""
	sharedRuntimePresent := cfg.SharedRuntimeID != "" || cfg.SharedRuntimeKeyID != "" || cfg.SharedRuntimeSecret != ""
	sharedRuntimeComplete := cfg.SharedRuntimeID != "" && cfg.SharedRuntimeKeyID != "" && cfg.SharedRuntimeSecret != ""

	switch {
	case cfg.BearerToken == "" && !serviceAccountPresent && !sharedRuntimePresent:
		return config{}, errors.New("either SLURM_REFERENCE_BEARER_TOKEN or machine credential env vars are required")
	case serviceAccountPresent && !serviceAccountComplete:
		return config{}, errors.New("SLURM_REFERENCE_SERVICE_ACCOUNT_ID, SLURM_REFERENCE_SERVICE_ACCOUNT_KEY_ID, and SLURM_REFERENCE_SERVICE_ACCOUNT_SECRET must all be set together")
	case sharedRuntimePresent && !sharedRuntimeComplete:
		return config{}, errors.New("SLURM_REFERENCE_SHARED_RUNTIME_ID, SLURM_REFERENCE_SHARED_RUNTIME_OPERATOR_KEY_ID, and SLURM_REFERENCE_SHARED_RUNTIME_OPERATOR_SECRET must all be set together")
	}

	// Mode-specific identifiers: shared runtime mode needs the org, project
	// mode needs the project plus a way to pick app instances.
	switch {
	case cfg.SharedRuntimeID != "":
		if cfg.OrgID == "" {
			return config{}, errors.New("SLURM_REFERENCE_ORG_ID is required for shared runtime mode")
		}
	case cfg.ProjectID == "":
		return config{}, errors.New("SLURM_REFERENCE_PROJECT_ID is required")
	case cfg.AppInstanceID == "" && cfg.AppSlug == "":
		return config{}, errors.New("either SLURM_REFERENCE_APP_INSTANCE_ID or SLURM_REFERENCE_APP_SLUG is required")
	}

	// The interval is optional; leaving it unset keeps the zero value.
	rawInterval := env("SLURM_REFERENCE_RECONCILE_INTERVAL_SECONDS")
	if rawInterval == "" {
		return cfg, nil
	}
	seconds, err := strconv.Atoi(rawInterval)
	if err != nil || seconds < 0 {
		return config{}, errors.New("SLURM_REFERENCE_RECONCILE_INTERVAL_SECONDS must be a non-negative integer")
	}
	cfg.ReconcileInterval = time.Duration(seconds) * time.Second
	return cfg, nil
}

// reconcile runs one reconcile pass with a fresh, empty token cache.
func reconcile(ctx context.Context, log *slog.Logger, cfg config) error {
	freshCache := newControllerTokenCache()
	return reconcileWithTokenCache(ctx, log, cfg, freshCache)
}

// reconcileWithTokenCache dispatches one reconcile pass by mode: project mode
// when cfg.SharedRuntimeID is blank, shared runtime mode otherwise.
func reconcileWithTokenCache(ctx context.Context, log *slog.Logger, cfg config, tokenCache *controllerTokenCache) error {
	if sharedRuntimeID := strings.TrimSpace(cfg.SharedRuntimeID); sharedRuntimeID == "" {
		return reconcileProjectWithTokenCache(ctx, log, cfg, tokenCache)
	}
	return reconcileSharedRuntimeWithTokenCache(ctx, log, cfg, tokenCache)
}

// reconcileProject runs one project-mode reconcile pass with a fresh, empty
// token cache.
func reconcileProject(ctx context.Context, log *slog.Logger, cfg config) error {
	freshCache := newControllerTokenCache()
	return reconcileProjectWithTokenCache(ctx, log, cfg, freshCache)
}

// reconcileProjectWithTokenCache reconciles the app instances of a project.
//
// When cfg.AppInstanceID is set, only that instance is reconciled. Otherwise
// the instances matching cfg.AppSlug are discovered via listAppInstances and
// reconciled one after another; the first failure aborts the pass and is
// returned. If no static bearer token is configured, a service-account token
// is obtained from tokenCache first and reused for every instance.
func reconcileProjectWithTokenCache(ctx context.Context, log *slog.Logger, cfg config, tokenCache *controllerTokenCache) error {
	if strings.TrimSpace(cfg.AppInstanceID) != "" {
		return reconcileInstanceWithTokenCache(ctx, log, cfg, tokenCache)
	}
	client := &http.Client{
		Timeout: 45 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				MinVersion: tls.VersionTLS12,
				ServerName: cfg.TLSServerName,
			},
		},
	}
	// The transport is private to this call and has no idle timeout, so its
	// keep-alive connections must be released explicitly; otherwise every pass
	// of the continuous loop would leave connections behind.
	defer client.CloseIdleConnections()

	if cfg.BearerToken == "" {
		token, minted, err := tokenCache.ServiceAccountToken(ctx, client, cfg)
		if err != nil {
			return err
		}
		cfg.BearerToken = token
		if minted {
			log.Info("minted service account token", "service_account_id", cfg.ServiceAccountID)
		}
	}
	items, err := listAppInstances(ctx, log, cfg)
	if err != nil {
		return err
	}
	for i := range items {
		// cfg already carries a bearer token here, so the per-instance
		// reconcile does not mint another one.
		instanceCfg := cfg
		instanceCfg.AppInstanceID = items[i].ID
		if err := reconcileInstance(ctx, log, instanceCfg); err != nil {
			return err
		}
	}
	return nil
}

// reconcileSharedRuntime runs one shared-runtime reconcile pass with a fresh,
// empty token cache.
func reconcileSharedRuntime(ctx context.Context, log *slog.Logger, cfg config) error {
	freshCache := newControllerTokenCache()
	return reconcileSharedRuntimeWithTokenCache(ctx, log, cfg, freshCache)
}

// reconcileSharedRuntimeWithTokenCache reconciles a Slurm-backed shared
// runtime identified by cfg.SharedRuntimeID.
//
// It obtains a shared-runtime operator token from tokenCache when no static
// bearer token is configured, verifies the runtime backend is "slurm",
// resolves the SSH key and the controller allocation's SSH target, and then
// acts on the runtime status:
//   - "running": nothing to bootstrap;
//   - "deleting"/"deleted": returns nil without processing worker operations;
//   - anything else: runs the bootstrap script over SSH and reports the
//     runtime as "running" on success or "failed" on error.
//
// Unless it returned early, it finishes by reconciling shared worker
// operations. Any error aborts the pass and is returned.
func reconcileSharedRuntimeWithTokenCache(ctx context.Context, log *slog.Logger, cfg config, tokenCache *controllerTokenCache) error {
	client := &http.Client{
		Timeout: 45 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				MinVersion: tls.VersionTLS12,
				ServerName: cfg.TLSServerName,
			},
		},
	}
	// The transport is private to this call and has no idle timeout, so its
	// keep-alive connections must be released explicitly; otherwise every pass
	// of the continuous loop would leave connections behind.
	defer client.CloseIdleConnections()

	if cfg.BearerToken == "" {
		token, minted, err := tokenCache.SharedRuntimeOperatorToken(ctx, client, cfg)
		if err != nil {
			return err
		}
		cfg.BearerToken = token
		if minted {
			log.Info("minted shared runtime operator token", "shared_runtime_id", cfg.SharedRuntimeID)
		}
	}
	runtimeItem, err := getSharedRuntime(ctx, client, cfg)
	if err != nil {
		return err
	}
	if !strings.EqualFold(strings.TrimSpace(runtimeItem.RuntimeBackend), "slurm") {
		return fmt.Errorf("shared runtime %s is not slurm-backed", runtimeItem.ID)
	}
	// NOTE(review): the SSH key and the controller allocation are resolved
	// before the status switch, so they are also fetched (and may fail) for
	// runtimes that are already deleting or deleted.
	keyPath, err := ensureSSHPrivateKeyPath(ctx, client, cfg)
	if err != nil {
		return err
	}
	cfg.SSHPrivateKeyPath = keyPath
	allocationID, err := sharedRuntimeControllerAllocationID(runtimeItem)
	if err != nil {
		return err
	}
	target, err := getOrgAllocation(ctx, client, cfg, allocationID)
	if err != nil {
		return err
	}
	host, user, err := allocationSSHTarget(target)
	if err != nil {
		return err
	}
	switch strings.TrimSpace(runtimeItem.Status) {
	case "running":
		log.Info("shared runtime already running", "shared_runtime_id", runtimeItem.ID, "allocation_id", allocationID)
	case "deleting", "deleted":
		log.Info("shared runtime not bootstrapped in current state", "shared_runtime_id", runtimeItem.ID, "status", runtimeItem.Status)
		return nil
	default:
		log.Info("bootstrapping shared runtime controller", "shared_runtime_id", runtimeItem.ID, "allocation_id", allocationID, "host", host, "user", user, "status", runtimeItem.Status)
		if err := runBootstrapFunc(ctx, cfg.BootstrapScriptPath, cfg.SSHPrivateKeyPath, user, host); err != nil {
			// Record the failure on the runtime before surfacing it; if the
			// report itself fails, both errors are returned together.
			message := err.Error()
			runtimeState := cloneMap(runtimeItem.RuntimeState)
			runtimeState["phase"] = "shared_controller_failed"
			runtimeState["last_adapter_error"] = message
			runtimeState["controller_allocation_id"] = allocationID
			runtimeState["controller_hostname"] = host
			runtimeState["last_runtime_observed_at"] = time.Now().UTC().Format(time.RFC3339)
			if reportErr := reportSharedRuntimeStatus(ctx, client, cfg, "failed", &message, runtimeState); reportErr != nil {
				return fmt.Errorf("bootstrap failed: %w; shared runtime report failed: %v", err, reportErr)
			}
			return err
		}
		// Bootstrap succeeded: overlay the observed Slurm state on top of the
		// existing runtime state and report the runtime as running.
		now := time.Now().UTC().Format(time.RFC3339)
		runtimeState := cloneMap(runtimeItem.RuntimeState)
		for k, v := range map[string]any{
			"role":                     "shared_controller",
			"phase":                    "slurm_ready",
			"cluster_name":             "gpuaas-local",
			"partition_name":           "compute",
			"controller_allocation_id": allocationID,
			"controller_hostname":      host,
			"slurmctld_state":          "up",
			"slurmd_state":             "idle",
			"last_runtime_observed_at": now,
		} {
			runtimeState[k] = v
		}
		if err := reportSharedRuntimeStatus(ctx, client, cfg, "running", nil, runtimeState); err != nil {
			return err
		}
	}
	return reconcileSharedWorkerOperations(ctx, log, client, cfg)
}

// reconcileInstance reconciles the single app instance named by
// cfg.AppInstanceID using a fresh, empty token cache.
func reconcileInstance(ctx context.Context, log *slog.Logger, cfg config) error {
	freshCache := newControllerTokenCache()
	return reconcileInstanceWithTokenCache(ctx, log, cfg, freshCache)
}

// reconcileInstanceWithTokenCache reconciles the app instance named by
// cfg.AppInstanceID.
//
// The pass proceeds in this order:
//  1. obtain a service-account token from tokenCache unless a static bearer
//     token is configured;
//  2. load the instance, its members, and pick the controller member;
//  3. if the instance is in a lifecycle status (stopping/starting/restarting),
//     hand over to reconcileAppInstanceLifecycle and return its result;
//  4. if the controller is "ready" and the instance "running", only clear
//     stale adapter errors; otherwise resolve the bootstrap SSH credential,
//     reconcile SSH trust, optionally issue an artifact-pull secret, run the
//     Slurm bootstrap, and report the member ready and the instance running;
//  5. process pending member operations, re-reading the member list before
//     each one.
//
// Bootstrap progress events are reported best-effort: failures to report an
// event are deliberately ignored so they never mask the underlying result.
// Any other error aborts the pass and is returned.
func reconcileInstanceWithTokenCache(ctx context.Context, log *slog.Logger, cfg config, tokenCache *controllerTokenCache) error {
	client := &http.Client{
		Timeout: 45 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				MinVersion: tls.VersionTLS12,
				ServerName: cfg.TLSServerName,
			},
		},
	}
	// The transport is private to this call and has no idle timeout, so its
	// keep-alive connections must be released explicitly; otherwise every pass
	// of the continuous loop would leave connections behind.
	defer client.CloseIdleConnections()

	if cfg.BearerToken == "" {
		token, minted, err := tokenCache.ServiceAccountToken(ctx, client, cfg)
		if err != nil {
			return err
		}
		cfg.BearerToken = token
		if minted {
			log.Info("minted service account token", "service_account_id", cfg.ServiceAccountID)
		}
	}
	instance, err := getAppInstance(ctx, client, cfg)
	if err != nil {
		return err
	}
	members, err := listMembers(ctx, client, cfg)
	if err != nil {
		_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "slurm_member_discovery_failed", "degraded", err.Error(), nil)
		return err
	}
	controller, err := pickController(members)
	if err != nil {
		_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "slurm_controller_member_missing", "degraded", err.Error(), nil)
		return err
	}
	// Stop/start/restart requests are handled by the lifecycle path and do not
	// fall through to bootstrap or member operations.
	if isSlurmInstanceLifecycleStatus(instance.Status) {
		return reconcileAppInstanceLifecycle(ctx, log, client, cfg, instance, controller)
	}
	if controller.Status == "ready" && instance.Status == "running" {
		log.Info("controller already ready and instance already running", "member_id", controller.ID, "app_instance_id", instance.ID)
		// A healthy instance may still carry error details from an earlier
		// failed attempt; re-report it as running to clear them.
		if instanceHasStaleAdapterError(instance) {
			if err := reportInstanceStatus(ctx, client, cfg, instance, "running", nil, map[string]any{"health_status": "healthy"}); err != nil {
				return err
			}
		}
	} else {
		// Step 1: resolve and materialise the bootstrap SSH credential.
		resolvedCredentialID, err := resolveAccessCredentialID(ctx, client, cfg, instance)
		if err != nil {
			_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "slurm_bootstrap_blocked", "degraded", err.Error(), nil)
			return err
		}
		cfg.AccessCredentialID = resolvedCredentialID
		_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "bootstrap_credential_resolved", "progressing", "Resolved app-instance bootstrap SSH credential.", map[string]any{
			"access_credential_id": resolvedCredentialID,
		})
		keyPath, err := ensureSSHPrivateKeyPath(ctx, client, cfg)
		if err != nil {
			_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "bootstrap_credential_delivery_failed", "degraded", err.Error(), map[string]any{
				"access_credential_id": resolvedCredentialID,
			})
			return err
		}
		cfg.SSHPrivateKeyPath = keyPath

		// Step 2: locate the controller allocation and its SSH endpoint.
		controllerAllocationID, err := controllerAllocationIDForBootstrap(instance, controller)
		if err != nil {
			_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "slurm_controller_allocation_missing", "degraded", err.Error(), nil)
			return err
		}
		host, user, err := controllerSSHTarget(ctx, client, cfg, controller)
		if err != nil {
			_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "slurm_controller_ssh_target_missing", "degraded", err.Error(), map[string]any{
				"member_id": controller.ID,
			})
			return err
		}
		// Step 3: make sure the bootstrap key is trusted on the allocation.
		if err := reconcileBootstrapSSHTrustFunc(ctx, client, cfg, controllerAllocationID, user, cfg.SSHPrivateKeyPath, "present"); err != nil {
			_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "bootstrap_ssh_trust_reconcile_failed", "degraded", err.Error(), map[string]any{
				"member_id":        controller.ID,
				"allocation_id":    controllerAllocationID,
				"username_on_node": user,
			})
			return err
		}
		_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "bootstrap_ssh_trust_reconciled", "progressing", "Bootstrap SSH trust reconciled on the controller allocation.", map[string]any{
			"member_id":        controller.ID,
			"allocation_id":    controllerAllocationID,
			"username_on_node": user,
		})
		// Step 4: issue an artifact-pull secret when the instance references
		// an app artifact; artifactSecretDetail stays nil otherwise.
		artifactSecretDetail, artifactPullToken, err := issueArtifactPullRuntimeSecretIfConfigured(ctx, client, cfg, instance)
		if err != nil {
			_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "artifact_pull_secret_issue_failed", "degraded", err.Error(), map[string]any{
				"member_id":       controller.ID,
				"app_artifact_id": strings.TrimSpace(stringValue(instance.AppArtifactID)),
			})
			return err
		}
		if artifactSecretDetail != nil {
			_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "artifact_pull_secret_issued", "progressing", "Issued trusted artifact pull secret for Slurm bootstrap.", artifactSecretDetail)
		}

		// Step 5: run the bootstrap script on the controller over SSH.
		log.Info("bootstrapping controller member", "member_id", controller.ID, "host", host, "user", user, "member_status", controller.Status, "instance_status", instance.Status)
		bootstrapDetail := map[string]any{
			"member_id": controller.ID,
			"host":      host,
		}
		if artifactSecretDetail != nil {
			bootstrapDetail["artifact_pull"] = artifactSecretDetail
		}
		_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "slurm_bootstrap_running", "progressing", "Starting Slurm controller bootstrap over SSH.", bootstrapDetail)
		if err := runBootstrapWithArtifactPullSecret(ctx, cfg.BootstrapScriptPath, cfg.SSHPrivateKeyPath, user, host, artifactPullToken, artifactSecretDetail); err != nil {
			_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "slurm_bootstrap_failed", "degraded", err.Error(), map[string]any{
				"member_id": controller.ID,
				"host":      host,
			})
			return err
		}
		_ = reportInstanceBootstrapEvent(ctx, client, cfg, instance, "slurm_bootstrap_completed", "healthy", "Slurm controller bootstrap completed.", map[string]any{
			"member_id": controller.ID,
			"host":      host,
		})

		// Step 6: publish the resulting state. The member detail overlays the
		// observed Slurm state on top of the existing adapter detail.
		now := time.Now().UTC().Format(time.RFC3339)
		memberDetail := cloneMap(controller.AdapterDetail)
		for k, v := range map[string]any{
			"role":                     "controller",
			"phase":                    "slurm_ready",
			"controller_hostname":      host,
			"cluster_name":             "gpuaas-local",
			"partition_name":           "compute",
			"slurmctld_state":          "up",
			"slurmd_state":             "idle",
			"last_runtime_observed_at": now,
		} {
			memberDetail[k] = v
		}
		if artifactSecretDetail != nil {
			memberDetail["artifact_pull"] = artifactSecretDetail
		}
		// Readiness is only reported for members that are bound to a node.
		if controller.BoundNodeID != nil {
			if err := reportMemberReady(ctx, client, cfg, controller, memberDetail); err != nil {
				return err
			}
		}
		if instance.Status != "running" {
			if err := reportInstanceRunning(ctx, client, cfg); err != nil {
				return err
			}
		}
	}
	operations, err := listMemberOperations(ctx, client, cfg)
	if err != nil {
		return err
	}
	// Worker operations need an SSH key; resolve it only when there is work,
	// since the "already running" path above never materialised one.
	if len(operations) > 0 {
		operationCfg, err := resolveWorkerOperationRuntimeCredential(ctx, client, cfg, instance)
		if err != nil {
			return err
		}
		cfg = operationCfg
	}
	for i := range operations {
		// Re-read members before every operation so each one sees the effects
		// of the operations processed before it.
		currentMembers, membersErr := listMembers(ctx, client, cfg)
		if membersErr != nil {
			return membersErr
		}
		currentController, controllerErr := pickController(currentMembers)
		if controllerErr != nil {
			return controllerErr
		}
		if err := reconcileWorkerOperation(ctx, log, client, cfg, currentController, currentMembers, operations[i]); err != nil {
			return err
		}
	}
	return nil
}

// instanceHasStaleAdapterError reports whether the instance's runtime state
// still carries a non-blank "last_adapter_error" or "bootstrap_log_excerpt".
// A nil instance or an empty runtime state yields false.
func instanceHasStaleAdapterError(instance *appInstance) bool {
	if instance == nil || len(instance.RuntimeState) == 0 {
		return false
	}
	for _, key := range []string{"last_adapter_error", "bootstrap_log_excerpt"} {
		if leftover := strings.TrimSpace(stringAny(instance.RuntimeState[key])); leftover != "" {
			return true
		}
	}
	return false
}

// issueArtifactPullRuntimeSecretIfConfigured requests an "artifact_pull"
// runtime secret for the instance when it references an app artifact.
//
// It returns (nil, "", nil) when instance is nil or has no artifact ID.
// Otherwise it POSTs to the instance's runtime-secrets endpoint and returns a
// detail map describing the issued secret (without the token itself) together
// with the trimmed wrapped token. An error is returned when the request
// fails, the response has no wrapped token, or the response's artifact ID
// does not match the instance's.
func issueArtifactPullRuntimeSecretIfConfigured(ctx context.Context, client *http.Client, cfg config, instance *appInstance) (map[string]any, string, error) {
	if instance == nil {
		return nil, "", nil
	}
	wantArtifactID := strings.TrimSpace(stringValue(instance.AppArtifactID))
	if wantArtifactID == "" {
		return nil, "", nil
	}

	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/runtime-secrets", cfg.APIBaseURL, cfg.ProjectID, instance.ID)
	request := map[string]any{"purpose": "artifact_pull"}
	var bundle runtimeSecretBundle
	if err := doJSON(ctx, client, cfg, http.MethodPost, endpoint, request, &bundle); err != nil {
		return nil, "", err
	}

	wrappedToken := strings.TrimSpace(bundle.WrappedToken)
	if wrappedToken == "" {
		return nil, "", errors.New("runtime secret response missing wrapped_token")
	}
	gotArtifactID := strings.TrimSpace(bundle.AppArtifactID)
	if gotArtifactID != wantArtifactID {
		return nil, "", fmt.Errorf("runtime secret app_artifact_id mismatch: got %s want %s", bundle.AppArtifactID, wantArtifactID)
	}

	detail := map[string]any{
		"purpose":         strings.TrimSpace(bundle.Purpose),
		"delivery_mode":   strings.TrimSpace(bundle.DeliveryMode),
		"app_artifact_id": gotArtifactID,
		"artifact_kind":   strings.TrimSpace(bundle.ArtifactKind),
		"source_type":     strings.TrimSpace(bundle.SourceType),
		"repository":      strings.TrimSpace(bundle.Repository),
		"digest":          strings.TrimSpace(bundle.Digest),
		"runtime_backend": strings.TrimSpace(bundle.RuntimeBackend),
		"secret_issued":   true,
	}

	// Optional fields are only recorded when present and non-blank.
	trimmedOrEmpty := func(value *string) string {
		if value == nil {
			return ""
		}
		return strings.TrimSpace(*value)
	}
	if sourceURI := trimmedOrEmpty(bundle.SourceURI); sourceURI != "" {
		detail["source_uri"] = sourceURI
	}
	if expiresAt := trimmedOrEmpty(bundle.ExpiresAt); expiresAt != "" {
		detail["expires_at"] = expiresAt
	}
	// Only the presence of the unwrap URL is recorded, never its value.
	if trimmedOrEmpty(bundle.UnwrapURL) != "" {
		detail["unwrap_url_present"] = true
	}
	return detail, wrappedToken, nil
}

// resolveWorkerOperationRuntimeCredential returns a copy of cfg that has an
// SSH private key path suitable for worker operations.
//
// If cfg already names a key path it is returned unchanged. Otherwise the
// access credential ID is resolved for the instance (unless already set) and
// the key is materialised through ensureSSHPrivateKeyPath. On error the zero
// config is returned.
func resolveWorkerOperationRuntimeCredential(ctx context.Context, client *http.Client, cfg config, instance *appInstance) (config, error) {
	if existingPath := strings.TrimSpace(cfg.SSHPrivateKeyPath); existingPath != "" {
		return cfg, nil
	}

	resolved := cfg
	if credentialID := strings.TrimSpace(resolved.AccessCredentialID); credentialID == "" {
		lookedUpID, err := resolveAccessCredentialID(ctx, client, resolved, instance)
		if err != nil {
			return config{}, err
		}
		resolved.AccessCredentialID = lookedUpID
	}

	materialisedPath, err := ensureSSHPrivateKeyPath(ctx, client, resolved)
	if err != nil {
		return config{}, err
	}
	resolved.SSHPrivateKeyPath = materialisedPath
	return resolved, nil
}

// listAppInstances fetches the first page (up to 100 entries) of the
// project's app instances and returns those whose trimmed slug equals
// cfg.AppSlug and whose status is one the controller acts on (requested,
// deploying, running, stopping, starting, restarting, upgrading,
// rolling_back). The number of matches is logged.
//
// NOTE(review): only a single page is requested; instances beyond the first
// 100 are not seen — confirm whether the API paginates.
func listAppInstances(ctx context.Context, log *slog.Logger, cfg config) ([]appInstance, error) {
	client := &http.Client{
		Timeout: 45 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				MinVersion: tls.VersionTLS12,
				ServerName: cfg.TLSServerName,
			},
		},
	}
	// This client is used for exactly one request; release its keep-alive
	// connection on return instead of leaving it open on an orphaned transport.
	defer client.CloseIdleConnections()

	url := fmt.Sprintf("%s/api/v1/projects/%s/app-instances?page_size=100", cfg.APIBaseURL, cfg.ProjectID)
	var out appInstanceListResponse
	if err := doJSON(ctx, client, cfg, http.MethodGet, url, nil, &out); err != nil {
		return nil, err
	}
	items := make([]appInstance, 0, len(out.Items))
	for _, item := range out.Items {
		if strings.TrimSpace(item.AppSlug) != cfg.AppSlug {
			continue
		}
		switch item.Status {
		case "requested", "deploying", "running", "stopping", "starting", "restarting", "upgrading", "rolling_back":
			items = append(items, item)
		}
	}
	log.Info("discovered app instances for reconcile", "project_id", cfg.ProjectID, "app_slug", cfg.AppSlug, "count", len(items))
	return items, nil
}

// getAppInstance fetches the app instance identified by cfg.ProjectID and
// cfg.AppInstanceID. It returns nil and the error when the request fails.
func getAppInstance(ctx context.Context, client *http.Client, cfg config) (*appInstance, error) {
	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID)
	instance := new(appInstance)
	err := doJSON(ctx, client, cfg, http.MethodGet, endpoint, nil, instance)
	if err != nil {
		return nil, err
	}
	return instance, nil
}

// listMembers fetches the members of the app instance identified by
// cfg.ProjectID and cfg.AppInstanceID and returns them unfiltered.
func listMembers(ctx context.Context, client *http.Client, cfg config) ([]member, error) {
	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/members", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID)
	var page memberListResponse
	err := doJSON(ctx, client, cfg, http.MethodGet, endpoint, nil, &page)
	if err != nil {
		return nil, err
	}
	return page.Items, nil
}

// listMemberOperations fetches the first page (up to 50 entries) of member
// operations for the app instance and returns, in response order, those that
// still need work: action "add", "drain" or "remove" with status "accepted"
// or "in_progress".
func listMemberOperations(ctx context.Context, client *http.Client, cfg config) ([]memberOperation, error) {
	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/member-operations?page_size=50", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID)
	var page operationListResponse
	if err := doJSON(ctx, client, cfg, http.MethodGet, endpoint, nil, &page); err != nil {
		return nil, err
	}

	pending := make([]memberOperation, 0, len(page.Items))
	for idx := range page.Items {
		op := page.Items[idx]
		switch op.Action {
		case "add", "drain", "remove":
			// Supported action; fall out of the switch to the status check.
		default:
			continue
		}
		switch op.Status {
		case "accepted", "in_progress":
			pending = append(pending, op)
		}
	}
	return pending, nil
}

// pickController returns a pointer to the first element of items whose
// ComponentKey is "controller". The pointer aliases the slice element. An
// error is returned when no such member exists.
func pickController(items []member) (*member, error) {
	for idx := range items {
		candidate := &items[idx]
		if candidate.ComponentKey != "controller" {
			continue
		}
		return candidate, nil
	}
	return nil, errors.New("controller member not found")
}

// controllerSSHTarget determines the SSH endpoint and login user for the
// controller member.
//
// The member's adapter_detail is consulted first: "host" (falling back to
// "hostname") and "username_on_node", combined with "port" via sshEndpoint.
// When either the host or the user is missing there, the allocation named by
// adapter_detail["allocation_id"] is fetched and its connection details are
// used instead. If no allocation ID is available either, an error is
// returned.
func controllerSSHTarget(ctx context.Context, client *http.Client, cfg config, item *member) (host string, user string, err error) {
	rawHost, _ := item.AdapterDetail["host"].(string)
	rawUser, _ := item.AdapterDetail["username_on_node"].(string)
	rawHostname, _ := item.AdapterDetail["hostname"].(string)
	port := intAny(item.AdapterDetail["port"])
	host = strings.TrimSpace(rawHost)
	if host == "" {
		host = strings.TrimSpace(rawHostname)
	}
	user = strings.TrimSpace(rawUser)
	if host != "" && user != "" {
		return sshEndpoint(host, port), user, nil
	}

	// From here on at least one of host/user is missing, so the adapter
	// detail alone can never produce a target; only the allocation can.
	allocationID := strings.TrimSpace(stringAny(item.AdapterDetail["allocation_id"]))
	if allocationID == "" {
		return "", "", errors.New("controller member missing host or username_on_node in adapter_detail")
	}
	target, targetErr := getProjectAllocation(ctx, client, cfg, allocationID)
	if targetErr != nil {
		return "", "", targetErr
	}
	return allocationSSHTarget(target)
}

// controllerRuntimeTarget bundles what is needed to run commands on the
// controller node: the SSH private key path, the host and user to connect
// as, and the ID of the controller's allocation. reconcileAppInstanceLifecycle
// obtains one from resolveControllerRuntimeTarget and passes keyPath, user and
// host to its remote commands.
type controllerRuntimeTarget struct {
	keyPath      string
	host         string
	user         string
	allocationID string
}

// isSlurmInstanceLifecycleStatus reports whether status, ignoring surrounding
// whitespace, is one of the lifecycle transition states "stopping",
// "starting" or "restarting". The comparison is case-sensitive.
func isSlurmInstanceLifecycleStatus(status string) bool {
	trimmed := strings.TrimSpace(status)
	return trimmed == "stopping" || trimmed == "starting" || trimmed == "restarting"
}

// reconcileAppInstanceLifecycle drives a stop, start or restart of the Slurm
// services for an app instance, selected by instance.Status ("stopping",
// "starting", "restarting").
//
// The controller runtime target is resolved first for every status; any other
// status then returns nil without further action. Progress events are sent
// best-effort: errors from reportInstanceLifecycleEvent are discarded.
// Failures of the resolve/stop/start/restart steps are routed through
// reportAppInstanceLifecycleFailure, whose result is returned.
func reconcileAppInstanceLifecycle(ctx context.Context, log *slog.Logger, client *http.Client, cfg config, instance *appInstance, controller *member) error {
	operation := lifecycleOperationForInstanceStatus(instance.Status)
	_ = reportInstanceLifecycleEvent(ctx, client, cfg, instance, operation, "lifecycle_target_resolving", "progressing", "Resolving Slurm controller runtime target.", nil)
	target, err := resolveControllerRuntimeTarget(ctx, client, cfg, instance, controller)
	if err != nil {
		return reportAppInstanceLifecycleFailure(ctx, client, cfg, instance, "resolve_runtime_target", err)
	}
	// Detail attached to every subsequent progress event.
	targetDetail := map[string]any{
		"member_id":     controller.ID,
		"allocation_id": target.allocationID,
		"host":          target.host,
	}
	_ = reportInstanceLifecycleEvent(ctx, client, cfg, instance, operation, "lifecycle_target_resolved", "progressing", "Resolved Slurm controller runtime target.", targetDetail)

	switch strings.TrimSpace(instance.Status) {
	case "stopping":
		log.Info("stopping slurm app instance", "app_instance_id", instance.ID, "host", target.host)
		_ = reportInstanceLifecycleEvent(ctx, client, cfg, instance, operation, "slurm_services_stopping", "progressing", "Stopping Slurm services on the controller allocation.", targetDetail)
		if _, err := runRemoteSudoCaptureFunc(ctx, target.keyPath, target.user, target.host, "systemctl stop slurmd slurmctld"); err != nil {
			return reportAppInstanceLifecycleFailure(ctx, client, cfg, instance, "stop", err)
		}
		_ = reportInstanceLifecycleEvent(ctx, client, cfg, instance, operation, "slurm_services_stopped", "progressing", "Slurm service stop command completed; verifying inactive services.", targetDetail)
		// The stop command succeeding is not trusted on its own; both units
		// are verified inactive before the instance is reported stopped.
		if err := verifySystemdInactive(ctx, target.keyPath, target.user, target.host, "slurmd"); err != nil {
			return reportAppInstanceLifecycleFailure(ctx, client, cfg, instance, "stop", err)
		}
		if err := verifySystemdInactive(ctx, target.keyPath, target.user, target.host, "slurmctld"); err != nil {
			return reportAppInstanceLifecycleFailure(ctx, client, cfg, instance, "stop", err)
		}
		now := time.Now().UTC().Format(time.RFC3339)
		detail := cloneMap(controller.AdapterDetail)
		detail["role"] = "controller"
		detail["phase"] = "slurm_stopped"
		detail["slurmctld_state"] = "stopped"
		detail["slurmd_state"] = "stopped"
		detail["host"] = target.host
		detail["username_on_node"] = target.user
		detail["last_runtime_observed_at"] = now
		// The controller member is reported with status "ready" while its
		// adapter detail records the stopped services.
		if err := reportMemberStatus(ctx, client, cfg, controller, "ready", detail); err != nil {
			return err
		}
		return reportInstanceStatus(ctx, client, cfg, instance, "stopped", nil, map[string]any{
			"adapter_phase":            "slurm_stopped",
			"phase":                    "stopped",
			"last_operation":           "stop",
			"last_runtime_observed_at": now,
			"access": map[string]any{
				"status": "unavailable",
				"detail": "Slurm services are stopped on the controller allocation.",
			},
		})
	case "starting":
		log.Info("starting slurm app instance", "app_instance_id", instance.ID, "host", target.host)
		_ = reportInstanceLifecycleEvent(ctx, client, cfg, instance, operation, "slurm_bootstrap_running", "progressing", "Starting Slurm controller bootstrap over SSH.", targetDetail)
		// Start is implemented by running the bootstrap script on the target.
		if err := runBootstrapFunc(ctx, cfg.BootstrapScriptPath, target.keyPath, target.user, target.host); err != nil {
			return reportAppInstanceLifecycleFailure(ctx, client, cfg, instance, "start", err)
		}
		_ = reportInstanceLifecycleEvent(ctx, client, cfg, instance, operation, "slurm_bootstrap_completed", "progressing", "Slurm controller bootstrap completed; reporting runtime ready.", targetDetail)
		return reportSlurmControllerRunning(ctx, client, cfg, instance, controller, target, "start")
	case "restarting":
		log.Info("restarting slurm app instance", "app_instance_id", instance.ID, "host", target.host)
		_ = reportInstanceLifecycleEvent(ctx, client, cfg, instance, operation, "slurm_services_restarting", "progressing", "Restarting Slurm services on the controller allocation.", targetDetail)
		// Restart script: restart the units, try to resume the local node
		// (failure tolerated via "|| true"), then run scontrol ping and sinfo.
		command := `systemctl restart munge slurmctld slurmd
sleep 2
scontrol update NodeName=$(hostname -s) State=RESUME Reason="gpuaas-restart-ready" || true
scontrol ping
sinfo -N -l`
		if _, err := runRemoteSudoCaptureFunc(ctx, target.keyPath, target.user, target.host, command); err != nil {
			return reportAppInstanceLifecycleFailure(ctx, client, cfg, instance, "restart", err)
		}
		_ = reportInstanceLifecycleEvent(ctx, client, cfg, instance, operation, "slurm_services_restarted", "progressing", "Slurm service restart completed; reporting runtime ready.", targetDetail)
		return reportSlurmControllerRunning(ctx, client, cfg, instance, controller, target, "restart")
	default:
		// Not a lifecycle status handled here; nothing further to do.
		return nil
	}
}

// lifecycleOperationForInstanceStatus maps a transitional instance status to
// its operation name: "stopping" -> "stop", "starting" -> "start",
// "restarting" -> "restart". Any other status is returned with surrounding
// whitespace removed.
func lifecycleOperationForInstanceStatus(status string) string {
	trimmed := strings.TrimSpace(status)
	if trimmed == "stopping" {
		return "stop"
	}
	if trimmed == "starting" {
		return "start"
	}
	if trimmed == "restarting" {
		return "restart"
	}
	return trimmed
}

// resolveControllerRuntimeTarget gathers what is needed to operate on the
// controller over SSH. In order it: resolves the access credential ID (stored
// into the local cfg copy), ensures an SSH private key path, determines the
// controller allocation ID, resolves the SSH host and user, and reconciles
// bootstrap SSH trust to "present" for that allocation and user. The first
// failing step aborts with its error and a zero-value target.
func resolveControllerRuntimeTarget(ctx context.Context, client *http.Client, cfg config, instance *appInstance, controller *member) (controllerRuntimeTarget, error) {
	credentialID, err := resolveAccessCredentialID(ctx, client, cfg, instance)
	if err != nil {
		return controllerRuntimeTarget{}, err
	}
	// cfg is passed by value, so this only affects the calls made below.
	cfg.AccessCredentialID = credentialID

	var resolved controllerRuntimeTarget
	if resolved.keyPath, err = ensureSSHPrivateKeyPath(ctx, client, cfg); err != nil {
		return controllerRuntimeTarget{}, err
	}
	if resolved.allocationID, err = controllerAllocationIDForBootstrap(instance, controller); err != nil {
		return controllerRuntimeTarget{}, err
	}
	if resolved.host, resolved.user, err = controllerSSHTarget(ctx, client, cfg, controller); err != nil {
		return controllerRuntimeTarget{}, err
	}
	if err = reconcileBootstrapSSHTrustFunc(ctx, client, cfg, resolved.allocationID, resolved.user, resolved.keyPath, "present"); err != nil {
		return controllerRuntimeTarget{}, err
	}
	return resolved, nil
}

// reportSlurmControllerRunning reports the controller member as ready and then
// the app instance as "running". The member detail starts from a clone of the
// controller's adapter detail and is overlaid with the ready-state fields;
// operation is recorded as the instance's last_operation. If the member report
// fails, its error is returned and the instance status is not reported.
func reportSlurmControllerRunning(ctx context.Context, client *http.Client, cfg config, instance *appInstance, controller *member, target controllerRuntimeTarget, operation string) error {
	observedAt := time.Now().UTC().Format(time.RFC3339)

	memberDetail := cloneMap(controller.AdapterDetail)
	readyFields := map[string]any{
		"role":                     "controller",
		"phase":                    "slurm_ready",
		"controller_hostname":      target.host,
		"cluster_name":             "gpuaas-local",
		"partition_name":           "compute",
		"slurmctld_state":          "up",
		"slurmd_state":             "idle",
		"host":                     target.host,
		"username_on_node":         target.user,
		"allocation_id":            target.allocationID,
		"last_runtime_observed_at": observedAt,
	}
	for key, value := range readyFields {
		memberDetail[key] = value
	}
	if err := reportMemberReady(ctx, client, cfg, controller, memberDetail); err != nil {
		return err
	}

	runtimeState := map[string]any{
		"adapter_phase":            "slurm_ready",
		"phase":                    "ready",
		"last_operation":           operation,
		"last_runtime_observed_at": observedAt,
		"access": map[string]any{
			"status": "available",
			"detail": "Slurm controller services are running.",
		},
	}
	return reportInstanceStatus(ctx, client, cfg, instance, "running", nil, runtimeState)
}

// reportAppInstanceLifecycleFailure reports the app instance as "failed" for
// the given lifecycle operation, using cause's message as the failure text.
// The returned error is the result of reportInstanceStatus, not cause itself.
func reportAppInstanceLifecycleFailure(ctx context.Context, client *http.Client, cfg config, instance *appInstance, operation string, cause error) error {
	reason := cause.Error()
	observedAt := time.Now().UTC().Format(time.RFC3339)
	failedPhase := "lifecycle_" + operation + "_failed"
	failure := map[string]any{
		"operation": operation,
		"message":   reason,
	}
	logExcerpt := bootstrapLogExcerpt(reason, 80)
	activity := appendLifecycleActivity(instance.RuntimeState, operation, failedPhase, "degraded", reason, failure, observedAt)

	runtimeState := map[string]any{
		"adapter_phase":              failedPhase,
		"phase":                      "failed",
		"last_operation":             operation,
		"last_lifecycle_phase":       failedPhase,
		"last_lifecycle_message":     reason,
		"last_lifecycle_at":          observedAt,
		"last_adapter_error":         reason,
		"last_runtime_observed_at":   observedAt,
		"lifecycle_health_status":    "degraded",
		"lifecycle_log_excerpt":      logExcerpt,
		"lifecycle_failure_detail":   failure,
		"lifecycle_activity":         activity,
		"health_status":              "degraded",
		"bootstrap_or_lifecycle_gap": "controller_reported_failure",
	}
	return reportInstanceStatus(ctx, client, cfg, instance, "failed", &reason, runtimeState)
}

// newControllerTokenCache returns an empty token cache that uses time.Now as
// its clock and refreshes tokens 60 seconds before they expire.
func newControllerTokenCache() *controllerTokenCache {
	cache := new(controllerTokenCache)
	cache.refreshBefore = time.Minute
	cache.now = time.Now
	return cache
}

// ServiceAccountToken returns a service-account token through the cache,
// minting one with mintServiceAccountTokenWithExpiry when needed. The boolean
// result is passed through from get.
func (c *controllerTokenCache) ServiceAccountToken(ctx context.Context, client *http.Client, cfg config) (string, bool, error) {
	mint := func() (string, int, error) {
		return mintServiceAccountTokenWithExpiry(ctx, client, cfg)
	}
	return c.get(ctx, mint)
}

// SharedRuntimeOperatorToken returns a shared-runtime operator token through
// the cache, minting one with mintSharedRuntimeOperatorTokenWithExpiry when
// needed. The boolean result is passed through from get.
func (c *controllerTokenCache) SharedRuntimeOperatorToken(ctx context.Context, client *http.Client, cfg config) (string, bool, error) {
	mint := func() (string, int, error) {
		return mintSharedRuntimeOperatorTokenWithExpiry(ctx, client, cfg)
	}
	return c.get(ctx, mint)
}

// get returns the cached token while it is still valid for longer than
// refreshBefore, and otherwise calls mint and caches the result.
//
// The boolean result is true when the returned token was freshly minted by
// this call and false when it came from the cache. On a mint error the result
// is always ("", false, err), for a nil receiver as well, so callers never see
// a "minted" flag alongside an error. A nil receiver disables caching and
// mints on every call. A non-positive lifetime reported by mint is replaced by
// 900 seconds. The context parameter is currently unused.
func (c *controllerTokenCache) get(_ context.Context, mint func() (string, int, error)) (string, bool, error) {
	if c == nil {
		token, _, err := mint()
		if err != nil {
			return "", false, err
		}
		return token, true, nil
	}
	// Tolerate a cache that was not built by newControllerTokenCache and so
	// has no clock set.
	clock := c.now
	if clock == nil {
		clock = time.Now
	}
	now := clock().UTC()
	if strings.TrimSpace(c.token) != "" && now.Add(c.refreshBefore).Before(c.expiresAt) {
		return c.token, false, nil
	}
	token, expiresIn, err := mint()
	if err != nil {
		return "", false, err
	}
	if expiresIn <= 0 {
		expiresIn = 900
	}
	c.token = token
	c.expiresAt = now.Add(time.Duration(expiresIn) * time.Second)
	return c.token, true, nil
}

// reconcileWorkerOperation dispatches a member operation to the add, drain or
// remove reconciler. Operations whose status is neither "accepted" nor
// "in_progress", that isWorkerOperation rejects, or whose action is not one of
// the three are ignored and yield nil.
func reconcileWorkerOperation(ctx context.Context, log *slog.Logger, client *http.Client, cfg config, controller *member, members []member, op memberOperation) error {
	pending := op.Status == "accepted" || op.Status == "in_progress"
	if !pending || !isWorkerOperation(op, members) {
		return nil
	}
	if op.Action == "add" {
		return reconcileWorkerAddOperation(ctx, log, client, cfg, controller, members, op)
	}
	if op.Action == "drain" {
		return reconcileWorkerDrainOperation(ctx, log, client, cfg, controller, members, op)
	}
	if op.Action == "remove" {
		return reconcileWorkerRemoveOperation(ctx, log, client, cfg, controller, members, op)
	}
	return nil
}

// isWorkerOperation reports whether op concerns a worker: either its component
// key is "worker", or its (trimmed, non-empty) target member ID matches a
// member whose ComponentKey is "worker".
func isWorkerOperation(op memberOperation, members []member) bool {
	const workerKey = "worker"
	if strings.TrimSpace(stringValue(op.ComponentKey)) == workerKey {
		return true
	}
	wantedID := strings.TrimSpace(stringValue(op.TargetMemberID))
	if wantedID != "" {
		for idx := 0; idx < len(members); idx++ {
			if members[idx].ComponentKey == workerKey && members[idx].ID == wantedID {
				return true
			}
		}
	}
	return false
}

// reconcileWorkerAddOperation processes a worker "add" operation.
//
// Flow:
//  1. Resolve the worker's SSH host/user, bound node and detail from the
//     operation.
//  2. If a worker that is neither deleted nor failed already exists on that
//     node, report the operation as succeeded against that member without
//     bootstrapping.
//  3. Otherwise reconcile SSH trust (when the operation names an allocation),
//     report "in_progress", and bootstrap the worker, unless the worker shares
//     the controller's bound node, in which case bootstrap is skipped and the
//     controller's SSH target is used instead.
//  4. Report the operation as succeeded with the member "ready".
//
// Step failures are reported through reportWorkerOperationFailureWithState and
// that call's result is returned.
func reconcileWorkerAddOperation(ctx context.Context, log *slog.Logger, client *http.Client, cfg config, controller *member, members []member, op memberOperation) error {
	host, user, boundNodeID, detail, err := workerTargetFromOperation(ctx, client, cfg, op)
	if err != nil {
		return reportWorkerOperationFailureWithState(ctx, client, cfg, op, stringValue(op.TargetMemberID), boundNodeID, detail, err)
	}
	if existing := activeWorkerOnNode(members, boundNodeID); existing != nil {
		// Merge: values already on the existing member win; the operation's
		// detail only fills in keys that are missing.
		existingDetail := cloneMap(existing.AdapterDetail)
		for key, value := range detail {
			if _, ok := existingDetail[key]; !ok {
				existingDetail[key] = value
			}
		}
		if sameBoundNodeID(controller, boundNodeID) {
			controllerHost, controllerUser, targetErr := controllerSSHTarget(ctx, client, cfg, controller)
			if targetErr != nil {
				return reportWorkerOperationFailureWithState(ctx, client, cfg, op, existing.ID, boundNodeID, existingDetail, targetErr)
			}
			if strings.TrimSpace(controllerHost) != "" {
				existingDetail["host"] = controllerHost
			}
			if strings.TrimSpace(controllerUser) != "" {
				existingDetail["username_on_node"] = controllerUser
			}
		}
		// Clear error fields left over from earlier attempts.
		delete(existingDetail, "last_adapter_error")
		delete(existingDetail, "last_error_code")
		delete(existingDetail, "last_error_message")
		existingDetail["role"] = "worker"
		existingDetail["phase"] = "slurmd_ready"
		existingDetail["slurmd_state"] = "idle"
		existingDetail["registered_in_controller"] = true
		existingDetail["last_runtime_observed_at"] = time.Now().UTC().Format(time.RFC3339)
		return reportWorkerOperation(ctx, client, cfg, op, "succeeded", existing.ID, ptrString("ready"), boundNodeID, existingDetail, nil, nil)
	}
	// No target member on the operation: generate a new member ID.
	targetMemberID := stringValue(op.TargetMemberID)
	if targetMemberID == "" {
		targetMemberID = uuid.NewString()
	}
	log.Info("processing worker add operation", "operation_id", op.ID, "target_member_id", targetMemberID, "host", host, "user", user)
	if allocationID := strings.TrimSpace(stringAny(op.AllocationIntent["allocation_id"])); allocationID != "" {
		if err := reconcileBootstrapSSHTrustFunc(ctx, client, cfg, allocationID, user, cfg.SSHPrivateKeyPath, "present"); err != nil {
			return reportWorkerOperationFailureWithState(ctx, client, cfg, op, targetMemberID, boundNodeID, detail, err)
		}
	}
	if err := reportWorkerOperation(ctx, client, cfg, op, "in_progress", targetMemberID, ptrString("reconciling"), boundNodeID, detail, nil, nil); err != nil {
		return err
	}
	if sameBoundNodeID(controller, boundNodeID) {
		// Worker shares the controller's node: no worker bootstrap is run;
		// the controller's SSH host and user replace the worker's.
		controllerHost, controllerUser, targetErr := controllerSSHTarget(ctx, client, cfg, controller)
		if targetErr != nil {
			return reportWorkerOperationFailureWithState(ctx, client, cfg, op, targetMemberID, boundNodeID, detail, targetErr)
		}
		if strings.TrimSpace(controllerHost) != "" {
			host = controllerHost
			detail["host"] = controllerHost
		}
		if strings.TrimSpace(controllerUser) != "" {
			user = controllerUser
			detail["username_on_node"] = controllerUser
		}
		err = nil
	} else {
		err = bootstrapSlurmWorkerNodeFunc(ctx, client, cfg, controller, host, user, workerNodeNameFromDetail(host, detail))
	}
	if err != nil {
		return reportWorkerOperationFailureWithState(ctx, client, cfg, op, targetMemberID, boundNodeID, detail, err)
	}
	now := time.Now().UTC().Format(time.RFC3339)
	for k, v := range map[string]any{
		"role":                     "worker",
		"phase":                    "slurmd_ready",
		"worker_hostname":          host,
		"slurmd_state":             "idle",
		"registered_in_controller": true,
		"cluster_name":             "gpuaas-local",
		"partition_name":           "compute",
		"last_runtime_observed_at": now,
	} {
		detail[k] = v
	}
	delete(detail, "last_adapter_error")
	delete(detail, "last_error_code")
	delete(detail, "last_error_message")
	if sameBoundNodeID(controller, boundNodeID) {
		// Keep the controller member's slurmd state in step with the worker
		// that runs on the same node.
		if err := reportControllerSlurmdState(ctx, client, cfg, controller, "idle", now); err != nil {
			return err
		}
	}
	return reportWorkerOperation(ctx, client, cfg, op, "succeeded", targetMemberID, ptrString("ready"), boundNodeID, detail, nil, nil)
}

// reconcileWorkerDrainOperation processes a worker "drain" operation: it
// resolves the target member and its SSH target, reports "in_progress", sets
// the Slurm node to DRAIN over SSH, verifies the node state contains "DRAIN",
// and reports the operation as succeeded. When the worker and controller are
// the same node, the controller's slurmd state is reported as "drained" too.
func reconcileWorkerDrainOperation(ctx context.Context, log *slog.Logger, client *http.Client, cfg config, controller *member, members []member, op memberOperation) error {
	worker, err := workerMemberFromOperation(members, op)
	if err != nil {
		return reportWorkerOperationFailure(ctx, client, cfg, op, err)
	}
	sshHost, sshUser, slurmNode, nodeID, state, err := workerTargetFromMember(worker)
	if err != nil {
		return reportWorkerOperationFailureWithState(ctx, client, cfg, op, worker.ID, worker.BoundNodeID, cloneMap(worker.AdapterDetail), err)
	}
	// Every failure past this point is reported with the same member state.
	fail := func(cause error) error {
		return reportWorkerOperationFailureWithState(ctx, client, cfg, op, worker.ID, nodeID, state, cause)
	}

	log.Info("processing worker drain operation", "operation_id", op.ID, "target_member_id", worker.ID, "host", sshHost, "user", sshUser, "node_name", slurmNode)
	if reportErr := reportWorkerOperation(ctx, client, cfg, op, "in_progress", worker.ID, ptrString("draining"), nodeID, state, nil, nil); reportErr != nil {
		return reportErr
	}

	drainCommand := fmt.Sprintf("scontrol update NodeName=%s State=DRAIN Reason='gpuaas-worker-drain'", shellQuote(slurmNode))
	if runErr := runRemoteSudoCommand(ctx, cfg.SSHPrivateKeyPath, sshUser, sshHost, drainCommand); runErr != nil {
		return fail(runErr)
	}
	if verifyErr := verifyNodeStateContains(ctx, cfg.SSHPrivateKeyPath, sshUser, sshHost, slurmNode, "DRAIN"); verifyErr != nil {
		return fail(verifyErr)
	}

	observedAt := time.Now().UTC().Format(time.RFC3339)
	state["role"] = "worker"
	state["phase"] = "slurmd_drained"
	state["slurmd_state"] = "drained"
	state["registered_in_controller"] = true
	state["cluster_name"] = "gpuaas-local"
	state["partition_name"] = "compute"
	state["last_runtime_observed_at"] = observedAt

	if sameNode(controller, worker) {
		if reportErr := reportControllerSlurmdState(ctx, client, cfg, controller, "drained", observedAt); reportErr != nil {
			return reportErr
		}
	}
	return reportWorkerOperation(ctx, client, cfg, op, "succeeded", worker.ID, ptrString("draining"), nodeID, state, nil, nil)
}

// reconcileWorkerRemoveOperation processes a worker "remove" operation.
//
// Three cases are completed without SSH by reporting the member "deleted"
// with a "cleanup_removed" detail: the member is already deleted, it is a
// bootstrap-failed worker, or its SSH target cannot be resolved and it is a
// historical failed worker. Otherwise slurmd is disabled over SSH, the node is
// marked DOWN, both are verified, and the operation is reported as succeeded.
func reconcileWorkerRemoveOperation(ctx context.Context, log *slog.Logger, client *http.Client, cfg config, controller *member, members []member, op memberOperation) error {
	worker, err := workerMemberFromOperation(members, op)
	if err != nil {
		return reportWorkerOperationFailure(ctx, client, cfg, op, err)
	}

	// tombstone completes the removal without touching the node.
	tombstone := func(logMessage string) error {
		observedAt := time.Now().UTC().Format(time.RFC3339)
		cleanup := cloneMap(worker.AdapterDetail)
		cleanup["role"] = "worker"
		cleanup["phase"] = "cleanup_removed"
		cleanup["registered_in_controller"] = false
		cleanup["last_runtime_observed_at"] = observedAt
		log.Info(logMessage, "operation_id", op.ID, "target_member_id", worker.ID)
		return reportWorkerOperation(ctx, client, cfg, op, "succeeded", worker.ID, ptrString("deleted"), worker.BoundNodeID, cleanup, nil, nil)
	}

	switch {
	case shouldCompleteDeletedWorkerRemoval(worker):
		return tombstone("worker already deleted; completing remove operation as no-op")
	case shouldTombstoneBootstrapFailedWorker(worker):
		return tombstone("tombstoning bootstrap-failed worker without ssh")
	}

	sshHost, sshUser, slurmNode, nodeID, state, err := workerTargetFromMember(worker)
	if err != nil {
		if shouldTombstoneHistoricalFailedWorker(worker) {
			return tombstone("tombstoning historical failed worker without ssh")
		}
		return reportWorkerOperationFailureWithState(ctx, client, cfg, op, worker.ID, worker.BoundNodeID, cloneMap(worker.AdapterDetail), err)
	}
	// Every failure past this point is reported with the same member state.
	fail := func(cause error) error {
		return reportWorkerOperationFailureWithState(ctx, client, cfg, op, worker.ID, nodeID, state, cause)
	}

	log.Info("processing worker remove operation", "operation_id", op.ID, "target_member_id", worker.ID, "host", sshHost, "user", sshUser, "node_name", slurmNode)
	if reportErr := reportWorkerOperation(ctx, client, cfg, op, "in_progress", worker.ID, ptrString("deleting"), nodeID, state, nil, nil); reportErr != nil {
		return reportErr
	}

	removeCommand := fmt.Sprintf("systemctl disable --now slurmd && scontrol update NodeName=%s State=DOWN Reason='gpuaas-worker-removed' || true", shellQuote(slurmNode))
	if runErr := runRemoteSudoCommand(ctx, cfg.SSHPrivateKeyPath, sshUser, sshHost, removeCommand); runErr != nil {
		return fail(runErr)
	}
	if verifyErr := verifySystemdInactive(ctx, cfg.SSHPrivateKeyPath, sshUser, sshHost, "slurmd"); verifyErr != nil {
		return fail(verifyErr)
	}
	if verifyErr := verifyNodeStateContains(ctx, cfg.SSHPrivateKeyPath, sshUser, sshHost, slurmNode, "DOWN"); verifyErr != nil {
		return fail(verifyErr)
	}

	observedAt := time.Now().UTC().Format(time.RFC3339)
	state["role"] = "worker"
	state["phase"] = "slurmd_removed"
	state["slurmd_state"] = "stopped"
	state["registered_in_controller"] = false
	state["cluster_name"] = "gpuaas-local"
	state["partition_name"] = "compute"
	state["last_runtime_observed_at"] = observedAt

	if sameNode(controller, worker) {
		if reportErr := reportControllerSlurmdState(ctx, client, cfg, controller, "stopped", observedAt); reportErr != nil {
			return reportErr
		}
	}
	return reportWorkerOperation(ctx, client, cfg, op, "succeeded", worker.ID, ptrString("deleted"), nodeID, state, nil, nil)
}

// shouldCompleteDeletedWorkerRemoval reports whether item is a non-nil worker
// member whose status is already "deleted".
func shouldCompleteDeletedWorkerRemoval(item *member) bool {
	if item == nil {
		return false
	}
	if item.ComponentKey != "worker" {
		return false
	}
	return item.Status == "deleted"
}

// shouldTombstoneBootstrapFailedWorker reports whether item is a failed worker
// whose adapter detail has phase "bootstrap_failed" or "remove_failed", has no
// slurmd_state, and is not flagged (boolean true) as registered in the
// controller.
func shouldTombstoneBootstrapFailedWorker(item *member) bool {
	if item == nil || item.ComponentKey != "worker" || item.Status != "failed" {
		return false
	}
	detail := cloneMap(item.AdapterDetail)
	phase := strings.TrimSpace(stringAny(detail["phase"]))
	if phase != "bootstrap_failed" && phase != "remove_failed" {
		return false
	}
	if strings.TrimSpace(stringAny(detail["slurmd_state"])) != "" {
		return false
	}
	registered, isBool := detail["registered_in_controller"].(bool)
	return !(isBool && registered)
}

// activeWorkerOnNode returns a pointer to the first worker in items that is
// bound to boundNodeID and whose status is neither "deleted" nor "failed".
// Node IDs are compared after trimming whitespace. It returns nil when
// boundNodeID is nil or blank, or when no worker matches.
func activeWorkerOnNode(items []member, boundNodeID *string) *member {
	wanted := strings.TrimSpace(stringValue(boundNodeID))
	if wanted == "" {
		return nil
	}
	for idx := range items {
		candidate := &items[idx]
		switch {
		case candidate.ComponentKey != "worker":
			// Not a worker: skip.
		case candidate.Status == "deleted", candidate.Status == "failed":
			// Inactive worker: skip.
		case strings.TrimSpace(stringValue(candidate.BoundNodeID)) == wanted:
			return candidate
		}
	}
	return nil
}

// shouldTombstoneHistoricalFailedWorker reports whether item is a failed
// worker that carries no addressing information at all: no bound node ID and
// no host, hostname, worker_hostname or username_on_node in its adapter
// detail (all compared after trimming whitespace).
func shouldTombstoneHistoricalFailedWorker(item *member) bool {
	if item == nil || item.ComponentKey != "worker" || item.Status != "failed" {
		return false
	}
	if item.BoundNodeID != nil && strings.TrimSpace(*item.BoundNodeID) != "" {
		return false
	}
	detail := cloneMap(item.AdapterDetail)
	for _, key := range []string{"host", "hostname", "worker_hostname", "username_on_node"} {
		if strings.TrimSpace(stringAny(detail[key])) != "" {
			return false
		}
	}
	return true
}

// workerTargetFromOperation derives the worker's SSH host, login user, bound
// node ID and adapter detail from an add operation's allocation intent.
//
// When the intent names an allocation_id, the allocation is fetched and
// everything is taken from allocationIntentFromAllocation. Otherwise host
// (falling back to hostname), username_on_node and bound_node_id are read from
// the intent itself; host and user are required. In that case the returned
// detail is a clone of the intent with host and username_on_node normalised,
// and hostname filled in with host when it was blank.
func workerTargetFromOperation(ctx context.Context, client *http.Client, cfg config, op memberOperation) (host string, user string, boundNodeID *string, detail map[string]any, err error) {
	intent := cloneMap(op.AllocationIntent)
	if allocationID := strings.TrimSpace(stringAny(intent["allocation_id"])); allocationID != "" {
		allocation, lookupErr := getProjectAllocation(ctx, client, cfg, allocationID)
		if lookupErr != nil {
			return "", "", nil, nil, lookupErr
		}
		host, user, boundNodeID, detail = allocationIntentFromAllocation(allocation)
		return host, user, boundNodeID, detail, nil
	}

	// Non-string values are treated as absent.
	text := func(key string) string {
		value, _ := intent[key].(string)
		return value
	}
	hostname := text("hostname")
	host = strings.TrimSpace(text("host"))
	if host == "" {
		host = strings.TrimSpace(hostname)
	}
	user = strings.TrimSpace(text("username_on_node"))
	if host == "" || user == "" {
		return "", "", nil, nil, errors.New("worker add operation missing allocation_id or host/username_on_node in allocation_intent")
	}
	if nodeID := text("bound_node_id"); nodeID != "" {
		boundNodeID = &nodeID
	}

	intent["host"] = host
	if strings.TrimSpace(hostname) == "" {
		intent["hostname"] = host
	}
	intent["username_on_node"] = user
	return host, user, boundNodeID, intent, nil
}

// workerMemberFromOperation returns a pointer to the member in items whose ID
// equals the operation's target member ID. The pointer aliases the caller's
// slice.
//
// The target ID is trimmed before use, matching how isWorkerOperation
// compares it, so an ID with surrounding whitespace resolves to the same
// member in both places and a whitespace-only ID is reported as missing. An
// error is returned when the ID is missing or no member has it.
func workerMemberFromOperation(items []member, op memberOperation) (*member, error) {
	targetID := strings.TrimSpace(stringValue(op.TargetMemberID))
	if targetID == "" {
		return nil, errors.New("worker operation missing target_member_id")
	}
	for i := range items {
		if items[i].ID == targetID {
			return &items[i], nil
		}
	}
	return nil, fmt.Errorf("worker member %s not found", targetID)
}

// slurmNodeSizing holds the values rendered into a slurm.conf NodeName line:
// the node name, its address, and the CPU count and memory size (in MB) kept
// as strings.
type slurmNodeSizing struct {
	Name, Address, CPUs, MemoryMB string
}

// workerNodeNameFromDetail returns the trimmed "hostname" entry of detail when
// it is non-empty, and the trimmed host otherwise.
func workerNodeNameFromDetail(host string, detail map[string]any) string {
	nodeName := strings.TrimSpace(stringAny(detail["hostname"]))
	if nodeName == "" {
		nodeName = strings.TrimSpace(host)
	}
	return nodeName
}

// bootstrapSlurmWorkerNode joins a worker to the controller's Slurm cluster.
// It resolves the controller's SSH target, collects sizing for both nodes,
// reads the controller's munge key (base64), renders a two-node slurm.conf,
// configures the worker with the key and config, and finally installs the
// config on the controller for the new worker. The first failing step aborts
// with its error.
func bootstrapSlurmWorkerNode(ctx context.Context, client *http.Client, cfg config, controller *member, workerHost, workerUser, workerNodeName string) error {
	ctrlHost, ctrlUser, err := controllerSSHTarget(ctx, client, cfg, controller)
	if err != nil {
		return err
	}

	// Fallback name for the controller, by preference: bound node resource
	// name, adapter_detail hostname, SSH host.
	ctrlNodeName := ctrlHost
	if fromDetail := strings.TrimSpace(stringAny(controller.AdapterDetail["hostname"])); fromDetail != "" {
		ctrlNodeName = fromDetail
	}
	if fromNode := strings.TrimSpace(stringValue(controller.BoundNodeResourceName)); fromNode != "" {
		ctrlNodeName = fromNode
	}

	sshKey := cfg.SSHPrivateKeyPath
	ctrlSizing, err := fetchSlurmNodeSizing(ctx, sshKey, ctrlUser, ctrlHost, ctrlNodeName)
	if err != nil {
		return err
	}
	ctrlSizing.Address = ctrlHost

	wrkSizing, err := fetchSlurmNodeSizing(ctx, sshKey, workerUser, workerHost, workerNodeName)
	if err != nil {
		return err
	}
	wrkSizing.Address = workerHost

	encodedKey, err := runRemoteSudoCaptureFunc(ctx, sshKey, ctrlUser, ctrlHost, "base64 -w0 /etc/munge/munge.key")
	if err != nil {
		return err
	}

	conf := renderTwoNodeSlurmConfig("gpuaas-local", "compute", ctrlSizing, wrkSizing)
	if err = configureSlurmWorker(ctx, sshKey, workerUser, workerHost, strings.TrimSpace(encodedKey), conf); err != nil {
		return err
	}
	return configureSlurmControllerForWorker(ctx, sshKey, ctrlUser, ctrlHost, conf, wrkSizing.Name)
}

// fetchSlurmNodeSizing runs a remote command that prints the node's short
// hostname, CPU count and total memory in MB, and returns them as a
// slurmNodeSizing (Address is left empty for the caller to fill in).
//
// fallbackName is used as the node name when the remote host reports an empty
// hostname, i.e. when only the two numeric fields come back. An error is
// returned when the command fails, when the output has an unexpected number
// of fields, or when the CPU or memory field is not a positive integer.
func fetchSlurmNodeSizing(ctx context.Context, keyPath, user, host, fallbackName string) (slurmNodeSizing, error) {
	command := `printf '%s %s %s' "$(hostname -s)" "$(nproc)" "$(awk '/MemTotal:/ {print int($2/1024)}' /proc/meminfo)"`
	output, err := runRemoteSudoCaptureFunc(ctx, keyPath, user, host, command)
	if err != nil {
		return slurmNodeSizing{}, err
	}
	return parseSlurmNodeSizing(output, fallbackName)
}

// parseSlurmNodeSizing parses "<name> <cpus> <memoryMB>" output. A two-field
// output is accepted as "<cpus> <memoryMB>" with fallbackName as the name.
func parseSlurmNodeSizing(output, fallbackName string) (slurmNodeSizing, error) {
	malformed := func() (slurmNodeSizing, error) {
		return slurmNodeSizing{}, fmt.Errorf("unexpected node sizing output %q", strings.TrimSpace(output))
	}

	fields := strings.Fields(output)
	var name, cpus, memoryMB string
	switch {
	case len(fields) >= 3:
		name, cpus, memoryMB = fields[0], fields[1], fields[2]
	case len(fields) == 2 && strings.TrimSpace(fallbackName) != "":
		// strings.Fields drops the empty hostname, leaving only the numbers.
		name, cpus, memoryMB = strings.TrimSpace(fallbackName), fields[0], fields[1]
	default:
		return malformed()
	}

	// These values are written verbatim into slurm.conf, so reject anything
	// that is not a positive integer instead of rendering a broken config.
	for _, value := range []string{cpus, memoryMB} {
		if n, convErr := strconv.Atoi(value); convErr != nil || n <= 0 {
			return malformed()
		}
	}
	return slurmNodeSizing{Name: name, CPUs: cpus, MemoryMB: memoryMB}, nil
}

// renderTwoNodeSlurmConfig renders a slurm.conf for a cluster made of one
// controller node and one worker node, with a single default partition that
// contains all nodes. The result ends with a newline.
func renderTwoNodeSlurmConfig(clusterName, partitionName string, controller, worker slurmNodeSizing) string {
	// Settings that do not depend on the inputs.
	const fixedSettings = `MailProg=/bin/true
ProctrackType=proctrack/linuxproc
ReturnToService=2
SchedulerType=sched/backfill
PriorityType=priority/basic
AccountingStorageType=accounting_storage/none
SelectType=select/cons_tres
SelectTypeParameters=CR_Core_Memory
SlurmUser=slurm
StateSaveLocation=/var/lib/slurm/slurmctld
SlurmdSpoolDir=/var/lib/slurm/slurmd
SlurmctldPidFile=/run/slurmctld.pid
SlurmdPidFile=/run/slurmd.pid
AuthType=auth/munge
MpiDefault=none
SlurmctldPort=6817
SlurmdPort=6818
SlurmdTimeout=300
SlurmctldTimeout=120
TaskPlugin=task/none
`
	nodeLine := func(node slurmNodeSizing) string {
		return fmt.Sprintf("NodeName=%s%s CPUs=%s RealMemory=%s State=UNKNOWN\n", node.Name, slurmNodeAddrValue(node), node.CPUs, node.MemoryMB)
	}

	header := fmt.Sprintf("ClusterName=%s\nSlurmctldHost=%s\n", clusterName, slurmctldHostValue(controller))
	partition := fmt.Sprintf("PartitionName=%s Nodes=ALL Default=YES MaxTime=INFINITE State=UP\n", partitionName)
	return header + fixedSettings + nodeLine(controller) + nodeLine(worker) + partition
}

// slurmctldHostValue formats the SlurmctldHost value for node. It yields
// "name(address)" when both are set and differ, and just the trimmed name
// otherwise.
func slurmctldHostValue(node slurmNodeSizing) string {
	hostName, hostAddr := strings.TrimSpace(node.Name), strings.TrimSpace(node.Address)
	if hostName != "" && hostAddr != "" && hostName != hostAddr {
		return fmt.Sprintf("%s(%s)", hostName, hostAddr)
	}
	return hostName
}

// slurmNodeAddrValue returns the " NodeAddr=<address>" fragment (with its
// leading space) for a NodeName line, or an empty string when the trimmed
// address is empty or equal to the trimmed node name.
func slurmNodeAddrValue(node slurmNodeSizing) string {
	nodeAddr := strings.TrimSpace(node.Address)
	needsAddr := nodeAddr != "" && nodeAddr != strings.TrimSpace(node.Name)
	if !needsAddr {
		return ""
	}
	return fmt.Sprintf(" NodeAddr=%s", nodeAddr)
}

// configureSlurmWorker installs and configures munge and slurmd on a worker
// node over SSH: it installs the packages, writes the munge key decoded from
// mungeKeyBase64, writes slurmConf to /etc/slurm/slurm.conf, disables
// slurmctld (failure tolerated), and (re)starts and enables munge and slurmd.
// The script ends by checking that slurmd is active.
func configureSlurmWorker(ctx context.Context, keyPath, user, host, mungeKeyBase64, slurmConf string) error {
	// The heredoc terminator must sit on a line of its own. Without a trailing
	// newline on the config, "EOF" would be glued to the last config line and
	// the rest of the script would be written into slurm.conf.
	if slurmConf != "" && !strings.HasSuffix(slurmConf, "\n") {
		slurmConf += "\n"
	}
	command := fmt.Sprintf(`export DEBIAN_FRONTEND=noninteractive
apt-get update -qq
apt-get install -y munge slurm-wlm
install -d -m 0755 /etc/munge /etc/slurm /var/lib/slurm/slurmd /var/log/slurm
printf %%s %s | base64 -d >/etc/munge/munge.key
chown munge:munge /etc/munge/munge.key
chmod 0400 /etc/munge/munge.key
cat > /etc/slurm/slurm.conf <<'EOF'
%sEOF
chown -R slurm:slurm /var/lib/slurm /var/log/slurm
systemctl enable munge
systemctl restart munge
systemctl disable --now slurmctld || true
systemctl restart slurmd
systemctl enable slurmd
systemctl is-active --quiet slurmd`, shellQuote(mungeKeyBase64), slurmConf)
	return runRemoteSudoCommand(ctx, keyPath, user, host, command)
}

// configureSlurmControllerForWorker writes slurmConf to the controller's
// /etc/slurm/slurm.conf over SSH, restarts slurmctld and slurmd, asks Slurm to
// reconfigure and to resume workerNodeName (both tolerated on failure), and
// then polls sinfo up to 30 times, 2 seconds apart, until the worker reports a
// state that is non-empty and contains no "*". If that never happens the
// script prints "sinfo -N -l" and exits non-zero.
func configureSlurmControllerForWorker(ctx context.Context, keyPath, user, host, slurmConf, workerNodeName string) error {
	// The heredoc terminator must sit on a line of its own. Without a trailing
	// newline on the config, "EOF" would be glued to the last config line and
	// the rest of the script would be written into slurm.conf.
	if slurmConf != "" && !strings.HasSuffix(slurmConf, "\n") {
		slurmConf += "\n"
	}
	command := fmt.Sprintf(`worker_node=%s
cat > /etc/slurm/slurm.conf <<'EOF'
%sEOF
systemctl restart slurmctld slurmd
scontrol reconfigure || true
scontrol update NodeName="$worker_node" State=RESUME Reason="gpuaas-worker-ready" || true
for attempt in $(seq 1 30); do
  state=$(sinfo -h -N -n "$worker_node" -o "%%T" | head -n1)
  case "$state" in
    ""|*"*"*) sleep 2 ;;
    *) exit 0 ;;
  esac
done
sinfo -N -l
exit 1`, shellQuote(workerNodeName), slurmConf)
	return runRemoteSudoCommand(ctx, keyPath, user, host, command)
}

// workerTargetFromMember derives the SSH endpoint, login user, Slurm node name
// and bound node ID for an existing worker member, plus a clone of its adapter
// detail.
//
// The SSH host is the first non-empty of adapter_detail host, hostname and
// worker_hostname, combined with adapter_detail port via sshEndpoint. The node
// name is the first non-empty of the bound node resource name, hostname,
// worker_hostname and the SSH host. An error is returned when the host or
// username_on_node is missing.
func workerTargetFromMember(item *member) (host string, user string, nodeName string, boundNodeID *string, detail map[string]any, err error) {
	adapter := cloneMap(item.AdapterDetail)
	// Non-string values are treated as absent.
	field := func(key string) string {
		raw, _ := adapter[key].(string)
		return strings.TrimSpace(raw)
	}
	firstNonEmpty := func(candidates ...string) string {
		for _, candidate := range candidates {
			if candidate != "" {
				return candidate
			}
		}
		return ""
	}

	hostname, workerHostname := field("hostname"), field("worker_hostname")
	sshHost := firstNonEmpty(field("host"), hostname, workerHostname)
	sshUser := field("username_on_node")
	if sshHost == "" || sshUser == "" {
		return "", "", "", nil, nil, errors.New("worker member missing host or username_on_node in adapter_detail")
	}

	resourceName := strings.TrimSpace(stringValue(item.BoundNodeResourceName))
	slurmName := firstNonEmpty(resourceName, hostname, workerHostname, sshHost)
	return sshEndpoint(sshHost, intAny(adapter["port"])), sshUser, slurmName, item.BoundNodeID, adapter, nil
}

// runBootstrap runs the bootstrap script on the target host without any
// prelude; it delegates to runBootstrapDirect.
func runBootstrap(ctx context.Context, scriptPath, keyPath, user, host string) error {
	err := runBootstrapDirect(ctx, scriptPath, keyPath, user, host, nil)
	return err
}

// runBootstrapWithArtifactPullSecret runs the bootstrap script on the target
// host. With a blank wrappedToken it behaves like a plain bootstrap (via
// runBootstrapFunc). Otherwise a prelude built from the trimmed token and
// artifactDetail is placed in front of the script before it is run.
func runBootstrapWithArtifactPullSecret(ctx context.Context, scriptPath, keyPath, user, host, wrappedToken string, artifactDetail map[string]any) error {
	token := strings.TrimSpace(wrappedToken)
	if token == "" {
		return runBootstrapFunc(ctx, scriptPath, keyPath, user, host)
	}
	prelude, err := prependArtifactPullSecretBootstrapPrelude(nil, token, artifactDetail)
	if err != nil {
		return err
	}
	return runBootstrapDirect(ctx, scriptPath, keyPath, user, host, prelude)
}

// bootstrapOutputCapture serializes writes to dst so that it can be shared by
// the stdout and stderr copy goroutines started by os/exec. A one-slot channel
// serves as the lock so that no additional package is required.
type bootstrapOutputCapture struct {
	lock chan struct{}
	dst  io.Writer
}

// Write forwards p to dst while holding the lock.
func (c *bootstrapOutputCapture) Write(p []byte) (int, error) {
	c.lock <- struct{}{}
	defer func() { <-c.lock }()
	return c.dst.Write(p)
}

// runBootstrapDirect reads the script at scriptPath, prepends prelude (if any)
// and feeds the result to `sudo -n bash -s` on the remote host over ssh.
//
// Remote stdout and stderr are mirrored to this process's stdout and stderr
// and also captured together. When the command fails and bootstrapLogExcerpt
// yields a non-empty excerpt of the captured output, the excerpt is appended
// to the returned error (which still wraps the underlying error).
func runBootstrapDirect(ctx context.Context, scriptPath, keyPath, user, host string, prelude []byte) error {
	script, err := os.ReadFile(scriptPath)
	if err != nil {
		return err
	}
	if len(prelude) > 0 {
		// Build a fresh slice so that the caller's prelude backing array is
		// never written to by append.
		combined := make([]byte, 0, len(prelude)+len(script))
		combined = append(combined, prelude...)
		combined = append(combined, script...)
		script = combined
	}

	var output bytes.Buffer
	// Stdout and Stderr are distinct writers, so os/exec copies them from two
	// goroutines; the shared buffer must therefore be guarded.
	capture := &bootstrapOutputCapture{lock: make(chan struct{}, 1), dst: &output}

	args := append(baseSSHArgs(keyPath, user, host), "sudo", "-n", "bash", "-s")
	cmd := exec.CommandContext(ctx, "ssh", args...)
	cmd.Stdin = bytes.NewReader(script)
	cmd.Stdout = io.MultiWriter(os.Stdout, capture)
	cmd.Stderr = io.MultiWriter(os.Stderr, capture)
	if err := cmd.Run(); err != nil {
		// Run has returned, so the copy goroutines are done and the buffer can
		// be read without the lock.
		if excerpt := bootstrapLogExcerpt(output.String(), 80); excerpt != "" {
			return fmt.Errorf("%w\n%s", err, excerpt)
		}
		return err
	}
	return nil
}

// prependArtifactPullSecretBootstrapPrelude returns script preceded by a shell
// prelude that materializes the artifact pull token and its metadata on the
// remote host.
//
// The prelude creates two temp files, restricts them to mode 0400, registers
// an EXIT trap that removes them, writes wrappedToken and the JSON-encoded
// artifactDetail (minus any "wrapped_token" key, matched case-insensitively)
// into them via quoted heredocs, and exports their paths as
// GPUAAS_ARTIFACT_PULL_TOKEN_FILE and GPUAAS_ARTIFACT_PULL_METADATA_FILE.
//
// wrappedToken is embedded verbatim inside a heredoc, so a token containing a
// line break could terminate the heredoc early and have the remainder executed
// as shell; such tokens are rejected with an error. The metadata needs no such
// check because json.Marshal escapes line breaks inside strings.
func prependArtifactPullSecretBootstrapPrelude(script []byte, wrappedToken string, artifactDetail map[string]any) ([]byte, error) {
	if strings.ContainsAny(wrappedToken, "\r\n") {
		return nil, errors.New("artifact pull wrapped token must not contain line breaks")
	}
	metadata := make(map[string]any, len(artifactDetail))
	for k, v := range artifactDetail {
		// Never persist the token itself alongside the metadata.
		if strings.EqualFold(k, "wrapped_token") {
			continue
		}
		metadata[k] = v
	}
	metadataJSON, err := json.Marshal(metadata)
	if err != nil {
		return nil, err
	}
	var out bytes.Buffer
	out.WriteString("artifact_pull_token_file=\"$(mktemp /tmp/gpuaas-artifact-pull.XXXXXX)\"\n")
	out.WriteString("artifact_pull_metadata_file=\"$(mktemp /tmp/gpuaas-artifact-pull-metadata.XXXXXX)\"\n")
	out.WriteString("chmod 0400 \"$artifact_pull_token_file\" \"$artifact_pull_metadata_file\"\n")
	out.WriteString("cleanup_gpuaas_artifact_pull_secret() { rm -f \"$artifact_pull_token_file\" \"$artifact_pull_metadata_file\"; }\n")
	out.WriteString("trap cleanup_gpuaas_artifact_pull_secret EXIT\n")
	out.WriteString("cat >\"$artifact_pull_token_file\" <<'GPUAAS_ARTIFACT_PULL_TOKEN'\n")
	out.WriteString(wrappedToken)
	out.WriteString("\nGPUAAS_ARTIFACT_PULL_TOKEN\n")
	out.WriteString("cat >\"$artifact_pull_metadata_file\" <<'GPUAAS_ARTIFACT_PULL_METADATA'\n")
	out.Write(metadataJSON)
	out.WriteString("\nGPUAAS_ARTIFACT_PULL_METADATA\n")
	out.WriteString("export GPUAAS_ARTIFACT_PULL_TOKEN_FILE=\"$artifact_pull_token_file\"\n")
	out.WriteString("export GPUAAS_ARTIFACT_PULL_METADATA_FILE=\"$artifact_pull_metadata_file\"\n")
	out.Write(script)
	return out.Bytes(), nil
}

// controllerAllocationIDForBootstrap returns the allocation ID to use when
// bootstrapping the controller.
//
// The controller member's adapter_detail "allocation_id" takes precedence;
// the instance's placement intent "controller_allocation_id" is the fallback.
// Nil arguments are skipped, blank values are ignored, and an error is
// returned when neither source provides an ID.
func controllerAllocationIDForBootstrap(instance *appInstance, controller *member) (string, error) {
	candidates := make([]any, 0, 2)
	if controller != nil {
		candidates = append(candidates, controller.AdapterDetail["allocation_id"])
	}
	if instance != nil {
		candidates = append(candidates, instance.PlacementIntent["controller_allocation_id"])
	}
	for _, raw := range candidates {
		if id := strings.TrimSpace(stringAny(raw)); id != "" {
			return id, nil
		}
	}
	return "", errors.New("controller allocation_id missing for bootstrap")
}

// ensureSSHPrivateKeyPath returns the path of the SSH private key to use.
//
// Without a configured access credential ID it returns the trimmed
// cfg.SSHPrivateKeyPath, or an error when that is blank too. With one, it
// requires cfg.ProjectID, has the credential delivered and unwrapped, and
// writes the key (newline-terminated, mode 0400) to
// <tmp>/slurm-reference/<credential id>.key via a temp file and rename, then
// returns that path.
//
// The credential ID becomes part of a file name, so IDs containing path
// separators are rejected. The temp file is removed on every failure path so
// that partially written key material is not left behind.
func ensureSSHPrivateKeyPath(ctx context.Context, client *http.Client, cfg config) (string, error) {
	credentialID := strings.TrimSpace(cfg.AccessCredentialID)
	if credentialID == "" {
		keyPath := strings.TrimSpace(cfg.SSHPrivateKeyPath)
		if keyPath == "" {
			return "", errors.New("slurm controller missing ssh private key path or resolvable access credential")
		}
		return keyPath, nil
	}
	if strings.TrimSpace(cfg.ProjectID) == "" {
		return "", errors.New("project-scoped access credential delivery requires SLURM_REFERENCE_PROJECT_ID")
	}
	if strings.ContainsAny(credentialID, `/\`) {
		return "", errors.New("access credential id must not contain path separators")
	}
	cacheDir := filepath.Join(os.TempDir(), "slurm-reference")
	if err := os.MkdirAll(cacheDir, 0o700); err != nil {
		return "", err
	}
	cachePath := filepath.Join(cacheDir, credentialID+".key")
	delivered, err := deliverAccessCredential(ctx, client, cfg)
	if err != nil {
		return "", err
	}
	privateKey, err := unwrapSSHPrivateKey(ctx, client, delivered)
	if err != nil {
		return "", err
	}
	if !strings.HasSuffix(privateKey, "\n") {
		privateKey += "\n"
	}
	tmpPath := cachePath + ".tmp"
	// A leftover temp file may be read-only (0400); remove it so the write
	// below can recreate it.
	_ = os.Remove(tmpPath)
	if err := os.WriteFile(tmpPath, []byte(privateKey), 0o600); err != nil {
		_ = os.Remove(tmpPath)
		return "", err
	}
	if err := os.Chmod(tmpPath, 0o400); err != nil {
		_ = os.Remove(tmpPath)
		return "", err
	}
	if err := os.Rename(tmpPath, cachePath); err != nil {
		_ = os.Remove(tmpPath)
		return "", err
	}
	return cachePath, nil
}

// resolveAccessCredentialID returns the access credential ID to use for the
// instance.
//
// A non-blank cfg.AccessCredentialID wins. Otherwise the project's access
// credentials are listed and the first active "ssh_key" credential tied to
// this app instance is returned, either through its own resource
// type/resource ID or through one of its bindings. An error is returned when
// the listing fails or nothing matches.
func resolveAccessCredentialID(ctx context.Context, client *http.Client, cfg config, instance *appInstance) (string, error) {
	if configured := strings.TrimSpace(cfg.AccessCredentialID); configured != "" {
		return configured, nil
	}
	credentials, err := listProjectAccessCredentials(ctx, client, cfg)
	if err != nil {
		return "", err
	}

	// boundToInstance reports whether a resource reference points at this
	// app instance; the instance is only consulted once the type matches.
	boundToInstance := func(resourceType, resourceID string) bool {
		return strings.TrimSpace(resourceType) == "app_instance" &&
			strings.TrimSpace(resourceID) == strings.TrimSpace(instance.ID)
	}

	for _, credential := range credentials {
		usable := credential.Status == "active" && credential.CredentialKind == "ssh_key"
		if !usable {
			continue
		}
		if boundToInstance(stringValue(credential.ResourceType), stringValue(credential.ResourceID)) {
			return strings.TrimSpace(credential.ID), nil
		}
		for _, binding := range credential.Bindings {
			if boundToInstance(binding.ResourceType, binding.ResourceID) {
				return strings.TrimSpace(credential.ID), nil
			}
		}
	}
	return "", errors.New("no active app_instance-bound ssh_key access credential found for slurm instance")
}

// deliverAccessCredential POSTs to the project's access-credential deliver
// endpoint for cfg.AccessCredentialID, requesting a wrap TTL of 900 seconds,
// and returns the decoded response.
func deliverAccessCredential(ctx context.Context, client *http.Client, cfg config) (*deliveredAccessCredentialResponse, error) {
	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/access-credentials/%s/deliver", cfg.APIBaseURL, cfg.ProjectID, cfg.AccessCredentialID)
	payload := map[string]any{"wrap_ttl_seconds": 900}
	delivered := new(deliveredAccessCredentialResponse)
	if err := doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, delivered); err != nil {
		return nil, err
	}
	return delivered, nil
}

// listProjectAccessCredentials fetches the project's access credentials
// (a single request with page_size=100) and returns the decoded items.
func listProjectAccessCredentials(ctx context.Context, client *http.Client, cfg config) ([]projectAccessCredential, error) {
	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/access-credentials?page_size=100", cfg.APIBaseURL, cfg.ProjectID)
	var page projectAccessCredentialListResponse
	err := doJSON(ctx, client, cfg, http.MethodGet, endpoint, nil, &page)
	if err != nil {
		return nil, err
	}
	return page.Items, nil
}

// unwrapSSHPrivateKey exchanges a delivered wrapped token for the SSH private
// key it protects.
//
// It POSTs to the delivery's unwrap URL with the wrapped token in the
// X-Vault-Token header and reads data.credential_data.private_key from the
// JSON response. Errors are returned when the delivery lacks a token or unwrap
// URL, when the request fails or answers with a non-2xx status, when the body
// cannot be decoded, or when the private key is blank. The returned key is
// whitespace-trimmed.
func unwrapSSHPrivateKey(ctx context.Context, client *http.Client, delivered *deliveredAccessCredentialResponse) (string, error) {
	// trimmed dereferences an optional string, treating nil as blank.
	trimmed := func(p *string) string {
		if p == nil {
			return ""
		}
		return strings.TrimSpace(*p)
	}

	if delivered == nil {
		return "", errors.New("access credential delivery missing wrapped token")
	}
	wrappedToken := trimmed(delivered.Delivery.WrappedToken)
	if wrappedToken == "" {
		return "", errors.New("access credential delivery missing wrapped token")
	}
	unwrapURL := trimmed(delivered.Delivery.UnwrapURL)
	if unwrapURL == "" {
		return "", errors.New("access credential delivery missing unwrap_url")
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, unwrapURL, nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("X-Vault-Token", wrappedToken)

	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer func() { _ = resp.Body.Close() }()

	if succeeded := resp.StatusCode >= 200 && resp.StatusCode < 300; !succeeded {
		raw, _ := io.ReadAll(resp.Body)
		return "", fmt.Errorf("vault unwrap failed: status=%d body=%s", resp.StatusCode, strings.TrimSpace(string(raw)))
	}

	var envelope struct {
		Data struct {
			CredentialData map[string]string `json:"credential_data"`
		} `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
		return "", err
	}
	if key := strings.TrimSpace(envelope.Data.CredentialData["private_key"]); key != "" {
		return key, nil
	}
	return "", errors.New("access credential unwrap payload missing private_key")
}

// reconcileBootstrapSSHTrust asks the API to reconcile bootstrap SSH trust for
// an allocation/user pair to desiredState.
//
// It is a no-op when the allocation ID or username is blank. desiredState is
// lower-cased and trimmed before use. For "present", the public key derived
// from the private key at keyPath is included in the request, and a blank
// keyPath is an error.
func reconcileBootstrapSSHTrust(ctx context.Context, client *http.Client, cfg config, allocationID, usernameOnNode, keyPath, desiredState string) error {
	allocationID = strings.TrimSpace(allocationID)
	usernameOnNode = strings.TrimSpace(usernameOnNode)
	if allocationID == "" || usernameOnNode == "" {
		return nil
	}
	keyPath = strings.TrimSpace(keyPath)
	desiredState = strings.TrimSpace(strings.ToLower(desiredState))

	payload := map[string]any{
		"access_credential_id": normalizedStringValue(cfg.AccessCredentialID),
		"allocation_id":        allocationID,
		"username_on_node":     usernameOnNode,
		"desired_state":        desiredState,
	}
	if desiredState == "present" {
		if keyPath == "" {
			return errors.New("bootstrap ssh trust present reconcile requires ssh private key path")
		}
		authorizedKey, err := publicKeyFromPrivateKeyPath(keyPath)
		if err != nil {
			return err
		}
		payload["public_key"] = authorizedKey
	}

	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/bootstrap-ssh/reconcile", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID)
	return doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, nil)
}

// publicKeyFromPrivateKeyPath reads the SSH private key at path and returns
// its public key in authorized_keys format with surrounding whitespace
// removed. Read and parse failures are returned as-is.
func publicKeyFromPrivateKeyPath(path string) (string, error) {
	keyBytes, readErr := os.ReadFile(path)
	if readErr != nil {
		return "", readErr
	}
	signer, parseErr := ssh.ParsePrivateKey(keyBytes)
	if parseErr != nil {
		return "", parseErr
	}
	authorized := ssh.MarshalAuthorizedKey(signer.PublicKey())
	return strings.TrimSpace(string(authorized)), nil
}

// normalizedStringValue returns v with surrounding whitespace removed, or nil
// when nothing remains, so that blank values are sent as JSON null rather than
// as an empty string.
func normalizedStringValue(v string) any {
	if trimmed := strings.TrimSpace(v); trimmed != "" {
		return trimmed
	}
	return nil
}

// runRemoteSudoCommand runs command on the remote host through
// runRemoteSudoCaptureFunc and discards the captured output, returning only
// the error.
func runRemoteSudoCommand(ctx context.Context, keyPath, user, host, command string) error {
	if _, err := runRemoteSudoCaptureFunc(ctx, keyPath, user, host, command); err != nil {
		return err
	}
	return nil
}

func runRemoteSudoCapture(ctx context.Context, keyPath, user, host, command string) (string, error) {
	remoteCommand := buildRemoteSudoSSHCommand(command)
	args := append(baseSSHArgs(keyPath, user, host), remoteCommand)
	cmd := exec.CommandContext(ctx, "ssh", args...)
	var stdout bytes.Buffer
	var stderr bytes.Buffer
	cmd.Stdout = io.MultiWriter(os.Stdout, &stdout)
	cmd.Stderr = io.MultiWriter(os.Stderr, &stderr)
	if err := cmd.Run(); err != nil {
		combined := strings.TrimSpace(stdout.String() + "\n" + stderr.String())
		if combined == "" {
			return "", err
		}
		return "", fmt.Errorf("%w: %s", err, combined)
	}
	return stdout.String(), nil
}

// buildRemoteSudoSSHCommand wraps command so that it runs under a
// non-interactive sudo login shell: `sudo -n bash -lc <quoted command>`.
func buildRemoteSudoSSHCommand(command string) string {
	quoted := shellQuote(command)
	// Sprint inserts no separator between string operands.
	return fmt.Sprint("sudo -n bash -lc ", quoted)
}

// verifyNodeStateContains runs `scontrol show node <nodeName>` on the remote
// host and checks that the node's State field contains requiredState.
//
// Only whitespace-separated fields that begin with "State=" are inspected.
// Matching requiredState anywhere in the output would also accept it when it
// appears in another field (for example Reason=), and a plain "State="
// substring check is satisfied by fields such as NextState=. The whole field
// is searched, so both "DRAIN" and "State=DRAIN" style arguments work.
// An error carrying the trimmed command output is returned on mismatch.
func verifyNodeStateContains(ctx context.Context, keyPath, user, host, nodeName, requiredState string) error {
	output, err := runRemoteSudoCaptureFunc(ctx, keyPath, user, host, fmt.Sprintf("scontrol show node %s", shellQuote(nodeName)))
	if err != nil {
		return err
	}
	for _, field := range strings.Fields(output) {
		if strings.HasPrefix(field, "State=") && strings.Contains(field, requiredState) {
			return nil
		}
	}
	return fmt.Errorf("node %s did not report required state %s: %s", nodeName, requiredState, strings.TrimSpace(output))
}

// verifySystemdInactive checks on the remote host that the systemd unit is not
// active.
//
// The remote probe prints "active" or "inactive" depending on
// `systemctl is-active --quiet`; any trimmed output other than "inactive"
// produces an error.
func verifySystemdInactive(ctx context.Context, keyPath, user, host, unit string) error {
	probe := fmt.Sprintf("if systemctl is-active --quiet %s; then echo active; else echo inactive; fi", shellQuote(unit))
	raw, err := runRemoteSudoCaptureFunc(ctx, keyPath, user, host, probe)
	if err != nil {
		return err
	}
	if state := strings.TrimSpace(raw); state != "inactive" {
		return fmt.Errorf("systemd unit %s still active: %s", unit, state)
	}
	return nil
}

// reportWorkerOperation POSTs the outcome of a member operation to the
// operation's report endpoint. Nil pointer arguments are sent as-is in the
// payload.
func reportWorkerOperation(ctx context.Context, client *http.Client, cfg config, op memberOperation, status string, targetMemberID string, memberStatus *string, boundNodeID *string, detail map[string]any, lastErrorCode *string, lastErrorMessage *string) error {
	payload := map[string]any{}
	payload["status"] = status
	payload["target_member_id"] = targetMemberID
	payload["member_status"] = memberStatus
	payload["bound_node_id"] = boundNodeID
	payload["adapter_detail"] = detail
	payload["last_error_code"] = lastErrorCode
	payload["last_error_message"] = lastErrorMessage

	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/member-operations/%s/report", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID, op.ID)
	return doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, nil)
}

// reportWorkerOperationFailure reports op as failed because of cause, using
// the operation's own target member ID and no bound node or adapter detail.
func reportWorkerOperationFailure(ctx context.Context, client *http.Client, cfg config, op memberOperation, cause error) error {
	targetMemberID := stringValue(op.TargetMemberID)
	return reportWorkerOperationFailureWithState(ctx, client, cfg, op, targetMemberID, nil, nil, cause)
}

// reportWorkerOperationFailureWithState reports op as failed with error code
// "bootstrap_failed" and cause's message.
//
// A blank targetMemberID is replaced by a freshly generated UUID. The reported
// adapter detail is a clone of detail with "role" set from the operation's
// component key (or defaulted to "worker" when neither the component key nor
// detail provides one), "phase" set from failurePhaseForOperation, and
// "last_adapter_error" set to the error message. cause must be non-nil.
func reportWorkerOperationFailureWithState(ctx context.Context, client *http.Client, cfg config, op memberOperation, targetMemberID string, boundNodeID *string, detail map[string]any, cause error) error {
	if strings.TrimSpace(targetMemberID) == "" {
		targetMemberID = uuid.NewString()
	}
	errorCode := "bootstrap_failed"
	errorMessage := cause.Error()

	failureDetail := cloneMap(detail)
	role := strings.TrimSpace(stringValue(op.ComponentKey))
	_, hasRole := failureDetail["role"]
	switch {
	case role != "":
		failureDetail["role"] = role
	case !hasRole:
		failureDetail["role"] = "worker"
	}
	failureDetail["phase"] = failurePhaseForOperation(op.Action)
	failureDetail["last_adapter_error"] = errorMessage

	return reportWorkerOperation(ctx, client, cfg, op, "failed", targetMemberID, ptrString("failed"), boundNodeID, failureDetail, &errorCode, &errorMessage)
}

// failurePhaseForOperation maps an operation action to the phase recorded when
// that operation fails. Actions other than "drain" and "remove" are reported
// as "bootstrap_failed".
func failurePhaseForOperation(action string) string {
	if action == "drain" {
		return "drain_failed"
	}
	if action == "remove" {
		return "remove_failed"
	}
	return "bootstrap_failed"
}

// reportMemberReady reports the member with status "ready" and the given
// adapter detail.
func reportMemberReady(ctx context.Context, client *http.Client, cfg config, item *member, detail map[string]any) error {
	const readyStatus = "ready"
	return reportMemberStatus(ctx, client, cfg, item, readyStatus, detail)
}

// reportMemberStatus POSTs the member's status, its bound node ID and the
// given adapter detail to the member's report endpoint.
func reportMemberStatus(ctx context.Context, client *http.Client, cfg config, item *member, status string, detail map[string]any) error {
	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/members/%s/report", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID, item.ID)
	payload := make(map[string]any, 3)
	payload["status"] = status
	payload["bound_node_id"] = item.BoundNodeID
	payload["adapter_detail"] = detail
	return doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, nil)
}

// reportControllerSlurmdState re-reports the controller member with its
// current status and a copy of its adapter detail in which "slurmd_state" and
// "last_runtime_observed_at" are updated.
func reportControllerSlurmdState(ctx context.Context, client *http.Client, cfg config, controller *member, slurmdState string, observedAt string) error {
	updated := cloneMap(controller.AdapterDetail)
	for key, value := range map[string]string{
		"slurmd_state":             slurmdState,
		"last_runtime_observed_at": observedAt,
	} {
		updated[key] = value
	}
	return reportMemberStatus(ctx, client, cfg, controller, controller.Status, updated)
}

// reportInstanceRunning POSTs status "running" to the app instance's report
// endpoint.
func reportInstanceRunning(ctx context.Context, client *http.Client, cfg config) error {
	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/report", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID)
	return doJSON(ctx, client, cfg, http.MethodPost, endpoint, map[string]any{"status": "running"}, nil)
}

// reportInstanceStatus POSTs the instance status together with a merged
// runtime state.
//
// The reported runtime state is a clone of the instance's current state
// overlaid with runtimeState. Unless the report carries a failure reason or
// the status is "failed", the stale error fields "last_adapter_error" and
// "bootstrap_log_excerpt" are dropped from it.
func reportInstanceStatus(ctx context.Context, client *http.Client, cfg config, instance *appInstance, status string, failureReason *string, runtimeState map[string]any) error {
	merged := cloneMap(instance.RuntimeState)
	for key, value := range runtimeState {
		merged[key] = value
	}

	reportsFailure := failureReason != nil || status == "failed"
	if !reportsFailure {
		for _, staleKey := range []string{"last_adapter_error", "bootstrap_log_excerpt"} {
			delete(merged, staleKey)
		}
	}

	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/report", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID)
	payload := map[string]any{
		"status":         status,
		"failure_reason": failureReason,
		"runtime_state":  merged,
	}
	return doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, nil)
}

// reportInstanceLifecycleEvent records a lifecycle event in the instance's
// runtime state and reports it.
//
// Nothing is sent when instance is nil, when its trimmed status is not
// accepted by isSlurmInstanceLifecycleStatus, or when
// lifecycleEventAlreadyReported considers the event a duplicate. Otherwise
// the event is appended to "lifecycle_activity", the "last_lifecycle_*" and
// observation-time fields are refreshed, and a non-blank health status is
// mirrored into "lifecycle_health_status" and "health_status". After a
// successful POST the instance's in-memory RuntimeState is replaced with the
// reported state.
func reportInstanceLifecycleEvent(ctx context.Context, client *http.Client, cfg config, instance *appInstance, operation, phase, healthStatus, message string, detail map[string]any) error {
	if instance == nil {
		return nil
	}
	status := strings.TrimSpace(instance.Status)
	if !isSlurmInstanceLifecycleStatus(status) {
		return nil
	}

	observedAt := time.Now().UTC().Format(time.RFC3339)
	next := cloneMap(instance.RuntimeState)
	if lifecycleEventAlreadyReported(next, lifecycleActivityEvent(operation, phase, healthStatus, message, detail, observedAt)) {
		return nil
	}

	trimmedPhase := strings.TrimSpace(phase)
	trimmedHealth := strings.TrimSpace(healthStatus)
	trimmedMessage := strings.TrimSpace(message)

	next["lifecycle_activity"] = appendLifecycleActivity(next, operation, phase, healthStatus, message, detail, observedAt)
	next["last_lifecycle_operation"] = strings.TrimSpace(operation)
	next["last_lifecycle_phase"] = trimmedPhase
	next["last_lifecycle_message"] = trimmedMessage
	next["last_lifecycle_at"] = observedAt
	next["last_runtime_observed_at"] = observedAt
	if trimmedHealth != "" {
		next["lifecycle_health_status"] = trimmedHealth
		next["health_status"] = trimmedHealth
	}

	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/report", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID)
	payload := map[string]any{
		"status":        status,
		"runtime_state": next,
		"phase":         trimmedPhase,
		"health_status": trimmedHealth,
		"health_detail": map[string]any{"message": trimmedMessage},
	}
	if err := doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, nil); err != nil {
		return err
	}
	// Keep the caller's view in sync so later events dedupe against it.
	instance.RuntimeState = next
	return nil
}

// reportInstanceBootstrapEvent records a bootstrap progress event in the
// instance's runtime state and reports it with status "deploying".
//
// Events are only reported while the instance status is blank, "requested" or
// "deploying"; a nil instance or any other status is a no-op.
//
// If bootstrapEventAlreadyReported considers the event a duplicate, nothing is
// sent unless the event is healthy and the state still carries a stale adapter
// error, in which case the error fields are cleared and the state re-reported.
// Otherwise the event is appended to "bootstrap_activity" (capped at the most
// recent 20 entries), the "last_bootstrap_*" fields are refreshed, and the
// adapter error fields are set for a "degraded" event or cleared for a
// "healthy" one. After a successful POST the instance's in-memory RuntimeState
// is replaced with the reported state.
func reportInstanceBootstrapEvent(ctx context.Context, client *http.Client, cfg config, instance *appInstance, phase, healthStatus, message string, detail map[string]any) error {
	if instance == nil {
		return nil
	}
	if current := strings.TrimSpace(instance.Status); current != "" && current != "requested" && current != "deploying" {
		return nil
	}
	const status = "deploying"

	trimmedPhase := strings.TrimSpace(phase)
	trimmedHealth := strings.TrimSpace(healthStatus)
	trimmedMessage := strings.TrimSpace(message)
	isHealthy := strings.EqualFold(trimmedHealth, "healthy")

	observedAt := time.Now().UTC().Format(time.RFC3339)
	state := cloneMap(instance.RuntimeState)
	history := bootstrapActivityItems(state["bootstrap_activity"])

	event := map[string]any{
		"phase":       trimmedPhase,
		"message":     trimmedMessage,
		"observed_at": observedAt,
	}
	if trimmedHealth != "" {
		event["health_status"] = trimmedHealth
	}
	if len(detail) > 0 {
		event["detail"] = detail
	}

	// publish sends the current contents of state and, on success, adopts it
	// as the instance's in-memory runtime state.
	publish := func() error {
		endpoint := fmt.Sprintf("%s/api/v1/projects/%s/app-instances/%s/report", cfg.APIBaseURL, cfg.ProjectID, cfg.AppInstanceID)
		payload := map[string]any{
			"status":        status,
			"runtime_state": state,
			"phase":         trimmedPhase,
			"health_status": trimmedHealth,
			"health_detail": map[string]any{"message": trimmedMessage},
		}
		if err := doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, nil); err != nil {
			return err
		}
		instance.RuntimeState = state
		return nil
	}

	if bootstrapEventAlreadyReported(state, event) {
		if !isHealthy || !instanceHasStaleAdapterError(&appInstance{RuntimeState: state}) {
			return nil
		}
		delete(state, "last_adapter_error")
		delete(state, "bootstrap_log_excerpt")
		return publish()
	}

	history = append(history, event)
	if overflow := len(history) - 20; overflow > 0 {
		history = history[overflow:]
	}
	state["bootstrap_activity"] = history
	state["last_bootstrap_phase"] = trimmedPhase
	state["last_bootstrap_message"] = trimmedMessage
	state["last_bootstrap_at"] = observedAt
	if trimmedHealth != "" {
		state["health_status"] = trimmedHealth
	}
	switch {
	case strings.EqualFold(trimmedHealth, "degraded"):
		state["last_adapter_error"] = trimmedMessage
		// The excerpt is taken from the message as passed in, not the trimmed copy.
		state["bootstrap_log_excerpt"] = bootstrapLogExcerpt(message, 80)
	case isHealthy:
		delete(state, "last_adapter_error")
		delete(state, "bootstrap_log_excerpt")
	}
	return publish()
}

// bootstrapActivityItems converts a runtime-state "bootstrap_activity" value
// into a slice of cloned event maps.
//
// Two shapes are accepted: []any, as produced by decoding JSON into
// map[string]any (elements that are not maps are skipped), and
// []map[string]any, which is what this program itself stores back into the
// in-memory runtime state after reporting an event. Handling only the decoded
// shape would silently discard the in-memory history on the next call. Any
// other value, including nil, yields an empty non-nil slice.
func bootstrapActivityItems(raw any) []map[string]any {
	switch items := raw.(type) {
	case []map[string]any:
		out := make([]map[string]any, 0, len(items))
		for _, item := range items {
			out = append(out, cloneMap(item))
		}
		return out
	case []any:
		out := make([]map[string]any, 0, len(items))
		for _, item := range items {
			m, ok := item.(map[string]any)
			if !ok {
				continue
			}
			out = append(out, cloneMap(m))
		}
		return out
	default:
		return []map[string]any{}
	}
}

// appendLifecycleActivity returns the state's existing "lifecycle_activity"
// entries followed by a new event built from the arguments, keeping only the
// most recent 20 entries. state itself is not modified.
func appendLifecycleActivity(state map[string]any, operation, phase, healthStatus, message string, detail map[string]any, observedAt string) []map[string]any {
	const maxEntries = 20
	history := append(
		lifecycleActivityItems(state["lifecycle_activity"]),
		lifecycleActivityEvent(operation, phase, healthStatus, message, detail, observedAt),
	)
	if overflow := len(history) - maxEntries; overflow > 0 {
		history = history[overflow:]
	}
	return history
}

// lifecycleActivityEvent builds the activity-log entry for a lifecycle event.
//
// operation, phase and message are whitespace-trimmed; observedAt is stored
// as given. "health_status" is only present when non-blank, and "detail" only
// when the detail map has at least one entry.
func lifecycleActivityEvent(operation, phase, healthStatus, message string, detail map[string]any, observedAt string) map[string]any {
	entry := make(map[string]any, 6)
	entry["operation"] = strings.TrimSpace(operation)
	entry["phase"] = strings.TrimSpace(phase)
	entry["message"] = strings.TrimSpace(message)
	entry["observed_at"] = observedAt
	if health := strings.TrimSpace(healthStatus); health != "" {
		entry["health_status"] = health
	}
	if len(detail) != 0 {
		entry["detail"] = detail
	}
	return entry
}

// lifecycleActivityItems converts a runtime-state "lifecycle_activity" value
// into a slice of cloned event maps.
//
// Two shapes are accepted: []any, as produced by decoding JSON into
// map[string]any (elements that are not maps are skipped), and
// []map[string]any, which is what this program itself stores back into the
// in-memory runtime state after reporting an event. Handling only the decoded
// shape would silently discard the in-memory history on the next call. Any
// other value, including nil, yields an empty non-nil slice.
func lifecycleActivityItems(raw any) []map[string]any {
	switch items := raw.(type) {
	case []map[string]any:
		out := make([]map[string]any, 0, len(items))
		for _, item := range items {
			out = append(out, cloneMap(item))
		}
		return out
	case []any:
		out := make([]map[string]any, 0, len(items))
		for _, item := range items {
			m, ok := item.(map[string]any)
			if !ok {
				continue
			}
			out = append(out, cloneMap(m))
		}
		return out
	default:
		return []map[string]any{}
	}
}

// reportSharedRuntimeStatus POSTs the shared runtime's status, optional
// failure reason and runtime state to its report endpoint.
func reportSharedRuntimeStatus(ctx context.Context, client *http.Client, cfg config, status string, failureReason *string, runtimeState map[string]any) error {
	payload := make(map[string]any, 3)
	payload["status"] = status
	payload["failure_reason"] = failureReason
	payload["runtime_state"] = runtimeState

	endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/shared-app-runtimes/%s/report", cfg.APIBaseURL, cfg.OrgID, cfg.SharedRuntimeID)
	return doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, nil)
}

// bootstrapEventAlreadyReported reports whether event duplicates the most
// recently recorded bootstrap event in state.
//
// The state's "last_bootstrap_phase"/"last_bootstrap_message" (and
// "health_status", when the event carries one) must match the event. If the
// state has no usable "bootstrap_activity" entries that is sufficient;
// otherwise the latest entry must also match on phase, message, health status
// (when the event carries one) and on the presence and deep equality of
// "detail". A nil state or event is never a duplicate.
func bootstrapEventAlreadyReported(state map[string]any, event map[string]any) bool {
	if state == nil || event == nil {
		return false
	}

	wantPhase := stringAny(event["phase"])
	wantMessage := stringAny(event["message"])
	wantHealth := stringAny(event["health_status"])

	// matches compares one recorded snapshot against the event's fields.
	matches := func(phase, message, health any) bool {
		if stringAny(phase) != wantPhase || stringAny(message) != wantMessage {
			return false
		}
		return wantHealth == "" || stringAny(health) == wantHealth
	}

	if !matches(state["last_bootstrap_phase"], state["last_bootstrap_message"], state["health_status"]) {
		return false
	}

	history := bootstrapActivityItems(state["bootstrap_activity"])
	if len(history) == 0 {
		return true
	}
	latest := history[len(history)-1]
	if !matches(latest["phase"], latest["message"], latest["health_status"]) {
		return false
	}

	eventDetail, eventHasDetail := event["detail"]
	latestDetail, latestHasDetail := latest["detail"]
	if eventHasDetail != latestHasDetail {
		return false
	}
	return !eventHasDetail || reflect.DeepEqual(latestDetail, eventDetail)
}

// lifecycleEventAlreadyReported reports whether event duplicates the most
// recently recorded lifecycle event in state.
//
// The state's "last_lifecycle_operation", "last_lifecycle_phase" and
// "last_lifecycle_message" (and "lifecycle_health_status", when the event
// carries a health status) must match the event. The operation is part of
// this summary check so that, when no activity entries are available, two
// different operations that share a phase and message are not collapsed into
// one. If the state has no usable "lifecycle_activity" entries the summary
// match is sufficient; otherwise the latest entry must also match on
// operation, phase, message, health status (when the event carries one) and
// on the presence and deep equality of "detail". A nil state or event is
// never a duplicate.
func lifecycleEventAlreadyReported(state map[string]any, event map[string]any) bool {
	if state == nil || event == nil {
		return false
	}
	if stringAny(state["last_lifecycle_operation"]) != stringAny(event["operation"]) ||
		stringAny(state["last_lifecycle_phase"]) != stringAny(event["phase"]) ||
		stringAny(state["last_lifecycle_message"]) != stringAny(event["message"]) {
		return false
	}
	if stringAny(event["health_status"]) != "" && stringAny(state["lifecycle_health_status"]) != stringAny(event["health_status"]) {
		return false
	}
	activity := lifecycleActivityItems(state["lifecycle_activity"])
	if len(activity) == 0 {
		return true
	}
	last := activity[len(activity)-1]
	if stringAny(last["operation"]) != stringAny(event["operation"]) ||
		stringAny(last["phase"]) != stringAny(event["phase"]) ||
		stringAny(last["message"]) != stringAny(event["message"]) {
		return false
	}
	if stringAny(event["health_status"]) != "" && stringAny(last["health_status"]) != stringAny(event["health_status"]) {
		return false
	}
	_, eventHasDetail := event["detail"]
	lastDetail, lastHasDetail := last["detail"]
	if eventHasDetail != lastHasDetail {
		return false
	}
	if eventHasDetail && !reflect.DeepEqual(lastDetail, event["detail"]) {
		return false
	}
	return true
}

// listSharedWorkers fetches the shared runtime's workers (a single request
// with page_size=100) and returns the decoded items.
func listSharedWorkers(ctx context.Context, client *http.Client, cfg config) ([]sharedWorker, error) {
	endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/shared-app-runtimes/%s/workers?page_size=100", cfg.APIBaseURL, cfg.OrgID, cfg.SharedRuntimeID)
	var page sharedWorkerListResponse
	err := doJSON(ctx, client, cfg, http.MethodGet, endpoint, nil, &page)
	if err != nil {
		return nil, err
	}
	return page.Items, nil
}

// listSharedWorkerOperations fetches the shared runtime's worker operations
// (a single request with page_size=100) and returns those this controller
// acts on.
//
// A non-blank status is appended to the query as a status filter. Regardless
// of that filter, only operations whose action is "add", "drain" or "remove"
// and whose status is "accepted" or "in_progress" are returned.
func listSharedWorkerOperations(ctx context.Context, client *http.Client, cfg config, status string) ([]sharedWorkerOperation, error) {
	endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/shared-app-runtimes/%s/worker-operations?page_size=100", cfg.APIBaseURL, cfg.OrgID, cfg.SharedRuntimeID)
	if filter := strings.TrimSpace(status); filter != "" {
		endpoint += "&status=" + filter
	}

	var page sharedWorkerOperationListResponse
	if err := doJSON(ctx, client, cfg, http.MethodGet, endpoint, nil, &page); err != nil {
		return nil, err
	}

	selected := make([]sharedWorkerOperation, 0, len(page.Items))
	for _, candidate := range page.Items {
		supportedAction := candidate.Action == "add" || candidate.Action == "drain" || candidate.Action == "remove"
		pending := candidate.Status == "accepted" || candidate.Status == "in_progress"
		if supportedAction && pending {
			selected = append(selected, candidate)
		}
	}
	return selected, nil
}

// reportSharedWorkerOperation POSTs the status of a shared worker operation
// to its report endpoint. The error message is included only when lastErr is
// non-nil, and the worker results only when workers is non-empty.
func reportSharedWorkerOperation(ctx context.Context, client *http.Client, cfg config, item sharedWorkerOperation, status string, lastErr error, workers []map[string]any) error {
	payload := map[string]any{"status": status}
	if lastErr != nil {
		payload["last_error_message"] = lastErr.Error()
	}
	if len(workers) != 0 {
		payload["workers"] = workers
	}
	endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/shared-app-runtimes/%s/worker-operations/%s/report", cfg.APIBaseURL, cfg.OrgID, cfg.SharedRuntimeID, item.ID)
	return doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, nil)
}

// reportSharedWorker POSTs a shared worker's status, bound node ID and
// runtime state to the worker's report endpoint.
func reportSharedWorker(ctx context.Context, client *http.Client, cfg config, workerID string, status string, boundNodeID *string, runtimeState map[string]any) error {
	payload := make(map[string]any, 3)
	payload["status"] = status
	payload["bound_node_id"] = boundNodeID
	payload["runtime_state"] = runtimeState

	endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/shared-app-runtimes/%s/workers/%s/report", cfg.APIBaseURL, cfg.OrgID, cfg.SharedRuntimeID, workerID)
	return doJSON(ctx, client, cfg, http.MethodPost, endpoint, payload, nil)
}

// reconcileSharedWorkerOperations processes the shared runtime's pending
// worker operations in the order returned by the API.
//
// It first indexes the current workers by allocation ID, ignoring workers
// with a blank allocation ID or status "removed". That index is shared with
// every operation handler. Processing stops at the first operation that
// returns an error.
func reconcileSharedWorkerOperations(ctx context.Context, log *slog.Logger, client *http.Client, cfg config) error {
	workers, err := listSharedWorkers(ctx, client, cfg)
	if err != nil {
		return err
	}

	active := make(map[string]sharedWorker, len(workers))
	for _, worker := range workers {
		hasAllocation := strings.TrimSpace(worker.AllocationID) != ""
		if hasAllocation && worker.Status != "removed" {
			active[worker.AllocationID] = worker
		}
	}

	pending, err := listSharedWorkerOperations(ctx, client, cfg, "")
	if err != nil {
		return err
	}
	for i := range pending {
		if opErr := reconcileSharedWorkerOperation(ctx, log, client, cfg, active, pending[i]); opErr != nil {
			return opErr
		}
	}
	return nil
}

// sharedWorkerFromOperation returns the first known worker whose allocation
// ID appears (after trimming) in the operation's requested allocation IDs.
// Blank IDs are skipped. An error listing the requested IDs is returned when
// none of them maps to a known worker.
func sharedWorkerFromOperation(existingByAllocation map[string]sharedWorker, op sharedWorkerOperation) (sharedWorker, error) {
	for _, requested := range op.RequestedAllocationIDs {
		id := strings.TrimSpace(requested)
		if id == "" {
			continue
		}
		worker, known := existingByAllocation[id]
		if known {
			return worker, nil
		}
	}
	return sharedWorker{}, fmt.Errorf("no shared worker found for requested allocations %v", op.RequestedAllocationIDs)
}

// sharedWorkerSSHTarget derives the SSH target and node name for a shared
// worker from its runtime_state.
//
// The host is the first non-blank value among the runtime_state entries
// "host", "hostname" and "worker_hostname"; the node name prefers "hostname",
// then "worker_hostname", then the host. The returned host is the result of
// sshEndpoint applied to the host and the "port" entry. An error is returned
// when either the host or username_on_node is missing.
func sharedWorkerSSHTarget(item sharedWorker) (host string, user string, nodeName string, err error) {
	state := cloneMap(item.RuntimeState)

	// trimmedField reads a string entry from the runtime state; non-string or
	// missing entries yield "".
	trimmedField := func(key string) string {
		text, _ := state[key].(string)
		return strings.TrimSpace(text)
	}
	// firstSet returns the first non-empty candidate, or "" when none is set.
	firstSet := func(candidates ...string) string {
		for _, candidate := range candidates {
			if candidate != "" {
				return candidate
			}
		}
		return ""
	}

	hostname := trimmedField("hostname")
	workerHostname := trimmedField("worker_hostname")
	port := intAny(state["port"])

	host = firstSet(trimmedField("host"), hostname, workerHostname)
	user = trimmedField("username_on_node")
	nodeName = firstSet(hostname, workerHostname, host)

	if host == "" || user == "" {
		return "", "", "", errors.New("shared worker missing host or username_on_node in runtime_state")
	}
	return sshEndpoint(host, port), user, nodeName, nil
}

// reconcileSharedWorkerOperation dispatches a shared worker operation to the
// handler for its action ("add", "drain" or "remove"). Operations with any
// other action are ignored and return nil.
func reconcileSharedWorkerOperation(ctx context.Context, log *slog.Logger, client *http.Client, cfg config, existingByAllocation map[string]sharedWorker, op sharedWorkerOperation) error {
	if op.Action == "add" {
		return reconcileSharedWorkerAddOperation(ctx, log, client, cfg, existingByAllocation, op)
	}
	if op.Action == "drain" {
		return reconcileSharedWorkerDrainOperation(ctx, log, client, cfg, existingByAllocation, op)
	}
	if op.Action == "remove" {
		return reconcileSharedWorkerRemoveOperation(ctx, log, client, cfg, existingByAllocation, op)
	}
	return nil
}

// reconcileSharedWorkerAddOperation handles an "add" shared worker operation
// by bootstrapping a worker for each requested allocation.
//
// An "accepted" operation is first reported as "in_progress". Each non-blank
// requested allocation ID is then processed in order: allocations that already
// have a worker in existingByAllocation with a status other than "failed" are
// echoed back unchanged; all others are looked up, bootstrapped over SSH with
// cfg.BootstrapScriptPath and cfg.SSHPrivateKeyPath, and recorded as "ready"
// both in the results and in existingByAllocation.
//
// The first lookup, SSH-target or bootstrap failure is reported against the
// operation as "failed" (best effort; the report's own error is discarded) and
// returned, without reporting results gathered so far. When at least one
// result was collected the operation is reported as "succeeded" with those
// results; with no results nothing further is reported.
func reconcileSharedWorkerAddOperation(ctx context.Context, log *slog.Logger, client *http.Client, cfg config, existingByAllocation map[string]sharedWorker, op sharedWorkerOperation) error {
	if op.Status == "accepted" {
		if err := reportSharedWorkerOperation(ctx, client, cfg, op, "in_progress", nil, nil); err != nil {
			return err
		}
	}
	results := make([]map[string]any, 0, len(op.RequestedAllocationIDs))
	for _, allocationID := range op.RequestedAllocationIDs {
		allocationID = strings.TrimSpace(allocationID)
		if allocationID == "" {
			continue
		}
		// A worker that exists and has not failed is not bootstrapped again;
		// its current state is reported as the result for this allocation.
		if current, ok := existingByAllocation[allocationID]; ok && current.Status != "failed" {
			results = append(results, map[string]any{
				"allocation_id": allocationID,
				"bound_node_id": current.BoundNodeID,
				"status":        current.Status,
				"runtime_state": cloneMap(current.RuntimeState),
			})
			continue
		}
		target, err := getOrgAllocation(ctx, client, cfg, allocationID)
		if err != nil {
			// Best-effort failure report; the original error is what the caller sees.
			_ = reportSharedWorkerOperation(ctx, client, cfg, op, "failed", err, nil)
			return err
		}
		host, user, err := allocationSSHTarget(target)
		if err != nil {
			// Best-effort failure report; the original error is what the caller sees.
			_ = reportSharedWorkerOperation(ctx, client, cfg, op, "failed", err, nil)
			return err
		}
		log.Info("bootstrapping shared runtime worker", "shared_runtime_id", cfg.SharedRuntimeID, "operation_id", op.ID, "allocation_id", allocationID, "host", host, "user", user)
		if err := runBootstrapFunc(ctx, cfg.BootstrapScriptPath, cfg.SSHPrivateKeyPath, user, host); err != nil {
			// Best-effort failure report; the original error is what the caller sees.
			_ = reportSharedWorkerOperation(ctx, client, cfg, op, "failed", err, nil)
			return err
		}
		// The runtime state below is filled with fixed values after a
		// successful bootstrap; it is not read back from the worker.
		runtimeState := map[string]any{
			"role":                     "shared_worker",
			"phase":                    "slurm_ready",
			"allocation_id":            allocationID,
			"hostname":                 host,
			"cluster_name":             "gpuaas-local",
			"partition_name":           "compute",
			"slurmd_state":             "idle",
			"last_runtime_observed_at": time.Now().UTC().Format(time.RFC3339),
		}
		results = append(results, map[string]any{
			"allocation_id": allocationID,
			"bound_node_id": target.NodeID,
			"status":        "ready",
			"runtime_state": runtimeState,
		})
		// Record the new worker in the shared index so later operations in
		// the same reconcile pass can see it. The entry carries no worker ID.
		existingByAllocation[allocationID] = sharedWorker{
			AllocationID: allocationID,
			BoundNodeID:  target.NodeID,
			Status:       "ready",
			RuntimeState: runtimeState,
		}
	}
	// With nothing to report (no non-blank allocation IDs) the operation is
	// left in its current status.
	if len(results) == 0 {
		return nil
	}
	return reportSharedWorkerOperation(ctx, client, cfg, op, "succeeded", nil, results)
}

// reconcileSharedWorkerDrainOperation processes a drain operation for a single
// shared runtime worker.
//
// The worker is resolved from op via sharedWorkerFromOperation. An operation
// whose status is "accepted" is first reported as "in_progress". The function
// then issues `scontrol update ... State=DRAIN` for the worker's node through
// runRemoteSudoCommand, checks the node state with verifyNodeStateContains,
// reports the worker as "draining", stores the updated worker back into
// existingByAllocation, and finally reports the operation as "succeeded".
//
// Errors from resolving the worker, deriving the SSH target, running the
// command, or verifying the node state are reported on the operation as
// "failed" (the result of that report is ignored) and then returned. Errors
// from the "in_progress" report and from reportSharedWorker are returned
// without a "failed" report.
func reconcileSharedWorkerDrainOperation(ctx context.Context, log *slog.Logger, client *http.Client, cfg config, existingByAllocation map[string]sharedWorker, op sharedWorkerOperation) error {
	// fail marks the operation as failed on a best-effort basis and hands the
	// triggering error back to the caller unchanged.
	fail := func(cause error) error {
		_ = reportSharedWorkerOperation(ctx, client, cfg, op, "failed", cause, nil)
		return cause
	}

	worker, err := sharedWorkerFromOperation(existingByAllocation, op)
	if err != nil {
		return fail(err)
	}
	if op.Status == "accepted" {
		if err := reportSharedWorkerOperation(ctx, client, cfg, op, "in_progress", nil, nil); err != nil {
			return err
		}
	}

	host, user, nodeName, err := sharedWorkerSSHTarget(worker)
	if err != nil {
		return fail(err)
	}
	log.Info("draining shared runtime worker", "shared_runtime_id", cfg.SharedRuntimeID, "operation_id", op.ID, "worker_id", worker.ID, "allocation_id", worker.AllocationID, "host", host, "user", user, "node_name", nodeName)

	drainCmd := fmt.Sprintf("scontrol update NodeName=%s State=DRAIN Reason='gpuaas-shared-worker-drain'", shellQuote(nodeName))
	if err := runRemoteSudoCommand(ctx, cfg.SSHPrivateKeyPath, user, host, drainCmd); err != nil {
		return fail(err)
	}
	if err := verifyNodeStateContains(ctx, cfg.SSHPrivateKeyPath, user, host, nodeName, "DRAIN"); err != nil {
		return fail(err)
	}

	// Work on a copy so the previously stored runtime state is only replaced
	// once the worker report has gone through.
	state := cloneMap(worker.RuntimeState)
	state["phase"] = "slurmd_drained"
	state["slurmd_state"] = "drained"
	state["registered_in_controller"] = true
	state["last_runtime_observed_at"] = time.Now().UTC().Format(time.RFC3339)
	if err := reportSharedWorker(ctx, client, cfg, worker.ID, "draining", worker.BoundNodeID, state); err != nil {
		return err
	}

	worker.Status = "draining"
	worker.RuntimeState = state
	existingByAllocation[worker.AllocationID] = worker

	outcome := map[string]any{
		"allocation_id": worker.AllocationID,
		"bound_node_id": worker.BoundNodeID,
		"status":        "draining",
		"runtime_state": state,
	}
	return reportSharedWorkerOperation(ctx, client, cfg, op, "succeeded", nil, []map[string]any{outcome})
}

// reconcileSharedWorkerRemoveOperation processes a remove operation for a
// single shared runtime worker.
//
// The worker is resolved from op via sharedWorkerFromOperation. An operation
// whose status is "accepted" is first reported as "in_progress". The function
// then disables slurmd and marks the node DOWN through runRemoteSudoCommand,
// verifies both outcomes, reports the worker as "removed", stores the updated
// worker back into existingByAllocation, and finally reports the operation as
// "succeeded".
//
// Errors from resolving the worker, deriving the SSH target, running the
// command, or either verification are reported on the operation as "failed"
// (the result of that report is ignored) and then returned. Errors from the
// "in_progress" report and from reportSharedWorker are returned without a
// "failed" report.
func reconcileSharedWorkerRemoveOperation(ctx context.Context, log *slog.Logger, client *http.Client, cfg config, existingByAllocation map[string]sharedWorker, op sharedWorkerOperation) error {
	// fail marks the operation as failed on a best-effort basis and hands the
	// triggering error back to the caller unchanged.
	fail := func(cause error) error {
		_ = reportSharedWorkerOperation(ctx, client, cfg, op, "failed", cause, nil)
		return cause
	}

	worker, err := sharedWorkerFromOperation(existingByAllocation, op)
	if err != nil {
		return fail(err)
	}
	if op.Status == "accepted" {
		if err := reportSharedWorkerOperation(ctx, client, cfg, op, "in_progress", nil, nil); err != nil {
			return err
		}
	}

	host, user, nodeName, err := sharedWorkerSSHTarget(worker)
	if err != nil {
		return fail(err)
	}
	log.Info("removing shared runtime worker", "shared_runtime_id", cfg.SharedRuntimeID, "operation_id", op.ID, "worker_id", worker.ID, "allocation_id", worker.AllocationID, "host", host, "user", user, "node_name", nodeName)

	// NOTE(review): assuming the command string is interpreted by a shell, the
	// trailing `|| true` makes the whole line exit successfully even if either
	// step fails, so the two verification calls below are what detect failure.
	removeCmd := fmt.Sprintf("systemctl disable --now slurmd && scontrol update NodeName=%s State=DOWN Reason='gpuaas-shared-worker-removed' || true", shellQuote(nodeName))
	if err := runRemoteSudoCommand(ctx, cfg.SSHPrivateKeyPath, user, host, removeCmd); err != nil {
		return fail(err)
	}
	if err := verifySystemdInactive(ctx, cfg.SSHPrivateKeyPath, user, host, "slurmd"); err != nil {
		return fail(err)
	}
	if err := verifyNodeStateContains(ctx, cfg.SSHPrivateKeyPath, user, host, nodeName, "DOWN"); err != nil {
		return fail(err)
	}

	// Work on a copy so the previously stored runtime state is only replaced
	// once the worker report has gone through.
	state := cloneMap(worker.RuntimeState)
	state["phase"] = "slurmd_removed"
	state["slurmd_state"] = "stopped"
	state["registered_in_controller"] = false
	state["last_runtime_observed_at"] = time.Now().UTC().Format(time.RFC3339)
	if err := reportSharedWorker(ctx, client, cfg, worker.ID, "removed", worker.BoundNodeID, state); err != nil {
		return err
	}

	worker.Status = "removed"
	worker.RuntimeState = state
	existingByAllocation[worker.AllocationID] = worker

	outcome := map[string]any{
		"allocation_id": worker.AllocationID,
		"bound_node_id": worker.BoundNodeID,
		"status":        "removed",
		"runtime_state": state,
	}
	return reportSharedWorkerOperation(ctx, client, cfg, op, "succeeded", nil, []map[string]any{outcome})
}

// mintServiceAccountToken requests a service-account access token and returns
// only the token, discarding the expiry reported by
// mintServiceAccountTokenWithExpiry.
func mintServiceAccountToken(ctx context.Context, client *http.Client, cfg config) (string, error) {
	accessToken, _, mintErr := mintServiceAccountTokenWithExpiry(ctx, client, cfg)
	return accessToken, mintErr
}

// mintServiceAccountTokenWithExpiry POSTs the service-account credentials from
// cfg to the service-account token endpoint under cfg.APIBaseURL.
//
// It returns the access token together with the ExpiresInSeconds value from
// the response. A request failure is returned unchanged; a response whose
// access token is blank is reported as an error.
func mintServiceAccountTokenWithExpiry(ctx context.Context, client *http.Client, cfg config) (string, int, error) {
	var issued controllerToken
	endpoint := fmt.Sprintf("%s/api/v1/auth/service-account/token", cfg.APIBaseURL)
	credentials := map[string]any{
		"service_account_id": cfg.ServiceAccountID,
		"key_id":             cfg.ServiceAccountKeyID,
		"client_secret":      cfg.ServiceAccountSecret,
	}

	err := doJSON(ctx, client, cfg, http.MethodPost, endpoint, credentials, &issued)
	switch {
	case err != nil:
		return "", 0, err
	case strings.TrimSpace(issued.AccessToken) == "":
		return "", 0, errors.New("service-account token response missing access_token")
	}
	return issued.AccessToken, issued.ExpiresInSeconds, nil
}

// mintSharedRuntimeOperatorToken requests a shared-runtime-operator access
// token and returns only the token, discarding the expiry reported by
// mintSharedRuntimeOperatorTokenWithExpiry.
func mintSharedRuntimeOperatorToken(ctx context.Context, client *http.Client, cfg config) (string, error) {
	accessToken, _, mintErr := mintSharedRuntimeOperatorTokenWithExpiry(ctx, client, cfg)
	return accessToken, mintErr
}

// mintSharedRuntimeOperatorTokenWithExpiry POSTs the shared-runtime credentials
// from cfg to the shared-runtime-operator token endpoint under cfg.APIBaseURL.
//
// It returns the access token together with the ExpiresInSeconds value from
// the response. A request failure is returned unchanged; a response whose
// access token is blank is reported as an error.
func mintSharedRuntimeOperatorTokenWithExpiry(ctx context.Context, client *http.Client, cfg config) (string, int, error) {
	var issued controllerToken
	endpoint := fmt.Sprintf("%s/api/v1/auth/shared-runtime-operator/token", cfg.APIBaseURL)
	credentials := map[string]any{
		"shared_runtime_id": cfg.SharedRuntimeID,
		"key_id":            cfg.SharedRuntimeKeyID,
		"client_secret":     cfg.SharedRuntimeSecret,
	}

	err := doJSON(ctx, client, cfg, http.MethodPost, endpoint, credentials, &issued)
	switch {
	case err != nil:
		return "", 0, err
	case strings.TrimSpace(issued.AccessToken) == "":
		return "", 0, errors.New("shared-runtime-operator token response missing access_token")
	}
	return issued.AccessToken, issued.ExpiresInSeconds, nil
}

// doJSON sends a single HTTP request and optionally decodes a JSON response.
//
// When body is non-nil it is marshalled to JSON and sent with a
// "Content-Type: application/json" header. The request additionally carries:
//   - the Host override from cfg.HTTPHostOverride, when set;
//   - an Authorization bearer header, when cfg.BearerToken is set;
//   - an X-Project-ID header, when cfg.ProjectID is not blank;
//   - a fresh X-Correlation-ID derived from the current time;
//   - a fresh Idempotency-Key for POST, PUT, PATCH and DELETE.
//
// The response body is read in full. Any status outside 200-299 yields an
// error that includes the method, URL, status code and trimmed body. When out
// is non-nil and the body is non-empty, the body is unmarshalled into out.
func doJSON(ctx context.Context, client *http.Client, cfg config, method, url string, body any, out any) error {
	// reqBody must stay a nil interface (not a typed nil) when there is no body.
	var reqBody io.Reader
	hasBody := body != nil
	if hasBody {
		encoded, err := json.Marshal(body)
		if err != nil {
			return err
		}
		reqBody = bytes.NewReader(encoded)
	}

	req, err := http.NewRequestWithContext(ctx, method, url, reqBody)
	if err != nil {
		return err
	}
	if cfg.HTTPHostOverride != "" {
		req.Host = cfg.HTTPHostOverride
	}

	headers := req.Header
	if cfg.BearerToken != "" {
		headers.Set("Authorization", "Bearer "+cfg.BearerToken)
	}
	if strings.TrimSpace(cfg.ProjectID) != "" {
		headers.Set("X-Project-ID", cfg.ProjectID)
	}
	headers.Set("X-Correlation-ID", fmt.Sprintf("slurm-reference-controller-%d", time.Now().UnixNano()))
	if hasBody {
		headers.Set("Content-Type", "application/json")
	}
	if method == http.MethodPost || method == http.MethodPut || method == http.MethodPatch || method == http.MethodDelete {
		headers.Set("Idempotency-Key", "slurm-reference-controller-"+uuid.NewString())
	}

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer func() { _ = resp.Body.Close() }()

	respBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if status := resp.StatusCode; status < 200 || status >= 300 {
		return fmt.Errorf("%s %s returned %d: %s", method, url, status, strings.TrimSpace(string(respBytes)))
	}
	if out == nil || len(respBytes) == 0 {
		return nil
	}
	return json.Unmarshal(respBytes, out)
}

// getenvDefault returns the environment variable named key with surrounding
// whitespace removed. When the variable is unset or blank, fallback is
// returned instead.
func getenvDefault(key, fallback string) string {
	if value := strings.TrimSpace(os.Getenv(key)); value != "" {
		return value
	}
	return fallback
}

// cloneMap returns a shallow copy of in. The result is always a non-nil map,
// including when in is nil or empty. Values are copied by assignment, so any
// nested maps or slices stay shared with the original.
func cloneMap(in map[string]any) map[string]any {
	copied := make(map[string]any, len(in))
	for key := range in {
		copied[key] = in[key]
	}
	return copied
}

// ptrString returns a pointer to a newly allocated copy of v.
func ptrString(v string) *string {
	p := new(string)
	*p = v
	return p
}

// stringValue returns the string v points to with surrounding whitespace
// removed, or "" when v is nil.
func stringValue(v *string) string {
	if v != nil {
		return strings.TrimSpace(*v)
	}
	return ""
}

// stringAny returns v with surrounding whitespace removed when v holds a
// string, and "" for every other dynamic type (including nil).
func stringAny(v any) string {
	if text, isString := v.(string); isString {
		return strings.TrimSpace(text)
	}
	return ""
}

// intAny converts a loosely typed value to int.
//
// Supported dynamic types are int, int32, int64, float64 (truncated toward
// zero), json.Number, and string (parsed with strconv.Atoi after trimming
// whitespace). Any other type, and any value that cannot be parsed, yields 0.
//
// A json.Number written in fractional or exponent form (for example "2.0" or
// "1e3") cannot be parsed by Number.Int64; such values are parsed as a float
// and truncated, so they convert the same way as the equivalent float64.
func intAny(v any) int {
	switch typed := v.(type) {
	case int:
		return typed
	case int32:
		return int(typed)
	case int64:
		return int(typed)
	case float64:
		return int(typed)
	case json.Number:
		n, err := typed.Int64()
		// Only fall back to float parsing for fractional/exponent notation so
		// that plain integer literals keep their previous behaviour exactly.
		if err != nil && strings.ContainsAny(typed.String(), ".eE") {
			if f, ferr := typed.Float64(); ferr == nil {
				return int(f)
			}
		}
		return int(n)
	case string:
		n, _ := strconv.Atoi(strings.TrimSpace(typed))
		return n
	default:
		return 0
	}
}

// sshEndpoint builds the SSH endpoint string for host and port.
//
// An override returned by schedulerSSHOverride takes precedence. Otherwise the
// trimmed host is returned unchanged when it is empty, when port is
// non-positive or 22, or when host already parses as "host:port"; in every
// other case the port is appended with net.JoinHostPort.
func sshEndpoint(host string, port int) string {
	host = strings.TrimSpace(host)
	if mapped, found := schedulerSSHOverride(host, port); found {
		return mapped
	}

	needsPort := host != "" && port > 0 && port != 22
	if !needsPort {
		return host
	}
	if _, _, splitErr := net.SplitHostPort(host); splitErr != nil {
		// host carries no port of its own yet.
		return net.JoinHostPort(host, strconv.Itoa(port))
	}
	return host
}

// schedulerSSHOverrideTarget is the JSON object form of an SSH target override
// entry, as decoded by parseSchedulerSSHOverrideTarget.
type schedulerSSHOverrideTarget struct {
	// Host is the replacement host; a blank value makes the entry invalid.
	Host string `json:"host"`
	// Port is the replacement port; values <= 0 mean the host is used alone.
	Port int    `json:"port"`
}

// schedulerSSHOverride looks up a replacement SSH target for host and port in
// a JSON object taken from the environment.
//
// The object is read from the first non-blank of
// SLURM_REFERENCE_SSH_TARGET_OVERRIDE_MAP,
// APP_SCHEDULER_SSH_TARGET_OVERRIDE_MAP and
// SCHEDULER_SSH_TARGET_OVERRIDE_MAP. When port is positive the "host:port" key
// is tried before the bare host key. The first key whose entry parses via
// parseSchedulerSSHOverrideTarget wins.
//
// It returns ("", false) when host is blank, no map is configured, the map is
// not valid JSON, or no usable entry matches.
func schedulerSSHOverride(host string, port int) (string, bool) {
	host = strings.TrimSpace(host)
	if host == "" {
		return "", false
	}

	encoded := firstNonEmpty(
		os.Getenv("SLURM_REFERENCE_SSH_TARGET_OVERRIDE_MAP"),
		os.Getenv("APP_SCHEDULER_SSH_TARGET_OVERRIDE_MAP"),
		os.Getenv("SCHEDULER_SSH_TARGET_OVERRIDE_MAP"),
	)
	if strings.TrimSpace(encoded) == "" {
		return "", false
	}

	var overrides map[string]json.RawMessage
	if json.Unmarshal([]byte(encoded), &overrides) != nil {
		return "", false
	}

	// Most specific key first: "host:port", then plain host.
	candidates := make([]string, 0, 2)
	if port > 0 {
		candidates = append(candidates, net.JoinHostPort(host, strconv.Itoa(port)))
	}
	candidates = append(candidates, host)

	for _, candidate := range candidates {
		if entry, present := overrides[candidate]; present {
			if resolved, valid := parseSchedulerSSHOverrideTarget(entry); valid {
				return resolved, true
			}
		}
	}
	return "", false
}

// parseSchedulerSSHOverrideTarget decodes one override entry into an endpoint
// string.
//
// Two JSON shapes are accepted. A JSON string is used as the endpoint after
// trimming, and is valid only when non-blank. Otherwise the entry is decoded
// as a schedulerSSHOverrideTarget object: a blank host is invalid, a
// non-positive port yields the host alone, and a positive port yields
// "host:port" via net.JoinHostPort.
func parseSchedulerSSHOverrideTarget(raw json.RawMessage) (string, bool) {
	// String form: the value is the endpoint itself.
	var endpoint string
	if json.Unmarshal(raw, &endpoint) == nil {
		endpoint = strings.TrimSpace(endpoint)
		return endpoint, endpoint != ""
	}

	// Object form.
	var parsed schedulerSSHOverrideTarget
	if json.Unmarshal(raw, &parsed) != nil {
		return "", false
	}
	hostname := strings.TrimSpace(parsed.Host)
	switch {
	case hostname == "":
		return "", false
	case parsed.Port <= 0:
		return hostname, true
	default:
		return net.JoinHostPort(hostname, strconv.Itoa(parsed.Port)), true
	}
}

// firstNonEmpty returns the first argument that is not blank after trimming
// whitespace. The value is returned as given (untrimmed). It returns "" when
// every argument is blank or none are supplied.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if strings.TrimSpace(values[i]) == "" {
			continue
		}
		return values[i]
	}
	return ""
}

// baseSSHArgs assembles the common ssh command-line arguments for connecting
// to host as user with the identity file keyPath.
//
// The fixed options disable strict host key checking, discard known hosts, and
// restrict authentication to the supplied identity. When host has the form
// "host:port", the port is passed with -p and only the host part is used in
// the final "user@host" argument.
func baseSSHArgs(keyPath, user, host string) []string {
	destination := strings.TrimSpace(host)
	port := ""
	if h, p, err := net.SplitHostPort(destination); err == nil {
		destination, port = h, p
	}

	sshArgs := []string{
		"-o", "StrictHostKeyChecking=no",
		"-o", "UserKnownHostsFile=/dev/null",
		"-o", "IdentitiesOnly=yes",
		"-i", keyPath,
	}
	if port != "" {
		sshArgs = append(sshArgs, "-p", port)
	}
	return append(sshArgs, fmt.Sprintf("%s@%s", user, destination))
}

// bootstrapLogExcerpt returns the last maxLines lines of raw, with surrounding
// whitespace removed from both the input and the result. Blank input yields
// "". When maxLines is zero or negative, all lines are kept.
func bootstrapLogExcerpt(raw string, maxLines int) string {
	body := strings.TrimSpace(raw)
	if body == "" {
		return ""
	}
	lines := strings.Split(body, "\n")
	if drop := len(lines) - maxLines; maxLines > 0 && drop > 0 {
		lines = lines[drop:]
	}
	return strings.TrimSpace(strings.Join(lines, "\n"))
}

// sameNode reports whether members a and b are bound to the same node. They
// match when both carry the same non-blank bound node ID, or failing that the
// same non-blank bound node resource name. A nil member never matches.
func sameNode(a *member, b *member) bool {
	if a == nil || b == nil {
		return false
	}
	if id := stringValue(a.BoundNodeID); id != "" && id == stringValue(b.BoundNodeID) {
		return true
	}
	name := stringValue(a.BoundNodeResourceName)
	return name != "" && name == stringValue(b.BoundNodeResourceName)
}

// sameBoundNodeID reports whether item is bound to the node identified by
// boundNodeID. Both IDs are compared after trimming; a nil item or a blank ID
// on item never matches.
func sameBoundNodeID(item *member, boundNodeID *string) bool {
	if item == nil {
		return false
	}
	if current := stringValue(item.BoundNodeID); current != "" {
		return current == stringValue(boundNodeID)
	}
	return false
}

// shellQuote wraps v in single quotes for use as one shell word. Each embedded
// single quote is replaced by the sequence '"'"' (close quote, double-quoted
// single quote, reopen quote).
func shellQuote(v string) string {
	segments := strings.Split(v, "'")
	return "'" + strings.Join(segments, `'"'"'`) + "'"
}

// getProjectAllocation fetches the allocation allocationID of project
// cfg.ProjectID with a GET request through doJSON. It returns nil and the
// error when the request fails.
func getProjectAllocation(ctx context.Context, client *http.Client, cfg config, allocationID string) (*allocation, error) {
	endpoint := fmt.Sprintf("%s/api/v1/projects/%s/allocations/%s", cfg.APIBaseURL, cfg.ProjectID, allocationID)
	fetched := new(allocation)
	if err := doJSON(ctx, client, cfg, http.MethodGet, endpoint, nil, fetched); err != nil {
		return nil, err
	}
	return fetched, nil
}

// getOrgAllocation fetches the allocation allocationID of organisation
// cfg.OrgID with a GET request through doJSON. It returns nil and the error
// when the request fails.
func getOrgAllocation(ctx context.Context, client *http.Client, cfg config, allocationID string) (*allocation, error) {
	endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/allocations/%s", cfg.APIBaseURL, cfg.OrgID, allocationID)
	fetched := new(allocation)
	if err := doJSON(ctx, client, cfg, http.MethodGet, endpoint, nil, fetched); err != nil {
		return nil, err
	}
	return fetched, nil
}

// getSharedRuntime fetches the shared app runtime cfg.SharedRuntimeID of
// organisation cfg.OrgID with a GET request through doJSON. It returns nil and
// the error when the request fails.
func getSharedRuntime(ctx context.Context, client *http.Client, cfg config) (*sharedAppRuntime, error) {
	endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/shared-app-runtimes/%s", cfg.APIBaseURL, cfg.OrgID, cfg.SharedRuntimeID)
	fetched := new(sharedAppRuntime)
	if err := doJSON(ctx, client, cfg, http.MethodGet, endpoint, nil, fetched); err != nil {
		return nil, err
	}
	return fetched, nil
}

// sharedRuntimeControllerAllocationID returns the first non-blank entry of
// item.PlacementIntent["controller_allocation_ids"], trimmed.
//
// The value may be a []string or a []any; non-string elements of a []any are
// skipped. An error is returned when item is nil, when the key is absent, or
// when no non-blank ID is found (including when the value has another type).
func sharedRuntimeControllerAllocationID(item *sharedAppRuntime) (string, error) {
	if item == nil {
		return "", errors.New("shared runtime missing")
	}
	raw, present := item.PlacementIntent["controller_allocation_ids"]
	if !present {
		return "", errors.New("shared runtime placement_intent missing controller_allocation_ids")
	}

	// Normalise both accepted shapes into one list of candidate strings.
	var candidates []string
	switch typed := raw.(type) {
	case []string:
		candidates = typed
	case []any:
		for _, element := range typed {
			if text, isString := element.(string); isString {
				candidates = append(candidates, text)
			}
		}
	}

	for _, candidate := range candidates {
		if id := strings.TrimSpace(candidate); id != "" {
			return id, nil
		}
	}
	return "", errors.New("shared runtime controller allocation not configured")
}

// allocationSSHTarget derives the SSH endpoint and login user for an
// allocation.
//
// The host is taken from Connection.Host, falling back to Connection.Hostname
// when the former is nil or blank; the user comes from
// Connection.UsernameOnNode. All values are trimmed. The returned host is the
// result of sshEndpoint applied to the host and Connection.Port (0 when nil).
//
// An error is returned when item is nil or when either the host or the user is
// blank.
func allocationSSHTarget(item *allocation) (host string, user string, err error) {
	if item == nil {
		return "", "", errors.New("allocation missing")
	}

	// trimmed yields "" for a nil pointer and the trimmed string otherwise.
	trimmed := func(p *string) string {
		if p == nil {
			return ""
		}
		return strings.TrimSpace(*p)
	}

	conn := item.Connection
	host = trimmed(conn.Host)
	if host == "" {
		host = trimmed(conn.Hostname)
	}
	user = trimmed(conn.UsernameOnNode)
	if host == "" || user == "" {
		return "", "", errors.New("allocation missing connection.host/connection.hostname or connection.username_on_node")
	}

	port := 0
	if p := conn.Port; p != nil {
		port = *p
	}
	return sshEndpoint(host, port), user, nil
}

// allocationIntentFromAllocation summarises an allocation as an SSH host, a
// login user, an optional bound node ID, and a detail map.
//
// host and user come from allocationSSHTarget; its error is deliberately
// ignored, in which case both are "" and are recorded as such in detail.
// detail always contains "allocation_id", "host" and "username_on_node", and
// additionally "hostname", "port" and "bound_node_id" when the allocation
// provides a non-blank hostname, a positive port, and a non-blank node ID
// respectively. boundNodeID points at the trimmed node ID, or is nil when the
// allocation has none.
//
// A nil item yields empty strings, a nil boundNodeID and an empty, non-nil
// detail map instead of panicking.
func allocationIntentFromAllocation(item *allocation) (host string, user string, boundNodeID *string, detail map[string]any) {
	if item == nil {
		// allocationSSHTarget tolerates nil, but the field reads below would
		// dereference a nil pointer.
		return "", "", nil, map[string]any{}
	}
	// Best effort: an allocation without usable connection data still
	// produces a detail map, just with blank host/user.
	host, user, _ = allocationSSHTarget(item)
	detail = map[string]any{
		"allocation_id":    item.ID,
		"host":             host,
		"username_on_node": user,
	}
	if item.Connection.Hostname != nil && strings.TrimSpace(*item.Connection.Hostname) != "" {
		detail["hostname"] = strings.TrimSpace(*item.Connection.Hostname)
	}
	if item.Connection.Port != nil && *item.Connection.Port > 0 {
		detail["port"] = *item.Connection.Port
	}
	if item.NodeID != nil && strings.TrimSpace(*item.NodeID) != "" {
		id := strings.TrimSpace(*item.NodeID)
		boundNodeID = &id
		detail["bound_node_id"] = id
	}
	return host, user, boundNodeID, detail
}
