zeek_websocket/
protocol.rs

//! # Sans I/O-style protocol wrapper for the Zeek API
//!
//! Instead of providing a full-fledged client, [`Binding`] encapsulates the Zeek WebSocket
//! protocol [sans I/O style](https://sans-io.readthedocs.io/). It provides the following methods:
//!
//! - [`Binding::handle_incoming`] injects data received over a network connection into the
//!   `Binding` object
//! - [`Binding::receive_event`] gets the next event received from Zeek
//! - [`Binding::publish_event`] publishes an event to Zeek
//! - [`Binding::outgoing`] gets the next data payload for sending to Zeek
//!
//! A full client implementation will typically implement some form of event loop.
13//!
14//! ## Example
15//!
16//! ```no_run
17//! use zeek_websocket::*;
18//!
19//! # fn main() -> anyhow::Result<()> {
20//! // Open an underlying WebSocket connection to a Zeek endpoint.
21//! let (mut socket, _) = tungstenite::connect("ws://127.0.0.1:8080/v1/messages/json")?;
22//!
23//! // Create a connection.
24//! let mut conn = Binding::new(&["/ping"]);
25//!
26//! // The event loop.
27//! loop {
28//!     // If we have any outgoing messages send at least one.
29//!     if let Some(data) = conn.outgoing() {
30//!         socket.send(tungstenite::Message::binary(data))?;
31//!     }
32//!
33//!     // Receive the next message and handle it.
34//!     if let Ok(msg) = socket.read()?.try_into() {
35//!         conn.handle_incoming(msg);
36//!     }
37//!
38//!     // If we received a `ping` event, respond with a `pong`.
39//!     if let Some((topic, event)) = conn.receive_event()? {
40//!         if event.name == "ping" {
41//!             conn.publish_event(topic, Event::new("pong", event.args));
42//!         }
43//!     }
44//! }
45//! # }
46//! ```
47
48use std::collections::VecDeque;
49
50use thiserror::Error;
51use tungstenite::Bytes;
52use zeek_websocket_types::{Data, Event, Message, Value};
53
54use crate::types::Subscriptions;
55
56/// Protocol wrapper for a Zeek WebSocket connection.
57///
58/// See the [module documentation](crate::protocol) for an introduction
59pub struct Binding {
60    state: State,
61    subscriptions: Subscriptions,
62
63    inbox: Inbox,
64    outbox: Outbox,
65}
66
/// Subscription handshake state of a [`Binding`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// The subscription request has not been acknowledged yet.
    Subscribing,
    /// An ACK for the subscription request was received.
    Subscribed,
}
71
72impl Binding {
73    /// Create a new `Binding` with the given [`Subscriptions`].
74    ///
75    /// ```
76    /// # use zeek_websocket::Binding;
77    /// let conn = Binding::new(&["topic"]);
78    /// ```
79    #[must_use]
80    pub fn new<S>(subscriptions: S) -> Self
81    where
82        S: Into<Subscriptions>,
83    {
84        let subscriptions = subscriptions.into();
85        Self {
86            state: State::Subscribing,
87            inbox: Inbox(VecDeque::new()),
88            outbox: Outbox(VecDeque::from([subscriptions.clone().into()])),
89            subscriptions,
90        }
91    }
92
93    /// Handle received message.
94    ///
95    /// Returns `true` if the data was converted to a [`Message`] and added
96    /// to the inbox, or `false` otherwise.
97    ///
98    /// # Errors
99    ///
100    /// - returns a [`ProtocolError::AlreadySubscribed`] if we saw an unexpected ACK.
101    /// - returns a [`ProtocolError::UnexpectedEventPayload`] if an unexpected event payload was seen
102    pub fn handle_incoming(&mut self, message: Message) -> Result<(), ProtocolError> {
103        match &message {
104            Message::Ack { .. } => match self.state {
105                State::Subscribing => {
106                    self.state = State::Subscribed;
107                }
108                State::Subscribed => return Err(ProtocolError::AlreadySubscribed),
109            },
110            Message::DataMessage {
111                data: Data::Other(unexpected),
112                ..
113            } => {
114                return Err(ProtocolError::UnexpectedEventPayload(unexpected.clone()));
115            }
116            _ => {
117                self.inbox.handle(message);
118            }
119        }
120
121        Ok(())
122    }
123
124    /// Get next data enqueued for sending.
125    pub fn outgoing(&mut self) -> Option<Bytes> {
126        self.outbox.next_data()
127    }
128
129    /// Get the next incoming event.
130    ///
131    /// # Errors
132    ///
133    /// - returns a [`ProtocolError::ZeekError`] if an error was received from Zeek
134    pub fn receive_event(&mut self) -> Result<Option<(String, Event)>, ProtocolError> {
135        if let Some(message) = self.inbox.next_message() {
136            match message {
137                Message::DataMessage { topic, data } => {
138                    let event = match data {
139                        Data::Event(event) => event,
140                        Data::Other(..) => unreachable!(), // Rejected in `handle_incoming`.
141                    };
142                    return Ok(Some((topic, event)));
143                }
144                Message::Error { code, context } => {
145                    return Err(ProtocolError::ZeekError { code, context });
146                }
147                Message::Ack { .. } => {
148                    unreachable!() // Never forwarded from `handle_incoming`.
149                }
150            }
151        }
152
153        Ok(None)
154    }
155
156    /// Enqueue a message for sending.
157    fn enqueue(&mut self, message: Message) -> Result<(), ProtocolError> {
158        match message {
159            Message::DataMessage { topic, data } => {
160                let is_subscribed = self
161                    .subscriptions
162                    .0
163                    .iter()
164                    .any(|s| s.as_str() == topic.as_str());
165
166                if is_subscribed {
167                    self.outbox.enqueue(Message::DataMessage { topic, data });
168                } else {
169                    return Err(ProtocolError::SendOnNonSubscribed(
170                        topic,
171                        self.subscriptions.clone(),
172                        data,
173                    ))?;
174                }
175            }
176            _ => self.outbox.enqueue(message),
177        }
178
179        Ok(())
180    }
181
182    /// Enqueue an event for sending.
183    ///
184    /// # Errors
185    ///
186    /// Will return [`ProtocolError::SendOnNonSubscribed`] if the binding is not subscribed to the
187    /// topic of the message.
188    pub fn publish_event<S>(&mut self, topic: S, event: Event) -> Result<(), ProtocolError>
189    where
190        S: Into<String>,
191    {
192        self.enqueue(Message::new_data(topic.into(), event))
193    }
194
195    /// Split the `Binding` into an [`Inbox`] and [`Outbox`].
196    ///
197    /// <div class="warning">
198    /// The returned <code>Inbox</code> and <code>Outbox</code> do not enforce correct use of the protocol.
199    /// </div>
200    #[must_use]
201    pub fn split(self) -> (Inbox, Outbox) {
202        (self.inbox, self.outbox)
203    }
204}
205
206/// Receiving side of a [`Binding`].
207pub struct Inbox(VecDeque<Message>);
208
209impl Inbox {
210    /// Handle received message.
211    pub fn handle(&mut self, message: Message) {
212        self.0.push_back(message);
213    }
214
215    /// Get next incoming message.
216    #[must_use]
217    pub fn next_message(&mut self) -> Option<Message> {
218        self.0.pop_front()
219    }
220
221    /// Get the next event.
222    ///
223    /// In contrast to [`Inbox::next_message`] this discards any non-`Event` messages which were received.
224    pub fn next_event(&mut self) -> Option<(String, Event)> {
225        while let Some(message) = self.next_message() {
226            if let Message::DataMessage {
227                topic,
228                data: Data::Event(event),
229            } = message
230            {
231                return Some((topic, event));
232            }
233        }
234
235        None
236    }
237}
238
239/// Sending side of [`Binding`].
240pub struct Outbox(VecDeque<tungstenite::Message>);
241
242impl Outbox {
243    /// Get next data enqueued for sending.
244    pub fn next_data(&mut self) -> Option<Bytes> {
245        self.0.pop_front().map(tungstenite::Message::into_data)
246    }
247
248    /// Enqueue a new message.
249    pub fn enqueue<M>(&mut self, message: M)
250    where
251        M: Into<tungstenite::Message>,
252    {
253        self.0.push_back(message.into());
254    }
255
256    /// Enqueue an event for sending.
257    pub fn enqueue_event<S>(&mut self, topic: S, event: Event)
258    where
259        S: Into<String>,
260    {
261        self.enqueue(Message::new_data(topic.into(), event));
262    }
263}
264
265/// Error enum for protocol-related errors.
266#[derive(Error, Debug, PartialEq)]
267pub enum ProtocolError {
268    /// received an ACK while already subscribed
269    #[error("received an ACK while already subscribed")]
270    AlreadySubscribed,
271
272    #[error("attempted to send on topic '{0}' but only subscribed to '{1}'")]
273    SendOnNonSubscribed(String, Subscriptions, Data),
274
275    #[error("Zeek error {code}: {context}")]
276    ZeekError { code: String, context: String },
277
278    #[error("unexpected event payload received")]
279    UnexpectedEventPayload(Value),
280}
281
#[cfg(test)]
mod test {
    use crate::{
        protocol::{Binding, ProtocolError},
        types::{Data, Event, Message, Subscriptions, Value},
    };

    /// ACK message used by the tests to acknowledge a subscription.
    fn ack() -> Message {
        Message::Ack {
            endpoint: "mock".into(),
            version: "0.1".into(),
        }
    }

    #[test]
    fn recv() {
        let topic = "foo";

        let mut conn = Binding::new(&[topic]);

        // Nothing received yet.
        assert_eq!(conn.inbox.next_message(), None);

        // Handle subscription.
        Subscriptions::try_from(tungstenite::Message::binary(conn.outgoing().unwrap())).unwrap();
        conn.handle_incoming(ack()).unwrap();

        // No new input received.
        assert_eq!(conn.inbox.next_message(), None);
        assert_eq!(conn.receive_event(), Ok(None));

        // Receive a single event.
        conn.handle_incoming(Message::new_data(topic, Event::new("ping", [(); 0])))
            .unwrap();

        assert!(matches!(
            conn.inbox.next_message(),
            Some(Message::DataMessage {
                data: Data::Event(..),
                ..
            })
        ));
    }

    #[test]
    fn send() {
        let mut conn = Binding::new(&["foo"]);

        // Handle subscription.
        Subscriptions::try_from(tungstenite::Message::binary(conn.outgoing().unwrap())).unwrap();
        conn.handle_incoming(ack()).unwrap();

        // Send an event.
        conn.publish_event("foo", Event::new("ping", [(); 0]))
            .unwrap();

        // Event payload should be in outbox.
        let msg =
            Message::try_from(tungstenite::Message::binary(conn.outgoing().unwrap())).unwrap();
        assert!(matches!(
            msg,
            Message::DataMessage {
                data: Data::Event(..),
                ..
            }
        ));
    }

    #[test]
    fn split() {
        let (mut inbox, mut outbox) = Binding::new(&["foo"]).split();

        // Handle subscription. A bare `Inbox` does not interpret the ACK, so
        // it is queued like any other message.
        Subscriptions::try_from(tungstenite::Message::binary(outbox.next_data().unwrap())).unwrap();
        inbox.handle(ack());

        assert!(matches!(inbox.next_message(), Some(Message::Ack { .. })));
    }

    #[test]
    fn send_on_non_subscribed() {
        let mut conn = Binding::new(&["foo"]);

        // The initial message is the subscription to `["foo"]`.
        let message = tungstenite::Message::binary(conn.outgoing().unwrap());
        let subscription = Subscriptions::try_from(message).unwrap();
        assert_eq!(subscription, Subscriptions::from(&["foo"]));

        // Send a message on `"bar"` to which we are not subscribed.
        let event = Event::new("ping", ["ping on 'bar'"]);
        assert_eq!(
            conn.publish_event("bar", event.clone()),
            Err(ProtocolError::SendOnNonSubscribed(
                "bar".to_string(),
                Subscriptions::from(&["foo"]),
                Data::Event(event),
            ))
        );
    }

    #[test]
    fn duplicate_ack() {
        let mut conn = Binding::new(&["foo"]);

        // Handle subscription. The call to `handle_incoming` consumes the ACK.
        Subscriptions::try_from(tungstenite::Message::binary(conn.outgoing().unwrap())).unwrap();
        conn.handle_incoming(ack()).unwrap();

        // Detect if we see another, unexpected ACK.
        assert_eq!(
            conn.handle_incoming(ack()),
            Err(ProtocolError::AlreadySubscribed)
        );
    }

    #[test]
    fn other_event_payload() {
        let mut conn = Binding::new(&["foo"]);
        conn.handle_incoming(ack()).unwrap();

        // A data message whose payload is not an event is rejected.
        let other = Message::new_data("foo", Value::Count(42));
        assert_eq!(
            conn.handle_incoming(other),
            Err(ProtocolError::UnexpectedEventPayload(Value::Count(42)))
        );
    }

    #[test]
    fn next_incoming() {
        let mut conn = Binding::new(Subscriptions(Vec::new()));

        // Feed an ACK and an event into the binding.
        conn.handle_incoming(ack()).unwrap();
        conn.handle_incoming(Message::new_data("topic", Event::new("ping", [(); 0])))
            .unwrap();

        // The ACK was consumed by `handle_incoming` and never reached the
        // inbox, so `receive_event` returns the event right away.
        let (topic, event) = conn.receive_event().unwrap().unwrap();
        assert_eq!(topic, "topic");
        assert_eq!(event.name, "ping");

        assert_eq!(conn.inbox.next_message(), None);
    }

    #[test]
    fn error() {
        let mut conn = Binding::new(&["foo"]);
        conn.handle_incoming(ack()).unwrap();

        // Errors from Zeek are queued and only surface in `receive_event`.
        conn.handle_incoming(Message::Error {
            code: "code".to_string(),
            context: "context".to_string(),
        })
        .unwrap();

        assert_eq!(
            conn.receive_event(),
            Err(ProtocolError::ZeekError {
                code: "code".to_string(),
                context: "context".to_string()
            })
        );
    }

    #[test]
    fn publish_event() {
        let mut conn = Binding::new(&["foo"]);
        // Consume the subscription.
        conn.outgoing().unwrap();

        conn.publish_event("foo", Event::new("ping", [(); 0]))
            .unwrap();
        let message =
            Message::try_from(tungstenite::Message::binary(conn.outgoing().unwrap())).unwrap();
        let Message::DataMessage {
            topic,
            data: Data::Event(event),
        } = message
        else {
            panic!("expected a data message carrying an event")
        };
        assert_eq!(topic, "foo");
        assert_eq!(event.name, "ping");
    }
}