mistralrs_core/
sequence.rs

1use crate::{
2    get_mut_arcmutex, get_mut_group,
3    harmony::HarmonyContext,
4    paged_attention::PhysicalTokenBlock,
5    pipeline::{text_models_inputs_processor::PagedAttentionMeta, LayerCaches},
6    response::{ChatCompletionChunkResponse, Choice, ChunkChoice, Response, SYSTEM_FINGERPRINT},
7    sampler::{Logprobs, Sampler},
8    AudioInput, ChatCompletionResponse, Usage,
9};
10use crate::{
11    paged_attention::{BlockEngineSequence, LogicalTokenBlock},
12    pipeline::{DiffusionGenerationParams, KvCache},
13    response::CompletionChoice,
14    tools::ToolCallingMatcher,
15    CompletionChunkChoice, CompletionChunkResponse, CompletionResponse, ImageChoice,
16    ImageGenerationResponse, ImageGenerationResponseFormat,
17};
18use candle_core::Tensor;
19use std::{
20    fmt::Display,
21    hash::{DefaultHasher, Hash, Hasher},
22    sync::{Arc, RwLock},
23    time::{SystemTime, UNIX_EPOCH},
24};
25use tokio::sync::{
26    mpsc::{error::SendError, Sender},
27    Mutex, MutexGuard,
28};
29
/// Reason a sequence stopped generating.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum StopReason {
    /// The sampled token is one of the EOS tokens.
    Eos,
    /// The sampled token is one of the user stop tokens; carries that token id.
    StopTok(u32),
    /// The request's `max_len` was reached; carries that limit.
    Length(usize),
    /// The model length limit was reached; carries that limit.
    ModelLength(usize),
    /// A stop string was found in the completion bytes.
    StopString {
        /// Index of the matching entry in the sequence's stop strings.
        stop_string_idx: usize,
        /// Byte offset of the match inside the completion bytes.
        completion_bytes_pos: usize,
    },
    /// The sequence was canceled.
    Canceled,
    /// Generation produced an image.
    GeneratedImage,
    /// Generation produced speech.
    GeneratedSpeech,
    /// Generation ended with tool calls.
    ToolCalls,
}

impl Display for StopReason {
    /// Writes the short, lowercase finish-reason label for this variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Eos | Self::StopTok(_) | Self::StopString { .. } => "stop",
            Self::Length(_) | Self::ModelLength(_) => "length",
            Self::Canceled => "canceled",
            Self::GeneratedImage => "generated_image",
            Self::GeneratedSpeech => "generated_speech",
            Self::ToolCalls => "tool_calls",
        };
        f.write_str(label)
    }
}
59
60#[derive(Clone, Copy, PartialEq, Debug)]
61pub enum SequenceState {
62    Done(StopReason),
63    RunningPrompt,
64    RunningCompletion,
65    Waiting,
66    Error,
67    RunningPrefillPrompt,
68    // For PagedAttention:
69    FinishedAborted,
70    FinishedIgnored,
71    Swapped,
72}
73
74pub enum SequenceRecognizer {
75    Llguidance(Box<llguidance::Matcher>),
76    None,
77}
78
79enum SequenceCustomMetadata {
80    PagedAttention {
81        logical_token_blocks: Vec<LogicalTokenBlock>,
82        physical_blocks_prefill: Option<Vec<Arc<PhysicalTokenBlock>>>,
83        block_size: usize,
84    },
85    None,
86}
87
// Number of new blocks (0 or 1) required before one more token can be appended.
//
// Evaluates to 1 when the last logical block is full or empty, and to 0 otherwise.
// That includes the case of no logical blocks at all.
macro_rules! blocks_to_add_new_tok {
    ($logical_token_blocks:expr) => {{
        match $logical_token_blocks.last() {
            // A full tail has no free slot; an empty tail is also counted as needing one block.
            Some(tail) if tail.is_full() || tail.is_empty() => 1,
            // The tail still has room (or there is no tail).
            _ => 0,
        }
    }};
}
99
100pub(crate) fn util_append_token_to_blocks(
101    tok: usize,
102    logical_token_blocks: &mut Vec<LogicalTokenBlock>,
103    block_size: usize,
104) {
105    let last = logical_token_blocks.last_mut();
106    match last {
107        Some(last) => {
108            last.append_token_id(tok);
109        }
110        None => {
111            logical_token_blocks.push(LogicalTokenBlock::new(block_size));
112            // SAFETY: We just pushed a block, so last_mut() will return Some
113            logical_token_blocks
114                .last_mut()
115                .expect("just pushed a block, vector cannot be empty")
116                .append_token_id(tok);
117        }
118    }
119    // SAFETY: At this point, we either had a block or just created one above,
120    // so the vector is guaranteed to be non-empty.
121    if logical_token_blocks
122        .last()
123        .expect("logical_token_blocks should not be empty after appending")
124        .is_full()
125    {
126        logical_token_blocks.push(LogicalTokenBlock::new(block_size));
127    }
128}
129
130impl SequenceCustomMetadata {
131    fn append_token_to_blocks(&mut self, tok: usize) {
132        match self {
133            Self::PagedAttention {
134                logical_token_blocks,
135                physical_blocks_prefill: _,
136                block_size,
137            } => {
138                util_append_token_to_blocks(tok, logical_token_blocks, *block_size);
139            }
140            Self::None => (),
141        }
142    }
143
144    fn pop_token_from_blocks(&mut self) {
145        match self {
146            Self::PagedAttention {
147                logical_token_blocks,
148                physical_blocks_prefill: _,
149                block_size: _,
150            } => {
151                let last = logical_token_blocks.last_mut().unwrap();
152                last.pop_token();
153            }
154            Self::None => (),
155        }
156    }
157
158    fn append_tokens_to_blocks(&mut self, toks: Vec<usize>) {
159        for tok in toks {
160            self.append_token_to_blocks(tok);
161        }
162    }
163
164    fn remove_tokens_from_blocks(&mut self, n: usize) {
165        for _ in 0..n {
166            self.pop_token_from_blocks();
167        }
168    }
169}
170
/// Stepping mode of a sequence, passed to `Sequence::new_waiting` as `sequence_stepping_type`.
#[derive(Copy, Clone)]
pub enum SeqStepType {
    /// Prompt step followed by decode steps (NOTE(review): inferred from the name).
    PromptAndDecode,
    /// A single step (NOTE(review): inferred from the name).
    OneShot,
}
176
177pub struct SequenceImages {
178    images: Vec<image::DynamicImage>,
179    hashes: Vec<u64>,
180}
181
182#[derive(Clone)]
183pub struct SequenceAudios {
184    audios: Vec<AudioInput>,
185    hashes: Vec<u64>,
186}
187
188impl SequenceAudios {
189    fn new(input_audios: Vec<AudioInput>) -> Self {
190        let hashes = input_audios.iter().map(|a| {
191            let mut hasher = DefaultHasher::new();
192            for s in &a.samples {
193                s.to_bits().hash(&mut hasher);
194            }
195            a.sample_rate.hash(&mut hasher);
196            hasher.finish()
197        });
198        Self {
199            hashes: hashes.collect(),
200            audios: input_audios,
201        }
202    }
203
204    fn clone_audios(&self) -> Vec<AudioInput> {
205        self.audios.clone()
206    }
207
208    fn audios(&self) -> &[AudioInput] {
209        &self.audios
210    }
211
212    fn audios_mut(&mut self) -> &mut Vec<AudioInput> {
213        &mut self.audios
214    }
215
216    fn hashes(&self) -> &[u64] {
217        &self.hashes
218    }
219
220    fn keep_num_audios(&mut self, audios_to_keep: usize) {
221        if self.audios.len() > audios_to_keep {
222            let start = self.audios.len() - audios_to_keep;
223            self.audios = self.audios[start..].to_vec();
224            // Do not do this because we need all the hashes later in the prefix cacher.
225            // self.hashes = self.hashes[start..].to_vec();
226        }
227    }
228}
229
230impl SequenceImages {
231    fn new(input_images: Vec<image::DynamicImage>) -> Self {
232        let hashes = input_images.iter().map(|x| {
233            let mut hasher = DefaultHasher::new();
234            x.as_bytes().hash(&mut hasher);
235            hasher.finish()
236        });
237        Self {
238            hashes: hashes.collect(),
239            images: input_images,
240        }
241    }
242
243    fn clone_images(&self) -> Vec<image::DynamicImage> {
244        self.images.clone()
245    }
246
247    fn images(&self) -> &[image::DynamicImage] {
248        &self.images
249    }
250
251    fn images_mut(&mut self) -> &mut Vec<image::DynamicImage> {
252        &mut self.images
253    }
254
255    fn hashes(&self) -> &[u64] {
256        &self.hashes
257    }
258
259    fn keep_num_images(&mut self, images_to_keep: usize) {
260        if self.images.len() > images_to_keep {
261            let start = self.images.len() - images_to_keep;
262            self.images = self.images[start..].to_vec();
263            // Do not do this because we need all the hashes later in the prefix cacher.
264            // self.hashes = self.hashes[start..].to_vec();
265        }
266    }
267}
268
269// Holds all multimodal (vision/diffusion) data for a Sequence.
270pub struct MultimodalData {
271    pub input_images: Option<SequenceImages>,
272    pub input_audios: Option<SequenceAudios>,
273    pub cached_pixel_values: Option<Tensor>,
274    pub cached_img_thw: Option<Tensor>,
275    pub cached_vid_thw: Option<Tensor>,
276    pub has_changed_prompt: bool,
277    pub image_gen_response_format: Option<ImageGenerationResponseFormat>,
278    pub diffusion_params: Option<DiffusionGenerationParams>,
279}
280
281impl MultimodalData {
282    pub fn new(
283        input_images: Option<Vec<image::DynamicImage>>,
284        input_audios: Option<Vec<AudioInput>>,
285        image_gen_response_format: Option<ImageGenerationResponseFormat>,
286        diffusion_params: Option<DiffusionGenerationParams>,
287    ) -> Self {
288        MultimodalData {
289            input_images: input_images.map(SequenceImages::new),
290            input_audios: input_audios.map(SequenceAudios::new),
291            cached_pixel_values: None,
292            cached_img_thw: None,
293            cached_vid_thw: None,
294            has_changed_prompt: false,
295            image_gen_response_format,
296            diffusion_params,
297        }
298    }
299
300    pub fn take_images(&mut self) -> Option<Vec<image::DynamicImage>> {
301        if self.has_changed_prompt {
302            if let Some(input_images) = self.input_images.as_mut() {
303                let mut images = Vec::new();
304                std::mem::swap(&mut images, input_images.images_mut());
305                Some(images)
306            } else {
307                None
308            }
309        } else {
310            self.input_images.as_ref().map(|imgs| imgs.clone_images())
311        }
312    }
313
314    pub fn clone_images(&self) -> Option<Vec<image::DynamicImage>> {
315        self.input_images.as_ref().map(|imgs| imgs.clone_images())
316    }
317
318    pub fn images(&self) -> Option<&[image::DynamicImage]> {
319        self.input_images.as_ref().map(|imgs| imgs.images())
320    }
321
322    pub fn image_hashes(&self) -> Option<&[u64]> {
323        self.input_images.as_ref().map(|imgs| imgs.hashes())
324    }
325
326    pub fn has_images(&self) -> bool {
327        self.input_images
328            .as_ref()
329            .is_some_and(|imgs| !imgs.images().is_empty())
330    }
331
332    pub fn take_audios(&mut self) -> Option<Vec<AudioInput>> {
333        if self.has_changed_prompt {
334            if let Some(input_audios) = self.input_audios.as_mut() {
335                let mut audios = Vec::new();
336                std::mem::swap(&mut audios, input_audios.audios_mut());
337                Some(audios)
338            } else {
339                None
340            }
341        } else {
342            self.input_audios.as_ref().map(|imgs| imgs.clone_audios())
343        }
344    }
345
346    pub fn clone_audios(&self) -> Option<Vec<AudioInput>> {
347        self.input_audios.as_ref().map(|a| a.clone_audios())
348    }
349
350    pub fn audios(&self) -> Option<&[AudioInput]> {
351        self.input_audios.as_ref().map(|a| a.audios())
352    }
353
354    pub fn audio_hashes(&self) -> Option<&[u64]> {
355        self.input_audios.as_ref().map(|a| a.hashes())
356    }
357
358    pub fn has_audios(&self) -> bool {
359        self.input_audios
360            .as_ref()
361            .is_some_and(|a| !a.audios().is_empty())
362    }
363
364    pub fn keep_num_audios(&mut self, audios_to_keep: usize) {
365        if let Some(auds) = self.input_audios.as_mut() {
366            auds.keep_num_audios(audios_to_keep)
367        }
368    }
369
370    pub fn keep_num_images(&mut self, images_to_keep: usize) {
371        if let Some(imgs) = self.input_images.as_mut() {
372            imgs.keep_num_images(images_to_keep)
373        }
374    }
375
376    pub fn image_gen_response_format(&self) -> Option<ImageGenerationResponseFormat> {
377        self.image_gen_response_format
378    }
379
380    pub fn diffusion_params(&self) -> Option<DiffusionGenerationParams> {
381        self.diffusion_params.clone()
382    }
383}
384
/// State of a single generation sequence: prompt and completion tokens, KV caches,
/// sampling/stopping configuration, streaming progress, and scheduling bookkeeping.
pub struct Sequence {
    // Metadata, const
    /// Sequence id (also returned by `BlockEngineSequence::get_id`).
    id: usize,
    /// Number of prompt tokens; set from the initial tokens and by `set_toks_and_reallocate`.
    prompt_len: usize,
    /// Optional cap on generated tokens, checked by `is_done` (`StopReason::Length`).
    max_len: Option<usize>,
    timestamp: u128,
    /// Sampler, shared via `Arc` (see `sampler()`).
    sampler: Arc<Sampler>,
    /// Token ids that stop generation (`StopReason::StopTok`).
    stop_tokens: Vec<u32>,
    /// Strings searched for in `completion_bytes` by `is_done` (`StopReason::StopString`).
    stop_strings: Vec<String>,
    return_logprobs: bool,
    /// Channel on which responses for this sequence are sent.
    responder: Sender<Response>,
    response_index: usize,
    creation_time: u64,
    /// Initial prompt text (`get_initial_prompt` / `set_initial_prompt`).
    prompt: String,
    sequence_stepping_type: SeqStepType,
    pub(crate) return_raw_logits: bool,
    /// Offset set by `prefill_v2_normal` / `prefill_v2_paged`.
    token_offset: usize,
    eos_tokens: Vec<u32>,

    // Multimodal data (images, audio, diffusion settings, pixel caches)
    pub multimodal: MultimodalData,

    // Completion requests
    suffix: Option<String>,
    prefix: Option<String>,

    // Speculative
    /// Set by `add_tmp_tok`, cleared by `remove_tmp_tok`; while set, `len()` reports `tokens.len()`.
    is_tmp: bool,

    // Prefix caching
    /// When `Some`, `len()` and `get_toks()` report these tokens; cleared by `add_token`.
    prefill_prompt_toks: Option<Vec<u32>>,
    /// Number of tokens at the start of the prompt that are cached (KV already computed).
    /// These tokens should be skipped during prefill.
    prefix_cache_len: usize,

    // Cache
    normal_cache: Vec<Option<KvCache>>,
    normal_draft_cache: Vec<Option<KvCache>>,
    scaling_cache: Option<Tensor>,
    cache: LayerCaches,
    draft_cache: LayerCaches,
    /// Present only for X-LoRA sequences (see `is_xlora()`).
    xlora_cache: Option<LayerCaches>,
    /// For hybrid models: index into the Mamba state pool
    mamba_state_idx: Option<usize>,

    // Preallocated KV cache (k,v)
    seq_preallocated_cache: Option<(Tensor, Tensor)>,

    // Mutables
    /// Token ids: the prompt, then tokens pushed by `add_token` (and, temporarily, `add_tmp_tok`).
    tokens: Vec<u32>,
    /// One entry per token recorded by `add_token`.
    logprobs: Vec<Logprobs>,
    /// Sum of the logprobs recorded by `add_token`.
    cumulative_logprob: f32,
    /// Logprob of the most recent token recorded by `add_token`.
    last_logprob: f32,
    /// Byte length of the most recent chunk appended to `completion_bytes`.
    last_completion_bytes_len: usize,
    /// Stop reason passed to the most recent `add_token` call.
    last_is_done: Option<StopReason>,
    /// Completion output buffer; also searched for stop strings by `is_done`.
    completion_bytes: Vec<u8>,
    /// Position in `completion_bytes` up to which output has been streamed (see `get_delta`).
    stream_idx: usize,
    pub recognizer: SequenceRecognizer,
    scheduling_urgency: usize, // Scheduling passes in which this sequence was not scheduled (see compute_priority)
    waitlisted_count: usize, // Used in PagedAttention to alert the user when a sequence repeatedly cannot be scheduled

    // GPU things
    pub prompt_tok_per_sec: f32,
    pub prompt_timestamp: Option<u128>,
    pub total_prompt_time: Option<u128>,
    group: Arc<Mutex<SequenceGroup>>,
    /// Current lifecycle state; behind an `RwLock` so `set_state(&self)` can update it.
    state: RwLock<SequenceState>,

    // Custom backend metadata
    custom_metadata: SequenceCustomMetadata,

    // Tool calls
    pub tools: Option<Arc<ToolCallingMatcher>>,

    // Harmony format parsing context (for GPT-OSS models)
    harmony_context: Option<HarmonyContext>,
}
462
463impl BlockEngineSequence for Sequence {
464    fn blocks_to_add_new_tok(&self) -> usize {
465        match &self.custom_metadata {
466            SequenceCustomMetadata::PagedAttention {
467                logical_token_blocks,
468                physical_blocks_prefill: _,
469                block_size: _,
470            } => {
471                blocks_to_add_new_tok!(logical_token_blocks)
472            }
473            SequenceCustomMetadata::None => unreachable!(),
474        }
475    }
476
477    fn get_id(&self) -> usize {
478        self.id
479    }
480
481    fn logical_token_blocks(&self) -> &[LogicalTokenBlock] {
482        match &self.custom_metadata {
483            SequenceCustomMetadata::PagedAttention {
484                logical_token_blocks,
485                physical_blocks_prefill: _,
486                block_size: _,
487            } => logical_token_blocks,
488            SequenceCustomMetadata::None => unreachable!(),
489        }
490    }
491
492    fn take_physical_blocks_prefill(&mut self) -> Option<Vec<Arc<PhysicalTokenBlock>>> {
493        match &mut self.custom_metadata {
494            SequenceCustomMetadata::PagedAttention {
495                logical_token_blocks: _,
496                physical_blocks_prefill,
497                block_size: _,
498            } => physical_blocks_prefill.take(),
499            SequenceCustomMetadata::None => None,
500        }
501    }
502
503    fn increment_waitlist_count(&mut self) -> usize {
504        let prev = self.waitlisted_count;
505        self.waitlisted_count += 1;
506        prev
507    }
508
509    fn set_prefix_cache_len(&mut self, len: usize) {
510        self.prefix_cache_len = len;
511    }
512
513    fn block_size(&self) -> usize {
514        match &self.custom_metadata {
515            SequenceCustomMetadata::PagedAttention { block_size, .. } => *block_size,
516            SequenceCustomMetadata::None => unreachable!(),
517        }
518    }
519}
520
521impl Sequence {
522    #[allow(clippy::too_many_arguments)]
523    pub fn new_waiting(
524        tokens: Vec<u32>,
525        prompt: String,
526        id: usize,
527        timestamp: u128,
528        layers: usize,
529        responder: Sender<Response>,
530        sampler: Sampler,
531        stop_tokens: Vec<u32>,
532        stop_strings: Vec<String>,
533        max_len: Option<usize>,
534        return_logprobs: bool,
535        is_xlora: bool,
536        group: Arc<Mutex<SequenceGroup>>,
537        response_index: usize,
538        creation_time: u64,
539        recognizer: SequenceRecognizer,
540        suffix: Option<String>,
541        prefix: Option<String>,
542        input_images: Option<Vec<image::DynamicImage>>,
543        input_audios: Option<Vec<AudioInput>>,
544        // Paged attention
545        block_size: Option<usize>,
546        //
547        tools: Option<Arc<ToolCallingMatcher>>,
548        image_gen_response_format: Option<ImageGenerationResponseFormat>,
549        sequence_stepping_type: SeqStepType,
550        diffusion_params: Option<DiffusionGenerationParams>,
551        // Preallocated KV cache (k,v)
552        seq_preallocated_cache: Option<(Tensor, Tensor)>,
553        //
554        return_raw_logits: bool,
555        eos_tokens: Vec<u32>,
556    ) -> Self {
557        let prompt_len = tokens.len();
558        let mut custom_metadata = if let Some(block_size) = block_size {
559            SequenceCustomMetadata::PagedAttention {
560                logical_token_blocks: Vec::new(),
561                physical_blocks_prefill: None,
562                block_size,
563            }
564        } else {
565            SequenceCustomMetadata::None
566        };
567        custom_metadata
568            .append_tokens_to_blocks(tokens.iter().map(|x| *x as usize).collect::<Vec<_>>());
569        Self {
570            tokens,
571            prompt,
572            logprobs: Vec::new(),
573            prompt_len,
574            id,
575            timestamp,
576            state: RwLock::new(SequenceState::Waiting),
577            normal_cache: vec![None; layers],
578            normal_draft_cache: vec![None; layers],
579            cache: vec![None; layers],
580            draft_cache: vec![None; layers],
581            xlora_cache: if is_xlora {
582                Some(vec![None; layers])
583            } else {
584                None
585            },
586            mamba_state_idx: None,
587            seq_preallocated_cache,
588            responder,
589            sampler: sampler.into(),
590            stop_tokens,
591            stop_strings,
592            max_len,
593            return_logprobs,
594            prompt_tok_per_sec: 0.,
595            prompt_timestamp: None,
596            group,
597            scaling_cache: None,
598            response_index,
599            creation_time,
600            recognizer,
601            prefill_prompt_toks: None,
602            prefix_cache_len: 0,
603            suffix,
604            prefix,
605            cumulative_logprob: 0.,
606            completion_bytes: Vec::new(),
607            stream_idx: 0,
608            last_completion_bytes_len: 0,
609            last_logprob: 0.0,
610            last_is_done: None,
611            is_tmp: false,
612            scheduling_urgency: 0,
613            // Multimodal data
614            multimodal: MultimodalData::new(
615                input_images,
616                input_audios,
617                image_gen_response_format,
618                diffusion_params,
619            ),
620            custom_metadata,
621            tools,
622            sequence_stepping_type,
623            return_raw_logits,
624            token_offset: 0,
625            eos_tokens,
626            total_prompt_time: None,
627            waitlisted_count: 0,
628            harmony_context: None,
629        }
630    }
631
632    pub fn add_urgency(mut self) -> Self {
633        self.scheduling_urgency += 1;
634        self
635    }
636
637    pub fn reset_urgency(mut self) -> Self {
638        self.scheduling_urgency = 0;
639        self
640    }
641
642    /// Simple metric: (scheduling urgency) + log2(length)
643    /// Takes into account: urgency (scales linear) and length (scales logarithmic)
644    /// Scaling urgency is the number of scheduling passes where we have not been scheduled.
645    pub fn compute_priority(&self) -> f64 {
646        #![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
647        (self.scheduling_urgency as f64) + (self.len() as f64).log2()
648    }
649
650    pub fn prefill_v2_normal(
651        mut self,
652        cache: Vec<Option<KvCache>>,
653        toks: Vec<u32>,
654        offset: usize,
655    ) -> Self {
656        self.normal_cache = cache;
657        self.prefill_prompt_toks = Some(toks);
658        self.set_state(SequenceState::RunningPrefillPrompt);
659        self.token_offset = offset;
660        self
661    }
662
663    pub fn prefill_v2_paged(
664        mut self,
665        logical_blocks: Vec<LogicalTokenBlock>,
666        physical_blocks: Vec<Arc<PhysicalTokenBlock>>,
667        toks: Vec<u32>,
668        offset: usize,
669    ) -> Self {
670        self.prefill_prompt_toks = Some(toks);
671        self.set_state(SequenceState::RunningPrefillPrompt);
672        self.token_offset = offset;
673
674        if let SequenceCustomMetadata::PagedAttention {
675            logical_token_blocks,
676            physical_blocks_prefill,
677            block_size: _,
678        } = &mut self.custom_metadata
679        {
680            *logical_token_blocks = logical_blocks;
681            *physical_blocks_prefill = Some(physical_blocks);
682        }
683
684        self
685    }
686
687    /// This is the number of tokens. If the KV cache is Some, then it will use that.
688    pub fn len(&self) -> usize {
689        if let Some(toks) = &self.prefill_prompt_toks {
690            return toks.len();
691        }
692        if self.is_tmp {
693            return self.tokens.len();
694        }
695        // Use xlora cache first because of non granular
696        if self.xlora_cache.as_ref().is_some_and(|c| c[0].is_some()) {
697            self.xlora_cache.as_ref().unwrap()[0]
698                .as_ref()
699                .unwrap()
700                .0
701                .dims()[2]
702                + 1
703        } else if let Some((_, x)) = &self.cache[0] {
704            x.dims()[2] + 1
705        } else {
706            self.tokens.len()
707        }
708    }
709
710    pub fn id(&self) -> &usize {
711        &self.id
712    }
713
714    pub fn is_running(&self) -> bool {
715        matches!(
716            *self.state.read().unwrap(),
717            SequenceState::RunningCompletion | SequenceState::RunningPrompt // | SequenceState::RunningPrefillPrompt
718        )
719    }
720
721    pub fn is_completion(&self) -> bool {
722        matches!(
723            *self.state.read().unwrap(),
724            SequenceState::RunningCompletion
725        )
726    }
727
728    pub fn is_prompt(&self) -> bool {
729        matches!(
730            *self.state.read().unwrap(),
731            SequenceState::RunningPrompt | SequenceState::RunningPrefillPrompt
732        )
733    }
734
735    pub fn is_waiting(&self) -> bool {
736        matches!(*self.state.read().unwrap(), SequenceState::Waiting)
737    }
738
739    pub fn is_finished_paged_attn(&self) -> bool {
740        matches!(
741            *self.state.read().unwrap(),
742            SequenceState::FinishedAborted
743                | SequenceState::FinishedIgnored
744                | SequenceState::Done(_)
745        )
746    }
747
748    pub fn get_toks(&self) -> &[u32] {
749        if let Some(toks) = &self.prefill_prompt_toks {
750            return toks;
751        }
752        &self.tokens
753    }
754
755    pub fn get_initial_prompt(&self) -> &str {
756        &self.prompt
757    }
758
759    pub fn set_initial_prompt(&mut self, new: String) {
760        self.prompt = new;
761    }
762
763    pub fn token_offset(&self) -> usize {
764        self.token_offset
765    }
766
767    /// Get the number of prefix tokens that are cached (KV already computed).
768    /// These tokens should be skipped during prefill.
769    pub fn prefix_cache_len(&self) -> usize {
770        self.prefix_cache_len
771    }
772
773    /// Set the number of prefix tokens that are cached.
774    pub fn set_prefix_cache_len(&mut self, len: usize) {
775        self.prefix_cache_len = len;
776    }
777
778    /// This will also set prompt_len
779    pub(crate) fn set_toks_and_reallocate(
780        &mut self,
781        toks: Vec<u32>,
782        paged_attn_metadata: Option<&mut PagedAttentionMeta>,
783    ) {
784        self.tokens.clone_from(&toks);
785        self.prompt_len = self.tokens.len();
786        // Handle possible block engine
787        match &mut self.custom_metadata {
788            SequenceCustomMetadata::PagedAttention {
789                logical_token_blocks,
790                physical_blocks_prefill: _,
791                block_size: _,
792            } => {
793                logical_token_blocks.clear();
794            }
795            SequenceCustomMetadata::None => (),
796        }
797        self.custom_metadata
798            .append_tokens_to_blocks(toks.iter().map(|x| *x as usize).collect::<Vec<_>>());
799
800        if let Some(metadata) = paged_attn_metadata {
801            // Free and then reallocate as appropriate
802            get_mut_arcmutex!(metadata.block_engine).free_sequence(*self.id());
803            get_mut_arcmutex!(metadata.block_engine).allocate(self);
804        }
805    }
806
807    pub fn completion_bytes(&self) -> &[u8] {
808        &self.completion_bytes
809    }
810
811    pub fn preallocated_cache(&self) -> Option<&(Tensor, Tensor)> {
812        self.seq_preallocated_cache.as_ref()
813    }
814
815    pub fn normal_cache(&mut self) -> &mut Vec<Option<KvCache>> {
816        &mut self.normal_cache
817    }
818
819    pub fn normal_draft_cache(&mut self) -> &mut Vec<Option<KvCache>> {
820        &mut self.normal_draft_cache
821    }
822
823    pub fn cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
824        &mut self.cache
825    }
826
827    pub fn draft_cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
828        &mut self.draft_cache
829    }
830
831    pub fn xlora_cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
832        self.xlora_cache.as_mut().expect("No X-LoRA cache.")
833    }
834
835    pub fn scaling_cache(&mut self) -> &mut Option<Tensor> {
836        &mut self.scaling_cache
837    }
838
839    pub fn mamba_state_idx(&self) -> Option<usize> {
840        self.mamba_state_idx
841    }
842
843    pub fn set_mamba_state_idx(&mut self, idx: Option<usize>) {
844        self.mamba_state_idx = idx;
845    }
846
847    pub fn is_xlora(&self) -> bool {
848        self.xlora_cache.is_some()
849    }
850
851    pub fn sampler(&mut self) -> Arc<Sampler> {
852        self.sampler.clone()
853    }
854
855    /// Add a some prefill tokens. Only meant for internal speculative decoding usage.
856    pub fn set_prefill_toks(&mut self, toks: Vec<u32>) {
857        self.prefill_prompt_toks = Some(toks)
858    }
859
860    /// Remove the prefill tokens.
861    pub fn reset_prefill_toks(&mut self) {
862        self.prefill_prompt_toks = None
863    }
864
865    /// Internal api to add one raw token.
866    pub(crate) fn add_tmp_tok(&mut self, tok: u32) {
867        self.is_tmp = true;
868        self.tokens.push(tok);
869        // Handle possible block engine
870        self.custom_metadata.append_token_to_blocks(tok as usize);
871    }
872
873    /// Internal api to remove n raw tokens.
874    pub(crate) fn remove_tmp_tok(&mut self, n: usize) {
875        self.is_tmp = false;
876        self.tokens.truncate(self.tokens.len() - n);
877        // Handle possible block engine
878        self.custom_metadata.remove_tokens_from_blocks(n);
879    }
880
881    pub fn add_token(
882        &mut self,
883        tok: Logprobs,
884        completion_bytes: Vec<u8>,
885        is_done: &Option<StopReason>,
886    ) {
887        let stopped_by_token = matches!(
888            is_done,
889            Some(StopReason::Eos) | Some(StopReason::StopTok(_))
890        );
891        if !stopped_by_token {
892            // Completion bytes is used to check for stop strings, and as the response buffer.
893            // We don't need to add stop tokens to the completion bytes to check for stop strings.
894            // And by not adding it here, we can avoid having to delete these tokens from the output.
895            self.completion_bytes.extend_from_slice(&completion_bytes);
896            self.last_completion_bytes_len = completion_bytes.len();
897        }
898        self.last_logprob = tok.logprob;
899        self.last_is_done = *is_done;
900
901        self.custom_metadata
902            .append_token_to_blocks(tok.token as usize);
903
904        // Process token through Harmony parser if in Harmony mode
905        if let Some(ref mut harmony_ctx) = self.harmony_context {
906            let _ = harmony_ctx.process_token(tok.token);
907        }
908
909        self.cumulative_logprob += tok.logprob;
910        self.tokens.push(tok.token);
911        self.logprobs.push(tok);
912        self.reset_prefill_toks();
913    }
914
915    pub fn responder(&self) -> Sender<Response> {
916        self.responder.clone()
917    }
918
919    pub fn creation_time(&self) -> u64 {
920        self.creation_time
921    }
922
923    pub fn set_state(&self, state: SequenceState) {
924        if matches!(state, SequenceState::Error) {
925            let mut group = get_mut_group!(self);
926            group.n_choices = group.n_choices.saturating_sub(1);
927        }
928        *self.state.write().unwrap() = state;
929    }
930
931    pub fn getstate(&self) -> SequenceState {
932        *self.state.read().unwrap()
933    }
934
935    pub fn is_done(
936        &self,
937        tok: u32,
938        eos_tok: Option<&[u32]>,
939        max_model_len: usize,
940    ) -> Option<StopReason> {
941        let is_eos = match eos_tok {
942            Some(eos_tok) => eos_tok.contains(&tok),
943            None => false,
944        };
945        if is_eos {
946            Some(StopReason::Eos)
947        } else if matches!(
948            &*self.state.read().unwrap(),
949            SequenceState::Done(StopReason::Canceled)
950        ) {
951            Some(StopReason::Canceled)
952        } else if self.stop_tokens.contains(&tok) {
953            Some(StopReason::StopTok(tok))
954        } else if self.max_len.is_some()
955            && self.tokens.len().saturating_sub(self.prompt_len) + 1 >= self.max_len.unwrap()
956        {
957            // add_token will be called after this check
958            Some(StopReason::Length(self.max_len.unwrap()))
959        } else if self.tokens.len().saturating_sub(self.prompt_len) >= max_model_len {
960            Some(StopReason::ModelLength(max_model_len))
961        } else {
962            if !self.stop_strings.is_empty() {
963                for (idx, s) in self.stop_strings.iter().enumerate() {
964                    if let Some(pos) = galil_seiferas::gs_find(&self.completion_bytes, s.as_bytes())
965                    {
966                        return Some(StopReason::StopString {
967                            stop_string_idx: idx,
968                            completion_bytes_pos: pos,
969                        });
970                    }
971                }
972            }
973            None
974        }
975    }
976
977    pub fn logprobs(&self) -> &[Logprobs] {
978        &self.logprobs
979    }
980
981    pub fn return_logprobs(&self) -> bool {
982        self.return_logprobs
983    }
984
985    pub fn prompt_tokens(&self) -> usize {
986        self.prompt_len
987    }
988
989    pub fn stop_strings(&self) -> &[String] {
990        &self.stop_strings
991    }
992
993    /// Returns the delta between the last two decoded sequences
994    pub fn get_delta(
995        &mut self,
996    ) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
997        let new_decoded = self.peek_delta();
998        if matches!(new_decoded, Ok(Some(_))) {
999            self.stream_idx = self.completion_bytes.len();
1000        }
1001        new_decoded
1002    }
1003
1004    /// Peeks at the delta between the last two decoded sequences, but does not advance the stream index.
1005    pub fn peek_delta(&self) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
1006        let is_first = self.stream_idx == 0;
1007        let new_decoded = String::from_utf8_lossy(&self.completion_bytes[self.stream_idx..]);
1008        // Check if the sequence ends with valid utf8, if not skip it as it probably is a multi token sequence
1009        if new_decoded.ends_with('�') {
1010            return Ok(None);
1011        }
1012
1013        // The first token usually starts with a space. We don't want to add that to the delta.
1014        // Since we're using the completion_bytes, we need to take care of that ourselves.
1015        // Had we used HF's Tokenizer, it would have taken care of that for us.
1016        if is_first {
1017            return Ok(Some(new_decoded.trim_start().to_string()));
1018        }
1019        Ok(Some(new_decoded.to_string()))
1020    }
1021
1022    pub fn timestamp(&self) -> u128 {
1023        self.timestamp
1024    }
1025
1026    pub fn prompt_timestamp(&self) -> Option<u128> {
1027        self.prompt_timestamp
1028    }
1029
1030    fn update_time_info(&self) {
1031        let now = SystemTime::now()
1032            .duration_since(UNIX_EPOCH)
1033            .expect("Time travel has occurred!")
1034            .as_millis();
1035
1036        if let Some(ts) = self.prompt_timestamp {
1037            get_mut_group!(self).total_completion_time = now - ts;
1038            get_mut_group!(self).total_prompt_time = self.total_prompt_time.unwrap();
1039        }
1040
1041        get_mut_group!(self).total_time = now - self.timestamp;
1042
1043        get_mut_group!(self).total_prompt_toks = self.prompt_len;
1044        get_mut_group!(self).total_toks = self.len();
1045    }
1046
1047    pub fn add_image_choice_to_group(&self, choice: ImageChoice) {
1048        get_mut_group!(self).image_choices.push(choice);
1049    }
1050
1051    pub fn add_speech_pcm_to_group(&self, pcm: Arc<Vec<f32>>, rate: usize, channels: usize) {
1052        get_mut_group!(self).speech_pcms.push((pcm, rate, channels));
1053    }
1054
1055    pub fn add_choice_to_group(&self, choice: Choice) {
1056        get_mut_group!(self).choices.push(choice);
1057        self.update_time_info();
1058    }
1059
1060    pub fn add_raw_choice_to_group(&self, logit_chunks: Vec<Tensor>) {
1061        get_mut_group!(self)
1062            .raw_choices
1063            .push((logit_chunks, self.tokens.clone()));
1064        self.update_time_info();
1065    }
1066
1067    pub fn add_embedding_choice_to_group(&self, embedding: Vec<f32>) {
1068        get_mut_group!(self).embedding_choices.push(embedding);
1069        self.update_time_info();
1070    }
1071
1072    pub fn add_completion_choice_to_group(&self, mut choice: CompletionChoice) {
1073        choice.text = format!(
1074            "{}{}{}",
1075            self.prefix.as_deref().unwrap_or(""),
1076            choice.text,
1077            self.suffix.as_deref().unwrap_or("")
1078        );
1079        get_mut_group!(self)
1080            .completion_choices
1081            .push((self.cumulative_logprob, choice));
1082        self.update_time_info();
1083    }
1084
1085    pub fn get_response_index(&self) -> usize {
1086        self.response_index
1087    }
1088
1089    pub fn get_mut_group(&self) -> MutexGuard<'_, SequenceGroup> {
1090        get_mut_group!(self)
1091    }
1092
1093    pub fn add_streaming_chunk_choice_to_group(&self, chunk: ChunkChoice) {
1094        get_mut_group!(self).chat_streaming_chunks.push(chunk);
1095        self.update_time_info();
1096    }
1097
1098    pub fn add_streaming_completion_chunk_choice_to_group(&self, chunk: CompletionChunkChoice) {
1099        get_mut_group!(self).completion_streaming_chunks.push(chunk);
1100        self.update_time_info();
1101    }
1102
1103    pub fn take_images(&mut self) -> Option<Vec<image::DynamicImage>> {
1104        self.multimodal.take_images()
1105    }
1106
1107    pub fn clone_images(&self) -> Option<Vec<image::DynamicImage>> {
1108        self.multimodal.clone_images()
1109    }
1110
1111    pub fn images(&self) -> Option<&[image::DynamicImage]> {
1112        self.multimodal.images()
1113    }
1114
1115    pub fn image_hashes(&self) -> Option<&[u64]> {
1116        self.multimodal.image_hashes()
1117    }
1118
1119    pub fn has_images(&self) -> bool {
1120        self.multimodal.has_images()
1121    }
1122
1123    pub fn take_audios(&mut self) -> Option<Vec<AudioInput>> {
1124        self.multimodal.take_audios()
1125    }
1126
1127    pub fn clone_audios(&self) -> Option<Vec<AudioInput>> {
1128        self.multimodal.clone_audios()
1129    }
1130
1131    pub fn audios(&self) -> Option<&[AudioInput]> {
1132        self.multimodal.audios()
1133    }
1134
1135    pub fn audio_hashes(&self) -> Option<&[u64]> {
1136        self.multimodal.audio_hashes()
1137    }
1138
1139    pub fn has_audios(&self) -> bool {
1140        self.multimodal.has_audios()
1141    }
1142
1143    /// Keep these last n audios
1144    pub fn keep_num_audios(&mut self, audios_to_keep: usize) {
1145        self.multimodal.keep_num_audios(audios_to_keep)
1146    }
1147
1148    /// Keep these last n images
1149    pub fn keep_num_images(&mut self, images_to_keep: usize) {
1150        self.multimodal.keep_num_images(images_to_keep)
1151    }
1152
1153    pub fn image_gen_response_format(&self) -> Option<ImageGenerationResponseFormat> {
1154        self.multimodal.image_gen_response_format()
1155    }
1156
1157    pub fn sequence_stepping_type(&self) -> &SeqStepType {
1158        &self.sequence_stepping_type
1159    }
1160
1161    pub fn get_diffusion_diffusion_params(&self) -> Option<DiffusionGenerationParams> {
1162        self.multimodal.diffusion_params()
1163    }
1164
1165    pub fn eos_tokens(&self) -> &[u32] {
1166        &self.eos_tokens
1167    }
1168
1169    // === Harmony Format Support ===
1170
1171    /// Enable Harmony format parsing for this sequence.
1172    /// Should be called when the model uses Harmony format (GPT-OSS models).
1173    pub fn enable_harmony_mode(&mut self) -> Result<(), anyhow::Error> {
1174        if self.harmony_context.is_none() {
1175            self.harmony_context = Some(HarmonyContext::new()?);
1176        }
1177        Ok(())
1178    }
1179
1180    /// Check if this sequence is in Harmony mode
1181    pub fn is_harmony_mode(&self) -> bool {
1182        self.harmony_context.is_some()
1183    }
1184
1185    /// Process a token through the Harmony parser (if enabled).
1186    /// Returns the Harmony delta if in Harmony mode.
1187    pub fn process_harmony_token(&mut self, token_id: u32) -> Option<crate::harmony::HarmonyDelta> {
1188        self.harmony_context
1189            .as_mut()
1190            .map(|ctx| ctx.process_token(token_id))
1191    }
1192
1193    /// Get the latest Harmony reasoning delta (for streaming).
1194    /// Returns None if not in Harmony mode or no new reasoning content.
1195    pub fn get_harmony_reasoning_delta(&mut self) -> Option<String> {
1196        self.harmony_context
1197            .as_mut()
1198            .and_then(|ctx| ctx.get_reasoning_delta())
1199    }
1200
1201    /// Get the latest Harmony final content delta (for streaming).
1202    /// Returns None if not in Harmony mode or no new final content.
1203    pub fn get_harmony_final_delta(&mut self) -> Option<String> {
1204        self.harmony_context
1205            .as_mut()
1206            .and_then(|ctx| ctx.get_final_delta())
1207    }
1208
1209    /// Get accumulated Harmony reasoning content (for non-streaming).
1210    /// Returns None if not in Harmony mode or no reasoning content.
1211    pub fn get_harmony_reasoning_content(&self) -> Option<String> {
1212        self.harmony_context
1213            .as_ref()
1214            .and_then(|ctx| ctx.reasoning_content())
1215    }
1216
1217    /// Get accumulated Harmony final content.
1218    /// Returns None if not in Harmony mode or no final content.
1219    pub fn get_harmony_final_content(&self) -> Option<String> {
1220        self.harmony_context
1221            .as_ref()
1222            .and_then(|ctx| ctx.final_content())
1223    }
1224
1225    /// Signal end of stream to the Harmony parser
1226    pub fn harmony_process_eos(&mut self) {
1227        if let Some(ref mut ctx) = self.harmony_context {
1228            ctx.process_eos();
1229        }
1230    }
1231
1232    /// Check if Harmony mode has detected any tool calls
1233    pub fn has_harmony_tool_calls(&self) -> bool {
1234        self.harmony_context
1235            .as_ref()
1236            .is_some_and(|ctx| ctx.has_tool_call())
1237    }
1238
1239    /// Get all Harmony tool calls (finalizes any pending tool call)
1240    pub fn get_harmony_tool_calls(&mut self) -> Vec<crate::harmony::HarmonyToolCall> {
1241        self.harmony_context
1242            .as_mut()
1243            .map(|ctx| ctx.finalize_tool_calls())
1244            .unwrap_or_default()
1245    }
1246}
1247
1248pub struct SequenceGroup {
1249    n_choices: usize, // The target number of choices to return. Can be decreased if an error is thrown.
1250    best_of: Option<usize>, // Top n seqs based on cumulative logprobs.
1251    pub total_prompt_toks: usize,
1252    pub total_toks: usize,
1253    pub total_prompt_time: u128,
1254    pub total_time: u128,
1255    pub total_completion_time: u128,
1256    choices: Vec<Choice>,
1257    image_choices: Vec<ImageChoice>,
1258    speech_pcms: Vec<(Arc<Vec<f32>>, usize, usize)>, // (pcm, rate, channels)
1259    raw_choices: Vec<(Vec<Tensor>, Vec<u32>)>,
1260    embedding_choices: Vec<Vec<f32>>,
1261    completion_choices: Vec<(f32, CompletionChoice)>,
1262    pub chat_streaming_chunks: Vec<ChunkChoice>,
1263    pub completion_streaming_chunks: Vec<CompletionChunkChoice>,
1264    pub is_streaming: bool,
1265    pub is_chat: bool,
1266}
1267
1268impl SequenceGroup {
1269    pub fn new(
1270        n_choices: usize,
1271        is_streaming: bool,
1272        is_chat: bool,
1273        best_of: Option<usize>,
1274    ) -> Self {
1275        Self {
1276            choices: Vec::new(),
1277            image_choices: Vec::new(),
1278            speech_pcms: Vec::new(),
1279            raw_choices: Vec::new(),
1280            embedding_choices: Vec::new(),
1281            completion_choices: Vec::new(),
1282            n_choices,
1283            total_prompt_toks: 0,
1284            total_toks: 0,
1285            total_prompt_time: 0,
1286            total_time: 0,
1287            total_completion_time: 0,
1288            chat_streaming_chunks: Vec::new(),
1289            completion_streaming_chunks: Vec::new(),
1290            is_streaming,
1291            is_chat,
1292            best_of,
1293        }
1294    }
1295
1296    pub fn get_choices(&self) -> &[Choice] {
1297        &self.choices
1298    }
1299
1300    /// This may apply the best_of.
1301    pub fn get_completion_choices(&self) -> Vec<CompletionChoice> {
1302        if let Some(best_of) = self.best_of {
1303            let mut choices = self.completion_choices.clone();
1304            // Sort by descending logprobs
1305            choices.sort_by(|a, b| b.0.partial_cmp(&a.0).expect("No ordering."));
1306            choices
1307                .into_iter()
1308                .take(best_of)
1309                .map(|(_, x)| x)
1310                .collect::<Vec<_>>()
1311        } else {
1312            self.completion_choices
1313                .clone()
1314                .into_iter()
1315                .map(|(_, x)| x)
1316                .collect::<Vec<_>>()
1317        }
1318    }
1319
1320    pub fn get_image_choices(&self) -> &[ImageChoice] {
1321        &self.image_choices
1322    }
1323
1324    pub fn get_usage(&self) -> Usage {
1325        #[allow(clippy::cast_precision_loss)]
1326        Usage {
1327            completion_tokens: self.total_toks.saturating_sub(self.total_prompt_toks),
1328            prompt_tokens: self.total_prompt_toks,
1329            total_tokens: self.total_toks,
1330            avg_tok_per_sec: (self.total_toks as f32 / self.total_time as f32) * 1000.,
1331            avg_prompt_tok_per_sec: (self.total_prompt_toks as f32 / self.total_prompt_time as f32)
1332                * 1000.,
1333            avg_compl_tok_per_sec: (self.total_toks.saturating_sub(self.total_prompt_toks) as f32
1334                / self.total_completion_time as f32)
1335                * 1000.,
1336            total_time_sec: self.total_time as f32 / 1000.,
1337            total_completion_time_sec: self.total_completion_time as f32 / 1000.,
1338            total_prompt_time_sec: self.total_prompt_time as f32 / 1000.,
1339        }
1340    }
1341
1342    pub async fn maybe_send_chat_done_response(
1343        &self,
1344        response: ChatCompletionResponse,
1345        sender: Sender<Response>,
1346    ) -> Result<(), SendError<Response>> {
1347        if self.choices.len() == self.n_choices {
1348            sender.send(Response::Done(response)).await?;
1349        }
1350
1351        Ok(())
1352    }
1353
1354    pub async fn maybe_send_raw_done_response(
1355        &self,
1356        sender: Sender<Response>,
1357    ) -> Result<(), SendError<Response>> {
1358        if self.raw_choices.len() == self.n_choices {
1359            assert_eq!(self.raw_choices.len(), 1);
1360            let (logits_chunks, tokens) = self.raw_choices[0].clone();
1361            sender
1362                .send(Response::Raw {
1363                    logits_chunks,
1364                    tokens,
1365                })
1366                .await?;
1367        }
1368
1369        Ok(())
1370    }
1371
1372    pub async fn maybe_send_embedding_done_response(
1373        &self,
1374        sender: Sender<Response>,
1375    ) -> Result<(), SendError<Response>> {
1376        if self.embedding_choices.len() == self.n_choices {
1377            assert_eq!(self.embedding_choices.len(), 1);
1378            let embeddings = self.embedding_choices[0].clone();
1379            let prompt_tokens = self.total_prompt_toks;
1380            let total_tokens = self.total_toks;
1381            sender
1382                .send(Response::Embeddings {
1383                    embeddings,
1384                    prompt_tokens,
1385                    total_tokens,
1386                })
1387                .await?;
1388        }
1389
1390        Ok(())
1391    }
1392
1393    pub async fn maybe_send_image_gen_response(
1394        &self,
1395        response: ImageGenerationResponse,
1396        sender: Sender<Response>,
1397    ) -> Result<(), SendError<Response>> {
1398        if self.image_choices.len() == self.n_choices {
1399            sender.send(Response::ImageGeneration(response)).await?;
1400        }
1401
1402        Ok(())
1403    }
1404
1405    pub async fn maybe_send_speech_response(
1406        &self,
1407        sender: Sender<Response>,
1408    ) -> Result<(), SendError<Response>> {
1409        assert_eq!(self.speech_pcms.len(), 1);
1410
1411        let (pcm, rate, channels) = self.speech_pcms[0].clone();
1412        sender
1413            .send(Response::Speech {
1414                pcm,
1415                rate,
1416                channels,
1417            })
1418            .await?;
1419
1420        Ok(())
1421    }
1422
1423    pub async fn maybe_send_streaming_response(
1424        &mut self,
1425        seq: &Sequence,
1426        model: String,
1427        usage_opt: Option<Usage>,
1428    ) -> Result<(), Box<SendError<Response>>> {
1429        if self.chat_streaming_chunks.len() == self.n_choices && self.is_streaming {
1430            let mut swap_streaming_chunks = vec![];
1431
1432            std::mem::swap(&mut swap_streaming_chunks, &mut self.chat_streaming_chunks);
1433
1434            seq.responder()
1435                .send(Response::Chunk(ChatCompletionChunkResponse {
1436                    id: seq.id.to_string(),
1437                    choices: swap_streaming_chunks,
1438                    created: seq.timestamp,
1439                    model: model.clone(),
1440                    system_fingerprint: SYSTEM_FINGERPRINT.to_string(),
1441                    object: "chat.completion.chunk".to_string(),
1442                    usage: usage_opt,
1443                }))
1444                .await?;
1445        } else if self.completion_streaming_chunks.len() == self.n_choices && self.is_streaming {
1446            let mut swap_streaming_chunks = vec![];
1447
1448            std::mem::swap(
1449                &mut swap_streaming_chunks,
1450                &mut self.completion_streaming_chunks,
1451            );
1452
1453            seq.responder()
1454                .send(Response::CompletionChunk(CompletionChunkResponse {
1455                    id: seq.id.to_string(),
1456                    choices: swap_streaming_chunks,
1457                    created: seq.timestamp,
1458                    model: model.clone(),
1459                    system_fingerprint: SYSTEM_FINGERPRINT.to_string(),
1460                    object: "text_completion".to_string(),
1461                }))
1462                .await?;
1463        }
1464        Ok(())
1465    }
1466
1467    pub async fn maybe_send_completion_done_response(
1468        &self,
1469        response: CompletionResponse,
1470        sender: Sender<Response>,
1471    ) -> Result<(), Box<SendError<Response>>> {
1472        if self.completion_choices.len() == self.n_choices {
1473            sender.send(Response::CompletionDone(response)).await?;
1474        }
1475        Ok(())
1476    }
1477}