You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

143 lines
4.5 KiB

import json
import os
import time
import logging
from datetime import datetime
from elasticsearch import Elasticsearch, helpers
# Configurazione logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configurazione Elasticsearch
ES_HOST = os.environ.get('ES_HOST', 'elasticsearch')
ES_PORT = os.environ.get('ES_PORT', '9200')
INDEX_NAME = 'geodata'
GEOJSON_FILE = '/app/data/sample.geojson'
def wait_for_elasticsearch(es):
"""Attende che Elasticsearch sia pronto."""
while True:
try:
if es.ping():
logging.info("Elasticsearch è pronto!")
break
except Exception:
pass
logging.info("In attesa di Elasticsearch...")
time.sleep(5)
def create_index(es):
"""Crea l'indice con il mapping corretto per i dati geospaziali."""
mapping = {
"mappings": {
"properties": {
"location": {
"type": "geo_point"
},
"timestamp": {
"type": "date"
},
"status": {
"type": "keyword"
},
"operator": {
"type": "keyword"
},
"duration": {
"type": "integer"
},
"timing_status": {
"type": "keyword"
},
"lead_time": {
"type": "integer"
},
"task_type": {
"type": "keyword"
},
"min_duration": {
"type": "integer"
},
"max_duration": {
"type": "integer"
},
"avg_duration": {
"type": "integer"
},
"task_duration": {
"type": "integer"
},
"id": {
"type": "integer"
}
}
}
}
if es.indices.exists(index=INDEX_NAME):
logging.info(f"L'indice '{INDEX_NAME}' esiste già.")
else:
es.indices.create(index=INDEX_NAME, body=mapping)
logging.info(f"Indice '{INDEX_NAME}' creato con successo.")
def process_geojson(file_path):
"""Legge il file GeoJSON e prepara i documenti per Elasticsearch."""
with open(file_path, 'r') as f:
data = json.load(f)
actions = []
for feature in data.get('features', []):
geometry = feature.get('geometry')
properties = feature.get('properties', {})
# Assicuriamoci che ci sia una geometria di tipo Point
if geometry and geometry.get('type') == 'Point':
lon, lat = geometry.get('coordinates')
# Creiamo il documento
doc = {
"_index": INDEX_NAME,
"_source": {
"location": {
"lat": lat,
"lon": lon
},
# Usa il timestamp presente o quello attuale
"timestamp": properties.get('timestamp', datetime.now().isoformat()),
"status": properties.get('status', 'UNKNOWN'),
"operator": properties.get('operator', 'Unknown Operator'),
"duration": properties.get('duration', 0),
**properties # Includi tutte le altre proprietà
}
}
actions.append(doc)
return actions
def main():
es = Elasticsearch([f"http://{ES_HOST}:{ES_PORT}"])
wait_for_elasticsearch(es)
create_index(es)
# Controllo se ci sono già dati per evitare duplicati
try:
count = es.count(index=INDEX_NAME)['count']
if count > 0:
logging.info(f"L'indice '{INDEX_NAME}' contiene già {count} documenti. Salto il caricamento.")
return
except Exception:
pass
if os.path.exists(GEOJSON_FILE):
logging.info(f"Caricamento dati da {GEOJSON_FILE}...")
actions = process_geojson(GEOJSON_FILE)
if actions:
helpers.bulk(es, actions)
logging.info(f"Caricati {len(actions)} documenti in Elasticsearch.")
else:
logging.warning("Nessun dato valido trovato nel file GeoJSON.")
else:
logging.error(f"File {GEOJSON_FILE} non trovato. Assicurati di montare il volume correttamente.")
if __name__ == "__main__":
main()