Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: evented track input #140

Merged
merged 52 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
a461def
feat: evented track input
hairmare Jan 1, 2022
43e38ab
feat: tested api using werkzeug
hairmare Jan 2, 2022
64fb9a7
fix: better error handling with response
hairmare Jan 2, 2022
f6d1a00
test: daemon and input_handler
hairmare Jan 2, 2022
86766fb
test: input observer basics
hairmare Jan 2, 2022
462b455
lint: remove import
hairmare Jan 2, 2022
2e44b80
doc: add initial events doc
hairmare Jan 2, 2022
f71d2cb
docs: add a bunch of TODO comments
hairmare Jan 3, 2022
93d7f75
test: no hardcoded user for api test
hairmare Jan 3, 2022
f843a2c
Merge branch 'main' of github.com:radiorabe/nowplaying into feat/even…
hairmare Jan 8, 2022
49e291a
Merge branch 'main' into feat/evented-track-input
hairmare Jan 8, 2022
2e9af2b
fix: post-merge pre-commit issues
hairmare Jan 8, 2022
dc99352
Merge branch 'main' into feat/evented-track-input
hairmare Jan 21, 2022
a4c9911
Merge branch 'main' into feat/evented-track-input
hairmare Jan 21, 2022
ccb097e
Merge branch 'main' into feat/evented-track-input
hairmare Jan 29, 2022
74dabf8
Merge branch 'main' into feat/evented-track-input
hairmare Jan 30, 2022
c85c2e6
fix: update tests
hairmare Jan 30, 2022
1c44ee4
test: track parsing
hairmare Jan 30, 2022
e95860c
fix: lint
hairmare Jan 30, 2022
7fb5abb
test: assert webhook uses queue
hairmare Jan 30, 2022
b4fbb04
docs: add CloudEvents info to README
hairmare Jan 30, 2022
20df23a
docs: further README cleanup
hairmare Jan 30, 2022
73f5ee3
Merge branch 'main' into feat/evented-track-input
hairmare Jan 30, 2022
ab1b561
docs: Update README.md
hairmare Jan 30, 2022
fd3427c
docs: Update README.md
hairmare Jan 30, 2022
ae4f3bc
fix: defensive auth check
hairmare Jan 30, 2022
74aae70
Merge branch 'main' into feat/evented-track-input
hairmare Jan 30, 2022
ce9173e
Merge branch 'main' into feat/evented-track-input
hairmare Jan 31, 2022
828fc98
Merge branch 'main' into feat/evented-track-input
hairmare Jan 31, 2022
97629bc
Merge branch 'main' into feat/evented-track-input
hairmare Jan 31, 2022
ffa5279
Merge branch 'main' into feat/evented-track-input
hairmare Feb 4, 2022
ff948cc
fix: proper signal handling diring stop for api
hairmare Feb 5, 2022
2a29e18
test: ApiServer.stop_server method
hairmare Feb 5, 2022
47be15e
Merge branch 'main' into feat/evented-track-input
hairmare Feb 7, 2022
740c631
Merge branch 'main' into feat/evented-track-input
hairmare Feb 14, 2022
b42bdbc
test: simplify queue init where possible
hairmare Feb 14, 2022
aaee42d
Merge branch 'main' into feat/evented-track-input
hairmare Mar 3, 2022
b0cd63d
Merge branch 'main' into feat/evented-track-input
hairmare Mar 22, 2022
f78880c
chore: pyupgrade
hairmare Mar 22, 2022
0f3e4ad
Merge branch 'main' into feat/evented-track-input
hairmare Apr 2, 2022
be3746c
Merge branch 'main' into feat/evented-track-input
hairmare Apr 23, 2022
75c5faa
Merge branch 'main' into feat/evented-track-input
hairmare Jun 11, 2022
c668a73
Merge branch 'main' into feat/evented-track-input
hairmare Aug 21, 2022
d2574c5
chore: update Werkzeug from 2.0.2 to 2.2.2
hairmare Aug 21, 2022
c8f2ddf
chore: update more deps
hairmare Aug 21, 2022
49e8428
docs: update help for input-file arg
hairmare Aug 21, 2022
6774526
Merge branch 'main' into feat/evented-track-input
hairmare Nov 28, 2022
f54d097
Merge branch 'main' into feat/evented-track-input
hairmare Dec 13, 2022
77519e1
chore: implement validating ids as crid
hairmare Dec 13, 2022
905acc1
chore: updates
hairmare Dec 13, 2022
c0fbe7d
Merge branch 'main' into feat/evented-track-input
hairmare Dec 13, 2022
300fbf9
Merge branch 'main' into feat/evented-track-input
hairmare Dec 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ build/
.eggs/
dist/
*.egg-info/
htmlcov/
140 changes: 140 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,106 @@ This repo contains the tool we use to grab, aggregate and publish show, artist a

The nowplaying project grabs info from RaBes playout solution and publishes them to broadcast vectors like DAB+ and Webstreams.

It also takes care of generating our live ticker at songticker.rabe.ch.

## Overview

The nowplaying daemon takes various sources into account:

- The RaBe playout solution (via the [virtual-saemubox](https://github.com/radiorabe/virtual-saemubox) project)
- Input from [Klangbecken](https://github.com/radiorabe/klangbecken)

It then make an informed decision as to what should be our leading PAD data and pushes this to it's track handlers for the following sinks:

- DAB+ (via the [ODR-EncoderManager](https://github.com/Opendigitalradio/ODR-EncoderManager) API)
- Webstream by pushing to our [Icecast](https://icecast.org/) instances
- Statically hosted XML output for browsers on songticker.rabe.ch

The sources are currently individually implemented and are being replaced with generic [RaBe CloudEvents](https://github.com/radiorabe/event-spec) based sources. In many places the legacy system is underdocumented and this documentation documents the new system.

## Usage

TBD

### RaBe CloudEvents

The nowplaying projects receives httpd [RaBe CloudEvents](https://github.com/radiorabe/event-spec) on a dedicated web service. It reacts to them depending on the event type and source

It supports the following event types:

- `ch.rabe.api.events.track.v1.trackStarted`
- `ch.rabe.api.events.track.v1.trackFinished`

An example `trackStarted` event looks like this:

```json
{
"specversion": "1.0",
"type": "ch.rabe.api.events.track.v1.trackStarted",
"source": "<source>",
"subject": null,
"id": "<id>",
"time": "2021-12-28T19:31:00Z",
"datacontenttype": "application/json",
"data": {
"item.artist": "hairmare fusion sounds collective",
"item.title": "C L O U D E V E N T W A V E",
"item.length": 36000
}
}
```

It can be sent to the nowplaying service using cURL as follows:

```bash
curl -vvv -u rabe:rabe -H 'Content-Type: application/cloudevents+json' -X POST -d '@event.json' localhost:8080/webhook
```

In most cases the use of a cloudevents-sdk is recommended. The following example is based on the same [python-sdk](https://github.com/cloudevents/sdk-python) nowplaying uses.

```python
import requests

from cloudevents.http import CloudEvent, to_structured

def send_event(url, username, password):
# This data defines a cloudevent
attributes = {
"specversion": "1.0",
# as defined by the events-spec repo
"type": "ch.rabe.api.events.track.v1.trackStarted",
# for klangbecken the github link is always used as source (as per events-spec)
"source": "https://github.com/radiorabe/klangbecken",
# this should be generated and could/should point to a real
# URL on either https://klangbecken.service.int.example.org
# using a `crid://` URL based on the upcoming crid-spec.
"id": "uri:demo:12345",
}
data = {
"item.title": "Track Title",
"item.artist": "Artist",
# length in seconds, optional if you also implement sending the
# not "completely specced yet" trackFinished event
"item.length": 60,
}

event = CloudEvent(attributes, data)
headers, body = to_structured(event)

# send and print event
requests.post(url, headers=headers, data=body, auth=(username, password))
print(f"Sent {event['id']} from {event['source']} with {event.data}")

if __name__ == "__main__":
# local config
url = "https://nowplaying.service.int.example.org/webhook"
username = "rabe"
password = "rabe"

# do work
send_event(url, username, password)
```

## Contributing

### pre-commit hook
Expand All @@ -21,3 +119,45 @@ pre-commit install
```bash
pytest
```

## Release Management

The CI/CD setup uses semantic commit messages following the [conventional commits standard](https://www.conventionalcommits.org/en/v1.0.0/).
There is a GitHub Action in [.github/workflows/semantic-release.yaml](./.github/workflows/semantic-release.yaml)
that uses [go-semantic-commit](https://go-semantic-release.xyz/) to create new
releases.

The commit message should be structured as follows:

```
<type>[optional scope]: <description>

[optional body]

[optional footer(s)]
```

The commit contains the following structural elements, to communicate intent to the consumers of your library:

1. **fix:** a commit of the type `fix` patches gets released with a PATCH version bump
1. **feat:** a commit of the type `feat` gets released as a MINOR version bump
1. **BREAKING CHANGE:** a commit that has a footer `BREAKING CHANGE:` gets released as a MAJOR version bump
1. types other than `fix:` and `feat:` are allowed and don't trigger a release

If a commit does not contain a conventional commit style message you can fix
it during the squash and merge operation on the PR.

Once a commit has landed on the `main` branch a release will be created and automatically published to [pypi](https://pypi.org/)
using the GitHub Action in [.github/workflows/pypi.yaml](./.github/workflows/pypi.yaml) which uses [twine](https://twine.readthedocs.io/)
to publish the package to pypi. Additionaly a container image based on the [RaBe Python Minimal Base Image](https://github.com/radiorabe/container-image-python-minimal) is built and published using [Docker build-push Action](https://github.com/docker/build-push-action).
This is managed in [.github/workflows/release.yaml](./.github/workflows/release.yaml).

## License

This application is free software: you can redistribute it and/or modify it under
the terms of the GNU Affero General Public License as published by the Free
Software Foundation, version 3 of the License.

## Copyright

Copyright (c) 2022 [Radio Bern RaBe](http://www.rabe.ch)
121 changes: 121 additions & 0 deletions nowplaying/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import json
import logging
from queue import Queue

import cherrypy
from cloudevents.exceptions import GenericException as CloudEventException
from cloudevents.http import from_http
from werkzeug.exceptions import BadRequest, HTTPException, UnsupportedMediaType
from werkzeug.routing import Map, Rule
from werkzeug.wrappers import Request, Response

logger = logging.getLogger(__name__)

_RABE_CLOUD_EVENTS_SUBS = (
"ch.rabe.api.events.track.v1.trackStarted",
"ch.rabe.api.events.track.v1.trackFinished",
)
_RABE_CLOUD_EVENTS_SUPPORTED_MEDIA_TYPES = (
"application/cloudevents+json",
"application/json",
)


class ApiServer:
"""The API server."""

def __init__(self, options, event_queue: Queue, realm: str = "nowplaying"):
self.options = options
self.event_queue = event_queue
self.realm = realm

self.url_map = Map([Rule("/webhook", endpoint="webhook")])

def run_server(self):
"""Run the API server."""
if self.options.debug:
from werkzeug.serving import run_simple

self._server = run_simple(
self.options.apiBindAddress,
self.options.apiPort,
self,
use_debugger=True,
use_reloader=True,
)
else: # pragma: no cover
cherrypy.tree.graft(self, "/")
cherrypy.server.unsubscribe()

self._server = cherrypy._cpserver.Server()

self._server.socket_host = self.options.apiBindAddress
self._server.socket_port = self.options.apiPort

self._server.subscribe()

cherrypy.engine.start()
cherrypy.engine.block()

def stop_server(self):
"""Stop the server."""
self._server.stop()
cherrypy.engine.exit()

def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)

def wsgi_app(self, environ, start_response):
request = Request(environ)
auth = request.authorization
if auth and self.check_auth(auth.username, auth.password):
response = self.dispatch_request(request)
else:
response = self.auth_required(request)
return response(environ, start_response)

def check_auth(self, username, password):
return (
username in self.options.apiAuthUsers
and self.options.apiAuthUsers[username] == password
)

def auth_required(self, request):
return Response(
"Could not verify your access level for that URL.\n"
"You have to login with proper credentials",
401,
{"WWW-Authenticate": f'Basic realm="{self.realm}"'},
)

def dispatch_request(self, request):
adapter = self.url_map.bind_to_environ(request.environ)
try:
endpoint, values = adapter.match()
return getattr(self, f"on_{endpoint}")(request, **values)
except HTTPException as e:
return Response(
json.dumps(e.description),
e.code,
{"Content-Type": "application/json"},
)

def on_webhook(self, request):
"""Receive a CloudEvent and put it into the event queue."""
logger.warning("Received a webhook")
if (
request.headers.get("Content-Type")
not in _RABE_CLOUD_EVENTS_SUPPORTED_MEDIA_TYPES
):
raise UnsupportedMediaType()
try:
event = from_http(request.headers, request.data)
except CloudEventException as error:
raise BadRequest(description=f"{error}")

logger.info("Received event: %s", event)

if event["type"] in _RABE_CLOUD_EVENTS_SUBS:
self.event_queue.put(event)

return Response(status="200 Event Received")
41 changes: 40 additions & 1 deletion nowplaying/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import signal
import sys
import time
from queue import Queue
from threading import Thread

from cloudevents.http.event import CloudEvent

from .api import ApiServer
from .input import observer as inputObservers
from .input.handler import InputHandler
from .misc.saemubox import SaemuBox
Expand All @@ -24,6 +29,7 @@ class NowPlayingDaemon:
def __init__(self, options):
self.options = options

self.event_queue = Queue()
self.saemubox = SaemuBox(self.options.saemubox_ip)

def main(self): # pragma: no cover
Expand All @@ -39,10 +45,42 @@ def main(self): # pragma: no cover
logger.exception("Error: %s", e)
sys.exit(-1)

_thread = Thread(target=self._main_loop, args=(input_handler,))
_thread.daemon = True
_thread.start()

self._start_apiserver() # blocking

def _start_apiserver(self):
"""Start the API server."""
self._api = ApiServer(self.options, self.event_queue)
self._api.run_server() # blocking

def _stop_apiserver(self):
"""Stop the API server."""
logger.info("Stopping API server")
self._api.stop_server()

def _main_loop(self, input_handler: InputHandler): # pragma: no cover
"""
Run main loop of the daemon.

Should be run in a thread.
"""
logger.info("Starting main loop")
while True:
try:
saemubox_id = self.poll_saemubox()

while not self.event_queue.empty():
logger.debug("Queue size: %i" % self.event_queue.qsize())
event: CloudEvent = self.event_queue.get()
logger.info(
"Handling update from event: %s, source: %s"
% (event["type"], event["source"])
)
input_handler.update(saemubox_id, event)

input_handler.update(saemubox_id)
except Exception as e:
logger.exception("Error: %s", e)
Expand All @@ -59,6 +97,7 @@ def signal_handler(self, signum, frame):

if signum == signal.SIGINT or signum == signal.SIGKILL:
logger.info("Signal %i caught, terminating." % signum)
self._stop_apiserver()
sys.exit(os.EX_OK)

def get_track_handler(self): # pragma: no cover
Expand Down Expand Up @@ -108,7 +147,7 @@ def get_input_handler(self): # pragma: no cover

return handler

def poll_saemubox(self): # pragma: no cover
def poll_saemubox(self) -> int: # pragma: no cover
"""
Poll Saemubox for new data.

Expand Down
8 changes: 5 additions & 3 deletions nowplaying/input/handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import logging.handlers

from cloudevents.http.event import CloudEvent

from .observer import InputObserver

logger = logging.getLogger(__name__)
Expand All @@ -13,7 +15,7 @@ class InputHandler:
"""

def __init__(self):
self._observers = []
self._observers: list[InputObserver] = []

def register_observer(self, observer: InputObserver):
logger.info("Registering InputObserver '%s'" % observer.__class__.__name__)
Expand All @@ -22,12 +24,12 @@ def register_observer(self, observer: InputObserver):
def remove_observer(self, observer: InputObserver):
self._observers.remove(observer)

def update(self, saemubox_id: int):
def update(self, saemubox_id: int, event: CloudEvent = None):
for observer in self._observers:
logger.debug("Sending update event to observer %s" % observer.__class__)

try:
observer.update(saemubox_id)
observer.update(saemubox_id, event)
except Exception as e: # pragma: no cover
# TODO test once replaced with non generic exception
logger.error("InputObserver (%s): %s" % (observer.__class__, e))
Expand Down
Loading