概要
Mac OS XでVagrant上の仮想マシン(CentOS7)にRabbitMQをインストール
Mac上のScalaプログラムからRabbitMQにアクセス
Vagrant
CentOS7のminimal boxをベースに環境構築していく
vagrant box add CentOS7 https://github.com/holms/vagrant-centos7-box/releases/download/7.1.1503.001/CentOS-7.1.1503-x86_64-netboot.box vagrant init CentOS7
Vagrantfileが作成されたと思うので、以下のRabbitMQ用のポートフォワーディング設定を追加する
vi Vagrantfile ↓追加↓ config.vm.network :forwarded_port, guest: 5672, host: 5672 config.vm.network :forwarded_port, guest: 15672, host: 15672
初期メモリが500MBぐらいしか割り当てられないので、それでは心もとないからメモリ1GB割り当てられるように設定する。
config.vm.provider "virtualbox" do |vb| vb.customize ["modifyvm", :id, "--memory", "1024"] end
vagrant up vagrant ssh
CentOS7の事前準備
とりあえずSELinuxは無効化
setenforce 0
FirewalldのRabbitMQ用のポートを解放しておく
firewall-cmd --permanent --add-port=5672/tcp firewall-cmd --permanent --add-port=15672/tcp
設定を反映する
firewall-cmd --reload
RabbitMQのインストール
Erlangと別々にインストールするのがだるいのでEPELレポジトリのRabbitMQを使用する。
sudo yum install https://dl.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-5.noarch.rpm sudo yum -y install rabbitmq-server
Erlangがインストールされたか確認
[vagrant@localhost ~]$ erl Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false] Eshell V5.10.4 (abort with ^G) 1> erlang:system_info(otp_release). "R16B03-1" 2> q(). ok
RabbitMQがインストールされたか確認
[vagrant@localhost ~]$ sudo rabbitmqctl status | grep rabbit, {rabbit,"RabbitMQ","3.3.5"},
RabbitMQのManagement Pluginを有効化する。
Web画面ベースでQueueの管理とか出来るので絶対いれるべき
sudo rabbitmq-plugins enable rabbitmq_management
Config作成
cd $RABBITMQ_HOME/etc/rabbitmq/ vi rabbitmq.config ↓以下の行を追加 [{rabbit, [{loopback_users, []}]}].
設定を反映するためにRabbitMQを再起動
sudo service rabbitmq-server restart
管理画面表示できるか確認
curl http://127.0.0.1:15672/
Macでも表示できるか確認 初期IDとパスワードはどちらも「guest」
疎通確認用プログラムの作成
RabbitMQ tutorialsの「Hello World」をScalaで作成する
RabbitMQ - RabbitMQ tutorial - "Hello World!"
build.sbtに以下を追加
libraryDependencies += "com.rabbitmq" % "amqp-client" % "3.5.3"
Producer.scala(送信)
class Producer(queueName : String) { val factory = new ConnectionFactory() factory.setHost("localhost") val connection = factory.newConnection() val channel = connection.createChannel() channel.queueDeclare(queueName, false, false, false, null) def send():Unit={ var message = "Hello World!" channel.basicPublish("", queueName, null, message.getBytes()) println(" [x] Sent '" + message + "'") } def close():Unit={ channel.close() connection.close() } }
Consumer.scala(受信)
class Consumer(queueName : String) { val factory = new ConnectionFactory() factory.setHost("localhost") val connection = factory.newConnection() val channel = connection.createChannel() val consumer = new QueueingConsumer(channel) channel.basicConsume(queueName, true, consumer) def receive():Unit={ val delivery = consumer.nextDelivery() val message = new String(delivery.getBody()) println(" [x] Received '" + message + "'") } def close():Unit={ channel.close() connection.close() } }
Main.scala
object Main { val QUEUE_NAME="test_queue" def main(args: Array[String]) { val p = new Producer(QUEUE_NAME) val c = new Consumer(QUEUE_NAME) p.send() c.receive() p.close() c.close() } }
実行結果
[x] Sent 'Hello World!' [x] Received 'Hello World!'
プロジェクトのコードはGitHubに上げました github.com