Workflows
Dynamic workflows
- A workflow is the series of activities that are necessary to complete a task. Each step in a workflow has a specific step before it and a specific step after it, with the exception of the first step (which has no predecessor) and the last step (which has no successor). In a linear workflow, the first step is usually initiated by an outside event.
- The Boodskap workflow engine gives you full freedom to execute steps sequentially or in parallel.
Simple Example
- In this example, we are going to create a workflow that does the following:
- The workflow takes a latitude and a longitude as input
- The first step reverse geocodes the coordinates to find the address using the Google Geocoding API
- The second step calls the OpenWeatherMap API and pulls in the temperature data
- Then one of the following happens, based on the temperature:
- If the temperature is less than 35, the process completes without triggering any notification
- If the temperature is above 40, both an SMS and an email are triggered
- If the temperature is from 38 up to and including 40, only an SMS is triggered
- If the temperature is from 35 up to (but not including) 38, only an email is triggered
Flow Diagram
Process Rules
/**
 * Reverse-geocoding step: resolves the workflow's input coordinates to an address.
 *
 * Reads input.lat / input.lon and the "google.map.key" domain property (parsed as
 * JSON; its "apiKey" entry is used), then calls the Google geocode endpoint.
 * Failures are reported through step.error and log.error instead of being thrown.
 *
 * OUTPUT
 * {
 * "address": String (formatted_address of the first result; null when the key is
 *            missing, the lookup returns no usable result, or an exception occurs)
 * }
 */
import org.codehaus.jettison.json.JSONObject;

def latitude = input.lat;
def longitude = input.lon;
def resultData = [address: null];

try {
    def keyProperty = domain.property("google.map.key");
    if (keyProperty == null) {
        // Without a configured key the API cannot be called at all.
        step.error('Google API Key not updated in the Domain Settings!')
        resultData['address'] = null;
    } else {
        def apiKey = json.parseText(keyProperty)['apiKey'];
        // Google API Service
        def geocodeUrl = "https://maps.googleapis.com/maps/api/geocode/json?latlng=" + latitude + "," + longitude + "&key=" + apiKey;
        def responseBody = rest.get(geocodeUrl).asString().getBody();
        def geocode = util.toMap(new JSONObject(responseBody));
        // Only trust the payload when the API reports success and returned a match.
        def hasMatch = geocode['status'] == 'OK' && geocode['results'].size() > 0;
        if (hasMatch) {
            resultData['address'] = geocode['results'][0]['formatted_address'];
        }
    }
} catch (Exception ex) {
    step.error(ex);
    log.error(ex.toString())
}
return resultData;
/**
 * Weather step: fetches the current weather for the workflow's input coordinates
 * and outputs the temperature.
 *
 * Reads input.lat / input.lon and the "openweather.map.key" domain property (parsed
 * as JSON; its "apiKey" entry is used). The request asks for units=metric and
 * excludes the minutely, hourly and daily sections. The "current" section of the
 * response, enriched with lat, lon, timezone and timezone_offset, is passed to
 * record.insert(1001, ...) before the temperature is returned.
 * Failures are reported through step.error and log.error instead of being thrown.
 *
 * OUTPUT
 * {
 * "temp": float (null when the key is missing, the response has no "current"
 *         section, or an exception occurs)
 * }
 */
import org.codehaus.jettison.json.JSONObject;
def lat = input.lat;
def lon = input.lon;
def excludeParams = 'minutely,hourly,daily';
def resultData = [:];
resultData['temp'] = null;
try {
    def PROPERTY = domain.property("openweather.map.key");
    if (PROPERTY != null) {
        def jsonObject = json.parseText(PROPERTY);
        def API_KEY = jsonObject['apiKey'];
        // Build the request URL once and reuse it for the call below.
        def url = "https://api.openweathermap.org/data/2.5/onecall?units=metric&exclude=" + excludeParams + "&lat=" + lat + "&lon=" + lon + "&appid=" + API_KEY;
        //Open Weather API Service
        def result = rest.get(url).asString().getBody();
        // Declared with def so the parsed response stays local to this script run
        // instead of being stored in the script binding.
        def forecastData = util.toMap(new JSONObject(result));
        def insertData = forecastData['current'];
        if (insertData == null) {
            // A response without "current" cannot yield a temperature; report it
            // explicitly rather than failing later with a null dereference.
            step.error('Openweather Map API response has no current weather data!')
        } else {
            insertData['lat'] = forecastData['lat'];
            insertData['lon'] = forecastData['lon'];
            insertData['timezone'] = forecastData['timezone'];
            insertData['timezone_offset'] = forecastData['timezone_offset'];
            record.insert(1001, insertData);
            resultData['temp'] = insertData.temp;
        }
    }
    else {
        step.error('Openweather Map API Key not updated in the Domain Settings!')
        resultData['temp'] = null;
    }
} catch (Exception ex) {
    step.error(ex);
    // Logged as text, matching the other step scripts.
    log.error(ex.toString())
}
return resultData;
/**
 * Decision step: turns the temperature from args.temp into exactly one routing flag.
 *
 *   temp > 40        -> both
 *   38 <= temp <= 40 -> sms
 *   35 <= temp < 38  -> email
 *   otherwise        -> none
 *
 * If the comparison throws, the error is reported through step.error and
 * log.error and every flag stays false.
 *
 * OUTPUT
 * {
 * "none": boolean,
 * "both": boolean,
 * "sms": boolean,
 * "email": boolean
 * }
 */
def temp = args.temp;
def resultData = [sms: false, email: false, none: false, both: false];
try {
    // Pick the single flag to raise; checks run from the highest threshold down.
    def selected;
    if (temp > 40) {
        selected = 'both';
    } else if (temp >= 38) {
        selected = 'sms';
    } else if (temp >= 35) {
        selected = 'email';
    } else {
        selected = 'none';
    }
    resultData[selected] = true;
} catch (Exception ex) {
    step.error(ex);
    log.error(ex.toString())
}
return resultData;
/**
 * Email notification step: offers the temperature from args.temp to the
 * "EMAIL_NOTIFICATION_LISTENER" queue.
 *
 * OUTPUT
 * {
 * "status": boolean (true once the offer call has returned without throwing)
 * }
 */
def resultData = [:];
resultData['status'] = false;
try {
    def payload = [temp: args.temp];
    def emailQueue = grid.mapQueue("EMAIL_NOTIFICATION_LISTENER");
    emailQueue.offer(payload);
    resultData['status'] = true;
} catch (Exception ex) {
    step.error(ex);
    log.error(ex.toString())
}
return resultData;
/**
 * SMS notification step: offers the temperature from args.temp to the
 * "SMS_NOTIFICATION_LISTENER" queue.
 *
 * OUTPUT
 * {
 * "status": boolean (true once the offer call has returned without throwing)
 * }
 */
def resultData = [:];
resultData['status'] = false;
try {
    def payload = [temp: args.temp];
    def smsQueue = grid.mapQueue("SMS_NOTIFICATION_LISTENER");
    smsQueue.offer(payload);
    resultData['status'] = true;
} catch (Exception ex) {
    step.error(ex);
    log.error(ex.toString())
}
return resultData;
/**
 * Combined notification step: offers the temperature from args.temp to the SMS
 * queue first and then to the email queue.
 *
 * OUTPUT
 * {
 * "status": boolean (true only when both offer calls returned without throwing)
 * }
 */
def resultData = [:];
resultData['status'] = false;
try {
    def payload = [temp: args.temp];
    // Same payload goes to both listeners; SMS is queued before email.
    ["SMS_NOTIFICATION_LISTENER", "EMAIL_NOTIFICATION_LISTENER"].each { queueName ->
        grid.mapQueue(queueName).offer(payload);
    }
    resultData['status'] = true;
} catch (Exception ex) {
    step.error(ex);
    log.error(ex.toString())
}
return resultData;
JOB Rules
/**
 * Email notification listener job.
 *
 * Polls the "EMAIL_NOTIFICATION_LISTENER" queue (100 ms timeout per poll) until the
 * job is stopped and hands every received item to the 'EmailSender' named rule.
 *
 * A failure while processing one item is recorded and the loop continues, so one
 * bad message does not end the listener. A failure of the polling itself still
 * ends the job. Errors raised after the job was stopped are not recorded.
 */
def notifyQ = grid.mapQueue("EMAIL_NOTIFICATION_LISTENER");
try {
    log.info("Email Notification Listener Job %s[%d] at node:%s Started", THIS.getId(), THIS.getInstanceId(), THIS.getNodeId());
    while (!THIS.isStopped()) {
        def data = notifyQ.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
        if (null == data) {
            // Poll timed out with nothing queued; re-check the stop flag.
            continue;
        }
        try {
            rules.invoke(data, 'EmailSender');
        } catch (Exception invokeEx) {
            // Isolate per-message failures so later messages are still delivered.
            if (!THIS.isStopped()) {
                failure.insert(invokeEx)
                log.error(invokeEx.toString())
            }
        }
    }
}
catch (Exception ex) {
    if (!THIS.isStopped()) {
        failure.insert(ex)
        log.error(ex.toString());
    }
} finally {
    log.info("Email Notification Listener Job %s[%d] at node:%s stopped", THIS.getId(), THIS.getInstanceId(), THIS.getNodeId());
}
/**
 * SMS notification listener job.
 *
 * Polls the "SMS_NOTIFICATION_LISTENER" queue (100 ms timeout per poll) until the
 * job is stopped and hands every received item to the 'SmsSender' named rule.
 *
 * A failure while processing one item is recorded and the loop continues, so one
 * bad message does not end the listener. A failure of the polling itself still
 * ends the job. Errors raised after the job was stopped are not recorded.
 */
def notifyQ = grid.mapQueue("SMS_NOTIFICATION_LISTENER");
try {
    log.info("SMS Notification Listener Job %s[%d] at node:%s Started", THIS.getId(), THIS.getInstanceId(), THIS.getNodeId());
    while (!THIS.isStopped()) {
        def data = notifyQ.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
        if (null == data) {
            // Poll timed out with nothing queued; re-check the stop flag.
            continue;
        }
        try {
            rules.invoke(data, 'SmsSender');
        } catch (Exception invokeEx) {
            // Isolate per-message failures so later messages are still delivered.
            if (!THIS.isStopped()) {
                failure.insert(invokeEx)
                log.error(invokeEx.toString())
            }
        }
    }
}
catch (Exception ex) {
    if (!THIS.isStopped()) {
        failure.insert(ex)
        log.error(ex.toString());
    }
} finally {
    log.info("SMS Notification Listener Job %s[%d] at node:%s stopped", THIS.getId(), THIS.getInstanceId(), THIS.getNodeId());
}
Named Rules
// EmailSender named rule: logs the received arguments and returns an empty map.
// NOTE(review): no email is actually sent here - presumably a placeholder; confirm.
def emptyResult = [:];
log.info("Email sent: %s", args)
return emptyResult;
// SmsSender named rule: logs the received arguments and returns an empty map.
// NOTE(review): no SMS is actually sent here - presumably a placeholder; confirm.
def emptyResult = [:];
log.info("SMS sent: %s", args)
return emptyResult;
Message Rules
// Message rule: starts the "WEATHER_FORECAST" workflow for the incoming message.
// The workflow input carries the message's lat/lon plus the value of util.millis();
// the metadata records which rule triggered the run. Any failure is recorded via
// failure.insert and logged rather than rethrown.
try {
    def workflowId = "WEATHER_FORECAST";
    def workflowInput = [lat: msg.lat, lon: msg.lon, stamp: util.millis()];
    def workflowMeta = [TriggeredBy: '1001 Message Rule'];
    // Start a workflow with the given input data and metadata
    def instance = workflow.start(workflowId, workflowInput, workflowMeta);
    log.info("Workflow instance => %s", instance)
} catch (Exception ex) {
    failure.insert(ex);
    log.error(ex.toString())
}
Updated over 3 years ago