1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
   | 
  import os import random import string import oss2
 
 
 
 
 
 
 
 
 
 
 
 
  # Connection settings are taken from OSS_TEST_* environment variables. When a
  # variable is not set, the angle-bracket placeholder paired with it is used.
  access_key_id, access_key_secret, bucket_name, endpoint = (
      os.environ.get(variable, placeholder)
      for variable, placeholder in (
          ('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>'),
          ('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>'),
          ('OSS_TEST_BUCKET', '<你的Bucket>'),
          ('OSS_TEST_ENDPOINT', '<你的访问域名>'),
      )
  )
 
 
  # Stop before any request is made if a setting still contains '<', i.e. it
  # still looks like an unfilled '<...>' placeholder.
  # An explicit raise is used rather than ``assert``: assertions are removed
  # when Python runs with -O, which would let the script continue with
  # placeholder values.
  for param in (access_key_id, access_key_secret, bucket_name, endpoint):
      if '<' in param:
          raise ValueError('请设置参数:' + param)
 
 
  # Bucket handle built from the access key pair, the endpoint and the bucket
  # name; the upload and download calls in this script go through it.
  bucket = oss2.Bucket(
      oss2.Auth(access_key_id, access_key_secret),
      endpoint,
      bucket_name,
  )
 
  def random_string(n):
      """Return a string of ``n`` randomly chosen lowercase ASCII letters.

      Each letter is drawn independently with ``random.choice``, so the result
      follows the state of the module-level ``random`` generator.
      """
      alphabet = string.ascii_lowercase
      letters = [random.choice(alphabet) for _ in range(n)]
      return ''.join(letters)
 
  # Scratch file used as the upload payload: a random 32-letter name with a
  # '.txt' suffix, filled with 1024 * 1024 random lowercase letters converted
  # to bytes. It is created relative to the current working directory.
  filename = '%s.txt' % random_string(32)
  content = oss2.to_bytes(random_string(1 << 20))
  with open(filename, 'wb') as scratch:
      scratch.write(content)
 
  """ 断点续传上传 """
 
 
  # Upload the local file twice through oss2.resumable_upload.
  # First call: library defaults only.
  oss2.resumable_upload(bucket, 'remote-normal.txt', filename)

  # Second call: passes an explicit multipart_threshold of 100 KiB.
  # NOTE(review): presumably set below the file size so the multipart path is
  # exercised -- confirm against the oss2 documentation.
  oss2.resumable_upload(
      bucket,
      'remote-multipart.txt',
      filename,
      multipart_threshold=100 * 1024,
  )
 
  """ 分片上传 """
 
 
  # Manual multipart upload of the local file under its own object key:
  # pick a part size, open an upload session, send the file part by part,
  # then complete the session with the collected part descriptors.
  total_size = os.path.getsize(filename)
  part_size = oss2.determine_part_size(total_size, preferred_size=128 * 1024)

  key = 'remote-multipart2.txt'
  upload_id = bucket.init_multipart_upload(key).upload_id

  with open(filename, 'rb') as fileobj:
      parts = []
      part_number = 1  # numbering starts at 1 and grows by one per part
      offset = 0
      try:
          while offset < total_size:
              # The last part may be shorter than part_size.
              size_to_upload = min(part_size, total_size - offset)
              result = bucket.upload_part(
                  key, upload_id, part_number,
                  oss2.SizedFileAdapter(fileobj, size_to_upload))
              parts.append(oss2.models.PartInfo(
                  part_number, result.etag,
                  size=size_to_upload, part_crc=result.crc))

              offset += size_to_upload
              part_number += 1
      except Exception:
          # A failed part upload would otherwise leave the session open with
          # its already-uploaded parts; abort it, then surface the error.
          bucket.abort_multipart_upload(key, upload_id)
          raise

      bucket.complete_multipart_upload(key, upload_id, parts)
 
  # Download the object stored under ``key`` and compare it byte-for-byte with
  # the local file. The local file is removed in ``finally`` so it is not left
  # behind when the download or the comparison fails.
  try:
      with open(filename, 'rb') as fileobj:
          assert bucket.get_object(key).read() == fileobj.read()
  finally:
      os.remove(filename)
 
  |