S3M - Streaming: RabbitMQ
RabbitMQ is a full-featured message broker with queues and bindings.
Required Permission: data-streaming
Note
The Python and Go examples on this page use gRPC client packages that are not yet publicly available. ORNL-internal users may request access by contacting the S3M team: olcf-s3m@email.ornl.gov. The REST API (via curl or otherwise) is available to all users.
Cluster Lifecycle
Lifetime: 7-day countdown from the time of creation
Extension: Countdown can be reset as many times as needed using the extension endpoint
Expiration: Clusters are automatically deleted when they expire, and cannot be restored
Provision Cluster
Create a new RabbitMQ cluster. The cluster will not be immediately accessible—monitor its status with the List Clusters or Get Cluster endpoints.
POST /olcf/v1alpha/streaming/rabbitmq/provision_cluster
# Provision a RabbitMQ cluster named "mycluster" by POSTing a JSON body.
# `-H @.env` tells curl to read request header lines from the local file `.env`
# (presumably where the Authorization header is kept — confirm).
curl -X POST -H @.env \
-H "Content-Type: application/json" \
https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/provision_cluster \
-d '{
"kind": "general",
"name": "mycluster",
"resourceSettings": {
"cpus": 2,
"ram-gbs": 4,
"nodes": 2
}
}'
# Provision a RabbitMQ cluster named "mycluster" through the gRPC client.
# NOTE(review): `factory` and `RabbitMqStreamingStub` are neither defined nor
# imported in this snippet — presumably they come from the S3M client package
# setup; confirm.
# `await` is only valid inside an `async def` (or an asyncio-aware REPL).
from s3m_apis_betterproto.streaming.v1alpha import ProvisionRabbitMqClusterRequest
client = factory.create_client(RabbitMqStreamingStub)
result = await client.provision_rabbit_mq_cluster(ProvisionRabbitMqClusterRequest(
kind="general",
name="mycluster",
resource_settings={"cpus": 2, "ram-gbs": 4, "nodes": 2}
))
// Provision a RabbitMQ cluster named "mycluster" through the gRPC client.
// NOTE(review): `conn` is not defined in this snippet — presumably an
// already-established gRPC client connection; confirm.
import (
"context"
streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)
client := streamingpb.NewRabbitMQStreamingClient(conn)
// err is returned alongside result; error handling is not shown here.
result, err := client.ProvisionRabbitMQCluster(context.Background(), &streamingpb.ProvisionRabbitMQClusterRequest{
Kind: "general",
Name: "mycluster",
ResourceSettings: map[string]int32{"cpus": 2, "ram-gbs": 4, "nodes": 2},
})
# Provision a RabbitMQ cluster through the S3M streaming REST API.
import os

import requests

S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
# Upper bound (seconds) on waiting for the server; without a timeout,
# requests can block indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

S3M_TOKEN = os.getenv("S3M_TOKEN")
if not S3M_TOKEN:
    # os.getenv returns None when the variable is unset; fail with a clear
    # message instead of sending a request without credentials.
    raise ValueError("S3M_TOKEN environment variable is not set")

resource = "rabbitmq"

# The token is sent verbatim as the Authorization header value.
# NOTE(review): confirm whether S3M_TOKEN must include a scheme prefix.
headers = {
    "Authorization": S3M_TOKEN,
}

payload = {
    "kind": "general",
    "name": "mycluster",
    "resourceSettings": {
        "cpus": 2,
        "ram-gbs": 4,
        "nodes": 2,
    },
}

response = requests.post(
    f"{S3M_BASE_PATH}/{resource}/provision_cluster",
    headers=headers,
    json=payload,
    timeout=REQUEST_TIMEOUT_SECONDS,
)

if response.ok:
    rabbitmq_response = response.json()
    print(rabbitmq_response)
else:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: {response.status_code} {response.text}"
    )
Response:
{
"username": "stf040-api",
"password": "bMjHlJKjHQGbWrnO",
"amqpsUrl": "amqps://stf040-api:bMjHlJKjHQGbWrnO@rmq-stf040-api-mycluster.io.s3m-streams.apps.olivine.ccs.ornl.gov:443",
"mgmtUrl": "https://rmq-stf040-api-mycluster.mgmt.s3m-streams.apps.olivine.ccs.ornl.gov"
}
List Clusters
GET /olcf/v1alpha/streaming/rabbitmq/list_clusters
# List RabbitMQ clusters.
# `-H @.env` tells curl to read request header lines from the local file `.env`
# (presumably where the Authorization header is kept — confirm).
curl -H @.env \
https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/list_clusters
# List RabbitMQ clusters through the gRPC client.
# NOTE(review): `factory` and `RabbitMqStreamingStub` are neither defined nor
# imported in this snippet — presumably they come from the S3M client package
# setup; confirm.
# `await` is only valid inside an `async def` (or an asyncio-aware REPL).
from s3m_apis_betterproto.streaming.v1alpha import ListRabbitMqClustersRequest
client = factory.create_client(RabbitMqStreamingStub)
clusters = await client.list_rabbit_mq_clusters(ListRabbitMqClustersRequest())
// List RabbitMQ clusters through the gRPC client.
// NOTE(review): `conn` is not defined in this snippet — presumably an
// already-established gRPC client connection; confirm.
import (
"context"
streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)
client := streamingpb.NewRabbitMQStreamingClient(conn)
// err is returned alongside clusters; error handling is not shown here.
clusters, err := client.ListRabbitMQClusters(context.Background(), &streamingpb.ListRabbitMQClustersRequest{})
# List RabbitMQ clusters through the S3M streaming REST API.
import os

import requests

S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
# Upper bound (seconds) on waiting for the server; without a timeout,
# requests can block indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

S3M_TOKEN = os.getenv("S3M_TOKEN")
if not S3M_TOKEN:
    # os.getenv returns None when the variable is unset; fail with a clear
    # message instead of sending a request without credentials.
    raise ValueError("S3M_TOKEN environment variable is not set")

resource = "rabbitmq"

# The token is sent verbatim as the Authorization header value.
# NOTE(review): confirm whether S3M_TOKEN must include a scheme prefix.
headers = {
    "Authorization": S3M_TOKEN,
}

response = requests.get(
    f"{S3M_BASE_PATH}/{resource}/list_clusters",
    headers=headers,
    timeout=REQUEST_TIMEOUT_SECONDS,
)

if response.ok:
    rabbitmq_response = response.json()
    print(rabbitmq_response)
else:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: {response.status_code} {response.text}"
    )
Get Cluster
GET /olcf/v1alpha/streaming/rabbitmq/cluster/{name}
# Fetch the cluster named "mycluster".
# `-H @.env` tells curl to read request header lines from the local file `.env`
# (presumably where the Authorization header is kept — confirm).
curl -H @.env \
https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/cluster/mycluster
# Fetch the cluster named "mycluster" through the gRPC client.
# NOTE(review): `factory` and `RabbitMqStreamingStub` are neither defined nor
# imported in this snippet — presumably they come from the S3M client package
# setup; confirm.
# `await` is only valid inside an `async def` (or an asyncio-aware REPL).
from s3m_apis_betterproto.streaming.v1alpha import GetRabbitMqClusterRequest
client = factory.create_client(RabbitMqStreamingStub)
cluster = await client.get_rabbit_mq_cluster(GetRabbitMqClusterRequest(name="mycluster"))
// Fetch the cluster named "mycluster" through the gRPC client.
// NOTE(review): `conn` is not defined in this snippet — presumably an
// already-established gRPC client connection; confirm.
import (
"context"
streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)
client := streamingpb.NewRabbitMQStreamingClient(conn)
// err is returned alongside cluster; error handling is not shown here.
cluster, err := client.GetRabbitMQCluster(context.Background(), &streamingpb.GetRabbitMQClusterRequest{Name: "mycluster"})
# Fetch a single RabbitMQ cluster through the S3M streaming REST API.
import os

import requests

S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
# Upper bound (seconds) on waiting for the server; without a timeout,
# requests can block indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

S3M_TOKEN = os.getenv("S3M_TOKEN")
if not S3M_TOKEN:
    # os.getenv returns None when the variable is unset; fail with a clear
    # message instead of sending a request without credentials.
    raise ValueError("S3M_TOKEN environment variable is not set")

resource = "rabbitmq"
cluster_name = "mycluster"

# The token is sent verbatim as the Authorization header value.
# NOTE(review): confirm whether S3M_TOKEN must include a scheme prefix.
headers = {
    "Authorization": S3M_TOKEN,
}

response = requests.get(
    f"{S3M_BASE_PATH}/{resource}/cluster/{cluster_name}",
    headers=headers,
    timeout=REQUEST_TIMEOUT_SECONDS,
)

if response.ok:
    rabbitmq_response = response.json()
    print(rabbitmq_response)
else:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: {response.status_code} {response.text}"
    )
Extend Cluster
Reset the cluster’s lifetime to 7 days from now.
POST /olcf/v1alpha/streaming/rabbitmq/extend/{name}
# Reset the lifetime countdown of the cluster named "mycluster".
# `-H @.env` tells curl to read request header lines from the local file `.env`
# (presumably where the Authorization header is kept — confirm).
curl -X POST -H @.env \
https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/extend/mycluster
# Reset the lifetime countdown of the cluster named "mycluster" through the
# gRPC client.
# NOTE(review): `factory` and `RabbitMqStreamingStub` are neither defined nor
# imported in this snippet — presumably they come from the S3M client package
# setup; confirm.
# `await` is only valid inside an `async def` (or an asyncio-aware REPL).
from s3m_apis_betterproto.streaming.v1alpha import ExtendRabbitMqClusterLifeRequest
client = factory.create_client(RabbitMqStreamingStub)
result = await client.extend_rabbit_mq_cluster_life(ExtendRabbitMqClusterLifeRequest(name="mycluster"))
// Reset the lifetime countdown of the cluster named "mycluster" through the
// gRPC client.
// NOTE(review): `conn` is not defined in this snippet — presumably an
// already-established gRPC client connection; confirm.
import (
"context"
streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)
client := streamingpb.NewRabbitMQStreamingClient(conn)
// err is returned alongside result; error handling is not shown here.
result, err := client.ExtendRabbitMQClusterLife(context.Background(), &streamingpb.ExtendRabbitMQClusterLifeRequest{Name: "mycluster"})
# Reset a RabbitMQ cluster's lifetime through the S3M streaming REST API.
import os

import requests

S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
# Upper bound (seconds) on waiting for the server; without a timeout,
# requests can block indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

S3M_TOKEN = os.getenv("S3M_TOKEN")
if not S3M_TOKEN:
    # os.getenv returns None when the variable is unset; fail with a clear
    # message instead of sending a request without credentials.
    raise ValueError("S3M_TOKEN environment variable is not set")

resource = "rabbitmq"
cluster_name = "mycluster"

# The token is sent verbatim as the Authorization header value.
# NOTE(review): confirm whether S3M_TOKEN must include a scheme prefix.
headers = {
    "Authorization": S3M_TOKEN,
}

response = requests.post(
    f"{S3M_BASE_PATH}/{resource}/extend/{cluster_name}",
    headers=headers,
    timeout=REQUEST_TIMEOUT_SECONDS,
)

if response.ok:
    rabbitmq_response = response.json()
    print(rabbitmq_response)
else:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: {response.status_code} {response.text}"
    )
Delete Cluster
DELETE /olcf/v1alpha/streaming/rabbitmq/cluster/{name}
# Delete the cluster named "mycluster".
# `-H @.env` tells curl to read request header lines from the local file `.env`
# (presumably where the Authorization header is kept — confirm).
curl -X DELETE -H @.env \
https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/cluster/mycluster
# Delete the cluster named "mycluster" through the gRPC client; the call's
# return value is not used.
# NOTE(review): `factory` and `RabbitMqStreamingStub` are neither defined nor
# imported in this snippet — presumably they come from the S3M client package
# setup; confirm.
# `await` is only valid inside an `async def` (or an asyncio-aware REPL).
from s3m_apis_betterproto.streaming.v1alpha import DeleteRabbitMqClusterRequest
client = factory.create_client(RabbitMqStreamingStub)
await client.delete_rabbit_mq_cluster(DeleteRabbitMqClusterRequest(name="mycluster"))
// Delete the cluster named "mycluster" through the gRPC client.
// NOTE(review): `conn` is not defined in this snippet — presumably an
// already-established gRPC client connection; confirm.
import (
"context"
streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)
client := streamingpb.NewRabbitMQStreamingClient(conn)
// The response value is discarded; only err is kept. Error handling is not shown here.
_, err := client.DeleteRabbitMQCluster(context.Background(), &streamingpb.DeleteRabbitMQClusterRequest{Name: "mycluster"})
# Delete a RabbitMQ cluster through the S3M streaming REST API.
import os

import requests

S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
# Upper bound (seconds) on waiting for the server; without a timeout,
# requests can block indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

S3M_TOKEN = os.getenv("S3M_TOKEN")
if not S3M_TOKEN:
    # os.getenv returns None when the variable is unset; fail with a clear
    # message instead of sending a request without credentials.
    raise ValueError("S3M_TOKEN environment variable is not set")

resource = "rabbitmq"
cluster_name = "mycluster"

# The token is sent verbatim as the Authorization header value.
# NOTE(review): confirm whether S3M_TOKEN must include a scheme prefix.
headers = {
    "Authorization": S3M_TOKEN,
}

response = requests.delete(
    f"{S3M_BASE_PATH}/{resource}/cluster/{cluster_name}",
    headers=headers,
    timeout=REQUEST_TIMEOUT_SECONDS,
)

if response.ok:
    # A successful delete may come back with an empty body, which
    # response.json() cannot parse; only decode when there is content.
    if response.content:
        rabbitmq_response = response.json()
        print(rabbitmq_response)
else:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: {response.status_code} {response.text}"
    )