use crate::{
    get_mut_arcmutex, get_mut_group,
    harmony::HarmonyContext,
    paged_attention::PhysicalTokenBlock,
    pipeline::{text_models_inputs_processor::PagedAttentionMeta, LayerCaches},
    response::{ChatCompletionChunkResponse, Choice, ChunkChoice, Response, SYSTEM_FINGERPRINT},
    sampler::{Logprobs, Sampler},
    AudioInput, ChatCompletionResponse, Usage,
};
use crate::{
    paged_attention::{BlockEngineSequence, LogicalTokenBlock},
    pipeline::{DiffusionGenerationParams, KvCache},
    response::CompletionChoice,
    tools::ToolCallingMatcher,
    CompletionChunkChoice, CompletionChunkResponse, CompletionResponse, ImageChoice,
    ImageGenerationResponse, ImageGenerationResponseFormat,
};
use candle_core::Tensor;
use std::{
    fmt::Display,
    hash::{DefaultHasher, Hash, Hasher},
    sync::{Arc, RwLock},
    time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::{
    mpsc::{error::SendError, Sender},
    Mutex, MutexGuard,
};

#[derive(Clone, Copy, PartialEq, Debug)]
pub enum StopReason {
    Eos,
    StopTok(u32),
    Length(usize),
    ModelLength(usize),
    StopString {
        stop_string_idx: usize,
        completion_bytes_pos: usize,
    },
    Canceled,
    GeneratedImage,
    GeneratedSpeech,
    ToolCalls,
}

impl Display for StopReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StopReason::Eos => write!(f, "stop"),
            StopReason::Length(_) | StopReason::ModelLength(_) => write!(f, "length"),
            StopReason::StopTok(_) | StopReason::StopString { .. } => write!(f, "stop"),
            StopReason::Canceled => write!(f, "canceled"),
            StopReason::GeneratedImage => write!(f, "generated_image"),
            StopReason::GeneratedSpeech => write!(f, "generated_speech"),
            StopReason::ToolCalls => write!(f, "tool_calls"),
        }
    }
}

#[derive(Clone, Copy, PartialEq, Debug)]
pub enum SequenceState {
    Done(StopReason),
    RunningPrompt,
    RunningCompletion,
    Waiting,
    Error,
    RunningPrefillPrompt,
    FinishedAborted,
    FinishedIgnored,
    Swapped,
}

pub enum SequenceRecognizer {
    Llguidance(Box<llguidance::Matcher>),
    None,
}

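// Per-sequence scheduler metadata. With PagedAttention enabled this tracks the sequence's
// logical token blocks, the block size, and any physical blocks handed over at prefill time;
// otherwise no extra bookkeeping is kept.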
enum SequenceCustomMetadata {
    PagedAttention {
        logical_token_blocks: Vec<LogicalTokenBlock>,
        physical_blocks_prefill: Option<Vec<Arc<PhysicalTokenBlock>>>,
        block_size: usize,
    },
    None,
}

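// Evaluates to the number of additional blocks needed to append one more token: 1 when the
// last logical block is full or empty (a fresh block must be backed), otherwise 0.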
macro_rules! blocks_to_add_new_tok {
    ($logical_token_blocks:expr) => {{
        let last = $logical_token_blocks.last();
        if !last.is_some_and(|last| last.is_full() || last.is_empty()) {
            0
        } else {
            1
        }
    }};
}

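// Appends a token id to the last logical block, allocating an initial block when the vector
// is empty and pushing a fresh block once the last one becomes full.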
pub(crate) fn util_append_token_to_blocks(
    tok: usize,
    logical_token_blocks: &mut Vec<LogicalTokenBlock>,
    block_size: usize,
) {
    let last = logical_token_blocks.last_mut();
    match last {
        Some(last) => {
            last.append_token_id(tok);
        }
        None => {
            logical_token_blocks.push(LogicalTokenBlock::new(block_size));
            logical_token_blocks
                .last_mut()
                .expect("just pushed a block, vector cannot be empty")
                .append_token_id(tok);
        }
    }
    if logical_token_blocks
        .last()
        .expect("logical_token_blocks should not be empty after appending")
        .is_full()
    {
        logical_token_blocks.push(LogicalTokenBlock::new(block_size));
    }
}

impl SequenceCustomMetadata {
    fn append_token_to_blocks(&mut self, tok: usize) {
        match self {
            Self::PagedAttention {
                logical_token_blocks,
                physical_blocks_prefill: _,
                block_size,
            } => {
                util_append_token_to_blocks(tok, logical_token_blocks, *block_size);
            }
            Self::None => (),
        }
    }

    fn pop_token_from_blocks(&mut self) {
        match self {
            Self::PagedAttention {
                logical_token_blocks,
                physical_blocks_prefill: _,
                block_size: _,
            } => {
                let last = logical_token_blocks.last_mut().unwrap();
                last.pop_token();
            }
            Self::None => (),
        }
    }

    fn append_tokens_to_blocks(&mut self, toks: Vec<usize>) {
        for tok in toks {
            self.append_token_to_blocks(tok);
        }
    }

    fn remove_tokens_from_blocks(&mut self, n: usize) {
        for _ in 0..n {
            self.pop_token_from_blocks();
        }
    }
}

#[derive(Clone, Copy)]
pub enum SeqStepType {
    PromptAndDecode,
    OneShot,
}

pub struct SequenceImages {
    images: Vec<image::DynamicImage>,
    hashes: Vec<u64>,
}

#[derive(Clone)]
pub struct SequenceAudios {
    audios: Vec<AudioInput>,
    hashes: Vec<u64>,
}

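// Audio inputs are fingerprinted by hashing the raw samples (via their bit patterns) together
// with the sample rate, so identical clips produce identical hashes.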
impl SequenceAudios {
    fn new(input_audios: Vec<AudioInput>) -> Self {
        let hashes = input_audios.iter().map(|a| {
            let mut hasher = DefaultHasher::new();
            for s in &a.samples {
                s.to_bits().hash(&mut hasher);
            }
            a.sample_rate.hash(&mut hasher);
            hasher.finish()
        });
        Self {
            hashes: hashes.collect(),
            audios: input_audios,
        }
    }

    fn clone_audios(&self) -> Vec<AudioInput> {
        self.audios.clone()
    }

    fn audios(&self) -> &[AudioInput] {
        &self.audios
    }

    fn audios_mut(&mut self) -> &mut Vec<AudioInput> {
        &mut self.audios
    }

    fn hashes(&self) -> &[u64] {
        &self.hashes
    }

    fn keep_num_audios(&mut self, audios_to_keep: usize) {
        if self.audios.len() > audios_to_keep {
            let start = self.audios.len() - audios_to_keep;
            self.audios = self.audios[start..].to_vec();
        }
    }
}

impl SequenceImages {
    fn new(input_images: Vec<image::DynamicImage>) -> Self {
        let hashes = input_images.iter().map(|x| {
            let mut hasher = DefaultHasher::new();
            x.as_bytes().hash(&mut hasher);
            hasher.finish()
        });
        Self {
            hashes: hashes.collect(),
            images: input_images,
        }
    }

    fn clone_images(&self) -> Vec<image::DynamicImage> {
        self.images.clone()
    }

    fn images(&self) -> &[image::DynamicImage] {
        &self.images
    }

    fn images_mut(&mut self) -> &mut Vec<image::DynamicImage> {
        &mut self.images
    }

    fn hashes(&self) -> &[u64] {
        &self.hashes
    }

    fn keep_num_images(&mut self, images_to_keep: usize) {
        if self.images.len() > images_to_keep {
            let start = self.images.len() - images_to_keep;
            self.images = self.images[start..].to_vec();
        }
    }
}

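/// Multimodal state attached to a sequence: the original image/audio inputs (with their
/// hashes), cached vision tensors, and the image-generation/diffusion settings for this
/// request.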
pub struct MultimodalData {
    pub input_images: Option<SequenceImages>,
    pub input_audios: Option<SequenceAudios>,
    pub cached_pixel_values: Option<Tensor>,
    pub cached_img_thw: Option<Tensor>,
    pub cached_vid_thw: Option<Tensor>,
    pub has_changed_prompt: bool,
    pub image_gen_response_format: Option<ImageGenerationResponseFormat>,
    pub diffusion_params: Option<DiffusionGenerationParams>,
}

impl MultimodalData {
    pub fn new(
        input_images: Option<Vec<image::DynamicImage>>,
        input_audios: Option<Vec<AudioInput>>,
        image_gen_response_format: Option<ImageGenerationResponseFormat>,
        diffusion_params: Option<DiffusionGenerationParams>,
    ) -> Self {
        MultimodalData {
            input_images: input_images.map(SequenceImages::new),
            input_audios: input_audios.map(SequenceAudios::new),
            cached_pixel_values: None,
            cached_img_thw: None,
            cached_vid_thw: None,
            has_changed_prompt: false,
            image_gen_response_format,
            diffusion_params,
        }
    }

    pub fn take_images(&mut self) -> Option<Vec<image::DynamicImage>> {
        if self.has_changed_prompt {
            if let Some(input_images) = self.input_images.as_mut() {
                let mut images = Vec::new();
                std::mem::swap(&mut images, input_images.images_mut());
                Some(images)
            } else {
                None
            }
        } else {
            self.input_images.as_ref().map(|imgs| imgs.clone_images())
        }
    }

    pub fn clone_images(&self) -> Option<Vec<image::DynamicImage>> {
        self.input_images.as_ref().map(|imgs| imgs.clone_images())
    }

    pub fn images(&self) -> Option<&[image::DynamicImage]> {
        self.input_images.as_ref().map(|imgs| imgs.images())
    }

    pub fn image_hashes(&self) -> Option<&[u64]> {
        self.input_images.as_ref().map(|imgs| imgs.hashes())
    }

    pub fn has_images(&self) -> bool {
        self.input_images
            .as_ref()
            .is_some_and(|imgs| !imgs.images().is_empty())
    }

    pub fn take_audios(&mut self) -> Option<Vec<AudioInput>> {
        if self.has_changed_prompt {
            if let Some(input_audios) = self.input_audios.as_mut() {
                let mut audios = Vec::new();
                std::mem::swap(&mut audios, input_audios.audios_mut());
                Some(audios)
            } else {
                None
            }
        } else {
            self.input_audios.as_ref().map(|a| a.clone_audios())
        }
    }

    pub fn clone_audios(&self) -> Option<Vec<AudioInput>> {
        self.input_audios.as_ref().map(|a| a.clone_audios())
    }

    pub fn audios(&self) -> Option<&[AudioInput]> {
        self.input_audios.as_ref().map(|a| a.audios())
    }

    pub fn audio_hashes(&self) -> Option<&[u64]> {
        self.input_audios.as_ref().map(|a| a.hashes())
    }

    pub fn has_audios(&self) -> bool {
        self.input_audios
            .as_ref()
            .is_some_and(|a| !a.audios().is_empty())
    }

    pub fn keep_num_audios(&mut self, audios_to_keep: usize) {
        if let Some(auds) = self.input_audios.as_mut() {
            auds.keep_num_audios(audios_to_keep)
        }
    }

    pub fn keep_num_images(&mut self, images_to_keep: usize) {
        if let Some(imgs) = self.input_images.as_mut() {
            imgs.keep_num_images(images_to_keep)
        }
    }

    pub fn image_gen_response_format(&self) -> Option<ImageGenerationResponseFormat> {
        self.image_gen_response_format
    }

    pub fn diffusion_params(&self) -> Option<DiffusionGenerationParams> {
        self.diffusion_params.clone()
    }
}

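/// All per-request state for one in-flight generation: prompt and generated tokens, sampling
/// configuration, KV/draft/X-LoRA caches, multimodal data, scheduling counters, and a handle
/// to the [`SequenceGroup`] that collects this sequence's output.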
pub struct Sequence {
    id: usize,
    prompt_len: usize,
    max_len: Option<usize>,
    timestamp: u128,
    sampler: Arc<Sampler>,
    stop_tokens: Vec<u32>,
    stop_strings: Vec<String>,
    return_logprobs: bool,
    responder: Sender<Response>,
    response_index: usize,
    creation_time: u64,
    prompt: String,
    sequence_stepping_type: SeqStepType,
    pub(crate) return_raw_logits: bool,
    token_offset: usize,
    eos_tokens: Vec<u32>,

    pub multimodal: MultimodalData,

    suffix: Option<String>,
    prefix: Option<String>,

    is_tmp: bool,

    prefill_prompt_toks: Option<Vec<u32>>,
    prefix_cache_len: usize,

    normal_cache: Vec<Option<KvCache>>,
    normal_draft_cache: Vec<Option<KvCache>>,
    scaling_cache: Option<Tensor>,
    cache: LayerCaches,
    draft_cache: LayerCaches,
    xlora_cache: Option<LayerCaches>,
    mamba_state_idx: Option<usize>,

    seq_preallocated_cache: Option<(Tensor, Tensor)>,

    tokens: Vec<u32>,
    logprobs: Vec<Logprobs>,
    cumulative_logprob: f32,
    last_logprob: f32,
    last_completion_bytes_len: usize,
    last_is_done: Option<StopReason>,
    completion_bytes: Vec<u8>,
    stream_idx: usize,
    pub recognizer: SequenceRecognizer,
    scheduling_urgency: usize,
    waitlisted_count: usize,
    pub prompt_tok_per_sec: f32,
    pub prompt_timestamp: Option<u128>,
    pub total_prompt_time: Option<u128>,
    group: Arc<Mutex<SequenceGroup>>,
    state: RwLock<SequenceState>,

    custom_metadata: SequenceCustomMetadata,

    pub tools: Option<Arc<ToolCallingMatcher>>,

    harmony_context: Option<HarmonyContext>,
}

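// PagedAttention block accounting. These methods assume the sequence was constructed with a
// block size (i.e. the PagedAttention variant of `SequenceCustomMetadata`); most of them are
// unreachable otherwise.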
impl BlockEngineSequence for Sequence {
    fn blocks_to_add_new_tok(&self) -> usize {
        match &self.custom_metadata {
            SequenceCustomMetadata::PagedAttention {
                logical_token_blocks,
                physical_blocks_prefill: _,
                block_size: _,
            } => {
                blocks_to_add_new_tok!(logical_token_blocks)
            }
            SequenceCustomMetadata::None => unreachable!(),
        }
    }

    fn get_id(&self) -> usize {
        self.id
    }

    fn logical_token_blocks(&self) -> &[LogicalTokenBlock] {
        match &self.custom_metadata {
            SequenceCustomMetadata::PagedAttention {
                logical_token_blocks,
                physical_blocks_prefill: _,
                block_size: _,
            } => logical_token_blocks,
            SequenceCustomMetadata::None => unreachable!(),
        }
    }

    fn take_physical_blocks_prefill(&mut self) -> Option<Vec<Arc<PhysicalTokenBlock>>> {
        match &mut self.custom_metadata {
            SequenceCustomMetadata::PagedAttention {
                logical_token_blocks: _,
                physical_blocks_prefill,
                block_size: _,
            } => physical_blocks_prefill.take(),
            SequenceCustomMetadata::None => None,
        }
    }

    fn increment_waitlist_count(&mut self) -> usize {
        let prev = self.waitlisted_count;
        self.waitlisted_count += 1;
        prev
    }

    fn set_prefix_cache_len(&mut self, len: usize) {
        self.prefix_cache_len = len;
    }

    fn block_size(&self) -> usize {
        match &self.custom_metadata {
            SequenceCustomMetadata::PagedAttention { block_size, .. } => *block_size,
            SequenceCustomMetadata::None => unreachable!(),
        }
    }
}

impl Sequence {
    #[allow(clippy::too_many_arguments)]
    pub fn new_waiting(
        tokens: Vec<u32>,
        prompt: String,
        id: usize,
        timestamp: u128,
        layers: usize,
        responder: Sender<Response>,
        sampler: Sampler,
        stop_tokens: Vec<u32>,
        stop_strings: Vec<String>,
        max_len: Option<usize>,
        return_logprobs: bool,
        is_xlora: bool,
        group: Arc<Mutex<SequenceGroup>>,
        response_index: usize,
        creation_time: u64,
        recognizer: SequenceRecognizer,
        suffix: Option<String>,
        prefix: Option<String>,
        input_images: Option<Vec<image::DynamicImage>>,
        input_audios: Option<Vec<AudioInput>>,
        block_size: Option<usize>,
        tools: Option<Arc<ToolCallingMatcher>>,
        image_gen_response_format: Option<ImageGenerationResponseFormat>,
        sequence_stepping_type: SeqStepType,
        diffusion_params: Option<DiffusionGenerationParams>,
        seq_preallocated_cache: Option<(Tensor, Tensor)>,
        return_raw_logits: bool,
        eos_tokens: Vec<u32>,
    ) -> Self {
        let prompt_len = tokens.len();
        let mut custom_metadata = if let Some(block_size) = block_size {
            SequenceCustomMetadata::PagedAttention {
                logical_token_blocks: Vec::new(),
                physical_blocks_prefill: None,
                block_size,
            }
        } else {
            SequenceCustomMetadata::None
        };
        custom_metadata
            .append_tokens_to_blocks(tokens.iter().map(|x| *x as usize).collect::<Vec<_>>());
        Self {
            tokens,
            prompt,
            logprobs: Vec::new(),
            prompt_len,
            id,
            timestamp,
            state: RwLock::new(SequenceState::Waiting),
            normal_cache: vec![None; layers],
            normal_draft_cache: vec![None; layers],
            cache: vec![None; layers],
            draft_cache: vec![None; layers],
            xlora_cache: if is_xlora {
                Some(vec![None; layers])
            } else {
                None
            },
            mamba_state_idx: None,
            seq_preallocated_cache,
            responder,
            sampler: sampler.into(),
            stop_tokens,
            stop_strings,
            max_len,
            return_logprobs,
            prompt_tok_per_sec: 0.,
            prompt_timestamp: None,
            group,
            scaling_cache: None,
            response_index,
            creation_time,
            recognizer,
            prefill_prompt_toks: None,
            prefix_cache_len: 0,
            suffix,
            prefix,
            cumulative_logprob: 0.,
            completion_bytes: Vec::new(),
            stream_idx: 0,
            last_completion_bytes_len: 0,
            last_logprob: 0.0,
            last_is_done: None,
            is_tmp: false,
            scheduling_urgency: 0,
            multimodal: MultimodalData::new(
                input_images,
                input_audios,
                image_gen_response_format,
                diffusion_params,
            ),
            custom_metadata,
            tools,
            sequence_stepping_type,
            return_raw_logits,
            token_offset: 0,
            eos_tokens,
            total_prompt_time: None,
            waitlisted_count: 0,
            harmony_context: None,
        }
    }

    pub fn add_urgency(mut self) -> Self {
        self.scheduling_urgency += 1;
        self
    }

    pub fn reset_urgency(mut self) -> Self {
        self.scheduling_urgency = 0;
        self
    }

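    // Scheduling priority used by the engine: accumulated urgency plus log2 of the current
    // sequence length.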
    pub fn compute_priority(&self) -> f64 {
        #![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
        (self.scheduling_urgency as f64) + (self.len() as f64).log2()
    }

    pub fn prefill_v2_normal(
        mut self,
        cache: Vec<Option<KvCache>>,
        toks: Vec<u32>,
        offset: usize,
    ) -> Self {
        self.normal_cache = cache;
        self.prefill_prompt_toks = Some(toks);
        self.set_state(SequenceState::RunningPrefillPrompt);
        self.token_offset = offset;
        self
    }

    pub fn prefill_v2_paged(
        mut self,
        logical_blocks: Vec<LogicalTokenBlock>,
        physical_blocks: Vec<Arc<PhysicalTokenBlock>>,
        toks: Vec<u32>,
        offset: usize,
    ) -> Self {
        self.prefill_prompt_toks = Some(toks);
        self.set_state(SequenceState::RunningPrefillPrompt);
        self.token_offset = offset;

        if let SequenceCustomMetadata::PagedAttention {
            logical_token_blocks,
            physical_blocks_prefill,
            block_size: _,
        } = &mut self.custom_metadata
        {
            *logical_token_blocks = logical_blocks;
            *physical_blocks_prefill = Some(physical_blocks);
        }

        self
    }

    pub fn len(&self) -> usize {
        if let Some(toks) = &self.prefill_prompt_toks {
            return toks.len();
        }
        if self.is_tmp {
            return self.tokens.len();
        }
        if self.xlora_cache.as_ref().is_some_and(|c| c[0].is_some()) {
            self.xlora_cache.as_ref().unwrap()[0]
                .as_ref()
                .unwrap()
                .0
                .dims()[2]
                + 1
        } else if let Some((_, x)) = &self.cache[0] {
            x.dims()[2] + 1
        } else {
            self.tokens.len()
        }
    }

    pub fn id(&self) -> &usize {
        &self.id
    }

    pub fn is_running(&self) -> bool {
        matches!(
            *self.state.read().unwrap(),
            SequenceState::RunningCompletion | SequenceState::RunningPrompt
        )
    }

    pub fn is_completion(&self) -> bool {
        matches!(
            *self.state.read().unwrap(),
            SequenceState::RunningCompletion
        )
    }

    pub fn is_prompt(&self) -> bool {
        matches!(
            *self.state.read().unwrap(),
            SequenceState::RunningPrompt | SequenceState::RunningPrefillPrompt
        )
    }

    pub fn is_waiting(&self) -> bool {
        matches!(*self.state.read().unwrap(), SequenceState::Waiting)
    }

    pub fn is_finished_paged_attn(&self) -> bool {
        matches!(
            *self.state.read().unwrap(),
            SequenceState::FinishedAborted
                | SequenceState::FinishedIgnored
                | SequenceState::Done(_)
        )
    }

    pub fn get_toks(&self) -> &[u32] {
        if let Some(toks) = &self.prefill_prompt_toks {
            return toks;
        }
        &self.tokens
    }

    pub fn get_initial_prompt(&self) -> &str {
        &self.prompt
    }

    pub fn set_initial_prompt(&mut self, new: String) {
        self.prompt = new;
    }

    pub fn token_offset(&self) -> usize {
        self.token_offset
    }

    pub fn prefix_cache_len(&self) -> usize {
        self.prefix_cache_len
    }

    pub fn set_prefix_cache_len(&mut self, len: usize) {
        self.prefix_cache_len = len;
    }

    pub(crate) fn set_toks_and_reallocate(
        &mut self,
        toks: Vec<u32>,
        paged_attn_metadata: Option<&mut PagedAttentionMeta>,
    ) {
        self.tokens.clone_from(&toks);
        self.prompt_len = self.tokens.len();
        match &mut self.custom_metadata {
            SequenceCustomMetadata::PagedAttention {
                logical_token_blocks,
                physical_blocks_prefill: _,
                block_size: _,
            } => {
                logical_token_blocks.clear();
            }
            SequenceCustomMetadata::None => (),
        }
        self.custom_metadata
            .append_tokens_to_blocks(toks.iter().map(|x| *x as usize).collect::<Vec<_>>());

        if let Some(metadata) = paged_attn_metadata {
            get_mut_arcmutex!(metadata.block_engine).free_sequence(*self.id());
            get_mut_arcmutex!(metadata.block_engine).allocate(self);
        }
    }

    pub fn completion_bytes(&self) -> &[u8] {
        &self.completion_bytes
    }

    pub fn preallocated_cache(&self) -> Option<&(Tensor, Tensor)> {
        self.seq_preallocated_cache.as_ref()
    }

    pub fn normal_cache(&mut self) -> &mut Vec<Option<KvCache>> {
        &mut self.normal_cache
    }

    pub fn normal_draft_cache(&mut self) -> &mut Vec<Option<KvCache>> {
        &mut self.normal_draft_cache
    }

    pub fn cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
        &mut self.cache
    }

    pub fn draft_cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
        &mut self.draft_cache
    }

    pub fn xlora_cache(&mut self) -> &mut Vec<Option<(Tensor, Tensor)>> {
        self.xlora_cache.as_mut().expect("No X-LoRA cache.")
    }

    pub fn scaling_cache(&mut self) -> &mut Option<Tensor> {
        &mut self.scaling_cache
    }

    pub fn mamba_state_idx(&self) -> Option<usize> {
        self.mamba_state_idx
    }

    pub fn set_mamba_state_idx(&mut self, idx: Option<usize>) {
        self.mamba_state_idx = idx;
    }

    pub fn is_xlora(&self) -> bool {
        self.xlora_cache.is_some()
    }

    pub fn sampler(&mut self) -> Arc<Sampler> {
        self.sampler.clone()
    }

    pub fn set_prefill_toks(&mut self, toks: Vec<u32>) {
        self.prefill_prompt_toks = Some(toks)
    }

    pub fn reset_prefill_toks(&mut self) {
        self.prefill_prompt_toks = None
    }

    pub(crate) fn add_tmp_tok(&mut self, tok: u32) {
        self.is_tmp = true;
        self.tokens.push(tok);
        self.custom_metadata.append_token_to_blocks(tok as usize);
    }

    pub(crate) fn remove_tmp_tok(&mut self, n: usize) {
        self.is_tmp = false;
        self.tokens.truncate(self.tokens.len() - n);
        self.custom_metadata.remove_tokens_from_blocks(n);
    }

    pub fn add_token(
        &mut self,
        tok: Logprobs,
        completion_bytes: Vec<u8>,
        is_done: &Option<StopReason>,
    ) {
        let stopped_by_token = matches!(
            is_done,
            Some(StopReason::Eos) | Some(StopReason::StopTok(_))
        );
        if !stopped_by_token {
            self.completion_bytes.extend_from_slice(&completion_bytes);
            self.last_completion_bytes_len = completion_bytes.len();
        }
        self.last_logprob = tok.logprob;
        self.last_is_done = *is_done;

        self.custom_metadata
            .append_token_to_blocks(tok.token as usize);

        if let Some(ref mut harmony_ctx) = self.harmony_context {
            let _ = harmony_ctx.process_token(tok.token);
        }

        self.cumulative_logprob += tok.logprob;
        self.tokens.push(tok.token);
        self.logprobs.push(tok);
        self.reset_prefill_toks();
    }

    pub fn responder(&self) -> Sender<Response> {
        self.responder.clone()
    }

    pub fn creation_time(&self) -> u64 {
        self.creation_time
    }

    pub fn set_state(&self, state: SequenceState) {
        if matches!(state, SequenceState::Error) {
            let mut group = get_mut_group!(self);
            group.n_choices = group.n_choices.saturating_sub(1);
        }
        *self.state.write().unwrap() = state;
    }

    pub fn getstate(&self) -> SequenceState {
        *self.state.read().unwrap()
    }

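    // Checks all stop conditions for the newly sampled token: EOS, cancellation, stop tokens,
    // the per-request `max_len`, the model's context limit, and any configured stop strings
    // (searched in the accumulated completion bytes).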
    pub fn is_done(
        &self,
        tok: u32,
        eos_tok: Option<&[u32]>,
        max_model_len: usize,
    ) -> Option<StopReason> {
        let is_eos = match eos_tok {
            Some(eos_tok) => eos_tok.contains(&tok),
            None => false,
        };
        if is_eos {
            Some(StopReason::Eos)
        } else if matches!(
            &*self.state.read().unwrap(),
            SequenceState::Done(StopReason::Canceled)
        ) {
            Some(StopReason::Canceled)
        } else if self.stop_tokens.contains(&tok) {
            Some(StopReason::StopTok(tok))
        } else if self.max_len.is_some()
            && self.tokens.len().saturating_sub(self.prompt_len) + 1 >= self.max_len.unwrap()
        {
            Some(StopReason::Length(self.max_len.unwrap()))
        } else if self.tokens.len().saturating_sub(self.prompt_len) >= max_model_len {
            Some(StopReason::ModelLength(max_model_len))
        } else {
            if !self.stop_strings.is_empty() {
                for (idx, s) in self.stop_strings.iter().enumerate() {
                    if let Some(pos) = galil_seiferas::gs_find(&self.completion_bytes, s.as_bytes())
                    {
                        return Some(StopReason::StopString {
                            stop_string_idx: idx,
                            completion_bytes_pos: pos,
                        });
                    }
                }
            }
            None
        }
    }

    pub fn logprobs(&self) -> &[Logprobs] {
        &self.logprobs
    }

    pub fn return_logprobs(&self) -> bool {
        self.return_logprobs
    }

    pub fn prompt_tokens(&self) -> usize {
        self.prompt_len
    }

    pub fn stop_strings(&self) -> &[String] {
        &self.stop_strings
    }

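    // Streaming delta decoding: `peek_delta` lossily decodes the completion bytes accumulated
    // since `stream_idx` and defers (returns `Ok(None)`) while the tail is an incomplete UTF-8
    // sequence; `get_delta` additionally advances `stream_idx` once a delta is produced.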
    pub fn get_delta(
        &mut self,
    ) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
        let new_decoded = self.peek_delta();
        if matches!(new_decoded, Ok(Some(_))) {
            self.stream_idx = self.completion_bytes.len();
        }
        new_decoded
    }

    pub fn peek_delta(&self) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
        let is_first = self.stream_idx == 0;
        let new_decoded = String::from_utf8_lossy(&self.completion_bytes[self.stream_idx..]);
        if new_decoded.ends_with('�') {
            return Ok(None);
        }

        if is_first {
            return Ok(Some(new_decoded.trim_start().to_string()));
        }
        Ok(Some(new_decoded.to_string()))
    }

    pub fn timestamp(&self) -> u128 {
        self.timestamp
    }

    pub fn prompt_timestamp(&self) -> Option<u128> {
        self.prompt_timestamp
    }

    fn update_time_info(&self) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time travel has occurred!")
            .as_millis();

        if let Some(ts) = self.prompt_timestamp {
            get_mut_group!(self).total_completion_time = now - ts;
            get_mut_group!(self).total_prompt_time = self.total_prompt_time.unwrap();
        }

        get_mut_group!(self).total_time = now - self.timestamp;

        get_mut_group!(self).total_prompt_toks = self.prompt_len;
        get_mut_group!(self).total_toks = self.len();
    }

    pub fn add_image_choice_to_group(&self, choice: ImageChoice) {
        get_mut_group!(self).image_choices.push(choice);
    }

    pub fn add_speech_pcm_to_group(&self, pcm: Arc<Vec<f32>>, rate: usize, channels: usize) {
        get_mut_group!(self).speech_pcms.push((pcm, rate, channels));
    }

    pub fn add_choice_to_group(&self, choice: Choice) {
        get_mut_group!(self).choices.push(choice);
        self.update_time_info();
    }

    pub fn add_raw_choice_to_group(&self, logit_chunks: Vec<Tensor>) {
        get_mut_group!(self)
            .raw_choices
            .push((logit_chunks, self.tokens.clone()));
        self.update_time_info();
    }

    pub fn add_embedding_choice_to_group(&self, embedding: Vec<f32>) {
        get_mut_group!(self).embedding_choices.push(embedding);
        self.update_time_info();
    }

    pub fn add_completion_choice_to_group(&self, mut choice: CompletionChoice) {
        choice.text = format!(
            "{}{}{}",
            self.prefix.as_deref().unwrap_or(""),
            choice.text,
            self.suffix.as_deref().unwrap_or("")
        );
        get_mut_group!(self)
            .completion_choices
            .push((self.cumulative_logprob, choice));
        self.update_time_info();
    }

    pub fn get_response_index(&self) -> usize {
        self.response_index
    }

    pub fn get_mut_group(&self) -> MutexGuard<'_, SequenceGroup> {
        get_mut_group!(self)
    }

    pub fn add_streaming_chunk_choice_to_group(&self, chunk: ChunkChoice) {
        get_mut_group!(self).chat_streaming_chunks.push(chunk);
        self.update_time_info();
    }

    pub fn add_streaming_completion_chunk_choice_to_group(&self, chunk: CompletionChunkChoice) {
        get_mut_group!(self).completion_streaming_chunks.push(chunk);
        self.update_time_info();
    }

    pub fn take_images(&mut self) -> Option<Vec<image::DynamicImage>> {
        self.multimodal.take_images()
    }

    pub fn clone_images(&self) -> Option<Vec<image::DynamicImage>> {
        self.multimodal.clone_images()
    }

    pub fn images(&self) -> Option<&[image::DynamicImage]> {
        self.multimodal.images()
    }

    pub fn image_hashes(&self) -> Option<&[u64]> {
        self.multimodal.image_hashes()
    }

    pub fn has_images(&self) -> bool {
        self.multimodal.has_images()
    }

    pub fn take_audios(&mut self) -> Option<Vec<AudioInput>> {
        self.multimodal.take_audios()
    }

    pub fn clone_audios(&self) -> Option<Vec<AudioInput>> {
        self.multimodal.clone_audios()
    }

    pub fn audios(&self) -> Option<&[AudioInput]> {
        self.multimodal.audios()
    }

    pub fn audio_hashes(&self) -> Option<&[u64]> {
        self.multimodal.audio_hashes()
    }

    pub fn has_audios(&self) -> bool {
        self.multimodal.has_audios()
    }

    pub fn keep_num_audios(&mut self, audios_to_keep: usize) {
        self.multimodal.keep_num_audios(audios_to_keep)
    }

    pub fn keep_num_images(&mut self, images_to_keep: usize) {
        self.multimodal.keep_num_images(images_to_keep)
    }

    pub fn image_gen_response_format(&self) -> Option<ImageGenerationResponseFormat> {
        self.multimodal.image_gen_response_format()
    }

    pub fn sequence_stepping_type(&self) -> &SeqStepType {
        &self.sequence_stepping_type
    }

    pub fn get_diffusion_diffusion_params(&self) -> Option<DiffusionGenerationParams> {
        self.multimodal.diffusion_params()
    }

    pub fn eos_tokens(&self) -> &[u32] {
        &self.eos_tokens
    }

    pub fn enable_harmony_mode(&mut self) -> Result<(), anyhow::Error> {
        if self.harmony_context.is_none() {
            self.harmony_context = Some(HarmonyContext::new()?);
        }
        Ok(())
    }

    pub fn is_harmony_mode(&self) -> bool {
        self.harmony_context.is_some()
    }

    pub fn process_harmony_token(&mut self, token_id: u32) -> Option<crate::harmony::HarmonyDelta> {
        self.harmony_context
            .as_mut()
            .map(|ctx| ctx.process_token(token_id))
    }

    pub fn get_harmony_reasoning_delta(&mut self) -> Option<String> {
        self.harmony_context
            .as_mut()
            .and_then(|ctx| ctx.get_reasoning_delta())
    }

    pub fn get_harmony_final_delta(&mut self) -> Option<String> {
        self.harmony_context
            .as_mut()
            .and_then(|ctx| ctx.get_final_delta())
    }

    pub fn get_harmony_reasoning_content(&self) -> Option<String> {
        self.harmony_context
            .as_ref()
            .and_then(|ctx| ctx.reasoning_content())
    }

    pub fn get_harmony_final_content(&self) -> Option<String> {
        self.harmony_context
            .as_ref()
            .and_then(|ctx| ctx.final_content())
    }

    pub fn harmony_process_eos(&mut self) {
        if let Some(ref mut ctx) = self.harmony_context {
            ctx.process_eos();
        }
    }

    pub fn has_harmony_tool_calls(&self) -> bool {
        self.harmony_context
            .as_ref()
            .is_some_and(|ctx| ctx.has_tool_call())
    }

    pub fn get_harmony_tool_calls(&mut self) -> Vec<crate::harmony::HarmonyToolCall> {
        self.harmony_context
            .as_mut()
            .map(|ctx| ctx.finalize_tool_calls())
            .unwrap_or_default()
    }
}

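/// Aggregates the outputs of the `n_choices` sequences spawned for a single request. Most of
/// the `maybe_send_*` methods only send the response once every choice has reported its
/// result.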
pub struct SequenceGroup {
    n_choices: usize,
    best_of: Option<usize>,
    pub total_prompt_toks: usize,
    pub total_toks: usize,
    pub total_prompt_time: u128,
    pub total_time: u128,
    pub total_completion_time: u128,
    choices: Vec<Choice>,
    image_choices: Vec<ImageChoice>,
    speech_pcms: Vec<(Arc<Vec<f32>>, usize, usize)>,
    raw_choices: Vec<(Vec<Tensor>, Vec<u32>)>,
    embedding_choices: Vec<Vec<f32>>,
    completion_choices: Vec<(f32, CompletionChoice)>,
    pub chat_streaming_chunks: Vec<ChunkChoice>,
    pub completion_streaming_chunks: Vec<CompletionChunkChoice>,
    pub is_streaming: bool,
    pub is_chat: bool,
}

impl SequenceGroup {
    pub fn new(
        n_choices: usize,
        is_streaming: bool,
        is_chat: bool,
        best_of: Option<usize>,
    ) -> Self {
        Self {
            choices: Vec::new(),
            image_choices: Vec::new(),
            speech_pcms: Vec::new(),
            raw_choices: Vec::new(),
            embedding_choices: Vec::new(),
            completion_choices: Vec::new(),
            n_choices,
            total_prompt_toks: 0,
            total_toks: 0,
            total_prompt_time: 0,
            total_time: 0,
            total_completion_time: 0,
            chat_streaming_chunks: Vec::new(),
            completion_streaming_chunks: Vec::new(),
            is_streaming,
            is_chat,
            best_of,
        }
    }

    pub fn get_choices(&self) -> &[Choice] {
        &self.choices
    }

    pub fn get_completion_choices(&self) -> Vec<CompletionChoice> {
        if let Some(best_of) = self.best_of {
            let mut choices = self.completion_choices.clone();
            choices.sort_by(|a, b| b.0.partial_cmp(&a.0).expect("No ordering."));
            choices
                .into_iter()
                .take(best_of)
                .map(|(_, x)| x)
                .collect::<Vec<_>>()
        } else {
            self.completion_choices
                .clone()
                .into_iter()
                .map(|(_, x)| x)
                .collect::<Vec<_>>()
        }
    }

    pub fn get_image_choices(&self) -> &[ImageChoice] {
        &self.image_choices
    }

    pub fn get_usage(&self) -> Usage {
        #[allow(clippy::cast_precision_loss)]
        Usage {
            completion_tokens: self.total_toks.saturating_sub(self.total_prompt_toks),
            prompt_tokens: self.total_prompt_toks,
            total_tokens: self.total_toks,
            avg_tok_per_sec: (self.total_toks as f32 / self.total_time as f32) * 1000.,
            avg_prompt_tok_per_sec: (self.total_prompt_toks as f32 / self.total_prompt_time as f32)
                * 1000.,
            avg_compl_tok_per_sec: (self.total_toks.saturating_sub(self.total_prompt_toks) as f32
                / self.total_completion_time as f32)
                * 1000.,
            total_time_sec: self.total_time as f32 / 1000.,
            total_completion_time_sec: self.total_completion_time as f32 / 1000.,
            total_prompt_time_sec: self.total_prompt_time as f32 / 1000.,
        }
    }

    pub async fn maybe_send_chat_done_response(
        &self,
        response: ChatCompletionResponse,
        sender: Sender<Response>,
    ) -> Result<(), SendError<Response>> {
        if self.choices.len() == self.n_choices {
            sender.send(Response::Done(response)).await?;
        }

        Ok(())
    }

    pub async fn maybe_send_raw_done_response(
        &self,
        sender: Sender<Response>,
    ) -> Result<(), SendError<Response>> {
        if self.raw_choices.len() == self.n_choices {
            assert_eq!(self.raw_choices.len(), 1);
            let (logits_chunks, tokens) = self.raw_choices[0].clone();
            sender
                .send(Response::Raw {
                    logits_chunks,
                    tokens,
                })
                .await?;
        }

        Ok(())
    }

    pub async fn maybe_send_embedding_done_response(
        &self,
        sender: Sender<Response>,
    ) -> Result<(), SendError<Response>> {
        if self.embedding_choices.len() == self.n_choices {
            assert_eq!(self.embedding_choices.len(), 1);
            let embeddings = self.embedding_choices[0].clone();
            let prompt_tokens = self.total_prompt_toks;
            let total_tokens = self.total_toks;
            sender
                .send(Response::Embeddings {
                    embeddings,
                    prompt_tokens,
                    total_tokens,
                })
                .await?;
        }

        Ok(())
    }

    pub async fn maybe_send_image_gen_response(
        &self,
        response: ImageGenerationResponse,
        sender: Sender<Response>,
    ) -> Result<(), SendError<Response>> {
        if self.image_choices.len() == self.n_choices {
            sender.send(Response::ImageGeneration(response)).await?;
        }

        Ok(())
    }

    pub async fn maybe_send_speech_response(
        &self,
        sender: Sender<Response>,
    ) -> Result<(), SendError<Response>> {
        assert_eq!(self.speech_pcms.len(), 1);

        let (pcm, rate, channels) = self.speech_pcms[0].clone();
        sender
            .send(Response::Speech {
                pcm,
                rate,
                channels,
            })
            .await?;

        Ok(())
    }

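    // Flushes accumulated streaming chunks (chat or completion) to the client once every choice
    // has produced one for the current step, swapping the buffers out so they can be refilled.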
    pub async fn maybe_send_streaming_response(
        &mut self,
        seq: &Sequence,
        model: String,
        usage_opt: Option<Usage>,
    ) -> Result<(), Box<SendError<Response>>> {
        if self.chat_streaming_chunks.len() == self.n_choices && self.is_streaming {
            let mut swap_streaming_chunks = vec![];

            std::mem::swap(&mut swap_streaming_chunks, &mut self.chat_streaming_chunks);

            seq.responder()
                .send(Response::Chunk(ChatCompletionChunkResponse {
                    id: seq.id.to_string(),
                    choices: swap_streaming_chunks,
                    created: seq.timestamp,
                    model: model.clone(),
                    system_fingerprint: SYSTEM_FINGERPRINT.to_string(),
                    object: "chat.completion.chunk".to_string(),
                    usage: usage_opt,
                }))
                .await?;
        } else if self.completion_streaming_chunks.len() == self.n_choices && self.is_streaming {
            let mut swap_streaming_chunks = vec![];

            std::mem::swap(
                &mut swap_streaming_chunks,
                &mut self.completion_streaming_chunks,
            );

            seq.responder()
                .send(Response::CompletionChunk(CompletionChunkResponse {
                    id: seq.id.to_string(),
                    choices: swap_streaming_chunks,
                    created: seq.timestamp,
                    model: model.clone(),
                    system_fingerprint: SYSTEM_FINGERPRINT.to_string(),
                    object: "text_completion".to_string(),
                }))
                .await?;
        }
        Ok(())
    }

    pub async fn maybe_send_completion_done_response(
        &self,
        response: CompletionResponse,
        sender: Sender<Response>,
    ) -> Result<(), Box<SendError<Response>>> {
        if self.completion_choices.len() == self.n_choices {
            sender.send(Response::CompletionDone(response)).await?;
        }
        Ok(())
    }
}
1477}