Skip to content

Commit

Permalink
Merge pull request #1036 from mebsout/alain/lwt_stream_junk_available
Browse files Browse the repository at this point in the history
Lwt_stream: non-Lwt version of junk_old
  • Loading branch information
raphael-proust authored Nov 8, 2024
2 parents 5b59c71 + 30c9d85 commit 94ce0b4
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
16 changes: 9 additions & 7 deletions src/core/lwt_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -649,25 +649,27 @@ let rec junk_while_s_rec node f s =

let junk_while_s f s = junk_while_s_rec s.node f s

let rec junk_old_rec node s =
let rec junk_available_rec node s =
if node == !(s.last) then
let thread = feed s in
match Lwt.state thread with
| Lwt.Return _ ->
junk_old_rec node s
junk_available_rec node s
| Lwt.Fail exn ->
Lwt.fail exn
raise exn
| Lwt.Sleep ->
Lwt.return_unit
()
else
match node.data with
| Some _ ->
consume s node;
junk_old_rec node.next s
junk_available_rec node.next s
| None ->
Lwt.return_unit
()

let junk_available s = junk_available_rec s.node s

let junk_old s = junk_old_rec s.node s
let junk_old s = Lwt.return (junk_available s)

let rec get_available_rec node acc s =
if node == !(s.last) then
Expand Down
9 changes: 7 additions & 2 deletions src/core/lwt_stream.mli
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ val junk_while_s : ('a -> bool Lwt.t) -> 'a t -> unit Lwt.t
(** [junk_while f st] removes all elements at the beginning of the
streams which satisfy [f]. *)

val junk_old : 'a t -> unit Lwt.t
(** [junk_old st] removes all elements that are ready to be read
val junk_available : 'a t -> unit
(** [junk_available st] removes all elements that are ready to be read
without yielding from [st]. *)

val get_available : 'a t -> 'a list
Expand Down Expand Up @@ -263,6 +263,11 @@ val closed : 'a t -> unit Lwt.t
@since 2.6.0 *)

(** {3 Deprecated} *)

val junk_old : 'a t -> unit Lwt.t [@@deprecated "Use junk_available instead"]
(** @deprecated [junk_old st] is [Lwt.return (junk_available st)]. *)

(** {2 Stream transversal} *)

(** Note: all the following functions are destructive.
Expand Down
30 changes: 30 additions & 0 deletions test/core/test_lwt_stream.ml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,36 @@ let suite = suite "lwt_stream" [
Lwt_stream.last_new stream >>= fun x ->
return (x = 3));

test_direct "junk_available"
(fun () ->
let s, push = Lwt_stream.create () in
let b0 = Lwt_stream.get_available s = [] in
let () = Lwt_stream.junk_available s in
let b1 = Lwt_stream.get_available s = [] in
let () = push (Some 1); push (Some 2); push (Some 4) in
let () = Lwt_stream.junk_available s in
let b2 = Lwt_stream.get_available s = [] in
let () = push (Some 66); push (Some 77); push (Some 99) in
let () = Lwt_stream.junk_available s in
let b3 = Lwt_stream.get_available s = [] in
b0 && b1 && b2 && b3);

test "junk_old"
(fun () ->
let open Lwt.Syntax in
let s, push = Lwt_stream.create () in
let b0 = Lwt_stream.get_available s = [] in
let* () = Lwt_stream.junk_old s in
let b1 = Lwt_stream.get_available s = [] in
let () = push (Some 1); push (Some 2); push (Some 4) in
let* () = Lwt_stream.junk_old s in
let b2 = Lwt_stream.get_available s = [] in
let () = push (Some 66); push (Some 77); push (Some 99) in
let* () = Lwt_stream.junk_old s in
let b3 = Lwt_stream.get_available s = [] in
Lwt.return (b0 && b1 && b2 && b3))
[@ocaml.alert "-deprecated"];

test "cancel push stream 1"
(fun () ->
let stream, _ = Lwt_stream.create () in
Expand Down

0 comments on commit 94ce0b4

Please sign in to comment.