naruhodo.core.KnowledgeCoreJa module
import networkx as nx
from naruhodo.utils.communication import Subprocess
from naruhodo.utils.misc import preprocessText
from naruhodo.backends.cabocha import CaboChunk, CabochaClient
from naruhodo.utils.dicts import MeaninglessDict, SubDict, ObjDict, ObjPostDict, ObjPassiveSubDict, SubPassiveObjDict, NEList, EntityTypeDict, ParallelDict
from naruhodo.core.DependencyCoreJa import DependencyCoreJa


# NOTE(review): this listing was recovered from a flattened rendering in which
# indentation was lost; the block nesting below is reconstructed from syntax.
# Confirm it against the upstream file before relying on it.
class KnowledgeCoreJa(DependencyCoreJa):
    """Analyze the input text and store the information into a knowledge structure graph (KSG).

    Each call to ``add`` sends one input to the CaboCha backend, then walks the
    resulting chunk tree from the leaves to the root, adding nodes and
    labelled, typed edges to ``self.G`` through the inherited ``_addNode`` /
    ``_addEdge`` helpers (defined outside this listing).
    """

    def __init__(self, autosub=False):
        """Initialize an analyzer for KSG.

        Args:
            autosub: When true, ``add`` additionally attaches a subject to
                every predicate chunk whose node has an empty ``'sub'``
                attribute (see the end of ``add``).
        """
        self.G = nx.DiGraph()
        self.autosub = autosub
        # NOTE(review): the string below describes ``self.G`` but is placed
        # after ``self.autosub``, so documentation generators attach it to
        # ``autosub`` instead of ``G``.
        """ Graph object of this analyzer. It is actually a networkx directed graph object(DiGraph), so you can apply all operations available to DiGraph object using networkx. """
        # One dict per entry of NEList.
        self.entityList = [dict() for x in range(len(NEList))]
        """ List of entities appeared during this analysis round. """
        self.proList = list()
        """ List of pronouns appeared during this analysis round. """
        self.pos = 0
        """ Current position of the analyzer. """
        # Long-lived handle used by ``add`` to query the parser.
        self.proc = Subprocess('cabocha -f1')
        """ Communicator to backend for KnowledgeAnalyzer. """

    def add(self, inp, pos):
        """Take in a string input and add it to the knowledge structure graph (KSG).

        Args:
            inp: Text handed to the backend via ``self.proc.query``.
            pos: Position of this input; stored in ``self.pos`` and forwarded
                to the CaboCha client.

        Returns:
            None. The graph ``self.G`` is updated in place.
        """
        self.pos = pos
        # Parallel-word pairs collected by _addEntity during this call.
        self.para = list()
        # Call backend for dependency parsing.
        cabo = CabochaClient()
        cabo.add(self.proc.query(inp), self.pos)
        pool = [cabo.root]
        plist = [cabo.root]
        self.vlist = dict()
        # Use BFS to get a list of nodes. Every discovered child is inserted at
        # the front of plist, so plist ends up in reverse BFS order with the
        # root last.
        while pool:
            pid = pool.pop(0)
            for cid in cabo.childrenList[pid]:
                pool.append(cid)
                plist.insert(0, cid)
        # Add nodes using plist (from leaves to root).
        for i in range(len(plist)):
            pid = plist[i]
            self._addChildren(pid, cabo.chunks)
        self._processPara()
        # Return here if self.autosub is False.
        if not self.autosub:
            return
        # If root has no subject, add a placeholder "omitted subject" node and
        # record it as the root's subject.
        if self.G.nodes[cabo.chunks[cabo.root].main]['sub'] == '':
            omitted = CaboChunk(-1, cabo.root)
            omitted.main = "省略される主体[{0}@{1}]".format(self.pos, 0)
            omitted.func = "(省略)"
            omitted.type = 0
            omitted.pro = 7
            omitted.surface = "省略される主体"
            omitted.yomi = "ショウリャクサレルシュゴ"
            self._addNode(omitted)
            self._addEdge(omitted.main, cabo.chunks[cabo.root].main, label="(省略)主体", etype="sub")
            self.G.nodes[cabo.chunks[cabo.root].main]['sub'] = omitted.main
        # Add autosub: every chunk of type 1 or 2 whose node still has an empty
        # 'sub' gets an "autosub" edge from the root's subject.
        for i in range(len(plist)):
            pid = plist[i]
            if cabo.chunks[pid].type in [1, 2] and self.G.nodes[cabo.chunks[pid].main]['sub']== "":
                self._addEdge(self.G.nodes[cabo.chunks[cabo.root].main]['sub'], cabo.chunks[pid].main, label="主体候補", etype="autosub")
                self.G.nodes[cabo.chunks[pid].main]['sub'] = self.G.nodes[cabo.chunks[cabo.root].main]['sub']

    def _addChildren(self, pid, chunks):
        """Dispatch chunk ``pid`` to the entity or predicate handler.

        Chunks whose ``type`` is 0 or -1 go to ``_addEntity``; every other
        type goes to ``_addPredicate``.
        """
        if chunks[pid].type in [0, -1]:
            self._addEntity(pid, chunks)
        else:
            self._addPredicate(pid, chunks)

    def _processPara(self):
        """Process parallel word pairs.

        For every pair recorded in ``self.para``, each edge touching one member
        (other than the edge to its partner) is re-added for the other member
        with the same label and type.
        """
        if self.para:
            for pair in self.para:
                # Add A properties to B
                for key in self.G.successors(pair[0]):
                    if key != pair[1]:
                        self._addEdge(pair[1], key, label=self.G.edges[pair[0], key]['label'], etype=self.G.edges[pair[0], key]['type'])
                for key in self.G.predecessors(pair[0]):
                    if key != pair[1]:
                        self._addEdge(key, pair[1], label=self.G.edges[key, pair[0]]['label'], etype=self.G.edges[key, pair[0]]['type'])
                # Add B properties to A
                for key in self.G.successors(pair[1]):
                    if key != pair[0]:
                        self._addEdge(pair[0], key, label=self.G.edges[pair[1], key]['label'], etype=self.G.edges[pair[1], key]['type'])
                for key in self.G.predecessors(pair[1]):
                    if key != pair[0]:
                        self._addEdge(key, pair[0], label=self.G.edges[key, pair[1]]['label'], etype=self.G.edges[key, pair[1]]['type'])

    def _addEntity(self, pid, chunks):
        """Add parent nodes that are nouns, together with their children.

        Args:
            pid: Index of the parent chunk in ``chunks``.
            chunks: Chunk collection produced by the CaboCha client.
        """
        parent = chunks[pid]
        sub = None
        # Find subject: the last child whose func is in SubDict wins. A "では"
        # child only counts when it (or one of its successors) is negative.
        for i in range(len(parent.children)):
            child = chunks[parent.children[i]]
            if child.func in SubDict:
                sub = child
                if child.func == "では":
                    # NOTE(review): G.successors() is documented to yield node
                    # keys, so unpacking each item into (key, val) looks wrong
                    # and would raise once a successor exists — confirm.
                    if child.negative != 0 or any([val.negative != 0 for key, val in self.G.successors(child.main)]):
                        pass
                    else:
                        sub = None
        if sub:
            self._addNode(parent, sub=sub.main)
            self._addEdge(sub.main, parent.main, label="陳述", etype="stat")
        else:
            self._addNode(parent)
        # Loop through all children
        for i in range(len(parent.children)):
            child = chunks[parent.children[i]]
            # Subject-marked children get no edge here, except a non-negated
            # "では" child, which is attached as an attribute.
            if child.func in SubDict:
                if child.func == "では":
                    # NOTE(review): same (key, val) unpacking concern as above.
                    if child.negative != 0 or any([val.negative != 0 for key, val in self.G.successors(child.main)]):
                        pass
                    else:
                        self._addNode(child)
                        self._addEdge(child.main, parent.main, label=child.func, etype="attr")
            # A type-0 child directly preceding the parent and joined by
            # "と"/"などと" is parallel to it, unless the parent's own head is
            # "交代" or "交換".
            elif child.type == 0 and child.func in ["と", "などと"] and child.id + 1 == parent.id and preprocessText(chunks[parent.parent].main) not in ["交代", "交換"]:
                self._addNode(child)
                self._addEdge(child.main, parent.main, label="並列", etype="para")
                self._addEdge(parent.main, child.main, label="並列", etype="para")
                self.para.append([child.main, parent.main])
            # Same treatment for any other func listed in ParallelDict.
            elif child.type == 0 and child.func in ParallelDict and child.id + 1 == parent.id:
                self._addNode(child)
                self._addEdge(child.main, parent.main, label="並列", etype="para")
                self._addEdge(parent.main, child.main, label="並列", etype="para")
                self.para.append([child.main, parent.main])
            else:
                self._addNode(child)
                self._addEdge(child.main, parent.main, label=child.func, etype="attr")

    def _addPredicate(self, pid, chunks):
        """Add a predicate chunk with its subject, object and auxiliary children.

        Args:
            pid: Index of the predicate chunk in ``chunks``.
            chunks: Chunk collection produced by the CaboCha client.
        """
        parent = chunks[pid]
        sub = None
        obj = None
        # ids of children that end up neither subject nor object
        aux = list()
        auxlabel = ""
        # 1st round find absolute subject & object
        for i in range(len(parent.children)):
            child = chunks[parent.children[i]]
            # Process by categories.
            if child.func in SubDict:
                sub = child
            elif child.func in ObjDict:
                obj = child
        # 2nd round find potential subject & object with aux.
        for i in range(len(parent.children)):
            child = chunks[parent.children[i]]
            # Process by categories.
            if child.func in SubDict or child.func in ObjDict:
                continue
            elif child.func in ObjPostDict:
                if not obj and child.type in EntityTypeDict:
                    obj = child
                else:
                    aux.append(child.id)
                    auxlabel += "[{0}]\n".format(child.surface)
            elif child.func in SubPassiveObjDict:
                # Fills the object slot first for a passive parent, the
                # subject slot first otherwise.
                if parent.passive == 1:
                    if not obj and child.type in EntityTypeDict:
                        obj = child
                    elif not sub and child.type in EntityTypeDict:
                        sub = child
                    else:
                        aux.append(child.id)
                        auxlabel += "[{0}]\n".format(child.surface)
                else:
                    if not sub and child.type in EntityTypeDict:
                        sub = child
                    elif not obj and child.type in EntityTypeDict:
                        obj = child
                    else:
                        aux.append(child.id)
                        auxlabel += "[{0}]\n".format(child.surface)
            elif child.func in ObjPassiveSubDict:
                # Mirror image of the branch above.
                if parent.passive == 1:
                    if not sub and child.type in EntityTypeDict:
                        sub = child
                    elif not obj and child.type in EntityTypeDict:
                        obj = child
                    else:
                        aux.append(child.id)
                        auxlabel += "[{0}]\n".format(child.surface)
                else:
                    if not obj and child.type in EntityTypeDict:
                        obj = child
                    elif not sub and child.type in EntityTypeDict:
                        sub = child
                    else:
                        aux.append(child.id)
                        auxlabel += "[{0}]\n".format(child.surface)
            else:
                aux.append(child.id)
                auxlabel += "[{0}]\n".format(child.surface)
        if parent.passive == 0:
            # Add parent and subject.
            # if sub and obj:
            #     parent.main = "<{0}>[{2}]{1}".format(sub.main, parent.main, obj.main)
            # elif sub:
            #     parent.main = "<{0}>[NONE]{1}".format(sub.main, parent.main)
            # elif obj:
            #     parent.main = "<NONE>[{1}]{0}".format(parent.main, obj.main)
            if sub:
                # The predicate's node name is prefixed with its subject.
                parent.main = "<{0}>{1}".format(sub.main, parent.main)
                self._addNode(parent, sub=sub.main)
                if not self.G.has_node(sub.main):
                    self._addNode(sub)
                self._addEdge(sub.main, parent.main, label="主体\n", etype="sub")
            else:
                self._addNode(parent)
            # Add object.
            if obj:
                if not self.G.has_node(obj.main):
                    self._addNode(obj)
                self._addEdge(parent.main, obj.main, label="客体\n" + auxlabel, etype="obj")
        else:
            # Add obj as sub
            # if sub and obj:
            #     parent.main = "<{0}>[{2}]{1}".format(sub.main, parent.main, obj.main)
            # elif obj:
            #     parent.main = "<{0}>[NONE]{1}".format(obj.main, parent.main)
            # elif sub:
            #     parent.main = "<NONE>[{1}]{0}".format(parent.main, sub.main)
            if obj:
                parent.main = "<{0}>{1}".format(obj.main, parent.main)
                self._addNode(parent, sub=obj.main)
                if not self.G.has_node(obj.main):
                    self._addNode(obj)
                self._addEdge(obj.main, parent.main, label="主体\n", etype="sub")
            else:
                self._addNode(parent)
            # Add sub as obj
            # NOTE(review): unlike the non-passive branch, this label does not
            # include auxlabel — confirm whether that is intended.
            if sub:
                if not self.G.has_node(sub.main):
                    self._addNode(sub)
                self._addEdge(parent.main, sub.main, label="客体\n", etype="obj")
        # # Add obj as aux.
        # if obj:
        #     aux.append(obj.id)
        #     auxlabel += "[{0}]\n".format(obj.surface)
        self._processAux(aux, parent.main, chunks)

    def _processAux(self, aux, pname, chunks):
        """Process aux list if any.

        Args:
            aux: Chunk ids of the auxiliary children, used to index ``chunks``.
            pname: Node name of the predicate the auxiliaries attach to.
            chunks: Chunk collection produced by the CaboCha client.
        """
        if len(aux) > 0:
            for nid in aux:
                if not self.G.has_node(chunks[nid].main):
                    self._addNode(chunks[nid])
                # Chunks ending in one of these suffixes are linked as cause
                # candidates; everything else is a plain auxiliary.
                # NOTE(review): main[-1] raises IndexError if main is empty.
                if chunks[nid].main[-2:] in ["ため", "為め", "爲め"] or chunks[nid].main[-1] in ["爲", "為"]:
                    self._addEdge(chunks[nid].main, pname, label="因果関係候補", etype="cause")
                else:
                    self._addEdge(chunks[nid].main, pname, label=chunks[nid].func, etype="aux")
Module variables
var EntityTypeDict
var MeaninglessDict
var NEList
var ObjDict
var ObjPassiveSubDict
var ObjPostDict
var ParallelDict
var SubDict
var SubPassiveObjDict
Classes
class KnowledgeCoreJa
Analyze the input text and store the information into a knowledge structure graph(KSG).
# NOTE(review): the original listing was flattened (indentation lost); the
# block nesting here is reconstructed from syntax. Confirm against upstream.
class KnowledgeCoreJa(DependencyCoreJa):
    """Analyze the input text and store the information into a knowledge structure graph (KSG).

    Each call to ``add`` sends one input to the CaboCha backend, then walks the
    resulting chunk tree from the leaves to the root, adding nodes and
    labelled, typed edges to ``self.G`` through the inherited ``_addNode`` /
    ``_addEdge`` helpers.
    """

    def __init__(self, autosub=False):
        """Initialize an analyzer for KSG.

        Args:
            autosub: When true, ``add`` additionally attaches a subject to
                every predicate chunk whose node has an empty ``'sub'``
                attribute.
        """
        self.G = nx.DiGraph()
        """
        Graph object of this analyzer.
        It is actually a networkx directed graph object(DiGraph), so you can apply all operations available to DiGraph object using networkx.
        """
        self.autosub = autosub
        """
        Whether ``add`` attaches subject candidates to predicates that have none.
        """
        self.entityList = [{} for _ in range(len(NEList))]
        """
        List of entities appeared during this analysis round.
        """
        self.proList = []
        """
        List of pronouns appeared during this analysis round.
        """
        self.pos = 0
        """
        Current position of the analyzer.
        """
        self.proc = Subprocess('cabocha -f1')
        """
        Communicator to backend for KnowledgeAnalyzer.
        """

    def add(self, inp, pos):
        """Take in a string input and add it to the knowledge structure graph (KSG).

        Args:
            inp: Text handed to the backend via ``self.proc.query``.
            pos: Position of this input; stored in ``self.pos`` and forwarded
                to the CaboCha client.

        Returns:
            None. The graph ``self.G`` is updated in place.
        """
        # Function-scope import keeps this block self-contained.
        from collections import deque

        self.pos = pos
        self.para = []
        # Call backend for dependency parsing.
        cabo = CabochaClient()
        cabo.add(self.proc.query(inp), self.pos)
        self.vlist = {}
        chunks = cabo.chunks

        # Breadth-first walk from the root; visiting in reverse discovery
        # order processes leaves first and the root last.
        order = [cabo.root]
        queue = deque(order)
        while queue:
            children = cabo.childrenList[queue.popleft()]
            queue.extend(children)
            order.extend(children)
        plist = order[::-1]

        for pid in plist:
            self._addChildren(pid, chunks)
        self._processPara()

        if not self.autosub:
            return

        root = chunks[cabo.root]
        # If root has no subject, add a placeholder "omitted subject" node.
        if self.G.nodes[root.main]['sub'] == '':
            omitted = CaboChunk(-1, cabo.root)
            omitted.main = "省略される主体[{0}@{1}]".format(self.pos, 0)
            omitted.func = "(省略)"
            omitted.type = 0
            omitted.pro = 7
            omitted.surface = "省略される主体"
            omitted.yomi = "ショウリャクサレルシュゴ"
            self._addNode(omitted)
            self._addEdge(omitted.main, root.main, label="(省略)主体", etype="sub")
            self.G.nodes[root.main]['sub'] = omitted.main

        # Every chunk of type 1 or 2 whose node still has an empty 'sub' gets
        # the root's subject as a candidate.
        for pid in plist:
            chunk = chunks[pid]
            if chunk.type in (1, 2) and self.G.nodes[chunk.main]['sub'] == "":
                root_sub = self.G.nodes[root.main]['sub']
                self._addEdge(root_sub, chunk.main, label="主体候補", etype="autosub")
                self.G.nodes[chunk.main]['sub'] = root_sub

    def _addChildren(self, pid, chunks):
        """Dispatch chunk ``pid`` to the entity or predicate handler.

        Chunks whose ``type`` is 0 or -1 go to ``_addEntity``; every other
        type goes to ``_addPredicate``.
        """
        if chunks[pid].type in (0, -1):
            self._addEntity(pid, chunks)
        else:
            self._addPredicate(pid, chunks)

    def _processPara(self):
        """Share edges between the two members of every parallel word pair."""
        for first, second in self.para:
            self._copyEdges(first, second)
            self._copyEdges(second, first)

    def _copyEdges(self, src, dst):
        """Re-add every edge touching ``src`` (except those to ``dst``) for ``dst``."""
        # Snapshot the neighbours so adding edges can never disturb iteration.
        for key in list(self.G.successors(src)):
            if key != dst:
                data = self.G.edges[src, key]
                self._addEdge(dst, key, label=data['label'], etype=data['type'])
        for key in list(self.G.predecessors(src)):
            if key != dst:
                data = self.G.edges[key, src]
                self._addEdge(key, dst, label=data['label'], etype=data['type'])

    def _isNegated(self, chunk):
        """Return True if ``chunk`` or any node it points to is flagged negative."""
        if chunk.negative != 0:
            return True
        # G.successors() yields node names, not (key, value) pairs, so the flag
        # is looked up in the node data.
        # NOTE(review): assumes _addNode stores a 'negative' node attribute;
        # nodes without one are treated as not negative — confirm.
        return any(
            self.G.nodes[name].get('negative', 0) != 0
            for name in self.G.successors(chunk.main)
        )

    def _isParallel(self, child, parent, chunks):
        """Return True if ``child`` is a parallel word directly preceding ``parent``."""
        if child.type != 0 or child.id + 1 != parent.id:
            return False
        if child.func in ["と", "などと"]:
            # "A と B" is not a parallel pair when the governing word is
            # "交代" or "交換"; fall through to the ParallelDict check then.
            if preprocessText(chunks[parent.parent].main) not in ["交代", "交換"]:
                return True
        return child.func in ParallelDict

    def _addEntity(self, pid, chunks):
        """Add parent nodes that are nouns, together with their children.

        Args:
            pid: Index of the parent chunk in ``chunks``.
            chunks: Chunk collection produced by the CaboCha client.
        """
        parent = chunks[pid]
        children = [chunks[cid] for cid in parent.children]

        # Find subject: the last child whose func is in SubDict wins, but a
        # "では" child only counts when it is negated.
        sub = None
        for child in children:
            if child.func in SubDict:
                sub = child
                if child.func == "では" and not self._isNegated(child):
                    sub = None

        if sub:
            self._addNode(parent, sub=sub.main)
            self._addEdge(sub.main, parent.main, label="陳述", etype="stat")
        else:
            self._addNode(parent)

        for child in children:
            if child.func in SubDict:
                # Subject-marked children get no edge here, except a
                # non-negated "では", which is attached as an attribute.
                if child.func == "では" and not self._isNegated(child):
                    self._addNode(child)
                    self._addEdge(child.main, parent.main, label=child.func, etype="attr")
            elif self._isParallel(child, parent, chunks):
                self._addNode(child)
                self._addEdge(child.main, parent.main, label="並列", etype="para")
                self._addEdge(parent.main, child.main, label="並列", etype="para")
                self.para.append([child.main, parent.main])
            else:
                self._addNode(child)
                self._addEdge(child.main, parent.main, label=child.func, etype="attr")

    def _addPredicate(self, pid, chunks):
        """Add a predicate chunk with its subject, object and auxiliary children.

        Args:
            pid: Index of the predicate chunk in ``chunks``.
            chunks: Chunk collection produced by the CaboCha client.
        """
        parent = chunks[pid]
        children = [chunks[cid] for cid in parent.children]
        sub = None
        obj = None
        leftovers = []

        # 1st round: explicit subject / object markers.
        for child in children:
            if child.func in SubDict:
                sub = child
            elif child.func in ObjDict:
                obj = child

        # 2nd round: remaining children may fill a still-empty slot; the order
        # in which slots are tried depends on the marker and on passivity.
        is_passive = parent.passive == 1
        for child in children:
            func = child.func
            if func in SubDict or func in ObjDict:
                continue
            if func in ObjPostDict:
                preference = ("obj",)
            elif func in SubPassiveObjDict:
                preference = ("obj", "sub") if is_passive else ("sub", "obj")
            elif func in ObjPassiveSubDict:
                preference = ("sub", "obj") if is_passive else ("obj", "sub")
            else:
                preference = ()
            slots = {"sub": sub, "obj": obj}
            free = next((name for name in preference if not slots[name]), None)
            if free is not None and child.type in EntityTypeDict:
                if free == "sub":
                    sub = child
                else:
                    obj = child
            else:
                leftovers.append(child)

        aux = [child.id for child in leftovers]
        auxlabel = "".join("[{0}]\n".format(child.surface) for child in leftovers)

        if parent.passive == 0:
            subject, target, target_label = sub, obj, "客体\n" + auxlabel
        else:
            # Any non-zero passive value swaps the roles.
            # NOTE(review): the original omits auxlabel from this label;
            # preserved here — confirm whether that is intended.
            subject, target, target_label = obj, sub, "客体\n"

        if subject:
            # The predicate's node name is prefixed with its subject.
            parent.main = "<{0}>{1}".format(subject.main, parent.main)
            self._addNode(parent, sub=subject.main)
            if not self.G.has_node(subject.main):
                self._addNode(subject)
            self._addEdge(subject.main, parent.main, label="主体\n", etype="sub")
        else:
            self._addNode(parent)

        if target:
            if not self.G.has_node(target.main):
                self._addNode(target)
            self._addEdge(parent.main, target.main, label=target_label, etype="obj")

        self._processAux(aux, parent.main, chunks)

    def _processAux(self, aux, pname, chunks):
        """Process aux list if any.

        Args:
            aux: Chunk ids of the auxiliary children, used to index ``chunks``.
            pname: Node name of the predicate the auxiliaries attach to.
            chunks: Chunk collection produced by the CaboCha client.
        """
        for nid in aux:
            chunk = chunks[nid]
            if not self.G.has_node(chunk.main):
                self._addNode(chunk)
            # endswith() matches the same suffixes as the former slice tests
            # but does not raise on an empty name.
            if chunk.main.endswith(("ため", "為め", "爲め", "爲", "為")):
                self._addEdge(chunk.main, pname, label="因果関係候補", etype="cause")
            else:
                self._addEdge(chunk.main, pname, label=chunk.func, etype="aux")
Ancestors (in MRO)
- KnowledgeCoreJa
- naruhodo.core.DependencyCoreJa.DependencyCoreJa
- builtins.object
Methods
def __init__(
self, autosub=False)
Initialize an analyzer for KSG.
def __init__(self, autosub=False):
    """Initialize an analyzer for KSG.

    Args:
        autosub: When true, ``add`` additionally attaches a subject to every
            predicate chunk whose node has an empty ``'sub'`` attribute.
    """
    # Each attribute is followed directly by its description so that the
    # description is attributed to the right name (the graph description
    # previously followed ``self.autosub``).
    self.G = nx.DiGraph()
    """
    Graph object of this analyzer.
    It is actually a networkx directed graph object(DiGraph), so you can apply all operations available to DiGraph object using networkx.
    """
    self.autosub = autosub
    """
    Whether ``add`` attaches subject candidates to predicates that have none.
    """
    self.entityList = [{} for _ in range(len(NEList))]
    """
    List of entities appeared during this analysis round.
    """
    self.proList = []
    """
    List of pronouns appeared during this analysis round.
    """
    self.pos = 0
    """
    Current position of the analyzer.
    """
    self.proc = Subprocess('cabocha -f1')
    """
    Communicator to backend for KnowledgeAnalyzer.
    """
def add(
self, inp, pos)
Take in a string input and add it to the knowledge structure graph(KSG).
def add(self, inp, pos):
    """Take in a string input and add it to the knowledge structure graph (KSG).

    Args:
        inp: Text handed to the backend via ``self.proc.query``.
        pos: Position of this input; stored in ``self.pos`` and forwarded to
            the CaboCha client.

    Returns:
        None. The graph ``self.G`` is updated in place.
    """
    # Function-scope import keeps this block self-contained.
    from collections import deque

    self.pos = pos
    self.para = []
    # Call backend for dependency parsing.
    cabo = CabochaClient()
    cabo.add(self.proc.query(inp), self.pos)
    self.vlist = {}
    chunks = cabo.chunks

    # Breadth-first walk from the root. Visiting in reverse discovery order
    # processes leaves first and the root last, which is the order the
    # front-insertion into a list used to produce.
    order = [cabo.root]
    queue = deque(order)
    while queue:
        children = cabo.childrenList[queue.popleft()]
        queue.extend(children)
        order.extend(children)
    plist = order[::-1]

    for pid in plist:
        self._addChildren(pid, chunks)
    self._processPara()

    # Subject completion is optional.
    if not self.autosub:
        return

    root = chunks[cabo.root]
    # If root has no subject, add a placeholder "omitted subject" node and
    # record it as the root's subject.
    if self.G.nodes[root.main]['sub'] == '':
        omitted = CaboChunk(-1, cabo.root)
        omitted.main = "省略される主体[{0}@{1}]".format(self.pos, 0)
        omitted.func = "(省略)"
        omitted.type = 0
        omitted.pro = 7
        omitted.surface = "省略される主体"
        omitted.yomi = "ショウリャクサレルシュゴ"
        self._addNode(omitted)
        self._addEdge(omitted.main, root.main, label="(省略)主体", etype="sub")
        self.G.nodes[root.main]['sub'] = omitted.main

    # Every chunk of type 1 or 2 whose node still has an empty 'sub' gets the
    # root's subject as a candidate.
    for pid in plist:
        chunk = chunks[pid]
        if chunk.type in (1, 2) and self.G.nodes[chunk.main]['sub'] == "":
            root_sub = self.G.nodes[root.main]['sub']
            self._addEdge(root_sub, chunk.main, label="主体候補", etype="autosub")
            self.G.nodes[chunk.main]['sub'] = root_sub
Instance variables
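var G
Graph object of this analyzer. It is a networkx directed graph (DiGraph), so every operation networkx provides for DiGraph objects can be applied to it.
var autosub
Whether add() also links predicates that have no subject to the root's subject, inserting a placeholder "omitted subject" node when the root itself has none.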
var entityList
List of entities that appeared during this analysis round.
var pos
Current position of the analyzer.
var proList
List of pronouns that appeared during this analysis round.
var proc
Communicator to backend for KnowledgeAnalyzer.