-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathupload_wheels.py
More file actions
96 lines (72 loc) · 2.74 KB
/
upload_wheels.py
File metadata and controls
96 lines (72 loc) · 2.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#
# SPDX-FileCopyrightText: 2023-2026 Espressif Systems (Shanghai) CO LTD
#
# SPDX-License-Identifier: Apache-2.0
#
"""This script uploads wheel files from the downloaded wheels directory to S3 bucket.
- argument: name of the target S3 bucket
"""
import os
import re
import sys
import boto3
from colorama import Fore
from _helper_functions import print_color
# S3 service handle; the target bucket is named by the first command-line argument.
s3 = boto3.resource("s3")
try:
    BUCKET = s3.Bucket(sys.argv[1])
except IndexError:
    # sys.argv[1] does not exist: the script was started without a bucket name.
    sys.exit("Error: S3 bucket name not provided.")

# Wheels are read from the "downloaded_wheels" directory under the current directory.
WHEELS_DIR = f"{os.path.curdir}{(os.sep)}downloaded_wheels"
if not os.path.exists(WHEELS_DIR):
    sys.exit(f"Error: The wheels directory {WHEELS_DIR} not found.")
def normalize(name):
    """Return *name* lowercased, with each run of ``-``, ``_`` or ``.`` replaced by one ``-``."""
    collapsed = re.sub(r"[-_.]+", "-", name)
    return collapsed.lower()
def get_existing_wheels():
    """Return the set of object keys ending in ``.whl`` under the ``pypi/`` prefix of BUCKET."""
    return {
        entry.key
        for entry in BUCKET.objects.filter(Prefix="pypi/")
        if entry.key.endswith(".whl")
    }
# Opening banner for the whole run.
print_color("---------- UPLOAD WHEELS TO S3 ----------")
# Snapshot the wheel keys already present in the bucket; the upload loop uses
# this set to tell new wheels from re-uploaded ones.
existing_wheels = get_existing_wheels()
print(f"Found {len(existing_wheels)} existing wheels on S3\n")
# Banner for the upload phase.
print_color("---------- UPLOADING WHEELS ----------")
def collect_wheel_paths(wheels_dir=None):
    """Collect (full_path, wheel_filename) for all .whl files in the wheels directory.

    Handles both flat layout (wheels directly in the directory) and nested layout
    (wheels inside its immediate subdirectories). Deeper nesting is not scanned.

    Args:
        wheels_dir: Directory to scan. When omitted, the module-level
            WHEELS_DIR is used.

    Returns:
        A list of (full_path, wheel_filename) tuples, where full_path is the
        scanned directory joined with the wheel's relative location.
    """
    root = WHEELS_DIR if wheels_dir is None else wheels_dir
    collected = []
    for item in os.listdir(root):
        path = os.path.join(root, item)
        if os.path.isfile(path) and item.endswith(".whl"):
            collected.append((path, item))
        elif os.path.isdir(path):
            for wheel in os.listdir(path):
                wheel_path = os.path.join(path, wheel)
                # Same regular-file check as the flat layout: a directory whose
                # name ends in ".whl" cannot be uploaded as a wheel.
                if wheel.endswith(".whl") and os.path.isfile(wheel_path):
                    collected.append((wheel_path, wheel))
    return collected
# The package name is everything before the first "-<digit>" in the wheel file
# name. Compiled once here instead of on every loop iteration.
pattern = re.compile(r"^(.+?)-(\d+)")

wheel_paths = collect_wheel_paths()
new_wheels = 0
existing_count = 0
for full_path, wheel in wheel_paths:
    match = pattern.search(wheel)
    if not match:
        # Report file names the pattern cannot parse instead of dropping them
        # silently, so a wheel missing from the bucket can be traced.
        print_color(f"!! Skipped (cannot determine package name): {wheel}", Fore.YELLOW)
        continue

    wheel_name = normalize(match.group(1))
    # Object key layout: pypi/<normalized package name>/<wheel file name>.
    key = f"pypi/{wheel_name}/{wheel}"
    is_new = key not in existing_wheels

    # Every collected wheel is uploaded, including ones already in the bucket;
    # is_new only decides how the upload is reported and counted.
    BUCKET.upload_file(full_path, key, ExtraArgs={"ACL": "public-read"})

    if is_new:
        new_wheels += 1
        print_color(f"++ {wheel_name}/{wheel}", Fore.GREEN)
    else:
        existing_count += 1
        print(f"  <- {wheel_name}/{wheel}")

print_color("---------- END UPLOADING ----------")

print_color("---------- STATISTICS ----------")
print_color(f"New wheels: {new_wheels}", Fore.GREEN)
print(f"Existing wheels (re-uploaded): {existing_count}")
print(f"Total uploaded: {new_wheels + existing_count}")
print_color("---------- END STATISTICS ----------")