Class | Delayed::Job |
In: |
lib/delayed/job.rb
|
Parent: | ActiveRecord::Base |
A job object that is persisted to the database. Contains the work object as a YAML field.
MAX_ATTEMPTS | = | 25 |
MAX_RUN_TIME | = | 4.hours |
NextTaskSQL | = | '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL' |
NextTaskOrder | = | 'priority DESC, run_at ASC' |
ParseObjectFromYaml | = | /\!ruby\/\w+\:([^\s]+)/ |
When a worker is exiting, make sure we don‘t have any locked jobs.
# File lib/delayed/job.rb, line 37 37: def self.clear_locks! 38: update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name]) 39: end
Add a job to the queue
# File lib/delayed/job.rb, line 107 107: def self.enqueue(*args, &block) 108: object = block_given? ? EvaledJob.new(&block) : args.shift 109: 110: unless object.respond_to?(:perform) || block_given? 111: raise ArgumentError, 'Cannot enqueue items which do not respond to perform' 112: end 113: 114: priority = args.first || 0 115: run_at = args[1] 116: 117: Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at) 118: end
Find a few candidate jobs to run (in case some immediately get locked by others).
# File lib/delayed/job.rb, line 121 121: def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) 122: 123: time_now = db_time_now 124: 125: sql = NextTaskSQL.dup 126: 127: conditions = [time_now, time_now - max_run_time, worker_name] 128: 129: if self.min_priority 130: sql << ' AND (priority >= ?)' 131: conditions << min_priority 132: end 133: 134: if self.max_priority 135: sql << ' AND (priority <= ?)' 136: conditions << max_priority 137: end 138: 139: conditions.unshift(sql) 140: 141: ActiveRecord::Base.silence do 142: find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit) 143: end 144: end
Run the next job we can get an exclusive lock on. If no jobs are left we return nil
# File lib/delayed/job.rb, line 148 148: def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME) 149: 150: # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next. 151: # this leads to a more even distribution of jobs across the worker processes 152: find_available(5, max_run_time).each do |job| 153: t = job.run_with_lock(max_run_time, worker_name) 154: return t unless t == nil # return if we did work (good or bad) 155: end 156: 157: nil # we didn't do any work, all 5 were not lockable 158: end
Do num jobs and return stats on success/failure. Exit early if interrupted.
# File lib/delayed/job.rb, line 195 195: def self.work_off(num = 100) 196: success, failure = 0, 0 197: 198: num.times do 199: case self.reserve_and_run_one_job 200: when true 201: success += 1 202: when false 203: failure += 1 204: else 205: break # leave if no work could be done 206: end 207: break if $exit # leave if we're exiting 208: end 209: 210: return [success, failure] 211: end
Moved into its own method so that new_relic can trace it.
# File lib/delayed/job.rb, line 214 214: def invoke_job 215: payload_object.perform 216: end
Lock this job for this worker. Returns true if we have the lock, false otherwise.
# File lib/delayed/job.rb, line 162 162: def lock_exclusively!(max_run_time, worker = worker_name) 163: now = self.class.db_time_now 164: affected_rows = if locked_by != worker 165: # We don't own this job so we will update the locked_by name and the locked_at 166: self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now]) 167: else 168: # We already own this job, this may happen if the job queue crashes. 169: # Simply resume and update the locked_at 170: self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) 171: end 172: if affected_rows == 1 173: self.locked_at = now 174: self.locked_by = worker 175: return true 176: else 177: return false 178: end 179: end
This is a good hook if you need to report job processing errors in additional or different ways
# File lib/delayed/job.rb, line 188 188: def log_exception(error) 189: logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts" 190: logger.error(error) 191: end
# File lib/delayed/job.rb, line 50 50: def name 51: @name ||= begin 52: payload = payload_object 53: if payload.respond_to?(:display_name) 54: payload.display_name 55: else 56: payload.class.name 57: end 58: end 59: end
# File lib/delayed/job.rb, line 46 46: def payload_object 47: @payload_object ||= deserialize(self['handler']) 48: end
# File lib/delayed/job.rb, line 61 61: def payload_object=(object) 62: self['handler'] = object.to_yaml 63: end
Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.
# File lib/delayed/job.rb, line 67 67: def reschedule(message, backtrace = [], time = nil) 68: if (self.attempts += 1) < MAX_ATTEMPTS 69: time ||= Job.db_time_now + (attempts ** 4) + 5 70: 71: self.run_at = time 72: self.last_error = message + "\n" + backtrace.join("\n") 73: self.unlock 74: save! 75: else 76: logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures." 77: destroy_failed_jobs ? destroy : update_attribute(:failed_at, Time.now) 78: end 79: end
Try to run one job. Returns true/false (work done/work failed) or nil if job can‘t be locked.
# File lib/delayed/job.rb, line 83 83: def run_with_lock(max_run_time, worker_name) 84: logger.info "* [JOB] acquiring lock on #{name}" 85: unless lock_exclusively!(max_run_time, worker_name) 86: # We did not get the lock, some other worker process must have 87: logger.warn "* [JOB] failed to acquire exclusive lock for #{name}" 88: return nil # no work done 89: end 90: 91: begin 92: runtime = Benchmark.realtime do 93: Timeout.timeout(max_run_time.to_i) { invoke_job } 94: destroy 95: end 96: # TODO: warn if runtime > max_run_time ? 97: logger.info "* [JOB] #{name} completed after %.4f" % runtime 98: return true # did work 99: rescue Exception => e 100: reschedule e.message, e.backtrace 101: log_exception(e) 102: return false # work failed 103: end 104: end
Unlock this job (note: not saved to DB)
# File lib/delayed/job.rb, line 182 182: def unlock 183: self.locked_at = nil 184: self.locked_by = nil 185: end