'''
Created on 12.03.2013
:author: Sebastian Illing
:contact: sebastian.illing@met.fu-berlin.de
Copyright (C) 2014 Sebastian Illing
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
'''
import multiprocessing
import abc
import os
from cdo import *
cdo = Cdo()
from string import lowercase, translate, maketrans
import shutil
#from evaluation_system.model.file import *
from evaluation_system.model.solr import SolrFindFiles
from tool_abstract import ToolAbstract, unwrap_self_f
class FileError(Exception):
    '''Base class of all errors raised while locating or preparing data files.'''


class NoFilesFoundError(FileError):
    '''Raised when a search does not yield any usable file.'''


class UnexpectedFileFormat(FileError):
    '''Raised when a file does not have the expected format.'''


class NotEnoughYearsInFile(FileError):
    '''Raised when a file does not cover the requested range of years.'''


class WrongDrsStruct(FileError):
    '''Raised when a path does not follow the expected DRS structure.'''


class LevelNotFound(FileError):
    '''Raised when the requested level cannot be selected from a file.'''
class FindFiles(ToolAbstract):
    '''
    Wrapper class to use solr_search with "python friendly" output --> lists or dicts

    Instance attributes set by the constructor:

    * ``tmpDir``      -- cache folder as returned by ``checkPath``; it is used as a
      plain string prefix for every temporary file name
    * ``output``      -- output folder as returned by ``checkPath``
    * ``observation`` -- path of a user supplied observation file ('' = none)
    * ``level``       -- level handed to ``cdo.sellevel`` (``None`` = no selection)

    Attributes created lazily:

    * ``curvilinearGrid`` -- set by :meth:`checkGrid`
    * ``mergedReanFile``  -- set by :meth:`getReanalysis`
    '''

    def __init__(self, tmpDir='/', observation='', level=None, output='/'):
        '''
        Constructor

        :param tmpDir: cache folder
        :param observation: path to a file with "special" observation data
        :param level: level to select from every file, None to keep all levels
        :param output: output folder
        '''
        self.tmpDir = self.checkPath(tmpDir)
        self.output = self.checkPath(output)
        self.observation = observation
        self.level = level
        super(FindFiles, self).__init__(output_tmp=tmpDir, output_dir=output)

    def _findNetcdfFiles(self, echo=False, **facets):
        '''
        Run one solr search and keep only the results ending in ".nc".

        :param echo: print every search result (also the non-netCDF ones)
        :param facets: search facets passed on to SolrFindFiles.search
        :return: list of file paths as strings
        '''
        found = []
        for fn in SolrFindFiles.search(**facets):
            path = str(fn)
            if echo:
                print(path)
            if path.split('.')[-1] == 'nc':
                found.append(path)
        return found

    def getFiles(self, year, fileType, model, variable, time_frequency='mon', product='*', ensemblemembers='*', institute='*', exp_prefix='d*', maxleadtime=10, minLeadtime=1):
        '''
        Method to get model files with solr_search.

        :param year: decadal starting year
        :param fileType: baseline1, cmip5, historical or...
        :param model: model name i.e. MPI-ESM-LR
        :param variable: CMOR variable
        :param time_frequency: monthly, yearly, daily and so on
        :param product: product facet of the search
        :param ensemblemembers: '*' for all members or a list of member names;
            for every name only the first matching file is kept
        :param institute: institute facet of the search
        :param exp_prefix: prefix of the experiment name; the search uses
            exp_prefix + year and, if the prefix has no wildcard, falls back
            to the bare prefix
        :param maxleadtime: last lead year that has to be in the files
        :param minLeadtime: currently ignored, always treated as 1
        :return: list with all ensemblemembers members found
        :raises NoFilesFoundError: if no netCDF file could be found
        :raises NotEnoughYearsInFile: if a file does not cover the lead years
        '''
        # TODO: BUGFIX for minLeadyear -- until then the argument is overridden on purpose
        minLeadtime = 1

        decStr = exp_prefix + str(year)
        facets = dict(latest_version=True, product=product, institute=institute,
                      variable=variable, time_frequency=time_frequency,
                      model=model, project=fileType.lower())
        notFoundMsg = "Couldn't find files for %s in %s %s %s experiment: %s" % (variable, fileType, model, product, year)

        tmpList = self._findNetcdfFiles(experiment=decStr, **facets)
        if not tmpList:
            # Nothing found: wait 5 seconds and ask again, this time echoing the results.
            import time
            time.sleep(5)
            tmpList = self._findNetcdfFiles(echo=True, experiment=decStr, **facets)
        if not tmpList:
            if '*' in exp_prefix:
                raise NoFilesFoundError(notFoundMsg)
            # OK we can't find files, now try one last time using only the
            # exp_prefix, i.e. "historical"
            tmpList = self._findNetcdfFiles(experiment=exp_prefix, **facets)
            if not tmpList:
                # OK, there are no files...
                raise NoFilesFoundError(notFoundMsg)

        # select only wanted ensemblemembers (an empty list means "no restriction")
        if isinstance(ensemblemembers, list) and ensemblemembers and ensemblemembers[0] != '*':
            ensList = []
            for ens in ensemblemembers:
                matches = [f for f in tmpList if ens in f]
                if matches:
                    ensList.append(matches[0])
            tmpList = ensList

        firstYear = str(year + minLeadtime)
        lastYear = str(year + maxleadtime)
        output = []
        for fn in tmpList:
            yearList = cdo.showyear(input=fn)[0].split()
            if firstYear not in yearList or lastYear not in yearList:
                print(year)
                raise NotEnoughYearsInFile("1Not enough years in %s %s %s for starting year %s" % (fileType, model, product, year))
            if len(yearList) > maxleadtime:
                # more years than needed: cut out the lead years into a temporary file
                selStr = ','.join(map(str, range(year + minLeadtime, year + 1 + maxleadtime)))
                fileName = fn.split('/')[-1]
                output.append(cdo.selyear(selStr, input=fn, output=self.tmpDir + fileName + '_' + firstYear + '-' + lastYear))
            else:
                output.append(fn)
            if len(cdo.showyear(input=output[-1])[0].split()) < maxleadtime - minLeadtime:
                raise NotEnoughYearsInFile("2Not enough years in %s %s %s for starting year %s" % (fileType, model, product, year))

        if not output:
            raise NoFilesFoundError("Couldn't find files for %s in %s %s %s for starting year %s" % (variable, fileType, model, product, year))

        # check for curvilinear grid
        # NOTE(review): the result of the first check is remembered on the instance,
        # so once a regular grid was seen later calls are not checked again.
        if getattr(self, 'curvilinearGrid', True):
            output = self.checkGrid(output, model)

        # user wants to select levels
        if self.level is not None:
            return self.selectLevel(output)
        return output

    def getReanalysis(self, year, fileType, experiment, variable, filePath='', time_frequency='mon', maxLeadtime=10, observation_ensemble='*', minLeadtime=1):
        '''
        Wrapper method to find reanalysis file with solr_search.

        :param year: startyear
        :param fileType: reanalysis or observation (not used by this method)
        :param experiment: i.e. NCEP, HadCrut or MERRA
        :param variable: CMOR Variable
        :param filePath: not used by this method
        :param time_frequency: monthly, yearly, daily and so on
        :param maxLeadtime: last lead year that has to be available
        :param observation_ensemble: ensemble facet used for reanalysis searches
        :param minLeadtime: currently ignored, always treated as 1
        :return: "decadal" file with observations
        :raises NoFilesFoundError: if the search yields no files
        :raises NotEnoughYearsInFile: if the lead years are not covered
        '''
        # TODO: BUGFIX for minLeadyear -- until then the argument is overridden on purpose
        minLeadtime = 1

        if experiment == 'HadCrut' and variable == 'tas':
            return self.getObsFiles(variable, year, maxLeadtime=maxLeadtime)
        # to use your own reanalysis data
        if os.path.isfile(self.observation):
            return self.getObsFiles(variable, year, maxLeadtime=maxLeadtime, minLeadtime=minLeadtime)

        # NOTE(review): the merged anomaly file is cached on the instance without
        # regard to experiment/variable -- confirm one instance is never reused
        # for a different reanalysis.
        if not hasattr(self, 'mergedReanFile'):
            notFoundMsg = "Couldn't find files for %s in %s" % (variable, experiment)
            # Observation or reanalysis?
            facet = SolrFindFiles.facets(facets='data_type', experiment=experiment, variable=variable,
                                         time_frequency=time_frequency)
            try:
                isReanalysis = facet['data_type'][0] == 'reanalysis'
            except IndexError:
                raise NoFilesFoundError(notFoundMsg)
            if isReanalysis:
                searchList = SolrFindFiles.search(data_type=['reanalysis', 'observations'], experiment=experiment, variable=variable,
                                                  time_frequency=time_frequency, ensemble=observation_ensemble)
            else:
                searchList = SolrFindFiles.search(data_type=['reanalysis', 'observations'], experiment=experiment, variable=variable,
                                                  time_frequency=time_frequency, data_structure='grid')

            reanFiles = []
            for fn in searchList:
                path = str(fn)
                reanFiles.append(path)
                # if more than one year in file we break the loop and expect it
                # to be an observations file
                if len(cdo.showyear(input=path)[0].split()) > 1:
                    break
            if not reanFiles:
                raise NoFilesFoundError(notFoundMsg)

            # anomalies = merged time series minus its time mean
            mergedFile = cdo.mergetime(input=' '.join(reanFiles), output=self.tmpDir + 'mergedREAN_YEARMEAN')
            tmpMean = cdo.timmean(input=mergedFile)
            merged = cdo.sub(input=' '.join([mergedFile, tmpMean]), output=self.tmpDir + 'reananomalies.nc')
            if self.level is not None:
                merged = self._selectLevel(merged)
            self.mergedReanFile = merged

        yearList = cdo.showyear(input=self.mergedReanFile)[0].split()
        firstYear = str(year + minLeadtime)
        lastYear = str(year + maxLeadtime)
        if firstYear not in yearList or lastYear not in yearList:
            raise NotEnoughYearsInFile("%s-%s are not part of %s reanalysis" % (year + minLeadtime, year + maxLeadtime, experiment))

        # create tmp decadal file
        fileStr = ','.join(map(str, range(year + minLeadtime, year + maxLeadtime + 1)))
        return cdo.selyear(fileStr, input=self.mergedReanFile,
                           output=self.tmpDir + 'reanalysis_' + experiment + firstYear + '-' + lastYear + '.nc')

    def getObsFiles(self, variable, year, maxLeadtime=10, minLeadtime=1):
        '''
        Get the observation files from an specified folder

        :param variable: not used by this method
        :param year: start year of decadal
        :param maxLeadtime: last lead year to select
        :param minLeadtime: first lead year to select
        :return: tmp file with maxLeadtime years of observation
        :raises NoFilesFoundError: if self.observation is not an existing file
        :raises FileError: if the first or last lead year is missing in the file
        '''
        if not os.path.isfile(self.observation):
            raise NoFilesFoundError('%s does not exist.' % (self.observation))

        yearList = cdo.showyear(input=self.observation)[0].split()
        for wanted in (year + minLeadtime, year + maxLeadtime):
            if str(wanted) not in yearList:
                raise FileError('Can\'t find data for year %s in observational data! \n%s' % (wanted, self.observation))

        # create tmp decadal file
        fileStr = ','.join(map(str, range(year + minLeadtime, year + maxLeadtime + 1)))
        tmpFile = cdo.selyear(fileStr, input=self.observation,
                              output=self.tmpDir + self.getFilename(self.observation) + '_' + str(year + minLeadtime) + '-' + str(year + maxLeadtime))
        if self.level is not None:
            return self._selectLevel(tmpFile)
        return tmpFile

    def checkGrid(self, fList, model):
        '''
        Checks if the file has a curvlinear grid. And remaps to lonlat grid after

        Only the first file is inspected; the result is stored in
        ``self.curvilinearGrid``.

        :param fList: list of files
        :param model: model name, handed on to the remapping step
        :return: fList itself for non curvilinear grids, otherwise a list of
            remapped files
        '''
        if not fList:
            return fList

        gridInfo = cdo.griddes(input=fList[0])
        # NOTE(review): relies on fixed line positions in the griddes output
        # (3 = grid type, 11/12 = presumably x/y size) -- confirm against cdo.
        if gridInfo[3].find('curvilinear') == -1:
            self.curvilinearGrid = False
            return fList

        self.curvilinearGrid = True
        lon = self.__str2int(gridInfo[11])
        lat = self.__str2int(gridInfo[12])
        # single process, because multiprocessing caused memory problems
        return [self._ceckGrid(fn, model, lon, lat) for fn in fList]

    def _ceckGrid(self, f, model, lon, lat):
        '''
        Remap one file bilinearly to a regular lon x lat grid.

        :param f: input file
        :param model: model name
        :param lon: number of longitudes of the target grid
        :param lat: number of latitudes of the target grid
        :return: remapped file
        '''
        # NOTE(review): the source indentation was lost; the index box selection
        # is assumed to apply to MPI-ESM models only -- confirm.
        if model.find('MPI-ESM') != -1:
            lon = lon - 1
            sel_str = '2,%s,1,%s' % (lon, lat)
            f = cdo.selindexbox(sel_str, input=f, output=self.tmpDir + self.getFilename(f) + '_sel_box')
        grid_str = 'r%sx%s' % (lon, lat)
        return cdo.remapbil(grid_str, input=f, output=self.tmpDir + self.getFilename(f) + '_lonlat')

    def getFilename(self, fn):
        '''
        Helper to extract a filename out of a path

        :deprecated: use extractFilename instead
        :param fn: path
        :return: filename
        '''
        return self.extractFilename(fn)

    def __str2int(self, text):
        '''
        Filter digits and convert str to int

        :param text: string containing digits
        :return: int built from all digits in text
        :raises ValueError: if text does not contain any digit
        '''
        return int(''.join(ch for ch in text if ch.isdigit()))

    def getAllFilesInFolder(self, folder):
        '''
        List the regular files directly inside a folder.

        :param folder: folder to list
        :return: list of paths (folder joined with file name)
        '''
        candidates = (os.path.join(folder, name) for name in os.listdir(folder))
        return [path for path in candidates if os.path.isfile(path)]

    def getAllFilesInSubfolders(self, folder):
        '''
        List all files below a folder, including every subfolder.

        :param folder: root folder of the walk
        :return: list of paths
        '''
        return [os.path.join(path, name)
                for path, _subdirs, files in os.walk(folder)
                for name in files]

    def _selectLevel(self, files):
        '''
        Select self.level from one file.

        :param files: input file
        :return: file containing only the selected level
        :raises LevelNotFound: if cdo fails to select the level
        '''
        try:
            return cdo.sellevel(self.level, input=files, output=self.tmpDir + self.getFilename(files) + '_' + str(self.level) + '.nc')
        except Exception:
            raise LevelNotFound('Level %s not found in %s' % (self.level, files))

    def selectLevel(self, fileList):
        '''
        Select a specific level from the files

        One worker process per file is used, at most 24.

        :param fileList: list of files
        :return: list of files containing only the selected level
        '''
        num_proc = len(fileList)
        if num_proc == 0:
            # Pool() refuses to start with zero processes
            return []

        # multi processing
        pool = multiprocessing.Pool(processes=min(num_proc, 24))
        try:
            poolArgs = [(self, fn, '_selectLevel') for fn in fileList]
            return pool.map(unwrap_self_f, poolArgs)
        finally:
            # map() has returned (or failed): stop the workers and reap them
            pool.terminate()
            pool.join()