
Parallel Word Count

Due Monday, Sept 17th, 2012

This project tests your understanding of concurrent programming and requires you to use several classes from the java.util.concurrent package. Your program must operate in a thread-safe fashion with multiple threads running, all without using the Java synchronized keyword! All code must be lock-free.

Your goal is to create a histogram containing the word counts of all words in a large (1.3M) set of Google ads scraped from the web. The single-threaded version on my machine takes about 33 seconds to complete the task, and my parallel version drops down to 13 seconds with 6 worker threads on an 8 core box. There are several optimizations I can think of, and you should try to beat my speed improvement ratio (~2.4).

Here are the results of my word count. Your project should get exactly the same numbers, since you must follow my word extraction code below, which converts lines of text into clean words separated by spaces. If we all break up the data in the same way, we should all get the same results.

325035: [free]
236081: [for]
199439: [your]
194759: [and]
189182: [the]
184710: [to]
175371: [in]
160382: [a]
135333: [online]
132530: [find]
130750: [now]
129104: [on]
124403: [of]
109070: [get]
108666: [at]
84202: [with]
81835: [more]
75885: [you]
74396: [save]
72614: [from]
67959: [prices]
67626: [de]
66770: [great]
65504: [today]
60024: [new]
53529: [all]
49255: [download]
46605: [no]
44296: [search]
42577: [our]
42043: [compare]
41277: [up]
38341: [buy]
37908: [top]
37092: [site]
36241: [shipping]
35251: [learn]
34122: [hotel]
33090: [shop]
...

The data file is /home/public/cs680/ads.tsv.gz (which you must unzip into ads.tsv on your machines to process it). The expected output is in /home/public/cs680/wc-output, down to a count of 2000. Where multiple words share the same count, they might not appear in that exact order.

Strategy

You will use a producer-consumer model whereby one thread, the producer, reads data from the input file and fills up a work queue, a BlockingDeque<String>.  Then, you create multiple worker threads, the consumers, that pull data from that work queue. These workers break up the line into words and then update a SHARED ConcurrentMap<String, AtomicInteger> to keep track of the word counts ``globally.'' The number of worker threads to create is passed as the 1st argument to the main program (see below).
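
For the shared map, the combination of ConcurrentMap and AtomicInteger lets you increment counts without synchronized. Here is a minimal sketch of one way to do the per-word update (the method and variable names are illustrative, not required):

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// one possible lock-free increment of a shared word count
static void count(ConcurrentMap<String, AtomicInteger> wc, String word) {
    AtomicInteger n = wc.get(word);
    if (n == null) {
        AtomicInteger fresh = new AtomicInteger(0);
        n = wc.putIfAbsent(word, fresh); // atomically insert if missing
        if (n == null) n = fresh;        // we won the race; use our counter
    }
    n.incrementAndGet();                 // atomic increment, no locks
}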

Before sorting the words by word count to create a histogram, your main thread must know when the producer and consumer threads are done. That means you need to use a CyclicBarrier to synchronize all those threads. Once all of the worker threads have processed all of the input from the work queue, all multithreading safety concerns are over. We can simply walk that map to sort the words and then print out the histogram. Have the main thread do the sorting and printing.
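
Here is a minimal sketch of how the barrier could be wired up, assuming the loader also participates in it (that design choice is yours; the party count is an assumption, not a requirement):

// created in main: N workers + 1 loader + the main thread itself
CyclicBarrier barrier = new CyclicBarrier(numWorkers + 2);

// at the very end of the loader's run() and each worker's run():
barrier.await();   // throws InterruptedException/BrokenBarrierException; handle them

// in main(), after starting the threads:
barrier.await();   // returns once the loader and every worker has arrived
// now it is safe to sort the map and print the histogram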

In my code, I created 3 classes: ThreadedWordCount, Loader, Worker. The main program creates and launches a loader and then N workers. (N is specified on the command line.) The workers share references to all of the key concurrent objects: the work queue (via a loader reference), the barrier, the global word count map, and the maximum count. Try to avoid a bunch of global static variables, as they are poor software hygiene.

You must come up with a scheme to notify the workers when the loader has read all of the data from the input file. You must be careful that you do not interrupt a worker thread that is finishing its last line. In other words, the loader cannot simply whack all of the worker threads. At the same time, the worker threads cannot sit there and wait forever looking for more input in the work queue. My worker termination condition is basically ``work queue is empty and the loader reports done.''

The overall structure of this program is fairly straightforward, but the details are tough to get right. You must avoid deadlock but also provide thread safety (and without using synchronization). For example, here's a hint: use pollFirst() not takeFirst() when pulling from the work queue.
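
As a sketch of that termination condition, a worker loop might look roughly like this, assuming the loader exposes some kind of done flag (isDone() is an illustrative name, not part of the spec; process() and queue are likewise placeholders, and pollFirst with a timeout throws InterruptedException, which you will need to handle):

while (true) {
    // waits briefly for a line, but never blocks forever like takeFirst() would
    String line = queue.pollFirst(100, TimeUnit.MILLISECONDS);
    if (line != null) {
        process(line);                              // clean the line and count its words
    }
    else if (loader.isDone() && queue.isEmpty()) {
        break;                                      // nothing left and nothing more coming
    }
}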

Try to make your code as modular and clean as possible. It makes a big difference when trying to look for potential thread safety issues. Be advised that this took me a while to get right in the details. The symptoms of incorrect concurrency are deadlock or incorrect word counts.

Requirements

You must launch 1 or more threads to process the text file (+1 loader thread). You do not have to process it line by line, but you must follow the word normalization procedure I specify. Here is my single-threaded code that normalizes lines into simple words separated by spaces; it strips punctuation, digits, and so on. You must follow the same algorithm for your parallel version.

public static void updateCounts(Map<String, Integer> wc, String line) {
    String[] cols = line.split("\\s+");
    for (String c : cols) {
        Integer n = wc.get(c);
        if ( n!=null ) wc.put(c, n+1);
        else wc.put(c, 1);
    }
}

private String clean(String line) {
    // get first two columns (subject, ad), ignoring URLs
    String[] cols = line.split("\t");
    line = cols[0]+" "+cols[1];
    line = line.toLowerCase();
    line = line.replaceAll("\"", "");
    line = line.replaceAll("'s", "");
    // collapse any whitespace to single ' ' 
    line = line.replaceAll("[\\p{Space}]+", " "); 
    line = line.trim();

    StringBuilder cleaned = new StringBuilder(line.length());
    for(int i = 0; i < line.length(); i++) {
        char c = line.charAt(i);
        if( Character.isSpaceChar((int) c) || Character.isLetter((int)c) ) {
            // Preserve ONLY ascii characters
            if( c <= 127 ) {
                cleaned.append(c);
            }
        }
    }
    line = cleaned.toString();
    line = line.trim();

    if( line.length() == 0 ) {
        return null;
    }
    return line;
}

You must use a pigeonhole sort to create the histogram at the end, just for fun (and because it's easier to code than sorting a map in Java). The pigeonhole sort is a kind of degenerate bucket sort, and both are awesome sorting mechanisms to know about; they will impress your family and friends with O(n) sorting time vs the O(n log n) theoretical minimum. (O(n log n) only applies to sorts based upon comparison of elements.)

To perform a pigeonhole sort, you must know what the maximum value is. In this case, that means that during your parallel word count, you must track the maximum word count so that you can create the appropriately sized array for the sort. That means using an AtomicInteger to track the maximum word count.
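
A standard lock-free way to maintain a running maximum is a compare-and-set loop; here is a sketch (the helper name is illustrative, not required):

// raise the shared maximum if our candidate is larger; retry on contention
static void updateMax(AtomicInteger maxCount, int candidate) {
    int current = maxCount.get();
    while (candidate > current) {
        if (maxCount.compareAndSet(current, candidate)) break; // we set the new max
        current = maxCount.get();                              // another thread changed it; re-check
    }
}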

Once you have the histogram in memory, you must print the ``pigeon holes'' in reverse order as you see above in the project description.
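
Since the sort happens single-threaded in main after the barrier, it can be as simple as something like this sketch (wordCounts, max, and the list type are assumptions, not requirements):

// one "pigeon hole" per possible count; raw array sidesteps generic-array creation
List<String>[] holes = new List[max + 1];
for (Map.Entry<String, AtomicInteger> e : wordCounts.entrySet()) {
    int count = e.getValue().get();
    if (holes[count] == null) holes[count] = new ArrayList<String>();
    holes[count].add(e.getKey());
}
for (int count = max; count >= 1; count--) {       // walk the holes in reverse
    if (holes[count] == null) continue;
    for (String word : holes[count]) {
        System.out.println(count + ": [" + word + "]");
    }
}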

You must create a class called ThreadedWordCount that contains your main(). Its 1st argument specifies the number of threads to use, and the 2nd argument specifies the full path to an input text file. It must work with 1 or greater threads. Print all word counts down to 1.
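
A minimal sketch of the expected argument handling (the body is just a placeholder):

public static void main(String[] args) throws Exception {
    int numThreads = Integer.parseInt(args[0]);  // number of worker threads
    String fileName = args[1];                   // full path to the input text file
    // ... create the work queue, barrier, loader, and numThreads workers here ...
}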

You must also create a graph, using whatever graphing program you want, that plots time on the Y axis and number of worker threads on the X axis. This should show a curve that describes the speed improvement you get from more than one thread. You will print this out on paper and hand it to me at the start of class.


Your graph should clearly identify how many cores your machine has. Also, please label the axes (unlike my graph). When creating your graph, please try to run your program first to get the data into the OS cache before tracking the program execution times. Otherwise, your numbers will be artificially inflated due to disk overhead.

Also, if you were able to beat my speed improvement ratio, please explain in detail the efficiencies in your program and your strategy.

My program requires on the order of 300M of memory to process the data.

For consistency, please use class names ThreadedWordCount, Loader, Worker.

All classes must be in the default package!

To make it easier for you to debug and for me to grade if there is a problem, please name your loader and worker threads something like this:

Thread.currentThread().setName("Loader");

Submission

You will create a jar file called wc.jar containing *.class files and place it in a directory called wc/dist under your cs680 dir:

https://www/svn/userid/cs680/wc/dist/wc.jar

Put your source Java code in wc/src:

https://www/svn/userid/cs680/wc/src/...

To jar your stuff up, you will "cd" to the directory containing your source code (perhaps wc/src) and create the jar in the wc/dist dir:

cd ~/cs680/wc/src
jar cvf ~/cs680/wc/dist/wc.jar *.class
cd ~/cs680/wc/dist
svn add wc.jar
svn commit

All classes must be in the default package!

Please submit a printed copy of your graph from above!

To learn more about submitting your project with svn, see Resources.

You should test your project by running it as I will run it, per the Grading section below.

You must submit your source code for credit.

Grading

I will run your program by pulling your wc.jar file from the repository and running like this:

java -Xmx1G -cp wc.jar ThreadedWordCount num_threads ads.tsv

I will check the results against my histogram.

If you beat my speed improvement ratio, I will grant you an extra 5% to counteract any deductions I make while grading. (100 is still the maximum score.)

You may discuss this project in its generality with anybody you want and may look at any code on the internet except for a classmate's code. You should physically code this project completely yourself but can use all the help you find, other than cutting-n-pasting or looking at code from a classmate or other human being.

I will deduct 10% if your program is not executable exactly in the fashion mentioned in the project; that is, class name, methods, lack-of-package, and jar must be exactly right. For you PC folks, note that case is significant for class names and file names on unix! All projects must run properly under linux.  That means it's a good idea to test it there yourself.
