uhlc/lib.rs
1//
2// Copyright (c) 2017, 2020 ADLINK Technology Inc.
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11
12//! A Unique Hybrid Logical Clock.
13//!
//! This library is an implementation of a
//! [Hybrid Logical Clock (HLC)](https://cse.buffalo.edu/tech-reports/2014-04.pdf)
//! associated with a unique identifier.
//! Thus, it is able to generate timestamps that are unique across a distributed system,
//! without the need for a centralized time source.
19//!
20//! # Quick Start
21//!
22//! ```
23//! use uhlc::HLC;
24//!
25//! // create an HLC with a generated random ID and relying on SystemTime::now()
26//! let hlc = HLC::default();
27//!
28//! // generate timestamps
29//! let ts1 = hlc.new_timestamp();
30//! let ts2 = hlc.new_timestamp();
31//! assert!(ts2 > ts1);
32//!
33//! // update the HLC with a timestamp incoming from another HLC
34//! // (typically remote, but not in this example...)
35//! let hlc2 = HLC::default();
36//! let other_ts = hlc2.new_timestamp();
37//!
38//! if ! hlc.update_with_timestamp(&other_ts).is_ok() {
39//! println!(r#"The incoming timestamp would make this HLC
40//! to drift too much. You should refuse it!"#);
41//! }
42//!
43//! let ts3 = hlc.new_timestamp();
44//! assert!(ts3 > ts2);
45//! assert!(ts3 > other_ts);
46//! ```
47
48#![doc(
49 html_logo_url = "https://www.rust-lang.org/logos/rust-logo-128x128-blk.png",
50 html_favicon_url = "https://www.rust-lang.org/favicon.ico",
51 html_root_url = "https://atolab.github.io/uhlc-rs/"
52)]
53#![cfg_attr(not(feature = "std"), no_std)]
54
55use core::cmp;
56use core::time::Duration;
57
58#[cfg(feature = "std")]
59use {
60 lazy_static::lazy_static,
61 std::env::var,
62 std::sync::Mutex,
63 std::time::{SystemTime, UNIX_EPOCH},
64};
65
66#[cfg(not(feature = "std"))]
67use spin::Mutex; // No_std-friendly alternative to std::sync::Mutex
68
69mod id;
70pub use id::*;
71
72mod ntp64;
73pub use ntp64::*;
74
75mod timestamp;
76pub use timestamp::*;
77
/// Number of low-order bits of an [`NTP64`] time that are reserved for the counter part.
pub const CSIZE: u8 = 4;
// Bit-mask selecting the counter part (the `CSIZE` lowest bits) of the 64 bits time.
const CMASK: u64 = !(u64::MAX << CSIZE);
// Bit-mask selecting everything but the counter part of the 64 bits time.
// The HLC applies it to the time read from its physical clock to clear the counter bits.
const LMASK: u64 = u64::MAX << CSIZE;

// Default HLC delta, in milliseconds: the maximum drift accepted for an external timestamp.
// An incoming timestamp whose time is ahead of now() by more than the delta is rejected
// and the HLC is left untouched.
const DEFAULT_DELTA_MS: u64 = 500;
88#[cfg(feature = "std")]
89lazy_static! {
90 static ref DELTA_MS: u64 = match var("UHLC_MAX_DELTA_MS") {
91 Ok(s) => s.parse().unwrap_or_else(|e| panic!(
92 "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}={} : {}",
93 s, e
94 )),
95 Err(std::env::VarError::NotPresent) => DEFAULT_DELTA_MS,
96 Err(e) => panic!(
97 "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}: {}",
98 e
99 ),
100 };
101}
102#[cfg(not(feature = "std"))]
103static DELTA_MS: &u64 = &DEFAULT_DELTA_MS; // Environment variables do not make sense in no_std environment
104
105///
106/// The builder of [`HLC`].
107///
108/// # Examples
109///
110/// ```
111/// use std::{convert::TryFrom, time::Duration};
112/// use uhlc::{HLCBuilder, ID};
113///
114/// let default_hlc = HLCBuilder::new().build();
115/// println!("{}", default_hlc.new_timestamp());
116///
117/// let custom_hlc = HLCBuilder::new()
118/// .with_id(ID::try_from([0x01, 0x02, 0x03]).unwrap())
119/// .with_max_delta(Duration::from_secs(1))
120/// .build();
121/// println!("{}", custom_hlc.new_timestamp());
122pub struct HLCBuilder {
123 hlc: HLC,
124}
125
126impl HLCBuilder {
127 ///
128 /// Constructs a new HLCBuilder for the creation of an [`HLC`], with the following default configuration:
129 /// * a random u128 as HLC identifier.
130 /// Can be changed calling [`Self::with_id()`].
131 /// * [`system_time_clock()`] as physical clock (i.e. the ).
132 /// Can be changed calling [`Self::with_clock()`].
133 /// * 500 millisecond as maximum delta (i.e. the maximum accepted drift for an external timestamp).
134 /// Can be changed calling [`Self::with_max_delta()`].
135 ///
136 pub fn new() -> HLCBuilder {
137 HLCBuilder::default()
138 }
139
140 ///
141 /// Configure a specific identifier for the HLC to be created.
142 ///
143 /// **NOTE: the identifier must be unique in the system.**
144 ///
145 pub fn with_id(mut self, id: ID) -> HLCBuilder {
146 self.hlc.id = id;
147 self
148 }
149
150 ///
151 /// Configure a specific physical clock for the HLC to be created.
152 ///
153 /// The `clock` parameter must be a function returning a new physical time (as an [`NTP64`] at each call.
154 /// The time returned by this clock doesn't need to be monotonic: when the HLC generates a new timestamp from this time,
155 /// it first checks if this time is greater than the previously generated timestamp. If not, the new timestamp it the previous one +1.
156 ///
157 pub fn with_clock(mut self, clock: fn() -> NTP64) -> HLCBuilder {
158 self.hlc.clock = clock;
159 self
160 }
161
162 ///
163 /// Configure the maximum delta accepted by an HLC when updating it's logical clock calling [`HLC::update_with_timestamp()`].
164 ///
165 pub fn with_max_delta(mut self, delta: Duration) -> HLCBuilder {
166 self.hlc.delta = delta.into();
167 self
168 }
169
170 pub fn build(self) -> HLC {
171 self.hlc
172 }
173}
174
175impl Default for HLCBuilder {
176 fn default() -> Self {
177 HLCBuilder {
178 hlc: HLC {
179 id: ID::rand(),
180 #[cfg(feature = "std")]
181 clock: system_time_clock,
182 #[cfg(not(feature = "std"))]
183 clock: zero_clock,
184 delta: NTP64::from(Duration::from_millis(*DELTA_MS)),
185 last_time: Default::default(),
186 },
187 }
188 }
189}
190
191/// An Hybric Logical Clock generating [`Timestamp`]s
192pub struct HLC {
193 id: ID,
194 clock: fn() -> NTP64,
195 delta: NTP64,
196 last_time: Mutex<NTP64>,
197}
198
199#[cfg(feature = "std")]
// Acquires the `std::sync::Mutex` given as `$var` and evaluates to its guard.
//
// A non-blocking `try_lock()` is attempted first; only when the mutex is currently held
// elsewhere (`WouldBlock`) does the macro fall back to a blocking `lock()`.
//
// A poisoned mutex makes the macro panic. The `Poisoned` case is matched explicitly because
// the error returned by `try_lock()` owns the guard: calling `lock()` again while that error
// is still alive would try to re-acquire a mutex already held by the current thread.
macro_rules! lock {
    ($var:expr) => {
        match $var.try_lock() {
            Ok(guard) => guard,
            Err(::std::sync::TryLockError::WouldBlock) => $var.lock().unwrap(),
            Err(::std::sync::TryLockError::Poisoned(poisoned)) => {
                panic!("HLC mutex is poisoned: {}", poisoned)
            }
        }
    };
}
208
// no_std flavour of `lock!`: simply calls `lock()` on the given mutex and evaluates to
// whatever that call returns (the callers in this file use the result directly as a guard).
#[cfg(not(feature = "std"))]
macro_rules! lock {
    ($mutex:expr) => {
        $mutex.lock()
    };
}
215
216impl HLC {
217 /// Generate a new [`Timestamp`].
218 ///
219 /// This timestamp is unique in the system and is always greater
220 /// than the latest timestamp generated by the HLC and than the
221 /// latest incoming timestamp that was used to update this [`HLC`]
222 /// (using [`HLC::update_with_timestamp()`]).
223 ///
224 /// # Examples
225 ///
226 /// ```
227 /// use uhlc::HLC;
228 ///
229 /// let hlc = HLC::default();
230 /// let ts1 = hlc.new_timestamp();
231 /// let ts2 = hlc.new_timestamp();
232 /// assert!(ts2 > ts1);
233 /// ```
234 pub fn new_timestamp(&self) -> Timestamp {
235 let mut now = (self.clock)();
236 now.0 &= LMASK;
237 let mut last_time = lock!(self.last_time);
238 if now.0 > (last_time.0 & LMASK) {
239 *last_time = now
240 } else {
241 *last_time += 1;
242 }
243 Timestamp::new(*last_time, self.id)
244 }
245
246 /// Returns the HLC [`ID`].
247 ///
248 /// This ID is the specific identifier for this HLC instance.
249 ///
250 pub fn get_id(&self) -> &ID {
251 &self.id
252 }
253
254 /// Returns the HLC delta as [`NTP64`].
255 ///
256 /// The maximum delta accepted by an HLC when updating it's logical clock calling [`HLC::update_with_timestamp()`].
257 ///
258 pub fn get_delta(&self) -> &NTP64 {
259 &self.delta
260 }
261
262 /// Update this [`HLC`] with a [`Timestamp`].
263 ///
264 /// Typically, this timestamp should have been generated by another HLC.
265 /// If the timestamp exceeds the current time of this HLC by more than the configured maximum delta
266 /// (see [`HLCBuilder::with_max_delta()`]) an [`Err`] is returned.
267 ///
268 /// # Examples
269 ///
270 /// ```
271 /// use uhlc::HLC;
272 ///
273 /// let hlc1 = HLC::default();
274 ///
275 /// // update the HLC with a timestamp incoming from another HLC
276 /// // (typically remote, but not in this example...)
277 /// let hlc2 = HLC::default();
278 /// let other_ts = hlc2.new_timestamp();
279 /// if ! hlc1.update_with_timestamp(&other_ts).is_ok() {
280 /// println!(r#"The incoming timestamp would make this HLC
281 /// to drift too much. You should refuse it!"#);
282 /// }
283 ///
284 /// let ts = hlc1.new_timestamp();
285 /// assert!(ts > other_ts);
286 /// ```
287 pub fn update_with_timestamp(&self, timestamp: &Timestamp) -> Result<(), ExceedingDeltaError> {
288 let mut now = (self.clock)();
289 now.0 &= LMASK;
290 let msg_time = timestamp.get_time();
291 if *msg_time > now && *msg_time - now > self.delta {
292 let err = ExceedingDeltaError {
293 id: *timestamp.get_id(),
294 duration: self.delta.to_duration().as_millis(),
295 msg_time: *msg_time,
296 now,
297 };
298
299 #[cfg(feature = "std")]
300 log::warn!("{}", err);
301 #[cfg(feature = "defmt")]
302 defmt::warn!("{}", err);
303 Err(err)
304 } else {
305 let mut last_time = lock!(self.last_time);
306 let max_time = cmp::max(cmp::max(now, *msg_time), *last_time);
307 if max_time == now {
308 *last_time = now;
309 } else if max_time == *msg_time {
310 *last_time = *msg_time + 1;
311 } else {
312 *last_time += 1;
313 }
314 Ok(())
315 }
316 }
317}
318
319#[derive(Debug, PartialEq, Eq)]
320pub struct ExceedingDeltaError {
321 id: ID,
322 duration: u128,
323 msg_time: NTP64,
324 now: NTP64,
325}
326
327impl core::fmt::Display for ExceedingDeltaError {
328 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
329 write!(
330 f,
331 "incoming timestamp from {} exceeding delta {}ms is rejected: {:#} vs. now: {:#}",
332 self.id, self.duration, self.msg_time, self.now
333 )
334 }
335}
336
337#[cfg(feature = "std")]
338impl std::error::Error for ExceedingDeltaError {}
339
340impl Default for HLC {
341 /// Create a new [`HLC`] with a random u128 ID and using
342 /// [`system_time_clock()`] as physical clock.
343 /// This is equivalent to `HLCBuilder::default().build()`
344 fn default() -> Self {
345 HLCBuilder::default().build()
346 }
347}
348
/// A physical clock relying on std::time::SystemTime::now().
///
/// It returns the time elapsed since std::time::UNIX_EPOCH (1st Jan 1970) as a NTP64.
/// With the `std` feature enabled, that's the default clock used by an [`HLC`]
/// if [`HLCBuilder::with_clock()`] is not called.
///
/// # Panics
///
/// Panics if the system time is earlier than std::time::UNIX_EPOCH.
///
#[inline]
#[cfg(feature = "std")]
pub fn system_time_clock() -> NTP64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    NTP64::from(since_epoch)
}
359
/// A physical clock relying on CLOCK_MONOTONIC.
///
/// Note: The time produced by this clock is relative to an unspecified starting point (likely the host boot).
/// Therefore, comparing and ordering times produced by different instances of this clock
/// on different hosts would probably be meaningless.
///
/// See: <https://linux.die.net/man/2/clock_gettime>
///
/// # Panics
///
/// Panics if reading CLOCK_MONOTONIC fails.
///
#[inline]
#[cfg(all(feature = "nix", target_family = "unix"))]
pub fn monotonic_time_clock() -> NTP64 {
    NTP64::from(nix::time::ClockId::CLOCK_MONOTONIC.now().unwrap())
}
373
374/// A dummy clock that returns a NTP64 initialized with the value 0.
375/// Suitable to use in no_std environments where std::time::{SystemTime, UNIX_EPOCH} are not available.
376/// If the feature `std` is disabled, that's the default clock used by an [`HLC`] if [`HLCBuilder::with_clock()`] is not called.
377/// Notice that this means that the [`HLC`] will use incremental timestamps starting from 0.
378#[inline]
379pub fn zero_clock() -> NTP64 {
380 NTP64(0)
381}
382
#[cfg(test)]
mod tests {
    use crate::*;
    use core::time::Duration;

    /// Returns `true` when every timestamp is strictly greater than its predecessor.
    /// Empty and single-element slices are considered sorted.
    #[cfg(feature = "std")]
    fn is_sorted(vec: &[Timestamp]) -> bool {
        vec.windows(2).all(|pair| pair[0] < pair[1])
    }

    /// Four tasks generate timestamps concurrently, each from its own HLC, while feeding
    /// them to the next HLC in a ring (0 -> 1 -> 2 -> 3 -> 0). Each series must be strictly
    /// increasing and no timestamp may appear twice overall.
    #[cfg(feature = "std")]
    #[test]
    fn hlc_parallel() {
        use async_std::sync::Arc;
        use async_std::task;
        use core::convert::TryFrom;
        use futures::join;
        use std::vec::Vec;

        task::block_on(async {
            let id0: ID = ID::try_from([0x01]).unwrap();
            let id1: ID = ID::try_from([0x02]).unwrap();
            let id2: ID = ID::try_from([0x03]).unwrap();
            let id3: ID = ID::try_from([0x04]).unwrap();
            let hlc0 = Arc::new(HLCBuilder::new().with_id(id0).build());
            let hlc1 = Arc::new(HLCBuilder::new().with_id(id1).build());
            let hlc2 = Arc::new(HLCBuilder::new().with_id(id2).build());
            let hlc3 = Arc::new(HLCBuilder::new().with_id(id3).build());

            // Make 4 tasks to generate NB_TIME timestamps each with distinct HLCs,
            // and also to update each other HLCs
            const NB_TIME: usize = 10000;
            let t0 = {
                let hlc0 = hlc0.clone();
                let hlc1 = hlc1.clone();
                task::spawn(async move {
                    let mut times: Vec<Timestamp> = Vec::with_capacity(NB_TIME);
                    for _ in 0..NB_TIME {
                        let ts = hlc0.new_timestamp();
                        assert!(hlc1.update_with_timestamp(&ts).is_ok());
                        times.push(ts)
                    }
                    times
                })
            };
            let t1 = {
                let hlc1 = hlc1.clone();
                let hlc2 = hlc2.clone();
                task::spawn(async move {
                    let mut times: Vec<Timestamp> = Vec::with_capacity(NB_TIME);
                    for _ in 0..NB_TIME {
                        let ts = hlc1.new_timestamp();
                        assert!(hlc2.update_with_timestamp(&ts).is_ok());
                        times.push(ts)
                    }
                    times
                })
            };
            let t2 = {
                // This task must generate from hlc2 (not hlc3), otherwise hlc2 never
                // produces a timestamp and hlc3 is only updated with its own timestamps.
                let hlc2 = hlc2.clone();
                let hlc3 = hlc3.clone();
                task::spawn(async move {
                    let mut times: Vec<Timestamp> = Vec::with_capacity(NB_TIME);
                    for _ in 0..NB_TIME {
                        let ts = hlc2.new_timestamp();
                        assert!(hlc3.update_with_timestamp(&ts).is_ok());
                        times.push(ts)
                    }
                    times
                })
            };
            let t3 = {
                let hlc3 = hlc3.clone();
                let hlc0 = hlc0.clone();
                task::spawn(async move {
                    let mut times: Vec<Timestamp> = Vec::with_capacity(NB_TIME);
                    for _ in 0..NB_TIME {
                        let ts = hlc3.new_timestamp();
                        assert!(hlc0.update_with_timestamp(&ts).is_ok());
                        times.push(ts)
                    }
                    times
                })
            };
            let vecs = join!(t0, t1, t2, t3);

            // test that each timeseries is sorted (i.e. monotonic time)
            assert!(is_sorted(&vecs.0));
            assert!(is_sorted(&vecs.1));
            assert!(is_sorted(&vecs.2));
            assert!(is_sorted(&vecs.3));

            // test that there is no duplicate amongst all timestamps
            let mut all_times: Vec<Timestamp> = vecs
                .0
                .into_iter()
                .chain(vecs.1)
                .chain(vecs.2)
                .chain(vecs.3)
                .collect::<Vec<Timestamp>>();
            assert_eq!(NB_TIME * 4, all_times.len());
            all_times.sort();
            all_times.dedup();
            assert_eq!(NB_TIME * 4, all_times.len());
        });
    }

    /// Checks that an old timestamp is accepted without breaking monotonicity,
    /// and that a timestamp too far in the future is refused.
    #[test]
    fn hlc_update_with_timestamp() {
        let id: ID = ID::rand();
        let hlc = HLCBuilder::new().with_id(id).build();

        // Test that updating with an old Timestamp doesn't break the HLC
        let past_ts = Timestamp::new(Default::default(), id);
        let now_ts = hlc.new_timestamp();
        assert!(hlc.update_with_timestamp(&past_ts).is_ok());
        assert!(hlc.new_timestamp() > now_ts);

        // Test that updating with a Timestamp exceeding the delta is refused
        let now_ts = hlc.new_timestamp();
        let future_time = now_ts.get_time() + NTP64::from(Duration::from_millis(1000));
        let future_ts = Timestamp::new(future_time, id);
        assert!(hlc.update_with_timestamp(&future_ts).is_err())
    }

    /// Checks that an HLC built on `monotonic_time_clock` produces a timestamp that lies
    /// between two direct readings of that clock.
    #[cfg(all(feature = "nix", target_family = "unix", feature = "std"))]
    #[test]
    fn hlc_nix_monotonic() {
        let hlc = HLCBuilder::new().with_clock(monotonic_time_clock).build();
        let t1 = monotonic_time_clock();
        // Sleep for (CMASK + 1) nanoseconds since HLC clock strips off the lower CMASK bits.
        std::thread::sleep(Duration::from_nanos(CMASK + 1));
        let t2 = hlc.new_timestamp();
        std::thread::sleep(Duration::from_nanos(1));
        let t3 = monotonic_time_clock();

        assert!(t2.get_time() > &t1);
        assert!(&t3 > t2.get_time());
    }
}