본문 바로가기

Infra

RabbitMQ 이것저것

K8s에 설치된 RabbitMQ를 사용하기 위해 해본 이것저것

 

RabbitMQ는 5672, 4369, 15672, 25672의 4가지 port를 가짐

각각 amqp, epmd, stats, dist로 이 중 15672가 management를 위한 port

하지만 port-forward로 15672를 열어도 socat[96627] E connet : Connection refused 에러가 나며 연결이 안되는 모습을 볼 수 있는데, 이 경우 pod의 container로 접속하여 아래 명령어를 통해 상태를 바꿔줘야 사용 가능

rabbitmq-plugins enable rabbitmq_management

 

위 과정을 통해 http를 열었다면 curl을 통해 api를 호출하여 사용이 가능하고 Rabbit MQ ui를 사용 할 수 있음

 

그 외에 내부에서 rabbitmqctl을 통해 기본적인 조작이 가능

cli 환경에서 유저에게 permission을 열어주려면 아래와 같은 형식으로 가능

rabbitmqctl set_permissions -p [vhost path ex) / ] [user name ex) tester ] [conf ex) ".*" ] [write ex) ".*" ] [read ex) ".*" ]

 

curl를 통해 API를 요청할 경우 필요한 정보는 아래와 같음

curl -i -H "context-type:application/json" http://[user]:[password]@[url ex) rabbitmq-0.rabbitmq.svc.cluster.local]:15672/api/users/

결과는 users list가 반환

이 외에도 queues, exchanges, connections 등 넣어서 사용 가능

 

UI 상에서 Exchange와 Queue를 만드는 중 생성된 Exchange의 정보를 출력하는 API의 문제가 있는 것으로 보임

Management API returned status code 500 -경고 창 발생

Queue에서는 상세보기가 가능하여 Binding이 가능

curl -i -H "context-type:application/json" http://[user]:[password]@[url ex) rabbitmq-0.rabbitmq.svc.cluster.local]:15672/api/bindings/로 확인한 결과 연결 된 것으로 보임

 

From exchange : test.exchange

Routing key : test.routing.#

위 정보로 연결

 

이번엔 UI를 통해 설정하였지만 참고한 페이지에서는 code 레벨에서 설정해주는 것을 추천

 

아래 코드를 통해 테스트

send.py

 
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 
                                                               5672, 
                                                               '/', 
                                                               pika.PlainCredentials('guest', 
                                                                                     'guest'))
channel = connection.channel()
channel.queue_declare(queue='test.queue', durable=True)
channel.basic_publish(exchange='test.exchange',
                      routing_key='test.routing.#',
                      body='Hello World!')
connection.close()

receive.py

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 
                                                               5672, 
                                                               '/', 
                                                               pika.PlainCredentials('guest', 
                                                                                     'guest'))
channel = connection.channel()
channel.queue_declare(queue='test.queue', durable=True)
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
channel.basic_consume(queue='test.queue',
                      on_message_callback=callback
                      )
channel.start_consuming()

receive는 UI에서 보낸 메세지나 send.py를 통해 전송된 message를 수신

 

rabbitmqctl을 통해 몇 회의 message queue가 전송되었는지 확인 가능

rabbitmqctl list_queue

 

Topic을 통한 Message 전송

Topic은 Exchange에서 Type을 다르게 가진 상태로 생성해야 동작 가능

Queue는 해당 사항 없음

 

일정 문자를 Topic으로 지정하여 해당하는 문자를 지닌 메세지만 받도록 함

 

code는 아래와 같음

receive_topic.py

 
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 
                                                               5672, 
                                                               '/', 
                                                               pika.PlainCredentials('guest', 
                                                                                     'guest')))
channel = connection.channel()
channel.exchange_declare(exchange='topic.exchange', exchange_type='topic', durable=True)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic.exchange',
                       queue=queue_name,
                       routing_key=binding_key)
def callback(ch, method, properties, body):
    print(body)
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback)
channel.start_consuming()

emit_log.py

 
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 
                                                               5672, 
                                                               '/', 
                                                               pika.PlainCredentials('guest', 
                                                                                     'guest')))
channel = connection.channel()
channel.exchange_declare(exchange='topic.exchange',exchange_type='topic', durable=True)
routing_key=sys.argv[1]
message = ' '.join(sys.argv[2:]) or 'hello worlds'
channel.basic_publish(exchange='topic.exchange',
                        routing_key=routing_key,
                        body=message)
connection.close()

python receive_topic.py "notification.*"

python emit_log.py "notification.info" "[message]"

보낼 때 notification.info 에서 info는 *로 설정하였기 때문에 어떤 값이 오더라도 상관없으나, 앞에 notification 부분이 달라지면 메세지가 전달되지 않음

이때 ‘.'으로만 단어를 나눌 수 있으며 *은 한 단어만 #은 그 뒤에 오는 '.'으로 나눠진 몇 개의 단어가 오더라도 대체할 수 있음

notification.info 는 notification.*에서 받을 수 있지만 notification.info.test 는 받지 못함

notification.#은 두 개 모두 전송 가능

'Infra' 카테고리의 다른 글

Squid proxy 폐쇄망 연결  (0) 2024.05.23
sFlow Dashboard  (0) 2024.05.23
OpenVSwitch sFlow  (0) 2024.05.23
Fluentd 설정 방법  (0) 2024.05.23
Rabbit MQ - Fluentd - Opensearch  (0) 2024.05.23