Arduino Code

Target board: ESP8266 (NodeMCU development board)

#include <Arduino.h>

#include <ESP8266WiFi.h>

#include <PubSubClient.h>

#include <ArduinoJson.h>

// GPIO number of the flow sensor's pulse output (GPIO4, the pin labelled D2)
const int sensorPin = 4;

// The hall-effect flow sensor outputs approximately 4.5 pulses per second per
// litre/minute of flow.
float calibrationFactor = 4.5;

// Pulses counted by the interrupt handler since the counter was last reset.
// Kept 32 bits wide so it cannot silently wrap after 255 pulses when the flow
// is high or the main loop is delayed between two samples.
volatile uint32_t pulseCount;
float flowRate;                 // most recent flow rate, in litres/minute
unsigned int flowMilliLitres;   // volume measured during the latest sample, in mL
unsigned long totalMilliLitres; // cumulative volume since start-up, in mL
unsigned long oldTime;          // millis() timestamp of the previous sample

// Credentials of the Wi-Fi access point the board connects to
const char *ssid = "<SSID>";         // name (SSID) of your Wi-Fi network
const char *password = "<PASSWORD>"; // password of your Wi-Fi network

// Boodskap Platform MQTT settings: fill in the placeholders for your account
// and device before flashing.
#define DOMAIN_KEY "<DOMAIN KEY>"         // your domain key
#define API_KEY "<API KEY>"               // your API key
#define DEVICE_ID "<DEVICE UNIQUE ID>"    // unique ID of this device
#define DEVICE_MODEL "<DEVICE MODEL>"     // model name of this device
#define FIRMWARE_VER "<FIRMWARE VERSION>" // firmware version of this device
#define MSG_ID 10001                      // ID of the message being published
#define REPORTING_INTERVAL 20000          // minimum time between two reports, in ms

#define MQTT_SERVER "<MQTT SERVER URL>" // MQTT URL of the platform
#define MQTT_PORT 1883

// MQTT login: the user name and client ID are the domain key / device ID
// prefixed with "DEV_", and the password is the API key.
String mqttUser = String("DEV_") + DOMAIN_KEY;
String mqttPassword = String(API_KEY);
String clientid = String("DEV_") + DEVICE_ID;

// millis() timestamp of the last report sent to the platform
uint32_t lastReport = 0;

// Topic the status messages are published to:
// /<domain key>/device/<device id>/msgs/<model>/<firmware version>/<message id>
String topic = String("/") + DOMAIN_KEY + "/device/" + DEVICE_ID + "/msgs/" + DEVICE_MODEL + "/" + FIRMWARE_VER + "/" + String(MSG_ID);

// MQTT client running on top of the Wi-Fi TCP client
WiFiClient espClient;
PubSubClient client(espClient);

void setup() {
  Serial.begin(115200);
  delay(10);
  Serial.println('\n');
  startWIFI();
  delay(1000);
  // Initialization of the variable “sensorPin” as INPUT (D2 pin)
  pinMode(sensorPin, INPUT);
  pulseCount = 0;
  flowRate = 0.0;
  flowMilliLitres = 0;
  totalMilliLitres = 0;
  oldTime = 0;

  client.setServer(MQTT_SERVER, MQTT_PORT);
  digitalWrite(sensorPin, HIGH);
  attachInterrupt(digitalPinToInterrupt(sensorPin), pulseCounter, RISING);
}

void loop() {
  if ((millis() - oldTime) > 1000) // Executed Every one sec
  {
    // Disable the interrupt while calculating flow rate
    detachInterrupt(sensorPin);
    flowRate = ((1000.0 / (millis() - oldTime)) * pulseCount) / calibrationFactor;
    oldTime = millis();

    // Divide the flow rate in litres/minute by 60 to determine how many litres have
    // passed through the sensor in this 1 second interval, then multiply by 1000 to
    // convert to millilitres.
    flowMilliLitres = (flowRate / 60) * 1000;

    // Add the millilitres passed in this second to the cumulative total
    totalMilliLitres += flowMilliLitres;

    // Print the flow rate for this second in litres / minute
    Serial.print("Flow rate: ");
    Serial.print(flowRate); // Print the integer part of the variable
    Serial.print("  Output Liquid Quantity: "); // Output separator
    Serial.print(totalMilliLitres);
    Serial.println("mL");

    // Message sending function is called as per the reporting interval
    if ((millis() - lastReport) >= REPORTING_INTERVAL) {
      sendStatusMessage(flowRate, totalMilliLitres);
      lastReport = millis();
    }

    // Reset the pulse counter so we can start incrementing again
    pulseCount = 0;

    // Enable the interrupt again now that we've finished sending output
    attachInterrupt(sensorPin, pulseCounter, RISING);
  }
}

// Interrupt handler for the flow sensor: setup() attaches it to rising edges
// on sensorPin, so every call accounts for exactly one sensor pulse.
ICACHE_RAM_ATTR void pulseCounter() {
  pulseCount = pulseCount + 1;
}
// Join the configured Wi-Fi network and block until the connection is up,
// reporting progress and the assigned IP address on the serial port.
void startWIFI(void) {
  WiFi.begin(ssid, password);
  Serial.print("Connecting to ");
  Serial.print(ssid);
  Serial.println(" ...");

  // Poll the connection state once per second, printing the number of
  // seconds waited so far.
  for (int seconds = 1; WiFi.status() != WL_CONNECTED; ++seconds) {
    delay(1000);
    Serial.print(seconds);
    Serial.print(' ');
  }

  Serial.println('\n');
  Serial.println("Connection established!");
  Serial.print("IP address:\t");
  Serial.println(WiFi.localIP());
}

// Publish one status message to the platform over MQTT.
//
//   flow_rate   - current flow rate as computed by loop(), in litres/minute
//   consumption - cumulative volume in millilitres; sent divided by 1000
//
// The JSON payload carries "status" (0 when flow_rate <= 0, otherwise 1),
// "flow_rate" and "total_consumption". The MQTT connection is opened for the
// message and closed again afterwards. If the broker cannot be reached after
// a few attempts the report is skipped, so that the caller can go on
// measuring instead of being blocked indefinitely.
void sendStatusMessage(float flow_rate, float consumption) {
  StaticJsonDocument < 256 > data;
  data["status"] = (flow_rate <= 0) ? 0 : 1;
  data["flow_rate"] = flow_rate;
  data["total_consumption"] = consumption / 1000;

  // Upper bound on connection attempts for a single report
  const int maxConnectAttempts = 5;
  for (int attempt = 1; !client.connected(); ++attempt) {
    Serial.println("Connecting to MQTT...");
    Serial.println(clientid);
    if (client.connect(clientid.c_str(), mqttUser.c_str(), mqttPassword.c_str())) {
      Serial.println("connected");
    } else {
      Serial.print("failed with state ");
      Serial.println(client.state());
      if (attempt >= maxConnectAttempts) {
        Serial.println("MQTT connection failed, skipping this report");
        return;
      }
      delay(2000);
    }
  }

  char buffer[256];
  serializeJson(data, buffer);
  // publish() returns false when the message could not be sent
  if (!client.publish(topic.c_str(), buffer)) {
    Serial.println("MQTT publish failed");
  }
  client.disconnect();
}