"""Natural-language search over Elasticsearch, backed by Ollama and ChromaDB.

Pipeline: a user prompt is converted into Elasticsearch DSL queries by a
local Ollama model, the queries are executed against the ``havoc-*``
indices, the hits are archived in a ChromaDB collection, and a final
answer is generated from the results by the same Ollama model.
"""

import hashlib
import json
import logging
import os
from pathlib import Path  # NOTE(review): currently unused; kept for compatibility
from typing import Any, Dict, List

import chromadb
import requests
from chromadb.utils import embedding_functions
from dotenv import load_dotenv
from elasticsearch import Elasticsearch

# Load environment variables from a .env file, if one is present.
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ElasticSearchAI:
    """Orchestrates Ollama query generation, Elasticsearch execution, and
    ChromaDB result storage for natural-language search requests."""

    def __init__(self):
        """Build the Elasticsearch, ChromaDB, and Ollama clients from env vars.

        Environment variables read: ES_HOST/ES_PORT/ES_USERNAME/ES_PASSWORD,
        ES_USE_SSL/ES_VERIFY_CERTS/ES_CA_CERT_PATH, OLLAMA_HOST/OLLAMA_PORT/
        OLLAMA_MODEL/OLLAMA_TIMEOUT.
        """
        # --- Elasticsearch --------------------------------------------------
        # NOTE(review): hosts are built without an http(s):// scheme;
        # elasticsearch-py 8.x requires a full URL — confirm client version.
        es_config = {
            'hosts': [f"{os.getenv('ES_HOST', 'localhost')}:{os.getenv('ES_PORT', '9200')}"],
            'basic_auth': (
                os.getenv('ES_USERNAME', 'elastic'),
                os.getenv('ES_PASSWORD', '')
            ),
        }

        if os.getenv('ES_USE_SSL', 'false').lower() == 'true':
            # NOTE(review): 'use_ssl' is a 7.x-era option; in 8.x TLS is
            # selected by the URL scheme instead — confirm against the
            # installed elasticsearch-py version.
            es_config.update({
                'use_ssl': True,
                'verify_certs': os.getenv('ES_VERIFY_CERTS', 'true').lower() == 'true',
                'ca_certs': os.getenv('ES_CA_CERT_PATH'),
            })

        self.es = Elasticsearch(**es_config)

        # --- ChromaDB -------------------------------------------------------
        # In-process (ephemeral) client. The CHROMADB_HOST/PORT/PERSISTENCE_DIR
        # env vars are deliberately not read here: the previous version built a
        # config dict from them and then never used it (dead code).
        self.chroma_client = chromadb.Client()
        # get_or_create avoids a crash when the collection already exists,
        # e.g. when a second instance is created in the same process.
        self.collection = self.chroma_client.get_or_create_collection(
            name="search_results",
            embedding_function=embedding_functions.DefaultEmbeddingFunction(),
        )

        # Cached result of get_index_information(); None until first fetch.
        self.index_info = None

        # --- Ollama ---------------------------------------------------------
        self.ollama_url = f"http://{os.getenv('OLLAMA_HOST', 'localhost')}:{os.getenv('OLLAMA_PORT', '11434')}"
        self.ollama_model = os.getenv('OLLAMA_MODEL', 'llama2')
        # HTTP timeout (seconds) for Ollama calls; without it a hung model
        # server would block the pipeline forever.
        self.request_timeout = float(os.getenv('OLLAMA_TIMEOUT', '120'))

    def get_index_information(self) -> Dict:
        """Fetch mappings, metadata descriptions, and stats for havoc-* indices.

        Caches the combined result on ``self.index_info`` and returns it.
        Raises whatever the Elasticsearch client raised after logging it.
        """
        try:
            # Mappings/settings and statistics come from two separate APIs.
            indices_info = self.es.indices.get("havoc-*")
            indices_stats = self.es.indices.stats(index="havoc-*")

            index_data = {}
            for index_name, info in indices_info.items():
                # Descriptions live in custom index.metadata settings; fall
                # back to placeholder text when an index has none.
                metadata = info["settings"]["index"].get("metadata", {})
                index_data[index_name] = {
                    "mappings": info["mappings"],
                    "description": {
                        "short": metadata.get("desc_short", "No short description available"),
                        "long": metadata.get("desc_long", "No long description available"),
                    },
                    "stats": {
                        "doc_count": indices_stats["indices"][index_name]["total"]["docs"]["count"],
                        "size_bytes": indices_stats["indices"][index_name]["total"]["store"]["size_in_bytes"],
                    },
                }

            self.index_info = index_data
            return index_data
        except Exception as e:
            logger.error("Error gathering index information: %s", e)
            raise

    def query_ollama(self, user_prompt: str) -> List[Dict]:
        """Ask Ollama to translate *user_prompt* into Elasticsearch DSL.

        Returns the model's JSON output parsed into a list of
        ``{"index": ..., "queries": [...]}`` objects.
        Raises requests.HTTPError on a bad HTTP status and
        json.JSONDecodeError when the model reply is not valid JSON.
        """
        # Make sure the prompt can describe the available indices.
        if not self.index_info:
            self.get_index_information()

        index_descriptions = []
        for index_name, info in self.index_info.items():
            index_descriptions.append(f"""
            Index: {index_name}
            Short description: {info['description']['short']}
            Long description: {info['description']['long']}
            Document count: {info['stats']['doc_count']:,}
            Size: {info['stats']['size_bytes'] / (1024*1024*1024):.2f} GB
            Mapping: {json.dumps(info['mappings'], indent=2)}
            """)

        system_prompt = f"""
        You are an Elasticsearch query generator. Convert the following natural language query into Elasticsearch DSL queries.

        Available indices and their information:
        {''.join(index_descriptions)}

        Rules:
        1. Return a JSON array where each object represents queries for a single index
        2. Each object should have format:
           {{"index": "index-name", "queries": [list of query objects for this index]}}
        3. Consider the mapping types when generating queries:
           - For text fields, use match/match_phrase for full-text search
           - For keyword fields, use term/terms for exact matches
           - For IP fields, use proper IP query syntax
           - For date fields, use date range queries when appropriate
           - For nested fields, use nested queries
        4. Each query should be a valid Elasticsearch DSL query object
        5. Only query indices that are relevant to the user's request

        User query: {user_prompt}
        """

        response = requests.post(
            f"{self.ollama_url}/api/generate",
            json={
                "model": self.ollama_model,
                "prompt": system_prompt,
                "format": "json",
            },
            timeout=self.request_timeout,
        )
        # Surface HTTP failures instead of crashing on a missing key below.
        response.raise_for_status()

        payload = response.json()["response"]
        try:
            return json.loads(payload)
        except json.JSONDecodeError:
            # Log the offending text so a bad model reply is debuggable.
            logger.error("Ollama returned a non-JSON query plan: %s", payload)
            raise

    def execute_search(self, query_data: List[Dict]) -> List[Dict[str, Any]]:
        """Run every generated query against its index; return all hits.

        A failing query is logged and skipped; it no longer aborts the
        remaining queries for the same index (the old try block wrapped the
        whole per-index loop).
        """
        results = []

        for index_queries in query_data:
            index = index_queries["index"]
            for query in index_queries["queries"]:
                try:
                    response = self.es.search(
                        index=index,
                        body={"query": query},
                    )
                    results.extend(response["hits"]["hits"])
                except Exception as e:
                    logger.error("Error searching index %s: %s", index, e)

        return results

    def store_in_chroma(self, results: List[Dict[str, Any]], query_id: str):
        """Archive search hits in ChromaDB under ids ``{query_id}_{i}``.

        No-op when *results* is empty (collection.add rejects empty batches).
        """
        if not results:
            logger.info("No results to store for query %s", query_id)
            return

        documents = []
        metadatas = []
        ids = []

        for i, result in enumerate(results):
            documents.append(json.dumps(result["_source"]))
            metadatas.append({
                "index": result["_index"],
                "score": result["_score"],
            })
            ids.append(f"{query_id}_{i}")

        self.collection.add(
            documents=documents,
            metadatas=metadatas,
            ids=ids,
        )

    def generate_response(self, user_prompt: str, results: List[Dict[str, Any]]) -> str:
        """Ask Ollama to summarize *results* as an answer to *user_prompt*.

        Raises requests.HTTPError on a bad HTTP status.
        """
        response_prompt = f"""
        Generate a response for the user's query: {user_prompt}

        Based on these search results: {json.dumps(results, indent=2)}

        If the user requested JSON output, return valid JSON.
        Otherwise, provide a natural language summary.
        """

        response = requests.post(
            f"{self.ollama_url}/api/generate",
            json={
                "model": self.ollama_model,
                "prompt": response_prompt,
            },
            timeout=self.request_timeout,
        )
        response.raise_for_status()

        return response.json()["response"]

    def process_query(self, user_prompt: str) -> str:
        """Run the full pipeline for one natural-language query."""
        # Ensure we have index information before generating queries.
        if not self.index_info:
            self.get_index_information()

        # Generate Elasticsearch queries using Ollama.
        query_data = self.query_ollama(user_prompt)

        # Execute the generated queries.
        results = self.execute_search(query_data)

        # Stable id for this prompt: built-in hash() is randomized per
        # process (PYTHONHASHSEED), which made stored ids unreproducible.
        query_id = hashlib.sha256(user_prompt.encode("utf-8")).hexdigest()[:16]
        self.store_in_chroma(results, query_id)

        # Generate and return the final answer.
        return self.generate_response(user_prompt, results)