Writing A Websocket Server

Asynchronously

by Andreas Monitzer

Overview

  1. What is a Websocket?
  2. Implementing a server with the ws library
  3. What is a Future?
  4. Implementing Websockets with Tokio
  5. HTTP Requests with Tokio

What are Websockets?

Long Polling

Layer 1 Timeout Timeout Timeout Timeout

HTTP/2 Server Push

ws Crate


              extern crate ws;
              mod server;

              // Entry point: listens on 0.0.0.0:4200 and builds a `server::Server`
              // from the `out` sender that is passed to the factory closure.
              fn main() {
                let outcome = ws::listen("0.0.0.0:4200", |out| server::Server::new(out));
                // Panics if `listen` returned an error.
                outcome.unwrap();
              }
            

            use ws;
            struct Server {
              out: ws::Sender,
            };
            impl ws::Handler for Server {
              fn on_open(&mut self, shake: ws::Handshake)
                -> Result<(), ws::Error> {}
              fn on_request(&mut self, req: &ws::Request)
                -> ws::Result {}
              fn on_close(&mut self, code: ws::CloseCode, reason: &str) {}
              fn on_message(&mut self, msg: ws::Message)
                -> Result<(), ws::Error> {}
              fn on_error(&mut self, err: ws::Error) {}
            }
            

POSIX Event Loops

  • select
  • poll
  • epoll

The Old Ways Part 1


              // Pseudocode (not valid Rust): a hand-written loop that polls one socket.
              fn main() {
                let socket = open_connection();
                // Keep going until the connection is flagged as closed.
                while !closed {
                  poll socket;
                  // Only read when the socket reports that it can be read.
                  if socket.can_read() {
                    socket.read();
                  }
                }
                socket.close();
              }
            

The Old Ways Part 2

Enter

https://tokio.rs

Promises in JavaScript


              // On a 200 status, follow up with send_packet("bar") and take `.text`
              // from its result; a rejection is logged as a network error instead.
              let text_promise = fetch("foo")
                .then(response => (response.status === 200 ? send_packet("bar") : undefined))
                .then(
                  reply => reply.text,
                  err => {
                    console.error("Network error:", err);
                  }
                );
            

Tokio Event Loop


              extern crate tokio_core;

              // Creates a tokio_core reactor `Core` and runs the future `f` on it.
              // The code that builds `f` is elided on this slide.
              fn main() {
                let mut core = tokio_core::reactor::Core::new().unwrap();
                // ...
                core.run(f).unwrap();
              }
            

Manipulating Futures

  • map
  • map_err
  • then
  • and_then
  • or_else
  • select
  • join

Type Deconstruction


              // Signature of the `then` combinator: the closure `f` receives a
              // `Result<Self::Item, Self::Error>` and returns a `B: IntoFuture`.
              fn then<F, B>(self, f: F) -> Then<Self, B, F>
                where
                    F: FnOnce(Result<Self::Item, Self::Error>) -> B,
                    B: IntoFuture,
                    Self: Sized,

              // The value returned by `then` is this concrete struct ...
              pub struct Then<A, B, F>
                where A: Future, B: IntoFuture, { /* fields omitted */ }

              // ... which implements `Future` itself.
              impl Future for Then<A, B, F>
                where A: Future, B: IntoFuture,
                  F: FnOnce(Result<A::Item, A::Error>) -> B,
            

            extern crate tokio_core;
            use tokio::net::TcpListener;

            // Binds a TCP listener on 127.0.0.1:12345 and runs the `for_each` over
            // its incoming connections on the reactor; per-connection work is elided.
            fn main() {
              let mut core = tokio_core::reactor::Core::new().unwrap();
              let addr = "127.0.0.1:12345".parse().unwrap();
              let tcp = TcpListener::bind(&addr).unwrap();
              // The closure's `tcp` is one item of `incoming()` and shadows the listener.
              let server = tcp.incoming().for_each(|tcp| {
                // ...
                Ok(())
              });
              core.run(server).unwrap();
            }
            

Type Deconstruction


              // `incoming` takes `self` by value and returns an `Incoming`.
              pub fn incoming(self) -> Incoming
            

            // `Incoming` is a `Stream` whose items are `tokio::net::TcpStream`
            // and whose error type is `tokio::io::Error`.
            pub struct Incoming { /* fields omitted */ }
            impl Stream for Incoming
              type Item = tokio::net::TcpStream
              type Error = tokio::io::Error

            // Excerpt of the `Stream` trait: `into_future` takes the stream by
            // value and returns a `StreamFuture<Self>`.
            pub trait Stream {
              fn into_future(self) -> StreamFuture<Self> { ... }
            }
            

Error Handling


              extern crate tokio_core;
              use tokio::net::TcpListener;

              // TCP server on 127.0.0.1:12345 whose stream error is printed by
              // `map_err` before the future is run on the reactor.
              fn main() {
                let mut core = tokio_core::reactor::Core::new().unwrap();
                let addr = "127.0.0.1:12345".parse().unwrap();
                let tcp = TcpListener::bind(&addr).unwrap();
                let server = tcp.incoming().for_each(|tcp| {
                  // ...
                  Ok(())
                }).map_err(|err| {
                  // The closure returns `()`, so the error is only printed here.
                  println!("server error {:?}", err);
                });
                core.run(server).unwrap();
              }
            

Streams

  • filter
  • filter_map
  • skip_while
  • take_while
  • for_each
  • forward
  • split

Websocket Crate

Websockets using Tokio


              // Reactor plus an async websocket server bound to `addr`.
              let mut core = tokio_core::reactor::Core::new().unwrap();
              let handle = core.handle();
              let server = websocket::async::Server::bind(&addr, &handle).unwrap();
              let f = server.incoming()
                .for_each(|(upgrade, addr)| {
                  // Separate clone of the handle for the `move` closure below.
                  let handle_inner = handle.clone();
                  let f = upgrade.accept().and_then(move |(s, _)| {
                    // next slide
                  });
                  // Hand the per-connection future to the reactor.
                  handle.spawn(f);
                  // `for_each` expects the closure to yield a result, as in the
                  // other `for_each` examples.
                  Ok(())
                });
              core.run(f).unwrap();
            

              // Echo handler: accept the upgrade, then forward the stream back into
              // the sink until a close message is seen, and finish with a Close.
              let f = upgrade.accept().and_then(move |(client, _)| {
                let (sink, stream) = client.split();
                stream
                  .take_while(|msg| Ok(!msg.is_close()))
                  // Pings become pongs, pongs are dropped, the rest passes through.
                  .filter_map(|msg| match msg {
                    OwnedMessage::Pong(_) => None,
                    OwnedMessage::Ping(payload) => Some(OwnedMessage::Pong(payload)),
                    other => Some(other),
                  })
                  .forward(sink)
                  .and_then(|(_, sink)| sink.send(OwnedMessage::Close(None)))
              });
            

              // Closure for `accept().and_then(...)`: splits the client into sink and
              // stream and handles each message until a close message arrives.
              move |(s, _)| {
                let (sink, stream) = s.split();
                // Stop taking messages at the first one for which `is_close()` is true.
                stream.take_while(|m| Ok(!m.is_close()))
                  .for_each(move |m| {
                    // next slide
                    Ok(())
                  })
                  .map(move |_| {
                    // connection is closed
                  })
              }
            

              // Per-message closure: text messages are sent into the channel via a
              // spawned send future; every other message kind is ignored.
              move |m| {
                let tx_inner = tx.clone();
                if let OwnedMessage::Text(txt) = m {
                  let echo = tx_inner.send(OwnedMessage::Text(txt)).map(|_| {});
                  handle_inner.spawn(echo);
                }
                Ok(())
              }
            

              // Channel with a buffer argument of 8; `tx_close` is a second sender handle.
              let (tx, rx) = futures::sync::mpsc::channel(8);
              let mut tx_close = tx.clone();

              // Writer task: forward the receiver into the sink, then send a Close.
              let drained = rx.forward(sink);
              let writer = drained
                .and_then(|(_, sink)| sink.send(OwnedMessage::Close(None)))
                .map(|_| {});
              handle_inner.spawn(writer);
            

HTTP Request

Tokio-curl


              let session = tokio_curl::Session::new(handle.clone());

              let mut req = curl::easy::Easy::new();
              req.url("https://www.rust-lang.org/").unwrap();
              req.write_function(|data| {
                  io::stdout().write_all(data).unwrap();
                  Ok(data.len())
              }).unwrap();
              handle_inner.spawn(session.perform(req).map(move |mut res| {
                match res.response_code() {
                  Ok(200) => { /* ... */ },
                  // ...
                }
                Ok(())
              });
            

Conclusion

  • Websockets
  • Event Loop
  • Futures
  • Tokio

Crates: ws, futures, tokio_core, websocket, tokio_curl