import asyncio
import json
import requests
import websockets
api_key = <api key>
client_id = <client id>
audio_file_path = <audio file path>
model = "generic"
def get_session_id():
url = "https://services.juvoly.nl/api/v2/rest/session"
headers = {
"X-Juvoly-Api-Key": api_key,
"X-Juvoly-Client-Id": client_id,
"Content-Type": "application/json"
}
response = requests.post(url, headers=headers, json={})
return response.json().get('sessionId', None)
async def read(websocket):
while True:
response = await websocket.recv()
response_data = json.loads(response)
if response_data.get("type") == "transcript":
print("Received:", response_data.get("utterance"))
async def write(websocket):
with open(audio_file_path, 'rb') as audio_stream:
while chunk := audio_stream.read(10000):
await websocket.send(chunk)
async def connect(session_id):
websocket_url = f"wss://services.juvoly.nl/api/v2/socket/{model}"
async with websockets.connect(websocket_url, subprotocols=["v2.juvoly", client_id, session_id]) as websocket:
ready_message = await websocket.recv()
if json.loads(ready_message).get("type") == "ready":
write_task = asyncio.create_task(write(websocket))
read_task = asyncio.create_task(read(websocket))
await asyncio.gather(write_task, read_task)
if __name__ == "__main__":
session_id = get_session_id()
asyncio.run(connect(session_id))