1. Introduction
- Sentinel Hub은 Sentinel, Landsat 등의 위성이 찍은 위성 사진을 제공하는 플랫폼이다.
- Sentinel Hub의 API와 위성 사진 처리를 위한 python library인 eo-learn을 사용하면 원하는 위치와 날짜의 위성 사진을 쉽게 수집할 수 있다.
- 이 코드는 OrbitalAI repository를 참고하였다.
2. Setup
Install libraries
pip install sentinelhub[AWS]
pip install eo-learn
lmport libraries
import datetime
import numpy as np
from PIL import Image
from typing import Tuple
import matplotlib.pyplot as plt
from sentinelhub import (
SHConfig,
get_utm_crs,
transform_point,
CRS,
BBox,
DataCollection,
)
from eolearn.io import SentinelHubInputTask
from eolearn.core import (
EOTask,
EOPatch,
FeatureType,
)
constants
- Sentinel Hub에서 Free trial을 신청하면 client id와 secret을 받을 수 있다.
CLIENT_ID = 'YOUR SENTINEL HUB CLIENT ID'
CLIENT_SECRET = 'YOUR SENTINEL HUB CLIENT SECRET'
S2_RESOLUTION = 10
S2_BANDS = ["B02", "B03", "B04", "B08", "B05", "B06", "B07"]
BBOX_SIZE = 10240 # target image size * S2_RESOLUTION
Sentinel Hub Config
sh_config = SHConfig()
sh_config.sh_client_id = CLIENT_ID
sh_config.sh_client_secret = CLIENT_SECRET
3. Parameters
- 찾고자 하는 위경도와 시간대를 입력한다.
- 위경도에 해당하는 bounding box를 계산한다.
def get_utm_bbox(lat: float, lng: float):
utm_crs = get_utm_crs(lng, lat)
east, north = transform_point((lng, lat), CRS.WGS84, utm_crs)
east, north = 10 * int(east / 10), 10 * int(north / 10)
return BBox(
bbox=(
(east - BBOX_SIZE // 2, north - BBOX_SIZE // 2),
(east + BBOX_SIZE // 2, north + BBOX_SIZE // 2),
),
crs=utm_crs,
)
lat = 37.566535
lng = 126.9779692
time_interval = ("2023-06-10", "2023-06-15")
bbox = get_utm_bbox(lat, lng)
4. Tasks
- eo-learn core components
- EOPatch: 특정 위치의 해당하는 데이터를 담고 있는 객체
- EOTask: EOPatch를 대상으로 실행되는 연산
- sentinel2 products (DataCollection), url
- L1C
- L2A
Data download task
- 지정한 좌표와 시간대에 해당하는 Level1-C product를 다운로드한다.
- maxcc: Maximum cloud coverage
- time_difference: Minimum allowed time difference
additional_bands = [(FeatureType.DATA, name) for name in ["sunZenithAngles"]]
masks = [(FeatureType.MASK, "dataMask")]
input_task = SentinelHubInputTask(
data_collection = DataCollection.SENTINEL2_L1C,
resolution = S2_RESOLUTION,
bands_feature = (FeatureType.DATA, "BANDS"),
additional_data = masks + additional_bands,
maxcc = 0.7,
bands = S2_BANDS,
aux_request_args = {"processing": {"upsampling": "BICUBIC"}},
config = sh_config,
cache_folder = "./temp_data/",
time_difference = datetime.timedelta(minutes=180),
)
eop = input_task(bbox=bbox, time_interval=time_interval)
print(eop)
Visualization
- RGB에 해당하는 B04, B03, B02 밴드를 사용해 시각화한다.
- 더 나은 시각화를 위해 2.5를 곱해주었다. url
def bands_to_image(bands, gain=2.5):
img = np.clip(gain * bands, 0, 1) * 255
img = img.astype(np.uint8)
img = Image.fromarray(img)
return img
bands = eop.data['BANDS'][0][:, :, [2,1,0]] # (1024, 1024, 3)
img = bands_to_image(bands)
Scene classification task
- Level2-A product에서는 scene classification map을 제공한다., url
- mask에 SCL 데이터가 생긴 부분을 확인할 수 있다.
scl_download_task = SentinelHubInputTask(
data_collection = DataCollection.SENTINEL2_L2A,
resolution = S2_RESOLUTION,
additional_data = [(FeatureType.MASK, "SCL")],
aux_request_args = {"processing": {"upsampling": "BICUBIC"}},
config = sh_config,
cache_folder = "./temp_data/",
time_difference = datetime.timedelta(minutes=180),
)
eop = scl_download_task(eopatch=eop)
print(eop)
- Scene classification map 시각화
palette = [
[0, 0, 0], [255, 0, 0], [47, 47, 47], [100, 50, 0],
[0, 160, 0], [255, 230, 90], [0, 0, 255], [128, 128, 128],
[192, 192, 192], [255, 255, 255], [100, 200, 255], [255, 150, 255]
]
def mask_to_image(mask, palette):
H, W = mask.shape
image = np.zeros((H, W, 3))
for i, rgb in enumerate(palette):
image[np.where(mask==i)] = rgb
image = Image.fromarray(image.astype(np.uint8))
return image
scl_mask = eop.mask['SCL'][0, :, :, 0] # (1024, 1024)
scl_image = mask_to_image(scl_mask, palette)
Cloud segmentation task
- Scene classification 결과를 바탕으로 cloud map을 만든다.
class SCLCloudTask(EOTask):
def __init__(self, scl_feature: Tuple[FeatureType, str]):
self.scl_feature = self.parse_feature(scl_feature)
self.scl_cloud_feature = (FeatureType.MASK, "SCL_CLOUD")
self.scl_cloud_shadow_feature = (FeatureType.MASK, "SCL_CLOUD_SHADOW")
self.scl_cirrus_feature = (FeatureType.MASK, "SCL_CIRRUS")
def execute(self, eopatch: EOPatch) -> EOPatch:
scl = eopatch[self.scl_feature]
eopatch[self.scl_cloud_feature] = ((scl == 8) | (scl == 9)).astype(np.uint8)
eopatch[self.scl_cloud_shadow_feature] = (scl == 3).astype(np.uint8)
eopatch[self.scl_cirrus_feature] = (scl == 10).astype(np.uint8)
del eopatch[self.scl_feature]
return eopatch
scl_cloud_task = SCLCloudTask(scl_feature=(FeatureType.MASK, "SCL"))
eop = scl_cloud_task(eopatch=eop)
cloud_mask = eop.mask['SCL_CLOUD'][0, :, :, 0]
cloud_image = mask_to_image(cloud_mask, [[0, 0, 0], [255, 255, 255]])
'dev' 카테고리의 다른 글
GNU screen 사용법 (0) | 2023.05.18 |
---|---|
neo4j docker로 배포하기 (0) | 2023.05.17 |
GPU 서버 개발 환경 구축 (0) | 2023.05.16 |
Neo4j Python library 사용법 (0) | 2023.05.09 |