Friday, January 14, 2011

Parallel Programming with MapReduce

MapReduce Overview

MapReduce is a framework designed by Google [1]. It is loosely based on the map and reduce constructs of functional languages such as Lisp [2]. Google’s MapReduce runs on a scalable distributed computing platform and was designed for data-intensive applications that need to process huge amounts of data.
Various open source implementations exist. The most popular one is Apache’s Hadoop [3], a scalable distributed computing platform that includes a file system (HDFS) to store massive data and a Java MapReduce implementation to process that data.
MapReduce implementations also exist outside of clusters: Qt Concurrent has one for multi-core processors.
This module provides an introduction to parallel programming with MapReduce, using Qt Concurrent.

Programming with MapReduce

When we decide to solve a programming problem using MapReduce, we provide an implementation for a mapper and for a reducer. N mappers run simultaneously, each performing the same task on different input data. One or more reducers receive the output generated by the mappers and apply a reducing phase that yields the final result.
A simple program to count word occurrences in a text corpus is described in [4]:

    map(String input_key, String input_value):
        // input_key: document name
        // input_value: document contents
        for each word w in input_value:
            EmitIntermediate(w, "1");

    reduce(String output_key, Iterator intermediate_values):
        // output_key: a word
        // intermediate_values: a list of counts
        int result = 0;

        for each v in intermediate_values:
            result += ParseInt(v);
        Emit(AsString(result));
 
MapReduce pseudocode that counts word occurrences in a text corpus.
As shown in the code above, a map function receives an input key and an input value (<key, value>), and generates one or more intermediate <key, value(s)> pairs. A reduce function receives the intermediate keys and values that were output by the mappers, processes them in some way, and generates one or more final <key, value(s)> pairs.
You can find a Hadoop implementation of the word count problem at [5], and a Qt Concurrent implementation in the Qt Concurrent package (path: qtconcurrent/examples/wordcount), which can be checked out with Subversion.
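As a rough illustration of the pseudocode above, the following C++ sketch expresses the same word count using only standard-library containers. The names mapWords, reduceCounts, and wordCount are made up for this example, words are simply split on whitespace, and the mappers run one after another rather than in parallel.

```cpp
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Intermediate <word, count> pairs emitted by one mapper (hypothetical type).
using Pairs = std::vector<std::pair<std::string, int>>;

// Map step: emit <word, 1> for every whitespace-separated word in a document.
Pairs mapWords(const std::string &contents) {
    Pairs out;
    std::istringstream in(contents);
    std::string word;
    while (in >> word)
        out.emplace_back(word, 1);
    return out;
}

// Reduce step: add the counts emitted by one mapper to the running totals.
void reduceCounts(std::map<std::string, int> &totals, const Pairs &pairs) {
    for (const auto &p : pairs)
        totals[p.first] += p.second;
}

// Driver: feeds each document to the mapper and its output to the reducer.
// The mapper calls are independent, but this sketch runs them sequentially.
std::map<std::string, int> wordCount(const std::vector<std::string> &documents) {
    std::map<std::string, int> totals;
    for (const auto &doc : documents)
        reduceCounts(totals, mapWords(doc));
    return totals;
}
```

Calling wordCount on the two documents "a b a" and "b c" gives a count of 2 for "a", 2 for "b", and 1 for "c".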

MapReduce in Qt Concurrent

Qt Concurrent [6] is a C++ library for multi-threaded applications. Among other things, it provides a MapReduce implementation for multi-core computers. The map function is called in parallel by multiple threads. The number of threads used in a program depends on the number of processor cores available.
Google’s original MapReduce runs on computer clusters. The data it processes is stored in a distributed file system called GFS (Google File System). To minimize I/O bottlenecks, mappers are usually executed on the same node where the data resides. Parallelism comes from multiple computers executing mappers (or reducers) at the same time.
Qt Concurrent’s MapReduce implementation, on the other hand, works on shared-memory systems. Parallelism comes from multiple threads executing mappers at the same time, on multiple processor cores.
To work with Qt Concurrent’s MapReduce, you write a map function and a reduce function. You must also supply the sequence of values (e.g., names of files, words, numbers, etc.) that you want to feed to your mappers. To do this, use the following API:

QFuture<T> QtConcurrent::mappedReduced ( 
    const Sequence & sequence, 
    MapFunction mapFunction, 
    ReduceFunction reduceFunction, 
    QtConcurrent::ReduceOptions reduceOptions = UnorderedReduce | SequentialReduce )
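Signature of the QtConcurrent::mappedReduced function
For example:

QFuture<T> future = QtConcurrent::mappedReduced(theList, mapFunction, reduceFunction);
T result = future.result();  // blocks until the reduced result is available
Calling QtConcurrent::mappedReduced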
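To make the map-then-reduce flow concrete without depending on Qt, here is a minimal sketch of a similar helper written with std::async from the C++ standard library. mappedReducedSketch, square, and addTo are hypothetical names. The sketch starts one task per item and reduces the results in input order, which is a simplification and not a description of how Qt Concurrent schedules its work.

```cpp
#include <future>
#include <vector>

// Hypothetical stand-in for a mapped-reduce call: runs mapFn on every item in
// its own asynchronous task, then folds the results sequentially with reduceFn.
// mapFn must be safe to call from several threads at once.
template <typename Result, typename Item, typename MapFn, typename ReduceFn>
Result mappedReducedSketch(const std::vector<Item> &items,
                           MapFn mapFn, ReduceFn reduceFn) {
    using Mapped = decltype(mapFn(items.front()));

    // Map phase: launch one task per input item.
    std::vector<std::future<Mapped>> pending;
    pending.reserve(items.size());
    for (const Item &item : items)
        pending.push_back(std::async(std::launch::async,
                                     [&mapFn, &item] { return mapFn(item); }));

    // Reduce phase: combine the intermediate results one at a time.
    Result result{};
    for (auto &f : pending)
        reduceFn(result, f.get());
    return result;
}

// Example map function: square one value.
long long square(const int &x) { return static_cast<long long>(x) * x; }

// Example reduce function: add one intermediate value to the running total.
void addTo(long long &total, const long long &value) { total += value; }
```

For instance, mappedReducedSketch<long long>(values, square, addTo) with values {1, 2, 3, 4} yields 30. The reduce function takes the running result by reference and one intermediate value, mirroring the shape of the reduce functions used later in this module.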

Example: Determining if a (Big) Integer is a Probable Prime

An algorithm for testing whether a number is a probable prime can be defined using the Miller-Rabin primality test, as follows:

    let n be a very big odd number
    check n for divisibility by all primes < 2000
    choose m positive integers less than n to use as bases
    for each of these bases apply the Miller-Rabin test
    if n passes every test, n is a probable prime

    
The algorithm described above can be easily parallelized using MapReduce as follows:
  1. Randomly generate an odd (big) integer n greater than 2.
  2. Generate (or read a list of) all primes less than 2000.
  3. MapReduce-Part1:
    • Map function: Each mapper receives a distinct prime number p_i and checks whether n is divisible by it. A mapper emits (outputs) a 1 if n is divisible by p_i, or a 0 otherwise.
    • Reduce function: Counts the number of divisors. If the count is greater than zero, n is not a prime number, so the test is terminated.
  4. Generate 100 random positive integers less than n.
  5. MapReduce-Part2:
    • Map function: Each mapper receives a different random number r_i and applies the Miller-Rabin primality test with r_i as the base. A mapper emits (outputs) a 1 if n does not pass the test, or a 0 if it passes.
    • Reduce function: Counts the number of 1s emitted by the mappers. If the count is zero, n is a probable prime; otherwise n is not prime.
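The two phases above can be sketched in plain C++ for small inputs. This is only an illustration under stated assumptions: n is odd, greater than 2, and below 2^32 so that products fit in 64-bit integers; all function names are invented here; and the mappers are called in a simple loop, although each call is independent and could be handed to a separate thread. Because n is small in this sketch, the first mapper also skips the case where the prime equals n itself.

```cpp
#include <cstdint>
#include <numeric>
#include <vector>

// (base^exp) mod n. Assumes n < 2^32 so every product fits in 64 bits.
std::uint64_t powMod(std::uint64_t base, std::uint64_t exp, std::uint64_t n) {
    std::uint64_t result = 1 % n;
    base %= n;
    while (exp > 0) {
        if (exp & 1) result = result * base % n;
        base = base * base % n;
        exp >>= 1;
    }
    return result;
}

// Part 1 mapper: 1 if prime p is a proper divisor of n, 0 otherwise.
int mapDivides(std::uint64_t n, std::uint64_t p) {
    return (n != p && n % p == 0) ? 1 : 0;
}

// Part 2 mapper: 1 if n FAILS the Miller-Rabin test for base a, 0 if it passes.
int mapFailsMillerRabin(std::uint64_t n, std::uint64_t a) {
    std::uint64_t m = n - 1;
    int k = 0;
    while (m % 2 == 0) { m /= 2; ++k; }   // n - 1 = 2^k * m with m odd
    std::uint64_t z = powMod(a, m, n);
    if (z == 1 || z == n - 1) return 0;   // first term already passes
    for (int j = 1; j < k; ++j) {
        z = z * z % n;
        if (z == n - 1) return 0;         // a later term passes
    }
    return 1;
}

// Reducer shared by both parts: add up the 0/1 values emitted by the mappers.
int reduceSum(const std::vector<int> &emitted) {
    return std::accumulate(emitted.begin(), emitted.end(), 0);
}

// Driver for steps 3 and 5; the caller supplies the primes and the bases.
bool isProbablePrime(std::uint64_t n,
                     const std::vector<std::uint64_t> &smallPrimes,
                     const std::vector<std::uint64_t> &bases) {
    std::vector<int> emitted;
    for (std::uint64_t p : smallPrimes) emitted.push_back(mapDivides(n, p));
    if (reduceSum(emitted) > 0) return false;      // a divisor was found

    emitted.clear();
    for (std::uint64_t a : bases) emitted.push_back(mapFailsMillerRabin(n, a));
    return reduceSum(emitted) == 0;                // no base rejected n
}
```

With n = 561 (which is 3 · 11 · 17), the first phase already rejects n when 3 is among the small primes, and the second phase rejects it for base 2 even if the first phase is skipped.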
The next section shows a MapReduce implementation of the algorithm described above, using Qt Concurrent. You can find a Hadoop implementation here.

A MapReduce Implementation Using Qt Concurrent

The main program needs to call the mappedReduced function twice (steps 3 and 5 in the algorithm described earlier):

    qDebug() << "\nJob: Divisibility for primes less than 2000";
    qDebug() << "Starting Job";

    time.start();
    // Assigning the returned QFuture to a Counting waits for the job to finish.
    Counting final = mappedReduced(primes, mapperLess2000, reducerLess2000);
    mapReduce2000Time = time.elapsed();
    qDebug() << "End Job";
    qDebug() << "MapReduce elapsed time: " << (mapReduce2000Time) << "ms\n";

    // Continue only if no prime less than 2000 divides the number exactly
    if (final[exactDiv] == 0) {

       // Write number - 1 as 2^k * m (m odd) for the Miller-Rabin test
       de = Decomposition<2048>(number - 1);

       qDebug() << "Job: Divisibility for random numbers less than number to evaluate";
       qDebug() << "Starting Job";
       time.start();
       Counting finalRand = mappedReduced(randomPrimes, mapperRandom, reducerRandom);
       mapReduceRandomTime = time.elapsed();
       qDebug() << "End Job";
       qDebug() << "MapReduce elapsed time: " << (mapReduceRandomTime) << "ms\n";

       // Number of random bases for which the Miller-Rabin test failed
       if (finalRand[falseMiller] > 0) {
          qDebug() << "Result: Non-prime";
       } else {
          qDebug() << "Result: Probably prime";
       }
    } else {
       qDebug() << "Result: Non-prime";
    }
    
Main program
The Decomposition function is used in the main program because the number must be decomposed before the Miller-Rabin test is performed: the number minus one is represented as the product of 2^k and m, where m is odd.
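As a worked example, for n = 561 we have n - 1 = 560 = 2^4 · 35, so k = 4 and m = 35. The following sketch computes the same decomposition for 64-bit integers. DecompositionSketch and decompose are hypothetical names and are not the Decomposition<2048> type used by the program, whose internals are not shown here.

```cpp
#include <cstdint>

// Holds k and m such that value = 2^k * m with m odd.
struct DecompositionSketch {
    unsigned k;
    std::uint64_t m;
};

// Repeatedly divides out factors of two from value (normally n - 1).
DecompositionSketch decompose(std::uint64_t value) {
    DecompositionSketch d{0, value};
    while (d.m != 0 && d.m % 2 == 0) {
        d.m /= 2;
        ++d.k;
    }
    return d;
}
```

Since n is odd, n - 1 is even, so k is always at least 1 for the numbers tested here.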
Two MapReduce processes are implemented. The first evaluates divisibility by all primes less than 2000. Each map call receives a prime as its argument, and the reduce function counts the number of divisors, as shown below:

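    Counting mapperLess2000(const unsigned long &prime) {
       Counting rMap;
       BigInt2048 myPrime(prime);

       // Evaluate the divisibility test once and reuse the result.
       const bool divides = (number % myPrime == 0);

       // divisors is shared by all mapper threads, so appends to it
       // need to be thread-safe.
       if (divides)
          divisors << prime;

       // operator[] inserts the key (with value 0); the reducer counts keys.
       rMap[divides ? exactDiv : inexactDiv];
       return rMap;
    }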
    
    void reducerLess2000(Counting &result, const Counting &w) {
        QMapIterator<QString, int> i(w); 
    
        while(i.hasNext()) {
           i.next();
           result[i.key()]++;
        }
    }
MapReduce code that tests divisibility by primes less than 2000
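The counting idiom in the code above is easy to miss: the mapper only inserts a key into its map, and the reducer increments the shared result once for every key it finds. The sketch below reproduces that idiom with std::map, assuming that Counting behaves like a map from strings to integers (as the QMapIterator<QString, int> in the reducer suggests). CountingSketch, emitKey, and reduceKeys are names invented for this example.

```cpp
#include <map>
#include <string>

// Assumed shape of the Counting type: key -> number of mappers that emitted it.
using CountingSketch = std::map<std::string, int>;

// Mapper side: operator[] inserts the key with value 0.
// Only the presence of the key matters, not its value.
CountingSketch emitKey(const std::string &key) {
    CountingSketch r;
    r[key];
    return r;
}

// Reducer side: one increment per key present in a mapper's output.
void reduceKeys(CountingSketch &result, const CountingSketch &w) {
    for (const auto &entry : w)
        ++result[entry.first];
}
```

After reducing the outputs of two mappers that emitted "exactDiv" and one that emitted "inexactDiv", the result holds 2 and 1 for those keys. A key that no mapper emitted is simply absent, and looking it up with operator[] returns 0, which is what the main program relies on when it checks for a zero count.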
The second MapReduce process is executed only if the random number was not divisible by any of the prime numbers less than 2000:

    // Despite its name, the argument is one of the random bases, not a prime.
    Counting mapperRandom(const BigInt2048 &prime) {

        Counting rMap;
        // First term of the Miller-Rabin sequence for this base.
        BigInt2048 z( getZ(prime,de.getM()).bits );

        // This is the easy case; the first term in the sequence
        // is correct, so we pass the test.

        if(z == 1) {

           rMap[trueMiller];
           divisors << z;
           return rMap;

        } else {

           // Square z up to k times, looking for a term equal to number - 1.
           for (int j = 0; j < de.getK(); j++) {
               BigInt2048 zSquared = BigInt2048(newZ(z).bits);

               if (zSquared == 1 && z == (number - 1) ) {

                  // We've passed the hard version of the Miller-Rabin test.
                  rMap[trueMiller];
                  divisors << z;
                  return rMap;

               }

               z = zSquared;
           }

           // No term matched: the number fails the test for this base.
           rMap[falseMiller];
           return rMap;
        }
    }
    
    
    void reducerRandom(Counting &result, const Counting &w) {

        QMapIterator<QString, int> i(w);
        while(i.hasNext()) {
            i.next();
            result[i.key()]++;
        }
    }
    
MapReduce code that applies the Miller-Rabin test
Figure 1 shows the results of one run of the program.
Figure 1: Prime Validator program
You can download the complete source of this example here.

References
