#!usr/bin/env python # -*- coding: utf-8 -*- import os import ssl import json import paho.mqtt.client as mqtt from influxdb import InfluxDBClient # MQTTに関する設定です。環境変数から設定できるようにしています mqtt_host = os.getenv("MQTT_HOST" , "" ) mqtt_port = int(os.getenv("MQTT_PORT","8883")) mqtt_topic = os.getenv("MQTT_TOPIC" , "topic/test" ) mqtt_username = os.getenv("MQTT_USERNAME" , "" ) mqtt_passwd = os.getenv("MQTT_PASSWORD" , "" ) # InfluxDBに関する設定です。環境変数から設定できるようにしています influxdb_host = os.getenv("INFLUXDB_HOST" , "" ) influxdb_port = int(os.getenv("INFLUXDB_PORT","8086")) influxdb_database = os.getenv("INFLUXDB_DATABASE" ,"iotsample" ) influxdb_username = os.getenv("INFLUXDB_USERNAME" ,"" ) influxdb_passwd = os.getenv("INFLUXDB_PASSWORD" ,"" ) # MQTTサーバに接続した際の処理 def on_connect(client, userdata, flags, respons_code): print('status {0}'.format(respons_code)) # 状態をログに残します client.subscribe(mqtt_topic) # 指定のトピックの購読を開始します # MQTTサーバからメッセージを受信した際の処理 def on_message(client, userdata, msg): # 受信データをUTF-8で読み込み、JSONとしてパースします in_data = json.loads(str(msg.payload,'utf-8')) # InfluxDBに接続します influx_client = InfluxDBClient( host=influxdb_host,port=influxdb_port, username=influxdb_username,password=influxdb_passwd, database=influxdb_database) # 受信データのsensorsの項目分だけInfluxDBへ登録します # センサの値はすべてfloatに変換して保存します if "sensors" in in_data.keys(): for sensor in in_data["sensors"]: out_data = [ { "measurement": msg.topic, "time":in_data["timestamp"], "tags":{ "node":in_data["name"], "sensor": sensor }, "fields": dict( (k, float(v)) for k,v in in_data["sensors"][sensor].items()) } ] # InfluxDBへの書き込み res = influx_client.write_points(out_data) print("influxdb res = " + str(res)) # 実行のメイン if __name__ == '__main__': # MQTTのためにTLS/SSLの設定を行います sslContext = ssl.create_default_context() sslContext.check_hostname = False sslContext.verify_mode = ssl.CERT_NONE # MQTTのクライアントを設定し、接続を開始します client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.username_pw_set(mqtt_username,mqtt_passwd) client.tls_set_context(sslContext) client.tls_insecure_set(True) client.connect(mqtt_host, port=mqtt_port, keepalive=60) # アプリケーションが終了するまで購読し続けます client.loop_forever()