最近在測試HCatalog,由于Hcatalog本身就是一個獨立JAR包,雖然它也可以運行service,但是其實這個service就是metastore thrift server,我們在寫基于Hcatalog的mapreduce job時候只要把hcatalog JAR包和對應的hive-site.xml文件加入libjars和HADOOP_CLASSPATH中就可以了。 不過在測試的時候還是遇到了一些問題,hive metastore server在運行了一段時間后會拋如下錯誤
?
2013-06-19 10:35:51,718 ERROR server.TThreadPoolServer (TThreadPoolServer.java:run(182)) - Error occurred during processing of message. javax.jdo.JDOFatalUserException: Persistence Manager has been closed at org.datanucleus.jdo.JDOPersistenceManager.assertIsOpen(JDOPersistenceManager.java:2124) at org.datanucleus.jdo.JDOPersistenceManager.currentTransaction(JDOPersistenceManager.java:315) at org.apache.hadoop.hive.metastore.ObjectStore.openTransaction(ObjectStore.java:294) at org.apache.hadoop.hive.metastore.ObjectStore.getTable(ObjectStore.java:732) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.hadoop.hive.metastore.RetryingRawStore.invoke(RetryingRawStore.java:111) at com.sun.proxy.$Proxy5.getTable(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:982) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table.getResult(ThriftHiveMetastore.java:5017) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table.getResult(ThriftHiveMetastore.java:5005) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:32) at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:34)
?
?
其中PersistenceManager負責控制一組持久化對象包括創建持久化對象和查詢對象,它是ObjectStore的一個實例變量,每個ObjectStore擁有一個pm,RawStore是metastore邏輯層和物理底層元數據庫(比如derby)交互的接口類,ObjectStore是RawStore的默認實現類。Hive Metastore Server啟動的時候會指定一個TProcessor,包裝了一個HMSHandler,內部有一個ThreadLocal<RawStore> threadLocalMS實例變量,每個thread維護一個RawStore
?
private final ThreadLocal<RawStore> threadLocalMS = new ThreadLocal<RawStore>() { @Override protected synchronized RawStore initialValue() { return null; } };
每一個從hive metastore client過來的請求都會從線程池中分配一個
WorkerProcess來處理,在HMSHandler中每一個方法都會通過getMS()獲取rawstore instance來做具體操作
?
?
public RawStore getMS() throws MetaException { RawStore ms = threadLocalMS.get(); if (ms == null) { ms = newRawStore(); threadLocalMS.set(ms); ms = threadLocalMS.get(); } return ms; }
看得出來RawStore是延遲加載,初始化后綁定到threadlocal變量中可以為以后復用
?
?
private RawStore newRawStore() throws MetaException { LOG.info(addPrefix("Opening raw store with implemenation class:" + rawStoreClassName)); Configuration conf = getConf(); return RetryingRawStore.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get()); }
RawStore使用了動態代理模式(繼承
InvocationHandler接口
),內部實現了invoke函數,通過method.invoke()執行真正的邏輯,這樣的好處是可以在
method.invoke()上下文中添加自己其他的邏輯,RetryingRawStore就是在通過捕捉invoke函數拋出的異常,來達到重試的效果。由于使用reflection機制,異常是wrap在
InvocationTargetException中的,
不過在hive 0.9中竟然在捕捉到
此異常后直接throw出來了,而不是retry,明顯不對啊。我對它修改了下,拿出wrap的target exception,判斷是不是instance of jdoexception的,再做相應的處理
?
?
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object ret = null; boolean gotNewConnectUrl = false; boolean reloadConf = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.METASTOREFORCERELOADCONF); boolean reloadConfOnJdoException = false; if (reloadConf) { updateConnectionURL(getConf(), null); } int retryCount = 0; Exception caughtException = null; while (true) { try { if (reloadConf || gotNewConnectUrl || reloadConfOnJdoException) { initMS(); } ret = method.invoke(base, args); break; } catch (javax.jdo.JDOException e) { caughtException = (javax.jdo.JDOException) e.getCause(); } catch (UndeclaredThrowableException e) { throw e.getCause(); } catch (InvocationTargetException e) { Throwable t = e.getTargetException(); if (t instanceof JDOException){ caughtException = (JDOException) e.getTargetException(); reloadConfOnJdoException = true; LOG.error("rawstore jdoexception:" + caughtException.toString()); }else { throw e.getCause(); } } if (retryCount >= retryLimit) { throw caughtException; } assert (retryInterval >= 0); retryCount++; LOG.error( String.format( "JDO datastore error. Retrying metastore command " + "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit)); Thread.sleep(retryInterval); // If we have a connection error, the JDO connection URL hook might // provide us with a new URL to access the datastore. String lastUrl = getConnectionURL(getConf()); gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl); } return ret; }
初始化RawStore有兩種方式,一種是在
RetryingRawStore的構造函數中調用"
this.base = (RawStore) ReflectionUtils.newInstance(rawStoreClass, conf);
" ?因為ObjectStore實現了Configurable,在newInstance方法中主動調用里面的setConf(conf)方法初始化RawStore,還有一種情況是在捕捉到異常后retry,也會調用
base.setConf(getConf());
?
?
private void initMS() { base.setConf(getConf()); }
?
ObjectStore的setConf方法中,先將PersistenceManagerFactory鎖住,pm close掉,設置成NULL,再初始化pm
?
public void setConf(Configuration conf) { // Although an instance of ObjectStore is accessed by one thread, there may // be many threads with ObjectStore instances. So the static variables // pmf and prop need to be protected with locks. pmfPropLock.lock(); try { isInitialized = false; hiveConf = conf; Properties propsFromConf = getDataSourceProps(conf); boolean propsChanged = !propsFromConf.equals(prop); if (propsChanged) { pmf = null; prop = null; } assert(!isActiveTransaction()); shutdown(); // Always want to re-create pm as we don't know if it were created by the // most recent instance of the pmf pm = null; openTrasactionCalls = 0; currentTransaction = null; transactionStatus = TXN_STATUS.NO_STATE; initialize(propsFromConf); if (!isInitialized) { throw new RuntimeException( "Unable to create persistence manager. Check dss.log for details"); } else { LOG.info("Initialized ObjectStore"); } } finally { pmfPropLock.unlock(); } }
?
private void initialize(Properties dsProps) { LOG.info("ObjectStore, initialize called"); prop = dsProps; pm = getPersistenceManager(); isInitialized = pm != null; return; }
回到一開始報錯的那段信息,怎么會Persistence Manager會被關閉呢,仔細排查后才發現是由于HCatalog使用HiveMetastoreClient用完后主動調用了close方法,而一般Hive里面內部不會調這個方法.
?
HiveMetaStoreClient.java
?
public void close() { isConnected = false; try { if (null != client) { client.shutdown(); } } catch (TException e) { LOG.error("Unable to shutdown local metastore client", e); } // Transport would have got closed via client.shutdown(), so we dont need this, but // just in case, we make this call. if ((transport != null) && transport.isOpen()) { transport.close(); } }
對應server端HMSHandler中的shutdown方法
@Override public void shutdown() { logInfo("Shutting down the object store..."); RawStore ms = threadLocalMS.get(); if (ms != null) { ms.shutdown(); ms = null; } logInfo("Metastore shutdown complete."); }
ObjectStore的shutdown方法
?
?
public void shutdown() { if (pm != null) { pm.close(); } }
?
?
?
我們看到shutdown方法里面只是把當前thread的ObjectStore拿出來后,做了一個ObjectStore shutdown方法,把pm關閉了。但是并沒有把ObjectStore銷毀掉,它還是存在于threadLocalMS中,下次還是會被拿出來,下一次這個thread服務于另外一個請求的時候又會被get出ObjectSture來,但是由于里面的pm已經close掉了所以肯定拋異常。正確的做法是應該加上threadLocalMS.remove()或者threadLocalMS.set(null),主動將其從ThreadLocalMap中刪除。
修改后的 shutdown方法
?
public void shutdown() { logInfo("Shutting down the object store..."); RawStore ms = threadLocalMS.get(); if (ms != null) { ms.shutdown(); ms = null; threadLocalMS.remove(); } logInfo("Metastore shutdown complete."); }
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
