Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 184 additions & 97 deletions custom_components/span_panel/config_flow.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
from __future__ import annotations

from collections.abc import Mapping
import enum
import logging
from collections.abc import Mapping
from typing import Any

import voluptuous as vol

from homeassistant import config_entries
from homeassistant.components import zeroconf
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_HOST
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResult
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.httpx_client import get_async_client
from homeassistant.util.network import is_ipv4_address

Expand All @@ -20,21 +19,37 @@

_LOGGER = logging.getLogger(__name__)

CONF_SERIAL = "serial"

STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required("host"): str,
vol.Required(CONF_HOST): str,
}
)

STEP_AUTH_TOKEN_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_ACCESS_TOKEN): str,
}
)


def create_api_controller(hass: HomeAssistant, host: str) -> SpanPanelApi:
return SpanPanelApi(host=host, async_client=get_async_client(hass))
class TriggerFlowType(enum.Enum):
CREATE_ENTRY = enum.auto()
UPDATE_ENTRY = enum.auto()


def create_api_controller(
hass: HomeAssistant, host: str, access_token: str | None = None
) -> SpanPanelApi:
params = {"host": host, "async_client": get_async_client(hass)}
if access_token is not None:
params["access_token"] = access_token
return SpanPanelApi(**params)

async def validate_host(hass: HomeAssistant, host: str) -> bool:
span_api = create_api_controller(hass, host)

async def validate_host(
hass: HomeAssistant, host: str, access_token: str | None = None
) -> bool:
span_api = create_api_controller(hass, host, access_token)
return await span_api.ping()


Expand All @@ -46,146 +61,218 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1

def __init__(self) -> None:
self.trigger_flow_type: TriggerFlowType | None = None
self.host: str | None = None
self.serial_number: str | None = None
self.access_token: str | None = None

self._is_flow_setup: bool = False

async def setup_flow(self, trigger_type: TriggerFlowType, host: str):
assert self._is_flow_setup is False

span_api = create_api_controller(self.hass, host)
panel_status = await span_api.get_status_data()

self.trigger_flow_type = trigger_type
self.host = host
self.serial_number = panel_status.serial_number

self.context.setdefault("title_placeholders", {})[CONF_HOST] = self.host

self._is_flow_setup = True

def ensure_flow_is_set_up(self):
assert self._is_flow_setup is True

async def ensure_not_already_configured(self):
self.ensure_flow_is_set_up()

# Abort if we had already set this panel up
await self.async_set_unique_id(self.serial_number)
self._abort_if_unique_id_configured(updates={CONF_HOST: self.host})

async def async_step_zeroconf(
self, discovery_info: zeroconf.ZeroconfServiceInfo
) -> FlowResult:
"""
Handle a flow initialized by zeroconf discovery.
Handle a flow initiated by zeroconf discovery.
"""
_LOGGER.debug("Zeroconf discovered: %s", discovery_info)
# Do not probe device if the host is already configured
self._async_abort_entries_match({CONF_HOST: discovery_info.host})

# Do not probe device if it is not an ipv4 address
if not is_ipv4_address(discovery_info.host):
return self.async_abort(reason="not_ipv4_address")

# Do not probe the device if the host is already configured
self._async_abort_entries_match({CONF_HOST: self.host})

# Validate that this is a valid Span Panel
if not await validate_host(self.hass, discovery_info.host):
return self.async_abort(reason="not_span_panel")

self.host = discovery_info.host
await self.setup_flow(TriggerFlowType.CREATE_ENTRY, discovery_info.host)
await self.ensure_not_already_configured()
return await self.async_step_confirm_discovery()

span_api = create_api_controller(self.hass, self.host)
panel_status = await span_api.get_status_data()
self.serial_number = panel_status.serial_number
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""
Handle a flow initiated by the user.
"""
# Prompt the user for input if haven't done so
if user_input is None:
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA
)

_LOGGER.debug("SN: %s ip %s", self.serial_number, self.host)
# Validate host is a valid Span Panel, prompt user again
if not await validate_host(self.hass, user_input[CONF_HOST]):
return self.async_show_form(
step_id="user",
data_schema=STEP_USER_DATA_SCHEMA,
errors={"base": "cannot_connect"},
)

await self.async_set_unique_id(self.serial_number)
self._abort_if_unique_id_configured(updates={CONF_HOST: self.host})
await self.setup_flow(TriggerFlowType.CREATE_ENTRY, user_input[CONF_HOST])
await self.ensure_not_already_configured()
return await self.async_step_choose_auth_type()

for _ in self._async_current_entries(include_ignore=False):
_LOGGER.debug("entry loop")
async def async_step_reauth(self, entry_data: Mapping[str, Any]) -> FlowResult:
"""
Handle a flow initiated by re-auth.
"""

return await self.async_step_confirm_discovery()
await self.setup_flow(TriggerFlowType.UPDATE_ENTRY, entry_data[CONF_HOST])
return await self.async_step_auth_proximity(entry_data)

async def async_step_confirm_discovery(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""
Confirm a discovered Span Panel.
Prompt user to confirm a discovered Span Panel.
"""
assert self.host is not None
assert self.unique_id is not None
assert self.serial_number is not None
self.ensure_flow_is_set_up()

if user_input is not None:
return self.async_create_entry(
title=f"Span Panel {self.serial_number}",
data={CONF_HOST: self.host},
# Prompt the user for confirmation
if user_input is None:
self._set_confirm_only()
return self.async_show_form(
step_id="confirm_discovery",
description_placeholders={
"host": self.host,
},
)

self._set_confirm_only()
self.context["title_placeholders"] = {
CONF_SERIAL: self.serial_number,
CONF_HOST: self.host,
}
return self.async_show_form(
step_id="confirm_discovery",
description_placeholders={
"serial": self.serial_number,
"host": self.host,
return await self.async_step_choose_auth_type()

async def async_step_choose_auth_type(
self,
) -> FlowResult:
self.ensure_flow_is_set_up()

return self.async_show_menu(
step_id="auth_menu",
menu_options={
"auth_proximity",
"auth_token",
},
)

async def async_step_user(
self, user_input: dict[str, Any] | None = None
async def async_step_auth_proximity(
self,
entry_data: dict[str, Any] | None = None,
) -> FlowResult:
"""
Handle the initial step.
Step that guide users through the proximity authentication process.
"""
if user_input is None:
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA
)
self.ensure_flow_is_set_up()

host_valid = await validate_host(self.hass, user_input[CONF_HOST])
if host_valid is False:
errors = {}
errors["base"] = "cannot_connect"
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
)

span_api = create_api_controller(self.hass, user_input[CONF_HOST])
span_api = create_api_controller(self.hass, self.host)
panel_status = await span_api.get_status_data()

self.host = user_input[CONF_HOST]
self.serial_number = panel_status.serial_number

# Reprompt until we are able to do proximity auth
remaining_presses = panel_status.remaining_auth_unlock_button_presses
if remaining_presses != 0:
return self.async_show_form(
step_id="reauth_confirm",
step_id="auth_proximity",
description_placeholders={"remaining": remaining_presses},
)

access_token = await span_api.get_access_token()
user_input[CONF_ACCESS_TOKEN] = access_token
return self.async_create_entry(
title=panel_status.serial_number, data=user_input
)
# Ensure token is valid
self.access_token = await span_api.get_access_token()
if not await validate_host(self.hass, self.host, self.access_token):
return self.async_abort(reason="invalid_access_token")

async def async_step_reauth(self, entry_data: Mapping[str, Any]) -> FlowResult:
"""
Handle configuration by re-auth.
"""
return await self.async_step_reauth_confirm(entry_data)
return await self.async_step_resolve_entity(entry_data)

async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
async def async_step_auth_token(
self,
user_input: dict[str, Any] | None = None,
) -> FlowResult:
"""
Dialog that informs the user that reauth is required.
Step that prompts user for access token.
"""
host = user_input.get(CONF_HOST, self.host)
assert host
span_api = create_api_controller(self.hass, host)
panel_status = await span_api.get_status_data()
self.ensure_flow_is_set_up()

self.context["title_placeholders"] = {
CONF_SERIAL: panel_status.serial_number,
CONF_HOST: host,
}

remaining_presses = panel_status.remaining_auth_unlock_button_presses
if remaining_presses != 0:
if user_input is None:
return self.async_show_form(
step_id="reauth_confirm",
description_placeholders={"remaining": remaining_presses},
step_id="auth_token", data_schema=STEP_AUTH_TOKEN_DATA_SCHEMA
)

access_token = span_api.get_access_token()
user_input[CONF_HOST] = host
user_input[CONF_ACCESS_TOKEN] = access_token
# Ensure token is valid
self.access_token = user_input[CONF_ACCESS_TOKEN]
if not await validate_host(self.hass, self.host, self.access_token):
return self.async_abort(reason="invalid_access_token")

entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
assert entry
self.hass.config_entries.async_update_entry(entry, data=user_input)
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.context["entry_id"])
return await self.async_step_resolve_entity(user_input)

async def async_step_resolve_entity(
self,
entry_data: dict[str, Any] | None = None,
) -> FlowResult:
self.ensure_flow_is_set_up()

# Continue based on flow trigger type
match self.trigger_flow_type:
case TriggerFlowType.CREATE_ENTRY:
return self.create_new_entry(
self.host, self.serial_number, self.access_token
)
case TriggerFlowType.UPDATE_ENTRY:
return self.update_existing_entry(
self.context["entry_id"], self.host, self.access_token, entry_data
)
case _:
raise NotImplementedError()

def create_new_entry(
self, host: str, serial_number: str, access_token: str
) -> FlowResult:
"""
Creates a new SPAN panel entry.
"""
return self.async_create_entry(
title=serial_number, data={CONF_HOST: host, CONF_ACCESS_TOKEN: access_token}
)

def update_existing_entry(
self,
entry_id: str,
host: str,
access_token: str,
entry_data: Mapping[str, Any],
) -> FlowResult:
"""
Updates an existing entry with new configurations.
"""
# Update the existing data with reauthed data
entry_data[CONF_HOST] = host
entry_data[CONF_ACCESS_TOKEN] = access_token

# An existing entry must exist before we can update it
entry = self.hass.config_entries.async_get_entry(entry_id)
assert entry

self.hass.config_entries.async_update_entry(entry, data=entry_data)
self.hass.async_create_task(self.hass.config_entries.async_reload(entry_id))
return self.async_abort(reason="reauth_successful")
4 changes: 3 additions & 1 deletion custom_components/span_panel/const.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""Constants for the Span Panel integration."""

from datetime import timedelta
import enum
from datetime import timedelta

DOMAIN = "span_panel"
COORDINATOR = "coordinator"
NAME = "name"

CONF_SERIAL_NUMBER = "serial_number"

URL_STATUS = "http://{}/api/v1/status"
URL_SPACES = "http://{}/api/v1/spaces"
URL_CIRCUITS = "http://{}/api/v1/circuits"
Expand Down
Loading