commit a1e0d0b4b07e1c547dab94d1808f790ceffb6198 Author: J. Fernando Sánchez Date: Thu Feb 22 17:38:21 2018 +0100 First commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..cee813a --- /dev/null +++ b/README.md @@ -0,0 +1,45 @@ +Testing three different MQTT brokers: mosquitto, hbmqtt (python) and volantmq (go). + +The compose file sets up the three brokers, and two clients (a producer and a consumer). + +## Setup + +To launch the three brokers, run: + +``` +docker-compose up mosquitto pybroker gobroker +``` + +The brokers can be configured to use different brokers with the `MQTT_HOST` environment variable (see below). + +## Mosquitto + +``` +docker-compose run -e MQTT_HOST=mosquitto consumer +``` + +``` +docker-compose run -e MQTT_HOST=mosquitto producer +``` + +## HBMQTT (python) + +This broker is using BoltDB (an embedded key-value store) for persistence. + +``` +docker-compose run -e MQTT_HOST=pybroker consumer +``` + +``` +docker-compose run -e MQTT_HOST=pybroker producer +``` + +## VolantMQ (Go) + +``` +docker-compose run -e MQTT_HOST=gobroker consumer +``` + +``` +docker-compose run -e MQTT_HOST=gobroker producer +``` diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..5546fe0 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,42 @@ +version: '2.1' +services: + mosquitto: + image: eclipse-mosquitto + ports: + - "1883:1883" + - "9001:9001" + volumes: + - "./mosquitto.conf:/mosquitto/config/mosquitto.conf" + - "./data:/mosquitto/data" + - "./logs:/mosquitto/log" + consumer: + build: python-clients + image: mqtt-test + tty: true + stdin_open: true + command: + - 'python' + - 'mqtt-consumer.py' + environment: + MQTT_HOST: '${MQTT_HOST:-pybroker}' + producer: + build: python-clients + image: mqtt-test + tty: true + stdin_open: true + command: ['python', 'mqtt-producer.py'] + environment: + MQTT_HOST: '${MQTT_HOST:-pybroker}' + + pybroker: + build: python-broker + image: mqtt-pybroker-test + tty: true + stdin_open: true + command: + - 'python' + - 'broker.py' + + gobroker: + build: golang + image: mqtt-gobroker-test diff --git a/golang/Dockerfile b/golang/Dockerfile new file mode 100644 index 0000000..47f284f --- /dev/null +++ b/golang/Dockerfile @@ -0,0 +1,28 @@ +FROM registry.hub.docker.com/library/golang:1.9 + +RUN curl -fsSL -o /usr/local/bin/dep https://github.com/golang/dep/releases/download/v0.4.1/dep-linux-amd64 && chmod +x /usr/local/bin/dep + +RUN mkdir -p /go/src/mqtt-poc/ +WORKDIR /go/src/mqtt-poc + +COPY Gopkg.toml Gopkg.lock ./ +# copies the Gopkg.toml and Gopkg.lock to WORKDIR + +RUN dep ensure -vendor-only + +COPY . /go/src/mqtt-poc + +RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app . + +FROM alpine:latest +RUN apk --no-cache add ca-certificates + +WORKDIR /root/ + +RUN mkdir -p /root/conf + +ADD conf/config.json /root/conf/config.json + +COPY --from=0 /go/src/mqtt-poc/app . + +ENTRYPOINT ["./app"] \ No newline at end of file diff --git a/golang/Gopkg.lock b/golang/Gopkg.lock new file mode 100644 index 0000000..bf6d1ad --- /dev/null +++ b/golang/Gopkg.lock @@ -0,0 +1,191 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + name = "github.com/VolantMQ/volantmq" + packages = [ + ".", + "auth", + "clients", + "configuration", + "connection", + "packet", + "persistence", + "persistence/boltdb", + "routines", + "subscriber", + "systree", + "topics", + "topics/mem", + "topics/types", + "transport", + "types" + ] + revision = "e7adeb65d1ddc075730d0a8c7eb9ffc43ba7e068" + version = "v0.0.3" + +[[projects]] + name = "github.com/boltdb/bolt" + packages = ["."] + revision = "2f1ce7a837dcb8da3ec595b1dac9d0632f0f99e8" + version = "v1.3.1" + +[[projects]] + name = "github.com/fsnotify/fsnotify" + packages = ["."] + revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" + version = "v1.4.7" + +[[projects]] + name = "github.com/gorilla/websocket" + packages = ["."] + revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b" + version = "v1.2.0" + +[[projects]] + branch = "master" + name = "github.com/hashicorp/hcl" + packages = [ + ".", + "hcl/ast", + "hcl/parser", + "hcl/scanner", + "hcl/strconv", + "hcl/token", + "json/parser", + "json/scanner", + "json/token" + ] + revision = "23c074d0eceb2b8a5bfdbb271ab780cde70f05a8" + +[[projects]] + name = "github.com/magiconair/properties" + packages = ["."] + revision = "c3beff4c2358b44d0493c7dda585e7db7ff28ae6" + version = "v1.7.6" + +[[projects]] + branch = "master" + name = "github.com/mitchellh/mapstructure" + packages = ["."] + revision = "00c29f56e2386353d58c599509e8dc3801b0d716" + +[[projects]] + name = "github.com/pborman/uuid" + packages = ["."] + revision = "e790cca94e6cc75c7064b1332e63811d4aae1a53" + version = "v1.1" + +[[projects]] + name = "github.com/pelletier/go-toml" + packages = ["."] + revision = "acdc4509485b587f5e675510c4f2c63e90ff68a8" + version = "v1.1.0" + +[[projects]] + name = "github.com/spf13/afero" + packages = [ + ".", + "mem" + ] + revision = "bb8f1927f2a9d3ab41c9340aa034f6b803f4359c" + version = "v1.0.2" + +[[projects]] + name = "github.com/spf13/cast" + packages = ["."] + revision = "8965335b8c7107321228e3e3702cab9832751bac" + version = "v1.2.0" + +[[projects]] + branch = "master" + name = "github.com/spf13/jwalterweatherman" + packages = ["."] + revision = "7c0cea34c8ece3fbeb2b27ab9b59511d360fb394" + +[[projects]] + name = "github.com/spf13/pflag" + packages = ["."] + revision = "e57e3eeb33f795204c1ca35f56c44f83227c6e66" + version = "v1.0.0" + +[[projects]] + name = "github.com/spf13/viper" + packages = ["."] + revision = "25b30aa063fc18e48662b86996252eabdcf2f0c7" + version = "v1.0.0" + +[[projects]] + branch = "master" + name = "github.com/troian/easygo" + packages = ["netpoll"] + revision = "b69abaaf8a8503f3102d80d17d20515041bf9ef5" + +[[projects]] + branch = "master" + name = "github.com/troian/omap" + packages = ["."] + revision = "bf92c506ddb0f2cc90f2ed8aab7dad6dce349194" + +[[projects]] + name = "go.uber.org/atomic" + packages = ["."] + revision = "8474b86a5a6f79c443ce4b2992817ff32cf208b8" + version = "v1.3.1" + +[[projects]] + name = "go.uber.org/multierr" + packages = ["."] + revision = "3c4937480c32f4c13a875a1829af76c98ca3d40a" + version = "v1.1.0" + +[[projects]] + name = "go.uber.org/zap" + packages = [ + ".", + "buffer", + "internal/bufferpool", + "internal/color", + "internal/exit", + "zapcore" + ] + revision = "35aad584952c3e7020db7b839f6b102de6271f89" + version = "v1.7.1" + +[[projects]] + branch = "master" + name = "golang.org/x/sync" + packages = ["syncmap"] + revision = "fd80eb99c8f653c847d294a001bdf2a3a6f768f5" + +[[projects]] + branch = "master" + name = "golang.org/x/sys" + packages = ["unix"] + revision = "37707fdb30a5b38865cfb95e5aab41707daec7fd" + +[[projects]] + name = "golang.org/x/text" + packages = [ + "internal/gen", + "internal/triegen", + "internal/ucd", + "transform", + "unicode/cldr", + "unicode/norm" + ] + revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" + version = "v0.3.0" + +[[projects]] + branch = "v2" + name = "gopkg.in/yaml.v2" + packages = ["."] + revision = "d670f9405373e636a5a2765eea47fac0c9bc91a4" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "2182cc4d542c75e5f2cb933cdb6e7d40d6f75fa432f66b0d0876683eb5b97dd2" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/golang/Gopkg.toml b/golang/Gopkg.toml new file mode 100644 index 0000000..6ad3143 --- /dev/null +++ b/golang/Gopkg.toml @@ -0,0 +1,37 @@ +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" + + +[[constraint]] + name = "github.com/VolantMQ/volantmq" + version = "0.0.3" + +[[constraint]] + name = "github.com/spf13/viper" + version = "1.0.0" + +[[constraint]] + name = "go.uber.org/zap" + version = "1.7.1" + +[[constraint]] + name = "github.com/boltdb/bolt" + version = "1.3.1" diff --git a/golang/auth.go b/golang/auth.go new file mode 100644 index 0000000..e81c604 --- /dev/null +++ b/golang/auth.go @@ -0,0 +1,21 @@ +package main + +import "github.com/VolantMQ/volantmq/auth" + +type internalAuth struct { + creds map[string]string +} + +func (a internalAuth) Password(user, password string) auth.Status { + if hash, ok := a.creds[user]; ok { + if password == hash { + return auth.StatusAllow + } + } + return auth.StatusDeny +} + +// nolint: golint +func (a internalAuth) ACL(clientID, user, topic string, access auth.AccessType) auth.Status { + return auth.StatusAllow +} diff --git a/golang/conf/config.json b/golang/conf/config.json new file mode 100644 index 0000000..3255e63 --- /dev/null +++ b/golang/conf/config.json @@ -0,0 +1,16 @@ +{ + "mqtt" : { + "auth" : { + "internal" : [ + { + "user" : "testuser", + "password" : "testpassword" + }, + { + "user" : "", + "password" : "" + } + ] + } + } +} diff --git a/golang/volant.go b/golang/volant.go new file mode 100644 index 0000000..3f8c992 --- /dev/null +++ b/golang/volant.go @@ -0,0 +1,143 @@ +// Copyright (c) 2017 The VolantMQ Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "net/http" + "os" + "os/signal" + "runtime" + "syscall" + + "github.com/VolantMQ/volantmq" + "github.com/VolantMQ/volantmq/auth" + "github.com/VolantMQ/volantmq/configuration" + "github.com/VolantMQ/volantmq/persistence/boltdb" + "github.com/VolantMQ/volantmq/transport" + "github.com/spf13/viper" + "go.uber.org/zap" + + _ "net/http/pprof" + _ "runtime/debug" +) + +func main() { + ops := configuration.Options{ + LogWithTs: false, + } + + configuration.Init(ops) + + logger := configuration.GetLogger().Named("example") + + var err error + + logger.Info("Starting application") + logger.Info("Allocated cores", zap.Int("GOMAXPROCS", runtime.GOMAXPROCS(0))) + viper.SetConfigName("config") + viper.AddConfigPath("conf") + viper.SetConfigType("json") + + go func() { + http.ListenAndServe("localhost:6061", nil) // nolint: errcheck + }() + + logger.Info("Initializing configs") + if err = viper.ReadInConfig(); err != nil { + logger.Error("Couldn't read config file", zap.Error(err)) + os.Exit(1) + } + + ia := internalAuth{ + creds: make(map[string]string), + } + + var internalCreds []struct { + User string `json:"user"` + Password string `json:"password"` + } + + if err = viper.UnmarshalKey("mqtt.auth.internal", &internalCreds); err != nil { + logger.Error("Couldn't unmarshal config", zap.Error(err)) + os.Exit(1) + } + + for i := range internalCreds { + ia.creds[internalCreds[i].User] = internalCreds[i].Password + } + + if err = auth.Register("internal", ia); err != nil { + logger.Error("Couldn't register *internal* auth provider", zap.Error(err)) + os.Exit(1) + } + + var srv volantmq.Server + + listenerStatus := func(id string, status string) { + logger.Info("Listener status", zap.String("id", id), zap.String("status", status)) + } + + serverConfig := volantmq.NewServerConfig() + serverConfig.OfflineQoS0 = true + serverConfig.TransportStatus = listenerStatus + serverConfig.AllowDuplicates = true + serverConfig.Authenticators = "internal" + + serverConfig.Persistence, err = boltdb.New(&boltdb.Config{ + File: "./persist.db", + }) + + if err != nil { + logger.Error("Couldn't init BoltDB persistence", zap.Error(err)) + os.Exit(1) + } + + srv, err = volantmq.NewServer(serverConfig) + + if err != nil { + logger.Error("Couldn't create server", zap.Error(err)) + os.Exit(1) + } + + var authMng *auth.Manager + + if authMng, err = auth.NewManager("internal"); err != nil { + logger.Error("Couldn't register *amqp* auth provider", zap.Error(err)) + return + } + + config := transport.NewConfigTCP( + &transport.Config{ + Port: "1883", + AuthManager: authMng, + }) + + if err = srv.ListenAndServe(config); err != nil { + logger.Error("Couldn't start listener", zap.Error(err)) + } + + go http.ListenAndServe(":6061", nil) // nolint: errcheck + + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + sig := <-ch + logger.Info("Received signal", zap.String("signal", sig.String())) + + if err = srv.Close(); err != nil { + logger.Error("Couldn't shutdown server", zap.Error(err)) + } + + os.Remove("./persist.db") // nolint: errcheck +} diff --git a/mosquitto.conf b/mosquitto.conf new file mode 100644 index 0000000..d33a1a7 --- /dev/null +++ b/mosquitto.conf @@ -0,0 +1,3 @@ +persistence true +persistence_location /mosquitto/data/ +log_dest file /mosquitto/log/mosquitto.log \ No newline at end of file diff --git a/python-broker/Dockerfile b/python-broker/Dockerfile new file mode 100644 index 0000000..58d645c --- /dev/null +++ b/python-broker/Dockerfile @@ -0,0 +1 @@ +FROM python:onbuild \ No newline at end of file diff --git a/python-broker/broker.py b/python-broker/broker.py new file mode 100644 index 0000000..356f4c1 --- /dev/null +++ b/python-broker/broker.py @@ -0,0 +1,46 @@ +import logging +import asyncio +import os +from hbmqtt.broker import Broker + +logger = logging.getLogger(__name__) + +config = { + 'listeners': { + 'default': { + 'type': 'tcp', + 'bind': '0.0.0.0:1883', + }, + 'ws-mqtt': { + 'bind': '127.0.0.1:8080', + 'type': 'ws', + 'max_connections': 10, + }, + }, + 'sys_interval': 10, + 'auth': { + 'allow-anonymous': True, + 'password-file': os.path.join(os.path.dirname(os.path.realpath(__file__)), "passwd"), + 'plugins': [ + 'auth_file', 'auth_anonymous' + ] + + } +} + +broker = Broker(config) + + +@asyncio.coroutine +def test_coro(): + yield from broker.start() + #yield from asyncio.sleep(5) + #yield from broker.shutdown() + + +if __name__ == '__main__': + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + #formatter = "%(asctime)s :: %(levelname)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.get_event_loop().run_forever() diff --git a/python-broker/passwd b/python-broker/passwd new file mode 100644 index 0000000..ebb3835 --- /dev/null +++ b/python-broker/passwd @@ -0,0 +1 @@ +test:$6$l4zQEHEcowc1Pnv4$HHrh8xnsZoLItQ8BmpFHM4r6q5UqK3DnXp2GaTm5zp5buQ7NheY3Xt9f6godVKbEtA.hOC7IEDwnok3pbAOip. \ No newline at end of file diff --git a/python-broker/requirements.txt b/python-broker/requirements.txt new file mode 100644 index 0000000..cb79df5 --- /dev/null +++ b/python-broker/requirements.txt @@ -0,0 +1 @@ +hbmqtt diff --git a/python-clients/Dockerfile b/python-clients/Dockerfile new file mode 100644 index 0000000..58d645c --- /dev/null +++ b/python-clients/Dockerfile @@ -0,0 +1 @@ +FROM python:onbuild \ No newline at end of file diff --git a/python-clients/mqtt-consumer.py b/python-clients/mqtt-consumer.py new file mode 100644 index 0000000..3d14122 --- /dev/null +++ b/python-clients/mqtt-consumer.py @@ -0,0 +1,33 @@ +import paho.mqtt.client as mqtt +import os + +HOST = os.environ.get('MQTT_HOST', 'localhost') + +print('Connecting to %s' %HOST) + +# The callback for when the client receives a CONNACK response from the server. +def on_connect(client, userdata, flags, rc): + print("Connected with result code "+str(rc)) + + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + # + # QOS: 0) no confirmation; 1) at least once; 2) at most once + + client.subscribe("/gsi/#", qos=2) + +# The callback for when a PUBLISH message is received from the server. +def on_message(client, userdata, msg): + print(msg.topic+" "+str(msg.payload)) + +client = mqtt.Client(client_id='mqtt-server', clean_session=False) +client.on_connect = on_connect +client.on_message = on_message + +client.connect(HOST, 1883, 60) + +# Blocking call that processes network traffic, dispatches callbacks and +# handles reconnecting. +# Other loop*() functions are available that give a threaded interface and a +# manual interface. +client.loop_forever() diff --git a/python-clients/mqtt-producer.py b/python-clients/mqtt-producer.py new file mode 100644 index 0000000..4716949 --- /dev/null +++ b/python-clients/mqtt-producer.py @@ -0,0 +1,19 @@ +import paho.mqtt.publish as publish +import os +import time +import socket + +HOSTNAME = socket.gethostname() + +HOST = os.environ.get('MQTT_HOST', 'localhost') + +print('Connecting to %s' %HOST) + +topic = "/gsi/test/multiple" + +i = 0 +while True: + print('%s pushing msg %s' %(HOSTNAME, i)) + publish.single(topic=topic, payload='%s says %s' %(HOSTNAME, i), qos=1, hostname=HOST) + i += 1 + time.sleep(2) diff --git a/python-clients/requirements.txt b/python-clients/requirements.txt new file mode 100644 index 0000000..8579e8b --- /dev/null +++ b/python-clients/requirements.txt @@ -0,0 +1 @@ +paho-mqtt