# dsjob.pjb.xml抽出 (dsjob.pjb.xml extraction)
import sys
#from lxml import etree
from collections import Counter
import glob
import os
import xml.etree.ElementTree as ET
import datetime
import re
from collections import defaultdict
# Module-level state shared by the functions in this file;
# initialize_global_list() rebinds all of it before each input file.

# Name read from the job / shared-container definition element. Defined here
# as well so that code reading it before initialize_global_list() has run
# does not fail with NameError.
job_name = ""
# Traversal starting points: stages that have no inputPins attribute, plus
# "Container:Start" when a container design view is found.
root_stages = []
# stage name -> {"stage_type": <stageType attribute, or None when absent>}
stages = {}
# output pin (one entry of a stage's outputPins attribute) -> stage name
stage_outpins = {}
# input pin (one entry of a stage's inputPins attribute) -> stage name
stage_inpins = {}
# link name -> value of that link's to_InputPin attribute
link_to_inpins = {}
# value of a link's from_OutputPin attribute -> link name
link_from_outpins = {}
# pin internalID -> pin XMI id
pin_names = {}
# pin XMI id -> pin internalID
pin_ids = {}
# Every recorded flow; each flow is a list of stage names and link labels.
flows = []
# One "<start>:<end>" key per processed flow (filled by the detect* functions).
terminlal_flows = []
def initialize_global_list():
    """Reset all module-level analysis state to empty values.

    Each global is rebound to a brand-new object (not cleared in place), so
    any list or dict still referenced from a previous run is left untouched.
    """
    global job_name, root_stages, stages
    global stage_outpins, stage_inpins
    global link_to_inpins, link_from_outpins
    global pin_names, pin_ids
    global flows, terminlal_flows

    job_name = ""
    # List-valued accumulators.
    root_stages, flows, terminlal_flows = [], [], []
    # Dict-valued lookup tables.
    stages, stage_outpins, stage_inpins = {}, {}, {}
    link_to_inpins, link_from_outpins = {}, {}
    pin_names, pin_ids = {}, {}
def detectDb2WithSort():
    """Tally the recorded flows by start/end pair and print the counts.

    For every flow in the global ``flows`` a ``"<start>:<end>"`` key is
    appended to the global ``terminlal_flows``; the whole of
    ``terminlal_flows`` is then counted and one ``job_name,key,count`` line is
    printed per distinct key.

    A flow that begins with ``'Container:Start'`` is keyed by that marker plus
    its second element; a flow that finishes with ``'Container:End'`` is keyed
    by its second-to-last element plus that marker.  Empty flows are skipped
    and a flow too short to have the neighbouring element falls back to the
    marker alone (both cases used to raise ``IndexError``).

    NOTE(review): despite its name, nothing here inspects DB2 or sort stages;
    the logic is the same aggregation that detectForkJoin() performs.  Because
    both append to the shared ``terminlal_flows``, calling both for the same
    file doubles every count.
    """
    for flow in flows:
        if not flow:
            # Nothing to key an empty flow on.
            continue
        has_neighbour = len(flow) > 1
        if flow[0] == 'Container:Start' and has_neighbour:
            start = 'Container:Start' + ":" + flow[1]
        else:
            start = flow[0]
        if flow[-1] == 'Container:End' and has_neighbour:
            end = flow[-2] + ":" + 'Container:End'
        else:
            end = flow[-1]
        terminlal_flows.append(start + ":" + end)
    # A count above one means several recorded flows share one start/end pair.
    for key, count in Counter(terminlal_flows).items():
        print(f'{job_name},{key},{count}')
def detectForkJoin():
    """Tally the recorded flows by start/end pair and print the counts.

    For every flow in the global ``flows`` a ``"<start>:<end>"`` key is
    appended to the global ``terminlal_flows``; the whole of
    ``terminlal_flows`` is then counted and one ``job_name,key,count`` line is
    printed per distinct key.  A count above one means several recorded flows
    share the same start and end.

    A flow that begins with ``'Container:Start'`` is keyed by that marker plus
    its second element; a flow that finishes with ``'Container:End'`` is keyed
    by its second-to-last element plus that marker.  Empty flows are skipped
    and a flow too short to have the neighbouring element falls back to the
    marker alone (both cases used to raise ``IndexError``).
    """
    for flow in flows:
        if not flow:
            # Nothing to key an empty flow on.
            continue
        has_neighbour = len(flow) > 1
        if flow[0] == 'Container:Start' and has_neighbour:
            start = 'Container:Start' + ":" + flow[1]
        else:
            start = flow[0]
        if flow[-1] == 'Container:End' and has_neighbour:
            end = flow[-2] + ":" + 'Container:End'
        else:
            end = flow[-1]
        terminlal_flows.append(start + ":" + end)
    for key, count in Counter(terminlal_flows).items():
        print(f'{job_name},{key},{count}')
def appendNextLink(pre_stage,flow,is_container):
    """Follow every link leaving ``pre_stage`` and record the finished flows.

    Args:
        pre_stage: Name of the stage whose outgoing links are followed.
        flow: The path built so far (a list); it is copied per outgoing link
            and never mutated here.
        is_container: When true, the link label appended to the path is
            prefixed with ``pre_stage + ":"``.

    Each outgoing pin is resolved pin -> link -> input pin -> next stage via
    the global lookup tables; a missing entry in any of them raises KeyError.
    Finished flows are printed and appended to the global ``flows``.

    NOTE(review): the source lost its indentation; the nesting below is a
    reconstruction.  In particular the final "no output pins" check is assumed
    to sit at function level and the ``else`` after the second recursion is
    assumed to belong to the ``if`` directly above it -- confirm against the
    original file.
    NOTE(review): there is no visited-set, so a cyclic link graph would
    recurse until RecursionError.
    """
    output_pins = []
    child_flow_id = 0  # never read in this function
    # Collect the pins that stage_outpins attributes to pre_stage.
    for pin, stage in stage_outpins.items() :
        if stage == pre_stage:
            output_pins.append(pin)
    if is_container == True :
        for outpin_id in output_pins:
            l_flow = flow.copy()
            # Despite the variable names, pin_names maps internalID -> XMI id
            # and pin_ids maps XMI id -> internalID (see getchild).
            outpin_name = pin_names[outpin_id]
            link_name = link_from_outpins[outpin_name]
            # Container mode: the link label carries the stage name as prefix.
            l_flow.append(pre_stage + ":" + link_name)
            inpin_id = link_to_inpins[link_name]
            inpin_name = pin_ids[inpin_id]
            nest_stage = stage_inpins[inpin_name]
            l_flow.append(nest_stage)
            # Only continue into stages that have a stage type; otherwise this
            # branch records nothing.
            if stages[nest_stage]["stage_type"] != None :
                appendNextLink(nest_stage, l_flow, False)
    else :
        for outpin_id in output_pins :
            l_flow = flow.copy()
            outpin_name = pin_names[outpin_id]
            link_name = link_from_outpins[outpin_name]
            l_flow.append(link_name)
            inpin_id = link_to_inpins[link_name]
            inpin_name = pin_ids[inpin_id]
            nest_stage = stage_inpins[inpin_name]
            l_flow.append(nest_stage)
            if stages[nest_stage]["stage_type"] != None :
                appendNextLink(nest_stage, l_flow, False)
            else :
                # The next stage has no stage type: stop here.  Note that the
                # incoming ``flow`` is recorded, not ``l_flow``, so this last
                # link and stage are absent from the recorded path.
                # NOTE(review): confirm that is intended.
                print(flow)
                flows.append(flow)
    # A stage without output pins ends the path as it stands.
    if len(output_pins) == 0 :
        print(flow)
        flows.append(flow)
def getFlow():
    """Start a traversal from every root stage and from every untyped stage.

    First registers a ``"Container_End"`` entry in the global ``stages``.
    Then calls appendNextLink() once per entry of ``root_stages`` with a path
    seeded with that stage, and once per stage whose ``stage_type`` is None
    with an empty path and container mode switched on.
    """
    stages["Container_End"] = {"stage_type": "Container_End"}

    # Regular traversal: the path starts with the root stage itself.
    for root_stage in root_stages:
        appendNextLink(root_stage, [root_stage], False)

    # Stages without a stage type are traversed in container mode.
    for stage_name, info in stages.items():
        if info["stage_type"] is None:
            appendNextLink(stage_name, [], True)
def getchild(f,element):
    """Walk ``element`` and its descendants, filling the global lookup tables.

    Args:
        f: Not used here; only forwarded to the recursive calls.
        element: Element to inspect (its ``tag``, ``attrib`` and children are
            read).

    Returns:
        Always 0.

    Depending on the element's tag and ``xsi:type`` attribute this records the
    job name, a stage with its input/output pins, the container design view,
    a pin's internalID/XMI-id pair, or a link's endpoints.
    """
    global job_name

    tag = element.tag
    attrs = element.attrib
    xsi_type = attrs.get('{http://www.w3.org/2001/XMLSchema-instance}type')

    # Job or shared-container definition: remember its name.
    if tag in (
        '{http:///com/ibm/datastage/ai/dtm/ds.ecore}DSJobDefSDO',
        '{http:///com/ibm/datastage/ai/dtm/ds.ecore}DSSharedContainerDefSDO',
    ):
        job_name = attrs.get('name')

    # Stage: register it and attribute its pins to it.
    if tag == 'contains_JobObject' and xsi_type == 'com.ibm.datastage.ai.dtm.ds:DSStageSDO':
        stage_name = attrs.get('name')
        in_spec = attrs.get('inputPins')
        out_spec = attrs.get('outputPins')
        stages[stage_name] = {"stage_type": attrs.get('stageType')}
        if in_spec is None:
            # No inputPins attribute at all: this stage starts a flow.
            root_stages.append(stage_name)
        else:
            stage_inpins.update(dict.fromkeys(in_spec.split("|"), stage_name))
        if out_spec is not None:
            stage_outpins.update(dict.fromkeys(out_spec.split("|"), stage_name))

    # Container design view: modelled as a Start/End pair of pseudo stages.
    if tag == 'has_DSDesignView' and attrs.get('name') == "コンテナー":
        start_key = "Container:Start"
        end_key = "Container:End"
        in_spec = attrs.get('inputPins')
        out_spec = attrs.get('outputPins')
        root_stages.append(start_key)
        stages[start_key] = {"stage_type": "ContainerStart"}
        stages[end_key] = {"stage_type": "ContainerEnd"}
        # The view's input pins belong to End, its output pins to Start.
        if in_spec is not None:
            stage_inpins.update(dict.fromkeys(in_spec.split("|"), end_key))
        if out_spec is not None:
            stage_outpins.update(dict.fromkeys(out_spec.split("|"), start_key))

    # Output and input pins are stored the same way, in both directions.
    is_output_pin = (
        tag == 'has_OutputPin'
        and xsi_type == 'com.ibm.datastage.ai.dtm.ds:DSOutputPinSDO'
    )
    is_input_pin = (
        tag == 'has_InputPin'
        and xsi_type == 'com.ibm.datastage.ai.dtm.ds:DSInputPinSDO'
    )
    if is_output_pin or is_input_pin:
        xmi_id = attrs.get('{http://www.omg.org/XMI}id')
        internal_id = attrs.get('internalID')
        pin_names[internal_id] = xmi_id
        pin_ids[xmi_id] = internal_id

    # Link: remember which output pin it leaves and which input pin it enters.
    if tag == 'contains_JobObject' and xsi_type == 'com.ibm.datastage.ai.dtm.ds:DSLinkSDO':
        link_name = attrs.get('name')
        link_from_outpins[attrs.get('from_OutputPin')] = link_name
        link_to_inpins[link_name] = attrs.get('to_InputPin')

    for child in element:
        getchild(f, child)
    return 0
def main(args=None):
    """Analyse every ``*.xml`` file in ``--indir`` and write a CSV to ``--outdir``.

    Args:
        args: Argument vector with the program name at index 0; defaults to
            the current ``sys.argv``.  Recognised options are ``--indir DIR``
            and ``--outdir DIR``; anything else is ignored.

    Returns:
        ``None`` on success, or ``2`` when either option is missing (this used
        to fail with UnboundLocalError).

    For each input file the global state is reset, the XML is parsed and
    walked, the flows are built and the start/end tallies are printed.
    ``ds_job_analyze.csv`` in the output directory is recreated on every run
    and receives one row per root stage.
    """
    if args is None:
        # Resolve at call time rather than binding sys.argv at definition time.
        args = sys.argv

    # Report the start time.
    started = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    print(f'start : {started}')

    indir = None
    outdir = None
    len_args = len(args)
    i = 1
    while i < len_args:
        if args[i] == "--indir" and (i + 1) < len_args:
            indir = args[i + 1]
            i += 1  # skip the option's value
        elif args[i] == "--outdir" and (i + 1) < len_args:
            outdir = args[i + 1]
            i += 1  # skip the option's value
        i += 1
    if indir is None or outdir is None:
        print('usage: --indir DIR --outdir DIR', file=sys.stderr)
        return 2

    # os.path.join makes a directory work with or without a trailing slash.
    filelist = glob.glob(os.path.join(indir, '*.xml'))
    csv_path = os.path.join(outdir, 'ds_job_analyze.csv')
    total = len(filelist)
    # Report progress roughly every tenth of the files (at least every file).
    step = max(1, round(total / 10))

    # Mode 'w' truncates a file left over from an earlier run.
    with open(csv_path, 'w') as f:
        f.write('file_name,job_name,root_stage,flow_path\n')
        # Extract the information file by file.
        for counter, xmlfile in enumerate(filelist, start=1):
            root = ET.parse(xmlfile).getroot()
            # Reset the module-level state.
            initialize_global_list()
            # Read the XML and store its metadata in the module-level tables.
            getchild(f, root)
            getFlow()
            detectForkJoin()
            for root_stage in root_stages:
                # The root_stage column was missing, leaving rows one field
                # short of the header.
                # NOTE(review): the last field is still the literal text
                # 'flow_path', as before; decide how paths should be rendered.
                f.write(f'{xmlfile},{job_name},{root_stage},flow_path\n')
            if counter % step == 0 or counter == total:
                print(f'{counter}/{total}')

    # Report the end time.
    finished = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    print(f'end : {finished} ')
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
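# コメント ("Comment") / コメントを投稿 ("Post a comment")
# NOTE(review): the two phrases above appear to be web-page residue, not part of the script.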