mistralrs_core/
harmony.rs

1//! OpenAI Harmony format parsing for GPT-OSS models.
2//!
3//! The Harmony format uses channels to separate different types of content:
4//! - `analysis`: Chain-of-thought reasoning (internal, not for end users)
5//! - `commentary`: Tool call preambles and explanations
6//! - `final`: User-facing response content
7//!
//! Tool calls in Harmony are indicated by the `recipient` field being set to:
//! - `functions.tool_name` for user-defined tools
//! - `browser.search`, `browser.open`, or `browser.find` for the browser tool
//! - `python` for the python tool
12//!
13//! This module provides incremental parsing of Harmony-formatted token streams.
14
15use openai_harmony::{
16    chat::Role, load_harmony_encoding, HarmonyEncoding, HarmonyEncodingName, StreamableParser,
17};
18use std::sync::OnceLock;
19use uuid::Uuid;
20
/// Map a Harmony recipient string to the tool name reported to callers.
///
/// User-defined tools are addressed as `functions.<name>`; that prefix is
/// removed. Every other recipient is returned unchanged.
///
/// - `"functions.my_tool"` -> `"my_tool"`
/// - `"browser.search"` -> `"browser.search"`
/// - `"python"` -> `"python"`
fn extract_tool_name(recipient: &str) -> String {
    recipient
        .strip_prefix("functions.")
        .unwrap_or(recipient)
        .to_string()
}
33
/// The channels a Harmony message can be written to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HarmonyChannel {
    /// Chain-of-thought reasoning; internal, not meant for end users.
    Analysis,
    /// Tool call preambles and explanations.
    Commentary,
    /// The response content shown to the user.
    Final,
}

impl HarmonyChannel {
    /// Look up the channel named by `s`.
    ///
    /// Only the exact lowercase names `analysis`, `commentary` and `final`
    /// are recognised; any other string yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        const NAMED: [(&str, HarmonyChannel); 3] = [
            ("analysis", HarmonyChannel::Analysis),
            ("commentary", HarmonyChannel::Commentary),
            ("final", HarmonyChannel::Final),
        ];
        NAMED
            .iter()
            .find(|(name, _)| *name == s)
            .map(|&(_, channel)| channel)
    }
}
56
57/// Incremental delta from Harmony parsing
58#[derive(Debug, Clone, Default)]
59pub struct HarmonyDelta {
60    /// New analysis/reasoning content since last delta
61    pub analysis_delta: Option<String>,
62    /// New commentary content since last delta
63    pub commentary_delta: Option<String>,
64    /// New final response content since last delta
65    pub final_delta: Option<String>,
66    /// Currently active channel
67    pub current_channel: Option<HarmonyChannel>,
68}
69
70impl HarmonyDelta {
71    /// Check if this delta has any content
72    pub fn has_content(&self) -> bool {
73        self.analysis_delta.is_some()
74            || self.commentary_delta.is_some()
75            || self.final_delta.is_some()
76    }
77
78    /// Get reasoning content (analysis + commentary without tool calls)
79    pub fn reasoning_content(&self) -> Option<String> {
80        match (&self.analysis_delta, &self.commentary_delta) {
81            (Some(a), Some(c)) => Some(format!("{}{}", a, c)),
82            (Some(a), None) => Some(a.clone()),
83            (None, Some(c)) => Some(c.clone()),
84            (None, None) => None,
85        }
86    }
87}
88
/// Running totals of the text seen so far on each Harmony channel.
#[derive(Debug, Clone, Default)]
pub struct HarmonyAccumulated {
    /// All text collected from the `analysis` channel.
    pub analysis: String,
    /// All text collected from the `commentary` channel.
    pub commentary: String,
    /// All text collected as final (user-facing) content.
    pub final_content: String,
}
99
/// A single tool invocation recovered from a Harmony token stream.
#[derive(Debug, Clone)]
pub struct HarmonyToolCall {
    /// Identifier assigned to this call.
    pub id: String,
    /// Tool name; for `functions.tool_name` recipients this is `tool_name`.
    pub name: String,
    /// The call's arguments, kept as the raw JSON string.
    pub arguments: String,
}
110
111impl HarmonyAccumulated {
112    /// Get all reasoning content (analysis + commentary)
113    pub fn reasoning_content(&self) -> Option<String> {
114        let combined = format!("{}{}", self.analysis, self.commentary);
115        if combined.is_empty() {
116            None
117        } else {
118            Some(combined)
119        }
120    }
121}
122
123/// Context for tracking Harmony parsing state within a sequence.
124///
125/// This wraps the openai-harmony crate's StreamableParser and provides
126/// delta extraction for streaming responses.
127pub struct HarmonyContext {
128    parser: StreamableParser,
129    // Track lengths for delta extraction (for parser content)
130    last_analysis_len: usize,
131    last_commentary_len: usize,
132    last_final_len: usize,
133    // Accumulated content
134    accumulated: HarmonyAccumulated,
135    // Track which channel we're currently in
136    channel: Option<HarmonyChannel>,
137    // Track positions for streaming deltas (what has been sent)
138    sent_reasoning_len: usize,
139    sent_final_len: usize,
140    // Tool call tracking
141    tool_calls: Vec<HarmonyToolCall>,
142    // Track current tool call being built (recipient, accumulated_args)
143    current_tool_call: Option<(String, String)>,
144    // Track how much of current tool call args have been sent
145    sent_tool_args_len: usize,
146}
147
148impl HarmonyContext {
149    /// Create a new Harmony parsing context
150    pub fn new() -> Result<Self, anyhow::Error> {
151        let encoding = get_harmony_encoding().clone();
152        let parser = StreamableParser::new(encoding, Some(Role::Assistant))
153            .map_err(|e| anyhow::anyhow!("Failed to create Harmony parser: {:?}", e))?;
154        Ok(Self {
155            parser,
156            last_analysis_len: 0,
157            last_commentary_len: 0,
158            last_final_len: 0,
159            accumulated: HarmonyAccumulated::default(),
160            channel: None,
161            sent_reasoning_len: 0,
162            sent_final_len: 0,
163            tool_calls: Vec::new(),
164            current_tool_call: None,
165            sent_tool_args_len: 0,
166        })
167    }
168
169    /// Process a token and return any new delta content
170    pub fn process_token(&mut self, token_id: u32) -> HarmonyDelta {
171        // process() returns Result, ignore errors for robustness
172        let _ = self.parser.process(token_id);
173        self.extract_delta()
174    }
175
176    /// Extract delta since last call
177    fn extract_delta(&mut self) -> HarmonyDelta {
178        let mut delta = HarmonyDelta::default();
179
180        // Get current channel from parser
181        if let Some(channel_str) = self.parser.current_channel() {
182            if let Some(channel) = HarmonyChannel::parse(&channel_str) {
183                self.channel = Some(channel);
184                delta.current_channel = Some(channel);
185            }
186        }
187
188        // Check for tool calls via recipient field
189        // Recipient is set to "functions.tool_name" when making a tool call
190        let current_recipient = self.parser.current_recipient();
191
192        // Get current content and extract delta based on channel
193        // current_content() returns Result<String>
194        if let Ok(content) = self.parser.current_content() {
195            // Check if this is a tool call
196            // Tool calls have recipients like:
197            // - "functions.tool_name" for user-defined tools
198            // - "browser.search", "browser.open", "browser.find" for browser tool
199            // - "python" for python tool
200            if let Some(ref recipient) = current_recipient {
201                let is_tool_call = recipient.starts_with("functions.")
202                    || recipient.starts_with("browser.")
203                    || recipient == "python";
204
205                if is_tool_call {
206                    // This is a tool call - track it
207                    // Check if this is the same tool call or a different one
208                    let is_same_tool_call = self
209                        .current_tool_call
210                        .as_ref()
211                        .is_some_and(|(existing, _)| existing == recipient);
212
213                    if is_same_tool_call {
214                        // Same tool call, update arguments
215                        if let Some((_, ref mut args)) = self.current_tool_call {
216                            *args = content.clone();
217                        }
218                    } else {
219                        // Different tool call or no current tool call
220                        // Finalize previous tool call if any
221                        if let Some((prev_recipient, prev_args)) = self.current_tool_call.take() {
222                            let prev_name = extract_tool_name(&prev_recipient);
223                            self.tool_calls.push(HarmonyToolCall {
224                                id: format!("call_{}", Uuid::new_v4()),
225                                name: prev_name,
226                                arguments: prev_args,
227                            });
228                        }
229                        // Start new tool call
230                        self.current_tool_call = Some((recipient.clone(), content.clone()));
231                        self.sent_tool_args_len = 0;
232                    }
233                    // Don't accumulate tool call content to final_content
234                    return delta;
235                }
236            }
237
238            // Not a tool call, handle normally by channel
239            match self.channel {
240                Some(HarmonyChannel::Analysis) => {
241                    if content.len() > self.last_analysis_len {
242                        let new_content = content[self.last_analysis_len..].to_string();
243                        self.accumulated.analysis.push_str(&new_content);
244                        delta.analysis_delta = Some(new_content);
245                        self.last_analysis_len = content.len();
246                    }
247                }
248                Some(HarmonyChannel::Commentary) => {
249                    if content.len() > self.last_commentary_len {
250                        let new_content = content[self.last_commentary_len..].to_string();
251                        self.accumulated.commentary.push_str(&new_content);
252                        delta.commentary_delta = Some(new_content);
253                        self.last_commentary_len = content.len();
254                    }
255                }
256                Some(HarmonyChannel::Final) | None => {
257                    // Final channel OR no channel marker - treat content as final.
258                    // This handles cases where the model responds without Harmony
259                    // channel markers (e.g., after tool call results).
260                    if content.len() > self.last_final_len {
261                        let new_content = content[self.last_final_len..].to_string();
262                        self.accumulated.final_content.push_str(&new_content);
263                        delta.final_delta = Some(new_content);
264                        self.last_final_len = content.len();
265                    }
266                }
267            }
268        }
269
270        delta
271    }
272
273    /// Get the currently active channel
274    pub fn current_channel(&self) -> Option<HarmonyChannel> {
275        self.channel
276    }
277
278    /// Get all accumulated content
279    pub fn accumulated(&self) -> &HarmonyAccumulated {
280        &self.accumulated
281    }
282
283    /// Get accumulated reasoning content (analysis + commentary)
284    pub fn reasoning_content(&self) -> Option<String> {
285        self.accumulated.reasoning_content()
286    }
287
288    /// Get accumulated final content
289    pub fn final_content(&self) -> Option<String> {
290        if self.accumulated.final_content.is_empty() {
291            None
292        } else {
293            Some(self.accumulated.final_content.clone())
294        }
295    }
296
297    /// Get the reasoning delta since last call (for streaming).
298    /// Returns new reasoning content that hasn't been sent yet.
299    pub fn get_reasoning_delta(&mut self) -> Option<String> {
300        let reasoning = format!(
301            "{}{}",
302            self.accumulated.analysis, self.accumulated.commentary
303        );
304        if reasoning.len() > self.sent_reasoning_len {
305            let delta = reasoning[self.sent_reasoning_len..].to_string();
306            self.sent_reasoning_len = reasoning.len();
307            if delta.is_empty() {
308                None
309            } else {
310                Some(delta)
311            }
312        } else {
313            None
314        }
315    }
316
317    /// Get the final content delta since last call (for streaming).
318    /// Returns new final content that hasn't been sent yet.
319    pub fn get_final_delta(&mut self) -> Option<String> {
320        if self.accumulated.final_content.len() > self.sent_final_len {
321            let delta = self.accumulated.final_content[self.sent_final_len..].to_string();
322            self.sent_final_len = self.accumulated.final_content.len();
323            if delta.is_empty() {
324                None
325            } else {
326                Some(delta)
327            }
328        } else {
329            None
330        }
331    }
332
333    /// Signal end of stream to the parser
334    pub fn process_eos(&mut self) {
335        let _ = self.parser.process_eos();
336
337        // Finalize any pending tool call
338        if let Some((recipient, args)) = self.current_tool_call.take() {
339            let name = extract_tool_name(&recipient);
340            self.tool_calls.push(HarmonyToolCall {
341                id: format!("call_{}", Uuid::new_v4()),
342                name,
343                arguments: args,
344            });
345        }
346    }
347
348    /// Get the recipient (for tool calls) if any
349    pub fn current_recipient(&self) -> Option<String> {
350        self.parser.current_recipient()
351    }
352
353    /// Check if there's a tool call in progress
354    pub fn has_tool_call(&self) -> bool {
355        self.current_tool_call.is_some() || !self.tool_calls.is_empty()
356    }
357
358    /// Get all completed tool calls
359    pub fn get_tool_calls(&self) -> &[HarmonyToolCall] {
360        &self.tool_calls
361    }
362
363    /// Get the current tool call being built (if any)
364    /// Returns (recipient, arguments_so_far)
365    pub fn get_current_tool_call(&self) -> Option<(&str, &str)> {
366        self.current_tool_call
367            .as_ref()
368            .map(|(recipient, args)| (recipient.as_str(), args.as_str()))
369    }
370
371    /// Finalize any pending tool call and return all tool calls.
372    /// This should be called when the sequence is done.
373    /// Note: This takes ownership of the tool calls, so calling it twice
374    /// will return an empty vector the second time.
375    pub fn finalize_tool_calls(&mut self) -> Vec<HarmonyToolCall> {
376        // Finalize any pending tool call
377        if let Some((recipient, args)) = self.current_tool_call.take() {
378            let name = extract_tool_name(&recipient);
379            self.tool_calls.push(HarmonyToolCall {
380                id: format!("call_{}", Uuid::new_v4()),
381                name,
382                arguments: args,
383            });
384        }
385        // Take ownership to prevent duplicate returns if called multiple times
386        std::mem::take(&mut self.tool_calls)
387    }
388}
389
390/// Global harmony encoding (lazy loaded)
391static HARMONY_ENCODING: OnceLock<HarmonyEncoding> = OnceLock::new();
392
393/// Pre-initialize the Harmony encoding. This MUST be called from a non-async
394/// context (e.g., during pipeline loading) before any async code runs.
395/// The openai-harmony crate uses reqwest::blocking which creates its own
396/// tokio runtime, so it cannot be called from within an existing async context.
397pub fn prewarm_harmony_encoding() {
398    let _ = HARMONY_ENCODING.get_or_init(|| {
399        load_harmony_encoding(HarmonyEncodingName::HarmonyGptOss)
400            .expect("Failed to load Harmony encoding")
401    });
402}
403
404/// Check if the Harmony encoding has been initialized.
405pub fn is_harmony_encoding_ready() -> bool {
406    HARMONY_ENCODING.get().is_some()
407}
408
409fn get_harmony_encoding() -> &'static HarmonyEncoding {
410    HARMONY_ENCODING
411        .get()
412        .expect("Harmony encoding not initialized. Call prewarm_harmony_encoding() first.")
413}
414
/// Check if a chat template uses Harmony format by looking for Harmony markers.
///
/// Returns true when the template contains `<|channel|>` (the most
/// distinctive marker), or when it contains all three of `<|start|>`,
/// `<|message|>` and `<|end|>`.
pub fn is_harmony_template(template: &str) -> bool {
    // On their own these are not conclusive; together they are
    // characteristic of Harmony.
    const FRAMING_MARKERS: [&str; 3] = ["<|start|>", "<|message|>", "<|end|>"];

    template.contains("<|channel|>")
        || FRAMING_MARKERS
            .iter()
            .all(|marker| template.contains(*marker))
}
430
#[cfg(test)]
mod tests {
    use super::*;

    /// Template detection: `<|channel|>` alone is enough; otherwise all three
    /// framing markers must be present.
    #[test]
    fn test_is_harmony_template() {
        // Should detect Harmony templates
        assert!(is_harmony_template(
            "<|start|>system<|message|>content<|end|>"
        ));
        assert!(is_harmony_template(
            "some prefix <|channel|>analysis<|message|>thinking"
        ));

        // Should not detect non-Harmony templates
        assert!(!is_harmony_template("<|im_start|>system<|im_end|>"));
        assert!(!is_harmony_template("regular chat template"));

        // An incomplete set of framing markers is not enough
        assert!(!is_harmony_template("<|start|>system<|message|>content"));
    }

    /// Channel names are matched exactly.
    #[test]
    fn test_harmony_channel_parse() {
        assert_eq!(
            HarmonyChannel::parse("analysis"),
            Some(HarmonyChannel::Analysis)
        );
        assert_eq!(
            HarmonyChannel::parse("commentary"),
            Some(HarmonyChannel::Commentary)
        );
        assert_eq!(HarmonyChannel::parse("final"), Some(HarmonyChannel::Final));
        assert_eq!(HarmonyChannel::parse("unknown"), None);
        assert_eq!(HarmonyChannel::parse("Analysis"), None);
        assert_eq!(HarmonyChannel::parse(""), None);
    }

    /// Any one of the three text deltas counts as content.
    #[test]
    fn test_harmony_delta_has_content() {
        let empty = HarmonyDelta::default();
        assert!(!empty.has_content());

        let with_analysis = HarmonyDelta {
            analysis_delta: Some("thinking".to_string()),
            ..Default::default()
        };
        assert!(with_analysis.has_content());

        let with_commentary = HarmonyDelta {
            commentary_delta: Some("calling a tool".to_string()),
            ..Default::default()
        };
        assert!(with_commentary.has_content());

        let with_final = HarmonyDelta {
            final_delta: Some("response".to_string()),
            ..Default::default()
        };
        assert!(with_final.has_content());

        // The active channel alone is not content
        let channel_only = HarmonyDelta {
            current_channel: Some(HarmonyChannel::Final),
            ..Default::default()
        };
        assert!(!channel_only.has_content());
    }

    /// Reasoning is analysis followed by commentary; final text is excluded.
    #[test]
    fn test_harmony_delta_reasoning_content() {
        let both = HarmonyDelta {
            analysis_delta: Some("thinking ".to_string()),
            commentary_delta: Some("about tools".to_string()),
            ..Default::default()
        };
        assert_eq!(
            both.reasoning_content(),
            Some("thinking about tools".to_string())
        );

        let only_analysis = HarmonyDelta {
            analysis_delta: Some("just thinking".to_string()),
            ..Default::default()
        };
        assert_eq!(
            only_analysis.reasoning_content(),
            Some("just thinking".to_string())
        );

        let only_commentary = HarmonyDelta {
            commentary_delta: Some("just commentary".to_string()),
            ..Default::default()
        };
        assert_eq!(
            only_commentary.reasoning_content(),
            Some("just commentary".to_string())
        );

        let only_final = HarmonyDelta {
            final_delta: Some("answer".to_string()),
            ..Default::default()
        };
        assert_eq!(only_final.reasoning_content(), None);

        let none = HarmonyDelta::default();
        assert_eq!(none.reasoning_content(), None);
    }

    /// Only the `functions.` prefix is stripped from recipients.
    #[test]
    fn test_extract_tool_name() {
        assert_eq!(extract_tool_name("functions.my_tool"), "my_tool");
        assert_eq!(extract_tool_name("browser.search"), "browser.search");
        assert_eq!(extract_tool_name("python"), "python");
    }

    /// Accumulated reasoning joins analysis and commentary and ignores final
    /// content.
    #[test]
    fn test_harmony_accumulated_reasoning_content() {
        assert_eq!(HarmonyAccumulated::default().reasoning_content(), None);

        let only_final = HarmonyAccumulated {
            final_content: "answer".to_string(),
            ..Default::default()
        };
        assert_eq!(only_final.reasoning_content(), None);

        let both = HarmonyAccumulated {
            analysis: "thinking ".to_string(),
            commentary: "about tools".to_string(),
            final_content: "answer".to_string(),
        };
        assert_eq!(
            both.reasoning_content(),
            Some("thinking about tools".to_string())
        );
    }
}