# EqualifyEverything / integration-axe
#
# test_rabbit_connection.py
import unittest
import pika
from utils.auth import rabbit


class TestRabbitConnection(unittest.TestCase):
    """Check that ``rabbit`` returns a channel and a connection of the expected pika types."""

    def test_rabbit_connection(self):
        """Call ``rabbit('test_queue')`` and verify the types of what it returns.

        The test fails if ``rabbit`` raises, or if either returned object is
        not an instance of the expected pika class. A connection that was
        obtained and is still open is closed before the test finishes.
        """
        # Bound up front so the ``finally`` clause can always reference it,
        # even when ``rabbit`` raises before the tuple is unpacked. Without
        # this, an UnboundLocalError would replace the real failure.
        connection = None
        try:
            try:
                channel, connection = rabbit('test_queue')
            except Exception as e:
                self.fail(f"Failed to establish a connection to RabbitMQ: {e}")

            # The type checks sit outside the handler above so that a type
            # mismatch is reported with its own assertion message instead of
            # being relabelled as a connection failure.
            # NOTE(review): pika's BlockingConnection.channel() returns a
            # BlockingChannel, which wraps rather than subclasses
            # pika.channel.Channel -- confirm what ``rabbit`` actually returns.
            self.assertIsInstance(channel, pika.channel.Channel)
            self.assertIsInstance(connection, pika.adapters.blocking_connection.BlockingConnection)
        finally:
            # Only close something that exposes ``is_closed`` and reports it
            # is still open; an unexpected object must not mask the
            # assertion failure with an AttributeError here.
            if connection is not None and not getattr(connection, "is_closed", True):
                connection.close()


# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()