-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfig_flow.py
More file actions
263 lines (231 loc) · 10.4 KB
/
config_flow.py
File metadata and controls
263 lines (231 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
"""Config flow for the SSH Docker integration."""
from __future__ import annotations
import json
import logging
from typing import Any
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult, ConfigEntry
from homeassistant.const import CONF_HOST, CONF_USERNAME, CONF_PASSWORD, CONF_NAME
from homeassistant.core import HomeAssistant, callback
from .const import (
DOMAIN, CONF_SERVICE, CONF_KEY_FILE, CONF_CHECK_KNOWN_HOSTS, CONF_KNOWN_HOSTS,
CONF_DOCKER_COMMAND, CONF_AUTO_UPDATE, CONF_CHECK_FOR_UPDATES,
DEFAULT_DOCKER_COMMAND, DEFAULT_CHECK_KNOWN_HOSTS, DEFAULT_AUTO_UPDATE, DEFAULT_CHECK_FOR_UPDATES,
SSH_COMMAND_DOMAIN, SSH_COMMAND_SERVICE_EXECUTE,
SSH_CONF_OUTPUT, SSH_CONF_EXIT_STATUS,
DOCKER_SERVICES_EXECUTABLE, DEFAULT_TIMEOUT,
)
from .options_flow import SshDockerOptionsFlow, validate_and_build_options
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME): str,
vol.Required(CONF_SERVICE): str,
vol.Required(CONF_HOST): str,
vol.Required(CONF_USERNAME): str,
vol.Optional(CONF_PASSWORD): str,
vol.Optional(CONF_KEY_FILE): str,
vol.Optional(CONF_CHECK_KNOWN_HOSTS, default=DEFAULT_CHECK_KNOWN_HOSTS): bool,
vol.Optional(CONF_KNOWN_HOSTS): str,
vol.Optional(CONF_DOCKER_COMMAND, default=DEFAULT_DOCKER_COMMAND): str,
vol.Optional(CONF_CHECK_FOR_UPDATES, default=DEFAULT_CHECK_FOR_UPDATES): bool,
vol.Optional(CONF_AUTO_UPDATE, default=DEFAULT_AUTO_UPDATE): bool,
}
)
async def _check_service_exists(
hass: HomeAssistant, options: dict[str, Any], service: str
) -> str | None:
"""Check whether a service name is present on the remote host.
Runs the same discovery logic used by the integration: prefers the
``docker_services`` executable and falls back to ``docker ps -a``.
Returns ``None`` when the service is found *or* when the check cannot be
performed (SSH unreachable, empty output, etc.). Returns
``"service_not_found"`` only when a non-empty list was retrieved and the
requested name is absent from it.
"""
docker_cmd = options.get(CONF_DOCKER_COMMAND, DEFAULT_DOCKER_COMMAND)
discover_cmd = (
f"if command -v {DOCKER_SERVICES_EXECUTABLE} >/dev/null 2>&1; then"
f" {DOCKER_SERVICES_EXECUTABLE};"
f" elif test -f /usr/bin/{DOCKER_SERVICES_EXECUTABLE}; then"
f" /usr/bin/{DOCKER_SERVICES_EXECUTABLE};"
f" else {docker_cmd} ps -a --format '{{{{.Names}}}}'; fi"
)
service_data: dict[str, Any] = {
CONF_HOST: options.get(CONF_HOST, ""),
CONF_USERNAME: options.get(CONF_USERNAME, ""),
"check_known_hosts": options.get(CONF_CHECK_KNOWN_HOSTS, DEFAULT_CHECK_KNOWN_HOSTS),
"command": discover_cmd,
"timeout": DEFAULT_TIMEOUT,
}
if options.get(CONF_PASSWORD):
service_data[CONF_PASSWORD] = options[CONF_PASSWORD]
if options.get(CONF_KEY_FILE):
service_data["key_file"] = options[CONF_KEY_FILE]
if options.get(CONF_KNOWN_HOSTS):
service_data["known_hosts"] = options[CONF_KNOWN_HOSTS]
try:
response = await hass.services.async_call(
SSH_COMMAND_DOMAIN,
SSH_COMMAND_SERVICE_EXECUTE,
service_data,
blocking=True,
return_response=True,
)
except Exception: # pylint: disable=broad-except
_LOGGER.debug(
"Service existence check for %s on %s failed, skipping check",
service,
options.get(CONF_HOST, "<unknown>"),
)
return None
output = ((response or {}).get(SSH_CONF_OUTPUT, "") or "").strip()
exit_status = (response or {}).get(SSH_CONF_EXIT_STATUS, 1)
if exit_status != 0 or not output:
_LOGGER.debug(
"Service listing on %s returned no usable output, skipping existence check",
options.get(CONF_HOST, "<unknown>"),
)
return None
try:
parsed = json.loads(output)
service_names = [str(s) for s in parsed if s] if isinstance(parsed, list) else []
except (json.JSONDecodeError, ValueError):
service_names = [s for s in output.replace(",", " ").split() if s]
if not service_names:
_LOGGER.debug(
"Service listing on %s returned an empty list, skipping existence check",
options.get(CONF_HOST, "<unknown>"),
)
return None
if service in service_names:
_LOGGER.debug("Service %s confirmed on %s", service, options.get(CONF_HOST, "<unknown>"))
return None
_LOGGER.warning(
"Service %s not found on %s. Available services: %s",
service,
options.get(CONF_HOST, "<unknown>"),
service_names,
)
return "service_not_found"
def _build_user_schema(defaults: dict[str, Any]) -> vol.Schema:
"""Build the user-step schema, optionally pre-filled with *defaults*."""
return vol.Schema(
{
vol.Required(CONF_NAME, default=defaults.get(CONF_NAME, "")): str,
vol.Required(CONF_SERVICE, default=defaults.get(CONF_SERVICE, "")): str,
vol.Required(CONF_HOST, default=defaults.get(CONF_HOST, "")): str,
vol.Required(CONF_USERNAME, default=defaults.get(CONF_USERNAME, "")): str,
vol.Optional(CONF_PASSWORD, default=defaults.get(CONF_PASSWORD, "")): str,
vol.Optional(CONF_KEY_FILE, default=defaults.get(CONF_KEY_FILE, "")): str,
vol.Optional(
CONF_CHECK_KNOWN_HOSTS,
default=defaults.get(CONF_CHECK_KNOWN_HOSTS, DEFAULT_CHECK_KNOWN_HOSTS),
): bool,
vol.Optional(CONF_KNOWN_HOSTS, default=defaults.get(CONF_KNOWN_HOSTS, "")): str,
vol.Optional(
CONF_DOCKER_COMMAND,
default=defaults.get(CONF_DOCKER_COMMAND, DEFAULT_DOCKER_COMMAND),
): str,
vol.Optional(
CONF_CHECK_FOR_UPDATES,
default=defaults.get(CONF_CHECK_FOR_UPDATES, DEFAULT_CHECK_FOR_UPDATES),
): bool,
vol.Optional(
CONF_AUTO_UPDATE,
default=defaults.get(CONF_AUTO_UPDATE, DEFAULT_AUTO_UPDATE),
): bool,
}
)
class SshDockerConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for SSH Docker."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
discovery = getattr(self, "_discovery_info", {})
if user_input is not None:
name = user_input[CONF_NAME]
service = user_input[CONF_SERVICE]
_LOGGER.debug(
"Config flow user step: validating entry for service %s (name: %s) on %s",
service,
name,
user_input.get(CONF_HOST, "<unknown>"),
)
# name must be unique across all entries
existing_names = {
e.data[CONF_NAME]
for e in self.hass.config_entries.async_entries(DOMAIN)
}
if name in existing_names:
errors["base"] = "already_configured"
else:
# unique_id is based on host+service to prevent adding the same
# container on the same host twice. raise_on_progress=False lets
# the user manually add a service even when a discovery flow for
# the same host+service is already in progress, instead of
# aborting with an unhelpful "already_in_progress" error.
await self.async_set_unique_id(
f"{user_input[CONF_HOST]}_{service}", raise_on_progress=False
)
self._abort_if_unique_id_configured()
options, error_key = await validate_and_build_options(self.hass, user_input)
if error_key:
_LOGGER.debug(
"Config flow validation failed for service %s: %s", service, error_key
)
errors["base"] = error_key
else:
error_key = await _check_service_exists(self.hass, options, service)
if error_key:
_LOGGER.debug(
"Service existence check failed for %s: %s", service, error_key
)
errors["base"] = error_key
else:
_LOGGER.info(
"Config entry created for service %s (name: %s) on %s",
service,
name,
user_input[CONF_HOST],
)
return self.async_create_entry(
title=name,
data={CONF_NAME: name, CONF_SERVICE: service},
options=options,
)
if user_input is not None:
# Re-show the form with the values the user already entered so they
# don't have to type everything again after a validation error.
schema = _build_user_schema(user_input)
elif discovery:
schema = _build_user_schema(discovery)
else:
schema = STEP_USER_DATA_SCHEMA
return self.async_show_form(
step_id="user",
data_schema=schema,
errors=errors,
)
async def async_step_discovery(
self, discovery_info: dict[str, Any]
) -> ConfigFlowResult:
"""Handle a discovered docker service."""
service = discovery_info.get(CONF_SERVICE, discovery_info.get(CONF_NAME, ""))
host = discovery_info.get(CONF_HOST, "")
_LOGGER.debug("Discovery flow: service %s found on %s", service, host)
await self.async_set_unique_id(f"{host}_{service}")
self._abort_if_unique_id_configured()
self.context["title_placeholders"] = {CONF_NAME: service, CONF_HOST: host}
self._discovery_info = discovery_info # pylint: disable=attribute-defined-outside-init
return await self.async_step_user()
@staticmethod
@callback
def async_get_options_flow(
config_entry: ConfigEntry, # pylint: disable=unused-argument
) -> SshDockerOptionsFlow:
"""Create the options flow."""
return SshDockerOptionsFlow()