require "./global_queue" require "./parallel/scheduler" module Fiber::ExecutionContext # Parallel execution context. # # Fibers running in the same context run both concurrently and in parallel to # each others, in addition to the other fibers running in other execution # contexts. # # The context internally keeps a number of fiber schedulers, each scheduler # runs on a system thread, so multiple schedulers can run in parallel. The # fibers are resumable by any scheduler in the context, and can thus move from # one system thread to another at any time. # # The actual parallelism is dynamic. As the need for parallelism increases, # for example more fibers running longer, the more schedulers will start (and # thus system threads), as the need decreases, for example not enough fibers, # the schedulers will pause themselves and parallelism will decrease. # # The parallelism can be as low as 1, in which case the context becomes a # concurrent context (no parallelism) until resized. # # For example: we can start a parallel context to run consumer fibers, while # the default context produces values. Because the consumer fibers can run in # parallel, we must protect accesses to the shared *value* variable. Running # the example without `Atomic#add` would produce a different result every # time! # # ``` # require "wait_group" # # consumers = Fiber::ExecutionContext::Parallel.new("consumers", 8) # channel = Channel(Int32).new(64) # wg = WaitGroup.new(32) # # result = Atomic.new(0) # # 32.times do # consumers.spawn do # while value = channel.receive? # result.add(value) # end # ensure # wg.done # end # end # # 1024.times { |i| channel.send(i) } # channel.close # # # wait for all workers to be done # wg.wait # # p result.get # => 523776 # ``` # # NOTE: The `Parallel` execution context isn't tied to a fixed set of system # threads, and execution can switch to other system threads, for example when # a fiber is blocked on a syscall. 
class Parallel
  include ExecutionContext

  getter name : String

  @mutex : Thread::Mutex
  @condition : Thread::ConditionVariable
  protected getter global_queue : GlobalQueue

  # :nodoc:
  getter stack_pool : Fiber::StackPool = Fiber::StackPool.new

  # :nodoc:
  getter event_loop : Crystal::EventLoop

  @event_loop_lock = Atomic(Bool).new(false)
  # Number of schedulers currently blocked in #park_thread waiting on
  # @condition; incremented before waiting, decremented by #wake_scheduler.
  @parked = Atomic(Int32).new(0)
  # Number of schedulers actively spinning looking for work; used by
  # #wake_scheduler to avoid waking more threads than needed.
  @spinning = Atomic(Int32).new(0)

  # :nodoc:
  #
  # Starts the default execution context. There can be only one for the whole
  # process. Must be called from the main thread's main fiber; associates the
  # current thread and fiber to the created execution context.
  protected def self.default(maximum : Int32) : self
    Fiber.current.execution_context = new("DEFAULT", maximum, hijack: true)
  end

  # Starts a `Parallel` context with a *maximum* parallelism. The context
  # starts with an initial parallelism of zero. It will grow to one when a
  # fiber is spawned, then the actual parallelism will keep increasing and
  # decreasing as needed, but will never go past the configured *maximum*.
  def self.new(name : String, maximum : Int32) : self
    new(name, maximum, hijack: false)
  end

  @[Deprecated("Use Fiber::ExecutionContext::Parallel.new(String, Int32) instead.")]
  def self.new(name : String, size : Range(Nil, Int32)) : self
    new(name, size.exclusive? ? size.end - 1 : size.end, hijack: false)
  end

  @[Deprecated("Use Fiber::ExecutionContext::Parallel.new(String, Int32) instead.")]
  def self.new(name : String, size : Range(Int32, Int32)) : self
    raise ArgumentError.new("invalid range") if size.begin > size.end
    new(name, size.exclusive? ? size.end - 1 : size.end, hijack: false)
  end

  # Creates the context with *capacity* schedulers (all started eagerly, see
  # #start_schedulers). When *hijack* is true the current thread is attached
  # to the first scheduler and counted as already started.
  protected def initialize(@name : String, capacity : Int32, hijack : Bool)
    raise ArgumentError.new("Parallelism can't be less than one.") if capacity < 1

    @mutex = Thread::Mutex.new
    @condition = Thread::ConditionVariable.new
    @global_queue = GlobalQueue.new(@mutex)
    @schedulers = Array(Scheduler).new(capacity)
    @event_loop = Crystal::EventLoop.create(capacity)
    @started = hijack ? 1 : 0
    @rng = Random::PCG32.new

    start_schedulers(capacity)
    hijack_current_thread(@schedulers.first) if hijack

    ExecutionContext.execution_contexts.push(self)
  end

  # :nodoc:
  #
  # TODO: must report how many schedulers are running (count spinning
  # schedulers but don't count waiting/parked ones).
  def size : Int32
    @schedulers.count(&.active?)
  end

  # The maximum number of schedulers that can be started, aka how many fibers
  # can run in parallel or maximum parallelism of the context.
  def capacity : Int32
    @schedulers.size
  end

  # :nodoc:
  def stack_pool? : Fiber::StackPool?
    @stack_pool
  end

  # Starts all schedulers at once.
  #
  # We could lazily initialize them as needed, would be safe as long as we
  # only mutate when the mutex is locked, but we iterate the schedulers in
  # #steal without locking the mutex (for obvious reasons) and there are no
  # guarantees that the new schedulers.@size will be written after the
  # scheduler has been written to the array's buffer.
  #
  # OPTIMIZE: consider storing schedulers to an array-like object that would
  # use an atomic/fence to make sure that @size can only be incremented
  # *after* the value has been written to @buffer.
  private def start_schedulers(capacity)
    capacity.times { |index| @schedulers << start_scheduler(index) }
  end

  # Creates the scheduler at *index* and registers it with the event loop.
  # Only creates the object; the backing thread is started later (see
  # #start_thread / #wake_scheduler).
  private def start_scheduler(index)
    scheduler = Scheduler.new(self, "#{@name}-#{index}")
    @event_loop.register(scheduler, index)
    scheduler
  end

  # Attaches *scheduler* to the current `Thread`, usually the process' main
  # thread. Starts a `Fiber` to run the scheduler loop.
  private def hijack_current_thread(scheduler) : Nil
    thread = Thread.current
    thread.internal_name = scheduler.name
    thread.execution_context = self
    thread.scheduler = scheduler
    scheduler.thread = thread
    scheduler.running!
  end

  # Starts a new `Thread` and attaches *scheduler*. Runs the scheduler loop
  # directly in the thread's main `Fiber`.
  private def start_thread(scheduler) : Nil
    ExecutionContext.thread_pool.checkout(scheduler)
  end

  # Yields every scheduler of the context (running or not).
  protected def each_scheduler(& : Scheduler ->) : Nil
    @schedulers.each { |scheduler| yield scheduler }
  end

  # Resizes the context to the new *maximum* parallelism.
  #
  # The new *maximum* can grow, in which case more schedulers are created to
  # eventually increase the parallelism.
  #
  # The new *maximum* can also shrink, in which case the overflow schedulers
  # are removed and told to shutdown immediately. The actual shutdown is
  # cooperative, so running schedulers won't stop until their current fiber
  # tries to switch to another fiber.
  def resize(maximum : Int32) : Nil
    raise ArgumentError.new("Parallelism can't be less than one.") if maximum < 1

    removed_schedulers = nil

    @mutex.synchronize do
      # can run in parallel to #steal that dereferences @schedulers (once)
      # without locking the mutex, so we dup the schedulers, mutate the copy,
      # and eventually assign the copy as @schedulers; this way #steal can
      # safely access the array (never mutated).
      new_capacity = maximum
      old_schedulers = @schedulers
      old_capacity = capacity

      if new_capacity > old_capacity
        @schedulers = Array(Scheduler).new(new_capacity) do |index|
          old_schedulers[index]? || start_scheduler(index)
        end
      elsif new_capacity < old_capacity
        # tell the overflow schedulers to shutdown
        removed_schedulers = old_schedulers[new_capacity..]
        removed_schedulers.each(&.shutdown!)

        # resize
        @schedulers = old_schedulers[0...new_capacity]
        @started = new_capacity if @started > new_capacity

        # reset @parked counter (we wake all parked threads) so they can
        # shutdown (if told to):
        woken_threads = @parked.get(:relaxed)
        @parked.set(0, :relaxed)

        # update @spinning prior to unpark threads; we use acquire release
        # semantics to make sure that all the above stores are visible before
        # the following wakeup calls (maybe not needed, but let's err on the
        # safe side)
        @spinning.add(woken_threads, :acquire_release)

        # wake every waiting thread:
        @condition.broadcast
        @event_loop.interrupt
      end
    end

    return unless removed_schedulers

    # drain the local queues of removed schedulers since they're no longer
    # available for stealing
    removed_schedulers.each do |scheduler|
      scheduler.@runnables.drain
    end
  end

  # :nodoc:
  def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber
    raise ArgumentError.new("#{self.class.name}#spawn doesn't support same_thread:true") if same_thread
    self.spawn(name: name, &block)
  end

  # :nodoc:
  def enqueue(fiber : Fiber) : Nil
    if ExecutionContext.current? == self
      # local enqueue: push to local queue of current scheduler
      ExecutionContext::Scheduler.current.enqueue(fiber)
    else
      # cross context: push to global queue
      external_enqueue(fiber)
    end
  end

  # :nodoc:
  def external_enqueue(fiber : Fiber) : Nil
    Crystal.trace :sched, "enqueue", fiber: fiber, to_context: self
    @global_queue.push(fiber)
    # always try to wake a scheduler on external enqueues, they may all be
    # parked/waiting on evloop:
    wake_scheduler
  end

  # Picks a scheduler at random then iterates all schedulers to try to steal
  # fibers from.
  protected def steal(& : Scheduler ->) : Nil
    return if capacity == 1

    schedulers = @schedulers
    i = @rng.next_int
    n = schedulers.size

    n.times do |j|
      if scheduler = schedulers[(i &+ j) % n]?
        yield scheduler
      end
    end
  end

  # Parks the calling thread on @condition until #wake_scheduler signals it.
  # The block is yielded (under the mutex) as a last-chance check for a
  # runnable fiber before actually parking; if it returns a fiber the thread
  # doesn't park and that fiber is returned instead. Returns nil after being
  # woken up.
  protected def park_thread(&) : Fiber?
    @mutex.synchronize do
      # avoid races by checking queues again
      if fiber = yield
        return fiber
      end

      Crystal.trace :sched, "park"
      @parked.add(1, :acquire_release)

      # NOTE: we could detach the scheduler and return the thread back into
      # ThreadPool... but it would need synchronization with #wake_scheduler
      # that wouldn't only start a thread for the next scheduler (@started)
      # but have to pick a non running scheduler
      @condition.wait(@mutex)

      # we don't decrement @parked because #wake_scheduler did
      Crystal.trace :sched, "wakeup"
    end

    nil
  end

  # This method always runs in parallel!
  #
  # This can be called from any thread in the context but can also be called
  # from external execution contexts, in which case the context may have its
  # last thread about to park itself, and we must prevent the last thread from
  # parking when there is a parallel cross context enqueue!
  protected def wake_scheduler(count = 1) : Nil
    # another thread is spinning: nothing to do
    count -= @spinning.get(:relaxed)
    return if count < 1

    # interrupt a thread waiting on the event loop
    # FIXME: what if every scheduler can wait on evloop? (i.e. io_uring)
    if @event_loop.interrupt?
      count -= 1
      return if count == 0
    end

    # we can check @parked without locking the mutex because we can't push to
    # the global queue _and_ park the thread at the same time, so either the
    # thread is already parked (and we must awake it) or it noticed (or will
    # notice) the fiber in the global queue;
    #
    # we still rely on an atomic to make sure the actual value is visible by
    # the current thread
    if @parked.get(:acquire) > 0
      @mutex.synchronize do
        # avoid race conditions
        parked = @parked.get(:relaxed)
        spinning = @spinning.get(:relaxed)

        if parked > 0 && spinning < count
          wake = count > parked ? parked : count

          # increase the number of spinning threads _now_ to avoid multiple
          # threads from trying to wakeup multiple threads at the same time
          #
          # we must also decrement the number of parked threads because another
          # thread could lock the mutex and increment @spinning again before the
          # signaled thread is resumed
          @spinning.add(wake, :acquire_release)
          @parked.sub(wake, :acquire_release)

          if wake == parked
            @condition.broadcast
          else
            wake.times { @condition.signal }
          end

          count -= wake
        end
      end

      return if count == 0
    end

    # check if we can start another scheduler; no need for atomics, the value
    # shall be rather stable over time and we check again inside the mutex
    return if @started >= capacity

    @mutex.synchronize do
      start = @started
      limit = (start + count).clamp(..capacity)

      (start...limit).each do |index|
        start_thread(@schedulers[index])
        @started += 1
      end
    end
  end

  @[AlwaysInline]
  def inspect(io : IO) : Nil
    to_s(io)
  end

  # Formats as `#<Fiber::ExecutionContext::Parallel:0x... name>`.
  def to_s(io : IO) : Nil
    io << "#<" << self.class.name << ":0x"
    object_id.to_s(io, 16)
    io << ' ' << name << '>'
  end
end

@[Deprecated("Use Fiber::ExecutionContext::Parallel instead.")]
alias MultiThreaded = Parallel
end