Building an Interactive UI for Llamaindex Workflows

Author:Murphy  |  View: 25899  |  Time: 2025-03-22 20:24:39

In my last article, I demonstrated how I use LlamaIndex workflows to streamline my research and presentation process. I built a workflow that takes a research topic, finds related articles on arxiv.org, creates summaries for the papers, and generates a PowerPoint slide deck to present the papers. You can read the full walk-through here:

How I Streamline My Research and Presentation with LlamaIndex Workflows

To continue building on the workflow and make it more user-friendly, I implemented a UI with Streamlit to enhance the user experience. The UI displays progress updates of the workflow execution, integrates user input, enables real-time user feedback, and renders the final generated slide deck.

The Streamlit UI (Screen recording by author)

You can check out the full code on my Github. In this article, I will walk through some key points on the UI implementation and the integration between the frontend and the backend:

Backend enhancements:

  • Update the workflow to support sending streaming event
  • Update the workflow to pause execution and wait for user inputs
  • Host several endpoints using FastAPI for running workflow, accepting user inputs, and downloading files, enabling async processes and streaming messages

Frontend UI features:

  • Send requests to the backend and display event data streamed from the backend in an expander
  • Display related information in a container and collect user input, if user input is required
  • Render the final generated slide deck
  • Provide a button for the user to download the final file

Putting it all together:

  • Separate frontend and backend dependencies and build by using distinct pyproject.toml and Dockerfile
  • Use docker-compose to build launch all services

When starting the workflow from the terminal, it is straightforward to see which step it is executing and the logging we put in those steps.

Terminal log for the workflow execution (Screenshot by author)

We can also enable the human-in-the-loop interaction by simply using user_feedback = input()in the workflow. This will pause the workflow and wait for the user input (See the human-in-the-loop example in this official Llamaindex notebook). However, to be able to achieve the same functionality in a user-friendly interface, we need additional modifications to the original workflow.

Sending streaming events from the workflow

Workflow can take a long time to execute, so for a better user experience, Llamaindex provided a way to send streaming events to indicate the progress of the workflow, as shown in the notebook here. In my workflow, I define a WorkflowStreamingEvent class to include useful information about the event message, such as the type of the event, and from which step it is sent:

class WorkflowStreamingEvent(BaseModel):
    event_type: Literal["server_message", "request_user_input"] = Field(
        ..., description="Type of the event"
    )
    event_sender: str = Field(
        ..., description="Sender (workflow step name) of the event"
    )
    event_content: Dict[str, Any] = Field(..., description="Content of the event")

To enable sending streaming events, the workflow step needs to have access to the shared context, which is done by adding @step(pass_context=True) decorator to the step definition. Then in the step definition, we can send event messages about the progress through the context. For example, in the tavily_query() step:

@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
    ctx.data["research_topic"] = ev.user_query
    query = f"arxiv papers about the state of the art of {ev.user_query}"
    ctx.write_event_to_stream(
        Event(
            msg=WorkflowStreamingEvent(
                event_type="server_message",
                event_sender=inspect.currentframe().f_code.co_name,
                event_content={"message": f"Querying Tavily with: '{query}'"},
            ).model_dump()
        )
    )

In this example, we set the event_type to be "server_message" . It means that it is an update message and no user action is required. We have another type of event "request_user_input" that indicates a user input is needed. For example, in the gather_feedback_outline() step in the workflow, after generating the slide text outlines from the original paper summary, a message is sent to prompt the user to provide approval and feedback on the outline text:

@step(pass_context=True)
    async def gather_feedback_outline(
        self, ctx: Context, ev: OutlineEvent
    ) -> OutlineFeedbackEvent | OutlineOkEvent:
        """Present user the original paper summary and the outlines generated, gather feedback from user"""
        ...

        # Send a special event indicating that user input is needed
        ctx.write_event_to_stream(
            Event(
                msg=json.dumps(
                    {
                        "event_type": "request_user_input",
                        "event_sender": inspect.currentframe().f_code.co_name,
                        "event_content": {
                            "summary": ev.summary,
                            "outline": ev.outline.dict(),
                            "message": "Do you approve this outline? If not, please provide feedback.",
                        },
                    }
                )
            )
        )

        ...

These events are handled differently in the backend API and the frontend logic, which I will describe in detail in the later sections of this article.

Pausing the workflow to wait for user input

Workflow steps that requires user feedback (Image by author)

When sending a "request_user_input" event to the user, we only want to proceed to the next step after we have received the user input. As shown in the workflow diagram above, it either proceeds to the outlines_with_layout()step if the user approves the outline, or to the summary2outline() step again if the user does not approve.

This is achieved using the Future() object from Python's asyncio library. In the SlideGenerationWorkflow class, we set an attribute self.user_input_future = asyncio.Future() that can be waited on in the gather_feedback_outline() step. The subsequent execution of the workflow is conditioned on the content of the user feedback:

@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    ...

    # Wait for user input
    if not self.user_input_future.done():
        user_response = await self.user_input_future
        logger.info(f"gather_feedback_outline: Got user response: {user_response}")

        # Process user_response, which should be a JSON string
        try:
            response_data = json.loads(user_response)
            approval = response_data.get("approval", "").lower().strip()
            feedback = response_data.get("feedback", "").strip()
        except json.JSONDecodeError:
            # Handle invalid JSON
            logger.error("Invalid user response format")
            raise Exception("Invalid user response format")

        if approval == ":material/thumb_up:":
            return OutlineOkEvent(summary=ev.summary, outline=ev.outline)
        else:
            return OutlineFeedbackEvent(
                summary=ev.summary, outline=ev.outline, feedback=feedback
            )

The FastAPI backend

We set up the backend using fastAPI, expose a POST endpoint to handle requests, and initiate the workflow run. The asynchronous function run_workflow_endpoint() takes ResearchTopic as input. In the function, an asynchronous generator event_generator() is defined, which creates a task to run the workflow and streams the events to the client as the workflow progresses. When the workflow finishes, it will also stream the final file results to the client.


class ResearchTopic(BaseModel):
    query: str = Field(..., example="example query")

@app.post("/run-slide-gen")
async def run_workflow_endpoint(topic: ResearchTopic):
    workflow_id = str(uuid.uuid4())

    wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
    wf.add_workflows(
        summary_gen_wf=SummaryGenerationWorkflow(
            wid=workflow_id, timeout=800, verbose=True
        )
    )
    wf.add_workflows(
        slide_gen_wf=SlideGenerationWorkflow(
            wid=workflow_id, timeout=1200, verbose=True
        )
    )

    async def event_generator():
        loop = asyncio.get_running_loop()
        logger.debug(f"event_generator: loop id {id(loop)}")
        yield f"{json.dumps({'workflow_id': workflow_id})}nn"

        task = asyncio.create_task(wf.run(user_query=topic.query))
        logger.debug(f"event_generator: Created task {task}")
        try:
            async for ev in wf.stream_events():
                logger.info(f"Sending message to frontend: {ev.msg}")
                yield f"{ev.msg}nn"
                await asyncio.sleep(0.1)  # Small sleep to ensure proper chunking
            final_result = await task

            # Construct the download URL
            download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
            download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"

            final_result_with_url = {
                "result": final_result,
                "download_pptx_url": download_pptx_url,
                "download_pdf_url": download_pdf_url,
            }

            yield f"{json.dumps({'final_result': final_result_with_url})}nn"
        except Exception as e:
            error_message = f"Error in workflow: {str(e)}"
            logger.error(error_message)
            yield f"{json.dumps({'event': 'error', 'message': error_message})}nn"
        finally:
            # Clean up
            workflows.pop(workflow_id, None)

    return StreamingResponse(event_generator(), media_type="text/event-stream")

In addition to this endpoint, there are endpoints for receiving user input from the client and handling file download requests. Since each workflow is assigned a unique workflow ID, we can map the user input received from the client to the correct workflow. By call the set_result() on the awaiting Future, the pending workflow can resume execution.

@app.post("/submit_user_input")
async def submit_user_input(data: dict = Body(...)):
    workflow_id = data.get("workflow_id")
    user_input = data.get("user_input")
    wf = workflows.get(workflow_id)
    if wf and wf.user_input_future:
        loop = wf.user_input_future.get_loop()  # Get the loop from the future
        logger.info(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
        if not wf.user_input_future.done():
            loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
            logger.info("submit_user_input: set_result called")
        else:
            logger.info("submit_user_input: future already done")
        return {"status": "input received"}
    else:
        raise HTTPException(
            status_code=404, detail="Workflow not found or future not initialized"
        )

The download endpoint also identifies where the final file is located based on the workflow ID.

@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
    file_path = (
        Path(settings.WORKFLOW_ARTIFACTS_PATH)
        / "SlideGenerationWorkflow"
        / workflow_id
        / "final.pptx"
    )
    if file_path.exists():
        return FileResponse(
            path=file_path,
            media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            filename=f"final.pptx",
        )
    else:
        raise HTTPException(status_code=404, detail="File not found")

The Streamlit frontend

In the frontend page, after the user submits the research topic through st.text_input(), a long-running process is started in a background thread in a new event loop for receiving the streamed events from the backend, without interfering with the rest of the page:

def start_long_running_task(url, payload, message_queue, user_input_event):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(
            get_stream_data(url, payload, message_queue, user_input_event)
        )
        loop.close()
    except Exception as e:
        message_queue.put(("error", f"Exception in background thread: {str(e)}"))

...

def main():

  ...

  with st.sidebar:
      with st.form(key="slide_gen_form"):
          query = st.text_input(
              "Enter the topic of your research:",
          )
          submit_button = st.form_submit_button(label="Submit")

  if submit_button:
      # Reset the workflow_complete flag for a new workflow
      st.session_state.workflow_complete = False
      # Start the long-running task in a separate thread
      if (
          st.session_state.workflow_thread is None
          or not st.session_state.workflow_thread.is_alive()
      ):
          st.write("Starting the background thread...")

          st.session_state.workflow_thread = threading.Thread(
              target=start_long_running_task,
              args=(
                  "http://backend:80/run-slide-gen",
                  {"query": query},
                  st.session_state.message_queue,
                  st.session_state.user_input_event,
              ),
          )
          st.session_state.workflow_thread.start()
          st.session_state.received_lines = []
      else:
          st.write("Background thread is already running.")

The event data streamed from the backend is fetched by httpx.AsyncClient and put into a message queue for further processing. Different information is extracted depending on the event types. For event type "request_user_input", the thread is also paused until the user input is provided.

async def fetch_streaming_data(url: str, payload: dict = None):
    async with httpx.AsyncClient(timeout=1200.0) as client:
        async with client.stream("POST", url=url, json=payload) as response:
            async for line in response.aiter_lines():
                if line:
                    yield line

async def get_stream_data(url, payload, message_queue, user_input_event):
    # message_queue.put(("message", "Starting to fetch streaming data..."))
    data_json = None
    async for data in fetch_streaming_data(url, payload):
        if data:
            try:
                data_json = json.loads(data)
                if "workflow_id" in data_json:
                    # Send workflow_id to main thread
                    message_queue.put(("workflow_id", data_json["workflow_id"]))
                    continue
                elif "final_result" in data_json:
                    # Send final_result to main thread
                    message_queue.put(("final_result", data_json["final_result"]))
                    continue
                event_type = data_json.get("event_type")
                event_sender = data_json.get("event_sender")
                event_content = data_json.get("event_content")
                if event_type in ["request_user_input"]:
                    # Send the message to the main thread
                    message_queue.put(("user_input_required", data_json))
                    # Wait until user input is provided
                    user_input_event.wait()
                    user_input_event.clear()
                    continue
                else:
                    # Send the line to the main thread
                    message_queue.put(("message", format_workflow_info(data_json)))
            except json.JSONDecodeError:  # todo: is this necessary?
                message_queue.put(("message", data))
        if data_json and "final_result" in data_json or "final_result" in str(data):
            break  # Stop processing after receiving the final result

We store the messages in the st.session_state and use a st.expander() to display and update these streamed data.

if st.session_state.received_lines:
    with expander_placeholder.container():
        # Create or update the expander with the latest truncated line
        expander = st.expander(st.session_state.expander_label)
        for line in st.session_state.received_lines:
            expander.write(line)
            expander.divider()

To ensure the UI remains responsive and displays the event messages when they are being processed in a background thread, we use a customed autorefresh component to refresh the page at a set interval:

if not st.session_state.workflow_complete:
    st_autorefresh(interval=2000, limit=None, key="data_refresh")

When the streamed event is of type "request_user_input", we will display related information in a separate container and gather user feedback. As there can be multiple events that require user input from one workflow run, we put them in a message queue and make sure to assign a unique key to the st.feedback(), st.text_area() and st.button() that are linked to each event to ensure the widgets don't interfere with each other:

def gather_outline_feedback(placeholder):
    container = placeholder.container()
    with container:
        if st.session_state.user_input_required:
            data = st.session_state.user_input_prompt
            event_type = data.get("event_type")
            if event_type == "request_user_input":
                summary = data.get("event_content").get("summary")
                outline = data.get("event_content").get("outline")
                prompt_message = data.get("event_content").get(
                    "message", "Please review the outline."
                )

                # display the content for user input
                st.markdown("## Original Summary:")
                st.text_area("Summary", summary, disabled=True, height=400)
                st.divider()
                st.markdown("## Generated Slide Outline:")
                st.json(outline)
                st.write(prompt_message)

                # Define unique keys for widgets
                current_prompt = st.session_state.prompt_counter
                approval_key = f"approval_state_{current_prompt}"
                feedback_key = f"user_feedback_{current_prompt}"

                # Display the approval feedback widget
                approval = st.feedback("thumbs", key=approval_key)
                st.write(f"Current Approval state is: {approval}")
                logging.info(f"Current Approval state is: {approval}")

                # Display the feedback text area
                feedback = st.text_area(
                    "Please provide feedback if you have any:", key=feedback_key
                )

                # Handle the submission of user response
                if st.button(
                    "Submit Feedback", key=f"submit_response_{current_prompt}"
                ):
                    if not st.session_state.user_response_submitted:
                        # Retrieve approval and feedback using unique keys
                        approval_state = st.session_state.get(approval_key)
                        user_feedback = st.session_state.get(feedback_key, "")

                        # Ensure approval_state is valid
                        if approval_state not in [0, 1]:
                            st.error("Please select an approval option.")
                            return

                        user_response = {
                            "approval": (
                                ":material/thumb_down:"
                                if approval_state == 0
                                else ":material/thumb_up:"
                            ),
                            "feedback": user_feedback,
                        }
                        # Send the user's response to the backend

                        try:
                            response = requests.post(
                                "http://backend:80/submit_user_input",
                                json={
                                    "workflow_id": st.session_state.workflow_id,
                                    "user_input": json.dumps(user_response),
                                },
                            )
                            response.raise_for_status()
                            logging.info(
                                f"Backend response for submitting approval: {response.status_code}"
                            )
                        except requests.RequestException as e:
                            st.error(f"Failed to submit user input: {str(e)}")
                            return

     ...

In the end, when the workflow run finally finishes, the frontend client will get a response that contains the path to the final generated files (same slide deck in pdf format for rendering in the UI and pptx format for downloading as the final result). We display the pdf file and create a button for downloading the pptx file:

  if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
      download_url_pdf = st.session_state.download_url_pdf
      try:
          # Fetch the PDF content
          pdf_response = requests.get(download_url_pdf)
          pdf_response.raise_for_status()
          st.session_state.pdf_data = pdf_response.content

          st.markdown("### Generated Slide Deck:")
          # Display the PDF using an iframe
          st.markdown(
              f'',
              unsafe_allow_html=True,
          )
      except Exception as e:
          st.error(f"Failed to load the PDF file: {str(e)}")

  # Provide the download button for PPTX if available
  if (
      "download_url_pptx" in st.session_state
      and st.session_state.download_url_pptx
  ):
      download_url_pptx = st.session_state.download_url_pptx
      try:
          # Fetch the PPTX content
          pptx_response = requests.get(download_url_pptx)
          pptx_response.raise_for_status()
          pptx_data = pptx_response.content

          st.download_button(
              label="Download Generated PPTX",
              data=pptx_data,
              file_name="generated_slides.pptx",
              mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
          )
      except Exception as e:
          st.error(f"Failed to load the PPTX file: {str(e)}")

Putting everything together with docker-compose

We will create a multi-service Docker application with docker-compose to run the frontend and backend apps.

version: '3.8'

services:
  backend:
    build:
      context: ./backend
      args:
        - --no-cache
    ports:
      - "8000:80"
    networks:
      - app-network
    volumes:
      - .env:/app/.env
      - ./data:/app/data
      - ./workflow_artifacts:/app/workflow_artifacts
      - ~/.azure:/root/.azure

  frontend:
    build:
      context: ./frontend
      args:
        - --no-cache
    ports:
      - "8501:8501"
    networks:
      - app-network

networks:
  app-network:

That's it! Just run docker-compose up, and we now have an app that can run a research workflow based on the user's input query, prompt the user for feedback during the execution, and display the final result to the user.


Thank you for reading! Check out my GitHub for the complete implementation. I look forward to hearing your thoughts, input, and feedbacks. I work as a Data Science Consultant at Inmeta, part of Crayon Group. Feel free to connect with me on LinkedIn.

Tags: AI Genai Llamaindex Streamlit UI

Comment