Background


I needed to extract several months' worth of access logs from Elastic Cloud for analysis. While researching the task, I discovered that hundreds of millions of log records were generated per month, far exceeding the export limits of Kibana's built-in tools. To overcome this, I implemented a Python script that uses the Elasticsearch API with search_after pagination to retrieve the logs iteratively.


Challenges



API key creation


To authenticate API requests, I created an API key with the following role descriptors:


{
  "superuser": {
    "cluster": ["all"],
    "indices": [
      {
        "names": ["*"],
        "privileges": ["all"],
        "allow_restricted_indices": false
      },
      {
        "names": ["*"],
        "privileges": ["monitor", "read", "view_index_metadata", "read_cross_cluster", "manage"],
        "allow_restricted_indices": true
      }
    ]
  }
}
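
For reference, such a key can also be created programmatically through the Elasticsearch security API. Below is a minimal sketch using the Python client; the deployment URL, credentials, and key name are placeholders, not values from my setup:

from elasticsearch import Elasticsearch

# Connect as a user allowed to manage API keys (URL and credentials are placeholders)
es = Elasticsearch("https://YOUR_DEPLOYMENT.es.io:443", basic_auth=("elastic", "PASSWORD"))

# Role descriptors mirror the JSON shown above
resp = es.security.create_api_key(
    name="log-export-key",  # illustrative name
    role_descriptors={
        "superuser": {
            "cluster": ["all"],
            "indices": [
                {"names": ["*"], "privileges": ["all"],
                 "allow_restricted_indices": False},
                {"names": ["*"],
                 "privileges": ["monitor", "read", "view_index_metadata",
                                "read_cross_cluster", "manage"],
                 "allow_restricted_indices": True}
            ]
        }
    }
)
print(resp["encoded"])  # the value passed as api_key in the script below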



Elasticsearch query setup


I used Kibana's Dev Tools to construct a query before implementing it in Python. The query included:

- a wildcard filter on the user agent and a range filter on @timestamp;
- a point-in-time (PIT) context so that results stay consistent while paginating;
- sorting by _doc, an efficient order for scrolling through every match;
- an explicit fields list so only the attributes needed for analysis are returned.

Example query:


{
  "query": {
    "bool": {
      "filter": [
        {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}},
        {"range": {"@timestamp": {"gte": "now-2h", "lte": "now"}}}
      ]
    }
  },
  "size": 10000,
  "sort": [{"_doc": "desc"}], 
  "pit": {
     "id": "", 
     "keep_alive": "60m"
  },
  "fields": [
    "json.EdgeRequestHost","json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp"
  ]
}
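
Note that the empty pit.id above is a placeholder. When testing in Dev Tools, first open a point in time against the target index (the index name below is a placeholder) and paste the returned id into pit.id:

POST /EXAMPLE_INDEX/_pit?keep_alive=60m

The search itself is then sent to POST /_search without an index in the path, since the PIT already pins the index.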


Implementation using Python


Before starting, install the elasticsearch package in any convenient way, e.g., pip install elasticsearch.

I implemented a Python script using the elasticsearch library:


import json
import time
from datetime import datetime, timezone, timedelta
from elasticsearch import Elasticsearch

# Elasticsearch connection settings
ES_URL = ""
API_KEY = ""

# Query parameters
BATCH_SIZE = 10000 # Max 10000
OUTPUT_FILE = "logs.json"
INDEX = "EXAMPLE_INDEX"
KEEP_ALIVE = "60m"
TIME_WINDOW = 60  # Minutes

# Initialize Elasticsearch client
es = Elasticsearch(ES_URL, api_key=API_KEY, request_timeout=60, verify_certs=True)

def create_pit():
    """Create Point in Time"""
    return es.open_point_in_time(index=INDEX, keep_alive=KEEP_ALIVE)["id"]

def close_pit(pit_id):
    """Close Point in Time"""
    es.close_point_in_time(id=pit_id)

def get_query(pit_id, window_start, window_end, search_after=None):
    """Generate search query for a fixed time window.

    The window is computed once in fetch_logs() and passed in, so the range
    filter does not shift between paginated requests."""
    query = {
        "pit": {"id": pit_id, "keep_alive": KEEP_ALIVE},
        "size": BATCH_SIZE,
        "sort": [{"_doc": "desc"}],
        "query": {
            "bool": {
                "filter": [
                    {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}},
                    {"range": {"@timestamp": {"gte": window_start.isoformat(), "lte": window_end.isoformat(),
                                              "format": "strict_date_optional_time"}}}
                ]
            }
        },
        "fields": [
            "json.EdgeRequestHost", "json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp"
        ]
    }
    if search_after:
        query["search_after"] = search_after
    return query

def transform_hit(hit):
    """Transform a hit into the required flat record format"""
    fields = hit.get("fields", {})

    def first(name):
        """Return the first value of a field, or "-" if the field is absent"""
        return fields.get(name, ["-"])[0]

    return {
        # source.ip is only returned if it is added to the query's "fields" list
        "remote_ip": first("source.ip"),
        "remote_log": "-",
        "user": "-",
        "timestamp": first("json.EdgeStartTimestamp"),
        "request-path": first("json.EdgeRequestPath"),
        "request-host": first("json.EdgeRequestHost"),
        "status": first("json.ClientRequestStatusCode"),
        "response-bytes": "-",
        "time-take": "-",
        "referer": first("json.ClientRequestReferer"),
        "ua": first("json.ClientRequestUserAgent")
    }

def fetch_logs():
    """Fetch logs from Elasticsearch"""
    pit_id = None
    start_time = time.time()
    try:
        pit_id = create_pit()
        print("PIT opened")
        total_records = 0
        search_after = None
        # Fix the query window once so every page is filtered identically
        window_end = datetime.now(timezone.utc)
        window_start = window_end - timedelta(minutes=TIME_WINDOW)
        print("Starting logs extraction from Elasticsearch...")
        with open(OUTPUT_FILE, 'w', encoding='utf-8') as outfile:
            while True:
                response = es.search(**get_query(pit_id, window_start, window_end, search_after))
                hits = response.get("hits", {}).get("hits", [])
                if not hits:
                    break
                for hit in hits:
                    outfile.write(json.dumps(transform_hit(hit)) + "\n")
                    total_records += 1
                    if total_records % BATCH_SIZE == 0:
                        elapsed_time = time.time() - start_time
                        elapsed_str = str(timedelta(seconds=elapsed_time))
                        print(f"Processed {total_records} records. Time elapsed: {elapsed_str}...")
                search_after = hits[-1].get("sort")
                # The server may return a refreshed PIT id; always reuse the latest one
                pit_id = response.get("pit_id", pit_id)
                time.sleep(0.1)
        elapsed_time = time.time() - start_time
        elapsed_str = str(timedelta(seconds=elapsed_time))
        print(f"\nTotal processed records: {total_records}. Saved to {OUTPUT_FILE}. Time taken: {elapsed_str}")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        if pit_id:
            try:
                close_pit(pit_id)
                print("PIT closed")
            except Exception as e:
                print(f"Error closing PIT: {e}")

if __name__ == "__main__":
    fetch_logs()
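
The script writes one JSON object per line (JSONL), which is easy to stream during later analysis. A minimal sketch of consuming the output, assuming the logs.json produced above:

import json
from collections import Counter

# Count response statuses across all exported records
status_counts = Counter()
with open("logs.json", encoding="utf-8") as f:
    for line in f:
        record = json.loads(line)  # one record per line
        status_counts[str(record["status"])] += 1

print(status_counts.most_common(10))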


Details


Traffic filters in Elastic Cloud


If the deployment uses traffic filters, requests from unlisted machines are rejected; to allow access from the machine running the script (or from any machine), configure Traffic filters in Elastic Cloud.



Query development in Kibana Dev Tools


Before implementing queries in the script, I first built and tested them in Kibana Dev Tools.

Documentation: Kibana Console.


Creating an API key in Elastic Cloud


To authenticate requests, create an API key in Elastic Cloud.



Conclusion


Using search_after together with a point in time (PIT) made it possible to efficiently fetch large log datasets from Elastic Cloud.