Workflows

Dynamic workflows

  • A workflow is the series of activities that are necessary to complete a task. Each step in a workflow may have a specific step before it and a specific step after it, with the exception of the first step, which has no step before it. In a linear workflow, the first step is usually initiated by an outside event.
  • The Boodskap workflow engine gives you full freedom to execute steps in sequential or parallel mode.

Simple Example

  • In this example, we are going to create a workflow that behaves as follows:
    • The workflow takes a latitude and a longitude as input
    • The first step reverse-geocodes the coordinates and finds the address using the Google Geocoding API
    • The second step calls the OpenWeatherMap API and retrieves the temperature data
    • Then one of the following happens, based on the temperature (requested in metric units):
      • If the temperature is less than 35, the process completes without sending any notification
      • If the temperature is above 40, both SMS and email notifications are triggered
      • If the temperature is at least 38 and at most 40, only an SMS notification is triggered
      • If the temperature is at least 35 and below 38, only an email notification is triggered

Flow Diagram

2128

Workflow Diagram

Process Rules

/**
* Reverse-geocoding step: resolves the workflow's input coordinates
* (input.lat / input.lon) to a formatted address through the Google
* Geocoding REST endpoint.
*
* The API key is read from the domain property "google.map.key", which is
* parsed as JSON and must carry an "apiKey" field.
*
* OUTPUT 
* {
*   "address": String   (null when no address could be resolved)
* }
*/

import  org.codehaus.jettison.json.JSONObject;


def latitude = input.lat;
def longitude = input.lon;

// The address stays null unless the lookup below succeeds.
def resultData = [address: null];

try{

    def keyProperty = domain.property("google.map.key");

    if(keyProperty == null){
        // Without a configured key the request is not attempted at all.
        step.error('Google API Key not updated in the Domain Settings!')
    }
    else{
        def apiKey = json.parseText(keyProperty)['apiKey'];

        //Google API Service
        def requestUrl = "https://maps.googleapis.com/maps/api/geocode/json?latlng="+latitude+","+longitude+"&key="+apiKey;
        def responseBody = rest.get(requestUrl).asString().getBody();

        def geocode = util.toMap(new JSONObject(responseBody));

        // Only the first entry of a successful, non-empty response is used.
        def hasMatch = geocode['status']=='OK' && geocode['results'].size() > 0;
        if(hasMatch){
            resultData['address'] = geocode['results'][0]['formatted_address'];
        }
    }

}catch(Exception ex){
    // Any failure is reported on the step and logged; the address stays null.
    step.error(ex);
    log.error(ex.toString())
}

return resultData;
/**
* Weather step: requests the current conditions for the workflow's input
* coordinates (input.lat / input.lon) from the OpenWeatherMap "onecall"
* endpoint, stores the reading through record.insert and returns the
* current temperature.
*
* The API key is read from the domain property "openweather.map.key", which
* is parsed as JSON and must carry an "apiKey" field.
*
* OUTPUT 
* {
*   "temp": float   (null when the lookup fails or the key is not configured)
* }
*/

import  org.codehaus.jettison.json.JSONObject;

def lat = input.lat;
def lon = input.lon;

// Forecast sections the request asks the service to leave out; only
// the "current" section is read below.
def excludeParams = 'minutely,hourly,daily';

def resultData=[:];
resultData['temp'] = null;

try{
    
    def PROPERTY = domain.property("openweather.map.key");
    
    if(PROPERTY != null){
        
        def jsonObject = json.parseText(PROPERTY);
        def API_KEY = jsonObject['apiKey'];
        
        // Build the request URL once and reuse it, instead of repeating the
        // same concatenation in the REST call.
        def requestUrl = "https://api.openweathermap.org/data/2.5/onecall?units=metric&exclude="+excludeParams+"&lat="+lat+"&lon="+lon+"&appid="+API_KEY;
        
        //Open Weather API Service
        def result = rest.get(requestUrl).asString().getBody();
        
        // Declared with def so the parsed response stays local to this script
        // run rather than being written into the script binding.
        def forecastData = util.toMap(new JSONObject(result));
        
        // The "current" section is enriched with the location fields of the
        // response before it is stored.
        def insertData = forecastData['current'];
        
        insertData['lat'] = forecastData['lat'];
        insertData['lon'] = forecastData['lon'];
        insertData['timezone'] = forecastData['timezone'];
        insertData['timezone_offset'] = forecastData['timezone_offset'];
        
        // NOTE(review): 1001 is presumably the id of the target record
        // definition - confirm against the domain configuration.
        record.insert(1001, insertData);
        
        resultData['temp'] = insertData.temp;
    }
    else{
        // temp is already null at this point; only the error needs reporting.
        step.error('Openweather Map API Key not updated in the Domain Settings!')
    }

    
}catch(Exception ex){
    // Any failure (including a response without a "current" section) is
    // reported on the step and logged; temp stays null.
    step.error(ex);
    log.error(ex)
}

return resultData;
/**
* Decision step: turns the temperature passed in args.temp into exactly one
* notification flag.
*
*   temp >  40        -> both
*   temp >= 38        -> sms
*   temp >= 35        -> email
*   anything lower    -> none
*
* OUTPUT 
* {
*   "none": boolean,
*   "both": boolean,
*   "sms": boolean,
*   "email": boolean
* }
*/

def temp = args.temp;

// Every flag starts out false; at most one of them is switched on below.
def resultData = [sms: false, email: false, none: false, both: false];

try{

    // The checks run from the highest threshold down, so the first match wins.
    def outcome = (temp > 40) ? 'both' : ((temp >= 38) ? 'sms' : ((temp >= 35) ? 'email' : 'none'));

    resultData[outcome] = true;

}catch(Exception ex){
    // If the comparison itself fails, all flags remain false.
    step.error(ex);
    log.error(ex.toString())
}

return resultData;
/**
* Email notification step: places the temperature from args.temp on the
* "EMAIL_NOTIFICATION_LISTENER" queue.
*
* OUTPUT 
* {
*   "status": boolean   (true only when the queue accepted the entry)
* }
*/

def resultData = [status: false];

try{
    
    def vals = [temp:args.temp];
    
    // offer() signals a rejected entry through its return value rather than
    // by throwing, so the result must be checked before reporting success.
    // NOTE(review): assumes grid.mapQueue returns a
    // java.util.concurrent.BlockingQueue-style queue - confirm.
    def accepted = grid.mapQueue("EMAIL_NOTIFICATION_LISTENER").offer(vals);
    
    if(accepted == false){
        step.error('EMAIL_NOTIFICATION_LISTENER queue rejected the notification!')
        log.error('EMAIL_NOTIFICATION_LISTENER queue rejected the notification!')
    }
    else{
        resultData['status'] = true;
    }

    
}catch(Exception ex){
    // Any failure is reported on the step and logged; status stays false.
    step.error(ex);
    log.error(ex.toString())
}

return resultData;
/**
* SMS notification step: places the temperature from args.temp on the
* "SMS_NOTIFICATION_LISTENER" queue.
*
* OUTPUT 
* {
*   "status": boolean   (true only when the queue accepted the entry)
* }
*/

def resultData = [status: false];

try{
    
    def vals = [temp:args.temp];
    
    // offer() signals a rejected entry through its return value rather than
    // by throwing, so the result must be checked before reporting success.
    // NOTE(review): assumes grid.mapQueue returns a
    // java.util.concurrent.BlockingQueue-style queue - confirm.
    def accepted = grid.mapQueue("SMS_NOTIFICATION_LISTENER").offer(vals);
    
    if(accepted == false){
        step.error('SMS_NOTIFICATION_LISTENER queue rejected the notification!')
        log.error('SMS_NOTIFICATION_LISTENER queue rejected the notification!')
    }
    else{
        resultData['status'] = true;
    }

    
}catch(Exception ex){
    // Any failure is reported on the step and logged; status stays false.
    step.error(ex);
    log.error(ex.toString())
}

return resultData;
/**
* Combined notification step: places the temperature from args.temp on both
* the "SMS_NOTIFICATION_LISTENER" and the "EMAIL_NOTIFICATION_LISTENER"
* queues.
*
* OUTPUT 
* {
*   "status": boolean   (true only when both queues accepted the entry)
* }
*/

def resultData = [status: false];

try{
    
    def vals = [temp:args.temp];
    
    // Both offers are always attempted, in the same order as before; their
    // return values are kept because offer() signals a rejected entry through
    // its result rather than by throwing.
    // NOTE(review): assumes grid.mapQueue returns a
    // java.util.concurrent.BlockingQueue-style queue - confirm.
    def smsAccepted = grid.mapQueue("SMS_NOTIFICATION_LISTENER").offer(vals);
    def emailAccepted = grid.mapQueue("EMAIL_NOTIFICATION_LISTENER").offer(vals);
    
    if(smsAccepted == false){
        step.error('SMS_NOTIFICATION_LISTENER queue rejected the notification!')
        log.error('SMS_NOTIFICATION_LISTENER queue rejected the notification!')
    }
    
    if(emailAccepted == false){
        step.error('EMAIL_NOTIFICATION_LISTENER queue rejected the notification!')
        log.error('EMAIL_NOTIFICATION_LISTENER queue rejected the notification!')
    }
    
    // Success is reported only when neither queue turned the entry down.
    if(smsAccepted != false && emailAccepted != false){
        resultData['status'] = true;
    }

    
}catch(Exception ex){
    // Any failure is reported on the step and logged; status stays false.
    step.error(ex);
    log.error(ex.toString())
}

return resultData;

JOB Rules

// Email listener job: drains the "EMAIL_NOTIFICATION_LISTENER" queue and hands
// every entry to the 'EmailSender' named rule until the job is stopped.
def emailQueue = grid.mapQueue("EMAIL_NOTIFICATION_LISTENER");

try{
    log.info("Email Notification Listener Job %s[%d] at node:%s Started", THIS.getId(), THIS.getInstanceId(), THIS.getNodeId());

    while(true){

        if(THIS.isStopped()){
            break;
        }

        // poll waits at most 100 ms, after which the stop flag is checked again.
        def queued = emailQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);

        if(queued == null){
            continue;
        }

        rules.invoke(queued,'EmailSender');
    }

}
catch(Exception ex){
    // Exceptions raised once the job has been stopped are not recorded.
    def alreadyStopped = THIS.isStopped();
    if(!alreadyStopped){
        failure.insert(ex)
        log.error(ex);
    }
}finally{

    log.info("Email Notification Listener Job %s[%d] at node:%s stopped", THIS.getId(), THIS.getInstanceId(), THIS.getNodeId());
}
// SMS listener job: drains the "SMS_NOTIFICATION_LISTENER" queue and hands
// every entry to the 'SmsSender' named rule until the job is stopped.
def smsQueue = grid.mapQueue("SMS_NOTIFICATION_LISTENER");

try{
    log.info("SMS Notification Listener Job %s[%d] at node:%s Started", THIS.getId(), THIS.getInstanceId(), THIS.getNodeId());

    while(true){

        if(THIS.isStopped()){
            break;
        }

        // poll waits at most 100 ms, after which the stop flag is checked again.
        def queued = smsQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);

        if(queued == null){
            continue;
        }

        rules.invoke(queued,'SmsSender');
    }

}
catch(Exception ex){
    // Exceptions raised once the job has been stopped are not recorded.
    def alreadyStopped = THIS.isStopped();
    if(!alreadyStopped){
        failure.insert(ex)
        log.error(ex);
    }
}finally{

    log.info("SMS Notification Listener Job %s[%d] at node:%s stopped", THIS.getId(), THIS.getInstanceId(), THIS.getNodeId());
}

Named Rules

// Placeholder email sender: only logs the arguments it was invoked with and
// returns an empty result.
// NOTE(review): presumably registered as the 'EmailSender' named rule - confirm.
def emptyResult = [:];
log.info("Email sent: %s", args)
return emptyResult;
// Placeholder SMS sender: only logs the arguments it was invoked with and
// returns an empty result.
// NOTE(review): presumably registered as the 'SmsSender' named rule - confirm.
def emptyResult = [:];
log.info("SMS sent: %s", args)
return emptyResult;

Message Rules

// Message rule script: starts a "WEATHER_FORECAST" workflow instance using
// the coordinates carried by the incoming message (msg.lat / msg.lon).
try{

    //Start a workflow with the given input data and metadata
    def coordinates = [lat: msg.lat, lon: msg.lon, stamp: util.millis()];
    def triggerInfo = [TriggeredBy: '1001 Message Rule'];

    def instance = workflow.start("WEATHER_FORECAST", coordinates, triggerInfo);

    log.info("Workflow instance => %s",instance)

}catch(Exception ex){
    // A failed start is recorded through failure.insert and logged.
    failure.insert(ex);
    log.error(ex.toString())
}