Class Delayed::Job
In: lib/delayed/job.rb
Parent: ActiveRecord::Base

A job object that is persisted to the database. Contains the work object as a YAML field.

Constants

MAX_ATTEMPTS = 25
MAX_RUN_TIME = 4.hours
NextTaskSQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
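
The ParseObjectFromYaml pattern captures the class name from a YAML type tag such as the one stored in the handler column. The following is a minimal sketch, assuming handler strings shaped like the tags Ruby's YAML serializer writes for structs and objects. The NewsletterJob and Reports::Monthly names and the class_name_from_handler helper are hypothetical.

```ruby
# Same pattern as the ParseObjectFromYaml constant listed above.
PARSE_OBJECT_FROM_YAML = /\!ruby\/\w+\:([^\s]+)/

# Hypothetical handler strings carrying a YAML type tag for a Ruby object.
struct_handler = "--- !ruby/struct:NewsletterJob\ntext: hello\n"
object_handler = "--- !ruby/object:Reports::Monthly\nyear: 2009\n"

# Returns the class name found in the YAML tag, or nil if there is no tag.
def class_name_from_handler(handler)
  match = PARSE_OBJECT_FROM_YAML.match(handler)
  match && match[1]
end

puts class_name_from_handler(struct_handler)
puts class_name_from_handler(object_handler)
```

Because the capture group takes every non-whitespace character after the tag prefix, namespaced class names are captured whole.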

Public Class methods

When a worker is exiting, make sure we don't have any locked jobs.

    # File lib/delayed/job.rb, line 37
    def self.clear_locks!
      update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
    end

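Add a job to the queue. Pass an object that responds to perform (or a block), optionally followed by a priority (default 0) and a run_at time.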

    # File lib/delayed/job.rb, line 107
    def self.enqueue(*args, &block)
      object = block_given? ? EvaledJob.new(&block) : args.shift

      unless object.respond_to?(:perform) || block_given?
        raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
      end

      priority = args.first || 0
      run_at   = args[1]

      Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
    end
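
The argument handling in enqueue can be sketched without a database. This is an illustration only: NewsletterJob and split_enqueue_args are hypothetical names, the block form is left out, and the returned Hash stands in for the attributes that the method above hands to Job.create.

```ruby
# Hypothetical job: any object that responds to #perform qualifies.
NewsletterJob = Struct.new(:text) do
  def perform
    "delivering: #{text}"
  end
end

# Stand-in for the positional-argument handling in enqueue:
# first the job object, then an optional priority, then an optional run_at.
def split_enqueue_args(*args)
  object = args.shift
  unless object.respond_to?(:perform)
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  { :payload_object => object, :priority => (args.first || 0).to_i, :run_at => args[1] }
end

job = NewsletterJob.new('hello')
p split_enqueue_args(job)[:priority]
p split_enqueue_args(job, 3, Time.at(0))[:priority]
```

Leaving run_at as nil is harmless in the real class, since before_save fills it in with the current database time.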

Find a few candidate jobs to run (in case some immediately get locked by others).

    # File lib/delayed/job.rb, line 121
    def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)

      time_now = db_time_now

      sql = NextTaskSQL.dup

      conditions = [time_now, time_now - max_run_time, worker_name]

      if self.min_priority
        sql << ' AND (priority >= ?)'
        conditions << min_priority
      end

      if self.max_priority
        sql << ' AND (priority <= ?)'
        conditions << max_priority
      end

      conditions.unshift(sql)

      ActiveRecord::Base.silence do
        find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
      end
    end
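
To see what the conditions array looks like by the time it reaches find, the construction can be reproduced in isolation. This is a sketch under assumptions: available_conditions is a hypothetical helper, its parameters stand in for the class-level settings (worker_name, min_priority, max_priority), and the worker name shown is made up.

```ruby
# Copy of the NextTaskSQL constant from this class.
NEXT_TASK_SQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'

# Builds [sql, bind1, bind2, ...] in the same order find_available does.
def available_conditions(now, max_run_time, worker, min_priority = nil, max_priority = nil)
  sql = NEXT_TASK_SQL.dup
  binds = [now, now - max_run_time, worker]

  if min_priority
    sql << ' AND (priority >= ?)'
    binds << min_priority
  end

  if max_priority
    sql << ' AND (priority <= ?)'
    binds << max_priority
  end

  binds.unshift(sql)
end

now = Time.at(1_000)
conditions = available_conditions(now, 100, 'host:example pid:1', 0)
puts conditions.first
puts conditions.length
```

Each optional priority bound adds one placeholder to the SQL and one value to the bind list, so the two always stay in step.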

Run the next job we can get an exclusive lock on. Returns nil if no jobs are left or none of the candidates could be locked.

    # File lib/delayed/job.rb, line 148
    def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)

      # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
      # this leads to a more even distribution of jobs across the worker processes
      find_available(5, max_run_time).each do |job|
        t = job.run_with_lock(max_run_time, worker_name)
        return t unless t == nil  # return if we did work (good or bad)
      end

      nil # we didn't do any work, all 5 were not lockable
    end

Do num jobs and return stats on success/failure. Exit early if interrupted.

    # File lib/delayed/job.rb, line 195
    def self.work_off(num = 100)
      success, failure = 0, 0

      num.times do
        case self.reserve_and_run_one_job
        when true
            success += 1
        when false
            failure += 1
        else
          break  # leave if no work could be done
        end
        break if $exit # leave if we're exiting
      end

      return [success, failure]
    end
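
The counting rule in work_off relies on the three-way result of reserve_and_run_one_job: true, false, or nil. A small sketch of that rule follows, assuming a precomputed list of results in place of real job runs; the tally helper is hypothetical and the $exit check is left out.

```ruby
# Tallies results the way work_off does: true is a success, false is a
# failure, and anything else (nil) means no job could be run, so we stop.
def tally(results, num = 100)
  success, failure = 0, 0

  results.first(num).each do |result|
    case result
    when true  then success += 1
    when false then failure += 1
    else break
    end
  end

  [success, failure]
end

p tally([true, false, true, nil, true])
```

The trailing true in the example is never counted, because the nil before it ends the loop.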

Public Instance methods

failed()

Alias for failed?

    # File lib/delayed/job.rb, line 41
    def failed?
      failed_at
    end

Moved into its own method so that new_relic can trace it.

    # File lib/delayed/job.rb, line 214
    def invoke_job
      payload_object.perform
    end

Lock this job for this worker. Returns true if we have the lock, false otherwise.

    # File lib/delayed/job.rb, line 162
    def lock_exclusively!(max_run_time, worker = worker_name)
      now = self.class.db_time_now
      affected_rows = if locked_by != worker
        # We don't own this job so we will update the locked_by name and the locked_at
        self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
      else
        # We already own this job, this may happen if the job queue crashes.
        # Simply resume and update the locked_at
        self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
      end
      if affected_rows == 1
        self.locked_at    = now
        self.locked_by    = worker
        return true
      else
        return false
      end
    end
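
The locking rule amounts to a conditional update: the lock is taken only when the row is unlocked or its lock has expired, and the job is due. The following in-memory sketch models that rule, assuming a Hash in place of the database row and plain integers in place of timestamps. The try_lock helper is hypothetical and does not reproduce the atomicity that the single UPDATE statement provides.

```ruby
# In-memory stand-in for the conditional UPDATE in lock_exclusively!.
# `row` is a hypothetical Hash playing the role of one database row.
def try_lock(row, worker, now, max_run_time)
  if row[:locked_by] != worker
    # Not ours: take the lock only if it is free or expired, and the job is due.
    free_or_expired = row[:locked_at].nil? || row[:locked_at] < now - max_run_time
    return false unless free_or_expired && row[:run_at] <= now
    row[:locked_by] = worker
  end

  # Already ours, or just acquired: refresh the lock timestamp.
  row[:locked_at] = now
  true
end

row = { :locked_at => nil, :locked_by => nil, :run_at => 0 }
p try_lock(row, 'worker-a', 10, 100)   # free row
p try_lock(row, 'worker-b', 20, 100)   # still held by worker-a
p try_lock(row, 'worker-b', 200, 100)  # worker-a's lock has expired
```

In the real method the check and the write happen in one UPDATE, and the affected-row count tells the worker whether it won the race.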

This is a good hook if you need to report job-processing errors in additional or different ways.

    # File lib/delayed/job.rb, line 188
    def log_exception(error)
      logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
      logger.error(error)
    end

    # File lib/delayed/job.rb, line 50
    def name
      @name ||= begin
        payload = payload_object
        if payload.respond_to?(:display_name)
          payload.display_name
        else
          payload.class.name
        end
      end
    end

    # File lib/delayed/job.rb, line 46
    def payload_object
      @payload_object ||= deserialize(self['handler'])
    end

    # File lib/delayed/job.rb, line 61
    def payload_object=(object)
      self['handler'] = object.to_yaml
    end

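Reschedule the job in the future (when a job fails). Unless a time is given, the delay grows with the fourth power of the number of failed attempts (attempts ** 4 + 5 seconds). Once MAX_ATTEMPTS is reached, the job is destroyed or marked as failed instead.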

    # File lib/delayed/job.rb, line 67
    def reschedule(message, backtrace = [], time = nil)
      if (self.attempts += 1) < MAX_ATTEMPTS
        time ||= Job.db_time_now + (attempts ** 4) + 5

        self.run_at       = time
        self.last_error   = message + "\n" + backtrace.join("\n")
        self.unlock
        save!
      else
        logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
        destroy_failed_jobs ? destroy : update_attribute(:failed_at, Time.now)
      end
    end
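
The retry delay used by reschedule can be worked out by hand from the expression (attempts ** 4) + 5. A short sketch, with retry_delay as a hypothetical helper name:

```ruby
# Seconds to wait after the given number of failed attempts,
# using the same expression as reschedule.
def retry_delay(attempts)
  (attempts ** 4) + 5
end

(1..5).each { |n| puts "attempt #{n}: retry in #{retry_delay(n)}s" }
```

The first retries follow within seconds (6, 21, 86, 261, 630). With MAX_ATTEMPTS = 25 the last rescheduled attempt is the 24th, which waits 331781 seconds, a little under four days.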

Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked.

    # File lib/delayed/job.rb, line 83
    def run_with_lock(max_run_time, worker_name)
      logger.info "* [JOB] acquiring lock on #{name}"
      unless lock_exclusively!(max_run_time, worker_name)
        # We did not get the lock, some other worker process must have
        logger.warn "* [JOB] failed to acquire exclusive lock for #{name}"
        return nil # no work done
      end

      begin
        runtime =  Benchmark.realtime do
          Timeout.timeout(max_run_time.to_i) { invoke_job }
          destroy
        end
        # TODO: warn if runtime > max_run_time ?
        logger.info "* [JOB] #{name} completed after %.4f" % runtime
        return true  # did work
      rescue Exception => e
        reschedule e.message, e.backtrace
        log_exception(e)
        return false  # work failed
      end
    end
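
The true/false part of this contract comes from wrapping the work in a timeout and treating any exception, including the timeout itself, as a failure. A stripped-down sketch of that pattern follows; run_with_limit is a hypothetical helper, and locking, rescheduling, timing, and logging are all left out.

```ruby
require 'timeout'

# Runs the block under a time limit. Returns true if it finishes,
# false if it raises or exceeds max_run_time seconds.
# Rescuing Exception mirrors the listing above; it is deliberately broad.
def run_with_limit(max_run_time)
  Timeout.timeout(max_run_time) { yield }
  true
rescue Exception
  false
end

p run_with_limit(1) { 1 + 1 }
p run_with_limit(1) { raise 'boom' }
p run_with_limit(0.05) { sleep 1 }
```

Because a timeout surfaces as an ordinary exception, a job that overruns is handled through the same reschedule path as one that raises.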

Unlock this job (note: the change is not saved to the DB).

    # File lib/delayed/job.rb, line 182
    def unlock
      self.locked_at    = nil
      self.locked_by    = nil
    end

Protected Instance methods

    # File lib/delayed/job.rb, line 255
    def before_save
      self.run_at ||= self.class.db_time_now
    end
