-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcamera.py
More file actions
89 lines (70 loc) · 2.62 KB
/
camera.py
File metadata and controls
89 lines (70 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""Support for the Environment Canada radar imagery."""
import datetime
from env_canada import ECRadar
import voluptuous as vol
from homeassistant.components.camera import PLATFORM_SCHEMA, Camera
from homeassistant.const import (
ATTR_ATTRIBUTION,
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_NAME,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from .const import ATTRIBUTION
ATTR_UPDATED = "updated"
CONF_LOOP = "loop"
CONF_PRECIP_TYPE = "precip_type"
MIN_TIME_BETWEEN_UPDATES = datetime.timedelta(minutes=10)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_LOOP, default=True): cv.boolean,
vol.Optional(CONF_NAME): cv.string,
vol.Inclusive(CONF_LATITUDE, "latlon"): cv.latitude,
vol.Inclusive(CONF_LONGITUDE, "latlon"): cv.longitude,
vol.Optional(CONF_PRECIP_TYPE): vol.In(["rain", "snow"]),
}
)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the Environment Canada camera."""
lat = config.get(CONF_LATITUDE, hass.config.latitude)
lon = config.get(CONF_LONGITUDE, hass.config.longitude)
radar_object = ECRadar(
coordinates=(lat, lon), precip_type=config.get(CONF_PRECIP_TYPE)
)
async_add_entities(
[ECCamera(radar_object, config.get(CONF_NAME), config[CONF_LOOP])], True
)
class ECCamera(Camera):
"""Implementation of an Environment Canada radar camera."""
def __init__(self, radar_object, camera_name, is_loop):
"""Initialize the camera."""
super().__init__()
self.radar_object = radar_object
self.camera_name = camera_name
self.is_loop = is_loop
self.content_type = "image/gif"
self.image = None
self.timestamp = None
async def async_camera_image(self):
"""Return bytes of camera image."""
await self.async_update()
return self.image
@property
def name(self):
"""Return the name of the camera."""
if self.camera_name is not None:
return self.camera_name
return "Environment Canada Radar"
@property
def extra_state_attributes(self):
"""Return the state attributes of the device."""
return {ATTR_ATTRIBUTION: ATTRIBUTION, ATTR_UPDATED: self.timestamp}
@Throttle(MIN_TIME_BETWEEN_UPDATES)
async def async_update(self):
"""Update radar image."""
if self.is_loop:
self.image = await self.radar_object.get_loop()
else:
self.image = await self.radar_object.get_latest_frame()
self.timestamp = self.radar_object.timestamp