Open
Description
Before submitting your bug report
- I believe this is a bug. I'll try to join the Continue Discord for questions
- I'm not able to find an open issue that reports the same bug
- I've seen the troubleshooting guide on the Continue Docs
Relevant environment info
- OS: Ubuntu 22.04.5 LTS
- Continue version: 0.8.68
- IDE version: 1.96.4
- Model: Qwen2.5-Coder-32B
- config.json:
{
"models": [
{
"model": "Qwen/Qwen2.5-Coder-32B-Instruct-GPTQ-Int8",
"title": "Qwen2.5-Coder-32B",
"apiBase": "internal-ip-address",
"provider": "openai",
"contextLength": 8192,
"completionOptions": {
"temperature": 0.7,
"topP": 0.8,
"topK": 20
},
"requestOptions": {
"extraBodyProperties": {
"repetition_penalty": 1.05
}
}
}
],
"tabAutocompleteModel": [
{
"model": "Qwen/Qwen2.5-Coder-7B",
"title": "Qwen2.5-Coder-7B",
"apiBase": "internal-ip-address",
"contextLength": 4096,
"provider": "openai",
"completionOptions": {
"temperature": 0.7,
"topP": 0.8
},
"requestOptions": {
"extraBodyProperties": {
"repetition_penalty": 1.05
}
}
}
],
"customCommands": [
{
"name": "test",
"prompt": "{{{ input }}}\n\nWrite a comprehensive set of unit tests for the selected code. It should setup, run tests that check for correctness including important edge cases, and teardown. Ensure that the tests are complete and sophisticated. Give the tests just as chat output, don't edit any file.",
"description": "Write unit tests for highlighted code"
}
],
"contextProviders": [
{
"name": "code",
"params": {}
},
{
"name": "docs",
"params": {}
},
{
"name": "diff",
"params": {}
},
{
"name": "terminal",
"params": {}
},
{
"name": "problems",
"params": {}
},
{
"name": "folder",
"params": {}
},
{
"name": "codebase",
"params": {}
}
],
"slashCommands": [
{
"name": "share",
"description": "Export the current chat session to markdown"
},
{
"name": "cmd",
"description": "Generate a shell command"
},
{
"name": "commit",
"description": "Generate a git commit message"
}
]
}
Description
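When using "Write doc string" on a function, the suggested edit removes code from the body of the function the docstring is being written for, even though the request explicitly asks not to change the code itself.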
To reproduce
First, the function looks like this (it is also quoted under "This is the code to rewrite" in the log output below):
After right-click -> Continue -> Write doc string, the suggestion looks like this:
Log output
==========================================================================
==========================================================================
##### Completion options #####
{
"contextLength": 8192,
"temperature": 0.7,
"topP": 0.8,
"topK": 20,
"model": "Qwen/Qwen2.5-Coder-32B-Instruct-GPTQ-Int8",
"maxTokens": 4096,
"raw": true,
"prediction": {
"type": "content",
"content": " def validate_parameters(self) -> bool:\n all_params_exist = True\n for defined_param in self.params:\n if not hasattr(self, defined_param):\n all_params_exist = False\n\n return all_params_exist"
}
}
##### Request options #####
{
"extraBodyProperties": {
"repetition_penalty": 1.05
}
}
##### Prompt #####
The user has requested a section of code in a file to be rewritten.
This is the prefix of the file:
comm_type (CommunicationTypes): type of communication. Can be either publisher, subscriber, service client or service
param (str): parameter describing the topic a comm type, e.g. subscription listens to
Returns:
The communication object that belongs to the topic / service name
"""
def find_comm_type_by_name(name: str):
return next(
(
item["obj"]
for item in self.comm_objects
if item["name"] == name and item["comm_type"] == comm_type
),
None,
)
return find_comm_type_by_name(topic_name)
def find_comm_type_by_param(self, param: str) -> Dict:
return next(
(
item
for item in self.comm_objects
if "param" in item and item["param"] == param
),
None,
)
def _validate_comm_type(self, comm_type_dict: Dict[str, Any]) -> bool:
"""
Validates if the input dictionary has the required keys for the given communication type.
This method performs the following checks:
1. Ensures that the 'comm_type' key exists and its value is an instance of CommunicationTypes.
2. Retrieves the list of required keys for the specified communication type.
3. Verifies that all required keys are present in the input dictionary.
Parameters:
comm_type_dict (Dict[str, Any]): A dictionary containing the configuration for a communication type.
Returns:
bool: True if the input dictionary contains all required keys for the specified communication type, False otherwise.
Raises:
ValueError: If 'comm_type' is not an instance of CommunicationTypes or if the communication type is unknown.
Exception: Catches any other exceptions that may occur during validation and logs an error message.
"""
try:
comm_type = comm_type_dict.get("comm_type")
if not isinstance(comm_type, CommunicationTypes):
raise ValueError(
"Invalid 'comm_type'. It must be an instance of CommunicationTypes."
)
# Get the required keys for the given communication type
required_keys_for_type = self.required_keys.get(comm_type)
if required_keys_for_type is None:
raise ValueError(f"Unknown communication type: {comm_type}")
# Check if all required keys are present in the input dictionary
for key in required_keys_for_type:
if key not in comm_type_dict:
self.get_logger().error(
f"Missing required key: {key} for communication type: {comm_type}"
)
return False
# All checks passed
return True
except Exception as e:
print(f"Validation Error: {e}")
return False
def _activate_communication_objects(self):
"""
Activates communication objects according to their configuration.
This method iterates over the list of communication objects and validates their configuration.
If valid, it activates the communication object using the appropriate registration method.
Raises:
ValueError: If a communication object's configuration is not valid.
"""
for element in self.comm_objects:
if self._validate_comm_type(element):
element = self.activate[element["comm_type"]](element)
else:
raise ValueError(
f"{element} is not a valid configuration of a communication type"
)
def validate_comm_name(self, comm_name: str) -> str:
if comm_name[0] == "/":
return comm_name
else:
return f"/{comm_name}"
def _register_publisher(self, publisher: Dict):
"""
Registers a ROS publisher based on the provided configuration.
Parameters:
publisher (Dict): A dictionary containing the configuration for the publisher.
Returns:
Dict: The modified publisher dictionary including the created publisher object and timer.
"""
topic_name = self.validate_comm_name(publisher["name"])
publisher["obj"] = self.create_publisher(
publisher["msg_type"], f"{self.ns}{topic_name}", qos_profile=10
)
self.get_logger().info(f"Topic name: {publisher['obj'].topic_name}")
if "time_period" in publisher:
publisher["timer"] = self.create_timer(
publisher["time_period"],
self.__getattribute__(publisher["callback"]),
)
self.get_logger().info(f"Created publisher on topic {publisher['name']}")
return publisher
def _register_subscription(self, subscription: Dict):
"""
Registers a ROS subscription based on the provided configuration.
Parameters:
subscription (Dict): A dictionary containing the configuration for the subscription.
Returns:
Dict: The modified subscription dictionary including the created subscription object.
"""
topic_name = self.validate_comm_name(subscription["name"])
subscription["obj"] = self.create_subscription(
msg_type=subscription["msg_type"],
topic=f"{self.ns}{topic_name}",
callback=self.__getattribute__(subscription["callback"]),
qos_profile=10,
)
self.get_logger().info(f"Created subscription on topic {subscription['name']}")
return subscription
def _register_service(self, service: Dict):
"""
Registers a ROS service based on the provided configuration.
Parameters:
service (Dict): A dictionary containing the configuration for the service.
Returns:
Dict: The modified service dictionary including the created service object.
"""
service_name = self.validate_comm_name(service["name"])
service["obj"] = self.create_service(
srv_type=service["srv_type"],
srv_name=f"{self.ns}{service_name}",
callback=self.__getattribute__(service["callback"]),
)
self.get_logger().info(f"Created service with name {service['name']}")
return service
def _register_service_client(self, client: Dict):
"""
Registers a ROS service client based on the provided configuration.
Parameters:
client (Dict): A dictionary containing the configuration for the service client.
Returns:
Dict: The modified client dictionary including the created service client object.
"""
service_name = self.validate_comm_name(client["name"])
client["obj"] = self.create_client(
srv_type=client["srv_type"], srv_name=f"{self.ns}{service_name}"
)
self.get_logger().info(f"Created client for {client['name']}")
return client
def _register_actions(self):
"""
Placeholder for registering action servers. Currently not implemented.
This method logs a warning message indicating that the registration of action servers is not implemented.
"""
self.get_logger().warning(
"Registering action servers is currently not implemented"
)
def _register_action_clients(self):
"""
Placeholder for registering action clients. Currently not implemented.
"""
self.get_logger().warning(
"Registering action clients is currently not implemented"
)
def _unregister_communication(self, force: bool = False):
"""
Unregisters communication objects based on their configuration.
This method iterates over the list of communication objects and destroys their associated ROS entities.
If `force` is True, it will destroy all communication objects, otherwise, it will only destroy those
that are not marked as 'always_on'.
Parameters:
force (bool): If True, forces the destruction of all communication objects regardless of 'always_on'. Defaults to False.
"""
for comm_obj in self.comm_objects:
if comm_obj["always_on"] is False or force is True:
self.destroy[comm_obj["comm_type"]](comm_obj["obj"])
if (
comm_obj["comm_type"] == CommunicationTypes.PUBLISHER
and "timer" in comm_obj
):
self.destroy_timer(comm_obj["timer"])
def on_configure(self, state: State) -> TransitionCallbackReturn:
"""
Lifecycle callback for when the node enters the 'configuring' state.
:param state: The current state of the node
:return: The result of the transition
"""
self.logger.debug("I am configured now")
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
"""
Lifecycle callback for when the node enters the 'activating' state.
:param state: The current state of the node
:return: The result of the transition
"""
self._activate_communication_objects()
self.logger.debug("I am activated now")
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
"""
Lifecycle callback for when the node enters the 'deactivating' state.
:param state: The current state of the node
:return: The result of the transition
"""
self._unregister_communication()
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: State) -> TransitionCallbackReturn:
"""
Lifecycle callback for when the node enters the 'shutting down' state.
:param state: The current state of the node
:return: The result of the transition
"""
self._unregister_communication(force=True)
return TransitionCallbackReturn.SUCCESS
def initialize_parameters(self, config_file: str):
"""
Initializes the node's parameters from a configuration file.
Parameters:
config_file (str): The path to the YAML configuration file containing the parameters.
This method reads the configuration file, extracts the parameters specific to the node's name,
and declares these parameters in the node.
"""
with open(config_file, "r") as f:
self.params = yaml.load(f)[self.get_name()]["ros__parameters"]
self.declare_parameters(namespace=None, parameters=tuple(self.params.items()))
This is the suffix of the file:
def parameter_change_callback(self, params: List) -> SetParametersResult:
"""
Callback function to handle changes in parameters.
This method processes a list of parameter changes. For each parameter, it checks if the parameter name
contains 'topic' or 'service'. If it does, the method calls `handle_comm_change` to manage the change.
Otherwise, it calls `handle_parameter_value_change` to handle the change.
If any parameter change fails, the method stops processing further changes and returns a failure result.
Parameters:
params (List[Parameter]): A list of parameters that have changed.
Returns:
SetParametersResult: An object indicating whether the parameter changes were successful.
"""
successful = True
for param in params:
if "topic" in param.name or "service" in param.name:
successful = self.handle_comm_change(param)
else:
successful = self.handle_parameter_value_change(param)
if not successful:
# TODO known issue: If a list of parameters comes and only one of them would fail, why stopping here?
# Not handling this case right now, as this case should not come up
break
# Return success message to indicate the parameters were set successfully
return SetParametersResult(successful=successful)
def handle_comm_change(self, param: Parameter) -> bool:
self.handle_parameter_value_change(param)
success = True
item = self.find_comm_type_by_param(param.name)
self.destroy[item['comm_type']](item['obj'])
item['name'] = param.value
item = self.activate[item["comm_type"]](item)
self.logger.debug(f"Handling {param} comm change was successfull: {success}")
return success
def handle_parameter_value_change(self, param: Parameter) -> bool:
success = True
if hasattr(self, param.name):
setattr(self, param.name, param.value)
else:
success = False
self.logger.debug(f"Handling {param} value change was successfull: {success}")
return success
This is the code to rewrite:
def validate_parameters(self) -> bool:
all_params_exist = True
for defined_param in self.params:
if not hasattr(self, defined_param):
all_params_exist = False
return all_params_exist
The user's request is: "Write a docstring for this code. Do not change anything about the code itself."
Here is the rewritten code: