Scalable Job
A job that can run in parallel
- Scalable jobs run on every node of the cluster. You scale them horizontally by running the same number of instances on each cluster node
- Context Variable: job
- Running Job Handle Variable: THIS
Example
1) Create a groovy script class using
A simple utility class that demonstrates the merge sort algorithm
package tutorial;

import java.util.Arrays;
import java.util.List;

class MergeSort {

    // Merges the sorted runs arr[left..middle] and arr[middle+1..right] in place.
    private static void merge(List<Integer> arr, int left, int middle, int right) {
        int low = middle - left + 1; // length of the left run
        int high = right - middle;   // length of the right run
        int[] L = new int[low];
        int[] R = new int[high];
        int i = 0, j = 0;

        // Copy both runs into temporary arrays.
        for (i = 0; i < low; i++) {
            L[i] = arr.get(left + i);
        }
        for (j = 0; j < high; j++) {
            R[j] = arr.get(middle + 1 + j);
        }

        // Write the smaller head element back until one run is exhausted.
        int k = left;
        i = 0;
        j = 0;
        while (i < low && j < high) {
            if (L[i] <= R[j]) {
                arr.set(k, L[i]);
                i++;
            } else {
                arr.set(k, R[j]);
                j++;
            }
            k++;
        }

        // Copy whatever remains of either run.
        while (i < low) {
            arr.set(k, L[i]);
            i++;
            k++;
        }
        while (j < high) {
            arr.set(k, R[j]);
            j++;
            k++;
        }
    }

    private static void mergeSort(List<Integer> arr, int left, int right) {
        int middle;
        if (left < right) {
            middle = (left + right) / 2;
            mergeSort(arr, left, middle);      // left subarray
            mergeSort(arr, middle + 1, right); // right subarray
            merge(arr, left, middle, right);   // merge the two subarrays
        }
    }

    // Sorts the given list in place, in ascending order.
    public static void sort(List<Integer> arr) {
        mergeSort(arr, 0, arr.size() - 1);
    }
}
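The class above sorts the list it is given in place, so callers must pass a mutable list. The following is a minimal stand-alone sketch of that contract in plain Java. `MergeSortCheck` is a hypothetical name and not part of the `tutorial` package. It inlines a condensed top-down merge sort (copying only the left half before merging) so that it can run without the tutorial class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone check; not part of the tutorial package.
class MergeSortCheck {

    // Same in-place contract as MergeSort.sort: the argument itself is reordered.
    static void sort(List<Integer> arr) {
        sort(arr, 0, arr.size() - 1);
    }

    private static void sort(List<Integer> arr, int left, int right) {
        if (left >= right) {
            return; // zero or one element is already sorted
        }
        int middle = left + (right - left) / 2;
        sort(arr, left, middle);
        sort(arr, middle + 1, right);

        // Copy the left half, then merge it back with the right half.
        List<Integer> leftHalf = new ArrayList<>(arr.subList(left, middle + 1));
        int i = 0, j = middle + 1, k = left;
        while (i < leftHalf.size() && j <= right) {
            if (leftHalf.get(i) <= arr.get(j)) {
                arr.set(k++, leftHalf.get(i++));
            } else {
                arr.set(k++, arr.get(j++));
            }
        }
        while (i < leftHalf.size()) {
            arr.set(k++, leftHalf.get(i++));
        }
        // Remaining right-half elements are already in their final positions.
    }

    public static void main(String[] args) {
        // A mutable copy is required: the sort calls List.set on its argument.
        List<Integer> data = new ArrayList<>(Arrays.asList(5, 3, 9, 1, 3, -2));
        sort(data);
        System.out.println(data); // prints [-2, 1, 3, 3, 5, 9]
    }
}
```

An immutable list (for example one created with `List.of`) would fail on the first `set` call, which is why the sketch wraps its input in an `ArrayList`.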
2) Write a binary rule (SortFeeder)
A simple rule that parses the incoming file line by line and feeds each line of comma-separated numbers, as a list of integers, into a distributed queue
def queue = grid.objQueue("MSORT_QUEUE"); //create or get the queue
def lines = msg.lines(); //split the incoming data into a list of lines
int count = 0;

for(line in lines){
    def csv = line.trim();
    if("".equals(csv)) continue; //skip blank lines

    def vals = csv.split(",");
    def iarray = [];
    for(val in vals){
        iarray.add(Integer.valueOf(val.trim())) //trim so "1, 2" parses cleanly
    }

    queue.offer(iarray); //one list per input line
    ++count;
}

log.info("%d arrays fed", count);
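The parsing logic of the rule above can be tried outside the platform. The sketch below is a plain Java model under stated assumptions: `SortFeederSketch`, `parseLine`, and `feed` are hypothetical names, a JDK `LinkedBlockingQueue` stands in for the distributed queue, and a `String` payload stands in for `msg`. It only illustrates the line-to-list conversion and is not the platform's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical local model of the SortFeeder rule; grid, msg and log are
// replaced by plain JDK types.
class SortFeederSketch {

    // Parses one CSV line such as "4, 1" into a mutable list of integers.
    // A malformed token makes Integer.valueOf throw NumberFormatException.
    static List<Integer> parseLine(String line) {
        List<Integer> values = new ArrayList<>();
        for (String token : line.split(",")) {
            values.add(Integer.valueOf(token.trim()));
        }
        return values;
    }

    // Offers one list per non-blank line and returns how many lists were queued.
    static int feed(String payload, BlockingQueue<List<Integer>> queue) {
        int count = 0;
        for (String line : payload.split("\\R")) { // \R matches any line break
            String csv = line.trim();
            if (csv.isEmpty()) {
                continue; // skip blank lines
            }
            if (queue.offer(parseLine(csv))) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        BlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>();
        int count = feed("5,3,9\n\n 4, 1 \n", queue);
        System.out.printf("%d arrays fed: %s%n", count, queue);
        // prints: 2 arrays fed: [[5, 3, 9], [4, 1]]
    }
}
```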
3) Write a scalable job to sort the incoming data
The job (MergeSorter) waits on the queue for lists of integers and sorts each one as it arrives
import tutorial.MergeSort;

log.info("MergeSorter running...");

def queue = grid.objQueue("MSORT_QUEUE");
def al = grid.atomicLong("MSORT_QUEUE_JOB");
def JOBID = al.incrementAndGet();

try{
    while(THIS.isRunning()){
        def narray = queue.take(); //wait for data
        try{
            MergeSort.sort(narray); //do the sorting
            log.info("Node[%d][%s] Sorted: %s", JOBID, THIS.getNodeId(), narray);
        }catch(Exception ex){
            log.error(ex);
        }
    }
}finally{
    al.decrementAndGet();
}

log.info("MergeSorter stopped.")
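The consumer loop above can be modelled locally to see how several instances share one queue. The sketch below is a plain Java approximation under stated assumptions: threads stand in for job instances, a `LinkedBlockingQueue` for the distributed queue, an `AtomicBoolean` for `THIS.isRunning()`, and `Collections.sort` for `MergeSort.sort`. All names (`MergeSorterSketch`, `work`, `runDemo`) are hypothetical. It uses a timed `poll` instead of `take` so that an idle worker re-checks the running flag. Whether the platform's queue offers a timed poll is not stated in this page, so treat that as an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical local model of MergeSorter: each thread plays one job instance.
class MergeSorterSketch {
    static final AtomicLong activeJobs = new AtomicLong();        // models grid.atomicLong(...)
    static final AtomicBoolean running = new AtomicBoolean(true); // models THIS.isRunning()

    static void work(BlockingQueue<List<Integer>> queue, List<List<Integer>> results) {
        long jobId = activeJobs.incrementAndGet();
        try {
            while (running.get()) {
                // Timed poll: returns null when idle so the flag is re-checked.
                List<Integer> batch = queue.poll(100, TimeUnit.MILLISECONDS);
                if (batch == null) {
                    continue;
                }
                Collections.sort(batch); // stand-in for MergeSort.sort(batch)
                System.out.printf("Job[%d] sorted: %s%n", jobId, batch);
                results.add(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            activeJobs.decrementAndGet(); // mirrors the finally block of the job
        }
    }

    // Starts the workers, feeds the input, waits for all results, then stops.
    static List<List<Integer>> runDemo(int workers, List<List<Integer>> input) {
        running.set(true);
        BlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>();
        List<List<Integer>> results = new CopyOnWriteArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> work(queue, results));
            threads.add(t);
            t.start();
        }
        try {
            for (List<Integer> batch : input) {
                queue.offer(new ArrayList<>(batch)); // mutable copy for in-place sort
            }
            while (results.size() < input.size()) {
                Thread.sleep(10); // wait until every batch has been sorted
            }
            running.set(false);
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return results;
    }

    public static void main(String[] args) {
        List<List<Integer>> sorted = runDemo(3, Arrays.asList(
                Arrays.asList(5, 3, 9), Arrays.asList(4, 1), Arrays.asList(8, 7, 6)));
        System.out.println(sorted.size() + " lists sorted, active jobs: " + activeJobs.get());
    }
}
```

Each list is taken by exactly one worker, so adding workers raises throughput without sorting any list twice. The order in which results appear depends on scheduling and is not deterministic.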
See Also
Related REST APIs
Create / Update Job
Get Job
Delete Job
Count Jobs
List Jobs
Set Job State
Count Running Jobs
List Running Jobs
Start Job
Restart Job
Count All Running Jobs