Source code for et_engine.config

import requests
import json
import os
from math import ceil
import asyncio, aiohttp, aiofiles
from tqdm import tqdm
import time


MIN_CHUNK_SIZE_BYTES = 8 * 1024 * 1024
MAX_CHUNK_SIZE_BYTES = 64 * 1024 * 1024


if os.environ.get("ENV") == "dev":
    API_ENDPOINT = "https://api-dev.exploretech.ai"
else:
    API_ENDPOINT = "https://api.exploretech.ai"


class PayloadTooLargeError(Exception):
    pass


class ChunkTooSmallError(Exception):
    pass
class MultipartUploadPresignedUrls:

    def __init__(self, upload_id, urls, chunk_size):
        self.upload_id = upload_id
        self.urls = urls
        self.chunk_size = chunk_size
class MultipartUpload:
    def __init__(self, local_file, endpoint_url, chunk_size=MIN_CHUNK_SIZE_BYTES, timeout=7200, connections=100, method="POST"):
        self.local_file = local_file
        self.url = endpoint_url
        self.method = method
        self.file_size_bytes = os.stat(local_file).st_size
        self.num_parts = ceil(self.file_size_bytes / chunk_size)
        self.chunk_size = chunk_size
        self.timeout = timeout
        self.connections = connections
        self.presigned_urls = None
        self.uploaded_parts = None
    def request_upload(self):
        response = requests.request(
            self.method,
            self.url,
            data=json.dumps({"num_parts": self.num_parts}),
            headers={"Authorization": os.environ["ET_ENGINE_API_KEY"]}
        )
        if response.status_code == 504:
            self.chunk_size = self.chunk_size * 2
            self.num_parts = ceil(self.file_size_bytes / self.chunk_size)
            if self.chunk_size > MAX_CHUNK_SIZE_BYTES:
                raise Exception("Chunk Size Too Large")
            print(f"Increasing chunk size to {self.chunk_size / 1024 / 1024} MB")
            return self.request_upload()

        if response.ok:
            response = response.json()
            upload_id = response["UploadId"]
            urls = response["urls"]
            self.presigned_urls = MultipartUploadPresignedUrls(upload_id, urls, self.chunk_size)
            return self.presigned_urls
        else:
            raise Exception(f"Error creating multipart upload: {response}, {response.text}")
    def upload(self):
        self.uploaded_parts = asyncio.run(self.upload_parts_in_parallel())
    def complete_upload(self):
        if self.uploaded_parts is None:
            raise Exception("Upload not yet complete")

        # Step 4: Complete upload
        complete = requests.request(
            self.method,
            self.url,
            data=json.dumps({
                "complete": "true",
                "parts": self.uploaded_parts,
                "UploadId": self.presigned_urls.upload_id
            }),
            headers={"Authorization": os.environ["ET_ENGINE_API_KEY"]}
        )
        if complete.status_code != 200:
            raise Exception(f"Error completing upload: {complete}, {complete.reason}, {complete.text}")
    async def upload_part(self, part_number, presigned_url, session, max_retries=5):
        """
        Uploads one part in a multipart upload
        """
        num_tries = 0
        starting_byte = (part_number - 1) * self.chunk_size

        async with aiofiles.open(self.local_file, mode='rb') as file:
            await file.seek(starting_byte)
            chunk = await file.read(self.chunk_size)

        while num_tries < max_retries:
            async with session.put(presigned_url, data=chunk) as status:
                if status.ok:
                    return {"ETag": status.headers["ETag"], "PartNumber": part_number}
                else:
                    xml_response = await status.text()
                    num_tries += 1

        raise Exception(f"Error uploading part: {xml_response}, {status}")
    async def upload_parts_in_parallel(self):
        """
        Sends upload HTTP requests asynchronously to speed up file transfer
        """
        if self.presigned_urls is None:
            raise Exception("Upload not yet requested")

        connector = aiohttp.TCPConnector(limit=self.connections)
        client_timeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=client_timeout, connector=connector) as session:

            upload_part_tasks = set()
            for part_number, presigned_url in self.presigned_urls.urls:
                task = asyncio.create_task(
                    self.upload_part(part_number, presigned_url, session)
                )
                upload_part_tasks.add(task)

            parts = []
            for task in tqdm(asyncio.as_completed(upload_part_tasks), desc=f"[{self.file_size_bytes / 1024 / 1024 // 1} MB] '{self.local_file}'", total=len(upload_part_tasks)):
                completed_part = await task
                parts.append(completed_part)

            return parts
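# A minimal usage sketch for MultipartUpload (the file name and endpoint path below
# are illustrative assumptions, not part of this module):
#
#     upload = MultipartUpload("results.dat", f"{API_ENDPOINT}/files/results.dat")
#     upload.request_upload()    # negotiate chunk size and fetch presigned part URLs
#     upload.upload()            # PUT the parts concurrently via asyncio/aiohttp
#     upload.complete_upload()   # ask the API to assemble the uploaded parts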
class DirectMultipartUpload:
    def __init__(self, local_file, url, chunk_size=MIN_CHUNK_SIZE_BYTES, timeout=7200):
        self.local_file = local_file
        self.url = url
        self.file_size_bytes = os.stat(local_file).st_size
        self.num_parts = ceil(self.file_size_bytes / chunk_size)
        self.chunk_size = chunk_size
        self.timeout = timeout
        self.upload_id = None
    def request_upload(self):
        response = requests.post(
            self.url,
            data=json.dumps({'size': self.file_size_bytes}),
            headers={'Authorization': os.environ['ET_ENGINE_API_KEY']}
        )
        if response.ok:
            upload_details = response.json()
            self.upload_id = upload_details['uploadId']
    def upload(self):
        return asyncio.run(self.upload_parts_in_parallel())
    def complete_upload(self):
        if self.upload_id is None:
            raise Exception("Upload not yet initialized")

        response = requests.post(
            self.url,
            data=json.dumps({'uploadId': self.upload_id, 'complete': True}),
            headers={'Authorization': os.environ['ET_ENGINE_API_KEY']}
        )
        response.raise_for_status()
    async def upload_part(self, starting_byte, session):
        """
        Uploads one part in a multipart upload
        """
        if self.upload_id is None:
            raise Exception("Upload not yet initialized")

        async with aiofiles.open(self.local_file, mode='rb') as file:
            await file.seek(starting_byte)
            chunk = await file.read(self.chunk_size)

        chunk_length = len(chunk)
        content_range = f"[{self.upload_id}]:{starting_byte}-{starting_byte+chunk_length}"
        headers = {
            'Authorization': os.environ['ET_ENGINE_API_KEY'],
            'Content-Range': content_range
        }

        n_tries = 0
        while n_tries < 5:
            try:
                async with session.put(self.url, data=chunk, headers=headers) as response:
                    if not response.ok:
                        raise Exception(f"Error uploading part: {response.text}")
                    return response.status
            except:
                n_tries += 1

        raise Exception("Max retries exceeded")
    async def upload_parts_in_parallel(self):
        """
        Sends upload HTTP requests asynchronously to speed up file transfer
        """
        connector = aiohttp.TCPConnector(limit=5)
        client_timeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=client_timeout, connector=connector) as session:

            upload_part_tasks = set()
            for starting_byte in range(0, self.file_size_bytes, self.chunk_size):
                task = asyncio.create_task(
                    self.upload_part(starting_byte, session)
                )
                upload_part_tasks.add(task)

            parts = []
            for task in tqdm(asyncio.as_completed(upload_part_tasks), desc=f"[{self.file_size_bytes / 1024 / 1024 // 1} MB] {self.local_file}", total=len(upload_part_tasks)):
                part_status = await task
                parts.append(part_status)

            return parts
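# A minimal usage sketch for DirectMultipartUpload, which PUTs byte ranges straight
# to the endpoint with Content-Range headers rather than presigned part URLs (file
# name and endpoint path are illustrative assumptions):
#
#     upload = DirectMultipartUpload("results.dat", f"{API_ENDPOINT}/files/results.dat")
#     upload.request_upload()    # register the upload and obtain an uploadId
#     upload.upload()            # PUT each byte range concurrently
#     upload.complete_upload()   # mark the upload complete on the server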
class DirectMultipartDownload:
    def __init__(self, local_file, url, chunk_size=MIN_CHUNK_SIZE_BYTES, timeout=7200):
        self.local_file = local_file
        self.url = url
        self.file_size_bytes = None
        self.num_parts = None
        self.download_id = None
        self.chunk_size = chunk_size
        self.timeout = timeout
    def request_download(self):
        response = requests.get(
            self.url,
            params={"init": True},
            headers={'Authorization': os.environ['ET_ENGINE_API_KEY']}
        )
        if not response.ok:
            raise Exception(response.text)

        download_info = response.json()
        self.file_size_bytes = download_info['size']
        self.download_id = download_info['download_id']
        self.num_parts = ceil(self.file_size_bytes / self.chunk_size)

        self.initialize_file()
    def initialize_file(self):
        if self.file_size_bytes is None or self.download_id is None or self.num_parts is None:
            raise Exception("Download not yet initialized")

        destination = f"{self.local_file}.{self.download_id}"
        with open(destination, "wb") as f:
            f.seek(self.file_size_bytes - 1)
            f.write(b'\0')
    def download(self):
        return asyncio.run(self.download_parts_in_parallel())
    def complete_download(self):
        destination = f"{self.local_file}.{self.download_id}"
        os.rename(destination, self.local_file)
    async def download_part(self, starting_byte, session):
        """
        Downloads one part in a multipart download
        """
        destination = f"{self.local_file}.{self.download_id}"
        async with aiofiles.open(destination, mode='r+b') as f:
            await f.seek(starting_byte, 0)

            content_range = f"{starting_byte}-{starting_byte+self.chunk_size}"
            headers = {
                'Authorization': os.environ['ET_ENGINE_API_KEY'],
                'Content-Range': content_range
            }

            n_tries = 0
            while n_tries < 1:
                try:
                    async with session.get(self.url, headers=headers) as response:
                        if not response.ok:
                            raise Exception(f"Error downloading part: {response.text}")
                        await f.write(await response.content.read())
                        return response.status
                except:
                    n_tries += 1

            raise Exception("Max retries exceeded")
    async def download_parts_in_parallel(self):
        """
        Sends download HTTP requests asynchronously to speed up file transfer
        """
        connector = aiohttp.TCPConnector(limit=5)
        client_timeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession(timeout=client_timeout, connector=connector) as session:

            download_part_tasks = set()
            for starting_byte in range(0, self.file_size_bytes, self.chunk_size):
                task = asyncio.create_task(
                    self.download_part(starting_byte, session)
                )
                download_part_tasks.add(task)

            parts = []
            for task in tqdm(asyncio.as_completed(download_part_tasks), desc=f"[{self.file_size_bytes / 1024 / 1024 // 1} MB] {self.local_file}", total=len(download_part_tasks)):
                part_status = await task
                parts.append(part_status)

            return parts
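# A minimal usage sketch for DirectMultipartDownload (file name and endpoint path are
# illustrative assumptions):
#
#     download = DirectMultipartDownload("results.dat", f"{API_ENDPOINT}/files/results.dat")
#     download.request_download()    # fetch size/download_id and preallocate the temp file
#     download.download()            # GET byte ranges concurrently into the temp file
#     download.complete_download()   # rename the temp file to the final path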