1use crate::{
2 get_mut_arcmutex, get_mut_group,
3 harmony::HarmonyContext,
4 paged_attention::BlockRef,
5 pipeline::{text_models_inputs_processor::PagedAttentionMeta, LayerCaches},
6 response::{ChatCompletionChunkResponse, Choice, ChunkChoice, Response, SYSTEM_FINGERPRINT},
7 sampler::{Logprobs, Sampler},
8 think_tags::ThinkTagContext,
9 AudioInput, ChatCompletionResponse, Usage,
10};
11use crate::{
12 paged_attention::{BlockEngineSequence, LogicalTokenBlock},
13 pipeline::{DiffusionGenerationParams, KvCache},
14 response::CompletionChoice,
15 tools::ToolCallingMatcher,
16 CompletionChunkChoice, CompletionChunkResponse, CompletionResponse, ImageChoice,
17 ImageGenerationResponse, ImageGenerationResponseFormat,
18};
19use candle_core::Tensor;
20use std::{
21 fmt::Display,
22 hash::{DefaultHasher, Hash, Hasher},
23 sync::{Arc, RwLock},
24 time::{SystemTime, UNIX_EPOCH},
25};
26use tokio::sync::{
27 mpsc::{error::SendError, Sender},
28 Mutex, MutexGuard,
29};
30
/// Why a sequence stopped generating.
///
/// Rendered into an OpenAI-style `finish_reason` string by the `Display` impl.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum StopReason {
    /// An EOS token was produced.
    Eos,
    /// A user-configured stop token (value carried) was produced.
    StopTok(u32),
    /// The request's `max_len` (carried) was reached.
    Length(usize),
    /// The model's maximum context length (carried) was reached.
    ModelLength(usize),
    /// A user-configured stop string matched in the completion bytes.
    StopString {
        /// Index into the request's stop-string list.
        stop_string_idx: usize,
        /// Byte offset of the match within the accumulated completion bytes.
        completion_bytes_pos: usize,
    },
    /// The request was canceled externally.
    Canceled,
    /// An image-generation request completed.
    GeneratedImage,
    /// A speech-generation request completed.
    GeneratedSpeech,
    /// Generation ended because tool calls were emitted.
    ToolCalls,
}
46
47impl Display for StopReason {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 StopReason::Eos => write!(f, "stop"),
51 StopReason::Length(_) | StopReason::ModelLength(_) => write!(f, "length"),
52 StopReason::StopTok(_) | StopReason::StopString { .. } => write!(f, "stop"),
53 StopReason::Canceled => write!(f, "canceled"),
54 StopReason::GeneratedImage => write!(f, "generated_image"),
55 StopReason::GeneratedSpeech => write!(f, "generated_speech"),
56 StopReason::ToolCalls => write!(f, "tool_calls"),
57 }
58 }
59}
60
/// Lifecycle state of a [`Sequence`] as it moves through the scheduler.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum SequenceState {
    /// Finished, with the reason generation stopped.
    Done(StopReason),
    /// Currently processing the prompt (prefill).
    RunningPrompt,
    /// Currently generating completion tokens (decode).
    RunningCompletion,
    /// Queued, not yet scheduled.
    Waiting,
    /// An error occurred; the choice is dropped from its group.
    Error,
    /// Running a prefix-cache-assisted prefill over a token subset.
    RunningPrefillPrompt,
    /// Aborted by the scheduler/engine before completion.
    FinishedAborted,
    /// Dropped without producing output (e.g. could not be scheduled).
    FinishedIgnored,
    /// Swapped out of device memory by the block engine.
    Swapped,
}
74
/// Optional grammar-constrained decoding state attached to a sequence.
pub enum SequenceRecognizer {
    /// Constrained decoding driven by an llguidance matcher (boxed: large type).
    Llguidance(Box<llguidance::Matcher>),
    /// No constraint; sampling is unrestricted.
    None,
}
79
/// Backend-specific bookkeeping carried by a sequence.
enum SequenceCustomMetadata {
    /// PagedAttention block accounting for this sequence's tokens.
    PagedAttention {
        /// Logical blocks holding the sequence's token ids, in order.
        logical_token_blocks: Vec<LogicalTokenBlock>,
        /// Physical blocks handed over for prefix-cached prefill, consumed
        /// (taken) by the block engine.
        physical_blocks_prefill: Option<Vec<BlockRef>>,
        /// Tokens per block.
        block_size: usize,
    },
    /// Non-paged backends: no block accounting needed.
    None,
}
88
/// Number of new physical blocks the block engine must allocate before one
/// more token can be appended to `$logical_token_blocks`.
///
/// Yields 0 when the last block exists and is partially filled (neither full
/// nor empty) — the token fits in it. Yields 1 when the last block is full or
/// empty (a trailing empty block is freshly pushed by
/// `util_append_token_to_blocks` and presumably has no physical backing yet).
///
/// NOTE(review): an empty block list (no last block) also yields 0 here —
/// confirm prompt allocation covers that case separately.
macro_rules! blocks_to_add_new_tok {
    ($logical_token_blocks:expr) => {{
        let last = $logical_token_blocks.last();
        if !last.is_some_and(|last| last.is_full() || last.is_empty()) {
            0
        } else {
            1
        }
    }};
}
100
101pub(crate) fn util_append_token_to_blocks(
102 tok: usize,
103 logical_token_blocks: &mut Vec<LogicalTokenBlock>,
104 block_size: usize,
105) {
106 let last = logical_token_blocks.last_mut();
107 match last {
108 Some(last) => {
109 last.append_token_id(tok);
110 }
111 None => {
112 logical_token_blocks.push(LogicalTokenBlock::new(block_size));
113 logical_token_blocks
115 .last_mut()
116 .expect("just pushed a block, vector cannot be empty")
117 .append_token_id(tok);
118 }
119 }
120 if logical_token_blocks
123 .last()
124 .expect("logical_token_blocks should not be empty after appending")
125 .is_full()
126 {
127 logical_token_blocks.push(LogicalTokenBlock::new(block_size));
128 }
129}
130
131impl SequenceCustomMetadata {
132 fn append_token_to_blocks(&mut self, tok: usize) {
133 match self {
134 Self::PagedAttention {
135 logical_token_blocks,
136 physical_blocks_prefill: _,
137 block_size,
138 } => {
139 util_append_token_to_blocks(tok, logical_token_blocks, *block_size);
140 }
141 Self::None => (),
142 }
143 }
144
145 fn pop_token_from_blocks(&mut self) {
146 match self {
147 Self::PagedAttention {
148 logical_token_blocks,
149 physical_blocks_prefill: _,
150 block_size: _,
151 } => {
152 let last = logical_token_blocks.last_mut().unwrap();
153 last.pop_token();
154 }
155 Self::None => (),
156 }
157 }
158
159 fn append_tokens_to_blocks(&mut self, toks: Vec<usize>) {
160 for tok in toks {
161 self.append_token_to_blocks(tok);
162 }
163 }
164
165 fn remove_tokens_from_blocks(&mut self, n: usize) {
166 for _ in 0..n {
167 self.pop_token_from_blocks();
168 }
169 }
170}
171
/// How a sequence is stepped by the engine.
#[derive(Clone, Copy)]
pub enum SeqStepType {
    /// Standard autoregressive flow: prefill the prompt, then decode tokens.
    PromptAndDecode,
    /// Single forward pass producing the full result (e.g. diffusion/speech).
    OneShot,
}
177
/// Input images for a sequence plus a precomputed content hash per image
/// (hashes are computed once at construction from the raw image bytes).
pub struct SequenceImages {
    // Image payloads, in prompt order.
    images: Vec<image::DynamicImage>,
    // Parallel per-image content hashes, as built by `SequenceImages::new`.
    hashes: Vec<u64>,
}
182
/// Input audio clips for a sequence plus a precomputed hash per clip
/// (hash covers each sample's bit pattern and the sample rate).
#[derive(Clone)]
pub struct SequenceAudios {
    // Audio payloads, in prompt order.
    audios: Vec<AudioInput>,
    // Parallel per-clip content hashes, as built by `SequenceAudios::new`.
    hashes: Vec<u64>,
}
188
189impl SequenceAudios {
190 fn new(input_audios: Vec<AudioInput>) -> Self {
191 let hashes = input_audios.iter().map(|a| {
192 let mut hasher = DefaultHasher::new();
193 for s in &a.samples {
194 s.to_bits().hash(&mut hasher);
195 }
196 a.sample_rate.hash(&mut hasher);
197 hasher.finish()
198 });
199 Self {
200 hashes: hashes.collect(),
201 audios: input_audios,
202 }
203 }
204
205 fn clone_audios(&self) -> Vec<AudioInput> {
206 self.audios.clone()
207 }
208
209 fn audios(&self) -> &[AudioInput] {
210 &self.audios
211 }
212
213 fn audios_mut(&mut self) -> &mut Vec<AudioInput> {
214 &mut self.audios
215 }
216
217 fn hashes(&self) -> &[u64] {
218 &self.hashes
219 }
220
221 fn keep_num_audios(&mut self, audios_to_keep: usize) {
222 if self.audios.len() > audios_to_keep {
223 let start = self.audios.len() - audios_to_keep;
224 self.audios = self.audios[start..].to_vec();
225 }
228 }
229}
230
231impl SequenceImages {
232 fn new(input_images: Vec<image::DynamicImage>) -> Self {
233 let hashes = input_images.iter().map(|x| {
234 let mut hasher = DefaultHasher::new();
235 x.as_bytes().hash(&mut hasher);
236 hasher.finish()
237 });
238 Self {
239 hashes: hashes.collect(),
240 images: input_images,
241 }
242 }
243
244 fn clone_images(&self) -> Vec<image::DynamicImage> {
245 self.images.clone()
246 }
247
248 fn images(&self) -> &[image::DynamicImage] {
249 &self.images
250 }
251
252 fn images_mut(&mut self) -> &mut Vec<image::DynamicImage> {
253 &mut self.images
254 }
255
256 fn hashes(&self) -> &[u64] {
257 &self.hashes
258 }
259
260 fn keep_num_images(&mut self, images_to_keep: usize) {
261 if self.images.len() > images_to_keep {
262 let start = self.images.len() - images_to_keep;
263 self.images = self.images[start..].to_vec();
264 }
267 }
268}
269
/// All multimodal (vision/audio/diffusion) state attached to a sequence.
pub struct MultimodalData {
    /// Input images plus their content hashes, if any.
    pub input_images: Option<SequenceImages>,
    /// Input audio clips plus their content hashes, if any.
    pub input_audios: Option<SequenceAudios>,
    /// Cached preprocessed pixel values (avoids re-running vision preprocessing).
    pub cached_pixel_values: Option<Tensor>,
    /// Cached image temporal/height/width grid tensor.
    pub cached_img_thw: Option<Tensor>,
    /// Cached video temporal/height/width grid tensor.
    pub cached_vid_thw: Option<Tensor>,
    /// Set once the prompt has been rewritten; after that, `take_*` moves the
    /// media out instead of cloning it.
    pub has_changed_prompt: bool,
    /// Requested response format for image generation, if any.
    pub image_gen_response_format: Option<ImageGenerationResponseFormat>,
    /// Parameters for diffusion-based generation, if any.
    pub diffusion_params: Option<DiffusionGenerationParams>,
}
281
282impl MultimodalData {
283 pub fn new(
284 input_images: Option<Vec<image::DynamicImage>>,
285 input_audios: Option<Vec<AudioInput>>,
286 image_gen_response_format: Option<ImageGenerationResponseFormat>,
287 diffusion_params: Option<DiffusionGenerationParams>,
288 ) -> Self {
289 MultimodalData {
290 input_images: input_images.map(SequenceImages::new),
291 input_audios: input_audios.map(SequenceAudios::new),
292 cached_pixel_values: None,
293 cached_img_thw: None,
294 cached_vid_thw: None,
295 has_changed_prompt: false,
296 image_gen_response_format,
297 diffusion_params,
298 }
299 }
300
301 pub fn take_images(&mut self) -> Option<Vec<image::DynamicImage>> {
302 if self.has_changed_prompt {
303 if let Some(input_images) = self.input_images.as_mut() {
304 let mut images = Vec::new();
305 std::mem::swap(&mut images, input_images.images_mut());
306 Some(images)
307 } else {
308 None
309 }
310 } else {
311 self.input_images.as_ref().map(|imgs| imgs.clone_images())
312 }
313 }
314
315 pub fn clone_images(&self) -> Option<Vec<image::DynamicImage>> {
316 self.input_images.as_ref().map(|imgs| imgs.clone_images())
317 }
318
319 pub fn images(&self) -> Option<&[image::DynamicImage]> {
320 self.input_images.as_ref().map(|imgs| imgs.images())
321 }
322
323 pub fn image_hashes(&self) -> Option<&[u64]> {
324 self.input_images.as_ref().map(|imgs| imgs.hashes())
325 }
326
327 pub fn has_images(&self) -> bool {
328 self.input_images
329 .as_ref()
330 .is_some_and(|imgs| !imgs.images().is_empty())
331 }
332
333 pub fn take_audios(&mut self) -> Option<Vec<AudioInput>> {
334 if self.has_changed_prompt {
335 if let Some(input_audios) = self.input_audios.as_mut() {
336 let mut audios = Vec::new();
337 std::mem::swap(&mut audios, input_audios.audios_mut());
338 Some(audios)
339 } else {
340 None
341 }
342 } else {
343 self.input_audios.as_ref().map(|imgs| imgs.clone_audios())
344 }
345 }
346
347 pub fn clone_audios(&self) -> Option<Vec<AudioInput>> {
348 self.input_audios.as_ref().map(|a| a.clone_audios())
349 }
350
351 pub fn audios(&self) -> Option<&[AudioInput]> {
352 self.input_audios.as_ref().map(|a| a.audios())
353 }
354
355 pub fn audio_hashes(&self) -> Option<&[u64]> {
356 self.input_audios.as_ref().map(|a| a.hashes())
357 }
358
359 pub fn has_audios(&self) -> bool {
360 self.input_audios
361 .as_ref()
362 .is_some_and(|a| !a.audios().is_empty())
363 }
364
365 pub fn keep_num_audios(&mut self, audios_to_keep: usize) {
366 if let Some(auds) = self.input_audios.as_mut() {
367 auds.keep_num_audios(audios_to_keep)
368 }
369 }
370
371 pub fn keep_num_images(&mut self, images_to_keep: usize) {
372 if let Some(imgs) = self.input_images.as_mut() {
373 imgs.keep_num_images(images_to_keep)
374 }
375 }
376
377 pub fn image_gen_response_format(&self) -> Option<ImageGenerationResponseFormat> {
378 self.image_gen_response_format
379 }
380
381 pub fn diffusion_params(&self) -> Option<DiffusionGenerationParams> {
382 self.diffusion_params.clone()
383 }
384}
385
/// A single in-flight generation request tracked by the engine/scheduler.
pub struct Sequence {
    // Unique sequence id (used e.g. as the block-engine key).
    id: usize,
    // Number of prompt tokens at creation / last prompt reset.
    prompt_len: usize,
    // Per-request max new tokens, if any.
    max_len: Option<usize>,
    // Request arrival timestamp (millis since epoch).
    timestamp: u128,
    sampler: Arc<Sampler>,
    // Extra stop tokens beyond EOS.
    stop_tokens: Vec<u32>,
    stop_strings: Vec<String>,
    return_logprobs: bool,
    // Channel for sending responses/chunks back to the requester.
    responder: Sender<Response>,
    // Index of this choice within its response group.
    response_index: usize,
    creation_time: u64,
    // Original (possibly later rewritten) prompt text.
    prompt: String,
    sequence_stepping_type: SeqStepType,
    pub(crate) return_raw_logits: bool,
    // Offset applied to token positions after prefix-cache prefill.
    token_offset: usize,
    eos_tokens: Vec<u32>,

    // Vision/audio/diffusion state.
    pub multimodal: MultimodalData,

    // Text appended after / prepended before completion text (completions API).
    suffix: Option<String>,
    prefix: Option<String>,

    // True while speculative tmp tokens are pushed (see add_tmp_tok).
    is_tmp: bool,

    // Token subset still to be prefetched after a prefix-cache hit.
    prefill_prompt_toks: Option<Vec<u32>>,
    prefix_cache_len: usize,

    // KV-cache variants; which one is live depends on the pipeline kind.
    normal_cache: Vec<Option<KvCache>>,
    normal_draft_cache: Vec<Option<KvCache>>,
    scaling_cache: Option<Tensor>,
    cache: LayerCaches,
    draft_cache: LayerCaches,
    xlora_cache: Option<LayerCaches>,
    mamba_state_idx: Option<usize>,

    // Preallocated (k, v) buffers for this sequence, if the backend uses them.
    seq_preallocated_cache: Option<(Tensor, Tensor)>,

    // Prompt + generated token ids.
    tokens: Vec<u32>,
    logprobs: Vec<Logprobs>,
    cumulative_logprob: f32,
    last_logprob: f32,
    last_completion_bytes_len: usize,
    last_is_done: Option<StopReason>,
    // Raw UTF-8 bytes of the completion so far (streamed via stream_idx).
    completion_bytes: Vec<u8>,
    // Byte offset already delivered to the streaming client.
    stream_idx: usize,
    pub recognizer: SequenceRecognizer,
    // Scheduling priority boost (see compute_priority).
    scheduling_urgency: usize,
    // How many times this sequence was waitlisted by the block engine.
    waitlisted_count: usize,
    pub prompt_tok_per_sec: f32,
    pub prompt_timestamp: Option<u128>,
    pub total_prompt_time: Option<u128>,
    // Shared per-request group (n choices, aggregated stats).
    group: Arc<Mutex<SequenceGroup>>,
    state: RwLock<SequenceState>,

    // PagedAttention block bookkeeping, or None.
    custom_metadata: SequenceCustomMetadata,

    pub tools: Option<Arc<ToolCallingMatcher>>,

    // Harmony-format (gpt-oss style) channel parsing state, if enabled.
    harmony_context: Option<HarmonyContext>,

    // <think>-tag reasoning/content splitting state, if enabled.
    think_tag_context: Option<ThinkTagContext>,
}
466
467impl BlockEngineSequence for Sequence {
468 fn blocks_to_add_new_tok(&self) -> usize {
469 match &self.custom_metadata {
470 SequenceCustomMetadata::PagedAttention {
471 logical_token_blocks,
472 physical_blocks_prefill: _,
473 block_size: _,
474 } => {
475 blocks_to_add_new_tok!(logical_token_blocks)
476 }
477 SequenceCustomMetadata::None => unreachable!(),
478 }
479 }
480
481 fn get_id(&self) -> usize {
482 self.id
483 }
484
485 fn logical_token_blocks(&self) -> &[LogicalTokenBlock] {
486 match &self.custom_metadata {
487 SequenceCustomMetadata::PagedAttention {
488 logical_token_blocks,
489 physical_blocks_prefill: _,
490 block_size: _,
491 } => logical_token_blocks,
492 SequenceCustomMetadata::None => unreachable!(),
493 }
494 }
495
496 fn take_physical_blocks_prefill(&mut self) -> Option<Vec<BlockRef>> {
497 match &mut self.custom_metadata {
498 SequenceCustomMetadata::PagedAttention {
499 logical_token_blocks: _,
500 physical_blocks_prefill,
501 block_size: _,
502 } => physical_blocks_prefill.take(),
503 SequenceCustomMetadata::None => None,
504 }
505 }
506
507 fn increment_waitlist_count(&mut self) -> usize {
508 let prev = self.waitlisted_count;
509 self.waitlisted_count += 1;
510 prev
511 }
512
513 fn set_prefix_cache_len(&mut self, len: usize) {
514 self.prefix_cache_len = len;
515 }
516
517 fn block_size(&self) -> usize {
518 match &self.custom_metadata {
519 SequenceCustomMetadata::PagedAttention { block_size, .. } => *block_size,
520 SequenceCustomMetadata::None => unreachable!(),
521 }
522 }
523}
524
impl Sequence {
    /// Create a new sequence in the `Waiting` state.
    ///
    /// `block_size` being `Some` enables PagedAttention bookkeeping; the
    /// prompt tokens are immediately mirrored into the logical blocks.
    #[allow(clippy::too_many_arguments)]
    pub fn new_waiting(
        tokens: Vec<u32>,
        prompt: String,
        id: usize,
        timestamp: u128,
        layers: usize,
        responder: Sender<Response>,
        sampler: Sampler,
        stop_tokens: Vec<u32>,
        stop_strings: Vec<String>,
        max_len: Option<usize>,
        return_logprobs: bool,
        is_xlora: bool,
        group: Arc<Mutex<SequenceGroup>>,
        response_index: usize,
        creation_time: u64,
        recognizer: SequenceRecognizer,
        suffix: Option<String>,
        prefix: Option<String>,
        input_images: Option<Vec<image::DynamicImage>>,
        input_audios: Option<Vec<AudioInput>>,
        block_size: Option<usize>,
        tools: Option<Arc<ToolCallingMatcher>>,
        image_gen_response_format: Option<ImageGenerationResponseFormat>,
        sequence_stepping_type: SeqStepType,
        diffusion_params: Option<DiffusionGenerationParams>,
        seq_preallocated_cache: Option<(Tensor, Tensor)>,
        return_raw_logits: bool,
        eos_tokens: Vec<u32>,
    ) -> Self {
        let prompt_len = tokens.len();
        let mut custom_metadata = if let Some(block_size) = block_size {
            SequenceCustomMetadata::PagedAttention {
                logical_token_blocks: Vec::new(),
                physical_blocks_prefill: None,
                block_size,
            }
        } else {
            SequenceCustomMetadata::None
        };
        custom_metadata
            .append_tokens_to_blocks(tokens.iter().map(|x| *x as usize).collect::<Vec<_>>());
        Self {
            tokens,
            prompt,
            logprobs: Vec::new(),
            prompt_len,
            id,
            timestamp,
            state: RwLock::new(SequenceState::Waiting),
            normal_cache: vec![None; layers],
            normal_draft_cache: vec![None; layers],
            cache: vec![None; layers],
            draft_cache: vec![None; layers],
            xlora_cache: if is_xlora {
                Some(vec![None; layers])
            } else {
                None
            },
            mamba_state_idx: None,
            seq_preallocated_cache,
            responder,
            sampler: sampler.into(),
            stop_tokens,
            stop_strings,
            max_len,
            return_logprobs,
            prompt_tok_per_sec: 0.,
            prompt_timestamp: None,
            group,
            scaling_cache: None,
            response_index,
            creation_time,
            recognizer,
            prefill_prompt_toks: None,
            prefix_cache_len: 0,
            suffix,
            prefix,
            cumulative_logprob: 0.,
            completion_bytes: Vec::new(),
            stream_idx: 0,
            last_completion_bytes_len: 0,
            last_logprob: 0.0,
            last_is_done: None,
            is_tmp: false,
            scheduling_urgency: 0,
            multimodal: MultimodalData::new(
                input_images,
                input_audios,
                image_gen_response_format,
                diffusion_params,
            ),
            custom_metadata,
            tools,
            sequence_stepping_type,
            return_raw_logits,
            token_offset: 0,
            eos_tokens,
            total_prompt_time: None,
            waitlisted_count: 0,
            harmony_context: None,
            think_tag_context: None,
        }
    }

    /// Consume and return self with the scheduling urgency bumped by one.
    pub fn add_urgency(mut self) -> Self {
        self.scheduling_urgency += 1;
        self
    }

    /// Consume and return self with the scheduling urgency reset to zero.
    pub fn reset_urgency(mut self) -> Self {
        self.scheduling_urgency = 0;
        self
    }

    /// Scheduling priority: urgency plus log2 of the current length, so long
    /// sequences get a mild boost and repeated waitlisting dominates.
    pub fn compute_priority(&self) -> f64 {
        #![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
        (self.scheduling_urgency as f64) + (self.len() as f64).log2()
    }

    /// Install a prefix-cache hit (non-paged backends): adopt the cached KV
    /// cache, keep only the uncached tokens for prefill, and record the
    /// position offset of those tokens.
    pub fn prefill_v2_normal(
        mut self,
        cache: Vec<Option<KvCache>>,
        toks: Vec<u32>,
        offset: usize,
    ) -> Self {
        self.normal_cache = cache;
        self.prefill_prompt_toks = Some(toks);
        self.set_state(SequenceState::RunningPrefillPrompt);
        self.token_offset = offset;
        self
    }

    /// Install a prefix-cache hit (PagedAttention): adopt the cached logical
    /// and physical blocks and keep only the uncached tokens for prefill.
    pub fn prefill_v2_paged(
        mut self,
        logical_blocks: Vec<LogicalTokenBlock>,
        physical_blocks: Vec<BlockRef>,
        toks: Vec<u32>,
        offset: usize,
    ) -> Self {
        self.prefill_prompt_toks = Some(toks);
        self.set_state(SequenceState::RunningPrefillPrompt);
        self.token_offset = offset;

        if let SequenceCustomMetadata::PagedAttention {
            logical_token_blocks,
            physical_blocks_prefill,
            block_size: _,
        } = &mut self.custom_metadata
        {
            *logical_token_blocks = logical_blocks;
            *physical_blocks_prefill = Some(physical_blocks);
        }

        self
    }

    /// Effective sequence length.
    ///
    /// Priority: pending prefill tokens, then the raw token count while tmp
    /// tokens are pushed, then the KV-cache length (+1 for the token being
    /// generated), falling back to the raw token count.
    /// NOTE(review): assumes cache tensors carry seq-len at dims()[2] —
    /// confirm against the pipeline's cache layout.
    pub fn len(&self) -> usize {
        if let Some(toks) = &self.prefill_prompt_toks {
            return toks.len();
        }
        if self.is_tmp {
            return self.tokens.len();
        }
        if self.xlora_cache.as_ref().is_some_and(|c| c[0].is_some()) {
            self.xlora_cache.as_ref().unwrap()[0]
                .as_ref()
                .unwrap()
                .0
                .dims()[2]
                + 1
        } else if let Some((_, x)) = &self.cache[0] {
            x.dims()[2] + 1
        } else {
            self.tokens.len()
        }
    }

    /// This sequence's unique id.
    pub fn id(&self) -> &usize {
        &self.id
    }

    /// True while actively running prompt or completion (not prefix prefill).
    pub fn is_running(&self) -> bool {
        matches!(
            *self.state.read().unwrap(),
            SequenceState::RunningCompletion | SequenceState::RunningPrompt
        )
    }

    /// True while in the decode (completion) phase.
    pub fn is_completion(&self) -> bool {
        matches!(
            *self.state.read().unwrap(),
            SequenceState::RunningCompletion
        )
    }

    /// True while in any prompt-processing phase (full or prefix prefill).
    pub fn is_prompt(&self) -> bool {
        matches!(
            *self.state.read().unwrap(),
            SequenceState::RunningPrompt | SequenceState::RunningPrefillPrompt
        )
    }

    /// True while queued and not yet scheduled.
    pub fn is_waiting(&self) -> bool {
        matches!(*self.state.read().unwrap(), SequenceState::Waiting)
    }

    /// True once the sequence is finished from the block engine's viewpoint
    /// (aborted, ignored, or done) and its blocks can be freed.
    pub fn is_finished_paged_attn(&self) -> bool {
        matches!(
            *self.state.read().unwrap(),
            SequenceState::FinishedAborted
                | SequenceState::FinishedIgnored
                | SequenceState::Done(_)
        )
    }

    /// Tokens to feed the model next: the pending prefill subset if present,
    /// otherwise the full token history.
    pub fn get_toks(&self) -> &[u32] {
        if let Some(toks) = &self.prefill_prompt_toks {
            return toks;
        }
        &self.tokens
    }

    /// The prompt text as currently stored.
    pub fn get_initial_prompt(&self) -> &str {
        &self.prompt
    }

    /// Replace the stored prompt text (e.g. after template rewriting).
    pub fn set_initial_prompt(&mut self, new: String) {
        self.prompt = new;
    }

    /// Position offset applied after prefix-cache prefill.
    pub fn token_offset(&self) -> usize {
        self.token_offset
    }

    /// Number of tokens satisfied from the prefix cache.
    pub fn prefix_cache_len(&self) -> usize {
        self.prefix_cache_len
    }

    /// Record how many tokens the prefix cache satisfied.
    pub fn set_prefix_cache_len(&mut self, len: usize) {
        self.prefix_cache_len = len;
    }

    /// Replace the token history and rebuild block bookkeeping, then (for
    /// PagedAttention) free and re-allocate the physical blocks to match.
    pub(crate) fn set_toks_and_reallocate(
        &mut self,
        toks: Vec<u32>,
        paged_attn_metadata: Option<&mut PagedAttentionMeta>,
    ) {
        self.tokens.clone_from(&toks);
        self.prompt_len = self.tokens.len();
        // Rebuild the logical blocks from scratch for the new token set.
        match &mut self.custom_metadata {
            SequenceCustomMetadata::PagedAttention {
                logical_token_blocks,
                physical_blocks_prefill: _,
                block_size: _,
            } => {
                logical_token_blocks.clear();
            }
            SequenceCustomMetadata::None => (),
        }
        self.custom_metadata
            .append_tokens_to_blocks(toks.iter().map(|x| *x as usize).collect::<Vec<_>>());

        if let Some(metadata) = paged_attn_metadata {
            // Free the old physical blocks, then allocate for the new layout.
            get_mut_arcmutex!(metadata.block_engine).free_sequence(*self.id());
            get_mut_arcmutex!(metadata.block_engine).allocate(self);
        }
    }

    /// Raw UTF-8 bytes of the completion generated so far.
    pub fn completion_bytes(&self) -> &[u8] {
        &self.completion_bytes
    }

    /// Preallocated (k, v) cache buffers, if the backend provided them.
    pub fn preallocated_cache(&self) -> Option<&(Tensor, Tensor)> {
        self.seq_preallocated_cache.as_ref()
    }

    /// Mutable per-layer KV cache (non-paged pipelines).
    pub fn normal_cache(&mut self) -> &mut Vec<Option<KvCache>> {
        &mut self.normal_cache
    }

    /// Mutable per-layer KV cache for the speculative draft model.
    pub fn normal_draft_cache(&mut self) -> &mut Vec<Option<KvCache>> {
        &mut self.normal_draft_cache
    }

    /// Mutable legacy per-layer (k, v) tensor cache.
    pub fn cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
        &mut self.cache
    }

    /// Mutable legacy draft-model (k, v) tensor cache.
    pub fn draft_cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
        &mut self.draft_cache
    }

    /// Mutable X-LoRA cache; panics if this sequence is not X-LoRA.
    pub fn xlora_cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
        self.xlora_cache.as_mut().expect("No X-LoRA cache.")
    }

    /// Mutable X-LoRA scaling cache slot.
    pub fn scaling_cache(&mut self) -> &mut Option<Tensor> {
        &mut self.scaling_cache
    }

    /// Index of this sequence's Mamba SSM state, if assigned.
    pub fn mamba_state_idx(&self) -> Option<usize> {
        self.mamba_state_idx
    }

    /// Assign (or clear) the Mamba SSM state index.
    pub fn set_mamba_state_idx(&mut self, idx: Option<usize>) {
        self.mamba_state_idx = idx;
    }

    /// True when this sequence carries an X-LoRA cache.
    pub fn is_xlora(&self) -> bool {
        self.xlora_cache.is_some()
    }

    /// Shared handle to this sequence's sampler.
    pub fn sampler(&mut self) -> Arc<Sampler> {
        self.sampler.clone()
    }

    /// Set the pending prefix-prefill token subset.
    pub fn set_prefill_toks(&mut self, toks: Vec<u32>) {
        self.prefill_prompt_toks = Some(toks)
    }

    /// Clear the pending prefix-prefill token subset.
    pub fn reset_prefill_toks(&mut self) {
        self.prefill_prompt_toks = None
    }

    /// Push a speculative (temporary) token; marks the sequence tmp so `len`
    /// reports the raw token count. Paired with `remove_tmp_tok`.
    pub(crate) fn add_tmp_tok(&mut self, tok: u32) {
        self.is_tmp = true;
        self.tokens.push(tok);
        self.custom_metadata.append_token_to_blocks(tok as usize);
    }

    /// Remove the last `n` speculative tokens and clear the tmp flag.
    pub(crate) fn remove_tmp_tok(&mut self, n: usize) {
        self.is_tmp = false;
        self.tokens.truncate(self.tokens.len() - n);
        self.custom_metadata.remove_tokens_from_blocks(n);
    }

    /// Commit a sampled token: accumulate its decoded bytes (unless the stop
    /// was triggered by the token itself), update logprob stats, block
    /// bookkeeping, and any harmony/think-tag parsers.
    pub fn add_token(
        &mut self,
        tok: Logprobs,
        completion_bytes: Vec<u8>,
        is_done: &Option<StopReason>,
    ) {
        let stopped_by_token = matches!(
            is_done,
            Some(StopReason::Eos) | Some(StopReason::StopTok(_))
        );
        if !stopped_by_token {
            // Stop tokens themselves are not surfaced in the completion text.
            self.completion_bytes.extend_from_slice(&completion_bytes);
            self.last_completion_bytes_len = completion_bytes.len();
        }
        self.last_logprob = tok.logprob;
        self.last_is_done = *is_done;

        self.custom_metadata
            .append_token_to_blocks(tok.token as usize);

        // Harmony parsing sees every token, including stop tokens.
        if let Some(ref mut harmony_ctx) = self.harmony_context {
            let _ = harmony_ctx.process_token(tok.token);
        }

        if let Some(ref mut think_ctx) = self.think_tag_context {
            if !stopped_by_token {
                think_ctx.process_bytes(&completion_bytes);
            }
        }

        self.cumulative_logprob += tok.logprob;
        self.tokens.push(tok.token);
        self.logprobs.push(tok);
        self.reset_prefill_toks();
    }

    /// Clone of the response channel.
    pub fn responder(&self) -> Sender<Response> {
        self.responder.clone()
    }

    /// Creation timestamp (seconds, per the constructor's caller).
    pub fn creation_time(&self) -> u64 {
        self.creation_time
    }

    /// Transition the sequence state. Entering `Error` decrements the
    /// group's choice count so the response isn't held up waiting for it.
    pub fn set_state(&self, state: SequenceState) {
        if matches!(state, SequenceState::Error) {
            let mut group = get_mut_group!(self);
            group.n_choices = group.n_choices.saturating_sub(1);
        }
        *self.state.write().unwrap() = state;
    }

    /// Snapshot of the current state.
    pub fn getstate(&self) -> SequenceState {
        *self.state.read().unwrap()
    }

    /// Decide whether the just-sampled token `tok` finishes the sequence.
    ///
    /// Checks, in order: EOS set membership, external cancellation, stop
    /// tokens, the request `max_len`, the model context limit, and finally a
    /// stop-string scan over the accumulated completion bytes.
    pub fn is_done(
        &self,
        tok: u32,
        eos_tok: Option<&[u32]>,
        max_model_len: usize,
    ) -> Option<StopReason> {
        let is_eos = match eos_tok {
            Some(eos_tok) => eos_tok.contains(&tok),
            None => false,
        };
        if is_eos {
            Some(StopReason::Eos)
        } else if matches!(
            &*self.state.read().unwrap(),
            SequenceState::Done(StopReason::Canceled)
        ) {
            Some(StopReason::Canceled)
        } else if self.stop_tokens.contains(&tok) {
            Some(StopReason::StopTok(tok))
        } else if self.max_len.is_some()
            && self.tokens.len().saturating_sub(self.prompt_len) + 1 >= self.max_len.unwrap()
        {
            // +1 accounts for `tok`, which has not been pushed yet.
            Some(StopReason::Length(self.max_len.unwrap()))
        } else if self.tokens.len().saturating_sub(self.prompt_len) >= max_model_len {
            Some(StopReason::ModelLength(max_model_len))
        } else {
            if !self.stop_strings.is_empty() {
                for (idx, s) in self.stop_strings.iter().enumerate() {
                    if let Some(pos) = galil_seiferas::gs_find(&self.completion_bytes, s.as_bytes())
                    {
                        return Some(StopReason::StopString {
                            stop_string_idx: idx,
                            completion_bytes_pos: pos,
                        });
                    }
                }
            }
            None
        }
    }

    /// Per-token logprob records accumulated so far.
    pub fn logprobs(&self) -> &[Logprobs] {
        &self.logprobs
    }

    /// Whether the requester asked for logprobs.
    pub fn return_logprobs(&self) -> bool {
        self.return_logprobs
    }

    /// Number of prompt tokens.
    pub fn prompt_tokens(&self) -> usize {
        self.prompt_len
    }

    /// The request's stop strings.
    pub fn stop_strings(&self) -> &[String] {
        &self.stop_strings
    }

    /// Next streaming text delta, advancing the stream cursor on success.
    pub fn get_delta(
        &mut self,
    ) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
        let new_decoded = self.peek_delta();
        if matches!(new_decoded, Ok(Some(_))) {
            // Only advance the cursor when a complete delta was produced.
            self.stream_idx = self.completion_bytes.len();
        }
        new_decoded
    }

    /// Next streaming text delta without advancing the cursor.
    ///
    /// Returns `Ok(None)` when the pending bytes end mid-UTF-8-codepoint
    /// (lossy decode yields a trailing replacement char), so the delta is
    /// held back until the codepoint completes. The very first delta is
    /// trimmed of leading whitespace.
    pub fn peek_delta(&self) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
        let is_first = self.stream_idx == 0;
        let new_decoded = String::from_utf8_lossy(&self.completion_bytes[self.stream_idx..]);
        if new_decoded.ends_with('�') {
            return Ok(None);
        }

        if is_first {
            return Ok(Some(new_decoded.trim_start().to_string()));
        }
        Ok(Some(new_decoded.to_string()))
    }

    /// Request arrival timestamp (millis since epoch).
    pub fn timestamp(&self) -> u128 {
        self.timestamp
    }

    /// When prompt processing started, if it has.
    pub fn prompt_timestamp(&self) -> Option<u128> {
        self.prompt_timestamp
    }

    /// Push this sequence's timing/token stats into its group.
    /// NOTE(review): unwraps `total_prompt_time` whenever `prompt_timestamp`
    /// is set — assumes the two are always set together; confirm.
    fn update_time_info(&self) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time travel has occurred!")
            .as_millis();

        if let Some(ts) = self.prompt_timestamp {
            get_mut_group!(self).total_completion_time = now - ts;
            get_mut_group!(self).total_prompt_time = self.total_prompt_time.unwrap();
        }

        get_mut_group!(self).total_time = now - self.timestamp;

        get_mut_group!(self).total_prompt_toks = self.prompt_len;
        get_mut_group!(self).total_toks = self.len();
    }

    /// Record a finished image-generation choice on the group.
    pub fn add_image_choice_to_group(&self, choice: ImageChoice) {
        get_mut_group!(self).image_choices.push(choice);
    }

    /// Record generated speech PCM (samples, rate, channels) on the group.
    pub fn add_speech_pcm_to_group(&self, pcm: Arc<Vec<f32>>, rate: usize, channels: usize) {
        get_mut_group!(self).speech_pcms.push((pcm, rate, channels));
    }

    /// Record a finished chat choice and refresh group timing stats.
    pub fn add_choice_to_group(&self, choice: Choice) {
        get_mut_group!(self).choices.push(choice);
        self.update_time_info();
    }

    /// Record raw logits plus the token history and refresh timing stats.
    pub fn add_raw_choice_to_group(&self, logit_chunks: Vec<Tensor>) {
        get_mut_group!(self)
            .raw_choices
            .push((logit_chunks, self.tokens.clone()));
        self.update_time_info();
    }

    /// Record an embedding result and refresh timing stats.
    pub fn add_embedding_choice_to_group(&self, embedding: Vec<f32>) {
        get_mut_group!(self).embedding_choices.push(embedding);
        self.update_time_info();
    }

    /// Record a finished completion choice (wrapping the text in the
    /// request's prefix/suffix) keyed by cumulative logprob for best-of.
    pub fn add_completion_choice_to_group(&self, mut choice: CompletionChoice) {
        choice.text = format!(
            "{}{}{}",
            self.prefix.as_deref().unwrap_or(""),
            choice.text,
            self.suffix.as_deref().unwrap_or("")
        );
        get_mut_group!(self)
            .completion_choices
            .push((self.cumulative_logprob, choice));
        self.update_time_info();
    }

    /// Index of this choice within the response.
    pub fn get_response_index(&self) -> usize {
        self.response_index
    }

    /// Lock and borrow this sequence's group.
    pub fn get_mut_group(&self) -> MutexGuard<'_, SequenceGroup> {
        get_mut_group!(self)
    }

    /// Record a streaming chat chunk and refresh timing stats.
    pub fn add_streaming_chunk_choice_to_group(&self, chunk: ChunkChoice) {
        get_mut_group!(self).chat_streaming_chunks.push(chunk);
        self.update_time_info();
    }

    /// Record a streaming completion chunk and refresh timing stats.
    pub fn add_streaming_completion_chunk_choice_to_group(&self, chunk: CompletionChunkChoice) {
        get_mut_group!(self).completion_streaming_chunks.push(chunk);
        self.update_time_info();
    }

    // --- Thin delegations to `self.multimodal` ---

    /// See [`MultimodalData::take_images`].
    pub fn take_images(&mut self) -> Option<Vec<image::DynamicImage>> {
        self.multimodal.take_images()
    }

    /// See [`MultimodalData::clone_images`].
    pub fn clone_images(&self) -> Option<Vec<image::DynamicImage>> {
        self.multimodal.clone_images()
    }

    /// See [`MultimodalData::images`].
    pub fn images(&self) -> Option<&[image::DynamicImage]> {
        self.multimodal.images()
    }

    /// See [`MultimodalData::image_hashes`].
    pub fn image_hashes(&self) -> Option<&[u64]> {
        self.multimodal.image_hashes()
    }

    /// See [`MultimodalData::has_images`].
    pub fn has_images(&self) -> bool {
        self.multimodal.has_images()
    }

    /// See [`MultimodalData::take_audios`].
    pub fn take_audios(&mut self) -> Option<Vec<AudioInput>> {
        self.multimodal.take_audios()
    }

    /// See [`MultimodalData::clone_audios`].
    pub fn clone_audios(&self) -> Option<Vec<AudioInput>> {
        self.multimodal.clone_audios()
    }

    /// See [`MultimodalData::audios`].
    pub fn audios(&self) -> Option<&[AudioInput]> {
        self.multimodal.audios()
    }

    /// See [`MultimodalData::audio_hashes`].
    pub fn audio_hashes(&self) -> Option<&[u64]> {
        self.multimodal.audio_hashes()
    }

    /// See [`MultimodalData::has_audios`].
    pub fn has_audios(&self) -> bool {
        self.multimodal.has_audios()
    }

    /// See [`MultimodalData::keep_num_audios`].
    pub fn keep_num_audios(&mut self, audios_to_keep: usize) {
        self.multimodal.keep_num_audios(audios_to_keep)
    }

    /// See [`MultimodalData::keep_num_images`].
    pub fn keep_num_images(&mut self, images_to_keep: usize) {
        self.multimodal.keep_num_images(images_to_keep)
    }

    /// See [`MultimodalData::image_gen_response_format`].
    pub fn image_gen_response_format(&self) -> Option<ImageGenerationResponseFormat> {
        self.multimodal.image_gen_response_format()
    }

    /// How this sequence is stepped (prompt+decode vs one-shot).
    pub fn sequence_stepping_type(&self) -> &SeqStepType {
        &self.sequence_stepping_type
    }

    /// Diffusion generation parameters, if any.
    pub fn get_diffusion_diffusion_params(&self) -> Option<DiffusionGenerationParams> {
        self.multimodal.diffusion_params()
    }

    /// EOS tokens this sequence stops on.
    pub fn eos_tokens(&self) -> &[u32] {
        &self.eos_tokens
    }

    // --- Harmony (channelized output) parsing ---

    /// Enable harmony-format parsing (idempotent).
    pub fn enable_harmony_mode(&mut self) -> Result<(), anyhow::Error> {
        if self.harmony_context.is_none() {
            self.harmony_context = Some(HarmonyContext::new()?);
        }
        Ok(())
    }

    /// Whether harmony parsing is enabled.
    pub fn is_harmony_mode(&self) -> bool {
        self.harmony_context.is_some()
    }

    /// Feed a token to the harmony parser, returning its delta (if enabled).
    pub fn process_harmony_token(&mut self, token_id: u32) -> Option<crate::harmony::HarmonyDelta> {
        self.harmony_context
            .as_mut()
            .map(|ctx| ctx.process_token(token_id))
    }

    /// Pending harmony reasoning-channel delta, if any.
    pub fn get_harmony_reasoning_delta(&mut self) -> Option<String> {
        self.harmony_context
            .as_mut()
            .and_then(|ctx| ctx.get_reasoning_delta())
    }

    /// Pending harmony final-channel delta, if any.
    pub fn get_harmony_final_delta(&mut self) -> Option<String> {
        self.harmony_context
            .as_mut()
            .and_then(|ctx| ctx.get_final_delta())
    }

    /// Full harmony reasoning content accumulated so far, if any.
    pub fn get_harmony_reasoning_content(&self) -> Option<String> {
        self.harmony_context
            .as_ref()
            .and_then(|ctx| ctx.reasoning_content())
    }

    /// Full harmony final content accumulated so far, if any.
    pub fn get_harmony_final_content(&self) -> Option<String> {
        self.harmony_context
            .as_ref()
            .and_then(|ctx| ctx.final_content())
    }

    /// Notify the harmony parser that EOS was reached.
    pub fn harmony_process_eos(&mut self) {
        if let Some(ref mut ctx) = self.harmony_context {
            ctx.process_eos();
        }
    }

    /// Whether the harmony parser observed a tool call.
    pub fn has_harmony_tool_calls(&self) -> bool {
        self.harmony_context
            .as_ref()
            .is_some_and(|ctx| ctx.has_tool_call())
    }

    /// Finalize and return harmony tool calls (empty if harmony disabled).
    pub fn get_harmony_tool_calls(&mut self) -> Vec<crate::harmony::HarmonyToolCall> {
        self.harmony_context
            .as_mut()
            .map(|ctx| ctx.finalize_tool_calls())
            .unwrap_or_default()
    }

    // --- <think>-tag reasoning/content splitting ---

    /// Enable think-tag splitting (idempotent). If the prompt already ends
    /// with an open `<think>` tag, the parser starts inside a think block.
    pub fn enable_think_tag_mode(&mut self) {
        if self.think_tag_context.is_none() {
            let starts_in_think_block = self.prompt.trim_end().ends_with("<think>");
            self.think_tag_context = Some(if starts_in_think_block {
                ThinkTagContext::new_in_think_block()
            } else {
                ThinkTagContext::new()
            });
        }
    }

    /// Whether think-tag splitting is enabled.
    pub fn is_think_tag_mode(&self) -> bool {
        self.think_tag_context.is_some()
    }

    /// Feed decoded text to the think-tag parser (no-op if disabled).
    pub fn process_think_tag_text(&mut self, text: &str) {
        if let Some(ref mut ctx) = self.think_tag_context {
            ctx.process_text(text);
        }
    }

    /// Pending think-tag reasoning delta, if any.
    pub fn get_think_tag_reasoning_delta(&mut self) -> Option<String> {
        self.think_tag_context
            .as_mut()
            .and_then(|ctx| ctx.get_reasoning_delta())
    }

    /// Pending think-tag content delta, if any.
    pub fn get_think_tag_content_delta(&mut self) -> Option<String> {
        self.think_tag_context
            .as_mut()
            .and_then(|ctx| ctx.get_content_delta())
    }

    /// Full think-tag reasoning content accumulated so far, if any.
    pub fn get_think_tag_reasoning_content(&self) -> Option<String> {
        self.think_tag_context
            .as_ref()
            .and_then(|ctx| ctx.reasoning_content())
    }

    /// Full non-reasoning content accumulated so far, if any.
    pub fn get_think_tag_content(&self) -> Option<String> {
        self.think_tag_context
            .as_ref()
            .and_then(|ctx| ctx.content())
    }

    /// Flush the think-tag parser at end of generation.
    pub fn think_tag_finalize(&mut self) {
        if let Some(ref mut ctx) = self.think_tag_context {
            ctx.finalize();
        }
    }
}
1331
/// Aggregates the per-choice results and timing statistics for a group of
/// sequences generated from one request (`n` choices, optional `best_of`
/// ranking).
pub struct SequenceGroup {
    // Number of parallel choices requested; aggregate responses are only
    // emitted once this many choices have been collected.
    n_choices: usize,
    // If set, completion choices are ranked by score and truncated to this
    // many results in `get_completion_choices`.
    best_of: Option<usize>,
    pub total_prompt_toks: usize,
    pub total_toks: usize,
    // Durations are in milliseconds: `get_usage` divides by 1000 to produce
    // the `*_sec` fields.
    pub total_prompt_time: u128,
    pub total_time: u128,
    pub total_completion_time: u128,
    choices: Vec<Choice>,
    image_choices: Vec<ImageChoice>,
    // (PCM samples, sample rate, channel count) per generated speech output.
    speech_pcms: Vec<(Arc<Vec<f32>>, usize, usize)>,
    // (per-step logits chunks, generated tokens) for raw-logits responses.
    raw_choices: Vec<(Vec<Tensor>, Vec<u32>)>,
    embedding_choices: Vec<Vec<f32>>,
    // (score, choice) pairs; the f32 score drives `best_of` ranking.
    completion_choices: Vec<(f32, CompletionChoice)>,
    pub chat_streaming_chunks: Vec<ChunkChoice>,
    pub completion_streaming_chunks: Vec<CompletionChunkChoice>,
    pub is_streaming: bool,
    pub is_chat: bool,
}
1351
1352impl SequenceGroup {
1353 pub fn new(
1354 n_choices: usize,
1355 is_streaming: bool,
1356 is_chat: bool,
1357 best_of: Option<usize>,
1358 ) -> Self {
1359 Self {
1360 choices: Vec::new(),
1361 image_choices: Vec::new(),
1362 speech_pcms: Vec::new(),
1363 raw_choices: Vec::new(),
1364 embedding_choices: Vec::new(),
1365 completion_choices: Vec::new(),
1366 n_choices,
1367 total_prompt_toks: 0,
1368 total_toks: 0,
1369 total_prompt_time: 0,
1370 total_time: 0,
1371 total_completion_time: 0,
1372 chat_streaming_chunks: Vec::new(),
1373 completion_streaming_chunks: Vec::new(),
1374 is_streaming,
1375 is_chat,
1376 best_of,
1377 }
1378 }
1379
1380 pub fn get_choices(&self) -> &[Choice] {
1381 &self.choices
1382 }
1383
1384 pub fn get_completion_choices(&self) -> Vec<CompletionChoice> {
1386 if let Some(best_of) = self.best_of {
1387 let mut choices = self.completion_choices.clone();
1388 choices.sort_by(|a, b| b.0.partial_cmp(&a.0).expect("No ordering."));
1390 choices
1391 .into_iter()
1392 .take(best_of)
1393 .map(|(_, x)| x)
1394 .collect::<Vec<_>>()
1395 } else {
1396 self.completion_choices
1397 .clone()
1398 .into_iter()
1399 .map(|(_, x)| x)
1400 .collect::<Vec<_>>()
1401 }
1402 }
1403
1404 pub fn get_image_choices(&self) -> &[ImageChoice] {
1405 &self.image_choices
1406 }
1407
1408 pub fn get_usage(&self) -> Usage {
1409 #[allow(clippy::cast_precision_loss)]
1410 Usage {
1411 completion_tokens: self.total_toks.saturating_sub(self.total_prompt_toks),
1412 prompt_tokens: self.total_prompt_toks,
1413 total_tokens: self.total_toks,
1414 avg_tok_per_sec: (self.total_toks as f32 / self.total_time as f32) * 1000.,
1415 avg_prompt_tok_per_sec: (self.total_prompt_toks as f32 / self.total_prompt_time as f32)
1416 * 1000.,
1417 avg_compl_tok_per_sec: (self.total_toks.saturating_sub(self.total_prompt_toks) as f32
1418 / self.total_completion_time as f32)
1419 * 1000.,
1420 total_time_sec: self.total_time as f32 / 1000.,
1421 total_completion_time_sec: self.total_completion_time as f32 / 1000.,
1422 total_prompt_time_sec: self.total_prompt_time as f32 / 1000.,
1423 }
1424 }
1425
1426 pub async fn maybe_send_chat_done_response(
1427 &self,
1428 response: ChatCompletionResponse,
1429 sender: Sender<Response>,
1430 ) -> Result<(), SendError<Response>> {
1431 if self.choices.len() == self.n_choices {
1432 sender.send(Response::Done(response)).await?;
1433 }
1434
1435 Ok(())
1436 }
1437
1438 pub async fn maybe_send_raw_done_response(
1439 &self,
1440 sender: Sender<Response>,
1441 ) -> Result<(), SendError<Response>> {
1442 if self.raw_choices.len() == self.n_choices {
1443 assert_eq!(self.raw_choices.len(), 1);
1444 let (logits_chunks, tokens) = self.raw_choices[0].clone();
1445 sender
1446 .send(Response::Raw {
1447 logits_chunks,
1448 tokens,
1449 })
1450 .await?;
1451 }
1452
1453 Ok(())
1454 }
1455
1456 pub async fn maybe_send_embedding_done_response(
1457 &self,
1458 sender: Sender<Response>,
1459 ) -> Result<(), SendError<Response>> {
1460 if self.embedding_choices.len() == self.n_choices {
1461 assert_eq!(self.embedding_choices.len(), 1);
1462 let embeddings = self.embedding_choices[0].clone();
1463 let prompt_tokens = self.total_prompt_toks;
1464 let total_tokens = self.total_toks;
1465 sender
1466 .send(Response::Embeddings {
1467 embeddings,
1468 prompt_tokens,
1469 total_tokens,
1470 })
1471 .await?;
1472 }
1473
1474 Ok(())
1475 }
1476
1477 pub async fn maybe_send_image_gen_response(
1478 &self,
1479 response: ImageGenerationResponse,
1480 sender: Sender<Response>,
1481 ) -> Result<(), SendError<Response>> {
1482 if self.image_choices.len() == self.n_choices {
1483 sender.send(Response::ImageGeneration(response)).await?;
1484 }
1485
1486 Ok(())
1487 }
1488
1489 pub async fn maybe_send_speech_response(
1490 &self,
1491 sender: Sender<Response>,
1492 ) -> Result<(), SendError<Response>> {
1493 assert_eq!(self.speech_pcms.len(), 1);
1494
1495 let (pcm, rate, channels) = self.speech_pcms[0].clone();
1496 sender
1497 .send(Response::Speech {
1498 pcm,
1499 rate,
1500 channels,
1501 })
1502 .await?;
1503
1504 Ok(())
1505 }
1506
1507 pub async fn maybe_send_streaming_response(
1508 &mut self,
1509 seq: &Sequence,
1510 model: String,
1511 usage_opt: Option<Usage>,
1512 ) -> Result<(), Box<SendError<Response>>> {
1513 if self.chat_streaming_chunks.len() == self.n_choices && self.is_streaming {
1514 let mut swap_streaming_chunks = vec![];
1515
1516 std::mem::swap(&mut swap_streaming_chunks, &mut self.chat_streaming_chunks);
1517
1518 seq.responder()
1519 .send(Response::Chunk(ChatCompletionChunkResponse {
1520 id: seq.id.to_string(),
1521 choices: swap_streaming_chunks,
1522 created: seq.timestamp,
1523 model: model.clone(),
1524 system_fingerprint: SYSTEM_FINGERPRINT.to_string(),
1525 object: "chat.completion.chunk".to_string(),
1526 usage: usage_opt,
1527 }))
1528 .await?;
1529 } else if self.completion_streaming_chunks.len() == self.n_choices && self.is_streaming {
1530 let mut swap_streaming_chunks = vec![];
1531
1532 std::mem::swap(
1533 &mut swap_streaming_chunks,
1534 &mut self.completion_streaming_chunks,
1535 );
1536
1537 seq.responder()
1538 .send(Response::CompletionChunk(CompletionChunkResponse {
1539 id: seq.id.to_string(),
1540 choices: swap_streaming_chunks,
1541 created: seq.timestamp,
1542 model: model.clone(),
1543 system_fingerprint: SYSTEM_FINGERPRINT.to_string(),
1544 object: "text_completion".to_string(),
1545 }))
1546 .await?;
1547 }
1548 Ok(())
1549 }
1550
1551 pub async fn maybe_send_completion_done_response(
1552 &self,
1553 response: CompletionResponse,
1554 sender: Sender<Response>,
1555 ) -> Result<(), Box<SendError<Response>>> {
1556 if self.completion_choices.len() == self.n_choices {
1557 sender.send(Response::CompletionDone(response)).await?;
1558 }
1559 Ok(())
1560 }
1561}