From 8eb9da4ee4ce53cd9157ecb523749fb699e446c3 Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Tue, 14 Jun 2016 22:26:01 +0000 Subject: [PATCH 1/2] Replace time with std::time --- Cargo.toml | 1 - src/delay_queue.rs | 31 +++++++++++++------------------ src/lib.rs | 1 - src/linked_queue.rs | 26 +++++++++++++------------- src/thread_pool.rs | 4 ++-- test/test.rs | 13 ++++++------- test/test_delay_queue.rs | 24 ++++++++++++------------ test/test_linked_queue.rs | 14 +++++++------- test/test_scheduled_pool.rs | 24 ++++++++++++------------ 9 files changed, 65 insertions(+), 73 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 86918fa..b16ad97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ exclude = [ [dependencies] log = "0.3.2" -time = "0.1.32" [dev-dependencies] env_logger = "0.3.1" diff --git a/src/delay_queue.rs b/src/delay_queue.rs index 17d4166..fd4e6ef 100644 --- a/src/delay_queue.rs +++ b/src/delay_queue.rs @@ -3,7 +3,7 @@ use std::collections::BinaryHeap; use std::cmp::{self, PartialOrd, Ord, PartialEq, Eq, Ordering}; use std::ops; use std::sync::{Arc, Mutex, MutexGuard, Condvar}; -use time::{Duration, SteadyTime}; +use std::time::{Duration, Instant}; /// A value that should not be used until the delay has expired. pub trait Delayed { @@ -15,7 +15,7 @@ impl Delayed for Option { fn delay(&self) -> Duration { match *self { Some(ref v) => v.delay(), - None => Duration::nanoseconds(0), + None => Duration::new(0, 0), } } } @@ -77,12 +77,12 @@ impl DelayQueue { /// Takes from the queue, blocking for up to `timeout`. pub fn poll_timeout(&self, timeout: Duration) -> Option { - let end = SteadyTime::now() + timeout; + let end = Instant::now() + timeout; let mut queue = self.inner.queue.lock().unwrap(); loop { - let now = SteadyTime::now(); + let now = Instant::now(); if now >= end { return None; } @@ -93,10 +93,9 @@ impl DelayQueue { None => end, }; - // TODO: Check the cast - let timeout = (wait_until - now).num_milliseconds() as u32; + let timeout = wait_until - now; - queue = self.inner.condvar.wait_timeout_ms(queue, timeout).unwrap().0; + queue = self.inner.condvar.wait_timeout(queue, timeout).unwrap().0; } Some(self.finish_pop(queue)) @@ -116,7 +115,7 @@ impl Queue for DelayQueue { let queue = self.inner.queue.lock().unwrap(); match queue.peek() { - Some(e) if e.time > SteadyTime::now() => return None, + Some(e) if e.time > Instant::now() => return None, Some(_) => {} None => return None, } @@ -132,12 +131,10 @@ impl Queue for DelayQueue { fn offer(&self, e: T) -> Result<(), T> { let delay = e.delay(); - assert!(delay >= Duration::milliseconds(0), "delay must be greater or equal to 0"); - let entry = Entry::new(e, delay); let mut queue = self.inner.queue.lock().unwrap(); - trace!("offering value to delay queue; delay={}", delay.num_milliseconds()); + trace!("offering value to delay queue; delay={:?}", delay); match queue.peek() { Some(e) => { @@ -165,7 +162,7 @@ impl SyncQueue for DelayQueue { let mut queue = self.inner.queue.lock().unwrap(); loop { - let now = SteadyTime::now(); + let now = Instant::now(); trace!("peeking into queue"); let need = match queue.peek() { @@ -187,11 +184,9 @@ impl SyncQueue for DelayQueue { self.inner.condvar.wait(queue).unwrap() } Need::WaitTimeout(t) => { - // TODO: Check the cast - let timeout = t.num_milliseconds() as u32; - trace!("condvar wait; timeout={:?}; t={:?}", timeout, t.num_milliseconds()); + trace!("condvar wait; t={:?}", t); - self.inner.condvar.wait_timeout_ms(queue, timeout).unwrap().0 + self.inner.condvar.wait_timeout(queue, t).unwrap().0 } }; } @@ -212,14 +207,14 @@ impl Clone for DelayQueue { struct Entry { val: T, - time: SteadyTime, + time: Instant, } impl Entry { fn new(val: T, delay: Duration) -> Entry { Entry { val: val, - time: SteadyTime::now() + delay, + time: Instant::now() + delay, } } } diff --git a/src/lib.rs b/src/lib.rs index 6e01895..28e0e34 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,6 @@ #[macro_use] extern crate log; -extern crate time; pub use self::array_queue::ArrayQueue; pub use self::delay_queue::{DelayQueue, Delayed, Delay}; diff --git a/src/linked_queue.rs b/src/linked_queue.rs index 44a221b..b9dd8bd 100644 --- a/src/linked_queue.rs +++ b/src/linked_queue.rs @@ -1,5 +1,5 @@ use super::{Queue, SyncQueue}; -use time; +use std::time::Instant; use std::{mem, ptr, ops, usize, u32}; use std::sync::{Arc, Mutex, MutexGuard, Condvar}; use std::sync::atomic::{self, AtomicUsize, Ordering}; @@ -199,7 +199,7 @@ impl QueueInner { .ok().expect("something went wrong"); if self.len() == self.capacity { - let mut now = time::precise_time_ns(); + let mut now = Instant::now(); loop { if dur == 0 { @@ -213,14 +213,14 @@ impl QueueInner { break; } - let n = time::precise_time_ns(); - let d = (n - now) / 1_000_000; + let elapsed = now.elapsed(); + let d = elapsed.as_secs() as u32 * 1000 + elapsed.subsec_nanos() / 1000_000; - if d >= dur as u64 { + if d >= dur { dur = 0; } else { - dur -= d as u32; - now = n; + dur -= d; + now = Instant::now(); } } } @@ -262,7 +262,7 @@ impl QueueInner { .ok().expect("something went wrong"); if self.len() == 0 { - let mut now = time::precise_time_ns(); + let mut now = Instant::now(); loop { if dur == 0 { @@ -276,14 +276,14 @@ impl QueueInner { break; } - let n = time::precise_time_ns(); - let d = (n - now) / 1_000_000; + let elapsed = now.elapsed(); + let d = elapsed.as_secs() as u32 * 1000 + elapsed.subsec_nanos() / 1000_000; - if d >= dur as u64 { + if d >= dur { dur = 0; } else { - dur -= d as u32; - now = n; + dur -= d; + now = Instant::now(); } } } diff --git a/src/thread_pool.rs b/src/thread_pool.rs index 931c0f5..898a068 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{thread, usize}; -use time::Duration; +use std::time::Duration; /// A queue that can be used to back a thread pool pub trait WorkQueue : SyncQueue> + Clone + Send + 'static { @@ -98,7 +98,7 @@ impl ScheduledThreadPool { } pub fn schedule_ms(&self, delay: u32, task: T) { - let delay = Duration::milliseconds(delay as i64); + let delay = Duration::from_millis(delay as u64); let task = Scheduled::Delayed(Box::new(task), delay); self.thread_pool.inner.run(task, false); diff --git a/test/test.rs b/test/test.rs index cdf0044..8bd1b8a 100644 --- a/test/test.rs +++ b/test/test.rs @@ -2,7 +2,6 @@ extern crate syncbox; #[macro_use] extern crate log; -extern crate time; extern crate env_logger; mod test_delay_queue; @@ -15,20 +14,20 @@ fn spawn(f: F) { thread::spawn(f); } -fn sleep_ms(ms: usize) { +fn sleep_ms(ms: u64) { use std::thread; - use time::precise_time_ns; + use std::time::{Duration, Instant}; - let start = precise_time_ns(); - let target = start + (ms as u64) * 1_000_000; + let start = Instant::now(); + let target = start + Duration::from_millis(ms); loop { - let now = precise_time_ns(); + let now = Instant::now(); if now > target { return; } - thread::park_timeout_ms(((target - now) / 1_000_000) as u32); + thread::park_timeout(target - now); } } diff --git a/test/test_delay_queue.rs b/test/test_delay_queue.rs index 860b81c..d836900 100644 --- a/test/test_delay_queue.rs +++ b/test/test_delay_queue.rs @@ -1,13 +1,13 @@ use syncbox::*; -use time::Duration; +use std::time::Duration; #[test] fn test_ordering() { let queue = DelayQueue::new(); - queue.offer(Delay(1i32, Duration::milliseconds(30))).unwrap(); - queue.offer(Delay(2i32, Duration::milliseconds(10))).unwrap(); - queue.offer(Delay(3i32, Duration::milliseconds(20))).unwrap(); + queue.offer(Delay(1i32, Duration::from_millis(30))).unwrap(); + queue.offer(Delay(2i32, Duration::from_millis(10))).unwrap(); + queue.offer(Delay(3i32, Duration::from_millis(20))).unwrap(); assert_eq!(2, *queue.take()); assert_eq!(3, *queue.take()); @@ -18,8 +18,8 @@ fn test_ordering() { fn test_poll() { let queue = DelayQueue::new(); - queue.offer(Delay(1i32, Duration::nanoseconds(0))).unwrap(); - queue.offer(Delay(2i32, Duration::days(1))).unwrap(); + queue.offer(Delay(1i32, Duration::new(0, 0))).unwrap(); + queue.offer(Delay(2i32, Duration::from_secs(86400))).unwrap(); assert_eq!(1, *queue.poll().unwrap()); assert_eq!(None, queue.poll()); @@ -29,11 +29,11 @@ fn test_poll() { fn test_poll_timeout() { let queue = DelayQueue::new(); - queue.offer(Delay(1i32, Duration::nanoseconds(0))).unwrap(); - queue.offer(Delay(2i32, Duration::milliseconds(250))).unwrap(); - queue.offer(Delay(3i32, Duration::days(1))).unwrap(); + queue.offer(Delay(1i32, Duration::new(0, 0))).unwrap(); + queue.offer(Delay(2i32, Duration::from_millis(250))).unwrap(); + queue.offer(Delay(3i32, Duration::from_secs(86400))).unwrap(); - assert_eq!(1, *queue.poll_timeout(Duration::milliseconds(250)).unwrap()); - assert_eq!(2, *queue.poll_timeout(Duration::milliseconds(500)).unwrap()); - assert_eq!(None, queue.poll_timeout(Duration::milliseconds(500))); + assert_eq!(1, *queue.poll_timeout(Duration::from_millis(250)).unwrap()); + assert_eq!(2, *queue.poll_timeout(Duration::from_millis(500)).unwrap()); + assert_eq!(None, queue.poll_timeout(Duration::from_millis(500))); } diff --git a/test/test_linked_queue.rs b/test/test_linked_queue.rs index ef894c6..5cd91c9 100644 --- a/test/test_linked_queue.rs +++ b/test/test_linked_queue.rs @@ -1,6 +1,6 @@ use {spawn, sleep_ms}; use syncbox::LinkedQueue; -use time; +use std::time::{Duration, Instant}; use std::thread; #[test] @@ -32,21 +32,21 @@ pub fn test_single_threaded_offer_timeout() { q.offer(1).unwrap(); - let now = time::precise_time_ns(); + let now = Instant::now(); let result = q.offer_ms(2, 200); - let delta = time::precise_time_ns() - now; + let delta = now.elapsed(); assert!(result.is_err()); - assert!(delta >= 200_000_000, "actual={}", delta); + assert!(delta >= Duration::from_millis(200), "actual={:?}", delta); } #[test] pub fn test_single_threaded_poll_timeout() { let q = LinkedQueue::::new(); - let now = time::precise_time_ns(); + let now = Instant::now(); q.poll_ms(200); - let delta = time::precise_time_ns() - now; - assert!(delta >= 200_000_000, "actual={}", delta); + let delta = now.elapsed(); + assert!(delta >= Duration::from_millis(200), "actual={:?}", delta); } #[test] diff --git a/test/test_scheduled_pool.rs b/test/test_scheduled_pool.rs index 8af1a05..f59e689 100644 --- a/test/test_scheduled_pool.rs +++ b/test/test_scheduled_pool.rs @@ -1,17 +1,17 @@ use syncbox::{ScheduledThreadPool}; use std::sync::mpsc::*; use std::thread; -use time::{SteadyTime, Duration}; +use std::time::{Instant, Duration}; #[test] pub fn test_one_thread_one_task() { let tp = ScheduledThreadPool::single_thread(); let (tx, rx) = channel(); - let start = SteadyTime::now(); + let start = Instant::now(); tp.schedule_ms(500, move || { - tx.send(SteadyTime::now() - start > ms(500)).unwrap(); + tx.send(start.elapsed() > ms(500)).unwrap(); }); assert!(rx.recv().unwrap()); @@ -22,19 +22,19 @@ pub fn test_one_thread_two_tasks() { let tp = ScheduledThreadPool::single_thread(); let (tx, rx) = channel(); - let start = SteadyTime::now(); + let start = Instant::now(); { let tx = tx.clone(); tp.schedule_ms(500, move || { - tx.send(("one", SteadyTime::now() - start > ms(500))).unwrap(); + tx.send(("one", start.elapsed() > ms(500))).unwrap(); }); } { let tx = tx.clone(); tp.schedule_ms(200, move || { - tx.send(("two", SteadyTime::now() - start > ms(200))).unwrap(); + tx.send(("two", start.elapsed() > ms(200))).unwrap(); }); } @@ -47,12 +47,12 @@ pub fn test_two_threads() { let tp = ScheduledThreadPool::fixed_size(2); let (tx, rx) = channel(); - let start = SteadyTime::now(); + let start = Instant::now(); { let tx = tx.clone(); tp.schedule_ms(500, move || { - assert!(SteadyTime::now() - start > ms(500)); + assert!(start.elapsed() > ms(500)); tx.send("win").unwrap(); }); } @@ -60,9 +60,9 @@ pub fn test_two_threads() { { let tx = tx.clone(); tp.schedule_ms(100, move || { - assert!(SteadyTime::now() - start > ms(100)); + assert!(start.elapsed() > ms(100)); tx.send("start").unwrap(); - thread::sleep_ms(2000); + thread::sleep(Duration::from_secs(2)); tx.send("end").unwrap(); }); } @@ -73,6 +73,6 @@ pub fn test_two_threads() { assert_eq!(vals, &["start", "win", "end"]); } -fn ms(ms: u32) -> Duration { - Duration::milliseconds(ms as i64) +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) } From 47a6e1aa4bcbbb085ff2bfc2fb13864f4bf5fd71 Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Tue, 14 Jun 2016 22:54:37 +0000 Subject: [PATCH 2/2] Fix nightly warning, raise minimum rust version to 1.8.0 --- .travis.yml | 2 +- src/thread_pool.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index cb1eb60..085ff60 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ language: rust rust: - nightly - stable - - 1.3.0 # Oldest supported version of Rust + - 1.8.0 # Oldest supported version of Rust script: - cargo test diff --git a/src/thread_pool.rs b/src/thread_pool.rs index 898a068..12e4c3e 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -1,4 +1,4 @@ -use {LinkedQueue, Queue, SyncQueue, Run, Task, Delayed, DelayQueue}; +use {LinkedQueue, SyncQueue, Run, Task, Delayed, DelayQueue}; use std::marker::PhantomData; use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicUsize, Ordering};