class Rufus::Scheduler
Constants
- DURATIONS
- DURATIONS2
- DURATIONS2M
- DURATION_LETTERS
- DU_KEYS
- EoTime
- MAX_WORK_THREADS
MIN_WORK_THREADS = 3
- VERSION
Attributes
attr_accessor :min_work_threads
Public Class Methods
Produces a hour/min/sec/milli string representation of Time instance
# File lib/rufus/scheduler/util.rb, line 252 def self.h_to_s(t=Time.now) "#{t.strftime('%H:%M:%S')}.#{sprintf('%06d', t.usec)}" end
# File lib/rufus/scheduler.rb, line 56 def initialize(opts={}) @opts = opts @started_at = nil @paused = false @jobs = JobArray.new @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300) @mutexes = {} @work_queue = Queue.new #@min_work_threads = opts[:min_work_threads] || MIN_WORK_THREADS @max_work_threads = opts[:max_work_threads] || MAX_WORK_THREADS @stderr = $stderr @thread_key = "rufus_scheduler_#{self.object_id}" @scheduler_lock = if lockfile = opts[:lockfile] Rufus::Scheduler::FileLock.new(lockfile) else opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new end @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new # If we can't grab the @scheduler_lock, don't run. lock || return start end
# File lib/rufus/scheduler/util.rb, line 10 def self.parse(o, opts={}) opts[:no_error] = true parse_cron(o, opts) || parse_in(o, opts) || # covers 'every' schedule strings parse_at(o, opts) || fail(ArgumentError.new("couldn't parse #{o.inspect} (#{o.class})")) end
# File lib/rufus/scheduler/util.rb, line 45 def self.parse_at(o, opts={}) return o if o.is_a?(EoTime) return EoTime.make(o) if o.is_a?(Time) EoTime.parse(o, opts) rescue StandardError => se return nil if opts[:no_error] fail se end
# File lib/rufus/scheduler/util.rb, line 20 def self.parse_cron(o, opts) o.is_a?(CronLine) ? o : CronLine.new(o) rescue ArgumentError => ae return nil if opts[:no_error] fail ae end
Turns a string like '1m10s' into a float like '70.0', more formally, turns a time duration expressed as a string into a Float instance (millisecond count).
w -> week d -> day h -> hour m -> minute s -> second M -> month y -> year 'nada' -> millisecond
Some examples:
Rufus::Scheduler.parse_duration "0.5" # => 0.5 Rufus::Scheduler.parse_duration "500" # => 0.5 Rufus::Scheduler.parse_duration "1000" # => 1.0 Rufus::Scheduler.parse_duration "1h" # => 3600.0 Rufus::Scheduler.parse_duration "1h10s" # => 3610.0 Rufus::Scheduler.parse_duration "1w2d" # => 777600.0
Negative time strings are OK (Thanks Danny Fullerton):
Rufus::Scheduler.parse_duration "-0.5" # => -0.5 Rufus::Scheduler.parse_duration "-1h" # => -3600.0
# File lib/rufus/scheduler/util.rb, line 101 def self.parse_duration(string, opts={}) s = string.to_s.strip mod = s[0, 1] == '-' ? -1 : 1 s = s[1..-1] if mod == -1 ss = mod < 0 ? '-' : '' r = 0.0 s.scan(/(\d*\.\d+|\d+\.?)([#{DURATION_LETTERS}]?)/) do |f, d| ss += "#{f}#{d}" r += f.to_f * (DURATIONS[d] || 1.0) end if ss == '-' || ss != string.to_s.strip return nil if opts[:no_error] fail ArgumentError.new("invalid time duration #{string.inspect}") end mod * r end
-
for compatibility with rufus-scheduler 2.x
+
# File lib/rufus/scheduler/util.rb, line 30 def self.parse_in(o, opts={}) #o.is_a?(String) ? parse_duration(o, opts) : o return parse_duration(o, opts) if o.is_a?(String) return o if o.is_a?(Numeric) fail ArgumentError.new("couldn't parse time point in #{o.inspect}") rescue ArgumentError => ae return nil if opts[:no_error] fail ae end
Alias for ::singleton
# File lib/rufus/scheduler.rb, line 101 def self.s(opts={}); singleton(opts); end
Returns a singleton Rufus::Scheduler instance
# File lib/rufus/scheduler.rb, line 94 def self.singleton(opts={}) @singleton ||= Rufus::Scheduler.new(opts) end
Releasing the gem would probably require redirecting .start_new to .new and emit a simple deprecation message.
For now, let's assume the people pointing at rufus-scheduler/master on GitHub know what they do…
# File lib/rufus/scheduler.rb, line 109 def self.start_new fail "this is rufus-scheduler 3.x, use .new instead of .start_new" end
Turns a number of seconds into a a time string
Rufus.to_duration 0 # => '0s' Rufus.to_duration 60 # => '1m' Rufus.to_duration 3661 # => '1h1m1s' Rufus.to_duration 7 * 24 * 3600 # => '1w' Rufus.to_duration 30 * 24 * 3600 + 1 # => "4w2d1s"
It goes from seconds to the year. Months are not counted (as they are of variable length). Weeks are counted.
For 30 days months to be counted, the second parameter of this method can be set to true.
Rufus.to_duration 30 * 24 * 3600 + 1, true # => "1M1s"
If a Float value is passed, milliseconds will be displayed without 'marker'
Rufus.to_duration 0.051 # => "51" Rufus.to_duration 7.051 # => "7s51" Rufus.to_duration 0.120 + 30 * 24 * 3600 + 1 # => "4w2d1s120"
(this behaviour mirrors the one found for ::parse_time_string()).
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 164 def self.to_duration(seconds, options={}) h = to_duration_hash(seconds, options) return (options[:drop_seconds] ? '0m' : '0s') if h.empty? s = DU_KEYS.inject('') { |r, key| count = h[key] count = nil if count == 0 r << "#{count}#{key}" if count r } ms = h[:ms] s << ms.to_s if ms s end
Turns a number of seconds (integer or Float) into a hash like in :
Rufus.to_duration_hash 0.051 # => { :ms => "51" } Rufus.to_duration_hash 7.051 # => { :s => 7, :ms => "51" } Rufus.to_duration_hash 0.120 + 30 * 24 * 3600 + 1 # => { :w => 4, :d => 2, :s => 1, :ms => "120" }
This method is used by ::to_duration behind the scenes.
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
# File lib/rufus/scheduler/util.rb, line 210 def self.to_duration_hash(seconds, options={}) h = {} if seconds.is_a?(Float) h[:ms] = (seconds % 1 * 1000).to_i seconds = seconds.to_i end if options[:drop_seconds] h.delete(:ms) seconds = (seconds - seconds % 60) end durations = options[:months] ? DURATIONS2M : DURATIONS2 durations.each do |key, duration| count = seconds / duration seconds = seconds % duration h[key.to_sym] = count if count > 0 end h end
-
for compatibility with rufus-scheduler 2.x
+
Produces the UTC string representation of a Time instance
like “2009/11/23 11:11:50.947109 UTC”
# File lib/rufus/scheduler/util.rb, line 245 def self.utc_to_s(t=Time.now) "#{t.utc.strftime('%Y-%m-%d %H:%M:%S')}.#{sprintf('%06d', t.usec)} UTC" end
Public Instance Methods
# File lib/rufus/scheduler.rb, line 183 def at(time, callable=nil, opts={}, &block) do_schedule(:once, time, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 294 def at_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) } end
Callback called when a job is triggered. If the lock cannot be acquired, the job won't run (though it'll still be scheduled to run again if necessary).
# File lib/rufus/scheduler.rb, line 362 def confirm_lock @trigger_lock.lock end
# File lib/rufus/scheduler.rb, line 223 def cron(cronline, callable=nil, opts={}, &block) do_schedule(:cron, cronline, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 314 def cron_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) } end
# File lib/rufus/scheduler.rb, line 154 def down? ! @started_at end
# File lib/rufus/scheduler.rb, line 203 def every(duration, callable=nil, opts={}, &block) do_schedule(:every, duration, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 304 def every_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) } end
# File lib/rufus/scheduler.rb, line 193 def in(duration, callable=nil, opts={}, &block) do_schedule(:once, duration, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 299 def in_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) } end
# File lib/rufus/scheduler.rb, line 213 def interval(duration, callable=nil, opts={}, &block) do_schedule(:interval, duration, callable, opts, opts[:job], block) end
# File lib/rufus/scheduler.rb, line 309 def interval_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) } end
# File lib/rufus/scheduler.rb, line 319 def job(job_id) @jobs[job_id] end
Returns all the scheduled jobs (even those right before re-schedule).
# File lib/rufus/scheduler.rb, line 276 def jobs(opts={}) opts = { opts => true } if opts.is_a?(Symbol) jobs = @jobs.to_a if opts[:running] jobs = jobs.select { |j| j.running? } elsif ! opts[:all] jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at } end tags = Array(opts[:tag] || opts[:tags]).collect(&:to_s) jobs = jobs.reject { |j| tags.find { |t| ! j.tags.include?(t) } } jobs end
# File lib/rufus/scheduler.rb, line 145 def join fail NotRunningError.new( 'cannot join scheduler that is not running' ) unless @thread @thread.join end
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
Most of the time, a scheduler is run alone and this method should return true. It is useful in cases where among a group of applications only one of them should run the scheduler. For schedulers that should not run, the method should return false.
Out of the box, rufus-scheduler proposes the :lockfile => 'path/to/lock/file' scheduler start option. It makes it easy for schedulers on the same machine to determine which should run (the first to write the lockfile and lock it). It uses “man 2 flock” so it probably won't work reliably on distributed file systems.
If one needs to use a special/different locking mechanism, the scheduler accepts :scheduler_lock => lock_object. lock_object only needs to respond to lock and unlock, and both of these methods should be idempotent.
Look at rufus/scheduler/locks.rb for an example.
# File lib/rufus/scheduler.rb, line 345 def lock @scheduler_lock.lock end
# File lib/rufus/scheduler.rb, line 418 def occurrences(time0, time1, format=:per_job) h = {} jobs.each do |j| os = j.occurrences(time0, time1) h[j] = os if os.any? end if format == :timeline a = [] h.each { |j, ts| ts.each { |t| a << [ t, j ] } } a.sort_by { |(t, _)| t } else h end end
# File lib/rufus/scheduler.rb, line 441 def on_error(job, err) pre = err.object_id.to_s ms = {}; mutexes.each { |k, v| ms[k] = v.locked? } stderr.puts("{ #{pre} rufus-scheduler intercepted an error:") stderr.puts(" #{pre} job:") stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}") # TODO: eventually use a Job#detail or something like that stderr.puts(" #{pre} error:") stderr.puts(" #{pre} #{err.object_id}") stderr.puts(" #{pre} #{err.class}") stderr.puts(" #{pre} #{err}") err.backtrace.each do |l| stderr.puts(" #{pre} #{l}") end stderr.puts(" #{pre} tz:") stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}") stderr.puts(" #{pre} Time.now: #{Time.now}") stderr.puts(" #{pre} local_tzone: #{EoTime.local_tzone.inspect}") stderr.puts(" #{pre} et-orbi:") stderr.puts(" #{pre} #{EoTime.platform_info}") stderr.puts(" #{pre} scheduler:") stderr.puts(" #{pre} object_id: #{object_id}") stderr.puts(" #{pre} opts:") stderr.puts(" #{pre} #{@opts.inspect}") stderr.puts(" #{pre} frequency: #{self.frequency}") stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}") stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}") stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})") stderr.puts(" #{pre} down?: #{down?}") stderr.puts(" #{pre} threads: #{self.threads.size}") stderr.puts(" #{pre} thread: #{self.thread}") stderr.puts(" #{pre} thread_key: #{self.thread_key}") stderr.puts(" #{pre} work_threads: #{work_threads.size}") stderr.puts(" #{pre} active: #{work_threads(:active).size}") stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}") stderr.puts(" #{pre} max_work_threads: #{max_work_threads}") stderr.puts(" #{pre} mutexes: #{ms.inspect}") stderr.puts(" #{pre} jobs: #{jobs.size}") stderr.puts(" #{pre} at_jobs: #{at_jobs.size}") stderr.puts(" #{pre} in_jobs: #{in_jobs.size}") stderr.puts(" #{pre} every_jobs: #{every_jobs.size}") stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}") stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}") stderr.puts(" #{pre} running_jobs: #{running_jobs.size}") stderr.puts(" #{pre} work_queue: #{work_queue.size}") stderr.puts("} #{pre} .") rescue => e stderr.puts("failure in #on_error itself:") stderr.puts(e.inspect) stderr.puts(e.backtrace) ensure stderr.flush end
# File lib/rufus/scheduler.rb, line 169 def pause @paused = true end
# File lib/rufus/scheduler.rb, line 164 def paused? @paused end
# File lib/rufus/scheduler.rb, line 247 def repeat(arg, callable=nil, opts={}, &block) callable, opts = nil, callable if callable.is_a?(Hash) opts = opts.dup opts[:_t] = Scheduler.parse(arg, opts) case opts[:_t] when CronLine then schedule_cron(arg, callable, opts, &block) else schedule_every(arg, callable, opts, &block) end end
# File lib/rufus/scheduler.rb, line 174 def resume @paused = false end
# File lib/rufus/scheduler.rb, line 413 def running_jobs(opts={}) jobs(opts.merge(:running => true)) end
# File lib/rufus/scheduler.rb, line 233 def schedule(arg, callable=nil, opts={}, &block) callable, opts = nil, callable if callable.is_a?(Hash) opts = opts.dup opts[:_t] = Scheduler.parse(arg, opts) case opts[:_t] when CronLine then schedule_cron(arg, callable, opts, &block) when Time then schedule_at(arg, callable, opts, &block) else schedule_in(arg, callable, opts, &block) end end
# File lib/rufus/scheduler.rb, line 188 def schedule_at(time, callable=nil, opts={}, &block) do_schedule(:once, time, callable, opts, true, block) end
# File lib/rufus/scheduler.rb, line 228 def schedule_cron(cronline, callable=nil, opts={}, &block) do_schedule(:cron, cronline, callable, opts, true, block) end
# File lib/rufus/scheduler.rb, line 208 def schedule_every(duration, callable=nil, opts={}, &block) do_schedule(:every, duration, callable, opts, true, block) end
# File lib/rufus/scheduler.rb, line 198 def schedule_in(duration, callable=nil, opts={}, &block) do_schedule(:once, duration, callable, opts, true, block) end
# File lib/rufus/scheduler.rb, line 218 def schedule_interval(duration, callable=nil, opts={}, &block) do_schedule(:interval, duration, callable, opts, true, block) end
Returns true if this job is currently scheduled.
Takes extra care to answer true if the job is a repeat job currently firing.
# File lib/rufus/scheduler.rb, line 372 def scheduled?(job_or_job_id) job, _ = fetch(job_or_job_id) !! (job && job.unscheduled_at.nil? && job.next_time != nil) end
# File lib/rufus/scheduler.rb, line 114 def shutdown(opt=nil) @started_at = nil #jobs.each { |j| j.unschedule } # provokes https://github.com/jmettraux/rufus-scheduler/issue/98 @jobs.array.each { |j| j.unschedule } @work_queue.clear if opt == :wait join_all_work_threads elsif opt == :kill kill_all_work_threads end unlock end
Lists all the threads associated with this scheduler.
# File lib/rufus/scheduler.rb, line 381 def threads Thread.list.select { |t| t[thread_key] } end
# File lib/rufus/scheduler.rb, line 436 def timeline(time0, time1) occurrences(time0, time1, :timeline) end
Sister method to lock, is called when the scheduler shuts down.
# File lib/rufus/scheduler.rb, line 352 def unlock @trigger_lock.unlock @scheduler_lock.unlock end
# File lib/rufus/scheduler.rb, line 260 def unschedule(job_or_job_id) job, job_id = fetch(job_or_job_id) fail ArgumentError.new("no job found with id '#{job_id}'") unless job job.unschedule if job end
# File lib/rufus/scheduler.rb, line 159 def up? !! @started_at end
# File lib/rufus/scheduler.rb, line 135 def uptime @started_at ? EoTime.now - @started_at : nil end
# File lib/rufus/scheduler.rb, line 140 def uptime_s uptime ? self.class.to_duration(uptime) : '' end
Lists all the work threads (the ones actually running the scheduled block code)
Accepts a query option, which can be set to:
-
:all (default), returns all the threads that are work threads or are currently running a job
-
:active, returns all threads that are currently running a job
-
:vacant, returns the threads that are not running a job
If, thanks to :blocking => true, a job is scheduled to monopolize the main scheduler thread, that thread will get returned when :active or :all.
# File lib/rufus/scheduler.rb, line 399 def work_threads(query=:all) ts = threads.select { |t| t[:rufus_scheduler_job] || t[:rufus_scheduler_work_thread] } case query when :active then ts.select { |t| t[:rufus_scheduler_job] } when :vacant then ts.reject { |t| t[:rufus_scheduler_job] } else ts end end
Protected Instance Methods
# File lib/rufus/scheduler.rb, line 597 def do_schedule(job_type, t, callable, opts, return_job_instance, block) fail NotRunningError.new( 'cannot schedule, scheduler is down or shutting down' ) if @started_at.nil? callable, opts = nil, callable if callable.is_a?(Hash) opts = opts.dup unless opts.has_key?(:_t) return_job_instance ||= opts[:job] job_class = case job_type when :once opts[:_t] ||= Rufus::Scheduler.parse(t, opts) opts[:_t].is_a?(Numeric) ? InJob : AtJob when :every EveryJob when :interval IntervalJob when :cron CronJob end job = job_class.new(self, t, opts, block || callable) fail ArgumentError.new( "job frequency (#{job.frequency}) is higher than " + "scheduler frequency (#{@frequency})" ) if job.respond_to?(:frequency) && job.frequency < @frequency @jobs.push(job) return_job_instance ? job : job.job_id end
Returns [ job, job_id ]
# File lib/rufus/scheduler.rb, line 506 def fetch(job_or_job_id) if job_or_job_id.respond_to?(:job_id) [ job_or_job_id, job_or_job_id.job_id ] else [ job(job_or_job_id), job_or_job_id ] end end
# File lib/rufus/scheduler.rb, line 522 def join_all_work_threads work_threads.size.times { @work_queue << :sayonara } work_threads.each { |t| t.join } @work_queue.clear end
# File lib/rufus/scheduler.rb, line 531 def kill_all_work_threads work_threads.each { |t| t.kill } end
def free_all_work_threads
work_threads.each { |t| t.raise(KillSignal) }
end
# File lib/rufus/scheduler.rb, line 541 def start @started_at = EoTime.now @thread = Thread.new do while @started_at do unschedule_jobs trigger_jobs unless @paused timeout_jobs sleep(@frequency) end end @thread[@thread_key] = true @thread[:rufus_scheduler] = self @thread[:name] = @opts[:thread_name] || "#{@thread_key}_scheduler" end
# File lib/rufus/scheduler.rb, line 515 def terminate_all_jobs jobs.each { |j| j.unschedule } sleep 0.01 while running_jobs.size > 0 end
# File lib/rufus/scheduler.rb, line 578 def timeout_jobs work_threads(:active).each do |t| job = t[:rufus_scheduler_job] to = t[:rufus_scheduler_timeout] ts = t[:rufus_scheduler_time] next unless job && to && ts # thread might just have become inactive (job -> nil) to = ts + to unless to.is_a?(EoTime) next if to > EoTime.now t.raise(Rufus::Scheduler::TimeoutError) end end
# File lib/rufus/scheduler.rb, line 568 def trigger_jobs now = EoTime.now @jobs.each(now) do |job| job.trigger(now) end end
# File lib/rufus/scheduler.rb, line 563 def unschedule_jobs @jobs.delete_unscheduled end