Refactor to remove useless task

This commit is contained in:
Frederik Menke 2023-11-26 20:33:16 +01:00
parent 4d30463a87
commit 059ccdc14b

View file

@ -2,14 +2,13 @@ pub mod gcode;
use bytes::BytesMut; use bytes::BytesMut;
use futures::sink::SinkExt; use futures::sink::SinkExt;
use futures::stream::StreamExt; use futures::stream::{SplitSink, SplitStream, StreamExt};
use std::time::Duration; use std::time::Duration;
use std::{fmt::Write, io, rc::Rc, str}; use std::{fmt::Write, io, rc::Rc, str};
use tokio; use tokio;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::timeout; use tokio::time::timeout;
use tokio_serial::{SerialPortBuilderExt, SerialStream}; use tokio_serial::{SerialPortBuilderExt, SerialStream};
use tokio_util::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder, Framed};
pub use gcode::{GcodeCommand, GcodeReply}; pub use gcode::{GcodeCommand, GcodeReply};
@ -17,12 +16,18 @@ use crate::printer::gcode::G91Command;
use self::gcode::GcodeReplyError; use self::gcode::GcodeReplyError;
/// Recv buffer string will be initialized with this capacity.
/// This should fit a simple "OK Pnn Bn" reply for GCODE commands that
/// do not return any data.
const RECV_BUFFER_CAPACITY: usize = 32;
#[derive(Debug)] #[derive(Debug)]
pub enum PrinterError { pub enum PrinterError {
IO(std::io::Error), IO(std::io::Error),
PrinterTaskDown, PrinterTaskDown,
OutputChannelDropped, OutputChannelDropped,
GcodeReply(GcodeReplyError), GcodeReply(GcodeReplyError),
NoResponseFromPrinter(String),
} }
pub struct State { pub struct State {
@ -30,24 +35,41 @@ pub struct State {
} }
pub struct Printer { pub struct Printer {
printer_in: Sender<Box<dyn GcodeCommand>>, serial_tx: SplitSink<Framed<SerialStream, LineCodec>, String>,
printer_out: Receiver<Result<GcodeReply, PrinterError>>, serial_rx: SplitStream<Framed<SerialStream, LineCodec>>,
pub state: Rc<State>, pub state: Rc<State>,
} }
impl Printer { impl Printer {
/// Send gcode to the printer and parse its reply
pub async fn send_gcode( pub async fn send_gcode(
&mut self, &mut self,
gcode: Box<dyn GcodeCommand>, command: Box<dyn GcodeCommand>,
) -> Result<GcodeReply, PrinterError> { ) -> Result<GcodeReply, PrinterError> {
self.printer_in let command_text = format!("{}\n", command.command());
.send(gcode) println!("sending in: {:?}", command_text);
self.serial_tx.send(command_text.clone()).await.unwrap();
// TODO: figure out how big a simple "OK"-reply is and use `with_capacity` for the `reply`
let mut reply = String::with_capacity(RECV_BUFFER_CAPACITY);
loop {
// TODO: add timeout below
let line = self
.serial_rx
.next()
.await .await
.map_err(|_| PrinterError::PrinterTaskDown)?; .ok_or(PrinterError::NoResponseFromPrinter(
self.printer_out "There are no more lines to get from the printer. Did the port close?"
.recv() .to_string(),
.await ))?;
.unwrap_or(Err(PrinterError::PrinterTaskDown)) let line = line.unwrap();
if line.contains("ok") {
let reply = command.parse_reply(&reply);
println!("got reply: {:?}", reply);
return reply.map_err(PrinterError::GcodeReply);
} else {
reply.push_str(&line);
}
}
} }
pub async fn connect(port_path: &str) -> Result<Self, PrinterError> { pub async fn connect(port_path: &str) -> Result<Self, PrinterError> {
@ -58,46 +80,10 @@ impl Printer {
port.set_exclusive(false) port.set_exclusive(false)
.expect("Unable to set serial port exclusive to false"); .expect("Unable to set serial port exclusive to false");
// commands that go to the printer
let (printer_in_tx, mut printer_in_rx) = channel::<Box<dyn GcodeCommand>>(32);
// replies that come from the printer
let (printer_out_tx, printer_out_rx) = channel::<Result<GcodeReply, PrinterError>>(32);
tokio::spawn(printer_communication_task(
port,
printer_in_rx,
printer_out_tx,
));
Ok(Printer {
printer_in: printer_in_tx,
printer_out: printer_out_rx,
state: Rc::new(State {
position: (0.0, 0.0, 0.0, 0.0),
}),
})
}
}
/// Pull commands from `user_commands`, write them to the printer and send the reply back through
/// `printer_replies`.
///
///
/// Arguments:
///
/// * `port`: Serial port that is connected to a printer running Marlin
/// * `user_commands`: Channel for the user to submit their GCODE
/// * `printer_replies`: Replies from the printer the the submitted GCODE. These always correspond
/// to the incoming commands in order that they were received.
async fn printer_communication_task(
port: SerialStream,
mut user_commands: Receiver<Box<dyn GcodeCommand>>,
printer_replies: Sender<Result<GcodeReply, PrinterError>>,
) {
let connection = LineCodec.framed(port); let connection = LineCodec.framed(port);
let (mut serial_tx, mut serial_rx) = connection.split(); let (mut serial_tx, mut serial_rx) = connection.split();
// The printer will send some info after connecting. We need to wait for this // The printer will send some info after connecting for the first time. We need to wait for this
// to be received as it will otherwise stop responding for some reason: // to be received as it will otherwise stop responding for some reason:
loop { loop {
if let Ok(message) = timeout(Duration::from_secs(10), serial_rx.next()).await { if let Ok(message) = timeout(Duration::from_secs(10), serial_rx.next()).await {
@ -118,7 +104,9 @@ async fn printer_communication_task(
} }
} else { } else {
// TODO: Check if port is good by sending some harmless gcode // TODO: Check if port is good by sending some harmless gcode
println!("Reading from serial port timed out. Printer might already be initialized."); println!(
"Reading from serial port timed out. Printer might already be initialized."
);
println!("Sending G91 command"); println!("Sending G91 command");
serial_tx serial_tx
.send(G91Command.command() + "\n") .send(G91Command.command() + "\n")
@ -127,25 +115,13 @@ async fn printer_communication_task(
} }
} }
while let Some(command) = user_commands.recv().await { Ok(Printer {
let command_text = format!("{}\n", command.command()); serial_rx,
println!("sending in: {:?}", command_text); serial_tx,
serial_tx.send(command_text).await.unwrap(); state: Rc::new(State {
let mut reply = "".to_string(); position: (0.0, 0.0, 0.0, 0.0),
while let Some(line) = serial_rx.next().await { }),
let line = line.unwrap(); })
if line.contains("ok") {
let reply = command.parse_reply(&reply);
println!("got reply: {:?}", reply);
printer_replies
.send(reply.map_err(PrinterError::GcodeReply))
.await
.unwrap();
break;
} else {
reply.push_str(&line);
}
}
} }
} }