Skip to main content

tower/circuit_breaker/
future.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    sync::{Arc, Mutex},
5    task::{Context, Poll},
6};
7
8use pin_project_lite::pin_project;
9
10use super::{
11    policy::CircuitPolicy,
12    service::{CircuitError, CircuitStatus, SharedState},
13};
14
pin_project! {
    /// Response future for [`CircuitBreaker`].
    ///
    /// Drives the wrapped future to completion and, once it resolves, reports
    /// the outcome to the policy held in the shared breaker state, updating
    /// the circuit status where the outcome calls for it. A successful
    /// response is passed through unchanged; an error from the wrapped future
    /// is returned wrapped in [`CircuitError::Inner`].
    ///
    /// [`CircuitBreaker`]: super::service::CircuitBreaker
    /// [`CircuitError::Inner`]: super::service::CircuitError::Inner
    pub struct ResponseFuture<F, T, E, P> {
        // The wrapped future; structurally pinned so it can be polled in place.
        #[pin]
        inner: F,
        // Breaker state (policy + status) behind a mutex; locked only after
        // `inner` has produced a result.
        shared: Arc<Mutex<SharedState<P>>>,
        // Carries the otherwise-unused `T` and `E` type parameters; no value
        // of either type is stored in the future itself.
        _marker: std::marker::PhantomData<fn() -> (T, E)>,
    }
}
26
27impl<F, T, E, P> ResponseFuture<F, T, E, P> {
28    pub(crate) fn new(shared: Arc<Mutex<SharedState<P>>>, inner: F) -> Self {
29        Self {
30            inner,
31            shared,
32            _marker: std::marker::PhantomData,
33        }
34    }
35}
36
37impl<F, T, E, P> Future for ResponseFuture<F, T, E, P>
38where
39    F: Future<Output = Result<T, E>>,
40    P: CircuitPolicy,
41{
42    type Output = Result<T, CircuitError<E>>;
43
44    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        let this = self.project();
46
47        match this.inner.poll(cx) {
48            Poll::Pending => Poll::Pending,
49            Poll::Ready(Ok(resp)) => {
50                let mut s = this.shared.lock().expect("circuit breaker state poisoned");
51                let should_close = s.policy.on_success();
52                if should_close && s.status == CircuitStatus::HalfOpen {
53                    s.status = CircuitStatus::Closed;
54                }
55                Poll::Ready(Ok(resp))
56            }
57            Poll::Ready(Err(e)) => {
58                let mut s = this.shared.lock().expect("circuit breaker state poisoned");
59                let should_open = s.policy.on_failure();
60                match s.status {
61                    // Any failure during a probe reopens immediately —
62                    // the backend is not yet ready regardless of threshold.
63                    CircuitStatus::HalfOpen => {
64                        s.status = CircuitStatus::Open;
65                    }
66                    CircuitStatus::Closed if should_open => {
67                        s.status = CircuitStatus::Open;
68                    }
69                    _ => {}
70                }
71                Poll::Ready(Err(CircuitError::Inner(e)))
72            }
73        }
74    }
75}