"""Resources that implement cloud abstractions"""
import json
import os
import uuid
from typing import Dict
from plynx.base import resource
from plynx.constants import NodeResources
from plynx.utils.config import get_cloud_service_config
[docs]CLOUD_SERVICE_CONFIG = get_cloud_service_config()
[docs]class CloudStorage(resource.BaseResource):
"""Storage Resource, i.e. S3 bucket"""
@staticmethod
}
@staticmethod
[docs] def prepare_output(filename: str, preview: bool = False) -> Dict[str, str]:
"""Postprocess output"""
uniq_id = str(uuid.uuid1())
cloud_filename = os.path.join(
CLOUD_SERVICE_CONFIG.prefix,
uniq_id,
)
if not preview:
with open(filename, 'w', encoding="utf8") as f:
json.dump({"path": cloud_filename}, f)
return {
NodeResources.OUTPUT: filename,
NodeResources.CLOUD_OUTPUT: cloud_filename,
}
@classmethod
[docs] def preview(cls, preview_object: resource.PreviewObject) -> str:
"""Preview resource"""
path = json.load(preview_object.fp)["path"]
url = "".join([CLOUD_SERVICE_CONFIG.url_prefix, path.split("//")[1], CLOUD_SERVICE_CONFIG.url_postfix])
return f"<a href={url}>{path}</a>"