1use std::collections::HashMap;
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::{Arc, Mutex, RwLock, Weak};
15use std::time::Instant;
16
17use tokio::sync::{broadcast, watch};
18
19use crate::terminal::Terminal;
20
21pub const DEFAULT_ORPHAN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
23
24pub type AttachResult = (
27 Vec<ScrollbackEvent>,
28 broadcast::Receiver<Vec<u8>>,
29 watch::Receiver<(u16, u16)>,
30);
31
32#[derive(Clone, Debug, PartialEq)]
37pub enum ScrollbackEvent {
38 Output(Vec<u8>),
40 WindowSize(u16, u16),
42}
43
44impl ScrollbackEvent {
45 fn byte_cost(&self) -> usize {
47 match self {
48 Self::Output(data) => data.len(),
49 Self::WindowSize(_, _) => 4,
50 }
51 }
52}
53
54pub struct Session {
59 id: String,
60 pub terminal: Terminal,
61 scrollback: Mutex<VecDeque<ScrollbackEvent>>,
62 scrollback_bytes: Mutex<usize>,
63 scrollback_limit: usize,
64 clients: AtomicUsize,
65 detached_at: Mutex<Option<Instant>>,
66 window_size: watch::Sender<(u16, u16)>,
67 orphan_timeout: std::time::Duration,
68}
69
70impl Session {
71 pub fn new(
76 terminal: Terminal,
77 output_rx: broadcast::Receiver<Vec<u8>>,
78 scrollback_limit: usize,
79 orphan_timeout: std::time::Duration,
80 ) -> Arc<Self> {
81 let id = uuid::Uuid::new_v4().to_string();
82 let (ws_tx, _) = watch::channel((24, 80));
83 let session = Arc::new(Self {
84 id,
85 terminal,
86 scrollback: Mutex::new(VecDeque::new()),
87 scrollback_bytes: Mutex::new(0),
88 scrollback_limit,
89 clients: AtomicUsize::new(0),
90 detached_at: Mutex::new(None),
91 window_size: ws_tx,
92 orphan_timeout,
93 });
94
95 let weak: Weak<Session> = Arc::downgrade(&session);
97 let mut rx = output_rx;
98 tokio::spawn(async move {
99 loop {
100 match rx.recv().await {
101 Ok(data) => {
102 let Some(s) = weak.upgrade() else {
103 break;
104 };
105 s.push_scrollback(ScrollbackEvent::Output(data));
106 }
107 Err(broadcast::error::RecvError::Lagged(_)) => {
108 continue;
109 }
110 Err(broadcast::error::RecvError::Closed) => break,
111 }
112 }
113 });
114
115 session
116 }
117
118 pub fn id(&self) -> &str {
120 &self.id
121 }
122
123 fn push_scrollback(&self, event: ScrollbackEvent) {
126 let cost = event.byte_cost();
127 let mut sb = self.scrollback.lock().unwrap();
128 let mut bytes = self.scrollback_bytes.lock().unwrap();
129 *bytes += cost;
130 sb.push_back(event);
131 while *bytes > self.scrollback_limit {
132 if let Some(old) = sb.pop_front() {
133 *bytes -= old.byte_cost();
134 } else {
135 break;
136 }
137 }
138 }
139
140 pub fn attach(&self) -> AttachResult {
144 self.clients.fetch_add(1, Ordering::Relaxed);
145 *self.detached_at.lock().unwrap() = None;
146 let sb = self.scrollback.lock().unwrap();
147 let rx = self.terminal.subscribe();
148 let ws_rx = self.window_size.subscribe();
149 let events: Vec<ScrollbackEvent> = sb.iter().cloned().collect();
150 (events, rx, ws_rx)
151 }
152
153 pub fn set_window_size(&self, rows: u16, cols: u16) {
156 let _ = self.window_size.send((rows, cols));
157 self.push_scrollback(ScrollbackEvent::WindowSize(rows, cols));
158 }
159
160 pub fn detach(&self) {
162 if self.clients.fetch_sub(1, Ordering::Relaxed) == 1 {
163 *self.detached_at.lock().unwrap() = Some(Instant::now());
164 }
165 }
166
167 pub fn client_count(&self) -> usize {
169 self.clients.load(Ordering::Relaxed)
170 }
171
172 fn is_orphaned(&self) -> bool {
173 self.clients.load(Ordering::Relaxed) == 0
174 && self
175 .detached_at
176 .lock()
177 .unwrap()
178 .is_some_and(|t| t.elapsed() >= self.orphan_timeout)
179 }
180}
181
182pub struct SessionStore {
184 sessions: RwLock<HashMap<String, Arc<Session>>>,
185}
186
187impl SessionStore {
188 pub fn new() -> Arc<Self> {
190 Arc::new(Self {
191 sessions: RwLock::new(HashMap::new()),
192 })
193 }
194
195 pub fn insert(self: &Arc<Self>, session: Arc<Session>) {
198 let sid = session.id().to_owned();
199 self.sessions
200 .write()
201 .unwrap()
202 .insert(sid.clone(), session.clone());
203
204 let store = Arc::downgrade(self);
206 let closed_rx = session.terminal.closed();
207 tokio::spawn(async move {
208 loop {
209 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
210 let Some(store) = store.upgrade() else { return };
211 let should_remove = {
212 let sessions = store.sessions.read().unwrap();
213 match sessions.get(&sid) {
214 Some(s) => {
215 s.is_orphaned()
216 || (*closed_rx.borrow() && s.clients.load(Ordering::Relaxed) == 0)
217 }
218 None => return,
219 }
220 };
221 if should_remove {
222 store.sessions.write().unwrap().remove(&sid);
223 tracing::info!("removed session {sid}");
224 return;
225 }
226 }
227 });
228 }
229
230 pub fn get(&self, id: &str) -> Option<Arc<Session>> {
232 self.sessions.read().unwrap().get(id).cloned()
233 }
234
235 pub fn is_empty(&self) -> bool {
237 self.sessions.read().unwrap().is_empty()
238 }
239}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244
245 const TEST_SCROLLBACK_LIMIT: usize = 256 * 1024;
246
247 fn spawn_session() -> Arc<Session> {
248 let (terminal, output_rx) = Terminal::spawn("/bin/sh", None).expect("spawn /bin/sh");
249 Session::new(
250 terminal,
251 output_rx,
252 TEST_SCROLLBACK_LIMIT,
253 DEFAULT_ORPHAN_TIMEOUT,
254 )
255 }
256
257 #[tokio::test]
258 async fn test_attach_detach_clients() {
259 let session = spawn_session();
260
261 let (_sb1, _rx1, _ws1) = session.attach();
262 assert_eq!(session.clients.load(Ordering::Relaxed), 1);
263
264 let (_sb2, _rx2, _ws2) = session.attach();
265 assert_eq!(session.clients.load(Ordering::Relaxed), 2);
266
267 session.detach();
268 assert_eq!(session.clients.load(Ordering::Relaxed), 1);
269 }
270
271 #[tokio::test]
272 async fn test_not_orphaned_with_clients() {
273 let session = spawn_session();
274 let (_sb, _rx, _ws) = session.attach();
275 assert!(!session.is_orphaned());
276 }
277
278 #[tokio::test]
279 async fn test_not_orphaned_immediately_after_detach() {
280 let session = spawn_session();
281 let (_sb, _rx, _ws) = session.attach();
282 session.detach();
283 assert!(!session.is_orphaned());
284 }
285
286 #[tokio::test]
287 async fn test_orphaned_after_timeout() {
288 let session = spawn_session();
289 let (_sb, _rx, _ws) = session.attach();
290 session.detach();
291 *session.detached_at.lock().unwrap() =
292 Some(Instant::now() - session.orphan_timeout - std::time::Duration::from_secs(1));
293 assert!(session.is_orphaned());
294 }
295
296 #[tokio::test]
297 async fn test_scrollback_captures_output() {
298 let session = spawn_session();
299
300 session
301 .terminal
302 .write(b"echo scrollback_test_marker\n".to_vec())
303 .await
304 .unwrap();
305
306 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
307
308 let (events, _rx, _ws) = session.attach();
309 let has_marker = events.iter().any(|e| match e {
310 ScrollbackEvent::Output(data) => {
311 String::from_utf8_lossy(data).contains("scrollback_test_marker")
312 }
313 _ => false,
314 });
315 assert!(has_marker, "scrollback should contain Output with marker");
316 }
317
318 #[tokio::test]
319 async fn test_session_store_insert_and_get() {
320 let store = SessionStore::new();
321 let session = spawn_session();
322 let id = session.id().to_owned();
323 store.insert(session);
324
325 assert!(store.get(&id).is_some());
326 assert!(store.get("nonexistent").is_none());
327 }
328
329 #[tokio::test]
330 async fn test_scrollback_eviction_removes_whole_events() {
331 let (terminal, output_rx) = Terminal::spawn("/bin/sh", None).expect("spawn");
332 let session = Session::new(terminal, output_rx, 10, DEFAULT_ORPHAN_TIMEOUT);
333
334 session.push_scrollback(ScrollbackEvent::Output(b"aaaaa".to_vec())); session.push_scrollback(ScrollbackEvent::Output(b"bbbbb".to_vec())); session.push_scrollback(ScrollbackEvent::Output(b"ccc".to_vec())); let sb = session.scrollback.lock().unwrap();
339 let bytes = *session.scrollback_bytes.lock().unwrap();
340 assert!(bytes <= 10, "bytes {bytes} should be within limit");
341 assert!(
342 sb.iter().all(|e| matches!(e, ScrollbackEvent::Output(_))),
343 "all events should be Output"
344 );
345 assert_ne!(
346 sb.front(),
347 Some(&ScrollbackEvent::Output(b"aaaaa".to_vec())),
348 "oldest event should have been evicted"
349 );
350 }
351
352 #[tokio::test]
353 async fn test_set_window_size_records_event() {
354 let (terminal, output_rx) = Terminal::spawn("/bin/sh", None).expect("spawn");
355 let session = Session::new(
356 terminal,
357 output_rx,
358 TEST_SCROLLBACK_LIMIT,
359 DEFAULT_ORPHAN_TIMEOUT,
360 );
361
362 session.set_window_size(40, 120);
363
364 let sb = session.scrollback.lock().unwrap();
365 let has_ws = sb
366 .iter()
367 .any(|e| matches!(e, ScrollbackEvent::WindowSize(40, 120)));
368 assert!(has_ws, "scrollback should contain WindowSize(40, 120)");
369 }
370}