Skip to content

Commit

Permalink
Add support for separate fan speed and on/off GAs (#967)
Browse files Browse the repository at this point in the history
* Add support for separate fan speed and on/off GAs

* Use `self.switch.initialized`

* Logic fixes to react to telegrams

* Ensure switch is always created

* Refactor to use explicit switch GA and add tests

* Cleanup

* More tests

* Even more tests + typo fixes

* Even more tests...again

* Add docs

* Update changelog

* Docs example fixes

* Remove `set_on()` and `set_off()`

Co-authored-by: Matthias Alphart <[email protected]>

* Move default turn on speed into xknx

* Move the speed `None` check

Co-authored-by: Matthias Alphart <[email protected]>
  • Loading branch information
spacegaier and farmio authored Jun 7, 2022
1 parent 4cdbc59 commit fa4788d
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 15 deletions.
1 change: 1 addition & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ nav_order: 2
- Lock sending telegrams via a Tunnel until a confirmation is received
- Use device subclass for `device_updated_cb` callback argument type hint
- Fix CEMI Frame Ack-request flag set wrongly
- Fan: Add support for dedicated on/off switch GA

## 0.21.3 Cover updates 2022-05-17

Expand Down
27 changes: 17 additions & 10 deletions docs/fan.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,33 @@ nav_order: 4

## [](#header-2)Overview

Fans are simple representations of KNX controlled fans. They support setting the speed and the oscillation.
Fans are simple representations of KNX controlled fans. They support switching on/off, setting the speed and the oscillation.

## [](#header-2)Interface

- `xknx` XKNX object.
- `name` name of the device.
- `group_address` is the KNX group address of the fan speed. Used for sending. *DPT 5.001 / 5.010*
- `group_address_state` is the KNX group address of the fan speed state. Used for updating and reading state. *DPT 5.001 / 5.010*
- `group_address_speed` is the KNX group address of the fan speed. Used for sending. If no `group_address_switch` is provided, it will implicitly control switching the fan on/off as well. *DPT 5.001 / 5.010*
- `group_address_speed_state` is the KNX group address of the fan speed state. Used for updating and reading state. *DPT 5.001 / 5.010*
- `group_address_oscillation` is the KNX group address of the oscillation. Used for sending. *DPT 1.001*
- `group_address_oscillation_state` is the KNX group address of the fan oscillation state. Used for updating and reading state. *DPT 1.001*
- `group_address_switch` is the KNX group address of the fan on/off state. If not used, on/off will implicitly be controlled via `group_address_speed` instead. Used for sending. *DPT 1.001*
- `group_address_switch_state` is the KNX group address of the fan on/off state. Used for updating and reading state. *DPT 1.001*
- `sync_state` defines if and how often the value should be actively read from the bus. If `False` no GroupValueRead telegrams will be sent to its group address. Defaults to `True`
- `device_updated_cb` awaitable callback for each update.
- `max_step` Maximum step amount for fans which are controlled with steps and not percentage. If this attribute is set, the fan is controlled by sending the step value in the range `0` and `max_step`. In that case, the group address DPT changes from *DPT 5.001* to *DPT 5.010*. Default: None

## [](#header-2)Example

```python
fan = Fan(xknx,
'TestFan',
group_address='1/2/1',
group_address_state='1/2/2',
group_address_oscillation='1/2/3',
group_address_oscillation_state='1/2/4')
fan = Fan(
xknx,
'TestFan',
group_address_speed='1/2/1',
group_address_speed_state='1/2/2',
group_address_oscillation='1/2/3',
group_address_oscillation_state='1/2/4'
)

# Set the fan speed
await fan.set_speed(50)
Expand All @@ -42,9 +46,12 @@ print(fan.current_speed)
# Set the oscillation
await fan.set_oscillation(True)

# Accessing speed
# Accessing oscillation
print(fan.current_oscillation)

# Accessing on/off state
print(fan.is_on)

# Requesting state via KNX GroupValueRead
await fan.sync()
```
205 changes: 202 additions & 3 deletions test/devices_tests/fan_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,83 @@ async def test_sync_state_address(self):
)

#
# TEST SWITCH ON/OFF
#
async def test_switch_on_off(self):
"""Test switching on/off of a Fan."""
xknx = XKNX()
fan = Fan(xknx, name="TestFan", group_address_speed="1/2/3")

# Turn the fan on via speed GA. First try without providing a speed,
# which will set it to the default 50% percentage.
await fan.turn_on()
assert xknx.telegrams.qsize() == 1
telegram = xknx.telegrams.get_nowait()
# 128 is 50% as byte (0...255)
assert telegram == Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(128)),
)

# Try again, but this time with a speed provided
await fan.turn_on(55)
assert xknx.telegrams.qsize() == 1
telegram = xknx.telegrams.get_nowait()
# 140 is 55% as byte (0...255)
assert telegram == Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(140)),
)

# Turn the fan off via the speed GA
await fan.turn_off()
assert xknx.telegrams.qsize() == 1
telegram = xknx.telegrams.get_nowait()
assert telegram == Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(0)),
)

fan_with_switch = Fan(
xknx,
name="TestFanSwitch",
group_address_speed="1/2/3",
group_address_switch="4/5/6",
)

# Turn the fan on via the switch GA, which should not adjust the speed
await fan_with_switch.turn_on()
assert xknx.telegrams.qsize() == 1
telegram = xknx.telegrams.get_nowait()
assert telegram == Telegram(
destination_address=GroupAddress("4/5/6"),
payload=GroupValueWrite(DPTBinary(1)),
)

# Turn the fan off via the switch GA
await fan_with_switch.turn_off()
assert xknx.telegrams.qsize() == 1
telegram = xknx.telegrams.get_nowait()
assert telegram == Telegram(
destination_address=GroupAddress("4/5/6"),
payload=GroupValueWrite(DPTBinary(0)),
)

# Turn the fan on again this time with a provided speed, which for a switch GA fan
# should result in separate telegrams to switch on the fan and then set the speed.
await fan_with_switch.turn_on(55)
assert xknx.telegrams.qsize() == 2
telegram = xknx.telegrams.get_nowait()
assert telegram == Telegram(
destination_address=GroupAddress("4/5/6"),
payload=GroupValueWrite(DPTBinary(1)),
)
telegram = xknx.telegrams.get_nowait()
assert telegram == Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(140)),
)

#
# TEST SET SPEED
#
Expand All @@ -75,8 +152,51 @@ async def test_set_speed(self):
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(140)),
)
await fan.process(telegram)
assert fan.is_on == True

# A speed of 0 will turn off the fan implicitly if there is no
# dedicated switch GA
await fan.set_speed(0)
assert xknx.telegrams.qsize() == 1
telegram = xknx.telegrams.get_nowait()
# 140 is 55% as byte (0...255)
assert telegram == Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(0)),
)
await fan.process(telegram)
assert fan.is_on == False

fan_with_switch = Fan(
xknx,
name="TestFan",
group_address_speed="1/2/3",
group_address_switch="4/5/6",
)
await fan_with_switch.turn_on()
assert xknx.telegrams.qsize() == 1
telegram = xknx.telegrams.get_nowait()
assert telegram == Telegram(
destination_address=GroupAddress("4/5/6"),
payload=GroupValueWrite(DPTBinary(1)),
)
await fan_with_switch.process(telegram)
assert fan_with_switch.is_on == True

# A speed of 0 will not turn off the fan implicitly if there is a
# dedicated switch GA defined. So we only expect a speed change telegram,
# but no state switch one.
await fan_with_switch.set_speed(0)
assert xknx.telegrams.qsize() == 1
telegram = xknx.telegrams.get_nowait()
assert telegram == Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(0)),
)
await fan_with_switch.process(telegram)
assert fan_with_switch.is_on == True

#
#
# TEST SET SPEED STEP
#
Expand All @@ -98,7 +218,6 @@ async def test_set_speed_step(self):
payload=GroupValueWrite(DPTArray(2)),
)

#
#
# TEST SET OSCILLATION
#
Expand All @@ -120,12 +239,13 @@ async def test_set_oscillation(self):
)

#
# TEST PROCESS
# TEST PROCESS SPEED
#
async def test_process_speed(self):
"""Test process / reading telegrams from telegram queue. Test if speed is processed."""
xknx = XKNX()
fan = Fan(xknx, name="TestFan", group_address_speed="1/2/3")
assert fan.is_on is False
assert fan.current_speed is None

# 140 is 55% as byte (0...255)
Expand All @@ -134,8 +254,20 @@ async def test_process_speed(self):
payload=GroupValueWrite(DPTArray(140)),
)
await fan.process(telegram)
# Setting a speed for a fan that has no dedicated switch GA,
# should turn on the fan.
assert fan.is_on is True
assert fan.current_speed == 55

# Now set a speed of zero which should turn off the fan.
telegram = Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(0)),
)
await fan.process(telegram)
assert fan.is_on is False
assert fan.current_speed == 0

async def test_process_speed_wrong_payload(self):
"""Test process wrong telegrams. (wrong payload type)."""
xknx = XKNX()
Expand All @@ -152,6 +284,69 @@ async def test_process_speed_wrong_payload(self):
log_mock.assert_called_once()
cb_mock.assert_not_called()

#
# TEST PROCESS SWITCH
#
async def test_process_switch(self):
"""Test process / reading telegrams from telegram queue. Test if switch is handled correctly."""
xknx = XKNX()
fan = Fan(
xknx,
name="TestFan",
group_address_speed="1/2/3",
group_address_switch="4/5/6",
)
assert fan.is_on is False
assert fan.current_speed is None

# 140 is 55% as byte (0...255)
telegram = Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(140)),
)
await fan.process(telegram)
# Setting a speed for a fan with dedicated switch GA,
# should not turn on the fan
assert fan.is_on is False
assert fan.current_speed == 55

# Now turn on the fan via its switch GA
telegram = Telegram(
destination_address=GroupAddress("4/5/6"),
payload=GroupValueWrite(DPTBinary(1)),
)
await fan.process(telegram)
assert fan.is_on is True
assert fan.current_speed == 55

# Setting a speed of 0 should not turn off the fan
telegram = Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(0)),
)
await fan.process(telegram)
assert fan.is_on is True
assert fan.current_speed == 0

# Set the speed again so we can verify that switching off the fan does not
# modify the set speed
telegram = Telegram(
destination_address=GroupAddress("1/2/3"),
payload=GroupValueWrite(DPTArray(140)),
)
await fan.process(telegram)
assert fan.is_on is True
assert fan.current_speed == 55

# Now turn off the fan via the dedicated switch GA
telegram = Telegram(
destination_address=GroupAddress("4/5/6"),
payload=GroupValueWrite(DPTBinary(0)),
)
await fan.process(telegram)
assert fan.is_on is False
assert fan.current_speed == 55

#
# TEST PROCESS OSCILLATION
#
Expand Down Expand Up @@ -221,9 +416,13 @@ def test_has_group_address(self):
group_address_speed_state="1/7/2",
group_address_oscillation="1/6/1",
group_address_oscillation_state="1/6/2",
group_address_switch="1/5/1",
group_address_switch_state="1/5/2",
)
assert fan.has_group_address(GroupAddress("1/7/1"))
assert fan.has_group_address(GroupAddress("1/7/2"))
assert not fan.has_group_address(GroupAddress("1/7/3"))
assert fan.has_group_address(GroupAddress("1/6/1"))
assert fan.has_group_address(GroupAddress("1/6/2"))
assert fan.has_group_address(GroupAddress("1/5/1"))
assert fan.has_group_address(GroupAddress("1/5/2"))
Loading

0 comments on commit fa4788d

Please sign in to comment.