initial commit

Logan Markewich
2023-12-29 15:55:13 -06:00
parent eaa9b6cdf8
commit 51a2ba85b1
17 changed files with 494 additions and 0 deletions
+4
@@ -0,0 +1,4 @@
*.zip
__pycache__
pika*
venv
+142
@@ -0,0 +1,142 @@
# LlamaIndex <> AWS
This repository contains the code needed to set up and configure a complete ingestion and retrieval API, deployed to AWS.
The following tech stack is used:
- AWS Lambda for ingestion and retrieval with LlamaIndex
- RabbitMQ for queuing ingestion jobs
- A custom docker image for ingesting data with LlamaIndex
- Hugging Face Text Embeddings Inference (TEI) for embedding our data
## Setup
First, ensure you have an AWS account. Ensure you have some quota room for G5 EC2 nodes.
Once you have an account, the following dependencies are needed:
- [awscli]()
- [eksctl]()
- [kubectl]()
- [krew]()
- [RabbitMQ krew package]()
- [Docker]()
### 1. Deploying Text Embeddings Inference
```bash
cd tei
sh setup.sh
```
This will create a cluster using eksctl, using g5.xlarge nodes. You can adjust the `--nodes` argument as needed, as well as the number of replicas in the `tei-deployment.yaml` file.
Note the public URL when you run `kubectl get svc`. The URL under `EXTERNAL-IP` will be used in `./worker/worker-deployment.yaml`.
For convenience, the `setup.sh` script prints the URL for you at the end.
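Once the service has an external URL, it can help to send it a test request before wiring it into the worker. The following is a minimal sketch, assuming the deployment exposes TEI's `/embed` route on port 80 and accepts a JSON body with an `inputs` field; the helper names `build_embed_request` and `embed` are made up for this example.

```python
import json
import urllib.request


def build_embed_request(base_url: str, texts: list[str]) -> urllib.request.Request:
    """Build a POST request for the /embed route of a TEI deployment."""
    payload = json.dumps({"inputs": texts}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/embed",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def embed(base_url: str, texts: list[str], timeout: float = 30.0) -> list[list[float]]:
    """Send the request and return one embedding vector per input text."""
    request = build_embed_request(base_url, texts)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read())
```

Calling `embed("http://<your TEI url>", ["hello world"])` should return a list containing a single vector once the pod is ready; a connection error usually means the load balancer or pod is still starting.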
### 2. Deploying RabbitMQ
The setup for RabbitMQ leverages an `operator` -- a Kubernetes abstraction that manages all the resources needed for running RabbitMQ on the cluster.
```bash
cd rabbitmq
sh setup.sh
```
RabbitMQ will be deployed on an EKS cluster created with `eksctl`, where each RabbitMQ node gets persistent storage provisioned through EBS. You'll notice some extra commands in the `setup.sh` file that install the EBS add-on and grant the IAM permissions needed for provisioning the storage.
Lastly, we use the `RabbitmqCluster` resource, made available by the RabbitMQ cluster operator that `setup.sh` installs, to easily create our cluster using mostly default configs. You can visit the [example repo]() for more complex RabbitMQ deployments.
The setup may take some time. Even after the setup script finishes, it takes a while for pods and storage to start. You can check the output of `kubectl get pods` or `kubectl describe pod <pod_name>` to see current status, or check your AWS EKS dashboard.
Note that the public URL printed at the end will be used in `./worker/worker-deployment.yaml`.
Once RabbitMQ is fully deployed, you can visit `<public_url>:15672` and log in with the username and password "guest" to monitor its status.
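Because the pods and load balancer can take a while to come up, a small polling helper is one way to find out when RabbitMQ is reachable. This is only a sketch: the function names are invented for illustration, and it checks TCP reachability of the AMQP port (5672) or management port (15672), not RabbitMQ's internal health.

```python
import socket
import time


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_for_port(host: str, port: int, attempts: int = 30, delay: float = 10.0) -> bool:
    """Poll host:port until it accepts connections or the attempts run out."""
    for _ in range(attempts):
        if port_is_open(host, port):
            return True
        time.sleep(delay)
    return False
```

For example, `wait_for_port("<public_url>", 5672)` returns `True` as soon as the AMQP port accepts connections, and `False` if it never does within the given attempts.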
### 3. Deploying the Worker
Our worker deployment will continuously consume messages from the RabbitMQ queue. Then, it will use our TEI deployment to embed documents and insert them into our vector db (cloud-hosted Weaviate, in this case, to simplify ingestion).
Before running anything here, you should:
- `cd` into the `worker/` folder
- modify the env vars in `worker/worker-deployment.yaml` to point to the appropriate RabbitMQ, TEI, and Weaviate URLs and credentials.
- modify the pipeline and vector store setup if needed in `worker.py`
- run `docker login` if not already logged in
- run `docker build -t <image_name> .`
- run `docker tag <image_name> <image_name>:<image_version>`
- run `docker push <image_name>:<image_version>`
- edit `worker-deployment.yaml` and adjust the line `image: lloganm/worker:1.4` under `containers` to point to your docker image
With these steps complete, we can simply run `sh ./setup.sh`, which will create a cluster, deploy our container, and set up a load balancer.
`kubectl get pods` will display when your pods are ready.
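The worker and the ingestion Lambda only need to agree on the shape of the queue message: a JSON object with a `user` string and a `documents` list of serialized documents. The sketch below shows that envelope in isolation; `encode_message` and `decode_message` are hypothetical helper names, not functions from this repository.

```python
import json


def encode_message(user: str, document_json: str) -> bytes:
    """Serialize one queue message carrying a single serialized document."""
    return json.dumps({"user": user, "documents": [document_json]}).encode("utf-8")


def decode_message(body: bytes) -> tuple[str, list[str]]:
    """Parse a queue message back into the user and the serialized documents."""
    data = json.loads(body.decode("utf-8"))
    return data["user"], data["documents"]
```

Sending one document per message keeps each message small and lets several worker replicas share the load.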
### 4. Configuring AWS Lambda for Ingestion
Lastly, we need to configure AWS Lambda as a public endpoint to send data into our queue for processing.
While this can be done using the CLI, I preferred using the AWS UI for this.
First, update `ingestion_lambda/lambda_function.py` to point to the proper URL for your RabbitMQ deployment (from step 2 -- I hope you wrote that down!)
Then:
```bash
cd ingestion_lambda
sh setup.sh
```
This creates a zip file with our lambda function, as well as all the dependencies needed to run the lambda function (namely just the `pika` package).
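Before uploading, it can be worth checking that the archive has the layout Lambda expects. The sketch below assumes the default handler setting `lambda_function.lambda_handler`, which needs `lambda_function.py` at the root of the zip, and that `pika` was installed alongside it; `missing_from_package` is a name invented for this example.

```python
import zipfile


def missing_from_package(
    zip_path: str,
    required: tuple[str, ...] = ("lambda_function.py", "pika/__init__.py"),
) -> list[str]:
    """Return the required entries that are absent from the zip archive."""
    with zipfile.ZipFile(zip_path) as archive:
        names = set(archive.namelist())
    return [name for name in required if name not in names]
```

An empty list from `missing_from_package("../ingestion_lambda.zip")` suggests the package is laid out as expected.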
With our zip package, we can create our lambda function:
- Open the Lambda console
- Click `create function`
- Choose the Python 3.11 runtime and give the function a name
- Click `create function` at the bottom
- In the Lambda editor, click the `upload from` button and select `.zip file`, then upload the zip file we created earlier.
- Click deploy!
- Your public `Function URL` will show up in the top panel, or under `Configuration`
## Ingesting your Data
Once everything is deployed, you have a fully working ETL pipeline with LlamaIndex.
You can run ingestion by sending a POST request to the `Function URL` of your Lambda function:
```python
import requests
from llama_index import SimpleDirectoryReader

documents = SimpleDirectoryReader("./data").load_data()

# this will also be the namespace for the vector store -- for Weaviate, it needs to start with a capital letter and be alphanumeric only
user = "Loganm"

body = {
    'user': user,
    'documents': [doc.json() for doc in documents]
}

# use the URL of our lambda function here
response = requests.post("https://vguwrj5wc4wsd5lhgbgn37itay0lmkls.lambda-url.us-east-1.on.aws", json=body)
print(response.text)
```
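Since the `user` value doubles as the vector store namespace, it may be worth validating it on the client before sending anything. The sketch below applies the rule stated in the comment above (a capital first letter, then letters and digits only) and capitalizes the first character the way the worker does; `to_namespace` is a hypothetical helper, and the exact rule enforced by Weaviate may differ.

```python
import re

# Assumption taken from the comment above: a capital first letter followed by
# ASCII letters and digits only.
_NAMESPACE_PATTERN = re.compile(r"[A-Z][A-Za-z0-9]*")


def to_namespace(user: str) -> str:
    """Capitalize the first character and reject names outside the assumed rule."""
    if not user:
        raise ValueError("user must not be empty")
    candidate = user[0].upper() + user[1:]
    if not _NAMESPACE_PATTERN.fullmatch(candidate):
        raise ValueError(f"invalid namespace: {candidate!r}")
    return candidate
```

Using `user = to_namespace("loganm")` yields `"Loganm"`, while a value such as `"logan-m"` raises a `ValueError` before any request is made.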
## Using your Data
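Once you've ingested data, querying with llama-index is a breeze. By default, our pipeline puts the data into Weaviate, under a class prefix equal to the `user` value sent at ingestion. Query text must be embedded with the same model used at ingestion (`BAAI/bge-large-en-v1.5`, served by TEI), so make sure the index is configured with that embedding model before querying.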
```python
from llama_index import VectorStoreIndex
from llama_index.vector_stores import WeaviateVectorStore
import weaviate
auth_config = weaviate.AuthApiKey(api_key="...")
client = weaviate.Client(url="...", auth_client_secret=auth_config)
vector_store = WeaviateVectorStore(weaviate_client=client, class_prefix="Loganm")
index = VectorStoreIndex.from_vector_store(vector_store)
```
+35
@@ -0,0 +1,35 @@
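import json

import pika


def lambda_handler(event, context):
    # A Lambda Function URL delivers the POST payload as a JSON string under
    # 'body'; a direct invocation passes the payload as the event itself.
    payload = event
    if isinstance(event.get('body'), str):
        try:
            payload = json.loads(event['body'])
        except json.JSONDecodeError:
            payload = {}
    if not isinstance(payload, dict):
        payload = {}

    user = payload.get('user', '')
    documents = payload.get('documents', [])

    if not user or not documents:
        return {
            'statusCode': 400,
            'body': json.dumps('Missing user or documents')
        }

    credentials = pika.PlainCredentials("guest", "guest")
    # Replace the host with the RabbitMQ URL from your own deployment (step 2).
    parameters = pika.ConnectionParameters(
        host="a5c51e88038e34e18ac2e8fc6e6281e7-1376501245.us-east-1.elb.amazonaws.com",
        port=5672,
        credentials=credentials,
    )
    connection = pika.BlockingConnection(parameters=parameters)
    channel = connection.channel()
    channel.queue_declare(queue='etl')

    # Publish one message per document so workers can process them independently.
    for document in documents:
        data = {
            'user': user,
            'documents': [document]
        }
        channel.basic_publish(
            exchange="",
            routing_key='etl',
            body=json.dumps(data)
        )

    # Close the connection so it is not left open after the handler returns.
    connection.close()

    return {
        'statusCode': 200,
        'body': json.dumps('Documents queued for ingestion')
    }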
+1
@@ -0,0 +1 @@
pika==1.3.2
+5
@@ -0,0 +1,5 @@
#!/bin/sh
pip install -r requirements.txt -t .
zip -r9 ../ingestion_lambda.zip . -x "*.git*" "*setup.sh*" "*requirements.txt*" "*.zip*"
+22
@@ -0,0 +1,22 @@
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: production-rabbitmqcluster
spec:
  replicas: 2
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 1
      memory: 2Gi
  rabbitmq:
    additionalConfig: |
      log.console.level = info
      channel_max = 1700
      default_user = guest
      default_pass = guest
      default_user_tags.administrator = true
  service:
    type: LoadBalancer
+37
@@ -0,0 +1,37 @@
#!/bin/sh
# had to add these zones, else it fails to deploy
eksctl create cluster --name mqCluster --zones us-east-1a,us-east-1b,us-east-1c,us-east-1d,us-east-1f
sleep 5
eksctl utils associate-iam-oidc-provider --cluster=mqCluster --region us-east-1 --approve
sleep 5
eksctl create iamserviceaccount \
    --name ebs-csi-controller-sa \
    --namespace kube-system \
    --cluster mqCluster \
    --role-name AmazonEKS_EBS_CSI_DriverRole \
    --role-only \
    --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \
    --approve
sleep 5
eksctl create addon --name aws-ebs-csi-driver --cluster mqCluster --service-account-role-arn arn:aws:iam::$(aws sts get-caller-identity --query Account --output text):role/AmazonEKS_EBS_CSI_DriverRole --force
sleep 5
kubectl apply -f https://github.com/rabbitmq/cluster-operator/releases/latest/download/cluster-operator.yml
sleep 5
kubectl apply -f rabbitmqcluster.yaml
sleep 5
echo "RabbitMQ URL is: $(kubectl get svc production-rabbitmqcluster -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')"
echo "Note: It may take some time for pods and storage to be ready. Run 'kubectl get pods' to check status."
+33
@@ -0,0 +1,33 @@
import json

import pika
from llama_index import Document

# Replace with the RabbitMQ URL printed at the end of setup.sh
rabbitmq_url = "a3ad05b37871d4dd4a5dfbd8c573230e-623959034.us-east-1.elb.amazonaws.com"
rabbitmq_user = "guest"
rabbitmq_password = "guest"

credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
parameters = pika.ConnectionParameters(
    host=rabbitmq_url,
    port=5672,
    credentials=credentials
)

connection = pika.BlockingConnection(parameters=parameters)
channel = connection.channel()
channel.queue_declare(queue='etl')

# Publish a single test message in the same format the worker expects
documents = [Document(text="logan")]
data = {
    'user': "Logan",  # must be upper-case
    'documents': [document.json() for document in documents]
}
channel.basic_publish(exchange="", routing_key='etl', body=json.dumps(data))


def callback(ch, method, properties, body):
    # Print the message we just published to confirm the round trip
    print(body, flush=True)
    print("Success! Use `ctrl+c` to exit.", flush=True)


channel.basic_consume(queue='etl', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
+13
@@ -0,0 +1,13 @@
#!/bin/sh
eksctl create cluster --name embeddings --node-type=g5.xlarge --nodes 1
sleep 5
kubectl create -f ./tei-deployment.yaml
sleep 5
kubectl create -f ./tei-service.yaml
echo "Embeddings URL is: $(kubectl get svc tei-service -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')"
+22
@@ -0,0 +1,22 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: tei-deployment
  labels:
    app: tei-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: tei-app
  template:
    metadata:
      labels:
        app: tei-app
    spec:
      containers:
      - name: tei-app
        image: ghcr.io/huggingface/text-embeddings-inference:86-0.6
        ports:
        - containerPort: 80
        args: ["--model-id", "BAAI/bge-large-en-v1.5", "--revision", "refs/pr/5"]
+13
@@ -0,0 +1,13 @@
---
apiVersion: v1
kind: Service
metadata:
  name: tei-service
spec:
  type: LoadBalancer
  selector:
    app: tei-app
  ports:
  - protocol: TCP
    port: 80
    targetPort: 80
+13
@@ -0,0 +1,13 @@
FROM python:3.11-alpine
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python", "worker.py"]
+5
@@ -0,0 +1,5 @@
fastapi==0.108.0
llama-index==0.9.22
pika==1.3.2
uvicorn==0.25.0
weaviate-client==3.26.0
+11
@@ -0,0 +1,11 @@
#!/bin/sh
eksctl create cluster --name mq-workers --zones us-east-1a,us-east-1b,us-east-1c,us-east-1d,us-east-1f
sleep 5
kubectl create -f ./worker-deployment.yaml
sleep 5
kubectl create -f ./worker-service.yaml
+38
@@ -0,0 +1,38 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mq-worker-deployment
  labels:
    app: mq-worker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mq-worker
  template:
    metadata:
      labels:
        app: mq-worker
    spec:
      containers:
      - name: mq-worker
        image: lloganm/worker:1.4
        env:
        - name: WEAVIATE_API_KEY
          value: <your api key>
        - name: WEAVIATE_URL
          value: <your weaviate url>
        - name: RABBITMQ_URL
          value: <your rabbitmq url>
        - name: RABBITMQ_USER
          value: guest
        - name: RABBITMQ_PASSWORD
          value: guest
        - name: TEI_URL
          value: <your TEI url>
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: 4Gi
            cpu: "0.25"
+13
@@ -0,0 +1,13 @@
---
apiVersion: v1
kind: Service
metadata:
  name: mq-worker-service
spec:
  type: LoadBalancer
  selector:
    app: mq-worker
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
+87
@@ -0,0 +1,87 @@
import json
import os
import threading

import fastapi
import pika
import uvicorn
import weaviate
from llama_index.embeddings import TextEmbeddingsInference
from llama_index.ingestion import IngestionPipeline
from llama_index.text_splitter import TokenTextSplitter
from llama_index.schema import Document
from llama_index.vector_stores import WeaviateVectorStore

app = fastapi.FastAPI()


def worker_thread():
    """Worker thread that runs the ingestion pipeline using rabbitmq."""
    weaviate_api_key = os.environ['WEAVIATE_API_KEY']
    weaviate_url = os.environ['WEAVIATE_URL']
    auth_config = weaviate.AuthApiKey(api_key=weaviate_api_key)

    rabbitmq_url = os.environ['RABBITMQ_URL']
    rabbitmq_user = os.environ['RABBITMQ_USER']
    rabbitmq_password = os.environ['RABBITMQ_PASSWORD']

    credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
    parameters = pika.ConnectionParameters(
        host=rabbitmq_url,
        port=5672,
        credentials=credentials
    )

    connection = pika.BlockingConnection(parameters=parameters)
    channel = connection.channel()
    channel.queue_declare(queue='etl')

    def callback(ch, method, properties, body):
        try:
            data = json.loads(body.decode('utf-8'))
            documents = [Document.parse_raw(d) for d in data['documents']]

            # the user doubles as the vector store namespace, which must start with a capital letter
            user = data['user']
            user = user[0].upper() + user[1:]

            client = weaviate.Client(url=weaviate_url, auth_client_secret=auth_config)
            vector_store = WeaviateVectorStore(weaviate_client=client, class_prefix=user)

            tei_url = os.environ['TEI_URL']

            # setup pipeline
            ingestion_pipeline = IngestionPipeline(
                transformations=[
                    TokenTextSplitter(chunk_size=512),
                    TextEmbeddingsInference(
                        base_url=tei_url,
                        embed_batch_size=10,
                        model_name="BAAI/bge-large-en-v1.5"
                    ),
                ],
                vector_store=vector_store,
            )

            # ingest data directly into the user's vector db
            ingestion_pipeline.run(documents=documents)
        except Exception as e:
            print("Error during ingestion pipeline: ", e)

    channel.basic_consume(queue='etl', on_message_callback=callback, auto_ack=True)
    channel.start_consuming()


@app.get('/health')
def health():
    return {'status': 'ok'}


if __name__ == '__main__':
    # start worker thread
    threading.Thread(target=worker_thread).start()

    # start webserver
    uvicorn.run(app, host='0.0.0.0', port=8000)