Skip to content

Commit

Permalink
Merge pull request #2 from niklasr22/virtualdeck
Browse files Browse the repository at this point in the history
Virtualdeck
  • Loading branch information
niklasr22 authored Feb 23, 2024
2 parents 99bfc70 + 0364e7a commit 11f9f31
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 58 deletions.
28 changes: 20 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,31 @@
This project is a simple and small framework which simplifies developing and running custom apps for streamdeck devices.
It allows using streamdecks without the offical software so you can use it with any major OS.

## Currently supported Streamdecks:
## Currently supported Streamdecks

- Streamdeck Mk.2

## Installing

### Prerequisites:
### Prerequisites

- [hidapi](https://github.com/libusb/hidapi)

### Installing sd-controls:
Linux:

```bash
sudo apt install libhidapi-dev
````

MacOS:

```bash
brew install hidapi
```

### Installing sd-controls

```bash
pip install sd-controls
```

Expand All @@ -24,19 +36,19 @@ pip install sd-controls
If you are interested in this project and have a streamdeck, please consider contributing to it.
Adding support for other Streamdecks is greatly appreciated but feel free to propose any other changes too!

## Using this software with Linux (Linux udev rules):
## Using this software with Linux (Linux udev rules)

Create a `/etc/udev/rules.d/50-elgato.rules` file with the following content:

```
```rules
SUBSYSTEM=="input", GROUP="input", MODE="0666"
SUBSYSTEM=="usb", ATTRS{idVendor}=="0fd9", ATTRS{idProduct}=="0080", MODE:="666", GROUP="plugdev"
KERNEL=="hidraw*", ATTRS{idVendor}=="0fd9", ATTRS{idProduct}=="0080", MODE:="666", GROUP="plugdev"
```

## Example app:
## Example app

```
```python
from PIL import Image
from sd_controls.sdsystem import SDSystem, SDUserApp, Sprites
Expand Down Expand Up @@ -70,4 +82,4 @@ class HelloWorldApp(SDUserApp):
system = SDSystem()
system.register_app(HelloWorldApp())
system.start()
```
```
2 changes: 1 addition & 1 deletion src/sd_controls/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.0.9"
__version__ = "0.0.10"
94 changes: 69 additions & 25 deletions src/sd_controls/sdsystem.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import asyncio
import inspect
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from enum import Enum
from functools import cache
from pathlib import Path
from threading import Lock, Thread
from typing import Iterator
from typing import Callable, Iterator

import hid
from PIL import Image, ImageDraw, ImageFont

from sd_controls.sprites import Sprites
from sd_controls.streamdeck import StreamDeck, StreamDeckMk2

_LIB_PATH = Path(__file__).parent
_IMG_PATH = _LIB_PATH / "imgs"
_FONT_PATH = _LIB_PATH / "fonts"


Expand All @@ -27,17 +31,10 @@ class Orientation(Enum):
FLIPPED_180 = 2


class Sprites:
CLEAR = Image.open(_IMG_PATH / "clear.jpeg")
BACK_BTN = Image.open(_IMG_PATH / "back_btn.jpeg")
GOAT = Image.open(_IMG_PATH / "goat.jpeg")


class SDSystem:
def __init__(self, orientation=Orientation.DEFAULT, timeout: int = 0) -> None:
def __init__(self, orientation=Orientation.DEFAULT, timeout: int = 0, deck: StreamDeck = None) -> None:
self._apps: list[SDUserApp] = []
self._deck: StreamDeck = None
self._deck_thread: Thread = None
self._deck: StreamDeck = deck
self._running_app: _SDApp = None
self._key_lock = Lock()
self._orientation = orientation
Expand All @@ -46,22 +43,22 @@ def __init__(self, orientation=Orientation.DEFAULT, timeout: int = 0) -> None:
self._connect()

def _connect(self):
decks = SDSystem.find_streamdecks()
if len(decks) == 0:
raise NoStreamDeckFoundExcpetion("There is no streamdeck available")
self._deck: StreamDeck = decks[0]
print("Selected", self._deck)
if not self._deck:
decks = SDSystem.find_streamdecks()
if len(decks) == 0:
raise NoStreamDeckFoundExcpetion("There is no streamdeck available")
self._deck: StreamDeck = decks[0]
print("Selected", self._deck)
self._create_key_map()
self._deck.add_event_listener(self._system_key_listener)

def start(self) -> None:
self._deck.set_standby_timeout(self._default_timeout)
self._deck_thread = Thread(target=self._deck.run)
self._deck_thread.start()
try:
print("Started StreamDeck System")
# starts launchpad
self.close_app()
self._deck_thread.join()
asyncio.run(self._deck.run())
except KeyboardInterrupt:
self.close()
print("Stream Deck System shutdown")
Expand Down Expand Up @@ -140,8 +137,6 @@ def get_brightness(self) -> int:
def _stop_deck(self) -> None:
if self._deck:
self._deck.stop()
if self._deck_thread and self._deck_thread.is_alive():
self._deck_thread.join(2.0)

def close(self) -> None:
self._deck.set_standby_timeout(1)
Expand Down Expand Up @@ -169,13 +164,37 @@ class _SDApp(ABC):
def __init__(self) -> None:
self._running = False
self._system: SDSystem = None
self._key_up_callbacks: dict[int, list[Callable[[], None]]] = defaultdict(list)
self._key_down_callbacks: dict[int, list[Callable[[], None]]] = defaultdict(list)

def set_key(self, key: int, image: Image.Image):
if not self._check_system():
return False

if 0 < key < self._system.get_key_count():
return self._system.set_key(key, image)
return False

def setup_key(self, key: int, *, down: Callable[[], None] = None, up: Callable[[], None] = None) -> bool:
if not self._check_system():
return False
if up:
self._key_up_callbacks[key].append(up)
if down:
self._key_down_callbacks[key].append(down)
return True

def clear_key_event(self, key: int) -> bool:
if not self._check_system():
return False
self._key_up_callbacks[key].clear()
self._key_down_callbacks[key].clear()
return True

def clear_deck(self) -> None:
if not self._check_system():
return False

self._system.clear_deck()

def start(self, system: SDSystem) -> None:
Expand All @@ -184,11 +203,37 @@ def start(self, system: SDSystem) -> None:
self.init()

def key_event(self, keys_before: list[bool], keys: list[bool]):
self.update(keys_before, keys)
self.keys_update(keys_before, keys)
loop = asyncio.get_event_loop()
for key, (before, after) in enumerate(zip(keys_before, keys)):
if before and not after:
for callback in self._key_up_callbacks[key]:
try:
if inspect.iscoroutinefunction(callback):
loop.create_task(callback())
else:
callback()
except Exception as e:
logging.error(f"Error in key up callback for key {key}: {e}")
elif not before and after:
for callback in self._key_down_callbacks[key]:
try:
if inspect.iscoroutinefunction(callback):
loop.create_task(callback())
else:
callback()
except Exception as e:
logging.error(f"Error in key up callback for key {key}: {e}")

def close_app(self):
self._system.close_app()

def _check_system(self) -> bool:
if not self._system:
self._running = False
return False
return True

def stop(self) -> None:
self._running = False
self.on_close()
Expand All @@ -199,8 +244,7 @@ def closed(self) -> None:
@abstractmethod
def init(self) -> None: ...

@abstractmethod
def update(self, keys_before: list[bool], keys: list[bool]) -> None: ...
def keys_update(self, keys_before: list[bool], keys: list[bool]) -> None: ...

def on_close(self) -> None: ...

Expand Down Expand Up @@ -273,7 +317,7 @@ def init(self) -> None:
self.set_key(key, app.get_icon())
self.apps[key] = app

def update(self, keys_before: list[bool], keys: list[bool]) -> None:
def keys_update(self, keys_before: list[bool], keys: list[bool]) -> None:
for key, (before, after) in enumerate(zip(keys_before, keys)):
if before and not after and key < len(self.apps):
self._system._start_app(self.apps[key])
Expand Down
12 changes: 12 additions & 0 deletions src/sd_controls/sprites.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from pathlib import Path

from PIL import Image

_LIB_PATH = Path(__file__).parent
_IMG_PATH = _LIB_PATH / "imgs"


class Sprites:
CLEAR = Image.open(_IMG_PATH / "clear.jpeg")
BACK_BTN = Image.open(_IMG_PATH / "back_btn.jpeg")
GOAT = Image.open(_IMG_PATH / "goat.jpeg")
74 changes: 50 additions & 24 deletions src/sd_controls/streamdeck.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import io
from abc import ABC, abstractmethod
from typing import Callable, Self
Expand All @@ -7,24 +8,17 @@


class StreamDeck(ABC):
_PID: int = 0
_ICON_SIZE: int = 0
_KEY_COUNT: int = 0
_KEY_DATA_OFFSET: int = 0
_IMAGE_CMD_HEADER_LENGTH: int = 0
_IMAGE_CMD_MAX_PAYLOAD_LENGTH: int = 0

def __init__(self, device: hid.Device, read_interval: int = 100, buffer_size: int = 1024) -> None:
self._device = device
self._read_interval = read_interval
self._buffer_size = buffer_size
def __init__(self) -> None:
self._event_listeners: list[callable[[StreamDeck], None]] = []
self._keys: list[bool] = [False] * self._KEY_COUNT
self._running = False

def __str__(self) -> str:
return f"{self._device.product} ({self._device.manufacturer})"

def set_brightness(self, percentage: int) -> None:
self._brightness = percentage

Expand All @@ -37,30 +31,25 @@ def set_standby_timeout(self, timeout_secs: int) -> None:
def get_standby_timeout(self) -> None:
return self._timeout

@abstractmethod
def _get_send_image_command_header(
self, key: int, is_last_package: bool, payload_length: int, package_index: int
) -> bytes:
...

def get_key_count(self) -> int:
return self._KEY_COUNT

def get_keys(self) -> list[bool]:
return self._keys

def run(self) -> None:
async def run(self) -> None:
self._running = True
try:
while self._running:
data = self._device.read(self._buffer_size, self._read_interval)
if len(data) > 0:
keys_before = self._keys.copy()
self._keys = [
bool(k) for k in data[self._KEY_DATA_OFFSET : self._KEY_DATA_OFFSET + self._KEY_COUNT]
]
for listener in self._event_listeners:
listener(self, keys_before, self._keys.copy())
data = self._get_data()
if data is None:
continue

keys_before = self._keys.copy()
self._keys = data
for listener in self._event_listeners:
listener(self, keys_before, self._keys.copy())
await asyncio.sleep(0)
except (KeyboardInterrupt, hid.HIDException):
self._running = False

Expand All @@ -73,11 +62,48 @@ def add_event_listener(self, callback: Callable[[Self, list[bool], list[bool]],
def clear_event_listeners(self) -> None:
self._event_listeners.clear()

@abstractmethod
def set_key_image(self, key: int, image: Image.Image) -> bool: ...

@abstractmethod
def _get_data(self) -> list[bool] | None: ...


class HardwareStreamDeck(ABC):
_PID: int = 0
_ICON_SIZE: int = 0
_KEY_COUNT: int = 0
_KEY_DATA_OFFSET: int = 0
_IMAGE_CMD_HEADER_LENGTH: int = 0
_IMAGE_CMD_MAX_PAYLOAD_LENGTH: int = 0

def __init__(self, device: hid.Device, read_interval: int = 100, buffer_size: int = 1024) -> None:
self._device = device
self._read_interval = read_interval
self._buffer_size = buffer_size
self._event_listeners: list[callable[[StreamDeck], None]] = []
self._keys: list[bool] = [False] * self._KEY_COUNT
self._running = False

def __str__(self) -> str:
return f"{self._device.product} ({self._device.manufacturer})"

@abstractmethod
def _get_send_image_command_header(
self, key: int, is_last_package: bool, payload_length: int, package_index: int
) -> bytes: ...

def __del__(self):
print("Close device")
self._device.close()

def set_key_image(self, key: int, image: Image) -> bool:
def _get_data(self) -> list[bool] | None:
data = self._device.read(self._buffer_size, self._read_interval)
if len(data) > 0:
return [bool(k) for k in data[self._KEY_DATA_OFFSET : self._KEY_DATA_OFFSET + self._KEY_COUNT]]
return None

def set_key_image(self, key: int, image: Image.Image) -> bool:
max_payload_length = self._IMAGE_CMD_MAX_PAYLOAD_LENGTH

img_byte_buffer = io.BytesIO()
Expand Down
Loading

0 comments on commit 11f9f31

Please sign in to comment.