Open
Description
- Nipyapi version: 0.21.0
- NiFi version: 2.0.0-M4
- NiFi-Registry version: N/A
- Python version:
- Operating System:
Description
I'm unclear how to use ProcessGroupsApi.upload_process_group(). I've tried passing the file in a few different ways, but each attempt threw an exception.
What I Did
def upload_top_level_flow(api_client, pg_name, file_data):
    pg_api = nipyapi.nifi.apis.process_groups_api.ProcessGroupsApi(api_client=api_client)
    print(f"Creating new process group: {pg_name}")
    my_pg = pg_api.upload_process_group(
        id=pg_name,
        group_name=pg_name,
        position_x=400.0,
        position_y=100.0,
        client_id='12345',
        file=file_data
    )

def main():
    # Suppress SSL warnings
    requests.packages.urllib3.disable_warnings()
    nifi_host = 'https://xxxxx:18443/nifi-api'
    username = 'admin'
    password = 'xxxx'
    api_client = setup_nifi_connection(nifi_host, username, password)

    print('\nattempt: 1')
    try:
        with open('/Users/chris.snow/Downloads/NiFi_Flow(3).json', 'rb') as file:
            upload_top_level_flow(api_client, 'MYPG', file)
    except Exception as e:
        print(e)

    print('\nattempt: 2')
    try:
        with open('/Users/chris.snow/Downloads/NiFi_Flow(3).json', 'rb') as file:
            upload_top_level_flow(api_client, 'MYPG', file.read())
    except Exception as e:
        print(str(e)[0:100])

    print('\nattempt: 3')
    try:
        upload_top_level_flow(api_client, 'MYPG', '/Users/chris.snow/Downloads/NiFi_Flow(3).json')
    except Exception as e:
        print(str(e)[0:100])

if __name__ == "__main__":
    main()

The response:
NiFi connection successfully set up with authentication.
attempt: 1
Creating new process group: MYPG
expected str, bytes or os.PathLike object, not BufferedReader
attempt: 2
Creating new process group: MYPG
[Errno 63] File name too long: b'{"flowContents":{"identifier":"e87ff5e2-4a7c-3725-a3ee-148beb2fbabc
attempt: 3
Creating new process group: MYPG
a bytes-like object is required, not 'float'
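One possible reading of the three errors, sketched with the standard library only. It assumes (this is not confirmed against the nipyapi source) that the generated client opens the `file` argument as a filesystem path, and that the multipart encoder writes each form field straight into a bytes buffer. The helpers `open_as_path`, `write_form_field` and `describe_failure` are hypothetical stand-ins, not nipyapi functions.

```python
import errno
import io


def open_as_path(file_arg):
    """Stand-in for a client that treats `file` as a path (assumption)."""
    with open(file_arg, "rb") as handle:
        return handle.read()


def write_form_field(value):
    """Stand-in for an encoder writing a raw field value to a bytes buffer (assumption)."""
    buffer = io.BytesIO()
    buffer.write(value)
    return buffer.getvalue()


def describe_failure(func, arg):
    """Call func(arg); return (exception name, errno or None, message), or None on success."""
    try:
        func(arg)
    except (TypeError, OSError) as exc:
        return type(exc).__name__, getattr(exc, "errno", None), str(exc)
    return None


# Attempt 1: an open file object is handed to open(), which wants a path.
reader = io.BufferedReader(io.BytesIO(b"{}"))
attempt_1 = describe_failure(open_as_path, reader)

# Attempt 2: the file *contents* are handed to open() and interpreted as a very long file name.
contents = b'{"flowContents":' + b"x" * 5000
attempt_2 = describe_failure(open_as_path, contents)

# Attempt 3: the path is accepted, but a float form field cannot be written to a bytes buffer.
attempt_3 = describe_failure(write_form_field, 400.0)

for label, result in (("attempt 1", attempt_1), ("attempt 2", attempt_2), ("attempt 3", attempt_3)):
    print(label, result)
```

If this reading is right, attempt 3 got furthest: the path string was accepted for `file`, and the failure moved on to the float values of `position_x` and `position_y`. Passing the positions as integers or strings may get past that step, though this is untested here. Separately, `id` probably needs to be the id of the parent process group (for example the value returned by `nipyapi.canvas.get_root_pg_id()`) rather than the new group's name, since the upload creates a child group under it.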
Urgency
Not blocking