I have been using celery for a while now with RabbitMq broker.
I was using authentication all this time but i still didn't feel confident that that will keep it secure as it was over http. So i tried setting it up over https. It took some time but finally i was able to complete it:
mkdir testca
cd testca
mkdir certs private
chmod 700 private
echo 01 > serial
touch index.txt
# create a file openssl.cnf and add the content to it.
vim openssl.cnf
[ ca ]
default_ca = testca
[ testca ]
dir = .
certificate = $dir/cacert.pem
database = $dir/index.txt
new_certs_dir = $dir/certs
private_key = $dir/private/cakey.pem
serial = $dir/serial
default_crl_days = 7
default_days = 365
default_md = sha1
policy = testca_policy
x509_extensions = certificate_extensions
[ testca_policy ]
commonName = supplied
stateOrProvinceName = optional
countryName = optional
emailAddress = optional
organizationName = optional
organizationalUnitName = optional
[ certificate_extensions ]
basicConstraints = CA:false
[ req ]
default_bits = 2048
default_keyfile = ./private/cakey.pem
default_md = sha1
prompt = yes
distinguished_name = root_ca_distinguished_name
x509_extensions = root_ca_extensions
[ root_ca_distinguished_name ]
commonName = hostname
[ root_ca_extensions ]
basicConstraints = CA:true
keyUsage = keyCertSign, cRLSign
[ client_ca_extensions ]
basicConstraints = CA:false
keyUsage = digitalSignature
extendedKeyUsage = 1.3.6.1.5.5.7.3.2
[ server_ca_extensions ]
basicConstraints = CA:false
keyUsage = keyEncipherment
extendedKeyUsage = 1.3.6.1.5.5.7.3.1
#run these 2 commands they will generate the key file testca/cacert.pem
openssl req -x509 -config openssl.cnf -newkey rsa:2048 -days 365 \
-out cacert.pem -outform PEM -subj /CN=MyTestCA/ -nodes
openssl x509 -in cacert.pem -out cacert.cer -outform DER
#Now generate key for server:
cd ..
ls
testca
mkdir server
cd server
openssl genrsa -out key.pem 2048
openssl req -new -key key.pem -out req.pem -outform PEM \
-subj /CN=$(hostname)/O=server/ -nodes
cd ../testca
openssl ca -config openssl.cnf -in ../server/req.pem -out \
../server/cert.pem -notext -batch -extensions server_ca_extensions
cd ../server
openssl pkcs12 -export -out keycert.p12 -in cert.pem -inkey key.pem -passout pass:MySecretPassword
#chose and remember the MySecretPassword.
#Now generate key for client:
cd ..
ls
server testca
mkdir client
cd client
openssl genrsa -out key.pem 2048
openssl req -new -key key.pem -out req.pem -outform PEM \
-subj /CN=$(hostname)/O=client/ -nodes
cd ../testca
openssl ca -config openssl.cnf -in ../client/req.pem -out \
../client/cert.pem -notext -batch -extensions client_ca_extensions
cd ../client
openssl pkcs12 -export -out keycert.p12 -in cert.pem -inkey key.pem -passout pass:MySecretPassword
#Relevant files generated:
.
├── client
│ ├── cert.pem
│ └── key.pem
├── server
│ ├── cert.pem
│ └── key.pem
└── testca
└── cacert.pem
Rename files to :
.
├── client
│ ├── client_cert.pem
│ └── client_key.pem
├── server
│ ├── server_cert.pem
│ └── server_key.pem
└── testca
└── cacert.pem
copy these files to your rabbitmq server.
Move the files to this directory:
/etc/rabbitmq/ssl/certs/
client_cert.pem client_key.pem server_cert.pem server_key.pem cacert.pem
#Next configure the rabbitmq to use these files:
#create the config file:
vim /etc/rabbitmq/rabbitmq.config
%% -*- mode: erlang -*-
%% ----------------------------------------------------------------------------
%% RabbitMQ Sample Configuration File.
%%
%% See http://www.rabbitmq.com/configure.html for details.
%% ----------------------------------------------------------------------------
[
{ssl, [{versions, ['tlsv1.2', 'tlsv1.1']}]},
{rabbit,
[
{ssl_listeners, [5671]},
{ssl_options, [{cacertfile,"/etc/cert/cacert.pem"},
{certfile,"/etc/cert/cert.pem"},
{keyfile,"/etc/cert/key.pem"},
{password, "dataemocertpass1@"},
{verify, verify_peer},
{fail_if_no_peer_cert, true},
{versions, ['tlsv1.2', 'tlsv1.1']}]}
%%
%% Network Connectivity
%% ====================
%%
%% By default, RabbitMQ will listen on all interfaces, using
%% the standard (reserved) AMQP port.
%%
%% {tcp_listeners, [5672]},
%% To listen on a specific interface, provide a tuple of {IpAddress, Port}.
%% For example, to listen only on localhost for both IPv4 and IPv6:
%%
%% {tcp_listeners, [{"127.0.0.1", 5672},
%% {"::1", 5672}]},
%% SSL listeners are configured in the same fashion as TCP listeners,
%% including the option to control the choice of interface.
%%
%% {ssl_listeners, [5671]},
%% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection
%% and SSL handshake), in milliseconds.
%%
%% {handshake_timeout, 10000},
%% Log levels (currently just used for connection logging).
%% One of 'debug', 'info', 'warning', 'error' or 'none', in decreasing
%% order of verbosity. Defaults to 'info'.
%%
%% {log_levels, [{connection, info}, {channel, info}]},
%% Set to 'true' to perform reverse DNS lookups when accepting a
%% connection. Hostnames will then be shown instead of IP addresses
%% in rabbitmqctl and the management plugin.
%%
%% {reverse_dns_lookups, true},
%%
%% Security / AAA
%% ==============
%%
%% The default "guest" user is only permitted to access the server
%% via a loopback interface (e.g. localhost).
%% {loopback_users, [<<"guest">>]},
%%
%% Uncomment the following line if you want to allow access to the
%% guest user from anywhere on the network.
%% {loopback_users, []},
%% Configuring SSL.
%% See http://www.rabbitmq.com/ssl.html for full documentation.
%%
%% {ssl_options, [{cacertfile, "/path/to/testca/cacert.pem"},
%% {certfile, "/path/to/server/cert.pem"},
%% {keyfile, "/path/to/server/key.pem"},
%% {verify, verify_peer},
%% {fail_if_no_peer_cert, false}]},
%% Choose the available SASL mechanism(s) to expose.
%% The two default (built in) mechanisms are 'PLAIN' and
%% 'AMQPLAIN'. Additional mechanisms can be added via
%% plugins.
%%
%% See http://www.rabbitmq.com/authentication.html for more details.
%%
%% {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
%% Select an authentication database to use. RabbitMQ comes bundled
%% with a built-in auth-database, based on mnesia.
%%
%% {auth_backends, [rabbit_auth_backend_internal]},
%% Configurations supporting the rabbitmq_auth_mechanism_ssl and
%% rabbitmq_auth_backend_ldap plugins.
%%
%% NB: These options require that the relevant plugin is enabled.
%% See http://www.rabbitmq.com/plugins.html for further details.
%% The RabbitMQ-auth-mechanism-ssl plugin makes it possible to
%% authenticate a user based on the client's SSL certificate.
%%
%% To use auth-mechanism-ssl, add to or replace the auth_mechanisms
%% list with the entry 'EXTERNAL'.
%%
%% {auth_mechanisms, ['EXTERNAL']},
%% The rabbitmq_auth_backend_ldap plugin allows the broker to
%% perform authentication and authorisation by deferring to an
%% external LDAP server.
%%
%% For more information about configuring the LDAP backend, see
%% http://www.rabbitmq.com/ldap.html.
%%
%% Enable the LDAP auth backend by adding to or replacing the
%% auth_backends entry:
%%
%% {auth_backends, [rabbit_auth_backend_ldap]},
%% This pertains to both the rabbitmq_auth_mechanism_ssl plugin and
%% STOMP ssl_cert_login configurations. See the rabbitmq_stomp
%% configuration section later in this file and the README in
%% https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl for further
%% details.
%%
%% To use the SSL cert's CN instead of its DN as the username
%%
%% {ssl_cert_login_from, common_name},
%% SSL handshake timeout, in milliseconds.
%%
%% {ssl_handshake_timeout, 5000},
%%
%% Default User / VHost
%% ====================
%%
%% On first start RabbitMQ will create a vhost and a user. These
%% config items control what gets created. See
%% http://www.rabbitmq.com/access-control.html for further
%% information about vhosts and access control.
%%
%% {default_vhost, <<"/">>},
%% {default_user, <<"guest">>},
%% {default_pass, <<"guest">>},
%% {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
%% Tags for default user
%%
%% For more details about tags, see the documentation for the
%% Management Plugin at http://www.rabbitmq.com/management.html.
%%
%% {default_user_tags, [administrator]},
%%
%% Additional network and protocol related configuration
%% =====================================================
%%
%% Set the default AMQP heartbeat delay (in seconds).
%%
%% {heartbeat, 600},
%% Set the max permissible size of an AMQP frame (in bytes).
%%
%% {frame_max, 131072},
%% Set the max permissible number of channels per connection.
%% 0 means "no limit".
%%
%% {channel_max, 128},
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
%% further documentation.
%%
%% {tcp_listen_options, [binary,
%% {packet, raw},
%% {reuseaddr, true},
%% {backlog, 128},
%% {nodelay, true},
%% {exit_on_close, false}]},
%%
%% Resource Limits & Flow Control
%% ==============================
%%
%% See http://www.rabbitmq.com/memory.html for full details.
%% Memory-based Flow Control threshold.
%%
%% {vm_memory_high_watermark, 0.4},
%% Fraction of the high watermark limit at which queues start to
%% page message out to disc in order to free up memory.
%%
%% Values greater than 0.9 can be dangerous and should be used carefully.
%%
%% {vm_memory_high_watermark_paging_ratio, 0.5},
%% Set disk free limit (in bytes). Once free disk space reaches this
%% lower bound, a disk alarm will be set - see the documentation
%% listed above for more details.
%%
%% {disk_free_limit, 50000000},
%% Alternatively, we can set a limit relative to total available RAM.
%%
%% Values lower than 1.0 can be dangerous and should be used carefully.
%% {disk_free_limit, {mem_relative, 2.0}},
%%
%% Misc/Advanced Options
%% =====================
%%
%% NB: Change these only if you understand what you are doing!
%%
%% To announce custom properties to clients on connection:
%%
%% {server_properties, []},
%% How to respond to cluster partitions.
%% See http://www.rabbitmq.com/partitions.html for further details.
%%
%% {cluster_partition_handling, ignore},
%% Make clustering happen *automatically* at startup - only applied
%% to nodes that have just been reset or started for the first time.
%% See http://www.rabbitmq.com/clustering.html#auto-config for
%% further details.
%%
%% {cluster_nodes, {['rabbit@my.host.com'], disc}},
%% Interval (in milliseconds) at which we send keepalive messages
%% to other cluster members. Note that this is not the same thing
%% as net_ticktime; missed keepalive messages will not cause nodes
%% to be considered down.
%%
%% {cluster_keepalive_interval, 10000},
%% Set (internal) statistics collection granularity.
%%
%% {collect_statistics, none},
%% Statistics collection interval (in milliseconds).
%%
%% {collect_statistics_interval, 5000},
%% Explicitly enable/disable hipe compilation.
%%
%% {hipe_compile, true},
%% Timeout used when waiting for Mnesia tables in a cluster to
%% become available.
%%
%% {mnesia_table_loading_timeout, 30000},
%% Size in bytes below which to embed messages in the queue index. See
%% http://www.rabbitmq.com/persistence-conf.html
%%
%% {queue_index_embed_msgs_below, 4096}
]},
%% ----------------------------------------------------------------------------
%% Advanced Erlang Networking/Clustering Options.
%%
%% See http://www.rabbitmq.com/clustering.html for details
%% ----------------------------------------------------------------------------
{kernel,
[%% Sets the net_kernel tick time.
%% Please see http://erlang.org/doc/man/kernel_app.html and
%% http://www.rabbitmq.com/nettick.html for further details.
%%
%% {net_ticktime, 60}
]},
%% ----------------------------------------------------------------------------
%% RabbitMQ Management Plugin
%%
%% See http://www.rabbitmq.com/management.html for details
%% ----------------------------------------------------------------------------
{rabbitmq_management,
[%% Pre-Load schema definitions from the following JSON file. See
%% http://www.rabbitmq.com/management.html#load-definitions
%%
%% {load_definitions, "/path/to/schema.json"},
%% Log all requests to the management HTTP API to a file.
%%
%% {http_log_dir, "/path/to/access.log"},
%% Change the port on which the HTTP listener listens,
%% specifying an interface for the web server to bind to.
%% Also set the listener to use SSL and provide SSL options.
%%
%% {listener, [{port, 12345},
%% {ip, "127.0.0.1"},
%% {ssl, true},
%% {ssl_opts, [{cacertfile, "/path/to/cacert.pem"},
%% {certfile, "/path/to/cert.pem"},
%% {keyfile, "/path/to/key.pem"}]}]},
%% One of 'basic', 'detailed' or 'none'. See
%% http://www.rabbitmq.com/management.html#fine-stats for more details.
%% {rates_mode, basic},
%% Configure how long aggregated data (such as message rates and queue
%% lengths) is retained. Please read the plugin's documentation in
%% http://www.rabbitmq.com/management.html#configuration for more
%% details.
%%
%% {sample_retention_policies,
%% [{global, [{60, 5}, {3600, 60}, {86400, 1200}]},
%% {basic, [{60, 5}, {3600, 60}]},
%% {detailed, [{10, 5}]}]}
]},
%% ----------------------------------------------------------------------------
%% RabbitMQ Shovel Plugin
%%
%% See http://www.rabbitmq.com/shovel.html for details
%% ----------------------------------------------------------------------------
{rabbitmq_shovel,
[{shovels,
[%% A named shovel worker.
%% {my_first_shovel,
%% [
%% List the source broker(s) from which to consume.
%%
%% {sources,
%% [%% URI(s) and pre-declarations for all source broker(s).
%% {brokers, ["amqp://user:password@host.domain/my_vhost"]},
%% {declarations, []}
%% ]},
%% List the destination broker(s) to publish to.
%% {destinations,
%% [%% A singular version of the 'brokers' element.
%% {broker, "amqp://"},
%% {declarations, []}
%% ]},
%% Name of the queue to shovel messages from.
%%
%% {queue, <<"your-queue-name-goes-here">>},
%% Optional prefetch count.
%%
%% {prefetch_count, 10},
%% when to acknowledge messages:
%% - no_ack: never (auto)
%% - on_publish: after each message is republished
%% - on_confirm: when the destination broker confirms receipt
%%
%% {ack_mode, on_confirm},
%% Overwrite fields of the outbound basic.publish.
%%
%% {publish_fields, [{exchange, <<"my_exchange">>},
%% {routing_key, <<"from_shovel">>}]},
%% Static list of basic.properties to set on re-publication.
%%
%% {publish_properties, [{delivery_mode, 2}]},
%% The number of seconds to wait before attempting to
%% reconnect in the event of a connection failure.
%%
%% {reconnect_delay, 2.5}
%% ]} %% End of my_first_shovel
]}
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%%
%% {defaults, [{prefetch_count, 0},
%% {ack_mode, on_confirm},
%% {publish_fields, []},
%% {publish_properties, [{delivery_mode, 2}]},
%% {reconnect_delay, 2.5}]}
]},
%% ----------------------------------------------------------------------------
%% RabbitMQ Stomp Adapter
%%
%% See http://www.rabbitmq.com/stomp.html for details
%% ----------------------------------------------------------------------------
{rabbitmq_stomp,
[%% Network Configuration - the format is generally the same as for the broker
%% Listen only on localhost (ipv4 & ipv6) on a specific port.
%% {tcp_listeners, [{"127.0.0.1", 61613},
%% {"::1", 61613}]},
%% Listen for SSL connections on a specific port.
%% {ssl_listeners, [61614]},
%% Additional SSL options
%% Extract a name from the client's certificate when using SSL.
%%
%% {ssl_cert_login, true},
%% Set a default user name and password. This is used as the default login
%% whenever a CONNECT frame omits the login and passcode headers.
%%
%% Please note that setting this will allow clients to connect without
%% authenticating!
%%
%% {default_user, [{login, "guest"},
%% {passcode, "guest"}]},
%% If a default user is configured, or you have configured use SSL client
%% certificate based authentication, you can choose to allow clients to
%% omit the CONNECT frame entirely. If set to true, the client is
%% automatically connected as the default user or user supplied in the
%% SSL certificate whenever the first frame sent on a session is not a
%% CONNECT frame.
%%
%% {implicit_connect, true}
]},
%% ----------------------------------------------------------------------------
%% RabbitMQ MQTT Adapter
%%
%% See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md
%% for details
%% ----------------------------------------------------------------------------
{rabbitmq_mqtt,
[%% Set the default user name and password. Will be used as the default login
%% if a connecting client provides no other login details.
%%
%% Please note that setting this will allow clients to connect without
%% authenticating!
%%
%% {default_user, <<"guest">>},
%% {default_pass, <<"guest">>},
%% Enable anonymous access. If this is set to false, clients MUST provide
%% login information in order to connect. See the default_user/default_pass
%% configuration elements for managing logins without authentication.
%%
%% {allow_anonymous, true},
%% If you have multiple chosts, specify the one to which the
%% adapter connects.
%%
%% {vhost, <<"/">>},
%% Specify the exchange to which messages from MQTT clients are published.
%%
%% {exchange, <<"amq.topic">>},
%% Specify TTL (time to live) to control the lifetime of non-clean sessions.
%%
%% {subscription_ttl, 1800000},
%% Set the prefetch count (governing the maximum number of unacknowledged
%% messages that will be delivered).
%%
%% {prefetch, 10},
%% TCP/SSL Configuration (as per the broker configuration).
%%
%% {tcp_listeners, [1883]},
%% {ssl_listeners, []},
%% TCP/Socket options (as per the broker configuration).
%%
%% {tcp_listen_options, [binary,
%% {packet, raw},
%% {reuseaddr, true},
%% {backlog, 128},
%% {nodelay, true}]}
]},
%% ----------------------------------------------------------------------------
%% RabbitMQ AMQP 1.0 Support
%%
%% See https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md
%% for details
%% ----------------------------------------------------------------------------
{rabbitmq_amqp1_0,
[%% Connections that are not authenticated with SASL will connect as this
%% account. See the README for more information.
%%
%% Please note that setting this will allow clients to connect without
%% authenticating!
%%
%% {default_user, "guest"},
%% Enable protocol strict mode. See the README for more information.
%%
%% {protocol_strict_mode, false}
]},
%% ----------------------------------------------------------------------------
%% RabbitMQ LDAP Plugin
%%
%% See http://www.rabbitmq.com/ldap.html for details.
%%
%% ----------------------------------------------------------------------------
{rabbitmq_auth_backend_ldap,
[%%
%% Connecting to the LDAP server(s)
%% ================================
%%
%% Specify servers to bind to. You *must* set this in order for the plugin
%% to work properly.
%%
%% {servers, ["your-server-name-goes-here"]},
%% Connect to the LDAP server using SSL
%%
%% {use_ssl, false},
%% Specify the LDAP port to connect to
%%
%% {port, 389},
%% LDAP connection timeout, in milliseconds or 'infinity'
%%
%% {timeout, infinity},
%% Enable logging of LDAP queries.
%% One of
%% - false (no logging is performed)
%% - true (verbose logging of the logic used by the plugin)
%% - network (as true, but additionally logs LDAP network traffic)
%%
%% Defaults to false.
%%
%% {log, false},
%%
%% Authentication
%% ==============
%%
%% Pattern to convert the username given through AMQP to a DN before
%% binding
%%
%% {user_dn_pattern, "cn=${username},ou=People,dc=example,dc=com"},
%% Alternatively, you can convert a username to a Distinguished
%% Name via an LDAP lookup after binding. See the documentation for
%% full details.
%% When converting a username to a dn via a lookup, set these to
%% the name of the attribute that represents the user name, and the
%% base DN for the lookup query.
%%
%% {dn_lookup_attribute, "userPrincipalName"},
%% {dn_lookup_base, "DC=gopivotal,DC=com"},
%% Controls how to bind for authorisation queries and also to
%% retrieve the details of users logging in without presenting a
%% password (e.g., SASL EXTERNAL).
%% One of
%% - as_user (to bind as the authenticated user - requires a password)
%% - anon (to bind anonymously)
%% - {UserDN, Password} (to bind with a specified user name and password)
%%
%% Defaults to 'as_user'.
%%
%% {other_bind, as_user},
%%
%% Authorisation
%% =============
%%
%% The LDAP plugin can perform a variety of queries against your
%% LDAP server to determine questions of authorisation. See
%% http://www.rabbitmq.com/ldap.html#authorisation for more
%% information.
%% Set the query to use when determining vhost access
%%
%% {vhost_access_query, {in_group,
%% "ou=${vhost}-users,ou=vhosts,dc=example,dc=com"}},
%% Set the query to use when determining resource (e.g., queue) access
%%
%% {resource_access_query, {constant, true}},
%% Set queries to determine which tags a user has
%%
%% {tag_queries, []}
]}
].
#Restart rabbitmq server
#Check queue status:
python code:
import amqplib.client_0_8 as amqp
host = '127.0.0.1'
port = 5671
connection= amqp.Connection(host='%s:%s' % (host, port), userid='user', password='password', ssl=True, virtual_host='vhostname')
channel = connection.channel()
name, jobs, consumers = channel.queue_declare(queue='for_monitoring', passive=True)
print jobs
#coding celery worker:
from __future__ import absolute_import
from celery import Celery
from kombu import Exchange, Queue
import requests
import ssl
worker_app = Celery('project',
broker = 'amqp://' + RABBITMQ_USER_NAME + ':' + RABBITMQ_PASSWORD + '@' + RABBITMQ_HOST + ':5671/' + RABBITMQ_VIRTUAL_HOST,
include = ['tasks'])
worker_app.conf.update(
CELERY_TASK_RESULT_EXPIRES = 3600,
CELERY_QUEUES = (
Queue('for_monitoring', Exchange('for_monitoring'), routing_key = 'for_monitoring')
),
CELERY_ROUTES = {
'monitoring': {'queue': 'for_monitoring', 'routing_key': 'for_monitoring'}
},
CELERY_TIMEZONE = 'UTC',
BROKER_USE_SSL = {
'ca_certs': '/etc/rabbitmq/ssl/certs/cacert.pem',
'keyfile': '/etc/rabbitmq/ssl/certs/key_client.pem',
'certfile': '/etc/rabbitmq/ssl/certs/cert_client.pem',
'cert_reqs': ssl.CERT_REQUIRED
}
)
This i tried and tested and is working fine. I am not sure how to disable the http port though. So i just blocked the port from remote access. Will update if i find that out.