S3M - Streaming: RabbitMQ

RabbitMQ is a full-featured message broker with queues and bindings.

Required Permission: data-streaming

Note

The Python and Go examples on this page use gRPC client packages that are not yet publicly available. ORNL-internal users may request access by contacting the S3M team: olcf-s3m@email.ornl.gov. The REST API (via curl or otherwise) is available to all users.

Cluster Lifecycle

  • Lifetime: 7-day countdown from the time of creation

  • Extension: The countdown can be reset as many times as needed using the Extend Cluster endpoint

  • Expiration: Clusters are automatically deleted when they expire, and cannot be restored

Provision Cluster

Create a new RabbitMQ cluster. The cluster will not be immediately accessible; monitor its status with the List Clusters or Get Cluster endpoints.

POST /olcf/v1alpha/streaming/rabbitmq/provision_cluster

# Provision a cluster: POST a JSON body; request headers are read from ./.env.
curl --request POST --header @.env \
    --header "Content-Type: application/json" \
    --data '{
      "kind": "general",
      "name": "mycluster",
      "resourceSettings": {
        "cpus": 2,
        "ram-gbs": 4,
        "nodes": 2
      }
    }' \
    https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/provision_cluster
from s3m_apis_betterproto.streaming.v1alpha import ProvisionRabbitMqClusterRequest

client = factory.create_client(RabbitMqStreamingStub)
result = await client.provision_rabbit_mq_cluster(ProvisionRabbitMqClusterRequest(
    kind="general",
    name="mycluster",
    resource_settings={"cpus": 2, "ram-gbs": 4, "nodes": 2}
))
import (
    "context"
    streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)

client := streamingpb.NewRabbitMQStreamingClient(conn)
result, err := client.ProvisionRabbitMQCluster(context.Background(), &streamingpb.ProvisionRabbitMQClusterRequest{
    Kind: "general",
    Name: "mycluster",
    ResourceSettings: map[string]int32{"cpus": 2, "ram-gbs": 4, "nodes": 2},
})
import os
import requests

# Provision a RabbitMQ cluster through the S3M REST API.
S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
S3M_TOKEN = os.getenv("S3M_TOKEN")
resource = "rabbitmq"

# Fail early with a clear message instead of sending a request that has no
# usable Authorization value.
if not S3M_TOKEN:
    raise ValueError("S3M_TOKEN environment variable is not set")

headers = {
    "Authorization": S3M_TOKEN,
}

payload = {
    "kind": "general",
    "name": "mycluster",
    "resourceSettings": {
        "cpus": 2,
        "ram-gbs": 4,
        "nodes": 2,
    },
}

response = requests.post(
    f"{S3M_BASE_PATH}/{resource}/provision_cluster",
    headers=headers,
    json=payload,
    timeout=30,  # requests does not time out by default; avoid hanging forever
)

if not response.ok:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: HTTP {response.status_code} {response.text}"
    )

rabbitmq_response = response.json()
print(rabbitmq_response)

Response:

{
  "username": "stf040-api",
  "password": "bMjHlJKjHQGbWrnO",
  "amqpsUrl": "amqps://stf040-api:bMjHlJKjHQGbWrnO@rmq-stf040-api-mycluster.io.s3m-streams.apps.olivine.ccs.ornl.gov:443",
  "mgmtUrl": "https://rmq-stf040-api-mycluster.mgmt.s3m-streams.apps.olivine.ccs.ornl.gov"
}

List Clusters

GET /olcf/v1alpha/streaming/rabbitmq/list_clusters

# List clusters with a plain GET; request headers are read from ./.env.
curl --header @.env \
    https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/list_clusters
from s3m_apis_betterproto.streaming.v1alpha import ListRabbitMqClustersRequest

client = factory.create_client(RabbitMqStreamingStub)
clusters = await client.list_rabbit_mq_clusters(ListRabbitMqClustersRequest())
import (
    "context"
    streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)

client := streamingpb.NewRabbitMQStreamingClient(conn)
clusters, err := client.ListRabbitMQClusters(context.Background(), &streamingpb.ListRabbitMQClustersRequest{})
import os
import requests

# List RabbitMQ clusters through the S3M REST API.
S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
S3M_TOKEN = os.getenv("S3M_TOKEN")
resource = "rabbitmq"

# Fail early with a clear message instead of sending a request that has no
# usable Authorization value.
if not S3M_TOKEN:
    raise ValueError("S3M_TOKEN environment variable is not set")

headers = {
    "Authorization": S3M_TOKEN,
}

response = requests.get(
    f"{S3M_BASE_PATH}/{resource}/list_clusters",
    headers=headers,
    timeout=30,  # requests does not time out by default; avoid hanging forever
)

if not response.ok:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: HTTP {response.status_code} {response.text}"
    )

rabbitmq_response = response.json()
print(rabbitmq_response)

Get Cluster

GET /olcf/v1alpha/streaming/rabbitmq/cluster/{name}

# Fetch the cluster named "mycluster"; request headers are read from ./.env.
curl --header @.env \
    https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/cluster/mycluster
from s3m_apis_betterproto.streaming.v1alpha import GetRabbitMqClusterRequest

client = factory.create_client(RabbitMqStreamingStub)
cluster = await client.get_rabbit_mq_cluster(GetRabbitMqClusterRequest(name="mycluster"))
import (
    "context"
    streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)

client := streamingpb.NewRabbitMQStreamingClient(conn)
cluster, err := client.GetRabbitMQCluster(context.Background(), &streamingpb.GetRabbitMQClusterRequest{Name: "mycluster"})
import os
import requests

# Fetch a single RabbitMQ cluster by name through the S3M REST API.
S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
S3M_TOKEN = os.getenv("S3M_TOKEN")
resource = "rabbitmq"
cluster_name = "mycluster"

# Fail early with a clear message instead of sending a request that has no
# usable Authorization value.
if not S3M_TOKEN:
    raise ValueError("S3M_TOKEN environment variable is not set")

headers = {
    "Authorization": S3M_TOKEN,
}

response = requests.get(
    f"{S3M_BASE_PATH}/{resource}/cluster/{cluster_name}",
    headers=headers,
    timeout=30,  # requests does not time out by default; avoid hanging forever
)

if not response.ok:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: HTTP {response.status_code} {response.text}"
    )

rabbitmq_response = response.json()
print(rabbitmq_response)

Extend Cluster

Reset the cluster’s lifetime to 7 days from now.

POST /olcf/v1alpha/streaming/rabbitmq/extend/{name}

# Extend "mycluster" with a body-less POST; request headers are read from ./.env.
curl --request POST --header @.env \
    https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/extend/mycluster
from s3m_apis_betterproto.streaming.v1alpha import ExtendRabbitMqClusterLifeRequest

client = factory.create_client(RabbitMqStreamingStub)
result = await client.extend_rabbit_mq_cluster_life(ExtendRabbitMqClusterLifeRequest(name="mycluster"))
import (
    "context"
    streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)

client := streamingpb.NewRabbitMQStreamingClient(conn)
result, err := client.ExtendRabbitMQClusterLife(context.Background(), &streamingpb.ExtendRabbitMQClusterLifeRequest{Name: "mycluster"})
import os
import requests

# Extend a RabbitMQ cluster's lifetime through the S3M REST API.
S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
S3M_TOKEN = os.getenv("S3M_TOKEN")
resource = "rabbitmq"
cluster_name = "mycluster"

# Fail early with a clear message instead of sending a request that has no
# usable Authorization value.
if not S3M_TOKEN:
    raise ValueError("S3M_TOKEN environment variable is not set")

headers = {
    "Authorization": S3M_TOKEN,
}

response = requests.post(
    f"{S3M_BASE_PATH}/{resource}/extend/{cluster_name}",
    headers=headers,
    timeout=30,  # requests does not time out by default; avoid hanging forever
)

if not response.ok:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: HTTP {response.status_code} {response.text}"
    )

rabbitmq_response = response.json()
print(rabbitmq_response)

Delete Cluster

DELETE /olcf/v1alpha/streaming/rabbitmq/cluster/{name}

# Delete the cluster named "mycluster"; request headers are read from ./.env.
curl --request DELETE --header @.env \
    https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming/rabbitmq/cluster/mycluster
from s3m_apis_betterproto.streaming.v1alpha import DeleteRabbitMqClusterRequest

client = factory.create_client(RabbitMqStreamingStub)
await client.delete_rabbit_mq_cluster(DeleteRabbitMqClusterRequest(name="mycluster"))
import (
    "context"
    streamingpb "s3m.olcf.ornl.gov/apis/streaming/v1alpha"
)

client := streamingpb.NewRabbitMQStreamingClient(conn)
_, err := client.DeleteRabbitMQCluster(context.Background(), &streamingpb.DeleteRabbitMQClusterRequest{Name: "mycluster"})
import os
import requests

# Delete a RabbitMQ cluster by name through the S3M REST API.
S3M_BASE_PATH = "https://s3m.olcf.ornl.gov/olcf/v1alpha/streaming"
S3M_TOKEN = os.getenv("S3M_TOKEN")
resource = "rabbitmq"
cluster_name = "mycluster"

# Fail early with a clear message instead of sending a request that has no
# usable Authorization value.
if not S3M_TOKEN:
    raise ValueError("S3M_TOKEN environment variable is not set")

headers = {
    "Authorization": S3M_TOKEN,
}

response = requests.delete(
    f"{S3M_BASE_PATH}/{resource}/cluster/{cluster_name}",
    headers=headers,
    timeout=30,  # requests does not time out by default; avoid hanging forever
)

if not response.ok:
    # Surface the status code and body so the failure can be diagnosed.
    raise ValueError(
        f"Request to S3M failed: HTTP {response.status_code} {response.text}"
    )

# NOTE(review): a successful DELETE may come back with an empty body, on which
# response.json() would raise; confirm against the API and keep the guard if so.
if response.content:
    rabbitmq_response = response.json()
    print(rabbitmq_response)