Skip to content

断路器模式

Circuit Breaker Pattern

处理大量大概率会耗费时间和资源去纠正的错误请求。

稳定性,弹性。

1. 背景与问题

在分布式的环境下,访问远程资源或服务时,会发生短暂性的失败,比如缓慢的网络连接、超时、资源过载或临时不可用。这些错误通常很快就能自动修复。

但是还有另一种情况,就是不容易发现的未预料到的事件,可能此时要花费更多的时间去修复。这些在长时间内不能修复的错误存在时,重试就显得毫无意义,而且会导致系统资源白白被浪费甚至到系统崩溃,不如迅速直接地返回错误,在合适的时候重新再调用服务。

缩短等待时间可能也能解决这个问题,但问题是,你不能将这个时间设置的过短吧?

2. 解决方案

断路器可以防止应用重复尝试执行大概率会失败的操作,让其无需等待浪费资源的长时间存在的错误。并且如果检测到错误已被解决了,应用可以尝试调用操作。

断路器充当可能失败的操作的代理。该代理可以由状态机实现相关功能:

  • 闭路(Closed)从应用的请求可以路由到操作。

代理维护了最近失败的次数count,如果操作不成功则递增该值。若count超过了规定时间内的阈值,则代理变成 开路 状态。此时代理开启一个超时计时器(为了给系统一定的时间去修正问题),当它超时后代理被置为 半开路 状况。

  • 开路(Open)。来自应用的请求会立即失败,并返回给应用一个异常。

  • 半开路(Half-Open)。只允许有限的请求通过代理并调用操作。

若这些请求成功了,则切换到 闭路,重置count值。若有任何请求失败了,则断路器会假设错误仍然存在,代理回滚到 开路 状态,重新启动一个时间更长的超时定时器。

此状态还能防止请求突然暴增。当一个服务恢复后,可能只能接受有限的请求直到完全复原。

状态机

失败计数器failure counter会被周期性的重置。这可以防止因为偶然的失败进入开路状态;失败阈值导致开路的情况只能是因为在特定的时间范围内达到了特定的失败次数。

成功计数器success counter记录了半开路状态下成功调用操作的次数。在连续达到一定次数后断路器会恢复到闭路状态。若任何一次调用失败了,断路器将进入开路状态,计数器也将在下次进入半开路时被重置。

如果每次断路器状态改变时就抛出一个事件,则可以检测系统健康状态及时发出警告。

在某些情况下,想比于开路状态返回错误或异常,返回一个默认值可能更有意义。

3. 适用场景

适用:

防止应用在大概率会失败的时候企图调用远程服务或访问分片资源。

不适用:

  • 访问本地资源,只会增加系统负担
  • 作为处理异常的业务逻辑组件

4. 相关模式

5. gobreaker 实现分析

go语言实现版断路器

type CircuitBreaker struct {
   name          string    // 名字
   maxRequests   uint32 // 半开路状态下请求被允许通过的最大数量。若是0,只允许一个请求通过
   interval      time.Duration // 闭路状态下清零计数器的周期。若是0,则闭路时不会清理计数器
   timeout       time.Duration // 开路状态下多久之后变成半开路状态。默认60s
   readyToTrip   func(counts Counts) bool // 从闭路转变为开路的判断依据
   onStateChange func(name string, from State, to State) // 每次状态改变时都会调用的函数

   mutex      sync.Mutex // 在获取状态或转换状态时的锁
   state      State
   generation uint64 // 新生次数
   counts     Counts // 计数器
   expiry     time.Time // 过期时间点
}

5.1 NewCircuitBreaker

func NewCircuitBreaker(st Settings) *CircuitBreaker {
    // 设置一下属性

    // 生成第一代
    cb.toNewGeneration(time.Now())
}

5.2 toNewGeneration

新的状态

func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
   cb.generation++    // 代数++
   cb.counts.clear() // 所有的计数进行清零

   // 设置到期时间,为以后的相关操作提供判断依据
   var zero time.Time
   switch cb.state {
   case StateClosed:
      if cb.interval == 0 {
         cb.expiry = zero
      } else {
         cb.expiry = now.Add(cb.interval)
      }
   case StateOpen:
      cb.expiry = now.Add(cb.timeout)
   default: // StateHalfOpen
      cb.expiry = zero
   }
}

5.3 currentState

更新并返回当前状态,若已到超时时间则生成新代

func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
   switch cb.state {
   case StateClosed:
      if !cb.expiry.IsZero() && cb.expiry.Before(now) {
         cb.toNewGeneration(now)
      }
   case StateOpen:
      if cb.expiry.Before(now) {
         cb.setState(StateHalfOpen, now)
      }
   }
   return cb.state, cb.generation
}

5.4 setState

设置状态,新生一代,触发状态更新钩子

func (cb *CircuitBreaker) setState(state State, now time.Time) {
   if cb.state == state {
      return
   }

   prev := cb.state
   cb.state = state

   cb.toNewGeneration(now)

   if cb.onStateChange != nil {
      cb.onStateChange(cb.name, prev, state)
   }
}

5.5 Execute 入口方法

在请求前后调用相关方法

func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
   generation, err := cb.beforeRequest()
   if err != nil {
      return nil, err
   }

   defer func() {
      e := recover()
      if e != nil {
         cb.afterRequest(generation, false)
         panic(e)
      }
   }()

   result, err := req()
   cb.afterRequest(generation, err == nil)
   return result, err
}

5.6 beforeRequest, afterRequest

请求前后执行的方法。都会先获取最新状态。

func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
   cb.mutex.Lock()
   defer cb.mutex.Unlock()

   now := time.Now()
   state, generation := cb.currentState(now)

   // 只通过满足条件的请求
   if state == StateOpen {
      return generation, ErrOpenState
   } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
      return generation, ErrTooManyRequests
   }

   // 递增计数器
   cb.counts.onRequest()
   return generation, nil
}

func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
    cb.mutex.Lock()
    defer cb.mutex.Unlock()

    now := time.Now()
    state, generation := cb.currentState(now)
    // 有新代生成则直接返回
    if generation != before {
        return
    }

    if success {
        cb.onSuccess(state, now)
    } else {
        cb.onFailure(state, now)
    }
}

5.7 onSuccess, onFailure

状态转换

func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
   switch state {
   case StateClosed:
      cb.counts.onSuccess()
   case StateHalfOpen:
      cb.counts.onSuccess()
      if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
         cb.setState(StateClosed, now)
      }
   }
}

func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
   switch state {
   case StateClosed:
      cb.counts.onFailure()
      if cb.readyToTrip(cb.counts) {
         cb.setState(StateOpen, now)
      }
   case StateHalfOpen:
      cb.setState(StateOpen, now)
   }
}