Scalable Job
A job that can run in parallel
- A Scalable Job runs on every node of the cluster. You scale it horizontally by running the same number of instances on each cluster node
- Context Variable: job
- Running Job Handle Variable: THIS
Example
1) Create a Groovy script class
A simple utility class that demonstrates the merge sort algorithm
package tutorial;

import java.util.Arrays;
import java.util.List;

class MergeSort {

    private static void merge(List<Integer> arr, int left, int middle, int right) {
        int low = middle - left + 1;
        int high = right - middle;
        int[] L = new int[low];
        int[] R = new int[high];
        int i = 0, j = 0;
        for (i = 0; i < low; i++) {
            L[i] = arr.get(left + i);
        }
        for (j = 0; j < high; j++) {
            R[j] = arr.get(middle + 1 + j);
        }
        int k = left;
        i = 0;
        j = 0;
        while (i < low && j < high) {
            if (L[i] <= R[j]) {
                arr.set(k, L[i]);
                i++;
            } else {
                arr.set(k, R[j]);
                j++;
            }
            k++;
        }
        while (i < low) {
            arr.set(k, L[i]);
            i++;
            k++;
        }
        while (j < high) {
            arr.set(k, R[j]);
            j++;
            k++;
        }
    }

    private static void mergeSort(List<Integer> arr, int left, int right) {
        int middle;
        if (left < right) {
            middle = (left + right) / 2;
            mergeSort(arr, left, middle); // left subarray
            mergeSort(arr, middle + 1, right); // right subarray
            merge(arr, left, middle, right); // merge the two subarrays
        }
    }

    public static void sort(List<Integer> arr) {
        mergeSort(arr, 0, arr.size() - 1);
    }
}
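MergeSort.sort writes results back with `List.set`, so it sorts in place and needs a list that allows overwriting elements. The sketch below is a hypothetical helper (`InPlaceListDemo` and `supportsSet` are illustrative names, not part of the platform) that checks whether a list accepts such writes before it is handed to the sorter.

```java
import java.util.List;

class InPlaceListDemo {
    // Mimics the write pattern used by MergeSort.merge: overwrite a slot via List.set.
    // Returns false when the list rejects writes (for example an unmodifiable view).
    static boolean supportsSet(List<Integer> list) {
        if (list.isEmpty()) {
            return true; // nothing to overwrite, so sorting is a no-op
        }
        try {
            list.set(0, list.get(0)); // write the same value back; contents stay unchanged
            return true;
        } catch (UnsupportedOperationException ex) {
            return false;
        }
    }
}
```

A Groovy list literal such as `[]` creates an `ArrayList`, which supports `set`, so lists built that way can be sorted in place.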
2) Write a binary rule (SortFeeder)
A simple rule that parses the incoming file line by line and feeds each line's comma-separated numbers, as a list of integers, into a distributed queue
def queue = grid.objQueue("MSORT_QUEUE"); // create or get the queue
def lines = msg.lines(); // split the incoming data into a list of lines
int count = 0;
for (line in lines) {
    def csv = line.trim();
    if ("".equals(csv)) continue; // skip blank lines
    def vals = csv.split(",");
    def iarray = [];
    for (val in vals) {
        iarray.add(Integer.valueOf(val.trim())); // trim so that "1, 2" also parses
    }
    queue.offer(iarray);
    ++count;
}
log.info("%d arrays fed", count);
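The parsing step of the rule can be tried outside the platform. The following is a minimal sketch in plain Java. `CsvLineParser` and `parseLine` are hypothetical names, and skipping empty tokens is an assumed policy rather than documented platform behavior.

```java
import java.util.ArrayList;
import java.util.List;

class CsvLineParser {
    // Parses one comma-separated line into integers.
    // Returns an empty list for a blank line; throws NumberFormatException on non-numeric tokens.
    static List<Integer> parseLine(String line) {
        List<Integer> values = new ArrayList<>();
        String csv = line.trim();
        if (csv.isEmpty()) {
            return values;
        }
        for (String token : csv.split(",")) {
            String t = token.trim(); // Integer.valueOf rejects surrounding spaces
            if (t.isEmpty()) {
                continue; // assumed policy: ignore empty fields such as "1,,2"
            }
            values.add(Integer.valueOf(t));
        }
        return values;
    }
}
```

Because `Integer.valueOf` does not accept leading or trailing whitespace, trimming each token keeps input such as `5, 3 ,9` from failing.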
3) Write a scalable job to sort the incoming data
The job (MergeSorter) waits on the queue for lists of integers
import tutorial.MergeSort;

log.info("MergeSorter running...");
def queue = grid.objQueue("MSORT_QUEUE");
def al = grid.atomicLong("MSORT_QUEUE_JOB");
def JOBID = al.incrementAndGet();
try {
    while (THIS.isRunning()) {
        def narray = queue.take(); // wait for data
        try {
            MergeSort.sort(narray); // do the sorting
            log.info("Node[%d][%s] Sorted: %s", JOBID, THIS.getNodeId(), narray);
        } catch (Exception ex) {
            log.error(ex);
        }
    }
} finally {
    al.decrementAndGet();
}
log.info("MergeSorter stopped.")
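The consumer pattern used by the job, several instances draining one shared queue until they are told to stop, can be sketched locally in plain Java. Everything here is a stand-in and not the platform API: a `LinkedBlockingQueue` replaces `grid.objQueue`, an `AtomicBoolean` replaces `THIS.isRunning()`, `Collections.sort` replaces `MergeSort.sort`, and `WorkerLoopSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class WorkerLoopSketch {
    // One worker: polls the shared queue until 'running' turns false, sorting each list it receives.
    static void runWorker(BlockingQueue<List<Integer>> queue, AtomicBoolean running,
                          List<List<Integer>> results) throws InterruptedException {
        while (running.get()) {
            List<Integer> batch = queue.poll(100, TimeUnit.MILLISECONDS);
            if (batch == null) {
                continue; // timed out; loop around and re-check the running flag
            }
            Collections.sort(batch); // stand-in for MergeSort.sort(batch)
            results.add(batch);
        }
    }

    // Starts 'workers' threads on one queue, feeds the input, waits until all lists are sorted, then stops.
    static List<List<Integer>> sortAll(List<List<Integer>> input, int workers) {
        BlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>();
        AtomicBoolean running = new AtomicBoolean(true);
        List<List<Integer>> results = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    runWorker(queue, running, results);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and exit
                }
            });
            t.start();
            threads.add(t);
        }
        try {
            for (List<Integer> batch : input) {
                queue.offer(new ArrayList<>(batch)); // copy so workers sort a mutable list
            }
            while (results.size() < input.size()) {
                Thread.sleep(10); // wait for the workers to drain the queue
            }
            running.set(false); // equivalent of stopping the job
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", ex);
        }
        return results;
    }
}
```

The sketch polls with a timeout instead of calling `take()`, so each worker re-checks its stop flag at least every 100 ms. Whether the platform's `queue.take()` returns when a job is stopped is not stated here, so treat the timeout as a design choice for this local sketch only.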
See Also
Related REST APIs
Create / Update Job
Get Job
Delete Job
Count Jobs
List Jobs
Set Job State
Count Running Jobs
List Running Jobs
Start Job
Restart Job
Count All Running Jobs