#!/usr/bin/python # # check_welle-cli by remco_k # Version: 1.0 # Date: 2021-01-02 # This check reads the json output of welle-cli (Part of welle.io, a SDR DAB+ receiver) and checks if the audio of the selected DAB+ service (sid) is present at some level, along with some other DAB+ service parameters (bitrate, allotment, snr, dls). # This is a Python script. Python3 is needed. import os, sys, getopt from urllib.request import urlopen from urllib import request, parse import json from datetime import datetime import math import time # Some defaults: url=None # 'http://192.168.1.45/' or file:///C:/path/to/' sid=None # "0x8064" audio_low_warn=None #-30 audio_low_crit=None #-40 audio_high_warn=None #-3.5 audio_high_crit=None #-3.0 time_secs_warn=None # 60 time_secs_crit=None # 600 bitrate_warn=None # 71 bitrate_crit=None # 71 dls_secs_warn=None # 3600 dls_secs_crit=None # 7200 snr_warn=None snr_crit=None channel=None # None=current channel where welle-cli is tuned into channelupdate=False verbose=False stat_strings=["OK", "WARNING", "CRITICAL"] def main(argv): global url, audio_low_warn, audio_low_crit, audio_low_warn, audio_low_crit, audio_high_warn, audio_high_crit, sid, time_secs_warn,time_secs_crit,bitrate_warn,bitrate_crit,dls_secs_warn,dls_secs_crit, snr_warn, snr_crit, channel, channelupdate, verbose try: opts, args = getopt.getopt(argv,"h", ["url=","file=", "audio_low_warn=", "audio_low_crit=", "audio_low_warn=", "audio_low_crit=", "audio_high_warn=", "audio_high_crit=", "sid=", "time_secs_warn=","time_secs_crit=","bitrate_warn=","bitrate_crit=","dls_secs_warn=","dls_secs_crit=", "snr_warn=", "snr_crit=", "channel=", "channelupdate", "verbose"]) except getopt.GetoptError as e: print("Invalid option: %r" % e) showhelp() for opt, arg in opts: if opt == '-h': showhelp() elif opt in ("--url"): url = arg elif opt in ("--file"): url = arg elif opt in ("--audio_low_warn"): audio_low_warn = float(arg) elif opt in ("--audio_low_crit"): audio_low_crit = float(arg) elif opt in ("--audio_high_warn"): audio_high_warn = float(arg) elif opt in ("--audio_high_crit"): audio_high_crit = float(arg) elif opt in ("--sid"): sid = arg elif opt in ("--time_secs_warn"): time_secs_warn = int(arg) elif opt in ("--time_secs_crit"): time_secs_crit = int(arg) elif opt in ("--bitrate_warn"): bitrate_warn = int(arg) elif opt in ("--bitrate_crit"): bitrate_crit = int(arg) elif opt in ("--dls_secs_warn"): dls_secs_warn = int(arg) elif opt in ("--dls_secs_crit"): dls_secs_crit = int(arg) elif opt in ("--snr_warn"): snr_warn = float(arg) elif opt in ("--snr_crit"): snr_crit = float(arg) elif opt in ("--channel"): channel = arg elif opt in ("--channelupdate"): channelupdate = True elif opt in ("--verbose"): verbose = True # run some checks on the CLI params # Mandatory params: cli_error=False if sid==None: print("Error: Missing --sid") cli_error=True if url==None: print("Error: Missing --url or --file") cli_error=True if cli_error==True: showhelp() try: runcheck() except Exception as e: print("UNKNOWN - check_welle-cli Unexpected error: %r" % e) sys.exit(3) def showhelp(): print ("check_welle-cli.py v1.0 by Remco Kuijer (remco_k) at 02-01-2021") print ("This check reads the json output of welle-cli and checks if the audio of the selected DAB+ service (sid) is present, along with some other parameters.") print ("-h Shows this help") print ("") print("Mandatory") print ("--sid Service ID (SID). E.g. 0x8064") print ("--url The url to open. E.g. http://ip_of_welle_cli/") print ("--file For testing purposes, to open a json file. E.g. file:///c:/path/to/") print (" (--url or --file is mandatory)") print("Optional") print("--audio_low_warn Audio warning level threshold in dBFS. If audio is BELOW this threshold then a warning state is returned. E.g. -40.0") print("--audio_low_crit Same, but for critical level") print("--audio_high_warn Audio warning level threshold in dBFS. If audio is ABOVE this threshold then a warning state is returned. E.g. -3.5") print("--audio_high_crit Same, but for critical level") print("--time_secs_warn Define the max age threshold in seconds of the last audio update before a warning state is returned. E.g. 60") print("--time_secs_crit Same, but for critical level") print("--bitrate_warn Define the bitrate warning threshold in kbps. E.g. 71 (a 72 kbps stream will return OK then)") print("--bitrate_crit Same, but for critical level") print("--dls_secs_warn Define the max age threshold in seconds of the last Dynamic Label Segment (DLS) update before a warning state is returned. E.g. 3600") print("--dls_secs_crit Same, but for critical level") print("--snr_warn Define the Signal to Noise Ratio (SNR) when a warning state is returned. E.g. 6.0") print("--snr_crit Same, but for critical level") print("--channel Set welle-cli to this channel. E.g. 7B. If this parameter is not set, the current welle-cli channel is used to find the SID. If welle-cli needs to switch channels then this function might take several seconds waiting for welle-cli to demux the mux on that channel") print("--channelupdate Test mode. If set, the channel will always be updated to welle-cli, regardless if a real change of the channel is needed") print("--verbose Test mode. Verbose output") sys.exit() def sampleTodBFS(sSample, iScale): if sSample<0: return float('-inf') else: return -(20 * math.log10(float(iScale))-20 * math.log10(sSample+1)) def checkChannel(): global channel, channelupdate response=urlopen(url+"channel", timeout=1) current_channel=response.read().decode('utf-8') if channelupdate==True: if channel!="5A": switchChannel("5A") else: switchChannel("5B") time.sleep(1) if channel!=None and current_channel!=channel: switchChannel(channel) waitForEnsembleID() waitForServiceAndAudio(sid) elif channelupdate==True: switchChannel(current_channel) waitForEnsembleID() waitForServiceAndAudio(sid) return current_channel def switchChannel(channel): # We need to switch channels by doing a post to welle-cli req = request.Request(url+"channel", data=channel.encode('utf-8')) request.urlopen(req, timeout=10) def waitForEnsembleID(): ensembleID=None channelchange_timeout=60 #secs while ensembleID==None or ensembleID=="0x0000": channelchange_timeout=channelchange_timeout-1 if channelchange_timeout<=0: raise Exception("Timeout waiting for ensemble id") if verbose==True: print("Waiting for ensembleID...") time.sleep(1) response = urlopen(url+"mux.json", timeout=5) string = response.read().decode('utf-8') mux = json.loads(string) ensembleID=mux["ensemble"]["id"] def waitForServiceAndAudio(sid): waitservice_timeout=60 #secs while True: waitservice_timeout=waitservice_timeout-1 if waitservice_timeout<=0: raise Exception("Timeout waiting for ensemble id") if verbose==True: print("Waiting for SID audio") time.sleep(1) response = urlopen(url+"mux.json", timeout=5) string = response.read().decode('utf-8') mux = json.loads(string) for service in mux['services']: if service['sid']==sid and service['audiolevel']['left']!=-1 and service['audiolevel']['right']!=-1: return def runcheck(): current_channel=checkChannel() response = urlopen(url+"mux.json", timeout=5) string = response.read().decode('utf-8') mux = json.loads(string) snr=mux['demodulator']['snr'] for service in mux['services']: if service['sid']==sid: al=service['audiolevel']['left'] ar=service['audiolevel']['right'] t=service['audiolevel']['time'] dls_t=service['dls']['time'] bitrate=service['components'][0]['subchannel']['bitrate'] audiolevel=sampleTodBFS(min(al,ar), 32768) dt = datetime.fromtimestamp(t) delta_timestamp = datetime.timestamp(datetime.now()) - t delta_dls= datetime.timestamp(datetime.now()) - dls_t dt_dls=datetime.fromtimestamp(dls_t) status=3 # 0=ok, 1=warning, 2=critical, 3=unknown crit=[] warn=[] if audio_low_crit!=None and audiolevel<=audio_low_crit: crit.append("Audiolevel too low") elif audio_low_warn!=None and audiolevel<=audio_low_warn: warn.append("Audiolevel too low") if audio_high_crit!=None and audiolevel>=audio_high_crit: crit.append("Audiolevel too high") elif audio_high_warn!=None and audiolevel>=audio_high_warn: warn.append("Audiolevel too high") if time_secs_crit!=None and delta_timestamp>=time_secs_crit: crit.append("Audio update time") elif time_secs_warn!=None and delta_timestamp>=time_secs_warn: warn.append("Audio update time") if bitrate_crit!=None and bitrate<=bitrate_crit: crit.append("Bitrate") elif bitrate_warn!=None and bitrate<=bitrate_warn: warn.append("Bitrate") if dls_secs_crit!=None and delta_dls>=dls_secs_crit: crit.append("DLS") elif dls_secs_warn!=None and delta_dls>=dls_secs_warn: warn.append("DLS") if snr_crit!=None and snr<=snr_crit: crit.append("SNR") elif snr_warn!=None and snr<=snr_warn: warn.append("SNR") if len(crit)>0: status=2 elif len(warn)>0: status=1 else: status=0 print ("%s -" % (stat_strings[status]), end='') if len(crit)>0: print(" Critical: %s" % (crit) , end='') if len(warn)>0: print(" Warning: %s" % (warn ) , end='') print (" DAB+ Service %s (%s) has audiolevel %.1f dBFS at bitrate %d kbps. Audiolevel updated %d seconds ago at: %s. DLS updated %d seconds ago at: %s. SNR: %.1f. Channel: %s." % (sid, service['label']['label'].strip(), audiolevel, bitrate, delta_timestamp, dt, delta_dls, dt_dls, snr, current_channel), end='') print("") sys.exit(status) if __name__ == "__main__": main(sys.argv[1:]) print ("UNKNOWN - Service %s not found." % sid) sys.exit(3)