"""
Mask to Graph Utilities (mask_to_graph.py)
==========================================
Functions for converting a binary channel mask to a graphical representation.
"""
from loguru import logger
import cv2
import numpy as np
from skimage import morphology, measure
from rivgraph import walk
import rivgraph.ln_utils as lnu
import rivgraph.im_utils as imu
from rivgraph.ordered_set import OrderedSet
[docs]def skel_to_graph(Iskel):
"""
Resolves a skeleton into its consitutent links and nodes.
This function finds a starting point to walk along a skeleton, then begins
the walk. Rules are in place to ensure the network is fully resolved. One
of the key algorithms called by this function involves the identfication of
branchpoints in a way that eliminates unnecessary ones to create a parsimonious
network. Rules are baked in for how to walk along the skeleton in cases
where multiple branchpoints are clustered or there are multiple possible
links to walk along.
Note that some minor adjustments to the skeleton may be made in order to
reduce the complexity of the network. For example, in the case of a "+"
with a missing center pixel in the skeleton, this function will add the
pixel to the center to enable the use of a single branchpoint as opposed to
four.
The takeaway is that there is no guarantee that the input skeleton will
be perfectly preserved when network-ifying. One possible workaround, if
perfect preservation is required, is to resample the skeleton to double the
resolution.
Parameters
----------
Iskel : np.ndarray
Binary image of a skeleton.
Returns
-------
links : dict
Links of the network with four properties:
1. 'id' - a list of unique ids for each link in the network
2. 'idx' - a list containing the pixel indices within Iskel that defines the link. These are ordered.
3. 'conn' - a list of 2-element lists containing the node ids that the link is connected to.
4. 'n_networks' - the number of disconnected networks found in the skeleton
nodes : dict
Nodes of the network with four properties:
1. 'id' - a list of unique ids for each node in the network
2. 'idx' - the index within Iskel of the node location
3. 'conn' - a list of lists containing the link ids connected to this node
"""
def check_startpoint(spidx, Iskel):
"""
Returns True if a skeleton pixel's first neighbor is not a branchpoint
(i.e. the start pixel is valid for a walk), else returns False.
Parameters
----------
spidx : int
Index within Iskel of the point to check.
Iskel : np.array
Image of skeletonized mask.
Returns
-------
chk_sp : bool
True if the startpoint is valid; else False.
"""
neighs = walk.walkable_neighbors([spidx], Iskel)
isbp = walk.is_bp(neighs.pop(), Iskel)
if isbp == 0:
chk_sp = True
else:
chk_sp = False
return chk_sp
def find_starting_pixels(Iskel):
"""
Finds an endpoint pixel to begin walking to resolve network.
Parameters
----------
Iskel : np.array
Image of skeletonized mask.
Returns
-------
startpoints : list
Possible starting points for the walk.
"""
# Get skeleton connectivity
eps = imu.skel_endpoints(Iskel)
eps = set(np.ravel_multi_index(eps, Iskel.shape))
# Get one endpoint per connected component in network
rp, _ = imu.regionprops(Iskel, ['coords'])
startpoints = []
for ni in rp['coords']:
idcs = set(np.ravel_multi_index((ni[:,0], ni[:,1]), Iskel.shape))
# Find a valid endpoint for each connected component network
poss_id = idcs.intersection(eps)
if len(poss_id) > 0:
for pid in poss_id:
if check_startpoint(pid, Iskel) is True:
startpoints.append(pid)
break
return startpoints
# Pad the skeleton image to avoid edge problems when walking along skeleton
initial_dims = Iskel.shape
npad = 20
Iskel = np.pad(Iskel, npad, mode='constant', constant_values=0)
dims = Iskel.shape
# Find starting points of all the networks in Iskel
startpoints = find_starting_pixels(Iskel)
# Initialize topology storage vars
nodes = dict()
nodes['idx'] = OrderedSet([])
nodes['conn'] = []
links = dict()
links['idx'] = [[]]
links['conn'] = [[]]
links['id'] = OrderedSet([])
# Initialize first links emanting from all starting points
for i, sp in enumerate(startpoints):
links = lnu.link_updater(links, len(links['id']), sp, i)
nodes = lnu.node_updater(nodes, sp, i)
first_step = walk.walkable_neighbors(links['idx'][i], Iskel)
links = lnu.link_updater(links, i, first_step.pop())
links['n_networks'] = i+1
# Initialize set of links to be resolved
links2do = OrderedSet(links['id'])
while links2do:
linkid = next(iter(links2do))
linkidx = links['id'].index(linkid)
walking = 1
cantwalk = walk.cant_walk(links, linkidx, nodes, Iskel)
while walking:
# Get next possible steps
poss_steps = walk.walkable_neighbors(links['idx'][linkidx], Iskel)
# Now we have a few possible cases:
# 1) endpoint reached,
# 2) only one pixel to walk to: must check if it's a branchpoint so walk can terminate
# 3) two pixels to walk to: if neither is branchpoint, problem in skeleton. If one is branchpoint, walk to it and terminate link. If both are branchpoints, walk to the one that is 4-connected.
if len(poss_steps) == 0: # endpoint reached, update node, link connectivity
nodes = lnu.node_updater(nodes, links['idx'][linkidx][-1], linkid)
links = lnu.link_updater(links, linkid, conn=nodes['idx'].index(links['idx'][linkidx][-1]))
links2do.remove(linkid)
break # must break rather than set walking to 0 as we don't want to execute the rest of the code
if len(links['idx'][linkidx]) < 4:
poss_steps = list(poss_steps - cantwalk)
else:
poss_steps = list(poss_steps)
if len(poss_steps) == 0: # We have reached an emanating link, so delete the current one we're working on
links, nodes = walk.delete_link(linkid, links, nodes)
links2do.remove(linkid)
walking = 0
elif len(poss_steps) == 1: # Only one option, so we'll take the step
links = lnu.link_updater(links, linkid, poss_steps)
# But check if it's a branchpoint, and if so, stop marching along this link and resolve all the branchpoint links
if walk.is_bp(poss_steps[0], Iskel) == 1:
links, nodes, links2do = walk.handle_bp(linkid, poss_steps[0], nodes, links, links2do, Iskel)
links, nodes, links2do = walk.check_dup_links(linkid, links, nodes, links2do)
walking = 0 # on to next link
elif len(poss_steps) > 1: # Check to see if either/both/none are branchpoints
isbp = []
for n in poss_steps:
isbp.append(walk.is_bp(n, Iskel))
if sum(isbp) == 0:
# Compute 4-connected neighbors
isfourconn = []
for ps in poss_steps:
checkfour = links['idx'][linkidx][-1] - ps
if checkfour in [-1, 1, -dims[1], dims[1]]:
isfourconn.append(1)
else:
isfourconn.append(0)
# Compute noturn neighbors
noturn = walk.idcs_no_turnaround(links['idx'][linkidx][-2:], Iskel)
noturnidx = [n for n in noturn if n in poss_steps]
# If we can walk to a 4-connected pixel, we will
if sum(isfourconn) == 1:
links = lnu.link_updater(links, linkid, poss_steps[isfourconn.index(1)])
# If we can't walk to a 4-connected, try to walk in a direction that does not turn us around
elif len(noturnidx) == 1:
links = lnu.link_updater(links, linkid, noturnidx)
# Else, shit. You've found a critical flaw in the algorithm.
else:
logger.info('idx: {}, poss_steps: {}'.format(links['idx'][linkidx][-1], poss_steps))
raise RuntimeError('Ambiguous which step to take next :( Please raise issue at https://github.com/jonschwenk/RivGraph/issues.')
elif sum(isbp) == 1:
# If we've already accounted for this branchpoint, delete the link and halt
links = lnu.link_updater(links, linkid, poss_steps[isbp.index(1)])
links, nodes, links2do = walk.handle_bp(linkid, poss_steps[isbp.index(1)], nodes, links, links2do, Iskel)
links, nodes, links2do = walk.check_dup_links(linkid, links, nodes, links2do)
walking = 0
elif sum(isbp) > 1:
# In the case where we can walk to more than one branchpoint, choose the
# one that is 4-connected, as this is how we've designed branchpoint
# assignment for complete network resolution.
isfourconn = []
for ps in poss_steps:
checkfour = links['idx'][linkidx][-1] - ps
if checkfour in [-1, 1, -dims[1], dims[1]]:
isfourconn.append(1)
else:
isfourconn.append(0)
# Find poss_step(s) that is both 4-connected and a branchpoint
isbp_and_fourconn_idx = [i for i in range(0,len(isbp)) if isbp[i]==1 and isfourconn[i]==1]
# If we don't have exactly one, shit.
if len(isbp_and_fourconn_idx) != 1:
logger.info('idx: {}, poss_steps: {}'.format(links['idx'][linkidx][-1], poss_steps))
raise RuntimeError('There is not a unique branchpoint to step to.')
else:
links = lnu.link_updater(links, linkid, poss_steps[isbp_and_fourconn_idx[0]])
links, nodes, links2do = walk.handle_bp(linkid, poss_steps[isbp_and_fourconn_idx[0]], nodes, links, links2do, Iskel)
links, nodes, links2do = walk.check_dup_links(linkid, links, nodes, links2do)
walking = 0
# Put the link and node coordinates back into the unpadded
links, nodes = lnu.adjust_for_padding(links, nodes, npad, dims, initial_dims)
# Add indices to nodes--this probably should've been done in network extraction
# but since nodes have unique idx it was unnecessary.
nodes['id'] = OrderedSet(range(0,len(nodes['idx'])))
# Remove duplicate links if they exist; for some single-pixel links,
# duplicates are formed. Ideally the walking code should ensure that this
# doesn't happen, but for now removing duplicates suffices.
links, nodes = lnu.remove_duplicate_links(links, nodes)
return links, nodes
[docs]def skeletonize_mask(Imask):
"""
Skeletonizes an input binary image, typically a mask. Also performs some
skeleton simplification by (1) removing pixels that don't alter connectivity,
and (2) filling small skeleton holes and reskeletonizing.
Parameters
----------
Imask : np.array
Binary image to be skeletonized.
Returns
-------
Iskel : np.array
The skeletonization of Imask.
"""
# Create copy of mask to skeletonize
Iskel = np.array(Imask, copy=True, dtype='bool')
# Perform skeletonization
Iskel = morphology.skeletonize(Iskel)
# Simplify the skeleton (i.e. remove pixels that don't alter connectivity)
Iskel = simplify_skel(Iskel)
# Fill small skeleton holes, re-skeletonize, and re-simplify
Iskel = imu.fill_holes(Iskel, maxholesize=4)
Iskel = morphology.skeletonize(Iskel)
Iskel = simplify_skel(Iskel)
# Fill single pixel holes
Iskel = imu.fill_holes(Iskel, maxholesize=1)
return Iskel
[docs]def skeletonize_river_mask(I, es, padscale=2):
"""
Skeletonizes a binary mask of a river channel network. Differs from
skeletonize mask above by using knowledge of the exit sides of the river
with respect to the mask (I) to avoid edge effects of skeletonization by
mirroring the mask at its ends, then trimming it after processing. As with
skeletonize_mask, skeleton simplification is performed.
Parameters
----------
I : np.array
Binary river mask to skeletonize.
es : str
A two-character string (from N, E, S, or W) that denotes which sides
of the image the river intersects (upstream first) -- e.g. 'NS', 'EW',
'NW', etc.
padscale : int, optional
Pad multiplier that sets the size of the padding. Multplies the blob
size along the axis of the image that the blob intersect to determine
the padding distance. The default is 2.
Returns
-------
Iskel : np.array
The skeletonization of I.
"""
# Crop image
Ic, crop_pads = imu.crop_binary_im(I)
# Pad image (reflects channels at image edges)
Ip, pads = pad_river_im(Ic, es, pm=padscale)
# Skeletonize padded image
Iskel = morphology.skeletonize(Ip)
# Remove padding
Iskel = Iskel[pads[0]:Iskel.shape[0]-pads[1], pads[3]:Iskel.shape[1]-pads[2]]
# Add back what was cropped so skeleton image is original size
crop_pads_add = ((crop_pads[1], crop_pads[3]),(crop_pads[0], crop_pads[2]))
Iskel = np.pad(Iskel, crop_pads_add, mode='constant', constant_values=(0,0))
# Ensure skeleton is prepared for analysis by RivGraph
# Simplify the skeleton (i.e. remove pixels that don't alter connectivity)
Iskel = simplify_skel(Iskel)
# Fill small skeleton holes, re-skeletonize, and re-simplify
Iskel = imu.fill_holes(Iskel, maxholesize=4)
Iskel = morphology.skeletonize(Iskel)
Iskel = simplify_skel(Iskel)
# Fill single pixel holes
Iskel = imu.fill_holes(Iskel, maxholesize=1)
# The handling of edges can leave pieces of the skeleton stranded (i.e.
# disconnected from the main skeleton). Remove those here by keeping the
# largest blob.
Iskel = imu.largest_blobs(Iskel, nlargest=1, action='keep')
return Iskel
[docs]def simplify_skel(Iskel):
"""
This function iterates through all skeleton pixels pixels that have
connectivity > 2. It removes the pixel and checks if the
number of blobs has changed after removal. If so, the pixel is necessary to
maintain connectivity. Otherwise the pixel is not retained. It also adds
pixels to the centers of "+" cases, as this reduces the number
of branchpoints from 4 to 1.
Parameters
----------
Iskel : np.array
Image of the skeleton to simplify.
Returns
-------
Iskel : np.array
The simplified skeleton.
"""
Iskel = np.array(Iskel, dtype=np.uint8)
Ic = imu.im_connectivity(Iskel)
ypix, xpix = np.where(Ic > 2) # Get all pixels with connectivity > 2
for y, x in zip(ypix, xpix):
nv = imu.neighbor_vals(Iskel, x, y)
# Skip edge cases
if np.any(np.isnan(nv)) == True:
continue
# Create 3x3 array with center pixel removed
Inv = np.array([[nv[0], nv[1], nv[2]], [nv[3], 0,
nv[4]], [nv[5], nv[6], nv[7]]], dtype=bool)
# Check the connectivity after removing the pixel, set to zero if unchanged
Ilabeled = measure.label(Inv, background=0, connectivity=2)
if np.max(Ilabeled) == 1:
Iskel[y,x] = 0
# We simplify the network if we actually add pixels to the centers of
# "+" cases, so let's do that.
kern = np.array([[1, 10, 1], [10, 1, 10], [1, 10, 1]], dtype=np.uint8)
Iconv = cv2.filter2D(Iskel, -1, kern)
Iskel[Iconv==40] = 1
return Iskel
[docs]def pad_river_im(I, es, pm=2):
"""
Pads the edges of a binary river image by extending the ends of the river.
Different than mirrored padding in that the "mirror" here is just a
rectangle. This simplifies skeletonization and interpretation in cases
where the channel is complex near the boundaries.
Parameters
----------
I : np.array
Binary image to pad; typically a river channel network.
es : str
A two-character string (from N, E, S, or W) that denotes which sides
of the image the river intersects (upstream first) -- e.g. 'NS', 'EW',
'NW', etc.
pm : int, optional
Pad multiplier that sets the size of the padding. Multplies the blob
size along the axis of the image that the blob intersect to determine
the padding distance. The default is 2.
Returns
-------
Ip : np.array
The padded image..
pads : list
4 entry list containing the number of pixels padded on the [n, s, e, w]
edges of the image.
"""
Ip = I.copy() # so original array is not modified
pads = [0, 0, 0, 0] # saves the number of pixels padded to each [n,s,e,w] edge
if 'n' in es.lower():
rowidcs = np.where(Ip[0,:]==True)[0]
st = np.min(rowidcs)
en = np.max(rowidcs)
pads[0] = (en-st) * pm
# Make pad
addpad = np.zeros((pads[0], Ip.shape[1]), dtype=bool)
addpad[:,rowidcs] = True
# Add pad to image
Ip = np.concatenate((addpad, Ip))
if 's' in es.lower():
rowidcs = np.where(Ip[-1,:]==True)[0]
st = np.min(rowidcs)
en = np.max(rowidcs)
pads[1] = (en-st) * pm
# Make pad
addpad = np.zeros((pads[1], Ip.shape[1]), dtype=bool)
addpad[:,rowidcs] = True
# Add pad to image
Ip = np.concatenate((Ip, addpad))
if 'e' in es.lower():
colidcs = np.where(Ip[:,-1]==True)[0]
st = np.min(colidcs)
en = np.max(colidcs)
pads[2] = (en-st) * pm
# Make pad
addpad = np.zeros((Ip.shape[0], pads[2]), dtype=bool)
addpad[colidcs,:] = True
Ip = np.concatenate((Ip, addpad), axis=1)
if 'w' in es.lower():
colidcs = np.where(Ip[:,0]==True)[0]
st = np.min(colidcs)
en = np.max(colidcs)
pads[3] = (en-st) * pm
# Make pad
addpad = np.zeros((Ip.shape[0], pads[3]), dtype=bool)
addpad[colidcs,:] = True
Ip = np.concatenate((addpad, Ip), axis=1)
return Ip, pads