Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions databricks_cli/jobs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from copy import deepcopy

from databricks_cli.clusters.api import ClusterApi
from databricks_cli.sdk import JobsService


class JobsApi(object):
def __init__(self, api_client):
    """
    Build the jobs service wrapper and remember the raw API client.
    :param api_client: client object handed to JobsService; also kept on the
        instance so other API wrappers can be built from it later
    """
    self.client = JobsService(api_client)
    self.api_client = api_client

def create_job(self, json, headers=None):
    """
    Create a job from a settings payload.
    :param json: job settings; a cluster_name entry is resolved to
        existing_cluster_id before the request is sent
    :param headers: optional extra headers passed through to the request
    :return: whatever the POST /jobs/create query returns
    """
    payload = self._convert_cluster_name_to_id(json)
    query_client = self.client.client
    return query_client.perform_query('POST', '/jobs/create', data=payload, headers=headers)

def list_jobs(self, headers=None):
Expand All @@ -43,6 +49,8 @@ def get_job(self, job_id, headers=None):
return self.client.get_job(job_id, headers=headers)

def reset_job(self, json, headers=None):
    """
    Overwrite the settings of an existing job.
    :param json: reset payload; a cluster_name entry is resolved to
        existing_cluster_id before the request is sent
    :param headers: optional extra headers passed through to the request
    :return: whatever the POST /jobs/reset query returns
    """
    # Resetting accepts cluster_name the same way job creation does.
    payload = self._convert_cluster_name_to_id(json)
    query_client = self.client.client
    return query_client.perform_query('POST', '/jobs/reset', data=payload, headers=headers)

def run_now(self, job_id, jar_params, notebook_params, python_params, spark_submit_params,
Expand All @@ -54,3 +62,40 @@ def _list_jobs_by_name(self, name, headers=None):
jobs = self.list_jobs(headers=headers)['jobs']
result = list(filter(lambda job: job['settings']['name'] == name, jobs))
return result

def clone_job(self, job_id, job_name, headers=None):
    """
    Create a new job whose settings are copied from an existing one.
    :param job_id: id of the job to copy
    :param job_name: name given to the copy
    :param headers: optional extra headers passed to both requests
    :return: the create_job result, or the get_job response unchanged when it
        carries no 'settings' entry
    """
    source_job = self.get_job(job_id, headers=headers)
    if 'settings' in source_job:
        # Copy deeply so renaming the clone never touches the fetched job.
        cloned_settings = deepcopy(source_job['settings'])
        cloned_settings['name'] = job_name
        return self.create_job(json=cloned_settings, headers=headers)

    # Without settings there is nothing to clone; hand back the response.
    return source_job

def _convert_cluster_name_to_id(self, json):
"""
If json contains cluster_name instead of existing_cluster_id, convert it to a cluster_id
:return: json
"""

cluster_data = json
if 'new_settings' in json:
cluster_data = json['new_settings']

# early out the easy things
if not json or 'existing_cluster_id' in cluster_data:
return json

if 'cluster_name' in cluster_data:
cluster_id = self._get_cluster_id(cluster_data['cluster_name'])
cluster_data['existing_cluster_id'] = cluster_id
del cluster_data['cluster_name']
return json

def _get_cluster_id(self, cluster_name):
    """
    Look up the id of the cluster with the given name.
    :param cluster_name: name passed to ClusterApi.get_cluster_id_for_name
    :return: the value returned by that lookup
    """
    # A ClusterApi is built on demand from the same client this API uses.
    return ClusterApi(self.api_client).get_cluster_id_for_name(cluster_name)
118 changes: 106 additions & 12 deletions databricks_cli/jobs/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
from json import loads as json_loads

import click
from cstriggers.core.trigger import QuartzCron
from tabulate import tabulate

from databricks_cli.click_types import OutputClickType, JsonClickType, JobIdClickType
from databricks_cli.jobs.api import JobsApi
from databricks_cli.utils import eat_exceptions, CONTEXT_SETTINGS, pretty_format, json_cli_base, \
truncate_string
from databricks_cli.click_types import OutputClickType, JsonClickType, \
JobIdClickType, ClusterIdClickType, OptionalOneOfOption
from databricks_cli.clusters.api import ClusterApi
from databricks_cli.configure.config import provide_api_client, profile_option, debug_option
from databricks_cli.jobs.api import JobsApi
from databricks_cli.utils import eat_exceptions, CLUSTER_OPTIONS, CONTEXT_SETTINGS, pretty_format, \
json_cli_base, truncate_string
from databricks_cli.version import print_version_callback, version


Expand Down Expand Up @@ -92,14 +95,58 @@ def _jobs_to_table(jobs_json):
return sorted(ret, key=lambda t: t[1].lower())


def list_all_jobs(api_client, cluster_id, cluster_name):
    """
    List jobs, optionally restricted to those bound to one existing cluster.

    :param api_client: client used to build the JobsApi / ClusterApi wrappers
    :param cluster_id: keep only jobs whose settings reference this
        existing_cluster_id; ignored when cluster_name is given
    :param cluster_name: cluster name resolved to an id through
        ClusterApi.get_cluster_id_for_name; takes precedence over cluster_id
    :return: the unmodified list_jobs response when no filter is given,
        otherwise a dict {'jobs': [...]} holding only the matching jobs
    """
    jobs_json = JobsApi(api_client).list_jobs()

    if not (cluster_id or cluster_name):
        return jobs_json

    if cluster_name:
        cluster_id = ClusterApi(api_client).get_cluster_id_for_name(cluster_name)

    # A response without a 'jobs' key, or a job without settings, must
    # yield "no match" instead of raising.
    matching = [
        job for job in (jobs_json.get('jobs') or [])
        if (job.get('settings') or {}).get('existing_cluster_id') == cluster_id
    ]
    return {'jobs': matching}


def get_next_runs(jobs_data):
    """
    Map each scheduled job's name to its next trigger as computed by QuartzCron.

    Jobs without a schedule or without a quartz_cron_expression are skipped.
    If several jobs share a name, the last one processed wins because the
    result is keyed by name.

    :param jobs_data: jobs listing, expected to hold a list under 'jobs';
        a missing or empty listing produces an empty result
    :return: dict of job name -> next trigger string, with the
        '2020-01-01T' prefix stripped when present
    """
    start_iso = '2019-01-01T00:00:00'
    end_iso = '2025-01-01T00:00:00'
    runs = {}
    for job in ((jobs_data or {}).get('jobs') or []):
        settings = job.get('settings') or {}
        expr = (settings.get('schedule') or {}).get('quartz_cron_expression')
        if not expr:
            continue

        # NOTE(review): replacing '*' with '0' changes the schedule that is
        # evaluated (wildcard fields become 0) -- kept as in the original;
        # confirm this is the intended simplification.
        expr = expr.replace("*", "0")
        # Quartz expressions have 6 fields plus an optional 7th (year);
        # only add a year range when the expression does not already have one.
        if len(expr.split()) < 7:
            # str.format rather than an f-string keeps the module importable
            # on the older interpreters the package still declares support for.
            expr = '{} 2020-2030'.format(expr)
        cron = QuartzCron(schedule_string=expr, start_date=start_iso, end_date=end_iso)
        next_run = cron.next_trigger(isoformat=True)
        runs[settings['name']] = next_run.replace('2020-01-01T', '')

    return runs


@click.command(context_settings=CONTEXT_SETTINGS,
short_help='Lists the jobs in the Databricks Job Service.')
@click.option('--output', default=None, help=OutputClickType.help, type=OutputClickType())
@click.option('--cluster-id', cls=OptionalOneOfOption, one_of=CLUSTER_OPTIONS,
type=ClusterIdClickType(), default=None, help=ClusterIdClickType.help,
required=False)
@click.option('--cluster-name', cls=OptionalOneOfOption, one_of=CLUSTER_OPTIONS,
type=ClusterIdClickType(), default=None, help=ClusterIdClickType.help,
required=False)
@click.option('--output', '-o', 'output_type', default=None,
help=OutputClickType.help, type=OutputClickType())
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def list_cli(api_client, output):
def list_cli(api_client, cluster_id, cluster_name, output_type):
"""
Lists the jobs in the Databricks Job Service.

Expand All @@ -113,12 +160,12 @@ def list_cli(api_client, output):

In table mode, the jobs are sorted by their name.
"""
jobs_api = JobsApi(api_client)
jobs_json = jobs_api.list_jobs()
if OutputClickType.is_json(output):
click.echo(pretty_format(jobs_json))

output = list_all_jobs(api_client=api_client, cluster_id=cluster_id, cluster_name=cluster_name)
if OutputClickType.is_json(output_type):
click.echo(pretty_format(output))
else:
click.echo(tabulate(_jobs_to_table(jobs_json), tablefmt='plain', disable_numparse=True))
click.echo(tabulate(_jobs_to_table(output), tablefmt='plain', disable_numparse=True))


@click.command(context_settings=CONTEXT_SETTINGS,
Expand Down Expand Up @@ -181,6 +228,51 @@ def run_now_cli(api_client, job_id, jar_params, notebook_params, python_params,
click.echo(pretty_format(res))


@click.command(context_settings=CONTEXT_SETTINGS,
               short_help='Clones an existing job under a new name.')
@click.option('--job-id', required=True, type=JobIdClickType(), help=JobIdClickType.help)
@click.option('--job-name', required=True, help='Name to give the cloned job.')
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def clone_cli(api_client, job_id, job_name):
    """
    Clones an existing job.

    The settings of the job identified by --job-id are copied into a new job
    named --job-name, and the API response is printed.
    """
    # --job-name previously reused the job-id help text, which described the
    # wrong option in --help output.
    click.echo(pretty_format(JobsApi(api_client).clone_job(job_id, job_name)))


@click.command(context_settings=CONTEXT_SETTINGS,
               short_help="Lists the next run time for scheduled jobs.")
@click.option('--cluster-id', cls=OptionalOneOfOption, one_of=CLUSTER_OPTIONS,
              type=ClusterIdClickType(), default=None, help=ClusterIdClickType.help,
              required=False)
# NOTE(review): --cluster-name reuses the cluster-id click type and help text;
# confirm whether a name-specific type/help exists in click_types.
@click.option('--cluster-name', cls=OptionalOneOfOption, one_of=CLUSTER_OPTIONS,
              type=ClusterIdClickType(), default=None, help=ClusterIdClickType.help,
              required=False)
@click.option('--output', '-o', 'output_type', default=None,
              help=OutputClickType.help, type=OutputClickType())
@debug_option
@profile_option
@eat_exceptions
@provide_api_client
def next_runs_cli(api_client, cluster_id, cluster_name, output_type):
    """
    Lists the next run time for scheduled jobs.

    The listing can be restricted to jobs attached to one existing cluster
    with --cluster-id or --cluster-name. Jobs without a cron schedule are
    omitted.

    In table mode, the jobs are sorted by their name.
    """
    jobs_data = list_all_jobs(api_client=api_client,
                              cluster_id=cluster_id, cluster_name=cluster_name)

    output = get_next_runs(jobs_data=jobs_data)
    if OutputClickType.is_json(output_type):
        click.echo(pretty_format(output))
    else:
        # get_next_runs returns a {job name: next run} mapping, not jobs JSON,
        # so the rows are built here instead of going through _jobs_to_table.
        rows = sorted(output.items(), key=lambda item: item[0].lower())
        click.echo(tabulate(rows, tablefmt='plain', disable_numparse=True))


@click.group(context_settings=CONTEXT_SETTINGS,
short_help='Utility to interact with jobs.')
@click.option('--version', '-v', is_flag=True, callback=print_version_callback,
Expand All @@ -199,8 +291,10 @@ def jobs_group(): # pragma: no cover


# Sub-commands of the jobs group, registered in alphabetical order of their
# CLI name. Each command is registered exactly once.
jobs_group.add_command(clone_cli, name='clone')
jobs_group.add_command(create_cli, name='create')
jobs_group.add_command(delete_cli, name='delete')
jobs_group.add_command(get_cli, name='get')
jobs_group.add_command(list_cli, name='list')
jobs_group.add_command(next_runs_cli, name='next-runs')
jobs_group.add_command(reset_cli, name='reset')
jobs_group.add_command(run_now_cli, name='run-now')
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
'tabulate>=0.7.7',
'six>=1.10.0',
'configparser>=0.3.5;python_version < "3.6"',
'cron-schedule-triggers>=0.0.11',
'tenacity>=6.2.0'
],
entry_points='''
[console_scripts]
Expand All @@ -57,6 +59,7 @@
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.6',
'License :: OSI Approved :: Apache Software License',
'Development Status :: 5 - Production/Stable'
],
keywords='databricks cli',
url='https://github.com/databricks/databricks-cli'
Expand Down