mistralrs_core/
sequence.rs

1use crate::{
2    get_mut_arcmutex, get_mut_group,
3    harmony::HarmonyContext,
4    paged_attention::BlockRef,
5    pipeline::{text_models_inputs_processor::PagedAttentionMeta, LayerCaches},
6    response::{ChatCompletionChunkResponse, Choice, ChunkChoice, Response, SYSTEM_FINGERPRINT},
7    sampler::{Logprobs, Sampler},
8    think_tags::ThinkTagContext,
9    AudioInput, ChatCompletionResponse, Usage,
10};
11use crate::{
12    paged_attention::{BlockEngineSequence, LogicalTokenBlock},
13    pipeline::{DiffusionGenerationParams, KvCache},
14    response::CompletionChoice,
15    tools::ToolCallingMatcher,
16    CompletionChunkChoice, CompletionChunkResponse, CompletionResponse, ImageChoice,
17    ImageGenerationResponse, ImageGenerationResponseFormat,
18};
19use candle_core::Tensor;
20use std::{
21    fmt::Display,
22    hash::{DefaultHasher, Hash, Hasher},
23    sync::{Arc, RwLock},
24    time::{SystemTime, UNIX_EPOCH},
25};
26use tokio::sync::{
27    mpsc::{error::SendError, Sender},
28    Mutex, MutexGuard,
29};
30
/// Why a sequence stopped generating.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum StopReason {
    Eos,
    StopTok(u32),
    Length(usize),
    ModelLength(usize),
    StopString {
        stop_string_idx: usize,
        completion_bytes_pos: usize,
    },
    Canceled,
    GeneratedImage,
    GeneratedSpeech,
    ToolCalls,
}

impl Display for StopReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map each variant onto its user-facing finish-reason string; EOS,
        // stop tokens, and stop strings all surface as "stop".
        let reason = match self {
            StopReason::Eos | StopReason::StopTok(_) | StopReason::StopString { .. } => "stop",
            StopReason::Length(_) | StopReason::ModelLength(_) => "length",
            StopReason::Canceled => "canceled",
            StopReason::GeneratedImage => "generated_image",
            StopReason::GeneratedSpeech => "generated_speech",
            StopReason::ToolCalls => "tool_calls",
        };
        write!(f, "{reason}")
    }
}
60
/// Lifecycle state of a sequence as it moves through scheduling, prefill,
/// decode, and completion.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum SequenceState {
    /// Finished generating, with the reason it stopped.
    Done(StopReason),
    /// Running the prompt (prefill) phase.
    RunningPrompt,
    /// Generating completion tokens (decode phase).
    RunningCompletion,
    /// Not yet scheduled to run.
    Waiting,
    /// An error occurred while processing this sequence.
    Error,
    /// Running a prefill where part of the state was restored (see the
    /// `prefill_v2_*` constructors on `Sequence`).
    RunningPrefillPrompt,
    // For PagedAttention:
    /// Finished because the PagedAttention scheduler aborted it.
    FinishedAborted,
    /// Finished because the PagedAttention scheduler ignored it.
    FinishedIgnored,
    /// Swapped out by the PagedAttention scheduler.
    Swapped,
}
74
/// Constrained-generation recognizer attached to a sequence.
pub enum SequenceRecognizer {
    /// Grammar-constrained sampling driven by an `llguidance` matcher.
    Llguidance(Box<llguidance::Matcher>),
    /// No constraint; sample freely.
    None,
}
79
/// Backend-specific per-sequence bookkeeping.
enum SequenceCustomMetadata {
    /// Block-table state for a PagedAttention sequence.
    PagedAttention {
        /// Logical blocks mirroring this sequence's token ids.
        logical_token_blocks: Vec<LogicalTokenBlock>,
        /// Physical blocks prepared for prefill; taken at most once via
        /// `take_physical_blocks_prefill`.
        physical_blocks_prefill: Option<Vec<BlockRef>>,
        /// Number of token slots per block.
        block_size: usize,
    },
    /// No custom backend metadata (non-paged attention).
    None,
}
88
/// Number of *new* logical blocks needed to append one more token:
/// `0` when the trailing block can absorb it, `1` otherwise.
macro_rules! blocks_to_add_new_tok {
    ($logical_token_blocks:expr) => {{
        match $logical_token_blocks.last() {
            // NOTE(review): a full trailing block needs a fresh block; an
            // *empty* trailing block also counts as needing one — presumably
            // because it has no backing physical block yet. Confirm.
            Some(last) if last.is_full() || last.is_empty() => 1,
            // Trailing block with free space — or no blocks at all, which
            // also yields 0 here (matching the original logic).
            _ => 0,
        }
    }};
}
100
101pub(crate) fn util_append_token_to_blocks(
102    tok: usize,
103    logical_token_blocks: &mut Vec<LogicalTokenBlock>,
104    block_size: usize,
105) {
106    let last = logical_token_blocks.last_mut();
107    match last {
108        Some(last) => {
109            last.append_token_id(tok);
110        }
111        None => {
112            logical_token_blocks.push(LogicalTokenBlock::new(block_size));
113            // SAFETY: We just pushed a block, so last_mut() will return Some
114            logical_token_blocks
115                .last_mut()
116                .expect("just pushed a block, vector cannot be empty")
117                .append_token_id(tok);
118        }
119    }
120    // SAFETY: At this point, we either had a block or just created one above,
121    // so the vector is guaranteed to be non-empty.
122    if logical_token_blocks
123        .last()
124        .expect("logical_token_blocks should not be empty after appending")
125        .is_full()
126    {
127        logical_token_blocks.push(LogicalTokenBlock::new(block_size));
128    }
129}
130
131impl SequenceCustomMetadata {
132    fn append_token_to_blocks(&mut self, tok: usize) {
133        match self {
134            Self::PagedAttention {
135                logical_token_blocks,
136                physical_blocks_prefill: _,
137                block_size,
138            } => {
139                util_append_token_to_blocks(tok, logical_token_blocks, *block_size);
140            }
141            Self::None => (),
142        }
143    }
144
145    fn pop_token_from_blocks(&mut self) {
146        match self {
147            Self::PagedAttention {
148                logical_token_blocks,
149                physical_blocks_prefill: _,
150                block_size: _,
151            } => {
152                let last = logical_token_blocks.last_mut().unwrap();
153                last.pop_token();
154            }
155            Self::None => (),
156        }
157    }
158
159    fn append_tokens_to_blocks(&mut self, toks: Vec<usize>) {
160        for tok in toks {
161            self.append_token_to_blocks(tok);
162        }
163    }
164
165    fn remove_tokens_from_blocks(&mut self, n: usize) {
166        for _ in 0..n {
167            self.pop_token_from_blocks();
168        }
169    }
170}
171
/// How the engine steps a sequence.
#[derive(Clone, Copy)]
pub enum SeqStepType {
    /// Standard autoregressive flow: a prompt (prefill) pass followed by
    /// per-token decode steps.
    PromptAndDecode,
    /// A single forward pass produces the entire result; no decode loop.
    OneShot,
}
177
/// Input images for a multimodal sequence, with a stable hash per image
/// (the hashes are used later in the prefix cacher).
pub struct SequenceImages {
    /// The raw images, in request order.
    images: Vec<image::DynamicImage>,
    /// One hash per original image; retained even when images are dropped
    /// (see `keep_num_images`).
    hashes: Vec<u64>,
}
182
/// Input audio clips for a multimodal sequence, with a stable hash per clip
/// (the hashes are used later in the prefix cacher).
#[derive(Clone)]
pub struct SequenceAudios {
    /// The raw audio inputs, in request order.
    audios: Vec<AudioInput>,
    /// One hash per original clip; retained even when clips are dropped
    /// (see `keep_num_audios`).
    hashes: Vec<u64>,
}
188
189impl SequenceAudios {
190    fn new(input_audios: Vec<AudioInput>) -> Self {
191        let hashes = input_audios.iter().map(|a| {
192            let mut hasher = DefaultHasher::new();
193            for s in &a.samples {
194                s.to_bits().hash(&mut hasher);
195            }
196            a.sample_rate.hash(&mut hasher);
197            hasher.finish()
198        });
199        Self {
200            hashes: hashes.collect(),
201            audios: input_audios,
202        }
203    }
204
205    fn clone_audios(&self) -> Vec<AudioInput> {
206        self.audios.clone()
207    }
208
209    fn audios(&self) -> &[AudioInput] {
210        &self.audios
211    }
212
213    fn audios_mut(&mut self) -> &mut Vec<AudioInput> {
214        &mut self.audios
215    }
216
217    fn hashes(&self) -> &[u64] {
218        &self.hashes
219    }
220
221    fn keep_num_audios(&mut self, audios_to_keep: usize) {
222        if self.audios.len() > audios_to_keep {
223            let start = self.audios.len() - audios_to_keep;
224            self.audios = self.audios[start..].to_vec();
225            // Do not do this because we need all the hashes later in the prefix cacher.
226            // self.hashes = self.hashes[start..].to_vec();
227        }
228    }
229}
230
231impl SequenceImages {
232    fn new(input_images: Vec<image::DynamicImage>) -> Self {
233        let hashes = input_images.iter().map(|x| {
234            let mut hasher = DefaultHasher::new();
235            x.as_bytes().hash(&mut hasher);
236            hasher.finish()
237        });
238        Self {
239            hashes: hashes.collect(),
240            images: input_images,
241        }
242    }
243
244    fn clone_images(&self) -> Vec<image::DynamicImage> {
245        self.images.clone()
246    }
247
248    fn images(&self) -> &[image::DynamicImage] {
249        &self.images
250    }
251
252    fn images_mut(&mut self) -> &mut Vec<image::DynamicImage> {
253        &mut self.images
254    }
255
256    fn hashes(&self) -> &[u64] {
257        &self.hashes
258    }
259
260    fn keep_num_images(&mut self, images_to_keep: usize) {
261        if self.images.len() > images_to_keep {
262            let start = self.images.len() - images_to_keep;
263            self.images = self.images[start..].to_vec();
264            // Do not do this because we need all the hashes later in the prefix cacher.
265            // self.hashes = self.hashes[start..].to_vec();
266        }
267    }
268}
269
/// Holds all multimodal (vision/audio/diffusion) data for a `Sequence`.
pub struct MultimodalData {
    /// Input images plus their hashes, if any.
    pub input_images: Option<SequenceImages>,
    /// Input audio clips plus their hashes, if any.
    pub input_audios: Option<SequenceAudios>,
    /// Cached pixel-values tensor.
    pub cached_pixel_values: Option<Tensor>,
    /// Cached image "thw" tensor (presumably time/height/width — confirm).
    pub cached_img_thw: Option<Tensor>,
    /// Cached video "thw" tensor (presumably time/height/width — confirm).
    pub cached_vid_thw: Option<Tensor>,
    /// Whether the prompt was rewritten after construction; when true,
    /// `take_images`/`take_audios` move inputs out instead of cloning them.
    pub has_changed_prompt: bool,
    /// Requested response format for image generation, if any.
    pub image_gen_response_format: Option<ImageGenerationResponseFormat>,
    /// Parameters for diffusion generation, if any.
    pub diffusion_params: Option<DiffusionGenerationParams>,
}
281
282impl MultimodalData {
283    pub fn new(
284        input_images: Option<Vec<image::DynamicImage>>,
285        input_audios: Option<Vec<AudioInput>>,
286        image_gen_response_format: Option<ImageGenerationResponseFormat>,
287        diffusion_params: Option<DiffusionGenerationParams>,
288    ) -> Self {
289        MultimodalData {
290            input_images: input_images.map(SequenceImages::new),
291            input_audios: input_audios.map(SequenceAudios::new),
292            cached_pixel_values: None,
293            cached_img_thw: None,
294            cached_vid_thw: None,
295            has_changed_prompt: false,
296            image_gen_response_format,
297            diffusion_params,
298        }
299    }
300
301    pub fn take_images(&mut self) -> Option<Vec<image::DynamicImage>> {
302        if self.has_changed_prompt {
303            if let Some(input_images) = self.input_images.as_mut() {
304                let mut images = Vec::new();
305                std::mem::swap(&mut images, input_images.images_mut());
306                Some(images)
307            } else {
308                None
309            }
310        } else {
311            self.input_images.as_ref().map(|imgs| imgs.clone_images())
312        }
313    }
314
315    pub fn clone_images(&self) -> Option<Vec<image::DynamicImage>> {
316        self.input_images.as_ref().map(|imgs| imgs.clone_images())
317    }
318
319    pub fn images(&self) -> Option<&[image::DynamicImage]> {
320        self.input_images.as_ref().map(|imgs| imgs.images())
321    }
322
323    pub fn image_hashes(&self) -> Option<&[u64]> {
324        self.input_images.as_ref().map(|imgs| imgs.hashes())
325    }
326
327    pub fn has_images(&self) -> bool {
328        self.input_images
329            .as_ref()
330            .is_some_and(|imgs| !imgs.images().is_empty())
331    }
332
333    pub fn take_audios(&mut self) -> Option<Vec<AudioInput>> {
334        if self.has_changed_prompt {
335            if let Some(input_audios) = self.input_audios.as_mut() {
336                let mut audios = Vec::new();
337                std::mem::swap(&mut audios, input_audios.audios_mut());
338                Some(audios)
339            } else {
340                None
341            }
342        } else {
343            self.input_audios.as_ref().map(|imgs| imgs.clone_audios())
344        }
345    }
346
347    pub fn clone_audios(&self) -> Option<Vec<AudioInput>> {
348        self.input_audios.as_ref().map(|a| a.clone_audios())
349    }
350
351    pub fn audios(&self) -> Option<&[AudioInput]> {
352        self.input_audios.as_ref().map(|a| a.audios())
353    }
354
355    pub fn audio_hashes(&self) -> Option<&[u64]> {
356        self.input_audios.as_ref().map(|a| a.hashes())
357    }
358
359    pub fn has_audios(&self) -> bool {
360        self.input_audios
361            .as_ref()
362            .is_some_and(|a| !a.audios().is_empty())
363    }
364
365    pub fn keep_num_audios(&mut self, audios_to_keep: usize) {
366        if let Some(auds) = self.input_audios.as_mut() {
367            auds.keep_num_audios(audios_to_keep)
368        }
369    }
370
371    pub fn keep_num_images(&mut self, images_to_keep: usize) {
372        if let Some(imgs) = self.input_images.as_mut() {
373            imgs.keep_num_images(images_to_keep)
374        }
375    }
376
377    pub fn image_gen_response_format(&self) -> Option<ImageGenerationResponseFormat> {
378        self.image_gen_response_format
379    }
380
381    pub fn diffusion_params(&self) -> Option<DiffusionGenerationParams> {
382        self.diffusion_params.clone()
383    }
384}
385
/// A single generation request tracked by the engine: immutable request
/// metadata, per-layer caches, and mutable decode state.
pub struct Sequence {
    // Metadata, const
    id: usize,
    /// Number of tokens in the original prompt.
    prompt_len: usize,
    /// Maximum number of completion tokens to generate, if limited.
    max_len: Option<usize>,
    timestamp: u128,
    sampler: Arc<Sampler>,
    stop_tokens: Vec<u32>,
    stop_strings: Vec<String>,
    return_logprobs: bool,
    /// Channel over which responses/chunks are sent back to the requester.
    responder: Sender<Response>,
    response_index: usize,
    creation_time: u64,
    prompt: String,
    sequence_stepping_type: SeqStepType,
    pub(crate) return_raw_logits: bool,
    token_offset: usize,
    eos_tokens: Vec<u32>,

    // Multimodal data (images, diffusion settings, pixel caches)
    pub multimodal: MultimodalData,

    // Completion requests
    suffix: Option<String>,
    prefix: Option<String>,

    // Speculative
    /// Set while raw tokens are temporarily appended (see `add_tmp_tok` /
    /// `remove_tmp_tok`).
    is_tmp: bool,

    // Prefix caching
    prefill_prompt_toks: Option<Vec<u32>>,
    /// Number of tokens at the start of the prompt that are cached (KV already computed).
    /// These tokens should be skipped during prefill.
    prefix_cache_len: usize,

    // Cache
    normal_cache: Vec<Option<KvCache>>,
    normal_draft_cache: Vec<Option<KvCache>>,
    scaling_cache: Option<Tensor>,
    cache: LayerCaches,
    draft_cache: LayerCaches,
    xlora_cache: Option<LayerCaches>,
    /// For hybrid models: index into the Mamba state pool
    mamba_state_idx: Option<usize>,

    // Preallocated KV cache (k,v)
    seq_preallocated_cache: Option<(Tensor, Tensor)>,

    // Mutables
    tokens: Vec<u32>,
    logprobs: Vec<Logprobs>,
    cumulative_logprob: f32,
    last_logprob: f32,
    last_completion_bytes_len: usize,
    last_is_done: Option<StopReason>,
    /// Raw bytes of the decoded completion; also scanned for stop strings.
    completion_bytes: Vec<u8>,
    /// Byte offset into `completion_bytes` already streamed to the client.
    stream_idx: usize,
    pub recognizer: SequenceRecognizer,
    scheduling_urgency: usize, // The number of passes since scheduling
    waitlisted_count: usize, // Used in PagedAttention to alert the user when a sequence repeatedly cannot be scheduled

    // GPU things
    pub prompt_tok_per_sec: f32,
    pub prompt_timestamp: Option<u128>,
    pub total_prompt_time: Option<u128>,
    group: Arc<Mutex<SequenceGroup>>,
    state: RwLock<SequenceState>,

    // Custom backend metadata
    custom_metadata: SequenceCustomMetadata,

    // Tool calls
    pub tools: Option<Arc<ToolCallingMatcher>>,

    // Harmony format parsing context (for GPT-OSS models)
    harmony_context: Option<HarmonyContext>,

    // Think tag parsing context (for models using <think>...</think> tags)
    think_tag_context: Option<ThinkTagContext>,
}
466
467impl BlockEngineSequence for Sequence {
468    fn blocks_to_add_new_tok(&self) -> usize {
469        match &self.custom_metadata {
470            SequenceCustomMetadata::PagedAttention {
471                logical_token_blocks,
472                physical_blocks_prefill: _,
473                block_size: _,
474            } => {
475                blocks_to_add_new_tok!(logical_token_blocks)
476            }
477            SequenceCustomMetadata::None => unreachable!(),
478        }
479    }
480
481    fn get_id(&self) -> usize {
482        self.id
483    }
484
485    fn logical_token_blocks(&self) -> &[LogicalTokenBlock] {
486        match &self.custom_metadata {
487            SequenceCustomMetadata::PagedAttention {
488                logical_token_blocks,
489                physical_blocks_prefill: _,
490                block_size: _,
491            } => logical_token_blocks,
492            SequenceCustomMetadata::None => unreachable!(),
493        }
494    }
495
496    fn take_physical_blocks_prefill(&mut self) -> Option<Vec<BlockRef>> {
497        match &mut self.custom_metadata {
498            SequenceCustomMetadata::PagedAttention {
499                logical_token_blocks: _,
500                physical_blocks_prefill,
501                block_size: _,
502            } => physical_blocks_prefill.take(),
503            SequenceCustomMetadata::None => None,
504        }
505    }
506
507    fn increment_waitlist_count(&mut self) -> usize {
508        let prev = self.waitlisted_count;
509        self.waitlisted_count += 1;
510        prev
511    }
512
513    fn set_prefix_cache_len(&mut self, len: usize) {
514        self.prefix_cache_len = len;
515    }
516
517    fn block_size(&self) -> usize {
518        match &self.custom_metadata {
519            SequenceCustomMetadata::PagedAttention { block_size, .. } => *block_size,
520            SequenceCustomMetadata::None => unreachable!(),
521        }
522    }
523}
524
525impl Sequence {
    /// Construct a new sequence in the [`SequenceState::Waiting`] state.
    ///
    /// When `block_size` is provided, the prompt tokens are mirrored into a
    /// PagedAttention logical block table. `is_xlora` additionally allocates
    /// the per-layer X-LoRA cache.
    #[allow(clippy::too_many_arguments)]
    pub fn new_waiting(
        tokens: Vec<u32>,
        prompt: String,
        id: usize,
        timestamp: u128,
        layers: usize,
        responder: Sender<Response>,
        sampler: Sampler,
        stop_tokens: Vec<u32>,
        stop_strings: Vec<String>,
        max_len: Option<usize>,
        return_logprobs: bool,
        is_xlora: bool,
        group: Arc<Mutex<SequenceGroup>>,
        response_index: usize,
        creation_time: u64,
        recognizer: SequenceRecognizer,
        suffix: Option<String>,
        prefix: Option<String>,
        input_images: Option<Vec<image::DynamicImage>>,
        input_audios: Option<Vec<AudioInput>>,
        // Paged attention
        block_size: Option<usize>,
        //
        tools: Option<Arc<ToolCallingMatcher>>,
        image_gen_response_format: Option<ImageGenerationResponseFormat>,
        sequence_stepping_type: SeqStepType,
        diffusion_params: Option<DiffusionGenerationParams>,
        // Preallocated KV cache (k,v)
        seq_preallocated_cache: Option<(Tensor, Tensor)>,
        //
        return_raw_logits: bool,
        eos_tokens: Vec<u32>,
    ) -> Self {
        let prompt_len = tokens.len();
        // Only sequences with a block size carry PagedAttention metadata.
        let mut custom_metadata = if let Some(block_size) = block_size {
            SequenceCustomMetadata::PagedAttention {
                logical_token_blocks: Vec::new(),
                physical_blocks_prefill: None,
                block_size,
            }
        } else {
            SequenceCustomMetadata::None
        };
        // Mirror the prompt tokens into the logical block table (no-op for
        // non-paged sequences).
        custom_metadata
            .append_tokens_to_blocks(tokens.iter().map(|x| *x as usize).collect::<Vec<_>>());
        Self {
            tokens,
            prompt,
            logprobs: Vec::new(),
            prompt_len,
            id,
            timestamp,
            state: RwLock::new(SequenceState::Waiting),
            normal_cache: vec![None; layers],
            normal_draft_cache: vec![None; layers],
            cache: vec![None; layers],
            draft_cache: vec![None; layers],
            xlora_cache: if is_xlora {
                Some(vec![None; layers])
            } else {
                None
            },
            mamba_state_idx: None,
            seq_preallocated_cache,
            responder,
            sampler: sampler.into(),
            stop_tokens,
            stop_strings,
            max_len,
            return_logprobs,
            prompt_tok_per_sec: 0.,
            prompt_timestamp: None,
            group,
            scaling_cache: None,
            response_index,
            creation_time,
            recognizer,
            prefill_prompt_toks: None,
            prefix_cache_len: 0,
            suffix,
            prefix,
            cumulative_logprob: 0.,
            completion_bytes: Vec::new(),
            stream_idx: 0,
            last_completion_bytes_len: 0,
            last_logprob: 0.0,
            last_is_done: None,
            is_tmp: false,
            scheduling_urgency: 0,
            // Multimodal data
            multimodal: MultimodalData::new(
                input_images,
                input_audios,
                image_gen_response_format,
                diffusion_params,
            ),
            custom_metadata,
            tools,
            sequence_stepping_type,
            return_raw_logits,
            token_offset: 0,
            eos_tokens,
            total_prompt_time: None,
            waitlisted_count: 0,
            harmony_context: None,
            think_tag_context: None,
        }
    }
636
637    pub fn add_urgency(mut self) -> Self {
638        self.scheduling_urgency += 1;
639        self
640    }
641
642    pub fn reset_urgency(mut self) -> Self {
643        self.scheduling_urgency = 0;
644        self
645    }
646
647    /// Simple metric: (scheduling urgency) + log2(length)
648    /// Takes into account: urgency (scales linear) and length (scales logarithmic)
649    /// Scaling urgency is the number of scheduling passes where we have not been scheduled.
650    pub fn compute_priority(&self) -> f64 {
651        #![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
652        (self.scheduling_urgency as f64) + (self.len() as f64).log2()
653    }
654
655    pub fn prefill_v2_normal(
656        mut self,
657        cache: Vec<Option<KvCache>>,
658        toks: Vec<u32>,
659        offset: usize,
660    ) -> Self {
661        self.normal_cache = cache;
662        self.prefill_prompt_toks = Some(toks);
663        self.set_state(SequenceState::RunningPrefillPrompt);
664        self.token_offset = offset;
665        self
666    }
667
668    pub fn prefill_v2_paged(
669        mut self,
670        logical_blocks: Vec<LogicalTokenBlock>,
671        physical_blocks: Vec<BlockRef>,
672        toks: Vec<u32>,
673        offset: usize,
674    ) -> Self {
675        self.prefill_prompt_toks = Some(toks);
676        self.set_state(SequenceState::RunningPrefillPrompt);
677        self.token_offset = offset;
678
679        if let SequenceCustomMetadata::PagedAttention {
680            logical_token_blocks,
681            physical_blocks_prefill,
682            block_size: _,
683        } = &mut self.custom_metadata
684        {
685            *logical_token_blocks = logical_blocks;
686            *physical_blocks_prefill = Some(physical_blocks);
687        }
688
689        self
690    }
691
692    /// This is the number of tokens. If the KV cache is Some, then it will use that.
693    pub fn len(&self) -> usize {
694        if let Some(toks) = &self.prefill_prompt_toks {
695            return toks.len();
696        }
697        if self.is_tmp {
698            return self.tokens.len();
699        }
700        // Use xlora cache first because of non granular
701        if self.xlora_cache.as_ref().is_some_and(|c| c[0].is_some()) {
702            self.xlora_cache.as_ref().unwrap()[0]
703                .as_ref()
704                .unwrap()
705                .0
706                .dims()[2]
707                + 1
708        } else if let Some((_, x)) = &self.cache[0] {
709            x.dims()[2] + 1
710        } else {
711            self.tokens.len()
712        }
713    }
714
715    pub fn id(&self) -> &usize {
716        &self.id
717    }
718
719    pub fn is_running(&self) -> bool {
720        matches!(
721            *self.state.read().unwrap(),
722            SequenceState::RunningCompletion | SequenceState::RunningPrompt // | SequenceState::RunningPrefillPrompt
723        )
724    }
725
726    pub fn is_completion(&self) -> bool {
727        matches!(
728            *self.state.read().unwrap(),
729            SequenceState::RunningCompletion
730        )
731    }
732
733    pub fn is_prompt(&self) -> bool {
734        matches!(
735            *self.state.read().unwrap(),
736            SequenceState::RunningPrompt | SequenceState::RunningPrefillPrompt
737        )
738    }
739
740    pub fn is_waiting(&self) -> bool {
741        matches!(*self.state.read().unwrap(), SequenceState::Waiting)
742    }
743
744    pub fn is_finished_paged_attn(&self) -> bool {
745        matches!(
746            *self.state.read().unwrap(),
747            SequenceState::FinishedAborted
748                | SequenceState::FinishedIgnored
749                | SequenceState::Done(_)
750        )
751    }
752
753    pub fn get_toks(&self) -> &[u32] {
754        if let Some(toks) = &self.prefill_prompt_toks {
755            return toks;
756        }
757        &self.tokens
758    }
759
    /// The original prompt text for this sequence.
    pub fn get_initial_prompt(&self) -> &str {
        &self.prompt
    }

    /// Replace the stored prompt text (e.g. after prompt rewriting).
    pub fn set_initial_prompt(&mut self, new: String) {
        self.prompt = new;
    }

    /// Offset (in tokens) at which the current prefill tokens begin.
    pub fn token_offset(&self) -> usize {
        self.token_offset
    }

    /// Get the number of prefix tokens that are cached (KV already computed).
    /// These tokens should be skipped during prefill.
    pub fn prefix_cache_len(&self) -> usize {
        self.prefix_cache_len
    }

    /// Set the number of prefix tokens that are cached.
    pub fn set_prefix_cache_len(&mut self, len: usize) {
        self.prefix_cache_len = len;
    }
782
783    /// This will also set prompt_len
784    pub(crate) fn set_toks_and_reallocate(
785        &mut self,
786        toks: Vec<u32>,
787        paged_attn_metadata: Option<&mut PagedAttentionMeta>,
788    ) {
789        self.tokens.clone_from(&toks);
790        self.prompt_len = self.tokens.len();
791        // Handle possible block engine
792        match &mut self.custom_metadata {
793            SequenceCustomMetadata::PagedAttention {
794                logical_token_blocks,
795                physical_blocks_prefill: _,
796                block_size: _,
797            } => {
798                logical_token_blocks.clear();
799            }
800            SequenceCustomMetadata::None => (),
801        }
802        self.custom_metadata
803            .append_tokens_to_blocks(toks.iter().map(|x| *x as usize).collect::<Vec<_>>());
804
805        if let Some(metadata) = paged_attn_metadata {
806            // Free and then reallocate as appropriate
807            get_mut_arcmutex!(metadata.block_engine).free_sequence(*self.id());
808            get_mut_arcmutex!(metadata.block_engine).allocate(self);
809        }
810    }
811
    /// Raw bytes of the completion decoded so far.
    pub fn completion_bytes(&self) -> &[u8] {
        &self.completion_bytes
    }

    /// The preallocated (k, v) cache tensors, if any.
    pub fn preallocated_cache(&self) -> Option<&(Tensor, Tensor)> {
        self.seq_preallocated_cache.as_ref()
    }

    /// Mutable per-layer KV cache (normal, non-paged path).
    pub fn normal_cache(&mut self) -> &mut Vec<Option<KvCache>> {
        &mut self.normal_cache
    }

    /// Mutable per-layer KV cache for the speculative draft model.
    pub fn normal_draft_cache(&mut self) -> &mut Vec<Option<KvCache>> {
        &mut self.normal_draft_cache
    }

    /// Mutable per-layer (k, v) tensor cache.
    pub fn cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
        &mut self.cache
    }

    /// Mutable per-layer (k, v) tensor cache for the draft model.
    pub fn draft_cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
        &mut self.draft_cache
    }

    /// Mutable X-LoRA cache.
    ///
    /// Panics if this sequence was not created with X-LoRA enabled.
    pub fn xlora_cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
        self.xlora_cache.as_mut().expect("No X-LoRA cache.")
    }

    /// Mutable scaling cache tensor.
    pub fn scaling_cache(&mut self) -> &mut Option<Tensor> {
        &mut self.scaling_cache
    }

    /// For hybrid models: index into the Mamba state pool, if assigned.
    pub fn mamba_state_idx(&self) -> Option<usize> {
        self.mamba_state_idx
    }

    /// Assign (or clear) the Mamba state pool index.
    pub fn set_mamba_state_idx(&mut self, idx: Option<usize>) {
        self.mamba_state_idx = idx;
    }

    /// True when this sequence carries an X-LoRA cache.
    pub fn is_xlora(&self) -> bool {
        self.xlora_cache.is_some()
    }

    /// The sampler for this sequence (cheap `Arc` clone).
    pub fn sampler(&mut self) -> Arc<Sampler> {
        self.sampler.clone()
    }
859
    /// Add some prefill tokens. Only meant for internal speculative decoding usage.
    pub fn set_prefill_toks(&mut self, toks: Vec<u32>) {
        self.prefill_prompt_toks = Some(toks)
    }

    /// Remove the prefill tokens.
    pub fn reset_prefill_toks(&mut self) {
        self.prefill_prompt_toks = None
    }

    /// Internal api to add one raw token. Marks the sequence temporary
    /// (`is_tmp`); undone via `remove_tmp_tok`.
    pub(crate) fn add_tmp_tok(&mut self, tok: u32) {
        self.is_tmp = true;
        self.tokens.push(tok);
        // Handle possible block engine
        self.custom_metadata.append_token_to_blocks(tok as usize);
    }

    /// Internal api to remove n raw tokens. Clears the temporary marker.
    pub(crate) fn remove_tmp_tok(&mut self, n: usize) {
        self.is_tmp = false;
        self.tokens.truncate(self.tokens.len() - n);
        // Handle possible block engine
        self.custom_metadata.remove_tokens_from_blocks(n);
    }
885
    /// Record a newly sampled token: append its bytes to the completion
    /// buffer (unless generation stopped on this very token), keep the block
    /// table in sync, feed any Harmony / think-tag parsers, and push the
    /// token and its logprobs.
    pub fn add_token(
        &mut self,
        tok: Logprobs,
        completion_bytes: Vec<u8>,
        is_done: &Option<StopReason>,
    ) {
        let stopped_by_token = matches!(
            is_done,
            Some(StopReason::Eos) | Some(StopReason::StopTok(_))
        );
        if !stopped_by_token {
            // Completion bytes is used to check for stop strings, and as the response buffer.
            // We don't need to add stop tokens to the completion bytes to check for stop strings.
            // And by not adding it here, we can avoid having to delete these tokens from the output.
            self.completion_bytes.extend_from_slice(&completion_bytes);
            self.last_completion_bytes_len = completion_bytes.len();
        }
        self.last_logprob = tok.logprob;
        self.last_is_done = *is_done;

        // Keep the (possible) PagedAttention logical block table in sync.
        self.custom_metadata
            .append_token_to_blocks(tok.token as usize);

        // Process token through Harmony parser if in Harmony mode
        if let Some(ref mut harmony_ctx) = self.harmony_context {
            let _ = harmony_ctx.process_token(tok.token);
        }

        // Process token through think tag parser if in think tag mode
        if let Some(ref mut think_ctx) = self.think_tag_context {
            if !stopped_by_token {
                // Use process_bytes to handle incomplete UTF-8 sequences (e.g., emojis split across tokens)
                think_ctx.process_bytes(&completion_bytes);
            }
        }

        self.cumulative_logprob += tok.logprob;
        self.tokens.push(tok.token);
        self.logprobs.push(tok);
        // Any restored prefill tokens are now stale.
        self.reset_prefill_toks();
    }
927
928    pub fn responder(&self) -> Sender<Response> {
929        self.responder.clone()
930    }
931
932    pub fn creation_time(&self) -> u64 {
933        self.creation_time
934    }
935
936    pub fn set_state(&self, state: SequenceState) {
937        if matches!(state, SequenceState::Error) {
938            let mut group = get_mut_group!(self);
939            group.n_choices = group.n_choices.saturating_sub(1);
940        }
941        *self.state.write().unwrap() = state;
942    }
943
944    pub fn getstate(&self) -> SequenceState {
945        *self.state.read().unwrap()
946    }
947
948    pub fn is_done(
949        &self,
950        tok: u32,
951        eos_tok: Option<&[u32]>,
952        max_model_len: usize,
953    ) -> Option<StopReason> {
954        let is_eos = match eos_tok {
955            Some(eos_tok) => eos_tok.contains(&tok),
956            None => false,
957        };
958        if is_eos {
959            Some(StopReason::Eos)
960        } else if matches!(
961            &*self.state.read().unwrap(),
962            SequenceState::Done(StopReason::Canceled)
963        ) {
964            Some(StopReason::Canceled)
965        } else if self.stop_tokens.contains(&tok) {
966            Some(StopReason::StopTok(tok))
967        } else if self.max_len.is_some()
968            && self.tokens.len().saturating_sub(self.prompt_len) + 1 >= self.max_len.unwrap()
969        {
970            // add_token will be called after this check
971            Some(StopReason::Length(self.max_len.unwrap()))
972        } else if self.tokens.len().saturating_sub(self.prompt_len) >= max_model_len {
973            Some(StopReason::ModelLength(max_model_len))
974        } else {
975            if !self.stop_strings.is_empty() {
976                for (idx, s) in self.stop_strings.iter().enumerate() {
977                    if let Some(pos) = galil_seiferas::gs_find(&self.completion_bytes, s.as_bytes())
978                    {
979                        return Some(StopReason::StopString {
980                            stop_string_idx: idx,
981                            completion_bytes_pos: pos,
982                        });
983                    }
984                }
985            }
986            None
987        }
988    }
989
    /// Logprob records for every token pushed so far.
    pub fn logprobs(&self) -> &[Logprobs] {
        &self.logprobs
    }

    /// Whether the request asked for logprobs to be returned.
    pub fn return_logprobs(&self) -> bool {
        self.return_logprobs
    }

    /// Number of tokens in the prompt.
    pub fn prompt_tokens(&self) -> usize {
        self.prompt_len
    }

    /// Stop strings configured for this request.
    pub fn stop_strings(&self) -> &[String] {
        &self.stop_strings
    }

    /// Returns the delta between the last two decoded sequences,
    /// advancing the internal stream index past the returned text.
    pub fn get_delta(
        &mut self,
    ) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
        let new_decoded = self.peek_delta();
        // Only advance the stream index when a complete (valid UTF-8) delta
        // was actually produced; otherwise keep waiting for more bytes.
        if matches!(new_decoded, Ok(Some(_))) {
            self.stream_idx = self.completion_bytes.len();
        }
        new_decoded
    }
1016
1017    /// Peeks at the delta between the last two decoded sequences, but does not advance the stream index.
1018    pub fn peek_delta(&self) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
1019        let is_first = self.stream_idx == 0;
1020        let new_decoded = String::from_utf8_lossy(&self.completion_bytes[self.stream_idx..]);
1021        // Check if the sequence ends with valid utf8, if not skip it as it probably is a multi token sequence
1022        if new_decoded.ends_with('�') {
1023            return Ok(None);
1024        }
1025
1026        // The first token usually starts with a space. We don't want to add that to the delta.
1027        // Since we're using the completion_bytes, we need to take care of that ourselves.
1028        // Had we used HF's Tokenizer, it would have taken care of that for us.
1029        if is_first {
1030            return Ok(Some(new_decoded.trim_start().to_string()));
1031        }
1032        Ok(Some(new_decoded.to_string()))
1033    }
1034
    /// Millisecond Unix timestamp at which this sequence was created.
    pub fn timestamp(&self) -> u128 {
        self.timestamp
    }

    /// Millisecond Unix timestamp at which prompt processing completed, if it has.
    pub fn prompt_timestamp(&self) -> Option<u128> {
        self.prompt_timestamp
    }
1042
1043    fn update_time_info(&self) {
1044        let now = SystemTime::now()
1045            .duration_since(UNIX_EPOCH)
1046            .expect("Time travel has occurred!")
1047            .as_millis();
1048
1049        if let Some(ts) = self.prompt_timestamp {
1050            get_mut_group!(self).total_completion_time = now - ts;
1051            get_mut_group!(self).total_prompt_time = self.total_prompt_time.unwrap();
1052        }
1053
1054        get_mut_group!(self).total_time = now - self.timestamp;
1055
1056        get_mut_group!(self).total_prompt_toks = self.prompt_len;
1057        get_mut_group!(self).total_toks = self.len();
1058    }
1059
    /// Append an image-generation choice to the group.
    pub fn add_image_choice_to_group(&self, choice: ImageChoice) {
        get_mut_group!(self).image_choices.push(choice);
    }

    /// Append generated speech PCM (samples, sample rate, channel count) to the group.
    pub fn add_speech_pcm_to_group(&self, pcm: Arc<Vec<f32>>, rate: usize, channels: usize) {
        get_mut_group!(self).speech_pcms.push((pcm, rate, channels));
    }

    /// Append a finished chat choice to the group and refresh the group's timing stats.
    pub fn add_choice_to_group(&self, choice: Choice) {
        get_mut_group!(self).choices.push(choice);
        self.update_time_info();
    }

    /// Append raw logits (paired with this sequence's tokens) to the group and
    /// refresh the group's timing stats.
    pub fn add_raw_choice_to_group(&self, logit_chunks: Vec<Tensor>) {
        get_mut_group!(self)
            .raw_choices
            .push((logit_chunks, self.tokens.clone()));
        self.update_time_info();
    }

    /// Append an embedding result to the group and refresh the group's timing stats.
    pub fn add_embedding_choice_to_group(&self, embedding: Vec<f32>) {
        get_mut_group!(self).embedding_choices.push(embedding);
        self.update_time_info();
    }

    /// Append a completion choice, wrapping its text with the request's optional
    /// prefix/suffix, keyed by cumulative logprob (used for `best_of` ranking),
    /// and refresh the group's timing stats.
    pub fn add_completion_choice_to_group(&self, mut choice: CompletionChoice) {
        choice.text = format!(
            "{}{}{}",
            self.prefix.as_deref().unwrap_or(""),
            choice.text,
            self.suffix.as_deref().unwrap_or("")
        );
        get_mut_group!(self)
            .completion_choices
            .push((self.cumulative_logprob, choice));
        self.update_time_info();
    }

    /// Index of this sequence's choice within the overall response.
    pub fn get_response_index(&self) -> usize {
        self.response_index
    }

    /// Lock and return the group shared by this sequence's sibling choices.
    pub fn get_mut_group(&self) -> MutexGuard<'_, SequenceGroup> {
        get_mut_group!(self)
    }

    /// Append a streaming chat chunk to the group and refresh the group's timing stats.
    pub fn add_streaming_chunk_choice_to_group(&self, chunk: ChunkChoice) {
        get_mut_group!(self).chat_streaming_chunks.push(chunk);
        self.update_time_info();
    }

    /// Append a streaming completion chunk to the group and refresh the group's timing stats.
    pub fn add_streaming_completion_chunk_choice_to_group(&self, chunk: CompletionChunkChoice) {
        get_mut_group!(self).completion_streaming_chunks.push(chunk);
        self.update_time_info();
    }
1115
    /// Take (move out) any input images, leaving none on the sequence.
    pub fn take_images(&mut self) -> Option<Vec<image::DynamicImage>> {
        self.multimodal.take_images()
    }

    /// Clone any input images.
    pub fn clone_images(&self) -> Option<Vec<image::DynamicImage>> {
        self.multimodal.clone_images()
    }

    /// Borrow any input images.
    pub fn images(&self) -> Option<&[image::DynamicImage]> {
        self.multimodal.images()
    }

    /// Hashes of the input images, if any.
    pub fn image_hashes(&self) -> Option<&[u64]> {
        self.multimodal.image_hashes()
    }

    /// Whether this sequence carries any image inputs.
    pub fn has_images(&self) -> bool {
        self.multimodal.has_images()
    }

    /// Take (move out) any input audios, leaving none on the sequence.
    pub fn take_audios(&mut self) -> Option<Vec<AudioInput>> {
        self.multimodal.take_audios()
    }

    /// Clone any input audios.
    pub fn clone_audios(&self) -> Option<Vec<AudioInput>> {
        self.multimodal.clone_audios()
    }

    /// Borrow any input audios.
    pub fn audios(&self) -> Option<&[AudioInput]> {
        self.multimodal.audios()
    }

    /// Hashes of the input audios, if any.
    pub fn audio_hashes(&self) -> Option<&[u64]> {
        self.multimodal.audio_hashes()
    }

    /// Whether this sequence carries any audio inputs.
    pub fn has_audios(&self) -> bool {
        self.multimodal.has_audios()
    }

    /// Keep only the last `audios_to_keep` audio inputs, dropping older ones.
    pub fn keep_num_audios(&mut self, audios_to_keep: usize) {
        self.multimodal.keep_num_audios(audios_to_keep)
    }

    /// Keep only the last `images_to_keep` image inputs, dropping older ones.
    pub fn keep_num_images(&mut self, images_to_keep: usize) {
        self.multimodal.keep_num_images(images_to_keep)
    }

    /// Requested response format for image generation, if any.
    pub fn image_gen_response_format(&self) -> Option<ImageGenerationResponseFormat> {
        self.multimodal.image_gen_response_format()
    }

    /// The stepping type used to advance this sequence.
    pub fn sequence_stepping_type(&self) -> &SeqStepType {
        &self.sequence_stepping_type
    }

    /// Diffusion generation parameters, if this is a diffusion request.
    // NOTE(review): the doubled "diffusion" in the name is kept for API compatibility.
    pub fn get_diffusion_diffusion_params(&self) -> Option<DiffusionGenerationParams> {
        self.multimodal.diffusion_params()
    }

    /// EOS tokens applicable to this sequence.
    pub fn eos_tokens(&self) -> &[u32] {
        &self.eos_tokens
    }
1181
    // === Harmony Format Support ===

    /// Enable Harmony format parsing for this sequence.
    /// Should be called when the model uses Harmony format (GPT-OSS models).
    /// Idempotent: an already-initialized context is left untouched.
    ///
    /// # Errors
    /// Returns an error if the Harmony context fails to initialize.
    pub fn enable_harmony_mode(&mut self) -> Result<(), anyhow::Error> {
        if self.harmony_context.is_none() {
            self.harmony_context = Some(HarmonyContext::new()?);
        }
        Ok(())
    }

    /// Check if this sequence is in Harmony mode
    pub fn is_harmony_mode(&self) -> bool {
        self.harmony_context.is_some()
    }

    /// Process a token through the Harmony parser (if enabled).
    /// Returns the Harmony delta if in Harmony mode, `None` otherwise.
    pub fn process_harmony_token(&mut self, token_id: u32) -> Option<crate::harmony::HarmonyDelta> {
        self.harmony_context
            .as_mut()
            .map(|ctx| ctx.process_token(token_id))
    }

    /// Get the latest Harmony reasoning delta (for streaming).
    /// Returns None if not in Harmony mode or no new reasoning content.
    pub fn get_harmony_reasoning_delta(&mut self) -> Option<String> {
        self.harmony_context
            .as_mut()
            .and_then(|ctx| ctx.get_reasoning_delta())
    }

    /// Get the latest Harmony final content delta (for streaming).
    /// Returns None if not in Harmony mode or no new final content.
    pub fn get_harmony_final_delta(&mut self) -> Option<String> {
        self.harmony_context
            .as_mut()
            .and_then(|ctx| ctx.get_final_delta())
    }

    /// Get accumulated Harmony reasoning content (for non-streaming).
    /// Returns None if not in Harmony mode or no reasoning content.
    pub fn get_harmony_reasoning_content(&self) -> Option<String> {
        self.harmony_context
            .as_ref()
            .and_then(|ctx| ctx.reasoning_content())
    }

    /// Get accumulated Harmony final content.
    /// Returns None if not in Harmony mode or no final content.
    pub fn get_harmony_final_content(&self) -> Option<String> {
        self.harmony_context
            .as_ref()
            .and_then(|ctx| ctx.final_content())
    }

    /// Signal end of stream to the Harmony parser. No-op if not in Harmony mode.
    pub fn harmony_process_eos(&mut self) {
        if let Some(ref mut ctx) = self.harmony_context {
            ctx.process_eos();
        }
    }

    /// Check if Harmony mode has detected any tool calls.
    /// Always `false` when not in Harmony mode.
    pub fn has_harmony_tool_calls(&self) -> bool {
        self.harmony_context
            .as_ref()
            .is_some_and(|ctx| ctx.has_tool_call())
    }

    /// Get all Harmony tool calls (finalizes any pending tool call).
    /// Returns an empty vector when not in Harmony mode.
    pub fn get_harmony_tool_calls(&mut self) -> Vec<crate::harmony::HarmonyToolCall> {
        self.harmony_context
            .as_mut()
            .map(|ctx| ctx.finalize_tool_calls())
            .unwrap_or_default()
    }
1259
    // === Think Tag Format Support ===

    /// Enable think tag parsing for this sequence.
    /// Should be called when the model uses `<think>...</think>` tags.
    /// Idempotent: an already-initialized context is left untouched.
    ///
    /// If the prompt ends with `<think>`, the context will start inside a think block
    /// since the chat template hardcoded the opening tag.
    pub fn enable_think_tag_mode(&mut self) {
        if self.think_tag_context.is_none() {
            // Check if the prompt ends with <think> (template hardcoded the opening tag)
            let starts_in_think_block = self.prompt.trim_end().ends_with("<think>");
            self.think_tag_context = Some(if starts_in_think_block {
                ThinkTagContext::new_in_think_block()
            } else {
                ThinkTagContext::new()
            });
        }
    }

    /// Check if this sequence is in think tag mode
    pub fn is_think_tag_mode(&self) -> bool {
        self.think_tag_context.is_some()
    }

    /// Process text through the think tag parser (if enabled). No-op otherwise.
    pub fn process_think_tag_text(&mut self, text: &str) {
        if let Some(ref mut ctx) = self.think_tag_context {
            ctx.process_text(text);
        }
    }

    /// Get the latest think tag reasoning delta (for streaming).
    /// Returns None if not in think tag mode or no new reasoning content.
    pub fn get_think_tag_reasoning_delta(&mut self) -> Option<String> {
        self.think_tag_context
            .as_mut()
            .and_then(|ctx| ctx.get_reasoning_delta())
    }

    /// Get the latest think tag content delta (for streaming).
    /// Returns None if not in think tag mode or no new content.
    pub fn get_think_tag_content_delta(&mut self) -> Option<String> {
        self.think_tag_context
            .as_mut()
            .and_then(|ctx| ctx.get_content_delta())
    }

    /// Get accumulated think tag reasoning content (for non-streaming).
    /// Returns None if not in think tag mode or no reasoning content.
    pub fn get_think_tag_reasoning_content(&self) -> Option<String> {
        self.think_tag_context
            .as_ref()
            .and_then(|ctx| ctx.reasoning_content())
    }

    /// Get accumulated think tag content (for non-streaming).
    /// Returns None if not in think tag mode or no content.
    pub fn get_think_tag_content(&self) -> Option<String> {
        self.think_tag_context
            .as_ref()
            .and_then(|ctx| ctx.content())
    }

    /// Finalize think tag parsing at end of stream.
    /// Handles unclosed `<think>` blocks. No-op if not in think tag mode.
    pub fn think_tag_finalize(&mut self) {
        if let Some(ref mut ctx) = self.think_tag_context {
            ctx.finalize();
        }
    }
1330}
1331
/// Shared state for a set of sibling sequences (choices) belonging to one
/// request. Each `Sequence` pushes its finished choices/chunks and timing
/// statistics here; responses are sent once all `n_choices` have arrived.
pub struct SequenceGroup {
    n_choices: usize, // The target number of choices to return. Can be decreased if an error is thrown.
    best_of: Option<usize>, // Top n seqs based on cumulative logprobs.
    /// Prompt token count reported for usage stats.
    pub total_prompt_toks: usize,
    /// Total token count (prompt + completion) reported for usage stats.
    pub total_toks: usize,
    /// Milliseconds spent processing the prompt.
    pub total_prompt_time: u128,
    /// Milliseconds elapsed since sequence creation.
    pub total_time: u128,
    /// Milliseconds spent generating the completion.
    pub total_completion_time: u128,
    choices: Vec<Choice>,
    image_choices: Vec<ImageChoice>,
    speech_pcms: Vec<(Arc<Vec<f32>>, usize, usize)>, // (pcm, rate, channels)
    raw_choices: Vec<(Vec<Tensor>, Vec<u32>)>,
    embedding_choices: Vec<Vec<f32>>,
    completion_choices: Vec<(f32, CompletionChoice)>, // (cumulative logprob, choice)
    /// Pending streaming chat chunks, drained when a full set is sent.
    pub chat_streaming_chunks: Vec<ChunkChoice>,
    /// Pending streaming completion chunks, drained when a full set is sent.
    pub completion_streaming_chunks: Vec<CompletionChunkChoice>,
    pub is_streaming: bool,
    pub is_chat: bool,
}
1351
1352impl SequenceGroup {
1353    pub fn new(
1354        n_choices: usize,
1355        is_streaming: bool,
1356        is_chat: bool,
1357        best_of: Option<usize>,
1358    ) -> Self {
1359        Self {
1360            choices: Vec::new(),
1361            image_choices: Vec::new(),
1362            speech_pcms: Vec::new(),
1363            raw_choices: Vec::new(),
1364            embedding_choices: Vec::new(),
1365            completion_choices: Vec::new(),
1366            n_choices,
1367            total_prompt_toks: 0,
1368            total_toks: 0,
1369            total_prompt_time: 0,
1370            total_time: 0,
1371            total_completion_time: 0,
1372            chat_streaming_chunks: Vec::new(),
1373            completion_streaming_chunks: Vec::new(),
1374            is_streaming,
1375            is_chat,
1376            best_of,
1377        }
1378    }
1379
1380    pub fn get_choices(&self) -> &[Choice] {
1381        &self.choices
1382    }
1383
1384    /// This may apply the best_of.
1385    pub fn get_completion_choices(&self) -> Vec<CompletionChoice> {
1386        if let Some(best_of) = self.best_of {
1387            let mut choices = self.completion_choices.clone();
1388            // Sort by descending logprobs
1389            choices.sort_by(|a, b| b.0.partial_cmp(&a.0).expect("No ordering."));
1390            choices
1391                .into_iter()
1392                .take(best_of)
1393                .map(|(_, x)| x)
1394                .collect::<Vec<_>>()
1395        } else {
1396            self.completion_choices
1397                .clone()
1398                .into_iter()
1399                .map(|(_, x)| x)
1400                .collect::<Vec<_>>()
1401        }
1402    }
1403
1404    pub fn get_image_choices(&self) -> &[ImageChoice] {
1405        &self.image_choices
1406    }
1407
1408    pub fn get_usage(&self) -> Usage {
1409        #[allow(clippy::cast_precision_loss)]
1410        Usage {
1411            completion_tokens: self.total_toks.saturating_sub(self.total_prompt_toks),
1412            prompt_tokens: self.total_prompt_toks,
1413            total_tokens: self.total_toks,
1414            avg_tok_per_sec: (self.total_toks as f32 / self.total_time as f32) * 1000.,
1415            avg_prompt_tok_per_sec: (self.total_prompt_toks as f32 / self.total_prompt_time as f32)
1416                * 1000.,
1417            avg_compl_tok_per_sec: (self.total_toks.saturating_sub(self.total_prompt_toks) as f32
1418                / self.total_completion_time as f32)
1419                * 1000.,
1420            total_time_sec: self.total_time as f32 / 1000.,
1421            total_completion_time_sec: self.total_completion_time as f32 / 1000.,
1422            total_prompt_time_sec: self.total_prompt_time as f32 / 1000.,
1423        }
1424    }
1425
1426    pub async fn maybe_send_chat_done_response(
1427        &self,
1428        response: ChatCompletionResponse,
1429        sender: Sender<Response>,
1430    ) -> Result<(), SendError<Response>> {
1431        if self.choices.len() == self.n_choices {
1432            sender.send(Response::Done(response)).await?;
1433        }
1434
1435        Ok(())
1436    }
1437
1438    pub async fn maybe_send_raw_done_response(
1439        &self,
1440        sender: Sender<Response>,
1441    ) -> Result<(), SendError<Response>> {
1442        if self.raw_choices.len() == self.n_choices {
1443            assert_eq!(self.raw_choices.len(), 1);
1444            let (logits_chunks, tokens) = self.raw_choices[0].clone();
1445            sender
1446                .send(Response::Raw {
1447                    logits_chunks,
1448                    tokens,
1449                })
1450                .await?;
1451        }
1452
1453        Ok(())
1454    }
1455
1456    pub async fn maybe_send_embedding_done_response(
1457        &self,
1458        sender: Sender<Response>,
1459    ) -> Result<(), SendError<Response>> {
1460        if self.embedding_choices.len() == self.n_choices {
1461            assert_eq!(self.embedding_choices.len(), 1);
1462            let embeddings = self.embedding_choices[0].clone();
1463            let prompt_tokens = self.total_prompt_toks;
1464            let total_tokens = self.total_toks;
1465            sender
1466                .send(Response::Embeddings {
1467                    embeddings,
1468                    prompt_tokens,
1469                    total_tokens,
1470                })
1471                .await?;
1472        }
1473
1474        Ok(())
1475    }
1476
1477    pub async fn maybe_send_image_gen_response(
1478        &self,
1479        response: ImageGenerationResponse,
1480        sender: Sender<Response>,
1481    ) -> Result<(), SendError<Response>> {
1482        if self.image_choices.len() == self.n_choices {
1483            sender.send(Response::ImageGeneration(response)).await?;
1484        }
1485
1486        Ok(())
1487    }
1488
1489    pub async fn maybe_send_speech_response(
1490        &self,
1491        sender: Sender<Response>,
1492    ) -> Result<(), SendError<Response>> {
1493        assert_eq!(self.speech_pcms.len(), 1);
1494
1495        let (pcm, rate, channels) = self.speech_pcms[0].clone();
1496        sender
1497            .send(Response::Speech {
1498                pcm,
1499                rate,
1500                channels,
1501            })
1502            .await?;
1503
1504        Ok(())
1505    }
1506
1507    pub async fn maybe_send_streaming_response(
1508        &mut self,
1509        seq: &Sequence,
1510        model: String,
1511        usage_opt: Option<Usage>,
1512    ) -> Result<(), Box<SendError<Response>>> {
1513        if self.chat_streaming_chunks.len() == self.n_choices && self.is_streaming {
1514            let mut swap_streaming_chunks = vec![];
1515
1516            std::mem::swap(&mut swap_streaming_chunks, &mut self.chat_streaming_chunks);
1517
1518            seq.responder()
1519                .send(Response::Chunk(ChatCompletionChunkResponse {
1520                    id: seq.id.to_string(),
1521                    choices: swap_streaming_chunks,
1522                    created: seq.timestamp,
1523                    model: model.clone(),
1524                    system_fingerprint: SYSTEM_FINGERPRINT.to_string(),
1525                    object: "chat.completion.chunk".to_string(),
1526                    usage: usage_opt,
1527                }))
1528                .await?;
1529        } else if self.completion_streaming_chunks.len() == self.n_choices && self.is_streaming {
1530            let mut swap_streaming_chunks = vec![];
1531
1532            std::mem::swap(
1533                &mut swap_streaming_chunks,
1534                &mut self.completion_streaming_chunks,
1535            );
1536
1537            seq.responder()
1538                .send(Response::CompletionChunk(CompletionChunkResponse {
1539                    id: seq.id.to_string(),
1540                    choices: swap_streaming_chunks,
1541                    created: seq.timestamp,
1542                    model: model.clone(),
1543                    system_fingerprint: SYSTEM_FINGERPRINT.to_string(),
1544                    object: "text_completion".to_string(),
1545                }))
1546                .await?;
1547        }
1548        Ok(())
1549    }
1550
1551    pub async fn maybe_send_completion_done_response(
1552        &self,
1553        response: CompletionResponse,
1554        sender: Sender<Response>,
1555    ) -> Result<(), Box<SendError<Response>>> {
1556        if self.completion_choices.len() == self.n_choices {
1557            sender.send(Response::CompletionDone(response)).await?;
1558        }
1559        Ok(())
1560    }
1561}