puma/puma@88f9cba6fe10 with comments stripped
Puma's thread pool creates its worker threads with `spawn_thread`.
require 'thread'
require 'puma/io_buffer'
module Puma
class ThreadPool
# Raised inside worker threads that are running within with_force_shutdown
# when the pool's shutdown timeout has expired.
class ForceShutdown < RuntimeError; end

# Extra time workers are given to finish after ForceShutdown has been raised,
# before they are killed.
SHUTDOWN_GRACE_TIME = 5 # seconds
# Builds the pool and eagerly starts the minimum number of worker threads.
#
# name    - label used when naming worker threads (see spawn_thread) and the
#           trimmer/reaper threads.
# options - Hash read for :min_threads and :max_threads (both coerced with
#           Integer(), which raises if the value cannot be converted),
#           :out_of_band, :clean_thread_locals, :reaping_time and
#           :auto_trim_time.
# block   - stored and later called by every worker with a work item followed
#           by one instance of each class listed in @extra.
def initialize(name, options = {}, &block)
# Signalled when work is queued or a trim/shutdown is requested; idle workers
# sleep on it.
@not_empty = ConditionVariable.new
# Signalled by workers when they become idle or exit.
@not_full = ConditionVariable.new
# Guards the queue and the counters below.
@mutex = Mutex.new
# Pending work items, consumed from the front by the workers.
@todo = []
# Number of worker threads currently accounted for.
@spawned = 0
# Number of workers currently idle, waiting for work.
@waiting = 0
@name = name
@min = Integer(options[:min_threads])
@max = Integer(options[:max_threads])
@block = block
# Classes instantiated once per worker thread and passed to the block.
@extra = [::Puma::IOBuffer]
@out_of_band = options[:out_of_band]
@clean_thread_locals = options[:clean_thread_locals]
@reaping_time = options[:reaping_time]
@auto_trim_time = options[:auto_trim_time]
@shutdown = false
# Number of idle workers that have been asked to exit.
@trim_requested = 0
@out_of_band_pending = false
@workers = []
@auto_trim = nil
@reaper = nil
@mutex.synchronize do
@min.times do
spawn_thread
# Releases the mutex so the new worker can run, and resumes once the worker
# signals @not_full, which it does right after marking itself as waiting.
@not_full.wait(@mutex)
end
end
# State shared between with_force_shutdown and shutdown.
@force_shutdown = false
@shutdown_mutex = Mutex.new
end
# Plain readers for the pool counters; they return the instance variables
# directly and do not take @mutex.
attr_reader :spawned, :trim_requested, :waiting
# Resets every fiber-local value of the calling thread to nil, leaving only
# the :__recursive_key__ entry untouched.
def self.clean_thread_locals
  current = Thread.current
  current.keys.each do |key|
    next if key == :__recursive_key__

    current[key] = nil
  end
end
# Number of work items queued in @todo and not yet picked up by a worker.
def backlog
  with_mutex do
    @todo.size
  end
end
# Idle workers plus the number of threads that could still be spawned before
# reaching @max. Reads the counters without taking the mutex.
def pool_capacity
  unspawned = @max - spawned
  unspawned + waiting
end
# Workers that are not idle, plus the work items still queued in @todo.
def busy_threads
  with_mutex do
    active = @spawned - @waiting
    active + @todo.size
  end
end
# Starts one worker thread, records it in @workers and returns it.
#
# The callers visible in this file (initialize and <<) invoke it while
# holding @mutex.
def spawn_thread
@spawned += 1
th = Thread.new(@spawned) do |spawned|
Puma.set_thread_name '%s tp %03i' % [@name, spawned]
# Local aliases for the shared pool state used in the loop below.
todo = @todo
block = @block
mutex = @mutex
not_empty = @not_empty
not_full = @not_full
# One instance of every @extra class, created once and reused for each call.
extra = @extra.map { |i| i.new }
while true
work = nil
mutex.synchronize do
while todo.empty?
# Nothing to do: honour a pending trim request by retiring this thread.
if @trim_requested > 0
@trim_requested -= 1
@spawned -= 1
@workers.delete th
not_full.signal
Thread.exit
end
@waiting += 1
# Clear the pending flag only when the hook reports that it ran.
if @out_of_band_pending && trigger_out_of_band_hook
@out_of_band_pending = false
end
# Tell anyone waiting on @not_full that this worker is now idle.
not_full.signal
begin
not_empty.wait mutex
ensure
# Always undo the increment, even if the wait is interrupted.
@waiting -= 1
end
end
work = todo.shift
end
if @clean_thread_locals
ThreadPool.clean_thread_locals
end
begin
# A truthy result from the block requests an out-of-band hook run; the flag
# is written here without holding the mutex.
@out_of_band_pending = true if block.call(work, *extra)
rescue Exception => e
# Report the error and keep the worker running.
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
end
end
end
@workers << th
th
end
private :spawn_thread
# Calls every out-of-band hook, but only when hooks are configured and all
# spawned workers are idle.
#
# Returns false when the hooks were skipped, true when they were invoked.
# An exception raised along the way is reported on STDERR and also yields
# true.
def trigger_out_of_band_hook
  hooks = @out_of_band
  return false unless hooks && hooks.any?
  return false if @spawned != @waiting

  hooks.each { |hook| hook.call }
  true
rescue Exception => e
  STDERR.puts "Exception calling out_of_band_hook: #{e.message} (#{e.class})"
  true
end
private :trigger_out_of_band_hook
# Runs the block while holding @mutex and returns the block's value.
# If the calling thread already owns the mutex the block is simply yielded
# to, so methods using this helper can call each other.
def with_mutex(&block)
  if @mutex.owned?
    yield
  else
    @mutex.synchronize(&block)
  end
end
# Queues a work item and wakes one idle worker.
#
# Raises RuntimeError once shutdown has begun.
def <<(work)
  with_mutex do
    raise "Unable to add work while shutting down" if @shutdown

    @todo << work

    # Grow the pool when the queue outnumbers the idle workers and the
    # thread limit has not been reached yet.
    needs_thread = @waiting < @todo.size && @spawned < @max
    spawn_thread if needs_thread

    @not_empty.signal
  end
end
# Blocks the caller until the pool is shutting down or busy_threads has
# dropped below @max. Returns nil.
def wait_until_not_full
  with_mutex do
    @not_full.wait(@mutex) until @shutdown || busy_threads < @max
  end
end
# Waits on @not_full for at most delay_s seconds when the pool has busy
# threads. Does nothing when delay_s is nil or not positive, when Puma.mri?
# is false, when the pool is shutting down, or when nothing is busy.
def wait_for_less_busy_worker(delay_s)
  positive_delay = delay_s && delay_s > 0
  return unless positive_delay && Puma.mri?

  with_mutex do
    should_wait = !@shutdown && busy_threads > 0
    @not_full.wait(@mutex, delay_s) if should_wait
  end
end
# Asks one worker to exit, provided the pool would stay above @min once all
# pending trim requests are honoured.
#
# force - when true, request the trim even if no idle worker is spare.
def trim(force=false)
  with_mutex do
    idle = @waiting - @todo.size
    wants_trim = force || idle > 0
    if wants_trim && @spawned - @trim_requested > @min
      @trim_requested += 1
      # Wake an idle worker so it notices the request.
      @not_empty.signal
    end
  end
end
# Drops workers whose threads are no longer alive: each one is killed,
# subtracted from @spawned and removed from @workers.
def reap
  with_mutex do
    finished = @workers.select { |thread| !thread.alive? }

    finished.each do |thread|
      thread.kill
      @spawned -= 1
    end

    @workers.delete_if { |thread| finished.include?(thread) }
  end
end
# Repeatedly sends one message to the pool from a dedicated background
# thread, sleeping between invocations.
class Automaton
  # pool        - object that receives +message+.
  # timeout     - value passed to sleep between invocations.
  # thread_name - name given to the background thread.
  # message     - name of the public method sent to +pool+ on every pass.
  def initialize(pool, timeout, thread_name, message)
    @pool = pool
    @timeout = timeout
    @thread_name = thread_name
    @message = message
    @running = false
    @thread = nil
  end

  # Starts the background loop and returns its Thread.
  def start!
    @running = true
    @thread = Thread.new do
      Puma.set_thread_name @thread_name
      while @running
        @pool.public_send(@message)
        sleep @timeout
      end
    end
  end

  # Asks the loop to finish and wakes the thread if it is sleeping.
  #
  # Safe to call before start! and after the thread has died:
  # Thread#wakeup raises ThreadError on a dead thread, and an exception
  # here would propagate into the caller's shutdown sequence.
  def stop
    @running = false
    @thread.wakeup if @thread && @thread.alive?
  rescue ThreadError
    # The thread finished between the liveness check and the wakeup, so
    # there is nothing left to wake.
    nil
  end
end
# Creates and starts the background Automaton that periodically sends :trim
# to this pool.
#
# timeout - pause between trims; defaults to the :auto_trim_time option.
def auto_trim!(timeout=@auto_trim_time)
  thread_label = "#{@name} threadpool trimmer"
  trimmer = Automaton.new(self, timeout, thread_label, :trim)
  @auto_trim = trimmer
  trimmer.start!
end
# Creates and starts the background Automaton that periodically sends :reap
# to this pool.
#
# timeout - pause between reaps; defaults to the :reaping_time option.
def auto_reap!(timeout=@reaping_time)
  thread_label = "#{@name} threadpool reaper"
  reaper = Automaton.new(self, timeout, thread_label, :reap)
  @reaper = reaper
  reaper.start!
end
# Yields with the calling thread flagged as interruptible by a forced
# shutdown; shutdown raises ForceShutdown only in threads carrying the flag.
#
# Raises ForceShutdown immediately, without yielding, when a forced shutdown
# is already in progress. The flag is reset to false on the way out in every
# case.
def with_force_shutdown
  worker = Thread.current
  begin
    @shutdown_mutex.synchronize do
      if @force_shutdown
        raise ForceShutdown
      end
      worker[:with_force_shutdown] = true
    end
    yield
  ensure
    worker[:with_force_shutdown] = false
  end
end
# Stops the pool and waits for its worker threads.
#
# timeout - -1 (the default) joins every worker with no time limit. Any other
#           value is the number of seconds workers get to finish before the
#           pool escalates: ForceShutdown is raised in workers that are
#           inside with_force_shutdown, they get SHUTDOWN_GRACE_TIME more
#           seconds, and whatever is still running is killed.
def shutdown(timeout=-1)
threads = with_mutex do
@shutdown = true
# Request one trim per spawned thread so every idle worker exits.
@trim_requested = @spawned
@not_empty.broadcast
@not_full.broadcast
@auto_trim.stop if @auto_trim
@reaper.stop if @reaper
# Snapshot, since exiting workers remove themselves from @workers.
@workers.dup
end
if timeout == -1
threads.each(&:join)
else
# Joins the remaining threads within a shared budget of inner_timeout
# seconds and removes the ones that finished from the list.
join = ->(inner_timeout) do
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
threads.reject! do |t|
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
t.join inner_timeout - elapsed
end
end
join.call(timeout)
# with_force_shutdown sets its per-thread flag under the same mutex.
@shutdown_mutex.synchronize do
@force_shutdown = true
threads.each do |t|
t.raise ForceShutdown if t[:with_force_shutdown]
end
end
join.call(SHUTDOWN_GRACE_TIME)
threads.each(&:kill)
join.call(1)
end
@spawned = 0
@workers = []
end
end
end
We learn:
- `@todo` keeps track of the pending work; `backlog` returns its size.
- If more work is queued than there are idle threads waiting for it, Puma spawns another thread, as long as it still has capacity (fewer than max threads spawned).
- Puma's shutdown grace period (`SHUTDOWN_GRACE_TIME`) is 5 seconds.
- `clean_thread_locals` sets the values stored in `Thread.current` to `nil`.