Skip to content

Commit

Permalink
Refactor evaluation of entry message to be lazy.
Browse files Browse the repository at this point in the history
Now a worklog entry can be created and the hours of it accessed before parsing
the message into a task.
  • Loading branch information
icemac committed Dec 21, 2023
1 parent f76f390 commit 5c3d819
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 165 deletions.
250 changes: 144 additions & 106 deletions gtimelog2tick.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import itertools
import pathlib
import sys
from typing import Iterable
from typing import Iterable, Optional

import requests

Expand All @@ -17,25 +17,51 @@
Entry = collections.namedtuple('Entry', ('start', 'end', 'message'))
JiraWorkLog = collections.namedtuple('JiraWorkLog', ('id', 'start', 'end'))
TickSyncStatus = collections.namedtuple(
'TickSyncStatus', ('entry', 'json', 'action'))
Task = collections.namedtuple('Task', ('name', 'id'))
'TickSyncStatus', ('worklog', 'json', 'action'))


@dataclasses.dataclass
class Task:
name: str
id: int
project: Optional['Project'] = dataclasses.field(default=None, init=False)

@property
def title(self) -> str:
project_name = (
'<unknown project>' if self.project is None else self.project.name)
return f'{project_name}: {self.name}'


@dataclasses.dataclass
class Project:
name: str
id: int
tasks: list[Task] | None
_tasks: tuple[Task] = ()

def __post_init__(self) -> None:
# Set the project for the tasks set during __init__.
self.tasks = self._tasks

@property
def tasks(self) -> tuple[Task]:
return self._tasks

@tasks.setter
def tasks(self, value: tuple[Task]) -> None:
self._tasks = value
for task in value:
task.project = self


@dataclasses.dataclass
class WorkLog:
"""Entry in the work log."""

entry: Entry
text: str
task: str
task_id: int
config: dict
_text: str | None = dataclasses.field(default=None, init=False)
_task: Task | None = dataclasses.field(default=None, init=False)

def __post_init__(self) -> None:
self.start = self.entry.start
Expand All @@ -44,6 +70,82 @@ def __post_init__(self) -> None:
int((self.entry.end - self.entry.start).total_seconds()) / 3600,
2)

@property
def text(self) -> str:
if self._text is None:
self._parse_entry_message()
return self._text

@text.setter
def text(self, value: str) -> None:
self._text = value

@property
def task(self) -> Task:
if self._task is None:
self._parse_entry_message()
return self._task

@task.setter
def task(self, value: Task) -> None:
"""Create an arbitrary task without parsing the entry message."""
self._task = value

def _parse_entry_message(self) -> None:
"""Parse entry message into task and text and store them."""
msg = self.entry.message
try:
project_name, task_name, *text_parts = msg.split(':')
except ValueError:
raise DataError(
f'Error: Unable to split {msg!r}, it needs one colon or more.')

task_name = task_name.strip()
tick_projects = [
(x, x.name == project_name)
for x in self.config['tick_projects']
if x.name.startswith(project_name)]

if not tick_projects:
raise DataError(f'Cannot find a Tick project for {msg!r}.')
if len(tick_projects) > 1:
exact_match = [x for x, match in tick_projects if match]
if not exact_match:
raise DataError(
f'Found multiple Tick projects for {msg!r}, but no'
' exact match.'
f' ({", ".join(x[0].name for x in tick_projects)})')
tick_project = exact_match[0]
else:
tick_project = tick_projects[0][0]
if not tick_project.tasks:
raw_tasks = call(
self.config, 'get', f'/projects/{tick_project.id}/tasks.json')
tick_project.tasks = tuple(
Task(x['name'], x['id']) for x in raw_tasks)

possible_tasks = [
x
for x in tick_project.tasks
if x.name.startswith(task_name)]

if not possible_tasks:
raise DataError(f'Cannot find a Tick task for {msg!r}.')
if len(possible_tasks) > 1:
exact_match = [
task for task in possible_tasks if task.name == task_name]
if not exact_match:
raise DataError(
f'Found multiple Tick tasks for {msg!r}, but no'
' exact match.'
f' ({", ".join(x.name for x in tick_project.tasks)})')
task = exact_match[0]
else:
task = possible_tasks[0]

self._task = task
self._text = ':'.join(text_parts).strip()


class ConfigurationError(Exception):
pass
Expand Down Expand Up @@ -143,7 +245,7 @@ def read_config(config_file: pathlib.Path) -> dict:
if not raw_projects:
break
tick_projects.extend(
[Project(x['name'], x['id'], None) for x in raw_projects])
[Project(x['name'], x['id']) for x in raw_projects])
page += 1
config['tick_projects'] = tick_projects
return config
Expand Down Expand Up @@ -193,68 +295,6 @@ def read_timelog(
yield Entry(last, last, last_note)


def parse_entry_message(
config: dict,
message: str
) -> tuple[str, str, int | None]:
"""Parse entry message into "project: task", text and task_id."""
try:
project_name, task_name, *text_parts = message.split(':')
except ValueError:
# This can also happen if there is only a single line for a day, which
# is omitted later on, having too little entries is also handled later:
return (
'<no task>',
f'Error: Unable to split {message!r}, it needs one colon or more.',
None)
task_name = task_name.strip()

tick_projects = [
(x, x.name == project_name)
for x in config['tick_projects']
if x.name.startswith(project_name)]

if not tick_projects:
raise DataError(f'Cannot find a Tick project matching {message}.')
if len(tick_projects) > 1:
exact_match = [x for x, match in tick_projects if match]
if not exact_match:
raise DataError(
f'Found multiple Tick projects matching {message!r}, but no'
' exact match.'
f' ({", ".join(x[0].name for x in tick_projects)})')
tick_project = exact_match[0]
else:
tick_project = tick_projects[0][0]
if tick_project.tasks is None:
raw_tasks = call(
config, 'get', f'/projects/{tick_project.id}/tasks.json')
tick_project.tasks = [Task(x['name'], x['id']) for x in raw_tasks]

possible_tasks = [
x
for x in tick_project.tasks
if x.name.startswith(task_name)]

if not possible_tasks:
raise DataError(f'Cannot find a Tick task matching {message}.')
if len(possible_tasks) > 1:
exact_match = [
task for task in possible_tasks if task.name == task_name]
if not exact_match:
raise DataError(
f'Found multiple Tick tasks matching {message!r}, but no'
' exact match.'
f' ({", ".join(x.name for x in tick_project.tasks)})')
task = exact_match[0]
else:
task = possible_tasks[0]

task_name = f'{tick_project.name}: {task.name}'

return task_name, ':'.join(text_parts).strip(), task.id


def parse_timelog(
config: dict,
entries: Iterable[Entry],
Expand All @@ -270,11 +310,8 @@ def parse_timelog(
for x in config['requested_projects']):
continue

task, text, task_id = parse_entry_message(config, entry.message)
worklog = WorkLog(entry, text, task, task_id)
worklog = WorkLog(entry, config)
if worklog.hours > 0:
if worklog.task_id is None:
raise DataError(worklog.text)
yield worklog
elif worklog.hours < 0:
raise DataError(f'Negative hours: {worklog}')
Expand Down Expand Up @@ -351,67 +388,67 @@ def remove_tick_data(
entries = call(config, 'get', get_path)
for entry in entries:
date = datetime.datetime.strptime(entry['date'], '%Y-%m-%d')
sync_entry = WorkLog(
del_worklog = WorkLog(
Entry(date,
date + datetime.timedelta(hours=entry['hours']),
entry["id"]),
entry["notes"], '<unknown task name>', entry["task_id"]
)
entry["id"]), config={})
del_worklog.task = Task('<unknown task name>', entry["task_id"])
del_worklog.text = entry["notes"]
if dry_run:
yield TickSyncStatus(sync_entry, {}, 'delete (dry run)')
yield TickSyncStatus(del_worklog, {}, 'delete (dry run)')
else:
call(config, 'delete', f'/entries/{entry["id"]}.json', {204})
yield TickSyncStatus(sync_entry, {"id": entry["id"]}, 'delete')
yield TickSyncStatus(del_worklog, {"id": entry["id"]}, 'delete')


def add_tick_entry(
config: dict,
entry: WorkLog,
worklog: WorkLog,
dry_run: bool,
) -> Iterable[TickSyncStatus]:
"""Add a new tick entry."""
data = {
"date": entry.start.isoformat(),
"hours": entry.hours,
"notes": entry.text,
"task_id": entry.task_id,
"date": worklog.start.isoformat(),
"hours": worklog.hours,
"notes": worklog.text,
"task_id": worklog.task.id,
"user_id": config["user_id"],
}
if dry_run:
yield TickSyncStatus(entry, data, 'add (dry run)')
yield TickSyncStatus(worklog, data, 'add (dry run)')
else:
response = call(config, 'post', '/entries.json', {201}, data=data)
yield TickSyncStatus(entry, response, 'add')
yield TickSyncStatus(worklog, response, 'add')


def sync_with_tick(
config,
entries: Iterable[WorkLog],
worklogs: Iterable[WorkLog],
dry_run=False) -> Iterable[TickSyncStatus]:
def get_day(entry):
return entry.start.date()
for date, entries in itertools.groupby(entries, key=get_day):
for date, worklogs in itertools.groupby(worklogs, key=get_day):
yield from remove_tick_data(config, date, dry_run)
for entry in entries:
yield from add_tick_entry(config, entry, dry_run)
for worklog in worklogs:
yield from add_tick_entry(config, worklog, dry_run)


def log_tick_sync(
entries: Iterable[TickSyncStatus],
tick_sync_status_items: Iterable[TickSyncStatus],
ticklog) -> Iterable[TickSyncStatus]:
with ticklog.open('a') as f:
for entry, resp, action in entries:
comment = entry.text
for worklog, resp, action in tick_sync_status_items:
comment = worklog.text
f.write(','.join(map(str, [
get_now().isoformat(timespec='seconds'),
entry.start.isoformat(timespec='minutes'),
entry.hours,
worklog.start.isoformat(timespec='minutes'),
worklog.hours,
resp.get('id', ''),
action,
comment,
])) + '\n')

yield TickSyncStatus(entry, resp, action)
yield TickSyncStatus(worklog, resp, action)


class Date:
Expand All @@ -434,29 +471,30 @@ def __call__(self, value):


def show_results(
entries: Iterable[TickSyncStatus],
tick_sync_status_items: Iterable[TickSyncStatus],
stdout):
totals = {
'hours': collections.defaultdict(int),
'entries': collections.defaultdict(int),
}
print(file=stdout)
for entry, resp, action in entries:
for worklog, resp, action in tick_sync_status_items:
action = action.replace(' (dry run)', '')
if action == 'add':
print('ADD: {start} {amount:>8.2f}: {comment}'.format(
start=entry.start.isoformat(timespec='minutes'),
amount=entry.hours,
comment=entry.text,
start=worklog.start.isoformat(timespec='minutes'),
amount=worklog.hours,
comment=worklog.text,
), file=stdout)
totals['hours'][entry.task] += entry.hours
totals['entries'][entry.task] += 1
totals['hours'][worklog.task.title] += worklog.hours
totals['entries'][worklog.task.title] += 1
if totals['hours']:
print(file=stdout)
print('TOTALS:', file=stdout)
for task, hours in sorted(totals['hours'].items()):
entries = totals['entries'][task]
print(f'{task}: {hours:.2f} h in {entries} entries.', file=stdout)
num_entries = totals['entries'][task]
print(f'{task}: {hours:.2f} h in {num_entries} entries.',
file=stdout)


def _main(argv=None, stdout=sys.stdout):
Expand Down
Loading

0 comments on commit 5c3d819

Please sign in to comment.