Siber güvenlik dünyasında zafiyet yönetimi her geçen gün daha otomatik, daha entegre ve daha akıllı hale geliyor. Özellikle kurumsal güvenlik çözümlerinde, manuel tarama süreçlerinin otomatikleştirilmesi zaman ve iş gücünden önemli ölçüde tasarruf sağlıyor. Bu yazıda, Tenable.sc ürününün REST API’si kullanılarak, önceden belirlenmiş bir .txt dosyasından IP adreslerinin okunması, bu IP’lerle Tenable.sc arayüzünde asset oluşturulması, ardından belirli bir policy ve repository ID’si ile tarama başlatılması ve son aşamada, önceden tanımlanmış bir query’ye göre tarama sonuçlarındaki high ve critical seviyedeki zafiyet sayılarının ekrana yazdırılmasını sağlayan bir Python betiğini detaylı şekilde inceleyeceğiz.
Ne Yapıyor Bu Script?
Aşağıdaki adımları otomatik olarak gerçekleştiren bir Python betiğimiz var:
IP adres listesini okur.
Belirtilen IP’lerle bir asset oluşturur.
O asset üzerinden bir tarama başlatır.
Taramayı takip eder ve sonuç ID’sini alır.
Taramaya ait zafiyet sonuçlarını çeker.
Verileri analiz eder ve high ve critical seviye zafiyet sayısını ekrana bastırır.
Kodun Yapısı
1. Ortam Hazırlığı python
import requests
import json
import time
import datetime
import random
import string
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
Tenable.sc API’si HTTPS üzerinden çalıştığından, self-signed sertifikalarda uyarıları bastırmak için InsecureRequestWarning devre dışı bırakıldı.
2. Giriş Bilgileri ve Değişkenler
# Değişkenler
SC_URL = "https://XX.XX.XX.XX"
ACCESS_KEY = "XXX"
SECRET_KEY = "XXX"
POLICY_ID = "1000140"
REPO_ID = "1"
ASSET_NAME_PREFIX = "TestAsset_OktayC"
SCAN_NAME_PREFIX = "TestScan_OktayC"
IP_LIST_FILE = "ip_list.txt"
SCAN_COMPLETION_TIMEOUT = 3600 # Tarama tamamlama için maksimum süre (saniye)
SCAN_STATUS_CHECK_INTERVAL = 30 # Tarama durumunu kontrol etme aralığı (saniye)
ANALYSIS_QUERY_ID = 14644 # Zafiyet analizi için kullanılacak sorgu ID'si
ANALYSIS_START_OFFSET = 0
ANALYSIS_END_OFFSET = 1000
Security Center’a bağlantı ve tarama işlemi için gerekli olan temel değişkenler bu bölümde tanımlanmıştır.
3. Header Yapısı
headers = {
'User-Agent': 'PostmanRuntime/7.29.0',
'x-apikey': f'accesskey={ACCESS_KEY}; secretkey={SECRET_KEY}',
'Content-Type': 'application/json',
'Accept': '*/*'
}
API isteklerinde kullanılacak kimlik doğrulama ve içerik bilgilerini içeren header yapısı burada tanımlanmıştır. Access ve Secret Key değerleri x-apikey başlığı altında gönderilir.
4.Benzersiz İsim oluşturma fonksiyonu
def generate_unique_name(prefix):
"""Belirtilen önek ile benzersiz bir isim oluşturur."""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
random_string = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(8))
return f"{prefix}_{timestamp}_{random_string}"
Tarama adı ve asset adında çakışma oluşmaması için kullanıldı.
5. IP Adreslerini Okuma
def read_ips_from_file(file_path):
"""IP adreslerini bir dosyadan okur."""
try:
with open(file_path, "r") as f:
ip_list = [line.strip() for line in f]
print(f"[IP Okuma] {file_path} dosyasından {len(ip_list)} IP adresi okundu.")
return ip_list
except FileNotFoundError:
print(f"[IP Okuma HATA] {file_path} dosyası bulunamadı.")
return []
except Exception as e:
print(f"[IP Okuma HATA] Dosya okuma sırasında bir hata oluştu: {e}")
return []
ip_list.txt dosyasındaki IP adreslerini okuyarak ip_list listesini döner.
6. Asset Oluşturma
def create_or_update_asset(name, ip_list):
url_create = f"{SC_URL}/rest/asset"
payload_create = {
"name": name,
"description": "Otomatik oluşturuldu.",
"type": "static",
"definedIPs": ",".join(ip_list)
}
try:
response_create = requests.post(url_create, headers=headers, json=payload_create, verify=False)
print(f"[Asset Oluşturma] Status: {response_create.status_code}")
if response_create.status_code == 200:
try:
asset_data = response_create.json().get('response', {})
asset_id = asset_data.get('id')
asset_name = asset_data.get('name')
if asset_id and asset_name:
print(f"[Asset Oluşturma] Asset başarıyla oluşturuldu (ID: {asset_id}, Adı: {asset_name}).")
return asset_id, asset_name
else:
print(f"[Asset Oluşturma HATA] Asset oluşturuldu ancak ID veya isim alınamadı. Yanıt: {response_create.text}")
return None, None
except json.JSONDecodeError as e:
print(f"[Asset Oluşturma HATA] JSON ayrıştırma hatası: {e}, Yanıt: {response_create.text}")
return None, None
else:
print(f"[Asset Oluşturma HATA] Asset oluşturulamadı. Durum Kodu: {response_create.status_code}, Yanıt: {response_create.text}")
return None, None
except requests.exceptions.RequestException as e:
print(f"[Asset Oluşturma HATA] Asset oluşturma sırasında bir istek hatası oluştu: {e}")
return None, None
Bu fonksiyon, verilen IP listesiyle Security Center üzerinde yeni bir asset oluşturur. Oluşturma işlemi başarılıysa asset ID ve adını döner; aksi halde hata mesajı verir.
7. Taramanın oluşturulması
def create_scan(policy_id, asset_id, asset_name, repository_id, scan_name):
"""Belirtilen asset ile bir tarama oluşturur ve başlatır."""
url_create_scan = f"{SC_URL}/rest/scan"
payload_create_scan = {
"name": scan_name,
"description": "Otomatik oluşturuldu",
"type": "policy",
"assets": [{"id": str(asset_id), "name": asset_name}],
"schedule": {"type": "template"},
"policy": {"id": str(policy_id)},
"repository": {"id": str(repository_id)}
}
try:
response_create_scan = requests.post(url_create_scan, headers=headers, json=payload_create_scan, verify=False)
print(f"[Tarama Oluşturma] Status: {response_create_scan.status_code}")
if response_create_scan.status_code == 200:
scan_data = response_create_scan.json().get('response', {})
scan_id = scan_data.get('id')
if scan_id:
print(f"[Tarama Oluşturma] Tarama başarıyla oluşturuldu (ID: {scan_id}). Başlatılıyor.")
if launch_scan(scan_id):
return scan_id
else:
return None
else:
print(f"[Tarama Oluşturma HATA] Tarama oluşturuldu ancak ID alınamadı. Yanıt: {response_create_scan.text}")
return None
else:
print(f"[Tarama Oluşturma HATA] Tarama oluşturulamadı. Durum Kodu: {response_create_scan.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"[Tarama Yönetimi HATA] Tarama oluşturma sırasında bir istek hatası oluştu: {e}")
return None
Bu fonksiyon, belirtilen policy ve asset bilgileriyle yeni bir tarama oluşturur ve başlatır. Başarılı olursa scan ID’sini döner, aksi durumda hata mesajı verir.
8. Taramanın başlatılması
def launch_scan(scan_id):
"""Belirtilen scan ID'sini başlatır."""
url = f"{SC_URL}/rest/scan/{scan_id}/launch"
try:
response = requests.post(url, headers=headers, verify=False)
print(f"[Tarama Başlatma] Status: {response.status_code}")
if response.status_code == 200:
print(f"[Tarama Başlatma] Tarama (ID: {scan_id}) başarıyla başlatıldı.")
return True
else:
print(f"[Tarama Başlatma HATA] Tarama başlatılamadı (ID: {scan_id}). Durum Kodu: {response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"[Tarama Başlatma HATA] Tarama başlatma sırasında bir istek hatası oluştu: {e}")
return False
Bu fonksiyon, verilen scan ID’ye ait taramayı başlatır. Başarılı olursa True, hata durumunda False döner ve ilgili mesajları ekrana yazdırır.
9. Scan Result ID’nin çekilmesi
def get_latest_scan_result_id_by_name(scan_name):
"""Belirtilen tarama adının en son sonucunun ID'sini getirir."""
url = f"{SC_URL}/rest/scanResult"
params = {
'fields': 'id,name,finishTime'
}
try:
response = requests.get(url, headers=headers, params=params, verify=False)
response.raise_for_status()
data = response.json()
usable_results = data.get('response', {}).get('usable', [])
manageable_results = data.get('response', {}).get('manageable', [])
all_results = usable_results + manageable_results
# Belirtilen ada sahip sonuçları filtrele
matching_results = [res for res in all_results if res.get('name') == scan_name]
if not matching_results:
print(f"[TARAMA SONUÇLARI UYARI] '{scan_name}' adlı tarama sonucu bulunamadı.")
return None
# finishTime'a göre sırala (en yeni önce)
sorted_results = sorted(matching_results, key=lambda x: x.get('finishTime', 0), reverse=True)
# En son sonucun ID'sini döndür
if sorted_results:
return sorted_results[0].get('id')
else:
print(f"[TARAMA SONUÇLARI UYARI] '{scan_name}' adlı taramanın sonucu bulunamadı.")
return None
except requests.exceptions.RequestException as e:
print(f"[TARAMA SONUÇLARI HATA] Tarama sonuçları alınamadı: {e}")
if isinstance(e, requests.exceptions.HTTPError):
print(f"[UYARI] HTTP Hatası (Kod: {e.response.status_code}): {e.response.text}")
return []
except json.JSONDecodeError as e:
print(f"[TARAMA SONUÇLARI HATA] JSON ayrıştırma hatası: {e}, Yanıt: {response.text}")
return []
Bu fonksiyon, belirtilen tarama adına ait en son tarama sonucunun ID’sini döner. Eğer sonuç bulunamazsa None döner ve uyarı verir.
10. Tarama durumunun kontrolü
def get_scan_result_details(scan_result_id, fields=None):
"""Belirtilen Scan Result ID'sinin detaylarını getirir. """
url = f"{SC_URL}/rest/scanResult/{scan_result_id}"
if fields:
url += "?fields=" + ",".join(fields)
try:
response = requests.get(url, headers=headers, verify=False)
response.raise_for_status()
response_json = response.json()
if 'response' in response_json:
return response_json['response']
else:
print(f"[HATA] Beklenmeyen yanıt yapısı (ID: {scan_result_id}): {response_json}")
return None
except requests.exceptions.RequestException as e:
print(f"[HATA] Scan Result (ID: {scan_result_id}) alınamadı: {e}")
if isinstance(e, requests.exceptions.HTTPError):
print(f"[UYARI] HTTP Hatası (Kod: {e.response.status_code}): {e.response.text}")
return None
Verilen Scan Result ID’ye ait tarama detaylarını getirir. Yanıtta response alanı yoksa veya istek başarısız olursa hata mesajı döner.
11) Tarama sonuclarının çekilmesi
def get_analysis_results(scan_result_id, start_offset, end_offset, scan_query_id):
"""Belirli bir tarama ve sorgu ID'si için zafiyet analiz sonuçlarını getirir."""
url = f"{SC_URL}/rest/analysis"
payload = {
"type": "vuln",
"query": {
"id": scan_query_id
},
"sourceType": "individual",
"startOffset": start_offset,
"endOffset": end_offset,
"scanID": scan_result_id,
"view": "all"
}
try:
print(f"[ZAFİYET ALMA] İstek URL'si: {url}")
print(f"[ZAFİYET ALMA] Gönderilen Veri: {json.dumps(payload)}")
response = requests.post(url, headers=headers, verify=False, json=payload)
print(f"[ZAFİYET ALMA] Yanıt Durum Kodu: {response.status_code}")
if response.status_code == 200:
try:
analysis_results = response.json().get('response')
if analysis_results and 'results' in analysis_results:
return analysis_results
else:
print("[UYARI] Analiz sonuçları boş veya beklenenden farklı geldi.")
return None
except json.JSONDecodeError as e:
print(f"[ZAFİYET ALMA HATA] JSON ayrıştırma hatası: {e}, Yanıt: {response.text}")
return None
else:
print(f"[ZAFİYET ALMA HATA] Analiz sonuçları alınamadı. Durum Kodu: {response.status_code}, Yanıt: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"[ZAFİYET ALMA HATA] Analiz sonuçları alınırken bir istek hatası oluştu: {e}")
return None
Belirtilen tarama sonucu ve sorgu ID’sine göre zafiyet analiz verilerini getirir. Başarısız isteklerde veya beklenmeyen yanıt yapısında hata mesajı döner.
12) Ana fonksiyon
def main():
"""Ana çalıştırma fonksiyonu."""
try:
ip_list = read_ips_from_file(IP_LIST_FILE)
if not ip_list:
print("[ANA] IP listesi boş olduğu için işlem sonlandırılıyor.")
return
asset_name = generate_unique_name(ASSET_NAME_PREFIX)
asset_id, created_asset_name = create_or_update_asset(asset_name, ip_list)
if asset_id:
scan_name = generate_unique_name(SCAN_NAME_PREFIX)
scan_id = create_scan(POLICY_ID, asset_id, created_asset_name, REPO_ID, scan_name)
if scan_id:
print(f"[ANA] Tarama başlatıldı (ID: {scan_id}, Adı: {scan_name}). Tamamlanması bekleniyor...")
start_time = time.time()
while True:
time.sleep(SCAN_STATUS_CHECK_INTERVAL)
latest_scan_result_id = get_latest_scan_result_id_by_name(scan_name)
if latest_scan_result_id:
scan_result_details_status = get_scan_result_details(latest_scan_result_id, fields=['status'])
if scan_result_details_status:
status = scan_result_details_status.get('status')
print(f"[TARAMA DURUMU] Tarama Adı: {scan_name}, Sonuç ID: {latest_scan_result_id}, Durum: {status}")
if status == "Completed":
print("[TARAMA DURUMU] Tarama tamamlandı. Zafiyet sonuçları analiz ediliyor...")
scan_result_full_details = get_scan_result_details(latest_scan_result_id)
if scan_result_full_details:
analysis_results = get_analysis_results(latest_scan_result_id, ANALYSIS_START_OFFSET, ANALYSIS_END_OFFSET, ANALYSIS_QUERY_ID)
if analysis_results and 'results' in analysis_results:
critical_count = 0
high_count = 0
for result in analysis_results['results']:
if result.get('severity', {}).get('id') == "4":
critical_count += 1
elif result.get('severity', {}).get('id') == "3":
high_count += 1
print(f"\n[TARAMA SONUÇLARI] (Tarama Adı: {scan_name})")
print(f" Toplam Kritik Zafiyet Sayısı (İlk {ANALYSIS_END_OFFSET - ANALYSIS_START_OFFSET} Kayıtta): {critical_count}")
print(f" Toplam Yüksek Seviyeli Zafiyet Sayısı (İlk {ANALYSIS_END_OFFSET - ANALYSIS_START_OFFSET} Kayıtta): {high_count}")
print(f" Toplam Kayıt Sayısı: {analysis_results.get('totalRecords')}")
break
else:
print("[UYARI] Tarama tamamlandı ancak zafiyet sonuçları alınamadı.")
break
else:
print("[HATA] Scan Result detayları alınamadı (ID: {latest_scan_result_id}).")
break
elif status in ["Canceled", "Failed", "Aborted"]:
print(f"[TARAMA DURUMU] Tarama tamamlanmadı. Durum: {status}")
return
elif time.time() - start_time > SCAN_COMPLETION_TIMEOUT:
print("[TARAMA DURUMU] Tarama tamamlama zaman aşımına uğradı. İşlem sonlandırılıyor.")
break
else:
print(f"[UYARI] Scan Result (ID: {latest_scan_result_id}) durumu alınamadı.")
else:
print(f"[UYARI] \"{scan_name}\" adına sahip tamamlanmış bir tarama sonucu bulunamadı.")
time.sleep(SCAN_STATUS_CHECK_INTERVAL)
else:
print("[ANA HATA] Tarama oluşturma/başlatma başarısız oldu.")
else:
print("[ANA HATA] Asset oluşturma/güncelleme başarısız olduğu için tarama başlatılamadı.")
print("[ANA] Script başarıyla tamamlandı.")
except Exception as e:
print(f"[ANA HATA] Beklenmeyen bir hata oluştu: {e}")
Script çıktısı
Makalemi okuduğunuz için teşekkür ederim. Umarım bu içerik, Tenable.sc API kullanımına dair pratik bir bakış sunmuş ve sizlere faydalı olmuştur. Başka yazılarda görüşmek üzere.