pyspark.SparkContext.hadoopRDD

SparkContext.hadoopRDD(inputFormatClass: str, keyClass: str, valueClass: str, keyConverter: Optional[str] = None, valueConverter: Optional[str] = None, conf: Optional[Dict[str, str]] = None, batchSize: int = 0) → pyspark.rdd.RDD[Tuple[T, U]][source]

Read an ‘old’ Hadoop InputFormat with arbitrary key and value classes, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This dict will be converted into a Configuration in Java. The mechanism is the same as for SparkContext.sequenceFile().

New in version 1.1.0.
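
A minimal sketch of the call pattern, shown here with the old-API text format; the input directory path is a hypothetical placeholder, and each entry of the conf dict becomes a property of the Hadoop Configuration used for the read.

>>> read_conf = {
...     "mapreduce.input.fileinputformat.inputdir": "/tmp/hadoop_input",  # hypothetical path
... }
>>> lines = sc.hadoopRDD(  # doctest: +SKIP
...     "org.apache.hadoop.mapred.TextInputFormat",
...     "org.apache.hadoop.io.LongWritable",
...     "org.apache.hadoop.io.Text",
...     conf=read_conf,
... )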

Parameters
inputFormatClass : str

fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)

keyClass : str

fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

valueClass : str

fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.Text”)

keyConverter : str, optional

fully qualified name of a function returning key WritableConverter

valueConverter : str, optional

fully qualified name of a function returning value WritableConverter

conf : dict, optional

Hadoop configuration, passed in as a dict

batchSize : int, optional, default 0

The number of Python objects represented as a single Java object; the default of 0 chooses the batch size automatically.

Returns
RDD

RDD of tuples of key and corresponding value

Examples

>>> import os
>>> import tempfile

Set the related classes

>>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat"
>>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat"
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
>>> with tempfile.TemporaryDirectory() as d:
...     path = os.path.join(d, "old_hadoop_file")
...
...     # Create the conf for writing
...     write_conf = {
...         "mapred.output.format.class": output_format_class,
...         "mapreduce.job.output.key.class": key_class,
...         "mapreduce.job.output.value.class": value_class,
...         "mapreduce.output.fileoutputformat.outputdir": path,
...     }
...
...     # Write a temporary Hadoop file
...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
...     rdd.saveAsHadoopDataset(conf=write_conf)
...
...     # Create the conf for reading
...     read_conf = {"mapreduce.input.fileinputformat.inputdir": path}
...
...     loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf)
...     collected = sorted(loaded.collect())
>>> collected
[(0, '1\t'), (0, '1\ta'), (0, '3\tx')]
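
Because TextOutputFormat writes each record as key and value joined by a tab, and the old TextInputFormat yields (byte offset, line) pairs, one way to recover the original pairs is to split the collected values:

>>> [tuple(v.split("\t", 1)) for _, v in collected]
[('1', ''), ('1', 'a'), ('3', 'x')]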