-
Notifications
You must be signed in to change notification settings - Fork 0
Message Streams
Greg Svoboda edited this page Mar 4, 2026
·
1 revision
Message stream management is done through ServerClient. You will need a server API token.
import postmark
client = postmark.ServerClient("your-server-token")import asyncio
async def main():
result = await client.stream.list()
print(f"Total streams: {result.total}")
for stream in result.items:
print(f" [{stream.id}] {stream.name}")
print(f" Type: {stream.message_stream_type.value}")
print(f" Archived: {stream.archived_at is not None}")
asyncio.run(main())Filter by stream type or include archived streams:
from postmark.models.streams import MessageStreamType
result = await client.stream.list(
message_stream_type=MessageStreamType.BROADCASTS,
include_archived=False,
)async def main():
stream = await client.stream.get("my-broadcasts")
print(f"Name: {stream.name}")
print(f"Type: {stream.message_stream_type.value}")
print(f"Description: {stream.description}")import asyncio
from postmark.models.streams import MessageStreamType, UnsubscribeHandlingType
async def main():
stream = await client.stream.create(
id="my-broadcasts",
name="My Broadcast Stream",
message_stream_type=MessageStreamType.BROADCASTS,
description="Used for newsletters and announcements",
unsubscribe_handling_type=UnsubscribeHandlingType.POSTMARK,
)
print(f"Created: {stream.id} ({stream.message_stream_type.value})")
asyncio.run(main())MessageStreamType values: TRANSACTIONAL, BROADCASTS
UnsubscribeHandlingType values: NONE, POSTMARK, CUSTOM
async def main():
stream = await client.stream.edit(
"my-broadcasts",
name="Updated Broadcast Stream",
description="Updated description",
)
print(f"Updated: {stream.name}")Archiving a stream prevents it from sending. The stream and its data will be purged after the expected purge date.
import asyncio
async def main():
result = await client.stream.archive("my-broadcasts")
print(f"Archived: {result.id}")
print(f"Expected purge date: {result.expected_purge_date}")
asyncio.run(main())async def main():
stream = await client.stream.unarchive("my-broadcasts")
print(f"Unarchived: {stream.id}")