hadoop 配置文件处理
[toc]
Configuration 类
Configuration 作用
1.加载配置文件
2.可以加载多个配置文件
3.支持动态修改配置
4.快速保存配置文件
构造方法
// No-argument constructor.
public Configuration();
// Takes a loadDefaults flag -- presumably stored in the loadDefaults field that
// loadResources() checks before reading the default resources (constructor body not shown).
public Configuration(boolean loadDefaults);
// Builds a configuration from an existing one -- presumably a copy; the body is not shown here.
public Configuration(Configuration other);
主要成员变量
// Passed by getProps() to loadResources() as its "quiet" argument.
private boolean quietmode = true;
// Resources registered so far: addResourceObject() appends to it and loadResources() walks it.
private ArrayList<Resource> resources = new ArrayList<Resource>();
// Set of parameter names, emptied by reloadConfiguration().
// Presumably the keys marked <final> in a resource -- confirm in loadProperty().
private Set<String> finalParameters = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
// Lazily built property table: reset to null by reloadConfiguration(), rebuilt by getProps().
private Properties properties;
// Copied on top of the freshly loaded properties by getProps(); set() writes to it
// through getOverlay() (presumably a lazy accessor for this field).
private Properties overlay;
// NOTE(review): not referenced by the code shown here; presumably used to resolve resources by name.
private ClassLoader classLoader;
// Maps a property name to the array of source names recorded for it (written by set() and getProps()).
private Map<String, String[]> updatingResource;
// Holder for the current DeprecationContext; read with deprecationContext.get().
private static AtomicReference<DeprecationContext> deprecationContext ;
加载资源方法
// Overloads of addResource, one per way of identifying a resource: another Configuration,
// an InputStream (with or without a name), a Path, a resource name, or a URL.
// Only the signatures are listed here; the bodies are not shown.
public void addResource(Configuration conf)
public void addResource(InputStream in)
public void addResource(InputStream in, String name)
public void addResource(Path file)
public void addResource(String name)
public void addResource(URL url)
addResource 方法的具体实现
/**
 * Drops the parsed property table and empties the set of final parameters.
 * Because getProps() rebuilds the table whenever it is null, the next call to
 * it loads the resources again.
 */
public synchronized void reloadConfiguration() {
  this.properties = null;
  this.finalParameters.clear();
}
/**
 * Appends the given resource to the list of registered resources and then
 * invalidates the cached properties via reloadConfiguration().
 *
 * @param resource the resource to register
 */
private synchronized void addResourceObject(Resource resource) {
  this.resources.add(resource);
  this.reloadConfiguration();
}
set方法
作用: 设置一个键-值对(键已存在则修改,不存在则添加)
/**
 * Stores value under name in both the overlay and the loaded properties and
 * records where the value came from.
 *
 * If name is a deprecated key, the value is also written to every key returned
 * by handleDeprecation(). Otherwise it is also written to every alternative
 * name of the key.
 *
 * @param name   property name; must not be null, surrounding whitespace is trimmed
 * @param value  property value; must not be null
 * @param source origin recorded for the change; a default label is used when null
 */
public void set(String name, String value, String source) {
  Preconditions.checkArgument(name != null, "Property name must not be null");
  Preconditions.checkArgument(
      value != null, "The value of property " + name + " must not be null");
  name = name.trim();

  // With no deprecated keys registered, load the properties up front.
  if (deprecationContext.get().getDeprecatedKeyMap().isEmpty()) {
    getProps();
  }

  getOverlay().setProperty(name, value);
  getProps().setProperty(name, value);

  final String newSource;
  if (source == null) {
    newSource = "programatically";
  } else {
    newSource = source;
  }

  if (isDeprecated(name)) {
    // Deprecated key: write the value to every key handleDeprecation() returns.
    String[] targets = handleDeprecation(deprecationContext.get(), name);
    String altSource = "because " + name + " is deprecated";
    for (String target : targets) {
      getOverlay().setProperty(target, value);
      getProps().setProperty(target, value);
      updatingResource.put(target, new String[] {altSource});
    }
    return;
  }

  updatingResource.put(name, new String[] {newSource});
  String[] aliases = getAlternativeNames(name);
  if (aliases == null) {
    return;
  }
  for (String alias : aliases) {
    if (alias.equals(name)) {
      continue;
    }
    getOverlay().setProperty(alias, value);
    getProps().setProperty(alias, value);
    updatingResource.put(alias, new String[] {newSource});
  }
}
get方法
/**
 * Looks up the value of name after deprecation handling.
 *
 * Every key returned by handleDeprecation() is looked up and passed through
 * substituteVars(); the result for the last key is returned.
 *
 * @param name the property name
 * @return the substituted value for the last key, or null if there are no keys
 */
public String get(String name) {
  String[] keys = handleDeprecation(deprecationContext.get(), name);
  String resolved = null;
  for (int i = 0; i < keys.length; i++) {
    String raw = getProps().getProperty(keys[i]);
    resolved = substituteVars(raw);
  }
  return resolved;
}
其中 substituteVars() 方法会对传入的值做变量展开:当 value 中包含 ${key} 这种格式的变量时,该变量会被替换成 key 对应的属性值。
解析配置文件方法 getProps()
/**
 * Returns the property table, building it on first use.
 *
 * When the table is null, all resources are loaded into a fresh Properties
 * object and the overlay (if any) is copied on top. For each overlay key, the
 * source recorded before the load is put back into updatingResource.
 *
 * @return the loaded properties, never null
 */
protected synchronized Properties getProps() {
  if (properties != null) {
    return properties;
  }

  properties = new Properties();
  // Snapshot the recorded sources before loading the resources.
  Map<String, String[]> savedSources =
      new ConcurrentHashMap<String, String[]>(updatingResource);
  loadResources(properties, resources, quietmode);

  if (overlay == null) {
    return properties;
  }

  properties.putAll(overlay);
  for (Object rawKey : overlay.keySet()) {
    String key = (String) rawKey;
    String[] previousSource = savedSources.get(key);
    if (previousSource != null) {
      updatingResource.put(key, previousSource);
    }
  }
  return properties;
}
/**
 * Loads the default resources (when loadDefaults is set) followed by every
 * registered resource into the given property table.
 *
 * @param properties table that receives the loaded values
 * @param resources  registered resources; an entry is replaced in place when
 *                   loadResource() returns a non-null Resource for it
 * @param quiet      forwarded unchanged to loadResource()
 */
private void loadResources(Properties properties,
                           ArrayList<Resource> resources,
                           boolean quiet) {
  if (loadDefaults) {
    for (String defaultName : defaultResources) {
      loadResource(properties, new Resource(defaultName), quiet);
    }

    // hadoop-site.xml is loaded only when it can be resolved.
    boolean siteFilePresent = getResource("hadoop-site.xml") != null;
    if (siteFilePresent) {
      loadResource(properties, new Resource("hadoop-site.xml"), quiet);
    }
  }

  int index = 0;
  while (index < resources.size()) {
    Resource replacement = loadResource(properties, resources.get(index), quiet);
    if (replacement != null) {
      resources.set(index, replacement);
    }
    index++;
  }
}
/**
 * Parses one configuration resource and merges its properties into the given table.
 *
 * The object held by the wrapper may be a URL, a resource name (String), a Path
 * that is read through java.io.File, an InputStream, a Properties object, or an
 * already parsed DOM Element.
 *
 * @param properties table that receives the loaded key/value pairs
 * @param wrapper    the resource to load, together with its name
 * @param quiet      when true, a resource that yields no document makes the method
 *                   return null instead of throwing, and the "parsing File" debug
 *                   message is suppressed
 * @return a Resource wrapping the parsed Properties when the input was an
 *         InputStream; null in every other case
 * @throws RuntimeException if no document could be obtained and quiet is false, or
 *         wrapping any IOException, DOMException, SAXException or
 *         ParserConfigurationException raised while parsing
 */
private Resource loadResource(Properties properties, Resource wrapper, boolean quiet) {
String name = UNKNOWN_RESOURCE;
try {
Object resource = wrapper.getResource();
name = wrapper.getName();
DocumentBuilderFactory docBuilderFactory
= DocumentBuilderFactory.newInstance();
docBuilderFactory.setIgnoringComments(true);
docBuilderFactory.setNamespaceAware(true);
// XInclude support is best effort: a parser that rejects it is logged and parsing goes on.
try {
docBuilderFactory.setXIncludeAware(true);
} catch (UnsupportedOperationException e) {
LOG.error("Failed to set setXIncludeAware(true) for parser "
+ docBuilderFactory
+ ":" + e,
e);
}
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = null;
Element root = null;
boolean returnCachedProperties = false;
// Dispatch on the runtime type of the wrapped resource.
if (resource instanceof URL) {
doc = parse(builder, (URL)resource);
} else if (resource instanceof String) {
URL url = getResource((String)resource);
doc = parse(builder, url);
} else if (resource instanceof Path) {
File file = new File(((Path)resource).toUri().getPath())
.getAbsoluteFile();
// A missing file leaves doc null and is handled by the "not found" check below.
if (file.exists()) {
if (!quiet) {
LOG.debug("parsing File " + file);
}
doc = parse(builder, new BufferedInputStream(
new FileInputStream(file)), ((Path)resource).toString());
}
} else if (resource instanceof InputStream) {
doc = parse(builder, (InputStream) resource, null);
// Stream input: collect its properties separately so they can be returned as a
// Resource (presumably because the stream cannot be read a second time).
returnCachedProperties = true;
} else if (resource instanceof Properties) {
// NOTE(review): after this overlay both root and doc are still null, so control
// reaches the "not found" check below (null when quiet, exception otherwise) --
// confirm this is intended.
overlay(properties, (Properties)resource);
} else if (resource instanceof Element) {
root = (Element)resource;
}
if (root == null) {
if (doc == null) {
if (quiet) {
return null;
}
throw new RuntimeException(resource + " not found");
}
root = doc.getDocumentElement();
}
Properties toAddTo = properties;
if(returnCachedProperties) {
toAddTo = new Properties();
}
// A wrong top-level element is only logged; parsing continues regardless.
if (!"configuration".equals(root.getTagName()))
LOG.fatal("bad conf file: top-level element not <configuration>");
NodeList props = root.getChildNodes();
DeprecationContext deprecations = deprecationContext.get();
for (int i = 0; i < props.getLength(); i++) {
Node propNode = props.item(i);
if (!(propNode instanceof Element))
continue;
Element prop = (Element)propNode;
// A nested <configuration> element is loaded recursively as its own resource.
if ("configuration".equals(prop.getTagName())) {
loadResource(toAddTo, new Resource(prop, name), quiet);
continue;
}
// An unexpected element is only warned about; its children are still scanned below.
if (!"property".equals(prop.getTagName()))
LOG.warn("bad conf file: element not <property>");
NodeList fields = prop.getChildNodes();
String attr = null;
String value = null;
boolean finalParameter = false;
LinkedList<String> source = new LinkedList<String>();
// Collect the <name>, <value>, <final> and <source> children of this element.
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element))
continue;
Element field = (Element)fieldNode;
if ("name".equals(field.getTagName()) && field.hasChildNodes())
attr = StringInterner.weakIntern(
((Text)field.getFirstChild()).getData().trim());
if ("value".equals(field.getTagName()) && field.hasChildNodes())
value = StringInterner.weakIntern(
((Text)field.getFirstChild()).getData());
if ("final".equals(field.getTagName()) && field.hasChildNodes())
finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
if ("source".equals(field.getTagName()) && field.hasChildNodes())
source.add(StringInterner.weakIntern(
((Text)field.getFirstChild()).getData()));
}
// The name of the resource being parsed is appended as the last source entry.
source.add(name);
// Elements without a <name> are skipped.
if (attr != null) {
if (deprecations.getDeprecatedKeyMap().containsKey(attr)) {
// A deprecated name is stored under each of its replacement keys instead.
DeprecatedKeyInfo keyInfo =
deprecations.getDeprecatedKeyMap().get(attr);
keyInfo.clearAccessed();
for (String key:keyInfo.newKeys) {
loadProperty(toAddTo, name, key, value, finalParameter,
source.toArray(new String[source.size()]));
}
}
else {
loadProperty(toAddTo, name, attr, value, finalParameter,
source.toArray(new String[source.size()]));
}
}
}
// Stream input: merge the separately collected properties and return them as a Resource.
if (returnCachedProperties) {
overlay(properties, toAddTo);
return new Resource(toAddTo, name);
}
return null;
} catch (IOException e) {
LOG.fatal("error parsing conf " + name, e);
throw new RuntimeException(e);
} catch (DOMException e) {
LOG.fatal("error parsing conf " + name, e);
throw new RuntimeException(e);
} catch (SAXException e) {
LOG.fatal("error parsing conf " + name, e);
throw new RuntimeException(e);
} catch (ParserConfigurationException e) {
LOG.fatal("error parsing conf " + name , e);
throw new RuntimeException(e);
}
}
Configurable 接口
主要有两个方法 getConf(),setConf()
主要使用方法: 和 org.apache.hadoop.util.ReflectionUtils 的 newInstance() 配合使用。
/**
 * Creates an instance of theClass and passes conf to it through setConf().
 *
 * The constructor is looked up in CONSTRUCTOR_CACHE first; on a miss it is
 * resolved reflectively, made accessible and stored in the cache. It is
 * invoked without arguments.
 *
 * @param theClass the class to instantiate
 * @param conf     configuration handed to setConf() together with the new object
 * @return the newly created instance
 * @throws RuntimeException wrapping any exception raised during lookup or instantiation
 */
public static <T> T newInstance(Class<T> theClass, Configuration conf) {
  T instance;
  try {
    Constructor<T> ctor = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
    if (ctor == null) {
      ctor = theClass.getDeclaredConstructor(EMPTY_ARRAY);
      ctor.setAccessible(true);
      CONSTRUCTOR_CACHE.put(theClass, ctor);
    }
    instance = ctor.newInstance();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
  setConf(instance, conf);
  return instance;
}
/**
 * Passes conf to theObject. Does nothing when conf is null.
 *
 * If the object implements Configurable, its setConf() is called; setJobConf()
 * is then called for the object in either case.
 *
 * @param theObject the object to configure
 * @param conf      the configuration to apply; may be null
 */
public static void setConf(Object theObject, Configuration conf) {
  if (conf == null) {
    return;
  }
  if (theObject instanceof Configurable) {
    Configurable configurable = (Configurable) theObject;
    configurable.setConf(conf);
  }
  setJobConf(theObject, conf);
}
ConfServlet
主要作用: 处理来自页面的配置文件请求