class MovieMasher::Job

Represents a single transcoding operation. Once process is called, all of the job's inputs are downloaded and combined together into one mashup, which is then rendered into each of the formats specified by the job's #outputs.

These rendered files are then uploaded to the job's destination, or to the output's if it has one defined. At junctures during processing the job's #callbacks are requested, so as to alert remote systems of job status.

# construct a job and process it
job = Job.new('./job.json', render_directory: './temp')
job.process
# => true
job[:duration]
# => 360
job.progress
# => {
  rendering: 1,
  uploading: 1,
  downloading: 1,
  downloaded: 1,
  rendered: 1,
  uploaded: 1
}

Public Class Methods

__init_hash(job) click to toggle source
# File lib/job.rb, line 30
# Seeds a job hash with a fresh progress counter and, through
# Hashable._init_key, the :id, :inputs, :outputs, :callbacks, :commands and
# :results keys.
#
# job:: Hash-like object to populate.
#
# Returns the same job object that was passed in.
def self.__init_hash(job)
  # block default, so reading a counter that was never set yields zero
  job[:progress] = Hash.new { 0 }
  Hashable._init_key(job, :id, SecureRandom.uuid)
  # :commands stores commands as executed, :results stores their results
  %i[inputs outputs callbacks commands results].each do |key|
    Hashable._init_key(job, key, [])
  end
  job
end
create(hash = nil) click to toggle source
# File lib/job.rb, line 27
# Coerces the argument to a Job.
#
# hash:: an existing Job, or whatever Job.new accepts.
#
# Returns hash itself when it already is a Job, otherwise a new Job built
# from it.
def self.create(hash = nil)
  return hash if hash.is_a?(Job)

  Job.new(hash)
end
new(hash_or_path) click to toggle source

Create a new Job object from a nested structure or a file path.

hash_or_path

Hash, or String expected to be a path to a JSON or YML file, which will be parsed into the Hash.

Calls superclass method
# File lib/job.rb, line 98
# Builds a Job from a nested structure or a file path.
#
# hash_or_path:: passed through Hashable.resolved_hash before being handed
#                to the superclass initializer.
#
# Side effects visible here: the job directory is passed to
# FileHelper.safe_path, job.json is written into it, and any :error already
# present in the resolved hash is logged.
def initialize(hash_or_path)
  @logger = nil
  @cached_audio_graphs = nil
  @cached_video_graphs = nil
  super(Hashable.resolved_hash(hash_or_path))
  self.class.__init_hash(@hash)
  job_directory = __path_job
  FileHelper.safe_path(job_directory)
  log_file = "#{job_directory}/log.txt"
  # stored as a proc so the log file is read only when it is asked for
  @hash[:log] = proc { File.read(log_file) }
  # write massaged job json to job directory
  json_file = Path.concat(job_directory, 'job.json')
  File.open(json_file, 'w') { |file| file << @hash.to_json }
  # if we encountered a parsing error, log it
  log_entry(:error) { @hash[:error] } if @hash[:error]
end

Public Instance Methods

__assure_sequence_complete(output) click to toggle source
# File lib/job.rb, line 244
# Post-render check that only does work for sequence outputs.
#
# output:: the output just rendered; for a sequence its :rendered_file is
#          replaced by the directory that contained it.
#
# Returns true for every non-sequence output, otherwise whatever
# Output.sequence_complete reports.
def __assure_sequence_complete(output)
  return true unless Type::SEQUENCE == output[:type]

  # from here on the rendered "file" is the directory holding the last file
  output[:rendered_file] = File.dirname(output[:rendered_file])
  Output.sequence_complete(output)
end
__cache_asset(asset) click to toggle source
# File lib/job.rb, line 252
# Downloads one asset (an input or a media object) and records the progress.
#
# asset:: handed to Asset.download_asset together with this job.
#
# Returns nil when the download yields nothing, otherwise the result of
# triggering the progress callbacks.
def __cache_asset(asset)
  return unless Asset.download_asset(asset, self)

  progress[:downloaded] += 1
  __callback(:progress)
end
__callback(type) click to toggle source
# File lib/job.rb, line 258
# Triggers every callback whose :trigger matches the supplied type.
#
# type:: Symbol such as :initiate, :progress, :error or :complete.
#
# Progress callbacks are throttled by their :progress_seconds; all other
# callbacks fire at most once, tracked through callback[:called].
#
# Returns true if at least one callback was actually requested.
def __callback(type)
  log_entry(:debug) { "__callback #{type.id2name}" }
  trigger_name = type.id2name
  is_progress = (:progress == type)
  triggered = false
  matching = @hash[:callbacks].select { |c| trigger_name == c[:trigger] }
  matching.each do |callback|
    if is_progress
      # skip while the previous request is younger than :progress_seconds
      last_call = callback[:called]
      next if last_call && last_call + callback[:progress_seconds] > Time.now
      callback[:called] = Time.now
    else
      # one-shot callbacks remember that they already fired
      next if callback[:called]
      callback[:called] = true
    end
    triggered = true
    payload = __hash_or_array(callback)
    failure = __callback_request(payload, callback)
    progress[:called] += 1 unless is_progress
    # a failing callback only becomes the job's error when it is required
    log_entry(:error) { failure } if failure && callback[:required]
  end
  triggered
end
__callback_request(data, callback) click to toggle source
# File lib/job.rb, line 284
# Delivers a single callback, either by transferring a JSON file or by
# making an HTTP(S) request, depending on callback[:type].
#
# data:: Hash or Array to send as JSON, or nil when there is no payload.
# callback:: the callback being triggered; for Type::FILE its
#            :callback_file key is set to the evaluated destination path.
#
# Returns nil on success, otherwise a String describing the problem. Any
# StandardError raised along the way is rescued and reported through the
# return value (and logged as a warning) instead of propagating.
def __callback_request(data, callback)
  err = nil
  begin
    destination_path = callback.full_path
    destination_path = Evaluate.value(destination_path, __scope(callback))
    case callback[:type]
    when Type::FILE
      FileHelper.safe_path(File.dirname(destination_path))
      callback[:callback_file] = destination_path
      if data
        # stage the payload in the job directory, then hand it to Transfer
        file = Path.concat(__path_job, "callback-#{SecureRandom.uuid}.json")
        File.open(file, 'w') { |f| f.write(data.to_json) }
        Transfer.file(callback[:method], file, destination_path)
      end
    when Type::HTTP, Type::HTTPS
      uri = URI(callback.url)
      # an explicit :port overrides whatever the url implied
      uri.port = callback[:port].to_i if callback[:port]
      uri.query = Transfer.query_string(callback, self)
      # __transfer_uri_parameters(callback, uri, callback)
      req = nil
      if data
        # a payload is POSTed as JSON
        headers = { 'Content-Type' => 'application/json' }
        req = Net::HTTP::Post.new(uri, headers)
        log_entry(:debug) { "posting callback #{uri}" }
        req.body = data.to_json
      else
        # simple get request
        log_entry(:debug) { "getting callback #{uri}" }
        req = Net::HTTP::Get.new(uri)
      end
      # basic auth is only applied when both credentials are present
      if callback[:user] && callback[:pass]
        req.basic_auth(callback[:user], callback[:pass])
      end
      use_ssl = (uri.scheme == 'https')
      Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http|
        result = http.request(req)
        err = __result_error(result)
      end
    else
      err = "unsupported callback type #{callback[:type]}"
    end
  rescue => e
    err = e.message
    # also echoed to stdout, together with the backtrace
    puts "CAUGHT #{e.is_a?(Error::Job)} #{e.message} #{e.backtrace}"
  end
  log_entry(:warn) { err } if err
  err
end
__evaluated_transfer_path(output_dest, output) click to toggle source
# File lib/job.rb, line 332
# Works out the path an output should be uploaded to.
#
# output_dest:: destination supplying :directory, :path and file_name.
# output:: output supplying :path, :type and file_name as fallbacks.
#
# Returns the joined path after Evaluate.value has processed it with the
# output's scope.
def __evaluated_transfer_path(output_dest, output)
  leaf = output_dest.file_name
  if leaf.empty?
    # transfer didn't supply one, so fall back to the output's own path
    leaf = output[:path]
    leaf = Path.concat(leaf, output.file_name) unless Type::SEQUENCE == output[:type]
  end
  base = Path.concat(output_dest[:directory], output_dest[:path])
  Evaluate.value(Path.concat(base, leaf), __scope(output))
end
__execute_and_log(options) click to toggle source
# File lib/job.rb, line 345
# Runs one shell command and records both the command and its output.
#
# options:: Hash understood by ShellHelper.command and
#           ShellHelper.raise_unless_rendered.
#
# The command is appended to @hash[:commands] and its captured output to
# @hash[:results]. Whatever ShellHelper.raise_unless_rendered raises is not
# rescued here.
#
# Returns the captured output.
def __execute_and_log(options)
  # build the command before running it, so it is logged before any problem
  command = ShellHelper.command(options)
  log_entry(:debug) { command }
  @hash[:commands] << command
  captured = ShellHelper.capture(command)
  log_entry(:debug) { captured }
  @hash[:results] << captured
  entries = ShellHelper.raise_unless_rendered(captured, options)
  # each entry maps a log level to a proc producing the message
  entries.each do |entry|
    entry.each { |level, message| log_entry(level, &message) }
  end
  captured
end
__execute_output_command(output, cmd_hash) click to toggle source
# File lib/job.rb, line 361
# Produces the file described by one command hash of an output.
#
# output:: the output being rendered; only its :extension is read here.
# cmd_hash:: Hash with :file plus either :content (written verbatim) or
#            :command, :app, :duration, :precision and optionally :pass.
#
# Nothing is executed when there is no :content and the target file already
# exists. When :pass is set a two pass run is attempted first; if that
# raises, a single pass run is used instead.
def __execute_output_command(output, cmd_hash)
  out_path = cmd_hash[:file]
  content = cmd_hash[:content]
  FileHelper.safe_path(File.dirname(out_path))
  if content
    # literal content is simply written to the target file
    File.open(out_path, 'w') { |f| f << content }
  elsif !File.exist?(out_path)
    cmd = cmd_hash[:command]
    duration = cmd_hash[:duration]
    precision = cmd_hash[:precision]
    app = cmd_hash[:app]
    do_single_pass = !cmd_hash[:pass]
    unless do_single_pass
      # uniquely named pass log file inside the job directory
      pl = Path.concat(__path_job, "pass-#{SecureRandom.uuid}") #
      cmd_1 = "#{cmd} -pass 1 -passlogfile #{pl} -f #{output[:extension]}"
      cmd_2 = "#{cmd} -pass 2 -passlogfile #{pl}"
      begin
        # first pass output is discarded, second pass writes the real file
        __execute_and_log(app: app, command: cmd_1, file: '/dev/null')
        __execute_and_log(
          app: app, command: cmd_2, file: out_path,
          duration: duration, precision: precision
        )
      rescue => e
        # any StandardError from either pass falls back to a single pass
        puts "CAUGHT #{e.is_a?(Error::Job)} #{e.message} #{e.backtrace}"
        log_entry(:debug) { e.message }
        log_entry(:warn) { 'two pass encoding failed, retrying in one' }
        do_single_pass = true
      end
    end
    __execute_and_log(cmd_hash) if do_single_pass
  end
end
__hash_or_array(callback) click to toggle source
# File lib/job.rb, line 393
# Prepares the payload of a callback.
#
# callback:: its :data is used only when it is a Hash or an Array.
#
# Returns nil when there is no usable :data, otherwise a deep copy of it
# that has been passed through Evaluate.object with the callback's scope.
def __hash_or_array(callback)
  template = callback[:data]
  return nil unless template.is_a?(Hash) || template.is_a?(Array)

  # Marshal round trip yields a deep copy, so whatever Evaluate.object does
  # to it, the callback's own :data stays as it was
  payload = Marshal.load(Marshal.dump(template))
  Evaluate.object(payload, __scope(callback))
  payload
end
__init_progress() click to toggle source
# File lib/job.rb, line 402
# Resets the job's progress counters and seeds the "-ing" totals from what
# the job description reveals up front.
#
# :rendering and :uploading start at one per output, :downloading at one per
# input with an :input_url (or the mash's url count for an embedded mash),
# and :calling at the number of non-progress callbacks.
#
# Returns the new value of progress[:calling].
def __init_progress
  # clear existing progress, but not error (allows callback testing)
  self[:progress] = Hash.new { 0 }
  progress[:rendering] += outputs.length
  progress[:uploading] += outputs.length
  inputs.each do |input|
    if input[:input_url]
      progress[:downloading] += 1
    elsif Type::MASH == input[:type]
      mash = input.mash
      progress[:downloading] += (mash ? mash.url_count(outputs_desire) : 1)
    end
  end
  # __callback only increments :called for non-progress triggers, so the
  # matching :calling total has to count that same set of callbacks
  progress[:calling] += callbacks.count { |c| 'progress' != c[:trigger] }
end
__input_dimensions() click to toggle source
# File lib/job.rb, line 418
# Looks for dimensions that outputs can inherit from the inputs.
#
# Returns the :dimensions of the first image or video input that has some;
# failing that an empty String when a mash input was seen, and otherwise
# whatever was last read (normally nil).
def __input_dimensions
  saw_mash = false
  size = nil
  inputs.each do |input|
    case input[:type]
    when Type::MASH then saw_mash = true
    when Type::IMAGE, Type::VIDEO then size = input[:dimensions]
    end
    # the first visual input carrying dimensions wins
    break if size
  end
  return size if size || !saw_mash

  ''
end
__log_exception(exception, is_warning = false) click to toggle source
# File lib/job.rb, line 447
# Writes an exception to the job's log.
#
# exception:: the error to record; nil is accepted and ignored.
# is_warning:: when true the message is logged as :warn, otherwise as
#              :error (which log_entry also stores as the job's error).
#
# The backtrace is logged at :debug level. Exceptions that are not
# Error::Job are additionally printed to stdout.
#
# Always returns nil, which callers use to clear their rescued exception.
def __log_exception(exception, is_warning = false)
  if exception
    # backtrace is nil for an exception that was built but never raised
    trace = Array(exception.backtrace).join("\n")
    unless exception.is_a?(Error::Job)
      puts "#{trace}\n#{exception.message}" # so it gets in cron log as well
    end
    log_entry(:debug) { trace }
    log_entry(is_warning ? :warn : :error) { exception.message }
  end
  nil
end
__logger() click to toggle source
# File lib/job.rb, line 433
# Lazily builds the Logger that writes to log.txt in the job directory.
#
# The level comes from the job's :log_level, falling back to the configured
# :verbose value and finally to 'info'; names that are not constants on
# Logger also end up as Logger::INFO.
#
# Returns the memoized Logger instance.
def __logger
  return @logger if @logger

  directory = __path_job
  FileHelper.safe_path(directory)
  @logger = Logger.new(Path.concat(directory, 'log.txt'))
  level = @hash[:log_level]
  level = MovieMasher.configuration[:verbose] if level.to_s.empty?
  level = 'info' if level.to_s.empty?
  # Logger's level constants are upper case
  level = level.upcase
  @logger.level = Logger.const_defined?(level) ? Logger.const_get(level) : Logger::INFO
  @logger
end
__output_commands(output) click to toggle source
# File lib/job.rb, line 458
# Returns the output's :commands, asking ShellHelper.set_output_commands to
# populate them first when the output has none yet.
#
# output:: the output whose commands are wanted.
def __output_commands(output)
  existing = output[:commands]
  return existing if existing

  ShellHelper.set_output_commands(self, output)
  output[:commands]
end
__path_job() click to toggle source
# File lib/job.rb, line 462
# Returns the job's working directory: the configured :render_directory
# joined with the job's identifier and passed through Path.add_slash_end.
def __path_job
  render_root = MovieMasher.configuration[:render_directory]
  Path.add_slash_end(Path.concat(render_root, identifier))
end
__process_download() click to toggle source
# File lib/job.rb, line 467
# Download phase of processing: triggers the initiate callbacks, caches
# every input (and the media of mash inputs) that the outputs need, then
# updates the job's :duration and output dimensions.
#
# Stops iterating over inputs as soon as the job has an error.
def __process_download
  __callback(:initiate)
  # combined audio/video requirement of all outputs
  desired = outputs_desire
  inputs.each do |input|
    input_url = input[:input_url]
    type = input[:type]
    if input_url
      if Type::MASH == type || AV.includes?(Asset.av_type(input), desired)
        # we won't know if desired content types exist until cached & parsed
        __cache_asset(input)
        if Type::MASH == type
          # read and parse mash json file
          input[:mash] = JSON.parse(File.read(input[:cached_file]))
          Mash.init_mash_input(input)
          # the parsed mash reveals further urls that have to be downloaded
          progress[:downloading] += input.mash.url_count(outputs_desire)
        end
      end
    end
    if Type::MASH == type && AV.includes?(Asset.av_type(input), desired)
      # cache only those mash media that are assets of a desired av type
      input[:mash][:media].each do |media|
        if Type::ASSETS.include?(media[:type])
          if AV.includes?(Asset.av_type(media), desired)
            __cache_asset(media)
          end
        end
      end
    end
    break if @hash[:error]
  end
  @hash[:duration] = TimeRange.update(inputs, outputs)
  __update_sizing
end
__process_render() click to toggle source
# File lib/job.rb, line 499
# Render phase of processing: builds and executes the commands of every
# output, updating progress and triggering progress callbacks after each
# command.
#
# An Error::Job raised for an output is logged (as a warning when the
# output is optional) and re-raised only when the output is :required.
def __process_render
  outputs.each do |output|
    begin
      cmds = __output_commands(output)
      raise(Error::JobInput, 'could not build commands') if cmds.empty?
      # we only added one for each output, so add more minus one
      progress[:rendering] += cmds.length - 1
      if Type::SEQUENCE == output[:type]
        # sequences have additional uploads, which we can now calculate
        frames = output[:video_rate].to_f * output[:duration]
        progress[:uploading] += frames.floor.to_i - 1
      end
      last_file = nil
      result = nil
      cmds.each do |cmd_hash|
        last_file = cmd_hash[:file]
        __execute_output_command(output, cmd_hash)
        progress[:rendered] += 1
        __callback(:progress)
      end
      # the file of the final command is what gets uploaded later
      output[:rendered_file] = last_file
      unless __assure_sequence_complete(output)
        raise(Error::JobRender, 'no sequence files generated')
      end
    rescue Error::Job => e
      # optional outputs only produce a warning and processing continues
      __log_exception(e, !output[:required])
      raise if output[:required]
    end
  end
end
__process_upload() click to toggle source
# File lib/job.rb, line 529
# Upload phase of processing: transfers the rendered file of every output
# that has one.
#
# An Error::Job raised for an output is logged (as a warning when the
# output is optional) and re-raised only when the output is :required.
def __process_upload
  outputs.each do |output|
    rendered = output[:rendered_file]
    next unless rendered

    begin
      __transfer_job_output(output, rendered)
    rescue Error::Job => e
      optional = !output[:required]
      __log_exception(e, optional)
      raise unless optional
    end
  end
end
__result_error(result) click to toggle source
# File lib/job.rb, line 540
# Interprets the response to an HTTP callback request.
#
# result:: response object exposing code and body.
#
# Any 2xx status counts as success (previously only the literal '200' did,
# so replies such as 201 or 204 were reported as errors).
#
# Returns nil on success, otherwise a String describing the failure.
def __result_error(result)
  if /\A2\d\d\z/.match?(result.code.to_s)
    log_entry(:debug) { "callback OK response: #{result.body}" }
    nil
  else
    "callback ERROR #{result.code} response: #{result.body}"
  end
end
__scope(object = nil) click to toggle source
# File lib/job.rb, line 548
# Builds the variable scope used when evaluating values.
#
# object:: optional; when it is a Hashable it is added under its
#          class_symbol.
#
# Returns a Hash that always holds this job under :job.
def __scope(object = nil)
  scope = { job: self }
  scope[object.class_symbol] = object if object.is_a?(Hashable)
  scope
end
__transfer_job_output(output, file) click to toggle source
# File lib/job.rb, line 554
# Uploads the rendered file(s) of one output.
#
# output:: the output being transferred; its own :destination wins over the
#          job's shared destination.
# file:: path of the rendered file (or directory) to upload.
#
# Raises Error::JobInput when no destination is available and Error::Todo
# when the :archive option is requested. A missing file is logged as a
# warning, and also as the job's error when the output is :required.
def __transfer_job_output(output, file)
  target = output[:destination] || destination
  raise(Error::JobInput, 'no output destination') unless target
  if File.exist?(file)
    wants_archive = target[:archive] || output[:archive]
    raise(Error::Todo, 'support for archive option coming...') if wants_archive
    shared = {
      upload: file, output: output, destination: target,
      path: __evaluated_transfer_path(target, output)
    }
    # one upload, progress tick and progress callback per file found
    target.directory_files(file).each do |found|
      target.upload(shared.merge(file: found))
      progress[:uploaded] += 1
      __callback(:progress)
    end
  else
    log_entry(:warn) { "file was not rendered #{file}" }
    log_entry(:error) { 'output not rendered' } if output[:required]
  end
end
__update_sizing() click to toggle source
# File lib/job.rb, line 575
# Gives every visual output that lacks :dimensions the dimensions found in
# the inputs. Audio-only outputs and outputs that already have dimensions
# are left alone.
def __update_sizing
  # make sure visual outputs have dimensions, using input's for default
  fallback = nil
  outputs.each do |output|
    next if AV::AUDIO_ONLY == output[:av] || output[:dimensions]

    # looked up on first need, then reused while it is truthy
    fallback ||= __input_dimensions
    output[:dimensions] = fallback
  end
end
audio_graphs() click to toggle source
# File lib/job.rb, line 40
# Returns the audio graphs Input.audio_graphs builds from the job's inputs,
# computed on first use and cached afterwards.
def audio_graphs
  @cached_audio_graphs = Input.audio_graphs(inputs) unless @cached_audio_graphs
  @cached_audio_graphs
end
base_source() click to toggle source
# File lib/job.rb, line 43
# Reader for the job's base source; returns whatever _get yields for
# :base_source.
def base_source
  _get(:base_source)
end
base_source=(value) click to toggle source
Transfer

Resolves relative paths within MovieMasher::Asset#source and Media#source String values.

# File lib/job.rb, line 48
# Writer for the job's base source.
#
# value:: stored through _set under the writer's own name.
def base_source=(value)
  _set(:base_source=, value)
end
callbacks() click to toggle source
Array

Zero or more Callback objects.

# File lib/job.rb, line 52
# Reader for the job's callbacks; returns whatever _get yields for
# :callbacks.
def callbacks
  _get(:callbacks)
end
destination() click to toggle source
# File lib/job.rb, line 55
# Reader for the destination shared by the job's outputs; returns whatever
# _get yields for :destination.
def destination
  _get(:destination)
end
destination=(value) click to toggle source
Destination

Shared by all Output objects that haven't one of their own.

# File lib/job.rb, line 59
# Writer for the destination shared by the job's outputs.
#
# value:: stored through _set under the writer's own name.
def destination=(value)
  _set(:destination=, value)
end
error() click to toggle source
# File lib/job.rb, line 62
# Reader for the problem recorded on the job; returns whatever _get yields
# for :error.
def error
  _get(:error)
end
error=(value) click to toggle source

Problem encountered during new or process. If the source of the problem is a command line application then lines from its output that include common phrases will be included. Problems encountered during rendering of optional outputs are not included - check log for a warning instead.

Returns String that could be multiline and/or quite long.

# File lib/job.rb, line 71
# Writer for the problem recorded on the job.
#
# value:: stored through _set under the writer's own name.
def error=(value)
  _set(:error=, value)
end
error?() click to toggle source
# File lib/job.rb, line 74
# Validates the job after running preflight and records the first problem
# found as the job's error.
#
# Checks, in order: an already recorded error, missing inputs or outputs,
# problems reported by callbacks, inputs and outputs, the absence of any
# destination (shared or per output), and problems reported by the shared
# destination, base source and module source.
#
# Returns the error (truthy) when there is one, otherwise nil.
def error?
  preflight
  err = error
  err ||= 'no inputs specified' if inputs.empty?
  err ||= 'no outputs specified' if outputs.empty?
  err ||= __first_error(callbacks)
  err ||= __first_error(inputs)
  err ||= __first_error(outputs)
  # any?(:destination) would compare each output with the symbol itself and
  # never match, so look the key up on every output instead
  found_destination = destination || outputs.any? { |output| output[:destination] }
  err ||= 'no destinations specified' unless found_destination
  err ||= destination.error? if destination
  err ||= base_source.error? if base_source
  err ||= module_source.error? if module_source
  self.error = err
end

# Returns the first truthy result of calling error? on the objects, or nil;
# this yields the reported problem itself rather than the object having it.
def __first_error(objects)
  objects.each do |object|
    problem = object.error?
    return problem if problem
  end
  nil
end
id() click to toggle source
String

user supplied identifier.

Default

Nil, or messageId if the Job originated from an SQS message.

# File lib/job.rb, line 91
# Reader for the job's identifier; returns whatever _get yields for :id.
def id
  _get(:id)
end
inputs() click to toggle source
Array

One or more Input objects.

# File lib/job.rb, line 115
# Reader for the job's inputs; returns whatever _get yields for :inputs.
def inputs
  _get(:inputs)
end
log() click to toggle source
String

Current content of the job's log file.

# File lib/job.rb, line 119
# Returns the result of calling the proc stored under :log (set up in the
# initializer to read the job's log file).
def log
  _get(:log).call
end
log_entry(type) { || ... } click to toggle source

Output to the job's log file. If type is :error then job will be halted and its error will be set to the result of proc.

type

Symbol :debug, :info, :warn or :error.

proc

Proc returning a string representing log entry.

# File lib/job.rb, line 128
# Writes an entry to the job's log.
#
# type:: Symbol naming the level - :debug, :info, :warn or :error.
# proc:: block producing the message; it may be evaluated more than once.
#
# For :error the message is also stored as the job's error. When the
# configured :verbose value is 'debug' the message is printed to stdout too.
#
# Returns nil.
def log_entry(type, &proc)
  @hash[:error] = yield if :error == type
  job_logger = __logger
  # ask the logger whether this level is enabled before handing it the block
  level_enabled = job_logger && job_logger.send(:"#{type.id2name}?")
  job_logger.send(type, &proc) if level_enabled
  verbosity = MovieMasher.configuration[:verbose]
  puts yield if 'debug' == verbosity
end
module_source() click to toggle source
# File lib/job.rb, line 140
# Reader for the job's module source; returns whatever _get yields for
# :module_source.
def module_source
  _get(:module_source)
end
module_source=(value) click to toggle source
Transfer

Resolves relative font paths within Media#source String values.

Default

base_source

# File lib/job.rb, line 145
# Writer for the job's module source.
#
# value:: stored through _set under the writer's own name.
def module_source=(value)
  _set(:module_source=, value)
end
output_path(output, no_trailing_slash = false) click to toggle source
# File lib/job.rb, line 148
# Returns the directory inside the job directory that belongs to an output.
#
# output:: its identifier names the directory.
# no_trailing_slash:: when true the path is returned as joined, otherwise
#                     it is passed through Path.add_slash_end first.
def output_path(output, no_trailing_slash = false)
  joined = Path.concat(__path_job, output.identifier)
  no_trailing_slash ? joined : Path.add_slash_end(joined)
end
outputs() click to toggle source
Array

One or more Output objects.

# File lib/job.rb, line 137
# Reader for the job's outputs; returns whatever _get yields for :outputs.
def outputs
  _get(:outputs)
end
outputs_desire() click to toggle source
# File lib/job.rb, line 153
# Folds the :av value of every output together with AV.merge.
#
# Returns the merged value, or nil when the job has no outputs.
def outputs_desire
  outputs.reduce(nil) { |desired, output| AV.merge(output[:av], desired) }
end
preflight() click to toggle source
# File lib/job.rb, line 160
# Converts the raw parts of the job description into their objects:
# destination, base and module sources, inputs, outputs and callbacks.
# Inputs and outputs additionally get their own preflight called with this
# job.
def preflight
  # the explicit receiver is required, a bare assignment would make a local
  self.destination = Destination.create_if(destination)
  self.base_source = Transfer.create_if(base_source)
  self.module_source = Transfer.create_if(module_source)
  inputs.map! do |raw|
    Input.create(raw).tap { |input| input.preflight(self) }
  end
  outputs.map! do |raw|
    Output.create(raw).tap { |output| output.preflight(self) }
  end
  callbacks.map! { |raw| Callback.create(raw) }
end
process() click to toggle source

Downloads assets for each Input, renders each Output and uploads to #destination or MovieMasher::Output#destination so long as error is false.

Returns true if processing succeeded, otherwise false - check error for details.

# File lib/job.rb, line 183
# Runs the whole job: validation, download, render and upload, each phase
# only while no error has been recorded, followed by the error (if needed)
# and complete callbacks.
#
# StandardErrors raised during any phase are rescued and logged rather than
# propagated.
#
# Returns true when the job ended without an error, otherwise false.
def process
  rescued_exception = nil
  begin
    # error? runs preflight and stores any validation problem as the error
    error?
    __init_progress unless error
    __process_download unless error
    __process_render unless error
    __process_upload unless error
  rescue => e
    rescued_exception = e
  end
  begin
    if error || rescued_exception
      # encountered a showstopper (not one raised from optional output)
      # __log_exception always returns nil, which clears rescued_exception
      rescued_exception = __log_exception(rescued_exception)
      __callback(:error)
    end
  rescue => e
    # remember a failure of the error callback itself for logging below
    rescued_exception = e
  end
  begin
    __log_exception(rescued_exception)
    __callback(:complete)
  rescue => e
    puts "CAUGHT #{e.is_a?(Error::Job)} #{e.message} #{e.backtrace}"
    # no point in logging after complete
  end
  !error
end
progress() click to toggle source

Current status of processing. The following keys are available:

:downloading

number of files referenced by inputs

:downloaded

number of input files transferred

:rendering

number of outputs to render

:rendered

number of outputs rendered

:uploading

number of files referenced by outputs

:uploaded

number of output files transferred

:calling

number of non-progress callbacks to trigger

:called

number of non-progress callbacks triggered

Initial values for keys ending in 'ing' are based on information supplied in the job description and might change after process is called. For instance, if a mash input uses a remote source then downloading might increase once it's downloaded and parsed for nested media files.

Returns Hash object with Symbol keys and Integer values.

# File lib/job.rb, line 229
# Reader for the job's progress counters; returns whatever _get yields for
# :progress.
def progress
  _get(:progress)
end
render_path(output) click to toggle source
# File lib/job.rb, line 232
# Builds the local path an output is rendered to.
#
# output:: supplies :type, :extension and, for sequences, :name and
#          :sequence.
#
# Returns the output's directory (without trailing slash) followed by the
# evaluated file portion; sequences get "/<name><sequence>" in front of the
# extension.
def render_path(output)
  stem = Type::SEQUENCE == output[:type] ? "/#{output[:name]}#{output[:sequence]}" : ''
  suffix = Evaluate.value("#{stem}.#{output[:extension]}", __scope(output))
  "#{output_path(output, true)}#{suffix}"
end
video_graphs() click to toggle source
# File lib/job.rb, line 241
# Returns the video graphs Input.video_graphs builds from the job's inputs
# and this job, computed on first use and cached afterwards.
def video_graphs
  @cached_video_graphs = Input.video_graphs(inputs, self) unless @cached_video_graphs
  @cached_video_graphs
end