Source code for hadoopy._job_cli

import sys
import hadoopy
import argparse
import os
import json
import subprocess
import inspect
import logging
from _hadoopy_main import HadoopyTask

def run_freeze(tar_path, Z):

def _get_execution_name():
    """Return a command that can be called to run the script"""
    return sys.argv[0]

def run_pipe(command):
    cmd = '%s %s %d %d' % (_get_execution_name(),
    slave_stdin = open('/dev/null', 'r')
    slave_stdout = os.fdopen(os.dup(sys.stderr.fileno()), 'w')
    retcode = 0
        retcode =, stdout=slave_stdout, stdin=slave_stdin)
    except OSError:  # If we can't find the file, check the local dir
        retcode ='./' + cmd).split(), stdout=slave_stdout, stdin=slave_stdin)
    if retcode:

def run_info(mapper, reducer, combiner, jobconfs, kw):
    tasks = []
    if mapper:
    if reducer:
    if combiner:
    info = dict(kw)
    info['tasks'] = tasks
    if jobconfs:
        info['jobconfs'] = jobconfs

def change_dir():
    """Change the local directory if the HADOOPY_CHDIR environmental variable is provided"""
        d = os.environ['HADOOPY_CHDIR']
        sys.stderr.write('HADOOPY: Trying to chdir to [%s]\n' % d)
    except KeyError:
        except OSError:
            sys.stderr.write('HADOOPY: Failed to chdir to [%s]\n' % d)

def run_task(mapper, reducer, combiner, command, read_fd=None, write_fd=None):
    return HadoopyTask(mapper, reducer, combiner, command, read_fd, write_fd).run()

def disable_stdout_buffering():
    """This turns off stdout buffering so that outputs are immediately
    materialized and log messages show up before the program exits"""
    stdout_orig = sys.stdout
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
    # NOTE(brandyn): This removes the original stdout
    return stdout_orig

[docs]def run(mapper=None, reducer=None, combiner=None, script_path=None, jobconfs=(), **kw): """Hadoopy entrance function This is to be called in all Hadoopy job's. Handles arguments passed in, calls the provided functions with input, and stores the output. TypedBytes are used if the following is True os.environ['stream_map_input'] == 'typedbytes' It is *highly* recommended that TypedBytes be used for all non-trivial tasks. Keep in mind that the semantics of what you can safely emit from your functions is limited when using Text (i.e., no \\t or \\n). You can use the base64 module to ensure that your output is clean. If the HADOOPY_CHDIR environmental variable is set, this will immediately change the working directory to the one specified. This is useful if your data is provided in an archive but your program assumes it is in that directory. As hadoop streaming relies on stdin/stdout/stderr for communication, anything that outputs on them in an unexpected way (especially stdout) will break the pipe on the Java side and can potentially cause data errors. To fix this problem, hadoopy allows file descriptors (integers) to be provided to each task. These will be used instead of stdin/stdout by hadoopy. This is designed to combine with the 'pipe' command. To use the pipe functionality, instead of using ` map` use ` pipe map` which will call the script as a subprocess and use the read_fd/write_fd command line arguments for communication. This isolates your script and eliminates the largest source of errors when using hadoop streaming. The pipe functionality has the following semantics stdin: Always an empty file stdout: Redirected to stderr (which is visible in the hadoop log) stderr: Kept as stderr read_fd: File descriptor that points to the true stdin write_fd: File descriptor that points to the true stdout | **Command Interface** | The command line switches added to your script (e.g., are python *map* (read_fd) (write_fd) Use the provided mapper, optional read_fd/write_fd. python *reduce* (read_fd) (write_fd) Use the provided reducer, optional read_fd/write_fd. python *combine* (read_fd) (write_fd) Use the provided combiner, optional read_fd/write_fd. python *freeze* <tar_path> <-Z add_file0 -Z add_file1...> Freeze the script to a tar file specified by <tar_path>. The extension may be .tar or .tar.gz. All files are placed in the root of the tar. Files specified with -Z will be added to the tar root. python info Prints a json object containing 'tasks' which is a list of tasks which can include 'map', 'combine', and 'reduce'. Also contains 'doc' which is the provided documentation through the doc argument to the run function. The tasks correspond to provided inputs to the run function. | **Specification of mapper/reducer/combiner** | Input Key/Value Types | For TypedBytes/SequenceFileInputFormat, the Key/Value are the decoded TypedBytes | For TextInputFormat, the Key is a byte offset (int) and the Value is a line without the newline (string) | | Output Key/Value Types | For TypedBytes, anything Pickle-able can be used | For Text, types are converted to string. Note that neither may contain \\t or \\n as these are used in the encoding. Output is key\\tvalue\\n | | Expected arguments | mapper(key, value) or, value) | reducer(key, values) or reducer.reduce(key, values) | combiner(key, values) or combiner.reduce(key, values) | | Optional methods | func.configure(): Called before any input read. Returns None. | func.close(): Called after all input read. Returns None or Iterator of (key, value) | | Expected return | None or Iterator of (key, value) :param mapper: Function or class following the above spec :param reducer: Function or class following the above spec :param combiner: Function or class following the above spec :param doc: If specified, on error print this and call sys.exit(1) """ if script_path is None: script_path = inspect.getfile(inspect.currentframe().f_back) script_path = os.path.abspath(script_path) parser = argparse.ArgumentParser() parser.add_argument('--log', help='Default log level to use', choices=('debug', 'info', 'warning', 'error', 'critical'), default='info') subparsers = parser.add_subparsers(help='Job Commands (additional help available inside each)') parser_freeze = subparsers.add_parser('freeze', help='Freeze the script to a tar file.') parser_freeze.add_argument('tar_path', help='Path to .tar or .tar.gz file.') parser_freeze.add_argument('-Z', help='Path to a file to be placed in the tar root (may be repeated for many files).', action='append') parser_freeze.set_defaults(func=run_freeze) parser_info = subparsers.add_parser('info', help='Display job info as JSON') parser_info.set_defaults(func=lambda : run_info(mapper, reducer, combiner, jobconfs, kw)) parser_launch_frozen = subparsers.add_parser('launch_frozen', help='Run Hadoop job (freezes script)') parser_launch_frozen.add_argument('in_name', help='Input HDFS path') parser_launch_frozen.add_argument('out_name', help='Output HDFS path') parser_launch_frozen.add_argument('-jobconf', help='Jobconf', action='append', dest='jobconfs', default=[]) parser_launch_frozen.set_defaults(func=lambda *args, **kw: hadoopy.launch_frozen(*args, script_path=script_path, **kw)) parser_pipe = subparsers.add_parser('pipe', help='Internal: Run map/combine/reduce task using "pipe hopping" to make stdout redirect to stderr.') parser_pipe.add_argument('command', help='Command to run', choices=('map', 'reduce', 'combine')) parser_pipe.set_defaults(func=run_pipe) parser_map = subparsers.add_parser('map', help='Internal: Run map task.') parser_map.add_argument('read_fd', type=int, help='Read file descriptor', nargs='?') parser_map.add_argument('write_fd', type=int, help='Write file descriptor', nargs='?') parser_map.set_defaults(func=lambda **y: run_task(mapper, reducer, combiner, command='map', **y)) parser_combine = subparsers.add_parser('combine', help='Internal: Run combine task.') parser_combine.add_argument('read_fd', type=int, help='Read file descriptor', nargs='?') parser_combine.add_argument('write_fd', type=int, help='Write file descriptor', nargs='?') parser_combine.set_defaults(func=lambda **y: run_task(mapper, reducer, combiner, command='combine', **y)) parser_reduce = subparsers.add_parser('reduce', help='Internal: Run reduce task.') parser_reduce.add_argument('read_fd', type=int, help='Read file descriptor', nargs='?') parser_reduce.add_argument('write_fd', type=int, help='Write file descriptor', nargs='?') parser_reduce.set_defaults(func=lambda **y: run_task(mapper, reducer, combiner, command='reduce', **y)) args = vars(parser.parse_args()) # Handle logging arguments if 'log' in args: numeric_level = getattr(logging, args['log'].upper(), None) logging.basicConfig(level=numeric_level) del args['log'] # Call function with all arguments except for itself func = args['func'] del args['func'] func(**args)
def job_cli(): pass