#!/usr/bin/env python3
|
|
"""Upload firmware, manifest, and data assets to a MinIO (S3-compatible) bucket.
|
|
|
|
Features preserved from original GCS script:
|
|
- Optional backup (copies existing objects under destination prefix to timestamped folder under backups/)
|
|
- Upload firmware.bin, update.json, and recursively mirror a data directory
|
|
- Cache-Control set to disable caching on clients
|
|
|
|
Switches from google.cloud.storage to boto3 (S3 API) for MinIO compatibility.
|
|
"""
|
|
import os
|
|
import sys
|
|
import datetime
|
|
import hashlib
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
from botocore.config import Config
|
|
except ImportError:
|
|
print("ERROR: boto3 is required. Install with: pip install boto3")
|
|
sys.exit(1)
|
|
|
|
# =============================================================================
# CONFIGURATION CONSTANTS (edit as needed or supply via environment variables)
# =============================================================================

# Feature toggles for the individual upload phases run by main().
CREATE_BACKUP = False
UPLOAD_FIRMWARE = True
UPDATE_MANIFEST = True
UPLOAD_DATA = True

# Directory names pruned (at any depth) during both upload and manifest walks.
DIR_SKIP_LIST = [
    "system",  # Just directory names, not paths
    "booths"
]

# Base filenames skipped regardless of which directory they appear in.
FILES_SKIP_LIST = [
    # Add base filenames to skip regardless of directory, e.g. "readme.txt"
]

# Bucket / endpoint configuration
BUCKET_NAME = os.getenv('MINIO_BUCKET', 'boothifier')
DESTINATION_DIR = os.getenv('MINIO_DEST_PREFIX', 'latest')  # prefix inside bucket
BACKUPS_DIR = os.getenv('MINIO_BACKUPS_PREFIX', 'backups')

# Project layout: repo root is one level above the directory holding this script.
PROJECT_ROOT_PATH = Path(__file__).parent.parent.resolve()
LOCAL_ROOT_PATH = Path(__file__).parent.resolve()

# Optional service account style JSON key (generated by MinIO Console). Expected fields:
# {"url":"https://minio.example.com/api/v1/service-account-credentials","accessKey":"...","secretKey":"...","api":"s3v4","path":"auto"}
MINIO_KEY_FILE = LOCAL_ROOT_PATH / 'minio-boothifier-key.json'

# Defaults before loading file / env; filled in by _load_json_key() when the
# key file above exists and parses.
_json_access = None
_json_secret = None
_json_url = None
|
|
def _load_json_key():
    """Populate the module-level ``_json_*`` credential fields from the
    optional MinIO service-account key file, if it exists.

    Best-effort: any read/parse failure is reported as a warning and the
    defaults (None) are left in place.
    """
    global _json_access, _json_secret, _json_url
    try:
        if not MINIO_KEY_FILE.is_file():
            return
        data = json.loads(MINIO_KEY_FILE.read_text(encoding='utf-8'))
        # Coerce empty strings to None so downstream 'or' fallbacks work.
        _json_access = data.get('accessKey') or None
        _json_secret = data.get('secretKey') or None
        _json_url = data.get('url') or None
    except Exception as e:
        print(f"WARN: Failed to load MinIO key file '{MINIO_KEY_FILE.name}': {e}")


_load_json_key()
|
|
|
|
def _derive_endpoint(url_value: str) -> str:
|
|
if not url_value:
|
|
return 'https://s3-minio.boothwizard.com'
|
|
# Remove known API suffix if present (/api/...)
|
|
# e.g. https://s3-minio.boothwizard.com/api/v1/service-account-credentials -> https://s3-minio.boothwizard.com
|
|
parts = url_value.split('/api/')
|
|
return parts[0] if parts else url_value
|
|
|
|
# MinIO credentials with precedence: ENV > JSON file > fallback
MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT') or _derive_endpoint(_json_url)
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY') or _json_access or 'CHANGE_ME_ACCESS'
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY') or _json_secret or 'CHANGE_ME_SECRET'
MINIO_REGION = os.getenv('MINIO_REGION', 'us-east-1')  # MinIO ignores but boto3 wants some value

# Addressing / SSL options
MINIO_ADDRESSING = os.getenv('MINIO_ADDRESSING_STYLE', 'path').lower()  # 'path' or 'virtual'
MINIO_VERIFY_SSL = os.getenv('MINIO_TLS_VERIFY', '1') not in ('0','false','no')  # set MINIO_TLS_VERIFY=0 to disable cert checks
MINIO_DEBUG = os.getenv('MINIO_DEBUG', '0') in ('1','true','yes')  # verbose [DEBUG] logging
MINIO_ALLOW_VARIANTS = os.getenv('MINIO_ALLOW_ENDPOINT_VARIANTS', '0') in ('1','true','yes') # normally false with nginx redirect

# Local build artifacts and manifest naming.
LOCAL_FIRMWARE_PATH = str(PROJECT_ROOT_PATH / '.pio' / 'build' / 'esp32s3dev' / 'firmware.bin')
LOCAL_DATA_DIRECTORY = str(PROJECT_ROOT_PATH / 'data')
VERSION_HEADER_PATH = str(PROJECT_ROOT_PATH / 'include' / 'version.h')  # source of version/description/changelog
MANIFEST_FILENAME = os.getenv('MANIFEST_FILENAME', 'manifest.json')  # destination manifest name
|
|
|
|
# =============================================================================
|
|
# HELPERS
|
|
# =============================================================================
|
|
|
|
def s3_client():
    """Create an S3 client pointed at MinIO endpoint, forcing path-style unless overridden, with short timeouts."""
    if MINIO_ADDRESSING in ('virtual', 'auto'):
        addressing = 'virtual'
    else:
        addressing = 'path'
    # Short timeouts / few retries: endpoint probing should fail fast.
    client_config = Config(
        s3={'addressing_style': addressing},
        signature_version='s3v4',
        connect_timeout=3,
        read_timeout=5,
        retries={'max_attempts': 2},
    )
    if MINIO_DEBUG:
        masked_key = (MINIO_ACCESS_KEY[:3] + '...' + MINIO_ACCESS_KEY[-3:]) if MINIO_ACCESS_KEY else 'None'
        print(f"[DEBUG] Creating client: endpoint={MINIO_ENDPOINT} addressing={addressing} verifySSL={MINIO_VERIFY_SSL} region={MINIO_REGION} accessKey={masked_key}")
    return boto3.client(
        's3',
        endpoint_url=MINIO_ENDPOINT,
        aws_access_key_id=MINIO_ACCESS_KEY,
        aws_secret_access_key=MINIO_SECRET_KEY,
        region_name=MINIO_REGION,
        verify=MINIO_VERIFY_SSL,
        config=client_config,
    )
|
|
|
|
def _endpoint_variants(base: str):
    """Return endpoint variants only if explicitly allowed; otherwise just the base (nginx handles forwarding)."""
    if not MINIO_ALLOW_VARIANTS:
        return [base]
    # Expanded probing: same host on common API ports, both schemes.
    try:
        if not base:
            return []
        base = base.rstrip('/')
        if '://' in base:
            scheme, rest = base.split('://', 1)
        else:
            scheme, rest = 'https', base
        if ':' in rest:
            host, port = rest.split(':', 1)
        else:
            host, port = rest, ''
        alt_scheme = 'http' if scheme == 'https' else 'https'
        common_ports = ['9000', '443', '80']
        candidates = [f"{scheme}://{rest}"]
        candidates.extend(f"{scheme}://{host}:{p}" for p in common_ports if port != p)
        candidates.append(f"{alt_scheme}://{rest}")
        candidates.extend(f"{alt_scheme}://{host}:{p}" for p in common_ports if port != p)
        # De-duplicate while preserving probe order.
        seen = set()
        ordered = []
        for cand in candidates:
            if cand not in seen:
                seen.add(cand)
                ordered.append(cand)
        return ordered
    except Exception:
        return [base]
|
|
|
|
def create_validated_client():
    """Validate (or create) client using only provided endpoint unless variants enabled.

    Probes each candidate endpoint with HeadBucket; on success (or on a
    bucket-not-found response, acceptable on first deploy) the working
    endpoint is left assigned to the global MINIO_ENDPOINT and the client
    is returned. Exits the process with status 3 if no candidate works.
    """
    global MINIO_ENDPOINT
    primary = MINIO_ENDPOINT
    variants = _endpoint_variants(primary) or [primary]
    errors = []
    probe_bucket = BUCKET_NAME  # we will head the target bucket directly
    for candidate in variants:
        saved = MINIO_ENDPOINT
        # s3_client() reads MINIO_ENDPOINT, so swap it in for this probe.
        MINIO_ENDPOINT = candidate
        if MINIO_DEBUG:
            print(f"[DEBUG] Probing endpoint candidate: {candidate}")
        try:
            c = s3_client()
            try:
                c.head_bucket(Bucket=probe_bucket)
                if MINIO_DEBUG:
                    print(f"[DEBUG] head_bucket succeeded on {candidate} for '{probe_bucket}'.")
                # NOTE: MINIO_ENDPOINT deliberately stays set to the working candidate.
                return c
            except ClientError as e:
                msg = str(e)
                # Acceptable if bucket not found (we can create later)
                if any(code in msg for code in ('404', 'NoSuchBucket', 'NotFound')):
                    if MINIO_DEBUG:
                        print(f"[DEBUG] Bucket not found on {candidate} (expected if first deploy). Using this endpoint.")
                    return c
                if 'API Requests must be made to API port' in msg:
                    errors.append(f"{candidate}: wrong port (console endpoint)")
                else:
                    errors.append(f"{candidate}: {msg}")
                # Probe failed: restore the previous endpoint before trying the next.
                MINIO_ENDPOINT = saved
        except Exception as ex:
            errors.append(f"{candidate}: {ex}")
            MINIO_ENDPOINT = saved
            continue
    print("ERROR: Could not validate any endpoint candidate.")
    for e in errors:
        print(' - ' + e)
    print("Provide correct API endpoint (e.g. https://host:9000) via MINIO_ENDPOINT env var.")
    sys.exit(3)
|
|
|
|
def list_objects(client, prefix: str):
    """Yield every object key under *prefix*, following pagination.

    No Delimiter is supplied, so the listing covers the whole subtree
    under the prefix (i.e. it is effectively recursive).
    """
    params = {'Bucket': BUCKET_NAME, 'Prefix': prefix}
    while True:
        page = client.list_objects_v2(**params)
        for entry in page.get('Contents', []):
            yield entry['Key']
        if not page.get('IsTruncated'):
            return
        params['ContinuationToken'] = page['NextContinuationToken']
|
|
|
def normalize_prefix(p: str) -> str:
    """Return *p* without leading/trailing slashes (S3 prefixes never need them)."""
    return p.strip('/')
|
|
|
|
def join_key(*parts: str) -> str:
    """Join key segments with '/', trimming surrounding slashes from each part.

    None, empty, and slash-only parts are dropped; the previous version kept
    slash-only parts (which strip to ''), producing '//' in the resulting key.
    """
    cleaned = []
    for part in parts:
        if part is None or part == '':
            continue
        trimmed = part.strip('/')
        if trimmed:  # drop segments that are nothing but slashes
            cleaned.append(trimmed)
    return '/'.join(cleaned)
|
|
|
|
def backup_existing_files(client, destination_prefix: str, backups_prefix: str, backup_folder: str):
    """Server-side copy every object under *destination_prefix* into
    <backups_prefix>/<backup_folder>/, preserving relative paths and metadata."""
    prefix = destination_prefix + '/' if destination_prefix else ''
    print(f"Scanning existing objects under '{prefix}' for backup...")
    for key in list_objects(client, prefix):
        # Never re-back-up objects that already live under the backups prefix.
        if backups_prefix and key.startswith(backups_prefix + '/'):
            continue
        # Path of the object relative to the destination prefix.
        relative = key[len(prefix):] if prefix and key.startswith(prefix) else key
        backup_key = join_key(backups_prefix, backup_folder, relative)
        print(f"Backup copy: {key} -> {backup_key}")
        client.copy_object(
            Bucket=BUCKET_NAME,
            CopySource={'Bucket': BUCKET_NAME, 'Key': key},
            Key=backup_key,
            MetadataDirective='COPY',
        )
|
|
|
|
def upload_file(client, local_path: str, key: str, cache_control: str = 'private, max-age=0, no-transform'):
    """Upload one local file to the bucket at *key*, setting Cache-Control.

    Missing local files are warned about and skipped rather than failing
    the whole deployment.
    """
    if not os.path.isfile(local_path):
        print(f"WARN: File missing, skipping: {local_path}")
        return
    print(f"Upload: {local_path} -> s3://{BUCKET_NAME}/{key}")
    client.upload_file(local_path, BUCKET_NAME, key, ExtraArgs={'CacheControl': cache_control})
|
|
|
|
def upload_directory(client, local_directory: str, destination_prefix: str):
    """Recursively mirror *local_directory* into the bucket under *destination_prefix*.

    Directories whose name appears in DIR_SKIP_LIST are pruned from the walk
    (at any depth); files whose base name appears in FILES_SKIP_LIST are
    skipped. Keys always use forward slashes.

    Cleanup vs previous version: removed the unused locals ``rel_dir``,
    ``seg`` and ``pruned`` — behavior is unchanged.
    """
    if not os.path.isdir(local_directory):
        print(f"WARN: Data directory missing: {local_directory}")
        return
    skip_dirs = set(DIR_SKIP_LIST)
    skip_files = set(FILES_SKIP_LIST)
    for root, dirs, files in os.walk(local_directory):
        # Prune skipped directories in-place so os.walk never descends into them.
        for d in list(dirs):
            if d in skip_dirs:
                if MINIO_DEBUG:
                    print(f"[DEBUG] Skipping directory subtree: {os.path.join(root,d)}")
                dirs.remove(d)
        for fname in files:
            if fname in skip_files:
                if MINIO_DEBUG:
                    print(f"[DEBUG] Skipping file by name: {os.path.join(root,fname)}")
                continue
            full = os.path.join(root, fname)
            rel = os.path.relpath(full, local_directory).replace('\\', '/')  # force forward slashes for S3 keys
            # Safety net: skip anything whose top-level folder is in the skip list.
            if rel.split('/', 1)[0] in skip_dirs:
                if MINIO_DEBUG:
                    print(f"[DEBUG] Skipping file in skipped dir: {rel}")
                continue
            upload_file(client, full, join_key(destination_prefix, rel))
|
|
|
|
def ensure_bucket(client):
    """Ensure the target bucket exists, creating it when missing.

    On a HeadBucket failure it prints diagnostics (available buckets,
    endpoint hints for HTTP 400), creates the bucket on 404-style errors,
    and retries once with path-style addressing before exiting with
    status 2.
    """
    global MINIO_ADDRESSING
    try:
        client.head_bucket(Bucket=BUCKET_NAME)
        if MINIO_DEBUG:
            print(f"[DEBUG] Bucket '{BUCKET_NAME}' exists.")
        return
    except ClientError as e:
        code = e.response.get('Error', {}).get('Code')
        status = e.response.get('ResponseMetadata', {}).get('HTTPStatusCode')
        print(f"HeadBucket failed (code={code}, status={status}).")

        # List buckets for diagnostics
        try:
            resp = client.list_buckets()
            bucket_names = [b['Name'] for b in resp.get('Buckets', [])]
            print(f"Available buckets: {bucket_names or 'None'}")
        except Exception as le:
            print(f"WARN: list_buckets failed: {le}")

        if code in ('404', 'NoSuchBucket', 'NotFound'):
            print(f"Bucket '{BUCKET_NAME}' not found. Attempting to create...")
            try:
                client.create_bucket(Bucket=BUCKET_NAME)
                print(f"Created bucket '{BUCKET_NAME}'.")
                return
            except ClientError as ce:
                print(f"ERROR: Cannot create bucket: {ce}")
                sys.exit(2)

        if status == 400:
            print("HINTS: \n - Verify endpoint URL (MINIO_ENDPOINT).\n - Ensure no trailing slash in endpoint.\n - Check that TLS verify matches server cert (set MINIO_TLS_VERIFY=0 to test).\n - Confirm bucket name is correct and DNS compatible.\n - Credentials may lack permission: verify access key policies.")

        # Retry once forcing path style if not already
        if MINIO_ADDRESSING != 'path':
            print("Retrying with path-style addressing...")
            # BUG FIX: s3_client() reads the module-level MINIO_ADDRESSING value,
            # which was captured from the environment at import time — mutating
            # os.environ here had no effect on the retry. Update the global so the
            # new client really uses path-style addressing (env kept in sync for
            # any child processes).
            MINIO_ADDRESSING = 'path'
            os.environ['MINIO_ADDRESSING_STYLE'] = 'path'
            new_client = s3_client()
            try:
                new_client.head_bucket(Bucket=BUCKET_NAME)
                print("Second attempt succeeded with path-style addressing.")
                return
            except ClientError as e2:
                print(f"Second HeadBucket attempt failed: {e2}")
        print(f"ERROR: head_bucket ultimately failed: {e}")
        sys.exit(2)
|
|
|
|
# =============================================================================
|
|
# MAIN
|
|
# =============================================================================
|
|
|
|
def main():
    """Entry point: validate the endpoint, ensure the bucket, then run the
    enabled phases in order — backup, firmware upload, data mirror, manifest.
    Phase toggles come from the CREATE_BACKUP/UPLOAD_* module constants."""
    dest_prefix = normalize_prefix(DESTINATION_DIR)
    backups_prefix = normalize_prefix(BACKUPS_DIR) if BACKUPS_DIR else ''
    client = create_validated_client()
    if MINIO_DEBUG:
        print("[DEBUG] Starting ensure_bucket phase...")
    ensure_bucket(client)

    # Create backup if needed
    if CREATE_BACKUP:
        ts = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_folder = f"backup_{ts}"
        print(f"Creating backup under '{backups_prefix}/{backup_folder}' from prefix '{dest_prefix}'")
        backup_existing_files(client, dest_prefix, backups_prefix, backup_folder)
        print("Backup complete.")
    else:
        print("Backup creation skipped.")

    # Firmware
    if UPLOAD_FIRMWARE:
        firmware_key = join_key(dest_prefix, 'firmware.bin') if dest_prefix else 'firmware.bin'
        upload_file(client, LOCAL_FIRMWARE_PATH, firmware_key)
        print("Firmware upload complete.")
    else:
        print("Firmware upload skipped.")

    # Data directory
    if UPLOAD_DATA:
        data_prefix = join_key(dest_prefix, 'data') if dest_prefix else 'data'
        upload_directory(client, LOCAL_DATA_DIRECTORY, data_prefix)
        print("All uploads complete.")
    else:
        print("Data upload skipped.")

    # Manifest (built last so it reflects the firmware/data actually on disk)
    if UPDATE_MANIFEST:
        manifest_key = join_key(dest_prefix, MANIFEST_FILENAME) if dest_prefix else MANIFEST_FILENAME
        try:
            manifest_doc = build_and_write_manifest(client, dest_prefix)
            upload_manifest_json(client, manifest_doc, manifest_key)
            print(f"Manifest upload complete: s3://{BUCKET_NAME}/{manifest_key}")
        except Exception as e:
            print(f"ERROR: Manifest generation/upload failed: {e}")
            sys.exit(4)
    else:
        print("Manifest upload skipped.")
|
|
|
|
# ================= Manifest Support =================
|
|
|
|
def md5_hex(path: str, chunk_size: int = 65536) -> str:
    """Return the hexadecimal MD5 digest of the file at *path*, read in chunks."""
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
def collect_data_files(data_root: str):
    """Build manifest entries ({'path','md5','size'}) for every file under
    *data_root*, honoring DIR_SKIP_LIST / FILES_SKIP_LIST, sorted by path."""
    entries = []
    if not os.path.isdir(data_root):
        return entries
    skip_dirs = set(DIR_SKIP_LIST)
    skip_files = set(FILES_SKIP_LIST)
    for root, dirs, filenames in os.walk(data_root):
        # Prune skipped directories so the walk never enters them.
        for d in list(dirs):
            if d in skip_dirs:
                dirs.remove(d)
                if MINIO_DEBUG:
                    print(f"[DEBUG] (manifest) Pruned dir: {os.path.join(root,d)}")
        for fname in filenames:
            if fname in skip_files:
                if MINIO_DEBUG:
                    print(f"[DEBUG] (manifest) Skipped file: {os.path.join(root,fname)}")
                continue
            full = os.path.join(root, fname)
            rel = os.path.relpath(full, data_root).replace('\\', '/')
            if rel.split('/', 1)[0] in skip_dirs:
                if MINIO_DEBUG:
                    print(f"[DEBUG] (manifest) Skipped by top-level dir: {rel}")
                continue
            entries.append({
                'path': f"data/{rel}",
                'md5': md5_hex(full),
                'size': os.path.getsize(full),
            })
    entries.sort(key=lambda item: item['path'])
    return entries
|
|
|
|
def parse_version_header(path: str):
    """Parse version.h to extract version numbers, description, and changelog lines.

    Expects ``#define FIRMWARE_VERSION_MAJOR/MINOR/PATCH <int>`` plus the
    multi-line string macros ``FIRMWARE_DESCRIPTION`` and
    ``FIRMWARE_CHANGELOG``, whose continuation lines are quoted strings
    optionally ending in a backslash.

    Returns a dict with keys 'version' (major/minor/patch ints),
    'description' (str), and 'changelog' (list of str).
    Raises FileNotFoundError if the header is missing and ValueError if the
    version numbers cannot be parsed.
    """
    if not os.path.isfile(path):
        raise FileNotFoundError(f"version header not found: {path}")
    with open(path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    major = minor = patch = None
    desc_lines = []
    changelog_lines = []
    state = None  # None | DESC | CHANGELOG

    re_major = re.compile(r'#define\s+FIRMWARE_VERSION_MAJOR\s+(\d+)')
    re_minor = re.compile(r'#define\s+FIRMWARE_VERSION_MINOR\s+(\d+)')
    re_patch = re.compile(r'#define\s+FIRMWARE_VERSION_PATCH\s+(\d+)')
    # BUG FIX: allow whitespace between the closing quote and the line
    # continuation backslash ('"text" \'). The previous pattern
    # (r'^\s*"(.*)"\\?\s*$') rejected that common formatting, silently
    # dropping description/changelog lines.
    re_quote = re.compile(r'^\s*"(.*)"\s*\\?\s*$')

    for raw in lines:
        line = raw.rstrip('\n')
        if major is None:
            m = re_major.match(line)
            if m:
                major = int(m.group(1))
                continue
        if minor is None:
            m = re_minor.match(line)
            if m:
                minor = int(m.group(1))
                continue
        if patch is None:
            m = re_patch.match(line)
            if m:
                patch = int(m.group(1))
                continue
        if line.startswith('#define FIRMWARE_DESCRIPTION'):
            state = 'DESC'
            continue
        if line.startswith('#define FIRMWARE_CHANGELOG'):
            state = 'CHANGELOG'
            continue
        # Any other #define terminates the current multi-line macro.
        if line.startswith('#define') and state in ('DESC', 'CHANGELOG'):
            state = None
        if state in ('DESC', 'CHANGELOG'):
            mq = re_quote.match(line.strip())
            if mq:
                # Turn literal '\n' escapes from the C string into real newlines.
                text = mq.group(1).replace('\\n', '\n')
                if state == 'DESC':
                    desc_lines.append(text)
                else:
                    changelog_lines.append(text)
            else:
                state = None

    if None in (major, minor, patch):
        raise ValueError('Failed to parse version numbers from version.h')

    description = '\n'.join([t for t in desc_lines if t]).strip()
    changelog = []
    for seg in changelog_lines:
        for part in seg.split('\n'):
            part = part.strip()
            if part:
                changelog.append(part)
    return {
        'version': {'major': major, 'minor': minor, 'patch': patch},
        'description': description,
        'changelog': changelog
    }
|
|
|
|
def build_and_write_manifest(client, dest_prefix: str):
    """Assemble the manifest dict from version.h, the built firmware binary,
    and the local data tree. Raises FileNotFoundError if the firmware binary
    is missing."""
    base_info = parse_version_header(VERSION_HEADER_PATH)
    now = datetime.datetime.now()
    fw_path_local = LOCAL_FIRMWARE_PATH
    if not os.path.isfile(fw_path_local):
        raise FileNotFoundError(f"Firmware file not found: {fw_path_local}")
    version = base_info['version']
    return {
        'version': {
            'major': int(version['major']),
            'minor': int(version['minor']),
            'patch': int(version['patch']),
        },
        'release_date': now.strftime('%Y-%m-%d'),
        'release_time': now.strftime('%H:%M:%S'),
        'description': base_info.get('description', ''),
        'changelog': base_info.get('changelog', []),
        'firmware': {
            'path': 'firmware.bin',
            'md5': md5_hex(fw_path_local),
            'size': os.path.getsize(fw_path_local),
        },
        'files': collect_data_files(LOCAL_DATA_DIRECTORY),
    }
|
|
|
|
def upload_manifest_json(client, manifest_obj: dict, key: str):
    """Serialize *manifest_obj* as pretty-printed JSON and PUT it at *key*
    with caching disabled."""
    payload = json.dumps(manifest_obj, indent=4).encode('utf-8')
    client.put_object(
        Bucket=BUCKET_NAME,
        Key=key,
        Body=payload,
        ContentType='application/json',
        CacheControl='private, max-age=0, no-transform',
    )
|
|
|
|
if __name__ == '__main__':
|
|
main()
|