Tenable.sc API ile Otomatik Zafiyet Tarama ve Analiz Süreci

Siber güvenlik dünyasında zafiyet yönetimi her geçen gün daha otomatik, daha entegre ve daha akıllı hale geliyor. Özellikle kurumsal güvenlik çözümlerinde, manuel tarama süreçlerinin otomatikleştirilmesi zaman ve iş gücünden önemli ölçüde tasarruf sağlıyor. Bu yazıda, Tenable.sc ürününün REST API’si kullanılarak, önceden belirlenmiş bir .txt dosyasından IP adreslerinin okunması, bu IP’lerle Tenable.sc arayüzünde asset oluşturulması, ardından belirli bir policy ve repository ID’si ile tarama başlatılması ve son aşamada, önceden tanımlanmış bir query’ye göre tarama sonuçlarındaki high ve critical seviyedeki zafiyet sayılarının ekrana yazdırılmasını sağlayan bir Python betiğini detaylı şekilde inceleyeceğiz.

Ne Yapıyor Bu Script?

Aşağıdaki adımları otomatik olarak gerçekleştiren bir Python betiğimiz var:

IP adres listesini okur.

Belirtilen IP’lerle bir asset oluşturur.

O asset üzerinden bir tarama başlatır.

Taramayı takip eder ve sonuç ID’sini alır.

Taramaya ait zafiyet sonuçlarını çeker.

Verileri analiz eder ve high ve critical seviye zafiyet sayısını ekrana bastırır.

Kodun Yapısı

1. Ortam Hazırlığı python

import requests
import json
import time
import datetime
import random
import string
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

Tenable.sc API’si HTTPS üzerinden çalıştığından, self-signed sertifikalarda uyarıları bastırmak için InsecureRequestWarning devre dışı bırakıldı.

2. Giriş Bilgileri ve Değişkenler

# Değişkenler
SC_URL = "https://XX.XX.XX.XX"
ACCESS_KEY = "XXX"
SECRET_KEY = "XXX"
POLICY_ID = "1000140"
REPO_ID = "1"
ASSET_NAME_PREFIX = "TestAsset_OktayC"
SCAN_NAME_PREFIX = "TestScan_OktayC"
IP_LIST_FILE = "ip_list.txt"
SCAN_COMPLETION_TIMEOUT = 3600  # Tarama tamamlama için maksimum süre (saniye)
SCAN_STATUS_CHECK_INTERVAL = 30  # Tarama durumunu kontrol etme aralığı (saniye)
ANALYSIS_QUERY_ID = 14644  # Zafiyet analizi için kullanılacak sorgu ID'si
ANALYSIS_START_OFFSET = 0
ANALYSIS_END_OFFSET = 1000

Security Center’a bağlantı ve tarama işlemi için gerekli olan temel değişkenler bu bölümde tanımlanmıştır.

3. Header Yapısı

headers = {
    'User-Agent': 'PostmanRuntime/7.29.0',
    'x-apikey': f'accesskey={ACCESS_KEY}; secretkey={SECRET_KEY}',
    'Content-Type': 'application/json',
    'Accept': '*/*'  
}

API isteklerinde kullanılacak kimlik doğrulama ve içerik bilgilerini içeren header yapısı burada tanımlanmıştır. Access ve Secret Key değerleri x-apikey başlığı altında gönderilir.

4.Benzersiz İsim oluşturma fonksiyonu

def generate_unique_name(prefix):
    """Belirtilen önek ile benzersiz bir isim oluşturur."""
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    random_string = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(8))
    return f"{prefix}_{timestamp}_{random_string}"

Tarama adı ve asset adında çakışma oluşmaması için kullanıldı.

5. IP Adreslerini Okuma

def read_ips_from_file(file_path):
    """IP adreslerini bir dosyadan okur."""
    try:
        with open(file_path, "r") as f:
            ip_list = [line.strip() for line in f]
        print(f"[IP Okuma] {file_path} dosyasından {len(ip_list)} IP adresi okundu.")
        return ip_list
    except FileNotFoundError:
        print(f"[IP Okuma HATA] {file_path} dosyası bulunamadı.")
        return []
    except Exception as e:
        print(f"[IP Okuma HATA] Dosya okuma sırasında bir hata oluştu: {e}")
        return []

ip_list.txt dosyasındaki IP adreslerini okuyarak ip_list listesini döner.

6. Asset Oluşturma

def create_or_update_asset(name, ip_list):
    url_create = f"{SC_URL}/rest/asset"
    payload_create = {
        "name": name,
        "description": "Otomatik oluşturuldu.",
        "type": "static",
        "definedIPs": ",".join(ip_list)
    }
    try:
        response_create = requests.post(url_create, headers=headers, json=payload_create, verify=False)
        print(f"[Asset Oluşturma] Status: {response_create.status_code}")
        if response_create.status_code == 200:
            try:
                asset_data = response_create.json().get('response', {})
                asset_id = asset_data.get('id')
                asset_name = asset_data.get('name')
                if asset_id and asset_name:
                    print(f"[Asset Oluşturma] Asset başarıyla oluşturuldu (ID: {asset_id}, Adı: {asset_name}).")
                    return asset_id, asset_name
                else:
                    print(f"[Asset Oluşturma HATA] Asset oluşturuldu ancak ID veya isim alınamadı. Yanıt: {response_create.text}")
                    return None, None
            except json.JSONDecodeError as e:
                print(f"[Asset Oluşturma HATA] JSON ayrıştırma hatası: {e}, Yanıt: {response_create.text}")
                return None, None
        else:
            print(f"[Asset Oluşturma HATA] Asset oluşturulamadı. Durum Kodu: {response_create.status_code}, Yanıt: {response_create.text}")
            return None, None
    except requests.exceptions.RequestException as e:
        print(f"[Asset Oluşturma HATA] Asset oluşturma sırasında bir istek hatası oluştu: {e}")
        return None, None

Bu fonksiyon, verilen IP listesiyle Security Center üzerinde yeni bir asset oluşturur. Oluşturma işlemi başarılıysa asset ID ve adını döner; aksi halde hata mesajı verir.

7. Taramanın oluşturulması

def create_scan(policy_id, asset_id, asset_name, repository_id, scan_name):
    """Belirtilen asset ile bir tarama oluşturur ve başlatır."""
    url_create_scan = f"{SC_URL}/rest/scan"
    payload_create_scan = {
        "name": scan_name,
        "description": "Otomatik oluşturuldu",
        "type": "policy",
        "assets": [{"id": str(asset_id), "name": asset_name}],
        "schedule": {"type": "template"},
        "policy": {"id": str(policy_id)},
        "repository": {"id": str(repository_id)}
    }
    try:
        response_create_scan = requests.post(url_create_scan, headers=headers, json=payload_create_scan, verify=False)
        print(f"[Tarama Oluşturma] Status: {response_create_scan.status_code}")
        if response_create_scan.status_code == 200:
            scan_data = response_create_scan.json().get('response', {})
            scan_id = scan_data.get('id')
            if scan_id:
                print(f"[Tarama Oluşturma] Tarama başarıyla oluşturuldu (ID: {scan_id}). Başlatılıyor.")
                if launch_scan(scan_id):
                    return scan_id
                else:
                    return None
            else:
                print(f"[Tarama Oluşturma HATA] Tarama oluşturuldu ancak ID alınamadı. Yanıt: {response_create_scan.text}")
                return None
        else:
            print(f"[Tarama Oluşturma HATA] Tarama oluşturulamadı. Durum Kodu: {response_create_scan.status_code}")
            return None
    except requests.exceptions.RequestException as e:
        print(f"[Tarama Yönetimi HATA] Tarama oluşturma sırasında bir istek hatası oluştu: {e}")
        return None

Bu fonksiyon, belirtilen policy ve asset bilgileriyle yeni bir tarama oluşturur ve başlatır. Başarılı olursa scan ID’sini döner, aksi durumda hata mesajı verir.

8. Taramanın başlatılması

def launch_scan(scan_id):
    """Belirtilen scan ID'sini başlatır."""
    url = f"{SC_URL}/rest/scan/{scan_id}/launch"
    try:
        response = requests.post(url, headers=headers, verify=False)
        print(f"[Tarama Başlatma] Status: {response.status_code}")
        if response.status_code == 200:
            print(f"[Tarama Başlatma] Tarama (ID: {scan_id}) başarıyla başlatıldı.")
            return True
        else:
            print(f"[Tarama Başlatma HATA] Tarama başlatılamadı (ID: {scan_id}). Durum Kodu: {response.status_code}")
            return False
    except requests.exceptions.RequestException as e:
        print(f"[Tarama Başlatma HATA] Tarama başlatma sırasında bir istek hatası oluştu: {e}")
        return False

Bu fonksiyon, verilen scan ID’ye ait taramayı başlatır. Başarılı olursa True, hata durumunda False döner ve ilgili mesajları ekrana yazdırır.

9. Scan Result ID’nin çekilmesi

def get_latest_scan_result_id_by_name(scan_name):
    """Belirtilen tarama adının en son sonucunun ID'sini getirir."""
    url = f"{SC_URL}/rest/scanResult"
    params = {
        'fields': 'id,name,finishTime'
    }
    try:
        response = requests.get(url, headers=headers, params=params, verify=False)
        response.raise_for_status()
        data = response.json()
        usable_results = data.get('response', {}).get('usable', [])
        manageable_results = data.get('response', {}).get('manageable', [])
        all_results = usable_results + manageable_results

        # Belirtilen ada sahip sonuçları filtrele
        matching_results = [res for res in all_results if res.get('name') == scan_name]

        if not matching_results:
            print(f"[TARAMA SONUÇLARI UYARI] '{scan_name}' adlı tarama sonucu bulunamadı.")
            return None

        # finishTime'a göre sırala (en yeni önce)
        sorted_results = sorted(matching_results, key=lambda x: x.get('finishTime', 0), reverse=True)

        # En son sonucun ID'sini döndür
        if sorted_results:
            return sorted_results[0].get('id')
        else:
            print(f"[TARAMA SONUÇLARI UYARI] '{scan_name}' adlı taramanın sonucu bulunamadı.")
            return None
        
    except requests.exceptions.RequestException as e:
        print(f"[TARAMA SONUÇLARI HATA] Tarama sonuçları alınamadı: {e}")
        if isinstance(e, requests.exceptions.HTTPError):
            print(f"[UYARI] HTTP Hatası (Kod: {e.response.status_code}): {e.response.text}")
        return []
    except json.JSONDecodeError as e:
        print(f"[TARAMA SONUÇLARI HATA] JSON ayrıştırma hatası: {e}, Yanıt: {response.text}")
        return []

Bu fonksiyon, belirtilen tarama adına ait en son tarama sonucunun ID’sini döner. Eğer sonuç bulunamazsa None döner ve uyarı verir.

10. Tarama durumunun kontrolü

def get_scan_result_details(scan_result_id, fields=None):
    """Belirtilen Scan Result ID'sinin detaylarını getirir.    """
    url = f"{SC_URL}/rest/scanResult/{scan_result_id}"
    if fields:
        url += "?fields=" + ",".join(fields)

    try:
        response = requests.get(url, headers=headers, verify=False)
        response.raise_for_status()
        response_json = response.json()
        if 'response' in response_json:
            return response_json['response']
        else:
            print(f"[HATA] Beklenmeyen yanıt yapısı (ID: {scan_result_id}): {response_json}")
            return None
    except requests.exceptions.RequestException as e:
        print(f"[HATA] Scan Result (ID: {scan_result_id}) alınamadı: {e}")
        if isinstance(e, requests.exceptions.HTTPError):
            print(f"[UYARI] HTTP Hatası (Kod: {e.response.status_code}): {e.response.text}")
        return None

Verilen Scan Result ID’ye ait tarama detaylarını getirir. Yanıtta response alanı yoksa veya istek başarısız olursa hata mesajı döner.

11) Tarama sonuclarının çekilmesi

def get_analysis_results(scan_result_id, start_offset, end_offset, scan_query_id):
    """Belirli bir tarama ve sorgu ID'si için zafiyet analiz sonuçlarını getirir."""
    url = f"{SC_URL}/rest/analysis"
    payload = {
        "type": "vuln",
        "query": {
            "id": scan_query_id
        },
        "sourceType": "individual",
        "startOffset": start_offset,
        "endOffset": end_offset,
        "scanID": scan_result_id,
        "view": "all"
    }
    try:
        print(f"[ZAFİYET ALMA] İstek URL'si: {url}")
        print(f"[ZAFİYET ALMA] Gönderilen Veri: {json.dumps(payload)}")
        response = requests.post(url, headers=headers, verify=False, json=payload)
        print(f"[ZAFİYET ALMA] Yanıt Durum Kodu: {response.status_code}")
        if response.status_code == 200:
            try:
                analysis_results = response.json().get('response')
                if analysis_results and 'results' in analysis_results:
                    return analysis_results
                else:
                    print("[UYARI] Analiz sonuçları boş veya beklenenden farklı geldi.")
                    return None
            except json.JSONDecodeError as e:
                print(f"[ZAFİYET ALMA HATA] JSON ayrıştırma hatası: {e}, Yanıt: {response.text}")
                return None
        else:
            print(f"[ZAFİYET ALMA HATA] Analiz sonuçları alınamadı. Durum Kodu: {response.status_code}, Yanıt: {response.text}")
            return None
    except requests.exceptions.RequestException as e:
        print(f"[ZAFİYET ALMA HATA] Analiz sonuçları alınırken bir istek hatası oluştu: {e}")
        return None

Belirtilen tarama sonucu ve sorgu ID’sine göre zafiyet analiz verilerini getirir. Başarısız isteklerde veya beklenmeyen yanıt yapısında hata mesajı döner.

12) Ana fonksiyon

def main():
    """Ana çalıştırma fonksiyonu."""
    try:
        ip_list = read_ips_from_file(IP_LIST_FILE)
        if not ip_list:
            print("[ANA] IP listesi boş olduğu için işlem sonlandırılıyor.")
            return

        asset_name = generate_unique_name(ASSET_NAME_PREFIX)
        asset_id, created_asset_name = create_or_update_asset(asset_name, ip_list)

        if asset_id:
            scan_name = generate_unique_name(SCAN_NAME_PREFIX)
            scan_id = create_scan(POLICY_ID, asset_id, created_asset_name, REPO_ID, scan_name)

            if scan_id:
                print(f"[ANA] Tarama başlatıldı (ID: {scan_id}, Adı: {scan_name}). Tamamlanması bekleniyor...")
                start_time = time.time()
                while True:
                    time.sleep(SCAN_STATUS_CHECK_INTERVAL)
                    latest_scan_result_id = get_latest_scan_result_id_by_name(scan_name)
                    if latest_scan_result_id:
                        scan_result_details_status = get_scan_result_details(latest_scan_result_id, fields=['status'])
                        if scan_result_details_status:
                            status = scan_result_details_status.get('status')
                            print(f"[TARAMA DURUMU] Tarama Adı: {scan_name}, Sonuç ID: {latest_scan_result_id}, Durum: {status}")
                            if status == "Completed":
                                print("[TARAMA DURUMU] Tarama tamamlandı. Zafiyet sonuçları analiz ediliyor...")
                                scan_result_full_details = get_scan_result_details(latest_scan_result_id)
                                if scan_result_full_details:
                                    analysis_results = get_analysis_results(latest_scan_result_id, ANALYSIS_START_OFFSET, ANALYSIS_END_OFFSET, ANALYSIS_QUERY_ID)
                                    if analysis_results and 'results' in analysis_results:
                                        critical_count = 0
                                        high_count = 0
                                        for result in analysis_results['results']:
                                            if result.get('severity', {}).get('id') == "4":
                                                critical_count += 1
                                            elif result.get('severity', {}).get('id') == "3":
                                                high_count += 1

                                        print(f"\n[TARAMA SONUÇLARI] (Tarama Adı: {scan_name})")
                                        print(f"  Toplam Kritik Zafiyet Sayısı (İlk {ANALYSIS_END_OFFSET - ANALYSIS_START_OFFSET} Kayıtta): {critical_count}")
                                        print(f"  Toplam Yüksek Seviyeli Zafiyet Sayısı (İlk {ANALYSIS_END_OFFSET - ANALYSIS_START_OFFSET} Kayıtta): {high_count}")
                                        print(f"  Toplam Kayıt Sayısı: {analysis_results.get('totalRecords')}")
                                        break
                                    else:
                                        print("[UYARI] Tarama tamamlandı ancak zafiyet sonuçları alınamadı.")
                                        break
                                else:
                                    print("[HATA] Scan Result detayları alınamadı (ID: {latest_scan_result_id}).")
                                    break
                            elif status in ["Canceled", "Failed", "Aborted"]:
                                print(f"[TARAMA DURUMU] Tarama tamamlanmadı. Durum: {status}")
                                return
                            elif time.time() - start_time > SCAN_COMPLETION_TIMEOUT:
                                print("[TARAMA DURUMU] Tarama tamamlama zaman aşımına uğradı. İşlem sonlandırılıyor.")
                                break
                        else:
                            print(f"[UYARI] Scan Result (ID: {latest_scan_result_id}) durumu alınamadı.")
                    else:
                        print(f"[UYARI] \"{scan_name}\" adına sahip tamamlanmış bir tarama sonucu bulunamadı.")
                    time.sleep(SCAN_STATUS_CHECK_INTERVAL)
            else:
                print("[ANA HATA] Tarama oluşturma/başlatma başarısız oldu.")
        else:
            print("[ANA HATA] Asset oluşturma/güncelleme başarısız olduğu için tarama başlatılamadı.")

        print("[ANA] Script başarıyla tamamlandı.")

    except Exception as e:
        print(f"[ANA HATA] Beklenmeyen bir hata oluştu: {e}")

Script çıktısı

appsec

Makalemi okuduğunuz için teşekkür ederim. Umarım bu içerik, Tenable.sc API kullanımına dair pratik bir bakış sunmuş ve sizlere faydalı olmuştur. Başka yazılarda görüşmek üzere.

Written by

Oktay Çetin