uhlc/lib.rs
1//
2// Copyright (c) 2017, 2020 ADLINK Technology Inc.
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11
12//! A Unique Hybrid Logical Clock.
13//!
14//! This library is an implementation of an
15//! [Hybrid Logical Clock (HLC)](https://cse.buffalo.edu/tech-reports/2014-04.pdf)
16//! associated to a unique identifier.
17//! Thus, it is able to generate timestamps that are unique across a distributed system,
18//! without the need of a centralized time source.
19//!
20//! # Quick Start
21//!
22//! ```
23//! use uhlc::HLC;
24//!
25//! // create an HLC with a generated random ID and relying on SystemTime::now()
26//! let hlc = HLC::default();
27//!
28//! // generate timestamps
29//! let ts1 = hlc.new_timestamp();
30//! let ts2 = hlc.new_timestamp();
31//! assert!(ts2 > ts1);
32//!
33//! // update the HLC with a timestamp incoming from another HLC
34//! // (typically remote, but not in this example...)
35//! let hlc2 = HLC::default();
36//! let other_ts = hlc2.new_timestamp();
37//!
38//! if ! hlc.update_with_timestamp(&other_ts).is_ok() {
39//! println!(r#"The incoming timestamp would make this HLC
40//! to drift too much. You should refuse it!"#);
41//! }
42//!
43//! let ts3 = hlc.new_timestamp();
44//! assert!(ts3 > ts2);
45//! assert!(ts3 > other_ts);
46//! ```
47
48#![doc(
49 html_logo_url = "https://www.rust-lang.org/logos/rust-logo-128x128-blk.png",
50 html_favicon_url = "https://www.rust-lang.org/favicon.ico",
51 html_root_url = "https://atolab.github.io/uhlc-rs/"
52)]
53#![cfg_attr(not(feature = "std"), no_std)]
54extern crate alloc;
55
56use alloc::{format, string::String};
57use core::cmp;
58use core::time::Duration;
59
60#[cfg(feature = "std")]
61use {
62 lazy_static::lazy_static,
63 std::env::var,
64 std::sync::Mutex,
65 std::time::{SystemTime, UNIX_EPOCH},
66};
67
68#[cfg(not(feature = "std"))]
69use spin::Mutex; // No_std-friendly alternative to std::sync::Mutex
70
71mod id;
72pub use id::*;
73
74mod ntp64;
75pub use ntp64::*;
76
77mod timestamp;
78pub use timestamp::*;
79
80/// The size of counter part in [`NTP64`] (in bits)
81pub const CSIZE: u8 = 4u8;
82// Bit-mask of the counter part within the 64 bits time
83const CMASK: u64 = (1u64 << CSIZE) - 1u64;
84// Bit-mask of the logical clock part within the 64 bits time
85const LMASK: u64 = !CMASK;
86
87// HLC Delta in milliseconds: maximum accepted drift for an external timestamp.
88// I.e.: if an incoming timestamp has a time > now() + delta, then the HLC is not updated.
89const DEFAULT_DELTA_MS: u64 = 500;
90#[cfg(feature = "std")]
91lazy_static! {
92 static ref DELTA_MS: u64 = match var("UHLC_MAX_DELTA_MS") {
93 Ok(s) => s.parse().unwrap_or_else(|e| panic!(
94 "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}={} : {}",
95 s, e
96 )),
97 Err(std::env::VarError::NotPresent) => DEFAULT_DELTA_MS,
98 Err(e) => panic!(
99 "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}: {}",
100 e
101 ),
102 };
103}
104#[cfg(not(feature = "std"))]
105static DELTA_MS: &u64 = &DEFAULT_DELTA_MS; // Environment variables do not make sense in no_std environment
106
107///
108/// The builder of [`HLC`].
109///
110/// # Examples
111///
112/// ```
113/// use std::{convert::TryFrom, time::Duration};
114/// use uhlc::{HLCBuilder, ID};
115///
116/// let default_hlc = HLCBuilder::new().build();
117/// println!("{}", default_hlc.new_timestamp());
118///
119/// let custom_hlc = HLCBuilder::new()
120/// .with_id(ID::try_from([0x01, 0x02, 0x03]).unwrap())
121/// .with_max_delta(Duration::from_secs(1))
122/// .build();
123/// println!("{}", custom_hlc.new_timestamp());
124pub struct HLCBuilder {
125 hlc: HLC,
126}
127
128impl HLCBuilder {
129 ///
130 /// Constructs a new HLCBuilder for the creation of an [`HLC`], with the following default configuration:
131 /// * a random u128 as HLC identifier.
132 /// Can be changed calling [`Self::with_id()`].
133 /// * [`system_time_clock()`] as physical clock (i.e. the ).
134 /// Can be changed calling [`Self::with_clock()`].
135 /// * 500 millisecond as maximum delta (i.e. the maximum accepted drift for an external timestamp).
136 /// Can be changed calling [`Self::with_max_delta()`].
137 ///
138 pub fn new() -> HLCBuilder {
139 HLCBuilder::default()
140 }
141
142 ///
143 /// Configure a specific identifier for the HLC to be created.
144 ///
145 /// **NOTE: the identifier must be unique in the system.**
146 ///
147 pub fn with_id(mut self, id: ID) -> HLCBuilder {
148 self.hlc.id = id;
149 self
150 }
151
152 ///
153 /// Configure a specific physical clock for the HLC to be created.
154 ///
155 /// The `clock` parameter must be a function returning a new physical time (as an [`NTP64`] at each call.
156 /// The time returned by this clock doesn't need to be monotonic: when the HLC generates a new timestamp from this time,
157 /// it first checks if this time is greater than the previously generated timestamp. If not, the new timestamp it the previous one +1.
158 ///
159 pub fn with_clock(mut self, clock: fn() -> NTP64) -> HLCBuilder {
160 self.hlc.clock = clock;
161 self
162 }
163
164 ///
165 /// Configure the maximum delta accepted by an HLC when updating it's logical clock calling [`HLC::update_with_timestamp()`].
166 ///
167 pub fn with_max_delta(mut self, delta: Duration) -> HLCBuilder {
168 self.hlc.delta = delta.into();
169 self
170 }
171
172 pub fn build(self) -> HLC {
173 self.hlc
174 }
175}
176
177impl Default for HLCBuilder {
178 fn default() -> Self {
179 HLCBuilder {
180 hlc: HLC {
181 id: ID::rand(),
182 #[cfg(feature = "std")]
183 clock: system_time_clock,
184 #[cfg(not(feature = "std"))]
185 clock: zero_clock,
186 delta: NTP64::from(Duration::from_millis(*DELTA_MS)),
187 last_time: Default::default(),
188 },
189 }
190 }
191}
192
193/// An Hybric Logical Clock generating [`Timestamp`]s
194pub struct HLC {
195 id: ID,
196 clock: fn() -> NTP64,
197 delta: NTP64,
198 last_time: Mutex<NTP64>,
199}
200
201#[cfg(feature = "std")]
202macro_rules! lock {
203 ($var:expr) => {
204 match $var.try_lock() {
205 Ok(guard) => guard,
206 Err(_) => $var.lock().unwrap(),
207 }
208 };
209}
210
211#[cfg(not(feature = "std"))]
212macro_rules! lock {
213 ($var:expr) => {
214 $var.lock()
215 };
216}
217
218impl HLC {
219 /// Generate a new [`Timestamp`].
220 ///
221 /// This timestamp is unique in the system and is always greater
222 /// than the latest timestamp generated by the HLC and than the
223 /// latest incoming timestamp that was used to update this [`HLC`]
224 /// (using [`HLC::update_with_timestamp()`]).
225 ///
226 /// # Examples
227 ///
228 /// ```
229 /// use uhlc::HLC;
230 ///
231 /// let hlc = HLC::default();
232 /// let ts1 = hlc.new_timestamp();
233 /// let ts2 = hlc.new_timestamp();
234 /// assert!(ts2 > ts1);
235 /// ```
236 pub fn new_timestamp(&self) -> Timestamp {
237 let mut now = (self.clock)();
238 now.0 &= LMASK;
239 let mut last_time = lock!(self.last_time);
240 if now.0 > (last_time.0 & LMASK) {
241 *last_time = now
242 } else {
243 *last_time += 1;
244 }
245 Timestamp::new(*last_time, self.id)
246 }
247
248 /// Returns the HLC [`ID`].
249 ///
250 /// This ID is the specific identifier for this HLC instance.
251 ///
252 pub fn get_id(&self) -> &ID {
253 &self.id
254 }
255
256 /// Returns the HLC delta as [`NTP64`].
257 ///
258 /// The maximum delta accepted by an HLC when updating it's logical clock calling [`HLC::update_with_timestamp()`].
259 ///
260 pub fn get_delta(&self) -> &NTP64 {
261 &self.delta
262 }
263
264 /// Update this [`HLC`] with a [`Timestamp`].
265 ///
266 /// Typically, this timestamp should have been generated by another HLC.
267 /// If the timestamp exceeds the current time of this HLC by more than the configured maximum delta
268 /// (see [`HLCBuilder::with_max_delta()`]) an [`Err`] is returned.
269 ///
270 /// # Examples
271 ///
272 /// ```
273 /// use uhlc::HLC;
274 ///
275 /// let hlc1 = HLC::default();
276 ///
277 /// // update the HLC with a timestamp incoming from another HLC
278 /// // (typically remote, but not in this example...)
279 /// let hlc2 = HLC::default();
280 /// let other_ts = hlc2.new_timestamp();
281 /// if ! hlc1.update_with_timestamp(&other_ts).is_ok() {
282 /// println!(r#"The incoming timestamp would make this HLC
283 /// to drift too much. You should refuse it!"#);
284 /// }
285 ///
286 /// let ts = hlc1.new_timestamp();
287 /// assert!(ts > other_ts);
288 /// ```
289 pub fn update_with_timestamp(&self, timestamp: &Timestamp) -> Result<(), String> {
290 let mut now = (self.clock)();
291 now.0 &= LMASK;
292 let msg_time = timestamp.get_time();
293 if *msg_time > now && *msg_time - now > self.delta {
294 let err_msg = format!(
295 "incoming timestamp from {} exceeding delta {}ms is rejected: {:#} vs. now: {:#}",
296 timestamp.get_id(),
297 self.delta.to_duration().as_millis(),
298 msg_time,
299 now
300 );
301 #[cfg(feature = "std")]
302 log::warn!("{}", err_msg);
303 #[cfg(feature = "defmt")]
304 defmt::warn!("{}", err_msg);
305 Err(err_msg)
306 } else {
307 let mut last_time = lock!(self.last_time);
308 let max_time = cmp::max(cmp::max(now, *msg_time), *last_time);
309 if max_time == now {
310 *last_time = now;
311 } else if max_time == *msg_time {
312 *last_time = *msg_time + 1;
313 } else {
314 *last_time += 1;
315 }
316 Ok(())
317 }
318 }
319}
320
321impl Default for HLC {
322 /// Create a new [`HLC`] with a random u128 ID and using
323 /// [`system_time_clock()`] as physical clock.
324 /// This is equivalent to `HLCBuilder::default().build()`
325 fn default() -> Self {
326 HLCBuilder::default().build()
327 }
328}
329
330/// A physical clock relying on std::time::SystemTime::now().
331///
332/// It returns a NTP64 relative to std::time::UNIX_EPOCH (1st Jan 1970).
333/// That's the default clock used by an [`HLC`] if [`HLCBuilder::with_clock()`] is not called.
334///
335#[inline]
336#[cfg(feature = "std")]
337pub fn system_time_clock() -> NTP64 {
338 NTP64::from(SystemTime::now().duration_since(UNIX_EPOCH).unwrap())
339}
340
341/// A physical clock relying on CLOCK_MONOTONIC.
342///
343/// Note: The time produced by this clock is relative to an unspecified starting point (likely the host boot).
344/// Therefore, comparing and ordering times produced by different instances of this clock on different hosts would probably be meaningless.
345///
346/// See: https://linux.die.net/man/2/clock_gettime
347///
348#[inline]
349#[cfg(all(feature = "nix", target_family = "unix"))]
350pub fn monotonic_time_clock() -> NTP64 {
351 let now = nix::time::ClockId::CLOCK_MONOTONIC.now().unwrap();
352 NTP64::from(now)
353}
354
355/// A dummy clock that returns a NTP64 initialized with the value 0.
356/// Suitable to use in no_std environments where std::time::{SystemTime, UNIX_EPOCH} are not available.
357/// If the feature `std` is disabled, that's the default clock used by an [`HLC`] if [`HLCBuilder::with_clock()`] is not called.
358/// Notice that this means that the [`HLC`] will use incremental timestamps starting from 0.
359#[inline]
360pub fn zero_clock() -> NTP64 {
361 NTP64(0)
362}
363
364#[cfg(test)]
365mod tests {
366 use crate::*;
367 use async_std::sync::Arc;
368 use async_std::task;
369 use core::convert::TryFrom;
370 use core::time::Duration;
371 use futures::join;
372
373 fn is_sorted(vec: &[Timestamp]) -> bool {
374 let mut it = vec.iter();
375 let mut ts = it.next().unwrap();
376 for next in it {
377 if next <= ts {
378 return false;
379 };
380 ts = next;
381 }
382 true
383 }
384
385 #[test]
386 fn hlc_parallel() {
387 use alloc::vec::Vec;
388 task::block_on(async {
389 let id0: ID = ID::try_from([0x01]).unwrap();
390 let id1: ID = ID::try_from([0x02]).unwrap();
391 let id2: ID = ID::try_from([0x03]).unwrap();
392 let id3: ID = ID::try_from([0x04]).unwrap();
393 let hlc0 = Arc::new(HLCBuilder::new().with_id(id0).build());
394 let hlc1 = Arc::new(HLCBuilder::new().with_id(id1).build());
395 let hlc2 = Arc::new(HLCBuilder::new().with_id(id2).build());
396 let hlc3 = Arc::new(HLCBuilder::new().with_id(id3).build());
397
398 // Make 4 tasks to generate 10000 timestamps each with distinct HLCs,
399 // and also to update each other HLCs
400 const NB_TIME: usize = 10000;
401 let t0 = {
402 let hlc0 = hlc0.clone();
403 let hlc1 = hlc1.clone();
404 task::spawn(async move {
405 let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
406 for _ in 0..NB_TIME {
407 let ts = hlc0.new_timestamp();
408 assert!(hlc1.update_with_timestamp(&ts).is_ok());
409 times.push(ts)
410 }
411 times
412 })
413 };
414 let t1 = {
415 let hlc1 = hlc1.clone();
416 let hlc2 = hlc2.clone();
417 task::spawn(async move {
418 let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
419 for _ in 0..NB_TIME {
420 let ts = hlc1.new_timestamp();
421 assert!(hlc2.update_with_timestamp(&ts).is_ok());
422 times.push(ts)
423 }
424 times
425 })
426 };
427 let t2 = {
428 let hlc2 = hlc3.clone();
429 let hlc3 = hlc3.clone();
430 task::spawn(async move {
431 let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
432 for _ in 0..NB_TIME {
433 let ts = hlc2.new_timestamp();
434 assert!(hlc3.update_with_timestamp(&ts).is_ok());
435 times.push(ts)
436 }
437 times
438 })
439 };
440 let t3 = {
441 let hlc3 = hlc3.clone();
442 let hlc0 = hlc0.clone();
443 task::spawn(async move {
444 let mut times: Vec<Timestamp> = Vec::with_capacity(10000);
445 for _ in 0..NB_TIME {
446 let ts = hlc3.new_timestamp();
447 assert!(hlc0.update_with_timestamp(&ts).is_ok());
448 times.push(ts)
449 }
450 times
451 })
452 };
453 let vecs = join!(t0, t1, t2, t3);
454
455 // test that each timeseries is sorted (i.e. monotonic time)
456 assert!(is_sorted(&vecs.0));
457 assert!(is_sorted(&vecs.1));
458 assert!(is_sorted(&vecs.2));
459 assert!(is_sorted(&vecs.3));
460
461 // test that there is no duplicate amongst all timestamps
462 let mut all_times: Vec<Timestamp> = vecs
463 .0
464 .into_iter()
465 .chain(vecs.1)
466 .chain(vecs.2)
467 .chain(vecs.3)
468 .collect::<Vec<Timestamp>>();
469 assert_eq!(NB_TIME * 4, all_times.len());
470 all_times.sort();
471 all_times.dedup();
472 assert_eq!(NB_TIME * 4, all_times.len());
473 });
474 }
475
476 #[test]
477 fn hlc_update_with_timestamp() {
478 let id: ID = ID::rand();
479 let hlc = HLCBuilder::new().with_id(id).build();
480
481 // Test that updating with an old Timestamp don't break the HLC
482 let past_ts = Timestamp::new(Default::default(), id);
483 let now_ts = hlc.new_timestamp();
484 assert!(hlc.update_with_timestamp(&past_ts).is_ok());
485 assert!(hlc.new_timestamp() > now_ts);
486
487 // Test that updating with a Timestamp exceeding the delta is refused
488 let now_ts = hlc.new_timestamp();
489 let future_time = now_ts.get_time() + NTP64::from(Duration::from_millis(1000));
490 let future_ts = Timestamp::new(future_time, id);
491 assert!(hlc.update_with_timestamp(&future_ts).is_err())
492 }
493
494 #[cfg(all(feature = "nix", target_family = "unix"))]
495 #[test]
496 fn hlc_nix_monotonic() {
497 let hlc = HLCBuilder::new().with_clock(monotonic_time_clock).build();
498 let t1 = monotonic_time_clock();
499 // Sleep for (CMASK + 1) nanoseconds since HLC clock strips off the lower CMASK bits.
500 std::thread::sleep(Duration::from_nanos(CMASK + 1));
501 let t2 = hlc.new_timestamp();
502 std::thread::sleep(Duration::from_nanos(1));
503 let t3 = monotonic_time_clock();
504
505 assert!(t2.get_time() > &t1);
506 assert!(&t3 > t2.get_time());
507 }
508}