diff --git a/community/google_document_ai/.env.Example b/community/google_document_ai/.env.Example new file mode 100644 index 00000000..2ae62d62 --- /dev/null +++ b/community/google_document_ai/.env.Example @@ -0,0 +1,16 @@ + +OPENAI_API_KEY= +GOOGLE_APPLICATION_CREDENTIALS={ + "type": "service_account", +} + +GOOGLE_PROCESSOR_ID= +GOOGLE_PROJECT_ID= +GOOGLE_LOCATION= + +# Restack Cloud (Optional) + +# RESTACK_ENGINE_ID= +# RESTACK_ENGINE_API_KEY= +# RESTACK_ENGINE_ADDRESS= +# RESTACK_ENGINE_API_ADDRESS= diff --git a/community/google_document_ai/.gitignore b/community/google_document_ai/.gitignore new file mode 100644 index 00000000..0dd162c4 --- /dev/null +++ b/community/google_document_ai/.gitignore @@ -0,0 +1,4 @@ +.DS_Store +.env +poetry.lock +service_account.json \ No newline at end of file diff --git a/community/google_document_ai/Dockerfile b/community/google_document_ai/Dockerfile new file mode 100644 index 00000000..3d200846 --- /dev/null +++ b/community/google_document_ai/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install necessary system packages +RUN apt-get update && apt-get install -y \ + libglib2.0-0 \ + libgl1-mesa-glx \ + && rm -rf /var/lib/apt/lists/* + +RUN pip install poetry + +COPY pyproject.toml ./ + +COPY . . + +# Configure poetry to not create virtual environment +RUN poetry config virtualenvs.create false + +# Install dependencies +RUN poetry install --no-interaction --no-ansi + +# Expose port 80 +EXPOSE 80 + +CMD poetry run python -m src.services \ No newline at end of file diff --git a/community/google_document_ai/README.md b/community/google_document_ai/README.md new file mode 100644 index 00000000..431646b8 --- /dev/null +++ b/community/google_document_ai/README.md @@ -0,0 +1,68 @@ +# Restack AI - PDF OCR and LLM summary + +## Motivation + +Demonstrates how to scale multi step workflows. +Use pytorch to OCR and OpenAI to make a summary. + +## Prerequisites + +- Docker (for running Restack) +- Python 3.10 or higher +- Poetry (for dependency management) + +## Start Restack + +To start the Restack, use the following Docker command: + +```bash +docker run -d --pull always --name restack -p 5233:5233 -p 6233:6233 -p 7233:7233 ghcr.io/restackio/restack:main +``` + +## Start python shell + +```bash +poetry env use 3.12 && poetry shell +``` + +## Install dependencies + +```bash +poetry install +``` + +```bash +poetry env info # Optional: copy the interpreter path to use in your IDE (e.g. Cursor, VSCode, etc.) +``` + +```bash +poetry run dev +``` + +## Run workflows + +### from UI + +You can run workflows from the UI by clicking the "Run" button. + +![Run workflows from UI](./ui-screenshot.png) + +### from API + +You can run workflows from the API by using the generated endpoint: + +`POST http://localhost:6233/api/workflows/TranscribeTranslateWorkflow` + +### from any client + +You can run workflows with any client connected to Restack, for example: + +```bash +poetry run schedule +``` + +executes `schedule_workflow.py` which will connect to Restack and execute the `TranscribeTranslateWorkflow` workflow. + +## Deploy on Restack Cloud + +To deploy the application on Restack, you can create an account at [https://console.restack.io](https://console.restack.io) diff --git a/community/google_document_ai/pyproject.toml b/community/google_document_ai/pyproject.toml new file mode 100644 index 00000000..85921b9e --- /dev/null +++ b/community/google_document_ai/pyproject.toml @@ -0,0 +1,28 @@ +[tool.poetry] +name = "google_document_ai" +version = "0.0.1" +description = "A simple example to use Google Document AI to OCR a pdf and then use OpenAI to summarize the text" +authors = [ + "Restack Team ", +] +readme = "README.md" +packages = [{include = "src"}] + +[tool.poetry.dependencies] +python = ">=3.12,<4.0" +openai = "^1.57.2" +python-dotenv = "1.0.1" +pydantic = "^2.10.3" +google-cloud-documentai = "^3.1.0" +restack-ai = "^0.0.54" +watchfiles = "^1.0.4" +python-doctr = {extras = ["torch"], version = "^0.10.0"} + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +dev = "src.services:watch_services" +services = "src.services:run_services" + diff --git a/community/google_document_ai/screenshot-run.png b/community/google_document_ai/screenshot-run.png new file mode 100644 index 00000000..b4ed16ef Binary files /dev/null and b/community/google_document_ai/screenshot-run.png differ diff --git a/community/google_document_ai/src/__init__.py b/community/google_document_ai/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/google_document_ai/src/client.py b/community/google_document_ai/src/client.py new file mode 100644 index 00000000..b71efb06 --- /dev/null +++ b/community/google_document_ai/src/client.py @@ -0,0 +1,20 @@ +import os +from restack_ai import Restack +from restack_ai.restack import CloudConnectionOptions +from dotenv import load_dotenv +# Load environment variables from a .env file +load_dotenv() + + +engine_id = os.getenv("RESTACK_ENGINE_ID") +address = os.getenv("RESTACK_ENGINE_ADDRESS") +api_key = os.getenv("RESTACK_ENGINE_API_KEY") +api_address = os.getenv("RESTACK_ENGINE_API_ADDRESS") + +connection_options = CloudConnectionOptions( + engine_id=engine_id, + address=address, + api_key=api_key, + api_address=api_address +) +client = Restack(connection_options) \ No newline at end of file diff --git a/community/google_document_ai/src/functions/__init__.py b/community/google_document_ai/src/functions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/google_document_ai/src/functions/google_client.py b/community/google_document_ai/src/functions/google_client.py new file mode 100644 index 00000000..03048d6e --- /dev/null +++ b/community/google_document_ai/src/functions/google_client.py @@ -0,0 +1,18 @@ +import os +import json +from google.oauth2 import service_account +from google.cloud import documentai_v1 as documentai + +def google_client(): + credentials_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'service_account.json') + if not os.path.exists(credentials_path): + raise FileNotFoundError("Service account credentials file not found.") + with open(credentials_path, 'r') as file: + credentials_dict = json.load(file) + if not credentials_dict: + raise ValueError("Credentials are not properly configured in the service account file.") + credentials = service_account.Credentials.from_service_account_info(credentials_dict) + client = documentai.DocumentProcessorServiceAsyncClient(credentials=credentials) + if not client: + raise Exception("Failed to initialize the Document AI client with the provided credentials.") + return client \ No newline at end of file diff --git a/community/google_document_ai/src/functions/google_doc_ai.py b/community/google_document_ai/src/functions/google_doc_ai.py new file mode 100644 index 00000000..0eb1e403 --- /dev/null +++ b/community/google_document_ai/src/functions/google_doc_ai.py @@ -0,0 +1,35 @@ +import os +import requests +from restack_ai.function import function, FunctionFailure +from pydantic import BaseModel, Field +from google.cloud import documentai_v1 as documentai + +from .google_client import google_client + +class OcrInput(BaseModel): + file_type: str = Field(default='application/pdf') + file_name: str + +@function.defn() +async def google_doc_ai_pdf(input: OcrInput): + try: + response = requests.get(f"{os.getenv('RESTACK_API_ADDRESS', 'http://localhost:6233')}/api/download/{input.file_name}") + response.raise_for_status() + content = response.content + + if input.file_type != "application/pdf": + raise FunctionFailure("Unsupported file type", non_retryable=True) + + from doctr.io import DocumentFile + doc = DocumentFile.from_pdf(content) + except Exception as e: + raise FunctionFailure("Error downloading file", non_retryable=True) + + client = google_client() + name = f"projects/{os.getenv('GOOGLE_PROJECT_ID')}/locations/{os.getenv('GOOGLE_LOCATION')}/processors/{os.getenv('GOOGLE_PROCESSOR_ID')}" + + inline_document = documentai.Document(content=content, mime_type=input.file_type) + request = documentai.ProcessRequest(inline_document=inline_document, name=name) + response = await client.process_document(request=request) + + return response.document.text diff --git a/community/google_document_ai/src/functions/openai_chat.py b/community/google_document_ai/src/functions/openai_chat.py new file mode 100644 index 00000000..fa259bca --- /dev/null +++ b/community/google_document_ai/src/functions/openai_chat.py @@ -0,0 +1,38 @@ +from pydantic import BaseModel +from restack_ai.function import function, log, FunctionFailure +from openai import OpenAI +import os +from dotenv import load_dotenv + +load_dotenv() + +class OpenAiChatInput(BaseModel): + user_content: str + system_content: str | None = None + model: str | None = None + +@function.defn() +async def openai_chat(input: OpenAiChatInput) -> str: + try: + log.info("openai_chat function started", input=input) + + + if (os.environ.get("OPENAI_API_KEY") is None): + raise FunctionFailure("OPENAI_API_KEY is not set", non_retryable=True) + + client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY")) + + messages = [] + if input.system_content: + messages.append({"role": "system", "content": input.system_content}) + messages.append({"role": "user", "content": input.user_content}) + + response = client.chat.completions.create( + model=input.model or "gpt-4o-mini", + messages=messages + ) + log.info("openai_chat function completed", response=response) + return response.choices[0].message.content + except Exception as e: + log.error("openai_chat function failed", error=e) + raise e diff --git a/community/google_document_ai/src/services.py b/community/google_document_ai/src/services.py new file mode 100644 index 00000000..08681d5b --- /dev/null +++ b/community/google_document_ai/src/services.py @@ -0,0 +1,33 @@ +import asyncio +from src.functions.openai_chat import openai_chat +from src.client import client +from src.workflows.pdf import PdfWorkflow +from src.workflows.files import FilesWorkflow +from src.functions.google_doc_ai import google_doc_ai_pdf +from watchfiles import run_process +import webbrowser +import os + +async def main(): + + await asyncio.gather( + await client.start_service( + workflows= [PdfWorkflow, FilesWorkflow], + functions= [google_doc_ai_pdf, openai_chat] + ) + ) + +def run_services(): + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Service interrupted by user. Exiting gracefully.") + +def watch_services(): + watch_path = os.getcwd() + print(f"Watching {watch_path} and its subdirectories for changes...") + webbrowser.open("http://localhost:5233") + run_process(watch_path, recursive=True, target=run_services) + +if __name__ == "__main__": + run_services() \ No newline at end of file diff --git a/community/google_document_ai/src/workflows/__init__.py b/community/google_document_ai/src/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/google_document_ai/src/workflows/files.py b/community/google_document_ai/src/workflows/files.py new file mode 100644 index 00000000..71ed379d --- /dev/null +++ b/community/google_document_ai/src/workflows/files.py @@ -0,0 +1,39 @@ +from restack_ai.workflow import workflow, log, workflow_info +from typing import List +from pydantic import BaseModel, Field +import asyncio +from .pdf import PdfWorkflow, PdfWorkflowInput + + +class FilesWorkflowInput(BaseModel): + files_upload: List[dict] = Field(files=True) + +@workflow.defn() +class FilesWorkflow: + @workflow.run + async def run(self, input: FilesWorkflowInput): + tasks = [] + parent_workflow_id = workflow_info().workflow_id + + for index, pdf_input in enumerate(input.files_upload, start=1): + log.info(f"Queue PdfWorkflow {index} for execution") + # Ensure child workflows are started and return an awaitable + task = workflow.child_execute( + PdfWorkflow, + workflow_id=f"{parent_workflow_id}-pdf-{index}", + input=PdfWorkflowInput( + file_upload=[pdf_input] + ) + ) + # Wrap the task in an asyncio.ensure_future to ensure it's awaitable + tasks.append(asyncio.ensure_future(task)) + + # Await all tasks at once to run them in parallel + results = await asyncio.gather(*tasks) + + for i, result in enumerate(results, start=1): + log.info(f"PdfWorkflow {i} completed", result=result) + + return { + "results": results + } diff --git a/community/google_document_ai/src/workflows/pdf.py b/community/google_document_ai/src/workflows/pdf.py new file mode 100644 index 00000000..50d9d683 --- /dev/null +++ b/community/google_document_ai/src/workflows/pdf.py @@ -0,0 +1,41 @@ +from restack_ai.workflow import workflow, import_functions, log, RetryPolicy +from datetime import timedelta +from pydantic import BaseModel,Field +from typing import List + +with import_functions(): + from src.functions.google_doc_ai import google_doc_ai_pdf, OcrInput + from src.functions.openai_chat import openai_chat, OpenAiChatInput + +class PdfWorkflowInput(BaseModel): + file_upload: List[dict] = Field(files=True) + +@workflow.defn() +class PdfWorkflow: + @workflow.run + async def run(self, input: PdfWorkflowInput): + log.info("PdfWorkflow started") + + ocr_result = await workflow.step( + google_doc_ai_pdf, + OcrInput( + file_type=input.file_upload[0]['type'], + file_name=input.file_upload[0]['name'] + ), + start_to_close_timeout=timedelta(seconds=120), + retry_policy=RetryPolicy( + maximum_attempts=1 + ) + ) + + llm_result = await workflow.step( + openai_chat, + OpenAiChatInput( + user_content=f"Make a summary of that PDF. Here is the OCR result: {ocr_result}", + model="gpt-4o-mini" + ), + start_to_close_timeout=timedelta(seconds=120) + ) + + log.info("PdfWorkflow completed") + return llm_result