import asyncio
import json
import websockets
import base64
class RimeClient:
def __init__(self, speaker, api_key):
self.url = f"wss://users.rime.ai/ws2?speaker={speaker}&modelId=mistv2&audioFormat=mp3"
self.auth_headers = {
"Authorization": f"Bearer {api_key}"
}
self.audio_data = b''
async def send_messages(self, websocket, messages):
for message in messages:
await websocket.send(json.dumps(message))
async def handle_audio(self, websocket):
while True:
try:
audio = await websocket.recv()
except websockets.exceptions.ConnectionClosedOK:
break
message = json.loads(audio)
if message['type'] == 'chunk':
self.audio_data += base64.b64decode(message['data'])
if message['type'] == 'timestamps':
print("Rime model pronounced the words...\n")
for w, t in zip(message['word_timestamps']['words'], message['word_timestamps']['start']):
print(f"'{w}' at time {t}")
async def run(self, messages):
async with websockets.connect(self.url, additional_headers=self.auth_headers) as websocket:
await asyncio.gather(
self.send_messages(websocket, messages),
self.handle_audio(websocket),
)
def save_audio(self, file_path):
with open(file_path, 'wb') as f:
f.write(self.audio_data)
print(f"\n Audio saved at {file_path}")
message = [
{"text": "This "},
{"text": "is "},
{"text": "a "},
{"text": "test "},
{"operation":"clear"},
{"text": "This "},
{"text": "is "},
{"text": "an "},
{"text": "incomplete "},
{"text": "sentence "},
{"operation": "eos"},
]
client = RimeClient("cove", api_key="xxx")
asyncio.run(client.run(message))
client.save_audio("output.mp3")