mistralrs_server_core/
cached_responses.rs

1//! ## Response caching functionality for the Responses API.
2
3use anyhow::Result;
4use std::collections::HashMap;
5use std::sync::LazyLock;
6use std::sync::{Arc, RwLock};
7
8use crate::openai::{Message, ResponsesChunk, ResponsesObject};
9
/// Storage abstraction for Responses API artifacts.
///
/// Implementations must be `Send + Sync` because a single cache instance is
/// shared process-wide (see `RESPONSE_CACHE` below). The in-memory
/// implementation in this file never returns `Err`, but the `Result` return
/// types leave room for fallible backends (e.g. disk or network stores).
pub trait ResponseCache: Send + Sync {
    /// Store a response object under the given ID, replacing any previous
    /// entry with the same ID.
    fn store_response(&self, id: String, response: ResponsesObject) -> Result<()>;

    /// Retrieve a response object by ID; `Ok(None)` when the ID is unknown.
    fn get_response(&self, id: &str) -> Result<Option<ResponsesObject>>;

    /// Delete a response object by ID.
    ///
    /// Returns `Ok(true)` if anything was removed, `Ok(false)` otherwise.
    fn delete_response(&self, id: &str) -> Result<bool>;

    /// Store the streaming chunks produced for a response, replacing any
    /// previously stored chunks for the same ID.
    fn store_chunks(&self, id: String, chunks: Vec<ResponsesChunk>) -> Result<()>;

    /// Retrieve streaming chunks for a response; `Ok(None)` when the ID is
    /// unknown.
    fn get_chunks(&self, id: &str) -> Result<Option<Vec<ResponsesChunk>>>;

    /// Store the conversation history (messages) for a response, replacing
    /// any previously stored history for the same ID.
    fn store_conversation_history(&self, id: String, messages: Vec<Message>) -> Result<()>;

    /// Retrieve conversation history for a response; `Ok(None)` when the ID
    /// is unknown.
    fn get_conversation_history(&self, id: &str) -> Result<Option<Vec<Message>>>;
}
33
/// In-memory implementation of [`ResponseCache`] backed by `HashMap`s behind
/// `RwLock`s (many concurrent readers, one exclusive writer per map).
///
/// NOTE(review): entries are never evicted; memory grows with the number of
/// cached responses until they are deleted explicitly.
pub struct InMemoryResponseCache {
    // Completed response objects, keyed by response ID.
    responses: Arc<RwLock<HashMap<String, ResponsesObject>>>,
    // Streaming chunks per response, keyed by response ID.
    chunks: Arc<RwLock<HashMap<String, Vec<ResponsesChunk>>>>,
    // Conversation history (messages) per response, keyed by response ID.
    conversation_histories: Arc<RwLock<HashMap<String, Vec<Message>>>>,
}
40
41impl InMemoryResponseCache {
42    /// Create a new in-memory cache
43    pub fn new() -> Self {
44        Self {
45            responses: Arc::new(RwLock::new(HashMap::new())),
46            chunks: Arc::new(RwLock::new(HashMap::new())),
47            conversation_histories: Arc::new(RwLock::new(HashMap::new())),
48        }
49    }
50}
51
52impl Default for InMemoryResponseCache {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl ResponseCache for InMemoryResponseCache {
59    fn store_response(&self, id: String, response: ResponsesObject) -> Result<()> {
60        let mut responses = self.responses.write().unwrap();
61        responses.insert(id, response);
62        Ok(())
63    }
64
65    fn get_response(&self, id: &str) -> Result<Option<ResponsesObject>> {
66        let responses = self.responses.read().unwrap();
67        Ok(responses.get(id).cloned())
68    }
69
70    fn delete_response(&self, id: &str) -> Result<bool> {
71        // IMPORTANT: Lock ordering must be maintained to prevent deadlocks.
72        // Order: responses -> chunks -> conversation_histories
73        // All methods that acquire multiple locks must follow this order.
74        //
75        // We acquire all locks before any modifications to ensure atomicity.
76        // The locks are released in reverse order when dropped at end of scope.
77        let mut responses = self.responses.write().unwrap();
78        let mut chunks = self.chunks.write().unwrap();
79        let mut histories = self.conversation_histories.write().unwrap();
80
81        let response_removed = responses.remove(id).is_some();
82        let chunks_removed = chunks.remove(id).is_some();
83        let history_removed = histories.remove(id).is_some();
84
85        Ok(response_removed || chunks_removed || history_removed)
86    }
87
88    fn store_chunks(&self, id: String, chunks: Vec<ResponsesChunk>) -> Result<()> {
89        let mut chunk_storage = self.chunks.write().unwrap();
90        chunk_storage.insert(id, chunks);
91        Ok(())
92    }
93
94    fn get_chunks(&self, id: &str) -> Result<Option<Vec<ResponsesChunk>>> {
95        let chunks = self.chunks.read().unwrap();
96        Ok(chunks.get(id).cloned())
97    }
98
99    fn store_conversation_history(&self, id: String, messages: Vec<Message>) -> Result<()> {
100        let mut histories = self.conversation_histories.write().unwrap();
101        histories.insert(id, messages);
102        Ok(())
103    }
104
105    fn get_conversation_history(&self, id: &str) -> Result<Option<Vec<Message>>> {
106        let histories = self.conversation_histories.read().unwrap();
107        Ok(histories.get(id).cloned())
108    }
109}
110
/// Global response cache instance, created lazily on first access.
///
/// Stored as an `Arc<dyn ResponseCache>` trait object, so consumers depend
/// only on the [`ResponseCache`] interface rather than the concrete
/// in-memory implementation.
pub static RESPONSE_CACHE: LazyLock<Arc<dyn ResponseCache>> =
    LazyLock::new(|| Arc::new(InMemoryResponseCache::new()));
114
115/// Helper function to get the global cache instance
116pub fn get_response_cache() -> Arc<dyn ResponseCache> {
117    RESPONSE_CACHE.clone()
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end check of store / retrieve / delete on the in-memory cache.
    #[test]
    fn test_in_memory_cache() {
        let cache = InMemoryResponseCache::new();

        // Minimal response object: optional fields empty, output absent.
        let stored = ResponsesObject {
            id: "test-id".to_string(),
            object: "response",
            created_at: 1234567890.0,
            model: "test-model".to_string(),
            status: "completed".to_string(),
            output: vec![],
            output_text: None,
            usage: None,
            error: None,
            metadata: None,
            instructions: None,
            incomplete_details: None,
        };

        // Round-trip: store, then fetch by the same ID.
        cache
            .store_response("test-id".to_string(), stored.clone())
            .unwrap();
        let fetched = cache
            .get_response("test-id")
            .unwrap()
            .expect("response should be present after store");
        assert_eq!(fetched.id, "test-id");

        // Deletion reports success and subsequent lookups miss.
        assert!(cache.delete_response("test-id").unwrap());
        assert!(cache.get_response("test-id").unwrap().is_none());
    }
}
158}