Skip to main content

tty_web/
session.rs

1//! Persistent terminal sessions with scrollback and lifecycle management.
2//!
3//! A [`Session`] wraps a [`Terminal`] and adds:
4//! - a configurable ring-buffer of recent output (scrollback, default 256 KiB),
5//! - client attach/detach tracking,
6//! - orphan detection (no clients for 60 s → auto-remove).
7//!
8//! [`SessionStore`] is the global session registry. Each session gets a reaper
9//! task that periodically checks for removal conditions.
10
11use std::collections::HashMap;
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex, RwLock, Weak};
15use std::time::Instant;
16
17use tokio::sync::{broadcast, watch};
18
19use crate::terminal::Terminal;
20
21/// Default time without any attached clients before a session is reaped.
22pub const DEFAULT_ORPHAN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
23
24/// Return type of [`Session::attach`]: scrollback events, output stream,
25/// and window-size watch.
26pub type AttachResult = (
27    Vec<ScrollbackEvent>,
28    broadcast::Receiver<Vec<u8>>,
29    watch::Receiver<(u16, u16)>,
30);
31
32/// A scrollback event — either terminal output or a window-size change.
33///
34/// Storing events instead of raw bytes ensures that eviction never splits
35/// an escape sequence and that resize history is preserved for replay.
36#[derive(Clone, Debug, PartialEq)]
37pub enum ScrollbackEvent {
38    /// Raw terminal output bytes.
39    Output(Vec<u8>),
40    /// PTY window size changed (rows, cols).
41    WindowSize(u16, u16),
42}
43
44impl ScrollbackEvent {
45    /// Logical byte cost used for eviction accounting.
46    fn byte_cost(&self) -> usize {
47        match self {
48            Self::Output(data) => data.len(),
49            Self::WindowSize(_, _) => 4,
50        }
51    }
52}
53
54/// A persistent terminal session.
55///
56/// Tracks connected clients, buffers recent output for replay on reconnect,
57/// and detects when the session becomes orphaned.
58pub struct Session {
59    id: String,
60    pub terminal: Terminal,
61    scrollback: Mutex<VecDeque<ScrollbackEvent>>,
62    scrollback_bytes: Mutex<usize>,
63    scrollback_limit: usize,
64    clients: AtomicUsize,
65    detached_at: Mutex<Option<Instant>>,
66    window_size: watch::Sender<(u16, u16)>,
67    orphan_timeout: std::time::Duration,
68}
69
70impl Session {
71    /// Create a new session.
72    ///
73    /// `orphan_timeout` controls how long a session with no attached clients
74    /// survives before the reaper removes it (default: [`DEFAULT_ORPHAN_TIMEOUT`]).
75    pub fn new(
76        terminal: Terminal,
77        output_rx: broadcast::Receiver<Vec<u8>>,
78        scrollback_limit: usize,
79        orphan_timeout: std::time::Duration,
80    ) -> Arc<Self> {
81        let id = uuid::Uuid::new_v4().to_string();
82        let (ws_tx, _) = watch::channel((24, 80));
83        let session = Arc::new(Self {
84            id,
85            terminal,
86            scrollback: Mutex::new(VecDeque::new()),
87            scrollback_bytes: Mutex::new(0),
88            scrollback_limit,
89            clients: AtomicUsize::new(0),
90            detached_at: Mutex::new(None),
91            window_size: ws_tx,
92            orphan_timeout,
93        });
94
95        // Scrollback collector
96        let weak: Weak<Session> = Arc::downgrade(&session);
97        let mut rx = output_rx;
98        tokio::spawn(async move {
99            loop {
100                match rx.recv().await {
101                    Ok(data) => {
102                        let Some(s) = weak.upgrade() else {
103                            break;
104                        };
105                        s.push_scrollback(ScrollbackEvent::Output(data));
106                    }
107                    Err(broadcast::error::RecvError::Lagged(_)) => {
108                        continue;
109                    }
110                    Err(broadcast::error::RecvError::Closed) => break,
111                }
112            }
113        });
114
115        session
116    }
117
118    /// Session identifier.
119    pub fn id(&self) -> &str {
120        &self.id
121    }
122
123    /// Push an event into the scrollback ring buffer, evicting old events
124    /// when the byte budget is exceeded.
125    fn push_scrollback(&self, event: ScrollbackEvent) {
126        let cost = event.byte_cost();
127        let mut sb = self.scrollback.lock().unwrap();
128        let mut bytes = self.scrollback_bytes.lock().unwrap();
129        *bytes += cost;
130        sb.push_back(event);
131        while *bytes > self.scrollback_limit {
132            if let Some(old) = sb.pop_front() {
133                *bytes -= old.byte_cost();
134            } else {
135                break;
136            }
137        }
138    }
139
140    /// Attach a client: increment the counter, subscribe to live output, and
141    /// return the scrollback event log. The subscription and snapshot are taken
142    /// under the same lock so no output is lost.
143    pub fn attach(&self) -> AttachResult {
144        self.clients.fetch_add(1, Ordering::Relaxed);
145        *self.detached_at.lock().unwrap() = None;
146        let sb = self.scrollback.lock().unwrap();
147        let rx = self.terminal.subscribe();
148        let ws_rx = self.window_size.subscribe();
149        let events: Vec<ScrollbackEvent> = sb.iter().cloned().collect();
150        (events, rx, ws_rx)
151    }
152
153    /// Update the current PTY window size (broadcast to viewers) and record
154    /// the resize in the scrollback log so replay clients see it too.
155    pub fn set_window_size(&self, rows: u16, cols: u16) {
156        let _ = self.window_size.send((rows, cols));
157        self.push_scrollback(ScrollbackEvent::WindowSize(rows, cols));
158    }
159
160    /// Detach a client. When the last client detaches, the orphan timer starts.
161    pub fn detach(&self) {
162        if self.clients.fetch_sub(1, Ordering::Relaxed) == 1 {
163            *self.detached_at.lock().unwrap() = Some(Instant::now());
164        }
165    }
166
167    /// Number of currently attached clients.
168    pub fn client_count(&self) -> usize {
169        self.clients.load(Ordering::Relaxed)
170    }
171
172    fn is_orphaned(&self) -> bool {
173        self.clients.load(Ordering::Relaxed) == 0
174            && self
175                .detached_at
176                .lock()
177                .unwrap()
178                .is_some_and(|t| t.elapsed() >= self.orphan_timeout)
179    }
180}
181
182/// Thread-safe session registry keyed by UUID.
183pub struct SessionStore {
184    sessions: RwLock<HashMap<String, Arc<Session>>>,
185}
186
187impl SessionStore {
188    /// Create an empty session store.
189    pub fn new() -> Arc<Self> {
190        Arc::new(Self {
191            sessions: RwLock::new(HashMap::new()),
192        })
193    }
194
195    /// Register a session and spawn a reaper task that removes it when the
196    /// shell exits with no clients or the orphan timeout elapses.
197    pub fn insert(self: &Arc<Self>, session: Arc<Session>) {
198        let sid = session.id().to_owned();
199        self.sessions
200            .write()
201            .unwrap()
202            .insert(sid.clone(), session.clone());
203
204        // Reaper task: periodically checks for removal conditions
205        let store = Arc::downgrade(self);
206        let closed_rx = session.terminal.closed();
207        tokio::spawn(async move {
208            loop {
209                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
210                let Some(store) = store.upgrade() else { return };
211                let should_remove = {
212                    let sessions = store.sessions.read().unwrap();
213                    match sessions.get(&sid) {
214                        Some(s) => {
215                            s.is_orphaned()
216                                || (*closed_rx.borrow() && s.clients.load(Ordering::Relaxed) == 0)
217                        }
218                        None => return,
219                    }
220                };
221                if should_remove {
222                    store.sessions.write().unwrap().remove(&sid);
223                    tracing::info!("removed session {sid}");
224                    return;
225                }
226            }
227        });
228    }
229
230    /// Look up a session by ID.
231    pub fn get(&self, id: &str) -> Option<Arc<Session>> {
232        self.sessions.read().unwrap().get(id).cloned()
233    }
234
235    /// Returns `true` if there are no active sessions.
236    pub fn is_empty(&self) -> bool {
237        self.sessions.read().unwrap().is_empty()
238    }
239}
240
241#[cfg(test)]
242mod tests {
243    use super::*;
244
245    const TEST_SCROLLBACK_LIMIT: usize = 256 * 1024;
246
247    fn spawn_session() -> Arc<Session> {
248        let (terminal, output_rx) = Terminal::spawn("/bin/sh", None).expect("spawn /bin/sh");
249        Session::new(
250            terminal,
251            output_rx,
252            TEST_SCROLLBACK_LIMIT,
253            DEFAULT_ORPHAN_TIMEOUT,
254        )
255    }
256
257    #[tokio::test]
258    async fn test_attach_detach_clients() {
259        let session = spawn_session();
260
261        let (_sb1, _rx1, _ws1) = session.attach();
262        assert_eq!(session.clients.load(Ordering::Relaxed), 1);
263
264        let (_sb2, _rx2, _ws2) = session.attach();
265        assert_eq!(session.clients.load(Ordering::Relaxed), 2);
266
267        session.detach();
268        assert_eq!(session.clients.load(Ordering::Relaxed), 1);
269    }
270
271    #[tokio::test]
272    async fn test_not_orphaned_with_clients() {
273        let session = spawn_session();
274        let (_sb, _rx, _ws) = session.attach();
275        assert!(!session.is_orphaned());
276    }
277
278    #[tokio::test]
279    async fn test_not_orphaned_immediately_after_detach() {
280        let session = spawn_session();
281        let (_sb, _rx, _ws) = session.attach();
282        session.detach();
283        assert!(!session.is_orphaned());
284    }
285
286    #[tokio::test]
287    async fn test_orphaned_after_timeout() {
288        let session = spawn_session();
289        let (_sb, _rx, _ws) = session.attach();
290        session.detach();
291        *session.detached_at.lock().unwrap() =
292            Some(Instant::now() - session.orphan_timeout - std::time::Duration::from_secs(1));
293        assert!(session.is_orphaned());
294    }
295
296    #[tokio::test]
297    async fn test_scrollback_captures_output() {
298        let session = spawn_session();
299
300        session
301            .terminal
302            .write(b"echo scrollback_test_marker\n".to_vec())
303            .await
304            .unwrap();
305
306        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
307
308        let (events, _rx, _ws) = session.attach();
309        let has_marker = events.iter().any(|e| match e {
310            ScrollbackEvent::Output(data) => {
311                String::from_utf8_lossy(data).contains("scrollback_test_marker")
312            }
313            _ => false,
314        });
315        assert!(has_marker, "scrollback should contain Output with marker");
316    }
317
318    #[tokio::test]
319    async fn test_session_store_insert_and_get() {
320        let store = SessionStore::new();
321        let session = spawn_session();
322        let id = session.id().to_owned();
323        store.insert(session);
324
325        assert!(store.get(&id).is_some());
326        assert!(store.get("nonexistent").is_none());
327    }
328
329    #[tokio::test]
330    async fn test_scrollback_eviction_removes_whole_events() {
331        let (terminal, output_rx) = Terminal::spawn("/bin/sh", None).expect("spawn");
332        let session = Session::new(terminal, output_rx, 10, DEFAULT_ORPHAN_TIMEOUT);
333
334        session.push_scrollback(ScrollbackEvent::Output(b"aaaaa".to_vec())); // 5
335        session.push_scrollback(ScrollbackEvent::Output(b"bbbbb".to_vec())); // 5, total 10
336        session.push_scrollback(ScrollbackEvent::Output(b"ccc".to_vec())); // 3, total 13 → evict
337
338        let sb = session.scrollback.lock().unwrap();
339        let bytes = *session.scrollback_bytes.lock().unwrap();
340        assert!(bytes <= 10, "bytes {bytes} should be within limit");
341        assert!(
342            sb.iter().all(|e| matches!(e, ScrollbackEvent::Output(_))),
343            "all events should be Output"
344        );
345        assert_ne!(
346            sb.front(),
347            Some(&ScrollbackEvent::Output(b"aaaaa".to_vec())),
348            "oldest event should have been evicted"
349        );
350    }
351
352    #[tokio::test]
353    async fn test_set_window_size_records_event() {
354        let (terminal, output_rx) = Terminal::spawn("/bin/sh", None).expect("spawn");
355        let session = Session::new(
356            terminal,
357            output_rx,
358            TEST_SCROLLBACK_LIMIT,
359            DEFAULT_ORPHAN_TIMEOUT,
360        );
361
362        session.set_window_size(40, 120);
363
364        let sb = session.scrollback.lock().unwrap();
365        let has_ws = sb
366            .iter()
367            .any(|e| matches!(e, ScrollbackEvent::WindowSize(40, 120)));
368        assert!(has_ws, "scrollback should contain WindowSize(40, 120)");
369    }
370}