Code for Open Channel Energy Equation Based Algorithm
From GeoMod
Basic logic of open channel flow energy equation based algorithm:
Data for the project: Image:Umned3 2.txt
Code for class water10, open channel flow energy equation based algorithm to modeling the flood inundation:
from Numeric import *
from visual import *
from string import *
from random import uniform
from visual.graph import * # import graphing features
from water_tile import *
'''Some thoughts and codes borrowed from Dr. Urbano's raster_map class. '''
'''This file is based on open channel flow energy equation.'''
class water:
def __init__(self, rasterfile, imin=0, imax=-99999, jmin=0, jmax=-99999,preciptation = 0.00000353, time = 4300,t_step =0.2): #datarr, xmin, ymax, dx, nrows, ncols):
## '''a 'heavy rain' event is defined for this study as one that produces at least six inches in twelve hours or less
## at a single station, which is about the maximum amount of rain expected once every 100 years at a particular site
## in the Memphis CWA (Weather Bureau, 1961).
## 6 inches (152.399995 mm) /12 hr (12*3600 = 43,200 sec): 0.00353 mm/sc or 0.00000353 m/sc 0.2117 mm /minutes,
## 12.712 mm/hr '''
infile = open(rasterfile,"r")
ct = 0
line = infile.readline()
while line:
ct += 1
l = split(strip(line))
if ct == 1:
ncols = int(l[1])
if ct == 2:
nrows = int(l[1])
if ct == 3:
xmin = float(l[1])
if ct == 4:
ymin = float(l[1])
if ct == 5:
dx = float(l[1])
if ct == 7:
nodata = l[1]
if ct >= 7:
if ct == 7:
topo = zeros((nrows*ncols,), Float)
t_ct = -1
for i in l:
t_ct += 1
if i == nodata: #set no-data values to zero
topo[t_ct] = 0.0
else:
topo[t_ct] = float(i)-80 # standardization
line = infile.readline()
infile.close
ymax = ymin + (dx * (nrows-1)) # top left corner
'''datarr is a 2d array '''
self.imin = imin
self.imax = imax
self.jmin = jmin
self.jmax = jmax
self.xmin = xmin
self.ymax = ymax
self.dx = dx
self.nrows = nrows
self.ncols = ncols
self.topo = resize(topo, (nrows, ncols))
self.elev = zeros((nrows, ncols,), Float)
self.o_elev = zeros((nrows, ncols,), Float)
for i in range(0,ncols-1):
for j in range(0,ncols-1):
self.elev[i,j] = self.topo[i,j]
self.o_elev[i,j] = self.elev[i,j]
self.Q = zeros((nrows, ncols,), Float)
self.o_Q = zeros((nrows, ncols,), Float)
self.depth = zeros((nrows, ncols,), Float)
self.P = preciptation
self.t = time
self.t_step = t_step
def get_lowest(self,imin,imax,jmin,jmax):
'''To get the lowest point in topo[imin:imax,jmin:jmax] and return the # of rows and columns '''
min_v = self.topo[imin,jmin]
r = imin
c = jmin
for i in range(imin,imax):
for j in range(jmin,jmax):
if self.topo[i,j] < min_v:
min_v = self.topo[i,j]
r = i
c = j
return r,c,min_v
#### BASIC LOGIC OF OPEN CHANEEL FLOW ENERGY EQUATION ALGORITHM:
## Energy equation: y1 + v1**/(2g) = y2 + v2**/(2g) ==> if v1 =0, v2 = sqrt(2g(y1-y2)),
## the average velocity from y1 to y2: v= (0 + v2)/2 =v2/2
## the total time from y1 to y2: t= dx/v = 2dx/v2 , the ratio of runoff = 1/t = v2/2dx
## function d_N,d_S,d_W,d_E, and d_Q are defined to calculate water change based on energy equation
def d_N(self,i,j):
'''Determine the water communication with the adjacent cell on the North(i-1), positive: inflow; negative: outflow'''
d_N = 0
if i-1 >=self.imin+1: #Do not calculate the boundary cell
if self.o_elev[i-1,j] - self.o_elev[i,j]>0 and self.o_Q[i-1,j]>0: # No water or no elevation difference,no flow
d_N = sqrt(2*9.81* (self.o_elev[i-1,j]-self.o_elev[i,j]))/(2*self.dx)*self.o_Q[i-1,j] #inflow
if d_N > self.o_Q[i-1,j]: # the inflow can not exceed total amount availabe
d_N = self.o_Q[i-1,j]
if self.o_elev[i-1,j] - self.o_elev[i,j] < 0 and self.o_Q[i,j] >0: # elevation of water in adjacent cell lower, loose water
d_N = -sqrt(2*9.81* (self.o_elev[i,j]-self.o_elev[i-1,j]))/(2*self.dx)*self.o_Q[i,j] #outflow
if d_N < -self.o_Q[i,j]: # the outflow can not exceed total amount availabe
d_N = -self.o_Q[i,j]
return d_N
def d_S(self,i,j):
'''Determine the water communication with the adjacent cell on the South(i+1), positive: inflow; negative: outflow'''
d_S = 0
if i+1 <= self.imax-1: #Do not calculate the boundary cell
if self.o_elev[i+1,j] - self.o_elev[i,j]>0 and self.o_Q[i+1,j]>0: # No water or no elevation difference,no flow
d_S = sqrt(2*9.81* (self.o_elev[i+1,j]-self.o_elev[i,j]))/(2*self.dx)*self.o_Q[i+1,j] #inflow
if d_S > self.o_Q[i+1,j]: # the inflow can not exceed total amount availabe
d_S = self.o_Q[i+1,j]
if self.o_elev[i+1,j] - self.o_elev[i,j] < 0 and self.o_Q[i,j] >0: # elevation of water in adjacent cell lower, loose water
d_S = -sqrt(2*9.81* (self.o_elev[i,j]-self.o_elev[i+1,j]))/(2*self.dx)*self.o_Q[i,j] #outflow
if d_S < -self.o_Q[i,j]: # the outflow can not exceed total amount availabe
d_S = -self.o_Q[i,j]
return d_S
def d_W(self,i,j):
'''Determine the water communication with the adjacent cell on the West(j-1), positive: inflow; negative: outflow'''
d_W = 0
if j-1 >=self.jmin+1: #Do not calculate the boundary cell
if self.o_elev[i,j-1] - self.o_elev[i,j]>0 and self.o_Q[i,j-1]>0: # No water or no elevation difference,no flow
d_W = sqrt(2*9.81* (self.o_elev[i,j-1]-self.o_elev[i,j]))/(2*self.dx)*self.o_Q[i,j-1] #inflow
if d_W > self.o_Q[i,j-1]: # the inflow can not exceed total amount availabe
d_W = self.o_Q[i,j-1]
if self.o_elev[i,j-1] - self.o_elev[i,j] < 0 and self.o_Q[i,j] >0: # elevation of water in adjacent cell lower, loose water
d_W = -sqrt(2*9.81* (self.o_elev[i,j]-self.o_elev[i,j-1]))/(2*self.dx)*self.o_Q[i,j] #outflow
if d_W < -self.o_Q[i,j]: # the outflow can not exceed total amount availabe
d_W = -self.o_Q[i,j]
return d_W
def d_E(self,i,j):
'''Determine the water communication with the adjacent cell on the East(j-1), positive: inflow; negative: outflow'''
d_E = 0
if j+1 <= self.jmax-1: #Do not calculate the boundary cell
if self.o_elev[i,j+1] - self.o_elev[i,j]>0 and self.o_Q[i,j+1]>0: # No water or no elevation difference,no flow
d_E = sqrt(2*9.81* (self.o_elev[i,j+1]-self.o_elev[i,j]))/(2*self.dx)*self.o_Q[i,j+1] #inflow
if d_E > self.o_Q[i,j+1]: # the inflow can not exceed total amount availabe
d_E = self.o_Q[i,j+1]
if self.o_elev[i,j+1] - self.o_elev[i,j] < 0 and self.o_Q[i,j] >0: # elevation of water in adjacent cell lower, loose water
d_E = -sqrt(2*9.81* (self.o_elev[i,j]-self.o_elev[i,j+1]))/(2*self.dx)*self.o_Q[i,j] #outflow
if d_E < -self.o_Q[i,j]: #the outflow can not exceed total amount availabe
d_E = -self.o_Q[i,j]
return d_E
def d_Q(self,i,j):
'''calcuate the change of Q in every time step'''
d_Q = (self.d_N(i,j) + self.d_S(i,j) + self.d_W(i,j) + self.d_E(i,j))*self.t_step
if d_Q < -self.Q[i,j]: # outflow can not exceed available amount
d_Q = -self.Q[i,j]
d_Q += self.P*pow(self.dx,2)*self.t_step #plus constant preciptation
return d_Q
def draw_flood(self,scale = 1.0, center = 0):
''' draw the flood water in the specific area of [imin:imax, jmin:jmax]'''
#create graph display
gdisp = gdisplay(x=0, y=0, width=480, height=350,title = "Depth vs. Time - Energy Equation Based",
ytitle="depth(m)cyan:[r,c],orange:[r,c-1],red:[r,c-2])", xtitle="time(sc)")
Tcurve = gcurve(color=color.cyan) # a connected curve object
Tcurve2 = gcurve(color=color.orange)
Tcurve3 = gcurve(color=color.red)
self.scale = scale
default_val = -99999
if self.imax == default_val:
self.imax = self.nrows
if self.jmax == default_val:
self.jmax = self.ncols
tdata = self.depth[self.imin:self.imax,self.jmin:self.jmax]
#draw in topography
tmax = max(max(tdata)) #find the maximum of depth
tmin = min(min(tdata)) #find the minimum of depth
colval = 0
flood = []
time = 0
print self.topo[self.imin:self.imax, self.jmin:self.jmax]
r,c,l = self.get_lowest(self.imin,self.imax,self.jmin,self.jmax) #get the lowest point and its topo elevation in drawing area
print "Lowest point:"
print r,c,l
while time < self.t:
## if time > 1000:
## self.P=0
for i in range(self.imin,self.imax):
for j in range(self.jmin,self.jmax):
dQ = self.d_Q(i,j) #calculate water change
self.Q[i,j] = self.o_Q[i,j] +dQ #update water volume
tp = self.topo[i,j]
self.elev[i,j] = self.o_elev[i,j] + dQ/(2*pow(self.dx,2)) #calculate the water elevation
if self.elev[i,j] < tp: #make sure water elevation can not be lower than topo elevation
self.elev[i,j] = tp
self.depth[i,j] = self.elev[i,j] - self.topo[i,j] # calculate water depth
# water_tile is a class inheretant from box class
flood_water = water_tile((0,0,0),self.dx,self.depth[i,j]* scale,self.dx,(colval,0,0))
flood.append(flood_water)
flood[-1].x = self.xmin + j* self.dx
flood[-1].y = self.ymax - i* self.dx
flood[-1].z = self.elev[i,j]* scale
flood[-1].depth = self.elev[i,j]-self.topo[i,j]
if tmax < 1: # if max of depth less than 1, set color of cell random red
colval = uniform(0.6,1)
else:
colval = (flood[-1].z - tmin) / (tmax - tmin)
print ("Maximum depth:")
print tmax,time
flood[-1].depth = self.elev[i,j]-self.topo[i,j]
for i in range(self.imin,self.imax):
for j in range(self.jmin,self.jmax): #update o_elev and o_Q
self.o_elev[i,j] = self.elev[i,j]
self.o_Q[i,j] = self.Q[i,j]
if center == 1:
xc = self.xmin + (self.dx * ((self.jmin+self.jmax)/2))
yc = self.ymax - (self.dx * ((self.imin+self.imax)/2))
'''determine center'''
scene.center = vector(xc, yc, 0)
print("elev[r,c],z, and topo:"), self.elev[r,c], self.elev[r,c]/2.0 + self.topo[r,c], self.topo[r,c]
print "Depth[r,c]:", self.depth[r,c]
time = time + self.t_step
print"tiem=:",time
'''draw graph the depth of the lowest cell and two adjacent cells'''
Tcurve.plot(pos=(time, self.depth[r,c]))
Tcurve2.plot(pos=(time, self.depth[r,c-1]))
Tcurve3.plot(pos=(time, self.depth[r,c-2]))
rate(100000000))
Code for class water_tile:
from Numeric import *
from visual import *
from random import uniform
'''A inheretant class from box, includes property of depth '''
class water_tile(box):
def __init__(self,pos = (0,0,0),length = 1, width = 1, height = 1, color = (0,uniform(0,1),0)):
box.__init__(self, length=length, width=width , height=height, color=color)
self.depth = 0
Main program:
from visual import *
from topo import *
from water10 import *
#import topography
UM_map = topo("umned3_2.txt")
#UM_flood = water("umned3_2.txt",10,128,10,85)
UM_flood = water("umned3_2.txt",0,50,0,50,t_step =0.2)
#UM_map.draw_map(100, 138, 80,101, 10, 1)
UM_map.line_3d(0, 138, 0,101, 10, 1)
#UM_map.line_3d(0, 50, 0,50, 10)
#UM_map.draw_map(0, 138, 0,101, 10,1)
print UM_map.area()
print UM_map.rainfall(50)
##print "total preciptation:"
##print UM_flood.P *100*2000*pow(UM_flood.dx,2)
UM_flood.draw_flood(10, 1)
Code for class topo:
from Numeric import *
from string import *
from visual import *
'''A revised version of Dr. Urbano's raster_map class, add some functions. '''
class topo:
def __init__(self, rasterfile): #datarr, xmin, ymax, dx, nrows, ncols):
infile = open(rasterfile,"r")
ct = 0
line = infile.readline()
while line:
ct += 1
l = split(strip(line))
if ct == 1:
ncols = int(l[1])
if ct == 2:
nrows = int(l[1])
if ct == 3:
xmin = float(l[1])
if ct == 4:
ymin = float(l[1])
if ct == 5:
dx = float(l[1])
if ct == 7:
nodata = l[1]
if ct >= 7:
if ct == 7:
topo = zeros((nrows*ncols,), Float)
t_ct = -1
for i in l:
t_ct += 1
if i == nodata: #set no-data values to zero
topo[t_ct] = 0.0
else:
topo[t_ct] = float(i)-80 # standardization
line = infile.readline()
infile.close
ymax = ymin + (dx * (nrows-1)) # top left corner
'''datarr is a 2d array '''
self.xmin = xmin
self.ymax = ymax
self.dx = dx
self.nrows = nrows
self.ncols = ncols
self.data = resize(topo, (nrows, ncols))
def get_val(self, pos):
r, c = self.get_rc(pos)
return self.data[r,c]
def get_rc(self, pos):
r = int(((self.ymax - pos.y + self.dx/2.0) ) /self.dx )
c = int(((pos.x - self.xmin + self.dx/2.0) ) /self.dx)
return (r, c)
def get_lowest(self,imin,imax,jmin,jmax):
'''To get the lowest point in topo[imin:imax,jmin:jmax] and return the # of rows and columns '''
min_v = self.topo[imin,jmin]
r = imin
c = jmin
for i in range(imin,imax):
for j in range(jmin,jmax):
if self.topo[i,j] < min_v:
min_v = self.topo[i,j]
r = i
c = j
return r,c,min_v
def draw_map(self, imin=0, imax=-99999, jmin=0, jmax=-99999, scale = 1.0, center = 0):
''' draw a specific region of the raster map'''
self.scale = scale
default_val = -99999
if imax == default_val:
imax = self.nrows
if jmax == default_val:
jmax = self.ncols
print "drawing map "
print "imin, imax ", imin, imax
print "jmin, jmax ", jmin, jmax
tdata = self.data[imin:imax,jmin:jmax]
#draw in topography
tmax = max(max(tdata))
tmin = min(min(tdata))
for i in range(imin,imax):
for j in range(jmin,jmax):
ipos = self.xmin + j* self.dx
jpos = self.ymax - i* self.dx
kpos = self.data[i,j] #self.get_val(vector(ipos,jpos,0))
colval = (kpos - tmin) / (tmax - tmin)
box(pos=(ipos,jpos, 0.5*kpos*scale),
length = self.dx, height = self.dx, width = self.data[i,j]*scale,
color = (colval,colval,colval))
#center scene on area
#print self.xmin + jmin* self.dx, self.ymax - imax* self.dx
if center == 1:
xc = self.xmin + (self.dx * ((jmin+jmax)/2))
yc = self.ymax - (self.dx * ((imin+imax)/2))
print "center", xc, yc
scene.center = vector(xc, yc, 0)
def draw_boundary(self):
bound = curve(color=color.blue)
bound.append(pos=(self.xmin, self.ymax,self.data[0,0]))
bound.append(pos=(self.xmin+self.dx*self.ncols, self.ymax,self.data[0,0]))
bound.append(pos=(self.xmin+self.dx*self.ncols,
self.ymax-self.dx*self.nrows,self.data[0,0]))
bound.append(pos=(self.xmin,
self.ymax-self.dx*self.nrows,self.data[0,0]))
bound.append(pos=(self.xmin, self.ymax,self.data[0,0]))
def line_3d(self, imin=0, imax=-99999, jmin=0, jmax=-99999, scale=1.0, center=1):
''' draw a specific region of the raster map as a wire mesh
imin - minimum column value to be drawn
imax - maximum column value
jmin - miinimum row
jmax - maximum row
scale - to add a vertical exageration
center - to center scene on area drawn if center == 1'''
self.scale = scale
default_val = -99999
if imax == default_val:
imax = self.nrows
if jmax == default_val:
jmax = self.ncols
print
tdata = self.data[imin:imax,jmin:jmax]
self.imin=imin
self.imax=imax
self.jmin=jmin
self.jmax=jmax
for i in range(imin, imax):
lin = curve(color=(0,1,0))
for j in range (jmin, jmax):
ipos = self.xmin + j* self.dx
jpos = self.ymax - i* self.dx
kpos = self.data[i,j]
lin.append(pos=(ipos,jpos,kpos*scale))
#print lin.pos[-1]
for j in range (jmin, jmax):
lin = curve(color=(0,1,0))
for i in range(imin, imax):
ipos = self.xmin + j* self.dx
jpos = self.ymax - i* self.dx
kpos = self.data[i,j]
lin.append(pos=(ipos,jpos,kpos*scale))
#center of scene of area
#print self.xmin + jmin* self.dx, self.ymax - imax* self.dx
if center == 1:
xc = self.xmin + (self.dx * ((jmin+jmax)/2))
yc = self.ymax - (self.dx * ((imin+imax)/2))
print "center", xc, yc
scene.center = vector(xc, yc, 0)
def local_3x3(self, pos):
#find the local 3x3 matrix of topography around the cell centered at pos
r, c = self.get_rc(pos)
#print "r,c=", r, c
return self.data[r-1:r+2, c-1:c+2]
def area(self):
x = self.dx * self.ncols
y = self.dx * self.nrows
area = x * y
return area
def rainfall(self, time = 60, r_rate = 0.0002117):
'''r_rate: mm/minute, time: minutes'''
Q = r_rate * time * self.area()
return Q
def low_points(self,arr):
r1 = 0
c1 = 0
r2 = 0
c2 = 1
inter = 0
low_point1 = arr[0,0]
low_point2 = arr[0,1]
if low_point1 > low_point2:
inter = low_point1
low_point1 = low_point2
low_point2 = inter
c1 = 1
c2 = 0
for i in range(0,3):
for j in range(0,3):
if not(i == 1 and j == 1):
if arr[i,j] < low_point2:
if arr[i,j] < low_point1:
low_point2 = low_point1
r2 = r1
c2 = c1
low_point1 = arr[i,j]
r1 = i
c1 = j
if arr[i,j] > low_point1:
low_point2 = arr[i,j]
r2 = i
c2 = j
print("results"),r1,c1,low_point1,r2,c2,low_point2
return r1,c1,low_point1,r2,c2,low_point2
def slope(self, i1, j1, i2, j2):
slope = (self.data[i1,j1] - self.data[i2,j2])/self.dx
return slope

