Visit https://github.com/bwhite/hadoopy/ for the source.
Relevant blog posts
Hadoopy is a Python wrapper for Hadoop Streaming written in Cython. It is simple, fast, and readily hackable. It has been tested on 700+ node clusters. The goals of Hadoopy are
Killer Features (unique to Hadoopy)
Additional Features
The majority of the time spent by Hadoopy (and Dumbo) is in the TypedBytes conversion code. TypedBytes is a simple binary serialization format that covers the standard types and can fall back to Pickle for types that are not natively supported. We generate a large set of test vectors (using the tb_make_test.py script) that contain primitives, containers, and a uniform mix (GrabBag). The idea is that by factoring out the types, we can easily see where optimization is needed. Each element is read from stdin, then written to stdout. Outside of the timed section, all of the values are compared to ensure that the final written values are the same. Four methods are compared: Hadoopy TypedBytes (speed_hadoopy.py), Hadoopy TypedBytes file interface (speed_hadoopyfp.py), TypedBytes (speed_tb.py), and cTypedBytes (speed_tbc.py). All columns are in seconds except for Ratio, which is min(TB, cTB) / HadoopyFP (e.g., 7 means HadoopyFP is 7 times faster).
| Filename | Hadoopy | HadoopyFP | TB | cTB | Ratio |
|---|---|---|---|---|---|
| double_100k.tb | 0.148790 | 0.119961 | 0.904720 | 0.993845 | 7.541784 |
| float_100k.tb | 0.145637 | 0.118920 | 0.883124 | 0.992447 | 7.426198 |
| gb_100k.tb | 4.638573 | 4.011934 | 25.577765 | 16.515563 | 4.116609 |
| bool_100k.tb | 0.171327 | 0.150975 | 0.942188 | 0.542741 | 3.594907 |
| dict_50.tb | 0.394323 | 0.364878 | 1.649921 | 1.225979 | 3.359970 |
| tuple_50.tb | 0.370037 | 0.413579 | 1.546317 | 1.241491 | 3.001823 |
| byte_100k.tb | 0.183307 | 0.164549 | 0.894184 | 0.487520 | 2.962767 |
| list_50.tb | 0.355870 | 0.370738 | 1.529233 | 1.092422 | 2.946614 |
| int_100k.tb | 0.234842 | 0.193235 | 0.922423 | 0.526160 | 2.722903 |
| long_100k.tb | 0.761289 | 0.640638 | 1.727951 | 1.957162 | 2.697234 |
| bytes_100_10k.tb | 0.069889 | 0.069375 | 0.147470 | 0.096838 | 1.395862 |
| string_100_10k.tb | 0.106642 | 0.104784 | 0.157907 | 0.106571 | 1.017054 |
| string_10k_10k.tb | 6.392013 | 6.527343 | 6.494607 | 6.949912 | 0.994985 |
| bytes_10k_10k.tb | 3.073718 | 3.123196 | 3.039668 | 3.100858 | 0.973256 |
| string_1k_10k.tb | 0.742198 | 0.719119 | 0.686382 | 0.676537 | 0.940786 |
| bytes_1k_10k.tb | 0.379785 | 0.370314 | 0.329728 | 0.339387 | 0.890401 |
| gb_single.tb | 0.045760 | 0.042701 | 0.038656 | 0.034925 | 0.817896 |
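The Ratio column can be reproduced from the other columns. A minimal sketch, using two rows copied from the table above (the function name `speedup_ratio` is only illustrative and is not part of the benchmark scripts):

```python
def speedup_ratio(hadoopy_fp, tb, ctb):
    """Return min(TB, cTB) / HadoopyFP, the definition given for the Ratio column."""
    return min(tb, ctb) / hadoopy_fp


# (HadoopyFP, TB, cTB) timings in seconds, copied from the table.
rows = {
    "double_100k.tb": (0.119961, 0.904720, 0.993845),
    "gb_single.tb": (0.042701, 0.038656, 0.034925),
}

for name, (fp, tb, ctb) in sorted(rows.items()):
    print("%s %.6f" % (name, speedup_ratio(fp, tb, ctb)))
```

A value above 1 means HadoopyFP is faster than both reference implementations; a value below 1 (as for gb_single.tb and the large string/bytes files) means the faster of TB and cTB wins.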
Python Source (fully documented version in wc.py)
"""Hadoopy Wordcount Demo"""
import hadoopy

def mapper(key, value):
    for word in value.split():
        yield word, 1

def reducer(key, values):
    accum = 0
    for count in values:
        accum += int(count)
    yield key, accum

if __name__ == "__main__":
    hadoopy.run(mapper, reducer, doc=__doc__)
Command line test (run without args, it prints the docstring and quits because of doc=__doc__)
$ python wc.py
Hadoopy Wordcount Demo
Command line test (map)
$ echo "a b a a b c" | python wc.py map
a 1
b 1
a 1
a 1
b 1
c 1
Command line test (map/sort)
$ echo "a b a a b c" | python wc.py map | sort
a 1
a 1
a 1
b 1
b 1
c 1
Command line test (map/sort/reduce)
$ echo "a b a a b c" | python wc.py map | sort | python wc.py reduce
a 3
b 2
c 1
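The map/sort/reduce pipeline above can also be emulated in a single process, which is handy for unit tests. This is a minimal sketch that does not use Hadoopy at all; `run_local` is a hypothetical helper, and passing the line index as the mapper key is an assumption made for illustration.

```python
from itertools import groupby
from operator import itemgetter


def mapper(key, value):
    for word in value.split():
        yield word, 1


def reducer(key, values):
    accum = 0
    for count in values:
        accum += int(count)
    yield key, accum


def run_local(lines):
    """Emulate `map | sort | reduce` for an iterable of text lines."""
    mapped = []
    for index, line in enumerate(lines):
        mapped.extend(mapper(index, line))
    # The shell `sort` step: order the pairs by key.
    mapped.sort(key=itemgetter(0))
    results = []
    # Each run of equal keys is handed to one reducer call.
    for key, group in groupby(mapped, key=itemgetter(0)):
        results.extend(reducer(key, (value for _, value in group)))
    return results


print(run_local(["a b a a b c"]))  # [('a', 3), ('b', 2), ('c', 1)]
```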
Here are a few test files
$ hadoop fs -ls playground/
Found 3 items
-rw-r--r-- 2 brandyn supergroup 259835 2011-01-17 18:56 /user/brandyn/playground/wc-input-alice.tb
-rw-r--r-- 2 brandyn supergroup 167529 2011-01-17 18:56 /user/brandyn/playground/wc-input-alice.txt
-rw-r--r-- 2 brandyn supergroup 60638 2011-01-17 18:56 /user/brandyn/playground/wc-input-alice.txt.gz
We can also do this in Python
>>> import hadoopy
>>> hadoopy.ls('playground/')
['/user/brandyn/playground/wc-input-alice.tb', '/user/brandyn/playground/wc-input-alice.txt', '/user/brandyn/playground/wc-input-alice.txt.gz']
Let’s put wc-input-alice.txt through the word counter using Hadoop. Each node in the cluster has Hadoopy installed (later we will show that it isn’t necessary with launch_frozen). Note that it is using typedbytes, SequenceFiles, and the AutoInputFormat by default.
>>> out = hadoopy.launch('playground/wc-input-alice.txt', 'playground/out/', 'wc.py')
/\----------Hadoop Output----------/\
hadoopy: Running[hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2+737.jar -output playground/out/ -input playground/wc-input-alice.txt -mapper "python wc.py map" -reducer "python wc.py reduce" -file wc.py -jobconf mapred.job.name=python wc.py -io typedbytes -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat -inputformat AutoInputFormat]
11/01/17 20:22:31 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [wc.py, /var/lib/hadoop-0.20/cache/brandyn/hadoop-unjar464849802654976085/] [] /tmp/streamjob1822202887260165136.jar tmpDir=null
11/01/17 20:22:32 INFO mapred.FileInputFormat: Total input paths to process : 1
11/01/17 20:22:32 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/brandyn/mapred/local]
11/01/17 20:22:32 INFO streaming.StreamJob: Running job: job_201101141644_0723
11/01/17 20:22:32 INFO streaming.StreamJob: To kill this job, run:
11/01/17 20:22:32 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=node.com:8021 -kill job_201101141644_0723
11/01/17 20:22:32 INFO streaming.StreamJob: Tracking URL: http://node.com:50030/jobdetails.jsp?jobid=job_201101141644_0723
11/01/17 20:22:33 INFO streaming.StreamJob: map 0% reduce 0%
11/01/17 20:22:40 INFO streaming.StreamJob: map 50% reduce 0%
11/01/17 20:22:41 INFO streaming.StreamJob: map 100% reduce 0%
11/01/17 20:22:52 INFO streaming.StreamJob: map 100% reduce 100%
11/01/17 20:22:55 INFO streaming.StreamJob: Job complete: job_201101141644_0723
11/01/17 20:22:55 INFO streaming.StreamJob: Output: playground/out/
\/----------Hadoop Output----------\/
The return value is a dictionary containing the commands that were run, a key/value iterator over the output (lazily evaluated), and other useful intermediate values.
Let’s see what the output looks like.
>>> out = list(hadoopy.readtb('playground/out'))
>>> out[:10]
[('*', 60), ('-', 7), ('3', 2), ('4', 1), ('A', 8), ('I', 260), ('O', 1), ('a', 662), ('"I', 7), ("'A", 9)]
>>> out.sort(lambda x, y: cmp(x[1], y[1]))
>>> out[-10:]
[('was', 329), ('it', 356), ('in', 401), ('said', 416), ('she', 484), ('of', 596), ('a', 662), ('to', 773), ('and', 780), ('the', 1664)]
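The `cmp`-style call to `out.sort` above works on Python 2 only. A hedged equivalent that also runs on Python 3 sorts with a key function; the pairs below are a small sample copied from the output above.

```python
from operator import itemgetter

# A few (word, count) pairs copied from the output above.
out = [('the', 1664), ('*', 60), ('and', 780), ('a', 662), ('to', 773)]

# Sort ascending by count, the second element of each pair.
out.sort(key=itemgetter(1))
print(out[-3:])  # [('to', 773), ('and', 780), ('the', 1664)]
```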
Note that the output is stored in SequenceFiles and each key/value is encoded as TypedBytes. By using readtb you don’t have to worry about any of that (if the output were compressed, it would also be decompressed here). This can also be done inside of a job to get additional side-data off of HDFS.
What if we don’t want to install python, numpy, scipy, or your-custom-code-that-always-changes on every node in the cluster? We have you covered there too. I’ll remove hadoopy from all nodes except for the one executing the job.
$ sudo rm -r /usr/local/lib/python2.7/dist-packages/hadoopy*
Now it’s gone
>>> import hadoopy
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named hadoopy
The rest of the nodes were cleaned up the same way. We modify the command to use launch_frozen; note that we now get the output from freeze at the top.
>>> out = hadoopy.launch_frozen('playground/wc-input-alice.txt', 'playground/out_frozen/', 'wc.py')
/\----------Hadoop Output----------/\
hadoopy: Running[hadoop jar /hadoop-0.20.2+320/contrib/streaming/hadoop-streaming-0.20.2+320.jar -output playground/out_frozen/ -input playground/wc-input-alice.txt -mapper "_frozen/wc pipe map" -reducer "_frozen/wc pipe reduce" -jobconf "mapred.cache.archives=_hadoopy_temp/1310088192.511625/_frozen.tar#_frozen" -jobconf "mapreduce.job.cache.archives=_hadoopy_temp/1310088192.511625/_frozen.tar#_frozen" -jobconf mapred.job.name=wc -io typedbytes -outputformat org.apache.hadoop.mapred.SequenceFileOutputFormat -inputformat AutoInputFormat]
11/07/07 21:23:23 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [/tmp/hadoop/brandyn/hadoop-unjar12844/] [] /tmp/streamjob12845.jar tmpDir=null
11/07/07 21:23:24 INFO mapred.FileInputFormat: Total input paths to process : 1
11/07/07 21:23:24 INFO streaming.StreamJob: getLocalDirs(): [/scratch0/hadoop/mapred/local, /scratch1/hadoop/mapred/local, /scratch2/hadoop/mapred/local]
11/07/07 21:23:24 INFO streaming.StreamJob: Running job: job_201107051032_0215
11/07/07 21:23:24 INFO streaming.StreamJob: To kill this job, run:
11/07/07 21:23:24 INFO streaming.StreamJob: /hadoop-0.20.2+320/bin/hadoop job -Dmapred.job.tracker=node.com:8021 -kill job_201107051032_0215
11/07/07 21:23:24 INFO streaming.StreamJob: Tracking URL: http://node.com:50030/jobdetails.jsp?jobid=job_201107051032_0215
11/07/07 21:23:25 INFO streaming.StreamJob: map 0% reduce 0%
11/07/07 21:23:31 INFO streaming.StreamJob: map 100% reduce 0%
11/07/07 21:23:46 INFO streaming.StreamJob: map 100% reduce 100%
11/07/07 21:23:49 INFO streaming.StreamJob: Job complete: job_201107051032_0215
11/07/07 21:23:49 INFO streaming.StreamJob: Output: playground/out_frozen/
\/----------Hadoop Output----------\/
And let’s check the output
>>> out = list(hadoopy.readtb('playground/out_frozen'))
>>> out[:10]
[('*', 60), ('-', 7), ('3', 2), ('4', 1), ('A', 8), ('I', 260), ('O', 1), ('a', 662), ('"I', 7), ("'A", 9)]
>>> out.sort(lambda x, y: cmp(x[1], y[1]))
>>> out[-10:]
[('was', 329), ('it', 356), ('in', 401), ('said', 416), ('she', 484), ('of', 596), ('a', 662), ('to', 773), ('and', 780), ('the', 1664)]
We can also generate a tar of the frozen script (useful when working with Oozie). Note that ‘wc’ is not wc.py; it is a self-contained executable.
$ python wc.py freeze wc.tar
$ tar -tf wc.tar
_codecs_cn.so
readline.so
strop.so
cPickle.so
time.so
_collections.so
operator.so
zlib.so
_codecs_jp.so
grp.so
_codecs_kr.so
_codecs_hk.so
_functools.so
_json.so
math.so
libbz2.so.1.0
libutil.so.1
unicodedata.so
array.so
_bisect.so
libz.so.1
_typedbytes.so
_random.so
_main.so
cStringIO.so
_codecs_tw.so
libncurses.so.5
datetime.so
_struct.so
_weakref.so
fcntl.so
_heapq.so
wc
_io.so
select.so
_codecs_iso2022.so
_locale.so
itertools.so
binascii.so
bz2.so
libpython2.7.so.1.0
_multibytecodec.so
Let’s open it up and try it out
$ tar -xf wc.tar
$ ./wc
Hadoopy Wordcount Demo
$ python wc.py
Hadoopy Wordcount Demo
$ hexdump -C wc | head -n5
00000000 7f 45 4c 46 02 01 01 00 00 00 00 00 00 00 00 00 |.ELF............|
00000010 02 00 3e 00 01 00 00 00 80 41 40 00 00 00 00 00 |..>......A@.....|
00000020 40 00 00 00 00 00 00 00 50 04 16 00 00 00 00 00 |@.......P.......|
00000030 00 00 00 00 40 00 38 00 09 00 40 00 1d 00 1c 00 |....@.8...@.....|
00000040 06 00 00 00 05 00 00 00 40 00 00 00 00 00 00 00 |........@.......|
You can determine if a job provides map/reduce/combine functionality and get its documentation by using ‘info’. This is also used internally by Hadoopy to automatically enable/disable the reducer/combiner. The task values are set when the corresponding slots in hadoopy.run are filled.
$ python wc.py info
{"doc": "Hadoopy Wordcount Demo", "tasks": ["map", "reduce"]}
That’s a quick tour of Hadoopy.
Hadoop streaming implements the standard Mapper/Reducer classes and simply opens 3 pipes to a streaming program (stdout, stderr, and stdin). The first issue is how the data is encoded. The standard is to separate keys and values with a tab and each key/value pair with a newline; however, this is a poor way to work, as you have to ensure that your output never contains tabs or newlines. Moreover, serializing everything to an escaped string is inefficient and tends to hurt interoperability of jobs, as everyone has their own solution to encoding. The solution (part of CDH2+) is to use TypedBytes, an encoding format for basic types (int, float, dictionary, list, string, etc.) that is fast, standardized, and simple. Hadoopy has its own implementation and it is particularly fast.
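To make the encoding problem concrete, here is a small sketch that contrasts tab/newline text framing with a length-prefixed binary framing. The binary layout below is a toy chosen for illustration and is not the TypedBytes wire format; all function names are hypothetical.

```python
import struct


def text_encode(key, value):
    """Tab-separated key/value terminated by a newline."""
    return "%s\t%s\n" % (key, value)


def text_decode(line):
    key, value = line.rstrip("\n").split("\t", 1)
    return key, value


def lp_encode(key, value):
    """Toy framing: 4-byte big-endian length, then UTF-8 bytes, per field."""
    out = b""
    for item in (key, value):
        raw = item.encode("utf-8")
        out += struct.pack(">I", len(raw)) + raw
    return out


def lp_decode(data):
    items, pos = [], 0
    while pos < len(data):
        (size,) = struct.unpack_from(">I", data, pos)
        pos += 4
        items.append(data[pos:pos + size].decode("utf-8"))
        pos += size
    return tuple(items)


key, value = "col1\tcol2", "x"
# The tab inside the key makes the text framing split in the wrong place.
print(text_decode(text_encode(key, value)) == (key, value))
# Explicit lengths make the content of each field irrelevant to framing.
print(lp_decode(lp_encode(key, value)) == (key, value))
```

The first comparison fails because the decoder cannot tell a separator from data, which is the failure mode a binary format with explicit lengths avoids.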
TypedBytes doesn’t solve the issue of client code outputting to stdout; it actually makes it worse, as the resulting output is interpreted as TypedBytes, which can have very complex effects. Most Hadoop streaming programs have to meticulously avoid printing to stdout as it will interfere with the connection to Hadoop streaming. Hadoopy uses a technique I refer to as ‘pipe hopping’, where a launcher remaps the stdin/stdout of the client program to be null and stderr respectively, and communication happens over file descriptors which correspond to the true stdout/stdin that Hadoop streaming communicates with. This is transparent to the user, but the end result is more useful error messages when exceptions are thrown (as opposed to generic Java errors) and the ability to use print statements like normal. This is a general solution to the problem, and if other library writers (for Python or other languages) would like a minimum working example of this technique, I have one available.
This technique is on by default and can be disabled by passing pipe=False to the launch command of your choice.
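The idea behind pipe hopping can be sketched with the standard library. This is not Hadoopy's launcher; it is an illustration for Python 3 on a POSIX system in which a pipe stands in for the true stdout, the child's stdout is captured instead of being sent to stderr, and `CHILD` and `run_isolated` are hypothetical names.

```python
import os
import subprocess
import sys

# Stand-in for a user job: it prints debug text and writes real data to write_fd.
CHILD = r'''
import os, sys
write_fd = int(sys.argv[1])
print("debug noise from user code")
os.write(write_fd, b"key\tvalue\n")
'''


def run_isolated():
    data_r, data_w = os.pipe()  # plays the role of the true stdout
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD, str(data_w)],
        stdin=subprocess.DEVNULL,  # the child's stdin is always empty
        stdout=subprocess.PIPE,    # captured here; a real launcher would use stderr
        pass_fds=(data_w,),        # keep the data descriptor open in the child
    )
    os.close(data_w)               # the parent keeps only the read end
    log, _ = proc.communicate()
    with os.fdopen(data_r, "rb") as data_file:
        data = data_file.read()
    return data, log


data, log = run_isolated()
print(repr(data))
print(repr(log))
```

The stray print ends up in `log` while `data` contains only the bytes written to the dedicated descriptor, so user output can no longer corrupt the data stream.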
Once you get past the wordcount examples and you have a few scripts you use regularly, the next level of complexity is managing a workflow of jobs. The simplest way of doing this is to put a few sequential launch statements in a Python script and run it. This is fine for simple workflows but you miss out on two abilities: re-execution of previous workflows by re-using outputs (e.g., when tweaking one job in a flow) and parallel execution of jobs in a flow. I’ve had some fairly complex flows and previously the best solution I could find was using Oozie with a several-thousand-line XML file. Once set up, this ran jobs in parallel and could re-execute the workflow while skipping previous nodes; however, it is another server you have to set up, and making that XML file takes a lot of the fun out of using Python in the first place (it could be more code than your actual task). While Hadoopy is fully compatible with Oozie, Oozie certainly seems lacking for the kind of short turn-around scripts most users want to make.
In solving this problem, our goal was to avoid specifying the dependencies (often as a DAG) as they are inherent in the code itself. Hadoopy Flow solves both of these problems by keeping track of all HDFS outputs your program intends to create and following your program order. By doing this, if we see a ‘launch’ command we run it in a ‘greenlet’, note the output path of the job, and continue with the rest of the program. If none of the job’s inputs depend on any outputs that are pending (i.e., outputs that will materialize from previous jobs/hdfs commands) then we can safely start the job. This is entirely safe because if the program worked before Hadoopy Flow, then it will work now as those inputs must exist as nothing prior to the job could have created it. When a job completes, we notify dependent jobs/hdfs commands and if all of their inputs are available they are executed. The same goes for HDFS commands such as readtb and writetb (most but not all HDFS commands are supported, see Hadoopy Flow for more info). If you try to read from a file that another job will eventually output to but it hasn’t finished yet, then the execution will block at that point until the necessary data is available.
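The start condition described above (a job may start once none of its inputs are pending outputs) can be sketched without greenlets. This is not Hadoopy Flow's code; the job names, the `playground/top/` path, and the exact-match path comparison are simplifying assumptions.

```python
def runnable_jobs(jobs, pending_outputs):
    """Return the names of jobs whose inputs do not depend on a pending output."""
    ready = []
    for name, (inputs, _output) in jobs.items():
        if not any(path in pending_outputs for path in inputs):
            ready.append(name)
    return sorted(ready)


# Hypothetical workflow: job name -> (input paths, output path).
jobs = {
    "wordcount": (["playground/wc-input-alice.txt"], "playground/out/"),
    "top_words": (["playground/out/"], "playground/top/"),
}
pending = set(output for _inputs, output in jobs.values())

print(runnable_jobs(jobs, pending))  # ['wordcount']

# When wordcount completes, its output is no longer pending.
pending.discard("playground/out/")
del jobs["wordcount"]
print(runnable_jobs(jobs, pending))  # ['top_words']
```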
So it sounds pretty magical, but it wouldn’t be worth it if you had to rewrite all of your code. To use Hadoopy Flow, all that you have to do is add ‘import hadoopy_flow’ before you import Hadoopy, and it will automatically parallelize your code. It monkey patches Hadoopy (i.e., wraps the calls at run time) and the rest of your code can be unmodified. All of the code is just a few hundred lines in one file; if you are familiar with greenlets, it might take you 10 minutes to fully understand it (which I recommend if you are going to use it regularly).
Re-execution is another important feature that Hadoopy Flow addresses and it does so trivially. If after importing Hadoopy Flow you set ‘hadoopy_flow.USE_EXISTING = True’, then when paths already exist we simply skip the task/command that would have output to them. This is useful if you run a workflow, a job crashes, and you then fix the bug, delete the bad job’s output, and re-run the workflow. All previous jobs will be skipped and jobs that don’t have their outputs on HDFS are executed like normal. This simple addition makes iterative development using Hadoop a lot more fun and effective, as tweaks generally happen at the end of the workflow and you can easily waste hours recomputing results or hacking your workflow apart to short circuit it.
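The skip rule can be illustrated in a few lines. This is a sketch of the decision only, under the assumption that a job is identified by a single output path; `plan`, the job names, and the `flow/...` paths are hypothetical and do not come from Hadoopy Flow.

```python
def plan(jobs, existing_paths, use_existing=True):
    """Split job names into (skipped, to_run) based on whether outputs exist."""
    skipped, to_run = [], []
    for name, output in jobs:
        if use_existing and output in existing_paths:
            skipped.append(name)
        else:
            to_run.append(name)
    return skipped, to_run


# Hypothetical three-step workflow in program order: (job name, output path).
jobs = [
    ("extract", "flow/extract/"),
    ("train", "flow/train/"),
    ("report", "flow/report/"),
]
# "train" crashed and its bad output was deleted; "report" never ran.
existing = {"flow/extract/"}

print(plan(jobs, existing))  # (['extract'], ['train', 'report'])
```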
Run Hadoop given the parameters
| Parameters: | |
|---|---|
| Return type: | Dictionary with some of the following entries (depending on options) |
| Returns: | freeze_cmds: Freeze command(s) run |
| Returns: | frozen_tar_path: HDFS path to frozen file |
| Returns: | hadoop_cmds: Hadoopy command(s) run |
| Returns: | process: subprocess.Popen object |
| Returns: | output: Iterator of (key, value) pairs |
| Raises : | subprocess.CalledProcessError: Hadoop error. |
| Raises : | OSError: Hadoop streaming not found. |
| Raises : | TypeError: Input types are not correct. |
Freezes a script and then launches it.
This function will freeze your Python program and place it on HDFS in ‘temp_path’. It will not remove it afterwards: frozen files are typically small, you can easily reuse/debug them, and this avoids any risks involved with removing the file.
| Parameters: | |
|---|---|
| Return type: | Dictionary with some of the following entries (depending on options) |
| Returns: | freeze_cmds: Freeze command(s) run |
| Returns: | frozen_tar_path: HDFS path to frozen file |
| Returns: | hadoop_cmds: Hadoopy command(s) run |
| Returns: | process: subprocess.Popen object |
| Returns: | output: Iterator of (key, value) pairs |
| Raises : | subprocess.CalledProcessError: Hadoop error. |
| Raises : | OSError: Hadoop streaming not found. |
| Raises : | TypeError: Input types are not correct. |
A simple local emulation of hadoop
This doesn’t run hadoop and it doesn’t support many advanced features; it is intended for simple debugging. The input/output uses HDFS if an HDFS path is given. This allows for small tasks to be run locally (primarily while debugging). A temporary working directory is used and removed.
Support
| Parameters: | |
|---|---|
| Return type: | Dictionary with some of the following entries (depending on options) |
| Returns: | freeze_cmds: Freeze command(s) run |
| Returns: | frozen_tar_path: HDFS path to frozen file |
| Returns: | hadoop_cmds: Hadoopy command(s) run |
| Returns: | process: subprocess.Popen object |
| Returns: | output: Iterator of (key, value) pairs |
| Raises : | subprocess.CalledProcessError: Hadoop error. |
| Raises : | OSError: Hadoop streaming not found. |
| Raises : | TypeError: Input types are not correct. |
Hadoopy entrance function
This is to be called in all Hadoopy jobs. It handles the arguments passed in, calls the provided functions with input, and stores the output.
TypedBytes are used if the following is True: os.environ['stream_map_input'] == 'typedbytes'
It is highly recommended that TypedBytes be used for all non-trivial tasks. Keep in mind that the semantics of what you can safely emit from your functions is limited when using Text (i.e., no \t or \n). You can use the base64 module to ensure that your output is clean.
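As a sketch of the base64 suggestion, the helpers below make an arbitrary byte string safe to emit in Text mode and recover it afterwards. The helper names are hypothetical; only the standard `base64` module is used.

```python
import base64


def encode_field(raw):
    """Return an ASCII string that contains no tab or newline characters."""
    return base64.b64encode(raw).decode("ascii")


def decode_field(text):
    """Recover the original bytes from an encoded field."""
    return base64.b64decode(text)


value = b"line one\nline two\twith a tab"
safe = encode_field(value)
print(safe)
print(decode_field(safe) == value)  # True
```

The cost is roughly a one-third increase in size, which is one more reason to prefer TypedBytes for non-trivial jobs.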
If the HADOOPY_CHDIR environment variable is set, this will immediately change the working directory to the one specified. This is useful if your data is provided in an archive but your program assumes the data is in its working directory.
As hadoop streaming relies on stdin/stdout/stderr for communication, anything that outputs on them in an unexpected way (especially stdout) will break the pipe on the Java side and can potentially cause data errors. To fix this problem, hadoopy allows file descriptors (integers) to be provided to each task. These will be used instead of stdin/stdout by hadoopy. This is designed to combine with the ‘pipe’ command.
To use the pipe functionality, instead of using your_script.py map use your_script.py pipe map which will call the script as a subprocess and use the read_fd/write_fd command line arguments for communication. This isolates your script and eliminates the largest source of errors when using hadoop streaming.
The pipe functionality has the following semantics:
stdin: Always an empty file
stdout: Redirected to stderr (which is visible in the hadoop log)
stderr: Kept as stderr
read_fd: File descriptor that points to the true stdin
write_fd: File descriptor that points to the true stdout
| Parameters: |
|---|
Output a status message that is displayed in the Hadoop web interface
The status message will replace any other, if you want to append you must do this yourself.
| Parameters: |
|---|
Output a counter update that is displayed in the Hadoop web interface
Counters are useful for quickly identifying the number of times an error occurred, current progress, or coarse statistics.
| Parameters: |
|---|
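For context, Hadoop Streaming's documented convention is that a task updates its status or a counter by writing lines of the form `reporter:status:<message>` and `reporter:counter:<group>,<counter>,<amount>` to stderr. The sketch below formats such lines; whether Hadoopy emits exactly these strings is an assumption, and the function names and the sample group/counter names are illustrative only.

```python
import sys


def format_status(message):
    """Build a Hadoop Streaming status line."""
    return "reporter:status:%s\n" % message


def format_counter(group, counter, amount=1):
    """Build a Hadoop Streaming counter-increment line."""
    return "reporter:counter:%s,%s,%d\n" % (group, counter, amount)


# In a streaming task these lines would be written to stderr.
sys.stderr.write(format_status("processed 1000 records"))
sys.stderr.write(format_counter("wordcount", "bad_lines", 3))
```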
Read typedbytes sequence files on HDFS (with optional compression).
By default, ignores files whose names start with an underscore ‘_’ as they are log files. This allows you to cat a directory that may be a variety of outputs from hadoop (e.g., _SUCCESS, _logs). This works on directories and files. The KV pairs may be interleaved between files (they are read in parallel).
| Parameters: | |
|---|---|
| Returns: | An iterator of key, value pairs. |
| Raises : | IOError: An error occurred reading the directory (e.g., not available). |
Write typedbytes sequence file to HDFS given an iterator of KeyValue pairs
| Parameters: | |
|---|---|
| Raises : | IOError: An error occurred while saving the data. |
Return the absolute path to a file and canonicalize it
Path is returned without a trailing slash and without redundant slashes. Caches the user’s home directory.
| Parameters: | path – A string for the path. This should not have any wildcards. |
|---|---|
| Returns: | Absolute path to the file |
| Raises : | IOError: If unsuccessful |
List files on HDFS.
| Parameters: | path – A string (potentially with wildcards). |
|---|---|
| Return type: | A list of strings representing HDFS paths. |
| Raises : | IOError: An error occurred listing the directory (e.g., not available). |
Get a file from hdfs
| Parameters: | |
|---|---|
| Raises : | IOError: If unsuccessful |
Put a file on hdfs
| Parameters: | |
|---|---|
| Raises : | IOError: If unsuccessful |
Remove a file if it exists (recursive)
| Parameters: | path – A string (potentially with wildcards). |
|---|---|
| Raises : | IOError: If unsuccessful |
Check if a path has zero length (also true if it’s a directory)
| Parameters: | path – A string for the path. This should not have any wildcards. |
|---|---|
| Returns: | True if the path has zero length, False otherwise. |
Check if a path is a directory
| Parameters: | path – A string for the path. This should not have any wildcards. |
|---|---|
| Returns: | True if the path is a directory, False otherwise. |
Check if a file exists.
| Parameters: | path – A string for the path. This should not have any wildcards. |
|---|---|
| Returns: | True if the path exists, False otherwise. |
Given KeyValue pairs, sort, then group
| Parameters: | |
|---|---|
| Returns: | List of KeyValue pairs from the mapper |
Given KeyValue pairs, sort, then group
| Parameters: | |
|---|---|
| Returns: | List of KeyValue pairs from the reducer |
Group sorted KeyValue pairs
| Parameters: | kv – Iterator of KeyValue pairs |
|---|---|
| Returns: | Grouped KeyValue pairs in sorted order |
Given KeyValue pairs, sort, then group
| Parameters: | kv – Iterator of KeyValue pairs |
|---|---|
| Returns: | Grouped KeyValue pairs in sorted order |
Perform a stable sort on KeyValue pair keys
| Parameters: | kv – Iterator of KeyValue pairs |
|---|---|
| Returns: | KeyValue pairs in sorted order |
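The sort and group helpers documented above can be approximated with the standard library. This is a minimal sketch, not the library's implementation; `sort_kv` and `group_kv` are hypothetical names chosen for the example.

```python
from itertools import groupby
from operator import itemgetter


def sort_kv(kv):
    """Stable sort on keys only; values keep their arrival order within a key."""
    return sorted(kv, key=itemgetter(0))


def group_kv(sorted_kv):
    """Yield (key, list_of_values) for each run of equal keys."""
    for key, pairs in groupby(sorted_kv, key=itemgetter(0)):
        yield key, [value for _, value in pairs]


kv = [("b", 2), ("a", 9), ("b", 1), ("a", 3)]
print(list(group_kv(sort_kv(kv))))  # [('a', [9, 3]), ('b', [2, 1])]
```

Because the sort compares keys only and is stable, the values for each key are seen in the order they were emitted, rather than in sorted order.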
TypedBytes interface
| Parameters: |
|---|
x.next() -> the next value, or raise StopIteration