
Using LangGraph and MCP Servers to Create My Own Voice Assistant

Why?

I have an Alexa, but I don’t like it. Why? It fails to do anything more complicated than basic voice commands.

I end up using it for three things:

  • Get the current date or time
  • Get weather information for today
  • Turn on or off connected devices (e.g. TV, lights, robot vacuum)

which are the only things that I can use it for reliably. For anything else, I get a polite but unhelpful “I can’t help with that”.

Given the rise of LLM Agents and MCP servers, it’s become easier than ever to create personal assistants and chatbots. And I ask myself,

“Why stop at a chatbot? Why not take this one step further and create my own voice assistant?”

This is my attempt to do just that.

Goals

So I think, what exactly do I want my voice assistant to be able to do?

This is my list of initial goals:

1. Run on my local computer

I don’t want to pay for a subscription to use an LLM, and in fact, I don’t want to pay for anything.

Everything I build should just run on my local computer without having to worry about costs or how much free credit I have left at the end of each month.

2. Replicate Alexa functionality

Let’s take baby steps — first I simply want to replicate the functionality I already have with Alexa. This will be a good milestone to work towards, before I add more complex, extravagant features.

It should be able to:

  • Get the current date or time
  • Get weather information for today
  • Turn on or off connected devices (e.g. TV, lights, robot vacuum)

before we start building this out into a fully-fledged Tony Stark’s Jarvis-esque voice assistant that can compute how to travel back in time.

3. Be quick

If the responses aren’t fast enough, the voice assistant may as well be silent.

Asking a question and waiting over a minute for a response is unacceptable. I want to be able to ask a question and get a response in a reasonable amount of time.

However, I know that running anything locally on my cute little Macbook Air is going to be slow, regardless of how many tweaks and refactorings I do.

So for now, I’m not going to expect millisecond-level response times. Instead the response times should be quicker than the time it takes me to execute the task/query myself. At least in this way I know that I’m saving time.

In future articles, we’ll delve deeper into the optimisations I do to get this down to millisecond response times without paying for subscriptions.

My Device Specs

  • Device: Macbook Air
  • Chip: Apple M3
  • Memory: 16GB

1. Overall Structure

I’ve structured the project as follows:

Image by author, Diagram of overall project structure

Voice Assistant

1. Speech-to-Text & Text-to-speech

We make use of RealtimeSTT for wakeword detection (e.g. “Alexa”, “Hey Jarvis”, “Hey Siri”), speech detection and real-time speech-to-text transcription.

The transcribed text is sent to the Agent for processing, and the Agent’s response is streamed to a Kokoro text-to-speech model. The output is then sent to the speaker.

2. Agent

We use Ollama to run LLMs locally. The agent and the workflow that it takes is implemented in LangGraph.

The agent is responsible for taking a user query, understanding it, and calling the tools it thinks are required to provide an appropriate response.

Our voice assistant will require the following tools to meet our goals:

  • A function to get the current date.
  • A function to get the current time.

It also needs tools to interact with smart-home devices, but the implementation for this can get quite involved so we implement this in a separate MCP server.

3. MCP Server for smart-home Connection

The MCP server is where we encapsulate the complexity of finding, connecting to, and managing the devices.

A SQL database keeps track of devices, their connection information and their names.

Meanwhile, tools are the interface through which an agent finds the connection information for a given device, and then uses it to turn the device on or off.

Let’s now dive deeper into the implementation details of each component.

Want access to the code repository?

For those of you who wish to get access to the voice-assistant code that accompanies this article, check out my Patreon page here to get access PLUS exclusive access to community chats where you can talk directly with me about this project.

2. Implementation Details

Text-to-speech (TTS) Implementation

Photo by Oleg Laptev on Unsplash

The text-to-speech layer was perhaps the easiest to implement.

Given a string that we assume comes from the agent, we pass it through a pre-trained text-to-speech model and stream the audio to the device speaker.

Firstly, let’s define a class called Voice that will be responsible for this.

We know upfront that, apart from the model used for speech synthesis, receiving text and streaming it to the speaker will stay the same, so this logic can remain decoupled from anything model-related.

import numpy as np
import pyaudio


class Voice():
    def __init__(
        self,
        sample_rate: int = 24000,
        chunk_size: int = 2048
    ):
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        self.initialise_model()

        # output stream for playing audio through the device speaker
        self.output_stream = pyaudio.PyAudio().open(
            format=pyaudio.paFloat32,
            channels=1,
            rate=self.sample_rate,
            output=True,
        )

    def initialise_model(self):
        """Initialise the model to use for TTS."""
        pass

    def convert_text_to_speech(self, text: str) -> list[np.ndarray]:
        """Convert text to speech and return the waveform as frames."""
        pass

    def speak(self, text: str):
        """Speak the provided text through device output."""
        frames = self.convert_text_to_speech(text)
        for frame in frames:
            self.output_stream.write(frame.tobytes())

This means we can implement the speak method upfront, independently of whichever model we end up choosing.

Now we can figure out which models are out there, which one to use and how to use it, and then wire it up into our Voice class.

TTS Models Testing

Below, I list the various different TTS models that I experimented with, and the code you can use to replicate the results.

1. BarkModel (Link)

Quickstart code to run the model yourself:

from IPython.display import Audio
from transformers import BarkModel, BarkProcessor

model = BarkModel.from_pretrained("suno/bark-small")
processor = BarkProcessor.from_pretrained("suno/bark-small")
sampling_rate = model.generation_config.sample_rate

input_msg = "The time is 3:10 PM."

inputs = processor(input_msg, voice_preset="v2/en_speaker_2")
speech_output = model.generate(**inputs).cpu().numpy()

Audio(speech_output[0], rate=sampling_rate)

Summary

  • Good: Very realistic voice synthesis with natural sounding ‘umm’, ‘ahh’ filler words.
  • Bad: Quality is worse with shorter sentences. The end of the sentence is spoken as though a follow up sentence will quickly follow.
  • Bad: Very slow. Takes 13 seconds to generate the speech for “The time is 3:10 PM.”

2. Coqui TTS (Link)

Install using:

pip install coqui-tts

Test code

from IPython.display import Audio
from TTS.api import TTS 

tts = TTS(model_name="tts_models/en/ljspeech/tacotron2-DDC", progress_bar=False)

output_path = "output.wav"
input_msg = "The time is 3:10 PM."
tts.tts_to_file(text=input_msg, file_path=output_path)
Audio(output_path)

Summary

  • Good: Fast. Takes 0.3 seconds to generate the speech for “The time is 3:10 PM.”
  • Bad: Text normalisation is not up to scratch. When it comes to time related queries, the pronunciation of “PM” is off. When the time is set to “13:10 PM”, the pronunciation of “13” is unrecognisable.

3. Elevenlabs (Link)

Install using:

pip install elevenlabs

and run using:

import os

import dotenv
from elevenlabs.client import ElevenLabs
from elevenlabs import stream

dotenv.load_dotenv()

api_key = os.getenv('elevenlabs_apikey')

elevenlabs = ElevenLabs(
  api_key=api_key,
)

audio_stream = elevenlabs.text_to_speech.stream(
    text="The time is 03:47AM",
    voice_id="JBFqnCBsd6RMkjVDRZzb",
    model_id="eleven_flash_v2_5"
)

stream(audio_stream)

Summary

By far the best in terms of quality and response times, which obviously it should be given it is a paid service.

They also provide some free credits without a subscription, but I’d rather not become dependent on it at all when developing my voice assistant so we skip it for now.

4. Kokoro (Link)

We leave the best til last.

Install using:

pip install kokoro pyaudio

Test code:

import numpy as np
import pyaudio
from kokoro import KPipeline

RATE = 24000
CHUNK_SIZE = 1024

# 'a' = American English, to match the af_heart voice used below
pipeline = KPipeline(lang_code="a")

p = pyaudio.PyAudio()
print(f"Input device: {p.get_default_input_device_info()}")
print(f"Output device: {p.get_default_output_device_info()}")

output_stream = p.open(
    format=pyaudio.paFloat32,
    channels=1,
    rate=RATE,
    output=True,
)
input_msg = "The time is 03:47AM"
generator = pipeline(input_msg, voice='af_heart')
for i, (gs, ps, audio) in enumerate(generator):
    print(i, gs, ps)

    for start in range(0, len(audio), CHUNK_SIZE):
        chunk = audio[start:start + CHUNK_SIZE]
        output_stream.write(chunk.numpy().astype(np.float32).tobytes())

Summary

Firstly, it’s quick — it’s on par with Elevenlabs, only marginally slower, not really noticeable given the example text.

Secondly, the quality of the speech is also good. Sure, it could be better; there are occasions where it sounds slightly clunky.

But on average, the quality of the outputs is spot on.

Defining the Voice Class

So, we decide to use Kokoro for our text-to-speech implementation. Let’s now fill in the blanks for our Voice class. Also keep in mind that this is a first implementation, and I know in the future I will want to try other models.

So instead of implementing the model specific code directly into the Voice class, I’ll create a child class that inherits from Voice.

This way, I can do a quick switcharoo between different models without having to change the Voice class or unravel code that has become coupled.

from kokoro import KPipeline

class KokoroVoice(Voice):
    def __init__(self, voice:str, sample_rate: int = 24000, chunk_size: int = 2048):
        """Initialise the model to use for TTS.
        
        Args:
            voice (str):
                The voice to use.
                See https://github.com/hexgrad/kokoro/blob/main/kokoro.js/voices/
                for all voices.
            sample_rate (int, optional):
                The sample rate to use. Defaults to 24000.
            chunk_size (int, optional):
                The chunk size to use. Defaults to 2048.
        """
        self.voice = voice
        super().__init__(sample_rate, chunk_size)

    def initialise_model(self):
        """Load the model to use for TTS."""
        self.pipeline = KPipeline(lang_code="b")

    def convert_text_to_speech(self, text:str) -> list[np.ndarray]:
        """Convert text to speech and return the waveform as frames."""
        generator = self.pipeline(text, voice=self.voice)
        frames = []
        for i, (_, _, audio) in enumerate(generator):
            for start in range(0, len(audio), self.chunk_size):
                chunk = audio[start : start + self.chunk_size]
                frames.append(chunk.numpy().astype(np.float32))
        return frames

Now, this implementation allows us to simply import and instantiate this class at the point where we receive text from the agent, and stream it to the device speaker using:

text = "Hello world"
voice = KokoroVoice(**kwargs)
voice.speak(text)

SmartHome MCP Server Implementation

Photo by Fajrul Islam on Unsplash

This MCP server is dedicated to finding, connecting and managing smarthome devices. It lives in a separate repository, nicely separated from the voice assistant.

At the time of writing, the only smarthome device I have is a Tapo Smart Plug. You can interact with Tapo devices by using the python-kasa library.

Our server needs to do the following:

  • Given a device name, turn it on or off.
  • Discover new devices and add them to the database.
  • Update the device database with the latest device information — this includes the name of the device, the IP address and the MAC address.

1. Database

Firstly, let’s look at how we will store the device information in a SQL database. For simplicity I will choose duckdb as the database backend.

Devices Table

We firstly define the schema for our first (and only) table called device.

# src/smarthome_mcp_server/database.py

import os
import duckdb
from dataclasses import dataclass


@dataclass
class TableSchema:
    name:str
    columns:dict[str, str]
    primary_key:list[str]


def get_device_table_schema():
    return TableSchema(
        name="device",
        columns={
            "device_id" : "VARCHAR",
            "name": "VARCHAR",
            "ip_address": "VARCHAR",
        },
        primary_key=["device_id"],
    )

The device_id is the primary key, and by definition needs to uniquely identify every device in our home. Luckily, each Tapo device has a unique device ID that we can use.

The name is what the user references as the device name. For example, in our case, the Tapo Smart Plug is connected to our living room light and is named lights. This name is assigned via the Tapo app.

Finally, the ip_address column will be the IP Address that is used to connect to the device in order to control it.

DB Initialisation

We create helper functions, get_create_table_if_not_exists_query and initialise_database, that we call to create the DB on first startup.

For brevity, I show just the initialise_database function since the former is self-explanatory:

def initialise_database(db_path:os.PathLike) -> duckdb.DuckDBPyConnection:
    """Get the database connection and create the tables if they don't exist."""
    conn = duckdb.connect(db_path)

    # initialise if not exists tables
    conn.execute(
        get_create_table_if_not_exists_query(get_device_table_schema())
    )

    return conn
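For reference, a minimal sketch of what get_create_table_if_not_exists_query could look like, built from the TableSchema dataclass above (the actual implementation may differ slightly):

def get_create_table_if_not_exists_query(table_schema: TableSchema) -> str:
    """Build a CREATE TABLE IF NOT EXISTS statement from a TableSchema."""
    columns = ", ".join(
        f"{col_name} {col_type}" for col_name, col_type in table_schema.columns.items()
    )
    primary_key = ", ".join(table_schema.primary_key)
    return (
        f"CREATE TABLE IF NOT EXISTS {table_schema.name} "
        f"({columns}, PRIMARY KEY ({primary_key}))"
    )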

2. Device management

Finally, we implement the code that will be interacting with the devices and updating the database.

import logging

import duckdb

logger = logging.getLogger(__name__)

class DeviceManager:
    def __init__(self, conn:duckdb.DuckDBPyConnection) -> None:
        self._conn = conn
    
    ...

    async def turn_on_device(self, device_name: str) -> str:
        """Turn on a device.

        Args:
            device_name (str):
                The name of the device to turn on.
        """
        try:
            device = await self._get_device(device_name)
        except DeviceNotFoundError as e:
            logger.exception(e)
            return f"Device {device_name} not found."

        await device.turn_on()
        return f"Device {device_name} turned on."

    async def turn_off_device(self, device_name: str) -> str:
        """Turn off a device.

        Args:
            device_name (str):
                The name of the device to turn off.
        """
        try:
            device = await self._get_device(device_name)
        except DeviceNotFoundError as e:
            logger.exception(e)
            return f"Device {device_name} not found."

        await device.turn_off()
        return f"Device {device_name} turned off."

    async def list_devices(self) -> list[str]:
        """List the available device names.

        Returns:
            list[str]:
                A list of device names.
        """
        results = self._conn.query("SELECT name FROM device").fetchall()

        return [result[0] for result in results]

The three methods above will be the public methods that we register as tools for our Voice Assistant.

We’ve omitted the private methods for brevity.
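For context, here is a rough sketch of what the omitted _get_device helper (and the DeviceNotFoundError it raises) could look like. The exact python-kasa call, and any Tapo credential handling, will depend on your devices and library version:

from kasa import Discover


class DeviceNotFoundError(Exception):
    """Raised when a device name cannot be found in the database."""


class DeviceManager:
    ... # public methods shown above

    async def _get_device(self, device_name: str):
        """Look up a device's connection info and return a connected device object."""
        row = self._conn.execute(
            "SELECT ip_address FROM device WHERE name = ?", [device_name]
        ).fetchone()
        if row is None:
            raise DeviceNotFoundError(device_name)

        # Connect via python-kasa using the stored IP address. Tapo devices
        # may also require account credentials here - illustrative only.
        return await Discover.discover_single(row[0])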

One thing that I’ve realised since writing this code is that DeviceManager is very Tapo-specific. After looking at integrating non-Tapo devices, I realised I’d been naive to think that other smart device APIs would follow the same, standardised pattern.

So in the future, this class will need to be changed to TapoDeviceManager, and further abstractions will need to be made to accommodate this variability.

For example, I recently got some Wiz lightbulbs for my bedroom. It turns out their API does not fetch the names assigned to each device via the app, something that was available in Tapo by default.

Therefore, I will need to think of some way to fetch this in the backend, or use the voice-assistant to populate it when it doesn’t exist.

3. Expose the tools to Voice-Assistant using FastMCP

Finally, we need to expose the methods we’ve written as tools for our voice assistant to use.

import asyncio
import os

from fastmcp import FastMCP

from smarthome_mcp_server.database import initialise_database
# DeviceManager and its helpers are imported from the modules shown above

def register_device_manager_tools(mcp_instance: FastMCP, device_manager: DeviceManager) -> FastMCP:
    """Register the methods defined in DeviceManager as tools for MCP server."""
    mcp_instance.tool(name_or_fn=device_manager.list_devices)
    mcp_instance.tool(name_or_fn=device_manager.turn_off_device)
    mcp_instance.tool(name_or_fn=device_manager.turn_on_device)
    return mcp_instance


async def populate_database(device_manager: DeviceManager):
    """Find all devices that are available and update the database.

    Discover all available devices and get their latest states.

    Note:
        Device names may have changed via the mobile app, thus this
        step is necessary when starting the server.
    """
    all_devices = await device_manager.discover_new_devices()
    upsert_coroutines = [device_manager._upsert_device(device) for device in all_devices.values()]
    await asyncio.gather(*upsert_coroutines)


def initialise_server(db_path: os.PathLike) -> FastMCP:
    """Initialise the server.

    Args:
        db_path (os.PathLike):
            The path to the duckdb database which
            stores the server information.
    Returns:
        FastMCP: The FastMCP server.
    """
    conn = initialise_database(db_path)
    device_manager = DeviceManager(conn)

    # find all devices that are available and update the database
    asyncio.run(populate_database(device_manager))

    mcp = FastMCP(
        name="smarthome-mcp-server",
        instructions="This server is for finding and controlling smarthome devices.",
    )

    register_device_manager_tools(mcp, device_manager)
    return mcp

initialise_server is where we initialise and pre-populate the database, and construct the FastMCP server.

Notice we populate the database on startup each time. This is necessary since device names could have been updated via the Tapo app between runs, so this is an attempt to fetch the most up-to-date information for all devices.
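As a rough idea of what the discovery and upsert helpers referenced in populate_database might look like (the discovery call and the device attribute names are illustrative; check the python-kasa docs for your device type):

from kasa import Discover


class DeviceManager:
    ... # other methods shown above

    async def discover_new_devices(self) -> dict:
        """Discover devices on the local network, keyed by IP address."""
        # Broadcasts on the LAN; Tapo devices may need credentials passed in.
        return await Discover.discover()

    async def _upsert_device(self, device) -> None:
        """Insert or update a device row with its latest information."""
        await device.update()  # refresh the device's name, id, etc.
        self._conn.execute(
            "INSERT OR REPLACE INTO device (device_id, name, ip_address) "
            "VALUES (?, ?, ?)",
            [device.device_id, device.alias, device.host],
        )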

Now, I know there are some holes in the implementation — it’s a first attempt and an ongoing project, so if you see any issues or potential improvements please let me know via my Patreon account (see end of article).

Server Entry Point

We use typer to make our server into a CLI app.

# __main__.py

import asyncio
import logging
from pathlib import Path

import platformdirs
import typer
from dotenv import load_dotenv
from rich.console import Console

# local imports (module paths assumed)
from smarthome_mcp_server.server import initialise_server
from smarthome_mcp_server.settings import load_config

logger = logging.getLogger(__name__)

load_dotenv()

app = typer.Typer()
console = Console()


@app.command()
def main():
    config = load_config()

    # set up server data directory
    root_dir = platformdirs.user_data_path(
        appname="smarthome-mcp-server",
        ensure_exists=True
    )
    db_path = Path(root_dir) / config.database.path
    db_path.parent.mkdir(parents=True, exist_ok=True)
    logger.info("Server data directory: %s", db_path)

    # init and run
    mcp_instance = initialise_server(db_path)
    asyncio.run(mcp_instance.run_stdio_async())

if __name__ == "__main__":
    app()

We then run the server with python3 -m smarthome_mcp_server:


╭─ FastMCP 2.0 ────────────────────────────────────────────────────────────╮
│                                                                          │
│                                                                          │
│                                                                          │
│                                                                          │
│    🖥️  Server name:     smarthome-mcp-server                              │
│    📦 Transport:       STDIO                                             │
│                                                                          │
│    📚 Docs:            https://gofastmcp.com                             │
│    🚀 Deploy:          https://fastmcp.cloud                             │
│                                                                          │
│    🏎️  FastMCP version: 2.11.2                                            │
│    🤝 MCP version:     1.12.4                                            │
│                                                                          │
╰──────────────────────────────────────────────────────────────────────────╯


[08/19/25 05:02:55] INFO     Starting MCP server              server.py:1445
                             'smarthome-mcp-server' with                    
                             transport 'stdio'     

4. Using the SmartHome Tools

Now that the server has been implemented, we can define some methods that will interact with it via a client. This client will be used to register the tools for the Voice Assistant to use.

Coming back to the voice-assistant repo:

from langchain_mcp_adapters.client import MultiServerMCPClient

def get_new_mcp_client() -> MultiServerMCPClient:
    return MultiServerMCPClient(
        {
            "smarthome-mcp-server": {
                "command": "smarthome_mcp_server",
                "args": [],
                "transport": "stdio",
            }
        }
    )

This method uses the convenient MultiServerMCPClient class to register our smarthome MCP server for tool usage.

The returned client object then exposes a get_tools method which returns all the tools that the registered servers expose.

mcp_client = get_new_mcp_client()
tools = await mcp_client.get_tools()

Note how we use await here given the get_tools method is asynchronous.

By defining a function called get_mcp_server_tools:

async def get_mcp_server_tools():
    mcp_client = get_new_mcp_client()
    tools = await mcp_client.get_tools()
    return tools

this single function can be imported into wherever we define our agent and register the tools for use.

Speech-to-text Implementation

Photo by Franco Antonio Giovanella on Unsplash

Speech-to-text (STT) is where a lot of complexity comes in as it requires realtime IO processing.

STT itself is simple enough to achieve — there are plenty of models out there that we can use. But what makes it complex is the need to be able to constantly listen for a user’s voice input, which consists of a wakeword and a query.

A wakeword is what you normally use to trigger a voice assistant to start listening to you. For example, “Hey Google” or “Hey Siri”, or “Alexa”.

I could write this code entirely myself, but to make things simpler, I had a quick dig around just in case there was something pre-built that I could use.

And to my surprise, I found the package RealtimeSTT (link here) and it works perfectly.

How it works in a nutshell

  1. Create a thread for listening to the user’s voice input. Another for transcribing, which runs the STT model.
  2. If a wakeword is detected, start recording the user’s voice input.
  3. The recorded audio is then sent to the STT model for transcription, which returns the transcribed text as a string.

To use this package, all we need to do is use the AudioToTextRecorder class as a context manager like below:

from RealtimeSTT import AudioToTextRecorder

with AudioToTextRecorder(
    model='tiny',
    wakeword_backend='oww',
    wake_words='hey jarvis',
    device='cpu',
    wake_word_activation_delay=3.0,
    wake_word_buffer_duration=0.15,
    post_speech_silence_duration=1.0
) as recorder:
    while True:
        # get the transcribed text from recorder
        query = recorder.text()
        if (query is not None) and (query != ""):

            # get response from our langgraph agent
            response_stream = await get_response_stream(
                query, agent_executor, thread_config
            )

            # output the response to device audio
            await stream_voice(response_stream, output_chunk_builder, voice)

We will come back to get_response_stream and stream_voice methods in the next section, since this also involves how we define our agent.

But simply by putting together the AudioToTextRecorder context manager in the way we have, we get a working speech -> text -> response mechanism.

If you were to simply replace the get_response_stream with any LLM agent, and replace the stream_voice with any text-to-speech agent, you would have a working voice assistant.

You could also use a simple print statement and you would have a rudimentary chat bot with voice input.
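For example, a minimal sketch of that rudimentary voice-input chatbot could be as simple as this (the recorder settings mirror the ones used above):

from RealtimeSTT import AudioToTextRecorder

with AudioToTextRecorder(
    model='tiny',
    wakeword_backend='oww',
    wake_words='hey jarvis',
) as recorder:
    while True:
        query = recorder.text()
        if query:
            # No agent, no TTS - just print the transcription
            print(f"You said: {query}")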

Agent Implementation

Finally, the good stuff — the agent implementation.

I’ve left this as last since it’s a bit more involved. Let’s get stuck in.

LangGraph — What is it?

LangGraph is a framework for building stateful, graph-based workflows with language model agents.

Nodes encapsulate any logic related to an action an LLM agent can take.

Edges encapsulate the logic which determines how to transition from one node to another.

LangGraph implements a prebuilt graph that we can get via the create_react_agent method. The graph looks like this:

Image by author. Graph returned by create_react_agent method

Let’s use this as an example to explain better how nodes and edges work.

As you can see, the graph is very simple:

  • Given a query (the __start__ node)
  • The agent node will receive the query and determine whether it needs to call a tool to be able to respond appropriately.
    • If it does, we transition to the tool node. Once the tool response is received, we go back to the agent node.
    • The agent will repeatedly call the appropriate tools until it determines it has everything it needs.
  • Then, it will return its response (the __end__ node)

The conditional transition between the agent, tools and __end__ node is represented as dashed lines. Then, the question is:

How do we determine which node to go to next?

Well, LangGraph maintains a log of the messages that have been sent, and this represents the state of the graph.

The messages can come from the user, the agent, or a tool. In this example, the agent node will create a message that explicitly states that it will call a tool (exactly how will be revealed in the next section).

The presence of this tool call is what triggers the transition from the agent node to the tools node.

If no tools are called, then the transition from the agent node to the __end__ node is triggered.

It is this check for the presence of tool calls that is implemented in the conditional edge between the agent, tools and __end__ nodes.
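As a rough illustration (not the exact prebuilt implementation), such a conditional edge boils down to a routing function like this, which inspects the last message in the state:

from langgraph.graph import END


def route_from_agent(state) -> str:
    """Go to the tools node if the last AI message contains tool calls, else end."""
    last_message = state["messages"][-1]
    if getattr(last_message, "tool_calls", None):
        return "tools"
    return END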

In a future article, I’ll go into an example of how I created a custom agent graph to optimise for latency, and demonstrate how exactly these conditional edges and nodes are implemented.

For now, we don’t need to go into too much detail about this since the prebuilt graph is good enough for the scope of this article.

Our Agent Implementation

So, we define a function called get_new_agent like below:

from langchain.chat_models import init_chat_model
from langgraph.prebuilt import create_react_agent
from langgraph.graph.state import CompiledStateGraph

from voice_assistant.tools.datetime import get_tools as get_datetime_tools
from voice_assistant.tools.mcp import get_mcp_server_tools  # module path assumed


async def get_new_agent(
    config, short_term_memory, long_term_memory
) -> CompiledStateGraph:
    """Build and return a new graph that defines the agent workflow."""
    
    # initialise the LLM
    model = init_chat_model(
        model=config.Agent.model,
        model_provider=config.Agent.model_provider,
        temperature=0,
        reasoning=config.Agent.reasoning
    )

    # initialise the tools that the agent will use
    server_tools = await get_mcp_server_tools()

    tools = (
        get_datetime_tools()
        + server_tools
    )

    # build the agent workflow given the LLM, its tools and memory.
    agent_executor = create_react_agent(
        model,
        tools,
        checkpointer=short_term_memory,
        store=long_term_memory
    )

    return agent_executor

which is responsible for:

  1. Initialising the LLM
    • init_chat_model returns the LLM from the specified provider. In our case, we use Ollama as our provider, and llama3.2:latest as our model type.
  2. Defining the full set of tools that the agent will use.
    • We have a function called get_datetime_tools() which returns a list of StructuredTool objects.
    • We also have server_tools, which are the list of tools that our previously mentioned MCP server provides for home automation.
    • Additionally, if we wish to extend the set of tools the agent can use, this is the place to add them.
  3. Constructing the agent workflow given the LLM and its tools.
    • Here we call the create_react_agent function from LangGraph.
    • The function can also take in checkpointer and store objects which are used to persist the state of the agent, acting as a short term and long term memory.
    • In the future, if we want to use a custom graph, we can replace the create_react_agent function call with our own implementation.

Handling the Agent Response

Now, we’ve so far implemented all the components that we need to

  1. Get the user query
  2. Get the tools
  3. Create the agent

The next step is to run the agent to get a response for the query, and output it via the Voice method we defined earlier.

Given the user query text that we’ve received from our STT implementation, we format it into a dictionary:

user_query = "Hello world!"
user_query_formatted = {
    "role": "user",
    "content": user_query
}

This dictionary tells the agent that the message is from the user.

We also add a system prompt to set the context and give instructions to the agent:

system_prompt_formatted = {
    "role": "system",
    "content": (
        "You are a voice assistant called Jarvis."
        + " Keep your responses as short as possible."
        + " Do not format your responses using markdown, such as **bold** or _italics_."
    )
}

These two messages are then passed into the agent to get a response:

response = agent_executor.invoke(
    {"messages" : [system_prompt_formatted, user_query_formatted]},
)

The response is a dictionary of messages (for brevity we omit any superfluous content):

output
> {
    "messages": [
        SystemMessage(
            content="You are a voice assistant called Jarvis.Keep your responses as short as possible.Do not format your responses using markdown, such as **bold** or _italics. ",
            additional_kwargs={},
            ...
        ),
        HumanMessage(
            content="What time is it?",
            additional_kwargs={},
            ...
        ),
        AIMessage(
            content="",
            additional_kwargs={},
            tool_calls=[
                {
                    "name": "get_current_time",
                    "args": {},
                    "id": "b39f7b12-4fba-494a-914a-9d4eaf3dc7d1",
                    "type": "tool_call",
                }
            ],
            ...
        ),
        ToolMessage(
            content="11:32PM",
            name="get_current_time",
            ...
        ),
        AIMessage(
            content="It's currently 11:32 PM.",
            additional_kwargs={},
            ...
        ),
    ]
}

As you can see, the output is a list of all the messages that have been created throughout the graph execution.

The first message will always be a HumanMessage or a SystemMessage since this is what we provided to the agent as input (i.e. the __start__ node).

The remaining are the messages that the agent or tools returned, in the order they were called.

For example, you can see the first AIMessage, the message type generated by the LLM, has a tool call within it which uses a get_current_time tool.

The presence of a tool_calls property in the AIMessage is what triggers the conditional transition from the agent node to the tools node.

Image by author. Graph with conditional edge from agent and tools highlighted in red.

Then you see the ToolMessage which is the response that was returned by the get_current_time tool.

Finally, the model responds with the actual response to the user query. The lack of a tool_calls property in the AIMessage means that the graph should transition to the __end__ node and return the response.

Reducing Latency

Photo by Lukas Blazek on Unsplash

Coming back to invoking the agent to get a response, the issue with using the invoke method is that we wait for the entire workflow to complete before we get a response.

This can take a long time, especially if the agent is addressing a complex query. Meanwhile, the user is waiting idly for the agent to respond, which results in a poor user experience.

So to improve on this, we can use streaming in LangGraph to stream the response as it is generated.

This allows us to start voicing the response as it comes, rather than waiting for the entire response to be generated and then voicing it all in one go.

output_stream = agent_executor.stream(
    {"messages" : [system_prompt_formatted, user_query_formatted]},
    stream_mode="messages"
)

Here, output_stream is a generator that will yield a tuple of messages and message metadata, as they come.

Note, there is an asynchronous version of this method called astream, which does exactly the same thing but returns an AsyncIterator instead.
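This is essentially what the get_response_stream helper used later in main.py wraps. A minimal sketch, assuming it just formats the messages and returns the agent’s async stream, might look like:

async def get_response_stream(query: str, agent_executor, thread_config):
    """Format the user query and return the agent's async message stream."""
    system_prompt_formatted = {
        "role": "system",
        "content": (
            "You are a voice assistant called Jarvis."
            " Keep your responses as short as possible."
            " Do not format your responses using markdown."
        ),
    }
    user_query_formatted = {"role": "user", "content": query}

    # astream returns an AsyncIterator of (message chunk, metadata) tuples
    return agent_executor.astream(
        {"messages": [system_prompt_formatted, user_query_formatted]},
        config=thread_config,
        stream_mode="messages",
    )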

If we look at the messages we get after this change:

print([chunk for chunk, metadata in output_stream])

>   AIMessageChunk(
        content="",
        tool_calls=[{"name": "get_current_time", ...}],
        tool_call_chunks=[{"name": "get_current_time", "args": "{}", ...}],
    ),
    ToolMessage(content="01:21AM", name="get_current_time", ...),
    AIMessageChunk(content="It", ...),
    AIMessageChunk(content="'s", additional_kwargs={}, ...),
    AIMessageChunk(content=" currently", ...),
    AIMessageChunk(content=" ",), 
    AIMessageChunk(content="1", ...), 
    AIMessageChunk(content=":", ...), 
    AIMessageChunk(content="21", ...),
    AIMessageChunk(content=" AM", ...),
    AIMessageChunk(content=".", ...),
    AIMessageChunk(content="", ...),

You can now see the tokens are being returned as they are generated.

But this poses a new problem!

We can’t just give the TTS model individual tokens, since it will just pronounce each token one by one, i.e. "It", "'s" will be pronounced separately, which is definitely not what we want.

So, there is a tradeoff that we need to make:

While we need to stream the response to minimise user wait time, we still need to accumulate enough tokens to form a meaningful chunk before sending them to the TTS model.

Building Output Chunks

We therefore handle this complexity by defining an OutputChunkBuilder. So what constitutes a meaningful chunk?

The first thing that comes to mind is to wait for a full sentence, i.e. append all the tokens until the buffer ends with one of ., ?, ;, !.

From trial and error, it has also proven wise to include the newline character \n in this list as well, for when we get a particularly long response from the agent that uses bullet points.

class OutputChunkBuilder:
    def __init__(self):
        self._msg = ""
        self.end_of_sentence = (".", "?", ";", "!", "\n")

    def add_chunk(self, message_chunk:str):
        self._msg += message_chunk

    def output_chunk_ready(self) -> bool:
        return self._msg.endswith(self.end_of_sentence)

We achieve this with the above code, consisting of one function that appends message chunks together into a buffer called _msg, and one to check if the collated messages are ready (i.e. is it a full sentence or does it end with a new line).

class OutputChunkBuilder:
    
    ... # omitted for brevity

    def _reset_message(self):
        self._msg = ""

    def get_output_chunk(self):
        msg = self._msg # Get the current message chunk
        self._reset_message()
        return msg

    def current_message_length(self) -> int:
        return len(self._msg)

We also implement the get_output_chunk function, which returns the messages collated so far and resets the message buffer to an empty string so that it is ready for the next set of chunks, plus a small current_message_length helper that we will use later to flush any leftover text.

This enables us to use logic like below to stream the response, sentence by sentence:

def stream_voice(msg_stream, output_chunk_builder, voice):
    for chunk, metadata in msg_stream:
        # append the chunk to our buffer
        if chunk.content != "":
            output_chunk_builder.add_chunk(chunk.content)

        # speak the output chunk if it is ready
        if output_chunk_builder.output_chunk_ready():
            voice.speak(output_chunk_builder.get_output_chunk())

Tools Implementation

Photo by Barn Images on Unsplash

Lastly, let’s look at how we can implement the tools required to get the current date and time.

This is very straightforward, by far the easiest implementation. Any function that you create can be used as a tool as long as the docstrings are well-written and formatted clearly.

There are two main ways to mark a function as a tool:

  1. Using the @tool decorator from langchain_core.tools
  2. Using the StructuredTool class from langchain_core.tools.structured

For easier unit testing of our tools, we opt for the second option since the first option does not allow us to import the tool function into our tests.
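For comparison, option 1 would look something like this:

from datetime import datetime
from langchain_core.tools import tool


@tool
def get_current_time() -> str:
    """Get the current time in format HH:MM AM/PM"""
    return datetime.now().strftime("%I:%M%p")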

First, write the functions to get the time and date as we would do normally:

# tools/datetime.py

from datetime import datetime
from langchain_core.tools.structured import StructuredTool


def get_now_datetime() -> datetime:
    """Wrapper for easier mocking in unit test."""
    return datetime.now()

def get_current_time() -> str:
    """Get the current time in format HH:MM AM/PM"""
    return get_now_datetime().strftime("%I:%M%p")

Additionally, we write a simple wrapper function called get_now_datetime that returns the current datetime, which also makes it easier to mock in our unit tests.

Next, a function for getting the current date.

def _convert_date_to_words(dt: datetime):
    """Change date values represented in YYYY-mm-dd format to word values as they would be pronounced."""
    day = dt.day
    if day == 1 or day == 21 or day == 31:
        day_word = f"{day}st"
    elif day == 2 or day == 22:
        day_word = f"{day}nd"
    elif day == 3 or day == 23:
        day_word = f"{day}rd"
    else:
        day_word = f"{day}th"

    date_obj = dt.strftime(f"%B {day_word}, %Y")
    return date_obj

def get_current_date() -> str:
    """Get the current date, formatted for natural speech."""
    dt = get_now_datetime()
    dt_str = _convert_date_to_words(dt)
    return dt_str

We have to be careful here — different text-to-speech (TTS) models have varying abilities when it comes to text normalisation.

Example

If the function get_current_date returns the string 01-01-2025, the TTS model may pronounce this as ‘oh one oh one twenty twenty five‘.

To make our implementation robust to such differences, we normalise the date string to be clearer in how the date should be pronounced using the _convert_date_to_words function.

In doing so, we convert a datetime object like datetime(2025, 1, 1) into January 1st, 2025.

Finally, we write a get_tools function which will wrap up the get_current_time and get_current_date methods into a StructuredTool, and return them in a list:

def get_tools():
    """Get a list of tools for the agent.

    Returns:
        A list of tool functions available to the agent.
    """
    return [
        StructuredTool.from_function(get_current_time),
        StructuredTool.from_function(get_current_date),
    ]

thereby allowing us to import this function and call it when we create the agent, as we saw in the agent implementation section.

Putting it all together to build our Agent

Now, we’ve gone through the individual components that make up our voice assistant, time to assemble them together.

# main.py

import asyncio

from RealtimeSTT import AudioToTextRecorder
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.store.sqlite.aio import AsyncSqliteStore

from voice_assistant.agent import get_new_agent, get_response_stream
from voice_assistant.voice import KokoroVoice
# OutputChunkBuilder and stream_voice live in their own module (path assumed)
from voice_assistant.output import OutputChunkBuilder, stream_voice
from settings import load_config


async def main():

    conf = load_config()
    voice = KokoroVoice(**conf.KokoroVoice)
    output_chunk_builder = OutputChunkBuilder()
    thread_config = {"configurable": {"thread_id": "abc123"}}

    # short term memory
    async with AsyncSqliteSaver.from_conn_string(conf.Agent.memory.checkpointer) as saver:

        # long term memory
        async with AsyncSqliteStore.from_conn_string(conf.Agent.memory.store) as store:

            agent_executor = await get_new_agent(conf, saver, store)

            with AudioToTextRecorder(**conf.AudioToTextRecorder) as recorder:
                while True:
                    query = recorder.text()
                    if (query is not None) and (query != ""):
                        response_stream = await get_response_stream(
                            query, agent_executor, thread_config
                        )
                        await stream_voice(response_stream, output_chunk_builder, voice)


if __name__ == "__main__":
    asyncio.run(main())

Firstly, we load in our YAML config file using OmegaConf (link here). The settings module and the load_config implementation are shown below:

# settings.py

import logging
from pathlib import Path
from omegaconf import OmegaConf


logger = logging.getLogger(__name__)


CONFIG_PATH = Path(__file__).parents[1] / "conf" / "config.yaml"


def load_config():
    logger.debug(f"Loading config from: {CONFIG_PATH}")
    return OmegaConf.load(CONFIG_PATH)
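For reference, the config.yaml it loads might look something like this (the keys match the ones used in main.py and get_new_agent; the values shown are illustrative):

# conf/config.yaml (illustrative values)
KokoroVoice:
  voice: af_heart
  sample_rate: 24000
  chunk_size: 2048

Agent:
  model: llama3.2:latest
  model_provider: ollama
  reasoning: false
  memory:
    checkpointer: checkpoints.sqlite
    store: memory_store.sqlite

AudioToTextRecorder:
  model: tiny
  wakeword_backend: oww
  wake_words: hey jarvis
  device: cpu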

Secondly, we use SQL databases to store our short and long term memory — this is done using the AsyncSqliteSaver and AsyncSqliteStore classes from the checkpoint and store modules in langgraph.

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from langgraph.store.sqlite.aio import AsyncSqliteStore

    ... # omitted for brevity 

    # short term memory
    async with AsyncSqliteSaver.from_conn_string(conf.Agent.memory.checkpointer) as saver:

        # long term memory
        async with AsyncSqliteStore.from_conn_string(conf.Agent.memory.store) as store:

            agent_executor = await get_new_agent(conf, saver, store)
            ... # omitted for brevity

Then, in a while loop, the STT thread records the user’s voice input after a wakeword is detected, which is then passed to the agent for processing.

The agent response is returned as an AsyncIterator, which we then stream to the device speakers using the stream_voice function.

The stream_voice function looks like this:

from typing import AsyncGenerator

async def stream_voice(
    msg_stream: AsyncGenerator,
    output_chunk_builder: OutputChunkBuilder,
    voice: Voice
):
    """Stream messages from the agent to the voice output."""
    async for chunk, metadata in msg_stream:
        if metadata["langgraph_node"] == "agent":
            # build up message chunks until a full sentence is received.
            if chunk.content != "":
                output_chunk_builder.add_chunk(chunk.content)

            if output_chunk_builder.output_chunk_ready():
                voice.speak(output_chunk_builder.get_output_chunk())

    # if we have anything left in the buffer, speak it.
    if output_chunk_builder.current_message_length() > 0:
        voice.speak(output_chunk_builder.get_output_chunk())

This is the same logic as we discussed in the Building Output Chunks section, but with some small tweaks.

It turns out, not all responses end with a punctuation mark.

For example, when the LLM uses bullet points in their response, I’ve found they omit the punctuation for each bullet point.

So, we make sure to flush our buffer at the end if it isn’t empty.

We also filter out any messages that are not from the agent, as we don’t want to stream the user’s input or the tool responses back to the device speakers. We do this by checking the langgraph_node metadata key, and only speaking the message if it comes from the agent.

And it turns out, that’s all you need to build a fully functioning voice assistant.

Final Remarks

Overall, I’ve been pleasantly surprised at how easy it was to build this out.

Sure, there are certainly more optimisations that can be made, but given I’ve been able to get the full functionality working within two weeks (whilst working a full-time job), I’m happy with the results.

But we’re not done yet.

There are a whole load of things I couldn’t discuss without this article becoming an entire book, such as the additional optimisations I’ve made to make the voice assistant quicker. These will be covered in my next article.

For those of you who enjoyed this article, check out my other articles on Medium, at https://medium.com/@bl3e967
