tower/circuit_breaker/
future.rs1use std::{
2 future::Future,
3 pin::Pin,
4 sync::{Arc, Mutex},
5 task::{Context, Poll},
6};
7
8use pin_project_lite::pin_project;
9
10use super::{
11 policy::CircuitPolicy,
12 service::{CircuitError, CircuitStatus, SharedState},
13};
14
15pin_project! {
16 pub struct ResponseFuture<F, T, E, P> {
20 #[pin]
21 inner: F,
22 shared: Arc<Mutex<SharedState<P>>>,
23 _marker: std::marker::PhantomData<fn() -> (T, E)>,
24 }
25}
26
27impl<F, T, E, P> ResponseFuture<F, T, E, P> {
28 pub(crate) fn new(shared: Arc<Mutex<SharedState<P>>>, inner: F) -> Self {
29 Self {
30 inner,
31 shared,
32 _marker: std::marker::PhantomData,
33 }
34 }
35}
36
37impl<F, T, E, P> Future for ResponseFuture<F, T, E, P>
38where
39 F: Future<Output = Result<T, E>>,
40 P: CircuitPolicy,
41{
42 type Output = Result<T, CircuitError<E>>;
43
44 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45 let this = self.project();
46
47 match this.inner.poll(cx) {
48 Poll::Pending => Poll::Pending,
49 Poll::Ready(Ok(resp)) => {
50 let mut s = this.shared.lock().expect("circuit breaker state poisoned");
51 let should_close = s.policy.on_success();
52 if should_close && s.status == CircuitStatus::HalfOpen {
53 s.status = CircuitStatus::Closed;
54 }
55 Poll::Ready(Ok(resp))
56 }
57 Poll::Ready(Err(e)) => {
58 let mut s = this.shared.lock().expect("circuit breaker state poisoned");
59 let should_open = s.policy.on_failure();
60 match s.status {
61 CircuitStatus::HalfOpen => {
64 s.status = CircuitStatus::Open;
65 }
66 CircuitStatus::Closed if should_open => {
67 s.status = CircuitStatus::Open;
68 }
69 _ => {}
70 }
71 Poll::Ready(Err(CircuitError::Inner(e)))
72 }
73 }
74 }
75}