The Cerebras Python library provides convenient access to the Cerebras REST API from any Python 3.8+ application. The library includes type definitions for all request params and response fields, and offers both synchronous and asynchronous clients powered by httpx.
It is generated with Stainless.
At Cerebras, we've developed the world's largest and fastest AI processor, the Wafer-Scale Engine-3 (WSE-3). The Cerebras CS-3 system, powered by the WSE-3, represents a new class of AI supercomputer that sets the standard for generative AI training and inference with unparalleled performance and scalability.
With Cerebras as your inference provider, you can:
- Achieve unprecedented speed for AI inference workloads
- Build commercially with high throughput
- Effortlessly scale your AI workloads with our seamless clustering technology
Our CS-3 systems can be quickly and easily clustered to create the largest AI supercomputers in the world, making it simple to place and run the largest models. Leading corporations, research institutions, and governments are already using Cerebras solutions to develop proprietary models and train popular open-source models.
Want to experience the power of Cerebras? Check out our website for more resources and explore options for accessing our technology through the Cerebras Cloud or on-premise deployments!
The REST API documentation can be found on inference-docs.cerebras.ai. The full API of this library can be found in api.md.
pip install cerebras_cloud_sdk
Get an API Key from cloud.cerebras.ai and add it to your environment variables:
export CEREBRAS_API_KEY="your-api-key-here"
import os
from cerebras.cloud.sdk import Cerebras

client = Cerebras(
    api_key=os.environ.get("CEREBRAS_API_KEY"),  # This is the default and can be omitted
)

chat_completion = client.chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Why is fast inference important?",
        }
    ],
    model="llama3.1-8b",
)

print(chat_completion)
import os
from cerebras.cloud.sdk import Cerebras

client = Cerebras(
    api_key=os.environ.get("CEREBRAS_API_KEY"),  # This is the default and can be omitted
)

completion = client.completions.create(
    prompt="It was a dark and stormy ",
    max_tokens=100,
    model="llama3.1-8b",
)

print(completion)
While you can provide an api_key keyword argument, we recommend using python-dotenv to add CEREBRAS_API_KEY="My API Key" to your .env file so that your API key is not stored in source control.
Simply import AsyncCerebras instead of Cerebras and use await with each API call:
import os
import asyncio
from cerebras.cloud.sdk import AsyncCerebras

client = AsyncCerebras(
    api_key=os.environ.get("CEREBRAS_API_KEY"),  # This is the default and can be omitted
)


async def main() -> None:
    chat_completion = await client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": "Why is fast inference important?",
            }
        ],
        model="llama3.1-8b",
    )
    print(chat_completion)


asyncio.run(main())
Functionality between the synchronous and asynchronous clients is otherwise identical.
We provide support for streaming responses using Server-Sent Events (SSE).
Note that when streaming, usage and time_info are only included in the final chunk.
import os
from cerebras.cloud.sdk import Cerebras

client = Cerebras(
    # This is the default and can be omitted
    api_key=os.environ.get("CEREBRAS_API_KEY"),
)

stream = client.chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Why is fast inference important?",
        }
    ],
    model="llama3.1-8b",
    stream=True,
)

for chunk in stream:
    print(chunk.choices[0].delta.content or "", end="")
The async client uses the exact same interface.
import os
import asyncio
from cerebras.cloud.sdk import AsyncCerebras

client = AsyncCerebras(
    # This is the default and can be omitted
    api_key=os.environ.get("CEREBRAS_API_KEY"),
)


async def main() -> None:
    stream = await client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": "Why is fast inference important?",
            }
        ],
        model="llama3.1-8b",
        stream=True,
    )
    async for chunk in stream:
        print(chunk.choices[0].delta.content or "", end="")


asyncio.run(main())
import os
from cerebras.cloud.sdk import Cerebras

client = Cerebras(
    # This is the default and can be omitted
    api_key=os.environ.get("CEREBRAS_API_KEY"),
)

stream = client.completions.create(
    prompt="It was a dark and stormy ",
    max_tokens=100,
    model="llama3.1-8b",
    stream=True,
)

for chunk in stream:
    print(chunk.choices[0].text or "", end="")
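Because usage and time_info only arrive with the final chunk, a consumer that wants both the full text and the metadata has to keep reading until the stream ends. The sketch below illustrates that pattern with stand-in dataclasses rather than the SDK's real chunk types; the Delta, Choice, and Chunk classes, the collect_stream helper, and the keys inside the usage and time_info dictionaries are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class Delta:
    content: Optional[str] = None


@dataclass
class Choice:
    delta: Delta


@dataclass
class Chunk:
    # Hypothetical stand-in for a streamed chat chunk; only the fields
    # used below are modelled, and their shapes are assumptions.
    choices: List[Choice]
    usage: Optional[dict] = None
    time_info: Optional[dict] = None


def collect_stream(stream: Iterable[Chunk]) -> Tuple[str, Optional[dict], Optional[dict]]:
    """Join the streamed text and keep the last usage/time_info that was set."""
    parts: List[str] = []
    usage = None
    time_info = None
    for chunk in stream:
        if chunk.choices:
            parts.append(chunk.choices[0].delta.content or "")
        if chunk.usage is not None:
            usage = chunk.usage
        if chunk.time_info is not None:
            time_info = chunk.time_info
    return "".join(parts), usage, time_info


# A fake stream in which only the last chunk carries the metadata.
fake_stream = [
    Chunk(choices=[Choice(Delta("Fast "))]),
    Chunk(choices=[Choice(Delta("inference"))]),
    Chunk(
        choices=[Choice(Delta(None))],
        usage={"total_tokens": 12},
        time_info={"total_time": 0.05},
    ),
]

text, usage, time_info = collect_stream(fake_stream)
print(text)
print(usage, time_info)
```

With the real client, the same loop body could be applied to the object returned by a create call with stream=True, provided the chunk attributes match the ones assumed here.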
Nested request parameters are TypedDicts. Responses are Pydantic models which also provide helper methods for things like:
- Serializing back into JSON, model.to_json()
- Converting to a dictionary, model.to_dict()
Typed requests and responses provide autocomplete and documentation within your editor. If you would like to see type errors in VS Code to help catch bugs earlier, set python.analysis.typeCheckingMode to basic.
When the library is unable to connect to the API (for example, due to network connection problems or a timeout), a subclass of cerebras.cloud.sdk.APIConnectionError is raised.
When the API returns a non-success status code (that is, a 4xx or 5xx response), a subclass of cerebras.cloud.sdk.APIStatusError is raised, containing status_code and response properties.
All errors inherit from cerebras.cloud.sdk.APIError.
import cerebras.cloud.sdk
from cerebras.cloud.sdk import Cerebras

client = Cerebras()

try:
    client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": "This should cause an error!",
            }
        ],
        model="some-model-that-doesnt-exist",
    )
except cerebras.cloud.sdk.APIConnectionError as e:
    print("The server could not be reached")
    print(e.__cause__)  # an underlying Exception, likely raised within httpx.
except cerebras.cloud.sdk.RateLimitError as e:
    print("A 429 status code was received; we should back off a bit.")
except cerebras.cloud.sdk.APIStatusError as e:
    print("Another non-200-range status code was received")
    print(e.status_code)
    print(e.response)
Error codes are as follows:
Status Code | Error Type |
---|---|
400 | BadRequestError |
401 | AuthenticationError |
403 | PermissionDeniedError |
404 | NotFoundError |
422 | UnprocessableEntityError |
429 | RateLimitError |
>=500 | InternalServerError |
N/A | APIConnectionError |
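The table can be read as a lookup from status code to error class name. The sketch below encodes it as a plain function, which can be handy for logging or tests. The error_name_for_status helper is hypothetical and not part of the library, and falling back to the base name APIStatusError for codes the table does not list is an assumption.

```python
from typing import Optional

# Exact status codes taken from the table above.
_EXACT = {
    400: "BadRequestError",
    401: "AuthenticationError",
    403: "PermissionDeniedError",
    404: "NotFoundError",
    422: "UnprocessableEntityError",
    429: "RateLimitError",
}


def error_name_for_status(status_code: Optional[int]) -> str:
    """Return the error class name the table associates with a status code.

    None stands for "no HTTP response at all" (the N/A row).
    """
    if status_code is None:
        return "APIConnectionError"
    if status_code in _EXACT:
        return _EXACT[status_code]
    if status_code >= 500:
        return "InternalServerError"
    # Assumption: codes not listed in the table are reported by the base name.
    return "APIStatusError"


for code in (404, 429, 503, None):
    print(code, "->", error_name_for_status(code))
```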
Certain errors are automatically retried 2 times by default, with a short exponential backoff. Connection errors (for example, due to a network connectivity problem), 408 Request Timeout, 409 Conflict, 429 Rate Limit, and >=500 Internal errors are all retried by default.
You can use the max_retries
option to configure or disable retry settings:
from cerebras.cloud.sdk import Cerebras

# Configure the default for all requests:
client = Cerebras(
    # default is 2
    max_retries=0,
)

# Or, configure per-request:
client.with_options(max_retries=5).chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Why is fast inference important?",
        }
    ],
    model="llama3.1-8b",
)
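To make the retry policy described above concrete, here is a minimal sketch of a retry loop with exponential backoff. It is an illustration of the general technique, not the library's actual implementation: the should_retry, backoff_delay, and call_with_retries helpers are hypothetical, and the base delay of 0.5 seconds and the 8 second cap are assumed values that the text above does not specify.

```python
import time
from typing import Callable, Optional

# Status codes the text above lists as retried by default (besides >=500).
RETRYABLE_STATUS = {408, 409, 429}


def should_retry(status_code: Optional[int]) -> bool:
    """None stands for a connection error, where no HTTP status was received."""
    if status_code is None:
        return True
    return status_code in RETRYABLE_STATUS or status_code >= 500


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 8.0) -> float:
    """Delay before retry number `attempt` (0-based): base * 2**attempt, capped.

    The base and cap values are assumptions chosen for illustration.
    """
    return min(cap, base * (2 ** attempt))


def call_with_retries(
    send: Callable[[], Optional[int]],
    max_retries: int = 2,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[int]:
    """Call `send` until it returns a non-retryable status or retries run out."""
    attempt = 0
    while True:
        status = send()
        if not should_retry(status) or attempt >= max_retries:
            return status
        sleep(backoff_delay(attempt))
        attempt += 1


# Demo with a fake transport: two retryable failures, then success.
responses = iter([429, 500, 200])
recorded_sleeps = []
final_status = call_with_retries(lambda: next(responses), sleep=recorded_sleeps.append)
print(final_status, recorded_sleeps)
```

Passing the sleep function as a parameter keeps the demo instantaneous; a production retry loop would typically also add random jitter to the delay.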
By default, requests time out after 1 minute. You can configure this with a timeout option, which accepts a float or an httpx.Timeout object:
from cerebras.cloud.sdk import Cerebras
import httpx

# Configure the default for all requests:
client = Cerebras(
    # 20 seconds (default is 1 minute)
    timeout=20.0,
)

# More granular control:
client = Cerebras(
    timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0),
)

# Override per-request:
client.with_options(timeout=5.0).chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Why is fast inference important?",
        }
    ],
    model="llama3.1-8b",
)
On timeout, an APITimeoutError is raised.
Note that requests that time out are retried twice by default.
We use the standard library logging module.
You can enable logging by setting the environment variable CEREBRAS_LOG to debug.
$ export CEREBRAS_LOG=debug
In an API response, a field may be explicitly null, or missing entirely; in either case, its value is None in this library. You can differentiate the two cases with .model_fields_set:
if response.my_field is None:
    if 'my_field' not in response.model_fields_set:
        print('Got json like {}, without a "my_field" key present at all.')
    else:
        print('Got json like {"my_field": null}.')
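The distinction above is easier to see at the JSON level. The following standard-library sketch classifies a field in a raw JSON payload as missing, null, or set; it mirrors the idea behind the check above but does not use the library or Pydantic, and the describe_field helper is hypothetical.

```python
import json


def describe_field(raw_json: str, field: str) -> str:
    """Classify `field` in a JSON object as 'missing', 'null', or 'set'."""
    payload = json.loads(raw_json)
    if field not in payload:
        return "missing"  # the key is absent from the payload
    if payload[field] is None:
        return "null"  # the key is present with an explicit null
    return "set"


print(describe_field("{}", "my_field"))
print(describe_field('{"my_field": null}', "my_field"))
print(describe_field('{"my_field": 1}', "my_field"))
```

Both of the first two payloads would surface as None on a parsed response object, which is why the set of explicitly provided fields is needed to tell them apart.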
The "raw" Response object can be accessed by prefixing .with_raw_response. to any HTTP method call, e.g.,
from cerebras.cloud.sdk import Cerebras

client = Cerebras()
response = client.chat.completions.with_raw_response.create(
    messages=[{
        "role": "user",
        "content": "Why is fast inference important?",
    }],
    model="llama3.1-8b",
)
print(response.headers.get('X-My-Header'))

completion = response.parse()  # get the object that `chat.completions.create()` would have returned
print(completion)
These methods return an APIResponse object.
The async client returns an AsyncAPIResponse with the same structure, the only difference being awaitable methods for reading the response content.
This library is typed for convenient access to the documented API.
If you need to access undocumented endpoints, params, or response properties, the library can still be used.
To make requests to undocumented endpoints, you can use client.get, client.post, and other HTTP verbs. Options on the client (such as retries) will be respected when making this request.
import httpx

response = client.post(
    "/foo",
    cast_to=httpx.Response,
    body={"my_param": True},
)

print(response.headers.get("x-foo"))
If you want to explicitly send an extra param, you can do so with the extra_query, extra_body, and extra_headers request options.
To access undocumented response properties, you can access the extra fields like response.unknown_prop. You can also get all the extra fields on the Pydantic model as a dict with response.model_extra.
You can directly override the httpx client to customize it for your use case, including:
- Support for proxies
- Custom transports
- Additional advanced functionality
import httpx  # needed for httpx.HTTPTransport below
from cerebras.cloud.sdk import Cerebras, DefaultHttpxClient

client = Cerebras(
    # Or use the `CEREBRAS_BASE_URL` env var
    base_url="http://my.test.server.example.com:8083",
    http_client=DefaultHttpxClient(
        proxies="http://my.test.proxy.example.com",
        transport=httpx.HTTPTransport(local_address="0.0.0.0"),
    ),
)
You can also customize the client on a per-request basis by using with_options():
client.with_options(http_client=DefaultHttpxClient(...))
By default the library closes underlying HTTP connections whenever the client is garbage collected. You can manually close the client using the .close()
method if desired, or with a context manager that closes when exiting.
This package generally follows SemVer conventions, though certain backwards-incompatible changes may be released as minor versions:
- Changes that only affect static types, without breaking runtime behavior.
- Changes to library internals which are technically public but not intended or documented for external use. (Please open a GitHub issue to let us know if you are relying on such internals).
- Changes that we do not expect to impact the vast majority of users in practice.
We take backwards-compatibility seriously and work hard to ensure you can rely on a smooth upgrade experience.
We are keen for your feedback; please open an issue with questions, bugs, or suggestions.
If you've upgraded to the latest version but aren't seeing the new features you were expecting, your Python environment is likely still using an older version.
You can determine the version that is being used at runtime with:
import cerebras.cloud.sdk
print(cerebras.cloud.sdk.__version__)
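As a complementary check, the installed distribution's version can be read from package metadata without importing the library, which helps when the import itself resolves to an unexpected environment. This is a sketch using the standard library's importlib.metadata; the installed_version helper is hypothetical, and it assumes the distribution name matches the name used in the pip install command above.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


# Assumption: the distribution is registered under the pip install name.
print(installed_version("cerebras_cloud_sdk"))
```

If this prints None or an older version than expected, the interpreter running the script is probably not the one into which the package was upgraded.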
Python 3.8 or higher.