Skip to content

Commit

Permalink
feat: evented track input
Browse files Browse the repository at this point in the history
  • Loading branch information
hairmare committed Jan 1, 2022
1 parent 407b062 commit a461def
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 34 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ build/
.eggs/
dist/
*.egg-info/
htmlcov/
45 changes: 45 additions & 0 deletions nowplaying/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import json
import logging
from queue import Queue

import cherrypy
import isodate
from cloudevents.exceptions import GenericException as CloudEventException
from cloudevents.http import from_http

logger = logging.getLogger(__name__)

_RABE_CLOUD_EVENTS_SUBS = (
"ch.rabe.api.events.track.v1.trackStarted",
"ch.rabe.api.events.track.v1.trackFinished",
)


class ApiServer:
"""The API server."""

def __init__(self, event_queue: Queue):
self.event_queue = event_queue

@cherrypy.expose
@cherrypy.tools.json_in(
content_type=("application/json", "application/cloudevents+json")
)
def webhook(self):
"""Receive a CloudEvent and put it into the event queue."""
try:
event = from_http(
cherrypy.request.headers, json.dumps(cherrypy.request.json)
)
except CloudEventException as error:
cherrypy.response.status = f"400 {error}"
return

logger.info("Received event: %s", event)

if event["time"]:
event["time"] = isodate.parse_datetime(event["time"])
if event["type"] in _RABE_CLOUD_EVENTS_SUBS:
self.event_queue.put(event)

cherrypy.response.status = "204 Event Received"
48 changes: 47 additions & 1 deletion nowplaying/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import signal
import sys
import time
from queue import Queue
from threading import Thread

import cherrypy
from api import ApiServer
from cloudevents.http.event import CloudEvent
from input import observer as inputObservers
from input.handler import InputHandler
from misc.saemubox import SaemuBox
Expand Down Expand Up @@ -35,10 +40,51 @@ def main(self):
logger.exception("Error: %s", e)
sys.exit(-1)

self.event_queue = Queue()

_thread = Thread(target=self._main_loop, args=(input_handler,))
_thread.daemon = True
_thread.start()

self._start_apiserver()

def _start_apiserver(self):
conf = {
"/": {
"tools.auth_digest.on": True,
"tools.auth_digest.realm": "localhost",
"tools.auth_digest.get_ha1": cherrypy.lib.auth_digest.get_ha1_dict_plain(
self.options.digestAuthUsers
),
"tools.auth_digest.key": self.options.digestAuthKey,
"tools.auth_digest.accept_charset": "UTF-8",
}
}
cherrypy.config.update({"server.socket_host": self.options.apiBindAddress})
cherrypy.config.update({"server.socket_port": self.options.apiPort})
logger.info("Starting web server")
cherrypy.quickstart(ApiServer(self.event_queue), "/", conf)

def _main_loop(self, input_handler: InputHandler):
"""
Main loop of the daemon.
Should be run in a thread.
"""
logger.info("Starting main loop")
while True:
try:
saemubox_id = self.poll_saemubox()

while not self.event_queue.empty():
logger.debug("Queue size: %i" % self.event_queue.qsize())
event: CloudEvent = self.event_queue.get()
logger.info(
"Handling update from event: %s, source: %s"
% (event["type"], event["source"])
)
input_handler.update(saemubox_id, event)

input_handler.update(saemubox_id)
except Exception as e:
logger.exception("Error: %s", e)
Expand Down Expand Up @@ -92,7 +138,7 @@ def get_input_handler(self):

return handler

def poll_saemubox(self):
def poll_saemubox(self) -> int:
"""
Poll Saemubox for new data.
Expand Down
7 changes: 4 additions & 3 deletions nowplaying/input/handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import logging.handlers

from cloudevents.http.event import CloudEvent
from input.observer import InputObserver

logger = logging.getLogger(__name__)
Expand All @@ -13,7 +14,7 @@ class InputHandler:
"""

def __init__(self):
self.__observers = []
self.__observers: list[InputObserver] = []

def register_observer(self, observer: InputObserver):
logger.info("Registering InputObserver '%s'" % observer.__class__.__name__)
Expand All @@ -22,12 +23,12 @@ def register_observer(self, observer: InputObserver):
def remove_observer(self, observer: InputObserver):
self.__observers.remove(observer)

def update(self, saemubox_id: int):
def update(self, saemubox_id: int, event: CloudEvent = None):
for observer in self.__observers:
logger.debug("Sending update event to observer %s" % observer.__class__)

try:
observer.update(saemubox_id)
observer.update(saemubox_id, event)
except Exception as e:
logger.error("InputObserver (%s): %s" % (observer.__class__, e))
logger.exception(e)
125 changes: 95 additions & 30 deletions nowplaying/input/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@

import isodate
import pytz
from cloudevents.http.event import CloudEvent
from show import client
from show.show import Show
from track.handler import TrackEventHandler
from track.track import DEFAULT_ARTIST, DEFAULT_TITLE, Track

logger = logging.getLogger(__name__)
Expand All @@ -21,79 +23,108 @@ class InputObserver(ABC):
_SHOW_NAME_KLANGBECKEN = "Klangbecken"
_SHOW_URL_KLANGBECKEN = "http://www.rabe.ch/sendungen/musik/klangbecken.html"

def __init__(self, current_show_url):
def __init__(self, current_show_url: str):
self.show: Show
self.track_handler: TrackEventHandler
self.previous_saemubox_id: int = -1
self.first_run = True
self.previous_show_uuid = ""

self.current_show_url = current_show_url

self.first_run = True
self.previous_saemubox_id = None
self.show = None
self.showclient = client.ShowClient(current_show_url)
self.show = self.showclient.get_show_info()

self.previous_show_uuid = None
def add_track_handler(self, track_handler: TrackEventHandler):
self.track_handler = track_handler

self.track_handler = None
def update(self, saemubox_id: int, event: CloudEvent = None):
if self.handle_id(saemubox_id, event):
self.handle(event)

def add_track_handler(self, track_handler):
self.track_handler = track_handler
@abstractmethod
# TODO Deprecate this method
def handle_id(
self, saemubox_id: int, event: CloudEvent = None
): # pragma: no coverage
pass

def update(self, saemubox_id):
if self.handle_id(saemubox_id):
self.handle()
@abstractmethod
# TODO Deprecate this method
def handle(self, event: CloudEvent = None): # pragma: no coverage
pass

@abstractmethod
def handle_id(self, saemubox_id): # pragma: no coverage
def handles(self, event: CloudEvent) -> bool: # pragma: no coverage
pass

@abstractmethod
def handle(self): # pragma: no coverage
def event(self, event: CloudEvent): # pragma: no coverage
pass


class KlangbeckenInputObserver(InputObserver):
"""Observe cases where Sämu Box says Klangbecken is running and we can consume now-playing.xml input."""

def __init__(self, current_show_url, input_file):
warnings.warn(
"The now-playing.xml format from Loopy/Klangbecken will be replaced in the future",
PendingDeprecationWarning,
)
InputObserver.__init__(self, current_show_url)
def __init__(self, current_show_url: str, input_file: str = None):
if input_file:
warnings.warn(
"The now-playing.xml format from Loopy/Klangbecken will be replaced in the future",
PendingDeprecationWarning,
)
self.input_file = input_file
self.last_modify_time = os.stat(self.input_file).st_mtime

self.track: Track
super().__init__(current_show_url)

self.input_file = input_file
self.last_modify_time = os.stat(self.input_file).st_mtime
def handles(self, event: CloudEvent) -> bool:
# TODO make magic string configurable
# TODO check if source is currently on-air
return event["source"] == "https://github/radiorabe/klangbecken"

self.track = None
def event(self, event: CloudEvent):
self._handle(event)

def handle_id(self, saemubox_id):
def handle_id(self, saemubox_id: int, event: CloudEvent = None):
# only handle Klangbecken output
if saemubox_id == 1:
return True

return False
return self.handles(event)

def handle(self):
# @TODO: replace the stat method with inotify
modify_time = os.stat(self.input_file).st_mtime
def handle(self, event: CloudEvent = None):
self._handle(event)

def _handle(self, event: CloudEvent = None):
if not event:
# @TODO: replace the stat method with inotify
modify_time = os.stat(self.input_file).st_mtime

# @TODO: Need to check if we have a stale file and send default
# track infos in this case. This might happend if loopy
# went out for lunch...
# pseudo code: now > modify_time + self.track.get_duration()

if self.first_run or modify_time > self.last_modify_time:
if self.first_run or event or modify_time > self.last_modify_time:
logger.info("Now playing file changed")

self.show = self.showclient.get_show_info()
self.last_modify_time = modify_time

if event:
self.track = self.parse_event(event)
self.first_run = False

logger.info("First run: %s" % self.first_run)

if not self.first_run:
logger.info("calling track_finished")
self.track_handler.track_finished(self.track)

self.track = self.get_track_info()
if not event:
# TODO remove once legacy xml is gone
self.track = self.get_track_info()

# Klangbecken acts as a failover and last resort input, if other
# active inputs are silent or have problems.
Expand All @@ -116,6 +147,7 @@ def handle(self):

self.track.set_show(self.show)

# TODO: or finished?
self.track_handler.track_started(self.track)

self.first_run = False
Expand Down Expand Up @@ -180,14 +212,47 @@ def get_track_info(self):

return current_track

def parse_event(self, event: CloudEvent) -> Track:
track = Track()
logger.info("Parsing event: %s" % event)

track.set_artist(event.data["item.artist"])
track.set_title(event.data["item.title"])

if event["type"] == "ch.rabe.api.events.track.v1.trackStarted":
track.set_starttime(event["time"])
elif event["type"] == "ch.rabe.api.events.track.v1.trackFinished":
# TODO consider using now() instead of event['time']
track.set_endtime(event["time"])

if "item.length" in event.data:
track.set_duration(event.data["item.length"])

logger.info("Track: %s" % track)
return track


class NonKlangbeckenInputObserver(InputObserver):
"""Observer for input that doesn't originate from klangbecken and therefore misses the track information.
Uses the show's name instead of the actual track infos
"""

def handle_id(self, saemubox_id):
def handles(self, event: CloudEvent) -> bool:
"""Do not handle events yet.
TODO implement this method
"""
return False

def event(self, event: CloudEvent):
"""Do not handle events yet.
TODO implement this method
"""
super().event(event)

def handle_id(self, saemubox_id: int, event: CloudEvent = None):

if saemubox_id != self.previous_saemubox_id:
# If sämubox changes, force a show update, this acts as
Expand Down
24 changes: 24 additions & 0 deletions nowplaying/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,30 @@ def __init__(self):
help="ticker XML output format",
default="/var/www/localhost/htdocs/songticker/0.9.3/current.xml",
)
self.__args.add_argument(
"--api-bind-address",
dest="apiBindAddress",
help="Bind address for the API server",
default="0.0.0.0",
)
self.__args.add_argument(
"--api-port",
dest="apiPort",
help="Bind port for the API server",
default=8080,
)
self.__args.add_argument(
"--digest-auth-key",
dest="digestAuthKey",
help="Digest Auth Key",
default="a565c27146791cfb",
)
self.__args.add_argument(
"--digest-auth-users",
dest="digestAuthUsers",
help="Digest Auth Users",
default={"rabe": "rabe"},
)
self.__args.add_argument(
"--debug",
type=bool,
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ isodate==0.6.1
pylast==4.4.0
lxml==4.7.1
requests==2.26.0
cherrypy==18.6.1
cloudevents==1.2.0

0 comments on commit a461def

Please sign in to comment.