1
0
mirror of https://github.com/balkian/mqtt-poc.git synced 2025-01-08 04:11:30 +00:00

First commit

This commit is contained in:
J. Fernando Sánchez 2018-02-22 17:38:21 +01:00
commit a1e0d0b4b0
17 changed files with 629 additions and 0 deletions

45
README.md Normal file
View File

@ -0,0 +1,45 @@
Testing three different MQTT brokers: mosquitto, hbmqtt (python) and volantmq (go).
The compose file sets up the three brokers, and two clients (a producer and a consumer).
## Setup
To launch the three brokers, run:
```
docker-compose up mosquitto pybroker gobroker
```
The brokers can be configured to use different brokers with the `MQTT_HOST` environment variable (see below).
## Mosquitto
```
docker-compose run -e MQTT_HOST=mosquitto consumer
```
```
docker-compose run -e MQTT_HOST=mosquitto producer
```
## HBMQTT (python)
This broker is using BoltDB (an embedded key-value store) for persistence.
```
docker-compose run -e MQTT_HOST=pybroker consumer
```
```
docker-compose run -e MQTT_HOST=pybroker producer
```
## VolantMQ (Go)
```
docker-compose run -e MQTT_HOST=gobroker consumer
```
```
docker-compose run -e MQTT_HOST=gobroker producer
```

42
docker-compose.yml Normal file
View File

@ -0,0 +1,42 @@
version: '2.1'
services:
mosquitto:
image: eclipse-mosquitto
ports:
- "1883:1883"
- "9001:9001"
volumes:
- "./mosquitto.conf:/mosquitto/config/mosquitto.conf"
- "./data:/mosquitto/data"
- "./logs:/mosquitto/log"
consumer:
build: python-clients
image: mqtt-test
tty: true
stdin_open: true
command:
- 'python'
- 'mqtt-consumer.py'
environment:
MQTT_HOST: '${MQTT_HOST:-pybroker}'
producer:
build: python-clients
image: mqtt-test
tty: true
stdin_open: true
command: ['python', 'mqtt-producer.py']
environment:
MQTT_HOST: '${MQTT_HOST:-pybroker}'
pybroker:
build: python-broker
image: mqtt-pybroker-test
tty: true
stdin_open: true
command:
- 'python'
- 'broker.py'
gobroker:
build: golang
image: mqtt-gobroker-test

28
golang/Dockerfile Normal file
View File

@ -0,0 +1,28 @@
FROM registry.hub.docker.com/library/golang:1.9
RUN curl -fsSL -o /usr/local/bin/dep https://github.com/golang/dep/releases/download/v0.4.1/dep-linux-amd64 && chmod +x /usr/local/bin/dep
RUN mkdir -p /go/src/mqtt-poc/
WORKDIR /go/src/mqtt-poc
COPY Gopkg.toml Gopkg.lock ./
# copies the Gopkg.toml and Gopkg.lock to WORKDIR
RUN dep ensure -vendor-only
COPY . /go/src/mqtt-poc
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o app .
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
RUN mkdir -p /root/conf
ADD conf/config.json /root/conf/config.json
COPY --from=0 /go/src/mqtt-poc/app .
ENTRYPOINT ["./app"]

191
golang/Gopkg.lock generated Normal file
View File

@ -0,0 +1,191 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
name = "github.com/VolantMQ/volantmq"
packages = [
".",
"auth",
"clients",
"configuration",
"connection",
"packet",
"persistence",
"persistence/boltdb",
"routines",
"subscriber",
"systree",
"topics",
"topics/mem",
"topics/types",
"transport",
"types"
]
revision = "e7adeb65d1ddc075730d0a8c7eb9ffc43ba7e068"
version = "v0.0.3"
[[projects]]
name = "github.com/boltdb/bolt"
packages = ["."]
revision = "2f1ce7a837dcb8da3ec595b1dac9d0632f0f99e8"
version = "v1.3.1"
[[projects]]
name = "github.com/fsnotify/fsnotify"
packages = ["."]
revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9"
version = "v1.4.7"
[[projects]]
name = "github.com/gorilla/websocket"
packages = ["."]
revision = "ea4d1f681babbce9545c9c5f3d5194a789c89f5b"
version = "v1.2.0"
[[projects]]
branch = "master"
name = "github.com/hashicorp/hcl"
packages = [
".",
"hcl/ast",
"hcl/parser",
"hcl/scanner",
"hcl/strconv",
"hcl/token",
"json/parser",
"json/scanner",
"json/token"
]
revision = "23c074d0eceb2b8a5bfdbb271ab780cde70f05a8"
[[projects]]
name = "github.com/magiconair/properties"
packages = ["."]
revision = "c3beff4c2358b44d0493c7dda585e7db7ff28ae6"
version = "v1.7.6"
[[projects]]
branch = "master"
name = "github.com/mitchellh/mapstructure"
packages = ["."]
revision = "00c29f56e2386353d58c599509e8dc3801b0d716"
[[projects]]
name = "github.com/pborman/uuid"
packages = ["."]
revision = "e790cca94e6cc75c7064b1332e63811d4aae1a53"
version = "v1.1"
[[projects]]
name = "github.com/pelletier/go-toml"
packages = ["."]
revision = "acdc4509485b587f5e675510c4f2c63e90ff68a8"
version = "v1.1.0"
[[projects]]
name = "github.com/spf13/afero"
packages = [
".",
"mem"
]
revision = "bb8f1927f2a9d3ab41c9340aa034f6b803f4359c"
version = "v1.0.2"
[[projects]]
name = "github.com/spf13/cast"
packages = ["."]
revision = "8965335b8c7107321228e3e3702cab9832751bac"
version = "v1.2.0"
[[projects]]
branch = "master"
name = "github.com/spf13/jwalterweatherman"
packages = ["."]
revision = "7c0cea34c8ece3fbeb2b27ab9b59511d360fb394"
[[projects]]
name = "github.com/spf13/pflag"
packages = ["."]
revision = "e57e3eeb33f795204c1ca35f56c44f83227c6e66"
version = "v1.0.0"
[[projects]]
name = "github.com/spf13/viper"
packages = ["."]
revision = "25b30aa063fc18e48662b86996252eabdcf2f0c7"
version = "v1.0.0"
[[projects]]
branch = "master"
name = "github.com/troian/easygo"
packages = ["netpoll"]
revision = "b69abaaf8a8503f3102d80d17d20515041bf9ef5"
[[projects]]
branch = "master"
name = "github.com/troian/omap"
packages = ["."]
revision = "bf92c506ddb0f2cc90f2ed8aab7dad6dce349194"
[[projects]]
name = "go.uber.org/atomic"
packages = ["."]
revision = "8474b86a5a6f79c443ce4b2992817ff32cf208b8"
version = "v1.3.1"
[[projects]]
name = "go.uber.org/multierr"
packages = ["."]
revision = "3c4937480c32f4c13a875a1829af76c98ca3d40a"
version = "v1.1.0"
[[projects]]
name = "go.uber.org/zap"
packages = [
".",
"buffer",
"internal/bufferpool",
"internal/color",
"internal/exit",
"zapcore"
]
revision = "35aad584952c3e7020db7b839f6b102de6271f89"
version = "v1.7.1"
[[projects]]
branch = "master"
name = "golang.org/x/sync"
packages = ["syncmap"]
revision = "fd80eb99c8f653c847d294a001bdf2a3a6f768f5"
[[projects]]
branch = "master"
name = "golang.org/x/sys"
packages = ["unix"]
revision = "37707fdb30a5b38865cfb95e5aab41707daec7fd"
[[projects]]
name = "golang.org/x/text"
packages = [
"internal/gen",
"internal/triegen",
"internal/ucd",
"transform",
"unicode/cldr",
"unicode/norm"
]
revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
version = "v0.3.0"
[[projects]]
branch = "v2"
name = "gopkg.in/yaml.v2"
packages = ["."]
revision = "d670f9405373e636a5a2765eea47fac0c9bc91a4"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "2182cc4d542c75e5f2cb933cdb6e7d40d6f75fa432f66b0d0876683eb5b97dd2"
solver-name = "gps-cdcl"
solver-version = 1

37
golang/Gopkg.toml Normal file
View File

@ -0,0 +1,37 @@
# Gopkg.toml example
#
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
[[constraint]]
name = "github.com/VolantMQ/volantmq"
version = "0.0.3"
[[constraint]]
name = "github.com/spf13/viper"
version = "1.0.0"
[[constraint]]
name = "go.uber.org/zap"
version = "1.7.1"
[[constraint]]
name = "github.com/boltdb/bolt"
version = "1.3.1"

21
golang/auth.go Normal file
View File

@ -0,0 +1,21 @@
package main
import "github.com/VolantMQ/volantmq/auth"
type internalAuth struct {
creds map[string]string
}
func (a internalAuth) Password(user, password string) auth.Status {
if hash, ok := a.creds[user]; ok {
if password == hash {
return auth.StatusAllow
}
}
return auth.StatusDeny
}
// nolint: golint
func (a internalAuth) ACL(clientID, user, topic string, access auth.AccessType) auth.Status {
return auth.StatusAllow
}

16
golang/conf/config.json Normal file
View File

@ -0,0 +1,16 @@
{
"mqtt" : {
"auth" : {
"internal" : [
{
"user" : "testuser",
"password" : "testpassword"
},
{
"user" : "",
"password" : ""
}
]
}
}
}

143
golang/volant.go Normal file
View File

@ -0,0 +1,143 @@
// Copyright (c) 2017 The VolantMQ Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"net/http"
"os"
"os/signal"
"runtime"
"syscall"
"github.com/VolantMQ/volantmq"
"github.com/VolantMQ/volantmq/auth"
"github.com/VolantMQ/volantmq/configuration"
"github.com/VolantMQ/volantmq/persistence/boltdb"
"github.com/VolantMQ/volantmq/transport"
"github.com/spf13/viper"
"go.uber.org/zap"
_ "net/http/pprof"
_ "runtime/debug"
)
func main() {
ops := configuration.Options{
LogWithTs: false,
}
configuration.Init(ops)
logger := configuration.GetLogger().Named("example")
var err error
logger.Info("Starting application")
logger.Info("Allocated cores", zap.Int("GOMAXPROCS", runtime.GOMAXPROCS(0)))
viper.SetConfigName("config")
viper.AddConfigPath("conf")
viper.SetConfigType("json")
go func() {
http.ListenAndServe("localhost:6061", nil) // nolint: errcheck
}()
logger.Info("Initializing configs")
if err = viper.ReadInConfig(); err != nil {
logger.Error("Couldn't read config file", zap.Error(err))
os.Exit(1)
}
ia := internalAuth{
creds: make(map[string]string),
}
var internalCreds []struct {
User string `json:"user"`
Password string `json:"password"`
}
if err = viper.UnmarshalKey("mqtt.auth.internal", &internalCreds); err != nil {
logger.Error("Couldn't unmarshal config", zap.Error(err))
os.Exit(1)
}
for i := range internalCreds {
ia.creds[internalCreds[i].User] = internalCreds[i].Password
}
if err = auth.Register("internal", ia); err != nil {
logger.Error("Couldn't register *internal* auth provider", zap.Error(err))
os.Exit(1)
}
var srv volantmq.Server
listenerStatus := func(id string, status string) {
logger.Info("Listener status", zap.String("id", id), zap.String("status", status))
}
serverConfig := volantmq.NewServerConfig()
serverConfig.OfflineQoS0 = true
serverConfig.TransportStatus = listenerStatus
serverConfig.AllowDuplicates = true
serverConfig.Authenticators = "internal"
serverConfig.Persistence, err = boltdb.New(&boltdb.Config{
File: "./persist.db",
})
if err != nil {
logger.Error("Couldn't init BoltDB persistence", zap.Error(err))
os.Exit(1)
}
srv, err = volantmq.NewServer(serverConfig)
if err != nil {
logger.Error("Couldn't create server", zap.Error(err))
os.Exit(1)
}
var authMng *auth.Manager
if authMng, err = auth.NewManager("internal"); err != nil {
logger.Error("Couldn't register *amqp* auth provider", zap.Error(err))
return
}
config := transport.NewConfigTCP(
&transport.Config{
Port: "1883",
AuthManager: authMng,
})
if err = srv.ListenAndServe(config); err != nil {
logger.Error("Couldn't start listener", zap.Error(err))
}
go http.ListenAndServe(":6061", nil) // nolint: errcheck
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
sig := <-ch
logger.Info("Received signal", zap.String("signal", sig.String()))
if err = srv.Close(); err != nil {
logger.Error("Couldn't shutdown server", zap.Error(err))
}
os.Remove("./persist.db") // nolint: errcheck
}

3
mosquitto.conf Normal file
View File

@ -0,0 +1,3 @@
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log

1
python-broker/Dockerfile Normal file
View File

@ -0,0 +1 @@
FROM python:onbuild

46
python-broker/broker.py Normal file
View File

@ -0,0 +1,46 @@
import logging
import asyncio
import os
from hbmqtt.broker import Broker
logger = logging.getLogger(__name__)
config = {
'listeners': {
'default': {
'type': 'tcp',
'bind': '0.0.0.0:1883',
},
'ws-mqtt': {
'bind': '127.0.0.1:8080',
'type': 'ws',
'max_connections': 10,
},
},
'sys_interval': 10,
'auth': {
'allow-anonymous': True,
'password-file': os.path.join(os.path.dirname(os.path.realpath(__file__)), "passwd"),
'plugins': [
'auth_file', 'auth_anonymous'
]
}
}
broker = Broker(config)
@asyncio.coroutine
def test_coro():
yield from broker.start()
#yield from asyncio.sleep(5)
#yield from broker.shutdown()
if __name__ == '__main__':
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
#formatter = "%(asctime)s :: %(levelname)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.get_event_loop().run_forever()

1
python-broker/passwd Normal file
View File

@ -0,0 +1 @@
test:$6$l4zQEHEcowc1Pnv4$HHrh8xnsZoLItQ8BmpFHM4r6q5UqK3DnXp2GaTm5zp5buQ7NheY3Xt9f6godVKbEtA.hOC7IEDwnok3pbAOip.

View File

@ -0,0 +1 @@
hbmqtt

View File

@ -0,0 +1 @@
FROM python:onbuild

View File

@ -0,0 +1,33 @@
import paho.mqtt.client as mqtt
import os
HOST = os.environ.get('MQTT_HOST', 'localhost')
print('Connecting to %s' %HOST)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
#
# QOS: 0) no confirmation; 1) at least once; 2) at most once
client.subscribe("/gsi/#", qos=2)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client(client_id='mqtt-server', clean_session=False)
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, 1883, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

View File

@ -0,0 +1,19 @@
import paho.mqtt.publish as publish
import os
import time
import socket
HOSTNAME = socket.gethostname()
HOST = os.environ.get('MQTT_HOST', 'localhost')
print('Connecting to %s' %HOST)
topic = "/gsi/test/multiple"
i = 0
while True:
print('%s pushing msg %s' %(HOSTNAME, i))
publish.single(topic=topic, payload='%s says %s' %(HOSTNAME, i), qos=1, hostname=HOST)
i += 1
time.sleep(2)

View File

@ -0,0 +1 @@
paho-mqtt