diff --git a/Cargo.toml b/Cargo.toml index 665e2e950..b6c080b5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ rand = "0.10" reqwest = "0.13" serde = { version = "^1.0" } serde_json = { version = "^1.0" } +tdms = "0.3.0" tempdir = "0.3" tokio = { version = "1" } tokio-stream = "0.1" diff --git a/rust/crates/sift_cli/Cargo.toml b/rust/crates/sift_cli/Cargo.toml index 4e0aed21f..1dfbe57e6 100644 --- a/rust/crates/sift_cli/Cargo.toml +++ b/rust/crates/sift_cli/Cargo.toml @@ -31,6 +31,7 @@ parquet = { workspace = true } pbjson-types = { workspace = true } reqwest = { workspace = true } sift_pbfs = { workspace = true } +tdms = { workspace = true } sift_rs = { workspace = true } tokio = { workspace = true, features = ["full", "net", "time"] } tokio-stream = { workspace = true } diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 5b1b5f396..1d74dff25 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -1,7 +1,9 @@ use clap::{Parser, Subcommand, crate_description, crate_version}; use clap_complete::Shell; use parquet::ComplexTypesMode; +pub mod tdms; use std::path::PathBuf; +use tdms::TdmsFallbackMethod; pub mod channel; use channel::DataType; @@ -156,6 +158,9 @@ pub enum ImportCmd { #[command(subcommand)] Parquet(ImportParquetCmd), + /// Import a Tdms file into Sift. + Tdms(ImportTdmsArgs), + /// Import backup files generated by sift_stream into Sift. #[command(name = "backups")] Backup(BackupArgs), @@ -294,6 +299,32 @@ pub struct ImportBackupArgs { pub cleanup: bool, } +#[derive(clap::Args)] +pub struct CommonImportArgs { + /// Path to the file to import + pub path: PathBuf, + + /// Name of the asset this data belongs to + #[arg(short, long)] + pub asset: String, + + /// Optional run name to associate with this import + #[arg(short, long)] + pub run: Option, + + /// The id of an existing run to add this data to. Takes precedence over --run + #[arg(long)] + pub run_id: Option, + + /// Wait until the import finishes processing + #[arg(short, long)] + pub wait: bool, + + /// Preview the parsed schema without uploading + #[arg(short, long)] + pub preview: bool, +} + #[derive(Subcommand)] pub enum ImportParquetCmd { /// A parquet file where every column is exclusive to a single channel except for the time @@ -363,3 +394,29 @@ pub struct FlatDatasetArgs { #[arg(short, long)] pub preview: bool, } + +#[derive(clap::Args)] +pub struct ImportTdmsArgs { + #[command(flatten)] + pub common: CommonImportArgs, + + /// Optional override on start time + #[arg(long)] + pub start_time_override: Option, + + /// Fallback method for channels with missing timing information + #[arg(short, long, default_value = "fail-on-error")] + pub fallback_method: TdmsFallbackMethod, + + /// Time format for the channels not using the TDMS timestamp type + #[arg(long)] + pub time_format: Option, + + /// Relative start time for channels using a non standard time channel + #[arg(short = 's', long)] + pub relative_start_time: Option, + + /// If true, will import TDMS file properties to the run as metadata + #[arg(long)] + pub import_file_properties: bool, +} diff --git a/rust/crates/sift_cli/src/cli/tdms.rs b/rust/crates/sift_cli/src/cli/tdms.rs new file mode 100644 index 000000000..5a16f2455 --- /dev/null +++ b/rust/crates/sift_cli/src/cli/tdms.rs @@ -0,0 +1,29 @@ +use std::fmt::{self, Display}; + +use clap::ValueEnum; +use sift_rs::data_imports::v2::TdmsFallbackMethod as ProtoTdmsFallbackMethod; + +#[derive(Debug, Copy, Clone, ValueEnum, Default)] +pub enum TdmsFallbackMethod { + #[default] + FailOnError, + IgnoreError, +} + +impl From for ProtoTdmsFallbackMethod { + fn from(fm: TdmsFallbackMethod) -> Self { + match fm { + TdmsFallbackMethod::FailOnError => Self::FailOnError, + TdmsFallbackMethod::IgnoreError => Self::IgnoreError, + } + } +} + +impl Display for TdmsFallbackMethod { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::FailOnError => write!(f, "fail-on-error"), + Self::IgnoreError => write!(f, "ignore-error"), + } + } +} diff --git a/rust/crates/sift_cli/src/cmd/import/mod.rs b/rust/crates/sift_cli/src/cmd/import/mod.rs index f65694ca0..5dda5616e 100644 --- a/rust/crates/sift_cli/src/cmd/import/mod.rs +++ b/rust/crates/sift_cli/src/cmd/import/mod.rs @@ -14,6 +14,7 @@ use crate::util::{job::JobServiceWrapper, progress::Spinner, tty::Output, user:: pub mod backup; pub mod csv; pub mod parquet; +pub mod tdms; mod utils; const INDENT_1: &str = " "; diff --git a/rust/crates/sift_cli/src/cmd/import/tdms/detect_tdms_config.rs b/rust/crates/sift_cli/src/cmd/import/tdms/detect_tdms_config.rs new file mode 100644 index 000000000..96cfe52d5 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/import/tdms/detect_tdms_config.rs @@ -0,0 +1,248 @@ +use std::{fs::File, path::Path, process::ExitCode}; + +use anyhow::{Context as AnyhowContext, Result, anyhow}; +use chrono::DateTime; +use crossterm::style::Stylize; +use pbjson_types::Timestamp; +use sift_rs::{ + common::r#type::v1::{ChannelConfig, ChannelDataType}, + data_imports::v2::{ + CreateDataImportFromUploadRequest, CreateDataImportFromUploadResponse, TdmsConfig, + TdmsFallbackMethod as ProtoFallbackMethod, TimeFormat as ProtoTimeFormat, + data_import_service_client::DataImportServiceClient, + }, +}; +use tdms::TDMSFile; +use tdms::data_type::TdmsDataType; +use tdms::segment::Channel; + +use crate::{ + cli::{ImportTdmsArgs, tdms::TdmsFallbackMethod}, + cmd::{ + Context, + import::{ + preview_import_config, + utils::{upload_gzipped_file, validate_time_format}, + wait_for_job_completion, + }, + }, + util::{api::create_grpc_channel, tty::Output}, +}; + +pub async fn run(ctx: Context, args: ImportTdmsArgs) -> Result { + let grpc_channel = + create_grpc_channel(&ctx).context("failed to create grpc channel for tdms import")?; + let mut data_imports_client = DataImportServiceClient::new(grpc_channel.clone()); + let tdms_config = build_tdms_config(&args).context("failed to build tdms config")?; + + if args.common.preview { + let run_label = if tdms_config.run_id.is_empty() { + tdms_config.run_name.as_str() + } else { + tdms_config.run_id.as_str() + }; + + match detect_config(&args.common.path, args.fallback_method) { + Ok(channel_configs) => { + let refs: Vec<&ChannelConfig> = channel_configs.iter().collect(); + preview_import_config(&args.common.asset, run_label, &refs); + } + Err(e) => { + preview_import_config(&args.common.asset, run_label, &[]); + Output::new() + .line(format!("client-side preview parse failed: {e:#}")) + .tip("the server-side parser may still ingest this file correctly") + .eprint(); + } + } + return Ok(ExitCode::SUCCESS); + } + + let file = File::open(&args.common.path).context("failed to open tdms file")?; + + let create_data_import_req = + create_data_import_request(tdms_config).context("failed to create data import req")?; + + let CreateDataImportFromUploadResponse { upload_url, .. } = data_imports_client + .create_data_import_from_upload(create_data_import_req) + .await + .context("error creating data import for tdms")? + .into_inner(); + + upload_gzipped_file(&ctx, &upload_url, file, "application/octet-stream") + .await + .context("failed to upload tdms file")?; + + let location = args.common.run.as_ref().map_or_else( + || format!("asset '{}'", args.common.asset.cyan()), + |r| format!("run '{}'", r.clone().cyan()), + ); + + if !args.common.wait { + Output::new() + .line(format!("{} file for processing", "Uploaded".green())) + .tip(format!( + "Once processing is complete the data will be available on the {location}." + )) + .print(); + + return Ok(ExitCode::SUCCESS); + } + + wait_for_job_completion(grpc_channel, location).await +} + +pub fn build_tdms_config(args: &ImportTdmsArgs) -> Result { + if args.import_file_properties && args.common.run.is_none() && args.common.run_id.is_none() { + anyhow::bail!("--import-file-properties requires --run or --run-id"); + } + + validate_time_format( + args.time_format.unwrap_or_default(), + &args.relative_start_time, + ) + .context("validating time format for tdms")?; + + let relative_start_time_input = match &args.relative_start_time { + Some(start) => { + let rs = DateTime::parse_from_rfc3339(start) + .context("--relative-start-time is not valid RFC3339")?; + let utc = rs.to_utc(); + Some(Timestamp::from(utc)) + } + None => None, + }; + + let start_time_override_input = match &args.start_time_override { + Some(override_wf) => { + let rs = DateTime::parse_from_rfc3339(override_wf) + .context("--start-time-override is not valid RFC3339")?; + let utc = rs.to_utc(); + Some(Timestamp::from(utc)) + } + None => None, + }; + + Ok(TdmsConfig { + asset_name: args.common.asset.clone(), + run_name: args.common.run.clone().unwrap_or_default(), + start_time_override: start_time_override_input, + run_id: args.common.run_id.clone().unwrap_or_default(), + fallback_method: ProtoFallbackMethod::from(args.fallback_method).into(), + time_format: args.time_format.map(|tf| ProtoTimeFormat::from(tf).into()), + relative_start_time: relative_start_time_input, + import_file_properties: args.import_file_properties, + ..Default::default() + }) +} + +fn create_data_import_request(config: TdmsConfig) -> Result { + let req = CreateDataImportFromUploadRequest { + tdms_config: Some(config), + ..Default::default() + }; + Ok(req) +} + +pub(super) fn tdms_to_sift_data_type(tdms_type: TdmsDataType) -> Option { + match tdms_type { + TdmsDataType::SingleFloat(_) | TdmsDataType::SingleFloatWithUnit(_) => { + Some(ChannelDataType::Float) + } + TdmsDataType::DoubleFloat(_) + | TdmsDataType::DoubleFloatWithUnit(_) + | TdmsDataType::ExtendedFloat(_) + | TdmsDataType::ExtendedFloatWithUnit(_) => Some(ChannelDataType::Double), + + TdmsDataType::I8(_) | TdmsDataType::I16(_) | TdmsDataType::I32(_) => { + Some(ChannelDataType::Int32) + } + TdmsDataType::I64(_) => Some(ChannelDataType::Int64), + + TdmsDataType::U8(_) | TdmsDataType::U16(_) | TdmsDataType::U32(_) => { + Some(ChannelDataType::Uint32) + } + TdmsDataType::U64(_) => Some(ChannelDataType::Uint64), + + TdmsDataType::Boolean(_) => Some(ChannelDataType::Bool), + TdmsDataType::String => Some(ChannelDataType::String), + TdmsDataType::TimeStamp(_) => Some(ChannelDataType::Int64), + + TdmsDataType::ComplexSingleFloat(_) => Some(ChannelDataType::Float), + TdmsDataType::ComplexDoubleFloat(_) => Some(ChannelDataType::Double), + + TdmsDataType::FixedPoint(_) => Some(ChannelDataType::Double), + + TdmsDataType::Void | TdmsDataType::DAQmxRawData => None, + } +} + +pub(super) fn find_time_channel(channels: &[(String, &Channel)]) -> Option { + channels + .iter() + .find(|(name, ch)| { + matches!(ch.data_type, TdmsDataType::TimeStamp(_)) + || name.eq_ignore_ascii_case("Time") + || name.eq_ignore_ascii_case("Timestamp") + }) + .map(|(name, _)| name.clone()) +} + +pub(super) fn is_waveform_channel(channel: &Channel) -> bool { + let has_prop = |name: &str| channel.properties.iter().any(|p| p.name == name); + has_prop("wf_start_time") && has_prop("wf_increment") +} + +fn get_string_property(channel: &Channel, name: &str) -> String { + channel + .properties + .iter() + .find(|p| p.name == name) + .and_then(|p| p.value.value.as_ref()) + .and_then(|bytes| String::from_utf8(bytes.clone()).ok()) + .unwrap_or_default() +} + +fn detect_config(path: &Path, fallback_method: TdmsFallbackMethod) -> Result> { + let file = + TDMSFile::from_path(path).map_err(|e| anyhow!("failed to open tdms for preview: {e}"))?; + let mut channels_vec: Vec = vec![]; + + for group in file.groups() { + let channels: Vec<(String, &Channel)> = file.channels(&group).into_iter().collect(); + let time_channel_name = find_time_channel(&channels); + + for (channel_name, channel) in &channels { + if Some(channel_name) == time_channel_name.as_ref() { + continue; + } + + let Some(data_type) = tdms_to_sift_data_type(channel.data_type) else { + continue; + }; + + let has_timing = is_waveform_channel(channel) || time_channel_name.is_some(); + if !has_timing { + match fallback_method { + TdmsFallbackMethod::IgnoreError => continue, + TdmsFallbackMethod::FailOnError => { + return Err(anyhow!( + "no timing information for {}.{}", + group, + channel_name + )); + } + } + } + + channels_vec.push(ChannelConfig { + name: format!("{}.{}", group, channel_name), + units: get_string_property(channel, "unit_string"), + description: get_string_property(channel, "description"), + data_type: data_type as i32, + ..Default::default() + }); + } + } + Ok(channels_vec) +} diff --git a/rust/crates/sift_cli/src/cmd/import/tdms/mod.rs b/rust/crates/sift_cli/src/cmd/import/tdms/mod.rs new file mode 100644 index 000000000..bf6d55942 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/import/tdms/mod.rs @@ -0,0 +1,4 @@ +pub mod detect_tdms_config; + +#[cfg(test)] +mod tests; diff --git a/rust/crates/sift_cli/src/cmd/import/tdms/tests.rs b/rust/crates/sift_cli/src/cmd/import/tdms/tests.rs new file mode 100644 index 000000000..2b90cff5f --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/import/tdms/tests.rs @@ -0,0 +1,337 @@ +use std::path::PathBuf; + +use crate::cli::tdms::TdmsFallbackMethod; +use crate::cli::time::TimeFormat; +use crate::cli::{CommonImportArgs, ImportTdmsArgs}; +use crate::cmd::import::tdms::detect_tdms_config::{ + build_tdms_config, find_time_channel, is_waveform_channel, tdms_to_sift_data_type, +}; +use chrono::DateTime; +use sift_rs::common::r#type::v1::ChannelDataType; +use sift_rs::data_imports::v2::{ + TdmsFallbackMethod as ProtoFallbackMethod, TimeFormat as ProtoTimeFormat, +}; +use tdms::data_type::{TDMSValue, TdmsDataType}; +use tdms::segment::{Channel, Endianness, MetadataProperty}; + +fn make_args() -> ImportTdmsArgs { + ImportTdmsArgs { + common: CommonImportArgs { + path: PathBuf::from("test.tdms"), + asset: "test-asset".into(), + run: None, + run_id: None, + wait: false, + preview: false, + }, + start_time_override: None, + fallback_method: TdmsFallbackMethod::FailOnError, + time_format: None, + relative_start_time: None, + import_file_properties: false, + } +} + +fn make_channel(data_type: TdmsDataType, property_names: &[&str]) -> Channel { + Channel { + full_path: String::new(), + group_path: String::new(), + path: String::new(), + data_type, + raw_data_index: None, + daqmx_data_index: None, + properties: property_names + .iter() + .map(|n| MetadataProperty { + name: n.to_string(), + data_type: TdmsDataType::String, + value: TDMSValue { + data_type: TdmsDataType::String, + endianness: Endianness::Little, + value: None, + }, + }) + .collect(), + chunk_positions: Vec::new(), + string_offset_pos: None, + interleaved_offset: 0, + } +} + +#[test] +fn build_tdms_config_defaults() { + let args = make_args(); + let cfg = build_tdms_config(&args).expect("defaults should build"); + assert_eq!(cfg.asset_name, "test-asset"); + assert_eq!(cfg.run_name, ""); + assert_eq!(cfg.run_id, ""); + assert_eq!(cfg.fallback_method, ProtoFallbackMethod::FailOnError as i32); + assert!(cfg.data.is_empty()); + assert!(cfg.start_time_override.is_none()); + assert!(cfg.relative_start_time.is_none()); + assert!(cfg.time_format.is_none()); + assert!(!cfg.import_file_properties); +} + +#[test] +fn build_tdms_config_run_name_passes_through() { + let mut args = make_args(); + args.common.run = Some("my-run".into()); + let cfg = build_tdms_config(&args).expect("build"); + assert_eq!(cfg.run_name, "my-run"); + assert_eq!(cfg.run_id, ""); +} + +#[test] +fn build_tdms_config_run_id_passes_through() { + let mut args = make_args(); + args.common.run_id = Some("run-abc-123".into()); + let cfg = build_tdms_config(&args).expect("build"); + assert_eq!(cfg.run_id, "run-abc-123"); + assert_eq!(cfg.run_name, ""); +} + +#[test] +fn build_tdms_config_fallback_method_ignore_error() { + let mut args = make_args(); + args.fallback_method = TdmsFallbackMethod::IgnoreError; + let cfg = build_tdms_config(&args).expect("build"); + assert_eq!(cfg.fallback_method, ProtoFallbackMethod::IgnoreError as i32); +} + +#[test] +fn build_tdms_config_time_format_absolute_rfc3339() { + let mut args = make_args(); + args.time_format = Some(TimeFormat::AbsoluteRfc3339); + let cfg = build_tdms_config(&args).expect("build"); + assert_eq!( + cfg.time_format, + Some(ProtoTimeFormat::AbsoluteRfc3339 as i32) + ); +} + +#[test] +fn build_tdms_config_relative_start_time_parses_rfc3339() { + let mut args = make_args(); + args.time_format = Some(TimeFormat::RelativeSeconds); + args.relative_start_time = Some("2026-05-06T12:00:00Z".into()); + let cfg = build_tdms_config(&args).expect("build"); + let ts = cfg.relative_start_time.expect("timestamp"); + let expected = DateTime::parse_from_rfc3339(args.relative_start_time.as_deref().unwrap()) + .unwrap() + .timestamp(); + assert_eq!(ts.seconds, expected); + assert_eq!(ts.nanos, 0); +} + +#[test] +fn build_tdms_config_start_time_override_parses_rfc3339() { + let mut args = make_args(); + args.start_time_override = Some("2026-01-01T00:00:00Z".into()); + let cfg = build_tdms_config(&args).expect("build"); + let ts = cfg.start_time_override.expect("timestamp"); + let expected = DateTime::parse_from_rfc3339(args.start_time_override.as_deref().unwrap()) + .unwrap() + .timestamp(); + assert_eq!(ts.seconds, expected); +} + +#[test] +fn build_tdms_config_import_file_properties_with_run() { + let mut args = make_args(); + args.import_file_properties = true; + args.common.run = Some("my-run".into()); + let cfg = build_tdms_config(&args).expect("build"); + assert!(cfg.import_file_properties); +} + +#[test] +fn build_tdms_config_import_file_properties_requires_run() { + let mut args = make_args(); + args.import_file_properties = true; + let err = build_tdms_config(&args).unwrap_err(); + assert!( + err.to_string().contains("import-file-properties"), + "expected validation error, got: {err:#}" + ); +} + +#[test] +fn build_tdms_config_invalid_relative_start_time_errors() { + let mut args = make_args(); + args.relative_start_time = Some("not a date".into()); + let err = build_tdms_config(&args).unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("RFC3339")), + "expected RFC3339 error, got: {err:#}" + ); +} + +#[test] +fn build_tdms_config_invalid_start_time_override_errors() { + let mut args = make_args(); + args.start_time_override = Some("garbage".into()); + let err = build_tdms_config(&args).unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("RFC3339")), + "expected RFC3339 error, got: {err:#}" + ); +} + +#[test] +fn test_tdms_to_sift_data_type() { + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::SingleFloat(4)), + Some(ChannelDataType::Float) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::SingleFloatWithUnit(4)), + Some(ChannelDataType::Float) + ); + + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::DoubleFloat(4)), + Some(ChannelDataType::Double) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::DoubleFloatWithUnit(4)), + Some(ChannelDataType::Double) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::ExtendedFloat(4)), + Some(ChannelDataType::Double) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::ExtendedFloatWithUnit(4)), + Some(ChannelDataType::Double) + ); + + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::I8(4)), + Some(ChannelDataType::Int32) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::I16(4)), + Some(ChannelDataType::Int32) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::I32(4)), + Some(ChannelDataType::Int32) + ); + + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::I64(4)), + Some(ChannelDataType::Int64) + ); + + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::U8(4)), + Some(ChannelDataType::Uint32) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::U16(4)), + Some(ChannelDataType::Uint32) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::U32(4)), + Some(ChannelDataType::Uint32) + ); + + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::U64(4)), + Some(ChannelDataType::Uint64) + ); + + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::Boolean(4)), + Some(ChannelDataType::Bool) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::String), + Some(ChannelDataType::String) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::TimeStamp(4)), + Some(ChannelDataType::Int64) + ); + + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::ComplexSingleFloat(4)), + Some(ChannelDataType::Float) + ); + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::ComplexDoubleFloat(4)), + Some(ChannelDataType::Double) + ); + + assert_eq!( + tdms_to_sift_data_type(TdmsDataType::FixedPoint(4)), + Some(ChannelDataType::Double) + ); + + assert_eq!(tdms_to_sift_data_type(TdmsDataType::Void), None); + assert_eq!(tdms_to_sift_data_type(TdmsDataType::DAQmxRawData), None); +} + +#[test] +fn test_find_time_channel() { + let time_ch = make_channel(TdmsDataType::TimeStamp(16), &[]); + let data_ch = make_channel(TdmsDataType::DoubleFloat(8), &[]); + + let channels = vec![ + ("Voltage".to_string(), &data_ch), + ("MyTimeChannel".to_string(), &time_ch), + ]; + assert_eq!( + find_time_channel(&channels), + Some("MyTimeChannel".to_string()) + ); + + let channels = vec![ + ("Voltage".to_string(), &data_ch), + ("Time".to_string(), &data_ch), + ]; + assert_eq!(find_time_channel(&channels), Some("Time".to_string())); + + let channels = vec![("TIMESTAMP".to_string(), &data_ch)]; + assert_eq!(find_time_channel(&channels), Some("TIMESTAMP".to_string())); + + let channels = vec![ + ("Voltage".to_string(), &data_ch), + ("Pressure".to_string(), &data_ch), + ]; + assert_eq!(find_time_channel(&channels), None); + + let channels: Vec<(String, &Channel)> = vec![]; + assert_eq!(find_time_channel(&channels), None); +} + +#[test] +fn test_is_waveform_channel() { + let ch = make_channel( + TdmsDataType::DoubleFloat(8), + &["wf_start_time", "wf_increment"], + ); + assert!(is_waveform_channel(&ch)); + + let ch = make_channel(TdmsDataType::DoubleFloat(8), &["wf_start_time"]); + assert!(!is_waveform_channel(&ch)); + + let ch = make_channel(TdmsDataType::DoubleFloat(8), &["wf_increment"]); + assert!(!is_waveform_channel(&ch)); + + let ch = make_channel(TdmsDataType::DoubleFloat(8), &[]); + assert!(!is_waveform_channel(&ch)); + + let ch = make_channel( + TdmsDataType::DoubleFloat(8), + &["unit_string", "description"], + ); + assert!(!is_waveform_channel(&ch)); + + let ch = make_channel( + TdmsDataType::DoubleFloat(8), + &["wf_start_time", "wf_increment", "unit_string"], + ); + assert!(is_waveform_channel(&ch)); +} diff --git a/rust/crates/sift_cli/src/cmd/import/utils.rs b/rust/crates/sift_cli/src/cmd/import/utils.rs index a1e5bff98..8fbc73830 100644 --- a/rust/crates/sift_cli/src/cmd/import/utils.rs +++ b/rust/crates/sift_cli/src/cmd/import/utils.rs @@ -5,9 +5,41 @@ use std::{ use anyhow::{Context, Result, anyhow}; use flate2::{Compression, write::GzEncoder}; +use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE}; use sift_rs::common::r#type::v1::{ChannelBitFieldElement, ChannelEnumType}; -use crate::cli::time::TimeFormat; +use crate::{cli::time::TimeFormat, cmd::Context as CmdContext, util::api::create_rest_client}; + +/// Gzip and upload a file to a pre-signed upload URL with the given content type. +/// Reads from the file's current cursor position. +pub async fn upload_gzipped_file( + ctx: &CmdContext, + upload_url: &str, + file: File, + content_type: &str, +) -> Result<()> { + let compressed_data = gzip_file(file)?; + let rest_client = create_rest_client(ctx).context("failed to create rest client")?; + + let res = rest_client + .post(upload_url) + .header(CONTENT_ENCODING, "gzip") + .header(CONTENT_TYPE, content_type) + .body(compressed_data) + .send() + .await + .context("failed to upload file")?; + + if !res.status().is_success() { + let status = res.status(); + let text = res + .text() + .await + .unwrap_or_else(|_| "".into()); + return Err(anyhow!("upload failed with http status {status}: {text}")); + } + Ok(()) +} /// Be sure that the file's cursor is rewinded to the start before hand. pub fn gzip_file(file: File) -> Result> { diff --git a/rust/crates/sift_cli/src/main.rs b/rust/crates/sift_cli/src/main.rs index 777ce30e2..05fed3607 100644 --- a/rust/crates/sift_cli/src/main.rs +++ b/rust/crates/sift_cli/src/main.rs @@ -75,6 +75,9 @@ fn run(clargs: cli::Args) -> Result { run_future(cmd::import::parquet::flat_dataset::run(ctx, args)) } }, + cli::ImportCmd::Tdms(args) => { + run_future(cmd::import::tdms::detect_tdms_config::run(ctx, args)) + } cli::ImportCmd::Backup(args) => match args.cmd { Some(cli::BackupCmd::Ls(ls_args)) => { run_future(cmd::import::backup::ls(ctx, ls_args))