Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisamin committed Feb 7, 2023
1 parent aae6772 commit 664ecb4
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
2 changes: 2 additions & 0 deletions ripe/atlas/tools/helpers/tabular.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class CSVRenderer(Renderer):
Renderer which outputs comma-separated values, where strings are always
enclosed in double quotes, and literal double quotes are written as "".
"""

dialect = "excel"

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -252,6 +253,7 @@ class TabRenderer(Renderer):
Renderer which outputs tab-separated values, where literal tabs are replaced
with spaces.
"""

def get_header(self) -> Iterable[str]:
yield self.get_line({"values": dict((k, k) for k in self.columns.keys())})

Expand Down
21 changes: 14 additions & 7 deletions ripe/atlas/tools/streaming.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2016 RIPE NCC
# Copyright (c) 2023 RIPE NCC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -12,30 +12,37 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Iterator, Optional

from ripe.atlas.cousteau import AtlasStream
from ripe.atlas.sagan import Result

from .settings import conf


class Stream(object):
class Stream:
"""
Iterable wrapper for AtlasStream that yields sagan Results up to a
specified capture limit and/or timeout
"""

def __init__(self, pk, capture_limit=None, timeout=None):
def __init__(
self,
pk: int,
capture_limit: Optional[int] = None,
timeout: Optional[float] = None,
):
self.pk = pk
self.capture_limit = capture_limit
self.timeout = timeout
self.num_received = 0

def __iter__(self):
def __iter__(self) -> Iterator[Result]:
stream = AtlasStream(base_url=conf["stream-base-url"])
stream.connect()
stream.subscribe("result", msm=self.pk)
for event_name, payload in stream.iter(
seconds=self.timeout
):
for event_name, payload in stream.iter(seconds=self.timeout):
if event_name == "atlas_result":
parsed = Result.get(
payload,
Expand Down

0 comments on commit 664ecb4

Please sign in to comment.