1. A detailed look at the Replicator's replication code
def replicate(self, override_devices=None, override_partitions=None):
    """Run a replication pass"""
    self.start = time.time()
    self.suffix_count = 0
    self.suffix_sync = 0
    self.suffix_hash = 0
    self.replication_count = 0
    self.last_replication_count = -1
    self.partition_times = []
    if override_devices is None:
        override_devices = []
    if override_partitions is None:
        override_partitions = []
    # heartbeat periodically logs progress; its interval comes from the
    # config and defaults to 300 seconds
    stats = eventlet.spawn(self.heartbeat)
    # detect_lockups watches for a stalled replication pass
    lockup_detector = eventlet.spawn(self.detect_lockups)
    eventlet.sleep()  # Give spawns a cycle
    try:
        # the number of replication workers
        self.run_pool = GreenPool(size=self.concurrency)
        # Returns a sorted list of jobs (dictionaries) that specify the
        # partitions, nodes, etc to be synced.
        jobs = self.collect_jobs()
        for job in jobs:
            # honor the device override, if any
            if override_devices and job['device'] not in override_devices:
                continue
            # honor the partition override, if any
            if override_partitions and \
                    job['partition'] not in override_partitions:
                continue
            dev_path = join(self.devices_dir, job['device'])
            if self.mount_check and not ismount(dev_path):
                self.logger.warn(_('%s is not mounted'), job['device'])
                continue
            # abort the pass if the ring has changed
            if not self.check_ring():
                self.logger.info(_("Ring change detected. Aborting "
                                   "current replication pass."))
                return
            if job['delete']:
                self.run_pool.spawn(self.update_deleted, job)
            else:
                # the normal case: push updates to the other replicas
                self.run_pool.spawn(self.update, job)
        with Timeout(self.lockup_timeout):
            self.run_pool.waitall()
    except (Exception, Timeout):
        self.logger.exception(_("Exception in top-level replication loop"))
        self.kill_coros()
    finally:
        stats.kill()
        lockup_detector.kill()
        self.stats_line()
def collect_jobs(self):
    """
    Returns a sorted list of jobs (dictionaries) that specify the
    partitions, nodes, etc to be synced.
    """
    jobs = []
    ips = whataremyips()
    # replication_ip and replication_port are filled in when the ring is
    # loaded: self.object_ring = Ring(self.swift_dir, ring_name='object')
    for local_dev in [dev for dev in self.object_ring.devs
                      if dev and dev['replication_ip'] in ips and
                      dev['replication_port'] == self.port]:
        dev_path = join(self.devices_dir, local_dev['device'])
        obj_path = join(dev_path, 'objects')
        tmp_path = join(dev_path, 'tmp')
        if self.mount_check and not ismount(dev_path):
            self.logger.warn(_('%s is not mounted'), local_dev['device'])
            continue
        # Remove any file in tmp_path that was last modified before
        # time.time() - reclaim_age, e.g. files under /srv/1/node/sdb1/tmp
        unlink_older_than(tmp_path, time.time() - self.reclaim_age)
        if not os.path.exists(obj_path):
            try:
                mkdirs(obj_path)
            except Exception:
                self.logger.exception('ERROR creating %s' % obj_path)
            continue
        # obj_path holds one directory per partition, e.g.:
        # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls
        # 13069  133971  4799  58208  94238
        for partition in os.listdir(obj_path):
            try:
                job_path = join(obj_path, partition)
                if isfile(job_path):
                    # Clean up any (probably zero-byte) files where a
                    # partition should be.
                    self.logger.warning('Removing partition directory '
                                        'which was a file: %s', job_path)
                    os.remove(job_path)
                    continue
                # look up the devices that hold this partition
                part_nodes = \
                    self.object_ring.get_part_nodes(int(partition))
                # nodes is the replica - 1 nodes other than this machine
                nodes = [node for node in part_nodes
                         if node['id'] != local_dev['id']]
                # one job per partition directory, so len(jobs) is bounded
                # by the number of partitions in _replica2part2dev that
                # map to this device's id
                jobs.append(
                    dict(path=job_path,
                         device=local_dev['device'],
                         nodes=nodes,
                         # len(nodes) > len(part_nodes) - 1 means this
                         # node is no longer one of the partition's
                         # assigned devices, e.g. after the device was
                         # removed from the ring
                         delete=len(nodes) > len(part_nodes) - 1,
                         partition=partition))
            except (ValueError, OSError):
                continue
    # shuffle so partitions are not always visited in the same order
    random.shuffle(jobs)
    if self.handoffs_first:
        # Move the handoff parts to the front of the list
        jobs.sort(key=lambda job: not job['delete'])
    self.job_count = len(jobs)
    return jobs
In the second-level for loop, os.listdir(obj_path) lists all partitions under the objects directory. When an object is created, a directory named after the partition the object maps to is created under objects; inside it a directory named after the last three characters of the object's hash (the suffix); inside that a directory named after the object's full hash; and the object itself is stored there as a file whose name is its upload timestamp with a .data extension. From consistent hashing we know that with virtual nodes each device corresponds to several partitions, so if a device is assigned n partitions, the number of subdirectories under obj_path will be <= n, because not every stored object necessarily maps to a partition held by this device. The loop first checks whether an entry under obj_path is a plain file and removes it if so; otherwise it takes the partition number, looks up the devices that back that partition (three for a three-replica ring), keeps in nodes those whose id differs from the local device's id, and appends nodes, path, and the other fields to jobs. Finally the order of jobs is shuffled, the handoff jobs are moved to the front of the queue, and jobs is returned. As a concrete illustration of the on-disk layout described above, the following sketch (all values hypothetical) builds the path of one .data file:
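    from hashlib import md5
    from os.path import join

    # all values below are made up, just to show the layout
    devices_dir = '/srv/1/node'
    device = 'sdb1'
    partition = '94238'
    name = '/AUTH_test/container/object'
    timestamp = '1404639464.77952'          # upload time of the object

    # real Swift also mixes a per-cluster prefix/suffix into this hash
    obj_hash = md5(name.encode()).hexdigest()
    suffix = obj_hash[-3:]                  # last three hex characters

    data_file = join(devices_dir, device, 'objects', partition,
                     suffix, obj_hash, timestamp + '.data')
    print(data_file)
    # /srv/1/node/sdb1/objects/94238/<suffix>/<hash>/1404639464.77952.data

Back in replicate, consider first the case where job['delete'] is False: update is spawned. Here is its implementation: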
def update(self, job):
    """
    High-level method that replicates a single partition.

    :param job: a dict containing info about the partition to be
                replicated
    """
    self.replication_count += 1
    self.logger.increment('partition.update.count.%s' % (job['device'],))
    begin = time.time()
    try:
        # get_hashes loads the hashes from hashes.pkl and refreshes them.
        # job['path'] is join(obj_path, partition); local_hash is the
        # deserialized content of hashes.pkl and hashed is the number of
        # suffix dirs that were rehashed
        hashed, local_hash = tpool_reraise(
            get_hashes, job['path'],
            do_listdir=(self.replication_count % 10) == 0,
            reclaim_age=self.reclaim_age)
        self.suffix_hash += hashed
        self.logger.update_stats('suffix.hashes', hashed)
        attempts_left = len(job['nodes'])
        # job['nodes'] excludes the local node, and
        # get_more_nodes(int(job['partition'])) yields every node outside
        # the partition's primary set, so nodes iterates over all nodes
        # except this one
        nodes = itertools.chain(
            job['nodes'],
            self.object_ring.get_more_nodes(int(job['partition'])))
        # with 3 replicas, attempts_left starts at 2
        while attempts_left > 0:
            # If this throws StopIteration it will be caught way below
            node = next(nodes)
            attempts_left -= 1
            try:
                with Timeout(self.http_timeout):
                    # REPLICATE maps to the REPLICATE method in the
                    # object server
                    resp = http_connect(
                        node['replication_ip'], node['replication_port'],
                        node['device'], job['partition'], 'REPLICATE',
                        '', headers=self.headers).getresponse()
                    if resp.status == HTTP_INSUFFICIENT_STORAGE:
                        self.logger.error(_('%(ip)s/%(device)s responded'
                                            ' as unmounted'), node)
                        attempts_left += 1
                        continue
                    if resp.status != HTTP_OK:
                        self.logger.error(_("Invalid response %(resp)s "
                                            "from %(ip)s"),
                                          {'resp': resp.status,
                                           'ip': node['replication_ip']})
                        continue
                    # remote_hash is what the REPLICATE request returned
                    remote_hash = pickle.loads(resp.read())
                    del resp
                # collect suffixes whose local and remote hashes differ
                suffixes = [suffix for suffix in local_hash if
                            local_hash[suffix] !=
                            remote_hash.get(suffix, -1)]
                # nothing differs, so continue with the next node
                if not suffixes:
                    continue
                # rehash just the differing suffixes via get_hashes
                hashed, recalc_hash = tpool_reraise(
                    get_hashes,
                    job['path'], recalculate=suffixes,
                    reclaim_age=self.reclaim_age)
                self.logger.update_stats('suffix.hashes', hashed)
                local_hash = recalc_hash
                # e.g. local suffixes 123 321 122 vs remote 123 321 124:
                # 122 is the one that changed
                suffixes = [suffix for suffix in local_hash if
                            local_hash[suffix] !=
                            remote_hash.get(suffix, -1)]
                # push the differing suffixes to that node; replication
                # is push-based, so the local data is what gets sent
                self.sync(node, job, suffixes)
                with Timeout(self.http_timeout):
                    conn = http_connect(
                        node['replication_ip'], node['replication_port'],
                        node['device'], job['partition'], 'REPLICATE',
                        '/' + '-'.join(suffixes),
                        headers=self.headers)
                    conn.getresponse().read()
                self.suffix_sync += len(suffixes)
                self.logger.update_stats('suffix.syncs', len(suffixes))
            except (Exception, Timeout):
                self.logger.exception(_("Error syncing with node: %s") %
                                      node)
        # suffix count, used when logging stats
        self.suffix_count += len(local_hash)
    except (Exception, Timeout):
        self.logger.exception(_("Error syncing partition"))
    finally:
        self.partition_times.append(time.time() - begin)
        self.logger.timing_since('partition.update.timing', begin)
At the start of update, the replicator loads, from this device's hashes.pkl for the partition, the hash for each suffix, e.g. {'a83': '0db7b416c9808517a1bb2157af20b09b'}. The key is the last three characters of an object's hash (an md5 over the object's account/container/object path, not over its content); the value is an md5 computed over the names of all the .data files in the suffix directory's subdirectories (the directories named after each object's full hash), i.e. essentially a combined md5 of all the file names. A minimal sketch of what that looks like (the path is hypothetical, and the real get_hashes/hash_suffix logic also handles tombstones and reclaim_age):
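    import os
    import pickle
    from hashlib import md5

    # hypothetical partition directory
    partition_dir = '/srv/1/node/sdb1/objects/94238'

    # the cached suffix hashes are a plain pickled dict
    with open(os.path.join(partition_dir, 'hashes.pkl'), 'rb') as f:
        hashes = pickle.load(f)
    # e.g. {'a83': '0db7b416c9808517a1bb2157af20b09b'}

    # roughly how one suffix's value is derived: an md5 fed with the file
    # names found under every object-hash directory in the suffix dir
    def suffix_hash(suffix_dir):
        md5sum = md5()
        for hsh in sorted(os.listdir(suffix_dir)):
            for name in sorted(os.listdir(os.path.join(suffix_dir, hsh))):
                md5sum.update(name.encode())
        return md5sum.hexdigest()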
hashed, local_hash = tpool_reraise(
    get_hashes, job['path'],
    do_listdir=(self.replication_count % 10) == 0,
    reclaim_age=self.reclaim_age)
This tpool_reraise call dispatches get_hashes and passes the remaining arguments through to it:
def get_hashes(partition_dir, recalculate=None, do_listdir=False,
               reclaim_age=ONE_WEEK):
    """
    Get a list of hashes for the suffix dir.  do_listdir causes it to
    mistrust the hash cache for suffix existence at the (unexpectedly high)
    cost of a listdir.  reclaim_age is just passed on to hash_suffix.

    :param partition_dir: absolute path of partition to get hashes for
    :param recalculate: list of suffixes which should be recalculated when
                        got, e.g. recalculate=['a83'] ('310' below is such
                        a suffix directory:
                        root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects/94238# ls
                        310  hashes.pkl)
    :param do_listdir: force existence check for all hashes in the
                       partition
    :param reclaim_age: age at which to remove tombstones

    :returns: tuple of (number of suffix dirs hashed,
                        dictionary of hashes)
    """
Since recalculate is not passed here, only do_listdir=True forces the hashes of every suffix directory to be recomputed from the file names. File names are timestamps, and a changed timestamp means the file was updated, so the copy must be compared against the remote one to check whether both hold the same version; if not, the local version has to be pushed to the remote server.
attempts_left = len(job['nodes'])
# job['nodes'] excludes the local node, and
# get_more_nodes(int(job['partition'])) yields every node outside the
# partition's primary set, so chaining the two iterates over all nodes
# except this one
nodes = itertools.chain(
    job['nodes'],
    self.object_ring.get_more_nodes(int(job['partition'])))
In the snippet above, attempts_left is the number of this job's replica nodes minus the local node. nodes is then rebuilt: get_more_nodes returns an iterator over all nodes outside the partition's primary set, so chained after job['nodes'] it iterates over every node except the local one. The failover order this produces can be sketched with plain itertools (node names are made up):
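    import itertools

    primaries = ['node2', 'node3']        # job['nodes']: primaries minus self
    handoffs = iter(['node7', 'node9'])   # what get_more_nodes would yield
    nodes = itertools.chain(primaries, handoffs)

    # the two primaries are consumed first; a handoff is only reached when
    # a primary answers 507 (unmounted), which bumps attempts_left back up
    print(list(itertools.islice(nodes, 3)))  # ['node2', 'node3', 'node7']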
Next comes the while loop, which iterates attempts_left times:
resp = http_connect(
    node['replication_ip'], node['replication_port'],
    node['device'], job['partition'], 'REPLICATE',
    '', headers=self.headers).getresponse()
A REPLICATE request is sent to each node produced by the iterator; the partition's replica nodes come first, so they are tried before any handoff. On success the response body is read, yielding remote_hash, the remote device's suffix hashes for the same partition.
suffixes = [suffix for suffix in local_hash if
            local_hash[suffix] != remote_hash.get(suffix, -1)]
# nothing differs, so continue with the next node
if not suffixes:
    continue
This compares the two devices' hashes.pkl contents for the same partition, collecting keys whose values differ. If suffixes is empty, the local copy and the remote replica hold the same version, and the loop moves on to the next replica. If it is not empty, the local hashes.pkl is recomputed first, because another replica may have pushed new updates to this server in the meantime; the fresh local hashes are then compared again to get the final list of differing suffixes. The diff itself is an ordinary dict comparison (values made up, matching the 123/321/122 example in the comments above):
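    local_hash = {'123': 'aaa', '321': 'bbb', '122': 'ccc'}
    remote_hash = {'123': 'aaa', '321': 'bbb', '124': 'ddd'}
    suffixes = [s for s in local_hash
                if local_hash[s] != remote_hash.get(s, -1)]
    print(suffixes)  # ['122'] -- the only suffix the remote lacks or differs on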
Then the sync is run:
self.sync(node, job, suffixes)  # push the suffixes that changed
For the transfer itself the code currently uses rsync rather than ssync, but the hook for ssync is already in place; once ssync is stable it will replace rsync. (Stay tuned.)
def sync(self, node, job, suffixes):  # Just exists for doc anchor point
    """
    Synchronize local suffix directories from a partition with a remote
    node.

    :param node: the "dev" entry for the remote node to sync with
    :param job: information about the partition being synced
    :param suffixes: a list of suffixes which need to be pushed

    :returns: boolean indicating success or failure
    """
    # self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
    # with no sync_method in the config, the class's own rsync runs
    return self.sync_method(node, job, suffixes)

sync_method is resolved as follows; when the config does not set one, rsync is used:
self.sync_method = getattr(self, conf.get('sync_method') or 'rsync')
def rsync(self, node, job, suffixes):
    """
    Uses rsync to implement the sync method. This was the first
    sync method in Swift.
    """
    if not os.path.exists(job['path']):
        return False
    args = [
        'rsync',
        '--recursive',
        '--whole-file',
        '--human-readable',
        '--xattrs',
        '--itemize-changes',
        '--ignore-existing',
        '--timeout=%s' % self.rsync_io_timeout,
        '--contimeout=%s' % self.rsync_io_timeout,
        '--bwlimit=%s' % self.rsync_bwlimit,
    ]
    node_ip = rsync_ip(node['replication_ip'])
    # the rsync module embeds the target node's address
    if self.vm_test_mode:
        rsync_module = '%s::object%s' % (node_ip, node['replication_port'])
    else:
        rsync_module = '%s::object' % node_ip
    had_any = False
    for suffix in suffixes:
        spath = join(job['path'], suffix)
        if os.path.exists(spath):
            args.append(spath)
            had_any = True
    if not had_any:
        return False
    # args now holds everything the transfer needs, including the remote
    # device name and partition
    args.append(join(rsync_module, node['device'],
                     'objects', job['partition']))
    return self._rsync(args) == 0

rsync collects everything it needs into args and then runs _rsync. For a concrete picture, a fully assembled args might look like this (host, device, and partition are made up):
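    # hypothetical push: suffix 'a83' of partition 94238 to node 10.0.0.2
    args = [
        'rsync', '--recursive', '--whole-file', '--human-readable',
        '--xattrs', '--itemize-changes', '--ignore-existing',
        '--timeout=30', '--contimeout=30', '--bwlimit=0',
        '/srv/1/node/sdb1/objects/94238/a83',   # local suffix dir (spath)
        '10.0.0.2::object/sdb2/objects/94238',  # rsync_module + remote target
    ]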
def _rsync(self, args):
    """
    Execute the rsync binary to replicate a partition.

    :returns: return code of rsync process. 0 is successful
    """
    start_time = time.time()
    ret_val = None
    try:
        with Timeout(self.rsync_timeout):
            # this is the actual sync: a push to the remote node
            proc = subprocess.Popen(args,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            results = proc.stdout.read()
            ret_val = proc.wait()
    except Timeout:
        self.logger.error(_("Killing long-running rsync: %s"), str(args))
        proc.kill()
        return 1  # failure response code
    total_time = time.time() - start_time
    for result in results.split('\n'):
        if result == '':
            continue
        if result.startswith('cd+'):
            continue
        if not ret_val:
            self.logger.info(result)
        else:
            self.logger.error(result)
    if ret_val:
        error_line = _('Bad rsync return code: %(ret)d <- %(args)s') % \
            {'args': str(args), 'ret': ret_val}
        if self.rsync_error_log_line_length:
            error_line = error_line[:self.rsync_error_log_line_length]
        self.logger.error(error_line)
    elif results:
        self.logger.info(
            _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
            {'src': args[-2], 'dst': args[-1], 'time': total_time})
    else:
        self.logger.debug(
            _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
            {'src': args[-2], 'dst': args[-1], 'time': total_time})
    return ret_val

The snippet below is where the actual push is executed:
# this is the actual sync: a push to the remote node
proc = subprocess.Popen(args,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT)
When job['delete'] is True, the usual cause is that devices were added or removed and the ring was rebalanced, so the partition's replica set no longer contains this server's device id. For example, if partition 45678 was backed by device ids [1, 2, 3] before the rebalance, the local device id is 1, and after the rebalance the set is [4, 2, 3], then job['delete'] is True for that partition. Here is the implementation:
def update_deleted(self, job):
    """
    High-level method that replicates a single partition that doesn't
    belong on this node.

    :param job: a dict containing info about the partition to be
                replicated
    """
    # get the suffix directories under this partition
    def tpool_get_suffixes(path):
        return [suff for suff in os.listdir(path)
                if len(suff) == 3 and isdir(join(path, suff))]

    self.replication_count += 1
    self.logger.increment('partition.delete.count.%s' % (job['device'],))
    begin = time.time()
    try:
        responses = []
        suffixes = tpool.execute(tpool_get_suffixes, job['path'])
        if suffixes:
            for node in job['nodes']:
                # push this partition's data to its rightful node
                success = self.sync(node, job, suffixes)
                if success:
                    with Timeout(self.http_timeout):
                        conn = http_connect(
                            node['replication_ip'],
                            node['replication_port'],
                            node['device'], job['partition'],
                            'REPLICATE',
                            '/' + '-'.join(suffixes),
                            headers=self.headers)
                        conn.getresponse().read()
                responses.append(success)
        if self.handoff_delete:
            # delete handoff if we have had handoff_delete successes
            delete_handoff = len([resp for resp in responses if resp]) >= \
                self.handoff_delete
        else:
            # delete handoff if all syncs were successful
            delete_handoff = len(responses) == len(job['nodes']) and \
                all(responses)
        # remove the local partition dir once suffixes is empty or the
        # required number of remote nodes have responded successfully
        if not suffixes or delete_handoff:
            self.logger.info(_("Removing partition: %s"), job['path'])
            tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
    except (Exception, Timeout):
        self.logger.exception(_("Error syncing handoff partition"))
    finally:
        self.partition_times.append(time.time() - begin)
        self.logger.timing_since('partition.delete.timing', begin)
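The delete flag that routes a job to update_deleted is set in collect_jobs; it can be checked in isolation (the ids are made up, matching the rebalance example above):

    # hypothetical ids: partition 45678 was on [1, 2, 3]; after the
    # rebalance its replica set is [4, 2, 3] and the local device id is 1
    local_id = 1
    part_nodes = [{'id': 4}, {'id': 2}, {'id': 3}]
    nodes = [n for n in part_nodes if n['id'] != local_id]
    # local_id is no longer in the set, so nothing was filtered out:
    delete = len(nodes) > len(part_nodes) - 1
    print(delete)  # True -> handoff partition, update_deleted runs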
This completes the walk-through of the replicate operation. If anything here is misunderstood or off the mark, corrections are welcome. Thank you!