uhlc/
lib.rs

1//
2// Copyright (c) 2017, 2020 ADLINK Technology Inc.
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11
12//! A Unique Hybrid Logical Clock.
13//!
//! This library is an implementation of a
//! [Hybrid Logical Clock (HLC)](https://cse.buffalo.edu/tech-reports/2014-04.pdf)
//! associated with a unique identifier.
//! Thus, it is able to generate timestamps that are unique across a distributed system,
//! without the need for a centralized time source.
19//!
20//! # Quick Start
21//!
22//! ```
23//! use uhlc::HLC;
24//!
25//! // create an HLC with a generated random ID and relying on SystemTime::now()
26//! let hlc = HLC::default();
27//!
28//! // generate timestamps
29//! let ts1 = hlc.new_timestamp();
30//! let ts2 = hlc.new_timestamp();
31//! assert!(ts2 > ts1);
32//!
33//! // update the HLC with a timestamp incoming from another HLC
34//! // (typically remote, but not in this example...)
35//! let hlc2 = HLC::default();
36//! let other_ts = hlc2.new_timestamp();
37//!
38//! if ! hlc.update_with_timestamp(&other_ts).is_ok() {
39//!     println!(r#"The incoming timestamp would make this HLC
40//!              to drift too much. You should refuse it!"#);
41//! }
42//!
43//! let ts3 = hlc.new_timestamp();
44//! assert!(ts3 > ts2);
45//! assert!(ts3 > other_ts);
46//! ```
47
48#![doc(
49    html_logo_url = "https://www.rust-lang.org/logos/rust-logo-128x128-blk.png",
50    html_favicon_url = "https://www.rust-lang.org/favicon.ico",
51    html_root_url = "https://atolab.github.io/uhlc-rs/"
52)]
53#![cfg_attr(not(feature = "std"), no_std)]
54extern crate alloc;
55
56use alloc::{format, string::String};
57use core::cmp;
58use core::time::Duration;
59
60#[cfg(feature = "std")]
61use {
62    lazy_static::lazy_static,
63    std::env::var,
64    std::sync::Mutex,
65    std::time::{SystemTime, UNIX_EPOCH},
66};
67
68#[cfg(not(feature = "std"))]
69use spin::Mutex; // No_std-friendly alternative to std::sync::Mutex
70
71mod id;
72pub use id::*;
73
74mod ntp64;
75pub use ntp64::*;
76
77mod timestamp;
78pub use timestamp::*;
79
/// The size of the counter part in [`NTP64`] (in bits).
///
/// The `CSIZE` least-significant bits of the 64-bit time value are reserved
/// for the counter; the remaining bits carry the time read from the clock.
pub const CSIZE: u8 = 4u8;
// Bit-mask selecting the counter part (the CSIZE lowest bits) within the 64-bit time.
const CMASK: u64 = (1u64 << CSIZE) - 1u64;
// Bit-mask selecting the logical clock part within the 64-bit time (complement of CMASK).
// It is applied to every reading of the physical clock before that reading is used.
const LMASK: u64 = !CMASK;
86
// HLC Delta in milliseconds: maximum accepted drift for an external timestamp.
// I.e.: if an incoming timestamp has a time > now() + delta, then the HLC is not updated.
// This is the value used when no override is configured.
const DEFAULT_DELTA_MS: u64 = 500;
// With the `std` feature, the default delta can be overridden through the
// UHLC_MAX_DELTA_MS environment variable, read once on first access:
//  * variable set and parseable as u64 -> that value is used;
//  * variable set but not parseable    -> panic, reporting the value and the parse error;
//  * variable not present              -> DEFAULT_DELTA_MS;
//  * any other read error              -> panic, reporting the error.
#[cfg(feature = "std")]
lazy_static! {
    static ref DELTA_MS: u64 = match var("UHLC_MAX_DELTA_MS") {
        Ok(s) => s.parse().unwrap_or_else(|e| panic!(
            "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}={} : {}",
            s, e
        )),
        Err(std::env::VarError::NotPresent) => DEFAULT_DELTA_MS,
        Err(e) => panic!(
            "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}: {}",
            e
        ),
    };
}
// Without `std`, DELTA_MS is a plain reference to the default so that `*DELTA_MS`
// reads the same way in both configurations.
#[cfg(not(feature = "std"))]
static DELTA_MS: &u64 = &DEFAULT_DELTA_MS; // Environment variables do not make sense in no_std environment
106
///
/// The builder of [`HLC`].
///
/// # Examples
///
/// ```
/// use std::{convert::TryFrom, time::Duration};
/// use uhlc::{HLCBuilder, ID};
///
/// let default_hlc = HLCBuilder::new().build();
/// println!("{}", default_hlc.new_timestamp());
///
/// let custom_hlc = HLCBuilder::new()
///    .with_id(ID::try_from([0x01, 0x02, 0x03]).unwrap())
///    .with_max_delta(Duration::from_secs(1))
///    .build();
/// println!("{}", custom_hlc.new_timestamp());
/// ```
pub struct HLCBuilder {
    // The HLC being configured; handed out as-is by `build()`.
    hlc: HLC,
}
127
128impl HLCBuilder {
129    ///
130    /// Constructs a new HLCBuilder for the creation of an [`HLC`], with the following default configuration:
131    ///  * a random u128 as HLC identifier.
132    ///    Can be changed calling [`Self::with_id()`].
133    ///  * [`system_time_clock()`] as physical clock (i.e. the ).
134    ///    Can be changed calling [`Self::with_clock()`].
135    ///  * 500 millisecond as maximum delta (i.e. the maximum accepted drift for an external timestamp).
136    ///    Can be changed calling [`Self::with_max_delta()`].
137    ///
138    pub fn new() -> HLCBuilder {
139        HLCBuilder::default()
140    }
141
142    ///
143    /// Configure a specific identifier for the HLC to be created.
144    ///
145    /// **NOTE: the identifier must be unique in the system.**
146    ///
147    pub fn with_id(mut self, id: ID) -> HLCBuilder {
148        self.hlc.id = id;
149        self
150    }
151
152    ///
153    /// Configure a specific physical clock for the HLC to be created.
154    ///
155    /// The `clock` parameter must be a function returning a new physical time (as an [`NTP64`] at each call.
156    /// The time returned by this clock doesn't need to be monotonic: when the HLC generates a new timestamp from this time,
157    /// it first checks if this time is greater than the previously generated timestamp. If not, the new timestamp it the previous one +1.
158    ///
159    pub fn with_clock(mut self, clock: fn() -> NTP64) -> HLCBuilder {
160        self.hlc.clock = clock;
161        self
162    }
163
164    ///
165    /// Configure the maximum delta accepted by an HLC when updating it's logical clock calling [`HLC::update_with_timestamp()`].
166    ///
167    pub fn with_max_delta(mut self, delta: Duration) -> HLCBuilder {
168        self.hlc.delta = delta.into();
169        self
170    }
171
172    pub fn build(self) -> HLC {
173        self.hlc
174    }
175}
176
177impl Default for HLCBuilder {
178    fn default() -> Self {
179        HLCBuilder {
180            hlc: HLC {
181                id: ID::rand(),
182                #[cfg(feature = "std")]
183                clock: system_time_clock,
184                #[cfg(not(feature = "std"))]
185                clock: zero_clock,
186                delta: NTP64::from(Duration::from_millis(*DELTA_MS)),
187                last_time: Default::default(),
188            },
189        }
190    }
191}
192
/// A Hybrid Logical Clock generating [`Timestamp`]s
pub struct HLC {
    // Identifier attached to every timestamp generated by this HLC.
    id: ID,
    // Function returning the current physical time.
    clock: fn() -> NTP64,
    // Maximum accepted drift of an incoming timestamp ahead of the physical time.
    delta: NTP64,
    // Last time generated by, or accepted into, this HLC; guarded by a mutex.
    last_time: Mutex<NTP64>,
}
200
201#[cfg(feature = "std")]
// Locks the `std::sync::Mutex` expression `$var` and evaluates to its guard.
//
// Blocks until the mutex is available and panics if the mutex is poisoned
// (i.e. another thread panicked while holding it).
//
// `lock()` is called directly, with no `try_lock()` attempt first: on a poisoned
// mutex `try_lock()` returns an error that *owns the acquired guard*, and that
// guard would stay alive while `lock()` is called again on the same thread,
// hanging the caller instead of reporting the poisoning.
macro_rules! lock {
    ($var:expr) => {
        $var.lock().unwrap()
    };
}
210
// no_std variant of `lock!`: in this configuration `Mutex` is `spin::Mutex`,
// and the macro simply evaluates to whatever its `lock()` returns.
#[cfg(not(feature = "std"))]
macro_rules! lock {
    ($var:expr) => {
        $var.lock()
    };
}
217
218impl HLC {
219    /// Generate a new [`Timestamp`].
220    ///
221    /// This timestamp is unique in the system and is always greater
222    /// than the latest timestamp generated by the HLC and than the
223    /// latest incoming timestamp that was used to update this [`HLC`]
224    /// (using [`HLC::update_with_timestamp()`]).
225    ///
226    /// # Examples
227    ///
228    /// ```
229    /// use uhlc::HLC;
230    ///
231    /// let hlc = HLC::default();
232    /// let ts1 =  hlc.new_timestamp();
233    /// let ts2 =  hlc.new_timestamp();
234    /// assert!(ts2 > ts1);
235    /// ```
236    pub fn new_timestamp(&self) -> Timestamp {
237        let mut now = (self.clock)();
238        now.0 &= LMASK;
239        let mut last_time = lock!(self.last_time);
240        if now.0 > (last_time.0 & LMASK) {
241            *last_time = now
242        } else {
243            *last_time += 1;
244        }
245        Timestamp::new(*last_time, self.id)
246    }
247
248    /// Returns the HLC [`ID`].
249    ///
250    /// This ID is the specific identifier for this HLC instance.
251    ///
252    pub fn get_id(&self) -> &ID {
253        &self.id
254    }
255
256    /// Returns the HLC delta as [`NTP64`].
257    ///
258    /// The maximum delta accepted by an HLC when updating it's logical clock calling [`HLC::update_with_timestamp()`].
259    ///
260    pub fn get_delta(&self) -> &NTP64 {
261        &self.delta
262    }
263
264    /// Update this [`HLC`] with a [`Timestamp`].
265    ///
266    /// Typically, this timestamp should have been generated by another HLC.
267    /// If the timestamp exceeds the current time of this HLC by more than the configured maximum delta
268    /// (see [`HLCBuilder::with_max_delta()`]) an [`Err`] is returned.
269    ///
270    /// # Examples
271    ///
272    /// ```
273    /// use uhlc::HLC;
274    ///
275    /// let hlc1 = HLC::default();
276    ///
277    /// // update the HLC with a timestamp incoming from another HLC
278    /// // (typically remote, but not in this example...)
279    /// let hlc2 = HLC::default();
280    /// let other_ts = hlc2.new_timestamp();
281    /// if ! hlc1.update_with_timestamp(&other_ts).is_ok() {
282    ///     println!(r#"The incoming timestamp would make this HLC
283    ///              to drift too much. You should refuse it!"#);
284    /// }
285    ///
286    /// let ts = hlc1.new_timestamp();
287    /// assert!(ts > other_ts);
288    /// ```
289    pub fn update_with_timestamp(&self, timestamp: &Timestamp) -> Result<(), String> {
290        let mut now = (self.clock)();
291        now.0 &= LMASK;
292        let msg_time = timestamp.get_time();
293        if *msg_time > now && *msg_time - now > self.delta {
294            let err_msg = format!(
295                "incoming timestamp from {} exceeding delta {}ms is rejected: {:#} vs. now: {:#}",
296                timestamp.get_id(),
297                self.delta.to_duration().as_millis(),
298                msg_time,
299                now
300            );
301            #[cfg(feature = "std")]
302            log::warn!("{}", err_msg);
303            #[cfg(feature = "defmt")]
304            defmt::warn!("{}", err_msg);
305            Err(err_msg)
306        } else {
307            let mut last_time = lock!(self.last_time);
308            let max_time = cmp::max(cmp::max(now, *msg_time), *last_time);
309            if max_time == now {
310                *last_time = now;
311            } else if max_time == *msg_time {
312                *last_time = *msg_time + 1;
313            } else {
314                *last_time += 1;
315            }
316            Ok(())
317        }
318    }
319}
320
321impl Default for HLC {
322    /// Create a new [`HLC`] with a random u128 ID and using
323    /// [`system_time_clock()`] as physical clock.
324    /// This is equivalent to `HLCBuilder::default().build()`
325    fn default() -> Self {
326        HLCBuilder::default().build()
327    }
328}
329
/// A physical clock backed by `std::time::SystemTime::now()`.
///
/// The returned NTP64 is the time elapsed since `std::time::UNIX_EPOCH` (1st Jan 1970).
/// This is the default clock of an [`HLC`] when [`HLCBuilder::with_clock()`] is not called.
///
/// # Panics
///
/// Panics if the system time is earlier than `UNIX_EPOCH`.
///
#[inline]
#[cfg(feature = "std")]
pub fn system_time_clock() -> NTP64 {
    let since_epoch: Duration = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    NTP64::from(since_epoch)
}
340
/// A physical clock backed by CLOCK_MONOTONIC.
///
/// Note: the time produced by this clock is relative to an unspecified starting point (likely the host boot).
/// Therefore, comparing and ordering times produced by different instances of this clock on different hosts
/// would probably be meaningless.
///
/// See: <https://linux.die.net/man/2/clock_gettime>
///
/// # Panics
///
/// Panics if reading CLOCK_MONOTONIC fails.
///
#[inline]
#[cfg(all(feature = "nix", target_family = "unix"))]
pub fn monotonic_time_clock() -> NTP64 {
    nix::time::ClockId::CLOCK_MONOTONIC
        .now()
        .map(NTP64::from)
        .unwrap()
}
354
355/// A dummy clock that returns a NTP64 initialized with the value 0.
356/// Suitable to use in no_std environments where std::time::{SystemTime, UNIX_EPOCH} are not available.
357/// If the feature `std` is disabled, that's the default clock used by an [`HLC`] if [`HLCBuilder::with_clock()`] is not called.
358/// Notice that this means that the [`HLC`] will use incremental timestamps starting from 0.
359#[inline]
360pub fn zero_clock() -> NTP64 {
361    NTP64(0)
362}
363
364#[cfg(test)]
365mod tests {
366    use crate::*;
367    use async_std::sync::Arc;
368    use async_std::task;
369    use core::convert::TryFrom;
370    use core::time::Duration;
371    use futures::join;
372
373    fn is_sorted(vec: &[Timestamp]) -> bool {
374        let mut it = vec.iter();
375        let mut ts = it.next().unwrap();
376        for next in it {
377            if next <= ts {
378                return false;
379            };
380            ts = next;
381        }
382        true
383    }
384
385    #[test]
386    fn hlc_parallel() {
387        use alloc::vec::Vec;
388        task::block_on(async {
389            let id0: ID = ID::try_from([0x01]).unwrap();
390            let id1: ID = ID::try_from([0x02]).unwrap();
391            let id2: ID = ID::try_from([0x03]).unwrap();
392            let id3: ID = ID::try_from([0x04]).unwrap();
393            let hlc0 = Arc::new(HLCBuilder::new().with_id(id0).build());
394            let hlc1 = Arc::new(HLCBuilder::new().with_id(id1).build());
395            let hlc2 = Arc::new(HLCBuilder::new().with_id(id2).build());
396            let hlc3 = Arc::new(HLCBuilder::new().with_id(id3).build());
397
398            // Make 4 tasks to generate 10000 timestamps each with distinct HLCs,
399            // and also to update each other HLCs
400            const NB_TIME: usize = 10000;
401            let t0 = {
402                let hlc0 = hlc0.clone();
403                let hlc1 = hlc1.clone();
404                task::spawn(async move {
405                    let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
406                    for _ in 0..NB_TIME {
407                        let ts = hlc0.new_timestamp();
408                        assert!(hlc1.update_with_timestamp(&ts).is_ok());
409                        times.push(ts)
410                    }
411                    times
412                })
413            };
414            let t1 = {
415                let hlc1 = hlc1.clone();
416                let hlc2 = hlc2.clone();
417                task::spawn(async move {
418                    let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
419                    for _ in 0..NB_TIME {
420                        let ts = hlc1.new_timestamp();
421                        assert!(hlc2.update_with_timestamp(&ts).is_ok());
422                        times.push(ts)
423                    }
424                    times
425                })
426            };
427            let t2 = {
428                let hlc2 = hlc3.clone();
429                let hlc3 = hlc3.clone();
430                task::spawn(async move {
431                    let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
432                    for _ in 0..NB_TIME {
433                        let ts = hlc2.new_timestamp();
434                        assert!(hlc3.update_with_timestamp(&ts).is_ok());
435                        times.push(ts)
436                    }
437                    times
438                })
439            };
440            let t3 = {
441                let hlc3 = hlc3.clone();
442                let hlc0 = hlc0.clone();
443                task::spawn(async move {
444                    let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
445                    for _ in 0..NB_TIME {
446                        let ts = hlc3.new_timestamp();
447                        assert!(hlc0.update_with_timestamp(&ts).is_ok());
448                        times.push(ts)
449                    }
450                    times
451                })
452            };
453            let vecs = join!(t0, t1, t2, t3);
454
455            // test that each timeseries is sorted (i.e. monotonic time)
456            assert!(is_sorted(&vecs.0));
457            assert!(is_sorted(&vecs.1));
458            assert!(is_sorted(&vecs.2));
459            assert!(is_sorted(&vecs.3));
460
461            // test that there is no duplicate amongst all timestamps
462            let mut all_times: Vec<Timestamp> = vecs
463                .0
464                .into_iter()
465                .chain(vecs.1)
466                .chain(vecs.2)
467                .chain(vecs.3)
468                .collect::<Vec<Timestamp>>();
469            assert_eq!(NB_TIME * 4, all_times.len());
470            all_times.sort();
471            all_times.dedup();
472            assert_eq!(NB_TIME * 4, all_times.len());
473        });
474    }
475
476    #[test]
477    fn hlc_update_with_timestamp() {
478        let id: ID = ID::rand();
479        let hlc = HLCBuilder::new().with_id(id).build();
480
481        // Test that updating with an old Timestamp don't break the HLC
482        let past_ts = Timestamp::new(Default::default(), id);
483        let now_ts = hlc.new_timestamp();
484        assert!(hlc.update_with_timestamp(&past_ts).is_ok());
485        assert!(hlc.new_timestamp() > now_ts);
486
487        // Test that updating with a Timestamp exceeding the delta is refused
488        let now_ts = hlc.new_timestamp();
489        let future_time = now_ts.get_time() + NTP64::from(Duration::from_millis(1000));
490        let future_ts = Timestamp::new(future_time, id);
491        assert!(hlc.update_with_timestamp(&future_ts).is_err())
492    }
493
494    #[cfg(all(feature = "nix", target_family = "unix"))]
495    #[test]
496    fn hlc_nix_monotonic() {
497        let hlc = HLCBuilder::new().with_clock(monotonic_time_clock).build();
498        let t1 = monotonic_time_clock();
499        // Sleep for (CMASK + 1) nanoseconds since HLC clock strips off the lower CMASK bits.
500        std::thread::sleep(Duration::from_nanos(CMASK + 1));
501        let t2 = hlc.new_timestamp();
502        std::thread::sleep(Duration::from_nanos(1));
503        let t3 = monotonic_time_clock();
504
505        assert!(t2.get_time() > &t1);
506        assert!(&t3 > t2.get_time());
507    }
508}