Well, after my last post about the Executor Framework, I was eager to find a practical use for it. I spent the whole day googling and found some search applications that use this approach to search in parallel. No doubt, it drastically cuts down the overall running time. But one thing I noticed: after submitting the tasks, the execution waits for a particular task in the queue to complete, even if other tasks in the queue are already done and have their results ready. That does not make much difference when nothing further is done with the results. But in a large system, where any number of post-processing steps run on each result, waiting in submission order ends up wasting time. No more theory, let's get into the code. First let me demonstrate the approach that I found.
Here I will curl multiple web sites and collect the responses. I have created a Callable implementation that does a curl to each web site. This gives a sense of how search applications gather data from different sources.
package org.satish.concurrency;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.concurrent.Callable;

/**
 * This is the Callable implementation where I curl the site and send the
 * response back.
 *
 * @author Satish Kumar
 *
 */
public class WebCurlCallable implements Callable<String> {

    private String siteName;

    WebCurlCallable(String siteName) {
        this.siteName = siteName;
    }

    public String call() {
        StringBuffer response = new StringBuffer();
        try {
            URL url = new URL(this.siteName);
            URLConnection urlConnection = url.openConnection();
            BufferedReader in = new BufferedReader(new InputStreamReader(
                    urlConnection.getInputStream()));
            String inputLine;
            while ((inputLine = in.readLine()) != null)
                response.append(inputLine);
            in.close();
        } catch (MalformedURLException e) {
            e.printStackTrace(); // bad URL; return whatever we have
        } catch (IOException e) {
            e.printStackTrace(); // network failure; return whatever we have
        }
        return this.siteName + " :::::: " + response.toString();
    }

    public String getSiteName() {
        return siteName;
    }

    public void setSiteName(String siteName) {
        this.siteName = siteName;
    }
}
WebCurlCallable.java
Now let me post the driver class, where I create a thread pool of 3 and assign the tasks to it. Notice that I collect the Future objects in a list, and in the next section I loop through that list and get the result from each Future. Inside the loop, when the execution hits "future.get()", it waits for that particular task to complete, even if other tasks in the list have already finished and have their results ready. In this program I do no post-processing after getting a result. But think of a situation where you need to perform some work on each result set; in that case the wait is a loss of CPU time that could have been used for that processing.
package org.satish.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WebCurlTesterInitial {

    /* Thread pool size */
    private static final int NO_OF_THREADS = 3;

    /* List of sites to curl */
    private static final String siteNames[] = { "http://www.google.com",
            "http://www.yahoo.com", "http://www.touringheights.com",
            "http://satish-tech-talks.blogspot.in/", "http://www.facebook.com",
            "http://www.oracle.com", "http://www.amazon.com", "http://www.cnn.com", "http://www.about.com",
            "http://www.ebay.com", "http://www.download.com" };

    /** The main thread is always there by default. **/
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        ExecutorService executor = Executors.newFixedThreadPool(NO_OF_THREADS);
        List<Future<String>> list = new ArrayList<Future<String>>(10); // holds the handles to the asynchronous results
        for (int i = 0; i < siteNames.length; i++) {
            Callable<String> worker = new WebCurlCallable(siteNames[i]); // create the task
            Future<String> submit = executor.submit(worker); // hand it to the work queue
            list.add(submit);
        }
        // collect the results in submission order; get() blocks on each future in turn
        for (Future<String> future : list) {
            try {
                System.out.println(future.get().substring(0, 60));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
        System.out.println("Finished all threads in : "
                + (System.currentTimeMillis() - startTime) / 1000.0 + " secs");
    }
}
WebCurlTesterInitial.java
Output:
http://www.google.com :::::: <!doctype html><html itemscope
http://www.yahoo.com :::::: <!DOCTYPE html><html lang="en-IN
http://www.touringheights.com :::::: <!DOCTYPE html PUBLIC "
http://satish-tech-talks.blogspot.in/ :::::: <!DOCTYPE html
http://www.facebook.com :::::: <!DOCTYPE html><html lang="en
http://www.oracle.com :::::: <!DOCTYPE html PUBLIC "-//W3C//
http://www.amazon.com :::::: <!DOCTYPE html PUBLIC "-//W3C
http://www.cnn.com :::::: <!DOCTYPE HTML><html lang="en-US">
http://www.about.com :::::: <!doctype html> <!--[if lt IE 7
http://www.ebay.com :::::: <!DOCTYPE html PUBLIC "-//W3C//DT
http://www.download.com :::::: <!DOCTYPE html> <html lang=
Finished all threads in : 10.5 secs
To utilize that time, I took the help of FutureTask. FutureTask has a protected method, "done()", which gets triggered once the task completes. By default "done()" does nothing, so I extended the class and overrode "done()" the way I wanted. It works as a callback method for me.
package org.satish.concurrency;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * I picked this implementation because I want to write a callback for each
 * task. The FutureTask class has a method done(), which gets fired once the
 * task is completed.
 *
 * @author Satish
 *
 */
public class WebCurlFutureTax extends FutureTask<String> {

    public WebCurlFutureTax(Callable<String> callable) {
        super(callable);
    }

    protected void done() {
        /* After-completion logic */
        try {
            /*
             * Once the task is completed, get the result and print it. Users can
             * implement their own logic here, i.e. fire an event or trigger
             * some other business logic.
             */
            System.out.println(this.get().substring(0, 60));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
WebCurlFutureTax.java
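One detail worth keeping in mind, which my class above does not handle: done() also fires when the task is cancelled, and in that case get() throws a CancellationException. If cancellation is possible in your setup, a small guard like the following sketch (my own variation, not part of the class above) keeps the callback safe:

    protected void done() {
        // done() fires on normal completion *and* on cancellation,
        // so skip the result handling for cancelled tasks.
        if (isCancelled()) {
            return;
        }
        try {
            System.out.println(this.get().substring(0, 60));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }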
package org.satish.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WebCurlTester {

    /* Thread pool size */
    private static final int NO_OF_THREADS = 3;

    /* List of sites to curl */
    private static final String siteNames[] = { "http://www.google.com",
            "http://www.yahoo.com", "http://www.touringheights.com",
            "http://satish-tech-talks.blogspot.in/", "http://www.facebook.com",
            "http://www.oracle.com", "http://www.amazon.com", "http://www.cnn.com", "http://www.about.com",
            "http://www.ebay.com", "http://www.download.com" };

    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        ExecutorService executor = Executors.newFixedThreadPool(NO_OF_THREADS);
        for (int i = 0; i < siteNames.length; i++) {
            Runnable worker = new WebCurlFutureTax(
                    new WebCurlCallable(siteNames[i])); // wrap each task in the FutureTask subclass
            executor.submit(worker); // hand it to the work queue
        }
        // This makes the executor accept no new tasks
        // and finish all tasks already in the queue
        executor.shutdown();
        // Wait until all tasks have completed
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        System.out.println("Finished all threads in : "
                + (System.currentTimeMillis() - startTime) / 1000.0 + " secs");
    }
}
WebCurlTester.java
Output:
http://www.google.com :::::: <!doctype html><html itemscope
http://www.touringheights.com :::::: <!DOCTYPE html PUBLIC "
http://satish-tech-talks.blogspot.in/ :::::: <!DOCTYPE html
http://www.oracle.com :::::: <!DOCTYPE html PUBLIC "-//W3C//
http://www.yahoo.com :::::: <!DOCTYPE html><html lang="en-IN
http://www.facebook.com :::::: <!DOCTYPE html><html lang="en
http://www.about.com :::::: <!doctype html> <!--[if lt IE 7
http://www.amazon.com :::::: <!DOCTYPE html PUBLIC "-//W3C
http://www.cnn.com :::::: <!DOCTYPE HTML><html lang="en-US">
http://www.ebay.com :::::: <!DOCTYPE html PUBLIC "-//W3C//DT
http://www.download.com :::::: <!DOCTYPE html> <html lang=
Finished all threads in : 6.1 secs
Now you can see there is a big difference in the total running time, and so far my callback method does not do much. Think of a situation where there is post-processing on each result; in that case a significant performance gain can be achieved, because the processing starts as soon as a result is ready instead of waiting in submission order. This approach can be used in solutions where the CPU time of distributed servers has to be used efficiently. SOA is a great fit for such concurrent solutions, where the module chunks are distributed among multiple servers and problem statements can be designed to perform tasks in parallel instead of serially.
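As a side note, the JDK also ships a facility that gives the same process-results-as-they-complete behaviour without subclassing FutureTask: ExecutorCompletionService. The sketch below is only an illustration, reusing the WebCurlCallable from above; the class name WebCurlCompletionTester and the shortened site list are my own additions for the example.

package org.satish.concurrency;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WebCurlCompletionTester {

    private static final int NO_OF_THREADS = 3;

    /* Shortened list, just for the illustration */
    private static final String siteNames[] = { "http://www.google.com",
            "http://www.yahoo.com", "http://www.touringheights.com" };

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(NO_OF_THREADS);
        // The completion service queues up futures in the order the tasks finish,
        // so no result has to wait behind a slower task that was submitted earlier.
        CompletionService<String> completionService =
                new ExecutorCompletionService<String>(executor);

        for (String site : siteNames) {
            completionService.submit(new WebCurlCallable(site));
        }

        // take() blocks only until *some* task finishes, whichever one that is.
        for (int i = 0; i < siteNames.length; i++) {
            String result = completionService.take().get();
            System.out.println(result.substring(0, Math.min(60, result.length())));
        }
        executor.shutdown();
    }
}

The trade-off against the FutureTask callback is mainly where the post-processing runs: with done() it runs on the worker thread that finished the task, while with a completion service the main thread pulls completed results and does the processing itself.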