Programming

From JPPFWiki

Software Requirements

  • Any platform with Java 2, Standard Edition (J2SE) 5.0 or later.
  • A JPPF distribution (a client implementation requires the jppf-client.jar and jppf-common.jar files).


Client Implementation

The client has to split the computation into pieces, each of which is executed on a single node, and pass them to the JPPF framework. These atomic units of work are called tasks. It is up to the client application to decide how much of the overall algorithm goes into each task: small tasks may cause a high transport overhead, while large tasks may limit parallelism.
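To make the granularity trade-off concrete, here is a minimal, JPPF-independent sketch that splits a computation over the index range [0, total) into chunks of a chosen size, each chunk being the work of one task. The class WorkSplitter and its split method are hypothetical helpers, not part of JPPF.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of JPPF): splits the index range [0, total)
 * into contiguous chunks of at most chunkSize elements. Each chunk would be
 * processed by one task.
 */
class WorkSplitter
{
  /** Returns the chunks as {start, end} pairs, end being exclusive. */
  static List<int[]> split(int total, int chunkSize)
  {
    if (total < 0 || chunkSize <= 0)
      throw new IllegalArgumentException("total must be >= 0 and chunkSize > 0");
    List<int[]> chunks = new ArrayList<int[]>();
    for (int start = 0; start < total; start += chunkSize)
    {
      // the last chunk may be smaller than chunkSize
      int end = Math.min(start + chunkSize, total);
      chunks.add(new int[] { start, end });
    }
    return chunks;
  }

  public static void main(String[] args)
  {
    // 1000 items: a chunk size of 10 gives 100 small tasks (more transport overhead),
    // a chunk size of 500 gives only 2 large tasks (less parallelism)
    System.out.println(split(1000, 10).size());
    System.out.println(split(1000, 500).size());
  }
}
```

A reasonable chunk size depends on the cost of one unit of work relative to the cost of transferring a task, and on the number of available nodes.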


Task Implementation

A task that can be submitted to the JPPF framework for execution must be a subclass of JPPFTask and must at least implement the run() method, which contains the code executed on a JPPF node.

 import org.jppf.server.protocol.JPPFTask;
 
 public class HelloTask extends JPPFTask
 {
   public void run()
   {
     setResult("Hello World");
   }
 }

The run() method must store the result of the calculation with the setResult() method, so that the client can later retrieve it with getResult().

Client Initialisation

A JPPF client must initialize a connection to a JPPF server, which is done by creating an instance of JPPFClient. If no configuration file is specified through system properties, the client connects to the server with default parameters.

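 import org.jppf.client.JPPFClient;
 
 public class HelloClient
 {
   public static void main(String[] args)
   {
     // connect to the server, using the configuration file if one is set, default parameters otherwise
     JPPFClient client = new JPPFClient();
     // ... use the client
   }
 }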

Task Submission

Tasks are submitted to the JPPFClient instance as a List of JPPFTask instances. The list is passed to the submit() method, whose second argument is an optional DataProvider (null when no shared data is needed), and submit() returns a List of JPPFTask instances containing the results.

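 import java.util.ArrayList;
 import java.util.List;
 import org.jppf.client.JPPFClient;
 import org.jppf.server.protocol.JPPFTask;
 
 public class HelloClient
 {
   public static void main(String[] args)
   {
     JPPFClient client = new JPPFClient();
     List<JPPFTask> tasks = new ArrayList<JPPFTask>();
     tasks.add(new HelloTask());
     try
     {
       // execute the tasks; null means that no data provider is used
       List<JPPFTask> results = client.submit(tasks, null);
       System.out.println(results.get(0).getResult());
     }
     catch (Exception e)
     {
       e.printStackTrace();
     }
   }
 }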

Task Results

The submit() method returns the results of the submission. Each JPPFTask instance in the returned list holds either the result of that task or, if its execution failed, the exception that was raised. A client should therefore always check for an exception before examining the result.

 List<JPPFTask> results = client.submit(tasks, null);
 JPPFTask task = results.get(0);
 if (null == task.getException())
 {
   Object result = task.getResult();
   // use the result ...
 }
 else
 {
   Exception ex = task.getException();
   // handle the exception ...
 }

Advanced Features

Sharing data among tasks

The DataProvider interface enables tasks to share data, so that the same data is not transmitted multiple times. To use a data provider, instantiate it, add data with its setValue(Object, Object) method, then send it along with the tasks. Each task can then read the shared data with getDataProvider().getValue(key).

 Matrix a = new Matrix(300);
 DataProvider dataProvider = new MemoryMapDataProvider();
 dataProvider.setValue("matrix.a", a);
 jppfClient.submit(taskList, dataProvider);
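The saving can be illustrated with plain Java serialization. The sketch below assumes, for illustration only, that each task is serialized into its own stream; the classes CopyingTask and KeyTask are hypothetical, and the 90,000-element array stands in for a 300x300 matrix. It compares the bytes needed when every task carries its own copy of the data with the bytes needed when the data is sent once and each task only carries a key.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class DataSharingDemo
{
  /** Hypothetical task that carries its own copy of the data. */
  static class CopyingTask implements Serializable
  {
    private static final long serialVersionUID = 1L;
    final double[] data;
    CopyingTask(double[] data) { this.data = data.clone(); }
  }

  /** Hypothetical task that only carries the key of the shared data. */
  static class KeyTask implements Serializable
  {
    private static final long serialVersionUID = 1L;
    final String key;
    KeyTask(String key) { this.key = key; }
  }

  /** Number of bytes produced by serializing obj into a fresh stream. */
  static int serializedSize(Object obj)
  {
    try
    {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      ObjectOutputStream out = new ObjectOutputStream(bytes);
      out.writeObject(obj);
      out.close();
      return bytes.size();
    }
    catch (IOException e)
    {
      throw new RuntimeException(e);
    }
  }

  /** Total bytes when each of nbTasks tasks carries a copy of the data. */
  static long sizeWithCopies(double[] data, int nbTasks)
  {
    long total = 0;
    for (int i = 0; i < nbTasks; i++) total += serializedSize(new CopyingTask(data));
    return total;
  }

  /** Total bytes when the data is sent once and each task carries only a key. */
  static long sizeWithSharedData(double[] data, int nbTasks)
  {
    long total = serializedSize(data); // the shared data, sent once
    for (int i = 0; i < nbTasks; i++) total += serializedSize(new KeyTask("matrix.a"));
    return total;
  }

  public static void main(String[] args)
  {
    double[] matrix = new double[300 * 300];
    System.out.println("copies: " + sizeWithCopies(matrix, 10) + " bytes");
    System.out.println("shared: " + sizeWithSharedData(matrix, 10) + " bytes");
  }
}
```

With 10 tasks, the copying approach transfers the array 10 times, while the shared approach transfers it once plus 10 small tasks.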

Alternate forms of task submission

In addition to the two-argument method submit(List<JPPFTask>, DataProvider), JPPFClient provides the following methods to submit tasks:

  • List<JPPFTask> submit(List<JPPFTask>, DataProvider, ExecutionPolicy) : synchronous submission with an execution policy.
  • void submitNonBlocking(List<JPPFTask>, DataProvider, TaskResultListener) : asynchronous submission with a TaskResultListener that handles the notifications of execution results.
  • void submitNonBlocking(List<JPPFTask>, DataProvider, TaskResultListener, ExecutionPolicy) : asynchronous submission with a TaskResultListener and an execution policy.

Accessing remote data from a task

Another implementation of DataProvider, URLDataProvider, enables a task to read and write data at a location identified by a URL. Its getValue(Object) method takes a URL as its parameter and returns the corresponding input stream. Likewise, its setValue(Object, Object) method takes an input stream as its second parameter, allowing the task to write arbitrary data to a remote location.

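 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URL;
 import org.jppf.server.protocol.JPPFTask;
 import org.jppf.utils.FileUtils;
 
 public class FileDownloadTestTask extends JPPFTask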
 {
   public void run()
   {
     try
     {
       // download the XSD file from an HTTP location
       URL url = new URL("http://www.jppf.org/Options.xsd");
       InputStream is = (InputStream) getDataProvider().getValue(url);
       String s = FileUtils.readTextFile(new BufferedReader(new InputStreamReader(is)));
       setResult(s);
       // upload the same file onto a remote FTP server
       url = new URL("ftp://ftp.myserver.org/Options.xsd");
       getDataProvider().setValue(url, new ByteArrayInputStream(s.getBytes()));
     }
     catch(Exception e)
     {
       setException(e);
     }
   }
 }

Server Connection Pooling

JPPF clients provide very flexible options for load-balancing and failover. These options are set up in the client configuration file, as described in the client configuration documentation.

Combined with the asynchronous submission of multiple sets of tasks, this offers a powerful way to load-balance the execution of tasks and to increase overall throughput.

The asynchronous submission of a set of tasks is performed as follows:

 public class MyConnectionPool
 {
   public static void main(String...args)
   {
     try
     {
       // Initialize the client and connections pool
       JPPFClient client = new JPPFClient();
       // Compute the tasks to submit for execution
       List<JPPFTask> list1 = ....; // first set of tasks
       List<JPPFTask> list2 = ....; // second set of tasks
       // Initialize the collectors for the results of each submission
       JPPFResultCollector collector1 = new JPPFResultCollector(list1.size());
       JPPFResultCollector collector2 = new JPPFResultCollector(list2.size());
       // submit the tasks in an asynchronous, non blocking way
       client.submitNonBlocking(list1, null, collector1);
       client.submitNonBlocking(list2, null, collector2);
       // ... execute some code while the tasks are executing ...
       ...
       // get the results back, waiting until the client has received them
       List<JPPFTask> result1 = collector1.waitForResults();
       List<JPPFTask> result2 = collector2.waitForResults();
       // ... process the results ...
     }
     catch(Exception e)
     {
       e.printStackTrace();
     }
   }
 }

The two key points here are:

  1. The JPPFResultCollector class, whose role is to receive, collect and store the results of a submission.
  2. The use of JPPFClient.submitNonBlocking(taskList, dataProvider, resultCollector) to perform an asynchronous submission.
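To clarify the role of a result collector, here is a simplified, JPPF-independent sketch of a collector that is constructed with the expected number of results and whose waitForResults() blocks until all of them have arrived. SimpleResultCollector and its resultReceived callback are hypothetical; JPPFResultCollector's actual implementation may differ (for instance, this sketch returns results in arrival order). Plain threads stand in for the remote nodes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Hypothetical collector: stores results and lets a caller wait for all of them. */
class SimpleResultCollector<T>
{
  private final CountDownLatch remaining;
  private final List<T> results = Collections.synchronizedList(new ArrayList<T>());

  SimpleResultCollector(int expectedCount)
  {
    remaining = new CountDownLatch(expectedCount);
  }

  /** Called, possibly from another thread, each time one result arrives. */
  void resultReceived(T result)
  {
    results.add(result);
    remaining.countDown();
  }

  /** Blocks until the expected number of results has been received. */
  List<T> waitForResults() throws InterruptedException
  {
    remaining.await();
    synchronized (results)
    {
      // results are in arrival order, not submission order
      return new ArrayList<T>(results);
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    final SimpleResultCollector<String> collector = new SimpleResultCollector<String>(3);
    // simulate 3 tasks completing asynchronously on other threads
    for (int i = 0; i < 3; i++)
    {
      final int id = i;
      new Thread(new Runnable()
      {
        public void run() { collector.resultReceived("result " + id); }
      }).start();
    }
    // the caller is free to do other work here, then waits for all the results
    System.out.println(collector.waitForResults().size());
  }
}
```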

Using non-serializable classes in a task

A JPPF task must be serializable, which means that the types of its instance variables, and of every object in its object graph, must be serializable as well. Sometimes, however, the source code of these classes is not accessible (for instance, they come from a third-party library), which makes them difficult to use with JPPF.
This section presents two ways to work around this limitation.
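The limitation is easy to reproduce with plain Java serialization: a serializable task that holds a reference to a non-serializable object cannot be written to an object stream. ThirdPartyPerson and PersonTask are hypothetical stand-ins for a third-party class and a task.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Stand-in for a third-party class that cannot be modified: not Serializable. */
class ThirdPartyPerson
{
  final String name;
  ThirdPartyPerson(String name) { this.name = name; }
}

/** Hypothetical task whose object graph contains a non-serializable object. */
class PersonTask implements Serializable
{
  private static final long serialVersionUID = 1L;
  private final ThirdPartyPerson person;
  PersonTask(ThirdPartyPerson person) { this.person = person; }

  /** Returns true if the given object can be written to an ObjectOutputStream. */
  static boolean isSerializable(Object task)
  {
    try
    {
      ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
      out.writeObject(task);
      out.close();
      return true;
    }
    catch (NotSerializableException e)
    {
      // raised for the first non-serializable object found in the graph
      System.out.println("cannot serialize: " + e.getMessage());
      return false;
    }
    catch (IOException e)
    {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args)
  {
    // prints false: the ThirdPartyPerson field cannot be serialized
    System.out.println(isSerializable(new PersonTask(new ThirdPartyPerson("Alice"))));
  }
}
```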

Task-localized serialization

One way to work around this is to use the XStream library directly in the task's code. XStream is a powerful XML serialization framework that can serialize to XML classes that do not implement the Serializable interface.
The principle is to serialize the object to XML on the client side, when the task is instantiated, then deserialize it from XML on the node side, when the task runs.

The following example task uses XStream to serialize a simple object graph. It uses XStream version 1.2.2 and only requires xstream-1.2.2.jar in the classpath of the JPPF client.

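 import com.thoughtworks.xstream.XStream;
 import com.thoughtworks.xstream.io.xml.DomDriver;
 import org.jppf.server.protocol.JPPFTask;
 
 public class XstreamTask extends JPPFTask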
 {
   /**
    * Person object to serialize with xstream. Note that it must be declared as transient.
    */
   private transient Person person = null;
   /**
    * Xml representation of the Person object to deserialize with xstream.
    */
   private String personXml = null;
 
   /**
    * Initialize this task with the specified person.
    * @param person a <code>Person</code> instance.
    */
   public XstreamTask(Person person)
   {
     this.person = person;
     // serialize person to XML
     XStream xstream = new XStream(new DomDriver());
     personXml = xstream.toXML(person);
   }
 
   /**
    * Run this task.
    * @see java.lang.Runnable#run()
    */
   public void run()
   {
     // deserialize person from XML
     XStream xstream = new XStream(new DomDriver());
     person = (Person) xstream.fromXML(personXml);
     String s = person.toString();
     System.out.println("deserialized this person: " + s);
     setResult(s);
   }
 }


Specifying alternate object streams

JPPF transports and serializes objects by means of object streams, i.e. instances of ObjectInputStream and ObjectOutputStream or of their subclasses.

You can specify alternate object stream classes for a JPPF grid, which enables the use of non-serializable classes without any extra code in the tasks themselves.
JPPF provides two ways to achieve this:

1. Specifying the object stream implementation classes
This is done in the JPPF configuration file, by adding these two properties:

 # configure the object input stream implementation
 jppf.object.input.stream.class = my.package.MyObjectInputStream
 # configure the object output stream implementation
 jppf.object.output.stream.class = my.package.MyObjectOutputStream

Note that the object input stream class must have a constructor that takes an InputStream parameter, and the object output stream class must have a constructor that takes an OutputStream parameter.
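A minimal sketch of such a pair of classes, using only the JDK. The class names match the hypothetical my.package classes in the configuration above (without the package), and the only requirement illustrated is the constructor signature; a real implementation would add its own serialization logic where the comments indicate. In a real project each class would presumably be public and in its own file, so that JPPF can instantiate it by reflection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;

/** Object input stream with the (InputStream) constructor that JPPF requires. */
class MyObjectInputStream extends ObjectInputStream
{
  public MyObjectInputStream(InputStream in) throws IOException
  {
    super(in);
  }
  // custom deserialization logic would go here, e.g. by overriding resolveClass()
}

/** Object output stream with the (OutputStream) constructor that JPPF requires. */
class MyObjectOutputStream extends ObjectOutputStream
{
  public MyObjectOutputStream(OutputStream out) throws IOException
  {
    super(out);
  }
  // custom serialization logic would go here, e.g. enableReplaceObject() + replaceObject()
}

class StreamRoundTrip
{
  /** Writes obj with MyObjectOutputStream, then reads it back with MyObjectInputStream. */
  static Object roundTrip(Object obj)
  {
    try
    {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      ObjectOutputStream out = new MyObjectOutputStream(bytes);
      out.writeObject(obj);
      out.close();
      ObjectInputStream in = new MyObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
      try
      {
        return in.readObject();
      }
      finally
      {
        in.close();
      }
    }
    catch (Exception e)
    {
      throw new RuntimeException(e);
    }
  }
}
```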

2. Implementing an object stream builder
An object stream builder is an object that instantiates input and output object streams.
It is defined as an implementation of the JPPFObjectStreamBuilder interface:

 /**
  * Interface for all builders instantiating alternate object input streams and output streams.
  */
 public interface JPPFObjectStreamBuilder
 {
   /**
    * Obtain an input stream used for deserializing objects.
    * @param  in input stream to read from.
    * @return an ObjectInputStream
    * @throws Exception if an error is raised while creating the stream.
    */
   ObjectInputStream newObjectInputStream(InputStream in) throws Exception;
   /**
    * Obtain an Output stream used for serializing objects.
    * @param  out output stream to write to.
    * @return an ObjectOutputStream
    * @throws Exception if an error is raised while creating the stream.
    */
   ObjectOutputStream newObjectOutputStream(OutputStream out) throws Exception;
 }
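As a minimal sketch, a builder that simply returns the JDK's standard object streams (the same streams JPPF uses when no alternate streams are configured) could look as follows. To keep the example compilable without the JPPF jars, it declares its own copy of the interface shown above; in a real project you would implement JPPF's JPPFObjectStreamBuilder instead and drop that copy. MyObjectStreamBuilder is the hypothetical class name used in the jppf.object.stream.builder property, and the roundTrip helper exists only for testing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;

/** Local copy of the JPPFObjectStreamBuilder interface, so that this sketch compiles on its own. */
interface JPPFObjectStreamBuilder
{
  ObjectInputStream newObjectInputStream(InputStream in) throws Exception;
  ObjectOutputStream newObjectOutputStream(OutputStream out) throws Exception;
}

/** Builder returning the standard JDK object streams. */
class MyObjectStreamBuilder implements JPPFObjectStreamBuilder
{
  public ObjectInputStream newObjectInputStream(InputStream in) throws Exception
  {
    return new ObjectInputStream(in);
  }

  public ObjectOutputStream newObjectOutputStream(OutputStream out) throws Exception
  {
    return new ObjectOutputStream(out);
  }

  /** Test helper: writes obj with this builder's output stream, reads it back with its input stream. */
  Object roundTrip(Object obj) throws Exception
  {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    ObjectOutputStream out = newObjectOutputStream(bytes);
    out.writeObject(obj);
    out.close();
    ObjectInputStream in = newObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    try
    {
      return in.readObject();
    }
    finally
    {
      in.close();
    }
  }
}
```

A builder is useful when the streams need more than a single-argument constructor, for instance when they must be configured before use.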

You then configure JPPF to use this object stream builder by specifying the following property in the JPPF configuration file:

 # configure the object stream builder implementation
 jppf.object.stream.builder = my.package.MyObjectStreamBuilder



Notes:
  a. When no alternate object stream is specified, JPPF uses the standard java.io.ObjectInputStream and java.io.ObjectOutputStream of the JDK.
  b. When alternate object streams are specified, they must be used by all JPPF clients, servers and nodes, otherwise JPPF will not work. The implementation classes must also be present in the classpath of all JPPF components.
  c. JPPF has a built-in object stream builder that uses XStream to provide XML serialization: XstreamObjectStreamBuilder. To use it, simply specify "jppf.object.stream.builder = org.jppf.serialization.XstreamObjectStreamBuilder" in the JPPF configuration files. You will also need the XStream 1.3 (or later) jar file and the xpp3 jar file available in the XStream distribution.


Caveats & Pitfalls

Some common problems are described in the following sections.

Remote Execution

Tasks are executed remotely, on a node: task instances are serialized and transferred to the node server for execution. This can cause some unexpected behaviour, in particular:

  • Static fields are not serialized with the task. On the node, they are initialised to the values defined in the class, so static fields should not be used in tasks unless they are final or set to the desired value in the run() method.
  • Transient fields are not transferred to the node; after deserialization they hold their default value (null for object references). Fields that are required in the run() method should not be transient, while all other fields can be declared transient to reduce the amount of transferred data.
  • Resources such as network connections and open files cannot be transferred with a task. Network connections have to be re-opened in the run() method, and files must be read into fields of the task before it is submitted.
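The behaviour of transient fields, and the advice to read files into fields before submission, can be checked with plain Java serialization, which here simulates the transfer to the node. FileTask is a hypothetical task-like class, not a JPPF API: it reads the file contents in its constructor, on the client side, and keeps a transient File handle that does not survive the transfer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;

/** Hypothetical task-like class illustrating what survives serialization. */
class FileTask implements Serializable
{
  private static final long serialVersionUID = 1L;
  /** File contents, read on the client side: serialized with the task. */
  final byte[] contents;
  /** Local file handle: transient, so it is null after deserialization. */
  transient File file;

  FileTask(File file) throws IOException
  {
    this.file = file;
    // read the file before submission, since the node may not have access to it
    this.contents = Files.readAllBytes(file.toPath());
  }

  /** Simulates the transfer to a node: serialize, then deserialize. */
  static FileTask sendToNode(FileTask task) throws IOException, ClassNotFoundException
  {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    ObjectOutputStream out = new ObjectOutputStream(bytes);
    out.writeObject(task);
    out.close();
    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    try
    {
      return (FileTask) in.readObject();
    }
    finally
    {
      in.close();
    }
  }
}
```

After sendToNode(), the copy still has the file contents, but its file field is null.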

Security Restrictions

The node server that executes the tasks typically runs with a restricted access policy, which must be adapted to grant the access that the tasks require. The policy file delivered with JPPF is named jppf.policy and contains only the permissions required by the JPPF node server itself.
