uhlc/
lib.rs

1//
2// Copyright (c) 2017, 2020 ADLINK Technology Inc.
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11
12//! A Unique Hybrid Logical Clock.
13//!
14//! This library is an implementation of an
15//! [Hybrid Logical Clock (HLC)](https://cse.buffalo.edu/tech-reports/2014-04.pdf)
16//! associated to a unique identifier.
17//! Thus, it is able to generate timestamps that are unique across a distributed system,
18//! without the need of a centralized time source.
19//!
20//! # Quick Start
21//!
22//! ```
23//! use uhlc::HLC;
24//!
25//! // create an HLC with a generated random ID and relying on SystemTime::now()
26//! let hlc = HLC::default();
27//!
28//! // generate timestamps
29//! let ts1 = hlc.new_timestamp();
30//! let ts2 = hlc.new_timestamp();
31//! assert!(ts2 > ts1);
32//!
33//! // update the HLC with a timestamp incoming from another HLC
34//! // (typically remote, but not in this example...)
35//! let hlc2 = HLC::default();
36//! let other_ts = hlc2.new_timestamp();
37//!
38//! if ! hlc.update_with_timestamp(&other_ts).is_ok() {
39//!     println!(r#"The incoming timestamp would make this HLC
40//!              to drift too much. You should refuse it!"#);
41//! }
42//!
43//! let ts3 = hlc.new_timestamp();
44//! assert!(ts3 > ts2);
45//! assert!(ts3 > other_ts);
46//! ```
47
48#![doc(
49    html_logo_url = "https://www.rust-lang.org/logos/rust-logo-128x128-blk.png",
50    html_favicon_url = "https://www.rust-lang.org/favicon.ico",
51    html_root_url = "https://atolab.github.io/uhlc-rs/"
52)]
53#![cfg_attr(not(feature = "std"), no_std)]
54
55use core::cmp;
56use core::time::Duration;
57
58#[cfg(feature = "std")]
59use {
60    lazy_static::lazy_static,
61    std::env::var,
62    std::sync::Mutex,
63    std::time::{SystemTime, UNIX_EPOCH},
64};
65
66#[cfg(not(feature = "std"))]
67use spin::Mutex; // No_std-friendly alternative to std::sync::Mutex
68
69mod id;
70pub use id::*;
71
72mod ntp64;
73pub use ntp64::*;
74
75mod timestamp;
76pub use timestamp::*;
77
78/// The size of counter part in [`NTP64`] (in bits)
79pub const CSIZE: u8 = 4u8;
80// Bit-mask of the counter part within the 64 bits time
81const CMASK: u64 = (1u64 << CSIZE) - 1u64;
82// Bit-mask of the logical clock part within the 64 bits time
83const LMASK: u64 = !CMASK;
84
85// HLC Delta in milliseconds: maximum accepted drift for an external timestamp.
86// I.e.: if an incoming timestamp has a time > now() + delta, then the HLC is not updated.
87const DEFAULT_DELTA_MS: u64 = 500;
88#[cfg(feature = "std")]
89lazy_static! {
90    static ref DELTA_MS: u64 = match var("UHLC_MAX_DELTA_MS") {
91        Ok(s) => s.parse().unwrap_or_else(|e| panic!(
92            "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}={} : {}",
93            s, e
94        )),
95        Err(std::env::VarError::NotPresent) => DEFAULT_DELTA_MS,
96        Err(e) => panic!(
97            "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}: {}",
98            e
99        ),
100    };
101}
102#[cfg(not(feature = "std"))]
103static DELTA_MS: &u64 = &DEFAULT_DELTA_MS; // Environment variables do not make sense in no_std environment
104
105///
106/// The builder of [`HLC`].
107///
108/// # Examples
109///
110/// ```
111/// use std::{convert::TryFrom, time::Duration};
112/// use uhlc::{HLCBuilder, ID};
113///
114/// let default_hlc = HLCBuilder::new().build();
115/// println!("{}", default_hlc.new_timestamp());
116///
117/// let custom_hlc = HLCBuilder::new()
118///    .with_id(ID::try_from([0x01, 0x02, 0x03]).unwrap())
119///    .with_max_delta(Duration::from_secs(1))
120///    .build();
121/// println!("{}", custom_hlc.new_timestamp());
122pub struct HLCBuilder {
123    hlc: HLC,
124}
125
126impl HLCBuilder {
127    ///
128    /// Constructs a new HLCBuilder for the creation of an [`HLC`], with the following default configuration:
129    ///  * a random u128 as HLC identifier.
130    ///    Can be changed calling [`Self::with_id()`].
131    ///  * [`system_time_clock()`] as physical clock (i.e. the ).
132    ///    Can be changed calling [`Self::with_clock()`].
133    ///  * 500 millisecond as maximum delta (i.e. the maximum accepted drift for an external timestamp).
134    ///    Can be changed calling [`Self::with_max_delta()`].
135    ///
136    pub fn new() -> HLCBuilder {
137        HLCBuilder::default()
138    }
139
140    ///
141    /// Configure a specific identifier for the HLC to be created.
142    ///
143    /// **NOTE: the identifier must be unique in the system.**
144    ///
145    pub fn with_id(mut self, id: ID) -> HLCBuilder {
146        self.hlc.id = id;
147        self
148    }
149
150    ///
151    /// Configure a specific physical clock for the HLC to be created.
152    ///
153    /// The `clock` parameter must be a function returning a new physical time (as an [`NTP64`] at each call.
154    /// The time returned by this clock doesn't need to be monotonic: when the HLC generates a new timestamp from this time,
155    /// it first checks if this time is greater than the previously generated timestamp. If not, the new timestamp it the previous one +1.
156    ///
157    pub fn with_clock(mut self, clock: fn() -> NTP64) -> HLCBuilder {
158        self.hlc.clock = clock;
159        self
160    }
161
162    ///
163    /// Configure the maximum delta accepted by an HLC when updating it's logical clock calling [`HLC::update_with_timestamp()`].
164    ///
165    pub fn with_max_delta(mut self, delta: Duration) -> HLCBuilder {
166        self.hlc.delta = delta.into();
167        self
168    }
169
170    pub fn build(self) -> HLC {
171        self.hlc
172    }
173}
174
175impl Default for HLCBuilder {
176    fn default() -> Self {
177        HLCBuilder {
178            hlc: HLC {
179                id: ID::rand(),
180                #[cfg(feature = "std")]
181                clock: system_time_clock,
182                #[cfg(not(feature = "std"))]
183                clock: zero_clock,
184                delta: NTP64::from(Duration::from_millis(*DELTA_MS)),
185                last_time: Default::default(),
186            },
187        }
188    }
189}
190
191/// An Hybric Logical Clock generating [`Timestamp`]s
192pub struct HLC {
193    id: ID,
194    clock: fn() -> NTP64,
195    delta: NTP64,
196    last_time: Mutex<NTP64>,
197}
198
199#[cfg(feature = "std")]
200macro_rules! lock {
201    ($var:expr) => {
202        match $var.try_lock() {
203            Ok(guard) => guard,
204            Err(_) => $var.lock().unwrap(),
205        }
206    };
207}
208
209#[cfg(not(feature = "std"))]
210macro_rules! lock {
211    ($var:expr) => {
212        $var.lock()
213    };
214}
215
216impl HLC {
217    /// Generate a new [`Timestamp`].
218    ///
219    /// This timestamp is unique in the system and is always greater
220    /// than the latest timestamp generated by the HLC and than the
221    /// latest incoming timestamp that was used to update this [`HLC`]
222    /// (using [`HLC::update_with_timestamp()`]).
223    ///
224    /// # Examples
225    ///
226    /// ```
227    /// use uhlc::HLC;
228    ///
229    /// let hlc = HLC::default();
230    /// let ts1 =  hlc.new_timestamp();
231    /// let ts2 =  hlc.new_timestamp();
232    /// assert!(ts2 > ts1);
233    /// ```
234    pub fn new_timestamp(&self) -> Timestamp {
235        let mut now = (self.clock)();
236        now.0 &= LMASK;
237        let mut last_time = lock!(self.last_time);
238        if now.0 > (last_time.0 & LMASK) {
239            *last_time = now
240        } else {
241            *last_time += 1;
242        }
243        Timestamp::new(*last_time, self.id)
244    }
245
246    /// Returns the HLC [`ID`].
247    ///
248    /// This ID is the specific identifier for this HLC instance.
249    ///
250    pub fn get_id(&self) -> &ID {
251        &self.id
252    }
253
254    /// Returns the HLC delta as [`NTP64`].
255    ///
256    /// The maximum delta accepted by an HLC when updating it's logical clock calling [`HLC::update_with_timestamp()`].
257    ///
258    pub fn get_delta(&self) -> &NTP64 {
259        &self.delta
260    }
261
262    /// Update this [`HLC`] with a [`Timestamp`].
263    ///
264    /// Typically, this timestamp should have been generated by another HLC.
265    /// If the timestamp exceeds the current time of this HLC by more than the configured maximum delta
266    /// (see [`HLCBuilder::with_max_delta()`]) an [`Err`] is returned.
267    ///
268    /// # Examples
269    ///
270    /// ```
271    /// use uhlc::HLC;
272    ///
273    /// let hlc1 = HLC::default();
274    ///
275    /// // update the HLC with a timestamp incoming from another HLC
276    /// // (typically remote, but not in this example...)
277    /// let hlc2 = HLC::default();
278    /// let other_ts = hlc2.new_timestamp();
279    /// if ! hlc1.update_with_timestamp(&other_ts).is_ok() {
280    ///     println!(r#"The incoming timestamp would make this HLC
281    ///              to drift too much. You should refuse it!"#);
282    /// }
283    ///
284    /// let ts = hlc1.new_timestamp();
285    /// assert!(ts > other_ts);
286    /// ```
287    pub fn update_with_timestamp(&self, timestamp: &Timestamp) -> Result<(), ExceedingDeltaError> {
288        let mut now = (self.clock)();
289        now.0 &= LMASK;
290        let msg_time = timestamp.get_time();
291        if *msg_time > now && *msg_time - now > self.delta {
292            let err = ExceedingDeltaError {
293                id: *timestamp.get_id(),
294                duration: self.delta.to_duration().as_millis(),
295                msg_time: *msg_time,
296                now,
297            };
298
299            #[cfg(feature = "std")]
300            log::warn!("{}", err);
301            #[cfg(feature = "defmt")]
302            defmt::warn!("{}", err);
303            Err(err)
304        } else {
305            let mut last_time = lock!(self.last_time);
306            let max_time = cmp::max(cmp::max(now, *msg_time), *last_time);
307            if max_time == now {
308                *last_time = now;
309            } else if max_time == *msg_time {
310                *last_time = *msg_time + 1;
311            } else {
312                *last_time += 1;
313            }
314            Ok(())
315        }
316    }
317}
318
319#[derive(Debug, PartialEq, Eq)]
320pub struct ExceedingDeltaError {
321    id: ID,
322    duration: u128,
323    msg_time: NTP64,
324    now: NTP64,
325}
326
327impl core::fmt::Display for ExceedingDeltaError {
328    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
329        write!(
330            f,
331            "incoming timestamp from {} exceeding delta {}ms is rejected: {:#} vs. now: {:#}",
332            self.id, self.duration, self.msg_time, self.now
333        )
334    }
335}
336
337#[cfg(feature = "std")]
338impl std::error::Error for ExceedingDeltaError {}
339
340impl Default for HLC {
341    /// Create a new [`HLC`] with a random u128 ID and using
342    /// [`system_time_clock()`] as physical clock.
343    /// This is equivalent to `HLCBuilder::default().build()`
344    fn default() -> Self {
345        HLCBuilder::default().build()
346    }
347}
348
349/// A physical clock relying on std::time::SystemTime::now().
350///
351/// It returns a NTP64 relative to std::time::UNIX_EPOCH (1st Jan 1970).
352/// That's the default clock used by an [`HLC`] if [`HLCBuilder::with_clock()`] is not called.
353///
354#[inline]
355#[cfg(feature = "std")]
356pub fn system_time_clock() -> NTP64 {
357    NTP64::from(SystemTime::now().duration_since(UNIX_EPOCH).unwrap())
358}
359
360/// A physical clock relying on CLOCK_MONOTONIC.
361///
362/// Note: The time produced by this clock is relative to an unspecified starting point (likely the host boot).
363/// Therefore, comparing and ordering times produced by different instances of this clock on different hosts would probably be meaningless.
364///
365/// See: https://linux.die.net/man/2/clock_gettime
366///
367#[inline]
368#[cfg(all(feature = "nix", target_family = "unix"))]
369pub fn monotonic_time_clock() -> NTP64 {
370    let now = nix::time::ClockId::CLOCK_MONOTONIC.now().unwrap();
371    NTP64::from(now)
372}
373
374/// A dummy clock that returns a NTP64 initialized with the value 0.
375/// Suitable to use in no_std environments where std::time::{SystemTime, UNIX_EPOCH} are not available.
376/// If the feature `std` is disabled, that's the default clock used by an [`HLC`] if [`HLCBuilder::with_clock()`] is not called.
377/// Notice that this means that the [`HLC`] will use incremental timestamps starting from 0.
378#[inline]
379pub fn zero_clock() -> NTP64 {
380    NTP64(0)
381}
382
383#[cfg(test)]
384mod tests {
385    use crate::*;
386    use core::time::Duration;
387
388    #[cfg(feature = "std")]
389    fn is_sorted(vec: &[Timestamp]) -> bool {
390        let mut it = vec.iter();
391        let mut ts = it.next().unwrap();
392        for next in it {
393            if next <= ts {
394                return false;
395            };
396            ts = next;
397        }
398        true
399    }
400
401    #[cfg(feature = "std")]
402    #[test]
403    fn hlc_parallel() {
404        use async_std::sync::Arc;
405        use async_std::task;
406        use core::convert::TryFrom;
407        use futures::join;
408        use std::vec::Vec;
409
410        task::block_on(async {
411            let id0: ID = ID::try_from([0x01]).unwrap();
412            let id1: ID = ID::try_from([0x02]).unwrap();
413            let id2: ID = ID::try_from([0x03]).unwrap();
414            let id3: ID = ID::try_from([0x04]).unwrap();
415            let hlc0 = Arc::new(HLCBuilder::new().with_id(id0).build());
416            let hlc1 = Arc::new(HLCBuilder::new().with_id(id1).build());
417            let hlc2 = Arc::new(HLCBuilder::new().with_id(id2).build());
418            let hlc3 = Arc::new(HLCBuilder::new().with_id(id3).build());
419
420            // Make 4 tasks to generate 10000 timestamps each with distinct HLCs,
421            // and also to update each other HLCs
422            const NB_TIME: usize = 10000;
423            let t0 = {
424                let hlc0 = hlc0.clone();
425                let hlc1 = hlc1.clone();
426                task::spawn(async move {
427                    let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
428                    for _ in 0..NB_TIME {
429                        let ts = hlc0.new_timestamp();
430                        assert!(hlc1.update_with_timestamp(&ts).is_ok());
431                        times.push(ts)
432                    }
433                    times
434                })
435            };
436            let t1 = {
437                let hlc1 = hlc1.clone();
438                let hlc2 = hlc2.clone();
439                task::spawn(async move {
440                    let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
441                    for _ in 0..NB_TIME {
442                        let ts = hlc1.new_timestamp();
443                        assert!(hlc2.update_with_timestamp(&ts).is_ok());
444                        times.push(ts)
445                    }
446                    times
447                })
448            };
449            let t2 = {
450                let hlc2 = hlc3.clone();
451                let hlc3 = hlc3.clone();
452                task::spawn(async move {
453                    let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
454                    for _ in 0..NB_TIME {
455                        let ts = hlc2.new_timestamp();
456                        assert!(hlc3.update_with_timestamp(&ts).is_ok());
457                        times.push(ts)
458                    }
459                    times
460                })
461            };
462            let t3 = {
463                let hlc3 = hlc3.clone();
464                let hlc0 = hlc0.clone();
465                task::spawn(async move {
466                    let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
467                    for _ in 0..NB_TIME {
468                        let ts = hlc3.new_timestamp();
469                        assert!(hlc0.update_with_timestamp(&ts).is_ok());
470                        times.push(ts)
471                    }
472                    times
473                })
474            };
475            let vecs = join!(t0, t1, t2, t3);
476
477            // test that each timeseries is sorted (i.e. monotonic time)
478            assert!(is_sorted(&vecs.0));
479            assert!(is_sorted(&vecs.1));
480            assert!(is_sorted(&vecs.2));
481            assert!(is_sorted(&vecs.3));
482
483            // test that there is no duplicate amongst all timestamps
484            let mut all_times: Vec<Timestamp> = vecs
485                .0
486                .into_iter()
487                .chain(vecs.1)
488                .chain(vecs.2)
489                .chain(vecs.3)
490                .collect::<Vec<Timestamp>>();
491            assert_eq!(NB_TIME * 4, all_times.len());
492            all_times.sort();
493            all_times.dedup();
494            assert_eq!(NB_TIME * 4, all_times.len());
495        });
496    }
497
498    #[test]
499    fn hlc_update_with_timestamp() {
500        let id: ID = ID::rand();
501        let hlc = HLCBuilder::new().with_id(id).build();
502
503        // Test that updating with an old Timestamp don't break the HLC
504        let past_ts = Timestamp::new(Default::default(), id);
505        let now_ts = hlc.new_timestamp();
506        assert!(hlc.update_with_timestamp(&past_ts).is_ok());
507        assert!(hlc.new_timestamp() > now_ts);
508
509        // Test that updating with a Timestamp exceeding the delta is refused
510        let now_ts = hlc.new_timestamp();
511        let future_time = now_ts.get_time() + NTP64::from(Duration::from_millis(1000));
512        let future_ts = Timestamp::new(future_time, id);
513        assert!(hlc.update_with_timestamp(&future_ts).is_err())
514    }
515
516    #[cfg(all(feature = "nix", target_family = "unix", feature = "std"))]
517    #[test]
518    fn hlc_nix_monotonic() {
519        let hlc = HLCBuilder::new().with_clock(monotonic_time_clock).build();
520        let t1 = monotonic_time_clock();
521        // Sleep for (CMASK + 1) nanoseconds since HLC clock strips off the lower CMASK bits.
522        std::thread::sleep(Duration::from_nanos(CMASK + 1));
523        let t2 = hlc.new_timestamp();
524        std::thread::sleep(Duration::from_nanos(1));
525        let t3 = monotonic_time_clock();
526
527        assert!(t2.get_time() > &t1);
528        assert!(&t3 > t2.get_time());
529    }
530}