Atomic Job

  • Atomic Jobs gives you the best in class performance when you want to deal with short lived background service
  • Context Variable: job
  • Running Job Handle Variable: THIS

Example

  • Let's write a simple solution for this simple problem
    • A particular city has to be updated with it's weather data it it's already not updated for that day

1) Define a record to keep track of the weather data of a day

Record Architecture

{
   "day": string,
   "city": float,
   "node": string,
   "temp": float,
   "pressure": int,
   "humidity": int,
   "temp_min": float,
   "temp_max":float
}

2) Define a message to accept a city name from a sensor / api

Message Architecture

{
   "city": string
}

3) Message rule to validate and invoke an Atomic Job to update weather data

Message Rule

try{
    
    def REC_ID = 6000;
    def JOB_ID = "UpdateCityWeatherJob";
    def today = util.formatDate("yyyyMMdd");
    def id = String.format("%s%s",  today, msg.city);
    def queue = grid.stringQueue("CITIES"); //create or get the queue
    
    def orec = record.get(REC_ID, id);
    
    if(null != orec){
      log.warn("City %s is already updated", msg.city);
      return;
    }
    
    queue.offer(msg.city);
    
    if(job.count(JOB_ID) <= 0){
        job.start(JOB_ID);
    }
    
}catch(Exception ex){
    log.error(ex);
    
}

4) Atomic Job (UpdateCityWeatherJob) to get the weather update from openstreetmap.org API

def API_KEY = domain.get("api_key"); //get the stored API key
def API_URL = "https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api}&units={units}";
def REC_ID = 6000;
def queue = grid.stringQueue("CITIES"); //create or get the queue

try{

    log.info("UpdateCityWeatherJob running...");

    while(THIS.isRunning()){
        
        def city = queue.poll(1,  java.util.concurrent.TimeUnit.SECONDS);
        
       if(null == city ) break; //stop this job

        try{
            

            def res = rest.get(API_URL)
                            .routeParam("city", city)
                            .routeParam("api", API_KEY)
                            .routeParam("units", "metric")
                            .asJson()
                            .getBody()
                            ;
            def main = res.getObject().getJSONObject("main");

            def rec = main.toMap();
            rec['city'] = city;
            rec['node'] = THIS.getNodeId();
            rec['day'] = util.formatDate("yyyyMMdd");

            def id = String.format("%s%s", rec['day'], rec['city']);
            
            record.insert(REC_ID, id, rec);

        }catch(Exception ex){
            log.error(ex);
        }
        
    }

}finally{
    log.info("UpdateCityWeatherJob stopped.");
}
See Also

Related REST APIs

[Create / Update Job(ref:upsertjob)
Get Job
Delete Job
Count Jobs
List Jobs
Set Job State
Count Running Jobs
List Running Jobs
Start Job
Restart Job
Count All Running Jobs