mistralrs_core/engine/
logger.rs

1#![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
2
3use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use tracing::info;
9
/// Handle to a background thread that periodically logs token throughput and
/// the prefix cache hit rate.
///
/// Every field is an `Arc`-wrapped atomic shared with the thread spawned in
/// `IntervalLogger::new`, so the recording methods only need `&self`.
pub struct IntervalLogger {
    /// Tokens recorded since the last report; the logging thread swaps this
    /// back to zero each time it reads it.
    tokens_processed: Arc<AtomicUsize>,
    /// Running count of sequences recorded via `add_new_sequence`.
    total_new_seqs: Arc<AtomicUsize>,
    /// Running count of prefix cache hits recorded via `add_prefix_cache_hit`.
    prefix_cache_hits: Arc<AtomicUsize>,
    /// Set to `true` by `enable_logging`; the logging thread waits on this
    /// flag before it starts reporting.
    enable_logging: Arc<AtomicBool>,
}
16
17impl IntervalLogger {
18    /// Starts an interval logger. Call `begin_logging` to begin the logging process.
19    pub fn new(interval: Duration) -> Self {
20        let prefix_cache_hits = Arc::new(AtomicUsize::new(0));
21        let tokens_processed = Arc::new(AtomicUsize::new(0));
22        let total_new_seqs = Arc::new(AtomicUsize::new(0));
23        let enable_logging = Arc::new(AtomicBool::new(false));
24
25        let t_prefix_cache_hits = prefix_cache_hits.clone();
26        let t_tokens_processed = tokens_processed.clone();
27        let t_total_new_seqs = total_new_seqs.clone();
28        let t_enable_logging = enable_logging.clone();
29        thread::spawn(move || {
30            // Wait
31            while !t_enable_logging.load(Ordering::Relaxed) {}
32
33            // Start the actual logging
34            loop {
35                thread::sleep(interval);
36
37                let total_new_seqs = t_total_new_seqs.load(Ordering::Relaxed);
38                let prefix_cache_hits = t_prefix_cache_hits.load(Ordering::Relaxed);
39                let tokens_processed = t_tokens_processed.swap(0, Ordering::Relaxed);
40
41                if total_new_seqs != 0 && tokens_processed != 0 {
42                    info!(
43                        "Throughput (T/s) {:.2}, Prefix cache hitrate {:.2}%",
44                        tokens_processed as f64 / interval.as_secs_f64(),
45                        100. * prefix_cache_hits as f64 / total_new_seqs as f64,
46                    );
47                }
48            }
49        });
50
51        Self {
52            prefix_cache_hits,
53            tokens_processed,
54            total_new_seqs,
55            enable_logging,
56        }
57    }
58
59    pub fn enable_logging(&self) {
60        self.enable_logging.store(true, Ordering::Relaxed);
61    }
62
63    pub fn add_tokens_processed(&self, num_tokens: usize) {
64        self.tokens_processed
65            .fetch_add(num_tokens, Ordering::Relaxed);
66    }
67
68    pub fn add_new_sequence(&self) {
69        self.total_new_seqs.fetch_add(1, Ordering::Relaxed);
70    }
71
72    pub fn add_prefix_cache_hit(&self) {
73        self.prefix_cache_hits.fetch_add(1, Ordering::Relaxed);
74    }
75}