Scalable Job

A job that can run in parallel

  • Scalable Jobs run on every node of the cluster; you scale them horizontally by running the same number of instances on each cluster node
  • Context Variable: job
  • Running Job Handle Variable: THIS

Example

1) Create a Groovy script class using the Live Script Editor

A simple utility class that demonstrates the merge sort algorithm
package tutorial;

import java.util.Arrays;
import java.util.List;

class MergeSort {

	private static void merge(List<Integer> arr, int left, int middle, int right) {

		int low = middle - left + 1;
		int high = right - middle;

		int[] L = new int[low];
		int[] R = new int[high];

		int i = 0, j = 0;

		for (i = 0; i < low; i++) {
			L[i] = arr.get(left + i);
		}

		for (j = 0; j < high; j++) {
			R[j] = arr.get(middle + 1 + j);
		}

		int k = left;
		i = 0;
		j = 0;

		while (i < low && j < high) {
			if (L[i] <= R[j]) {
				arr.set(k, L[i]);
				i++;
			} else {
				arr.set(k, R[j]);
				j++;
			}
			k++;
		}

		while (i < low) {
			arr.set(k, L[i]);
			i++;
			k++;
		}

		while (j < high) {
			arr.set(k, R[j]);
			j++;
			k++;
		}
	}

	private static void mergeSort(List<Integer> arr, int left, int right) {
		int middle;
		if (left < right) {

			middle = left + ((right - left) >> 1); // integer midpoint; Groovy's "/" on two ints yields a BigDecimal

			mergeSort(arr, left, middle); // left subarray
			mergeSort(arr, middle + 1, right); // right subarray

			merge(arr, left, middle, right); // merge the two subarrays
		}
	}

	public static void sort(List<Integer> arr) {
		mergeSort(arr, 0, arr.size() - 1);
	}

}

2) Write a binary rule (SortFeeder)

Binary Rule

A simple rule that parses the incoming file line by line and feeds each line of comma-separated numbers into a distributed queue as a list of integers
def queue = grid.objQueue("MSORT_QUEUE"); //create or get the queue

def lines = msg.lines(); //split the incoming data into list of lines
int count = 0;

for(line in lines){
    
    def csv = line.trim();
    
    if("".equals(csv)) continue; //if blank skip it
    
    def vals = csv.split(",");
    
    def iarray = [];
    
    for(val in vals){
        iarray.add(Integer.valueOf(val.trim())); //trim so that "1, 2" parses as well
    }
    
    queue.offer(iarray);
    
    ++count;
}

log.info("%d arrays fed", count);
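
The parsing step in this rule can be tried outside the platform. The following is a minimal Java sketch, not the platform's API: `CsvLineParser` and `parse` are hypothetical names, the input is passed as a plain string instead of `msg`, and the parsed lists are returned rather than offered to the distributed queue. It trims each value, so input such as `8, 1 ,4` is accepted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone version of the SortFeeder parsing step.
class CsvLineParser {

    // Turns every non-blank line of comma-separated integers into a list.
    static List<List<Integer>> parse(String data) {
        List<List<Integer>> arrays = new ArrayList<>();
        for (String line : data.split("\\R")) {          // \R matches any line break
            String csv = line.trim();
            if (csv.isEmpty()) continue;                  // skip blank lines
            List<Integer> values = new ArrayList<>();
            for (String val : csv.split(",")) {
                values.add(Integer.valueOf(val.trim()));  // tolerate spaces around commas
            }
            arrays.add(values);
        }
        return arrays;
    }

    public static void main(String[] args) {
        // prints [[5, 3, 9], [8, 1, 4]]
        System.out.println(parse("5,3,9\n\n 8, 1 ,4\n"));
    }
}
```

A value that is not an integer still raises `NumberFormatException`, as it would in the rule; whether to skip or reject such lines is left to the caller.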

3) Write a scalable job to sort the incoming data

The job waits on the queue for lists of integers and sorts each one as it arrives (MergeSorter)
import tutorial.MergeSort;

log.info("MergeSorter running...");

def queue = grid.objQueue("MSORT_QUEUE");
def al = grid.atomicLong("MSORT_QUEUE_JOB");
def JOBID = al.incrementAndGet();

try{

    while(THIS.isRunning()){
        
        def narray = queue.take(); //wait for data
        
        try{
            
            MergeSort.sort(narray); //do the sorting
        
            log.info("Node[%d][%s] Sorted: %s", JOBID, THIS.getNodeId(), narray);
        
        }catch(Exception ex){
            log.error(ex);
        }
        
    }
    
}finally{
    al.decrementAndGet();
}

log.info("MergeSorter stopped.")
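
The consume-and-sort loop can be approximated locally with plain Java threads, which may help when reasoning about how several job instances share one queue. This is only a sketch under stated assumptions: `LocalSorterPool` and `drain` are hypothetical names, a `LinkedBlockingQueue` stands in for the distributed queue, an `AtomicBoolean` stands in for `THIS.isRunning()`, and `Collections.sort` stands in for `MergeSort.sort`. It polls with a timeout instead of blocking indefinitely, so the running flag is rechecked even when the queue is empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical local model of several MergeSorter instances sharing one queue.
class LocalSorterPool {

    // Starts `workers` threads, lets them sort everything already queued, then stops them.
    static List<List<Integer>> drain(BlockingQueue<List<Integer>> queue, int workers) {
        AtomicBoolean running = new AtomicBoolean(true);        // stand-in for THIS.isRunning()
        List<List<Integer>> sorted = new CopyOnWriteArrayList<>();
        int expected = queue.size();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                while (running.get()) {
                    try {
                        // Wait briefly for data, then loop to recheck the running flag.
                        List<Integer> arr = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (arr == null) continue;
                        Collections.sort(arr);                  // stand-in for MergeSort.sort
                        sorted.add(arr);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            t.start();
            threads.add(t);
        }
        try {
            while (sorted.size() < expected) Thread.sleep(10);  // wait for all lists
            running.set(false);                                 // ask the workers to stop
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            running.set(false);
            Thread.currentThread().interrupt();
        }
        return sorted;
    }

    public static void main(String[] args) {
        BlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>();
        queue.offer(Arrays.asList(5, 3, 9));
        queue.offer(Arrays.asList(8, 1, 4));
        System.out.println(drain(queue, 2)); // the order of the two lists may vary
    }
}
```

Each queued list is taken by exactly one worker, so adding workers raises throughput without any list being sorted twice.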

See Also

Related REST APIs

Create / Update Job
Get Job
Delete Job
Count Jobs
List Jobs
Set Job State
Count Running Jobs
List Running Jobs
Start Job
Restart Job
Count All Running Jobs