use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::fs::File; use std::io::{BufRead, BufReader}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tokio::time::sleep; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DataEvent { pub timestamp: f64, pub value: f64, pub metadata: Option, } impl DataEvent { pub fn new(timestamp: f64, value: f64) -> Self { Self { timestamp, value, metadata: None, } } pub fn with_metadata(timestamp: f64, value: f64, metadata: serde_json::Value) -> Self { Self { timestamp, value, metadata: Some(metadata), } } } #[derive(Debug, Clone)] pub enum DataSourceConfig { File { path: String, interval_ms: u64, format: FileFormat, }, Mqtt { broker_url: String, topic: String, format: MessageFormat, }, WebSocket { url: String, format: MessageFormat, }, HttpApi { url: String, interval_ms: u64, format: MessageFormat, }, Pipe { path: String, format: MessageFormat, }, } #[derive(Debug, Clone)] pub enum FileFormat { Json, Csv, PlainText, Custom(String), // regex pattern } #[derive(Debug, Clone)] pub enum MessageFormat { Json, PlainText, Custom(String), // parser function name } #[async_trait] pub trait DataSource: Send + Sync { async fn start(&mut self, sender: mpsc::Sender) -> Result<(), Box>; async fn stop(&mut self) -> Result<(), Box>; fn is_running(&self) -> bool; fn get_config(&self) -> &DataSourceConfig; } pub struct FileDataSource { config: DataSourceConfig, running: Arc>, task_handle: Option>, } impl FileDataSource { pub fn new(config: DataSourceConfig) -> Self { Self { config, running: Arc::new(Mutex::new(false)), task_handle: None, } } fn parse_line(&self, line: &str, format: &FileFormat) -> Option { match format { FileFormat::Json => { if let Ok(event) = serde_json::from_str::(line) { Some(event) } else if let Ok(value) = serde_json::from_str::(line) { // Try to extract timestamp and value from JSON let timestamp = value.get("timestamp") .or_else(|| value.get("time")) .or_else(|| value.get("t")) .and_then(|v| v.as_f64()) .unwrap_or_else(|| std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap().as_secs_f64()); let data_value = value.get("value") .or_else(|| value.get("data")) .or_else(|| value.get("y")) .and_then(|v| v.as_f64()) .unwrap_or(0.0); Some(DataEvent::with_metadata(timestamp, data_value, value)) } else { None } }, FileFormat::Csv => { let parts: Vec<&str> = line.split(',').collect(); if parts.len() >= 2 { let timestamp = parts[0].parse::().unwrap_or_else(|_| { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap().as_secs_f64() }); let value = parts[1].parse::().unwrap_or(0.0); Some(DataEvent::new(timestamp, value)) } else { None } }, FileFormat::PlainText => { if let Ok(value) = line.trim().parse::() { let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap().as_secs_f64(); Some(DataEvent::new(timestamp, value)) } else { None } }, FileFormat::Custom(_pattern) => { // TODO: Implement regex parsing if let Ok(value) = line.trim().parse::() { let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap().as_secs_f64(); Some(DataEvent::new(timestamp, value)) } else { None } } } } } #[async_trait] impl DataSource for FileDataSource { async fn start(&mut self, sender: mpsc::Sender) -> Result<(), Box> { if let DataSourceConfig::File { path, interval_ms, format } = &self.config { let file = File::open(path)?; let reader = BufReader::new(file); let lines: Vec = reader.lines().collect::, _>>()?; *self.running.lock().unwrap() = true; let running = self.running.clone(); let format = format.clone(); let interval = Duration::from_millis(*interval_ms); let handle = tokio::spawn(async move { let mut line_index = 0; while *running.lock().unwrap() { if line_index < lines.len() { let line = &lines[line_index]; if let Some(event) = Self::parse_line_static(line, &format) { if sender.send(event).await.is_err() { break; // Receiver dropped } } line_index += 1; } else { // Restart from beginning (loop the file) line_index = 0; } sleep(interval).await; } }); self.task_handle = Some(handle); } Ok(()) } async fn stop(&mut self) -> Result<(), Box> { *self.running.lock().unwrap() = false; if let Some(handle) = self.task_handle.take() { handle.abort(); } Ok(()) } fn is_running(&self) -> bool { *self.running.lock().unwrap() } fn get_config(&self) -> &DataSourceConfig { &self.config } } impl FileDataSource { fn parse_line_static(line: &str, format: &FileFormat) -> Option { match format { FileFormat::Json => { if let Ok(event) = serde_json::from_str::(line) { Some(event) } else if let Ok(value) = serde_json::from_str::(line) { let timestamp = value.get("timestamp") .or_else(|| value.get("time")) .or_else(|| value.get("t")) .and_then(|v| v.as_f64()) .unwrap_or_else(|| std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap().as_secs_f64()); let data_value = value.get("value") .or_else(|| value.get("data")) .or_else(|| value.get("y")) .and_then(|v| v.as_f64()) .unwrap_or(0.0); Some(DataEvent::with_metadata(timestamp, data_value, value)) } else { None } }, FileFormat::Csv => { let parts: Vec<&str> = line.split(',').collect(); if parts.len() >= 2 { let timestamp = parts[0].parse::().unwrap_or_else(|_| { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap().as_secs_f64() }); let value = parts[1].parse::().unwrap_or(0.0); Some(DataEvent::new(timestamp, value)) } else { None } }, FileFormat::PlainText => { if let Ok(value) = line.trim().parse::() { let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap().as_secs_f64(); Some(DataEvent::new(timestamp, value)) } else { None } }, FileFormat::Custom(_pattern) => { if let Ok(value) = line.trim().parse::() { let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap().as_secs_f64(); Some(DataEvent::new(timestamp, value)) } else { None } } } } } // Placeholder implementations for future data sources pub struct MqttDataSource { config: DataSourceConfig, running: Arc>, } impl MqttDataSource { pub fn new(config: DataSourceConfig) -> Self { Self { config, running: Arc::new(Mutex::new(false)), } } } #[async_trait] impl DataSource for MqttDataSource { async fn start(&mut self, _sender: mpsc::Sender) -> Result<(), Box> { // TODO: Implement MQTT connection Err("MQTT not implemented yet".into()) } async fn stop(&mut self) -> Result<(), Box> { *self.running.lock().unwrap() = false; Ok(()) } fn is_running(&self) -> bool { *self.running.lock().unwrap() } fn get_config(&self) -> &DataSourceConfig { &self.config } } pub struct DataSourceManager { sources: Vec>, event_buffer: Arc>>, max_buffer_size: usize, sender: mpsc::Sender, receiver: Option>, buffer_task: Option>, } impl DataSourceManager { pub fn new(max_buffer_size: usize) -> Self { let (sender, receiver) = mpsc::channel(1000); Self { sources: Vec::new(), event_buffer: Arc::new(Mutex::new(VecDeque::new())), max_buffer_size, sender, receiver: Some(receiver), buffer_task: None, } } pub fn add_source(&mut self, source: Box) -> Result<(), Box> { self.sources.push(source); Ok(()) } pub async fn start_all(&mut self) -> Result<(), Box> { // Start buffer management task let receiver = self.receiver.take().unwrap(); let buffer = self.event_buffer.clone(); let max_size = self.max_buffer_size; let buffer_handle = tokio::spawn(async move { let mut receiver = receiver; while let Some(event) = receiver.recv().await { let mut buffer = buffer.lock().unwrap(); buffer.push_back(event); // Keep buffer size under limit while buffer.len() > max_size { buffer.pop_front(); } } }); self.buffer_task = Some(buffer_handle); // Start all sources for source in &mut self.sources { source.start(self.sender.clone()).await?; } Ok(()) } pub async fn stop_all(&mut self) -> Result<(), Box> { for source in &mut self.sources { source.stop().await?; } if let Some(handle) = self.buffer_task.take() { handle.abort(); } Ok(()) } pub fn get_events(&self, max_count: usize) -> Vec { let mut buffer = self.event_buffer.lock().unwrap(); let mut events = Vec::new(); for _ in 0..max_count { if let Some(event) = buffer.pop_front() { events.push(event); } else { break; } } events } pub fn has_events(&self) -> bool { !self.event_buffer.lock().unwrap().is_empty() } pub fn get_buffer_size(&self) -> usize { self.event_buffer.lock().unwrap().len() } }